diff --git a/src/block/block.rs b/src/block/block.rs index 34afea5d..0b14bad4 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -13,77 +13,99 @@ pub enum DataBlockHeader { Compressed, } -/// A possibly compressed block of data -pub enum DataBlock { - /// Uncompressed data - Plain(Bytes), - /// Data compressed with zstd - Compressed(Bytes), +#[derive(Debug)] +pub struct DataBlockElem { + header: DataBlockHeader, + elem: T, } -#[derive(Debug)] -pub enum DataBlockPath { - /// Uncompressed data fail - Plain(PathBuf), - /// Compressed data fail - Compressed(PathBuf), +/// A possibly compressed block of data +pub type DataBlock = DataBlockElem; + +/// A path to a possibly compressed block of data +pub type DataBlockPath = DataBlockElem; + +impl DataBlockHeader { + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlockHeader::Compressed) + } +} + +impl DataBlockElem { + pub fn from_parts(header: DataBlockHeader, elem: T) -> Self { + Self { header, elem } + } + + pub fn plain(elem: T) -> Self { + Self { + header: DataBlockHeader::Plain, + elem, + } + } + + pub fn compressed(elem: T) -> Self { + Self { + header: DataBlockHeader::Compressed, + elem, + } + } + + pub fn into_parts(self) -> (DataBlockHeader, T) { + (self.header, self.elem) + } + + pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) { + (self.header, &self.elem) + } + + /// Query whether this block is compressed + pub fn is_compressed(&self) -> bool { + self.header.is_compressed() + } } impl DataBlock { - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) - } - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] /// instead pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res + &self.elem } /// Verify data integrity. Does not return the buffer content. pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { + match self.header { + DataBlockHeader::Plain => { + if blake2sum(&self.elem) == hash { Ok(()) } else { Err(Error::CorruptData(hash)) } } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), + DataBlockHeader::Compressed => { + zstd::stream::copy_decode(&self.elem[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)) + } } } pub async fn from_buffer(data: Bytes, level: Option) -> DataBlock { tokio::task::spawn_blocking(move || { if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data.into()); + if let Ok(data_compressed) = zstd_encode(&data[..], level) { + return DataBlock { + header: DataBlockHeader::Compressed, + elem: data_compressed.into(), + }; } } - DataBlock::Plain(data) + DataBlock { + header: DataBlockHeader::Plain, + elem: data.into(), + } }) .await .unwrap() } - - pub fn into_parts(self) -> (DataBlockHeader, Bytes) { - match self { - DataBlock::Plain(data) => (DataBlockHeader::Plain, data), - DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data), - } - } - - pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self { - match h { - DataBlockHeader::Plain => DataBlock::Plain(bytes), - DataBlockHeader::Compressed => DataBlock::Compressed(bytes), - } - } } fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { diff --git a/src/block/manager.rs b/src/block/manager.rs index 64e2ea27..6303248a 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -547,10 +547,7 @@ impl BlockManager { hash: &Hash, block_path: &DataBlockPath, ) -> Result { - let (path, compressed) = match block_path { - DataBlockPath::Plain(p) => (p, false), - DataBlockPath::Compressed(p) => (p, true), - }; + let (header, path) = block_path.as_parts_ref(); let mut f = fs::File::open(&path).await?; let mut data = vec![]; @@ -558,11 +555,7 @@ impl BlockManager { self.metrics.bytes_read.add(data.len() as u64); drop(f); - let data = if compressed { - DataBlock::Compressed(data.into()) - } else { - DataBlock::Plain(data.into()) - }; + let data = DataBlock::from_parts(header, data.into()); if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); @@ -615,20 +608,20 @@ impl BlockManager { // first and then a compressed one (as compression may have been // previously enabled). if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } } else { path.set_extension("zst"); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Compressed(path)); + return Some(DataBlockPath::compressed(path)); } path.set_extension(""); if fs::metadata(&path).await.is_ok() { - return Some(DataBlockPath::Plain(path)); + return Some(DataBlockPath::plain(path)); } } } @@ -709,24 +702,25 @@ impl BlockManagerLocked { tgt_path.set_extension("zst"); } - let to_delete = match (existing_path, compressed) { + let existing_info = existing_path.map(|x| x.into_parts()); + let to_delete = match (existing_info, compressed) { // If the block is stored in the wrong directory, // write it again at the correct path and delete the old path - (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), - (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p), + (Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p), // If the block is already stored not compressed but we have a compressed // copy, write the compressed copy and delete the uncompressed one - (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + (Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path), // If the block is already stored compressed, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + (Some((DataBlockHeader::Compressed, _)), _) => return Ok(()), // If the block is already stored not compressed, // and we don't have a compressed copy either, // keep the stored copy, we have nothing to do - (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + (Some((DataBlockHeader::Plain, _)), false) => return Ok(()), // If the block isn't stored already, just store what is given to us (None, _) => None, @@ -778,18 +772,14 @@ impl BlockManagerLocked { } async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { - let (path, path2) = match block_path { - DataBlockPath::Plain(p) => { - let mut p2 = p.clone(); - p2.set_extension("corrupted"); - (p, p2) - } - DataBlockPath::Compressed(p) => { - let mut p2 = p.clone(); - p2.set_extension("zst.corrupted"); - (p, p2) - } - }; + let (header, path) = block_path.as_parts_ref(); + + let mut path2 = path.clone(); + if header.is_compressed() { + path2.set_extension("zst.corrupted"); + } else { + path2.set_extension("corrupted"); + } fs::rename(path, path2).await?; Ok(()) @@ -799,9 +789,7 @@ impl BlockManagerLocked { let rc = mgr.rc.get_block_rc(hash)?; if rc.is_deletable() { while let Some(path) = mgr.find_block(hash).await { - let path = match path { - DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, - }; + let (_header, path) = path.as_parts_ref(); fs::remove_file(path).await?; mgr.metrics.delete_counter.add(1); } diff --git a/src/block/repair.rs b/src/block/repair.rs index 77ee0d14..2c8acbc9 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -584,8 +584,8 @@ impl Worker for RebalanceWorker { let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash); if path.ancestors().all(|x| x != prim_loc) { let block_path = match path.extension() { - None => DataBlockPath::Plain(path.clone()), - Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()), + None => DataBlockPath::plain(path.clone()), + Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()), _ => { warn!("not rebalancing file: {}", path.to_string_lossy()); return Ok(WorkerState::Busy);