Seems to be fixed
This commit is contained in:
parent
87f2b4d2fc
commit
a09f019cc5
2 changed files with 29 additions and 36 deletions
|
@ -4,7 +4,6 @@ use std::time::Duration;
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use futures::stream::StreamExt;
|
|
||||||
use hyper::client::Client;
|
use hyper::client::Client;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::RwLock;
|
||||||
use sha2::{Sha256, Digest};
|
use sha2::{Sha256, Digest};
|
||||||
|
@ -37,15 +36,24 @@ pub struct Members {
|
||||||
|
|
||||||
impl Members {
|
impl Members {
|
||||||
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
|
||||||
self.status.insert(info.id.clone(),
|
let addr = SocketAddr::new(ip, info.rpc_port);
|
||||||
|
let old_status = self.status.insert(info.id.clone(),
|
||||||
NodeStatus{
|
NodeStatus{
|
||||||
addr: SocketAddr::new(ip, info.rpc_port),
|
addr: addr.clone(),
|
||||||
remaining_ping_attempts: MAX_FAILED_PINGS,
|
remaining_ping_attempts: MAX_FAILED_PINGS,
|
||||||
}).is_none()
|
});
|
||||||
|
match old_status {
|
||||||
|
None => {
|
||||||
|
eprintln!("Discovered new node (ping): {}", hex::encode(info.id));
|
||||||
|
true
|
||||||
|
}
|
||||||
|
Some(x) => x.addr != addr,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
|
fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
|
||||||
if !self.status.contains_key(id) {
|
if !self.status.contains_key(id) {
|
||||||
|
eprintln!("Discovered new node (advertisment): {}", hex::encode(id));
|
||||||
self.status.insert(id.clone(),
|
self.status.insert(id.clone(),
|
||||||
NodeStatus{
|
NodeStatus{
|
||||||
addr: addr.clone(),
|
addr: addr.clone(),
|
||||||
|
@ -104,7 +112,7 @@ impl System {
|
||||||
|
|
||||||
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
|
||||||
let members = self.members.read().await;
|
let members = self.members.read().await;
|
||||||
let to = members.status.keys().cloned().collect::<Vec<_>>();
|
let to = members.status.keys().filter(|x| **x != self.id).cloned().collect::<Vec<_>>();
|
||||||
drop(members);
|
drop(members);
|
||||||
rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await;
|
rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await;
|
||||||
}
|
}
|
||||||
|
@ -140,7 +148,9 @@ impl System {
|
||||||
{
|
{
|
||||||
let mut members = self.members.write().await;
|
let mut members = self.members.write().await;
|
||||||
let is_new = members.handle_ping(from.ip(), ping);
|
let is_new = members.handle_ping(from.ip(), ping);
|
||||||
members.recalculate_status_hash();
|
if is_new {
|
||||||
|
members.recalculate_status_hash();
|
||||||
|
}
|
||||||
let status_hash = members.status_hash.clone();
|
let status_hash = members.status_hash.clone();
|
||||||
let config_version = members.config.version;
|
let config_version = members.config.version;
|
||||||
drop(members);
|
drop(members);
|
||||||
|
@ -187,6 +197,7 @@ impl System {
|
||||||
propagate.push(node.clone());
|
propagate.push(node.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
drop(members);
|
||||||
|
|
||||||
if propagate.len() > 0 {
|
if propagate.len() > 0 {
|
||||||
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
|
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT));
|
||||||
|
@ -214,6 +225,7 @@ impl System {
|
||||||
|
|
||||||
let members = self.members.read().await;
|
let members = self.members.read().await;
|
||||||
let ping_addrs = members.status.iter()
|
let ping_addrs = members.status.iter()
|
||||||
|
.filter(|(id, _)| **id != self.id)
|
||||||
.map(|(id, status)| (id.clone(), status.addr.clone()))
|
.map(|(id, status)| (id.clone(), status.addr.clone()))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
drop(members);
|
drop(members);
|
||||||
|
@ -257,13 +269,15 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn pull_status(self: Arc<Self>, peer: UUID) {
|
pub fn pull_status(self: Arc<Self>, peer: UUID) -> impl futures::future::Future<Output=()> + Send + 'static {
|
||||||
let resp = rpc_call(self.clone(),
|
async move {
|
||||||
&peer,
|
let resp = rpc_call(self.clone(),
|
||||||
&Message::PullStatus,
|
&peer,
|
||||||
PING_TIMEOUT).await;
|
&Message::PullStatus,
|
||||||
if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
|
PING_TIMEOUT).await;
|
||||||
self.handle_advertise_nodes_up(&nodes).await;
|
if let Ok(Message::AdvertiseNodesUp(nodes)) = resp {
|
||||||
|
let _: Result<_, _> = self.handle_advertise_nodes_up(&nodes).await;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -273,7 +287,7 @@ impl System {
|
||||||
&Message::PullConfig,
|
&Message::PullConfig,
|
||||||
PING_TIMEOUT).await;
|
PING_TIMEOUT).await;
|
||||||
if let Ok(Message::AdvertiseConfig(config)) = resp {
|
if let Ok(Message::AdvertiseConfig(config)) = resp {
|
||||||
self.handle_advertise_config(&config).await;
|
let _: Result<_, _> = self.handle_advertise_config(&config).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
23
src/rpc.rs
23
src/rpc.rs
|
@ -24,31 +24,10 @@ pub async fn rpc_call_many(sys: Arc<System>,
|
||||||
timeout: Duration)
|
timeout: Duration)
|
||||||
-> Vec<Result<Message, Error>>
|
-> Vec<Result<Message, Error>>
|
||||||
{
|
{
|
||||||
let resp_stream = to.iter()
|
let mut resp_stream = to.iter()
|
||||||
.map(|to| rpc_call(sys.clone(), to, msg, timeout))
|
.map(|to| rpc_call(sys.clone(), to, msg, timeout))
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
collect_rpc_results(resp_stream, stop_after).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn rpc_call_many_addr(sys: Arc<System>,
|
|
||||||
to: &[SocketAddr],
|
|
||||||
msg: &Message,
|
|
||||||
stop_after: Option<usize>,
|
|
||||||
timeout: Duration)
|
|
||||||
-> Vec<Result<Message, Error>>
|
|
||||||
{
|
|
||||||
let resp_stream = to.iter()
|
|
||||||
.map(|to| rpc_call_addr(sys.clone(), to, msg, timeout))
|
|
||||||
.collect::<FuturesUnordered<_>>();
|
|
||||||
|
|
||||||
collect_rpc_results(resp_stream, stop_after).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn collect_rpc_results(mut resp_stream: FuturesUnordered<impl Future<Output=Result<Message, Error>>>,
|
|
||||||
stop_after: Option<usize>)
|
|
||||||
-> Vec<Result<Message, Error>>
|
|
||||||
{
|
|
||||||
let mut results = vec![];
|
let mut results = vec![];
|
||||||
let mut n_ok = 0;
|
let mut n_ok = 0;
|
||||||
while let Some(resp) = resp_stream.next().await {
|
while let Some(resp) = resp_stream.next().await {
|
||||||
|
|
Loading…
Reference in a new issue