This commit is contained in:
Quentin Dufour 2019-03-15 16:44:47 +01:00
parent cf9914e18a
commit b04283b59c
3 changed files with 46 additions and 7 deletions

View file

@ -68,7 +68,7 @@ void set_timeout(struct evt_core_ctx* evts, uint64_t micro_sec) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
fdinfo.cat->name = "timeout"; fdinfo.cat->name = "timeout";
fdinfo.other = NULL; fdinfo.other = NULL; // Should put the duration, the file descriptor, the packet id
fdinfo.free_other = NULL; fdinfo.free_other = NULL;
sprintf(fdinfo.url, "timer:%ld:1", micro_sec); sprintf(fdinfo.url, "timer:%ld:1", micro_sec);
evt_core_add_fd (evts, &fdinfo); evt_core_add_fd (evts, &fdinfo);
@ -94,8 +94,22 @@ void rr_handle_recv(struct evt_core_ctx* ctx, struct buffer_packet* bp, struct e
else if (app_ctx->recv_id < bp->ip.ap.str.id - 1) { else if (app_ctx->recv_id < bp->ip.ap.str.id - 1) {
int64_t timeout = app_ctx->mjit - (int64_t) bp->ip.ap.str.deltat; int64_t timeout = app_ctx->mjit - (int64_t) bp->ip.ap.str.deltat;
if (timeout <= 0) timeout = 0; if (timeout <= 0) timeout = 0;
//bp->ip.ap.str.id;
set_timeout(ctx, timeout); set_timeout(ctx, timeout);
app_ctx->waiting[bp->ip.ap.str.id % LINK_COUNT] = bp; // should store more than that
}
// 4. If we were waiting this packet
else {
if (bp->ip.ap.str.flags & PKT_TIMEOUT) broken_rlink(fdinfo);
else if (bp->ip.ap.str.flags & PKT_CONTROL) working_rlink(fdinfo);
else deliver(bp);
app_ctx->recv_id = bp->ip.ap.str.id;
int next = app_ctx->recv_id+1;
if (app_ctx->waiting[next % LINK_COUNT] != NULL) {
bp = app_ctx->waiting[next % LINK_COUNT];
app_ctx->waiting[next % LINK_COUNT] = NULL;
rr_handle_recv(ctx,bp,NULL);
}
} }
} }
@ -140,6 +154,19 @@ co_error:
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
struct deferred_pkt {
int link_fd;
struct buffer_packet* bp;
};
struct rr_ctx {
uint16_t mjit;
uint16_t recv_id;
uint16_t sent_id;
struct deferred_pkt real[LINK_COUNT];
struct deferred_pkt stub[LINK_COUNT];
};
void algo_rr(struct algo_skel* as) { void algo_rr(struct algo_skel* as) {
struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx));
if (ctx == NULL) goto init_err; if (ctx == NULL) goto init_err;
@ -148,6 +175,7 @@ void algo_rr(struct algo_skel* as) {
ctx->read_waiting = g_queue_new (); ctx->read_waiting = g_queue_new ();
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal); ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple); ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
ctx->misc = malloc(sizeof(struct rr_ctx));
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) { for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i])); g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
} }

View file

@ -3,14 +3,19 @@
#include <glib-2.0/glib.h> #include <glib-2.0/glib.h>
#include <glib-2.0/gmodule.h> #include <glib-2.0/gmodule.h>
#include <glib-2.0/glib-object.h> #include <glib-2.0/glib-object.h>
#define LINK_COUNT 10
typedef void (*algo_ctx_free_misc)(void*);
struct algo_ctx { struct algo_ctx {
int ref_count; int ref_count;
struct buffer_packet bps[10]; struct buffer_packet bps[LINK_COUNT];
GQueue* free_buffer; // Available buffers GQueue* free_buffer; // Available buffers
GHashTable* used_buffer; // Buffers used for reading or writing GHashTable* used_buffer; // Buffers used for reading or writing
GQueue* read_waiting; // Who wait to be notified for a read GQueue* read_waiting; // Who wait to be notified for a read
GHashTable* write_waiting; // Structure to track packets waiting to be written GHashTable* write_waiting; // Structure to track packets waiting to be written
void* misc; // Additional structures
algo_ctx_free_misc free_misc; // Fx ptr to free misc
}; };
void mv_buffer_rtow(struct algo_ctx* app_ctx, void mv_buffer_rtow(struct algo_ctx* app_ctx,

View file

@ -28,6 +28,11 @@ enum BP_MODE {
BP_WRITING BP_WRITING
}; };
enum PKT_FLAGS {
PKT_TIMEOUT = 1 << 0,
PKT_CONTROL = 1 << 1
};
union abstract_packet { union abstract_packet {
char raw; char raw;
struct { struct {
@ -37,6 +42,7 @@ union abstract_packet {
uint8_t bitfield; uint8_t bitfield;
uint8_t prevlink; uint8_t prevlink;
uint16_t deltat; uint16_t deltat;
uint8_t flags;
char payload; char payload;
} str; } str;
}; };