Fixed one bug

This commit is contained in:
Quentin 2019-08-13 10:47:50 +02:00
parent bc202c07eb
commit d515cbf912
2 changed files with 48 additions and 23 deletions

View file

@ -23,6 +23,8 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
union abstract_packet *ap = (union abstract_packet*) &bp->ip; union abstract_packet *ap = (union abstract_packet*) &bp->ip;
struct dup2_ctx* dup2c = app_ctx->misc; struct dup2_ctx* dup2c = app_ctx->misc;
int32_t id = -1, port = -1; int32_t id = -1, port = -1;
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Received a buffer\n");
dump_buffer_packet(bp);
do { do {
switch (ap->fmt.headers.cmd) { switch (ap->fmt.headers.cmd) {
@ -36,14 +38,16 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
break; break;
} }
} while ((ap = ap_next(ap)) != NULL); } while ((ap = ap_next(ap)) != NULL);
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Extracted port=%d and id=%d\n", port, id);
if (port == -1 || id == -1) { if (port == -1 || id == -1) {
fprintf(stderr, "Missing data..."); fprintf(stderr, "Missing data port=%d and id=%d...\n", port, id);
exit(EXIT_FAILURE); exit(EXIT_FAILURE);
} }
// Check that received identifier has not been delivered // Check that received identifier has not been delivered
if (ring_ge(dup2c->recv_id, id)) { if (ring_ge(dup2c->recv_id, id)) {
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packet already delivered, dropping\n");
mv_buffer_atof(&app_ctx->br, fdinfo); mv_buffer_atof(&app_ctx->br, fdinfo);
return 0; return 0;
} }
@ -61,6 +65,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
} }
// 2. Move buffer // 2. Move buffer
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Scheduling packet for write\n");
mv_buffer_atow (&app_ctx->br, fdinfo, to_fdinfo); mv_buffer_atow (&app_ctx->br, fdinfo, to_fdinfo);
main_on_udp_write(ctx, to_fdinfo); main_on_udp_write(ctx, to_fdinfo);
@ -80,7 +85,11 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
}; };
buffer_append_ap (bp, &metadata); buffer_append_ap (bp, &metadata);
dump_buffer_packet(bp);
dup2c->emit_id = dup2c->emit_id + 1; dup2c->emit_id = dup2c->emit_id + 1;
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Added metadata\n");
struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write"); struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write");
for (int i = 0; i < app_ctx->ap.links; i++) { for (int i = 0; i < app_ctx->ap.links; i++) {
@ -95,6 +104,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
dup_buffer_tow (&app_ctx->br, bp, to_fdinfo); dup_buffer_tow (&app_ctx->br, bp, to_fdinfo);
main_on_tcp_write(ctx, to_fdinfo); main_on_tcp_write(ctx, to_fdinfo);
} }
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packets sent\n");
// 3. Release the buffer // 3. Release the buffer
mv_buffer_rtof (&app_ctx->br, fdinfo); mv_buffer_rtof (&app_ctx->br, fdinfo);

View file

