From 9b41f4ff2016b099ee706747828f5828f8c96b1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 23 Feb 2024 11:46:57 +0100 Subject: [PATCH] [refactor-block] move read_stream_to_end to garage_net --- src/block/manager.rs | 22 ++++------------------ src/net/bytes_buf.rs | 13 +++++++++++++ src/net/stream.rs | 11 +++++++++++ 3 files changed, 28 insertions(+), 18 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 96ea2c96..54290888 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{mpsc, Mutex, MutexGuard}; @@ -18,7 +17,7 @@ use opentelemetry::{ Context, }; -use garage_net::stream::{stream_asyncread, ByteStream}; +use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream}; use garage_db as db; @@ -247,7 +246,8 @@ impl BlockManager { self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { read_stream_to_end(stream) .await - .map(|data| DataBlock::from_parts(header, data)) + .err_context("error in block data stream") + .map(|data| DataBlock::from_parts(header, data.into_bytes())) }) .await } @@ -477,7 +477,7 @@ impl BlockManager { stream: Option, ) -> Result<(), Error> { let stream = stream.ok_or_message("missing stream")?; - let bytes = read_stream_to_end(stream).await?; + let bytes = read_stream_to_end(stream).await?.into_bytes(); let data = DataBlock::from_parts(header, bytes); self.write_block(&hash, &data).await } @@ -823,20 +823,6 @@ impl BlockManagerLocked { } } -async fn read_stream_to_end(mut stream: ByteStream) -> Result { - let mut parts: Vec = vec![]; - while let Some(part) = stream.next().await { - parts.push(part.ok_or_message("error in stream")?); - } - - Ok(parts - .iter() - .map(|x| &x[..]) - .collect::>() - .concat() - .into()) -} - struct DeleteOnDrop(Option); impl DeleteOnDrop { diff --git a/src/net/bytes_buf.rs b/src/net/bytes_buf.rs index 3929a860..1d928ffb 100644 --- a/src/net/bytes_buf.rs +++ b/src/net/bytes_buf.rs @@ -3,6 +3,8 @@ use std::collections::VecDeque; use bytes::BytesMut; +use crate::stream::ByteStream; + pub use bytes::Bytes; /// A circular buffer of bytes, internally represented as a list of Bytes @@ -119,6 +121,17 @@ impl BytesBuf { pub fn into_slices(self) -> VecDeque { self.buf } + + /// Return the entire buffer concatenated into a single big Bytes + pub fn into_bytes(mut self) -> Bytes { + self.take_all() + } + + /// Return the content as a stream of individual chunks + pub fn into_stream(self) -> ByteStream { + use futures::stream::StreamExt; + Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x))) + } } impl Default for BytesBuf { diff --git a/src/net/stream.rs b/src/net/stream.rs index 88c3fed4..3ac6896d 100644 --- a/src/net/stream.rs +++ b/src/net/stream.rs @@ -200,3 +200,14 @@ pub fn asyncread_stream(reader: R) -> Byte pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { tokio_util::io::StreamReader::new(stream) } + +/// Reads all of the content of a `ByteStream` into a BytesBuf +/// that contains everything +pub async fn read_stream_to_end(mut stream: ByteStream) -> Result { + let mut buf = BytesBuf::new(); + while let Some(part) = stream.next().await { + buf.extend(part?); + } + + Ok(buf) +}