diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 4779f9243..d9bd600e1 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -279,7 +279,7 @@ impl ApiHandler for AdminApiServer { Endpoint::GetClusterLayout => handle_get_cluster_layout(&self.garage).await, Endpoint::UpdateClusterLayout => handle_update_cluster_layout(&self.garage, req).await, Endpoint::ApplyClusterLayout => handle_apply_cluster_layout(&self.garage, req).await, - Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage, req).await, + Endpoint::RevertClusterLayout => handle_revert_cluster_layout(&self.garage).await, // Keys Endpoint::ListKeys => handle_list_keys(&self.garage).await, Endpoint::GetKeyInfo { diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index 6dd2e8da1..fe8e87646 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -105,7 +105,9 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp .collect::>(); let staged_role_changes = layout - .staging_roles + .staging + .get() + .roles .items() .iter() .filter(|(k, _, v)| layout.current().roles.get(k) != Some(v)) @@ -211,7 +213,7 @@ pub async fn handle_update_cluster_layout( let mut layout = garage.system.cluster_layout().as_ref().clone(); let mut roles = layout.current().roles.clone(); - roles.merge(&layout.staging_roles); + roles.merge(&layout.staging.get().roles); for change in updates { let node = hex::decode(&change.id).ok_or_bad_request("Invalid node identifier")?; @@ -232,7 +234,9 @@ pub async fn handle_update_cluster_layout( }; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } @@ -246,7 +250,7 @@ pub async fn handle_apply_cluster_layout( garage: &Arc, req: Request, ) -> Result, Error> { - let param = parse_json_body::(req).await?; + let param = parse_json_body::(req).await?; let layout = garage.system.cluster_layout().as_ref().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; @@ -260,14 +264,9 @@ pub async fn handle_apply_cluster_layout( Ok(json_ok_response(&res)?) } -pub async fn handle_revert_cluster_layout( - garage: &Arc, - req: Request, -) -> Result, Error> { - let param = parse_json_body::(req).await?; - +pub async fn handle_revert_cluster_layout(garage: &Arc) -> Result, Error> { let layout = garage.system.cluster_layout().as_ref().clone(); - let layout = layout.revert_staged_changes(Some(param.version))?; + let layout = layout.revert_staged_changes()?; garage.system.update_cluster_layout(&layout).await?; let res = format_cluster_layout(&layout); @@ -280,7 +279,7 @@ type UpdateClusterLayoutRequest = Vec; #[derive(Deserialize)] #[serde(rename_all = "camelCase")] -struct ApplyRevertLayoutRequest { +struct ApplyLayoutRequest { version: u64, } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 8be43873b..1a0540254 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -85,7 +85,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint, rpc_host: NodeID) -> )); } _ => { - let new_role = match layout.staging_roles.get(&adv.id) { + let new_role = match layout.staging.get().roles.get(&adv.id) { Some(NodeRoleV(Some(_))) => "(pending)", _ => "NO ROLE ASSIGNED", }; diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 4a6173370..269d92f4b 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -65,7 +65,7 @@ pub async fn cmd_assign_role( .collect::, _>>()?; let mut roles = layout.current().roles.clone(); - roles.merge(&layout.staging_roles); + roles.merge(&layout.staging.get().roles); for replaced in args.replace.iter() { let replaced_node = @@ -73,7 +73,9 @@ pub async fn cmd_assign_role( match roles.get(&replaced_node) { Some(NodeRoleV(Some(_))) => { layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(replaced_node, NodeRoleV(None))); } _ => { @@ -131,7 +133,9 @@ pub async fn cmd_assign_role( }; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry)))); } @@ -151,13 +155,15 @@ pub async fn cmd_remove_role( let mut layout = fetch_layout(rpc_cli, rpc_host).await?; let mut roles = layout.current().roles.clone(); - roles.merge(&layout.staging_roles); + roles.merge(&layout.staging.get().roles); let deleted_node = find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?; layout - .staging_roles + .staging + .get_mut() + .roles .merge(&roles.update_mutator(deleted_node, NodeRoleV(None))); send_layout(rpc_cli, rpc_host, layout).await?; @@ -203,16 +209,12 @@ pub async fn cmd_show_layout( println!(); println!(" garage layout apply --version {}", v + 1); println!(); - println!( - "You can also revert all proposed changes with: garage layout revert --version {}", - v + 1) + println!("You can also revert all proposed changes with: garage layout revert"); } Err(e) => { println!("Error while trying to compute the assignment: {}", e); println!("This new layout cannot yet be applied."); - println!( - "You can also revert all proposed changes with: garage layout revert --version {}", - v + 1) + println!("You can also revert all proposed changes with: garage layout revert"); } } } @@ -245,9 +247,15 @@ pub async fn cmd_revert_layout( rpc_host: NodeID, revert_opt: RevertLayoutOpt, ) -> Result<(), Error> { + if !revert_opt.yes { + return Err(Error::Message( + "Please add the --yes flag to run the layout revert operation".into(), + )); + } + let layout = fetch_layout(rpc_cli, rpc_host).await?; - let layout = layout.revert_staged_changes(revert_opt.version)?; + let layout = layout.revert_staged_changes()?; send_layout(rpc_cli, rpc_host, layout).await?; @@ -284,7 +292,9 @@ pub async fn cmd_config_layout( } layout - .staging_parameters + .staging + .get_mut() + .parameters .update(LayoutParameters { zone_redundancy: r }); println!("The zone redundancy parameter has been set to '{}'.", r); did_something = true; @@ -371,19 +381,20 @@ pub fn print_cluster_layout(layout: &LayoutVersion, empty_msg: &str) { } pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool { - let has_role_changes = layout - .staging_roles + let staging = layout.staging.get(); + let has_role_changes = staging + .roles .items() .iter() .any(|(k, _, v)| layout.current().roles.get(k) != Some(v)); - let has_layout_changes = *layout.staging_parameters.get() != layout.current().parameters; + let has_layout_changes = *staging.parameters.get() != layout.current().parameters; if has_role_changes || has_layout_changes { println!(); println!("==== STAGED ROLE CHANGES ===="); if has_role_changes { let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()]; - for (id, _, role) in layout.staging_roles.items().iter() { + for (id, _, role) in staging.roles.items().iter() { if layout.current().roles.get(id) == Some(role) { continue; } @@ -406,7 +417,7 @@ pub fn print_staging_role_changes(layout: &LayoutHistory) -> bool { if has_layout_changes { println!( "Zone redundancy: {}", - layout.staging_parameters.get().zone_redundancy + staging.parameters.get().zone_redundancy ); } true diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index aba575515..3badc4473 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -164,9 +164,9 @@ pub struct ApplyLayoutOpt { #[derive(StructOpt, Debug)] pub struct RevertLayoutOpt { - /// Version number of old configuration to which to revert - #[structopt(long = "version")] - pub(crate) version: Option, + /// The revert operation will not be ran unless this flag is added + #[structopt(long = "yes")] + pub(crate) yes: bool, } #[derive(Serialize, Deserialize, StructOpt, Debug)] diff --git a/src/rpc/graph_algo.rs b/src/rpc/layout/graph_algo.rs similarity index 97% rename from src/rpc/graph_algo.rs rename to src/rpc/layout/graph_algo.rs index d8c6c9b98..bd33e97fd 100644 --- a/src/rpc/graph_algo.rs +++ b/src/rpc/layout/graph_algo.rs @@ -114,16 +114,6 @@ impl Graph { Ok(result) } - /// This function returns the value of the flow incoming to v. - pub fn get_inflow(&self, v: Vertex) -> Result { - let idv = self.get_vertex_id(&v)?; - let mut result = 0; - for edge in self.graph[idv].iter() { - result += max(0, self.graph[edge.dest][edge.rev].flow); - } - Ok(result) - } - /// This function returns the value of the flow outgoing from v. pub fn get_outflow(&self, v: Vertex) -> Result { let idv = self.get_vertex_id(&v)?; diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index e59c9e9cf..9ae288870 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -1,5 +1,3 @@ -use std::cmp::Ordering; - use garage_util::crdt::{Crdt, Lww, LwwMap}; use garage_util::data::*; use garage_util::encode::nonversioned_encode; @@ -12,14 +10,15 @@ impl LayoutHistory { pub fn new(replication_factor: usize) -> Self { let version = LayoutVersion::new(replication_factor); - let staging_parameters = Lww::::new(version.parameters); - let empty_lwwmap = LwwMap::new(); + let staging = LayoutStaging { + parameters: Lww::::new(version.parameters), + roles: LwwMap::new(), + }; let mut ret = LayoutHistory { versions: vec![version].into_boxed_slice().into(), update_trackers: Default::default(), - staging_parameters, - staging_roles: empty_lwwmap, + staging: Lww::raw(0, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -31,8 +30,7 @@ impl LayoutHistory { } pub(crate) fn calculate_staging_hash(&self) -> Hash { - let hashed_tuple = (&self.staging_roles, &self.staging_parameters); - blake2sum(&nonversioned_encode(&hashed_tuple).unwrap()[..]) + blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]) } // ================== updates to layout, public interface =================== @@ -41,26 +39,10 @@ impl LayoutHistory { let mut changed = false; // Merge staged layout changes - match other.current().version.cmp(&self.current().version) { - Ordering::Greater => { - self.staging_parameters = other.staging_parameters.clone(); - self.staging_roles = other.staging_roles.clone(); - self.staging_hash = other.staging_hash; - changed = true; - } - Ordering::Equal => { - self.staging_parameters.merge(&other.staging_parameters); - self.staging_roles.merge(&other.staging_roles); - - let new_staging_hash = self.calculate_staging_hash(); - if new_staging_hash != self.staging_hash { - changed = true; - } - - self.staging_hash = new_staging_hash; - } - Ordering::Less => (), + if self.staging != other.staging { + changed = true; } + self.staging.merge(&other.staging); // Add any new versions to history for v2 in other.versions.iter() { @@ -102,50 +84,34 @@ To know the correct value of the new layout version, invoke `garage layout show` } } + // Compute new version and add it to history let mut new_version = self.current().clone(); new_version.version += 1; - new_version.roles.merge(&self.staging_roles); + new_version.roles.merge(&self.staging.get().roles); new_version.roles.retain(|(_, _, v)| v.0.is_some()); - new_version.parameters = *self.staging_parameters.get(); - - self.staging_roles.clear(); - self.staging_hash = self.calculate_staging_hash(); + new_version.parameters = *self.staging.get().parameters.get(); let msg = new_version.calculate_partition_assignment()?; - self.versions.push(new_version); + // Reset the staged layout changes + self.staging.update(LayoutStaging { + parameters: self.staging.get().parameters.clone(), + roles: LwwMap::new(), + }); + self.staging_hash = self.calculate_staging_hash(); + Ok((self, msg)) } - pub fn revert_staged_changes(mut self, version: Option) -> Result { - match version { - None => { - let error = r#" -Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout. -To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes. - "#; - return Err(Error::Message(error.into())); - } - Some(v) => { - if v != self.current().version + 1 { - return Err(Error::Message("Invalid new layout version".into())); - } - } - } - - self.staging_roles.clear(); - self.staging_parameters.update(self.current().parameters); + pub fn revert_staged_changes(mut self) -> Result { + self.staging.update(LayoutStaging { + parameters: Lww::new(self.current().parameters.clone()), + roles: LwwMap::new(), + }); self.staging_hash = self.calculate_staging_hash(); - // TODO this is stupid, we should have a separate version counter/LWW - // for the staging params - let mut new_version = self.current().clone(); - new_version.version += 1; - - self.versions.push(new_version); - Ok(self) } diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 122d4b655..7c15988a8 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -1,8 +1,10 @@ +mod graph_algo; mod history; mod schema; -mod tracker; mod version; +// ---- re-exports ---- + pub use history::*; pub use schema::*; pub use version::*; diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 14e797be3..c5b9b1d34 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -1,3 +1,9 @@ +use std::fmt; + +use bytesize::ByteSize; + +use garage_util::crdt::{AutoCrdt, Crdt}; + mod v08 { use crate::layout::CompactNodeType; use garage_util::crdt::LwwMap; @@ -210,6 +216,15 @@ mod v010 { pub ring_assignment_data: Vec, } + /// The staged changes for the next layout version + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] + pub struct LayoutStaging { + /// Parameters to be used in the next partition assignment computation. + pub parameters: Lww, + /// Role changes which are staged for the next version of the layout + pub roles: LwwMap, + } + /// The history of cluster layouts #[derive(Clone, Debug, Serialize, Deserialize)] pub struct LayoutHistory { @@ -219,10 +234,8 @@ mod v010 { /// Update trackers pub update_trackers: UpdateTrackers, - /// Parameters to be used in the next partition assignment computation. - pub staging_parameters: Lww, - /// Role changes which are staged for the next version of the layout - pub staging_roles: LwwMap, + /// Staged changes for the next version + pub staging: Lww, /// Hash of the serialized staging_parameters + staging_roles pub staging_hash: Hash, } @@ -265,6 +278,10 @@ mod v010 { .map(|x| (*x, version.version)) .collect::>(), ); + let staging = LayoutStaging { + parameters: previous.staging_parameters, + roles: previous.staging_roles, + }; let mut ret = Self { versions: vec![version], update_trackers: UpdateTrackers { @@ -272,8 +289,7 @@ mod v010 { sync_map: update_tracker.clone(), sync_ack_map: update_tracker.clone(), }, - staging_parameters: previous.staging_parameters, - staging_roles: previous.staging_roles, + staging: Lww::raw(previous.version, staging), staging_hash: [0u8; 32].into(), }; ret.staging_hash = ret.calculate_staging_hash(); @@ -283,3 +299,81 @@ mod v010 { } pub use v010::*; + +// ---- utility functions ---- + +impl AutoCrdt for LayoutParameters { + const WARN_IF_DIFFERENT: bool = true; +} + +impl AutoCrdt for NodeRoleV { + const WARN_IF_DIFFERENT: bool = true; +} + +impl Crdt for LayoutStaging { + fn merge(&mut self, other: &LayoutStaging) { + self.parameters.merge(&other.parameters); + self.roles.merge(&other.roles); + } +} + +impl NodeRole { + pub fn capacity_string(&self) -> String { + match self.capacity { + Some(c) => ByteSize::b(c).to_string_as(false), + None => "gateway".to_string(), + } + } + + pub fn tags_string(&self) -> String { + self.tags.join(",") + } +} + +impl fmt::Display for ZoneRedundancy { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + match self { + ZoneRedundancy::Maximum => write!(f, "maximum"), + ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), + } + } +} + +impl core::str::FromStr for ZoneRedundancy { + type Err = &'static str; + fn from_str(s: &str) -> Result { + match s { + "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), + x => { + let v = x + .parse::() + .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; + Ok(ZoneRedundancy::AtLeast(v)) + } + } + } +} + +impl UpdateTracker { + fn merge(&mut self, other: &UpdateTracker) { + for (k, v) in other.0.iter() { + if let Some(v_mut) = self.0.get_mut(k) { + *v_mut = std::cmp::max(*v_mut, *v); + } else { + self.0.insert(*k, *v); + } + } + } + + pub(crate) fn min(&self) -> u64 { + self.0.iter().map(|(_, v)| *v).min().unwrap_or(0) + } +} + +impl UpdateTrackers { + pub(crate) fn merge(&mut self, other: &UpdateTrackers) { + self.ack_map.merge(&other.ack_map); + self.sync_map.merge(&other.sync_map); + self.sync_ack_map.merge(&other.sync_ack_map); + } +} diff --git a/src/rpc/layout/tracker.rs b/src/rpc/layout/tracker.rs deleted file mode 100644 index 778121e4b..000000000 --- a/src/rpc/layout/tracker.rs +++ /dev/null @@ -1,21 +0,0 @@ -use super::*; - -impl UpdateTracker { - fn merge(&mut self, other: &UpdateTracker) { - for (k, v) in other.0.iter() { - if let Some(v_mut) = self.0.get_mut(k) { - *v_mut = std::cmp::max(*v_mut, *v); - } else { - self.0.insert(*k, *v); - } - } - } -} - -impl UpdateTrackers { - pub(crate) fn merge(&mut self, other: &UpdateTrackers) { - self.ack_map.merge(&other.ack_map); - self.sync_map.merge(&other.sync_map); - self.sync_ack_map.merge(&other.sync_ack_map); - } -} diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 363bc2049..6918fdf9c 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -1,69 +1,21 @@ use std::collections::HashMap; use std::collections::HashSet; -use std::fmt; +use std::convert::TryInto; use bytesize::ByteSize; use itertools::Itertools; -use garage_util::crdt::{AutoCrdt, LwwMap}; +use garage_util::crdt::LwwMap; use garage_util::data::*; use garage_util::error::*; -use crate::graph_algo::*; - -use std::convert::TryInto; - +use super::graph_algo::*; use super::schema::*; use super::*; // The Message type will be used to collect information on the algorithm. pub type Message = Vec; -impl AutoCrdt for LayoutParameters { - const WARN_IF_DIFFERENT: bool = true; -} - -impl AutoCrdt for NodeRoleV { - const WARN_IF_DIFFERENT: bool = true; -} - -impl NodeRole { - pub fn capacity_string(&self) -> String { - match self.capacity { - Some(c) => ByteSize::b(c).to_string_as(false), - None => "gateway".to_string(), - } - } - - pub fn tags_string(&self) -> String { - self.tags.join(",") - } -} - -impl fmt::Display for ZoneRedundancy { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - match self { - ZoneRedundancy::Maximum => write!(f, "maximum"), - ZoneRedundancy::AtLeast(x) => write!(f, "{}", x), - } - } -} - -impl core::str::FromStr for ZoneRedundancy { - type Err = &'static str; - fn from_str(s: &str) -> Result { - match s { - "none" | "max" | "maximum" => Ok(ZoneRedundancy::Maximum), - x => { - let v = x - .parse::() - .map_err(|_| "zone redundancy must be 'none'/'max' or an integer")?; - Ok(ZoneRedundancy::AtLeast(v)) - } - } - } -} - impl LayoutVersion { pub fn new(replication_factor: usize) -> Self { // We set the default zone redundancy to be Maximum, meaning that the maximum diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 1af8b78e1..b5b31c05b 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -11,7 +11,6 @@ mod consul; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; -pub mod graph_algo; pub mod layout; pub mod replication_mode; pub mod system;