As announced in A.L.F.R.E.D. issue #202 (https://www.open-mesh.org/issues/202), I gave implementing server-to-server communication via TCP a spin.
The result is this - rather large - patch. I'd be glad for comments on the implementation. Also, I might split this into more commits, but I think I need ideas on how to properly separate the various parts - they mostly make sense to me as a unit only.
A rough overview:
alfred.h:
* struct tcp_connection is new. It stores the socket fd, a pointer to the data buffer, a counter for data already read from the socket (TCP is a stream, so we can't rely on self-contained datagrams), and list management.
* struct interface is extended by a socket fd for the TCP socket and a list of "tcp_connection"s.
* struct globals is extended by a "requestproto" field that stores the desired operation mode.
main.c:
* TCP operation is for now triggered by a command line flag "-t".
netsock.c:
* For each interface, a bound and listening TCP socket is created and torn down along with the UDP sockets.
* A list of tcp_connection sockets is kept, managed and cleared alongside.
* The sockets are incorporated into the fd_set management for select().
recv.c:
* process_alfred_request() gets another argument for a socket fd. It is just handed over to push_data() so that the reply is sent over the same socket the request came from.
* recv_alfred_stream() is new and is the counterpart of recv_alfred_packet(). It manages the data buffer and acts on a packet once it has been received completely. ALFRED_REQUEST, ALFRED_PUSH_DATA and ALFRED_STATUS_TXEND packets are handled for now.
send.c:
* connect_tcp() sets up a new connection to a (TCP) server.
* push_data() gets a new argument for a socket fd (cf. process_alfred_request() in recv.c). If that argument is >= 0, the function operates on the socket rather than sending a UDP datagram.
* sync_data() acts on the requestproto configuration in the globals struct and does the sync via TCP if configured to do so.
* send_alfred_stream() uses connect_tcp() to create a new connection, sends a packet analogous to send_alfred_packet(), and then registers the socket in the list of "tcp_connection"s of the interface it is currently operating on. This way, the socket can receive a reply and will be handled as long as it is not closed on the other end.
server.c:
* check_if_socket() tears down the TCP server and client sockets along with the UDP sockets.
unix_sock.c:
* Depending on the globals' requestproto setting, a REQUEST is sent via TCP rather than UDP.
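For readers who want the fd_set bookkeeping from the netsock.c part spelled out, here is a rough, standalone sketch. It is not taken from the patch: struct conn and prepare_select() are made-up names, and a plain singly linked list stands in for alfred's list_head.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/select.h>

/* Hypothetical stand-in for the patch's struct tcp_connection. */
struct conn {
	int fd;
	struct conn *next;
};

/* Add the listening socket and every connection socket to fds.
 * Returns the highest descriptor seen so far; the caller passes
 * that value plus one as the first argument of select().
 */
static int prepare_select(int listen_fd, const struct conn *conns,
			  fd_set *fds, int maxsock)
{
	const struct conn *c;

	assert(fds != NULL);

	if (listen_fd >= 0) {
		FD_SET(listen_fd, fds);
		if (maxsock < listen_fd)
			maxsock = listen_fd;
	}

	for (c = conns; c; c = c->next) {
		FD_SET(c->fd, fds);
		if (maxsock < c->fd)
			maxsock = c->fd;
	}

	return maxsock;
}
```

The same walk over the connection list is needed again after select() returns, this time testing each descriptor with FD_ISSET().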
So far, testing has only been done in an artificial virtual setup. I have some hopes to get "mission clearance" for giving it a spin in our local "Freifunk" setup. Even then, TCP will most certainly only be used on a handful of gateways, since on the majority of nodes reading values from A.L.F.R.E.D. is not really needed (they are mostly push-only).
I'd be glad to hear some feedback.
-hwh
Hans-Werner Hilse (1):
  alfred: implement TCP support for server-to-server communication

 alfred.h    |  24 +++++++++-
 main.c      |   8 +++-
 netsock.c   | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 recv.c      |  88 ++++++++++++++++++++++++++++++++++--
 send.c      | 119 +++++++++++++++++++++++++++++++++++++++++++-----
 server.c    |  11 +++++
 unix_sock.c |   8 ++++
 7 files changed, 389 insertions(+), 17 deletions(-)
This implements support for using TCP instead of UDP for server-to-server communication for slave-to-master requests and for master-to-master synchronization.
Both these scenarios potentially exchange large chunks of data. Alfred distributes this data over multiple UDP packets, and it sensibly uses 64 kByte as the maximum UDP packet size, since the data stored might well exceed the small sizes of typical network layer packets (like 1280 or 1500 bytes).
Large UDP packets will then get fragmented by the kernel on the IPv6 layer. On busy links and across very heterogeneous network architectures, this leads to the loss of fragments (some or all of them). That makes proper communication a matter of luck for some users.
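To put rough numbers on the fragmentation problem, here is a small back-of-the-envelope sketch. It is only an illustration: the function names are made up, it assumes a 40-byte IPv6 header plus the 8-byte fragment header and no other extension headers, and it assumes that fragments are lost independently of each other, which real links do not guarantee.

```c
#include <assert.h>
#include <stddef.h>

#define IPV6_HDR_LEN      40
#define IPV6_FRAG_HDR_LEN  8

/* Number of IPv6 packets needed to carry len bytes (UDP header plus
 * payload) over a link with the given MTU.
 */
static size_t ipv6_fragment_count(size_t len, size_t mtu)
{
	size_t per_frag;

	assert(mtu >= 1280);	/* minimum IPv6 link MTU */

	/* fits into a single unfragmented packet */
	if (len + IPV6_HDR_LEN <= mtu)
		return 1;

	/* every fragment except the last carries a multiple of 8 bytes */
	per_frag = (mtu - IPV6_HDR_LEN - IPV6_FRAG_HDR_LEN) & ~(size_t)7;

	return (len + per_frag - 1) / per_frag;
}

/* Chance that a datagram arrives complete if each of its fragments is
 * lost independently with probability loss (an assumption).
 */
static double delivery_probability(size_t fragments, double loss)
{
	double p = 1.0;
	size_t i;

	for (i = 0; i < fragments; i++)
		p *= 1.0 - loss;

	return p;
}
```

Under these assumptions, a full 65535-byte datagram needs 54 fragments at an MTU of 1280, and with 1% fragment loss it would arrive complete only roughly 58% of the time.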
In all these cases, reliable communication is preferred. Instead of implementing our own handling for this on top of UDP, using TCP will provide everything that's needed.
Implementation overview:
The interface struct is extended by a list of TCP sockets. Sockets in this list will get cleaned up when the interface goes away. The sockets in this list will be monitored for incoming data (or for them to get closed on the other side). Incoming data is handled more or less the same way as incoming UDP data is, with the added twist that packet data has to be accumulated, since it does not arrive in self-contained datagrams as it does with UDP. The packet data is stored alongside the socket information. When a full packet is received, it is handled just like a packet received as a UDP datagram.
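The accumulation step can be sketched in isolation roughly like this. This is not the patch's recv_alfred_stream(): the bytes are fed from a buffer instead of a socket so the logic can be tried out on its own, struct tlv_hdr is an assumed stand-in for struct alfred_tlv (type, version, 16-bit payload length in network byte order), and tlv_reader and its functions are made-up names. The counter is a size_t here so that header length plus a 65535-byte payload cannot wrap.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>

/* Assumed header layout, mirroring struct alfred_tlv. */
struct tlv_hdr {
	uint8_t type;
	uint8_t version;
	uint16_t length;	/* payload length, network byte order */
};

struct tlv_reader {
	uint8_t *buf;	/* header followed by payload */
	size_t have;	/* bytes accumulated so far */
};

/* Bytes still missing: first the header, then the payload. */
static size_t tlv_reader_missing(const struct tlv_reader *r)
{
	struct tlv_hdr hdr;

	if (r->have < sizeof(hdr))
		return sizeof(hdr) - r->have;

	memcpy(&hdr, r->buf, sizeof(hdr));
	return sizeof(hdr) + ntohs(hdr.length) - r->have;
}

/* Consume at most len bytes of stream data and never read past the
 * end of the current packet. *used is set to the number of bytes
 * taken. Returns 1 if the packet is complete, 0 if more data is
 * needed, -1 if out of memory.
 */
static int tlv_reader_feed(struct tlv_reader *r, const uint8_t *data,
			   size_t len, size_t *used)
{
	assert(r != NULL && used != NULL);

	*used = 0;
	for (;;) {
		size_t want = tlv_reader_missing(r);
		size_t n;
		uint8_t *mem;

		if (want == 0)
			return 1;
		if (len == 0)
			return 0;

		n = len < want ? len : want;
		mem = realloc(r->buf, r->have + n);
		if (!mem)
			return -1;

		memcpy(mem + r->have, data, n);
		r->buf = mem;
		r->have += n;
		data += n;
		len -= n;
		*used += n;
	}
}
```

With a socket, the caller would recv() at most tlv_reader_missing() bytes per call, so that data belonging to the next packet stays in the kernel buffer.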
A REQUEST is answered on the same socket as it was received on, since TCP allows bidirectional communication.
Signed-off-by: Hans-Werner Hilse <hwhilse@gmail.com>
---
 alfred.h    |  24 +++++++++-
 main.c      |   8 +++-
 netsock.c   | 148 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 recv.c      |  88 ++++++++++++++++++++++++++++++++++--
 send.c      | 119 +++++++++++++++++++++++++++++++++++++++++++-----
 server.c    |  11 +++++
 unix_sock.c |   8 ++++
 7 files changed, 389 insertions(+), 17 deletions(-)
diff --git a/alfred.h b/alfred.h
index 7e5db16..121c445 100644
--- a/alfred.h
+++ b/alfred.h
@@ -89,6 +89,11 @@ enum opmode {
 	OPMODE_MASTER,
 };
 
+enum requestproto {
+	REQPROTO_UDP,
+	REQPROTO_TCP
+};
+
 enum clientmode {
 	CLIENT_NONE,
 	CLIENT_REQUEST_DATA,
@@ -97,6 +102,15 @@ enum clientmode {
 	CLIENT_CHANGE_INTERFACE,
 };
 
+struct tcp_connection {
+	int netsock;
+	struct in6_addr address;
+	struct alfred_tlv *packet;
+	uint16_t read;
+
+	struct list_head list;
+};
+
 struct interface {
 	struct ether_addr hwaddr;
 	struct in6_addr address;
@@ -104,6 +118,9 @@ struct interface {
 	char *interface;
 	int netsock;
 	int netsock_mcast;
+	int netsock_tcp;
+
+	struct list_head tcp_connections;
 
 	struct hashtable_t *server_hash;
 
@@ -117,6 +134,7 @@ struct globals { struct server *best_server; /* NULL if we are a server ourselves */ const char *mesh_iface; enum opmode opmode; + enum requestproto requestproto; enum clientmode clientmode; int clientmode_arg; int clientmode_version; @@ -155,6 +173,8 @@ int alfred_client_change_interface(struct globals *globals); /* recv.c */ int recv_alfred_packet(struct globals *globals, struct interface *interface, int recv_sock); +int recv_alfred_stream(struct globals *globals, + struct tcp_connection *tcp_connection); struct transaction_head * transaction_add(struct globals *globals, struct ether_addr mac, uint16_t id); struct transaction_head * @@ -165,12 +185,14 @@ struct transaction_head *transaction_clean(struct globals *globals, /* send.c */ int push_data(struct globals *globals, struct interface *interface, struct in6_addr *destination, enum data_source max_source_level, - int type_filter, uint16_t tx_id); + int type_filter, uint16_t tx_id, int socket); int announce_master(struct globals *globals); int push_local_data(struct globals *globals); int sync_data(struct globals *globals); ssize_t send_alfred_packet(struct interface *interface, const struct in6_addr *dest, void *buf, int length); +ssize_t send_alfred_stream(struct interface *interface, + const struct in6_addr *dest, void *buf, int length); /* unix_sock.c */ int unix_sock_read(struct globals *globals); int unix_sock_open_daemon(struct globals *globals); diff --git a/main.c b/main.c index 9610398..667e86b 100644 --- a/main.c +++ b/main.c @@ -58,6 +58,7 @@ static void alfred_usage(void) printf(" -m, --master start up the daemon in master mode, which\n"); printf(" accepts data from slaves and syncs it with\n"); printf(" other masters\n"); + printf(" -t, --tcp use TCP protocol for server-to-server communication\n"); printf("\n"); printf(" -u, --unix-path [path] path to unix socket used for client-server\n"); printf(" communication (default: ""ALFRED_SOCK_PATH_DEFAULT"")\n"); @@ -149,6 +150,7 @@ 
static struct globals *alfred_init(int argc, char *argv[]) {"request", required_argument, NULL, 'r'}, {"interface", required_argument, NULL, 'i'}, {"master", no_argument, NULL, 'm'}, + {"tcp", no_argument, NULL, 't'}, {"help", no_argument, NULL, 'h'}, {"req-version", required_argument, NULL, 'V'}, {"modeswitch", required_argument, NULL, 'M'}, @@ -170,6 +172,7 @@ static struct globals *alfred_init(int argc, char *argv[]) INIT_LIST_HEAD(&globals->interfaces); globals->change_interface = NULL; globals->opmode = OPMODE_SLAVE; + globals->requestproto = REQPROTO_UDP; globals->clientmode = CLIENT_NONE; globals->best_server = NULL; globals->clientmode_version = 0; @@ -182,7 +185,7 @@ static struct globals *alfred_init(int argc, char *argv[])
time_random_seed();
- while ((opt = getopt_long(argc, argv, "ms:r:hi:b:vV:M:I:u:dc:", long_options, + while ((opt = getopt_long(argc, argv, "mts:r:hi:b:vV:M:I:u:dc:", long_options, &opt_ind)) != -1) { switch (opt) { case 'r': @@ -207,6 +210,9 @@ static struct globals *alfred_init(int argc, char *argv[]) case 'm': globals->opmode = OPMODE_MASTER; break; + case 't': + globals->requestproto = REQPROTO_TCP; + break; case 'i': netsock_set_interfaces(globals, optarg); break; diff --git a/netsock.c b/netsock.c index d72541e..eb47bb9 100644 --- a/netsock.c +++ b/netsock.c @@ -80,12 +80,23 @@ static int server_choose(void *d1, int size) void netsock_close_all(struct globals *globals) { struct interface *interface, *is; + struct tcp_connection *tcp_connection, *tc;
list_for_each_entry_safe(interface, is, &globals->interfaces, list) { + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + free(tcp_connection->packet); + free(tcp_connection); + } if (interface->netsock >= 0) close(interface->netsock); if (interface->netsock_mcast >= 0) close(interface->netsock_mcast); + if (interface->netsock_tcp >= 0) + close(interface->netsock_tcp); list_del(&interface->list); hash_delete(interface->server_hash, free); free(interface->interface); @@ -147,6 +158,7 @@ int netsock_set_interfaces(struct globals *globals, char *interfaces) interface->interface = NULL; interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1; interface->server_hash = NULL;
interface->interface = strdup(token); @@ -165,6 +177,8 @@ int netsock_set_interfaces(struct globals *globals, char *interfaces) return -ENOMEM; }
+ INIT_LIST_HEAD(&interface->tcp_connections); + list_add(&interface->list, &globals->interfaces); }
@@ -214,6 +228,7 @@ static int netsock_open(struct interface *interface) { int sock; int sock_mc; + int sock_tcp; struct sockaddr_in6 sin6, sin6_mc; struct ipv6_mreq mreq; struct ifreq ifr; @@ -221,6 +236,7 @@ static int netsock_open(struct interface *interface)
interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1;
sock = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP); if (sock < 0) { @@ -235,6 +251,14 @@ static int netsock_open(struct interface *interface) return -1; }
+ sock_tcp = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (sock_tcp < 0) { + close(sock); + close(sock_mc); + perror("can't open socket"); + return -1; + } + memset(&ifr, 0, sizeof(ifr)); strncpy(ifr.ifr_name, interface->interface, IFNAMSIZ); ifr.ifr_name[IFNAMSIZ - 1] = '\0'; @@ -286,6 +310,16 @@ static int netsock_open(struct interface *interface) goto err; }
+ if (bind(sock_tcp, (struct sockaddr *)&sin6, sizeof(sin6)) < 0) { + perror("can't bind"); + goto err; + } + + if (listen(sock_tcp, 10) < 0) { + perror("can't listen on tcp socket"); + goto err; + } + if (bind(sock_mc, (struct sockaddr *)&sin6_mc, sizeof(sin6_mc)) < 0) { perror("can't bind"); goto err; @@ -327,11 +361,13 @@ static int netsock_open(struct interface *interface)
interface->netsock = sock; interface->netsock_mcast = sock_mc; + interface->netsock_tcp = sock_tcp;
return 0; err: close(sock); close(sock_mc); + close(sock_tcp); return -1; }
@@ -363,6 +399,7 @@ void netsock_reopen(struct globals *globals) int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock) { struct interface *interface; + struct tcp_connection *tcp_connection;
list_for_each_entry(interface, &globals->interfaces, list) { if (interface->netsock >= 0) { @@ -376,6 +413,19 @@ int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock) if (maxsock < interface->netsock_mcast) maxsock = interface->netsock_mcast; } + + if (interface->netsock_tcp >= 0) { + FD_SET(interface->netsock_tcp, fds); + if (maxsock < interface->netsock_tcp) + maxsock = interface->netsock_tcp; + } + + list_for_each_entry(tcp_connection, + &interface->tcp_connections, list) { + FD_SET(tcp_connection->netsock, fds); + if (maxsock < tcp_connection->netsock) + maxsock = tcp_connection->netsock; + } }
return maxsock; @@ -384,12 +434,26 @@ int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock) void netsock_check_error(struct globals *globals, fd_set *errfds) { struct interface *interface; + struct tcp_connection *tcp_connection, *tc;
list_for_each_entry(interface, &globals->interfaces, list) { + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + if (FD_ISSET(tcp_connection->netsock, errfds)) { + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + free(tcp_connection->packet); + free(tcp_connection); + } + } + if ((interface->netsock < 0 || !FD_ISSET(interface->netsock, errfds)) && (interface->netsock_mcast < 0 || - !FD_ISSET(interface->netsock_mcast, errfds))) + !FD_ISSET(interface->netsock_mcast, errfds)) && + (interface->netsock_tcp < 0 || + !FD_ISSET(interface->netsock_tcp, errfds))) continue;
fprintf(stderr, "Error on netsock detected\n"); @@ -400,15 +464,23 @@ void netsock_check_error(struct globals *globals, fd_set *errfds) if (interface->netsock_mcast >= 0) close(interface->netsock_mcast);
+ if (interface->netsock_tcp >= 0) + close(interface->netsock_tcp); + interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1; } }
int netsock_receive_packet(struct globals *globals, fd_set *fds) { struct interface *interface; + struct tcp_connection *tcp_connection, *tc; + struct sockaddr_in6 sin6; + socklen_t sin6_len = sizeof(sin6); int recvs = 0; + int sock_client;
list_for_each_entry(interface, &globals->interfaces, list) { if (interface->netsock >= 0 && @@ -424,6 +496,80 @@ int netsock_receive_packet(struct globals *globals, fd_set *fds) interface->netsock_mcast); recvs++; } + + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + if (FD_ISSET(tcp_connection->netsock, fds)) { + if (recv_alfred_stream(globals, + tcp_connection)) { + /* upon error, close and free TCP + * connection + */ + shutdown(tcp_connection->netsock, + SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + free(tcp_connection->packet); + free(tcp_connection); + } + recvs++; + } + } + + if (interface->netsock_tcp >= 0 && + FD_ISSET(interface->netsock_tcp, fds)) { + sock_client = accept(interface->netsock_tcp, + (struct sockaddr *)&sin6, + &sin6_len); + if (sock_client < 0) { + perror("can't accept TCP connection"); + goto tcp_done; + } + + /* drop packets not sent over link-local ipv6 */ + if (!is_ipv6_eui64(&sin6.sin6_addr)) { + fprintf(stderr, "not handling TCP connection " + "from non-link-local address" + "\n"); + goto tcp_drop; + } + + /* drop packets from ourselves */ + if (netsock_own_address(globals, &sin6.sin6_addr)) { + fprintf(stderr, "not handling TCP connection " + "from ourselves\n"); + goto tcp_drop; + } + + tcp_connection = malloc(sizeof(*tcp_connection)); + if (!tcp_connection) { + fprintf(stderr, "out of memory, cannot handle " + "TCP client connection\n"); + goto tcp_drop; + } + + tcp_connection->packet = + calloc(1, sizeof(struct alfred_tlv)); + if (!tcp_connection->packet) { + fprintf(stderr, "out of memory, cannot handle " + "TCP client connection\n"); + free(tcp_connection); + goto tcp_drop; + } + + tcp_connection->read = 0; + tcp_connection->netsock = sock_client; + memcpy(&tcp_connection->address, &sin6.sin6_addr, + sizeof(tcp_connection->address)); + list_add(&tcp_connection->list, + &interface->tcp_connections); + goto tcp_done; +tcp_drop: + shutdown(sock_client, 
SHUT_RDWR); + close(sock_client); +tcp_done: + recvs++; + } }
return recvs; diff --git a/recv.c b/recv.c index 98539cb..04ee4ce 100644 --- a/recv.c +++ b/recv.c @@ -302,7 +302,8 @@ process_alfred_announce_master(struct globals *globals, static int process_alfred_request(struct globals *globals, struct interface *interface, struct in6_addr *source, - struct alfred_request_v0 *request) + struct alfred_request_v0 *request, + int socket) { int len;
@@ -315,7 +316,7 @@ static int process_alfred_request(struct globals *globals, return -1;
push_data(globals, interface, source, SOURCE_SYNCED, - request->requested_type, request->tx_id); + request->requested_type, request->tx_id, socket);
return 0; } @@ -432,7 +433,7 @@ int recv_alfred_packet(struct globals *globals, struct interface *interface, break; case ALFRED_REQUEST: process_alfred_request(globals, interface, &source.sin6_addr, - (struct alfred_request_v0 *)packet); + (struct alfred_request_v0 *)packet, -1); break; case ALFRED_STATUS_TXEND: process_alfred_status_txend(globals, &source.sin6_addr, @@ -445,3 +446,84 @@ int recv_alfred_packet(struct globals *globals, struct interface *interface,
return 0; } + +int recv_alfred_stream(struct globals *globals, struct tcp_connection *tcp_connection) +{ + size_t to_read; + int res; + const size_t header_len = sizeof(struct alfred_tlv); + void *mem; + + /* determine how many bytes we're still expecting */ + if (tcp_connection->read < header_len) { + /* TLV header still incomplete */ + to_read = header_len - tcp_connection->read; + } else { + /* payload still incomplete */ + to_read = header_len + + ntohs(tcp_connection->packet->length) + - tcp_connection->read; + } + + res = recv(tcp_connection->netsock, + (uint8_t*)tcp_connection->packet + tcp_connection->read, + to_read, MSG_DONTWAIT); + + if (res < 0) { + return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1; + } else if (res == 0) { + /* end of stream */ + return -1; + } + + tcp_connection->read += res; + + if (tcp_connection->read == header_len + && tcp_connection->packet->length > 0) { + /* there's payload, so adjust buffer size */ + mem = realloc(tcp_connection->packet, + header_len + ntohs(tcp_connection->packet->length)); + if (!mem) { + fprintf(stderr, "out of memory when reading from TCP " + "client\n"); + return -1; + } + tcp_connection->packet = (struct alfred_tlv *)mem; + } + + if (tcp_connection->read == + header_len + ntohs(tcp_connection->packet->length)) { + /* packet is complete */ + switch(tcp_connection->packet->type) { + case ALFRED_REQUEST: + process_alfred_request(globals, NULL, + &tcp_connection->address, + (struct alfred_request_v0 *)tcp_connection->packet, + tcp_connection->netsock); + break; + case ALFRED_PUSH_DATA: + process_alfred_push_data(globals, &tcp_connection->address, + (struct alfred_push_data_v0 *)tcp_connection->packet); + + /* do not close connection, but expect more packets */ + mem = realloc(tcp_connection->packet, header_len); + if (!mem) { + fprintf(stderr, "out of memory when reading " + "from TCP client\n"); + return -1; + } + memset(mem, 0, header_len); + tcp_connection->packet = (struct alfred_tlv *)mem; + 
tcp_connection->read = 0; + return 0; + case ALFRED_STATUS_TXEND: + process_alfred_status_txend(globals, &tcp_connection->address, + (struct alfred_status_v0 *)tcp_connection->packet); + break; + } + /* close connection */ + return -1; + } + + return 0; +} diff --git a/send.c b/send.c index 70f694c..3d7dced 100644 --- a/send.c +++ b/send.c @@ -27,11 +27,36 @@ #include <errno.h> #include <stdio.h> #include <unistd.h> +#include <stdlib.h> #include "alfred.h" #include "hash.h" #include "packet.h" #include "list.h"
+int connect_tcp(struct interface *interface, const struct in6_addr *dest) +{ + struct sockaddr_in6 dest_addr; + int sock; + + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin6_family = AF_INET6; + dest_addr.sin6_port = htons(ALFRED_PORT); + dest_addr.sin6_scope_id = interface->scope_id; + memcpy(&dest_addr.sin6_addr, dest, sizeof(*dest)); + + sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) + return -1; + + if (connect(sock, (struct sockaddr *)&dest_addr, + sizeof(struct sockaddr_in6)) < 0) { + close(sock); + return -1; + } + + return sock; +} + int announce_master(struct globals *globals) { struct alfred_announce_master_v0 announcement; @@ -51,7 +76,7 @@ int announce_master(struct globals *globals)
int push_data(struct globals *globals, struct interface *interface, struct in6_addr *destination, enum data_source max_source_level, - int type_filter, uint16_t tx_id) + int type_filter, uint16_t tx_id, int socket) { struct hash_it_t *hashit = NULL; uint8_t buf[MAX_PAYLOAD]; @@ -90,8 +115,14 @@ int push_data(struct globals *globals, struct interface *interface, tlv_length += sizeof(*push) - sizeof(push->header); push->header.length = htons(tlv_length); push->tx.seqno = htons(seqno++); - send_alfred_packet(interface, destination, push, - sizeof(*push) + total_length); + if (socket < 0) { + send_alfred_packet(interface, destination, push, + sizeof(*push) + + total_length); + } else { + send(socket, push, sizeof(*push) + total_length, + MSG_NOSIGNAL); + } total_length = 0; }
@@ -114,8 +145,13 @@ int push_data(struct globals *globals, struct interface *interface, tlv_length += sizeof(*push) - sizeof(push->header); push->header.length = htons(tlv_length); push->tx.seqno = htons(seqno++); - send_alfred_packet(interface, destination, push, - sizeof(*push) + total_length); + if (socket < 0) { + send_alfred_packet(interface, destination, push, + sizeof(*push) + total_length); + } else { + send(socket, push, sizeof(*push) + total_length, + MSG_NOSIGNAL); + } }
/* send transaction txend packet */ @@ -128,8 +164,13 @@ int push_data(struct globals *globals, struct interface *interface, status_end.tx.id = tx_id; status_end.tx.seqno = htons(seqno);
- send_alfred_packet(interface, destination, &status_end, - sizeof(status_end)); + if (socket < 0) { + send_alfred_packet(interface, destination, &status_end, + sizeof(status_end)); + } else { + send(socket, &status_end, sizeof(status_end), + MSG_NOSIGNAL); + } }
return 0; @@ -139,6 +180,7 @@ int sync_data(struct globals *globals) { struct hash_it_t *hashit = NULL; struct interface *interface; + int sock;
/* send local data and data from our clients to (all) other servers */ list_for_each_entry(interface, &globals->interfaces, list) { @@ -146,9 +188,20 @@ int sync_data(struct globals *globals) hashit))) { struct server *server = hashit->bucket->data;
- push_data(globals, interface, &server->address, - SOURCE_FIRST_HAND, NO_FILTER, - get_random_id()); + if (globals->requestproto == REQPROTO_TCP) { + sock = connect_tcp(interface, &server->address); + if (sock < 0) + continue; + push_data(globals, interface, &server->address, + SOURCE_FIRST_HAND, NO_FILTER, + get_random_id(), sock); + shutdown(sock, SHUT_RDWR); + close(sock); + } else { + push_data(globals, interface, &server->address, + SOURCE_FIRST_HAND, NO_FILTER, + get_random_id(), -1); + } } } return 0; @@ -164,7 +217,7 @@ int push_local_data(struct globals *globals)
list_for_each_entry(interface, &globals->interfaces, list) { push_data(globals, interface, &globals->best_server->address, - SOURCE_LOCAL, NO_FILTER, get_random_id()); + SOURCE_LOCAL, NO_FILTER, get_random_id(), -1); }
return 0; @@ -198,3 +251,47 @@ ssize_t send_alfred_packet(struct interface *interface,
return ret; } + +ssize_t send_alfred_stream(struct interface *interface, + const struct in6_addr *dest, void *buf, int length) +{ + ssize_t ret; + int sock; + struct tcp_connection *tcp_connection; + + sock = connect_tcp(interface, dest); + if (sock < 0) + return -1; + + ret = send(sock, buf, length, MSG_NOSIGNAL); + if (ret < 0) { + shutdown(sock, SHUT_RDWR); + close(sock); + return -1; + } + + /* close socket for writing */ + shutdown(sock, SHUT_WR); + + /* put socket on the interface's tcp socket list for reading */ + tcp_connection = malloc(sizeof(*tcp_connection)); + if (!tcp_connection) { + goto tcp_drop; + } + tcp_connection->packet = calloc(1, sizeof(struct alfred_tlv)); + if (!tcp_connection->packet) { + free(tcp_connection); + goto tcp_drop; + } + tcp_connection->read = 0; + tcp_connection->netsock = sock; + memcpy(&tcp_connection->address, dest, sizeof(tcp_connection->address)); + list_add(&tcp_connection->list, &interface->tcp_connections); + + return 0; + +tcp_drop: + shutdown(sock, SHUT_RDWR); + close(sock); + return -1; +} diff --git a/server.c b/server.c index 47aee4f..457b143 100644 --- a/server.c +++ b/server.c @@ -222,6 +222,7 @@ static void check_if_socket(struct interface *interface) { int sock; struct ifreq ifr; + struct tcp_connection *tcp_connection, *tc;
if (interface->netsock < 0) return; @@ -261,10 +262,20 @@ static void check_if_socket(struct interface *interface) return;
close: + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + free(tcp_connection->packet); + free(tcp_connection); + } close(interface->netsock); close(interface->netsock_mcast); + close(interface->netsock_tcp); interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1; close(sock); }
diff --git a/unix_sock.c b/unix_sock.c index a0ccc13..d0ff9de 100644 --- a/unix_sock.c +++ b/unix_sock.c @@ -251,6 +251,14 @@ static int unix_sock_req_data(struct globals *globals, head->client_socket = client_sock; head->requested_type = request->requested_type;
+ if (globals->requestproto == REQPROTO_TCP) { + if (!send_alfred_stream(interface, + &globals->best_server->address, + request, sizeof(*request))) + return 0; + } + + /* default and fallback case: UDP */ send_alfred_packet(interface, &globals->best_server->address, request, sizeof(*request));
+int connect_tcp(struct interface *interface, const struct in6_addr *dest)
+{
+	struct sockaddr_in6 dest_addr;
+	int sock;
+
+	memset(&dest_addr, 0, sizeof(dest_addr));
+	dest_addr.sin6_family = AF_INET6;
+	dest_addr.sin6_port = htons(ALFRED_PORT);
+	dest_addr.sin6_scope_id = interface->scope_id;
+	memcpy(&dest_addr.sin6_addr, dest, sizeof(*dest));
+
+	sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
+	if (sock < 0)
+		return -1;
+
+	if (connect(sock, (struct sockaddr *)&dest_addr,
+		    sizeof(struct sockaddr_in6)) < 0) {
+		close(sock);
+		return -1;
+	}
+
+	return sock;
+}
Wouldn't this hang for a while and make the alfred server "unresponsive" when the remote is not reachable at that moment?
Kind regards, Sven
Hi,
On 2016-03-21 10:13, Sven Eckelmann wrote:
+int connect_tcp(struct interface *interface, const struct in6_addr *dest)
+{
[...]
+}
Wouldn't this hang for a while and make the alfred server "unresponsive" when the remote is not reachable at that moment?
Yes, it most definitely would, you're right.
If the route I'm following is generally right, I think I'll work on this part (*that* I can make into a different commit). The connect() can be made non-blocking, and I would add the socket and the request to send to a list (possibly using the transaction list that already exists right now).
-hwh
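For illustration, a non-blocking connect along the lines discussed above could look roughly like the sketch below. This is not the code that ended up in any later revision: connect_tcp_start(), wait_writable() and connect_tcp_finish() are hypothetical names, and the port and scope id are passed in explicitly instead of being taken from ALFRED_PORT and struct interface.

```c
#include <assert.h>
#include <arpa/inet.h>
#include <errno.h>
#include <fcntl.h>
#include <netinet/in.h>
#include <stdint.h>
#include <string.h>
#include <sys/select.h>
#include <sys/socket.h>
#include <unistd.h>

/* Start connecting without blocking. Returns the socket or -1 on
 * error. The connection may still be in progress on return.
 */
static int connect_tcp_start(const struct in6_addr *dest, uint16_t port,
			     uint32_t scope_id)
{
	struct sockaddr_in6 dest_addr;
	int sock, flags;

	assert(dest != NULL);

	memset(&dest_addr, 0, sizeof(dest_addr));
	dest_addr.sin6_family = AF_INET6;
	dest_addr.sin6_port = htons(port);
	dest_addr.sin6_scope_id = scope_id;
	memcpy(&dest_addr.sin6_addr, dest, sizeof(*dest));

	sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP);
	if (sock < 0)
		return -1;

	flags = fcntl(sock, F_GETFL, 0);
	if (flags < 0 || fcntl(sock, F_SETFL, flags | O_NONBLOCK) < 0)
		goto err;

	if (connect(sock, (struct sockaddr *)&dest_addr,
		    sizeof(dest_addr)) < 0 && errno != EINPROGRESS)
		goto err;

	return sock;
err:
	close(sock);
	return -1;
}

/* Wait until the socket is writable. Returns 1 if it is, 0 on
 * timeout, -1 on error. A daemon with its own select() loop would
 * put the socket into its write fd_set instead of calling this.
 */
static int wait_writable(int sock, int timeout_ms)
{
	fd_set wfds;
	struct timeval tv;

	FD_ZERO(&wfds);
	FD_SET(sock, &wfds);
	tv.tv_sec = timeout_ms / 1000;
	tv.tv_usec = (timeout_ms % 1000) * 1000;

	return select(sock + 1, NULL, &wfds, NULL, &tv);
}

/* Call once the socket is writable. Returns 0 if the connection is
 * established, -1 otherwise with errno set to the connect error.
 */
static int connect_tcp_finish(int sock)
{
	int err = 0;
	socklen_t len = sizeof(err);

	if (getsockopt(sock, SOL_SOCKET, SO_ERROR, &err, &len) < 0)
		return -1;

	if (err != 0) {
		errno = err;
		return -1;
	}

	return 0;
}
```

The request to send would have to be kept around until the socket becomes writable, which is where a per-connection list entry comes in.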
Hello Hans-Werner,
On Monday 21 March 2016 10:38:55 Hans-Werner Hilse wrote:
Hi,
On 2016-03-21 10:13, Sven Eckelmann wrote:
+int connect_tcp(struct interface *interface, const struct in6_addr *dest)
+{
[...]
+}
Wouldn't this hang for a while and make the alfred server "unresponsive" when the remote is not reachable at that moment?
Yes, it most definitely would, you're right.
If the route I'm following is generally right, I think I'll work on this part (*that* I can make into a different commit). The connect() can be made non-blocking, and I would add the socket and the request to send to a list (possibly using the transaction list that already exists right now).
I think you are on the right track. In the ticket, we discussed whether to use TCP or just decrease the fragment size, but TCP is probably the more reliable option.
I agree that we should definitely use nonblocking operation.
I was also thinking that we might want to use TCP in general when the packet size reaches a certain threshold (e.g. >3kB), and always use UDP for small packets. Then we would also not need the commandline option (unless you think we should leave a way to force a mode). Any thoughts on that?
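As a rough sketch of what such a size-based selection with an optional forced mode might look like (all names and the 3072-byte cut-off are assumptions made for illustration, none of them exist in alfred):

```c
#include <assert.h>
#include <stddef.h>

enum proto { PROTO_UDP, PROTO_TCP };

/* Hypothetical policy setting: pick by size, or force one transport. */
enum proto_policy { POLICY_AUTO, POLICY_FORCE_UDP, POLICY_FORCE_TCP };

/* Assumed cut-off, following the "e.g. >3kB" idea. */
#define TCP_THRESHOLD 3072

static enum proto choose_proto(enum proto_policy policy, size_t payload_len)
{
	switch (policy) {
	case POLICY_FORCE_UDP:
		return PROTO_UDP;
	case POLICY_FORCE_TCP:
		return PROTO_TCP;
	case POLICY_AUTO:
		break;
	}

	assert(policy == POLICY_AUTO);

	/* small payloads stay on UDP, larger ones go over TCP */
	return payload_len > TCP_THRESHOLD ? PROTO_TCP : PROTO_UDP;
}
```

A forced mode keeps the transport predictable for setups that filter or prioritise by protocol.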
I don't think you have to split your patch further; at least I don't see a good way to do that right now. I didn't do a deep review yet, but I can do one in the next iteration with the nonblocking stuff implemented. :)
Thanks a lot for taking care of this! Simon
I was also thinking that we might want to use TCP in general when the packet size reaches a certain threshold (e.g. >3kB), and always use UDP for small packets. Then we would also not need the commandline option (unless you think we should leave a way to force a mode). Any thoughts on that?
Hi Simon
Either/or behaviour like this can cause hard-to-understand issues. For example: I opened a hole in the firewall to let Alfred traffic through, but it suddenly stops when the network gets big, because it switched to TCP and I don't have a hole for that. The same goes for QoS filters: I prioritise UDP Alfred packets, but when it switches to TCP they are no longer prioritised and get discarded during congestion...
Andrew
This implements support for using TCP instead of UDP for server-to-server communication for slave-to-master requests and for master-to-master synchronization.
Both these scenarios potentially exchange large chunks of data. Alfred distributes this data over multiple UDP packets, and it sensibly uses 64 kByte as the maximum UDP packet size, since the data stored might well exceed the small sizes of typical network layer packets (like 1280 or 1500 bytes).
Large UDP packets will then get fragmented by the kernel on the IPv6 layer. On busy links and across very heterogeneous network architectures, this leads to the loss of fragments (some or all of them). That makes proper communication a matter of luck for some users.
In all these cases, reliable communication is preferred. Instead of implementing our own handling for this on top of UDP, using TCP will provide everything that's needed.
Implementation overview:
The interface struct is extended by a list of TCP sockets. Sockets in this list will get cleaned up when the interface goes away. The sockets in this list will be monitored for incoming data (or for them to get closed on the other side). Incoming data is handled more or less the same way as incoming UDP data is, with the added twist that packet data has to be accumulated, since it does not arrive in self-contained datagrams as it does with UDP. The packet data is stored alongside the socket information. When a full packet is received, it is handled just like a packet received as a UDP datagram.
A REQUEST is answered on the same socket as it was received on, since TCP allows bidirectional communication.
Signed-off-by: Hans-Werner Hilse <hwhilse@gmail.com>
---
Changes in v2:
 - non-blocking sending over TCP sockets
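For orientation, non-blocking sending with a "written" counter can be sketched roughly as follows. This is a simplified illustration and not the v2 code: struct send_state and send_pending() are made-up names, loosely modelled on the send_packet, send_length and written fields that v2 adds to struct tcp_connection.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical per-connection send state. */
struct send_state {
	const uint8_t *buf;	/* packet to send */
	size_t length;		/* total bytes in buf */
	size_t written;		/* bytes already handed to the kernel */
};

/* Push out as much as the socket accepts right now. Returns 1 when
 * everything has been written, 0 if the socket would block (call
 * again once select() reports it writable), -1 on error.
 */
static int send_pending(int sock, struct send_state *s)
{
	ssize_t ret;

	assert(s != NULL && s->written <= s->length);

	while (s->written < s->length) {
		ret = send(sock, s->buf + s->written,
			   s->length - s->written,
			   MSG_DONTWAIT | MSG_NOSIGNAL);
		if (ret < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				return 0;
			return -1;
		}
		s->written += (size_t)ret;
	}

	return 1;
}
```

A connection with unsent data would be added to the write fd_set before select(), and send_pending() would be called again for every socket reported writable.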
 alfred.h    |  44 ++++++++++-
 main.c      |   8 +-
 netsock.c   | 239 +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++-
 recv.c      |  90 ++++++++++++++++++++++-
 send.c      | 204 +++++++++++++++++++++++++++++++++++++++++++++++----
 server.c    |  30 +++++++-
 unix_sock.c |  14 ++++
 7 files changed, 607 insertions(+), 22 deletions(-)
diff --git a/alfred.h b/alfred.h
index 7e5db16..2de1955 100644
--- a/alfred.h
+++ b/alfred.h
@@ -89,6 +89,11 @@ enum opmode {
 	OPMODE_MASTER,
 };
 
+enum requestproto {
+	REQPROTO_UDP,
+	REQPROTO_TCP
+};
+
 enum clientmode {
 	CLIENT_NONE,
 	CLIENT_REQUEST_DATA,
@@ -97,6 +102,27 @@ enum clientmode {
 	CLIENT_CHANGE_INTERFACE,
 };
 
+enum tcp_close {
+	CLOSE_WHEN_READ,
+	CLOSE_WHEN_WRITTEN,
+	CLOSED_FOR_READING,
+};
+
+struct tcp_connection {
+	int netsock;
+	struct in6_addr address;
+
+	struct alfred_tlv *packet;
+	uint16_t read;
+
+	uint8_t *send_packet;
+	uint32_t send_length;
+	uint32_t written;
+
+	enum tcp_close close;
+	struct list_head list;
+};
+
 struct interface {
 	struct ether_addr hwaddr;
 	struct in6_addr address;
@@ -104,6 +130,9 @@ struct interface {
 	char *interface;
 	int netsock;
 	int netsock_mcast;
+	int netsock_tcp;
+
+	struct list_head tcp_connections;
 
 	struct hashtable_t *server_hash;
 
@@ -117,6 +146,7 @@ struct globals {
 	struct server *best_server;	/* NULL if we are a server ourselves */
 	const char *mesh_iface;
 	enum opmode opmode;
+	enum requestproto requestproto;
 	enum clientmode clientmode;
 	int clientmode_arg;
 	int clientmode_version;
@@ -139,6 +169,7 @@ struct globals {
 #define debugFree(ptr, num)	free(ptr)
 
#define MAX_PAYLOAD ((1 << 16) - 1 - sizeof(struct udphdr)) +#define MAX_UDP_ANSWER 1280 - 40 - sizeof(struct udphdr)
extern const struct in6_addr in6addr_localmcast;
@@ -155,6 +186,9 @@ int alfred_client_change_interface(struct globals *globals); /* recv.c */ int recv_alfred_packet(struct globals *globals, struct interface *interface, int recv_sock); +int recv_alfred_stream(struct globals *globals, + struct interface *interface, + struct tcp_connection *tcp_connection); struct transaction_head * transaction_add(struct globals *globals, struct ether_addr mac, uint16_t id); struct transaction_head * @@ -165,12 +199,17 @@ struct transaction_head *transaction_clean(struct globals *globals, /* send.c */ int push_data(struct globals *globals, struct interface *interface, struct in6_addr *destination, enum data_source max_source_level, - int type_filter, uint16_t tx_id); + int type_filter, uint16_t tx_id, + struct tcp_connection *tcp_connection); int announce_master(struct globals *globals); int push_local_data(struct globals *globals); int sync_data(struct globals *globals); ssize_t send_alfred_packet(struct interface *interface, const struct in6_addr *dest, void *buf, int length); +ssize_t open_alfred_stream(struct interface *interface, + const struct in6_addr *dest, void *buf, int length, + enum tcp_close close_mode); +ssize_t send_alfred_stream(struct tcp_connection *tcp_client); /* unix_sock.c */ int unix_sock_read(struct globals *globals); int unix_sock_open_daemon(struct globals *globals); @@ -187,8 +226,11 @@ int netsock_set_interfaces(struct globals *globals, char *interfaces); struct interface *netsock_first_interface(struct globals *globals); void netsock_reopen(struct globals *globals); int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock); +int netsock_prepare_write_select(struct globals *globals, fd_set *fds, + int maxsock); void netsock_check_error(struct globals *globals, fd_set *errfds); int netsock_receive_packet(struct globals *globals, fd_set *fds); +int netsock_send(struct globals *globals, fd_set *fds); int netsock_own_address(const struct globals *globals, const struct in6_addr *address); 
/* util.c */ diff --git a/main.c b/main.c index 9610398..667e86b 100644 --- a/main.c +++ b/main.c @@ -58,6 +58,7 @@ static void alfred_usage(void) printf(" -m, --master start up the daemon in master mode, which\n"); printf(" accepts data from slaves and syncs it with\n"); printf(" other masters\n"); + printf(" -t, --tcp use TCP protocol for server-to-server communication\n"); printf("\n"); printf(" -u, --unix-path [path] path to unix socket used for client-server\n"); printf(" communication (default: ""ALFRED_SOCK_PATH_DEFAULT"")\n"); @@ -149,6 +150,7 @@ static struct globals *alfred_init(int argc, char *argv[]) {"request", required_argument, NULL, 'r'}, {"interface", required_argument, NULL, 'i'}, {"master", no_argument, NULL, 'm'}, + {"tcp", no_argument, NULL, 't'}, {"help", no_argument, NULL, 'h'}, {"req-version", required_argument, NULL, 'V'}, {"modeswitch", required_argument, NULL, 'M'}, @@ -170,6 +172,7 @@ static struct globals *alfred_init(int argc, char *argv[]) INIT_LIST_HEAD(&globals->interfaces); globals->change_interface = NULL; globals->opmode = OPMODE_SLAVE; + globals->requestproto = REQPROTO_UDP; globals->clientmode = CLIENT_NONE; globals->best_server = NULL; globals->clientmode_version = 0; @@ -182,7 +185,7 @@ static struct globals *alfred_init(int argc, char *argv[])
time_random_seed();
- while ((opt = getopt_long(argc, argv, "ms:r:hi:b:vV:M:I:u:dc:", long_options, + while ((opt = getopt_long(argc, argv, "mts:r:hi:b:vV:M:I:u:dc:", long_options, &opt_ind)) != -1) { switch (opt) { case 'r': @@ -207,6 +210,9 @@ static struct globals *alfred_init(int argc, char *argv[]) case 'm': globals->opmode = OPMODE_MASTER; break; + case 't': + globals->requestproto = REQPROTO_TCP; + break; case 'i': netsock_set_interfaces(globals, optarg); break; diff --git a/netsock.c b/netsock.c index d72541e..068b294 100644 --- a/netsock.c +++ b/netsock.c @@ -80,12 +80,26 @@ static int server_choose(void *d1, int size) void netsock_close_all(struct globals *globals) { struct interface *interface, *is; + struct tcp_connection *tcp_connection, *tc;
list_for_each_entry_safe(interface, is, &globals->interfaces, list) { + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + if(tcp_connection->packet) + free(tcp_connection->packet); + if(tcp_connection->send_packet) + free(tcp_connection->send_packet); + free(tcp_connection); + } if (interface->netsock >= 0) close(interface->netsock); if (interface->netsock_mcast >= 0) close(interface->netsock_mcast); + if (interface->netsock_tcp >= 0) + close(interface->netsock_tcp); list_del(&interface->list); hash_delete(interface->server_hash, free); free(interface->interface); @@ -147,6 +161,7 @@ int netsock_set_interfaces(struct globals *globals, char *interfaces) interface->interface = NULL; interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1; interface->server_hash = NULL;
interface->interface = strdup(token); @@ -165,6 +180,8 @@ int netsock_set_interfaces(struct globals *globals, char *interfaces) return -ENOMEM; }
+ INIT_LIST_HEAD(&interface->tcp_connections); + list_add(&interface->list, &globals->interfaces); }
@@ -214,13 +231,16 @@ static int netsock_open(struct interface *interface) { int sock; int sock_mc; + int sock_tcp; struct sockaddr_in6 sin6, sin6_mc; struct ipv6_mreq mreq; struct ifreq ifr; int ret; + int yes;
interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1;
sock = socket(PF_INET6, SOCK_DGRAM, IPPROTO_UDP); if (sock < 0) { @@ -235,6 +255,14 @@ static int netsock_open(struct interface *interface) return -1; }
+ sock_tcp = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (sock_tcp < 0) { + close(sock); + close(sock_mc); + perror("can't open socket"); + return -1; + } + memset(&ifr, 0, sizeof(ifr)); strncpy(ifr.ifr_name, interface->interface, IFNAMSIZ); ifr.ifr_name[IFNAMSIZ - 1] = '\0'; @@ -286,6 +314,23 @@ static int netsock_open(struct interface *interface) goto err; }
+ yes = 1; + if (setsockopt(sock_tcp, SOL_SOCKET, SO_REUSEADDR, + &yes, sizeof(yes)) == -1) { + perror("setsockopt: SO_REUSEADDR"); + goto err; + } + + if (bind(sock_tcp, (struct sockaddr *)&sin6, sizeof(sin6)) < 0) { + perror("can't bind"); + goto err; + } + + if (listen(sock_tcp, 10) < 0) { + perror("can't listen on tcp socket"); + goto err; + } + if (bind(sock_mc, (struct sockaddr *)&sin6_mc, sizeof(sin6_mc)) < 0) { perror("can't bind"); goto err; @@ -327,11 +372,13 @@ static int netsock_open(struct interface *interface)
interface->netsock = sock; interface->netsock_mcast = sock_mc; + interface->netsock_tcp = sock_tcp;
return 0; err: close(sock); close(sock_mc); + close(sock_tcp); return -1; }
@@ -363,6 +410,7 @@ void netsock_reopen(struct globals *globals) int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock) { struct interface *interface; + struct tcp_connection *tcp_connection;
list_for_each_entry(interface, &globals->interfaces, list) { if (interface->netsock >= 0) { @@ -376,6 +424,57 @@ int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock) if (maxsock < interface->netsock_mcast) maxsock = interface->netsock_mcast; } + + if (interface->netsock_tcp >= 0) { + FD_SET(interface->netsock_tcp, fds); + if (maxsock < interface->netsock_tcp) + maxsock = interface->netsock_tcp; + } + + list_for_each_entry(tcp_connection, + &interface->tcp_connections, list) { + if (tcp_connection->close != CLOSED_FOR_READING) { + FD_SET(tcp_connection->netsock, fds); + if (maxsock < tcp_connection->netsock) + maxsock = tcp_connection->netsock; + } + } + } + + return maxsock; +} + +int netsock_prepare_write_select(struct globals *globals, fd_set *fds, + int maxsock) +{ + struct interface *interface; + struct tcp_connection *tcp_connection; + + list_for_each_entry(interface, &globals->interfaces, list) { + list_for_each_entry(tcp_connection, + &interface->tcp_connections, list) { + if (tcp_connection->send_length) { + /* monitor fd only if we actually have + * data we'd like to send + */ + FD_SET(tcp_connection->netsock, fds); + if (maxsock < tcp_connection->netsock) + maxsock = tcp_connection->netsock; + } else if (tcp_connection->close == CLOSE_WHEN_WRITTEN) { + /* we have a TCP connection that should be + * closed when everything is written and it + * seems that is the case now + */ + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + if (tcp_connection->packet) + free(tcp_connection->packet); + if (tcp_connection->send_packet) + free(tcp_connection->send_packet); + free(tcp_connection); + } + } }
return maxsock; @@ -384,12 +483,29 @@ int netsock_prepare_select(struct globals *globals, fd_set *fds, int maxsock) void netsock_check_error(struct globals *globals, fd_set *errfds) { struct interface *interface; + struct tcp_connection *tcp_connection, *tc;
list_for_each_entry(interface, &globals->interfaces, list) { + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + if (FD_ISSET(tcp_connection->netsock, errfds)) { + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + if (tcp_connection->packet) + free(tcp_connection->packet); + if (tcp_connection->send_packet) + free(tcp_connection->send_packet); + free(tcp_connection); + } + } + if ((interface->netsock < 0 || !FD_ISSET(interface->netsock, errfds)) && (interface->netsock_mcast < 0 || - !FD_ISSET(interface->netsock_mcast, errfds))) + !FD_ISSET(interface->netsock_mcast, errfds)) && + (interface->netsock_tcp < 0 || + !FD_ISSET(interface->netsock_tcp, errfds))) continue;
fprintf(stderr, "Error on netsock detected\n"); @@ -400,15 +516,23 @@ void netsock_check_error(struct globals *globals, fd_set *errfds) if (interface->netsock_mcast >= 0) close(interface->netsock_mcast);
+ if (interface->netsock_tcp >= 0) + close(interface->netsock_tcp); + interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1; } }
int netsock_receive_packet(struct globals *globals, fd_set *fds) { struct interface *interface; + struct tcp_connection *tcp_connection, *tc; + struct sockaddr_in6 sin6; + socklen_t sin6_len = sizeof(sin6); int recvs = 0; + int sock_client;
list_for_each_entry(interface, &globals->interfaces, list) { if (interface->netsock >= 0 && @@ -424,6 +548,119 @@ int netsock_receive_packet(struct globals *globals, fd_set *fds) interface->netsock_mcast); recvs++; } + + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + if (FD_ISSET(tcp_connection->netsock, fds)) { + if (recv_alfred_stream(globals, interface, + tcp_connection) < 0) { + if(tcp_connection->close == CLOSE_WHEN_READ) { + shutdown(tcp_connection->netsock, + SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + if (tcp_connection->packet) + free(tcp_connection->packet); + if (tcp_connection->send_packet) + free(tcp_connection->send_packet); + free(tcp_connection); + } else { + tcp_connection->close = + CLOSED_FOR_READING; + } + } + recvs++; + } + } + + if (interface->netsock_tcp >= 0 && + FD_ISSET(interface->netsock_tcp, fds)) { + sock_client = accept(interface->netsock_tcp, + (struct sockaddr *)&sin6, + &sin6_len); + if (sock_client < 0) { + perror("can't accept TCP connection"); + goto tcp_done; + } + + /* drop packets not sent over link-local ipv6 */ + if (!is_ipv6_eui64(&sin6.sin6_addr)) { + fprintf(stderr, "not handling TCP connection " + "from non-link-local address" + "\n"); + goto tcp_drop; + } + + /* drop packets from ourselves */ + if (netsock_own_address(globals, &sin6.sin6_addr)) { + fprintf(stderr, "not handling TCP connection " + "from ourselves\n"); + goto tcp_drop; + } + + tcp_connection = calloc(1, sizeof(*tcp_connection)); + if (!tcp_connection) { + fprintf(stderr, "out of memory, cannot handle " + "TCP client connection\n"); + goto tcp_drop; + } + + tcp_connection->packet = + calloc(1, sizeof(struct alfred_tlv)); + if (!tcp_connection->packet) { + fprintf(stderr, "out of memory, cannot handle " + "TCP client connection\n"); + free(tcp_connection); + goto tcp_drop; + } + + tcp_connection->netsock = sock_client; + tcp_connection->close = CLOSE_WHEN_READ; + + 
memcpy(&tcp_connection->address, &sin6.sin6_addr, + sizeof(tcp_connection->address)); + list_add(&tcp_connection->list, + &interface->tcp_connections); + goto tcp_done; +tcp_drop: + shutdown(sock_client, SHUT_RDWR); + close(sock_client); +tcp_done: + recvs++; + } + } + + return recvs; +} + +int netsock_send(struct globals *globals, fd_set *fds) +{ + struct interface *interface; + struct tcp_connection *tcp_connection, *tc; + int recvs = 0; + + list_for_each_entry(interface, &globals->interfaces, list) { + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + if (FD_ISSET(tcp_connection->netsock, fds)) { + if(tcp_connection->send_length + && send_alfred_stream(tcp_connection) < 0 + && (tcp_connection->close == CLOSE_WHEN_WRITTEN + || tcp_connection->close == CLOSED_FOR_READING) + ) { + shutdown(tcp_connection->netsock, + SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + if (tcp_connection->packet) + free(tcp_connection->packet); + if (tcp_connection->send_packet) + free(tcp_connection->send_packet); + free(tcp_connection); + } + recvs++; + } + } }
return recvs; diff --git a/recv.c b/recv.c index 98539cb..563a0d1 100644 --- a/recv.c +++ b/recv.c @@ -302,7 +302,8 @@ process_alfred_announce_master(struct globals *globals, static int process_alfred_request(struct globals *globals, struct interface *interface, struct in6_addr *source, - struct alfred_request_v0 *request) + struct alfred_request_v0 *request, + struct tcp_connection *tcp_connection) { int len;
@@ -315,7 +316,7 @@ static int process_alfred_request(struct globals *globals, return -1;
push_data(globals, interface, source, SOURCE_SYNCED, - request->requested_type, request->tx_id); + request->requested_type, request->tx_id, tcp_connection);
return 0; } @@ -432,7 +433,7 @@ int recv_alfred_packet(struct globals *globals, struct interface *interface, break; case ALFRED_REQUEST: process_alfred_request(globals, interface, &source.sin6_addr, - (struct alfred_request_v0 *)packet); + (struct alfred_request_v0 *)packet, -1); break; case ALFRED_STATUS_TXEND: process_alfred_status_txend(globals, &source.sin6_addr, @@ -445,3 +446,86 @@ int recv_alfred_packet(struct globals *globals, struct interface *interface,
return 0; } + +int recv_alfred_stream(struct globals *globals, struct interface *interface, + struct tcp_connection *tcp_connection) +{ + size_t to_read; + int res; + const size_t header_len = sizeof(struct alfred_tlv); + void *mem; + + /* determine how many bytes we're still expecting */ + if (tcp_connection->read < header_len) { + /* TLV header still incomplete */ + to_read = header_len - tcp_connection->read; + } else { + /* payload still incomplete */ + to_read = header_len + + ntohs(tcp_connection->packet->length) + - tcp_connection->read; + } + + res = recv(tcp_connection->netsock, + (uint8_t*)tcp_connection->packet + tcp_connection->read, + to_read, MSG_DONTWAIT); + + if (res < 0) { + return (errno == EAGAIN || errno == EWOULDBLOCK) ? 0 : -1; + } else if (res == 0) { + /* end of stream */ + return -1; + } + + tcp_connection->read += res; + + if (tcp_connection->read == header_len + && tcp_connection->packet->length > 0) { + /* there's payload, so adjust buffer size */ + mem = realloc(tcp_connection->packet, + header_len + ntohs(tcp_connection->packet->length)); + if (!mem) { + fprintf(stderr, "out of memory when reading from TCP " + "client\n"); + return -1; + } + tcp_connection->packet = (struct alfred_tlv *)mem; + } + + if (tcp_connection->read == + header_len + ntohs(tcp_connection->packet->length)) { + /* packet is complete */ + switch(tcp_connection->packet->type) { + case ALFRED_REQUEST: + tcp_connection->close = CLOSE_WHEN_WRITTEN; + process_alfred_request(globals, interface, + &tcp_connection->address, + (struct alfred_request_v0 *)tcp_connection->packet, + tcp_connection); + return 0; + case ALFRED_PUSH_DATA: + process_alfred_push_data(globals, &tcp_connection->address, + (struct alfred_push_data_v0 *)tcp_connection->packet); + + /* do not close connection, but expect more packets */ + mem = realloc(tcp_connection->packet, header_len); + if (!mem) { + fprintf(stderr, "out of memory when reading " + "from TCP client\n"); + return -1; + } + 
memset(mem, 0, header_len); + tcp_connection->packet = (struct alfred_tlv *)mem; + tcp_connection->read = 0; + return 0; + case ALFRED_STATUS_TXEND: + process_alfred_status_txend(globals, &tcp_connection->address, + (struct alfred_status_v0 *)tcp_connection->packet); + break; + } + /* close connection */ + return -1; + } + + return 0; +} diff --git a/send.c b/send.c index 70f694c..e657216 100644 --- a/send.c +++ b/send.c @@ -27,6 +27,8 @@ #include <errno.h> #include <stdio.h> #include <unistd.h> +#include <stdlib.h> +#include <fcntl.h> #include "alfred.h" #include "hash.h" #include "packet.h" @@ -49,18 +51,21 @@ int announce_master(struct globals *globals) return 0; }
-int push_data(struct globals *globals, struct interface *interface, - struct in6_addr *destination, enum data_source max_source_level, - int type_filter, uint16_t tx_id) +int push_data_handler(struct globals *globals, + enum data_source max_source_level, + int type_filter, uint16_t tx_id, + void (*send_func)(void *state, void *destination, + void *data, int length), + void *state, void *destination) { struct hash_it_t *hashit = NULL; uint8_t buf[MAX_PAYLOAD]; struct alfred_push_data_v0 *push; struct alfred_data *data; uint16_t total_length = 0; + int overall_length = 0; size_t tlv_length; uint16_t seqno = 0; - uint16_t length; struct alfred_status_v0 status_end;
push = (struct alfred_push_data_v0 *)buf; @@ -90,8 +95,12 @@ int push_data(struct globals *globals, struct interface *interface, tlv_length += sizeof(*push) - sizeof(push->header); push->header.length = htons(tlv_length); push->tx.seqno = htons(seqno++); - send_alfred_packet(interface, destination, push, - sizeof(*push) + total_length); + if (send_func) { + send_func(state, destination, push, + sizeof(*push) + total_length); + } else { + overall_length += sizeof(*push) + total_length; + } total_length = 0; }
@@ -114,24 +123,93 @@ int push_data(struct globals *globals, struct interface *interface, tlv_length += sizeof(*push) - sizeof(push->header); push->header.length = htons(tlv_length); push->tx.seqno = htons(seqno++); - send_alfred_packet(interface, destination, push, - sizeof(*push) + total_length); + if (send_func) { + send_func(state, destination, push, + sizeof(*push) + total_length); + } else { + overall_length += sizeof(*push) + total_length; + } }
/* send transaction txend packet */ if (seqno > 0 || type_filter != NO_FILTER) { status_end.header.type = ALFRED_STATUS_TXEND; status_end.header.version = ALFRED_VERSION; - length = sizeof(status_end) - sizeof(status_end.header); - status_end.header.length = htons(length); + total_length = sizeof(status_end) - sizeof(status_end.header); + status_end.header.length = htons(total_length);
status_end.tx.id = tx_id; status_end.tx.seqno = htons(seqno);
- send_alfred_packet(interface, destination, &status_end, - sizeof(status_end)); + if (send_func) { + send_func(state, destination, &status_end, + sizeof(status_end)); + } else { + overall_length += sizeof(status_end); + } + } + + return overall_length; +} + +void send_func_udp(void *state, void *destination, void *data, int length) +{ + send_alfred_packet((struct interface *)state, + (struct in6_addr *)destination, data, length); +} + +void send_func_buf(void *state, void *destination, void *data, int length) +{ + int *pos = (int *)state; + uint8_t *buf = (uint8_t *)destination; + memcpy(buf + *pos, data, length); + *pos += length; +} + +int push_data(struct globals *globals, struct interface *interface, + struct in6_addr *destination, enum data_source max_source_level, + int type_filter, uint16_t tx_id, + struct tcp_connection *tcp_connection) +{ + uint32_t length; + int written = 0; + void *buf; + + length = push_data_handler(globals, max_source_level, + type_filter, tx_id, + NULL, NULL, NULL); + if (tcp_connection) { + /* request via TCP, send answer via this socket */ + buf = malloc(length); + if (!buf) + return -1; + + push_data_handler(globals, max_source_level, type_filter, tx_id, + send_func_buf, &written, buf); + + tcp_connection->send_length = length; + tcp_connection->send_packet = buf; + return 0; }
+ /* request not via an established TCP socket. */ + if (globals->requestproto == REQPROTO_TCP && length > MAX_UDP_ANSWER) { + /* Depending on response payload size, decide if we + * gonna send the response via TCP nevertheless + */ + buf = malloc(length); + if (buf) { + push_data_handler(globals, max_source_level, + type_filter, tx_id, + send_func_buf, &written, buf); + if (open_alfred_stream(interface, destination, buf, + length, CLOSE_WHEN_WRITTEN) >= 0) + return 0; + } + } + /* fallback / default case: answer via UDP */ + push_data_handler(globals, max_source_level, type_filter, + tx_id, send_func_udp, interface, destination); return 0; }
@@ -148,7 +226,7 @@ int sync_data(struct globals *globals)
push_data(globals, interface, &server->address, SOURCE_FIRST_HAND, NO_FILTER, - get_random_id()); + get_random_id(), NULL); } } return 0; @@ -164,7 +242,7 @@ int push_local_data(struct globals *globals)
list_for_each_entry(interface, &globals->interfaces, list) { push_data(globals, interface, &globals->best_server->address, - SOURCE_LOCAL, NO_FILTER, get_random_id()); + SOURCE_LOCAL, NO_FILTER, get_random_id(), NULL); }
return 0; @@ -198,3 +276,101 @@ ssize_t send_alfred_packet(struct interface *interface,
return ret; } + +ssize_t open_alfred_stream(struct interface *interface, + const struct in6_addr *dest, void *buf, int length, + enum tcp_close close_mode) +{ + struct tcp_connection *tcp_connection; + struct sockaddr_in6 dest_addr; + int sock; + int flags; + + list_for_each_entry(tcp_connection, &interface->tcp_connections, list) { + /* when there is already a connection in progress, + * no not open a new one - also, don't report an error either. + */ + if (0 == memcmp(dest, &tcp_connection->address, sizeof(*dest))) + return 0; + } + + memset(&dest_addr, 0, sizeof(dest_addr)); + dest_addr.sin6_family = AF_INET6; + dest_addr.sin6_port = htons(ALFRED_PORT); + dest_addr.sin6_scope_id = interface->scope_id; + memcpy(&dest_addr.sin6_addr, dest, sizeof(*dest)); + + sock = socket(PF_INET6, SOCK_STREAM, IPPROTO_TCP); + if (sock < 0) + return -1; + + flags = fcntl(sock, F_GETFL, 0); + if (flags < 0) { + close(sock); + return -1; + } + flags |= O_NONBLOCK; + if (fcntl(sock, F_SETFL, flags) < 0) { + close(sock); + return -1; + } + if (connect(sock, (struct sockaddr *)&dest_addr, + sizeof(struct sockaddr_in6)) < 0 + && errno != EINPROGRESS) { + close(sock); + return -1; + } + + /* put socket on the interface's tcp client list for writing */ + tcp_connection = calloc(1, sizeof(*tcp_connection)); + if (!tcp_connection) { + shutdown(sock, SHUT_RDWR); + close(sock); + return -1; + } + + memcpy(&tcp_connection->address, dest, sizeof(tcp_connection->address)); + + tcp_connection->close = close_mode; + tcp_connection->netsock = sock; + tcp_connection->send_length = length; + tcp_connection->send_packet = buf; + + tcp_connection->packet = calloc(1, sizeof(struct alfred_tlv)); + if (!tcp_connection->packet) { + close(sock); + free(tcp_connection->send_packet); + free(tcp_connection); + return -1; + } + + list_add(&tcp_connection->list, &interface->tcp_connections); + + return 0; +} + +ssize_t send_alfred_stream(struct tcp_connection *tcp_connection) +{ + ssize_t ret; + ret = 
send(tcp_connection->netsock, + (uint8_t*) tcp_connection->send_packet + + tcp_connection->written, + tcp_connection->send_length, + MSG_NOSIGNAL); + + if (ret < 0) { + if (errno == EAGAIN || errno == EWOULDBLOCK) + return 0; + /* another error: do not try to send again */ + tcp_connection->send_length = 0; + return -1; + } + + tcp_connection->written += ret; + tcp_connection->send_length -= ret; + + if (tcp_connection->send_length == 0) + return -1; + + return 0; +} diff --git a/server.c b/server.c index 47aee4f..5be7516 100644 --- a/server.c +++ b/server.c @@ -222,6 +222,7 @@ static void check_if_socket(struct interface *interface) { int sock; struct ifreq ifr; + struct tcp_connection *tcp_connection, *tc;
if (interface->netsock < 0) return; @@ -261,10 +262,23 @@ static void check_if_socket(struct interface *interface) return;
close: + list_for_each_entry_safe(tcp_connection, tc, + &interface->tcp_connections, list) { + shutdown(tcp_connection->netsock, SHUT_RDWR); + close(tcp_connection->netsock); + list_del(&tcp_connection->list); + if (tcp_connection->packet) + free(tcp_connection->packet); + if (tcp_connection->send_packet) + free(tcp_connection->send_packet); + free(tcp_connection); + } close(interface->netsock); close(interface->netsock_mcast); + close(interface->netsock_tcp); interface->netsock = -1; interface->netsock_mcast = -1; + interface->netsock_tcp = -1; close(sock); }
@@ -338,7 +352,7 @@ int alfred_server(struct globals *globals) { int maxsock, ret, recvs; struct timespec last_check, now, tv; - fd_set fds, errfds; + fd_set fds, wrfds, errfds; int num_socks;
if (create_hashes(globals)) @@ -381,14 +395,23 @@ int alfred_server(struct globals *globals) netsock_reopen(globals);
FD_ZERO(&fds); + FD_ZERO(&wrfds); FD_ZERO(&errfds); FD_SET(globals->unix_sock, &fds); maxsock = globals->unix_sock;
+ /* testing write sockets might clean up socket list, + * so this has to be done before proceeding to the + * list of sockets tested for reading + */ + maxsock = netsock_prepare_write_select(globals, &wrfds, + maxsock); + maxsock = netsock_prepare_write_select(globals, &errfds, + maxsock); maxsock = netsock_prepare_select(globals, &fds, maxsock); maxsock = netsock_prepare_select(globals, &errfds, maxsock);
- ret = pselect(maxsock + 1, &fds, NULL, &errfds, &tv, NULL); + ret = pselect(maxsock + 1, &fds, &wrfds, &errfds, &tv, NULL);
if (ret == -1) { perror("main loop select failed ..."); @@ -404,6 +427,9 @@ int alfred_server(struct globals *globals) if (recvs > 0) continue; } + + if (netsock_send(globals, &wrfds) > 0) + continue; } clock_gettime(CLOCK_MONOTONIC, &last_check);
diff --git a/unix_sock.c b/unix_sock.c index a0ccc13..d00569b 100644 --- a/unix_sock.c +++ b/unix_sock.c @@ -222,6 +222,7 @@ static int unix_sock_req_data(struct globals *globals, { int len; uint16_t id; + uint8_t *buf; struct transaction_head *head = NULL; struct interface *interface;
@@ -251,6 +252,19 @@ static int unix_sock_req_data(struct globals *globals, head->client_socket = client_sock; head->requested_type = request->requested_type;
+ if (globals->requestproto == REQPROTO_TCP) { + buf = malloc(sizeof(*request)); + if (buf) { + memcpy(buf, request, sizeof(*request)); + if (!open_alfred_stream(interface, + &globals->best_server->address, + buf, sizeof(*request), + CLOSE_WHEN_READ)) + return 0; + } + } + + /* default and fallback case: UDP */ send_alfred_packet(interface, &globals->best_server->address, request, sizeof(*request));
Hi,
On 2016-03-27 20:26, Hans-Werner Hilse wrote:
[...] Changes in v2:
- non-blocking sending over TCP sockets
This uses non-blocking IO also for sending via TCP. TCP is chosen when the message size exceeds MAX_UDP_ANSWER (from alfred.h), which is for now conservatively chosen to fit in bad-case MTU settings - or if the data request came via TCP (as the same connection is then used for the reply).
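As a rough illustration of that decision, here is a condensed sketch. It is not the code from the patch: choose_transport() and the SKETCH_* names are invented for this example, and the threshold assumes a 1280-byte MTU minus a 40-byte IPv6 header and an 8-byte UDP header. Unlike the macro in the patch, the sketch parenthesizes the threshold expression so it stays safe inside larger expressions.

```c
#include <stddef.h>

/* assumed sizes: IPv6 minimum MTU, IPv6 header, UDP header */
#define SKETCH_IPV6_MIN_MTU	1280
#define SKETCH_IPV6_HDR		40
#define SKETCH_UDP_HDR		8
#define SKETCH_MAX_UDP_ANSWER \
	(SKETCH_IPV6_MIN_MTU - SKETCH_IPV6_HDR - SKETCH_UDP_HDR)

enum sketch_transport {
	SKETCH_UDP,		/* plain datagrams, the default */
	SKETCH_TCP_SAME_CONN,	/* reply on the connection the request used */
	SKETCH_TCP_NEW_CONN,	/* open a fresh connection to the peer */
};

/* request_via_tcp: the request arrived on an established TCP connection
 * tcp_enabled:     TCP was switched on (the "-t" flag)
 * reply_len:       total size of the data to be sent
 */
static enum sketch_transport choose_transport(int request_via_tcp,
					      int tcp_enabled,
					      size_t reply_len)
{
	if (request_via_tcp)
		return SKETCH_TCP_SAME_CONN;

	if (tcp_enabled && reply_len > SKETCH_MAX_UDP_ANSWER)
		return SKETCH_TCP_NEW_CONN;

	/* the patch also ends up here when opening the connection fails */
	return SKETCH_UDP;
}
```

With these assumed sizes the threshold is 1232 bytes, so anything that fits into a single unfragmented datagram on a minimum-MTU link still goes out via UDP.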
I've run this for a few hours in a test setup with 3x alfred as master and 30 slaves, with some errors, dups and latency thrown in for good measure (yay for VDE, virtual distributed ethernet).
The current implementation has a downside: non-blocking TCP sending for now assembles the full data that is to be sent in a memory buffer, which will then get sent bit by bit (depending on buffers, network and the remote side). This is a consequence of the lack of a good way to guarantee a consistent state of the data store over the time it takes until the full message stream is completely transmitted to the remote end (the data store might get modified between beginning and finishing transmission).
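The "bit by bit" sending can be pictured as follows. This is a minimal sketch rather than the function from the patch: struct pending_send and pending_send_step() are names invented here, and it assumes Linux, where MSG_DONTWAIT and MSG_NOSIGNAL are available. The idea is that the main loop calls the step function whenever select() reports the socket as writable.

```c
#include <errno.h>
#include <stdint.h>
#include <sys/socket.h>
#include <sys/types.h>

/* a fully assembled message waiting to be written to a stream socket */
struct pending_send {
	const uint8_t *data;	/* start of the assembled message */
	size_t remaining;	/* bytes not yet handed to the kernel */
	size_t written;		/* bytes already handed to the kernel */
};

/* Try to push out as much as the socket accepts right now.
 * Returns 1 when the message is fully sent, 0 when more remains
 * (call again once the socket is writable), -1 on a hard error.
 */
static int pending_send_step(int sock, struct pending_send *p)
{
	ssize_t ret;

	if (p->remaining == 0)
		return 1;

	ret = send(sock, p->data + p->written, p->remaining,
		   MSG_DONTWAIT | MSG_NOSIGNAL);
	if (ret < 0) {
		if (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINTR)
			return 0;
		return -1;
	}

	/* a short write is normal: remember how far we got */
	p->written += (size_t)ret;
	p->remaining -= (size_t)ret;

	return p->remaining == 0;
}
```

Because the buffer is a private copy, later changes to the data store cannot alter a message that is already half way out. That is the consistency property described above, bought with the extra memory.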
An alternative - and a major redesign of data storage - would be some kind of log-based approach. I'm not sure that is really warranted, and in any case, that's quite a different undertaking.
The flag "-t" is still present and toggles whether alfred will try to request using TCP at all. This allows using a TCP-enabled alfred in an environment that is not yet fully TCP-enabled (this matters only for the master servers AFAICS).
-hwh
I installed your patch. Here is some feedback and the problems I had.
With OpenWRT and gluon I succeeded. I actually tested it on a tplink841n-v9; it runs with or without -m (master) for a while (30+ min): /usr/sbin/alfred -i br-client -b bat0 -m [What puzzles me is why these routers always output only their own entry for alfred -r 158 although there are masters; on a Debian-based master this fills up to 290.]
On Debian (3.16.0-4-amd64 #1 SMP Debian 3.16.7-ckt20-1+deb8u3 (2016-01-17) x86_64 GNU/Linux) I got some build errors, and then, after a short while in alfred master mode, a segmentation fault (after 1 or 2 minutes, sometimes only seconds). This happens after the master announcements, when it has already collected some 90+ pieces of alfred information.
####### patch stuff #######
wget https://patchwork.open-mesh.org/patch/15944/raw/ -O tcp.patch
patch < tcp.patch
patching file alfred.h
Hunk #1 succeeded at 90 (offset 1 line).
Hunk #2 succeeded at 103 (offset 1 line).
Hunk #3 succeeded at 131 (offset 1 line).
Hunk #4 succeeded at 147 (offset 1 line).
Hunk #5 succeeded at 170 (offset 1 line).
Hunk #6 succeeded at 187 (offset 1 line).
Hunk #7 succeeded at 200 (offset 1 line).
Hunk #8 succeeded at 227 (offset 1 line).
patching file main.c
patching file netsock.c
patching file recv.c
patching file send.c
patching file server.c
patching file unix_sock.c
Hunk #1 succeeded at 229 (offset 7 lines).
Hunk #2 succeeded at 259 (offset 7 lines).
####### build errors
make
CC main.o
CC server.o
CC client.o
CC netsock.o
CC send.o
CC recv.o
recv.c: In function 'recv_alfred_packet':
recv.c:436:48: warning: passing argument 5 of 'process_alfred_request' makes pointer from integer without a cast
(struct alfred_request_v0 *)packet, -1);
^
recv.c:302:12: note: expected 'struct tcp_connection *' but argument is of type 'int'
static int process_alfred_request(struct globals *globals,
^
CC hash.o
CC unix_sock.o
CC util.o
CC debugfs.o
CC batadv_query.o
LD alfred
make -C vis all
make[1]: Entering directory '/home/freifunk/alfred/vis'
CC vis.o
CC debugfs.o
LD batadv-vis
make[1]: Leaving directory '/home/freifunk/alfred/vis'
make -C gpsd all
make[1]: Entering directory '/home/freifunk/alfred/gpsd'
CC alfred-gpsd.o
LD alfred-gpsd
make[1]: Leaving directory '/home/freifunk/alfred/gpsd'
###### var/log/kern.log
Apr 22 20:01:06 fffr-spielwiese kernel: [4398895.515177] alfred[31733]: segfault at 2f ip 0000000000406034 sp 00007fff9dc39580 error 6 in alfred[400000+d000]
Apr 22 20:02:05 fffr-spielwiese kernel: [4398954.569665] alfred[32657]: segfault at 2f ip 0000000000406034 sp 00007ffc45b97d50 error 6 in alfred[400000+d000]
On 27.03.2016 20:37, Hans-Werner Hilse wrote:
Hi,
On 2016-03-27 20:26, Hans-Werner Hilse wrote:
[...] Changes in v2:
- non-blocking sending over TCP sockets
This uses non-blocking IO also for sending via TCP. TCP is chosen when the message size exceeds MAX_UDP_ANSWER (from alfred.h), which is for now conservatively chosen to fit in bad-case MTU settings - or if the data request came via TCP (as the same connection is then used for the reply).
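The selection rule described above could be sketched roughly as follows. This is only an illustration, not the patch's code: choose_reply_transport(), the enum and MAX_UDP_ANSWER_ASSUMED are hypothetical names, and the numeric limit is a placeholder (the real MAX_UDP_ANSWER is defined in alfred.h).

```c
#include <stdbool.h>
#include <stddef.h>

/* Placeholder value for illustration; the real limit is MAX_UDP_ANSWER in alfred.h. */
#define MAX_UDP_ANSWER_ASSUMED 1280

enum reply_transport { REPLY_UDP, REPLY_TCP };

/* Hypothetical helper: decide which transport a reply should use. */
enum reply_transport choose_reply_transport(size_t reply_len,
					    bool request_came_via_tcp)
{
	/* A request that arrived over TCP is answered on the same connection. */
	if (request_came_via_tcp)
		return REPLY_TCP;

	/* Replies too large for a conservative datagram size go over TCP. */
	if (reply_len > MAX_UDP_ANSWER_ASSUMED)
		return REPLY_TCP;

	return REPLY_UDP;
}
```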
I've run this for a few hours in a test setup with 3x alfred as master and 30 slaves, with some errors, dups and latency thrown in for good measure (yay for VDE, virtual distributed ethernet).
The current implementation has a downside: non-blocking TCP sending for now assembles the full data that is to be sent in a memory buffer, which will then get sent bit by bit (depending on buffers, network and the remote side). This is a consequence of the lack of a good way to guarantee a consistent state of the data store over the time it takes until the full message stream is completely transmitted to the remote end (the data store might get modified between beginning and finishing transmission).
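For illustration, sending a fully assembled buffer piecewise over a non-blocking socket might look like the sketch below. struct tx_buffer and tx_buffer_flush() are hypothetical names and not taken from the patch; the sketch assumes the socket already has O_NONBLOCK set and that the caller retries once select() reports the socket writable.

```c
#include <errno.h>
#include <stddef.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Hypothetical pending-transmission state; not the patch's actual struct. */
struct tx_buffer {
	const unsigned char *data; /* fully assembled message */
	size_t len;                /* total number of bytes to send */
	size_t sent;               /* bytes already handed to the kernel */
};

/*
 * Push as much of the buffer as the socket accepts right now.
 * Returns 1 when everything was sent, 0 when the socket would block
 * (retry after select() marks it writable), -1 on any other error.
 */
int tx_buffer_flush(int sock, struct tx_buffer *tx)
{
	while (tx->sent < tx->len) {
		ssize_t n = send(sock, tx->data + tx->sent,
				 tx->len - tx->sent, MSG_NOSIGNAL);

		if (n < 0) {
			if (errno == EINTR)
				continue;
			if (errno == EAGAIN || errno == EWOULDBLOCK)
				return 0;
			return -1;
		}

		tx->sent += (size_t)n;
	}

	return 1;
}
```

Because the buffer is a snapshot, later changes to the data store cannot affect a transmission that is already under way, which is the trade-off against memory use described above.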
An alternative approach - and a major redesign of data storage - would be some kind of log-based approach. I'm not sure that is really warranted, and in any case, that's quite a different undertaking.
The flag "-t" is still present and toggles whether alfred will try to request using TCP at all. This allows using a TCP-enabled alfred in an environment that is not yet fully TCP-enabled (this matters only for the master servers AFAICS).
-hwh
Hi,
first: many thanks for testing.
On 2016-04-22 20:37, jens wrote:
####### patch stuff #######
wget https://patchwork.open-mesh.org/patch/15944/raw/ -O tcp.patch
patch < tcp.patch
patching file alfred.h
Hunk #1 succeeded at 90 (offset 1 line).
Hunk #2 succeeded at 103 (offset 1 line).
Hunk #3 succeeded at 131 (offset 1 line).
Hunk #4 succeeded at 147 (offset 1 line).
Hunk #5 succeeded at 170 (offset 1 line).
Hunk #6 succeeded at 187 (offset 1 line).
Hunk #7 succeeded at 200 (offset 1 line).
Hunk #8 succeeded at 227 (offset 1 line).
patching file main.c
patching file netsock.c
patching file recv.c
patching file send.c
patching file server.c
patching file unix_sock.c
Hunk #1 succeeded at 229 (offset 7 lines).
Hunk #2 succeeded at 259 (offset 7 lines).
Was this patch applied against git master's HEAD? Even then, offsets might occur, as there have been some minor changes since I sent the patch.
####### build errors
make
CC main.o
CC server.o
CC client.o
CC netsock.o
CC send.o
CC recv.o
recv.c: In function 'recv_alfred_packet':
recv.c:436:48: warning: passing argument 5 of 'process_alfred_request' makes pointer from integer without a cast
      (struct alfred_request_v0 *)packet, -1);
                                          ^
recv.c:302:12: note: expected 'struct tcp_connection *' but argument is of type 'int'
 static int process_alfred_request(struct globals *globals,
            ^
Oh dear. I'm not sure how my gcc managed to stay quiet about this. Also, I'm astonished it didn't blow to pieces during my tests. The "-1" is a leftover from an earlier implementation, where I still thought that I didn't have that much TCP connection state to carry around - so I passed socket fds. I no longer do that, and switched to a struct holding the state. In this case, the "-1" should be NULL. It seems to be the only oversight of this kind as far as I can see.
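To illustrate why the sentinel has to change along with the parameter type, here is a minimal sketch. struct tcp_connection_sketch and reply_fd() are hypothetical stand-ins, not the patch's actual struct tcp_connection or process_alfred_request(). With a pointer parameter, "no TCP connection" has to be expressed as NULL; an integer -1 converted to a pointer is non-NULL, so a check like the one below would not catch it and the pointer would be dereferenced.

```c
#include <stddef.h>

/* Hypothetical stand-in; the patch's real struct carries more state. */
struct tcp_connection_sketch {
	int fd;
};

/* Returns the fd to reply on, or -1 when the reply should go out via UDP. */
int reply_fd(const struct tcp_connection_sketch *conn)
{
	/* NULL is the only value that means "request did not come via TCP". */
	if (conn == NULL)
		return -1;

	return conn->fd;
}
```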
I will update the patch shortly.
###### var/log/kern.log
Apr 22 20:01:06 fffr-spielwiese kernel: [4398895.515177] alfred[31733]: segfault at 2f ip 0000000000406034 sp 00007fff9dc39580 error 6 in alfred[400000+d000]
Apr 22 20:02:05 fffr-spielwiese kernel: [4398954.569665] alfred[32657]: segfault at 2f ip 0000000000406034 sp 00007ffc45b97d50 error 6 in alfred[400000+d000]
In the light of a dereference of a "-1" pointer, that doesn't come as a surprise.
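As a side note, the fault address 2f in the log is at least consistent with this: a pointer with all bits set plus a small member offset wraps around to a small address. The sketch below only shows the arithmetic; field_address() is a hypothetical helper, and the offset 0x30 is a guess chosen to match the log, not something taken from the actual struct layout.

```c
#include <stddef.h>
#include <stdint.h>

/*
 * Address the CPU would access for a struct member located `offset`
 * bytes into an object at `base`. Unsigned arithmetic wraps around.
 */
uintptr_t field_address(uintptr_t base, size_t offset)
{
	return base + offset;
}
```

With base (uintptr_t)-1 and the guessed offset 0x30, this yields 0x2f.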
-hwh
b.a.t.m.a.n@lists.open-mesh.org