diff --git a/src/data.rs b/src/data.rs index e26a38f..68f8124 100644 --- a/src/data.rs +++ b/src/data.rs @@ -1,9 +1,23 @@ +use std::collections::HashMap; use serde::{Serialize, Deserialize}; pub type UUID = [u8; 32]; pub type Hash = [u8; 32]; +// Network management + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkConfig { + pub members: HashMap, + pub version: u64, +} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct NetworkConfigEntry { + pub n_tokens: u32, +} + // Data management #[derive(Debug, Serialize, Deserialize)] diff --git a/src/membership.rs b/src/membership.rs index 2c9cd1c..b025c2b 100644 --- a/src/membership.rs +++ b/src/membership.rs @@ -29,33 +29,41 @@ pub struct System { } pub struct Members { - pub present: Vec, pub status: HashMap, pub status_hash: Hash, - pub config: HashMap, - pub config_version: u64, + pub config: NetworkConfig, } impl Members { - fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) { - match self.present.binary_search(&info.id) { - Ok(pos) => {} - Err(pos) => self.present.insert(pos, info.id.clone()), - } + fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool { self.status.insert(info.id.clone(), NodeStatus{ addr: SocketAddr::new(ip, info.rpc_port), remaining_ping_attempts: MAX_FAILED_PINGS, - }); + }).is_none() + } + + fn handle_advertise_node(&mut self, id: &UUID, addr: &SocketAddr) -> bool { + if !self.status.contains_key(id) { + self.status.insert(id.clone(), + NodeStatus{ + addr: addr.clone(), + remaining_ping_attempts: MAX_FAILED_PINGS, + }); + true + } else { + false + } } fn recalculate_status_hash(&mut self) { + let mut nodes = self.status.iter().collect::>(); + nodes.sort_by_key(|(id, _status)| *id); + let mut hasher = Sha256::new(); - for node in self.present.iter() { - if let Some(status) = self.status.get(node) { - hasher.input(format!("{} {}\n", hex::encode(node), status.addr)); - } + for (id, status) in nodes { + hasher.input(format!("{} {}\n", hex::encode(id), status.addr)); } self.status_hash.copy_from_slice(&hasher.result()[..]); } @@ -66,9 +74,6 @@ pub struct NodeStatus { pub remaining_ping_attempts: usize, } -pub struct NodeConfig { - pub n_tokens: u32, -} impl System { pub fn new(config: Config, id: UUID) -> Self { @@ -77,26 +82,31 @@ impl System { id, rpc_client: Client::new(), members: RwLock::new(Members{ - present: Vec::new(), status: HashMap::new(), status_hash: [0u8; 32], - config: HashMap::new(), - config_version: 0, + config: NetworkConfig{ + members: HashMap::new(), + version: 0, + }, }), } } pub async fn make_ping(&self) -> Message { + let members = self.members.read().await; Message::Ping(PingMessage{ id: self.id, rpc_port: self.config.rpc_port, - present_hash: self.members.read().await.status_hash.clone(), - config_version: 0, + status_hash: members.status_hash.clone(), + config_version: members.config.version, }) } - pub async fn broadcast(&self) -> Vec { - self.members.read().await.present.clone() + pub async fn broadcast(self: Arc, msg: Message, timeout: Duration) { + let members = self.members.read().await; + let to = members.status.keys().cloned().collect::>(); + drop(members); + rpc_call_many(self.clone(), &to[..], &msg, None, timeout).await; } pub async fn bootstrap(self: Arc) { @@ -115,19 +125,12 @@ impl System { for (addr, ping_resp) in ping_resps { if let Ok(Message::Ping(info)) = ping_resp { members.handle_ping(addr.ip(), &info); - } } members.recalculate_status_hash(); drop(members); - let resps = rpc_call_many_addr(self.clone(), - &self.config.bootstrap_peers, - &ping_msg, - None, - PING_TIMEOUT).await; - - unimplemented!() //TODO + tokio::spawn(self.ping_loop()); } pub async fn handle_ping(self: Arc, @@ -136,17 +139,141 @@ impl System { -> Result { let mut members = self.members.write().await; - members.handle_ping(from.ip(), ping); + let is_new = members.handle_ping(from.ip(), ping); members.recalculate_status_hash(); + let status_hash = members.status_hash.clone(); + let config_version = members.config.version; drop(members); + if is_new || status_hash != ping.status_hash { + tokio::spawn(self.clone().pull_status(ping.id.clone())); + } + if is_new || config_version < ping.config_version { + tokio::spawn(self.clone().pull_config(ping.id.clone())); + } + Ok(self.make_ping().await) } - pub async fn handle_advertise_node(self: Arc, - ping: &AdvertiseNodeMessage) + pub async fn handle_pull_status(&self) -> Result { + let members = self.members.read().await; + let mut mem = vec![]; + for (node, status) in members.status.iter() { + mem.push(AdvertisedNode{ + id: node.clone(), + addr: status.addr.clone(), + }); + } + Ok(Message::AdvertiseNodesUp(mem)) + } + + pub async fn handle_pull_config(&self) -> Result { + let members = self.members.read().await; + Ok(Message::AdvertiseConfig(members.config.clone())) + } + + pub async fn handle_advertise_nodes_up(self: Arc, + adv: &[AdvertisedNode]) -> Result { - unimplemented!() //TODO + let mut propagate = vec![]; + + let mut members = self.members.write().await; + for node in adv.iter() { + let is_new = members.handle_advertise_node(&node.id, &node.addr); + if is_new { + tokio::spawn(self.clone().pull_status(node.id.clone())); + tokio::spawn(self.clone().pull_config(node.id.clone())); + propagate.push(node.clone()); + } + } + + if propagate.len() > 0 { + tokio::spawn(self.clone().broadcast(Message::AdvertiseNodesUp(propagate), PING_TIMEOUT)); + } + + Ok(Message::Ok) + } + + pub async fn handle_advertise_config(self: Arc, + adv: &NetworkConfig) + -> Result + { + let mut members = self.members.write().await; + if adv.version > members.config.version { + members.config = adv.clone(); + tokio::spawn(self.clone().broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)); + } + + Ok(Message::Ok) + } + + pub async fn ping_loop(self: Arc) { + loop { + let restart_at = tokio::time::delay_for(PING_INTERVAL); + + let members = self.members.read().await; + let ping_addrs = members.status.iter() + .map(|(id, status)| (id.clone(), status.addr.clone())) + .collect::>(); + drop(members); + + let ping_msg = self.make_ping().await; + let ping_resps = join_all( + ping_addrs.iter() + .map(|(id, addr)| { + let sys = self.clone(); + let ping_msg_ref = &ping_msg; + async move { + (id, addr.clone(), rpc_call_addr(sys, &addr, ping_msg_ref, PING_TIMEOUT).await) + } + })).await; + + let mut members = self.members.write().await; + for (id, addr, ping_resp) in ping_resps { + if let Ok(Message::Ping(ping)) = ping_resp { + let is_new = members.handle_ping(addr.ip(), &ping); + if is_new || members.status_hash != ping.status_hash { + tokio::spawn(self.clone().pull_status(ping.id.clone())); + } + if is_new || members.config.version < ping.config_version { + tokio::spawn(self.clone().pull_config(ping.id.clone())); + } + } else { + let remaining_attempts = members.status.get(id).map(|x| x.remaining_ping_attempts).unwrap_or(0); + if remaining_attempts == 0 { + eprintln!("Removing node {} after too many failed pings", hex::encode(id)); + members.status.remove(id); + } else { + if let Some(st) = members.status.get_mut(id) { + st.remaining_ping_attempts = remaining_attempts - 1; + } + } + } + } + drop(members); + + restart_at.await + } + } + + pub async fn pull_status(self: Arc, peer: UUID) { + let resp = rpc_call(self.clone(), + &peer, + &Message::PullStatus, + PING_TIMEOUT).await; + if let Ok(Message::AdvertiseNodesUp(nodes)) = resp { + self.handle_advertise_nodes_up(&nodes).await; + } + } + + pub async fn pull_config(self: Arc, peer: UUID) { + let resp = rpc_call(self.clone(), + &peer, + &Message::PullConfig, + PING_TIMEOUT).await; + if let Ok(Message::AdvertiseConfig(config)) = resp { + self.handle_advertise_config(&config).await; + } } } diff --git a/src/proto.rs b/src/proto.rs index 6cb1259..8b60784 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -9,7 +9,10 @@ pub enum Message { Error(String), Ping(PingMessage), - AdvertiseNode(AdvertiseNodeMessage), + PullStatus, + PullConfig, + AdvertiseNodesUp(Vec), + AdvertiseConfig(NetworkConfig), } #[derive(Debug, Serialize, Deserialize)] @@ -17,12 +20,12 @@ pub struct PingMessage { pub id: UUID, pub rpc_port: u16, - pub present_hash: Hash, + pub status_hash: Hash, pub config_version: u64, } -#[derive(Debug, Serialize, Deserialize)] -pub struct AdvertiseNodeMessage { +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct AdvertisedNode { pub id: UUID, pub addr: SocketAddr, } diff --git a/src/rpc.rs b/src/rpc.rs index 5e72d0f..d8bb08a 100644 --- a/src/rpc.rs +++ b/src/rpc.rs @@ -133,7 +133,11 @@ async fn handler(sys: Arc, req: Request, addr: SocketAddr) -> Resu let resp = err_to_msg(match &msg { Message::Ping(ping) => sys.handle_ping(&addr, ping).await, - Message::AdvertiseNode(adv) => sys.handle_advertise_node(adv).await, + Message::PullStatus => sys.handle_pull_status().await, + Message::PullConfig => sys.handle_pull_config().await, + Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await, + Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await, + _ => Ok(Message::Error(format!("Unexpected message: {:?}", msg))), });