diff --git a/src/admin_rpc.rs b/src/admin_rpc.rs
index 29037c6c..8e278dbe 100644
--- a/src/admin_rpc.rs
+++ b/src/admin_rpc.rs
@@ -4,6 +4,7 @@ use serde::{Deserialize, Serialize};
 
 use crate::data::*;
 use crate::error::Error;
+use crate::rpc_client::*;
 use crate::rpc_server::*;
 use crate::server::Garage;
 use crate::table::*;
@@ -11,11 +12,13 @@ use crate::*;
 
 use crate::bucket_table::*;
 
+pub const ADMIN_RPC_TIMEOUT: Duration = Duration::from_secs(30);
 pub const ADMIN_RPC_PATH: &str = "_admin";
 
 #[derive(Debug, Serialize, Deserialize)]
 pub enum AdminRPC {
 	BucketOperation(BucketOperation),
+	LaunchRepair(bool),
 
 	// Replies
 	Ok(String),
@@ -27,11 +30,13 @@ impl RpcMessage for AdminRPC {}
 
 pub struct AdminRpcHandler {
 	garage: Arc<Garage>,
+	rpc_client: Arc<RpcClient<AdminRPC>>,
 }
 
 impl AdminRpcHandler {
 	pub fn new(garage: Arc<Garage>) -> Arc<Self> {
-		Arc::new(Self { garage })
+		let rpc_client = garage.system.clone().rpc_client::<AdminRPC>(ADMIN_RPC_PATH);
+		Arc::new(Self { garage, rpc_client })
 	}
 
 	pub fn register_handler(self: Arc<Self>, rpc_server: &mut RpcServer) {
@@ -40,6 +45,9 @@ impl AdminRpcHandler {
 			async move {
 				match msg {
 					AdminRPC::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
+					AdminRPC::LaunchRepair(repair_all) => {
+						self2.handle_launch_repair(repair_all).await
+					}
 					_ => Err(Error::Message(format!("Invalid RPC"))),
 				}
 			}
@@ -143,4 +151,35 @@ impl AdminRpcHandler {
 			}
 		}
 	}
+
+	async fn handle_launch_repair(&self, repair_all: bool) -> Result<AdminRPC, Error> {
+		if repair_all {
+			let mut failures = vec![];
+			let ring = self.garage.system.ring.borrow().clone();
+			for node in ring.config.members.keys() {
+				if self
+					.rpc_client
+					.call(node, AdminRPC::LaunchRepair(false), ADMIN_RPC_TIMEOUT)
+					.await
+					.is_err()
+				{
+					failures.push(node.clone());
+				}
+			}
+			if failures.is_empty() {
+				Ok(AdminRPC::Ok(format!("Repair launched on all nodes")))
+			} else {
+				Err(Error::Message(format!(
+					"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
+					failures
+				)))
+			}
+		} else {
+			self.garage.block_manager.launch_repair().await?;
+			Ok(AdminRPC::Ok(format!(
+				"Repair launched on {:?}",
+				self.garage.system.id
+			)))
+		}
+	}
 }
diff --git a/src/block.rs b/src/block.rs
index 489dc33e..4ad74d76 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -17,6 +17,7 @@ use crate::membership::System;
 use crate::rpc_client::*;
 use crate::rpc_server::*;
 
+use crate::block_ref_table::*;
 use crate::server::Garage;
 
 pub const INLINE_THRESHOLD: usize = 3072;
@@ -356,6 +357,89 @@ impl BlockManager {
 			.await?;
 		Ok(())
 	}
+
+	pub async fn launch_repair(self: &Arc<Self>) -> Result<(), Error> {
+		let self2 = self.clone();
+		self.system
+			.background
+			.spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
+			.await;
+		Ok(())
+	}
+
+	pub async fn repair_worker(
+		self: Arc<Self>,
+		must_exit: watch::Receiver<bool>,
+	) -> Result<(), Error> {
+		// 1. Repair blocks from RC table
+		let garage = self.garage.load_full().unwrap();
+		let mut last_hash = None;
+		let mut i = 0usize;
+		for entry in garage.block_ref_table.store.iter() {
+			let (_k, v_bytes) = entry?;
+			let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(v_bytes.as_ref())?;
+			if Some(&block_ref.block) == last_hash.as_ref() {
+				continue;
+			}
+			if !block_ref.deleted {
+				last_hash = Some(block_ref.block.clone());
+				self.put_to_resync(&block_ref.block, 0)?;
+			}
+			i += 1;
+			if i & 0xFF == 0 && *must_exit.borrow() {
+				return Ok(());
+			}
+		}
+
+		// 2. Repair blocks actually on disk
+		let mut ls_data_dir = fs::read_dir(&self.data_dir).await?;
+		while let Some(data_dir_ent) = ls_data_dir.next().await {
+			let data_dir_ent = data_dir_ent?;
+			let dir_name = data_dir_ent.file_name();
+			let dir_name = match dir_name.into_string() {
+				Ok(x) => x,
+				Err(_) => continue,
+			};
+			if dir_name.len() != 2 || hex::decode(&dir_name).is_err() {
+				continue;
+			}
+
+			let mut ls_data_dir_2 = match fs::read_dir(data_dir_ent.path()).await {
+				Err(e) => {
+					eprintln!(
+						"Warning: could not list dir {:?}: {}",
+						data_dir_ent.path().to_str(),
+						e
+					);
+					continue;
+				}
+				Ok(x) => x,
+			};
+			while let Some(file) = ls_data_dir_2.next().await {
+				let file = file?;
+				let file_name = file.file_name();
+				let file_name = match file_name.into_string() {
+					Ok(x) => x,
+					Err(_) => continue,
+				};
+				if file_name.len() != 64 {
+					continue;
+				}
+				let hash_bytes = match hex::decode(&file_name) {
+					Ok(h) => h,
+					Err(_) => continue,
+				};
+				let mut hash = [0u8; 32];
+				hash.copy_from_slice(&hash_bytes[..]);
+				self.put_to_resync(&hash.into(), 0)?;
+
+				if *must_exit.borrow() {
+					return Ok(());
+				}
+			}
+		}
+		Ok(())
+	}
 }
 
 fn u64_from_bytes(bytes: &[u8]) -> u64 {
diff --git a/src/main.rs b/src/main.rs
index 1d582c25..11890e57 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -39,8 +39,6 @@ use server::TlsConfig;
 
 use admin_rpc::*;
 
-const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
-
 #[derive(StructOpt, Debug)]
 #[structopt(name = "garage")]
 pub struct Opt {
@@ -76,6 +74,10 @@ pub enum Command {
 	/// Bucket operations
 	#[structopt(name = "bucket")]
 	Bucket(BucketOperation),
+
+	/// Start repair of node data
+	#[structopt(name = "repair")]
+	Repair(RepairOpt),
 }
 
@@ -179,6 +181,13 @@ pub struct PermBucketOpt {
 	pub bucket: String,
 }
 
+#[derive(Serialize, Deserialize, StructOpt, Debug)]
+pub struct RepairOpt {
+	/// Launch repair operation on all nodes
+	#[structopt(long = "all")]
+	pub all: bool,
+}
+
 #[tokio::main]
 async fn main() {
 	let opt = Opt::from_args();
@@ -222,6 +231,9 @@
 		Command::Bucket(bo) => {
 			cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::BucketOperation(bo)).await
 		}
+		Command::Repair(ro) => {
+			cmd_admin(admin_rpc_cli, opt.rpc_host, AdminRPC::LaunchRepair(ro.all)).await
+		}
 	};
 
 	if let Err(e) = resp {
@@ -231,14 +243,14 @@
 
 async fn cmd_status(rpc_cli: RpcAddrClient<Message>, rpc_host: SocketAddr) -> Result<(), Error> {
 	let status = match rpc_cli
-		.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
+		.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
 		.await?
 	{
 		Message::AdvertiseNodesUp(nodes) => nodes,
 		resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
 	};
 	let config = match rpc_cli
-		.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+		.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
 		.await?
 	{
 		Message::AdvertiseConfig(cfg) => cfg,
@@ -290,7 +302,7 @@ async fn cmd_configure(
 	args: ConfigureNodeOpt,
 ) -> Result<(), Error> {
 	let status = match rpc_cli
-		.call(&rpc_host, &Message::PullStatus, DEFAULT_TIMEOUT)
+		.call(&rpc_host, &Message::PullStatus, ADMIN_RPC_TIMEOUT)
 		.await?
 	{
 		Message::AdvertiseNodesUp(nodes) => nodes,
@@ -311,7 +323,7 @@ async fn cmd_configure(
 	}
 
 	let mut config = match rpc_cli
-		.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+		.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
 		.await?
 	{
 		Message::AdvertiseConfig(cfg) => cfg,
@@ -331,7 +343,7 @@ async fn cmd_configure(
 		.call(
 			&rpc_host,
 			&Message::AdvertiseConfig(config),
-			DEFAULT_TIMEOUT,
+			ADMIN_RPC_TIMEOUT,
 		)
 		.await?;
 	Ok(())
@@ -343,7 +355,7 @@ async fn cmd_remove(
 	args: RemoveNodeOpt,
 ) -> Result<(), Error> {
 	let mut config = match rpc_cli
-		.call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT)
+		.call(&rpc_host, &Message::PullConfig, ADMIN_RPC_TIMEOUT)
 		.await?
 	{
 		Message::AdvertiseConfig(cfg) => cfg,
@@ -377,7 +389,7 @@ async fn cmd_remove(
 		.call(
 			&rpc_host,
 			&Message::AdvertiseConfig(config),
-			DEFAULT_TIMEOUT,
+			ADMIN_RPC_TIMEOUT,
 		)
 		.await?;
 	Ok(())
@@ -388,7 +400,7 @@ async fn cmd_admin(
 	rpc_host: SocketAddr,
 	args: AdminRPC,
 ) -> Result<(), Error> {
-	match rpc_cli.call(&rpc_host, args, DEFAULT_TIMEOUT).await? {
+	match rpc_cli.call(&rpc_host, args, ADMIN_RPC_TIMEOUT).await? {
 		AdminRPC::Ok(msg) => {
 			println!("{}", msg);
 		}
diff --git a/src/table_sync.rs b/src/table_sync.rs
index e394ba0d..2fb5de77 100644
--- a/src/table_sync.rs
+++ b/src/table_sync.rs
@@ -220,11 +220,17 @@ where
 			})
 			.collect::<FuturesUnordered<_>>();
 
+		let mut n_errors = 0;
 		while let Some(r) = sync_futures.next().await {
 			if let Err(e) = r {
+				n_errors += 1;
 				eprintln!("({}) Sync error: {}", self.table.name, e);
 			}
 		}
+		if n_errors > self.table.replication.max_write_errors() {
+			return Err(Error::Message(format!("Sync failed with too many nodes.")));
+		}
+
 		if !partition.retain {
 			self.table
 				.delete_range(&partition.begin, &partition.end)
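Usage note (not part of the patch): given the StructOpt definitions added above (application name "garage", subcommand name "repair", flag long = "all"), the new operation would be invoked roughly as follows; the option used to select the target RPC host is whatever the existing Opt struct already defines and is assumed to keep its current default here.

	garage repair          # send LaunchRepair(false): only the node answering on the configured RPC address repairs itself
	garage repair --all    # send LaunchRepair(true): that node fans the RPC out to every member of the ring config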