Compress with zstd #44
4 changed files with 140 additions and 25 deletions
42
Cargo.lock
generated
42
Cargo.lock
generated
|
@ -95,6 +95,9 @@ name = "cc"
|
|||
version = "1.0.67"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e3c69b077ad434294d3ce9f1f6143a2a4b89a8a2d54ef813d85003a4fd1137fd"
|
||||
dependencies = [
|
||||
"jobserver",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "cfg-if"
|
||||
|
@ -429,6 +432,7 @@ dependencies = [
|
|||
"serde_bytes",
|
||||
"sled",
|
||||
"tokio",
|
||||
"zstd",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
|
@ -779,6 +783,15 @@ version = "0.4.7"
|
|||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "dd25036021b0de88a0aff6b850051563c6516d0bf53f8638938edbb9de732736"
|
||||
|
||||
[[package]]
|
||||
name = "jobserver"
|
||||
version = "0.1.21"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5c71313ebb9439f74b00d9d2dcec36440beaf57a6aa0623068441dd7cd81a7f2"
|
||||
dependencies = [
|
||||
"libc",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "js-sys"
|
||||
version = "0.3.49"
|
||||
|
@ -1769,3 +1782,32 @@ name = "xxhash-rust"
|
|||
version = "0.8.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded"
|
||||
|
||||
[[package]]
|
||||
name = "zstd"
|
||||
version = "0.6.1+zstd.1.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "5de55e77f798f205d8561b8fe2ef57abfb6e0ff2abe7fd3c089e119cdb5631a3"
|
||||
dependencies = [
|
||||
"zstd-safe",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-safe"
|
||||
version = "3.0.1+zstd.1.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "1387cabcd938127b30ce78c4bf00b30387dddf704e3f0881dbc4ff62b5566f8c"
|
||||
dependencies = [
|
||||
"libc",
|
||||
"zstd-sys",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "zstd-sys"
|
||||
version = "1.4.20+zstd.1.4.9"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "ebd5b733d7cf2d9447e2c3e76a5589b4f5e5ae065c22a2bc0b023cbc331b6c8e"
|
||||
dependencies = [
|
||||
"cc",
|
||||
"libc",
|
||||
]
|
||||
|
|
|
@ -21,8 +21,8 @@ arc-swap = "1.0"
|
|||
hex = "0.4"
|
||||
log = "0.4"
|
||||
rand = "0.8"
|
||||
|
||||
sled = "0.34"
|
||||
zstd = "0.6.1"
|
||||
|
||||
rmp-serde = "0.15"
|
||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||
|
|
|
@ -9,6 +9,7 @@ use serde::{Deserialize, Serialize};
|
|||
use tokio::fs;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||
use tokio::sync::{watch, Mutex, Notify};
|
||||
use zstd::stream::{decode_all as zstd_decode, encode_all as zstd_encode};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error;
|
||||
|
@ -43,6 +44,7 @@ pub enum Message {
|
|||
/// Message to send a block of data, either because requested, of for first delivery of new
|
||||
/// block
|
||||
PutBlock(PutBlockMessage),
|
||||
PutCompressedBlock(PutBlockMessage),
|
||||
/// Ask other node if they should have this block, but don't actually have it
|
||||
NeedBlockQuery(Hash),
|
||||
/// Response : whether the node do require that block
|
||||
|
@ -134,7 +136,8 @@ impl BlockManager {
|
|||
|
||||
async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
|
||||
match msg {
|
||||
Message::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
|
||||
Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await,
|
||||
Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await,
|
||||
Message::GetBlock(h) => self.read_block(h).await,
|
||||
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
|
||||
_ => Err(Error::BadRPC(format!("Unexpected RPC message"))),
|
||||
|
@ -157,15 +160,23 @@ impl BlockManager {
|
|||
}
|
||||
|
||||
/// Write a block to disk
|
||||
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
||||
pub async fn write_block(
|
||||
&self,
|
||||
hash: &Hash,
|
||||
data: &[u8],
|
||||
lx marked this conversation as resolved
Outdated
|
||||
compressed: bool,
|
||||
) -> Result<Message, Error> {
|
||||
if self.is_block_compressed(hash).await.is_ok() {
|
||||
return Ok(Message::Ok);
|
||||
lx marked this conversation as resolved
Outdated
lx
commented
Taking the lock should probably be the first thing in the function because Taking the lock should probably be the first thing in the function because `is_block_compressed` manipulates the data directory
|
||||
}
|
||||
let _lock = self.data_dir_lock.lock().await;
|
||||
|
||||
let mut path = self.block_dir(hash);
|
||||
fs::create_dir_all(&path).await?;
|
||||
|
||||
path.push(hex::encode(hash));
|
||||
if fs::metadata(&path).await.is_ok() {
|
||||
return Ok(Message::Ok);
|
||||
if compressed {
|
||||
path.set_extension("zst");
|
||||
}
|
||||
|
||||
let mut f = fs::File::create(path).await?;
|
||||
|
@ -176,35 +187,61 @@ impl BlockManager {
|
|||
}
|
||||
|
||||
/// Read block from disk, verifying it's integrity
|
||||
async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
||||
let path = self.block_path(hash);
|
||||
pub async fn read_block(&self, hash: &Hash) -> Result<Message, Error> {
|
||||
let mut path = self.block_path(hash);
|
||||
|
||||
let mut f = match fs::File::open(&path).await {
|
||||
Ok(f) => f,
|
||||
Err(e) => {
|
||||
let mut data = vec![];
|
||||
let block = match self.is_block_compressed(hash).await {
|
||||
Ok(false) => {
|
||||
let f = fs::File::open(&path).await;
|
||||
f.map(|f| (f, false)).map_err(Into::into)
|
||||
}
|
||||
Ok(true) => {
|
||||
path.set_extension("zst");
|
||||
let f = fs::File::open(&path).await;
|
||||
f.map(|f| (f, true)).map_err(Into::into)
|
||||
}
|
||||
Err(e) => Err(e),
|
||||
};
|
||||
let (mut f, compressed) = match block {
|
||||
Ok(ok) => ok,
|
||||
e => {
|
||||
// Not found but maybe we should have had it ??
|
||||
self.put_to_resync(hash, Duration::from_millis(0))?;
|
||||
return Err(Into::into(e));
|
||||
e?
|
||||
}
|
||||
};
|
||||
let mut data = vec![];
|
||||
f.read_to_end(&mut data).await?;
|
||||
drop(f);
|
||||
|
||||
if blake2sum(&data[..]) != *hash {
|
||||
let sum = if compressed {
|
||||
zstd_decode(&data[..])
|
||||
.ok()
|
||||
.map(|decompressed| blake2sum(&decompressed[..]))
|
||||
} else {
|
||||
Some(blake2sum(&data[..]))
|
||||
};
|
||||
if sum.is_none() || sum.unwrap() != *hash {
|
||||
let _lock = self.data_dir_lock.lock().await;
|
||||
warn!(
|
||||
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
|
||||
hash
|
||||
);
|
||||
let mut path2 = path.clone();
|
||||
path2.set_extension(".corrupted");
|
||||
path2.set_extension("corrupted");
|
||||
fs::rename(path, path2).await?;
|
||||
self.put_to_resync(&hash, Duration::from_millis(0))?;
|
||||
return Err(Error::CorruptData(*hash));
|
||||
}
|
||||
|
||||
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
||||
if compressed {
|
||||
Ok(Message::PutCompressedBlock(PutBlockMessage {
|
||||
hash: *hash,
|
||||
data,
|
||||
}))
|
||||
} else {
|
||||
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
||||
}
|
||||
}
|
||||
|
||||
/// Check if this node should have a block, but don't actually have it
|
||||
|
@ -215,14 +252,22 @@ impl BlockManager {
|
|||
.map(|x| u64_from_be_bytes(x) > 0)
|
||||
.unwrap_or(false);
|
||||
if needed {
|
||||
let path = self.block_path(hash);
|
||||
let exists = fs::metadata(&path).await.is_ok();
|
||||
let exists = self.is_block_compressed(hash).await.is_ok();
|
||||
Ok(!exists)
|
||||
} else {
|
||||
Ok(false)
|
||||
}
|
||||
}
|
||||
|
||||
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
|
||||
let mut path = self.block_path(hash);
|
||||
if fs::metadata(&path).await.is_ok() {
|
||||
return Ok(false);
|
||||
}
|
||||
path.set_extension("zst");
|
||||
fs::metadata(&path).await.map(|_| true).map_err(Into::into)
|
||||
}
|
||||
|
||||
fn block_dir(&self, hash: &Hash) -> PathBuf {
|
||||
let mut path = self.data_dir.clone();
|
||||
path.push(hex::encode(&hash.as_slice()[0..1]));
|
||||
|
@ -323,7 +368,7 @@ impl BlockManager {
|
|||
|
||||
let path = self.block_path(hash);
|
||||
|
||||
let exists = fs::metadata(&path).await.is_ok();
|
||||
let exists = self.is_block_compressed(hash).await.is_ok();
|
||||
let needed = self
|
||||
.rc
|
||||
.get(hash.as_ref())?
|
||||
|
@ -402,15 +447,14 @@ impl BlockManager {
|
|||
// TODO find a way to not do this if they are sending it to us
|
||||
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
|
||||
// between the RC being incremented and this part being called.
|
||||
let block_data = self.rpc_get_block(&hash).await?;
|
||||
self.write_block(hash, &block_data[..]).await?;
|
||||
let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?;
|
||||
self.write_block(hash, &block_data[..], compressed).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Ask nodes that might have a block for it
|
||||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec<u8>, bool), Error> {
|
||||
let who = self.replication.read_nodes(&hash);
|
||||
let resps = self
|
||||
.rpc_client
|
||||
|
@ -424,8 +468,10 @@ impl BlockManager {
|
|||
.await?;
|
||||
|
||||
for resp in resps {
|
||||
if let Message::PutBlock(msg) = resp {
|
||||
return Ok(msg.data);
|
||||
match resp {
|
||||
Message::PutBlock(msg) => return Ok((msg.data, false)),
|
||||
Message::PutCompressedBlock(msg) => return Ok((msg.data, true)),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Err(Error::Message(format!(
|
||||
|
@ -434,13 +480,36 @@ impl BlockManager {
|
|||
)))
|
||||
}
|
||||
|
||||
/// Ask nodes that might have a block for it
|
||||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||
self.rpc_get_raw_block(hash)
|
||||
.await
|
||||
.and_then(|(data, compressed)| {
|
||||
if compressed {
|
||||
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash))
|
||||
} else {
|
||||
Ok(data)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/// Send block to nodes that should have it
|
||||
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
||||
let garage = self.garage.load_full().unwrap();
|
||||
let compressed = zstd_encode(&data[..], garage.config.compression_level);
|
||||
let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() {
|
||||
Message::PutCompressedBlock(PutBlockMessage {
|
||||
hash,
|
||||
data: compressed.unwrap(),
|
||||
})
|
||||
} else {
|
||||
Message::PutBlock(PutBlockMessage { hash, data })
|
||||
};
|
||||
let who = self.replication.write_nodes(&hash);
|
||||
self.rpc_client
|
||||
.try_call_many(
|
||||
&who[..],
|
||||
Message::PutBlock(PutBlockMessage { hash, data }),
|
||||
message,
|
||||
RequestStrategy::with_quorum(self.replication.write_quorum())
|
||||
.with_timeout(BLOCK_RW_TIMEOUT),
|
||||
)
|
||||
|
|
|
@ -45,6 +45,10 @@ pub struct Config {
|
|||
#[serde(default = "default_replication_factor")]
|
||||
pub data_replication_factor: usize,
|
||||
|
||||
/// Zstd compression level used on data blocks
|
||||
#[serde(default)]
|
||||
pub compression_level: i32,
|
||||
|
||||
/// Configuration for RPC TLS
|
||||
pub rpc_tls: Option<TlsConfig>,
|
||||
|
||||
|
|
Loading…
Reference in a new issue
Maybe we should calculate this checksum on the sending side, i.e. in
rpc_put_block
, and include it inBlockData::Compressed
? Meaning we do it only once, like we do for the uncompressed checksum