unsigned char buffer[ BUFSIZE ];
} PacketItem;
+typedef struct _ReaderData {
+ int fd;
+} ReaderData;
+
// heartbeat interval, in seconds
#define HEARTBEAT 30
#define HEARTBEAT_MICROS ( HEARTBEAT * 1000000 )
#define VERBOSE3OUT(fmt, ...) \
if ( verbose >= 3 ) PRINT( fprintf( stderr, fmt, ##__VA_ARGS__ ) )
-// A buffer for reading stdin in fragmented way, to allow outgoing
-// packets during the reading of stdin packets, if it's fragmented.
-static struct {
- unsigned char buffer[ BUFSIZE ]; // Packet data
- unsigned int end; // Packet size
- unsigned int cur; // Amount read so far
-} input;
-
// The actual name of this program (argv[0])
static unsigned char *progname;
static struct {
Queue full;
Queue free;
+ sem_t reading;
} todolist;
// The threadcontrol program for handling packets.
perror( "FATAL" );
exit( 1 );
}
+ if ( sem_init( &todolist.reading, 0, 1 ) ) {
+ perror( "FATAL" );
+ exit( 1 );
+ }
Queue_initialize( &todolist.free, nbuf, sizeof( PacketItem ) );
for ( ; nthr > 0; nthr-- ) {
pthread_t thread; // Temporary thread id
// is updated for the new MAC address, However, if there is then a MAC
// address clash in the connection table, then the associated remote
// is removed, and the packet is dropped.
-static void doreadUDP(int fd) {
- PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
- socklen_t addrlen =
- udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
- memset( &todo->src, 0, sizeof( todo->src ) );
- todo->fd = fd;
- todo->len = recvfrom(
- fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
- if ( todo->len == -1) {
- perror( "Receiving UDP" );
- exit( 1 );
- }
+static void *doreadUDP(void *data) {
+ int fd = ((ReaderData *) data)->fd;
+ while ( 1 ) {
+ PacketItem *todo = (PacketItem *) Queue_getItem( &todolist.free );
+ socklen_t addrlen =
+ udp6? sizeof( todo->src.in6 ) : sizeof( todo->src.in4 );
+ memset( &todo->src, 0, sizeof( todo->src ) );
+ todo->fd = fd;
+ todo->len = recvfrom(
+ fd, todo->buffer, BUFSIZE, 0, &todo->src.in, &addrlen );
+ if ( todo->len == -1) {
+ perror( "Receiving UDP" );
+ exit( 1 );
+ }
#ifdef GPROF
- if ( todo->len == 17 &&
- memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
- exit( 0 );
- }
-#endif
- Queue_addItem( &todolist.full, (QueueItem*) todo );
-}
-
-// Handle packet received on the tap/stdio channel
-static void received_tap(unsigned char *buf, int len) {
- static struct Remote *tap_remote = 0;
- if ( tap_remote == 0 ) {
- Remote_LOCK;
- if ( tap_remote == 0 ) {
- tap_remote = add_remote( 0, 0 );
+ if ( todo->len == 17 &&
+ memcmp( todo->buffer, "STOPSTOPSTOPSTOP", 16 ) == 0 ) {
+ exit( 0 );
}
- Remote_UNLOCK;
+#endif
+ Queue_addItem( &todolist.full, (QueueItem*) todo );
}
- PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
- memcpy( (void*)todo->buffer, (void*)buf, len );
- memcpy( (void*)&todo->src,
- (void*)&tap_remote->uaddr,
- sizeof( struct SockAddr ) );
- todo->fd = 0;
- todo->len = len;
- Queue_addItem( &todolist.full, (QueueItem*) todo );
+ return 0;
}
// Read up to n bytes from the given file descriptor into the buffer
Remote_UNLOCK;
}
-// The threadcontrol program for issuing heartbeat packets.
-// Regular heartbeat
-static void *hearbeater_thread(void *data) {
- (void) data;
- for ( ;; ) {
- sleep( HEARTBEAT );
- heartbeat( udp_fd );
- }
- return 0;
-}
-
// Tell how to use this program and exit with failure.
static void usage(void) {
fprintf( stderr, "Packet tunneling over UDP, multiple channels, " );
return fd;
}
-static void doreadTap() {
- if ( stdio ) {
- if ( input.end == 0 ) {
+// Handle packet received on the tap/stdio channel
+static void initialize_tap() {
+ // Ensure there is a Remote for this
+ static struct Remote *tap_remote = 0;
+ if ( tap_remote == 0 ) {
+ Remote_LOCK;
+ if ( tap_remote == 0 ) {
+ tap_remote = add_remote( 0, 0 );
+ }
+ Remote_UNLOCK;
+ }
+}
+
+// Thread to handle tap/stdio input
+static void *doreadTap(void *data) {
+ int fd = ((ReaderData*) data)->fd;
+ unsigned int end = 0; // Packet size
+ unsigned int cur = 0; // Amount read so far
+ size_t e;
+ PacketItem *todo = (PacketItem*) Queue_getItem( &todolist.free );
+ while ( 1 ) {
+ if ( stdio ) {
uint16_t plength;
int n = read_into( 0, (unsigned char *) &plength,
- sizeof( plength ), 0 );
+ sizeof( plength ), 0 );
if ( n == 0 ) {
// Tap/stdio closed => exit silently
exit( 0 );
}
- input.end = ntohs( plength );
- input.cur = 0;
- }
- size_t e = input.end - input.cur;
- unsigned char *p = input.buffer + input.cur;
- if ( input.end > BUFSIZE ) {
- // Oversize packets should be read and discarded
- if ( e > BUFSIZE ) {
- e = BUFSIZE;
- }
- p = input.buffer;
- }
- input.cur += read_into( 0, p, e, 1 );
- } else {
- input.end = doread( tap_fd, input.buffer, BUFSIZE );
- input.cur = input.end;
- }
- if ( input.end == input.cur ) {
- VERBOSE3OUT( "TAP/stdio input %d bytes\n", input.end );
- if ( input.end <= BUFSIZE ) {
- received_tap( input.buffer, input.end );
- }
- input.end = 0; // Ready for next packet
- }
- // End handling tap
-}
-
-// MAXFD is Finalized before multi-threading, and then remains constant.
-static int MAXFD;
-
-// This is the main packet handling loop
-static int packet_loop() {
- // A packet buffer for receiving UDP
- unsigned char buffer[ BUFSIZE ];
- int n;
- //time_t next_heartbeat = time(0);
- int cycle = 0; // which channel to check first
- while( 1 ) {
- fd_set rd_set;
- FD_ZERO( &rd_set );
- if ( mcast.fd ) {
- FD_SET( mcast.fd, &rd_set );
- }
- FD_SET( udp_fd, &rd_set );
- if ( udp6 ) {
- FD_SET( udp_fd, &rd_set );
- }
- if ( tap ) { // tap/stdio
- FD_SET( tap_fd, &rd_set );
- }
- n = select( MAXFD, &rd_set, NULL, NULL, NULL );
- VERBOSE3OUT( "select got %d\n", n );
- if ( n < 0 ) {
- if ( errno == EINTR ) {
- continue;
+ end = ntohs( plength );
+ cur = 0;
+ while ( ( e = ( end - cur ) ) != 0 ) {
+ unsigned char *p = todo->buffer + cur;
+ if ( end > BUFSIZE ) {
+ // Oversize packets should be read and discarded
+ if ( e > BUFSIZE ) {
+ e = BUFSIZE;
+ }
+ p = todo->buffer;
+ }
+ cur += read_into( 0, p, e, 1 );
}
- perror("select");
- exit(1);
+ } else {
+ end = doread( fd, todo->buffer, BUFSIZE );
+ cur = end;
}
- if ( n == 0 ) {
- continue;
- }
- // Process input with alternating priority across channels
- for ( ;; cycle++ ) {
- if ( cycle >= 3 ) {
- cycle = 0;
- }
- if ( cycle == 0 && FD_ISSET( udp_fd, &rd_set ) ) {
- // Check and process UDP socket
- doreadUDP( udp_fd ) ;
- cycle = 1;
- break;
- }
- if ( cycle == 1 && FD_ISSET( mcast.fd, &rd_set ) ) {
- // Check and process multicast socket
- doreadUDP( mcast.fd ) ;
- cycle = 2;
- break;
- }
- if ( cycle == 2 && FD_ISSET( tap_fd, &rd_set ) ) {
- // Check and process tap/stdio socket
- doreadTap( tap_fd, buffer );
- cycle = 0;
- break;
- }
+ VERBOSE3OUT( "TAP/stdio input %d bytes\n", end );
+ if ( end <= BUFSIZE ) {
+ todo->fd = 0;
+ todo->len = end;
+ Queue_addItem( &todolist.full, (QueueItem*) todo );
+ todo = (PacketItem*) Queue_getItem( &todolist.free );
}
+ // End handling tap
}
return 0;
}
// remote = [ipv6(/maskwidth)](:port)(=key)
// ip = ipv4 | [ipv6]
int main(int argc, char *argv[]) {
+ pthread_t thread; // Temporary thread id
int port, i;
progname = (unsigned char *) argv[0];
///// Parse command line arguments
}
todolist_initialize( buffers_count, threads_count );
- MAXFD = 0;
// Set up the tap/stdio channel
if ( tap ) {
// set up the nominated tap
exit(1);
}
VERBOSEOUT( "Using tap %s at %d\n", tap, tap_fd );
- MAXFD = tap_fd;
stdio = 0;
// pretend a zero packet on the tap, for initializing.
- received_tap( 0, 0 );
+ initialize_tap();
} else {
// set up for stdin/stdout local traffix
setbuf( stdout, NULL ); // No buffering on stdout.
tap_fd = 0; // actually stdin
- MAXFD = 0;
stdio = 1;
}
} else {
VERBOSEOUT( "Using multicast %s:%d at %d\n",
inet_nmtoa( x, 4 ), ntohs( mcast.sock.in4.sin_port ),
mcast.fd );
- if ( mcast.fd > MAXFD ) {
- MAXFD = mcast.fd;
- }
}
// Set up the unicast UPD channel (all interfaces)
if ( udp6 == 0 ) {
}
VERBOSEOUT( "Using ipv6 UDP at %d\n", udp_fd );
}
- if ( udp_fd > MAXFD ) {
- MAXFD = udp_fd;
- }
- MAXFD++ ;
// If not using stdio for local traffic, then stdin and stdout are
// closed here, so as to avoid that any other traffic channel gets
// 0 or 1 as its file descriptor. Note: stderr (2) is left open.
close( 0 );
close( 1 );
}
- VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d max=%d\n",
- tap_fd, mcast.fd, udp_fd, MAXFD );
-
- // Start heartbeater thread
- pthread_t thread; // Temporary thread id -- not used
- pthread_create( &thread, 0, hearbeater_thread, 0 );
+ VERBOSE2OUT( "Socket loop tap=%d mcast=%d udp=%d\n",
+ tap_fd, mcast.fd, udp_fd );
// Handle packets
- return packet_loop();
+ ReaderData udp_reader = { .fd = udp_fd };
+ pthread_create( &thread, 0, doreadUDP, &udp_reader );
+
+ if ( mcast.group.imr_multiaddr.s_addr ) {
+ ReaderData mcast_reader = { .fd = mcast.fd };
+ pthread_create( &thread, 0, doreadUDP, &mcast_reader );
+ }
+
+ if ( tap_fd || stdio ) {
+ ReaderData tap_reader = { .fd = tap_fd };
+ pthread_create( &thread, 0, doreadTap, &tap_reader );
+ }
+
+ // Start heartbeating to uplinks
+ for ( ;; ) {
+ sleep( HEARTBEAT );
+ heartbeat( udp_fd );
+ }
+ return 0;
}