Refactor algorithm

This commit is contained in:
Quentin 2019-10-08 14:47:35 +02:00
parent 192f355121
commit bf9663116a

View file

@ -14,6 +14,11 @@ enum ooo_state {
OOO_DONE OOO_DONE
}; };
struct stat_entry {
uint8_t link_id;
int64_t max_ooo;
};
struct timing_entry { struct timing_entry {
enum ooo_state state; enum ooo_state state;
struct timespec detected_at; struct timespec detected_at;
@ -30,7 +35,7 @@ struct light_ctx {
uint64_t pkt_sent_id; uint64_t pkt_sent_id;
uint8_t selected_link; uint8_t selected_link;
uint8_t total_links; uint8_t total_links;
int max_ooo; int fast_count;
int sleep_duration; int sleep_duration;
int sent_past_links; int sent_past_links;
struct timespec window; struct timespec window;
@ -54,17 +59,17 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
lightc->total_links = app_ctx->ap.links; lightc->total_links = app_ctx->ap.links;
lightc->selected_link = lightc->total_links - 1; lightc->selected_link = lightc->total_links - 1;
lightc->sent_past_links = lightc->total_links; lightc->sent_past_links = lightc->total_links;
lightc->max_ooo = 50; lightc->fast_count = lightc->total_links / 2;
lightc->sleep_duration = 500; lightc->sleep_duration = 500;
uint64_t window = 5000; uint64_t window = 2000;
if (ap->algo_specific_params != NULL) { if (ap->algo_specific_params != NULL) {
char *parse_ptr, *token, *params; char *parse_ptr, *token, *params;
for (params = ap->algo_specific_params; ; params = NULL) { for (params = ap->algo_specific_params; ; params = NULL) {
token = strtok_r(params, ",", &parse_ptr); token = strtok_r(params, ",", &parse_ptr);
if (token == NULL) break; if (token == NULL) break;
sscanf(token, "max_ooo=%d", &lightc->max_ooo); sscanf(token, "fast_count=%d", &lightc->fast_count);
sscanf(token, "recovery=%d", &lightc->sleep_duration); sscanf(token, "recovery=%d", &lightc->sleep_duration);
sscanf(token, "window=%ld", &window); sscanf(token, "window=%ld", &window);
sscanf(token, "sent_past_links=%d", &lightc->sent_past_links); sscanf(token, "sent_past_links=%d", &lightc->sent_past_links);
@ -78,7 +83,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
lightc->monit_pkt_size = sizeof(m.fmt.headers) + sizeof(m.fmt.content.link_monitoring_lightning) + sizeof(uint8_t) * (lightc->sent_past_links - 1); lightc->monit_pkt_size = sizeof(m.fmt.headers) + sizeof(m.fmt.content.link_monitoring_lightning) + sizeof(uint8_t) * (lightc->sent_past_links - 1);
timespec_set_unit (&lightc->window, window, MILISEC); timespec_set_unit (&lightc->window, window, MILISEC);
printf("max_ooo = %d ms\n", lightc->max_ooo); printf("fast_count = %d\n", lightc->fast_count);
printf("recovery = %d ms\n", lightc->sleep_duration); printf("recovery = %d ms\n", lightc->sleep_duration);
printf("window check = %ld ms\n", window); printf("window check = %ld ms\n", window);
printf("sent_past_links = %d\n", lightc->sent_past_links); printf("sent_past_links = %d\n", lightc->sent_past_links);
@ -149,12 +154,22 @@ int algo_lightning_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* f
return deliver(ctx, fdinfo, bp); return deliver(ctx, fdinfo, bp);
} }
void algo_lightning_update_stats (struct light_ctx *lightc, int64_t *stats) { int compare_stat_entry_max(const void *a, const void *b) {
const struct stat_entry *sea = a, *seb = b;
if (sea->max_ooo == -1) return 1;
if (seb->max_ooo == -1) return -1;
return sea->max_ooo - seb->max_ooo;
}
void algo_lightning_update_stats (struct light_ctx *lightc, struct stat_entry *stats) {
struct timespec now, not_before = {0}, temp_time; struct timespec now, not_before = {0}, temp_time;
set_now(&now); set_now(&now);
timespec_diff (&now, &lightc->window, &not_before); timespec_diff (&now, &lightc->window, &not_before);
for (int i = 0; i < lightc->total_links; i++) stats[i] = -1; for (int i = 0; i < lightc->total_links; i++) {
stats[i].link_id = i;
stats[i].max_ooo = -1;
}
for (int i = 0; i < HISTORIC_SIZE; i++) { for (int i = 0; i < HISTORIC_SIZE; i++) {
if (timespec_lt(&lightc->historic[i].finished_at, &not_before)) continue; if (timespec_lt(&lightc->historic[i].finished_at, &not_before)) continue;
@ -173,9 +188,11 @@ void algo_lightning_update_stats (struct light_ctx *lightc, int64_t *stats) {
delta = timespec_get_unit (&temp_time, MILISEC); delta = timespec_get_unit (&temp_time, MILISEC);
break; break;
} }
stats[l].link_id = l;
stats[l] = delta > stats[l] ? delta : stats[l]; stats[l].max_ooo = delta > stats[l].max_ooo ? delta : stats[l].max_ooo;
} }
qsort(stats, lightc->total_links, sizeof(struct stat_entry), compare_stat_entry_max);
} }
int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) {
@ -188,6 +205,7 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) {
struct algo_ctx* app_ctx = cat->app_ctx; struct algo_ctx* app_ctx = cat->app_ctx;
struct light_ctx* lightc = app_ctx->misc; struct light_ctx* lightc = app_ctx->misc;
if (lightc->selected_link >= lightc->total_links) return 0;
set_now(&lightc->last[lightc->selected_link]); set_now(&lightc->last[lightc->selected_link]);
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link); sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link);
@ -226,43 +244,44 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
union abstract_packet* ap = (union abstract_packet*) &bp->ip; union abstract_packet* ap = (union abstract_packet*) &bp->ip;
// Last step, send packet // Last step, send packet
int64_t stats[MAX_LINKS]; struct stat_entry stats[MAX_LINKS] = {0};
algo_lightning_update_stats(lightc, stats); algo_lightning_update_stats(lightc, stats);
printf("retained stat: "); if (ctx->verbose > 1) {
for (int i = 0; i < lightc->total_links; i++) { printf("after sort: ");
printf("%ld, ", stats[i]); for (int i = 0; i < lightc->total_links; i++) {
printf("%d (%ld), ", stats[i].link_id, stats[i].max_ooo);
}
printf("\n");
} }
printf("\n");
struct timespec now, temp_time; struct timespec now, sel_link_last, temp_time;
set_now(&now); set_now(&now);
int protection = lightc->total_links; // Select fast link
while (protection-- > 0) { sel_link_last = now;
lightc->selected_link = (lightc->selected_link + 1) % lightc->total_links; lightc->selected_link = UINT8_MAX;
int64_t cdelta = stats[lightc->selected_link]; for (int i = 0; i < lightc->fast_count; i++) {
if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) {
// OK link lightc->selected_link = stats[i].link_id;
if (cdelta < lightc->max_ooo) { sel_link_last = lightc->last[stats[i].link_id];
send_message (ctx, bp);
break;
} }
}
send_message (ctx, bp);
// Broken link that must be probed // Select slow link
timespec_diff (&now, &lightc->last[lightc->selected_link], &temp_time); sel_link_last = now;
uint64_t elapsed = timespec_get_unit(&temp_time, MILISEC); lightc->selected_link = UINT8_MAX;
if (elapsed >= lightc->sleep_duration) { for (int i = lightc->fast_count; i < lightc->total_links; i++) {
send_message (ctx, bp); if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) {
continue; lightc->selected_link = stats[i].link_id;
sel_link_last = lightc->last[stats[i].link_id];
} }
// Broken link probed recently
}
if (protection <= 0) {
fprintf(stderr, "Schedule error\n");
exit(EXIT_FAILURE);
} }
timespec_diff (&now, &sel_link_last, &temp_time);
uint64_t elapsed = timespec_get_unit(&temp_time, MILISEC);
if (elapsed >= lightc->sleep_duration)
send_message (ctx, bp);
mv_buffer_rtof (&app_ctx->br, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
return 0; return 0;