Add a stream repair function

This commit is contained in:
Quentin 2020-01-20 16:35:52 +01:00
parent a319c3d9be
commit 6f579386f6
5 changed files with 67 additions and 4 deletions

View file

@ -114,6 +114,11 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
} }
int algo_dup2_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { int algo_dup2_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) {
// We do nothing struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
if (strcmp("tcp-read", fdinfo->cat->name) == 0 || strcmp("tcp-write", fdinfo->cat->name) == 0)
return app_ctx->ap.sr(ctx, fdinfo);
fprintf(stderr, "%s is not eligible for a reconnect\n", fdinfo->url);
// We do nothing
return 1; return 1;
} }

View file

@ -562,5 +562,11 @@ int algo_lightning_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo*
} }
int algo_lightning_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) { int algo_lightning_on_err(struct evt_core_ctx *ctx, struct evt_core_fdinfo *fdinfo) {
return 0; struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
if (strcmp("tcp-read", fdinfo->cat->name) == 0 || strcmp("tcp-write", fdinfo->cat->name) == 0)
return app_ctx->ap.sr(ctx, fdinfo);
fprintf(stderr, "%s is not eligible for a reconnect\n", fdinfo->url);
// We do nothing
return 1;
} }

View file

@ -69,6 +69,48 @@ void init_socks5_sinks(struct donar_client_ctx* app_ctx) {
evt_core_add_cat(&app_ctx->evts, &template); evt_core_add_cat(&app_ctx->evts, &template);
} }
int donar_client_stream_repair(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo) {
// @FIXME: Ugly way to get donar_client_ctx. Shame on me :/
fprintf(stdout, "[donar-client] %s broke\n", fdinfo->url);
struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "socks5-failed");
if (cat == NULL) {
fprintf(stderr, "Unable to reconnect stream as socks5-failed cat is not available...\n");
exit(EXIT_FAILURE);
}
struct donar_client_ctx* app_ctx = cat->app_ctx;
struct evt_core_fdinfo* fdtarget = NULL;
int port = url_get_port_int (fdinfo->url);
int pos = port - 7500, removed = 0;
char buffer[256];
sprintf(buffer, "tcp:read:127.0.0.1:%d", port);
fdtarget = evt_core_get_from_url (ctx, buffer);
if (fdtarget != NULL) {
evt_core_rm_fd(ctx, fdtarget->fd);
removed++;
}
sprintf(buffer, "tcp:write:127.0.0.1:%d", port);
fdtarget = evt_core_get_from_url (ctx, buffer);
if (fdtarget != NULL) {
evt_core_rm_fd(ctx, fdtarget->fd);
removed++;
}
if (removed == 2) {
fprintf(stdout, "[donar-client] Retriggering socks5 for port %d\n", port);
init_socks5_client (app_ctx, pos);
return 0;
} else if (removed == 0) {
fprintf(stdout, "[donar-client] Socks5 has already been retriggered for port %d\n", port);
return 0;
} else {
fprintf(stderr, "[donar-client] We only removed 1 link and not 2 for port %d, strange behaviour, exiting...\n", port);
exit(EXIT_FAILURE);
}
}
void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) { void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) {
struct algo_params ap = { struct algo_params ap = {
.is_waiting_bootstrap = dp->is_waiting_bootstrap, .is_waiting_bootstrap = dp->is_waiting_bootstrap,
@ -77,7 +119,8 @@ void donar_client(struct donar_client_ctx* ctx, struct donar_params* dp) {
.links = dp->links, .links = dp->links,
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data, .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file .capture_file = dp->capture_file,
.sr = donar_client_stream_repair
}; };
evt_core_init (&(ctx->evts), dp->verbose); evt_core_init (&(ctx->evts), dp->verbose);

View file

@ -51,6 +51,11 @@ socket_create_err:
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
int donar_server_stream_repair(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fd) {
fprintf(stderr, "I am a server, I do nothing with broken streams...\n");
return 1;
}
void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) { void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
struct algo_params ap = { struct algo_params ap = {
.is_waiting_bootstrap = dp->is_waiting_bootstrap, .is_waiting_bootstrap = dp->is_waiting_bootstrap,
@ -59,7 +64,8 @@ void donar_server(struct donar_server_ctx* ctx, struct donar_params* dp) {
.links = dp->links, .links = dp->links,
.fresh_data = dp->fresh_data, .fresh_data = dp->fresh_data,
.redundant_data = dp->redundant_data, .redundant_data = dp->redundant_data,
.capture_file = dp->capture_file .capture_file = dp->capture_file,
.sr = donar_server_stream_repair
}; };
evt_core_init (&(ctx->evts), dp->verbose); evt_core_init (&(ctx->evts), dp->verbose);

View file

@ -11,10 +11,13 @@
#include "utils.h" #include "utils.h"
#include "packet.h" #include "packet.h"
typedef int (*stream_repair)(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo);
struct algo_params { struct algo_params {
uint8_t is_waiting_bootstrap; uint8_t is_waiting_bootstrap;
char *algo_name, *capture_file, *algo_specific_params; char *algo_name, *capture_file, *algo_specific_params;
int links, fresh_data, redundant_data; int links, fresh_data, redundant_data;
stream_repair sr;
}; };
struct algo_ctx; struct algo_ctx;