tor_multipath_voip/src/dcall.c
2020-01-29 12:18:58 +01:00

283 lines
11 KiB
C

#include <gst/gst.h>
#include <glib-2.0/glib.h>
#include <glib-2.0/gmodule.h>
#include <glib-2.0/glib-object.h>
#include <glib-2.0/glib-unix.h>
#include <gst/rtp/gstrtpbuffer.h>
/*
 * State shared by the whole program: the GStreamer pipeline and every
 * element of the receive (rx_*) and transmit (tx_*) chains.
 */
struct dcall_elements {
  GstElement *pipeline;

  /* Receive chain elements, created in create_rx_chain(). */
  GstElement *rx_tap;
  GstElement *rx_jitterbuffer;
  GstElement *rx_depay;
  GstElement *rx_opusdec;
  GstElement *rx_resample;
  GstElement *rx_echocancel;
  GstElement *rx_sink;

  /* Transmit chain elements, created in create_tx_chain(). */
  GstElement *tx_tap;
  GstElement *tx_echocancel;
  GstElement *tx_queue;
  GstElement *tx_resample;
  GstElement *tx_opusenc;
  GstElement *tx_pay;
  GstElement *tx_sink;

  /* Value given to the "host" property of tx_sink; set from argv[1] in main(). */
  char *remote_host;
  /* Id returned by gst_bus_add_watch() in register_bus(). */
  guint bus_watch_id;
  /* Initialised to 0 in main(); presumably meant for RTP sequence gap tracking (TODO confirm). */
  guint16 prev_seqnum;
};
int create_rx_chain(struct dcall_elements* de) {
de->rx_tap = gst_element_factory_make("udpsrc", "rx-tap");
de->rx_jitterbuffer = gst_element_factory_make("rtpjitterbuffer", "rx-jitterbuffer");
de->rx_depay = gst_element_factory_make("rtpopusdepay", "rx-depay");
de->rx_opusdec = gst_element_factory_make("opusdec", "rx-opusdec");
de->rx_resample = gst_element_factory_make("audioresample", "rx-audioresample");
de->rx_echocancel = gst_element_factory_make("webrtcechoprobe", "rx-echocancel");
de->rx_sink = gst_element_factory_make("pulsesink", "rx-sink");
if (!de->rx_tap || !de->rx_jitterbuffer || !de->rx_depay || !de->rx_opusdec || !de->rx_resample || !de->rx_echocancel || !de->rx_sink) {
g_printerr ("One element of the rx chain could not be created. Exiting.\n");
return -1;
}
g_object_set(G_OBJECT (de->rx_tap), "port", 5000, NULL);
//g_object_set(G_OBJECT (rx_tap), "address", "127.0.0.1", NULL);
g_object_set(G_OBJECT (de->rx_tap), "caps", gst_caps_new_simple("application/x-rtp", "media", G_TYPE_STRING, "audio", NULL), NULL);
g_object_set(G_OBJECT (de->rx_jitterbuffer), "do-lost", TRUE, NULL);
g_object_set(G_OBJECT (de->rx_jitterbuffer), "do-retransmission", FALSE, NULL);
g_object_set(G_OBJECT (de->rx_jitterbuffer), "latency", 150, NULL);
g_object_set(G_OBJECT (de->rx_jitterbuffer), "drop-on-latency", FALSE, NULL);
//g_object_set(G_OBJECT (de->rx_jitterbuffer), "post-drop-messages", TRUE, NULL);
g_object_set(G_OBJECT (de->rx_opusdec), "plc", TRUE, NULL);
g_object_set(G_OBJECT (de->rx_opusdec), "use-inband-fec", FALSE, NULL);
GstStructure *props;
props = gst_structure_from_string ("props,media.role=phone", NULL);
g_object_set (de->rx_sink, "stream-properties", props, NULL);
gst_structure_free (props);
gst_bin_add_many (GST_BIN (de->pipeline), de->rx_tap, de->rx_jitterbuffer, de->rx_depay, de->rx_opusdec, de->rx_resample, de->rx_echocancel, de->rx_sink, NULL);
gst_element_link_many (de->rx_tap, de->rx_jitterbuffer, de->rx_depay, de->rx_opusdec, de->rx_resample, de->rx_echocancel, de->rx_sink, NULL);
return 0;
}
/*
 * Create, configure, add to the pipeline and link the transmit chain:
 *   pulsesrc -> audioresample -> webrtcdsp -> queue -> opusenc -> rtpopuspay
 *            -> udpsink
 *
 * de: call state; de->pipeline and de->remote_host must already be set.
 *     The tx_* fields are filled in.
 *
 * Returns 0 on success, -1 if an element could not be created or the chain
 * could not be linked.
 */
