diff --git a/CMakeLists.txt b/CMakeLists.txt index d9c9490..e6661e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,8 +25,8 @@ list(APPEND CSOURCES src/url.c src/donar_init.h src/donar_init.c - src/algo_rr.c src/algo_dup2.c + src/algo_thunder.c src/algo_utils.h src/algo_utils.c src/proxy.h diff --git a/scripts/run-2 b/scripts/run-2 deleted file mode 100755 index 9c3f4b3..0000000 --- a/scripts/run-2 +++ /dev/null @@ -1,88 +0,0 @@ -#!/bin/bash -WAITFOR=2280 # 38min - -echo "Create output folder..." -docker run \ - --rm \ - --user root \ - -v `pwd`/out:/home/donar \ - registry.gitlab.inria.fr/qdufour/donar \ - chown -R 1000:1000 /home/donar - -for i in {1..10}; do -echo "Spawning container $i..." -docker run \ - --rm \ - -d \ - --name "donarxp_server_${i}" \ - -e HOME='/tmp' \ - -v `pwd`/out:/home/donar \ - registry.gitlab.inria.fr/qdufour/donar \ - tor -f /etc/torrc - -docker run \ - --rm \ - -d \ - --name "donarxp_client_${i}" \ - -e HOME='/tmp' \ - -v `pwd`/out:/home/donar \ - registry.gitlab.inria.fr/qdufour/donar \ - tor -f /etc/torrc - -done - -sleep 10 - -for j in {1..100}; do -echo "Run xp $j..." -run_fold=`mktemp -up . XXXXXXXXXXXXXXXX` - -echo "Reset containers..." -for i in {1..10}; do - docker exec donarxp_client_${i} sh -c 'killall -9 bash; killall -9 donar; killall -9 measlat; killall -9 udpecho' - docker exec donarxp_server_${i} sh -c 'killall -9 bash; killall -9 donar; killall -9 measlat; killall -9 udpecho' -done - -echo "Launch servers..." -docker exec donarxp_server_2 rrhr-server ${run_fold}-rrhr-2 & -docker exec donarxp_server_3 dup2-server ${run_fold}-dup2-3 & -docker exec donarxp_server_4 rrhr-server ${run_fold}-rrhr-4 & -docker exec donarxp_server_5 dup2-server ${run_fold}-dup2-5 & -docker exec donarxp_server_6 rrhr-server ${run_fold}-rrhr-6 & -docker exec donarxp_server_7 dup2-server ${run_fold}-dup2-7 & -docker exec donarxp_server_8 orig-server ${run_fold}-orig-8 & -docker exec donarxp_server_9 orig-server ${run_fold}-orig-9 & -docker exec donarxp_server_10 orig-server ${run_fold}-orig-10 & - -sleep 10 -echo "Launch measures..." -timeout $WAITFOR bash <cat->app_ctx; - - // Check that we didn't already received the packet + union abstract_packet *ap = (union abstract_packet*) &bp->ip; struct dup2_ctx* dup2c = app_ctx->misc; - if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { + int32_t id = -1, port = -1; + if (ctx->verbose > 1) { + fprintf(stderr, " [algo_dup2] Received a buffer\n"); + dump_buffer_packet(bp); + } + + do { + switch (ap->fmt.headers.cmd) { + case CMD_UDP_METADATA_THUNDER: + id = ap->fmt.content.udp_metadata_thunder.id; + break; + case CMD_UDP_ENCAPSULATED: + port = ap->fmt.content.udp_encapsulated.port; + break; + default: + break; + } + } while ((ap = ap_next(ap)) != NULL); + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Extracted port=%d and id=%d\n", port, id); + + if (port == -1 || id == -1) { + fprintf(stderr, "Missing data port=%d and id=%d...\n", port, id); + exit(EXIT_FAILURE); + } + + // Check that received identifier has not been delivered + if (ring_ge(dup2c->recv_id, id)) { + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packet already delivered, dropping\n"); mv_buffer_rtof(&app_ctx->br, fdinfo); return 0; } - dup2c->recv_id = bp->ip.ap.fmt.content.clear.id; + // Update delivered identifier + dup2c->recv_id = id; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); - mv_buffer_wtof (&app_ctx->br, fdinfo); + mv_buffer_rtof (&app_ctx->br, fdinfo); return 1; } // 2. Move buffer + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Scheduling packet for write\n"); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); + return 0; } @@ -49,11 +79,20 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct dup2_ctx* dup2c = app_ctx->misc; - bp->ip.ap.fmt.content.clear.id = dup2c->emit_id; dup2c->emit_id = dup2c->emit_id + 1; + union abstract_packet metadata = { + .fmt.headers.cmd = CMD_UDP_METADATA_THUNDER, + .fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder), + .fmt.headers.flags = 0, + .fmt.content.udp_metadata_thunder.id = dup2c->emit_id + }; + buffer_append_ap (bp, &metadata); + if (ctx->verbose > 1) { + dump_buffer_packet(bp); + fprintf(stderr, " [algo_dup2] Added metadata\n"); + } struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); - for (int i = 0; i < app_ctx->ap.links; i++) { // 1. A whole packet has been read, we will find someone to write it to_fdinfo = cat->socklist->len > i ? g_array_index(cat->socklist, struct evt_core_fdinfo*, i) : NULL; @@ -66,6 +105,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); } + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packets sent\n"); // 3. Release the buffer mv_buffer_rtof (&app_ctx->br, fdinfo); diff --git a/src/algo_naive.c b/src/algo_naive.c index 6a7bac1..8b7a51e 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -9,9 +9,10 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf char url[256]; struct evt_core_fdinfo *to_fdinfo = NULL; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; - // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); + if (ctx->verbose > 1) fprintf(stderr, " [algo_naive] 1/2 Find destination\n"); + sprintf(url, "udp:write:127.0.0.1:%d", ap->fmt.content.udp_encapsulated.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); @@ -19,7 +20,7 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf return 1; } - // 2. Move buffer + if (ctx->verbose > 1) fprintf(stderr, " [algo_naive] 2/2 Move buffer\n"); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); @@ -35,7 +36,7 @@ int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL; if (to_fdinfo == NULL) { fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name); - mv_buffer_wtof (&app_ctx->br, fdinfo); + mv_buffer_wtof(&app_ctx->br, fdinfo); return 1; } //printf("Pass packet from %s to %s\n", fdinfo->url, url); diff --git a/src/algo_rr.c b/src/algo_rr.c deleted file mode 100644 index 788d3cb..0000000 --- a/src/algo_rr.c +++ /dev/null @@ -1,379 +0,0 @@ -#include -#include "algo_utils.h" -#include "utils.h" -#include "url.h" -#include "proxy.h" -#include "timer.h" - -struct timer_info { - uint16_t health_id; - uint8_t prevlink; - uint16_t min_blocked_pkt; - struct algo_ctx* algo; -}; - -struct queued_pkt { - uint8_t on; - int link_fd; - int idx; - uint16_t id; - struct algo_ctx* algo; -}; - -struct rr_ctx { - uint8_t my_links; - uint8_t remote_links; - uint8_t current_link; - int64_t mjit; - uint16_t health_id; - uint16_t health_id_late; - uint16_t content_id; - uint16_t sent_health_id; - uint16_t sent_content_id; - struct internet_packet prev_packet; - struct timespec emit_time; - struct queued_pkt real[PACKET_BUFFER_SIZE]; - struct timer_info wait[PACKET_BUFFER_SIZE]; -}; - -void show_link_availability(struct rr_ctx* rr) { - printf("Links availability: my_links["); - for (int i = 0; i < 8; i++) { - if (rr->my_links & 1 << i) printf("U"); - else printf("-"); - } - printf("], rem_links["); - for (int i = 0; i < 8; i++) { - if (rr->remote_links & 1 << i) printf("U"); - else printf("-"); - } - printf("]\n"); -} - -void blacklist_link(struct rr_ctx* rr, int sel_link) { - printf("Blacklist link=%d | ", sel_link); - rr->remote_links &= 0xff ^ 1 << sel_link; - show_link_availability (rr); -} - -void on_timeout_health (struct evt_core_ctx* ctx, void* user); - -void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - uint16_t real_idx = bp->ip.ap.fmt.content.clear.id % PACKET_BUFFER_SIZE; - - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - - // 1. We queue the packet to keep it - if (rr->real[real_idx].on && ring_lt(rr->real[real_idx].id, bp->ip.ap.fmt.content.clear.id)) { - fprintf(stderr, "Real array is full for packet_id=%d, idx=%d, last_delivered_content_id=%d BUG: [\n", - bp->ip.ap.fmt.content.clear.id, - real_idx, - rr->content_id); - for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - fprintf(stderr, "\t%d => %d\n", rr->real[i].id, rr->real[i].on); - } - fprintf(stderr, "] - could be replaced by drop\n"); - exit(EXIT_FAILURE); - } else if (!rr->real[real_idx].on && ring_gt(bp->ip.ap.fmt.content.clear.id, rr->content_id)) { - rr->real[real_idx].on = 1; - rr->real[real_idx].id = bp->ip.ap.fmt.content.clear.id; - rr->real[real_idx].idx = real_idx; - rr->real[real_idx].link_fd = fdinfo->fd; - rr->real[real_idx].algo = app_ctx; - mv_buffer_rtoa(&app_ctx->br, fdinfo, &rr->real[real_idx].idx); - } else { - if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); - mv_buffer_rtof (&app_ctx->br, fdinfo); - } -} - -void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queued_pkt* dp) { - struct evt_core_fdinfo *to_fdinfo = NULL; - struct rr_ctx* rr = app_ctx->misc; - char url[255]; - - // 1. Marked the packet as handled - dp->on = 0; - - // 2. Get the buffer - struct buffer_packet* bp = get_app_buffer (&app_ctx->br, &dp->idx); - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - - // 3. We update our cursor - rr->content_id = bp->ip.ap.fmt.content.clear.id; - - // 4. Find its target - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) { - fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx); - //mv_buffer_wtor (app_ctx, fdinfo, bp); - mv_buffer_atof (&app_ctx->br, &dp->idx); - } - - // 5. We move the buffer and notify the target - //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); - mv_buffer_atow (&app_ctx->br, &dp->idx, to_fdinfo); - main_on_udp_write(ctx, to_fdinfo); -} - -void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - - assert(bp->ip.ap.fmt.headers.cmd == CMD_HEALTH); - - // 1. Health packet was received too late, dropping it - if (ring_le(bp->ip.ap.fmt.content.health.id, rr->health_id_late)) goto release; - - // 2. Reactivate link if deactivated - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - if (!(rr->remote_links & (1 << link_num))) { - printf("Activate link=%d | ", link_num); - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working - show_link_availability (rr); - } - - // 3. Update RR structure if its the greatest health_id we received - if (ring_gt(bp->ip.ap.fmt.content.health.id, rr->health_id)) { - // 3.1. Update current health id - rr->health_id = bp->ip.ap.fmt.content.health.id; - - // 3.2. Update my links I can use thanks to target feedback - if (bp->ip.ap.fmt.content.health.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.fmt.content.health.bitfield; - printf("Update my links | "); - show_link_availability (rr); - } - } - - // 4. Set callback to close this health packet window - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.fmt.content.health.deltat; - if (timeout <= 0) timeout = 0; - uint64_t idx = bp->ip.ap.fmt.content.health.id % PACKET_BUFFER_SIZE; - - rr->wait[idx].health_id = bp->ip.ap.fmt.content.health.id; - rr->wait[idx].prevlink = bp->ip.ap.fmt.content.health.prevlink; - rr->wait[idx].min_blocked_pkt = bp->ip.ap.fmt.content.health.min_blocked_pkt; - rr->wait[idx].algo = app_ctx; - - set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); - -release: - mv_buffer_rtof(&app_ctx->br, fdinfo); -} - -uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { - struct rr_ctx* rr = app_ctx->misc; - struct evt_core_fdinfo* fdinfo = NULL; - struct buffer_packet* bp = NULL; - uint64_t delivered = 0; - - while(1) { - //printf("Trying to deliver %d\n", rr->recv_id+1); - struct queued_pkt* def = &rr->real[(rr->content_id+1) % PACKET_BUFFER_SIZE]; - if (!def->on) break; - rr_deliver(ctx, app_ctx, def); - delivered++; - //printf("Delivered %d\n", rr->recv_id); - } - return delivered; -} - -//------ - -int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - - if (bp->ip.ap.fmt.headers.cmd == CMD_CLEAR) { - if (ctx->verbose > 1) fprintf(stderr, " [algo/rr] Received a CLEAR packet of size %d on URL %s\n", bp->ip.ap.fmt.headers.size, fdinfo->url); - // 1. Register packet in our queue - rr_pkt_register(ctx, fdinfo, bp); - - // 2. Process queue - rr_pkt_unroll (ctx, app_ctx); - } else if (bp->ip.ap.fmt.headers.cmd == CMD_HEALTH) { - if (ctx->verbose > 1) fprintf(stderr, " [algo/rr] Received a HEALTH packet of size %d on URL %s\n", bp->ip.ap.fmt.headers.size, fdinfo->url); - rr_pkt_manage_links(ctx, fdinfo, bp); - } else { - fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd); - mv_buffer_rtof(&app_ctx->br, fdinfo); - } - - return 0; -} - -int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - struct evt_core_fdinfo *to_fdinfo = NULL; - uint16_t min_pkt; - guint len; - char url[255]; - size_t health_packet_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); - size_t max_size = sizeof(struct internet_packet) - health_packet_size; - - if (ctx->verbose > 1) fprintf(stderr, " [algo/rr] Read a UDP packet on URL %s\n", fdinfo->url); - - // 1. Prepare RR state and packet values - struct timespec curr; - int secs, nsecs; - uint64_t mili_sec; - - if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ - perror("clock_gettime error"); - exit(EXIT_FAILURE); - } - - // 2. Compute delta t - secs = curr.tv_sec - rr->emit_time.tv_sec; - nsecs = curr.tv_nsec - rr->emit_time.tv_nsec; - rr->emit_time = curr; - mili_sec = secs * 1000 + nsecs / 1000000; - if (mili_sec > rr->mjit) mili_sec = rr->mjit; - - // 3. Prepare fresh packet - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - bp->ip.ap.fmt.content.clear.id = rr->sent_content_id; - min_pkt = rr->sent_content_id; - rr->sent_content_id++; - if (bp->ip.ap.fmt.headers.size > max_size) { - fprintf(stderr, "Packet is too big to be relayed. Oops...\n"); - exit(EXIT_FAILURE); - } - - // 4. Append redundancy if needed - if (app_ctx->ap.redundant_data == 1) { - size_t current_size = get_full_size (bp); - size_t final_size = current_size + rr->prev_packet.ap.fmt.headers.size; - if (final_size <= max_size) { - min_pkt = rr->prev_packet.ap.fmt.content.clear.id; - append_buffer(&bp->ip.ap, bp->ap_count, &rr->prev_packet.ap); // We append previous packet - bp->ap_count++; - } else if (ctx->verbose) { - fprintf(stderr, " [algo/rr] Can't append redundancy (current=%ld, after=%ld, max=%ld)\n", current_size, final_size, max_size); - } - - append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time - } - - // 5. Append health packet - struct buffer_packet hp; - hp.ip.ap.fmt.headers.cmd = CMD_HEALTH; - hp.ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); - hp.ip.ap.fmt.content.health.id = rr->sent_health_id; - hp.ip.ap.fmt.content.health.deltat = mili_sec; - hp.ip.ap.fmt.content.health.prevlink = rr->current_link; - hp.ip.ap.fmt.content.health.bitfield = rr->remote_links; - hp.ip.ap.fmt.content.health.min_blocked_pkt = min_pkt; - rr->sent_health_id++; - append_buffer(&bp->ip.ap, bp->ap_count, &hp.ip.ap); - bp->ap_count++; - - // 6. Try to find someone to send it - int max = 16; - uint8_t sel_link = rr->current_link; - while(max-- >= 0) { - if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Still bootstrapping - sel_link = (sel_link + 1) % app_ctx->ap.links; - sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) { - if (ctx->verbose) fprintf(stderr, " [algo/rr] write fd %s has not been found, skipping\n", url); - continue; - } - - if ((len = write_queue_len (&app_ctx->br, to_fdinfo)) > 0) { - if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); - blacklist_link (rr, sel_link); - continue; - } - - if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { - rr->current_link = sel_link; - mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); - main_on_tcp_write(ctx, to_fdinfo); - return 0; - } else { - struct buffer_packet* dup_bp = dup_buffer_tow(&app_ctx->br, bp, to_fdinfo); - /* - * for later - dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; - dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor... - */ - main_on_tcp_write(ctx, to_fdinfo); - } - } - -not_ready: - // 3. We find no up target - fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); - mv_buffer_wtof (&app_ctx->br, fdinfo); - return 0; -} - -void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { - struct timer_info* t = raw; - struct algo_ctx* app_ctx = t->algo; - struct rr_ctx* rr = app_ctx->misc; - - // 1. Update link recovery window if needed - if (ring_gt(t->health_id, rr->health_id_late)) rr->health_id_late = t->health_id; - - // 2. Blacklist previous link if needed - uint16_t prev_health_id = (t->health_id - 1); - uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; - struct timer_info* t_old = &rr->wait[prev_health_idx]; - if (t_old->health_id != prev_health_id) blacklist_link (rr, t->prevlink); - - // 3. Deliver blocked packets - // @FIXME CRAPPY CODE / CRAPPY LOGIC - //printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); - if (ring_gt(t->min_blocked_pkt, rr->content_id) && !rr->real[t->min_blocked_pkt % PACKET_BUFFER_SIZE].on) { - fprintf(stderr, "min_blocked_packet has not been received, t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); - exit(EXIT_FAILURE); - } - while (ring_gt(t->min_blocked_pkt, rr->content_id - 1)) { - rr->content_id++; - rr_pkt_unroll (ctx, app_ctx); - } - rr_pkt_unroll (ctx, app_ctx); -} - -int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - if (strstr(fdinfo->cat->name, "udp") != NULL) return 1; - return 0; -} - -void algo_rr_free(void* v) { - struct rr_ctx* rr = v; - free(rr); -} - -void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { - struct rr_ctx* rr = malloc(sizeof(struct rr_ctx)); - if (rr == NULL) { - perror("malloc failed for rr_init."); - exit(EXIT_FAILURE); - } - memset(rr, 0, sizeof(struct rr_ctx)); - rr->mjit = 200; - rr->my_links = 0xff; - rr->remote_links = 0xff; - rr->sent_health_id = 1; - rr->sent_content_id = 1; - rr->health_id = 0; - rr->health_id_late = 0; - rr->content_id = 0; - rr->current_link = 0; - app_ctx->misc = rr; - app_ctx->free_misc = algo_rr_free; - - init_timer(ctx); -} - diff --git a/src/algo_thunder.c b/src/algo_thunder.c new file mode 100644 index 0000000..3b30208 --- /dev/null +++ b/src/algo_thunder.c @@ -0,0 +1,387 @@ +#include +#include "algo_utils.h" +#include "utils.h" +#include "url.h" +#include "proxy.h" +#include "timer.h" + +// A Tor cell size is 512 bytes but handle only 498 bytes of data +#define TOR_CELL_SIZE 498 +#define ALLOWED_JITTER_MS 100 +#define MAX_LINKS 64 + +struct thunder_ctx { + uint16_t recv_id; + uint16_t emit_id; + uint8_t selected_link; + uint8_t total_links; + uint64_t delta_t_per_link[MAX_LINKS]; + uint64_t rcv_delta_t_per_link[MAX_LINKS]; + uint64_t received_pkts_on_link[MAX_LINKS]; + uint64_t blacklisted[MAX_LINKS]; + size_t monit_pkt_size; + struct timespec prev_link_time, prev_rcv_link_time; +}; + +uint64_t compute_delta(struct timespec* prev_time, uint64_t max) { + struct timespec curr; + int secs, nsecs; + uint64_t mili_sec; + + // 1. We compute the time difference + if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ + perror("clock_gettime error"); + exit(EXIT_FAILURE); + } + secs = curr.tv_sec - prev_time->tv_sec; + nsecs = curr.tv_nsec - prev_time->tv_nsec; + *prev_time = curr; + mili_sec = secs * 1000 + nsecs / 1000000; + if (mili_sec > max) mili_sec = max; + + return mili_sec; +} + +int is_blacklisted(struct thunder_ctx* thunderc, int link_id) { + return thunderc->blacklisted[link_id] >= thunderc->received_pkts_on_link[link_id]; +} + +void prepare(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + + thunderc->emit_id++; + union abstract_packet metadata = { + .fmt.headers.cmd = CMD_UDP_METADATA_THUNDER, + .fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder), + .fmt.headers.flags = 0, + .fmt.content.udp_metadata_thunder.id = thunderc->emit_id, + }; + buffer_append_ap (bp, &metadata); + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] UDP metadata added\n"); +} + +void pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + uint64_t ref = 0l + thunderc->emit_id; + + dup_buffer_toa (&app_ctx->br, bp, (void *)ref); + + // 1. Clean old buffers (we keep only thunderc->total_links buffer, keeping more would be useless) + if (ref > thunderc->total_links && get_app_buffer (&app_ctx->br, (void *)(ref - thunderc->total_links))) { + mv_buffer_atof (&app_ctx->br, (void *)(ref - thunderc->total_links)); + } + + // 2. Append abstract packets stored in our buffers + uint64_t add_ref = ref; + while(1) { + if (add_ref < 1) break; + add_ref--; + struct buffer_packet *bp_iter = get_app_buffer (&app_ctx->br, (void *)add_ref); + if (bp_iter == NULL) break; + union abstract_packet *ap = buffer_first_ap (bp_iter); + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) { + fprintf(stderr, "Invalid buffer payload!\n"); + exit(EXIT_FAILURE); + } + union abstract_packet *ap_meta = ap_next (ap); + if (ap_meta->fmt.headers.cmd != CMD_UDP_METADATA_THUNDER) { + fprintf(stderr, "Invalid buffer metadata!\n"); + exit(EXIT_FAILURE); + } + + if (buffer_full_size (bp) + ap->fmt.headers.size + ap_meta->fmt.headers.size > TOR_CELL_SIZE - thunderc->monit_pkt_size) break; + + buffer_append_ap (bp, ap); + buffer_append_ap (bp, ap_meta); + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Pad packet (now %ld bytes)\n", buffer_full_size (bp)); + } +} + +int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + char url[256]; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + struct evt_core_fdinfo *to_fdinfo = NULL; + struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); + + do { + // 1. We choose the link + if (cat->socklist->len == 0) { + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] No link available, packet will be dropped\n"); + break; + } + + to_fdinfo = NULL; + do { + thunderc->selected_link = (thunderc->selected_link + 1) % thunderc->total_links; + sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + thunderc->selected_link); + to_fdinfo = evt_core_get_from_url (ctx, url); + } while (to_fdinfo == NULL); + //printf("URL %s has been retained\n", url); + + // 2. We create the packet template + union abstract_packet links = { + .fmt.headers.cmd = CMD_LINK_MONITORING_THUNDER, + .fmt.headers.size = thunderc->monit_pkt_size, + .fmt.headers.flags = 0, + .fmt.content.link_monitoring_thunder.links_status = {} + }; + + // 3. We append the template to the buffer + struct buffer_packet* bp_dup = dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); + union abstract_packet *new_ap = buffer_append_ap (bp_dup, &links); + + // 4. We compute the time difference + uint64_t mili_sec = compute_delta (&thunderc->prev_link_time, UINT16_MAX); + + // 5. We create the array + struct link_info *li = &new_ap->fmt.content.link_monitoring_thunder.links_status; + for (int i = 0; i < thunderc->total_links; i++) { + thunderc->delta_t_per_link[i] += mili_sec; + li[i].delta_t = thunderc->delta_t_per_link[i] > UINT16_MAX ? UINT16_MAX : thunderc->delta_t_per_link[i]; + } + thunderc->delta_t_per_link[thunderc->selected_link] = 0; + li[thunderc->selected_link].delta_t = 0; + + if (ctx->verbose > 1) { + dump_buffer_packet(bp_dup); + fprintf(stderr, " [algo_thunder] Will send this info\n"); + } + main_on_tcp_write(ctx, to_fdinfo); + + } while (is_blacklisted (thunderc, thunderc->selected_link)); + + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Packets sent\n"); + + // Release the buffer + mv_buffer_rtof (&app_ctx->br, fdinfo); + + return 0; +} + +struct block_info { uint8_t i; struct algo_ctx* app_ctx; uint64_t missing;}; + +void on_block (struct evt_core_ctx* ctx, void* raw) { + struct block_info* bi = raw; + struct thunder_ctx* thunderc = bi->app_ctx->misc; + + if (thunderc->received_pkts_on_link[bi->i] >= bi->missing) goto release; + if (thunderc->blacklisted[bi->i] >= bi->missing) goto release; + + //printf("[algo_thunder] Blacklisting link %d\n", bi->i); + thunderc->blacklisted[bi->i] = bi->missing; + +release: + free(bi); +} + +int is_in_order(struct thunder_ctx* thunderc, uint8_t link_id) { + uint64_t ref = thunderc->received_pkts_on_link[link_id]; + for (int i = 0; i < thunderc->total_links; i++) { + uint64_t expected = link_id >= i ? ref : ref - 1; + if (thunderc->received_pkts_on_link[i] > expected) { + //printf("link_id=%d, i=%d, pkt_i=%ld, pkt_i_expected=%ld, pkt_link_id=%ld\n", link_id, i, thunderc->received_pkts_on_link[i], expected, ref); + return 0; + } + } + + return 1; +} + +void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + + union abstract_packet* ap = buffer_first_ap (bp); + while (ap != NULL && ap->fmt.headers.cmd != CMD_LINK_MONITORING_THUNDER) ap = ap_next(ap); + if (ap == NULL) { + fprintf(stderr, "Unable to find our packet\n"); + exit(EXIT_FAILURE); + } + + /* + if (ap->fmt.headers.flags & FLAG_RESET) { + for (int i = 0; i < MAX_LINKS; i++) thunderc->received_pkts_on_link[i] = 1; + } + */ + + // 1. Update link info + int link_id = url_get_port_int(fdinfo->url) - 7500; + thunderc->received_pkts_on_link[link_id]++; + //printf("Received %ld packets on link %d\n", thunderc->received_pkts_on_link[link_id], link_id); + struct link_info *li = &ap->fmt.content.link_monitoring_thunder.links_status; + + uint64_t mili_sec = compute_delta (&thunderc->prev_rcv_link_time, UINT16_MAX); + for (int i = 0; i < thunderc->total_links; i++) { + thunderc->rcv_delta_t_per_link[i] += mili_sec; + } + thunderc->rcv_delta_t_per_link[link_id] = 0; + + // 2. Disable links that have received packets too late + if (is_in_order (thunderc, link_id)) { + /*printf("Local: "); + for (int i = 0; i < thunderc->total_links; i++) { + printf("%ld ", thunderc->rcv_delta_t_per_link[i]); + } + printf("\n"); + printf("Packet: "); + for (int i = 0; i < thunderc->total_links; i++) { + printf("%d ", li[i].delta_t); + } + printf("\n");*/ + + for (int i = 0; i < thunderc->total_links; i++) { + if (ALLOWED_JITTER_MS >= li[i].delta_t) continue; + if (li[i].delta_t - ALLOWED_JITTER_MS <= thunderc->rcv_delta_t_per_link[i]) continue; + + struct block_info *bi = malloc(sizeof(struct block_info)); + bi->i = i; bi->app_ctx = app_ctx; bi->missing = thunderc->received_pkts_on_link[i]+1; + + //printf(" Packet Too Late - Blocked link %d (expected: at least %dms ago, received: %ldms ago)\n", i, li[i].delta_t - ALLOWED_JITTER_MS, thunderc->rcv_delta_t_per_link[i]); + on_block(ctx, bi); + } + } + + // 3. Disable links that miss packets + for (uint8_t i = 0; i < thunderc->total_links; i++) { + uint64_t expected = i <= link_id ? thunderc->received_pkts_on_link[link_id] : thunderc->received_pkts_on_link[link_id] - 1; + if (thunderc->received_pkts_on_link[i] >= expected) continue; // Nothing to do, all packets have been received + + int64_t timeout = ALLOWED_JITTER_MS - li[i].delta_t; + + struct block_info *bi = malloc(sizeof(struct block_info)); + bi->i = i; bi->app_ctx = app_ctx; bi->missing = expected; + + if (timeout <= 0) { + on_block(ctx, bi); + //printf(" Missing Packet - Blocked link %d (expected: %ld, seen: %ld)\n", i, expected, thunderc->received_pkts_on_link[i]); + continue; + } + + set_timeout (ctx, timeout, bi, on_block); + //printf(" Missing Packet - Triggered timeout for link %d in %ldms (expected: %ld, seen: %ld)\n", i, timeout, expected, thunderc->received_pkts_on_link[i]); + if (ctx->verbose > 1) { + fprintf(stderr, " [algo_thunder] Set timeout on link %d of %ld ms (packets expected: %ld, seen: %ld)\n", + i, timeout, expected, thunderc->received_pkts_on_link[i]); + } + } + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Classify done\n"); + + printf("Blacklisted links: "); + for (int i = 0; i < thunderc->total_links; i++) { + if (is_blacklisted (thunderc, i)) printf("_"); + else printf("U"); + } + printf("\n"); +} + +struct unpad_info { + union abstract_packet *ap_arr_pl[MAX_LINKS], *ap_arr_meta[MAX_LINKS]; + uint8_t ap_arr_vals; +}; + +void unpad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct unpad_info *ui) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + + for (union abstract_packet* ap = buffer_first_ap (bp); ap != NULL; ap = ap_next(ap)) { + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) continue; + + union abstract_packet* ap_meta = ap_next(ap); + if (ap_meta == NULL || ap_meta->fmt.headers.cmd != CMD_UDP_METADATA_THUNDER) { + fprintf(stderr, "Unexpected packet, expecting udp metadata\n"); + } + + if (ap_meta->fmt.content.udp_metadata_thunder.id > thunderc->recv_id) { + ui->ap_arr_pl[ui->ap_arr_vals] = ap; + ui->ap_arr_meta[ui->ap_arr_vals] = ap_meta; + ui->ap_arr_vals++; + } + } + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Unpad done\n"); +} + +void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct unpad_info *ui) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + char url[256]; + struct evt_core_fdinfo *to_fdinfo = NULL; + uint64_t delivered = 0; + + for (int i = ui->ap_arr_vals-1; i >= 0; i--) { + //fprintf(stderr, "i=%d, ui->ap_arr_vals=%d\n", i, ui->ap_arr_vals); + if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue; + thunderc->recv_id = ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id; + + // Find destination + sprintf(url, "udp:write:127.0.0.1:%d", ui->ap_arr_pl[i]->fmt.content.udp_encapsulated.port); + to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) { + fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); + } + + struct buffer_packet *bp_dest = inject_buffer_tow (&app_ctx->br, to_fdinfo); + bp_dest->mode = BP_WRITING; + //dump_buffer_packet (bp_dest); + buffer_append_ap (bp_dest, ui->ap_arr_pl[i]); + main_on_udp_write(ctx, to_fdinfo); + delivered++; + } + + if (delivered != 1) { + //printf("[algo_thunder] Delivered %ld packets (now id=%d)\n", delivered, thunderc->recv_id); + } + + mv_buffer_rtof (&app_ctx->br, fdinfo); + if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Adapt done\n"); +} + +int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct unpad_info ui = {0}; + + classify(ctx, fdinfo, bp); + unpad(ctx, fdinfo, bp, &ui); + adapt(ctx, fdinfo, bp, &ui); + return 0; +} + +int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + prepare(ctx, fdinfo, bp); + pad(ctx, fdinfo, bp); + schedule(ctx, fdinfo, bp); + return 0; +} + +void algo_thunder_free(void* v) { + struct rr_ctx* rr = v; + free(rr); +} + +void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { + app_ctx->misc = malloc(sizeof(struct thunder_ctx)); + app_ctx->free_misc = algo_thunder_free; + if (app_ctx->misc == NULL) { + perror("malloc failed in algo thunder init"); + exit(EXIT_FAILURE); + } + memset(app_ctx->misc, 0, sizeof(struct thunder_ctx)); + struct thunder_ctx* thunderc = app_ctx->misc; + thunderc->recv_id = 1; + thunderc->emit_id = 1; + thunderc->total_links = app_ctx->ap.links; + thunderc->selected_link = thunderc->total_links - 1; + for (int i = 0; i < MAX_LINKS; i++) thunderc->received_pkts_on_link[i] = 1; + + union abstract_packet links = {}; + //fprintf(stderr, "Total links %d\n", thunderc->total_links); + thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1); + + init_timer(ctx); +} + +int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { + if (strstr(fdinfo->cat->name, "udp") != NULL) return 1; + return 0; +} diff --git a/src/algo_utils.c b/src/algo_utils.c index be059b0..68f862c 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -14,6 +14,11 @@ void naive_free_simple(void* v) { g_queue_free (g); } +void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) { + memset(bp, 0, sizeof(struct buffer_packet)); + g_queue_push_tail (app_ctx->free_buffer, bp); +} + void debug_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); int waiting_count = 0; @@ -34,8 +39,7 @@ void init_buffer_management(struct buffer_resources* ctx) { ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { - memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet)); - g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); + __push_to_free (ctx, &(ctx->bps[i])); } } @@ -73,11 +77,6 @@ struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct e return bp; } -void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) { - memset(bp, 0, sizeof(struct buffer_packet)); - g_queue_push_tail (app_ctx->free_buffer, bp); -} - guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { GQueue* q; @@ -104,8 +103,6 @@ struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct // 3. Update state g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); - bp->mode = BP_WRITING; - bp->awrite = 0; return bp; } @@ -210,7 +207,39 @@ void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) { __push_to_free (app_ctx, bp); } +struct buffer_packet* inject_buffer_tow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* to) { + GQueue* q; + + // 1. We get a free buffer + struct buffer_packet* bp_dest = g_queue_pop_head(app_ctx->free_buffer); + if (bp_dest == NULL) { + debug_buffer(app_ctx, to); + return NULL; + } + + // 2. We get the target writing queue + q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); + if (q == NULL) { + q = g_queue_new (); + g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); + } + + // 3. We push the content to the appropriate destination + g_queue_push_tail(q, bp_dest); + return bp_dest; +} + struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { + // 1. Inject a new buffer + struct buffer_packet* bp_dest = inject_buffer_tow (app_ctx, to); + + // 2. We duplicate the data + memcpy(bp_dest, bp, sizeof(struct buffer_packet)); + + return bp_dest; +} + +struct buffer_packet* dup_buffer_toa(struct buffer_resources *app_ctx, struct buffer_packet* bp, void* to) { GQueue* q; // 1. We get a free buffer @@ -223,15 +252,12 @@ struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct bu // 2. We duplicate the data memcpy(bp_dest, bp, sizeof(struct buffer_packet)); - // 3. We get the target writing queue - q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); - if (q == NULL) { - q = g_queue_new (); - g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); + // 3. We put the data + if (g_hash_table_contains(app_ctx->application_waiting, to)) { + fprintf(stderr, "Data already exists for this entry\n"); + exit(EXIT_FAILURE); } - - // 4. We push the content to the appropriate destination - g_queue_push_tail(q, bp_dest); + g_hash_table_insert(app_ctx->application_waiting, to, bp_dest); return bp_dest; } @@ -256,12 +282,3 @@ void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx) { } } } - -int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { - char* target = &(dest->raw); - while (pos-- > 0) { - target += ((union abstract_packet*) target)->fmt.headers.size; - } - memcpy(target, src, src->fmt.headers.size); - return 0; -} diff --git a/src/algo_utils.h b/src/algo_utils.h index f2845d1..1a78950 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -23,10 +23,11 @@ void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* fr void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct buffer_resources* app_ctx, void* from); -struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); -guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); -int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); +struct buffer_packet* inject_buffer_tow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* to); +struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +struct buffer_packet* dup_buffer_toa(struct buffer_resources* app_ctx, struct buffer_packet* bp, void* to); +guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/capdiff.c b/src/capdiff.c index 96f73d8..143fc90 100644 --- a/src/capdiff.c +++ b/src/capdiff.c @@ -9,11 +9,12 @@ #define MAX_PKTS_TO_CHECK_FOR_DROP 10 uint8_t are_packets_equal(struct buffer_packet bpread[]) { - size_t s1 = bpread[0].ip.ap.fmt.headers.size, s2 = bpread[1].ip.ap.fmt.headers.size; + union abstract_packet *ap1 = (union abstract_packet*)&bpread[0].ip, *ap2 = (union abstract_packet*) bpread[1].ip; + size_t s1 = ap1->fmt.headers.size, s2 = ap2->fmt.headers.size; if (s1 != s2) return 0; - for (size_t idx = sizeof(bpread[0].ip.ap.fmt.headers) + sizeof(bpread[0].ip.ap.fmt.content.clear) - sizeof(char); idx < s1; idx++) { - char e1 = (&bpread[0].ip.ap.raw)[idx], e2 = (&bpread[1].ip.ap.raw)[idx]; + for (size_t idx = &ap1->fmt.content.udp_encapsulated.payload - (char*)ap1; idx < s1; idx++) { + char e1 = (&ap1->raw)[idx], e2 = (&ap2->raw)[idx]; if (e1 != e2) return 0; } @@ -41,7 +42,10 @@ void destroy_pkt_stats(gpointer data) { } void update_stats(struct buffer_packet *bp, GHashTable* stat_elem) { - gint port = bp->ip.ap.fmt.content.clear.port; + union abstract_packet *ap = (union abstract_packet*)&bp->ip; + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) return; + + gint port = ap->fmt.content.udp_encapsulated.port; struct pkt_stats *ps = g_hash_table_lookup(stat_elem, &port); if (ps == NULL) { ps = malloc(sizeof(struct pkt_stats)); @@ -57,7 +61,7 @@ void update_stats(struct buffer_packet *bp, GHashTable* stat_elem) { } ps->last = bp->seen; ps->count++; - ps->cumulated_size += bp->ip.ap.fmt.headers.size; + ps->cumulated_size += ap->fmt.headers.size; } void unroll_packets(struct cap_file cf[], struct buffer_packet bpread[], GHashTable* stats[], struct pkt_reconstruct *pr, int m, int i) { diff --git a/src/capreplay.c b/src/capreplay.c index d84873f..dbf5537 100644 --- a/src/capreplay.c +++ b/src/capreplay.c @@ -11,7 +11,9 @@ void get_ports(struct cap_file *cf) { size_t entry_count = cap_count_bp (cf); for (int c = 0; c < entry_count; c++) { cap_next_bp (cf, &bp); - int a = bp.ip.ap.fmt.content.clear.port; + union abstract_packet* ap = (union abstract_packet*) &bp.ip; + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) continue; + int a = ap->fmt.content.udp_encapsulated.port; } cap_begin(cf); } diff --git a/src/donar_init.c b/src/donar_init.c index 081a8fa..0975f47 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -28,7 +28,7 @@ int on_signal(struct evt_core_ctx* evts, struct evt_core_fdinfo* fdinfo) { } void signal_init(struct evt_core_ctx* evts) { - sigset_t mask; + sigset_t mask = {0}; struct evt_core_cat signal_read = { .name = "signal-read", diff --git a/src/packet.c b/src/packet.c index 8af43f7..c447f3d 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,48 +1,117 @@ #include "packet.h" -size_t get_full_size(struct buffer_packet* bp) { - union abstract_packet* ap = &bp->ip.ap; - for (int i = 0; i < bp->ap_count; i++) { - ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); - } - return &ap->raw - &bp->ip.ap.raw; +int ap_exists(union abstract_packet* ap) { + return ap->fmt.headers.cmd != 0; } -enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { - ssize_t nread; - size_t pkt_size_size = sizeof(bp->ip.ap.fmt.headers.size); +int buffer_has_ap(struct buffer_packet* bp) { + return ap_exists(buffer_first_ap (bp)); +} + +union abstract_packet* ap_next(union abstract_packet* ap) { + if (ap_exists (ap) && ap->fmt.headers.flags & FLAG_READ_NEXT) + return (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); + + return NULL; +} + +union abstract_packet* buffer_first_ap(struct buffer_packet* bp) { + return (union abstract_packet*) &bp->ip; +} + +union abstract_packet* buffer_last_ap(struct buffer_packet* bp) { + union abstract_packet* ap = buffer_first_ap (bp), *apn = NULL; + while ((apn = ap_next(ap)) != NULL) ap = apn; + + return ap; +} + +union abstract_packet* buffer_free_ap(struct buffer_packet* bp) { + union abstract_packet* ap = buffer_last_ap (bp); + ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); + + return ap; +} + +size_t buffer_count_ap(struct buffer_packet* bp) { + size_t s = 1; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + while ((ap = ap_next(ap)) != NULL) s++; + return s; +} + +size_t buffer_full_size(struct buffer_packet* bp) { + return &(buffer_free_ap (bp))->raw - &bp->ip[0]; +} + +union abstract_packet* buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap) { + if (buffer_has_ap (bp)) + buffer_last_ap(bp)->fmt.headers.flags |= FLAG_READ_NEXT; + + union abstract_packet *new_ap = buffer_last_ap(bp); + memcpy(new_ap, ap, ap->fmt.headers.size); + bp->ap_count++; + new_ap->fmt.headers.flags &= ~FLAG_READ_NEXT; + return new_ap; +} + +enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + ssize_t nread = 0, ap_aread = 0, cur_ap_aread = 0; + union abstract_packet* ap = buffer_first_ap (bp); + size_t pkt_size_size = sizeof(ap->fmt.headers.size); if (bp->mode != BP_READING) return FDS_ERR; - while (bp->aread < pkt_size_size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, pkt_size_size - bp->aread); - if (nread == 0) return FDS_AGAIN; - if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; - if (nread == -1) return FDS_ERR; - bp->aread += nread; - } + //fprintf(stderr, "Entering read_packet_from_tcp\n"); + do { - while (bp->aread < bp->ip.ap.fmt.headers.size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.fmt.headers.size - bp->aread); - if (nread == 0) return FDS_AGAIN; - if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; - if (nread == -1) return FDS_ERR; - bp->aread += nread; - } + //fprintf(stderr, "bp->ap_count=%d\n", bp->ap_count); + ap = buffer_first_ap (bp); + ap_aread = 0; + for (int i = 0; i < bp->ap_count; i++) { + ap_aread += ap->fmt.headers.size; + ap = ap_next (ap); + } + cur_ap_aread = bp->aread - ap_aread; + + //fprintf(stderr, "[size] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread); + while (cur_ap_aread < pkt_size_size) { + nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, pkt_size_size - cur_ap_aread); + if (nread == 0) return FDS_AGAIN; + if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; + if (nread == -1) return FDS_ERR; + bp->aread += nread; + cur_ap_aread += nread; + } + + //fprintf(stderr, "[content] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread); + while (cur_ap_aread < ap->fmt.headers.size) { + nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, ap->fmt.headers.size - cur_ap_aread); + if (nread == 0) return FDS_AGAIN; + if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; + if (nread == -1) return FDS_ERR; + bp->aread += nread; + cur_ap_aread += nread; + } + + bp->ap_count++; + //fprintf(stderr, "bp->ap_count=%d, buffer_count_ap(bp)=%ld\n", bp->ap_count, buffer_count_ap (bp)); + //dump_buffer_packet (bp); + } while (bp->ap_count != buffer_count_ap (bp)); bp->mode = BP_WRITING; bp->awrite = 0; - bp->ap_count = 1; return FDS_READY; } -enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { +enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { ssize_t nwrite; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; //dump_buffer_packet (bp); if (bp->mode != BP_WRITING) return FDS_ERR; - while (bp->awrite < get_full_size(bp)) { - nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); + while (bp->awrite < buffer_full_size(bp)) { + nwrite = send(fdinfo->fd, &(ap->raw) + bp->awrite, buffer_full_size(bp) - bp->awrite, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; @@ -54,30 +123,36 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { return FDS_READY; } -enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_target* udp_t) { +enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nwrite; - size_t bytes_to_send; - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); - struct sockaddr* addr = NULL; - socklen_t addrlen = 0; - if (udp_t->set) { - addr = (struct sockaddr*) &udp_t->addr; - addrlen = sizeof(struct sockaddr_in); - } - + union abstract_packet* ap = (union abstract_packet*) (&bp->ip + bp->awrite); if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = bp->ip.ap.fmt.headers.size - pkt_header_size; - nwrite = sendto(fd, - &(bp->ip.ap.fmt.content.clear.payload), + do { + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) continue; + + size_t bytes_to_send; + size_t pkt_header_size = sizeof(ap->fmt.headers) + sizeof(ap->fmt.content.udp_encapsulated) - sizeof(ap->fmt.content.udp_encapsulated.payload); + struct sockaddr* addr = NULL; + socklen_t addrlen = 0; + if (udp_t->set) { + addr = (struct sockaddr*) &udp_t->addr; + addrlen = sizeof(struct sockaddr_in); + } + + bytes_to_send = ap->fmt.headers.size - pkt_header_size; + nwrite = sendto(fdinfo->fd, + &(ap->fmt.content.udp_encapsulated.payload), bytes_to_send, 0, addr, addrlen); - if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; - if (nwrite != bytes_to_send) return FDS_ERR; + if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; + if (nwrite != bytes_to_send) return FDS_ERR; + bp->awrite += nwrite; + + } while((ap = ap_next(ap)) != NULL); bp->mode = BP_READING; bp->aread = 0; @@ -86,19 +161,21 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t return FDS_READY; } -enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp_target* udp_t) { +enum FD_STATE read_packet_from_udp (struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nread; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + if (bp->mode != BP_READING) { fprintf(stderr, "Buffer packet is not in reading mode (mode: %d)\n", bp->mode); return FDS_ERR; } - size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); // We remove the payload - size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; + size_t pkt_header_size = sizeof(ap->fmt.headers) + sizeof(ap->fmt.content.udp_encapsulated) - sizeof(ap->fmt.content.udp_encapsulated.payload); + size_t udp_packet_size = sizeof(bp->ip) - pkt_header_size; socklen_t addrlen = sizeof(struct sockaddr_in); - nread = recvfrom(fd, - &(bp->ip.ap.fmt.content.clear.payload), + nread = recvfrom(fdinfo->fd, + &(ap->fmt.content.udp_encapsulated.payload), udp_packet_size, MSG_TRUNC, (struct sockaddr*)&udp_t->addr, @@ -117,8 +194,9 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp udp_t->set = 1; udp_t->addrlen = addrlen; - bp->ip.ap.fmt.headers.size = nread + pkt_header_size; - bp->ip.ap.fmt.headers.cmd = CMD_CLEAR; + ap->fmt.headers.size = nread + pkt_header_size; + ap->fmt.headers.cmd = CMD_UDP_ENCAPSULATED; + ap->fmt.content.udp_encapsulated.port = url_get_port_int (fdinfo->url); bp->mode = BP_WRITING; bp->awrite = 0; @@ -129,11 +207,9 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp void dump_buffer_packet(struct buffer_packet* bp) { printf("\n"); - printf(" mode=%d, aread=%d, awrite=%d, ap_count=%d, usage=%ld/%ld\n", bp->mode, bp->aread, bp->awrite, bp->ap_count, get_full_size (bp), sizeof(struct internet_packet)); - union abstract_packet* ap = &bp->ip.ap; - for (int i = 0; i < bp->ap_count; i++) { + printf(" mode=%d, aread=%d, awrite=%d, ap_count=%d, usage=%ld/%ld\n", bp->mode, bp->aread, bp->awrite, bp->ap_count, buffer_full_size (bp), sizeof(bp->ip)); + for (union abstract_packet* ap = buffer_first_ap (bp); ap != NULL; ap = ap_next (ap)) { dump_abstract_packet(ap); - ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); } printf("\n"); } @@ -142,21 +218,15 @@ void dump_abstract_packet(union abstract_packet* ap) { printf(" \n"); printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd); switch (ap->fmt.headers.cmd) { - case CMD_HEALTH: - printf(" id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x\n", - ap->fmt.content.health.id, - ap->fmt.content.health.deltat, - ap->fmt.content.health.prevlink, - ap->fmt.content.health.min_blocked_pkt, - ap->fmt.content.health.bitfield); + case CMD_LINK_MONITORING_THUNDER: + printf(" \n"); break; - case CMD_CLEAR: - printf(" id=%d, port=%d\n", - ap->fmt.content.clear.id, - ap->fmt.content.clear.port); + case CMD_UDP_METADATA_THUNDER: + printf(" id=%d\n", + ap->fmt.content.udp_metadata_thunder.id); break; - case CMD_XOR: - printf(" Unimplemented\n"); + case CMD_UDP_ENCAPSULATED: + printf(" port=%d\n", ap->fmt.content.udp_encapsulated.port); break; default: printf(" \n"); diff --git a/src/packet.h b/src/packet.h index a8e78a8..544e9cc 100644 --- a/src/packet.h +++ b/src/packet.h @@ -9,6 +9,8 @@ #include #include #include +#include "evt_core.h" +#include "url.h" /* * man 7 udp about receive operation on UDP sockets: @@ -30,9 +32,18 @@ enum BP_MODE { }; enum PKT_CMD { - CMD_HEALTH, - CMD_CLEAR, - CMD_XOR + CMD_UDP_ENCAPSULATED = 1, + CMD_LINK_MONITORING_THUNDER = 2, + CMD_UDP_METADATA_THUNDER = 3, +}; + +enum PKT_FLAGS { + FLAG_READ_NEXT = 1 << 0, + FLAG_RESET = 1 << 1, +}; + +struct link_info { + uint16_t delta_t; }; union abstract_packet { @@ -40,38 +51,32 @@ union abstract_packet { struct { struct { uint16_t size; - enum PKT_CMD cmd; + uint8_t cmd; + uint8_t flags; } headers; union { struct { - uint16_t id; - uint8_t bitfield; - uint8_t prevlink; - uint16_t deltat; - uint16_t min_blocked_pkt; - } health; + struct link_info links_status; + } link_monitoring_thunder; struct { uint16_t id; + } udp_metadata_thunder; + struct { uint16_t port; char payload; - } clear; + } udp_encapsulated; } content; } fmt; }; -struct internet_packet { - union abstract_packet ap; - char rest[1499]; // MTU = 1500, 1 byte in the union as payload -}; - struct buffer_packet { enum BP_MODE mode; uint8_t ap_count; uint16_t aread; uint16_t awrite; struct timespec seen; - struct internet_packet ip; + char ip[1500]; }; struct udp_target { @@ -83,10 +88,17 @@ struct udp_target { size_t get_full_size(struct buffer_packet* bp); -enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp); -enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp); -enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_target* udp_t); -enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp_target* udp_t); +union abstract_packet* buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap); +union abstract_packet* buffer_free_ap(struct buffer_packet* bp); +union abstract_packet* buffer_first_ap(struct buffer_packet* bp); +union abstract_packet* buffer_last_ap(struct buffer_packet* bp); +size_t buffer_full_size(struct buffer_packet* bp); +union abstract_packet* ap_next(union abstract_packet* ap); + +enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fd, struct buffer_packet* bp); +enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fd, struct buffer_packet* bp); +enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fd, struct buffer_packet* bp, struct udp_target* udp_t); +enum FD_STATE read_packet_from_udp (struct evt_core_fdinfo* fd, struct buffer_packet* bp, struct udp_target* udp_t); void dump_buffer_packet(struct buffer_packet* bp); void dump_abstract_packet(union abstract_packet* ap); diff --git a/src/proxy.c b/src/proxy.c index afe9772..1d0e169 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -48,7 +48,7 @@ int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n"); while (bp->mode == BP_READING) { - read_res = read_packet_from_tcp (fdinfo->fd, bp); + read_res = read_packet_from_tcp (fdinfo, bp); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; } @@ -72,8 +72,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Read packet from socket - bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); - read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); + read_res = read_packet_from_udp (fdinfo, bp, fdinfo->other); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; @@ -105,7 +104,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) // 2. Write data from the buffer to the socket while (bp->mode == BP_WRITING) { - write_res = write_packet_to_tcp(fdinfo->fd, bp); + write_res = write_packet_to_tcp(fdinfo, bp); if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_AGAIN) return 1; } @@ -128,18 +127,22 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) int write_res = FDS_READY; // 1. Get current write buffer OR a buffer from the waiting queue OR leave + if (ctx->verbose > 1) fprintf(stderr, " [proxy] Find write buffer\n"); if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Write buffer - write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); + if (ctx->verbose > 1) fprintf(stderr, " [proxy] Write UDP packet\n"); + write_res = write_packet_to_udp(fdinfo, bp, fdinfo->other); if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_AGAIN) return 1; // 3. Notify helpers + if (ctx->verbose > 1) fprintf(stderr, " [proxy] Notify traffic capture\n"); traffic_capture_notify (&app_ctx->cap, bp, "out"); // 4. A whole packet has been written // Release the buffer and notify + if (ctx->verbose > 1) fprintf(stderr, " [proxy] Release buffer and notify\n"); mv_buffer_wtof(&app_ctx->br, fdinfo); notify_read(ctx, &app_ctx->br); diff --git a/src/proxy.h b/src/proxy.h index fc9ab61..aee276e 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -50,16 +50,16 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); -void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); -int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); - void algo_dup2_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_dup2_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); +void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); + static struct algo_desc available_algo[] = { { .name = "naive", @@ -68,19 +68,19 @@ static struct algo_desc available_algo[] = { .on_datagram = algo_naive_on_datagram, .on_err = algo_naive_on_err }, - { - .name = "rr", - .init = algo_rr_init, - .on_stream = algo_rr_on_stream, - .on_datagram = algo_rr_on_datagram, - .on_err = algo_rr_on_err - }, { .name = "dup2", .init = algo_dup2_init, .on_stream = algo_dup2_on_stream, .on_datagram = algo_dup2_on_datagram, .on_err = algo_dup2_on_err + }, + { + .name = "thunder", + .init = algo_thunder_init, + .on_stream = algo_thunder_on_stream, + .on_datagram = algo_thunder_on_datagram, + .on_err = algo_thunder_on_err } };