From d89b7151c92ea88f5bb8f0e7466cf75a7d7f5c17 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 19 Mar 2019 13:50:38 +0100 Subject: [PATCH] We do not use LINK_COUNT anymore --- src/algo_rr.c | 57 ++++++++++++++++++++++++++++++++++++++---------- src/algo_utils.h | 4 ++-- 2 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 990b30b..7e9ad51 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -3,6 +3,8 @@ #include "algo_utils.h" #include "utils.h" +#define WAITING 10 + struct deferred_pkt { int link_fd; struct buffer_packet* bp; @@ -16,9 +18,8 @@ struct rr_ctx { uint16_t recv_id; uint16_t sent_id; uint8_t current_link; - struct deferred_pkt real[LINK_COUNT]; - struct deferred_pkt stub[LINK_COUNT]; - struct buffer_packet stub_pool[LINK_COUNT]; + struct deferred_pkt real[WAITING]; + struct buffer_packet stub_bp; }; int rr_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); @@ -63,7 +64,7 @@ co_error: exit(EXIT_FAILURE); } -void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec) { +void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec, int64_t link) { struct timespec now; struct itimerspec timer_config; char url[1024]; @@ -92,7 +93,7 @@ void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec) { exit(EXIT_FAILURE); } fdinfo.cat->name = "timeout"; - fdinfo.other = NULL; // Should put the duration, the file descriptor, the packet id + fdinfo.other = (void*)link; // Should put the link number and the id fdinfo.free_other = NULL; sprintf(fdinfo.url, "timer:%ld:1", micro_sec); evt_core_add_fd (evts, &fdinfo); @@ -101,9 +102,6 @@ void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec) { int rr_update_states(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded // 1. Update links I can use thanks to target feedback if (bp->ip.ap.str.id > rr->my_links_ver) { @@ -113,6 +111,7 @@ int rr_update_states(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct // 2. If packet arrived too late if (ring_gt(rr->recv_id, bp->ip.ap.str.id - 1)) { + if (bp->ip.ap.str.flags & PKT_TIMEOUT) return 0; // We don't use real buffer pkt for PKT_TIMEOUT // Packet has already been delivered or dropped, we free the buffer g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd)); memset(bp, 0, sizeof(struct buffer_packet)); @@ -124,10 +123,10 @@ int rr_update_states(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct if (ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)) { int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.str.deltat; if (timeout <= 0) timeout = 0; - set_timeout(ctx, timeout); + set_timeout(ctx, timeout, bp->ip.ap.str.prevlink); // Add a buffer to stub too // Bitfield can be anything as a greater packet has triggered the bitfield update before - int idx = bp->ip.ap.str.id % LINK_COUNT; + int idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; rr->real[idx].bp = bp; rr->real[idx].link_fd = fdinfo->fd; return 0; @@ -135,6 +134,11 @@ int rr_update_states(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct // 4. If we were waiting this packet rr->recv_id = bp->ip.ap.str.id; + + char buffer[16]; + url_get_port (buffer, fdinfo->url); + int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded + // 4.1 This is a timeout packet, we set the link as dead if (bp->ip.ap.str.flags & PKT_TIMEOUT) { rr->remote_links &= (1 << link_num) ^ UINT16_MAX; @@ -175,7 +179,7 @@ void rr_pkt_recv(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struc do { if(rr_update_states(ctx, bp, fdinfo)) rr_deliver(ctx, bp, fdinfo); do { - struct deferred_pkt* def = &rr->real[rr->recv_id+1 % LINK_COUNT]; + struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; if (def->bp == NULL) break; def->bp = NULL; struct evt_core_fdinfo* fdinfo = evt_core_get_from_fd (ctx, def->link_fd); @@ -235,6 +239,28 @@ co_error: exit(EXIT_FAILURE); } +int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; + uint16_t id; + + evt_core_rm_fd (ctx, fdinfo->fd); // We use the timeout only once + int fd = (int)fdinfo->other; + struct evt_core_fdinfo* tfdinfo = evt_core_get_from_fd (ctx, fd); + if (tfdinfo == NULL) { + fprintf(stderr, "An error occured as the link seems to be closed for the requested fd\n"); + rr->recv_id++; + return 1; + } + + rr->stub_bp.ip.ap.str.id = 0; //? + rr->stub_bp.ip.ap.str.flags = PKT_TIMEOUT; + rr->stub_bp.ip.ap.str.deltat = rr->mjit; + rr->stub_bp.ip.ap.str.prevlink = UINT8_MAX; + rr_pkt_recv (ctx, tfdinfo, &rr->stub_bp); + return 1; +} + int rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct buffer_packet* bp; @@ -293,6 +319,7 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { as->on_tcp_read.cb = rr_on_tcp_read; as->on_tcp_read.err_cb = rr_on_err; ctx->ref_count++; + /* as->on_udp_read.name = "udp-read"; as->on_udp_read.flags = EPOLLIN | EPOLLET; @@ -320,8 +347,14 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as) { ctx->ref_count++; struct evt_core_cat tcat = { - + .name = "timeout", + .flags = EPOLLIN | EPOLLET, + .app_ctx = ctx, + .free_app_ctx = free_naive, + .cb = rr_on_timer, + .err_cb = NULL }; + ctx->ref_count++; evt_core_add_cat(evt, &tcat); return; diff --git a/src/algo_utils.h b/src/algo_utils.h index 5165af1..0c4d93e 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -3,13 +3,13 @@ #include #include #include -#define LINK_COUNT 10 +#define PACKET_BUFFER_SIZE 10 typedef void (*algo_ctx_free_misc)(void*); struct algo_ctx { int ref_count; - struct buffer_packet bps[LINK_COUNT]; + struct buffer_packet bps[PACKET_BUFFER_SIZE]; GQueue* free_buffer; // Available buffers GHashTable* used_buffer; // Buffers used for reading or writing GQueue* read_waiting; // Who wait to be notified for a read