use per-socket reader threads
authorRalph Ronnquist <ralph.ronnquist@gmail.com>
Sun, 17 Oct 2021 05:39:05 +0000 (16:39 +1100)
committerRalph Ronnquist <ralph.ronnquist@gmail.com>
Sun, 17 Oct 2021 05:39:05 +0000 (16:39 +1100)
rrqnet.c

index 90b246a3aba15055ae6cbcf83af0faf16affc179..db93fd70ec50e19bd708215d45e275622a8c06a3 100644 (file)
--- a/rrqnet.c
+++ b/rrqnet.c
@@ -87,6 +87,10 @@ typedef struct _PacketItem {
     unsigned char buffer[ BUFSIZE ];
 } PacketItem;
 
+typedef struct _ReaderData {
+    int fd;
+} ReaderData;
+
 // heartbeat interval, in seconds
 #define HEARTBEAT 30
 #define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 )
@@ -211,14 +215,6 @@ static pthread_mutex_t printing = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 #define VERBOSE3OUT(fmt, ...) \
     if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) )
 
-// A buffer for reading stdin in fragmented way, to allow outgoing
-// packets during the reading of stdin packets, if it's fragmented.
-static struct {
-    unsigned char buffer[ BUFSIZE ]; // Packet data
-    unsigned int end;               // Packet size
-    unsigned int cur;               // Amount read so far
-} input;
-
 // The actual name of this program (argv[0])
 static unsigned char *progname;
 
