Compare commits


No commits in common. "916c67ccf4c9d31c14088f2d775e15c64750458f" and "81cebdd12415381f67747e96591e83b1a4a8cc0b" have entirely different histories.

6 changed files with 153 additions and 135 deletions

View file

@@ -13,7 +13,7 @@ use http::header::{
 use hyper::{body::Body, Request, Response, StatusCode};
 use tokio::sync::mpsc;
-use garage_net::stream::ByteStream;
+use garage_block::manager::BlockStream;
 use garage_rpc::rpc_helper::OrderTag;
 use garage_table::EmptyKey;
 use garage_util::data::*;
@@ -286,7 +286,7 @@ pub async fn handle_get(
 			Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
 		}
 		ObjectVersionData::FirstBlock(_, first_block_hash) => {
-			let (tx, rx) = mpsc::channel::<ByteStream>(2);
+			let (tx, rx) = mpsc::channel::<BlockStream>(2);
 			let order_stream = OrderTag::stream();
 			let first_block_hash = *first_block_hash;
@@ -494,7 +494,7 @@ fn body_from_blocks_range(
 	}
 	let order_stream = OrderTag::stream();
-	let (tx, rx) = mpsc::channel::<ByteStream>(2);
+	let (tx, rx) = mpsc::channel::<BlockStream>(2);
 	tokio::spawn(async move {
 		match async {
@@ -542,7 +542,7 @@ fn body_from_blocks_range(
 				})
 				.filter_map(futures::future::ready);
-			let block_stream: ByteStream = Box::pin(block_stream);
+			let block_stream: BlockStream = Box::pin(block_stream);
 			tx.send(Box::pin(block_stream))
 				.await
 				.ok_or_message("channel closed")?;
@@ -562,7 +562,7 @@ fn body_from_blocks_range(
 	response_body_from_block_stream(rx)
 }
-fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
+fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
 	let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
 		.flatten()
 		.map(|x| {
@@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
 	ResBody::new(http_body_util::StreamBody::new(body_stream))
 }
-fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
+fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
 	let err = std::io::Error::new(
 		std::io::ErrorKind::Other,
 		format!("Error while getting object data: {}", e),

View file

@@ -2,98 +2,107 @@ use std::path::PathBuf;
 use bytes::Bytes;
 use serde::{Deserialize, Serialize};
-use zstd::stream::Encoder;
+use zstd::stream::{decode_all as zstd_decode, Encoder};
 use garage_util::data::*;
 use garage_util::error::*;
-use garage_net::stream::ByteStream;
 #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
 pub enum DataBlockHeader {
 	Plain,
 	Compressed,
 }
-#[derive(Debug)]
-pub struct DataBlockElem<T> {
-	header: DataBlockHeader,
-	elem: T,
-}
 /// A possibly compressed block of data
-pub type DataBlock = DataBlockElem<Bytes>;
-/// A path to a possibly compressed block of data
-pub type DataBlockPath = DataBlockElem<PathBuf>;
-/// A stream of possibly compressed block data
-pub type DataBlockStream = DataBlockElem<ByteStream>;
-impl DataBlockHeader {
-	pub fn is_compressed(&self) -> bool {
-		matches!(self, DataBlockHeader::Compressed)
-	}
+pub enum DataBlock {
+	/// Uncompressed data
+	Plain(Bytes),
+	/// Data compressed with zstd
+	Compressed(Bytes),
 }
-impl<T> DataBlockElem<T> {
-	pub fn from_parts(header: DataBlockHeader, elem: T) -> Self {
-		Self { header, elem }
-	}
-	pub fn plain(elem: T) -> Self {
-		Self {
-			header: DataBlockHeader::Plain,
-			elem,
-		}
-	}
-	pub fn compressed(elem: T) -> Self {
-		Self {
-			header: DataBlockHeader::Compressed,
-			elem,
-		}
-	}
-	pub fn into_parts(self) -> (DataBlockHeader, T) {
-		(self.header, self.elem)
-	}
-	pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) {
-		(self.header, &self.elem)
-	}
+#[derive(Debug)]
+pub enum DataBlockPath {
+	/// Uncompressed data fail
+	Plain(PathBuf),
+	/// Compressed data fail
+	Compressed(PathBuf),
 }
 impl DataBlock {
-	/// Verify data integrity. Does not return the buffer content.
+	/// Query whether this block is compressed
+	pub fn is_compressed(&self) -> bool {
+		matches!(self, DataBlock::Compressed(_))
+	}
+	/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
+	/// instead
+	pub fn inner_buffer(&self) -> &[u8] {
+		use DataBlock::*;
+		let (Plain(ref res) | Compressed(ref res)) = self;
+		res
+	}
+	/// Get the buffer, possibly decompressing it, and verify it's integrity.
+	/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
+	/// is used instead.
+	pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
+		match self {
+			DataBlock::Plain(data) => {
+				if blake2sum(&data) == hash {
+					Ok(data)
+				} else {
+					Err(Error::CorruptData(hash))
+				}
+			}
+			DataBlock::Compressed(data) => zstd_decode(&data[..])
+				.map_err(|_| Error::CorruptData(hash))
+				.map(Bytes::from),
+		}
+	}
+	/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
+	/// does not return the buffer content.
 	pub fn verify(&self, hash: Hash) -> Result<(), Error> {
-		match self.header {
-			DataBlockHeader::Plain => {
-				if blake2sum(&self.elem) == hash {
+		match self {
+			DataBlock::Plain(data) => {
+				if blake2sum(data) == hash {
 					Ok(())
 				} else {
 					Err(Error::CorruptData(hash))
 				}
 			}
-			DataBlockHeader::Compressed => {
-				zstd::stream::copy_decode(&self.elem[..], std::io::sink())
-					.map_err(|_| Error::CorruptData(hash))
-			}
+			DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
+				.map_err(|_| Error::CorruptData(hash)),
 		}
 	}
 	pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
 		tokio::task::spawn_blocking(move || {
 			if let Some(level) = level {
-				if let Ok(data_compressed) = zstd_encode(&data[..], level) {
-					return DataBlock::compressed(data_compressed.into());
+				if let Ok(data) = zstd_encode(&data[..], level) {
+					return DataBlock::Compressed(data.into());
 				}
 			}
-			DataBlock::plain(data.into())
+			DataBlock::Plain(data)
 		})
 		.await
 		.unwrap()
 	}
+	pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
+		match self {
+			DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
+			DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
+		}
+	}
+	pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
+		match h {
+			DataBlockHeader::Plain => DataBlock::Plain(bytes),
+			DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
+		}
+	}
 }
 fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
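
Taken together, the right-hand side's enum API round-trips like this: `from_buffer` tries zstd and falls back to `Plain`, `into_parts`/`from_parts` convert to and from the `(DataBlockHeader, Bytes)` wire form, and `verify_get` decompresses and checks integrity. A rough sketch of that round trip using the zstd crate's top-level `encode_all`/`decode_all` re-exports, with the enum reduced to its two variants; the payload and level are arbitrary, and the real `verify_get` additionally hash-checks plain data with `blake2sum`:

```rust
use bytes::Bytes;

// Stand-in for the DataBlock enum on the right-hand side of this diff.
enum DataBlock {
	Plain(Bytes),
	Compressed(Bytes),
}

fn main() -> std::io::Result<()> {
	let payload = Bytes::from_static(b"example payload, example payload, example payload");

	// The core of from_buffer: try zstd at the given level, fall back to Plain.
	let block = match zstd::encode_all(&payload[..], 3) {
		Ok(z) => DataBlock::Compressed(z.into()),
		Err(_) => DataBlock::Plain(payload.clone()),
	};

	// The core of verify_get's Compressed arm: decompress to recover the bytes
	// (the real code maps failures to Error::CorruptData(hash)).
	let roundtrip = match block {
		DataBlock::Plain(d) => d,
		DataBlock::Compressed(d) => Bytes::from(zstd::decode_all(&d[..])?),
	};
	assert_eq!(roundtrip, payload);
	Ok(())
}
```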

View file

@@ -1,4 +1,5 @@
 use std::path::PathBuf;
+use std::pin::Pin;
 use std::sync::Arc;
 use std::time::Duration;
@@ -8,6 +9,8 @@ use bytes::Bytes;
 use rand::prelude::*;
 use serde::{Deserialize, Serialize};
+use futures::Stream;
+use futures_util::stream::StreamExt;
 use tokio::fs;
 use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
 use tokio::sync::{mpsc, Mutex, MutexGuard};
@@ -17,7 +20,7 @@ use opentelemetry::{
 	Context,
 };
-use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
+use garage_net::stream::{stream_asyncread, ByteStream};
 use garage_db as db;
@@ -50,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072;
 // to delete the block locally.
 pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
+pub type BlockStream =
+	Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
 /// RPC messages used to share blocks of data between nodes
 #[derive(Debug, Serialize, Deserialize)]
 pub enum BlockRpc {
@@ -229,9 +235,11 @@ impl BlockManager {
 		&self,
 		hash: &Hash,
 		order_tag: Option<OrderTag>,
-	) -> Result<DataBlockStream, Error> {
-		self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
-			.await
+	) -> Result<(DataBlockHeader, ByteStream), Error> {
+		self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
+			Ok((header, stream))
+		})
+		.await
 	}
 	/// Ask nodes that might have a (possibly compressed) block for it
@@ -241,12 +249,10 @@ impl BlockManager {
 		hash: &Hash,
 		order_tag: Option<OrderTag>,
 	) -> Result<DataBlock, Error> {
-		self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
-			let (header, stream) = block_stream.into_parts();
+		self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
 			read_stream_to_end(stream)
 				.await
-				.err_context("error in block data stream")
-				.map(|data| DataBlock::from_parts(header, data.into_bytes()))
+				.map(|data| DataBlock::from_parts(header, data))
 		})
 		.await
 	}
@@ -258,7 +264,7 @@ impl BlockManager {
 		f: F,
 	) -> Result<T, Error>
 	where
-		F: Fn(DataBlockStream) -> Fut,
+		F: Fn(DataBlockHeader, ByteStream) -> Fut,
 		Fut: futures::Future<Output = Result<T, Error>>,
 	{
 		let who = self
@@ -282,8 +288,8 @@ impl BlockManager {
 					continue;
 				}
 			};
-			let block_stream = match res.into_parts() {
-				(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream),
+			let (header, stream) = match res.into_parts() {
+				(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
 				(Ok(_), _) => {
 					debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
 					continue;
@@ -293,7 +299,7 @@ impl BlockManager {
 					continue;
 				}
 			};
-			match f(block_stream).await {
+			match f(header, stream).await {
 				Ok(ret) => return Ok(ret),
 				Err(e) => {
 					debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
@@ -317,14 +323,14 @@ impl BlockManager {
 	// ---- Public interface ----
-	/// Ask nodes that might have a block for it, return it as a stream
+	/// Ask nodes that might have a block for it,
+	/// return it as a stream
 	pub async fn rpc_get_block_streaming(
 		&self,
 		hash: &Hash,
 		order_tag: Option<OrderTag>,
-	) -> Result<ByteStream, Error> {
-		let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
-		let (header, stream) = block_stream.into_parts();
+	) -> Result<BlockStream, Error> {
+		let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
 		match header {
 			DataBlockHeader::Plain => Ok(stream),
 			DataBlockHeader::Compressed => {
@@ -337,14 +343,15 @@ impl BlockManager {
 		}
 	}
-	/// Ask nodes that might have a block for it, return it as one big Bytes
+	/// Ask nodes that might have a block for it
 	pub async fn rpc_get_block(
 		&self,
 		hash: &Hash,
 		order_tag: Option<OrderTag>,
 	) -> Result<Bytes, Error> {
-		let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
-		Ok(read_stream_to_end(stream).await?.into_bytes())
+		self.rpc_get_raw_block(hash, order_tag)
+			.await?
+			.verify_get(*hash)
 	}
 	/// Send block to nodes that should have it
@@ -477,7 +484,7 @@ impl BlockManager {
 		stream: Option<ByteStream>,
 	) -> Result<(), Error> {
 		let stream = stream.ok_or_message("missing stream")?;
-		let bytes = read_stream_to_end(stream).await?.into_bytes();
+		let bytes = read_stream_to_end(stream).await?;
 		let data = DataBlock::from_parts(header, bytes);
 		self.write_block(&hash, &data).await
 	}
@@ -548,7 +555,10 @@ impl BlockManager {
 		hash: &Hash,
 		block_path: &DataBlockPath,
 	) -> Result<DataBlock, Error> {
-		let (header, path) = block_path.as_parts_ref();
+		let (path, compressed) = match block_path {
+			DataBlockPath::Plain(p) => (p, false),
+			DataBlockPath::Compressed(p) => (p, true),
+		};
 		let mut f = fs::File::open(&path).await?;
 		let mut data = vec![];
@@ -556,7 +566,11 @@ impl BlockManager {
 		self.metrics.bytes_read.add(data.len() as u64);
 		drop(f);
-		let data = DataBlock::from_parts(header, data.into());
+		let data = if compressed {
+			DataBlock::Compressed(data.into())
+		} else {
+			DataBlock::Plain(data.into())
+		};
 		if data.verify(*hash).is_err() {
 			self.metrics.corruption_counter.add(1);
@@ -609,20 +623,20 @@ impl BlockManager {
 			// first and then a compressed one (as compression may have been
 			// previously enabled).
 			if fs::metadata(&path).await.is_ok() {
-				return Some(DataBlockPath::plain(path));
+				return Some(DataBlockPath::Plain(path));
 			}
 			path.set_extension("zst");
 			if fs::metadata(&path).await.is_ok() {
-				return Some(DataBlockPath::compressed(path));
+				return Some(DataBlockPath::Compressed(path));
 			}
 		} else {
 			path.set_extension("zst");
 			if fs::metadata(&path).await.is_ok() {
-				return Some(DataBlockPath::compressed(path));
+				return Some(DataBlockPath::Compressed(path));
 			}
 			path.set_extension("");
 			if fs::metadata(&path).await.is_ok() {
-				return Some(DataBlockPath::plain(path));
+				return Some(DataBlockPath::Plain(path));
 			}
 		}
 	}
@@ -692,8 +706,8 @@ impl BlockManagerLocked {
 		mgr: &BlockManager,
 		existing_path: Option<DataBlockPath>,
 	) -> Result<(), Error> {
-		let (header, data) = data.as_parts_ref();
-		let compressed = header.is_compressed();
+		let compressed = data.is_compressed();
+		let data = data.inner_buffer();
 		let directory = mgr.data_layout.load().primary_block_dir(hash);
@@ -703,25 +717,24 @@ impl BlockManagerLocked {
 			tgt_path.set_extension("zst");
 		}
-		let existing_info = existing_path.map(|x| x.into_parts());
-		let to_delete = match (existing_info, compressed) {
+		let to_delete = match (existing_path, compressed) {
 			// If the block is stored in the wrong directory,
 			// write it again at the correct path and delete the old path
-			(Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p),
-			(Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p),
+			(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
+			(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
 			// If the block is already stored not compressed but we have a compressed
 			// copy, write the compressed copy and delete the uncompressed one
-			(Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path),
+			(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
 			// If the block is already stored compressed,
 			// keep the stored copy, we have nothing to do
-			(Some((DataBlockHeader::Compressed, _)), _) => return Ok(()),
+			(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
 			// If the block is already stored not compressed,
 			// and we don't have a compressed copy either,
 			// keep the stored copy, we have nothing to do
-			(Some((DataBlockHeader::Plain, _)), false) => return Ok(()),
+			(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
 			// If the block isn't stored already, just store what is given to us
 			(None, _) => None,
@@ -773,14 +786,18 @@ impl BlockManagerLocked {
 	}
 	async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
-		let (header, path) = block_path.as_parts_ref();
-		let mut path2 = path.clone();
-		if header.is_compressed() {
-			path2.set_extension("zst.corrupted");
-		} else {
-			path2.set_extension("corrupted");
-		}
+		let (path, path2) = match block_path {
+			DataBlockPath::Plain(p) => {
+				let mut p2 = p.clone();
+				p2.set_extension("corrupted");
+				(p, p2)
+			}
+			DataBlockPath::Compressed(p) => {
+				let mut p2 = p.clone();
+				p2.set_extension("zst.corrupted");
+				(p, p2)
+			}
+		};
 		fs::rename(path, path2).await?;
 		Ok(())
@@ -790,7 +807,9 @@ impl BlockManagerLocked {
 		let rc = mgr.rc.get_block_rc(hash)?;
 		if rc.is_deletable() {
 			while let Some(path) = mgr.find_block(hash).await {
-				let (_header, path) = path.as_parts_ref();
+				let path = match path {
+					DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
+				};
 				fs::remove_file(path).await?;
 				mgr.metrics.delete_counter.add(1);
 			}
@@ -807,10 +826,24 @@ impl BlockManagerLocked {
 		let data = mgr.read_block_from(hash, &wrong_path).await?;
 		self.write_block_inner(hash, &data, mgr, Some(wrong_path))
 			.await?;
-		Ok(data.as_parts_ref().1.len())
+		Ok(data.inner_buffer().len())
 	}
 }
+async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
+	let mut parts: Vec<Bytes> = vec![];
+	while let Some(part) = stream.next().await {
+		parts.push(part.ok_or_message("error in stream")?);
+	}
+	Ok(parts
+		.iter()
+		.map(|x| &x[..])
+		.collect::<Vec<_>>()
+		.concat()
+		.into())
+}
 struct DeleteOnDrop(Option<PathBuf>);
 impl DeleteOnDrop {
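
The private `read_stream_to_end` added at the end of this file replaces the helper removed from `garage_net::stream` (last file of this compare), returning plain `Bytes` instead of a `BytesBuf`. A standalone sketch of the same drain-and-concatenate pattern over a generic chunk stream; `collect_chunks` and the test values are hypothetical stand-ins:

```rust
use bytes::Bytes;
use futures_util::stream::{self, Stream, StreamExt};

// Drain a stream of byte chunks and concatenate them into one contiguous
// Bytes, mirroring the shape of the private read_stream_to_end above.
async fn collect_chunks<S>(mut s: S) -> Result<Bytes, std::io::Error>
where
	S: Stream<Item = Result<Bytes, std::io::Error>> + Unpin,
{
	let mut parts: Vec<Bytes> = vec![];
	while let Some(part) = s.next().await {
		parts.push(part?);
	}
	// Join the collected slices into a single buffer.
	Ok(parts.iter().map(|x| &x[..]).collect::<Vec<_>>().concat().into())
}

#[tokio::main]
async fn main() -> Result<(), std::io::Error> {
	let s = stream::iter(vec![
		Ok(Bytes::from_static(b"foo")),
		Ok(Bytes::from_static(b"bar")),
	]);
	assert_eq!(&collect_chunks(s).await?[..], b"foobar");
	Ok(())
}
```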

View file

@@ -584,8 +584,8 @@ impl Worker for RebalanceWorker {
 			let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
 			if path.ancestors().all(|x| x != prim_loc) {
 				let block_path = match path.extension() {
-					None => DataBlockPath::plain(path.clone()),
-					Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()),
+					None => DataBlockPath::Plain(path.clone()),
+					Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
 					_ => {
 						warn!("not rebalancing file: {}", path.to_string_lossy());
 						return Ok(WorkerState::Busy);
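
The match above classifies an on-disk file by extension: no extension means a plain block, `.zst` a compressed one, and anything else is skipped. A small sketch of that classification in isolation, with `DataBlockPath` reduced to the two variants from this compare and `classify` a hypothetical helper:

```rust
use std::path::{Path, PathBuf};

// DataBlockPath reduced to the two variants used in this compare.
#[derive(Debug)]
enum DataBlockPath {
	Plain(PathBuf),
	Compressed(PathBuf),
}

// Mirror of the rebalance worker's match: bare file names are plain
// blocks, ".zst" marks a compressed one, anything else is not a block.
fn classify(path: &Path) -> Option<DataBlockPath> {
	match path.extension() {
		None => Some(DataBlockPath::Plain(path.to_path_buf())),
		Some(x) if x.to_str() == Some("zst") => {
			Some(DataBlockPath::Compressed(path.to_path_buf()))
		}
		_ => None,
	}
}

fn main() {
	assert!(matches!(
		classify(Path::new("data/00/ff/00ff00ff")),
		Some(DataBlockPath::Plain(_))
	));
	assert!(matches!(
		classify(Path::new("data/00/ff/00ff00ff.zst")),
		Some(DataBlockPath::Compressed(_))
	));
	assert!(classify(Path::new("data/00/ff/00ff00ff.tmp")).is_none());
}
```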

View file

@@ -3,8 +3,6 @@ use std::collections::VecDeque;
 use bytes::BytesMut;
-use crate::stream::ByteStream;
 pub use bytes::Bytes;
 /// A circular buffer of bytes, internally represented as a list of Bytes
@@ -121,17 +119,6 @@ impl BytesBuf {
 	pub fn into_slices(self) -> VecDeque<Bytes> {
 		self.buf
 	}
-	/// Return the entire buffer concatenated into a single big Bytes
-	pub fn into_bytes(mut self) -> Bytes {
-		self.take_all()
-	}
-	/// Return the content as a stream of individual chunks
-	pub fn into_stream(self) -> ByteStream {
-		use futures::stream::StreamExt;
-		Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x)))
-	}
 }
 impl Default for BytesBuf {

View file

@@ -200,14 +200,3 @@ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
 pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
 	tokio_util::io::StreamReader::new(stream)
 }
-/// Reads all of the content of a `ByteStream` into a BytesBuf
-/// that contains everything
-pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> {
-	let mut buf = BytesBuf::new();
-	while let Some(part) = stream.next().await {
-		buf.extend(part?);
-	}
-	Ok(buf)
-}