diff --git a/src/membership.rs b/src/membership.rs index b025c2bb..8b067686 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -4,7 +4,6 @@ use std::time::Duration; use std::net::{IpAddr, SocketAddr}; use futures::future::join_all; -use futures::stream::StreamExt; use hyper::client::Client; use tokio::sync::RwLock; use sha2::{Sha256, Digest}; @@ -37,15 +36,24 @@ pub struct Members { impl Members { fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { - self.status.insert(info.id.clone(), + let addr = SocketAddr::new(ip, info.rpc_port); + let old_status = self.status.insert(info.id.clone(), NodeStatus{ - addr: SocketAddr::new(ip, info.rpc_port), + addr: addr.clone(), remaining_ping_attempts: MAX_FAILED_PINGS, - }).is_none() + }); + match old_status { + None => { + eprintln!("Discovered new node (ping): {}", hex::encode(info.id)); + true + } + Some(x) => x.addr != addr, + } } fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool { if !self.status.contains_key(id) { + eprintln!("Discovered new node (advertisment): {}", hex::encode(id)); self.status.insert(id.clone(), NodeStatus{ addr: addr.clone(), @@ -104,7 +112,7 @@ impl System { pub async fn broadcast(self: Arc, msg: Message, timeout: Duration) { let members = self.members.read().await; - let to = members.status.keys().cloned().collect::>(); + let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::>(); drop(members); rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await; } @@ -140,7 +148,9 @@ impl System { { let mut members = self.members.write().await; let is_new = members.handle_ping(from.ip(), ping); - members.recalculate_status_hash(); + if is_new { + members.recalculate_status_hash(); + } let status_hash = members.status_hash.clone(); let config_version = members.config.version; drop(members); @@ -187,6 +197,7 @@ impl System { propagate.push(node.clone()); } } + drop(members); if propagate.len() > 0 { tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); @@ -214,6 +225,7 @@ impl System { let members = self.members.read().await; let ping_addrs = members.status.iter() + .filter(|(id, _)| **id != self.id) .map(|(id, status)| (id.clone(), status.addr.clone())) .collect::>(); drop(members); @@ -257,13 +269,15 @@ impl System { } } - pub async fn pull_status(self: Arc, peer: UUID) { - let resp = rpc_call(self.clone(), - &peer, - &Message::PullStatus, - PING_TIMEOUT).await; - if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { - self.handle_advertise_nodes_up(&nodes).await; + pub fn pull_status(self: Arc, peer: UUID) -> impl futures::future::Future + Send + 'static { + async move { + let resp = rpc_call(self.clone(), + &peer, + &Message::PullStatus, + PING_TIMEOUT).await; + if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { + let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await; + } } } @@ -273,7 +287,7 @@ impl System { &Message::PullConfig, PING_TIMEOUT).await; if let Ok(Message::AdvertiseConfig(config)) = resp { - self.handle_advertise_config(&config).await; + let _: Result<_, _> = self.handle_advertise_config(&config).await; } } } diff --git a/src/rpc.rs b/src/rpc.rs index d8bb08a8..5f25dafb 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -24,31 +24,10 @@ pub async fn rpc_call_many(sys: Arc, timeout: Duration) -> Vec> { - let resp_stream = to.iter() + let mut resp_stream = to.iter() .map(|to| rpc_call(sys.clone(), to, msg, timeout)) .collect::>(); - collect_rpc_results(resp_stream, stop_after).await -} - -pub async fn rpc_call_many_addr(sys: Arc, - to: &[SocketAddr], - msg: &Message, - stop_after: Option, - timeout: Duration) - -> Vec> -{ - let resp_stream = to.iter() - .map(|to| rpc_call_addr(sys.clone(), to, msg, timeout)) - .collect::>(); - - collect_rpc_results(resp_stream, stop_after).await -} - -async fn collect_rpc_results(mut resp_stream: FuturesUnordered>>, - stop_after: Option) - -> Vec> -{ let mut results = vec![]; let mut n_ok = 0; while let Some(resp) = resp_stream.next().await {