add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
10 changed files with 140 additions and 120 deletions
Showing only changes of commit cd203f5708 - Show all commits

View file

@ -16,8 +16,8 @@ name = "netapp"
[features] [features]
default = [] default = []
basalt = ["lru", "rand"] basalt = ["lru"]
telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"] telemetry = ["opentelemetry", "opentelemetry-contrib"]
[dependencies] [dependencies]
futures = "0.3.17" futures = "0.3.17"
@ -30,7 +30,7 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"]
rmp-serde = "0.14.3" rmp-serde = "0.14.3"
hex = "0.4.2" hex = "0.4.2"
rand = { version = "0.5.5", optional = true } rand = { version = "0.5.5" }
log = "0.4.8" log = "0.4.8"
arc-swap = "1.1" arc-swap = "1.1"

View file

@ -146,7 +146,10 @@ mod test {
assert!(buf.len() == 23); assert!(buf.len() == 23);
assert!(!buf.is_empty()); assert!(!buf.is_empty());
assert_eq!(buf.take_all(), Bytes::from(b"Hello, world!1234567890".to_vec())); assert_eq!(
buf.take_all(),
Bytes::from(b"Hello, world!1234567890".to_vec())
);
assert!(buf.len() == 0); assert!(buf.len() == 0);
assert!(buf.is_empty()); assert!(buf.is_empty());
@ -160,7 +163,10 @@ mod test {
assert_eq!(buf.take_exact(12), None); assert_eq!(buf.take_exact(12), None);
assert!(buf.len() == 11); assert!(buf.len() == 11);
assert_eq!(buf.take_exact(11), Some(Bytes::from(b"llo, world!".to_vec()))); assert_eq!(
buf.take_exact(11),
Some(Bytes::from(b"llo, world!".to_vec()))
);
assert!(buf.len() == 0); assert!(buf.len() == 0);
assert!(buf.is_empty()); assert!(buf.is_empty());
} }

View file

@ -35,7 +35,7 @@ pub(crate) struct ClientConn {
pub(crate) remote_addr: SocketAddr, pub(crate) remote_addr: SocketAddr,
pub(crate) peer_id: NodeID, pub(crate) peer_id: NodeID,
query_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, ByteStream)>>, query_send: ArcSwapOption<mpsc::UnboundedSender<SendStream>>,
next_query_number: AtomicU32, next_query_number: AtomicU32,
inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>, inflight: Mutex<HashMap<RequestID, oneshot::Sender<ByteStream>>>,
@ -165,7 +165,7 @@ impl ClientConn {
// Encode request // Encode request
let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id); let req_enc = req.into_enc(prio, path.as_bytes().to_vec().into(), telemetry_id);
let req_msg_len = req_enc.msg.len(); let req_msg_len = req_enc.msg.len();
let req_stream = req_enc.encode(); let (req_stream, req_order) = req_enc.encode();
// Send request through // Send request through
let (resp_send, resp_recv) = oneshot::channel(); let (resp_send, resp_recv) = oneshot::channel();
@ -175,7 +175,10 @@ impl ClientConn {
"Too many inflight requests! RequestID collision. Interrupting previous request." "Too many inflight requests! RequestID collision. Interrupting previous request."
); );
let _ = old_ch.send(Box::pin(futures::stream::once(async move { let _ = old_ch.send(Box::pin(futures::stream::once(async move {
Err(std::io::Error::new(std::io::ErrorKind::Other, "RequestID collision, too many inflight requests")) Err(std::io::Error::new(
std::io::ErrorKind::Other,
"RequestID collision, too many inflight requests",
))
}))); })));
} }

View file

