diff --git a/src/data.rs b/src/data.rs index 68f81240..c649b289 100644 --- a/src/data.rs +++ b/src/data.rs @@ -15,6 +15,7 @@ pub struct NetworkConfig { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfigEntry { + pub datacenter: String, pub n_tokens: u32, } diff --git a/src/main.rs b/src/main.rs index 922e873e..2cb4b720 100644 --- a/src/main.rs +++ b/src/main.rs @@ -55,6 +55,9 @@ pub struct ConfigureOpt { /// Node to configure (prefix of hexadecimal node id) node_id: String, + /// Location (datacenter) of the node + datacenter: String, + /// Number of tokens n_tokens: u32, } @@ -100,7 +103,7 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro println!("Healthy nodes:"); for adv in status.iter() { if let Some(cfg) = config.members.get(&adv.id) { - println!("{}\t{}\t{}\t{}", hex::encode(adv.id), adv.addr, adv.datacenter, cfg.n_tokens); + println!("{}\t{}\t{}\t{}", hex::encode(adv.id), cfg.datacenter, cfg.n_tokens, adv.addr); } } @@ -109,7 +112,7 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro println!("\nFailed nodes:"); for (id, cfg) in config.members.iter() { if !status.iter().any(|x| x.id == *id) { - println!("{}\t{}", hex::encode(id), cfg.n_tokens); + println!("{}\t{}\t{}", hex::encode(id), cfg.datacenter, cfg.n_tokens); } } } @@ -118,7 +121,7 @@ async fn cmd_status(rpc_cli: RpcClient, rpc_host: SocketAddr) -> Result<(), Erro println!("\nUnconfigured nodes:"); for adv in status.iter() { if !config.members.contains_key(&adv.id) { - println!("{}\t{}\t{}", hex::encode(adv.id), adv.addr, adv.datacenter); + println!("{}\t{}", hex::encode(adv.id), adv.addr); } } } @@ -153,6 +156,7 @@ async fn cmd_configure(rpc_cli: RpcClient, rpc_host: SocketAddr, args: Configure config.members.insert(candidates[0].clone(), NetworkConfigEntry{ + datacenter: args.datacenter, n_tokens: args.n_tokens, }); config.version += 1; diff --git a/src/membership.rs b/src/membership.rs index aa51e0fa..1ce567a7 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -1,4 +1,6 @@ use std::sync::Arc; +use std::hash::Hash as StdHash; +use std::hash::Hasher; use std::path::PathBuf; use std::io::{Read}; use std::collections::HashMap; @@ -34,6 +36,19 @@ pub struct Members { pub status_hash: Hash, pub config: NetworkConfig, + pub ring: Vec, + pub n_datacenters: usize, +} + +pub struct NodeStatus { + pub addr: SocketAddr, + pub remaining_ping_attempts: usize, +} + +pub struct RingEntry { + pub location: Hash, + pub node: UUID, + pub datacenter: u64, } impl Members { @@ -43,7 +58,6 @@ impl Members { NodeStatus{ addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - datacenter: info.datacenter.clone(), }); match old_status { None => { @@ -61,18 +75,78 @@ impl Members { let mut hasher = Sha256::new(); eprintln!("Current set of pingable nodes: --"); for (id, status) in nodes { - eprintln!("{} {} ({})", hex::encode(id), status.addr, status.datacenter); + eprintln!("{} {}", hex::encode(id), status.addr); hasher.input(format!("{} {}\n", hex::encode(id), status.addr)); } eprintln!("END --"); self.status_hash.copy_from_slice(&hasher.result()[..]); } -} -pub struct NodeStatus { - pub addr: SocketAddr, - pub datacenter: String, - pub remaining_ping_attempts: usize, + fn rebuild_ring(&mut self) { + let mut new_ring = vec![]; + let mut datacenters = vec![]; + + for (id, config) in self.config.members.iter() { + let mut dc_hasher = std::collections::hash_map::DefaultHasher::new(); + config.datacenter.hash(&mut dc_hasher); + let datacenter = dc_hasher.finish(); + + if !datacenters.contains(&datacenter) { + datacenters.push(datacenter); + } + + for i in 0..config.n_tokens { + let mut location_hasher = Sha256::new(); + location_hasher.input(format!("{} {}", hex::encode(id), i)); + let mut location = [0u8; 32]; + location.copy_from_slice(&location_hasher.result()[..]); + + new_ring.push(RingEntry{ + location, + node: id.clone(), + datacenter, + }) + } + } + + new_ring.sort_by_key(|x| x.location); + self.ring = new_ring; + self.n_datacenters = datacenters.len(); + } + + fn walk_ring(&self, from: &Hash, n: usize) -> Vec { + if n >= self.config.members.len() { + return self.config.members.keys().cloned().collect::>(); + } + + let start = match self.ring.binary_search_by_key(from, |x| x.location) { + Ok(i) => i, + Err(i) => if i == 0 { + self.ring.len() - 1 + } else { + i - 1 + } + }; + let mut ret = vec![]; + let mut datacenters = vec![]; + + for delta in 0..self.ring.len() { + if ret.len() == n { + break; + } + + let i = (start + delta) % self.ring.len(); + + if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { + ret.push(self.ring[i].node.clone()); + } else if !datacenters.contains(&self.ring[i].datacenter) { + ret.push(self.ring[i].node.clone()); + datacenters.push(self.ring[i].datacenter); + } + } + + ret + } } fn read_network_config(metadata_dir: &PathBuf) -> Result { @@ -106,8 +180,11 @@ impl System { status: HashMap::new(), status_hash: [0u8; 32], config: net_config, + ring: Vec::new(), + n_datacenters: 0, }; members.recalculate_status_hash(); + members.rebuild_ring(); System{ config, id, @@ -135,7 +212,6 @@ impl System { let members = self.members.read().await; Message::Ping(PingMessage{ id: self.id, - datacenter: self.config.datacenter.clone(), rpc_port: self.config.rpc_port, status_hash: members.status_hash.clone(), config_version: members.config.version, @@ -184,7 +260,6 @@ impl System { to_advertise.push(AdvertisedNode{ id: info.id.clone(), addr: addr.clone(), - datacenter: info.datacenter.clone(), }); } if is_new || members.status_hash != info.status_hash { @@ -247,7 +322,6 @@ impl System { mem.push(AdvertisedNode{ id: node.clone(), addr: status.addr.clone(), - datacenter: status.datacenter.clone(), }); } Ok(Message::AdvertiseNodesUp(mem)) @@ -274,7 +348,6 @@ impl System { let old_self = members.status.insert(node.id.clone(), NodeStatus{ addr: self_addr, - datacenter: self.config.datacenter.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, }); has_changed = match old_self { @@ -303,9 +376,11 @@ impl System { { let mut members = self.members.write().await; if adv.version > members.config.version { - members.config = adv.clone(); tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); + + members.config = adv.clone(); self.save_network_config().await; + members.rebuild_ring(); } Ok(Message::Ok) diff --git a/src/proto.rs b/src/proto.rs index 3e679d35..18bc339e 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -21,7 +21,6 @@ pub enum Message { #[derive(Debug, Serialize, Deserialize)] pub struct PingMessage { pub id: UUID, - pub datacenter: String, pub rpc_port: u16, pub status_hash: Hash, @@ -32,5 +31,4 @@ pub struct PingMessage { pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, - pub datacenter: String, } diff --git a/src/server.rs b/src/server.rs index 04f98c65..1450911b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -14,8 +14,6 @@ use crate::rpc_server; #[derive(Deserialize, Debug)] pub struct Config { - pub datacenter: String, - pub metadata_dir: PathBuf, pub data_dir: PathBuf,