Fix a first set of bugs

This commit is contained in:
Quentin 2019-08-28 14:57:20 +02:00
parent 4d5dd554c5
commit a1d58c1203
5 changed files with 66 additions and 26 deletions

View file

@ -17,7 +17,7 @@ struct thunder_ctx {
uint8_t total_links; uint8_t total_links;
uint8_t delta_t_per_link[MAX_LINKS]; uint8_t delta_t_per_link[MAX_LINKS];
uint64_t received_pkts_on_link[MAX_LINKS]; uint64_t received_pkts_on_link[MAX_LINKS];
uint8_t blacklisted[MAX_LINKS]; uint64_t blacklisted[MAX_LINKS];
size_t monit_pkt_size; size_t monit_pkt_size;
struct timespec prev_link_time, prev_packet_time; struct timespec prev_link_time, prev_packet_time;
}; };
@ -125,6 +125,10 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
} }
li[thunderc->selected_link].delta_t = 0; li[thunderc->selected_link].delta_t = 0;
if (ctx->verbose > 1) {
dump_buffer_packet(bp_dup);
fprintf(stderr, " [algo_thunder] Will send this info\n");
}
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
} while (is_blacklisted (thunderc, thunderc->selected_link)); } while (is_blacklisted (thunderc, thunderc->selected_link));
@ -136,20 +140,6 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
return 0; return 0;
} }
void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) {
app_ctx->misc = malloc(sizeof(struct thunder_ctx));
if (app_ctx->misc == NULL) {
perror("malloc failed in algo thunder init");
exit(EXIT_FAILURE);
}
memset(app_ctx->misc, 0, sizeof(struct thunder_ctx));
struct thunder_ctx* thunderc = app_ctx->misc;
thunderc->selected_link = UINT8_MAX - 1;
union abstract_packet links = {};
thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1);
}
struct block_info { uint8_t i; struct algo_ctx* app_ctx; uint64_t missing; }; struct block_info { uint8_t i; struct algo_ctx* app_ctx; uint64_t missing; };
void on_block (struct evt_core_ctx* ctx, void* raw) { void on_block (struct evt_core_ctx* ctx, void* raw) {
@ -219,8 +209,29 @@ void unpad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff
} }
} }
void adapt() { void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct unpad_info *ui) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct thunder_ctx* thunderc = app_ctx->misc;
char url[256];
struct evt_core_fdinfo *to_fdinfo = NULL;
for (uint8_t i = ui->ap_arr_vals-1; i >= 0; i--) {
if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue;
thunderc->recv_id = ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id;
// Find destination
sprintf(url, "udp:write:127.0.0.1:%d", ui->ap_arr_pl[i]->fmt.content.udp_encapsulated.port);
to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
}
struct buffer_packet *bp_dest = inject_buffer_tow (&app_ctx->br, to_fdinfo);
buffer_append_ap (bp_dest, ui->ap_arr_pl[i]);
main_on_udp_write(ctx, to_fdinfo);
}
mv_buffer_rtof (&app_ctx->br, fdinfo);
} }
int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
@ -228,14 +239,34 @@ int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi
classify(ctx, fdinfo, bp); classify(ctx, fdinfo, bp);
unpad(ctx, fdinfo, bp, &ui); unpad(ctx, fdinfo, bp, &ui);
adapt(); adapt(ctx, fdinfo, bp, &ui);
return 0; return 0;
} }
int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
prepare(ctx, fdinfo, bp); prepare(ctx, fdinfo, bp);
pad(ctx, fdinfo, bp); pad(ctx, fdinfo, bp);
return schedule(ctx, fdinfo, bp); schedule(ctx, fdinfo, bp);
return 0;
}
void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) {
app_ctx->misc = malloc(sizeof(struct thunder_ctx));
if (app_ctx->misc == NULL) {
perror("malloc failed in algo thunder init");
exit(EXIT_FAILURE);
}
memset(app_ctx->misc, 0, sizeof(struct thunder_ctx));
struct thunder_ctx* thunderc = app_ctx->misc;
thunderc->recv_id = 1;
thunderc->emit_id = 1;
thunderc->total_links = app_ctx->ap.links;
thunderc->selected_link = thunderc->total_links;
for (int i = 0; i < MAX_LINKS; i++) thunderc->received_pkts_on_link[i] = 1;
union abstract_packet links = {};
//fprintf(stderr, "Total links %d\n", thunderc->total_links);
thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1);
} }
int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) {

View file

@ -207,7 +207,7 @@ void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) {
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { struct buffer_packet* inject_buffer_tow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
// 1. We get a free buffer // 1. We get a free buffer
@ -217,21 +217,28 @@ struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct bu
return NULL; return NULL;
} }
// 2. We duplicate the data // 2. We get the target writing queue
memcpy(bp_dest, bp, sizeof(struct buffer_packet));
// 3. We get the target writing queue
q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd));
if (q == NULL) { if (q == NULL) {
q = g_queue_new (); q = g_queue_new ();
g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q);
} }
// 4. We push the content to the appropriate destination // 3. We push the content to the appropriate destination
g_queue_push_tail(q, bp_dest); g_queue_push_tail(q, bp_dest);
return bp_dest; return bp_dest;
} }
struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) {
// 1. Inject a new buffer
struct buffer_packet* bp_dest = inject_buffer_tow (app_ctx, to);
// 2. We duplicate the data
memcpy(bp_dest, bp, sizeof(struct buffer_packet));
return bp_dest;
}
struct buffer_packet* dup_buffer_toa(struct buffer_resources *app_ctx, struct buffer_packet* bp, void* to) { struct buffer_packet* dup_buffer_toa(struct buffer_resources *app_ctx, struct buffer_packet* bp, void* to) {
GQueue* q; GQueue* q;

View file

@ -23,6 +23,8 @@ void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* fr
void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to);
void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to);
void mv_buffer_atof(struct buffer_resources* app_ctx, void* from); void mv_buffer_atof(struct buffer_resources* app_ctx, void* from);
struct buffer_packet* inject_buffer_tow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* to);
struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
struct buffer_packet* dup_buffer_toa(struct buffer_resources* app_ctx, struct buffer_packet* bp, void* to); struct buffer_packet* dup_buffer_toa(struct buffer_resources* app_ctx, struct buffer_packet* bp, void* to);
guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);

View file

@ -28,7 +28,7 @@ int on_signal(struct evt_core_ctx* evts, struct evt_core_fdinfo* fdinfo) {
} }
void signal_init(struct evt_core_ctx* evts) { void signal_init(struct evt_core_ctx* evts) {
sigset_t mask; sigset_t mask = {0};
struct evt_core_cat signal_read = { struct evt_core_cat signal_read = {
.name = "signal-read", .name = "signal-read",

View file

@ -208,7 +208,7 @@ void dump_abstract_packet(union abstract_packet* ap) {
printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd); printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd);
switch (ap->fmt.headers.cmd) { switch (ap->fmt.headers.cmd) {
case CMD_LINK_MONITORING_THUNDER: case CMD_LINK_MONITORING_THUNDER:
printf(" <LinkMonitoringThunder/>\n"); printf(" <LinkMonitoringThunder></LinkMonitoringThunder>\n");
break; break;
case CMD_UDP_METADATA_THUNDER: case CMD_UDP_METADATA_THUNDER:
printf(" <UdpMetadataThunder>id=%d</UdpMetadataThunder>\n", printf(" <UdpMetadataThunder>id=%d</UdpMetadataThunder>\n",