WIP refactor to capture traffic

This commit is contained in:
Quentin 2019-05-27 17:32:00 +02:00
parent fc494f4c9e
commit b592754973
15 changed files with 178 additions and 133 deletions

View file

@ -33,6 +33,8 @@ list(APPEND CSOURCES
src/proxy.c src/proxy.c
src/timer.h src/timer.h
src/timer.c src/timer.c
src/capture_traffic.h
src/capture_traffic.c
) )
add_executable(donar ${CSOURCES} src/donar.c) add_executable(donar ${CSOURCES} src/donar.c)

View file

@ -23,7 +23,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
// Check that we didn't already received the packet // Check that we didn't already received the packet
struct dup2_ctx* dup2c = app_ctx->misc; struct dup2_ctx* dup2c = app_ctx->misc;
if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) {
mv_buffer_rtof(app_ctx, fdinfo); mv_buffer_rtof(&app_ctx->br, fdinfo);
return 0; return 0;
} }
@ -34,12 +34,12 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
to_fdinfo = evt_core_get_from_url (ctx, url); to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 1; return 1;
} }
// 2. Move buffer // 2. Move buffer
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
return 0; return 0;
} }
@ -63,12 +63,12 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
} }
// 2. We move the buffer and notify the target // 2. We move the buffer and notify the target
dup_buffer_tow (app_ctx, bp, to_fdinfo); dup_buffer_tow (&app_ctx->br, bp, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
} }
// 3. Release the buffer // 3. Release the buffer
mv_buffer_rtof (app_ctx, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
return 0; return 0;
} }

View file

@ -15,12 +15,12 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf
to_fdinfo = evt_core_get_from_url (ctx, url); to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 1; return 1;
} }
// 2. Move buffer // 2. Move buffer
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
return 0; return 0;
@ -35,13 +35,13 @@ int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi
to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL; to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL;
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name); fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 1; return 1;
} }
//printf("Pass packet from %s to %s\n", fdinfo->url, url); //printf("Pass packet from %s to %s\n", fdinfo->url, url);
// 2. We move the buffer and notify the target // 2. We move the buffer and notify the target
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
return 0; return 0;

View file

