diff --git a/CMakeLists.txt b/CMakeLists.txt index 9f91415..5adfd64 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -16,8 +16,6 @@ list(APPEND CSOURCES src/donar_server.c src/evt_core.h src/evt_core.c - src/algo_skel.h - src/algo_skel.c src/algo_naive.c src/utils.h src/utils.c @@ -30,6 +28,8 @@ list(APPEND CSOURCES src/algo_rr.c src/algo_utils.h src/algo_utils.c + src/proxy.h + src/proxy.c ) add_executable(donar ${CSOURCES} src/donar.c) diff --git a/scripts/run-seq b/scripts/run-seq old mode 100644 new mode 100755 diff --git a/src/algo_naive.c b/src/algo_naive.c index 6367216..2f7cce7 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -1,67 +1,16 @@ +#include "proxy.h" #include "algo_utils.h" -#include "algo_skel.h" -int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); - -int on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - int conn_sock1, conn_sock2; - struct sockaddr_in addr; - socklen_t in_len; - char url[1024], port[6]; - struct evt_core_cat local_cat = {0}; - struct evt_core_fdinfo to_fdinfo = {0}; - to_fdinfo.cat = &local_cat; - to_fdinfo.url = url; - - in_len = sizeof(addr); - conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len); - - if (conn_sock1 == -1 && errno == EAGAIN) return 1; - if (conn_sock1 == -1) goto co_error; - conn_sock2 = dup(conn_sock1); - if (conn_sock2 == -1) goto co_error; - //printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2); - - url_get_port(port, fdinfo->url); - - to_fdinfo.fd = conn_sock1; - to_fdinfo.cat->name = "tcp-read"; - sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port); - evt_core_add_fd (ctx, &to_fdinfo); - - to_fdinfo.fd = conn_sock2; - to_fdinfo.cat->name = "tcp-write"; - sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port); - evt_core_add_fd (ctx, &to_fdinfo); - - return 0; - -co_error: - perror("Failed to handle new connection"); - exit(EXIT_FAILURE); +void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { + // We do nothing } -int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; +int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + char url[256]; struct evt_core_fdinfo *to_fdinfo = NULL; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - int read_res = FDS_READY; - char url[255]; - // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Try to read a whole packet in the buffer - while (bp->mode == BP_READING) { - read_res = read_packet_from_tcp (fdinfo->fd, bp); - if (read_res == FDS_ERR) goto co_error; - if (read_res == FDS_AGAIN) return 1; - } - - // 3. A whole packet has been read, we will find someone to write it + // 1. Find destination sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { @@ -69,66 +18,20 @@ int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { mv_buffer_wtof (app_ctx, fdinfo); return 1; } - //printf("Pass packet from %s to %s\n", fdinfo->url, url); - // 4. We move the buffer and notify the target + // 2. Move buffer mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); - on_udp_write(ctx, to_fdinfo); + main_on_udp_write(ctx, to_fdinfo); return 0; -co_error: - perror("Failed to TCP read"); - exit(EXIT_FAILURE); } -int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; +int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + char url[256]; + struct evt_core_fdinfo *to_fdinfo = NULL; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - int write_res = FDS_READY; - if (!app_ctx->is_rdy && strcmp(fdinfo->url, "tcp:write:127.0.0.1:7500") == 0) { - app_ctx->is_rdy = 1; - printf("=== Requested circuit is up ===\n"); - } - - // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Write data from the buffer to the socket - while (bp->mode == BP_WRITING) { - write_res = write_packet_to_tcp(fdinfo->fd, bp); - if (write_res == FDS_ERR) goto co_error; - if (write_res == FDS_AGAIN) return 1; - } - - // 3. A whole packet has been written - // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); - - return 0; -co_error: - perror("Failed to TCP write"); - exit(EXIT_FAILURE); -} - -int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; - struct evt_core_fdinfo *to_fdinfo; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - int read_res = FDS_READY; - char url[255]; - - // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Read packet from socket - bp->ip.ap.str.port = url_get_port_int (fdinfo->url); - read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); - if (read_res == FDS_ERR) goto co_error; - if (read_res == FDS_AGAIN) return 1; - - // 3. A whole packet has been read, we will find someone to write it + // 1. A whole packet has been read, we will find someone to write it sprintf(url, "tcp:write:127.0.0.1:7500"); to_fdinfo = evt_core_get_from_url (ctx, url); if (to_fdinfo == NULL) { @@ -138,123 +41,19 @@ int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { } //printf("Pass packet from %s to %s\n", fdinfo->url, url); - // 4. We move the buffer and notify the target + // 2. We move the buffer and notify the target mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); - on_tcp_write(ctx, to_fdinfo); - - return 0; - -co_error: - perror("Failed to UDP read"); - exit(EXIT_FAILURE); -} - -int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - int write_res = FDS_READY; - - // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Write buffer - write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); - if (write_res == FDS_ERR) goto co_error; - if (write_res == FDS_AGAIN) return 1; - - // 3. A whole packet has been written - // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); - - return 0; -co_error: - perror("Failed to UDP write"); - exit(EXIT_FAILURE); -} - -int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct buffer_packet* bp; - - // 1. If has a "used" buffer, remove it - bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); - if (bp != NULL) { - fprintf(stderr, "begin removing entry in app_ctx->used_buffer for %s\n", fdinfo->url); - g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd)); - fprintf(stderr, "end removing entry in app_ctx->used_buffer\n"); - memset(bp, 0, sizeof(struct buffer_packet)); - g_queue_push_tail(app_ctx->free_buffer, bp); - } - - // 2. If appears in the write waiting queue, remove it - GQueue* writew = g_hash_table_lookup (app_ctx->write_waiting, &(fdinfo->fd)); - while (writew != NULL && (bp = g_queue_pop_head (writew)) != NULL) { - memset(bp, 0, sizeof(struct buffer_packet)); - g_queue_push_tail(app_ctx->free_buffer, bp); - } - if (writew) g_hash_table_remove (app_ctx->write_waiting, &(fdinfo->fd)); - - // 3. If appears in the read waiting queue, remove it - g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); + main_on_tcp_write(ctx, to_fdinfo); return 0; } -void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) { - struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); - if (ctx == NULL) goto init_err; - memset(ctx, 0, sizeof(struct algo_ctx)); - ctx->free_buffer = g_queue_new (); - ctx->read_waiting = g_queue_new (); - ctx->application_waiting = g_hash_table_new (NULL, NULL); - ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); - ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); - ctx->ap = *ap; - ctx->is_rdy = 0; - for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { - g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); - } - - as->on_tcp_co.name = "tcp-listen"; - as->on_tcp_co.flags = EPOLLIN; - as->on_tcp_co.free_app_ctx = free_nothing; - as->on_tcp_co.cb = on_tcp_co; - - as->on_tcp_read.name = "tcp-read"; - as->on_tcp_read.flags = EPOLLIN | EPOLLET | EPOLLRDHUP; - as->on_tcp_read.app_ctx = ctx; - as->on_tcp_read.free_app_ctx = free_naive; - as->on_tcp_read.cb = on_tcp_read; - as->on_tcp_read.err_cb = on_err; - ctx->ref_count++; - - as->on_udp_read.name = "udp-read"; - as->on_udp_read.flags = EPOLLIN | EPOLLET; - as->on_udp_read.app_ctx = ctx; - as->on_udp_read.free_app_ctx = free_naive; - as->on_udp_read.cb = on_udp_read; - as->on_udp_read.err_cb = on_err; - ctx->ref_count++; - - as->on_tcp_write.name = "tcp-write"; - as->on_tcp_write.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP; - as->on_tcp_write.app_ctx = ctx; - as->on_tcp_write.free_app_ctx = free_naive; - as->on_tcp_write.cb = on_tcp_write; - as->on_tcp_write.err_cb = on_err; - ctx->ref_count++; - - as->on_udp_write.name = "udp-write"; - as->on_udp_write.flags = EPOLLOUT | EPOLLET; - as->on_udp_write.app_ctx = ctx; - as->on_udp_write.free_app_ctx = free_naive; - as->on_udp_write.cb = on_udp_write; - as->on_udp_write.err_cb = on_err; - ctx->ref_count++; - - return; -init_err: - fprintf(stderr, "Failed to init algo naive\n"); - exit(EXIT_FAILURE); +int algo_naive_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + // We do nothing + return 0; +} + +int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { + // We do nothing + return 1; } diff --git a/src/algo_rr.c b/src/algo_rr.c index 23f3f31..d45e823 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -1,7 +1,8 @@ #include -#include "algo_skel.h" #include "algo_utils.h" #include "utils.h" +#include "url.h" +#include "proxy.h" struct waited_pkt { uint16_t id; @@ -31,49 +32,6 @@ struct rr_ctx { struct waited_pkt wait[PACKET_BUFFER_SIZE]; }; -int rr_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -int rr_on_udp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); - -int rr_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - int conn_sock1, conn_sock2; - struct sockaddr_in addr; - socklen_t in_len; - char url[1024], port[6]; - struct evt_core_cat local_cat = {0}; - struct evt_core_fdinfo to_fdinfo = {0}; - to_fdinfo.cat = &local_cat; - to_fdinfo.url = url; - - in_len = sizeof(addr); - conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len); - - if (conn_sock1 == -1 && errno == EAGAIN) return 1; - if (conn_sock1 == -1) goto co_error; - conn_sock2 = dup(conn_sock1); - if (conn_sock2 == -1) goto co_error; - //printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2); - - url_get_port(port, fdinfo->url); - - to_fdinfo.fd = conn_sock1; - to_fdinfo.cat->name = "tcp-read"; - sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port); - evt_core_add_fd (ctx, &to_fdinfo); - - to_fdinfo.fd = conn_sock2; - to_fdinfo.cat->name = "tcp-write"; - sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port); - evt_core_add_fd (ctx, &to_fdinfo); - - return 0; - -co_error: - perror("Failed to handle new connection"); - exit(EXIT_FAILURE); -} - void show_link_availability(struct rr_ctx* rr) { printf("Links availability: my_links["); for (int i = 0; i < 8; i++) { @@ -88,6 +46,7 @@ void show_link_availability(struct rr_ctx* rr) { printf("]\n"); } +// @TODO Might be extracted from RR int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt* wpkt) { struct timespec now; struct itimerspec timer_config; @@ -242,7 +201,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct // 6. We move the buffer and notify the target //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); - rr_on_udp_write(ctx, to_fdinfo); + main_on_udp_write(ctx, to_fdinfo); } void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { @@ -268,24 +227,14 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { //------ -int rr_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; +int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; - int read_res = FDS_READY; - // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Try to read a whole packet in the buffer - while (bp->mode == BP_READING) { - read_res = read_packet_from_tcp (fdinfo->fd, bp); - if (read_res == FDS_ERR) goto co_error; - if (read_res == FDS_AGAIN) return 1; - } - - // 3. Logic on packet + // 1. Register packet in our queue rr_pkt_register(ctx, fdinfo, bp); + + // 2. Process queue rr_pkt_unroll (ctx, app_ctx); return 0; @@ -294,48 +243,13 @@ co_error: exit(EXIT_FAILURE); } -int rr_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - int write_res = FDS_READY; +int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; + struct evt_core_fdinfo *to_fdinfo = NULL; + char url[255]; - // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Write buffer - write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); - if (write_res == FDS_ERR) goto co_error; - if (write_res == FDS_AGAIN) return 1; - - // 3. A whole packet has been written - // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); - - return 0; -co_error: - perror("Failed to UDP write"); - exit(EXIT_FAILURE); -} - -int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; - struct evt_core_fdinfo *to_fdinfo = NULL; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - int read_res = FDS_READY; - char url[255]; - - // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later - if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Read packet from socket - bp->ip.ap.str.port = url_get_port_int (fdinfo->url); - read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); - if (read_res == FDS_ERR) goto co_error; - if (read_res == FDS_AGAIN) return 1; - - // 3. Prepare RR state and packet values + // 1. Prepare RR state and packet values struct timespec curr; int secs, nsecs; uint64_t mili_sec; @@ -360,6 +274,7 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { rr->emit_time = curr; rr->sent_id++; + // 2. Try to find someone to send it int max = 10; uint8_t sel_link = rr->current_link; while(max-- >= 0) { @@ -371,16 +286,16 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) { rr->current_link = sel_link; mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); - rr_on_tcp_write(ctx, to_fdinfo); + main_on_tcp_write(ctx, to_fdinfo); return 0; } else { dup_buffer_tow(app_ctx, bp, to_fdinfo); - rr_on_tcp_write(ctx, to_fdinfo); + main_on_tcp_write(ctx, to_fdinfo); } } not_ready: - // 4. A whole packet has been read, we will find someone to write it + // 3. We find no up target fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); mv_buffer_wtof (app_ctx, fdinfo); return 0; @@ -390,50 +305,7 @@ co_error: exit(EXIT_FAILURE); } -int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct buffer_packet* bp; - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct rr_ctx* rr = app_ctx->misc; - int write_res = FDS_READY; - - // 0. Show some information about circuits - uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0; - if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count); - else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count); - app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests - - // 1. Get current write buffer OR a buffer from the waiting queue OR leave - if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; - - // 2. Write data from the buffer to the socket - while (bp->mode == BP_WRITING) { - write_res = write_packet_to_tcp(fdinfo->fd, bp); - if (write_res == FDS_ERR) goto co_error; - if (write_res == FDS_AGAIN) return 1; - } - - // 3. A whole packet has been written - // Release the buffer and notify - mv_buffer_wtof(app_ctx, fdinfo); - notify_read(ctx, app_ctx); - - return 0; -co_error: - perror("Failed to TCP write"); - exit(EXIT_FAILURE); -} - -int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - uint64_t ctr; - ssize_t tmr_rd; - tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr)); - if (tmr_rd == -1 && errno == EAGAIN) return 1; - if (tmr_rd < 0) { - perror("read on timer"); - fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd); - exit(EXIT_FAILURE); - } - +int algo_rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct rr_ctx* rr = app_ctx->misc; @@ -459,46 +331,18 @@ end: return 1; } -int rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; - struct buffer_packet* bp; - - // 1. If has a "used" buffer, remove it - bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); - if (bp != NULL) { - g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd)); - memset(bp, 0, sizeof(struct buffer_packet)); - g_queue_push_tail(app_ctx->free_buffer, bp); - } - - // 2. If appears in the write waiting queue, remove it - GQueue* writew = g_hash_table_lookup (app_ctx->write_waiting, &(fdinfo->fd)); - while (writew != NULL && (bp = g_queue_pop_head (writew)) != NULL) { - memset(bp, 0, sizeof(struct buffer_packet)); - g_queue_push_tail(app_ctx->free_buffer, bp); - } - if (writew) g_hash_table_remove (app_ctx->write_waiting, &(fdinfo->fd)); - - // 3. If appears in the read waiting queue, remove it - g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); - +int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + // We do nothing return 0; } -void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) { - struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); - if (ctx == NULL) goto init_err; - memset(ctx, 0, sizeof(struct algo_ctx)); - ctx->free_buffer = g_queue_new (); - ctx->read_waiting = g_queue_new (); - ctx->application_waiting = g_hash_table_new (NULL, NULL); - ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); - ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); - ctx->link_count = 8; - ctx->is_rdy = 0; - ctx->ap = *ap; + +void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { struct rr_ctx* rr = malloc(sizeof(struct rr_ctx)); - if (rr == NULL) goto init_err; + if (rr == NULL) { + perror("malloc failed for rr_init."); + exit(EXIT_FAILURE); + } memset(rr, 0, sizeof(struct rr_ctx)); rr->mjit = 200; rr->my_links = 0xff; @@ -506,63 +350,6 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* rr->sent_id = 1; rr->recv_id = 0; rr->recv_id_late = 0; - ctx->misc = rr; - for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { - g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); - } - - - as->on_tcp_co.name = "tcp-listen"; - as->on_tcp_co.flags = EPOLLIN; - as->on_tcp_co.free_app_ctx = free_nothing; - as->on_tcp_co.cb = rr_on_tcp_co; - - as->on_tcp_read.name = "tcp-read"; - as->on_tcp_read.flags = EPOLLIN | EPOLLET | EPOLLRDHUP; - as->on_tcp_read.app_ctx = ctx; - as->on_tcp_read.free_app_ctx = free_naive; - as->on_tcp_read.cb = rr_on_tcp_read; - as->on_tcp_read.err_cb = rr_on_err; - ctx->ref_count++; - - - as->on_udp_read.name = "udp-read"; - as->on_udp_read.flags = EPOLLIN | EPOLLET; - as->on_udp_read.app_ctx = ctx; - as->on_udp_read.free_app_ctx = free_naive; - as->on_udp_read.cb = rr_on_udp_read; - as->on_udp_read.err_cb = rr_on_err; - ctx->ref_count++; - - as->on_tcp_write.name = "tcp-write"; - as->on_tcp_write.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP; - as->on_tcp_write.app_ctx = ctx; - as->on_tcp_write.free_app_ctx = free_naive; - as->on_tcp_write.cb = rr_on_tcp_write; - as->on_tcp_write.err_cb = rr_on_err; - ctx->ref_count++; - - as->on_udp_write.name = "udp-write"; - as->on_udp_write.flags = EPOLLOUT | EPOLLET; - as->on_udp_write.app_ctx = ctx; - as->on_udp_write.free_app_ctx = free_naive; - as->on_udp_write.cb = rr_on_udp_write; - as->on_udp_write.err_cb = rr_on_err; - ctx->ref_count++; - - struct evt_core_cat tcat = { - .name = "timeout", - .flags = EPOLLIN | EPOLLET, - .app_ctx = ctx, - .free_app_ctx = free_naive, - .cb = rr_on_timer, - .err_cb = NULL - }; - ctx->ref_count++; - evt_core_add_cat(evt, &tcat); - - return; -init_err: - fprintf(stderr, "Failed to init algo naive\n"); - exit(EXIT_FAILURE); + app_ctx->misc = rr; } + diff --git a/src/algo_skel.c b/src/algo_skel.c deleted file mode 100644 index 5162f42..0000000 --- a/src/algo_skel.c +++ /dev/null @@ -1,12 +0,0 @@ -#include "algo_skel.h" - -void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap) { - for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { - if (strcmp(available_algo[i].name, name) == 0) { - available_algo[i].init(ctx, as, ap); - return; - } - } - fprintf(stderr, "Algorithm %s has not been found\n", name); - exit(EXIT_FAILURE); -} diff --git a/src/algo_skel.h b/src/algo_skel.h deleted file mode 100644 index 658cc10..0000000 --- a/src/algo_skel.h +++ /dev/null @@ -1,44 +0,0 @@ -#pragma once -#include -#include -#include -#include -#include "packet.h" -#include "evt_core.h" -#include "utils.h" -#include "url.h" - -struct algo_params { - uint8_t is_waiting_bootstrap; - uint8_t is_healing; -}; - -struct algo_skel { - struct evt_core_cat on_udp_read; - struct evt_core_cat on_tcp_read; - struct evt_core_cat on_udp_write; - struct evt_core_cat on_tcp_write; - struct evt_core_cat on_tcp_co; -}; - -typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); - -void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap); -void algo_naive(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); -void algo_rr(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap); - -struct algo_desc { - algo_init init; - char* name; -}; - -static struct algo_desc available_algo[] = { - { - .init = algo_naive, - .name = "naive" - }, - { - .init = algo_rr, - .name = "rr" - } -}; diff --git a/src/algo_utils.h b/src/algo_utils.h index 755b004..eeb690a 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -2,12 +2,35 @@ #include #include #include -#include "algo_skel.h" +#include "packet.h" +#include "evt_core.h" #define PACKET_BUFFER_SIZE 20 +struct algo_params { + uint8_t is_waiting_bootstrap; + uint8_t is_healing; + char* algo_name; +}; + + +struct algo_ctx; +typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); + typedef void (*algo_ctx_free_misc)(void*); +struct algo_desc { + char* name; + algo_init init; + algo_ctx_on_buffer on_stream; + algo_ctx_on_buffer on_datagram; + algo_ctx_on_event on_timer; + algo_ctx_on_event on_err; +}; + struct algo_ctx { + struct algo_desc* desc; uint8_t link_count; uint8_t is_rdy; struct algo_params ap; diff --git a/src/donar.c b/src/donar.c index 96316f7..411a90c 100644 --- a/src/donar.c +++ b/src/donar.c @@ -4,7 +4,6 @@ #include #include #include -#include "algo_skel.h" #include "donar_client.h" #include "donar_server.h" diff --git a/src/donar_client.c b/src/donar_client.c index 99f9e85..55d82a9 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -70,22 +70,19 @@ void init_socks5_sinks(struct donar_client_ctx* app_ctx) { } void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { - struct algo_skel algo = {0}; struct algo_params ap = { .is_waiting_bootstrap = dp->is_waiting_bootstrap, - .is_healing = dp->is_healing + .is_healing = dp->is_healing, + .algo_name = dp->algo }; evt_core_init (&(ctx->evts), dp->verbose); - init_algo(&ctx->evts, &algo, dp->algo, &ap); + algo_main_init(&ctx->evts, &ap); + printf("--- Algorithm initialized\n"); + socks5_init (&ctx->evts); init_socks5_sinks(ctx); - evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); - evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read)); - evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read)); - evt_core_add_cat (&(ctx->evts), &(algo.on_udp_write)); - evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_write)); - printf("--- Categories created\n"); + printf("--- Socks5 connection process started\n"); load_onion_services (ctx, dp->onion_file, CLIENT_PORT_SIZE); printf("--- Onion services loaded\n"); diff --git a/src/donar_client.h b/src/donar_client.h index 4493aa4..32d188d 100644 --- a/src/donar_client.h +++ b/src/donar_client.h @@ -2,9 +2,9 @@ #include #include #include -#include "algo_skel.h" #include "tor_os.h" #include "socks5.h" +#include "proxy.h" #include "donar_init.h" #define CLIENT_PORT_SIZE 8 diff --git a/src/donar_server.c b/src/donar_server.c index 48e309f..66e7f87 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -52,21 +52,15 @@ socket_create_err: } void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { - struct algo_skel algo = {0}; struct algo_params ap = { .is_waiting_bootstrap = dp->is_waiting_bootstrap, - .is_healing = dp->is_healing + .is_healing = dp->is_healing, + .algo_name = dp->algo }; evt_core_init (&(ctx->evts), dp->verbose); - init_algo(&ctx->evts, &algo, dp->algo, &ap); - evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co)); - evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read)); - evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read)); - evt_core_add_cat (&(ctx->evts), &(algo.on_udp_write)); - evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_write)); - - printf("--- Categories created\n"); + algo_main_init(&ctx->evts, &ap); + printf("--- Algorithm initialized\n"); for (uint16_t i = 0; i < PORT_SIZE ; i++) { ctx->ports[i] = 7500 + i; diff --git a/src/donar_server.h b/src/donar_server.h index 94eb42d..37e9404 100644 --- a/src/donar_server.h +++ b/src/donar_server.h @@ -8,8 +8,8 @@ #include "tor_os.h" #include "tor_ctl.h" #include "evt_core.h" -#include "algo_skel.h" #include "donar_init.h" +#include "proxy.h" #define PORT_SIZE 8 diff --git a/src/proxy.c b/src/proxy.c new file mode 100644 index 0000000..3be7cd7 --- /dev/null +++ b/src/proxy.c @@ -0,0 +1,283 @@ +#include "proxy.h" + +int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + int conn_sock1, conn_sock2; + struct sockaddr_in addr; + socklen_t in_len; + char url[1024], port[6]; + struct evt_core_cat local_cat = {0}; + struct evt_core_fdinfo to_fdinfo = {0}; + to_fdinfo.cat = &local_cat; + to_fdinfo.url = url; + + in_len = sizeof(addr); + conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len); + + if (conn_sock1 == -1 && errno == EAGAIN) return 1; + if (conn_sock1 == -1) goto co_error; + conn_sock2 = dup(conn_sock1); + if (conn_sock2 == -1) goto co_error; + //printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2); + + url_get_port(port, fdinfo->url); + + to_fdinfo.fd = conn_sock1; + to_fdinfo.cat->name = "tcp-read"; + sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port); + evt_core_add_fd (ctx, &to_fdinfo); + + to_fdinfo.fd = conn_sock2; + to_fdinfo.cat->name = "tcp-write"; + sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port); + evt_core_add_fd (ctx, &to_fdinfo); + + return 0; + +co_error: + perror("Failed to handle new connection"); + exit(EXIT_FAILURE); +} + +int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct buffer_packet* bp; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + int read_res = FDS_READY; + + // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later + if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; + + // 2. Try to read a whole packet in the buffer + while (bp->mode == BP_READING) { + read_res = read_packet_from_tcp (fdinfo->fd, bp); + if (read_res == FDS_ERR) goto co_error; + if (read_res == FDS_AGAIN) return 1; + } + + // 3. Logic on packet + return app_ctx->desc->on_stream(ctx, fdinfo, bp); + +co_error: + perror("Failed to TCP read"); + exit(EXIT_FAILURE); +} + +int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct buffer_packet* bp; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + int read_res = FDS_READY; + char url[255]; + + // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later + if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; + + // 2. Read packet from socket + bp->ip.ap.str.port = url_get_port_int (fdinfo->url); + read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); + if (read_res == FDS_ERR) goto co_error; + if (read_res == FDS_AGAIN) return 1; + + // 3. Apply logic + return app_ctx->desc->on_datagram(ctx, fdinfo, bp); + +co_error: + perror("Failed to UDP read"); + exit(EXIT_FAILURE); +} + +int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct buffer_packet* bp; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct rr_ctx* rr = app_ctx->misc; + int write_res = FDS_READY; + + // 0. Show some information about circuits + uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0; + if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count); + else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count); + app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests + + // 1. Get current write buffer OR a buffer from the waiting queue OR leave + if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; + + // 2. Write data from the buffer to the socket + while (bp->mode == BP_WRITING) { + write_res = write_packet_to_tcp(fdinfo->fd, bp); + if (write_res == FDS_ERR) goto co_error; + if (write_res == FDS_AGAIN) return 1; + } + + // 3. A whole packet has been written + // Release the buffer and notify + mv_buffer_wtof(app_ctx, fdinfo); + notify_read(ctx, app_ctx); + + return 0; +co_error: + perror("Failed to TCP write"); + exit(EXIT_FAILURE); +} + +int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct buffer_packet* bp; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + int write_res = FDS_READY; + + // 1. Get current write buffer OR a buffer from the waiting queue OR leave + if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; + + // 2. Write buffer + write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); + if (write_res == FDS_ERR) goto co_error; + if (write_res == FDS_AGAIN) return 1; + + // 3. A whole packet has been written + // Release the buffer and notify + mv_buffer_wtof(app_ctx, fdinfo); + notify_read(ctx, app_ctx); + + return 0; +co_error: + perror("Failed to UDP write"); + exit(EXIT_FAILURE); +} + +int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + uint64_t ctr; + ssize_t tmr_rd; + tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr)); + if (tmr_rd == -1 && errno == EAGAIN) return 1; + if (tmr_rd < 0) { + perror("read on timer"); + fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd); + exit(EXIT_FAILURE); + } + + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + return app_ctx->desc->on_timer (ctx, fdinfo); + +end: + evt_core_rm_fd(ctx, fdinfo->fd); + return 1; +} + +int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct buffer_packet* bp; + + // 1. If has a "used" buffer, remove it + bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); + if (bp != NULL) { + g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd)); + memset(bp, 0, sizeof(struct buffer_packet)); + g_queue_push_tail(app_ctx->free_buffer, bp); + } + + // 2. If appears in the write waiting queue, remove it + GQueue* writew = g_hash_table_lookup (app_ctx->write_waiting, &(fdinfo->fd)); + while (writew != NULL && (bp = g_queue_pop_head (writew)) != NULL) { + memset(bp, 0, sizeof(struct buffer_packet)); + g_queue_push_tail(app_ctx->free_buffer, bp); + } + if (writew) g_hash_table_remove (app_ctx->write_waiting, &(fdinfo->fd)); + + // 3. If appears in the read waiting queue, remove it + g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); + + return app_ctx->desc->on_err(ctx, fdinfo); +} + +void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { + struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); + if (ctx == NULL) goto init_err; + memset(ctx, 0, sizeof(struct algo_ctx)); + ctx->free_buffer = g_queue_new (); + ctx->read_waiting = g_queue_new (); + ctx->application_waiting = g_hash_table_new (NULL, NULL); + ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); + ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + ctx->link_count = 8; + ctx->is_rdy = 0; + ctx->ap = *ap; + for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { + g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); + } + + struct evt_core_cat tcp_listen = { + .name = "tcp-listen", + .flags = EPOLLIN, + .app_ctx = ctx, + .free_app_ctx = free_nothing, + .cb = main_on_tcp_co, + .err_cb = NULL + }; + evt_core_add_cat(evt, &tcp_listen); + + struct evt_core_cat tcp_read = { + .name = "tcp-read", + .flags = EPOLLIN | EPOLLET | EPOLLRDHUP, + .app_ctx = ctx, + .free_app_ctx = free_naive, + .cb = main_on_tcp_read, + .err_cb = main_on_err + }; + ctx->ref_count++; + evt_core_add_cat(evt, &tcp_read); + + struct evt_core_cat udp_read = { + .name = "udp-read", + .flags = EPOLLIN | EPOLLET | EPOLLRDHUP, + .app_ctx = ctx, + .free_app_ctx = free_naive, + .cb = main_on_udp_read, + .err_cb = main_on_err + }; + ctx->ref_count++; + evt_core_add_cat(evt, &udp_read); + + struct evt_core_cat tcp_write = { + .name = "tcp-write", + .flags = EPOLLOUT | EPOLLET | EPOLLRDHUP, + .app_ctx = ctx, + .free_app_ctx = free_naive, + .cb = main_on_tcp_write, + .err_cb = main_on_err + }; + ctx->ref_count++; + evt_core_add_cat(evt, &tcp_write); + + struct evt_core_cat udp_write = { + .name = "udp-write", + .flags = EPOLLOUT | EPOLLET, + .app_ctx = ctx, + .free_app_ctx = free_naive, + .cb = main_on_udp_write, + .err_cb = main_on_err + }; + ctx->ref_count++; + evt_core_add_cat(evt, &udp_write); + + struct evt_core_cat timer = { + .name = "timeout", + .flags = EPOLLIN | EPOLLET, + .app_ctx = ctx, + .free_app_ctx = free_naive, + .cb = main_on_timer, + .err_cb = NULL + }; + ctx->ref_count++; + evt_core_add_cat(evt, &timer); + + for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { + if (strcmp(available_algo[i].name, ap->algo_name) == 0) { + ctx->desc = &(available_algo[i]); + ctx->desc->init(evt, ctx, ap); + return; + } + } + fprintf(stderr, "Algorithm %s has not been found\n", ap->algo_name); + exit(EXIT_FAILURE); + +init_err: + fprintf(stderr, "Failed to init proxy\n"); + exit(EXIT_FAILURE); +} diff --git a/src/proxy.h b/src/proxy.h new file mode 100644 index 0000000..bd0420c --- /dev/null +++ b/src/proxy.h @@ -0,0 +1,61 @@ +#pragma once +#include +#include +#include +#include +#include +#include "evt_core.h" +#include "algo_utils.h" +#include "url.h" +#include "utils.h" +#include "packet.h" + +void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_naive_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); + +void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); + +static struct algo_desc available_algo[] = { + { + .name = "naive", + .init = algo_naive_init, + .on_stream = algo_naive_on_stream, + .on_datagram = algo_naive_on_datagram, + .on_timer = algo_naive_on_timer, + .on_err = algo_naive_on_err + }, + { + .name = "rr", + .init = algo_rr_init, + .on_stream = algo_rr_on_stream, + .on_datagram = algo_rr_on_datagram, + .on_timer = algo_rr_on_timer, + .on_err = algo_rr_on_err + }, + { + .name = "dup2", + .init = NULL, + .on_stream = NULL, + .on_datagram = NULL, + .on_timer = NULL, + .on_err = NULL + } +}; + + +void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap); + +int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); +int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);