forked from lx/netapp
Fix things going wrong when sending chan is closed
This commit is contained in:
parent
bdf7d4731d
commit
b55f61c38b
3 changed files with 34 additions and 15 deletions
|
@ -125,7 +125,7 @@ impl Example {
|
||||||
async fn exchange_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
async fn exchange_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
||||||
let mut i = 12000;
|
let mut i = 12000;
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
tokio::time::sleep(Duration::from_secs(7)).await;
|
tokio::time::sleep(Duration::from_secs(2)).await;
|
||||||
|
|
||||||
let peers = self.fullmesh.get_peer_list();
|
let peers = self.fullmesh.get_peer_list();
|
||||||
for p in peers.iter() {
|
for p in peers.iter() {
|
||||||
|
@ -144,7 +144,7 @@ impl Example {
|
||||||
);
|
);
|
||||||
let stream =
|
let stream =
|
||||||
Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move {
|
Box::pin(stream::iter([100, 200, 300, 400]).then(|x| async move {
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||||
Ok(Bytes::from(vec![(x % 256) as u8; 133 * x]))
|
Ok(Bytes::from(vec![(x % 256) as u8; 133 * x]))
|
||||||
}));
|
}));
|
||||||
match self2
|
match self2
|
||||||
|
@ -196,7 +196,7 @@ impl StreamingEndpointHandler<ExampleMessage> for Example {
|
||||||
"Handler: stream got bytes {:?}",
|
"Handler: stream got bytes {:?}",
|
||||||
x.as_ref().map(|b| b.len())
|
x.as_ref().map(|b| b.len())
|
||||||
);
|
);
|
||||||
tokio::time::sleep(Duration::from_millis(100)).await;
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||||
Ok(Bytes::from(vec![
|
Ok(Bytes::from(vec![
|
||||||
10u8;
|
10u8;
|
||||||
x.map(|b| b.len()).unwrap_or(1422) * 2
|
x.map(|b| b.len()).unwrap_or(1422) * 2
|
||||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use log::trace;
|
use log::*;
|
||||||
|
|
||||||
use futures::AsyncReadExt;
|
use futures::AsyncReadExt;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
@ -59,6 +59,11 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
{
|
{
|
||||||
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
|
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
|
debug!(
|
||||||
|
"Receiving: {:?}",
|
||||||
|
streams.iter().map(|(id, _)| id).collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
|
||||||
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
||||||
match read.read_exact(&mut header_id[..]).await {
|
match read.read_exact(&mut header_id[..]).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
|
|
36
src/send.rs
36
src/send.rs
|
@ -5,7 +5,7 @@ use std::task::{Context, Poll};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use log::trace;
|
use log::*;
|
||||||
|
|
||||||
use futures::AsyncWriteExt;
|
use futures::AsyncWriteExt;
|
||||||
use kuska_handshake::async_std::BoxStreamWrite;
|
use kuska_handshake::async_std::BoxStreamWrite;
|
||||||
|
@ -172,24 +172,38 @@ impl DataFrame {
|
||||||
pub(crate) trait SendLoop: Sync {
|
pub(crate) trait SendLoop: Sync {
|
||||||
async fn send_loop<W>(
|
async fn send_loop<W>(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>,
|
msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>,
|
||||||
mut write: BoxStreamWrite<W>,
|
mut write: BoxStreamWrite<W>,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
W: AsyncWriteExt + Unpin + Send + Sync,
|
W: AsyncWriteExt + Unpin + Send + Sync,
|
||||||
{
|
{
|
||||||
let mut sending = SendQueue::new();
|
let mut sending = SendQueue::new();
|
||||||
let mut should_exit = false;
|
let mut msg_recv = Some(msg_recv);
|
||||||
while !should_exit || !sending.is_empty() {
|
while msg_recv.is_some() || !sending.is_empty() {
|
||||||
let recv_fut = msg_recv.recv();
|
debug!(
|
||||||
futures::pin_mut!(recv_fut);
|
"Sending: {:?}",
|
||||||
|
sending
|
||||||
|
.items
|
||||||
|
.iter()
|
||||||
|
.map(|(_, i)| i.iter().map(|x| x.id))
|
||||||
|
.flatten()
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
);
|
||||||
|
|
||||||
|
let recv_fut = async {
|
||||||
|
if let Some(chan) = &mut msg_recv {
|
||||||
|
chan.recv().await
|
||||||
|
} else {
|
||||||
|
futures::future::pending().await
|
||||||
|
}
|
||||||
|
};
|
||||||
let send_fut = sending.next_ready();
|
let send_fut = sending.next_ready();
|
||||||
|
|
||||||
// recv_fut is cancellation-safe according to tokio doc,
|
// recv_fut is cancellation-safe according to tokio doc,
|
||||||
// send_fut is cancellation-safe as implemented above?
|
// send_fut is cancellation-safe as implemented above?
|
||||||
use futures::future::Either;
|
tokio::select! {
|
||||||
match futures::future::select(recv_fut, send_fut).await {
|
sth = recv_fut => {
|
||||||
Either::Left((sth, _send_fut)) => {
|
|
||||||
if let Some((id, prio, data)) = sth {
|
if let Some((id, prio, data)) = sth {
|
||||||
trace!("send_loop: add stream {} to send", id);
|
trace!("send_loop: add stream {} to send", id);
|
||||||
sending.push(SendQueueItem {
|
sending.push(SendQueueItem {
|
||||||
|
@ -198,10 +212,10 @@ pub(crate) trait SendLoop: Sync {
|
||||||
data: ByteStreamReader::new(data),
|
data: ByteStreamReader::new(data),
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
should_exit = true;
|
msg_recv = None;
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
Either::Right(((id, data), _recv_fut)) => {
|
(id, data) = send_fut => {
|
||||||
trace!(
|
trace!(
|
||||||
"send_loop: id {}, send {} bytes, header_size {}",
|
"send_loop: id {}, send {} bytes, header_size {}",
|
||||||
id,
|
id,
|
||||||
|
|
Loading…
Reference in a new issue