@ -82,10 +82,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
rr->real[real_idx].idx = real_idx; rr->real[real_idx].idx = real_idx;
rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].link_fd = fdinfo->fd;
rr->real[real_idx].algo = app_ctx; rr->real[real_idx].algo = app_ctx;
mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); mv_buffer_rtoa(&app_ctx->br, fdinfo, &rr->real[real_idx].idx);
} else { } else {
if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id);
mv_buffer_rtof (app_ctx, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
} }
} }
@ -98,7 +98,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue
dp->on = 0; dp->on = 0;
// 2. Get the buffer // 2. Get the buffer
struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); struct buffer_packet* bp = get_app_buffer (&app_ctx->br, &dp->idx);
assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR);
// 3. We update our cursor // 3. We update our cursor
@ -110,12 +110,12 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx); fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx);
//mv_buffer_wtor (app_ctx, fdinfo, bp); //mv_buffer_wtor (app_ctx, fdinfo, bp);
mv_buffer_atof (app_ctx, &dp->idx); mv_buffer_atof (&app_ctx->br, &dp->idx);
} }
// 5. We move the buffer and notify the target // 5. We move the buffer and notify the target
//mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp);
mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); mv_buffer_atow (&app_ctx->br, &dp->idx, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
} }
@ -164,7 +164,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf
set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health);
release: release:
mv_buffer_rtof(app_ctx, fdinfo); mv_buffer_rtof(&app_ctx->br, fdinfo);
} }
uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) {
@ -202,7 +202,7 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
rr_pkt_manage_links(ctx, fdinfo, bp); rr_pkt_manage_links(ctx, fdinfo, bp);
} else { } else {
fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd); fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd);
mv_buffer_rtof(app_ctx, fdinfo); mv_buffer_rtof(&app_ctx->br, fdinfo);
} }
return 0; return 0;
@ -288,7 +288,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
continue; continue;
} }
if ((len = write_queue_len (app_ctx, to_fdinfo)) > 0) { if ((len = write_queue_len (&app_ctx->br, to_fdinfo)) > 0) {
if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len);
blacklist_link (rr, sel_link); blacklist_link (rr, sel_link);
continue; continue;
@ -296,11 +296,11 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) {
rr->current_link = sel_link; rr->current_link = sel_link;
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
return 0; return 0;
} else { } else {
struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); struct buffer_packet* dup_bp = dup_buffer_tow(&app_ctx->br, bp, to_fdinfo);
/* /*
* for later * for later
dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0;
@ -313,7 +313,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
not_ready: not_ready:
// 3. We find no up target // 3. We find no up target
fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 0; return 0;
} }

View file

@ -1,19 +1,5 @@
#include "algo_utils.h" #include "algo_utils.h"
void free_nothing(void* app_ctx) {}
void free_naive(void* app_ctx) {
struct algo_ctx* ctx = (struct algo_ctx*) app_ctx;
ctx->ref_count--;
if (ctx->ref_count > 0) return;
if (ctx->free_misc) ctx->free_misc(ctx->misc);
g_queue_free(ctx->free_buffer);
g_queue_free(ctx->read_waiting);
g_hash_table_destroy (ctx->application_waiting);
g_hash_table_destroy (ctx->used_buffer);
g_hash_table_destroy (ctx->write_waiting);
free(ctx);
}
void iterate(int* fd, GQueue* q, int* waiting_count) { void iterate(int* fd, GQueue* q, int* waiting_count) {
fprintf(stderr, "Queue for fd=%d has length=%d\n", *fd, q->length); fprintf(stderr, "Queue for fd=%d has length=%d\n", *fd, q->length);
*waiting_count += q->length; *waiting_count += q->length;
@ -23,7 +9,12 @@ void iterate2(int* fd, struct buffer_packet *bp, gpointer user_data) {
fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd); fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd);
} }
void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { void naive_free_simple(void* v) {
GQueue* g = v;
g_queue_free (g);
}
void debug_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd);
int waiting_count = 0; int waiting_count = 0;
g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count);
@ -36,10 +27,30 @@ void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) {
waiting_count); waiting_count);
} }
void init_buffer_management(struct buffer_resources* ctx) {
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet));
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
}
void destroy_buffer_management(struct buffer_resources* ctx) {
g_queue_free(ctx->free_buffer);
g_queue_free(ctx->read_waiting);
g_hash_table_destroy (ctx->application_waiting);
g_hash_table_destroy (ctx->used_buffer);
g_hash_table_destroy (ctx->write_waiting);
}
/** /**
* Returns a buffer if available, NULL otherwise * Returns a buffer if available, NULL otherwise
*/ */
struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. Check if we don't have a buffer // 1. Check if we don't have a buffer
@ -62,12 +73,12 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_
return bp; return bp;
} }
void __push_to_free(struct algo_ctx *app_ctx, struct buffer_packet* bp) { void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) {
memset(bp, 0, sizeof(struct buffer_packet)); memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail (app_ctx->free_buffer, bp); g_queue_push_tail (app_ctx->free_buffer, bp);
} }
guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
GQueue* q; GQueue* q;
if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue
@ -77,7 +88,7 @@ guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo)
/** /**
* Returns a buffer if available, NULL otherwise * Returns a buffer if available, NULL otherwise
*/ */
struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
GQueue* q; GQueue* q;
@ -98,7 +109,7 @@ struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core
return bp; return bp;
} }
void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) { void mv_buffer_rtow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
struct buffer_packet* bp; struct buffer_packet* bp;
@ -121,7 +132,7 @@ void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, stru
g_queue_push_tail(q, bp); g_queue_push_tail(q, bp);
} }
void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from) { void mv_buffer_rtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. We get the packet buffer // 1. We get the packet buffer
@ -134,7 +145,7 @@ void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from) {
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo) { void mv_buffer_wtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); struct buffer_packet* bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd));
if (bp == NULL) { if (bp == NULL) {
fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in wtof\n", fdinfo->fd, fdinfo->url); fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in wtof\n", fdinfo->fd, fdinfo->url);
@ -144,7 +155,7 @@ void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo) {
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to) { void mv_buffer_rtoa(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, void* to) {
struct buffer_packet* bp; struct buffer_packet* bp;
bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd); bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd);
if (bp == NULL) { if (bp == NULL) {
@ -159,7 +170,7 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void
g_hash_table_insert(app_ctx->application_waiting, to, bp); g_hash_table_insert(app_ctx->application_waiting, to, bp);
} }
void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to) { void mv_buffer_atow(struct buffer_resources *app_ctx, void* from, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
struct buffer_packet* bp; struct buffer_packet* bp;
@ -182,7 +193,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo
g_queue_push_tail(q, bp); g_queue_push_tail(q, bp);
} }
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. Remove the buffer // 1. Remove the buffer
@ -197,7 +208,7 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) {
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
// 1. We get a free buffer // 1. We get a free buffer
@ -222,11 +233,11 @@ struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_pac
return bp_dest; return bp_dest;
} }
struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx) {
return g_hash_table_lookup (app_ctx->application_waiting, idx); return g_hash_table_lookup (app_ctx->application_waiting, idx);
} }
void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx) {
struct evt_core_fdinfo* next_fdinfo = NULL; struct evt_core_fdinfo* next_fdinfo = NULL;
while (next_fdinfo == NULL) { while (next_fdinfo == NULL) {
int* fd = g_queue_pop_head(app_ctx->read_waiting); int* fd = g_queue_pop_head(app_ctx->read_waiting);
@ -252,9 +263,3 @@ int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* s
memcpy(target, src, src->fmt.headers.size); memcpy(target, src, src->fmt.headers.size);
return 0; return 0;
} }
void naive_free_simple(void* v) {
GQueue* g = v;
g_queue_free (g);
}

View file

@ -6,62 +6,31 @@
#include "evt_core.h" #include "evt_core.h"
#define PACKET_BUFFER_SIZE 1024 #define PACKET_BUFFER_SIZE 1024
struct algo_params { struct buffer_resources {
uint8_t is_waiting_bootstrap;
uint8_t is_healing;
char* algo_name;
int links, fresh_data, redundant_data;
};
struct algo_ctx;
typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
typedef void (*algo_ctx_free_misc)(void*);
struct algo_desc {
char* name;
algo_init init;
algo_ctx_on_buffer on_stream;
algo_ctx_on_buffer on_datagram;
algo_ctx_on_event on_err;
};
struct algo_ctx {
struct algo_desc* desc;
uint8_t link_count;
uint8_t is_rdy;
struct algo_params ap;
int ref_count;
struct buffer_packet bps[PACKET_BUFFER_SIZE]; struct buffer_packet bps[PACKET_BUFFER_SIZE];
GQueue* free_buffer; // Available buffers GQueue* free_buffer; // Available buffers
GHashTable* used_buffer; // Buffers used for reading or writing GHashTable* used_buffer; // Buffers used for reading or writing
GQueue* read_waiting; // Who wait to be notified for a read GQueue* read_waiting; // Who wait to be notified for a read
GHashTable* application_waiting; // Structure that can be used by the algo for its internal logic GHashTable* application_waiting; // Structure that can be used by the algo for its internal logic
GHashTable* write_waiting; // Structure to track packets waiting to be written GHashTable* write_waiting; // Structure to track packets waiting to be written
void* misc; // Additional structures
algo_ctx_free_misc free_misc; // Fx ptr to free misc
}; };
void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to); void init_buffer_management(struct buffer_resources* br);
void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void destroy_buffer_management(struct buffer_resources* br);
void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void mv_buffer_rtow(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to);
void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_rtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from);
void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from);
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to);
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to);
guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); void mv_buffer_atof(struct buffer_resources* app_ctx, void* from);
struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);
int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src);
struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);
struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);
struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx); struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx);
void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx); void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx);
void free_naive(void* app_ctx);
void free_nothing(void* app_ctx);
void naive_free_simple(void* v);

