tor_multipath_voip/src/meas_lat.c
2019-10-16 18:15:09 +02:00

443 lines
14 KiB
C

#include <stdlib.h>
#include <stdio.h>
#include <sys/timerfd.h>
#include <time.h>
#include <unistd.h>
#include <errno.h>
#include "evt_core.h"
#include "net_tools.h"
#include "socks5.h"
#include "utils.h"
#include "measure.h"
#include "url.h"
#include "tor_os.h"
#include "tor_ctl.h"
struct measlat_ctx {
struct measure_conf mc;
struct sockaddr_in addr;
socklen_t addrlen;
int verbose, is_timer_started, is_from_needed, tor_flags;
char *host, *port, *transport;
};
void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) {
struct timespec now;
struct itimerspec timer_config;
char url[1024];
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo = {0};
fdinfo.cat = &cat;
fdinfo.url = url;
struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read");
if (ucat == NULL) {
fprintf(stderr, "Category udp-read not found\n");
exit(EXIT_FAILURE);
}
struct measlat_ctx* mctx = ucat->app_ctx;
mctx->is_timer_started = 1;
if (clock_gettime(CLOCK_REALTIME, &now) == -1) {
perror("clock_gettime");
exit(EXIT_FAILURE);
}
uint64_t micro_sec = mctx->mc.interval;
timer_config.it_value.tv_sec = next_tick == NULL ? now.tv_sec + 1 : next_tick->tv_sec;
timer_config.it_value.tv_nsec = next_tick == NULL ? now.tv_nsec : next_tick->tv_nsec;
timer_config.it_interval.tv_sec = micro_sec / 1000;
timer_config.it_interval.tv_nsec = micro_sec % 1000 * 1000000;
printf("timer_config: sec=%ld nsec=%ld \n", (uint64_t) timer_config.it_value.tv_sec, (uint64_t) timer_config.it_value.tv_nsec);
fdinfo.fd = timerfd_create(CLOCK_REALTIME, 0);
if (fdinfo.fd == -1) {
perror("Unable to timerfd_create");
exit(EXIT_FAILURE);
}
if (timerfd_settime (fdinfo.fd, TFD_TIMER_ABSTIME, &timer_config, NULL) == -1) {
perror("Unable to timerfd_time");
exit(EXIT_FAILURE);
}
fdinfo.cat->name = "timer";
sprintf(fdinfo.url, "timer:%ld:%ld", mctx->mc.interval, mctx->mc.max_measure);
evt_core_add_fd (evts, &fdinfo);
printf("--- Timer registered\n");
}
int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
return 1;
}
int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
ssize_t nread;
struct measlat_ctx* mctx = fdinfo->cat->app_ctx;
if (mctx->is_from_needed) nread = recvfrom(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen);
else nread = recv(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
if ((nread == -1 && errno == EAGAIN) || nread == 0) return 1;
// @FIXME logic is wrong for TCP here but would lead (hopefully) to a crash
// First we parse the packet and exit if needed
measure_parse (nread, &mctx->mc);
// Old behaviour where we work in a RTT way and send back the packet
if (measure_need_reply (&mctx->mc)) {
ssize_t nwritten;
if (mctx->is_from_needed) nwritten = sendto(fdinfo->fd, mctx->mc.payload, nread, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen);
else nwritten = send(fdinfo->fd, mctx->mc.payload, nread, 0);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
if (nwritten != nread) {
fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", nread, nwritten);
perror("write errno");
exit(EXIT_FAILURE);
}
return 0;
}
// Used to start sending from the server in sync with client
if (!mctx->is_timer_started && mctx->mc.counter <= mctx->mc.max_measure) {
struct timespec next_tick = {0};
measure_next_tick(&mctx->mc, &next_tick);
register_timer (ctx, &next_tick);
}
return 0;
}
int on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int conn_sock1, conn_sock2;
struct sockaddr_in addr;
socklen_t in_len;
char url[1024], port[6];
struct evt_core_cat local_cat = {0};
struct evt_core_fdinfo to_fdinfo = {0};
to_fdinfo.cat = &local_cat;
to_fdinfo.url = url;
in_len = sizeof(addr);
conn_sock1 = accept(fdinfo->fd, (struct sockaddr*)&addr, &in_len);
if (conn_sock1 == -1 && errno == EAGAIN) return 1;
if (conn_sock1 == -1) goto co_error;
url_get_port(port, fdinfo->url);
to_fdinfo.fd = conn_sock1;
to_fdinfo.cat->name = "tcp-read";
sprintf(to_fdinfo.url, "tcp:read:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo);
return 0;
co_error:
perror("Failed to handle new connection");
exit(EXIT_FAILURE);
}
int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
ssize_t s;
uint64_t ticks = 0;
struct measlat_ctx* mctx = fdinfo->cat->app_ctx;
s = read(fdinfo->fd, &ticks, sizeof(uint64_t));
if (s == -1 && errno == EAGAIN) return 1;
if (s != sizeof(uint64_t)) {
perror("Read error");
exit(EXIT_FAILURE);
}
if (ticks != 1) {
fprintf(stderr, "Has ticked %lu times, expected 1 time. This is a bug\n", ticks);
}
struct measure_packet* head = measure_generate(&mctx->mc);
//printf("send(id=%ld,is_echo=%d)\n", head->counter, head->is_echo);
struct evt_core_fdinfo* tgtinfo = evt_core_get_first_from_cat (ctx, "udp-read");
if (tgtinfo == NULL) tgtinfo = evt_core_get_first_from_cat (ctx, "tcp-read");
if (!mctx->mc.is_server && tgtinfo == NULL) {
fprintf(stderr, "Unable to find a fdinfo in udp-read nor in tcp-read. Quitting...\n");
exit(EXIT_FAILURE);
} else if (mctx->mc.is_server && (tgtinfo == NULL || mctx->mc.counter > mctx->mc.max_measure)) {
printf("No connection yet\n");
struct evt_core_fdinfo* timer_fd = evt_core_get_first_from_cat (ctx, "timer");
evt_core_rm_fd(ctx, timer_fd->fd);
mctx->is_timer_started = 0;
return 1;
}
if (mctx->is_from_needed) s = sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen);
else s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
if (s < 0) {
perror("Send error");
//exit(EXIT_FAILURE);
}
return 0;
}
int on_socks5_success_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
char url[1024];
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo_n = {0};
struct socks5_ctx* s5ctx = fdinfo->other;
fdinfo_n.cat = &cat;
fdinfo_n.url = url;
struct evt_core_cat* ucat = evt_core_get_from_cat (ctx, "tcp-read");
if (ucat == NULL) {
fprintf(stderr, "Category udp-read not found\n");
exit(EXIT_FAILURE);
}
struct measlat_ctx* mctx = ucat->app_ctx;
fdinfo_n.fd = dup(fdinfo->fd);
fdinfo_n.cat->name = "tcp-read";
sprintf(fdinfo_n.url, "tcp:read:%s:%d", s5ctx->addr, s5ctx->port);
evt_core_add_fd (ctx, &fdinfo_n);
printf("--- Tor socket registered\n");
register_timer(ctx, NULL);
return 1;
}
void spawn_tor_client(struct evt_core_ctx* evts) {
struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "tcp-read");
if (ucat == NULL) {
fprintf(stderr, "Category udp-read not found\n");
exit(EXIT_FAILURE);
}
struct measlat_ctx* mctx = ucat->app_ctx;
socks5_create_dns_client (evts, "127.0.0.1", "9050", mctx->host, atoi(mctx->port));
printf("--- Tor client SOCKS started\n");
}
int on_socks5_failed_measlat(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
evt_core_rm_fd (ctx, fdinfo->fd);
spawn_tor_client(ctx);
return 1;
}
void register_categories(struct evt_core_ctx* evts, struct measlat_ctx* mctx) {
struct evt_core_cat template = {0};
template.app_ctx = mctx;
evt_core_init(evts, mctx->verbose);
template.cb = on_timer;
template.name = "timer";
template.flags = EPOLLIN | EPOLLET;
evt_core_add_cat(evts, &template);
template.cb = on_receive_measure_packet; // intended but not elegant
template.err_cb = NULL; // intended but not elegant
template.name = "tcp-read";
template.flags = EPOLLIN | EPOLLET;
evt_core_add_cat(evts, &template);
template.cb = on_receive_measure_packet;
template.err_cb = on_receive_measure_packet_err;
template.name = "udp-read";
template.flags = EPOLLIN | EPOLLET;
evt_core_add_cat(evts, &template);
template.cb = on_tcp_co;
template.err_cb = NULL;
template.name = "tcp-co";
template.flags = EPOLLIN | EPOLLET;
evt_core_add_cat(evts, &template);
template.cb = on_socks5_success_measlat;
template.err_cb = on_socks5_failed_measlat;
template.name = "socks5-success";
template.flags = EPOLLET;
evt_core_add_cat(evts, &template);
template.cb = on_socks5_failed_measlat;
template.err_cb = on_socks5_failed_measlat;
template.name = "socks5-failed";
template.flags = EPOLLET;
evt_core_add_cat(evts, &template);
socks5_init(evts);
printf("--- Categories registered\n");
}
void spawn_udp_client(struct evt_core_ctx* evts) {
struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "udp-read");
if (ucat == NULL) {
fprintf(stderr, "Category udp-read not found\n");
exit(EXIT_FAILURE);
}
struct measlat_ctx* mctx = ucat->app_ctx;
int udp_sock = create_udp_client (mctx->host, mctx->port);
char url[1024];
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo = {0};
fdinfo.cat = &cat;
fdinfo.url = url;
fdinfo.fd = udp_sock;
fdinfo.cat->name = "udp-read";
fdinfo.other = &mctx->mc;
fdinfo.free_other = NULL;
sprintf(fdinfo.url, "udp:read:%s:%s", mctx->host, mctx->port);
evt_core_add_fd (evts, &fdinfo);
printf("--- UDP client registered\n");
register_timer(evts, NULL);
}
void spawn_udp_server(struct evt_core_ctx* evts) {
struct evt_core_cat* ucat = evt_core_get_from_cat (evts, "udp-read");
if (ucat == NULL) {
fprintf(stderr, "Category udp-read not found\n");
exit(EXIT_FAILURE);
}
struct measlat_ctx* mctx = ucat->app_ctx;
int udp_sock = create_udp_server (mctx->host, mctx->port);
char url[1024];
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo = {0};
fdinfo.cat = &cat;
fdinfo.url = url;
fdinfo.fd = udp_sock;
sprintf(url, "udp:rw:127.0.0.1:%s", mctx->port);
fdinfo.cat->name = "udp-read";
evt_core_add_fd(evts, &fdinfo);
printf("--- UDP server is listening\n");
}
void spawn_tor_server(struct evt_core_ctx* evts, uint16_t *ports) {
char buffer[1024];
int tcp_serv_sock, err;
sprintf(buffer, "%d", ports[0]);
tcp_serv_sock = create_tcp_server ("0.0.0.0", buffer);
err = listen(tcp_serv_sock, SOMAXCONN);
struct evt_core_cat cat = {0};
struct evt_core_fdinfo fdinfo = {0};
fdinfo.cat = &cat;
fdinfo.url = buffer;
fdinfo.fd = tcp_serv_sock;
sprintf(buffer, "tcp:co:127.0.0.1:%d", ports[0]);
fdinfo.cat->name = "tcp-co";
evt_core_add_fd(evts, &fdinfo);
printf("--- TCP server is listening\n");
}
void measlat_create_onion_services(struct tor_os_str* tos, struct tor_ctl* tctl, uint16_t* ports, int ports_count, enum TOR_ONION_FLAGS tof) {
tor_os_create (tos, "onion_services.pub", "onion_services.txt", ports_count);
tor_os_read (tos);
int err = 0;
err = tor_ctl_connect (tctl, "127.0.0.1", "9051");
if (err < 0) {
fprintf(stderr, "Unable to open Tor Socket\n");
exit(EXIT_FAILURE);
}
err = tor_ctl_add_onion (tctl, tos, ports, tof);
if (err != 0) {
fprintf(stderr, "Unable to create Onion Services (error: %d)\n", err);
exit(EXIT_FAILURE);
}
}
int main(int argc, char** argv) {
setvbuf(stdout, NULL, _IONBF, 0);
printf("~ measlat ~\n");
int opt;
struct measlat_ctx mctx = {0};
struct evt_core_ctx evts = {0};
// 1. Parse parameters
while ((opt = getopt(argc, argv, "vh:p:c:s:i:t:lrn")) != -1) {
switch(opt) {
case 'v':
mctx.verbose++;
break;
case 'h': // host
mctx.host = optarg;
break;
case 'p': // port
mctx.port = optarg;
break;
case 'l':
mctx.mc.is_server = 1;
break;
case 't': // transport
mctx.transport = optarg;
break;
case 'c': // count
mctx.mc.max_measure = atoi(optarg);
break;
case 's': // size - payload in bytes
mctx.mc.payload_size = atoi(optarg);
break;
case 'r':
mctx.mc.is_rtt = 1;
break;
case 'n':
mctx.tor_flags |= TOR_ONION_FLAG_NON_ANONYMOUS;
break;
case 'i': // interval - every ms
mctx.mc.interval = atoi(optarg);
break;
default:
goto usage;
}
}
// 2. Check and fix parameters
measure_prepare (&mctx.mc);
mctx.addrlen = sizeof(mctx.addr);
if (mctx.transport == NULL) mctx.transport = "udp";
if (mctx.host == NULL) mctx.host = "127.0.0.1";
if (mctx.port == NULL) mctx.port = strcmp(mctx.transport, "udp") == 0 ? "9000" : "7500";
printf("[measlat_conf] host=%s, port=%s, listen=%d, transport=%s, count=%ld, size=%ld, interval=%ld, is_rtt=%d\n",
mctx.host, mctx.port, mctx.mc.is_server, mctx.transport, mctx.mc.max_measure, mctx.mc.payload_size, mctx.mc.interval, mctx.mc.is_rtt);
// 3. Bind events
register_categories(&evts, &mctx);
struct tor_os_str tos = {0};
struct tor_ctl tctl = {0};
uint16_t ports[] = {7500};
int ports_count = sizeof(ports) / sizeof(ports[0]);
if (mctx.mc.is_server && strcmp(mctx.transport, "udp") == 0) {
spawn_udp_server (&evts);
mctx.is_from_needed = 1;
}
else if (mctx.mc.is_server && strcmp(mctx.transport, "tor") == 0) {
spawn_tor_server(&evts, ports);
measlat_create_onion_services (&tos, &tctl, ports, ports_count, mctx.tor_flags);
printf("--- Onion services created\n");
}
else if (strcmp(mctx.transport, "udp") == 0) spawn_udp_client(&evts);
else if (strcmp(mctx.transport, "tor") == 0) spawn_tor_client(&evts);
else exit(EXIT_FAILURE);
// 4. Run main loop
evt_core_loop(&evts);
return 0;
usage:
fprintf(stderr, "Usage: %s -h <host> -p <port> [-l] [-r] [-t <udp|tor>] [-c <count>] [-i <ms>] [-s <bytes>]\n", argv[0]);
exit(EXIT_FAILURE);
}