diff --git a/src/algo_rr.c b/src/algo_rr.c index b822c5e..b2fd1eb 100644 --- a/src/algo_rr.c +++ b/src/algo_rr.c @@ -50,6 +50,12 @@ void show_link_availability(struct rr_ctx* rr) { printf("]\n"); } +void blacklist_link(struct rr_ctx* rr, int sel_link) { + printf("Blacklist link=%d | ", sel_link); + rr->remote_links &= 0xff ^ 1 << sel_link; + show_link_availability (rr); +} + void on_timeout_health (struct evt_core_ctx* ctx, void* user); void rr_pkt_register(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { @@ -204,6 +210,7 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo struct rr_ctx* rr = app_ctx->misc; struct evt_core_fdinfo *to_fdinfo = NULL; uint16_t min_pkt; + guint len; char url[255]; size_t health_packet_size = sizeof(bp->ip.ap.fmt.headers) + sizeof(bp->ip.ap.fmt.content.health); size_t max_size = sizeof(struct internet_packet) - health_packet_size; @@ -273,7 +280,17 @@ int algo_rr_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo sel_link = (sel_link + 1) % app_ctx->ap.links; sprintf(url, "tcp:write:127.0.0.1:%d", 7500 + sel_link); //@FIXME Hardcoded to_fdinfo = evt_core_get_from_url (ctx, url); - if (to_fdinfo == NULL) continue; // Missing link + if (to_fdinfo == NULL) { + if (ctx->verbose) fprintf(stderr, " [algo/rr] write fd %s has not been found, skipping\n", url); + continue; + } + + if ((len = write_queue_len (app_ctx, to_fdinfo)) > 0) { + if (ctx->verbose) fprintf(stderr, " [algo/rr] write queue of %s is not empty (%d), skipping and deactivating\n", to_fdinfo->url, len); + blacklist_link (rr, sel_link); + continue; + } + if (!app_ctx->ap.is_healing /* if healing deactivated */|| rr->my_links & (1 << sel_link) /* if link not down */ ) { rr->current_link = sel_link; mv_buffer_rtow (app_ctx, fdinfo, to_fdinfo); @@ -309,11 +326,7 @@ void on_timeout_health (struct evt_core_ctx* ctx, void* raw) { uint16_t prev_health_id = (t->health_id - 1); uint16_t prev_health_idx = prev_health_id % PACKET_BUFFER_SIZE; struct timer_info* t_old = &rr->wait[prev_health_idx]; - if (t_old->health_id != prev_health_id) { - printf("Blacklist link=%d | ", t->prevlink); - rr->remote_links &= 0xff ^ 1 << t->prevlink; - show_link_availability (rr); - } + if (t_old->health_id != prev_health_id) blacklist_link (rr, t->prevlink); // 3. Deliver blocked packets // @FIXME CRAPPY CODE / CRAPPY LOGIC diff --git a/src/algo_utils.c b/src/algo_utils.c index 33c744a..6cad14b 100644 --- a/src/algo_utils.c +++ b/src/algo_utils.c @@ -67,6 +67,13 @@ void __push_to_free(struct algo_ctx *app_ctx, struct buffer_packet* bp) { g_queue_push_tail (app_ctx->free_buffer, bp); } +guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo) { + GQueue* q; + + if ((q = g_hash_table_lookup(app_ctx->write_waiting, &(fdinfo->fd))) == NULL) return 0; // No queue + return q->length; +} + /** * Returns a buffer if available, NULL otherwise */ diff --git a/src/algo_utils.h b/src/algo_utils.h index 76154de..b5c093d 100644 --- a/src/algo_utils.h +++ b/src/algo_utils.h @@ -52,6 +52,7 @@ void mv_buffer_rtoa(struct algo_ctx* app_ctx, struct evt_core_fdinfo* from, void void mv_buffer_atow(struct algo_ctx* app_ctx, void* from, struct evt_core_fdinfo* to); void mv_buffer_atof(struct algo_ctx* app_ctx, void* from); struct buffer_packet* dup_buffer_tow(struct algo_ctx* app_ctx, struct buffer_packet* bp, struct evt_core_fdinfo* to); +guint write_queue_len(struct algo_ctx *app_ctx, struct evt_core_fdinfo *fdinfo); int append_buffer(union abstract_packet* dest, int pos, union abstract_packet* src);