diff --git a/CMakeLists.txt b/CMakeLists.txt index f2b545b..dbba341 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -31,6 +31,8 @@ list(APPEND CSOURCES src/algo_utils.c src/proxy.h src/proxy.c + src/timer.h + src/timer.c ) add_executable(donar ${CSOURCES} src/donar.c) @@ -55,4 +57,4 @@ target_link_libraries(torecho ${GLIB_LDFLAGS}) install(TARGETS donar measlat udpecho torecho RUNTIME DESTINATION bin - LIBRARY DESTINATION lib) \ No newline at end of file + LIBRARY DESTINATION lib) diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 8a3b03f..2134a25 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -73,11 +73,6 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin return 0; } -int algo_dup2_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - // We do nothing - return 0; -} - int algo_dup2_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { // We do nothing return 1; diff --git a/src/algo_naive.c b/src/algo_naive.c index d821a05..17a7d99 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -47,11 +47,6 @@ int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi return 0; } -int algo_naive_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - // We do nothing - return 0; -} - int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { // We do nothing return 1; diff --git a/src/algo_rr.c b/src/algo_rr.c index d45e823..0c4381c 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -3,12 +3,13 @@ #include "utils.h" #include "url.h" #include "proxy.h" +#include "timer.h" struct waited_pkt { uint16_t id; int link_num; uint8_t on; - int timer_fd; + struct algo_ctx* algo; }; struct deferred_pkt { @@ -16,6 +17,7 @@ struct deferred_pkt { int idx; uint16_t id; uint8_t on; + struct algo_ctx* algo; }; struct rr_ctx { @@ -46,45 +48,8 @@ void show_link_availability(struct rr_ctx* rr) { printf("]\n"); } -// @TODO Might be extracted from RR -int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt* wpkt) { - struct timespec now; - struct itimerspec timer_config; - char url[1024]; - struct evt_core_cat cat = {0}; - struct evt_core_fdinfo fdinfo = {0}; - fdinfo.cat = &cat; - fdinfo.url = url; - - //printf("Will add a timeout of %ld ms\n", milli_sec); - if (clock_gettime(CLOCK_REALTIME, &now) == -1) { - perror("clock_gettime"); - exit(EXIT_FAILURE); - } - - uint64_t ns = now.tv_nsec + (milli_sec % 1000) * 1000000; - timer_config.it_value.tv_sec = now.tv_sec + milli_sec / 1000 + ns / 1000000000; - timer_config.it_value.tv_nsec = ns % 1000000000; - timer_config.it_interval.tv_sec = 60; - timer_config.it_interval.tv_nsec = 0; - - fdinfo.fd = timerfd_create(CLOCK_REALTIME, 0); - if (fdinfo.fd == -1) { - perror("Unable to timerfd_create"); - exit(EXIT_FAILURE); - } - if (timerfd_settime (fdinfo.fd, TFD_TIMER_ABSTIME, &timer_config, NULL) == -1) { - perror("Unable to timerfd_settime"); - exit(EXIT_FAILURE); - } - fdinfo.cat->name = "timeout"; - fdinfo.other = wpkt; // Should put the link number and the id - fdinfo.free_other = NULL; - sprintf(fdinfo.url, "timer:%ld:1", milli_sec); - evt_core_add_fd (evts, &fdinfo); - - return fdinfo.fd; -} +void expired_wait (struct evt_core_ctx* ctx, void* user); +void expired_late(struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; @@ -111,57 +76,63 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, } // 2. If packet arrived too late, we discard it - if (ring_gt(rr->recv_id, bp->ip.ap.str.id - 1)) { + if (ring_ge(rr->recv_id, bp->ip.ap.str.id)) { // Packet has already been delivered or dropped, we free the buffer fprintf(stderr, "Packet %d arrived too late (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); mv_buffer_wtof (app_ctx, fdinfo); return; } - // 3. If packet arrived too early, we register a timer + // 3. If packet arrived too early, we wait for its predecessors //printf("%d < %d = %d\n", rr->recv_id, bp->ip.ap.str.id - 1, ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)); if (ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)) { int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.str.deltat; //printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout); if (timeout <= 0) timeout = 0; - int idx_waited = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE; - rr->wait[idx_waited].on = 1; - rr->wait[idx_waited].id = bp->ip.ap.str.id - 1; - rr->wait[idx_waited].link_num = bp->ip.ap.str.prevlink; - rr->wait[idx_waited].timer_fd = set_timeout(ctx, timeout, &rr->wait[idx_waited]); - } - // 4. If packet has not already a timer or has a wrong timer - int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - if (rr->wait[idx_real].on && rr->wait[idx_real].id != bp->ip.ap.str.id) { - fprintf(stderr, "Waiting array is full, BUG\n"); - exit(EXIT_FAILURE); - } else if (!rr->wait[idx_real].on) { - rr->wait[idx_real].on = 1; - rr->wait[idx_real].id = bp->ip.ap.str.id; - rr->wait[idx_real].link_num = link_num; - rr->wait[idx_real].timer_fd = set_timeout(ctx, rr->mjit + 1, &rr->wait[idx_real]); - } - - // 5. We queue the packet - if (rr->real[idx_real].on && rr->real[idx_real].id != bp->ip.ap.str.id) { - fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, idx_real); - for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - printf("\t%d => %d\n", i, rr->real[i].on); + uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE; + if (rr->wait[wait_idx].on && rr->wait[wait_idx].id != bp->ip.ap.str.id - 1) { + fprintf(stderr, "Waiting array overlap, BUG: [\n"); + for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { + printf("\t%d => %d\n", rr->wait[i].id, rr->wait[i].on); + } + printf("] - could be replaced by drop\n"); + exit(EXIT_FAILURE); + } else if (!rr->wait[wait_idx].on) { + rr->wait[wait_idx].on = 1; + rr->wait[wait_idx].id = bp->ip.ap.str.id - 1; + rr->wait[wait_idx].link_num = bp->ip.ap.str.prevlink; + rr->wait[wait_idx].algo = app_ctx; + set_timeout(ctx, timeout, &rr->wait[wait_idx], expired_wait); } - printf("]\n"); + } + + // 4. We queue the packet to keep it + uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; + if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.str.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, real_idx); + for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { + printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); + } + printf("] - could be replaced by drop\n"); exit(EXIT_FAILURE); - } else if (!rr->real[idx_real].on) { - rr->real[idx_real].on = 1; - rr->real[idx_real].id = bp->ip.ap.str.id; - rr->real[idx_real].idx = idx_real; - rr->real[idx_real].link_fd = fdinfo->fd; - mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx); + } else if (!rr->real[real_idx].on) { + rr->real[real_idx].on = 2; + rr->real[real_idx].id = bp->ip.ap.str.id; + rr->real[real_idx].idx = real_idx; + rr->real[real_idx].link_fd = fdinfo->fd; + rr->real[real_idx].algo = app_ctx; + mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); + + // 5. We register a timer for this packet to create a reactivation window for broken links + set_timeout(ctx, rr->mjit + 1, &rr->real[real_idx], expired_late); + //printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real); } else { fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); mv_buffer_wtof (app_ctx, fdinfo); } + } void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { @@ -171,25 +142,15 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct char url[255]; // 1. Marked the packet as handled - dp->on = 0; + dp->on--; - // 2. Get the buffer and update rr state + // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); - int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - rr->real[idx_real].on = 0; - //printf("%d is removed from real as %d\n", bp->ip.ap.str.id, idx_real); - //printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id); // 3. We update our cursor rr->recv_id = bp->ip.ap.str.id; - // 4. We free the buffer if it's a control packet and quit - /*if (bp->ip.ap.str.flags & PKT_CONTROL) { - mv_buffer_atof (app_ctx, &dp->idx); - return; - }*/ - - // 5. Find its target + // 4. Find its target sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { @@ -198,7 +159,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct mv_buffer_atof (app_ctx, &dp->idx); } - // 6. We move the buffer and notify the target + // 5. We move the buffer and notify the target //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); @@ -305,30 +266,39 @@ co_error: exit(EXIT_FAILURE); } -int algo_rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; +void expired_wait(struct evt_core_ctx* ctx, void* user) { + struct waited_pkt* pkt = user; + struct rr_ctx* rr = pkt->algo->misc; - struct waited_pkt* pkt = fdinfo->other; + // 1. Release lock pkt->on = 0; - if (ring_gt (pkt->id, rr->recv_id_late)) rr->recv_id_late = pkt->id; - if (ring_le (pkt->id, rr->recv_id)) goto end; + + // 2. We will not reactivate link for this packet + if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; + + // 3. Stop if packet has been received and delivered + if (ring_le (pkt->id, rr->recv_id)) return; printf("Timer reached for packet %d\n", pkt->id); - // !BLACKLIST LINK + // 4. BLACKLIST LINK printf("Blacklist link=%d | ", pkt->link_num); rr->remote_links &= 0xff ^ 1 << pkt->link_num; show_link_availability (rr); + // 5. Deliver following packets while (ring_lt(rr->recv_id, pkt->id)) { rr->recv_id++; - rr_pkt_unroll (ctx, app_ctx); + rr_pkt_unroll (ctx, pkt->algo); } +} -end: - evt_core_rm_fd(ctx, fdinfo->fd); - return 1; +void expired_late(struct evt_core_ctx* ctx, void* user) { + struct deferred_pkt* pkt = user; + struct rr_ctx* rr = pkt->algo->misc; + + pkt->on--; + if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; } int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { @@ -351,5 +321,7 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->recv_id = 0; rr->recv_id_late = 0; app_ctx->misc = rr; + + init_timer(ctx); } diff --git a/src/algo_utils.h b/src/algo_utils.h index e0cbff9..3ecabd3 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -4,7 +4,7 @@ #include #include "packet.h" #include "evt_core.h" -#define PACKET_BUFFER_SIZE 20 +#define PACKET_BUFFER_SIZE 128 struct algo_params { uint8_t is_waiting_bootstrap; @@ -25,7 +25,6 @@ struct algo_desc { algo_init init; algo_ctx_on_buffer on_stream; algo_ctx_on_buffer on_datagram; - algo_ctx_on_event on_timer; algo_ctx_on_event on_err; }; diff --git a/src/proxy.c b/src/proxy.c index 3be7cd7..96a2959 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -141,25 +141,6 @@ co_error: exit(EXIT_FAILURE); } -int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - uint64_t ctr; - ssize_t tmr_rd; - tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr)); - if (tmr_rd == -1 && errno == EAGAIN) return 1; - if (tmr_rd < 0) { - perror("read on timer"); - fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd); - exit(EXIT_FAILURE); - } - - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - return app_ctx->desc->on_timer (ctx, fdinfo); - -end: - evt_core_rm_fd(ctx, fdinfo->fd); - return 1; -} - int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct buffer_packet* bp; @@ -256,17 +237,6 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { ctx->ref_count++; evt_core_add_cat(evt, &udp_write); - struct evt_core_cat timer = { - .name = "timeout", - .flags = EPOLLIN | EPOLLET, - .app_ctx = ctx, - .free_app_ctx = free_naive, - .cb = main_on_timer, - .err_cb = NULL - }; - ctx->ref_count++; - evt_core_add_cat(evt, &timer); - for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { if (strcmp(available_algo[i].name, ap->algo_name) == 0) { ctx->desc = &(available_algo[i]); diff --git a/src/proxy.h b/src/proxy.h index 1a9d2d8..5d7e082 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -13,19 +13,16 @@ void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_naive_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); void algo_dup2_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_dup2_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int algo_dup2_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); static struct algo_desc available_algo[] = { @@ -34,7 +31,6 @@ static struct algo_desc available_algo[] = { .init = algo_naive_init, .on_stream = algo_naive_on_stream, .on_datagram = algo_naive_on_datagram, - .on_timer = algo_naive_on_timer, .on_err = algo_naive_on_err }, { @@ -42,7 +38,6 @@ static struct algo_desc available_algo[] = { .init = algo_rr_init, .on_stream = algo_rr_on_stream, .on_datagram = algo_rr_on_datagram, - .on_timer = algo_rr_on_timer, .on_err = algo_rr_on_err }, { @@ -50,7 +45,6 @@ static struct algo_desc available_algo[] = { .init = algo_dup2_init, .on_stream = algo_dup2_on_stream, .on_datagram = algo_dup2_on_datagram, - .on_timer = algo_dup2_on_timer, .on_err = algo_dup2_on_err } }; diff --git a/src/timer.c b/src/timer.c new file mode 100644 index 0000000..a6e6964 --- /dev/null +++ b/src/timer.c @@ -0,0 +1,92 @@ +#include "timer.h" + +struct timer_ctx { + timer_cb cb; + void* user_ctx; +}; + +void free_timerctx(void* c) { + free(c); +} + +int set_timeout_handle(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + uint64_t ctr; + ssize_t tmr_rd; + tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr)); + if (tmr_rd == -1 && errno == EAGAIN) return 1; + if (tmr_rd < 0) { + perror("read on timer"); + fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd); + exit(EXIT_FAILURE); + } + + struct timer_ctx* tctx = fdinfo->other; + tctx->cb(ctx, tctx->user_ctx); + + evt_core_rm_fd(ctx, fdinfo->fd); + return 1; +} + +void init_timer(struct evt_core_ctx* evts) { + struct evt_core_cat* cat = evt_core_get_from_cat (evts, "set_timeout"); + if (cat != NULL) { + fprintf(stderr, "timeout category has already been registered\n"); + return; + } + + struct evt_core_cat timer = { + .name = "set_timeout", + .flags = EPOLLIN | EPOLLET, + .app_ctx = NULL, + .free_app_ctx = NULL, + .cb = set_timeout_handle, + .err_cb = NULL + }; + evt_core_add_cat(evts, &timer); +} + +int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, void* ctx, timer_cb cb) { + struct timespec now; + struct itimerspec timer_config; + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + + //printf("Will add a timeout of %ld ms\n", milli_sec); + if (clock_gettime(CLOCK_REALTIME, &now) == -1) { + perror("clock_gettime"); + exit(EXIT_FAILURE); + } + + uint64_t ns = now.tv_nsec + (milli_sec % 1000) * 1000000; + timer_config.it_value.tv_sec = now.tv_sec + milli_sec / 1000 + ns / 1000000000; + timer_config.it_value.tv_nsec = ns % 1000000000; + timer_config.it_interval.tv_sec = 60; + timer_config.it_interval.tv_nsec = 0; + + fdinfo.fd = timerfd_create(CLOCK_REALTIME, 0); + if (fdinfo.fd == -1) { + perror("Unable to timerfd_create"); + exit(EXIT_FAILURE); + } + if (timerfd_settime (fdinfo.fd, TFD_TIMER_ABSTIME, &timer_config, NULL) == -1) { + perror("Unable to timerfd_settime"); + exit(EXIT_FAILURE); + } + fdinfo.cat->name = "set_timeout"; + struct timer_ctx* tctx = malloc(sizeof(struct timer_ctx)); // Should put the link number and the id + if (tctx == NULL) { + perror("malloc failed in set_timeout"); + exit(EXIT_FAILURE); + } + tctx->user_ctx = ctx; + tctx->cb = cb; + fdinfo.other = tctx; + fdinfo.free_other = free_timerctx; + sprintf(fdinfo.url, "timer:%ld:1", milli_sec); + evt_core_add_fd (evts, &fdinfo); + + return fdinfo.fd; +} diff --git a/src/timer.h b/src/timer.h new file mode 100644 index 0000000..d1687b5 --- /dev/null +++ b/src/timer.h @@ -0,0 +1,7 @@ +#pragma once +#include +#include "evt_core.h" + +typedef void (*timer_cb)(struct evt_core_ctx* ctx, void* user_data); +void init_timer(struct evt_core_ctx* evts); +int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, void* ctx, timer_cb cb); diff --git a/src/utils.h b/src/utils.h index 026255f..fad8837 100644 --- a/src/utils.h +++ b/src/utils.h @@ -25,3 +25,4 @@ int ring_lt(uint16_t v1, uint16_t v2); int ring_le(uint16_t v1, uint16_t v2); uint64_t elapsed_micros(struct timespec* t1, struct timespec* t2); +