Close to work
This commit is contained in:
parent
2ea6e46dba
commit
a5d5d37389
4 changed files with 128 additions and 75 deletions
|
@ -45,6 +45,7 @@ add_executable(udpecho ${CSOURCES} src/udp_echo.c)
|
|||
add_executable(torecho ${CSOURCES} src/tor_echo.c)
|
||||
add_executable(capdiff ${CSOURCES} src/capdiff.c)
|
||||
add_executable(capreplay ${CSOURCES} src/capreplay.c)
|
||||
add_executable(donar_unit_test ${CSOURCES} src/test.c)
|
||||
|
||||
find_package(PkgConfig REQUIRED)
|
||||
pkg_search_module(GLIB REQUIRED glib-2.0)
|
||||
|
@ -67,6 +68,9 @@ target_link_libraries(capdiff ${GLIB_LDFLAGS})
|
|||
target_include_directories(capreplay PRIVATE ${GLIB_INCLUDE_DIRS})
|
||||
target_link_libraries(capreplay ${GLIB_LDFLAGS})
|
||||
|
||||
install(TARGETS donar measlat udpecho torecho capdiff capreplay
|
||||
target_include_directories(donar_unit_test PRIVATE ${GLIB_INCLUDE_DIRS})
|
||||
target_link_libraries(donar_unit_test ${GLIB_LDFLAGS})
|
||||
|
||||
install(TARGETS donar measlat udpecho torecho capdiff capreplay donar_unit_test
|
||||
RUNTIME DESTINATION bin
|
||||
LIBRARY DESTINATION lib)
|
||||
|
|
|
@ -3,7 +3,7 @@ library(sqldf)
|
|||
library(plyr)
|
||||
library(cowplot)
|
||||
|
||||
thunder_ms <- read.csv("thunder_configure_7.csv")
|
||||
thunder_ms <- read.csv("thunder_configure_8.csv")
|
||||
|
||||
thunder_ms <- sqldf("select run,jmax,links,latency, CAST(latency as real) / 1000. as lat_ms from thunder_ms")
|
||||
thunder_ms$links <- as.factor(thunder_ms$links)
|
||||
|
@ -12,7 +12,7 @@ thunder_ms$jmax <- as.factor(thunder_ms$jmax)
|
|||
v1 <- ggplot(data = thunder_ms, aes(x = jmax, y=lat_ms, fill=links)) +
|
||||
geom_boxplot() +
|
||||
scale_fill_grey() +
|
||||
scale_y_log10() +
|
||||
#scale_y_log10() +
|
||||
ylab("latency (ms)") +
|
||||
xlab("max allowed jitter") +
|
||||
theme_classic()
|
||||
|
@ -30,14 +30,14 @@ v2 <- ggplot(data = thunder_rcv, aes(x = jmax, y=dlv, fill=links)) +
|
|||
xlab("max allowed jitter") +
|
||||
theme_classic()
|
||||
|
||||
thunder_bw <- read.csv("thunder_configure_7_bw.csv")
|
||||
thunder_bw <- read.csv("thunder_configure_8_bw.csv")
|
||||
thunder_bw <- sqldf("select run,jmax,links,udp_sent,udp_rcv,cells_sent,cells_rcv,1.0*cells_sent/udp_sent as sent_ratio,1.0*cells_rcv/udp_rcv as rcv_ratio from thunder_bw where udp_sent > 4000")
|
||||
thunder_bw$jmax <- as.factor(thunder_bw$jmax)
|
||||
thunder_bw$links <- as.factor(thunder_bw$links)
|
||||
|
||||
v3 <- ggplot(data = thunder_bw, aes(x = jmax, y=sent_ratio, fill=links)) +
|
||||
geom_boxplot() +
|
||||
scale_y_log10() +
|
||||
#scale_y_log10() +
|
||||
#scale_y_log10(labels = scales::percent) +
|
||||
scale_fill_grey() +
|
||||
ylab("bandwidth ratio") +
|
||||
|
|
|
@ -4,24 +4,7 @@
|
|||
#include "url.h"
|
||||
#include "proxy.h"
|
||||
#include "timer.h"
|
||||
|
||||
// A Tor cell size is 512 bytes but handle only 498 bytes of data
|
||||
#define TOR_CELL_SIZE 498
|
||||
#define MAX_LINKS 64
|
||||
|
||||
struct thunder_ctx {
|
||||
uint16_t recv_id;
|
||||
uint16_t emit_id;
|
||||
uint8_t selected_link;
|
||||
uint8_t total_links;
|
||||
uint64_t delta_t_per_link[MAX_LINKS];
|
||||
uint64_t rcv_delta_t_per_link[MAX_LINKS];
|
||||
uint64_t received_pkts_on_link[MAX_LINKS];
|
||||
uint64_t blacklisted[MAX_LINKS];
|
||||
size_t monit_pkt_size;
|
||||
int64_t allowed_jitter_ms;
|
||||
struct timespec prev_link_time, prev_rcv_link_time;
|
||||
};
|
||||
#include "proxy.h"
|
||||
|
||||
uint64_t compute_delta(struct timespec* prev_time, uint64_t max) {
|
||||
struct timespec curr;
|
||||
|
@ -135,16 +118,21 @@ int schedule(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct bu
|
|||
union abstract_packet *new_ap = buffer_append_ap (bp_dup, &links);
|
||||
|
||||
// 4. We compute the time difference
|
||||
uint64_t mili_sec = compute_delta (&thunderc->prev_link_time, UINT16_MAX);
|
||||
uint64_t mili_sec = 0;
|
||||
if (protect == thunderc->total_links)
|
||||
mili_sec = compute_delta (&thunderc->prev_link_time, UINT16_MAX);
|
||||
|
||||
printf("send packet on link %d with delta=%ld\n", thunderc->selected_link, mili_sec);
|
||||
// 5. We create the array
|
||||
struct link_info *li = &new_ap->fmt.content.link_monitoring_thunder.links_status;
|
||||
for (int i = 0; i < thunderc->total_links; i++) {
|
||||
if (thunderc->sent_pkts_on_link[i] == 0) continue;
|
||||
thunderc->delta_t_per_link[i] += mili_sec;
|
||||
li[i].delta_t = thunderc->delta_t_per_link[i] > UINT16_MAX ? UINT16_MAX : thunderc->delta_t_per_link[i];
|
||||
}
|
||||
thunderc->delta_t_per_link[thunderc->selected_link] = 0;
|
||||
li[thunderc->selected_link].delta_t = 0;
|
||||
thunderc->sent_pkts_on_link[thunderc->selected_link]++;
|
||||
|
||||
if (ctx->verbose > 1) {
|
||||
dump_buffer_packet(bp_dup);
|
||||
|
@ -208,7 +196,6 @@ int is_in_order(struct thunder_ctx* thunderc, uint8_t link_id) {
|
|||
void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
|
||||
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
|
||||
struct thunder_ctx* thunderc = app_ctx->misc;
|
||||
|
||||
union abstract_packet* ap = buffer_first_ap (bp);
|
||||
while (ap != NULL && ap->fmt.headers.cmd != CMD_LINK_MONITORING_THUNDER) ap = ap_next(ap);
|
||||
if (ap == NULL) {
|
||||
|
@ -230,22 +217,15 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b
|
|||
|
||||
uint64_t mili_sec = compute_delta (&thunderc->prev_rcv_link_time, UINT16_MAX);
|
||||
for (int i = 0; i < thunderc->total_links; i++) {
|
||||
if (thunderc->received_pkts_on_link[i] == 0) continue;
|
||||
thunderc->rcv_delta_t_per_link[i] += mili_sec;
|
||||
}
|
||||
thunderc->rcv_delta_t_per_link[link_id] = 0;
|
||||
|
||||
// 2. Disable links that have received packets too late
|
||||
if (is_in_order (thunderc, link_id)) {
|
||||
/*printf("Local: ");
|
||||
for (int i = 0; i < thunderc->total_links; i++) {
|
||||
printf("%ld ", thunderc->rcv_delta_t_per_link[i]);
|
||||
}
|
||||
printf("\n");
|
||||
printf("Packet: ");
|
||||
for (int i = 0; i < thunderc->total_links; i++) {
|
||||
printf("%d ", li[i].delta_t);
|
||||
}
|
||||
printf("\n");*/
|
||||
// 2.
|
||||
|
||||
int64_t current_owdd = 0;
|
||||
int64_t biggest_owdd = current_owdd;
|
||||
|
||||
for (int i = 0; i < thunderc->total_links; i++) {
|
||||
int64_t remote_delta = 0, local_delta = 0, owd_difference = 0;
|
||||
|
@ -255,9 +235,14 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b
|
|||
|
||||
remote_delta = li[i].delta_t;
|
||||
local_delta = thunderc->rcv_delta_t_per_link[i];
|
||||
if (remote_delta > 1000 || local_delta > 10000) continue; // Too many time elapsed for useful comparison
|
||||
|
||||
owd_difference = local_delta - remote_delta;
|
||||
//printf("li[i].delta_t=%d, remote_delta=%ld, thunderc->rcv_delta_t_per_link[i]=%ld, local_delta=%ld, owd_difference=%ld\n", li[i].delta_t, remote_delta, thunderc->rcv_delta_t_per_link[i], local_delta, owd_difference);
|
||||
//if (remote_delta > 10000 || local_delta > 10000) continue; // Too many time elapsed for useful comparison
|
||||
if (owd_difference > biggest_owdd) {
|
||||
biggest_owdd = owd_difference;
|
||||
}
|
||||
//printf("----> %ld vs %ld for i=%d, link_id=%d, owd_difference=%ld, smallest=%ld, remote=%ld, local=%ld\n", thunderc->received_pkts_on_link[i], expected, i, link_id, owd_difference, smallest_owdd, local_delta, remote_delta);
|
||||
|
||||
if (owd_difference <= thunderc->allowed_jitter_ms && owd_difference >= -thunderc->allowed_jitter_ms) continue;
|
||||
|
||||
struct block_info *bi = malloc(sizeof(struct block_info));
|
||||
|
@ -281,6 +266,11 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b
|
|||
|
||||
on_block(ctx, bi);
|
||||
}
|
||||
|
||||
// 2.5 Compute link jitter
|
||||
int64_t current_link_jitter = biggest_owdd;
|
||||
for (int i = 0; i < thunderc->total_links; i++) {
|
||||
thunderc->estimated_sent[i] = li[i].delta_t + current_link_jitter;
|
||||
}
|
||||
|
||||
// 3. Disable links that miss packets
|
||||
|
@ -318,7 +308,8 @@ void classify(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct b
|
|||
}
|
||||
|
||||
struct unpad_info {
|
||||
union abstract_packet *ap_arr_pl[MAX_LINKS], *ap_arr_meta[MAX_LINKS];
|
||||
union abstract_packet *ap_arr_pl[MAX_LINKS],
|
||||
*ap_arr_meta[MAX_LINKS];
|
||||
uint8_t ap_arr_vals;
|
||||
};
|
||||
|
||||
|
@ -334,15 +325,34 @@ void unpad(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff
|
|||
fprintf(stderr, "Unexpected packet, expecting udp metadata\n");
|
||||
}
|
||||
|
||||
if (ap_meta->fmt.content.udp_metadata_thunder.id > thunderc->recv_id) {
|
||||
ui->ap_arr_pl[ui->ap_arr_vals] = ap;
|
||||
ui->ap_arr_meta[ui->ap_arr_vals] = ap_meta;
|
||||
ui->ap_arr_vals++;
|
||||
}
|
||||
}
|
||||
if (ctx->verbose > 1) fprintf(stderr, " [algo_thunder] Unpad done\n");
|
||||
}
|
||||
|
||||
int compare_int64(const void *a,const void *b) {
|
||||
int64_t *x = (int64_t *) a;
|
||||
int64_t *y = (int64_t *) b;
|
||||
return *x - *y;
|
||||
}
|
||||
|
||||
void get_estimation(struct thunder_ctx* thunderc, int64_t* sorted_estimation, uint64_t* sorted_occurencies) {
|
||||
uint64_t deleted = 0;
|
||||
memcpy (sorted_estimation, thunderc->estimated_sent, thunderc->total_links * sizeof(int64_t));
|
||||
qsort(sorted_estimation, thunderc->total_links, sizeof(int64_t), compare_int64);
|
||||
for (int i = 0; i < thunderc->total_links - 1; i++) {
|
||||
if (sorted_estimation[i - deleted] == sorted_estimation[i - deleted + 1]) {
|
||||
for (int j = i - deleted + 1; j < thunderc->total_links - 1 - deleted; j++) {
|
||||
sorted_estimation[j] = sorted_estimation[j + 1];
|
||||
}
|
||||
deleted++;
|
||||
}
|
||||
}
|
||||
*sorted_occurencies = thunderc->total_links - deleted;
|
||||
}
|
||||
|
||||
void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct unpad_info *ui) {
|
||||
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
|
||||
struct thunder_ctx* thunderc = app_ctx->misc;
|
||||
|
@ -350,11 +360,27 @@ void adapt(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buff
|
|||
struct evt_core_fdinfo *to_fdinfo = NULL;
|
||||
uint64_t delivered = 0;
|
||||
|
||||
int64_t sorted_estimation[MAX_LINKS];
|
||||
uint64_t sorted_occurencies;
|
||||
get_estimation (thunderc, sorted_estimation, &sorted_occurencies);
|
||||
|
||||
/*
|
||||
printf("%d packets with estimated jitter ", ui->ap_arr_vals);
|
||||
for (int i = 0; i < sorted_occurencies; i++) {
|
||||
printf("%ld, ", sorted_estimation[i]);
|
||||
}
|
||||
printf("\n");*/
|
||||
|
||||
for (int i = ui->ap_arr_vals-1; i >= 0; i--) {
|
||||
//fprintf(stderr, "i=%d, ui->ap_arr_vals=%d\n", i, ui->ap_arr_vals);
|
||||
if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue;
|
||||
if (ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id <= thunderc->recv_id) continue; // already delivered
|
||||
thunderc->recv_id = ui->ap_arr_meta[i]->fmt.content.udp_metadata_thunder.id;
|
||||
|
||||
int64_t estimation_index = i > sorted_occurencies - 1 ? sorted_occurencies - 1 : i;
|
||||
if (sorted_estimation[estimation_index] > thunderc->allowed_jitter_ms) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// Find destination
|
||||
sprintf(url, "udp:write:127.0.0.1:%d", ui->ap_arr_pl[i]->fmt.content.udp_encapsulated.port);
|
||||
to_fdinfo = evt_core_get_from_url (ctx, url);
|
||||
|
|
23
src/proxy.h
23
src/proxy.h
|
@ -93,3 +93,26 @@ int main_on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
|
|||
int main_on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
|
||||
int main_on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
|
||||
int main_on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
|
||||
|
||||
//@FIXME UGLY
|
||||
|
||||
// A Tor cell size is 512 bytes but handle only 498 bytes of data
|
||||
#define TOR_CELL_SIZE 498
|
||||
#define MAX_LINKS 64
|
||||
|
||||
struct thunder_ctx {
|
||||
uint16_t recv_id;
|
||||
uint16_t emit_id;
|
||||
uint8_t selected_link;
|
||||
uint8_t total_links;
|
||||
uint64_t delta_t_per_link[MAX_LINKS];
|
||||
uint64_t rcv_delta_t_per_link[MAX_LINKS];
|
||||
uint64_t received_pkts_on_link[MAX_LINKS];
|
||||
uint64_t sent_pkts_on_link[MAX_LINKS];
|
||||
uint64_t blacklisted[MAX_LINKS];
|
||||
int64_t estimated_sent[MAX_LINKS];
|
||||
size_t monit_pkt_size;
|
||||
int64_t allowed_jitter_ms;
|
||||
struct timespec prev_link_time, prev_rcv_link_time;
|
||||
};
|
||||
void get_estimation(struct thunder_ctx* thunderc, int64_t* sorted_estimation, uint64_t* sorted_occurencies);
|
||||
|
|
Loading…
Reference in a new issue