8
src/capture_traffic.c Normal file
View file

@ -0,0 +1,8 @@
#include "capture_traffic.h"
void init_traffic_capture(struct capture_ctx* ctx) {
}
void stop_traffic_capture(struct capture_ctx* ctx) {
}

14
src/capture_traffic.h Normal file
View file

@ -0,0 +1,14 @@
#pragma once
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <sys/signalfd.h>
#include <errno.h>
#include "evt_core.h"
struct capture_ctx {
};
void init_traffic_capture(struct capture_ctx* ctx);
void stop_traffic_capture(struct capture_ctx* ctx);

View file

@ -14,7 +14,7 @@ int main(int argc, char** argv) {
struct donar_params dp = {0}; struct donar_params dp = {0};
donar_init_params (&dp); donar_init_params (&dp);
while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:")) != -1) { while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:f:")) != -1) {
switch(dp.opt) { switch(dp.opt) {
case 'v': case 'v':
dp.verbose++; dp.verbose++;
@ -53,6 +53,9 @@ int main(int argc, char** argv) {
case 'd': case 'd':
sscanf(optarg, "%d,%d", &dp.fresh_data, &dp.redundant_data); sscanf(optarg, "%d,%d", &dp.fresh_data, &dp.redundant_data);
break; break;
case 'f':
dp.capture_file = strdup(optarg);
break;
default: default:
goto in_error; goto in_error;
} }
@ -74,14 +77,16 @@ int main(int argc, char** argv) {
in_error: in_error:
dp.errored = 1; dp.errored = 1;
fprintf(stderr, "Usage as client : %s -c -a <algo> -o <onion service file> [-h] [-b] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n", argv[0]); fprintf(stderr, "Usage as client : %s -c -a <algo> -o <onion service file> [-h] [-b] [-f <dump packets>] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n", argv[0]);
fprintf(stderr, "Usage as server : %s -s -a <algo> [-h] [-b] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n\n", argv[0]); fprintf(stderr, "Usage as server : %s -s -a <algo> [-h] [-b] [-l <links>] [-f <dump_packets>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n\n", argv[0]);
fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n", fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n",
dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data); dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data);
terminate: terminate:
if (dp.onion_file != NULL) free(dp.onion_file); if (dp.onion_file != NULL) free(dp.onion_file);
if (dp.algo != NULL) free(dp.algo); if (dp.algo != NULL) free(dp.algo);
if (dp.capture_file != NULL) free(dp.capture_file);
g_ptr_array_free(dp.exposed_ports, TRUE); g_ptr_array_free(dp.exposed_ports, TRUE);
g_ptr_array_free(dp.remote_ports, TRUE); g_ptr_array_free(dp.remote_ports, TRUE);

View file

@ -76,7 +76,8 @@ void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) {
.algo_name = dp->algo, .algo_name = dp->algo,
.links = dp->links, .links = dp->links,
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file
}; };
evt_core_init (&(ctx->evts), dp->verbose); evt_core_init (&(ctx->evts), dp->verbose);

