From a5d5d3738906917128392ae79675846d290a9bdf Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Mon, 16 Sep 2019 14:40:58 +0200 Subject: [PATCH] Close to work --- CMakeLists.txt | 6 +- r/thunder_configure.R | 8 +- src/algo_thunder.c | 166 ++++++++++++++++++++++++------------------ src/proxy.h | 23 ++++++ 4 files changed, 128 insertions(+), 75 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e6661e6..d4a3fa4 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -45,6 +45,7 @@ add_executable(udpecho ${CSOURCES} src/udp_echo.c) add_executable(torecho ${CSOURCES} src/tor_echo.c) add_executable(capdiff ${CSOURCES} src/capdiff.c) add_executable(capreplay ${CSOURCES} src/capreplay.c) +add_executable(donar_unit_test ${CSOURCES} src/test.c) find_package(PkgConfig REQUIRED) pkg_search_module(GLIB REQUIRED glib-2.0) @@ -67,6 +68,9 @@ target_link_libraries(capdiff ${GLIB_LDFLAGS}) target_include_directories(capreplay PRIVATE ${GLIB_INCLUDE_DIRS}) target_link_libraries(capreplay ${GLIB_LDFLAGS}) -install(TARGETS donar measlat udpecho torecho capdiff capreplay +target_include_directories(donar_unit_test PRIVATE ${GLIB_INCLUDE_DIRS}) +target_link_libraries(donar_unit_test ${GLIB_LDFLAGS}) + +install(TARGETS donar measlat udpecho torecho capdiff capreplay donar_unit_test RUNTIME DESTINATION bin LIBRARY DESTINATION lib) diff --git a/r/thunder_configure.R b/r/thunder_configure.R index a7b2588..80950fe 100644 --- a/r/thunder_configure.R +++ b/r/thunder_configure.R @@ -3,7 +3,7 @@ library(sqldf) library(plyr) library(cowplot) -thunder_ms <- read.csv("thunder_configure_7.csv") +thunder_ms <- read.csv("thunder_configure_8.csv") thunder_ms <- sqldf("select run,jmax,links,latency, CAST(latency as real) / 1000. as lat_ms from thunder_ms") thunder_ms$links <- as.factor(thunder_ms$links) @@ -12,7 +12,7 @@ thunder_ms$jmax <- as.factor(thunder_ms$jmax) v1 <- ggplot(data = thunder_ms, aes(x = jmax, y=lat_ms, fill=links)) + geom_boxplot() + scale_fill_grey() + - scale_y_log10() + + #scale_y_log10() + ylab("latency (ms)") + xlab("max allowed jitter") + theme_classic() @@ -30,14 +30,14 @@ v2 <- ggplot(data = thunder_rcv, aes(x = jmax, y=dlv, fill=links)) + xlab("max allowed jitter") + theme_classic() -thunder_bw <- read.csv("thunder_configure_7_bw.csv") +thunder_bw <- read.csv("thunder_configure_8_bw.csv") thunder_bw <- sqldf("select run,jmax,links,udp_sent,udp_rcv,cells_sent,cells_rcv,1.0*cells_sent/udp_sent as sent_ratio,1.0*cells_rcv/udp_rcv as rcv_ratio from thunder_bw where udp_sent > 4000") thunder_bw$jmax <- as.factor(thunder_bw$jmax) thunder_bw$links <- as.factor(thunder_bw$links) v3 <- ggplot(data = thunder_bw, aes(x = jmax, y=sent_ratio, fill=links)) + geom_boxplot() + - scale_y_log10() + + #scale_y_log10() + #scale_y_log10(labels = scales::percent) + scale_fill_grey() + ylab("bandwidth ratio") + diff --git a/src/algo_thunder.c b/src/algo_thunder.c index 77c58dc..33a5ec4 100644 --- a/src/algo_thunder.c +++ b/src/algo_thunder.c @@ -4,24 +4,7 @@ #include "url.h" #include "proxy.h" #include "timer.h" - -// A Tor cell size is 512 bytes but handle only 498 bytes of data -#define TOR_CELL_SIZE 498 -#define MAX_LINKS 64 - -struct thunder_ctx { - uint16_t recv_id; - uint16_t emit_id; - uint8_t selected_link; - uint8_t total_links; - uint64_t delta_t_per_link[MAX_LINKS]; - uint64_t rcv_delta_t_per_link[MAX_LINKS]; - uint64_t received_pkts_on_link[MAX_LINKS]; - uint64_t blacklisted[MAX_LINKS]; - size_t monit_pkt_size; - int64_t allowed_jitter_ms; - struct timespec prev_link_time, prev_rcv_link_time; -}; +#include "proxy.h" uint64_t compute_delta(struct timespec* prev_time, uint64_t max) { struct timespec curr; @@ -135,16 +118,21 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu union abstract_packet *new_ap = buffer_append_ap (bp_dup, &links); // 4. We compute the time difference - uint64_t mili_sec = compute_delta (&thunderc->prev_link_time, UINT16_MAX); + uint64_t mili_sec = 0; + if (protect == thunderc->total_links) + mili_sec = compute_delta (&thunderc->prev_link_time, UINT16_MAX); + printf("send packet on link %d with delta=%ld\n", thunderc->selected_link, mili_sec); // 5. We create the array struct link_info *li = &new_ap->fmt.content.link_monitoring_thunder.links_status; for (int i = 0; i < thunderc->total_links; i++) { + if (thunderc->sent_pkts_on_link[i] == 0) continue; thunderc->delta_t_per_link[i] += mili_sec; li[i].delta_t = thunderc->delta_t_per_link[i] > UINT16_MAX ? UINT16_MAX : thunderc->delta_t_per_link[i]; } thunderc->delta_t_per_link[thunderc->selected_link] = 0; li[thunderc->selected_link].delta_t = 0; + thunderc->sent_pkts_on_link[thunderc->selected_link]++; if (ctx->verbose > 1) { dump_buffer_packet(bp_dup); @@ -208,7 +196,6 @@ int is_in_order(struct thunder_ctx* thunderc, uint8_t link_id) { void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct thunder_ctx* thunderc = app_ctx->misc; - union abstract_packet* ap = buffer_first_ap (bp); while (ap != NULL && ap->fmt.headers.cmd != CMD_LINK_MONITORING_THUNDER) ap = ap_next(ap); if (ap == NULL) { @@ -230,57 +217,60 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b uint64_t mili_sec = compute_delta (&thunderc->prev_rcv_link_time, UINT16_MAX); for (int i = 0; i < thunderc->total_links; i++) { + if (thunderc->received_pkts_on_link[i] == 0) continue; thunderc->rcv_delta_t_per_link[i] += mili_sec; } thunderc->rcv_delta_t_per_link[link_id] = 0; - // 2. Disable links that have received packets too late - if (is_in_order (thunderc, link_id)) { - /*printf("Local: "); - for (int i = 0; i < thunderc->total_links; i++) { - printf("%ld ", thunderc->rcv_delta_t_per_link[i]); + // 2. + + int64_t current_owdd = 0; + int64_t biggest_owdd = current_owdd; + + for (int i = 0; i < thunderc->total_links; i++) { + int64_t remote_delta = 0, local_delta = 0, owd_difference = 0; + + uint64_t expected = i <= link_id ? thunderc->received_pkts_on_link[link_id] : thunderc->received_pkts_on_link[link_id] - 1; + if (thunderc->received_pkts_on_link[i] != expected) continue; + + remote_delta = li[i].delta_t; + local_delta = thunderc->rcv_delta_t_per_link[i]; + owd_difference = local_delta - remote_delta; + //printf("li[i].delta_t=%d, remote_delta=%ld, thunderc->rcv_delta_t_per_link[i]=%ld, local_delta=%ld, owd_difference=%ld\n", li[i].delta_t, remote_delta, thunderc->rcv_delta_t_per_link[i], local_delta, owd_difference); + //if (remote_delta > 10000 || local_delta > 10000) continue; // Too many time elapsed for useful comparison + if (owd_difference > biggest_owdd) { + biggest_owdd = owd_difference; } - printf("\n"); - printf("Packet: "); - for (int i = 0; i < thunderc->total_links; i++) { - printf("%d ", li[i].delta_t); + //printf("----> %ld vs %ld for i=%d, link_id=%d, owd_difference=%ld, smallest=%ld, remote=%ld, local=%ld\n", thunderc->received_pkts_on_link[i], expected, i, link_id, owd_difference, smallest_owdd, local_delta, remote_delta); + + if (owd_difference <= thunderc->allowed_jitter_ms && owd_difference >= -thunderc->allowed_jitter_ms) continue; + + struct block_info *bi = malloc(sizeof(struct block_info)); + bi->app_ctx = app_ctx; bi->is_timeout = 1; + bi->is_timeout = 0; + bi->app_ctx = app_ctx; + sprintf(bi->reason, "Erreur"); + + if (owd_difference < -thunderc->allowed_jitter_ms) { + bi->i = i; + bi->missing = expected; + sprintf(bi->reason, " Packet Too Late - Blocked link %d owd_diff=%ld, local_delta=%ld, remote_delta=%ld", i, owd_difference, local_delta, remote_delta); + } else if (owd_difference > thunderc->allowed_jitter_ms) { + bi->i = link_id; + bi->missing = thunderc->received_pkts_on_link[link_id]; + sprintf(bi->reason, " Packet Too Late - Blocked link %d owd_diff=%ld, local_delta=%ld, remote_delta=%ld", link_id, owd_difference, local_delta, remote_delta); + } else { + fprintf(stderr, "Algorithm is wrong\n"); + exit(EXIT_FAILURE); } - printf("\n");*/ - for (int i = 0; i < thunderc->total_links; i++) { - int64_t remote_delta = 0, local_delta = 0, owd_difference = 0; + on_block(ctx, bi); + } - uint64_t expected = i <= link_id ? thunderc->received_pkts_on_link[link_id] : thunderc->received_pkts_on_link[link_id] - 1; - if (thunderc->received_pkts_on_link[i] != expected) continue; - - remote_delta = li[i].delta_t; - local_delta = thunderc->rcv_delta_t_per_link[i]; - if (remote_delta > 1000 || local_delta > 10000) continue; // Too many time elapsed for useful comparison - - owd_difference = local_delta - remote_delta; - if (owd_difference <= thunderc->allowed_jitter_ms && owd_difference >= -thunderc->allowed_jitter_ms) continue; - - struct block_info *bi = malloc(sizeof(struct block_info)); - bi->app_ctx = app_ctx; bi->is_timeout = 1; - bi->is_timeout = 0; - bi->app_ctx = app_ctx; - sprintf(bi->reason, "Erreur"); - - if (owd_difference < -thunderc->allowed_jitter_ms) { - bi->i = i; - bi->missing = expected; - sprintf(bi->reason, " Packet Too Late - Blocked link %d owd_diff=%ld, local_delta=%ld, remote_delta=%ld", i, owd_difference, local_delta, remote_delta); - } else if (owd_difference > thunderc->allowed_jitter_ms) { - bi->i = link_id; - bi->missing = thunderc->received_pkts_on_link[link_id]; - sprintf(bi->reason, " Packet Too Late - Blocked link %d owd_diff=%ld, local_delta=%ld, remote_delta=%ld", link_id, owd_difference, local_delta, remote_delta); - } else { - fprintf(stderr, "Algorithm is wrong\n"); - exit(EXIT_FAILURE); - } - - on_block(ctx, bi); - } + // 2.5 Compute link jitter + int64_t current_link_jitter = biggest_owdd; + for (int i = 0; i < thunderc->total_links; i++) { + thunderc->estimated_sent[i] = li[i].delta_t + current_link_jitter; } // 3. Disable links that miss packets @@ -318,7 +308,8 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b } struct unpad_info { - union abstract_packet *ap_arr_pl[MAX_LINKS], *ap_arr_meta[MAX_LINKS]; + union abstract_packet *ap_arr_pl[MAX_LINKS], + *ap_arr_meta[MAX_LINKS]; uint8_t ap_arr_vals; }; @@ -334,15 +325,34 @@ void unpad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff fprintf(stderr, "Unexpected packet, expecting udp metadata\n"); } - if (ap_meta->fmt.content.udp_metadata_thunder.id > thunderc->recv_id) { - ui->ap_arr_pl[ui->ap_arr_vals] = ap; - ui->ap_arr_meta[ui->ap_arr_vals] = ap_meta; - ui->ap_arr_vals++; - } + ui->ap_arr_pl[ui->ap_arr_vals] = ap; + ui->ap_arr_meta[ui->ap_arr_vals] = ap_meta; + ui->ap_arr_vals++; } if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Unpad done\n"); } +int compare_int64(const void *a,const void *b) { + int64_t *x = (int64_t *) a; + int64_t *y = (int64_t *) b; + return *x - *y; +} + +void get_estimation(struct thunder_ctx* thunderc, int64_t* sorted_estimation, uint64_t* sorted_occurencies) { + uint64_t deleted = 0; + memcpy (sorted_estimation, thunderc->estimated_sent, thunderc->total_links * sizeof(int64_t)); + qsort(sorted_estimation, thunderc->total_links, sizeof(int64_t), compare_int64); + for (int i = 0; i < thunderc->total_links - 1; i++) { + if (sorted_estimation[i - deleted] == sorted_estimation[i - deleted + 1]) { + for (int j = i - deleted + 1; j < thunderc->total_links - 1 - deleted; j++) { + sorted_estimation[j] = sorted_estimation[j + 1]; + } + deleted++; + } + } + *sorted_occurencies = thunderc->total_links - deleted; +} + void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct unpad_info *ui) { struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; struct thunder_ctx* thunderc = app_ctx->misc; @@ -350,11 +360,27 @@ void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff struct evt_core_fdinfo *to_fdinfo = NULL; uint64_t delivered = 0; + int64_t sorted_estimation[MAX_LINKS]; + uint64_t sorted_occurencies; + get_estimation (thunderc, sorted_estimation, &sorted_occurencies); + + /* + printf("%d packets with estimated jitter ", ui->ap_arr_vals); + for (int i = 0; i < sorted_occurencies; i++) { + printf("%ld, ", sorted_estimation[i]); + } + printf("\n");*/ + for (int i = ui->ap_arr_vals-1; i >= 0; i--) { //fprintf(stderr, "i=%d, ui->ap_arr_vals=%d\n", i, ui->ap_arr_vals); - if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue; + if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue; // already delivered thunderc->recv_id = ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id; + int64_t estimation_index = i > sorted_occurencies - 1 ? sorted_occurencies - 1 : i; + if (sorted_estimation[estimation_index] > thunderc->allowed_jitter_ms) { + continue; + } + // Find destination sprintf(url, "udp:write:127.0.0.1:%d", ui->ap_arr_pl[i]->fmt.content.udp_encapsulated.port); to_fdinfo = evt_core_get_from_url (ctx, url); diff --git a/src/proxy.h b/src/proxy.h index 9d115f9..de8faa5 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -93,3 +93,26 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); + +//@FIXME UGLY + +// A Tor cell size is 512 bytes but handle only 498 bytes of data +#define TOR_CELL_SIZE 498 +#define MAX_LINKS 64 + +struct thunder_ctx { + uint16_t recv_id; + uint16_t emit_id; + uint8_t selected_link; + uint8_t total_links; + uint64_t delta_t_per_link[MAX_LINKS]; + uint64_t rcv_delta_t_per_link[MAX_LINKS]; + uint64_t received_pkts_on_link[MAX_LINKS]; + uint64_t sent_pkts_on_link[MAX_LINKS]; + uint64_t blacklisted[MAX_LINKS]; + int64_t estimated_sent[MAX_LINKS]; + size_t monit_pkt_size; + int64_t allowed_jitter_ms; + struct timespec prev_link_time, prev_rcv_link_time; +}; +void get_estimation(struct thunder_ctx* thunderc, int64_t* sorted_estimation, uint64_t* sorted_occurencies);