Improve algorithm correctness...
This commit is contained in:
parent
102865deb1
commit
d6254351f6
2 changed files with 42 additions and 15 deletions
|
@ -3,17 +3,18 @@ library(sqldf)
|
||||||
library(plyr)
|
library(plyr)
|
||||||
library(cowplot)
|
library(cowplot)
|
||||||
|
|
||||||
link_info <- read.csv("../res/tmp_graph/l.txt")
|
link_info <- read.csv("../res/tmp_graph/u.txt")
|
||||||
ggplot(data=link_info, aes(x=timestamp, y=link, color=speed)) +
|
ggplot(data=link_info, aes(x=timestamp, y=link, color=speed)) +
|
||||||
#geom_line() +
|
#geom_line() +
|
||||||
geom_point() +
|
geom_point() +
|
||||||
theme_classic()
|
theme_classic()
|
||||||
|
|
||||||
|
|
||||||
xx <- read.csv("../res/tmp_graph/m.csv")
|
xx <- read.csv("../res/tmp_graph/r.csv")
|
||||||
xx2 <- sqldf("select packet_id,1.0 * MIN(latency) / 1000.0 as lat,way from xx group by packet_id,way")
|
xx2 <- sqldf("select packet_id,1.0 * MIN(latency) / 1000.0 as lat,way from xx group by packet_id,way")
|
||||||
ggplot(data=xx2, aes(x=packet_id, y=lat, color=way)) +
|
ggplot(data=xx2, aes(x=packet_id, y=lat, color=way)) +
|
||||||
geom_line() +
|
geom_line() +
|
||||||
|
coord_cartesian(ylim=c(0,2000)) +
|
||||||
#geom_point() +
|
#geom_point() +
|
||||||
theme_classic()
|
theme_classic()
|
||||||
|
|
||||||
|
|
|
@ -15,6 +15,12 @@ enum ooo_state {
|
||||||
OOO_DONE
|
OOO_DONE
|
||||||
};
|
};
|
||||||
|
|
||||||
|
char* ooo_state_str[] = {
|
||||||
|
"IN_ORDER",
|
||||||
|
"OOO_ONGOING",
|
||||||
|
"OOO_DONE"
|
||||||
|
};
|
||||||
|
|
||||||
struct stat_entry {
|
struct stat_entry {
|
||||||
uint8_t link_id;
|
uint8_t link_id;
|
||||||
int64_t max_ooo;
|
int64_t max_ooo;
|
||||||
|
@ -30,8 +36,8 @@ struct timing_entry {
|
||||||
|
|
||||||
struct light_ctx {
|
struct light_ctx {
|
||||||
uint8_t prev_links[MAX_LINKS];
|
uint8_t prev_links[MAX_LINKS];
|
||||||
uint16_t remote_stats[MAX_LINKS];
|
int16_t remote_stats[MAX_LINKS];
|
||||||
uint16_t local_stats[MAX_LINKS];
|
int16_t local_stats[MAX_LINKS];
|
||||||
struct timing_entry historic[HISTORIC_SIZE];
|
struct timing_entry historic[HISTORIC_SIZE];
|
||||||
struct timespec last[MAX_LINKS];
|
struct timespec last[MAX_LINKS];
|
||||||
uint64_t pkt_rcv_id;
|
uint64_t pkt_rcv_id;
|
||||||
|
@ -79,7 +85,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
|
||||||
sscanf(token, "recovery=%d", &lightc->sleep_duration);
|
sscanf(token, "recovery=%d", &lightc->sleep_duration);
|
||||||
sscanf(token, "window=%ld", &window);
|
sscanf(token, "window=%ld", &window);
|
||||||
sscanf(token, "sent_past_links=%d", &lightc->sent_past_links);
|
sscanf(token, "sent_past_links=%d", &lightc->sent_past_links);
|
||||||
sscanf(token, "csv=%c", &lightc->csv);
|
sscanf(token, "csv=%d", &lightc->csv);
|
||||||
sscanf(token, "measlat=%c", &lightc->is_measlat);
|
sscanf(token, "measlat=%c", &lightc->is_measlat);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -92,7 +98,7 @@ void algo_lightning_init(struct evt_core_ctx* ctx, struct algo_ctx* app_ctx, str
|
||||||
sizeof(m.fmt.headers) +
|
sizeof(m.fmt.headers) +
|
||||||
sizeof(m.fmt.content.link_monitoring_lightning) +
|
sizeof(m.fmt.content.link_monitoring_lightning) +
|
||||||
sizeof(uint8_t) * (lightc->sent_past_links - 1) +
|
sizeof(uint8_t) * (lightc->sent_past_links - 1) +
|
||||||
sizeof(uint16_t) * lightc->total_links;
|
sizeof(int16_t) * lightc->total_links;
|
||||||
timespec_set_unit (&lightc->window, window, MILISEC);
|
timespec_set_unit (&lightc->window, window, MILISEC);
|
||||||
|
|
||||||
printf("fast_count = %d\n", lightc->fast_count);
|
printf("fast_count = %d\n", lightc->fast_count);
|
||||||
|
@ -115,13 +121,19 @@ void monitoring(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct
|
||||||
}
|
}
|
||||||
|
|
||||||
uint8_t *prev_links = &ap->fmt.content.link_monitoring_lightning.dyn_struct;
|
uint8_t *prev_links = &ap->fmt.content.link_monitoring_lightning.dyn_struct;
|
||||||
uint16_t *remote_stats = (uint16_t*)(prev_links + sizeof(uint8_t) * lightc->sent_past_links);
|
int16_t *remote_stats = (int16_t*)(prev_links + sizeof(uint8_t) * lightc->sent_past_links);
|
||||||
|
|
||||||
|
printf("(monitoring.stats) ");
|
||||||
|
for (int i = 0; i < lightc->total_links; i++) {
|
||||||
|
printf("%d, ", remote_stats[i]);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
|
||||||
int64_t pkt_id = ap->fmt.content.link_monitoring_lightning.id;
|
int64_t pkt_id = ap->fmt.content.link_monitoring_lightning.id;
|
||||||
int64_t missing = pkt_id - (lightc->pkt_rcv_id + 1);
|
int64_t missing = pkt_id - (lightc->pkt_rcv_id + 1);
|
||||||
if (pkt_id > lightc->pkt_rcv_id) {
|
if (pkt_id > lightc->pkt_rcv_id) {
|
||||||
lightc->pkt_rcv_id = pkt_id;
|
lightc->pkt_rcv_id = pkt_id;
|
||||||
memcpy(&lightc->remote_stats, remote_stats, sizeof(uint16_t) * lightc->total_links);
|
memcpy(&lightc->remote_stats, remote_stats, sizeof(int16_t) * lightc->total_links);
|
||||||
}
|
}
|
||||||
//printf("internal packet %ld (%ld)\n", pkt_id, missing);
|
//printf("internal packet %ld (%ld)\n", pkt_id, missing);
|
||||||
|
|
||||||
|
@ -138,6 +150,7 @@ void monitoring(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct
|
||||||
te->detected_at = now;
|
te->detected_at = now;
|
||||||
te->link_id = link_id;
|
te->link_id = link_id;
|
||||||
te->pkt_id = miss_id;
|
te->pkt_id = miss_id;
|
||||||
|
printf("(monitoring.delay) packet=%ld, link=%d, state=%s\n", miss_id, link_id, ooo_state_str[te->state]);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Update current packet status
|
// Update current packet status
|
||||||
|
@ -147,6 +160,7 @@ void monitoring(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct
|
||||||
te2->pkt_id = pkt_id;
|
te2->pkt_id = pkt_id;
|
||||||
te2->link_id = link_id;
|
te2->link_id = link_id;
|
||||||
te2->finished_at = now;
|
te2->finished_at = now;
|
||||||
|
printf("(monitoring.rcv) packet=%ld, link=%d, state=%s\n", pkt_id, link_id, ooo_state_str[te2->state]);
|
||||||
}
|
}
|
||||||
|
|
||||||
int deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
|
int deliver(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
|
||||||
|
@ -196,7 +210,7 @@ void algo_lightning_update_stats (struct light_ctx *lightc, struct stat_entry *s
|
||||||
|
|
||||||
// Compute local stats
|
// Compute local stats
|
||||||
for (int i = 0; i < HISTORIC_SIZE; i++) {
|
for (int i = 0; i < HISTORIC_SIZE; i++) {
|
||||||
if (timespec_lt(&lightc->historic[i].finished_at, ¬_before)) continue;
|
if (timespec_lt(&lightc->historic[i].finished_at, ¬_before) && lightc->historic[i].state != OOO_ONGOING) continue;
|
||||||
uint8_t l = lightc->historic[i].link_id;
|
uint8_t l = lightc->historic[i].link_id;
|
||||||
if (l >= lightc->total_links) continue;
|
if (l >= lightc->total_links) continue;
|
||||||
int64_t delta = 0;
|
int64_t delta = 0;
|
||||||
|
@ -212,14 +226,21 @@ void algo_lightning_update_stats (struct light_ctx *lightc, struct stat_entry *s
|
||||||
delta = timespec_get_unit (&temp_time, MILISEC);
|
delta = timespec_get_unit (&temp_time, MILISEC);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
printf("(stats.compute) packet=%ld, link=%d, status=%s, delta=%ld\n", lightc->historic[i].pkt_id, l, ooo_state_str[lightc->historic[i].state], delta);
|
||||||
stats[l].link_id = l;
|
stats[l].link_id = l;
|
||||||
stats[l].max_ooo = delta > stats[l].max_ooo ? delta : stats[l].max_ooo;
|
if (delta > stats[l].max_ooo) {
|
||||||
|
printf("(stats.local) link=%d, delta=%ld\n", l, delta);
|
||||||
|
stats[l].max_ooo = delta;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Set my local stats + merge remote stats
|
// Set my local stats + merge remote stats
|
||||||
for (int i = 0; i < lightc->total_links; i++) {
|
for (int i = 0; i < lightc->total_links; i++) {
|
||||||
lightc->local_stats[i] = stats[i].max_ooo;
|
lightc->local_stats[i] = stats[i].max_ooo;
|
||||||
stats[i].max_ooo = lightc->remote_stats[i] > stats[i].max_ooo ? lightc->remote_stats[i] : stats[i].max_ooo;
|
if (lightc->remote_stats[i] > stats[i].max_ooo) {
|
||||||
|
printf("(stats.remote) link=%d, delta=%d\n", i, lightc->remote_stats[i]);
|
||||||
|
stats[i].max_ooo = lightc->remote_stats[i];
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sort
|
// Sort
|
||||||
|
@ -254,12 +275,17 @@ int send_message(struct evt_core_ctx* ctx, struct buffer_packet* bp) {
|
||||||
};
|
};
|
||||||
union abstract_packet *ap_buf = buffer_append_ap (bp_dup, &monit);
|
union abstract_packet *ap_buf = buffer_append_ap (bp_dup, &monit);
|
||||||
uint8_t *links = &ap_buf->fmt.content.link_monitoring_lightning.dyn_struct;
|
uint8_t *links = &ap_buf->fmt.content.link_monitoring_lightning.dyn_struct;
|
||||||
uint16_t *remote_stats = (uint16_t*)(links + sizeof(uint8_t) * lightc->sent_past_links);
|
int16_t *remote_stats = (int16_t*)(links + sizeof(uint8_t) * lightc->sent_past_links);
|
||||||
|
|
||||||
for (int i = 0; i < lightc->sent_past_links; i++) {
|
for (int i = 0; i < lightc->sent_past_links; i++) {
|
||||||
links[i] = lightc->prev_links[(lightc->pkt_sent_id - (i + 1)) % MAX_LINKS];
|
links[i] = lightc->prev_links[(lightc->pkt_sent_id - (i + 1)) % MAX_LINKS];
|
||||||
}
|
}
|
||||||
memcpy(remote_stats, &lightc->local_stats, sizeof(uint16_t) * lightc->total_links);
|
memcpy(remote_stats, &lightc->local_stats, sizeof(int16_t) * lightc->total_links);
|
||||||
|
printf("(send.stats) ");
|
||||||
|
for (int i = 0; i < lightc->total_links; i++) {
|
||||||
|
printf("%d, ", remote_stats[i]);
|
||||||
|
}
|
||||||
|
printf("\n");
|
||||||
|
|
||||||
lightc->prev_links[lightc->pkt_sent_id % MAX_LINKS] = lightc->selected_link;
|
lightc->prev_links[lightc->pkt_sent_id % MAX_LINKS] = lightc->selected_link;
|
||||||
|
|
||||||
|
@ -303,7 +329,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
send_message (ctx, bp);
|
send_message (ctx, bp);
|
||||||
printf("%ld,%d,fast\n", now_timestamp, lightc->selected_link);
|
if (lightc->csv) printf("%ld,%d,fast\n", now_timestamp, lightc->selected_link);
|
||||||
|
|
||||||
// Select slow link
|
// Select slow link
|
||||||
sel_link_last = now;
|
sel_link_last = now;
|
||||||
|
@ -322,7 +348,7 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
|
||||||
}
|
}
|
||||||
if (elapsed >= lightc->sleep_duration) {
|
if (elapsed >= lightc->sleep_duration) {
|
||||||
send_message (ctx, bp);
|
send_message (ctx, bp);
|
||||||
printf("%ld,%d,slow\n", now_timestamp, lightc->selected_link);
|
if (lightc->csv) printf("%ld,%d,slow\n", now_timestamp, lightc->selected_link);
|
||||||
}
|
}
|
||||||
|
|
||||||
mv_buffer_rtof (&app_ctx->br, fdinfo);
|
mv_buffer_rtof (&app_ctx->br, fdinfo);
|
||||||
|
|
Loading…
Reference in a new issue