int create_tx_chain(struct dcall_elements* de) {
  de->tx_tap = gst_element_factory_make("pulsesrc", "tx-tap");
  de->tx_resample = gst_element_factory_make("audioresample", "tx-resample");
  de->tx_echocancel = gst_element_factory_make("webrtcdsp", "tx-echocancel");
  de->tx_queue = gst_element_factory_make("queue", "tx-queue");
  de->tx_opusenc = gst_element_factory_make("opusenc", "tx-opusenc");
  de->tx_pay = gst_element_factory_make("rtpopuspay", "tx-rtpopuspay");
  de->tx_sink = gst_element_factory_make("udpsink", "tx-sink");

  if (!de->tx_tap || !de->tx_echocancel || !de->tx_queue || !de->tx_resample || !de->tx_opusenc || !de->tx_pay || !de->tx_sink) {
    g_printerr("One element of the tx chain could not be created. Exiting.\n");
    return -1;
  }

  /* Opus encoder settings. */
  gst_util_set_object_arg(G_OBJECT(de->tx_opusenc), "audio-type", "voice");
  g_object_set(G_OBJECT(de->tx_opusenc), "inband-fec", FALSE, NULL);
  g_object_set(G_OBJECT(de->tx_opusenc), "frame-size", 40, NULL);
  g_object_set(G_OBJECT(de->tx_opusenc), "bitrate", 32000, NULL);
  g_object_set(G_OBJECT(de->tx_opusenc), "dtx", FALSE, NULL); // gstreamer dtx opus implem. is broken

  /* UDP destination: the remote host on port 5000. */
  g_object_set(G_OBJECT(de->tx_sink), "host", de->remote_host, NULL);
  g_object_set(G_OBJECT(de->tx_sink), "port", 5000, NULL);
  g_object_set(G_OBJECT(de->tx_sink), "async", FALSE, NULL);

  /* webrtcdsp settings; "probe" is the name given to the rx chain's webrtcechoprobe. */
  g_object_set(G_OBJECT(de->tx_echocancel), "echo-cancel", TRUE, NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "extended-filter", TRUE, NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "gain-control", TRUE, NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "high-pass-filter", TRUE, NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "limiter", FALSE, NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "noise-suppression", TRUE, NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "probe", "rx-echocancel", NULL);
  g_object_set(G_OBJECT(de->tx_echocancel), "voice-detection", FALSE, NULL);

  /* Tag the capture stream with the "phone" media role. */
  GstStructure *props = gst_structure_from_string ("props,media.role=phone", NULL);
  if (props != NULL) {
    g_object_set (de->tx_tap, "stream-properties", props, NULL);
    gst_structure_free (props);
  }

  gst_bin_add_many(GST_BIN(de->pipeline), de->tx_tap, de->tx_echocancel, de->tx_queue, de->tx_resample, de->tx_opusenc, de->tx_pay, de->tx_sink, NULL);
  if (!gst_element_link_many(de->tx_tap, de->tx_resample, de->tx_echocancel, de->tx_queue, de->tx_opusenc, de->tx_pay, de->tx_sink, NULL)) {
    g_printerr("The elements of the tx chain could not be linked. Exiting.\n");
    return -1;
  }

  return 0;
}
/*
 * Pad probe callback that traces events and dumps the fields of
 * "GstRTPPacketLost" events.
 *
 * pad:       pad the probe is attached to (unused).
 * info:      probe info the event is extracted from.
 * user_data: probe user data (unused).
 *
 * Always returns GST_PAD_PROBE_OK, so the event is never dropped or altered.
 */
