add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
Showing only changes of commit 2c9d595da0 - Show all commits

View file

@ -53,7 +53,6 @@ pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
/// Req<M> is cheaply clonable to allow the request to be sent to different /// Req<M> is cheaply clonable to allow the request to be sent to different
/// peers (Clone will panic if the stream is a ByteStream). /// peers (Clone will panic if the stream is a ByteStream).
pub struct Req<M: Message> { pub struct Req<M: Message> {
pub(crate) _phantom: PhantomData<M>,
pub(crate) msg: Arc<M>, pub(crate) msg: Arc<M>,
pub(crate) msg_ser: Option<Bytes>, pub(crate) msg_ser: Option<Bytes>,
pub(crate) stream: AttachedStream, pub(crate) stream: AttachedStream,
@ -104,7 +103,6 @@ impl<M: Message> Req<M> {
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> { pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
let msg = rmp_serde::decode::from_read_ref(&enc.msg)?; let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
Ok(Req { Ok(Req {
_phantom: Default::default(),
msg: Arc::new(msg), msg: Arc::new(msg),
msg_ser: Some(enc.msg), msg_ser: Some(enc.msg),
stream: enc stream: enc
@ -124,7 +122,6 @@ impl<M: Message> IntoReq<M> for M {
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> { fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error> {
let msg_ser = rmp_to_vec_all_named(&self)?; let msg_ser = rmp_to_vec_all_named(&self)?;
Ok(Req { Ok(Req {
_phantom: Default::default(),
msg: Arc::new(self), msg: Arc::new(self),
msg_ser: Some(Bytes::from(msg_ser)), msg_ser: Some(Bytes::from(msg_ser)),
stream: AttachedStream::None, stream: AttachedStream::None,
@ -132,7 +129,6 @@ impl<M: Message> IntoReq<M> for M {
} }
fn into_req_local(self) -> Req<M> { fn into_req_local(self) -> Req<M> {
Req { Req {
_phantom: Default::default(),
msg: Arc::new(self), msg: Arc::new(self),
msg_ser: None, msg_ser: None,
stream: AttachedStream::None, stream: AttachedStream::None,
@ -159,7 +155,6 @@ impl<M: Message> Clone for Req<M> {
} }
}; };
Self { Self {
_phantom: Default::default(),
msg: self.msg.clone(), msg: self.msg.clone(),
msg_ser: self.msg_ser.clone(), msg_ser: self.msg_ser.clone(),
stream, stream,
@ -331,7 +326,7 @@ impl ReqEnc {
Self::decode_aux(stream).await.map_err(|_| Error::Framing) Self::decode_aux(stream).await.map_err(|_| Error::Framing)
} }
pub(crate) async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> { async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
let mut reader = ByteStreamReader::new(stream); let mut reader = ByteStreamReader::new(stream);
let prio = reader.read_u8().await?; let prio = reader.read_u8().await?;
@ -415,7 +410,7 @@ impl RespEnc {
Self::decode_aux(stream).await.map_err(|_| Error::Framing) Self::decode_aux(stream).await.map_err(|_| Error::Framing)
} }
pub(crate) async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> { async fn decode_aux(stream: ByteStream) -> Result<Self, ReadExactError> {
let mut reader = ByteStreamReader::new(stream); let mut reader = ByteStreamReader::new(stream);
let is_err = reader.read_u8().await?; let is_err = reader.read_u8().await?;