From 3fd30c6e280fba41377c8b563352d756e8bc1caf Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 1 Sep 2022 09:45:24 +0200 Subject: [PATCH] recv side: use unbounded channel to remove deadlock --- src/recv.rs | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/src/recv.rs b/src/recv.rs index 4d1047b..3bea709 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -15,16 +15,16 @@ use crate::stream::*; /// Structure to warn when the sender is dropped before end of stream was reached, like when /// connection to some remote drops while transmitting data struct Sender { - inner: Option>, + inner: Option>, } impl Sender { - fn new(inner: mpsc::Sender) -> Self { + fn new(inner: mpsc::UnboundedSender) -> Self { Sender { inner: Some(inner) } } - async fn send(&self, packet: Packet) { - let _ = self.inner.as_ref().unwrap().send(packet).await; + fn send(&self, packet: Packet) { + let _ = self.inner.as_ref().unwrap().send(packet); } fn end(&mut self) { @@ -35,9 +35,7 @@ impl Sender { impl Drop for Sender { fn drop(&mut self) { if let Some(inner) = self.inner.take() { - tokio::spawn(async move { - let _ = inner.send(Err(255)).await; - }); + let _ = inner.send(Err(255)); } } } @@ -102,18 +100,18 @@ pub(crate) trait RecvLoop: Sync + 'static { let mut sender = if let Some(send) = streams.remove(&(id)) { send } else { - let (send, recv) = mpsc::channel(4); + let (send, recv) = mpsc::unbounded_channel(); trace!("recv_loop: id {} is new channel", id); self.recv_handler( id, - Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)), + Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)), ); Sender::new(send) }; // If we get an error, the receiving end is disconnected. // We still need to reach eos before dropping this sender - let _ = sender.send(packet).await; + let _ = sender.send(packet); if has_cont { assert!(!is_error);