fix fix fix

This commit is contained in:
Quentin 2021-01-27 21:44:26 +01:00
parent 1a8e088431
commit 373d295e5a
9 changed files with 25 additions and 17 deletions

View file

@ -62,7 +62,7 @@ popd
-t udp \ -t udp \
-h 127.0.0.1 \ -h 127.0.0.1 \
-p $LOCAL_PORT \ -p $LOCAL_PORT \
-c 1350 \ -c 135000 \
-i 40 \ -i 40 \
-m torfone \ -m torfone \
-s 172 2>&1 | cyan & -s 172 2>&1 | cyan &
@ -82,7 +82,7 @@ sleep 1
-t udp \ -t udp \
-h 127.13.3.7 \ -h 127.13.3.7 \
-p $LOCAL_PORT \ -p $LOCAL_PORT \
-c 1350 \ -c 135000 \
-i 40 \ -i 40 \
-m torfone \ -m torfone \
-s 172 2>&1 | purple -s 172 2>&1 | purple

View file

@ -92,6 +92,7 @@ struct light_ctx {
int is_measlat; int is_measlat;
int explain; int explain;
int disable_scheduler; int disable_scheduler;
int base_port;
struct stat_entry stats[MAX_LINKS]; struct stat_entry stats[MAX_LINKS];
enum schedule_group_target sched_strat; enum schedule_group_target sched_strat;
}; };
@ -121,6 +122,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
lightc->disable_scheduler = 0; lightc->disable_scheduler = 0;
lightc->active = 0; lightc->active = 0;
lightc->sched_strat = SCHEDULE_BOTH; lightc->sched_strat = SCHEDULE_BOTH;
lightc->base_port = ap->base_port;
uint64_t window = 2000; uint64_t window = 2000;
if (ap->algo_specific_params != NULL) { if (ap->algo_specific_params != NULL) {
@ -256,7 +258,7 @@ void monitoring(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct
} }
// Update current packet status // Update current packet status
int link_id = url_get_port_int(fdinfo->url) - 7500; int link_id = url_get_port_int(fdinfo->url) - lightc->base_port;
struct timing_entry *te2 = &lightc->historic[pkt_id % HISTORIC_SIZE]; struct timing_entry *te2 = &lightc->historic[pkt_id % HISTORIC_SIZE];
te2->state = te2->pkt_id == pkt_id ? OOO_DONE : IN_ORDER; te2->state = te2->pkt_id == pkt_id ? OOO_DONE : IN_ORDER;
te2->pkt_id = pkt_id; te2->pkt_id = pkt_id;
@ -371,7 +373,7 @@ void algo_lightning_update_stats (struct light_ctx *lightc, struct evt_core_ctx*
// Disable broken links // Disable broken links
char url[256]; char url[256];
for (int i = 0; i < lightc->total_links; i++) { for (int i = 0; i < lightc->total_links; i++) {
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + i); sprintf(url, "tcp:write:127.0.0.1:%d", lightc->base_port + i);
struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url); struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) lightc->stats[i].ooo = -2; if (to_fdinfo == NULL) lightc->stats[i].ooo = -2;
} }
@ -399,7 +401,7 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) {
} }
set_now(&lightc->status[lightc->selected_link].last); set_now(&lightc->status[lightc->selected_link].last);
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link); sprintf(url, "tcp:write:127.0.0.1:%d", lightc->base_port + lightc->selected_link);
struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url); struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "[algo_lightning] PACKET DROPPED! We don't have any entry for %s currently\n", url); fprintf(stderr, "[algo_lightning] PACKET DROPPED! We don't have any entry for %s currently\n", url);
@ -543,7 +545,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
if (ctx->verbose > 1) { if (ctx->verbose > 1) {
printf("link ranking (%d fast links, %d total links)\nposition | port | score | class \n", target_to_use/2, target_to_use); printf("link ranking (%d fast links, %d total links)\nposition | port | score | class \n", target_to_use/2, target_to_use);
for (int i = 0; i < lightc->total_links; i++) { for (int i = 0; i < lightc->total_links; i++) {
printf("%8d | %4d | %9ld | %9s \n", i, lightc->stats[i].link_id+7500, lightc->stats[i].ooo, link_cat_str[lightc->status[lightc->stats[i].link_id].used]); printf("%8d | %4d | %9ld | %9s \n", i, lightc->stats[i].link_id + lightc->base_port, lightc->stats[i].ooo, link_cat_str[lightc->status[lightc->stats[i].link_id].used]);
} }
printf("\n"); printf("\n");
} }

View file

