diff --git a/src/model/garage.rs b/src/model/garage.rs
index 273690db..863271d5 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -40,9 +40,6 @@ pub struct Garage {
 	/// The set of background variables that can be viewed/modified at runtime
 	pub bg_vars: vars::BgVars,
 
-	/// The replication factor of this cluster
-	pub replication_factor: ReplicationFactor,
-
 	/// The local database
 	pub db: db::Db,
 	/// The membership manager
@@ -143,26 +140,23 @@ impl Garage {
 			.and_then(|x| NetworkKey::from_slice(&x))
 			.ok_or_message("Invalid RPC secret key")?;
 
-		let (replication_factor, consistency_mode) = parse_replication_mode(&config)?;
+		let consistency_mode = ConsistencyMode::parse(&config.consistency_mode)
+			.ok_or_message("Invalid consistency_mode in config file.")?;
 
 		info!("Initialize background variable system...");
 		let mut bg_vars = vars::BgVars::new();
 
 		info!("Initialize membership management system...");
-		let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
+		let system = System::new(network_key, consistency_mode, &config)?;
 
 		let data_rep_param = TableShardedReplication {
-			system: system.clone(),
-			replication_factor: replication_factor.into(),
-			write_quorum: replication_factor.write_quorum(consistency_mode),
-			read_quorum: 1,
+			layout_manager: system.layout_manager.clone(),
+			consistency_mode,
 		};
 
 		let meta_rep_param = TableShardedReplication {
-			system: system.clone(),
-			replication_factor: replication_factor.into(),
-			write_quorum: replication_factor.write_quorum(consistency_mode),
-			read_quorum: replication_factor.read_quorum(consistency_mode),
+			layout_manager: system.layout_manager.clone(),
+			consistency_mode,
 		};
 
 		let control_rep_param = TableFullReplication {
@@ -259,7 +253,6 @@ impl Garage {
 		Ok(Arc::new(Self {
 			config,
 			bg_vars,
-			replication_factor,
 			db,
 			system,
 			block_manager,
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 3a033ab2..5512bae5 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -28,7 +28,6 @@ pub struct SyncLayoutDigest {
 }
 
 pub struct LayoutHelper {
-	replication_factor: ReplicationFactor,
 	consistency_mode: ConsistencyMode,
 	layout: Option<LayoutHistory>,
 
@@ -51,7 +50,6 @@ pub struct LayoutHelper {
 
 impl LayoutHelper {
 	pub fn new(
-		replication_factor: ReplicationFactor,
 		consistency_mode: ConsistencyMode,
 		mut layout: LayoutHistory,
 		mut ack_lock: HashMap<u64, AtomicUsize>,
@@ -97,8 +95,7 @@ impl LayoutHelper {
 		// consistency on those).
 		// This value is calculated using quorums to allow progress even
 		// if not all nodes have successfully completed a sync.
-		let sync_map_min =
-			layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
+		let sync_map_min = layout.calculate_sync_map_min_with_quorum(&all_nongateway_nodes);
 
 		let trackers_hash = layout.calculate_trackers_hash();
 		let staging_hash = layout.calculate_staging_hash();
@@ -111,7 +108,6 @@ impl LayoutHelper {
 		let is_check_ok = layout.check().is_ok();
 
 		LayoutHelper {
-			replication_factor,
 			consistency_mode,
 			layout: Some(layout),
 			ack_map_min,
@@ -134,7 +130,6 @@ impl LayoutHelper {
 		let changed = f(self.layout.as_mut().unwrap());
 		if changed {
 			*self = Self::new(
-				self.replication_factor,
 				self.consistency_mode,
 				self.layout.take().unwrap(),
 				std::mem::take(&mut self.ack_lock),
diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs
index af2cbc63..892f2dea 100644
--- a/src/rpc/layout/history.rs
+++ b/src/rpc/layout/history.rs
@@ -9,8 +9,8 @@ use super::*;
 use crate::replication_mode::*;
 
 impl LayoutHistory {
-	pub fn new(replication_factor: ReplicationFactor) -> Self {
-		let version = LayoutVersion::new(replication_factor.into());
+	pub fn new() -> Self {
+		let version = LayoutVersion::new(3);
 
 		let staging = LayoutStaging {
 			parameters: Lww::<LayoutParameters>::new(version.parameters),
@@ -123,11 +123,7 @@ impl LayoutHistory {
 		}
 	}
 
-	pub(crate) fn calculate_sync_map_min_with_quorum(
-		&self,
-		replication_factor: ReplicationFactor,
-		all_nongateway_nodes: &[Uuid],
-	) -> u64 {
+	pub(crate) fn calculate_sync_map_min_with_quorum(&self, all_nongateway_nodes: &[Uuid]) -> u64 {
 		// This function calculates the minimum layout version from which
 		// it is safe to read if we want to maintain read-after-write consistency.
 		// In the general case the computation can be a bit expensive so
@@ -139,7 +135,7 @@ impl LayoutHistory {
 			return self.current().version;
 		}
 
-		let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent);
+		let quorum = self.current().write_quorum(ConsistencyMode::Consistent);
 
 		let min_version = self.min_stored();
 		let global_min = self
@@ -153,7 +149,11 @@ impl LayoutHistory {
 		// This is represented by reading from the layout with version
 		// number global_min, the smallest layout version for which all nodes
 		// have completed a sync.
-		if quorum == self.current().replication_factor {
+		if self
+			.versions
+			.iter()
+			.all(|v| v.write_quorum(ConsistencyMode::Consistent) == v.replication_factor)
+		{
 			return global_min;
 		}
 
diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs
index a0dcf50e..89392f49 100644
--- a/src/rpc/layout/manager.rs
+++ b/src/rpc/layout/manager.rs
@@ -20,7 +20,6 @@ use crate::system::*;
 
 pub struct LayoutManager {
 	node_id: Uuid,
-	replication_factor: ReplicationFactor,
 	persist_cluster_layout: Persister<LayoutHistory>,
 	layout: Arc<RwLock<LayoutHelper>>,
 
@@ -38,38 +37,24 @@ impl LayoutManager {
 		node_id: NodeID,
 		system_endpoint: Arc<Endpoint<SystemRpc, System>>,
 		peering: Arc<PeeringManager>,
-		replication_factor: ReplicationFactor,
 		consistency_mode: ConsistencyMode,
 	) -> Result<Arc<Self>, Error> {
 		let persist_cluster_layout: Persister<LayoutHistory> =
 			Persister::new(&config.metadata_dir, "cluster_layout");
 
 		let cluster_layout = match persist_cluster_layout.load() {
-			Ok(x) => {
-				if x.current().replication_factor != replication_factor.replication_factor() {
-					return Err(Error::Message(format!(
-						"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
-						x.current().replication_factor,
-						replication_factor.replication_factor()
-					)));
-				}
-				x
-			}
+			Ok(x) => x,
 			Err(e) => {
 				info!(
 					"No valid previous cluster layout stored ({}), starting fresh.",
 					e
 				);
-				LayoutHistory::new(replication_factor)
+				LayoutHistory::new()
 			}
 		};
 
-		let mut cluster_layout = LayoutHelper::new(
-			replication_factor,
-			consistency_mode,
-			cluster_layout,
-			Default::default(),
-		);
+		let mut cluster_layout =
+			LayoutHelper::new(consistency_mode, cluster_layout, Default::default());
 		cluster_layout.update_update_trackers(node_id.into());
 
 		let layout = Arc::new(RwLock::new(cluster_layout));
@@ -84,7 +69,6 @@ impl LayoutManager {
 
 		Ok(Arc::new(Self {
 			node_id: node_id.into(),
-			replication_factor,
 			persist_cluster_layout,
 			layout,
 			change_notify,
@@ -298,16 +282,6 @@ impl LayoutManager {
 			adv.update_trackers
 		);
 
-		if adv.current().replication_factor != self.replication_factor.replication_factor() {
-			let msg = format!(
-				"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
-				adv.current().replication_factor,
-				self.replication_factor.replication_factor()
-			);
-			error!("{}", msg);
-			return Err(Error::Message(msg));
-		}
-
 		if let Some(new_layout) = self.merge_layout(adv) {
 			debug!("handle_advertise_cluster_layout: some changes were added to the current stuff");
 
diff --git a/src/rpc/layout/test.rs b/src/rpc/layout/test.rs
index fcbb9dfc..7f91d5ac 100644
--- a/src/rpc/layout/test.rs
+++ b/src/rpc/layout/test.rs
@@ -5,7 +5,6 @@ use garage_util::crdt::Crdt;
 use garage_util::error::*;
 
 use crate::layout::*;
-use crate::replication_mode::ReplicationFactor;
 
 // This function checks that the partition size S computed is at least better than the
 // one given by a very naive algorithm. To do so, we try to run the naive algorithm
@@ -121,7 +120,7 @@ fn test_assignment() {
 	let mut node_capacity_vec = vec![4000, 1000, 2000];
 	let mut node_zone_vec = vec!["A", "B", "C"];
 
-	let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap());
+	let mut cl = LayoutHistory::new();
 	update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
 	let v = cl.current().version;
 	let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs
index ee4b2821..2c434eaa 100644
--- a/src/rpc/layout/version.rs
+++ b/src/rpc/layout/version.rs
@@ -11,6 +11,7 @@ use garage_util::error::*;
 
 use super::graph_algo::*;
 use super::*;
+use crate::replication_mode::ConsistencyMode;
 
 // The Message type will be used to collect information on the algorithm.
 pub type Message = Vec<String>;
@@ -843,4 +844,20 @@ impl LayoutVersion {
 
 		Ok(msg)
 	}
+
+	pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+		match consistency_mode {
+			ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
+			ConsistencyMode::Consistent => self.replication_factor.div_ceil(2),
+		}
+	}
+
+	pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+		match consistency_mode {
+			ConsistencyMode::Dangerous => 1,
+			ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
+				(self.replication_factor + 1) - self.read_quorum(ConsistencyMode::Consistent)
+			}
+		}
+	}
 }
diff --git a/src/rpc/replication_mode.rs b/src/rpc/replication_mode.rs
index a3a94085..520688d2 100644
--- a/src/rpc/replication_mode.rs
+++ b/src/rpc/replication_mode.rs
@@ -1,12 +1,6 @@
-use garage_util::config::Config;
 use garage_util::crdt::AutoCrdt;
-use garage_util::error::*;
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
-#[serde(transparent)]
-pub struct ReplicationFactor(usize);
-
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
 #[serde(rename_all = "lowercase")]
 pub enum ConsistencyMode {
@@ -28,67 +22,3 @@ impl ConsistencyMode {
 impl AutoCrdt for ConsistencyMode {
 	const WARN_IF_DIFFERENT: bool = true;
 }
-
-impl ReplicationFactor {
-	pub fn new(replication_factor: usize) -> Option<Self> {
-		if replication_factor < 1 {
-			None
-		} else {
-			Some(Self(replication_factor))
-		}
-	}
-
-	pub fn replication_factor(&self) -> usize {
-		self.0
-	}
-
-	pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
-		match consistency_mode {
-			ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
-			ConsistencyMode::Consistent => self.replication_factor().div_ceil(2),
-		}
-	}
-
-	pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
-		match consistency_mode {
-			ConsistencyMode::Dangerous => 1,
-			ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
-				(self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent)
-			}
-		}
-	}
-}
-
-impl std::convert::From<ReplicationFactor> for usize {
-	fn from(replication_factor: ReplicationFactor) -> usize {
-		replication_factor.0
-	}
-}
-
-pub fn parse_replication_mode(
-	config: &Config,
-) -> Result<(ReplicationFactor, ConsistencyMode), Error> {
-	match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) {
-		(Some(replication_mode), None, "consistent") => {
-			tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode");
-			let parsed_replication_mode = match replication_mode.as_str() {
-				"1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)),
-				"2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)),
-				"2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)),
-				"3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)),
-				"3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)),
-				"3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)),
-				_ => None,
-			};
-			Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?)
-		},
-		(None, Some(replication_factor), consistency_mode) => {
-			let replication_factor = ReplicationFactor::new(replication_factor)
-				.ok_or_message("Invalid replication_factor in config file.")?;
-			let consistency_mode = ConsistencyMode::parse(consistency_mode)
-				.ok_or_message("Invalid consistency_mode in config file.")?;
-			Some((replication_factor, consistency_mode))
-		}
-		_ => None,
-	}.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.")
-}
diff --git a/src/rpc/system.rs b/src/rpc/system.rs
index 0e78060b..305e23ad 100644
--- a/src/rpc/system.rs
+++ b/src/rpc/system.rs
@@ -112,8 +112,6 @@ pub struct System {
 
 	metrics: ArcSwapOption<SystemMetrics>,
 
-	pub(crate) replication_factor: ReplicationFactor,
-
 	/// Path to metadata directory
 	pub metadata_dir: PathBuf,
 	/// Path to data directory
@@ -125,9 +123,6 @@ pub struct NodeStatus {
 	/// Hostname of the node
 	pub hostname: Option<String>,
 
-	/// Replication factor configured on the node
-	pub replication_factor: usize,
-
 	/// Cluster layout digest
 	pub layout_digest: RpcLayoutDigest,
 
@@ -242,7 +237,6 @@ impl System {
 	/// Create this node's membership manager
 	pub fn new(
 		network_key: NetworkKey,
-		replication_factor: ReplicationFactor,
 		consistency_mode: ConsistencyMode,
 		config: &Config,
 	) -> Result<Arc<Self>, Error> {
@@ -279,11 +273,10 @@ impl System {
 			netapp.id,
 			system_endpoint.clone(),
 			peering.clone(),
-			replication_factor,
 			consistency_mode,
 		)?;
 
-		let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
+		let mut local_status = NodeStatus::initial(&layout_manager);
 		local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
 
 		// ---- if enabled, set up additionnal peer discovery methods ----
@@ -314,7 +307,6 @@ impl System {
 			netapp: netapp.clone(),
 			peering: peering.clone(),
 			system_endpoint,
-			replication_factor,
 			rpc_listen_addr: config.rpc_bind_addr,
 			rpc_public_addr,
 			bootstrap_peers: config.bootstrap_peers.clone(),
@@ -429,10 +421,6 @@ impl System {
 	}
 
 	pub fn health(&self) -> ClusterHealth {
-		let quorum = self
-			.replication_factor
-			.write_quorum(ConsistencyMode::Consistent);
-
 		// Gather information about running nodes.
 		// Technically, `nodes` contains currently running nodes, as well
 		// as nodes that this Garage process has been connected to at least
@@ -465,6 +453,7 @@ impl System {
 		// Determine the number of partitions that have:
 		// - a quorum of up nodes for all write sets (i.e. are available)
 		// - for which all nodes in all write sets are up (i.e. are fully healthy)
+		let quorum = layout.current().write_quorum(ConsistencyMode::Consistent);
 		let partitions = layout.current().partitions().collect::<Vec<_>>();
 		let mut partitions_quorum = 0;
 		let mut partitions_all_ok = 0;
@@ -580,13 +569,6 @@ impl System {
 	) -> Result<SystemRpc, Error> {
 		let local_info = self.local_status.read().unwrap();
 
-		if local_info.replication_factor < info.replication_factor {
-			error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.",
-				info.replication_factor,
-				local_info.replication_factor);
-			std::process::exit(1);
-		}
-
 		self.layout_manager
 			.handle_advertise_status(from, &info.layout_digest);
 
@@ -635,7 +617,7 @@ impl System {
 			.count();
 
 		let not_configured = !self.cluster_layout().is_check_ok();
-		let no_peers = n_connected < self.replication_factor.into();
+		let no_peers = n_connected < self.cluster_layout().current().replication_factor;
 
 		let expected_n_nodes = self.cluster_layout().all_nodes().len();
 		let bad_peers = n_connected != expected_n_nodes;
@@ -781,14 +763,13 @@ impl EndpointHandler<SystemRpc> for System {
 }
 
 impl NodeStatus {
-	fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self {
+	fn initial(layout_manager: &LayoutManager) -> Self {
 		NodeStatus {
 			hostname: Some(
 				gethostname::gethostname()
 					.into_string()
					.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
 			),
-			replication_factor: replication_factor.into(),
 			layout_digest: layout_manager.layout().digest(),
 			meta_disk_avail: None,
 			data_disk_avail: None,
@@ -798,7 +779,6 @@ impl System {
 	fn unknown() -> Self {
 		NodeStatus {
 			hostname: None,
-			replication_factor: 0,
 			layout_digest: Default::default(),
 			meta_disk_avail: None,
 			data_disk_avail: None,
diff --git a/src/rpc/system_metrics.rs b/src/rpc/system_metrics.rs
index a64daec8..2f29659f 100644
--- a/src/rpc/system_metrics.rs
+++ b/src/rpc/system_metrics.rs
@@ -65,10 +65,13 @@ impl SystemMetrics {
 				.with_description("Garage build info")
 				.init(),
 			_replication_factor: {
-				let replication_factor = system.replication_factor;
+				let system = system.clone();
 				meter
 					.u64_value_observer("garage_replication_factor", move |observer| {
-						observer.observe(replication_factor.replication_factor() as u64, &[])
+						observer.observe(
+							system.cluster_layout().current().replication_factor as u64,
+							&[],
+						)
 					})
 					.with_description("Garage replication factor setting")
 					.init()
diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs
index e0245949..787a01f4 100644
--- a/src/table/replication/sharded.rs
+++ b/src/table/replication/sharded.rs
@@ -1,9 +1,10 @@
 use std::sync::Arc;
 
 use garage_rpc::layout::*;
-use garage_rpc::system::System;
+use garage_rpc::replication_mode::ConsistencyMode;
 use garage_util::data::*;
 
+use garage_rpc::layout::manager::LayoutManager;
 use crate::replication::*;
 
 /// Sharded replication schema:
@@ -15,42 +16,43 @@ use crate::replication::*;
 #[derive(Clone)]
 pub struct TableShardedReplication {
 	/// The membership manager of this node
-	pub system: Arc<System>,
-	/// How many time each data should be replicated
-	pub replication_factor: usize,
-	/// How many nodes to contact for a read, should be at most `replication_factor`
-	pub read_quorum: usize,
-	/// How many nodes to contact for a write, should be at most `replication_factor`
-	pub write_quorum: usize,
+	pub layout_manager: Arc<LayoutManager>,
+	pub consistency_mode: ConsistencyMode,
 }
 
 impl TableReplication for TableShardedReplication {
 	type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
 	fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().storage_nodes_of(hash)
+		self.layout_manager.layout().storage_nodes_of(hash)
 	}
 
 	fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-		self.system.cluster_layout().read_nodes_of(hash)
+		self.layout_manager.layout().read_nodes_of(hash)
 	}
 	fn read_quorum(&self) -> usize {
-		self.read_quorum
+		self.layout_manager
+			.layout()
+			.current()
+			.read_quorum(self.consistency_mode)
 	}
 
 	fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-		self.system.layout_manager.write_sets_of(hash)
+		self.layout_manager.write_sets_of(hash)
 	}
 	fn write_quorum(&self) -> usize {
-		self.write_quorum
+		self.layout_manager
+			.layout()
+			.current()
+			.write_quorum(self.consistency_mode)
 	}
 
 	fn partition_of(&self, hash: &Hash) -> Partition {
-		self.system.cluster_layout().current().partition_of(hash)
+		self.layout_manager.layout().current().partition_of(hash)
 	}
 
 	fn sync_partitions(&self) -> SyncPartitions {
-		let layout = self.system.cluster_layout();
+		let layout = self.layout_manager.layout();
 		let layout_version = layout.ack_map_min();
 
 		let mut partitions = layout
diff --git a/src/util/config.rs b/src/util/config.rs
index 028f8c68..f66eb786 100644
--- a/src/util/config.rs
+++ b/src/util/config.rs
@@ -38,12 +38,6 @@ pub struct Config {
 	)]
 	pub block_size: usize,
 
-	/// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.
-	/// - 1 for single-node clusters, or to disable replication
-	/// - 3 is the recommended and supported setting.
-	#[serde(default)]
-	pub replication_factor: Option<usize>,
-
 	/// Consistency mode for all for requests through this node
 	/// - Degraded -> Disable read quorum
 	/// - Dangerous -> Disable read and write quorum
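
Note (illustration only, not part of the patch): with ReplicationFactor removed, the quorum arithmetic now lives on LayoutVersion and is driven by the replication_factor stored in the cluster layout together with the configured ConsistencyMode. The sketch below simply replays the formulas added in src/rpc/layout/version.rs for ConsistencyMode::Consistent to show that any read set and any write set must overlap; the `quorums` helper is made up for the example and is not Garage API.

    // Standalone sketch of LayoutVersion::{read_quorum, write_quorum} for the
    // Consistent mode. `quorums` is a hypothetical helper, not part of the patch.
    fn quorums(replication_factor: usize) -> (usize, usize) {
        let read_quorum = replication_factor.div_ceil(2); // e.g. 3 -> 2
        let write_quorum = (replication_factor + 1) - read_quorum; // e.g. 3 -> 2
        (read_quorum, write_quorum)
    }

    fn main() {
        for rf in 1..=5 {
            let (r, w) = quorums(rf);
            // Read-after-write holds because r + w > rf, so the sets intersect.
            assert!(r + w > rf);
            println!("replication_factor={rf}: read_quorum={r}, write_quorum={w}");
        }
    }

In Degraded mode the read quorum drops to 1 while the write quorum keeps the same formula, and in Dangerous mode both drop to 1, which is exactly the match in LayoutVersion::read_quorum/write_quorum above.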