forked from Deuxfleurs/garage
[import-netapp] move and rename FullMeshPeeringSrategy to PeeringManager
This commit is contained in:
parent
5766befb24
commit
125c662860
5 changed files with 25 additions and 25 deletions
|
@ -177,7 +177,7 @@ impl KnownHosts {
|
||||||
/// A "Full Mesh" peering strategy is a peering strategy that tries
|
/// A "Full Mesh" peering strategy is a peering strategy that tries
|
||||||
/// to establish and maintain a direct connection with all of the
|
/// to establish and maintain a direct connection with all of the
|
||||||
/// known nodes in the network.
|
/// known nodes in the network.
|
||||||
pub struct FullMeshPeeringStrategy {
|
pub struct PeeringManager {
|
||||||
netapp: Arc<NetApp>,
|
netapp: Arc<NetApp>,
|
||||||
known_hosts: RwLock<KnownHosts>,
|
known_hosts: RwLock<KnownHosts>,
|
||||||
public_peer_list: ArcSwap<Vec<PeerInfo>>,
|
public_peer_list: ArcSwap<Vec<PeerInfo>>,
|
||||||
|
@ -189,7 +189,7 @@ pub struct FullMeshPeeringStrategy {
|
||||||
ping_timeout_millis: AtomicU64,
|
ping_timeout_millis: AtomicU64,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FullMeshPeeringStrategy {
|
impl PeeringManager {
|
||||||
/// Create a new Full Mesh peering strategy.
|
/// Create a new Full Mesh peering strategy.
|
||||||
/// The strategy will not be run until `.run()` is called and awaited.
|
/// The strategy will not be run until `.run()` is called and awaited.
|
||||||
/// Once that happens, the peering strategy will try to connect
|
/// Once that happens, the peering strategy will try to connect
|
||||||
|
@ -216,6 +216,7 @@ impl FullMeshPeeringStrategy {
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO for v0.10 / v1.0 : rename the endpoint (it will break compatibility)
|
||||||
let strat = Arc::new(Self {
|
let strat = Arc::new(Self {
|
||||||
netapp: netapp.clone(),
|
netapp: netapp.clone(),
|
||||||
known_hosts: RwLock::new(known_hosts),
|
known_hosts: RwLock::new(known_hosts),
|
||||||
|
@ -588,7 +589,7 @@ impl FullMeshPeeringStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
|
impl EndpointHandler<PingMessage> for PeeringManager {
|
||||||
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
|
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
|
||||||
let ping_resp = PingMessage {
|
let ping_resp = PingMessage {
|
||||||
id: ping.id,
|
id: ping.id,
|
||||||
|
@ -600,7 +601,7 @@ impl EndpointHandler<PingMessage> for FullMeshPeeringStrategy {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<PeerListMessage> for FullMeshPeeringStrategy {
|
impl EndpointHandler<PeerListMessage> for PeeringManager {
|
||||||
async fn handle(
|
async fn handle(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
peer_list: &PeerListMessage,
|
peer_list: &PeerListMessage,
|
|
@ -1 +0,0 @@
|
||||||
pub mod fullmesh;
|
|
|
@ -9,7 +9,7 @@ use sodiumoxide::crypto::auth;
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
|
||||||
use crate::netapp::*;
|
use crate::netapp::*;
|
||||||
use crate::peering::fullmesh::*;
|
use crate::peering::*;
|
||||||
use crate::NodeID;
|
use crate::NodeID;
|
||||||
|
|
||||||
#[tokio::test(flavor = "current_thread")]
|
#[tokio::test(flavor = "current_thread")]
|
||||||
|
@ -100,10 +100,10 @@ fn run_netapp(
|
||||||
) -> (
|
) -> (
|
||||||
tokio::task::JoinHandle<()>,
|
tokio::task::JoinHandle<()>,
|
||||||
Arc<NetApp>,
|
Arc<NetApp>,
|
||||||
Arc<FullMeshPeeringStrategy>,
|
Arc<PeeringManager>,
|
||||||
) {
|
) {
|
||||||
let netapp = NetApp::new(0u64, netid, sk);
|
let netapp = NetApp::new(0u64, netid, sk);
|
||||||
let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers, None);
|
let peering = PeeringManager::new(netapp.clone(), bootstrap_peers, None);
|
||||||
|
|
||||||
let peering2 = peering.clone();
|
let peering2 = peering.clone();
|
||||||
let netapp2 = netapp.clone();
|
let netapp2 = netapp.clone();
|
||||||
|
|
|
@ -19,7 +19,7 @@ pub use garage_net::message::{
|
||||||
IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
|
IntoReq, Message as Rpc, OrderTag, Req, RequestPriority, Resp, PRIO_BACKGROUND, PRIO_HIGH,
|
||||||
PRIO_NORMAL, PRIO_SECONDARY,
|
PRIO_NORMAL, PRIO_SECONDARY,
|
||||||
};
|
};
|
||||||
use garage_net::peering::fullmesh::FullMeshPeeringStrategy;
|
use garage_net::peering::PeeringManager;
|
||||||
pub use garage_net::{self, NetApp, NodeID};
|
pub use garage_net::{self, NetApp, NodeID};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -90,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
|
||||||
|
|
||||||
struct RpcHelperInner {
|
struct RpcHelperInner {
|
||||||
our_node_id: Uuid,
|
our_node_id: Uuid,
|
||||||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
peering: Arc<PeeringManager>,
|
||||||
ring: watch::Receiver<Arc<Ring>>,
|
ring: watch::Receiver<Arc<Ring>>,
|
||||||
metrics: RpcMetrics,
|
metrics: RpcMetrics,
|
||||||
rpc_timeout: Duration,
|
rpc_timeout: Duration,
|
||||||
|
@ -99,7 +99,7 @@ struct RpcHelperInner {
|
||||||
impl RpcHelper {
|
impl RpcHelper {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
our_node_id: Uuid,
|
our_node_id: Uuid,
|
||||||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
peering: Arc<PeeringManager>,
|
||||||
ring: watch::Receiver<Arc<Ring>>,
|
ring: watch::Receiver<Arc<Ring>>,
|
||||||
rpc_timeout: Option<Duration>,
|
rpc_timeout: Option<Duration>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
|
@ -107,7 +107,7 @@ impl RpcHelper {
|
||||||
|
|
||||||
Self(Arc::new(RpcHelperInner {
|
Self(Arc::new(RpcHelperInner {
|
||||||
our_node_id,
|
our_node_id,
|
||||||
fullmesh,
|
peering,
|
||||||
ring,
|
ring,
|
||||||
metrics,
|
metrics,
|
||||||
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
|
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
|
||||||
|
@ -210,7 +210,7 @@ impl RpcHelper {
|
||||||
{
|
{
|
||||||
let to = self
|
let to = self
|
||||||
.0
|
.0
|
||||||
.fullmesh
|
.peering
|
||||||
.get_peer_list()
|
.get_peer_list()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|p| p.id.into())
|
.map(|p| p.id.into())
|
||||||
|
@ -391,7 +391,7 @@ impl RpcHelper {
|
||||||
|
|
||||||
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
|
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
|
||||||
// Retrieve some status variables that we will use to sort requests
|
// Retrieve some status variables that we will use to sort requests
|
||||||
let peer_list = self.0.fullmesh.get_peer_list();
|
let peer_list = self.0.peering.get_peer_list();
|
||||||
let ring: Arc<Ring> = self.0.ring.borrow().clone();
|
let ring: Arc<Ring> = self.0.ring.borrow().clone();
|
||||||
let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
|
let our_zone = match ring.layout.node_role(&self.0.our_node_id) {
|
||||||
Some(pc) => &pc.zone,
|
Some(pc) => &pc.zone,
|
||||||
|
|
|
@ -18,7 +18,7 @@ use tokio::sync::Mutex;
|
||||||
|
|
||||||
use garage_net::endpoint::{Endpoint, EndpointHandler};
|
use garage_net::endpoint::{Endpoint, EndpointHandler};
|
||||||
use garage_net::message::*;
|
use garage_net::message::*;
|
||||||
use garage_net::peering::fullmesh::FullMeshPeeringStrategy;
|
use garage_net::peering::PeeringManager;
|
||||||
use garage_net::util::parse_and_resolve_peer_addr_async;
|
use garage_net::util::parse_and_resolve_peer_addr_async;
|
||||||
use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
|
use garage_net::{NetApp, NetworkKey, NodeID, NodeKey};
|
||||||
|
|
||||||
|
@ -92,7 +92,7 @@ pub struct System {
|
||||||
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
|
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
|
||||||
|
|
||||||
pub netapp: Arc<NetApp>,
|
pub netapp: Arc<NetApp>,
|
||||||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
peering: Arc<PeeringManager>,
|
||||||
pub rpc: RpcHelper,
|
pub rpc: RpcHelper,
|
||||||
|
|
||||||
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
|
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
|
||||||
|
@ -326,9 +326,9 @@ impl System {
|
||||||
}
|
}
|
||||||
|
|
||||||
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
|
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
|
||||||
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
|
let peering = PeeringManager::new(netapp.clone(), vec![], rpc_public_addr);
|
||||||
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
|
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
|
||||||
fullmesh.set_ping_timeout_millis(ping_timeout);
|
peering.set_ping_timeout_millis(ping_timeout);
|
||||||
}
|
}
|
||||||
|
|
||||||
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
|
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
|
||||||
|
@ -358,10 +358,10 @@ impl System {
|
||||||
local_status: ArcSwap::new(Arc::new(local_status)),
|
local_status: ArcSwap::new(Arc::new(local_status)),
|
||||||
node_status: RwLock::new(HashMap::new()),
|
node_status: RwLock::new(HashMap::new()),
|
||||||
netapp: netapp.clone(),
|
netapp: netapp.clone(),
|
||||||
fullmesh: fullmesh.clone(),
|
peering: peering.clone(),
|
||||||
rpc: RpcHelper::new(
|
rpc: RpcHelper::new(
|
||||||
netapp.id.into(),
|
netapp.id.into(),
|
||||||
fullmesh,
|
peering,
|
||||||
ring.clone(),
|
ring.clone(),
|
||||||
config.rpc_timeout_msec.map(Duration::from_millis),
|
config.rpc_timeout_msec.map(Duration::from_millis),
|
||||||
),
|
),
|
||||||
|
@ -393,7 +393,7 @@ impl System {
|
||||||
self.netapp
|
self.netapp
|
||||||
.clone()
|
.clone()
|
||||||
.listen(self.rpc_listen_addr, None, must_exit.clone()),
|
.listen(self.rpc_listen_addr, None, must_exit.clone()),
|
||||||
self.fullmesh.clone().run(must_exit.clone()),
|
self.peering.clone().run(must_exit.clone()),
|
||||||
self.discovery_loop(must_exit.clone()),
|
self.discovery_loop(must_exit.clone()),
|
||||||
self.status_exchange_loop(must_exit.clone()),
|
self.status_exchange_loop(must_exit.clone()),
|
||||||
);
|
);
|
||||||
|
@ -405,7 +405,7 @@ impl System {
|
||||||
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
|
pub fn get_known_nodes(&self) -> Vec<KnownNodeInfo> {
|
||||||
let node_status = self.node_status.read().unwrap();
|
let node_status = self.node_status.read().unwrap();
|
||||||
let known_nodes = self
|
let known_nodes = self
|
||||||
.fullmesh
|
.peering
|
||||||
.get_peer_list()
|
.get_peer_list()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|n| KnownNodeInfo {
|
.map(|n| KnownNodeInfo {
|
||||||
|
@ -726,10 +726,10 @@ impl System {
|
||||||
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
|
||||||
while !*stop_signal.borrow() {
|
while !*stop_signal.borrow() {
|
||||||
let not_configured = self.ring.borrow().layout.check().is_err();
|
let not_configured = self.ring.borrow().layout.check().is_err();
|
||||||
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
|
let no_peers = self.peering.get_peer_list().len() < self.replication_factor;
|
||||||
let expected_n_nodes = self.ring.borrow().layout.num_nodes();
|
let expected_n_nodes = self.ring.borrow().layout.num_nodes();
|
||||||
let bad_peers = self
|
let bad_peers = self
|
||||||
.fullmesh
|
.peering
|
||||||
.get_peer_list()
|
.get_peer_list()
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|p| p.is_up())
|
.filter(|p| p.is_up())
|
||||||
|
@ -811,7 +811,7 @@ impl System {
|
||||||
// Prepare new peer list to save to file
|
// Prepare new peer list to save to file
|
||||||
// It is a vec of tuples (node ID as Uuid, node SocketAddr)
|
// It is a vec of tuples (node ID as Uuid, node SocketAddr)
|
||||||
let mut peer_list = self
|
let mut peer_list = self
|
||||||
.fullmesh
|
.peering
|
||||||
.get_peer_list()
|
.get_peer_list()
|
||||||
.iter()
|
.iter()
|
||||||
.map(|n| (n.id.into(), n.addr))
|
.map(|n| (n.id.into(), n.addr))
|
||||||
|
|
Loading…
Reference in a new issue