This commit is contained in:
Quentin 2019-08-29 17:53:49 +02:00
parent f28674a79d
commit 5ed70a0848
2 changed files with 30 additions and 3 deletions

View file

@ -99,6 +99,7 @@ void pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer
} }
int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
char url[256];
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct thunder_ctx* thunderc = app_ctx->misc; struct thunder_ctx* thunderc = app_ctx->misc;
struct evt_core_fdinfo *to_fdinfo = NULL; struct evt_core_fdinfo *to_fdinfo = NULL;
@ -110,7 +111,13 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] No link available, packet will be dropped\n"); if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] No link available, packet will be dropped\n");
break; break;
} }
thunderc->selected_link = (thunderc->selected_link + 1) % cat->socklist->len;
to_fdinfo = NULL;
do {
thunderc->selected_link = (thunderc->selected_link + 1) % thunderc->total_links;
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + thunderc->selected_link);
to_fdinfo = evt_core_get_from_url (ctx, url);
} while (to_fdinfo == NULL);
// 2. We create the packet template // 2. We create the packet template
union abstract_packet links = { union abstract_packet links = {
@ -120,8 +127,6 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
.fmt.content.link_monitoring_thunder.links_status = {} .fmt.content.link_monitoring_thunder.links_status = {}
}; };
to_fdinfo = g_array_index(cat->socklist, struct evt_core_fdinfo*, thunderc->selected_link);
// 3. We append the template to the buffer // 3. We append the template to the buffer
struct buffer_packet* bp_dup = dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); struct buffer_packet* bp_dup = dup_buffer_tow (&app_ctx->br, bp, to_fdinfo);
union abstract_packet *new_ap = buffer_append_ap (bp_dup, &links); union abstract_packet *new_ap = buffer_append_ap (bp_dup, &links);
@ -142,6 +147,7 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
fprintf(stderr, " [algo_thunder] Will send this info\n"); fprintf(stderr, " [algo_thunder] Will send this info\n");
} }
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
} while (is_blacklisted (thunderc, thunderc->selected_link)); } while (is_blacklisted (thunderc, thunderc->selected_link));
if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Packets sent\n"); if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Packets sent\n");
@ -161,6 +167,7 @@ void on_block (struct evt_core_ctx* ctx, void* raw) {
if (thunderc->received_pkts_on_link[bi->i] >= bi->missing) goto release; if (thunderc->received_pkts_on_link[bi->i] >= bi->missing) goto release;
if (thunderc->blacklisted[bi->i] >= bi->missing) goto release; if (thunderc->blacklisted[bi->i] >= bi->missing) goto release;
//printf("[algo_thunder] Blacklisting link %d\n", bi->i);
thunderc->blacklisted[bi->i] = bi->missing; thunderc->blacklisted[bi->i] = bi->missing;
release: release:
@ -178,8 +185,13 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (ap->fmt.headers.flags & FLAG_RESET) {
for (int i = 0; i < MAX_LINKS; i++) thunderc->received_pkts_on_link[i] = 1;
}
int link_id = url_get_port_int(fdinfo->url) - 7500; int link_id = url_get_port_int(fdinfo->url) - 7500;
thunderc->received_pkts_on_link[link_id]++; thunderc->received_pkts_on_link[link_id]++;
printf("Received %ld packets on link %d\n", thunderc->received_pkts_on_link[link_id], link_id);
struct link_info *li = &ap->fmt.content.link_monitoring_thunder.links_status; struct link_info *li = &ap->fmt.content.link_monitoring_thunder.links_status;
for (uint8_t i = 0; i < thunderc->total_links; i++) { for (uint8_t i = 0; i < thunderc->total_links; i++) {
@ -193,12 +205,20 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b
bi->i = i; bi->app_ctx = app_ctx; bi->missing = expected; bi->i = i; bi->app_ctx = app_ctx; bi->missing = expected;
set_timeout (ctx, timeout, bi, on_block); set_timeout (ctx, timeout, bi, on_block);
printf(" Triggered timeout for link %d in %ldms (expected: %ld, seen: %ld)\n", i, timeout, expected, thunderc->received_pkts_on_link[i]);
if (ctx->verbose > 1) { if (ctx->verbose > 1) {
fprintf(stderr, " [algo_thunder] Set timeout on link %d of %ld ms (packets expected: %ld, seen: %ld)\n", fprintf(stderr, " [algo_thunder] Set timeout on link %d of %ld ms (packets expected: %ld, seen: %ld)\n",
i, timeout, expected, thunderc->received_pkts_on_link[i]); i, timeout, expected, thunderc->received_pkts_on_link[i]);
} }
} }
if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Classify done\n"); if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Classify done\n");
printf("Blacklisted links: ");
for (int i = 0; i < thunderc->total_links; i++) {
if (is_blacklisted (thunderc, i)) printf("_");
else printf("U");
}
printf("\n");
} }
struct unpad_info { struct unpad_info {
@ -232,6 +252,7 @@ void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff
struct thunder_ctx* thunderc = app_ctx->misc; struct thunder_ctx* thunderc = app_ctx->misc;
char url[256]; char url[256];
struct evt_core_fdinfo *to_fdinfo = NULL; struct evt_core_fdinfo *to_fdinfo = NULL;
uint64_t delivered = 0;
for (int i = ui->ap_arr_vals-1; i >= 0; i--) { for (int i = ui->ap_arr_vals-1; i >= 0; i--) {
//fprintf(stderr, "i=%d, ui->ap_arr_vals=%d\n", i, ui->ap_arr_vals); //fprintf(stderr, "i=%d, ui->ap_arr_vals=%d\n", i, ui->ap_arr_vals);
@ -250,6 +271,11 @@ void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff
//dump_buffer_packet (bp_dest); //dump_buffer_packet (bp_dest);
buffer_append_ap (bp_dest, ui->ap_arr_pl[i]); buffer_append_ap (bp_dest, ui->ap_arr_pl[i]);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
delivered++;
}
if (delivered != 1) {
//printf("[algo_thunder] Delivered %ld packets (now id=%d)\n", delivered, thunderc->recv_id);
} }
mv_buffer_rtof (&app_ctx->br, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);

View file

@ -39,6 +39,7 @@ enum PKT_CMD {
enum PKT_FLAGS { enum PKT_FLAGS {
FLAG_READ_NEXT = 1 << 0, FLAG_READ_NEXT = 1 << 0,
FLAG_RESET = 1 << 1,
}; };
struct link_info { struct link_info {