layout: refactoring, merge two files
This commit is contained in:
parent
64a6e557a4
commit
4dbf254512
4 changed files with 440 additions and 435 deletions
|
@ -6,7 +6,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
use super::schema::*;
|
use super::*;
|
||||||
use crate::replication_mode::ReplicationMode;
|
use crate::replication_mode::ReplicationMode;
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
|
||||||
|
|
|
@ -1,7 +1,13 @@
|
||||||
|
use std::fmt;
|
||||||
|
|
||||||
|
use bytesize::ByteSize;
|
||||||
|
|
||||||
|
use garage_util::crdt::{AutoCrdt, Crdt};
|
||||||
|
use garage_util::data::Uuid;
|
||||||
|
|
||||||
mod graph_algo;
|
mod graph_algo;
|
||||||
mod helper;
|
mod helper;
|
||||||
mod history;
|
mod history;
|
||||||
mod schema;
|
|
||||||
mod version;
|
mod version;
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
|
@ -13,7 +19,6 @@ pub mod manager;
|
||||||
|
|
||||||
pub use helper::{LayoutDigest, LayoutHelper};
|
pub use helper::{LayoutDigest, LayoutHelper};
|
||||||
pub use manager::WriteLock;
|
pub use manager::WriteLock;
|
||||||
pub use schema::*;
|
|
||||||
pub use version::*;
|
pub use version::*;
|
||||||
|
|
||||||
// ---- defines: partitions ----
|
// ---- defines: partitions ----
|
||||||
|
@ -39,3 +44,435 @@ const NB_PARTITIONS: usize = 1usize << PARTITION_BITS;
|
||||||
// Change this to u16 the day we want to have more than 256 nodes in a cluster
|
// Change this to u16 the day we want to have more than 256 nodes in a cluster
|
||||||
pub type CompactNodeType = u8;
|
pub type CompactNodeType = u8;
|
||||||
pub const MAX_NODE_NUMBER: usize = 256;
|
pub const MAX_NODE_NUMBER: usize = 256;
|
||||||
|
|
||||||
|
// ======== actual data structures for the layout data ========
|
||||||
|
// ======== that is persisted to disk ========
|
||||||
|
// some small utility impls are at the end of this file,
|
||||||
|
// but most of the code that actually computes stuff is in
|
||||||
|
// version.rs, history.rs and helper.rs
|
||||||
|
|
||||||
|
mod v08 {
|
||||||
|
use crate::layout::CompactNodeType;
|
||||||
|
use garage_util::crdt::LwwMap;
|
||||||
|
use garage_util::data::{Hash, Uuid};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// The layout of the cluster, i.e. the list of roles
|
||||||
|
/// which are assigned to each cluster node
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ClusterLayout {
|
||||||
|
pub version: u64,
|
||||||
|
|
||||||
|
pub replication_factor: usize,
|
||||||
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
|
||||||
|
// see comments in v010::ClusterLayout
|
||||||
|
pub node_id_vec: Vec<Uuid>,
|
||||||
|
#[serde(with = "serde_bytes")]
|
||||||
|
pub ring_assignation_data: Vec<CompactNodeType>,
|
||||||
|
|
||||||
|
/// Role changes which are staged for the next version of the layout
|
||||||
|
pub staging: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
pub staging_hash: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct NodeRoleV(pub Option<NodeRole>);
|
||||||
|
|
||||||
|
/// The user-assigned roles of cluster nodes
|
||||||
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct NodeRole {
|
||||||
|
/// Datacenter at which this entry belong. This information is used to
|
||||||
|
/// perform a better geodistribution
|
||||||
|
pub zone: String,
|
||||||
|
/// The capacity of the node
|
||||||
|
/// If this is set to None, the node does not participate in storing data for the system
|
||||||
|
/// and is only active as an API gateway to other nodes
|
||||||
|
pub capacity: Option<u64>,
|
||||||
|
/// A set of tags to recognize the node
|
||||||
|
pub tags: Vec<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::InitialFormat for ClusterLayout {}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod v09 {
|
||||||
|
use super::v08;
|
||||||
|
use crate::layout::CompactNodeType;
|
||||||
|
use garage_util::crdt::{Lww, LwwMap};
|
||||||
|
use garage_util::data::{Hash, Uuid};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
pub use v08::{NodeRole, NodeRoleV};
|
||||||
|
|
||||||
|
/// The layout of the cluster, i.e. the list of roles
|
||||||
|
/// which are assigned to each cluster node
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct ClusterLayout {
|
||||||
|
pub version: u64,
|
||||||
|
|
||||||
|
pub replication_factor: usize,
|
||||||
|
|
||||||
|
/// This attribute is only used to retain the previously computed partition size,
|
||||||
|
/// to know to what extent does it change with the layout update.
|
||||||
|
pub partition_size: u64,
|
||||||
|
/// Parameters used to compute the assignment currently given by
|
||||||
|
/// ring_assignment_data
|
||||||
|
pub parameters: LayoutParameters,
|
||||||
|
|
||||||
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
|
||||||
|
// see comments in v010::ClusterLayout
|
||||||
|
pub node_id_vec: Vec<Uuid>,
|
||||||
|
#[serde(with = "serde_bytes")]
|
||||||
|
pub ring_assignment_data: Vec<CompactNodeType>,
|
||||||
|
|
||||||
|
/// Parameters to be used in the next partition assignment computation.
|
||||||
|
pub staging_parameters: Lww<LayoutParameters>,
|
||||||
|
/// Role changes which are staged for the next version of the layout
|
||||||
|
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
pub staging_hash: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This struct is used to set the parameters to be used in the assignment computation
|
||||||
|
/// algorithm. It is stored as a Crdt.
|
||||||
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||||
|
pub struct LayoutParameters {
|
||||||
|
pub zone_redundancy: ZoneRedundancy,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
|
||||||
|
/// of each partition on at least that number of different zones.
|
||||||
|
/// Otherwise, copies will be stored on the maximum possible number of zones.
|
||||||
|
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||||
|
pub enum ZoneRedundancy {
|
||||||
|
AtLeast(usize),
|
||||||
|
Maximum,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::Migrate for ClusterLayout {
|
||||||
|
const VERSION_MARKER: &'static [u8] = b"G09layout";
|
||||||
|
|
||||||
|
type Previous = v08::ClusterLayout;
|
||||||
|
|
||||||
|
fn migrate(previous: Self::Previous) -> Self {
|
||||||
|
use itertools::Itertools;
|
||||||
|
|
||||||
|
// In the old layout, capacities are in an arbitrary unit,
|
||||||
|
// but in the new layout they are in bytes.
|
||||||
|
// Here we arbitrarily multiply everything by 1G,
|
||||||
|
// such that 1 old capacity unit = 1GB in the new units.
|
||||||
|
// This is totally arbitrary and won't work for most users.
|
||||||
|
let cap_mul = 1024 * 1024 * 1024;
|
||||||
|
let roles = multiply_all_capacities(previous.roles, cap_mul);
|
||||||
|
let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
|
||||||
|
let node_id_vec = previous.node_id_vec;
|
||||||
|
|
||||||
|
// Determine partition size
|
||||||
|
let mut tmp = previous.ring_assignation_data.clone();
|
||||||
|
tmp.sort();
|
||||||
|
let partition_size = tmp
|
||||||
|
.into_iter()
|
||||||
|
.dedup_with_count()
|
||||||
|
.map(|(npart, node)| {
|
||||||
|
roles
|
||||||
|
.get(&node_id_vec[node as usize])
|
||||||
|
.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
|
||||||
|
.unwrap_or(0) / npart as u64
|
||||||
|
})
|
||||||
|
.min()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
// By default, zone_redundancy is maximum possible value
|
||||||
|
let parameters = LayoutParameters {
|
||||||
|
zone_redundancy: ZoneRedundancy::Maximum,
|
||||||
|
};
|
||||||
|
|
||||||
|
Self {
|
||||||
|
version: previous.version,
|
||||||
|
replication_factor: previous.replication_factor,
|
||||||
|
partition_size,
|
||||||
|
parameters,
|
||||||
|
roles,
|
||||||
|
node_id_vec,
|
||||||
|
ring_assignment_data: previous.ring_assignation_data,
|
||||||
|
staging_parameters: Lww::new(parameters),
|
||||||
|
staging_roles,
|
||||||
|
staging_hash: [0u8; 32].into(), // will be set in the next migration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn multiply_all_capacities(
|
||||||
|
old_roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
mul: u64,
|
||||||
|
) -> LwwMap<Uuid, NodeRoleV> {
|
||||||
|
let mut new_roles = LwwMap::new();
|
||||||
|
for (node, ts, role) in old_roles.items() {
|
||||||
|
let mut role = role.clone();
|
||||||
|
if let NodeRoleV(Some(NodeRole {
|
||||||
|
capacity: Some(ref mut cap),
|
||||||
|
..
|
||||||
|
})) = role
|
||||||
|
{
|
||||||
|
*cap *= mul;
|
||||||
|
}
|
||||||
|
new_roles.merge_raw(node, *ts, &role);
|
||||||
|
}
|
||||||
|
new_roles
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod v010 {
|
||||||
|
use super::v09;
|
||||||
|
use crate::layout::CompactNodeType;
|
||||||
|
use garage_util::crdt::{Lww, LwwMap};
|
||||||
|
use garage_util::data::Uuid;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
|
||||||
|
|
||||||
|
/// Number of old (non-live) versions to keep, see LayoutHistory::old_versions
|
||||||
|
pub const OLD_VERSION_COUNT: usize = 5;
|
||||||
|
|
||||||
|
/// The history of cluster layouts, with trackers to keep a record
|
||||||
|
/// of which nodes are up-to-date to current cluster data
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct LayoutHistory {
|
||||||
|
/// The versions currently in use in the cluster
|
||||||
|
pub versions: Vec<LayoutVersion>,
|
||||||
|
/// At most 5 of the previous versions, not used by the garage_table
|
||||||
|
/// module, but usefull for the garage_block module to find data blocks
|
||||||
|
/// that have not yet been moved
|
||||||
|
pub old_versions: Vec<LayoutVersion>,
|
||||||
|
|
||||||
|
/// Update trackers
|
||||||
|
pub update_trackers: UpdateTrackers,
|
||||||
|
|
||||||
|
/// Staged changes for the next version
|
||||||
|
pub staging: Lww<LayoutStaging>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A version of the layout of the cluster, i.e. the list of roles
|
||||||
|
/// which are assigned to each cluster node
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct LayoutVersion {
|
||||||
|
/// The number of this version
|
||||||
|
pub version: u64,
|
||||||
|
|
||||||
|
/// Roles assigned to nodes in this version
|
||||||
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
/// Parameters used to compute the assignment currently given by
|
||||||
|
/// ring_assignment_data
|
||||||
|
pub parameters: LayoutParameters,
|
||||||
|
|
||||||
|
/// The number of replicas for each data partition
|
||||||
|
pub replication_factor: usize,
|
||||||
|
/// This attribute is only used to retain the previously computed partition size,
|
||||||
|
/// to know to what extent does it change with the layout update.
|
||||||
|
pub partition_size: u64,
|
||||||
|
|
||||||
|
/// node_id_vec: a vector of node IDs with a role assigned
|
||||||
|
/// in the system (this includes gateway nodes).
|
||||||
|
/// The order here is different than the vec stored by `roles`, because:
|
||||||
|
/// 1. non-gateway nodes are first so that they have lower numbers
|
||||||
|
/// 2. nodes that don't have a role are excluded (but they need to
|
||||||
|
/// stay in the CRDT as tombstones)
|
||||||
|
pub node_id_vec: Vec<Uuid>,
|
||||||
|
/// number of non-gateway nodes, which are the first ids in node_id_vec
|
||||||
|
pub nongateway_node_count: usize,
|
||||||
|
/// The assignation of data partitions to nodes, the values
|
||||||
|
/// are indices in node_id_vec
|
||||||
|
#[serde(with = "serde_bytes")]
|
||||||
|
pub ring_assignment_data: Vec<CompactNodeType>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The staged changes for the next layout version
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct LayoutStaging {
|
||||||
|
/// Parameters to be used in the next partition assignment computation.
|
||||||
|
pub parameters: Lww<LayoutParameters>,
|
||||||
|
/// Role changes which are staged for the next version of the layout
|
||||||
|
pub roles: LwwMap<Uuid, NodeRoleV>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The tracker of acknowlegments and data syncs around the cluster
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
|
||||||
|
pub struct UpdateTrackers {
|
||||||
|
/// The highest layout version number each node has ack'ed
|
||||||
|
pub ack_map: UpdateTracker,
|
||||||
|
/// The highest layout version number each node has synced data for
|
||||||
|
pub sync_map: UpdateTracker,
|
||||||
|
/// The highest layout version number each node has
|
||||||
|
/// ack'ed that all other nodes have synced data for
|
||||||
|
pub sync_ack_map: UpdateTracker,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Generic update tracker struct
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
|
||||||
|
pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);
|
||||||
|
|
||||||
|
impl garage_util::migrate::Migrate for LayoutHistory {
|
||||||
|
const VERSION_MARKER: &'static [u8] = b"G010lh";
|
||||||
|
|
||||||
|
type Previous = v09::ClusterLayout;
|
||||||
|
|
||||||
|
fn migrate(previous: Self::Previous) -> Self {
|
||||||
|
let nongateway_node_count = previous
|
||||||
|
.node_id_vec
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter(|(_, uuid)| {
|
||||||
|
let role = previous.roles.get(uuid);
|
||||||
|
matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
|
||||||
|
})
|
||||||
|
.map(|(i, _)| i + 1)
|
||||||
|
.max()
|
||||||
|
.unwrap_or(0);
|
||||||
|
|
||||||
|
let version = LayoutVersion {
|
||||||
|
version: previous.version,
|
||||||
|
replication_factor: previous.replication_factor,
|
||||||
|
partition_size: previous.partition_size,
|
||||||
|
parameters: previous.parameters,
|
||||||
|
roles: previous.roles,
|
||||||
|
node_id_vec: previous.node_id_vec,
|
||||||
|
nongateway_node_count,
|
||||||
|
ring_assignment_data: previous.ring_assignment_data,
|
||||||
|
};
|
||||||
|
let update_tracker = UpdateTracker(
|
||||||
|
version
|
||||||
|
.nongateway_nodes()
|
||||||
|
.iter()
|
||||||
|
.copied()
|
||||||
|
.map(|x| (x, version.version))
|
||||||
|
.collect::<BTreeMap<Uuid, u64>>(),
|
||||||
|
);
|
||||||
|
let staging = LayoutStaging {
|
||||||
|
parameters: previous.staging_parameters,
|
||||||
|
roles: previous.staging_roles,
|
||||||
|
};
|
||||||
|
Self {
|
||||||
|
versions: vec![version],
|
||||||
|
old_versions: vec![],
|
||||||
|
update_trackers: UpdateTrackers {
|
||||||
|
ack_map: update_tracker.clone(),
|
||||||
|
sync_map: update_tracker.clone(),
|
||||||
|
sync_ack_map: update_tracker.clone(),
|
||||||
|
},
|
||||||
|
staging: Lww::raw(previous.version, staging),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub use v010::*;
|
||||||
|
|
||||||
|
// ---- utility functions ----
|
||||||
|
|
||||||
|
impl AutoCrdt for LayoutParameters {
|
||||||
|
const WARN_IF_DIFFERENT: bool = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AutoCrdt for NodeRoleV {
|
||||||
|
const WARN_IF_DIFFERENT: bool = true;
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Crdt for LayoutStaging {
|
||||||
|
fn merge(&mut self, other: &LayoutStaging) {
|
||||||
|
self.parameters.merge(&other.parameters);
|
||||||
|
self.roles.merge(&other.roles);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NodeRole {
|
||||||
|
pub fn capacity_string(&self) -> String {
|
||||||
|
match self.capacity {
|
||||||
|
Some(c) => ByteSize::b(c).to_string_as(false),
|
||||||
|
None => "gateway".to_string(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn tags_string(&self) -> String {
|
||||||
|
self.tags.join(",")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl fmt::Display for ZoneRedundancy {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
match self {
|
||||||
|
ZoneRedundancy::Maximum => write!(f, "maximum"),
|
||||||
|
ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl core::str::FromStr for ZoneRedundancy {
|
||||||
|
type Err = &'static str;
|
||||||
|
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
||||||
|
match s {
|
||||||
|
"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
|
||||||
|
x => {
|
||||||
|
let v = x
|
||||||
|
.parse::<usize>()
|
||||||
|
.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
|
||||||
|
Ok(ZoneRedundancy::AtLeast(v))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdateTracker {
|
||||||
|
fn merge(&mut self, other: &UpdateTracker) -> bool {
|
||||||
|
let mut changed = false;
|
||||||
|
for (k, v) in other.0.iter() {
|
||||||
|
if let Some(v_mut) = self.0.get_mut(k) {
|
||||||
|
if *v > *v_mut {
|
||||||
|
*v_mut = *v;
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
self.0.insert(*k, *v);
|
||||||
|
changed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
changed
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This bumps the update tracker for a given node up to the specified value.
|
||||||
|
/// This has potential impacts on the correctness of Garage and should only
|
||||||
|
/// be used in very specific circumstances.
|
||||||
|
pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
|
||||||
|
match self.0.get_mut(&peer) {
|
||||||
|
Some(e) if *e < value => {
|
||||||
|
*e = value;
|
||||||
|
true
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
self.0.insert(peer, value);
|
||||||
|
true
|
||||||
|
}
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
|
||||||
|
storage_nodes
|
||||||
|
.iter()
|
||||||
|
.map(|x| self.get(x, min_version))
|
||||||
|
.min()
|
||||||
|
.unwrap_or(min_version)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
|
||||||
|
self.0.get(node).copied().unwrap_or(min_version)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpdateTrackers {
|
||||||
|
pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
|
||||||
|
let c1 = self.ack_map.merge(&other.ack_map);
|
||||||
|
let c2 = self.sync_map.merge(&other.sync_map);
|
||||||
|
let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
|
||||||
|
c1 || c2 || c3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -1,431 +0,0 @@
|
||||||
use std::fmt;
|
|
||||||
|
|
||||||
use bytesize::ByteSize;
|
|
||||||
|
|
||||||
use garage_util::crdt::{AutoCrdt, Crdt};
|
|
||||||
use garage_util::data::Uuid;
|
|
||||||
|
|
||||||
mod v08 {
|
|
||||||
use crate::layout::CompactNodeType;
|
|
||||||
use garage_util::crdt::LwwMap;
|
|
||||||
use garage_util::data::{Hash, Uuid};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
|
|
||||||
/// The layout of the cluster, i.e. the list of roles
|
|
||||||
/// which are assigned to each cluster node
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ClusterLayout {
|
|
||||||
pub version: u64,
|
|
||||||
|
|
||||||
pub replication_factor: usize,
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
|
|
||||||
/// node_id_vec: a vector of node IDs with a role assigned
|
|
||||||
/// in the system (this includes gateway nodes).
|
|
||||||
/// The order here is different than the vec stored by `roles`, because:
|
|
||||||
/// 1. non-gateway nodes are first so that they have lower numbers
|
|
||||||
/// 2. nodes that don't have a role are excluded (but they need to
|
|
||||||
/// stay in the CRDT as tombstones)
|
|
||||||
pub node_id_vec: Vec<Uuid>,
|
|
||||||
/// the assignation of data partitions to node, the values
|
|
||||||
/// are indices in node_id_vec
|
|
||||||
#[serde(with = "serde_bytes")]
|
|
||||||
pub ring_assignation_data: Vec<CompactNodeType>,
|
|
||||||
|
|
||||||
/// Role changes which are staged for the next version of the layout
|
|
||||||
pub staging: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
pub staging_hash: Hash,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct NodeRoleV(pub Option<NodeRole>);
|
|
||||||
|
|
||||||
/// The user-assigned roles of cluster nodes
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct NodeRole {
|
|
||||||
/// Datacenter at which this entry belong. This information is used to
|
|
||||||
/// perform a better geodistribution
|
|
||||||
pub zone: String,
|
|
||||||
/// The capacity of the node
|
|
||||||
/// If this is set to None, the node does not participate in storing data for the system
|
|
||||||
/// and is only active as an API gateway to other nodes
|
|
||||||
pub capacity: Option<u64>,
|
|
||||||
/// A set of tags to recognize the node
|
|
||||||
pub tags: Vec<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl garage_util::migrate::InitialFormat for ClusterLayout {}
|
|
||||||
}
|
|
||||||
|
|
||||||
mod v09 {
|
|
||||||
use super::v08;
|
|
||||||
use crate::layout::CompactNodeType;
|
|
||||||
use garage_util::crdt::{Lww, LwwMap};
|
|
||||||
use garage_util::data::{Hash, Uuid};
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
pub use v08::{NodeRole, NodeRoleV};
|
|
||||||
|
|
||||||
/// The layout of the cluster, i.e. the list of roles
|
|
||||||
/// which are assigned to each cluster node
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct ClusterLayout {
|
|
||||||
pub version: u64,
|
|
||||||
|
|
||||||
pub replication_factor: usize,
|
|
||||||
|
|
||||||
/// This attribute is only used to retain the previously computed partition size,
|
|
||||||
/// to know to what extent does it change with the layout update.
|
|
||||||
pub partition_size: u64,
|
|
||||||
/// Parameters used to compute the assignment currently given by
|
|
||||||
/// ring_assignment_data
|
|
||||||
pub parameters: LayoutParameters,
|
|
||||||
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
|
|
||||||
/// see comment in v08::ClusterLayout
|
|
||||||
pub node_id_vec: Vec<Uuid>,
|
|
||||||
/// see comment in v08::ClusterLayout
|
|
||||||
#[serde(with = "serde_bytes")]
|
|
||||||
pub ring_assignment_data: Vec<CompactNodeType>,
|
|
||||||
|
|
||||||
/// Parameters to be used in the next partition assignment computation.
|
|
||||||
pub staging_parameters: Lww<LayoutParameters>,
|
|
||||||
/// Role changes which are staged for the next version of the layout
|
|
||||||
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
pub staging_hash: Hash,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This struct is used to set the parameters to be used in the assignment computation
|
|
||||||
/// algorithm. It is stored as a Crdt.
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
|
|
||||||
pub struct LayoutParameters {
|
|
||||||
pub zone_redundancy: ZoneRedundancy,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Zone redundancy: if set to AtLeast(x), the layout calculation will aim to store copies
|
|
||||||
/// of each partition on at least that number of different zones.
|
|
||||||
/// Otherwise, copies will be stored on the maximum possible number of zones.
|
|
||||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
|
|
||||||
pub enum ZoneRedundancy {
|
|
||||||
AtLeast(usize),
|
|
||||||
Maximum,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl garage_util::migrate::Migrate for ClusterLayout {
|
|
||||||
const VERSION_MARKER: &'static [u8] = b"G09layout";
|
|
||||||
|
|
||||||
type Previous = v08::ClusterLayout;
|
|
||||||
|
|
||||||
fn migrate(previous: Self::Previous) -> Self {
|
|
||||||
use itertools::Itertools;
|
|
||||||
|
|
||||||
// In the old layout, capacities are in an arbitrary unit,
|
|
||||||
// but in the new layout they are in bytes.
|
|
||||||
// Here we arbitrarily multiply everything by 1G,
|
|
||||||
// such that 1 old capacity unit = 1GB in the new units.
|
|
||||||
// This is totally arbitrary and won't work for most users.
|
|
||||||
let cap_mul = 1024 * 1024 * 1024;
|
|
||||||
let roles = multiply_all_capacities(previous.roles, cap_mul);
|
|
||||||
let staging_roles = multiply_all_capacities(previous.staging, cap_mul);
|
|
||||||
let node_id_vec = previous.node_id_vec;
|
|
||||||
|
|
||||||
// Determine partition size
|
|
||||||
let mut tmp = previous.ring_assignation_data.clone();
|
|
||||||
tmp.sort();
|
|
||||||
let partition_size = tmp
|
|
||||||
.into_iter()
|
|
||||||
.dedup_with_count()
|
|
||||||
.map(|(npart, node)| {
|
|
||||||
roles
|
|
||||||
.get(&node_id_vec[node as usize])
|
|
||||||
.and_then(|p| p.0.as_ref().and_then(|r| r.capacity))
|
|
||||||
.unwrap_or(0) / npart as u64
|
|
||||||
})
|
|
||||||
.min()
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
// By default, zone_redundancy is maximum possible value
|
|
||||||
let parameters = LayoutParameters {
|
|
||||||
zone_redundancy: ZoneRedundancy::Maximum,
|
|
||||||
};
|
|
||||||
|
|
||||||
Self {
|
|
||||||
version: previous.version,
|
|
||||||
replication_factor: previous.replication_factor,
|
|
||||||
partition_size,
|
|
||||||
parameters,
|
|
||||||
roles,
|
|
||||||
node_id_vec,
|
|
||||||
ring_assignment_data: previous.ring_assignation_data,
|
|
||||||
staging_parameters: Lww::new(parameters),
|
|
||||||
staging_roles,
|
|
||||||
staging_hash: [0u8; 32].into(), // will be set in the next migration
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn multiply_all_capacities(
|
|
||||||
old_roles: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
mul: u64,
|
|
||||||
) -> LwwMap<Uuid, NodeRoleV> {
|
|
||||||
let mut new_roles = LwwMap::new();
|
|
||||||
for (node, ts, role) in old_roles.items() {
|
|
||||||
let mut role = role.clone();
|
|
||||||
if let NodeRoleV(Some(NodeRole {
|
|
||||||
capacity: Some(ref mut cap),
|
|
||||||
..
|
|
||||||
})) = role
|
|
||||||
{
|
|
||||||
*cap *= mul;
|
|
||||||
}
|
|
||||||
new_roles.merge_raw(node, *ts, &role);
|
|
||||||
}
|
|
||||||
new_roles
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
mod v010 {
|
|
||||||
use super::v09;
|
|
||||||
use crate::layout::CompactNodeType;
|
|
||||||
use garage_util::crdt::{Lww, LwwMap};
|
|
||||||
use garage_util::data::Uuid;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use std::collections::BTreeMap;
|
|
||||||
pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy};
|
|
||||||
|
|
||||||
pub const OLD_VERSION_COUNT: usize = 5;
|
|
||||||
|
|
||||||
/// The history of cluster layouts, with trackers to keep a record
|
|
||||||
/// of which nodes are up-to-date to current cluster data
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
|
||||||
pub struct LayoutHistory {
|
|
||||||
/// The versions currently in use in the cluster
|
|
||||||
pub versions: Vec<LayoutVersion>,
|
|
||||||
/// At most 5 of the previous versions, not used by the garage_table
|
|
||||||
/// module, but usefull for the garage_block module to find data blocks
|
|
||||||
/// that have not yet been moved
|
|
||||||
pub old_versions: Vec<LayoutVersion>,
|
|
||||||
|
|
||||||
/// Update trackers
|
|
||||||
pub update_trackers: UpdateTrackers,
|
|
||||||
|
|
||||||
/// Staged changes for the next version
|
|
||||||
pub staging: Lww<LayoutStaging>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// A version of the layout of the cluster, i.e. the list of roles
|
|
||||||
/// which are assigned to each cluster node
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
|
||||||
pub struct LayoutVersion {
|
|
||||||
pub version: u64,
|
|
||||||
|
|
||||||
pub replication_factor: usize,
|
|
||||||
|
|
||||||
/// This attribute is only used to retain the previously computed partition size,
|
|
||||||
/// to know to what extent does it change with the layout update.
|
|
||||||
pub partition_size: u64,
|
|
||||||
/// Parameters used to compute the assignment currently given by
|
|
||||||
/// ring_assignment_data
|
|
||||||
pub parameters: LayoutParameters,
|
|
||||||
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
|
|
||||||
/// see comment in v08::ClusterLayout
|
|
||||||
pub node_id_vec: Vec<Uuid>,
|
|
||||||
/// number of non-gateway nodes, which are the first ids in node_id_vec
|
|
||||||
pub nongateway_node_count: usize,
|
|
||||||
/// see comment in v08::ClusterLayout
|
|
||||||
#[serde(with = "serde_bytes")]
|
|
||||||
pub ring_assignment_data: Vec<CompactNodeType>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The staged changes for the next layout version
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
|
||||||
pub struct LayoutStaging {
|
|
||||||
/// Parameters to be used in the next partition assignment computation.
|
|
||||||
pub parameters: Lww<LayoutParameters>,
|
|
||||||
/// Role changes which are staged for the next version of the layout
|
|
||||||
pub roles: LwwMap<Uuid, NodeRoleV>,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The tracker of acknowlegments and data syncs around the cluster
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
|
|
||||||
pub struct UpdateTrackers {
|
|
||||||
/// The highest layout version number each node has ack'ed
|
|
||||||
pub ack_map: UpdateTracker,
|
|
||||||
/// The highest layout version number each node has synced data for
|
|
||||||
pub sync_map: UpdateTracker,
|
|
||||||
/// The highest layout version number each node has
|
|
||||||
/// ack'ed that all other nodes have synced data for
|
|
||||||
pub sync_ack_map: UpdateTracker,
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The history of cluster layouts
|
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
|
|
||||||
pub struct UpdateTracker(pub BTreeMap<Uuid, u64>);
|
|
||||||
|
|
||||||
impl garage_util::migrate::Migrate for LayoutHistory {
|
|
||||||
const VERSION_MARKER: &'static [u8] = b"G010lh";
|
|
||||||
|
|
||||||
type Previous = v09::ClusterLayout;
|
|
||||||
|
|
||||||
fn migrate(previous: Self::Previous) -> Self {
|
|
||||||
let nongateway_node_count = previous
|
|
||||||
.node_id_vec
|
|
||||||
.iter()
|
|
||||||
.enumerate()
|
|
||||||
.filter(|(_, uuid)| {
|
|
||||||
let role = previous.roles.get(uuid);
|
|
||||||
matches!(role, Some(NodeRoleV(Some(role))) if role.capacity.is_some())
|
|
||||||
})
|
|
||||||
.map(|(i, _)| i + 1)
|
|
||||||
.max()
|
|
||||||
.unwrap_or(0);
|
|
||||||
|
|
||||||
let version = LayoutVersion {
|
|
||||||
version: previous.version,
|
|
||||||
replication_factor: previous.replication_factor,
|
|
||||||
partition_size: previous.partition_size,
|
|
||||||
parameters: previous.parameters,
|
|
||||||
roles: previous.roles,
|
|
||||||
node_id_vec: previous.node_id_vec,
|
|
||||||
nongateway_node_count,
|
|
||||||
ring_assignment_data: previous.ring_assignment_data,
|
|
||||||
};
|
|
||||||
let update_tracker = UpdateTracker(
|
|
||||||
version
|
|
||||||
.nongateway_nodes()
|
|
||||||
.iter()
|
|
||||||
.copied()
|
|
||||||
.map(|x| (x, version.version))
|
|
||||||
.collect::<BTreeMap<Uuid, u64>>(),
|
|
||||||
);
|
|
||||||
let staging = LayoutStaging {
|
|
||||||
parameters: previous.staging_parameters,
|
|
||||||
roles: previous.staging_roles,
|
|
||||||
};
|
|
||||||
Self {
|
|
||||||
versions: vec![version],
|
|
||||||
old_versions: vec![],
|
|
||||||
update_trackers: UpdateTrackers {
|
|
||||||
ack_map: update_tracker.clone(),
|
|
||||||
sync_map: update_tracker.clone(),
|
|
||||||
sync_ack_map: update_tracker.clone(),
|
|
||||||
},
|
|
||||||
staging: Lww::raw(previous.version, staging),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub use v010::*;
|
|
||||||
|
|
||||||
// ---- utility functions ----
|
|
||||||
|
|
||||||
impl AutoCrdt for LayoutParameters {
|
|
||||||
const WARN_IF_DIFFERENT: bool = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AutoCrdt for NodeRoleV {
|
|
||||||
const WARN_IF_DIFFERENT: bool = true;
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Crdt for LayoutStaging {
|
|
||||||
fn merge(&mut self, other: &LayoutStaging) {
|
|
||||||
self.parameters.merge(&other.parameters);
|
|
||||||
self.roles.merge(&other.roles);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl NodeRole {
|
|
||||||
pub fn capacity_string(&self) -> String {
|
|
||||||
match self.capacity {
|
|
||||||
Some(c) => ByteSize::b(c).to_string_as(false),
|
|
||||||
None => "gateway".to_string(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn tags_string(&self) -> String {
|
|
||||||
self.tags.join(",")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl fmt::Display for ZoneRedundancy {
|
|
||||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
|
||||||
match self {
|
|
||||||
ZoneRedundancy::Maximum => write!(f, "maximum"),
|
|
||||||
ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl core::str::FromStr for ZoneRedundancy {
|
|
||||||
type Err = &'static str;
|
|
||||||
fn from_str(s: &str) -> Result<Self, Self::Err> {
|
|
||||||
match s {
|
|
||||||
"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
|
|
||||||
x => {
|
|
||||||
let v = x
|
|
||||||
.parse::<usize>()
|
|
||||||
.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
|
|
||||||
Ok(ZoneRedundancy::AtLeast(v))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UpdateTracker {
|
|
||||||
fn merge(&mut self, other: &UpdateTracker) -> bool {
|
|
||||||
let mut changed = false;
|
|
||||||
for (k, v) in other.0.iter() {
|
|
||||||
if let Some(v_mut) = self.0.get_mut(k) {
|
|
||||||
if *v > *v_mut {
|
|
||||||
*v_mut = *v;
|
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
self.0.insert(*k, *v);
|
|
||||||
changed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
changed
|
|
||||||
}
|
|
||||||
|
|
||||||
/// This bumps the update tracker for a given node up to the specified value.
|
|
||||||
/// This has potential impacts on the correctness of Garage and should only
|
|
||||||
/// be used in very specific circumstances.
|
|
||||||
pub fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
|
|
||||||
match self.0.get_mut(&peer) {
|
|
||||||
Some(e) if *e < value => {
|
|
||||||
*e = value;
|
|
||||||
true
|
|
||||||
}
|
|
||||||
None => {
|
|
||||||
self.0.insert(peer, value);
|
|
||||||
true
|
|
||||||
}
|
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
|
|
||||||
storage_nodes
|
|
||||||
.iter()
|
|
||||||
.map(|x| self.get(x, min_version))
|
|
||||||
.min()
|
|
||||||
.unwrap_or(min_version)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn get(&self, node: &Uuid, min_version: u64) -> u64 {
|
|
||||||
self.0.get(node).copied().unwrap_or(min_version)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl UpdateTrackers {
|
|
||||||
pub(crate) fn merge(&mut self, other: &UpdateTrackers) -> bool {
|
|
||||||
let c1 = self.ack_map.merge(&other.ack_map);
|
|
||||||
let c2 = self.sync_map.merge(&other.sync_map);
|
|
||||||
let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
|
|
||||||
c1 || c2 || c3
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -10,7 +10,6 @@ use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
|
||||||
use super::graph_algo::*;
|
use super::graph_algo::*;
|
||||||
use super::schema::*;
|
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
||||||
// The Message type will be used to collect information on the algorithm.
|
// The Message type will be used to collect information on the algorithm.
|
||||||
|
|
Loading…
Reference in a new issue