X-Git-Url: https://git.rrq.au/?a=blobdiff_plain;f=rrqnet.c;h=efd7066b457cb2438d96f1246cc553ad152cc3ed;hb=refs%2Ftags%2F1.5.3;hp=ba5bb584dbf7dfd5d1560791847d57d294e448ed;hpb=c593cf1bd8dc2bfe6c0e6958757f30944ea3ab50;p=rrq%2Frrqnet.git diff --git a/rrqnet.c b/rrqnet.c index ba5bb58..efd7066 100644 --- a/rrqnet.c +++ b/rrqnet.c @@ -24,7 +24,7 @@ #include #include #include -#include +#include #include #include #include @@ -66,13 +66,13 @@ struct Allowed { struct Remote { struct SockAddr uaddr; struct Allowed *spec; // Rule being instantiated - struct timeb rec_when; // Last received packet time, in seconds + struct timeval rec_when; // Last received packet time, in seconds }; // Details of an interface at a remote. struct Interface { unsigned char mac[6]; // MAC address used last (key for by_mac table) - struct timeb rec_when; // Last packet time, in seconds + struct timeval rec_when; // Last packet time, in seconds struct Remote *remote; }; @@ -87,23 +87,26 @@ typedef struct _PacketItem { unsigned char buffer[ BUFSIZE ]; } PacketItem; +typedef struct _ReaderData { + int fd; +} ReaderData; + // heartbeat interval, in seconds #define HEARTBEAT 30 -#define HEARTBEAT_MILLIS ( HEARTBEAT * 1000 ) +#define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 ) -// Macros for timing, for struct timeb variables -#define TIMEB_MILLIS(TM) (((int64_t) (TM)->time * 1000) + (TM)->millitm ) -#define DIFFB_MILLIS(TM1,TM2) ( TIMEB_MILLIS(TM1) - TIMEB_MILLIS(TM2) ) +// Macros for timing, for struct timeval variables +#define TIME_MICROS(TM) (((int64_t) (TM)->tv_sec * 1000000) + (TM)->tv_usec ) +#define DIFF_MICROS(TM1,TM2) ( TIME_MICROS(TM1) - TIME_MICROS(TM2) ) -// RECENT(T,M) is the time logic for requiring a gap time (in +// RECENT_MICROS(T,M) is the time logic for requiring a gap time (in // milliseconds) before shifting a MAC to a new remote. The limit is -// 6000 for broadcast and 20000 for unicast. -#define RECENT(T,M) ((M) < ((T)? 6000 : 20000 )) +// 6s for broadcast and 20s for unicast. +#define RECENT_MICROS(T,M) ((M) < ((T)? 6000000 : 20000000 )) -// VERYOLD_MILLIS is used for discarding downlink remotes whose latest +// VERYOLD_MICROSS is used for discarding downlink remotes whose latest // activity is older than this. -#define VERYOLD_MILLIS 180000 - +#define VERYOLD_MICROS 180000000 ////////// Variables @@ -185,6 +188,10 @@ static struct { // Flag to signal the UDP socket as being ipv6 or not (forced ipv4) static int udp6 = 1; +// Flag to indicate tpg transport patch = avoid UDP payload of 1470 +// bytes by adding 2 tag-along bytes +static int tpg_quirk = 0; + // Flag whether to make some stderr outputs or not. // 1 = normal verbosity, 2 = more output, 3 = source debug level stuff static int verbose; @@ -212,14 +219,6 @@ static pthread_mutex_t printing = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; #define VERBOSE3OUT(fmt, ...) \ if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) ) -// A buffer for reading stdin in fragmented way, to allow outgoing -// packets during the reading of stdin packets, if it's fragmented. -static struct { - unsigned char buffer[ BUFSIZE ]; // Packet data - unsigned int end; // Packet size - unsigned int cur; // Amount read so far -} input; - // The actual name of this program (argv[0]) static unsigned char *progname; @@ -271,7 +270,7 @@ static char *inet_nmtoa(unsigned char *b,int w) { b[8], b[9], b[10], b[11], b[12], b[13], b[14], b[15] ); } else { - VERBOSE3OUT( "HEX data of %d bytes\n", i ); + VERBOSE3OUT( "HEX data of %d bytes\n", w ); for ( ; i < w && i < 19000; i++, p += 3 ) { sprintf( p, "%02x:", b[i] ); } @@ -575,6 +574,7 @@ static int parse_bits(char *bits,int max,struct Allowed *into) { // Formats: [/][:][=keyfile] // Formats: [/][=keyfile] // Formats: \[[/]\][:][=keyfile] +// Formats: hostname:port[=keyfile] static int parse_allowed(char *arg,struct Allowed *into) { static char buffer[10000]; int n = strlen( arg ); @@ -882,11 +882,16 @@ static void write_remote(unsigned char *buf, int n,struct Remote *r) { if ( n < 12 ) { VERBOSE2OUT( "SEND %d bytes to %s\n", n, inet_stoa( &r->uaddr ) ); } else { - VERBOSE2OUT( "SEND %s -> %s to %s\n", + VERBOSE2OUT( "SEND %d bytes %s -> %s to %s\n", n, inet_mtoa( buf+6 ), inet_mtoa( buf ), inet_stoa( &r->uaddr ) ); } memcpy( output, buf, n ); // Use the private buffer for delivery + // Apply the TPG quirk + if ( tpg_quirk && ( n > 1460 ) && ( n < 1478 ) ) { + VERBOSE2OUT( "tpg quirk applied\n" ); + n = 1478; // Add some "random" tag-along bytes + } if ( r->spec == 0 ) { if ( r->uaddr.in.sa_family == 0 ) { // Output to tap/stdio @@ -979,10 +984,10 @@ static struct Interface *input_check( { VERBOSE2OUT( "RECV %ld bytes from %s\n", len, inet_stoa( src ) ); struct Remote *r = 0; - struct timeb now = { 0 }; - if ( ftime( &now ) ) { + struct timeval now = { 0 }; + if ( gettimeofday( &now, 0 ) ) { perror( "RECV time" ); - now.time = time( 0 ); + now.tv_sec = time( 0 ); } Remote_FIND( src, r ); if ( r == 0 ) { @@ -995,9 +1000,13 @@ static struct Interface *input_check( r = add_remote( src, a ); //r->rec_when = now; // Set activity stamp of new remote } - if ( len <= 12 ) { + if ( len < 12 ) { // Ignore short data, but maintain channel r->rec_when = now; // Update activity stamp touched remote + if ( len > 0 ) { + VERBOSEOUT( "Ignoring %ld bytes from %s\n", + len, inet_stoa( src ) ); + } return 0; } // Now decrypt the data as needed @@ -1059,10 +1068,10 @@ static struct Interface *input_check( // The packet source MAC has arrived on other than its // previous channel. It thus gets dropped if tap/stdin is the // primary channel, or the time since the last packet for that - // interface is less than RECENT, with different limits for - // broadcast and unicast. - int64_t dmac = DIFFB_MILLIS( &now, &x->rec_when); - if ( x->remote->spec == 0 || RECENT( *buf & 1, dmac ) ) { + // interface is less than RECENT_MICROS, with different limits + // for broadcast and unicast. + int64_t dmac = DIFF_MICROS( &now, &x->rec_when); + if ( x->remote->spec == 0 || RECENT_MICROS( *buf & 1, dmac ) ) { if ( verbose >= 2 ) { fprintf( stderr, @@ -1122,10 +1131,10 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { } // broadcast. +x+ is source interface // x->rec_when is not updated - struct timeb now = { 0 }; - if ( ftime( &now ) ) { + struct timeval now = { 0 }; + if ( gettimeofday( &now, 0 ) ) { perror( "RECV time" ); - now.time = time( 0 ); + now.tv_sec = time( 0 ); } VERBOSE2OUT( "BC %s -> %s from %s\n", inet_mtoa( buf+6 ), inet_mtoa( buf ), @@ -1145,11 +1154,11 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { continue; } if ( r->spec && ! is_uplink( r->spec ) && - DIFFB_MILLIS( &now, &r->rec_when ) > VERYOLD_MILLIS ) { + DIFF_MICROS( &now, &r->rec_when ) > VERYOLD_MICROS ) { // remove old downlink connection VERBOSEOUT( "Old remote discarded %s (%ld)\n", inet_stoa( &r->uaddr ), - TIMEB_MILLIS( &r->rec_when ) ); + TIME_MICROS( &r->rec_when ) ); // Removing a downlink might have threading implications delete_remote( r ); continue; @@ -1165,6 +1174,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { static struct { Queue full; Queue free; + sem_t reading; } todolist; // The threadcontrol program for handling packets. @@ -1197,6 +1207,10 @@ void todolist_initialize(int nbuf,int nthr) { perror( "FATAL" ); exit( 1 ); } + if ( sem_init( &todolist.reading, 0, 1 ) ) { + perror( "FATAL" ); + exit( 1 ); + } Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) ); for ( ; nthr > 0; nthr-- ) { pthread_t thread; // Temporary thread id @@ -1210,44 +1224,29 @@ void todolist_initialize(int nbuf,int nthr) { // is updated for the new MAC address, However, if there is then a MAC // address clash in the connection table, then the associated remote // is removed, and the packet is dropped. -static void doreadUDP(int fd) { - PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free ); - socklen_t addrlen = - udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 ); - memset( &todo->src, 0, sizeof( todo->src ) ); - todo->fd = fd; - todo->len = recvfrom( - fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen ); - if ( todo->len == -1) { - perror( "Receiving UDP" ); - exit( 1 ); - } +static void *doreadUDP(void *data) { + int fd = ((ReaderData *) data)->fd; + while ( 1 ) { + PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free ); + socklen_t addrlen = + udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 ); + memset( &todo->src, 0, sizeof( todo->src ) ); + todo->fd = fd; + todo->len = recvfrom( + fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen ); + if ( todo->len == -1) { + perror( "Receiving UDP" ); + exit( 1 ); + } #ifdef GPROF - if ( len == 17 && memcmp( buf, "STOPSTOPSTOPSTOP", 16 ) == 0 ) { - exit( 0 ); - } -#endif - Queue_addItem( &todolist.full, (QueueItem*) todo ); -} - -// Handle packet received on the tap/stdio channel -static void received_tap(unsigned char *buf, int len) { - static struct Remote *tap_remote = 0; - if ( tap_remote == 0 ) { - Remote_LOCK; - if ( tap_remote == 0 ) { - tap_remote = add_remote( 0, 0 ); + if ( todo->len == 17 && + memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) { + exit( 0 ); } - Remote_UNLOCK; +#endif + Queue_addItem( &todolist.full, (QueueItem*) todo ); } - PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free ); - memcpy( (void*)todo->buffer, (void*)buf, len ); - memcpy( (void*)&todo->src, - (void*)&tap_remote->uaddr, - sizeof( struct SockAddr ) ); - todo->fd = 0; - todo->len = len; - Queue_addItem( &todolist.full, (QueueItem*) todo ); + return 0; } // Read up to n bytes from the given file descriptor into the buffer @@ -1284,11 +1283,11 @@ static void heartbeat(int fd) { VERBOSE3OUT( "heartbeat fd=%d\n", fd ); struct Remote *r; unsigned int i = 0; - struct timeb now; - if ( ftime( &now ) ) { + struct timeval now; + if ( gettimeofday( &now, 0 ) ) { perror( "HEARTBEAT time" ); - now.time = time( 0 ); - now.millitm = 0; + now.tv_sec = time( 0 ); + now.tv_usec = 0; } Remote_LOCK; for ( ; i < remotes.by_addr.size; i++ ) { @@ -1299,7 +1298,7 @@ static void heartbeat(int fd) { r = (struct Remote *) tmp; VERBOSE3OUT( "heartbeat check %s\n", inet_stoa( &r->uaddr ) ); if ( r->spec && is_uplink( r->spec ) ) { - if ( DIFFB_MILLIS( &now, &r->rec_when ) > HEARTBEAT_MILLIS ) { + if ( DIFF_MICROS( &now, &r->rec_when ) > HEARTBEAT_MICROS ) { VERBOSE3OUT( "heartbeat %s\n", inet_stoa( &r->uaddr ) ); write_remote( data, 0, r ); } @@ -1308,24 +1307,13 @@ static void heartbeat(int fd) { Remote_UNLOCK; } -// The threadcontrol program for issuing heartbeat packets. -// Regular heartbeat -static void *hearbeater_thread(void *data) { - (void) data; - for ( ;; ) { - sleep( HEARTBEAT ); - heartbeat( udp_fd ); - } - return 0; -} - // Tell how to use this program and exit with failure. static void usage(void) { fprintf( stderr, "Packet tunneling over UDP, multiple channels, " ); - fprintf( stderr, "version 0.2.5\n" ); + fprintf( stderr, "version 1.5.3\n" ); fprintf( stderr, "Usage: " ); fprintf( stderr, - "%s [-v] [-4] [-B n] [-T n] [-m mcast] [-t tap] port [remote]+ \n", + "%s [-v] [-tpg] [-4] [-B n] [-T n] [-m mcast] [-t tap] port [remote]+ \n", progname ); exit( 1 ); } @@ -1352,102 +1340,60 @@ static int tun_alloc(char *dev, int flags) { return fd; } -static void doreadTap() { - if ( stdio ) { - if ( input.end == 0 ) { +// Handle packet received on the tap/stdio channel +static void initialize_tap() { + // Ensure there is a Remote for this + static struct Remote *tap_remote = 0; + if ( tap_remote == 0 ) { + Remote_LOCK; + if ( tap_remote == 0 ) { + tap_remote = add_remote( 0, 0 ); + } + Remote_UNLOCK; + } +} + +// Thread to handle tap/stdio input +static void *doreadTap(void *data) { + int fd = ((ReaderData*) data)->fd; + unsigned int end = 0; // Packet size + unsigned int cur = 0; // Amount read so far + size_t e; + PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free ); + while ( 1 ) { + if ( stdio ) { uint16_t plength; int n = read_into( 0, (unsigned char *) &plength, - sizeof( plength ), 0 ); + sizeof( plength ), 0 ); if ( n == 0 ) { // Tap/stdio closed => exit silently exit( 0 ); } - input.end = ntohs( plength ); - input.cur = 0; - } - size_t e = input.end - input.cur; - unsigned char *p = input.buffer + input.cur; - if ( input.end > BUFSIZE ) { - // Oversize packets should be read and discarded - if ( e > BUFSIZE ) { - e = BUFSIZE; - } - p = input.buffer; - } - input.cur += read_into( 0, p, e, 1 ); - } else { - input.end = doread( tap_fd, input.buffer, BUFSIZE ); - input.cur = input.end; - } - if ( input.end == input.cur ) { - VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end ); - if ( input.end <= BUFSIZE ) { - received_tap( input.buffer, input.end ); - } - input.end = 0; // Ready for next packet - } - // End handling tap -} - -// MAXFD is Finalized before multi-threading, and then remains constant. -static int MAXFD; - -// This is the main packet handling loop -static int packet_loop() { - // A packet buffer for receiving UDP - unsigned char buffer[ BUFSIZE ]; - int n; - //time_t next_heartbeat = time(0); - int cycle = 0; // which channel to check first - while( 1 ) { - fd_set rd_set; - FD_ZERO( &rd_set ); - if ( mcast.fd ) { - FD_SET( mcast.fd, &rd_set ); - } - FD_SET( udp_fd, &rd_set ); - if ( udp6 ) { - FD_SET( udp_fd, &rd_set ); - } - if ( tap ) { // tap/stdio - FD_SET( tap_fd, &rd_set ); - } - n = select( MAXFD, &rd_set, NULL, NULL, NULL ); - VERBOSE3OUT( "select got %d\n", n ); - if ( n < 0 ) { - if ( errno == EINTR ) { - continue; + end = ntohs( plength ); + cur = 0; + while ( ( e = ( end - cur ) ) != 0 ) { + unsigned char *p = todo->buffer + cur; + if ( end > BUFSIZE ) { + // Oversize packets should be read and discarded + if ( e > BUFSIZE ) { + e = BUFSIZE; + } + p = todo->buffer; + } + cur += read_into( 0, p, e, 1 ); } - perror("select"); - exit(1); - } - if ( n == 0 ) { - continue; + } else { + end = doread( fd, todo->buffer, BUFSIZE ); + cur = end; } - // Process input with alternating priority across channels - for ( ;; cycle++ ) { - if ( cycle >= 3 ) { - cycle = 0; - } - if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) { - // Check and process UDP socket - doreadUDP( udp_fd ) ; - cycle = 1; - break; - } - if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) { - // Check and process multicast socket - doreadUDP( mcast.fd ) ; - cycle = 2; - break; - } - if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) { - // Check and process tap/stdio socket - doreadTap( tap_fd, buffer ); - cycle = 0; - break; - } + VERBOSE3OUT( "TAP/stdio input %d bytes\n", end ); + if ( end <= BUFSIZE ) { + todo->fd = 0; + todo->len = end; + Queue_addItem( &todolist.full, (QueueItem*) todo ); + todo = (PacketItem*) Queue_getItem( &todolist.free ); } + // End handling tap } return 0; } @@ -1460,6 +1406,7 @@ static int packet_loop() { // remote = [ipv6(/maskwidth)](:port)(=key) // ip = ipv4 | [ipv6] int main(int argc, char *argv[]) { + pthread_t thread; // Temporary thread id int port, i; progname = (unsigned char *) argv[0]; ///// Parse command line arguments @@ -1480,6 +1427,11 @@ int main(int argc, char *argv[]) { i++; ENSUREARGS( 1 ); } + if ( strncmp( "-tpg", argv[i], 4 ) == 0 ) { + tpg_quirk = 1; + i++; + ENSUREARGS( 1 ); + } // then: optional -4 if ( strncmp( "-4", argv[i], 2 ) == 0 ) { udp6 = 0; @@ -1522,7 +1474,7 @@ int main(int argc, char *argv[]) { } // then: required port if ( sscanf( argv[i++], "%d", &port ) != 1 ) { - fprintf( stderr, "Bad local port" ); + fprintf( stderr, "Bad local port: %s\n", argv[i-1] ); usage(); } // then: any number of allowed remotes @@ -1555,7 +1507,6 @@ int main(int argc, char *argv[]) { } todolist_initialize( buffers_count, threads_count ); - MAXFD = 0; // Set up the tap/stdio channel if ( tap ) { // set up the nominated tap @@ -1566,15 +1517,13 @@ int main(int argc, char *argv[]) { exit(1); } VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd ); - MAXFD = tap_fd; stdio = 0; // pretend a zero packet on the tap, for initializing. - received_tap( 0, 0 ); + initialize_tap(); } else { // set up for stdin/stdout local traffix setbuf( stdout, NULL ); // No buffering on stdout. tap_fd = 0; // actually stdin - MAXFD = 0; stdio = 1; } } else { @@ -1587,9 +1536,6 @@ int main(int argc, char *argv[]) { VERBOSEOUT( "Using multicast %s:%d at %d\n", inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ), mcast.fd ); - if ( mcast.fd > MAXFD ) { - MAXFD = mcast.fd; - } } // Set up the unicast UPD channel (all interfaces) if ( udp6 == 0 ) { @@ -1625,10 +1571,6 @@ int main(int argc, char *argv[]) { } VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd ); } - if ( udp_fd > MAXFD ) { - MAXFD = udp_fd; - } - MAXFD++ ; // If not using stdio for local traffic, then stdin and stdout are // closed here, so as to avoid that any other traffic channel gets // 0 or 1 as its file descriptor. Note: stderr (2) is left open. @@ -1636,13 +1578,27 @@ int main(int argc, char *argv[]) { close( 0 ); close( 1 ); } - VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n", - tap_fd, mcast.fd, udp_fd, MAXFD ); - - // Start heartbeater thread - pthread_t thread; // Temporary thread id -- not used - pthread_create( &thread, 0, hearbeater_thread, 0 ); + VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n", + tap_fd, mcast.fd, udp_fd ); // Handle packets - return packet_loop(); + ReaderData udp_reader = { .fd = udp_fd }; + pthread_create( &thread, 0, doreadUDP, &udp_reader ); + + if ( mcast.group.imr_multiaddr.s_addr ) { + ReaderData mcast_reader = { .fd = mcast.fd }; + pthread_create( &thread, 0, doreadUDP, &mcast_reader ); + } + + if ( tap_fd || stdio ) { + ReaderData tap_reader = { .fd = tap_fd }; + pthread_create( &thread, 0, doreadTap, &tap_reader ); + } + + // Start heartbeating to uplinks + for ( ;; ) { + sleep( HEARTBEAT ); + heartbeat( udp_fd ); + } + return 0; }