From d611054b5f5cdec379f6cbb15ee6258e8da41356 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 14 Dec 2021 18:20:11 +0100 Subject: [PATCH] add compressed data block --- Cargo.lock | 42 ++++++++++ src/model/Cargo.toml | 1 + src/model/block.rs | 183 ++++++++++++++++++++++++++++++++++--------- 3 files changed, 191 insertions(+), 35 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8f81c098..c07acac2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -106,6 +106,9 @@ name = "cc" version = "1.0.71" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -458,6 +461,7 @@ dependencies = [ "serde_bytes", "sled", "tokio", + "zstd", ] [[package]] @@ -752,6 +756,15 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" +[[package]] +name = "jobserver" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa" +dependencies = [ + "libc", +] + [[package]] name = "kuska-handshake" version = "0.2.0" @@ -1652,3 +1665,32 @@ name = "xxhash-rust" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded" + +[[package]] +name = "zstd" +version = "0.9.0+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "4.1.1+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.6.1+zstd.1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33" +dependencies = [ + "cc", + "libc", +] diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 7979a79a..48b74991 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -23,6 +23,7 @@ arc-swap = "1.0" hex = "0.4" log = "0.4" rand = "0.8" +zstd = "0.9" sled = "0.34" diff --git a/src/model/block.rs b/src/model/block.rs index 8b1919bb..97cfb6bf 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; @@ -55,22 +56,81 @@ pub enum BlockRpc { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), + PutBlock { + hash: Hash, + data: DataBlock, + }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, +pub enum DataBlock { + /// Uncompressed data + Plain(#[serde(with = "serde_bytes")] Vec), + /// Data compressed with zstd + Compressed(#[serde(with = "serde_bytes")] Vec), +} - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec, +impl DataBlock { + /// Query whether this block is compressed + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlock::Compressed(_)) + } + + /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] + /// instead + pub fn inner_buffer(&self) -> &[u8] { + use DataBlock::*; + let (Plain(ref res) | Compressed(ref res)) = self; + res + } + + /// Get the buffer, possibly decompressing it, and verify it's integrity. + /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system + /// is used instead. + pub fn verify_get(self, hash: Hash) -> Result, Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(&data) == hash { + Ok(data) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) + } + } + } + + /// Verify data integrity. Allocate less than [`verify_get`] and don't consume self, but + /// does not return the buffer content. + pub fn verify(&self, hash: Hash) -> Result<(), Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(&data) == hash { + Ok(()) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)), + } + } + + pub fn from_buffer(data: Vec, level: Option) -> DataBlock { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } + } + DataBlock::Plain(data) + } } impl Rpc for BlockRpc { @@ -138,10 +198,8 @@ impl BlockManager { block_manager } - // ---- Public interface ---- - - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + /// Ask nodes that might have a (possibly compressed) block for it + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(hash); let resps = self .system @@ -158,8 +216,8 @@ impl BlockManager { .await?; for resp in resps { - if let BlockRpc::PutBlock(msg) = resp { - return Ok(msg.data); + if let BlockRpc::PutBlock { data, .. } = resp { + return Ok(data); } } Err(Error::Message(format!( @@ -168,15 +226,23 @@ impl BlockManager { ))) } + // ---- Public interface ---- + + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + self.rpc_get_raw_block(hash).await?.verify_get(*hash) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); + let data = DataBlock::from_buffer(data, None); // TODO get compression level from somewhere self.system .rpc .try_call_many( &self.endpoint, &who[..], - BlockRpc::PutBlock(PutBlockMessage { hash, data }), + BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), @@ -306,7 +372,7 @@ impl BlockManager { // ---- Reading and writing blocks locally ---- /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result { self.mutation_lock .lock() .await @@ -316,21 +382,31 @@ impl BlockManager { /// Read block from disk, verifying it's integrity async fn read_block(&self, hash: &Hash) -> Result { - let path = self.block_path(hash); - - let mut f = match fs::File::open(&path).await { - Ok(f) => f, + let mut path = self.block_path(hash); + let compressed = match self.is_block_compressed(hash).await { + Ok(c) => c, Err(e) => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; return Err(Into::into(e)); } }; + if compressed { + path.set_extension("zst"); + } + let mut f = fs::File::open(&path).await?; + let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let data = if compressed { + DataBlock::Compressed(data) + } else { + DataBlock::Plain(data) + }; + + if data.verify(*hash).is_err() { self.mutation_lock .lock() .await @@ -340,7 +416,7 @@ impl BlockManager { return Err(Error::CorruptData(*hash)); } - Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(BlockRpc::PutBlock { hash: *hash, data }) } /// Check if this node should have a block, but don't actually have it @@ -362,13 +438,25 @@ impl BlockManager { path } - /// Utility: give the full path where a block should be found + /// Utility: give the full path where a block should be found, minus extension if block is + /// compressed fn block_path(&self, hash: &Hash) -> PathBuf { let mut path = self.block_dir(hash); path.push(hex::encode(hash.as_ref())); path } + /// Utility: check if block is stored compressed. Error if block is not stored + async fn is_block_compressed(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Ok(true); + } + path.set_extension(""); + fs::metadata(&path).await.map(|_| false).map_err(Into::into) + } + // ---- Resync loop ---- pub fn spawn_background_worker(self: Arc) { @@ -550,8 +638,8 @@ impl BlockManager { hash ); - let block_data = self.rpc_get_block(hash).await?; - self.write_block(hash, &block_data[..]).await?; + let block_data = self.rpc_get_raw_block(hash).await?; + self.write_block(hash, &block_data).await?; } Ok(()) @@ -631,7 +719,7 @@ impl EndpointHandler for BlockManager { _from: NodeID, ) -> Result { match message { - BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, BlockRpc::GetBlock(h) => self.read_block(h).await, BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())), @@ -650,9 +738,7 @@ impl BlockManagerLocked { hash: &Hash, mgr: &BlockManager, ) -> Result { - let path = mgr.block_path(hash); - - let exists = fs::metadata(&path).await.is_ok(); + let exists = mgr.is_block_compressed(hash).await.is_ok(); let needed = mgr.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) @@ -661,15 +747,24 @@ impl BlockManagerLocked { async fn write_block( &self, hash: &Hash, - data: &[u8], + data: &DataBlock, mgr: &BlockManager, ) -> Result { + let compressed = data.is_compressed(); + let data = data.inner_buffer(); + let mut path = mgr.block_dir(hash); fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(BlockRpc::Ok); + match mgr.is_block_compressed(hash).await { + Ok(true) => return Ok(BlockRpc::Ok), + Ok(false) if !compressed => return Ok(BlockRpc::Ok), + _ => { + if compressed { + path.set_extension("zst"); + } + } } let mut path2 = path.clone(); @@ -688,9 +783,15 @@ impl BlockManagerLocked { "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); - let path = mgr.block_path(hash); + let mut path = mgr.block_path(hash); let mut path2 = path.clone(); - path2.set_extension("corrupted"); + if mgr.is_block_compressed(hash).await? { + // block marked as corrupted, and absent? That should not happen + path.set_extension("zst"); + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) } @@ -699,7 +800,10 @@ impl BlockManagerLocked { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; if exists && needed.is_deletable() { - let path = mgr.block_path(hash); + let mut path = mgr.block_path(hash); + if mgr.is_block_compressed(hash).await? { + path.set_extension("zst"); + } fs::remove_file(path).await?; } Ok(()) @@ -806,3 +910,12 @@ impl RcEntry { !self.is_deletable() } } + +fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { + let mut result = Vec::::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +}