add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
3 changed files with 18 additions and 29 deletions
Showing only changes of commit 50358b944a - Show all commits

View file

@ -103,7 +103,10 @@ impl<M: Message> Req<M> {
_phantom: Default::default(), _phantom: Default::default(),
msg: Arc::new(msg), msg: Arc::new(msg),
msg_ser: Some(enc.msg), msg_ser: Some(enc.msg),
stream: enc.stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None), stream: enc
.stream
.map(AttachedStream::Stream)
.unwrap_or(AttachedStream::None),
}) })
} }
} }
@ -147,7 +150,9 @@ impl<M: Message> Clone for Req<M> {
let stream = match &self.stream { let stream = match &self.stream {
AttachedStream::None => AttachedStream::None, AttachedStream::None => AttachedStream::None,
AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()), AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()),
AttachedStream::Stream(_) => panic!("Cannot clone a Req<_> with a non-buffer attached stream"), AttachedStream::Stream(_) => {
panic!("Cannot clone a Req<_> with a non-buffer attached stream")
}
}; };
Self { Self {
_phantom: Default::default(), _phantom: Default::default(),
@ -231,7 +236,9 @@ impl<M: Message> Resp<M> {
Ok(Self { Ok(Self {
_phantom: Default::default(), _phantom: Default::default(),
msg, msg,
stream: stream.map(AttachedStream::Stream).unwrap_or(AttachedStream::None), stream: stream
.map(AttachedStream::Stream)
.unwrap_or(AttachedStream::None),
}) })
} }
RespEnc::Error { code, message } => Err(Error::Remote(code, message)), RespEnc::Error { code, message } => Err(Error::Remote(code, message)),
@ -293,7 +300,9 @@ pub(crate) struct ReqEnc {
impl ReqEnc { impl ReqEnc {
pub(crate) fn encode(self) -> ByteStream { pub(crate) fn encode(self) -> ByteStream {
let mut buf = BytesMut::with_capacity(64); let mut buf = BytesMut::with_capacity(
self.path.len() + self.telemetry_id.len() + self.msg.len() + 16,
);
buf.put_u8(self.prio); buf.put_u8(self.prio);
@ -375,7 +384,7 @@ impl RespEnc {
pub(crate) fn encode(self) -> ByteStream { pub(crate) fn encode(self) -> ByteStream {
match self { match self {
RespEnc::Success { msg, stream } => { RespEnc::Success { msg, stream } => {
let mut buf = BytesMut::with_capacity(64); let mut buf = BytesMut::with_capacity(msg.len() + 8);
buf.put_u8(0); buf.put_u8(0);
@ -391,7 +400,7 @@ impl RespEnc {
} }
} }
RespEnc::Error { code, message } => { RespEnc::Error { code, message } => {
let mut buf = BytesMut::with_capacity(64); let mut buf = BytesMut::with_capacity(message.len() + 8);
buf.put_u8(1 + message.len() as u8); buf.put_u8(1 + message.len() as u8);
buf.put_u8(code); buf.put_u8(code);
buf.put(message.as_bytes()); buf.put(message.as_bytes());

View file

@ -30,7 +30,7 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
struct SendQueue { struct SendQueue {
items: VecDeque<(u8, VecDeque<SendQueueItem>)>, items: Vec<(u8, VecDeque<SendQueueItem>)>,
} }
struct SendQueueItem { struct SendQueueItem {
@ -42,7 +42,7 @@ struct SendQueueItem {
impl SendQueue { impl SendQueue {
fn new() -> Self { fn new() -> Self {
Self { Self {
items: VecDeque::with_capacity(64), items: Vec::with_capacity(64),
} }
} }
fn push(&mut self, item: SendQueueItem) { fn push(&mut self, item: SendQueueItem) {
@ -56,20 +56,6 @@ impl SendQueue {
}; };
self.items[pos_prio].1.push_back(item); self.items[pos_prio].1.push_back(item);
} }
// used only in tests. They should probably be rewriten
#[allow(dead_code)]
fn pop(&mut self) -> Option<SendQueueItem> {
match self.items.pop_front() {
None => None,
Some((prio, mut items_at_prio)) => {
let ret = items_at_prio.pop_front();
if !items_at_prio.is_empty() {
self.items.push_front((prio, items_at_prio));
}
ret.or_else(|| self.pop())
}
}
}
fn is_empty(&self) -> bool { fn is_empty(&self) -> bool {
self.items.iter().all(|(_k, v)| v.is_empty()) self.items.iter().all(|(_k, v)| v.is_empty())
} }

View file

@ -83,13 +83,7 @@ impl ByteStreamReader {
} }
pub fn take_buffer(&mut self) -> Bytes { pub fn take_buffer(&mut self) -> Bytes {
let bytes = Bytes::from( let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::<Vec<_>>().concat());
self .buf
.iter()
.map(|x| &x[..])
.collect::<Vec<_>>()
.concat(),
);
self.buf.clear(); self.buf.clear();
self.buf_len = 0; self.buf_len = 0;
bytes bytes