diff --git a/Cargo.lock b/Cargo.lock index c97968ef..13b97d2c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -95,6 +95,9 @@ name = "cc" version = "1.0.67" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd" +dependencies = [ + "jobserver", +] [[package]] name = "cfg-if" @@ -429,6 +432,7 @@ dependencies = [ "serde_bytes", "sled", "tokio", + "zstd", ] [[package]] @@ -779,6 +783,15 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736" +[[package]] +name = "jobserver" +version = "0.1.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2" +dependencies = [ + "libc", +] + [[package]] name = "js-sys" version = "0.3.49" @@ -1769,3 +1782,32 @@ name = "xxhash-rust" version = "0.8.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded" + +[[package]] +name = "zstd" +version = "0.6.1+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5de55e77f798f205d8561b8fe2ef57abfb6e0ff2abe7fd3c089e119cdb5631a3" +dependencies = [ + "zstd-safe", +] + +[[package]] +name = "zstd-safe" +version = "3.0.1+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1387cabcd938127b30ce78c4bf00b30387dddf704e3f0881dbc4ff62b5566f8c" +dependencies = [ + "libc", + "zstd-sys", +] + +[[package]] +name = "zstd-sys" +version = "1.4.20+zstd.1.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd5b733d7cf2d9447e2c3e76a5589b4f5e5ae065c22a2bc0b023cbc331b6c8e" +dependencies = [ + "cc", + "libc", +] diff --git a/script/test-smoke.sh b/script/test-smoke.sh index a2ffcea1..05286c65 100755 --- a/script/test-smoke.sh +++ b/script/test-smoke.sh @@ -23,39 +23,43 @@ dd if=/dev/urandom of=/tmp/garage.1.rnd bs=1k count=2 # < INLINE_THRESHOLD = 307 dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5 dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10 +dd if=/dev/urandom bs=1k count=2 | base64 -w0 > /tmp/garage.1.b64 +dd if=/dev/urandom bs=1M count=5 | base64 -w0 > /tmp/garage.2.b64 +dd if=/dev/urandom bs=1M count=10 | base64 -w0 > /tmp/garage.3.b64 + echo "s3 api testing..." -for idx in $(seq 1 3); do +for idx in {1,2,3}.{rnd,b64}; do # AWS sends - awsgrg cp /tmp/garage.$idx.rnd s3://eprouvette/garage.$idx.aws + awsgrg cp /tmp/garage.$idx s3://eprouvette/garage.$idx.aws awsgrg ls s3://eprouvette awsgrg cp s3://eprouvette/garage.$idx.aws /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl s3grg get s3://eprouvette/garage.$idx.aws /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl awsgrg rm s3://eprouvette/garage.$idx.aws # S3CMD sends - s3grg put /tmp/garage.$idx.rnd s3://eprouvette/garage.$idx.s3cmd + s3grg put /tmp/garage.$idx s3://eprouvette/garage.$idx.s3cmd s3grg ls s3://eprouvette s3grg get s3://eprouvette/garage.$idx.s3cmd /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl awsgrg cp s3://eprouvette/garage.$idx.s3cmd /tmp/garage.$idx.dl - diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl + diff /tmp/garage.$idx /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl s3grg rm s3://eprouvette/garage.$idx.s3cmd done -rm /tmp/garage.{1,2,3}.rnd +rm /tmp/garage.{1,2,3}.{rnd,b64} echo "website testing" echo "

hello world

