Refactor my code

This commit is contained in:
Quentin Dufour 2019-04-24 16:23:41 +02:00
parent de2a2a25ce
commit 7e25f35042
14 changed files with 433 additions and 546 deletions

View file

@ -16,8 +16,6 @@ list(APPEND CSOURCES
src/donar_server.c
src/evt_core.h
src/evt_core.c
src/algo_skel.h
src/algo_skel.c
src/algo_naive.c
src/utils.h
src/utils.c
@ -30,6 +28,8 @@ list(APPEND CSOURCES
src/algo_rr.c
src/algo_utils.h
src/algo_utils.c
src/proxy.h
src/proxy.c
)
add_executable(donar ${CSOURCES} src/donar.c)

0
scripts/run-seq Normal file → Executable file
View file

View file

@ -1,67 +1,16 @@
#include "proxy.h"
#include "algo_utils.h"
#include "algo_skel.h"
int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int conn_sock1, conn_sock2;
struct sockaddr_in addr;
socklen_t in_len;
char url[1024], port[6];
struct evt_core_cat local_cat = {0};
struct evt_core_fdinfo to_fdinfo = {0};
to_fdinfo.cat = &local_cat;
to_fdinfo.url = url;
in_len = sizeof(addr);
conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len);
if (conn_sock1 == -1 && errno == EAGAIN) return 1;
if (conn_sock1 == -1) goto co_error;
conn_sock2 = dup(conn_sock1);
if (conn_sock2 == -1) goto co_error;
//printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2);
url_get_port(port, fdinfo->url);
to_fdinfo.fd = conn_sock1;
to_fdinfo.cat->name = "tcp-read";
sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
to_fdinfo.fd = conn_sock2;
to_fdinfo.cat->name = "tcp-write";
sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
return 0;
co_error:
perror("Failed to handle new connection");
exit(EXIT_FAILURE);
void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) {
// We do nothing
}
int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
char url[256];
struct evt_core_fdinfo *to_fdinfo = NULL;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int read_res = FDS_READY;
char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Try to read a whole packet in the buffer
while (bp->mode == BP_READING) {
read_res = read_packet_from_tcp (fdinfo->fd, bp);
if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
}
// 3. A whole packet has been read, we will find someone to write it
// 1. Find destination
sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port);
to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) {
@ -69,66 +18,20 @@ int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
mv_buffer_wtof (app_ctx, fdinfo);
return 1;
}
//printf("Pass packet from %s to %s\n", fdinfo->url, url);
// 4. We move the buffer and notify the target
// 2. Move buffer
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
on_udp_write(ctx, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo);
return 0;
co_error:
perror("Failed to TCP read");
exit(EXIT_FAILURE);
}
int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
char url[256];
struct evt_core_fdinfo *to_fdinfo = NULL;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int write_res = FDS_READY;
if (!app_ctx->is_rdy && strcmp(fdinfo->url, "tcp:write:127.0.0.1:7500") == 0) {
app_ctx->is_rdy = 1;
printf("=== Requested circuit is up ===\n");
}
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write data from the buffer to the socket
while (bp->mode == BP_WRITING) {
write_res = write_packet_to_tcp(fdinfo->fd, bp);
if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
}
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo);
notify_read(ctx, app_ctx);
return 0;
co_error:
perror("Failed to TCP write");
exit(EXIT_FAILURE);
}
int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct evt_core_fdinfo *to_fdinfo;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int read_res = FDS_READY;
char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Read packet from socket
bp->ip.ap.str.port = url_get_port_int (fdinfo->url);
read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other);
if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
// 3. A whole packet has been read, we will find someone to write it
// 1. A whole packet has been read, we will find someone to write it
sprintf(url, "tcp:write:127.0.0.1:7500");
to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) {
@ -138,123 +41,19 @@ int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
}
//printf("Pass packet from %s to %s\n", fdinfo->url, url);
// 4. We move the buffer and notify the target
// 2. We move the buffer and notify the target
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
on_tcp_write(ctx, to_fdinfo);
return 0;
co_error:
perror("Failed to UDP read");
exit(EXIT_FAILURE);
}
int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int write_res = FDS_READY;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write buffer
write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other);
if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo);
notify_read(ctx, app_ctx);
return 0;
co_error:
perror("Failed to UDP write");
exit(EXIT_FAILURE);
}
int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct buffer_packet* bp;
// 1. If has a "used" buffer, remove it
bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd));
if (bp != NULL) {
fprintf(stderr, "begin removing entry in app_ctx->used_buffer for %s\n", fdinfo->url);
g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd));
fprintf(stderr, "end removing entry in app_ctx->used_buffer\n");
memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail(app_ctx->free_buffer, bp);
}
// 2. If appears in the write waiting queue, remove it
GQueue* writew = g_hash_table_lookup (app_ctx->write_waiting, &(fdinfo->fd));
while (writew != NULL && (bp = g_queue_pop_head (writew)) != NULL) {
memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail(app_ctx->free_buffer, bp);
}
if (writew) g_hash_table_remove (app_ctx->write_waiting, &(fdinfo->fd));
// 3. If appears in the read waiting queue, remove it
g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd));
main_on_tcp_write(ctx, to_fdinfo);
return 0;
}
void algo_naive(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) {
struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx));
if (ctx == NULL) goto init_err;
memset(ctx, 0, sizeof(struct algo_ctx));
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
ctx->ap = *ap;
ctx->is_rdy = 0;
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
as->on_tcp_co.name = "tcp-listen";
as->on_tcp_co.flags = EPOLLIN;
as->on_tcp_co.free_app_ctx = free_nothing;
as->on_tcp_co.cb = on_tcp_co;
as->on_tcp_read.name = "tcp-read";
as->on_tcp_read.flags = EPOLLIN | EPOLLET | EPOLLRDHUP;
as->on_tcp_read.app_ctx = ctx;
as->on_tcp_read.free_app_ctx = free_naive;
as->on_tcp_read.cb = on_tcp_read;
as->on_tcp_read.err_cb = on_err;
ctx->ref_count++;
as->on_udp_read.name = "udp-read";
as->on_udp_read.flags = EPOLLIN | EPOLLET;
as->on_udp_read.app_ctx = ctx;
as->on_udp_read.free_app_ctx = free_naive;
as->on_udp_read.cb = on_udp_read;
as->on_udp_read.err_cb = on_err;
ctx->ref_count++;
as->on_tcp_write.name = "tcp-write";
as->on_tcp_write.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP;
as->on_tcp_write.app_ctx = ctx;
as->on_tcp_write.free_app_ctx = free_naive;
as->on_tcp_write.cb = on_tcp_write;
as->on_tcp_write.err_cb = on_err;
ctx->ref_count++;
as->on_udp_write.name = "udp-write";
as->on_udp_write.flags = EPOLLOUT | EPOLLET;
as->on_udp_write.app_ctx = ctx;
as->on_udp_write.free_app_ctx = free_naive;
as->on_udp_write.cb = on_udp_write;
as->on_udp_write.err_cb = on_err;
ctx->ref_count++;
return;
init_err:
fprintf(stderr, "Failed to init algo naive\n");
exit(EXIT_FAILURE);
int algo_naive_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
// We do nothing
return 0;
}
int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) {
// We do nothing
return 1;
}

