diff --git a/src/algo_thunder.c b/src/algo_thunder.c index 0573e9f..cce83e7 100644 --- a/src/algo_thunder.c +++ b/src/algo_thunder.c @@ -14,6 +14,7 @@ struct thunder_ctx { uint8_t selected_link; uint8_t total_links; uint8_t delta_t_per_link[64]; + uint64_t received_pkts_on_link[64]; uint8_t blacklisted[64]; size_t monit_pkt_size; struct timespec prev_link_time, prev_packet_time; @@ -42,19 +43,12 @@ void prepare(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct thunder_ctx* thunderc = app_ctx->misc; - // 1. Put the raw buffer in cache thunderc->emit_id++; - uint64_t ref = 0l + thunderc->emit_id; - dup_buffer_toa (&app_ctx->br, bp, (void *)ref); - - uint64_t delta_pkt = compute_delta(&thunderc->prev_packet_time, 200); - union abstract_packet metadata = { .fmt.headers.cmd = CMD_UDP_METADATA_THUNDER, .fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder), .fmt.headers.flags = 0, .fmt.content.udp_metadata_thunder.id = thunderc->emit_id, - .fmt.content.udp_metadata_thunder.deltat = delta_pkt }; buffer_append_ap (bp, &metadata); } @@ -64,9 +58,11 @@ void pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer struct thunder_ctx* thunderc = app_ctx->misc; uint64_t ref = 0l + thunderc->emit_id; - // 1. Clean old buffers - if (ref > 10 && get_app_buffer (&app_ctx->br, (void *)ref - 10l)) { - mv_buffer_atof (&app_ctx->br, (void *)ref - 10l); + dup_buffer_toa (&app_ctx->br, bp, (void *)ref); + + // 1. Clean old buffers (we keep only thunderc->total_links buffer, keeping more would be useless) + if (ref > thunderc->total_links && get_app_buffer (&app_ctx->br, (void *)(ref - thunderc->total_links))) { + mv_buffer_atof (&app_ctx->br, (void *)(ref - thunderc->total_links)); } // 2. Append abstract packets stored in our buffers @@ -148,8 +144,26 @@ void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struc thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1); } -void classify() { +void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct thunder_ctx* thunderc = app_ctx->misc; + union abstract_packet* ap = buffer_first_ap (bp); + while (ap != NULL && ap->fmt.headers.cmd != CMD_LINK_MONITORING_THUNDER) ap = ap_next(ap); + if (ap == NULL) { + fprintf(stderr, "Unable to find our packet\n"); + exit(EXIT_FAILURE); + } + + int link_id = url_get_port_int(fdinfo->url) - 7500; + thunderc->received_pkts_on_link[link_id]++; + + struct link_info *li = &ap->fmt.content.link_monitoring_thunder.links_status; + for (int i = 0; i < thunderc->total_links; i++) { + uint64_t expected = i <= link_id ? thunderc->received_pkts_on_link[link_id] : thunderc->received_pkts_on_link[link_id] - 1; + if (thunderc->received_pkts_on_link[i] >= expected) continue; // Nothing to do, all packets have been received + set_timeout () + } } void unpad() { @@ -161,7 +175,7 @@ void adapt() { } int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - classify(); + classify(ctx, fdinfo, bp); unpad(); adapt(); return 0; diff --git a/src/packet.h b/src/packet.h index 94216d3..81423df 100644 --- a/src/packet.h +++ b/src/packet.h @@ -60,7 +60,6 @@ union abstract_packet { } link_monitoring_thunder; struct { uint16_t id; - uint16_t deltat; } udp_metadata_thunder; struct { uint16_t port;