From 125c662860621f9c834e254d62b29b5d5ace5dd4 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 14 Feb 2024 10:04:46 +0100 Subject: [PATCH] [import-netapp] move and rename FullMeshPeeringSrategy to PeeringManager --- src/net/{peering/fullmesh.rs => peering.rs} | 9 +++++---- src/net/peering/mod.rs | 1 - src/net/test.rs | 6 +++--- src/rpc/rpc_helper.rs | 12 +++++------ src/rpc/system.rs | 22 ++++++++++----------- 5 files changed, 25 insertions(+), 25 deletions(-) rename src/net/{peering/fullmesh.rs => peering.rs} (98%) delete mode 100644 src/net/peering/mod.rs diff --git a/src/net/peering/fullmesh.rs b/src/net/peering.rs similarity index 98% rename from src/net/peering/fullmesh.rs rename to src/net/peering.rs index 8e666044a..32199cf8f 100644 --- a/src/net/peering/fullmesh.rs +++ b/src/net/peering.rs @@ -177,7 +177,7 @@ impl KnownHosts { /// A "Full Mesh" peering strategy is a peering strategy that tries /// to establish and maintain a direct connection with all of the /// known nodes in the network. -pub struct FullMeshPeeringStrategy { +pub struct PeeringManager { netapp: Arc, known_hosts: RwLock, public_peer_list: ArcSwap>, @@ -189,7 +189,7 @@ pub struct FullMeshPeeringStrategy { ping_timeout_millis: AtomicU64, } -impl FullMeshPeeringStrategy { +impl PeeringManager { /// Create a new Full Mesh peering strategy. /// The strategy will not be run until `.run()` is called and awaited. /// Once that happens, the peering strategy will try to connect @@ -216,6 +216,7 @@ impl FullMeshPeeringStrategy { ); } + // TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility) let strat = Arc::new(Self { netapp: netapp.clone(), known_hosts: RwLock::new(known_hosts), @@ -588,7 +589,7 @@ impl FullMeshPeeringStrategy { } #[async_trait] -impl EndpointHandler for FullMeshPeeringStrategy { +impl EndpointHandler for PeeringManager { async fn handle(self: &Arc, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { id: ping.id, @@ -600,7 +601,7 @@ impl EndpointHandler for FullMeshPeeringStrategy { } #[async_trait] -impl EndpointHandler for FullMeshPeeringStrategy { +impl EndpointHandler for PeeringManager { async fn handle( self: &Arc, peer_list: &PeerListMessage, diff --git a/src/net/peering/mod.rs b/src/net/peering/mod.rs deleted file mode 100644 index 044b1dfe0..000000000 --- a/src/net/peering/mod.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod fullmesh; diff --git a/src/net/test.rs b/src/net/test.rs index d4da6f23d..c62597520 100644 --- a/src/net/test.rs +++ b/src/net/test.rs @@ -9,7 +9,7 @@ use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; use crate::netapp::*; -use crate::peering::fullmesh::*; +use crate::peering::*; use crate::NodeID; #[tokio::test(flavor = "current_thread")] @@ -100,10 +100,10 @@ fn run_netapp( ) -> ( tokio::task::JoinHandle<()>, Arc, - Arc, + Arc, ) { let netapp = NetApp::new(0u64, netid, sk); - let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers, None); + let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None); let peering2 = peering.clone(); let netapp2 = netapp.clone(); diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index b5279bcd1..c46e577ff 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -19,7 +19,7 @@ pub use garage_net::message::{ IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH, PRIO_NORMAL, PRIO_SECONDARY, }; -use garage_net::peering::fullmesh::FullMeshPeeringStrategy; +use garage_net::peering::PeeringManager; pub use garage_net::{self, NetApp, NodeID}; use garage_util::data::*; @@ -90,7 +90,7 @@ pub struct RpcHelper(Arc); struct RpcHelperInner { our_node_id: Uuid, - fullmesh: Arc, + peering: Arc, ring: watch::Receiver>, metrics: RpcMetrics, rpc_timeout: Duration, @@ -99,7 +99,7 @@ struct RpcHelperInner { impl RpcHelper { pub(crate) fn new( our_node_id: Uuid, - fullmesh: Arc, + peering: Arc, ring: watch::Receiver>, rpc_timeout: Option, ) -> Self { @@ -107,7 +107,7 @@ impl RpcHelper { Self(Arc::new(RpcHelperInner { our_node_id, - fullmesh, + peering, ring, metrics, rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), @@ -210,7 +210,7 @@ impl RpcHelper { { let to = self .0 - .fullmesh + .peering .get_peer_list() .iter() .map(|p| p.id.into()) @@ -391,7 +391,7 @@ impl RpcHelper { pub fn request_order(&self, nodes: &[Uuid]) -> Vec { // Retrieve some status variables that we will use to sort requests - let peer_list = self.0.fullmesh.get_peer_list(); + let peer_list = self.0.peering.get_peer_list(); let ring: Arc = self.0.ring.borrow().clone(); let our_zone = match ring.layout.node_role(&self.0.our_node_id) { Some(pc) => &pc.zone, diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 4b9a72efc..de44e656a 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -18,7 +18,7 @@ use tokio::sync::Mutex; use garage_net::endpoint::{Endpoint, EndpointHandler}; use garage_net::message::*; -use garage_net::peering::fullmesh::FullMeshPeeringStrategy; +use garage_net::peering::PeeringManager; use garage_net::util::parse_and_resolve_peer_addr_async; use garage_net::{NetApp, NetworkKey, NodeID, NodeKey}; @@ -92,7 +92,7 @@ pub struct System { node_status: RwLock>, pub netapp: Arc, - fullmesh: Arc, + peering: Arc, pub rpc: RpcHelper, system_endpoint: Arc>, @@ -326,9 +326,9 @@ impl System { } let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); - let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr); + let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr); if let Some(ping_timeout) = config.rpc_ping_timeout_msec { - fullmesh.set_ping_timeout_millis(ping_timeout); + peering.set_ping_timeout_millis(ping_timeout); } let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); @@ -358,10 +358,10 @@ impl System { local_status: ArcSwap::new(Arc::new(local_status)), node_status: RwLock::new(HashMap::new()), netapp: netapp.clone(), - fullmesh: fullmesh.clone(), + peering: peering.clone(), rpc: RpcHelper::new( netapp.id.into(), - fullmesh, + peering, ring.clone(), config.rpc_timeout_msec.map(Duration::from_millis), ), @@ -393,7 +393,7 @@ impl System { self.netapp .clone() .listen(self.rpc_listen_addr, None, must_exit.clone()), - self.fullmesh.clone().run(must_exit.clone()), + self.peering.clone().run(must_exit.clone()), self.discovery_loop(must_exit.clone()), self.status_exchange_loop(must_exit.clone()), ); @@ -405,7 +405,7 @@ impl System { pub fn get_known_nodes(&self) -> Vec { let node_status = self.node_status.read().unwrap(); let known_nodes = self - .fullmesh + .peering .get_peer_list() .iter() .map(|n| KnownNodeInfo { @@ -726,10 +726,10 @@ impl System { async fn discovery_loop(self: &Arc, mut stop_signal: watch::Receiver) { while !*stop_signal.borrow() { let not_configured = self.ring.borrow().layout.check().is_err(); - let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; + let no_peers = self.peering.get_peer_list().len() < self.replication_factor; let expected_n_nodes = self.ring.borrow().layout.num_nodes(); let bad_peers = self - .fullmesh + .peering .get_peer_list() .iter() .filter(|p| p.is_up()) @@ -811,7 +811,7 @@ impl System { // Prepare new peer list to save to file // It is a vec of tuples (node ID as Uuid, node SocketAddr) let mut peer_list = self - .fullmesh + .peering .get_peer_list() .iter() .map(|n| (n.id.into(), n.addr))