View file

@ -1,7 +1,8 @@
#include <sys/timerfd.h>
#include "algo_skel.h"
#include "algo_utils.h"
#include "utils.h"
#include "url.h"
#include "proxy.h"
struct waited_pkt {
uint16_t id;
@ -31,49 +32,6 @@ struct rr_ctx {
struct waited_pkt wait[PACKET_BUFFER_SIZE];
};
int rr_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int rr_on_udp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int rr_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int conn_sock1, conn_sock2;
struct sockaddr_in addr;
socklen_t in_len;
char url[1024], port[6];
struct evt_core_cat local_cat = {0};
struct evt_core_fdinfo to_fdinfo = {0};
to_fdinfo.cat = &local_cat;
to_fdinfo.url = url;
in_len = sizeof(addr);
conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len);
if (conn_sock1 == -1 && errno == EAGAIN) return 1;
if (conn_sock1 == -1) goto co_error;
conn_sock2 = dup(conn_sock1);
if (conn_sock2 == -1) goto co_error;
//printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2);
url_get_port(port, fdinfo->url);
to_fdinfo.fd = conn_sock1;
to_fdinfo.cat->name = "tcp-read";
sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
to_fdinfo.fd = conn_sock2;
to_fdinfo.cat->name = "tcp-write";
sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
return 0;
co_error:
perror("Failed to handle new connection");
exit(EXIT_FAILURE);
}
void show_link_availability(struct rr_ctx* rr) {
printf("Links availability: my_links[");
for (int i = 0; i < 8; i++) {
@ -88,6 +46,7 @@ void show_link_availability(struct rr_ctx* rr) {
printf("]\n");
}
// @TODO Might be extracted from RR
int set_timeout(struct evt_core_ctx* evts, uint64_t milli_sec, struct waited_pkt* wpkt) {
struct timespec now;
struct itimerspec timer_config;
@ -242,7 +201,7 @@ void rr_deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct
// 6. We move the buffer and notify the target
//mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp);
mv_buffer_atow (app_ctx, &dp->idx, to_fdinfo);
rr_on_udp_write(ctx, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo);
}
void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) {
@ -268,24 +227,14 @@ void rr_pkt_unroll(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx) {
//------
int rr_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct rr_ctx* rr = app_ctx->misc;
int read_res = FDS_READY;
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Try to read a whole packet in the buffer
while (bp->mode == BP_READING) {
read_res = read_packet_from_tcp (fdinfo->fd, bp);
if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
}
// 3. Logic on packet
// 1. Register packet in our queue
rr_pkt_register(ctx, fdinfo, bp);
// 2. Process queue
rr_pkt_unroll (ctx, app_ctx);
return 0;
@ -294,48 +243,13 @@ co_error:
exit(EXIT_FAILURE);
}
int rr_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int write_res = FDS_READY;
int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct rr_ctx* rr = app_ctx->misc;
struct evt_core_fdinfo *to_fdinfo = NULL;
char url[255];
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write buffer
write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other);
if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo);
notify_read(ctx, app_ctx);
return 0;
co_error:
perror("Failed to UDP write");
exit(EXIT_FAILURE);
}
int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct evt_core_fdinfo *to_fdinfo = NULL;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct rr_ctx* rr = app_ctx->misc;
int read_res = FDS_READY;
char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Read packet from socket
bp->ip.ap.str.port = url_get_port_int (fdinfo->url);
read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other);
if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
// 3. Prepare RR state and packet values
// 1. Prepare RR state and packet values
struct timespec curr;
int secs, nsecs;
uint64_t mili_sec;
@ -360,6 +274,7 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
rr->emit_time = curr;
rr->sent_id++;
// 2. Try to find someone to send it
int max = 10;
uint8_t sel_link = rr->current_link;
while(max-- >= 0) {
@ -371,16 +286,16 @@ int rr_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
if (!app_ctx->ap.is_healing || rr->my_links & (1 << sel_link)) {
rr->current_link = sel_link;
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
rr_on_tcp_write(ctx, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo);
return 0;
} else {
dup_buffer_tow(app_ctx, bp, to_fdinfo);
rr_on_tcp_write(ctx, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo);
}
}
not_ready:
// 4. A whole packet has been read, we will find someone to write it
// 3. We find no up target
fprintf(stderr, "Still bootstrapping or no link to forward data from %s in udp-read. Dropping packet :( \n", fdinfo->url);
mv_buffer_wtof (app_ctx, fdinfo);
return 0;
@ -390,50 +305,7 @@ co_error:
exit(EXIT_FAILURE);
}
int rr_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct rr_ctx* rr = app_ctx->misc;
int write_res = FDS_READY;
// 0. Show some information about circuits
uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0;
if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count);
else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count);
app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write data from the buffer to the socket
while (bp->mode == BP_WRITING) {
write_res = write_packet_to_tcp(fdinfo->fd, bp);
if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
}
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo);
notify_read(ctx, app_ctx);
return 0;
co_error:
perror("Failed to TCP write");
exit(EXIT_FAILURE);
}
int rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
uint64_t ctr;
ssize_t tmr_rd;
tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr));
if (tmr_rd == -1 && errno == EAGAIN) return 1;
if (tmr_rd < 0) {
perror("read on timer");
fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd);
exit(EXIT_FAILURE);
}
int algo_rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct rr_ctx* rr = app_ctx->misc;
@ -459,46 +331,18 @@ end:
return 1;
}
int rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct buffer_packet* bp;
// 1. If has a "used" buffer, remove it
bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd));
if (bp != NULL) {
g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd));
memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail(app_ctx->free_buffer, bp);
}
// 2. If appears in the write waiting queue, remove it
GQueue* writew = g_hash_table_lookup (app_ctx->write_waiting, &(fdinfo->fd));
while (writew != NULL && (bp = g_queue_pop_head (writew)) != NULL) {
memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail(app_ctx->free_buffer, bp);
}
if (writew) g_hash_table_remove (app_ctx->write_waiting, &(fdinfo->fd));
// 3. If appears in the read waiting queue, remove it
g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd));
int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
// We do nothing
return 0;
}
void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params* ap) {
struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx));
if (ctx == NULL) goto init_err;
memset(ctx, 0, sizeof(struct algo_ctx));
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
ctx->link_count = 8;
ctx->is_rdy = 0;
ctx->ap = *ap;
void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) {
struct rr_ctx* rr = malloc(sizeof(struct rr_ctx));
if (rr == NULL) goto init_err;
if (rr == NULL) {
perror("malloc failed for rr_init.");
exit(EXIT_FAILURE);
}
memset(rr, 0, sizeof(struct rr_ctx));
rr->mjit = 200;
rr->my_links = 0xff;
@ -506,63 +350,6 @@ void algo_rr(struct evt_core_ctx* evt, struct algo_skel* as, struct algo_params*
rr->sent_id = 1;
rr->recv_id = 0;
rr->recv_id_late = 0;
ctx->misc = rr;
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
as->on_tcp_co.name = "tcp-listen";
as->on_tcp_co.flags = EPOLLIN;
as->on_tcp_co.free_app_ctx = free_nothing;
as->on_tcp_co.cb = rr_on_tcp_co;
as->on_tcp_read.name = "tcp-read";
as->on_tcp_read.flags = EPOLLIN | EPOLLET | EPOLLRDHUP;
as->on_tcp_read.app_ctx = ctx;
as->on_tcp_read.free_app_ctx = free_naive;
as->on_tcp_read.cb = rr_on_tcp_read;
as->on_tcp_read.err_cb = rr_on_err;
ctx->ref_count++;
as->on_udp_read.name = "udp-read";
as->on_udp_read.flags = EPOLLIN | EPOLLET;
as->on_udp_read.app_ctx = ctx;
as->on_udp_read.free_app_ctx = free_naive;
as->on_udp_read.cb = rr_on_udp_read;
as->on_udp_read.err_cb = rr_on_err;
ctx->ref_count++;
as->on_tcp_write.name = "tcp-write";
as->on_tcp_write.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP;
as->on_tcp_write.app_ctx = ctx;
as->on_tcp_write.free_app_ctx = free_naive;
as->on_tcp_write.cb = rr_on_tcp_write;
as->on_tcp_write.err_cb = rr_on_err;
ctx->ref_count++;
as->on_udp_write.name = "udp-write";
as->on_udp_write.flags = EPOLLOUT | EPOLLET;
as->on_udp_write.app_ctx = ctx;
as->on_udp_write.free_app_ctx = free_naive;
as->on_udp_write.cb = rr_on_udp_write;
as->on_udp_write.err_cb = rr_on_err;
ctx->ref_count++;
struct evt_core_cat tcat = {
.name = "timeout",
.flags = EPOLLIN | EPOLLET,
.app_ctx = ctx,
.free_app_ctx = free_naive,
.cb = rr_on_timer,
.err_cb = NULL
};
ctx->ref_count++;
evt_core_add_cat(evt, &tcat);
return;
init_err:
fprintf(stderr, "Failed to init algo naive\n");
exit(EXIT_FAILURE);
app_ctx->misc = rr;
}

View file

@ -1,12 +0,0 @@
#include "algo_skel.h"
void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap) {
for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) {
if (strcmp(available_algo[i].name, name) == 0) {
available_algo[i].init(ctx, as, ap);
return;
}
}
fprintf(stderr, "Algorithm %s has not been found\n", name);
exit(EXIT_FAILURE);
}

