diff --git a/src/algo_lightning.c b/src/algo_lightning.c index 1592dfb..246a788 100644 --- a/src/algo_lightning.c +++ b/src/algo_lightning.c @@ -42,6 +42,7 @@ struct light_ctx { struct timespec last[MAX_LINKS]; uint64_t pkt_rcv_id; uint64_t pkt_sent_id; + uint64_t uniq_pkt_sent_id; uint8_t selected_link; uint8_t total_links; int fast_count; @@ -75,6 +76,8 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str lightc->sleep_duration = 500; lightc->csv = 0; lightc->explain = 0; + lightc->pkt_sent_id = 1; + lightc->uniq_pkt_sent_id = 1; uint64_t window = 2000; if (ap->algo_specific_params != NULL) { @@ -116,7 +119,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str void algo_lightning_pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct light_ctx* lightc = app_ctx->misc; - uint64_t ref = lightc->pkt_sent_id; + uint64_t ref = lightc->uniq_pkt_sent_id; // 0. Store current buffer to application fprintf(stderr, " [algo_lightning] Store buffer with pointer %p\n", (void*) ref); @@ -131,18 +134,24 @@ void algo_lightning_pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // 2. Append abstract packets stored in our buffers uint64_t add_ref = ref; while(1) { - fprintf(stderr, " [algo_lightning] Enter loop\n"); - if (add_ref < 1) break; + fprintf(stderr, " [algo_lightning] Enter loop with ref %ld\n", add_ref); + if (add_ref < 1) { + fprintf(stderr, " [algo_lightning] add_ref=%ld < 1\n", add_ref); + break; + } add_ref--; struct buffer_packet *bp_iter = get_app_buffer (&app_ctx->br, (void *)add_ref); - if (bp_iter == NULL) break; + if (bp_iter == NULL) { + fprintf(stderr, " [algo_lightning] bp_iter=%p == NULL\n", bp_iter); + break; + } union abstract_packet *ap = buffer_first_ap (bp_iter); if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) { fprintf(stderr, "Invalid buffer payload!\n"); exit(EXIT_FAILURE); } - fprintf(stderr, " [algo_lightning] Start loop, currently %ld bytes, would be %ld\n", buffer_full_size (bp), buffer_full_size (bp) + ap->fmt.headers.size); + fprintf(stderr, " [algo_lightning] Currently %ld bytes, would be %ld\n", buffer_full_size (bp), buffer_full_size (bp) + ap->fmt.headers.size); if (buffer_full_size (bp) + ap->fmt.headers.size > TOR_CELL_SIZE - lightc->monit_pkt_size) break; @@ -311,7 +320,6 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { struct buffer_packet* bp_dup = dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); - lightc->pkt_sent_id++; union abstract_packet monit = { .fmt.headers.cmd = CMD_LINK_MONITORING_LIGHTNING, .fmt.headers.size = lightc->monit_pkt_size, @@ -336,6 +344,7 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { } lightc->prev_links[lightc->pkt_sent_id % MAX_LINKS] = lightc->selected_link; + lightc->pkt_sent_id++; if (ctx->verbose > 1) { dump_buffer_packet(bp_dup); @@ -401,6 +410,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* cur = ap_next(cur); continue; } + printf("tagging packets delivered on slow link\n"); struct measure_packet *mp = (void*)&cur->fmt.content.udp_encapsulated.payload; mp->flag = 1; cur = ap_next(cur); @@ -411,6 +421,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* if (lightc->csv) printf("%ld,%d,slow\n", now_timestamp, lightc->selected_link); } + lightc->uniq_pkt_sent_id++; mv_buffer_rtof (&app_ctx->br, fdinfo); return 0; }