New round robin is compiling!

This commit is contained in:
Quentin 2019-05-13 18:37:49 +02:00
parent 9cc27e9613
commit dafd69e649
3 changed files with 40 additions and 20 deletions

View file

@ -23,11 +23,13 @@ struct queued_pkt {
struct rr_ctx {
uint8_t my_links;
uint8_t remote_links;
uint8_t current_link;
int64_t mjit;
uint16_t health_id;
uint16_t health_id_late;
uint16_t content_id;
uint16_t sent_id;
uint16_t sent_health_id;
uint16_t sent_content_id;
struct internet_packet prev_packet;
struct timespec emit_time;
struct queued_pkt real[PACKET_BUFFER_SIZE];
@ -208,28 +210,43 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
exit(EXIT_FAILURE);
}
// 2. Compute delta t
secs = curr.tv_sec - rr->emit_time.tv_sec;
nsecs = curr.tv_nsec - rr->emit_time.tv_nsec;
rr->emit_time = curr;
mili_sec = secs * 1000 + nsecs / 1000000;
if (mili_sec > rr->mjit) mili_sec = rr->mjit;
bp->ip.ap.str.id = rr->sent_id;
bp->ip.ap.str.flags = 0;
bp->ip.ap.str.deltat = mili_sec;
bp->ip.ap.str.bitfield = rr->remote_links;
bp->ip.ap.str.prevlink = rr->current_link;
// 3. Backup clear packet
struct buffer_packet clear_packet;
assert(bp->ip.ap.headers.cmd == CMD_CLEAR);
append_buffer (&clear_packet.ip.ap, 0, &bp->ip.ap);
// 4. Set health packet
bp->ip.ap.headers.cmd = CMD_HEALTH;
bp->ip.ap.headers.size = sizeof(bp->ip.ap.headers) + sizeof(bp->ip.ap.content.health);
bp->ip.ap.content.health.id = rr->sent_health_id;
bp->ip.ap.content.health.deltat = mili_sec;
bp->ip.ap.content.health.prevlink = rr->current_link;
bp->ip.ap.content.health.bitfield = rr->remote_links;
rr->sent_health_id++;
// 5. Append clear packet
clear_packet.ip.ap.content.clear.id = rr->sent_content_id;
rr->sent_content_id++;
append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap);
bp->ap_count++;
// 6. Append redundancy if needed
if (app_ctx->ap.redundant_data == 1) {
append_buffer(&bp->ip.ap, 1, &rr->prev_packet.ap); // We append previous packet
append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time
append_buffer(&bp->ip.ap, 2, &rr->prev_packet.ap); // We append previous packet
append_buffer(&rr->prev_packet.ap, 0, &clear_packet.ip.ap); // We store current packet for next time
bp->ap_count++;
}
//printf("Will send packet id=%d\n", bp->ip.ap.str.id);
rr->emit_time = curr;
rr->sent_id++;
// 2. Try to find someone to send it
// 7. Try to find someone to send it
int max = 16;
uint8_t sel_link = rr->current_link;
while(max-- >= 0) {
@ -238,13 +255,14 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) continue; // Missing link
if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Some links are down
if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) {
if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) {
rr->current_link = sel_link;
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo);
return 0;
} else {
dup_buffer_tow(app_ctx, bp, to_fdinfo);
struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo);
dup_bp->ap_count = 1;
main_on_tcp_write(ctx, to_fdinfo);
}
}
@ -305,7 +323,8 @@ void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct alg
rr->mjit = 200;
rr->my_links = 0xff;
rr->remote_links = 0xff;
rr->sent_id = 1;
rr->sent_health_id = 0;
rr->sent_content_id = 0;
rr->health_id = 0;
rr->health_id_late = 0;
rr->content_id = 0;

View file

@ -43,7 +43,7 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_
struct buffer_packet* bp;
// 1. Check if we don't have a buffer
bp = g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd);
bp = fdinfo == NULL ? NULL : g_hash_table_lookup (app_ctx->used_buffer, &fdinfo->fd);
if (bp != NULL) return bp;
// 2. Get a new buffer otherwise
@ -189,14 +189,14 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) {
g_queue_push_tail (app_ctx->free_buffer, bp);
}
void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) {
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) {
GQueue* q;
// 1. We get a free buffer
struct buffer_packet* bp_dest = g_queue_pop_head(app_ctx->free_buffer);
if (bp_dest == NULL) {
debug_buffer(app_ctx, to);
return;
return NULL;
}
// 2. We duplicate the data
@ -211,6 +211,7 @@ void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct e
// 4. We push the content to the appropriate destination
g_queue_push_tail(q, bp_dest);
return bp_dest;
}
struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) {
@ -238,7 +239,7 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) {
int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src) {
char* target = &(dest->raw);
while (pos-- > 0) {
target += dest->headers.size;
target += ((struct abstract_packet*) target)->headers.size;
}
memcpy(target, src, src->headers.size);
return 0;

View file

@ -51,7 +51,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from);
void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to);
void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to);
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from);
void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
int append_buffer(struct abstract_packet* dest, int pos, struct abstract_packet* src);