Removed many bugs

This commit is contained in:
Quentin 2021-01-26 12:13:03 +01:00
parent 1efc14eecd
commit d4bbca89f0
6 changed files with 159 additions and 55 deletions

View file

@ -55,33 +55,34 @@ add_executable(faketor ${CSOURCES} src/faketor.c)
add_executable(dcall src/dcall.c) add_executable(dcall src/dcall.c)
find_package(PkgConfig REQUIRED) find_package(PkgConfig REQUIRED)
pkg_search_module(UUID REQUIRED uuid)
pkg_search_module(GLIB REQUIRED glib-2.0) pkg_search_module(GLIB REQUIRED glib-2.0)
pkg_check_modules(GST REQUIRED gstreamer-1.0>=1.14 pkg_check_modules(GST REQUIRED gstreamer-1.0>=1.14
gstreamer-rtp-1.0>=1.14) gstreamer-rtp-1.0>=1.14)
target_include_directories(donar PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(donar PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(donar ${GLIB_LDFLAGS}) target_link_libraries(donar ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(measlat PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(measlat PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(measlat ${GLIB_LDFLAGS}) target_link_libraries(measlat ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(udpecho PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(udpecho PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(udpecho ${GLIB_LDFLAGS}) target_link_libraries(udpecho ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(torecho PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(torecho PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(torecho ${GLIB_LDFLAGS}) target_link_libraries(torecho ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(capdiff PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(capdiff PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(capdiff ${GLIB_LDFLAGS}) target_link_libraries(capdiff ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(capreplay PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(capreplay PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(capreplay ${GLIB_LDFLAGS}) target_link_libraries(capreplay ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(donar_unit_test PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(donar_unit_test PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(donar_unit_test ${GLIB_LDFLAGS}) target_link_libraries(donar_unit_test ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(faketor PRIVATE ${GLIB_INCLUDE_DIRS}) target_include_directories(faketor PRIVATE ${GLIB_INCLUDE_DIRS} ${UUID_INCLUDE_DIRS})
target_link_libraries(faketor ${GLIB_LDFLAGS}) target_link_libraries(faketor ${GLIB_LDFLAGS} ${UUID_LDFLAGS})
target_include_directories(dcall PRIVATE ${GST_INCLUDE_DIRS}) target_include_directories(dcall PRIVATE ${GST_INCLUDE_DIRS})
target_link_libraries(dcall ${GST_LDFLAGS}) target_link_libraries(dcall ${GST_LDFLAGS})

View file

@ -24,9 +24,8 @@ struct measlat_ctx {
}; };
void free_ms(void* obj) { void free_ms(void* obj) {
struct measure_state* ms = obj; printf("free measure state\n");
free(ms->mp_in); measure_state_free (obj);
free(ms->mp_out);
free(obj); free(obj);
} }
@ -111,6 +110,7 @@ void measlat_stop(
if (ms->mp_out->counter < mctx->mp.max_measure) return; if (ms->mp_out->counter < mctx->mp.max_measure) return;
printf("[states] measurement %d+%d terminated\n", net_fd, timer_fd); printf("[states] measurement %d+%d terminated\n", net_fd, timer_fd);
measure_summary (&mctx->mp, ms);
evt_core_rm_fd(ctx, timer_fd); evt_core_rm_fd(ctx, timer_fd);
if (!(mctx->connectionless && mctx->role == MEASLAT_SERVER)) if (!(mctx->connectionless && mctx->role == MEASLAT_SERVER))
evt_core_rm_fd(ctx, net_fd); evt_core_rm_fd(ctx, net_fd);
@ -148,7 +148,7 @@ int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
if (ms->mp_nin < mctx->mp.payload_size) return 0; if (ms->mp_nin < mctx->mp.payload_size) return 0;
// 3. Process data in our measurement object // 3. Process data in our measurement object
measure_parse (&mctx->mp, ms); measure_parse (&mctx->mp, ms, mctx->verbose);
// 4. Persist our measurement object if needed // 4. Persist our measurement object if needed
// It includes starting a timer. // It includes starting a timer.
@ -483,13 +483,15 @@ int main(int argc, char** argv) {
int opt; int opt;
struct measlat_ctx mctx = {0}; struct measlat_ctx mctx = {0};
mctx.mp.tag = "undefined";
struct evt_core_ctx evts = {0}; struct evt_core_ctx evts = {0};
struct tor_os_str tos = {0}; struct tor_os_str tos = {0};
struct tor_ctl tctl = {0}; struct tor_ctl tctl = {0};
tctl.os_endpoint = "127.0.0.1"; tctl.os_endpoint = "127.0.0.1";
measure_params_init (&mctx.mp);
// 1. Parse parameters // 1. Parse parameters
while ((opt = getopt(argc, argv, "vq:h:p:c:s:i:t:ln")) != -1) { while ((opt = getopt(argc, argv, "vq:h:p:c:s:i:t:lnm:")) != -1) {
switch(opt) { switch(opt) {
case 'v': case 'v':
mctx.verbose++; mctx.verbose++;
@ -513,7 +515,7 @@ int main(int argc, char** argv) {
mctx.mp.max_measure = atoi(optarg); mctx.mp.max_measure = atoi(optarg);
break; break;
case 's': // size - payload in bytes case 's': // size - payload in bytes
mctx.mp.payload_size = atoi(optarg); measure_params_setpl(&mctx.mp, atoi(optarg));
break; break;
case 'n': case 'n':
mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS; mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS;
@ -521,13 +523,15 @@ int main(int argc, char** argv) {
case 'i': // interval - every ms case 'i': // interval - every ms
mctx.mp.interval = atoi(optarg); mctx.mp.interval = atoi(optarg);
break; break;
case 'm':
mctx.mp.tag = optarg;
break;
default: default:
goto usage; goto usage;
} }
} }
// 2. Check and fix parameters // 2. Check and fix parameters
measure_params_init (&mctx.mp);
measure_state_init (&mctx.mp, &ms_transi); measure_state_init (&mctx.mp, &ms_transi);
mctx.addrlen = sizeof(mctx.addr); mctx.addrlen = sizeof(mctx.addr);

View file

@ -1,15 +1,32 @@
#include "measure.h" #include "measure.h"
char* default_tag = "undefined";
void measure_params_init(struct measure_params* mp) { void measure_params_init(struct measure_params* mp) {
if (mp->interval <= 0) mp->interval = 1000; mp->interval = 1000;
if (mp->max_measure <= 0) mp->max_measure = 3; mp->max_measure = 3;
if (mp->payload_size < sizeof(struct measure_packet)) mp->payload_size = sizeof(struct measure_packet); mp->tag = default_tag;
mp->payload_size = sizeof(struct measure_packet);
}
void measure_params_setpl (struct measure_params* mp, size_t plsize) {
if (plsize > sizeof(struct measure_packet)) {
mp->payload_size = plsize;
}
} }
void measure_state_init(struct measure_params* mp, struct measure_state* ms) { void measure_state_init(struct measure_params* mp, struct measure_state* ms) {
ms->mp_nin = 0; ms->mp_nin = 0;
ms->fd = 0; ms->fd = 0;
uuid_generate (ms->uuid);
if (ms->log == NULL) {
if ((ms->log = malloc(sizeof(uint64_t) * mp->max_measure)) == NULL) {
perror("log malloc failed");
exit(EXIT_FAILURE);
}
}
memset(ms->log, 0, sizeof(uint64_t) * mp->max_measure);
if (ms->mp_out == NULL) { if (ms->mp_out == NULL) {
if ((ms->mp_out = malloc(sizeof(char) * mp->payload_size)) == NULL) { if ((ms->mp_out = malloc(sizeof(char) * mp->payload_size)) == NULL) {
@ -37,7 +54,7 @@ void measure_state_init(struct measure_params* mp, struct measure_state* ms) {
} }
} }
void measure_parse(struct measure_params* mp, struct measure_state* ms) { void measure_parse(struct measure_params* mp, struct measure_state* ms, uint8_t verbose) {
struct timespec curr; struct timespec curr;
uint64_t micro_sec; uint64_t micro_sec;
if (ms->mp_nin != mp->payload_size) { if (ms->mp_nin != mp->payload_size) {
@ -67,6 +84,15 @@ void measure_parse(struct measure_params* mp, struct measure_state* ms) {
} }
micro_sec = elapsed_micros (&ms->mp_in->emit_time, &curr); micro_sec = elapsed_micros (&ms->mp_in->emit_time, &curr);
if (ms->mp_in->counter <= mp->max_measure) {
ms->log[ms->mp_in->counter-1] = micro_sec;
} else {
verbose = 1;
fprintf(stderr, "measure will be ignored: counter=%lu, max_measure=%lu\n", ms->mp_in->counter, mp->max_measure);
}
if (verbose) {
uint8_t is_slow = ms->mp_in->flag >> 7; uint8_t is_slow = ms->mp_in->flag >> 7;
uint8_t is_vanilla = (ms->mp_in->flag & 0x40) >> 6; uint8_t is_vanilla = (ms->mp_in->flag & 0x40) >> 6;
uint8_t link_id = ms->mp_in->flag & 0x3f; uint8_t link_id = ms->mp_in->flag & 0x3f;
@ -79,6 +105,7 @@ void measure_parse(struct measure_params* mp, struct measure_state* ms) {
is_slow, is_slow,
link_id, link_id,
is_vanilla); is_vanilla);
}
ms->mp_nin = 0; ms->mp_nin = 0;
} }
@ -117,3 +144,63 @@ void measure_next_tick(struct measure_params *mp, struct measure_state* ms, stru
printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec); printf("now: sec=%ld nsec=%ld \n", (uint64_t) now.tv_sec, (uint64_t) now.tv_nsec);
printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec); printf("next: sec=%ld nsec=%ld \n", (uint64_t) next->tv_sec, (uint64_t) next->tv_nsec);
} }
void measure_state_free (struct measure_state* ms) {
free(ms->mp_in);
free(ms->mp_out);
free(ms->log);
uuid_clear(ms->uuid);
}
int cmpuint64t(const void* u1, const void* u2) {
const uint64_t *i1 = u1;
const uint64_t *i2 = u2;
return *i1 < *i2 ? -1 : 1;
}
void measure_summary(struct measure_params* mp, struct measure_state* ms) {
char uuidstr[128];
uuid_unparse (ms->uuid, uuidstr);
fprintf(stdout, "tag,uuid,metric,value\n");
uint64_t* real_log = ms->log;
uint64_t real_log_size = mp->max_measure;
// cut beginning
while (real_log[0] == 0 && real_log_size > 0) {
real_log = &(real_log[1]);
real_log_size--;
}
printf("[summary] cutted %lu values at beginning\n", mp->max_measure - real_log_size);
// cut end
while (real_log[real_log_size-1] == 0 && real_log_size > 0) {
real_log_size--;
}
printf("[summary] cutted %lu values in total\n", mp->max_measure - real_log_size);
uint64_t min = real_log[0];
fprintf(stdout, "%s,%s,min,%lu\n", mp->tag, uuidstr, min);
uint64_t max = real_log[real_log_size-1];
fprintf(stdout, "%s,%s,max,%lu\n", mp->tag, uuidstr, max);
uint64_t med = real_log[(int)(0.50 * (real_log_size - 1))];
fprintf(stdout, "%s,%s,med,%lu\n", mp->tag, uuidstr, med);
uint64_t q25 = real_log[(int)(0.25 * (real_log_size - 1))];
fprintf(stdout, "%s,%s,q25,%lu\n", mp->tag, uuidstr, q25);
uint64_t q75 = real_log[(int)(0.75 * (real_log_size - 1))];
fprintf(stdout, "%s,%s,q75,%lu\n", mp->tag, uuidstr, q75);
uint64_t q99 = real_log[(int)(0.99 * (real_log_size - 1))];
fprintf(stdout, "%s,%s,q99,%lu\n", mp->tag, uuidstr, q99);
qsort (real_log, real_log_size, sizeof(uint64_t), cmpuint64t);
double avg = 0;
for (int i = 0; i < real_log_size-1; i++) {
avg += ((double) real_log[i]) / ((double) (real_log_size - 1));
}
fprintf(stdout, "%s,%s,avg,%f\n", mp->tag, uuidstr, avg);
}

View file

@ -4,6 +4,7 @@
#include <stdint.h> #include <stdint.h>
#include <time.h> #include <time.h>
#include <string.h> #include <string.h>
#include <uuid/uuid.h>
#include "utils.h" #include "utils.h"
struct measure_params { struct measure_params {
@ -11,9 +12,12 @@ struct measure_params {
uint64_t payload_size; uint64_t payload_size;
uint64_t interval; uint64_t interval;
uint8_t is_server; uint8_t is_server;
char* tag;
}; };
struct measure_state { struct measure_state {
uuid_t uuid;
uint64_t* log;
struct measure_packet* mp_out; struct measure_packet* mp_out;
struct measure_packet* mp_in; struct measure_packet* mp_in;
ssize_t mp_nin; ssize_t mp_nin;
@ -28,9 +32,13 @@ struct measure_packet {
}; };
void measure_params_init(struct measure_params* mp); void measure_params_init(struct measure_params* mp);
void measure_state_init(struct measure_params* mp, struct measure_state* ms); void measure_params_setpl (struct measure_params* mp, size_t plsize);
void measure_parse(struct measure_params* mp, struct measure_state* ms); void measure_state_init(struct measure_params* mp, struct measure_state* ms);
void measure_state_free(struct measure_state* ms);
void measure_parse(struct measure_params* mp, struct measure_state* ms, uint8_t verbose);
struct measure_packet* measure_generate(struct measure_params* mp, struct measure_state* ms); struct measure_packet* measure_generate(struct measure_params* mp, struct measure_state* ms);
void measure_next_tick(struct measure_params *mp, struct measure_state* ms, struct timespec *next); void measure_next_tick(struct measure_params *mp, struct measure_state* ms, struct timespec *next);
void measure_summary(struct measure_params* mp, struct measure_state* ms);

View file

@ -9,7 +9,7 @@
#include "measure.h" #include "measure.h"
struct torecho_ctx { struct torecho_ctx {
uint8_t is_measlat, is_tor; uint8_t is_measlat, is_tor, verbose;
struct measure_params mp; struct measure_params mp;
struct measure_state ms; struct measure_state ms;
}; };
@ -76,7 +76,7 @@ int te_on_tcp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (tctx->is_measlat) measure_parse (&tctx->mp, &tctx->ms); if (tctx->is_measlat) measure_parse (&tctx->mp, &tctx->ms, tctx->verbose);
nwritten = send(fdinfo->fd, tctx->ms.mp_in, nread, 0); nwritten = send(fdinfo->fd, tctx->ms.mp_in, nread, 0);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so? // @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
@ -101,10 +101,14 @@ int main(int argc, char** argv) {
enum TOR_ONION_FLAGS tof = TOR_ONION_FLAG_NONE; enum TOR_ONION_FLAGS tof = TOR_ONION_FLAG_NONE;
char url[1024]; char url[1024];
struct torecho_ctx tctx = {0}; struct torecho_ctx tctx = {0};
tctx.mp.payload_size = 1500; //tctx.mp.payload_size = 1500;
measure_params_init (&tctx.mp);
while ((opt = getopt(argc, argv, "ns:mtp:")) != -1) { while ((opt = getopt(argc, argv, "ns:mtp:v")) != -1) {
switch(opt) { switch(opt) {
case 'v':
tctx.verbose++;
break;
case 't': case 't':
tctx.is_tor = 1; tctx.is_tor = 1;
break; break;
@ -118,7 +122,7 @@ int main(int argc, char** argv) {
tctx.is_measlat = 1; tctx.is_measlat = 1;
break; break;
case 's': case 's':
tctx.mp.payload_size = atoi(optarg); measure_params_setpl (&tctx.mp, atoi(optarg));
break; break;
default: default:
break; break;
@ -144,10 +148,9 @@ int main(int argc, char** argv) {
.flags = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP, .flags = EPOLLIN | EPOLLOUT | EPOLLET | EPOLLRDHUP,
.socklist = NULL .socklist = NULL
}; };
measure_params_init (&tctx.mp);
measure_state_init (&tctx.mp, &tctx.ms); measure_state_init (&tctx.mp, &tctx.ms);
evt_core_init(&evts, 0); evt_core_init(&evts, tctx.verbose);
evt_core_add_cat(&evts, &tcp_co); evt_core_add_cat(&evts, &tcp_co);
evt_core_add_cat(&evts, &tcp_all); evt_core_add_cat(&evts, &tcp_all);
printf("--- Categories created\n"); printf("--- Categories created\n");

View file

@ -10,6 +10,7 @@ struct udpecho_ctx {
struct measure_params mp; struct measure_params mp;
struct measure_state ms; struct measure_state ms;
uint8_t is_measlat; uint8_t is_measlat;
uint8_t verbose;
}; };
int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
@ -26,7 +27,7 @@ int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
if (uctx->is_measlat) measure_parse (&uctx->mp, &uctx->ms); if (uctx->is_measlat) measure_parse (&uctx->mp, &uctx->ms, uctx->verbose);
nwritten = sendto(fdinfo->fd, uctx->ms.mp_in, nread, 0, (struct sockaddr*)&addr, addrlen); nwritten = sendto(fdinfo->fd, uctx->ms.mp_in, nread, 0, (struct sockaddr*)&addr, addrlen);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so? // @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
@ -43,16 +44,17 @@ int main(int argc, char** argv) {
printf("~ udpecho ~\n"); printf("~ udpecho ~\n");
struct udpecho_ctx uctx = {0}; struct udpecho_ctx uctx = {0};
int opt, udp_sock, verbose = 0; int opt, udp_sock = 0;
char *port = NULL, *bindhost = NULL; char *port = NULL, *bindhost = NULL;
struct evt_core_ctx evts = {0}; struct evt_core_ctx evts = {0};
uctx.mp.payload_size = 1500; //uctx.mp.payload_size = 1500;
measure_params_init (&uctx.mp);
// 1. Parse parameters // 1. Parse parameters
while ((opt = getopt(argc, argv, "b:p:vms:")) != -1) { while ((opt = getopt(argc, argv, "b:p:vms:")) != -1) {
switch(opt) { switch(opt) {
case 'v': case 'v':
verbose++; uctx.verbose++;
break; break;
case 'p': case 'p':
port = optarg; port = optarg;
@ -64,14 +66,13 @@ int main(int argc, char** argv) {
uctx.is_measlat = 1; uctx.is_measlat = 1;
break; break;
case 's': case 's':
uctx.mp.payload_size = atoi(optarg); measure_params_setpl (&uctx.mp, atoi(optarg));
break; break;
default: default:
goto usage; goto usage;
} }
} }
if (bindhost == NULL) bindhost = "127.0.0.1"; if (bindhost == NULL) bindhost = "127.0.0.1";
measure_params_init (&uctx.mp);
measure_state_init (&uctx.mp, &uctx.ms); measure_state_init (&uctx.mp, &uctx.ms);
// 2. Register category // 2. Register category
@ -84,7 +85,7 @@ int main(int argc, char** argv) {
.flags = EPOLLIN | EPOLLET, .flags = EPOLLIN | EPOLLET,
.socklist = NULL .socklist = NULL
}; };
evt_core_init(&evts, verbose); evt_core_init(&evts, uctx.verbose);
evt_core_add_cat(&evts, &udp_read); evt_core_add_cat(&evts, &udp_read);
// 3. Register UDP socket // 3. Register UDP socket