diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index f54834514..593bd7785 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -210,7 +210,7 @@ pub async fn handle_update_cluster_layout( ) -> Result, Error> { let updates = parse_json_body::(req).await?; - let mut layout = garage.system.cluster_layout().as_ref().clone(); + let mut layout = garage.system.cluster_layout().clone(); let mut roles = layout.current().roles.clone(); roles.merge(&layout.staging.get().roles); @@ -256,7 +256,7 @@ pub async fn handle_apply_cluster_layout( ) -> Result, Error> { let param = parse_json_body::(req).await?; - let layout = garage.system.cluster_layout().as_ref().clone(); + let layout = garage.system.cluster_layout().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; garage @@ -273,7 +273,7 @@ pub async fn handle_apply_cluster_layout( } pub async fn handle_revert_cluster_layout(garage: &Arc) -> Result, Error> { - let layout = garage.system.cluster_layout().as_ref().clone(); + let layout = garage.system.cluster_layout().clone(); let layout = layout.revert_staged_changes()?; garage .system diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index a9bc38268..3c2f51a9a 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -5,7 +5,6 @@ use serde::Serialize; use garage_util::data::*; -use garage_rpc::layout::LayoutHistory; use garage_table::util::*; use garage_model::garage::Garage; @@ -26,7 +25,8 @@ pub async fn handle_read_index( ) -> Result, Error> { let reverse = reverse.unwrap_or(false); - let layout: Arc = garage.system.cluster_layout().clone(); + // TODO: not only current + let node_id_vec = garage.system.cluster_layout().current().node_ids().to_vec(); let (partition_keys, more, next_start) = read_range( &garage.k2v.counter_table.table, @@ -35,10 +35,7 @@ pub async fn handle_read_index( &start, &end, limit, - Some(( - DeletedFilter::NotDeleted, - layout.current().node_id_vec.clone(), - )), + Some((DeletedFilter::NotDeleted, node_id_vec)), EnumerationOrder::from_reverse(reverse), ) .await?; @@ -57,7 +54,7 @@ pub async fn handle_read_index( partition_keys: partition_keys .into_iter() .map(|part| { - let vals = part.filtered_values(&layout); + let vals = part.filtered_values(&garage.system.cluster_layout()); ReadIndexResponseEntry { pk: part.sk, entries: *vals.get(&s_entries).unwrap_or(&0), diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index bffc81d3f..269d92f4b 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use bytesize::ByteSize; use format_table::format_table; @@ -323,7 +321,7 @@ pub async fn fetch_layout( .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL) .await?? { - SystemRpc::AdvertiseClusterLayout(t) => Ok(Arc::try_unwrap(t).unwrap()), + SystemRpc::AdvertiseClusterLayout(t) => Ok(t), resp => Err(Error::Message(format!("Invalid RPC response: {:?}", resp))), } } @@ -336,7 +334,7 @@ pub async fn send_layout( rpc_cli .call( &rpc_host, - SystemRpc::AdvertiseClusterLayout(Arc::new(layout)), + SystemRpc::AdvertiseClusterLayout(layout), PRIO_NORMAL, ) .await??; diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs index 18904c8d6..2a9c0fb18 100644 --- a/src/model/helper/bucket.rs +++ b/src/model/helper/bucket.rs @@ -450,10 +450,8 @@ impl<'a> BucketHelper<'a> { #[cfg(feature = "k2v")] { - use garage_rpc::layout::LayoutHistory; - use std::sync::Arc; - - let layout: Arc = self.0.system.cluster_layout().clone(); + // TODO: not only current + let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec(); let k2vindexes = self .0 .k2v @@ -462,10 +460,7 @@ impl<'a> BucketHelper<'a> { .get_range( &bucket_id, None, - Some(( - DeletedFilter::NotDeleted, - layout.current().node_id_vec.clone(), - )), + Some((DeletedFilter::NotDeleted, node_id_vec)), 10, EnumerationOrder::Forward, ) diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs index 351e09594..c021039bc 100644 --- a/src/rpc/layout/manager.rs +++ b/src/rpc/layout/manager.rs @@ -1,10 +1,9 @@ -use std::sync::Arc; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::Duration; use serde::{Deserialize, Serialize}; -use tokio::sync::watch; -use tokio::sync::Mutex; +use tokio::sync::Notify; use netapp::endpoint::Endpoint; use netapp::peering::fullmesh::FullMeshPeeringStrategy; @@ -23,8 +22,8 @@ pub struct LayoutManager { replication_factor: usize, persist_cluster_layout: Persister, - pub layout_watch: watch::Receiver>, - update_layout: Mutex>>, + layout: Arc>, + pub(crate) change_notify: Arc, pub(crate) rpc_helper: RpcHelper, system_endpoint: Arc>, @@ -71,20 +70,21 @@ impl LayoutManager { } }; - let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); + let layout = Arc::new(RwLock::new(cluster_layout)); + let change_notify = Arc::new(Notify::new()); let rpc_helper = RpcHelper::new( node_id.into(), fullmesh, - layout_watch.clone(), + layout.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ); Ok(Arc::new(Self { replication_factor, persist_cluster_layout, - layout_watch, - update_layout: Mutex::new(update_layout), + layout, + change_notify, system_endpoint, rpc_helper, })) @@ -108,8 +108,8 @@ impl LayoutManager { Ok(()) } - pub fn layout(&self) -> watch::Ref> { - self.layout_watch.borrow() + pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { + self.layout.read().unwrap() } pub(crate) async fn pull_cluster_layout(self: &Arc, peer: Uuid) { @@ -131,7 +131,7 @@ impl LayoutManager { /// Save network configuration to disc async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout: Arc = self.layout_watch.borrow().clone(); + let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning self.persist_cluster_layout .save_async(&layout) .await @@ -139,6 +139,22 @@ impl LayoutManager { Ok(()) } + fn merge_layout(&self, adv: &LayoutHistory) -> Option { + let mut layout = self.layout.write().unwrap(); + let prev_layout_check = layout.check().is_ok(); + + if !prev_layout_check || adv.check().is_ok() { + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + panic!("Merged two correct layouts and got an incorrect layout."); + } + + return Some(layout.clone()); + } + } + None + } + // ---- RPC HANDLERS ---- pub(crate) fn handle_advertise_status(self: &Arc, from: Uuid, status: &LayoutStatus) { @@ -154,7 +170,7 @@ impl LayoutManager { } pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().clone(); + let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning SystemRpc::AdvertiseClusterLayout(layout) } @@ -172,42 +188,27 @@ impl LayoutManager { return Err(Error::Message(msg)); } - if *adv != **self.layout_watch.borrow() { - let update_layout = self.update_layout.lock().await; - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + if let Some(new_layout) = self.merge_layout(adv) { + self.change_notify.notify_waiters(); - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - let layout = Arc::new(layout); - update_layout.send(layout.clone())?; - drop(update_layout); // release mutex - - tokio::spawn({ - let this = self.clone(); - async move { - if let Err(e) = this - .rpc_helper - .broadcast( - &this.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } + tokio::spawn({ + let this = self.clone(); + async move { + if let Err(e) = this + .rpc_helper + .broadcast( + &this.system_endpoint, + SystemRpc::AdvertiseClusterLayout(new_layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); } - }); + } + }); - self.save_cluster_layout().await?; - } + self.save_cluster_layout().await?; } Ok(SystemRpc::Ok) diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 3fdb4acd6..ce291068a 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -1,12 +1,11 @@ //! Contain structs related to making RPCs -use std::sync::Arc; +use std::sync::{Arc, RwLock}; use std::time::Duration; use futures::future::join_all; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use tokio::select; -use tokio::sync::watch; use opentelemetry::KeyValue; use opentelemetry::{ @@ -91,7 +90,7 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, fullmesh: Arc, - layout_watch: watch::Receiver>, + layout: Arc>, metrics: RpcMetrics, rpc_timeout: Duration, } @@ -100,7 +99,7 @@ impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, fullmesh: Arc, - layout_watch: watch::Receiver>, + layout: Arc>, rpc_timeout: Option, ) -> Self { let metrics = RpcMetrics::new(); @@ -108,7 +107,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, fullmesh, - layout_watch, + layout, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), })) @@ -392,7 +391,7 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); - let layout: Arc = self.0.layout_watch.borrow().clone(); + let layout = self.0.layout.read().unwrap(); let our_zone = match layout.current().node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, None => "", diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 88c4d4433..cb3af3fe7 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -4,7 +4,7 @@ use std::io::{Read, Write}; use std::net::{IpAddr, SocketAddr}; use std::path::{Path, PathBuf}; use std::sync::atomic::Ordering; -use std::sync::{Arc, RwLock}; +use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; @@ -13,7 +13,7 @@ use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; use tokio::select; -use tokio::sync::watch; +use tokio::sync::{watch, Notify}; use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::message::*; @@ -68,7 +68,7 @@ pub enum SystemRpc { /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout PullClusterLayout, /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout - AdvertiseClusterLayout(Arc), + AdvertiseClusterLayout(LayoutHistory), } impl Rpc for SystemRpc { @@ -345,12 +345,12 @@ impl System { // ---- Public utilities / accessors ---- - pub fn cluster_layout(&self) -> watch::Ref> { + pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHistory> { self.layout_manager.layout() } - pub fn layout_watch(&self) -> watch::Receiver> { - self.layout_manager.layout_watch.clone() + pub fn layout_notify(&self) -> Arc { + self.layout_manager.change_notify.clone() } pub fn rpc_helper(&self) -> &RpcHelper { @@ -412,7 +412,6 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let layout: Arc<_> = self.cluster_layout().clone(); let quorum = self.replication_mode.write_quorum(); let replication_factor = self.replication_factor; @@ -423,6 +422,8 @@ impl System { .collect::>(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); + let layout = self.cluster_layout(); // acquires a rwlock + // TODO: not only layout.current() let storage_nodes = layout .current() diff --git a/src/table/sync.rs b/src/table/sync.rs index 2da1bfe74..4355bd9e7 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -10,7 +10,7 @@ use rand::Rng; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; use tokio::select; -use tokio::sync::{mpsc, watch}; +use tokio::sync::{mpsc, watch, Notify}; use garage_util::background::*; use garage_util::data::*; @@ -91,8 +91,8 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - layout_watch: self.system.layout_watch(), - layout: self.system.cluster_layout().clone(), + layout_notify: self.system.layout_notify(), + layout_version: self.system.cluster_layout().current().version, add_full_sync_rx, todo: vec![], next_full_sync: Instant::now() + Duration::from_secs(20), @@ -492,8 +492,8 @@ impl EndpointHandler for TableSync struct SyncWorker { syncer: Arc>, - layout_watch: watch::Receiver>, - layout: Arc, + layout_notify: Arc, + layout_version: u64, add_full_sync_rx: mpsc::UnboundedReceiver<()>, todo: Vec, next_full_sync: Instant, @@ -593,12 +593,11 @@ impl Worker for SyncWorker { self.add_full_sync(); } }, - _ = self.layout_watch.changed() => { - let new_layout = self.layout_watch.borrow(); - if !Arc::ptr_eq(&new_layout, &self.layout) { - self.layout = new_layout.clone(); - drop(new_layout); - debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME); + _ = self.layout_notify.notified() => { + let new_version = self.syncer.system.cluster_layout().current().version; + if new_version > self.layout_version { + self.layout_version = new_version; + debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME); self.add_full_sync(); } },