2024-02-13 12:55:41 +01:00
|
|
|
use std::cmp::Ordering;
|
|
|
|
use std::collections::VecDeque;
|
|
|
|
|
|
|
|
use bytes::BytesMut;
|
|
|
|
|
2024-02-23 11:46:57 +01:00
|
|
|
use crate::stream::ByteStream;
|
|
|
|
|
2024-02-13 12:55:41 +01:00
|
|
|
pub use bytes::Bytes;
|
|
|
|
|
|
|
|
/// A circular buffer of bytes, internally represented as a list of Bytes
|
|
|
|
/// for optimization, but that for all intent and purposes acts just like
|
|
|
|
/// a big byte slice which can be extended on the right and from which
|
|
|
|
/// stuff can be taken on the left.
|
|
|
|
pub struct BytesBuf {
|
|
|
|
buf: VecDeque<Bytes>,
|
|
|
|
buf_len: usize,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl BytesBuf {
|
|
|
|
/// Creates a new empty BytesBuf
|
|
|
|
pub fn new() -> Self {
|
|
|
|
Self {
|
|
|
|
buf: VecDeque::new(),
|
|
|
|
buf_len: 0,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns the number of bytes stored in the BytesBuf
|
|
|
|
#[inline]
|
|
|
|
pub fn len(&self) -> usize {
|
|
|
|
self.buf_len
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Returns true iff the BytesBuf contains zero bytes
|
|
|
|
#[inline]
|
|
|
|
pub fn is_empty(&self) -> bool {
|
|
|
|
self.buf_len == 0
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Adds some bytes to the right of the buffer
|
|
|
|
pub fn extend(&mut self, b: Bytes) {
|
|
|
|
if !b.is_empty() {
|
|
|
|
self.buf_len += b.len();
|
|
|
|
self.buf.push_back(b);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Takes the whole content of the buffer and returns it as a single Bytes unit
|
|
|
|
pub fn take_all(&mut self) -> Bytes {
|
|
|
|
if self.buf.is_empty() {
|
|
|
|
Bytes::new()
|
|
|
|
} else if self.buf.len() == 1 {
|
|
|
|
self.buf_len = 0;
|
|
|
|
self.buf.pop_back().unwrap()
|
|
|
|
} else {
|
|
|
|
let mut ret = BytesMut::with_capacity(self.buf_len);
|
|
|
|
for b in self.buf.iter() {
|
|
|
|
ret.extend_from_slice(&b[..]);
|
|
|
|
}
|
|
|
|
self.buf.clear();
|
|
|
|
self.buf_len = 0;
|
|
|
|
ret.freeze()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Takes at most max_len bytes from the left of the buffer
|
|
|
|
pub fn take_max(&mut self, max_len: usize) -> Bytes {
|
|
|
|
if self.buf_len <= max_len {
|
|
|
|
self.take_all()
|
|
|
|
} else {
|
|
|
|
self.take_exact_ok(max_len)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Take exactly len bytes from the left of the buffer, returns None if
|
|
|
|
/// the BytesBuf doesn't contain enough data
|
|
|
|
pub fn take_exact(&mut self, len: usize) -> Option<Bytes> {
|
|
|
|
if self.buf_len < len {
|
|
|
|
None
|
|
|
|
} else {
|
|
|
|
Some(self.take_exact_ok(len))
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
fn take_exact_ok(&mut self, len: usize) -> Bytes {
|
|
|
|
assert!(len <= self.buf_len);
|
|
|
|
let front = self.buf.pop_front().unwrap();
|
|
|
|
match front.len().cmp(&len) {
|
|
|
|
Ordering::Greater => {
|
|
|
|
self.buf.push_front(front.slice(len..));
|
|
|
|
self.buf_len -= len;
|
|
|
|
front.slice(..len)
|
|
|
|
}
|
|
|
|
Ordering::Equal => {
|
|
|
|
self.buf_len -= len;
|
|
|
|
front
|
|
|
|
}
|
|
|
|
Ordering::Less => {
|
|
|
|
let mut ret = BytesMut::with_capacity(len);
|
|
|
|
ret.extend_from_slice(&front[..]);
|
|
|
|
self.buf_len -= front.len();
|
|
|
|
while ret.len() < len {
|
|
|
|
let front = self.buf.pop_front().unwrap();
|
|
|
|
if front.len() > len - ret.len() {
|
|
|
|
let take = len - ret.len();
|
|
|
|
ret.extend_from_slice(&front[..take]);
|
|
|
|
self.buf.push_front(front.slice(take..));
|
|
|
|
self.buf_len -= take;
|
|
|
|
break;
|
|
|
|
} else {
|
|
|
|
ret.extend_from_slice(&front[..]);
|
|
|
|
self.buf_len -= front.len();
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ret.freeze()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Return the internal sequence of Bytes slices that make up the buffer
|
|
|
|
pub fn into_slices(self) -> VecDeque<Bytes> {
|
|
|
|
self.buf
|
|
|
|
}
|
2024-02-23 11:46:57 +01:00
|
|
|
|
|
|
|
/// Return the entire buffer concatenated into a single big Bytes
|
|
|
|
pub fn into_bytes(mut self) -> Bytes {
|
|
|
|
self.take_all()
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Return the content as a stream of individual chunks
|
|
|
|
pub fn into_stream(self) -> ByteStream {
|
|
|
|
use futures::stream::StreamExt;
|
|
|
|
Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x)))
|
|
|
|
}
|
2024-02-13 12:55:41 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
impl Default for BytesBuf {
|
|
|
|
fn default() -> Self {
|
|
|
|
Self::new()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<Bytes> for BytesBuf {
|
|
|
|
fn from(b: Bytes) -> BytesBuf {
|
|
|
|
let mut ret = BytesBuf::new();
|
|
|
|
ret.extend(b);
|
|
|
|
ret
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl From<BytesBuf> for Bytes {
|
|
|
|
fn from(mut b: BytesBuf) -> Bytes {
|
|
|
|
b.take_all()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
#[cfg(test)]
|
|
|
|
mod test {
|
|
|
|
use super::*;
|
|
|
|
|
|
|
|
#[test]
|
|
|
|
fn test_bytes_buf() {
|
|
|
|
let mut buf = BytesBuf::new();
|
|
|
|
assert!(buf.len() == 0);
|
|
|
|
assert!(buf.is_empty());
|
|
|
|
|
|
|
|
buf.extend(Bytes::from(b"Hello, world!".to_vec()));
|
|
|
|
assert!(buf.len() == 13);
|
|
|
|
assert!(!buf.is_empty());
|
|
|
|
|
|
|
|
buf.extend(Bytes::from(b"1234567890".to_vec()));
|
|
|
|
assert!(buf.len() == 23);
|
|
|
|
assert!(!buf.is_empty());
|
|
|
|
|
|
|
|
assert_eq!(
|
|
|
|
buf.take_all(),
|
|
|
|
Bytes::from(b"Hello, world!1234567890".to_vec())
|
|
|
|
);
|
|
|
|
assert!(buf.len() == 0);
|
|
|
|
assert!(buf.is_empty());
|
|
|
|
|
|
|
|
buf.extend(Bytes::from(b"1234567890".to_vec()));
|
|
|
|
buf.extend(Bytes::from(b"Hello, world!".to_vec()));
|
|
|
|
assert!(buf.len() == 23);
|
|
|
|
assert!(!buf.is_empty());
|
|
|
|
|
|
|
|
assert_eq!(buf.take_max(12), Bytes::from(b"1234567890He".to_vec()));
|
|
|
|
assert!(buf.len() == 11);
|
|
|
|
|
|
|
|
assert_eq!(buf.take_exact(12), None);
|
|
|
|
assert!(buf.len() == 11);
|
|
|
|
assert_eq!(
|
|
|
|
buf.take_exact(11),
|
|
|
|
Some(Bytes::from(b"llo, world!".to_vec()))
|
|
|
|
);
|
|
|
|
assert!(buf.len() == 0);
|
|
|
|
assert!(buf.is_empty());
|
|
|
|
}
|
|
|
|
}
|