Many buffers - stopping link broken
This commit is contained in:
parent
a16fb465f8
commit
47e47dceef
8 changed files with 52 additions and 16 deletions
|
@ -29,6 +29,7 @@ struct rr_ctx {
|
|||
uint16_t recv_id_late;
|
||||
uint16_t sent_id;
|
||||
uint8_t current_link;
|
||||
struct internet_packet prev_packet;
|
||||
struct timespec emit_time;
|
||||
struct deferred_pkt real[PACKET_BUFFER_SIZE];
|
||||
struct waited_pkt wait[PACKET_BUFFER_SIZE];
|
||||
|
@ -57,6 +58,8 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
|
|||
char buffer[16];
|
||||
url_get_port (buffer, fdinfo->url);
|
||||
int link_num = atoi(buffer) - 7500; // @FIXME Hardcoded
|
||||
uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE;
|
||||
uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE;
|
||||
|
||||
//printf("Selected url %s for pkt %d to be queued for delivery\n", fdinfo->url, bp->ip.ap.str.id);
|
||||
|
||||
|
@ -75,10 +78,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
|
|||
show_link_availability (rr);
|
||||
}
|
||||
|
||||
// 2. If packet arrived too late, we discard it
|
||||
if (ring_ge(rr->recv_id, bp->ip.ap.str.id)) {
|
||||
// 2. If packet arrived too late or already queued, we discard it
|
||||
if (ring_ge(rr->recv_id, bp->ip.ap.str.id) || rr->real[real_idx].id == bp->ip.ap.str.id) {
|
||||
// Packet has already been delivered or dropped, we free the buffer
|
||||
fprintf(stderr, "Packet %d arrived too late (current: %d)\n", bp->ip.ap.str.id, rr->recv_id);
|
||||
fprintf(stderr, "Packet %d arrived too late (current: %d) or already received\n", bp->ip.ap.str.id, rr->recv_id);
|
||||
mv_buffer_wtof (app_ctx, fdinfo);
|
||||
return;
|
||||
}
|
||||
|
@ -90,7 +93,6 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
|
|||
//printf("%ld - %ld = %ld\n", rr->mjit, (int64_t) bp->ip.ap.str.deltat, timeout);
|
||||
if (timeout <= 0) timeout = 0;
|
||||
|
||||
uint16_t wait_idx = (bp->ip.ap.str.id - 1) % PACKET_BUFFER_SIZE;
|
||||
if (rr->wait[wait_idx].on && rr->wait[wait_idx].id != bp->ip.ap.str.id - 1) {
|
||||
fprintf(stderr, "Waiting array overlap, BUG: [\n");
|
||||
for (int i = 0; i < PACKET_BUFFER_SIZE; i++) {
|
||||
|
@ -108,7 +110,6 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
|
|||
}
|
||||
|
||||
// 4. We queue the packet to keep it
|
||||
uint16_t real_idx = bp->ip.ap.str.id % PACKET_BUFFER_SIZE;
|
||||
if (rr->real[real_idx].on && rr->real[real_idx].id != bp->ip.ap.str.id) {
|
||||
fprintf(stderr, "Real array is full for id=%d, idx=%d, BUG: [\n", bp->ip.ap.str.id, real_idx);
|
||||
for (int i = 0; i < PACKET_BUFFER_SIZE; i++) {
|
||||
|
@ -230,13 +231,19 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
|
|||
bp->ip.ap.str.deltat = mili_sec;
|
||||
bp->ip.ap.str.bitfield = rr->remote_links;
|
||||
bp->ip.ap.str.prevlink = rr->current_link;
|
||||
|
||||
if (app_ctx->ap.redundant_data == 1) {
|
||||
append_buffer(&bp->ip.ap, 1, &rr->prev_packet.ap); // We append previous packet
|
||||
append_buffer(&rr->prev_packet.ap, 0, &bp->ip.ap); // We store current packet for next time
|
||||
bp->ap_count++;
|
||||
}
|
||||
//printf("Will send packet id=%d\n", bp->ip.ap.str.id);
|
||||
|
||||
rr->emit_time = curr;
|
||||
rr->sent_id++;
|
||||
|
||||
// 2. Try to find someone to send it
|
||||
int max = 10;
|
||||
int max = 16;
|
||||
uint8_t sel_link = rr->current_link;
|
||||
while(max-- >= 0) {
|
||||
sel_link = (sel_link + 1) % app_ctx->ap.links;
|
||||
|
|
|
@ -235,6 +235,15 @@ void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) {
|
|||
}
|
||||
}
|
||||
|
||||
int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src) {
|
||||
char* target = &(dest->raw);
|
||||
while (pos-- > 0) {
|
||||
target += dest->str.size;
|
||||
}
|
||||
memcpy(target, src, src->str.size);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void naive_free_simple(void* v) {
|
||||
GQueue* g = v;
|
||||
g_queue_free (g);
|
||||
|
|
|
@ -53,6 +53,8 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo
|
|||
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from);
|
||||
void dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
|
||||
|
||||
int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src);
|
||||
|
||||
struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo);
|
||||
struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo);
|
||||
struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx);
|
||||
|
|
10
src/donar.c
10
src/donar.c
|
@ -14,7 +14,7 @@ int main(int argc, char** argv) {
|
|||
struct donar_params dp = {0};
|
||||
donar_init_params (&dp);
|
||||
|
||||
while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:")) != -1) {
|
||||
while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:")) != -1) {
|
||||
switch(dp.opt) {
|
||||
case 'v':
|
||||
dp.verbose++;
|
||||
|
@ -74,10 +74,10 @@ int main(int argc, char** argv) {
|
|||
|
||||
in_error:
|
||||
dp.errored = 1;
|
||||
fprintf(stderr, "Usage as client : %s -c -a <algo> [-h] [-b] -o <onion service file> -e <exposed udp port> [-e ...]* -r <remote udp port> [-r ...]*\n", argv[0]);
|
||||
fprintf(stderr, "Usage as server : %s -s -a <algo> [-h] [-b] -e <exposed udp port> [-e ...]* -r <remote udp port> [-r ...]*\n\n", argv[0]);
|
||||
fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s\n",
|
||||
dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file);
|
||||
fprintf(stderr, "Usage as client : %s -c -a <algo> -o <onion service file> [-h] [-b] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n", argv[0]);
|
||||
fprintf(stderr, "Usage as server : %s -s -a <algo> [-h] [-b] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n\n", argv[0]);
|
||||
fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n",
|
||||
dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data);
|
||||
|
||||
terminate:
|
||||
if (dp.onion_file != NULL) free(dp.onion_file);
|
||||
|
|
|
@ -152,6 +152,8 @@ void donar_init_params(struct donar_params* dp) {
|
|||
dp->is_waiting_bootstrap = 0;
|
||||
dp->errored = 0;
|
||||
dp->links = 2;
|
||||
dp->fresh_data = 1;
|
||||
dp->redundant_data = 0;
|
||||
dp->remote_ports = g_ptr_array_new_with_free_func (free_port);
|
||||
dp->exposed_ports = g_ptr_array_new_with_free_func (free_port);
|
||||
}
|
||||
|
|
|
@ -55,7 +55,10 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
|
|||
struct algo_params ap = {
|
||||
.is_waiting_bootstrap = dp->is_waiting_bootstrap,
|
||||
.is_healing = dp->is_healing,
|
||||
.algo_name = dp->algo
|
||||
.algo_name = dp->algo,
|
||||
.links = dp->links,
|
||||
.fresh_data = dp->fresh_data,
|
||||
.redundant_data = dp->redundant_data
|
||||
};
|
||||
|
||||
evt_core_init (&(ctx->evts), dp->verbose);
|
||||
|
|
16
src/packet.c
16
src/packet.c
|
@ -1,5 +1,13 @@
|
|||
#include "packet.h"
|
||||
|
||||
size_t get_full_size(struct buffer_packet* bp) {
|
||||
union abstract_packet* ap = &bp->ip.ap;
|
||||
for (int i = 0; i < bp->ap_count; i++) {
|
||||
ap = (union abstract_packet*)(&ap->raw + ap->str.size);
|
||||
}
|
||||
return &ap->raw - &bp->ip.ap.raw;
|
||||
}
|
||||
|
||||
enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) {
|
||||
ssize_t nread;
|
||||
size_t pkt_size_size = sizeof(bp->ip.ap.str.size);
|
||||
|
@ -23,6 +31,7 @@ enum FD_STATE read_packet_from_tcp(int fd, struct buffer_packet* bp) {
|
|||
|
||||
bp->mode = BP_WRITING;
|
||||
bp->awrite = 0;
|
||||
bp->ap_count = 1;
|
||||
|
||||
return FDS_READY;
|
||||
}
|
||||
|
@ -31,8 +40,8 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) {
|
|||
ssize_t nwrite;
|
||||
|
||||
if (bp->mode != BP_WRITING) return FDS_ERR;
|
||||
while (bp->awrite < bp->ip.ap.str.size) {
|
||||
nwrite = send(fd, &(bp->ip.ap.raw), bp->ip.ap.str.size, 0);
|
||||
while (bp->awrite < get_full_size(bp)) {
|
||||
nwrite = send(fd, &(bp->ip.ap.raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0);
|
||||
if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN;
|
||||
if (nwrite == -1) return FDS_ERR;
|
||||
bp->awrite += nwrite;
|
||||
|
@ -40,6 +49,7 @@ enum FD_STATE write_packet_to_tcp(int fd, struct buffer_packet* bp) {
|
|||
|
||||
bp->mode = BP_READING;
|
||||
bp->aread = 0;
|
||||
bp->ap_count = 0;
|
||||
|
||||
return FDS_READY;
|
||||
}
|
||||
|
@ -70,6 +80,7 @@ enum FD_STATE write_packet_to_udp(int fd, struct buffer_packet* bp, struct udp_t
|
|||
|
||||
bp->mode = BP_READING;
|
||||
bp->aread = 0;
|
||||
bp->ap_count = 0;
|
||||
|
||||
return FDS_READY;
|
||||
}
|
||||
|
@ -99,6 +110,7 @@ enum FD_STATE read_packet_from_udp (int fd, struct buffer_packet* bp, struct udp
|
|||
|
||||
bp->mode = BP_WRITING;
|
||||
bp->awrite = 0;
|
||||
bp->ap_count = 1;
|
||||
|
||||
return FDS_READY;
|
||||
}
|
||||
|
|
|
@ -48,11 +48,12 @@ union abstract_packet {
|
|||
|
||||
struct internet_packet {
|
||||
union abstract_packet ap;
|
||||
char rest[1499]; // MTU = 1500, 1 byte in the union
|
||||
char rest[1499]; // MTU = 1500, 1 byte in the union as payload
|
||||
};
|
||||
|
||||
struct buffer_packet {
|
||||
uint8_t mode;
|
||||
enum BP_MODE mode;
|
||||
uint8_t ap_count;
|
||||
uint16_t aread;
|
||||
uint16_t awrite;
|
||||
struct internet_packet ip;
|
||||
|
|
Loading…
Reference in a new issue