[refactor-block] add DataBlockStream type
All checks were successful
ci/woodpecker/pr/debug Pipeline was successful
ci/woodpecker/push/debug Pipeline was successful

This commit is contained in:
Alex 2024-02-23 12:22:29 +01:00
parent cd1069c1d4
commit e9c42bca34
Signed by: lx
GPG key ID: 0E496D15096376BE
2 changed files with 18 additions and 14 deletions

View file

@ -7,6 +7,8 @@ use zstd::stream::Encoder;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_net::stream::ByteStream;
#[derive(Debug, Serialize, Deserialize, Copy, Clone)] #[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub enum DataBlockHeader { pub enum DataBlockHeader {
Plain, Plain,
@ -25,6 +27,9 @@ pub type DataBlock = DataBlockElem<Bytes>;
/// A path to a possibly compressed block of data /// A path to a possibly compressed block of data
pub type DataBlockPath = DataBlockElem<PathBuf>; pub type DataBlockPath = DataBlockElem<PathBuf>;
/// A stream of possibly compressed block data
pub type DataBlockStream = DataBlockElem<ByteStream>;
impl DataBlockHeader { impl DataBlockHeader {
pub fn is_compressed(&self) -> bool { pub fn is_compressed(&self) -> bool {
matches!(self, DataBlockHeader::Compressed) matches!(self, DataBlockHeader::Compressed)

View file

@ -229,11 +229,9 @@ impl BlockManager {
&self, &self,
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> { ) -> Result<DataBlockStream, Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
Ok((header, stream)) .await
})
.await
} }
/// Ask nodes that might have a (possibly compressed) block for it /// Ask nodes that might have a (possibly compressed) block for it
@ -243,7 +241,8 @@ impl BlockManager {
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> { ) -> Result<DataBlock, Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream) read_stream_to_end(stream)
.await .await
.err_context("error in block data stream") .err_context("error in block data stream")
@ -259,7 +258,7 @@ impl BlockManager {
f: F, f: F,
) -> Result<T, Error> ) -> Result<T, Error>
where where
F: Fn(DataBlockHeader, ByteStream) -> Fut, F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>, Fut: futures::Future<Output = Result<T, Error>>,
{ {
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
@ -281,8 +280,8 @@ impl BlockManager {
continue; continue;
} }
}; };
let (header, stream) = match res.into_parts() { let block_stream = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream),
(Ok(_), _) => { (Ok(_), _) => {
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
continue; continue;
@ -292,7 +291,7 @@ impl BlockManager {
continue; continue;
} }
}; };
match f(header, stream).await { match f(block_stream).await {
Ok(ret) => return Ok(ret), Ok(ret) => return Ok(ret),
Err(e) => { Err(e) => {
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
@ -316,14 +315,14 @@ impl BlockManager {
// ---- Public interface ---- // ---- Public interface ----
/// Ask nodes that might have a block for it, /// Ask nodes that might have a block for it, return it as a stream
/// return it as a stream
pub async fn rpc_get_block_streaming( pub async fn rpc_get_block_streaming(
&self, &self,
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> { ) -> Result<ByteStream, Error> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
let (header, stream) = block_stream.into_parts();
match header { match header {
DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Plain => Ok(stream),
DataBlockHeader::Compressed => { DataBlockHeader::Compressed => {
@ -336,7 +335,7 @@ impl BlockManager {
} }
} }
/// Ask nodes that might have a block for it /// Ask nodes that might have a block for it, return it as one big Bytes
pub async fn rpc_get_block( pub async fn rpc_get_block(
&self, &self,
hash: &Hash, hash: &Hash,