diff --git a/src/meas_lat.c b/src/meas_lat.c index 158ed30..2824b4e 100644 --- a/src/meas_lat.c +++ b/src/meas_lat.c @@ -19,43 +19,41 @@ struct measlat_ctx { socklen_t addrlen; int verbose, connectionless, tor_flags; char *host, *port, *transport; + enum { MEASLAT_CLIENT = 0, MEASLAT_SERVER = 1 } role; + struct measure_params mp; }; -struct timer_measure { - int fd, cntr; -}; - -void free_timer_measure(void* obj) { +void free_ms(void* obj) { + struct measure_state* ms = obj; + free(ms->mp_in); + free(ms->mp_out); free(obj); } -struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct evt_core_fdinfo *tgt, struct timespec* next_tick) { +struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct measlat_ctx* mctx, struct measure_state* ms, struct timespec* next_tick) { struct timespec now; struct itimerspec timer_config; char url[1024]; struct evt_core_cat cat = {0}; struct evt_core_fdinfo fdinfo = {0}; - struct timer_measure* tmeas = malloc(sizeof(struct timer_measure)); - if (tmeas == NULL) { + struct measure_state* msheap = malloc(sizeof(struct measure_state)); + if (ms == NULL) { perror("unable to malloc struct timer_measure"); exit(EXIT_FAILURE); } - tmeas->fd = tgt->fd; - tmeas->cntr = 0; + memcpy(msheap, ms, sizeof(struct measure_state)); fdinfo.cat = &cat; fdinfo.url = url; - fdinfo.other = tmeas; - fdinfo.free_other = free_timer_measure; - - struct measlat_ctx* mctx = tgt->cat->app_ctx; + fdinfo.other = msheap; + fdinfo.free_other = free_ms; if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) { perror("clock_gettime"); exit(EXIT_FAILURE); } - uint64_t micro_sec = mctx->mc.interval; + uint64_t micro_sec = mctx->mp.interval; timer_config.it_value.tv_sec = next_tick == NULL ? now.tv_sec + 1 : next_tick->tv_sec; timer_config.it_value.tv_nsec = next_tick == NULL ? now.tv_nsec : next_tick->tv_nsec; timer_config.it_interval.tv_sec = micro_sec / 1000; @@ -73,7 +71,7 @@ struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct evt_cor exit(EXIT_FAILURE); } fdinfo.cat->name = "timer"; - sprintf(fdinfo.url, "timer:%d", tgt->fd); + sprintf(fdinfo.url, "timer:%d", msheap->fd); struct evt_core_fdinfo *new_fdinfo = evt_core_add_fd (evts, &fdinfo); printf("--- Timer registered\n"); return new_fdinfo; @@ -89,44 +87,75 @@ int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdin return 1; } +void measlat_stop( + struct evt_core_ctx* ctx, + struct measlat_ctx* mctx, + struct measure_state* ms, + struct evt_core_fdinfo* net, + struct evt_core_fdinfo* timer) { + if (ms->mp_in->counter < mctx->mp.max_measure) return; + if (ms->mp_out->counter < mctx->mp.max_measure) return; + + printf("All measurements done\n"); + evt_core_rm_fd(ctx, timer->fd); + if (mctx->role == MEASLAT_CLIENT) { + evt_core_rm_fd(ctx, net->fd); + exit(EXIT_SUCCESS); + } +} + int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { - struct timer_measure* tmeas = fdinfo->other; struct measlat_ctx* mctx = fdinfo->cat->app_ctx; + struct measure_state msbuf = {0}; + struct measure_state* ms; ssize_t nread; char url[255]; + // 1. Get our measurement state sprintf(url, "timer:%d", fdinfo->fd); struct evt_core_fdinfo* assoc_timer = evt_core_get_from_url (ctx, url); - if (assoc_timer == NULL) { - // Start sending if timer does not exist yet - struct timespec next_tick = {0}; - measure_next_tick(&mctx->mc, &next_tick); - assoc_timer = register_timer (ctx, fdinfo, &next_tick); + if (assoc_timer) { + // Measurement state exists + ms = assoc_timer->other; + } else { + // Does not exist yet, we create it. + ms = &msbuf; + ms->fd = fdinfo->fd; + measure_state_init (&mctx->mp, ms); } - if (mctx->mc.read_size >= mctx->mc.payload_size || strcmp(mctx->transport, "udp") == 0) mctx->mc.read_size = 0; + // 2. Read data in our measurement object + if (ms->mp_nin >= mctx->mp.payload_size || strcmp(mctx->transport, "udp") == 0) ms->mp_nin = 0; nread = mctx->connectionless ? - recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen) : - recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0); + recvfrom(fdinfo->fd, ms->mp_in, mctx->mp.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen) : + recv(fdinfo->fd, ms->mp_in + ms->mp_nin, mctx->mp.payload_size - ms->mp_nin, 0); - if (nread > 0) mctx->mc.read_size += nread; + if (nread > 0) ms->mp_nin += nread; if (nread == -1 && errno == EAGAIN) { return 1; } - if (strcmp("udp", mctx->transport) != 0 && mctx->mc.read_size < mctx->mc.payload_size) { + if (strcmp("udp", mctx->transport) != 0 && ms->mp_nin < mctx->mp.payload_size) { + printf("Packet has been fragmented\n"); return 0; } - // First we parse the packet - measure_parse (mctx->mc.read_size, &mctx->mc); + // 3. Process data in our measurement object + measure_parse (&mctx->mp, ms); - struct measure_packet* head = (struct measure_packet*) mctx->mc.payload_rcv; - if (head->counter >= mctx->mc.max_measure) { - printf("All measurements received\n"); + // 4. Persist our measurement object if needed + // It includes starting a timer. + if (ms == &msbuf) { + struct timespec next_tick = {0}; + measure_next_tick(&mctx->mp, ms, &next_tick); + assoc_timer = register_timer (ctx, mctx, ms, &next_tick); } + // 5. Check if our measurements are done + if (ms->mp_in->counter >= mctx->mp.max_measure && ms->mp_out->counter >= mctx->mp.max_measure) + measlat_stop(ctx, mctx, ms, fdinfo, assoc_timer); + return 0; } @@ -165,6 +194,7 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { ssize_t s; uint64_t ticks = 0; struct measlat_ctx* mctx = fdinfo->cat->app_ctx; + struct measure_state* ms = fdinfo->other; s = read(fdinfo->fd, &ticks, sizeof(uint64_t)); if (s == -1 && errno == EAGAIN) return 1; @@ -177,7 +207,7 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { fprintf(stderr, "Has ticked %lu times, expected 1 time. This is a bug\n", ticks); } - struct measure_packet* head = measure_generate(&mctx->mc); + struct measure_packet* head = measure_generate(&mctx->mp, ms); //printf("send(id=%ld,is_echo=%d)\n", head->counter, head->is_echo); int target_fd; @@ -190,15 +220,15 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { } s = mctx->connectionless ? - sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) : - send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0); + sendto(tgtinfo->fd, head, mctx->mp.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) : + send(tgtinfo->fd, head, mctx->mp.payload_size, 0); - if (s < 0 || s != mctx->mc.payload_size) { + if (s < 0 || s != mctx->mp.payload_size) { perror("Send error"); exit(EXIT_FAILURE); } - if (mctx->mc.counter >= mctx->mc.max_measure) { + if (head->counter >= mctx->mp.max_measure) { printf("All measurements sent for %d\n", tgtinfo->fd); evt_core_rm_fd(ctx, fdinfo->fd); return 1; @@ -229,7 +259,9 @@ int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (ctx, &fdinfo_n); printf("--- Tor socket registered\n"); - register_timer(ctx, reg_fdinfo, NULL); + struct measure_state ms = {0}; + measure_state_init (&mctx->mp, &ms); + register_timer (ctx, mctx, &ms, NULL); return 1; } @@ -314,13 +346,13 @@ void spawn_udp_client(struct evt_core_ctx* evts) { fdinfo.fd = udp_sock; fdinfo.cat->name = "udp-read"; - fdinfo.other = &mctx->mc; - fdinfo.free_other = NULL; sprintf(fdinfo.url, "udp:read:%s:%s", mctx->host, mctx->port); struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo); printf("--- UDP client registered\n"); - register_timer(evts, reg_fdinfo, NULL); + struct measure_state ms = {0}; + measure_state_init (&mctx->mp, &ms); + register_timer (evts, mctx, &ms, NULL); } void spawn_udp_server(struct evt_core_ctx* evts) { @@ -362,13 +394,13 @@ void spawn_tcp_client(struct evt_core_ctx* evts) { fdinfo.fd = tcp_sock; fdinfo.cat->name = "tcp-read"; - fdinfo.other = &mctx->mc; - fdinfo.free_other = NULL; sprintf(fdinfo.url, "tcp:read:%s:%s", mctx->host, mctx->port); struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo); printf("--- TCP client registered\n"); - register_timer(evts, reg_fdinfo, NULL); + struct measure_state ms = {0}; + measure_state_init (&mctx->mp, &ms); + register_timer (evts, mctx, &ms, NULL); } void spawn_tcp_server(struct evt_core_ctx* evts, uint16_t *ports) { @@ -409,6 +441,15 @@ void measlat_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl, printf("--- Onion services created\n"); } +void spawn_onion_server(struct evt_core_ctx* evts, uint16_t *ports, int ports_count, struct tor_os_str* tos, struct tor_ctl* tctl, struct measlat_ctx* mctx) { + spawn_tcp_server(evts, ports); + measlat_create_onion_services (tos, tctl, ports, ports_count, mctx->tor_flags); +} + +int streq(char* s1, char* s2) { + return strcmp(s1, s2) == 0; +} + int main(int argc, char** argv) { setvbuf(stdout, NULL, _IONBF, 0); printf("~ measlat ~\n"); @@ -416,6 +457,10 @@ int main(int argc, char** argv) { int opt; struct measlat_ctx mctx = {0}; struct evt_core_ctx evts = {0}; + struct tor_os_str tos = {0}; + struct tor_ctl tctl = {0}; + uint16_t ports[] = {7500}; + int ports_count = sizeof(ports) / sizeof(ports[0]); // 1. Parse parameters while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:ln")) != -1) { @@ -430,22 +475,22 @@ int main(int argc, char** argv) { mctx.port = optarg; break; case 'l': - mctx.mc.is_server = 1; + mctx.role = MEASLAT_SERVER; break; case 't': // transport mctx.transport = optarg; break; case 'c': // count - mctx.mc.max_measure = atoi(optarg); + mctx.mp.max_measure = atoi(optarg); break; case 's': // size - payload in bytes - mctx.mc.payload_size = atoi(optarg); + mctx.mp.payload_size = atoi(optarg); break; case 'n': mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS; break; case 'i': // interval - every ms - mctx.mc.interval = atoi(optarg); + mctx.mp.interval = atoi(optarg); break; default: goto usage; @@ -453,35 +498,33 @@ int main(int argc, char** argv) { } // 2. Check and fix parameters - measure_prepare (&mctx.mc); + measure_params_init (&mctx.mp); mctx.addrlen = sizeof(mctx.addr); if (mctx.transport == NULL) mctx.transport = "udp"; - if (mctx.host == NULL) mctx.host = "127.0.0.1"; - if (mctx.port == NULL) mctx.port = strcmp(mctx.transport, "udp") == 0 ? "9000" : "7500"; if (strcmp(mctx.transport, "udp") == 0) mctx.connectionless = 1; - printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld, is_rtt=%d\n", - mctx.host, mctx.port, mctx.mc.is_server, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval, mctx.mc.is_rtt); + if (mctx.host == NULL) mctx.host = "127.0.0.1"; + if (mctx.port == NULL) mctx.port = mctx.connectionless ? "9000" : "7500"; - // 3. Bind events + printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld\n", + mctx.host, mctx.port, mctx.role, mctx.transport, mctx.mp.max_measure, mctx.mp.payload_size, mctx.mp.interval); + + // 3. Create event structure register_categories(&evts, &mctx); - struct tor_os_str tos = {0}; - struct tor_ctl tctl = {0}; - uint16_t ports[] = {7500}; - int ports_count = sizeof(ports) / sizeof(ports[0]); - - if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) spawn_udp_server (&evts); - else if (mctx.mc.is_server && strcmp(mctx.transport, "tor") == 0) { - spawn_tcp_server(&evts, ports); - measlat_create_onion_services (&tos, &tctl, ports, ports_count, mctx.tor_flags); + // 4. Register services + if (mctx.role == MEASLAT_SERVER) { + if (streq(mctx.transport, "udp")) spawn_udp_server (&evts); + else if (streq(mctx.transport, "tcp")) spawn_tcp_server(&evts, ports); + else if (streq(mctx.transport, "tor")) spawn_onion_server (&evts, ports, ports_count, &tos, &tctl, &mctx); + } + else if (mctx.role == MEASLAT_CLIENT) { + if (streq(mctx.transport, "udp")) spawn_udp_client(&evts); + else if (streq(mctx.transport, "tor")) spawn_tor_client(&evts); + else if (streq(mctx.transport, "tcp")) spawn_tcp_client(&evts); } - else if (mctx.mc.is_server && strcmp(mctx.transport, "tcp") == 0) spawn_tcp_server(&evts, ports); - else if (strcmp(mctx.transport, "udp") == 0) spawn_udp_client(&evts); - else if (strcmp(mctx.transport, "tor") == 0) spawn_tor_client(&evts); - else if (strcmp(mctx.transport, "tcp") == 0) spawn_tcp_client(&evts); else exit(EXIT_FAILURE); - // 4. Run main loop + // 5. Run main loop evt_core_loop(&evts); return 0; diff --git a/src/measure.c b/src/measure.c index a24b429..63d7d80 100644 --- a/src/measure.c +++ b/src/measure.c @@ -1,92 +1,92 @@ #include "measure.h" -void measure_parse(int size, struct measure_conf* mc) { + +void measure_params_init(struct measure_params* mp) { + if (mp->interval <= 0) mp->interval = 1000; + if (mp->max_measure <= 0) mp->max_measure = 3; + if (mp->payload_size < sizeof(struct measure_packet)) mp->payload_size = sizeof(struct measure_packet); +} + +void measure_state_init(struct measure_params* mp, struct measure_state* ms) { + if ((ms->mp_out = malloc(sizeof(char) * mp->payload_size)) == NULL) { + perror("payload malloc failed"); + exit(EXIT_FAILURE); + } + memset(ms->mp_out, 0, mp->payload_size); + + if ((ms->mp_in = malloc(sizeof(char) * mp->payload_size)) == NULL) { + perror("payload malloc failed"); + exit(EXIT_FAILURE); + } + memset(ms->mp_in, 0, mp->payload_size); + + char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire."; + size_t msg_len = strlen(my_msg); + size_t cursor_msg = 0; + char* pl = (char*) ms->mp_out; + for (size_t i = sizeof(struct measure_packet); i < mp->payload_size; i++) { + pl[i] = my_msg[cursor_msg]; + cursor_msg = (cursor_msg + 1) % msg_len; + } +} + +void measure_parse(struct measure_params* mp, struct measure_state* ms) { struct timespec curr; uint64_t micro_sec; - if (size != mc->payload_size) { - fprintf(stderr, "read size: %d, expected: %ld\n", size, mc->payload_size); + if (ms->mp_nin != mp->payload_size) { + fprintf(stderr, "read size: %ld, expected: %ld\n", ms->mp_nin, mp->payload_size); int i; fprintf(stderr, "received buffer:\n"); - for (i = 0; i < mc->payload_size; i++) { + for (i = 0; i < mp->payload_size; i++) { if (i > 0) fprintf(stderr, ":"); - fprintf(stderr, "%02x", (unsigned char) mc->payload_rcv[i]); + fprintf(stderr, "%02x", (unsigned char) ((char*)ms->mp_in)[i]); } fprintf(stderr, "\n"); fprintf(stderr, "local buffer (reference):\n"); - for (i = 0; i < mc->payload_size; i++) { + for (i = 0; i < mp->payload_size; i++) { if (i > 0) fprintf(stderr, ":"); - fprintf(stderr, "%02X", (unsigned char) mc->payload[i]); + fprintf(stderr, "%02X", (unsigned char) ((char*)ms->mp_out)[i]); } fprintf(stderr, "\n"); perror("read error, payload has wrong size"); } - struct measure_packet* head = (struct measure_packet*) mc->payload_rcv; if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ perror("clock_gettime error"); exit(EXIT_FAILURE); } - micro_sec = elapsed_micros (&head->emit_time, &curr); - uint8_t is_slow = head->flag >> 7; - uint8_t is_vanilla = (head->flag & 0x40) >> 6; - uint8_t link_id = head->flag & 0x3f; + micro_sec = elapsed_micros (&ms->mp_in->emit_time, &curr); + uint8_t is_slow = ms->mp_in->flag >> 7; + uint8_t is_vanilla = (ms->mp_in->flag & 0x40) >> 6; + uint8_t link_id = ms->mp_in->flag & 0x3f; printf( - "[%s] src=%s, id=%llu, owd=%luµs, flag=%d, link=%d, vanilla=%d\n", + "[%s] src=%d, id=%llu, owd=%luµs, flag=%d, link=%d, vanilla=%d\n", current_human_datetime(), - "test", - (unsigned long long)head->counter, + ms->fd, + (unsigned long long)ms->mp_in->counter, micro_sec, is_slow, link_id, is_vanilla); } -void measure_prepare(struct measure_conf* mc) { - if (mc->interval <= 0) mc->interval = 1000; - if (mc->max_measure <= 0) mc->max_measure = 3; - if (mc->payload_size < sizeof(struct measure_packet)) mc->payload_size = sizeof(struct measure_packet); - - if ((mc->payload = malloc(sizeof(char) * mc->payload_size)) == NULL) { - perror("payload malloc failed"); - exit(EXIT_FAILURE); - } - memset(mc->payload, 0, mc->payload_size); - - if ((mc->payload_rcv = malloc(sizeof(char) * mc->payload_size)) == NULL) { - perror("payload malloc failed"); - exit(EXIT_FAILURE); - } - memset(mc->payload_rcv, 0, mc->payload_size); - - char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire."; - size_t msg_len = strlen(my_msg); - size_t cursor_msg = 0; - for (size_t i = sizeof(struct measure_packet); i < mc->payload_size; i++) { - mc->payload[i] = my_msg[cursor_msg]; - cursor_msg = (cursor_msg + 1) % msg_len; - } -} -struct measure_packet* measure_generate(struct measure_conf* mc) { - struct measure_packet* head = (struct measure_packet*)mc->payload; - mc->counter++; - - head->counter = mc->counter; - head->flag = 0; - if (clock_gettime(CLOCK_MONOTONIC, &head->emit_time) == -1) { +struct measure_packet* measure_generate(struct measure_params* mp, struct measure_state* ms) { + ms->mp_out->counter++; + ms->mp_out->flag = 0; + if (clock_gettime(CLOCK_MONOTONIC, &ms->mp_out->emit_time) == -1) { perror("clock_gettime error"); exit(EXIT_FAILURE); } - return head; + return ms->mp_out; } -void measure_next_tick(struct measure_conf *mc, struct timespec *next) { - struct measure_packet *head = (struct measure_packet*) mc->payload_rcv; - struct timespec now, *sent_at = &head->emit_time; - mc->counter = head->counter; +void measure_next_tick(struct measure_params *mp, struct measure_state* ms, struct timespec *next) { + //struct measure_packet *head = (struct measure_packet*) ms->payload_rcv; + struct timespec now = {0}, *sent_at = &ms->mp_in->emit_time; if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) { perror("clock_gettime error"); @@ -95,15 +95,15 @@ void measure_next_tick(struct measure_conf *mc, struct timespec *next) { memcpy(next, sent_at, sizeof(struct timespec)); while(!timespec_gt (next, &now)) { - next->tv_nsec += mc->interval * 1000000L; + next->tv_nsec += mp->interval * 1000000L; if (next->tv_nsec > ONE_SEC) { next->tv_sec += next->tv_nsec / ONE_SEC; next->tv_nsec = next->tv_nsec % ONE_SEC; } - mc->counter++; + ms->mp_out->counter++; } - mc->counter--; - printf("interval: %ld\n", mc->interval); + ms->mp_out->counter--; + printf("interval: %ld\n", mp->interval); printf("sent_at: sec=%ld nsec=%ld \n", (uint64_t) sent_at->tv_sec, (uint64_t) sent_at->tv_nsec); printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec); printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec); diff --git a/src/measure.h b/src/measure.h index d0d1cf6..c56cfec 100644 --- a/src/measure.h +++ b/src/measure.h @@ -6,24 +6,30 @@ #include #include "utils.h" -struct measure_conf { +struct measure_params { uint64_t max_measure; uint64_t payload_size; - uint64_t read_size; uint64_t interval; - char* payload; - char* payload_rcv; - uint64_t counter; uint8_t is_server; }; +struct measure_state { + struct measure_packet* mp_out; + struct measure_packet* mp_in; + ssize_t mp_nin; + int fd; +}; + struct measure_packet { uint64_t counter; uint8_t flag; struct timespec emit_time; }; -void measure_parse(int size, struct measure_conf* mc); -void measure_prepare(struct measure_conf* mc); -struct measure_packet* measure_generate(struct measure_conf* mc); -void measure_next_tick(struct measure_conf *mc, struct timespec *next); +void measure_params_init(struct measure_params* mp); +void measure_state_init(struct measure_params* mp, struct measure_state* ms); + +void measure_parse(struct measure_params* mp, struct measure_state* ms); +struct measure_packet* measure_generate(struct measure_params* mp, struct measure_state* ms); + +void measure_next_tick(struct measure_params *mp, struct measure_state* ms, struct timespec *next); diff --git a/src/tor_echo.c b/src/tor_echo.c index 98dd874..dc59c30 100644 --- a/src/tor_echo.c +++ b/src/tor_echo.c @@ -10,7 +10,8 @@ struct torecho_ctx { uint8_t is_measlat; - struct measure_conf mc; + struct measure_params mp; + struct measure_state ms; }; void te_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl, uint16_t* ports, int ports_count, enum TOR_ONION_FLAGS tof) { @@ -66,18 +67,18 @@ int te_on_tcp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { socklen_t addrlen = sizeof(addr); struct torecho_ctx *tctx = fdinfo->cat->app_ctx; - nread = recv(fdinfo->fd, tctx->mc.payload, tctx->mc.payload_size, 0); + nread = recv(fdinfo->fd, tctx->ms.mp_in, tctx->mp.payload_size, 0); if (nread == -1 && errno == EAGAIN) return 1; // Read done if (nread == 0) { fprintf(stderr, "WARN! Read 0 bytes.\n"); return 1; } - if (nread < 0 || nread > tctx->mc.payload_size) { - fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, tctx->mc.payload_size); + if (nread < 0 || nread > tctx->mp.payload_size) { + fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, tctx->mp.payload_size); perror("read errno"); exit(EXIT_FAILURE); } - if (tctx->is_measlat) measure_parse (nread, &tctx->mc); + if (tctx->is_measlat) measure_parse (&tctx->mp, &tctx->ms); - nwritten = send(fdinfo->fd, tctx->mc.payload, nread, 0); + nwritten = send(fdinfo->fd, tctx->ms.mp_in, nread, 0); // @FIXME don't support EAGAIN on write. Could be intended, you don't think so? if (nwritten != nread) { fprintf(stderr, "Didn't write the same number of bytes as read - not supported. nread=%ld, nwritten=%ld\n", nread, nwritten); @@ -100,8 +101,7 @@ int main(int argc, char** argv) { enum TOR_ONION_FLAGS tof = TOR_ONION_FLAG_NONE; char url[1024]; struct torecho_ctx tctx = {0}; - tctx.mc.is_server = 1; - tctx.mc.payload_size = 1500; + tctx.mp.payload_size = 1500; while ((opt = getopt(argc, argv, "ns:m")) != -1) { switch(opt) { @@ -112,7 +112,7 @@ int main(int argc, char** argv) { tctx.is_measlat = 1; break; case 's': - tctx.mc.payload_size = atoi(optarg); + tctx.mp.payload_size = atoi(optarg); break; default: break; @@ -138,7 +138,8 @@ int main(int argc, char** argv) { .flags = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP, .socklist = NULL }; - measure_prepare (&tctx.mc); + measure_params_init (&tctx.mp); + measure_state_init (&tctx.mp, &tctx.ms); evt_core_init(&evts, 0); evt_core_add_cat(&evts, &tcp_co); diff --git a/src/udp_echo.c b/src/udp_echo.c index 92f3e34..3bcee11 100644 --- a/src/udp_echo.c +++ b/src/udp_echo.c @@ -7,7 +7,8 @@ #include "measure.h" struct udpecho_ctx { - struct measure_conf mc; + struct measure_params mp; + struct measure_state ms; uint8_t is_measlat; }; @@ -17,17 +18,17 @@ int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { socklen_t addrlen = sizeof(addr); struct udpecho_ctx *uctx = fdinfo->cat->app_ctx; - nread = recvfrom(fdinfo->fd, uctx->mc.payload, uctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&addr, &addrlen); + nread = recvfrom(fdinfo->fd, uctx->ms.mp_in, uctx->mp.payload_size, MSG_TRUNC, (struct sockaddr*)&addr, &addrlen); if (nread == -1 && errno == EAGAIN) return 1; // Read done - if (nread <= 0 || nread > uctx->mc.payload_size) { - fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, uctx->mc.payload_size); + if (nread <= 0 || nread > uctx->mp.payload_size) { + fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, uctx->mp.payload_size); perror("read errno"); exit(EXIT_FAILURE); } - if (uctx->is_measlat) measure_parse (nread, &uctx->mc); + if (uctx->is_measlat) measure_parse (&uctx->mp, &uctx->ms); - nwritten = sendto(fdinfo->fd, uctx->mc.payload, nread, 0, (struct sockaddr*)&addr, addrlen); + nwritten = sendto(fdinfo->fd, uctx->ms.mp_in, nread, 0, (struct sockaddr*)&addr, addrlen); // @FIXME don't support EAGAIN on write. Could be intended, you don't think so? if (nwritten != nread) { fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", nread, nwritten); @@ -45,8 +46,7 @@ int main(int argc, char** argv) { int opt, udp_sock, verbose = 0; char *port = NULL, *bindhost = NULL; struct evt_core_ctx evts = {0}; - uctx.mc.is_server = 1; - uctx.mc.payload_size = 1500; + uctx.mp.payload_size = 1500; // 1. Parse parameters while ((opt = getopt(argc, argv, "b:p:vms:")) != -1) { @@ -64,14 +64,15 @@ int main(int argc, char** argv) { uctx.is_measlat = 1; break; case 's': - uctx.mc.payload_size = atoi(optarg); + uctx.mp.payload_size = atoi(optarg); break; default: goto usage; } } if (bindhost == NULL) bindhost = "127.0.0.1"; - measure_prepare (&uctx.mc); + measure_params_init (&uctx.mp); + measure_state_init (&uctx.mp, &uctx.ms); // 2. Register category struct evt_core_cat udp_read = {