diff --git a/src/meas_lat.c b/src/meas_lat.c index 4f15215..a97a637 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -9,28 +9,133 @@ #include "socks5.h" #include "utils.h" #include "measure.h" +#include "url.h" struct measlat_ctx { struct measure_conf mc; - int verbose; + struct sockaddr_in addr; + socklen_t addrlen; + int verbose, is_timer_started, is_from_needed; char *host, *port, *transport; }; -int on_udp_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { +void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) { + struct timespec now; + struct itimerspec timer_config; + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + + struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read"); + if (ucat == NULL) { + fprintf(stderr, "Category udp-read not found\n"); + exit(EXIT_FAILURE); + } + struct measlat_ctx* mctx = ucat->app_ctx; + mctx->is_timer_started = 1; + + if (clock_gettime(CLOCK_REALTIME, &now) == -1) { + perror("clock_gettime"); + exit(EXIT_FAILURE); + } + uint64_t micro_sec = mctx->mc.interval; + timer_config.it_value.tv_sec = next_tick == NULL ? now.tv_sec + 1 : next_tick->tv_sec; + timer_config.it_value.tv_nsec = next_tick == NULL ? now.tv_nsec : next_tick->tv_nsec; + timer_config.it_interval.tv_sec = micro_sec / 1000; + timer_config.it_interval.tv_nsec = micro_sec % 1000 * 1000000; + + printf("timer_config: sec=%ld nsec=%ld \n", (uint64_t) timer_config.it_value.tv_sec, (uint64_t) timer_config.it_value.tv_nsec); + + fdinfo.fd = timerfd_create(CLOCK_REALTIME, 0); + if (fdinfo.fd == -1) { + perror("Unable to timerfd_create"); + exit(EXIT_FAILURE); + } + if (timerfd_settime (fdinfo.fd, TFD_TIMER_ABSTIME, &timer_config, NULL) == -1) { + perror("Unable to timerfd_time"); + exit(EXIT_FAILURE); + } + fdinfo.cat->name = "timer"; + sprintf(fdinfo.url, "timer:%ld:%ld", mctx->mc.interval, mctx->mc.max_measure); + evt_core_add_fd (evts, &fdinfo); + printf("--- Timer registered\n"); +} + +int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { return 1; } -int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - ssize_t res; +int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + ssize_t nread; struct measlat_ctx* mctx = fdinfo->cat->app_ctx; - res = read(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size); - if (res == -1 && errno == EAGAIN) return 1; - measure_parse (res, &mctx->mc); + if (mctx->is_from_needed) nread = recvfrom(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen); + else nread = recv(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size, MSG_TRUNC); + + if (nread == -1 && errno == EAGAIN) return 1; + // @FIXME logic is wrong for TCP here but would lead (hopefully) to a crash + + // First we parse the packet and exit if needed + measure_parse (nread, &mctx->mc); + + // Old behaviour where we work in a RTT way and send back the packet + if (measure_need_reply (&mctx->mc)) { + ssize_t nwritten; + if (mctx->is_from_needed) nwritten = sendto(fdinfo->fd, mctx->mc.payload, nread, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen); + else nwritten = send(fdinfo->fd, mctx->mc.payload, nread, 0); + + // @FIXME don't support EAGAIN on write. Could be intended, you don't think so? + if (nwritten != nread) { + fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", nread, nwritten); + perror("write errno"); + exit(EXIT_FAILURE); + } + return 0; + } + + // Used to start sending from the server in sync with client + if (!mctx->is_timer_started) { + struct timespec next_tick = {0}; + measure_next_tick(&mctx->mc, &next_tick); + register_timer (ctx, &next_tick); + } return 0; } + +int on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + int conn_sock1, conn_sock2; + struct sockaddr_in addr; + socklen_t in_len; + char url[1024], port[6]; + struct evt_core_cat local_cat = {0}; + struct evt_core_fdinfo to_fdinfo = {0}; + to_fdinfo.cat = &local_cat; + to_fdinfo.url = url; + + in_len = sizeof(addr); + conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len); + + if (conn_sock1 == -1 && errno == EAGAIN) return 1; + if (conn_sock1 == -1) goto co_error; + + url_get_port(port, fdinfo->url); + + to_fdinfo.fd = conn_sock1; + to_fdinfo.cat->name = "tcp-read"; + sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port); + evt_core_add_fd (ctx, &to_fdinfo); + + return 0; + +co_error: + perror("Failed to handle new connection"); + exit(EXIT_FAILURE); +} + int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { ssize_t s; uint64_t ticks = 0; @@ -46,15 +151,8 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { if (ticks != 1) { fprintf(stderr, "Has ticked %lu times, expected 1 time. This is a bug\n", ticks); } - mctx->mc.counter++; - - struct measure_packet* head = (struct measure_packet*)mctx->mc.payload; - head->counter = mctx->mc.counter; - if (clock_gettime(CLOCK_MONOTONIC, &head->emit_time) == -1) { - perror("clock_gettime error"); - exit(EXIT_FAILURE); - } + struct measure_packet* head = measure_generate(&mctx->mc); struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read"); if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read"); @@ -62,56 +160,17 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { printf("No connection yet\n"); return 1; } - s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0); + if (mctx->is_from_needed) s = sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen); + else s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0); + if (s < 0) { perror("Send error"); //exit(EXIT_FAILURE); } + return 0; } - -void register_timer(struct evt_core_ctx* evts) { - struct timespec now; - struct itimerspec timer_config; - char url[1024]; - struct evt_core_cat cat = {0}; - struct evt_core_fdinfo fdinfo = {0}; - fdinfo.cat = &cat; - fdinfo.url = url; - - struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read"); - if (ucat == NULL) { - fprintf(stderr, "Category udp-read not found\n"); - exit(EXIT_FAILURE); - } - struct measlat_ctx* mctx = ucat->app_ctx; - - if (clock_gettime(CLOCK_REALTIME, &now) == -1) { - perror("clock_gettime"); - exit(EXIT_FAILURE); - } - uint64_t micro_sec = mctx->mc.interval; - timer_config.it_value.tv_sec = now.tv_sec + 1; - timer_config.it_value.tv_nsec = now.tv_nsec; - timer_config.it_interval.tv_sec = micro_sec / 1000; - timer_config.it_interval.tv_nsec = micro_sec % 1000 * 1000000; - - fdinfo.fd = timerfd_create(CLOCK_REALTIME, 0); - if (fdinfo.fd == -1) { - perror("Unable to timerfd_create"); - exit(EXIT_FAILURE); - } - if (timerfd_settime (fdinfo.fd, TFD_TIMER_ABSTIME, &timer_config, NULL) == -1) { - perror("Unable to timerfd_time"); - exit(EXIT_FAILURE); - } - fdinfo.cat->name = "timer"; - sprintf(fdinfo.url, "timer:%ld:%ld", mctx->mc.interval, mctx->mc.max_measure); - evt_core_add_fd (evts, &fdinfo); - printf("--- Timer registered\n"); -} - int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { char url[1024]; struct evt_core_cat cat = {0}; @@ -134,11 +193,11 @@ int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* evt_core_add_fd (ctx, &fdinfo_n); printf("--- Tor socket registered\n"); - register_timer(ctx); + register_timer(ctx, NULL); return 1; } -void spawn_tor_socket(struct evt_core_ctx* evts) { +void spawn_tor_client(struct evt_core_ctx* evts) { struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read"); if (ucat == NULL) { fprintf(stderr, "Category udp-read not found\n"); @@ -147,11 +206,12 @@ void spawn_tor_socket(struct evt_core_ctx* evts) { struct measlat_ctx* mctx = ucat->app_ctx; socks5_create_dns_client (evts, "127.0.0.1", "9050", mctx->host, atoi(mctx->port)); + printf("--- Tor client SOCKS started\n"); } int on_socks5_failed_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { evt_core_rm_fd (ctx, fdinfo->fd); - spawn_tor_socket(ctx); + spawn_tor_client(ctx); return 1; } @@ -166,18 +226,24 @@ void register_categories(struct evt_core_ctx* evts, struct measlat_ctx* mctx) { template.flags = EPOLLIN | EPOLLET; evt_core_add_cat(evts, &template); - template.cb = on_udp; // intended but not elegant - template.err_cb = on_udp_err; // intended but not elegant + template.cb = on_receive_measure_packet; // intended but not elegant + template.err_cb = on_receive_measure_packet_err; // intended but not elegant template.name = "tcp-read"; template.flags = EPOLLIN | EPOLLET; evt_core_add_cat(evts, &template); - template.cb = on_udp; - template.err_cb = on_udp_err; + template.cb = on_receive_measure_packet; + template.err_cb = on_receive_measure_packet_err; template.name = "udp-read"; template.flags = EPOLLIN | EPOLLET; evt_core_add_cat(evts, &template); + template.cb = on_tcp_co; + template.err_cb = NULL; + template.name = "tcp-co"; + template.flags = EPOLLIN | EPOLLET; + evt_core_add_cat(evts, &template); + template.cb = on_socks5_success_measlat; template.err_cb = on_socks5_failed_measlat; template.name = "socks5-success"; @@ -195,7 +261,7 @@ void register_categories(struct evt_core_ctx* evts, struct measlat_ctx* mctx) { } -void spawn_udp_socket(struct evt_core_ctx* evts) { +void spawn_udp_client(struct evt_core_ctx* evts) { struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "udp-read"); if (ucat == NULL) { fprintf(stderr, "Category udp-read not found\n"); @@ -216,9 +282,52 @@ void spawn_udp_socket(struct evt_core_ctx* evts) { fdinfo.free_other = NULL; sprintf(fdinfo.url, "udp:read:%s:%s", mctx->host, mctx->port); evt_core_add_fd (evts, &fdinfo); - printf("--- UDP socket registered\n"); + printf("--- UDP client registered\n"); - register_timer(evts); + register_timer(evts, NULL); +} + +void spawn_udp_server(struct evt_core_ctx* evts) { + struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "udp-read"); + if (ucat == NULL) { + fprintf(stderr, "Category udp-read not found\n"); + exit(EXIT_FAILURE); + } + struct measlat_ctx* mctx = ucat->app_ctx; + int udp_sock = create_udp_server (mctx->host, mctx->port); + + char url[1024]; + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = url; + + fdinfo.fd = udp_sock; + sprintf(url, "udp:rw:127.0.0.1:%s", mctx->port); + fdinfo.cat->name = "udp-read"; + evt_core_add_fd(evts, &fdinfo); + printf("--- UDP server is listening\n"); +} + +void spawn_tor_server(struct evt_core_ctx* evts) { + uint16_t ports[] = {7500}; + char buffer[1024]; + int tcp_serv_sock, err; + + sprintf(buffer, "%d", ports[0]); + tcp_serv_sock = create_tcp_server ("0.0.0.0", buffer); + err = listen(tcp_serv_sock, SOMAXCONN); + + struct evt_core_cat cat = {0}; + struct evt_core_fdinfo fdinfo = {0}; + fdinfo.cat = &cat; + fdinfo.url = buffer; + + fdinfo.fd = tcp_serv_sock; + sprintf(buffer, "tcp:co:127.0.0.1:%d", ports[0]); + fdinfo.cat->name = "tcp-co"; + evt_core_add_fd(evts, &fdinfo); + printf("--- TCP server is listening\n"); } int main(int argc, char** argv) { @@ -230,7 +339,7 @@ int main(int argc, char** argv) { struct evt_core_ctx evts = {0}; // 1. Parse parameters - while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:")) != -1) { + while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:lr")) != -1) { switch(opt) { case 'v': mctx.verbose++; @@ -241,6 +350,9 @@ int main(int argc, char** argv) { case 'p': // port mctx.port = optarg; break; + case 'l': + mctx.mc.is_server = 1; + break; case 't': // transport mctx.transport = optarg; break; @@ -250,6 +362,8 @@ int main(int argc, char** argv) { case 's': // size - payload in bytes mctx.mc.payload_size = atoi(optarg); break; + case 'r': + mctx.mc.is_rtt = 1; case 'i': // interval - every ms mctx.mc.interval = atoi(optarg); break; @@ -260,21 +374,27 @@ int main(int argc, char** argv) { // 2. Check and fix parameters measure_prepare (&mctx.mc); + mctx.addrlen = sizeof(mctx.addr); if (mctx.transport == NULL) mctx.transport = "udp"; - if (mctx.host == NULL || mctx.port == NULL) goto usage; - printf("[measlat_conf] host=%s, port=%s, transport=%s, count=%ld, size=%ld, interval=%ld\n", - mctx.host, mctx.port, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval); + if (mctx.host == NULL) mctx.host = "127.0.0.1"; + if (mctx.port == NULL) mctx.port = strcmp(mctx.transport, "udp") == 0 ? "9000" : "7500"; + printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld\n", + mctx.host, mctx.port, mctx.mc.is_server, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval); // 3. Bind events register_categories(&evts, &mctx); - if (strcmp(mctx.transport, "udp") == 0) spawn_udp_socket(&evts); - else if (strcmp(mctx.transport, "tor") == 0) spawn_tor_socket(&evts); + + if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) { spawn_udp_server (&evts); mctx.is_from_needed = 1; } + else if (mctx.mc.is_server && strcmp(mctx.transport, "tor") == 0) spawn_tor_server(&evts); + else if (strcmp(mctx.transport, "udp") == 0) spawn_udp_client(&evts); + else if (strcmp(mctx.transport, "tor") == 0) spawn_tor_client(&evts); + else exit(EXIT_FAILURE); // 4. Run main loop evt_core_loop(&evts); return 0; usage: - fprintf(stderr, "Usage: %s -h -p [-t ] [-c ] [-i ] [-s ]\n", argv[0]); + fprintf(stderr, "Usage: %s -h -p [-l] [-t ] [-c ] [-i ] [-s ]\n", argv[0]); exit(EXIT_FAILURE); } diff --git a/src/measure.c b/src/measure.c index 060f58d..e7d1fb2 100644 --- a/src/measure.c +++ b/src/measure.c @@ -1,4 +1,5 @@ #include "measure.h" +#define ONE_SEC 1000000000L void measure_parse(int size, struct measure_conf* mc) { struct timespec curr; @@ -8,7 +9,7 @@ void measure_parse(int size, struct measure_conf* mc) { exit(EXIT_FAILURE); } struct measure_packet* head = (struct measure_packet*) mc->payload; - if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ + if (clock_gettime(CLOCK_REALTIME, &curr) == -1){ perror("clock_gettime error"); exit(EXIT_FAILURE); } @@ -19,7 +20,7 @@ void measure_parse(int size, struct measure_conf* mc) { micro_sec = elapsed_micros (&head->emit_time, &curr); printf("[%s] Packet %llu latency %luµs\n", ctime_no_newline, (unsigned long long)head->counter, micro_sec); - if (!mc->inhibit_exit && head->counter >= mc->max_measure) { + if (!mc->is_server && head->counter >= mc->max_measure) { printf("Measurement done\n"); exit(EXIT_SUCCESS); } @@ -44,3 +45,49 @@ void measure_prepare(struct measure_conf* mc) { cursor_msg = (cursor_msg + 1) % msg_len; } } +struct measure_packet* measure_generate(struct measure_conf* mc) { + struct measure_packet* head = (struct measure_packet*)mc->payload; + mc->counter++; + + head->counter = mc->counter; + head->is_echo = mc->is_rtt && !mc->is_server; + if (clock_gettime(CLOCK_REALTIME, &head->emit_time) == -1) { + perror("clock_gettime error"); + exit(EXIT_FAILURE); + } + return head; +} + +uint8_t timespec_gt(struct timespec *t1, struct timespec *t2) { + return t1->tv_sec > t2->tv_sec || (t1->tv_sec == t2->tv_sec && t1->tv_nsec > t2->tv_nsec); +} + +void measure_next_tick(struct measure_conf *mc, struct timespec *next) { + struct measure_packet *head = (struct measure_packet*) mc->payload; + struct timespec now, *sent_at = &head->emit_time; + mc->counter = head->counter; + + if (clock_gettime(CLOCK_REALTIME, &now) == -1) { + perror("clock_gettime error"); + exit(EXIT_FAILURE); + } + memcpy(next, sent_at, sizeof(struct timespec)); + + while(!timespec_gt (next, &now)) { + next->tv_nsec += mc->interval * 1000000L; + if (next->tv_nsec > ONE_SEC) { + next->tv_sec += next->tv_nsec / ONE_SEC; + next->tv_nsec = next->tv_nsec % ONE_SEC; + } + mc->counter++; + } + mc->counter--; + printf("interval: %ld\n", mc->interval); + printf("sent_at: sec=%ld nsec=%ld \n", (uint64_t) sent_at->tv_sec, (uint64_t) sent_at->tv_nsec); + printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec); + printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec); +} + +uint8_t measure_need_reply(struct measure_conf* mc) { + return mc->is_server && ((struct measure_packet*)mc->payload)->is_echo; +}; diff --git a/src/measure.h b/src/measure.h index 51ff873..72a53f8 100644 --- a/src/measure.h +++ b/src/measure.h @@ -1,3 +1,4 @@ +#pragma once #include #include #include @@ -11,13 +12,17 @@ struct measure_conf { uint64_t interval; char* payload; uint64_t counter; - uint8_t inhibit_exit; + uint8_t is_server, is_rtt; }; struct measure_packet { uint64_t counter; + uint8_t is_echo; struct timespec emit_time; }; void measure_parse(int size, struct measure_conf* mc); void measure_prepare(struct measure_conf* mc); +struct measure_packet* measure_generate(struct measure_conf* mc); +uint8_t measure_need_reply(struct measure_conf* mc); +void measure_next_tick(struct measure_conf *mc, struct timespec *next); diff --git a/src/tor_echo.c b/src/tor_echo.c index 2572f96..7c33600 100644 --- a/src/tor_echo.c +++ b/src/tor_echo.c @@ -100,7 +100,7 @@ int main(int argc, char** argv) { enum TOR_ONION_FLAGS tof = TOR_ONION_FLAG_NONE; char url[1024]; struct torecho_ctx tctx = {0}; - tctx.mc.inhibit_exit = 1; + tctx.mc.is_server = 1; tctx.mc.payload_size = 1500; while ((opt = getopt(argc, argv, "ns:m")) != -1) { @@ -130,7 +130,7 @@ int main(int argc, char** argv) { .socklist = NULL }; struct evt_core_cat tcp_all = { - .app_ctx = &tctx, + .app_ctx = &tctx , .free_app_ctx = NULL, .cb = te_on_tcp, .err_cb = NULL, diff --git a/src/udp_echo.c b/src/udp_echo.c index 9294f41..92f3e34 100644 --- a/src/udp_echo.c +++ b/src/udp_echo.c @@ -45,7 +45,7 @@ int main(int argc, char** argv) { int opt, udp_sock, verbose = 0; char *port = NULL, *bindhost = NULL; struct evt_core_ctx evts = {0}; - uctx.mc.inhibit_exit = 1; + uctx.mc.is_server = 1; uctx.mc.payload_size = 1500; // 1. Parse parameters