diff --git a/src/algo_lightning.c b/src/algo_lightning.c index dc5930b..6683c34 100644 --- a/src/algo_lightning.c +++ b/src/algo_lightning.c @@ -14,6 +14,11 @@ enum ooo_state { OOO_DONE }; +struct stat_entry { + uint8_t link_id; + int64_t max_ooo; +}; + struct timing_entry { enum ooo_state state; struct timespec detected_at; @@ -30,7 +35,7 @@ struct light_ctx { uint64_t pkt_sent_id; uint8_t selected_link; uint8_t total_links; - int max_ooo; + int fast_count; int sleep_duration; int sent_past_links; struct timespec window; @@ -54,17 +59,17 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str lightc->total_links = app_ctx->ap.links; lightc->selected_link = lightc->total_links - 1; lightc->sent_past_links = lightc->total_links; - lightc->max_ooo = 50; + lightc->fast_count = lightc->total_links / 2; lightc->sleep_duration = 500; - uint64_t window = 5000; + uint64_t window = 2000; if (ap->algo_specific_params != NULL) { char *parse_ptr, *token, *params; for (params = ap->algo_specific_params; ; params = NULL) { token = strtok_r(params, ",", &parse_ptr); if (token == NULL) break; - sscanf(token, "max_ooo=%d", &lightc->max_ooo); + sscanf(token, "fast_count=%d", &lightc->fast_count); sscanf(token, "recovery=%d", &lightc->sleep_duration); sscanf(token, "window=%ld", &window); sscanf(token, "sent_past_links=%d", &lightc->sent_past_links); @@ -78,7 +83,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str lightc->monit_pkt_size = sizeof(m.fmt.headers) + sizeof(m.fmt.content.link_monitoring_lightning) + sizeof(uint8_t) * (lightc->sent_past_links - 1); timespec_set_unit (&lightc->window, window, MILISEC); - printf("max_ooo = %d ms\n", lightc->max_ooo); + printf("fast_count = %d\n", lightc->fast_count); printf("recovery = %d ms\n", lightc->sleep_duration); printf("window check = %ld ms\n", window); printf("sent_past_links = %d\n", lightc->sent_past_links); @@ -149,12 +154,22 @@ int algo_lightning_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* f return deliver(ctx, fdinfo, bp); } -void algo_lightning_update_stats (struct light_ctx *lightc, int64_t *stats) { +int compare_stat_entry_max(const void *a, const void *b) { + const struct stat_entry *sea = a, *seb = b; + if (sea->max_ooo == -1) return 1; + if (seb->max_ooo == -1) return -1; + return sea->max_ooo - seb->max_ooo; +} + +void algo_lightning_update_stats (struct light_ctx *lightc, struct stat_entry *stats) { struct timespec now, not_before = {0}, temp_time; set_now(&now); timespec_diff (&now, &lightc->window, ¬_before); - for (int i = 0; i < lightc->total_links; i++) stats[i] = -1; + for (int i = 0; i < lightc->total_links; i++) { + stats[i].link_id = i; + stats[i].max_ooo = -1; + } for (int i = 0; i < HISTORIC_SIZE; i++) { if (timespec_lt(&lightc->historic[i].finished_at, ¬_before)) continue; @@ -173,9 +188,11 @@ void algo_lightning_update_stats (struct light_ctx *lightc, int64_t *stats) { delta = timespec_get_unit (&temp_time, MILISEC); break; } - - stats[l] = delta > stats[l] ? delta : stats[l]; + stats[l].link_id = l; + stats[l].max_ooo = delta > stats[l].max_ooo ? delta : stats[l].max_ooo; } + + qsort(stats, lightc->total_links, sizeof(struct stat_entry), compare_stat_entry_max); } int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { @@ -188,6 +205,7 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { struct algo_ctx* app_ctx = cat->app_ctx; struct light_ctx* lightc = app_ctx->misc; + if (lightc->selected_link >= lightc->total_links) return 0; set_now(&lightc->last[lightc->selected_link]); sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link); @@ -226,43 +244,44 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* union abstract_packet* ap = (union abstract_packet*) &bp->ip; // Last step, send packet - int64_t stats[MAX_LINKS]; + struct stat_entry stats[MAX_LINKS] = {0}; algo_lightning_update_stats(lightc, stats); - printf("retained stat: "); - for (int i = 0; i < lightc->total_links; i++) { - printf("%ld, ", stats[i]); + if (ctx->verbose > 1) { + printf("after sort: "); + for (int i = 0; i < lightc->total_links; i++) { + printf("%d (%ld), ", stats[i].link_id, stats[i].max_ooo); + } + printf("\n"); } - printf("\n"); - struct timespec now, temp_time; + struct timespec now, sel_link_last, temp_time; set_now(&now); - int protection = lightc->total_links; - while (protection-- > 0) { - lightc->selected_link = (lightc->selected_link + 1) % lightc->total_links; - int64_t cdelta = stats[lightc->selected_link]; - - // OK link - if (cdelta < lightc->max_ooo) { - send_message (ctx, bp); - break; + // Select fast link + sel_link_last = now; + lightc->selected_link = UINT8_MAX; + for (int i = 0; i < lightc->fast_count; i++) { + if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) { + lightc->selected_link = stats[i].link_id; + sel_link_last = lightc->last[stats[i].link_id]; } + } + send_message (ctx, bp); - // Broken link that must be probed - timespec_diff (&now, &lightc->last[lightc->selected_link], &temp_time); - uint64_t elapsed = timespec_get_unit(&temp_time, MILISEC); - if (elapsed >= lightc->sleep_duration) { - send_message (ctx, bp); - continue; + // Select slow link + sel_link_last = now; + lightc->selected_link = UINT8_MAX; + for (int i = lightc->fast_count; i < lightc->total_links; i++) { + if (timespec_lt (&lightc->last[stats[i].link_id], &sel_link_last)) { + lightc->selected_link = stats[i].link_id; + sel_link_last = lightc->last[stats[i].link_id]; } - - // Broken link probed recently - } - if (protection <= 0) { - fprintf(stderr, "Schedule error\n"); - exit(EXIT_FAILURE); } + timespec_diff (&now, &sel_link_last, &temp_time); + uint64_t elapsed = timespec_get_unit(&temp_time, MILISEC); + if (elapsed >= lightc->sleep_duration) + send_message (ctx, bp); mv_buffer_rtof (&app_ctx->br, fdinfo); return 0;