Merge branch 'feature/capture-traffic'

This commit is contained in:
Quentin 2019-05-28 17:18:33 +02:00
commit ea94b7d7f8
18 changed files with 398 additions and 136 deletions

View file

@ -33,12 +33,15 @@ list(APPEND CSOURCES
src/proxy.c src/proxy.c
src/timer.h src/timer.h
src/timer.c src/timer.c
src/capture_traffic.h
src/capture_traffic.c
) )
add_executable(donar ${CSOURCES} src/donar.c) add_executable(donar ${CSOURCES} src/donar.c)
add_executable(measlat ${CSOURCES} src/meas_lat.c) add_executable(measlat ${CSOURCES} src/meas_lat.c)
add_executable(udpecho ${CSOURCES} src/udp_echo.c) add_executable(udpecho ${CSOURCES} src/udp_echo.c)
add_executable(torecho ${CSOURCES} src/tor_echo.c) add_executable(torecho ${CSOURCES} src/tor_echo.c)
add_executable(capdiff ${CSOURCES} src/capdiff.c)
find_package(PkgConfig REQUIRED) find_package(PkgConfig REQUIRED)
pkg_search_module(GLIB REQUIRED glib-2.0) pkg_search_module(GLIB REQUIRED glib-2.0)
@ -55,6 +58,9 @@ target_link_libraries(udpecho ${GLIB_LDFLAGS})
target_include_directories(torecho PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(torecho PRIVATE ${GLIB_INCLUDE_DIRS})
target_link_libraries(torecho ${GLIB_LDFLAGS}) target_link_libraries(torecho ${GLIB_LDFLAGS})
install(TARGETS donar measlat udpecho torecho target_include_directories(capdiff PRIVATE ${GLIB_INCLUDE_DIRS})
target_link_libraries(capdiff ${GLIB_LDFLAGS})
install(TARGETS donar measlat udpecho torecho capdiff
RUNTIME DESTINATION bin RUNTIME DESTINATION bin
LIBRARY DESTINATION lib) LIBRARY DESTINATION lib)

View file

@ -23,7 +23,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
// Check that we didn't already received the packet // Check that we didn't already received the packet
struct dup2_ctx* dup2c = app_ctx->misc; struct dup2_ctx* dup2c = app_ctx->misc;
if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) { if (ring_ge(dup2c->recv_id, bp->ip.ap.fmt.content.clear.id)) {
mv_buffer_rtof(app_ctx, fdinfo); mv_buffer_rtof(&app_ctx->br, fdinfo);
return 0; return 0;
} }
@ -34,12 +34,12 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
to_fdinfo = evt_core_get_from_url (ctx, url); to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 1; return 1;
} }
// 2. Move buffer // 2. Move buffer
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
return 0; return 0;
} }
@ -63,12 +63,12 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
} }
// 2. We move the buffer and notify the target // 2. We move the buffer and notify the target
dup_buffer_tow (app_ctx, bp, to_fdinfo); dup_buffer_tow (&app_ctx->br, bp, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
} }
// 3. Release the buffer // 3. Release the buffer
mv_buffer_rtof (app_ctx, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
return 0; return 0;
} }

View file

@ -15,12 +15,12 @@ int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf
to_fdinfo = evt_core_get_from_url (ctx, url); to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 1; return 1;
} }
// 2. Move buffer // 2. Move buffer
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
return 0; return 0;
@ -35,13 +35,13 @@ int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi
to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL; to_fdinfo = cat->socklist->len > 0 ? g_array_index(cat->socklist, struct evt_core_fdinfo*, 0) : NULL;
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name); fprintf(stderr, "No fd for cat %s in udp-read. Dropping packet :( \n", cat->name);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 1; return 1;
} }
//printf("Pass packet from %s to %s\n", fdinfo->url, url); //printf("Pass packet from %s to %s\n", fdinfo->url, url);
// 2. We move the buffer and notify the target // 2. We move the buffer and notify the target
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
return 0; return 0;

View file

