From 8a2b1dd422fb57abe611d8c1cf3cb0b55f487189 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 9 Nov 2023 12:55:36 +0100 Subject: [PATCH] wip: split out layout management from System into separate LayoutManager --- src/api/admin/cluster.rs | 18 ++- src/block/manager.rs | 10 +- src/block/resync.rs | 4 +- src/model/k2v/rpc.rs | 20 ++- src/rpc/layout/manager.rs | 177 +++++++++++++++++++++++ src/rpc/layout/mod.rs | 2 + src/rpc/system.rs | 297 ++++++++++++++------------------------ src/table/gc.rs | 4 +- src/table/sync.rs | 10 +- src/table/table.rs | 10 +- 10 files changed, 332 insertions(+), 220 deletions(-) create mode 100644 src/rpc/layout/manager.rs diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index fe8e8764..f5483451 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -240,7 +240,11 @@ pub async fn handle_update_cluster_layout( .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); } - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = format_cluster_layout(&layout); Ok(json_ok_response(&res)?) @@ -255,7 +259,11 @@ pub async fn handle_apply_cluster_layout( let layout = garage.system.cluster_layout().as_ref().clone(); let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = ApplyClusterLayoutResponse { message: msg, @@ -267,7 +275,11 @@ pub async fn handle_apply_cluster_layout( pub async fn handle_revert_cluster_layout(garage: &Arc) -> Result, Error> { let layout = garage.system.cluster_layout().as_ref().clone(); let layout = layout.revert_staged_changes()?; - garage.system.update_cluster_layout(&layout).await?; + garage + .system + .layout_manager + .update_cluster_layout(&layout) + .await?; let res = format_cluster_layout(&layout); Ok(json_ok_response(&res)?) diff --git a/src/block/manager.rs b/src/block/manager.rs index 2d1b5c67..72b4ea66 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -265,7 +265,7 @@ impl BlockManager { Fut: futures::Future>, { let who = self.replication.read_nodes(hash); - let who = self.system.rpc.request_order(&who); + let who = self.system.rpc_helper().request_order(&who); for node in who.iter() { let node_id = NodeID::from(*node); @@ -305,7 +305,7 @@ impl BlockManager { // if the first one doesn't succeed rapidly // TODO: keep first request running when initiating a new one and take the // one that finishes earlier - _ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { + _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => { debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); } }; @@ -363,7 +363,7 @@ impl BlockManager { Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -439,7 +439,7 @@ impl BlockManager { tokio::spawn(async move { if let Err(e) = this .resync - .put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) + .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout()) { error!("Block {:?} could not be put in resync queue: {}.", hash, e); } @@ -533,7 +533,7 @@ impl BlockManager { None => { // Not found but maybe we should have had it ?? self.resync - .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; + .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?; return Err(Error::Message(format!( "block {:?} not found on node", hash diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c1da4a7..fedcd6f5 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -385,7 +385,7 @@ impl BlockResyncManager { let who_needs_resps = manager .system - .rpc + .rpc_helper() .call_many( &manager.endpoint, &who, @@ -431,7 +431,7 @@ impl BlockResyncManager { .with_stream_from_buffer(bytes); manager .system - .rpc + .rpc_helper() .try_call_many( &manager.endpoint, &need_nodes[..], diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 37e142f6..2f548ad7 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -131,7 +131,7 @@ impl K2VRpcHandler { who.sort(); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -187,7 +187,7 @@ impl K2VRpcHandler { let call_futures = call_list.into_iter().map(|(nodes, items)| async move { let resp = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], @@ -229,7 +229,7 @@ impl K2VRpcHandler { .replication .write_nodes(&poll_key.partition.hash()); - let rpc = self.system.rpc.try_call_many( + let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, &nodes[..], K2VRpc::PollItem { @@ -241,7 +241,8 @@ impl K2VRpcHandler { .with_quorum(self.item_table.data.replication.read_quorum()) .without_timeout(), ); - let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let timeout_duration = + Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout(); let resps = select! { r = rpc => r?, _ = tokio::time::sleep(timeout_duration) => return Ok(None), @@ -300,7 +301,11 @@ impl K2VRpcHandler { let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let mut requests = nodes .iter() - .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .map(|node| { + self.system + .rpc_helper() + .call(&self.endpoint, *node, msg.clone(), rs) + }) .collect::>(); // Fetch responses. This procedure stops fetching responses when any of the following @@ -316,8 +321,9 @@ impl K2VRpcHandler { // kind: all items produced by that node until time ts have been returned, so we can // bump the entry in the global vector clock and possibly remove some item-specific // vector clocks) - let mut deadline = - Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + let mut deadline = Instant::now() + + Duration::from_millis(timeout_msec) + + self.system.rpc_helper().rpc_timeout(); let mut resps = vec![]; let mut errors = vec![]; loop { diff --git a/src/rpc/layout/manager.rs b/src/rpc/layout/manager.rs new file mode 100644 index 00000000..a8a77139 --- /dev/null +++ b/src/rpc/layout/manager.rs @@ -0,0 +1,177 @@ +use std::sync::Arc; +use std::time::Duration; + +use tokio::sync::watch; +use tokio::sync::Mutex; + +use netapp::endpoint::Endpoint; +use netapp::peering::fullmesh::FullMeshPeeringStrategy; +use netapp::NodeID; + +use garage_util::config::Config; +use garage_util::data::*; +use garage_util::error::*; +use garage_util::persister::Persister; + +use super::*; +use crate::rpc_helper::*; +use crate::system::*; + +pub struct LayoutManager { + replication_factor: usize, + persist_cluster_layout: Persister, + + pub layout_watch: watch::Receiver>, + update_layout: Mutex>>, + + pub(crate) rpc_helper: RpcHelper, + system_endpoint: Arc>, +} + +impl LayoutManager { + pub fn new( + config: &Config, + node_id: NodeID, + system_endpoint: Arc>, + fullmesh: Arc, + replication_factor: usize, + ) -> Result { + let persist_cluster_layout: Persister = + Persister::new(&config.metadata_dir, "cluster_layout"); + + let cluster_layout = match persist_cluster_layout.load() { + Ok(x) => { + if x.current().replication_factor != replication_factor { + return Err(Error::Message(format!( + "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", + x.current().replication_factor, + replication_factor + ))); + } + x + } + Err(e) => { + info!( + "No valid previous cluster layout stored ({}), starting fresh.", + e + ); + LayoutHistory::new(replication_factor) + } + }; + + let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); + + let rpc_helper = RpcHelper::new( + node_id.into(), + fullmesh, + layout_watch.clone(), + config.rpc_timeout_msec.map(Duration::from_millis), + ); + + Ok(Self { + replication_factor, + persist_cluster_layout, + layout_watch, + update_layout: Mutex::new(update_layout), + system_endpoint, + rpc_helper, + }) + } + + // ---- PUBLIC INTERFACE ---- + + pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> { + self.handle_advertise_cluster_layout(layout).await?; + Ok(()) + } + + pub fn history(&self) -> watch::Ref> { + self.layout_watch.borrow() + } + + pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) { + let resp = self + .rpc_helper + .call( + &self.system_endpoint, + peer, + SystemRpc::PullClusterLayout, + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await; + if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { + let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; + } + } + + // ---- INTERNALS --- + + /// Save network configuration to disc + async fn save_cluster_layout(&self) -> Result<(), Error> { + let layout: Arc = self.layout_watch.borrow().clone(); + self.persist_cluster_layout + .save_async(&layout) + .await + .expect("Cannot save current cluster layout"); + Ok(()) + } + + // ---- RPC HANDLERS ---- + + pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { + let layout = self.layout_watch.borrow().as_ref().clone(); + SystemRpc::AdvertiseClusterLayout(layout) + } + + pub(crate) async fn handle_advertise_cluster_layout( + &self, + adv: &LayoutHistory, + ) -> Result { + if adv.current().replication_factor != self.replication_factor { + let msg = format!( + "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", + adv.current().replication_factor, + self.replication_factor + ); + error!("{}", msg); + return Err(Error::Message(msg)); + } + + let update_layout = self.update_layout.lock().await; + // TODO: don't clone each time an AdvertiseClusterLayout is received + let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); + + let prev_layout_check = layout.check().is_ok(); + if layout.merge(adv) { + if prev_layout_check && layout.check().is_err() { + error!("New cluster layout is invalid, discarding."); + return Err(Error::Message( + "New cluster layout is invalid, discarding.".into(), + )); + } + + update_layout.send(Arc::new(layout.clone()))?; + drop(update_layout); + + /* TODO + tokio::spawn(async move { + if let Err(e) = system + .rpc_helper() + .broadcast( + &system.system_endpoint, + SystemRpc::AdvertiseClusterLayout(layout), + RequestStrategy::with_priority(PRIO_HIGH), + ) + .await + { + warn!("Error while broadcasting new cluster layout: {}", e); + } + }); + */ + + self.save_cluster_layout().await?; + } + + Ok(SystemRpc::Ok) + } +} diff --git a/src/rpc/layout/mod.rs b/src/rpc/layout/mod.rs index 7c15988a..cd3764bc 100644 --- a/src/rpc/layout/mod.rs +++ b/src/rpc/layout/mod.rs @@ -3,6 +3,8 @@ mod history; mod schema; mod version; +pub mod manager; + // ---- re-exports ---- pub use history::*; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index a7433b68..a8e88425 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -14,7 +14,6 @@ use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; use tokio::select; use tokio::sync::watch; -use tokio::sync::Mutex; use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::message::*; @@ -34,6 +33,7 @@ use garage_util::time::*; use crate::consul::ConsulDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; +use crate::layout::manager::LayoutManager; use crate::layout::*; use crate::replication_mode::*; use crate::rpc_helper::*; @@ -49,7 +49,7 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10); pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A /// RPC endpoint used for calls related to membership -pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; +pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc"; /// RPC messages related to membership #[derive(Debug, Serialize, Deserialize, Clone)] @@ -58,17 +58,17 @@ pub enum SystemRpc { Ok, /// Request to connect to a specific node (in @: format) Connect(String), - /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout - PullClusterLayout, /// Advertise Garage status. Answered with another AdvertiseStatus. /// Exchanged with every node on a regular basis. AdvertiseStatus(NodeStatus), - /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout - AdvertiseClusterLayout(LayoutHistory), /// Get known nodes states GetKnownNodes, /// Return known nodes ReturnKnownNodes(Vec), + /// Ask other node its cluster layout. Answered with AdvertiseClusterLayout + PullClusterLayout, + /// Advertisement of cluster layout. Sent spontanously or in response to PullClusterLayout + AdvertiseClusterLayout(LayoutHistory), } impl Rpc for SystemRpc { @@ -84,7 +84,6 @@ pub struct System { /// The id of this node pub id: Uuid, - persist_cluster_layout: Persister, persist_peer_list: Persister, local_status: ArcSwap, @@ -92,9 +91,8 @@ pub struct System { pub netapp: Arc, fullmesh: Arc, - pub rpc: RpcHelper, - system_endpoint: Arc>, + pub(crate) system_endpoint: Arc>, rpc_listen_addr: SocketAddr, #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] @@ -106,15 +104,13 @@ pub struct System { #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, + pub layout_manager: LayoutManager, + metrics: SystemMetrics, replication_mode: ReplicationMode, replication_factor: usize, - /// The layout - pub layout_watch: watch::Receiver>, - update_layout: Mutex>>, - /// Path to metadata directory pub metadata_dir: PathBuf, /// Path to data directory @@ -128,8 +124,11 @@ pub struct NodeStatus { /// Replication factor configured on the node pub replication_factor: usize, + /// Cluster layout version pub cluster_layout_version: u64, + /// Hash of cluster layout update trackers + // (TODO) pub cluster_layout_trackers_hash: Hash, /// Hash of cluster layout staging data pub cluster_layout_staging_hash: Hash, @@ -247,8 +246,7 @@ impl System { replication_mode: ReplicationMode, config: &Config, ) -> Result, Error> { - let replication_factor = replication_mode.replication_factor(); - + // ---- setup netapp RPC protocol ---- let node_key = gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); info!( @@ -256,81 +254,40 @@ impl System { hex::encode(&node_key.public_key()[..8]) ); - let persist_cluster_layout: Persister = - Persister::new(&config.metadata_dir, "cluster_layout"); - let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); + let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); + let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); - let cluster_layout = match persist_cluster_layout.load() { - Ok(x) => { - if x.current().replication_factor != replication_factor { - return Err(Error::Message(format!( - "Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.", - x.current().replication_factor, - replication_factor - ))); - } - x - } - Err(e) => { - info!( - "No valid previous cluster layout stored ({}), starting fresh.", - e - ); - LayoutHistory::new(replication_factor) - } - }; - - let metrics = SystemMetrics::new(replication_factor); - - let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout); - local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); - - let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); - - let rpc_public_addr = match &config.rpc_public_addr { - Some(a_str) => { - use std::net::ToSocketAddrs; - match a_str.to_socket_addrs() { - Err(e) => { - error!( - "Cannot resolve rpc_public_addr {} from config file: {}.", - a_str, e - ); - None - } - Ok(a) => { - let a = a.collect::>(); - if a.is_empty() { - error!("rpc_public_addr {} resolve to no known IP address", a_str); - } - if a.len() > 1 { - warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); - } - a.into_iter().next() - } - } - } - None => { - let addr = - get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); - if let Some(a) = addr { - warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); - } - addr - } - }; + // ---- setup netapp public listener and full mesh peering strategy ---- + let rpc_public_addr = get_rpc_public_addr(config); if rpc_public_addr.is_none() { warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); } - let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); if let Some(ping_timeout) = config.rpc_ping_timeout_msec { fullmesh.set_ping_timeout_millis(ping_timeout); } - let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); + let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list"); + // ---- setup cluster layout and layout manager ---- + let replication_factor = replication_mode.replication_factor(); + + let layout_manager = LayoutManager::new( + config, + netapp.id, + system_endpoint.clone(), + fullmesh.clone(), + replication_factor, + )?; + + // ---- set up metrics and status exchange ---- + let metrics = SystemMetrics::new(replication_factor); + + let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history()); + local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); + + // ---- if enabled, set up additionnal peer discovery methods ---- #[cfg(feature = "consul-discovery")] let consul_discovery = match &config.consul_discovery { Some(cfg) => Some( @@ -349,20 +306,14 @@ impl System { warn!("Kubernetes discovery is not enabled in this build."); } + // ---- done ---- let sys = Arc::new(System { id: netapp.id.into(), - persist_cluster_layout, persist_peer_list, local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), fullmesh: fullmesh.clone(), - rpc: RpcHelper::new( - netapp.id.into(), - fullmesh, - layout_watch.clone(), - config.rpc_timeout_msec.map(Duration::from_millis), - ), system_endpoint, replication_mode, replication_factor, @@ -374,10 +325,9 @@ impl System { consul_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), + layout_manager, metrics, - layout_watch, - update_layout: Mutex::new(update_layout), metadata_dir: config.metadata_dir.clone(), data_dir: config.data_dir.clone(), }); @@ -397,6 +347,20 @@ impl System { ); } + // ---- Public utilities / accessors ---- + + pub fn cluster_layout(&self) -> watch::Ref> { + self.layout_manager.history() + } + + pub fn layout_watch(&self) -> watch::Receiver> { + self.layout_manager.layout_watch.clone() + } + + pub fn rpc_helper(&self) -> &RpcHelper { + &self.layout_manager.rpc_helper + } + // ---- Administrative operations (directly available and // also available through RPC) ---- @@ -423,18 +387,6 @@ impl System { known_nodes } - pub fn cluster_layout(&self) -> watch::Ref> { - self.layout_watch.borrow() - } - - pub async fn update_cluster_layout( - self: &Arc, - layout: &LayoutHistory, - ) -> Result<(), Error> { - self.handle_advertise_cluster_layout(layout).await?; - Ok(()) - } - pub async fn connect(&self, node: &str) -> Result<(), Error> { let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) .await @@ -464,7 +416,7 @@ impl System { } pub fn health(&self) -> ClusterHealth { - let layout: Arc<_> = self.layout_watch.borrow().clone(); + let layout: Arc<_> = self.cluster_layout().clone(); let quorum = self.replication_mode.write_quorum(); let replication_factor = self.replication_factor; @@ -581,20 +533,10 @@ impl System { } } - /// Save network configuration to disc - async fn save_cluster_layout(&self) -> Result<(), Error> { - let layout: Arc = self.layout_watch.borrow().clone(); - self.persist_cluster_layout - .save_async(&layout) - .await - .expect("Cannot save current cluster layout"); - Ok(()) - } - fn update_local_status(&self) { let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); - let layout = self.layout_watch.borrow(); + let layout = self.cluster_layout(); new_si.cluster_layout_version = layout.current().version; new_si.cluster_layout_staging_hash = layout.staging_hash; @@ -610,11 +552,6 @@ impl System { Ok(SystemRpc::Ok) } - fn handle_pull_cluster_layout(&self) -> SystemRpc { - let layout = self.layout_watch.borrow().as_ref().clone(); - SystemRpc::AdvertiseClusterLayout(layout) - } - fn handle_get_known_nodes(&self) -> SystemRpc { let known_nodes = self.get_known_nodes(); SystemRpc::ReturnKnownNodes(known_nodes) @@ -637,7 +574,10 @@ impl System { if info.cluster_layout_version > local_info.cluster_layout_version || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash { - tokio::spawn(self.clone().pull_cluster_layout(from)); + tokio::spawn({ + let system = self.clone(); + async move { system.layout_manager.pull_cluster_layout(from).await } + }); } self.node_status @@ -648,57 +588,6 @@ impl System { Ok(SystemRpc::Ok) } - async fn handle_advertise_cluster_layout( - self: &Arc, - adv: &LayoutHistory, - ) -> Result { - if adv.current().replication_factor != self.replication_factor { - let msg = format!( - "Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.", - adv.current().replication_factor, - self.replication_factor - ); - error!("{}", msg); - return Err(Error::Message(msg)); - } - - let update_layout = self.update_layout.lock().await; - // TODO: don't clone each time an AdvertiseClusterLayout is received - let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone(); - - let prev_layout_check = layout.check().is_ok(); - if layout.merge(adv) { - if prev_layout_check && layout.check().is_err() { - error!("New cluster layout is invalid, discarding."); - return Err(Error::Message( - "New cluster layout is invalid, discarding.".into(), - )); - } - - update_layout.send(Arc::new(layout.clone()))?; - drop(update_layout); - - let self2 = self.clone(); - tokio::spawn(async move { - if let Err(e) = self2 - .rpc - .broadcast( - &self2.system_endpoint, - SystemRpc::AdvertiseClusterLayout(layout), - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await - { - warn!("Error while broadcasting new cluster layout: {}", e); - } - }); - - self.save_cluster_layout().await?; - } - - Ok(SystemRpc::Ok) - } - async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver) { while !*stop_signal.borrow() { let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL; @@ -706,7 +595,7 @@ impl System { self.update_local_status(); let local_status: NodeStatus = self.local_status.load().as_ref().clone(); let _ = self - .rpc + .rpc_helper() .broadcast( &self.system_endpoint, SystemRpc::AdvertiseStatus(local_status), @@ -724,9 +613,9 @@ impl System { async fn discovery_loop(self: &Arc, mut stop_signal: watch::Receiver) { while !*stop_signal.borrow() { - let not_configured = self.layout_watch.borrow().check().is_err(); + let not_configured = self.cluster_layout().check().is_err(); let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; - let expected_n_nodes = self.layout_watch.borrow().current().num_nodes(); + let expected_n_nodes = self.cluster_layout().current().num_nodes(); let bad_peers = self .fullmesh .get_peer_list() @@ -831,34 +720,26 @@ impl System { .save_async(&PeerList(peer_list)) .await } - - async fn pull_cluster_layout(self: Arc, peer: Uuid) { - let resp = self - .rpc - .call( - &self.system_endpoint, - peer, - SystemRpc::PullClusterLayout, - RequestStrategy::with_priority(PRIO_HIGH), - ) - .await; - if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp { - let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await; - } - } } #[async_trait] impl EndpointHandler for System { async fn handle(self: &Arc, msg: &SystemRpc, from: NodeID) -> Result { match msg { + // ---- system functions -> System ---- SystemRpc::Connect(node) => self.handle_connect(node).await, - SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()), SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, - SystemRpc::AdvertiseClusterLayout(adv) => { - self.clone().handle_advertise_cluster_layout(adv).await - } SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), + + // ---- layout functions -> LayoutManager ---- + SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()), + SystemRpc::AdvertiseClusterLayout(adv) => { + self.layout_manager + .handle_advertise_cluster_layout(adv) + .await + } + + // ---- other -> Error ---- m => Err(Error::unexpected_rpc_message(m)), } } @@ -962,6 +843,40 @@ fn get_default_ip() -> Option { .map(|a| a.ip()) } +fn get_rpc_public_addr(config: &Config) -> Option { + match &config.rpc_public_addr { + Some(a_str) => { + use std::net::ToSocketAddrs; + match a_str.to_socket_addrs() { + Err(e) => { + error!( + "Cannot resolve rpc_public_addr {} from config file: {}.", + a_str, e + ); + None + } + Ok(a) => { + let a = a.collect::>(); + if a.is_empty() { + error!("rpc_public_addr {} resolve to no known IP address", a_str); + } + if a.len() > 1 { + warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a); + } + a.into_iter().next() + } + } + } + None => { + let addr = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port())); + if let Some(a) = addr { + warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a); + } + addr + } + } +} + async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { let mut ret = vec![]; diff --git a/src/table/gc.rs b/src/table/gc.rs index 5b9124a7..2135a358 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -227,7 +227,7 @@ impl TableGc { // GC'ing is not a critical function of the system, so it's not a big // deal if we can't do it right now. self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], @@ -248,7 +248,7 @@ impl TableGc { // it means that the garbage collection wasn't completed and has // to be retried later. self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &nodes[..], diff --git a/src/table/sync.rs b/src/table/sync.rs index 620d83b9..2da1bfe7 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -91,7 +91,7 @@ impl TableSyncer { bg.spawn_worker(SyncWorker { syncer: self.clone(), - layout_watch: self.system.layout_watch.clone(), + layout_watch: self.system.layout_watch(), layout: self.system.cluster_layout().clone(), add_full_sync_rx, todo: vec![], @@ -244,7 +244,7 @@ impl TableSyncer { } self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, nodes, @@ -305,7 +305,7 @@ impl TableSyncer { // If so, do nothing. let root_resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, who, @@ -361,7 +361,7 @@ impl TableSyncer { // and compare it with local node let remote_node = match self .system - .rpc + .rpc_helper() .call( &self.endpoint, who, @@ -437,7 +437,7 @@ impl TableSyncer { let rpc_resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, who, diff --git a/src/table/table.rs b/src/table/table.rs index 7ad79677..3e3fd138 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -123,7 +123,7 @@ impl Table { let rpc = TableRpc::::Update(vec![e_enc]); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -181,7 +181,7 @@ impl Table { let resp = self .system - .rpc + .rpc_helper() .call( &self.endpoint, node, @@ -236,7 +236,7 @@ impl Table { let rpc = TableRpc::::ReadEntry(partition_key.clone(), sort_key.clone()); let resps = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -332,7 +332,7 @@ impl Table { let resps = self .system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, &who[..], @@ -411,7 +411,7 @@ impl Table { async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { let what_enc = Arc::new(ByteBuf::from(what.encode()?)); self.system - .rpc + .rpc_helper() .try_call_many( &self.endpoint, who,