@ -35,35 +35,50 @@ size_t get_full_size(struct buffer_packet* bp) {
void buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap) { void buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap) {
buffer_last_ap(bp)->fmt.headers.flags |= FLAG_READ_NEXT; buffer_last_ap(bp)->fmt.headers.flags |= FLAG_READ_NEXT;
memcpy(buffer_last_ap(bp), ap, ap->fmt.headers.size); memcpy(buffer_last_ap(bp), ap, ap->fmt.headers.size);
bp->ap_count++;
} }
enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) { enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
ssize_t nread; ssize_t nread = 0, ap_aread = 0, cur_ap_aread = 0;
union abstract_packet* ap = (union abstract_packet*) &bp->ip; union abstract_packet* ap = (union abstract_packet*) &bp->ip;
size_t pkt_size_size = sizeof(ap->fmt.headers.size); size_t pkt_size_size = sizeof(ap->fmt.headers.size);
if (bp->mode != BP_READING) return FDS_ERR; if (bp->mode != BP_READING) return FDS_ERR;
while (bp->aread < pkt_size_size) { fprintf(stderr, "Entering read_packet_from_tcp\n");
nread = read(fdinfo->fd, &(ap->raw) + bp->aread, pkt_size_size - bp->aread); do {
fprintf(stderr, "bp->ap_count=%d\n", bp->ap_count);
ap = (union abstract_packet*) &bp->ip;
for (int i = 0; i < bp->ap_count; i++) {
ap_aread += ap->fmt.headers.size;
ap = ap_next (ap);
}
cur_ap_aread = bp->aread - ap_aread;
fprintf(stderr, "[size] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread);
while (cur_ap_aread < pkt_size_size) {
nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, pkt_size_size - cur_ap_aread);
if (nread == 0) return FDS_AGAIN; if (nread == 0) return FDS_AGAIN;
if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nread == -1) return FDS_ERR; if (nread == -1) return FDS_ERR;
bp->aread += nread; bp->aread += nread;
cur_ap_aread += nread;
} }
while (bp->aread < ap->fmt.headers.size) { fprintf(stderr, "[content] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread);
nread = read(fdinfo->fd, &(ap->raw) + bp->aread, ap->fmt.headers.size - bp->aread); while (cur_ap_aread < ap->fmt.headers.size) {
nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, ap->fmt.headers.size - cur_ap_aread);
perror("read ap");
if (nread == 0) return FDS_AGAIN; if (nread == 0) return FDS_AGAIN;
if (nread == -1 && errno == EAGAIN) return FDS_AGAIN; if (nread == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nread == -1) return FDS_ERR; if (nread == -1) return FDS_ERR;
bp->aread += nread; bp->aread += nread;
cur_ap_aread += nread;
} }
bp->ap_count++; bp->ap_count++;
if (bp->ap_count != buffer_count_ap (bp)) { fprintf(stderr, "bp->ap_count=%d, buffer_count_ap(bp)=%ld\n", bp->ap_count, buffer_count_ap (bp));
printf(" Expected %ld packets in buffer, received %d\n", get_full_size(bp), bp->ap_count); } while (bp->ap_count != buffer_count_ap (bp));
return FDS_AGAIN;
}
bp->mode = BP_WRITING; bp->mode = BP_WRITING;
bp->awrite = 0; bp->awrite = 0;
@ -188,7 +203,7 @@ void dump_abstract_packet(union abstract_packet* ap) {
printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd); printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd);
switch (ap->fmt.headers.cmd) { switch (ap->fmt.headers.cmd) {
case CMD_LINK_MONITORING_THUNDER: case CMD_LINK_MONITORING_THUNDER:
printf(" <Health>id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x</Health>\n", printf(" <LinkMonitoringThunder>id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x</LinkMonitoringThunder>\n",
ap->fmt.content.link_monitoring_thunder.id, ap->fmt.content.link_monitoring_thunder.id,
ap->fmt.content.link_monitoring_thunder.deltat, ap->fmt.content.link_monitoring_thunder.deltat,
ap->fmt.content.link_monitoring_thunder.prevlink, ap->fmt.content.link_monitoring_thunder.prevlink,
@ -196,7 +211,7 @@ void dump_abstract_packet(union abstract_packet* ap) {
ap->fmt.content.link_monitoring_thunder.bitfield); ap->fmt.content.link_monitoring_thunder.bitfield);
break; break;
case CMD_UDP_METADATA_THUNDER: case CMD_UDP_METADATA_THUNDER:
printf(" <Clear>id=%d</Clear>\n", printf(" <UdpMetadataThunder>id=%d</UdpMetadataThunder>\n",
ap->fmt.content.udp_metadata_thunder.id); ap->fmt.content.udp_metadata_thunder.id);
break; break;
case CMD_UDP_ENCAPSULATED: case CMD_UDP_ENCAPSULATED: