Huge refactor

This commit is contained in:
Quentin 2021-01-13 11:25:50 +01:00
parent 7769acb8b9
commit 5b98780d79
5 changed files with 204 additions and 153 deletions

View file

@ -19,43 +19,41 @@ struct measlat_ctx {
socklen_t addrlen;
int verbose, connectionless, tor_flags;
char *host, *port, *transport;
enum { MEASLAT_CLIENT = 0, MEASLAT_SERVER = 1 } role;
struct measure_params mp;
};
struct timer_measure {
int fd, cntr;
};
void free_timer_measure(void* obj) {
void free_ms(void* obj) {
struct measure_state* ms = obj;
free(ms->mp_in);
free(ms->mp_out);
free(obj);
}
struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct evt_core_fdinfo *tgt, struct timespec* next_tick) {
struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct measlat_ctx* mctx, struct measure_state* ms, struct timespec* next_tick) {
struct timespec now;
struct itimerspec timer_config;
char url[1024];
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo = {0};
struct timer_measure* tmeas = malloc(sizeof(struct timer_measure));
if (tmeas == NULL) {
struct measure_state* msheap = malloc(sizeof(struct measure_state));
if (ms == NULL) {
perror("unable to malloc struct timer_measure");
exit(EXIT_FAILURE);
}
tmeas->fd = tgt->fd;
tmeas->cntr = 0;
memcpy(msheap, ms, sizeof(struct measure_state));
fdinfo.cat = &cat;
fdinfo.url = url;
fdinfo.other = tmeas;
fdinfo.free_other = free_timer_measure;
struct measlat_ctx* mctx = tgt->cat->app_ctx;
fdinfo.other = msheap;
fdinfo.free_other = free_ms;
if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
perror("clock_gettime");
exit(EXIT_FAILURE);
}
uint64_t micro_sec = mctx->mc.interval;
uint64_t micro_sec = mctx->mp.interval;
timer_config.it_value.tv_sec = next_tick == NULL ? now.tv_sec + 1 : next_tick->tv_sec;
timer_config.it_value.tv_nsec = next_tick == NULL ? now.tv_nsec : next_tick->tv_nsec;
timer_config.it_interval.tv_sec = micro_sec / 1000;
@ -73,7 +71,7 @@ struct evt_core_fdinfo* register_timer(struct evt_core_ctx* evts, struct evt_cor
exit(EXIT_FAILURE);
}
fdinfo.cat->name = "timer";
sprintf(fdinfo.url, "timer:%d", tgt->fd);
sprintf(fdinfo.url, "timer:%d", msheap->fd);
struct evt_core_fdinfo *new_fdinfo = evt_core_add_fd (evts, &fdinfo);
printf("--- Timer registered\n");
return new_fdinfo;
@ -89,44 +87,75 @@ int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdin
return 1;
}
void measlat_stop(
struct evt_core_ctx* ctx,
struct measlat_ctx* mctx,
struct measure_state* ms,
struct evt_core_fdinfo* net,
struct evt_core_fdinfo* timer) {
if (ms->mp_in->counter < mctx->mp.max_measure) return;
if (ms->mp_out->counter < mctx->mp.max_measure) return;
printf("All measurements done\n");
evt_core_rm_fd(ctx, timer->fd);
if (mctx->role == MEASLAT_CLIENT) {
evt_core_rm_fd(ctx, net->fd);
exit(EXIT_SUCCESS);
}
}
int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct timer_measure* tmeas = fdinfo->other;
struct measlat_ctx* mctx = fdinfo->cat->app_ctx;
struct measure_state msbuf = {0};
struct measure_state* ms;
ssize_t nread;
char url[255];
// 1. Get our measurement state
sprintf(url, "timer:%d", fdinfo->fd);
struct evt_core_fdinfo* assoc_timer = evt_core_get_from_url (ctx, url);
if (assoc_timer == NULL) {
// Start sending if timer does not exist yet
struct timespec next_tick = {0};
measure_next_tick(&mctx->mc, &next_tick);
assoc_timer = register_timer (ctx, fdinfo, &next_tick);
if (assoc_timer) {
// Measurement state exists
ms = assoc_timer->other;
} else {
// Does not exist yet, we create it.
ms = &msbuf;
ms->fd = fdinfo->fd;
measure_state_init (&mctx->mp, ms);
}
if (mctx->mc.read_size >= mctx->mc.payload_size || strcmp(mctx->transport, "udp") == 0) mctx->mc.read_size = 0;
// 2. Read data in our measurement object
if (ms->mp_nin >= mctx->mp.payload_size || strcmp(mctx->transport, "udp") == 0) ms->mp_nin = 0;
nread = mctx->connectionless ?
recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen) :
recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0);
recvfrom(fdinfo->fd, ms->mp_in, mctx->mp.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen) :
recv(fdinfo->fd, ms->mp_in + ms->mp_nin, mctx->mp.payload_size - ms->mp_nin, 0);
if (nread > 0) mctx->mc.read_size += nread;
if (nread > 0) ms->mp_nin += nread;
if (nread == -1 && errno == EAGAIN) {
return 1;
}
if (strcmp("udp", mctx->transport) != 0 && mctx->mc.read_size < mctx->mc.payload_size) {
if (strcmp("udp", mctx->transport) != 0 && ms->mp_nin < mctx->mp.payload_size) {
printf("Packet has been fragmented\n");
return 0;
}
// First we parse the packet
measure_parse (mctx->mc.read_size, &mctx->mc);
// 3. Process data in our measurement object
measure_parse (&mctx->mp, ms);
struct measure_packet* head = (struct measure_packet*) mctx->mc.payload_rcv;
if (head->counter >= mctx->mc.max_measure) {
printf("All measurements received\n");
// 4. Persist our measurement object if needed
// It includes starting a timer.
if (ms == &msbuf) {
struct timespec next_tick = {0};
measure_next_tick(&mctx->mp, ms, &next_tick);
assoc_timer = register_timer (ctx, mctx, ms, &next_tick);
}
// 5. Check if our measurements are done
if (ms->mp_in->counter >= mctx->mp.max_measure && ms->mp_out->counter >= mctx->mp.max_measure)
measlat_stop(ctx, mctx, ms, fdinfo, assoc_timer);
return 0;
}
@ -165,6 +194,7 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
ssize_t s;
uint64_t ticks = 0;
struct measlat_ctx* mctx = fdinfo->cat->app_ctx;
struct measure_state* ms = fdinfo->other;
s = read(fdinfo->fd, &ticks, sizeof(uint64_t));
if (s == -1 && errno == EAGAIN) return 1;
@ -177,7 +207,7 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
fprintf(stderr, "Has ticked %lu times, expected 1 time. This is a bug\n", ticks);
}
struct measure_packet* head = measure_generate(&mctx->mc);
struct measure_packet* head = measure_generate(&mctx->mp, ms);
//printf("send(id=%ld,is_echo=%d)\n", head->counter, head->is_echo);
int target_fd;
@ -190,15 +220,15 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
}
s = mctx->connectionless ?
sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) :
send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
sendto(tgtinfo->fd, head, mctx->mp.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) :
send(tgtinfo->fd, head, mctx->mp.payload_size, 0);
if (s < 0 || s != mctx->mc.payload_size) {
if (s < 0 || s != mctx->mp.payload_size) {
perror("Send error");
exit(EXIT_FAILURE);
}
if (mctx->mc.counter >= mctx->mc.max_measure) {
if (head->counter >= mctx->mp.max_measure) {
printf("All measurements sent for %d\n", tgtinfo->fd);
evt_core_rm_fd(ctx, fdinfo->fd);
return 1;
@ -229,7 +259,9 @@ int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (ctx, &fdinfo_n);
printf("--- Tor socket registered\n");
register_timer(ctx, reg_fdinfo, NULL);
struct measure_state ms = {0};
measure_state_init (&mctx->mp, &ms);
register_timer (ctx, mctx, &ms, NULL);
return 1;
}
@ -314,13 +346,13 @@ void spawn_udp_client(struct evt_core_ctx* evts) {
fdinfo.fd = udp_sock;
fdinfo.cat->name = "udp-read";
fdinfo.other = &mctx->mc;
fdinfo.free_other = NULL;
sprintf(fdinfo.url, "udp:read:%s:%s", mctx->host, mctx->port);
struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo);
printf("--- UDP client registered\n");
register_timer(evts, reg_fdinfo, NULL);
struct measure_state ms = {0};
measure_state_init (&mctx->mp, &ms);
register_timer (evts, mctx, &ms, NULL);
}
void spawn_udp_server(struct evt_core_ctx* evts) {
@ -362,13 +394,13 @@ void spawn_tcp_client(struct evt_core_ctx* evts) {
fdinfo.fd = tcp_sock;
fdinfo.cat->name = "tcp-read";
fdinfo.other = &mctx->mc;
fdinfo.free_other = NULL;
sprintf(fdinfo.url, "tcp:read:%s:%s", mctx->host, mctx->port);
struct evt_core_fdinfo* reg_fdinfo = evt_core_add_fd (evts, &fdinfo);
printf("--- TCP client registered\n");
register_timer(evts, reg_fdinfo, NULL);
struct measure_state ms = {0};
measure_state_init (&mctx->mp, &ms);
register_timer (evts, mctx, &ms, NULL);
}
void spawn_tcp_server(struct evt_core_ctx* evts, uint16_t *ports) {
@ -409,6 +441,15 @@ void measlat_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl,
printf("--- Onion services created\n");
}
void spawn_onion_server(struct evt_core_ctx* evts, uint16_t *ports, int ports_count, struct tor_os_str* tos, struct tor_ctl* tctl, struct measlat_ctx* mctx) {
spawn_tcp_server(evts, ports);
measlat_create_onion_services (tos, tctl, ports, ports_count, mctx->tor_flags);
}
int streq(char* s1, char* s2) {
return strcmp(s1, s2) == 0;
}
int main(int argc, char** argv) {
setvbuf(stdout, NULL, _IONBF, 0);
printf("~ measlat ~\n");
@ -416,6 +457,10 @@ int main(int argc, char** argv) {
int opt;
struct measlat_ctx mctx = {0};
struct evt_core_ctx evts = {0};
struct tor_os_str tos = {0};
struct tor_ctl tctl = {0};
uint16_t ports[] = {7500};
int ports_count = sizeof(ports) / sizeof(ports[0]);
// 1. Parse parameters
while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:ln")) != -1) {
@ -430,22 +475,22 @@ int main(int argc, char** argv) {
mctx.port = optarg;
break;
case 'l':
mctx.mc.is_server = 1;
mctx.role = MEASLAT_SERVER;
break;
case 't': // transport
mctx.transport = optarg;
break;
case 'c': // count
mctx.mc.max_measure = atoi(optarg);
mctx.mp.max_measure = atoi(optarg);
break;
case 's': // size - payload in bytes
mctx.mc.payload_size = atoi(optarg);
mctx.mp.payload_size = atoi(optarg);
break;
case 'n':
mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS;
break;
case 'i': // interval - every ms
mctx.mc.interval = atoi(optarg);
mctx.mp.interval = atoi(optarg);
break;
default:
goto usage;
@ -453,35 +498,33 @@ int main(int argc, char** argv) {
}
// 2. Check and fix parameters
measure_prepare (&mctx.mc);
measure_params_init (&mctx.mp);
mctx.addrlen = sizeof(mctx.addr);
if (mctx.transport == NULL) mctx.transport = "udp";
if (mctx.host == NULL) mctx.host = "127.0.0.1";
if (mctx.port == NULL) mctx.port = strcmp(mctx.transport, "udp") == 0 ? "9000" : "7500";
if (strcmp(mctx.transport, "udp") == 0) mctx.connectionless = 1;
printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld, is_rtt=%d\n",
mctx.host, mctx.port, mctx.mc.is_server, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval, mctx.mc.is_rtt);
if (mctx.host == NULL) mctx.host = "127.0.0.1";
if (mctx.port == NULL) mctx.port = mctx.connectionless ? "9000" : "7500";
// 3. Bind events
printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld\n",
mctx.host, mctx.port, mctx.role, mctx.transport, mctx.mp.max_measure, mctx.mp.payload_size, mctx.mp.interval);
// 3. Create event structure
register_categories(&evts, &mctx);
struct tor_os_str tos = {0};
struct tor_ctl tctl = {0};
uint16_t ports[] = {7500};
int ports_count = sizeof(ports) / sizeof(ports[0]);
if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) spawn_udp_server (&evts);
else if (mctx.mc.is_server && strcmp(mctx.transport, "tor") == 0) {
spawn_tcp_server(&evts, ports);
measlat_create_onion_services (&tos, &tctl, ports, ports_count, mctx.tor_flags);
// 4. Register services
if (mctx.role == MEASLAT_SERVER) {
if (streq(mctx.transport, "udp")) spawn_udp_server (&evts);
else if (streq(mctx.transport, "tcp")) spawn_tcp_server(&evts, ports);
else if (streq(mctx.transport, "tor")) spawn_onion_server (&evts, ports, ports_count, &tos, &tctl, &mctx);
}
else if (mctx.role == MEASLAT_CLIENT) {
if (streq(mctx.transport, "udp")) spawn_udp_client(&evts);
else if (streq(mctx.transport, "tor")) spawn_tor_client(&evts);
else if (streq(mctx.transport, "tcp")) spawn_tcp_client(&evts);
}
else if (mctx.mc.is_server && strcmp(mctx.transport, "tcp") == 0) spawn_tcp_server(&evts, ports);
else if (strcmp(mctx.transport, "udp") == 0) spawn_udp_client(&evts);
else if (strcmp(mctx.transport, "tor") == 0) spawn_tor_client(&evts);
else if (strcmp(mctx.transport, "tcp") == 0) spawn_tcp_client(&evts);
else exit(EXIT_FAILURE);
// 4. Run main loop
// 5. Run main loop
evt_core_loop(&evts);
return 0;

View file

@ -1,92 +1,92 @@
#include "measure.h"
void measure_parse(int size, struct measure_conf* mc) {
void measure_params_init(struct measure_params* mp) {
if (mp->interval <= 0) mp->interval = 1000;
if (mp->max_measure <= 0) mp->max_measure = 3;
if (mp->payload_size < sizeof(struct measure_packet)) mp->payload_size = sizeof(struct measure_packet);
}
void measure_state_init(struct measure_params* mp, struct measure_state* ms) {
if ((ms->mp_out = malloc(sizeof(char) * mp->payload_size)) == NULL) {
perror("payload malloc failed");
exit(EXIT_FAILURE);
}
memset(ms->mp_out, 0, mp->payload_size);
if ((ms->mp_in = malloc(sizeof(char) * mp->payload_size)) == NULL) {
perror("payload malloc failed");
exit(EXIT_FAILURE);
}
memset(ms->mp_in, 0, mp->payload_size);
char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire.";
size_t msg_len = strlen(my_msg);
size_t cursor_msg = 0;
char* pl = (char*) ms->mp_out;
for (size_t i = sizeof(struct measure_packet); i < mp->payload_size; i++) {
pl[i] = my_msg[cursor_msg];
cursor_msg = (cursor_msg + 1) % msg_len;
}
}
void measure_parse(struct measure_params* mp, struct measure_state* ms) {
struct timespec curr;
uint64_t micro_sec;
if (size != mc->payload_size) {
fprintf(stderr, "read size: %d, expected: %ld\n", size, mc->payload_size);
if (ms->mp_nin != mp->payload_size) {
fprintf(stderr, "read size: %ld, expected: %ld\n", ms->mp_nin, mp->payload_size);
int i;
fprintf(stderr, "received buffer:\n");
for (i = 0; i < mc->payload_size; i++) {
for (i = 0; i < mp->payload_size; i++) {
if (i > 0) fprintf(stderr, ":");
fprintf(stderr, "%02x", (unsigned char) mc->payload_rcv[i]);
fprintf(stderr, "%02x", (unsigned char) ((char*)ms->mp_in)[i]);
}
fprintf(stderr, "\n");
fprintf(stderr, "local buffer (reference):\n");
for (i = 0; i < mc->payload_size; i++) {
for (i = 0; i < mp->payload_size; i++) {
if (i > 0) fprintf(stderr, ":");
fprintf(stderr, "%02X", (unsigned char) mc->payload[i]);
fprintf(stderr, "%02X", (unsigned char) ((char*)ms->mp_out)[i]);
}
fprintf(stderr, "\n");
perror("read error, payload has wrong size");
}
struct measure_packet* head = (struct measure_packet*) mc->payload_rcv;
if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){
perror("clock_gettime error");
exit(EXIT_FAILURE);
}
micro_sec = elapsed_micros (&head->emit_time, &curr);
uint8_t is_slow = head->flag >> 7;
uint8_t is_vanilla = (head->flag & 0x40) >> 6;
uint8_t link_id = head->flag & 0x3f;
micro_sec = elapsed_micros (&ms->mp_in->emit_time, &curr);
uint8_t is_slow = ms->mp_in->flag >> 7;
uint8_t is_vanilla = (ms->mp_in->flag & 0x40) >> 6;
uint8_t link_id = ms->mp_in->flag & 0x3f;
printf(
"[%s] src=%s, id=%llu, owd=%luµs, flag=%d, link=%d, vanilla=%d\n",
"[%s] src=%d, id=%llu, owd=%luµs, flag=%d, link=%d, vanilla=%d\n",
current_human_datetime(),
"test",
(unsigned long long)head->counter,
ms->fd,
(unsigned long long)ms->mp_in->counter,
micro_sec,
is_slow,
link_id,
is_vanilla);
}
void measure_prepare(struct measure_conf* mc) {
if (mc->interval <= 0) mc->interval = 1000;
if (mc->max_measure <= 0) mc->max_measure = 3;
if (mc->payload_size < sizeof(struct measure_packet)) mc->payload_size = sizeof(struct measure_packet);
if ((mc->payload = malloc(sizeof(char) * mc->payload_size)) == NULL) {
perror("payload malloc failed");
exit(EXIT_FAILURE);
}
memset(mc->payload, 0, mc->payload_size);
if ((mc->payload_rcv = malloc(sizeof(char) * mc->payload_size)) == NULL) {
perror("payload malloc failed");
exit(EXIT_FAILURE);
}
memset(mc->payload_rcv, 0, mc->payload_size);
char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire.";
size_t msg_len = strlen(my_msg);
size_t cursor_msg = 0;
for (size_t i = sizeof(struct measure_packet); i < mc->payload_size; i++) {
mc->payload[i] = my_msg[cursor_msg];
cursor_msg = (cursor_msg + 1) % msg_len;
}
}
struct measure_packet* measure_generate(struct measure_conf* mc) {
struct measure_packet* head = (struct measure_packet*)mc->payload;
mc->counter++;
head->counter = mc->counter;
head->flag = 0;
if (clock_gettime(CLOCK_MONOTONIC, &head->emit_time) == -1) {
struct measure_packet* measure_generate(struct measure_params* mp, struct measure_state* ms) {
ms->mp_out->counter++;
ms->mp_out->flag = 0;
if (clock_gettime(CLOCK_MONOTONIC, &ms->mp_out->emit_time) == -1) {
perror("clock_gettime error");
exit(EXIT_FAILURE);
}
return head;
return ms->mp_out;
}
void measure_next_tick(struct measure_conf *mc, struct timespec *next) {
struct measure_packet *head = (struct measure_packet*) mc->payload_rcv;
struct timespec now, *sent_at = &head->emit_time;
mc->counter = head->counter;
void measure_next_tick(struct measure_params *mp, struct measure_state* ms, struct timespec *next) {
//struct measure_packet *head = (struct measure_packet*) ms->payload_rcv;
struct timespec now = {0}, *sent_at = &ms->mp_in->emit_time;
if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
perror("clock_gettime error");
@ -95,15 +95,15 @@ void measure_next_tick(struct measure_conf *mc, struct timespec *next) {
memcpy(next, sent_at, sizeof(struct timespec));
while(!timespec_gt (next, &now)) {
next->tv_nsec += mc->interval * 1000000L;
next->tv_nsec += mp->interval * 1000000L;
if (next->tv_nsec > ONE_SEC) {
next->tv_sec += next->tv_nsec / ONE_SEC;
next->tv_nsec = next->tv_nsec % ONE_SEC;
}
mc->counter++;
ms->mp_out->counter++;
}
mc->counter--;
printf("interval: %ld\n", mc->interval);
ms->mp_out->counter--;
printf("interval: %ld\n", mp->interval);
printf("sent_at: sec=%ld nsec=%ld \n", (uint64_t) sent_at->tv_sec, (uint64_t) sent_at->tv_nsec);
printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec);
printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec);

View file

@ -6,24 +6,30 @@
#include <string.h>
#include "utils.h"
struct measure_conf {
struct measure_params {
uint64_t max_measure;
uint64_t payload_size;
uint64_t read_size;
uint64_t interval;
char* payload;
char* payload_rcv;
uint64_t counter;
uint8_t is_server;
};
struct measure_state {
struct measure_packet* mp_out;
struct measure_packet* mp_in;
ssize_t mp_nin;
int fd;
};
struct measure_packet {
uint64_t counter;
uint8_t flag;
struct timespec emit_time;
};
void measure_parse(int size, struct measure_conf* mc);
void measure_prepare(struct measure_conf* mc);
struct measure_packet* measure_generate(struct measure_conf* mc);
void measure_next_tick(struct measure_conf *mc, struct timespec *next);
void measure_params_init(struct measure_params* mp);
void measure_state_init(struct measure_params* mp, struct measure_state* ms);
void measure_parse(struct measure_params* mp, struct measure_state* ms);
struct measure_packet* measure_generate(struct measure_params* mp, struct measure_state* ms);
void measure_next_tick(struct measure_params *mp, struct measure_state* ms, struct timespec *next);

View file

@ -10,7 +10,8 @@
struct torecho_ctx {
uint8_t is_measlat;
struct measure_conf mc;
struct measure_params mp;
struct measure_state ms;
};
void te_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl, uint16_t* ports, int ports_count, enum TOR_ONION_FLAGS tof) {
@ -66,18 +67,18 @@ int te_on_tcp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
socklen_t addrlen = sizeof(addr);
struct torecho_ctx *tctx = fdinfo->cat->app_ctx;
nread = recv(fdinfo->fd, tctx->mc.payload, tctx->mc.payload_size, 0);
nread = recv(fdinfo->fd, tctx->ms.mp_in, tctx->mp.payload_size, 0);
if (nread == -1 && errno == EAGAIN) return 1; // Read done
if (nread == 0) { fprintf(stderr, "WARN! Read 0 bytes.\n"); return 1; }
if (nread < 0 || nread > tctx->mc.payload_size) {
fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, tctx->mc.payload_size);
if (nread < 0 || nread > tctx->mp.payload_size) {
fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, tctx->mp.payload_size);
perror("read errno");
exit(EXIT_FAILURE);
}
if (tctx->is_measlat) measure_parse (nread, &tctx->mc);
if (tctx->is_measlat) measure_parse (&tctx->mp, &tctx->ms);
nwritten = send(fdinfo->fd, tctx->mc.payload, nread, 0);
nwritten = send(fdinfo->fd, tctx->ms.mp_in, nread, 0);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
if (nwritten != nread) {
fprintf(stderr, "Didn't write the same number of bytes as read - not supported. nread=%ld, nwritten=%ld\n", nread, nwritten);
@ -100,8 +101,7 @@ int main(int argc, char** argv) {
enum TOR_ONION_FLAGS tof = TOR_ONION_FLAG_NONE;
char url[1024];
struct torecho_ctx tctx = {0};
tctx.mc.is_server = 1;
tctx.mc.payload_size = 1500;
tctx.mp.payload_size = 1500;
while ((opt = getopt(argc, argv, "ns:m")) != -1) {
switch(opt) {
@ -112,7 +112,7 @@ int main(int argc, char** argv) {
tctx.is_measlat = 1;
break;
case 's':
tctx.mc.payload_size = atoi(optarg);
tctx.mp.payload_size = atoi(optarg);
break;
default:
break;
@ -138,7 +138,8 @@ int main(int argc, char** argv) {
.flags = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP,
.socklist = NULL
};
measure_prepare (&tctx.mc);
measure_params_init (&tctx.mp);
measure_state_init (&tctx.mp, &tctx.ms);
evt_core_init(&evts, 0);
evt_core_add_cat(&evts, &tcp_co);

View file

@ -7,7 +7,8 @@
#include "measure.h"
struct udpecho_ctx {
struct measure_conf mc;
struct measure_params mp;
struct measure_state ms;
uint8_t is_measlat;
};
@ -17,17 +18,17 @@ int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
socklen_t addrlen = sizeof(addr);
struct udpecho_ctx *uctx = fdinfo->cat->app_ctx;
nread = recvfrom(fdinfo->fd, uctx->mc.payload, uctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&addr, &addrlen);
nread = recvfrom(fdinfo->fd, uctx->ms.mp_in, uctx->mp.payload_size, MSG_TRUNC, (struct sockaddr*)&addr, &addrlen);
if (nread == -1 && errno == EAGAIN) return 1; // Read done
if (nread <= 0 || nread > uctx->mc.payload_size) {
fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, uctx->mc.payload_size);
if (nread <= 0 || nread > uctx->mp.payload_size) {
fprintf(stderr, "Message is either truncated or an error occured. nread=%ld, expected=%ld\n", nread, uctx->mp.payload_size);
perror("read errno");
exit(EXIT_FAILURE);
}
if (uctx->is_measlat) measure_parse (nread, &uctx->mc);
if (uctx->is_measlat) measure_parse (&uctx->mp, &uctx->ms);
nwritten = sendto(fdinfo->fd, uctx->mc.payload, nread, 0, (struct sockaddr*)&addr, addrlen);
nwritten = sendto(fdinfo->fd, uctx->ms.mp_in, nread, 0, (struct sockaddr*)&addr, addrlen);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
if (nwritten != nread) {
fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", nread, nwritten);
@ -45,8 +46,7 @@ int main(int argc, char** argv) {
int opt, udp_sock, verbose = 0;
char *port = NULL, *bindhost = NULL;
struct evt_core_ctx evts = {0};
uctx.mc.is_server = 1;
uctx.mc.payload_size = 1500;
uctx.mp.payload_size = 1500;
// 1. Parse parameters
while ((opt = getopt(argc, argv, "b:p:vms:")) != -1) {
@ -64,14 +64,15 @@ int main(int argc, char** argv) {
uctx.is_measlat = 1;
break;
case 's':
uctx.mc.payload_size = atoi(optarg);
uctx.mp.payload_size = atoi(optarg);
break;
default:
goto usage;
}
}
if (bindhost == NULL) bindhost = "127.0.0.1";
measure_prepare (&uctx.mc);
measure_params_init (&uctx.mp);
measure_state_init (&uctx.mp, &uctx.ms);
// 2. Register category
struct evt_core_cat udp_read = {