WIP receive

This commit is contained in:
Quentin 2019-08-27 17:28:14 +02:00
parent 55f7bb3eef
commit 6e3eb95778
2 changed files with 26 additions and 13 deletions

View file

@ -14,6 +14,7 @@ struct thunder_ctx {
uint8_t selected_link;
uint8_t total_links;
uint8_t delta_t_per_link[64];
uint64_t received_pkts_on_link[64];
uint8_t blacklisted[64];
size_t monit_pkt_size;
struct timespec prev_link_time, prev_packet_time;
@ -42,19 +43,12 @@ void prepare(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct thunder_ctx* thunderc = app_ctx->misc;
// 1. Put the raw buffer in cache
thunderc->emit_id++;
uint64_t ref = 0l + thunderc->emit_id;
dup_buffer_toa (&app_ctx->br, bp, (void *)ref);
uint64_t delta_pkt = compute_delta(&thunderc->prev_packet_time, 200);
union abstract_packet metadata = {
.fmt.headers.cmd = CMD_UDP_METADATA_THUNDER,
.fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder),
.fmt.headers.flags = 0,
.fmt.content.udp_metadata_thunder.id = thunderc->emit_id,
.fmt.content.udp_metadata_thunder.deltat = delta_pkt
};
buffer_append_ap (bp, &metadata);
}
@ -64,9 +58,11 @@ void pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer
struct thunder_ctx* thunderc = app_ctx->misc;
uint64_t ref = 0l + thunderc->emit_id;
// 1. Clean old buffers
if (ref > 10 && get_app_buffer (&app_ctx->br, (void *)ref - 10l)) {
mv_buffer_atof (&app_ctx->br, (void *)ref - 10l);
dup_buffer_toa (&app_ctx->br, bp, (void *)ref);
// 1. Clean old buffers (we keep only thunderc->total_links buffer, keeping more would be useless)
if (ref > thunderc->total_links && get_app_buffer (&app_ctx->br, (void *)(ref - thunderc->total_links))) {
mv_buffer_atof (&app_ctx->br, (void *)(ref - thunderc->total_links));
}
// 2. Append abstract packets stored in our buffers
@ -148,8 +144,26 @@ void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struc
thunderc->monit_pkt_size = sizeof(links.fmt.headers) + sizeof(links.fmt.content.link_monitoring_thunder) + sizeof(struct link_info) * (thunderc->total_links - 1);
}
void classify() {
void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct thunder_ctx* thunderc = app_ctx->misc;
union abstract_packet* ap = buffer_first_ap (bp);
while (ap != NULL && ap->fmt.headers.cmd != CMD_LINK_MONITORING_THUNDER) ap = ap_next(ap);
if (ap == NULL) {
fprintf(stderr, "Unable to find our packet\n");
exit(EXIT_FAILURE);
}
int link_id = url_get_port_int(fdinfo->url) - 7500;
thunderc->received_pkts_on_link[link_id]++;
struct link_info *li = &ap->fmt.content.link_monitoring_thunder.links_status;
for (int i = 0; i < thunderc->total_links; i++) {
uint64_t expected = i <= link_id ? thunderc->received_pkts_on_link[link_id] : thunderc->received_pkts_on_link[link_id] - 1;
if (thunderc->received_pkts_on_link[i] >= expected) continue; // Nothing to do, all packets have been received
set_timeout ()
}
}
void unpad() {
@ -161,7 +175,7 @@ void adapt() {
}
int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
classify();
classify(ctx, fdinfo, bp);
unpad();
adapt();
return 0;

View file

@ -60,7 +60,6 @@ union abstract_packet {
} link_monitoring_thunder;
struct {
uint16_t id;
uint16_t deltat;
} udp_metadata_thunder;
struct {
uint16_t port;