@ -28,6 +28,9 @@ pub enum Error {
#[error(display = "Framing protocol error")] #[error(display = "Framing protocol error")]
Framing, Framing,
#[error(display = "Remote error ({:?}): {}", _0, _1)]
Remote(io::ErrorKind, String),
#[error(display = "Request ID collision")] #[error(display = "Request ID collision")]
IdCollision, IdCollision,
@ -42,30 +45,6 @@ pub enum Error {
#[error(display = "Version mismatch: {}", _0)] #[error(display = "Version mismatch: {}", _0)]
VersionMismatch(String), VersionMismatch(String),
#[error(display = "Remote error {}: {}", _0, _1)]
Remote(u8, String),
}
impl Error {
pub fn code(&self) -> u8 {
match self {
Self::Io(_) => 100,
Self::TokioJoin(_) => 110,
Self::OneshotRecv(_) => 111,
Self::RMPEncode(_) => 10,
Self::RMPDecode(_) => 11,
Self::UTF8(_) => 12,
Self::Framing => 13,
Self::NoHandler => 20,
Self::ConnectionClosed => 21,
Self::IdCollision => 22,
Self::Handshake(_) => 30,
Self::VersionMismatch(_) => 31,
Self::Remote(c, _) => *c,
Self::Message(_) => 99,
}
}
} }
impl<T> From<tokio::sync::watch::error::SendError<T>> for Error { impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {

View file

@ -13,10 +13,10 @@
//! about message priorization. //! about message priorization.
//! Also check out the examples to learn how to use this crate. //! Also check out the examples to learn how to use this crate.
pub mod bytes_buf;
pub mod error; pub mod error;
pub mod stream; pub mod stream;
pub mod util; pub mod util;
pub mod bytes_buf;
pub mod endpoint; pub mod endpoint;
pub mod message; pub mod message;

View file

@ -3,6 +3,7 @@ use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use futures::stream::StreamExt; use futures::stream::StreamExt;
@ -40,6 +41,24 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01;
// ---- // ----
#[derive(Clone, Copy)]
pub struct OrderTagStream(u64);
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
pub struct OrderTag(u64, u64);
impl OrderTag {
pub fn stream() -> OrderTagStream {
OrderTagStream(thread_rng().gen())
}
}
impl OrderTagStream {
pub fn order(&self, order: u64) -> OrderTag {
OrderTag(self.0, order)
}
}
// ----
/// This trait should be implemented by all messages your application /// This trait should be implemented by all messages your application
/// wants to handle /// wants to handle
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
@ -56,6 +75,7 @@ pub struct Req<M: Message> {
pub(crate) msg: Arc<M>, pub(crate) msg: Arc<M>,
pub(crate) msg_ser: Option<Bytes>, pub(crate) msg_ser: Option<Bytes>,
pub(crate) stream: AttachedStream, pub(crate) stream: AttachedStream,
pub(crate) order_tag: Option<OrderTag>,
} }
impl<M: Message> Req<M> { impl<M: Message> Req<M> {
@ -77,6 +97,13 @@ impl<M: Message> Req<M> {
} }
} }
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
Self {
order_tag: Some(order_tag),
..self
}
}
pub fn msg(&self) -> &M { pub fn msg(&self) -> &M {
&self.msg &self.msg
} }
@ -97,6 +124,7 @@ impl<M: Message> Req<M> {
telemetry_id, telemetry_id,
msg: self.msg_ser.unwrap(), msg: self.msg_ser.unwrap(),
stream: self.stream.into_stream(), stream: self.stream.into_stream(),
order_tag: self.order_tag,
} }
} }
@ -109,6 +137,7 @@ impl<M: Message> Req<M> {
.stream .stream
.map(AttachedStream::Stream) .map(AttachedStream::Stream)
.unwrap_or(AttachedStream::None), .unwrap_or(AttachedStream::None),
order_tag: enc.order_tag,
}) })
} }
} }
@ -125,6 +154,7 @@ impl<M: Message> IntoReq<M> for M {
msg: Arc::new(self), msg: Arc::new(self),
msg_ser: Some(Bytes::from(msg_ser)), msg_ser: Some(Bytes::from(msg_ser)),
stream: AttachedStream::None, stream: AttachedStream::None,
order_tag: None,
}) })
} }
fn into_req_local(self) -> Req<M> { fn into_req_local(self) -> Req<M> {
@ -132,6 +162,7 @@ impl<M: Message> IntoReq<M> for M {
msg: Arc::new(self), msg: Arc::new(self),
msg_ser: None, msg_ser: None,
stream: AttachedStream::None, stream: AttachedStream::None,
order_tag: None,
} }
} }
} }
@ -158,6 +189,7 @@ impl<M: Message> Clone for Req<M> {
msg: self.msg.clone(), msg: self.msg.clone(),
msg_ser: self.msg_ser.clone(), msg_ser: self.msg_ser.clone(),
stream, stream,
order_tag: self.order_tag,
} }
} }
} }
@ -184,6 +216,7 @@ pub struct Resp<M: Message> {
pub(crate) _phantom: PhantomData<M>, pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: M::Response, pub(crate) msg: M::Response,
pub(crate) stream: AttachedStream, pub(crate) stream: AttachedStream,
pub(crate) order_tag: Option<OrderTag>,
} }
impl<M: Message> Resp<M> { impl<M: Message> Resp<M> {
@ -192,6 +225,7 @@ impl<M: Message> Resp<M> {
_phantom: Default::default(), _phantom: Default::default(),
msg: v, msg: v,
stream: AttachedStream::None, stream: AttachedStream::None,
order_tag: None,
} }
} }
@ -209,6 +243,13 @@ impl<M: Message> Resp<M> {
} }
} }
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
Self {
order_tag: Some(order_tag),
..self
}
}
pub fn msg(&self) -> &M::Response { pub fn msg(&self) -> &M::Response {
&self.msg &self.msg
} }
@ -222,26 +263,24 @@ impl<M: Message> Resp<M> {
} }
pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> { pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> {
Ok(RespEnc::Success { Ok(RespEnc {
msg: rmp_to_vec_all_named(&self.msg)?.into(), msg: rmp_to_vec_all_named(&self.msg)?.into(),
stream: self.stream.into_stream(), stream: self.stream.into_stream(),
order_tag: self.order_tag,
}) })
} }
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> { pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
match enc { let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
RespEnc::Success { msg, stream } => { Ok(Self {
let msg = rmp_serde::decode::from_read_ref(&msg)?; _phantom: Default::default(),
Ok(Self { msg,
_phantom: Default::default(), stream: enc
msg, .stream
stream: stream .map(AttachedStream::Stream)
.map(AttachedStream::Stream) .unwrap_or(AttachedStream::None),
.unwrap_or(AttachedStream::None), order_tag: enc.order_tag,
}) })
}
RespEnc::Error { code, message } => Err(Error::Remote(code, message)),
}
} }
} }
@ -295,10 +334,11 @@ pub(crate) struct ReqEnc {
pub(crate) telemetry_id: Bytes, pub(crate) telemetry_id: Bytes,
pub(crate) msg: Bytes, pub(crate) msg: Bytes,
pub(crate) stream: Option<ByteStream>, pub(crate) stream: Option<ByteStream>,
pub(crate) order_tag: Option<OrderTag>,
} }
impl ReqEnc { impl ReqEnc {
pub(crate) fn encode(self) -> ByteStream { pub(crate) fn encode(self) -> (ByteStream, Option<OrderTag>) {
let mut buf = BytesMut::with_capacity( let mut buf = BytesMut::with_capacity(
self.path.len() + self.telemetry_id.len() + self.msg.len() + 16, self.path.len() + self.telemetry_id.len() + self.msg.len() + 16,
); );
@ -315,15 +355,18 @@ impl ReqEnc {
let header = buf.freeze(); let header = buf.freeze();
if let Some(stream) = self.stream { let res_stream: ByteStream = if let Some(stream) = self.stream {
Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream)) Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream))
} else { } else {
Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)])) Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]))
} };
(res_stream, self.order_tag)
} }
pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> { pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
Self::decode_aux(stream).await.map_err(|_| Error::Framing) Self::decode_aux(stream)
.await
.map_err(read_exact_error_to_error)
} }
async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> { async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
@ -346,6 +389,7 @@ impl ReqEnc {
telemetry_id, telemetry_id,
msg, msg,
stream: Some(reader.into_stream()), stream: Some(reader.into_stream()),
order_tag: None,
}) })
} }
} }
@ -360,74 +404,67 @@ impl ReqEnc {
/// - message length + 1: u8 /// - message length + 1: u8
/// - error code: u8 /// - error code: u8
/// - message: [u8; message_length] /// - message: [u8; message_length]
pub(crate) enum RespEnc { pub(crate) struct RespEnc {
Error { msg: Bytes,
code: u8, stream: Option<ByteStream>,
message: String, order_tag: Option<OrderTag>,
},
Success {
msg: Bytes,
stream: Option<ByteStream>,
},
} }
impl RespEnc { impl RespEnc {
pub(crate) fn from_err(e: Error) -> Self { pub(crate) fn encode(resp: Result<Self, Error>) -> (ByteStream, Option<OrderTag>) {
RespEnc::Error { match resp {
code: e.code(), Ok(Self {
message: format!("{}", e), msg,
} stream,
} order_tag,
}) => {
pub(crate) fn encode(self) -> ByteStream { let mut buf = BytesMut::with_capacity(4);
match self {
RespEnc::Success { msg, stream } => {
let mut buf = BytesMut::with_capacity(msg.len() + 8);
buf.put_u8(0);
buf.put_u32(msg.len() as u32); buf.put_u32(msg.len() as u32);
let header = buf.freeze(); let header = buf.freeze();
if let Some(stream) = stream { let res_stream: ByteStream = if let Some(stream) = stream {
Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream)) Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream))
} else { } else {
Box::pin(futures::stream::iter([Ok(header), Ok(msg)])) Box::pin(futures::stream::iter([Ok(header), Ok(msg)]))
} };
(res_stream, order_tag)
} }
RespEnc::Error { code, message } => { Err(err) => {
let mut buf = BytesMut::with_capacity(message.len() + 8); let err = std::io::Error::new(
buf.put_u8(1 + message.len() as u8); std::io::ErrorKind::Other,
buf.put_u8(code); format!("netapp error: {}", err),
buf.put(message.as_bytes()); );
let header = buf.freeze(); (
Box::pin(futures::stream::once(async move { Ok(header) })) Box::pin(futures::stream::once(async move { Err(err) })),
None,
)
} }
} }
} }
pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> { pub(crate) async fn decode(stream: ByteStream) -> Result<Self, Error> {
Self::decode_aux(stream).await.map_err(|_| Error::Framing) Self::decode_aux(stream)
.await
.map_err(read_exact_error_to_error)
} }
async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> { async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
let mut reader = ByteStreamReader::new(stream); let mut reader = ByteStreamReader::new(stream);
let is_err = reader.read_u8().await?; let msg_len = reader.read_u32().await?;
let msg = reader.read_exact(msg_len as usize).await?;
if is_err > 0 { Ok(Self {
let code = reader.read_u8().await?; msg,
let message = reader.read_exact(is_err as usize - 1).await?; stream: Some(reader.into_stream()),
let message = String::from_utf8(message.to_vec()).unwrap_or_default(); order_tag: None,
Ok(RespEnc::Error { code, message }) })
} else { }
let msg_len = reader.read_u32().await?; }
let msg = reader.read_exact(msg_len as usize).await?;
fn read_exact_error_to_error(e: ReadExactError) -> Error {
Ok(RespEnc::Success { match e {
msg, ReadExactError::Stream(err) => Error::Remote(err.kind(), err.to_string()),
stream: Some(reader.into_stream()), ReadExactError::UnexpectedEos => Error::Framing,
})
}
} }
} }

