layout: use separate CRDT for staged layout changes

This commit is contained in:
Alex 2023-11-09 11:19:43 +01:00
parent 1da0a5676e
commit 523d2ecb95
Signed by: lx
GPG key ID: 0E496D15096376BE
12 changed files with 175 additions and 183 deletions

View file

@ -279,7 +279,7 @@ impl ApiHandler for AdminApiServer {
Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await,
Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await,
Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await,
Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await,
Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await,
// Keys
Endpoint::ListKeys => handle_list_keys(&self.garage).await,
Endpoint::GetKeyInfo {

View file

@ -105,7 +105,9 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
.collect::<Vec<_>>();
let staged_role_changes = layout
.staging_roles
.staging
.get()
.roles
.items()
.iter()
.filter(|(k, _, v)| layout.current().roles.get(k) != Some(v))
@ -211,7 +213,7 @@ pub async fn handle_update_cluster_layout(
let mut layout = garage.system.cluster_layout().as_ref().clone();
let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging_roles);
roles.merge(&layout.staging.get().roles);
for change in updates {
let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?;
@ -232,7 +234,9 @@ pub async fn handle_update_cluster_layout(
};
layout
.staging_roles
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
}
@ -246,7 +250,7 @@ pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
let param = parse_json_body::<ApplyLayoutRequest>(req).await?;
let layout = garage.system.cluster_layout().as_ref().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
@ -260,14 +264,9 @@ pub async fn handle_apply_cluster_layout(
Ok(json_ok_response(&res)?)
}
pub async fn handle_revert_cluster_layout(
garage: &Arc<Garage>,
req: Request<Body>,
) -> Result<Response<Body>, Error> {
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let layout = garage.system.cluster_layout().as_ref().clone();
let layout = layout.revert_staged_changes(Some(param.version))?;
let layout = layout.revert_staged_changes()?;
garage.system.update_cluster_layout(&layout).await?;
let res = format_cluster_layout(&layout);
@ -280,7 +279,7 @@ type UpdateClusterLayoutRequest = Vec<NodeRoleChange>;
#[derive(Deserialize)]
#[serde(rename_all = "camelCase")]
struct ApplyRevertLayoutRequest {
struct ApplyLayoutRequest {
version: u64,
}

View file

@ -85,7 +85,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
));
}
_ => {
let new_role = match layout.staging_roles.get(&adv.id) {
let new_role = match layout.staging.get().roles.get(&adv.id) {
Some(NodeRoleV(Some(_))) => "(pending)",
_ => "NO ROLE ASSIGNED",
};

View file

@ -65,7 +65,7 @@ pub async fn cmd_assign_role(
.collect::<Result<Vec<_>, _>>()?;
let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging_roles);
roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() {
let replaced_node =
@ -73,7 +73,9 @@ pub async fn cmd_assign_role(
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
.staging_roles
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
@ -131,7 +133,9 @@ pub async fn cmd_assign_role(
};
layout
.staging_roles
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
@ -151,13 +155,15 @@ pub async fn cmd_remove_role(
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging_roles);
roles.merge(&layout.staging.get().roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
.staging_roles
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
@ -203,16 +209,12 @@ pub async fn cmd_show_layout(
println!();
println!(" garage layout apply --version {}", v + 1);
println!();
println!(
"You can also revert all proposed changes with: garage layout revert --version {}",
v + 1)
println!("You can also revert all proposed changes with: garage layout revert");
}
Err(e) => {
println!("Error while trying to compute the assignment: {}", e);
println!("This new layout cannot yet be applied.");
println!(
"You can also revert all proposed changes with: garage layout revert --version {}",
v + 1)
println!("You can also revert all proposed changes with: garage layout revert");
}
}
}
@ -245,9 +247,15 @@ pub async fn cmd_revert_layout(
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
if !revert_opt.yes {
return Err(Error::Message(
"Please add the --yes flag to run the layout revert operation".into(),
));
}
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let layout = layout.revert_staged_changes(revert_opt.version)?;
let layout = layout.revert_staged_changes()?;
send_layout(rpc_cli, rpc_host, layout).await?;
@ -284,7 +292,9 @@ pub async fn cmd_config_layout(
}
layout
.staging_parameters
.staging
.get_mut()
.parameters
.update(LayoutParameters { zone_redundancy: r });
println!("The zone redundancy parameter has been set to '{}'.", r);
did_something = true;
@ -371,19 +381,20 @@ pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) {
}
pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
let has_role_changes = layout
.staging_roles
let staging = layout.staging.get();
let has_role_changes = staging
.roles
.items()
.iter()
.any(|(k, _, v)| layout.current().roles.get(k) != Some(v));
let has_layout_changes = *layout.staging_parameters.get() != layout.current().parameters;
let has_layout_changes = *staging.parameters.get() != layout.current().parameters;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for (id, _, role) in layout.staging_roles.items().iter() {
for (id, _, role) in staging.roles.items().iter() {
if layout.current().roles.get(id) == Some(role) {
continue;
}
@ -406,7 +417,7 @@ pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool {
if has_layout_changes {
println!(
"Zone redundancy: {}",
layout.staging_parameters.get().zone_redundancy
staging.parameters.get().zone_redundancy
);
}
true

View file

@ -164,9 +164,9 @@ pub struct ApplyLayoutOpt {
#[derive(StructOpt, Debug)]
pub struct RevertLayoutOpt {
/// Version number of old configuration to which to revert
#[structopt(long = "version")]
pub(crate) version: Option<u64>,
/// The revert operation will not be ran unless this flag is added
#[structopt(long = "yes")]
pub(crate) yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]

View file

@ -114,16 +114,6 @@ impl Graph<FlowEdge> {
Ok(result)
}
/// This function returns the value of the flow incoming to v.
pub fn get_inflow(&self, v: Vertex) -> Result<i64, String> {
let idv = self.get_vertex_id(&v)?;
let mut result = 0;
for edge in self.graph[idv].iter() {
result += max(0, self.graph[edge.dest][edge.rev].flow);
}
Ok(result)
}
/// This function returns the value of the flow outgoing from v.
pub fn get_outflow(&self, v: Vertex) -> Result<i64, String> {
let idv = self.get_vertex_id(&v)?;

View file

@ -1,5 +1,3 @@
use std::cmp::Ordering;
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
use garage_util::encode::nonversioned_encode;
@ -12,14 +10,15 @@ impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
let staging_parameters = Lww::<LayoutParameters>::new(version.parameters);
let empty_lwwmap = LwwMap::new();
let staging = LayoutStaging {
parameters: Lww::<LayoutParameters>::new(version.parameters),
roles: LwwMap::new(),
};
let mut ret = LayoutHistory {
versions: vec![version].into_boxed_slice().into(),
update_trackers: Default::default(),
staging_parameters,
staging_roles: empty_lwwmap,
staging: Lww::raw(0, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
@ -31,8 +30,7 @@ impl LayoutHistory {
}
pub(crate) fn calculate_staging_hash(&self) -> Hash {
let hashed_tuple = (&self.staging_roles, &self.staging_parameters);
blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..])
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
// ================== updates to layout, public interface ===================
@ -41,26 +39,10 @@ impl LayoutHistory {
let mut changed = false;
// Merge staged layout changes
match other.current().version.cmp(&self.current().version) {
Ordering::Greater => {
self.staging_parameters = other.staging_parameters.clone();
self.staging_roles = other.staging_roles.clone();
self.staging_hash = other.staging_hash;
if self.staging != other.staging {
changed = true;
}
Ordering::Equal => {
self.staging_parameters.merge(&other.staging_parameters);
self.staging_roles.merge(&other.staging_roles);
let new_staging_hash = self.calculate_staging_hash();
if new_staging_hash != self.staging_hash {
changed = true;
}
self.staging_hash = new_staging_hash;
}
Ordering::Less => (),
}
self.staging.merge(&other.staging);
// Add any new versions to history
for v2 in other.versions.iter() {
@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
}
// Compute new version and add it to history
let mut new_version = self.current().clone();
new_version.version += 1;
new_version.roles.merge(&self.staging_roles);
new_version.roles.merge(&self.staging.get().roles);
new_version.roles.retain(|(_, _, v)| v.0.is_some());
new_version.parameters = *self.staging_parameters.get();
self.staging_roles.clear();
self.staging_hash = self.calculate_staging_hash();
new_version.parameters = *self.staging.get().parameters.get();
let msg = new_version.calculate_partition_assignment()?;
self.versions.push(new_version);
// Reset the staged layout changes
self.staging.update(LayoutStaging {
parameters: self.staging.get().parameters.clone(),
roles: LwwMap::new(),
});
self.staging_hash = self.calculate_staging_hash();
Ok((self, msg))
}
pub fn revert_staged_changes(mut self, version: Option<u64>) -> Result<Self, Error> {
match version {
None => {
let error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
return Err(Error::Message(error.into()));
}
Some(v) => {
if v != self.current().version + 1 {
return Err(Error::Message("Invalid new layout version".into()));
}
}
}
self.staging_roles.clear();
self.staging_parameters.update(self.current().parameters);
pub fn revert_staged_changes(mut self) -> Result<Self, Error> {
self.staging.update(LayoutStaging {
parameters: Lww::new(self.current().parameters.clone()),
roles: LwwMap::new(),
});
self.staging_hash = self.calculate_staging_hash();
// TODO this is stupid, we should have a separate version counter/LWW
// for the staging params
let mut new_version = self.current().clone();
new_version.version += 1;
self.versions.push(new_version);
Ok(self)
}

View file

@ -1,8 +1,10 @@
mod graph_algo;
mod history;
mod schema;
mod tracker;
mod version;
// ---- re-exports ----
pub use history::*;
pub use schema::*;
pub use version::*;

View file

@ -1,3 +1,9 @@
use std::fmt;
use bytesize::ByteSize;
use garage_util::crdt::{AutoCrdt, Crdt};
mod v08 {
use crate::layout::CompactNodeType;
use garage_util::crdt::LwwMap;
@ -210,6 +216,15 @@ mod v010 {
pub ring_assignment_data: Vec<CompactNodeType>,
}
/// The staged changes for the next layout version
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LayoutStaging {
/// Parameters to be used in the next partition assignment computation.
pub parameters: Lww<LayoutParameters>,
/// Role changes which are staged for the next version of the layout
pub roles: LwwMap<Uuid, NodeRoleV>,
}
/// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LayoutHistory {
@ -219,10 +234,8 @@ mod v010 {
/// Update trackers
pub update_trackers: UpdateTrackers,
/// Parameters to be used in the next partition assignment computation.
pub staging_parameters: Lww<LayoutParameters>,
/// Role changes which are staged for the next version of the layout
pub staging_roles: LwwMap<Uuid, NodeRoleV>,
/// Staged changes for the next version
pub staging: Lww<LayoutStaging>,
/// Hash of the serialized staging_parameters + staging_roles
pub staging_hash: Hash,
}
@ -265,6 +278,10 @@ mod v010 {
.map(|x| (*x, version.version))
.collect::<HashMap<Uuid, u64>>(),
);
let staging = LayoutStaging {
parameters: previous.staging_parameters,
roles: previous.staging_roles,
};
let mut ret = Self {
versions: vec![version],
update_trackers: UpdateTrackers {
@ -272,8 +289,7 @@ mod v010 {
sync_map: update_tracker.clone(),
sync_ack_map: update_tracker.clone(),
},
staging_parameters: previous.staging_parameters,
staging_roles: previous.staging_roles,
staging: Lww::raw(previous.version, staging),
staging_hash: [0u8; 32].into(),
};
ret.staging_hash = ret.calculate_staging_hash();
@ -283,3 +299,81 @@ mod v010 {
}
pub use v010::*;
// ---- utility functions ----
impl AutoCrdt for LayoutParameters {
const WARN_IF_DIFFERENT: bool = true;
}
impl AutoCrdt for NodeRoleV {
const WARN_IF_DIFFERENT: bool = true;
}
impl Crdt for LayoutStaging {
fn merge(&mut self, other: &LayoutStaging) {
self.parameters.merge(&other.parameters);
self.roles.merge(&other.roles);
}
}
impl NodeRole {
pub fn capacity_string(&self) -> String {
match self.capacity {
Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(),
}
}
pub fn tags_string(&self) -> String {
self.tags.join(",")
}
}
impl fmt::Display for ZoneRedundancy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ZoneRedundancy::Maximum => write!(f, "maximum"),
ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
}
}
}
impl core::str::FromStr for ZoneRedundancy {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
x => {
let v = x
.parse::<usize>()
.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
Ok(ZoneRedundancy::AtLeast(v))
}
}
}
}
impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) {
for (k, v) in other.0.iter() {
if let Some(v_mut) = self.0.get_mut(k) {
*v_mut = std::cmp::max(*v_mut, *v);
} else {
self.0.insert(*k, *v);
}
}
}
pub(crate) fn min(&self) -> u64 {
self.0.iter().map(|(_, v)| *v).min().unwrap_or(0)
}
}
impl UpdateTrackers {
pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
self.ack_map.merge(&other.ack_map);
self.sync_map.merge(&other.sync_map);
self.sync_ack_map.merge(&other.sync_ack_map);
}
}

View file

@ -1,21 +0,0 @@
use super::*;
impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) {
for (k, v) in other.0.iter() {
if let Some(v_mut) = self.0.get_mut(k) {
*v_mut = std::cmp::max(*v_mut, *v);
} else {
self.0.insert(*k, *v);
}
}
}
}
impl UpdateTrackers {
pub(crate) fn merge(&mut self, other: &UpdateTrackers) {
self.ack_map.merge(&other.ack_map);
self.sync_map.merge(&other.sync_map);
self.sync_ack_map.merge(&other.sync_ack_map);
}
}

View file

@ -1,69 +1,21 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::fmt;
use std::convert::TryInto;
use bytesize::ByteSize;
use itertools::Itertools;
use garage_util::crdt::{AutoCrdt, LwwMap};
use garage_util::crdt::LwwMap;
use garage_util::data::*;
use garage_util::error::*;
use crate::graph_algo::*;
use std::convert::TryInto;
use super::graph_algo::*;
use super::schema::*;
use super::*;
// The Message type will be used to collect information on the algorithm.
pub type Message = Vec<String>;
impl AutoCrdt for LayoutParameters {
const WARN_IF_DIFFERENT: bool = true;
}
impl AutoCrdt for NodeRoleV {
const WARN_IF_DIFFERENT: bool = true;
}
impl NodeRole {
pub fn capacity_string(&self) -> String {
match self.capacity {
Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(),
}
}
pub fn tags_string(&self) -> String {
self.tags.join(",")
}
}
impl fmt::Display for ZoneRedundancy {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
ZoneRedundancy::Maximum => write!(f, "maximum"),
ZoneRedundancy::AtLeast(x) => write!(f, "{}", x),
}
}
}
impl core::str::FromStr for ZoneRedundancy {
type Err = &'static str;
fn from_str(s: &str) -> Result<Self, Self::Err> {
match s {
"none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum),
x => {
let v = x
.parse::<usize>()
.map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?;
Ok(ZoneRedundancy::AtLeast(v))
}
}
}
}
impl LayoutVersion {
pub fn new(replication_factor: usize) -> Self {
// We set the default zone redundancy to be Maximum, meaning that the maximum

View file

@ -11,7 +11,6 @@ mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;
pub mod graph_algo;
pub mod layout;
pub mod replication_mode;
pub mod system;