Fix another bug!
This commit is contained in:
parent
d515cbf912
commit
0c50ca5a42
2 changed files with 9 additions and 12 deletions
|
@ -48,7 +48,7 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
|
||||||
// Check that received identifier has not been delivered
|
// Check that received identifier has not been delivered
|
||||||
if (ring_ge(dup2c->recv_id, id)) {
|
if (ring_ge(dup2c->recv_id, id)) {
|
||||||
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packet already delivered, dropping\n");
|
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Packet already delivered, dropping\n");
|
||||||
mv_buffer_atof(&app_ctx->br, fdinfo);
|
mv_buffer_rtof(&app_ctx->br, fdinfo);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -60,13 +60,13 @@ int algo_dup2_on_stream(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdinfo
|
||||||
to_fdinfo = evt_core_get_from_url (ctx, url);
|
to_fdinfo = evt_core_get_from_url (ctx, url);
|
||||||
if (to_fdinfo == NULL) {
|
if (to_fdinfo == NULL) {
|
||||||
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
|
fprintf(stderr, "No fd for URL %s in tcp-read. Dropping packet :( \n", url);
|
||||||
mv_buffer_atof (&app_ctx->br, fdinfo);
|
mv_buffer_rtof (&app_ctx->br, fdinfo);
|
||||||
return 1;
|
return 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Move buffer
|
// 2. Move buffer
|
||||||
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Scheduling packet for write\n");
|
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Scheduling packet for write\n");
|
||||||
mv_buffer_atow (&app_ctx->br, fdinfo, to_fdinfo);
|
mv_buffer_rtow (&app_ctx->br, fdinfo, to_fdinfo);
|
||||||
main_on_udp_write(ctx, to_fdinfo);
|
main_on_udp_write(ctx, to_fdinfo);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -77,6 +77,7 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
|
||||||
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
|
struct algo_ctx* app_ctx = fdinfo->cat->app_ctx;
|
||||||
|
|
||||||
struct dup2_ctx* dup2c = app_ctx->misc;
|
struct dup2_ctx* dup2c = app_ctx->misc;
|
||||||
|
dup2c->emit_id = dup2c->emit_id + 1;
|
||||||
union abstract_packet metadata = {
|
union abstract_packet metadata = {
|
||||||
.fmt.headers.cmd = CMD_UDP_METADATA_THUNDER,
|
.fmt.headers.cmd = CMD_UDP_METADATA_THUNDER,
|
||||||
.fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder),
|
.fmt.headers.size = sizeof(metadata.fmt.headers) + sizeof(metadata.fmt.content.udp_metadata_thunder),
|
||||||
|
@ -84,14 +85,10 @@ int algo_dup2_on_datagram(struct evt_core_ctx* ctx, struct evt_core_fdinfo* fdin
|
||||||
.fmt.content.udp_metadata_thunder.id = dup2c->emit_id
|
.fmt.content.udp_metadata_thunder.id = dup2c->emit_id
|
||||||
};
|
};
|
||||||
buffer_append_ap (bp, &metadata);
|
buffer_append_ap (bp, &metadata);
|
||||||
|
|
||||||
dump_buffer_packet(bp);
|
dump_buffer_packet(bp);
|
||||||
|
|
||||||
dup2c->emit_id = dup2c->emit_id + 1;
|
|
||||||
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Added metadata\n");
|
if (ctx->verbose > 1) fprintf(stderr, " [algo_dup2] Added metadata\n");
|
||||||
|
|
||||||
struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write");
|
struct evt_core_cat* cat = evt_core_get_from_cat (ctx, "tcp-write");
|
||||||
|
|
||||||
for (int i = 0; i < app_ctx->ap.links; i++) {
|
for (int i = 0; i < app_ctx->ap.links; i++) {
|
||||||
// 1. A whole packet has been read, we will find someone to write it
|
// 1. A whole packet has been read, we will find someone to write it
|
||||||
to_fdinfo = cat->socklist->len > i ? g_array_index(cat->socklist, struct evt_core_fdinfo*, i) : NULL;
|
to_fdinfo = cat->socklist->len > i ? g_array_index(cat->socklist, struct evt_core_fdinfo*, i) : NULL;
|
||||||
|
|
10
src/packet.c
10
src/packet.c
|
@ -44,10 +44,10 @@ enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer
|
||||||
size_t pkt_size_size = sizeof(ap->fmt.headers.size);
|
size_t pkt_size_size = sizeof(ap->fmt.headers.size);
|
||||||
if (bp->mode != BP_READING) return FDS_ERR;
|
if (bp->mode != BP_READING) return FDS_ERR;
|
||||||
|
|
||||||
fprintf(stderr, "Entering read_packet_from_tcp\n");
|
//fprintf(stderr, "Entering read_packet_from_tcp\n");
|
||||||
do {
|
do {
|
||||||
|
|
||||||
fprintf(stderr, "bp->ap_count=%d\n", bp->ap_count);
|
//fprintf(stderr, "bp->ap_count=%d\n", bp->ap_count);
|
||||||
ap = (union abstract_packet*) &bp->ip;
|
ap = (union abstract_packet*) &bp->ip;
|
||||||
for (int i = 0; i < bp->ap_count; i++) {
|
for (int i = 0; i < bp->ap_count; i++) {
|
||||||
ap_aread += ap->fmt.headers.size;
|
ap_aread += ap->fmt.headers.size;
|
||||||
|
@ -55,7 +55,7 @@ enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer
|
||||||
}
|
}
|
||||||
cur_ap_aread = bp->aread - ap_aread;
|
cur_ap_aread = bp->aread - ap_aread;
|
||||||
|
|
||||||
fprintf(stderr, "[size] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread);
|
//fprintf(stderr, "[size] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread);
|
||||||
while (cur_ap_aread < pkt_size_size) {
|
while (cur_ap_aread < pkt_size_size) {
|
||||||
nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, pkt_size_size - cur_ap_aread);
|
nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, pkt_size_size - cur_ap_aread);
|
||||||
if (nread == 0) return FDS_AGAIN;
|
if (nread == 0) return FDS_AGAIN;
|
||||||
|
@ -65,7 +65,7 @@ enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer
|
||||||
cur_ap_aread += nread;
|
cur_ap_aread += nread;
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(stderr, "[content] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread);
|
//fprintf(stderr, "[content] bp_aread=%d, prev_ap_aread=%ld, cur_ap_aread=%ld\n", bp->aread, ap_aread, cur_ap_aread);
|
||||||
while (cur_ap_aread < ap->fmt.headers.size) {
|
while (cur_ap_aread < ap->fmt.headers.size) {
|
||||||
nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, ap->fmt.headers.size - cur_ap_aread);
|
nread = read(fdinfo->fd, &(ap->raw) + cur_ap_aread, ap->fmt.headers.size - cur_ap_aread);
|
||||||
perror("read ap");
|
perror("read ap");
|
||||||
|
@ -77,7 +77,7 @@ enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer
|
||||||
}
|
}
|
||||||
|
|
||||||
bp->ap_count++;
|
bp->ap_count++;
|
||||||
fprintf(stderr, "bp->ap_count=%d, buffer_count_ap(bp)=%ld\n", bp->ap_count, buffer_count_ap (bp));
|
//fprintf(stderr, "bp->ap_count=%d, buffer_count_ap(bp)=%ld\n", bp->ap_count, buffer_count_ap (bp));
|
||||||
} while (bp->ap_count != buffer_count_ap (bp));
|
} while (bp->ap_count != buffer_count_ap (bp));
|
||||||
|
|
||||||
bp->mode = BP_WRITING;
|
bp->mode = BP_WRITING;
|
||||||
|
|
Loading…
Reference in a new issue