diff --git a/src/model/block.rs b/src/model/block.rs index 272bd8849..dd869802a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -43,23 +43,18 @@ pub enum Message { GetBlock(Hash), /// Message to send a block of data, either because requested, of for first delivery of new /// block - PutBlock(PutBlockMessage), - PutCompressedBlock(PutBlockMessage), + PutBlock { hash: Hash, data: BlockData }, /// Ask other node if they should have this block, but don't actually have it NeedBlockQuery(Hash), /// Response : whether the node do require that block NeedBlockReply(bool), } -/// Structure used to send a block +/// A possibly compressed block of data #[derive(Debug, Serialize, Deserialize)] -pub struct PutBlockMessage { - /// Hash of the block - pub hash: Hash, - - /// Content of the block - #[serde(with = "serde_bytes")] - pub data: Vec, +pub enum BlockData { + Plain(#[serde(with = "serde_bytes")] Vec), + Compressed(#[serde(with = "serde_bytes")] Vec), } impl RpcMessage for Message {} @@ -136,8 +131,7 @@ impl BlockManager { async fn handle(self: Arc, msg: &Message) -> Result { match msg { - Message::PutBlock(m) => self.write_block(&m.hash, &m.data, false).await, - Message::PutCompressedBlock(m) => self.write_block(&m.hash, &m.data, true).await, + Message::PutBlock { hash, data } => self.write_block(&hash, &data).await, Message::GetBlock(h) => self.read_block(h).await, Message::NeedBlockQuery(h) => self.need_block(h).await.map(Message::NeedBlockReply), _ => Err(Error::BadRPC(format!("Unexpected RPC message"))), @@ -160,12 +154,7 @@ impl BlockManager { } /// Write a block to disk - pub async fn write_block( - &self, - hash: &Hash, - data: &[u8], - compressed: bool, - ) -> Result { + pub async fn write_block(&self, hash: &Hash, data: &BlockData) -> Result { if self.is_block_compressed(hash).await.is_ok() { return Ok(Message::Ok); } @@ -175,12 +164,16 @@ impl BlockManager { fs::create_dir_all(&path).await?; path.push(hex::encode(hash)); - if compressed { - path.set_extension("zst"); - } + let buffer = match data { + BlockData::Plain(b) => b, + BlockData::Compressed(b) => { + path.set_extension("zst"); + b + } + }; let mut f = fs::File::create(path).await?; - f.write_all(data).await?; + f.write_all(&buffer).await?; drop(f); Ok(Message::Ok) @@ -235,12 +228,15 @@ impl BlockManager { } if compressed { - Ok(Message::PutCompressedBlock(PutBlockMessage { + Ok(Message::PutBlock { hash: *hash, - data, - })) + data: BlockData::Compressed(data), + }) } else { - Ok(Message::PutBlock(PutBlockMessage { hash: *hash, data })) + Ok(Message::PutBlock { + hash: *hash, + data: BlockData::Plain(data), + }) } } @@ -447,14 +443,14 @@ impl BlockManager { // TODO find a way to not do this if they are sending it to us // Let's suppose this isn't an issue for now with the BLOCK_RW_TIMEOUT delay // between the RC being incremented and this part being called. - let (block_data, compressed) = self.rpc_get_raw_block(&hash).await?; - self.write_block(hash, &block_data[..], compressed).await?; + let block = self.rpc_get_raw_block(&hash).await?; + self.write_block(hash, &block).await?; } Ok(()) } - async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<(Vec, bool), Error> { + async fn rpc_get_raw_block(&self, hash: &Hash) -> Result { let who = self.replication.read_nodes(&hash); let resps = self .rpc_client @@ -469,8 +465,7 @@ impl BlockManager { for resp in resps { match resp { - Message::PutBlock(msg) => return Ok((msg.data, false)), - Message::PutCompressedBlock(msg) => return Ok((msg.data, true)), + Message::PutBlock { data, .. } => return Ok(data), _ => {} } } @@ -484,11 +479,10 @@ impl BlockManager { pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { self.rpc_get_raw_block(hash) .await - .and_then(|(data, compressed)| { - if compressed { + .and_then(|data| match data { + BlockData::Plain(data) => Ok(data), + BlockData::Compressed(data) => { zstd_decode(&data[..]).map_err(|_| Error::CorruptData(*hash)) - } else { - Ok(data) } }) } @@ -498,12 +492,15 @@ impl BlockManager { let garage = self.garage.load_full().unwrap(); let compressed = zstd_encode(&data[..], garage.config.compression_level); let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() { - Message::PutCompressedBlock(PutBlockMessage { + Message::PutBlock { hash, - data: compressed.unwrap(), - }) + data: BlockData::Compressed(compressed.unwrap()), + } } else { - Message::PutBlock(PutBlockMessage { hash, data }) + Message::PutBlock { + hash, + data: BlockData::Plain(data), + } }; let who = self.replication.write_nodes(&hash); self.rpc_client