X-Git-Url: https://git.rrq.au/?a=blobdiff_plain;f=rrqnet.c;h=69c363c0f9c30bf144a069520cf85270ea919a72;hb=refs%2Ftags%2F1.6.2;hp=90b246a3aba15055ae6cbcf83af0faf16affc179;hpb=163de6cf40516674279551895071c0ab7ed0c09e;p=rrq%2Frrqnet.git diff --git a/rrqnet.c b/rrqnet.c index 90b246a..69c363c 100644 --- a/rrqnet.c +++ b/rrqnet.c @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -26,6 +27,7 @@ #include #include #include +#include #include #include @@ -64,7 +66,9 @@ struct Allowed { // Details of actualized connections. struct Remote { - struct SockAddr uaddr; + struct SockAddr uaddr; // The remote IP address + struct SockAddr laddr; // The local IP for this remote + int ifindex; // The local interface index struct Allowed *spec; // Rule being instantiated struct timeval rec_when; // Last received packet time, in seconds }; @@ -82,23 +86,30 @@ struct Interface { typedef struct _PacketItem { QueueItem base; int fd; - struct SockAddr src; + struct SockAddr src; // the remote IP for this packet + union { + struct in_pktinfo in4; + struct in6_pktinfo in6; + } dstinfo; // The PKTINFO for this packet ssize_t len; unsigned char buffer[ BUFSIZE ]; } PacketItem; +typedef struct _ReaderData { + int fd; +} ReaderData; + // heartbeat interval, in seconds -#define HEARTBEAT 30 -#define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 ) +#define HEARTBEAT_MICROS ( heart_rate * 1000000 ) // Macros for timing, for struct timeval variables #define TIME_MICROS(TM) (((int64_t) (TM)->tv_sec * 1000000) + (TM)->tv_usec ) #define DIFF_MICROS(TM1,TM2) ( TIME_MICROS(TM1) - TIME_MICROS(TM2) ) -// RECENT(T,M) is the time logic for requiring a gap time (in +// RECENT_MICROS(T,M) is the time logic for requiring a gap time (in // milliseconds) before shifting a MAC to a new remote. The limit is -// 6000 for broadcast and 20000 for unicast. -#define RECENT(T,M) ((M) < ((T)? 6000 : 20000 )) +// 6s for broadcast and 20s for unicast. +#define RECENT_MICROS(T,M) ((M) < ((T)? 6000000 : 20000000 )) // VERYOLD_MICROSS is used for discarding downlink remotes whose latest // activity is older than this. @@ -170,8 +181,10 @@ static int stdio = 0; // Default is neither stdio nor tap static char *tap = 0; // Name of tap, if any, or "-" for stdio static int tap_fd = 0; // Also used for stdin in stdio mode static int udp_fd; +static int udp_port; static int threads_count = 0; static int buffers_count = 0; +static int heart_rate = 30; // Setup for multicast channel static struct { @@ -184,6 +197,16 @@ static struct { // Flag to signal the UDP socket as being ipv6 or not (forced ipv4) static int udp6 = 1; +// The given UDP source address, if any +static struct { + int family; + unsigned char address[16]; +} udp_source; + +// Flag to indicate tpg transport patch = avoid UDP payload of 1470 +// bytes by adding 2 tag-along bytes +static int tpg_quirk = 0; + // Flag whether to make some stderr outputs or not. // 1 = normal verbosity, 2 = more output, 3 = source debug level stuff static int verbose; @@ -211,14 +234,6 @@ static pthread_mutex_t printing = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP; #define VERBOSE3OUT(fmt, ...) \ if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) ) -// A buffer for reading stdin in fragmented way, to allow outgoing -// packets during the reading of stdin packets, if it's fragmented. -static struct { - unsigned char buffer[ BUFSIZE ]; // Packet data - unsigned int end; // Packet size - unsigned int cur; // Amount read so far -} input; - // The actual name of this program (argv[0]) static unsigned char *progname; @@ -574,6 +589,7 @@ static int parse_bits(char *bits,int max,struct Allowed *into) { // Formats: [/][:][=keyfile] // Formats: [/][=keyfile] // Formats: \[[/]\][:][=keyfile] +// Formats: hostname:port[=keyfile] static int parse_allowed(char *arg,struct Allowed *into) { static char buffer[10000]; int n = strlen( arg ); @@ -676,6 +692,14 @@ static int parse_threads_count(char *arg) { return 0; } +static int parse_heartbeat_rate(char *arg) { + if ( ( sscanf( arg, "%u", &heart_rate ) != 1 ) || heart_rate < 0 ) { + return 1; + } + VERBOSEOUT( "** Heartbeat rate = %d\n", heart_rate ); + return 0; +} + static int parse_buffers_count(char *arg) { if ( ( sscanf( arg, "%u", &buffers_count ) != 1 ) || buffers_count < 1 ) { return 1; @@ -727,6 +751,38 @@ static int parse_mcast(char *arg) { return 0; } +//** IP address parsing utility for UDP source address +// Return 0 if ok and 1 otherwise +// Formats: or +// The ipv4 address should be a multicast address in ranges +// 224.0.0.0/22, 232.0.0.0/7, 234.0.0.0/8 or 239.0.0.0/8 +// though it's not checked here. +static int parse_udp_source(char *arg) { + if ( inet_pton( AF_INET6, arg, udp_source.address ) ) { + // An ipv6 address is given. + if ( udp6 ) { + udp_source.family = AF_INET6; + return 0; + } + return 1; + } + if ( ! inet_pton( AF_INET, arg, udp_source.address ) ) { + return 1; + } + + // An ipv4 address is given. + if ( udp6 ) { + // Translate into ipv6-encoded ipv4 + memmove( udp_source.address + 12, udp_source.address, 4 ); + memset( udp_source.address, 0, 10 ); + memset( udp_source.address + 10, -1, 2 ); + udp_source.family = AF_INET6; + } else { + udp_source.family = AF_INET; + } + return 0; +} + // Utility that sets upt the multicast socket, which is used for // receiving multicast packets. static void setup_mcast() { @@ -874,18 +930,107 @@ static int write_tap(unsigned char *buf, int n) { return dowrite( tap_fd, buf, n ); } + +// All sorts of fiddling is needed to set the source address for UDP +// And 2 different ways for pure ipv4 versus ipv6 sockets +static void sendpacket4(unsigned char *buf, int n,struct Remote *r) { + // The UDP socket is pure ipv4 + struct iovec data[1] = {{ .iov_base = buf, .iov_len = n }}; + struct { + struct cmsghdr hdr; + struct in_pktinfo data; + } control = { + .hdr.cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)), + .hdr.cmsg_level = IPPROTO_IP, + .hdr.cmsg_type = IP_PKTINFO, + .data.ipi_ifindex = r->ifindex, + .data.ipi_spec_dst = r->laddr.in4.sin_addr + }; + struct msghdr msg = { + .msg_name = &r->uaddr.in4, + .msg_namelen = sizeof( struct sockaddr_in ), + .msg_iov = data, + .msg_iovlen = 1, + .msg_control = &control, + .msg_controllen = CMSG_SPACE( sizeof( struct in_pktinfo ) ), + .msg_flags = 0 // unused + }; + VERBOSE2OUT( "sendmsg ipv4 %zu from %s to %s\n", + msg.msg_controllen, + inet_stoa( &r->laddr ), + inet_stoa( &r->uaddr ) ); + if ( sendmsg( udp_fd, &msg, 0 ) < n ) { + perror( "Writing socket" ); + } +} + +static void sendpacket6(unsigned char *buf, int n,struct Remote *r) { + // The UDP socket is ipv6, possibly with mapped ipv4 address(es) + struct iovec data[1] = {{ .iov_base = buf, .iov_len = n }}; + struct { + struct cmsghdr hdr; + union { + struct in_pktinfo in4; + struct in6_pktinfo in6; + } data; + } control; + struct msghdr msg = { + .msg_name = &r->uaddr.in6, + .msg_namelen = sizeof( struct sockaddr_in6 ), + .msg_iov = data, + .msg_iovlen = 1, + .msg_control = 0, + .msg_controllen = 0, + .msg_flags = 0 // unused + }; + if ( r->ifindex ) { + switch ( r->laddr.in.sa_family ) { + case AF_INET6: + control.hdr.cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo)); + control.hdr.cmsg_level = IPPROTO_IPV6; + control.hdr.cmsg_type = IPV6_PKTINFO; + control.data.in6.ipi6_ifindex = r->ifindex; + memcpy( &control.data.in6.ipi6_addr, &r->laddr.in6.sin6_addr, 16 ); + msg.msg_control = &control; + msg.msg_controllen = CMSG_SPACE( sizeof( struct in6_pktinfo ) ); + break; + case AF_INET: + control.hdr.cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)); + control.hdr.cmsg_level = IPPROTO_IP; + control.hdr.cmsg_type = IP_PKTINFO; + control.data.in4.ipi_ifindex = r->ifindex; + control.data.in4.ipi_spec_dst = r->laddr.in4.sin_addr; + msg.msg_control = &control; + msg.msg_controllen = CMSG_SPACE( sizeof( struct in_pktinfo ) ); + break; + } + } + VERBOSE2OUT( "sendmsg ipv6 %d from %s to %s\n", + r->ifindex, + inet_stoa( &r->laddr ), + inet_stoa( &r->uaddr ) ); + if ( sendmsg( udp_fd, &msg, 0 ) < n ) { + perror( "Writing socket" ); + } +} + // Write a packet via the given Interface with encryption as specified. static void write_remote(unsigned char *buf, int n,struct Remote *r) { // A packet buffer unsigned char output[ BUFSIZE ]; if ( n < 12 ) { - VERBOSE2OUT( "SEND %d bytes to %s\n", n, inet_stoa( &r->uaddr ) ); + VERBOSE2OUT( "SENDing %d bytes to %s\n", n, inet_stoa( &r->uaddr ) ); } else { - VERBOSE2OUT( "SEND %s -> %s to %s\n", + VERBOSE2OUT( "SENDing %d bytes %s -> %s to %s\n", n, inet_mtoa( buf+6 ), inet_mtoa( buf ), inet_stoa( &r->uaddr ) ); } memcpy( output, buf, n ); // Use the private buffer for delivery + // Apply the TPG quirk + if ( tpg_quirk && ( n > 1460 ) && ( n < 1478 ) ) { + VERBOSE2OUT( "tpg quirk applied\n" ); + n = 1478; // Add some "random" tag-along bytes + } if ( r->spec == 0 ) { if ( r->uaddr.in.sa_family == 0 ) { // Output to tap/stdio @@ -903,33 +1048,10 @@ static void write_remote(unsigned char *buf, int n,struct Remote *r) { } else if ( r->spec->psk.keyfile ) { encrypt( output, n, &r->spec->psk ); } - struct sockaddr *sock = &r->uaddr.in; - size_t size; - if ( sock->sa_family == AF_INET6 ) { - // Note that the size of +struct sockaddr_in6+ is actually - // larger than the size of +struct sockaddr+ (due to the - // addition of the +sin6_flowinfo+ field). It results in the - // following cuteness for passing arguments to +sendto+. - size = sizeof( struct sockaddr_in6 ); - VERBOSE2OUT( "IPv6 UDP %d %s\n", - udp_fd, inet_stoa( (struct SockAddr*) sock ) ); + if ( udp6 ) { + sendpacket6( output, n, r ); } else { - size = sizeof( struct sockaddr_in ); - VERBOSE2OUT( "IPv4 UDP %d %s\n", - udp_fd, inet_stoa( (struct SockAddr*) sock ) ); - } - VERBOSE2OUT( "SEND %d bytes to %s [%s -> %s]\n", - n, inet_stoa( (struct SockAddr*) sock ), - ( n < 12 )? "" : inet_mtoa( buf+6 ), - ( n < 12 )? "" : inet_mtoa( buf ) - ); - // IS sendto thread safe?? - if ( sendto( udp_fd, output, n, 0, sock, size ) < n ) { - perror( "Writing socket" ); - // Invalidate remote temporarily instead? But if it's an - // "uplink" it should be retried eventually... - // For now: just ignore the error. - // exit( 1 ); + sendpacket4( output, n, r ); } } @@ -973,10 +1095,12 @@ static void unmap_if_mapped(struct SockAddr *s) { } // Route the packet from the given src -static struct Interface *input_check( - unsigned char *buf,ssize_t len,struct SockAddr *src ) -{ - VERBOSE2OUT( "RECV %ld bytes from %s\n", len, inet_stoa( src ) ); +static struct Interface *input_check(PacketItem *pi) { + unsigned char *buf = pi->buffer; + ssize_t len = pi->len; + struct SockAddr *src = &pi->src; + + VERBOSE2OUT( "RECV %zd bytes from %s\n", len, inet_stoa( src ) ); struct Remote *r = 0; struct timeval now = { 0 }; if ( gettimeofday( &now, 0 ) ) { @@ -993,12 +1117,30 @@ static struct Interface *input_check( VERBOSEOUT( "New remote %s by %s\n", inet_stoa( src ), a->source ); r = add_remote( src, a ); //r->rec_when = now; // Set activity stamp of new remote + // Set the local addressing for the remote, unless set already + // Note: potential for multi-thread competition here + if ( udp6 ) { + r->ifindex = pi->dstinfo.in6.ipi6_ifindex; + r->laddr.in6.sin6_family = AF_INET6; + r->laddr.in6.sin6_port = htons( udp_port ); + memcpy( &r->laddr.in6.sin6_addr, + &pi->dstinfo.in6.ipi6_addr, + 16 ); + unmap_if_mapped( &r->laddr ); + } else { + r->ifindex = pi->dstinfo.in4.ipi_ifindex; + r->laddr.in4.sin_family = AF_INET; + r->laddr.in4.sin_port = htons( udp_port ); + memcpy( &r->laddr.in4.sin_addr, + &pi->dstinfo.in4.ipi_spec_dst, + 4 ); + } } if ( len < 12 ) { // Ignore short data, but maintain channel r->rec_when = now; // Update activity stamp touched remote if ( len > 0 ) { - VERBOSEOUT( "Ignoring %ld bytes from %s\n", + VERBOSEOUT( "Ignoring %zd bytes from %s\n", len, inet_stoa( src ) ); } return 0; @@ -1062,14 +1204,14 @@ static struct Interface *input_check( // The packet source MAC has arrived on other than its // previous channel. It thus gets dropped if tap/stdin is the // primary channel, or the time since the last packet for that - // interface is less than RECENT, with different limits for - // broadcast and unicast. - int64_t dmac = DIFF_MICROS( &now, &x->rec_when); - if ( x->remote->spec == 0 || RECENT( *buf & 1, dmac ) ) { + // interface is less than RECENT_MICROS, with different limits + // for broadcast and unicast. + uint64_t dmac = DIFF_MICROS( &now, &x->rec_when); + if ( x->remote->spec == 0 || RECENT_MICROS( *buf & 1, dmac ) ) { if ( verbose >= 2 ) { fprintf( stderr, - "Dropped. MAC %s (%ld) from %s, should be %s\n", + "Dropped. MAC %s (%"PRIu64") from %s, should be %s\n", inet_mtoa( buf+6 ), dmac, inet_stoa( src ), inet_stoa( &x->remote->uaddr ) ); } @@ -1094,33 +1236,36 @@ static struct Interface *input_check( } // Check packet and deliver out -static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { - struct Interface *x = input_check( buf, len, src ); +static void route_packet(PacketItem *pi) { + //unsigned char *buf = pi->buffer; + //ssize_t len = pi->len; + struct Interface *x = input_check( pi ); if ( x == 0 ) { + VERBOSE2OUT( "not a nice packet\n" ); return; // not a nice packet } - if ( ( *buf & 1 ) == 0 ) { + if ( ( *pi->buffer & 1 ) == 0 ) { // unicast struct Interface *y = 0; // reuse for destination interface - Interface_FIND( buf, y ); + Interface_FIND( pi->buffer, y ); if ( y == 0 ) { VERBOSE2OUT( "RECV %s -> %s from %s without channel and dropped\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), + inet_mtoa( pi->buffer + 6 ), inet_mtoa( pi->buffer ), inet_stoa( &x->remote->uaddr ) ); return; } if ( x->remote == y->remote ) { VERBOSEOUT( "RECV loop for %s -> %s from %s to %s\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), + inet_mtoa( pi->buffer+6 ), inet_mtoa( pi->buffer ), inet_stoa( &x->remote->uaddr ), inet_stoa( &y->remote->uaddr ) ); Interface_DEL( y ); // Need to see this interface again return; } - VERBOSE2OUT( "RECV route %s -> %s to %s\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), - inet_stoa( &y->remote->uaddr ) ); - write_remote( buf, len, y->remote ); + VERBOSE2OUT( "RECV route %s -> %s\n", + inet_mtoa( pi->buffer+6 ), + inet_mtoa( pi->buffer ) ); + write_remote( pi->buffer, pi->len, y->remote ); return; } // broadcast. +x+ is source interface @@ -1130,9 +1275,10 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { perror( "RECV time" ); now.tv_sec = time( 0 ); } - VERBOSE2OUT( "BC %s -> %s from %s\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), - inet_stoa( &x->remote->uaddr ) ); + VERBOSE2OUT( "BC %s -> %s from %s to %s\n", + inet_mtoa( pi->buffer+6 ), inet_mtoa( pi->buffer ), + inet_stoa( &x->remote->uaddr ), + inet_stoa( &x->remote->laddr ) ); struct Remote *r; unsigned int i = 0; Remote_LOCK; @@ -1150,7 +1296,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { if ( r->spec && ! is_uplink( r->spec ) && DIFF_MICROS( &now, &r->rec_when ) > VERYOLD_MICROS ) { // remove old downlink connection - VERBOSEOUT( "Old remote discarded %s (%ld)\n", + VERBOSEOUT( "Old remote discarded %s (%"PRId64")\n", inet_stoa( &r->uaddr ), TIME_MICROS( &r->rec_when ) ); // Removing a downlink might have threading implications @@ -1159,7 +1305,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { } // Send packet to the remote // Only no-clash or to the tap/stdin - write_remote( buf, len, r ); + write_remote( pi->buffer, pi->len, r ); } Remote_UNLOCK; } @@ -1168,6 +1314,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) { static struct { Queue full; Queue free; + sem_t reading; } todolist; // The threadcontrol program for handling packets. @@ -1176,14 +1323,17 @@ static void *packet_handler(void *data) { for ( ;; ) { PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.full ); if ( todo->fd == mcast.fd ) { - // Patch multicast address as source for multicast packet - route_packet( todo->buffer, todo->len, &mcast.sock ); + // Patch in the multicast address as source for multicast packet + memcpy( &todo->src, &mcast.sock, sizeof( todo->src ) ); + route_packet( todo ); } else { if ( udp6 ) { unmap_if_mapped( &todo->src ); } - route_packet( todo->buffer, todo->len, &todo->src ); + route_packet( todo ); } + memset( &todo->src, 0, sizeof( struct SockAddr ) ); + memset( &todo->dstinfo, 0, sizeof( todo->dstinfo ) ); Queue_addItem( &todolist.free, (QueueItem*) todo ); } return 0; @@ -1200,6 +1350,10 @@ void todolist_initialize(int nbuf,int nthr) { perror( "FATAL" ); exit( 1 ); } + if ( sem_init( &todolist.reading, 0, 1 ) ) { + perror( "FATAL" ); + exit( 1 ); + } Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) ); for ( ; nthr > 0; nthr-- ) { pthread_t thread; // Temporary thread id @@ -1207,51 +1361,64 @@ void todolist_initialize(int nbuf,int nthr) { } } +// Reads a UDP packet on the given file descriptor and captures the +// source and destination addresses of the UDP message. +inline static ssize_t recvpacket(int fd,PacketItem *p) { + char data[100]; // Data area for "pktinfo" + struct iovec buffer[1] = {{ p->buffer, BUFSIZE }}; + struct msghdr msg = { + .msg_name = &p->src.in, + .msg_namelen = udp6? sizeof( p->src.in6 ) : sizeof( p->src.in4 ), + .msg_iov = buffer, + .msg_iovlen = 1, + .msg_control = data, + .msg_controllen = sizeof( data ), + .msg_flags = 0 // Return value + }; + p->len = recvmsg( fd, &msg, udp6? 0 : IP_PKTINFO ); + struct cmsghdr *cmsg = CMSG_FIRSTHDR( &msg ); + if ( cmsg ) { + if ( udp6 ) { + memcpy( &p->dstinfo.in6, CMSG_DATA( cmsg ), + sizeof( struct in6_pktinfo ) ); + VERBOSE2OUT( "DEST= udp6 %d\n", p->dstinfo.in6.ipi6_ifindex ); + } else { + memcpy( &p->dstinfo.in4, CMSG_DATA( cmsg ), + sizeof( struct in_pktinfo ) ); + VERBOSE2OUT( "DEST= %d\n", p->dstinfo.in4.ipi_ifindex ); + } + } + return p->len; +} + + + // Read a full UDP packet into the given buffer, associate with a // connection, or create a new connection, the decrypt the as // specified, and capture the sender MAC address. The connection table // is updated for the new MAC address, However, if there is then a MAC // address clash in the connection table, then the associated remote // is removed, and the packet is dropped. -static void doreadUDP(int fd) { - PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free ); - socklen_t addrlen = - udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 ); - memset( &todo->src, 0, sizeof( todo->src ) ); - todo->fd = fd; - todo->len = recvfrom( - fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen ); - if ( todo->len == -1) { - perror( "Receiving UDP" ); - exit( 1 ); - } +static void *doreadUDP(void *data) { + int fd = ((ReaderData *) data)->fd; + while ( 1 ) { + PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free ); + todo->fd = fd; + VERBOSE3OUT( "Reading packet on %d\n", fd ); + ssize_t len = recvpacket( fd, todo ); + if ( len == -1) { + perror( "Receiving UDP" ); + exit( 1 ); + } #ifdef GPROF - if ( todo->len == 17 && - memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) { - exit( 0 ); - } -#endif - Queue_addItem( &todolist.full, (QueueItem*) todo ); -} - -// Handle packet received on the tap/stdio channel -static void received_tap(unsigned char *buf, int len) { - static struct Remote *tap_remote = 0; - if ( tap_remote == 0 ) { - Remote_LOCK; - if ( tap_remote == 0 ) { - tap_remote = add_remote( 0, 0 ); + if ( len == 17 && + memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) { + exit( 0 ); } - Remote_UNLOCK; +#endif + Queue_addItem( &todolist.full, (QueueItem*) todo ); } - PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free ); - memcpy( (void*)todo->buffer, (void*)buf, len ); - memcpy( (void*)&todo->src, - (void*)&tap_remote->uaddr, - sizeof( struct SockAddr ) ); - todo->fd = 0; - todo->len = len; - Queue_addItem( &todolist.full, (QueueItem*) todo ); + return 0; } // Read up to n bytes from the given file descriptor into the buffer @@ -1303,7 +1470,7 @@ static void heartbeat(int fd) { r = (struct Remote *) tmp; VERBOSE3OUT( "heartbeat check %s\n", inet_stoa( &r->uaddr ) ); if ( r->spec && is_uplink( r->spec ) ) { - if ( DIFF_MICROS( &now, &r->rec_when ) > HEARTBEAT_MICROS ) { + if ( DIFF_MICROS( &now, &r->rec_when ) >= HEARTBEAT_MICROS ) { VERBOSE3OUT( "heartbeat %s\n", inet_stoa( &r->uaddr ) ); write_remote( data, 0, r ); } @@ -1312,25 +1479,21 @@ static void heartbeat(int fd) { Remote_UNLOCK; } -// The threadcontrol program for issuing heartbeat packets. -// Regular heartbeat -static void *hearbeater_thread(void *data) { - (void) data; - for ( ;; ) { - sleep( HEARTBEAT ); - heartbeat( udp_fd ); - } - return 0; -} - // Tell how to use this program and exit with failure. static void usage(void) { fprintf( stderr, "Packet tunneling over UDP, multiple channels, " ); - fprintf( stderr, "version 0.2.5\n" ); + fprintf( stderr, "version 1.5.3\n" ); fprintf( stderr, "Usage: " ); - fprintf( stderr, - "%s [-v] [-4] [-B n] [-T n] [-m mcast] [-t tap] port [remote]+ \n", - progname ); + fprintf( stderr, "%s [options] port [remote]+ \n", progname ); + fprintf( stderr, "** options must be given or omitted in order!!\n" ); + fprintf( stderr, " -v = verbose log, -vv or -vvv for more logs\n" ); + fprintf( stderr, " -tpg = UDP transport quirk: avoid bad sizes\n" ); + fprintf( stderr, " -4 = use an ipv4 UDP socket\n" ); + fprintf( stderr, " -B n = use n buffers (2*threads) by default\n"); + fprintf( stderr, " -T n = use n delivery threads (5 bu default)\n" ); + fprintf( stderr, " -m mcast = allow remotes on multicast address\n" ); + fprintf( stderr, " -t tap = use the nominated tap (or - for stdio)\n" ); + fprintf( stderr, " -S source = use given source address for UDP\n" ); exit( 1 ); } @@ -1356,102 +1519,60 @@ static int tun_alloc(char *dev, int flags) { return fd; } -static void doreadTap() { - if ( stdio ) { - if ( input.end == 0 ) { +// Handle packet received on the tap/stdio channel +static void initialize_tap() { + // Ensure there is a Remote for this + static struct Remote *tap_remote = 0; + if ( tap_remote == 0 ) { + Remote_LOCK; + if ( tap_remote == 0 ) { + tap_remote = add_remote( 0, 0 ); + } + Remote_UNLOCK; + } +} + +// Thread to handle tap/stdio input +static void *doreadTap(void *data) { + int fd = ((ReaderData*) data)->fd; + unsigned int end = 0; // Packet size + unsigned int cur = 0; // Amount read so far + size_t e; + PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free ); + while ( 1 ) { + if ( stdio ) { uint16_t plength; int n = read_into( 0, (unsigned char *) &plength, - sizeof( plength ), 0 ); + sizeof( plength ), 0 ); if ( n == 0 ) { // Tap/stdio closed => exit silently exit( 0 ); } - input.end = ntohs( plength ); - input.cur = 0; - } - size_t e = input.end - input.cur; - unsigned char *p = input.buffer + input.cur; - if ( input.end > BUFSIZE ) { - // Oversize packets should be read and discarded - if ( e > BUFSIZE ) { - e = BUFSIZE; - } - p = input.buffer; - } - input.cur += read_into( 0, p, e, 1 ); - } else { - input.end = doread( tap_fd, input.buffer, BUFSIZE ); - input.cur = input.end; - } - if ( input.end == input.cur ) { - VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end ); - if ( input.end <= BUFSIZE ) { - received_tap( input.buffer, input.end ); - } - input.end = 0; // Ready for next packet - } - // End handling tap -} - -// MAXFD is Finalized before multi-threading, and then remains constant. -static int MAXFD; - -// This is the main packet handling loop -static int packet_loop() { - // A packet buffer for receiving UDP - unsigned char buffer[ BUFSIZE ]; - int n; - //time_t next_heartbeat = time(0); - int cycle = 0; // which channel to check first - while( 1 ) { - fd_set rd_set; - FD_ZERO( &rd_set ); - if ( mcast.fd ) { - FD_SET( mcast.fd, &rd_set ); - } - FD_SET( udp_fd, &rd_set ); - if ( udp6 ) { - FD_SET( udp_fd, &rd_set ); - } - if ( tap ) { // tap/stdio - FD_SET( tap_fd, &rd_set ); - } - n = select( MAXFD, &rd_set, NULL, NULL, NULL ); - VERBOSE3OUT( "select got %d\n", n ); - if ( n < 0 ) { - if ( errno == EINTR ) { - continue; + end = ntohs( plength ); + cur = 0; + while ( ( e = ( end - cur ) ) != 0 ) { + unsigned char *p = todo->buffer + cur; + if ( end > BUFSIZE ) { + // Oversize packets should be read and discarded + if ( e > BUFSIZE ) { + e = BUFSIZE; + } + p = todo->buffer; + } + cur += read_into( 0, p, e, 1 ); } - perror("select"); - exit(1); + } else { + end = doread( fd, todo->buffer, BUFSIZE ); + cur = end; } - if ( n == 0 ) { - continue; - } - // Process input with alternating priority across channels - for ( ;; cycle++ ) { - if ( cycle >= 3 ) { - cycle = 0; - } - if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) { - // Check and process UDP socket - doreadUDP( udp_fd ) ; - cycle = 1; - break; - } - if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) { - // Check and process multicast socket - doreadUDP( mcast.fd ) ; - cycle = 2; - break; - } - if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) { - // Check and process tap/stdio socket - doreadTap( tap_fd, buffer ); - cycle = 0; - break; - } + VERBOSE3OUT( "TAP/stdio input %d bytes\n", end ); + if ( end <= BUFSIZE ) { + todo->fd = 0; + todo->len = end; + Queue_addItem( &todolist.full, (QueueItem*) todo ); + todo = (PacketItem*) Queue_getItem( &todolist.free ); } + // End handling tap } return 0; } @@ -1464,7 +1585,8 @@ static int packet_loop() { // remote = [ipv6(/maskwidth)](:port)(=key) // ip = ipv4 | [ipv6] int main(int argc, char *argv[]) { - int port, i; + pthread_t thread; // Temporary thread id + int i; progname = (unsigned char *) argv[0]; ///// Parse command line arguments i = 1; @@ -1484,6 +1606,11 @@ int main(int argc, char *argv[]) { i++; ENSUREARGS( 1 ); } + if ( strncmp( "-tpg", argv[i], 4 ) == 0 ) { + tpg_quirk = 1; + i++; + ENSUREARGS( 1 ); + } // then: optional -4 if ( strncmp( "-4", argv[i], 2 ) == 0 ) { udp6 = 0; @@ -1508,6 +1635,15 @@ int main(int argc, char *argv[]) { i += 2; ENSUREARGS( 1 ); } + // then: optional -H seconds + if ( strncmp( "-H", argv[i], 2 ) == 0 ) { + ENSUREARGS( 2 ); + if ( parse_heartbeat_rate( argv[i+1] ) ) { + usage(); + } + i += 2; + ENSUREARGS( 1 ); + } // then: optional -m mcast if ( strncmp( "-m", argv[i], 2 ) == 0 ) { ENSUREARGS( 2 ); @@ -1524,9 +1660,18 @@ int main(int argc, char *argv[]) { i += 2; ENSUREARGS( 1 ); } + // Then optional source address for UDP + if ( strncmp( "-S", argv[i], 2 ) == 0 ) { + ENSUREARGS( 2 ); + if ( parse_udp_source( argv[i+1] ) ) { + usage(); + } + i += 2; + ENSUREARGS( 1 ); + } // then: required port - if ( sscanf( argv[i++], "%d", &port ) != 1 ) { - fprintf( stderr, "Bad local port" ); + if ( sscanf( argv[i++], "%d", &udp_port ) != 1 ) { + fprintf( stderr, "Bad local port: %s\n", argv[i-1] ); usage(); } // then: any number of allowed remotes @@ -1559,7 +1704,6 @@ int main(int argc, char *argv[]) { } todolist_initialize( buffers_count, threads_count ); - MAXFD = 0; // Set up the tap/stdio channel if ( tap ) { // set up the nominated tap @@ -1570,15 +1714,13 @@ int main(int argc, char *argv[]) { exit(1); } VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd ); - MAXFD = tap_fd; stdio = 0; // pretend a zero packet on the tap, for initializing. - received_tap( 0, 0 ); + initialize_tap(); } else { // set up for stdin/stdout local traffix setbuf( stdout, NULL ); // No buffering on stdout. tap_fd = 0; // actually stdin - MAXFD = 0; stdio = 1; } } else { @@ -1591,9 +1733,6 @@ int main(int argc, char *argv[]) { VERBOSEOUT( "Using multicast %s:%d at %d\n", inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ), mcast.fd ); - if ( mcast.fd > MAXFD ) { - MAXFD = mcast.fd; - } } // Set up the unicast UPD channel (all interfaces) if ( udp6 == 0 ) { @@ -1604,14 +1743,23 @@ int main(int argc, char *argv[]) { } struct sockaddr_in udp_addr = { .sin_family = AF_INET, - .sin_port = htons( port ), - .sin_addr.s_addr = htonl(INADDR_ANY), + .sin_port = htons( udp_port ), }; + if ( udp_source.family == 0 ) { + udp_addr.sin_addr.s_addr = htonl( INADDR_ANY ); + } else { + udp_addr.sin_addr.s_addr = *((uint32_t*) udp_source.address); + } if ( bind( udp_fd, (struct sockaddr*) &udp_addr, sizeof(udp_addr))) { fprintf( stderr, "Error binding socket!\n"); exit(1); } VERBOSEOUT( "Using ipv4 UDP at %d\n", udp_fd ); + int opt = 1; + if ( setsockopt( udp_fd, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt)) ) { + fprintf( stderr, "Error configuring socket!\n"); + exit(1); + } } else { // set up ipv6 socket if ( ( udp_fd = socket( AF_INET6, SOCK_DGRAM, 0 ) ) == 0 ) { @@ -1620,19 +1768,21 @@ int main(int argc, char *argv[]) { } struct sockaddr_in6 udp6_addr = { .sin6_family = AF_INET6, - .sin6_port = htons( port ), - .sin6_addr = IN6ADDR_ANY_INIT, + .sin6_port = htons( udp_port ), }; + memcpy( udp6_addr.sin6_addr.s6_addr, udp_source.address, 16 ); if ( bind( udp_fd, (struct sockaddr*) &udp6_addr, sizeof(udp6_addr))) { fprintf( stderr, "Error binding socket!\n"); exit(1); } VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd ); + int opt = 1; + if ( setsockopt( + udp_fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &opt, sizeof(opt)) ) { + fprintf( stderr, "Error configuring socket!\n"); + exit(1); + } } - if ( udp_fd > MAXFD ) { - MAXFD = udp_fd; - } - MAXFD++ ; // If not using stdio for local traffic, then stdin and stdout are // closed here, so as to avoid that any other traffic channel gets // 0 or 1 as its file descriptor. Note: stderr (2) is left open. @@ -1640,13 +1790,31 @@ int main(int argc, char *argv[]) { close( 0 ); close( 1 ); } - VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n", - tap_fd, mcast.fd, udp_fd, MAXFD ); - - // Start heartbeater thread - pthread_t thread; // Temporary thread id -- not used - pthread_create( &thread, 0, hearbeater_thread, 0 ); + VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n", + tap_fd, mcast.fd, udp_fd ); // Handle packets - return packet_loop(); + ReaderData udp_reader = { .fd = udp_fd }; + pthread_create( &thread, 0, doreadUDP, &udp_reader ); + + if ( mcast.group.imr_multiaddr.s_addr ) { + ReaderData mcast_reader = { .fd = mcast.fd }; + pthread_create( &thread, 0, doreadUDP, &mcast_reader ); + } + + if ( tap_fd || stdio ) { + ReaderData tap_reader = { .fd = tap_fd }; + pthread_create( &thread, 0, doreadTap, &tap_reader ); + } + + // Start heartbeating to uplinks + for ( ;; ) { + if ( heart_rate != 0 ) { + sleep( heart_rate ); + heartbeat( udp_fd ); + } else { + sleep( 600 ); + } + } + return 0; }