diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 53f0a345..efb8d4ab 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -13,7 +13,7 @@ use http::header::{ use hyper::{body::Body, Request, Response, StatusCode}; use tokio::sync::mpsc; -use garage_block::manager::BlockStream; +use garage_net::stream::ByteStream; use garage_rpc::rpc_helper::OrderTag; use garage_table::EmptyKey; use garage_util::data::*; @@ -286,7 +286,7 @@ pub async fn handle_get( Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) } ObjectVersionData::FirstBlock(_, first_block_hash) => { - let (tx, rx) = mpsc::channel::(2); + let (tx, rx) = mpsc::channel::(2); let order_stream = OrderTag::stream(); let first_block_hash = *first_block_hash; @@ -494,7 +494,7 @@ fn body_from_blocks_range( } let order_stream = OrderTag::stream(); - let (tx, rx) = mpsc::channel::(2); + let (tx, rx) = mpsc::channel::(2); tokio::spawn(async move { match async { @@ -542,7 +542,7 @@ fn body_from_blocks_range( }) .filter_map(futures::future::ready); - let block_stream: BlockStream = Box::pin(block_stream); + let block_stream: ByteStream = Box::pin(block_stream); tx.send(Box::pin(block_stream)) .await .ok_or_message("channel closed")?; @@ -562,7 +562,7 @@ fn body_from_blocks_range( response_body_from_block_stream(rx) } -fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { +fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) .flatten() .map(|x| { @@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver) -> ResBody { ResBody::new(http_body_util::StreamBody::new(body_stream)) } -fn error_stream_item(e: E) -> BlockStream { +fn error_stream_item(e: E) -> ByteStream { let err = std::io::Error::new( std::io::ErrorKind::Other, format!("Error while getting object data: {}", e), diff --git a/src/block/block.rs b/src/block/block.rs index 20f57aa5..504d11f8 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -2,107 +2,98 @@ use std::path::PathBuf; use bytes::Bytes; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; +use zstd::stream::Encoder; use garage_util::data::*; use garage_util::error::*; +use garage_net::stream::ByteStream; + #[derive(Debug, Serialize, Deserialize, Copy, Clone)] pub enum DataBlockHeader { Plain, Compressed, } -/// A possibly compressed block of data -pub enum DataBlock { - /// Uncompressed data - Plain(Bytes), - /// Data compressed with zstd - Compressed(Bytes), -} - #[derive(Debug)] -pub enum DataBlockPath { - /// Uncompressed data fail - Plain(PathBuf), - /// Compressed data fail - Compressed(PathBuf), +pub struct DataBlockElem { + header: DataBlockHeader, + elem: T, } -impl DataBlock { - /// Query whether this block is compressed +/// A possibly compressed block of data +pub type DataBlock = DataBlockElem; + +/// A path to a possibly compressed block of data +pub type DataBlockPath = DataBlockElem; + +/// A stream of possibly compressed block data +pub type DataBlockStream = DataBlockElem; + +impl DataBlockHeader { pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) + matches!(self, DataBlockHeader::Compressed) + } +} + +impl DataBlockElem { + pub fn from_parts(header: DataBlockHeader, elem: T) -> Self { + Self { header, elem } } - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res - } - - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd_decode(&data[..]) - .map_err(|_| Error::CorruptData(hash)) - .map(Bytes::from), + pub fn plain(elem: T) -> Self { + Self { + header: DataBlockHeader::Plain, + elem, } } - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. + pub fn compressed(elem: T) -> Self { + Self { + header: DataBlockHeader::Compressed, + elem, + } + } + + pub fn into_parts(self) -> (DataBlockHeader, T) { + (self.header, self.elem) + } + + pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { + (self.header, &self.elem) + } +} + +impl DataBlock { + /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { + match self.header { + DataBlockHeader::Plain => { + if blake2sum(&self.elem) == hash { Ok(()) } else { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), + DataBlockHeader::Compressed => { + zstd::stream::copy_decode(&self.elem[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)) + } } } pub async fn from_buffer(data: Bytes, level: Option) -> DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data.into()); + if let Ok(data_compressed) = zstd_encode(&data[..], level) { + return DataBlock::compressed(data_compressed.into()); } } - DataBlock::Plain(data) + DataBlock::plain(data.into()) }) .await .unwrap() } - - pub fn into_parts(self) -> (DataBlockHeader, Bytes) { - match self { - DataBlock::Plain(data) => (DataBlockHeader::Plain, data), - DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), - } - } - - pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { - match h { - DataBlockHeader::Plain => DataBlock::Plain(bytes), - DataBlockHeader::Compressed => DataBlock::Compressed(bytes), - } - } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { diff --git a/src/block/manager.rs b/src/block/manager.rs index 848d9141..817866f6 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,5 +1,4 @@ use std::path::PathBuf; -use std::pin::Pin; use std::sync::Arc; use std::time::Duration; @@ -9,8 +8,6 @@ use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; -use futures::Stream; -use futures_util::stream::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; use tokio::sync::{mpsc, Mutex, MutexGuard}; @@ -20,7 +17,7 @@ use opentelemetry::{ Context, }; -use garage_net::stream::{stream_asyncread, ByteStream}; +use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream}; use garage_db as db; @@ -53,9 +50,6 @@ pub const INLINE_THRESHOLD: usize = 3072; // to delete the block locally. pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); -pub type BlockStream = - Pin> + Send + Sync + 'static>>; - /// RPC messages used to share blocks of data between nodes #[derive(Debug, Serialize, Deserialize)] pub enum BlockRpc { @@ -235,11 +229,9 @@ impl BlockManager { &self, hash: &Hash, order_tag: Option, - ) -> Result<(DataBlockHeader, ByteStream), Error> { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { - Ok((header, stream)) - }) - .await + ) -> Result { + self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -249,10 +241,12 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move { + self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await - .map(|data| DataBlock::from_parts(header, data)) + .err_context("error in block data stream") + .map(|data| DataBlock::from_parts(header, data.into_bytes())) }) .await } @@ -264,7 +258,7 @@ impl BlockManager { f: F, ) -> Result where - F: Fn(DataBlockHeader, ByteStream) -> Fut, + F: Fn(DataBlockStream) -> Fut, Fut: futures::Future>, { let who = self.replication.read_nodes(hash); @@ -286,8 +280,8 @@ impl BlockManager { continue; } }; - let (header, stream) = match res.into_parts() { - (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream), + let block_stream = match res.into_parts() { + (Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream), (Ok(_), _) => { debug!("Get block {:?}: node {:?} returned a malformed response", hash, node); continue; @@ -297,7 +291,7 @@ impl BlockManager { continue; } }; - match f(header, stream).await { + match f(block_stream).await { Ok(ret) => return Ok(ret), Err(e) => { debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e); @@ -321,14 +315,14 @@ impl BlockManager { // ---- Public interface ---- - /// Ask nodes that might have a block for it, - /// return it as a stream + /// Ask nodes that might have a block for it, return it as a stream pub async fn rpc_get_block_streaming( &self, hash: &Hash, order_tag: Option, - ) -> Result { - let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + ) -> Result { + let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Compressed => { @@ -341,15 +335,14 @@ impl BlockManager { } } - /// Ask nodes that might have a block for it + /// Ask nodes that might have a block for it, return it as one big Bytes pub async fn rpc_get_block( &self, hash: &Hash, order_tag: Option, ) -> Result { - self.rpc_get_raw_block(hash, order_tag) - .await? - .verify_get(*hash) + let stream = self.rpc_get_block_streaming(hash, order_tag).await?; + Ok(read_stream_to_end(stream).await?.into_bytes()) } /// Send block to nodes that should have it @@ -482,7 +475,7 @@ impl BlockManager { stream: Option, ) -> Result<(), Error> { let stream = stream.ok_or_message("missing stream")?; - let bytes = read_stream_to_end(stream).await?; + let bytes = read_stream_to_end(stream).await?.into_bytes(); let data = DataBlock::from_parts(header, bytes); self.write_block(&hash, &data).await } @@ -553,10 +546,7 @@ impl BlockManager { hash: &Hash, block_path: &DataBlockPath, ) -> Result { - let (path, compressed) = match block_path { - DataBlockPath::Plain(p) => (p, false), - DataBlockPath::Compressed(p) => (p, true), - }; + let (header, path) = block_path.as_parts_ref(); let mut f = fs::File::open(&path).await?; let mut data = vec![]; @@ -564,11 +554,7 @@ impl BlockManager { self.metrics.bytes_read.add(data.len() as u64); drop(f); - let data = if compressed { - DataBlock::Compressed(data.into()) - } else { - DataBlock::Plain(data.into()) - }; + let data = DataBlock::from_parts(header, data.into()); if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); @@ -621,20 +607,20 @@ impl BlockManager { // first and then a compressed one (as compression may have been // previously enabled). if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } } else { path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } path.set_extension(""); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } } } @@ -704,8 +690,8 @@ impl BlockManagerLocked { mgr: &BlockManager, existing_path: Option, ) -> Result<(), Error> { - let compressed = data.is_compressed(); - let data = data.inner_buffer(); + let (header, data) = data.as_parts_ref(); + let compressed = header.is_compressed(); let directory = mgr.data_layout.load().primary_block_dir(hash); @@ -715,24 +701,25 @@ impl BlockManagerLocked { tgt_path.set_extension("zst"); } - let to_delete = match (existing_path, compressed) { + let existing_info = existing_path.map(|x| x.into_parts()); + let to_delete = match (existing_info, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path - (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), - (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p), // If the block is already stored not compressed but we have a compressed // copy, write the compressed copy and delete the uncompressed one - (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + (Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path), // If the block is already stored compressed, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + (Some((DataBlockHeader::Compressed, _)), _) => return Ok(()), // If the block is already stored not compressed, // and we don't have a compressed copy either, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + (Some((DataBlockHeader::Plain, _)), false) => return Ok(()), // If the block isn't stored already, just store what is given to us (None, _) => None, @@ -784,18 +771,14 @@ impl BlockManagerLocked { } async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { - let (path, path2) = match block_path { - DataBlockPath::Plain(p) => { - let mut p2 = p.clone(); - p2.set_extension("corrupted"); - (p, p2) - } - DataBlockPath::Compressed(p) => { - let mut p2 = p.clone(); - p2.set_extension("zst.corrupted"); - (p, p2) - } - }; + let (header, path) = block_path.as_parts_ref(); + + let mut path2 = path.clone(); + if header.is_compressed() { + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) @@ -805,9 +788,7 @@ impl BlockManagerLocked { let rc = mgr.rc.get_block_rc(hash)?; if rc.is_deletable() { while let Some(path) = mgr.find_block(hash).await { - let path = match path { - DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, - }; + let (_header, path) = path.as_parts_ref(); fs::remove_file(path).await?; mgr.metrics.delete_counter.add(1); } @@ -824,24 +805,10 @@ impl BlockManagerLocked { let data = mgr.read_block_from(hash, &wrong_path).await?; self.write_block_inner(hash, &data, mgr, Some(wrong_path)) .await?; - Ok(data.inner_buffer().len()) + Ok(data.as_parts_ref().1.len()) } } -async fn read_stream_to_end(mut stream: ByteStream) -> Result { - let mut parts: Vec = vec![]; - while let Some(part) = stream.next().await { - parts.push(part.ok_or_message("error in stream")?); - } - - Ok(parts - .iter() - .map(|x| &x[..]) - .collect::>() - .concat() - .into()) -} - struct DeleteOnDrop(Option); impl DeleteOnDrop { diff --git a/src/block/repair.rs b/src/block/repair.rs index 77ee0d14..2c8acbc9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -584,8 +584,8 @@ impl Worker for RebalanceWorker { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); if path.ancestors().all(|x| x != prim_loc) { let block_path = match path.extension() { - None => DataBlockPath::Plain(path.clone()), - Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()), + None => DataBlockPath::plain(path.clone()), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()), _ => { warn!("not rebalancing file: {}", path.to_string_lossy()); return Ok(WorkerState::Busy); diff --git a/src/net/bytes_buf.rs b/src/net/bytes_buf.rs index 3929a860..1d928ffb 100644 --- a/src/net/bytes_buf.rs +++ b/src/net/bytes_buf.rs @@ -3,6 +3,8 @@ use std::collections::VecDeque; use bytes::BytesMut; +use crate::stream::ByteStream; + pub use bytes::Bytes; /// A circular buffer of bytes, internally represented as a list of Bytes @@ -119,6 +121,17 @@ impl BytesBuf { pub fn into_slices(self) -> VecDeque { self.buf } + + /// Return the entire buffer concatenated into a single big Bytes + pub fn into_bytes(mut self) -> Bytes { + self.take_all() + } + + /// Return the content as a stream of individual chunks + pub fn into_stream(self) -> ByteStream { + use futures::stream::StreamExt; + Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x))) + } } impl Default for BytesBuf { diff --git a/src/net/stream.rs b/src/net/stream.rs index 88c3fed4..3ac6896d 100644 --- a/src/net/stream.rs +++ b/src/net/stream.rs @@ -200,3 +200,14 @@ pub fn asyncread_stream(reader: R) -> Byte pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { tokio_util::io::StreamReader::new(stream) } + +/// Reads all of the content of a `ByteStream` into a BytesBuf +/// that contains everything +pub async fn read_stream_to_end(mut stream: ByteStream) -> Result { + let mut buf = BytesBuf::new(); + while let Some(part) = stream.next().await { + buf.extend(part?); + } + + Ok(buf) +}