From b55f61c38b01da01314d99ced543aba713dbd2a9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 26 Jul 2022 12:11:48 +0200 Subject: [PATCH] Fix things going wrong when sending chan is closed --- examples/fullmesh.rs | 6 +++--- src/recv.rs | 7 ++++++- src/send.rs | 36 +++++++++++++++++++++++++----------- 3 files changed, 34 insertions(+), 15 deletions(-) diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 82e45c3..972bec0 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -125,7 +125,7 @@ impl Example { async fn exchange_loop(self: Arc, must_exit: watch::Receiver) { let mut i = 12000; while !*must_exit.borrow() { - tokio::time::sleep(Duration::from_secs(7)).await; + tokio::time::sleep(Duration::from_secs(2)).await; let peers = self.fullmesh.get_peer_list(); for p in peers.iter() { @@ -144,7 +144,7 @@ impl Example { ); let stream = Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move { - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(500)).await; Ok(Bytes::from(vec![(x % 256) as u8; 133 * x])) })); match self2 @@ -196,7 +196,7 @@ impl StreamingEndpointHandler for Example { "Handler: stream got bytes {:?}", x.as_ref().map(|b| b.len()) ); - tokio::time::sleep(Duration::from_millis(100)).await; + tokio::time::sleep(Duration::from_millis(300)).await; Ok(Bytes::from(vec![ 10u8; x.map(|b| b.len()).unwrap_or(1422) * 2 diff --git a/src/recv.rs b/src/recv.rs index cba42cb..4d1047b 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -3,7 +3,7 @@ use std::sync::Arc; use async_trait::async_trait; use bytes::Bytes; -use log::trace; +use log::*; use futures::AsyncReadExt; use tokio::sync::mpsc; @@ -59,6 +59,11 @@ pub(crate) trait RecvLoop: Sync + 'static { { let mut streams: HashMap = HashMap::new(); loop { + debug!( + "Receiving: {:?}", + streams.iter().map(|(id, _)| id).collect::>() + ); + let mut header_id = [0u8; RequestID::BITS as usize / 8]; match read.read_exact(&mut header_id[..]).await { Ok(_) => (), diff --git a/src/send.rs b/src/send.rs index 256fe4c..fd415c6 100644 --- a/src/send.rs +++ b/src/send.rs @@ -5,7 +5,7 @@ use std::task::{Context, Poll}; use async_trait::async_trait; use bytes::Bytes; -use log::trace; +use log::*; use futures::AsyncWriteExt; use kuska_handshake::async_std::BoxStreamWrite; @@ -172,24 +172,38 @@ impl DataFrame { pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, - mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, + msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, mut write: BoxStreamWrite, ) -> Result<(), Error> where W: AsyncWriteExt + Unpin + Send + Sync, { let mut sending = SendQueue::new(); - let mut should_exit = false; - while !should_exit || !sending.is_empty() { - let recv_fut = msg_recv.recv(); - futures::pin_mut!(recv_fut); + let mut msg_recv = Some(msg_recv); + while msg_recv.is_some() || !sending.is_empty() { + debug!( + "Sending: {:?}", + sending + .items + .iter() + .map(|(_, i)| i.iter().map(|x| x.id)) + .flatten() + .collect::>() + ); + + let recv_fut = async { + if let Some(chan) = &mut msg_recv { + chan.recv().await + } else { + futures::future::pending().await + } + }; let send_fut = sending.next_ready(); // recv_fut is cancellation-safe according to tokio doc, // send_fut is cancellation-safe as implemented above? - use futures::future::Either; - match futures::future::select(recv_fut, send_fut).await { - Either::Left((sth, _send_fut)) => { + tokio::select! { + sth = recv_fut => { if let Some((id, prio, data)) = sth { trace!("send_loop: add stream {} to send", id); sending.push(SendQueueItem { @@ -198,10 +212,10 @@ pub(crate) trait SendLoop: Sync { data: ByteStreamReader::new(data), }); } else { - should_exit = true; + msg_recv = None; }; } - Either::Right(((id, data), _recv_fut)) => { + (id, data) = send_fut => { trace!( "send_loop: id {}, send {} bytes, header_size {}", id,