Some improvement on RR algo
This commit is contained in:
parent
e15e15325c
commit
ca8eddc0ad
1 changed files with 19 additions and 7 deletions
|
@ -13,6 +13,7 @@ struct waited_pkt {
|
||||||
struct deferred_pkt {
|
struct deferred_pkt {
|
||||||
int link_fd;
|
int link_fd;
|
||||||
int idx;
|
int idx;
|
||||||
|
uint16_t id;
|
||||||
uint8_t on;
|
uint8_t on;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -175,6 +176,7 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
|
||||||
int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE;
|
int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE;
|
||||||
if (rr->wait[idx_real].on && rr->wait[idx_real].id != bp->ip.ap.str.id) {
|
if (rr->wait[idx_real].on && rr->wait[idx_real].id != bp->ip.ap.str.id) {
|
||||||
fprintf(stderr, "Waiting array is full, BUG\n");
|
fprintf(stderr, "Waiting array is full, BUG\n");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
} else if (!rr->wait[idx_real].on) {
|
} else if (!rr->wait[idx_real].on) {
|
||||||
rr->wait[idx_real].on = 1;
|
rr->wait[idx_real].on = 1;
|
||||||
rr->wait[idx_real].id = bp->ip.ap.str.id;
|
rr->wait[idx_real].id = bp->ip.ap.str.id;
|
||||||
|
@ -183,10 +185,18 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
|
||||||
}
|
}
|
||||||
|
|
||||||
// 5. We queue the packet
|
// 5. We queue the packet
|
||||||
|
if (rr->real[idx_real].on && rr->real[idx_real].id != bp->ip.ap.str.id) {
|
||||||
|
fprintf(stderr, "Real array is full, BUG\n");
|
||||||
|
exit(EXIT_FAILURE);
|
||||||
|
} else if (!rr->real[idx_real].on) {
|
||||||
rr->real[idx_real].on = 1;
|
rr->real[idx_real].on = 1;
|
||||||
rr->real[idx_real].idx = idx_real;
|
rr->real[idx_real].idx = idx_real;
|
||||||
rr->real[idx_real].link_fd = fdinfo->fd;
|
rr->real[idx_real].link_fd = fdinfo->fd;
|
||||||
mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx);
|
mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[idx_real].idx);
|
||||||
|
} else {
|
||||||
|
fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.str.id, rr->recv_id);
|
||||||
|
mv_buffer_wtof (app_ctx, fdinfo);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) {
|
void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct deferred_pkt* dp) {
|
||||||
|
@ -198,18 +208,20 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct
|
||||||
// 1. Marked the packet as handled
|
// 1. Marked the packet as handled
|
||||||
dp->on = 0;
|
dp->on = 0;
|
||||||
|
|
||||||
// 2. Get the buffer
|
// 2. Get the buffer and update rr state
|
||||||
struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx);
|
struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx);
|
||||||
|
int idx_real = bp->ip.ap.str.id % PACKET_BUFFER_SIZE;
|
||||||
|
rr->wait[idx_real].on = 0;
|
||||||
//printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id);
|
//printf("Selected url %s for pkt %d to be delivered\n", fdinfo->url, bp->ip.ap.str.id);
|
||||||
|
|
||||||
// 3. We update our cursor
|
// 3. We update our cursor
|
||||||
rr->recv_id = bp->ip.ap.str.id;
|
rr->recv_id = bp->ip.ap.str.id;
|
||||||
|
|
||||||
// 4. We free the buffer if it's a control packet and quit
|
// 4. We free the buffer if it's a control packet and quit
|
||||||
if (bp->ip.ap.str.flags & PKT_CONTROL) {
|
/*if (bp->ip.ap.str.flags & PKT_CONTROL) {
|
||||||
mv_buffer_atof (app_ctx, &dp->idx);
|
mv_buffer_atof (app_ctx, &dp->idx);
|
||||||
return;
|
return;
|
||||||
}
|
}*/
|
||||||
|
|
||||||
// 5. Find its target
|
// 5. Find its target
|
||||||
sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port);
|
sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port);
|
||||||
|
|
Loading…
Reference in a new issue