From debe0feb5652a25ed68ae713ae87ec508691da0e Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Tue, 12 Feb 2019 11:17:37 +0100 Subject: [PATCH] Introduce a ring buffer --- CMakeLists.txt | 2 + src/algo_naive.c | 132 +++++++++++++++++++++++++++------------------ src/algo_skel.h | 7 ++- src/donar_server.c | 9 ++-- src/evt_core.c | 2 +- src/net_tools.c | 4 +- src/net_tools.h | 2 +- src/utils.c | 61 +++++++++++++++++++++ src/utils.h | 19 +++++++ 9 files changed, 177 insertions(+), 61 deletions(-) create mode 100644 src/utils.c create mode 100644 src/utils.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 77172a2..36f3e23 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -18,6 +18,8 @@ list(APPEND CSOURCES src/evt_core.c src/algo_skel.h src/algo_naive.c + src/utils.h + src/utils.c ) add_executable(donar-proxy ${CSOURCES} src/donar_proxy.c) diff --git a/src/algo_naive.c b/src/algo_naive.c index 801d581..6853244 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -1,6 +1,12 @@ #include "algo_skel.h" #define NAIVE_BUFFER 128 +struct naive_ctx { + struct ring_buffer rb; +}; + +void free_nothing(void* app_ctx) {} + void free_naive(void* app_ctx) { if (app_ctx != NULL) free(app_ctx); } @@ -13,12 +19,10 @@ void on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { in_len = sizeof(in_addr); - //while (1) { conn_sock = accept(fd, &in_addr, &in_len); - //if (conn_sock == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break; if (conn_sock == -1) goto co_error; - evt_core_add_fd (ctx, "tcp-data", fd); - //} + evt_core_add_fd (ctx, "tcp-read", fd); + evt_core_add_fd (ctx, "tcp-write", fd); return; @@ -27,26 +31,31 @@ co_error: exit(EXIT_FAILURE); } -void on_tcp_data(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { +void tcp_to_udp(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { + // Get target file descriptor struct evt_core_cat* udp = g_hash_table_lookup (ctx->catlist, "udp-data"); if (udp == NULL || udp->socklist->len < 1) goto co_error; int udp_fd = g_array_index(udp->socklist, int, 0); - char buffer[NAIVE_BUFFER]; - int nread, nwrite; - nread = read(fd, buffer, sizeof(char) * NAIVE_BUFFER); - if (nread == 0) return; - //if (nread == -1 && errno == EAGAIN) return; - if (nread == -1) goto co_error; - nwrite = write(udp_fd, buffer, nread); - if (nwrite == -1 && errno == EAGAIN) { - printf("Lost data EAGAIN\n"); - return; - } - if (nwrite == -1) goto co_error; - if (nread != nwrite) { - printf("Lost data not everything has been written\n"); - return; + // Init data structures for the transfer + struct naive_ctx* app_ctx = cat->app_ctx; + struct ring_buffer* rb = &(app_ctx->rb); + char buffer[RING_BUFFER_SIZE]; + int nread, nwrite, rb_free_space; + + while (1) { + rb_free_space = ring_buffer_free_space (rb); // We can't afford to read more + nread = read(fd, buffer, rb_free_space); // Effective read + if (nread == 0) return; // End of file + if (nread == -1 && errno == EAGAIN) return; // No more data to read + if (nread == -1) goto co_error; // A bad error + ring_buffer_write(rb, buffer, nread); // Persist read data in our buffer + + nread = ring_buffer_read(rb, buffer, RING_BUFFER_SIZE); + nwrite = write(udp_fd, buffer, nread); + if (nwrite == -1 && errno == EAGAIN) return; + if (nwrite == -1) goto co_error; + ring_buffer_ack_read (rb, nwrite); } return; @@ -55,26 +64,31 @@ co_error: exit(EXIT_FAILURE); } -void on_udp_data(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { +void udp_to_tcp(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { + // Get target file descriptor struct evt_core_cat* tcp = g_hash_table_lookup (ctx->catlist, "tcp-data"); if (tcp == NULL || tcp->socklist->len < 1) goto co_error; int tcp_fd = g_array_index(tcp->socklist, int, 0); - char buffer[NAIVE_BUFFER]; - int nread, nwrite; - nread = read(fd, buffer, sizeof(char) * NAIVE_BUFFER); - if (nread == 0) return; - //if (nread == -1 && errno == EAGAIN) return; - if (nread == -1) goto co_error; - nwrite = write(tcp_fd, buffer, nread); - if (nwrite == -1 && errno == EAGAIN) { - printf("Lost data EAGAIN\n"); - return; - } - if (nwrite == -1) goto co_error; - if (nread != nwrite) { - printf("Lost data not everything has been written\n"); - return; + // Init data structures for the transfer + struct naive_ctx* app_ctx = cat->app_ctx; + struct ring_buffer* rb = &(app_ctx->rb); + char buffer[RING_BUFFER_SIZE]; + int nread, nwrite, rb_free_space; + + while (1) { + rb_free_space = ring_buffer_free_space (rb); // We can't afford to read more + nread = read(fd, buffer, rb_free_space); // Effective read + if (nread == 0) return; // End of file + if (nread == -1 && errno == EAGAIN) return; // No more data to read + if (nread == -1) goto co_error; // A bad error + ring_buffer_write(rb, buffer, nread); // Persist read data in our buffer + + nread = ring_buffer_read(rb, buffer, RING_BUFFER_SIZE); + nwrite = write(tcp_fd, buffer, nread); + if (nwrite == -1 && errno == EAGAIN) return; + if (nwrite == -1) goto co_error; + ring_buffer_ack_read (rb, nwrite); } return; @@ -85,29 +99,43 @@ co_error: } void algo_naive(struct algo_skel* as) { - as->on_tcp_data.name = "tcp-data"; - as->on_tcp_data.flags = 0; - as->on_tcp_data.app_ctx = NULL; - as->on_tcp_data.free_app_ctx = free_naive; - as->on_tcp_data.cb = on_tcp_data; - as->on_tcp_data.socklist = NULL; - as->on_tcp_co.name = "tcp-listen"; - as->on_tcp_co.flags = 0; + as->on_tcp_co.flags = EPOLLIN; as->on_tcp_co.app_ctx = NULL; as->on_tcp_co.free_app_ctx = free_naive; as->on_tcp_co.cb = on_tcp_co; as->on_tcp_co.socklist = NULL; - as->on_udp_data.name = "udp-data"; - as->on_udp_data.flags = 0; - as->on_udp_data.app_ctx = NULL; - as->on_udp_data.free_app_ctx = free_naive; - as->on_udp_data.cb = on_udp_data; - as->on_udp_data.socklist = NULL; + as->on_tcp_read.name = "tcp-read"; + as->on_tcp_read.flags = EPOLLIN | EPOLLET; + as->on_tcp_read.app_ctx = malloc(sizeof(struct naive_ctx)); + as->on_tcp_read.free_app_ctx = free_naive; + as->on_tcp_read.cb = tcp_to_udp; + as->on_tcp_read.socklist = NULL; - /*if (as->on_tcp_data.app_ctx == NULL || as->on_udp_data.app_ctx == NULL) { + as->on_udp_read.name = "udp-read"; + as->on_udp_read.flags = EPOLLIN | EPOLLET; + as->on_udp_read.app_ctx = malloc(sizeof(struct naive_ctx)); + as->on_udp_read.free_app_ctx = free_naive; + as->on_udp_read.cb = tcp_to_udp; + as->on_udp_read.socklist = NULL; + + as->on_tcp_write.name = "tcp-write"; + as->on_tcp_write.flags = EPOLLOUT | EPOLLET; + as->on_tcp_write.app_ctx = as->on_udp_read.app_ctx; + as->on_tcp_write.free_app_ctx = free_nothing; + as->on_tcp_write.cb = udp_to_tcp; + as->on_tcp_write.socklist = NULL; + + as->on_udp_write.name = "udp-write"; + as->on_udp_write.flags = EPOLLOUT | EPOLLET; + as->on_udp_write.app_ctx = as->on_tcp_read.app_ctx; + as->on_udp_write.free_app_ctx = free_nothing; + as->on_udp_write.cb = tcp_to_udp; + as->on_udp_write.socklist = NULL; + + if (as->on_tcp_read.app_ctx == NULL || as->on_udp_read.app_ctx == NULL) { fprintf(stderr, "Failed to malloc naive_ctx\n"); exit(EXIT_FAILURE); - }*/ + } } diff --git a/src/algo_skel.h b/src/algo_skel.h index 4245c12..cf7a67f 100644 --- a/src/algo_skel.h +++ b/src/algo_skel.h @@ -3,10 +3,13 @@ #include #include #include "evt_core.h" +#include "utils.h" struct algo_skel { - struct evt_core_cat on_udp_data; - struct evt_core_cat on_tcp_data; + struct evt_core_cat on_udp_read; + struct evt_core_cat on_tcp_read; + struct evt_core_cat on_udp_write; + struct evt_core_cat on_tcp_write; struct evt_core_cat on_tcp_co; }; diff --git a/src/donar_server.c b/src/donar_server.c index 8efc4b5..ee8cf82 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -43,12 +43,15 @@ socket_create_err: void donar_server(struct donar_server_ctx* ctx, struct algo_skel* algo, char* udp_host, char* udp_port) { evt_core_init (&(ctx->evts)); - evt_core_add_cat (&(ctx->evts), &(algo->on_udp_data)); - evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_data)); evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_co)); + evt_core_add_cat (&(ctx->evts), &(algo->on_udp_read)); + evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_read)); + evt_core_add_cat (&(ctx->evts), &(algo->on_udp_write)); + evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_write)); int sock = create_udp_client (udp_host, udp_port); - evt_core_add_fd (&(ctx->evts), "udp-data", sock); + evt_core_add_fd (&(ctx->evts), "udp-read", sock); + evt_core_add_fd (&(ctx->evts), "udp-write", sock); for (uint16_t i = 0; i < PORT_SIZE ; i++) { ctx->ports[i] = 7500 + i; diff --git a/src/evt_core.c b/src/evt_core.c index 2504a2c..eb221c0 100644 --- a/src/evt_core.c +++ b/src/evt_core.c @@ -81,7 +81,7 @@ void evt_core_add_fd(struct evt_core_ctx* ctx, char* name, int fd) { g_array_append_val (cat->socklist, fd); g_hash_table_insert(ctx->socklist, key, cat); - add_fd_to_epoll(ctx->epollfd, fd); + add_fd_to_epoll(ctx->epollfd, fd, cat->flags); } void evt_core_free(struct evt_core_ctx* ctx) { diff --git a/src/net_tools.c b/src/net_tools.c index f0426cf..e2c3371 100644 --- a/src/net_tools.c +++ b/src/net_tools.c @@ -128,12 +128,12 @@ void fill_buffer(size_t* written, char* dest, void *src, size_t n) { * You need to read everything before going back to epoll * Which means keeping state too */ -void add_fd_to_epoll(int epollfd, int fd) { +void add_fd_to_epoll(int epollfd, int fd, int flags) { make_socket_non_blocking (fd); struct epoll_event current_event; //current_event.events = EPOLLIN | EPOLLET; - current_event.events = EPOLLIN; + current_event.events = flags; current_event.data.fd = fd; if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, ¤t_event) == -1) { perror("Failed to add a file descriptor to epoll with epoll_ctl"); diff --git a/src/net_tools.h b/src/net_tools.h index 956833e..4bcd539 100644 --- a/src/net_tools.h +++ b/src/net_tools.h @@ -12,6 +12,6 @@ int create_tcp_client(char* host, char* service); int create_udp_client(char* host, char* service); int create_tcp_server(char* service); int make_socket_non_blocking(int fd); -void add_fd_to_epoll(int epollfd, int fd); +void add_fd_to_epoll(int epollfd, int fd, int flags); int read_entity(int fd, void* entity, int size); void fill_buffer(size_t* written, char* dest, void *src, size_t n); diff --git a/src/utils.c b/src/utils.c new file mode 100644 index 0000000..d0b0aa6 --- /dev/null +++ b/src/utils.c @@ -0,0 +1,61 @@ +#include "utils.h" + +int ring_buffer_read(struct ring_buffer* rb, char* dest, int size) { + int slice1 = size; + int slice2 = 0; + + int used_space = ring_buffer_used_space (rb); + if (used_space < slice1) + slice1 = used_space; + + if (RING_BUFFER_SIZE - rb->head < slice1) { + slice1 = RING_BUFFER_SIZE - rb->head; + slice2 = size - slice1; + if (used_space - slice1 < slice2) + slice2 = used_space - slice1; + } + + memcpy(dest, rb->buffer + rb->head, slice1); + memcpy(dest+slice1, rb->buffer, slice2); + + return slice1 + slice2; +} + +void ring_buffer_ack_read(struct ring_buffer* rb, int size) { + if (size > ring_buffer_used_space (rb)) { + fprintf(stderr, "You try to ack more data than contained in the ring buffer\n"); + exit(EXIT_FAILURE); + } + rb->head = (rb->head + size) % RING_BUFFER_SIZE; +} + +int ring_buffer_write(struct ring_buffer* rb, char* source, int size) { + if (size > ring_buffer_free_space (rb)) { + fprintf(stderr, "You try to write more data than available space in the buffer\n"); + exit(EXIT_FAILURE); + } + + int slice1 = size; + int slice2 = 0; + + if (RING_BUFFER_SIZE - rb->tail < slice1) { + slice1 = RING_BUFFER_SIZE - rb->tail; + slice2 = size - slice1; + } + + memcpy(rb->buffer + rb->tail, source, slice1); + memcpy(rb->buffer, source + slice1, slice2); + + rb->tail = (rb->tail + slice1 + slice2) % RING_BUFFER_SIZE; + + return slice1 + slice2; +} + +int ring_buffer_free_space(struct ring_buffer* rb) { + if (rb->head > rb->tail) return rb->head - rb->tail; + return RING_BUFFER_SIZE + (rb->tail - rb->head); +} + +int ring_buffer_used_space(struct ring_buffer* rb) { + return RING_BUFFER_SIZE - ring_buffer_free_space (rb); +} diff --git a/src/utils.h b/src/utils.h new file mode 100644 index 0000000..3679791 --- /dev/null +++ b/src/utils.h @@ -0,0 +1,19 @@ +#pragma once +#include +#include +#include +#include + +#define RING_BUFFER_SIZE 1024 + +struct ring_buffer { + char buffer[RING_BUFFER_SIZE]; + int head; + int tail; +}; + +int ring_buffer_read(struct ring_buffer* rb, char* dest, int size); +void ring_buffer_ack_read(struct ring_buffer* rb, int size); +int ring_buffer_write(struct ring_buffer* rb, char* source, int size); +int ring_buffer_free_space(struct ring_buffer* rb); +int ring_buffer_used_space(struct ring_buffer* rb);