use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};

use arc_swap::ArcSwap;
use async_trait::async_trait;
use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::watch;

use sodiumoxide::crypto::hash;

use crate::endpoint::*;
use crate::error::*;
use crate::netapp::*;
use crate::message::*;
use crate::NodeID;

const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10;
const PING_INTERVAL: Duration = Duration::from_secs(15);
const LOOP_DELAY: Duration = Duration::from_secs(1);
const FAILED_PING_THRESHOLD: usize = 4;

const DEFAULT_PING_TIMEOUT_MILLIS: u64 = 10_000;

// -- Protocol messages --

#[derive(Serialize, Deserialize)]
struct PingMessage {
    pub id: u64,
    pub peer_list_hash: hash::Digest,
}

impl Message for PingMessage {
    type Response = PingMessage;
}

#[derive(Serialize, Deserialize)]
struct PeerListMessage {
    pub list: Vec<(NodeID, SocketAddr)>,
}

impl Message for PeerListMessage {
    type Response = PeerListMessage;
}

// -- Algorithm data structures --

#[derive(Debug)]
struct PeerInfoInternal {
    // known_addrs contains all of the addresses other nodes have given us for this peer
    known_addrs: Vec<SocketAddr>,

    state: PeerConnState,
    last_send_ping: Option<Instant>,
    last_seen: Option<Instant>,
    ping: VecDeque<Duration>,
    failed_pings: usize,
}

impl PeerInfoInternal {
    fn new(state: PeerConnState, known_addr: Option<SocketAddr>) -> Self {
        Self {
            known_addrs: known_addr.map(|x| vec![x]).unwrap_or_default(),
            state,
            last_send_ping: None,
            last_seen: None,
            ping: VecDeque::new(),
            failed_pings: 0,
        }
    }
    fn add_addr(&mut self, addr: SocketAddr) -> bool {
        if !self.known_addrs.contains(&addr) {
            self.known_addrs.push(addr);
            // If we are learning a new address for this node,
            // we want to retry connecting
            self.state = match self.state {
                PeerConnState::Trying(_) => PeerConnState::Trying(0),
                PeerConnState::Waiting(_, _) | PeerConnState::Abandonned => {
                    PeerConnState::Waiting(0, Instant::now())
                }
                x @ (PeerConnState::Ourself | PeerConnState::Connected { .. }) => x,
            };
            true
        } else {
            false
        }
    }
}
/// Information that the full mesh peering strategy can return about the peers it knows of
#[derive(Copy, Clone, Debug)]
pub struct PeerInfo {
    /// The node's identifier (its public key)
    pub id: NodeID,
    /// The current status of our connection to this node
    pub state: PeerConnState,
    /// The last time at which the node was seen
    pub last_seen: Option<Instant>,
    /// The average ping to this node on recent observations (if at least one ping value is known)
    pub avg_ping: Option<Duration>,
    /// The maximum observed ping to this node on recent observations (if at least one
    /// ping value is known)
    pub max_ping: Option<Duration>,
    /// The median ping to this node on recent observations (if at least one ping value
    /// is known)
    pub med_ping: Option<Duration>,
}

impl PeerInfo {
    /// Returns true if we can currently send requests to this peer
    pub fn is_up(&self) -> bool {
        self.state.is_up()
    }
}

/// PeerConnState: the possible states of our connection to a given peer.
/// This enum only records connection info for outgoing TCP connections.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum PeerConnState {
    /// This entry represents ourself (the local node)
    Ourself,

    /// We currently have a connection to this peer
    Connected { addr: SocketAddr },

    /// Our next connection attempt (the nth, where n is the first value of the tuple)
    /// will be made at the given Instant
    Waiting(usize, Instant),

    /// A connection attempt is in progress (the nth, where n is the stored value)
    Trying(usize),

    /// We gave up trying to connect to this peer (too many failed attempts)
    Abandonned,
}

impl PeerConnState {
    /// Returns true if we can currently send requests to this peer
    pub fn is_up(&self) -> bool {
        matches!(self, Self::Ourself | Self::Connected { .. })
    }
}

struct KnownHosts {
    list: HashMap<NodeID, PeerInfoInternal>,
    hash: hash::Digest,
}

impl KnownHosts {
    fn new() -> Self {
        let list = HashMap::new();
        let mut ret = Self {
            list,
            hash: hash::Digest::from_slice(&[0u8; 64][..]).unwrap(),
        };
        ret.update_hash();
        ret
    }
    fn update_hash(&mut self) {
        // The hash is a value that is exchanged between nodes when they ping one
        // another. Nodes compare their known hosts hash to know if they are connected
        // to the same set of nodes. If the hashes differ, they are connected to
        // different nodes and they trigger an exchange of the full list of active
        // connections. The hash value only represents the set of node IDs and not
        // their actual socket addresses, because nodes can be connected via different
        // addresses and that shouldn't necessarily trigger a full peer exchange.
        let mut list = self
            .list
            .iter()
            .filter(|(_, peer)| peer.state.is_up())
            .map(|(id, _)| *id)
            .collect::<Vec<_>>();
        list.sort();
        let mut hash_state = hash::State::new();
        for id in list {
            hash_state.update(&id[..]);
        }
        self.hash = hash_state.finalize();
    }
    fn connected_peers_vec(&self) -> Vec<(NodeID, SocketAddr)> {
        self.list
            .iter()
            .filter_map(|(id, peer)| match peer.state {
                PeerConnState::Connected { addr } => Some((*id, addr)),
                _ => None,
            })
            .collect::<Vec<_>>()
    }
}

/// The "Full Mesh" peering strategy tries to establish and maintain
/// a direct connection with all of the known nodes in the network.
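///
/// A minimal usage sketch, assuming a `netapp: Arc<NetApp>` handle, a `bootstrap`
/// list of `(NodeID, SocketAddr)` pairs and an optional `public_addr` are already
/// available (these names are illustrative, not part of this module):
///
/// ```ignore
/// let peering = PeeringManager::new(netapp.clone(), bootstrap, public_addr);
/// // Drive the peering loop until the watch channel is set to true.
/// let (_quit_tx, quit_rx) = tokio::sync::watch::channel(false);
/// tokio::spawn(peering.clone().run(quit_rx));
/// ```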
pub struct PeeringManager {
    netapp: Arc<NetApp>,

    known_hosts: RwLock<KnownHosts>,
    public_peer_list: ArcSwap<Vec<PeerInfo>>,

    next_ping_id: AtomicU64,
    ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
    peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,

    ping_timeout_millis: AtomicU64,
}

impl PeeringManager {
    /// Create a new Full Mesh peering strategy.
    /// The strategy will not be run until `.run()` is called and awaited.
    /// Once that happens, the peering strategy will try to connect
    /// to all of the nodes specified in the bootstrap list.
    pub fn new(
        netapp: Arc<NetApp>,
        bootstrap_list: Vec<(NodeID, SocketAddr)>,
        our_addr: Option<SocketAddr>,
    ) -> Arc<Self> {
        let mut known_hosts = KnownHosts::new();
        for (id, addr) in bootstrap_list {
            if id != netapp.id {
                known_hosts.list.insert(
                    id,
                    PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr)),
                );
            }
        }

        known_hosts.list.insert(
            netapp.id,
            PeerInfoInternal::new(PeerConnState::Ourself, our_addr),
        );
        known_hosts.update_hash();

        let strat = Arc::new(Self {
            netapp: netapp.clone(),
            known_hosts: RwLock::new(known_hosts),
            public_peer_list: ArcSwap::new(Arc::new(Vec::new())),
            next_ping_id: AtomicU64::new(42),
            ping_endpoint: netapp.endpoint("garage_net/peering.rs/Ping".into()),
            peer_list_endpoint: netapp.endpoint("garage_net/peering.rs/PeerList".into()),
            ping_timeout_millis: DEFAULT_PING_TIMEOUT_MILLIS.into(),
        });

        strat.update_public_peer_list(&strat.known_hosts.read().unwrap());

        strat.ping_endpoint.set_handler(strat.clone());
        strat.peer_list_endpoint.set_handler(strat.clone());

        let strat2 = strat.clone();
        netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
            strat2.on_connected(id, addr, is_incoming);
        });

        let strat2 = strat.clone();
        netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
            strat2.on_disconnected(id, is_incoming);
        });

        strat
    }

    /// Run the full mesh peering strategy.
    /// This future exits when the `must_exit` watch becomes true.
    pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
        while !*must_exit.borrow() {
            // 1. Read current state: get list of connected peers (ping them)
            let (to_ping, to_retry) = {
                let known_hosts = self.known_hosts.read().unwrap();
                trace!("known_hosts: {} peers", known_hosts.list.len());

                let mut to_ping = vec![];
                let mut to_retry = vec![];
                for (id, info) in known_hosts.list.iter() {
                    trace!("{}, {:?}", hex::encode(&id[..8]), info);
                    match info.state {
                        PeerConnState::Connected { .. } => {
                            let must_ping = match info.last_send_ping {
                                None => true,
                                Some(t) => Instant::now() - t > PING_INTERVAL,
                            };
                            if must_ping {
                                to_ping.push(*id);
                            }
                        }
                        PeerConnState::Waiting(_, t) => {
                            if Instant::now() >= t {
                                to_retry.push(*id);
                            }
                        }
                        _ => (),
                    }
                }
                (to_ping, to_retry)
            };

            // 2. Dispatch pings to hosts
            trace!("to_ping: {} peers", to_ping.len());
            if !to_ping.is_empty() {
                let mut known_hosts = self.known_hosts.write().unwrap();
                for id in to_ping.iter() {
                    known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
                }
                drop(known_hosts);
                for id in to_ping {
                    tokio::spawn(self.clone().ping(id));
                }
            }

            // 3. Try reconnects
            trace!("to_retry: {} peers", to_retry.len());
            if !to_retry.is_empty() {
                let mut known_hosts = self.known_hosts.write().unwrap();
                for id in to_retry {
                    if let Some(h) = known_hosts.list.get_mut(&id) {
                        if let PeerConnState::Waiting(i, _) = h.state {
                            info!(
                                "Retrying connection to {} at {} ({})",
                                hex::encode(&id[..8]),
                                h.known_addrs
                                    .iter()
                                    .map(|x| format!("{}", x))
                                    .collect::<Vec<_>>()
                                    .join(", "),
                                i + 1
                            );
                            h.state = PeerConnState::Trying(i);

                            let addresses = h.known_addrs.clone();
                            tokio::spawn(self.clone().try_connect(id, addresses));
                        }
                    }
                }
                self.update_public_peer_list(&known_hosts);
            }

            // 4. Sleep before next loop iteration
            tokio::time::sleep(LOOP_DELAY).await;
        }
    }

    /// Returns a list of currently known peers in the network.
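    ///
    /// A small usage sketch (the `peering` variable is assumed to be an
    /// `Arc<PeeringManager>`):
    ///
    /// ```ignore
    /// // Count the peers we can currently send requests to.
    /// let up_count = peering
    ///     .get_peer_list()
    ///     .iter()
    ///     .filter(|p| p.is_up())
    ///     .count();
    /// ```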
    pub fn get_peer_list(&self) -> Arc<Vec<PeerInfo>> {
        self.public_peer_list.load_full()
    }

    /// Set the timeout for ping messages, in milliseconds
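    ///
    /// For example (the `peering` variable is assumed to be an `Arc<PeeringManager>`);
    /// a ping that receives no response within this delay counts as failed:
    ///
    /// ```ignore
    /// // Consider a ping failed after 2 seconds instead of the 10-second default.
    /// peering.set_ping_timeout_millis(2_000);
    /// ```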
    pub fn set_ping_timeout_millis(&self, timeout: u64) {
        self.ping_timeout_millis
            .store(timeout, atomic::Ordering::Relaxed);
    }

    // -- internal stuff --

    // Recompute the publicly visible peer list (with ping statistics)
    // from the internal known_hosts state.
    fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
        let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
        for (id, info) in known_hosts.list.iter() {
            if *id == self.netapp.id {
                // sanity check
                assert!(matches!(info.state, PeerConnState::Ourself));
            }
            let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
            pings.sort();
            if !pings.is_empty() {
                pub_peer_list.push(PeerInfo {
                    id: *id,
                    state: info.state,
                    last_seen: info.last_seen,
                    avg_ping: Some(pings.iter().sum::<Duration>().div_f64(pings.len() as f64)),
                    max_ping: pings.last().cloned(),
                    med_ping: Some(pings[pings.len() / 2]),
                });
            } else {
                pub_peer_list.push(PeerInfo {
                    id: *id,
                    state: info.state,
                    last_seen: info.last_seen,
                    avg_ping: None,
                    max_ping: None,
                    med_ping: None,
                });
            }
        }
        self.public_peer_list.store(Arc::new(pub_peer_list));
    }

    // Send a single ping to the given peer, with a timeout.
    // On success, update its statistics; after too many consecutive failures,
    // close the connection. A mismatch in peer list hashes triggers a peer exchange.
    async fn ping(self: Arc<Self>, id: NodeID) {
        let peer_list_hash = self.known_hosts.read().unwrap().hash;
        let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
        let ping_time = Instant::now();
        let ping_timeout =
            Duration::from_millis(self.ping_timeout_millis.load(atomic::Ordering::Relaxed));
        let ping_msg = PingMessage {
            id: ping_id,
            peer_list_hash,
        };

        debug!(
            "Sending ping {} to {} at {:?}",
            ping_id,
            hex::encode(&id[..8]),
            ping_time
        );
        let ping_response = select! {
            r = self.ping_endpoint.call(&id, ping_msg, PRIO_HIGH) => r,
            _ = tokio::time::sleep(ping_timeout) => Err(Error::Message("Ping timeout".into())),
        };

        match ping_response {
            Err(e) => {
                warn!("Error pinging {}: {}", hex::encode(&id[..8]), e);
                let mut known_hosts = self.known_hosts.write().unwrap();
                if let Some(host) = known_hosts.list.get_mut(&id) {
                    host.failed_pings += 1;
                    if host.failed_pings > FAILED_PING_THRESHOLD {
                        warn!(
                            "Too many failed pings from {}, closing connection.",
                            hex::encode(&id[..8])
                        );
                        // this will later update info in known_hosts
                        // through the disconnection handler
                        self.netapp.disconnect(&id);
                    }
                }
            }
            Ok(ping_resp) => {
                let resp_time = Instant::now();
                debug!(
                    "Got ping response from {} at {:?}",
                    hex::encode(&id[..8]),
                    resp_time
                );
                {
                    let mut known_hosts = self.known_hosts.write().unwrap();
                    if let Some(host) = known_hosts.list.get_mut(&id) {
                        host.failed_pings = 0;
                        host.last_seen = Some(resp_time);
                        host.ping.push_back(resp_time - ping_time);
                        while host.ping.len() > 10 {
                            host.ping.pop_front();
                        }
                        self.update_public_peer_list(&known_hosts);
                    }
                }
                if ping_resp.peer_list_hash != peer_list_hash {
                    self.exchange_peers(&id).await;
                }
            }
        }
    }

    // Send our list of connected peers to the given peer
    // and merge the list it sends back in return.
    async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
        let peer_list = self.known_hosts.read().unwrap().connected_peers_vec();
        let pex_message = PeerListMessage { list: peer_list };
        match self
            .peer_list_endpoint
            .call(id, pex_message, PRIO_BACKGROUND)
            .await
        {
            Err(e) => warn!("Error doing peer exchange: {}", e),
            Ok(resp) => {
                self.handle_peer_list(&resp.list[..]);
            }
        }
    }

    // Merge a received peer list into known_hosts,
    // adding previously unknown peers and addresses.
    fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
        let mut known_hosts = self.known_hosts.write().unwrap();

        let mut changed = false;
        for (id, addr) in list.iter() {
            if let Some(kh) = known_hosts.list.get_mut(id) {
                if kh.add_addr(*addr) {
                    changed = true;
                }
            } else {
                known_hosts.list.insert(*id, self.new_peer(id, *addr));
                changed = true;
            }
        }

        if changed {
            known_hosts.update_hash();
            self.update_public_peer_list(&known_hosts);
        }
    }

    // Try to connect to a peer at each of its known addresses in turn;
    // on failure, schedule a retry after CONN_RETRY_INTERVAL,
    // or abandon the peer after CONN_MAX_RETRIES attempts.
    async fn try_connect(self: Arc<Self>, id: NodeID, addresses: Vec<SocketAddr>) {
        let conn_addr = {
            let mut ret = None;
            for addr in addresses.iter() {
                debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
                match self.netapp.clone().try_connect(*addr, id).await {
                    Ok(()) => {
                        ret = Some(*addr);
                        break;
                    }
                    Err(e) => {
                        debug!(
                            "Error connecting to {} at {}: {}",
                            hex::encode(&id[..8]),
                            addr,
                            e
                        );
                    }
                }
            }
            ret
        };

        if let Some(ok_addr) = conn_addr {
            self.on_connected(id, ok_addr, false);
        } else {
            warn!(
                "Could not connect to peer {} ({} addresses tried)",
                hex::encode(&id[..8]),
                addresses.len()
            );
            let mut known_hosts = self.known_hosts.write().unwrap();
            if let Some(host) = known_hosts.list.get_mut(&id) {
                host.state = match host.state {
                    PeerConnState::Trying(i) => {
                        if i >= CONN_MAX_RETRIES {
                            PeerConnState::Abandonned
                        } else {
                            PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL)
                        }
                    }
                    _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
                };
                self.update_public_peer_list(&known_hosts);
            }
        }
    }

    // Update peer state when a connection is established
    // (called from the NetApp on_connected callback and from try_connect).
    fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
        if id == self.netapp.id {
            // sanity check
            panic!(
                "on_connected from local node, id={:?}, addr={}, incoming={}",
                id, addr, is_incoming
            );
        }

        let mut known_hosts = self.known_hosts.write().unwrap();
        if is_incoming {
            if let Some(host) = known_hosts.list.get_mut(&id) {
                host.add_addr(addr);
            } else {
                known_hosts.list.insert(id, self.new_peer(&id, addr));
            }
        } else {
            info!(
                "Successfully connected to {} at {}",
                hex::encode(&id[..8]),
                addr
            );
            if let Some(host) = known_hosts.list.get_mut(&id) {
                host.state = PeerConnState::Connected { addr };
                host.add_addr(addr);
            } else {
                known_hosts.list.insert(
                    id,
                    PeerInfoInternal::new(PeerConnState::Connected { addr }, Some(addr)),
                );
            }
        }
        known_hosts.update_hash();
        self.update_public_peer_list(&known_hosts);
    }

    // Update peer state when a connection is closed
    // (registered as the NetApp on_disconnected callback).
    fn on_disconnected(self: &Arc<Self>, id: NodeID, is_incoming: bool) {
        if !is_incoming {
            info!("Connection to {} was closed", hex::encode(&id[..8]));
            let mut known_hosts = self.known_hosts.write().unwrap();
            if let Some(host) = known_hosts.list.get_mut(&id) {
                host.state = PeerConnState::Waiting(0, Instant::now());
                known_hosts.update_hash();
                self.update_public_peer_list(&known_hosts);
            }
        }
    }

    // Create a new internal peer entry, initially in the Waiting state.
    fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
        assert!(*id != self.netapp.id);
        PeerInfoInternal::new(PeerConnState::Waiting(0, Instant::now()), Some(addr))
    }
}

#[async_trait]
impl EndpointHandler<PingMessage> for PeeringManager {
    async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
        let ping_resp = PingMessage {
            id: ping.id,
            peer_list_hash: self.known_hosts.read().unwrap().hash,
        };
        debug!("Ping from {}", hex::encode(&from[..8]));
        ping_resp
    }
}

#[async_trait]
impl EndpointHandler<PeerListMessage> for PeeringManager {
    async fn handle(
        self: &Arc<Self>,
        peer_list: &PeerListMessage,
        _from: NodeID,
    ) -> PeerListMessage {
        self.handle_peer_list(&peer_list.list[..]);
        let peer_list = self.known_hosts.read().unwrap().connected_peers_vec();
        PeerListMessage { list: peer_list }
    }
}