diff --git a/src/block/block.rs b/src/block/block.rs index 935aa900..6d79fb6c 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,5 @@ +use std::path::PathBuf; + use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -19,6 +21,13 @@ pub enum DataBlock { Compressed(Bytes), } +pub enum DataBlockPath { + /// Uncompressed data fail + Plain(PathBuf), + /// Compressed data fail + Compressed(PathBuf), +} + impl DataBlock { /// Query whether this block is compressed pub fn is_compressed(&self) -> bool { diff --git a/src/block/layout.rs b/src/block/layout.rs index 4a49b287..b119281b 100644 --- a/src/block/layout.rs +++ b/src/block/layout.rs @@ -190,32 +190,35 @@ impl DataLayout { }) } - pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf { - let ipart = self.partition_from(hash); - let idir = self.part_prim[ipart] as usize; - self.data_dir_from(hash, &self.data_dirs[idir].path) + pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf { + let ipart = self.partition_from(hash); + let idir = self.part_prim[ipart] as usize; + self.block_dir_from(hash, &self.data_dirs[idir].path) } - pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator + 'a { - let ipart = self.partition_from(hash); - self.part_sec[ipart] - .iter() - .map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path)) + pub(crate) fn secondary_block_dirs<'a>( + &'a self, + hash: &'a Hash, + ) -> impl Iterator + 'a { + let ipart = self.partition_from(hash); + self.part_sec[ipart] + .iter() + .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path)) } - fn partition_from(&self, hash: &Hash) -> usize { - u16::from_be_bytes([ - hash.as_slice()[DPART_BYTES.0], - hash.as_slice()[DPART_BYTES.1] - ]) as usize % DRIVE_NPART - } + fn partition_from(&self, hash: &Hash) -> usize { + u16::from_be_bytes([ + hash.as_slice()[DPART_BYTES.0], + hash.as_slice()[DPART_BYTES.1], + ]) as usize % DRIVE_NPART + } - fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { + fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { let mut path = dir.clone(); path.push(hex::encode(&hash.as_slice()[0..1])); path.push(hex::encode(&hash.as_slice()[1..2])); path - } + } } impl InitialFormat for DataLayout { diff --git a/src/block/manager.rs b/src/block/manager.rs index 45729a00..73fefa0c 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -543,21 +543,25 @@ impl BlockManager { } async fn read_block_internal(&self, hash: &Hash) -> Result { - let mut path = self.block_path(hash); - let compressed = match self.is_block_compressed(hash).await { - Ok(c) => c, - Err(e) => { + let block_path = match self.find_block(hash).await { + Some(p) => p, + None => { // Not found but maybe we should have had it ?? self.resync .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; - return Err(Into::into(e)); + return Err(Error::Message(format!( + "block {:?} not found on node", + hash + ))); } }; - if compressed { - path.set_extension("zst"); - } - let mut f = fs::File::open(&path).await?; + let (path, compressed) = match &block_path { + DataBlockPath::Plain(p) => (p, false), + DataBlockPath::Compressed(p) => (p, true), + }; + + let mut f = fs::File::open(&path).await?; let mut data = vec![]; f.read_to_end(&mut data).await?; drop(f); @@ -571,11 +575,16 @@ impl BlockManager { if data.verify(*hash).is_err() { self.metrics.corruption_counter.add(1); + warn!( + "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", + hash + ); self.lock_mutate(hash) .await - .move_block_to_corrupted(hash, self) + .move_block_to_corrupted(&block_path) .await?; self.resync.put_to_resync(hash, Duration::from_millis(0))?; + return Err(Error::CorruptData(*hash)); } @@ -604,56 +613,51 @@ impl BlockManager { .await } - /// Utility: gives the path of the directory in which a block should be found - fn block_dir(&self, hash: &Hash) -> PathBuf { - self.data_layout.primary_data_dir(hash) - } + /// Utility: check if block is stored compressed. + async fn find_block(&self, hash: &Hash) -> Option { + let dirs = Some(self.data_layout.primary_block_dir(hash)) + .into_iter() + .chain(self.data_layout.secondary_block_dirs(hash)); + let filename = hex::encode(hash.as_ref()); - /// Utility: give the full path where a block should be found, minus extension if block is - /// compressed - fn block_path(&self, hash: &Hash) -> PathBuf { - let mut path = self.block_dir(hash); - path.push(hex::encode(hash.as_ref())); - path - } + for dir in dirs { + let mut path = dir; + path.push(&filename); - /// Utility: check if block is stored compressed. Error if block is not stored - async fn is_block_compressed(&self, hash: &Hash) -> Result { - let mut path = self.block_path(hash); - - // If compression is disabled on node - check for the raw block - // first and then a compressed one (as compression may have been - // previously enabled). - match self.compression_level { - None => { + if self.compression_level.is_none() { + // If compression is disabled on node - check for the raw block + // first and then a compressed one (as compression may have been + // previously enabled). if fs::metadata(&path).await.is_ok() { - return Ok(false); + return Some(DataBlockPath::Plain(path)); } - path.set_extension("zst"); - - fs::metadata(&path).await.map(|_| true).map_err(Into::into) - } - _ => { - path.set_extension("zst"); - if fs::metadata(&path).await.is_ok() { - return Ok(true); + return Some(DataBlockPath::Compressed(path)); + } + } else { + path.set_extension("zst"); + if fs::metadata(&path).await.is_ok() { + return Some(DataBlockPath::Compressed(path)); } - path.set_extension(""); - - fs::metadata(&path).await.map(|_| false).map_err(Into::into) + if fs::metadata(&path).await.is_ok() { + return Some(DataBlockPath::Plain(path)); + } } } + + None } async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { let tracer = opentelemetry::global::tracer("garage"); - self.mutation_lock[hash.as_slice()[0] as usize] + let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize + % self.mutation_lock.len(); + self.mutation_lock[ilock] .lock() .with_context(Context::current_with_span( - tracer.start("Acquire mutation_lock"), + tracer.start(format!("Acquire mutation_lock #{}", ilock)), )) .await } @@ -688,7 +692,7 @@ impl BlockManagerLocked { hash: &Hash, mgr: &BlockManager, ) -> Result { - let exists = mgr.is_block_compressed(hash).await.is_ok(); + let exists = mgr.find_block(hash).await.is_some(); let needed = mgr.rc.get_block_rc(hash)?; Ok(BlockStatus { exists, needed }) @@ -703,21 +707,17 @@ impl BlockManagerLocked { let compressed = data.is_compressed(); let data = data.inner_buffer(); - let mut path = mgr.block_dir(hash); + let mut path = mgr.data_layout.primary_block_dir(hash); let directory = path.clone(); path.push(hex::encode(hash)); fs::create_dir_all(&directory).await?; - let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { - (Ok(true), _) => return Ok(()), - (Ok(false), false) => return Ok(()), - (Ok(false), true) => { - let path_to_delete = path.clone(); - path.set_extension("zst"); - Some(path_to_delete) - } - (Err(_), compressed) => { + let to_delete = match (mgr.find_block(hash).await, compressed) { + (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), + (Some(DataBlockPath::Plain(_)), false) => return Ok(()), + (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), + (None, compressed) => { if compressed { path.set_extension("zst"); } @@ -766,19 +766,20 @@ impl BlockManagerLocked { Ok(()) } - async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { - warn!( - "Block {:?} is corrupted. Renaming to .corrupted and resyncing.", - hash - ); - let mut path = mgr.block_path(hash); - let mut path2 = path.clone(); - if mgr.is_block_compressed(hash).await? { - path.set_extension("zst"); - path2.set_extension("zst.corrupted"); - } else { - path2.set_extension("corrupted"); - } + async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> { + let (path, path2) = match block_path { + DataBlockPath::Plain(p) => { + let mut p2 = p.clone(); + p2.set_extension("corrupted"); + (p, p2) + } + DataBlockPath::Compressed(p) => { + let mut p2 = p.clone(); + p2.set_extension("zst.corrupted"); + (p, p2) + } + }; + fs::rename(path, path2).await?; Ok(()) } @@ -787,12 +788,14 @@ impl BlockManagerLocked { let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; if exists && needed.is_deletable() { - let mut path = mgr.block_path(hash); - if mgr.is_block_compressed(hash).await? { - path.set_extension("zst"); + let path_opt = match mgr.find_block(hash).await { + Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p), + None => None, + }; + if let Some(path) = path_opt { + fs::remove_file(path).await?; + mgr.metrics.delete_counter.add(1); } - fs::remove_file(path).await?; - mgr.metrics.delete_counter.add(1); } Ok(()) }