diff --git a/src/algo_lightning.c b/src/algo_lightning.c
index c4635ab..90e38c0 100644
--- a/src/algo_lightning.c
+++ b/src/algo_lightning.c
@@ -23,6 +23,7 @@ struct timing_entry {
};
struct light_ctx {
+ uint8_t prev_links[MAX_LINKS];
struct timing_entry historic[HISTORIC_SIZE];
struct timespec last[MAX_LINKS];
uint64_t pkt_rcv_id;
@@ -31,6 +32,7 @@ struct light_ctx {
uint8_t total_links;
int max_ooo;
int sleep_duration;
+ int sent_past_links;
struct timespec window;
size_t monit_pkt_size;
};
@@ -51,6 +53,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
struct light_ctx* lightc = app_ctx->misc;
lightc->total_links = app_ctx->ap.links;
lightc->selected_link = lightc->total_links - 1;
+ lightc->sent_past_links = lightc->total_links;
lightc->max_ooo = 50;
lightc->sleep_duration = 500;
@@ -64,22 +67,83 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
sscanf(token, "max_ooo=%d", &lightc->max_ooo);
sscanf(token, "recovery=%d", &lightc->sleep_duration);
sscanf(token, "window=%ld", &window);
+ sscanf(token, "sent_past_links=%d", &lightc->sent_past_links);
}
}
union abstract_packet m;
- lightc->monit_pkt_size = sizeof(m.fmt.headers) + sizeof(m.fmt.content.link_monitoring_lightning);
+ lightc->monit_pkt_size = sizeof(m.fmt.headers) + sizeof(m.fmt.content.link_monitoring_lightning) + sizeof(uint8_t) * (lightc->sent_past_links - 1);
timespec_set_unit (&lightc->window, window, MICROSEC);
- printf("max_ooo = %d\n", lightc->max_ooo);
- printf("recovery = %d\n", lightc->sleep_duration);
- printf("window = %ld ms\n", window);
+ printf("max_ooo = %d ms\n", lightc->max_ooo);
+ printf("recovery = %d ms\n", lightc->sleep_duration);
+ printf("window check = %ld ms\n", window);
+ printf("sent_past_links = %d\n", lightc->sent_past_links);
+}
+
+void monitoring(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
+ struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
+ struct light_ctx* lightc = app_ctx->misc;
+ union abstract_packet* ap = (union abstract_packet*) &bp->ip;
+
+ while (ap != NULL && ap->fmt.headers.cmd != CMD_LINK_MONITORING_LIGHTNING) ap = ap_next(ap);
+ if (ap == NULL) {
+ fprintf(stderr, "[algo_lightning] Unable to find our monitoring information\n");
+ exit(EXIT_FAILURE);
+ }
+
+ uint8_t *prev_links = &ap->fmt.content.link_monitoring_lightning.prev_links;
+ int64_t pkt_id = ap->fmt.content.link_monitoring_lightning.id;
+ int64_t missing = pkt_id - (lightc->pkt_rcv_id + 1);
+ struct timespec now;
+ set_now(&now);
+
+ // Detect OoO
+ for (int i = 0; i < missing && i < lightc->sent_past_links; i++) {
+ uint8_t link_id = prev_links[i];
+ int64_t miss_id = pkt_id - (i+1);
+ struct timing_entry *te = &lightc->historic[miss_id % HISTORIC_SIZE];
+ if (te->pkt_id == miss_id) continue; // Entry already exists
+ te->state = OOO_ONGOING;
+ te->detected_at = now;
+ te->link_id = link_id;
+ te->pkt_id = miss_id;
+ }
+
+ // Update current packet status
+ int link_id = url_get_port_int(fdinfo->url) - 7500;
+ struct timing_entry *te2 = &lightc->historic[pkt_id % HISTORIC_SIZE];
+ te2->state = te2->pkt_id == pkt_id ? OOO_DONE : IN_ORDER;
+ te2->pkt_id = pkt_id;
+ te2->link_id = link_id;
+ te2->finished_at = now;
+}
+
+int deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
+ char url[256];
+ struct evt_core_fdinfo *to_fdinfo = NULL;
+ struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
+ union abstract_packet* ap = (union abstract_packet*) &bp->ip;
+
+ if (ctx->verbose > 1) fprintf(stderr, " [algo_lightning] 1/2 Find destination\n");
+ sprintf(url, "udp:write:127.0.0.1:%d", ap->fmt.content.udp_encapsulated.port);
+ to_fdinfo = evt_core_get_from_url (ctx, url);
+ if (to_fdinfo == NULL) {
+ fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
+ mv_buffer_wtof (&app_ctx->br, fdinfo);
+ return 1;
+ }
+
+ if (ctx->verbose > 1) fprintf(stderr, " [algo_lightning] 2/2 Move buffer\n");
+ mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
+ main_on_udp_write(ctx, to_fdinfo);
+
+ return 0;
}
int algo_lightning_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
- union abstract_packet* ap = (union abstract_packet*) &bp->ip;
-
- return 0;
+ monitoring(ctx, fdinfo, bp);
+ return deliver(ctx, fdinfo, bp);
}
void algo_lightning_update_stats (struct light_ctx *lightc, int64_t *stats) {
@@ -131,9 +195,16 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) {
.fmt.headers.cmd = CMD_LINK_MONITORING_LIGHTNING,
.fmt.headers.size = lightc->monit_pkt_size,
.fmt.headers.flags = 0,
- .fmt.content.link_monitoring_lightning.id = ++lightc->pkt_sent_id
+ .fmt.content.link_monitoring_lightning.id = lightc->pkt_sent_id
};
- buffer_append_ap (bp_dup, &monit);
+ union abstract_packet *ap_buf = buffer_append_ap (bp_dup, &monit);
+ uint8_t *links = &ap_buf->fmt.content.link_monitoring_lightning.prev_links;
+ for (int i = 0; i < lightc->sent_past_links; i++) {
+ links[i] = lightc->prev_links[lightc->pkt_sent_id - (i + 1) % MAX_LINKS];
+ }
+
+ lightc->prev_links[lightc->pkt_sent_id % MAX_LINKS] = lightc->selected_link;
+ lightc->pkt_sent_id++;
if (ctx->verbose > 1) {
dump_buffer_packet(bp_dup);
diff --git a/src/packet.c b/src/packet.c
index c447f3d..90f156a 100644
--- a/src/packet.c
+++ b/src/packet.c
@@ -228,6 +228,9 @@ void dump_abstract_packet(union abstract_packet* ap) {
case CMD_UDP_ENCAPSULATED:
printf(" port=%d\n", ap->fmt.content.udp_encapsulated.port);
break;
+ case CMD_LINK_MONITORING_LIGHTNING:
+ printf(" id=%ld\n", ap->fmt.content.link_monitoring_lightning.id);
+ break;
default:
printf(" \n");
break;
diff --git a/src/packet.h b/src/packet.h
index 4bf212c..9f59324 100644
--- a/src/packet.h
+++ b/src/packet.h
@@ -59,6 +59,7 @@ union abstract_packet {
union {
struct {
uint64_t id;
+ uint8_t prev_links;
} link_monitoring_lightning;
struct {
uint8_t to_increment;
diff --git a/src/proxy.h b/src/proxy.h
index e4371f1..0217535 100644
--- a/src/proxy.h
+++ b/src/proxy.h
@@ -89,7 +89,10 @@ static struct algo_desc available_algo[] = {
},
{
.name = "lightning",
- .init = algo_lightning_init
+ .init = algo_lightning_init,
+ .on_stream = algo_lightning_on_stream,
+ .on_datagram = algo_lightning_on_datagram,
+ .on_err = algo_lightning_on_err
}
};