diff --git a/src/algo_naive.c b/src/algo_naive.c index 744aa35..8e4e22c 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -86,6 +86,11 @@ int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; int write_res = FDS_READY; + if (!app_ctx->is_rdy && strcmp(fdinfo->url, "tcp:write:127.0.0.1:7500") == 0) { + app_ctx->is_rdy = 1; + printf("=== Requested circuit is up ===\n"); + } + // 1. Get current write buffer OR a buffer from the waiting queue OR leave if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; @@ -194,7 +199,7 @@ int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { return 0; } -void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as) { +void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct algo_ctx)); @@ -203,6 +208,8 @@ void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as) { ctx->application_waiting = g_hash_table_new (NULL, NULL); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + ctx->ap = *ap; + ctx->is_rdy = 0; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); } diff --git a/src/algo_rr.c b/src/algo_rr.c index c4136cc..f6dd0de 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -13,6 +13,7 @@ struct waited_pkt { struct deferred_pkt { int link_fd; int idx; + uint16_t id; uint8_t on; }; @@ -22,6 +23,7 @@ struct rr_ctx { uint8_t remote_links; int64_t mjit; uint16_t recv_id; + uint16_t recv_id_late; uint16_t sent_id; uint8_t current_link; struct timespec emit_time; @@ -72,6 +74,20 @@ co_error: exit(EXIT_FAILURE); } +void show_link_availability(struct rr_ctx* rr) { + printf("Links availability: my_links["); + for (int i = 0; i < 8; i++) { + if (rr->my_links & 1 << i) printf("U"); + else printf("-"); + } + printf("], rem_links["); + for (int i = 0; i < 8; i++) { + if (rr->remote_links & 1 << i) printf("U"); + else printf("-"); + } + printf("]\n"); +} + int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt* wpkt) { struct timespec now; struct itimerspec timer_config; @@ -114,19 +130,25 @@ int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; + char buffer[16]; + url_get_port (buffer, fdinfo->url); + int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); - // 1. Update links I can use thanks to target feedback - if (bp->ip.ap.str.id > rr->my_links_ver) { + // 0. Update remote links + if (ring_lt(rr->recv_id_late, bp->ip.ap.str.id) && !(rr->remote_links & 1 << link_num)) { + printf("Activate link=%d | ", link_num); + rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working + show_link_availability (rr); + } + + // 1. Update my links I can use thanks to target feedback + if (bp->ip.ap.str.id > rr->my_links_ver && bp->ip.ap.str.bitfield != rr->my_links) { rr->my_links = bp->ip.ap.str.bitfield; rr->my_links_ver = bp->ip.ap.str.id; - printf("Links availability: ["); - for (int i = 0; i < 8; i++) { - if (rr->my_links & 1 << i) printf("U"); - else printf("-"); - } - printf("]\n"); + printf("Update my links | "); + show_link_availability (rr); } // 2. If packet arrived too late, we discard it @@ -150,18 +172,37 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr->wait[idx_waited].timer_fd = set_timeout(ctx, timeout, &rr->wait[idx_waited]); } - // 4. We queue the packet + // 4. If packet has not already a timer or has a wrong timer int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - rr->real[idx_real].on = 1; - rr->real[idx_real].idx = idx_real; - rr->real[idx_real].link_fd = fdinfo->fd; - mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); + if (rr->wait[idx_real].on && rr->wait[idx_real].id != bp->ip.ap.str.id) { + fprintf(stderr, "Waiting array is full, BUG\n"); + exit(EXIT_FAILURE); + } else if (!rr->wait[idx_real].on) { + rr->wait[idx_real].on = 1; + rr->wait[idx_real].id = bp->ip.ap.str.id; + rr->wait[idx_real].link_num = link_num; + rr->wait[idx_real].timer_fd = set_timeout(ctx, rr->mjit + 1, &rr->wait[idx_real]); + } - // 5. We make sure that the remote link is set to up - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working + // 5. We queue the packet + if (rr->real[idx_real].on && rr->real[idx_real].id != bp->ip.ap.str.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, idx_real); + for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { + printf("\t%d => %d\n", i, rr->real[i].on); + } + printf("]\n"); + exit(EXIT_FAILURE); + } else if (!rr->real[idx_real].on) { + rr->real[idx_real].on = 1; + rr->real[idx_real].id = bp->ip.ap.str.id; + rr->real[idx_real].idx = idx_real; + rr->real[idx_real].link_fd = fdinfo->fd; + mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); + //printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real); + } else { + fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); + mv_buffer_wtof (app_ctx, fdinfo); + } } void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { @@ -173,28 +214,23 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 1. Marked the packet as handled dp->on = 0; - // 2. Get the buffer + // 2. Get the buffer and update rr state struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); + int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; + rr->real[idx_real].on = 0; + //printf("%d is removed from real as %d\n", bp->ip.ap.str.id, idx_real); //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor rr->recv_id = bp->ip.ap.str.id; - // 4. We check that we don't have a running timeout - int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - if (rr->wait[idx_real].on) { - rr->wait[idx_real].on = 0; - evt_core_rm_fd (ctx, rr->wait[idx_real].timer_fd); - printf("Removed timer for packet %d\n",bp->ip.ap.str.id); - } - - // 5. We free the buffer if it's a control packet and quit - if (bp->ip.ap.str.flags & PKT_CONTROL) { + // 4. We free the buffer if it's a control packet and quit + /*if (bp->ip.ap.str.flags & PKT_CONTROL) { mv_buffer_atof (app_ctx, &dp->idx); return; - } + }*/ - // 6. Find its target + // 5. Find its target sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { @@ -203,7 +239,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct mv_buffer_atof (app_ctx, &dp->idx); } - // 4. We move the buffer and notify the target + // 6. We move the buffer and notify the target //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); rr_on_udp_write(ctx, to_fdinfo); @@ -215,6 +251,7 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct buffer_packet* bp = NULL; while(1) { + //printf("Trying to deliver %d\n", rr->recv_id+1); struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; fdinfo = evt_core_get_from_fd (ctx, def->link_fd); @@ -225,6 +262,7 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } rr_deliver(ctx, fdinfo, def); + //printf("Delivered %d\n", rr->recv_id); } } @@ -317,35 +355,34 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { bp->ip.ap.str.deltat = mili_sec; bp->ip.ap.str.bitfield = rr->remote_links; bp->ip.ap.str.prevlink = rr->current_link; + //printf("Will send packet id=%d\n", bp->ip.ap.str.id); - int max = 10; - while(1) { - if (max-- < 0) break; - rr->current_link = (rr->current_link + 1) % 10; - if (!(rr->my_links & (1 << rr->current_link))) continue; - sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + rr->current_link); //@FIXME Hardcoded - //printf("-- Trying %s\n", url); - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo != NULL) { - //printf("Selected url %s for pkt %d to be sent on Tor\n", url, bp->ip.ap.str.id); - break; - } - } rr->emit_time = curr; rr->sent_id++; - // 4. A whole packet has been read, we will find someone to write it - if (to_fdinfo == NULL) { - fprintf(stderr, "No fd for URL %s in udp-read. Dropping packet :( \n", fdinfo->url); - mv_buffer_wtof (app_ctx, fdinfo); - return 1; + int max = 10; + uint8_t sel_link = rr->current_link; + while(max-- >= 0) { + sel_link = (sel_link + 1) % 8; + sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded + to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) continue; // Missing link + if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Some links are down + if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) { + rr->current_link = sel_link; + mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + rr_on_tcp_write(ctx, to_fdinfo); + return 0; + } else { + dup_buffer_tow(app_ctx, bp, to_fdinfo); + rr_on_tcp_write(ctx, to_fdinfo); + } } - //printf("Pass packet from %s to %s\n", fdinfo->url, url); - - // 5. We move the buffer and notify the target - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); - rr_on_tcp_write(ctx, to_fdinfo); +not_ready: + // 4. A whole packet has been read, we will find someone to write it + fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); + mv_buffer_wtof (app_ctx, fdinfo); return 0; co_error: @@ -356,8 +393,15 @@ co_error: int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; int write_res = FDS_READY; + // 0. Show some information about circuits + uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0; + if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count); + else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count); + app_ctx->is_rdy = is_rdy; + // 1. Get current write buffer OR a buffer from the waiting queue OR leave if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; @@ -380,24 +424,38 @@ co_error: } int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + uint64_t ctr; + ssize_t tmr_rd; + tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr)); + if (tmr_rd == -1 && errno == EAGAIN) return 1; + if (tmr_rd < 0) { + perror("read on timer"); + fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd); + exit(EXIT_FAILURE); + } + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; struct waited_pkt* pkt = fdinfo->other; - evt_core_rm_fd(ctx, fdinfo->fd); pkt->on = 0; - if (ring_lt(pkt->id, rr->recv_id)) return 1; + if (ring_gt (pkt->id, rr->recv_id_late)) rr->recv_id_late = pkt->id; + if (ring_le (pkt->id, rr->recv_id)) goto end; printf("Timer reached for packet %d\n", pkt->id); // !BLACKLIST LINK + printf("Blacklist link=%d | ", pkt->link_num); rr->remote_links &= 0xff ^ 1 << pkt->link_num; + show_link_availability (rr); while (ring_lt(rr->recv_id, pkt->id)) { rr->recv_id++; rr_pkt_unroll (ctx, app_ctx); } +end: + evt_core_rm_fd(ctx, fdinfo->fd); return 1; } @@ -427,7 +485,7 @@ int rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { return 0; } -void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { +void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct algo_ctx)); @@ -436,6 +494,9 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { ctx->application_waiting = g_hash_table_new (NULL, NULL); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + ctx->link_count = 8; + ctx->is_rdy = 0; + ctx->ap = *ap; struct rr_ctx* rr = malloc(sizeof(struct rr_ctx)); if (rr == NULL) goto init_err; memset(rr, 0, sizeof(struct rr_ctx)); @@ -443,6 +504,8 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { rr->my_links = 0xff; rr->remote_links = 0xff; rr->sent_id = 1; + rr->recv_id = 0; + rr->recv_id_late = 0; ctx->misc = rr; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); diff --git a/src/algo_skel.c b/src/algo_skel.c index b3fa20b..5162f42 100644 --- a/src/algo_skel.c +++ b/src/algo_skel.c @@ -1,13 +1,12 @@ #include "algo_skel.h" -void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name) { +void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap) { for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { if (strcmp(available_algo[i].name, name) == 0) { - available_algo[i].init(ctx, as); + available_algo[i].init(ctx, as, ap); return; } } fprintf(stderr, "Algorithm %s has not been found\n", name); exit(EXIT_FAILURE); } - diff --git a/src/algo_skel.h b/src/algo_skel.h index 40f140f..658cc10 100644 --- a/src/algo_skel.h +++ b/src/algo_skel.h @@ -8,6 +8,11 @@ #include "utils.h" #include "url.h" +struct algo_params { + uint8_t is_waiting_bootstrap; + uint8_t is_healing; +}; + struct algo_skel { struct evt_core_cat on_udp_read; struct evt_core_cat on_tcp_read; @@ -16,11 +21,11 @@ struct algo_skel { struct evt_core_cat on_tcp_co; }; -typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as); +typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); -void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name); -void algo_naive(struct evt_core_ctx* ctx, struct algo_skel* as); -void algo_rr(struct evt_core_ctx* ctx, struct algo_skel* as); +void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap); +void algo_naive(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); +void algo_rr(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); struct algo_desc { algo_init init; diff --git a/src/algo_utils.c b/src/algo_utils.c index f8b7a04..2db616a 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -21,6 +21,19 @@ void iterate2(int* fd, struct buffer_packet *bp, gpointer user_data) { fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd); } +void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { + fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); + int waiting_count = 0; + g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); + g_hash_table_foreach(app_ctx->used_buffer, (GHFunc)iterate2, NULL); + fprintf(stderr, "total_buffers=%d, free_buffer=%d, used_buffers=%d, app_buffer=%d, write_buffer=%d.\n", + PACKET_BUFFER_SIZE, + app_ctx->free_buffer->length, + g_hash_table_size(app_ctx->used_buffer), + g_hash_table_size(app_ctx->application_waiting), + waiting_count); +} + /** * Returns a buffer if available, NULL otherwise */ @@ -34,16 +47,8 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_ // 2. Get a new buffer otherwise bp = g_queue_pop_head(app_ctx->free_buffer); if (bp == NULL) { - fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); - int waiting_count = 0; - g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); - g_hash_table_foreach(app_ctx->used_buffer, (GHFunc)iterate2, NULL); - fprintf(stderr, "total_buffers=%d, free_buffer=%d, used_buffers=%d, app_buffer=%d, write_buffer=%d.\n", - PACKET_BUFFER_SIZE, - app_ctx->free_buffer->length, - g_hash_table_size(app_ctx->used_buffer), - g_hash_table_size(app_ctx->application_waiting), - waiting_count); + debug_buffer(app_ctx, fdinfo); + // 2.1 If no buffer is available, we subscribe to be notified later g_queue_push_tail (app_ctx->read_waiting, &(fdinfo->fd)); return NULL; @@ -121,7 +126,8 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void } g_hash_table_remove(app_ctx->used_buffer, &from->fd); if (g_hash_table_contains(app_ctx->application_waiting, to)) { - fprintf(stderr, "Data already exist for this entry\n"); + fprintf(stderr, "Data already exists for this entry\n"); + debug_buffer(app_ctx, from); exit(EXIT_FAILURE); } g_hash_table_insert(app_ctx->application_waiting, to, bp); @@ -165,6 +171,30 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { g_queue_push_tail (app_ctx->free_buffer, bp); } +void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { + GQueue* q; + + // 1. We get a free buffer + struct buffer_packet* bp_dest = g_queue_pop_head(app_ctx->free_buffer); + if (bp_dest == NULL) { + debug_buffer(app_ctx, to); + return; + } + + // 2. We duplicate the data + memcpy(bp_dest, bp, sizeof(struct buffer_packet)); + + // 3. We get the target writing queue + q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); + if (q == NULL) { + q = g_queue_new (); + g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); + } + + // 4. We push the content to the appropriate destination + g_queue_push_tail(q, bp_dest); +} + struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { return g_hash_table_lookup (app_ctx->application_waiting, idx); } diff --git a/src/algo_utils.h b/src/algo_utils.h index 815e2b8..755b004 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -1,13 +1,16 @@ #pragma once -#include "algo_skel.h" #include #include #include +#include "algo_skel.h" #define PACKET_BUFFER_SIZE 20 typedef void (*algo_ctx_free_misc)(void*); struct algo_ctx { + uint8_t link_count; + uint8_t is_rdy; + struct algo_params ap; int ref_count; struct buffer_packet bps[PACKET_BUFFER_SIZE]; GQueue* free_buffer; // Available buffers @@ -24,6 +27,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); +void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/donar.c b/src/donar.c index 9dfc185..85905f9 100644 --- a/src/donar.c +++ b/src/donar.c @@ -8,79 +8,74 @@ #include "donar_client.h" #include "donar_server.h" -void free_port (void* ptr) { - free(ptr); -} - int main(int argc, char** argv) { setvbuf(stdout, NULL, _IONBF, 0); printf("~ Donar ~\n"); - int opt, is_server, is_client, errored; - char *port, *onion_file, *algo; - onion_file = NULL; - algo = NULL; - is_server = 0; - is_client = 0; - errored = 0; - GPtrArray* remote_ports = g_ptr_array_new_with_free_func (free_port); - GPtrArray* exposed_ports = g_ptr_array_new_with_free_func (free_port); + struct donar_params dp; + donar_init_params (&dp); - while ((opt = getopt(argc, argv, "csh:e:r:o:a:")) != -1) { - switch(opt) { + while ((dp.opt = getopt(argc, argv, "cse:r:o:a:bh")) != -1) { + switch(dp.opt) { case 's': - is_server = 1; + dp.is_server = 1; break; case 'e': - port = strdup(optarg); - if (port == NULL) goto terminate; - g_ptr_array_add (exposed_ports, port); + dp.port = strdup(optarg); + if (dp.port == NULL) goto terminate; + g_ptr_array_add (dp.exposed_ports, dp.port); break; case 'r': - port = strdup(optarg); - if (port == NULL) goto terminate; - g_ptr_array_add (remote_ports, port); + dp.port = strdup(optarg); + if (dp.port == NULL) goto terminate; + g_ptr_array_add (dp.remote_ports, dp.port); break; case 'o': - onion_file = strdup(optarg); + dp.onion_file = strdup(optarg); break; case 'c': - is_client = 1; + dp.is_client = 1; break; case 'a': - algo = strdup(optarg); + dp.algo = strdup(optarg); + break; + case 'h': + dp.is_healing = 1; + break; + case 'b': + dp.is_waiting_bootstrap = 1; break; default: goto in_error; } } - if (!(is_server ^ is_client)) goto in_error; - if (algo == NULL) goto in_error; + if (!(dp.is_server ^ dp.is_client)) goto in_error; + if (dp.algo == NULL) goto in_error; - if (is_server) { + if (dp.is_server) { struct donar_server_ctx ctx; - if (exposed_ports->len < 1 && remote_ports->len < 1) goto in_error; - donar_server(&ctx, algo, exposed_ports, remote_ports); - } else if (is_client) { + if (dp.exposed_ports->len < 1 && dp.remote_ports->len < 1) goto in_error; + donar_server(&ctx, &dp); + } else if (dp.is_client) { struct donar_client_ctx ctx; - if ((exposed_ports->len < 1 && remote_ports->len < 1) || onion_file == NULL) goto in_error; - donar_client(&ctx, algo, onion_file, exposed_ports, remote_ports); + if ((dp.exposed_ports->len < 1 && dp.remote_ports->len < 1) || dp.onion_file == NULL) goto in_error; + donar_client(&ctx, &dp); } goto terminate; in_error: - errored = 1; - fprintf(stderr, "Usage as client : %s -c -a -o -e -r \n", argv[0]); - fprintf(stderr, "Usage as server : %s -s -a -e -r \n\n", argv[0]); + dp.errored = 1; + fprintf(stderr, "Usage as client : %s -c -a [-h] [-b] -o -e [-e ...]* -r [-r ...]*\n", argv[0]); + fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] -e [-e ...]* -r [-r ...]*\n\n", argv[0]); fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s\n", - is_client, is_server, algo, exposed_ports->len, remote_ports->len, onion_file); + dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file); terminate: - if (onion_file != NULL) free(onion_file); - if (algo != NULL) free(algo); - g_ptr_array_free(exposed_ports, TRUE); - g_ptr_array_free(remote_ports, TRUE); + if (dp.onion_file != NULL) free(dp.onion_file); + if (dp.algo != NULL) free(dp.algo); + g_ptr_array_free(dp.exposed_ports, TRUE); + g_ptr_array_free(dp.remote_ports, TRUE); - return errored; + return dp.errored; } diff --git a/src/donar_client.c b/src/donar_client.c index 32bf02e..ee4e994 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -69,12 +69,15 @@ void init_socks5_sinks(struct donar_client_ctx* app_ctx) { evt_core_add_cat(&app_ctx->evts, &template); } -void donar_client(struct donar_client_ctx* ctx, char* algoname, - char* onion_file, GPtrArray* exposed_ports, GPtrArray* remote_ports) { +void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { struct algo_skel algo = {0}; + struct algo_params ap = { + .is_waiting_bootstrap = dp->is_waiting_bootstrap, + .is_healing = dp->is_healing + }; evt_core_init (&(ctx->evts)); - init_algo(&ctx->evts, &algo, algoname); + init_algo(&ctx->evts, &algo, dp->algo, &ap); socks5_init (&ctx->evts); init_socks5_sinks(ctx); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); @@ -84,7 +87,7 @@ void donar_client(struct donar_client_ctx* ctx, char* algoname, evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_write)); printf("--- Categories created\n"); - load_onion_services (ctx, onion_file, CLIENT_PORT_SIZE); + load_onion_services (ctx, dp->onion_file, CLIENT_PORT_SIZE); printf("--- Onion services loaded\n"); for (int i = 0; i < CLIENT_PORT_SIZE; i++) { @@ -92,10 +95,10 @@ void donar_client(struct donar_client_ctx* ctx, char* algoname, } printf("--- TCP Clients Connected\n"); - g_ptr_array_foreach (remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); + g_ptr_array_foreach (dp->remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); printf("--- Remote ports are binded locally\n"); - g_ptr_array_foreach (exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); + g_ptr_array_foreach (dp->exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); printf("--- Local UDP services are exposed\n"); evt_core_loop(&(ctx->evts)); diff --git a/src/donar_client.h b/src/donar_client.h index 497259d..4493aa4 100644 --- a/src/donar_client.h +++ b/src/donar_client.h @@ -7,7 +7,7 @@ #include "socks5.h" #include "donar_init.h" -#define CLIENT_PORT_SIZE 10 +#define CLIENT_PORT_SIZE 8 struct donar_client_ctx { struct tor_os_str tos; @@ -19,5 +19,4 @@ struct donar_client_ctx { } client_sock[CLIENT_PORT_SIZE]; }; -void donar_client(struct donar_client_ctx* ctx, char* algoname, - char* onion_file, GPtrArray* exposed_ports, GPtrArray* remote_ports); +void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp); diff --git a/src/donar_init.c b/src/donar_init.c index d6dce07..9c48f87 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -87,3 +87,19 @@ socket_failed: fprintf(stderr, "UDP socket init failed\n"); exit(EXIT_FAILURE); } + +void free_port (void* ptr) { + free(ptr); +} + +void donar_init_params(struct donar_params* dp) { + dp->onion_file = NULL; + dp->algo = NULL; + dp->is_server = 0; + dp->is_client = 0; + dp->is_healing = 0; + dp->is_waiting_bootstrap = 0; + dp->errored = 0; + dp->remote_ports = g_ptr_array_new_with_free_func (free_port); + dp->exposed_ports = g_ptr_array_new_with_free_func (free_port); +} diff --git a/src/donar_init.h b/src/donar_init.h index bb6fe6c..72595c5 100644 --- a/src/donar_init.h +++ b/src/donar_init.h @@ -6,5 +6,12 @@ #include "evt_core.h" #include "packet.h" +struct donar_params { + int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored; + char *port, *onion_file, *algo; + GPtrArray *remote_ports, *exposed_ports; +}; + void init_udp_remote(char* port, struct evt_core_ctx* evts); void init_udp_exposed(char* port, struct evt_core_ctx* evts); +void donar_init_params(struct donar_params* dp); diff --git a/src/donar_server.c b/src/donar_server.c index f557261..310f2b5 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -51,12 +51,15 @@ socket_create_err: exit(EXIT_FAILURE); } -void donar_server(struct donar_server_ctx* ctx, char* algoname, - GPtrArray* exposed_ports, GPtrArray* remote_ports) { +void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { struct algo_skel algo = {0}; + struct algo_params ap = { + .is_waiting_bootstrap = dp->is_waiting_bootstrap, + .is_healing = dp->is_healing + }; evt_core_init (&(ctx->evts)); - init_algo(&ctx->evts, &algo, algoname); + init_algo(&ctx->evts, &algo, dp->algo, &ap); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read)); evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read)); @@ -74,10 +77,10 @@ void donar_server(struct donar_server_ctx* ctx, char* algoname, init_tcp_servers(ctx); printf("--- TCP servers are listening\n"); - g_ptr_array_foreach (remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); + g_ptr_array_foreach (dp->remote_ports, (void(*)(void*, void*))init_udp_remote, &(ctx->evts)); printf("--- Remote ports are binded locally\n"); - g_ptr_array_foreach (exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); + g_ptr_array_foreach (dp->exposed_ports, (void(*)(void*, void*))init_udp_exposed, &(ctx->evts)); printf("--- Local UDP services are exposed\n"); evt_core_loop (&(ctx->evts)); diff --git a/src/donar_server.h b/src/donar_server.h index bc4875b..94eb42d 100644 --- a/src/donar_server.h +++ b/src/donar_server.h @@ -11,7 +11,7 @@ #include "algo_skel.h" #include "donar_init.h" -#define PORT_SIZE 10 +#define PORT_SIZE 8 struct donar_server_ctx { struct tor_os_str tos; @@ -20,5 +20,4 @@ struct donar_server_ctx { uint16_t ports[PORT_SIZE]; }; -void donar_server(struct donar_server_ctx* ctx, char* algoname, - GPtrArray* exposed_ports, GPtrArray* remote_ports); +void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp); diff --git a/src/evt_core.c b/src/evt_core.c index 1e7b728..0d231d5 100644 --- a/src/evt_core.c +++ b/src/evt_core.c @@ -65,7 +65,7 @@ void evt_core_add_cat(struct evt_core_ctx* ctx, struct evt_core_cat* cat) { } void evt_core_mv_fd(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct evt_core_cat* to_cat) { - printf("Moving fd=%d from cat=%s to cat=%s\n",fdinfo->fd, fdinfo->cat->name, to_cat->name); + //printf("Moving fd=%d from cat=%s to cat=%s\n",fdinfo->fd, fdinfo->cat->name, to_cat->name); // 1. Update old category for (int i = 0; i < fdinfo->cat->socklist->len; i++) { @@ -130,7 +130,7 @@ struct evt_core_fdinfo* evt_core_add_fd(struct evt_core_ctx* ctx, struct evt_cor // 5. Add file descriptor to epoll add_fd_to_epoll(ctx->epollfd, user_data->fd, cat->flags); - printf("Added fd=%d with url=%s in cat=%s\n", fdinfo->fd, fdinfo->url, fdinfo->cat->name); + //printf("Added fd=%d with url=%s in cat=%s\n", fdinfo->fd, fdinfo->url, fdinfo->cat->name); // 6. Ensure that events arrived before epoll registering are handled fdinfo->cat->cb(ctx, fdinfo); @@ -145,7 +145,7 @@ struct evt_core_cat* evt_core_rm_fd(struct evt_core_ctx* ctx, int fd) { struct evt_core_fdinfo* fdinfo = g_hash_table_lookup (ctx->socklist, &fd); if (fdinfo == NULL) return NULL; cat = fdinfo->cat; - printf("Closing fd=%d from cat=%s\n",fdinfo->fd, fdinfo->cat->name); + //printf("Closing fd=%d from cat=%s\n",fdinfo->fd, fdinfo->cat->name); // 2. Update category for (int i = 0; i < cat->socklist->len; i++) { diff --git a/src/meas_lat.c b/src/meas_lat.c index ebf39be..1eb76ed 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -96,7 +96,7 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct measure_conf* mc = fdinfo->other; s = read(fdinfo->fd, &ticks, sizeof(uint64_t)); - if (s == -1 && errno == EAGAIN) return 0; + if (s == -1 && errno == EAGAIN) return 1; if (s != sizeof(uint64_t)) { perror("Read error"); exit(EXIT_FAILURE); diff --git a/src/net_tools.c b/src/net_tools.c index cf1c419..0b01658 100644 --- a/src/net_tools.c +++ b/src/net_tools.c @@ -44,7 +44,15 @@ int create_ip_client(char* host, char* service, int type) { } int create_tcp_client(char* host, char* service) { - return create_ip_client (host, service, SOCK_STREAM); + int sock = create_ip_client (host, service, SOCK_STREAM); + int activate = 1; + int err; + err = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &activate, sizeof(activate)); + if (err < 0) { + perror("setsockopt TCP_NODELAY"); + exit(EXIT_FAILURE); + } + return sock; } int create_udp_client(char* host, char* service) { @@ -93,7 +101,16 @@ int create_ip_server(char* host, char* service, int type) { } int create_tcp_server(char* host, char* service) { - return create_ip_server (host, service, SOCK_STREAM); + int sock = create_ip_server (host, service, SOCK_STREAM); + int activate = 1; + int err; + err = setsockopt(sock, IPPROTO_TCP, TCP_NODELAY, &activate, sizeof(activate)); + if (err < 0) { + perror("setsockopt TCP_NODELAY"); + exit(EXIT_FAILURE); + } + + return sock; } int create_udp_server(char* host, char* service) { diff --git a/src/net_tools.h b/src/net_tools.h index a847b9a..0975ebd 100644 --- a/src/net_tools.h +++ b/src/net_tools.h @@ -7,6 +7,8 @@ #include #include #include +#include +#include int create_tcp_client(char* host, char* service); int create_udp_client(char* host, char* service); diff --git a/src/socks5.c b/src/socks5.c index a129cd4..8f1f5b0 100644 --- a/src/socks5.c +++ b/src/socks5.c @@ -2,6 +2,7 @@ void socks5_free_ctx(void* elem) { struct socks5_ctx* ctx = elem; + free(ctx->addr); free(ctx); } @@ -38,7 +39,7 @@ void socks5_create_dns_client(struct evt_core_ctx* ctx, char* proxy_host, char* memset(fdinfo.other, 0, sizeof(struct socks5_ctx)); fdinfo.free_other = socks5_free_ctx; sprintf(url, "socks5:send-hs:%s:%d", addr, port); - fdinfo.url = strdup(url); + fdinfo.url = url; // 3. Fill socks5_ctx structures s5ctx = fdinfo.other;