@@ -1168,6 +1164,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) {
 static struct {
     Queue full;
     Queue free;
+    sem_t reading;
 } todolist;
 
 // The threadcontrol program for handling packets.
@@ -1200,6 +1197,10 @@ void todolist_initialize(int nbuf,int nthr) {
        perror( "FATAL" );
        exit( 1 );
     }
+    if ( sem_init( &todolist.reading, 0, 1 ) ) {
+       perror( "FATAL" );
+       exit( 1 );
+    }
     Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) );
     for ( ; nthr > 0; nthr-- ) {
        pthread_t thread; // Temporary thread id
@@ -1213,45 +1214,29 @@ void todolist_initialize(int nbuf,int nthr) {
 // is updated for the new MAC address, However, if there is then a MAC
 // address clash in the connection table, then the associated remote
 // is removed, and the packet is dropped.
-static void doreadUDP(int fd) {
-    PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
-    socklen_t addrlen =
-       udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
-    memset( &todo->src, 0, sizeof( todo->src ) );
-    todo->fd = fd;
-    todo->len = recvfrom(
-       fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
-    if ( todo->len == -1) {
-       perror( "Receiving UDP" );
-       exit( 1 );
-    }
+static void *doreadUDP(void *data) {
+    int fd = ((ReaderData *) data)->fd;
+    while ( 1 ) {
+       PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
+       socklen_t addrlen =
+           udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
+       memset( &todo->src, 0, sizeof( todo->src ) );
+       todo->fd = fd;
+       todo->len = recvfrom(
+           fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
+       if ( todo->len == -1) {
+           perror( "Receiving UDP" );
+           exit( 1 );
+       }
 #ifdef GPROF
-    if ( todo->len == 17 &&
-        memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
-       exit( 0 );
-    }
-#endif
-    Queue_addItem( &todolist.full, (QueueItem*) todo );
-}
-
-// Handle packet received on the tap/stdio channel
-static void received_tap(unsigned char *buf, int len) {
-    static struct Remote *tap_remote = 0;
-    if ( tap_remote == 0 ) {
-       Remote_LOCK;
-       if ( tap_remote == 0 ) {
-           tap_remote = add_remote( 0, 0 );
+       if ( todo->len == 17 &&
+            memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
+           exit( 0 );
        }
-       Remote_UNLOCK;
+#endif
+       Queue_addItem( &todolist.full, (QueueItem*) todo );
     }
-    PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
-    memcpy( (void*)todo->buffer, (void*)buf, len );
-    memcpy( (void*)&todo->src,
-           (void*)&tap_remote->uaddr,
-           sizeof( struct SockAddr ) );
-    todo->fd = 0;
-    todo->len = len;
-    Queue_addItem( &todolist.full, (QueueItem*) todo );
+    return 0;
 }
 
 // Read up to n bytes from the given file descriptor into the buffer
@@ -1312,17 +1297,6 @@ static void heartbeat(int fd) {
     Remote_UNLOCK;
 }
 
-// The threadcontrol program for issuing heartbeat packets.
-// Regular heartbeat
-static void *hearbeater_thread(void *data) {
-    (void) data;
-    for ( ;; ) {
-       sleep( HEARTBEAT );
-       heartbeat( udp_fd );
-    }
-    return 0;
-}
-
 // Tell how to use this program and exit with failure.
 static void usage(void) {
     fprintf( stderr, "Packet tunneling over UDP, multiple channels, " );
@@ -1356,102 +1330,60 @@ static int tun_alloc(char *dev, int flags) {
     return fd;
 }
 
-static void doreadTap() {
-    if ( stdio ) {
-       if ( input.end == 0 ) {
+// Handle packet received on the tap/stdio channel
+static void initialize_tap() {
+    // Ensure there is a Remote for this
+    static struct Remote *tap_remote = 0;
+    if ( tap_remote == 0 ) {
+       Remote_LOCK;
+       if ( tap_remote == 0 ) {
+           tap_remote = add_remote( 0, 0 );
+       }
+       Remote_UNLOCK;
+    }
+}
+
+// Thread to handle tap/stdio input
+static void *doreadTap(void *data) {
+    int fd = ((ReaderData*) data)->fd;
+    unsigned int end = 0; // Packet size
+    unsigned int cur = 0; // Amount read so far
+    size_t e;
+    PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
+    while ( 1 ) {
+       if ( stdio ) {
            uint16_t plength;
            int n = read_into( 0, (unsigned char *) &plength,
-                          sizeof( plength ), 0 );
+                              sizeof( plength ), 0 );
            if ( n == 0 ) {
                // Tap/stdio closed => exit silently
                exit( 0 );
            }
-           input.end = ntohs( plength );
-           input.cur = 0;
-       }
-       size_t e = input.end - input.cur;
-       unsigned char *p = input.buffer + input.cur;
-       if ( input.end > BUFSIZE ) {
-           // Oversize packets should be read and discarded
-           if ( e > BUFSIZE ) {
-               e = BUFSIZE;
-           }
-           p = input.buffer;
-       }
-       input.cur += read_into( 0, p, e, 1 );
-    } else {
-       input.end = doread( tap_fd, input.buffer, BUFSIZE );
-       input.cur = input.end;
-    }
-    if ( input.end == input.cur ) {
-       VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end );
-       if ( input.end <= BUFSIZE ) {
-           received_tap( input.buffer, input.end );
-       }
-       input.end = 0; // Ready for next packet
-    }
-    // End handling tap
-}
-
-// MAXFD is Finalized before multi-threading, and then remains constant.
-static int MAXFD;
-
-// This is the main packet handling loop
-static int packet_loop() {
-    // A packet buffer for receiving UDP
-    unsigned char buffer[ BUFSIZE ];
-    int n;
-    //time_t next_heartbeat = time(0);
-    int cycle = 0; // which channel to check first
-    while( 1 ) {
-       fd_set rd_set;
-       FD_ZERO( &rd_set );
-       if ( mcast.fd ) {
-           FD_SET( mcast.fd, &rd_set );
-       }
-       FD_SET( udp_fd, &rd_set );
-       if ( udp6 ) {
-           FD_SET( udp_fd, &rd_set );
-       }
-       if ( tap ) { // tap/stdio
-           FD_SET( tap_fd, &rd_set );
-       }
-       n = select( MAXFD, &rd_set, NULL, NULL, NULL );
-       VERBOSE3OUT( "select got %d\n", n );
-       if ( n < 0 ) {
-           if ( errno == EINTR ) {
-               continue;
+           end = ntohs( plength );
+           cur = 0;
+           while ( ( e = ( end - cur ) ) != 0 ) {
+               unsigned char *p = todo->buffer + cur;
+               if ( end > BUFSIZE ) {
+                   // Oversize packets should be read and discarded
+                   if ( e > BUFSIZE ) {
+                       e = BUFSIZE;
+                   }
+                   p = todo->buffer;
+               }
+               cur += read_into( 0, p, e, 1 );
            }
-           perror("select");
-           exit(1);
+       } else {
+           end = doread( fd, todo->buffer, BUFSIZE );
+           cur = end;
        }
-       if ( n == 0 ) {
-           continue;
-       }
-       // Process input with alternating priority across channels
-       for ( ;; cycle++ ) {
-           if ( cycle >= 3 ) {
-               cycle = 0;
-           }
-           if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) {
-               // Check and process UDP socket
-               doreadUDP( udp_fd ) ;
-               cycle = 1;
-               break;
-           }
-           if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) {
-               // Check and process multicast socket
-               doreadUDP( mcast.fd ) ;
-               cycle = 2;
-               break;
-           }
-           if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) {
-               // Check and process tap/stdio socket
-               doreadTap( tap_fd, buffer );
-               cycle = 0;
-               break;
-           }
+       VERBOSE3OUT( "TAP/stdio input %d bytes\n", end );
+       if ( end <= BUFSIZE ) {
+           todo->fd = 0;
+           todo->len = end;
+           Queue_addItem( &todolist.full, (QueueItem*) todo );
+           todo = (PacketItem*) Queue_getItem( &todolist.free );
        }
+       // End handling tap
     }
     return 0;
 }
@@ -1464,6 +1396,7 @@ static int packet_loop() {
 // remote = [ipv6(/maskwidth)](:port)(=key)
 // ip = ipv4 | [ipv6]
 int main(int argc, char *argv[]) {
+    pthread_t thread; // Temporary thread id
     int port, i;
     progname = (unsigned char *) argv[0];
     ///// Parse command line arguments
@@ -1559,7 +1492,6 @@ int main(int argc, char *argv[]) {
     }
     todolist_initialize( buffers_count, threads_count );
 
-    MAXFD = 0;
     // Set up the tap/stdio channel
     if ( tap ) {
        // set up the nominated tap
@@ -1570,15 +1502,13 @@ int main(int argc, char *argv[]) {
                exit(1);
            }
            VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd );
-           MAXFD = tap_fd;
            stdio = 0;
            // pretend a zero packet on the tap, for initializing.
-           received_tap( 0, 0 ); 
+           initialize_tap(); 
        } else {
            // set up for stdin/stdout local traffix
            setbuf( stdout, NULL ); // No buffering on stdout.
            tap_fd = 0; // actually stdin
-           MAXFD = 0;
            stdio = 1;
        }
     } else {
@@ -1591,9 +1521,6 @@ int main(int argc, char *argv[]) {
         VERBOSEOUT( "Using multicast %s:%d at %d\n",
                    inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ),
                    mcast.fd );
-       if ( mcast.fd > MAXFD ) {
-           MAXFD = mcast.fd;
-       }
     }
     // Set up the unicast UPD channel (all interfaces)
     if ( udp6 == 0 ) {
@@ -1629,10 +1556,6 @@ int main(int argc, char *argv[]) {
        }
        VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd );
     }
-    if ( udp_fd > MAXFD ) {
-       MAXFD = udp_fd;
-    }
-    MAXFD++ ;
     // If not using stdio for local traffic, then stdin and stdout are
     // closed here, so as to avoid that any other traffic channel gets
     // 0 or 1 as its file descriptor. Note: stderr (2) is left open.
@@ -1640,13 +1563,27 @@ int main(int argc, char *argv[]) {
        close( 0 );
        close( 1 );
     }
-    VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n",
-                tap_fd, mcast.fd, udp_fd, MAXFD );
-
-    // Start heartbeater thread
-    pthread_t thread; // Temporary thread id -- not used
-    pthread_create( &thread, 0, hearbeater_thread, 0 );
+    VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n",
+                tap_fd, mcast.fd, udp_fd );
 
     // Handle packets
-    return packet_loop();
+    ReaderData udp_reader = { .fd = udp_fd };
+    pthread_create( &thread, 0, doreadUDP, &udp_reader );
+
+    if ( mcast.group.imr_multiaddr.s_addr ) {
+       ReaderData mcast_reader = { .fd = mcast.fd };
+       pthread_create( &thread, 0, doreadUDP, &mcast_reader );
+    }
+
+    if ( tap_fd || stdio ) {
+       ReaderData tap_reader = { .fd = tap_fd };
+       pthread_create( &thread, 0, doreadTap, &tap_reader );
+    }
+
+    // Start heartbeating to uplinks
+    for ( ;; ) {
+       sleep( HEARTBEAT );
+       heartbeat( udp_fd );
+    }
+    return 0;
 }