Change packet order reception
This commit is contained in:
parent
e98fb6d8d4
commit
ff1b94e3ef
1 changed files with 25 additions and 23 deletions
|
@ -222,46 +222,45 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
|
||||||
mili_sec = secs * 1000 + nsecs / 1000000;
|
mili_sec = secs * 1000 + nsecs / 1000000;
|
||||||
if (mili_sec > rr->mjit) mili_sec = rr->mjit;
|
if (mili_sec > rr->mjit) mili_sec = rr->mjit;
|
||||||
|
|
||||||
// 3. Backup clear packet
|
// 3. Prepare fresh packet
|
||||||
struct buffer_packet clear_packet;
|
|
||||||
assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR);
|
assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR);
|
||||||
append_buffer (&clear_packet.ip.ap, 0, &bp->ip.ap);
|
bp->ip.ap.fmt.content.clear.id = rr->sent_content_id;
|
||||||
|
rr->sent_content_id++;
|
||||||
|
|
||||||
// 4. Set health packet
|
// 4. Create health packet
|
||||||
bp->ip.ap.fmt.headers.cmd = CMD_HEALTH;
|
struct buffer_packet hp;
|
||||||
bp->ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health);
|
hp.ip.ap.fmt.headers.cmd = CMD_HEALTH;
|
||||||
bp->ip.ap.fmt.content.health.id = rr->sent_health_id;
|
hp.ip.ap.fmt.headers.size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health);
|
||||||
bp->ip.ap.fmt.content.health.deltat = mili_sec;
|
hp.ip.ap.fmt.content.health.id = rr->sent_health_id;
|
||||||
bp->ip.ap.fmt.content.health.prevlink = rr->current_link;
|
hp.ip.ap.fmt.content.health.deltat = mili_sec;
|
||||||
bp->ip.ap.fmt.content.health.bitfield = rr->remote_links;
|
hp.ip.ap.fmt.content.health.prevlink = rr->current_link;
|
||||||
|
hp.ip.ap.fmt.content.health.bitfield = rr->remote_links;
|
||||||
rr->sent_health_id++;
|
rr->sent_health_id++;
|
||||||
|
|
||||||
// 5. Append clear packet
|
// 5. Append redundancy if needed
|
||||||
clear_packet.ip.ap.fmt.content.clear.id = rr->sent_content_id;
|
|
||||||
rr->sent_content_id++;
|
|
||||||
bp->ip.ap.fmt.content.health.min_blocked_pkt = clear_packet.ip.ap.fmt.content.clear.id;
|
|
||||||
append_buffer (&bp->ip.ap, 1, &clear_packet.ip.ap);
|
|
||||||
bp->ap_count++;
|
|
||||||
|
|
||||||
// 6. Append redundancy if needed
|
|
||||||
if (app_ctx->ap.redundant_data == 1) {
|
if (app_ctx->ap.redundant_data == 1) {
|
||||||
assert(ring_gt(bp->ip.ap.fmt.content.health.min_blocked_pkt, rr->prev_packet.ap.fmt.content.clear.id));
|
assert(ring_gt(bp->ip.ap.fmt.content.health.min_blocked_pkt, rr->prev_packet.ap.fmt.content.clear.id));
|
||||||
bp->ip.ap.fmt.content.health.min_blocked_pkt = rr->prev_packet.ap.fmt.content.clear.id;
|
bp->ip.ap.fmt.content.health.min_blocked_pkt = rr->prev_packet.ap.fmt.content.clear.id;
|
||||||
append_buffer(&bp->ip.ap, 2, &rr->prev_packet.ap); // We append previous packet
|
append_buffer(&bp->ip.ap, bp->ap_count, &rr->prev_packet.ap); // We append previous packet
|
||||||
append_buffer(&rr->prev_packet.ap, 0, &clear_packet.ip.ap); // We store current packet for next time
|
append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time
|
||||||
bp->ap_count++;
|
bp->ap_count++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// 6. Add health to buffer
|
||||||
|
append_buffer(&bp->ip.ap, bp->ap_count, &hp.ip.ap);
|
||||||
|
bp->ap_count++;
|
||||||
|
|
||||||
//printf("Will send packet id=%d\n", bp->ip.ap.str.id);
|
//printf("Will send packet id=%d\n", bp->ip.ap.str.id);
|
||||||
|
|
||||||
// 7. Try to find someone to send it
|
// 7. Try to find someone to send it
|
||||||
int max = 16;
|
int max = 16;
|
||||||
uint8_t sel_link = rr->current_link;
|
uint8_t sel_link = rr->current_link;
|
||||||
while(max-- >= 0) {
|
while(max-- >= 0) {
|
||||||
|
if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Still bootstrapping
|
||||||
sel_link = (sel_link + 1) % app_ctx->ap.links;
|
sel_link = (sel_link + 1) % app_ctx->ap.links;
|
||||||
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded
|
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded
|
||||||
to_fdinfo = evt_core_get_from_url (ctx, url);
|
to_fdinfo = evt_core_get_from_url (ctx, url);
|
||||||
if (to_fdinfo == NULL) continue; // Missing link
|
if (to_fdinfo == NULL) continue; // Missing link
|
||||||
if (app_ctx->ap.is_waiting_bootstrap && !app_ctx->is_rdy) goto not_ready; // Some links are down
|
|
||||||
if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) {
|
if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) {
|
||||||
rr->current_link = sel_link;
|
rr->current_link = sel_link;
|
||||||
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
|
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
|
||||||
|
@ -269,8 +268,11 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo);
|
struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo);
|
||||||
|
/*
|
||||||
|
* for later
|
||||||
dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0;
|
dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0;
|
||||||
dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor...
|
dup_bp->ap_count = 1; // We want to send only health packet to help link recover... Bwarf same traffic on Tor...
|
||||||
|
*/
|
||||||
main_on_tcp_write(ctx, to_fdinfo);
|
main_on_tcp_write(ctx, to_fdinfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -307,10 +309,10 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) {
|
||||||
// 3. Deliver blocked packets
|
// 3. Deliver blocked packets
|
||||||
// @FIXME CRAPPY CODE / CRAPPY LOGIC
|
// @FIXME CRAPPY CODE / CRAPPY LOGIC
|
||||||
//printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id);
|
//printf("t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id);
|
||||||
/*if (ring_gt(t->min_blocked_pkt, rr->content_id) && !rr->real[t->min_blocked_pkt % PACKET_BUFFER_SIZE].on) {
|
if (ring_gt(t->min_blocked_pkt, rr->content_id) && !rr->real[t->min_blocked_pkt % PACKET_BUFFER_SIZE].on) {
|
||||||
fprintf(stderr, "min_blocked_packet has not been received, t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id);
|
fprintf(stderr, "min_blocked_packet has not been received, t->min_blocked_pkt=%d, rr->content_id=%d\n", t->min_blocked_pkt, rr->content_id);
|
||||||
exit(EXIT_FAILURE);
|
exit(EXIT_FAILURE);
|
||||||
}*/
|
}
|
||||||
while (ring_gt(t->min_blocked_pkt, rr->content_id - 1)) {
|
while (ring_gt(t->min_blocked_pkt, rr->content_id - 1)) {
|
||||||
rr->content_id++;
|
rr->content_id++;
|
||||||
rr_pkt_unroll (ctx, app_ctx);
|
rr_pkt_unroll (ctx, app_ctx);
|
||||||
|
|
Loading…
Reference in a new issue