diff --git a/src/algo_rr.c b/src/algo_rr.c index 33c7784..c251b10 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -29,6 +29,7 @@ struct rr_ctx { uint16_t recv_id_late; uint16_t sent_id; uint8_t current_link; + struct internet_packet prev_packet; struct timespec emit_time; struct deferred_pkt real[PACKET_BUFFER_SIZE]; struct waited_pkt wait[PACKET_BUFFER_SIZE]; @@ -57,6 +58,8 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, char buffer[16]; url_get_port (buffer, fdinfo->url); int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded + uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; + uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE; //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); @@ -75,10 +78,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, show_link_availability (rr); } - // 2. If packet arrived too late, we discard it - if (ring_ge(rr->recv_id, bp->ip.ap.str.id)) { + // 2. If packet arrived too late or already queued, we discard it + if (ring_ge(rr->recv_id, bp->ip.ap.str.id) || rr->real[real_idx].id == bp->ip.ap.str.id) { // Packet has already been delivered or dropped, we free the buffer - fprintf(stderr, "Packet %d arrived too late (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); + fprintf(stderr, "Packet %d arrived too late (current: %d) or already received\n", bp->ip.ap.str.id, rr->recv_id); mv_buffer_wtof (app_ctx, fdinfo); return; } @@ -90,7 +93,6 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, //printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout); if (timeout <= 0) timeout = 0; - uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE; if (rr->wait[wait_idx].on && rr->wait[wait_idx].id != bp->ip.ap.str.id - 1) { fprintf(stderr, "Waiting array overlap, BUG: [\n"); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { @@ -108,7 +110,6 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, } // 4. We queue the packet to keep it - uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.str.id) { fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { @@ -230,13 +231,19 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo bp->ip.ap.str.deltat = mili_sec; bp->ip.ap.str.bitfield = rr->remote_links; bp->ip.ap.str.prevlink = rr->current_link; + + if (app_ctx->ap.redundant_data == 1) { + append_buffer(&bp->ip.ap, 1, &rr->prev_packet.ap); // We append previous packet + append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time + bp->ap_count++; + } //printf("Will send packet id=%d\n", bp->ip.ap.str.id); rr->emit_time = curr; rr->sent_id++; // 2. Try to find someone to send it - int max = 10; + int max = 16; uint8_t sel_link = rr->current_link; while(max-- >= 0) { sel_link = (sel_link + 1) % app_ctx->ap.links; diff --git a/src/algo_utils.c b/src/algo_utils.c index 91d1742..fc1fa4b 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -235,6 +235,15 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } } +int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { + char* target = &(dest->raw); + while (pos-- > 0) { + target += dest->str.size; + } + memcpy(target, src, src->str.size); + return 0; +} + void naive_free_simple(void* v) { GQueue* g = v; g_queue_free (g); diff --git a/src/algo_utils.h b/src/algo_utils.h index 1f23c40..ce8d26a 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -53,6 +53,8 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); + struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx); diff --git a/src/donar.c b/src/donar.c index f6f3bdc..d9dd2d5 100644 --- a/src/donar.c +++ b/src/donar.c @@ -14,7 +14,7 @@ int main(int argc, char** argv) { struct donar_params dp = {0}; donar_init_params (&dp); - while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:")) != -1) { + while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:")) != -1) { switch(dp.opt) { case 'v': dp.verbose++; @@ -74,10 +74,10 @@ int main(int argc, char** argv) { in_error: dp.errored = 1; - fprintf(stderr, "Usage as client : %s -c -a [-h] [-b] -o -e [-e ...]* -r [-r ...]*\n", argv[0]); - fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] -e [-e ...]* -r [-r ...]*\n\n", argv[0]); - fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s\n", - dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file); + fprintf(stderr, "Usage as client : %s -c -a -o [-h] [-b] [-l ] [-d ,] [-e ]* [-r ]*\n", argv[0]); + fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] [-l ] [-d ,] [-e ]* [-r ]*\n\n", argv[0]); + fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n", + dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data); terminate: if (dp.onion_file != NULL) free(dp.onion_file); diff --git a/src/donar_init.c b/src/donar_init.c index 40d599d..3beaa39 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -152,6 +152,8 @@ void donar_init_params(struct donar_params* dp) { dp->is_waiting_bootstrap = 0; dp->errored = 0; dp->links = 2; + dp->fresh_data = 1; + dp->redundant_data = 0; dp->remote_ports = g_ptr_array_new_with_free_func (free_port); dp->exposed_ports = g_ptr_array_new_with_free_func (free_port); } diff --git a/src/donar_server.c b/src/donar_server.c index 79540f1..f8b7c8a 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -55,7 +55,10 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { struct algo_params ap = { .is_waiting_bootstrap = dp->is_waiting_bootstrap, .is_healing = dp->is_healing, - .algo_name = dp->algo + .algo_name = dp->algo, + .links = dp->links, + .fresh_data = dp->fresh_data, + .redundant_data = dp->redundant_data }; evt_core_init (&(ctx->evts), dp->verbose); diff --git a/src/packet.c b/src/packet.c index 2ce45ba..6d520dd 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,5 +1,13 @@ #include "packet.h" +size_t get_full_size(struct buffer_packet* bp) { + union abstract_packet* ap = &bp->ip.ap; + for (int i = 0; i < bp->ap_count; i++) { + ap = (union abstract_packet*)(&ap->raw + ap->str.size); + } + return &ap->raw - &bp->ip.ap.raw; +} + enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { ssize_t nread; size_t pkt_size_size = sizeof(bp->ip.ap.str.size); @@ -23,6 +31,7 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { bp->mode = BP_WRITING; bp->awrite = 0; + bp->ap_count = 1; return FDS_READY; } @@ -31,8 +40,8 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { ssize_t nwrite; if (bp->mode != BP_WRITING) return FDS_ERR; - while (bp->awrite < bp->ip.ap.str.size) { - nwrite = send(fd, &(bp->ip.ap.raw), bp->ip.ap.str.size, 0); + while (bp->awrite < get_full_size(bp)) { + nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; @@ -40,6 +49,7 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { bp->mode = BP_READING; bp->aread = 0; + bp->ap_count = 0; return FDS_READY; } @@ -70,6 +80,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t bp->mode = BP_READING; bp->aread = 0; + bp->ap_count = 0; return FDS_READY; } @@ -99,6 +110,7 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp bp->mode = BP_WRITING; bp->awrite = 0; + bp->ap_count = 1; return FDS_READY; } diff --git a/src/packet.h b/src/packet.h index e97821b..f07fd67 100644 --- a/src/packet.h +++ b/src/packet.h @@ -48,11 +48,12 @@ union abstract_packet { struct internet_packet { union abstract_packet ap; - char rest[1499]; // MTU = 1500, 1 byte in the union + char rest[1499]; // MTU = 1500, 1 byte in the union as payload }; struct buffer_packet { - uint8_t mode; + enum BP_MODE mode; + uint8_t ap_count; uint16_t aread; uint16_t awrite; struct internet_packet ip;