Avoid queue that are not empty

This commit is contained in:
Quentin 2019-05-24 15:51:49 +02:00
parent 1f7256ef5e
commit 81040b16b9
3 changed files with 27 additions and 6 deletions

View file

@ -50,6 +50,12 @@ void show_link_availability(struct rr_ctx* rr) {
printf("]\n"); printf("]\n");
} }
void blacklist_link(struct rr_ctx* rr, int sel_link) {
printf("Blacklist link=%d | ", sel_link);
rr->remote_links &= 0xff ^ 1 << sel_link;
show_link_availability (rr);
}
void on_timeout_health (struct evt_core_ctx* ctx, void* user); void on_timeout_health (struct evt_core_ctx* ctx, void* user);
void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
@ -204,6 +210,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
struct rr_ctx* rr = app_ctx->misc; struct rr_ctx* rr = app_ctx->misc;
struct evt_core_fdinfo *to_fdinfo = NULL; struct evt_core_fdinfo *to_fdinfo = NULL;
uint16_t min_pkt; uint16_t min_pkt;
guint len;
char url[255]; char url[255];
size_t health_packet_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); size_t health_packet_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health);
size_t max_size = sizeof(struct internet_packet) - health_packet_size; size_t max_size = sizeof(struct internet_packet) - health_packet_size;
@ -273,7 +280,17 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
sel_link = (sel_link + 1) % app_ctx->ap.links; sel_link = (sel_link + 1) % app_ctx->ap.links;
sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded
to_fdinfo = evt_core_get_from_url (ctx, url); to_fdinfo = evt_core_get_from_url (ctx, url);
if (to_fdinfo == NULL) continue; // Missing link if (to_fdinfo == NULL) {
if (ctx->verbose) fprintf(stderr, " [algo/rr] write fd %s has not been found, skipping\n", url);
continue;
}
if ((len = write_queue_len (app_ctx, to_fdinfo)) > 0) {
if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len);
blacklist_link (rr, sel_link);
continue;
}
if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) {
rr->current_link = sel_link; rr->current_link = sel_link;
mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo);
@ -309,11 +326,7 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) {
uint16_t prev_health_id = (t->health_id - 1); uint16_t prev_health_id = (t->health_id - 1);
uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE;
struct timer_info* t_old = &rr->wait[prev_health_idx]; struct timer_info* t_old = &rr->wait[prev_health_idx];
if (t_old->health_id != prev_health_id) { if (t_old->health_id != prev_health_id) blacklist_link (rr, t->prevlink);
printf("Blacklist link=%d | ", t->prevlink);
rr->remote_links &= 0xff ^ 1 << t->prevlink;
show_link_availability (rr);
}
// 3. Deliver blocked packets // 3. Deliver blocked packets
// @FIXME CRAPPY CODE / CRAPPY LOGIC // @FIXME CRAPPY CODE / CRAPPY LOGIC

View file

@ -67,6 +67,13 @@ void __push_to_free(struct algo_ctx *app_ctx, struct buffer_packet* bp) {
g_queue_push_tail (app_ctx->free_buffer, bp); g_queue_push_tail (app_ctx->free_buffer, bp);
} }
guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) {
GQueue* q;
if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue
return q->length;
}
/** /**
* Returns a buffer if available, NULL otherwise * Returns a buffer if available, NULL otherwise
*/ */

View file

@ -52,6 +52,7 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void
void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to);
void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); void mv_buffer_atof(struct algo_ctx* app_ctx, void* from);
struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to);
guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo);
int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src);