diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index 269d92f4b..bffc81d3f 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use bytesize::ByteSize; use format_table::format_table; @@ -321,7 +323,7 @@ pub async fn fetch_layout( .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? { - SystemRpc::AdvertiseClusterLayout(t) => Ok(t), + SystemRpc::AdvertiseClusterLayout(t) => Ok(Arc::try_unwrap(t).unwrap()), resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), } } @@ -334,7 +336,7 @@ pub async fn send_layout( rpc_cli .call( &rpc_host, - SystemRpc::AdvertiseClusterLayout(layout), + SystemRpc::AdvertiseClusterLayout(Arc::new(layout)), PRIO_NORMAL, ) .await??; diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index a8a77139c..351e09594 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,6 +1,8 @@ use std::sync::Arc; use std::time::Duration; +use serde::{Deserialize, Serialize}; + use tokio::sync::watch; use tokio::sync::Mutex; @@ -28,6 +30,16 @@ pub struct LayoutManager { system_endpoint: Arc>, } +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LayoutStatus { + /// Cluster layout version + pub cluster_layout_version: u64, + /// Hash of cluster layout update trackers + // (TODO) pub cluster_layout_trackers_hash: Hash, + /// Hash of cluster layout staging data + pub cluster_layout_staging_hash: Hash, +} + impl LayoutManager { pub fn new( config: &Config, @@ -35,7 +47,7 @@ impl LayoutManager { system_endpoint: Arc>, fullmesh: Arc, replication_factor: usize, - ) -> Result { + ) -> Result, Error> { let persist_cluster_layout: Persister = Persister::new(&config.metadata_dir, "cluster_layout"); @@ -68,28 +80,39 @@ impl LayoutManager { config.rpc_timeout_msec.map(Duration::from_millis), ); - Ok(Self { + Ok(Arc::new(Self { replication_factor, persist_cluster_layout, layout_watch, update_layout: Mutex::new(update_layout), system_endpoint, rpc_helper, - }) + })) } // ---- PUBLIC INTERFACE ---- - pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> { + pub fn status(&self) -> LayoutStatus { + let layout = self.layout(); + LayoutStatus { + cluster_layout_version: layout.current().version, + cluster_layout_staging_hash: layout.staging_hash, + } + } + + pub async fn update_cluster_layout( + self: &Arc, + layout: &LayoutHistory, + ) -> Result<(), Error> { self.handle_advertise_cluster_layout(layout).await?; Ok(()) } - pub fn history(&self) -> watch::Ref> { + pub fn layout(&self) -> watch::Ref> { self.layout_watch.borrow() } - pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) { + pub(crate) async fn pull_cluster_layout(self: &Arc, peer: Uuid) { let resp = self .rpc_helper .call( @@ -118,13 +141,25 @@ impl LayoutManager { // ---- RPC HANDLERS ---- + pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, status: &LayoutStatus) { + let local_status = self.status(); + if status.cluster_layout_version > local_status.cluster_layout_version + || status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash + { + tokio::spawn({ + let this = self.clone(); + async move { this.pull_cluster_layout(from).await } + }); + } + } + pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().as_ref().clone(); + let layout = self.layout_watch.borrow().clone(); SystemRpc::AdvertiseClusterLayout(layout) } pub(crate) async fn handle_advertise_cluster_layout( - &self, + self: &Arc, adv: &LayoutHistory, ) -> Result { if adv.current().replication_factor != self.replication_factor { @@ -137,39 +172,42 @@ impl LayoutManager { return Err(Error::Message(msg)); } - let update_layout = self.update_layout.lock().await; - // TODO: don't clone each time an AdvertiseClusterLayout is received - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + if *adv != **self.layout_watch.borrow() { + let update_layout = self.update_layout.lock().await; + let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - update_layout.send(Arc::new(layout.clone()))?; - drop(update_layout); - - /* TODO - tokio::spawn(async move { - if let Err(e) = system - .rpc_helper() - .broadcast( - &system.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); + let prev_layout_check = layout.check().is_ok(); + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + error!("New cluster layout is invalid, discarding."); + return Err(Error::Message( + "New cluster layout is invalid, discarding.".into(), + )); } - }); - */ - self.save_cluster_layout().await?; + let layout = Arc::new(layout); + update_layout.send(layout.clone())?; + drop(update_layout); // release mutex + + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + SystemRpc::AdvertiseClusterLayout(layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } + } + }); + + self.save_cluster_layout().await?; + } } Ok(SystemRpc::Ok) diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index c5b9b1d34..d587a6cb7 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -226,7 +226,7 @@ mod v010 { } /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize)] + #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutHistory { /// The versions currently in use in the cluster pub versions: Vec, @@ -241,7 +241,7 @@ mod v010 { } /// The tracker of acknowlegments and data syncs around the cluster - #[derive(Clone, Debug, Serialize, Deserialize, Default)] + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTrackers { /// The highest layout version number each node has ack'ed pub ack_map: UpdateTracker, @@ -253,7 +253,7 @@ mod v010 { } /// The history of cluster layouts - #[derive(Clone, Debug, Serialize, Deserialize, Default)] + #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] pub struct UpdateTracker(pub HashMap); impl garage_util::migrate::Migrate for LayoutHistory { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index a8e884254..88c4d4433 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -33,7 +33,7 @@ use garage_util::time::*; use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; -use crate::layout::manager::LayoutManager; +use crate::layout::manager::{LayoutManager, LayoutStatus}; use crate::layout::*; use crate::replication_mode::*; use crate::rpc_helper::*; @@ -68,7 +68,7 @@ pub enum SystemRpc { /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout PullClusterLayout, /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout - AdvertiseClusterLayout(LayoutHistory), + AdvertiseClusterLayout(Arc), } impl Rpc for SystemRpc { @@ -104,7 +104,7 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, - pub layout_manager: LayoutManager, + pub layout_manager: Arc, metrics: SystemMetrics, @@ -125,12 +125,8 @@ pub struct NodeStatus { /// Replication factor configured on the node pub replication_factor: usize, - /// Cluster layout version - pub cluster_layout_version: u64, - /// Hash of cluster layout update trackers - // (TODO) pub cluster_layout_trackers_hash: Hash, - /// Hash of cluster layout staging data - pub cluster_layout_staging_hash: Hash, + /// Layout status + pub layout_status: LayoutStatus, /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) #[serde(default)] @@ -284,7 +280,7 @@ impl System { // ---- set up metrics and status exchange ---- let metrics = SystemMetrics::new(replication_factor); - let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history()); + let mut local_status = NodeStatus::initial(replication_factor, &layout_manager); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); // ---- if enabled, set up additionnal peer discovery methods ---- @@ -350,7 +346,7 @@ impl System { // ---- Public utilities / accessors ---- pub fn cluster_layout(&self) -> watch::Ref> { - self.layout_manager.history() + self.layout_manager.layout() } pub fn layout_watch(&self) -> watch::Receiver> { @@ -536,9 +532,7 @@ impl System { fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let layout = self.cluster_layout(); - new_si.cluster_layout_version = layout.current().version; - new_si.cluster_layout_staging_hash = layout.staging_hash; + new_si.layout_status = self.layout_manager.status(); new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); @@ -571,14 +565,8 @@ impl System { std::process::exit(1); } - if info.cluster_layout_version > local_info.cluster_layout_version - || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash - { - tokio::spawn({ - let system = self.clone(); - async move { system.layout_manager.pull_cluster_layout(from).await } - }); - } + self.layout_manager + .handle_advertise_status(from, &info.layout_status); self.node_status .write() @@ -746,14 +734,13 @@ impl EndpointHandler for System { } impl NodeStatus { - fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self { + fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self { NodeStatus { hostname: gethostname::gethostname() .into_string() .unwrap_or_else(|_| "".to_string()), replication_factor, - cluster_layout_version: layout.current().version, - cluster_layout_staging_hash: layout.staging_hash, + layout_status: layout_manager.status(), meta_disk_avail: None, data_disk_avail: None, } @@ -763,8 +750,7 @@ impl NodeStatus { NodeStatus { hostname: "?".to_string(), replication_factor: 0, - cluster_layout_version: 0, - cluster_layout_staging_hash: Hash::from([0u8; 32]), + layout_status: Default::default(), meta_disk_avail: None, data_disk_avail: None, }