add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
2 changed files with 42 additions and 45 deletions
Showing only changes of commit aa1b29d41a - Show all commits

View file

@ -110,8 +110,8 @@ where
/// Call this endpoint on a remote node (or on the local node,
/// for that matter). This function invokes the full version that
/// allows to attach a streaming body to the request and to
/// receive such a body attached to the response.
/// allows to attach a stream to the request and to
/// receive such a stream attached to the response.
pub async fn call_streaming<T>(
&self,
target: &NodeID,

View file

@ -49,30 +49,27 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
// ----
/// The Req<M> is a helper object used to create requests and attach them
/// a streaming body. If the body is a fixed Bytes and not a ByteStream,
/// a stream of data. If the stream is a fixed Bytes and not a ByteStream,
/// Req<M> is cheaply clonable to allow the request to be sent to different
/// peers (Clone will panic if the body is a ByteStream).
///
/// Internally, this is also used to encode and decode requests
/// from/to byte streams to be sent over the network.
/// peers (Clone will panic if the stream is a ByteStream).
pub struct Req<M: Message> {
pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: Arc<M>,
pub(crate) msg_ser: Option<Bytes>,
pub(crate) body: BodyData,
pub(crate) stream: AttachedStream,
}
impl<M: Message> Req<M> {
pub fn with_fixed_body(self, b: Bytes) -> Self {
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
body: BodyData::Fixed(b),
stream: AttachedStream::Fixed(b),
..self
}
}
pub fn with_streaming_body(self, b: ByteStream) -> Self {
pub fn with_stream(self, b: ByteStream) -> Self {
Self {
body: BodyData::Stream(b),
stream: AttachedStream::Stream(b),
..self
}
}
@ -82,7 +79,7 @@ impl<M: Message> Req<M> {
}
pub fn take_stream(&mut self) -> Option<ByteStream> {
std::mem::replace(&mut self.body, BodyData::None).into_stream()
std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
}
pub(crate) fn into_enc(
@ -96,7 +93,7 @@ impl<M: Message> Req<M> {
path,
telemetry_id,
msg: self.msg_ser.unwrap(),
stream: self.body.into_stream(),
stream: self.stream.into_stream(),
}
}
@ -106,7 +103,7 @@ impl<M: Message> Req<M> {
_phantom: Default::default(),
msg: Arc::new(msg),
msg_ser: Some(enc.msg),
body: enc.stream.map(BodyData::Stream).unwrap_or(BodyData::None),
stream: enc.stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None),
})
}
}
@ -123,7 +120,7 @@ impl<M: Message> IntoReq<M> for M {
_phantom: Default::default(),
msg: Arc::new(self),
msg_ser: Some(Bytes::from(msg_ser)),
body: BodyData::None,
stream: AttachedStream::None,
})
}
fn into_req_local(self) -> Req<M> {
@ -131,7 +128,7 @@ impl<M: Message> IntoReq<M> for M {
_phantom: Default::default(),
msg: Arc::new(self),
msg_ser: None,
body: BodyData::None,
stream: AttachedStream::None,
}
}
}
@ -147,16 +144,16 @@ impl<M: Message> IntoReq<M> for Req<M> {
impl<M: Message> Clone for Req<M> {
fn clone(&self) -> Self {
let body = match &self.body {
BodyData::None => BodyData::None,
BodyData::Fixed(b) => BodyData::Fixed(b.clone()),
BodyData::Stream(_) => panic!("Cannot clone a Req<_> with a stream body"),
let stream = match &self.stream {
AttachedStream::None => AttachedStream::None,
AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
AttachedStream::Stream(_) => panic!("Cannot clone a Req<_> with a non-buffer attached stream"),
};
Self {
_phantom: Default::default(),
msg: self.msg.clone(),
msg_ser: self.msg_ser.clone(),
body,
stream,
}
}
}
@ -167,10 +164,10 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Req[{:?}", self.msg)?;
match &self.body {
BodyData::None => write!(f, "]"),
BodyData::Fixed(b) => write!(f, "; body={}]", b.len()),
BodyData::Stream(_) => write!(f, "; body=stream]"),
match &self.stream {
AttachedStream::None => write!(f, "]"),
AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
AttachedStream::Stream(_) => write!(f, "; stream]"),
}
}
}
@ -178,11 +175,11 @@ where
// ----
/// The Resp<M> represents a full response from a RPC that may have
/// an attached body stream.
/// an attached stream.
pub struct Resp<M: Message> {
pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: M::Response,
pub(crate) body: BodyData,
pub(crate) stream: AttachedStream,
}
impl<M: Message> Resp<M> {
@ -190,20 +187,20 @@ impl<M: Message> Resp<M> {
Resp {
_phantom: Default::default(),
msg: v,
body: BodyData::None,
stream: AttachedStream::None,
}
}
pub fn with_fixed_body(self, b: Bytes) -> Self {
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
Self {
body: BodyData::Fixed(b),
stream: AttachedStream::Fixed(b),
..self
}
}
pub fn with_streaming_body(self, b: ByteStream) -> Self {
pub fn with_stream(self, b: ByteStream) -> Self {
Self {
body: BodyData::Stream(b),
stream: AttachedStream::Stream(b),
..self
}
}
@ -217,13 +214,13 @@ impl<M: Message> Resp<M> {
}
pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
(self.msg, self.body.into_stream())
(self.msg, self.stream.into_stream())
}
pub(crate) fn into_enc(self) -> Result<RespEnc, rmp_serde::encode::Error> {
Ok(RespEnc::Success {
msg: rmp_to_vec_all_named(&self.msg)?.into(),
stream: self.body.into_stream(),
stream: self.stream.into_stream(),
})
}
@ -234,7 +231,7 @@ impl<M: Message> Resp<M> {
Ok(Self {
_phantom: Default::default(),
msg,
body: stream.map(BodyData::Stream).unwrap_or(BodyData::None),
stream: stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None),
})
}
RespEnc::Error { code, message } => Err(Error::Remote(code, message)),
@ -249,28 +246,28 @@ where
{
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> {
write!(f, "Resp[{:?}", self.msg)?;
match &self.body {
BodyData::None => write!(f, "]"),
BodyData::Fixed(b) => write!(f, "; body={}]", b.len()),
BodyData::Stream(_) => write!(f, "; body=stream]"),
match &self.stream {
AttachedStream::None => write!(f, "]"),
AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()),
AttachedStream::Stream(_) => write!(f, "; stream]"),
}
}
}
// ----
pub(crate) enum BodyData {
pub(crate) enum AttachedStream {
None,
Fixed(Bytes),
Stream(ByteStream),
}
impl BodyData {
impl AttachedStream {
pub fn into_stream(self) -> Option<ByteStream> {
match self {
BodyData::None => None,
BodyData::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
BodyData::Stream(s) => Some(s),
AttachedStream::None => None,
AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))),
AttachedStream::Stream(s) => Some(s),
}
}
}