Compress with zstd #44
1 changed files with 36 additions and 39 deletions
|
@ -43,23 +43,18 @@ pub enum Message {
|
|||
GetBlock(Hash),
|
||||
/// Message to send a block of data, either because requested, of for first delivery of new
|
||||
/// block
|
||||
PutBlock(PutBlockMessage),
|
||||
PutCompressedBlock(PutBlockMessage),
|
||||
PutBlock { hash: Hash, data: BlockData },
|
||||
/// Ask other node if they should have this block, but don't actually have it
|
||||
NeedBlockQuery(Hash),
|
||||
/// Response : whether the node do require that block
|
||||
NeedBlockReply(bool),
|
||||
}
|
||||
|
||||
/// Structure used to send a block
|
||||
/// A possibly compressed block of data
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct PutBlockMessage {
|
||||
/// Hash of the block
|
||||
pub hash: Hash,
|
||||
|
||||
/// Content of the block
|
||||
#[serde(with = "serde_bytes")]
|
||||
pub data: Vec<u8>,
|
||||
pub enum BlockData {
|
||||
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
}
|
||||
|
||||
impl RpcMessage for Message {}
|
||||
|
@ -136,8 +131,7 @@ impl BlockManager {
|
|||
|
||||
async fn handle(self: Arc<Self>, msg: &Message) -> Result<Message, Error> {
|
||||
match msg {
|
||||
Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await,
|
||||
Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await,
|
||||
Message::PutBlock { hash, data } => self.write_block(&hash, &data).await,
|
||||
Message::GetBlock(h) => self.read_block(h).await,
|
||||
Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply),
|
||||
_ => Err(Error::BadRPC(format!("Unexpected RPC message"))),
|
||||
|
@ -160,12 +154,7 @@ impl BlockManager {
|
|||
}
|
||||
|
||||
/// Write a block to disk
|
||||
pub async fn write_block(
|
||||
&self,
|
||||
hash: &Hash,
|
||||
data: &[u8],
|
||||
compressed: bool,
|
||||
) -> Result<Message, Error> {
|
||||
pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result<Message, Error> {
|
||||
if self.is_block_compressed(hash).await.is_ok() {
|
||||
return Ok(Message::Ok);
|
||||
}
|
||||
|
@ -175,12 +164,16 @@ impl BlockManager {
|
|||
fs::create_dir_all(&path).await?;
|
||||
|
||||
path.push(hex::encode(hash));
|
||||
lx marked this conversation as resolved
Outdated
|
||||
if compressed {
|
||||
let buffer = match data {
|
||||
BlockData::Plain(b) => b,
|
||||
BlockData::Compressed(b) => {
|
||||
path.set_extension("zst");
|
||||
lx marked this conversation as resolved
Outdated
lx
commented
Taking the lock should probably be the first thing in the function because Taking the lock should probably be the first thing in the function because `is_block_compressed` manipulates the data directory
|
||||
b
|
||||
}
|
||||
};
|
||||
|
||||
let mut f = fs::File::create(path).await?;
|
||||
f.write_all(data).await?;
|
||||
f.write_all(&buffer).await?;
|
||||
drop(f);
|
||||
|
||||
Ok(Message::Ok)
|
||||
|
@ -235,12 +228,15 @@ impl BlockManager {
|
|||
}
|
||||
|
||||
if compressed {
|
||||
Ok(Message::PutCompressedBlock(PutBlockMessage {
|
||||
Ok(Message::PutBlock {
|
||||
hash: *hash,
|
||||
data,
|
||||
}))
|
||||
data: BlockData::Compressed(data),
|
||||
})
|
||||
} else {
|
||||
Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data }))
|
||||
Ok(Message::PutBlock {
|
||||
hash: *hash,
|
||||
data: BlockData::Plain(data),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -447,14 +443,14 @@ impl BlockManager {
|
|||
// TODO find a way to not do this if they are sending it to us
|
||||
// Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay
|
||||
// between the RC being incremented and this part being called.
|
||||
let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?;
|
||||
self.write_block(hash, &block_data[..], compressed).await?;
|
||||
let block = self.rpc_get_raw_block(&hash).await?;
|
||||
self.write_block(hash, &block).await?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec<u8>, bool), Error> {
|
||||
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<BlockData, Error> {
|
||||
let who = self.replication.read_nodes(&hash);
|
||||
let resps = self
|
||||
.rpc_client
|
||||
|
@ -469,8 +465,7 @@ impl BlockManager {
|
|||
|
||||
for resp in resps {
|
||||
match resp {
|
||||
Message::PutBlock(msg) => return Ok((msg.data, false)),
|
||||
Message::PutCompressedBlock(msg) => return Ok((msg.data, true)),
|
||||
Message::PutBlock { data, .. } => return Ok(data),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
|
@ -484,11 +479,10 @@ impl BlockManager {
|
|||
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
|
||||
self.rpc_get_raw_block(hash)
|
||||
.await
|
||||
.and_then(|(data, compressed)| {
|
||||
if compressed {
|
||||
.and_then(|data| match data {
|
||||
BlockData::Plain(data) => Ok(data),
|
||||
BlockData::Compressed(data) => {
|
||||
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash))
|
||||
} else {
|
||||
Ok(data)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
@ -498,12 +492,15 @@ impl BlockManager {
|
|||
let garage = self.garage.load_full().unwrap();
|
||||
let compressed = zstd_encode(&data[..], garage.config.compression_level);
|
||||
let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() {
|
||||
Message::PutCompressedBlock(PutBlockMessage {
|
||||
Message::PutBlock {
|
||||
hash,
|
||||
data: compressed.unwrap(),
|
||||
})
|
||||
data: BlockData::Compressed(compressed.unwrap()),
|
||||
}
|
||||
} else {
|
||||
Message::PutBlock(PutBlockMessage { hash, data })
|
||||
Message::PutBlock {
|
||||
hash,
|
||||
data: BlockData::Plain(data),
|
||||
}
|
||||
};
|
||||
let who = self.replication.write_nodes(&hash);
|
||||
self.rpc_client
|
||||
|
|
Loading…
Reference in a new issue
Maybe we should calculate this checksum on the sending side, i.e. in
rpc_put_block
, and include it inBlockData::Compressed
? Meaning we do it only once, like we do for the uncompressed checksum