View file

@ -35,7 +35,10 @@ impl Sender {
impl Drop for Sender { impl Drop for Sender {
fn drop(&mut self) { fn drop(&mut self) {
if let Some(inner) = self.inner.take() { if let Some(inner) = self.inner.take() {
let _ = inner.send(Err(std::io::Error::new(std::io::ErrorKind::BrokenPipe, "Netapp connection dropped before end of stream"))); let _ = inner.send(Err(std::io::Error::new(
std::io::ErrorKind::BrokenPipe,
"Netapp connection dropped before end of stream",
)));
} }
} }
} }
@ -82,7 +85,8 @@ pub(crate) trait RecvLoop: Sync + 'static {
let packet = if is_error { let packet = if is_error {
let kind = u8_to_io_errorkind(next_slice[0]); let kind = u8_to_io_errorkind(next_slice[0]);
let msg = std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>"); let msg =
std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>");
debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg); debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg);
Some(Err(std::io::Error::new(kind, msg.to_string()))) Some(Err(std::io::Error::new(kind, msg.to_string())))
} else { } else {

View file

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_trait::async_trait; use async_trait::async_trait;
use bytes::{Bytes, BytesMut, BufMut}; use bytes::{BufMut, Bytes, BytesMut};
use log::*; use log::*;
use futures::AsyncWriteExt; use futures::AsyncWriteExt;
@ -36,6 +36,8 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF; pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF;
pub(crate) type SendStream = (RequestID, RequestPriority, ByteStream);
struct SendQueue { struct SendQueue {
items: Vec<(u8, VecDeque<SendQueueItem>)>, items: Vec<(u8, VecDeque<SendQueueItem>)>,
} }
@ -184,7 +186,7 @@ impl DataFrame {
pub(crate) trait SendLoop: Sync { pub(crate) trait SendLoop: Sync {
async fn send_loop<W>( async fn send_loop<W>(
self: Arc<Self>, self: Arc<Self>,
msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, ByteStream)>, msg_recv: mpsc::UnboundedReceiver<SendStream>,
mut write: BoxStreamWrite<W>, mut write: BoxStreamWrite<W>,
) -> Result<(), Error> ) -> Result<(), Error>
where where

View file

@ -53,7 +53,7 @@ pub(crate) struct ServerConn {
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
resp_send: ArcSwapOption<mpsc::UnboundedSender<(RequestID, RequestPriority, ByteStream)>>, resp_send: ArcSwapOption<mpsc::UnboundedSender<SendStream>>,
} }
impl ServerConn { impl ServerConn {
@ -177,26 +177,16 @@ impl RecvLoop for ServerConn {
tokio::spawn(async move { tokio::spawn(async move {
debug!("server: recv_handler got {}", id); debug!("server: recv_handler got {}", id);
let (prio, resp_enc) = match ReqEnc::decode(stream).await { let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
Ok(req_enc) => { Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
let prio = req_enc.prio; Err(e) => (PRIO_HIGH, Err(e)),
let resp = self2.recv_handler_aux(req_enc).await;
(
prio,
match resp {
Ok(resp_enc) => resp_enc,
Err(e) => RespEnc::from_err(e),
},
)
}
Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)),
}; };
debug!("server: sending response to {}", id); debug!("server: sending response to {}", id);
let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result);
resp_send resp_send
.send((id, prio, resp_enc.encode())) .send((id, prio, resp_stream))
.log_err("ServerConn recv_handler send resp bytes"); .log_err("ServerConn recv_handler send resp bytes");
Ok::<_, Error>(()) Ok::<_, Error>(())
}); });

View file

@ -150,7 +150,6 @@ impl<'a> Future for ByteStreamReadExact<'a> {
// ---- // ----
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream { pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
Box::pin(tokio_util::io::ReaderStream::new(reader)) Box::pin(tokio_util::io::ReaderStream::new(reader))
} }