From de8e47435f2c08f86def1b26325c7cda46830a19 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 17 Mar 2021 20:37:31 +0100 Subject: [PATCH 1/7] compress blocs with zstd --- Cargo.lock | 42 ++++++++++++++++ src/model/Cargo.toml | 2 +- src/model/block.rs | 117 ++++++++++++++++++++++++++++++++++--------- src/util/config.rs | 4 ++ 4 files changed, 140 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index c97968ef..13b97d2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,9 @@ name = "cc" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -429,6 +432,7 @@ dependencies = [ "serde_bytes", "sled", "tokio", + "zstd", ] [[package]] @@ -779,6 +783,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.49" @@ -1769,3 +1782,32 @@ name = "xxhash-rust" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded" + +[[package]] +name = "zstd" +version = "0.6.1+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de55e77f798f205d8561b8fe2ef57abfb6e0ff2abe7fd3c089e119cdb5631a3" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "3.0.1+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1387cabcd938127b30ce78c4bf00b30387dddf704e3f0881dbc4ff62b5566f8c" +dependencies = [ + "libc", + "zstd-sys", 
+] + +[[package]] +name = "zstd-sys" +version = "1.4.20+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd5b733d7cf2d9447e2c3e76a5589b4f5e5ae065c22a2bc0b023cbc331b6c8e" +dependencies = [ + "cc", + "libc", +] diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 77084531..1b97e4f9 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -21,8 +21,8 @@ arc-swap = "1.0" hex = "0.4" log = "0.4" rand = "0.8" - sled = "0.34" +zstd = "0.6.1" rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/model/block.rs b/src/model/block.rs index 5f428fe1..272bd884 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode}; use garage_util::data::*; use garage_util::error::Error; @@ -43,6 +44,7 @@ pub enum Message { /// Message to send a block of data, either because requested, of for first delivery of new /// block PutBlock(PutBlockMessage), + PutCompressedBlock(PutBlockMessage), /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block @@ -134,7 +136,8 @@ impl BlockManager { async fn handle(self: Arc, msg: &Message) -> Result { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await, + Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -157,15 +160,23 @@ impl BlockManager { } /// Write a block to disk - async fn 
write_block(&self, hash: &Hash, data: &[u8]) -> Result { + pub async fn write_block( + &self, + hash: &Hash, + data: &[u8], + compressed: bool, + ) -> Result { + if self.is_block_compressed(hash).await.is_ok() { + return Ok(Message::Ok); + } let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + if compressed { + path.set_extension("zst"); } let mut f = fs::File::create(path).await?; @@ -176,35 +187,61 @@ impl BlockManager { } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result { - let path = self.block_path(hash); + pub async fn read_block(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); - let mut f = match fs::File::open(&path).await { - Ok(f) => f, - Err(e) => { + let mut data = vec![]; + let block = match self.is_block_compressed(hash).await { + Ok(false) => { + let f = fs::File::open(&path).await; + f.map(|f| (f, false)).map_err(Into::into) + } + Ok(true) => { + path.set_extension("zst"); + let f = fs::File::open(&path).await; + f.map(|f| (f, true)).map_err(Into::into) + } + Err(e) => Err(e), + }; + let (mut f, compressed) = match block { + Ok(ok) => ok, + e => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, Duration::from_millis(0))?; - return Err(Into::into(e)); + e? } }; - let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let sum = if compressed { + zstd_decode(&data[..]) + .ok() + .map(|decompressed| blake2sum(&decompressed[..])) + } else { + Some(blake2sum(&data[..])) + }; + if sum.is_none() || sum.unwrap() != *hash { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. 
Renaming to .corrupted and resyncing.", hash ); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + if compressed { + Ok(Message::PutCompressedBlock(PutBlockMessage { + hash: *hash, + data, + })) + } else { + Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + } } /// Check if this node should have a block, but don't actually have it @@ -215,14 +252,22 @@ impl BlockManager { .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { - let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); Ok(!exists) } else { Ok(false) } } + async fn is_block_compressed(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); + if fs::metadata(&path).await.is_ok() { + return Ok(false); + } + path.set_extension("zst"); + fs::metadata(&path).await.map(|_| true).map_err(Into::into) + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -323,7 +368,7 @@ impl BlockManager { let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); let needed = self .rc .get(hash.as_ref())? @@ -402,15 +447,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. 
- let block_data = self.rpc_get_block(&hash).await?; - self.write_block(hash, &block_data[..]).await?; + let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block_data[..], compressed).await?; } Ok(()) } - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec, bool), Error> { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -424,8 +468,10 @@ impl BlockManager { .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { - return Ok(msg.data); + match resp { + Message::PutBlock(msg) => return Ok((msg.data, false)), + Message::PutCompressedBlock(msg) => return Ok((msg.data, true)), + _ => {} } } Err(Error::Message(format!( @@ -434,13 +480,36 @@ impl BlockManager { ))) } + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + self.rpc_get_raw_block(hash) + .await + .and_then(|(data, compressed)| { + if compressed { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) + } else { + Ok(data) + } + }) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { + let garage = self.garage.load_full().unwrap(); + let compressed = zstd_encode(&data[..], garage.config.compression_level); + let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { + Message::PutCompressedBlock(PutBlockMessage { + hash, + data: compressed.unwrap(), + }) + } else { + Message::PutBlock(PutBlockMessage { hash, data }) + }; let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), + message, RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) diff --git a/src/util/config.rs b/src/util/config.rs index 
bb70467b..ab9cae6c 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -45,6 +45,10 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Zstd compression level used on data blocks + #[serde(default)] + pub compression_level: i32, + /// Configuration for RPC TLS pub rpc_tls: Option, From 01c623e391a08168fc0dbc7d55d2a60867be71a5 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 17:51:19 +0200 Subject: [PATCH 2/7] use BlockData instead of Vec+bool --- src/model/block.rs | 75 ++++++++++++++++++++++------------------------ 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 272bd884..dd869802 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -43,23 +43,18 @@ pub enum Message { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), - PutCompressedBlock(PutBlockMessage), + PutBlock { hash: Hash, data: BlockData }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, - - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec, +pub enum BlockData { + Plain(#[serde(with = "serde_bytes")] Vec), + Compressed(#[serde(with = "serde_bytes")] Vec), } impl RpcMessage for Message {} @@ -136,8 +131,7 @@ impl BlockManager { async fn handle(self: Arc, msg: &Message) -> Result { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await, - Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await, + Message::PutBlock { hash, data } => self.write_block(&hash, 
&data).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -160,12 +154,7 @@ impl BlockManager { } /// Write a block to disk - pub async fn write_block( - &self, - hash: &Hash, - data: &[u8], - compressed: bool, - ) -> Result { + pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result { if self.is_block_compressed(hash).await.is_ok() { return Ok(Message::Ok); } @@ -175,12 +164,16 @@ impl BlockManager { fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if compressed { - path.set_extension("zst"); - } + let buffer = match data { + BlockData::Plain(b) => b, + BlockData::Compressed(b) => { + path.set_extension("zst"); + b + } + }; let mut f = fs::File::create(path).await?; - f.write_all(data).await?; + f.write_all(&buffer).await?; drop(f); Ok(Message::Ok) @@ -235,12 +228,15 @@ impl BlockManager { } if compressed { - Ok(Message::PutCompressedBlock(PutBlockMessage { + Ok(Message::PutBlock { hash: *hash, - data, - })) + data: BlockData::Compressed(data), + }) } else { - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(Message::PutBlock { + hash: *hash, + data: BlockData::Plain(data), + }) } } @@ -447,14 +443,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. 
- let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?; - self.write_block(hash, &block_data[..], compressed).await?; + let block = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block).await?; } Ok(()) } - async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec, bool), Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -469,8 +465,7 @@ impl BlockManager { for resp in resps { match resp { - Message::PutBlock(msg) => return Ok((msg.data, false)), - Message::PutCompressedBlock(msg) => return Ok((msg.data, true)), + Message::PutBlock { data, .. } => return Ok(data), _ => {} } } @@ -484,11 +479,10 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { self.rpc_get_raw_block(hash) .await - .and_then(|(data, compressed)| { - if compressed { + .and_then(|data| match data { + BlockData::Plain(data) => Ok(data), + BlockData::Compressed(data) => { zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) - } else { - Ok(data) } }) } @@ -498,12 +492,15 @@ impl BlockManager { let garage = self.garage.load_full().unwrap(); let compressed = zstd_encode(&data[..], garage.config.compression_level); let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { - Message::PutCompressedBlock(PutBlockMessage { + Message::PutBlock { hash, - data: compressed.unwrap(), - }) + data: BlockData::Compressed(compressed.unwrap()), + } } else { - Message::PutBlock(PutBlockMessage { hash, data }) + Message::PutBlock { + hash, + data: BlockData::Plain(data), + } }; let who = self.replication.write_nodes(&hash); self.rpc_client From 9215d63e3afd1d19efaa72d16f5e0afbcb666857 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 19:28:47 +0200 Subject: [PATCH 3/7] add checksum to end of file instead of decompressing --- src/model/block.rs | 48 
++++++++++++++++++++++++++++++---------------- 1 file changed, 31 insertions(+), 17 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index dd869802..ec714fef 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -158,22 +158,30 @@ impl BlockManager { if self.is_block_compressed(hash).await.is_ok() { return Ok(Message::Ok); } - let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); - fs::create_dir_all(&path).await?; - path.push(hex::encode(hash)); - let buffer = match data { - BlockData::Plain(b) => b, + let (buffer, checksum) = match data { + BlockData::Plain(b) => (b, None), BlockData::Compressed(b) => { - path.set_extension("zst"); - b + let checksum = blake2sum(&b); + (b, Some(checksum)) } }; + let _lock = self.data_dir_lock.lock().await; + + fs::create_dir_all(&path).await?; + path.push(hex::encode(hash)); + if checksum.is_some() { + path.set_extension("zst.b2"); + } + let mut f = fs::File::create(path).await?; f.write_all(&buffer).await?; + if let Some(checksum) = checksum { + f.write_all(checksum.as_slice()).await?; + } drop(f); Ok(Message::Ok) @@ -190,7 +198,7 @@ impl BlockManager { f.map(|f| (f, false)).map_err(Into::into) } Ok(true) => { - path.set_extension("zst"); + path.set_extension("zst.b2"); let f = fs::File::open(&path).await; f.map(|f| (f, true)).map_err(Into::into) } @@ -207,14 +215,19 @@ impl BlockManager { f.read_to_end(&mut data).await?; drop(f); - let sum = if compressed { - zstd_decode(&data[..]) - .ok() - .map(|decompressed| blake2sum(&decompressed[..])) + let sum_ok = if compressed { + if data.len() >= 32 { + let data_len = data.len() - 32; + let checksum = data.split_off(data_len); + blake2sum(&data[..]).as_slice() == &checksum + } else { + // the file is too short to be valid + false + } } else { - Some(blake2sum(&data[..])) + blake2sum(&data[..]) == *hash }; - if sum.is_none() || sum.unwrap() != *hash { + if !sum_ok { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is 
corrupted. Renaming to .corrupted and resyncing.", @@ -257,11 +270,12 @@ impl BlockManager { async fn is_block_compressed(&self, hash: &Hash) -> Result { let mut path = self.block_path(hash); + path.set_extension("zst.b2"); if fs::metadata(&path).await.is_ok() { - return Ok(false); + return Ok(true); } - path.set_extension("zst"); - fs::metadata(&path).await.map(|_| true).map_err(Into::into) + path.set_extension(""); + fs::metadata(&path).await.map(|_| false).map_err(Into::into) } fn block_dir(&self, hash: &Hash) -> PathBuf { From e4b66a9e28fc3fd72dbb4d212f733c18d7b2c5a5 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 19:30:05 +0200 Subject: [PATCH 4/7] add testing on compressible data in smoke test --- script/test-smoke.sh | 20 ++++++++++++-------- 1 file changed, 12 insertions(+), 8 deletions(-) diff --git a/script/test-smoke.sh b/script/test-smoke.sh index a2ffcea1..05286c65 100755 --- a/script/test-smoke.sh +++ b/script/test-smoke.sh @@ -23,39 +23,43 @@ dd if=/dev/urandom of=/tmp/garage.1.rnd bs=1k count=2 # < INLINE_THRESHOLD = 307 dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5 dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10 +dd if=/dev/urandom bs=1k count=2 | base64 -w0 > /tmp/garage.1.b64 +dd if=/dev/urandom bs=1M count=5 | base64 -w0 > /tmp/garage.2.b64 +dd if=/dev/urandom bs=1M count=10 | base64 -w0 > /tmp/garage.3.b64 + echo "s3 api testing..." 
-for idx in $(seq 1 3); do +for idx in {1,2,3}.{rnd,b64}; do # AWS sends - awsgrg cp /tmp/garage.$idx.rnd s3://eprouvette/garage.$idx.aws + awsgrg cp /tmp/garage.$idx s3://eprouvette/garage.$idx.aws awsgrg ls s3://eprouvette awsgrg cp s3://eprouvette/garage.$idx.aws /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl s3grg get s3://eprouvette/garage.$idx.aws /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl awsgrg rm s3://eprouvette/garage.$idx.aws # S3CMD sends - s3grg put /tmp/garage.$idx.rnd s3://eprouvette/garage.$idx.s3cmd + s3grg put /tmp/garage.$idx s3://eprouvette/garage.$idx.s3cmd s3grg ls s3://eprouvette s3grg get s3://eprouvette/garage.$idx.s3cmd /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl awsgrg cp s3://eprouvette/garage.$idx.s3cmd /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl s3grg rm s3://eprouvette/garage.$idx.s3cmd done -rm /tmp/garage.{1,2,3}.rnd +rm /tmp/garage.{1,2,3}.{rnd,b64} echo "website testing" echo "
<html>
hello world
</html>
" > /tmp/garage-index.html From 4d5263ccf36518e1efec3a37ea5bc285a83fe8d2 Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 20:50:03 +0200 Subject: [PATCH 5/7] change extention for compressed files set_extension don't behave verywell with extensions containing multiple dots --- src/model/block.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index ec714fef..0308c611 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -174,7 +174,7 @@ impl BlockManager { fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); if checksum.is_some() { - path.set_extension("zst.b2"); + path.set_extension("zst_b2"); } let mut f = fs::File::create(path).await?; @@ -198,7 +198,7 @@ impl BlockManager { f.map(|f| (f, false)).map_err(Into::into) } Ok(true) => { - path.set_extension("zst.b2"); + path.set_extension("zst_b2"); let f = fs::File::open(&path).await; f.map(|f| (f, true)).map_err(Into::into) } @@ -270,7 +270,7 @@ impl BlockManager { async fn is_block_compressed(&self, hash: &Hash) -> Result { let mut path = self.block_path(hash); - path.set_extension("zst.b2"); + path.set_extension("zst_b2"); if fs::metadata(&path).await.is_ok() { return Ok(true); } From aeadb071c68ff4309aa35bc19482724c04c4f67e Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Tue, 6 Apr 2021 22:25:58 +0200 Subject: [PATCH 6/7] delete plain block when getting a compressed one --- src/model/block.rs | 25 +++++++++++++++++++++---- 1 file changed, 21 insertions(+), 4 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 0308c611..090ffbc2 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -57,6 +57,15 @@ pub enum BlockData { Compressed(#[serde(with = "serde_bytes")] Vec), } +impl BlockData { + pub fn is_compressed(&self) -> bool { + match self { + BlockData::Plain(_) => false, + BlockData::Compressed(_) => true, + } + } +} + impl RpcMessage for Message {} /// The block manager, handling 
block exchange between nodes, and block storage on local node @@ -155,9 +164,12 @@ impl BlockManager { /// Write a block to disk pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result { - if self.is_block_compressed(hash).await.is_ok() { - return Ok(Message::Ok); - } + let clean_plain = match self.is_block_compressed(hash).await { + Ok(true) => return Ok(Message::Ok), + Ok(false) if !data.is_compressed() => return Ok(Message::Ok), // we have a plain block, and the provided block is not compressed either + Ok(false) => true, + Err(_) => false, + }; let mut path = self.block_dir(hash); @@ -177,11 +189,16 @@ impl BlockManager { path.set_extension("zst_b2"); } - let mut f = fs::File::create(path).await?; + let mut f = fs::File::create(path.clone()).await?; f.write_all(&buffer).await?; if let Some(checksum) = checksum { f.write_all(checksum.as_slice()).await?; } + + if clean_plain { + path.set_extension(""); + fs::remove_file(path).await?; + } drop(f); Ok(Message::Ok) From d4fd07400008247c475ffe2c66bb2bd57bf7ab5c Mon Sep 17 00:00:00 2001 From: Trinity Pointard Date: Wed, 7 Apr 2021 02:13:26 +0200 Subject: [PATCH 7/7] use zstd checksumming --- src/model/block.rs | 66 ++++++++++++++++++++++++++-------------------- src/util/config.rs | 2 +- 2 files changed, 38 insertions(+), 30 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 090ffbc2..699ff32d 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -9,7 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; -use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::Error; @@ -43,7 +43,10 @@ pub enum Message { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock { hash: Hash, data: BlockData 
}, + PutBlock { + hash: Hash, + data: BlockData, + }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block @@ -64,6 +67,13 @@ impl BlockData { BlockData::Compressed(_) => true, } } + + pub fn buffer(&self) -> &Vec { + match self { + BlockData::Plain(b) => b, + BlockData::Compressed(b) => b, + } + } } impl RpcMessage for Message {} @@ -164,6 +174,10 @@ impl BlockManager { /// Write a block to disk pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result { + let mut path = self.block_dir(hash); + + let _lock = self.data_dir_lock.lock().await; + let clean_plain = match self.is_block_compressed(hash).await { Ok(true) => return Ok(Message::Ok), Ok(false) if !data.is_compressed() => return Ok(Message::Ok), // we have a plain block, and the provided block is not compressed either @@ -171,29 +185,17 @@ impl BlockManager { Err(_) => false, }; - let mut path = self.block_dir(hash); - - let (buffer, checksum) = match data { - BlockData::Plain(b) => (b, None), - BlockData::Compressed(b) => { - let checksum = blake2sum(&b); - (b, Some(checksum)) - } - }; - - let _lock = self.data_dir_lock.lock().await; - fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if checksum.is_some() { - path.set_extension("zst_b2"); + + if data.is_compressed() { + path.set_extension("zst"); } + let buffer = data.buffer(); + let mut f = fs::File::create(path.clone()).await?; f.write_all(&buffer).await?; - if let Some(checksum) = checksum { - f.write_all(checksum.as_slice()).await?; - } if clean_plain { path.set_extension(""); @@ -215,7 +217,7 @@ impl BlockManager { f.map(|f| (f, false)).map_err(Into::into) } Ok(true) => { - path.set_extension("zst_b2"); + path.set_extension("zst"); let f = fs::File::open(&path).await; f.map(|f| (f, true)).map_err(Into::into) } @@ -233,14 +235,7 @@ impl BlockManager { drop(f); let sum_ok = if compressed { - if data.len() >= 32 { - let 
data_len = data.len() - 32; - let checksum = data.split_off(data_len); - blake2sum(&data[..]).as_slice() == &checksum - } else { - // the file is too short to be valid - false - } + zstd_check_checksum(&data[..]) } else { blake2sum(&data[..]) == *hash }; @@ -287,7 +282,7 @@ impl BlockManager { async fn is_block_compressed(&self, hash: &Hash) -> Result { let mut path = self.block_path(hash); - path.set_extension("zst_b2"); + path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { return Ok(true); } @@ -634,3 +629,16 @@ fn u64_from_be_bytes>(bytes: T) -> u64 { x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } + +fn zstd_check_checksum(source: R) -> bool { + zstd::stream::copy_decode(source, std::io::sink()).is_ok() +} + +fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { + let mut result = Vec::::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} diff --git a/src/util/config.rs b/src/util/config.rs index ab9cae6c..29901d46 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -45,7 +45,7 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, - /// Zstd compression level used on data blocks + /// Zstd compression level used on data blocks #[serde(default)] pub compression_level: i32,