X-Git-Url: https://git.rrq.au/?a=blobdiff_plain;f=rrqnet.c;h=69c363c0f9c30bf144a069520cf85270ea919a72;hb=refs%2Ftags%2F1.6.2;hp=739eff6181cded40da5b396b2e5b19c23b0574b2;hpb=b298d167067a38cd2cdc863d4e0052de2a2701c0;p=rrq%2Frrqnet.git diff --git a/rrqnet.c b/rrqnet.c index 739eff6..69c363c 100644 --- a/rrqnet.c +++ b/rrqnet.c @@ -16,6 +16,7 @@ #include #include +#include #include #include #include @@ -66,7 +67,8 @@ struct Allowed { // Details of actualized connections. struct Remote { struct SockAddr uaddr; // The remote IP address - struct SockAddr laddr; // The local IP address + struct SockAddr laddr; // The local IP for this remote + int ifindex; // The local interface index struct Allowed *spec; // Rule being instantiated struct timeval rec_when; // Last received packet time, in seconds }; @@ -85,7 +87,10 @@ typedef struct _PacketItem { QueueItem base; int fd; struct SockAddr src; // the remote IP for this packet - struct SockAddr dst; // the local IP for this packet + union { + struct in_pktinfo in4; + struct in6_pktinfo in6; + } dstinfo; // The PKTINFO for this packet ssize_t len; unsigned char buffer[ BUFSIZE ]; } PacketItem; @@ -95,8 +100,7 @@ typedef struct _ReaderData { } ReaderData; // heartbeat interval, in seconds -#define HEARTBEAT 30 -#define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 ) +#define HEARTBEAT_MICROS ( heart_rate * 1000000 ) // Macros for timing, for struct timeval variables #define TIME_MICROS(TM) (((int64_t) (TM)->tv_sec * 1000000) + (TM)->tv_usec ) @@ -177,8 +181,10 @@ static int stdio = 0; // Default is neither stdio nor tap static char *tap = 0; // Name of tap, if any, or "-" for stdio static int tap_fd = 0; // Also used for stdin in stdio mode static int udp_fd; +static int udp_port; static int threads_count = 0; static int buffers_count = 0; +static int heart_rate = 30; // Setup for multicast channel static struct { @@ -686,6 +692,14 @@ static int parse_threads_count(char *arg) { return 0; } +static int parse_heartbeat_rate(char *arg) { + if ( ( sscanf( arg, "%u", &heart_rate ) != 1 ) || heart_rate < 0 ) { + return 1; + } + VERBOSEOUT( "** Heartbeat rate = %d\n", heart_rate ); + return 0; +} + static int parse_buffers_count(char *arg) { if ( ( sscanf( arg, "%u", &buffers_count ) != 1 ) || buffers_count < 1 ) { return 1; @@ -916,6 +930,90 @@ static int write_tap(unsigned char *buf, int n) { return dowrite( tap_fd, buf, n ); } + +// All sorts of fiddling is needed to set the source address for UDP +// And 2 different ways for pure ipv4 versus ipv6 sockets +static void sendpacket4(unsigned char *buf, int n,struct Remote *r) { + // The UDP socket is pure ipv4 + struct iovec data[1] = {{ .iov_base = buf, .iov_len = n }}; + struct { + struct cmsghdr hdr; + struct in_pktinfo data; + } control = { + .hdr.cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)), + .hdr.cmsg_level = IPPROTO_IP, + .hdr.cmsg_type = IP_PKTINFO, + .data.ipi_ifindex = r->ifindex, + .data.ipi_spec_dst = r->laddr.in4.sin_addr + }; + struct msghdr msg = { + .msg_name = &r->uaddr.in4, + .msg_namelen = sizeof( struct sockaddr_in ), + .msg_iov = data, + .msg_iovlen = 1, + .msg_control = &control, + .msg_controllen = CMSG_SPACE( sizeof( struct in_pktinfo ) ), + .msg_flags = 0 // unused + }; + VERBOSE2OUT( "sendmsg ipv4 %zu from %s to %s\n", + msg.msg_controllen, + inet_stoa( &r->laddr ), + inet_stoa( &r->uaddr ) ); + if ( sendmsg( udp_fd, &msg, 0 ) < n ) { + perror( "Writing socket" ); + } +} + +static void sendpacket6(unsigned char *buf, int n,struct Remote *r) { + // The UDP socket is ipv6, possibly with mapped ipv4 address(es) + struct iovec data[1] = {{ .iov_base = buf, .iov_len = n }}; + struct { + struct cmsghdr hdr; + union { + struct in_pktinfo in4; + struct in6_pktinfo in6; + } data; + } control; + struct msghdr msg = { + .msg_name = &r->uaddr.in6, + .msg_namelen = sizeof( struct sockaddr_in6 ), + .msg_iov = data, + .msg_iovlen = 1, + .msg_control = 0, + .msg_controllen = 0, + .msg_flags = 0 // unused + }; + if ( r->ifindex ) { + switch ( r->laddr.in.sa_family ) { + case AF_INET6: + control.hdr.cmsg_len = CMSG_LEN(sizeof(struct in6_pktinfo)); + control.hdr.cmsg_level = IPPROTO_IPV6; + control.hdr.cmsg_type = IPV6_PKTINFO; + control.data.in6.ipi6_ifindex = r->ifindex; + memcpy( &control.data.in6.ipi6_addr, &r->laddr.in6.sin6_addr, 16 ); + msg.msg_control = &control; + msg.msg_controllen = CMSG_SPACE( sizeof( struct in6_pktinfo ) ); + break; + case AF_INET: + control.hdr.cmsg_len = CMSG_LEN(sizeof(struct in_pktinfo)); + control.hdr.cmsg_level = IPPROTO_IP; + control.hdr.cmsg_type = IP_PKTINFO; + control.data.in4.ipi_ifindex = r->ifindex; + control.data.in4.ipi_spec_dst = r->laddr.in4.sin_addr; + msg.msg_control = &control; + msg.msg_controllen = CMSG_SPACE( sizeof( struct in_pktinfo ) ); + break; + } + } + VERBOSE2OUT( "sendmsg ipv6 %d from %s to %s\n", + r->ifindex, + inet_stoa( &r->laddr ), + inet_stoa( &r->uaddr ) ); + if ( sendmsg( udp_fd, &msg, 0 ) < n ) { + perror( "Writing socket" ); + } +} + // Write a packet via the given Interface with encryption as specified. static void write_remote(unsigned char *buf, int n,struct Remote *r) { // A packet buffer @@ -923,9 +1021,8 @@ static void write_remote(unsigned char *buf, int n,struct Remote *r) { if ( n < 12 ) { VERBOSE2OUT( "SENDing %d bytes to %s\n", n, inet_stoa( &r->uaddr ) ); } else { - VERBOSE2OUT( "SENDing %d bytes %s -> %s from %s to %s\n", n, + VERBOSE2OUT( "SENDing %d bytes %s -> %s to %s\n", n, inet_mtoa( buf+6 ), inet_mtoa( buf ), - inet_stoa( &r->laddr ), inet_stoa( &r->uaddr ) ); } memcpy( output, buf, n ); // Use the private buffer for delivery @@ -951,82 +1048,10 @@ static void write_remote(unsigned char *buf, int n,struct Remote *r) { } else if ( r->spec->psk.keyfile ) { encrypt( output, n, &r->spec->psk ); } - // Reserve for packet addressing - struct in_pktinfo pkt4info = { - .ipi_ifindex = 0, /* Interface index */ - .ipi_spec_dst.s_addr = 0, /* Local address */ - .ipi_addr.s_addr = 0, /* Header Destination address */ - }; - struct in6_pktinfo pkt6info = { - .ipi6_addr.s6_addr32 = { 0, 0, 0, 0 }, - .ipi6_ifindex = 0, - }; - void *pktinfo = 0; - int pktinfosize = 0; - struct sockaddr_in *sock4 = &r->uaddr.in4; - struct sockaddr_in6 *sock6 = &r->uaddr.in6; - void *sock; - size_t size; - int flags = 0; if ( udp6 ) { - // Note that the size of +struct sockaddr_in6+ is actually - // larger than the size of +struct sockaddr+ (due to the - // addition of the +sin6_flowinfo+ field). It results in the - // following cuteness for passing arguments to +sendto+. - sock = sock6; - size = sizeof( struct sockaddr_in6 ); - VERBOSE2OUT( "IPv6 UDP %d %s %s\n", udp_fd, - inet_stoa( &r->laddr ), - inet_stoa( &r->uaddr ) ); - switch ( r->laddr.in.sa_family ) { - case AF_INET6: - memcpy( &pkt6info.ipi6_addr, &sock6->sin6_addr, 16 ); - pktinfo = &pkt6info; - pktinfosize = sizeof( pkt6info ); - flags = IPV6_PKTINFO; - break; - case AF_INET: - memcpy( &pkt4info.ipi_spec_dst, &sock4->sin_addr, 4 ); - pktinfo = &pkt4info; - pktinfosize = sizeof( pkt4info ); - flags = IP_PKTINFO; - break; - } + sendpacket6( output, n, r ); } else { - sock = sock4; - size = sizeof( struct sockaddr_in ); - VERBOSE2OUT( "IPv4 UDP %d %s %s\n", udp_fd, - inet_stoa( &r->laddr ), - inet_stoa( &r->uaddr ) ); - memcpy( &pkt4info.ipi_spec_dst, &sock4->sin_addr, 4 ); - pktinfo = &pkt4info; - pktinfosize = sizeof( pkt4info ); - flags = IP_PKTINFO; - } - VERBOSE2OUT( "SEND %d bytes from %s to %s [%s -> %s]\n", - n, - inet_stoa( &r->laddr ), - inet_stoa( &r->uaddr ), - ( n < 12 )? "" : inet_mtoa( buf+6 ), - ( n < 12 )? "" : inet_mtoa( buf ) - ); - // IS sendmsg thread safe?? - struct iovec data[1] = {{ output, n }}; - struct msghdr msg = { - .msg_name = sock, - .msg_namelen = size, - .msg_iov = data, - .msg_iovlen = 1, - .msg_control = pktinfo, - .msg_controllen = pktinfosize, - .msg_flags = 0 // unused - }; - if ( sendmsg( udp_fd, &msg, flags ) < n ) { - perror( "Writing socket" ); - // Invalidate remote temporarily instead? But if it's an - // "uplink" it should be retried eventually... - // For now: just ignore the error. - // exit( 1 ); + sendpacket4( output, n, r ); } } @@ -1070,10 +1095,12 @@ static void unmap_if_mapped(struct SockAddr *s) { } // Route the packet from the given src -static struct Interface *input_check( - unsigned char *buf,ssize_t len,struct SockAddr *src ) -{ - VERBOSE2OUT( "RECV %ld bytes from %s\n", len, inet_stoa( src ) ); +static struct Interface *input_check(PacketItem *pi) { + unsigned char *buf = pi->buffer; + ssize_t len = pi->len; + struct SockAddr *src = &pi->src; + + VERBOSE2OUT( "RECV %zd bytes from %s\n", len, inet_stoa( src ) ); struct Remote *r = 0; struct timeval now = { 0 }; if ( gettimeofday( &now, 0 ) ) { @@ -1090,12 +1117,30 @@ static struct Interface *input_check( VERBOSEOUT( "New remote %s by %s\n", inet_stoa( src ), a->source ); r = add_remote( src, a ); //r->rec_when = now; // Set activity stamp of new remote + // Set the local addressing for the remote, unless set already + // Note: potential for multi-thread competition here + if ( udp6 ) { + r->ifindex = pi->dstinfo.in6.ipi6_ifindex; + r->laddr.in6.sin6_family = AF_INET6; + r->laddr.in6.sin6_port = htons( udp_port ); + memcpy( &r->laddr.in6.sin6_addr, + &pi->dstinfo.in6.ipi6_addr, + 16 ); + unmap_if_mapped( &r->laddr ); + } else { + r->ifindex = pi->dstinfo.in4.ipi_ifindex; + r->laddr.in4.sin_family = AF_INET; + r->laddr.in4.sin_port = htons( udp_port ); + memcpy( &r->laddr.in4.sin_addr, + &pi->dstinfo.in4.ipi_spec_dst, + 4 ); + } } if ( len < 12 ) { // Ignore short data, but maintain channel r->rec_when = now; // Update activity stamp touched remote if ( len > 0 ) { - VERBOSEOUT( "Ignoring %ld bytes from %s\n", + VERBOSEOUT( "Ignoring %zd bytes from %s\n", len, inet_stoa( src ) ); } return 0; @@ -1161,12 +1206,12 @@ static struct Interface *input_check( // primary channel, or the time since the last packet for that // interface is less than RECENT_MICROS, with different limits // for broadcast and unicast. - int64_t dmac = DIFF_MICROS( &now, &x->rec_when); + uint64_t dmac = DIFF_MICROS( &now, &x->rec_when); if ( x->remote->spec == 0 || RECENT_MICROS( *buf & 1, dmac ) ) { if ( verbose >= 2 ) { fprintf( stderr, - "Dropped. MAC %s (%ld) from %s, should be %s\n", + "Dropped. MAC %s (%"PRIu64") from %s, should be %s\n", inet_mtoa( buf+6 ), dmac, inet_stoa( src ), inet_stoa( &x->remote->uaddr ) ); } @@ -1192,37 +1237,35 @@ static struct Interface *input_check( // Check packet and deliver out static void route_packet(PacketItem *pi) { - unsigned char *buf = pi->buffer; - int len = pi->len; - struct SockAddr *src = &pi->src; - struct Interface *x = input_check( buf, len, src ); + //unsigned char *buf = pi->buffer; + //ssize_t len = pi->len; + struct Interface *x = input_check( pi ); if ( x == 0 ) { + VERBOSE2OUT( "not a nice packet\n" ); return; // not a nice packet } - if ( ( *buf & 1 ) == 0 ) { + if ( ( *pi->buffer & 1 ) == 0 ) { // unicast struct Interface *y = 0; // reuse for destination interface - Interface_FIND( buf, y ); + Interface_FIND( pi->buffer, y ); if ( y == 0 ) { VERBOSE2OUT( "RECV %s -> %s from %s without channel and dropped\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), + inet_mtoa( pi->buffer + 6 ), inet_mtoa( pi->buffer ), inet_stoa( &x->remote->uaddr ) ); return; } if ( x->remote == y->remote ) { VERBOSEOUT( "RECV loop for %s -> %s from %s to %s\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), + inet_mtoa( pi->buffer+6 ), inet_mtoa( pi->buffer ), inet_stoa( &x->remote->uaddr ), inet_stoa( &y->remote->uaddr ) ); Interface_DEL( y ); // Need to see this interface again return; } - // Set the local address for the remote - memcpy( &x->remote->laddr, &pi->dst, sizeof( pi->dst ) ); - VERBOSE2OUT( "RECV route %s -> %s using %s\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), - inet_stoa( &x->remote->laddr ) ); - write_remote( buf, len, y->remote ); + VERBOSE2OUT( "RECV route %s -> %s\n", + inet_mtoa( pi->buffer+6 ), + inet_mtoa( pi->buffer ) ); + write_remote( pi->buffer, pi->len, y->remote ); return; } // broadcast. +x+ is source interface @@ -1232,9 +1275,10 @@ static void route_packet(PacketItem *pi) { perror( "RECV time" ); now.tv_sec = time( 0 ); } - VERBOSE2OUT( "BC %s -> %s from %s\n", - inet_mtoa( buf+6 ), inet_mtoa( buf ), - inet_stoa( &x->remote->uaddr ) ); + VERBOSE2OUT( "BC %s -> %s from %s to %s\n", + inet_mtoa( pi->buffer+6 ), inet_mtoa( pi->buffer ), + inet_stoa( &x->remote->uaddr ), + inet_stoa( &x->remote->laddr ) ); struct Remote *r; unsigned int i = 0; Remote_LOCK; @@ -1252,7 +1296,7 @@ static void route_packet(PacketItem *pi) { if ( r->spec && ! is_uplink( r->spec ) && DIFF_MICROS( &now, &r->rec_when ) > VERYOLD_MICROS ) { // remove old downlink connection - VERBOSEOUT( "Old remote discarded %s (%ld)\n", + VERBOSEOUT( "Old remote discarded %s (%"PRId64")\n", inet_stoa( &r->uaddr ), TIME_MICROS( &r->rec_when ) ); // Removing a downlink might have threading implications @@ -1261,7 +1305,7 @@ static void route_packet(PacketItem *pi) { } // Send packet to the remote // Only no-clash or to the tap/stdin - write_remote( buf, len, r ); + write_remote( pi->buffer, pi->len, r ); } Remote_UNLOCK; } @@ -1285,12 +1329,11 @@ static void *packet_handler(void *data) { } else { if ( udp6 ) { unmap_if_mapped( &todo->src ); - unmap_if_mapped( &todo->dst ); } route_packet( todo ); } memset( &todo->src, 0, sizeof( struct SockAddr ) ); - memset( &todo->dst, 0, sizeof( struct SockAddr ) ); + memset( &todo->dstinfo, 0, sizeof( todo->dstinfo ) ); Queue_addItem( &todolist.free, (QueueItem*) todo ); } return 0; @@ -1336,17 +1379,13 @@ inline static ssize_t recvpacket(int fd,PacketItem *p) { struct cmsghdr *cmsg = CMSG_FIRSTHDR( &msg ); if ( cmsg ) { if ( udp6 ) { - struct in6_pktinfo *pinf = (struct in6_pktinfo*) CMSG_DATA( cmsg ); - p->dst.in6.sin6_family = AF_INET6; - memcpy( &p->dst.in6.sin6_addr, &pinf->ipi6_addr, 16 ); - VERBOSE2OUT( "DEST= udp6 %d %s\n", - pinf->ipi6_ifindex, inet_stoa( &p->dst ) ); + memcpy( &p->dstinfo.in6, CMSG_DATA( cmsg ), + sizeof( struct in6_pktinfo ) ); + VERBOSE2OUT( "DEST= udp6 %d\n", p->dstinfo.in6.ipi6_ifindex ); } else { - struct in_pktinfo *pinf = (struct in_pktinfo*) CMSG_DATA( cmsg ); - p->dst.in4.sin_family = AF_INET; - p->dst.in4.sin_addr = pinf->ipi_addr; - VERBOSE2OUT( "DEST= %d %s\n", - pinf->ipi_ifindex, inet_stoa( &p->dst ) ); + memcpy( &p->dstinfo.in4, CMSG_DATA( cmsg ), + sizeof( struct in_pktinfo ) ); + VERBOSE2OUT( "DEST= %d\n", p->dstinfo.in4.ipi_ifindex ); } } return p->len; @@ -1431,7 +1470,7 @@ static void heartbeat(int fd) { r = (struct Remote *) tmp; VERBOSE3OUT( "heartbeat check %s\n", inet_stoa( &r->uaddr ) ); if ( r->spec && is_uplink( r->spec ) ) { - if ( DIFF_MICROS( &now, &r->rec_when ) > HEARTBEAT_MICROS ) { + if ( DIFF_MICROS( &now, &r->rec_when ) >= HEARTBEAT_MICROS ) { VERBOSE3OUT( "heartbeat %s\n", inet_stoa( &r->uaddr ) ); write_remote( data, 0, r ); } @@ -1547,7 +1586,7 @@ static void *doreadTap(void *data) { // ip = ipv4 | [ipv6] int main(int argc, char *argv[]) { pthread_t thread; // Temporary thread id - int port, i; + int i; progname = (unsigned char *) argv[0]; ///// Parse command line arguments i = 1; @@ -1596,6 +1635,15 @@ int main(int argc, char *argv[]) { i += 2; ENSUREARGS( 1 ); } + // then: optional -H seconds + if ( strncmp( "-H", argv[i], 2 ) == 0 ) { + ENSUREARGS( 2 ); + if ( parse_heartbeat_rate( argv[i+1] ) ) { + usage(); + } + i += 2; + ENSUREARGS( 1 ); + } // then: optional -m mcast if ( strncmp( "-m", argv[i], 2 ) == 0 ) { ENSUREARGS( 2 ); @@ -1622,7 +1670,7 @@ int main(int argc, char *argv[]) { ENSUREARGS( 1 ); } // then: required port - if ( sscanf( argv[i++], "%d", &port ) != 1 ) { + if ( sscanf( argv[i++], "%d", &udp_port ) != 1 ) { fprintf( stderr, "Bad local port: %s\n", argv[i-1] ); usage(); } @@ -1695,7 +1743,7 @@ int main(int argc, char *argv[]) { } struct sockaddr_in udp_addr = { .sin_family = AF_INET, - .sin_port = htons( port ), + .sin_port = htons( udp_port ), }; if ( udp_source.family == 0 ) { udp_addr.sin_addr.s_addr = htonl( INADDR_ANY ); @@ -1720,7 +1768,7 @@ int main(int argc, char *argv[]) { } struct sockaddr_in6 udp6_addr = { .sin6_family = AF_INET6, - .sin6_port = htons( port ), + .sin6_port = htons( udp_port ), }; memcpy( udp6_addr.sin6_addr.s6_addr, udp_source.address, 16 ); if ( bind( udp_fd, (struct sockaddr*) &udp6_addr, sizeof(udp6_addr))) { @@ -1761,8 +1809,12 @@ int main(int argc, char *argv[]) { // Start heartbeating to uplinks for ( ;; ) { - sleep( HEARTBEAT ); - heartbeat( udp_fd ); + if ( heart_rate != 0 ) { + sleep( heart_rate ); + heartbeat( udp_fd ); + } else { + sleep( 600 ); + } } return 0; }