diff --git a/src/meas_lat.c b/src/meas_lat.c index 4d2b580..158ed30 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -4,6 +4,7 @@ #include #include #include +#include #include "evt_core.h" #include "net_tools.h" #include "socks5.h" @@ -14,30 +15,41 @@ #include "tor_ctl.h" struct measlat_ctx { - struct measure_conf mc; struct sockaddr_in addr; socklen_t addrlen; - int verbose, is_timer_started, is_from_needed, tor_flags; + int verbose, connectionless, tor_flags; char *host, *port, *transport; }; -void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) { +struct timer_measure { + int fd, cntr; +}; + +void free_timer_measure(void* obj) { + free(obj); +} + +struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct evt_core_fdinfo *tgt, struct timespec* next_tick) { struct timespec now; struct itimerspec timer_config; char url[1024]; struct evt_core_cat cat = {0}; struct evt_core_fdinfo fdinfo = {0}; - fdinfo.cat = &cat; - fdinfo.url = url; - struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read"); - if (ucat == NULL) { - fprintf(stderr, "Category udp-read not found\n"); + struct timer_measure* tmeas = malloc(sizeof(struct timer_measure)); + if (tmeas == NULL) { + perror("unable to malloc struct timer_measure"); exit(EXIT_FAILURE); } - struct measlat_ctx* mctx = ucat->app_ctx; + tmeas->fd = tgt->fd; + tmeas->cntr = 0; - mctx->is_timer_started = 1; + fdinfo.cat = &cat; + fdinfo.url = url; + fdinfo.other = tmeas; + fdinfo.free_other = free_timer_measure; + + struct measlat_ctx* mctx = tgt->cat->app_ctx; if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) { perror("clock_gettime"); @@ -61,64 +73,58 @@ void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) { exit(EXIT_FAILURE); } fdinfo.cat->name = "timer"; - sprintf(fdinfo.url, "timer:%ld:%ld", mctx->mc.interval, mctx->mc.max_measure); - evt_core_add_fd (evts, &fdinfo); + sprintf(fdinfo.url, "timer:%d", tgt->fd); + struct evt_core_fdinfo *new_fdinfo = evt_core_add_fd (evts, &fdinfo); printf("--- Timer registered\n"); + return new_fdinfo; } int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + char url[255]; + + sprintf(url, "timer:%d", fdinfo->fd); + struct evt_core_fdinfo* assoc_timer = evt_core_get_from_url (ctx, url); + if (assoc_timer != NULL) evt_core_rm_fd (ctx, assoc_timer->fd); + return 1; } int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { + struct timer_measure* tmeas = fdinfo->other; struct measlat_ctx* mctx = fdinfo->cat->app_ctx; ssize_t nread; + char url[255]; + + sprintf(url, "timer:%d", fdinfo->fd); + struct evt_core_fdinfo* assoc_timer = evt_core_get_from_url (ctx, url); + if (assoc_timer == NULL) { + // Start sending if timer does not exist yet + struct timespec next_tick = {0}; + measure_next_tick(&mctx->mc, &next_tick); + assoc_timer = register_timer (ctx, fdinfo, &next_tick); + } if (mctx->mc.read_size >= mctx->mc.payload_size || strcmp(mctx->transport, "udp") == 0) mctx->mc.read_size = 0; - if (mctx->is_from_needed) { - nread = recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen); - } else { - nread = recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0); - } + nread = mctx->connectionless ? + recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen) : + recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0); + if (nread > 0) mctx->mc.read_size += nread; - if ((nread == -1 && errno == EAGAIN) || nread == 0) { + if (nread == -1 && errno == EAGAIN) { return 1; } if (strcmp("udp", mctx->transport) != 0 && mctx->mc.read_size < mctx->mc.payload_size) { return 0; } - // First we parse the packet and exit if needed + // First we parse the packet measure_parse (mctx->mc.read_size, &mctx->mc); - // Old behaviour where we work in a RTT way and send back the packet - if (measure_need_reply (&mctx->mc)) { - ssize_t nwritten; - if (mctx->is_from_needed) nwritten = sendto(fdinfo->fd, mctx->mc.payload, mctx->mc.read_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen); - else nwritten = send(fdinfo->fd, mctx->mc.payload, mctx->mc.read_size, 0); - - // @FIXME don't support EAGAIN on write. Could be intended, you don't think so? - if (nwritten != mctx->mc.read_size) { - fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", mctx->mc.read_size, nwritten); - perror("write errno"); - exit(EXIT_FAILURE); - } - return 0; - } - - // Used to start sending from the server in sync with client - if (!mctx->is_timer_started && mctx->mc.counter <= mctx->mc.max_measure) { - struct timespec next_tick = {0}; - measure_next_tick(&mctx->mc, &next_tick); - register_timer (ctx, &next_tick); - } - struct measure_packet* head = (struct measure_packet*) mctx->mc.payload_rcv; - if ((!mctx->mc.is_server || mctx->mc.is_rtt) && head->counter >= mctx->mc.max_measure) { - printf("Measurement done\n"); - exit(EXIT_SUCCESS); + if (head->counter >= mctx->mc.max_measure) { + printf("All measurements received\n"); } return 0; @@ -174,26 +180,30 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { struct measure_packet* head = measure_generate(&mctx->mc); //printf("send(id=%ld,is_echo=%d)\n", head->counter, head->is_echo); - struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read"); - if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read"); - if (!mctx->mc.is_server && tgtinfo == NULL) { - fprintf(stderr, "Unable to find a fdinfo in udp-read nor in tcp-read. Quitting...\n"); - exit(EXIT_FAILURE); - } else if (mctx->mc.is_server && (tgtinfo == NULL || mctx->mc.counter > mctx->mc.max_measure)) { - printf("No connection yet\n"); - struct evt_core_fdinfo* timer_fd = evt_core_get_first_from_cat (ctx, "timer"); - evt_core_rm_fd(ctx, timer_fd->fd); - mctx->is_timer_started = 0; + int target_fd; + sscanf(fdinfo->url, "timer:%d", &target_fd); + struct evt_core_fdinfo* tgtinfo = evt_core_get_from_fd(ctx, target_fd); + + if (tgtinfo == NULL) { + fprintf(stderr, "No fd found to send, continue\n"); return 1; } - if (mctx->is_from_needed) s = sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen); - else s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0); + + s = mctx->connectionless ? + sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) : + send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0); if (s < 0 || s != mctx->mc.payload_size) { perror("Send error"); exit(EXIT_FAILURE); } + if (mctx->mc.counter >= mctx->mc.max_measure) { + printf("All measurements sent for %d\n", tgtinfo->fd); + evt_core_rm_fd(ctx, fdinfo->fd); + return 1; + } + return 0; } @@ -216,10 +226,10 @@ int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo_n.cat->name = "tcp-read"; sprintf(fdinfo_n.url, "tcp:read:%s:%d", s5ctx->addr, s5ctx->port); - evt_core_add_fd (ctx, &fdinfo_n); + struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (ctx, &fdinfo_n); printf("--- Tor socket registered\n"); - register_timer(ctx, NULL); + register_timer(ctx, reg_fdinfo, NULL); return 1; } @@ -307,10 +317,10 @@ void spawn_udp_client(struct evt_core_ctx* evts) { fdinfo.other = &mctx->mc; fdinfo.free_other = NULL; sprintf(fdinfo.url, "udp:read:%s:%s", mctx->host, mctx->port); - evt_core_add_fd (evts, &fdinfo); + struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo); printf("--- UDP client registered\n"); - register_timer(evts, NULL); + register_timer(evts, reg_fdinfo, NULL); } void spawn_udp_server(struct evt_core_ctx* evts) { @@ -355,10 +365,10 @@ void spawn_tcp_client(struct evt_core_ctx* evts) { fdinfo.other = &mctx->mc; fdinfo.free_other = NULL; sprintf(fdinfo.url, "tcp:read:%s:%s", mctx->host, mctx->port); - evt_core_add_fd (evts, &fdinfo); + struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo); printf("--- TCP client registered\n"); - register_timer(evts, NULL); + register_timer(evts, reg_fdinfo, NULL); } void spawn_tcp_server(struct evt_core_ctx* evts, uint16_t *ports) { @@ -396,6 +406,7 @@ void measlat_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl, fprintf(stderr, "Unable to create Onion Services (error: %d)\n", err); exit(EXIT_FAILURE); } + printf("--- Onion services created\n"); } int main(int argc, char** argv) { @@ -407,7 +418,7 @@ int main(int argc, char** argv) { struct evt_core_ctx evts = {0}; // 1. Parse parameters - while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:lrn")) != -1) { + while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:ln")) != -1) { switch(opt) { case 'v': mctx.verbose++; @@ -430,9 +441,6 @@ int main(int argc, char** argv) { case 's': // size - payload in bytes mctx.mc.payload_size = atoi(optarg); break; - case 'r': - mctx.mc.is_rtt = 1; - break; case 'n': mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS; break; @@ -450,6 +458,7 @@ int main(int argc, char** argv) { if (mctx.transport == NULL) mctx.transport = "udp"; if (mctx.host == NULL) mctx.host = "127.0.0.1"; if (mctx.port == NULL) mctx.port = strcmp(mctx.transport, "udp") == 0 ? "9000" : "7500"; + if (strcmp(mctx.transport, "udp") == 0) mctx.connectionless = 1; printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld, is_rtt=%d\n", mctx.host, mctx.port, mctx.mc.is_server, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval, mctx.mc.is_rtt); @@ -461,14 +470,10 @@ int main(int argc, char** argv) { uint16_t ports[] = {7500}; int ports_count = sizeof(ports) / sizeof(ports[0]); - if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) { - spawn_udp_server (&evts); - mctx.is_from_needed = 1; - } + if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) spawn_udp_server (&evts); else if (mctx.mc.is_server && strcmp(mctx.transport, "tor") == 0) { spawn_tcp_server(&evts, ports); measlat_create_onion_services (&tos, &tctl, ports, ports_count, mctx.tor_flags); - printf("--- Onion services created\n"); } else if (mctx.mc.is_server && strcmp(mctx.transport, "tcp") == 0) spawn_tcp_server(&evts, ports); else if (strcmp(mctx.transport, "udp") == 0) spawn_udp_client(&evts); diff --git a/src/measure.c b/src/measure.c index b91cfa3..a24b429 100644 --- a/src/measure.c +++ b/src/measure.c @@ -23,7 +23,6 @@ void measure_parse(int size, struct measure_conf* mc) { perror("read error, payload has wrong size"); - //exit(EXIT_FAILURE); } struct measure_packet* head = (struct measure_packet*) mc->payload_rcv; if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ @@ -36,8 +35,9 @@ void measure_parse(int size, struct measure_conf* mc) { uint8_t is_vanilla = (head->flag & 0x40) >> 6; uint8_t link_id = head->flag & 0x3f; printf( - "[%s] Packet %llu latency %luµs with flag %d sent on link %d with vanilla %d\n", + "[%s] src=%s, id=%llu, owd=%luµs, flag=%d, link=%d, vanilla=%d\n", current_human_datetime(), + "test", (unsigned long long)head->counter, micro_sec, is_slow, @@ -47,7 +47,7 @@ void measure_parse(int size, struct measure_conf* mc) { void measure_prepare(struct measure_conf* mc) { if (mc->interval <= 0) mc->interval = 1000; - if (mc->max_measure <= 0) mc->max_measure = 1; + if (mc->max_measure <= 0) mc->max_measure = 3; if (mc->payload_size < sizeof(struct measure_packet)) mc->payload_size = sizeof(struct measure_packet); if ((mc->payload = malloc(sizeof(char) * mc->payload_size)) == NULL) { @@ -75,7 +75,6 @@ struct measure_packet* measure_generate(struct measure_conf* mc) { mc->counter++; head->counter = mc->counter; - head->is_echo = mc->is_rtt && !mc->is_server; head->flag = 0; if (clock_gettime(CLOCK_MONOTONIC, &head->emit_time) == -1) { perror("clock_gettime error"); @@ -109,7 +108,3 @@ void measure_next_tick(struct measure_conf *mc, struct timespec *next) { printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec); printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec); } - -uint8_t measure_need_reply(struct measure_conf* mc) { - return mc->is_server && ((struct measure_packet*)mc->payload)->is_echo; -}; diff --git a/src/measure.h b/src/measure.h index bfface4..d0d1cf6 100644 --- a/src/measure.h +++ b/src/measure.h @@ -14,12 +14,11 @@ struct measure_conf { char* payload; char* payload_rcv; uint64_t counter; - uint8_t is_server, is_rtt; + uint8_t is_server; }; struct measure_packet { uint64_t counter; - uint8_t is_echo; uint8_t flag; struct timespec emit_time; }; @@ -27,5 +26,4 @@ struct measure_packet { void measure_parse(int size, struct measure_conf* mc); void measure_prepare(struct measure_conf* mc); struct measure_packet* measure_generate(struct measure_conf* mc); -uint8_t measure_need_reply(struct measure_conf* mc); void measure_next_tick(struct measure_conf *mc, struct timespec *next);