diff --git a/src/algo_rr.c b/src/algo_rr.c index d8c7603..3dacc92 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -13,23 +13,21 @@ struct timer_info { }; struct queued_pkt { + uint8_t on; int link_fd; int idx; uint16_t id; - uint8_t on; struct algo_ctx* algo; }; struct rr_ctx { uint8_t my_links; - uint16_t my_links_ver; uint8_t remote_links; int64_t mjit; uint16_t health_id; uint16_t health_id_late; uint16_t content_id; uint16_t sent_id; - uint8_t current_link; struct internet_packet prev_packet; struct timespec emit_time; struct queued_pkt real[PACKET_BUFFER_SIZE]; @@ -50,94 +48,50 @@ void show_link_availability(struct rr_ctx* rr) { printf("]\n"); } -void expired_wait (struct evt_core_ctx* ctx, void* user); -void expired_late(struct evt_core_ctx* ctx, void* user); void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - uint16_t real_idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; - uint16_t wait_idx = (bp->ip.ap.content.health.id - 1) % PACKET_BUFFER_SIZE; + uint16_t real_idx = bp->ip.ap.content.clear.id % PACKET_BUFFER_SIZE; - //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); - - // 2. If packet arrived too late or already queued, we discard it - if (ring_ge(rr->recv_id, bp->ip.ap.str.id) || rr->real[real_idx].id == bp->ip.ap.str.id) { - // Packet has already been delivered or dropped, we free the buffer - fprintf(stderr, "Packet %d arrived too late (current: %d) or already received\n", bp->ip.ap.str.id, rr->recv_id); - mv_buffer_wtof (app_ctx, fdinfo); - return; - } - - // 3. If packet arrived too early, we wait for its predecessors - //printf("%d < %d = %d\n", rr->recv_id, bp->ip.ap.str.id - 1, ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)); - if (ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)) { - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.str.deltat; - //printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout); - if (timeout <= 0) timeout = 0; - - if (rr->wait[wait_idx].on && rr->wait[wait_idx].id != bp->ip.ap.str.id - 1) { - fprintf(stderr, "Waiting array overlap, BUG: [\n"); - for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - printf("\t%d => %d\n", rr->wait[i].id, rr->wait[i].on); - } - printf("] - could be replaced by drop\n"); - exit(EXIT_FAILURE); - } else if (!rr->wait[wait_idx].on) { - rr->wait[wait_idx].on = 1; - rr->wait[wait_idx].id = bp->ip.ap.str.id - 1; - rr->wait[wait_idx].link_num = bp->ip.ap.str.prevlink; - rr->wait[wait_idx].algo = app_ctx; - set_timeout(ctx, timeout, &rr->wait[wait_idx], expired_wait); - } - } + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 4. We queue the packet to keep it - if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.str.id) { - fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, real_idx); + if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.content.clear.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); } printf("] - could be replaced by drop\n"); exit(EXIT_FAILURE); } else if (!rr->real[real_idx].on) { - rr->real[real_idx].on = 2; - rr->real[real_idx].id = bp->ip.ap.str.id; + rr->real[real_idx].on = 1; + rr->real[real_idx].id = bp->ip.ap.content.clear.id; rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); - - // 5. We register a timer for this packet to create a reactivation window for broken links - set_timeout(ctx, rr->mjit + 1, &rr->real[real_idx], expired_late); - - //printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real); } else { - fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); - mv_buffer_wtof (app_ctx, fdinfo); + fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.content.clear.id, rr->content_id); + mv_buffer_rtof (app_ctx, fdinfo); } - } -void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { +void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queued_pkt* dp) { struct evt_core_fdinfo *to_fdinfo = NULL; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; char url[255]; // 1. Marked the packet as handled - dp->on--; + dp->on = 0; // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 3. We update our cursor - rr->recv_id = bp->ip.ap.content.clear.id; + rr->content_id = bp->ip.ap.content.clear.id; // 4. Find its target sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); @@ -161,7 +115,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf assert(bp->ip.ap.headers.cmd == CMD_HEALTH); // 1. Health packet was received too late, dropping it - if (ring_le(bp->ip.ap.health.id, rr->health_id_late)) goto release; + if (ring_le(bp->ip.ap.content.health.id, rr->health_id_late)) goto release; // 2. Reactivate link if deactivated char buffer[16]; @@ -174,14 +128,13 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf } // 3. Update RR structure if its the greatest health_id we received - if (ring_gt(bp->ip.ap.health.id, rr->health_id)) { + if (ring_gt(bp->ip.ap.content.health.id, rr->health_id)) { // 3.1. Update current health id - rr->health_id = bp->ip.ap.health.id; + rr->health_id = bp->ip.ap.content.health.id; // 3.2. Update my links I can use thanks to target feedback - if (bp->ip.ap.str.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.str.bitfield; - rr->my_links_ver = bp->ip.ap.str.id; + if (bp->ip.ap.content.health.bitfield != rr->my_links) { + rr->my_links = bp->ip.ap.content.health.bitfield; printf("Update my links | "); show_link_availability (rr); } @@ -192,7 +145,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf if (timeout <= 0) timeout = 0; uint64_t idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; - rr->wait[idx].health_id = bp->ip.content.health.id; + rr->wait[idx].health_id = bp->ip.ap.content.health.id; rr->wait[idx].prevlink = bp->ip.ap.content.health.prevlink; rr->wait[idx].min_blocked_pkt = bp->ip.ap.content.health.min_blocked_pkt; rr->wait[idx].algo = app_ctx; @@ -210,16 +163,9 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { while(1) { //printf("Trying to deliver %d\n", rr->recv_id+1); - struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; + struct queued_pkt* def = &rr->real[(rr->content_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; - fdinfo = evt_core_get_from_fd (ctx, def->link_fd); - if (fdinfo == NULL) { - fprintf(stderr, "An error occured as the link seems to be closed for the requested fd\n"); - rr->recv_id++; - continue; - } - - rr_deliver(ctx, fdinfo, def); + rr_deliver(ctx, app_ctx, def); //printf("Delivered %d\n", rr->recv_id); } } @@ -316,7 +262,7 @@ co_error: void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { struct timer_info* t = raw; - struct algo_ctx* app_ctx = t->algo->cat->app_ctx; + struct algo_ctx* app_ctx = t->algo; struct rr_ctx* rr = app_ctx->misc; // 1. Update link recovery window if needed @@ -325,7 +271,7 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { // 2. Blacklist previous link if needed uint16_t prev_health_id = (t->health_id - 1); uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; - struct timer_info* t_old = rr->wait[prev_health_idx]; + struct timer_info* t_old = &rr->wait[prev_health_idx]; if (t_old->health_id != prev_health_id) { printf("Blacklist link=%d | ", t->prevlink); rr->remote_links &= 0xff ^ 1 << t->prevlink; @@ -339,43 +285,6 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { } } -void expired_wait(struct evt_core_ctx* ctx, void* user) { - struct waited_pkt* pkt = user; - struct rr_ctx* rr = pkt->algo->misc; - - // 1. Release lock - pkt->on = 0; - - // 2. We will not reactivate link for this packet - if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; - - /* - // 3. Stop if packet has been received and delivered - if (ring_le (pkt->id, rr->recv_id)) return; - - printf("Timer reached for packet %d\n", pkt->id); - */ - - // 4. BLACKLIST LINK - printf("Blacklist link=%d | ", pkt->link_num); - rr->remote_links &= 0xff ^ 1 << pkt->link_num; - show_link_availability (rr); - - // 5. Deliver following packets - while (ring_lt(rr->recv_id, pkt->id)) { - rr->recv_id++; - rr_pkt_unroll (ctx, pkt->algo); - } -} - -void expired_late(struct evt_core_ctx* ctx, void* user) { - struct deferred_pkt* pkt = user; - struct rr_ctx* rr = pkt->algo->misc; - - pkt->on--; - if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; -} - int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { // We do nothing return 0; @@ -397,8 +306,9 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->my_links = 0xff; rr->remote_links = 0xff; rr->sent_id = 1; - rr->recv_id = 0; - rr->recv_id_late = 0; + rr->health_id = 0; + rr->health_id_late = 0; + rr->content_id = 0; app_ctx->misc = rr; app_ctx->free_misc = algo_rr_free;