@ -83,8 +83,8 @@ int main(int argc, char** argv) {
if (dp.tor_port == NULL) dp.tor_port = dp.is_client ? "9050" : "9051"; if (dp.tor_port == NULL) dp.tor_port = dp.is_client ? "9050" : "9051";
if (dp.algo == NULL) goto in_error; if (dp.algo == NULL) goto in_error;
fprintf(stderr, "Passed parameters: client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, transfer_base_port=%d, tor_daemon_port=%s, onion_file=%s, links=%d, duplication=%d,%d\n", fprintf(stderr, "Passed parameters: is_waiting_bootstrap=%d, client=%d, server=%d, algo=%s, exposed_ports=%d, remote_ports=%d, transfer_base_port=%d, tor_daemon_port=%s, onion_file=%s, links=%d, duplication=%d,%d\n",
dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.base_port, dp.tor_port, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data); dp.is_waiting_bootstrap, dp.is_client, dp.is_server, dp.algo, dp.exposed_ports->len, dp.remote_ports->len, dp.base_port, dp.tor_port, dp.onion_file, dp.links, dp.fresh_data, dp.redundant_data);
if (dp.is_server) { if (dp.is_server) {

View file

@ -140,6 +140,7 @@ void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) {
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data, .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file, .capture_file = dp->capture_file,
.base_port = dp->base_port,
.sr = donar_client_stream_repair .sr = donar_client_stream_repair
}; };
ctx->tor_ip = dp->tor_ip; ctx->tor_ip = dp->tor_ip;

View file

@ -87,6 +87,7 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data, .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file, .capture_file = dp->capture_file,
.base_port = dp->base_port,
.sr = donar_server_stream_repair .sr = donar_server_stream_repair
}; };

View file

@ -152,20 +152,24 @@ int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
measure_parse (&mctx->mp, ms, mctx->verbose); measure_parse (&mctx->mp, ms, mctx->verbose);
// 4. Detect if it is a probe // 4. Detect if it is a probe
if (ms->mp_in->probe) { // Allow for probing without registering a timer if (ms->mp_in->probe && mctx->role == MEASLAT_SERVER) { // Allow for probing without registering a timer
int s = mctx->connectionless && mctx->role == MEASLAT_SERVER ? int s = mctx->connectionless && mctx->role == MEASLAT_SERVER ?
sendto(ms->fd, ms->mp_in, mctx->mp.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) : sendto(ms->fd, ms->mp_in, mctx->mp.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen) :
send(ms->fd, ms->mp_in, mctx->mp.payload_size, 0); send(ms->fd, ms->mp_in, mctx->mp.payload_size, 0);
if (!mctx->connectionless || mctx->role == MEASLAT_CLIENT) if (!mctx->connectionless)
evt_core_rm_fd (ctx, fdinfo->fd); evt_core_rm_fd (ctx, fdinfo->fd);
if (mctx->role == MEASLAT_CLIENT)
exit(EXIT_SUCCESS);
return 1; return 1;
} else if (ms->mp_in->probe && mctx->role == MEASLAT_CLIENT) {
if (ms->mp_out->probe) exit(EXIT_SUCCESS);
else {
fprintf(stderr, "lost probe, ignoring\n");
return 0;
}
} }
// 5. Persist our measurement object if needed // 5. Persist our measurement object if needed
// It includes starting a timer. // It includes starting a timer.
if (ms == &ms_transi) { if (ms == &ms_transi) {

View file

@ -86,11 +86,11 @@ void measure_parse(struct measure_params* mp, struct measure_state* ms, uint8_t
micro_sec = elapsed_micros (&ms->mp_in->emit_time, &curr); micro_sec = elapsed_micros (&ms->mp_in->emit_time, &curr);
if (ms->mp_in->counter <= mp->max_measure) { if (ms->mp_in->counter <= mp->max_measure && !ms->mp_in->probe) {
ms->log[ms->mp_in->counter-1] = micro_sec; ms->log[ms->mp_in->counter-1] = micro_sec;
} else { } else {
verbose = 1; verbose = 1;
fprintf(stderr, "measure will be ignored: counter=%lu, max_measure=%lu\n", ms->mp_in->counter, mp->max_measure); fprintf(stderr, "measure will be ignored: probe=%d, counter=%lu, max_measure=%lu\n", ms->mp_in->probe, ms->mp_in->counter, mp->max_measure);
} }
if (verbose) { if (verbose) {

View file

@ -103,7 +103,7 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo)
uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0; uint8_t is_rdy = fdinfo->cat->socklist->len >= app_ctx->link_count ? 1 : 0;
if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count); if (!app_ctx->is_rdy && is_rdy) printf("=== Our %d requested circuits are now up ===\n", app_ctx->link_count);
else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count); else if (app_ctx->is_rdy && !is_rdy) printf("=== Only %d/%d circuits are available, results could be biased ===\n", fdinfo->cat->socklist->len, app_ctx->link_count);
app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // @FIXME we don't want deactivation finally app_ctx->is_rdy = app_ctx->is_rdy || is_rdy; // we don't want deactivation finally, after all the test is running
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1; if ((bp = get_write_buffer(&app_ctx->br, fdinfo)) == NULL) return 1;

View file

@ -16,7 +16,7 @@ typedef int (*stream_repair)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* f
struct algo_params { struct algo_params {
uint8_t is_waiting_bootstrap; uint8_t is_waiting_bootstrap;
char *algo_name, *capture_file, *algo_specific_params; char *algo_name, *capture_file, *algo_specific_params;
int links, fresh_data, redundant_data; int links, fresh_data, redundant_data, base_port;
stream_repair sr; stream_repair sr;
}; };