From 77b6027feba7689b094b8017c06bc8d08f61062d Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 10:02:43 +0200 Subject: [PATCH 01/14] Rework packet format --- src/algo_rr.c | 2 ++ src/packet.c | 24 +++++++++++++----------- src/packet.h | 36 ++++++++++++++++++++++++------------ 3 files changed, 39 insertions(+), 23 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index c251b10..834c6d1 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -283,10 +283,12 @@ void expired_wait(struct evt_core_ctx* ctx, void* user) { // 2. We will not reactivate link for this packet if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; + /* // 3. Stop if packet has been received and delivered if (ring_le (pkt->id, rr->recv_id)) return; printf("Timer reached for packet %d\n", pkt->id); + */ // 4. BLACKLIST LINK printf("Blacklist link=%d | ", pkt->link_num); diff --git a/src/packet.c b/src/packet.c index 6d520dd..0569182 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,16 +1,16 @@ #include "packet.h" size_t get_full_size(struct buffer_packet* bp) { - union abstract_packet* ap = &bp->ip.ap; + struct abstract_packet* ap = &bp->ip.ap; for (int i = 0; i < bp->ap_count; i++) { - ap = (union abstract_packet*)(&ap->raw + ap->str.size); + ap = (struct abstract_packet*)(&ap->raw + ap->headers.size); } return &ap->raw - &bp->ip.ap.raw; } enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { ssize_t nread; - size_t pkt_size_size = sizeof(bp->ip.ap.str.size); + size_t pkt_size_size = sizeof(bp->ip.ap.headers.size); if (bp->mode != BP_READING) return FDS_ERR; while (bp->aread < pkt_size_size) { @@ -21,8 +21,8 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { bp->aread += nread; } - while (bp->aread < bp->ip.ap.str.size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.str.size - bp->aread); + while (bp->aread < bp->ip.ap.headers.size) { + nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.headers.size - bp->aread); if (nread == 0) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1) return FDS_ERR; @@ -57,7 +57,8 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nwrite; size_t bytes_to_send; - size_t pkt_header_size = sizeof(bp->ip.ap.str) - sizeof(char); + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); + size_t pkt_header_size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.clear) - sizeof(char); struct sockaddr* addr = NULL; socklen_t addrlen = 0; if (udp_t->set) { @@ -67,9 +68,9 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = bp->ip.ap.str.size - pkt_header_size; + bytes_to_send = bp->ip.ap.headers.size - pkt_header_size; nwrite = sendto(fd, - &(bp->ip.ap.str.payload), + &(bp->ip.ap.content.clear.payload), bytes_to_send, 0, addr, @@ -89,12 +90,12 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp ssize_t nread; if (bp->mode != BP_READING) return FDS_ERR; - size_t pkt_header_size = sizeof(bp->ip.ap.str) - sizeof(char); // We remove the payload + size_t pkt_header_size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.clear) - sizeof(char); // We remove the payload size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; socklen_t addrlen = sizeof(struct sockaddr_in); nread = recvfrom(fd, - &(bp->ip.ap.str.payload), + &(bp->ip.ap.content.clear.payload), udp_packet_size, MSG_TRUNC, (struct sockaddr*)&udp_t->addr, @@ -106,7 +107,8 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp udp_t->set = 1; udp_t->addrlen = addrlen; - bp->ip.ap.str.size = nread + pkt_header_size; + bp->ip.ap.headers.size = nread + pkt_header_size; + bp->ip.ap.headers.cmd = CMD_CLEAR; bp->mode = BP_WRITING; bp->awrite = 0; diff --git a/src/packet.h b/src/packet.h index f07fd67..dc229ca 100644 --- a/src/packet.h +++ b/src/packet.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -28,26 +29,37 @@ enum BP_MODE { BP_WRITING }; -enum PKT_FLAGS { - PKT_CONTROL = 1 << 0 +enum PKT_CMD { + CMD_HEALTH, + CMD_CLEAR, + CMD_XOR }; -union abstract_packet { +struct abstract_packet { char raw; + struct { + enum PKT_CMD cmd; uint16_t size; - uint16_t port; - uint16_t id; - uint8_t bitfield; - uint8_t prevlink; - uint16_t deltat; - uint8_t flags; - char payload; - } str; + } headers; + + union { + struct { + uint16_t id; + uint8_t bitfield; + uint8_t prevlink; + uint16_t deltat; + } health; + struct { + uint16_t id; + uint16_t port; + char payload; + } clear; + } content; }; struct internet_packet { - union abstract_packet ap; + struct abstract_packet ap; char rest[1499]; // MTU = 1500, 1 byte in the union as payload }; From 778181a1539e69716f3e038a9f3b4e592407891f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 10:21:11 +0200 Subject: [PATCH 02/14] Fix simple algorithms --- src/algo_dup2.c | 8 ++++---- src/algo_naive.c | 2 +- src/algo_utils.c | 6 +++--- src/algo_utils.h | 2 +- src/proxy.c | 2 +- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 30b8e73..c5db60f 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -22,15 +22,15 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; - if (ring_ge(dup2c->recv_id, bp->ip.ap.str.id)) { + if (ring_ge(dup2c->recv_id, bp->ip.ap.content.clear.id)) { mv_buffer_rtof(app_ctx, fdinfo); return 0; } - dup2c->recv_id = bp->ip.ap.str.id; + dup2c->recv_id = bp->ip.ap.content.clear.id; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); @@ -49,7 +49,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct dup2_ctx* dup2c = app_ctx->misc; - bp->ip.ap.str.id = dup2c->emit_id; + bp->ip.ap.content.clear.id = dup2c->emit_id; dup2c->emit_id = dup2c->emit_id + 1; struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); diff --git a/src/algo_naive.c b/src/algo_naive.c index 17a7d99..e9b2390 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -11,7 +11,7 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); diff --git a/src/algo_utils.c b/src/algo_utils.c index fc1fa4b..a4e9984 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -235,12 +235,12 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } } -int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { +int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src) { char* target = &(dest->raw); while (pos-- > 0) { - target += dest->str.size; + target += dest->headers.size; } - memcpy(target, src, src->str.size); + memcpy(target, src, src->headers.size); return 0; } diff --git a/src/algo_utils.h b/src/algo_utils.h index ce8d26a..0df855f 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -53,7 +53,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); -int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); +int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src); struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/proxy.c b/src/proxy.c index b392dee..b40160c 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -71,7 +71,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; // 2. Read packet from socket - bp->ip.ap.str.port = url_get_port_int (fdinfo->url); + bp->ip.ap.content.clear.port = url_get_port_int (fdinfo->url); read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; From 5d38679dbbe8d88ad3514a0bf3bdd118459fbbe8 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 16:14:23 +0200 Subject: [PATCH 03/14] Reimplement timeout RR --- src/algo_rr.c | 87 +++++++++++++++++++++++++++++++++++++++++---------- src/packet.h | 1 + 2 files changed, 71 insertions(+), 17 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 834c6d1..5bbc8e4 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -5,14 +5,14 @@ #include "proxy.h" #include "timer.h" -struct waited_pkt { - uint16_t id; - int link_num; - uint8_t on; +struct timer_info { + uint16_t health_id; + uint8_t prevlink; + uint16_t min_blocked_pkt; struct algo_ctx* algo; }; -struct deferred_pkt { +struct queued_pkt { int link_fd; int idx; uint16_t id; @@ -25,14 +25,15 @@ struct rr_ctx { uint16_t my_links_ver; uint8_t remote_links; int64_t mjit; - uint16_t recv_id; - uint16_t recv_id_late; + uint16_t health_id; + uint16_t health_id_late; + uint16_t content_id; uint16_t sent_id; uint8_t current_link; struct internet_packet prev_packet; struct timespec emit_time; - struct deferred_pkt real[PACKET_BUFFER_SIZE]; - struct waited_pkt wait[PACKET_BUFFER_SIZE]; + struct queued_pkt real[PACKET_BUFFER_SIZE]; + struct timer_info wait[PACKET_BUFFER_SIZE]; }; void show_link_availability(struct rr_ctx* rr) { @@ -51,6 +52,7 @@ void show_link_availability(struct rr_ctx* rr) { void expired_wait (struct evt_core_ctx* ctx, void* user); void expired_late(struct evt_core_ctx* ctx, void* user); +void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; @@ -58,8 +60,8 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, char buffer[16]; url_get_port (buffer, fdinfo->url); int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE; + uint16_t real_idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; + uint16_t wait_idx = (bp->ip.ap.content.health.id - 1) % PACKET_BUFFER_SIZE; //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); @@ -147,12 +149,13 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 3. We update our cursor - rr->recv_id = bp->ip.ap.str.id; + rr->recv_id = bp->ip.ap.content.clear.id; // 4. Find its target - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet :( \n", url); @@ -166,6 +169,27 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct main_on_udp_write(ctx, to_fdinfo); } +void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; + + assert(bp->ip.ap.headers.cmd == CMD_HEALTH); + if (ring_le(app_ctx->health_id, rr->health_id_late)) { + return; + } + + int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.content.health.deltat; + if (timeout <= 0) timeout = 0; + uint64_t idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; + + rr->wait[idx].health_id = bp->ip.content.health.id; + rr->wait[idx].prevlink = bp->ip.ap.content.health.prevlink; + rr->wait[idx].min_blocked_pkt = bp->ip.ap.content.health.min_blocked_pkt; + rr->wait[idx].algo = app_ctx; + + set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); +} + void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct rr_ctx* rr = app_ctx->misc; struct evt_core_fdinfo* fdinfo = NULL; @@ -193,11 +217,15 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - // 1. Register packet in our queue - rr_pkt_register(ctx, fdinfo, bp); + if (bp->ip.ap.headers.cmd == CMD_CLEAR) { + // 1. Register packet in our queue + rr_pkt_register(ctx, fdinfo, bp); - // 2. Process queue - rr_pkt_unroll (ctx, app_ctx); + // 2. Process queue + rr_pkt_unroll (ctx, app_ctx); + } else if (bp->ip.ap.headers.cmd == CMD_HEALTH) { + rr_pkt_manage_links(ctx, fdinfo, bp); + } return 0; co_error: @@ -273,6 +301,31 @@ co_error: exit(EXIT_FAILURE); } +void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { + struct timer_info* t = raw; + struct algo_ctx* app_ctx = t->algo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; + + // 1. Update link recovery window if needed + if (ring_gt(t->health_id, rr->health_id_late)) rr->health_id_late = t->health_id; + + // 2. Blacklist previous link if needed + uint16_t prev_health_id = (t->health_id - 1); + uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; + struct timer_info* t_old = rr->wait[prev_health_idx]; + if (t_old->health_id != prev_health_id) { + printf("Blacklist link=%d | ", t->prevlink); + rr->remote_links &= 0xff ^ 1 << t->prevlink; + show_link_availability (rr); + } + + // 3. Deliver blocked packets + while (ring_gt(t->min_blocked_pkt, rr->content_id)) { + rr->content_id++; + rr_pkt_unroll (ctx, app_ctx); + } +} + void expired_wait(struct evt_core_ctx* ctx, void* user) { struct waited_pkt* pkt = user; struct rr_ctx* rr = pkt->algo->misc; diff --git a/src/packet.h b/src/packet.h index dc229ca..80dc4ff 100644 --- a/src/packet.h +++ b/src/packet.h @@ -49,6 +49,7 @@ struct abstract_packet { uint8_t bitfield; uint8_t prevlink; uint16_t deltat; + uint16_t min_blocked_pkt; } health; struct { uint16_t id; From 5bcab9639343d6b5f05fb65beacf186dfb650d6f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 16:34:22 +0200 Subject: [PATCH 04/14] Add buffer release for control packets (health) --- src/algo_rr.c | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 5bbc8e4..82ece34 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -174,9 +174,8 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf struct rr_ctx* rr = app_ctx->misc; assert(bp->ip.ap.headers.cmd == CMD_HEALTH); - if (ring_le(app_ctx->health_id, rr->health_id_late)) { - return; - } + + if (ring_le(bp->ip.ap.health.id, rr->health_id_late)) goto release; int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.content.health.deltat; if (timeout <= 0) timeout = 0; @@ -188,6 +187,9 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf rr->wait[idx].algo = app_ctx; set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); + +release: + mv_buffer_rtof(app_ctx, fdinfo); } void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { From 67a3c2a610002c4da5fd1f00d564a749a2a682cf Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 16:46:04 +0200 Subject: [PATCH 05/14] Add missing features to health packet --- src/algo_rr.c | 41 ++++++++++++++++++++++++++--------------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 82ece34..d8c7603 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -65,21 +65,6 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); - // 0. Update remote links - if (ring_lt(rr->recv_id_late, bp->ip.ap.str.id) && !(rr->remote_links & 1 << link_num)) { - printf("Activate link=%d | ", link_num); - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working - show_link_availability (rr); - } - - // 1. Update my links I can use thanks to target feedback - if (bp->ip.ap.str.id > rr->my_links_ver && bp->ip.ap.str.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.str.bitfield; - rr->my_links_ver = bp->ip.ap.str.id; - printf("Update my links | "); - show_link_availability (rr); - } - // 2. If packet arrived too late or already queued, we discard it if (ring_ge(rr->recv_id, bp->ip.ap.str.id) || rr->real[real_idx].id == bp->ip.ap.str.id) { // Packet has already been delivered or dropped, we free the buffer @@ -175,8 +160,34 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf assert(bp->ip.ap.headers.cmd == CMD_HEALTH); + // 1. Health packet was received too late, dropping it if (ring_le(bp->ip.ap.health.id, rr->health_id_late)) goto release; + // 2. Reactivate link if deactivated + char buffer[16]; + url_get_port (buffer, fdinfo->url); + int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded + if (!(rr->remote_links & (1 << link_num))) { + printf("Activate link=%d | ", link_num); + rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working + show_link_availability (rr); + } + + // 3. Update RR structure if its the greatest health_id we received + if (ring_gt(bp->ip.ap.health.id, rr->health_id)) { + // 3.1. Update current health id + rr->health_id = bp->ip.ap.health.id; + + // 3.2. Update my links I can use thanks to target feedback + if (bp->ip.ap.str.bitfield != rr->my_links) { + rr->my_links = bp->ip.ap.str.bitfield; + rr->my_links_ver = bp->ip.ap.str.id; + printf("Update my links | "); + show_link_availability (rr); + } + } + + // 4. Set callback to close this health packet window int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.content.health.deltat; if (timeout <= 0) timeout = 0; uint64_t idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; From 9cc27e9613a83f65b2fa76f9d9142c188bed8501 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 17:26:31 +0200 Subject: [PATCH 06/14] Fix receive logic --- src/algo_rr.c | 140 +++++++++----------------------------------------- 1 file changed, 25 insertions(+), 115 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index d8c7603..3dacc92 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -13,23 +13,21 @@ struct timer_info { }; struct queued_pkt { + uint8_t on; int link_fd; int idx; uint16_t id; - uint8_t on; struct algo_ctx* algo; }; struct rr_ctx { uint8_t my_links; - uint16_t my_links_ver; uint8_t remote_links; int64_t mjit; uint16_t health_id; uint16_t health_id_late; uint16_t content_id; uint16_t sent_id; - uint8_t current_link; struct internet_packet prev_packet; struct timespec emit_time; struct queued_pkt real[PACKET_BUFFER_SIZE]; @@ -50,94 +48,50 @@ void show_link_availability(struct rr_ctx* rr) { printf("]\n"); } -void expired_wait (struct evt_core_ctx* ctx, void* user); -void expired_late(struct evt_core_ctx* ctx, void* user); void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - uint16_t real_idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; - uint16_t wait_idx = (bp->ip.ap.content.health.id - 1) % PACKET_BUFFER_SIZE; + uint16_t real_idx = bp->ip.ap.content.clear.id % PACKET_BUFFER_SIZE; - //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); - - // 2. If packet arrived too late or already queued, we discard it - if (ring_ge(rr->recv_id, bp->ip.ap.str.id) || rr->real[real_idx].id == bp->ip.ap.str.id) { - // Packet has already been delivered or dropped, we free the buffer - fprintf(stderr, "Packet %d arrived too late (current: %d) or already received\n", bp->ip.ap.str.id, rr->recv_id); - mv_buffer_wtof (app_ctx, fdinfo); - return; - } - - // 3. If packet arrived too early, we wait for its predecessors - //printf("%d < %d = %d\n", rr->recv_id, bp->ip.ap.str.id - 1, ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)); - if (ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)) { - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.str.deltat; - //printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout); - if (timeout <= 0) timeout = 0; - - if (rr->wait[wait_idx].on && rr->wait[wait_idx].id != bp->ip.ap.str.id - 1) { - fprintf(stderr, "Waiting array overlap, BUG: [\n"); - for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - printf("\t%d => %d\n", rr->wait[i].id, rr->wait[i].on); - } - printf("] - could be replaced by drop\n"); - exit(EXIT_FAILURE); - } else if (!rr->wait[wait_idx].on) { - rr->wait[wait_idx].on = 1; - rr->wait[wait_idx].id = bp->ip.ap.str.id - 1; - rr->wait[wait_idx].link_num = bp->ip.ap.str.prevlink; - rr->wait[wait_idx].algo = app_ctx; - set_timeout(ctx, timeout, &rr->wait[wait_idx], expired_wait); - } - } + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 4. We queue the packet to keep it - if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.str.id) { - fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, real_idx); + if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.content.clear.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); } printf("] - could be replaced by drop\n"); exit(EXIT_FAILURE); } else if (!rr->real[real_idx].on) { - rr->real[real_idx].on = 2; - rr->real[real_idx].id = bp->ip.ap.str.id; + rr->real[real_idx].on = 1; + rr->real[real_idx].id = bp->ip.ap.content.clear.id; rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); - - // 5. We register a timer for this packet to create a reactivation window for broken links - set_timeout(ctx, rr->mjit + 1, &rr->real[real_idx], expired_late); - - //printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real); } else { - fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); - mv_buffer_wtof (app_ctx, fdinfo); + fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.content.clear.id, rr->content_id); + mv_buffer_rtof (app_ctx, fdinfo); } - } -void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { +void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queued_pkt* dp) { struct evt_core_fdinfo *to_fdinfo = NULL; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; char url[255]; // 1. Marked the packet as handled - dp->on--; + dp->on = 0; // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 3. We update our cursor - rr->recv_id = bp->ip.ap.content.clear.id; + rr->content_id = bp->ip.ap.content.clear.id; // 4. Find its target sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); @@ -161,7 +115,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf assert(bp->ip.ap.headers.cmd == CMD_HEALTH); // 1. Health packet was received too late, dropping it - if (ring_le(bp->ip.ap.health.id, rr->health_id_late)) goto release; + if (ring_le(bp->ip.ap.content.health.id, rr->health_id_late)) goto release; // 2. Reactivate link if deactivated char buffer[16]; @@ -174,14 +128,13 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf } // 3. Update RR structure if its the greatest health_id we received - if (ring_gt(bp->ip.ap.health.id, rr->health_id)) { + if (ring_gt(bp->ip.ap.content.health.id, rr->health_id)) { // 3.1. Update current health id - rr->health_id = bp->ip.ap.health.id; + rr->health_id = bp->ip.ap.content.health.id; // 3.2. Update my links I can use thanks to target feedback - if (bp->ip.ap.str.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.str.bitfield; - rr->my_links_ver = bp->ip.ap.str.id; + if (bp->ip.ap.content.health.bitfield != rr->my_links) { + rr->my_links = bp->ip.ap.content.health.bitfield; printf("Update my links | "); show_link_availability (rr); } @@ -192,7 +145,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf if (timeout <= 0) timeout = 0; uint64_t idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; - rr->wait[idx].health_id = bp->ip.content.health.id; + rr->wait[idx].health_id = bp->ip.ap.content.health.id; rr->wait[idx].prevlink = bp->ip.ap.content.health.prevlink; rr->wait[idx].min_blocked_pkt = bp->ip.ap.content.health.min_blocked_pkt; rr->wait[idx].algo = app_ctx; @@ -210,16 +163,9 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { while(1) { //printf("Trying to deliver %d\n", rr->recv_id+1); - struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; + struct queued_pkt* def = &rr->real[(rr->content_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; - fdinfo = evt_core_get_from_fd (ctx, def->link_fd); - if (fdinfo == NULL) { - fprintf(stderr, "An error occured as the link seems to be closed for the requested fd\n"); - rr->recv_id++; - continue; - } - - rr_deliver(ctx, fdinfo, def); + rr_deliver(ctx, app_ctx, def); //printf("Delivered %d\n", rr->recv_id); } } @@ -316,7 +262,7 @@ co_error: void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { struct timer_info* t = raw; - struct algo_ctx* app_ctx = t->algo->cat->app_ctx; + struct algo_ctx* app_ctx = t->algo; struct rr_ctx* rr = app_ctx->misc; // 1. Update link recovery window if needed @@ -325,7 +271,7 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { // 2. Blacklist previous link if needed uint16_t prev_health_id = (t->health_id - 1); uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; - struct timer_info* t_old = rr->wait[prev_health_idx]; + struct timer_info* t_old = &rr->wait[prev_health_idx]; if (t_old->health_id != prev_health_id) { printf("Blacklist link=%d | ", t->prevlink); rr->remote_links &= 0xff ^ 1 << t->prevlink; @@ -339,43 +285,6 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { } } -void expired_wait(struct evt_core_ctx* ctx, void* user) { - struct waited_pkt* pkt = user; - struct rr_ctx* rr = pkt->algo->misc; - - // 1. Release lock - pkt->on = 0; - - // 2. We will not reactivate link for this packet - if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; - - /* - // 3. Stop if packet has been received and delivered - if (ring_le (pkt->id, rr->recv_id)) return; - - printf("Timer reached for packet %d\n", pkt->id); - */ - - // 4. BLACKLIST LINK - printf("Blacklist link=%d | ", pkt->link_num); - rr->remote_links &= 0xff ^ 1 << pkt->link_num; - show_link_availability (rr); - - // 5. Deliver following packets - while (ring_lt(rr->recv_id, pkt->id)) { - rr->recv_id++; - rr_pkt_unroll (ctx, pkt->algo); - } -} - -void expired_late(struct evt_core_ctx* ctx, void* user) { - struct deferred_pkt* pkt = user; - struct rr_ctx* rr = pkt->algo->misc; - - pkt->on--; - if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; -} - int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { // We do nothing return 0; @@ -397,8 +306,9 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->my_links = 0xff; rr->remote_links = 0xff; rr->sent_id = 1; - rr->recv_id = 0; - rr->recv_id_late = 0; + rr->health_id = 0; + rr->health_id_late = 0; + rr->content_id = 0; app_ctx->misc = rr; app_ctx->free_misc = algo_rr_free; From dafd69e649c6093e3093ca575decf7461980df69 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 18:37:49 +0200 Subject: [PATCH 07/14] New round robin is compiling! --- src/algo_rr.c | 49 +++++++++++++++++++++++++++++++++--------------- src/algo_utils.c | 9 +++++---- src/algo_utils.h | 2 +- 3 files changed, 40 insertions(+), 20 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 3dacc92..5412894 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -23,11 +23,13 @@ struct queued_pkt { struct rr_ctx { uint8_t my_links; uint8_t remote_links; + uint8_t current_link; int64_t mjit; uint16_t health_id; uint16_t health_id_late; uint16_t content_id; - uint16_t sent_id; + uint16_t sent_health_id; + uint16_t sent_content_id; struct internet_packet prev_packet; struct timespec emit_time; struct queued_pkt real[PACKET_BUFFER_SIZE]; @@ -208,28 +210,43 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo exit(EXIT_FAILURE); } + // 2. Compute delta t secs = curr.tv_sec - rr->emit_time.tv_sec; nsecs = curr.tv_nsec - rr->emit_time.tv_nsec; + rr->emit_time = curr; mili_sec = secs * 1000 + nsecs / 1000000; if (mili_sec > rr->mjit) mili_sec = rr->mjit; - bp->ip.ap.str.id = rr->sent_id; - bp->ip.ap.str.flags = 0; - bp->ip.ap.str.deltat = mili_sec; - bp->ip.ap.str.bitfield = rr->remote_links; - bp->ip.ap.str.prevlink = rr->current_link; + // 3. Backup clear packet + struct buffer_packet clear_packet; + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); + append_buffer (&clear_packet.ip.ap, 0, &bp->ip.ap); + // 4. Set health packet + bp->ip.ap.headers.cmd = CMD_HEALTH; + bp->ip.ap.headers.size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.health); + bp->ip.ap.content.health.id = rr->sent_health_id; + bp->ip.ap.content.health.deltat = mili_sec; + bp->ip.ap.content.health.prevlink = rr->current_link; + bp->ip.ap.content.health.bitfield = rr->remote_links; + rr->sent_health_id++; + + + // 5. Append clear packet + clear_packet.ip.ap.content.clear.id = rr->sent_content_id; + rr->sent_content_id++; + append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap); + bp->ap_count++; + + // 6. Append redundancy if needed if (app_ctx->ap.redundant_data == 1) { - append_buffer(&bp->ip.ap, 1, &rr->prev_packet.ap); // We append previous packet - append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time + append_buffer(&bp->ip.ap, 2, &rr->prev_packet.ap); // We append previous packet + append_buffer(&rr->prev_packet.ap, 0, &clear_packet.ip.ap); // We store current packet for next time bp->ap_count++; } //printf("Will send packet id=%d\n", bp->ip.ap.str.id); - rr->emit_time = curr; - rr->sent_id++; - - // 2. Try to find someone to send it + // 7. Try to find someone to send it int max = 16; uint8_t sel_link = rr->current_link; while(max-- >= 0) { @@ -238,13 +255,14 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) continue; // Missing link if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Some links are down - if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) { + if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { rr->current_link = sel_link; mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); return 0; } else { - dup_buffer_tow(app_ctx, bp, to_fdinfo); + struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); + dup_bp->ap_count = 1; main_on_tcp_write(ctx, to_fdinfo); } } @@ -305,7 +323,8 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->mjit = 200; rr->my_links = 0xff; rr->remote_links = 0xff; - rr->sent_id = 1; + rr->sent_health_id = 0; + rr->sent_content_id = 0; rr->health_id = 0; rr->health_id_late = 0; rr->content_id = 0; diff --git a/src/algo_utils.c b/src/algo_utils.c index a4e9984..28d1015 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -43,7 +43,7 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_ struct buffer_packet* bp; // 1. Check if we don't have a buffer - bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); + bp = fdinfo == NULL ? NULL : g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); if (bp != NULL) return bp; // 2. Get a new buffer otherwise @@ -189,14 +189,14 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { g_queue_push_tail (app_ctx->free_buffer, bp); } -void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { +struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { GQueue* q; // 1. We get a free buffer struct buffer_packet* bp_dest = g_queue_pop_head(app_ctx->free_buffer); if (bp_dest == NULL) { debug_buffer(app_ctx, to); - return; + return NULL; } // 2. We duplicate the data @@ -211,6 +211,7 @@ void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct e // 4. We push the content to the appropriate destination g_queue_push_tail(q, bp_dest); + return bp_dest; } struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { @@ -238,7 +239,7 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src) { char* target = &(dest->raw); while (pos-- > 0) { - target += dest->headers.size; + target += ((struct abstract_packet*) target)->headers.size; } memcpy(target, src, src->headers.size); return 0; diff --git a/src/algo_utils.h b/src/algo_utils.h index 0df855f..52c0d92 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -51,7 +51,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); -void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src); From 9fb2396e5443d36848ec9e695095ede9859f5054 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 18:38:48 +0200 Subject: [PATCH 08/14] Add current link just in case --- src/algo_rr.c | 1 + 1 file changed, 1 insertion(+) diff --git a/src/algo_rr.c b/src/algo_rr.c index 5412894..99e51dc 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -328,6 +328,7 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->health_id = 0; rr->health_id_late = 0; rr->content_id = 0; + rr->current_link = 0; app_ctx->misc = rr; app_ctx->free_misc = algo_rr_free; From 3e07b012e2ac12398a6ed92215fcfd49c1789f25 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 13 May 2019 18:41:10 +0200 Subject: [PATCH 09/14] Better comments --- src/algo_rr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 99e51dc..b15d275 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -59,7 +59,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, assert(bp->ip.ap.headers.cmd == CMD_CLEAR); - // 4. We queue the packet to keep it + // 1. We queue the packet to keep it if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.content.clear.id) { fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { @@ -262,7 +262,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo return 0; } else { struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); - dup_bp->ap_count = 1; + dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor... main_on_tcp_write(ctx, to_fdinfo); } } From 36a6dbaad22bea52a1f31ac9507faaa294405387 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 14 May 2019 11:23:23 +0200 Subject: [PATCH 10/14] We want to use an union --- src/algo_dup2.c | 8 +++---- src/algo_naive.c | 2 +- src/algo_rr.c | 54 +++++++++++++++++++++++++----------------------- src/algo_utils.c | 6 +++--- src/algo_utils.h | 2 +- src/packet.c | 28 +++++++++++++------------ src/packet.h | 41 ++++++++++++++++++------------------ src/proxy.c | 2 +- 8 files changed, 74 insertions(+), 69 deletions(-) diff --git a/src/algo_dup2.c b/src/algo_dup2.c index c5db60f..530f20e 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -22,15 +22,15 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; - if (ring_ge(dup2c->recv_id, bp->ip.ap.content.clear.id)) { + if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { mv_buffer_rtof(app_ctx, fdinfo); return 0; } - dup2c->recv_id = bp->ip.ap.content.clear.id; + dup2c->recv_id = bp->ip.ap.fmt.content.clear.id; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); @@ -49,7 +49,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct dup2_ctx* dup2c = app_ctx->misc; - bp->ip.ap.content.clear.id = dup2c->emit_id; + bp->ip.ap.fmt.content.clear.id = dup2c->emit_id; dup2c->emit_id = dup2c->emit_id + 1; struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); diff --git a/src/algo_naive.c b/src/algo_naive.c index e9b2390..90afdaa 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -11,7 +11,7 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); diff --git a/src/algo_rr.c b/src/algo_rr.c index b15d275..8fc2467 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -55,13 +55,13 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - uint16_t real_idx = bp->ip.ap.content.clear.id % PACKET_BUFFER_SIZE; + uint16_t real_idx = bp->ip.ap.fmt.content.clear.id % PACKET_BUFFER_SIZE; assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 1. We queue the packet to keep it - if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.content.clear.id) { - fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.content.clear.id, real_idx); + if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.fmt.content.clear.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.fmt.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); } @@ -69,13 +69,13 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, exit(EXIT_FAILURE); } else if (!rr->real[real_idx].on) { rr->real[real_idx].on = 1; - rr->real[real_idx].id = bp->ip.ap.content.clear.id; + rr->real[real_idx].id = bp->ip.ap.fmt.content.clear.id; rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); } else { - fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.content.clear.id, rr->content_id); + fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); mv_buffer_rtof (app_ctx, fdinfo); } } @@ -93,10 +93,10 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 3. We update our cursor - rr->content_id = bp->ip.ap.content.clear.id; + rr->content_id = bp->ip.ap.fmt.content.clear.id; // 4. Find its target - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet :( \n", url); @@ -117,7 +117,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf assert(bp->ip.ap.headers.cmd == CMD_HEALTH); // 1. Health packet was received too late, dropping it - if (ring_le(bp->ip.ap.content.health.id, rr->health_id_late)) goto release; + if (ring_le(bp->ip.ap.fmt.content.health.id, rr->health_id_late)) goto release; // 2. Reactivate link if deactivated char buffer[16]; @@ -130,26 +130,26 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf } // 3. Update RR structure if its the greatest health_id we received - if (ring_gt(bp->ip.ap.content.health.id, rr->health_id)) { + if (ring_gt(bp->ip.ap.fmt.content.health.id, rr->health_id)) { // 3.1. Update current health id - rr->health_id = bp->ip.ap.content.health.id; + rr->health_id = bp->ip.ap.fmt.content.health.id; // 3.2. Update my links I can use thanks to target feedback - if (bp->ip.ap.content.health.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.content.health.bitfield; + if (bp->ip.ap.fmt.content.health.bitfield != rr->my_links) { + rr->my_links = bp->ip.ap.fmt.content.health.bitfield; printf("Update my links | "); show_link_availability (rr); } } // 4. Set callback to close this health packet window - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.content.health.deltat; + int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.fmt.content.health.deltat; if (timeout <= 0) timeout = 0; - uint64_t idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; + uint64_t idx = bp->ip.ap.fmt.content.health.id % PACKET_BUFFER_SIZE; - rr->wait[idx].health_id = bp->ip.ap.content.health.id; - rr->wait[idx].prevlink = bp->ip.ap.content.health.prevlink; - rr->wait[idx].min_blocked_pkt = bp->ip.ap.content.health.min_blocked_pkt; + rr->wait[idx].health_id = bp->ip.ap.fmt.content.health.id; + rr->wait[idx].prevlink = bp->ip.ap.fmt.content.health.prevlink; + rr->wait[idx].min_blocked_pkt = bp->ip.ap.fmt.content.health.min_blocked_pkt; rr->wait[idx].algo = app_ctx; set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); @@ -178,13 +178,15 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - if (bp->ip.ap.headers.cmd == CMD_CLEAR) { + if (bp->ip.ap.fmt.headers.cmd == CMD_CLEAR) { + if (ctx->verbose) printf("Received a CLEAR packet of size %d\n", bp->ip.ap.fmt.headers.size); // 1. Register packet in our queue rr_pkt_register(ctx, fdinfo, bp); // 2. Process queue rr_pkt_unroll (ctx, app_ctx); - } else if (bp->ip.ap.headers.cmd == CMD_HEALTH) { + } else if (bp->ip.ap.fmt.headers.cmd == CMD_HEALTH) { + if (ctx->verbose) printf("Received a HEALTH packet of size %d\n", bp->ip.ap.fmt.headers.size); rr_pkt_manage_links(ctx, fdinfo, bp); } @@ -223,17 +225,17 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo append_buffer (&clear_packet.ip.ap, 0, &bp->ip.ap); // 4. Set health packet - bp->ip.ap.headers.cmd = CMD_HEALTH; - bp->ip.ap.headers.size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.health); - bp->ip.ap.content.health.id = rr->sent_health_id; - bp->ip.ap.content.health.deltat = mili_sec; - bp->ip.ap.content.health.prevlink = rr->current_link; - bp->ip.ap.content.health.bitfield = rr->remote_links; + bp->ip.ap.fmt.headers.cmd = CMD_HEALTH; + bp->ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); + bp->ip.ap.fmt.content.health.id = rr->sent_health_id; + bp->ip.ap.fmt.content.health.deltat = mili_sec; + bp->ip.ap.fmt.content.health.prevlink = rr->current_link; + bp->ip.ap.fmt.content.health.bitfield = rr->remote_links; rr->sent_health_id++; // 5. Append clear packet - clear_packet.ip.ap.content.clear.id = rr->sent_content_id; + clear_packet.ip.ap.fmt.content.clear.id = rr->sent_content_id; rr->sent_content_id++; append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap); bp->ap_count++; diff --git a/src/algo_utils.c b/src/algo_utils.c index 28d1015..a85bd10 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -236,12 +236,12 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } } -int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src) { +int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { char* target = &(dest->raw); while (pos-- > 0) { - target += ((struct abstract_packet*) target)->headers.size; + target += ((union abstract_packet*) target)->fmt.headers.size; } - memcpy(target, src, src->headers.size); + memcpy(target, src, src->fmt.headers.size); return 0; } diff --git a/src/algo_utils.h b/src/algo_utils.h index 52c0d92..76154de 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -53,7 +53,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); -int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src); +int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/packet.c b/src/packet.c index 0569182..7a4ba9c 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,16 +1,16 @@ #include "packet.h" size_t get_full_size(struct buffer_packet* bp) { - struct abstract_packet* ap = &bp->ip.ap; + union abstract_packet* ap = &bp->ip.ap; for (int i = 0; i < bp->ap_count; i++) { - ap = (struct abstract_packet*)(&ap->raw + ap->headers.size); + ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); } return &ap->raw - &bp->ip.ap.raw; } enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { ssize_t nread; - size_t pkt_size_size = sizeof(bp->ip.ap.headers.size); + size_t pkt_size_size = sizeof(bp->ip.ap.fmt.headers.size); if (bp->mode != BP_READING) return FDS_ERR; while (bp->aread < pkt_size_size) { @@ -20,9 +20,10 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { if (nread == -1) return FDS_ERR; bp->aread += nread; } + printf("read packet size = %d, read on network = %d, computed size to read = %ld.\n", bp->ip.ap.fmt.headers.size, bp->aread, pkt_size_size); - while (bp->aread < bp->ip.ap.headers.size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.headers.size - bp->aread); + while (bp->aread < bp->ip.ap.fmt.headers.size) { + nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.fmt.headers.size - bp->aread); if (nread == 0) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1) return FDS_ERR; @@ -40,13 +41,14 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { ssize_t nwrite; if (bp->mode != BP_WRITING) return FDS_ERR; + printf("Already written: %d\n", bp->awrite); while (bp->awrite < get_full_size(bp)) { nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; } - + printf("headers.size=%d, fullsize=%ld, bp->awrite=%d, first_read=%d\n", bp->ip.ap.fmt.headers.size, get_full_size (bp), bp->awrite, *(uint16_t*)&(bp->ip.ap.raw)); bp->mode = BP_READING; bp->aread = 0; bp->ap_count = 0; @@ -58,7 +60,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t ssize_t nwrite; size_t bytes_to_send; assert(bp->ip.ap.headers.cmd == CMD_CLEAR); - size_t pkt_header_size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.clear) - sizeof(char); + size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); struct sockaddr* addr = NULL; socklen_t addrlen = 0; if (udp_t->set) { @@ -68,9 +70,9 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = bp->ip.ap.headers.size - pkt_header_size; + bytes_to_send = bp->ip.ap.fmt.headers.size - pkt_header_size; nwrite = sendto(fd, - &(bp->ip.ap.content.clear.payload), + &(bp->ip.ap.fmt.content.clear.payload), bytes_to_send, 0, addr, @@ -90,12 +92,12 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp ssize_t nread; if (bp->mode != BP_READING) return FDS_ERR; - size_t pkt_header_size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.clear) - sizeof(char); // We remove the payload + size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); // We remove the payload size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; socklen_t addrlen = sizeof(struct sockaddr_in); nread = recvfrom(fd, - &(bp->ip.ap.content.clear.payload), + &(bp->ip.ap.fmt.content.clear.payload), udp_packet_size, MSG_TRUNC, (struct sockaddr*)&udp_t->addr, @@ -107,8 +109,8 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp udp_t->set = 1; udp_t->addrlen = addrlen; - bp->ip.ap.headers.size = nread + pkt_header_size; - bp->ip.ap.headers.cmd = CMD_CLEAR; + bp->ip.ap.fmt.headers.size = nread + pkt_header_size; + bp->ip.ap.fmt.headers.cmd = CMD_CLEAR; bp->mode = BP_WRITING; bp->awrite = 0; diff --git a/src/packet.h b/src/packet.h index 80dc4ff..0c7b43d 100644 --- a/src/packet.h +++ b/src/packet.h @@ -35,32 +35,33 @@ enum PKT_CMD { CMD_XOR }; -struct abstract_packet { +union abstract_packet { char raw; - struct { - enum PKT_CMD cmd; - uint16_t size; - } headers; + struct { + uint16_t size; + enum PKT_CMD cmd; + } headers; - union { - struct { - uint16_t id; - uint8_t bitfield; - uint8_t prevlink; - uint16_t deltat; - uint16_t min_blocked_pkt; - } health; - struct { - uint16_t id; - uint16_t port; - char payload; - } clear; - } content; + union { + struct { + uint16_t id; + uint8_t bitfield; + uint8_t prevlink; + uint16_t deltat; + uint16_t min_blocked_pkt; + } health; + struct { + uint16_t id; + uint16_t port; + char payload; + } clear; + } content; + } fmt; }; struct internet_packet { - struct abstract_packet ap; + union abstract_packet ap; char rest[1499]; // MTU = 1500, 1 byte in the union as payload }; diff --git a/src/proxy.c b/src/proxy.c index b40160c..60fe7c7 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -71,7 +71,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; // 2. Read packet from socket - bp->ip.ap.content.clear.port = url_get_port_int (fdinfo->url); + bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; From 46f54fd145f2f1146549b41cbb2c38b8cc30af91 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 14 May 2019 11:25:33 +0200 Subject: [PATCH 11/14] Remove bugs --- src/packet.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/packet.c b/src/packet.c index 7a4ba9c..518cb53 100644 --- a/src/packet.c +++ b/src/packet.c @@ -20,7 +20,6 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { if (nread == -1) return FDS_ERR; bp->aread += nread; } - printf("read packet size = %d, read on network = %d, computed size to read = %ld.\n", bp->ip.ap.fmt.headers.size, bp->aread, pkt_size_size); while (bp->aread < bp->ip.ap.fmt.headers.size) { nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.fmt.headers.size - bp->aread); @@ -41,14 +40,12 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { ssize_t nwrite; if (bp->mode != BP_WRITING) return FDS_ERR; - printf("Already written: %d\n", bp->awrite); while (bp->awrite < get_full_size(bp)) { nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; } - printf("headers.size=%d, fullsize=%ld, bp->awrite=%d, first_read=%d\n", bp->ip.ap.fmt.headers.size, get_full_size (bp), bp->awrite, *(uint16_t*)&(bp->ip.ap.raw)); bp->mode = BP_READING; bp->aread = 0; bp->ap_count = 0; From 05384a424a3b46edd6479120dca79891e6d688cc Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 14 May 2019 12:02:56 +0200 Subject: [PATCH 12/14] Add missing information --- src/algo_rr.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 8fc2467..a17a4cb 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -233,15 +233,17 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo bp->ip.ap.fmt.content.health.bitfield = rr->remote_links; rr->sent_health_id++; - // 5. Append clear packet clear_packet.ip.ap.fmt.content.clear.id = rr->sent_content_id; rr->sent_content_id++; + bp->ip.ap.fmt.content.health.min_blocked_pkt = clear_packet.ip.ap.fmt.content.clear.id; append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap); bp->ap_count++; // 6. Append redundancy if needed if (app_ctx->ap.redundant_data == 1) { + assert(ring_gt(bp->ip.ap.fmt.content.health.min_blocked_pkt, rr->prev_packet.ap.fmt.content.clear.id)); + bp->ip.ap.fmt.content.health.min_blocked_pkt = rr->prev_packet.ap.fmt.content.clear.id; append_buffer(&bp->ip.ap, 2, &rr->prev_packet.ap); // We append previous packet append_buffer(&rr->prev_packet.ap, 0, &clear_packet.ip.ap); // We store current packet for next time bp->ap_count++; From f94d2109d931ee9d89149f9f385e447e77dfe59d Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 14 May 2019 13:56:19 +0200 Subject: [PATCH 13/14] Register packet only if not already delivered! --- src/algo_rr.c | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index a17a4cb..65ad259 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -67,7 +67,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, } printf("] - could be replaced by drop\n"); exit(EXIT_FAILURE); - } else if (!rr->real[real_idx].on) { + } else if (!rr->real[real_idx].on && ring_gt(bp->ip.ap.fmt.content.clear.id, rr->content_id)) { rr->real[real_idx].on = 1; rr->real[real_idx].id = bp->ip.ap.fmt.content.clear.id; rr->real[real_idx].idx = real_idx; @@ -75,7 +75,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, rr->real[real_idx].algo = app_ctx; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); } else { - fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); + if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); mv_buffer_rtof (app_ctx, fdinfo); } } @@ -301,6 +301,7 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { } // 3. Deliver blocked packets + //printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); while (ring_gt(t->min_blocked_pkt, rr->content_id)) { rr->content_id++; rr_pkt_unroll (ctx, app_ctx); From a3712bac7747ee8deeb54bb142de90a35bb8c9be Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 14 May 2019 14:08:58 +0200 Subject: [PATCH 14/14] Must start with id=1 --- src/algo_rr.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 65ad259..440e903 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -328,8 +328,8 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->mjit = 200; rr->my_links = 0xff; rr->remote_links = 0xff; - rr->sent_health_id = 0; - rr->sent_content_id = 0; + rr->sent_health_id = 1; + rr->sent_content_id = 1; rr->health_id = 0; rr->health_id_late = 0; rr->content_id = 0;