tor_multipath_voip/src/packet.c

202 lines
6.4 KiB
C

#include "packet.h"
union abstract_packet* ap_next(union abstract_packet* ap) {
if (ap->fmt.headers.flags & FLAG_READ_NEXT)
return (union abstract_packet*)(&ap->raw + ap->fmt.headers.size);
return NULL;
}
union abstract_packet* buffer_last_ptr(struct buffer_packet* bp) {
union abstract_packet* ap = (union abstract_packet*) &bp->ip, *apn = NULL;
while ((apn = ap_next(ap)) != NULL) ap = apn;
return ap;
}
union abstract_packet* buffer_free_ptr(struct buffer_packet* bp) {
union abstract_packet* ap = buffer_last_ptr (bp);
ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size);
return ap;
}
size_t get_full_size(struct buffer_packet* bp) {
return &(buffer_free_ptr (bp))->raw - &bp->ip[0];
}
void buffer_append_ap(struct buffer_packet* bp, union abstract_packet* ap) {
buffer_last_ptr(bp)->fmt.headers.flags |= FLAG_READ_NEXT;
memcpy(buffer_last_ptr(bp), ap, ap->fmt.headers.size);
}
enum FD_STATE read_packet_from_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
ssize_t nread;
union abstract_packet* ap = (union abstract_packet*) &bp->ip;
size_t pkt_size_size = sizeof(ap->fmt.headers.size);
if (bp->mode != BP_READING) return FDS_ERR;
while (bp->aread < pkt_size_size) {
nread = read(fdinfo->fd, &(ap->raw) + bp->aread, pkt_size_size - bp->aread);
if (nread == 0) return FDS_AGAIN;
if (nread == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nread == -1) return FDS_ERR;
bp->aread += nread;
}
while (bp->aread < ap->fmt.headers.size) {
nread = read(fdinfo->fd, &(ap->raw) + bp->aread, ap->fmt.headers.size - bp->aread);
if (nread == 0) return FDS_AGAIN;
if (nread == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nread == -1) return FDS_ERR;
bp->aread += nread;
}
bp->ap_count++;
if (bp->ap_count != get_full_size (bp)) return FDS_AGAIN;
bp->mode = BP_WRITING;
bp->awrite = 0;
return FDS_READY;
}
enum FD_STATE write_packet_to_tcp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp) {
ssize_t nwrite;
union abstract_packet* ap = (union abstract_packet*) &bp->ip;
//dump_buffer_packet (bp);
if (bp->mode != BP_WRITING) return FDS_ERR;
while (bp->awrite < get_full_size(bp)) {
nwrite = send(fdinfo->fd, &(ap->raw) + bp->awrite, get_full_size(bp) - bp->awrite, 0);
if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nwrite == -1) return FDS_ERR;
bp->awrite += nwrite;
}
bp->mode = BP_READING;
bp->aread = 0;
bp->ap_count = 0;
return FDS_READY;
}
enum FD_STATE write_packet_to_udp(struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) {
ssize_t nwrite;
union abstract_packet* ap = (union abstract_packet*) (&bp->ip + bp->awrite);
union abstract_packet* end = buffer_free_ptr (bp);
if (bp->mode != BP_WRITING) return FDS_ERR;
while (ap != end) {
if (ap->fmt.headers.cmd != CMD_UDP_ENCAPSULATED) continue;
size_t bytes_to_send;
size_t pkt_header_size = sizeof(ap->fmt.headers);
struct sockaddr* addr = NULL;
socklen_t addrlen = 0;
if (udp_t->set) {
addr = (struct sockaddr*) &udp_t->addr;
addrlen = sizeof(struct sockaddr_in);
}
bytes_to_send = ap->fmt.headers.size - pkt_header_size;
nwrite = sendto(fdinfo->fd,
&(ap->fmt.content.udp_encapsulated),
bytes_to_send,
0,
addr,
addrlen);
if (nwrite == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nwrite != bytes_to_send) return FDS_ERR;
bp->awrite += nwrite;
}
bp->mode = BP_READING;
bp->aread = 0;
bp->ap_count = 0;
return FDS_READY;
}
enum FD_STATE read_packet_from_udp (struct evt_core_fdinfo* fdinfo, struct buffer_packet* bp, struct udp_target* udp_t) {
ssize_t nread;
union abstract_packet* ap = (union abstract_packet*) &bp->ip;
if (bp->mode != BP_READING) {
fprintf(stderr, "Buffer packet is not in reading mode (mode: %d)\n", bp->mode);
return FDS_ERR;
}
size_t pkt_header_size = sizeof(ap->fmt.headers);
size_t udp_packet_size = sizeof(bp->ip) - pkt_header_size;
socklen_t addrlen = sizeof(struct sockaddr_in);
nread = recvfrom(fdinfo->fd,
&(ap->fmt.content.udp_encapsulated.payload),
udp_packet_size,
MSG_TRUNC,
(struct sockaddr*)&udp_t->addr,
&addrlen);
if ((int)nread > (int)udp_packet_size) {
fprintf(stderr, "Packet has been truncated (%ld instead of %d)\n", nread, (int)udp_packet_size);
return FDS_ERR;
}
if (nread == -1 && errno == EAGAIN) return FDS_AGAIN;
if (nread == 0) return FDS_AGAIN;
if (nread == -1) {
fprintf(stderr, "A system error occurred\n");
return FDS_ERR;
}
udp_t->set = 1;
udp_t->addrlen = addrlen;
ap->fmt.headers.size = nread + pkt_header_size;
ap->fmt.headers.cmd = CMD_UDP_ENCAPSULATED;
ap->fmt.content.udp_encapsulated.port = url_get_port_int (fdinfo->url);
bp->mode = BP_WRITING;
bp->awrite = 0;
bp->ap_count = 1;
return FDS_READY;
}
void dump_buffer_packet(struct buffer_packet* bp) {
printf("<Buffer Packet>\n");
printf(" mode=%d, aread=%d, awrite=%d, ap_count=%d, usage=%ld/%ld\n", bp->mode, bp->aread, bp->awrite, bp->ap_count, get_full_size (bp), sizeof(bp->ip));
union abstract_packet* ap = (union abstract_packet*) &bp->ip;
for (int i = 0; i < bp->ap_count; i++) {
dump_abstract_packet(ap);
ap = (union abstract_packet*)(&ap->raw + ap->fmt.headers.size);
}
printf("</Buffer Packet>\n");
}
void dump_abstract_packet(union abstract_packet* ap) {
printf(" <Abstract Packet>\n");
printf(" size=%d, cmd=%d\n", ap->fmt.headers.size, ap->fmt.headers.cmd);
switch (ap->fmt.headers.cmd) {
case CMD_LINK_MONITORING_THUNDER:
printf(" <Health>id=%d, deltat=%d, prevlink=%d, min_blocked_pkt=%d, bitfield=%02x</Health>\n",
ap->fmt.content.link_monitoring_thunder.id,
ap->fmt.content.link_monitoring_thunder.deltat,
ap->fmt.content.link_monitoring_thunder.prevlink,
ap->fmt.content.link_monitoring_thunder.min_blocked_pkt,
ap->fmt.content.link_monitoring_thunder.bitfield);
break;
case CMD_UDP_METADATA_THUNDER:
printf(" <Clear>id=%d</Clear>\n",
ap->fmt.content.udp_metadata_thunder.id);
break;
case CMD_UDP_ENCAPSULATED:
printf(" <Payload>port=%d</Payload>\n", ap->fmt.content.udp_encapsulated.port);
break;
default:
printf(" <Unknown/>\n");
break;
}
printf(" </Abstract Packet>\n");
}