netapp/src/recv.rs

127 lines
3.4 KiB
Rust
Raw Normal View History

2022-07-21 15:34:53 +00:00
use std::collections::HashMap;
use std::sync::Arc;
2022-07-21 15:59:15 +00:00
use async_trait::async_trait;
2022-07-21 16:15:07 +00:00
use bytes::Bytes;
use log::*;
2022-07-21 15:34:53 +00:00
use futures::AsyncReadExt;
use tokio::sync::mpsc;
2022-07-21 15:34:53 +00:00
use crate::error::*;
use crate::send::*;
2022-07-22 10:45:38 +00:00
use crate::stream::*;
2022-07-21 15:34:53 +00:00
/// Wrapper around the sending half of a stream's channel that warns the
/// receiver when it is dropped before the end of the stream was reached, e.g.
/// when the connection to some remote drops while data is being transmitted.
struct Sender {
    /// `Some` while the stream is still in progress; reset to `None` by
    /// `end()` once the stream has been terminated explicitly, which
    /// disables the warning emitted on drop.
    inner: Option<mpsc::UnboundedSender<Packet>>,
}
impl Sender {
fn new(inner: mpsc::UnboundedSender<Packet>) -> Self {
Sender { inner: Some(inner) }
2022-07-21 15:34:53 +00:00
}
fn send(&self, packet: Packet) {
let _ = self.inner.as_ref().unwrap().send(packet);
2022-07-21 15:34:53 +00:00
}
fn end(&mut self) {
self.inner = None;
2022-07-21 15:34:53 +00:00
}
}
impl Drop for Sender {
fn drop(&mut self) {
if let Some(inner) = self.inner.take() {
let _ = inner.send(Err(255));
2022-07-21 15:34:53 +00:00
}
}
}
/// The RecvLoop trait, which is implemented both by the client and the server
/// connection objects (ServerConn and ClientConn) adds a method `.recv_loop()`
/// and a prototype of a handler for received messages `.recv_handler()` that
/// must be filled by implementors. `.recv_loop()` receives messages in a loop
/// according to the protocol defined above: chunks of message in progress of being
/// received are stored in a buffer, and when the last chunk of a message is received,
/// the full message is passed to the receive handler.
#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
2022-07-21 15:34:53 +00:00
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
where
R: AsyncReadExt + Unpin + Send + Sync,
{
let mut streams: HashMap<RequestID, Sender> = HashMap::new();
loop {
debug!(
"Receiving: {:?}",
streams.iter().map(|(id, _)| id).collect::<Vec<_>>()
);
2022-07-21 15:34:53 +00:00
let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await {
Ok(_) => (),
Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => break,
Err(e) => return Err(e.into()),
};
let id = RequestID::from_be_bytes(header_id);
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size);
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let is_error = (size & ERROR_MARKER) != 0;
let packet = if is_error {
2022-07-25 13:04:52 +00:00
trace!(
"recv_loop: got id {}, header_size {:04x}, error {}",
id,
size,
size & !ERROR_MARKER
);
2022-07-22 11:06:10 +00:00
Err((size & !ERROR_MARKER) as u8)
2022-07-21 15:34:53 +00:00
} else {
let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;
2022-07-25 13:04:52 +00:00
trace!(
"recv_loop: got id {}, header_size {:04x}, {} bytes",
id,
size,
next_slice.len()
);
2022-07-21 16:15:07 +00:00
Ok(Bytes::from(next_slice))
2022-07-21 15:34:53 +00:00
};
let mut sender = if let Some(send) = streams.remove(&(id)) {
send
} else {
let (send, recv) = mpsc::unbounded_channel();
2022-07-25 13:04:52 +00:00
trace!("recv_loop: id {} is new channel", id);
self.recv_handler(
id,
Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)),
);
2022-07-21 15:34:53 +00:00
Sender::new(send)
};
// If we get an error, the receiving end is disconnected.
// We still need to reach eos before dropping this sender
let _ = sender.send(packet);
2022-07-21 15:34:53 +00:00
if has_cont {
2022-07-25 13:04:52 +00:00
assert!(!is_error);
2022-07-21 15:34:53 +00:00
streams.insert(id, sender);
} else {
2022-07-25 13:04:52 +00:00
trace!("recv_loop: close channel id {}", id);
2022-07-21 15:34:53 +00:00
sender.end();
}
}
Ok(())
}
}