diff --git a/src/algo_rr.c b/src/algo_rr.c index 0775b62..d80071a 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -63,9 +63,9 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.fmt.content.clear.id) { fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.fmt.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); + fprintf(stderr, "\t%d => %d\n", rr->real[i].id, rr->real[i].on); } - printf("] - could be replaced by drop\n"); + fprintf(stderr, "] - could be replaced by drop\n"); exit(EXIT_FAILURE); } else if (!rr->real[real_idx].on && ring_gt(bp->ip.ap.fmt.content.clear.id, rr->content_id)) { rr->real[real_idx].on = 1; @@ -158,18 +158,21 @@ release: mv_buffer_rtof(app_ctx, fdinfo); } -void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { +uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct rr_ctx* rr = app_ctx->misc; struct evt_core_fdinfo* fdinfo = NULL; struct buffer_packet* bp = NULL; + uint64_t delivered = 0; while(1) { //printf("Trying to deliver %d\n", rr->recv_id+1); struct queued_pkt* def = &rr->real[(rr->content_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; rr_deliver(ctx, app_ctx, def); + delivered++; //printf("Delivered %d\n", rr->recv_id); } + return delivered; } //------ @@ -266,6 +269,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo return 0; } else { struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); + dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor... main_on_tcp_write(ctx, to_fdinfo); } @@ -301,11 +305,17 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { } // 3. Deliver blocked packets + // @FIXME CRAPPY CODE / CRAPPY LOGIC //printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); - while (ring_gt(t->min_blocked_pkt, rr->content_id)) { + /*if (ring_gt(t->min_blocked_pkt, rr->content_id) && !rr->real[t->min_blocked_pkt % PACKET_BUFFER_SIZE].on) { + fprintf(stderr, "min_blocked_packet has not been received, t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); + exit(EXIT_FAILURE); + }*/ + while (ring_gt(t->min_blocked_pkt, rr->content_id - 1)) { rr->content_id++; rr_pkt_unroll (ctx, app_ctx); } + rr_pkt_unroll (ctx, app_ctx); } int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {