new version
[rrq/rrqnet.git] / rrqnet.c
index 85f609ebddee6851041be21cd1415cdd2805ac68..69c363c0f9c30bf144a069520cf85270ea919a72 100644 (file)
--- a/rrqnet.c
+++ b/rrqnet.c
@@ -16,6 +16,7 @@
 
 #include <errno.h>
 #include <fcntl.h>
+#include <inttypes.h>
 #include <linux/if.h>
 #include <linux/if_tun.h>
 #include <stddef.h>
@@ -24,8 +25,9 @@
 #include <string.h>
 #include <sys/ioctl.h>
 #include <sys/stat.h>
-#include <sys/timeb.h>
+#include <sys/time.h>
 #include <sys/types.h>
+#include <sys/socket.h>
 #include <time.h>
 #include <unistd.h>
 
@@ -64,15 +66,17 @@ struct Allowed {
 
 // Details of actualized connections.
 struct Remote {
-    struct SockAddr uaddr;
+    struct SockAddr uaddr;     // The remote IP address
+    struct SockAddr laddr;     // The local IP for this remote
+    int ifindex;               // The local interface index
     struct Allowed *spec;      // Rule being instantiated
-    struct timeb rec_when;     // Last received packet time, in seconds
+    struct timeval rec_when;   // Last received packet time, in seconds
 };
 
 // Details of an interface at a remote.
 struct Interface  {
     unsigned char mac[6];      // MAC address used last (key for by_mac table)
-    struct timeb rec_when;     // Last packet time, in seconds
+    struct timeval rec_when;   // Last packet time, in seconds
     struct Remote *remote;
 };
 
@@ -82,28 +86,34 @@ struct Interface  {
 typedef struct _PacketItem {
     QueueItem base;
     int fd;
-    struct SockAddr src;
+    struct SockAddr src; // the remote IP for this packet
+    union {
+       struct in_pktinfo in4;
+       struct in6_pktinfo in6;
+    } dstinfo;         // The PKTINFO for this packet
     ssize_t len;
     unsigned char buffer[ BUFSIZE ];
 } PacketItem;
 
+typedef struct _ReaderData {
+    int fd;
+} ReaderData;
+
 // heartbeat interval, in seconds
-#define HEARTBEAT 30
-#define HEARTBEAT_MILLIS ( HEARTBEAT * 1000 )
+#define HEARTBEAT_MICROS ( heart_rate * 1000000 )
 
-// Macros for timing, for struct timeb variables
-#define TIMEB_MILLIS(TM) (((int64_t) (TM)->time * 1000) + (TM)->millitm )
-#define DIFFB_MILLIS(TM1,TM2) ( TIMEB_MILLIS(TM1) - TIMEB_MILLIS(TM2) )
+// Macros for timing, for struct timeval variables
+#define TIME_MICROS(TM) (((int64_t) (TM)->tv_sec * 1000000) + (TM)->tv_usec )
+#define DIFF_MICROS(TM1,TM2) ( TIME_MICROS(TM1) - TIME_MICROS(TM2) )
 
-// RECENT(T,M) is the time logic for requiring a gap time (in
+// RECENT_MICROS(T,M) is the time logic for requiring a gap time (in
 // milliseconds) before shifting a MAC to a new remote. The limit is
-// 6000 for broadcast and 20000 for unicast.
-#define RECENT(T,M) ((M) < ((T)? 6000 : 20000 ))
+// 6s for broadcast and 20s for unicast.
+#define RECENT_MICROS(T,M) ((M) < ((T)? 6000000 : 20000000 ))
 
-// VERYOLD_MILLIS is used for discarding downlink remotes whose latest
+// VERYOLD_MICROSS is used for discarding downlink remotes whose latest
 // activity is older than this.
-#define VERYOLD_MILLIS 180000
-
+#define VERYOLD_MICROS 180000000
 
 ////////// Variables
 
@@ -171,8 +181,10 @@ static int stdio = 0; // Default is neither stdio nor tap
 static char *tap = 0; // Name of tap, if any, or "-" for stdio
 static int tap_fd = 0; // Also used for stdin in stdio mode
 static int udp_fd;
+static int udp_port;
 static int threads_count = 0;
 static int buffers_count = 0;
+static int heart_rate = 30;
 
 // Setup for multicast channel
 static struct {
@@ -185,6 +197,16 @@ static struct {
 // Flag to signal the UDP socket as being ipv6 or not (forced ipv4)
 static int udp6 = 1;
 
+// The given UDP source address, if any
+static struct {
+    int family;
+    unsigned char address[16];
+} udp_source;
+
+// Flag to indicate tpg transport patch = avoid UDP payload of 1470
+// bytes by adding 2 tag-along bytes
+static int tpg_quirk = 0;
+
 // Flag whether to make some stderr outputs or not.
 // 1 = normal verbosity, 2 = more output, 3 = source debug level stuff
 static int verbose;
@@ -212,14 +234,6 @@ static pthread_mutex_t printing = PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP;
 #define VERBOSE3OUT(fmt, ...) \
     if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) )
 
-// A buffer for reading stdin in fragmented way, to allow outgoing
-// packets during the reading of stdin packets, if it's fragmented.
-static struct {
-    unsigned char buffer[ BUFSIZE ]; // Packet data
-    unsigned int end;               // Packet size
-    unsigned int cur;               // Amount read so far
-} input;
-
 // The actual name of this program (argv[0])
 static unsigned char *progname;
 
@@ -575,6 +589,7 @@ static int parse_bits(char *bits,int max,struct Allowed *into) {
 // Formats: <ipv4-address>[/<bits>][:<port>][=keyfile]
 // Formats: <ipv6-address>[/<bits>][=keyfile]
 // Formats: \[<ipv6-address>[/<bits>]\][:<port>][=keyfile]
+// Formats: hostname:port[=keyfile]
 static int parse_allowed(char *arg,struct Allowed *into) {
     static char buffer[10000];
     int n = strlen( arg );
@@ -677,6 +692,14 @@ static int parse_threads_count(char *arg) {
     return 0;
 }
 
+static int parse_heartbeat_rate(char *arg) {
+    if ( ( sscanf( arg, "%u", &heart_rate ) != 1 ) || heart_rate < 0 ) {
+       return 1;
+    }
+    VERBOSEOUT( "** Heartbeat rate = %d\n", heart_rate );
+    return 0;
+}
+
 static int parse_buffers_count(char *arg) {
     if ( ( sscanf( arg, "%u", &buffers_count ) != 1 ) || buffers_count < 1 ) {
        return 1;
@@ -728,6 +751,38 @@ static int parse_mcast(char *arg) {
     return 0;
 }
 
+//** IP address parsing utility for UDP source address
+// Return 0 if ok and 1 otherwise
+// Formats: <ipv4-address> or <ipv6-address>
+// The ipv4 address should be a multicast address in ranges
+// 224.0.0.0/22, 232.0.0.0/7, 234.0.0.0/8 or 239.0.0.0/8
+// though it's not checked here.
+static int parse_udp_source(char *arg) {
+    if ( inet_pton( AF_INET6, arg, udp_source.address ) ) {
+       // An ipv6 address is given.
+       if ( udp6 ) {
+           udp_source.family = AF_INET6;
+           return 0;
+       }
+       return 1;
+    }
+    if ( ! inet_pton( AF_INET, arg, udp_source.address ) ) {
+       return 1;
+    }
+
+    // An ipv4 address is given.
+    if ( udp6 ) {
+       // Translate into ipv6-encoded ipv4
+       memmove( udp_source.address + 12, udp_source.address, 4 );
+       memset( udp_source.address, 0, 10 );
+       memset( udp_source.address + 10, -1, 2 );
+       udp_source.family = AF_INET6;
+    } else {
+       udp_source.family = AF_INET;
+    }
+    return 0;
+}
+
 // Utility that sets upt the multicast socket, which is used for
 // receiving multicast packets.
 static void setup_mcast() {
@@ -875,18 +930,107 @@ static int write_tap(unsigned char *buf, int n) {
     return dowrite( tap_fd, buf, n );
 }
 
+
+// All sorts of fiddling is needed to set the source address for UDP
+// And 2 different ways for pure ipv4 versus ipv6 sockets
+static void sendpacket4(unsigned char *buf, int n,struct Remote *r) {
+    // The UDP socket is pure ipv4
+    struct iovec data[1] = {{ .iov_base = buf, .iov_len = n }};
+    struct {
+       struct cmsghdr hdr;
+       struct in_pktinfo data;
+    } control = {
+       .hdr.cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)),
+       .hdr.cmsg_level = IPPROTO_IP,
+       .hdr.cmsg_type = IP_PKTINFO,
+       .data.ipi_ifindex = r->ifindex,
+       .data.ipi_spec_dst = r->laddr.in4.sin_addr
+    };
+    struct msghdr msg = {
+       .msg_name = &r->uaddr.in4,
+       .msg_namelen = sizeof( struct sockaddr_in ),
+       .msg_iov = data,
+       .msg_iovlen = 1,
+       .msg_control =  &control,
+       .msg_controllen = CMSG_SPACE( sizeof( struct in_pktinfo ) ),
+       .msg_flags = 0 // unused
+    };
+    VERBOSE2OUT( "sendmsg ipv4 %zu from %s to %s\n",
+                msg.msg_controllen,
+                inet_stoa( &r->laddr ),
+                inet_stoa( &r->uaddr ) );
+    if ( sendmsg( udp_fd, &msg, 0 ) < n ) {
+       perror( "Writing socket" );
+    }
+}
+
+static void sendpacket6(unsigned char *buf, int n,struct Remote *r) {
+    // The UDP socket is ipv6, possibly with mapped ipv4 address(es)
+    struct iovec data[1] = {{ .iov_base = buf, .iov_len = n }};
+    struct {
+       struct cmsghdr hdr;
+       union {
+           struct in_pktinfo in4;
+           struct in6_pktinfo in6;
+       } data;
+    } control;
+    struct msghdr msg = {
+       .msg_name = &r->uaddr.in6,
+       .msg_namelen = sizeof( struct sockaddr_in6 ),
+       .msg_iov = data,
+       .msg_iovlen = 1,
+       .msg_control = 0,
+       .msg_controllen = 0,
+       .msg_flags = 0 // unused
+    };
+    if ( r->ifindex ) {
+       switch ( r->laddr.in.sa_family ) {
+       case AF_INET6:
+           control.hdr.cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo));
+           control.hdr.cmsg_level = IPPROTO_IPV6;
+           control.hdr.cmsg_type = IPV6_PKTINFO;
+           control.data.in6.ipi6_ifindex = r->ifindex;
+           memcpy( &control.data.in6.ipi6_addr, &r->laddr.in6.sin6_addr, 16 );
+           msg.msg_control = &control;
+           msg.msg_controllen = CMSG_SPACE( sizeof( struct in6_pktinfo ) );
+           break;
+       case AF_INET:
+           control.hdr.cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo));
+           control.hdr.cmsg_level = IPPROTO_IP;
+           control.hdr.cmsg_type = IP_PKTINFO;
+           control.data.in4.ipi_ifindex = r->ifindex;
+           control.data.in4.ipi_spec_dst = r->laddr.in4.sin_addr;
+           msg.msg_control = &control;
+           msg.msg_controllen = CMSG_SPACE( sizeof( struct in_pktinfo ) );
+           break;
+       }
+    }
+    VERBOSE2OUT( "sendmsg ipv6 %d from %s to %s\n",
+                r->ifindex,
+                inet_stoa( &r->laddr ),
+                inet_stoa( &r->uaddr ) );
+    if ( sendmsg( udp_fd, &msg, 0 ) < n ) {
+       perror( "Writing socket" );
+    }
+}
+
 // Write a packet via the given Interface with encryption as specified.
 static void write_remote(unsigned char *buf, int n,struct Remote *r) {
     // A packet buffer
     unsigned char output[ BUFSIZE ];
     if ( n < 12 ) {
-       VERBOSE2OUT( "SEND %d bytes to %s\n", n, inet_stoa( &r->uaddr ) );
+       VERBOSE2OUT( "SENDing %d bytes to %s\n", n, inet_stoa( &r->uaddr ) );
     } else {
-       VERBOSE2OUT( "SEND %s -> %s to %s\n",
+       VERBOSE2OUT( "SENDing %d bytes %s -> %s to %s\n", n,
                     inet_mtoa( buf+6 ), inet_mtoa( buf ),
                     inet_stoa( &r->uaddr ) );
     }
     memcpy( output, buf, n ); // Use the private buffer for delivery
+    // Apply the TPG quirk
+    if ( tpg_quirk && ( n > 1460 ) && ( n < 1478 ) ) {
+       VERBOSE2OUT( "tpg quirk applied\n" );
+       n = 1478; // Add some "random" tag-along bytes
+    }
     if ( r->spec == 0 ) {
        if ( r->uaddr.in.sa_family == 0 ) {
            // Output to tap/stdio
@@ -904,33 +1048,10 @@ static void write_remote(unsigned char *buf, int n,struct Remote *r) {
     } else if ( r->spec->psk.keyfile ) {
        encrypt( output, n, &r->spec->psk );
     }
-    struct sockaddr *sock = &r->uaddr.in;
-    size_t size;
-    if ( sock->sa_family == AF_INET6 ) {
-       // Note that the size of +struct sockaddr_in6+ is actually
-       // larger than the size of +struct sockaddr+ (due to the
-       // addition of the +sin6_flowinfo+ field). It results in the
-       // following cuteness for passing arguments to +sendto+.
-       size = sizeof( struct sockaddr_in6 );
-       VERBOSE2OUT( "IPv6 UDP %d %s\n",
-                    udp_fd, inet_stoa( (struct SockAddr*) sock ) );
+    if ( udp6 ) {
+       sendpacket6( output, n, r );
     } else {
-       size = sizeof( struct sockaddr_in );
-       VERBOSE2OUT( "IPv4 UDP %d %s\n",
-                    udp_fd, inet_stoa( (struct SockAddr*) sock ) );
-    }
-    VERBOSE2OUT( "SEND %d bytes to %s [%s -> %s]\n",
-                n, inet_stoa( (struct SockAddr*) sock ),
-                ( n < 12 )? "" : inet_mtoa( buf+6 ),
-                ( n < 12 )? "" : inet_mtoa( buf )
-             );
-    // IS sendto thread safe??
-    if ( sendto( udp_fd, output, n, 0, sock, size ) < n ) {
-       perror( "Writing socket" );
-       // Invalidate remote temporarily instead? But if it's an
-       // "uplink" it should be retried eventually...
-       // For now: just ignore the error.
-       // exit( 1 );
+       sendpacket4( output, n, r );
     }
 }
 
@@ -974,15 +1095,17 @@ static void unmap_if_mapped(struct SockAddr *s) {
 }
 
 // Route the packet from the given src
-static struct Interface *input_check(
-    unsigned char *buf,ssize_t len,struct SockAddr *src )
-{
-    VERBOSE2OUT( "RECV %ld bytes from %s\n", len, inet_stoa( src ) );
+static struct Interface *input_check(PacketItem *pi) {
+    unsigned char *buf = pi->buffer;
+    ssize_t len = pi->len;
+    struct SockAddr *src = &pi->src;
+
+    VERBOSE2OUT( "RECV %zd bytes from %s\n", len, inet_stoa( src ) );
     struct Remote *r = 0;
-    struct timeb now = { 0 };
-    if ( ftime( &now ) ) {
+    struct timeval now = { 0 };
+    if ( gettimeofday( &now, 0 ) ) {
        perror( "RECV time" );
-       now.time = time( 0 );
+       now.tv_sec = time( 0 );
     }
     Remote_FIND( src, r );
     if ( r == 0 ) {
@@ -994,12 +1117,30 @@ static struct Interface *input_check(
        VERBOSEOUT( "New remote %s by %s\n", inet_stoa( src ), a->source );
        r = add_remote( src, a );
        //r->rec_when = now; // Set activity stamp of new remote
+       // Set the local addressing for the remote, unless set already
+       // Note: potential for multi-thread competition here
+       if ( udp6 ) {
+           r->ifindex = pi->dstinfo.in6.ipi6_ifindex;
+           r->laddr.in6.sin6_family = AF_INET6;
+           r->laddr.in6.sin6_port = htons( udp_port );
+           memcpy( &r->laddr.in6.sin6_addr,
+                   &pi->dstinfo.in6.ipi6_addr,
+                   16 );
+           unmap_if_mapped( &r->laddr );
+       } else {
+           r->ifindex = pi->dstinfo.in4.ipi_ifindex;
+           r->laddr.in4.sin_family = AF_INET;
+           r->laddr.in4.sin_port = htons( udp_port );
+           memcpy( &r->laddr.in4.sin_addr,
+                   &pi->dstinfo.in4.ipi_spec_dst,
+                   4 );
+       }
     }
     if ( len < 12 ) {
        // Ignore short data, but maintain channel
        r->rec_when = now; // Update activity stamp touched remote
        if ( len > 0 ) {
-           VERBOSEOUT( "Ignoring %ld bytes from %s\n",
+           VERBOSEOUT( "Ignoring %zd bytes from %s\n",
                        len, inet_stoa( src ) );
        }
        return 0;
@@ -1063,14 +1204,14 @@ static struct Interface *input_check(
        // The packet source MAC has arrived on other than its
        // previous channel. It thus gets dropped if tap/stdin is the
        // primary channel, or the time since the last packet for that
-       // interface is less than RECENT, with different limits for
-       // broadcast and unicast.
-       int64_t dmac = DIFFB_MILLIS( &now, &x->rec_when);
-       if ( x->remote->spec == 0 || RECENT( *buf & 1, dmac ) ) {
+       // interface is less than RECENT_MICROS, with different limits
+       // for broadcast and unicast.
+       uint64_t dmac = DIFF_MICROS( &now, &x->rec_when);
+       if ( x->remote->spec == 0 || RECENT_MICROS( *buf & 1, dmac ) ) {
            if ( verbose >= 2 ) {
                fprintf(
                    stderr,
-                   "Dropped. MAC %s (%ld) from %s, should be %s\n",
+                   "Dropped. MAC %s (%"PRIu64") from %s, should be %s\n",
                    inet_mtoa( buf+6 ), dmac,
                    inet_stoa( src ), inet_stoa( &x->remote->uaddr ) );
            }
@@ -1095,45 +1236,49 @@ static struct Interface *input_check(
 }
 
 // Check packet and deliver out
-static void route_packet(unsigned char *buf,int len,struct SockAddr *src) {
-    struct Interface *x = input_check( buf, len, src );
+static void route_packet(PacketItem *pi) {
+    //unsigned char *buf = pi->buffer;
+    //ssize_t len = pi->len;
+    struct Interface *x = input_check( pi );
     if ( x == 0 ) {
+       VERBOSE2OUT( "not a nice packet\n" );
        return; // not a nice packet
     }
-    if ( ( *buf & 1 ) == 0 ) {
+    if ( ( *pi->buffer & 1 ) == 0 ) {
        // unicast
        struct Interface *y = 0; // reuse for destination interface
-       Interface_FIND( buf, y );
+       Interface_FIND( pi->buffer, y );
        if ( y == 0 ) {
            VERBOSE2OUT( "RECV %s -> %s from %s without channel and dropped\n",
-                       inet_mtoa( buf+6 ), inet_mtoa( buf ),
+                       inet_mtoa( pi->buffer + 6 ), inet_mtoa( pi->buffer ),
                        inet_stoa( &x->remote->uaddr ) );
            return;
        }
        if ( x->remote == y->remote ) {
            VERBOSEOUT( "RECV loop for %s -> %s from %s to %s\n",
-                       inet_mtoa( buf+6 ), inet_mtoa( buf ),
+                       inet_mtoa( pi->buffer+6 ), inet_mtoa( pi->buffer ),
                        inet_stoa( &x->remote->uaddr ),
                        inet_stoa( &y->remote->uaddr ) );
            Interface_DEL( y ); // Need to see this interface again
            return;
        }
-       VERBOSE2OUT( "RECV route %s -> %s to %s\n",
-                    inet_mtoa( buf+6 ), inet_mtoa( buf ),
-                    inet_stoa( &y->remote->uaddr ) );
-       write_remote( buf, len, y->remote );
+       VERBOSE2OUT( "RECV route %s -> %s\n",
+                    inet_mtoa( pi->buffer+6 ),
+                    inet_mtoa( pi->buffer ) );
+       write_remote( pi->buffer, pi->len, y->remote );
        return;
     }
     // broadcast. +x+ is source interface
     // x->rec_when is not updated 
-    struct timeb now = { 0 };
-    if ( ftime( &now ) ) {
+    struct timeval now = { 0 };
+    if ( gettimeofday( &now, 0 ) ) {
        perror( "RECV time" );
-       now.time = time( 0 );
+       now.tv_sec = time( 0 );
     }
-    VERBOSE2OUT( "BC %s -> %s from %s\n",
-                inet_mtoa( buf+6 ), inet_mtoa( buf ),
-                inet_stoa( &x->remote->uaddr ) );
+    VERBOSE2OUT( "BC %s -> %s from %s to %s\n",
+                inet_mtoa( pi->buffer+6 ), inet_mtoa( pi->buffer ),
+                inet_stoa( &x->remote->uaddr ),
+                inet_stoa( &x->remote->laddr ) );
     struct Remote *r;
     unsigned int i = 0;
     Remote_LOCK;
@@ -1149,18 +1294,18 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) {
            continue;
        }
        if ( r->spec && ! is_uplink( r->spec ) &&
-            DIFFB_MILLIS( &now, &r->rec_when ) > VERYOLD_MILLIS ) {
+            DIFF_MICROS( &now, &r->rec_when ) > VERYOLD_MICROS ) {
            // remove old downlink connection
-           VERBOSEOUT( "Old remote discarded %s (%ld)\n",
+           VERBOSEOUT( "Old remote discarded %s (%"PRId64")\n",
                        inet_stoa( &r->uaddr ),
-                       TIMEB_MILLIS( &r->rec_when ) );
+                       TIME_MICROS( &r->rec_when ) );
            // Removing a downlink might have threading implications
            delete_remote( r );
            continue;
        }
        // Send packet to the remote
        // Only no-clash or to the tap/stdin
-       write_remote( buf, len, r );
+       write_remote( pi->buffer, pi->len, r );
     }
     Remote_UNLOCK;
 }
@@ -1169,6 +1314,7 @@ static void route_packet(unsigned char *buf,int len,struct SockAddr *src) {
 static struct {
     Queue full;
     Queue free;
+    sem_t reading;
 } todolist;
 
 // The threadcontrol program for handling packets.
@@ -1177,14 +1323,17 @@ static void *packet_handler(void *data) {
     for ( ;; ) {
        PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.full );
        if ( todo->fd == mcast.fd ) {
-           // Patch multicast address as source for multicast packet
-           route_packet( todo->buffer, todo->len, &mcast.sock );
+           // Patch in the multicast address as source for multicast packet
+           memcpy( &todo->src, &mcast.sock, sizeof( todo->src ) );
+           route_packet( todo );
        } else {
            if ( udp6 ) {
                unmap_if_mapped( &todo->src );
            }
-           route_packet( todo->buffer, todo->len, &todo->src );
+           route_packet( todo );
        }
+       memset( &todo->src, 0, sizeof( struct SockAddr ) );
+       memset( &todo->dstinfo, 0, sizeof( todo->dstinfo ) );
        Queue_addItem( &todolist.free, (QueueItem*) todo );
     }
     return 0;
@@ -1201,6 +1350,10 @@ void todolist_initialize(int nbuf,int nthr) {
        perror( "FATAL" );
        exit( 1 );
     }
+    if ( sem_init( &todolist.reading, 0, 1 ) ) {
+       perror( "FATAL" );
+       exit( 1 );
+    }
     Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) );
     for ( ; nthr > 0; nthr-- ) {
        pthread_t thread; // Temporary thread id
@@ -1208,50 +1361,64 @@ void todolist_initialize(int nbuf,int nthr) {
     }
 }
 
+// Reads a UDP packet on the given file descriptor and captures the
+// source and destination addresses of the UDP message.
+inline static ssize_t recvpacket(int fd,PacketItem *p) {
+    char data[100]; // Data area for "pktinfo"
+    struct iovec buffer[1] = {{ p->buffer, BUFSIZE }};
+    struct msghdr msg = {
+       .msg_name =  &p->src.in,
+       .msg_namelen = udp6? sizeof( p->src.in6 ) : sizeof( p->src.in4 ),
+       .msg_iov = buffer,
+       .msg_iovlen = 1,
+       .msg_control = data,
+       .msg_controllen = sizeof( data ),
+       .msg_flags = 0 // Return value
+    };
+    p->len = recvmsg( fd, &msg, udp6? 0 : IP_PKTINFO );
+    struct cmsghdr *cmsg = CMSG_FIRSTHDR( &msg );
+    if ( cmsg ) {
+       if ( udp6 ) {
+           memcpy( &p->dstinfo.in6, CMSG_DATA( cmsg ),
+                   sizeof( struct in6_pktinfo ) );
+           VERBOSE2OUT( "DEST= udp6 %d\n", p->dstinfo.in6.ipi6_ifindex );
+       } else {
+           memcpy( &p->dstinfo.in4, CMSG_DATA( cmsg ),
+                   sizeof( struct in_pktinfo ) );
+           VERBOSE2OUT( "DEST= %d\n", p->dstinfo.in4.ipi_ifindex );
+       }
+    }
+    return p->len;
+}
+
+
+
 // Read a full UDP packet into the given buffer, associate with a
 // connection, or create a new connection, the decrypt the as
 // specified, and capture the sender MAC address. The connection table
 // is updated for the new MAC address, However, if there is then a MAC
 // address clash in the connection table, then the associated remote
 // is removed, and the packet is dropped.
-static void doreadUDP(int fd) {
-    PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
-    socklen_t addrlen =
-       udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
-    memset( &todo->src, 0, sizeof( todo->src ) );
-    todo->fd = fd;
-    todo->len = recvfrom(
-       fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
-    if ( todo->len == -1) {
-       perror( "Receiving UDP" );
-       exit( 1 );
-    }
+static void *doreadUDP(void *data) {
+    int fd = ((ReaderData *) data)->fd;
+    while ( 1 ) {
+       PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
+       todo->fd = fd;
+       VERBOSE3OUT( "Reading packet on %d\n", fd );
+       ssize_t len = recvpacket( fd, todo );
+       if ( len == -1) {
+           perror( "Receiving UDP" );
+           exit( 1 );
+       }
 #ifdef GPROF
-    if ( len == 17 && memcmp( buf, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
-       exit( 0 );
-    }
-#endif
-    Queue_addItem( &todolist.full, (QueueItem*) todo );
-}
-
-// Handle packet received on the tap/stdio channel
-static void received_tap(unsigned char *buf, int len) {
-    static struct Remote *tap_remote = 0;
-    if ( tap_remote == 0 ) {
-       Remote_LOCK;
-       if ( tap_remote == 0 ) {
-           tap_remote = add_remote( 0, 0 );
+       if ( len == 17 &&
+            memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
+           exit( 0 );
        }
-       Remote_UNLOCK;
+#endif
+       Queue_addItem( &todolist.full, (QueueItem*) todo );
     }
-    PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
-    memcpy( (void*)todo->buffer, (void*)buf, len );
-    memcpy( (void*)&todo->src,
-           (void*)&tap_remote->uaddr,
-           sizeof( struct SockAddr ) );
-    todo->fd = 0;
-    todo->len = len;
-    Queue_addItem( &todolist.full, (QueueItem*) todo );
+    return 0;
 }
 
 // Read up to n bytes from the given file descriptor into the buffer
@@ -1288,11 +1455,11 @@ static void heartbeat(int fd) {
     VERBOSE3OUT( "heartbeat fd=%d\n", fd );
     struct Remote *r;
     unsigned int i = 0;
-    struct timeb now;
-    if ( ftime( &now ) ) {
+    struct timeval now;
+    if ( gettimeofday( &now, 0 ) ) {
        perror( "HEARTBEAT time" );
-       now.time = time( 0 );
-       now.millitm = 0;
+       now.tv_sec = time( 0 );
+       now.tv_usec = 0;
     }
     Remote_LOCK;
     for ( ; i < remotes.by_addr.size; i++ ) {
@@ -1303,7 +1470,7 @@ static void heartbeat(int fd) {
        r = (struct Remote *) tmp;
        VERBOSE3OUT( "heartbeat check %s\n", inet_stoa( &r->uaddr ) );
        if ( r->spec && is_uplink( r->spec ) ) {
-           if ( DIFFB_MILLIS( &now, &r->rec_when ) > HEARTBEAT_MILLIS ) {
+           if ( DIFF_MICROS( &now, &r->rec_when ) >= HEARTBEAT_MICROS ) {
                VERBOSE3OUT( "heartbeat %s\n", inet_stoa( &r->uaddr ) );
                write_remote( data, 0, r );
            }
@@ -1312,25 +1479,21 @@ static void heartbeat(int fd) {
     Remote_UNLOCK;
 }
 
-// The threadcontrol program for issuing heartbeat packets.
-// Regular heartbeat
-static void *hearbeater_thread(void *data) {
-    (void) data;
-    for ( ;; ) {
-       sleep( HEARTBEAT );
-       heartbeat( udp_fd );
-    }
-    return 0;
-}
-
 // Tell how to use this program and exit with failure.
 static void usage(void) {
     fprintf( stderr, "Packet tunneling over UDP, multiple channels, " );
-    fprintf( stderr, "version 0.2.5\n" );
+    fprintf( stderr, "version 1.5.3\n" );
     fprintf( stderr, "Usage: " );
-    fprintf( stderr,
- "%s [-v] [-4] [-B n] [-T n] [-m mcast] [-t tap] port [remote]+ \n",
-            progname );
+    fprintf( stderr, "%s [options] port [remote]+ \n", progname );
+    fprintf( stderr, "** options must be given or omitted in order!!\n" );
+    fprintf( stderr, " -v        = verbose log, -vv or -vvv for more logs\n" );
+    fprintf( stderr, " -tpg      = UDP transport quirk: avoid bad sizes\n" );
+    fprintf( stderr, " -4        = use an ipv4 UDP socket\n" );
+    fprintf( stderr, " -B n      = use n buffers (2*threads) by default\n");
+    fprintf( stderr, " -T n      = use n delivery threads (5 bu default)\n" );
+    fprintf( stderr, " -m mcast  = allow remotes on multicast address\n" );
+    fprintf( stderr, " -t tap    = use the nominated tap (or - for stdio)\n" );
+    fprintf( stderr, " -S source = use given source address for UDP\n" );
     exit( 1 );
 }
 
@@ -1356,102 +1519,60 @@ static int tun_alloc(char *dev, int flags) {
     return fd;
 }
 
-static void doreadTap() {
-    if ( stdio ) {
-       if ( input.end == 0 ) {
+// Handle packet received on the tap/stdio channel
+static void initialize_tap() {
+    // Ensure there is a Remote for this
+    static struct Remote *tap_remote = 0;
+    if ( tap_remote == 0 ) {
+       Remote_LOCK;
+       if ( tap_remote == 0 ) {
+           tap_remote = add_remote( 0, 0 );
+       }
+       Remote_UNLOCK;
+    }
+}
+
+// Thread to handle tap/stdio input
+static void *doreadTap(void *data) {
+    int fd = ((ReaderData*) data)->fd;
+    unsigned int end = 0; // Packet size
+    unsigned int cur = 0; // Amount read so far
+    size_t e;
+    PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
+    while ( 1 ) {
+       if ( stdio ) {
            uint16_t plength;
            int n = read_into( 0, (unsigned char *) &plength,
-                          sizeof( plength ), 0 );
+                              sizeof( plength ), 0 );
            if ( n == 0 ) {
                // Tap/stdio closed => exit silently
                exit( 0 );
            }
-           input.end = ntohs( plength );
-           input.cur = 0;
-       }
-       size_t e = input.end - input.cur;
-       unsigned char *p = input.buffer + input.cur;
-       if ( input.end > BUFSIZE ) {
-           // Oversize packets should be read and discarded
-           if ( e > BUFSIZE ) {
-               e = BUFSIZE;
-           }
-           p = input.buffer;
-       }
-       input.cur += read_into( 0, p, e, 1 );
-    } else {
-       input.end = doread( tap_fd, input.buffer, BUFSIZE );
-       input.cur = input.end;
-    }
-    if ( input.end == input.cur ) {
-       VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end );
-       if ( input.end <= BUFSIZE ) {
-           received_tap( input.buffer, input.end );
-       }
-       input.end = 0; // Ready for next packet
-    }
-    // End handling tap
-}
-
-// MAXFD is Finalized before multi-threading, and then remains constant.
-static int MAXFD;
-
-// This is the main packet handling loop
-static int packet_loop() {
-    // A packet buffer for receiving UDP
-    unsigned char buffer[ BUFSIZE ];
-    int n;
-    //time_t next_heartbeat = time(0);
-    int cycle = 0; // which channel to check first
-    while( 1 ) {
-       fd_set rd_set;
-       FD_ZERO( &rd_set );
-       if ( mcast.fd ) {
-           FD_SET( mcast.fd, &rd_set );
-       }
-       FD_SET( udp_fd, &rd_set );
-       if ( udp6 ) {
-           FD_SET( udp_fd, &rd_set );
-       }
-       if ( tap ) { // tap/stdio
-           FD_SET( tap_fd, &rd_set );
-       }
-       n = select( MAXFD, &rd_set, NULL, NULL, NULL );
-       VERBOSE3OUT( "select got %d\n", n );
-       if ( n < 0 ) {
-           if ( errno == EINTR ) {
-               continue;
+           end = ntohs( plength );
+           cur = 0;
+           while ( ( e = ( end - cur ) ) != 0 ) {
+               unsigned char *p = todo->buffer + cur;
+               if ( end > BUFSIZE ) {
+                   // Oversize packets should be read and discarded
+                   if ( e > BUFSIZE ) {
+                       e = BUFSIZE;
+                   }
+                   p = todo->buffer;
+               }
+               cur += read_into( 0, p, e, 1 );
            }
-           perror("select");
-           exit(1);
+       } else {
+           end = doread( fd, todo->buffer, BUFSIZE );
+           cur = end;
        }
-       if ( n == 0 ) {
-           continue;
-       }
-       // Process input with alternating priority across channels
-       for ( ;; cycle++ ) {
-           if ( cycle >= 3 ) {
-               cycle = 0;
-           }
-           if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) {
-               // Check and process UDP socket
-               doreadUDP( udp_fd ) ;
-               cycle = 1;
-               break;
-           }
-           if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) {
-               // Check and process multicast socket
-               doreadUDP( mcast.fd ) ;
-               cycle = 2;
-               break;
-           }
-           if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) {
-               // Check and process tap/stdio socket
-               doreadTap( tap_fd, buffer );
-               cycle = 0;
-               break;
-           }
+       VERBOSE3OUT( "TAP/stdio input %d bytes\n", end );
+       if ( end <= BUFSIZE ) {
+           todo->fd = 0;
+           todo->len = end;
+           Queue_addItem( &todolist.full, (QueueItem*) todo );
+           todo = (PacketItem*) Queue_getItem( &todolist.free );
        }
+       // End handling tap
     }
     return 0;
 }
@@ -1464,7 +1585,8 @@ static int packet_loop() {
 // remote = [ipv6(/maskwidth)](:port)(=key)
 // ip = ipv4 | [ipv6]
 int main(int argc, char *argv[]) {
-    int port, i;
+    pthread_t thread; // Temporary thread id
+    int i;
     progname = (unsigned char *) argv[0];
     ///// Parse command line arguments
     i = 1;
@@ -1484,6 +1606,11 @@ int main(int argc, char *argv[]) {
        i++;
        ENSUREARGS( 1 );
     }
+    if ( strncmp( "-tpg", argv[i], 4 ) == 0 ) {
+       tpg_quirk = 1;
+       i++;
+       ENSUREARGS( 1 );
+    }
     // then: optional -4
     if ( strncmp( "-4", argv[i], 2 ) == 0 ) {
        udp6 = 0;
@@ -1508,6 +1635,15 @@ int main(int argc, char *argv[]) {
        i += 2;
        ENSUREARGS( 1 );
     }
+    // then: optional -H seconds
+    if ( strncmp( "-H", argv[i], 2 ) == 0 ) {
+       ENSUREARGS( 2 );
+       if ( parse_heartbeat_rate( argv[i+1] ) ) {
+           usage();
+       }
+       i += 2;
+       ENSUREARGS( 1 );
+    }
     // then: optional -m mcast
     if ( strncmp( "-m", argv[i], 2 ) == 0 ) {
        ENSUREARGS( 2 );
@@ -1524,9 +1660,18 @@ int main(int argc, char *argv[]) {
        i += 2;
        ENSUREARGS( 1 );
     }
+    // Then optional source address for UDP
+    if ( strncmp( "-S", argv[i], 2 ) == 0 ) {
+       ENSUREARGS( 2 );
+       if ( parse_udp_source( argv[i+1] ) ) {
+           usage();
+       }
+       i += 2;
+       ENSUREARGS( 1 );
+    }
     // then: required port
-    if ( sscanf( argv[i++], "%d", &port ) != 1 ) {
-       fprintf( stderr, "Bad local port" );
+    if ( sscanf( argv[i++], "%d", &udp_port ) != 1 ) {
+       fprintf( stderr, "Bad local port: %s\n", argv[i-1] );
        usage();
     }
     // then: any number of allowed remotes
@@ -1559,7 +1704,6 @@ int main(int argc, char *argv[]) {
     }
     todolist_initialize( buffers_count, threads_count );
 
-    MAXFD = 0;
     // Set up the tap/stdio channel
     if ( tap ) {
        // set up the nominated tap
@@ -1570,15 +1714,13 @@ int main(int argc, char *argv[]) {
                exit(1);
            }
            VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd );
-           MAXFD = tap_fd;
            stdio = 0;
            // pretend a zero packet on the tap, for initializing.
-           received_tap( 0, 0 ); 
+           initialize_tap(); 
        } else {
            // set up for stdin/stdout local traffix
            setbuf( stdout, NULL ); // No buffering on stdout.
            tap_fd = 0; // actually stdin
-           MAXFD = 0;
            stdio = 1;
        }
     } else {
@@ -1591,9 +1733,6 @@ int main(int argc, char *argv[]) {
         VERBOSEOUT( "Using multicast %s:%d at %d\n",
                    inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ),
                    mcast.fd );
-       if ( mcast.fd > MAXFD ) {
-           MAXFD = mcast.fd;
-       }
     }
     // Set up the unicast UPD channel (all interfaces)
     if ( udp6 == 0 ) {
@@ -1604,14 +1743,23 @@ int main(int argc, char *argv[]) {
        }
        struct sockaddr_in udp_addr = {
            .sin_family = AF_INET,
-           .sin_port = htons( port ),
-           .sin_addr.s_addr = htonl(INADDR_ANY),
+           .sin_port = htons( udp_port ),
        };
+       if ( udp_source.family == 0 ) {
+           udp_addr.sin_addr.s_addr = htonl( INADDR_ANY );
+       } else {
+           udp_addr.sin_addr.s_addr = *((uint32_t*) udp_source.address); 
+       }
        if ( bind( udp_fd, (struct sockaddr*) &udp_addr, sizeof(udp_addr))) {
            fprintf( stderr, "Error binding socket!\n");
            exit(1);
        }
        VERBOSEOUT( "Using ipv4 UDP at %d\n", udp_fd );
+       int opt = 1;
+       if ( setsockopt( udp_fd, IPPROTO_IP, IP_PKTINFO, &opt, sizeof(opt)) ) {
+           fprintf( stderr, "Error configuring socket!\n");
+            exit(1);
+       }
     } else {
        // set up ipv6 socket
        if ( ( udp_fd = socket( AF_INET6, SOCK_DGRAM, 0 ) ) == 0 ) {
@@ -1620,19 +1768,21 @@ int main(int argc, char *argv[]) {
        }
        struct sockaddr_in6 udp6_addr = {
            .sin6_family = AF_INET6,
-           .sin6_port = htons( port ),
-           .sin6_addr = IN6ADDR_ANY_INIT,
+           .sin6_port = htons( udp_port ),
        };
+       memcpy( udp6_addr.sin6_addr.s6_addr, udp_source.address, 16 );
        if ( bind( udp_fd, (struct sockaddr*) &udp6_addr, sizeof(udp6_addr))) {
            fprintf( stderr, "Error binding socket!\n");
            exit(1);
        }
        VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd );
+       int opt = 1;
+       if ( setsockopt(
+                udp_fd, IPPROTO_IPV6, IPV6_RECVPKTINFO, &opt, sizeof(opt)) ) {
+           fprintf( stderr, "Error configuring socket!\n");
+            exit(1);
+       }
     }
-    if ( udp_fd > MAXFD ) {
-       MAXFD = udp_fd;
-    }
-    MAXFD++ ;
     // If not using stdio for local traffic, then stdin and stdout are
     // closed here, so as to avoid that any other traffic channel gets
     // 0 or 1 as its file descriptor. Note: stderr (2) is left open.
@@ -1640,13 +1790,31 @@ int main(int argc, char *argv[]) {
        close( 0 );
        close( 1 );
     }
-    VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n",
-                tap_fd, mcast.fd, udp_fd, MAXFD );
-
-    // Start heartbeater thread
-    pthread_t thread; // Temporary thread id -- not used
-    pthread_create( &thread, 0, hearbeater_thread, 0 );
+    VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n",
+                tap_fd, mcast.fd, udp_fd );
 
     // Handle packets
-    return packet_loop();
+    ReaderData udp_reader = { .fd = udp_fd };
+    pthread_create( &thread, 0, doreadUDP, &udp_reader );
+
+    if ( mcast.group.imr_multiaddr.s_addr ) {
+       ReaderData mcast_reader = { .fd = mcast.fd };
+       pthread_create( &thread, 0, doreadUDP, &mcast_reader );
+    }
+
+    if ( tap_fd || stdio ) {
+       ReaderData tap_reader = { .fd = tap_fd };
+       pthread_create( &thread, 0, doreadTap, &tap_reader );
+    }
+
+    // Start heartbeating to uplinks
+    for ( ;; ) {
+       if ( heart_rate != 0 ) {
+           sleep( heart_rate );
+           heartbeat( udp_fd );
+       } else {
+           sleep( 600 );
+       }
+    }
+    return 0;
 }