diff --git a/CMakeLists.txt b/CMakeLists.txt index d9c9490..e6661e6 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -25,8 +25,8 @@ list(APPEND CSOURCES src/url.c src/donar_init.h src/donar_init.c - src/algo_rr.c src/algo_dup2.c + src/algo_thunder.c src/algo_utils.h src/algo_utils.c src/proxy.h diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 27fd1d9..e179f79 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -19,10 +19,11 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo char url[256]; struct evt_core_fdinfo *to_fdinfo = NULL; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; // Check that we didn't already received the packet struct dup2_ctx* dup2c = app_ctx->misc; - if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { + if (ring_ge(dup2c->recv_id, ap->fmt.content.clear.id)) { mv_buffer_rtof(&app_ctx->br, fdinfo); return 0; } diff --git a/src/algo_naive.c b/src/algo_naive.c index 6a7bac1..fb85873 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -9,9 +9,10 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf char url[256]; struct evt_core_fdinfo *to_fdinfo = NULL; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; // 1. Find destination - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); + sprintf(url, "udp:write:127.0.0.1:%d", ap->fmt.content.udp_encapsulated.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); diff --git a/src/algo_rr.c b/src/algo_rr.c deleted file mode 100644 index 788d3cb..0000000 --- a/src/algo_rr.c +++ /dev/null @@ -1,379 +0,0 @@ -#include -#include "algo_utils.h" -#include "utils.h" -#include "url.h" -#include "proxy.h" -#include "timer.h" - -struct timer_info { - uint16_t health_id; - uint8_t prevlink; - uint16_t min_blocked_pkt; - struct algo_ctx* algo; -}; - -struct queued_pkt { - uint8_t on; - int link_fd; - int idx; - uint16_t id; - struct algo_ctx* algo; -}; - -struct rr_ctx { - uint8_t my_links; - uint8_t remote_links; - uint8_t current_link; - int64_t mjit; - uint16_t health_id; - uint16_t health_id_late; - uint16_t content_id; - uint16_t sent_health_id; - uint16_t sent_content_id; - struct internet_packet prev_packet; - struct timespec emit_time; - struct queued_pkt real[PACKET_BUFFER_SIZE]; - struct timer_info wait[PACKET_BUFFER_SIZE]; -}; - -void show_link_availability(struct rr_ctx* rr) { - printf("Links availability: my_links["); - for (int i = 0; i < 8; i++) { - if (rr->my_links & 1 << i) printf("U"); - else printf("-"); - } - printf("], rem_links["); - for (int i = 0; i < 8; i++) { - if (rr->remote_links & 1 << i) printf("U"); - else printf("-"); - } - printf("]\n"); -} - -void blacklist_link(struct rr_ctx* rr, int sel_link) { - printf("Blacklist link=%d | ", sel_link); - rr->remote_links &= 0xff ^ 1 << sel_link; - show_link_availability (rr); -} - -void on_timeout_health (struct evt_core_ctx* ctx, void* user); - -void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - uint16_t real_idx = bp->ip.ap.fmt.content.clear.id % PACKET_BUFFER_SIZE; - - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - - // 1. We queue the packet to keep it - if (rr->real[real_idx].on && ring_lt(rr->real[real_idx].id, bp->ip.ap.fmt.content.clear.id)) { - fprintf(stderr, "Real array is full for packet_id=%d, idx=%d, last_delivered_content_id=%d BUG: [\n", - bp->ip.ap.fmt.content.clear.id, - real_idx, - rr->content_id); - for (int i = 0; i < PACKET_BUFFER_SIZE; i++) { - fprintf(stderr, "\t%d => %d\n", rr->real[i].id, rr->real[i].on); - } - fprintf(stderr, "] - could be replaced by drop\n"); - exit(EXIT_FAILURE); - } else if (!rr->real[real_idx].on && ring_gt(bp->ip.ap.fmt.content.clear.id, rr->content_id)) { - rr->real[real_idx].on = 1; - rr->real[real_idx].id = bp->ip.ap.fmt.content.clear.id; - rr->real[real_idx].idx = real_idx; - rr->real[real_idx].link_fd = fdinfo->fd; - rr->real[real_idx].algo = app_ctx; - mv_buffer_rtoa(&app_ctx->br, fdinfo, &rr->real[real_idx].idx); - } else { - if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); - mv_buffer_rtof (&app_ctx->br, fdinfo); - } -} - -void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queued_pkt* dp) { - struct evt_core_fdinfo *to_fdinfo = NULL; - struct rr_ctx* rr = app_ctx->misc; - char url[255]; - - // 1. Marked the packet as handled - dp->on = 0; - - // 2. Get the buffer - struct buffer_packet* bp = get_app_buffer (&app_ctx->br, &dp->idx); - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - - // 3. We update our cursor - rr->content_id = bp->ip.ap.fmt.content.clear.id; - - // 4. Find its target - sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.fmt.content.clear.port); - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) { - fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx); - //mv_buffer_wtor (app_ctx, fdinfo, bp); - mv_buffer_atof (&app_ctx->br, &dp->idx); - } - - // 5. We move the buffer and notify the target - //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); - mv_buffer_atow (&app_ctx->br, &dp->idx, to_fdinfo); - main_on_udp_write(ctx, to_fdinfo); -} - -void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - - assert(bp->ip.ap.fmt.headers.cmd == CMD_HEALTH); - - // 1. Health packet was received too late, dropping it - if (ring_le(bp->ip.ap.fmt.content.health.id, rr->health_id_late)) goto release; - - // 2. Reactivate link if deactivated - char buffer[16]; - url_get_port (buffer, fdinfo->url); - int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded - if (!(rr->remote_links & (1 << link_num))) { - printf("Activate link=%d | ", link_num); - rr->remote_links |= 1 << link_num; // Make sure that the link is marked as working - show_link_availability (rr); - } - - // 3. Update RR structure if its the greatest health_id we received - if (ring_gt(bp->ip.ap.fmt.content.health.id, rr->health_id)) { - // 3.1. Update current health id - rr->health_id = bp->ip.ap.fmt.content.health.id; - - // 3.2. Update my links I can use thanks to target feedback - if (bp->ip.ap.fmt.content.health.bitfield != rr->my_links) { - rr->my_links = bp->ip.ap.fmt.content.health.bitfield; - printf("Update my links | "); - show_link_availability (rr); - } - } - - // 4. Set callback to close this health packet window - int64_t timeout = rr->mjit - (int64_t) bp->ip.ap.fmt.content.health.deltat; - if (timeout <= 0) timeout = 0; - uint64_t idx = bp->ip.ap.fmt.content.health.id % PACKET_BUFFER_SIZE; - - rr->wait[idx].health_id = bp->ip.ap.fmt.content.health.id; - rr->wait[idx].prevlink = bp->ip.ap.fmt.content.health.prevlink; - rr->wait[idx].min_blocked_pkt = bp->ip.ap.fmt.content.health.min_blocked_pkt; - rr->wait[idx].algo = app_ctx; - - set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); - -release: - mv_buffer_rtof(&app_ctx->br, fdinfo); -} - -uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { - struct rr_ctx* rr = app_ctx->misc; - struct evt_core_fdinfo* fdinfo = NULL; - struct buffer_packet* bp = NULL; - uint64_t delivered = 0; - - while(1) { - //printf("Trying to deliver %d\n", rr->recv_id+1); - struct queued_pkt* def = &rr->real[(rr->content_id+1) % PACKET_BUFFER_SIZE]; - if (!def->on) break; - rr_deliver(ctx, app_ctx, def); - delivered++; - //printf("Delivered %d\n", rr->recv_id); - } - return delivered; -} - -//------ - -int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - - if (bp->ip.ap.fmt.headers.cmd == CMD_CLEAR) { - if (ctx->verbose > 1) fprintf(stderr, " [algo/rr] Received a CLEAR packet of size %d on URL %s\n", bp->ip.ap.fmt.headers.size, fdinfo->url); - // 1. Register packet in our queue - rr_pkt_register(ctx, fdinfo, bp); - - // 2. Process queue - rr_pkt_unroll (ctx, app_ctx); - } else if (bp->ip.ap.fmt.headers.cmd == CMD_HEALTH) { - if (ctx->verbose > 1) fprintf(stderr, " [algo/rr] Received a HEALTH packet of size %d on URL %s\n", bp->ip.ap.fmt.headers.size, fdinfo->url); - rr_pkt_manage_links(ctx, fdinfo, bp); - } else { - fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd); - mv_buffer_rtof(&app_ctx->br, fdinfo); - } - - return 0; -} - -int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - struct evt_core_fdinfo *to_fdinfo = NULL; - uint16_t min_pkt; - guint len; - char url[255]; - size_t health_packet_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); - size_t max_size = sizeof(struct internet_packet) - health_packet_size; - - if (ctx->verbose > 1) fprintf(stderr, " [algo/rr] Read a UDP packet on URL %s\n", fdinfo->url); - - // 1. Prepare RR state and packet values - struct timespec curr; - int secs, nsecs; - uint64_t mili_sec; - - if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ - perror("clock_gettime error"); - exit(EXIT_FAILURE); - } - - // 2. Compute delta t - secs = curr.tv_sec - rr->emit_time.tv_sec; - nsecs = curr.tv_nsec - rr->emit_time.tv_nsec; - rr->emit_time = curr; - mili_sec = secs * 1000 + nsecs / 1000000; - if (mili_sec > rr->mjit) mili_sec = rr->mjit; - - // 3. Prepare fresh packet - assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - bp->ip.ap.fmt.content.clear.id = rr->sent_content_id; - min_pkt = rr->sent_content_id; - rr->sent_content_id++; - if (bp->ip.ap.fmt.headers.size > max_size) { - fprintf(stderr, "Packet is too big to be relayed. Oops...\n"); - exit(EXIT_FAILURE); - } - - // 4. Append redundancy if needed - if (app_ctx->ap.redundant_data == 1) { - size_t current_size = get_full_size (bp); - size_t final_size = current_size + rr->prev_packet.ap.fmt.headers.size; - if (final_size <= max_size) { - min_pkt = rr->prev_packet.ap.fmt.content.clear.id; - append_buffer(&bp->ip.ap, bp->ap_count, &rr->prev_packet.ap); // We append previous packet - bp->ap_count++; - } else if (ctx->verbose) { - fprintf(stderr, " [algo/rr] Can't append redundancy (current=%ld, after=%ld, max=%ld)\n", current_size, final_size, max_size); - } - - append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time - } - - // 5. Append health packet - struct buffer_packet hp; - hp.ip.ap.fmt.headers.cmd = CMD_HEALTH; - hp.ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); - hp.ip.ap.fmt.content.health.id = rr->sent_health_id; - hp.ip.ap.fmt.content.health.deltat = mili_sec; - hp.ip.ap.fmt.content.health.prevlink = rr->current_link; - hp.ip.ap.fmt.content.health.bitfield = rr->remote_links; - hp.ip.ap.fmt.content.health.min_blocked_pkt = min_pkt; - rr->sent_health_id++; - append_buffer(&bp->ip.ap, bp->ap_count, &hp.ip.ap); - bp->ap_count++; - - // 6. Try to find someone to send it - int max = 16; - uint8_t sel_link = rr->current_link; - while(max-- >= 0) { - if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Still bootstrapping - sel_link = (sel_link + 1) % app_ctx->ap.links; - sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded - to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) { - if (ctx->verbose) fprintf(stderr, " [algo/rr] write fd %s has not been found, skipping\n", url); - continue; - } - - if ((len = write_queue_len (&app_ctx->br, to_fdinfo)) > 0) { - if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); - blacklist_link (rr, sel_link); - continue; - } - - if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { - rr->current_link = sel_link; - mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo); - main_on_tcp_write(ctx, to_fdinfo); - return 0; - } else { - struct buffer_packet* dup_bp = dup_buffer_tow(&app_ctx->br, bp, to_fdinfo); - /* - * for later - dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; - dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor... - */ - main_on_tcp_write(ctx, to_fdinfo); - } - } - -not_ready: - // 3. We find no up target - fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); - mv_buffer_wtof (&app_ctx->br, fdinfo); - return 0; -} - -void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { - struct timer_info* t = raw; - struct algo_ctx* app_ctx = t->algo; - struct rr_ctx* rr = app_ctx->misc; - - // 1. Update link recovery window if needed - if (ring_gt(t->health_id, rr->health_id_late)) rr->health_id_late = t->health_id; - - // 2. Blacklist previous link if needed - uint16_t prev_health_id = (t->health_id - 1); - uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; - struct timer_info* t_old = &rr->wait[prev_health_idx]; - if (t_old->health_id != prev_health_id) blacklist_link (rr, t->prevlink); - - // 3. Deliver blocked packets - // @FIXME CRAPPY CODE / CRAPPY LOGIC - //printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); - if (ring_gt(t->min_blocked_pkt, rr->content_id) && !rr->real[t->min_blocked_pkt % PACKET_BUFFER_SIZE].on) { - fprintf(stderr, "min_blocked_packet has not been received, t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id); - exit(EXIT_FAILURE); - } - while (ring_gt(t->min_blocked_pkt, rr->content_id - 1)) { - rr->content_id++; - rr_pkt_unroll (ctx, app_ctx); - } - rr_pkt_unroll (ctx, app_ctx); -} - -int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - if (strstr(fdinfo->cat->name, "udp") != NULL) return 1; - return 0; -} - -void algo_rr_free(void* v) { - struct rr_ctx* rr = v; - free(rr); -} - -void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { - struct rr_ctx* rr = malloc(sizeof(struct rr_ctx)); - if (rr == NULL) { - perror("malloc failed for rr_init."); - exit(EXIT_FAILURE); - } - memset(rr, 0, sizeof(struct rr_ctx)); - rr->mjit = 200; - rr->my_links = 0xff; - rr->remote_links = 0xff; - rr->sent_health_id = 1; - rr->sent_content_id = 1; - rr->health_id = 0; - rr->health_id_late = 0; - rr->content_id = 0; - rr->current_link = 0; - app_ctx->misc = rr; - app_ctx->free_misc = algo_rr_free; - - init_timer(ctx); -} - diff --git a/src/algo_thunder.c b/src/algo_thunder.c new file mode 100644 index 0000000..7956afc --- /dev/null +++ b/src/algo_thunder.c @@ -0,0 +1,40 @@ +#include +#include "algo_utils.h" +#include "utils.h" +#include "url.h" +#include "proxy.h" +#include "timer.h" + + +void prepare(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { +} + +void pad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + +} + +int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + + + return 0; +} + +void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { + +} + +int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + + return 0; +} + +int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + prepare(ctx, fdinfo, bp); + pad(ctx, fdinfo, bp); + return schedule(ctx, fdinfo, bp); +} + +int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { + if (strstr(fdinfo->cat->name, "udp") != NULL) return 1; + return 0; +} diff --git a/src/packet.c b/src/packet.c index 8af43f7..f49d6a2 100644 --- a/src/packet.c +++ b/src/packet.c @@ -1,28 +1,33 @@ #include "packet.h" size_t get_full_size(struct buffer_packet* bp) { - union abstract_packet* ap = &bp->ip.ap; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; for (int i = 0; i < bp->ap_count; i++) { ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); } - return &ap->raw - &bp->ip.ap.raw; + return &ap->raw - &bp->ip[0]; } -enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { +void append_data(struct buffer_packet* bp, char* data, size_t size) { + +} + +enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { ssize_t nread; - size_t pkt_size_size = sizeof(bp->ip.ap.fmt.headers.size); + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + size_t pkt_size_size = sizeof(ap->fmt.headers.size); if (bp->mode != BP_READING) return FDS_ERR; while (bp->aread < pkt_size_size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, pkt_size_size - bp->aread); + nread = read(fdinfo->fd, &(ap->raw) + bp->aread, pkt_size_size - bp->aread); if (nread == 0) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1) return FDS_ERR; bp->aread += nread; } - while (bp->aread < bp->ip.ap.fmt.headers.size) { - nread = read(fd, &(bp->ip.ap.raw) + bp->aread, bp->ip.ap.fmt.headers.size - bp->aread); + while (bp->aread < ap->fmt.headers.size) { + nread = read(fdinfo->fd, &(ap->raw) + bp->aread, ap->fmt.headers.size - bp->aread); if (nread == 0) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1) return FDS_ERR; @@ -36,13 +41,14 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { return FDS_READY; } -enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { +enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { ssize_t nwrite; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; //dump_buffer_packet (bp); if (bp->mode != BP_WRITING) return FDS_ERR; while (bp->awrite < get_full_size(bp)) { - nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); + nwrite = send(fdinfo->fd, &(ap->raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite == -1) return FDS_ERR; bp->awrite += nwrite; @@ -54,11 +60,13 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { return FDS_READY; } -enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_target* udp_t) { +enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nwrite; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + size_t bytes_to_send; assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); - size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); + size_t pkt_header_size = sizeof(ap->fmt.headers); struct sockaddr* addr = NULL; socklen_t addrlen = 0; if (udp_t->set) { @@ -68,9 +76,9 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t if (bp->mode != BP_WRITING) return FDS_ERR; - bytes_to_send = bp->ip.ap.fmt.headers.size - pkt_header_size; - nwrite = sendto(fd, - &(bp->ip.ap.fmt.content.clear.payload), + bytes_to_send = ap->fmt.headers.size - pkt_header_size; + nwrite = sendto(fdinfo->fd, + &(ap->fmt.content.udp_encapsulated), bytes_to_send, 0, addr, @@ -86,19 +94,21 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t return FDS_READY; } -enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp_target* udp_t) { +enum FD_STATE read_packet_from_udp (struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) { ssize_t nread; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + if (bp->mode != BP_READING) { fprintf(stderr, "Buffer packet is not in reading mode (mode: %d)\n", bp->mode); return FDS_ERR; } - size_t pkt_header_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.clear) - sizeof(char); // We remove the payload - size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; + size_t pkt_header_size = sizeof(ap->fmt.headers); + size_t udp_packet_size = sizeof(bp->ip) - pkt_header_size; socklen_t addrlen = sizeof(struct sockaddr_in); - nread = recvfrom(fd, - &(bp->ip.ap.fmt.content.clear.payload), + nread = recvfrom(fdinfo->fd, + &(ap->fmt.content.udp_encapsulated.payload), udp_packet_size, MSG_TRUNC, (struct sockaddr*)&udp_t->addr, @@ -117,8 +127,9 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp udp_t->set = 1; udp_t->addrlen = addrlen; - bp->ip.ap.fmt.headers.size = nread + pkt_header_size; - bp->ip.ap.fmt.headers.cmd = CMD_CLEAR; + ap->fmt.headers.size = nread + pkt_header_size; + ap->fmt.headers.cmd = CMD_UDP_ENCAPSULATED; + ap->fmt.content.udp_encapsulated.port = url_get_port_int (fdinfo->url); bp->mode = BP_WRITING; bp->awrite = 0; @@ -129,8 +140,8 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp void dump_buffer_packet(struct buffer_packet* bp) { printf("\n"); - printf(" mode=%d, aread=%d, awrite=%d, ap_count=%d, usage=%ld/%ld\n", bp->mode, bp->aread, bp->awrite, bp->ap_count, get_full_size (bp), sizeof(struct internet_packet)); - union abstract_packet* ap = &bp->ip.ap; + printf(" mode=%d, aread=%d, awrite=%d, ap_count=%d, usage=%ld/%ld\n", bp->mode, bp->aread, bp->awrite, bp->ap_count, get_full_size (bp), sizeof(bp->ip)); + union abstract_packet* ap = (union abstract_packet*) &bp->ip; for (int i = 0; i < bp->ap_count; i++) { dump_abstract_packet(ap); ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size); @@ -142,21 +153,20 @@ void dump_abstract_packet(union abstract_packet* ap) { printf(" \n"); printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd); switch (ap->fmt.headers.cmd) { - case CMD_HEALTH: + case CMD_LINK_MONITORING_THUNDER: printf(" id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x\n", - ap->fmt.content.health.id, - ap->fmt.content.health.deltat, - ap->fmt.content.health.prevlink, - ap->fmt.content.health.min_blocked_pkt, - ap->fmt.content.health.bitfield); + ap->fmt.content.link_monitoring_thunder.id, + ap->fmt.content.link_monitoring_thunder.deltat, + ap->fmt.content.link_monitoring_thunder.prevlink, + ap->fmt.content.link_monitoring_thunder.min_blocked_pkt, + ap->fmt.content.link_monitoring_thunder.bitfield); break; - case CMD_CLEAR: - printf(" id=%d, port=%d\n", - ap->fmt.content.clear.id, - ap->fmt.content.clear.port); + case CMD_UDP_METADATA_THUNDER: + printf(" id=%d\n", + ap->fmt.content.udp_metadata_thunder.id); break; - case CMD_XOR: - printf(" Unimplemented\n"); + case CMD_UDP_ENCAPSULATED: + printf(" port=%d\n", ap->fmt.content.udp_encapsulated.port); break; default: printf(" \n"); diff --git a/src/packet.h b/src/packet.h index a8e78a8..30cb1d7 100644 --- a/src/packet.h +++ b/src/packet.h @@ -9,6 +9,8 @@ #include #include #include +#include "evt_core.h" +#include "url.h" /* * man 7 udp about receive operation on UDP sockets: @@ -30,9 +32,9 @@ enum BP_MODE { }; enum PKT_CMD { - CMD_HEALTH, - CMD_CLEAR, - CMD_XOR + CMD_UDP_ENCAPSULATED, + CMD_LINK_MONITORING_THUNDER, + CMD_UDP_METADATA_THUNDER, }; union abstract_packet { @@ -50,28 +52,27 @@ union abstract_packet { uint8_t prevlink; uint16_t deltat; uint16_t min_blocked_pkt; - } health; + } link_monitoring_thunder; struct { uint16_t id; + uint16_t deltat; + + } udp_metadata_thunder; + struct { uint16_t port; char payload; - } clear; + } udp_encapsulated; } content; } fmt; }; -struct internet_packet { - union abstract_packet ap; - char rest[1499]; // MTU = 1500, 1 byte in the union as payload -}; - struct buffer_packet { enum BP_MODE mode; uint8_t ap_count; uint16_t aread; uint16_t awrite; struct timespec seen; - struct internet_packet ip; + char ip[1500]; }; struct udp_target { @@ -83,10 +84,10 @@ struct udp_target { size_t get_full_size(struct buffer_packet* bp); -enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp); -enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp); -enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_target* udp_t); -enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp_target* udp_t); +enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fd, struct buffer_packet* bp); +enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fd, struct buffer_packet* bp); +enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fd, struct buffer_packet* bp, struct udp_target* udp_t); +enum FD_STATE read_packet_from_udp (struct evt_core_fdinfo* fd, struct buffer_packet* bp, struct udp_target* udp_t); void dump_buffer_packet(struct buffer_packet* bp); void dump_abstract_packet(union abstract_packet* ap); diff --git a/src/proxy.c b/src/proxy.c index afe9772..39c4c44 100644 --- a/src/proxy.c +++ b/src/proxy.c @@ -48,7 +48,7 @@ int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n"); while (bp->mode == BP_READING) { - read_res = read_packet_from_tcp (fdinfo->fd, bp); + read_res = read_packet_from_tcp (fdinfo, bp); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; } @@ -72,8 +72,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Read packet from socket - bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); - read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); + read_res = read_packet_from_udp (fdinfo, bp, fdinfo->other); if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_AGAIN) return 1; @@ -105,7 +104,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) // 2. Write data from the buffer to the socket while (bp->mode == BP_WRITING) { - write_res = write_packet_to_tcp(fdinfo->fd, bp); + write_res = write_packet_to_tcp(fdinfo, bp); if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_AGAIN) return 1; } @@ -131,7 +130,7 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; // 2. Write buffer - write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); + write_res = write_packet_to_udp(fdinfo, bp, fdinfo->other); if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_AGAIN) return 1; diff --git a/src/proxy.h b/src/proxy.h index fc9ab61..aee276e 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -50,16 +50,16 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); -void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); -int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); -int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); - void algo_dup2_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_dup2_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); +void algo_thunder_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); + static struct algo_desc available_algo[] = { { .name = "naive", @@ -68,19 +68,19 @@ static struct algo_desc available_algo[] = { .on_datagram = algo_naive_on_datagram, .on_err = algo_naive_on_err }, - { - .name = "rr", - .init = algo_rr_init, - .on_stream = algo_rr_on_stream, - .on_datagram = algo_rr_on_datagram, - .on_err = algo_rr_on_err - }, { .name = "dup2", .init = algo_dup2_init, .on_stream = algo_dup2_on_stream, .on_datagram = algo_dup2_on_datagram, .on_err = algo_dup2_on_err + }, + { + .name = "thunder", + .init = algo_thunder_init, + .on_stream = algo_thunder_on_stream, + .on_datagram = algo_thunder_on_datagram, + .on_err = algo_thunder_on_err } };