This commit is contained in:
parent
baa714538d
commit
01a2737bd8
4 changed files with 141 additions and 62 deletions
|
@ -12,13 +12,15 @@ use crate::proto::*;
|
||||||
use crate::util::*;
|
use crate::util::*;
|
||||||
|
|
||||||
/// This trait should be implemented by all messages your application
|
/// This trait should be implemented by all messages your application
|
||||||
/// wants to handle (click to read more).
|
/// wants to handle
|
||||||
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
|
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
|
||||||
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
|
/// This trait should be implemented by an object of your application
|
||||||
|
/// that can handle a message of type `M`.
|
||||||
|
///
|
||||||
|
/// The handler object should be in an Arc, see `Endpoint::set_handler`
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait EndpointHandler<M>: Send + Sync
|
pub trait EndpointHandler<M>: Send + Sync
|
||||||
where
|
where
|
||||||
|
@ -27,6 +29,27 @@ where
|
||||||
async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response;
|
async fn handle(self: &Arc<Self>, m: M, from: NodeID) -> M::Response;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// If one simply wants to use an endpoint in a client fashion,
|
||||||
|
/// without locally serving requests to that endpoint,
|
||||||
|
/// use the unit type `()` as the handler type:
|
||||||
|
/// it will panic if it is ever made to handle request.
|
||||||
|
#[async_trait]
|
||||||
|
impl<M: Message + 'static> EndpointHandler<M> for () {
|
||||||
|
async fn handle(self: &Arc<()>, _m: M, _from: NodeID) -> M::Response {
|
||||||
|
panic!("This endpoint should not have a local handler.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// This struct represents an endpoint for message of type `M`.
|
||||||
|
///
|
||||||
|
/// Creating a new endpoint is done by calling `NetApp::endpoint`.
|
||||||
|
/// An endpoint is identified primarily by its path, which is specified
|
||||||
|
/// at creation time.
|
||||||
|
///
|
||||||
|
/// An `Endpoint` is used both to send requests to remote nodes,
|
||||||
|
/// and to specify the handler for such requests on the local node.
|
||||||
|
/// The type `H` represents the type of the handler object for
|
||||||
|
/// endpoint messages (see `EndpointHandler`).
|
||||||
pub struct Endpoint<M, H>
|
pub struct Endpoint<M, H>
|
||||||
where
|
where
|
||||||
M: Message,
|
M: Message,
|
||||||
|
@ -51,9 +74,15 @@ where
|
||||||
handler: ArcSwapOption::from(None),
|
handler: ArcSwapOption::from(None),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the object that is responsible of handling requests to
|
||||||
|
/// this endpoint on the local node.
|
||||||
pub fn set_handler(&self, h: Arc<H>) {
|
pub fn set_handler(&self, h: Arc<H>) {
|
||||||
self.handler.swap(Some(h));
|
self.handler.swap(Some(h));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Call this endpoint on a remote node (or on the local node,
|
||||||
|
/// for that matter)
|
||||||
pub async fn call(
|
pub async fn call(
|
||||||
&self,
|
&self,
|
||||||
target: &NodeID,
|
target: &NodeID,
|
||||||
|
@ -84,6 +113,10 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- Internal stuff ----
|
||||||
|
|
||||||
|
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub(crate) trait GenericEndpoint {
|
pub(crate) trait GenericEndpoint {
|
||||||
async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>;
|
async fn handle(&self, buf: &[u8], from: NodeID) -> Result<Vec<u8>, Error>;
|
||||||
|
|
|
@ -125,21 +125,30 @@ impl NetApp {
|
||||||
.store(Some(Arc::new(Box::new(handler))));
|
.store(Some(Arc::new(Box::new(handler))));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn endpoint<M, H>(self: &Arc<Self>, name: String) -> Arc<Endpoint<M, H>>
|
/// Create a new endpoint with path `path`,
|
||||||
|
/// that handles messages of type `M`.
|
||||||
|
/// `H` is the type of the object that should handle requests
|
||||||
|
/// to this endpoint on the local node. If you don't want
|
||||||
|
/// to handle request on the local node (e.g. if this node
|
||||||
|
/// is only a client in the network), define the type `H`
|
||||||
|
/// to be `()`.
|
||||||
|
/// This function will panic if the endpoint has already been
|
||||||
|
/// created.
|
||||||
|
pub fn endpoint<M, H>(self: &Arc<Self>, path: String) -> Arc<Endpoint<M, H>>
|
||||||
where
|
where
|
||||||
M: Message + 'static,
|
M: Message + 'static,
|
||||||
H: EndpointHandler<M> + 'static,
|
H: EndpointHandler<M> + 'static,
|
||||||
{
|
{
|
||||||
let endpoint = Arc::new(Endpoint::<M, H>::new(self.clone(), name.clone()));
|
let endpoint = Arc::new(Endpoint::<M, H>::new(self.clone(), path.clone()));
|
||||||
let endpoint_arc = EndpointArc(endpoint.clone());
|
let endpoint_arc = EndpointArc(endpoint.clone());
|
||||||
if self
|
if self
|
||||||
.endpoints
|
.endpoints
|
||||||
.write()
|
.write()
|
||||||
.unwrap()
|
.unwrap()
|
||||||
.insert(name.clone(), Box::new(endpoint_arc))
|
.insert(path.clone(), Box::new(endpoint_arc))
|
||||||
.is_some()
|
.is_some()
|
||||||
{
|
{
|
||||||
panic!("Redefining endpoint: {}", name);
|
panic!("Redefining endpoint: {}", path);
|
||||||
};
|
};
|
||||||
endpoint
|
endpoint
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,6 +4,7 @@ use std::sync::atomic::{self, AtomicU64};
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use arc_swap::ArcSwap;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::{debug, info, trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -46,7 +47,7 @@ impl Message for PeerListMessage {
|
||||||
// -- Algorithm data structures --
|
// -- Algorithm data structures --
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
struct PeerInfo {
|
struct PeerInfoInternal {
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
state: PeerConnState,
|
state: PeerConnState,
|
||||||
last_seen: Option<Instant>,
|
last_seen: Option<Instant>,
|
||||||
|
@ -54,40 +55,49 @@ struct PeerInfo {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub struct PeerInfoPub {
|
pub struct PeerInfo {
|
||||||
|
/// The node's identifier (its public key)
|
||||||
pub id: NodeID,
|
pub id: NodeID,
|
||||||
|
/// The node's network address
|
||||||
pub addr: SocketAddr,
|
pub addr: SocketAddr,
|
||||||
|
/// The current status of our connection to this node
|
||||||
pub state: PeerConnState,
|
pub state: PeerConnState,
|
||||||
|
/// The last time at which the node was seen
|
||||||
pub last_seen: Option<Instant>,
|
pub last_seen: Option<Instant>,
|
||||||
|
/// The average ping to this node on recent observations (if at least one ping value is known)
|
||||||
pub avg_ping: Option<Duration>,
|
pub avg_ping: Option<Duration>,
|
||||||
|
/// The maximum observed ping to this node on recent observations (if at least one
|
||||||
|
/// ping value is known)
|
||||||
pub max_ping: Option<Duration>,
|
pub max_ping: Option<Duration>,
|
||||||
|
/// The median ping to this node on recent observations (if at least one ping value
|
||||||
|
/// is known)
|
||||||
pub med_ping: Option<Duration>,
|
pub med_ping: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
// PeerConnState: possible states for our tentative connections to given peer
|
/// PeerConnState: possible states for our tentative connections to given peer
|
||||||
// This module is only interested in recording connection info for outgoing
|
/// This structure is only interested in recording connection info for outgoing
|
||||||
// TCP connections
|
/// TCP connections
|
||||||
#[derive(Copy, Clone, Debug, PartialEq)]
|
#[derive(Copy, Clone, Debug, PartialEq)]
|
||||||
pub enum PeerConnState {
|
pub enum PeerConnState {
|
||||||
// This entry represents ourself
|
/// This entry represents ourself (the local node)
|
||||||
Ourself,
|
Ourself,
|
||||||
|
|
||||||
// We currently have a connection to this peer
|
/// We currently have a connection to this peer
|
||||||
Connected,
|
Connected,
|
||||||
|
|
||||||
// Our next connection tentative (the nth, where n is the first value)
|
/// Our next connection tentative (the nth, where n is the first value of the tuple)
|
||||||
// will be at given Instant
|
/// will be at given Instant
|
||||||
Waiting(usize, Instant),
|
Waiting(usize, Instant),
|
||||||
|
|
||||||
// A connection tentative is in progress
|
/// A connection tentative is in progress (the nth, where n is the value stored)
|
||||||
Trying(usize),
|
Trying(usize),
|
||||||
|
|
||||||
// We abandonned trying to connect to this peer (too many failed attempts)
|
/// We abandonned trying to connect to this peer (too many failed attempts)
|
||||||
Abandonned,
|
Abandonned,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct KnownHosts {
|
struct KnownHosts {
|
||||||
list: HashMap<NodeID, PeerInfo>,
|
list: HashMap<NodeID, PeerInfoInternal>,
|
||||||
hash: hash::Digest,
|
hash: hash::Digest,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -100,7 +110,7 @@ impl KnownHosts {
|
||||||
fn update_hash(&mut self) {
|
fn update_hash(&mut self) {
|
||||||
self.hash = Self::calculate_hash(&self.list);
|
self.hash = Self::calculate_hash(&self.list);
|
||||||
}
|
}
|
||||||
fn map_into_vec(input: &HashMap<NodeID, PeerInfo>) -> Vec<(NodeID, SocketAddr)> {
|
fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> {
|
||||||
let mut list = Vec::with_capacity(input.len());
|
let mut list = Vec::with_capacity(input.len());
|
||||||
for (id, peer) in input.iter() {
|
for (id, peer) in input.iter() {
|
||||||
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
|
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
|
||||||
|
@ -109,35 +119,43 @@ impl KnownHosts {
|
||||||
}
|
}
|
||||||
list
|
list
|
||||||
}
|
}
|
||||||
fn calculate_hash(input: &HashMap<NodeID, PeerInfo>) -> hash::Digest {
|
fn calculate_hash(input: &HashMap<NodeID, PeerInfoInternal>) -> hash::Digest {
|
||||||
let mut list = Self::map_into_vec(input);
|
let mut list = Self::map_into_vec(input);
|
||||||
list.sort();
|
list.sort();
|
||||||
let mut hash_state = hash::State::new();
|
let mut hash_state = hash::State::new();
|
||||||
for (id, addr) in list {
|
for (id, addr) in list {
|
||||||
hash_state.update(&id[..]);
|
hash_state.update(&id[..]);
|
||||||
hash_state.update(&format!("{}", addr).into_bytes()[..]);
|
hash_state.update(&format!("{}\n", addr).into_bytes()[..]);
|
||||||
}
|
}
|
||||||
hash_state.finalize()
|
hash_state.finalize()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// A "Full Mesh" peering strategy is a peering strategy that tries
|
||||||
|
/// to establish and maintain a direct connection with all of the
|
||||||
|
/// known nodes in the network.
|
||||||
pub struct FullMeshPeeringStrategy {
|
pub struct FullMeshPeeringStrategy {
|
||||||
netapp: Arc<NetApp>,
|
netapp: Arc<NetApp>,
|
||||||
known_hosts: RwLock<KnownHosts>,
|
known_hosts: RwLock<KnownHosts>,
|
||||||
next_ping_id: AtomicU64,
|
public_peer_list: ArcSwap<Vec<PeerInfo>>,
|
||||||
|
|
||||||
|
next_ping_id: AtomicU64,
|
||||||
ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
|
ping_endpoint: Arc<Endpoint<PingMessage, Self>>,
|
||||||
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
|
peer_list_endpoint: Arc<Endpoint<PeerListMessage, Self>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl FullMeshPeeringStrategy {
|
impl FullMeshPeeringStrategy {
|
||||||
|
/// Create a new Full Mesh peering strategy.
|
||||||
|
/// The strategy will not be run until `.run()` is called and awaited.
|
||||||
|
/// Once that happens, the peering strategy will try to connect
|
||||||
|
/// to all of the nodes specified in the bootstrap list.
|
||||||
pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> {
|
pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> {
|
||||||
let mut known_hosts = KnownHosts::new();
|
let mut known_hosts = KnownHosts::new();
|
||||||
for (id, addr) in bootstrap_list {
|
for (id, addr) in bootstrap_list {
|
||||||
if id != netapp.id {
|
if id != netapp.id {
|
||||||
known_hosts.list.insert(
|
known_hosts.list.insert(
|
||||||
id,
|
id,
|
||||||
PeerInfo {
|
PeerInfoInternal {
|
||||||
addr,
|
addr,
|
||||||
state: PeerConnState::Waiting(0, Instant::now()),
|
state: PeerConnState::Waiting(0, Instant::now()),
|
||||||
last_seen: None,
|
last_seen: None,
|
||||||
|
@ -150,6 +168,7 @@ impl FullMeshPeeringStrategy {
|
||||||
let strat = Arc::new(Self {
|
let strat = Arc::new(Self {
|
||||||
netapp: netapp.clone(),
|
netapp: netapp.clone(),
|
||||||
known_hosts: RwLock::new(known_hosts),
|
known_hosts: RwLock::new(known_hosts),
|
||||||
|
public_peer_list: ArcSwap::new(Arc::new(Vec::new())),
|
||||||
next_ping_id: AtomicU64::new(42),
|
next_ping_id: AtomicU64::new(42),
|
||||||
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
|
ping_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/Ping".into()),
|
||||||
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
|
peer_list_endpoint: netapp.endpoint("__netapp/peering/fullmesh.rs/PeerList".into()),
|
||||||
|
@ -173,6 +192,8 @@ impl FullMeshPeeringStrategy {
|
||||||
strat
|
strat
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Run the full mesh peering strategy.
|
||||||
|
/// This future exits when the `must_exit` watch becomes true.
|
||||||
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
// 1. Read current state: get list of connected peers (ping them)
|
// 1. Read current state: get list of connected peers (ping them)
|
||||||
|
@ -229,6 +250,7 @@ impl FullMeshPeeringStrategy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
|
|
||||||
// 4. Sleep before next loop iteration
|
// 4. Sleep before next loop iteration
|
||||||
|
@ -236,6 +258,48 @@ impl FullMeshPeeringStrategy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns a list of currently known peers in the network.
|
||||||
|
pub fn get_peer_list(&self) -> Arc<Vec<PeerInfo>> {
|
||||||
|
self.public_peer_list.load_full()
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- internal stuff --
|
||||||
|
|
||||||
|
fn update_public_peer_list(&self, known_hosts: &KnownHosts) {
|
||||||
|
let mut pub_peer_list = Vec::with_capacity(known_hosts.list.len());
|
||||||
|
for (id, info) in known_hosts.list.iter() {
|
||||||
|
let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
|
||||||
|
pings.sort();
|
||||||
|
if !pings.is_empty() {
|
||||||
|
pub_peer_list.push(PeerInfo {
|
||||||
|
id: *id,
|
||||||
|
addr: info.addr,
|
||||||
|
state: info.state,
|
||||||
|
last_seen: info.last_seen,
|
||||||
|
avg_ping: Some(
|
||||||
|
pings
|
||||||
|
.iter()
|
||||||
|
.fold(Duration::from_secs(0), |x, y| x + *y)
|
||||||
|
.div_f64(pings.len() as f64),
|
||||||
|
),
|
||||||
|
max_ping: pings.last().cloned(),
|
||||||
|
med_ping: Some(pings[pings.len() / 2]),
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
pub_peer_list.push(PeerInfo {
|
||||||
|
id: *id,
|
||||||
|
addr: info.addr,
|
||||||
|
state: info.state,
|
||||||
|
last_seen: info.last_seen,
|
||||||
|
avg_ping: None,
|
||||||
|
max_ping: None,
|
||||||
|
med_ping: None,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.public_peer_list.store(Arc::new(pub_peer_list));
|
||||||
|
}
|
||||||
|
|
||||||
async fn ping(self: Arc<Self>, id: NodeID) {
|
async fn ping(self: Arc<Self>, id: NodeID) {
|
||||||
let peer_list_hash = self.known_hosts.read().unwrap().hash;
|
let peer_list_hash = self.known_hosts.read().unwrap().hash;
|
||||||
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
|
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
|
||||||
|
@ -268,6 +332,7 @@ impl FullMeshPeeringStrategy {
|
||||||
while host.ping.len() > 10 {
|
while host.ping.len() > 10 {
|
||||||
host.ping.pop_front();
|
host.ping.pop_front();
|
||||||
}
|
}
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if ping_resp.peer_list_hash != peer_list_hash {
|
if ping_resp.peer_list_hash != peer_list_hash {
|
||||||
|
@ -299,6 +364,7 @@ impl FullMeshPeeringStrategy {
|
||||||
known_hosts.list.insert(*id, self.new_peer(id, *addr));
|
known_hosts.list.insert(*id, self.new_peer(id, *addr));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
|
async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
|
||||||
|
@ -317,6 +383,7 @@ impl FullMeshPeeringStrategy {
|
||||||
}
|
}
|
||||||
_ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
|
_ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL),
|
||||||
};
|
};
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -336,6 +403,7 @@ impl FullMeshPeeringStrategy {
|
||||||
if let Some(host) = known_hosts.list.get_mut(&id) {
|
if let Some(host) = known_hosts.list.get_mut(&id) {
|
||||||
host.state = PeerConnState::Connected;
|
host.state = PeerConnState::Connected;
|
||||||
known_hosts.update_hash();
|
known_hosts.update_hash();
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -347,53 +415,18 @@ impl FullMeshPeeringStrategy {
|
||||||
if let Some(host) = known_hosts.list.get_mut(&id) {
|
if let Some(host) = known_hosts.list.get_mut(&id) {
|
||||||
host.state = PeerConnState::Waiting(0, Instant::now());
|
host.state = PeerConnState::Waiting(0, Instant::now());
|
||||||
known_hosts.update_hash();
|
known_hosts.update_hash();
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn get_peer_list(&self) -> Vec<PeerInfoPub> {
|
fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
|
||||||
let known_hosts = self.known_hosts.read().unwrap();
|
|
||||||
let mut ret = Vec::with_capacity(known_hosts.list.len());
|
|
||||||
for (id, info) in known_hosts.list.iter() {
|
|
||||||
let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
|
|
||||||
pings.sort();
|
|
||||||
if !pings.is_empty() {
|
|
||||||
ret.push(PeerInfoPub {
|
|
||||||
id: *id,
|
|
||||||
addr: info.addr,
|
|
||||||
state: info.state,
|
|
||||||
last_seen: info.last_seen,
|
|
||||||
avg_ping: Some(
|
|
||||||
pings
|
|
||||||
.iter()
|
|
||||||
.fold(Duration::from_secs(0), |x, y| x + *y)
|
|
||||||
.div_f64(pings.len() as f64),
|
|
||||||
),
|
|
||||||
max_ping: pings.last().cloned(),
|
|
||||||
med_ping: Some(pings[pings.len() / 2]),
|
|
||||||
});
|
|
||||||
} else {
|
|
||||||
ret.push(PeerInfoPub {
|
|
||||||
id: *id,
|
|
||||||
addr: info.addr,
|
|
||||||
state: info.state,
|
|
||||||
last_seen: info.last_seen,
|
|
||||||
avg_ping: None,
|
|
||||||
max_ping: None,
|
|
||||||
med_ping: None,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
|
|
||||||
fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo {
|
|
||||||
let state = if *id == self.netapp.id {
|
let state = if *id == self.netapp.id {
|
||||||
PeerConnState::Ourself
|
PeerConnState::Ourself
|
||||||
} else {
|
} else {
|
||||||
PeerConnState::Waiting(0, Instant::now())
|
PeerConnState::Waiting(0, Instant::now())
|
||||||
};
|
};
|
||||||
PeerInfo {
|
PeerInfoInternal {
|
||||||
addr,
|
addr,
|
||||||
state,
|
state,
|
||||||
last_seen: None,
|
last_seen: None,
|
||||||
|
|
|
@ -4,6 +4,7 @@ use log::info;
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
/// A node's identifier, which is also its public cryptographic key
|
||||||
pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey;
|
pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey;
|
||||||
|
|
||||||
/// Utility function: encodes any serializable value in MessagePack binary format
|
/// Utility function: encodes any serializable value in MessagePack binary format
|
||||||
|
@ -25,6 +26,7 @@ where
|
||||||
|
|
||||||
/// This async function returns only when a true signal was received
|
/// This async function returns only when a true signal was received
|
||||||
/// from a watcher that tells us when to exit.
|
/// from a watcher that tells us when to exit.
|
||||||
|
///
|
||||||
/// Usefull in a select statement to interrupt another
|
/// Usefull in a select statement to interrupt another
|
||||||
/// future:
|
/// future:
|
||||||
/// ```ignore
|
/// ```ignore
|
||||||
|
@ -41,6 +43,8 @@ pub async fn await_exit(mut must_exit: watch::Receiver<bool>) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Creates a watch that contains `false`, and that changes
|
||||||
|
/// to `true` when a Ctrl+C signal is received.
|
||||||
pub fn watch_ctrl_c() -> watch::Receiver<bool> {
|
pub fn watch_ctrl_c() -> watch::Receiver<bool> {
|
||||||
let (send_cancel, watch_cancel) = watch::channel(false);
|
let (send_cancel, watch_cancel) = watch::channel(false);
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
|
|
Loading…
Reference in a new issue