diff --git a/src/algo_thunder.c b/src/algo_thunder.c index 20e854a..3e66176 100644 --- a/src/algo_thunder.c +++ b/src/algo_thunder.c @@ -17,7 +17,7 @@ struct thunder_ctx { uint8_t total_links; uint8_t delta_t_per_link[MAX_LINKS]; uint64_t received_pkts_on_link[MAX_LINKS]; - uint8_t blacklisted[MAX_LINKS]; + uint64_t blacklisted[MAX_LINKS]; size_t monit_pkt_size; struct timespec prev_link_time, prev_packet_time; }; @@ -125,6 +125,10 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu } li[thunderc->selected_link].delta_t = 0; + if (ctx->verbose > 1) { + dump_buffer_packet(bp_dup); + fprintf(stderr, " [algo_thunder] Will send this info\n"); + } main_on_tcp_write(ctx, to_fdinfo); } while (is_blacklisted (thunderc, thunderc->selected_link)); @@ -136,20 +140,6 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu return 0; } -void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { - app_ctx->misc = malloc(sizeof(struct thunder_ctx)); - if (app_ctx->misc == NULL) { - perror("malloc failed in algo thunder init"); - exit(EXIT_FAILURE); - } - memset(app_ctx->misc, 0, sizeof(struct thunder_ctx)); - struct thunder_ctx* thunderc = app_ctx->misc; - thunderc->selected_link = UINT8_MAX - 1; - - union abstract_packet links = {}; - thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1); -} - struct block_info { uint8_t i; struct algo_ctx* app_ctx; uint64_t missing; }; void on_block (struct evt_core_ctx* ctx, void* raw) { @@ -219,8 +209,29 @@ void unpad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff } } -void adapt() { +void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct unpad_info *ui) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + char url[256]; + struct evt_core_fdinfo *to_fdinfo = NULL; + for (uint8_t i = ui->ap_arr_vals-1; i >= 0; i--) { + if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue; + thunderc->recv_id = ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id; + + // Find destination + sprintf(url, "udp:write:127.0.0.1:%d", ui->ap_arr_pl[i]->fmt.content.udp_encapsulated.port); + to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) { + fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); + } + + struct buffer_packet *bp_dest = inject_buffer_tow (&app_ctx->br, to_fdinfo); + buffer_append_ap (bp_dest, ui->ap_arr_pl[i]); + main_on_udp_write(ctx, to_fdinfo); + } + + mv_buffer_rtof (&app_ctx->br, fdinfo); } int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { @@ -228,14 +239,34 @@ int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi classify(ctx, fdinfo, bp); unpad(ctx, fdinfo, bp, &ui); - adapt(); + adapt(ctx, fdinfo, bp, &ui); return 0; } int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { prepare(ctx, fdinfo, bp); pad(ctx, fdinfo, bp); - return schedule(ctx, fdinfo, bp); + schedule(ctx, fdinfo, bp); + return 0; +} + +void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { + app_ctx->misc = malloc(sizeof(struct thunder_ctx)); + if (app_ctx->misc == NULL) { + perror("malloc failed in algo thunder init"); + exit(EXIT_FAILURE); + } + memset(app_ctx->misc, 0, sizeof(struct thunder_ctx)); + struct thunder_ctx* thunderc = app_ctx->misc; + thunderc->recv_id = 1; + thunderc->emit_id = 1; + thunderc->total_links = app_ctx->ap.links; + thunderc->selected_link = thunderc->total_links; + for (int i = 0; i < MAX_LINKS; i++) thunderc->received_pkts_on_link[i] = 1; + + union abstract_packet links = {}; + //fprintf(stderr, "Total links %d\n", thunderc->total_links); + thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1); } int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { diff --git a/src/algo_utils.c b/src/algo_utils.c index 6b5c301..68f862c 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -207,7 +207,7 @@ void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) { __push_to_free (app_ctx, bp); } -struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { +struct buffer_packet* inject_buffer_tow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* to) { GQueue* q; // 1. We get a free buffer @@ -217,21 +217,28 @@ struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct bu return NULL; } - // 2. We duplicate the data - memcpy(bp_dest, bp, sizeof(struct buffer_packet)); - - // 3. We get the target writing queue + // 2. We get the target writing queue q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); if (q == NULL) { q = g_queue_new (); g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); } - // 4. We push the content to the appropriate destination + // 3. We push the content to the appropriate destination g_queue_push_tail(q, bp_dest); return bp_dest; } +struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { + // 1. Inject a new buffer + struct buffer_packet* bp_dest = inject_buffer_tow (app_ctx, to); + + // 2. We duplicate the data + memcpy(bp_dest, bp, sizeof(struct buffer_packet)); + + return bp_dest; +} + struct buffer_packet* dup_buffer_toa(struct buffer_resources *app_ctx, struct buffer_packet* bp, void* to) { GQueue* q; diff --git a/src/algo_utils.h b/src/algo_utils.h index ab53795..1a78950 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -23,6 +23,8 @@ void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* fr void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct buffer_resources* app_ctx, void* from); + +struct buffer_packet* inject_buffer_tow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* to); struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); struct buffer_packet* dup_buffer_toa(struct buffer_resources* app_ctx, struct buffer_packet* bp, void* to); guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/donar_init.c b/src/donar_init.c index 081a8fa..0975f47 100644 --- a/src/donar_init.c +++ b/src/donar_init.c @@ -28,7 +28,7 @@ int on_signal(struct evt_core_ctx* evts, struct evt_core_fdinfo* fdinfo) { } void signal_init(struct evt_core_ctx* evts) { - sigset_t mask; + sigset_t mask = {0}; struct evt_core_cat signal_read = { .name = "signal-read", diff --git a/src/packet.c b/src/packet.c index 1727fc8..3422a5f 100644 --- a/src/packet.c +++ b/src/packet.c @@ -208,7 +208,7 @@ void dump_abstract_packet(union abstract_packet* ap) { printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd); switch (ap->fmt.headers.cmd) { case CMD_LINK_MONITORING_THUNDER: - printf(" \n"); + printf(" \n"); break; case CMD_UDP_METADATA_THUNDER: printf(" id=%d\n",