From 6a22df4ac120f2c8194b5a2ccce22e0871e1ba36 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 11:48:09 +0100 Subject: [PATCH 01/17] Improve algorithm logging --- src/algo_rr.c | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index c4136cc..bf61f3a 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -72,6 +72,20 @@ co_error: exit(EXIT_FAILURE); } +void show_link_availability(struct rr_ctx* rr) { + printf("Links availability: my_links["); + for (int i = 0; i < 8; i++) { + if (rr->my_links & 1 << i) printf("U"); + else printf("-"); + } + printf("], rem_links["); + for (int i = 0; i < 8; i++) { + if (rr->remote_links & 1 << i) printf("U"); + else printf("-"); + } + printf("]\n"); +} + int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt* wpkt) { struct timespec now; struct itimerspec timer_config; @@ -121,12 +135,6 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, if (bp->ip.ap.str.id > rr->my_links_ver) { rr->my_links = bp->ip.ap.str.bitfield; rr->my_links_ver = bp->ip.ap.str.id; - printf("Links availability: ["); - for (int i = 0; i < 8; i++) { - if (rr->my_links & 1 << i) printf("U"); - else printf("-"); - } - printf("]\n"); } // 2. If packet arrived too late, we discard it @@ -161,6 +169,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, char buffer[16]; url_get_port (buffer, fdinfo->url); int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded + if (!(rr->remote_links & 1 << link_num)) { + printf("Activate link=%d\n", link_num); + show_link_availability (rr); + } rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working } @@ -322,7 +334,8 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { while(1) { if (max-- < 0) break; rr->current_link = (rr->current_link + 1) % 10; - if (!(rr->my_links & (1 << rr->current_link))) continue; + //if (!(rr->my_links & (1 << rr->current_link))) continue; + if (!(rr->remote_links & (1 << rr->current_link))) continue; sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + rr->current_link); //@FIXME Hardcoded //printf("-- Trying %s\n", url); to_fdinfo = evt_core_get_from_url (ctx, url); @@ -391,6 +404,8 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { printf("Timer reached for packet %d\n", pkt->id); // !BLACKLIST LINK + printf("Blacklist link=%d\n", pkt->link_num); + show_link_availability (rr); rr->remote_links &= 0xff ^ 1 << pkt->link_num; while (ring_lt(rr->recv_id, pkt->id)) { From 83b205e2c86f17a4fc1a607485939cf88ebbe525 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 14:17:41 +0100 Subject: [PATCH 02/17] improve readability --- src/algo_rr.c | 17 +++++++---------- 1 file changed, 7 insertions(+), 10 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index bf61f3a..2a5c5b1 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -169,11 +169,11 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, char buffer[16]; url_get_port (buffer, fdinfo->url); int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded + rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working if (!(rr->remote_links & 1 << link_num)) { printf("Activate link=%d\n", link_num); show_link_availability (rr); } - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working } void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { @@ -332,17 +332,14 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int max = 10; while(1) { - if (max-- < 0) break; - rr->current_link = (rr->current_link + 1) % 10; - //if (!(rr->my_links & (1 << rr->current_link))) continue; - if (!(rr->remote_links & (1 << rr->current_link))) continue; + if (max-- < 0) break; // We have an error + rr->current_link = (rr->current_link + 1) % 8; + if (!(rr->my_links & (1 << rr->current_link))) continue; + //if (!(rr->remote_links & (1 << rr->current_link))) continue; sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + rr->current_link); //@FIXME Hardcoded //printf("-- Trying %s\n", url); to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo != NULL) { - //printf("Selected url %s for pkt %d to be sent on Tor\n", url, bp->ip.ap.str.id); - break; - } + if (to_fdinfo != NULL) break; // We found a good candidate } rr->emit_time = curr; rr->sent_id++; @@ -405,8 +402,8 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { // !BLACKLIST LINK printf("Blacklist link=%d\n", pkt->link_num); - show_link_availability (rr); rr->remote_links &= 0xff ^ 1 << pkt->link_num; + show_link_availability (rr); while (ring_lt(rr->recv_id, pkt->id)) { rr->recv_id++; From acae81b91c8e8675097485829b08e2753f39eedc Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 16:24:39 +0100 Subject: [PATCH 03/17] WIP link healing --- src/algo_rr.c | 92 ++++++++++++++++++++++++++++-------------------- src/algo_utils.c | 49 ++++++++++++++++++++------ src/algo_utils.h | 1 + src/net_tools.c | 20 +++++++++-- src/net_tools.h | 2 ++ 5 files changed, 114 insertions(+), 50 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 2a5c5b1..b8b7469 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -22,6 +22,7 @@ struct rr_ctx { uint8_t remote_links; int64_t mjit; uint16_t recv_id; + uint16_t recv_id_late; uint16_t sent_id; uint8_t current_link; struct timespec emit_time; @@ -128,10 +129,20 @@ int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; + char buffer[16]; + url_get_port (buffer, fdinfo->url); + int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); + printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); - // 1. Update links I can use thanks to target feedback + // 0. Update remote links + if (ring_lt(rr->recv_id_late, bp->ip.ap.str.id) && !(rr->remote_links & 1 << link_num)) { + printf("Activate link=%d\n", link_num); + rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working + show_link_availability (rr); + } + + // 1. Update my links I can use thanks to target feedback if (bp->ip.ap.str.id > rr->my_links_ver) { rr->my_links = bp->ip.ap.str.bitfield; rr->my_links_ver = bp->ip.ap.str.id; @@ -158,22 +169,22 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr->wait[idx_waited].timer_fd = set_timeout(ctx, timeout, &rr->wait[idx_waited]); } - // 4. We queue the packet + // 4. If packet has not already a timer or has a wrong timer int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; + if (rr->wait[idx_real].on && rr->wait[idx_real].id != bp->ip.ap.str.id) { + fprintf(stderr, "Waiting array is full, BUG\n"); + } else if (!rr->wait[idx_real].on) { + rr->wait[idx_real].on = 1; + rr->wait[idx_real].id = bp->ip.ap.str.id; + rr->wait[idx_real].link_num = link_num; + rr->wait[idx_real].timer_fd = set_timeout(ctx, rr->mjit + 1, &rr->wait[idx_real]); + } + + // 5. We queue the packet rr->real[idx_real].on = 1; rr->real[idx_real].idx = idx_real; rr->real[idx_real].link_fd = fdinfo->fd; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); - - // 5. We make sure that the remote link is set to up - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working - if (!(rr->remote_links & 1 << link_num)) { - printf("Activate link=%d\n", link_num); - show_link_availability (rr); - } } void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { @@ -187,18 +198,19 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); - //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); + printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor rr->recv_id = bp->ip.ap.str.id; // 4. We check that we don't have a running timeout - int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; + // We want to keep timer until the end to allow link update on multi receive + /*int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; if (rr->wait[idx_real].on) { rr->wait[idx_real].on = 0; evt_core_rm_fd (ctx, rr->wait[idx_real].timer_fd); printf("Removed timer for packet %d\n",bp->ip.ap.str.id); - } + }*/ // 5. We free the buffer if it's a control packet and quit if (bp->ip.ap.str.flags & PKT_CONTROL) { @@ -227,6 +239,7 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct buffer_packet* bp = NULL; while(1) { + printf("Trying to deliver %d\n", rr->recv_id+1); struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; fdinfo = evt_core_get_from_fd (ctx, def->link_fd); @@ -237,6 +250,7 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } rr_deliver(ctx, fdinfo, def); + printf("Delivered %d\n", rr->recv_id); } } @@ -329,33 +343,32 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { bp->ip.ap.str.deltat = mili_sec; bp->ip.ap.str.bitfield = rr->remote_links; bp->ip.ap.str.prevlink = rr->current_link; + printf("Will send packet id=%d\n", bp->ip.ap.str.id); - int max = 10; - while(1) { - if (max-- < 0) break; // We have an error - rr->current_link = (rr->current_link + 1) % 8; - if (!(rr->my_links & (1 << rr->current_link))) continue; - //if (!(rr->remote_links & (1 << rr->current_link))) continue; - sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + rr->current_link); //@FIXME Hardcoded - //printf("-- Trying %s\n", url); - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo != NULL) break; // We found a good candidate - } rr->emit_time = curr; rr->sent_id++; - // 4. A whole packet has been read, we will find someone to write it - if (to_fdinfo == NULL) { - fprintf(stderr, "No fd for URL %s in udp-read. Dropping packet :( \n", fdinfo->url); - mv_buffer_wtof (app_ctx, fdinfo); - return 1; + int max = 10; + uint8_t sel_link = rr->current_link; + while(max-- >= 0) { + sel_link = (sel_link + 1) % 8; + sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded + to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) continue; // Missing link + if (rr->my_links & (1 << sel_link)) { + rr->current_link = sel_link; + mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + rr_on_tcp_write(ctx, to_fdinfo); + return 0; + } else { + dup_buffer_tow(app_ctx, bp, to_fdinfo); + rr_on_tcp_write(ctx, to_fdinfo); + } } - //printf("Pass packet from %s to %s\n", fdinfo->url, url); - - // 5. We move the buffer and notify the target - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); - rr_on_tcp_write(ctx, to_fdinfo); + // 4. A whole packet has been read, we will find someone to write it + fprintf(stderr, "No fd for URL %s in udp-read. Dropping packet :( \n", fdinfo->url); + mv_buffer_wtof (app_ctx, fdinfo); return 0; co_error: @@ -396,7 +409,8 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct waited_pkt* pkt = fdinfo->other; evt_core_rm_fd(ctx, fdinfo->fd); pkt->on = 0; - if (ring_lt(pkt->id, rr->recv_id)) return 1; + if (ring_gt (pkt->id, rr->recv_id_late)) rr->recv_id_late = pkt->id; + if (ring_le (pkt->id, rr->recv_id)) return 1; printf("Timer reached for packet %d\n", pkt->id); @@ -455,6 +469,8 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { rr->my_links = 0xff; rr->remote_links = 0xff; rr->sent_id = 1; + rr->recv_id = 0; + rr->recv_id_late = 0; ctx->misc = rr; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); diff --git a/src/algo_utils.c b/src/algo_utils.c index f8b7a04..b483c46 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -21,6 +21,19 @@ void iterate2(int* fd, struct buffer_packet *bp, gpointer user_data) { fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd); } +void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { + fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); + int waiting_count = 0; + g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); + g_hash_table_foreach(app_ctx->used_buffer, (GHFunc)iterate2, NULL); + fprintf(stderr, "total_buffers=%d, free_buffer=%d, used_buffers=%d, app_buffer=%d, write_buffer=%d.\n", + PACKET_BUFFER_SIZE, + app_ctx->free_buffer->length, + g_hash_table_size(app_ctx->used_buffer), + g_hash_table_size(app_ctx->application_waiting), + waiting_count); +} + /** * Returns a buffer if available, NULL otherwise */ @@ -34,16 +47,8 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_ // 2. Get a new buffer otherwise bp = g_queue_pop_head(app_ctx->free_buffer); if (bp == NULL) { - fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); - int waiting_count = 0; - g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); - g_hash_table_foreach(app_ctx->used_buffer, (GHFunc)iterate2, NULL); - fprintf(stderr, "total_buffers=%d, free_buffer=%d, used_buffers=%d, app_buffer=%d, write_buffer=%d.\n", - PACKET_BUFFER_SIZE, - app_ctx->free_buffer->length, - g_hash_table_size(app_ctx->used_buffer), - g_hash_table_size(app_ctx->application_waiting), - waiting_count); + debug_buffer(app_ctx, fdinfo); + // 2.1 If no buffer is available, we subscribe to be notified later g_queue_push_tail (app_ctx->read_waiting, &(fdinfo->fd)); return NULL; @@ -165,6 +170,30 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { g_queue_push_tail (app_ctx->free_buffer, bp); } +void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { + GQueue* q; + + // 1. We get a free buffer + struct buffer_packet* bp_dest = g_queue_pop_head(app_ctx->free_buffer); + if (bp_dest == NULL) { + debug_buffer(app_ctx, to); + return; + } + + // 2. We duplicate the data + memcpy(bp_dest, bp, sizeof(struct buffer_packet)); + + // 3. We get the target writing queue + q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); + if (q == NULL) { + q = g_queue_new (); + g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); + } + + // 4. We push the content to the appropriate destination + g_queue_push_tail(q, bp_dest); +} + struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { return g_hash_table_lookup (app_ctx->application_waiting, idx); } diff --git a/src/algo_utils.h b/src/algo_utils.h index 815e2b8..3fc55cc 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -24,6 +24,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); +void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/net_tools.c b/src/net_tools.c index cf1c419..18b5890 100644 --- a/src/net_tools.c +++ b/src/net_tools.c @@ -44,7 +44,15 @@ int create_ip_client(char* host, char* service, int type) { } int create_tcp_client(char* host, char* service) { - return create_ip_client (host, service, SOCK_STREAM); + int sock = create_ip_client (host, service, SOCK_STREAM); + int activate = 1; + int err; + err = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &activate, sizeof(activate)); + if (err < 0) { + perror("setsockopt TCP_NODELAY"); + exit(EXIT_FAILURE); + } + return sock; } int create_udp_client(char* host, char* service) { @@ -93,7 +101,15 @@ int create_ip_server(char* host, char* service, int type) { } int create_tcp_server(char* host, char* service) { - return create_ip_server (host, service, SOCK_STREAM); + int sock = create_ip_server (host, service, SOCK_STREAM); + int activate = 1; + int err; + err = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &activate, sizeof(activate)); + if (err < 0) { + perror("setsockopt TCP_NODELAY"); + exit(EXIT_FAILURE); + } + return sock; } int create_udp_server(char* host, char* service) { diff --git a/src/net_tools.h b/src/net_tools.h index a847b9a..0975ebd 100644 --- a/src/net_tools.h +++ b/src/net_tools.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include int create_tcp_client(char* host, char* service); int create_udp_client(char* host, char* service); From b6258759f0fc3b2ea9971bed20c65e36b48c7f63 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 17:31:46 +0100 Subject: [PATCH 04/17] Something that might work --- src/algo_rr.c | 12 +++++++++++- src/algo_utils.c | 3 ++- src/meas_lat.c | 2 +- src/net_tools.c | 1 + 4 files changed, 15 insertions(+), 3 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index b8b7469..792c820 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -403,11 +403,20 @@ co_error: } int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + uint64_t ctr; + ssize_t tmr_rd; + tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr)); + if (tmr_rd == -1 && errno == EAGAIN) return 1; + if (tmr_rd < 0) { + perror("read on timer"); + fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd); + exit(EXIT_FAILURE); + } + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; struct waited_pkt* pkt = fdinfo->other; - evt_core_rm_fd(ctx, fdinfo->fd); pkt->on = 0; if (ring_gt (pkt->id, rr->recv_id_late)) rr->recv_id_late = pkt->id; if (ring_le (pkt->id, rr->recv_id)) return 1; @@ -424,6 +433,7 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { rr_pkt_unroll (ctx, app_ctx); } + evt_core_rm_fd(ctx, fdinfo->fd); return 1; } diff --git a/src/algo_utils.c b/src/algo_utils.c index b483c46..2db616a 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -126,7 +126,8 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void } g_hash_table_remove(app_ctx->used_buffer, &from->fd); if (g_hash_table_contains(app_ctx->application_waiting, to)) { - fprintf(stderr, "Data already exist for this entry\n"); + fprintf(stderr, "Data already exists for this entry\n"); + debug_buffer(app_ctx, from); exit(EXIT_FAILURE); } g_hash_table_insert(app_ctx->application_waiting, to, bp); diff --git a/src/meas_lat.c b/src/meas_lat.c index ebf39be..1eb76ed 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -96,7 +96,7 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct measure_conf* mc = fdinfo->other; s = read(fdinfo->fd, &ticks, sizeof(uint64_t)); - if (s == -1 && errno == EAGAIN) return 0; + if (s == -1 && errno == EAGAIN) return 1; if (s != sizeof(uint64_t)) { perror("Read error"); exit(EXIT_FAILURE); diff --git a/src/net_tools.c b/src/net_tools.c index 18b5890..0b01658 100644 --- a/src/net_tools.c +++ b/src/net_tools.c @@ -109,6 +109,7 @@ int create_tcp_server(char* host, char* service) { perror("setsockopt TCP_NODELAY"); exit(EXIT_FAILURE); } + return sock; } From da6815ec62c7606f013d7a155e48daca104b1d5a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 17:37:34 +0100 Subject: [PATCH 05/17] Fix memleak --- src/socks5.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/socks5.c b/src/socks5.c index a129cd4..f8f27ea 100644 --- a/src/socks5.c +++ b/src/socks5.c @@ -38,7 +38,7 @@ void socks5_create_dns_client(struct evt_core_ctx* ctx, char* proxy_host, char* memset(fdinfo.other, 0, sizeof(struct socks5_ctx)); fdinfo.free_other = socks5_free_ctx; sprintf(url, "socks5:send-hs:%s:%d", addr, port); - fdinfo.url = strdup(url); + fdinfo.url = url; // 3. Fill socks5_ctx structures s5ctx = fdinfo.other; From 283dfd82e18ea6d66b95799832b74f8fd9f15931 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 17:41:03 +0100 Subject: [PATCH 06/17] Fix another memory leak --- src/socks5.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/socks5.c b/src/socks5.c index f8f27ea..8f1f5b0 100644 --- a/src/socks5.c +++ b/src/socks5.c @@ -2,6 +2,7 @@ void socks5_free_ctx(void* elem) { struct socks5_ctx* ctx = elem; + free(ctx->addr); free(ctx); } From c6f7199bf6b7c5995a2d1b98d40ea7ab1460628b Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 17:46:10 +0100 Subject: [PATCH 07/17] Fix timer bug + comment some logs --- src/algo_rr.c | 28 ++++++++++------------------ 1 file changed, 10 insertions(+), 18 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 792c820..1c2acb3 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -133,7 +133,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, url_get_port (buffer, fdinfo->url); int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); + //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); // 0. Update remote links if (ring_lt(rr->recv_id_late, bp->ip.ap.str.id) && !(rr->remote_links & 1 << link_num)) { @@ -198,27 +198,18 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); - printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); + //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor rr->recv_id = bp->ip.ap.str.id; - // 4. We check that we don't have a running timeout - // We want to keep timer until the end to allow link update on multi receive - /*int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - if (rr->wait[idx_real].on) { - rr->wait[idx_real].on = 0; - evt_core_rm_fd (ctx, rr->wait[idx_real].timer_fd); - printf("Removed timer for packet %d\n",bp->ip.ap.str.id); - }*/ - - // 5. We free the buffer if it's a control packet and quit + // 4. We free the buffer if it's a control packet and quit if (bp->ip.ap.str.flags & PKT_CONTROL) { mv_buffer_atof (app_ctx, &dp->idx); return; } - // 6. Find its target + // 5. Find its target sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { @@ -227,7 +218,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct mv_buffer_atof (app_ctx, &dp->idx); } - // 4. We move the buffer and notify the target + // 6. We move the buffer and notify the target //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); rr_on_udp_write(ctx, to_fdinfo); @@ -239,7 +230,7 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct buffer_packet* bp = NULL; while(1) { - printf("Trying to deliver %d\n", rr->recv_id+1); + //printf("Trying to deliver %d\n", rr->recv_id+1); struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; fdinfo = evt_core_get_from_fd (ctx, def->link_fd); @@ -250,7 +241,7 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } rr_deliver(ctx, fdinfo, def); - printf("Delivered %d\n", rr->recv_id); + //printf("Delivered %d\n", rr->recv_id); } } @@ -343,7 +334,7 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { bp->ip.ap.str.deltat = mili_sec; bp->ip.ap.str.bitfield = rr->remote_links; bp->ip.ap.str.prevlink = rr->current_link; - printf("Will send packet id=%d\n", bp->ip.ap.str.id); + //printf("Will send packet id=%d\n", bp->ip.ap.str.id); rr->emit_time = curr; rr->sent_id++; @@ -419,7 +410,7 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct waited_pkt* pkt = fdinfo->other; pkt->on = 0; if (ring_gt (pkt->id, rr->recv_id_late)) rr->recv_id_late = pkt->id; - if (ring_le (pkt->id, rr->recv_id)) return 1; + if (ring_le (pkt->id, rr->recv_id)) goto end; printf("Timer reached for packet %d\n", pkt->id); @@ -433,6 +424,7 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { rr_pkt_unroll (ctx, app_ctx); } +end: evt_core_rm_fd(ctx, fdinfo->fd); return 1; } From e15e15325c1e8bb910ecb21ab9dec530f2894961 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 18:11:05 +0100 Subject: [PATCH 08/17] Better logging --- src/algo_rr.c | 4 +++- src/evt_core.c | 6 +++--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 1c2acb3..82396e8 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -143,9 +143,11 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, } // 1. Update my links I can use thanks to target feedback - if (bp->ip.ap.str.id > rr->my_links_ver) { + if (bp->ip.ap.str.id > rr->my_links_ver && bp->ip.ap.str.bitfield != rr->my_links) { rr->my_links = bp->ip.ap.str.bitfield; rr->my_links_ver = bp->ip.ap.str.id; + printf("Update my links\n"); + show_link_availability (rr); } // 2. If packet arrived too late, we discard it diff --git a/src/evt_core.c b/src/evt_core.c index 1e7b728..0d231d5 100644 --- a/src/evt_core.c +++ b/src/evt_core.c @@ -65,7 +65,7 @@ void evt_core_add_cat(struct evt_core_ctx* ctx, struct evt_core_cat* cat) { } void evt_core_mv_fd(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct evt_core_cat* to_cat) { - printf("Moving fd=%d from cat=%s to cat=%s\n",fdinfo->fd, fdinfo->cat->name, to_cat->name); + //printf("Moving fd=%d from cat=%s to cat=%s\n",fdinfo->fd, fdinfo->cat->name, to_cat->name); // 1. Update old category for (int i = 0; i < fdinfo->cat->socklist->len; i++) { @@ -130,7 +130,7 @@ struct evt_core_fdinfo* evt_core_add_fd(struct evt_core_ctx* ctx, struct evt_cor // 5. Add file descriptor to epoll add_fd_to_epoll(ctx->epollfd, user_data->fd, cat->flags); - printf("Added fd=%d with url=%s in cat=%s\n", fdinfo->fd, fdinfo->url, fdinfo->cat->name); + //printf("Added fd=%d with url=%s in cat=%s\n", fdinfo->fd, fdinfo->url, fdinfo->cat->name); // 6. Ensure that events arrived before epoll registering are handled fdinfo->cat->cb(ctx, fdinfo); @@ -145,7 +145,7 @@ struct evt_core_cat* evt_core_rm_fd(struct evt_core_ctx* ctx, int fd) { struct evt_core_fdinfo* fdinfo = g_hash_table_lookup (ctx->socklist, &fd); if (fdinfo == NULL) return NULL; cat = fdinfo->cat; - printf("Closing fd=%d from cat=%s\n",fdinfo->fd, fdinfo->cat->name); + //printf("Closing fd=%d from cat=%s\n",fdinfo->fd, fdinfo->cat->name); // 2. Update category for (int i = 0; i < cat->socklist->len; i++) { From ca8eddc0add4611c735e78d1ddc635a222e192b9 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 18:27:44 +0100 Subject: [PATCH 09/17] Some improvement on RR algo --- src/algo_rr.c | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 82396e8..302565e 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -13,6 +13,7 @@ struct waited_pkt { struct deferred_pkt { int link_fd; int idx; + uint16_t id; uint8_t on; }; @@ -175,6 +176,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; if (rr->wait[idx_real].on && rr->wait[idx_real].id != bp->ip.ap.str.id) { fprintf(stderr, "Waiting array is full, BUG\n"); + exit(EXIT_FAILURE); } else if (!rr->wait[idx_real].on) { rr->wait[idx_real].on = 1; rr->wait[idx_real].id = bp->ip.ap.str.id; @@ -183,10 +185,18 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, } // 5. We queue the packet - rr->real[idx_real].on = 1; - rr->real[idx_real].idx = idx_real; - rr->real[idx_real].link_fd = fdinfo->fd; - mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); + if (rr->real[idx_real].on && rr->real[idx_real].id != bp->ip.ap.str.id) { + fprintf(stderr, "Real array is full, BUG\n"); + exit(EXIT_FAILURE); + } else if (!rr->real[idx_real].on) { + rr->real[idx_real].on = 1; + rr->real[idx_real].idx = idx_real; + rr->real[idx_real].link_fd = fdinfo->fd; + mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); + } else { + fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); + mv_buffer_wtof (app_ctx, fdinfo); + } } void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { @@ -198,18 +208,20 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 1. Marked the packet as handled dp->on = 0; - // 2. Get the buffer + // 2. Get the buffer and update rr state struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); + int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; + rr->wait[idx_real].on = 0; //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor rr->recv_id = bp->ip.ap.str.id; // 4. We free the buffer if it's a control packet and quit - if (bp->ip.ap.str.flags & PKT_CONTROL) { + /*if (bp->ip.ap.str.flags & PKT_CONTROL) { mv_buffer_atof (app_ctx, &dp->idx); return; - } + }*/ // 5. Find its target sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); From 6c276ae3c7b36a9690a13330637f1515d35cc1d6 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 18:29:19 +0100 Subject: [PATCH 10/17] Fix stupid bug --- src/algo_rr.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 302565e..737fdd4 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -211,7 +211,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 2. Get the buffer and update rr state struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - rr->wait[idx_real].on = 0; + rr->real[idx_real].on = 0; //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor From 1b0954ecca618862d7c080858b3bd230bf7c8d93 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 27 Mar 2019 18:46:48 +0100 Subject: [PATCH 11/17] It works better when you write correct code --- src/algo_rr.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 737fdd4..216e430 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -186,13 +186,19 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, // 5. We queue the packet if (rr->real[idx_real].on && rr->real[idx_real].id != bp->ip.ap.str.id) { - fprintf(stderr, "Real array is full, BUG\n"); + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, idx_real); + for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { + printf("\t%d => %d\n", i, rr->real[i].on); + } + printf("]\n"); exit(EXIT_FAILURE); } else if (!rr->real[idx_real].on) { rr->real[idx_real].on = 1; + rr->real[idx_real].id = bp->ip.ap.str.id; rr->real[idx_real].idx = idx_real; rr->real[idx_real].link_fd = fdinfo->fd; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); + //printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real); } else { fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); mv_buffer_wtof (app_ctx, fdinfo); @@ -212,6 +218,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; rr->real[idx_real].on = 0; + //printf("%d is removed from real as %d\n", bp->ip.ap.str.id, idx_real); //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor From c12715e39e09bbfe20d73b1a71379760ed4fe432 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 28 Mar 2019 11:47:14 +0100 Subject: [PATCH 12/17] Add a donar parameters structure --- src/donar.c | 79 ++++++++++++++++++++++------------------------ src/donar_client.c | 11 +++---- src/donar_client.h | 3 +- src/donar_init.c | 16 ++++++++++ src/donar_init.h | 7 ++++ src/donar_server.c | 9 +++--- src/donar_server.h | 3 +- 7 files changed, 71 insertions(+), 57 deletions(-) diff --git a/src/donar.c b/src/donar.c index 9dfc185..635e985 100644 --- a/src/donar.c +++ b/src/donar.c @@ -8,79 +8,74 @@ #include "donar_client.h" #include "donar_server.h" -void free_port (void* ptr) { - free(ptr); -} - int main(int argc, char** argv) { setvbuf(stdout, NULL, _IONBF, 0); printf("~ Donar ~\n"); - int opt, is_server, is_client, errored; - char *port, *onion_file, *algo; - onion_file = NULL; - algo = NULL; - is_server = 0; - is_client = 0; - errored = 0; - GPtrArray* remote_ports = g_ptr_array_new_with_free_func (free_port); - GPtrArray* exposed_ports = g_ptr_array_new_with_free_func (free_port); + struct donar_params dp; + donar_init_params (&dp); - while ((opt = getopt(argc, argv, "csh:e:r:o:a:")) != -1) { - switch(opt) { + while ((dp.opt = getopt(argc, argv, "csh:e:r:o:a:b:h:")) != -1) { + switch(dp.opt) { case 's': - is_server = 1; + dp.is_server = 1; break; case 'e': - port = strdup(optarg); - if (port == NULL) goto terminate; - g_ptr_array_add (exposed_ports, port); + dp.port = strdup(optarg); + if (dp.port == NULL) goto terminate; + g_ptr_array_add (dp.exposed_ports, dp.port); break; case 'r': - port = strdup(optarg); - if (port == NULL) goto terminate; - g_ptr_array_add (remote_ports, port); + dp.port = strdup(optarg); + if (dp.port == NULL) goto terminate; + g_ptr_array_add (dp.remote_ports, dp.port); break; case 'o': - onion_file = strdup(optarg); + dp.onion_file = strdup(optarg); break; case 'c': - is_client = 1; + dp.is_client = 1; break; case 'a': - algo = strdup(optarg); + dp.algo = strdup(optarg); + break; + case 'h': + dp.is_healing = 1; + break; + case 'b': + dp.is_waiting_bootstrap = 1; break; default: goto in_error; } } - if (!(is_server ^ is_client)) goto in_error; - if (algo == NULL) goto in_error; + if (!(dp.is_server ^ dp.is_client)) goto in_error; + if (dp.algo == NULL) goto in_error; - if (is_server) { + if (dp.is_server) { struct donar_server_ctx ctx; - if (exposed_ports->len < 1 && remote_ports->len < 1) goto in_error; - donar_server(&ctx, algo, exposed_ports, remote_ports); - } else if (is_client) { + if (dp.exposed_ports->len < 1 && dp.remote_ports->len < 1) goto in_error; + donar_server(&ctx, &dp); + } else if (dp.is_client) { struct donar_client_ctx ctx; - if ((exposed_ports->len < 1 && remote_ports->len < 1) || onion_file == NULL) goto in_error; - donar_client(&ctx, algo, onion_file, exposed_ports, remote_ports); + if ((dp.exposed_ports->len < 1 && dp.remote_ports->len < 1) || dp.onion_file == NULL) goto in_error; + donar_client(&ctx, &dp); } goto terminate; in_error: - errored = 1; - fprintf(stderr, "Usage as client : %s -c -a -o -e -r \n", argv[0]); - fprintf(stderr, "Usage as server : %s -s -a -e -r \n\n", argv[0]); + dp.errored = 1; + fprintf(stderr, "Usage as client : %s -c -a [-h] [-b] -o -e [-e ...]* -r [-r ...]*\n", argv[0]); + fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] -e [-e ...]* -r [-r ...]*\n\n", argv[0]); fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s\n", - is_client, is_server, algo, exposed_ports->len, remote_ports->len, onion_file); + dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file); terminate: - if (onion_file != NULL) free(onion_file); - if (algo != NULL) free(algo); - g_ptr_array_free(exposed_ports, TRUE); - g_ptr_array_free(remote_ports, TRUE); + if (dp.onion_file != NULL) free(dp.onion_file); + if (dp.algo != NULL) free(dp.algo); + g_ptr_array_free(dp.exposed_ports, TRUE); + g_ptr_array_free(dp.remote_ports, TRUE); - return errored; + return dp.errored; } diff --git a/src/donar_client.c b/src/donar_client.c index 32bf02e..206976f 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -69,12 +69,11 @@ void init_socks5_sinks(struct donar_client_ctx* app_ctx) { evt_core_add_cat(&app_ctx->evts, &template); } -void donar_client(struct donar_client_ctx* ctx, char* algoname, - char* onion_file, GPtrArray* exposed_ports, GPtrArray* remote_ports) { +void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { struct algo_skel algo = {0}; evt_core_init (&(ctx->evts)); - init_algo(&ctx->evts, &algo, algoname); + init_algo(&ctx->evts, &algo, dp->algo); socks5_init (&ctx->evts); init_socks5_sinks(ctx); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); @@ -84,7 +83,7 @@ void donar_client(struct donar_client_ctx* ctx, char* algoname, evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_write)); printf("--- Categories created\n"); - load_onion_services (ctx, onion_file, CLIENT_PORT_SIZE); + load_onion_services (ctx, dp->onion_file, CLIENT_PORT_SIZE); printf("--- Onion services loaded\n"); for (int i = 0; i < CLIENT_PORT_SIZE; i++) { @@ -92,10 +91,10 @@ void donar_client(struct donar_client_ctx* ctx, char* algoname, } printf("--- TCP Clients Connected\n"); - g_ptr_array_foreach (remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); + g_ptr_array_foreach (dp->remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); printf("--- Remote ports are binded locally\n"); - g_ptr_array_foreach (exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); + g_ptr_array_foreach (dp->exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); printf("--- Local UDP services are exposed\n"); evt_core_loop(&(ctx->evts)); diff --git a/src/donar_client.h b/src/donar_client.h index 497259d..4abfb03 100644 --- a/src/donar_client.h +++ b/src/donar_client.h @@ -19,5 +19,4 @@ struct donar_client_ctx { } client_sock[CLIENT_PORT_SIZE]; }; -void donar_client(struct donar_client_ctx* ctx, char* algoname, - char* onion_file, GPtrArray* exposed_ports, GPtrArray* remote_ports); +void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp); diff --git a/src/donar_init.c b/src/donar_init.c index d6dce07..9c48f87 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -87,3 +87,19 @@ socket_failed: fprintf(stderr, "UDP socket init failed\n"); exit(EXIT_FAILURE); } + +void free_port (void* ptr) { + free(ptr); +} + +void donar_init_params(struct donar_params* dp) { + dp->onion_file = NULL; + dp->algo = NULL; + dp->is_server = 0; + dp->is_client = 0; + dp->is_healing = 0; + dp->is_waiting_bootstrap = 0; + dp->errored = 0; + dp->remote_ports = g_ptr_array_new_with_free_func (free_port); + dp->exposed_ports = g_ptr_array_new_with_free_func (free_port); +} diff --git a/src/donar_init.h b/src/donar_init.h index bb6fe6c..72595c5 100644 --- a/src/donar_init.h +++ b/src/donar_init.h @@ -6,5 +6,12 @@ #include "evt_core.h" #include "packet.h" +struct donar_params { + int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored; + char *port, *onion_file, *algo; + GPtrArray *remote_ports, *exposed_ports; +}; + void init_udp_remote(char* port, struct evt_core_ctx* evts); void init_udp_exposed(char* port, struct evt_core_ctx* evts); +void donar_init_params(struct donar_params* dp); diff --git a/src/donar_server.c b/src/donar_server.c index f557261..4335f0b 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -51,12 +51,11 @@ socket_create_err: exit(EXIT_FAILURE); } -void donar_server(struct donar_server_ctx* ctx, char* algoname, - GPtrArray* exposed_ports, GPtrArray* remote_ports) { +void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { struct algo_skel algo = {0}; evt_core_init (&(ctx->evts)); - init_algo(&ctx->evts, &algo, algoname); + init_algo(&ctx->evts, &algo, dp->algo); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read)); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read)); @@ -74,10 +73,10 @@ void donar_server(struct donar_server_ctx* ctx, char* algoname, init_tcp_servers(ctx); printf("--- TCP servers are listening\n"); - g_ptr_array_foreach (remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); + g_ptr_array_foreach (dp->remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); printf("--- Remote ports are binded locally\n"); - g_ptr_array_foreach (exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); + g_ptr_array_foreach (dp->exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); printf("--- Local UDP services are exposed\n"); evt_core_loop (&(ctx->evts)); diff --git a/src/donar_server.h b/src/donar_server.h index bc4875b..0e761da 100644 --- a/src/donar_server.h +++ b/src/donar_server.h @@ -20,5 +20,4 @@ struct donar_server_ctx { uint16_t ports[PORT_SIZE]; }; -void donar_server(struct donar_server_ctx* ctx, char* algoname, - GPtrArray* exposed_ports, GPtrArray* remote_ports); +void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp); From b9d7ec48a2ca2b98198d1bd9692536ce83f63674 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 28 Mar 2019 11:54:01 +0100 Subject: [PATCH 13/17] Update algo structure to support parameters --- src/algo_naive.c | 2 +- src/algo_rr.c | 2 +- src/algo_skel.c | 4 ++-- src/algo_skel.h | 13 +++++++++---- src/donar_client.c | 6 +++++- src/donar_server.c | 6 +++++- 6 files changed, 23 insertions(+), 10 deletions(-) diff --git a/src/algo_naive.c b/src/algo_naive.c index 744aa35..b3f6dca 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -194,7 +194,7 @@ int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { return 0; } -void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as) { +void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct algo_ctx)); diff --git a/src/algo_rr.c b/src/algo_rr.c index 216e430..4728a2a 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -476,7 +476,7 @@ int rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { return 0; } -void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { +void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct algo_ctx)); diff --git a/src/algo_skel.c b/src/algo_skel.c index b3fa20b..32cefff 100644 --- a/src/algo_skel.c +++ b/src/algo_skel.c @@ -1,9 +1,9 @@ #include "algo_skel.h" -void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name) { +void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap) { for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { if (strcmp(available_algo[i].name, name) == 0) { - available_algo[i].init(ctx, as); + available_algo[i].init(ctx, as, ap); return; } } diff --git a/src/algo_skel.h b/src/algo_skel.h index 40f140f..cd7c5fa 100644 --- a/src/algo_skel.h +++ b/src/algo_skel.h @@ -16,11 +16,16 @@ struct algo_skel { struct evt_core_cat on_tcp_co; }; -typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as); +struct algo_params { + uint8_t is_waiting_bootstrap; + uint8_t is_healing; +}; -void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name); -void algo_naive(struct evt_core_ctx* ctx, struct algo_skel* as); -void algo_rr(struct evt_core_ctx* ctx, struct algo_skel* as); +typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); + +void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap); +void algo_naive(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); +void algo_rr(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); struct algo_desc { algo_init init; diff --git a/src/donar_client.c b/src/donar_client.c index 206976f..ee4e994 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -71,9 +71,13 @@ void init_socks5_sinks(struct donar_client_ctx* app_ctx) { void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { struct algo_skel algo = {0}; + struct algo_params ap = { + .is_waiting_bootstrap = dp->is_waiting_bootstrap, + .is_healing = dp->is_healing + }; evt_core_init (&(ctx->evts)); - init_algo(&ctx->evts, &algo, dp->algo); + init_algo(&ctx->evts, &algo, dp->algo, &ap); socks5_init (&ctx->evts); init_socks5_sinks(ctx); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); diff --git a/src/donar_server.c b/src/donar_server.c index 4335f0b..310f2b5 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -53,9 +53,13 @@ socket_create_err: void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { struct algo_skel algo = {0}; + struct algo_params ap = { + .is_waiting_bootstrap = dp->is_waiting_bootstrap, + .is_healing = dp->is_healing + }; evt_core_init (&(ctx->evts)); - init_algo(&ctx->evts, &algo, dp->algo); + init_algo(&ctx->evts, &algo, dp->algo, &ap); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read)); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read)); From 5e326563b1a752674bb18c0c5bad804df32fa384 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 28 Mar 2019 15:26:33 +0100 Subject: [PATCH 14/17] Effectively support -h and -b --- src/algo_rr.c | 19 +++++++++++++++++-- src/donar_client.h | 2 +- src/donar_server.h | 2 +- 3 files changed, 19 insertions(+), 4 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 4728a2a..324b84a 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -18,6 +18,8 @@ struct deferred_pkt { }; struct rr_ctx { + uint8_t link_count; + uint8_t is_rdy; uint8_t my_links; uint16_t my_links_ver; uint8_t remote_links; @@ -26,6 +28,7 @@ struct rr_ctx { uint16_t recv_id_late; uint16_t sent_id; uint8_t current_link; + struct algo_params ap; struct timespec emit_time; struct deferred_pkt real[PACKET_BUFFER_SIZE]; struct waited_pkt wait[PACKET_BUFFER_SIZE]; @@ -367,7 +370,8 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) continue; // Missing link - if (rr->my_links & (1 << sel_link)) { + if (rr->ap.is_waiting_bootstrap && !rr->is_rdy) goto not_ready; // Some links are down + if (!rr->ap.is_healing || rr->my_links & (1 << sel_link)) { rr->current_link = sel_link; mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); rr_on_tcp_write(ctx, to_fdinfo); @@ -378,8 +382,9 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { } } +not_ready: // 4. A whole packet has been read, we will find someone to write it - fprintf(stderr, "No fd for URL %s in udp-read. Dropping packet :( \n", fdinfo->url); + fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); mv_buffer_wtof (app_ctx, fdinfo); return 0; @@ -391,8 +396,15 @@ co_error: int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; int write_res = FDS_READY; + // 0. Show some information about circuits + uint8_t is_rdy = fdinfo->cat->socklist->len >= rr->link_count ? 1 : 0; + if (!rr->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", rr->link_count); + else if (rr->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, rr->link_count); + rr->is_rdy = is_rdy; + // 1. Get current write buffer OR a buffer from the waiting queue OR leave if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; @@ -494,6 +506,9 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* rr->sent_id = 1; rr->recv_id = 0; rr->recv_id_late = 0; + rr->link_count = 8; + rr->is_rdy = 0; + rr->ap = *ap; ctx->misc = rr; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); diff --git a/src/donar_client.h b/src/donar_client.h index 4abfb03..4493aa4 100644 --- a/src/donar_client.h +++ b/src/donar_client.h @@ -7,7 +7,7 @@ #include "socks5.h" #include "donar_init.h" -#define CLIENT_PORT_SIZE 10 +#define CLIENT_PORT_SIZE 8 struct donar_client_ctx { struct tor_os_str tos; diff --git a/src/donar_server.h b/src/donar_server.h index 0e761da..94eb42d 100644 --- a/src/donar_server.h +++ b/src/donar_server.h @@ -11,7 +11,7 @@ #include "algo_skel.h" #include "donar_init.h" -#define PORT_SIZE 10 +#define PORT_SIZE 8 struct donar_server_ctx { struct tor_os_str tos; From 12c057e65124bdcecdf09f10302ac814ccc7dea1 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 28 Mar 2019 15:29:34 +0100 Subject: [PATCH 15/17] Improve links logging --- src/algo_rr.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 324b84a..fad6a42 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -141,7 +141,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, // 0. Update remote links if (ring_lt(rr->recv_id_late, bp->ip.ap.str.id) && !(rr->remote_links & 1 << link_num)) { - printf("Activate link=%d\n", link_num); + printf("Activate link=%d | ", link_num); rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working show_link_availability (rr); } @@ -150,7 +150,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, if (bp->ip.ap.str.id > rr->my_links_ver && bp->ip.ap.str.bitfield != rr->my_links) { rr->my_links = bp->ip.ap.str.bitfield; rr->my_links_ver = bp->ip.ap.str.id; - printf("Update my links\n"); + printf("Update my links | "); show_link_availability (rr); } @@ -448,7 +448,7 @@ int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { printf("Timer reached for packet %d\n", pkt->id); // !BLACKLIST LINK - printf("Blacklist link=%d\n", pkt->link_num); + printf("Blacklist link=%d | ", pkt->link_num); rr->remote_links &= 0xff ^ 1 << pkt->link_num; show_link_availability (rr); From 044989b8d586b6fb98aa067baa0be8ea76346ee5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 28 Mar 2019 15:35:48 +0100 Subject: [PATCH 16/17] Fix option parsing --- src/donar.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/donar.c b/src/donar.c index 635e985..85905f9 100644 --- a/src/donar.c +++ b/src/donar.c @@ -15,7 +15,7 @@ int main(int argc, char** argv) { struct donar_params dp; donar_init_params (&dp); - while ((dp.opt = getopt(argc, argv, "csh:e:r:o:a:b:h:")) != -1) { + while ((dp.opt = getopt(argc, argv, "cse:r:o:a:bh")) != -1) { switch(dp.opt) { case 's': dp.is_server = 1; From 347ea4f6cd93561770be4c4b06e9522d9b2a49b5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 28 Mar 2019 15:58:18 +0100 Subject: [PATCH 17/17] Inform users that circuit is up on naive --- src/algo_naive.c | 7 +++++++ src/algo_rr.c | 21 +++++++++------------ src/algo_skel.c | 1 - src/algo_skel.h | 10 +++++----- src/algo_utils.h | 5 ++++- 5 files changed, 25 insertions(+), 19 deletions(-) diff --git a/src/algo_naive.c b/src/algo_naive.c index b3f6dca..8e4e22c 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -86,6 +86,11 @@ int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; int write_res = FDS_READY; + if (!app_ctx->is_rdy && strcmp(fdinfo->url, "tcp:write:127.0.0.1:7500") == 0) { + app_ctx->is_rdy = 1; + printf("=== Requested circuit is up ===\n"); + } + // 1. Get current write buffer OR a buffer from the waiting queue OR leave if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; @@ -203,6 +208,8 @@ void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_para ctx->application_waiting = g_hash_table_new (NULL, NULL); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + ctx->ap = *ap; + ctx->is_rdy = 0; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); } diff --git a/src/algo_rr.c b/src/algo_rr.c index fad6a42..f6dd0de 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -18,8 +18,6 @@ struct deferred_pkt { }; struct rr_ctx { - uint8_t link_count; - uint8_t is_rdy; uint8_t my_links; uint16_t my_links_ver; uint8_t remote_links; @@ -28,7 +26,6 @@ struct rr_ctx { uint16_t recv_id_late; uint16_t sent_id; uint8_t current_link; - struct algo_params ap; struct timespec emit_time; struct deferred_pkt real[PACKET_BUFFER_SIZE]; struct waited_pkt wait[PACKET_BUFFER_SIZE]; @@ -370,8 +367,8 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) continue; // Missing link - if (rr->ap.is_waiting_bootstrap && !rr->is_rdy) goto not_ready; // Some links are down - if (!rr->ap.is_healing || rr->my_links & (1 << sel_link)) { + if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Some links are down + if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) { rr->current_link = sel_link; mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); rr_on_tcp_write(ctx, to_fdinfo); @@ -400,10 +397,10 @@ int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int write_res = FDS_READY; // 0. Show some information about circuits - uint8_t is_rdy = fdinfo->cat->socklist->len >= rr->link_count ? 1 : 0; - if (!rr->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", rr->link_count); - else if (rr->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, rr->link_count); - rr->is_rdy = is_rdy; + uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0; + if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count); + else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count); + app_ctx->is_rdy = is_rdy; // 1. Get current write buffer OR a buffer from the waiting queue OR leave if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; @@ -497,6 +494,9 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ctx->application_waiting = g_hash_table_new (NULL, NULL); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + ctx->link_count = 8; + ctx->is_rdy = 0; + ctx->ap = *ap; struct rr_ctx* rr = malloc(sizeof(struct rr_ctx)); if (rr == NULL) goto init_err; memset(rr, 0, sizeof(struct rr_ctx)); @@ -506,9 +506,6 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* rr->sent_id = 1; rr->recv_id = 0; rr->recv_id_late = 0; - rr->link_count = 8; - rr->is_rdy = 0; - rr->ap = *ap; ctx->misc = rr; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); diff --git a/src/algo_skel.c b/src/algo_skel.c index 32cefff..5162f42 100644 --- a/src/algo_skel.c +++ b/src/algo_skel.c @@ -10,4 +10,3 @@ void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struc fprintf(stderr, "Algorithm %s has not been found\n", name); exit(EXIT_FAILURE); } - diff --git a/src/algo_skel.h b/src/algo_skel.h index cd7c5fa..658cc10 100644 --- a/src/algo_skel.h +++ b/src/algo_skel.h @@ -8,6 +8,11 @@ #include "utils.h" #include "url.h" +struct algo_params { + uint8_t is_waiting_bootstrap; + uint8_t is_healing; +}; + struct algo_skel { struct evt_core_cat on_udp_read; struct evt_core_cat on_tcp_read; @@ -16,11 +21,6 @@ struct algo_skel { struct evt_core_cat on_tcp_co; }; -struct algo_params { - uint8_t is_waiting_bootstrap; - uint8_t is_healing; -}; - typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap); diff --git a/src/algo_utils.h b/src/algo_utils.h index 3fc55cc..755b004 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -1,13 +1,16 @@ #pragma once -#include "algo_skel.h" #include #include #include +#include "algo_skel.h" #define PACKET_BUFFER_SIZE 20 typedef void (*algo_ctx_free_misc)(void*); struct algo_ctx { + uint8_t link_count; + uint8_t is_rdy; + struct algo_params ap; int ref_count; struct buffer_packet bps[PACKET_BUFFER_SIZE]; GQueue* free_buffer; // Available buffers