View file

@ -146,6 +146,7 @@ void free_port (void* ptr) {
void donar_init_params(struct donar_params* dp) { void donar_init_params(struct donar_params* dp) {
dp->onion_file = NULL; dp->onion_file = NULL;
dp->algo = NULL; dp->algo = NULL;
dp->capture_file = NULL;
dp->is_server = 0; dp->is_server = 0;
dp->is_client = 0; dp->is_client = 0;
dp->is_healing = 0; dp->is_healing = 0;

View file

@ -10,7 +10,7 @@
struct donar_params { struct donar_params {
int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored, verbose, links, fresh_data, redundant_data; int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored, verbose, links, fresh_data, redundant_data;
char *port, *onion_file, *algo; char *port, *onion_file, *algo, *capture_file;
GPtrArray *remote_ports, *exposed_ports; GPtrArray *remote_ports, *exposed_ports;
}; };

View file

@ -58,7 +58,8 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
.algo_name = dp->algo, .algo_name = dp->algo,
.links = dp->links, .links = dp->links,
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file
}; };
evt_core_init (&(ctx->evts), dp->verbose); evt_core_init (&(ctx->evts), dp->verbose);

View file

@ -44,7 +44,7 @@ int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int read_res = FDS_READY; int read_res = FDS_READY;
if (ctx->verbose > 1) fprintf(stderr, " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later\n"); if (ctx->verbose > 1) fprintf(stderr, " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later\n");
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n"); if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n");
while (bp->mode == BP_READING) { while (bp->mode == BP_READING) {
@ -69,7 +69,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
char url[255]; char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
// 2. Read packet from socket // 2. Read packet from socket
bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url);
@ -98,7 +98,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
// 2. Write data from the buffer to the socket // 2. Write data from the buffer to the socket
while (bp->mode == BP_WRITING) { while (bp->mode == BP_WRITING) {
@ -109,8 +109,8 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
// 3. A whole packet has been written // 3. A whole packet has been written
// Release the buffer and notify // Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo); mv_buffer_wtof(&app_ctx->br, fdinfo);
notify_read(ctx, app_ctx); notify_read(ctx, &app_ctx->br);
return 0; return 0;
co_error: co_error:
@ -125,7 +125,7 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
int write_res = FDS_READY; int write_res = FDS_READY;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
// 2. Write buffer // 2. Write buffer
write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other);
@ -134,8 +134,8 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
// 3. A whole packet has been written // 3. A whole packet has been written
// Release the buffer and notify // Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo); mv_buffer_wtof(&app_ctx->br, fdinfo);
notify_read(ctx, app_ctx); notify_read(ctx, &app_ctx->br);
return 0; return 0;
co_error: co_error:
@ -148,41 +148,43 @@ int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. If has a "used" buffer, remove it // 1. If has a "used" buffer, remove it
mv_buffer_rtof (app_ctx, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
// 2. If appears in the write waiting queue, remove it // 2. If appears in the write waiting queue, remove it
while (get_write_buffer (app_ctx, fdinfo) != NULL) { while (get_write_buffer (&app_ctx->br, fdinfo) != NULL) {
mv_buffer_wtof(app_ctx, fdinfo); mv_buffer_wtof(&app_ctx->br, fdinfo);
} }
// 3. If appears in the read waiting queue, remove it // 3. If appears in the read waiting queue, remove it
g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); g_queue_remove_all (app_ctx->br.read_waiting, &(fdinfo->fd));
return app_ctx->desc->on_err(ctx, fdinfo); return app_ctx->desc->on_err(ctx, fdinfo);
} }
void algo_main_destroy(void* app_ctx) {
struct algo_ctx* ctx = (struct algo_ctx*) app_ctx;
ctx->ref_count--;
if (ctx->ref_count > 0) return;
stop_traffic_capture(&ctx->cap);
destroy_buffer_management(&ctx->br);
if (ctx->free_misc) ctx->free_misc(ctx->misc);
free(ctx);
}
void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx));
if (ctx == NULL) goto init_err; if (ctx == NULL) goto init_err;
memset(ctx, 0, sizeof(struct algo_ctx)); memset(ctx, 0, sizeof(struct algo_ctx));
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
ctx->link_count = 8; ctx->link_count = 8;
ctx->is_rdy = 0; ctx->is_rdy = 0;
ctx->ap = *ap; ctx->ap = *ap;
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet));
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
struct evt_core_cat tcp_listen = { struct evt_core_cat tcp_listen = {
.name = "tcp-listen", .name = "tcp-listen",
.flags = EPOLLIN, .flags = EPOLLIN,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_tcp_co, .cb = main_on_tcp_co,
.err_cb = NULL .err_cb = NULL
}; };
@ -193,7 +195,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "tcp-read", .name = "tcp-read",
.flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .flags = EPOLLIN | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_tcp_read, .cb = main_on_tcp_read,
.err_cb = main_on_err .err_cb = main_on_err
}; };
@ -204,7 +206,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "udp-read", .name = "udp-read",
.flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .flags = EPOLLIN | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_udp_read, .cb = main_on_udp_read,
.err_cb = main_on_err .err_cb = main_on_err
}; };
@ -215,7 +217,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "tcp-write", .name = "tcp-write",
.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP, .flags = EPOLLOUT | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_tcp_write, .cb = main_on_tcp_write,
.err_cb = main_on_err .err_cb = main_on_err
}; };
@ -226,13 +228,16 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "udp-write", .name = "udp-write",
.flags = EPOLLOUT | EPOLLET, .flags = EPOLLOUT | EPOLLET,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_udp_write, .cb = main_on_udp_write,
.err_cb = main_on_err .err_cb = main_on_err
}; };
ctx->ref_count++; ctx->ref_count++;
evt_core_add_cat(evt, &udp_write); evt_core_add_cat(evt, &udp_write);
init_buffer_management(&ctx->br);
init_traffic_capture(&ctx->cap);
for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) {
if (strcmp(available_algo[i].name, ap->algo_name) == 0) { if (strcmp(available_algo[i].name, ap->algo_name) == 0) {
ctx->desc = &(available_algo[i]); ctx->desc = &(available_algo[i]);

View file

@ -6,10 +6,45 @@
#include <string.h> #include <string.h>
#include "evt_core.h" #include "evt_core.h"
#include "algo_utils.h" #include "algo_utils.h"
#include "capture_traffic.h"
#include "url.h" #include "url.h"
#include "utils.h" #include "utils.h"
#include "packet.h" #include "packet.h"
struct algo_params {
uint8_t is_waiting_bootstrap;
uint8_t is_healing;
char *algo_name, *capture_file;
int links, fresh_data, redundant_data;
};
struct algo_ctx;
typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
typedef void (*algo_ctx_free_misc)(void*);
struct algo_desc {
char* name;
algo_init init;
algo_ctx_on_buffer on_stream;
algo_ctx_on_buffer on_datagram;
algo_ctx_on_event on_err;
};
struct algo_ctx {
struct algo_desc* desc;
uint8_t link_count;
uint8_t is_rdy;
struct algo_params ap;
int ref_count;
struct capture_ctx cap;
struct buffer_resources br;
void* misc; // Additional structures
algo_ctx_free_misc free_misc; // Fx ptr to free misc
};
void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
@ -49,7 +84,6 @@ static struct algo_desc available_algo[] = {
} }
}; };
void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap); void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap);
int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);