WIP: Make replication mode part of cluster layout #819
11 changed files with 65 additions and 178 deletions
@@ -40,9 +40,6 @@ pub struct Garage {
     /// The set of background variables that can be viewed/modified at runtime
     pub bg_vars: vars::BgVars,
 
-    /// The replication factor of this cluster
-    pub replication_factor: ReplicationFactor,
-
     /// The local database
     pub db: db::Db,
     /// The membership manager
@@ -143,26 +140,23 @@ impl Garage {
         .and_then(|x| NetworkKey::from_slice(&x))
         .ok_or_message("Invalid RPC secret key")?;
 
-    let (replication_factor, consistency_mode) = parse_replication_mode(&config)?;
+    let consistency_mode = ConsistencyMode::parse(&config.consistency_mode)
+        .ok_or_message("Invalid consistency_mode in config file.")?;
 
     info!("Initialize background variable system...");
     let mut bg_vars = vars::BgVars::new();
 
     info!("Initialize membership management system...");
-    let system = System::new(network_key, replication_factor, consistency_mode, &config)?;
+    let system = System::new(network_key, consistency_mode, &config)?;
 
     let data_rep_param = TableShardedReplication {
-        system: system.clone(),
-        replication_factor: replication_factor.into(),
-        write_quorum: replication_factor.write_quorum(consistency_mode),
-        read_quorum: 1,
+        layout_manager: system.layout_manager.clone(),
+        consistency_mode,
     };
 
     let meta_rep_param = TableShardedReplication {
-        system: system.clone(),
-        replication_factor: replication_factor.into(),
-        write_quorum: replication_factor.write_quorum(consistency_mode),
-        read_quorum: replication_factor.read_quorum(consistency_mode),
+        layout_manager: system.layout_manager.clone(),
+        consistency_mode,
     };
 
     let control_rep_param = TableFullReplication {
@@ -259,7 +253,6 @@ impl Garage {
     Ok(Arc::new(Self {
         config,
         bg_vars,
-        replication_factor,
        db,
        system,
        block_manager,
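With the hunks above, quorum numbers are no longer computed once in `Garage::new()` and frozen into the table parameters: `TableShardedReplication` now carries only a handle to the layout manager plus the configured consistency mode. Note that the data and metadata parameter sets become identical, so the data tables lose their hard-coded `read_quorum: 1`. A minimal sketch of how a quorum is now resolved at request time, using only calls that appear elsewhere in this diff (not code from the PR itself):

```rust
// Sketch: the quorum is looked up on the live layout at call time, so a
// layout change takes effect without recreating the table parameters or
// restarting the process.
fn effective_write_quorum(rep: &TableShardedReplication) -> usize {
    rep.layout_manager
        .layout()
        .current()
        .write_quorum(rep.consistency_mode)
}
```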
@@ -28,7 +28,6 @@ pub struct SyncLayoutDigest {
 }
 
 pub struct LayoutHelper {
-    replication_factor: ReplicationFactor,
     consistency_mode: ConsistencyMode,
     layout: Option<LayoutHistory>,
 
@@ -51,7 +50,6 @@ pub struct LayoutHelper {
 
 impl LayoutHelper {
     pub fn new(
-        replication_factor: ReplicationFactor,
         consistency_mode: ConsistencyMode,
         mut layout: LayoutHistory,
         mut ack_lock: HashMap<u64, AtomicUsize>,
@@ -97,8 +95,7 @@ impl LayoutHelper {
         // consistency on those).
         // This value is calculated using quorums to allow progress even
         // if not all nodes have successfully completed a sync.
-        let sync_map_min =
-            layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
+        let sync_map_min = layout.calculate_sync_map_min_with_quorum(&all_nongateway_nodes);
 
         let trackers_hash = layout.calculate_trackers_hash();
         let staging_hash = layout.calculate_staging_hash();
@@ -111,7 +108,6 @@
         let is_check_ok = layout.check().is_ok();
 
         LayoutHelper {
-            replication_factor,
             consistency_mode,
             layout: Some(layout),
             ack_map_min,
@@ -134,7 +130,6 @@ impl LayoutHelper {
         let changed = f(self.layout.as_mut().unwrap());
         if changed {
             *self = Self::new(
-                self.replication_factor,
                 self.consistency_mode,
                 self.layout.take().unwrap(),
                 std::mem::take(&mut self.ack_lock),
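The `LayoutHelper` hunks are mostly mechanical: the helper caches values derived from the layout and rebuilds itself wholesale on any change, so dropping the `replication_factor` field simply removes one input from that rebuild. The update path after this change, quoted from the last hunk above with comments added:

```rust
// Inside LayoutHelper::update() (from the hunk above): the helper is an
// immutable snapshot, so any change to the underlying LayoutHistory
// recomputes every cached value via Self::new(), which now takes one
// fewer argument since the factor lives in the layout itself.
*self = Self::new(
    self.consistency_mode,
    self.layout.take().unwrap(),
    std::mem::take(&mut self.ack_lock),
);
```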
@@ -9,8 +9,8 @@ use super::*;
 use crate::replication_mode::*;
 
 impl LayoutHistory {
-    pub fn new(replication_factor: ReplicationFactor) -> Self {
-        let version = LayoutVersion::new(replication_factor.into());
+    pub fn new() -> Self {
+        let version = LayoutVersion::new(3);
 
         let staging = LayoutStaging {
             parameters: Lww::<LayoutParameters>::new(version.parameters),
@@ -123,11 +123,7 @@ impl LayoutHistory {
         }
     }
 
-    pub(crate) fn calculate_sync_map_min_with_quorum(
-        &self,
-        replication_factor: ReplicationFactor,
-        all_nongateway_nodes: &[Uuid],
-    ) -> u64 {
+    pub(crate) fn calculate_sync_map_min_with_quorum(&self, all_nongateway_nodes: &[Uuid]) -> u64 {
         // This function calculates the minimum layout version from which
         // it is safe to read if we want to maintain read-after-write consistency.
         // In the general case the computation can be a bit expensive so
@@ -139,7 +135,7 @@ impl LayoutHistory {
             return self.current().version;
         }
 
-        let quorum = replication_factor.write_quorum(ConsistencyMode::Consistent);
+        let quorum = self.current().write_quorum(ConsistencyMode::Consistent);
 
         let min_version = self.min_stored();
         let global_min = self
@@ -153,7 +149,11 @@ impl LayoutHistory {
         // This is represented by reading from the layout with version
         // number global_min, the smallest layout version for which all nodes
         // have completed a sync.
-        if quorum == self.current().replication_factor {
+        if self
+            .versions
+            .iter()
+            .all(|v| v.write_quorum(ConsistencyMode::Consistent) == v.replication_factor)
+        {
             return global_min;
         }
 
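The interesting change here is the shortcut condition. Previously it compared a single config-derived write quorum against the current replication factor; now that the factor is a property of each layout version, the shortcut is only valid when *every* stored version satisfies `write_quorum == replication_factor`, i.e. writes reach all nodes in all versions. Concretely, with the quorum formulas added to `LayoutVersion` later in this diff: a version with factor 1 or 2 has a Consistent write quorum equal to its factor, so reading from `global_min` is safe; with factor 3 the write quorum is 2 < 3 and the full per-version computation runs. A test-style sketch of that arithmetic (assumed, not part of the PR):

```rust
// For replication factor n in Consistent mode (formulas from the
// LayoutVersion hunk later in this diff): r = ceil(n/2), w = n + 1 - r.
// The global_min shortcut applies exactly when w == n.
#[test]
fn shortcut_applies_only_when_writes_cover_all_nodes() {
    for (n, applies) in [(1usize, true), (2, true), (3, false), (4, false)] {
        let r = n.div_ceil(2); // read quorum, Consistent mode
        let w = n + 1 - r; // write quorum, Consistent mode
        assert_eq!(w == n, applies);
    }
}
```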
@@ -20,7 +20,6 @@ use crate::system::*;
 
 pub struct LayoutManager {
     node_id: Uuid,
-    replication_factor: ReplicationFactor,
     persist_cluster_layout: Persister<LayoutHistory>,
 
     layout: Arc<RwLock<LayoutHelper>>,
@@ -38,38 +37,24 @@ impl LayoutManager {
         node_id: NodeID,
         system_endpoint: Arc<Endpoint<SystemRpc, System>>,
         peering: Arc<PeeringManager>,
-        replication_factor: ReplicationFactor,
         consistency_mode: ConsistencyMode,
     ) -> Result<Arc<Self>, Error> {
         let persist_cluster_layout: Persister<LayoutHistory> =
             Persister::new(&config.metadata_dir, "cluster_layout");
 
         let cluster_layout = match persist_cluster_layout.load() {
-            Ok(x) => {
-                if x.current().replication_factor != replication_factor.replication_factor() {
-                    return Err(Error::Message(format!(
-                        "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
-                        x.current().replication_factor,
-                        replication_factor.replication_factor()
-                    )));
-                }
-                x
-            }
+            Ok(x) => x,
             Err(e) => {
                 info!(
                     "No valid previous cluster layout stored ({}), starting fresh.",
                     e
                 );
-                LayoutHistory::new(replication_factor)
+                LayoutHistory::new()
             }
         };
 
-        let mut cluster_layout = LayoutHelper::new(
-            replication_factor,
-            consistency_mode,
-            cluster_layout,
-            Default::default(),
-        );
+        let mut cluster_layout =
+            LayoutHelper::new(consistency_mode, cluster_layout, Default::default());
         cluster_layout.update_update_trackers(node_id.into());
 
         let layout = Arc::new(RwLock::new(cluster_layout));
@@ -84,7 +69,6 @@ impl LayoutManager {
 
         Ok(Arc::new(Self {
             node_id: node_id.into(),
-            replication_factor,
             persist_cluster_layout,
             layout,
             change_notify,
@@ -298,16 +282,6 @@ impl LayoutManager {
             adv.update_trackers
         );
 
-        if adv.current().replication_factor != self.replication_factor.replication_factor() {
-            let msg = format!(
-                "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
-                adv.current().replication_factor,
-                self.replication_factor.replication_factor()
-            );
-            error!("{}", msg);
-            return Err(Error::Message(msg));
-        }
-
        if let Some(new_layout) = self.merge_layout(adv) {
            debug!("handle_advertise_cluster_layout: some changes were added to the current stuff");
 
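Both safety checks that cross-validated the layout against the configured replication factor disappear here: the startup check on the persisted layout, and the check on layouts advertised by peers. With the factor stored in the layout itself there is no config value left to compare against, so the layout CRDT becomes the single source of truth. When the factor is needed, it is read back from the current layout version, e.g. (a sketch built only from calls that appear elsewhere in this diff):

```rust
// With no replication_factor in the config, the authoritative value is the
// one recorded in the current version of the cluster layout.
fn current_replication_factor(manager: &LayoutManager) -> usize {
    manager.layout().current().replication_factor
}
```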
@@ -5,7 +5,6 @@ use garage_util::crdt::Crdt;
 use garage_util::error::*;
 
 use crate::layout::*;
-use crate::replication_mode::ReplicationFactor;
 
 // This function checks that the partition size S computed is at least better than the
 // one given by a very naive algorithm. To do so, we try to run the naive algorithm
@@ -121,7 +120,7 @@ fn test_assignment() {
     let mut node_capacity_vec = vec![4000, 1000, 2000];
     let mut node_zone_vec = vec!["A", "B", "C"];
 
-    let mut cl = LayoutHistory::new(ReplicationFactor::new(3).unwrap());
+    let mut cl = LayoutHistory::new();
     update_layout(&mut cl, &node_capacity_vec, &node_zone_vec, 3);
     let v = cl.current().version;
     let (mut cl, msg) = cl.apply_staged_changes(Some(v + 1)).unwrap();
@@ -11,6 +11,7 @@ use garage_util::error::*;
 
 use super::graph_algo::*;
 use super::*;
+use crate::replication_mode::ConsistencyMode;
 
 // The Message type will be used to collect information on the algorithm.
 pub type Message = Vec<String>;
@@ -843,4 +844,20 @@ impl LayoutVersion {
 
         Ok(msg)
     }
+
+    pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+        match consistency_mode {
+            ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
+            ConsistencyMode::Consistent => self.replication_factor.div_ceil(2),
+        }
+    }
+
+    pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
+        match consistency_mode {
+            ConsistencyMode::Dangerous => 1,
+            ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
+                (self.replication_factor + 1) - self.read_quorum(ConsistencyMode::Consistent)
+            }
+        }
+    }
 }
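These two methods move essentially verbatim from the deleted `ReplicationFactor` newtype (below) onto `LayoutVersion`, so every stored layout version carries its own quorums. In Consistent mode the arithmetic guarantees overlapping quorums: R = ceil(N/2) and W = N + 1 - R, hence R + W = N + 1 > N; for N = 3 that gives R = 2, W = 2. Degraded keeps the Consistent write quorum but drops reads to 1, and Dangerous drops both to 1. A quick property test of the intersection invariant (a sketch, not part of the PR):

```rust
// Read and write quorums must intersect for read-after-write consistency:
// R + W > N guarantees any read quorum overlaps any successful write.
#[test]
fn consistent_quorums_intersect() {
    for n in 1..=7usize {
        let r = n.div_ceil(2); // read_quorum(Consistent)
        let w = (n + 1) - r; // write_quorum(Consistent)
        assert!(r + w > n);
    }
}
```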
@@ -1,12 +1,6 @@
-use garage_util::config::Config;
 use garage_util::crdt::AutoCrdt;
-use garage_util::error::*;
 use serde::{Deserialize, Serialize};
 
-#[derive(Debug, Clone, Copy, PartialEq, Serialize, Deserialize)]
-#[serde(transparent)]
-pub struct ReplicationFactor(usize);
-
 #[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Default, Serialize, Deserialize)]
 #[serde(rename_all = "lowercase")]
 pub enum ConsistencyMode {
@@ -28,67 +22,3 @@ impl ConsistencyMode {
 impl AutoCrdt for ConsistencyMode {
     const WARN_IF_DIFFERENT: bool = true;
 }
-
-impl ReplicationFactor {
-    pub fn new(replication_factor: usize) -> Option<Self> {
-        if replication_factor < 1 {
-            None
-        } else {
-            Some(Self(replication_factor))
-        }
-    }
-
-    pub fn replication_factor(&self) -> usize {
-        self.0
-    }
-
-    pub fn read_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
-        match consistency_mode {
-            ConsistencyMode::Dangerous | ConsistencyMode::Degraded => 1,
-            ConsistencyMode::Consistent => self.replication_factor().div_ceil(2),
-        }
-    }
-
-    pub fn write_quorum(&self, consistency_mode: ConsistencyMode) -> usize {
-        match consistency_mode {
-            ConsistencyMode::Dangerous => 1,
-            ConsistencyMode::Degraded | ConsistencyMode::Consistent => {
-                (self.replication_factor() + 1) - self.read_quorum(ConsistencyMode::Consistent)
-            }
-        }
-    }
-}
-
-impl std::convert::From<ReplicationFactor> for usize {
-    fn from(replication_factor: ReplicationFactor) -> usize {
-        replication_factor.0
-    }
-}
-
-pub fn parse_replication_mode(
-    config: &Config,
-) -> Result<(ReplicationFactor, ConsistencyMode), Error> {
-    match (&config.replication_mode, config.replication_factor, config.consistency_mode.as_str()) {
-        (Some(replication_mode), None, "consistent") => {
-            tracing::warn!("Legacy config option replication_mode in use. Please migrate to replication_factor and consistency_mode");
-            let parsed_replication_mode = match replication_mode.as_str() {
-                "1" | "none" => Some((ReplicationFactor(1), ConsistencyMode::Consistent)),
-                "2" => Some((ReplicationFactor(2), ConsistencyMode::Consistent)),
-                "2-dangerous" => Some((ReplicationFactor(2), ConsistencyMode::Dangerous)),
-                "3" => Some((ReplicationFactor(3), ConsistencyMode::Consistent)),
-                "3-degraded" => Some((ReplicationFactor(3), ConsistencyMode::Degraded)),
-                "3-dangerous" => Some((ReplicationFactor(3), ConsistencyMode::Dangerous)),
-                _ => None,
-            };
-            Some(parsed_replication_mode.ok_or_message("Invalid replication_mode in config file.")?)
-        },
-        (None, Some(replication_factor), consistency_mode) => {
-            let replication_factor = ReplicationFactor::new(replication_factor)
-                .ok_or_message("Invalid replication_factor in config file.")?;
-            let consistency_mode = ConsistencyMode::parse(consistency_mode)
-                .ok_or_message("Invalid consistency_mode in config file.")?;
-            Some((replication_factor, consistency_mode))
-        }
-        _ => None,
-    }.ok_or_message("Either the legacy replication_mode or replication_level and consistency_mode can be set, not both.")
-}
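This deletes the `ReplicationFactor` newtype and all of `parse_replication_mode()`, including the handling of the legacy `replication_mode` strings (`"1"`, `"2-dangerous"`, `"3-degraded"`, ...). After this change the config file only contributes the consistency mode; everything the old parser produced from `replication_factor` now has to come from the layout. What config handling reduces to, as already seen in the `Garage::new()` hunk at the top of this diff:

```rust
// The only replication-related setting still read from the config file:
let consistency_mode = ConsistencyMode::parse(&config.consistency_mode)
    .ok_or_message("Invalid consistency_mode in config file.")?;
```

Since this is WIP, a migration path for existing `replication_mode`/`replication_factor` configs presumably still needs to be decided.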
@@ -112,8 +112,6 @@ pub struct System {
     metrics: ArcSwapOption<SystemMetrics>,
 
-    pub(crate) replication_factor: ReplicationFactor,
-
     /// Path to metadata directory
     pub metadata_dir: PathBuf,
     /// Path to data directory
@@ -125,9 +123,6 @@ pub struct NodeStatus {
     /// Hostname of the node
     pub hostname: Option<String>,
 
-    /// Replication factor configured on the node
-    pub replication_factor: usize,
-
     /// Cluster layout digest
     pub layout_digest: RpcLayoutDigest,
 
@@ -242,7 +237,6 @@ impl System {
     /// Create this node's membership manager
     pub fn new(
         network_key: NetworkKey,
-        replication_factor: ReplicationFactor,
         consistency_mode: ConsistencyMode,
         config: &Config,
     ) -> Result<Arc<Self>, Error> {
@@ -279,11 +273,10 @@ impl System {
         netapp.id,
         system_endpoint.clone(),
         peering.clone(),
-        replication_factor,
         consistency_mode,
     )?;
 
-    let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
+    let mut local_status = NodeStatus::initial(&layout_manager);
     local_status.update_disk_usage(&config.metadata_dir, &config.data_dir);
 
     // ---- if enabled, set up additionnal peer discovery methods ----
@@ -314,7 +307,6 @@ impl System {
         netapp: netapp.clone(),
         peering: peering.clone(),
         system_endpoint,
-        replication_factor,
         rpc_listen_addr: config.rpc_bind_addr,
         rpc_public_addr,
         bootstrap_peers: config.bootstrap_peers.clone(),
@@ -429,10 +421,6 @@ impl System {
     }
 
     pub fn health(&self) -> ClusterHealth {
-        let quorum = self
-            .replication_factor
-            .write_quorum(ConsistencyMode::Consistent);
-
         // Gather information about running nodes.
         // Technically, `nodes` contains currently running nodes, as well
         // as nodes that this Garage process has been connected to at least
@@ -465,6 +453,7 @@ impl System {
         // Determine the number of partitions that have:
         // - a quorum of up nodes for all write sets (i.e. are available)
         // - for which all nodes in all write sets are up (i.e. are fully healthy)
+        let quorum = layout.current().write_quorum(ConsistencyMode::Consistent);
         let partitions = layout.current().partitions().collect::<Vec<_>>();
         let mut partitions_quorum = 0;
         let mut partitions_all_ok = 0;
@@ -580,13 +569,6 @@ impl System {
     ) -> Result<SystemRpc, Error> {
         let local_info = self.local_status.read().unwrap();
 
-        if local_info.replication_factor < info.replication_factor {
-            error!("Some node have a higher replication factor ({}) than this one ({}). This is not supported and will lead to data corruption. Shutting down for safety.",
-                info.replication_factor,
-                local_info.replication_factor);
-            std::process::exit(1);
-        }
-
         self.layout_manager
             .handle_advertise_status(from, &info.layout_digest);
 
@@ -635,7 +617,7 @@ impl System {
         .count();
 
     let not_configured = !self.cluster_layout().is_check_ok();
-    let no_peers = n_connected < self.replication_factor.into();
+    let no_peers = n_connected < self.cluster_layout().current().replication_factor;
     let expected_n_nodes = self.cluster_layout().all_nodes().len();
     let bad_peers = n_connected != expected_n_nodes;
 
@@ -781,14 +763,13 @@ impl EndpointHandler<SystemRpc> for System {
 }
 
 impl NodeStatus {
-    fn initial(replication_factor: ReplicationFactor, layout_manager: &LayoutManager) -> Self {
+    fn initial(layout_manager: &LayoutManager) -> Self {
         NodeStatus {
             hostname: Some(
                 gethostname::gethostname()
                     .into_string()
                    .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
             ),
-            replication_factor: replication_factor.into(),
             layout_digest: layout_manager.layout().digest(),
             meta_disk_avail: None,
             data_disk_avail: None,
@@ -798,7 +779,6 @@ impl NodeStatus {
     fn unknown() -> Self {
         NodeStatus {
             hostname: None,
-            replication_factor: 0,
             layout_digest: Default::default(),
             meta_disk_avail: None,
             data_disk_avail: None,
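Two things happen in the `System` hunks beyond field plumbing. First, `health()` now derives its partition quorum from the layout it is inspecting (`layout.current().write_quorum(...)`) rather than from a per-process constant, which is the correct pairing once the factor can differ between layout versions. Second, the drastic guard in the status handler — shutting the process down with `std::process::exit(1)` when a peer advertised a higher replication factor — is gone entirely, since `NodeStatus` no longer carries a replication factor at all; agreement between nodes is delegated to convergence of the layout CRDT. The startup heuristic also reads the factor back from the layout, sketched here with a caveat:

```rust
// Sketch: "too few peers" is now judged against the factor recorded in the
// current layout. Note that on a fresh, unconfigured node this is the layout
// from LayoutHistory::new(), i.e. the hardcoded factor 3 seen earlier.
fn no_peers(system: &System, n_connected: usize) -> bool {
    n_connected < system.cluster_layout().current().replication_factor
}
```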
@@ -65,10 +65,13 @@ impl SystemMetrics {
             .with_description("Garage build info")
             .init(),
         _replication_factor: {
-            let replication_factor = system.replication_factor;
+            let system = system.clone();
             meter
                 .u64_value_observer("garage_replication_factor", move |observer| {
-                    observer.observe(replication_factor.replication_factor() as u64, &[])
+                    observer.observe(
+                        system.cluster_layout().current().replication_factor as u64,
+                        &[],
+                    )
                 })
                 .with_description("Garage replication factor setting")
                 .init()
@@ -1,9 +1,10 @@
 use std::sync::Arc;
 
 use garage_rpc::layout::*;
-use garage_rpc::system::System;
+use garage_rpc::replication_mode::ConsistencyMode;
 use garage_util::data::*;
 
+use crate::replication::sharded::manager::LayoutManager;
 use crate::replication::*;
 
 /// Sharded replication schema:
@@ -15,42 +16,43 @@ use crate::replication::*;
 #[derive(Clone)]
 pub struct TableShardedReplication {
     /// The membership manager of this node
-    pub system: Arc<System>,
-    /// How many time each data should be replicated
-    pub replication_factor: usize,
-    /// How many nodes to contact for a read, should be at most `replication_factor`
-    pub read_quorum: usize,
-    /// How many nodes to contact for a write, should be at most `replication_factor`
-    pub write_quorum: usize,
+    pub layout_manager: Arc<LayoutManager>,
+    pub consistency_mode: ConsistencyMode,
 }
 
 impl TableReplication for TableShardedReplication {
     type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
 
     fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-        self.system.cluster_layout().storage_nodes_of(hash)
+        self.layout_manager.layout().storage_nodes_of(hash)
     }
 
     fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
-        self.system.cluster_layout().read_nodes_of(hash)
+        self.layout_manager.layout().read_nodes_of(hash)
     }
     fn read_quorum(&self) -> usize {
-        self.read_quorum
+        self.layout_manager
+            .layout()
+            .current()
+            .read_quorum(self.consistency_mode)
     }
 
     fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
-        self.system.layout_manager.write_sets_of(hash)
+        self.layout_manager.write_sets_of(hash)
     }
     fn write_quorum(&self) -> usize {
-        self.write_quorum
+        self.layout_manager
+            .layout()
+            .current()
+            .write_quorum(self.consistency_mode)
    }
 
    fn partition_of(&self, hash: &Hash) -> Partition {
-        self.system.cluster_layout().current().partition_of(hash)
+        self.layout_manager.layout().current().partition_of(hash)
    }
 
    fn sync_partitions(&self) -> SyncPartitions {
-        let layout = self.system.cluster_layout();
+        let layout = self.layout_manager.layout();
        let layout_version = layout.ack_map_min();
 
        let mut partitions = layout
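`TableShardedReplication` shrinks to a two-field handle, and every node-set or quorum lookup goes through `layout_manager.layout()` at call time. One WIP-worthy detail: `read_nodes()` and `read_quorum()` each take their own snapshot of the layout, so a concurrent layout change could in principle pair a node set from one version with a quorum from another; deriving both from a single snapshot would rule that out. A sketch of such a combined helper (hypothetical, not in the PR):

```rust
// Hypothetical helper: take one layout snapshot and derive both the node
// set and the quorum from it, so the two can never disagree.
fn read_plan(rep: &TableShardedReplication, hash: &Hash) -> (Vec<Uuid>, usize) {
    let layout = rep.layout_manager.layout();
    let nodes = layout.read_nodes_of(hash);
    let quorum = layout.current().read_quorum(rep.consistency_mode);
    (nodes, quorum)
}
```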
@@ -38,12 +38,6 @@ pub struct Config {
     )]
     pub block_size: usize,
 
-    /// Number of replicas. Can be any positive integer, but uneven numbers are more favorable.
-    /// - 1 for single-node clusters, or to disable replication
-    /// - 3 is the recommended and supported setting.
-    #[serde(default)]
-    pub replication_factor: Option<usize>,
-
     /// Consistency mode for all for requests through this node
     /// - Degraded -> Disable read quorum
     /// - Dangerous -> Disable read and write quorum
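With `replication_factor` removed from `Config`, the factor is no longer a node-local setting at all: it enters the system only through the cluster layout. In this WIP that means a brand-new cluster starts from the hardcoded `LayoutVersion::new(3)` seen in the `LayoutHistory::new()` hunk; presumably the final version will let the operator choose the factor when the first layout is created (e.g. via the layout-edit RPC/CLI) rather than in the config file. Conceptually:

```rust
// Hypothetical sketch (not in this diff): restoring operator choice by
// parameterizing layout creation, instead of the hardcoded factor 3.
fn initial_layout_version(replication_factor: usize) -> LayoutVersion {
    LayoutVersion::new(replication_factor)
}
```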