Repository : ssh://git@open-mesh.org/alfred
On branch : master
commit 5b8c4d4ad44f58de9e6d9c5e18a8ff263b3dbe8b Author: Sven Eckelmann sven@open-mesh.com Date: Fri Jan 18 11:40:05 2013 +0100
alfred: Don't block server when waiting for master reply
A slave server should not block completely when a unix client requests a data type and the master server doesn't immediately finish its transaction.
Instead, the unix client is stored as part of the transaction, and either finishing or purging the transaction triggers a reply. This also removes the extra server loop in the unix socket code and allows the data retrieval timeout to be increased from 1 second to 10 seconds.
Signed-off-by: Sven Eckelmann sven@open-mesh.com
5b8c4d4ad44f58de9e6d9c5e18a8ff263b3dbe8b alfred.h | 13 +++-- recv.c | 39 +++++++++----- server.c | 15 ++++++ unix_sock.c | 173 ++++++++++++++++++++++++++++++----------------------------- 4 files changed, 141 insertions(+), 99 deletions(-)
diff --git a/alfred.h b/alfred.h index 5e7a505..dce4733 100644 --- a/alfred.h +++ b/alfred.h @@ -33,7 +33,7 @@ #include "packet.h"
#define ALFRED_INTERVAL 10 -#define ALFRED_REQUEST_TIMEOUT 1 +#define ALFRED_REQUEST_TIMEOUT 10 #define ALFRED_SERVER_TIMEOUT 60 #define ALFRED_DATA_TIMEOUT 600 #define ALFRED_SOCK_PATH "/var/run/alfred.sock" @@ -62,8 +62,10 @@ struct transaction_packet { struct transaction_head { struct ether_addr server_addr; uint16_t id; + uint8_t requested_type; int finished; int num_packet; + int client_socket; struct timespec last_rx_time; struct list_head packet_list; }; @@ -123,8 +125,11 @@ int alfred_client_set_data(struct globals *globals); int recv_alfred_packet(struct globals *globals); struct transaction_head * transaction_add(struct globals *globals, struct ether_addr mac, uint16_t id); -struct transaction_head * transaction_clean(struct globals *globals, - struct transaction_head *search); +struct transaction_head * +transaction_clean_hash(struct globals *globals, + struct transaction_head *search); +struct transaction_head *transaction_clean(struct globals *globals, + struct transaction_head *head); /* send.c */ int push_data(struct globals *globals, struct in6_addr *destination, enum data_source max_source_level, int type_filter, @@ -139,6 +144,8 @@ int unix_sock_read(struct globals *globals); int unix_sock_open_daemon(struct globals *globals, char *path); int unix_sock_open_client(struct globals *globals, char *path); int unix_sock_close(struct globals *globals); +int unix_sock_req_data_finish(struct globals *globals, + struct transaction_head *head); /* vis.c */ int vis_update_data(struct globals *globals); /* netsock.c */ diff --git a/recv.c b/recv.c index d2accf4..0170695 100644 --- a/recv.c +++ b/recv.c @@ -113,8 +113,11 @@ transaction_add(struct globals *globals, struct ether_addr mac, uint16_t id)
head->server_addr = mac; head->id = id; + head->requested_type = 0; head->finished = 0; head->num_packet = 0; + head->client_socket = -1; + clock_gettime(CLOCK_MONOTONIC, &head->last_rx_time); INIT_LIST_HEAD(&head->packet_list); if (hash_add(globals->transaction_hash, head)) { free(head); @@ -124,15 +127,10 @@ transaction_add(struct globals *globals, struct ether_addr mac, uint16_t id) return head; }
-struct transaction_head * transaction_clean(struct globals *globals, - struct transaction_head *search) +struct transaction_head *transaction_clean(struct globals *globals, + struct transaction_head *head) { struct transaction_packet *transaction_packet, *safe; - struct transaction_head *head; - - head = hash_find(globals->transaction_hash, search); - if (!head) - return head;
list_for_each_entry_safe(transaction_packet, safe, &head->packet_list, list) { @@ -141,7 +139,20 @@ struct transaction_head * transaction_clean(struct globals *globals, free(transaction_packet); }
- return hash_remove(globals->transaction_hash, search); + hash_remove(globals->transaction_hash, head); + return head; +} + +struct transaction_head * +transaction_clean_hash(struct globals *globals, struct transaction_head *search) +{ + struct transaction_head *head; + + head = hash_find(globals->transaction_hash, search); + if (!head) + return head; + + return transaction_clean(globals, head); }
static int process_alfred_push_data(struct globals *globals, @@ -340,10 +351,14 @@ static int process_alfred_status_txend(struct globals *globals, free(transaction_packet); }
- /* master mode only syncs. no client is waiting the finished - * transaction */ - if (globals->opmode == OPMODE_MASTER) - transaction_clean(globals, &search); + head = transaction_clean_hash(globals, &search); + if (!head) + return -1; + + if (head->client_socket < 0) + free(head); + else + unix_sock_req_data_finish(globals, head);
return 0; } diff --git a/server.c b/server.c index 8f4c37d..02eab42 100644 --- a/server.c +++ b/server.c @@ -192,6 +192,21 @@ static int purge_data(struct globals *globals) if (!globals->best_server) set_best_server(globals);
+ while ((hashit = hash_iterate(globals->transaction_hash, hashit))) { + struct transaction_head *head = hashit->bucket->data; + + time_diff(&now, &head->last_rx_time, &diff); + if (diff.tv_sec < ALFRED_REQUEST_TIMEOUT) + continue; + + hash_remove_bucket(globals->transaction_hash, hashit); + transaction_clean(globals, head); + if (head->client_socket < 0) + free(head); + else + unix_sock_req_data_finish(globals, head); + } + return 0; }
diff --git a/unix_sock.c b/unix_sock.c index 206b43d..93f84d4 100644 --- a/unix_sock.c +++ b/unix_sock.c @@ -91,42 +91,43 @@ int unix_sock_open_client(struct globals *globals, char *path)
static int unix_sock_add_data(struct globals *globals, - struct alfred_push_data_v0 *push) + struct alfred_push_data_v0 *push, + int client_sock) { struct alfred_data *data; struct dataset *dataset; - int len, data_len; + int len, data_len, ret = -1;
len = ntohs(push->header.length);
if (len < (int)(sizeof(*push) + sizeof(push->header))) - return -1; + goto err;
/* subtract rest of push header */ len -= sizeof(*push) - sizeof(push->header);
if (len < (int)(sizeof(*data))) - return -1; + goto err;
data = push->data; data_len = ntohs(data->header.length); memcpy(data->source, &globals->hwaddr, sizeof(globals->hwaddr));
if ((int)(data_len + sizeof(*data)) > len) - return -1; + goto err;
dataset = hash_find(globals->data_hash, data); if (!dataset) { dataset = malloc(sizeof(*dataset)); if (!dataset) - return -1; + goto err;
dataset->buf = NULL;
memcpy(&dataset->data, data, sizeof(*data)); if (hash_add(globals->data_hash, dataset)) { free(dataset); - return -1; + goto err; } } dataset->data_source = SOURCE_LOCAL; @@ -138,102 +139,39 @@ static int unix_sock_add_data(struct globals *globals, dataset->buf = malloc(data_len); /* that's not good */ if (!dataset->buf) - return -1; + goto err;
dataset->data.header.length = data_len; memcpy(dataset->buf, data->data, data_len);
- return 0; + ret = 0; +err: + close(client_sock); + return ret; }
-static int unix_sock_req_data(struct globals *globals, - struct alfred_request_v0 *request, - int client_sock) +static int unix_sock_req_data_reply(struct globals *globals, int client_sock, + uint16_t id, uint8_t requested_type) { + int len; + struct alfred_push_data_v0 *push; struct hash_it_t *hashit = NULL; - struct timespec tv, last_check, now; - fd_set fds; - int ret, len; uint8_t buf[MAX_PAYLOAD]; - struct alfred_push_data_v0 *push; uint16_t seqno = 0; - uint16_t id; - struct transaction_head search, *head = NULL; - struct alfred_status_v0 status; - - len = ntohs(request->header.length); - - if (len != (sizeof(*request) - sizeof(request->header))) - return -1; - - /* no server to send the request to, only give back what we have now. */ - if (!globals->best_server) - goto send_reply; - - /* a master already has data to respond with */ - if (globals->opmode == OPMODE_MASTER) - goto send_reply; - - id = ntohs(request->tx_id); - head = transaction_add(globals, globals->best_server->hwaddr, id); - if (!head) - return -1; - - search.server_addr = globals->best_server->hwaddr; - search.id = id; - - send_alfred_packet(globals, &globals->best_server->address, - request, sizeof(*request)); - - /* process incoming packets ... */ - FD_ZERO(&fds); - clock_gettime(CLOCK_MONOTONIC, &last_check); - - while (1) { - clock_gettime(CLOCK_MONOTONIC, &now); - now.tv_sec -= ALFRED_REQUEST_TIMEOUT; - if (!time_diff(&last_check, &now, &tv) || head->finished) - break; - - FD_SET(globals->netsock, &fds); - - ret = pselect(globals->netsock + 1, &fds, NULL, NULL, &tv, - NULL); - - if (ret == -1) { - fprintf(stderr, "select failed ...: %s\n", - strerror(errno)); - return -1; - } - - if (FD_ISSET(globals->netsock, &fds)) - recv_alfred_packet(globals); - } - - head = transaction_clean(globals, &search); - if (!head || head->finished != 1) { - free(head); - goto reply_error; - } - -send_reply: - - if (globals->opmode != OPMODE_MASTER) - free(head);
/* send some data back through the unix socket */
push = (struct alfred_push_data_v0 *)buf; push->header.type = ALFRED_PUSH_DATA; push->header.version = ALFRED_VERSION; - push->tx.id = request->tx_id; + push->tx.id = htons(id);
while (NULL != (hashit = hash_iterate(globals->data_hash, hashit))) { struct dataset *dataset = hashit->bucket->data; struct alfred_data *data;
- if (dataset->data.header.type != request->requested_type) + if (dataset->data.header.type != requested_type) continue;
data = push->data; @@ -248,18 +186,80 @@ send_reply: write(client_sock, buf, sizeof(*push) + len); }
+ close(client_sock); + + return 0; +} + +static int unix_sock_req_data(struct globals *globals, + struct alfred_request_v0 *request, + int client_sock) +{ + int len; + uint16_t id; + struct transaction_head *head = NULL; + + len = ntohs(request->header.length); + + if (len != (sizeof(*request) - sizeof(request->header))) + return -1; + + id = ntohs(request->tx_id); + + /* no server to send the request to, only give back what we have now. */ + if (!globals->best_server) + return unix_sock_req_data_reply(globals, client_sock, id, + request->requested_type); + + /* a master already has data to respond with */ + if (globals->opmode == OPMODE_MASTER) + return unix_sock_req_data_reply(globals, client_sock, id, + request->requested_type); + + head = transaction_add(globals, globals->best_server->hwaddr, id); + if (!head) + return -1; + + head->client_socket = client_sock; + head->requested_type = request->requested_type; + + send_alfred_packet(globals, &globals->best_server->address, + request, sizeof(*request)); + return 0; +}
-reply_error: +int unix_sock_req_data_finish(struct globals *globals, + struct transaction_head *head) +{ + struct alfred_status_v0 status; + int send_data = 1; + int client_sock; + uint16_t id; + uint8_t requested_type; + + requested_type = head->requested_type; + id = head->id; + client_sock = head->client_socket; + if (head->finished != 1) + send_data = 0;
free(head); + + if (send_data) { + unix_sock_req_data_reply(globals, client_sock, id, + requested_type); + return 0; + } + status.header.type = ALFRED_STATUS_ERROR; status.header.version = ALFRED_VERSION; status.header.length = htons(sizeof(status) - sizeof(status.header)); - status.tx.id = request->tx_id; + status.tx.id = htons(id); status.tx.seqno = 1; write(client_sock, &status, sizeof(status));
+ close(client_sock); return 0; }
@@ -306,7 +306,8 @@ int unix_sock_read(struct globals *globals) switch (packet->type) { case ALFRED_PUSH_DATA: ret = unix_sock_add_data(globals, - (struct alfred_push_data_v0 *)packet); + (struct alfred_push_data_v0 *)packet, + client_sock); break; case ALFRED_REQUEST: ret = unix_sock_req_data(globals, @@ -316,7 +317,11 @@ int unix_sock_read(struct globals *globals) default: /* unknown packet type */ ret = -1; + goto err; } + + return ret; + err: close(client_sock); return ret;