[refactor-block] move read_stream_to_end to garage_net

This commit is contained in:
Alex 2024-02-23 11:46:57 +01:00
parent 93552b9275
commit 9b41f4ff20
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
3 changed files with 28 additions and 18 deletions

View file

@ -8,7 +8,6 @@ use bytes::Bytes;
use rand::prelude::*; use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use futures_util::stream::StreamExt;
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex, MutexGuard}; use tokio::sync::{mpsc, Mutex, MutexGuard};
@ -18,7 +17,7 @@ use opentelemetry::{
Context, Context,
}; };
use garage_net::stream::{stream_asyncread, ByteStream}; use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
use garage_db as db; use garage_db as db;
@ -247,7 +246,8 @@ impl BlockManager {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
read_stream_to_end(stream) read_stream_to_end(stream)
.await .await
.map(|data| DataBlock::from_parts(header, data)) .err_context("error in block data stream")
.map(|data| DataBlock::from_parts(header, data.into_bytes()))
}) })
.await .await
} }
@ -477,7 +477,7 @@ impl BlockManager {
stream: Option<ByteStream>, stream: Option<ByteStream>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let stream = stream.ok_or_message("missing stream")?; let stream = stream.ok_or_message("missing stream")?;
let bytes = read_stream_to_end(stream).await?; let bytes = read_stream_to_end(stream).await?.into_bytes();
let data = DataBlock::from_parts(header, bytes); let data = DataBlock::from_parts(header, bytes);
self.write_block(&hash, &data).await self.write_block(&hash, &data).await
} }
@ -823,20 +823,6 @@ impl BlockManagerLocked {
} }
} }
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
let mut parts: Vec<Bytes> = vec![];
while let Some(part) = stream.next().await {
parts.push(part.ok_or_message("error in stream")?);
}
Ok(parts
.iter()
.map(|x| &x[..])
.collect::<Vec<_>>()
.concat()
.into())
}
struct DeleteOnDrop(Option<PathBuf>); struct DeleteOnDrop(Option<PathBuf>);
impl DeleteOnDrop { impl DeleteOnDrop {

View file

@ -3,6 +3,8 @@ use std::collections::VecDeque;
use bytes::BytesMut; use bytes::BytesMut;
use crate::stream::ByteStream;
pub use bytes::Bytes; pub use bytes::Bytes;
/// A circular buffer of bytes, internally represented as a list of Bytes /// A circular buffer of bytes, internally represented as a list of Bytes
@ -119,6 +121,17 @@ impl BytesBuf {
pub fn into_slices(self) -> VecDeque<Bytes> { pub fn into_slices(self) -> VecDeque<Bytes> {
self.buf self.buf
} }
/// Return the entire buffer concatenated into a single big Bytes
pub fn into_bytes(mut self) -> Bytes {
self.take_all()
}
/// Return the content as a stream of individual chunks
pub fn into_stream(self) -> ByteStream {
use futures::stream::StreamExt;
Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x)))
}
} }
impl Default for BytesBuf { impl Default for BytesBuf {

View file

@ -200,3 +200,14 @@ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> Byte
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
tokio_util::io::StreamReader::new(stream) tokio_util::io::StreamReader::new(stream)
} }
/// Reads all of the content of a `ByteStream` into a BytesBuf
/// that contains everything
pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> {
let mut buf = BytesBuf::new();
while let Some(part) = stream.next().await {
buf.extend(part?);
}
Ok(buf)
}