diff --git a/Cargo.lock b/Cargo.lock index c97968ef..13b97d2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,9 @@ name = "cc" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -429,6 +432,7 @@ dependencies = [ "serde_bytes", "sled", "tokio", + "zstd", ] [[package]] @@ -779,6 +783,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.49" @@ -1769,3 +1782,32 @@ name = "xxhash-rust" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded" + +[[package]] +name = "zstd" +version = "0.6.1+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de55e77f798f205d8561b8fe2ef57abfb6e0ff2abe7fd3c089e119cdb5631a3" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "3.0.1+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1387cabcd938127b30ce78c4bf00b30387dddf704e3f0881dbc4ff62b5566f8c" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.4.20+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd5b733d7cf2d9447e2c3e76a5589b4f5e5ae065c22a2bc0b023cbc331b6c8e" +dependencies = [ + "cc", + "libc", +] diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 77084531..1b97e4f9 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -21,8 +21,8 @@ arc-swap = "1.0" hex = "0.4" log = "0.4" rand = "0.8" - sled = "0.34" +zstd = "0.6.1" rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/model/block.rs b/src/model/block.rs index 5f428fe1..272bd884 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode}; use garage_util::data::*; use garage_util::error::Error; @@ -43,6 +44,7 @@ pub enum Message { /// Message to send a block of data, either because requested, of for first delivery of new /// block PutBlock(PutBlockMessage), + PutCompressedBlock(PutBlockMessage), /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block @@ -134,7 +136,8 @@ impl BlockManager { async fn handle(self: Arc, msg: &Message) -> Result { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await, + Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -157,15 +160,23 @@ impl BlockManager { } /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + pub async fn write_block( + &self, + hash: &Hash, + data: &[u8], + compressed: bool, + ) -> Result { + if self.is_block_compressed(hash).await.is_ok() { + return Ok(Message::Ok); + } let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + if compressed { + path.set_extension("zst"); } let mut f = fs::File::create(path).await?; @@ -176,35 +187,61 @@ impl BlockManager { } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result { - let path = self.block_path(hash); + pub async fn read_block(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); - let mut f = match fs::File::open(&path).await { - Ok(f) => f, - Err(e) => { + let mut data = vec![]; + let block = match self.is_block_compressed(hash).await { + Ok(false) => { + let f = fs::File::open(&path).await; + f.map(|f| (f, false)).map_err(Into::into) + } + Ok(true) => { + path.set_extension("zst"); + let f = fs::File::open(&path).await; + f.map(|f| (f, true)).map_err(Into::into) + } + Err(e) => Err(e), + }; + let (mut f, compressed) = match block { + Ok(ok) => ok, + e => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, Duration::from_millis(0))?; - return Err(Into::into(e)); + e? } }; - let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let sum = if compressed { + zstd_decode(&data[..]) + .ok() + .map(|decompressed| blake2sum(&decompressed[..])) + } else { + Some(blake2sum(&data[..])) + }; + if sum.is_none() || sum.unwrap() != *hash { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + if compressed { + Ok(Message::PutCompressedBlock(PutBlockMessage { + hash: *hash, + data, + })) + } else { + Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + } } /// Check if this node should have a block, but don't actually have it @@ -215,14 +252,22 @@ impl BlockManager { .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { - let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); Ok(!exists) } else { Ok(false) } } + async fn is_block_compressed(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); + if fs::metadata(&path).await.is_ok() { + return Ok(false); + } + path.set_extension("zst"); + fs::metadata(&path).await.map(|_| true).map_err(Into::into) + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -323,7 +368,7 @@ impl BlockManager { let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); let needed = self .rc .get(hash.as_ref())? @@ -402,15 +447,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. - let block_data = self.rpc_get_block(&hash).await?; - self.write_block(hash, &block_data[..]).await?; + let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block_data[..], compressed).await?; } Ok(()) } - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec, bool), Error> { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -424,8 +468,10 @@ impl BlockManager { .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { - return Ok(msg.data); + match resp { + Message::PutBlock(msg) => return Ok((msg.data, false)), + Message::PutCompressedBlock(msg) => return Ok((msg.data, true)), + _ => {} } } Err(Error::Message(format!( @@ -434,13 +480,36 @@ impl BlockManager { ))) } + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + self.rpc_get_raw_block(hash) + .await + .and_then(|(data, compressed)| { + if compressed { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) + } else { + Ok(data) + } + }) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { + let garage = self.garage.load_full().unwrap(); + let compressed = zstd_encode(&data[..], garage.config.compression_level); + let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { + Message::PutCompressedBlock(PutBlockMessage { + hash, + data: compressed.unwrap(), + }) + } else { + Message::PutBlock(PutBlockMessage { hash, data }) + }; let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), + message, RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) diff --git a/src/util/config.rs b/src/util/config.rs index bb70467b..ab9cae6c 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -45,6 +45,10 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Zstd compression level used on data blocks + #[serde(default)] + pub compression_level: i32, + /// Configuration for RPC TLS pub rpc_tls: Option,