View file

@ -1,44 +0,0 @@
#pragma once
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "packet.h"
#include "evt_core.h"
#include "utils.h"
#include "url.h"
struct algo_params {
uint8_t is_waiting_bootstrap;
uint8_t is_healing;
};
struct algo_skel {
struct evt_core_cat on_udp_read;
struct evt_core_cat on_tcp_read;
struct evt_core_cat on_udp_write;
struct evt_core_cat on_tcp_write;
struct evt_core_cat on_tcp_co;
};
typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap);
void init_algo(struct evt_core_ctx* ctx, struct algo_skel* as, char* name, struct algo_params* ap);
void algo_naive(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap);
void algo_rr(struct evt_core_ctx* ctx, struct algo_skel* as, struct algo_params* ap);
struct algo_desc {
algo_init init;
char* name;
};
static struct algo_desc available_algo[] = {
{
.init = algo_naive,
.name = "naive"
},
{
.init = algo_rr,
.name = "rr"
}
};

View file

@ -2,12 +2,35 @@
#include <glib-2.0/glib.h>
#include <glib-2.0/gmodule.h>
#include <glib-2.0/glib-object.h>
#include "algo_skel.h"
#include "packet.h"
#include "evt_core.h"
#define PACKET_BUFFER_SIZE 20
struct algo_params {
uint8_t is_waiting_bootstrap;
uint8_t is_healing;
char* algo_name;
};
struct algo_ctx;
typedef void (*algo_init)(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
typedef int (*algo_ctx_on_buffer)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
typedef int (*algo_ctx_on_event)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
typedef void (*algo_ctx_free_misc)(void*);
struct algo_desc {
char* name;
algo_init init;
algo_ctx_on_buffer on_stream;
algo_ctx_on_buffer on_datagram;
algo_ctx_on_event on_timer;
algo_ctx_on_event on_err;
};
struct algo_ctx {
struct algo_desc* desc;
uint8_t link_count;
uint8_t is_rdy;
struct algo_params ap;

View file

@ -4,7 +4,6 @@
#include <sys/stat.h>
#include <unistd.h>
#include <gmodule.h>
#include "algo_skel.h"
#include "donar_client.h"
#include "donar_server.h"

View file

@ -70,22 +70,19 @@ void init_socks5_sinks(struct donar_client_ctx* app_ctx) {
}
void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) {
struct algo_skel algo = {0};
struct algo_params ap = {
.is_waiting_bootstrap = dp->is_waiting_bootstrap,
.is_healing = dp->is_healing
.is_healing = dp->is_healing,
.algo_name = dp->algo
};
evt_core_init (&(ctx->evts), dp->verbose);
init_algo(&ctx->evts, &algo, dp->algo, &ap);
algo_main_init(&ctx->evts, &ap);
printf("--- Algorithm initialized\n");
socks5_init (&ctx->evts);
init_socks5_sinks(ctx);
evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co));
evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read));
evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read));
evt_core_add_cat (&(ctx->evts), &(algo.on_udp_write));
evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_write));
printf("--- Categories created\n");
printf("--- Socks5 connection process started\n");
load_onion_services (ctx, dp->onion_file, CLIENT_PORT_SIZE);
printf("--- Onion services loaded\n");

