From 13e2eda0c2beb34b087f45d7461dba0a483c78af Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 24 Feb 2021 11:58:03 +0100 Subject: [PATCH] Arrange block manager --- src/model/block.rs | 150 ++++++++++++++++++++------------------------- 1 file changed, 66 insertions(+), 84 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index 1627ef857..2a9916239 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use garage_table::table_sharded::TableShardedReplication; -use garage_table::{DeletedFilter, TableReplication}; +use garage_table::TableReplication; use crate::block_ref_table::*; @@ -303,56 +303,49 @@ impl BlockManager { } if exists && !needed { - let garage = self.garage.load_full().unwrap(); - let active_refs = garage - .block_ref_table - .get_range(&hash, None, Some(DeletedFilter::NotDeleted), 1) - .await?; - let needed_by_others = !active_refs.is_empty(); - if needed_by_others { - let ring = self.system.ring.borrow().clone(); - let who = self.replication.replication_nodes(&hash, &ring); - let msg = Arc::new(Message::NeedBlockQuery(*hash)); - let who_needs_fut = who.iter().map(|to| { - self.rpc_client - .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) - }); - let who_needs = join_all(who_needs_fut).await; + trace!("Offloading block {:?}", hash); - let mut need_nodes = vec![]; - for (node, needed) in who.into_iter().zip(who_needs.iter()) { - match needed { - Ok(Message::NeedBlockReply(needed)) => { - if *needed { - need_nodes.push(node); - } - } - Err(e) => { - return Err(Error::Message(format!( - "Should delete block, but unable to confirm that all other nodes that need it have it: {}", - e - ))); - } - Ok(_) => { - return Err(Error::Message(format!( - "Unexpected response to NeedBlockQuery RPC" - ))); + let ring = self.system.ring.borrow().clone(); + + let mut who = self.replication.replication_nodes(&hash, &ring); + who.retain(|id| *id != self.system.id); + + let msg = Arc::new(Message::NeedBlockQuery(*hash)); + let who_needs_fut = who.iter().map(|to| { + self.rpc_client + .call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT) + }); + let who_needs_resps = join_all(who_needs_fut).await; + + let mut need_nodes = vec![]; + for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { + match needed? { + Message::NeedBlockReply(needed) => { + if needed { + need_nodes.push(*node); } } - } - - if need_nodes.len() > 0 { - let put_block_message = self.read_block(hash).await?; - self.rpc_client - .try_call_many( - &need_nodes[..], - put_block_message, - RequestStrategy::with_quorum(need_nodes.len()) - .with_timeout(BLOCK_RW_TIMEOUT), - ) - .await?; + _ => { + return Err(Error::Message(format!( + "Unexpected response to NeedBlockQuery RPC" + ))); + } } } + + if need_nodes.len() > 0 { + trace!("Block {:?} neede by {} nodes, sending", hash, need_nodes.len()); + + let put_block_message = Arc::new(self.read_block(hash).await?); + let put_resps = join_all(need_nodes.iter().map(|to| { + self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) + })).await; + for resp in put_resps { + resp?; + } + } + trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len()); + fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; } @@ -427,53 +420,42 @@ impl BlockManager { } // 2. Repair blocks actually on disk - let mut ls_data_dir = fs::read_dir(&self.data_dir).await?; - while let Some(data_dir_ent) = ls_data_dir.next().await { - let data_dir_ent = data_dir_ent?; - let dir_name = data_dir_ent.file_name(); - let dir_name = match dir_name.into_string() { - Ok(x) => x, - Err(_) => continue, - }; - if dir_name.len() != 2 || hex::decode(&dir_name).is_err() { - continue; - } + self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?; - let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await { - Err(e) => { - warn!( - "Warning: could not list dir {:?}: {}", - data_dir_ent.path().to_str(), - e - ); - continue; - } - Ok(x) => x, - }; - while let Some(file) = ls_data_dir_2.next().await { - let file = file?; - let file_name = file.file_name(); - let file_name = match file_name.into_string() { + Ok(()) + } + + fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver) -> BoxFuture<'a, Result<(), Error>> { + async move { + let mut ls_data_dir = fs::read_dir(path).await?; + while let Some(data_dir_ent) = ls_data_dir.next().await { + let data_dir_ent = data_dir_ent?; + let name = data_dir_ent.file_name(); + let name = match name.into_string() { Ok(x) => x, Err(_) => continue, }; - if file_name.len() != 64 { - continue; + let ent_type = data_dir_ent.file_type().await?; + println!("name: {}, path: {:?}", name, data_dir_ent.path().to_str()); + + if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { + self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?; + } else if name.len() == 64 { + let hash_bytes = match hex::decode(&name) { + Ok(h) => h, + Err(_) => continue, + }; + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hash_bytes[..]); + self.put_to_resync(&hash.into(), 0)?; } - let hash_bytes = match hex::decode(&file_name) { - Ok(h) => h, - Err(_) => continue, - }; - let mut hash = [0u8; 32]; - hash.copy_from_slice(&hash_bytes[..]); - self.put_to_resync(&hash.into(), 0)?; if *must_exit.borrow() { - return Ok(()); + break; } } - } - Ok(()) + Ok(()) + }.boxed() } }