" > /tmp/garage-index.html diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 77084531..1b97e4f9 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -21,8 +21,8 @@ arc-swap = "1.0" hex = "0.4" log = "0.4" rand = "0.8" - sled = "0.34" +zstd = "0.6.1" rmp-serde = "0.15" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/model/block.rs b/src/model/block.rs index 5f428fe1..699ff32d 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::Error; @@ -42,22 +43,37 @@ pub enum Message { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), + PutBlock { + hash: Hash, + data: BlockData, + }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, +pub enum BlockData { + Plain(#[serde(with = "serde_bytes")] Vec), + Compressed(#[serde(with = "serde_bytes")] Vec), +} - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec, +impl BlockData { + pub fn is_compressed(&self) -> bool { + match self { + BlockData::Plain(_) => false, + BlockData::Compressed(_) => true, + } + } + + pub fn buffer(&self) -> &Vec { + match self { + BlockData::Plain(b) => b, + BlockData::Compressed(b) => b, + } + } } impl RpcMessage for Message {} @@ -134,7 +150,7 @@ impl BlockManager { async fn handle(self: Arc, msg: &Message) -> Result { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await, + Message::PutBlock { hash, data } => self.write_block(&hash, &data).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -157,54 +173,96 @@ impl BlockManager { } /// Write a block to disk - async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { + pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result { + let mut path = self.block_dir(hash); + let _lock = self.data_dir_lock.lock().await; - let mut path = self.block_dir(hash); - fs::create_dir_all(&path).await?; + let clean_plain = match self.is_block_compressed(hash).await { + Ok(true) => return Ok(Message::Ok), + Ok(false) if !data.is_compressed() => return Ok(Message::Ok), // we have a plain block, and the provided block is not compressed either + Ok(false) => true, + Err(_) => false, + }; + fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if fs::metadata(&path).await.is_ok() { - return Ok(Message::Ok); + + if data.is_compressed() { + path.set_extension("zst"); } - let mut f = fs::File::create(path).await?; - f.write_all(data).await?; + let buffer = data.buffer(); + + let mut f = fs::File::create(path.clone()).await?; + f.write_all(&buffer).await?; + + if clean_plain { + path.set_extension(""); + fs::remove_file(path).await?; + } drop(f); Ok(Message::Ok) } /// Read block from disk, verifying it's integrity - async fn read_block(&self, hash: &Hash) -> Result { - let path = self.block_path(hash); + pub async fn read_block(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); - let mut f = match fs::File::open(&path).await { - Ok(f) => f, - Err(e) => { + let mut data = vec![]; + let block = match self.is_block_compressed(hash).await { + Ok(false) => { + let f = fs::File::open(&path).await; + f.map(|f| (f, false)).map_err(Into::into) + } + Ok(true) => { + path.set_extension("zst"); + let f = fs::File::open(&path).await; + f.map(|f| (f, true)).map_err(Into::into) + } + Err(e) => Err(e), + }; + let (mut f, compressed) = match block { + Ok(ok) => ok, + e => { // Not found but maybe we should have had it ?? self.put_to_resync(hash, Duration::from_millis(0))?; - return Err(Into::into(e)); + e? } }; - let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); - if blake2sum(&data[..]) != *hash { + let sum_ok = if compressed { + zstd_check_checksum(&data[..]) + } else { + blake2sum(&data[..]) == *hash + }; + if !sum_ok { let _lock = self.data_dir_lock.lock().await; warn!( "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", hash ); let mut path2 = path.clone(); - path2.set_extension(".corrupted"); + path2.set_extension("corrupted"); fs::rename(path, path2).await?; self.put_to_resync(&hash, Duration::from_millis(0))?; return Err(Error::CorruptData(*hash)); } - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + if compressed { + Ok(Message::PutBlock { + hash: *hash, + data: BlockData::Compressed(data), + }) + } else { + Ok(Message::PutBlock { + hash: *hash, + data: BlockData::Plain(data), + }) + } } /// Check if this node should have a block, but don't actually have it @@ -215,14 +273,23 @@ impl BlockManager { .map(|x| u64_from_be_bytes(x) > 0) .unwrap_or(false); if needed { - let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); Ok(!exists) } else { Ok(false) } } + async fn is_block_compressed(&self, hash: &Hash) -> Result { + let mut path = self.block_path(hash); + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Ok(true); + } + path.set_extension(""); + fs::metadata(&path).await.map(|_| false).map_err(Into::into) + } + fn block_dir(&self, hash: &Hash) -> PathBuf { let mut path = self.data_dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); @@ -323,7 +390,7 @@ impl BlockManager { let path = self.block_path(hash); - let exists = fs::metadata(&path).await.is_ok(); + let exists = self.is_block_compressed(hash).await.is_ok(); let needed = self .rc .get(hash.as_ref())? @@ -402,15 +469,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. - let block_data = self.rpc_get_block(&hash).await?; - self.write_block(hash, &block_data[..]).await?; + let block = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block).await?; } Ok(()) } - /// Ask nodes that might have a block for it - pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -424,8 +490,9 @@ impl BlockManager { .await?; for resp in resps { - if let Message::PutBlock(msg) = resp { - return Ok(msg.data); + match resp { + Message::PutBlock { data, .. } => return Ok(data), + _ => {} } } Err(Error::Message(format!( @@ -434,13 +501,38 @@ impl BlockManager { ))) } + /// Ask nodes that might have a block for it + pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { + self.rpc_get_raw_block(hash) + .await + .and_then(|data| match data { + BlockData::Plain(data) => Ok(data), + BlockData::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) + } + }) + } + /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { + let garage = self.garage.load_full().unwrap(); + let compressed = zstd_encode(&data[..], garage.config.compression_level); + let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { + Message::PutBlock { + hash, + data: BlockData::Compressed(compressed.unwrap()), + } + } else { + Message::PutBlock { + hash, + data: BlockData::Plain(data), + } + }; let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], - Message::PutBlock(PutBlockMessage { hash, data }), + message, RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) @@ -537,3 +629,16 @@ fn u64_from_be_bytes>(bytes: T) -> u64 { x8.copy_from_slice(bytes.as_ref()); u64::from_be_bytes(x8) } + +fn zstd_check_checksum(source: R) -> bool { + zstd::stream::copy_decode(source, std::io::sink()).is_ok() +} + +fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { + let mut result = Vec::::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} diff --git a/src/util/config.rs b/src/util/config.rs index bb70467b..29901d46 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -45,6 +45,10 @@ pub struct Config { #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, + /// Zstd compression level used on data blocks + #[serde(default)] + pub compression_level: i32, + /// Configuration for RPC TLS pub rpc_tls: Option,