Compress with zstd #44

Merged
lx merged 7 commits from trinity-1686a/garage:zstd-block into main 2021-04-14 21:27:36 +00:00
Showing only changes of commit aeadb071c6 - Show all commits

View file

@ -57,6 +57,15 @@ pub enum BlockData {
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
}
impl BlockData {
pub fn is_compressed(&self) -> bool {
match self {
BlockData::Plain(_) => false,
BlockData::Compressed(_) => true,
}
}
}
impl RpcMessage for Message {}
/// The block manager, handling block exchange between nodes, and block storage on local node
@ -155,9 +164,12 @@ impl BlockManager {
/// Write a block to disk
pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result<Message, Error> {
lx marked this conversation as resolved Outdated
Outdated
Review

Maybe we should calculate this checksum on the sending side, i.e. in rpc_put_block, and include it in BlockData::Compressed? Meaning we do it only once, like we do for the uncompressed checksum

Maybe we should calculate this checksum on the sending side, i.e. in `rpc_put_block`, and include it in `BlockData::Compressed`? Meaning we do it only once, like we do for the uncompressed checksum
if self.is_block_compressed(hash).await.is_ok() {
return Ok(Message::Ok);
}
let clean_plain = match self.is_block_compressed(hash).await {
Ok(true) => return Ok(Message::Ok),
Ok(false) if !data.is_compressed() => return Ok(Message::Ok), // we have a plain block, and the provided block is not compressed either
Ok(false) => true,
lx marked this conversation as resolved Outdated
Outdated
Review

Taking the lock should probably be the first thing in the function because is_block_compressed manipulates the data directory

Taking the lock should probably be the first thing in the function because `is_block_compressed` manipulates the data directory
Err(_) => false,
};
let mut path = self.block_dir(hash);
@ -177,11 +189,16 @@ impl BlockManager {
path.set_extension("zst_b2");
}
let mut f = fs::File::create(path).await?;
let mut f = fs::File::create(path.clone()).await?;
f.write_all(&buffer).await?;
if let Some(checksum) = checksum {
f.write_all(checksum.as_slice()).await?;
}
if clean_plain {
path.set_extension("");
fs::remove_file(path).await?;
}
drop(f);
Ok(Message::Ok)