recv side: use unbounded channel to remove deadlock
continuous-integration/drone/push Build was killed Details
continuous-integration/drone/pr Build was killed Details

This commit is contained in:
Alex 2022-09-01 09:45:24 +02:00
parent 2c9d595da0
commit 3fd30c6e28
Signed by: lx
GPG Key ID: 0E496D15096376BE
1 changed files with 8 additions and 10 deletions

View File

@ -15,16 +15,16 @@ use crate::stream::*;
/// Structure to warn when the sender is dropped before end of stream was reached, like when /// Structure to warn when the sender is dropped before end of stream was reached, like when
/// connection to some remote drops while transmitting data /// connection to some remote drops while transmitting data
struct Sender { struct Sender {
inner: Option<mpsc::Sender<Packet>>, inner: Option<mpsc::UnboundedSender<Packet>>,
} }
impl Sender { impl Sender {
fn new(inner: mpsc::Sender<Packet>) -> Self { fn new(inner: mpsc::UnboundedSender<Packet>) -> Self {
Sender { inner: Some(inner) } Sender { inner: Some(inner) }
} }
async fn send(&self, packet: Packet) { fn send(&self, packet: Packet) {
let _ = self.inner.as_ref().unwrap().send(packet).await; let _ = self.inner.as_ref().unwrap().send(packet);
} }
fn end(&mut self) { fn end(&mut self) {
@ -35,9 +35,7 @@ impl Sender {
impl Drop for Sender { impl Drop for Sender {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(inner) = self.inner.take() { if let Some(inner) = self.inner.take() {
tokio::spawn(async move { let _ = inner.send(Err(255));
let _ = inner.send(Err(255)).await;
});
} }
} }
} }
@ -102,18 +100,18 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut sender = if let Some(send) = streams.remove(&(id)) { let mut sender = if let Some(send) = streams.remove(&(id)) {
send send
} else { } else {
let (send, recv) = mpsc::channel(4); let (send, recv) = mpsc::unbounded_channel();
trace!("recv_loop: id {} is new channel", id); trace!("recv_loop: id {} is new channel", id);
self.recv_handler( self.recv_handler(
id, id,
Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)), Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)),
); );
Sender::new(send) Sender::new(send)
}; };
// If we get an error, the receiving end is disconnected. // If we get an error, the receiving end is disconnected.
// We still need to reach eos before dropping this sender // We still need to reach eos before dropping this sender
let _ = sender.send(packet).await; let _ = sender.send(packet);
if has_cont { if has_cont {
assert!(!is_error); assert!(!is_error);