diff --git a/src/block/manager.rs b/src/block/manager.rs index 798cedf9..5bad34d4 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -543,8 +543,8 @@ impl BlockManager { } async fn read_block_internal(&self, hash: &Hash) -> Result { - let block_path = match self.find_block(hash).await { - Some(p) => p, + match self.find_block(hash).await { + Some(p) => self.read_block_from(hash, &p).await, None => { // Not found but maybe we should have had it ?? self.resync @@ -554,9 +554,15 @@ impl BlockManager { hash ))); } - }; + } + } - let (path, compressed) = match &block_path { + pub(crate) async fn read_block_from( + &self, + hash: &Hash, + block_path: &DataBlockPath, + ) -> Result { + let (path, compressed) = match block_path { DataBlockPath::Plain(p) => (p, false), DataBlockPath::Compressed(p) => (p, true), }; @@ -581,7 +587,7 @@ impl BlockManager { ); self.lock_mutate(hash) .await - .move_block_to_corrupted(&block_path) + .move_block_to_corrupted(block_path) .await?; self.resync.put_to_resync(hash, Duration::from_millis(0))?; @@ -591,18 +597,11 @@ impl BlockManager { Ok(data) } - /// Check if this node has a block and whether it needs it - pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result { - self.lock_mutate(hash) - .await - .check_block_status(hash, self) - .await - } - /// Check if this node should have a block, but don't actually have it async fn need_block(&self, hash: &Hash) -> Result { - let BlockStatus { exists, needed } = self.check_block_status(hash).await?; - Ok(needed.is_nonzero() && !exists) + let rc = self.rc.get_block_rc(hash)?; + let exists = self.find_block(hash).await.is_some(); + Ok(rc.is_nonzero() && !exists) } /// Delete block if it is not needed anymore @@ -613,8 +612,8 @@ impl BlockManager { .await } - /// Utility: check if block is stored compressed. - async fn find_block(&self, hash: &Hash) -> Option { + /// Find the path where a block is currently stored + pub(crate) async fn find_block(&self, hash: &Hash) -> Option { let dirs = Some(self.data_layout.primary_block_dir(hash)) .into_iter() .chain(self.data_layout.secondary_block_dirs(hash)); @@ -687,17 +686,6 @@ pub(crate) struct BlockStatus { } impl BlockManagerLocked { - async fn check_block_status( - &self, - hash: &Hash, - mgr: &BlockManager, - ) -> Result { - let exists = mgr.find_block(hash).await.is_some(); - let needed = mgr.rc.get_block_rc(hash)?; - - Ok(BlockStatus { exists, needed }) - } - async fn write_block( &self, hash: &Hash, @@ -710,32 +698,32 @@ impl BlockManagerLocked { let mut tgt_path = mgr.data_layout.primary_block_dir(hash); let directory = tgt_path.clone(); tgt_path.push(hex::encode(hash)); - if compressed { - tgt_path.set_extension("zst"); - } + if compressed { + tgt_path.set_extension("zst"); + } fs::create_dir_all(&directory).await?; let to_delete = match (mgr.find_block(hash).await, compressed) { - // If the block is stored in the wrong directory, - // write it again at the correct path and delete the old path + // If the block is stored in the wrong directory, + // write it again at the correct path and delete the old path (Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p), (Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p), - // If the block is already stored not compressed but we have a compressed - // copy, write the compressed copy and delete the uncompressed one + // If the block is already stored not compressed but we have a compressed + // copy, write the compressed copy and delete the uncompressed one (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path), - // If the block is already stored compressed, - // keep the stored copy, we have nothing to do + // If the block is already stored compressed, + // keep the stored copy, we have nothing to do (Some(DataBlockPath::Compressed(_)), _) => return Ok(()), - // If the block is already stored not compressed, - // and we don't have a compressed copy either, - // keep the stored copy, we have nothing to do + // If the block is already stored not compressed, + // and we don't have a compressed copy either, + // keep the stored copy, we have nothing to do (Some(DataBlockPath::Plain(_)), false) => return Ok(()), - // If the block isn't stored already, just store what is given to us + // If the block isn't stored already, just store what is given to us (None, _) => None, }; @@ -799,14 +787,12 @@ impl BlockManagerLocked { } async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { - let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; - - if exists && needed.is_deletable() { - let path_opt = match mgr.find_block(hash).await { - Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p), - None => None, - }; - if let Some(path) = path_opt { + let rc = mgr.rc.get_block_rc(hash)?; + if rc.is_deletable() { + while let Some(path) = mgr.find_block(hash).await { + let path = match path { + DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p, + }; fs::remove_file(path).await?; mgr.metrics.delete_counter.add(1); } diff --git a/src/block/resync.rs b/src/block/resync.rs index ea280ad4..bb43ad7e 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -359,20 +359,23 @@ impl BlockResyncManager { } async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> { - let BlockStatus { exists, needed } = manager.check_block_status(hash).await?; + let existing_path = manager.find_block(hash).await; + let exists = existing_path.is_some(); + let rc = manager.rc.get_block_rc(hash)?; - if exists != needed.is_needed() || exists != needed.is_nonzero() { + if exists != rc.is_needed() || exists != rc.is_nonzero() { debug!( "Resync block {:?}: exists {}, nonzero rc {}, deletable {}", hash, exists, - needed.is_nonzero(), - needed.is_deletable(), + rc.is_nonzero(), + rc.is_deletable(), ); } - if exists && needed.is_deletable() { + if exists && rc.is_deletable() { info!("Resync block {:?}: offloading and deleting", hash); + let existing_path = existing_path.unwrap(); let mut who = manager.replication.write_nodes(hash); if who.len() < manager.replication.write_quorum() { @@ -419,7 +422,7 @@ impl BlockResyncManager { .add(1, &[KeyValue::new("to", format!("{:?}", node))]); } - let block = manager.read_block(hash).await?; + let block = manager.read_block_from(hash, &existing_path).await?; let (header, bytes) = block.into_parts(); let put_block_message = Req::new(BlockRpc::PutBlock { hash: *hash, @@ -451,7 +454,7 @@ impl BlockResyncManager { manager.rc.clear_deleted_block_rc(hash)?; } - if needed.is_nonzero() && !exists { + if rc.is_nonzero() && !exists { info!( "Resync block {:?}: fetching absent but needed block (refcount > 0)", hash