Arrange block manager
This commit is contained in:
parent
09fd6ea7f0
commit
13e2eda0c2
1 changed files with 66 additions and 84 deletions
|
@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
|
||||||
use garage_rpc::rpc_server::*;
|
use garage_rpc::rpc_server::*;
|
||||||
|
|
||||||
use garage_table::table_sharded::TableShardedReplication;
|
use garage_table::table_sharded::TableShardedReplication;
|
||||||
use garage_table::{DeletedFilter, TableReplication};
|
use garage_table::TableReplication;
|
||||||
|
|
||||||
use crate::block_ref_table::*;
|
use crate::block_ref_table::*;
|
||||||
|
|
||||||
|
@ -303,37 +303,29 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists && !needed {
|
if exists && !needed {
|
||||||
let garage = self.garage.load_full().unwrap();
|
trace!("Offloading block {:?}", hash);
|
||||||
let active_refs = garage
|
|
||||||
.block_ref_table
|
|
||||||
.get_range(&hash, None, Some(DeletedFilter::NotDeleted), 1)
|
|
||||||
.await?;
|
|
||||||
let needed_by_others = !active_refs.is_empty();
|
|
||||||
if needed_by_others {
|
|
||||||
let ring = self.system.ring.borrow().clone();
|
let ring = self.system.ring.borrow().clone();
|
||||||
let who = self.replication.replication_nodes(&hash, &ring);
|
|
||||||
|
let mut who = self.replication.replication_nodes(&hash, &ring);
|
||||||
|
who.retain(|id| *id != self.system.id);
|
||||||
|
|
||||||
let msg = Arc::new(Message::NeedBlockQuery(*hash));
|
let msg = Arc::new(Message::NeedBlockQuery(*hash));
|
||||||
let who_needs_fut = who.iter().map(|to| {
|
let who_needs_fut = who.iter().map(|to| {
|
||||||
self.rpc_client
|
self.rpc_client
|
||||||
.call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
|
.call_arc(*to, msg.clone(), NEED_BLOCK_QUERY_TIMEOUT)
|
||||||
});
|
});
|
||||||
let who_needs = join_all(who_needs_fut).await;
|
let who_needs_resps = join_all(who_needs_fut).await;
|
||||||
|
|
||||||
let mut need_nodes = vec![];
|
let mut need_nodes = vec![];
|
||||||
for (node, needed) in who.into_iter().zip(who_needs.iter()) {
|
for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) {
|
||||||
match needed {
|
match needed? {
|
||||||
Ok(Message::NeedBlockReply(needed)) => {
|
Message::NeedBlockReply(needed) => {
|
||||||
if *needed {
|
if needed {
|
||||||
need_nodes.push(node);
|
need_nodes.push(*node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Err(e) => {
|
_ => {
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"Should delete block, but unable to confirm that all other nodes that need it have it: {}",
|
|
||||||
e
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
Ok(_) => {
|
|
||||||
return Err(Error::Message(format!(
|
return Err(Error::Message(format!(
|
||||||
"Unexpected response to NeedBlockQuery RPC"
|
"Unexpected response to NeedBlockQuery RPC"
|
||||||
)));
|
)));
|
||||||
|
@ -342,17 +334,18 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if need_nodes.len() > 0 {
|
if need_nodes.len() > 0 {
|
||||||
let put_block_message = self.read_block(hash).await?;
|
trace!("Block {:?} neede by {} nodes, sending", hash, need_nodes.len());
|
||||||
self.rpc_client
|
|
||||||
.try_call_many(
|
let put_block_message = Arc::new(self.read_block(hash).await?);
|
||||||
&need_nodes[..],
|
let put_resps = join_all(need_nodes.iter().map(|to| {
|
||||||
put_block_message,
|
self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
|
||||||
RequestStrategy::with_quorum(need_nodes.len())
|
})).await;
|
||||||
.with_timeout(BLOCK_RW_TIMEOUT),
|
for resp in put_resps {
|
||||||
)
|
resp?;
|
||||||
.await?;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len());
|
||||||
|
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
self.resync_queue.remove(&hash)?;
|
self.resync_queue.remove(&hash)?;
|
||||||
}
|
}
|
||||||
|
@ -427,53 +420,42 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Repair blocks actually on disk
|
// 2. Repair blocks actually on disk
|
||||||
let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
|
self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?;
|
||||||
while let Some(data_dir_ent) = ls_data_dir.next().await {
|
|
||||||
let data_dir_ent = data_dir_ent?;
|
Ok(())
|
||||||
let dir_name = data_dir_ent.file_name();
|
|
||||||
let dir_name = match dir_name.into_string() {
|
|
||||||
Ok(x) => x,
|
|
||||||
Err(_) => continue,
|
|
||||||
};
|
|
||||||
if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
|
fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> {
|
||||||
Err(e) => {
|
async move {
|
||||||
warn!(
|
let mut ls_data_dir = fs::read_dir(path).await?;
|
||||||
"Warning: could not list dir {:?}: {}",
|
while let Some(data_dir_ent) = ls_data_dir.next().await {
|
||||||
data_dir_ent.path().to_str(),
|
let data_dir_ent = data_dir_ent?;
|
||||||
e
|
let name = data_dir_ent.file_name();
|
||||||
);
|
let name = match name.into_string() {
|
||||||
continue;
|
|
||||||
}
|
|
||||||
Ok(x) => x,
|
|
||||||
};
|
|
||||||
while let Some(file) = ls_data_dir_2.next().await {
|
|
||||||
let file = file?;
|
|
||||||
let file_name = file.file_name();
|
|
||||||
let file_name = match file_name.into_string() {
|
|
||||||
Ok(x) => x,
|
Ok(x) => x,
|
||||||
Err(_) => continue,
|
Err(_) => continue,
|
||||||
};
|
};
|
||||||
if file_name.len() != 64 {
|
let ent_type = data_dir_ent.file_type().await?;
|
||||||
continue;
|
println!("name: {}, path: {:?}", name, data_dir_ent.path().to_str());
|
||||||
}
|
|
||||||
let hash_bytes = match hex::decode(&file_name) {
|
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
||||||
|
self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?;
|
||||||
|
} else if name.len() == 64 {
|
||||||
|
let hash_bytes = match hex::decode(&name) {
|
||||||
Ok(h) => h,
|
Ok(h) => h,
|
||||||
Err(_) => continue,
|
Err(_) => continue,
|
||||||
};
|
};
|
||||||
let mut hash = [0u8; 32];
|
let mut hash = [0u8; 32];
|
||||||
hash.copy_from_slice(&hash_bytes[..]);
|
hash.copy_from_slice(&hash_bytes[..]);
|
||||||
self.put_to_resync(&hash.into(), 0)?;
|
self.put_to_resync(&hash.into(), 0)?;
|
||||||
|
}
|
||||||
|
|
||||||
if *must_exit.borrow() {
|
if *must_exit.borrow() {
|
||||||
return Ok(());
|
break;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
|
}.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue