diff --git a/src/algo_naive.c b/src/algo_naive.c index c794100..04593f0 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -2,8 +2,7 @@ struct naive_ctx { int ref_count; - struct buffer_packet tcp_to_udp; - struct buffer_packet udp_to_tcp; + GHashTable* packet_buffer; }; void free_nothing(void* app_ctx) {} @@ -13,6 +12,11 @@ void free_naive(void* app_ctx) { if (ctx->ref_count <= 0) free(ctx); } +char* get_port(char* out, char* in) { + sscanf(in, "%*[a-z]:%*[a-z]:%*[a-zA-Z0-9.]:%[0-9]", out); + return out; +} + void on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { int conn_sock1, conn_sock2, port; struct sockaddr_in addr; @@ -54,9 +58,91 @@ co_error: exit(EXIT_FAILURE); } -char* get_port(char* out, char* in) { - sscanf(in, "%*[a-z]:%*[a-z]:%*[a-zA-Z0-9.]:%[0-9]", out); - return out; +struct buffer_packet* get_bp(struct evt_core_cat* cat, int fd, int is_read_buffer) { + struct buffer_packet* bp; + struct naive_ctx* ctx = cat->app_ctx; + + bp = g_hash_table_lookup (ctx->packet_buffer, &fd); + if (bp != NULL) return bp; + if (!is_read_buffer) goto alloc_error; + + bp = malloc(sizeof(struct buffer_packet)); + if (bp == NULL) goto alloc_error; + memset(bp, 0, sizeof(struct buffer_packet)); + bp->fdread = fd; + bp->fdwrite = -1; + bp->ref_count++; + g_hash_table_insert(ctx->packet_buffer, &(bp->fdread), bp); + return bp; + +alloc_error: + perror("alloc error"); + exit(EXIT_FAILURE); +} + +void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd); +void on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd); +void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd); +void on_udp_write (struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd); + +void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { + struct buffer_packet* bp; + struct evt_core_fdinfo *to_fdinfo = NULL; + struct naive_ctx* app_ctx = cat->app_ctx; + int read_res = FDS_READY; + char url[255]; + + // 1. Read in our buffer packet + bp = get_bp(cat, fd, TRUE); + while (read_res != FDS_AGAIN && bp->mode == BP_READING) { + read_res = read_packet_from_tcp (fd, bp); + if (read_res == FDS_ERR) goto co_error; + } + if (bp->mode != BP_SWITCH_WRITE) return; + + // 2. Packet has just been read, choose a destination + sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); + to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) goto co_error; + + // 3. Configure packet buffer for the destination + bp->mode = BP_WRITING; + bp->fdwrite = to_fdinfo->fd; + bp->ref_count++; + g_hash_table_insert(app_ctx->packet_buffer, &(bp->fdwrite), bp); + on_udp_write(ctx, to_fdinfo->cat, to_fdinfo->fd); + + return; +co_error: + perror("Failed to TCP read"); + exit(EXIT_FAILURE); +} + +void on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { + struct buffer_packet* bp; + struct evt_core_fdinfo *to_fdinfo; + int write_res = FDS_READY; + char url[255]; + + bp = get_bp(cat, fd, FALSE); + if (bp == NULL) return; + + while (write_res != FDS_AGAIN && bp->mode == BP_WRITING) { + write_res = write_packet_to_tcp(fd, bp); + if (write_res == FDS_ERR) goto co_error; + } + +co_error: + perror("Failed to TCP write"); + exit(EXIT_FAILURE); +} + +void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { + +} + +void on_udp_write (struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { + } void tcp_to_udp(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { @@ -112,7 +198,7 @@ void udp_to_tcp(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { // 1. Read packet from UDP socket if (bp->mode == BP_READING) { - read_res = read_packet_from_udp (fd, bp, &(app_ctx->udp_t)); + read_res = read_packet_from_udp (fd, bp); if (read_res == FDS_ERR) goto co_error; } @@ -130,10 +216,28 @@ co_error: exit(EXIT_FAILURE); } +void free_buffer_packet(void *v) { + struct buffer_packet* bp = (struct buffer_packet*) v; + bp->ref_count--; + if (bp->ref_count == 0) free(v); +} + +void free_simple(void* v) { + free(v); +} + void algo_naive(struct algo_skel* as) { struct naive_ctx* ctx = malloc(sizeof(struct naive_ctx)); if (ctx == NULL) goto init_err; memset(ctx, 0, sizeof(struct naive_ctx)); + ctx->packet_buffer = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, free_buffer_packet); + + as->on_init.name = "init"; + as->on_init.flags = 0; + as->on_init.app_ctx = NULL; + as->on_init.free_app_ctx = NULL; + as->on_init.cb = NULL; + as->on_init.socklist = NULL; as->on_tcp_co.name = "tcp-listen"; as->on_tcp_co.flags = EPOLLIN; @@ -146,7 +250,7 @@ void algo_naive(struct algo_skel* as) { as->on_tcp_read.flags = EPOLLIN | EPOLLET | EPOLLRDHUP; as->on_tcp_read.app_ctx = ctx; as->on_tcp_read.free_app_ctx = free_naive; - as->on_tcp_read.cb = tcp_to_udp; + as->on_tcp_read.cb = on_tcp_read; as->on_tcp_read.socklist = NULL; ctx->ref_count++; @@ -154,7 +258,7 @@ void algo_naive(struct algo_skel* as) { as->on_udp_read.flags = EPOLLIN | EPOLLET; as->on_udp_read.app_ctx = ctx; as->on_udp_read.free_app_ctx = free_naive; - as->on_udp_read.cb = udp_to_tcp; + as->on_udp_read.cb = on_udp_read; as->on_udp_read.socklist = NULL; ctx->ref_count++; @@ -162,7 +266,7 @@ void algo_naive(struct algo_skel* as) { as->on_tcp_write.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP; as->on_tcp_write.app_ctx = ctx; as->on_tcp_write.free_app_ctx = free_naive; - as->on_tcp_write.cb = udp_to_tcp; + as->on_tcp_write.cb = on_tcp_write; as->on_tcp_write.socklist = NULL; ctx->ref_count++; @@ -170,7 +274,7 @@ void algo_naive(struct algo_skel* as) { as->on_udp_write.flags = EPOLLOUT | EPOLLET; as->on_udp_write.app_ctx = ctx; as->on_udp_write.free_app_ctx = free_naive; - as->on_udp_write.cb = tcp_to_udp; + as->on_udp_write.cb = on_udp_write; as->on_udp_write.socklist = NULL; ctx->ref_count++; diff --git a/src/algo_skel.h b/src/algo_skel.h index e4a21ef..bdfe972 100644 --- a/src/algo_skel.h +++ b/src/algo_skel.h @@ -8,6 +8,7 @@ #include "utils.h" struct algo_skel { + struct evt_core_cat on_init; struct evt_core_cat on_udp_read; struct evt_core_cat on_tcp_read; struct evt_core_cat on_udp_write; diff --git a/src/donar.c b/src/donar.c index 9af7a19..8e40703 100644 --- a/src/donar.c +++ b/src/donar.c @@ -51,7 +51,7 @@ int main(int argc, char** argv) { if (!(is_server ^ is_client)) goto in_error; if (algo == NULL) goto in_error; - struct algo_skel as; + struct algo_skel as = {0}; init_algo(&as, algo); if (is_server) { diff --git a/src/donar_client.c b/src/donar_client.c index f21f1eb..08f9ea9 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -150,6 +150,7 @@ void donar_client(struct donar_client_ctx* ctx, struct algo_skel* algo, char* on .socklist = NULL }; evt_core_add_cat (&(ctx->evts), &init_socks5); + evt_core_add_cat (&(ctx->evts), &(algo->on_init)); evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_co)); evt_core_add_cat (&(ctx->evts), &(algo->on_udp_read)); evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_read)); diff --git a/src/donar_server.c b/src/donar_server.c index bcebeab..8437cb8 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -82,6 +82,7 @@ socket_failed: void donar_server(struct donar_server_ctx* ctx, struct algo_skel* algo, GPtrArray* ports) { evt_core_init (&(ctx->evts)); + evt_core_add_cat (&(ctx->evts), &(algo->on_init)); evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_co)); evt_core_add_cat (&(ctx->evts), &(algo->on_udp_read)); evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_read)); diff --git a/src/evt_core.c b/src/evt_core.c index 37bc76d..5a6e457 100644 --- a/src/evt_core.c +++ b/src/evt_core.c @@ -133,6 +133,11 @@ void evt_core_loop(struct evt_core_ctx* ctx) { struct evt_core_fdinfo* fdinfo; struct evt_core_cat* cat; + cat = g_hash_table_lookup(ctx->catlist, "init"); + if (cat != NULL) { + cat->cb(ctx, cat, -1); + } + printf("--- Start main loop\n"); int num_fd, n = 0; while(1) { @@ -172,6 +177,11 @@ void evt_core_loop(struct evt_core_ctx* ctx) { } } + cat = g_hash_table_lookup(ctx->catlist, "destroy"); + if (cat != NULL) { + cat->cb(ctx, cat, -1); + } + evt_core_free(ctx); } diff --git a/src/packet.c b/src/packet.c index 34bf579..568d926 100644 --- a/src/packet.c +++ b/src/packet.c @@ -3,7 +3,7 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { ssize_t nread; size_t pkt_size_size = sizeof(bp->ip.ap.str.size); - if (bp->mode == BP_WRITING) return FDS_ERR; + if (bp->mode != BP_READING) return FDS_ERR; while (bp->aread < pkt_size_size) { nread = read(fd, &(bp->ip.ap.raw) + bp->aread, pkt_size_size - bp->aread); @@ -21,7 +21,7 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { bp->aread += nread; } - bp->mode = BP_WRITING; + bp->mode = BP_SWITCH_WRITE; bp->awrite = 0; return FDS_READY; @@ -30,6 +30,7 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) { enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { ssize_t nwrite; + if (bp->mode != BP_WRITING) return FDS_ERR; while (bp->awrite < bp->ip.ap.str.size) { nwrite = send(fd, &(bp->ip.ap.raw), bp->ip.ap.str.size, 0); if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; @@ -37,7 +38,7 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) { bp->awrite += nwrite; } - bp->mode = BP_READING; + bp->mode = BP_SWITCH_READ; bp->aread = 0; return FDS_READY; @@ -48,7 +49,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp) { size_t bytes_to_send; size_t pkt_header_size = sizeof(bp->ip.ap.str) - sizeof(char); - if (bp->mode == BP_READING) return FDS_ERR; + if (bp->mode != BP_WRITING) return FDS_ERR; bytes_to_send = bp->ip.ap.str.size - pkt_header_size; nwrite = send(fd, @@ -59,7 +60,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp) { if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN; if (nwrite != bytes_to_send) return FDS_ERR; - bp->mode = BP_READING; + bp->mode = BP_SWITCH_READ; bp->aread = 0; return FDS_READY; @@ -67,7 +68,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp) { enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp) { ssize_t nread; - if (bp->mode == BP_WRITING) return FDS_ERR; + if (bp->mode != BP_READING) return FDS_ERR; size_t pkt_header_size = sizeof(bp->ip.ap.str) - sizeof(char); // We remove the payload size_t udp_packet_size = sizeof(struct internet_packet) - pkt_header_size; @@ -80,7 +81,7 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp) { bp->ip.ap.str.size = nread + pkt_header_size; - bp->mode = BP_WRITING; + bp->mode = BP_SWITCH_WRITE; bp->awrite = 0; return FDS_READY; diff --git a/src/packet.h b/src/packet.h index f8d1f8f..357e956 100644 --- a/src/packet.h +++ b/src/packet.h @@ -24,7 +24,9 @@ enum FD_STATE { enum BP_MODE { BP_READING, - BP_WRITING + BP_WRITING, + BP_SWITCH_WRITE, + BP_SWITCH_READ }; union abstract_packet { @@ -43,7 +45,10 @@ struct internet_packet { }; struct buffer_packet { + uint64_t ref_count; uint8_t mode; + int fdread; + int fdwrite; uint16_t aread; uint16_t awrite; struct internet_packet ip;