#include "algo_utils.h" void free_nothing(void* app_ctx) {} void free_naive(void* app_ctx) { struct algo_ctx* ctx = (struct algo_ctx*) app_ctx; ctx->ref_count--; if (ctx->ref_count > 0) return; g_queue_free(ctx->free_buffer); g_queue_free(ctx->read_waiting); g_hash_table_destroy (ctx->used_buffer); g_hash_table_destroy (ctx->write_waiting); free(ctx); } /** * Returns a buffer if available, NULL otherwise */ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* bp; // 1. Check if we don't have a buffer bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); if (bp != NULL) return bp; // 2. Get a new buffer otherwise bp = g_queue_pop_head(app_ctx->free_buffer); if (bp == NULL) { // 2.1 If no buffer is available, we subscribe to be notified later g_queue_push_tail (app_ctx->read_waiting, &(fdinfo->fd)); return NULL; } // 3. Update state g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); return bp; } /** * Returns a buffer if available, NULL otherwise */ struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* bp; GQueue* q; // 1. Check if we don't have a buffer bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd); if (bp != NULL) return bp; // 2. Check our waiting queue otherwise if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return NULL; bp = g_queue_pop_head(q); if (bp == NULL) return NULL; // No packet to process // 3. Update state g_hash_table_insert(app_ctx->used_buffer, &(fdinfo->fd), bp); return bp; } void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to, struct buffer_packet* bp) { // 1. We get the target writing queue GQueue* q; q = g_hash_table_lookup(app_ctx->write_waiting, &(to->fd)); if (q == NULL) { q = g_queue_new (); g_hash_table_insert(app_ctx->write_waiting, &(to->fd), q); } // 2. We move the buffer to the target queue g_hash_table_remove(app_ctx->used_buffer, &from->fd); g_queue_push_tail(q, bp); } void mv_buffer_wtor(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { bp->mode = BP_READING; bp->aread = 0; g_queue_push_tail (app_ctx->free_buffer, bp); g_hash_table_remove(app_ctx->used_buffer, &(fdinfo->fd)); } void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { struct evt_core_fdinfo* next_fdinfo = NULL; while (next_fdinfo == NULL) { int fd = GPOINTER_TO_INT(g_queue_pop_head(app_ctx->read_waiting)); if (fd == 0) break; next_fdinfo = evt_core_get_from_fd (ctx, fd); if (strcmp(next_fdinfo->cat->name, "tcp-read") == 0 || strcmp(next_fdinfo->cat->name, "udp-read") == 0) { next_fdinfo->cat->cb(ctx, next_fdinfo); } else { fprintf(stderr, "A fd from category %s can't be stored in read_waiting\n", next_fdinfo->cat->name); exit(EXIT_FAILURE); } } } void naive_free_simple(void* v) { free(v); }