Fix pinging

This commit is contained in:
Alex 2020-04-07 00:00:43 +02:00
parent a7b85146fe
commit 46d5b896e8
3 changed files with 83 additions and 76 deletions

View file

@ -27,6 +27,8 @@ pub struct Opt {
#[derive(Deserialize, Debug)] #[derive(Deserialize, Debug)]
pub struct Config { pub struct Config {
datacenter: String,
metadata_dir: PathBuf, metadata_dir: PathBuf,
data_dir: PathBuf, data_dir: PathBuf,

View file

@ -41,30 +41,17 @@ impl Members {
NodeStatus{ NodeStatus{
addr: addr.clone(), addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS, remaining_ping_attempts: MAX_FAILED_PINGS,
datacenter: info.datacenter.clone(),
}); });
match old_status { match old_status {
None => { None => {
eprintln!("Discovered new node (ping): {}", hex::encode(info.id)); eprintln!("Newly pingable node: {}", hex::encode(info.id));
true true
} }
Some(x) => x.addr != addr, Some(x) => x.addr != addr,
} }
} }
fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool {
if !self.status.contains_key(id) {
eprintln!("Discovered new node (advertisment): {}", hex::encode(id));
self.status.insert(id.clone(),
NodeStatus{
addr: addr.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
});
true
} else {
false
}
}
fn recalculate_status_hash(&mut self) { fn recalculate_status_hash(&mut self) {
let mut nodes = self.status.iter().collect::<Vec<_>>(); let mut nodes = self.status.iter().collect::<Vec<_>>();
nodes.sort_by_key(|(id, _status)| *id); nodes.sort_by_key(|(id, _status)| *id);
@ -72,7 +59,7 @@ impl Members {
let mut hasher = Sha256::new(); let mut hasher = Sha256::new();
eprintln!("Current set of pingable nodes: --"); eprintln!("Current set of pingable nodes: --");
for (id, status) in nodes { for (id, status) in nodes {
eprintln!("{} {}", hex::encode(id), status.addr); eprintln!("{} {} ({})", hex::encode(id), status.addr, status.datacenter);
hasher.input(format!("{} {}\n", hex::encode(id), status.addr)); hasher.input(format!("{} {}\n", hex::encode(id), status.addr));
} }
eprintln!("END --"); eprintln!("END --");
@ -82,6 +69,7 @@ impl Members {
pub struct NodeStatus { pub struct NodeStatus {
pub addr: SocketAddr, pub addr: SocketAddr,
pub datacenter: String,
pub remaining_ping_attempts: usize, pub remaining_ping_attempts: usize,
} }
@ -109,6 +97,7 @@ impl System {
let members = self.members.read().await; let members = self.members.read().await;
Message::Ping(PingMessage{ Message::Ping(PingMessage{
id: self.id, id: self.id,
datacenter: self.config.datacenter.clone(),
rpc_port: self.config.rpc_port, rpc_port: self.config.rpc_port,
status_hash: members.status_hash.clone(), status_hash: members.status_hash.clone(),
config_version: members.config.version, config_version: members.config.version,
@ -123,27 +112,69 @@ impl System {
} }
pub async fn bootstrap(self: Arc<Self>) { pub async fn bootstrap(self: Arc<Self>) {
let bootstrap_peers = self.config.bootstrap_peers
.iter()
.map(|ip| (ip.clone(), None))
.collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await;
tokio::spawn(self.ping_loop());
}
pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
let ping_msg = self.make_ping().await; let ping_msg = self.make_ping().await;
let ping_resps = join_all( let ping_resps = join_all(
self.config.bootstrap_peers.iter().cloned() peers.iter()
.map(|to| { .map(|(addr, id_option)| {
let sys = self.clone(); let sys = self.clone();
let ping_msg_ref = &ping_msg; let ping_msg_ref = &ping_msg;
async move { async move {
(to.clone(), rpc_call_addr(sys, &to, ping_msg_ref, PING_TIMEOUT).await) (id_option, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
} }
})).await; })).await;
let mut members = self.members.write().await; let mut members = self.members.write().await;
for (addr, ping_resp) in ping_resps {
let mut has_changes = false;
let mut to_advertise = vec![];
for (id_option, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(info)) = ping_resp { if let Ok(Message::Ping(info)) = ping_resp {
members.handle_ping(addr.ip(), &info); let is_new = members.handle_ping(addr.ip(), &info);
if is_new {
has_changes = true;
to_advertise.push(AdvertisedNode{
id: info.id.clone(),
addr: addr.clone(),
});
}
if is_new || members.status_hash != info.status_hash {
tokio::spawn(self.clone().pull_status(info.id.clone()));
}
if is_new || members.config.version < info.config_version {
tokio::spawn(self.clone().pull_config(info.id.clone()));
}
} else if let Some(id) = id_option {
let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
if remaining_attempts == 0 {
eprintln!("Removing node {} after too many failed pings", hex::encode(id));
members.status.remove(id);
has_changes = true;
} else {
if let Some(st) = members.status.get_mut(id) {
st.remaining_ping_attempts = remaining_attempts - 1;
}
}
} }
} }
members.recalculate_status_hash(); if has_changes {
members.recalculate_status_hash();
}
drop(members); drop(members);
tokio::spawn(self.ping_loop()); if to_advertise.len() > 0 {
self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT).await;
}
} }
pub async fn handle_ping(self: Arc<Self>, pub async fn handle_ping(self: Arc<Self>,
@ -191,21 +222,36 @@ impl System {
adv: &[AdvertisedNode]) adv: &[AdvertisedNode])
-> Result<Message, Error> -> Result<Message, Error>
{ {
let mut propagate = vec![]; let mut to_ping = vec![];
let mut members = self.members.write().await; let mut members = self.members.write().await;
let mut has_changed = false;
for node in adv.iter() { for node in adv.iter() {
let is_new = members.handle_advertise_node(&node.id, &node.addr); if node.id == self.id {
if is_new { // learn our own ip address
tokio::spawn(self.clone().pull_status(node.id.clone())); let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
tokio::spawn(self.clone().pull_config(node.id.clone())); let old_self = members.status.insert(node.id.clone(),
propagate.push(node.clone()); NodeStatus{
addr: self_addr,
datacenter: self.config.datacenter.clone(),
remaining_ping_attempts: MAX_FAILED_PINGS,
});
has_changed = match old_self {
None => true,
Some(x) => x.addr != self_addr,
};
} else if !members.status.contains_key(&node.id) {
to_ping.push((node.addr.clone(), Some(node.id.clone())));
} }
} }
if has_changed {
if propagate.len() > 0 {
members.recalculate_status_hash(); members.recalculate_status_hash();
tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); }
drop(members);
if to_ping.len() > 0 {
tokio::spawn(self.clone().ping_nodes(to_ping));
} }
Ok(Message::Ok) Ok(Message::Ok)
@ -231,53 +277,11 @@ impl System {
let members = self.members.read().await; let members = self.members.read().await;
let ping_addrs = members.status.iter() let ping_addrs = members.status.iter()
.filter(|(id, _)| **id != self.id) .filter(|(id, _)| **id != self.id)
.map(|(id, status)| (id.clone(), status.addr.clone())) .map(|(id, status)| (status.addr.clone(), Some(id.clone())))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
drop(members); drop(members);
let ping_msg = self.make_ping().await; self.clone().ping_nodes(ping_addrs).await;
let ping_resps = join_all(
ping_addrs.iter()
.map(|(id, addr)| {
let sys = self.clone();
let ping_msg_ref = &ping_msg;
async move {
(id, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await)
}
})).await;
let mut members = self.members.write().await;
let mut has_changes = false;
for (id, addr, ping_resp) in ping_resps {
if let Ok(Message::Ping(ping)) = ping_resp {
let is_new = members.handle_ping(addr.ip(), &ping);
if is_new {
has_changes = true;
}
if is_new || members.status_hash != ping.status_hash {
tokio::spawn(self.clone().pull_status(ping.id.clone()));
}
if is_new || members.config.version < ping.config_version {
tokio::spawn(self.clone().pull_config(ping.id.clone()));
}
} else {
let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0);
if remaining_attempts == 0 {
eprintln!("Removing node {} after too many failed pings", hex::encode(id));
members.status.remove(id);
has_changes = true;
} else {
if let Some(st) = members.status.get_mut(id) {
st.remaining_ping_attempts = remaining_attempts - 1;
}
}
}
}
if has_changes {
members.recalculate_status_hash();
}
drop(members);
restart_at.await restart_at.await
} }

View file

@ -18,6 +18,7 @@ pub enum Message {
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PingMessage { pub struct PingMessage {
pub id: UUID, pub id: UUID,
pub datacenter: String,
pub rpc_port: u16, pub rpc_port: u16,
pub status_hash: Hash, pub status_hash: Hash,