diff --git a/src/algo_lightning.c b/src/algo_lightning.c index 4061b3a..9b39109 100644 --- a/src/algo_lightning.c +++ b/src/algo_lightning.c @@ -77,7 +77,7 @@ struct light_ctx { int16_t local_stats[MAX_LINKS]; struct timing_entry historic[HISTORIC_SIZE]; struct link_status status[MAX_LINKS]; - uint8_t used; + uint8_t active; uint64_t pkt_rcv_id; uint64_t pkt_sent_id; uint64_t uniq_pkt_sent_id; @@ -119,6 +119,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str lightc->pkt_sent_id = 1; lightc->uniq_pkt_sent_id = 1; lightc->disable_scheduler = 0; + lightc->active = 0; lightc->sched_strat = SCHEDULE_BOTH; uint64_t window = 2000; @@ -142,9 +143,8 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str for (int i = 0; i < lightc->sent_past_links; i++) lightc->prev_links[i] = UINT8_MAX; - lightc->used = lightc->fast_count * 2; for (int i = 0; i < lightc->total_links; i++) { - lightc->status[i].used = i < lightc->used ? LINK_SLOW : LINK_NOT_USED; + lightc->status[i].used = LINK_NOT_USED; } union abstract_packet m; @@ -294,9 +294,11 @@ int algo_lightning_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* f int compare_stat_entry_max(const void *a, const void *b) { const struct stat_entry *sea = a, *seb = b; - if (sea->ooo == -1) return 1; - if (seb->ooo == -1) return -1; - return sea->ooo - seb->ooo; + int ra = sea->ooo; + int rb = seb->ooo; + if (ra < 0) ra = INT16_MAX + -ra; + if (rb < 0) rb = INT16_MAX + -rb; + return ra - rb; } void algo_lightning_update_stats (struct light_ctx *lightc, struct evt_core_ctx* ctx) { @@ -347,15 +349,16 @@ void algo_lightning_update_stats (struct light_ctx *lightc, struct evt_core_ctx* for (int i = 0; i < lightc->total_links; i++) { if (lightc->stats[i].meas_occ <= 0) lightc->stats[i].ooo = -1; else lightc->stats[i].ooo = lightc->stats[i].ooo / lightc->stats[i].meas_occ; + + lightc->local_stats[i] = lightc->stats[i].ooo; } // Set my local stats + merge remote stats for (int i = 0; i < lightc->total_links; i++) { - lightc->local_stats[i] = lightc->stats[i].ooo; /* AVG */ - if (lightc->remote_stats[i] == -1) continue; - if (lightc->stats[i].ooo == -1) lightc->stats[i].ooo = lightc->remote_stats[i]; + if (lightc->remote_stats[i] < 0) continue; + if (lightc->stats[i].ooo < 0) lightc->stats[i].ooo = lightc->remote_stats[i]; else lightc->stats[i].ooo = (lightc->remote_stats[i] + lightc->stats[i].ooo) / 2; /* MAX @@ -370,7 +373,7 @@ void algo_lightning_update_stats (struct light_ctx *lightc, struct evt_core_ctx* for (int i = 0; i < lightc->total_links; i++) { sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + i); struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) lightc->stats[i].ooo = -1; + if (to_fdinfo == NULL) lightc->stats[i].ooo = -2; } // Sort @@ -463,16 +466,17 @@ void algo_lightning_update_used(struct light_ctx *lightc, struct timespec *now) timespec_diff(now, &lightc->window, ¬_before); if (timespec_gt(&lightc->last_update_used, ¬_before)) return; - int used_to_not = 0, not_to_used = 0; + int used_to_not = -1, not_to_used = -1; int64_t max_ooo = 0; for (int i = 0; i < lightc->total_links; i++) { if (lightc->status[lightc->stats[i].link_id].used == LINK_FAST || lightc->status[lightc->stats[i].link_id].used == LINK_SLOW) { - int64_t retained_ooo = lightc->stats[i].ooo == -1 ? INT64_MAX : lightc->stats[i].ooo; + int64_t retained_ooo = lightc->stats[i].ooo < 0 ? INT64_MAX : lightc->stats[i].ooo; if (retained_ooo >= max_ooo) { max_ooo = retained_ooo; used_to_not = lightc->stats[i].link_id; } } else { + if (lightc->stats[i].ooo == -2) continue; if (timespec_lt(&lightc->status[lightc->stats[i].link_id].last, &oldest)) { oldest = lightc->status[lightc->stats[i].link_id].last; not_to_used = lightc->stats[i].link_id; @@ -480,27 +484,20 @@ void algo_lightning_update_used(struct light_ctx *lightc, struct timespec *now) } } - // Do we have a good link not used? - /*for (int i = 0; i < lightc->used; i++) { - if (lightc->status[lightc->stats[i].link_id].used == LINK_NOT_USED) { - not_to_used = lightc->stats[i].link_id; - break; - } - }*/ - // Swap them //printf("Link %d will be disabled, %d will be enabled\n", used_to_not, not_to_used); + if (used_to_not < 0 || not_to_used < 0) return; lightc->status[used_to_not].used = LINK_NOT_USED; lightc->status[not_to_used].used = LINK_SLOW; lightc->last_update_used = *now; } -void algo_lightning_link_cat(struct light_ctx *lightc) { +void algo_lightning_link_cat(struct light_ctx *lightc, int cur_fast_count) { uint8_t used = 0; //printf("---\n"); for (int i = 0; i < lightc->total_links; i++) { if (lightc->status[lightc->stats[i].link_id].used != LINK_NOT_USED) { - if (used < lightc->fast_count) lightc->status[lightc->stats[i].link_id].used = LINK_FAST; + if (used < cur_fast_count) lightc->status[lightc->stats[i].link_id].used = LINK_FAST; else lightc->status[lightc->stats[i].link_id].used = LINK_SLOW; used++; } @@ -518,15 +515,34 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* // Pad packet algo_lightning_pad (ctx, fdinfo, bp); - // Compute stats + // Prepare links algo_lightning_update_stats(lightc, ctx); - algo_lightning_update_used(lightc, &now); - algo_lightning_link_cat(lightc); - if (ctx->verbose > 1) { - printf("after sort: "); + // Adapt tags quantity to active links + struct evt_core_cat *cat = evt_core_get_from_cat (ctx, "tcp-write"); + int target_to_use = lightc->fast_count*2 < cat->socklist->len ? lightc->fast_count*2 : cat->socklist->len; + int diff = target_to_use - ((int) lightc->active); + for (int i = 0; i < lightc->total_links && diff > 0; i++) { + if (lightc->status[lightc->stats[i].link_id].used != LINK_NOT_USED) continue; + lightc->status[lightc->stats[i].link_id].used = LINK_SLOW; + diff--; + } + for (int i = lightc->total_links-1; i >= 0 && diff < 0; i--) { + if (lightc->status[lightc->stats[i].link_id].used == LINK_NOT_USED) continue; + lightc->status[lightc->stats[i].link_id].used = LINK_NOT_USED; + diff++; + } + + lightc->active = target_to_use; + + // Update link tags + algo_lightning_update_used(lightc, &now); + algo_lightning_link_cat(lightc, target_to_use/2); + + if (ctx->verbose > 1 || TRUE) { + printf("link ranking (%d fast links, %d total links)\nposition | port | score | class \n", target_to_use/2, target_to_use); for (int i = 0; i < lightc->total_links; i++) { - printf("%d (%ld), ", lightc->stats[i].link_id, lightc->stats[i].ooo); + printf("%8d | %4d | %9ld | %9s \n", i, lightc->stats[i].link_id+7500, lightc->stats[i].ooo, link_cat_str[lightc->status[lightc->stats[i].link_id].used]); } printf("\n"); } @@ -537,7 +553,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* if (lightc->sched_strat == SCHEDULE_BOTH || lightc->sched_strat == SCHEDULE_FAST) { sel_link_last = now; lightc->selected_link = UINT8_MAX; - for (int i = 0, j = 0; i < lightc->total_links && j < lightc->fast_count; i++) { + for (int i = 0; i < lightc->total_links; i++) { if (lightc->status[lightc->stats[i].link_id].used != LINK_FAST) continue; if (timespec_lt (&lightc->status[lightc->stats[i].link_id].last, &sel_link_last)) { lightc->selected_link = lightc->stats[i].link_id; diff --git a/src/dcall.c b/src/dcall.c index e4f9e23..6cf9313 100644 --- a/src/dcall.c +++ b/src/dcall.c @@ -241,6 +241,7 @@ int main(int argc, char *argv[]) { break; case 'c': de.local_port = atoi(optarg); + break; case 'd': de.gstreamer_log_path = optarg; break;