static GstPadProbeReturn jitter_buffer_sink_event(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
  (void) pad;
  (void) user_data;

  g_print("Entering rtpjitterbuffer sink pad handler for events...\n");

  GstEvent *event = gst_pad_probe_info_get_event (info);
  if (event == NULL) return GST_PAD_PROBE_OK;
  g_print("We successfully extracted an event from the pad... \n");

  const GstStructure *struc = gst_event_get_structure(event);
  if (struc == NULL) return GST_PAD_PROBE_OK;
  g_print("We successfully extracted a structure from the event... \n");

  const gchar *struc_name = gst_structure_get_name(struc);
  if (struc_name == NULL) return GST_PAD_PROBE_OK;
  g_print("We extracted the structure \"%s\"...\n", struc_name);

  if (!gst_structure_has_name(struc, "GstRTPPacketLost")) return GST_PAD_PROBE_OK;
  g_print("And that's the structure we want !\n");

  /* Fields keep their zero default when absent from the structure. */
  guint seqnum = 0, retry = 0;
  guint64 timestamp = 0, duration = 0;
  gst_structure_get_uint(struc, "seqnum", &seqnum);
  gst_structure_get_uint(struc, "retry", &retry);
  gst_structure_get_uint64(struc, "timestamp", &timestamp);
  gst_structure_get_uint64(struc, "duration", &duration);

  /* guint is unsigned and guint64 is not always `long`: use matching specifiers. */
  g_print("GstRTPPacketLost{seqnum=%u, retry=%u, duration=%" G_GUINT64_FORMAT ", timestamp=%" G_GUINT64_FORMAT "}\n", seqnum, retry, duration, timestamp);

  return GST_PAD_PROBE_OK;
}
/*
 * Probe callback wrapper: brackets jitter_buffer_sink_event() with
 * "upstream event" trace lines. The inner result is ignored and
 * GST_PAD_PROBE_OK is always returned.
 */
static GstPadProbeReturn jitter_buffer_sink_event_up(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
  const GstPadProbeReturn verdict = GST_PAD_PROBE_OK;

  g_print("upstream event: begin\n");
  (void) jitter_buffer_sink_event (pad, info, user_data);
  g_print("upstream event: end\n");

  return verdict;
}
/*
 * Probe callback wrapper: brackets jitter_buffer_sink_event() with
 * "downstream event" trace lines. The inner result is ignored and
 * GST_PAD_PROBE_OK is always returned.
 */
static GstPadProbeReturn jitter_buffer_sink_event_down(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
  const GstPadProbeReturn verdict = GST_PAD_PROBE_OK;

  g_print("downstream event: begin\n");
  (void) jitter_buffer_sink_event (pad, info, user_data);
  g_print("downstream event: end\n");

  return verdict;
}
/*
 * Metadata iteration callback that does nothing and always returns TRUE.
 * @FIXME Dead code, please remove me
 */
static gboolean foreach_buffer (GstBuffer *inbuf, GstMeta **meta, gpointer user_data) {
  (void) inbuf;
  (void) meta;
  (void) user_data;

  return TRUE;
}
/*
 * Buffer pad probe: reports empty probe infos and buffers flagged as
 * discontinuous.
 *
 * pad:       pad the probe is attached to (unused).
 * info:      probe info the buffer is extracted from.
 * user_data: probe user data (unused).
 *
 * Always returns GST_PAD_PROBE_OK, so the buffer is passed on untouched.
 */
static GstPadProbeReturn extract_buffer(GstPad *pad, GstPadProbeInfo *info, gpointer user_data) {
  (void) pad;
  (void) user_data;

  GstBuffer *out = gst_pad_probe_info_get_buffer (info);
  if (out == NULL) {
    g_print("Empty buffer \n");
    return GST_PAD_PROBE_OK;
  }

  if (GST_BUFFER_FLAG_IS_SET (out, GST_BUFFER_FLAG_DISCONT)) {
    g_print("Discontinuous buffer\n");
  }

  return GST_PAD_PROBE_OK;
}
/*
 * Attach the extract_buffer() buffer probe to the src pad of the opus decoder.
 *
 * de: call state; de->rx_opusdec must already exist. It is handed to the
 *     probe as user data, so it must outlive the pipeline.
 */
