diff --git a/script/dev-cluster.sh b/script/dev-cluster.sh
index 96147ca..bb5f9a0 100755
--- a/script/dev-cluster.sh
+++ b/script/dev-cluster.sh
@@ -31,9 +31,7 @@ bootstrap_peers = [
 	"127.0.0.1:3903"
 ]
 max_concurrent_rpc_requests = 12
-data_replication_factor = 3
-meta_replication_factor = 3
-meta_epidemic_fanout = 3
+replication_mode = "3"
 
 [s3_api]
 api_bind_addr = "0.0.0.0:$((3910+$count))"  # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part.
diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs
index f2d11bb..b1538a3 100644
--- a/src/garage/admin_rpc.rs
+++ b/src/garage/admin_rpc.rs
@@ -430,8 +430,8 @@ impl AdminRpcHandler {
 		// Gather ring statistics
 		let ring = self.garage.system.ring.borrow().clone();
 		let mut ring_nodes = HashMap::new();
-		for r in ring.ring.iter() {
-			for n in r.nodes.iter() {
+		for (_i, loc) in ring.partitions().iter() {
+			for n in ring.get_nodes(loc, ring.replication_factor).iter() {
 				if !ring_nodes.contains_key(n) {
 					ring_nodes.insert(*n, 0usize);
 				}
diff --git a/src/garage/cli.rs b/src/garage/cli.rs
index bfe7e08..42cc657 100644
--- a/src/garage/cli.rs
+++ b/src/garage/cli.rs
@@ -80,6 +80,10 @@ pub struct ConfigureNodeOpt {
 	#[structopt(short = "c", long = "capacity")]
 	capacity: Option<u32>,
 
+	/// Gateway-only node
+	#[structopt(short = "g", long = "gateway")]
+	gateway: bool,
+
 	/// Optional node tag
 	#[structopt(short = "t", long = "tag")]
 	tag: Option<String>,
@@ -339,7 +343,12 @@ pub async fn cmd_status(
 		if let Some(cfg) = config.members.get(&adv.id) {
 			println!(
 				"{:?}\t{}\t{}\t[{}]\t{}\t{}",
-				adv.id, adv.state_info.hostname, adv.addr, cfg.tag, cfg.datacenter, cfg.capacity
+				adv.id,
+				adv.state_info.hostname,
+				adv.addr,
+				cfg.tag,
+				cfg.datacenter,
+				cfg.capacity_string()
 			);
 		} else {
 			println!(
@@ -366,7 +375,7 @@
 				adv.addr,
 				cfg.tag,
 				cfg.datacenter,
-				cfg.capacity,
+				cfg.capacity_string(),
 				(now_msec() - adv.last_seen) / 1000,
 			);
 		}
@@ -375,7 +384,10 @@
 		if !status.iter().any(|x| x.id == *id) {
 			println!(
 				"{:?}\t{}\t{}\t{}\tnever seen",
-				id, cfg.tag, cfg.datacenter, cfg.capacity
+				id,
+				cfg.tag,
+				cfg.datacenter,
+				cfg.capacity_string(),
 			);
 		}
 	}
@@ -438,23 +450,44 @@
 		}
 	}
 
+	if args.capacity.is_some() && args.gateway {
+		return Err(Error::Message(
+			"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
+	}
+	if args.capacity == Some(0) {
+		return Err(Error::Message("Invalid capacity value: 0".into()));
+	}
+
 	let new_entry = match config.members.get(&added_node) {
-		None => NetworkConfigEntry {
-			datacenter: args
-				.datacenter
-				.expect("Please specifiy a datacenter with the -d flag"),
-			capacity: args
-				.capacity
-				.expect("Please specifiy a capacity with the -c flag"),
-			tag: args.tag.unwrap_or_default(),
-		},
-		Some(old) => NetworkConfigEntry {
-			datacenter: args
-				.datacenter
-				.unwrap_or_else(|| old.datacenter.to_string()),
-			capacity: args.capacity.unwrap_or(old.capacity),
-			tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
-		},
+		None => {
+			let capacity = match args.capacity {
+				Some(c) => Some(c),
+				None if args.gateway => None,
+				_ => return Err(Error::Message(
+					"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
+			};
+			NetworkConfigEntry {
+				datacenter: args
+					.datacenter
+					.expect("Please specify a datacenter with the -d flag"),
+				capacity,
+				tag: args.tag.unwrap_or_default(),
+			}
+		}
+		Some(old) => {
+			let capacity = match args.capacity {
+				Some(c) => Some(c),
+				None if args.gateway => None,
+				_ => old.capacity,
+			};
+			NetworkConfigEntry {
+				datacenter: args
+					.datacenter
+					.unwrap_or_else(|| old.datacenter.to_string()),
+				capacity,
+				tag: args.tag.unwrap_or_else(|| old.tag.to_string()),
+			}
+		}
 	};
 
 	config.members.insert(added_node, new_entry);
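The `cmd_configure` change above folds the old `-c` capacity flag and the new `-g` gateway flag into a single `Option<u32>` capacity. The decision table is easier to see as a pure function; the following is a minimal standalone sketch (the `resolve_capacity` helper and its error strings are illustrative only, they do not exist in the patch):

    // Sketch of the capacity/gateway resolution performed in cmd_configure above.
    // `resolve_capacity` is a hypothetical helper, not a function from this patch.
    // `previous` is Some(...) when the node already has an entry in the network config.
    fn resolve_capacity(
        requested: Option<u32>,        // -c value, if given
        gateway: bool,                 // -g flag
        previous: Option<Option<u32>>, // existing entry's capacity, if any
    ) -> Result<Option<u32>, String> {
        if requested.is_some() && gateway {
            return Err("-c and -g are mutually exclusive".into());
        }
        if requested == Some(0) {
            return Err("Invalid capacity value: 0".into());
        }
        match (requested, gateway, previous) {
            (Some(c), _, _) => Ok(Some(c)),      // storage node with capacity c
            (None, true, _) => Ok(None),         // explicit gateway node
            (None, false, Some(old)) => Ok(old), // reconfiguration: keep previous role
            (None, false, None) => Err("Please specify a capacity with -c or use -g".into()),
        }
    }

    fn main() {
        assert_eq!(resolve_capacity(Some(10), false, None), Ok(Some(10)));
        assert_eq!(resolve_capacity(None, true, None), Ok(None));
        assert_eq!(resolve_capacity(None, false, Some(Some(5))), Ok(Some(5)));
        assert!(resolve_capacity(Some(1), true, None).is_err());
        assert!(resolve_capacity(Some(0), false, None).is_err());
    }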
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 5c6c21f..c359493 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -7,6 +7,7 @@ use garage_rpc::membership::System;
 use garage_rpc::rpc_client::RpcHttpClient;
 use garage_rpc::rpc_server::RpcServer;
 
+use garage_table::replication::ReplicationMode;
 use garage_table::replication::TableFullReplication;
 use garage_table::replication::TableShardedReplication;
 use garage_table::*;
@@ -50,6 +51,9 @@ impl Garage {
 		background: Arc<BackgroundRunner>,
 		rpc_server: &mut RpcServer,
 	) -> Arc<Self> {
+		let replication_mode = ReplicationMode::parse(&config.replication_mode)
+			.expect("Invalid replication_mode in config file.");
+
 		info!("Initialize membership management system...");
 		let rpc_http_client = Arc::new(
 			RpcHttpClient::new(config.max_concurrent_rpc_requests, &config.rpc_tls)
@@ -60,32 +64,33 @@
 			rpc_http_client,
 			background.clone(),
 			rpc_server,
+			replication_mode.replication_factor(),
 		);
 
 		let data_rep_param = TableShardedReplication {
 			system: system.clone(),
-			replication_factor: config.data_replication_factor,
-			write_quorum: (config.data_replication_factor + 1) / 2,
+			replication_factor: replication_mode.replication_factor(),
+			write_quorum: replication_mode.write_quorum(),
 			read_quorum: 1,
 		};
 
 		let meta_rep_param = TableShardedReplication {
 			system: system.clone(),
-			replication_factor: config.meta_replication_factor,
-			write_quorum: (config.meta_replication_factor + 1) / 2,
-			read_quorum: (config.meta_replication_factor + 1) / 2,
+			replication_factor: replication_mode.replication_factor(),
+			write_quorum: replication_mode.write_quorum(),
+			read_quorum: replication_mode.read_quorum(),
 		};
 
 		let control_rep_param = TableFullReplication {
 			system: system.clone(),
-			max_faults: config.control_write_max_faults,
+			max_faults: replication_mode.control_write_max_faults(),
 		};
 
 		info!("Initialize block manager...");
 		let block_manager = BlockManager::new(
 			&db,
 			config.data_dir.clone(),
-			data_rep_param.clone(),
+			data_rep_param,
 			system.clone(),
 			rpc_server,
 		);
@@ -95,7 +100,7 @@
 			BlockRefTable {
 				block_manager: block_manager.clone(),
 			},
-			data_rep_param,
+			meta_rep_param.clone(),
 			system.clone(),
 			&db,
 			"block_ref".to_string(),
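With `replication_mode = "3"`, the wiring in `Garage::new` above derives all table parameters from a single knob instead of three separate config fields. A standalone sketch of the resulting values (plain structs standing in for `TableShardedReplication` and `TableFullReplication`, mirroring `ReplicationMode::ThreeWay`):

    // Standalone sketch of the parameters Garage::new derives for replication_mode = "3".
    // These structs are simplified stand-ins for TableShardedReplication and
    // TableFullReplication; the values mirror ReplicationMode::ThreeWay.
    struct Sharded {
        replication_factor: usize,
        write_quorum: usize,
        read_quorum: usize,
    }
    struct Full {
        max_faults: usize,
    }

    fn main() {
        // Block data: any single replica may serve a read.
        let data = Sharded { replication_factor: 3, write_quorum: 2, read_quorum: 1 };
        // Table metadata: majority quorums on both paths.
        let meta = Sharded { replication_factor: 3, write_quorum: 2, read_quorum: 2 };
        // Fully replicated control tables tolerate one faulty node on writes.
        let control = Full { max_faults: 1 };

        // Metadata reads and writes always intersect: rq + wq > rf.
        assert!(meta.read_quorum + meta.write_quorum > meta.replication_factor);
        assert_eq!(data.read_quorum, 1);
        assert_eq!(control.max_faults, 1);
    }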
diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs
index da7dcf8..37cf810 100644
--- a/src/rpc/membership.rs
+++ b/src/rpc/membership.rs
@@ -95,6 +95,7 @@ pub struct System {
 	rpc_http_client: Arc<RpcHttpClient>,
 	rpc_client: Arc<RpcClient<Message>>,
 
+	replication_factor: usize,
 	pub(crate) status: watch::Receiver<Arc<Status>>,
 	/// The ring
 	pub ring: watch::Receiver<Arc<Ring>>,
@@ -228,6 +229,7 @@ impl System {
 		rpc_http_client: Arc<RpcHttpClient>,
 		background: Arc<BackgroundRunner>,
 		rpc_server: &mut RpcServer,
+		replication_factor: usize,
 	) -> Arc<Self> {
 		let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
 		info!("Node ID: {}", hex::encode(&id));
@@ -259,7 +261,7 @@
 				.unwrap_or_else(|_| "".to_string()),
 		};
 
-		let ring = Ring::new(net_config);
+		let ring = Ring::new(net_config, replication_factor);
 		let (update_ring, ring) = watch::channel(Arc::new(ring));
 
 		let rpc_path = MEMBERSHIP_RPC_PATH.to_string();
@@ -277,6 +279,7 @@
 			state_info,
 			rpc_http_client,
 			rpc_client,
+			replication_factor,
 			status,
 			ring,
 			update_lock: Mutex::new(Updaters {
@@ -543,7 +546,7 @@ impl System {
 		let ring: Arc<Ring> = self.ring.borrow().clone();
 
 		if adv.version > ring.config.version {
-			let ring = Ring::new(adv.clone());
+			let ring = Ring::new(adv.clone(), self.replication_factor);
 			update_lock.update_ring.send(Arc::new(ring))?;
 			drop(update_lock);
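`System` now remembers the `replication_factor` so that every subsequent `Ring::new` (one per network-config advertisement) is built consistently. A minimal model of that update flow, as a sketch: `Ring` below is a stub with only the fields needed for the demo, and it assumes the tokio crate for `tokio::sync::watch`, which the real code also uses.

    // Minimal model of the ring-update path above: when a peer advertises a newer
    // NetworkConfig, the ring is rebuilt with the locally stored replication factor
    // and republished on the watch channel. `Ring` here is a stub, not the real struct.
    use std::sync::Arc;
    use tokio::sync::watch;

    #[derive(Debug)]
    struct Ring {
        version: u64,
        replication_factor: usize,
    }

    fn main() {
        let replication_factor = 3; // now handed to System::new once, at startup
        let (update_ring, ring) =
            watch::channel(Arc::new(Ring { version: 1, replication_factor }));

        // A peer advertises config version 2: rebuild with the same replication factor.
        let adv_version = 2;
        if adv_version > ring.borrow().version {
            update_ring
                .send(Arc::new(Ring { version: adv_version, replication_factor }))
                .unwrap();
        }
        assert_eq!(ring.borrow().version, 2);
        assert_eq!(ring.borrow().replication_factor, 3);
    }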
diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs
index 0f94d0f..daeb25d 100644
--- a/src/rpc/ring.rs
+++ b/src/rpc/ring.rs
@@ -7,10 +7,9 @@ use serde::{Deserialize, Serialize};
 
 use garage_util::data::*;
 
-// A partition number is encoded on 16 bits,
-// i.e. we have up to 2**16 partitions.
-// (in practice we have exactly 2**PARTITION_BITS partitions)
-/// A partition id, stored on 16 bits
+/// A partition id, which is stored on 16 bits
+/// i.e. we have up to 2**16 partitions.
+/// (in practice we have exactly 2**PARTITION_BITS partitions)
 pub type Partition = u16;
 
 // TODO: make this constant parametrizable in the config file
@@ -23,11 +22,6 @@ pub const PARTITION_BITS: usize = 8;
 
 const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_BITS);
 
-// TODO: make this constant paraetrizable in the config file
-// (most deployments use a replication factor of 3, so...)
-/// The maximum number of time an object might get replicated
-pub const MAX_REPLICATION: usize = 3;
-
 /// The user-defined configuration of the cluster's nodes
 #[derive(Clone, Debug, Serialize, Deserialize)]
 pub struct NetworkConfig {
@@ -53,40 +47,72 @@ pub struct NetworkConfigEntry {
 	/// geodistribution
 	pub datacenter: String,
 	/// The (relative) capacity of the node
-	pub capacity: u32,
+	/// If this is set to None, the node does not participate in storing data for the system
+	/// and is only active as an API gateway to other nodes
+	pub capacity: Option<u32>,
 	/// A tag to recognize the entry, not used for other things than display
 	pub tag: String,
 }
 
+impl NetworkConfigEntry {
+	pub fn capacity_string(&self) -> String {
+		match self.capacity {
+			Some(c) => format!("{}", c),
+			None => "gateway".to_string(),
+		}
+	}
+}
+
 /// A ring distributing fairly objects to nodes
 #[derive(Clone)]
 pub struct Ring {
+	/// The replication factor for this ring
+	pub replication_factor: usize,
+
 	/// The network configuration used to generate this ring
 	pub config: NetworkConfig,
-	/// The list of entries in the ring
-	pub ring: Vec<RingEntry>,
+
+	// Internal order of nodes used to make a more compact representation of the ring
+	nodes: Vec<Uuid>,
+
+	// The list of entries in the ring
+	ring: Vec<RingEntry>,
 }
 
+// Type to store compactly the id of a node in the system
+// Change this to u16 the day we want to have more than 256 nodes in a cluster
+type CompactNodeType = u8;
+
+// The maximum number of times an object might get replicated
+// This must be at least 3 because Garage supports 3-way replication
+// Here we use 6 so that the size of a ring entry is 8 bytes
+// (2 bytes partition id, 6 bytes node numbers as u8s)
+const MAX_REPLICATION: usize = 6;
+
 /// An entry in the ring
 #[derive(Clone, Debug)]
-pub struct RingEntry {
-	/// The prefix of the Hash of object which should use this entry
-	pub location: Hash,
-	/// The nodes in which a matching object should get stored
-	pub nodes: [Uuid; MAX_REPLICATION],
+struct RingEntry {
+	// The two first bytes of the first hash that goes in this partition
+	// (the next bytes are zeroes)
+	hash_prefix: u16,
+	// The nodes that store this partition, stored as a list of positions in the `nodes`
+	// field of the Ring structure
+	// Only items 0 up to ring.replication_factor - 1 are used, others are zeros
+	nodes_buf: [CompactNodeType; MAX_REPLICATION],
 }
 
 impl Ring {
 	// TODO this function MUST be refactored, it's 100 lines long, with a 50 lines loop, going up to 6
 	// levels of imbrication. It is basically impossible to test, maintain, or understand for an
 	// outsider.
-	pub(crate) fn new(config: NetworkConfig) -> Self {
+	pub(crate) fn new(config: NetworkConfig, replication_factor: usize) -> Self {
 		// Create a vector of partition indices (0 to 2**PARTITION_BITS-1)
 		let partitions_idx = (0usize..(1usize << PARTITION_BITS)).collect::<Vec<_>>();
 
 		let datacenters = config
 			.members
 			.iter()
+			.filter(|(_id, info)| info.capacity.is_some())
 			.map(|(_id, info)| info.datacenter.as_str())
 			.collect::<HashSet<&str>>();
 		let n_datacenters = datacenters.len();
@@ -101,6 +127,7 @@
 		let mut queues = config
 			.members
 			.iter()
+			.filter(|(_id, info)| info.capacity.is_some())
 			.map(|(node_id, node_info)| {
 				let mut parts = partitions_idx
 					.iter()
@@ -119,11 +146,13 @@
 		let max_capacity = config
 			.members
 			.iter()
-			.map(|(_, node_info)| node_info.capacity)
+			.filter_map(|(_, node_info)| node_info.capacity)
 			.fold(0, std::cmp::max);
 
+		assert!(replication_factor <= MAX_REPLICATION);
+
 		// Fill up ring
-		for rep in 0..MAX_REPLICATION {
+		for rep in 0..replication_factor {
 			queues.sort_by_key(|(ni, _np, _q, _p)| {
 				let queue_data = [&u16::to_be_bytes(rep as u16)[..], ni.as_slice()].concat();
 				fasthash(&queue_data[..])
@@ -138,7 +167,7 @@
 			let remaining0 = remaining;
 			for i_round in 0..max_capacity {
 				for (node_id, node_info, q, pos) in queues.iter_mut() {
-					if i_round >= node_info.capacity {
+					if i_round >= node_info.capacity.unwrap() {
 						continue;
 					}
 					for (pos2, &qv) in q.iter().enumerate().skip(*pos) {
@@ -166,34 +195,58 @@
 					// No progress made, exit
 					warn!("Could not build ring, not enough nodes configured.");
 					return Self {
+						replication_factor,
 						config,
+						nodes: vec![],
 						ring: vec![],
 					};
 				}
 			}
 		}
 
+		// Make a canonical order for nodes
+		let nodes = config
+			.members
+			.iter()
+			.filter(|(_id, info)| info.capacity.is_some())
+			.map(|(id, _)| *id)
+			.collect::<Vec<_>>();
+		let nodes_rev = nodes
+			.iter()
+			.enumerate()
+			.map(|(i, id)| (*id, i as CompactNodeType))
+			.collect::<HashMap<_, _>>();
+
 		let ring = partitions
 			.iter()
 			.enumerate()
 			.map(|(i, nodes)| {
 				let top = (i as u16) << (16 - PARTITION_BITS);
-				let mut hash = [0u8; 32];
-				hash[0..2].copy_from_slice(&u16::to_be_bytes(top)[..]);
-				let nodes = nodes.iter().map(|(id, _info)| **id).collect::<Vec<_>>();
+				let nodes = nodes
+					.iter()
+					.map(|(id, _info)| *nodes_rev.get(id).unwrap())
+					.collect::<Vec<_>>();
+				assert!(nodes.len() == replication_factor);
+				let mut nodes_buf = [0u8; MAX_REPLICATION];
+				nodes_buf[..replication_factor].copy_from_slice(&nodes[..]);
 				RingEntry {
-					location: hash.into(),
-					nodes: nodes.try_into().unwrap(),
+					hash_prefix: top,
+					nodes_buf,
 				}
 			})
 			.collect::<Vec<_>>();
 
-		Self { config, ring }
+		Self {
+			replication_factor,
+			config,
+			nodes,
+			ring,
+		}
 	}
 
 	/// Get the partition in which data would fall on
-	pub fn partition_of(&self, from: &Hash) -> Partition {
-		let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
+	pub fn partition_of(&self, position: &Hash) -> Partition {
+		let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
 		top >> (16 - PARTITION_BITS)
 	}
@@ -202,7 +255,9 @@
 		let mut ret = vec![];
 
 		for (i, entry) in self.ring.iter().enumerate() {
-			ret.push((i as u16, entry.location));
+			let mut location = [0u8; 32];
+			location[..2].copy_from_slice(&u16::to_be_bytes(entry.hash_prefix)[..]);
+			ret.push((i as u16, location.into()));
 		}
 		if !ret.is_empty() {
 			assert_eq!(ret[0].1, [0u8; 32].into());
@@ -211,28 +266,38 @@
 		ret
 	}
 
-	// TODO rename this function as it no longer walk the ring
 	/// Walk the ring to find the n servers in which data should be replicated
-	pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec<Uuid> {
+	pub fn get_nodes(&self, position: &Hash, n: usize) -> Vec<Uuid> {
 		if self.ring.len() != 1 << PARTITION_BITS {
 			warn!("Ring not yet ready, read/writes will be lost!");
 			return vec![];
 		}
 
-		let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap());
-		let partition_idx = (top >> (16 - PARTITION_BITS)) as usize;
-		// TODO why computing two time in the same way and asserting?
-		assert_eq!(partition_idx, self.partition_of(from) as usize);
-
+		let partition_idx = self.partition_of(position) as usize;
 		let partition = &self.ring[partition_idx];
-		let partition_top =
-			u16::from_be_bytes(partition.location.as_slice()[0..2].try_into().unwrap());
-		// TODO is this an assertion on the validity of PARTITION_MASK_U16? If so, it should
-		// probably be a test more than a runtime assertion
-		assert_eq!(partition_top & PARTITION_MASK_U16, top & PARTITION_MASK_U16);
+
+		let top = u16::from_be_bytes(position.as_slice()[0..2].try_into().unwrap());
+		// Check that we haven't messed up our partition table, i.e. that this partition
+		// table entry indeed corresponds to the item we are storing
+		assert_eq!(
+			partition.hash_prefix & PARTITION_MASK_U16,
+			top & PARTITION_MASK_U16
+		);
 
-		assert!(n <= partition.nodes.len());
-		partition.nodes[..n].to_vec()
+		assert!(n <= self.replication_factor);
+		partition.nodes_buf[..n]
+			.iter()
+			.map(|i| self.nodes[*i as usize])
+			.collect::<Vec<_>>()
+	}
+}
+
+#[cfg(test)]
+mod tests {
+	use super::*;
+
+	#[test]
+	fn test_ring_entry_size() {
+		assert_eq!(std::mem::size_of::<RingEntry>(), 8);
 	}
 }
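The compact layout above stores each ring entry in exactly 8 bytes (a 2-byte hash prefix plus 6 one-byte node indices), so the whole 2**PARTITION_BITS-partition ring occupies 2 KiB of entries. A self-contained check, re-declaring a local twin of `RingEntry` and the `partition_of` arithmetic (the twin and its constants mirror the patch, they are not imports from it):

    // Self-contained check of the compact ring layout above: a local twin of
    // RingEntry plus the prefix arithmetic from partition_of.
    const PARTITION_BITS: usize = 8;
    const MAX_REPLICATION: usize = 6;

    #[allow(dead_code)]
    struct RingEntry {
        hash_prefix: u16,
        nodes_buf: [u8; MAX_REPLICATION],
    }

    fn partition_of(first_two_hash_bytes: [u8; 2]) -> u16 {
        let top = u16::from_be_bytes(first_two_hash_bytes);
        top >> (16 - PARTITION_BITS)
    }

    fn main() {
        // 2 bytes of prefix + 6 one-byte node indices = 8 bytes, no padding.
        assert_eq!(std::mem::size_of::<RingEntry>(), 8);
        // The full ring is 2**PARTITION_BITS entries, i.e. 2 KiB in total.
        assert_eq!((1usize << PARTITION_BITS) * std::mem::size_of::<RingEntry>(), 2048);
        // An item whose hash starts with byte 0xAB falls in partition 0xAB.
        assert_eq!(partition_of([0xAB, 0xCD]), 0xAB);
    }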
diff --git a/src/table/replication/mod.rs b/src/table/replication/mod.rs
index dfcb026..19e6772 100644
--- a/src/table/replication/mod.rs
+++ b/src/table/replication/mod.rs
@@ -1,8 +1,10 @@
 mod parameters;
 
 mod fullcopy;
+mod mode;
 mod sharded;
 
 pub use fullcopy::TableFullReplication;
+pub use mode::ReplicationMode;
 pub use parameters::*;
 pub use sharded::TableShardedReplication;
diff --git a/src/table/replication/mode.rs b/src/table/replication/mode.rs
new file mode 100644
index 0000000..3268728
--- /dev/null
+++ b/src/table/replication/mode.rs
@@ -0,0 +1,47 @@
+pub enum ReplicationMode {
+	None,
+	TwoWay,
+	ThreeWay,
+}
+
+impl ReplicationMode {
+	pub fn parse(v: &str) -> Option<Self> {
+		match v {
+			"none" | "1" => Some(Self::None),
+			"2" => Some(Self::TwoWay),
+			"3" => Some(Self::ThreeWay),
+			_ => None,
+		}
+	}
+
+	pub fn control_write_max_faults(&self) -> usize {
+		match self {
+			Self::None => 0,
+			_ => 1,
+		}
+	}
+
+	pub fn replication_factor(&self) -> usize {
+		match self {
+			Self::None => 1,
+			Self::TwoWay => 2,
+			Self::ThreeWay => 3,
+		}
+	}
+
+	pub fn read_quorum(&self) -> usize {
+		match self {
+			Self::None => 1,
+			Self::TwoWay => 1,
+			Self::ThreeWay => 2,
+		}
+	}
+
+	pub fn write_quorum(&self) -> usize {
+		match self {
+			Self::None => 1,
+			Self::TwoWay => 2,
+			Self::ThreeWay => 2,
+		}
+	}
+}
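A property worth noting about the tables in mode.rs: for every mode, `read_quorum() + write_quorum()` exceeds `replication_factor()`, so any successful read set intersects the last successful write set. A standalone check of that invariant; since mode.rs is new in this patch, the enum below is a local copy of its tables, not an import:

    // Standalone check that every ReplicationMode keeps read and write quorums
    // overlapping: rq + wq > rf means a read always sees the latest committed write.
    #[derive(Clone, Copy)]
    enum ReplicationMode {
        None,
        TwoWay,
        ThreeWay,
    }

    impl ReplicationMode {
        fn replication_factor(self) -> usize {
            match self {
                Self::None => 1,
                Self::TwoWay => 2,
                Self::ThreeWay => 3,
            }
        }
        fn read_quorum(self) -> usize {
            match self {
                Self::None | Self::TwoWay => 1,
                Self::ThreeWay => 2,
            }
        }
        fn write_quorum(self) -> usize {
            match self {
                Self::None => 1,
                Self::TwoWay | Self::ThreeWay => 2,
            }
        }
    }

    fn main() {
        for mode in [ReplicationMode::None, ReplicationMode::TwoWay, ReplicationMode::ThreeWay] {
            assert!(mode.read_quorum() + mode.write_quorum() > mode.replication_factor());
        }
    }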
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index 93b95a3..8081b89 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -26,8 +26,8 @@
 
 impl TableReplication for TableShardedReplication {
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		let ring = self.system.ring.borrow().clone();
-		ring.walk_ring(&hash, self.replication_factor)
+		let ring = self.system.ring.borrow();
+		ring.get_nodes(&hash, self.replication_factor)
 	}
 	fn read_quorum(&self) -> usize {
 		self.read_quorum
@@ -35,7 +35,7 @@ impl TableReplication for TableShardedReplication {
 	fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
 		let ring = self.system.ring.borrow();
-		ring.walk_ring(&hash, self.replication_factor)
+		ring.get_nodes(&hash, self.replication_factor)
 	}
 	fn write_quorum(&self) -> usize {
 		self.write_quorum
diff --git a/src/util/config.rs b/src/util/config.rs
index 093b385..46b918a 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -15,6 +15,17 @@ pub struct Config {
 	/// Path where to store data. Can be slower, but need higher volume
 	pub data_dir: PathBuf,
 
+	/// Size of data blocks to save to disk
+	#[serde(default = "default_block_size")]
+	pub block_size: usize,
+
+	/// Replication mode. Supported values:
+	/// - none, 1 -> no replication
+	/// - 2 -> 2-way replication
+	/// - 3 -> 3-way replication
+	// (we can add more aliases for this later)
+	pub replication_mode: String,
+
 	/// Address to bind for RPC
 	pub rpc_bind_addr: SocketAddr,
@@ -26,6 +37,13 @@
 	/// Consul service name to use
 	pub consul_service_name: Option<String>,
 
+	/// Configuration for RPC TLS
+	pub rpc_tls: Option<TlsConfig>,
+
+	/// Max number of concurrent RPC request
+	#[serde(default = "default_max_concurrent_rpc_requests")]
+	pub max_concurrent_rpc_requests: usize,
+
 	/// Sled cache size, in bytes
 	#[serde(default = "default_sled_cache_capacity")]
 	pub sled_cache_capacity: u64,
@@ -34,28 +52,6 @@
 	/// Sled flush interval in milliseconds
 	#[serde(default = "default_sled_flush_every_ms")]
 	pub sled_flush_every_ms: u64,
 
-	/// Max number of concurrent RPC request
-	#[serde(default = "default_max_concurrent_rpc_requests")]
-	pub max_concurrent_rpc_requests: usize,
-
-	/// Size of data blocks to save to disk
-	#[serde(default = "default_block_size")]
-	pub block_size: usize,
-
-	#[serde(default = "default_control_write_max_faults")]
-	pub control_write_max_faults: usize,
-
-	/// How many nodes should hold a copy of meta data
-	#[serde(default = "default_replication_factor")]
-	pub meta_replication_factor: usize,
-
-	/// How many nodes should hold a copy of data
-	#[serde(default = "default_replication_factor")]
-	pub data_replication_factor: usize,
-
-	/// Configuration for RPC TLS
-	pub rpc_tls: Option<TlsConfig>,
-
 	/// Configuration for S3 api
 	pub s3_api: ApiConfig,
@@ -106,12 +102,6 @@ fn default_max_concurrent_rpc_requests() -> usize {
 fn default_block_size() -> usize {
 	1048576
 }
-fn default_replication_factor() -> usize {
-	3
-}
-fn default_control_write_max_faults() -> usize {
-	1
-}
 
 /// Read and parse configuration
 pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
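Finally, note that `replication_mode` is declared without a serde default, so it becomes a mandatory field of the configuration file. A cut-down sketch of that parsing behaviour (`MiniConfig` is a hypothetical stand-in for `Config`; assumes the serde derive and toml crates, which the real config loader also relies on):

    // Cut-down stand-in for the Config struct above: replication_mode is now a
    // mandatory field (no serde default), while block_size keeps its default.
    use serde::Deserialize;

    #[derive(Deserialize)]
    struct MiniConfig {
        replication_mode: String,
        #[serde(default = "default_block_size")]
        block_size: usize,
    }

    fn default_block_size() -> usize {
        1048576
    }

    fn main() {
        let cfg: MiniConfig = toml::from_str(r#"replication_mode = "3""#).unwrap();
        assert_eq!(cfg.replication_mode, "3");
        assert_eq!(cfg.block_size, 1048576);

        // Omitting replication_mode is now a hard parse error:
        assert!(toml::from_str::<MiniConfig>("").is_err());
    }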