diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 647a978..7750df0 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -20,50 +20,51 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo char url[256]; struct evt_core_fdinfo *to_fdinfo = NULL; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - union abstract_packet *ap = (union abstract_packet*) &bp->ip, *ap_prev = NULL; + union abstract_packet *ap = (union abstract_packet*) &bp->ip; struct dup2_ctx* dup2c = app_ctx->misc; - uint16_t id = 0; + int32_t id = -1, port = -1; - switch (ap->fmt.headers.cmd) { - case CMD_UDP_METADATA_THUNDER: - id = ap->fmt.content.udp_metadata_thunder.id; - mv_buffer_rtof(&app_ctx->br, fdinfo); - - // Check that received identifier has not been delivered - if (ring_ge(dup2c->recv_id, id)) { - mv_buffer_atof(&app_ctx->br, fdinfo); - return 0; + do { + switch (ap->fmt.headers.cmd) { + case CMD_UDP_METADATA_THUNDER: + id = ap->fmt.content.udp_metadata_thunder.id; + break; + case CMD_UDP_ENCAPSULATED: + port = ap->fmt.content.udp_encapsulated.port; + break; + default: + break; } + } while ((ap = ap_next(ap)) != NULL); - // Update delivered identifier - dup2c->recv_id = id; - - // 1. Find destination - ap_prev = (union abstract_packet*) get_app_buffer(&app_ctx->br, fdinfo)->ip; - sprintf(url, "udp:write:127.0.0.1:%d", ap_prev->fmt.content.udp_encapsulated.port); - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) { - fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); - mv_buffer_atof (&app_ctx->br, fdinfo); - return 1; - } - - // 2. Move buffer - mv_buffer_atow (&app_ctx->br, fdinfo, to_fdinfo); - main_on_udp_write(ctx, to_fdinfo); - - break; - - case CMD_UDP_ENCAPSULATED: - mv_buffer_rtoa(&app_ctx->br, fdinfo, fdinfo); - break; - - default: - fprintf(stderr, "Unknown packet type\n"); - break; + if (port == -1 || id == -1) { + fprintf(stderr, "Missing data..."); + exit(EXIT_FAILURE); } - return 1; + // Check that received identifier has not been delivered + if (ring_ge(dup2c->recv_id, id)) { + mv_buffer_atof(&app_ctx->br, fdinfo); + return 0; + } + + // Update delivered identifier + dup2c->recv_id = id; + + // 1. Find destination + sprintf(url, "udp:write:127.0.0.1:%d", port); + to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) { + fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); + mv_buffer_atof (&app_ctx->br, fdinfo); + return 1; + } + + // 2. Move buffer + mv_buffer_atow (&app_ctx->br, fdinfo, to_fdinfo); + main_on_udp_write(ctx, to_fdinfo); + + return 0; } int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { @@ -71,9 +72,15 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct dup2_ctx* dup2c = app_ctx->misc; - bp->ip.ap.fmt.content.clear.id = dup2c->emit_id; - dup2c->emit_id = dup2c->emit_id + 1; + union abstract_packet metadata = { + .fmt.headers.cmd = CMD_UDP_METADATA_THUNDER, + .fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder), + .fmt.headers.flags = 0, + .fmt.content.udp_metadata_thunder.id = dup2c->emit_id + }; + buffer_append_ap (bp, &metadata); + dup2c->emit_id = dup2c->emit_id + 1; struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); for (int i = 0; i < app_ctx->ap.links; i++) { diff --git a/src/algo_utils.c b/src/algo_utils.c index be059b0..c17b60a 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -14,6 +14,11 @@ void naive_free_simple(void* v) { g_queue_free (g); } +void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) { + memset(bp, 0, sizeof(struct buffer_packet)); + g_queue_push_tail (app_ctx->free_buffer, bp); +} + void debug_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); int waiting_count = 0; @@ -34,8 +39,7 @@ void init_buffer_management(struct buffer_resources* ctx) { ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { - memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet)); - g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); + __push_to_free (ctx, &(ctx->bps[i])); } } @@ -73,11 +77,6 @@ struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct e return bp; } -void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) { - memset(bp, 0, sizeof(struct buffer_packet)); - g_queue_push_tail (app_ctx->free_buffer, bp); -} - guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) { GQueue* q; @@ -104,8 +103,6 @@ struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct // 3. Update state g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); - bp->mode = BP_WRITING; - bp->awrite = 0; return bp; } @@ -256,12 +253,3 @@ void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx) { } } } - -int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { - char* target = &(dest->raw); - while (pos-- > 0) { - target += ((union abstract_packet*) target)->fmt.headers.size; - } - memcpy(target, src, src->fmt.headers.size); - return 0; -} diff --git a/src/algo_utils.h b/src/algo_utils.h index f2845d1..275923d 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -26,8 +26,6 @@ void mv_buffer_atof(struct buffer_resources* app_ctx, void* from); struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); -int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); - struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx); diff --git a/src/capdiff.c b/src/capdiff.c index 96f73d8..143fc90 100644 --- a/src/capdiff.c +++ b/src/capdiff.c @@ -9,11 +9,12 @@ #define MAX_PKTS_TO_CHECK_FOR_DROP 10 uint8_t are_packets_equal(struct buffer_packet bpread[]) { - size_t s1 = bpread[0].ip.ap.fmt.headers.size, s2 = bpread[1].ip.ap.fmt.headers.size; + union abstract_packet *ap1 = (union abstract_packet*)&bpread[0].ip, *ap2 = (union abstract_packet*) bpread[1].ip; + size_t s1 = ap1->fmt.headers.size, s2 = ap2->fmt.headers.size; if (s1 != s2) return 0; - for (size_t idx = sizeof(bpread[0].ip.ap.fmt.headers) + sizeof(bpread[0].ip.ap.fmt.content.clear) - sizeof(char); idx < s1; idx++) { - char e1 = (&bpread[0].ip.ap.raw)[idx], e2 = (&bpread[1].ip.ap.raw)[idx]; + for (size_t idx = &ap1->fmt.content.udp_encapsulated.payload - (char*)ap1; idx < s1; idx++) { + char e1 = (&ap1->raw)[idx], e2 = (&ap2->raw)[idx]; if (e1 != e2) return 0; } @@ -41,7 +42,10 @@ void destroy_pkt_stats(gpointer data) { } void update_stats(struct buffer_packet *bp, GHashTable* stat_elem) { - gint port = bp->ip.ap.fmt.content.clear.port; + union abstract_packet *ap = (union abstract_packet*)&bp->ip; + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) return; + + gint port = ap->fmt.content.udp_encapsulated.port; struct pkt_stats *ps = g_hash_table_lookup(stat_elem, &port); if (ps == NULL) { ps = malloc(sizeof(struct pkt_stats)); @@ -57,7 +61,7 @@ void update_stats(struct buffer_packet *bp, GHashTable* stat_elem) { } ps->last = bp->seen; ps->count++; - ps->cumulated_size += bp->ip.ap.fmt.headers.size; + ps->cumulated_size += ap->fmt.headers.size; } void unroll_packets(struct cap_file cf[], struct buffer_packet bpread[], GHashTable* stats[], struct pkt_reconstruct *pr, int m, int i) { diff --git a/src/capreplay.c b/src/capreplay.c index d84873f..dbf5537 100644 --- a/src/capreplay.c +++ b/src/capreplay.c @@ -11,7 +11,9 @@ void get_ports(struct cap_file *cf) { size_t entry_count = cap_count_bp (cf); for (int c = 0; c < entry_count; c++) { cap_next_bp (cf, &bp); - int a = bp.ip.ap.fmt.content.clear.port; + union abstract_packet* ap = (union abstract_packet*) &bp.ip; + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) continue; + int a = ap->fmt.content.udp_encapsulated.port; } cap_begin(cf); } diff --git a/src/packet.c b/src/packet.c index f49d6a2..95dcd18 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,15 +1,33 @@ #include "packet.h" -size_t get_full_size(struct buffer_packet* bp) { - union abstract_packet* ap = (union abstract_packet*) &bp->ip; - for (int i = 0; i < bp->ap_count; i++) { - ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); - } - return &ap->raw - &bp->ip[0]; +union abstract_packet* ap_next(union abstract_packet* ap) { + if (ap->fmt.headers.flags & FLAG_READ_NEXT) + return (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); + + return NULL; } -void append_data(struct buffer_packet* bp, char* data, size_t size) { +union abstract_packet* buffer_last_ptr(struct buffer_packet* bp) { + union abstract_packet* ap = (union abstract_packet*) &bp->ip, *apn = NULL; + while ((apn = ap_next(ap)) != NULL) ap = apn; + return ap; +} + +union abstract_packet* buffer_free_ptr(struct buffer_packet* bp) { + union abstract_packet* ap = buffer_last_ptr (bp); + ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); + + return ap; +} + +size_t get_full_size(struct buffer_packet* bp) { + return &(buffer_free_ptr (bp))->raw - &bp->ip[0]; +} + +void buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap) { + buffer_last_ptr(bp)->fmt.headers.flags |= FLAG_READ_NEXT; + memcpy(buffer_last_ptr(bp), ap, ap->fmt.headers.size); } enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { @@ -34,9 +52,11 @@ enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer bp->aread += nread; } + bp->ap_count++; + if (bp->ap_count != get_full_size (bp)) return FDS_AGAIN; + bp->mode = BP_WRITING; bp->awrite = 0; - bp->ap_count = 1; return FDS_READY; } @@ -62,30 +82,34 @@ enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_ enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nwrite; - union abstract_packet* ap = (union abstract_packet*) &bp->ip; - - size_t bytes_to_send; - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - size_t pkt_header_size = sizeof(ap->fmt.headers); - struct sockaddr* addr = NULL; - socklen_t addrlen = 0; - if (udp_t->set) { - addr = (struct sockaddr*) &udp_t->addr; - addrlen = sizeof(struct sockaddr_in); - } - + union abstract_packet* ap = (union abstract_packet*) (&bp->ip + bp->awrite); + union abstract_packet* end = buffer_free_ptr (bp); if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = ap->fmt.headers.size - pkt_header_size; - nwrite = sendto(fdinfo->fd, + while (ap != end) { + if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) continue; + + size_t bytes_to_send; + size_t pkt_header_size = sizeof(ap->fmt.headers); + struct sockaddr* addr = NULL; + socklen_t addrlen = 0; + if (udp_t->set) { + addr = (struct sockaddr*) &udp_t->addr; + addrlen = sizeof(struct sockaddr_in); + } + + bytes_to_send = ap->fmt.headers.size - pkt_header_size; + nwrite = sendto(fdinfo->fd, &(ap->fmt.content.udp_encapsulated), bytes_to_send, 0, addr, addrlen); - if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; - if (nwrite != bytes_to_send) return FDS_ERR; + if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; + if (nwrite != bytes_to_send) return FDS_ERR; + bp->awrite += nwrite; + } bp->mode = BP_READING; bp->aread = 0; diff --git a/src/packet.h b/src/packet.h index 30cb1d7..5377cd4 100644 --- a/src/packet.h +++ b/src/packet.h @@ -37,12 +37,17 @@ enum PKT_CMD { CMD_UDP_METADATA_THUNDER, }; +enum PKT_FLAGS { + FLAG_READ_NEXT = 1 << 0, +}; + union abstract_packet { char raw; struct { struct { uint16_t size; - enum PKT_CMD cmd; + uint8_t cmd; + uint8_t flags; } headers; union { @@ -56,7 +61,6 @@ union abstract_packet { struct { uint16_t id; uint16_t deltat; - } udp_metadata_thunder; struct { uint16_t port; @@ -84,6 +88,11 @@ struct udp_target { size_t get_full_size(struct buffer_packet* bp); +void buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap); +union abstract_packet* buffer_free_ptr(struct buffer_packet* bp); +union abstract_packet* buffer_last_ptr(struct buffer_packet* bp); +union abstract_packet* ap_next(union abstract_packet* ap); + enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fd, struct buffer_packet* bp); enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fd, struct buffer_packet* bp); enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fd, struct buffer_packet* bp, struct udp_target* udp_t);