diff --git a/script/simulate_ring.py b/script/simulate_ring.py index 47d748f..7953785 100755 --- a/script/simulate_ring.py +++ b/script/simulate_ring.py @@ -262,21 +262,21 @@ if __name__ == "__main__": print("------") print("method 2 (custom ring)") - nodes = [('digitale', 'atuin', 4), - ('drosera', 'atuin', 4), - ('datura', 'atuin', 4), - ('io', 'jupiter', 8)] - nodes2 = [('digitale', 'atuin', 8), - ('drosera', 'atuin', 8), - ('datura', 'atuin', 8), - ('io', 'jupiter', 16), - ('isou', 'jupiter', 8), - ('mini', 'grog', 4), - ('mixi', 'grog', 4), - ('moxi', 'grog', 4), - ('modi', 'grog', 4), - ('geant', 'grisou', 16), - ('gipsie', 'grisou', 16), + nodes = [('digitale', 'atuin', 1), + ('drosera', 'atuin', 1), + ('datura', 'atuin', 1), + ('io', 'jupiter', 2)] + nodes2 = [('digitale', 'atuin', 2), + ('drosera', 'atuin', 2), + ('datura', 'atuin', 2), + ('io', 'jupiter', 4), + ('isou', 'jupiter', 2), + ('mini', 'grog', 1), + ('mixi', 'grog', 1), + ('moxi', 'grog', 1), + ('modi', 'grog', 1), + ('geant', 'grisou', 4), + ('gipsie', 'grisou', 4), ] evaluate_method(method2, nodes2) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index f9047b3..44d7122 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -218,12 +218,7 @@ impl System { .unwrap_or("<invalid utf-8>".to_string()), }; - let mut ring = Ring { - config: net_config, - ring: Vec::new(), - n_datacenters: 0, - }; - ring.rebuild_ring(); + let ring = Ring::new(net_config); let (update_ring, ring) = watch::channel(Arc::new(ring)); let rpc_path = MEMBERSHIP_RPC_PATH.to_string(); @@ -531,10 +526,7 @@ impl System { let ring: Arc<Ring> = self.ring.borrow().clone(); if adv.version > ring.config.version { - let mut ring = ring.as_ref().clone(); - - ring.config = adv.clone(); - ring.rebuild_ring(); + let ring = Ring::new(adv.clone()); update_lock.1.broadcast(Arc::new(ring))?; drop(update_lock); diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 5ca43ac..906ab9f 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -1,9 +1,22 @@ -use std::collections::HashMap; +use std::collections::{HashMap, HashSet}; +use std::convert::TryInto; use serde::{Deserialize, Serialize}; use garage_util::data::*; +// TODO: make this constant parametrizable in the config file +// For deployments with many nodes it might make sense to bump +// it up to 10. +// Maximum value : 16 +pub const PARTITION_BITS: usize = 8; + +const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS); + +// TODO: make this constant paraetrizable in the config file +// (most deployments use a replication factor of 3, so...) +pub const MAX_REPLICATION: usize = 3; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { pub members: HashMap<UUID, NetworkConfigEntry>, @@ -30,96 +43,150 @@ pub struct NetworkConfigEntry { pub struct Ring { pub config: NetworkConfig, pub ring: Vec<RingEntry>, - pub n_datacenters: usize, } #[derive(Clone, Debug)] pub struct RingEntry { pub location: Hash, - pub node: UUID, - datacenter: usize, + pub nodes: [UUID; MAX_REPLICATION], } impl Ring { - pub(crate) fn rebuild_ring(&mut self) { - let mut new_ring = vec![]; - let mut datacenters = vec![]; + pub(crate) fn new(config: NetworkConfig) -> Self { + // Create a vector of partition indices (0 to 2**PARTITION_BITS-1) + let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>(); - for (id, config) in self.config.members.iter() { - let datacenter = &config.datacenter; + let datacenters = config + .members + .iter() + .map(|(_id, info)| info.datacenter.as_str()) + .collect::<HashSet<&str>>(); + let n_datacenters = datacenters.len(); - if !datacenters.contains(datacenter) { - datacenters.push(datacenter.to_string()); + // Prepare ring + let mut partitions: Vec<Vec<(&UUID, &NetworkConfigEntry)>> = partitions_idx + .iter() + .map(|_i| Vec::new()) + .collect::<Vec<_>>(); + + // Create MagLev priority queues for each node + let mut queues = config + .members + .iter() + .map(|(node_id, node_info)| { + let mut parts = partitions_idx + .iter() + .map(|i| { + let part_data = + [&u16::to_be_bytes(*i as u16)[..], node_id.as_slice()].concat(); + (*i, fasthash(&part_data[..])) + }) + .collect::<Vec<_>>(); + parts.sort_by_key(|(_i, h)| *h); + let parts_i = parts.iter().map(|(i, _h)| *i).collect::<Vec<_>>(); + (node_id, node_info, parts_i, 0) + }) + .collect::<Vec<_>>(); + + let max_toktok = config + .members + .iter() + .map(|(_, node_info)| node_info.n_tokens) + .fold(0, std::cmp::max); + + // Fill up ring + for rep in 0..MAX_REPLICATION { + queues.sort_by_key(|(ni, _np, _q, _p)| { + let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat(); + fasthash(&queue_data[..]) + }); + + for (_, _, _, pos) in queues.iter_mut() { + *pos = 0; } - let datacenter_idx = datacenters - .iter() - .enumerate() - .find(|(_, dc)| *dc == datacenter) - .unwrap() - .0; - for i in 0..config.n_tokens { - let location = sha256sum(format!("{} {}", hex::encode(&id), i).as_bytes()); - - new_ring.push(RingEntry { - location: location.into(), - node: *id, - datacenter: datacenter_idx, - }) + let mut remaining = partitions_idx.len(); + while remaining > 0 { + let remaining0 = remaining; + for toktok in 0..max_toktok { + for (node_id, node_info, q, pos) in queues.iter_mut() { + if toktok >= node_info.n_tokens { + continue; + } + for pos2 in *pos..q.len() { + let qv = q[pos2]; + if partitions[qv].len() != rep { + continue; + } + let p_dcs = partitions[qv] + .iter() + .map(|(_id, info)| info.datacenter.as_str()) + .collect::<HashSet<&str>>(); + if !partitions[qv] + .iter() + .any(|(_id, i)| *i.datacenter == node_info.datacenter) + || (p_dcs.len() == n_datacenters + && !partitions[qv].iter().any(|(id, _i)| id == node_id)) + { + partitions[qv].push((node_id, node_info)); + remaining -= 1; + *pos = pos2 + 1; + break; + } + } + } + } + if remaining == remaining0 { + // No progress made, exit + warn!("Could not build ring, not enough nodes configured."); + return Self { + config, + ring: vec![], + }; + } } } - new_ring.sort_unstable_by(|x, y| x.location.cmp(&y.location)); - self.ring = new_ring; - self.n_datacenters = datacenters.len(); + let ring = partitions + .iter() + .enumerate() + .map(|(i, nodes)| { + let top = (i as u16) << (16 - PARTITION_BITS); + let mut hash = [0u8; 32]; + hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]); + let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<UUID>>(); + RingEntry { + location: hash.into(), + nodes: nodes.try_into().unwrap(), + } + }) + .collect::<Vec<_>>(); - // eprintln!("RING: --"); - // for e in self.ring.iter() { - // eprintln!("{:?}", e); - // } - // eprintln!("END --"); + eprintln!("RING: --"); + for e in ring.iter() { + eprintln!("{:?}", e); + } + eprintln!("END --"); + + Self { config, ring } } pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<UUID> { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::<Vec<_>>(); + if self.ring.len() != 1 << PARTITION_BITS { + warn!("Ring not yet ready, read/writes will be lost"); + return vec![]; } - let start = match self.ring.binary_search_by(|x| x.location.cmp(from)) { - Ok(i) => i, - Err(i) => { - if i == 0 { - self.ring.len() - 1 - } else { - i - 1 - } - } - }; + let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); - self.walk_ring_from_pos(start, n) - } + let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; + let partition = &self.ring[partition_idx]; - fn walk_ring_from_pos(&self, start: usize, n: usize) -> Vec<UUID> { - if n >= self.config.members.len() { - return self.config.members.keys().cloned().collect::<Vec<_>>(); - } + let partition_top = + u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap()); + assert!(partition_top & PARTITION_MASK_U16 == top & PARTITION_MASK_U16); - let mut ret = vec![]; - let mut datacenters = vec![]; - - let mut delta = 0; - while ret.len() < n { - let i = (start + delta) % self.ring.len(); - delta += 1; - - if !datacenters.contains(&self.ring[i].datacenter) { - ret.push(self.ring[i].node); - datacenters.push(self.ring[i].datacenter); - } else if datacenters.len() == self.n_datacenters && !ret.contains(&self.ring[i].node) { - ret.push(self.ring[i].node); - } - } - - ret + assert!(n <= partition.nodes.len()); + partition.nodes[..n].iter().cloned().collect::<Vec<_>>() } } diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs index 47bdfea..098637d 100644 --- a/src/table/table_sharded.rs +++ b/src/table/table_sharded.rs @@ -44,7 +44,6 @@ impl TableReplication for TableShardedReplication { fn split_points(&self, ring: &Ring) -> Vec<Hash> { let mut ret = vec![]; - ret.push([0u8; 32].into()); for entry in ring.ring.iter() { ret.push(entry.location); }