Repository : ssh://git@open-mesh.org/alfred
On branch : master
commit 9d830cd0d602826e0a893ba0bf43cf44fa5a4d49 Author: Sven Eckelmann <sven@open-mesh.com> Date: Thu Jan 17 14:24:41 2013 +0100
alfred: Only apply changes after txend status was received
A transaction has to be handled in an atomic way. Either the complete data was received or it wasn't. The simplest method is to use a buffer to hold all unconfirmed data instead of using a redo/undo log. All data in this transaction buffer is written once the transaction has been confirmed or, in case of a failure, simply dropped.
Signed-off-by: Sven Eckelmann <sven@open-mesh.com>
9d830cd0d602826e0a893ba0bf43cf44fa5a4d49 alfred.h | 16 +++++++ recv.c | 144 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++---- server.c | 41 +++++++++++++++++- 3 files changed, 191 insertions(+), 10 deletions(-)
diff --git a/alfred.h b/alfred.h index 6a7045b..ae2dd7c 100644 --- a/alfred.h +++ b/alfred.h @@ -29,6 +29,7 @@ #include <netinet/in.h> #include <time.h> #include "hash.h" +#include "list.h" #include "packet.h"
#define ALFRED_INTERVAL 10 @@ -53,6 +54,20 @@ struct dataset { uint8_t local_data; };
+struct transaction_packet { + struct alfred_push_data_v0 *push; + struct list_head list; +}; + +struct transaction_head { + struct ether_addr server_addr; + uint16_t id; + int finished; + int num_packet; + struct timespec last_rx_time; + struct list_head packet_list; +}; + struct server { struct ether_addr hwaddr; struct in6_addr address; @@ -88,6 +103,7 @@ struct globals {
struct hashtable_t *server_hash; struct hashtable_t *data_hash; + struct hashtable_t *transaction_hash; };
#define debugMalloc(size, num) malloc(size) diff --git a/recv.c b/recv.c index fd25f46..24e8d47 100644 --- a/recv.c +++ b/recv.c @@ -33,20 +33,14 @@ #include "alfred.h" #include "batadv_query.h"
-static int process_alfred_push_data(struct globals *globals, - struct in6_addr *source, - struct alfred_push_data_v0 *push) +static int finish_alfred_push_data(struct globals *globals, + struct ether_addr mac, + struct alfred_push_data_v0 *push) { int len, data_len; struct alfred_data *data; struct dataset *dataset; uint8_t *pos; - struct ether_addr mac; - int ret; - - ret = ipv6_to_mac(source, &mac); - if (ret < 0) - goto err;
len = ntohs(push->header.length); len -= sizeof(*push) - sizeof(push->header); @@ -108,6 +102,79 @@ err: return -1; }
+static int process_alfred_push_data(struct globals *globals, + struct in6_addr *source, + struct alfred_push_data_v0 *push) +{ + int len; + struct ether_addr mac; + int ret; + struct transaction_head search, *head; + struct transaction_packet *transaction_packet; + int found; + + ret = ipv6_to_mac(source, &mac); + if (ret < 0) + goto err; + + len = ntohs(push->header.length); + + search.server_addr = mac; + search.id = ntohs(push->tx.id); + + head = hash_find(globals->transaction_hash, &search); + if (!head) { + head = malloc(sizeof(*head)); + if (!head) + goto err; + + head->server_addr = mac; + head->id = ntohs(push->tx.id); + head->finished = 0; + head->num_packet = 0; + INIT_LIST_HEAD(&head->packet_list); + if (hash_add(globals->transaction_hash, head)) { + free(head); + goto err; + } + } + clock_gettime(CLOCK_MONOTONIC, &head->last_rx_time); + + /* this transaction was already finished/dropped */ + if (head->finished != 0) + return -1; + + found = 0; + list_for_each_entry(transaction_packet, &head->packet_list, list) { + if (transaction_packet->push->tx.seqno == push->tx.seqno) { + found = 1; + break; + } + } + + /* it seems the packet was duplicated */ + if (found) + return 0; + + transaction_packet = malloc(sizeof(*transaction_packet)); + if (!transaction_packet) + goto err; + + transaction_packet->push = malloc(len + sizeof(push->header)); + if (!transaction_packet->push) { + free(transaction_packet); + goto err; + } + + memcpy(transaction_packet->push, push, len + sizeof(push->header)); + list_add_tail(&transaction_packet->list, &head->packet_list); + head->num_packet++; + + return 0; +err: + return -1; +} + static int process_alfred_announce_master(struct globals *globals, struct in6_addr *source, @@ -186,6 +253,62 @@ static int process_alfred_request(struct globals *globals, }
+static int process_alfred_status_txend(struct globals *globals, + struct in6_addr *source, + struct alfred_status_v0 *request) +{ + + struct transaction_head search, *head; + struct transaction_packet *transaction_packet, *safe; + struct ether_addr mac; + int len, ret; + + len = ntohs(request->header.length); + + if (request->header.version != ALFRED_VERSION) + return -1; + + if (len != (sizeof(*request) - sizeof(request->header))) + return -1; + + ret = ipv6_to_mac(source, &mac); + if (ret < 0) + return -1; + + search.server_addr = mac; + search.id = ntohs(request->tx.id); + + head = hash_find(globals->transaction_hash, &search); + if (!head) + return -1; + + /* this transaction was already finished/dropped */ + if (head->finished != 0) + return -1; + + /* missing packets -> cleanup everything */ + if (head->num_packet != ntohs(request->tx.seqno)) + head->finished = -1; + else + head->finished = 1; + + list_for_each_entry_safe(transaction_packet, safe, &head->packet_list, + list) { + if (head->finished == 1) + finish_alfred_push_data(globals, mac, + transaction_packet->push); + + list_del(&transaction_packet->list); + free(transaction_packet->push); + free(transaction_packet); + } + + hash_remove(globals->transaction_hash, &search); + + return 0; +} + + int recv_alfred_packet(struct globals *globals) { uint8_t buf[MAX_PAYLOAD]; @@ -235,6 +358,9 @@ int recv_alfred_packet(struct globals *globals) process_alfred_request(globals, &source.sin6_addr, (struct alfred_request_v0 *)packet); break; + case ALFRED_STATUS_TXEND: + process_alfred_status_txend(globals, &source.sin6_addr, + (struct alfred_status_v0 *)packet); default: /* unknown packet type */ return -1; diff --git a/server.c b/server.c index acd8a9c..8f4c37d 100644 --- a/server.c +++ b/server.c @@ -85,13 +85,52 @@ static int data_choose(void *d1, int size) return hash % size; }
+static int tx_compare(void *d1, void *d2) +{ + struct transaction_head *txh1 = d1; + struct transaction_head *txh2 = d2; + + if (memcmp(&txh1->server_addr, &txh2->server_addr, + sizeof(txh1->server_addr)) == 0 && + txh1->id == txh2->id) + return 1; + else + return 0; +} + +static int tx_choose(void *d1, int size) +{ + struct transaction_head *txh1 = d1; + unsigned char *key = (unsigned char *)&txh1->server_addr; + uint32_t hash = 0; + size_t i; + + for (i = 0; i < ETH_ALEN; i++) { + hash += key[i]; + hash += (hash << 10); + hash ^= (hash >> 6); + } + + hash += txh1->id; + hash += (hash << 10); + hash ^= (hash >> 6); + + hash += (hash << 3); + hash ^= (hash >> 11); + hash += (hash << 15); + + return hash % size; +} +
static int create_hashes(struct globals *globals) { globals->server_hash = hash_new(64, server_compare, server_choose); globals->data_hash = hash_new(128, data_compare, data_choose); - if (!globals->server_hash || !globals->data_hash) + globals->transaction_hash = hash_new(64, tx_compare, tx_choose); + if (!globals->server_hash || !globals->data_hash || + !globals->transaction_hash) return -1;
return 0;