@ -82,10 +82,10 @@ void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
rr->real[real_idx].idx = real_idx; rr->real[real_idx].idx = real_idx;
rr->real[real_idx].link_fd = fdinfo->fd; rr->real[real_idx].link_fd = fdinfo->fd;
rr->real[real_idx].algo = app_ctx; rr->real[real_idx].algo = app_ctx;
mv_buffer_rtoa(app_ctx, fdinfo, &rr->real[real_idx].idx); mv_buffer_rtoa(&app_ctx->br, fdinfo, &rr->real[real_idx].idx);
} else { } else {
if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id); if (ctx->verbose) fprintf(stdout, "Packet %d already received (current: %d)\n", bp->ip.ap.fmt.content.clear.id, rr->content_id);
mv_buffer_rtof (app_ctx, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
} }
} }
@ -98,7 +98,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue
dp->on = 0; dp->on = 0;
// 2. Get the buffer // 2. Get the buffer
struct buffer_packet* bp = get_app_buffer (app_ctx, &dp->idx); struct buffer_packet* bp = get_app_buffer (&app_ctx->br, &dp->idx);
assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR); assert(bp->ip.ap.fmt.headers.cmd == CMD_CLEAR);
// 3. We update our cursor // 3. We update our cursor
@ -110,12 +110,12 @@ void rr_deliver(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct queue
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx); fprintf(stderr, "No fd for URL %s in udp:write for tcp-read. Dropping packet %d :( \n", url, dp->idx);
//mv_buffer_wtor (app_ctx, fdinfo, bp); //mv_buffer_wtor (app_ctx, fdinfo, bp);
mv_buffer_atof (app_ctx, &dp->idx); mv_buffer_atof (&app_ctx->br, &dp->idx);
} }
// 5. We move the buffer and notify the target // 5. We move the buffer and notify the target
//mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); //mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp);
mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo); mv_buffer_atow (&app_ctx->br, &dp->idx, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
} }
@ -164,7 +164,7 @@ void rr_pkt_manage_links(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinf
set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health); set_timeout (ctx, timeout, &rr->wait[idx], on_timeout_health);
release: release:
mv_buffer_rtof(app_ctx, fdinfo); mv_buffer_rtof(&app_ctx->br, fdinfo);
} }
uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { uint64_t rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) {
@ -202,7 +202,7 @@ int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo,
rr_pkt_manage_links(ctx, fdinfo, bp); rr_pkt_manage_links(ctx, fdinfo, bp);
} else { } else {
fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd); fprintf(stderr, " [algo/rr] Packet CMD unrecognized (%d)\n", bp->ip.ap.fmt.headers.cmd);
mv_buffer_rtof(app_ctx, fdinfo); mv_buffer_rtof(&app_ctx->br, fdinfo);
} }
return 0; return 0;
@ -288,7 +288,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
continue; continue;
} }
if ((len = write_queue_len (app_ctx, to_fdinfo)) > 0) { if ((len = write_queue_len (&app_ctx->br, to_fdinfo)) > 0) {
if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len);
blacklist_link (rr, sel_link); blacklist_link (rr, sel_link);
continue; continue;
@ -296,11 +296,11 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) {
rr->current_link = sel_link; rr->current_link = sel_link;
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
return 0; return 0;
} else { } else {
struct buffer_packet* dup_bp = dup_buffer_tow(app_ctx, bp, to_fdinfo); struct buffer_packet* dup_bp = dup_buffer_tow(&app_ctx->br, bp, to_fdinfo);
/* /*
* for later * for later
dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0; dup_bp->ip.ap.fmt.content.health.min_blocked_pkt = 0;
@ -313,7 +313,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
not_ready: not_ready:
// 3. We find no up target // 3. We find no up target
fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url); fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url);
mv_buffer_wtof (app_ctx, fdinfo); mv_buffer_wtof (&app_ctx->br, fdinfo);
return 0; return 0;
} }

View file

@ -1,19 +1,5 @@
#include "algo_utils.h" #include "algo_utils.h"
void free_nothing(void* app_ctx) {}
void free_naive(void* app_ctx) {
struct algo_ctx* ctx = (struct algo_ctx*) app_ctx;
ctx->ref_count--;
if (ctx->ref_count > 0) return;
if (ctx->free_misc) ctx->free_misc(ctx->misc);
g_queue_free(ctx->free_buffer);
g_queue_free(ctx->read_waiting);
g_hash_table_destroy (ctx->application_waiting);
g_hash_table_destroy (ctx->used_buffer);
g_hash_table_destroy (ctx->write_waiting);
free(ctx);
}
void iterate(int* fd, GQueue* q, int* waiting_count) { void iterate(int* fd, GQueue* q, int* waiting_count) {
fprintf(stderr, "Queue for fd=%d has length=%d\n", *fd, q->length); fprintf(stderr, "Queue for fd=%d has length=%d\n", *fd, q->length);
*waiting_count += q->length; *waiting_count += q->length;
@ -23,7 +9,12 @@ void iterate2(int* fd, struct buffer_packet *bp, gpointer user_data) {
fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd); fprintf(stderr, "fd=%d has a used_buffer entry\n", *fd);
} }
void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { void naive_free_simple(void* v) {
GQueue* g = v;
g_queue_free (g);
}
void debug_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd); fprintf(stderr, "No more free buffer for fd=%d.\n", fdinfo->fd);
int waiting_count = 0; int waiting_count = 0;
g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count); g_hash_table_foreach(app_ctx->write_waiting, (GHFunc)iterate, &waiting_count);
@ -36,10 +27,30 @@ void debug_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) {
waiting_count); waiting_count);
} }
void init_buffer_management(struct buffer_resources* ctx) {
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet));
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
}
void destroy_buffer_management(struct buffer_resources* ctx) {
g_queue_free(ctx->free_buffer);
g_queue_free(ctx->read_waiting);
g_hash_table_destroy (ctx->application_waiting);
g_hash_table_destroy (ctx->used_buffer);
g_hash_table_destroy (ctx->write_waiting);
}
/** /**
* Returns a buffer if available, NULL otherwise * Returns a buffer if available, NULL otherwise
*/ */
struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. Check if we don't have a buffer // 1. Check if we don't have a buffer
@ -62,12 +73,12 @@ struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_
return bp; return bp;
} }
void __push_to_free(struct algo_ctx *app_ctx, struct buffer_packet* bp) { void __push_to_free(struct buffer_resources *app_ctx, struct buffer_packet* bp) {
memset(bp, 0, sizeof(struct buffer_packet)); memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail (app_ctx->free_buffer, bp); g_queue_push_tail (app_ctx->free_buffer, bp);
} }
guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
GQueue* q; GQueue* q;
if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue
@ -77,7 +88,7 @@ guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo)
/** /**
* Returns a buffer if available, NULL otherwise * Returns a buffer if available, NULL otherwise
*/ */
struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
GQueue* q; GQueue* q;
@ -98,7 +109,7 @@ struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core
return bp; return bp;
} }
void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) { void mv_buffer_rtow(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
struct buffer_packet* bp; struct buffer_packet* bp;
@ -121,30 +132,32 @@ void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, stru
g_queue_push_tail(q, bp); g_queue_push_tail(q, bp);
} }
void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from) { void mv_buffer_rtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. We get the packet buffer // 1. We get the packet buffer
bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd); bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd);
if (bp == NULL) { if (bp == NULL) {
fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in rtof\n", from->fd, from->url); fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in rtof\n", from->fd, from->url);
return;
} }
g_hash_table_remove(app_ctx->used_buffer, &(from->fd)); g_hash_table_remove(app_ctx->used_buffer, &(from->fd));
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* fdinfo) { void mv_buffer_wtof(struct buffer_resources *app_ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd)); struct buffer_packet* bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd));
if (bp == NULL) { if (bp == NULL) {
fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in wtof\n", fdinfo->fd, fdinfo->url); fprintf(stderr, "Unable to find a buffer for fd=%d url=%s in wtof\n", fdinfo->fd, fdinfo->url);
return;
} }
g_hash_table_remove(app_ctx->used_buffer, &(fdinfo->fd)); g_hash_table_remove(app_ctx->used_buffer, &(fdinfo->fd));
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to) { void mv_buffer_rtoa(struct buffer_resources *app_ctx, struct evt_core_fdinfo* from, void* to) {
struct buffer_packet* bp; struct buffer_packet* bp;
bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd); bp = g_hash_table_lookup (app_ctx->used_buffer, &from->fd);
if (bp == NULL) { if (bp == NULL) {
@ -159,7 +172,7 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void
g_hash_table_insert(app_ctx->application_waiting, to, bp); g_hash_table_insert(app_ctx->application_waiting, to, bp);
} }
void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to) { void mv_buffer_atow(struct buffer_resources *app_ctx, void* from, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
struct buffer_packet* bp; struct buffer_packet* bp;
@ -182,7 +195,7 @@ void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo
g_queue_push_tail(q, bp); g_queue_push_tail(q, bp);
} }
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) { void mv_buffer_atof(struct buffer_resources *app_ctx, void* from) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. Remove the buffer // 1. Remove the buffer
@ -197,7 +210,7 @@ void mv_buffer_atof(struct algo_ctx* app_ctx, void* from) {
__push_to_free (app_ctx, bp); __push_to_free (app_ctx, bp);
} }
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) { struct buffer_packet* dup_buffer_tow(struct buffer_resources *app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to) {
GQueue* q; GQueue* q;
// 1. We get a free buffer // 1. We get a free buffer
@ -222,11 +235,11 @@ struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_pac
return bp_dest; return bp_dest;
} }
struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx) { struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx) {
return g_hash_table_lookup (app_ctx->application_waiting, idx); return g_hash_table_lookup (app_ctx->application_waiting, idx);
} }
void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) { void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx) {
struct evt_core_fdinfo* next_fdinfo = NULL; struct evt_core_fdinfo* next_fdinfo = NULL;
while (next_fdinfo == NULL) { while (next_fdinfo == NULL) {
int* fd = g_queue_pop_head(app_ctx->read_waiting); int* fd = g_queue_pop_head(app_ctx->read_waiting);
@ -252,9 +265,3 @@ int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* s
memcpy(target, src, src->fmt.headers.size); memcpy(target, src, src->fmt.headers.size);
return 0; return 0;
} }
void naive_free_simple(void* v) {
GQueue* g = v;
g_queue_free (g);
}

View file

@ -6,62 +6,31 @@
#include "evt_core.h" #include "evt_core.h"
#define PACKET_BUFFER_SIZE 1024 #define PACKET_BUFFER_SIZE 1024
struct algo_params { struct buffer_resources {
uint8_t is_waiting_bootstrap;
uint8_t is_healing;
char* algo_name;
int links, fresh_data, redundant_data;
};
struct algo_ctx;
typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
typedef void (*algo_ctx_free_misc)(void*);
struct algo_desc {
char* name;
algo_init init;
algo_ctx_on_buffer on_stream;
algo_ctx_on_buffer on_datagram;
algo_ctx_on_event on_err;
};
struct algo_ctx {
struct algo_desc* desc;
uint8_t link_count;
uint8_t is_rdy;
struct algo_params ap;
int ref_count;
struct buffer_packet bps[PACKET_BUFFER_SIZE]; struct buffer_packet bps[PACKET_BUFFER_SIZE];
GQueue* free_buffer; // Available buffers GQueue* free_buffer; // Available buffers
GHashTable* used_buffer; // Buffers used for reading or writing GHashTable* used_buffer; // Buffers used for reading or writing
GQueue* read_waiting; // Who wait to be notified for a read GQueue* read_waiting; // Who wait to be notified for a read
GHashTable* application_waiting; // Structure that can be used by the algo for its internal logic GHashTable* application_waiting; // Structure that can be used by the algo for its internal logic
GHashTable* write_waiting; // Structure to track packets waiting to be written GHashTable* write_waiting; // Structure to track packets waiting to be written
void* misc; // Additional structures
algo_ctx_free_misc free_misc; // Fx ptr to free misc
}; };
void mv_buffer_rtow(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to); void init_buffer_management(struct buffer_resources* br);
void mv_buffer_rtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void destroy_buffer_management(struct buffer_resources* br);
void mv_buffer_wtof(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from); void mv_buffer_rtow(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, struct evt_core_fdinfo* to);
void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void* to); void mv_buffer_rtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from);
void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_wtof(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from);
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); void mv_buffer_rtoa(struct buffer_resources* app_ctx, struct evt_core_fdinfo* from, void* to);
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); void mv_buffer_atow(struct buffer_resources* app_ctx, void* from, struct evt_core_fdinfo* to);
guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); void mv_buffer_atof(struct buffer_resources* app_ctx, void* from);
struct buffer_packet* dup_buffer_tow(struct buffer_resources* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
guint write_queue_len(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);
int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src);
struct buffer_packet* get_write_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_write_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);
struct buffer_packet* get_read_buffer(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); struct buffer_packet* get_read_buffer(struct buffer_resources *app_ctx, struct evt_core_fdinfo *fdinfo);
struct buffer_packet* get_app_buffer(struct algo_ctx *app_ctx, void* idx); struct buffer_packet* get_app_buffer(struct buffer_resources *app_ctx, void* idx);
void notify_read(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx); void notify_read(struct evt_core_ctx* ctx, struct buffer_resources* app_ctx);
void free_naive(void* app_ctx);
void free_nothing(void* app_ctx);
void naive_free_simple(void* v);