void register_pad(struct dcall_elements *de) {
  GstPad *pad_opusdec = gst_element_get_static_pad (de->rx_opusdec, "src");
  if (pad_opusdec == NULL) {
    g_printerr ("Could not get the src pad of the opus decoder.\n");
    return;
  }

  /*
   * Disabled event probes on the jitterbuffer src pad:
   *   GstPad *pad_src = gst_element_get_static_pad (de->rx_jitterbuffer, "src");
   *   gst_pad_add_probe (pad_src, GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM, jitter_buffer_sink_event_down, de, NULL);
   *   gst_pad_add_probe (pad_src, GST_PAD_PROBE_TYPE_EVENT_UPSTREAM, jitter_buffer_sink_event_up, de, NULL);
   *   gst_object_unref (pad_src);
   */

  /* Pass `de` itself: `&de` would be the address of this function's
   * parameter, which no longer exists once the probe fires. */
  gst_pad_add_probe (pad_opusdec, GST_PAD_PROBE_TYPE_BUFFER, extract_buffer, de, NULL);

  /* gst_element_get_static_pad() returned a new reference; the element
   * keeps the pad (and its probe) alive. */
  gst_object_unref (pad_opusdec);
}
/*
 * Bus watch callback: prints the type name and the serialized structure of
 * every message posted on the pipeline bus.
 *
 * bus:     bus the message comes from (unused).
 * message: message to print.
 * data:    watch user data (unused).
 *
 * Always returns TRUE so the watch stays installed.
 */
static gboolean pipeline_bus_handler (GstBus *bus, GstMessage *message, gpointer data) {
  (void) bus;
  (void) data;

  g_print ("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));

  /* Not every message carries a structure. */
  const GstStructure *struc = gst_message_get_structure(message);
  if (struc == NULL) {
    g_print("structure is: (null) \n");
    return TRUE;
  }

  /* gst_structure_to_string() returns a newly allocated string we own. */
  gchar *serialized = gst_structure_to_string (struc);
  g_print("structure is: %s \n", serialized);
  g_free (serialized);

  return TRUE;
}
/*
 * Install pipeline_bus_handler() as a watch on the pipeline bus and store
 * the watch id in de->bus_watch_id.
 */
void register_bus(struct dcall_elements *de) {
  GstBus *pipeline_bus = gst_pipeline_get_bus (GST_PIPELINE (de->pipeline));

  de->bus_watch_id = gst_bus_add_watch (pipeline_bus, pipeline_bus_handler, de);

  /* Drop the reference obtained from gst_pipeline_get_bus(). */
  gst_object_unref(pipeline_bus);
}
/*
 * Signal source callback: asks the main loop passed as user_data to quit.
 * Returns TRUE (G_SOURCE_CONTINUE), so the source stays installed.
 */
gboolean stop_handler(gpointer user_data) {
  g_main_loop_quit((GMainLoop *) user_data);
  return G_SOURCE_CONTINUE;
}
int main(int argc, char *argv[]) {
GMainLoop *loop;
struct dcall_elements de = {
.prev_seqnum = 0
};
/* Check input arguments */
if (argc != 2) {
g_printerr ("Usage: %s <Remote host>\n", argv[0]);
return -1;
}
de.remote_host = argv[1];
gst_init (&argc, &argv);
loop = g_main_loop_new (NULL, FALSE);
de.pipeline = gst_pipeline_new ("pipeline");
if (!de.pipeline) {
g_printerr ("Pipeline could not be created. Exiting.\n");
return -1;
}
if (create_rx_chain (&de) != 0) return -1;
if (create_tx_chain (&de) != 0) return -1;
register_pad(&de);
//register_bus(&de);
gst_element_set_state (de.pipeline, GST_STATE_PLAYING);
g_unix_signal_add (SIGTERM, stop_handler, loop);
g_unix_signal_add (SIGINT, stop_handler, loop);
g_print ("Running...\n");
g_main_loop_run (loop);
g_print ("Main loop stopped...\n");
GstStructure *stats;
guint64 num_pushed, num_lost, num_late, num_duplicates;
g_object_get(de.rx_jitterbuffer, "stats", &stats, NULL);
gst_structure_get_uint64(stats, "num-pushed", &num_pushed);
gst_structure_get_uint64(stats, "num-lost", &num_lost);
gst_structure_get_uint64(stats, "num-late", &num_late);
gst_structure_get_uint64(stats, "num-duplicates", &num_duplicates);
g_print("pkt_delivered=%ld, pkt_lost=%ld, pkt_late=%ld, pkt_duplicates=%ld\n", num_pushed, num_lost, num_late, num_duplicates);
gst_element_set_state (de.pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (de.pipeline));
// g_source_remove (de.bus_watch_id);
g_main_loop_unref (loop);
return 0;
}