From: Ralph Ronnquist Date: Sun, 17 Oct 2021 05:39:05 +0000 (+1100) Subject: use per-socket reader threads X-Git-Tag: 1.4~1 X-Git-Url: https://git.rrq.au/?a=commitdiff_plain;h=9b24d4e275dbfa538b80ee6bcfe786f63c1c0904;p=rrq%2Frrqnet.git use per-socket reader threads --- diff --git a/rrqnet.c b/rrqnet.c index 90b246a..db93fd7 100644 --- a/rrqnet.c +++ b/rrqnet.c @@ -87,6 +87,10 @@ typedef struct _PacketItem { unsigned char buffer[ BUFSIZE ]; } PacketItem; +typedef struct _ReaderData { + int fd; +} ReaderData; + // heartbeat interval, in seconds #define HEARTBEAT 30 #define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 ) @@ -211,14 +215,6 @@ static pthread_mutex_t printing = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; #define VERBOSE3OUT(fmt, ...) \ if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) ) -// A buffer for reading stdin in fragmented way, to allow outgoing -// packets during the reading of stdin packets, if it's fragmented. -static struct { - unsigned char buffer[ BUFSIZE ]; // Packet data - unsigned int end; // Packet size - unsigned int cur; // Amount read so far -} input; - // The actual name of this program (argv[0]) static unsigned char *progname; @@ -1168,6 +1164,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { static struct { Queue full; Queue free; + sem_t reading; } todolist; // The threadcontrol program for handling packets. @@ -1200,6 +1197,10 @@ void todolist_initialize(int nbuf,int nthr) { perror( "FATAL" ); exit( 1 ); } + if ( sem_init( &todolist.reading, 0, 1 ) ) { + perror( "FATAL" ); + exit( 1 ); + } Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) ); for ( ; nthr > 0; nthr-- ) { pthread_t thread; // Temporary thread id @@ -1213,45 +1214,29 @@ void todolist_initialize(int nbuf,int nthr) { // is updated for the new MAC address, However, if there is then a MAC // address clash in the connection table, then the associated remote // is removed, and the packet is dropped. -static void doreadUDP(int fd) { - PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free ); - socklen_t addrlen = - udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 ); - memset( &todo->src, 0, sizeof( todo->src ) ); - todo->fd = fd; - todo->len = recvfrom( - fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen ); - if ( todo->len == -1) { - perror( "Receiving UDP" ); - exit( 1 ); - } +static void *doreadUDP(void *data) { + int fd = ((ReaderData *) data)->fd; + while ( 1 ) { + PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free ); + socklen_t addrlen = + udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 ); + memset( &todo->src, 0, sizeof( todo->src ) ); + todo->fd = fd; + todo->len = recvfrom( + fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen ); + if ( todo->len == -1) { + perror( "Receiving UDP" ); + exit( 1 ); + } #ifdef GPROF - if ( todo->len == 17 && - memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) { - exit( 0 ); - } -#endif - Queue_addItem( &todolist.full, (QueueItem*) todo ); -} - -// Handle packet received on the tap/stdio channel -static void received_tap(unsigned char *buf, int len) { - static struct Remote *tap_remote = 0; - if ( tap_remote == 0 ) { - Remote_LOCK; - if ( tap_remote == 0 ) { - tap_remote = add_remote( 0, 0 ); + if ( todo->len == 17 && + memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) { + exit( 0 ); } - Remote_UNLOCK; +#endif + Queue_addItem( &todolist.full, (QueueItem*) todo ); } - PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free ); - memcpy( (void*)todo->buffer, (void*)buf, len ); - memcpy( (void*)&todo->src, - (void*)&tap_remote->uaddr, - sizeof( struct SockAddr ) ); - todo->fd = 0; - todo->len = len; - Queue_addItem( &todolist.full, (QueueItem*) todo ); + return 0; } // Read up to n bytes from the given file descriptor into the buffer @@ -1312,17 +1297,6 @@ static void heartbeat(int fd) { Remote_UNLOCK; } -// The threadcontrol program for issuing heartbeat packets. -// Regular heartbeat -static void *hearbeater_thread(void *data) { - (void) data; - for ( ;; ) { - sleep( HEARTBEAT ); - heartbeat( udp_fd ); - } - return 0; -} - // Tell how to use this program and exit with failure. static void usage(void) { fprintf( stderr, "Packet tunneling over UDP, multiple channels, " ); @@ -1356,102 +1330,60 @@ static int tun_alloc(char *dev, int flags) { return fd; } -static void doreadTap() { - if ( stdio ) { - if ( input.end == 0 ) { +// Handle packet received on the tap/stdio channel +static void initialize_tap() { + // Ensure there is a Remote for this + static struct Remote *tap_remote = 0; + if ( tap_remote == 0 ) { + Remote_LOCK; + if ( tap_remote == 0 ) { + tap_remote = add_remote( 0, 0 ); + } + Remote_UNLOCK; + } +} + +// Thread to handle tap/stdio input +static void *doreadTap(void *data) { + int fd = ((ReaderData*) data)->fd; + unsigned int end = 0; // Packet size + unsigned int cur = 0; // Amount read so far + size_t e; + PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free ); + while ( 1 ) { + if ( stdio ) { uint16_t plength; int n = read_into( 0, (unsigned char *) &plength, - sizeof( plength ), 0 ); + sizeof( plength ), 0 ); if ( n == 0 ) { // Tap/stdio closed => exit silently exit( 0 ); } - input.end = ntohs( plength ); - input.cur = 0; - } - size_t e = input.end - input.cur; - unsigned char *p = input.buffer + input.cur; - if ( input.end > BUFSIZE ) { - // Oversize packets should be read and discarded - if ( e > BUFSIZE ) { - e = BUFSIZE; - } - p = input.buffer; - } - input.cur += read_into( 0, p, e, 1 ); - } else { - input.end = doread( tap_fd, input.buffer, BUFSIZE ); - input.cur = input.end; - } - if ( input.end == input.cur ) { - VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end ); - if ( input.end <= BUFSIZE ) { - received_tap( input.buffer, input.end ); - } - input.end = 0; // Ready for next packet - } - // End handling tap -} - -// MAXFD is Finalized before multi-threading, and then remains constant. -static int MAXFD; - -// This is the main packet handling loop -static int packet_loop() { - // A packet buffer for receiving UDP - unsigned char buffer[ BUFSIZE ]; - int n; - //time_t next_heartbeat = time(0); - int cycle = 0; // which channel to check first - while( 1 ) { - fd_set rd_set; - FD_ZERO( &rd_set ); - if ( mcast.fd ) { - FD_SET( mcast.fd, &rd_set ); - } - FD_SET( udp_fd, &rd_set ); - if ( udp6 ) { - FD_SET( udp_fd, &rd_set ); - } - if ( tap ) { // tap/stdio - FD_SET( tap_fd, &rd_set ); - } - n = select( MAXFD, &rd_set, NULL, NULL, NULL ); - VERBOSE3OUT( "select got %d\n", n ); - if ( n < 0 ) { - if ( errno == EINTR ) { - continue; + end = ntohs( plength ); + cur = 0; + while ( ( e = ( end - cur ) ) != 0 ) { + unsigned char *p = todo->buffer + cur; + if ( end > BUFSIZE ) { + // Oversize packets should be read and discarded + if ( e > BUFSIZE ) { + e = BUFSIZE; + } + p = todo->buffer; + } + cur += read_into( 0, p, e, 1 ); } - perror("select"); - exit(1); + } else { + end = doread( fd, todo->buffer, BUFSIZE ); + cur = end; } - if ( n == 0 ) { - continue; - } - // Process input with alternating priority across channels - for ( ;; cycle++ ) { - if ( cycle >= 3 ) { - cycle = 0; - } - if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) { - // Check and process UDP socket - doreadUDP( udp_fd ) ; - cycle = 1; - break; - } - if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) { - // Check and process multicast socket - doreadUDP( mcast.fd ) ; - cycle = 2; - break; - } - if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) { - // Check and process tap/stdio socket - doreadTap( tap_fd, buffer ); - cycle = 0; - break; - } + VERBOSE3OUT( "TAP/stdio input %d bytes\n", end ); + if ( end <= BUFSIZE ) { + todo->fd = 0; + todo->len = end; + Queue_addItem( &todolist.full, (QueueItem*) todo ); + todo = (PacketItem*) Queue_getItem( &todolist.free ); } + // End handling tap } return 0; } @@ -1464,6 +1396,7 @@ static int packet_loop() { // remote = [ipv6(/maskwidth)](:port)(=key) // ip = ipv4 | [ipv6] int main(int argc, char *argv[]) { + pthread_t thread; // Temporary thread id int port, i; progname = (unsigned char *) argv[0]; ///// Parse command line arguments @@ -1559,7 +1492,6 @@ int main(int argc, char *argv[]) { } todolist_initialize( buffers_count, threads_count ); - MAXFD = 0; // Set up the tap/stdio channel if ( tap ) { // set up the nominated tap @@ -1570,15 +1502,13 @@ int main(int argc, char *argv[]) { exit(1); } VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd ); - MAXFD = tap_fd; stdio = 0; // pretend a zero packet on the tap, for initializing. - received_tap( 0, 0 ); + initialize_tap(); } else { // set up for stdin/stdout local traffix setbuf( stdout, NULL ); // No buffering on stdout. tap_fd = 0; // actually stdin - MAXFD = 0; stdio = 1; } } else { @@ -1591,9 +1521,6 @@ int main(int argc, char *argv[]) { VERBOSEOUT( "Using multicast %s:%d at %d\n", inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ), mcast.fd ); - if ( mcast.fd > MAXFD ) { - MAXFD = mcast.fd; - } } // Set up the unicast UPD channel (all interfaces) if ( udp6 == 0 ) { @@ -1629,10 +1556,6 @@ int main(int argc, char *argv[]) { } VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd ); } - if ( udp_fd > MAXFD ) { - MAXFD = udp_fd; - } - MAXFD++ ; // If not using stdio for local traffic, then stdin and stdout are // closed here, so as to avoid that any other traffic channel gets // 0 or 1 as its file descriptor. Note: stderr (2) is left open. @@ -1640,13 +1563,27 @@ int main(int argc, char *argv[]) { close( 0 ); close( 1 ); } - VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n", - tap_fd, mcast.fd, udp_fd, MAXFD ); - - // Start heartbeater thread - pthread_t thread; // Temporary thread id -- not used - pthread_create( &thread, 0, hearbeater_thread, 0 ); + VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n", + tap_fd, mcast.fd, udp_fd ); // Handle packets - return packet_loop(); + ReaderData udp_reader = { .fd = udp_fd }; + pthread_create( &thread, 0, doreadUDP, &udp_reader ); + + if ( mcast.group.imr_multiaddr.s_addr ) { + ReaderData mcast_reader = { .fd = mcast.fd }; + pthread_create( &thread, 0, doreadUDP, &mcast_reader ); + } + + if ( tap_fd || stdio ) { + ReaderData tap_reader = { .fd = tap_fd }; + pthread_create( &thread, 0, doreadTap, &tap_reader ); + } + + // Start heartbeating to uplinks + for ( ;; ) { + sleep( HEARTBEAT ); + heartbeat( udp_fd ); + } + return 0; }