87
src/capdiff.c Normal file
View file

@ -0,0 +1,87 @@
#include <stdlib.h>
#include <stdio.h>
#include "packet.h"
int main(int argc, char** argv) {
if (argc != 3) {
fprintf(stderr, "Usage %s ./cap.0 ./cap.1\n", argv[0]);
exit(EXIT_FAILURE);
}
uint8_t verbose = 0;
FILE *fd[2]; size_t sz[2];
for (int i = 0; i < 2; i++) {
if ((fd[i] = fopen(argv[i+1], "r")) == NULL) {
perror("Unable to open file");
exit(EXIT_FAILURE);
}
fseek(fd[i], 0, SEEK_END);
sz[i] = ftell(fd[i]);
fseek(fd[i], 0, SEEK_SET);
}
if (sz[0] != sz[1]) {
printf("[!!] %s has %ld entries, %s has %ld entries\n",
argv[1],
sz[0]/sizeof(struct buffer_packet),
argv[2],
sz[1]/sizeof(struct buffer_packet));
} else if (verbose) {
printf("[OK] %s and %s have %ld entries\n",
argv[1],
argv[2],
sz[0]/sizeof(struct buffer_packet));
}
size_t to_read = sz[0] < sz[1] ? sz[0] : sz[1];
size_t to_read_obj = to_read / sizeof(struct buffer_packet);
uint16_t expected_id = 0;
struct buffer_packet bpread[2];
for (size_t c = 0; c < to_read_obj; c++) {
for (int i = 0; i < 2; i++) fread(&bpread[i], sizeof(struct buffer_packet), 1, fd[i]);
/*if (expected_id != bpread[0].ip.ap.fmt.content.clear.id || expected_id != bpread[1].ip.ap.fmt.content.clear.id) {
printf("[!!] expected id is %d, %s has id %d, %s has id %d\n",
expected_id,
argv[1],
bpread[0].ip.ap.fmt.content.clear.id,
argv[2],
bpread[1].ip.ap.fmt.content.clear.id);
}*/
uint8_t is_same_size = bpread[0].ip.ap.fmt.headers.size == bpread[1].ip.ap.fmt.headers.size;
if (!is_same_size) {
printf("[!!] %s packet has size %d, %s packet has size %d for expected id %d\n",
argv[1],
bpread[0].ip.ap.fmt.headers.size,
argv[2],
bpread[1].ip.ap.fmt.headers.size,
expected_id);
} else if (verbose) {
printf("[OK] %s and %s packets for expected id %d have size %d\n",
argv[1],
argv[2],
expected_id,
bpread[0].ip.ap.fmt.headers.size);
}
size_t s1 = bpread[0].ip.ap.fmt.headers.size, s2 = bpread[1].ip.ap.fmt.headers.size;
size_t max_size = s1 > s2 ? s1 : s2;
for (size_t idx = 0; idx < max_size; idx++) {
char e1 = (&bpread[0].ip.ap.raw)[idx], e2 = (&bpread[0].ip.ap.raw)[idx];
if (e1 != e2) {
printf("[!!] for expected id %d, byte 0x%04x is different: 0x%02x vs 0x%02x\n", expected_id, (uint16_t)idx, (uint8_t)e1, (uint8_t)e2);
}
}
expected_id++;
}
for (int i = 0; i < 2; i++) {
fclose(fd[i]);
}
printf("done\n");
return 0;
}

