diff --git a/CMakeLists.txt b/CMakeLists.txt index daf4ff9..c6d3edb 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -27,6 +27,9 @@ list(APPEND CSOURCES src/url.c src/donar_init.h src/donar_init.c + src/algo_rr.c + src/algo_utils.h + src/algo_utils.c ) add_executable(donar ${CSOURCES} src/donar.c) diff --git a/src/algo_naive.c b/src/algo_naive.c index 40d220c..22fe3ca 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -1,30 +1,11 @@ +#include "algo_utils.h" #include "algo_skel.h" -struct naive_ctx { - int ref_count; - struct buffer_packet bps[10]; - GQueue* free_buffer; // Available buffers - GHashTable* used_buffer; // Buffers used for reading or writing - GQueue* read_waiting; // Who wait to be notified for a read - GHashTable* write_waiting; // Structure to track packets waiting to be written -}; - int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); -void free_nothing(void* app_ctx) {} -void free_naive(void* app_ctx) { - struct naive_ctx* ctx = (struct naive_ctx*) app_ctx; - ctx->ref_count--; - if (ctx->ref_count > 0) return; - g_queue_free(ctx->free_buffer); - g_queue_free(ctx->read_waiting); - g_hash_table_destroy (ctx->used_buffer); - free(ctx); -} - int on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int conn_sock1, conn_sock2; struct sockaddr_in addr; @@ -62,96 +43,10 @@ co_error: exit(EXIT_FAILURE); } -/** - * Returns a buffer if available, NULL otherwise - */ -struct buffer_packet* get_read_buffer(struct naive_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { - struct buffer_packet* bp; - - // 1. Check if we don't have a buffer - bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); - if (bp != NULL) return bp; - - // 2. Get a new buffer otherwise - bp = g_queue_pop_head(app_ctx->free_buffer); - if (bp == NULL) { - // 2.1 If no buffer is available, we subscribe to be notified later - g_queue_push_tail (app_ctx->read_waiting, &(fdinfo->fd)); - return NULL; - } - - // 3. Update state - g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); - - return bp; -} - -/** - * Returns a buffer if available, NULL otherwise - */ -struct buffer_packet* get_write_buffer(struct naive_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { - struct buffer_packet* bp; - GQueue* q; - - // 1. Check if we don't have a buffer - bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); - if (bp != NULL) return bp; - - // 2. Check our waiting queue otherwise - if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return NULL; - bp = g_queue_pop_head(q); - if (bp == NULL) return NULL; // No packet to process - - // 3. Update state - g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); - - return bp; -} - -void mv_buffer_rtow(struct naive_ctx* app_ctx, - struct evt_core_fdinfo* from, - struct evt_core_fdinfo* to, - struct buffer_packet* bp) { - - // 1. We get the target writing queue - GQueue* q; - q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); - if (q == NULL) { - q = g_queue_new (); - g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); - } - - // 2. We move the buffer to the target queue - g_hash_table_remove(app_ctx->used_buffer, &from->fd); - g_queue_push_tail(q, bp); -} - -void mv_buffer_wtor(struct naive_ctx* app_ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - bp->mode = BP_READING; - bp->aread = 0; - g_queue_push_tail (app_ctx->free_buffer, bp); - g_hash_table_remove(app_ctx->used_buffer, &(fdinfo->fd)); -} - -void notify_read(struct evt_core_ctx* ctx, struct naive_ctx* app_ctx) { - struct evt_core_fdinfo* next_fdinfo = NULL; - while (next_fdinfo == NULL) { - int fd = GPOINTER_TO_INT(g_queue_pop_head(app_ctx->read_waiting)); - if (fd == 0) break; - next_fdinfo = evt_core_get_from_fd (ctx, fd); - if (strcmp(next_fdinfo->cat->name, "tcp-read") == 0) on_tcp_read(ctx, next_fdinfo); - else if (strcmp(next_fdinfo->cat->name, "udp-read") == 0) on_udp_read(ctx, next_fdinfo); - else { - fprintf(stderr, "A fd from category %s can't be stored in read_waiting\n", next_fdinfo->cat->name); - exit(EXIT_FAILURE); - } - } -} - int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; struct evt_core_fdinfo *to_fdinfo = NULL; - struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; int read_res = FDS_READY; char url[255]; @@ -187,7 +82,7 @@ co_error: int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; - struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; int write_res = FDS_READY; // 1. Get current write buffer OR a buffer from the waiting queue OR leave @@ -214,7 +109,7 @@ co_error: int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; struct evt_core_fdinfo *to_fdinfo; - struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; int read_res = FDS_READY; char url[255]; @@ -250,7 +145,7 @@ co_error: int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct buffer_packet* bp; - struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; int write_res = FDS_READY; // 1. Get current write buffer OR a buffer from the waiting queue OR leave @@ -272,13 +167,8 @@ co_error: exit(EXIT_FAILURE); } - -void naive_free_simple(void* v) { - free(v); -} - int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct buffer_packet* bp; // 1. If has a "used" buffer, remove it @@ -304,9 +194,9 @@ int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { } void algo_naive(struct algo_skel* as) { - struct naive_ctx* ctx = malloc(sizeof(struct naive_ctx)); + struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; - memset(ctx, 0, sizeof(struct naive_ctx)); + memset(ctx, 0, sizeof(struct algo_ctx)); ctx->free_buffer = g_queue_new (); ctx->read_waiting = g_queue_new (); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); diff --git a/src/algo_rr.c b/src/algo_rr.c new file mode 100644 index 0000000..6404eca --- /dev/null +++ b/src/algo_rr.c @@ -0,0 +1,58 @@ +#include "algo_skel.h" +#include "algo_utils.h" + +void algo_rr(struct algo_skel* as) { + struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); + if (ctx == NULL) goto init_err; + memset(ctx, 0, sizeof(struct algo_ctx)); + ctx->free_buffer = g_queue_new (); + ctx->read_waiting = g_queue_new (); + ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); + ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { + g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); + } + +/* + as->on_tcp_co.name = "tcp-listen"; + as->on_tcp_co.flags = EPOLLIN; + as->on_tcp_co.free_app_ctx = free_nothing; + as->on_tcp_co.cb = on_tcp_co; + + as->on_tcp_read.name = "tcp-read"; + as->on_tcp_read.flags = EPOLLIN | EPOLLET | EPOLLRDHUP; + as->on_tcp_read.app_ctx = ctx; + as->on_tcp_read.free_app_ctx = free_naive; + as->on_tcp_read.cb = on_tcp_read; + as->on_tcp_read.err_cb = on_err; + ctx->ref_count++; + + as->on_udp_read.name = "udp-read"; + as->on_udp_read.flags = EPOLLIN | EPOLLET; + as->on_udp_read.app_ctx = ctx; + as->on_udp_read.free_app_ctx = free_naive; + as->on_udp_read.cb = on_udp_read; + as->on_udp_read.err_cb = on_err; + ctx->ref_count++; + + as->on_tcp_write.name = "tcp-write"; + as->on_tcp_write.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP; + as->on_tcp_write.app_ctx = ctx; + as->on_tcp_write.free_app_ctx = free_naive; + as->on_tcp_write.cb = on_tcp_write; + as->on_tcp_write.err_cb = on_err; + ctx->ref_count++; + + as->on_udp_write.name = "udp-write"; + as->on_udp_write.flags = EPOLLOUT | EPOLLET; + as->on_udp_write.app_ctx = ctx; + as->on_udp_write.free_app_ctx = free_naive; + as->on_udp_write.cb = on_udp_write; + as->on_udp_write.err_cb = on_err; + ctx->ref_count++; +*/ + return; +init_err: + fprintf(stderr, "Failed to init algo naive\n"); + exit(EXIT_FAILURE); +} diff --git a/src/algo_utils.c b/src/algo_utils.c new file mode 100644 index 0000000..5e6542e --- /dev/null +++ b/src/algo_utils.c @@ -0,0 +1,104 @@ +#include "algo_utils.h" + +void free_nothing(void* app_ctx) {} +void free_naive(void* app_ctx) { + struct algo_ctx* ctx = (struct algo_ctx*) app_ctx; + ctx->ref_count--; + if (ctx->ref_count > 0) return; + g_queue_free(ctx->free_buffer); + g_queue_free(ctx->read_waiting); + g_hash_table_destroy (ctx->used_buffer); + g_hash_table_destroy (ctx->write_waiting); + free(ctx); +} + +/** + * Returns a buffer if available, NULL otherwise + */ +struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { + struct buffer_packet* bp; + + // 1. Check if we don't have a buffer + bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); + if (bp != NULL) return bp; + + // 2. Get a new buffer otherwise + bp = g_queue_pop_head(app_ctx->free_buffer); + if (bp == NULL) { + // 2.1 If no buffer is available, we subscribe to be notified later + g_queue_push_tail (app_ctx->read_waiting, &(fdinfo->fd)); + return NULL; + } + + // 3. Update state + g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); + + return bp; +} + +/** + * Returns a buffer if available, NULL otherwise + */ +struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { + struct buffer_packet* bp; + GQueue* q; + + // 1. Check if we don't have a buffer + bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); + if (bp != NULL) return bp; + + // 2. Check our waiting queue otherwise + if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return NULL; + bp = g_queue_pop_head(q); + if (bp == NULL) return NULL; // No packet to process + + // 3. Update state + g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); + + return bp; +} + +void mv_buffer_rtow(struct algo_ctx* app_ctx, + struct evt_core_fdinfo* from, + struct evt_core_fdinfo* to, + struct buffer_packet* bp) { + + // 1. We get the target writing queue + GQueue* q; + q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); + if (q == NULL) { + q = g_queue_new (); + g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); + } + + // 2. We move the buffer to the target queue + g_hash_table_remove(app_ctx->used_buffer, &from->fd); + g_queue_push_tail(q, bp); +} + +void mv_buffer_wtor(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + bp->mode = BP_READING; + bp->aread = 0; + g_queue_push_tail (app_ctx->free_buffer, bp); + g_hash_table_remove(app_ctx->used_buffer, &(fdinfo->fd)); +} + +void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { + struct evt_core_fdinfo* next_fdinfo = NULL; + while (next_fdinfo == NULL) { + int fd = GPOINTER_TO_INT(g_queue_pop_head(app_ctx->read_waiting)); + if (fd == 0) break; + next_fdinfo = evt_core_get_from_fd (ctx, fd); + if (strcmp(next_fdinfo->cat->name, "tcp-read") == 0 || strcmp(next_fdinfo->cat->name, "udp-read") == 0) { + next_fdinfo->cat->cb(ctx, next_fdinfo); + } else { + fprintf(stderr, "A fd from category %s can't be stored in read_waiting\n", next_fdinfo->cat->name); + exit(EXIT_FAILURE); + } + } +} + +void naive_free_simple(void* v) { + free(v); +} + diff --git a/src/algo_utils.h b/src/algo_utils.h new file mode 100644 index 0000000..8c5c634 --- /dev/null +++ b/src/algo_utils.h @@ -0,0 +1,26 @@ +#pragma once +#include "algo_skel.h" +#include +#include +#include + +struct algo_ctx { + int ref_count; + struct buffer_packet bps[10]; + GQueue* free_buffer; // Available buffers + GHashTable* used_buffer; // Buffers used for reading or writing + GQueue* read_waiting; // Who wait to be notified for a read + GHashTable* write_waiting; // Structure to track packets waiting to be written +}; + +void mv_buffer_rtow(struct algo_ctx* app_ctx, + struct evt_core_fdinfo* from, + struct evt_core_fdinfo* to, + struct buffer_packet* bp); +void mv_buffer_wtor(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); +struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); +void free_naive(void* app_ctx); +void free_nothing(void* app_ctx); +void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx); +void naive_free_simple(void* v);