Compress with zstd #44

Merged
lx merged 7 commits from trinity-1686a/garage:zstd-block into main 2021-04-14 21:27:36 +00:00
Showing only changes of commit 9215d63e3a - Show all commits

View file

@ -158,22 +158,30 @@ impl BlockManager {
if self.is_block_compressed(hash).await.is_ok() {
return Ok(Message::Ok);
}
let _lock = self.data_dir_lock.lock().await;
let mut path = self.block_dir(hash);
fs::create_dir_all(&path).await?;
path.push(hex::encode(hash));
let buffer = match data {
BlockData::Plain(b) => b,
let (buffer, checksum) = match data {
BlockData::Plain(b) => (b, None),
BlockData::Compressed(b) => {
lx marked this conversation as resolved Outdated
Outdated
Review

Maybe we should calculate this checksum on the sending side, i.e. in rpc_put_block, and include it in BlockData::Compressed? Meaning we do it only once, like we do for the uncompressed checksum

Maybe we should calculate this checksum on the sending side, i.e. in `rpc_put_block`, and include it in `BlockData::Compressed`? Meaning we do it only once, like we do for the uncompressed checksum
path.set_extension("zst");
b
let checksum = blake2sum(&b);
(b, Some(checksum))
}
};
lx marked this conversation as resolved Outdated
Outdated
Review

Taking the lock should probably be the first thing in the function because is_block_compressed manipulates the data directory

Taking the lock should probably be the first thing in the function because `is_block_compressed` manipulates the data directory
let _lock = self.data_dir_lock.lock().await;
fs::create_dir_all(&path).await?;
path.push(hex::encode(hash));
if checksum.is_some() {
path.set_extension("zst.b2");
}
let mut f = fs::File::create(path).await?;
f.write_all(&buffer).await?;
if let Some(checksum) = checksum {
f.write_all(checksum.as_slice()).await?;
}
drop(f);
Ok(Message::Ok)
@ -190,7 +198,7 @@ impl BlockManager {
f.map(|f| (f, false)).map_err(Into::into)
}
Ok(true) => {
path.set_extension("zst");
path.set_extension("zst.b2");
let f = fs::File::open(&path).await;
f.map(|f| (f, true)).map_err(Into::into)
}
@ -207,14 +215,19 @@ impl BlockManager {
f.read_to_end(&mut data).await?;
drop(f);
let sum = if compressed {
zstd_decode(&data[..])
.ok()
.map(|decompressed| blake2sum(&decompressed[..]))
let sum_ok = if compressed {
if data.len() >= 32 {
let data_len = data.len() - 32;
let checksum = data.split_off(data_len);
blake2sum(&data[..]).as_slice() == &checksum
} else {
Some(blake2sum(&data[..]))
// the file is too short to be valid
false
}
} else {
blake2sum(&data[..]) == *hash
};
if sum.is_none() || sum.unwrap() != *hash {
if !sum_ok {
let _lock = self.data_dir_lock.lock().await;
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
@ -257,11 +270,12 @@ impl BlockManager {
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
path.set_extension("zst.b2");
if fs::metadata(&path).await.is_ok() {
return Ok(false);
return Ok(true);
}
path.set_extension("zst");
fs::metadata(&path).await.map(|_| true).map_err(Into::into)
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
fn block_dir(&self, hash: &Hash) -> PathBuf {