WIP: Make replication mode part of cluster layout #819

Closed
yuka wants to merge 2 commits from yuka/garage:layout-replication-mode into main
11 changed files with 65 additions and 178 deletions

View file

@ -40,9 +40,6 @@ pub struct Garage {
/// The set of background variables that can be viewed/modified at runtime /// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars, pub bg_vars: vars::BgVars,
/// The replication factor of this cluster
pub replication_factor: ReplicationFactor,
/// The local database /// The local database
pub db: db::Db, pub db: db::Db,
/// The membership manager /// The membership manager
@ -143,26 +140,23 @@ impl Garage {
.and_then(|x| NetworkKey::from_slice(&x)) .and_then(|x| NetworkKey::from_slice(&x))
.ok_or_message("Invalid RPC secret key")?; .ok_or_message("Invalid RPC secret key")?;
let (replication_factor, consistency_mode) = parse_replication_mode(&config)?; let consistency_mode = ConsistencyMode::parse(&config.consistency_mode)
.ok_or_message("Invalid consistency_mode in config file.")?;
info!("Initialize background variable system..."); info!("Initialize background variable system...");
let mut bg_vars = vars::BgVars::new(); let mut bg_vars = vars::BgVars::new();
info!("Initialize membership management system..."); info!("Initialize membership management system...");
let system = System::new(network_key, replication_factor, consistency_mode, &config)?; let system = System::new(network_key, consistency_mode, &config)?;
let data_rep_param = TableShardedReplication { let data_rep_param = TableShardedReplication {
system: system.clone(), layout_manager: system.layout_manager.clone(),
replication_factor: replication_factor.into(), consistency_mode,
write_quorum: replication_factor.write_quorum(consistency_mode),
read_quorum: 1,
}; };
let meta_rep_param = TableShardedReplication { let meta_rep_param = TableShardedReplication {
system: system.clone(), layout_manager: system.layout_manager.clone(),
replication_factor: replication_factor.into(), consistency_mode,
write_quorum: replication_factor.write_quorum(consistency_mode),
read_quorum: replication_factor.read_quorum(consistency_mode),
}; };
let control_rep_param = TableFullReplication { let control_rep_param = TableFullReplication {
@ -259,7 +253,6 @@ impl Garage {
Ok(Arc::new(Self { Ok(Arc::new(Self {
config, config,
bg_vars, bg_vars,
replication_factor,
db, db,
system, system,
block_manager, block_manager,

View file

@ -28,7 +28,6 @@ pub struct SyncLayoutDigest {
} }
pub struct LayoutHelper { pub struct LayoutHelper {
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode, consistency_mode: ConsistencyMode,
layout: Option<LayoutHistory>, layout: Option<LayoutHistory>,
@ -51,7 +50,6 @@ pub struct LayoutHelper {
impl LayoutHelper { impl LayoutHelper {
pub fn new( pub fn new(
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode, consistency_mode: ConsistencyMode,
mut layout: LayoutHistory, mut layout: LayoutHistory,
mut ack_lock: HashMap<u64, AtomicUsize>, mut ack_lock: HashMap<u64, AtomicUsize>,
@ -97,8 +95,7 @@ impl LayoutHelper {
// consistency on those). // consistency on those).
// This value is calculated using quorums to allow progress even // This value is calculated using quorums to allow progress even
// if not all nodes have successfully completed a sync. // if not all nodes have successfully completed a sync.
let sync_map_min = let sync_map_min = layout.calculate_sync_map_min_with_quorum(&all_nongateway_nodes);
layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
let trackers_hash = layout.calculate_trackers_hash(); let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash(); let staging_hash = layout.calculate_staging_hash();
@ -111,7 +108,6 @@ impl LayoutHelper {
let is_check_ok = layout.check().is_ok(); let is_check_ok = layout.check().is_ok();
LayoutHelper { LayoutHelper {
replication_factor,
consistency_mode, consistency_mode,
layout: Some(layout), layout: Some(layout),
ack_map_min, ack_map_min,
@ -134,7 +130,6 @@ impl LayoutHelper {
let changed = f(self.layout.as_mut().unwrap()); let changed = f(self.layout.as_mut().unwrap());
if changed { if changed {
*self = Self::new( *self = Self::new(
self.replication_factor,
self.consistency_mode, self.consistency_mode,
self.layout.take().unwrap(), self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock), std::mem::take(&mut self.ack_lock),

View file

@ -9,8 +9,8 @@ use super::*;
use crate::replication_mode::*; use crate::replication_mode::*;
impl LayoutHistory { impl LayoutHistory {
pub fn new(replication_factor: ReplicationFactor) -> Self { pub fn new() -> Self {
let version = LayoutVersion::new(replication_factor.into()); let version = LayoutVersion::new(3);
let staging = LayoutStaging { let staging = LayoutStaging {
parameters: Lww::<LayoutParameters>::new(version.parameters), parameters: Lww::<LayoutParameters>::new(version.parameters),
@ -123,11 +123,7 @@ impl LayoutHistory {
} }
} }
pub(crate) fn calculate_sync_map_min_with_quorum( pub(crate) fn calculate_sync_map_min_with_quorum(&self, all_nongateway_nodes: &[Uuid]) -> u64 {
&self,
replication_factor: ReplicationFactor,
all_nongateway_nodes: &[Uuid],
) -> u64 {
// This function calculates the minimum layout version from which // This function calculates the minimum layout version from which
// it is safe to read if we want to maintain read-after-write consistency. // it is safe to read if we want to maintain read-after-write consistency.
// In the general case the computation can be a bit expensive so // In the general case the computation can be a bit expensive so
@ -139,7 +135,7 @@ impl LayoutHistory {
return self.current().version; return self.current().version;
} }
let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent); let quorum = self.current().write_quorum(ConsistencyMode::Consistent);
let min_version = self.min_stored(); let min_version = self.min_stored();
let global_min = self let global_min = self
@ -153,7 +149,11 @@ impl LayoutHistory {
// This is represented by reading from the layout with version // This is represented by reading from the layout with version
// number global_min, the smallest layout version for which all nodes // number global_min, the smallest layout version for which all nodes
// have completed a sync. // have completed a sync.
if quorum == self.current().replication_factor { if self
.versions
.iter()
.all(|v| v.write_quorum(ConsistencyMode::Consistent) == v.replication_factor)
{
return global_min; return global_min;
} }

View file

@ -20,7 +20,6 @@ use crate::system::*;
pub struct LayoutManager { pub struct LayoutManager {
node_id: Uuid, node_id: Uuid,
replication_factor: ReplicationFactor,
persist_cluster_layout: Persister<LayoutHistory>, persist_cluster_layout: Persister<LayoutHistory>,
layout: Arc<RwLock<LayoutHelper>>, layout: Arc<RwLock<LayoutHelper>>,
@ -38,38 +37,24 @@ impl LayoutManager {
node_id: NodeID, node_id: NodeID,
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
peering: Arc<PeeringManager>, peering: Arc<PeeringManager>,
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode, consistency_mode: ConsistencyMode,
) -> Result<Arc<Self>, Error> { ) -> Result<Arc<Self>, Error> {
let persist_cluster_layout: Persister<LayoutHistory> = let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout"); Persister::new(&config.metadata_dir, "cluster_layout");
let cluster_layout = match persist_cluster_layout.load() { let cluster_layout = match persist_cluster_layout.load() {
Ok(x) => { Ok(x) => x,
if x.current().replication_factor != replication_factor.replication_factor() {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.current().replication_factor,
replication_factor.replication_factor()
)));
}
x
}
Err(e) => { Err(e) => {
info!( info!(
"No valid previous cluster layout stored ({}), starting fresh.", "No valid previous cluster layout stored ({}), starting fresh.",
e e
); );
LayoutHistory::new(replication_factor) LayoutHistory::new()
} }
}; };
let mut cluster_layout = LayoutHelper::new( let mut cluster_layout =
replication_factor, LayoutHelper::new(consistency_mode, cluster_layout, Default::default());
consistency_mode,
cluster_layout,
Default::default(),
);
cluster_layout.update_update_trackers(node_id.into()); cluster_layout.update_update_trackers(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout)); let layout = Arc::new(RwLock::new(cluster_layout));
@ -84,7 +69,6 @@ impl LayoutManager {
Ok(Arc::new(Self { Ok(Arc::new(Self {
node_id: node_id.into(), node_id: node_id.into(),
replication_factor,
persist_cluster_layout, persist_cluster_layout,
layout, layout,
change_notify, change_notify,
@ -298,16 +282,6 @@ impl LayoutManager {
adv.update_trackers adv.update_trackers
); );
if adv.current().replication_factor != self.replication_factor.replication_factor() {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
self.replication_factor.replication_factor()
);
error!("{}", msg);
return Err(Error::Message(msg));
}
if let Some(new_layout) = self.merge_layout(adv) { if let Some(new_layout) = self.merge_layout(adv) {
debug!("handle_advertise_cluster_layout: some changes were added to the current stuff"); debug!("handle_advertise_cluster_layout: some changes were added to the current stuff");

View file

@ -5,7 +5,6 @@ use garage_util::crdt::Crdt;
use garage_util::error::*; use garage_util::error::*;
use crate::layout::*; use crate::layout::*;
use crate::replication_mode::ReplicationFactor;
// This function checks that the partition size S computed is at least better than the // This function checks that the partition size S computed is at least better than the
// one given by a very naive algorithm. To do so, we try to run the naive algorithm // one given by a very naive algorithm. To do so, we try to run the naive algorithm
@ -121,7 +120,7 @@ fn test_assignment() {
let mut node_capacity_vec = vec![4000, 1000, 2000]; let mut node_capacity_vec = vec![4000, 1000, 2000];
let mut node_zone_vec = vec!["A", "B", "C"]; let mut node_zone_vec = vec!["A", "B", "C"];
let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap()); let mut cl = LayoutHistory::new();
update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3); update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
let v = cl.current().version; let v = cl.current().version;
let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap(); let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();

View file

@ -11,6 +11,7 @@ use garage_util::error::*;
use super::graph_algo::*; use super::graph_algo::*;
use super::*; use super::*;
use crate::replication_mode::ConsistencyMode;
// The Message type will be used to collect information on the algorithm. // The Message type will be used to collect information on the algorithm.
pub type Message = Vec<String>; pub type Message = Vec<String>;
@ -843,4 +844,20 @@ impl LayoutVersion {
Ok(msg) Ok(msg)
} }
pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
ConsistencyMode::Consistent => self.replication_factor.div_ceil(2),
}
}
pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous => 1,
ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
(self.replication_factor + 1) - self.read_quorum(ConsistencyMode::Consistent)
}
}
}
} }

View file

@ -1,12 +1,6 @@
use garage_util::config::Config;
use garage_util::crdt::AutoCrdt; use garage_util::crdt::AutoCrdt;
use garage_util::error::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
#[serde(transparent)]
pub struct ReplicationFactor(usize);
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)] #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")] #[serde(rename_all = "lowercase")]
pub enum ConsistencyMode { pub enum ConsistencyMode {
@ -28,67 +22,3 @@ impl ConsistencyMode {
impl AutoCrdt for ConsistencyMode { impl AutoCrdt for ConsistencyMode {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }
impl ReplicationFactor {
pub fn new(replication_factor: usize) -> Option<Self> {
if replication_factor < 1 {
None
} else {
Some(Self(replication_factor))
}
}
pub fn replication_factor(&self) -> usize {
self.0
}
pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
ConsistencyMode::Consistent => self.replication_factor().div_ceil(2),
}
}
pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
match consistency_mode {
ConsistencyMode::Dangerous => 1,
ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
(self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent)
}
}
}
}
impl std::convert::From<ReplicationFactor> for usize {
fn from(replication_factor: ReplicationFactor) -> usize {
replication_factor.0
}
}
pub fn parse_replication_mode(
config: &Config,
) -> Result<(ReplicationFactor, ConsistencyMode), Error> {
match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) {
(Some(replication_mode), None, "consistent") => {
tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode");
let parsed_replication_mode = match replication_mode.as_str() {
"1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)),
"2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)),
"2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)),
"3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)),
"3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)),
"3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)),
_ => None,
};
Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?)
},
(None, Some(replication_factor), consistency_mode) => {
let replication_factor = ReplicationFactor::new(replication_factor)
.ok_or_message("Invalid replication_factor in config file.")?;
let consistency_mode = ConsistencyMode::parse(consistency_mode)
.ok_or_message("Invalid consistency_mode in config file.")?;
Some((replication_factor, consistency_mode))
}
_ => None,
}.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.")
}

View file

@ -112,8 +112,6 @@ pub struct System {
metrics: ArcSwapOption<SystemMetrics>, metrics: ArcSwapOption<SystemMetrics>,
pub(crate) replication_factor: ReplicationFactor,
/// Path to metadata directory /// Path to metadata directory
pub metadata_dir: PathBuf, pub metadata_dir: PathBuf,
/// Path to data directory /// Path to data directory
@ -125,9 +123,6 @@ pub struct NodeStatus {
/// Hostname of the node /// Hostname of the node
pub hostname: Option<String>, pub hostname: Option<String>,
/// Replication factor configured on the node
pub replication_factor: usize,
/// Cluster layout digest /// Cluster layout digest
pub layout_digest: RpcLayoutDigest, pub layout_digest: RpcLayoutDigest,
@ -242,7 +237,6 @@ impl System {
/// Create this node's membership manager /// Create this node's membership manager
pub fn new( pub fn new(
network_key: NetworkKey, network_key: NetworkKey,
replication_factor: ReplicationFactor,
consistency_mode: ConsistencyMode, consistency_mode: ConsistencyMode,
config: &Config, config: &Config,
) -> Result<Arc<Self>, Error> { ) -> Result<Arc<Self>, Error> {
@ -279,11 +273,10 @@ impl System {
netapp.id, netapp.id,
system_endpoint.clone(), system_endpoint.clone(),
peering.clone(), peering.clone(),
replication_factor,
consistency_mode, consistency_mode,
)?; )?;
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager); let mut local_status = NodeStatus::initial(&layout_manager);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
// ---- if enabled, set up additionnal peer discovery methods ---- // ---- if enabled, set up additionnal peer discovery methods ----
@ -314,7 +307,6 @@ impl System {
netapp: netapp.clone(), netapp: netapp.clone(),
peering: peering.clone(), peering: peering.clone(),
system_endpoint, system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
@ -429,10 +421,6 @@ impl System {
} }
pub fn health(&self) -> ClusterHealth { pub fn health(&self) -> ClusterHealth {
let quorum = self
.replication_factor
.write_quorum(ConsistencyMode::Consistent);
// Gather information about running nodes. // Gather information about running nodes.
// Technically, `nodes` contains currently running nodes, as well // Technically, `nodes` contains currently running nodes, as well
// as nodes that this Garage process has been connected to at least // as nodes that this Garage process has been connected to at least
@ -465,6 +453,7 @@ impl System {
// Determine the number of partitions that have: // Determine the number of partitions that have:
// - a quorum of up nodes for all write sets (i.e. are available) // - a quorum of up nodes for all write sets (i.e. are available)
// - for which all nodes in all write sets are up (i.e. are fully healthy) // - for which all nodes in all write sets are up (i.e. are fully healthy)
let quorum = layout.current().write_quorum(ConsistencyMode::Consistent);
let partitions = layout.current().partitions().collect::<Vec<_>>(); let partitions = layout.current().partitions().collect::<Vec<_>>();
let mut partitions_quorum = 0; let mut partitions_quorum = 0;
let mut partitions_all_ok = 0; let mut partitions_all_ok = 0;
@ -580,13 +569,6 @@ impl System {
) -> Result<SystemRpc, Error> { ) -> Result<SystemRpc, Error> {
let local_info = self.local_status.read().unwrap(); let local_info = self.local_status.read().unwrap();
if local_info.replication_factor < info.replication_factor {
error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.",
info.replication_factor,
local_info.replication_factor);
std::process::exit(1);
}
self.layout_manager self.layout_manager
.handle_advertise_status(from, &info.layout_digest); .handle_advertise_status(from, &info.layout_digest);
@ -635,7 +617,7 @@ impl System {
.count(); .count();
let not_configured = !self.cluster_layout().is_check_ok(); let not_configured = !self.cluster_layout().is_check_ok();
let no_peers = n_connected < self.replication_factor.into(); let no_peers = n_connected < self.cluster_layout().current().replication_factor;
let expected_n_nodes = self.cluster_layout().all_nodes().len(); let expected_n_nodes = self.cluster_layout().all_nodes().len();
let bad_peers = n_connected != expected_n_nodes; let bad_peers = n_connected != expected_n_nodes;
@ -781,14 +763,13 @@ impl EndpointHandler<SystemRpc> for System {
} }
impl NodeStatus { impl NodeStatus {
fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self { fn initial(layout_manager: &LayoutManager) -> Self {
NodeStatus { NodeStatus {
hostname: Some( hostname: Some(
gethostname::gethostname() gethostname::gethostname()
.into_string() .into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()), .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
), ),
replication_factor: replication_factor.into(),
layout_digest: layout_manager.layout().digest(), layout_digest: layout_manager.layout().digest(),
meta_disk_avail: None, meta_disk_avail: None,
data_disk_avail: None, data_disk_avail: None,
@ -798,7 +779,6 @@ impl NodeStatus {
fn unknown() -> Self { fn unknown() -> Self {
NodeStatus { NodeStatus {
hostname: None, hostname: None,
replication_factor: 0,
layout_digest: Default::default(), layout_digest: Default::default(),
meta_disk_avail: None, meta_disk_avail: None,
data_disk_avail: None, data_disk_avail: None,

View file

@ -65,10 +65,13 @@ impl SystemMetrics {
.with_description("Garage build info") .with_description("Garage build info")
.init(), .init(),
_replication_factor: { _replication_factor: {
let replication_factor = system.replication_factor; let system = system.clone();
meter meter
.u64_value_observer("garage_replication_factor", move |observer| { .u64_value_observer("garage_replication_factor", move |observer| {
observer.observe(replication_factor.replication_factor() as u64, &[]) observer.observe(
system.cluster_layout().current().replication_factor as u64,
&[],
)
}) })
.with_description("Garage replication factor setting") .with_description("Garage replication factor setting")
.init() .init()

View file

@ -1,9 +1,10 @@
use std::sync::Arc; use std::sync::Arc;
use garage_rpc::layout::*; use garage_rpc::layout::*;
use garage_rpc::system::System; use garage_rpc::replication_mode::ConsistencyMode;
use garage_util::data::*; use garage_util::data::*;
use crate::replication::sharded::manager::LayoutManager;
use crate::replication::*; use crate::replication::*;
/// Sharded replication schema: /// Sharded replication schema:
@ -15,42 +16,43 @@ use crate::replication::*;
#[derive(Clone)] #[derive(Clone)]
pub struct TableShardedReplication { pub struct TableShardedReplication {
/// The membership manager of this node /// The membership manager of this node
pub system: Arc<System>, pub layout_manager: Arc<LayoutManager>,
/// How many time each data should be replicated pub consistency_mode: ConsistencyMode,
pub replication_factor: usize,
/// How many nodes to contact for a read, should be at most `replication_factor`
pub read_quorum: usize,
/// How many nodes to contact for a write, should be at most `replication_factor`
pub write_quorum: usize,
} }
impl TableReplication for TableShardedReplication { impl TableReplication for TableShardedReplication {
type WriteSets = WriteLock<Vec<Vec<Uuid>>>; type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> { fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().storage_nodes_of(hash) self.layout_manager.layout().storage_nodes_of(hash)
} }
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> { fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
self.system.cluster_layout().read_nodes_of(hash) self.layout_manager.layout().read_nodes_of(hash)
} }
fn read_quorum(&self) -> usize { fn read_quorum(&self) -> usize {
self.read_quorum self.layout_manager
.layout()
.current()
.read_quorum(self.consistency_mode)
} }
fn write_sets(&self, hash: &Hash) -> Self::WriteSets { fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
self.system.layout_manager.write_sets_of(hash) self.layout_manager.write_sets_of(hash)
} }
fn write_quorum(&self) -> usize { fn write_quorum(&self) -> usize {
self.write_quorum self.layout_manager
.layout()
.current()
.write_quorum(self.consistency_mode)
} }
fn partition_of(&self, hash: &Hash) -> Partition { fn partition_of(&self, hash: &Hash) -> Partition {
self.system.cluster_layout().current().partition_of(hash) self.layout_manager.layout().current().partition_of(hash)
} }
fn sync_partitions(&self) -> SyncPartitions { fn sync_partitions(&self) -> SyncPartitions {
let layout = self.system.cluster_layout(); let layout = self.layout_manager.layout();
let layout_version = layout.ack_map_min(); let layout_version = layout.ack_map_min();
let mut partitions = layout let mut partitions = layout

View file

@ -38,12 +38,6 @@ pub struct Config {
)] )]
pub block_size: usize, pub block_size: usize,
/// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.
/// - 1 for single-node clusters, or to disable replication
/// - 3 is the recommended and supported setting.
#[serde(default)]
pub replication_factor: Option<usize>,
/// Consistency mode for all for requests through this node /// Consistency mode for all for requests through this node
/// - Degraded -> Disable read quorum /// - Degraded -> Disable read quorum
/// - Dangerous -> Disable read and write quorum /// - Dangerous -> Disable read and write quorum