Improve measlat

This commit is contained in:
Quentin 2020-02-01 00:00:45 +01:00
parent b1e004f61b
commit f830d43458
3 changed files with 32 additions and 16 deletions

View file

@ -39,7 +39,7 @@ void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) {
mctx->is_timer_started = 1; mctx->is_timer_started = 1;
if (clock_gettime(CLOCK_REALTIME, &now) == -1) { if (clock_gettime(CLOCK_MONOTONIC, &now) == -1) {
perror("clock_gettime"); perror("clock_gettime");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -51,7 +51,7 @@ void register_timer(struct evt_core_ctx* evts, struct timespec* next_tick) {
printf("timer_config: sec=%ld nsec=%ld \n", (uint64_t) timer_config.it_value.tv_sec, (uint64_t) timer_config.it_value.tv_nsec); printf("timer_config: sec=%ld nsec=%ld \n", (uint64_t) timer_config.it_value.tv_sec, (uint64_t) timer_config.it_value.tv_nsec);
fdinfo.fd = timerfd_create(CLOCK_REALTIME, 0); fdinfo.fd = timerfd_create(CLOCK_MONOTONIC, 0);
if (fdinfo.fd == -1) { if (fdinfo.fd == -1) {
perror("Unable to timerfd_create"); perror("Unable to timerfd_create");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -71,27 +71,34 @@ int on_receive_measure_packet_err(struct evt_core_ctx* ctx, struct evt_core_fdin
} }
int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_receive_measure_packet(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
ssize_t nread;
struct measlat_ctx* mctx = fdinfo->cat->app_ctx; struct measlat_ctx* mctx = fdinfo->cat->app_ctx;
ssize_t nread;
if (mctx->is_from_needed) nread = recvfrom(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen); if (mctx->mc.read_size >= mctx->mc.payload_size) mctx->mc.read_size = 0;
else nread = recv(fdinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
if ((nread == -1 && errno == EAGAIN) || nread == 0) return 1; if (mctx->is_from_needed) nread = recvfrom(fdinfo->fd, mctx->mc.payload_rcv, mctx->mc.payload_size, MSG_TRUNC, (struct sockaddr*)&mctx->addr, &mctx->addrlen);
// @FIXME logic is wrong for TCP here but would lead (hopefully) to a crash else nread = recv(fdinfo->fd, mctx->mc.payload_rcv + mctx->mc.read_size, mctx->mc.payload_size - mctx->mc.read_size, 0);
if (nread > 0) mctx->mc.read_size += nread;
if ((nread == -1 && errno == EAGAIN) || nread == 0) {
return 1;
}
if (mctx->mc.read_size < mctx->mc.payload_size) {
return 0;
}
// First we parse the packet and exit if needed // First we parse the packet and exit if needed
measure_parse (nread, &mctx->mc); measure_parse (mctx->mc.read_size, &mctx->mc);
// Old behaviour where we work in a RTT way and send back the packet // Old behaviour where we work in a RTT way and send back the packet
if (measure_need_reply (&mctx->mc)) { if (measure_need_reply (&mctx->mc)) {
ssize_t nwritten; ssize_t nwritten;
if (mctx->is_from_needed) nwritten = sendto(fdinfo->fd, mctx->mc.payload, nread, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen); if (mctx->is_from_needed) nwritten = sendto(fdinfo->fd, mctx->mc.payload, mctx->mc.read_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen);
else nwritten = send(fdinfo->fd, mctx->mc.payload, nread, 0); else nwritten = send(fdinfo->fd, mctx->mc.payload, mctx->mc.read_size, 0);
// @FIXME don't support EAGAIN on write. Could be intended, you don't think so? // @FIXME don't support EAGAIN on write. Could be intended, you don't think so?
if (nwritten != nread) { if (nwritten != mctx->mc.read_size) {
fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", nread, nwritten); fprintf(stderr, "Didn't write the same number of bytes as read. nread=%ld, nwritten=%ld\n", mctx->mc.read_size, nwritten);
perror("write errno"); perror("write errno");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
@ -173,9 +180,9 @@ int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
if (mctx->is_from_needed) s = sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen); if (mctx->is_from_needed) s = sendto(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0, (struct sockaddr*)&mctx->addr, mctx->addrlen);
else s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0); else s = send(tgtinfo->fd, mctx->mc.payload, mctx->mc.payload_size, 0);
if (s < 0) { if (s < 0 || s != mctx->mc.payload_size) {
perror("Send error"); perror("Send error");
//exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
return 0; return 0;

View file

@ -4,10 +4,11 @@ void measure_parse(int size, struct measure_conf* mc) {
struct timespec curr; struct timespec curr;
uint64_t micro_sec; uint64_t micro_sec;
if (size != mc->payload_size) { if (size != mc->payload_size) {
fprintf(stderr, "read size: %d, expected: %ld\n", size, mc->payload_size);
perror("read error, payload has wrong size"); perror("read error, payload has wrong size");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
struct measure_packet* head = (struct measure_packet*) mc->payload; struct measure_packet* head = (struct measure_packet*) mc->payload_rcv;
if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){ if (clock_gettime(CLOCK_MONOTONIC, &curr) == -1){
perror("clock_gettime error"); perror("clock_gettime error");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -43,6 +44,12 @@ void measure_prepare(struct measure_conf* mc) {
} }
memset(mc->payload, 0, mc->payload_size); memset(mc->payload, 0, mc->payload_size);
if ((mc->payload_rcv = malloc(sizeof(char) * mc->payload_size)) == NULL) {
perror("payload malloc failed");
exit(EXIT_FAILURE);
}
memset(mc->payload_rcv, 0, mc->payload_size);
char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire."; char *my_msg = "Tu n'es pas tout a fait la misere,\nCar les levres les plus pauvres te denoncent\nPar un sourire.";
size_t msg_len = strlen(my_msg); size_t msg_len = strlen(my_msg);
size_t cursor_msg = 0; size_t cursor_msg = 0;
@ -66,7 +73,7 @@ struct measure_packet* measure_generate(struct measure_conf* mc) {
} }
void measure_next_tick(struct measure_conf *mc, struct timespec *next) { void measure_next_tick(struct measure_conf *mc, struct timespec *next) {
struct measure_packet *head = (struct measure_packet*) mc->payload; struct measure_packet *head = (struct measure_packet*) mc->payload_rcv;
struct timespec now, *sent_at = &head->emit_time; struct timespec now, *sent_at = &head->emit_time;
mc->counter = head->counter; mc->counter = head->counter;

View file

@ -9,8 +9,10 @@
struct measure_conf { struct measure_conf {
uint64_t max_measure; uint64_t max_measure;
uint64_t payload_size; uint64_t payload_size;
uint64_t read_size;
uint64_t interval; uint64_t interval;
char* payload; char* payload;
char* payload_rcv;
uint64_t counter; uint64_t counter;
uint8_t is_server, is_rtt; uint8_t is_server, is_rtt;
}; };