diff --git a/r/lightning_begin.R b/r/lightning_begin.R index 8564e78..66ef5dd 100644 --- a/r/lightning_begin.R +++ b/r/lightning_begin.R @@ -95,7 +95,7 @@ ggplot(data=sqldf("select * from xa where run='out/bhTF0rd7MOI5SOPs-6'"), aes(x= xb <- read.csv("../res/tmp_light/v.csv") xb$flag <- factor(xb$flag) xb$link_id <- factor(xb$link_id) -xc <- sqldf("select *, 1.0 * latency / 1000.0 as lat from xb where vanilla = 1 and link_id = 7") +xc <- sqldf("select *, 1.0 * latency / 1000.0 as lat from xb where vanilla = 1 and link_id = 5") ggplot(data=xc, aes(x=packet_id, y=lat, color=link_id:way)) + coord_cartesian(ylim=c(100,600)) + geom_line() + diff --git a/src/algo_lightning.c b/src/algo_lightning.c index b1884d9..556c8ba 100644 --- a/src/algo_lightning.c +++ b/src/algo_lightning.c @@ -54,12 +54,18 @@ struct timing_entry { uint64_t pkt_id; }; +struct link_status { + struct timespec last; + uint8_t used; +}; + struct light_ctx { uint8_t prev_links[MAX_LINKS]; int16_t remote_stats[MAX_LINKS]; int16_t local_stats[MAX_LINKS]; struct timing_entry historic[HISTORIC_SIZE]; - struct timespec last[MAX_LINKS]; + struct link_status status[MAX_LINKS]; + uint8_t used; uint64_t pkt_rcv_id; uint64_t pkt_sent_id; uint64_t uniq_pkt_sent_id; @@ -68,6 +74,7 @@ struct light_ctx { int fast_count; int sent_past_links; struct timespec window; + struct timespec last_update_used; size_t monit_pkt_size; int csv; int is_measlat; @@ -93,7 +100,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str lightc->total_links = app_ctx->ap.links; lightc->selected_link = lightc->total_links - 1; lightc->sent_past_links = lightc->total_links; - lightc->fast_count = lightc->total_links / 2; + lightc->fast_count = lightc->total_links / 4; lightc->csv = 0; lightc->explain = 0; lightc->pkt_sent_id = 1; @@ -122,6 +129,11 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str for (int i = 0; i < lightc->sent_past_links; i++) lightc->prev_links[i] = UINT8_MAX; + lightc->used = lightc->fast_count * 2; + for (int i = 0; i < lightc->used; i++) { + lightc->status[i].used = 1; + } + union abstract_packet m; lightc->monit_pkt_size = sizeof(m.fmt.headers) + @@ -355,7 +367,7 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { struct algo_ctx* app_ctx = cat->app_ctx; struct light_ctx* lightc = app_ctx->misc; if (lightc->selected_link >= lightc->total_links) return 0; - set_now(&lightc->last[lightc->selected_link]); + set_now(&lightc->status[lightc->selected_link].last); sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link); struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url); @@ -417,10 +429,49 @@ void tag_packet_measlat(union abstract_packet* ap, uint8_t link_id, uint8_t is_s } } +void algo_lightning_update_used(struct light_ctx *lightc, struct stat_entry *stats, struct timespec *now) { + struct timespec not_before = {0}, oldest = *now; + timespec_diff(now, &lightc->window, ¬_before); + if (timespec_gt(&lightc->last_update_used, ¬_before)) return; + + printf("update triggered\n"); + + int used_to_not = 0, not_to_used = 0; + int64_t max_ooo = 0; + for (int i = 0; i < lightc->total_links; i++) { + if (lightc->status[stats[i].link_id].used) { + if (stats[i].ooo >= max_ooo) { + max_ooo = stats[i].ooo; + used_to_not = stats[i].link_id; + } + } else { + if (timespec_lt(&lightc->status[stats[i].link_id].last, &oldest)) { + oldest = lightc->status[stats[i].link_id].last; + not_to_used = stats[i].link_id; + } + } + } + + // Do we have a good link not used? + for (int i = 0; i < lightc->used; i++) { + if (!lightc->status[stats[i].link_id].used) { + not_to_used = stats[i].link_id; + break; + } + } + + // Swap them + lightc->status[used_to_not].used = 0; + lightc->status[not_to_used].used = 1; + lightc->last_update_used = *now; +} + int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct light_ctx* lightc = app_ctx->misc; union abstract_packet* ap = (union abstract_packet*) &bp->ip; + struct timespec now, sel_link_last; + set_now(&now); // Pad packet algo_lightning_pad (ctx, fdinfo, bp); @@ -428,6 +479,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* // Compute stats struct stat_entry stats[MAX_LINKS] = {0}; algo_lightning_update_stats(lightc, stats); + algo_lightning_update_used(lightc, stats, &now); if (ctx->verbose > 1) { printf("after sort: "); @@ -437,35 +489,36 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* printf("\n"); } - // Init vars - struct timespec now, sel_link_last; - set_now(&now); uint64_t now_timestamp = timespec_get_unit(&now, MILISEC); // Select fast link if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_FAST) { sel_link_last = now; lightc->selected_link = UINT8_MAX; - for (int i = 0; i < lightc->fast_count; i++) { - if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) { + for (int i = 0, j = 0; i < lightc->total_links && j < lightc->fast_count; i++) { + if (!lightc->status[stats[i].link_id].used) continue; + if (timespec_lt (&lightc->status[stats[i].link_id].last, &sel_link_last)) { lightc->selected_link = stats[i].link_id; - sel_link_last = lightc->last[stats[i].link_id]; + sel_link_last = lightc->status[stats[i].link_id].last; } + j++; } if (lightc->is_measlat) tag_packet_measlat (ap, lightc->selected_link, 0); send_message (ctx, bp); if (lightc->csv) printf("%ld,%d,fast\n", now_timestamp, lightc->selected_link); } + // Select slow link if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_SLOW) { - // Select slow link sel_link_last = now; lightc->selected_link = UINT8_MAX; - for (int i = lightc->fast_count; i < lightc->total_links; i++) { - if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) { + for (int i = lightc->total_links-1, j = 0; i >= 0 && j < lightc->fast_count; i++) { + if (!lightc->status[stats[i].link_id].used) continue; + if (timespec_lt (&lightc->status[stats[i].link_id].last, &sel_link_last)) { lightc->selected_link = stats[i].link_id; - sel_link_last = lightc->last[stats[i].link_id]; + sel_link_last = lightc->status[stats[i].link_id].last; } + j++; } if (lightc->is_measlat) tag_packet_measlat (ap, lightc->selected_link, 1); send_message (ctx, bp);