From 0f7167ee84d01cbb75e89de76e1230beaac9a9ca Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 18 Feb 2019 14:35:09 +0100 Subject: [PATCH] Introduce url concept --- src/algo_naive.c | 30 +++++++++++--- src/donar_client.c | 60 +++++++++++++++++++++------ src/donar_server.c | 31 ++++++++++++-- src/evt_core.c | 101 +++++++++++++++++++++++++++++---------------- src/evt_core.h | 15 +++++-- src/meas_lat.c | 10 ++++- 6 files changed, 186 insertions(+), 61 deletions(-) diff --git a/src/algo_naive.c b/src/algo_naive.c index e023b59..b3f28a7 100644 --- a/src/algo_naive.c +++ b/src/algo_naive.c @@ -15,20 +15,38 @@ void free_naive(void* app_ctx) { } void on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) { - int conn_sock1, conn_sock2; - struct sockaddr addr; + int conn_sock1, conn_sock2, port; + struct sockaddr_in addr; socklen_t in_len; - struct epoll_event current_event; + char url[1024]; + struct evt_core_cat local_cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &local_cat; + fdinfo.url = url; in_len = sizeof(addr); + conn_sock1 = accept(fd, (struct sockaddr*)&addr, &in_len); - conn_sock1 = accept(fd, &addr, &in_len); if (conn_sock1 == -1) goto co_error; conn_sock2 = dup(conn_sock1); if (conn_sock2 == -1) goto co_error; //printf("fd=%d accepts, creating fds=%d,%d\n", fd, conn_sock1, conn_sock2); - evt_core_add_fd (ctx, "tcp-read", conn_sock1); - evt_core_add_fd (ctx, "tcp-write", conn_sock2); + + struct evt_core_fdinfo* listen_info = g_hash_table_lookup(ctx->socklist, &fd); + if (listen_info == NULL) goto co_error; + sscanf(listen_info->url, "tcp:listen:%d", &port); + + fdinfo.fd = conn_sock1; + fdinfo.cat->name = "tcp-read"; + sprintf(fdinfo.url, "tcp:read:127.0.0.1:%d", port); + evt_core_add_fd (ctx, &fdinfo); + + fdinfo.fd = conn_sock2; + fdinfo.cat->name = "tcp-write"; + sprintf(fdinfo.url, "tcp:write:127.0.0.1:%d", port); + evt_core_add_fd (ctx, &fdinfo); + + printf("Selected port: %d\n", port); return; diff --git a/src/donar_client.c b/src/donar_client.c index 34328e7..f21f1eb 100644 --- a/src/donar_client.c +++ b/src/donar_client.c @@ -6,24 +6,31 @@ void load_onion_services(struct donar_client_ctx* ctx, char* onion_file, int por } void init_tcp_client(struct donar_client_ctx* ctx, int i) { + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + char url_buf[255]; + int err; ctx->ports[i] = 7500 + i; - int sock1, err; + cat.name = "configure-socks5"; + fdinfo.cat = &cat; + fdinfo.url = url_buf; while (1) { - sock1 = create_tcp_client("127.0.0.1", "9050"); - if (sock1 < 0) goto failed_socks5; - ctx->client_sock[i].fd = sock1; + fdinfo.fd = create_tcp_client("127.0.0.1", "9050"); + if (fdinfo.fd < 0) goto failed_socks5; + ctx->client_sock[i].fd = fdinfo.fd; ctx->client_sock[i].state = SOCKS5_STATE_NEW; - evt_core_add_fd (&(ctx->evts), "configure-socks5", sock1); - //@FIXME: We suppose that we will be able to do the whole write at once which is wrong too - err = socks5_handshake_syn(sock1); + sprintf(url_buf, "socks5:dist:%d", i); + evt_core_add_fd (&(ctx->evts), &fdinfo); + //@FIXME: We suppose that we will be able to do the whole write at once which is wrong + err = socks5_handshake_syn(fdinfo.fd); if (err) goto failed_socks5; break; failed_socks5: fprintf(stderr, "Failed connection for socket %d/%d. Sleeping 2 seconds...\n",i+1, CLIENT_PORT_SIZE); - close(sock1); + close(fdinfo.fd); sleep(2); } } @@ -68,8 +75,23 @@ void configure_tcp_clients(struct evt_core_ctx* ctx, struct evt_core_cat* cat, i if (sock1 < 0 || sock2 < 0) goto on_socks5_err; void* fdcat = evt_core_rm_fd (ctx, fd); if (fdcat == NULL) goto on_socks5_err; - evt_core_add_fd (ctx, "tcp-write", sock1); - evt_core_add_fd (ctx, "tcp-read", sock2); + + struct evt_core_fdinfo fdinfo = {0}; + struct evt_core_cat cat = {0}; + char url[1024]; + fdinfo.cat = &cat; + fdinfo.url = url; + + fdinfo.cat->name = "tcp-write"; + fdinfo.fd = sock1; + sprintf(fdinfo.url, "tcp:write:127.0.0.1:%d", app_ctx->ports[pos]); + evt_core_add_fd (ctx, &fdinfo); + + fdinfo.cat->name = "tcp-read"; + fdinfo.fd = sock2; + sprintf(fdinfo.url, "tcp:read:127.0.0.1:%d", app_ctx->ports[pos]); + evt_core_add_fd (ctx, &fdinfo); + printf("Socket %d/%d %s:%d has been added to the pool!\n", pos+1, CLIENT_PORT_SIZE, target_host, app_ctx->ports[pos]); break; case SOCKS5_STATE_RDY: @@ -90,12 +112,26 @@ on_socks5_err: void init_udp_socket(char* port, struct donar_client_ctx* ctx) { int sock1, sock2; + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + sock1 = create_udp_client ("127.0.0.1", port); if (sock1 < 0) goto socket_failed; sock2 = dup(sock1); if (sock2 < 0) goto socket_failed; - evt_core_add_fd (&(ctx->evts), "udp-read", sock1); - evt_core_add_fd (&(ctx->evts), "udp-write", sock2); + + fdinfo.cat->name = "udp-read"; + fdinfo.fd = sock1; + sprintf(fdinfo.url, "udp:read:127.0.0.1:%s", port); + evt_core_add_fd (&(ctx->evts), &fdinfo); + + fdinfo.cat->name = "udp-write"; + fdinfo.fd = sock2; + sprintf(fdinfo.url, "udp:write:127.0.0.1:%s", port); + evt_core_add_fd (&(ctx->evts), &fdinfo); return; socket_failed: diff --git a/src/donar_server.c b/src/donar_server.c index 38b9278..d599510 100644 --- a/src/donar_server.c +++ b/src/donar_server.c @@ -23,6 +23,12 @@ void destroy_resources(struct tor_os_str* tos, struct tor_ctl* tctl) { } void init_tcp_servers(struct donar_server_ctx* ctx) { + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + char buffer[6]; int err, sock = 0; for (int i = 0; i < PORT_SIZE; i++) { @@ -31,7 +37,11 @@ void init_tcp_servers(struct donar_server_ctx* ctx) { if (sock < 0) goto socket_create_err; err = listen(sock, SOMAXCONN); if (err != 0) goto socket_create_err; - evt_core_add_fd(&(ctx->evts), "tcp-listen", sock); + + fdinfo.cat->name = "tcp-listen"; + fdinfo.fd = sock; + sprintf(fdinfo.url, "tcp:listen:%d", ctx->ports[i]); + evt_core_add_fd(&(ctx->evts), &fdinfo); } return; @@ -42,12 +52,27 @@ socket_create_err: void serv_init_udp_socket(char* port, struct donar_server_ctx* ctx) { int sock1, sock2; + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + sock1 = create_udp_client ("127.0.0.1", port); if (sock1 < 0) goto socket_failed; sock2 = dup(sock1); if (sock2 < 0) goto socket_failed; - evt_core_add_fd (&(ctx->evts), "udp-read", sock1); - evt_core_add_fd (&(ctx->evts), "udp-write", sock2); + + fdinfo.fd = sock1; + fdinfo.cat->name = "udp-read"; + sprintf(fdinfo.url, "udp:read:127.0.0.1:%s", port); + evt_core_add_fd (&(ctx->evts), &fdinfo); + + fdinfo.fd = sock2; + fdinfo.cat->name = "udp-write"; + sprintf(fdinfo.url, "udp:write:127.0.0.1:%s", port); + evt_core_add_fd (&(ctx->evts), &fdinfo); + return; socket_failed: diff --git a/src/evt_core.c b/src/evt_core.c index daf7fc7..885cd1d 100644 --- a/src/evt_core.c +++ b/src/evt_core.c @@ -1,13 +1,15 @@ #include "evt_core.h" -void free_fd(void* v) { - int* fd = (int*)v; - close(*fd); +void free_fdinfo(void* v) { + struct evt_core_fdinfo* fdinfo = (struct evt_core_fdinfo*)v; + close(fdinfo->fd); // We close the file descriptor here + if (fdinfo->url != NULL) free(fdinfo->url); // We free the URL here; + if (fdinfo->other != NULL) fdinfo->free_other(fdinfo->other); free(v); } -void free_char(void* c) { - free(c); +void free_simple(void* s) { + free(s); } void free_cat(void* vcat) { @@ -25,8 +27,9 @@ void evt_core_init(struct evt_core_ctx* ctx) { exit(EXIT_FAILURE); } - ctx->catlist = g_hash_table_new_full(g_str_hash, g_str_equal,free_char,free_cat); - ctx->socklist = g_hash_table_new_full(g_int_hash, g_int_equal,free_fd, NULL); + ctx->catlist = g_hash_table_new_full(g_str_hash, g_str_equal,NULL, free_cat); + ctx->socklist = g_hash_table_new_full(g_int_hash, g_int_equal, NULL, free_fdinfo); + ctx->urltofd = g_hash_table_new_full(g_str_hash, g_str_equal, NULL, NULL); } void evt_core_add_cat(struct evt_core_ctx* ctx, struct evt_core_cat* cat) { @@ -35,75 +38,99 @@ void evt_core_add_cat(struct evt_core_ctx* ctx, struct evt_core_cat* cat) { exit(EXIT_FAILURE); } + // 1. Create category structure struct evt_core_cat* dyn = NULL; dyn = malloc(sizeof(struct evt_core_cat)); if (dyn == NULL) { fprintf(stderr, "Failed to alloc memory\n"); exit(EXIT_FAILURE); } + + // 2. Populate category structure dyn->app_ctx = cat->app_ctx; dyn->free_app_ctx = cat->free_app_ctx; dyn->cb = cat->cb; dyn->name = strdup(cat->name); dyn->flags = cat->flags; - dyn->socklist = g_array_new (FALSE, FALSE, sizeof(int)); + dyn->socklist = g_array_new (FALSE, FALSE, sizeof(struct evt_core_fdinfo*)); if (dyn->name == NULL) { perror("Unable to allocate memory for category name via strdup"); exit(EXIT_FAILURE); } - char* key = NULL; - key = strdup(cat->name); - if (key == NULL) { - perror("Unable to allocate memory for key via strdup"); - exit(EXIT_FAILURE); - } - - g_hash_table_insert (ctx->catlist, key, dyn); + // 3. Insert category structure in our context + g_hash_table_insert (ctx->catlist, dyn->name, dyn); } -void evt_core_add_fd(struct evt_core_ctx* ctx, char* name, int fd) { - int* key = NULL; - key = malloc(sizeof(int)); - - if (key == NULL) { - perror("Unable to allocate memory for key via malloc"); - exit(EXIT_FAILURE); - } - *key = fd; - - struct evt_core_cat* cat = g_hash_table_lookup(ctx->catlist, name); +void evt_core_add_fd(struct evt_core_ctx* ctx, struct evt_core_fdinfo* user_data) { + // 1. Fetch fd category + struct evt_core_cat* cat = g_hash_table_lookup(ctx->catlist, user_data->cat->name); if (cat == NULL) { - fprintf(stderr, "Category %s should be defined before inserting a file descriptor in it.\n", name); + fprintf(stderr, "Category %s should be defined before inserting a file descriptor in it.\n", user_data->cat->name); exit(EXIT_FAILURE); } - g_array_append_val (cat->socklist, fd); - g_hash_table_insert(ctx->socklist, key, cat); - add_fd_to_epoll(ctx->epollfd, fd, cat->flags); + // 2. Create fdinfo struct + struct evt_core_fdinfo* fdinfo; + if ((fdinfo = malloc(sizeof (struct evt_core_fdinfo))) == NULL) { + perror("Unable to allocate memory for fdinfo via malloc"); + exit(EXIT_FAILURE); + } + + // 3. Populate fdinfo struct + fdinfo->fd = user_data->fd; + fdinfo->cat = cat; + fdinfo->url = strdup(user_data->url); + fdinfo->other = user_data->other; + fdinfo->free_other = user_data->free_other; + + if (fdinfo->url == NULL) { + perror("Unable to allocate memory via malloc for fdinfo->url"); + exit(EXIT_FAILURE); + } + + // 4. Insert structure in our context + g_array_append_val (cat->socklist, fdinfo); + g_hash_table_insert(ctx->socklist, &(fdinfo->fd), fdinfo); + g_hash_table_insert(ctx->urltofd, fdinfo->url, fdinfo); + + // 5. Add file descriptor to epoll + add_fd_to_epoll(ctx->epollfd, user_data->fd, cat->flags); } struct evt_core_cat* evt_core_rm_fd(struct evt_core_ctx* ctx, int fd) { - struct evt_core_cat* cat = g_hash_table_lookup (ctx->socklist, &fd); - if (cat == NULL) return NULL; + struct evt_core_cat* cat; + // 1. Fetch fdinfo structure + struct evt_core_fdinfo* fdinfo = g_hash_table_lookup (ctx->socklist, &fd); + if (fdinfo == NULL) return NULL; + cat = fdinfo->cat; + + // 2. Remove structure from urltofd and socklist + g_hash_table_remove(ctx->urltofd, fdinfo->url); g_hash_table_remove(ctx->socklist, &fd); + + // 3. Update category for (int i = 0; i < cat->socklist->len; i++) { if (g_array_index(cat->socklist, int, i) == fd) { g_array_remove_index(cat->socklist, i); } } + + // 4. Return file descriptor's category return cat; } void evt_core_free(struct evt_core_ctx* ctx) { g_hash_table_destroy(ctx->socklist); g_hash_table_destroy(ctx->catlist); + g_hash_table_destroy (ctx->urltofd); } void evt_core_loop(struct evt_core_ctx* ctx) { struct epoll_event current_event, events[EVT_CORE_MAX_EVENTS]; + struct evt_core_fdinfo* fdinfo; struct evt_core_cat* cat; printf("--- Start main loop\n"); @@ -116,6 +143,7 @@ void evt_core_loop(struct evt_core_ctx* ctx) { } for (n = 0 ; n < num_fd; n++) { + // 1. Handle errors if (events[n].events & (EPOLLERR | EPOLLHUP | EPOLLRDHUP)) { int err_fd = events[n].data.fd; int evt = events[n].events; @@ -133,12 +161,13 @@ void evt_core_loop(struct evt_core_ctx* ctx) { continue; } - cat = g_hash_table_lookup(ctx->socklist, &(events[n].data.fd)); - if (cat == NULL) { + // 2. Fetch info and call appropriate function + fdinfo = g_hash_table_lookup(ctx->socklist, &(events[n].data.fd)); + if (fdinfo == NULL) { fprintf(stderr, "Ignoring file descriptor %d as it is not registered. This is a bug.\n", events[n].data.fd); continue; } - cat->cb(ctx, cat, events[n].data.fd); + fdinfo->cat->cb(ctx, fdinfo->cat, events[n].data.fd); } } diff --git a/src/evt_core.h b/src/evt_core.h index 3e97e96..25d99c1 100644 --- a/src/evt_core.h +++ b/src/evt_core.h @@ -27,13 +27,22 @@ struct evt_core_cat { struct evt_core_ctx { int epollfd; - GHashTable* catlist; - GHashTable* socklist; + GHashTable* catlist; // name -> category + GHashTable* socklist; // fd -> category + GHashTable* urltofd; // url -> fd, like "tcp:127.0.0.1:7500" +}; + +struct evt_core_fdinfo { + int fd; + char* url; + struct evt_core_cat* cat; + void* other; + evt_core_free_app_ctx free_other; }; void evt_core_init(struct evt_core_ctx* ctx); void evt_core_add_cat(struct evt_core_ctx* ctx, struct evt_core_cat* cat); -void evt_core_add_fd(struct evt_core_ctx* ctx, char* name, int fd); +void evt_core_add_fd(struct evt_core_ctx* ctx, struct evt_core_fdinfo* user_data); struct evt_core_cat* evt_core_rm_fd(struct evt_core_ctx* ctx, int fd); void evt_core_free(struct evt_core_ctx* ctx); void evt_core_loop(struct evt_core_ctx* ctx); diff --git a/src/meas_lat.c b/src/meas_lat.c index 9572556..0d069ba 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -42,8 +42,16 @@ int main(int argc, char** argv) { printf("--- Categories registered\n"); int udp_sock = create_udp_client (argv[1], argv[2]); - evt_core_add_fd (&evts, "udp-read", udp_sock); + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + fdinfo.fd = udp_sock; + fdinfo.cat->name = "udp-read"; + sprintf(fdinfo.url, "udp:read:%s:%s", argv[1], argv[2]); + evt_core_add_fd (&evts, &fdinfo); return 0; }