diff --git a/CMakeLists.txt b/CMakeLists.txt index dbba341..1182bb3 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -33,12 +33,15 @@ list(APPEND CSOURCES src/proxy.c src/timer.h src/timer.c + src/capture_traffic.h + src/capture_traffic.c ) add_executable(donar ${CSOURCES} src/donar.c) add_executable(measlat ${CSOURCES} src/meas_lat.c) add_executable(udpecho ${CSOURCES} src/udp_echo.c) add_executable(torecho ${CSOURCES} src/tor_echo.c) +add_executable(capdiff ${CSOURCES} src/capdiff.c) find_package(PkgConfig REQUIRED) pkg_search_module(GLIB REQUIRED glib-2.0) @@ -55,6 +58,9 @@ target_link_libraries(udpecho ${GLIB_LDFLAGS}) target_include_directories(torecho PRIVATE ${GLIB_INCLUDE_DIRS}) target_link_libraries(torecho ${GLIB_LDFLAGS}) -install(TARGETS donar measlat udpecho torecho +target_include_directories(capdiff PRIVATE ${GLIB_INCLUDE_DIRS}) +target_link_libraries(capdiff ${GLIB_LDFLAGS}) + +install(TARGETS donar measlat udpecho torecho capdiff RUNTIME DESTINATION bin LIBRARY DESTINATION lib) diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 530f20e..27fd1d9 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -23,7 +23,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { - mv_buffer_rtof(app_ctx, fdinfo); + mv_buffer_rtof(&app_ctx->br, fdinfo); return 0; } @@ -34,12 +34,12 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 1; } // 2. Move buffer - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); return 0; } @@ -63,12 +63,12 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin } // 2. We move the buffer and notify the target - dup_buffer_tow (app_ctx, bp, to_fdinfo); + dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); } // 3. Release the buffer - mv_buffer_rtof (app_ctx, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); return 0; } diff --git a/src/algo_naive.c b/src/algo_naive.c index 90afdaa..6a7bac1 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -15,12 +15,12 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 1; } // 2. Move buffer - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); return 0; @@ -35,13 +35,13 @@ int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL; if (to_fdinfo == NULL) { fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 1; } //printf("Pass packet from %s to %s\n", fdinfo->url, url); // 2. We move the buffer and notify the target - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); return 0; diff --git a/src/algo_rr.c b/src/algo_rr.c index f3e004d..712aa52 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -82,10 +82,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; - mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); + mv_buffer_rtoa(&app_ctx->br, fdinfo, &rr->real[real_idx].idx); } else { if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); - mv_buffer_rtof (app_ctx, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); } } @@ -98,7 +98,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue dp->on = 0; // 2. Get the buffer - struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); + struct buffer_packet* bp = get_app_buffer (&app_ctx->br, &dp->idx); assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); // 3. We update our cursor @@ -110,12 +110,12 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx); //mv_buffer_wtor (app_ctx, fdinfo, bp); - mv_buffer_atof (app_ctx, &dp->idx); + mv_buffer_atof (&app_ctx->br, &dp->idx); } // 5. We move the buffer and notify the target //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); - mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); + mv_buffer_atow (&app_ctx->br, &dp->idx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); } @@ -164,7 +164,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); release: - mv_buffer_rtof(app_ctx, fdinfo); + mv_buffer_rtof(&app_ctx->br, fdinfo); } uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { @@ -202,7 +202,7 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr_pkt_manage_links(ctx, fdinfo, bp); } else { fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd); - mv_buffer_rtof(app_ctx, fdinfo); + mv_buffer_rtof(&app_ctx->br, fdinfo); } return 0; @@ -288,7 +288,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo continue; } - if ((len = write_queue_len (app_ctx, to_fdinfo)) > 0) { + if ((len = write_queue_len (&app_ctx->br, to_fdinfo)) > 0) { if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); blacklist_link (rr, sel_link); continue; @@ -296,11 +296,11 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { rr->current_link = sel_link; - mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); + mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); return 0; } else { - struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); + struct buffer_packet* dup_bp = dup_buffer_tow(&app_ctx->br, bp, to_fdinfo); /* * for later dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; @@ -313,7 +313,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo not_ready: // 3. We find no up target fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); - mv_buffer_wtof (app_ctx, fdinfo); + mv_buffer_wtof (&app_ctx->br, fdinfo); return 0; } diff --git a/src/algo_utils.c b/src/algo_utils.c index 02ce087..3813a6d 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -1,19 +1,5 @@ #include "algo_utils.h" -void free_nothing(void* app_ctx) {} -void free_naive(void* app_ctx) { - struct algo_ctx* ctx = (struct algo_ctx*) app_ctx; - ctx->ref_count--; - if (ctx->ref_count > 0) return; - if (ctx->free_misc) ctx->free_misc(ctx->misc); - g_queue_free(ctx->free_buffer); - g_queue_free(ctx->read_waiting); - g_hash_table_destroy (ctx->application_waiting); - g_hash_table_destroy (ctx->used_buffer); - g_hash_table_destroy (ctx->write_waiting); - free(ctx); -} - void iterate(int* fd, GQueue* q, int* waiting_count) { fprintf(stderr, "Queue for fd=%d has length=%d\n", *fd, q->length); *waiting_count += q->length; @@ -23,7 +9,12 @@ void iterate2(int* fd, struct buffer_packet *bp, gpointer user_data) { fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd); } -void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +void naive_free_simple(void* v) { + GQueue* g = v; + g_queue_free (g); +} + +void debug_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); int waiting_count = 0; g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); @@ -36,10 +27,30 @@ void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { waiting_count); } +void init_buffer_management(struct buffer_resources* ctx) { + ctx->free_buffer = g_queue_new (); + ctx->read_waiting = g_queue_new (); + ctx->application_waiting = g_hash_table_new (NULL, NULL); + ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); + ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { + memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet)); + g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); + } +} + +void destroy_buffer_management(struct buffer_resources* ctx) { + g_queue_free(ctx->free_buffer); + g_queue_free(ctx->read_waiting); + g_hash_table_destroy (ctx->application_waiting); + g_hash_table_destroy (ctx->used_buffer); + g_hash_table_destroy (ctx->write_waiting); +} + /** * Returns a buffer if available, NULL otherwise */ -struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* bp; // 1. Check if we don't have a buffer @@ -62,12 +73,12 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_ return bp; } -void __push_to_free(struct algo_ctx *app_ctx, struct buffer_packet* bp) { +void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) { memset(bp, 0, sizeof(struct buffer_packet)); g_queue_push_tail (app_ctx->free_buffer, bp); } -guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { GQueue* q; if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue @@ -77,7 +88,7 @@ guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) /** * Returns a buffer if available, NULL otherwise */ -struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { +struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* bp; GQueue* q; @@ -98,7 +109,7 @@ struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core return bp; } -void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) { +void mv_buffer_rtow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) { GQueue* q; struct buffer_packet* bp; @@ -121,30 +132,32 @@ void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, stru g_queue_push_tail(q, bp); } -void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from) { +void mv_buffer_rtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from) { struct buffer_packet* bp; // 1. We get the packet buffer bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd); if (bp == NULL) { fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in rtof\n", from->fd, from->url); + return; } g_hash_table_remove(app_ctx->used_buffer, &(from->fd)); __push_to_free (app_ctx, bp); } -void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo) { +void mv_buffer_wtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); if (bp == NULL) { fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in wtof\n", fdinfo->fd, fdinfo->url); + return; } g_hash_table_remove(app_ctx->used_buffer, &(fdinfo->fd)); __push_to_free (app_ctx, bp); } -void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to) { +void mv_buffer_rtoa(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, void* to) { struct buffer_packet* bp; bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd); if (bp == NULL) { @@ -159,7 +172,7 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void g_hash_table_insert(app_ctx->application_waiting, to, bp); } -void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to) { +void mv_buffer_atow(struct buffer_resources *app_ctx, void* from, struct evt_core_fdinfo* to) { GQueue* q; struct buffer_packet* bp; @@ -182,7 +195,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo g_queue_push_tail(q, bp); } -void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { +void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) { struct buffer_packet* bp; // 1. Remove the buffer @@ -197,7 +210,7 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { __push_to_free (app_ctx, bp); } -struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { +struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { GQueue* q; // 1. We get a free buffer @@ -222,11 +235,11 @@ struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_pac return bp_dest; } -struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { +struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx) { return g_hash_table_lookup (app_ctx->application_waiting, idx); } -void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { +void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx) { struct evt_core_fdinfo* next_fdinfo = NULL; while (next_fdinfo == NULL) { int* fd = g_queue_pop_head(app_ctx->read_waiting); @@ -252,9 +265,3 @@ int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* s memcpy(target, src, src->fmt.headers.size); return 0; } - -void naive_free_simple(void* v) { - GQueue* g = v; - g_queue_free (g); -} - diff --git a/src/algo_utils.h b/src/algo_utils.h index 1952554..f2845d1 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -6,62 +6,31 @@ #include "evt_core.h" #define PACKET_BUFFER_SIZE 1024 -struct algo_params { - uint8_t is_waiting_bootstrap; - uint8_t is_healing; - char* algo_name; - int links, fresh_data, redundant_data; -}; - - -struct algo_ctx; -typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); -typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); - -typedef void (*algo_ctx_free_misc)(void*); - -struct algo_desc { - char* name; - algo_init init; - algo_ctx_on_buffer on_stream; - algo_ctx_on_buffer on_datagram; - algo_ctx_on_event on_err; -}; - -struct algo_ctx { - struct algo_desc* desc; - uint8_t link_count; - uint8_t is_rdy; - struct algo_params ap; - int ref_count; +struct buffer_resources { struct buffer_packet bps[PACKET_BUFFER_SIZE]; GQueue* free_buffer; // Available buffers GHashTable* used_buffer; // Buffers used for reading or writing GQueue* read_waiting; // Who wait to be notified for a read GHashTable* application_waiting; // Structure that can be used by the algo for its internal logic GHashTable* write_waiting; // Structure to track packets waiting to be written - void* misc; // Additional structures - algo_ctx_free_misc free_misc; // Fx ptr to free misc }; -void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to); -void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); -void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); -void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); -void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); -void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); -struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); -guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); +void init_buffer_management(struct buffer_resources* br); +void destroy_buffer_management(struct buffer_resources* br); +void mv_buffer_rtow(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to); +void mv_buffer_rtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from); +void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from); +void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to); +void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to); +void mv_buffer_atof(struct buffer_resources* app_ctx, void* from); +struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); -struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); -struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); -struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx); +struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); +struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); +struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx); -void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx); +void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx); -void free_naive(void* app_ctx); -void free_nothing(void* app_ctx); -void naive_free_simple(void* v); diff --git a/src/capdiff.c b/src/capdiff.c new file mode 100644 index 0000000..0912c23 --- /dev/null +++ b/src/capdiff.c @@ -0,0 +1,87 @@ +#include +#include +#include "packet.h" + +int main(int argc, char** argv) { + if (argc != 3) { + fprintf(stderr, "Usage %s ./cap.0 ./cap.1\n", argv[0]); + exit(EXIT_FAILURE); + } + uint8_t verbose = 0; + + FILE *fd[2]; size_t sz[2]; + for (int i = 0; i < 2; i++) { + if ((fd[i] = fopen(argv[i+1], "r")) == NULL) { + perror("Unable to open file"); + exit(EXIT_FAILURE); + } + + fseek(fd[i], 0, SEEK_END); + sz[i] = ftell(fd[i]); + fseek(fd[i], 0, SEEK_SET); + } + + if (sz[0] != sz[1]) { + printf("[!!] %s has %ld entries, %s has %ld entries\n", + argv[1], + sz[0]/sizeof(struct buffer_packet), + argv[2], + sz[1]/sizeof(struct buffer_packet)); + } else if (verbose) { + printf("[OK] %s and %s have %ld entries\n", + argv[1], + argv[2], + sz[0]/sizeof(struct buffer_packet)); + } + + size_t to_read = sz[0] < sz[1] ? sz[0] : sz[1]; + size_t to_read_obj = to_read / sizeof(struct buffer_packet); + uint16_t expected_id = 0; + struct buffer_packet bpread[2]; + for (size_t c = 0; c < to_read_obj; c++) { + for (int i = 0; i < 2; i++) fread(&bpread[i], sizeof(struct buffer_packet), 1, fd[i]); + /*if (expected_id != bpread[0].ip.ap.fmt.content.clear.id || expected_id != bpread[1].ip.ap.fmt.content.clear.id) { + printf("[!!] expected id is %d, %s has id %d, %s has id %d\n", + expected_id, + argv[1], + bpread[0].ip.ap.fmt.content.clear.id, + argv[2], + bpread[1].ip.ap.fmt.content.clear.id); + }*/ + + uint8_t is_same_size = bpread[0].ip.ap.fmt.headers.size == bpread[1].ip.ap.fmt.headers.size; + if (!is_same_size) { + printf("[!!] %s packet has size %d, %s packet has size %d for expected id %d\n", + argv[1], + bpread[0].ip.ap.fmt.headers.size, + argv[2], + bpread[1].ip.ap.fmt.headers.size, + expected_id); + } else if (verbose) { + printf("[OK] %s and %s packets for expected id %d have size %d\n", + argv[1], + argv[2], + expected_id, + bpread[0].ip.ap.fmt.headers.size); + } + + size_t s1 = bpread[0].ip.ap.fmt.headers.size, s2 = bpread[1].ip.ap.fmt.headers.size; + size_t max_size = s1 > s2 ? s1 : s2; + for (size_t idx = 0; idx < max_size; idx++) { + char e1 = (&bpread[0].ip.ap.raw)[idx], e2 = (&bpread[0].ip.ap.raw)[idx]; + if (e1 != e2) { + printf("[!!] for expected id %d, byte 0x%04x is different: 0x%02x vs 0x%02x\n", expected_id, (uint16_t)idx, (uint8_t)e1, (uint8_t)e2); + } + } + + expected_id++; + } + + + for (int i = 0; i < 2; i++) { + fclose(fd[i]); + } + + printf("done\n"); + return 0; +} diff --git a/src/capture_traffic.c b/src/capture_traffic.c new file mode 100644 index 0000000..d1f0522 --- /dev/null +++ b/src/capture_traffic.c @@ -0,0 +1,90 @@ +#include "capture_traffic.h" + +void dynbuf_init(struct dynbuf* db) { + db->content = malloc(MEGABYTE); + if (db->content == NULL) { + perror("malloc dynbuf failed"); + exit(EXIT_FAILURE); + } + db->written = 0; + db->alloced = MEGABYTE; +} + +void dynbuf_check_alloc(struct dynbuf* db, size_t len) { + if (db->written + len > db->alloced) { + size_t new_alloced = db->written + len > 2 * db->alloced ? db->written + len : 2 * db->alloced; + db->content = realloc(db->content, new_alloced); + if (db->content == NULL) { + perror("realloc dynbuf failed"); + exit(EXIT_FAILURE); + } + db->alloced = new_alloced; + } +} + +void dynbuf_append(struct dynbuf* db, char* ptr, size_t len) { + dynbuf_check_alloc(db, len); + + memcpy(db->content + db->written, ptr, len); + db->written += len; +} + +void dynbuf_destroy(struct dynbuf* db) { + free(db->content); +} + +void traffic_capture_init(struct capture_ctx* ctx, char* filename) { + ctx->activated = filename == NULL ? 0 : 1; + if (!ctx->activated) return; + + ctx->filename = strdup(filename); + dynbuf_init (&ctx->in); + dynbuf_init (&ctx->out); +} + +void traffic_capture_notify(struct capture_ctx* ctx, struct buffer_packet *bp, char* dest) { + if (!ctx->activated) return; + + if (clock_gettime(CLOCK_MONOTONIC, &bp->seen) == -1){ + perror("clock_gettime error"); + exit(EXIT_FAILURE); + } + + dynbuf_append ( + strcmp(dest, "in") == 0 ? &ctx->in : &ctx->out, + (char*)bp, + sizeof(struct buffer_packet)); +} + +void traffic_capture_stop(struct capture_ctx* ctx) { + if (!ctx->activated) return; + + FILE* fd = NULL; + + struct dynbuf *dbs[] = {&ctx->in, &ctx->out}; + for (int i = 0; i < 2; i++) { + size_t written = 0, ack = 0; + char *out_file = NULL; + + asprintf(&out_file, "%s.%d", ctx->filename, i); + if (out_file == NULL || (fd = fopen(out_file, "w")) == NULL) { + perror("failed to open file"); + exit(EXIT_FAILURE); + } + free(out_file); + + while (written < dbs[i]->written) { + ack = fwrite(dbs[i]->content+written, sizeof(char), dbs[i]->written, fd); + if (ack <= 0) { + perror("unable to write capture file"); + exit(EXIT_FAILURE); + } + written += ack; + } + + fclose(fd); + dynbuf_destroy (dbs[i]); + } + + free(ctx->filename); +} diff --git a/src/capture_traffic.h b/src/capture_traffic.h new file mode 100644 index 0000000..ccdabcf --- /dev/null +++ b/src/capture_traffic.h @@ -0,0 +1,39 @@ +#pragma once +#define _GNU_SOURCE + +#include +#include +#include +#include +#include +#include +#include +#include "packet.h" +#include "evt_core.h" + +#define KILOBYTE 1024l +#define MEGABYTE 1024l * KILOBYTE +#define GIGABYTE 1024l * MEGABYTE + +struct captured_packet { + struct timeval* captured_time; + char* pkt; +}; + +struct dynbuf { + char* content; + size_t written; + size_t alloced; +}; + +struct capture_ctx { + uint8_t activated; + char* filename; + struct timeval* start_time; + struct dynbuf in; + struct dynbuf out; +}; + +void traffic_capture_init(struct capture_ctx* ctx, char* filename); +void traffic_capture_stop(struct capture_ctx* ctx); +void traffic_capture_notify(struct capture_ctx* ctx, struct buffer_packet *bp, char* dest); diff --git a/src/donar.c b/src/donar.c index d9dd2d5..1fc6402 100644 --- a/src/donar.c +++ b/src/donar.c @@ -14,7 +14,7 @@ int main(int argc, char** argv) { struct donar_params dp = {0}; donar_init_params (&dp); - while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:")) != -1) { + while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:f:")) != -1) { switch(dp.opt) { case 'v': dp.verbose++; @@ -53,6 +53,9 @@ int main(int argc, char** argv) { case 'd': sscanf(optarg, "%d,%d", &dp.fresh_data, &dp.redundant_data); break; + case 'f': + dp.capture_file = strdup(optarg); + break; default: goto in_error; } @@ -74,14 +77,16 @@ int main(int argc, char** argv) { in_error: dp.errored = 1; - fprintf(stderr, "Usage as client : %s -c -a -o [-h] [-b] [-l ] [-d ,] [-e ]* [-r ]*\n", argv[0]); - fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] [-l ] [-d ,] [-e ]* [-r ]*\n\n", argv[0]); + fprintf(stderr, "Usage as client : %s -c -a -o [-h] [-b] [-f ] [-l ] [-d ,] [-e ]* [-r ]*\n", argv[0]); + fprintf(stderr, "Usage as server : %s -s -a [-h] [-b] [-l ] [-f ] [-d ,] [-e ]* [-r ]*\n\n", argv[0]); fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n", dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data); terminate: if (dp.onion_file != NULL) free(dp.onion_file); if (dp.algo != NULL) free(dp.algo); + if (dp.capture_file != NULL) free(dp.capture_file); + g_ptr_array_free(dp.exposed_ports, TRUE); g_ptr_array_free(dp.remote_ports, TRUE); diff --git a/src/donar_client.c b/src/donar_client.c index 1a46aac..cef5dc3 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -76,7 +76,8 @@ void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { .algo_name = dp->algo, .links = dp->links, .fresh_data = dp->fresh_data, - .redundant_data = dp->redundant_data + .redundant_data = dp->redundant_data, + .capture_file = dp->capture_file }; evt_core_init (&(ctx->evts), dp->verbose); diff --git a/src/donar_init.c b/src/donar_init.c index 3beaa39..5d21c9e 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -146,6 +146,7 @@ void free_port (void* ptr) { void donar_init_params(struct donar_params* dp) { dp->onion_file = NULL; dp->algo = NULL; + dp->capture_file = NULL; dp->is_server = 0; dp->is_client = 0; dp->is_healing = 0; diff --git a/src/donar_init.h b/src/donar_init.h index c423f3f..3629fd7 100644 --- a/src/donar_init.h +++ b/src/donar_init.h @@ -10,7 +10,7 @@ struct donar_params { int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored, verbose, links, fresh_data, redundant_data; - char *port, *onion_file, *algo; + char *port, *onion_file, *algo, *capture_file; GPtrArray *remote_ports, *exposed_ports; }; diff --git a/src/donar_server.c b/src/donar_server.c index f8b7c8a..e403672 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -58,7 +58,8 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { .algo_name = dp->algo, .links = dp->links, .fresh_data = dp->fresh_data, - .redundant_data = dp->redundant_data + .redundant_data = dp->redundant_data, + .capture_file = dp->capture_file }; evt_core_init (&(ctx->evts), dp->verbose); diff --git a/src/meas_lat.c b/src/meas_lat.c index 42961c2..2565a0e 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -41,6 +41,7 @@ struct measure_conf* create_measure_conf(int max_mes, int plsize) { perror("malloc failed"); exit(EXIT_FAILURE); } + return mc; } @@ -114,6 +115,15 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { perror("clock_gettime error"); exit(EXIT_FAILURE); } + + char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire."; + size_t msg_len = strlen(my_msg); + size_t cursor_msg = 0; + for (size_t i = sizeof(struct packet_header); i < mc->payload_size; i++) { + mc->payload[i] = my_msg[cursor_msg]; + cursor_msg = (cursor_msg + 1) % msg_len; + } + struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read"); if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read"); if (tgtinfo == NULL) { diff --git a/src/packet.h b/src/packet.h index 6f161e2..a8e78a8 100644 --- a/src/packet.h +++ b/src/packet.h @@ -70,6 +70,7 @@ struct buffer_packet { uint8_t ap_count; uint16_t aread; uint16_t awrite; + struct timespec seen; struct internet_packet ip; }; diff --git a/src/proxy.c b/src/proxy.c index 45c3c9c..afe9772 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -44,7 +44,7 @@ int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int read_res = FDS_READY; if (ctx->verbose > 1) fprintf(stderr, " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later\n"); - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n"); while (bp->mode == BP_READING) { @@ -69,7 +69,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { char url[255]; // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Read packet from socket bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); @@ -77,7 +77,10 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; - // 3. Apply logic + // 3. Notify helpers + traffic_capture_notify (&app_ctx->cap, bp, "in"); + + // 4. Apply logic return app_ctx->desc->on_datagram(ctx, fdinfo, bp); co_error: @@ -98,7 +101,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Write data from the buffer to the socket while (bp->mode == BP_WRITING) { @@ -109,8 +112,8 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) // 3. A whole packet has been written // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); + mv_buffer_wtof(&app_ctx->br, fdinfo); + notify_read(ctx, &app_ctx->br); return 0; co_error: @@ -125,17 +128,20 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) int write_res = FDS_READY; // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; + if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Write buffer write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_AGAIN) return 1; - // 3. A whole packet has been written + // 3. Notify helpers + traffic_capture_notify (&app_ctx->cap, bp, "out"); + + // 4. A whole packet has been written // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); + mv_buffer_wtof(&app_ctx->br, fdinfo); + notify_read(ctx, &app_ctx->br); return 0; co_error: @@ -148,41 +154,43 @@ int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; // 1. If has a "used" buffer, remove it - mv_buffer_rtof (app_ctx, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); // 2. If appears in the write waiting queue, remove it - while (get_write_buffer (app_ctx, fdinfo) != NULL) { - mv_buffer_wtof(app_ctx, fdinfo); + while (get_write_buffer (&app_ctx->br, fdinfo) != NULL) { + mv_buffer_wtof(&app_ctx->br, fdinfo); } // 3. If appears in the read waiting queue, remove it - g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); + g_queue_remove_all (app_ctx->br.read_waiting, &(fdinfo->fd)); return app_ctx->desc->on_err(ctx, fdinfo); } +void algo_main_destroy(void* app_ctx) { + struct algo_ctx* ctx = (struct algo_ctx*) app_ctx; + ctx->ref_count--; + if (ctx->ref_count > 0) return; + + traffic_capture_stop(&ctx->cap); + destroy_buffer_management(&ctx->br); + if (ctx->free_misc) ctx->free_misc(ctx->misc); + free(ctx); +} + void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct algo_ctx)); - ctx->free_buffer = g_queue_new (); - ctx->read_waiting = g_queue_new (); - ctx->application_waiting = g_hash_table_new (NULL, NULL); - ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); - ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); ctx->link_count = 8; ctx->is_rdy = 0; ctx->ap = *ap; - for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { - memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet)); - g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); - } struct evt_core_cat tcp_listen = { .name = "tcp-listen", .flags = EPOLLIN, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_tcp_co, .err_cb = NULL }; @@ -193,7 +201,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "tcp-read", .flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_tcp_read, .err_cb = main_on_err }; @@ -204,7 +212,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "udp-read", .flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_udp_read, .err_cb = main_on_err }; @@ -215,7 +223,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "tcp-write", .flags = EPOLLOUT | EPOLLET | EPOLLRDHUP, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_tcp_write, .err_cb = main_on_err }; @@ -226,13 +234,16 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { .name = "udp-write", .flags = EPOLLOUT | EPOLLET, .app_ctx = ctx, - .free_app_ctx = free_naive, + .free_app_ctx = algo_main_destroy, .cb = main_on_udp_write, .err_cb = main_on_err }; ctx->ref_count++; evt_core_add_cat(evt, &udp_write); + init_buffer_management(&ctx->br); + traffic_capture_init(&ctx->cap, ap->capture_file); + for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { if (strcmp(available_algo[i].name, ap->algo_name) == 0) { ctx->desc = &(available_algo[i]); diff --git a/src/proxy.h b/src/proxy.h index 5d7e082..fc9ab61 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -6,10 +6,45 @@ #include #include "evt_core.h" #include "algo_utils.h" +#include "capture_traffic.h" #include "url.h" #include "utils.h" #include "packet.h" +struct algo_params { + uint8_t is_waiting_bootstrap; + uint8_t is_healing; + char *algo_name, *capture_file; + int links, fresh_data, redundant_data; +}; + +struct algo_ctx; +typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); + +typedef void (*algo_ctx_free_misc)(void*); + +struct algo_desc { + char* name; + algo_init init; + algo_ctx_on_buffer on_stream; + algo_ctx_on_buffer on_datagram; + algo_ctx_on_event on_err; +}; + +struct algo_ctx { + struct algo_desc* desc; + uint8_t link_count; + uint8_t is_rdy; + struct algo_params ap; + int ref_count; + struct capture_ctx cap; + struct buffer_resources br; + void* misc; // Additional structures + algo_ctx_free_misc free_misc; // Fx ptr to free misc +}; + void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); @@ -49,7 +84,6 @@ static struct algo_desc available_algo[] = { } }; - void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap); int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);