View file

@ -2,9 +2,9 @@
#include <stdio.h>
#include <stdlib.h>
#include <gmodule.h>
#include "algo_skel.h"
#include "tor_os.h"
#include "socks5.h"
#include "proxy.h"
#include "donar_init.h"
#define CLIENT_PORT_SIZE 8

View file

@ -52,21 +52,15 @@ socket_create_err:
}
void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
struct algo_skel algo = {0};
struct algo_params ap = {
.is_waiting_bootstrap = dp->is_waiting_bootstrap,
.is_healing = dp->is_healing
.is_healing = dp->is_healing,
.algo_name = dp->algo
};
evt_core_init (&(ctx->evts), dp->verbose);
init_algo(&ctx->evts, &algo, dp->algo, &ap);
evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_co));
evt_core_add_cat (&(ctx->evts), &(algo.on_udp_read));
evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_read));
evt_core_add_cat (&(ctx->evts), &(algo.on_udp_write));
evt_core_add_cat (&(ctx->evts), &(algo.on_tcp_write));
printf("--- Categories created\n");
algo_main_init(&ctx->evts, &ap);
printf("--- Algorithm initialized\n");
for (uint16_t i = 0; i < PORT_SIZE ; i++) {
ctx->ports[i] = 7500 + i;

View file

@ -8,8 +8,8 @@
#include "tor_os.h"
#include "tor_ctl.h"
#include "evt_core.h"
#include "algo_skel.h"
#include "donar_init.h"
#include "proxy.h"
#define PORT_SIZE 8

283
src/proxy.c Normal file
View file

@ -0,0 +1,283 @@
#include "proxy.h"
int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int conn_sock1, conn_sock2;
struct sockaddr_in addr;
socklen_t in_len;
char url[1024], port[6];
struct evt_core_cat local_cat = {0};
struct evt_core_fdinfo to_fdinfo = {0};
to_fdinfo.cat = &local_cat;
to_fdinfo.url = url;
in_len = sizeof(addr);
conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len);
if (conn_sock1 == -1 && errno == EAGAIN) return 1;
if (conn_sock1 == -1) goto co_error;
conn_sock2 = dup(conn_sock1);
if (conn_sock2 == -1) goto co_error;
//printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2);
url_get_port(port, fdinfo->url);
to_fdinfo.fd = conn_sock1;
to_fdinfo.cat->name = "tcp-read";
sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
to_fdinfo.fd = conn_sock2;
to_fdinfo.cat->name = "tcp-write";
sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
return 0;
co_error:
perror("Failed to handle new connection");
exit(EXIT_FAILURE);
}
int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int read_res = FDS_READY;
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Try to read a whole packet in the buffer
while (bp->mode == BP_READING) {
read_res = read_packet_from_tcp (fdinfo->fd, bp);
if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
}
// 3. Logic on packet
return app_ctx->desc->on_stream(ctx, fdinfo, bp);
co_error:
perror("Failed to TCP read");
exit(EXIT_FAILURE);
}
int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int read_res = FDS_READY;
char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Read packet from socket
bp->ip.ap.str.port = url_get_port_int (fdinfo->url);
read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other);
if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
// 3. Apply logic
return app_ctx->desc->on_datagram(ctx, fdinfo, bp);
co_error:
perror("Failed to UDP read");
exit(EXIT_FAILURE);
}
int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct rr_ctx* rr = app_ctx->misc;
int write_res = FDS_READY;
// 0. Show some information about circuits
uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0;
if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count);
else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count);
app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME prevent deactivation for our tests
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write data from the buffer to the socket
while (bp->mode == BP_WRITING) {
write_res = write_packet_to_tcp(fdinfo->fd, bp);
if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
}
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo);
notify_read(ctx, app_ctx);
return 0;
co_error:
perror("Failed to TCP write");
exit(EXIT_FAILURE);
}
int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp;
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
int write_res = FDS_READY;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write buffer
write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other);
if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
// 3. A whole packet has been written
// Release the buffer and notify
mv_buffer_wtof(app_ctx, fdinfo);
notify_read(ctx, app_ctx);
return 0;
co_error:
perror("Failed to UDP write");
exit(EXIT_FAILURE);
}
int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
uint64_t ctr;
ssize_t tmr_rd;
tmr_rd = read(fdinfo->fd, &ctr, sizeof(ctr));
if (tmr_rd == -1 && errno == EAGAIN) return 1;
if (tmr_rd < 0) {
perror("read on timer");
fprintf(stderr, "An error occured on timer fd=%d\n", fdinfo->fd);
exit(EXIT_FAILURE);
}
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
return app_ctx->desc->on_timer (ctx, fdinfo);
end:
evt_core_rm_fd(ctx, fdinfo->fd);
return 1;
}
int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
struct buffer_packet* bp;
// 1. If has a "used" buffer, remove it
bp = g_hash_table_lookup (app_ctx->used_buffer, &(fdinfo->fd));
if (bp != NULL) {
g_hash_table_remove (app_ctx->used_buffer, &(fdinfo->fd));
memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail(app_ctx->free_buffer, bp);
}
// 2. If appears in the write waiting queue, remove it
GQueue* writew = g_hash_table_lookup (app_ctx->write_waiting, &(fdinfo->fd));
while (writew != NULL && (bp = g_queue_pop_head (writew)) != NULL) {
memset(bp, 0, sizeof(struct buffer_packet));
g_queue_push_tail(app_ctx->free_buffer, bp);
}
if (writew) g_hash_table_remove (app_ctx->write_waiting, &(fdinfo->fd));
// 3. If appears in the read waiting queue, remove it
g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd));
return app_ctx->desc->on_err(ctx, fdinfo);
}
void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap) {
struct algo_ctx* ctx = malloc(sizeof(struct algo_ctx));
if (ctx == NULL) goto init_err;
memset(ctx, 0, sizeof(struct algo_ctx));
ctx->free_buffer = g_queue_new ();
ctx->read_waiting = g_queue_new ();
ctx->application_waiting = g_hash_table_new (NULL, NULL);
ctx->used_buffer = g_hash_table_new(g_int_hash, g_int_equal);
ctx->write_waiting = g_hash_table_new_full (g_int_hash, g_int_equal, NULL, naive_free_simple);
ctx->link_count = 8;
ctx->is_rdy = 0;
ctx->ap = *ap;
for (int i = 0; i < sizeof(ctx->bps) / sizeof(ctx->bps[0]); i++) {
g_queue_push_tail(ctx->free_buffer, &(ctx->bps[i]));
}
struct evt_core_cat tcp_listen = {
.name = "tcp-listen",
.flags = EPOLLIN,
.app_ctx = ctx,
.free_app_ctx = free_nothing,
.cb = main_on_tcp_co,
.err_cb = NULL
};
evt_core_add_cat(evt, &tcp_listen);
struct evt_core_cat tcp_read = {
.name = "tcp-read",
.flags = EPOLLIN | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx,
.free_app_ctx = free_naive,
.cb = main_on_tcp_read,
.err_cb = main_on_err
};
ctx->ref_count++;
evt_core_add_cat(evt, &tcp_read);
struct evt_core_cat udp_read = {
.name = "udp-read",
.flags = EPOLLIN | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx,
.free_app_ctx = free_naive,
.cb = main_on_udp_read,
.err_cb = main_on_err
};
ctx->ref_count++;
evt_core_add_cat(evt, &udp_read);
struct evt_core_cat tcp_write = {
.name = "tcp-write",
.flags = EPOLLOUT | EPOLLET | EPOLLRDHUP,
.app_ctx = ctx,
.free_app_ctx = free_naive,
.cb = main_on_tcp_write,
.err_cb = main_on_err
};
ctx->ref_count++;
evt_core_add_cat(evt, &tcp_write);
struct evt_core_cat udp_write = {
.name = "udp-write",
.flags = EPOLLOUT | EPOLLET,
.app_ctx = ctx,
.free_app_ctx = free_naive,
.cb = main_on_udp_write,
.err_cb = main_on_err
};
ctx->ref_count++;
evt_core_add_cat(evt, &udp_write);
struct evt_core_cat timer = {
.name = "timeout",
.flags = EPOLLIN | EPOLLET,
.app_ctx = ctx,
.free_app_ctx = free_naive,
.cb = main_on_timer,
.err_cb = NULL
};
ctx->ref_count++;
evt_core_add_cat(evt, &timer);
for (int i = 0; i < sizeof(available_algo) / sizeof(available_algo[0]); i++) {
if (strcmp(available_algo[i].name, ap->algo_name) == 0) {
ctx->desc = &(available_algo[i]);
ctx->desc->init(evt, ctx, ap);
return;
}
}
fprintf(stderr, "Algorithm %s has not been found\n", ap->algo_name);
exit(EXIT_FAILURE);
init_err:
fprintf(stderr, "Failed to init proxy\n");
exit(EXIT_FAILURE);
}

