add streaming body to requests and responses #3
1 changed files with 4 additions and 7 deletions
|
@ -313,14 +313,13 @@ impl ReqEnc {
|
||||||
buf.put(&self.telemetry_id[..]);
|
buf.put(&self.telemetry_id[..]);
|
||||||
|
|
||||||
buf.put_u32(self.msg.len() as u32);
|
buf.put_u32(self.msg.len() as u32);
|
||||||
buf.put(&self.msg[..]);
|
|
||||||
|
|
||||||
let header = buf.freeze();
|
let header = buf.freeze();
|
||||||
|
|
||||||
if let Some(stream) = self.stream {
|
if let Some(stream) = self.stream {
|
||||||
Box::pin(futures::stream::once(async move { Ok(header) }).chain(stream))
|
Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream))
|
||||||
} else {
|
} else {
|
||||||
Box::pin(futures::stream::once(async move { Ok(header) }))
|
Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -387,16 +386,14 @@ impl RespEnc {
|
||||||
let mut buf = BytesMut::with_capacity(msg.len() + 8);
|
let mut buf = BytesMut::with_capacity(msg.len() + 8);
|
||||||
|
|
||||||
buf.put_u8(0);
|
buf.put_u8(0);
|
||||||
|
|
||||||
buf.put_u32(msg.len() as u32);
|
buf.put_u32(msg.len() as u32);
|
||||||
buf.put(&msg[..]);
|
|
||||||
|
|
||||||
let header = buf.freeze();
|
let header = buf.freeze();
|
||||||
|
|
||||||
if let Some(stream) = stream {
|
if let Some(stream) = stream {
|
||||||
Box::pin(futures::stream::once(async move { Ok(header) }).chain(stream))
|
Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream))
|
||||||
} else {
|
} else {
|
||||||
Box::pin(futures::stream::once(async move { Ok(header) }))
|
Box::pin(futures::stream::iter([Ok(header), Ok(msg)]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RespEnc::Error { code, message } => {
|
RespEnc::Error { code, message } => {
|
||||||
|
|
Loading…
Reference in a new issue