From e8d750175de3daff0876b63c9ae4dcbd047be793 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 16 Apr 2020 17:04:28 +0200 Subject: [PATCH] Implement ring comparison algorithm --- src/main.rs | 71 +++++++++++++++++++++++++++++++++++++++++++- src/server.rs | 9 ++++-- src/table_sync.rs | 75 +++++++++++++++++++++++++++++++++++++++++------ 3 files changed, 142 insertions(+), 13 deletions(-) diff --git a/src/main.rs b/src/main.rs index 8b124bf..ebf97a2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -61,6 +61,10 @@ pub enum Command { /// Configure Garage node #[structopt(name = "configure")] Configure(ConfigureOpt), + + /// Remove Garage node from cluster + #[structopt(name = "remove")] + Remove(RemoveOpt), } #[derive(StructOpt, Debug)] @@ -82,6 +86,16 @@ pub struct ConfigureOpt { n_tokens: u32, } +#[derive(StructOpt, Debug)] +pub struct RemoveOpt { + /// Node to configure (prefix of hexadecimal node id) + node_id: String, + + /// If this flag is not given, the node won't be removed + #[structopt(long = "yes")] + yes: bool, +} + #[tokio::main] async fn main() { let opt = Opt::from_args(); @@ -102,11 +116,20 @@ async fn main() { let rpc_cli = RpcClient::new(&tls_config).expect("Could not create RPC client"); let resp = match opt.cmd { - Command::Server(server_opt) => server::run_server(server_opt.config_file).await, + Command::Server(server_opt) => { + // Abort on panic (same behavior as in Go) + std::panic::set_hook(Box::new(|panic_info| { + eprintln!("{}", panic_info.to_string()); + std::process::abort(); + })); + + server::run_server(server_opt.config_file).await + } Command::Status => cmd_status(rpc_cli, opt.rpc_host).await, Command::Configure(configure_opt) => { cmd_configure(rpc_cli, opt.rpc_host, configure_opt).await } + Command::Remove(remove_opt) => cmd_remove(rpc_cli, opt.rpc_host, remove_opt).await, }; if let Err(e) = resp { @@ -224,3 +247,49 @@ async fn cmd_configure( .await?; Ok(()) } + +async fn cmd_remove( + rpc_cli: RpcClient, + rpc_host: SocketAddr, + args: RemoveOpt, +) -> Result<(), Error> { + let mut config = match rpc_cli + .call(&rpc_host, &Message::PullConfig, DEFAULT_TIMEOUT) + .await? + { + Message::AdvertiseConfig(cfg) => cfg, + resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), + }; + + let mut candidates = vec![]; + for (key, _) in config.members.iter() { + if hex::encode(key).starts_with(&args.node_id) { + candidates.push(key.clone()); + } + } + if candidates.len() != 1 { + return Err(Error::Message(format!( + "{} matching nodes", + candidates.len() + ))); + } + + if !args.yes { + return Err(Error::Message(format!( + "Add the flag --yes to really remove {:?} from the cluster", + candidates[0] + ))); + } + + config.members.remove(&candidates[0]); + config.version += 1; + + rpc_cli + .call( + &rpc_host, + &Message::AdvertiseConfig(config), + DEFAULT_TIMEOUT, + ) + .await?; + Ok(()) +} diff --git a/src/server.rs b/src/server.rs index af58ded..78b992f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -93,7 +93,8 @@ impl Garage { &db, "block_ref".to_string(), data_rep_param.clone(), - ).await; + ) + .await; let version_table = Table::new( VersionTable { background: background.clone(), @@ -103,7 +104,8 @@ impl Garage { &db, "version".to_string(), meta_rep_param.clone(), - ).await; + ) + .await; let object_table = Table::new( ObjectTable { background: background.clone(), @@ -113,7 +115,8 @@ impl Garage { &db, "object".to_string(), meta_rep_param.clone(), - ).await; + ) + .await; let mut garage = Self { db, diff --git a/src/table_sync.rs b/src/table_sync.rs index 5097c1b..039dab6 100644 --- a/src/table_sync.rs +++ b/src/table_sync.rs @@ -1,7 +1,7 @@ use rand::Rng; +use std::collections::BTreeSet; use std::sync::Arc; use std::time::Duration; -use std::collections::BTreeSet; use futures::{pin_mut, select}; use futures_util::future::*; @@ -10,7 +10,7 @@ use tokio::sync::Mutex; use crate::data::*; use crate::error::Error; -use crate::membership::{Ring, System}; +use crate::membership::Ring; use crate::table::*; const SCAN_INTERVAL: Duration = Duration::from_secs(3600); @@ -29,6 +29,7 @@ pub struct SyncTodo { pub struct Partition { pub begin: Hash, pub end: Hash, + pub retain: bool, } impl TableSyncer { @@ -124,7 +125,8 @@ impl TableSyncer { } async fn sync_partition(self: &Arc, partition: &Partition) -> Result<(), Error> { - unimplemented!() + eprintln!("NOT IMPLEMENTED: SYNC PARTITION {:?}", partition); + Ok(()) } } @@ -135,14 +137,17 @@ impl SyncTodo { self.todo.clear(); let ring: Arc = table.system.ring.borrow().clone(); + for i in 0..ring.ring.len() { let nodes = ring.walk_ring_from_pos(i, table.param.replication_factor); let begin = ring.ring[i].location.clone(); + if i == 0 { + self.add_full_scan_aux(table, [0u8; 32].into(), begin.clone(), &nodes[..], &my_id); + } + if i == ring.ring.len() - 1 { - let end = ring.ring[0].location.clone(); self.add_full_scan_aux(table, begin, [0xffu8; 32].into(), &nodes[..], &my_id); - self.add_full_scan_aux(table, [0u8; 32].into(), end, &nodes[..], &my_id); } else { let end = ring.ring[i + 1].location.clone(); self.add_full_scan_aux(table, begin, end, &nodes[..], &my_id); @@ -158,23 +163,75 @@ impl SyncTodo { nodes: &[UUID], my_id: &UUID, ) { - if !nodes.contains(my_id) { + let retain = nodes.contains(my_id); + if !retain { // Check if we have some data to send, otherwise skip if table .store .range(begin.clone()..end.clone()) .next() .is_none() - {} + { + return; + } } - self.todo.push(Partition { begin, end }); + self.todo.push(Partition { begin, end, retain }); } fn add_ring_difference(&mut self, table: &Table, old: &Ring, new: &Ring) { + let my_id = table.system.id.clone(); + let old_ring = ring_points(old); let new_ring = ring_points(new); - unimplemented!() + let both_ring = old_ring.union(&new_ring).cloned().collect::>(); + + let prev_todo_begin = self + .todo + .iter() + .map(|x| x.begin.clone()) + .collect::>(); + let prev_todo_end = self + .todo + .iter() + .map(|x| x.end.clone()) + .collect::>(); + let prev_todo = prev_todo_begin + .union(&prev_todo_end) + .cloned() + .collect::>(); + + let all_points = both_ring.union(&prev_todo).cloned().collect::>(); + + self.todo.sort_by(|x, y| x.begin.cmp(&y.begin)); + let mut new_todo = vec![]; + for i in 0..all_points.len() - 1 { + let begin = all_points[i].clone(); + let end = all_points[i + 1].clone(); + let was_ours = old + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let is_ours = new + .walk_ring(&begin, table.param.replication_factor) + .contains(&my_id); + let was_todo = match self.todo.binary_search_by(|x| x.begin.cmp(&begin)) { + Ok(_) => true, + Err(j) => { + (j > 0 && self.todo[j - 1].begin < end && begin < self.todo[j - 1].end) + || (j < self.todo.len() + && self.todo[j].begin < end && begin < self.todo[j].end) + } + }; + if was_todo || (is_ours && !was_ours) || (was_ours && !is_ours) { + new_todo.push(Partition { + begin, + end, + retain: is_ours, + }); + } + } + + self.todo = new_todo; } fn pop_task(&mut self) -> Option {