Rework loop

This commit is contained in:
Quentin Dufour 2019-02-20 17:46:58 +01:00
parent 5e042d57ce
commit 20706800ee
5 changed files with 42 additions and 32 deletions

View file

@ -9,10 +9,10 @@ struct naive_ctx {
GHashTable* write_waiting; // Structure to track packets waiting to be written GHashTable* write_waiting; // Structure to track packets waiting to be written
}; };
void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
void on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
void on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo); int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
void free_nothing(void* app_ctx) {} void free_nothing(void* app_ctx) {}
void free_naive(void* app_ctx) { void free_naive(void* app_ctx) {
@ -25,7 +25,7 @@ void free_naive(void* app_ctx) {
free(ctx); free(ctx);
} }
void on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
int conn_sock1, conn_sock2; int conn_sock1, conn_sock2;
struct sockaddr_in addr; struct sockaddr_in addr;
socklen_t in_len; socklen_t in_len;
@ -55,7 +55,7 @@ void on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port); sprintf(to_fdinfo.url, "tcp:write:127.0.0.1:%s", port);
evt_core_add_fd (ctx, &to_fdinfo); evt_core_add_fd (ctx, &to_fdinfo);
return; return 1;
co_error: co_error:
perror("Failed to handle new connection"); perror("Failed to handle new connection");
@ -148,7 +148,7 @@ void notify_read(struct evt_core_ctx* ctx, struct naive_ctx* app_ctx) {
} }
} }
void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
struct evt_core_fdinfo *to_fdinfo = NULL; struct evt_core_fdinfo *to_fdinfo = NULL;
struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; struct naive_ctx* app_ctx = fdinfo->cat->app_ctx;
@ -156,14 +156,14 @@ void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
char url[255]; char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return; if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Try to read a whole packet in the buffer // 2. Try to read a whole packet in the buffer
while (read_res != FDS_AGAIN && bp->mode == BP_READING) { while (bp->mode == BP_READING) {
read_res = read_packet_from_tcp (fdinfo->fd, bp); read_res = read_packet_from_tcp (fdinfo->fd, bp);
if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_ERR) goto co_error;
if (read_res == FDS_AGAIN) return 1;
} }
if (bp->mode != BP_WRITING) return;
// 3. A whole packet has been read, we will find someone to write it // 3. A whole packet has been read, we will find someone to write it
sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port); sprintf(url, "udp:write:127.0.0.1:%d", bp->ip.ap.str.port);
@ -171,7 +171,7 @@ void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url); fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
mv_buffer_wtor (app_ctx, fdinfo, bp); mv_buffer_wtor (app_ctx, fdinfo, bp);
return; return 1;
} }
//printf("Pass packet from %s to %s\n", fdinfo->url, url); //printf("Pass packet from %s to %s\n", fdinfo->url, url);
@ -179,39 +179,39 @@ void on_tcp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp);
on_udp_write(ctx, to_fdinfo); on_udp_write(ctx, to_fdinfo);
return; return 0;
co_error: co_error:
perror("Failed to TCP read"); perror("Failed to TCP read");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_tcp_write(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; struct naive_ctx* app_ctx = fdinfo->cat->app_ctx;
int write_res = FDS_READY; int write_res = FDS_READY;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return; if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write data from the buffer to the socket // 2. Write data from the buffer to the socket
while (write_res != FDS_AGAIN && bp->mode == BP_WRITING) { while (bp->mode == BP_WRITING) {
write_res = write_packet_to_tcp(fdinfo->fd, bp); write_res = write_packet_to_tcp(fdinfo->fd, bp);
if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_ERR) goto co_error;
if (write_res == FDS_AGAIN) return 1;
} }
if (bp->mode != BP_READING) return;
// 3. A whole packet has been written // 3. A whole packet has been written
// Release the buffer and notify // Release the buffer and notify
mv_buffer_wtor(app_ctx, fdinfo, bp); mv_buffer_wtor(app_ctx, fdinfo, bp);
notify_read(ctx, app_ctx); notify_read(ctx, app_ctx);
return; return 0;
co_error: co_error:
perror("Failed to TCP write"); perror("Failed to TCP write");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
struct evt_core_fdinfo *to_fdinfo; struct evt_core_fdinfo *to_fdinfo;
struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; struct naive_ctx* app_ctx = fdinfo->cat->app_ctx;
@ -219,13 +219,13 @@ void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
char url[255]; char url[255];
// 1. Get current read buffer OR a new read buffer OR subscribe to be notified later // 1. Get current read buffer OR a new read buffer OR subscribe to be notified later
if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return; if ((bp = get_read_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Read packet from socket // 2. Read packet from socket
bp->ip.ap.str.port = url_get_port_int (fdinfo->url); bp->ip.ap.str.port = url_get_port_int (fdinfo->url);
read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other); read_res = read_packet_from_udp (fdinfo->fd, bp, fdinfo->other);
if (read_res == FDS_ERR) goto co_error; if (read_res == FDS_ERR) goto co_error;
if (bp->mode != BP_WRITING) return; if (read_res == FDS_AGAIN) return 1;
// 3. A whole packet has been read, we will find someone to write it // 3. A whole packet has been read, we will find someone to write it
sprintf(url, "tcp:write:127.0.0.1:7500"); sprintf(url, "tcp:write:127.0.0.1:7500");
@ -233,7 +233,7 @@ void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
if (to_fdinfo == NULL) { if (to_fdinfo == NULL) {
fprintf(stderr, "No fd for URL %s in udp-read. Dropping packet :( \n", url); fprintf(stderr, "No fd for URL %s in udp-read. Dropping packet :( \n", url);
mv_buffer_wtor (app_ctx, fdinfo, bp); mv_buffer_wtor (app_ctx, fdinfo, bp);
return; return 1;
} }
//printf("Pass packet from %s to %s\n", fdinfo->url, url); //printf("Pass packet from %s to %s\n", fdinfo->url, url);
@ -241,32 +241,32 @@ void on_udp_read(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp); mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo, bp);
on_tcp_write(ctx, to_fdinfo); on_tcp_write(ctx, to_fdinfo);
return; return 0;
co_error: co_error:
perror("Failed to UDP read"); perror("Failed to UDP read");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
void on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_udp_write (struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct buffer_packet* bp; struct buffer_packet* bp;
struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; struct naive_ctx* app_ctx = fdinfo->cat->app_ctx;
int write_res = FDS_READY; int write_res = FDS_READY;
// 1. Get current write buffer OR a buffer from the waiting queue OR leave // 1. Get current write buffer OR a buffer from the waiting queue OR leave
if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return; if ((bp = get_write_buffer(app_ctx, fdinfo)) == NULL) return 1;
// 2. Write buffer // 2. Write buffer
write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other); write_res = write_packet_to_udp(fdinfo->fd, bp, fdinfo->other);
if (write_res == FDS_ERR) goto co_error; if (write_res == FDS_ERR) goto co_error;
if (bp->mode != BP_READING) return; if (write_res == FDS_AGAIN) return 1;
// 3. A whole packet has been written // 3. A whole packet has been written
// Release the buffer and notify // Release the buffer and notify
mv_buffer_wtor(app_ctx, fdinfo, bp); mv_buffer_wtor(app_ctx, fdinfo, bp);
notify_read(ctx, app_ctx); notify_read(ctx, app_ctx);
return; return 0;
co_error: co_error:
perror("Failed to UDP write"); perror("Failed to UDP write");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -277,7 +277,7 @@ void naive_free_simple(void* v) {
free(v); free(v);
} }
void on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
struct naive_ctx* app_ctx = fdinfo->cat->app_ctx; struct naive_ctx* app_ctx = fdinfo->cat->app_ctx;
struct buffer_packet* bp; struct buffer_packet* bp;
@ -299,6 +299,8 @@ void on_err(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
// 3. If appears in the read waiting queue, remove it // 3. If appears in the read waiting queue, remove it
g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd)); g_queue_remove_all (app_ctx->read_waiting, &(fdinfo->fd));
return 1;
} }
void algo_naive(struct algo_skel* as) { void algo_naive(struct algo_skel* as) {

View file

@ -163,7 +163,9 @@ void evt_core_loop(struct evt_core_ctx* ctx) {
fdinfo = evt_core_get_from_fd(ctx, err_fd); fdinfo = evt_core_get_from_fd(ctx, err_fd);
if (fdinfo != NULL) { if (fdinfo != NULL) {
fprintf(stderr, "Clearing fd=%d on cat=%s\n", err_fd, fdinfo->cat->name); fprintf(stderr, "Clearing fd=%d on cat=%s\n", err_fd, fdinfo->cat->name);
if (fdinfo->cat->err_cb != NULL) fdinfo->cat->err_cb(ctx, fdinfo); if (fdinfo->cat->err_cb != NULL) {
while (fdinfo->cat->err_cb(ctx, fdinfo) == 0);
}
evt_core_rm_fd (ctx, err_fd); evt_core_rm_fd (ctx, err_fd);
} else { } else {
fprintf(stderr, "The file descriptor is not registered in a category, this is probably a logic error\n"); fprintf(stderr, "The file descriptor is not registered in a category, this is probably a logic error\n");
@ -178,7 +180,7 @@ void evt_core_loop(struct evt_core_ctx* ctx) {
fprintf(stderr, "Ignoring file descriptor %d as it is not registered. This is a bug.\n", events[n].data.fd); fprintf(stderr, "Ignoring file descriptor %d as it is not registered. This is a bug.\n", events[n].data.fd);
continue; continue;
} }
fdinfo->cat->cb(ctx, fdinfo); while(fdinfo->cat->cb(ctx, fdinfo) == 0);
} }
} }

