diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 30b8e73..530f20e 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -22,15 +22,15 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; - if (ring_ge(dup2c->recv_id, bp->ip.ap.str.id)) { + if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { mv_buffer_rtof(app_ctx, fdinfo); return 0; } - dup2c->recv_id = bp->ip.ap.str.id; + dup2c->recv_id = bp->ip.ap.fmt.content.clear.id; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); @@ -49,7 +49,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct dup2_ctx* dup2c = app_ctx->misc; - bp->ip.ap.str.id = dup2c->emit_id; + bp->ip.ap.fmt.content.clear.id = dup2c->emit_id; dup2c->emit_id = dup2c->emit_id + 1; struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); diff --git a/src/algo_naive.c b/src/algo_naive.c index 17a7d99..90afdaa 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -11,7 +11,7 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); diff --git a/src/algo_rr.c b/src/algo_rr.c index c251b10..440e903 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -5,34 +5,35 @@ #include "proxy.h" #include "timer.h" -struct waited_pkt { - uint16_t id; - int link_num; - uint8_t on; +struct timer_info { + uint16_t health_id; + uint8_t prevlink; + uint16_t min_blocked_pkt; struct algo_ctx* algo; }; -struct deferred_pkt { +struct queued_pkt { + uint8_t on; int link_fd; int idx; uint16_t id; - uint8_t on; struct algo_ctx* algo; }; struct rr_ctx { uint8_t my_links; - uint16_t my_links_ver; uint8_t remote_links; - int64_t mjit; - uint16_t recv_id; - uint16_t recv_id_late; - uint16_t sent_id; uint8_t current_link; + int64_t mjit; + uint16_t health_id; + uint16_t health_id_late; + uint16_t content_id; + uint16_t sent_health_id; + uint16_t sent_content_id; struct internet_packet prev_packet; struct timespec emit_time; - struct deferred_pkt real[PACKET_BUFFER_SIZE]; - struct waited_pkt wait[PACKET_BUFFER_SIZE]; + struct queued_pkt real[PACKET_BUFFER_SIZE]; + struct timer_info wait[PACKET_BUFFER_SIZE]; }; void show_link_availability(struct rr_ctx* rr) { @@ -49,110 +50,53 @@ void show_link_availability(struct rr_ctx* rr) { printf("]\n"); } -void expired_wait (struct evt_core_ctx* ctx, void* user); -void expired_late(struct evt_core_ctx* ctx, void* user); +void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE; - uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE; + uint16_t real_idx = bp->ip.ap.fmt.content.clear.id % PACKET_BUFFER_SIZE; - //printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id); + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); - // 0. Update remote links - if (ring_lt(rr->recv_id_late, bp->ip.ap.str.id) && !(rr->remote_links & 1 << link_num)) { - printf("Activate link=%d | ", link_num); - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working - show_link_availability (rr); - } - - // 1. Update my links I can use thanks to target feedback - if (bp->ip.ap.str.id > rr->my_links_ver && bp->ip.ap.str.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.str.bitfield; - rr->my_links_ver = bp->ip.ap.str.id; - printf("Update my links | "); - show_link_availability (rr); - } - - // 2. If packet arrived too late or already queued, we discard it - if (ring_ge(rr->recv_id, bp->ip.ap.str.id) || rr->real[real_idx].id == bp->ip.ap.str.id) { - // Packet has already been delivered or dropped, we free the buffer - fprintf(stderr, "Packet %d arrived too late (current: %d) or already received\n", bp->ip.ap.str.id, rr->recv_id); - mv_buffer_wtof (app_ctx, fdinfo); - return; - } - - // 3. If packet arrived too early, we wait for its predecessors - //printf("%d < %d = %d\n", rr->recv_id, bp->ip.ap.str.id - 1, ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)); - if (ring_lt(rr->recv_id, bp->ip.ap.str.id - 1)) { - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.str.deltat; - //printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout); - if (timeout <= 0) timeout = 0; - - if (rr->wait[wait_idx].on && rr->wait[wait_idx].id != bp->ip.ap.str.id - 1) { - fprintf(stderr, "Waiting array overlap, BUG: [\n"); - for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - printf("\t%d => %d\n", rr->wait[i].id, rr->wait[i].on); - } - printf("] - could be replaced by drop\n"); - exit(EXIT_FAILURE); - } else if (!rr->wait[wait_idx].on) { - rr->wait[wait_idx].on = 1; - rr->wait[wait_idx].id = bp->ip.ap.str.id - 1; - rr->wait[wait_idx].link_num = bp->ip.ap.str.prevlink; - rr->wait[wait_idx].algo = app_ctx; - set_timeout(ctx, timeout, &rr->wait[wait_idx], expired_wait); - } - } - - // 4. We queue the packet to keep it - if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.str.id) { - fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, real_idx); + // 1. We queue the packet to keep it + if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.fmt.content.clear.id) { + fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.fmt.content.clear.id, real_idx); for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { printf("\t%d => %d\n", rr->real[i].id, rr->real[i].on); } printf("] - could be replaced by drop\n"); exit(EXIT_FAILURE); - } else if (!rr->real[real_idx].on) { - rr->real[real_idx].on = 2; - rr->real[real_idx].id = bp->ip.ap.str.id; + } else if (!rr->real[real_idx].on && ring_gt(bp->ip.ap.fmt.content.clear.id, rr->content_id)) { + rr->real[real_idx].on = 1; + rr->real[real_idx].id = bp->ip.ap.fmt.content.clear.id; rr->real[real_idx].idx = real_idx; rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].algo = app_ctx; mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); - - // 5. We register a timer for this packet to create a reactivation window for broken links - set_timeout(ctx, rr->mjit + 1, &rr->real[real_idx], expired_late); - - //printf("%d is added to real as %d\n", bp->ip.ap.str.id, idx_real); } else { - fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id); - mv_buffer_wtof (app_ctx, fdinfo); + if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); + mv_buffer_rtof (app_ctx, fdinfo); } - } -void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) { +void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queued_pkt* dp) { struct evt_core_fdinfo *to_fdinfo = NULL; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; char url[255]; // 1. Marked the packet as handled - dp->on--; + dp->on = 0; // 2. Get the buffer struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); // 3. We update our cursor - rr->recv_id = bp->ip.ap.str.id; + rr->content_id = bp->ip.ap.fmt.content.clear.id; // 4. Find its target - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet :( \n", url); @@ -166,6 +110,54 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct main_on_udp_write(ctx, to_fdinfo); } +void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; + + assert(bp->ip.ap.headers.cmd == CMD_HEALTH); + + // 1. Health packet was received too late, dropping it + if (ring_le(bp->ip.ap.fmt.content.health.id, rr->health_id_late)) goto release; + + // 2. Reactivate link if deactivated + char buffer[16]; + url_get_port (buffer, fdinfo->url); + int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded + if (!(rr->remote_links & (1 << link_num))) { + printf("Activate link=%d | ", link_num); + rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working + show_link_availability (rr); + } + + // 3. Update RR structure if its the greatest health_id we received + if (ring_gt(bp->ip.ap.fmt.content.health.id, rr->health_id)) { + // 3.1. Update current health id + rr->health_id = bp->ip.ap.fmt.content.health.id; + + // 3.2. Update my links I can use thanks to target feedback + if (bp->ip.ap.fmt.content.health.bitfield != rr->my_links) { + rr->my_links = bp->ip.ap.fmt.content.health.bitfield; + printf("Update my links | "); + show_link_availability (rr); + } + } + + // 4. Set callback to close this health packet window + int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.fmt.content.health.deltat; + if (timeout <= 0) timeout = 0; + uint64_t idx = bp->ip.ap.fmt.content.health.id % PACKET_BUFFER_SIZE; + + rr->wait[idx].health_id = bp->ip.ap.fmt.content.health.id; + rr->wait[idx].prevlink = bp->ip.ap.fmt.content.health.prevlink; + rr->wait[idx].min_blocked_pkt = bp->ip.ap.fmt.content.health.min_blocked_pkt; + rr->wait[idx].algo = app_ctx; + + set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); + +release: + mv_buffer_rtof(app_ctx, fdinfo); +} + void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct rr_ctx* rr = app_ctx->misc; struct evt_core_fdinfo* fdinfo = NULL; @@ -173,16 +165,9 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { while(1) { //printf("Trying to deliver %d\n", rr->recv_id+1); - struct deferred_pkt* def = &rr->real[(rr->recv_id+1) % PACKET_BUFFER_SIZE]; + struct queued_pkt* def = &rr->real[(rr->content_id+1) % PACKET_BUFFER_SIZE]; if (!def->on) break; - fdinfo = evt_core_get_from_fd (ctx, def->link_fd); - if (fdinfo == NULL) { - fprintf(stderr, "An error occured as the link seems to be closed for the requested fd\n"); - rr->recv_id++; - continue; - } - - rr_deliver(ctx, fdinfo, def); + rr_deliver(ctx, app_ctx, def); //printf("Delivered %d\n", rr->recv_id); } } @@ -193,11 +178,17 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - // 1. Register packet in our queue - rr_pkt_register(ctx, fdinfo, bp); + if (bp->ip.ap.fmt.headers.cmd == CMD_CLEAR) { + if (ctx->verbose) printf("Received a CLEAR packet of size %d\n", bp->ip.ap.fmt.headers.size); + // 1. Register packet in our queue + rr_pkt_register(ctx, fdinfo, bp); - // 2. Process queue - rr_pkt_unroll (ctx, app_ctx); + // 2. Process queue + rr_pkt_unroll (ctx, app_ctx); + } else if (bp->ip.ap.fmt.headers.cmd == CMD_HEALTH) { + if (ctx->verbose) printf("Received a HEALTH packet of size %d\n", bp->ip.ap.fmt.headers.size); + rr_pkt_manage_links(ctx, fdinfo, bp); + } return 0; co_error: @@ -221,28 +212,45 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo exit(EXIT_FAILURE); } + // 2. Compute delta t secs = curr.tv_sec - rr->emit_time.tv_sec; nsecs = curr.tv_nsec - rr->emit_time.tv_nsec; + rr->emit_time = curr; mili_sec = secs * 1000 + nsecs / 1000000; if (mili_sec > rr->mjit) mili_sec = rr->mjit; - bp->ip.ap.str.id = rr->sent_id; - bp->ip.ap.str.flags = 0; - bp->ip.ap.str.deltat = mili_sec; - bp->ip.ap.str.bitfield = rr->remote_links; - bp->ip.ap.str.prevlink = rr->current_link; + // 3. Backup clear packet + struct buffer_packet clear_packet; + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); + append_buffer (&clear_packet.ip.ap, 0, &bp->ip.ap); + // 4. Set health packet + bp->ip.ap.fmt.headers.cmd = CMD_HEALTH; + bp->ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); + bp->ip.ap.fmt.content.health.id = rr->sent_health_id; + bp->ip.ap.fmt.content.health.deltat = mili_sec; + bp->ip.ap.fmt.content.health.prevlink = rr->current_link; + bp->ip.ap.fmt.content.health.bitfield = rr->remote_links; + rr->sent_health_id++; + + // 5. Append clear packet + clear_packet.ip.ap.fmt.content.clear.id = rr->sent_content_id; + rr->sent_content_id++; + bp->ip.ap.fmt.content.health.min_blocked_pkt = clear_packet.ip.ap.fmt.content.clear.id; + append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap); + bp->ap_count++; + + // 6. Append redundancy if needed if (app_ctx->ap.redundant_data == 1) { - append_buffer(&bp->ip.ap, 1, &rr->prev_packet.ap); // We append previous packet - append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time + assert(ring_gt(bp->ip.ap.fmt.content.health.min_blocked_pkt, rr->prev_packet.ap.fmt.content.clear.id)); + bp->ip.ap.fmt.content.health.min_blocked_pkt = rr->prev_packet.ap.fmt.content.clear.id; + append_buffer(&bp->ip.ap, 2, &rr->prev_packet.ap); // We append previous packet + append_buffer(&rr->prev_packet.ap, 0, &clear_packet.ip.ap); // We store current packet for next time bp->ap_count++; } //printf("Will send packet id=%d\n", bp->ip.ap.str.id); - rr->emit_time = curr; - rr->sent_id++; - - // 2. Try to find someone to send it + // 7. Try to find someone to send it int max = 16; uint8_t sel_link = rr->current_link; while(max-- >= 0) { @@ -251,13 +259,14 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) continue; // Missing link if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Some links are down - if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) { + if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { rr->current_link = sel_link; mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); return 0; } else { - dup_buffer_tow(app_ctx, bp, to_fdinfo); + struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); + dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor... main_on_tcp_write(ctx, to_fdinfo); } } @@ -273,39 +282,30 @@ co_error: exit(EXIT_FAILURE); } -void expired_wait(struct evt_core_ctx* ctx, void* user) { - struct waited_pkt* pkt = user; - struct rr_ctx* rr = pkt->algo->misc; +void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { + struct timer_info* t = raw; + struct algo_ctx* app_ctx = t->algo; + struct rr_ctx* rr = app_ctx->misc; - // 1. Release lock - pkt->on = 0; + // 1. Update link recovery window if needed + if (ring_gt(t->health_id, rr->health_id_late)) rr->health_id_late = t->health_id; - // 2. We will not reactivate link for this packet - if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; - - // 3. Stop if packet has been received and delivered - if (ring_le (pkt->id, rr->recv_id)) return; - - printf("Timer reached for packet %d\n", pkt->id); - - // 4. BLACKLIST LINK - printf("Blacklist link=%d | ", pkt->link_num); - rr->remote_links &= 0xff ^ 1 << pkt->link_num; - show_link_availability (rr); - - // 5. Deliver following packets - while (ring_lt(rr->recv_id, pkt->id)) { - rr->recv_id++; - rr_pkt_unroll (ctx, pkt->algo); + // 2. Blacklist previous link if needed + uint16_t prev_health_id = (t->health_id - 1); + uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; + struct timer_info* t_old = &rr->wait[prev_health_idx]; + if (t_old->health_id != prev_health_id) { + printf("Blacklist link=%d | ", t->prevlink); + rr->remote_links &= 0xff ^ 1 << t->prevlink; + show_link_availability (rr); } -} -void expired_late(struct evt_core_ctx* ctx, void* user) { - struct deferred_pkt* pkt = user; - struct rr_ctx* rr = pkt->algo->misc; - - pkt->on--; - if (ring_lt(rr->recv_id_late, pkt->id)) rr->recv_id_late = pkt->id; + // 3. Deliver blocked packets + //printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); + while (ring_gt(t->min_blocked_pkt, rr->content_id)) { + rr->content_id++; + rr_pkt_unroll (ctx, app_ctx); + } } int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { @@ -328,9 +328,12 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg rr->mjit = 200; rr->my_links = 0xff; rr->remote_links = 0xff; - rr->sent_id = 1; - rr->recv_id = 0; - rr->recv_id_late = 0; + rr->sent_health_id = 1; + rr->sent_content_id = 1; + rr->health_id = 0; + rr->health_id_late = 0; + rr->content_id = 0; + rr->current_link = 0; app_ctx->misc = rr; app_ctx->free_misc = algo_rr_free; diff --git a/src/algo_utils.c b/src/algo_utils.c index fc1fa4b..a85bd10 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -43,7 +43,7 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_ struct buffer_packet* bp; // 1. Check if we don't have a buffer - bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); + bp = fdinfo == NULL ? NULL : g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); if (bp != NULL) return bp; // 2. Get a new buffer otherwise @@ -189,14 +189,14 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { g_queue_push_tail (app_ctx->free_buffer, bp); } -void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { +struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { GQueue* q; // 1. We get a free buffer struct buffer_packet* bp_dest = g_queue_pop_head(app_ctx->free_buffer); if (bp_dest == NULL) { debug_buffer(app_ctx, to); - return; + return NULL; } // 2. We duplicate the data @@ -211,6 +211,7 @@ void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct e // 4. We push the content to the appropriate destination g_queue_push_tail(q, bp_dest); + return bp_dest; } struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { @@ -238,9 +239,9 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) { char* target = &(dest->raw); while (pos-- > 0) { - target += dest->str.size; + target += ((union abstract_packet*) target)->fmt.headers.size; } - memcpy(target, src, src->str.size); + memcpy(target, src, src->fmt.headers.size); return 0; } diff --git a/src/algo_utils.h b/src/algo_utils.h index ce8d26a..76154de 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -51,7 +51,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); -void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); diff --git a/src/packet.c b/src/packet.c index 6d520dd..518cb53 100644 --- a/src/packet.c +++ b/src/packet.c @@ -3,14 +3,14 @@ size_t get_full_size(struct buffer_packet* bp) { union abstract_packet* ap = &bp->ip.ap; for (int i = 0; i < bp->ap_count; i++) { - ap = (union abstract_packet*)(&ap->raw + ap->str.size); + ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); } return &ap->raw - &bp->ip.ap.raw; } enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { ssize_t nread; - size_t pkt_size_size = sizeof(bp->ip.ap.str.size); + size_t pkt_size_size = sizeof(bp->ip.ap.fmt.headers.size); if (bp->mode != BP_READING) return FDS_ERR; while (bp->aread < pkt_size_size) { @@ -21,8 +21,8 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { bp->aread += nread; } - while (bp->aread < bp->ip.ap.str.size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.str.size - bp->aread); + while (bp->aread < bp->ip.ap.fmt.headers.size) { + nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.fmt.headers.size - bp->aread); if (nread == 0) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1) return FDS_ERR; @@ -46,7 +46,6 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; } - bp->mode = BP_READING; bp->aread = 0; bp->ap_count = 0; @@ -57,7 +56,8 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nwrite; size_t bytes_to_send; - size_t pkt_header_size = sizeof(bp->ip.ap.str) - sizeof(char); + assert(bp->ip.ap.headers.cmd == CMD_CLEAR); + size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); struct sockaddr* addr = NULL; socklen_t addrlen = 0; if (udp_t->set) { @@ -67,9 +67,9 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = bp->ip.ap.str.size - pkt_header_size; + bytes_to_send = bp->ip.ap.fmt.headers.size - pkt_header_size; nwrite = sendto(fd, - &(bp->ip.ap.str.payload), + &(bp->ip.ap.fmt.content.clear.payload), bytes_to_send, 0, addr, @@ -89,12 +89,12 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp ssize_t nread; if (bp->mode != BP_READING) return FDS_ERR; - size_t pkt_header_size = sizeof(bp->ip.ap.str) - sizeof(char); // We remove the payload + size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); // We remove the payload size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; socklen_t addrlen = sizeof(struct sockaddr_in); nread = recvfrom(fd, - &(bp->ip.ap.str.payload), + &(bp->ip.ap.fmt.content.clear.payload), udp_packet_size, MSG_TRUNC, (struct sockaddr*)&udp_t->addr, @@ -106,7 +106,8 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp udp_t->set = 1; udp_t->addrlen = addrlen; - bp->ip.ap.str.size = nread + pkt_header_size; + bp->ip.ap.fmt.headers.size = nread + pkt_header_size; + bp->ip.ap.fmt.headers.cmd = CMD_CLEAR; bp->mode = BP_WRITING; bp->awrite = 0; diff --git a/src/packet.h b/src/packet.h index f07fd67..0c7b43d 100644 --- a/src/packet.h +++ b/src/packet.h @@ -1,4 +1,5 @@ #pragma once +#include #include #include #include @@ -28,22 +29,35 @@ enum BP_MODE { BP_WRITING }; -enum PKT_FLAGS { - PKT_CONTROL = 1 << 0 +enum PKT_CMD { + CMD_HEALTH, + CMD_CLEAR, + CMD_XOR }; union abstract_packet { char raw; struct { - uint16_t size; - uint16_t port; - uint16_t id; - uint8_t bitfield; - uint8_t prevlink; - uint16_t deltat; - uint8_t flags; - char payload; - } str; + struct { + uint16_t size; + enum PKT_CMD cmd; + } headers; + + union { + struct { + uint16_t id; + uint8_t bitfield; + uint8_t prevlink; + uint16_t deltat; + uint16_t min_blocked_pkt; + } health; + struct { + uint16_t id; + uint16_t port; + char payload; + } clear; + } content; + } fmt; }; struct internet_packet { diff --git a/src/proxy.c b/src/proxy.c index b392dee..60fe7c7 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -71,7 +71,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; // 2. Read packet from socket - bp->ip.ap.str.port = url_get_port_int (fdinfo->url); + bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1;