add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
2 changed files with 10 additions and 9 deletions
Showing only changes of commit 2305c2cf03 - Show all commits

View file

@ -1,5 +1,7 @@
use std::collections::VecDeque; use std::collections::VecDeque;
use bytes::BytesMut;
pub use bytes::Bytes; pub use bytes::Bytes;
/// A circular buffer of bytes, internally represented as a list of Bytes /// A circular buffer of bytes, internally represented as a list of Bytes
@ -48,13 +50,13 @@ impl BytesBuf {
self.buf_len = 0; self.buf_len = 0;
self.buf.pop_back().unwrap() self.buf.pop_back().unwrap()
} else { } else {
let mut ret = Vec::with_capacity(self.buf_len); let mut ret = BytesMut::with_capacity(self.buf_len);
for b in self.buf.iter() { for b in self.buf.iter() {
ret.extend(&b[..]); ret.extend_from_slice(&b[..]);
} }
self.buf.clear(); self.buf.clear();
self.buf_len = 0; self.buf_len = 0;
Bytes::from(ret) ret.freeze()
} }
} }
@ -88,23 +90,23 @@ impl BytesBuf {
self.buf_len -= len; self.buf_len -= len;
front front
} else { } else {
let mut ret = Vec::with_capacity(len); let mut ret = BytesMut::with_capacity(len);
ret.extend(&front[..]); ret.extend_from_slice(&front[..]);
self.buf_len -= front.len(); self.buf_len -= front.len();
while ret.len() < len { while ret.len() < len {
let front = self.buf.pop_front().unwrap(); let front = self.buf.pop_front().unwrap();
if front.len() > len - ret.len() { if front.len() > len - ret.len() {
let take = len - ret.len(); let take = len - ret.len();
ret.extend(front.slice(..take)); ret.extend_from_slice(&front[..take]);
self.buf.push_front(front.slice(take..)); self.buf.push_front(front.slice(take..));
self.buf_len -= take; self.buf_len -= take;
break; break;
} else { } else {
ret.extend(&front[..]); ret.extend_from_slice(&front[..]);
self.buf_len -= front.len(); self.buf_len -= front.len();
} }
} }
Bytes::from(ret) ret.freeze()
} }
} }

View file

@ -52,7 +52,6 @@ pub struct OrderTag(pub(crate) u64, pub(crate) u64);
#[derive(Clone, Copy)] #[derive(Clone, Copy)]
pub struct OrderTagStream(u64); pub struct OrderTagStream(u64);
impl OrderTag { impl OrderTag {
/// Create a new stream from which to generate order tags. Example: /// Create a new stream from which to generate order tags. Example:
/// ``` /// ```