use std::fmt; use bytesize::ByteSize; use garage_util::crdt::{AutoCrdt, Crdt}; use garage_util::data::Uuid; mod v08 { use crate::layout::CompactNodeType; use garage_util::crdt::LwwMap; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; /// The layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, pub roles: LwwMap, /// node_id_vec: a vector of node IDs with a role assigned /// in the system (this includes gateway nodes). /// The order here is different than the vec stored by `roles`, because: /// 1. non-gateway nodes are first so that they have lower numbers /// 2. nodes that don't have a role are excluded (but they need to /// stay in the CRDT as tombstones) pub node_id_vec: Vec, /// the assignation of data partitions to node, the values /// are indices in node_id_vec #[serde(with = "serde_bytes")] pub ring_assignation_data: Vec, /// Role changes which are staged for the next version of the layout pub staging: LwwMap, pub staging_hash: Hash, } #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRoleV(pub Option); /// The user-assigned roles of cluster nodes #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] pub struct NodeRole { /// Datacenter at which this entry belong. This information is used to /// perform a better geodistribution pub zone: String, /// The capacity of the node /// If this is set to None, the node does not participate in storing data for the system /// and is only active as an API gateway to other nodes pub capacity: Option, /// A set of tags to recognize the node pub tags: Vec, } impl garage_util::migrate::InitialFormat for ClusterLayout {} } mod v09 { use super::v08; use crate::layout::CompactNodeType; use garage_util::crdt::{Lww, LwwMap}; use garage_util::data::{Hash, Uuid}; use serde::{Deserialize, Serialize}; pub use v08::{NodeRole, NodeRoleV}; /// The layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize)] pub struct ClusterLayout { pub version: u64, pub replication_factor: usize, /// This attribute is only used to retain the previously computed partition size, /// to know to what extent does it change with the layout update. pub partition_size: u64, /// Parameters used to compute the assignment currently given by /// ring_assignment_data pub parameters: LayoutParameters, pub roles: LwwMap, /// see comment in v08::ClusterLayout pub node_id_vec: Vec, /// see comment in v08::ClusterLayout #[serde(with = "serde_bytes")] pub ring_assignment_data: Vec, /// Parameters to be used in the next partition assignment computation. pub staging_parameters: Lww, /// Role changes which are staged for the next version of the layout pub staging_roles: LwwMap, pub staging_hash: Hash, } /// This struct is used to set the parameters to be used in the assignment computation /// algorithm. It is stored as a Crdt. #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] pub struct LayoutParameters { pub zone_redundancy: ZoneRedundancy, } /// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies /// of each partition on at least that number of different zones. /// Otherwise, copies will be stored on the maximum possible number of zones. #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] pub enum ZoneRedundancy { AtLeast(usize), Maximum, } impl garage_util::migrate::Migrate for ClusterLayout { const VERSION_MARKER: &'static [u8] = b"G09layout"; type Previous = v08::ClusterLayout; fn migrate(previous: Self::Previous) -> Self { use itertools::Itertools; // In the old layout, capacities are in an arbitrary unit, // but in the new layout they are in bytes. // Here we arbitrarily multiply everything by 1G, // such that 1 old capacity unit = 1GB in the new units. // This is totally arbitrary and won't work for most users. let cap_mul = 1024 * 1024 * 1024; let roles = multiply_all_capacities(previous.roles, cap_mul); let staging_roles = multiply_all_capacities(previous.staging, cap_mul); let node_id_vec = previous.node_id_vec; // Determine partition size let mut tmp = previous.ring_assignation_data.clone(); tmp.sort(); let partition_size = tmp .into_iter() .dedup_with_count() .map(|(npart, node)| { roles .get(&node_id_vec[node as usize]) .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) .unwrap_or(0) / npart as u64 }) .min() .unwrap_or(0); // By default, zone_redundancy is maximum possible value let parameters = LayoutParameters { zone_redundancy: ZoneRedundancy::Maximum, }; Self { version: previous.version, replication_factor: previous.replication_factor, partition_size, parameters, roles, node_id_vec, ring_assignment_data: previous.ring_assignation_data, staging_parameters: Lww::new(parameters), staging_roles, staging_hash: [0u8; 32].into(), // will be set in the next migration } } } fn multiply_all_capacities( old_roles: LwwMap, mul: u64, ) -> LwwMap { let mut new_roles = LwwMap::new(); for (node, ts, role) in old_roles.items() { let mut role = role.clone(); if let NodeRoleV(Some(NodeRole { capacity: Some(ref mut cap), .. })) = role { *cap *= mul; } new_roles.merge_raw(node, *ts, &role); } new_roles } } mod v010 { use super::v09; use crate::layout::CompactNodeType; use garage_util::crdt::{Lww, LwwMap}; use garage_util::data::Uuid; use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; pub const OLD_VERSION_COUNT: usize = 5; /// The history of cluster layouts, with trackers to keep a record /// of which nodes are up-to-date to current cluster data #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutHistory { /// The versions currently in use in the cluster pub versions: Vec, /// At most 5 of the previous versions, not used by the garage_table /// module, but usefull for the garage_block module to find data blocks /// that have not yet been moved pub old_versions: Vec, /// Update trackers pub update_trackers: UpdateTrackers, /// Staged changes for the next version pub staging: Lww, } /// A version of the layout of the cluster, i.e. the list of roles /// which are assigned to each cluster node #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutVersion { pub version: u64, pub replication_factor: usize, /// This attribute is only used to retain the previously computed partition size, /// to know to what extent does it change with the layout update. pub partition_size: u64, /// Parameters used to compute the assignment currently given by /// ring_assignment_data pub parameters: LayoutParameters, pub roles: LwwMap, /// see comment in v08::ClusterLayout pub node_id_vec: Vec, /// number of non-gateway nodes, which are the first ids in node_id_vec pub nongateway_node_count: usize, /// see comment in v08::ClusterLayout #[serde(with = "serde_bytes")] pub ring_assignment_data: Vec, } /// The staged changes for the next layout version #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutStaging { /// Parameters to be used in the next partition assignment computation. pub parameters: Lww, /// Role changes which are staged for the next version of the layout pub roles: LwwMap, } /// The tracker of acknowlegments and data syncs around the cluster #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTrackers { /// The highest layout version number each node has ack'ed pub ack_map: UpdateTracker, /// The highest layout version number each node has synced data for pub sync_map: UpdateTracker, /// The highest layout version number each node has /// ack'ed that all other nodes have synced data for pub sync_ack_map: UpdateTracker, } /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTracker(pub BTreeMap); impl garage_util::migrate::Migrate for LayoutHistory { const VERSION_MARKER: &'static [u8] = b"G010lh"; type Previous = v09::ClusterLayout; fn migrate(previous: Self::Previous) -> Self { let nongateway_node_count = previous .node_id_vec .iter() .enumerate() .filter(|(_, uuid)| { let role = previous.roles.get(uuid); matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some()) }) .map(|(i, _)| i + 1) .max() .unwrap_or(0); let version = LayoutVersion { version: previous.version, replication_factor: previous.replication_factor, partition_size: previous.partition_size, parameters: previous.parameters, roles: previous.roles, node_id_vec: previous.node_id_vec, nongateway_node_count, ring_assignment_data: previous.ring_assignment_data, }; let update_tracker = UpdateTracker( version .nongateway_nodes() .iter() .copied() .map(|x| (x, version.version)) .collect::>(), ); let staging = LayoutStaging { parameters: previous.staging_parameters, roles: previous.staging_roles, }; Self { versions: vec![version], old_versions: vec![], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, staging: Lww::raw(previous.version, staging), } } } } pub use v010::*; // ---- utility functions ---- impl AutoCrdt for LayoutParameters { const WARN_IF_DIFFERENT: bool = true; } impl AutoCrdt for NodeRoleV { const WARN_IF_DIFFERENT: bool = true; } impl Crdt for LayoutStaging { fn merge(&mut self, other: &LayoutStaging) { self.parameters.merge(&other.parameters); self.roles.merge(&other.roles); } } impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity { Some(c) => ByteSize::b(c).to_string_as(false), None => "gateway".to_string(), } } pub fn tags_string(&self) -> String { self.tags.join(",") } } impl fmt::Display for ZoneRedundancy { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { ZoneRedundancy::Maximum => write!(f, "maximum"), ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), } } } impl core::str::FromStr for ZoneRedundancy { type Err = &'static str; fn from_str(s: &str) -> Result { match s { "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), x => { let v = x .parse::() .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; Ok(ZoneRedundancy::AtLeast(v)) } } } } impl UpdateTracker { fn merge(&mut self, other: &UpdateTracker) -> bool { let mut changed = false; for (k, v) in other.0.iter() { if let Some(v_mut) = self.0.get_mut(k) { if *v > *v_mut { *v_mut = *v; changed = true; } } else { self.0.insert(*k, *v); changed = true; } } changed } pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { match self.0.get_mut(&peer) { Some(e) if *e < value => { *e = value; true } None => { self.0.insert(peer, value); true } _ => false, } } pub(crate) fn min(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 { storage_nodes .iter() .map(|x| self.0.get(x).copied().unwrap_or(min_version)) .min() .unwrap_or(min_version) } } impl UpdateTrackers { pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool { let c1 = self.ack_map.merge(&other.ack_map); let c2 = self.sync_map.merge(&other.sync_map); let c3 = self.sync_ack_map.merge(&other.sync_ack_map); c1 || c2 || c3 } }