diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs index c471420c..b6c2fd27 100644 --- a/src/rpc/layout.rs +++ b/src/rpc/layout.rs @@ -5,8 +5,6 @@ use std::collections::HashSet; use bytesize::ByteSize; use itertools::Itertools; -use serde::{Deserialize, Serialize}; - use garage_util::crdt::{AutoCrdt, Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -23,76 +21,196 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS; // The Message type will be used to collect information on the algorithm. type Message = Vec; -/// The layout of the cluster, i.e. the list of roles -/// which are assigned to each cluster node -#[derive(Clone, Debug, Serialize, Deserialize)] -pub struct ClusterLayout { - pub version: u64, +mod v08 { + use crate::ring::CompactNodeType; + use garage_util::crdt::LwwMap; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; - pub replication_factor: usize, + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, - /// This attribute is only used to retain the previously computed partition size, - /// to know to what extent does it change with the layout update. - pub partition_size: u64, - /// Parameters used to compute the assignment currently given by - /// ring_assignment_data - pub parameters: LayoutParameters, + pub replication_factor: usize, + pub roles: LwwMap, - pub roles: LwwMap, + /// node_id_vec: a vector of node IDs with a role assigned + /// in the system (this includes gateway nodes). + /// The order here is different than the vec stored by `roles`, because: + /// 1. non-gateway nodes are first so that they have lower numbers + /// 2. nodes that don't have a role are excluded (but they need to + /// stay in the CRDT as tombstones) + pub node_id_vec: Vec, + /// the assignation of data partitions to node, the values + /// are indices in node_id_vec + #[serde(with = "serde_bytes")] + pub ring_assignation_data: Vec, - /// node_id_vec: a vector of node IDs with a role assigned - /// in the system (this includes gateway nodes). - /// The order here is different than the vec stored by `roles`, because: - /// 1. non-gateway nodes are first so that they have lower numbers holding - /// in u8 (the number of non-gateway nodes is at most 256). - /// 2. nodes that don't have a role are excluded (but they need to - /// stay in the CRDT as tombstones) - pub node_id_vec: Vec, - /// the assignment of data partitions to node, the values - /// are indices in node_id_vec - #[serde(with = "serde_bytes")] - pub ring_assignment_data: Vec, + /// Role changes which are staged for the next version of the layout + pub staging: LwwMap, + pub staging_hash: Hash, + } - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap, - pub staging_hash: Hash, + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRoleV(pub Option); + + /// The user-assigned roles of cluster nodes + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] + pub struct NodeRole { + /// Datacenter at which this entry belong. This information is used to + /// perform a better geodistribution + pub zone: String, + /// The capacity of the node + /// If this is set to None, the node does not participate in storing data for the system + /// and is only active as an API gateway to other nodes + pub capacity: Option, + /// A set of tags to recognize the node + pub tags: Vec, + } + + impl garage_util::migrate::InitialFormat for ClusterLayout {} } -impl garage_util::migrate::InitialFormat for ClusterLayout {} -/// This struct is used to set the parameters to be used in the assignment computation -/// algorithm. It is stored as a Crdt. -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct LayoutParameters { - pub zone_redundancy: usize, +mod v09 { + use super::v08; + use crate::ring::CompactNodeType; + use garage_util::crdt::{Lww, LwwMap}; + use garage_util::data::{Hash, Uuid}; + use serde::{Deserialize, Serialize}; + pub use v08::{NodeRole, NodeRoleV}; + + /// The layout of the cluster, i.e. the list of roles + /// which are assigned to each cluster node + #[derive(Clone, Debug, Serialize, Deserialize)] + pub struct ClusterLayout { + pub version: u64, + + pub replication_factor: usize, + + /// This attribute is only used to retain the previously computed partition size, + /// to know to what extent does it change with the layout update. + pub partition_size: u64, + /// Parameters used to compute the assignment currently given by + /// ring_assignment_data + pub parameters: LayoutParameters, + + pub roles: LwwMap, + + /// see comment in v08::ClusterLayout + pub node_id_vec: Vec, + /// see comment in v08::ClusterLayout + #[serde(with = "serde_bytes")] + pub ring_assignment_data: Vec, + + /// Parameters to be used in the next partition assignment computation. + pub staging_parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub staging_roles: LwwMap, + pub staging_hash: Hash, + } + + /// This struct is used to set the parameters to be used in the assignment computation + /// algorithm. It is stored as a Crdt. + #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)] + pub struct LayoutParameters { + pub zone_redundancy: usize, + } + + impl garage_util::migrate::Migrate for ClusterLayout { + const VERSION_MARKER: &'static [u8] = b"Glayout09"; + + type Previous = v08::ClusterLayout; + + fn migrate(previous: Self::Previous) -> Self { + use itertools::Itertools; + use std::collections::HashSet; + + // In the old layout, capacities are in an arbitrary unit, + // but in the new layout they are in bytes. + // Here we arbitrarily multiply everything by 1G, + // such that 1 old capacity unit = 1GB in the new units. + // This is totally arbitrary and won't work for most users. + let cap_mul = 1024 * 1024 * 1024; + let roles = multiply_all_capacities(previous.roles, cap_mul); + let staging_roles = multiply_all_capacities(previous.staging, cap_mul); + let node_id_vec = previous.node_id_vec; + + // Determine partition size + let mut tmp = previous.ring_assignation_data.clone(); + tmp.sort(); + let partition_size = tmp + .into_iter() + .dedup_with_count() + .map(|(npart, node)| { + roles + .get(&node_id_vec[node as usize]) + .and_then(|p| p.0.as_ref().and_then(|r| r.capacity)) + .unwrap_or(0) / npart as u64 + }) + .min() + .unwrap_or(0); + + // Determine zone redundancy parameter + let zone_redundancy = std::cmp::min( + previous.replication_factor, + roles + .items() + .iter() + .filter_map(|(_, _, r)| r.0.as_ref().map(|p| p.zone.as_str())) + .collect::>() + .len(), + ); + let parameters = LayoutParameters { zone_redundancy }; + + let mut res = Self { + version: previous.version, + replication_factor: previous.replication_factor, + partition_size, + parameters, + roles, + node_id_vec, + ring_assignment_data: previous.ring_assignation_data, + staging_parameters: Lww::new(parameters), + staging_roles, + staging_hash: [0u8; 32].into(), + }; + res.staging_hash = res.calculate_staging_hash(); + res + } + } + + fn multiply_all_capacities( + old_roles: LwwMap, + mul: u64, + ) -> LwwMap { + let mut new_roles = LwwMap::new(); + for (node, ts, role) in old_roles.items() { + let mut role = role.clone(); + if let NodeRoleV(Some(NodeRole { + capacity: Some(ref mut cap), + .. + })) = role + { + *cap = *cap * mul; + } + new_roles.merge_raw(node, *ts, &role); + } + new_roles + } } +pub use v09::*; + impl AutoCrdt for LayoutParameters { const WARN_IF_DIFFERENT: bool = true; } -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct NodeRoleV(pub Option); - impl AutoCrdt for NodeRoleV { const WARN_IF_DIFFERENT: bool = true; } -/// The user-assigned roles of cluster nodes -#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] -pub struct NodeRole { - /// Datacenter at which this entry belong. This information is used to - /// perform a better geodistribution - pub zone: String, - /// The capacity of the node - /// If this is set to None, the node does not participate in storing data for the system - /// and is only active as an API gateway to other nodes - pub capacity: Option, - /// A set of tags to recognize the node - pub tags: Vec, -} - impl NodeRole { pub fn capacity_string(&self) -> String { match self.capacity {