From 9c931f5edacbaaab746ecf180fac2dd7062d0336 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Sat, 11 Apr 2020 23:53:32 +0200
Subject: [PATCH] Keep network status & ring in a tokio::sync::watch

advantages
- reads don't prevent preparing writes
- can be followed from other parts of the system by cloning the receiver
---
 src/api_server.rs      |  12 +--
 src/block.rs           |  26 +++---
 src/block_ref_table.rs |   7 +-
 src/membership.rs      | 185 ++++++++++++++++++++++++-----------------
 src/rpc_client.rs      |   4 +-
 src/rpc_server.rs      |   4 +-
 src/server.rs          |   2 +-
 src/table.rs           |  47 +++++------
 src/version_table.rs   |  11 ++-
 9 files changed, 165 insertions(+), 133 deletions(-)

diff --git a/src/api_server.rs b/src/api_server.rs
index 53c3e176..52f33969 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -212,9 +212,9 @@ async fn put_block_meta(
 async fn put_block(garage: Arc<Garage>, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
 	let who = garage
 		.system
-		.members
-		.read()
-		.await
+		.ring
+		.borrow()
+		.clone()
 		.walk_ring(&hash, garage.system.config.meta_replication_factor);
 	rpc_try_call_many(
 		garage.system.clone(),
@@ -359,9 +359,9 @@ async fn handle_get(
 async fn get_block(garage: Arc<Garage>, hash: &Hash) -> Result<Vec<u8>, Error> {
 	let who = garage
 		.system
-		.members
-		.read()
-		.await
+		.ring
+		.borrow()
+		.clone()
 		.walk_ring(&hash, garage.system.config.meta_replication_factor);
 	let resps = rpc_try_call_many(
 		garage.system.clone(),
diff --git a/src/block.rs b/src/block.rs
index e78ddd78..e50dab38 100644
--- a/src/block.rs
+++ b/src/block.rs
@@ -1,15 +1,14 @@
 use std::path::PathBuf;
 
+use futures_util::future::*;
 use tokio::fs;
 use tokio::prelude::*;
 use tokio::sync::Mutex;
 
-use futures_util::future::*;
+use crate::background::*;
 use crate::data::*;
 use crate::error::Error;
 use crate::proto::*;
 
-use crate::background::*;
-
 pub struct BlockManager {
 	pub data_dir: PathBuf,
@@ -19,10 +18,11 @@ impl BlockManager {
 	pub fn new(db: &sled::Db, data_dir: PathBuf) -> Self {
-		let rc = db.open_tree("block_local_rc")
+		let rc = db
+			.open_tree("block_local_rc")
 			.expect("Unable to open block_local_rc tree");
 		rc.set_merge_operator(rc_merge);
-		Self{
+		Self {
 			rc,
 			data_dir,
 			lock: Mutex::new(()),
@@ -81,18 +81,20 @@ impl BlockManager {
 				background.spawn(tokio::fs::remove_file(path).map_err(Into::into));
 				Ok(())
 			}
-			Some(_) => Ok(())
+			Some(_) => Ok(()),
 		}
 	}
 }
 
 fn rc_merge(_key: &[u8], old: Option<&[u8]>, new: &[u8]) -> Option<Vec<u8>> {
-	let old = old.map(|x| {
-		assert!(x.len() == 8);
-		let mut x8 = [0u8; 8];
-		x8.copy_from_slice(x);
-		u64::from_be_bytes(x8)
-	}).unwrap_or(0);
+	let old = old
+		.map(|x| {
+			assert!(x.len() == 8);
+			let mut x8 = [0u8; 8];
+			x8.copy_from_slice(x);
+			u64::from_be_bytes(x8)
+		})
+		.unwrap_or(0);
 	assert!(new.len() == 1);
 	let new = match new[0] {
 		0 => {
diff --git a/src/block_ref_table.rs b/src/block_ref_table.rs
index e2310f74..f3a14d81 100644
--- a/src/block_ref_table.rs
+++ b/src/block_ref_table.rs
@@ -55,8 +55,11 @@ impl TableFormat for BlockRefTable {
 			}
 		}
 		if was_before && !is_after {
-			if let Err(e) = garage.block_manager.block_decref(&new.block, &garage.background) {
-				eprintln!("Failed to decref or delete block {:?}: {}", &new.block, e);
+			if let Err(e) = garage
+				.block_manager
+				.block_decref(&new.block, &garage.background)
+			{
+				eprintln!("Failed to decref block {:?}: {}", &new.block, e);
 			}
 		}
 	}
diff --git a/src/membership.rs b/src/membership.rs
index cb8dba99..22c13f64 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -13,7 +13,7 @@ use futures_util::future::*;
 use sha2::{Digest, Sha256};
 use tokio::prelude::*;
 use tokio::sync::watch;
-use tokio::sync::RwLock;
+use tokio::sync::Mutex;
 
 use crate::background::BackgroundRunner;
 use crate::data::*;
@@ -32,36 +32,44 @@ pub struct System {
 
 	pub rpc_client: RpcClient,
 
-	pub members: RwLock<Members>,
+	pub status: watch::Receiver<Arc<Status>>,
+	pub ring: watch::Receiver<Arc<Ring>>,
+
+	update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>,
 
 	pub background: Arc<BackgroundRunner>,
 }
 
-pub struct Members {
-	pub status: HashMap<UUID, NodeStatus>,
-	pub status_hash: Hash,
-
-	pub config: NetworkConfig,
-	pub ring: Vec<RingEntry>,
-	pub n_datacenters: usize,
+#[derive(Debug, Clone)]
+pub struct Status {
+	pub nodes: HashMap<UUID, NodeStatus>,
+	pub hash: Hash,
 }
 
+#[derive(Debug, Clone)]
 pub struct NodeStatus {
	pub addr: SocketAddr,
 	pub remaining_ping_attempts: usize,
 }
 
-#[derive(Debug)]
+#[derive(Clone)]
+pub struct Ring {
+	pub config: NetworkConfig,
+	pub ring: Vec<RingEntry>,
+	pub n_datacenters: usize,
+}
+
+#[derive(Clone, Debug)]
 pub struct RingEntry {
 	pub location: Hash,
 	pub node: UUID,
 	pub datacenter: u64,
 }
 
-impl Members {
+impl Status {
 	fn handle_ping(&mut self, ip: IpAddr, info: &PingMessage) -> bool {
 		let addr = SocketAddr::new(ip, info.rpc_port);
-		let old_status = self.status.insert(
+		let old_status = self.nodes.insert(
 			info.id.clone(),
 			NodeStatus {
 				addr: addr.clone(),
@@ -77,8 +85,8 @@ impl Members {
 		}
 	}
 
-	fn recalculate_status_hash(&mut self) {
-		let mut nodes = self.status.iter().collect::<Vec<_>>();
+	fn recalculate_hash(&mut self) {
+		let mut nodes = self.nodes.iter().collect::<Vec<_>>();
 		nodes.sort_unstable_by_key(|(id, _status)| *id);
 
 		let mut hasher = Sha256::new();
@@ -88,11 +96,13 @@ impl Members {
 			hasher.input(format!("{} {}\n", hex::encode(&id), status.addr));
 		}
 		eprintln!("END --");
-		self.status_hash
+		self.hash
 			.as_slice_mut()
 			.copy_from_slice(&hasher.result()[..]);
 	}
+}
 
+impl Ring {
 	fn rebuild_ring(&mut self) {
 		let mut new_ring = vec![];
 		let mut datacenters = vec![];
@@ -201,20 +211,28 @@ impl System {
 				}
 			}
 		};
-		let mut members = Members {
-			status: HashMap::new(),
-			status_hash: Hash::default(),
+		let mut status = Status {
+			nodes: HashMap::new(),
+			hash: Hash::default(),
+		};
+		status.recalculate_hash();
+		let (update_status, status) = watch::channel(Arc::new(status));
+
+		let mut ring = Ring {
 			config: net_config,
 			ring: Vec::new(),
 			n_datacenters: 0,
 		};
-		members.recalculate_status_hash();
-		members.rebuild_ring();
+		ring.rebuild_ring();
+		let (update_ring, ring) = watch::channel(Arc::new(ring));
+
 		System {
 			config,
 			id,
 			rpc_client: RpcClient::new(),
-			members: RwLock::new(members),
+			status,
+			ring,
+			update_lock: Mutex::new((update_status, update_ring)),
 			background,
 		}
 	}
@@ -223,37 +241,33 @@ impl System {
 		let mut path = self.config.metadata_dir.clone();
 		path.push("network_config");
 
-		let members = self.members.read().await;
-		let data =
-			rmp_to_vec_all_named(&members.config)?;
-		drop(members);
+		let ring = self.ring.borrow().clone();
+		let data = rmp_to_vec_all_named(&ring.config)?;
 
-		let mut f = tokio::fs::File::create(path.as_path())
-			.await?;
-		f.write_all(&data[..])
-			.await?;
+		let mut f = tokio::fs::File::create(path.as_path()).await?;
+		f.write_all(&data[..]).await?;
 		Ok(())
 	}
 
-	pub async fn make_ping(&self) -> Message {
-		let members = self.members.read().await;
+	pub fn make_ping(&self) -> Message {
+		let status = self.status.borrow().clone();
+		let ring = self.ring.borrow().clone();
 		Message::Ping(PingMessage {
 			id: self.id.clone(),
 			rpc_port: self.config.rpc_port,
-			status_hash: members.status_hash.clone(),
-			config_version: members.config.version,
+			status_hash: status.hash.clone(),
+			config_version: ring.config.version,
 		})
 	}
 
 	pub async fn broadcast(self: Arc<Self>, msg: Message, timeout: Duration) {
-		let members = self.members.read().await;
-		let to = members
-			.status
+		let status = self.status.borrow().clone();
+		let to = status
+			.nodes
 			.keys()
 			.filter(|x| **x != self.id)
 			.cloned()
 			.collect::<Vec<_>>();
-		drop(members);
 
 		rpc_call_many(self.clone(), &to[..], &msg, timeout).await;
 	}
@@ -273,7 +287,7 @@ impl System {
 		}
 	}
 	pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
-		let ping_msg = self.make_ping().await;
+		let ping_msg = self.make_ping();
 		let ping_resps = join_all(peers.iter().map(|(addr, id_option)| {
 			let sys = self.clone();
 			let ping_msg_ref = &ping_msg;
@@ -287,14 +301,16 @@ impl System {
 		}))
 		.await;
 
-		let mut members = self.members.write().await;
+		let update_locked = self.update_lock.lock().await;
+		let mut status: Status = self.status.borrow().as_ref().clone();
+		let ring = self.ring.borrow().clone();
 		let mut has_changes = false;
 		let mut to_advertise = vec![];
 
 		for (id_option, addr, ping_resp) in ping_resps {
 			if let Ok(Message::Ping(info)) = ping_resp {
-				let is_new = members.handle_ping(addr.ip(), &info);
+				let is_new = status.handle_ping(addr.ip(), &info);
 				if is_new {
 					has_changes = true;
 					to_advertise.push(AdvertisedNode {
 						id: info.id.clone(),
 						addr: addr.clone(),
 					});
@@ -302,17 +318,17 @@ impl System {
 				}
-				if is_new || members.status_hash != info.status_hash {
+				if is_new || status.hash != info.status_hash {
 					self.background
 						.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
 				}
-				if is_new || members.config.version < info.config_version {
+				if is_new || ring.config.version < info.config_version {
 					self.background
 						.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
 				}
 			} else if let Some(id) = id_option {
-				let remaining_attempts = members
-					.status
+				let remaining_attempts = status
+					.nodes
 					.get(id)
 					.map(|x| x.remaining_ping_attempts)
 					.unwrap_or(0);
@@ -321,19 +337,22 @@ impl System {
 						"Removing node {} after too many failed pings",
 						hex::encode(&id)
 					);
-					members.status.remove(&id);
+					status.nodes.remove(&id);
 					has_changes = true;
 				} else {
-					if let Some(st) = members.status.get_mut(id) {
+					if let Some(st) = status.nodes.get_mut(id) {
 						st.remaining_ping_attempts = remaining_attempts - 1;
 					}
 				}
 			}
 		}
 		if has_changes {
-			members.recalculate_status_hash();
+			status.recalculate_hash();
 		}
-		drop(members);
+		if let Err(e) = update_locked.0.broadcast(Arc::new(status)) {
+			eprintln!("In ping_nodes: could not save status update ({})", e);
+		}
+		drop(update_locked);
 
 		if to_advertise.len() > 0 {
 			self.broadcast(Message::AdvertiseNodesUp(to_advertise), PING_TIMEOUT)
@@ -346,29 +365,35 @@ impl System {
 		from: &SocketAddr,
 		ping: &PingMessage,
 	) -> Result<Message, Error> {
-		let mut members = self.members.write().await;
-		let is_new = members.handle_ping(from.ip(), ping);
+		let update_locked = self.update_lock.lock().await;
+		let mut status: Status = self.status.borrow().as_ref().clone();
+
+		let is_new = status.handle_ping(from.ip(), ping);
 		if is_new {
-			members.recalculate_status_hash();
+			status.recalculate_hash();
 		}
-		let status_hash = members.status_hash.clone();
-		let config_version = members.config.version;
-		drop(members);
+		let status_hash = status.hash.clone();
+		let config_version = self.ring.borrow().config.version;
+
+		update_locked.0.broadcast(Arc::new(status))?;
+		drop(update_locked);
 
 		if is_new || status_hash != ping.status_hash {
-			self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
+			self.background
+				.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
 		}
 		if is_new || config_version < ping.config_version {
-			self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
+			self.background
+				.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
 		}
 
-		Ok(self.make_ping().await)
+		Ok(self.make_ping())
 	}
 
-	pub async fn handle_pull_status(&self) -> Result<Message, Error> {
-		let members = self.members.read().await;
+	pub fn handle_pull_status(&self) -> Result<Message, Error> {
+		let status = self.status.borrow().clone();
 		let mut mem = vec![];
-		for (node, status) in members.status.iter() {
+		for (node, status) in status.nodes.iter() {
 			mem.push(AdvertisedNode {
 				id: node.clone(),
 				addr: status.addr.clone(),
@@ -377,9 +402,9 @@ impl System {
 		Ok(Message::AdvertiseNodesUp(mem))
 	}
 
-	pub async fn handle_pull_config(&self) -> Result<Message, Error> {
-		let members = self.members.read().await;
-		Ok(Message::AdvertiseConfig(members.config.clone()))
+	pub fn handle_pull_config(&self) -> Result<Message, Error> {
+		let ring = self.ring.borrow().clone();
+		Ok(Message::AdvertiseConfig(ring.config.clone()))
 	}
 
 	pub async fn handle_advertise_nodes_up(
@@ -388,14 +413,15 @@ impl System {
 	) -> Result<Message, Error> {
 		let mut to_ping = vec![];
 
-		let mut members = self.members.write().await;
+		let update_lock = self.update_lock.lock().await;
+		let mut status: Status = self.status.borrow().as_ref().clone();
 		let mut has_changed = false;
 
 		for node in adv.iter() {
 			if node.id == self.id {
 				// learn our own ip address
 				let self_addr = SocketAddr::new(node.addr.ip(), self.config.rpc_port);
-				let old_self = members.status.insert(
+				let old_self = status.nodes.insert(
 					node.id.clone(),
 					NodeStatus {
 						addr: self_addr,
@@ -406,17 +432,19 @@ impl System {
 					None => true,
 					Some(x) => x.addr != self_addr,
 				};
-			} else if !members.status.contains_key(&node.id) {
+			} else if !status.nodes.contains_key(&node.id) {
 				to_ping.push((node.addr.clone(), Some(node.id.clone())));
 			}
 		}
 		if has_changed {
-			members.recalculate_status_hash();
+			status.recalculate_hash();
 		}
-		drop(members);
+		update_lock.0.broadcast(Arc::new(status))?;
+		drop(update_lock);
 
 		if to_ping.len() > 0 {
-			self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
+			self.background
+				.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
 		}
 
 		Ok(Message::Ok)
@@ -426,10 +454,14 @@ impl System {
 		self: Arc<Self>,
 		adv: &NetworkConfig,
 	) -> Result<Message, Error> {
-		let mut members = self.members.write().await;
-		if adv.version > members.config.version {
-			members.config = adv.clone();
-			members.rebuild_ring();
+		let mut ring: Ring = self.ring.borrow().as_ref().clone();
+		let update_lock = self.update_lock.lock().await;
+
+		if adv.version > ring.config.version {
+			ring.config = adv.clone();
+			ring.rebuild_ring();
+			update_lock.1.broadcast(Arc::new(ring))?;
+			drop(update_lock);
 
 			self.background.spawn_cancellable(
 				self.clone()
@@ -446,14 +478,13 @@ impl System {
 		loop {
 			let restart_at = tokio::time::delay_for(PING_INTERVAL);
 
-			let members = self.members.read().await;
-			let ping_addrs = members
-				.status
+			let status = self.status.borrow().clone();
+			let ping_addrs = status
+				.nodes
 				.iter()
 				.filter(|(id, _)| **id != self.id)
 				.map(|(id, status)| (status.addr.clone(), Some(id.clone())))
 				.collect::<Vec<_>>();
-			drop(members);
 
 			self.clone().ping_nodes(ping_addrs).await;
 
diff --git a/src/rpc_client.rs b/src/rpc_client.rs
index 7587782e..8d8b724b 100644
--- a/src/rpc_client.rs
+++ b/src/rpc_client.rs
@@ -79,8 +79,8 @@ pub async fn rpc_call(
 	timeout: Duration,
 ) -> Result<Message, Error> {
 	let addr = {
-		let members = sys.members.read().await;
-		match members.status.get(to) {
+		let status = sys.status.borrow().clone();
+		match status.nodes.get(to) {
 			Some(status) => status.addr.clone(),
 			None => return Err(Error::Message(format!("Peer ID not found"))),
 		}
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index 08fa909d..ddfc5e04 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -59,8 +59,8 @@ async fn handler(
 	let resp = err_to_msg(match &msg {
 		Message::Ping(ping) => sys.handle_ping(&addr, ping).await,
 
-		Message::PullStatus => sys.handle_pull_status().await,
-		Message::PullConfig => sys.handle_pull_config().await,
+		Message::PullStatus => sys.handle_pull_status(),
+		Message::PullConfig => sys.handle_pull_config(),
 
 		Message::AdvertiseNodesUp(adv) => sys.handle_advertise_nodes_up(adv).await,
 		Message::AdvertiseConfig(adv) => sys.handle_advertise_config(adv).await,
diff --git a/src/server.rs b/src/server.rs
index 8b2bd0c3..e38e8580 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -10,8 +10,8 @@ use tokio::sync::RwLock;
 
 use crate::api_server;
 use crate::background::*;
-use crate::data::*;
 use crate::block::*;
+use crate::data::*;
 use crate::error::Error;
 use crate::membership::System;
 use crate::proto::*;
diff --git a/src/table.rs b/src/table.rs
index 9f5eca33..d0f24119 100644
--- a/src/table.rs
+++ b/src/table.rs
@@ -3,9 +3,9 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
+use futures::stream::*;
 use serde::{Deserialize, Serialize};
 use serde_bytes::ByteBuf;
-use futures::stream::*;
 
 use crate::data::*;
 use crate::error::Error;
@@ -150,9 +150,9 @@ impl Table {
 		let hash = e.partition_key().hash();
 		let who = self
 			.system
-			.members
-			.read()
-			.await
+			.ring
+			.borrow()
+			.clone()
 			.walk_ring(&hash, self.param.replication_factor);
 		eprintln!("insert who: {:?}", who);
 
@@ -171,9 +171,9 @@ impl Table {
 		let hash = entry.partition_key().hash();
 		let who = self
 			.system
-			.members
-			.read()
-			.await
+			.ring
+			.borrow()
+			.clone()
 			.walk_ring(&hash, self.param.replication_factor);
 		let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?));
 		for node in who {
@@ -184,20 +184,14 @@ impl Table {
 			}
 		}
 
-		let call_futures = call_list.drain()
-			.map(|(node, entries)| async move {
-				let rpc = TableRPC::<F>::Update(entries);
-				let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
-				let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
+		let call_futures = call_list.drain().map(|(node, entries)| async move {
+			let rpc = TableRPC::<F>::Update(entries);
+			let rpc_bytes = rmp_to_vec_all_named(&rpc)?;
+			let message = Message::TableRPC(self.name.to_string(), rpc_bytes);
 
-				let resp = rpc_call(
-					self.system.clone(),
-					&node,
-					&message,
-					self.param.timeout
-				).await?;
-				Ok::<_, Error>((node, resp))
-			});
+			let resp = rpc_call(self.system.clone(), &node, &message, self.param.timeout).await?;
+			Ok::<_, Error>((node, resp))
+		});
 
 		let mut resps = call_futures.collect::<FuturesUnordered<_>>();
 		let mut errors = vec![];
@@ -217,9 +211,9 @@ impl Table {
 		let hash = partition_key.hash();
 		let who = self
 			.system
-			.members
-			.read()
-			.await
+			.ring
+			.borrow()
+			.clone()
 			.walk_ring(&hash, self.param.replication_factor);
 		eprintln!("get who: {:?}", who);
 
@@ -259,12 +253,9 @@ impl Table {
 
 	async fn repair_on_read(&self, who: &[UUID], what: &F::E) -> Result<(), Error> {
 		let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(what)?));
-		self.rpc_try_call_many(&who[..],
-			&TableRPC::<F>::Update(vec![what_enc]),
-			who.len(),
-		)
+		self.rpc_try_call_many(&who[..], &TableRPC::<F>::Update(vec![what_enc]), who.len())
 			.await
-			.map(|_|())
+			.map(|_| ())
 	}
 
 	async fn rpc_try_call_many(
diff --git a/src/version_table.rs b/src/version_table.rs
index d9d2b675..9ea0551e 100644
--- a/src/version_table.rs
+++ b/src/version_table.rs
@@ -69,14 +69,19 @@ impl TableFormat for VersionTable {
 		// Propagate deletion of version blocks
 		if let Some(old_v) = old {
 			if new.deleted && !old_v.deleted {
-				let deleted_block_refs = old_v.blocks.iter()
-					.map(|vb| BlockRef{
+				let deleted_block_refs = old_v
+					.blocks
+					.iter()
+					.map(|vb| BlockRef {
 						block: vb.hash.clone(),
 						version: old_v.uuid.clone(),
 						deleted: true,
 					})
 					.collect::<Vec<_>>();
-				garage.block_ref_table.insert_many(&deleted_block_refs[..]).await?;
+				garage
+					.block_ref_table
+					.insert_many(&deleted_block_refs[..])
+					.await?;
 			}
 		}
 		Ok(())
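
The pattern adopted above can be summarized by the following standalone sketch,
which is illustrative only and not part of the patch: shared state lives in a
tokio::sync::watch channel holding an Arc; readers take a cheap borrow().clone()
without ever blocking a writer, and writers clone the inner value, modify the
copy, and broadcast the new Arc while holding a separate Mutex so that concurrent
read-modify-publish cycles are serialized. The names here (the reduced Status and
System types, record_node) are stand-ins, and the tokio 0.2 watch API used by the
patch is assumed (Sender::broadcast, which later tokio versions rename to send).

// Illustrative sketch only: stand-in types, not the Garage structs.
use std::collections::HashMap;
use std::sync::Arc;

use tokio::sync::{watch, Mutex};

#[derive(Clone, Debug, Default)]
struct Status {
	nodes: HashMap<String, String>,
}

struct System {
	// Readers keep this receiver; cloning it lets other parts of the
	// system follow updates independently.
	status: watch::Receiver<Arc<Status>>,
	// Writers serialize read-modify-publish cycles behind this Mutex;
	// readers never need to take it.
	update_lock: Mutex<watch::Sender<Arc<Status>>>,
}

impl System {
	fn new() -> Self {
		let (tx, rx) = watch::channel(Arc::new(Status::default()));
		Self {
			status: rx,
			update_lock: Mutex::new(tx),
		}
	}

	// Read path: a cheap Arc clone; never waits on a writer that is
	// preparing the next version of the state.
	fn current(&self) -> Arc<Status> {
		self.status.borrow().clone()
	}

	// Write path: clone the current value, mutate the copy, publish it.
	async fn record_node(&self, id: String, addr: String) {
		let tx = self.update_lock.lock().await;
		let mut next: Status = self.status.borrow().as_ref().clone();
		next.nodes.insert(id, addr);
		// Cannot fail here: `self.status` keeps at least one receiver alive.
		tx.broadcast(Arc::new(next)).expect("watch receiver dropped");
	}
}

Keeping the Sender behind a Mutex rather than replacing it with a plain RwLock on
the state is what gives the first advantage listed in the commit message: a long
read of the current Arc never holds anything that a writer needs.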
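
The second advantage, following the state from elsewhere by cloning the receiver,
would look roughly like the sketch below, reusing the stand-in types from the
previous example and again assuming the tokio 0.2 API, where Receiver::recv
yields each newly broadcast value and returns None once the sender is dropped
(later tokio versions replace this with changed() plus borrow()).

// Another task can hold a clone of `System::status` and react to every update.
async fn follow_status(mut status: watch::Receiver<Arc<Status>>) {
	while let Some(current) = status.recv().await {
		eprintln!("status changed: {} nodes known", current.nodes.len());
	}
	// `None`: the sender side (and thus the System) was dropped.
}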