From 02a3507ef5f664e591f5a2adc6861a9c4eeba10c Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 18 Mar 2019 10:26:02 +0100 Subject: [PATCH] Global structures for RR algo --- src/algo_rr.c | 73 +++++++++++++++++++++++++++++++-------------------- 1 file changed, 44 insertions(+), 29 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 93b2ba6..72c500e 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -2,6 +2,24 @@ #include "algo_skel.h" #include "algo_utils.h" + +struct deferred_pkt { + int link_fd; + struct buffer_packet* bp; +}; + +struct rr_ctx { + uint8_t my_links; + uint16_t my_links_ver; + uint8_t remote_links; + uint16_t mjit; + uint16_t recv_id; + uint16_t sent_id; + struct deferred_pkt real[LINK_COUNT]; + struct deferred_pkt stub[LINK_COUNT]; +}; + + int rr_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int conn_sock1, conn_sock2; struct sockaddr_in addr; @@ -76,14 +94,15 @@ void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec) { void rr_handle_recv(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; - // 1. Update links I can used thanks to target feedback - if (bp->ip.ap.str.bitfield ^ app_ctx->my_bitfield) { - update_my_bitfield(bp, app_ctx); + // 1. Update links I can use thanks to target feedback + if (bp->ip.ap.str.bitfield ^ rr->my_links) { + update_my_bitfield(bp, rr); } // 2. If packet arrived too late - if (app_ctx->recv_id > bp->ip.ap.str.id - 1) { + if (rr->recv_id > bp->ip.ap.str.id - 1) { // Packet has already been delivered or dropped, we free the buffer g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd)); memset(bp, 0, sizeof(struct buffer_packet)); @@ -91,24 +110,26 @@ void rr_handle_recv(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct e } // 3. If packet arrived too early - else if (app_ctx->recv_id < bp->ip.ap.str.id - 1) { - int64_t timeout = app_ctx->mjit - (int64_t) bp->ip.ap.str.deltat; + else if (rr->recv_id < bp->ip.ap.str.id - 1) { + int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.str.deltat; if (timeout <= 0) timeout = 0; set_timeout(ctx, timeout); - app_ctx->waiting[bp->ip.ap.str.id % LINK_COUNT] = bp; // should store more than that + int idx = bp->ip.ap.str.id % LINK_COUNT; + rr->real[idx].bp = bp; + rr->real[idx].link_fd = fdinfo->fd; } // 4. If we were waiting this packet else { - if (bp->ip.ap.str.flags & PKT_TIMEOUT) broken_rlink(fdinfo); - else if (bp->ip.ap.str.flags & PKT_CONTROL) working_rlink(fdinfo); + if (bp->ip.ap.str.flags & PKT_TIMEOUT) broken_rlink(rr, fdinfo); + else if (bp->ip.ap.str.flags & PKT_CONTROL) working_rlink(rr, fdinfo); else deliver(bp); - app_ctx->recv_id = bp->ip.ap.str.id; - int next = app_ctx->recv_id+1; - if (app_ctx->waiting[next % LINK_COUNT] != NULL) { - bp = app_ctx->waiting[next % LINK_COUNT]; - app_ctx->waiting[next % LINK_COUNT] = NULL; - rr_handle_recv(ctx,bp,NULL); + rr->recv_id = bp->ip.ap.str.id; + struct deferred_pkt* def = &rr->real[rr->recv_id+1]; + if (def->bp != NULL) { + struct evt_core_fdinfo* next = evt_core_get_from_fd (ctx, def->link_fd); + if (next == NULL) return; + rr_handle_recv(ctx, def->bp, next); } } } @@ -131,7 +152,7 @@ int rr_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { } // 3. Logic on packet - rr_handle_recv(bp, app_ctx, fdinfo); + rr_handle_recv(ctx, bp, fdinfo); /* // 3. A whole packet has been read, we will find someone to write it @@ -154,18 +175,6 @@ co_error: exit(EXIT_FAILURE); } -struct deferred_pkt { - int link_fd; - struct buffer_packet* bp; -}; - -struct rr_ctx { - uint16_t mjit; - uint16_t recv_id; - uint16_t sent_id; - struct deferred_pkt real[LINK_COUNT]; - struct deferred_pkt stub[LINK_COUNT]; -}; void algo_rr(struct algo_skel* as) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); @@ -175,7 +184,13 @@ void algo_rr(struct algo_skel* as) { ctx->read_waiting = g_queue_new (); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); - ctx->misc = malloc(sizeof(struct rr_ctx)); + struct rr_ctx* rr = malloc(sizeof(struct rr_ctx)); + if (rr == NULL) goto init_err; + memset(rr, 0, sizeof(struct rr_ctx)); + rr->mjit = 200; + rr->my_links = 0xff; + rr->remote_links = 0xff; + ctx->misc = rr; for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); }