90
src/capture_traffic.c Normal file
View file

@ -0,0 +1,90 @@
#include "capture_traffic.h"
void dynbuf_init(struct dynbuf* db) {
db->content = malloc(MEGABYTE);
if (db->content == NULL) {
perror("malloc dynbuf failed");
exit(EXIT_FAILURE);
}
db->written = 0;
db->alloced = MEGABYTE;
}
void dynbuf_check_alloc(struct dynbuf* db, size_t len) {
if (db->written + len > db->alloced) {
size_t new_alloced = db->written + len > 2 * db->alloced ? db->written + len : 2 * db->alloced;
db->content = realloc(db->content, new_alloced);
if (db->content == NULL) {
perror("realloc dynbuf failed");
exit(EXIT_FAILURE);
}
db->alloced = new_alloced;
}
}
void dynbuf_append(struct dynbuf* db, char* ptr, size_t len) {
dynbuf_check_alloc(db, len);
memcpy(db->content + db->written, ptr, len);
db->written += len;
}
void dynbuf_destroy(struct dynbuf* db) {
free(db->content);
}
void traffic_capture_init(struct capture_ctx* ctx, char* filename) {
ctx->activated = filename == NULL ? 0 : 1;
if (!ctx->activated) return;
ctx->filename = strdup(filename);
dynbuf_init (&ctx->in);
dynbuf_init (&ctx->out);
}
void traffic_capture_notify(struct capture_ctx* ctx, struct buffer_packet *bp, char* dest) {
if (!ctx->activated) return;
if (clock_gettime(CLOCK_MONOTONIC, &bp->seen) == -1){
perror("clock_gettime error");
exit(EXIT_FAILURE);
}
dynbuf_append (
strcmp(dest, "in") == 0 ? &ctx->in : &ctx->out,
(char*)bp,
sizeof(struct buffer_packet));
}
void traffic_capture_stop(struct capture_ctx* ctx) {
if (!ctx->activated) return;
FILE* fd = NULL;
struct dynbuf *dbs[] = {&ctx->in, &ctx->out};
for (int i = 0; i < 2; i++) {
size_t written = 0, ack = 0;
char *out_file = NULL;
asprintf(&out_file, "%s.%d", ctx->filename, i);
if (out_file == NULL || (fd = fopen(out_file, "w")) == NULL) {
perror("failed to open file");
exit(EXIT_FAILURE);
}
free(out_file);
while (written < dbs[i]->written) {
ack = fwrite(dbs[i]->content+written, sizeof(char), dbs[i]->written, fd);
if (ack <= 0) {
perror("unable to write capture file");
exit(EXIT_FAILURE);
}
written += ack;
}
fclose(fd);
dynbuf_destroy (dbs[i]);
}
free(ctx->filename);
}

