From b04283b59ca2272eabf6746607bcc9f1be4d100f Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 15 Mar 2019 16:44:47 +0100 Subject: [PATCH] Huge WIP --- src/algo_rr.c | 32 ++++++++++++++++++++++++++++++-- src/algo_utils.h | 15 ++++++++++----- src/packet.h | 6 ++++++ 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/src/algo_rr.c b/src/algo_rr.c index 165ac81..93b2ba6 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -68,7 +68,7 @@ void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec) { exit(EXIT_FAILURE); } fdinfo.cat->name = "timeout"; - fdinfo.other = NULL; + fdinfo.other = NULL; // Should put the duration, the file descriptor, the packet id fdinfo.free_other = NULL; sprintf(fdinfo.url, "timer:%ld:1", micro_sec); evt_core_add_fd (evts, &fdinfo); @@ -94,8 +94,22 @@ void rr_handle_recv(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct e else if (app_ctx->recv_id < bp->ip.ap.str.id - 1) { int64_t timeout = app_ctx->mjit - (int64_t) bp->ip.ap.str.deltat; if (timeout <= 0) timeout = 0; - //bp->ip.ap.str.id; set_timeout(ctx, timeout); + app_ctx->waiting[bp->ip.ap.str.id % LINK_COUNT] = bp; // should store more than that + } + + // 4. If we were waiting this packet + else { + if (bp->ip.ap.str.flags & PKT_TIMEOUT) broken_rlink(fdinfo); + else if (bp->ip.ap.str.flags & PKT_CONTROL) working_rlink(fdinfo); + else deliver(bp); + app_ctx->recv_id = bp->ip.ap.str.id; + int next = app_ctx->recv_id+1; + if (app_ctx->waiting[next % LINK_COUNT] != NULL) { + bp = app_ctx->waiting[next % LINK_COUNT]; + app_ctx->waiting[next % LINK_COUNT] = NULL; + rr_handle_recv(ctx,bp,NULL); + } } } @@ -140,6 +154,19 @@ co_error: exit(EXIT_FAILURE); } +struct deferred_pkt { + int link_fd; + struct buffer_packet* bp; +}; + +struct rr_ctx { + uint16_t mjit; + uint16_t recv_id; + uint16_t sent_id; + struct deferred_pkt real[LINK_COUNT]; + struct deferred_pkt stub[LINK_COUNT]; +}; + void algo_rr(struct algo_skel* as) { struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); if (ctx == NULL) goto init_err; @@ -148,6 +175,7 @@ void algo_rr(struct algo_skel* as) { ctx->read_waiting = g_queue_new (); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); + ctx->misc = malloc(sizeof(struct rr_ctx)); for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); } diff --git a/src/algo_utils.h b/src/algo_utils.h index 8c5c634..5165af1 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -3,14 +3,19 @@ #include #include #include +#define LINK_COUNT 10 + +typedef void (*algo_ctx_free_misc)(void*); struct algo_ctx { int ref_count; - struct buffer_packet bps[10]; - GQueue* free_buffer; // Available buffers - GHashTable* used_buffer; // Buffers used for reading or writing - GQueue* read_waiting; // Who wait to be notified for a read - GHashTable* write_waiting; // Structure to track packets waiting to be written + struct buffer_packet bps[LINK_COUNT]; + GQueue* free_buffer; // Available buffers + GHashTable* used_buffer; // Buffers used for reading or writing + GQueue* read_waiting; // Who wait to be notified for a read + GHashTable* write_waiting; // Structure to track packets waiting to be written + void* misc; // Additional structures + algo_ctx_free_misc free_misc; // Fx ptr to free misc }; void mv_buffer_rtow(struct algo_ctx* app_ctx, diff --git a/src/packet.h b/src/packet.h index 7a0c72a..29e13b8 100644 --- a/src/packet.h +++ b/src/packet.h @@ -28,6 +28,11 @@ enum BP_MODE { BP_WRITING }; +enum PKT_FLAGS { + PKT_TIMEOUT = 1 << 0, + PKT_CONTROL = 1 << 1 +}; + union abstract_packet { char raw; struct { @@ -37,6 +42,7 @@ union abstract_packet { uint8_t bitfield; uint8_t prevlink; uint16_t deltat; + uint8_t flags; char payload; } str; };