diff --git a/src/bytes_buf.rs b/src/bytes_buf.rs new file mode 100644 index 0000000..46c7039 --- /dev/null +++ b/src/bytes_buf.rs @@ -0,0 +1,167 @@ +use std::collections::VecDeque; + +pub use bytes::Bytes; + +/// A circular buffer of bytes, internally represented as a list of Bytes +/// for optimization, but that for all intent and purposes acts just like +/// a big byte slice which can be extended on the right and from which +/// one can take on the left. +pub struct BytesBuf { + buf: VecDeque, + buf_len: usize, +} + +impl BytesBuf { + /// Creates a new empty BytesBuf + pub fn new() -> Self { + Self { + buf: VecDeque::new(), + buf_len: 0, + } + } + + /// Returns the number of bytes stored in the BytesBuf + #[inline] + pub fn len(&self) -> usize { + self.buf_len + } + + /// Returns true iff the BytesBuf contains zero bytes + #[inline] + pub fn is_empty(&self) -> bool { + self.buf_len == 0 + } + + /// Adds some bytes to the right of the buffer + pub fn extend(&mut self, b: Bytes) { + if !b.is_empty() { + self.buf_len += b.len(); + self.buf.push_back(b); + } + } + + /// Takes the whole content of the buffer and returns it as a single Bytes unit + pub fn take_all(&mut self) -> Bytes { + if self.buf.len() == 0 { + Bytes::new() + } else if self.buf.len() == 1 { + self.buf_len = 0; + self.buf.pop_back().unwrap() + } else { + let mut ret = Vec::with_capacity(self.buf_len); + for b in self.buf.iter() { + ret.extend(&b[..]); + } + self.buf.clear(); + self.buf_len = 0; + Bytes::from(ret) + } + } + + /// Takes at most max_len bytes from the left of the buffer + pub fn take_max(&mut self, max_len: usize) -> Bytes { + if self.buf_len <= max_len { + self.take_all() + } else { + self.take_exact_ok(max_len) + } + } + + /// Take exactly len bytes from the left of the buffer, returns None if + /// the BytesBuf doesn't contain enough data + pub fn take_exact(&mut self, len: usize) -> Option { + if self.buf_len < len { + None + } else { + Some(self.take_exact_ok(len)) + } + } + + fn take_exact_ok(&mut self, len: usize) -> Bytes { + assert!(len <= self.buf_len); + let front = self.buf.pop_front().unwrap(); + if front.len() > len { + self.buf.push_front(front.slice(len..)); + self.buf_len -= len; + front.slice(..len) + } else if front.len() == len { + self.buf_len -= len; + front + } else { + let mut ret = Vec::with_capacity(len); + ret.extend(&front[..]); + self.buf_len -= front.len(); + while ret.len() < len { + let front = self.buf.pop_front().unwrap(); + if front.len() > len - ret.len() { + let take = len - ret.len(); + ret.extend(front.slice(..take)); + self.buf.push_front(front.slice(take..)); + self.buf_len -= take; + break; + } else { + ret.extend(&front[..]); + self.buf_len -= front.len(); + } + } + Bytes::from(ret) + } + } + + /// Return the internal sequence of Bytes slices that make up the buffer + pub fn into_slices(self) -> VecDeque { + self.buf + } +} + +impl From for BytesBuf { + fn from(b: Bytes) -> BytesBuf { + let mut ret = BytesBuf::new(); + ret.extend(b); + ret + } +} + +impl From for Bytes { + fn from(mut b: BytesBuf) -> Bytes { + b.take_all() + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn test_bytes_buf() { + let mut buf = BytesBuf::new(); + assert!(buf.len() == 0); + assert!(buf.is_empty()); + + buf.extend(Bytes::from(b"Hello, world!".to_vec())); + assert!(buf.len() == 13); + assert!(!buf.is_empty()); + + buf.extend(Bytes::from(b"1234567890".to_vec())); + assert!(buf.len() == 23); + assert!(!buf.is_empty()); + + assert_eq!(buf.take_all(), Bytes::from(b"Hello, world!1234567890".to_vec())); + assert!(buf.len() == 0); + assert!(buf.is_empty()); + + buf.extend(Bytes::from(b"1234567890".to_vec())); + buf.extend(Bytes::from(b"Hello, world!".to_vec())); + assert!(buf.len() == 23); + assert!(!buf.is_empty()); + + assert_eq!(buf.take_max(12), Bytes::from(b"1234567890He".to_vec())); + assert!(buf.len() == 11); + + assert_eq!(buf.take_exact(12), None); + assert!(buf.len() == 11); + assert_eq!(buf.take_exact(11), Some(Bytes::from(b"llo, world!".to_vec()))); + assert!(buf.len() == 0); + assert!(buf.is_empty()); + } +} diff --git a/src/lib.rs b/src/lib.rs index bd41048..18091c8 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -16,6 +16,7 @@ pub mod error; pub mod stream; pub mod util; +pub mod bytes_buf; pub mod endpoint; pub mod message; diff --git a/src/stream.rs b/src/stream.rs index 5ba2ed4..cc664ce 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -1,4 +1,3 @@ -use std::collections::VecDeque; use std::pin::Pin; use std::task::{Context, Poll}; @@ -8,6 +7,8 @@ use futures::Future; use futures::{Stream, StreamExt, TryStreamExt}; use tokio::io::AsyncRead; +use crate::bytes_buf::BytesBuf; + /// A stream of associated data. /// /// When sent through Netapp, the Vec may be split in smaller chunk in such a way @@ -23,8 +24,7 @@ pub type Packet = Result; pub struct ByteStreamReader { stream: ByteStream, - buf: VecDeque, - buf_len: usize, + buf: BytesBuf, eos: bool, err: Option, } @@ -33,8 +33,7 @@ impl ByteStreamReader { pub fn new(stream: ByteStream) -> Self { ByteStreamReader { stream, - buf: VecDeque::with_capacity(8), - buf_len: 0, + buf: BytesBuf::new(), eos: false, err: None, } @@ -75,7 +74,7 @@ impl ByteStreamReader { } pub fn into_stream(self) -> ByteStream { - let buf_stream = futures::stream::iter(self.buf.into_iter().map(Ok)); + let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok)); if let Some(err) = self.err { Box::pin(buf_stream.chain(futures::stream::once(async move { Err(err) }))) } else if self.eos { @@ -86,45 +85,15 @@ impl ByteStreamReader { } pub fn take_buffer(&mut self) -> Bytes { - let bytes = Bytes::from(self.buf.iter().map(|x| &x[..]).collect::>().concat()); - self.buf.clear(); - self.buf_len = 0; - bytes + self.buf.take_all() } pub fn eos(&self) -> bool { - self.buf_len == 0 && self.eos + self.buf.is_empty() && self.eos } fn try_get(&mut self, read_len: usize) -> Option { - if self.buf_len >= read_len { - let mut slices = Vec::with_capacity(self.buf.len()); - let mut taken = 0; - while taken < read_len { - let front = self.buf.pop_front().unwrap(); - if taken + front.len() <= read_len { - taken += front.len(); - self.buf_len -= front.len(); - slices.push(front); - } else { - let front_take = read_len - taken; - slices.push(front.slice(..front_take)); - self.buf.push_front(front.slice(front_take..)); - self.buf_len -= front_take; - break; - } - } - Some( - slices - .iter() - .map(|x| &x[..]) - .collect::>() - .concat() - .into(), - ) - } else { - None - } + self.buf.take_exact(read_len) } } @@ -164,10 +133,7 @@ impl<'a> Future for ByteStreamReadExact<'a> { match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { Some(Ok(slice)) => { - if !slice.is_empty() { - this.reader.buf_len += slice.len(); - this.reader.buf.push_back(slice); - } + this.reader.buf.extend(slice); } Some(Err(e)) => { this.reader.err = Some(e);