39
src/capture_traffic.h Normal file
View file

@ -0,0 +1,39 @@
#pragma once
#define _GNU_SOURCE
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <glib-2.0/glib.h>
#include <glib-2.0/gmodule.h>
#include <glib-2.0/glib-object.h>
#include <errno.h>
#include "packet.h"
#include "evt_core.h"
#define KILOBYTE 1024l
#define MEGABYTE 1024l * KILOBYTE
#define GIGABYTE 1024l * MEGABYTE
struct captured_packet {
struct timeval* captured_time;
char* pkt;
};
struct dynbuf {
char* content;
size_t written;
size_t alloced;
};
struct capture_ctx {
uint8_t activated;
char* filename;
struct timeval* start_time;
struct dynbuf in;
struct dynbuf out;
};
void traffic_capture_init(struct capture_ctx* ctx, char* filename);
void traffic_capture_stop(struct capture_ctx* ctx);
void traffic_capture_notify(struct capture_ctx* ctx, struct buffer_packet *bp, char* dest);

View file

@ -14,7 +14,7 @@ int main(int argc, char** argv) {
struct donar_params dp = {0}; struct donar_params dp = {0};
donar_init_params (&dp); donar_init_params (&dp);
while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:")) != -1) { while ((dp.opt = getopt(argc, argv, "vcse:r:o:a:bhl:d:f:")) != -1) {
switch(dp.opt) { switch(dp.opt) {
case 'v': case 'v':
dp.verbose++; dp.verbose++;
@ -53,6 +53,9 @@ int main(int argc, char** argv) {
case 'd': case 'd':
sscanf(optarg, "%d,%d", &dp.fresh_data, &dp.redundant_data); sscanf(optarg, "%d,%d", &dp.fresh_data, &dp.redundant_data);
break; break;
case 'f':
dp.capture_file = strdup(optarg);
break;
default: default:
goto in_error; goto in_error;
} }
@ -74,14 +77,16 @@ int main(int argc, char** argv) {
in_error: in_error:
dp.errored = 1; dp.errored = 1;
fprintf(stderr, "Usage as client : %s -c -a <algo> -o <onion service file> [-h] [-b] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n", argv[0]); fprintf(stderr, "Usage as client : %s -c -a <algo> -o <onion service file> [-h] [-b] [-f <dump packets>] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n", argv[0]);
fprintf(stderr, "Usage as server : %s -s -a <algo> [-h] [-b] [-l <links>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n\n", argv[0]); fprintf(stderr, "Usage as server : %s -s -a <algo> [-h] [-b] [-l <links>] [-f <dump_packets>] [-d <fresh>,<red>] [-e <exposed udp port>]* [-r <remote udp port>]*\n\n", argv[0]);
fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n", fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, onion_file=%s, links=%d, duplication=%d,%d\n",
dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data); dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data);
terminate: terminate:
if (dp.onion_file != NULL) free(dp.onion_file); if (dp.onion_file != NULL) free(dp.onion_file);
if (dp.algo != NULL) free(dp.algo); if (dp.algo != NULL) free(dp.algo);
if (dp.capture_file != NULL) free(dp.capture_file);
g_ptr_array_free(dp.exposed_ports, TRUE); g_ptr_array_free(dp.exposed_ports, TRUE);
g_ptr_array_free(dp.remote_ports, TRUE); g_ptr_array_free(dp.remote_ports, TRUE);

View file

@ -76,7 +76,8 @@ void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) {
.algo_name = dp->algo, .algo_name = dp->algo,
.links = dp->links, .links = dp->links,
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file
}; };
evt_core_init (&(ctx->evts), dp->verbose); evt_core_init (&(ctx->evts), dp->verbose);