View file

@ -15,7 +15,7 @@ struct evt_core_cat;
struct evt_core_fdinfo; struct evt_core_fdinfo;
typedef void (*evt_core_free_app_ctx)(void*); typedef void (*evt_core_free_app_ctx)(void*);
typedef void (*evt_core_cb)(struct evt_core_ctx*, struct evt_core_fdinfo*); typedef int (*evt_core_cb)(struct evt_core_ctx*, struct evt_core_fdinfo*);
struct evt_core_cat { struct evt_core_cat {
void* app_ctx; void* app_ctx;

View file

@ -3,6 +3,7 @@
#include <sys/timerfd.h> #include <sys/timerfd.h>
#include <time.h> #include <time.h>
#include <unistd.h> #include <unistd.h>
#include <errno.h>
#include "evt_core.h" #include "evt_core.h"
#include "net_tools.h" #include "net_tools.h"
@ -44,13 +45,15 @@ struct measure_conf* create_measure_conf(int udp_sock, char* max_mes, char* plsi
return mc; return mc;
} }
void on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
ssize_t res; ssize_t res;
int secs, nsecs; int secs, nsecs;
uint64_t micro_sec; uint64_t micro_sec;
struct timespec curr; struct timespec curr;
struct measure_conf* mc = fdinfo->other; struct measure_conf* mc = fdinfo->other;
res = read(fdinfo->fd, mc->payload, mc->payload_size); res = read(fdinfo->fd, mc->payload, mc->payload_size);
if (res == -1 && errno == EAGAIN) return 1;
if (res != mc->payload_size) { if (res != mc->payload_size) {
perror("read error"); perror("read error");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
@ -70,9 +73,10 @@ void on_udp(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
printf("Measurement done\n"); printf("Measurement done\n");
exit(EXIT_SUCCESS); exit(EXIT_SUCCESS);
} }
return 0;
} }
void on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) { int on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
ssize_t s; ssize_t s;
uint64_t ticks; uint64_t ticks;
struct measure_conf* mc = fdinfo->other; struct measure_conf* mc = fdinfo->other;
@ -100,6 +104,7 @@ void on_timer(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
perror("Send error"); perror("Send error");
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
return 1;
} }
void free_timer_conf(void* v) { void free_timer_conf(void* v) {

View file

@ -25,6 +25,7 @@ def receive_next(sock):
def receive_and_send_one(sock): def receive_and_send_one(sock):
"Waits for a single datagram over the socket and echoes it back." "Waits for a single datagram over the socket and echoes it back."
input_data, addr = receive_next(sock) input_data, addr = receive_next(sock)
print(input_data)
#message = input_data.decode() #message = input_data.decode()
#logger.info("Received message from %s: %s (%s bytes).", addr, message, len(input_data)) #logger.info("Received message from %s: %s (%s bytes).", addr, message, len(input_data))
output_len = sock.sendto(input_data, addr) output_len = sock.sendto(input_data, addr)