61
src/proxy.h Normal file
View file

@ -0,0 +1,61 @@
#pragma once
#include <sys/socket.h>
#include <stdlib.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include "evt_core.h"
#include "algo_utils.h"
#include "url.h"
#include "utils.h"
#include "packet.h"
void algo_naive_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
int algo_naive_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
int algo_naive_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
int algo_naive_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int algo_naive_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo);
void algo_rr_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap);
int algo_rr_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp);
int algo_rr_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int algo_rr_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
static struct algo_desc available_algo[] = {
{
.name = "naive",
.init = algo_naive_init,
.on_stream = algo_naive_on_stream,
.on_datagram = algo_naive_on_datagram,
.on_timer = algo_naive_on_timer,
.on_err = algo_naive_on_err
},
{
.name = "rr",
.init = algo_rr_init,
.on_stream = algo_rr_on_stream,
.on_datagram = algo_rr_on_datagram,
.on_timer = algo_rr_on_timer,
.on_err = algo_rr_on_err
},
{
.name = "dup2",
.init = NULL,
.on_stream = NULL,
.on_datagram = NULL,
.on_timer = NULL,
.on_err = NULL
}
};
void algo_main_init(struct evt_core_ctx* evt, struct algo_params* ap);
int main_on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int main_on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int main_on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);