It (quite) works

This commit is contained in:
Quentin 2019-10-21 18:21:57 +02:00
parent f21da52e8c
commit a88f5be07b
2 changed files with 67 additions and 14 deletions

View file

@ -95,7 +95,7 @@ ggplot(data=sqldf("select * from xa where run='out/bhTF0rd7MOI5SOPs-6'"), aes(x=
xb <- read.csv("../res/tmp_light/v.csv") xb <- read.csv("../res/tmp_light/v.csv")
xb$flag <- factor(xb$flag) xb$flag <- factor(xb$flag)
xb$link_id <- factor(xb$link_id) xb$link_id <- factor(xb$link_id)
xc <- sqldf("select *, 1.0 * latency / 1000.0 as lat from xb where vanilla = 1 and link_id = 7") xc <- sqldf("select *, 1.0 * latency / 1000.0 as lat from xb where vanilla = 1 and link_id = 5")
ggplot(data=xc, aes(x=packet_id, y=lat, color=link_id:way)) + ggplot(data=xc, aes(x=packet_id, y=lat, color=link_id:way)) +
coord_cartesian(ylim=c(100,600)) + coord_cartesian(ylim=c(100,600)) +
geom_line() + geom_line() +

View file

@ -54,12 +54,18 @@ struct timing_entry {
uint64_t pkt_id; uint64_t pkt_id;
}; };
struct link_status {
struct timespec last;
uint8_t used;
};
struct light_ctx { struct light_ctx {
uint8_t prev_links[MAX_LINKS]; uint8_t prev_links[MAX_LINKS];
int16_t remote_stats[MAX_LINKS]; int16_t remote_stats[MAX_LINKS];
int16_t local_stats[MAX_LINKS]; int16_t local_stats[MAX_LINKS];
struct timing_entry historic[HISTORIC_SIZE]; struct timing_entry historic[HISTORIC_SIZE];
struct timespec last[MAX_LINKS]; struct link_status status[MAX_LINKS];
uint8_t used;
uint64_t pkt_rcv_id; uint64_t pkt_rcv_id;
uint64_t pkt_sent_id; uint64_t pkt_sent_id;
uint64_t uniq_pkt_sent_id; uint64_t uniq_pkt_sent_id;
@ -68,6 +74,7 @@ struct light_ctx {
int fast_count; int fast_count;
int sent_past_links; int sent_past_links;
struct timespec window; struct timespec window;
struct timespec last_update_used;
size_t monit_pkt_size; size_t monit_pkt_size;
int csv; int csv;
int is_measlat; int is_measlat;
@ -93,7 +100,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
lightc->total_links = app_ctx->ap.links; lightc->total_links = app_ctx->ap.links;
lightc->selected_link = lightc->total_links - 1; lightc->selected_link = lightc->total_links - 1;
lightc->sent_past_links = lightc->total_links; lightc->sent_past_links = lightc->total_links;
lightc->fast_count = lightc->total_links / 2; lightc->fast_count = lightc->total_links / 4;
lightc->csv = 0; lightc->csv = 0;
lightc->explain = 0; lightc->explain = 0;
lightc->pkt_sent_id = 1; lightc->pkt_sent_id = 1;
@ -122,6 +129,11 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
for (int i = 0; i < lightc->sent_past_links; i++) for (int i = 0; i < lightc->sent_past_links; i++)
lightc->prev_links[i] = UINT8_MAX; lightc->prev_links[i] = UINT8_MAX;
lightc->used = lightc->fast_count * 2;
for (int i = 0; i < lightc->used; i++) {
lightc->status[i].used = 1;
}
union abstract_packet m; union abstract_packet m;
lightc->monit_pkt_size = lightc->monit_pkt_size =
sizeof(m.fmt.headers) + sizeof(m.fmt.headers) +
@ -355,7 +367,7 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) {
struct algo_ctx* app_ctx = cat->app_ctx; struct algo_ctx* app_ctx = cat->app_ctx;
struct light_ctx* lightc = app_ctx->misc; struct light_ctx* lightc = app_ctx->misc;
if (lightc->selected_link >= lightc->total_links) return 0; if (lightc->selected_link >= lightc->total_links) return 0;
set_now(&lightc->last[lightc->selected_link]); set_now(&lightc->status[lightc->selected_link].last);
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link); sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link);
struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url); struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url);
@ -417,10 +429,49 @@ void tag_packet_measlat(union abstract_packet* ap, uint8_t link_id, uint8_t is_s
} }
} }
void algo_lightning_update_used(struct light_ctx *lightc, struct stat_entry *stats, struct timespec *now) {
struct timespec not_before = {0}, oldest = *now;
timespec_diff(now, &lightc->window, &not_before);
if (timespec_gt(&lightc->last_update_used, &not_before)) return;
printf("update triggered\n");
int used_to_not = 0, not_to_used = 0;
int64_t max_ooo = 0;
for (int i = 0; i < lightc->total_links; i++) {
if (lightc->status[stats[i].link_id].used) {
if (stats[i].ooo >= max_ooo) {
max_ooo = stats[i].ooo;
used_to_not = stats[i].link_id;
}
} else {
if (timespec_lt(&lightc->status[stats[i].link_id].last, &oldest)) {
oldest = lightc->status[stats[i].link_id].last;
not_to_used = stats[i].link_id;
}
}
}
// Do we have a good link not used?
for (int i = 0; i < lightc->used; i++) {
if (!lightc->status[stats[i].link_id].used) {
not_to_used = stats[i].link_id;
break;
}
}
// Swap them
lightc->status[used_to_not].used = 0;
lightc->status[not_to_used].used = 1;
lightc->last_update_used = *now;
}
int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct light_ctx* lightc = app_ctx->misc; struct light_ctx* lightc = app_ctx->misc;
union abstract_packet* ap = (union abstract_packet*) &bp->ip; union abstract_packet* ap = (union abstract_packet*) &bp->ip;
struct timespec now, sel_link_last;
set_now(&now);
// Pad packet // Pad packet
algo_lightning_pad (ctx, fdinfo, bp); algo_lightning_pad (ctx, fdinfo, bp);
@ -428,6 +479,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
// Compute stats // Compute stats
struct stat_entry stats[MAX_LINKS] = {0}; struct stat_entry stats[MAX_LINKS] = {0};
algo_lightning_update_stats(lightc, stats); algo_lightning_update_stats(lightc, stats);
algo_lightning_update_used(lightc, stats, &now);
if (ctx->verbose > 1) { if (ctx->verbose > 1) {
printf("after sort: "); printf("after sort: ");
@ -437,35 +489,36 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
printf("\n"); printf("\n");
} }
// Init vars
struct timespec now, sel_link_last;
set_now(&now);
uint64_t now_timestamp = timespec_get_unit(&now, MILISEC); uint64_t now_timestamp = timespec_get_unit(&now, MILISEC);
// Select fast link // Select fast link
if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_FAST) { if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_FAST) {
sel_link_last = now; sel_link_last = now;
lightc->selected_link = UINT8_MAX; lightc->selected_link = UINT8_MAX;
for (int i = 0; i < lightc->fast_count; i++) { for (int i = 0, j = 0; i < lightc->total_links && j < lightc->fast_count; i++) {
if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) { if (!lightc->status[stats[i].link_id].used) continue;
if (timespec_lt (&lightc->status[stats[i].link_id].last, &sel_link_last)) {
lightc->selected_link = stats[i].link_id; lightc->selected_link = stats[i].link_id;
sel_link_last = lightc->last[stats[i].link_id]; sel_link_last = lightc->status[stats[i].link_id].last;
} }
j++;
} }
if (lightc->is_measlat) tag_packet_measlat (ap, lightc->selected_link, 0); if (lightc->is_measlat) tag_packet_measlat (ap, lightc->selected_link, 0);
send_message (ctx, bp); send_message (ctx, bp);
if (lightc->csv) printf("%ld,%d,fast\n", now_timestamp, lightc->selected_link); if (lightc->csv) printf("%ld,%d,fast\n", now_timestamp, lightc->selected_link);
} }
if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_SLOW) {
// Select slow link // Select slow link
if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_SLOW) {
sel_link_last = now; sel_link_last = now;
lightc->selected_link = UINT8_MAX; lightc->selected_link = UINT8_MAX;
for (int i = lightc->fast_count; i < lightc->total_links; i++) { for (int i = lightc->total_links-1, j = 0; i >= 0 && j < lightc->fast_count; i++) {
if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) { if (!lightc->status[stats[i].link_id].used) continue;
if (timespec_lt (&lightc->status[stats[i].link_id].last, &sel_link_last)) {
lightc->selected_link = stats[i].link_id; lightc->selected_link = stats[i].link_id;
sel_link_last = lightc->last[stats[i].link_id]; sel_link_last = lightc->status[stats[i].link_id].last;
} }
j++;
} }
if (lightc->is_measlat) tag_packet_measlat (ap, lightc->selected_link, 1); if (lightc->is_measlat) tag_packet_measlat (ap, lightc->selected_link, 1);
send_message (ctx, bp); send_message (ctx, bp);