From b592754973b55d71b7c6050b82d65d8e32f161ba Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 27 May 2019 17:32:00 +0200 Subject: [PATCH] WIP refactor to capture traffic --- CMakeLists.txt | 2 ++ src/algo_dup2.c | 10 +++--- src/algo_naive.c | 8 ++--- src/algo_rr.c | 22 ++++++------- src/algo_utils.c | 73 +++++++++++++++++++++++-------------------- src/algo_utils.h | 61 +++++++++--------------------------- src/capture_traffic.c | 8 +++++ src/capture_traffic.h | 14 +++++++++ src/donar.c | 11 +++++-- src/donar_client.c | 3 +- src/donar_init.c | 1 + src/donar_init.h | 2 +- src/donar_server.c | 3 +- src/proxy.c | 57 ++++++++++++++++++--------------- src/proxy.h | 36 ++++++++++++++++++++- 15 files changed, 178 insertions(+), 133 deletions(-) create mode 100644 src/capture_traffic.c create mode 100644 src/capture_traffic.h diff --git a/CMakeLists.txt b/CMakeLists.txt index dbba341..caeef5f 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,6 +33,8 @@ list(APPEND CSOURCES src/proxy.c src/timer.h src/timer.c + src/capture_traffic.h + src/capture_traffic.c ) add_executable(donar ${CSOURCES} src/donar.c) diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 530f20e..27fd1d9 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -23,7 +23,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { - mv_buffer_rtof(app_ctx, fdinfo); + mv_buffer_rtof(&app_ctx->br, fdinfo); return 0; } @@ -34,12 +34,12 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 1; } // 2. Move buffer - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); return 0; } @@ -63,12 +63,12 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin } // 2. We move the buffer and notify the target - dup_buffer_tow (app_ctx, bp, to_fdinfo); + dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); } // 3. Release the buffer - mv_buffer_rtof (app_ctx, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); return 0; } diff --git a/src/algo_naive.c b/src/algo_naive.c index 90afdaa..6a7bac1 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -15,12 +15,12 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 1; } // 2. Move buffer - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); return 0; @@ -35,13 +35,13 @@ int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL; if (to_fdinfo == NULL) { fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 1; } //printf("Pass packet from %s to %s\n", fdinfo->url, url); // 2. We move the buffer and notify the target - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); return 0; diff --git a/src/algo_rr.c b/src/algo_rr.c index f3e004d..712aa52 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -82,10 +82,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; - mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); + mv_buffer_rtoa(&app_ctx->br, fdinfo, &rr->real[real_idx].idx); } else { if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); - mv_buffer_rtof (app_ctx, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); } } @@ -98,7 +98,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue dp->on = 0; // 2. Get the buffer - struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); + struct buffer_packet* bp = get_app_buffer (&app_ctx->br, &dp->idx); assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); // 3. We update our cursor @@ -110,12 +110,12 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx); //mv_buffer_wtor (app_ctx, fdinfo, bp); - mv_buffer_atof (app_ctx, &dp->idx); + mv_buffer_atof (&app_ctx->br, &dp->idx); } // 5. We move the buffer and notify the target //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); - mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); + mv_buffer_atow (&app_ctx->br, &dp->idx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); } @@ -164,7 +164,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); release: - mv_buffer_rtof(app_ctx, fdinfo); + mv_buffer_rtof(&app_ctx->br, fdinfo); } uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { @@ -202,7 +202,7 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr_pkt_manage_links(ctx, fdinfo, bp); } else { fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd); - mv_buffer_rtof(app_ctx, fdinfo); + mv_buffer_rtof(&app_ctx->br, fdinfo); } return 0; @@ -288,7 +288,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo continue; } - if ((len = write_queue_len (app_ctx, to_fdinfo)) > 0) { + if ((len = write_queue_len (&app_ctx->br, to_fdinfo)) > 0) { if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); blacklist_link (rr, sel_link); continue; @@ -296,11 +296,11 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { rr->current_link = sel_link; - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); return 0; } else { - struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); + struct buffer_packet* dup_bp = dup_buffer_tow(&app_ctx->br, bp, to_fdinfo); /* * for later dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; @@ -313,7 +313,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo not_ready: // 3. We find no up target fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 0; } diff --git a/src/algo_utils.c b/src/algo_utils.c index 02ce087..af67e58 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -1,19 +1,5 @@ #include "algo_utils.h" -void free_nothing(void* app_ctx) {} -void free_naive(void* app_ctx) { - struct algo_ctx* ctx = (struct algo_ctx*) app_ctx; - ctx->ref_count--; - if (ctx->ref_count > 0) return; - if (ctx->free_misc) ctx->free_misc(ctx->misc); - g_queue_free(ctx->free_buffer); - g_queue_free(ctx->read_waiting); - g_hash_table_destroy (ctx->application_waiting); - g_hash_table_destroy (ctx->used_buffer); - g_hash_table_destroy (ctx->write_waiting); - free(ctx); -} - void iterate(int* fd, GQueue* q, int* waiting_count) { fprintf(stderr, "Queue for fd=%d has length=%d\n", *fd, q->length); *waiting_count += q->length; @@ -23,7 +9,12 @@ void iterate2(int* fd, struct buffer_packet *bp, gpointer user_data) { fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd); } -void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +void naive_free_simple(void* v) { + GQueue* g = v; + g_queue_free (g); +} + +void debug_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); int waiting_count = 0; g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); @@ -36,10 +27,30 @@ void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { waiting_count); } +void init_buffer_management(struct buffer_resources* ctx) { + ctx->free_buffer = g_queue_new (); + ctx->read_waiting = g_queue_new (); + ctx->application_waiting = g_hash_table_new (NULL, NULL); + ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); + ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { + memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet)); + g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); + } +} + +void destroy_buffer_management(struct buffer_resources* ctx) { + g_queue_free(ctx->free_buffer); + g_queue_free(ctx->read_waiting); + g_hash_table_destroy (ctx->application_waiting); + g_hash_table_destroy (ctx->used_buffer); + g_hash_table_destroy (ctx->write_waiting); +} + /** * Returns a buffer if available, NULL otherwise */ -struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* bp; // 1. Check if we don't have a buffer @@ -62,12 +73,12 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_ return bp; } -void __push_to_free(struct algo_ctx *app_ctx, struct buffer_packet* bp) { +void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) { memset(bp, 0, sizeof(struct buffer_packet)); g_queue_push_tail (app_ctx->free_buffer, bp); } -guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { GQueue* q; if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue @@ -77,7 +88,7 @@ guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) /** * Returns a buffer if available, NULL otherwise */ -struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* bp; GQueue* q; @@ -98,7 +109,7 @@ struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core return bp; } -void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) { +void mv_buffer_rtow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) { GQueue* q; struct buffer_packet* bp; @@ -121,7 +132,7 @@ void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, stru g_queue_push_tail(q, bp); } -void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from) { +void mv_buffer_rtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from) { struct buffer_packet* bp; // 1. We get the packet buffer @@ -134,7 +145,7 @@ void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from) { __push_to_free (app_ctx, bp); } -void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo) { +void mv_buffer_wtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); if (bp == NULL) { fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in wtof\n", fdinfo->fd, fdinfo->url); @@ -144,7 +155,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo) { __push_to_free (app_ctx, bp); } -void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to) { +void mv_buffer_rtoa(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, void* to) { struct buffer_packet* bp; bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd); if (bp == NULL) { @@ -159,7 +170,7 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void g_hash_table_insert(app_ctx->application_waiting, to, bp); } -void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to) { +void mv_buffer_atow(struct buffer_resources *app_ctx, void* from, struct evt_core_fdinfo* to) { GQueue* q; struct buffer_packet* bp; @@ -182,7 +193,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo g_queue_push_tail(q, bp); } -void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { +void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) { struct buffer_packet* bp; // 1. Remove the buffer @@ -197,7 +208,7 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { __push_to_free (app_ctx, bp); } -struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { +struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { GQueue* q; // 1. We get a free buffer @@ -222,11 +233,11 @@ struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_pac return bp_dest; } -struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { +struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx) { return g_hash_table_lookup (app_ctx->application_waiting, idx); } -void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { +void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx) { struct evt_core_fdinfo* next_fdinfo = NULL; while (next_fdinfo == NULL) { int* fd = g_queue_pop_head(app_ctx->read_waiting); @@ -252,9 +263,3 @@ int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* s memcpy(target, src, src->fmt.headers.size); return 0; } - -void naive_free_simple(void* v) { - GQueue* g = v; - g_queue_free (g); -} - diff --git a/src/algo_utils.h b/src/algo_utils.h index 1952554..f2845d1 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -6,62 +6,31 @@ #include "evt_core.h" #define PACKET_BUFFER_SIZE 1024 -struct algo_params { - uint8_t is_waiting_bootstrap; - uint8_t is_healing; - char* algo_name; - int links, fresh_data, redundant_data; -}; - - -struct algo_ctx; -typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); -typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); - -typedef void (*algo_ctx_free_misc)(void*); - -struct algo_desc { - char* name; - algo_init init; - algo_ctx_on_buffer on_stream; - algo_ctx_on_buffer on_datagram; - algo_ctx_on_event on_err; -}; - -struct algo_ctx { - struct algo_desc* desc; - uint8_t link_count; - uint8_t is_rdy; - struct algo_params ap; - int ref_count; +struct buffer_resources { struct buffer_packet bps[PACKET_BUFFER_SIZE]; GQueue* free_buffer; // Available buffers GHashTable* used_buffer; // Buffers used for reading or writing GQueue* read_waiting; // Who wait to be notified for a read GHashTable* application_waiting; // Structure that can be used by the algo for its internal logic GHashTable* write_waiting; // Structure to track packets waiting to be written - void* misc; // Additional structures - algo_ctx_free_misc free_misc; // Fx ptr to free misc }; -void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to); -void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); -void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); -void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); -void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); -void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); -struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); -guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); +void init_buffer_management(struct buffer_resources* br); +void destroy_buffer_management(struct buffer_resources* br); +void mv_buffer_rtow(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to); +void mv_buffer_rtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from); +void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from); +void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to); +void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to); +void mv_buffer_atof(struct buffer_resources* app_ctx, void* from); +struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); -struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); -struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); -struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx); +struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); +struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); +struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx); -void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx); +void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx); -void free_naive(void* app_ctx); -void free_nothing(void* app_ctx); -void naive_free_simple(void* v); diff --git a/src/capture_traffic.c b/src/capture_traffic.c new file mode 100644 index 0000000..c431e8c --- /dev/null +++ b/src/capture_traffic.c @@ -0,0 +1,8 @@ +#include "capture_traffic.h" +void init_traffic_capture(struct capture_ctx* ctx) { + +} + +void stop_traffic_capture(struct capture_ctx* ctx) { + +} diff --git a/src/capture_traffic.h b/src/capture_traffic.h new file mode 100644 index 0000000..7024248 --- /dev/null +++ b/src/capture_traffic.h @@ -0,0 +1,14 @@ +#pragma once +#include +#include +#include +#include +#include +#include "evt_core.h" + +struct capture_ctx { + +}; + +void init_traffic_capture(struct capture_ctx* ctx); +void stop_traffic_capture(struct capture_ctx* ctx); diff --git a/src/donar.c b/src/donar.c index d9dd2d5..1fc6402 100644 --- a/src/donar.c +++ b/src/donar.c @@ -14,7 +14,7 @@ int main(int argc, char** argv) { struct donar_params dp = {0}; donar_init_params (&dp); - while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:")) != -1) { + while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:f:")) != -1) { switch(dp.opt) { case 'v': dp.verbose++; @@ -53,6 +53,9 @@ int main(int argc, char** argv) { case 'd': sscanf(optarg, "%d,%d", &dp.fresh_data, &dp.redundant_data); break; + case 'f': + dp.capture_file = strdup(optarg); + break; default: goto in_error; } @@ -74,14 +77,16 @@ int main(int argc, char** argv) { in_error: dp.errored = 1; - fprintf(stderr, "Usage as client : %s -c -a -o [-h] [-b] [-l ] [-d ,] [-e ]* [-r ]*\n", argv[0]); - fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] [-l ] [-d ,] [-e ]* [-r ]*\n\n", argv[0]); + fprintf(stderr, "Usage as client : %s -c -a -o [-h] [-b] [-f ] [-l ] [-d ,] [-e ]* [-r ]*\n", argv[0]); + fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] [-l ] [-f ] [-d ,] [-e ]* [-r ]*\n\n", argv[0]); fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n", dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data); terminate: if (dp.onion_file != NULL) free(dp.onion_file); if (dp.algo != NULL) free(dp.algo); + if (dp.capture_file != NULL) free(dp.capture_file); + g_ptr_array_free(dp.exposed_ports, TRUE); g_ptr_array_free(dp.remote_ports, TRUE); diff --git a/src/donar_client.c b/src/donar_client.c index 1a46aac..cef5dc3 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -76,7 +76,8 @@ void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { .algo_name = dp->algo, .links = dp->links, .fresh_data = dp->fresh_data, - .redundant_data = dp->redundant_data + .redundant_data = dp->redundant_data, + .capture_file = dp->capture_file }; evt_core_init (&(ctx->evts), dp->verbose); diff --git a/src/donar_init.c b/src/donar_init.c index 3beaa39..5d21c9e 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -146,6 +146,7 @@ void free_port (void* ptr) { void donar_init_params(struct donar_params* dp) { dp->onion_file = NULL; dp->algo = NULL; + dp->capture_file = NULL; dp->is_server = 0; dp->is_client = 0; dp->is_healing = 0; diff --git a/src/donar_init.h b/src/donar_init.h index c423f3f..3629fd7 100644 --- a/src/donar_init.h +++ b/src/donar_init.h @@ -10,7 +10,7 @@ struct donar_params { int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored, verbose, links, fresh_data, redundant_data; - char *port, *onion_file, *algo; + char *port, *onion_file, *algo, *capture_file; GPtrArray *remote_ports, *exposed_ports; }; diff --git a/src/donar_server.c b/src/donar_server.c index f8b7c8a..e403672 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -58,7 +58,8 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { .algo_name = dp->algo, .links = dp->links, .fresh_data = dp->fresh_data, - .redundant_data = dp->redundant_data + .redundant_data = dp->redundant_data, + .capture_file = dp->capture_file }; evt_core_init (&(ctx->evts), dp->verbose); diff --git a/src/proxy.c b/src/proxy.c index 45c3c9c..d918700 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -44,7 +44,7 @@ int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int read_res = FDS_READY; if (ctx->verbose > 1) fprintf(stderr, " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later\n"); - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n"); while (bp->mode == BP_READING) { @@ -69,7 +69,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { char url[255]; // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Read packet from socket bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); @@ -98,7 +98,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Write data from the buffer to the socket while (bp->mode == BP_WRITING) { @@ -109,8 +109,8 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) // 3. A whole packet has been written // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); + mv_buffer_wtof(&app_ctx->br, fdinfo); + notify_read(ctx, &app_ctx->br); return 0; co_error: @@ -125,7 +125,7 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) int write_res = FDS_READY; // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Write buffer write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); @@ -134,8 +134,8 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) // 3. A whole packet has been written // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); + mv_buffer_wtof(&app_ctx->br, fdinfo); + notify_read(ctx, &app_ctx->br); return 0; co_error: @@ -148,41 +148,43 @@ int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; // 1. If has a "used" buffer, remove it - mv_buffer_rtof (app_ctx, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); // 2. If appears in the write waiting queue, remove it - while (get_write_buffer (app_ctx, fdinfo) != NULL) { - mv_buffer_wtof(app_ctx, fdinfo); + while (get_write_buffer (&app_ctx->br, fdinfo) != NULL) { + mv_buffer_wtof(&app_ctx->br, fdinfo); } // 3. If appears in the read waiting queue, remove it - g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); + g_queue_remove_all (app_ctx->br.read_waiting, &(fdinfo->fd)); return app_ctx->desc->on_err(ctx, fdinfo); } +void algo_main_destroy(void* app_ctx) { + struct algo_ctx* ctx = (struct algo_ctx*) app_ctx; + ctx->ref_count--; + if (ctx->ref_count > 0) return; + + stop_traffic_capture(&ctx->cap); + destroy_buffer_management(&ctx->br); + if (ctx->free_misc) ctx->free_misc(ctx->misc); + free(ctx); +} + void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct algo_ctx)); - ctx->free_buffer = g_queue_new (); - ctx->read_waiting = g_queue_new (); - ctx->application_waiting = g_hash_table_new (NULL, NULL); - ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); - ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); ctx->link_count = 8; ctx->is_rdy = 0; ctx->ap = *ap; - for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { - memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet)); - g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); - } struct evt_core_cat tcp_listen = { .name = "tcp-listen", .flags = EPOLLIN, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_tcp_co, .err_cb = NULL }; @@ -193,7 +195,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "tcp-read", .flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_tcp_read, .err_cb = main_on_err }; @@ -204,7 +206,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "udp-read", .flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_udp_read, .err_cb = main_on_err }; @@ -215,7 +217,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "tcp-write", .flags = EPOLLOUT | EPOLLET | EPOLLRDHUP, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_tcp_write, .err_cb = main_on_err }; @@ -226,13 +228,16 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "udp-write", .flags = EPOLLOUT | EPOLLET, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_udp_write, .err_cb = main_on_err }; ctx->ref_count++; evt_core_add_cat(evt, &udp_write); + init_buffer_management(&ctx->br); + init_traffic_capture(&ctx->cap); + for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { if (strcmp(available_algo[i].name, ap->algo_name) == 0) { ctx->desc = &(available_algo[i]); diff --git a/src/proxy.h b/src/proxy.h index 5d7e082..fc9ab61 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -6,10 +6,45 @@ #include #include "evt_core.h" #include "algo_utils.h" +#include "capture_traffic.h" #include "url.h" #include "utils.h" #include "packet.h" +struct algo_params { + uint8_t is_waiting_bootstrap; + uint8_t is_healing; + char *algo_name, *capture_file; + int links, fresh_data, redundant_data; +}; + +struct algo_ctx; +typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); + +typedef void (*algo_ctx_free_misc)(void*); + +struct algo_desc { + char* name; + algo_init init; + algo_ctx_on_buffer on_stream; + algo_ctx_on_buffer on_datagram; + algo_ctx_on_event on_err; +}; + +struct algo_ctx { + struct algo_desc* desc; + uint8_t link_count; + uint8_t is_rdy; + struct algo_params ap; + int ref_count; + struct capture_ctx cap; + struct buffer_resources br; + void* misc; // Additional structures + algo_ctx_free_misc free_misc; // Fx ptr to free misc +}; + void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); @@ -49,7 +84,6 @@ static struct algo_desc available_algo[] = { } }; - void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap); int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);