diff --git a/src/algo_dup2.c b/src/algo_dup2.c index c5db60f..530f20e 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -22,15 +22,15 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; - if (ring_ge(dup2c->recv_id, bp->ip.ap.content.clear.id)) { + if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { mv_buffer_rtof(app_ctx, fdinfo); return 0; } - dup2c->recv_id = bp->ip.ap.content.clear.id; + dup2c->recv_id = bp->ip.ap.fmt.content.clear.id; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); @@ -49,7 +49,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct dup2_ctx* dup2c = app_ctx->misc; - bp->ip.ap.content.clear.id = dup2c->emit_id; + bp->ip.ap.fmt.content.clear.id = dup2c->emit_id; dup2c->emit_id = dup2c->emit_id + 1; struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); diff --git a/src/algo_naive.c b/src/algo_naive.c index e9b2390..90afdaa 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -11,7 +11,7 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); diff --git a/src/algo_rr.c b/src/algo_rr.c index b15d275..8fc2467 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -55,13 +55,13 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - uint16_t real_idx = bp->ip.ap.content.clear.id % PACKET_BUFFER_SIZE; + uint16_t real_idx = bp->ip.ap.fmt.content.clear.id % PACKET_BUFFER_SIZE; assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 1. We queue the packet to keep it - if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.content.clear.id) { - fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.content.clear.id, real_idx); + if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.fmt.content.clear.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.fmt.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); } @@ -69,13 +69,13 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, exit(EXIT_FAILURE); } else if (!rr->real[real_idx].on) { rr->real[real_idx].on = 1; - rr->real[real_idx].id = bp->ip.ap.content.clear.id; + rr->real[real_idx].id = bp->ip.ap.fmt.content.clear.id; rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); } else { - fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.content.clear.id, rr->content_id); + fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); mv_buffer_rtof (app_ctx, fdinfo); } } @@ -93,10 +93,10 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 3. We update our cursor - rr->content_id = bp->ip.ap.content.clear.id; + rr->content_id = bp->ip.ap.fmt.content.clear.id; // 4. Find its target - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet :( \n", url); @@ -117,7 +117,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf assert(bp->ip.ap.headers.cmd == CMD_HEALTH); // 1. Health packet was received too late, dropping it - if (ring_le(bp->ip.ap.content.health.id, rr->health_id_late)) goto release; + if (ring_le(bp->ip.ap.fmt.content.health.id, rr->health_id_late)) goto release; // 2. Reactivate link if deactivated char buffer[16]; @@ -130,26 +130,26 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf } // 3. Update RR structure if its the greatest health_id we received - if (ring_gt(bp->ip.ap.content.health.id, rr->health_id)) { + if (ring_gt(bp->ip.ap.fmt.content.health.id, rr->health_id)) { // 3.1. Update current health id - rr->health_id = bp->ip.ap.content.health.id; + rr->health_id = bp->ip.ap.fmt.content.health.id; // 3.2. Update my links I can use thanks to target feedback - if (bp->ip.ap.content.health.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.content.health.bitfield; + if (bp->ip.ap.fmt.content.health.bitfield != rr->my_links) { + rr->my_links = bp->ip.ap.fmt.content.health.bitfield; printf("Update my links | "); show_link_availability (rr); } } // 4. Set callback to close this health packet window - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.content.health.deltat; + int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.fmt.content.health.deltat; if (timeout <= 0) timeout = 0; - uint64_t idx = bp->ip.ap.content.health.id % PACKET_BUFFER_SIZE; + uint64_t idx = bp->ip.ap.fmt.content.health.id % PACKET_BUFFER_SIZE; - rr->wait[idx].health_id = bp->ip.ap.content.health.id; - rr->wait[idx].prevlink = bp->ip.ap.content.health.prevlink; - rr->wait[idx].min_blocked_pkt = bp->ip.ap.content.health.min_blocked_pkt; + rr->wait[idx].health_id = bp->ip.ap.fmt.content.health.id; + rr->wait[idx].prevlink = bp->ip.ap.fmt.content.health.prevlink; + rr->wait[idx].min_blocked_pkt = bp->ip.ap.fmt.content.health.min_blocked_pkt; rr->wait[idx].algo = app_ctx; set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); @@ -178,13 +178,15 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - if (bp->ip.ap.headers.cmd == CMD_CLEAR) { + if (bp->ip.ap.fmt.headers.cmd == CMD_CLEAR) { + if (ctx->verbose) printf("Received a CLEAR packet of size %d\n", bp->ip.ap.fmt.headers.size); // 1. Register packet in our queue rr_pkt_register(ctx, fdinfo, bp); // 2. Process queue rr_pkt_unroll (ctx, app_ctx); - } else if (bp->ip.ap.headers.cmd == CMD_HEALTH) { + } else if (bp->ip.ap.fmt.headers.cmd == CMD_HEALTH) { + if (ctx->verbose) printf("Received a HEALTH packet of size %d\n", bp->ip.ap.fmt.headers.size); rr_pkt_manage_links(ctx, fdinfo, bp); } @@ -223,17 +225,17 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo append_buffer (&clear_packet.ip.ap, 0, &bp->ip.ap); // 4. Set health packet - bp->ip.ap.headers.cmd = CMD_HEALTH; - bp->ip.ap.headers.size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.health); - bp->ip.ap.content.health.id = rr->sent_health_id; - bp->ip.ap.content.health.deltat = mili_sec; - bp->ip.ap.content.health.prevlink = rr->current_link; - bp->ip.ap.content.health.bitfield = rr->remote_links; + bp->ip.ap.fmt.headers.cmd = CMD_HEALTH; + bp->ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); + bp->ip.ap.fmt.content.health.id = rr->sent_health_id; + bp->ip.ap.fmt.content.health.deltat = mili_sec; + bp->ip.ap.fmt.content.health.prevlink = rr->current_link; + bp->ip.ap.fmt.content.health.bitfield = rr->remote_links; rr->sent_health_id++; // 5. Append clear packet - clear_packet.ip.ap.content.clear.id = rr->sent_content_id; + clear_packet.ip.ap.fmt.content.clear.id = rr->sent_content_id; rr->sent_content_id++; append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap); bp->ap_count++; diff --git a/src/algo_utils.c b/src/algo_utils.c index 28d1015..a85bd10 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -236,12 +236,12 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { } } -int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src) { +int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { char* target = &(dest->raw); while (pos-- > 0) { - target += ((struct abstract_packet*) target)->headers.size; + target += ((union abstract_packet*) target)->fmt.headers.size; } - memcpy(target, src, src->headers.size); + memcpy(target, src, src->fmt.headers.size); return 0; } diff --git a/src/algo_utils.h b/src/algo_utils.h index 52c0d92..76154de 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -53,7 +53,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); -int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src); +int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); diff --git a/src/packet.c b/src/packet.c index 0569182..7a4ba9c 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,16 +1,16 @@ #include "packet.h" size_t get_full_size(struct buffer_packet* bp) { - struct abstract_packet* ap = &bp->ip.ap; + union abstract_packet* ap = &bp->ip.ap; for (int i = 0; i < bp->ap_count; i++) { - ap = (struct abstract_packet*)(&ap->raw + ap->headers.size); + ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); } return &ap->raw - &bp->ip.ap.raw; } enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { ssize_t nread; - size_t pkt_size_size = sizeof(bp->ip.ap.headers.size); + size_t pkt_size_size = sizeof(bp->ip.ap.fmt.headers.size); if (bp->mode != BP_READING) return FDS_ERR; while (bp->aread < pkt_size_size) { @@ -20,9 +20,10 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { if (nread == -1) return FDS_ERR; bp->aread += nread; } + printf("read packet size = %d, read on network = %d, computed size to read = %ld.\n", bp->ip.ap.fmt.headers.size, bp->aread, pkt_size_size); - while (bp->aread < bp->ip.ap.headers.size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.headers.size - bp->aread); + while (bp->aread < bp->ip.ap.fmt.headers.size) { + nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.fmt.headers.size - bp->aread); if (nread == 0) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1) return FDS_ERR; @@ -40,13 +41,14 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { ssize_t nwrite; if (bp->mode != BP_WRITING) return FDS_ERR; + printf("Already written: %d\n", bp->awrite); while (bp->awrite < get_full_size(bp)) { nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; } - + printf("headers.size=%d, fullsize=%ld, bp->awrite=%d, first_read=%d\n", bp->ip.ap.fmt.headers.size, get_full_size (bp), bp->awrite, *(uint16_t*)&(bp->ip.ap.raw)); bp->mode = BP_READING; bp->aread = 0; bp->ap_count = 0; @@ -58,7 +60,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t ssize_t nwrite; size_t bytes_to_send; assert(bp->ip.ap.headers.cmd == CMD_CLEAR); - size_t pkt_header_size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.clear) - sizeof(char); + size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); struct sockaddr* addr = NULL; socklen_t addrlen = 0; if (udp_t->set) { @@ -68,9 +70,9 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = bp->ip.ap.headers.size - pkt_header_size; + bytes_to_send = bp->ip.ap.fmt.headers.size - pkt_header_size; nwrite = sendto(fd, - &(bp->ip.ap.content.clear.payload), + &(bp->ip.ap.fmt.content.clear.payload), bytes_to_send, 0, addr, @@ -90,12 +92,12 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp ssize_t nread; if (bp->mode != BP_READING) return FDS_ERR; - size_t pkt_header_size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.clear) - sizeof(char); // We remove the payload + size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); // We remove the payload size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; socklen_t addrlen = sizeof(struct sockaddr_in); nread = recvfrom(fd, - &(bp->ip.ap.content.clear.payload), + &(bp->ip.ap.fmt.content.clear.payload), udp_packet_size, MSG_TRUNC, (struct sockaddr*)&udp_t->addr, @@ -107,8 +109,8 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp udp_t->set = 1; udp_t->addrlen = addrlen; - bp->ip.ap.headers.size = nread + pkt_header_size; - bp->ip.ap.headers.cmd = CMD_CLEAR; + bp->ip.ap.fmt.headers.size = nread + pkt_header_size; + bp->ip.ap.fmt.headers.cmd = CMD_CLEAR; bp->mode = BP_WRITING; bp->awrite = 0; diff --git a/src/packet.h b/src/packet.h index 80dc4ff..0c7b43d 100644 --- a/src/packet.h +++ b/src/packet.h @@ -35,32 +35,33 @@ enum PKT_CMD { CMD_XOR }; -struct abstract_packet { +union abstract_packet { char raw; - struct { - enum PKT_CMD cmd; - uint16_t size; - } headers; + struct { + uint16_t size; + enum PKT_CMD cmd; + } headers; - union { - struct { - uint16_t id; - uint8_t bitfield; - uint8_t prevlink; - uint16_t deltat; - uint16_t min_blocked_pkt; - } health; - struct { - uint16_t id; - uint16_t port; - char payload; - } clear; - } content; + union { + struct { + uint16_t id; + uint8_t bitfield; + uint8_t prevlink; + uint16_t deltat; + uint16_t min_blocked_pkt; + } health; + struct { + uint16_t id; + uint16_t port; + char payload; + } clear; + } content; + } fmt; }; struct internet_packet { - struct abstract_packet ap; + union abstract_packet ap; char rest[1499]; // MTU = 1500, 1 byte in the union as payload }; diff --git a/src/proxy.c b/src/proxy.c index b40160c..60fe7c7 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -71,7 +71,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; // 2. Read packet from socket - bp->ip.ap.content.clear.port = url_get_port_int (fdinfo->url); + bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1;