wip: split out layout management from System into separate LayoutManager

This commit is contained in:
Alex 2023-11-09 12:55:36 +01:00
parent 523d2ecb95
commit 8a2b1dd422
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
10 changed files with 332 additions and 220 deletions

View file

@ -240,7 +240,11 @@ pub async fn handle_update_cluster_layout(
.merge(&roles.update_mutator(node, layout::NodeRoleV(new_role))); .merge(&roles.update_mutator(node, layout::NodeRoleV(new_role)));
} }
garage.system.update_cluster_layout(&layout).await?; garage
.system
.layout_manager
.update_cluster_layout(&layout)
.await?;
let res = format_cluster_layout(&layout); let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?) Ok(json_ok_response(&res)?)
@ -255,7 +259,11 @@ pub async fn handle_apply_cluster_layout(
let layout = garage.system.cluster_layout().as_ref().clone(); let layout = garage.system.cluster_layout().as_ref().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
garage.system.update_cluster_layout(&layout).await?; garage
.system
.layout_manager
.update_cluster_layout(&layout)
.await?;
let res = ApplyClusterLayoutResponse { let res = ApplyClusterLayoutResponse {
message: msg, message: msg,
@ -267,7 +275,11 @@ pub async fn handle_apply_cluster_layout(
pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let layout = garage.system.cluster_layout().as_ref().clone(); let layout = garage.system.cluster_layout().as_ref().clone();
let layout = layout.revert_staged_changes()?; let layout = layout.revert_staged_changes()?;
garage.system.update_cluster_layout(&layout).await?; garage
.system
.layout_manager
.update_cluster_layout(&layout)
.await?;
let res = format_cluster_layout(&layout); let res = format_cluster_layout(&layout);
Ok(json_ok_response(&res)?) Ok(json_ok_response(&res)?)

View file

@ -265,7 +265,7 @@ impl BlockManager {
Fut: futures::Future<Output = Result<T, Error>>, Fut: futures::Future<Output = Result<T, Error>>,
{ {
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
let who = self.system.rpc.request_order(&who); let who = self.system.rpc_helper().request_order(&who);
for node in who.iter() { for node in who.iter() {
let node_id = NodeID::from(*node); let node_id = NodeID::from(*node);
@ -305,7 +305,7 @@ impl BlockManager {
// if the first one doesn't succeed rapidly // if the first one doesn't succeed rapidly
// TODO: keep first request running when initiating a new one and take the // TODO: keep first request running when initiating a new one and take the
// one that finishes earlier // one that finishes earlier
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => { _ = tokio::time::sleep(self.system.rpc_helper().rpc_timeout()) => {
debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node); debug!("Get block {:?}: node {:?} didn't return block in time, trying next.", hash, node);
} }
}; };
@ -363,7 +363,7 @@ impl BlockManager {
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
@ -439,7 +439,7 @@ impl BlockManager {
tokio::spawn(async move { tokio::spawn(async move {
if let Err(e) = this if let Err(e) = this
.resync .resync
.put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout()) .put_to_resync(&hash, 2 * this.system.rpc_helper().rpc_timeout())
{ {
error!("Block {:?} could not be put in resync queue: {}.", hash, e); error!("Block {:?} could not be put in resync queue: {}.", hash, e);
} }
@ -533,7 +533,7 @@ impl BlockManager {
None => { None => {
// Not found but maybe we should have had it ?? // Not found but maybe we should have had it ??
self.resync self.resync
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; .put_to_resync(hash, 2 * self.system.rpc_helper().rpc_timeout())?;
return Err(Error::Message(format!( return Err(Error::Message(format!(
"block {:?} not found on node", "block {:?} not found on node",
hash hash

View file

@ -385,7 +385,7 @@ impl BlockResyncManager {
let who_needs_resps = manager let who_needs_resps = manager
.system .system
.rpc .rpc_helper()
.call_many( .call_many(
&manager.endpoint, &manager.endpoint,
&who, &who,
@ -431,7 +431,7 @@ impl BlockResyncManager {
.with_stream_from_buffer(bytes); .with_stream_from_buffer(bytes);
manager manager
.system .system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&manager.endpoint, &manager.endpoint,
&need_nodes[..], &need_nodes[..],

View file

@ -131,7 +131,7 @@ impl K2VRpcHandler {
who.sort(); who.sort();
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
@ -187,7 +187,7 @@ impl K2VRpcHandler {
let call_futures = call_list.into_iter().map(|(nodes, items)| async move { let call_futures = call_list.into_iter().map(|(nodes, items)| async move {
let resp = self let resp = self
.system .system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&nodes[..], &nodes[..],
@ -229,7 +229,7 @@ impl K2VRpcHandler {
.replication .replication
.write_nodes(&poll_key.partition.hash()); .write_nodes(&poll_key.partition.hash());
let rpc = self.system.rpc.try_call_many( let rpc = self.system.rpc_helper().try_call_many(
&self.endpoint, &self.endpoint,
&nodes[..], &nodes[..],
K2VRpc::PollItem { K2VRpc::PollItem {
@ -241,7 +241,8 @@ impl K2VRpcHandler {
.with_quorum(self.item_table.data.replication.read_quorum()) .with_quorum(self.item_table.data.replication.read_quorum())
.without_timeout(), .without_timeout(),
); );
let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); let timeout_duration =
Duration::from_millis(timeout_msec) + self.system.rpc_helper().rpc_timeout();
let resps = select! { let resps = select! {
r = rpc => r?, r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None), _ = tokio::time::sleep(timeout_duration) => return Ok(None),
@ -300,7 +301,11 @@ impl K2VRpcHandler {
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
let mut requests = nodes let mut requests = nodes
.iter() .iter()
.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) .map(|node| {
self.system
.rpc_helper()
.call(&self.endpoint, *node, msg.clone(), rs)
})
.collect::<FuturesUnordered<_>>(); .collect::<FuturesUnordered<_>>();
// Fetch responses. This procedure stops fetching responses when any of the following // Fetch responses. This procedure stops fetching responses when any of the following
@ -316,8 +321,9 @@ impl K2VRpcHandler {
// kind: all items produced by that node until time ts have been returned, so we can // kind: all items produced by that node until time ts have been returned, so we can
// bump the entry in the global vector clock and possibly remove some item-specific // bump the entry in the global vector clock and possibly remove some item-specific
// vector clocks) // vector clocks)
let mut deadline = let mut deadline = Instant::now()
Instant::now() + Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout(); + Duration::from_millis(timeout_msec)
+ self.system.rpc_helper().rpc_timeout();
let mut resps = vec![]; let mut resps = vec![];
let mut errors = vec![]; let mut errors = vec![];
loop { loop {

177
src/rpc/layout/manager.rs Normal file
View file

@ -0,0 +1,177 @@
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::watch;
use tokio::sync::Mutex;
use netapp::endpoint::Endpoint;
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::NodeID;
use garage_util::config::Config;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use super::*;
use crate::rpc_helper::*;
use crate::system::*;
/// Owns the cluster layout state of this node: the in-memory copy published
/// through a `watch` channel, its on-disk persistence, and the RPC helper
/// that is constructed from it.
pub struct LayoutManager {
/// Replication factor this node was configured with. Layouts (stored on
/// disk or received from a peer) whose current version has a different
/// replication factor are rejected.
replication_factor: usize,
/// Persister for the `cluster_layout` file in the metadata directory.
persist_cluster_layout: Persister<LayoutHistory>,
/// Receiving side of the watch channel holding the current layout.
pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
/// Sending side of the same watch channel, behind an async mutex that is
/// held while a layout update is merged and published.
update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
/// RPC helper built in `new` from the node id, the full-mesh peering
/// strategy, a clone of `layout_watch` and the configured RPC timeout.
pub(crate) rpc_helper: RpcHelper,
/// Endpoint used to send `SystemRpc` messages (e.g. `PullClusterLayout`).
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}
impl LayoutManager {
/// Builds a `LayoutManager`, loading the persisted cluster layout if any.
///
/// The layout is read from the `cluster_layout` file of
/// `config.metadata_dir`. If it cannot be loaded (for any reason), a fresh
/// `LayoutHistory` is created for `replication_factor` and the load error
/// is logged at info level.
///
/// # Errors
///
/// Returns `Error::Message` when a layout was loaded successfully but its
/// current version uses a replication factor different from
/// `replication_factor`.
pub fn new(
	config: &Config,
	node_id: NodeID,
	system_endpoint: Arc<Endpoint<SystemRpc, System>>,
	fullmesh: Arc<FullMeshPeeringStrategy>,
	replication_factor: usize,
) -> Result<Self, Error> {
	let persister: Persister<LayoutHistory> =
		Persister::new(&config.metadata_dir, "cluster_layout");

	let initial_layout = match persister.load() {
		// A stored layout exists but disagrees with the configuration: refuse to start.
		Ok(stored) if stored.current().replication_factor != replication_factor => {
			return Err(Error::Message(format!(
				"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
				stored.current().replication_factor,
				replication_factor
			)));
		}
		Ok(stored) => stored,
		Err(load_err) => {
			info!(
				"No valid previous cluster layout stored ({}), starting fresh.",
				load_err
			);
			LayoutHistory::new(replication_factor)
		}
	};

	// The watch channel is the single source of truth for the current layout;
	// the RPC helper gets its own receiver on it.
	let (layout_tx, layout_rx) = watch::channel(Arc::new(initial_layout));
	let rpc_timeout = config.rpc_timeout_msec.map(Duration::from_millis);
	let rpc_helper = RpcHelper::new(node_id.into(), fullmesh, layout_rx.clone(), rpc_timeout);

	Ok(Self {
		replication_factor,
		persist_cluster_layout: persister,
		layout_watch: layout_rx,
		update_layout: Mutex::new(layout_tx),
		system_endpoint,
		rpc_helper,
	})
}
// ---- PUBLIC INTERFACE ----
/// Merges `layout` into the local cluster layout.
///
/// This delegates to `handle_advertise_cluster_layout` and discards the RPC
/// reply it produces; only its error, if any, is returned.
pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> {
	self.handle_advertise_cluster_layout(layout)
		.await
		.map(|_reply| ())
}
/// Borrows the layout history currently held in the watch channel.
///
/// The returned `watch::Ref` is a borrow guard on the channel's value; it is
/// produced directly by `layout_watch.borrow()`.
pub fn history(&self) -> watch::Ref<Arc<LayoutHistory>> {
self.layout_watch.borrow()
}
/// Asks `peer` for its cluster layout and merges the answer locally.
///
/// A `PullClusterLayout` request is sent with high priority. Only an
/// `AdvertiseClusterLayout` reply is acted upon; RPC failures, other
/// replies, and errors while merging the received layout are all ignored.
pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) {
	let strategy = RequestStrategy::with_priority(PRIO_HIGH);
	let reply = self
		.rpc_helper
		.call(
			&self.system_endpoint,
			peer,
			SystemRpc::PullClusterLayout,
			strategy,
		)
		.await;

	if let Ok(SystemRpc::AdvertiseClusterLayout(remote_layout)) = reply {
		// Best effort: the merge result is intentionally dropped.
		let _: Result<_, _> = self
			.handle_advertise_cluster_layout(&remote_layout)
			.await;
	}
}
// ---- INTERNALS ---
/// Persists the current cluster layout to disk.
///
/// # Panics
///
/// Panics with "Cannot save current cluster layout" if the persister fails
/// to write the layout. As written, this function therefore never returns
/// `Err`.
// NOTE(review): panicking here instead of returning the error looks
// deliberate (a layout that cannot be persisted is treated as fatal), but
// this should be confirmed with the maintainers.
async fn save_cluster_layout(&self) -> Result<(), Error> {
	// Take an owned snapshot in its own statement so the watch borrow ends
	// before the await below.
	let snapshot: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
	let outcome = self.persist_cluster_layout.save_async(&snapshot).await;
	outcome.expect("Cannot save current cluster layout");
	Ok(())
}
// ---- RPC HANDLERS ----
/// Answers a `PullClusterLayout` request with a full copy of the layout
/// history currently held by this node.
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
	SystemRpc::AdvertiseClusterLayout(self.layout_watch.borrow().as_ref().clone())
}
/// Merges an advertised layout history into the local one.
///
/// Steps, in order:
/// 1. Reject `adv` if its current version's replication factor differs from
///    the configured one.
/// 2. Take the `update_layout` lock, clone the current layout and merge
///    `adv` into the clone.
/// 3. If `merge` returns `true`: refuse the result when the previous layout
///    passed `check()` but the merged one does not; otherwise publish it on
///    the watch channel, release the lock and persist it to disk.
///
/// Returns `Ok(SystemRpc::Ok)` both when the merge was applied and when
/// `merge` returned `false`.
///
/// # Errors
///
/// Returns `Error::Message` for a replication factor mismatch or an invalid
/// merged layout, and propagates errors from the watch `send` and from
/// `save_cluster_layout`.
///
/// NOTE: re-broadcasting the updated layout to other nodes is currently
/// disabled (see the commented-out `TODO` block below).
pub(crate) async fn handle_advertise_cluster_layout(
&self,
adv: &LayoutHistory,
) -> Result<SystemRpc, Error> {
// Never accept a layout built for a different replication factor.
if adv.current().replication_factor != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
self.replication_factor
);
error!("{}", msg);
return Err(Error::Message(msg));
}
// Hold the sender lock for the whole read-merge-publish sequence.
let update_layout = self.update_layout.lock().await;
// TODO: don't clone each time an AdvertiseClusterLayout is received
let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
// Remember whether the layout was valid before merging: an already
// invalid layout is allowed to stay invalid after the merge.
let prev_layout_check = layout.check().is_ok();
// presumably `merge` returns true when `adv` changed the layout -- TODO confirm
if layout.merge(adv) {
if prev_layout_check && layout.check().is_err() {
error!("New cluster layout is invalid, discarding.");
return Err(Error::Message(
"New cluster layout is invalid, discarding.".into(),
));
}
// Publish the merged layout to all watchers, then release the lock
// before the (async) save so it is not held across disk I/O.
update_layout.send(Arc::new(layout.clone()))?;
drop(update_layout);
/* TODO
tokio::spawn(async move {
if let Err(e) = system
.rpc_helper()
.broadcast(
&system.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await
{
warn!("Error while broadcasting new cluster layout: {}", e);
}
});
*/
self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
}
}

View file

@ -3,6 +3,8 @@ mod history;
mod schema; mod schema;
mod version; mod version;
pub mod manager;
// ---- re-exports ---- // ---- re-exports ----
pub use history::*; pub use history::*;

View file

@ -14,7 +14,6 @@ use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519; use sodiumoxide::crypto::sign::ed25519;
use tokio::select; use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*; use netapp::message::*;
@ -34,6 +33,7 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery; use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::manager::LayoutManager;
use crate::layout::*; use crate::layout::*;
use crate::replication_mode::*; use crate::replication_mode::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -49,7 +49,7 @@ const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A pub const GARAGE_VERSION_TAG: u64 = 0x676172616765000A; // garage 0x000A
/// RPC endpoint used for calls related to membership /// RPC endpoint used for calls related to membership
pub const SYSTEM_RPC_PATH: &str = "garage_rpc/membership.rs/SystemRpc"; pub const SYSTEM_RPC_PATH: &str = "garage_rpc/system.rs/SystemRpc";
/// RPC messages related to membership /// RPC messages related to membership
#[derive(Debug, Serialize, Deserialize, Clone)] #[derive(Debug, Serialize, Deserialize, Clone)]
@ -58,17 +58,17 @@ pub enum SystemRpc {
Ok, Ok,
/// Request to connect to a specific node (in <pubkey>@<host>:<port> format) /// Request to connect to a specific node (in <pubkey>@<host>:<port> format)
Connect(String), Connect(String),
/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
PullClusterLayout,
/// Advertise Garage status. Answered with another AdvertiseStatus. /// Advertise Garage status. Answered with another AdvertiseStatus.
/// Exchanged with every node on a regular basis. /// Exchanged with every node on a regular basis.
AdvertiseStatus(NodeStatus), AdvertiseStatus(NodeStatus),
/// Advertisement of cluster layout. Sent spontaneously or in response to PullClusterLayout
AdvertiseClusterLayout(LayoutHistory),
/// Get known nodes states /// Get known nodes states
GetKnownNodes, GetKnownNodes,
/// Return known nodes /// Return known nodes
ReturnKnownNodes(Vec<KnownNodeInfo>), ReturnKnownNodes(Vec<KnownNodeInfo>),
/// Ask other node its cluster layout. Answered with AdvertiseClusterLayout
PullClusterLayout,
/// Advertisement of cluster layout. Sent spontaneously or in response to PullClusterLayout
AdvertiseClusterLayout(LayoutHistory),
} }
impl Rpc for SystemRpc { impl Rpc for SystemRpc {
@ -84,7 +84,6 @@ pub struct System {
/// The id of this node /// The id of this node
pub id: Uuid, pub id: Uuid,
persist_cluster_layout: Persister<LayoutHistory>,
persist_peer_list: Persister<PeerList>, persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>, local_status: ArcSwap<NodeStatus>,
@ -92,9 +91,8 @@ pub struct System {
pub netapp: Arc<NetApp>, pub netapp: Arc<NetApp>,
fullmesh: Arc<FullMeshPeeringStrategy>, fullmesh: Arc<FullMeshPeeringStrategy>,
pub rpc: RpcHelper,
system_endpoint: Arc<Endpoint<SystemRpc, System>>, pub(crate) system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr, rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
@ -106,15 +104,13 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>, kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
pub layout_manager: LayoutManager,
metrics: SystemMetrics, metrics: SystemMetrics,
replication_mode: ReplicationMode, replication_mode: ReplicationMode,
replication_factor: usize, replication_factor: usize,
/// The layout
pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
/// Path to metadata directory /// Path to metadata directory
pub metadata_dir: PathBuf, pub metadata_dir: PathBuf,
/// Path to data directory /// Path to data directory
@ -128,8 +124,11 @@ pub struct NodeStatus {
/// Replication factor configured on the node /// Replication factor configured on the node
pub replication_factor: usize, pub replication_factor: usize,
/// Cluster layout version /// Cluster layout version
pub cluster_layout_version: u64, pub cluster_layout_version: u64,
/// Hash of cluster layout update trackers
// (TODO) pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data /// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash, pub cluster_layout_staging_hash: Hash,
@ -247,8 +246,7 @@ impl System {
replication_mode: ReplicationMode, replication_mode: ReplicationMode,
config: &Config, config: &Config,
) -> Result<Arc<Self>, Error> { ) -> Result<Arc<Self>, Error> {
let replication_factor = replication_mode.replication_factor(); // ---- setup netapp RPC protocol ----
let node_key = let node_key =
gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID"); gen_node_key(&config.metadata_dir).expect("Unable to read or generate node ID");
info!( info!(
@ -256,81 +254,40 @@ impl System {
hex::encode(&node_key.public_key()[..8]) hex::encode(&node_key.public_key()[..8])
); );
let persist_cluster_layout: Persister<LayoutHistory> = let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
Persister::new(&config.metadata_dir, "cluster_layout"); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
let cluster_layout = match persist_cluster_layout.load() { // ---- setup netapp public listener and full mesh peering strategy ----
Ok(x) => { let rpc_public_addr = get_rpc_public_addr(config);
if x.current().replication_factor != replication_factor {
return Err(Error::Message(format!(
"Prevous cluster layout has replication factor {}, which is different than the one specified in the config file ({}). The previous cluster layout can be purged, if you know what you are doing, simply by deleting the `cluster_layout` file in your metadata directory.",
x.current().replication_factor,
replication_factor
)));
}
x
}
Err(e) => {
info!(
"No valid previous cluster layout stored ({}), starting fresh.",
e
);
LayoutHistory::new(replication_factor)
}
};
let metrics = SystemMetrics::new(replication_factor);
let mut local_status = NodeStatus::initial(replication_factor, &cluster_layout);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
let rpc_public_addr = match &config.rpc_public_addr {
Some(a_str) => {
use std::net::ToSocketAddrs;
match a_str.to_socket_addrs() {
Err(e) => {
error!(
"Cannot resolve rpc_public_addr {} from config file: {}.",
a_str, e
);
None
}
Ok(a) => {
let a = a.collect::<Vec<_>>();
if a.is_empty() {
error!("rpc_public_addr {} resolve to no known IP address", a_str);
}
if a.len() > 1 {
warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", a);
}
a.into_iter().next()
}
}
}
None => {
let addr =
get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
if let Some(a) = addr {
warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
}
addr
}
};
if rpc_public_addr.is_none() { if rpc_public_addr.is_none() {
warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication."); warn!("This Garage node does not know its publicly reachable RPC address, this might hamper intra-cluster communication.");
} }
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec { if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
fullmesh.set_ping_timeout_millis(ping_timeout); fullmesh.set_ping_timeout_millis(ping_timeout);
} }
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); let persist_peer_list = Persister::new(&config.metadata_dir, "peer_list");
// ---- setup cluster layout and layout manager ----
let replication_factor = replication_mode.replication_factor();
let layout_manager = LayoutManager::new(
config,
netapp.id,
system_endpoint.clone(),
fullmesh.clone(),
replication_factor,
)?;
// ---- set up metrics and status exchange ----
let metrics = SystemMetrics::new(replication_factor);
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history());
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
// ---- if enabled, set up additional peer discovery methods ----
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery { let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some( Some(cfg) => Some(
@ -349,20 +306,14 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build."); warn!("Kubernetes discovery is not enabled in this build.");
} }
// ---- done ----
let sys = Arc::new(System { let sys = Arc::new(System {
id: netapp.id.into(), id: netapp.id.into(),
persist_cluster_layout,
persist_peer_list, persist_peer_list,
local_status: ArcSwap::new(Arc::new(local_status)), local_status: ArcSwap::new(Arc::new(local_status)),
node_status: RwLock::new(HashMap::new()), node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(), netapp: netapp.clone(),
fullmesh: fullmesh.clone(), fullmesh: fullmesh.clone(),
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
layout_watch.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint, system_endpoint,
replication_mode, replication_mode,
replication_factor, replication_factor,
@ -374,10 +325,9 @@ impl System {
consul_discovery, consul_discovery,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(), kubernetes_discovery: config.kubernetes_discovery.clone(),
layout_manager,
metrics, metrics,
layout_watch,
update_layout: Mutex::new(update_layout),
metadata_dir: config.metadata_dir.clone(), metadata_dir: config.metadata_dir.clone(),
data_dir: config.data_dir.clone(), data_dir: config.data_dir.clone(),
}); });
@ -397,6 +347,20 @@ impl System {
); );
} }
// ---- Public utilities / accessors ----
pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
self.layout_manager.history()
}
pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> {
self.layout_manager.layout_watch.clone()
}
pub fn rpc_helper(&self) -> &RpcHelper {
&self.layout_manager.rpc_helper
}
// ---- Administrative operations (directly available and // ---- Administrative operations (directly available and
// also available through RPC) ---- // also available through RPC) ----
@ -423,18 +387,6 @@ impl System {
known_nodes known_nodes
} }
pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
self.layout_watch.borrow()
}
pub async fn update_cluster_layout(
self: &Arc<Self>,
layout: &LayoutHistory,
) -> Result<(), Error> {
self.handle_advertise_cluster_layout(layout).await?;
Ok(())
}
pub async fn connect(&self, node: &str) -> Result<(), Error> { pub async fn connect(&self, node: &str) -> Result<(), Error> {
let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node) let (pubkey, addrs) = parse_and_resolve_peer_addr_async(node)
.await .await
@ -464,7 +416,7 @@ impl System {
} }
pub fn health(&self) -> ClusterHealth { pub fn health(&self) -> ClusterHealth {
let layout: Arc<_> = self.layout_watch.borrow().clone(); let layout: Arc<_> = self.cluster_layout().clone();
let quorum = self.replication_mode.write_quorum(); let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor; let replication_factor = self.replication_factor;
@ -581,20 +533,10 @@ impl System {
} }
} }
/// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> {
let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
self.persist_cluster_layout
.save_async(&layout)
.await
.expect("Cannot save current cluster layout");
Ok(())
}
fn update_local_status(&self) { fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let layout = self.layout_watch.borrow(); let layout = self.cluster_layout();
new_si.cluster_layout_version = layout.current().version; new_si.cluster_layout_version = layout.current().version;
new_si.cluster_layout_staging_hash = layout.staging_hash; new_si.cluster_layout_staging_hash = layout.staging_hash;
@ -610,11 +552,6 @@ impl System {
Ok(SystemRpc::Ok) Ok(SystemRpc::Ok)
} }
fn handle_pull_cluster_layout(&self) -> SystemRpc {
let layout = self.layout_watch.borrow().as_ref().clone();
SystemRpc::AdvertiseClusterLayout(layout)
}
fn handle_get_known_nodes(&self) -> SystemRpc { fn handle_get_known_nodes(&self) -> SystemRpc {
let known_nodes = self.get_known_nodes(); let known_nodes = self.get_known_nodes();
SystemRpc::ReturnKnownNodes(known_nodes) SystemRpc::ReturnKnownNodes(known_nodes)
@ -637,7 +574,10 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{ {
tokio::spawn(self.clone().pull_cluster_layout(from)); tokio::spawn({
let system = self.clone();
async move { system.layout_manager.pull_cluster_layout(from).await }
});
} }
self.node_status self.node_status
@ -648,57 +588,6 @@ impl System {
Ok(SystemRpc::Ok) Ok(SystemRpc::Ok)
} }
async fn handle_advertise_cluster_layout(
self: &Arc<Self>,
adv: &LayoutHistory,
) -> Result<SystemRpc, Error> {
if adv.current().replication_factor != self.replication_factor {
let msg = format!(
"Received a cluster layout from another node with replication factor {}, which is different from what we have in our configuration ({}). Discarding the cluster layout we received.",
adv.current().replication_factor,
self.replication_factor
);
error!("{}", msg);
return Err(Error::Message(msg));
}
let update_layout = self.update_layout.lock().await;
// TODO: don't clone each time an AdvertiseClusterLayout is received
let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
let prev_layout_check = layout.check().is_ok();
if layout.merge(adv) {
if prev_layout_check && layout.check().is_err() {
error!("New cluster layout is invalid, discarding.");
return Err(Error::Message(
"New cluster layout is invalid, discarding.".into(),
));
}
update_layout.send(Arc::new(layout.clone()))?;
drop(update_layout);
let self2 = self.clone();
tokio::spawn(async move {
if let Err(e) = self2
.rpc
.broadcast(
&self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await
{
warn!("Error while broadcasting new cluster layout: {}", e);
}
});
self.save_cluster_layout().await?;
}
Ok(SystemRpc::Ok)
}
async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) { async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL; let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
@ -706,7 +595,7 @@ impl System {
self.update_local_status(); self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone(); let local_status: NodeStatus = self.local_status.load().as_ref().clone();
let _ = self let _ = self
.rpc .rpc_helper()
.broadcast( .broadcast(
&self.system_endpoint, &self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status), SystemRpc::AdvertiseStatus(local_status),
@ -724,9 +613,9 @@ impl System {
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let not_configured = self.layout_watch.borrow().check().is_err(); let not_configured = self.cluster_layout().check().is_err();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
let expected_n_nodes = self.layout_watch.borrow().current().num_nodes(); let expected_n_nodes = self.cluster_layout().current().num_nodes();
let bad_peers = self let bad_peers = self
.fullmesh .fullmesh
.get_peer_list() .get_peer_list()
@ -831,34 +720,26 @@ impl System {
.save_async(&PeerList(peer_list)) .save_async(&PeerList(peer_list))
.await .await
} }
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {
let resp = self
.rpc
.call(
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
let _: Result<_, _> = self.handle_advertise_cluster_layout(&layout).await;
}
}
} }
#[async_trait] #[async_trait]
impl EndpointHandler<SystemRpc> for System { impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg { match msg {
// ---- system functions -> System ----
SystemRpc::Connect(node) => self.handle_connect(node).await, SystemRpc::Connect(node) => self.handle_connect(node).await,
SystemRpc::PullClusterLayout => Ok(self.handle_pull_cluster_layout()),
SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await, SystemRpc::AdvertiseStatus(adv) => self.handle_advertise_status(from.into(), adv).await,
SystemRpc::AdvertiseClusterLayout(adv) => {
self.clone().handle_advertise_cluster_layout(adv).await
}
SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()), SystemRpc::GetKnownNodes => Ok(self.handle_get_known_nodes()),
// ---- layout functions -> LayoutManager ----
SystemRpc::PullClusterLayout => Ok(self.layout_manager.handle_pull_cluster_layout()),
SystemRpc::AdvertiseClusterLayout(adv) => {
self.layout_manager
.handle_advertise_cluster_layout(adv)
.await
}
// ---- other -> Error ----
m => Err(Error::unexpected_rpc_message(m)), m => Err(Error::unexpected_rpc_message(m)),
} }
} }
@ -962,6 +843,40 @@ fn get_default_ip() -> Option<IpAddr> {
.map(|a| a.ip()) .map(|a| a.ip())
} }
/// Determines the RPC address other nodes should use to reach this node.
///
/// * If `rpc_public_addr` is set in the configuration, it is resolved and
///   the first resulting address is returned. Resolution failure or an
///   empty result is logged as an error and yields `None`; several results
///   produce a warning.
/// * Otherwise the address is derived from `get_default_ip()` combined with
///   the port of `rpc_bind_addr`, and a warning is logged when one is found.
fn get_rpc_public_addr(config: &Config) -> Option<SocketAddr> {
	use std::net::ToSocketAddrs;

	if let Some(a_str) = &config.rpc_public_addr {
		// Explicitly configured address: resolve it.
		let candidates = match a_str.to_socket_addrs() {
			Ok(resolved) => resolved.collect::<Vec<_>>(),
			Err(e) => {
				error!(
					"Cannot resolve rpc_public_addr {} from config file: {}.",
					a_str, e
				);
				return None;
			}
		};

		if candidates.is_empty() {
			error!("rpc_public_addr {} resolve to no known IP address", a_str);
		} else if candidates.len() > 1 {
			warn!("Multiple possible resolutions for rpc_public_addr: {:?}. Taking the first one.", candidates);
		}
		return candidates.into_iter().next();
	}

	// Nothing configured: fall back to autodetection.
	let detected = get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
	if let Some(a) = detected {
		warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
	}
	detected
}
async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> { async fn resolve_peers(peers: &[String]) -> Vec<(NodeID, SocketAddr)> {
let mut ret = vec![]; let mut ret = vec![];

View file

@ -227,7 +227,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// GC'ing is not a critical function of the system, so it's not a big // GC'ing is not a critical function of the system, so it's not a big
// deal if we can't do it right now. // deal if we can't do it right now.
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&nodes[..], &nodes[..],
@ -248,7 +248,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// it means that the garbage collection wasn't completed and has // it means that the garbage collection wasn't completed and has
// to be retried later. // to be retried later.
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&nodes[..], &nodes[..],

View file

@ -91,7 +91,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker { bg.spawn_worker(SyncWorker {
syncer: self.clone(), syncer: self.clone(),
layout_watch: self.system.layout_watch.clone(), layout_watch: self.system.layout_watch(),
layout: self.system.cluster_layout().clone(), layout: self.system.cluster_layout().clone(),
add_full_sync_rx, add_full_sync_rx,
todo: vec![], todo: vec![],
@ -244,7 +244,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
} }
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
nodes, nodes,
@ -305,7 +305,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// If so, do nothing. // If so, do nothing.
let root_resp = self let root_resp = self
.system .system
.rpc .rpc_helper()
.call( .call(
&self.endpoint, &self.endpoint,
who, who,
@ -361,7 +361,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// and compare it with local node // and compare it with local node
let remote_node = match self let remote_node = match self
.system .system
.rpc .rpc_helper()
.call( .call(
&self.endpoint, &self.endpoint,
who, who,
@ -437,7 +437,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
let rpc_resp = self let rpc_resp = self
.system .system
.rpc .rpc_helper()
.call( .call(
&self.endpoint, &self.endpoint,
who, who,

View file

@ -123,7 +123,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::Update(vec![e_enc]); let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
@ -181,7 +181,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resp = self let resp = self
.system .system
.rpc .rpc_helper()
.call( .call(
&self.endpoint, &self.endpoint,
node, node,
@ -236,7 +236,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone()); let rpc = TableRpc::<F>::ReadEntry(partition_key.clone(), sort_key.clone());
let resps = self let resps = self
.system .system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
@ -332,7 +332,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let resps = self let resps = self
.system .system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
@ -411,7 +411,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(what.encode()?)); let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system self.system
.rpc .rpc_helper()
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
who, who,