View file

@ -146,6 +146,7 @@ void free_port (void* ptr) {
void donar_init_params(struct donar_params* dp) { void donar_init_params(struct donar_params* dp) {
dp->onion_file = NULL; dp->onion_file = NULL;
dp->algo = NULL; dp->algo = NULL;
dp->capture_file = NULL;
dp->is_server = 0; dp->is_server = 0;
dp->is_client = 0; dp->is_client = 0;
dp->is_healing = 0; dp->is_healing = 0;

View file

@ -10,7 +10,7 @@
struct donar_params { struct donar_params {
int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored, verbose, links, fresh_data, redundant_data; int opt, is_server, is_client, is_waiting_bootstrap, is_healing, errored, verbose, links, fresh_data, redundant_data;
char *port, *onion_file, *algo; char *port, *onion_file, *algo, *capture_file;
GPtrArray *remote_ports, *exposed_ports; GPtrArray *remote_ports, *exposed_ports;
}; };

View file

@ -58,7 +58,8 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
.algo_name = dp->algo, .algo_name = dp->algo,
.links = dp->links, .links = dp->links,
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file
}; };
evt_core_init (&(ctx->evts), dp->verbose); evt_core_init (&(ctx->evts), dp->verbose);

View file

@ -41,6 +41,7 @@ struct measure_conf* create_measure_conf(int max_mes, int plsize) {
perror("malloc failed"); perror("malloc failed");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
return mc; return mc;
} }
@ -114,6 +115,15 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
perror("clock_gettime error"); perror("clock_gettime error");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire.";
size_t msg_len = strlen(my_msg);
size_t cursor_msg = 0;
for (size_t i = sizeof(struct packet_header); i < mc->payload_size; i++) {
mc->payload[i] = my_msg[cursor_msg];
cursor_msg = (cursor_msg + 1) % msg_len;
}
struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read"); struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read");
if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read"); if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read");
if (tgtinfo == NULL) { if (tgtinfo == NULL) {

View file

@ -70,6 +70,7 @@ struct buffer_packet {
uint8_t ap_count; uint8_t ap_count;
uint16_t aread; uint16_t aread;
uint16_t awrite; uint16_t awrite;
struct timespec seen;
struct internet_packet ip; struct internet_packet ip;
}; };

View file

@ -44,7 +44,7 @@ int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int read_res = FDS_READY; int read_res = FDS_READY;
if (ctx->verbose > 1) fprintf(stderr, " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later\n"); if (ctx->verbose > 1) fprintf(stderr, " [proxy] Get current read buffer OR a new read buffer OR subscribe to be notified later\n");
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n"); if (ctx->verbose > 1) fprintf(stderr, " [proxy] Try to read a whole packet in the buffer\n");
while (bp->mode == BP_READING) { while (bp->mode == BP_READING) {
@ -69,7 +69,7 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
char url[255]; char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_read_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
// 2. Read packet from socket // 2. Read packet from socket
bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url); bp->ip.ap.fmt.content.clear.port = url_get_port_int (fdinfo->url);
@ -77,7 +77,10 @@ int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1; if (read_res == FDS_AGAIN) return 1;
// 3. Apply logic // 3. Notify helpers
traffic_capture_notify (&app_ctx->cap, bp, "in");
// 4. Apply logic
return app_ctx->desc->on_datagram(ctx, fdinfo, bp); return app_ctx->desc->on_datagram(ctx, fdinfo, bp);
co_error: co_error:
@ -98,7 +101,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
// 2. Write data from the buffer to the socket // 2. Write data from the buffer to the socket
while (bp->mode == BP_WRITING) { while (bp->mode == BP_WRITING) {
@ -109,8 +112,8 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
// 3. A whole packet has been written // 3. A whole packet has been written
// Release the buffer and notify // Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo); mv_buffer_wtof(&app_ctx->br, fdinfo);
notify_read(ctx, app_ctx); notify_read(ctx, &app_ctx->br);
return 0; return 0;
co_error: co_error:
@ -125,17 +128,20 @@ int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
int write_res = FDS_READY; int write_res = FDS_READY;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1; if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;
// 2. Write buffer // 2. Write buffer
write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other);
if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1; if (write_res == FDS_AGAIN) return 1;
// 3. A whole packet has been written // 3. Notify helpers
traffic_capture_notify (&app_ctx->cap, bp, "out");
// 4. A whole packet has been written
// Release the buffer and notify // Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo); mv_buffer_wtof(&app_ctx->br, fdinfo);
notify_read(ctx, app_ctx); notify_read(ctx, &app_ctx->br);
return 0; return 0;
co_error: co_error:
@ -148,41 +154,43 @@ int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
// 1. If has a "used" buffer, remove it // 1. If has a "used" buffer, remove it
mv_buffer_rtof (app_ctx, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);
// 2. If appears in the write waiting queue, remove it // 2. If appears in the write waiting queue, remove it
while (get_write_buffer (app_ctx, fdinfo) != NULL) { while (get_write_buffer (&app_ctx->br, fdinfo) != NULL) {
mv_buffer_wtof(app_ctx, fdinfo); mv_buffer_wtof(&app_ctx->br, fdinfo);
} }
// 3. If appears in the read waiting queue, remove it // 3. If appears in the read waiting queue, remove it
g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); g_queue_remove_all (app_ctx->br.read_waiting, &(fdinfo->fd));
return app_ctx->desc->on_err(ctx, fdinfo); return app_ctx->desc->on_err(ctx, fdinfo);
} }
void algo_main_destroy(void* app_ctx) {
struct algo_ctx* ctx = (struct algo_ctx*) app_ctx;
ctx->ref_count--;
if (ctx->ref_count > 0) return;
traffic_capture_stop(&ctx->cap);
destroy_buffer_management(&ctx->br);
if (ctx->free_misc) ctx->free_misc(ctx->misc);
free(ctx);
}
void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) { void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx)); struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx));
if (ctx == NULL) goto init_err; if (ctx == NULL) goto init_err;
memset(ctx, 0, sizeof(struct algo_ctx)); memset(ctx, 0, sizeof(struct algo_ctx));
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
ctx->link_count = 8; ctx->link_count = 8;
ctx->is_rdy = 0; ctx->is_rdy = 0;
ctx->ap = *ap; ctx->ap = *ap;
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
memset(&(ctx->bps[i]), 0, sizeof(struct buffer_packet));
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
struct evt_core_cat tcp_listen = { struct evt_core_cat tcp_listen = {
.name = "tcp-listen", .name = "tcp-listen",
.flags = EPOLLIN, .flags = EPOLLIN,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_tcp_co, .cb = main_on_tcp_co,
.err_cb = NULL .err_cb = NULL
}; };
@ -193,7 +201,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "tcp-read", .name = "tcp-read",
.flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .flags = EPOLLIN | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_tcp_read, .cb = main_on_tcp_read,
.err_cb = main_on_err .err_cb = main_on_err
}; };
@ -204,7 +212,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "udp-read", .name = "udp-read",
.flags = EPOLLIN | EPOLLET | EPOLLRDHUP, .flags = EPOLLIN | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_udp_read, .cb = main_on_udp_read,
.err_cb = main_on_err .err_cb = main_on_err
}; };
@ -215,7 +223,7 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "tcp-write", .name = "tcp-write",
.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP, .flags = EPOLLOUT | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_tcp_write, .cb = main_on_tcp_write,
.err_cb = main_on_err .err_cb = main_on_err
}; };
@ -226,13 +234,16 @@ void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
.name = "udp-write", .name = "udp-write",
.flags = EPOLLOUT | EPOLLET, .flags = EPOLLOUT | EPOLLET,
.app_ctx = ctx, .app_ctx = ctx,
.free_app_ctx = free_naive, .free_app_ctx = algo_main_destroy,
.cb = main_on_udp_write, .cb = main_on_udp_write,
.err_cb = main_on_err .err_cb = main_on_err
}; };
ctx->ref_count++; ctx->ref_count++;
evt_core_add_cat(evt, &udp_write); evt_core_add_cat(evt, &udp_write);
init_buffer_management(&ctx->br);
traffic_capture_init(&ctx->cap, ap->capture_file);
for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) { for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) {
if (strcmp(available_algo[i].name, ap->algo_name) == 0) { if (strcmp(available_algo[i].name, ap->algo_name) == 0) {
ctx->desc = &(available_algo[i]); ctx->desc = &(available_algo[i]);

View file

@ -6,10 +6,45 @@
#include <string.h> #include <string.h>
#include "evt_core.h" #include "evt_core.h"
#include "algo_utils.h" #include "algo_utils.h"
#include "capture_traffic.h"
#include "url.h" #include "url.h"
#include "utils.h" #include "utils.h"
#include "packet.h" #include "packet.h"
struct algo_params {
uint8_t is_waiting_bootstrap;
uint8_t is_healing;
char *algo_name, *capture_file;
int links, fresh_data, redundant_data;
};
struct algo_ctx;
typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
typedef void (*algo_ctx_free_misc)(void*);
struct algo_desc {
char* name;
algo_init init;
algo_ctx_on_buffer on_stream;
algo_ctx_on_buffer on_datagram;
algo_ctx_on_event on_err;
};
struct algo_ctx {
struct algo_desc* desc;
uint8_t link_count;
uint8_t is_rdy;
struct algo_params ap;
int ref_count;
struct capture_ctx cap;
struct buffer_resources br;
void* misc; // Additional structures
algo_ctx_free_misc free_misc; // Fx ptr to free misc
};
void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
@ -49,7 +84,6 @@ static struct algo_desc available_algo[] = {
} }
}; };
void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap); void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap);
int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);