WIP measlat impr

This commit is contained in:
Quentin 2021-01-08 18:30:22 +01:00
parent ec40ad0a99
commit 7769acb8b9
3 changed files with 80 additions and 82 deletions

View file

@ -4,6 +4,7 @@
#include <time.h>
#include <unistd.h>
#include <errno.h>
#include <arpa/inet.h>
#include "evt_core.h"
#include "net_tools.h"
#include "socks5.h"
@ -14,30 +15,41 @@
#include "tor_ctl.h"
struct measlat_ctx {
struct measure_conf mc;
struct sockaddr_in addr;
socklen_t addrlen;
int verbose, is_timer_started, is_from_needed, tor_flags;
int verbose, connectionless, tor_flags;
char *host, *port, *transport;
};
void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) {
struct timer_measure {
int fd, cntr;
};
void free_timer_measure(void* obj) {
free(obj);
}
struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct evt_core_fdinfo *tgt, struct timespec* next_tick) {
struct timespec now;
struct itimerspec timer_config;
char url[1024];
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo = {0};
fdinfo.cat = &cat;
fdinfo.url = url;
struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read");
if (ucat == NULL) {
fprintf(stderr, "Category udp-read not found\n");
struct timer_measure* tmeas = malloc(sizeof(struct timer_measure));
if (tmeas == NULL) {
perror("unable to malloc struct timer_measure");
exit(EXIT_FAILURE);
}
struct measlat_ctx* mctx = ucat->app_ctx;
tmeas->fd = tgt->fd;
tmeas->cntr = 0;
mctx->is_timer_started = 1;
fdinfo.cat = &cat;
fdinfo.url = url;
fdinfo.other = tmeas;
fdinfo.free_other = free_timer_measure;
struct measlat_ctx* mctx = tgt->cat->app_ctx;
if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
perror("clock_gettime");
@ -61,64 +73,58 @@ void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) {
exit(EXIT_FAILURE);
}
fdinfo.cat->name = "timer";
sprintf(fdinfo.url, "timer:%ld:%ld", mctx->mc.interval, mctx->mc.max_measure);
evt_core_add_fd (evts, &fdinfo);
sprintf(fdinfo.url, "timer:%d", tgt->fd);
struct evt_core_fdinfo *new_fdinfo = evt_core_add_fd (evts, &fdinfo);
printf("--- Timer registered\n");
return new_fdinfo;
}
int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
char url[255];
sprintf(url, "timer:%d", fdinfo->fd);
struct evt_core_fdinfo* assoc_timer = evt_core_get_from_url (ctx, url);
if (assoc_timer != NULL) evt_core_rm_fd (ctx, assoc_timer->fd);
return 1;
}
int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct timer_measure* tmeas = fdinfo->other;
struct measlat_ctx* mctx = fdinfo->cat->app_ctx;
ssize_t nread;
char url[255];
sprintf(url, "timer:%d", fdinfo->fd);
struct evt_core_fdinfo* assoc_timer = evt_core_get_from_url (ctx, url);
if (assoc_timer == NULL) {
// Start sending if timer does not exist yet
struct timespec next_tick = {0};
measure_next_tick(&mctx->mc, &next_tick);
assoc_timer = register_timer (ctx, fdinfo, &next_tick);
}
if (mctx->mc.read_size >= mctx->mc.payload_size || strcmp(mctx->transport, "udp") == 0) mctx->mc.read_size = 0;
if (mctx->is_from_needed) {
nread = recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen);
} else {
nread = recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0);
}
nread = mctx->connectionless ?
recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen) :
recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0);
if (nread > 0) mctx->mc.read_size += nread;
if ((nread == -1 && errno == EAGAIN) || nread == 0) {
if (nread == -1 && errno == EAGAIN) {
return 1;
}
if (strcmp("udp", mctx->transport) != 0 && mctx->mc.read_size < mctx->mc.payload_size) {
return 0;
}
// First we parse the packet and exit if needed
// First we parse the packet
measure_parse (mctx->mc.read_size, &mctx->mc);
// Old behaviour where we work in a RTT way and send back the packet
if (measure_need_reply (&mctx->mc)) {
ssize_t nwritten;
if (mctx->is_from_needed) nwritten = sendto(fdinfo->fd, mctx->mc.payload, mctx->mc.read_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen);
else nwritten = send(fdinfo->fd, mctx->mc.payload, mctx->mc.read_size, 0);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
if (nwritten != mctx->mc.read_size) {
fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", mctx->mc.read_size, nwritten);
perror("write errno");
exit(EXIT_FAILURE);
}
return 0;
}
// Used to start sending from the server in sync with client
if (!mctx->is_timer_started && mctx->mc.counter <= mctx->mc.max_measure) {
struct timespec next_tick = {0};
measure_next_tick(&mctx->mc, &next_tick);
register_timer (ctx, &next_tick);
}
struct measure_packet* head = (struct measure_packet*) mctx->mc.payload_rcv;
if ((!mctx->mc.is_server || mctx->mc.is_rtt) && head->counter >= mctx->mc.max_measure) {
printf("Measurement done\n");
exit(EXIT_SUCCESS);
if (head->counter >= mctx->mc.max_measure) {
printf("All measurements received\n");
}
return 0;
@ -174,26 +180,30 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct measure_packet* head = measure_generate(&mctx->mc);
//printf("send(id=%ld,is_echo=%d)\n", head->counter, head->is_echo);
struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read");
if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read");
if (!mctx->mc.is_server && tgtinfo == NULL) {
fprintf(stderr, "Unable to find a fdinfo in udp-read nor in tcp-read. Quitting...\n");
exit(EXIT_FAILURE);
} else if (mctx->mc.is_server && (tgtinfo == NULL || mctx->mc.counter > mctx->mc.max_measure)) {
printf("No connection yet\n");
struct evt_core_fdinfo* timer_fd = evt_core_get_first_from_cat (ctx, "timer");
evt_core_rm_fd(ctx, timer_fd->fd);
mctx->is_timer_started = 0;
int target_fd;
sscanf(fdinfo->url, "timer:%d", &target_fd);
struct evt_core_fdinfo* tgtinfo = evt_core_get_from_fd(ctx, target_fd);
if (tgtinfo == NULL) {
fprintf(stderr, "No fd found to send, continue\n");
return 1;
}
if (mctx->is_from_needed) s = sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen);
else s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
s = mctx->connectionless ?
sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) :
send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
if (s < 0 || s != mctx->mc.payload_size) {
perror("Send error");
exit(EXIT_FAILURE);
}
if (mctx->mc.counter >= mctx->mc.max_measure) {
printf("All measurements sent for %d\n", tgtinfo->fd);
evt_core_rm_fd(ctx, fdinfo->fd);
return 1;
}
return 0;
}
@ -216,10 +226,10 @@ int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
fdinfo_n.cat->name = "tcp-read";
sprintf(fdinfo_n.url, "tcp:read:%s:%d", s5ctx->addr, s5ctx->port);
evt_core_add_fd (ctx, &fdinfo_n);
struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (ctx, &fdinfo_n);
printf("--- Tor socket registered\n");
register_timer(ctx, NULL);
register_timer(ctx, reg_fdinfo, NULL);
return 1;
}
@ -307,10 +317,10 @@ void spawn_udp_client(struct evt_core_ctx* evts) {
fdinfo.other = &mctx->mc;
fdinfo.free_other = NULL;
sprintf(fdinfo.url, "udp:read:%s:%s", mctx->host, mctx->port);
evt_core_add_fd (evts, &fdinfo);
struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo);
printf("--- UDP client registered\n");
register_timer(evts, NULL);
register_timer(evts, reg_fdinfo, NULL);
}
void spawn_udp_server(struct evt_core_ctx* evts) {
@ -355,10 +365,10 @@ void spawn_tcp_client(struct evt_core_ctx* evts) {
fdinfo.other = &mctx->mc;
fdinfo.free_other = NULL;
sprintf(fdinfo.url, "tcp:read:%s:%s", mctx->host, mctx->port);
evt_core_add_fd (evts, &fdinfo);
struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo);
printf("--- TCP client registered\n");
register_timer(evts, NULL);
register_timer(evts, reg_fdinfo, NULL);
}
void spawn_tcp_server(struct evt_core_ctx* evts, uint16_t *ports) {
@ -396,6 +406,7 @@ void measlat_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl,
fprintf(stderr, "Unable to create Onion Services (error: %d)\n", err);
exit(EXIT_FAILURE);
}
printf("--- Onion services created\n");
}
int main(int argc, char** argv) {
@ -407,7 +418,7 @@ int main(int argc, char** argv) {
struct evt_core_ctx evts = {0};
// 1. Parse parameters
while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:lrn")) != -1) {
while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:ln")) != -1) {
switch(opt) {
case 'v':
mctx.verbose++;
@ -430,9 +441,6 @@ int main(int argc, char** argv) {
case 's': // size - payload in bytes
mctx.mc.payload_size = atoi(optarg);
break;
case 'r':
mctx.mc.is_rtt = 1;
break;
case 'n':
mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS;
break;
@ -450,6 +458,7 @@ int main(int argc, char** argv) {
if (mctx.transport == NULL) mctx.transport = "udp";
if (mctx.host == NULL) mctx.host = "127.0.0.1";
if (mctx.port == NULL) mctx.port = strcmp(mctx.transport, "udp") == 0 ? "9000" : "7500";
if (strcmp(mctx.transport, "udp") == 0) mctx.connectionless = 1;
printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld, is_rtt=%d\n",
mctx.host, mctx.port, mctx.mc.is_server, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval, mctx.mc.is_rtt);
@ -461,14 +470,10 @@ int main(int argc, char** argv) {
uint16_t ports[] = {7500};
int ports_count = sizeof(ports) / sizeof(ports[0]);
if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) {
spawn_udp_server (&evts);
mctx.is_from_needed = 1;
}
if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) spawn_udp_server (&evts);
else if (mctx.mc.is_server && strcmp(mctx.transport, "tor") == 0) {
spawn_tcp_server(&evts, ports);
measlat_create_onion_services (&tos, &tctl, ports, ports_count, mctx.tor_flags);
printf("--- Onion services created\n");
}
else if (mctx.mc.is_server && strcmp(mctx.transport, "tcp") == 0) spawn_tcp_server(&evts, ports);
else if (strcmp(mctx.transport, "udp") == 0) spawn_udp_client(&evts);

View file

@ -23,7 +23,6 @@ void measure_parse(int size, struct measure_conf* mc) {
perror("read error, payload has wrong size");
//exit(EXIT_FAILURE);
}
struct measure_packet* head = (struct measure_packet*) mc->payload_rcv;
if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){
@ -36,8 +35,9 @@ void measure_parse(int size, struct measure_conf* mc) {
uint8_t is_vanilla = (head->flag & 0x40) >> 6;
uint8_t link_id = head->flag & 0x3f;
printf(
"[%s] Packet %llu latency %luµs with flag %d sent on link %d with vanilla %d\n",
"[%s] src=%s, id=%llu, owd=%luµs, flag=%d, link=%d, vanilla=%d\n",
current_human_datetime(),
"test",
(unsigned long long)head->counter,
micro_sec,
is_slow,
@ -47,7 +47,7 @@ void measure_parse(int size, struct measure_conf* mc) {
void measure_prepare(struct measure_conf* mc) {
if (mc->interval <= 0) mc->interval = 1000;
if (mc->max_measure <= 0) mc->max_measure = 1;
if (mc->max_measure <= 0) mc->max_measure = 3;
if (mc->payload_size < sizeof(struct measure_packet)) mc->payload_size = sizeof(struct measure_packet);
if ((mc->payload = malloc(sizeof(char) * mc->payload_size)) == NULL) {
@ -75,7 +75,6 @@ struct measure_packet* measure_generate(struct measure_conf* mc) {
mc->counter++;
head->counter = mc->counter;
head->is_echo = mc->is_rtt && !mc->is_server;
head->flag = 0;
if (clock_gettime(CLOCK_MONOTONIC, &head->emit_time) == -1) {
perror("clock_gettime error");
@ -109,7 +108,3 @@ void measure_next_tick(struct measure_conf *mc, struct timespec *next) {
printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec);
printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec);
}
uint8_t measure_need_reply(struct measure_conf* mc) {
return mc->is_server && ((struct measure_packet*)mc->payload)->is_echo;
};

View file

@ -14,12 +14,11 @@ struct measure_conf {
char* payload;
char* payload_rcv;
uint64_t counter;
uint8_t is_server, is_rtt;
uint8_t is_server;
};
struct measure_packet {
uint64_t counter;
uint8_t is_echo;
uint8_t flag;
struct timespec emit_time;
};
@ -27,5 +26,4 @@ struct measure_packet {
void measure_parse(int size, struct measure_conf* mc);
void measure_prepare(struct measure_conf* mc);
struct measure_packet* measure_generate(struct measure_conf* mc);
uint8_t measure_need_reply(struct measure_conf* mc);
void measure_next_tick(struct measure_conf *mc, struct timespec *next);