diff --git a/src/algo_dup2.c b/src/algo_dup2.c index 7750df0..c3f1847 100644 --- a/src/algo_dup2.c +++ b/src/algo_dup2.c @@ -23,6 +23,8 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo union abstract_packet *ap = (union abstract_packet*) &bp->ip; struct dup2_ctx* dup2c = app_ctx->misc; int32_t id = -1, port = -1; + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Received a buffer\n"); + dump_buffer_packet(bp); do { switch (ap->fmt.headers.cmd) { @@ -36,14 +38,16 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo break; } } while ((ap = ap_next(ap)) != NULL); + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Extracted port=%d and id=%d\n", port, id); if (port == -1 || id == -1) { - fprintf(stderr, "Missing data..."); + fprintf(stderr, "Missing data port=%d and id=%d...\n", port, id); exit(EXIT_FAILURE); } // Check that received identifier has not been delivered if (ring_ge(dup2c->recv_id, id)) { + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packet already delivered, dropping\n"); mv_buffer_atof(&app_ctx->br, fdinfo); return 0; } @@ -61,6 +65,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo } // 2. Move buffer + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Scheduling packet for write\n"); mv_buffer_atow (&app_ctx->br, fdinfo, to_fdinfo); main_on_udp_write(ctx, to_fdinfo); @@ -80,7 +85,11 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin }; buffer_append_ap (bp, &metadata); + dump_buffer_packet(bp); + dup2c->emit_id = dup2c->emit_id + 1; + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Added metadata\n"); + struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); for (int i = 0; i < app_ctx->ap.links; i++) { @@ -95,6 +104,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo); } + if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packets sent\n"); // 3. Release the buffer mv_buffer_rtof (&app_ctx->br, fdinfo); diff --git a/src/packet.c b/src/packet.c index 1042343..f5879c0 100644 --- a/src/packet.c +++ b/src/packet.c @@ -35,35 +35,50 @@ size_t get_full_size(struct buffer_packet* bp) { void buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap) { buffer_last_ap(bp)->fmt.headers.flags |= FLAG_READ_NEXT; memcpy(buffer_last_ap(bp), ap, ap->fmt.headers.size); + bp->ap_count++; } enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { - ssize_t nread; + ssize_t nread = 0, ap_aread = 0, cur_ap_aread = 0; union abstract_packet* ap = (union abstract_packet*) &bp->ip; size_t pkt_size_size = sizeof(ap->fmt.headers.size); if (bp->mode != BP_READING) return FDS_ERR; - while (bp->aread < pkt_size_size) { - nread = read(fdinfo->fd, &(ap->raw) + bp->aread, pkt_size_size - bp->aread); - if (nread == 0) return FDS_AGAIN; - if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; - if (nread == -1) return FDS_ERR; - bp->aread += nread; - } + fprintf(stderr, "Entering read_packet_from_tcp\n"); + do { - while (bp->aread < ap->fmt.headers.size) { - nread = read(fdinfo->fd, &(ap->raw) + bp->aread, ap->fmt.headers.size - bp->aread); - if (nread == 0) return FDS_AGAIN; - if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; - if (nread == -1) return FDS_ERR; - bp->aread += nread; - } + fprintf(stderr, "bp->ap_count=%d\n", bp->ap_count); + ap = (union abstract_packet*) &bp->ip; + for (int i = 0; i < bp->ap_count; i++) { + ap_aread += ap->fmt.headers.size; + ap = ap_next (ap); + } + cur_ap_aread = bp->aread - ap_aread; - bp->ap_count++; - if (bp->ap_count != buffer_count_ap (bp)) { - printf(" Expected %ld packets in buffer, received %d\n", get_full_size(bp), bp->ap_count); - return FDS_AGAIN; - } + fprintf(stderr, "[size] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread); + while (cur_ap_aread < pkt_size_size) { + nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, pkt_size_size - cur_ap_aread); + if (nread == 0) return FDS_AGAIN; + if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; + if (nread == -1) return FDS_ERR; + bp->aread += nread; + cur_ap_aread += nread; + } + + fprintf(stderr, "[content] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread); + while (cur_ap_aread < ap->fmt.headers.size) { + nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, ap->fmt.headers.size - cur_ap_aread); + perror("read ap"); + if (nread == 0) return FDS_AGAIN; + if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; + if (nread == -1) return FDS_ERR; + bp->aread += nread; + cur_ap_aread += nread; + } + + bp->ap_count++; + fprintf(stderr, "bp->ap_count=%d, buffer_count_ap(bp)=%ld\n", bp->ap_count, buffer_count_ap (bp)); + } while (bp->ap_count != buffer_count_ap (bp)); bp->mode = BP_WRITING; bp->awrite = 0; @@ -188,7 +203,7 @@ void dump_abstract_packet(union abstract_packet* ap) { printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd); switch (ap->fmt.headers.cmd) { case CMD_LINK_MONITORING_THUNDER: - printf(" id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x\n", + printf(" id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x\n", ap->fmt.content.link_monitoring_thunder.id, ap->fmt.content.link_monitoring_thunder.deltat, ap->fmt.content.link_monitoring_thunder.prevlink, @@ -196,7 +211,7 @@ void dump_abstract_packet(union abstract_packet* ap) { ap->fmt.content.link_monitoring_thunder.bitfield); break; case CMD_UDP_METADATA_THUNDER: - printf(" id=%d\n", + printf(" id=%d\n", ap->fmt.content.udp_metadata_thunder.id); break; case CMD_UDP_ENCAPSULATED: