Introduce a ring buffer

This commit is contained in:
Quentin Dufour 2019-02-12 11:17:37 +01:00
parent f93853f974
commit debe0feb56
9 changed files with 177 additions and 61 deletions

View file

@ -18,6 +18,8 @@ list(APPEND CSOURCES
src/evt_core.c
src/algo_skel.h
src/algo_naive.c
src/utils.h
src/utils.c
)
add_executable(donar-proxy ${CSOURCES} src/donar_proxy.c)

View file

@ -1,6 +1,12 @@
#include "algo_skel.h"
#define NAIVE_BUFFER 128
struct naive_ctx {
struct ring_buffer rb;
};
void free_nothing(void* app_ctx) {}
void free_naive(void* app_ctx) {
if (app_ctx != NULL) free(app_ctx);
}
@ -13,12 +19,10 @@ void on_tcp_co(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) {
in_len = sizeof(in_addr);
//while (1) {
conn_sock = accept(fd, &in_addr, &in_len);
//if (conn_sock == -1 && (errno == EAGAIN || errno == EWOULDBLOCK)) break;
if (conn_sock == -1) goto co_error;
evt_core_add_fd (ctx, "tcp-data", fd);
//}
evt_core_add_fd (ctx, "tcp-read", fd);
evt_core_add_fd (ctx, "tcp-write", fd);
return;
@ -27,26 +31,31 @@ co_error:
exit(EXIT_FAILURE);
}
void on_tcp_data(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) {
void tcp_to_udp(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) {
// Get target file descriptor
struct evt_core_cat* udp = g_hash_table_lookup (ctx->catlist, "udp-data");
if (udp == NULL || udp->socklist->len < 1) goto co_error;
int udp_fd = g_array_index(udp->socklist, int, 0);
char buffer[NAIVE_BUFFER];
int nread, nwrite;
nread = read(fd, buffer, sizeof(char) * NAIVE_BUFFER);
if (nread == 0) return;
//if (nread == -1 && errno == EAGAIN) return;
if (nread == -1) goto co_error;
nwrite = write(udp_fd, buffer, nread);
if (nwrite == -1 && errno == EAGAIN) {
printf("Lost data EAGAIN\n");
return;
}
if (nwrite == -1) goto co_error;
if (nread != nwrite) {
printf("Lost data not everything has been written\n");
return;
// Init data structures for the transfer
struct naive_ctx* app_ctx = cat->app_ctx;
struct ring_buffer* rb = &(app_ctx->rb);
char buffer[RING_BUFFER_SIZE];
int nread, nwrite, rb_free_space;
while (1) {
rb_free_space = ring_buffer_free_space (rb); // We can't afford to read more
nread = read(fd, buffer, rb_free_space); // Effective read
if (nread == 0) return; // End of file
if (nread == -1 && errno == EAGAIN) return; // No more data to read
if (nread == -1) goto co_error; // A bad error
ring_buffer_write(rb, buffer, nread); // Persist read data in our buffer
nread = ring_buffer_read(rb, buffer, RING_BUFFER_SIZE);
nwrite = write(udp_fd, buffer, nread);
if (nwrite == -1 && errno == EAGAIN) return;
if (nwrite == -1) goto co_error;
ring_buffer_ack_read (rb, nwrite);
}
return;
@ -55,26 +64,31 @@ co_error:
exit(EXIT_FAILURE);
}
void on_udp_data(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) {
void udp_to_tcp(struct evt_core_ctx* ctx, struct evt_core_cat* cat, int fd) {
// Get target file descriptor
struct evt_core_cat* tcp = g_hash_table_lookup (ctx->catlist, "tcp-data");
if (tcp == NULL || tcp->socklist->len < 1) goto co_error;
int tcp_fd = g_array_index(tcp->socklist, int, 0);
char buffer[NAIVE_BUFFER];
int nread, nwrite;
nread = read(fd, buffer, sizeof(char) * NAIVE_BUFFER);
if (nread == 0) return;
//if (nread == -1 && errno == EAGAIN) return;
if (nread == -1) goto co_error;
nwrite = write(tcp_fd, buffer, nread);
if (nwrite == -1 && errno == EAGAIN) {
printf("Lost data EAGAIN\n");
return;
}
if (nwrite == -1) goto co_error;
if (nread != nwrite) {
printf("Lost data not everything has been written\n");
return;
// Init data structures for the transfer
struct naive_ctx* app_ctx = cat->app_ctx;
struct ring_buffer* rb = &(app_ctx->rb);
char buffer[RING_BUFFER_SIZE];
int nread, nwrite, rb_free_space;
while (1) {
rb_free_space = ring_buffer_free_space (rb); // We can't afford to read more
nread = read(fd, buffer, rb_free_space); // Effective read
if (nread == 0) return; // End of file
if (nread == -1 && errno == EAGAIN) return; // No more data to read
if (nread == -1) goto co_error; // A bad error
ring_buffer_write(rb, buffer, nread); // Persist read data in our buffer
nread = ring_buffer_read(rb, buffer, RING_BUFFER_SIZE);
nwrite = write(tcp_fd, buffer, nread);
if (nwrite == -1 && errno == EAGAIN) return;
if (nwrite == -1) goto co_error;
ring_buffer_ack_read (rb, nwrite);
}
return;
@ -85,29 +99,43 @@ co_error:
}
void algo_naive(struct algo_skel* as) {
as->on_tcp_data.name = "tcp-data";
as->on_tcp_data.flags = 0;
as->on_tcp_data.app_ctx = NULL;
as->on_tcp_data.free_app_ctx = free_naive;
as->on_tcp_data.cb = on_tcp_data;
as->on_tcp_data.socklist = NULL;
as->on_tcp_co.name = "tcp-listen";
as->on_tcp_co.flags = 0;
as->on_tcp_co.flags = EPOLLIN;
as->on_tcp_co.app_ctx = NULL;
as->on_tcp_co.free_app_ctx = free_naive;
as->on_tcp_co.cb = on_tcp_co;
as->on_tcp_co.socklist = NULL;
as->on_udp_data.name = "udp-data";
as->on_udp_data.flags = 0;
as->on_udp_data.app_ctx = NULL;
as->on_udp_data.free_app_ctx = free_naive;
as->on_udp_data.cb = on_udp_data;
as->on_udp_data.socklist = NULL;
as->on_tcp_read.name = "tcp-read";
as->on_tcp_read.flags = EPOLLIN | EPOLLET;
as->on_tcp_read.app_ctx = malloc(sizeof(struct naive_ctx));
as->on_tcp_read.free_app_ctx = free_naive;
as->on_tcp_read.cb = tcp_to_udp;
as->on_tcp_read.socklist = NULL;
/*if (as->on_tcp_data.app_ctx == NULL || as->on_udp_data.app_ctx == NULL) {
as->on_udp_read.name = "udp-read";
as->on_udp_read.flags = EPOLLIN | EPOLLET;
as->on_udp_read.app_ctx = malloc(sizeof(struct naive_ctx));
as->on_udp_read.free_app_ctx = free_naive;
as->on_udp_read.cb = tcp_to_udp;
as->on_udp_read.socklist = NULL;
as->on_tcp_write.name = "tcp-write";
as->on_tcp_write.flags = EPOLLOUT | EPOLLET;
as->on_tcp_write.app_ctx = as->on_udp_read.app_ctx;
as->on_tcp_write.free_app_ctx = free_nothing;
as->on_tcp_write.cb = udp_to_tcp;
as->on_tcp_write.socklist = NULL;
as->on_udp_write.name = "udp-write";
as->on_udp_write.flags = EPOLLOUT | EPOLLET;
as->on_udp_write.app_ctx = as->on_tcp_read.app_ctx;
as->on_udp_write.free_app_ctx = free_nothing;
as->on_udp_write.cb = tcp_to_udp;
as->on_udp_write.socklist = NULL;
if (as->on_tcp_read.app_ctx == NULL || as->on_udp_read.app_ctx == NULL) {
fprintf(stderr, "Failed to malloc naive_ctx\n");
exit(EXIT_FAILURE);
}*/
}
}

View file

@ -3,10 +3,13 @@
#include <stdio.h>
#include <errno.h>
#include "evt_core.h"
#include "utils.h"
struct algo_skel {
struct evt_core_cat on_udp_data;
struct evt_core_cat on_tcp_data;
struct evt_core_cat on_udp_read;
struct evt_core_cat on_tcp_read;
struct evt_core_cat on_udp_write;
struct evt_core_cat on_tcp_write;
struct evt_core_cat on_tcp_co;
};

View file

@ -43,12 +43,15 @@ socket_create_err:
void donar_server(struct donar_server_ctx* ctx, struct algo_skel* algo, char* udp_host, char* udp_port) {
evt_core_init (&(ctx->evts));
evt_core_add_cat (&(ctx->evts), &(algo->on_udp_data));
evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_data));
evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_co));
evt_core_add_cat (&(ctx->evts), &(algo->on_udp_read));
evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_read));
evt_core_add_cat (&(ctx->evts), &(algo->on_udp_write));
evt_core_add_cat (&(ctx->evts), &(algo->on_tcp_write));
int sock = create_udp_client (udp_host, udp_port);
evt_core_add_fd (&(ctx->evts), "udp-data", sock);
evt_core_add_fd (&(ctx->evts), "udp-read", sock);
evt_core_add_fd (&(ctx->evts), "udp-write", sock);
for (uint16_t i = 0; i < PORT_SIZE ; i++) {
ctx->ports[i] = 7500 + i;

View file

@ -81,7 +81,7 @@ void evt_core_add_fd(struct evt_core_ctx* ctx, char* name, int fd) {
g_array_append_val (cat->socklist, fd);
g_hash_table_insert(ctx->socklist, key, cat);
add_fd_to_epoll(ctx->epollfd, fd);
add_fd_to_epoll(ctx->epollfd, fd, cat->flags);
}
void evt_core_free(struct evt_core_ctx* ctx) {

View file

@ -128,12 +128,12 @@ void fill_buffer(size_t* written, char* dest, void *src, size_t n) {
* You need to read everything before going back to epoll
* Which means keeping state too
*/
void add_fd_to_epoll(int epollfd, int fd) {
void add_fd_to_epoll(int epollfd, int fd, int flags) {
make_socket_non_blocking (fd);
struct epoll_event current_event;
//current_event.events = EPOLLIN | EPOLLET;
current_event.events = EPOLLIN;
current_event.events = flags;
current_event.data.fd = fd;
if (epoll_ctl (epollfd, EPOLL_CTL_ADD, fd, &current_event) == -1) {
perror("Failed to add a file descriptor to epoll with epoll_ctl");

View file

@ -12,6 +12,6 @@ int create_tcp_client(char* host, char* service);
int create_udp_client(char* host, char* service);
int create_tcp_server(char* service);
int make_socket_non_blocking(int fd);
void add_fd_to_epoll(int epollfd, int fd);
void add_fd_to_epoll(int epollfd, int fd, int flags);
int read_entity(int fd, void* entity, int size);
void fill_buffer(size_t* written, char* dest, void *src, size_t n);

61
src/utils.c Normal file
View file

@ -0,0 +1,61 @@
#include "utils.h"
int ring_buffer_read(struct ring_buffer* rb, char* dest, int size) {
int slice1 = size;
int slice2 = 0;
int used_space = ring_buffer_used_space (rb);
if (used_space < slice1)
slice1 = used_space;
if (RING_BUFFER_SIZE - rb->head < slice1) {
slice1 = RING_BUFFER_SIZE - rb->head;
slice2 = size - slice1;
if (used_space - slice1 < slice2)
slice2 = used_space - slice1;
}
memcpy(dest, rb->buffer + rb->head, slice1);
memcpy(dest+slice1, rb->buffer, slice2);
return slice1 + slice2;
}
void ring_buffer_ack_read(struct ring_buffer* rb, int size) {
if (size > ring_buffer_used_space (rb)) {
fprintf(stderr, "You try to ack more data than contained in the ring buffer\n");
exit(EXIT_FAILURE);
}
rb->head = (rb->head + size) % RING_BUFFER_SIZE;
}
int ring_buffer_write(struct ring_buffer* rb, char* source, int size) {
if (size > ring_buffer_free_space (rb)) {
fprintf(stderr, "You try to write more data than available space in the buffer\n");
exit(EXIT_FAILURE);
}
int slice1 = size;
int slice2 = 0;
if (RING_BUFFER_SIZE - rb->tail < slice1) {
slice1 = RING_BUFFER_SIZE - rb->tail;
slice2 = size - slice1;
}
memcpy(rb->buffer + rb->tail, source, slice1);
memcpy(rb->buffer, source + slice1, slice2);
rb->tail = (rb->tail + slice1 + slice2) % RING_BUFFER_SIZE;
return slice1 + slice2;
}
int ring_buffer_free_space(struct ring_buffer* rb) {
if (rb->head > rb->tail) return rb->head - rb->tail;
return RING_BUFFER_SIZE + (rb->tail - rb->head);
}
int ring_buffer_used_space(struct ring_buffer* rb) {
return RING_BUFFER_SIZE - ring_buffer_free_space (rb);
}

19
src/utils.h Normal file
View file

@ -0,0 +1,19 @@
#pragma once
#include <stdio.h>
#include <stdlib.h>
#include <stdint.h>
#include <string.h>
#define RING_BUFFER_SIZE 1024
struct ring_buffer {
char buffer[RING_BUFFER_SIZE];
int head;
int tail;
};
int ring_buffer_read(struct ring_buffer* rb, char* dest, int size);
void ring_buffer_ack_read(struct ring_buffer* rb, int size);
int ring_buffer_write(struct ring_buffer* rb, char* source, int size);
int ring_buffer_free_space(struct ring_buffer* rb);
int ring_buffer_used_space(struct ring_buffer* rb);