diff --git a/CMakeLists.txt b/CMakeLists.txt index ec99347..aac5bdc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -39,6 +39,7 @@ list(APPEND CSOURCES src/cap_utils.c src/measure.h src/measure.c + src/algo_lightning.c ) add_executable(donar ${CSOURCES} src/donar.c) diff --git a/src/algo_lightning.c b/src/algo_lightning.c new file mode 100644 index 0000000..c4635ab --- /dev/null +++ b/src/algo_lightning.c @@ -0,0 +1,184 @@ +#include +#include "algo_utils.h" +#include "utils.h" +#include "url.h" +#include "proxy.h" +#include "timer.h" +#include "proxy.h" +#define HISTORIC_SIZE 256 +#define MAX_LINKS 64 + +enum ooo_state { + IN_ORDER, + OOO_ONGOING, + OOO_DONE +}; + +struct timing_entry { + enum ooo_state state; + struct timespec detected_at; + struct timespec finished_at; + uint8_t link_id; + uint64_t pkt_id; +}; + +struct light_ctx { + struct timing_entry historic[HISTORIC_SIZE]; + struct timespec last[MAX_LINKS]; + uint64_t pkt_rcv_id; + uint64_t pkt_sent_id; + uint8_t selected_link; + uint8_t total_links; + int max_ooo; + int sleep_duration; + struct timespec window; + size_t monit_pkt_size; +}; + +void algo_lightning_free(void* v) { + struct light_ctx* lightc = v; + free(lightc); +} + +void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap) { + app_ctx->misc = malloc(sizeof(struct light_ctx)); + app_ctx->free_misc = algo_lightning_free; + if (app_ctx->misc == NULL) { + perror("malloc failed in algo lightning init"); + exit(EXIT_FAILURE); + } + memset(app_ctx->misc, 0, sizeof(struct light_ctx)); + struct light_ctx* lightc = app_ctx->misc; + lightc->total_links = app_ctx->ap.links; + lightc->selected_link = lightc->total_links - 1; + lightc->max_ooo = 50; + lightc->sleep_duration = 500; + + uint64_t window = 5000; + if (ap->algo_specific_params != NULL) { + char *parse_ptr, *token, *params; + + for (params = ap->algo_specific_params; ; params = NULL) { + token = strtok_r(params, ",", &parse_ptr); + if (token == NULL) break; + sscanf(token, "max_ooo=%d", &lightc->max_ooo); + sscanf(token, "recovery=%d", &lightc->sleep_duration); + sscanf(token, "window=%ld", &window); + } + } + + union abstract_packet m; + lightc->monit_pkt_size = sizeof(m.fmt.headers) + sizeof(m.fmt.content.link_monitoring_lightning); + timespec_set_unit (&lightc->window, window, MICROSEC); + + printf("max_ooo = %d\n", lightc->max_ooo); + printf("recovery = %d\n", lightc->sleep_duration); + printf("window = %ld ms\n", window); +} + +int algo_lightning_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + + return 0; +} + +void algo_lightning_update_stats (struct light_ctx *lightc, int64_t *stats) { + struct timespec now, not_before = {0}, temp_time; + set_now(&now); + timespec_diff (&now, &lightc->window, ¬_before); + + for (int i = 0; i < lightc->total_links; i++) stats[i] = -1; + + for (int i = 0; i < HISTORIC_SIZE; i++) { + if (timespec_lt(&lightc->historic[i].finished_at, ¬_before)) continue; + int64_t delta = 0; + switch (lightc->historic[i].state) { + case IN_ORDER: + break; + case OOO_ONGOING: + timespec_diff(&now, &lightc->historic[i].detected_at, &temp_time); + delta = timespec_get_unit (&temp_time, MICROSEC); + break; + case OOO_DONE: + timespec_diff(&lightc->historic[i].finished_at, &lightc->historic[i].detected_at, &temp_time); + delta = timespec_get_unit (&temp_time, MICROSEC); + break; + } + uint8_t l = lightc->historic[i].link_id; + stats[l] = delta > stats[l] ? delta : stats[l]; + } +} + +int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) { + char url[256]; + struct evt_core_cat *cat = evt_core_get_from_cat (ctx, "tcp-write"); + if (cat == NULL) { + fprintf(stderr, "[algo_lightning] cat tcp-write not found\n"); + exit(EXIT_FAILURE); + } + + struct algo_ctx* app_ctx = cat->app_ctx; + struct light_ctx* lightc = app_ctx->misc; + set_now(&lightc->last[lightc->selected_link]); + + sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + lightc->selected_link); + struct evt_core_fdinfo *to_fdinfo = evt_core_get_from_url (ctx, url); + if (to_fdinfo == NULL) return 0; + + struct buffer_packet* bp_dup = dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); + + union abstract_packet monit = { + .fmt.headers.cmd = CMD_LINK_MONITORING_LIGHTNING, + .fmt.headers.size = lightc->monit_pkt_size, + .fmt.headers.flags = 0, + .fmt.content.link_monitoring_lightning.id = ++lightc->pkt_sent_id + }; + buffer_append_ap (bp_dup, &monit); + + if (ctx->verbose > 1) { + dump_buffer_packet(bp_dup); + fprintf(stderr, " [algo_lightning] Will send this info\n"); + } + main_on_tcp_write(ctx, to_fdinfo); + + return 1; +} + +int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { + struct algo_ctx* app_ctx = fdinfo->cat->app_ctx; + struct light_ctx* lightc = app_ctx->misc; + union abstract_packet* ap = (union abstract_packet*) &bp->ip; + + // Last step, send packet + int64_t stats[MAX_LINKS]; + algo_lightning_update_stats(lightc, stats); + + struct timespec now; + set_now(&now); + + while (1) { + lightc->selected_link = (lightc->selected_link + 1) % lightc->total_links; + uint64_t cdelta = stats[lightc->selected_link]; + + // OK link + if (cdelta >= 0 && cdelta < lightc->max_ooo) { + send_message (ctx, bp); + break; + } + + // Broken link that must be probed + if (elapsed_micros (&now, &lightc->last[lightc->selected_link]) >= lightc->sleep_duration) { + send_message (ctx, bp); + continue; + } + + // Broken link probed recently + } + + mv_buffer_rtof (&app_ctx->br, fdinfo); + return 0; +} + +int algo_lightning_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { + return 0; +} diff --git a/src/measure.c b/src/measure.c index e7d1fb2..0556a14 100644 --- a/src/measure.c +++ b/src/measure.c @@ -1,5 +1,4 @@ #include "measure.h" -#define ONE_SEC 1000000000L void measure_parse(int size, struct measure_conf* mc) { struct timespec curr; @@ -58,10 +57,6 @@ struct measure_packet* measure_generate(struct measure_conf* mc) { return head; } -uint8_t timespec_gt(struct timespec *t1, struct timespec *t2) { - return t1->tv_sec > t2->tv_sec || (t1->tv_sec == t2->tv_sec && t1->tv_nsec > t2->tv_nsec); -} - void measure_next_tick(struct measure_conf *mc, struct timespec *next) { struct measure_packet *head = (struct measure_packet*) mc->payload; struct timespec now, *sent_at = &head->emit_time; diff --git a/src/packet.h b/src/packet.h index 16a3279..4bf212c 100644 --- a/src/packet.h +++ b/src/packet.h @@ -35,6 +35,7 @@ enum PKT_CMD { CMD_UDP_ENCAPSULATED = 1, CMD_LINK_MONITORING_THUNDER = 2, CMD_UDP_METADATA_THUNDER = 3, + CMD_LINK_MONITORING_LIGHTNING = 4, }; enum PKT_FLAGS { @@ -56,6 +57,9 @@ union abstract_packet { } headers; union { + struct { + uint64_t id; + } link_monitoring_lightning; struct { uint8_t to_increment; struct link_info links_status; diff --git a/src/proxy.h b/src/proxy.h index f67fb95..e4371f1 100644 --- a/src/proxy.h +++ b/src/proxy.h @@ -60,6 +60,11 @@ int algo_thunder_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdi int algo_thunder_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); int algo_thunder_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); +void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, struct algo_params* ap); +int algo_lightning_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp); +int algo_lightning_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo); + static struct algo_desc available_algo[] = { { .name = "naive", @@ -81,6 +86,10 @@ static struct algo_desc available_algo[] = { .on_stream = algo_thunder_on_stream, .on_datagram = algo_thunder_on_datagram, .on_err = algo_thunder_on_err + }, + { + .name = "lightning", + .init = algo_lightning_init } }; diff --git a/src/utils.c b/src/utils.c index e178480..3387762 100644 --- a/src/utils.c +++ b/src/utils.c @@ -92,3 +92,58 @@ uint64_t elapsed_micros(struct timespec* t1, struct timespec* t2) { return micro_sec; } +void set_now(struct timespec *dest) { + if (clock_gettime(CLOCK_MONOTONIC, dest) == -1){ + perror("clock_gettime error"); + exit(EXIT_FAILURE); + } +} + +uint8_t timespec_eq(struct timespec *t1, struct timespec *t2) { + return t1->tv_sec == t2->tv_sec && t1->tv_nsec == t2->tv_nsec; +} + +uint8_t timespec_gt(struct timespec *t1, struct timespec *t2) { + return t1->tv_sec > t2->tv_sec || + (t1->tv_sec == t2->tv_sec && t1->tv_nsec > t2->tv_nsec); +} + +uint8_t timespec_ge(struct timespec *t1, struct timespec *t2) { + return timespec_gt (t1, t2) || timespec_eq(t1, t2); +} + +uint8_t timespec_le (struct timespec *t1, struct timespec *t2) { + return !timespec_gt (t1, t2); +} + +uint8_t timespec_lt(struct timespec *t1, struct timespec *t2) { + return !timespec_ge(t1, t2); +} + +void timespec_diff(struct timespec *stop, struct timespec *start, struct timespec *result) { + if (timespec_gt(start, stop)) { + fprintf(stderr, "start is greater than stop in timespec_diff, this is an error\n"); + exit(EXIT_FAILURE); + } + + if (stop->tv_nsec < start->tv_nsec) { + result->tv_sec = stop->tv_sec - start->tv_sec - 1; + result->tv_nsec = stop->tv_nsec - start->tv_nsec + 1000000000; + } else { + result->tv_sec = stop->tv_sec - start->tv_sec; + result->tv_nsec = stop->tv_nsec - start->tv_nsec; + } + + return; +} + +void timespec_set_unit(struct timespec *t, uint64_t value, enum time_units unit) { + uint64_t sec = value / (SEC / unit); + uint64_t ns = value * unit; + t->tv_sec = sec; + t->tv_nsec = ns; +} + +uint64_t timespec_get_unit(struct timespec *t, enum time_units unit) { + return t->tv_sec * (SEC / unit) + t->tv_nsec / unit; +} diff --git a/src/utils.h b/src/utils.h index fad8837..e6a90d7 100644 --- a/src/utils.h +++ b/src/utils.h @@ -3,9 +3,18 @@ #include #include #include +#include // 1500 = internet MTU #define RING_BUFFER_SIZE 1500*10 +#define ONE_SEC 1000000000L + +enum time_units { + NANOSEC = 1L, + MICROSEC = 1000L, + MILISEC = 1000000L, + SEC = 1000000000L +}; struct ring_buffer { char buffer[RING_BUFFER_SIZE]; @@ -25,4 +34,13 @@ int ring_lt(uint16_t v1, uint16_t v2); int ring_le(uint16_t v1, uint16_t v2); uint64_t elapsed_micros(struct timespec* t1, struct timespec* t2); +void set_now(struct timespec *dest); +uint8_t timespec_eq(struct timespec *t1, struct timespec *t2); +uint8_t timespec_gt(struct timespec *t1, struct timespec *t2); +uint8_t timespec_ge(struct timespec *t1, struct timespec *t2); +uint8_t timespec_le(struct timespec *t1, struct timespec *t2); +uint8_t timespec_lt(struct timespec *t1, struct timespec *t2); +void timespec_diff(struct timespec *end, struct timespec *begin, struct timespec *result); +void timespec_set_unit(struct timespec *t, uint64_t value, enum time_units unit); +uint64_t timespec_get_unit(struct timespec *t, enum time_units unit);