From 0eb5baea1a8e23fd373685f40aa32a84f794ad49 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 19:55:53 +0200 Subject: [PATCH 01/11] Improve bootstraping: do it regularly; persist peer list --- .drone.yml | 1 + Makefile | 2 +- src/garage/admin_rpc.rs | 4 +- src/garage/cli.rs | 12 ++- src/garage/server.rs | 2 +- src/model/object_table.rs | 4 +- src/rpc/membership.rs | 180 +++++++++++++++++++++++++------------- src/table/data.rs | 8 +- src/table/gc.rs | 7 +- src/table/merkle.rs | 23 +++-- src/util/lib.rs | 1 + src/util/persister.rs | 62 +++++++++++++ 12 files changed, 226 insertions(+), 80 deletions(-) create mode 100644 src/util/persister.rs diff --git a/.drone.yml b/.drone.yml index ee16f5d3..89053bfc 100644 --- a/.drone.yml +++ b/.drone.yml @@ -37,6 +37,7 @@ steps: - apt-get update - apt-get install --yes libsodium-dev - pwd + - cargo fmt -- --check - cargo build - name: cargo-test diff --git a/Makefile b/Makefile index 7cd01be2..51b77699 100644 --- a/Makefile +++ b/Makefile @@ -2,7 +2,7 @@ BIN=target/release/garage DOCKER=lxpz/garage_amd64 all: - clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features + clear; cargo build $(BIN): RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index df00fcaf..d04dd7a1 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -246,15 +246,13 @@ impl AdminRpcHandler { ))) } KeyOperation::Import(query) => { - let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id) - .await?; + let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?; if prev_key.is_some() { return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); } let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); self.garage.key_table.insert(&imported_key).await?; Ok(AdminRPC::KeyInfo(imported_key)) - } } } diff --git a/src/garage/cli.rs b/src/garage/cli.rs index 886cf384..eb8275a9 100644 --- a/src/garage/cli.rs +++ b/src/garage/cli.rs @@ -5,8 +5,8 @@ use std::path::PathBuf; use serde::{Deserialize, Serialize}; use structopt::StructOpt; -use garage_util::error::Error; use garage_util::data::UUID; +use garage_util::error::Error; use garage_util::time::*; use garage_rpc::membership::*; @@ -384,7 +384,10 @@ pub async fn cmd_status( Ok(()) } -pub fn find_matching_node(cand: impl std::iter::Iterator, pattern: &str) -> Result { +pub fn find_matching_node( + cand: impl std::iter::Iterator, + pattern: &str, +) -> Result { let mut candidates = vec![]; for c in cand { if hex::encode(&c).starts_with(&pattern) { @@ -428,7 +431,10 @@ pub async fn cmd_configure( for replaced in args.replace.iter() { let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; if config.members.remove(&replaced_node).is_none() { - return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node))); + return Err(Error::Message(format!( + "Cannot replace node {:?} as it is not in current configuration", + replaced_node + ))); } } diff --git a/src/garage/server.rs b/src/garage/server.rs index c45a69b8..feb858e4 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing Garage main data store..."); let garage = Garage::new(config.clone(), db, background, &mut rpc_server); let bootstrap = garage.system.clone().bootstrap( - &config.bootstrap_peers[..], + config.bootstrap_peers, config.consul_host, config.consul_service_name, ); diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 62606df4..34ac798a 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -147,7 +147,9 @@ impl Entry for Object { &self.key } fn is_tombstone(&self) -> bool { - self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) + self.versions.len() == 1 + && self.versions[0].state + == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) } } diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 4e9822fa..adef7c97 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,13 +11,13 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use tokio::io::AsyncWriteExt; use tokio::sync::watch; use tokio::sync::Mutex; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_util::persister::Persister; use garage_util::time::*; use crate::consul::get_consul_nodes; @@ -26,7 +26,7 @@ use crate::rpc_client::*; use crate::rpc_server::*; const PING_INTERVAL: Duration = Duration::from_secs(10); -const CONSUL_INTERVAL: Duration = Duration::from_secs(60); +const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; @@ -69,7 +69,8 @@ pub struct AdvertisedNode { pub struct System { pub id: UUID, - metadata_dir: PathBuf, + persist_config: Persister, + persist_status: Persister>, rpc_local_port: u16, state_info: StateInfo, @@ -80,11 +81,16 @@ pub struct System { pub(crate) status: watch::Receiver>, pub ring: watch::Receiver>, - update_lock: Mutex<(watch::Sender>, watch::Sender>)>, + update_lock: Mutex, pub background: Arc, } +struct Updaters { + update_status: watch::Sender>, + update_ring: watch::Sender>, +} + #[derive(Debug, Clone)] pub struct Status { pub nodes: HashMap>, @@ -144,6 +150,25 @@ impl Status { debug!("END --"); self.hash = blake2sum(nodes_txt.as_bytes()); } + + fn to_serializable_membership(&self, system: &System) -> Vec { + let mut mem = vec![]; + for (node, status) in self.nodes.iter() { + let state_info = if *node == system.id { + system.state_info.clone() + } else { + status.state_info.clone() + }; + mem.push(AdvertisedNode { + id: *node, + addr: status.addr, + is_up: status.is_up(), + last_seen: status.last_seen, + state_info, + }); + } + mem + } } fn gen_node_id(metadata_dir: &PathBuf) -> Result { @@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result { } } -fn read_network_config(metadata_dir: &PathBuf) -> Result { - let mut path = metadata_dir.clone(); - path.push("network_config"); - - let mut file = std::fs::OpenOptions::new() - .read(true) - .open(path.as_path())?; - - let mut net_config_bytes = vec![]; - file.read_to_end(&mut net_config_bytes)?; - - let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..]) - .expect("Unable to parse network configuration file (has version format changed?)."); - - Ok(net_config) -} - impl System { pub fn new( metadata_dir: PathBuf, @@ -196,7 +204,10 @@ impl System { let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID"); info!("Node ID: {}", hex::encode(&id)); - let net_config = match read_network_config(&metadata_dir) { + let persist_config = Persister::new(&metadata_dir, "network_config"); + let persist_status = Persister::new(&metadata_dir, "peer_info"); + + let net_config = match persist_config.load() { Ok(x) => x, Err(e) => { info!( @@ -206,6 +217,7 @@ impl System { NetworkConfig::new() } }; + let mut status = Status { nodes: HashMap::new(), hash: Hash::default(), @@ -231,14 +243,18 @@ impl System { let sys = Arc::new(System { id, - metadata_dir, + persist_config, + persist_status, rpc_local_port: rpc_server.bind_addr.port(), state_info, rpc_http_client, rpc_client, status, ring, - update_lock: Mutex::new((update_status, update_ring)), + update_lock: Mutex::new(Updaters { + update_status, + update_ring, + }), background, }); sys.clone().register_handler(rpc_server, rpc_path); @@ -272,14 +288,11 @@ impl System { } async fn save_network_config(self: Arc) -> Result<(), Error> { - let mut path = self.metadata_dir.clone(); - path.push("network_config"); - let ring = self.ring.borrow().clone(); - let data = rmp_to_vec_all_named(&ring.config)?; - - let mut f = tokio::fs::File::create(path.as_path()).await?; - f.write_all(&data[..]).await?; + self.persist_config + .save_async(&ring.config) + .await + .expect("Cannot save current cluster configuration"); Ok(()) } @@ -308,12 +321,15 @@ impl System { pub async fn bootstrap( self: Arc, - peers: &[SocketAddr], + peers: Vec, consul_host: Option, consul_service_name: Option, ) { - let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::>(); - self.clone().ping_nodes(bootstrap_peers).await; + let self2 = self.clone(); + self.background + .spawn_worker(format!("discovery loop"), |stop_signal| { + self2.discovery_loop(peers, stop_signal) + }); let self2 = self.clone(); self.background @@ -394,9 +410,7 @@ impl System { if has_changes { status.recalculate_hash(); } - if let Err(e) = update_locked.0.send(Arc::new(status)) { - error!("In ping_nodes: could not save status update ({})", e); - } + self.update_status(&update_locked, status); drop(update_locked); if to_advertise.len() > 0 { @@ -420,7 +434,7 @@ impl System { let status_hash = status.hash; let config_version = self.ring.borrow().config.version; - update_locked.0.send(Arc::new(status))?; + self.update_status(&update_locked, status); drop(update_locked); if is_new || status_hash != ping.status_hash { @@ -436,23 +450,9 @@ impl System { } fn handle_pull_status(&self) -> Result { - let status = self.status.borrow().clone(); - let mut mem = vec![]; - for (node, status) in status.nodes.iter() { - let state_info = if *node == self.id { - self.state_info.clone() - } else { - status.state_info.clone() - }; - mem.push(AdvertisedNode { - id: *node, - addr: status.addr, - is_up: status.is_up(), - last_seen: status.last_seen, - state_info, - }); - } - Ok(Message::AdvertiseNodesUp(mem)) + Ok(Message::AdvertiseNodesUp( + self.status.borrow().to_serializable_membership(self), + )) } fn handle_pull_config(&self) -> Result { @@ -502,7 +502,7 @@ impl System { if has_changed { status.recalculate_hash(); } - update_lock.0.send(Arc::new(status))?; + self.update_status(&update_lock, status); drop(update_lock); if to_ping.len() > 0 { @@ -522,7 +522,7 @@ impl System { if adv.version > ring.config.version { let ring = Ring::new(adv.clone()); - update_lock.1.send(Arc::new(ring))?; + update_lock.update_ring.send(Arc::new(ring))?; drop(update_lock); self.background.spawn_cancellable( @@ -530,6 +530,7 @@ impl System { .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .map(Ok), ); + self.background.spawn(self.clone().save_network_config()); } @@ -537,7 +538,7 @@ impl System { } async fn ping_loop(self: Arc, mut stop_signal: watch::Receiver) { - loop { + while !*stop_signal.borrow() { let restart_at = tokio::time::sleep(PING_INTERVAL); let status = self.status.borrow().clone(); @@ -552,11 +553,49 @@ impl System { select! { _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => { - if *stop_signal.borrow() { - return; + _ = stop_signal.changed().fuse() => (), + } + } + } + + async fn discovery_loop( + self: Arc, + bootstrap_peers: Vec, + mut stop_signal: watch::Receiver, + ) { + while !*stop_signal.borrow() { + let not_configured = self.ring.borrow().config.members.len() == 0; + let no_peers = self.status.borrow().nodes.len() < 3; + let bad_peers = self + .status + .borrow() + .nodes + .iter() + .filter(|(_, v)| !v.is_up()) + .count() != self.ring.borrow().config.members.len(); + + if not_configured || no_peers || bad_peers { + info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); + + let mut bp2 = bootstrap_peers + .iter() + .map(|ip| (*ip, None)) + .collect::>(); + + match self.persist_status.load() { + Ok(peers) => { + bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); } + _ => (), } + + self.clone().ping_nodes(bp2).await; + } + + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); + select! { + _ = restart_at.fuse() => (), + _ = stop_signal.changed().fuse() => (), } } } @@ -568,8 +607,6 @@ impl System { consul_service_name: String, ) { while !*stop_signal.borrow() { - let restart_at = tokio::time::sleep(CONSUL_INTERVAL); - match get_consul_nodes(&consul_host, &consul_service_name).await { Ok(mut node_list) => { let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::>(); @@ -580,6 +617,7 @@ impl System { } } + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { _ = restart_at.fuse() => (), _ = stop_signal.changed().fuse() => (), @@ -611,4 +649,20 @@ impl System { let _: Result<_, _> = self.handle_advertise_config(&config).await; } } + + fn update_status(self: &Arc, updaters: &Updaters, status: Status) { + let status = Arc::new(status); + updaters + .update_status + .send(status.clone()) + .expect("Could not update internal membership status"); + self.background + .spawn_cancellable(self.clone().persist_status(status)); + } + + async fn persist_status(self: Arc, status: Arc) -> Result<(), Error> { + let serializable_status = status.to_serializable_membership(&self); + self.persist_status.save_async(&serializable_status).await?; + Ok(()) + } } diff --git a/src/table/data.rs b/src/table/data.rs index e07a21d2..542a8481 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -35,7 +35,13 @@ where F: TableSchema, R: TableReplication, { - pub fn new(system: Arc, name: String, instance: F, replication: R, db: &sled::Db) -> Arc { + pub fn new( + system: Arc, + name: String, + instance: F, + replication: R, + db: &sled::Db, + ) -> Arc { let store = db .open_tree(&format!("{}:table", name)) .expect("Unable to open DB tree"); diff --git a/src/table/gc.rs b/src/table/gc.rs index a37c052f..e52bf599 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -157,7 +157,12 @@ where if errs.is_empty() { Ok(true) } else { - Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::>().join(", "))) + Err(Error::Message( + errs.into_iter() + .map(|x| format!("{}", x)) + .collect::>() + .join(", "), + )) } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 3001786f..39b87aa1 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -200,12 +200,13 @@ where let subnode = self.read_node_txn(tx, &key_sub)?; match subnode { MerkleNode::Empty => { - warn!("({}) Single subnode in tree is empty Merkle node", self.data.name); + warn!( + "({}) Single subnode in tree is empty Merkle node", + self.data.name + ); Some(MerkleNode::Empty) } - MerkleNode::Intermediate(_) => { - Some(MerkleNode::Intermediate(children)) - } + MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)), x @ MerkleNode::Leaf(_, _) => { tx.remove(key_sub.encode())?; Some(x) @@ -239,14 +240,24 @@ where { let exlf_subkey = key.next_key(&exlf_khash); - let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap(); + let exlf_sub_hash = self + .update_item_rec( + tx, + &exlf_k[..], + &exlf_khash, + &exlf_subkey, + Some(exlf_vhash), + )? + .unwrap(); intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash); assert_eq!(int.len(), 1); } { let key2 = key.next_key(khash); - let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap(); + let subhash = self + .update_item_rec(tx, k, khash, &key2, new_vhash)? + .unwrap(); intermediate_set_child(&mut int, key2.prefix[i], subhash); if exlf_khash.as_slice()[i] == khash.as_slice()[i] { assert_eq!(int.len(), 1); diff --git a/src/util/lib.rs b/src/util/lib.rs index e544a872..055e9ab0 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -5,4 +5,5 @@ pub mod background; pub mod config; pub mod data; pub mod error; +pub mod persister; pub mod time; diff --git a/src/util/persister.rs b/src/util/persister.rs new file mode 100644 index 00000000..f4e8cd72 --- /dev/null +++ b/src/util/persister.rs @@ -0,0 +1,62 @@ +use std::io::{Read, Write}; +use std::path::PathBuf; + +use tokio::io::AsyncWriteExt; + +use serde::{Deserialize, Serialize}; + +use crate::data::*; +use crate::error::Error; + +pub struct Persister Deserialize<'de>> { + path: PathBuf, + + _marker: std::marker::PhantomData, +} + +impl Persister +where + T: Serialize + for<'de> Deserialize<'de>, +{ + pub fn new(base_dir: &PathBuf, file_name: &str) -> Self { + let mut path = base_dir.clone(); + path.push(file_name); + Self { + path, + _marker: Default::default(), + } + } + + pub fn load(&self) -> Result { + let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?; + + let mut bytes = vec![]; + file.read_to_end(&mut bytes)?; + + let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + Ok(value) + } + + pub fn save(&self, t: &T) -> Result<(), Error> { + let bytes = rmp_to_vec_all_named(t)?; + + let mut file = std::fs::OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.path)?; + + file.write_all(&bytes[..])?; + + Ok(()) + } + + pub async fn save_async(&self, t: &T) -> Result<(), Error> { + let bytes = rmp_to_vec_all_named(t)?; + + let mut file = tokio::fs::File::create(&self.path).await?; + file.write_all(&bytes[..]).await?; + + Ok(()) + } +} From 22fbb3b892dc1260da547d1a93e8ca796040d678 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 19:58:42 +0200 Subject: [PATCH 02/11] Fix Drone CI signature --- .drone.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.drone.yml b/.drone.yml index 89053bfc..da1ecf48 100644 --- a/.drone.yml +++ b/.drone.yml @@ -115,6 +115,6 @@ steps: --- kind: signature -hmac: 2ecc0db1a006186c6e3b9bcb458ba40b3e8bf88493bfae40fde2fe4136d0db69 +hmac: 14e8b4c527f176e6219ff6bd5f6e34522fca2fad756baa62e72783247bf88d78 ... From 78eeaab5edc5c7e8682c2465e4df7c3fe4e9bd66 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 20:00:48 +0200 Subject: [PATCH 03/11] Install rustfmt --- .drone.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.drone.yml b/.drone.yml index da1ecf48..dd52945b 100644 --- a/.drone.yml +++ b/.drone.yml @@ -36,6 +36,7 @@ steps: commands: - apt-get update - apt-get install --yes libsodium-dev + - rustup component add rustfmt - pwd - cargo fmt -- --check - cargo build @@ -115,6 +116,6 @@ steps: --- kind: signature -hmac: 14e8b4c527f176e6219ff6bd5f6e34522fca2fad756baa62e72783247bf88d78 +hmac: 0fd1c04714fee173821840a1b8383b105339dc2510067cbad8d16937d6dc15f4 ... From 595dc0ed0d6994cbe0f628b41df85f44c576dc64 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 20:26:01 +0200 Subject: [PATCH 04/11] Persist directly and not in background --- src/rpc/membership.rs | 23 +++++++++++------------ 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index adef7c97..330c154f 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -410,7 +410,7 @@ impl System { if has_changes { status.recalculate_hash(); } - self.update_status(&update_locked, status); + self.update_status(&update_locked, status).await; drop(update_locked); if to_advertise.len() > 0 { @@ -434,7 +434,7 @@ impl System { let status_hash = status.hash; let config_version = self.ring.borrow().config.version; - self.update_status(&update_locked, status); + self.update_status(&update_locked, status).await; drop(update_locked); if is_new || status_hash != ping.status_hash { @@ -502,7 +502,7 @@ impl System { if has_changed { status.recalculate_hash(); } - self.update_status(&update_lock, status); + self.update_status(&update_lock, status).await; drop(update_lock); if to_ping.len() > 0 { @@ -650,19 +650,18 @@ impl System { } } - fn update_status(self: &Arc, updaters: &Updaters, status: Status) { + async fn update_status(self: &Arc, updaters: &Updaters, status: Status) { + if status.hash != self.status.borrow().hash { + info!("Persisting new peer list"); + let serializable_status = status.to_serializable_membership(&self); + self.persist_status.save_async(&serializable_status).await + .expect("Unable to persist peer list"); + } + let status = Arc::new(status); updaters .update_status .send(status.clone()) .expect("Could not update internal membership status"); - self.background - .spawn_cancellable(self.clone().persist_status(status)); - } - - async fn persist_status(self: Arc, status: Arc) -> Result<(), Error> { - let serializable_status = status.to_serializable_membership(&self); - self.persist_status.save_async(&serializable_status).await?; - Ok(()) } } From f11bd80d2a0b8ba0c108deab1293daa31109ce5f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 20:33:24 +0200 Subject: [PATCH 05/11] Keep old data --- src/rpc/membership.rs | 24 ++++++++++++++++++++---- src/util/persister.rs | 12 +++++++++++- 2 files changed, 31 insertions(+), 5 deletions(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 330c154f..f8223420 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -582,7 +582,7 @@ impl System { .map(|ip| (*ip, None)) .collect::>(); - match self.persist_status.load() { + match self.persist_status.load_async().await { Ok(peers) => { bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); } @@ -653,9 +653,25 @@ impl System { async fn update_status(self: &Arc, updaters: &Updaters, status: Status) { if status.hash != self.status.borrow().hash { info!("Persisting new peer list"); - let serializable_status = status.to_serializable_membership(&self); - self.persist_status.save_async(&serializable_status).await - .expect("Unable to persist peer list"); + + let mut list = status.to_serializable_membership(&self); + + // Combine with old peer list to make sure no peer is lost + match self.persist_status.load_async().await { + Ok(old_list) => { + for pp in old_list { + if !list.iter().any(|np| pp.id == np.id) { + list.push(pp); + } + } + } + _ => (), + } + + if list.len() > 0 { + self.persist_status.save_async(&list).await + .expect("Unable to persist peer list"); + } } let status = Arc::new(status); diff --git a/src/util/persister.rs b/src/util/persister.rs index f4e8cd72..93b7cdf4 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -1,7 +1,7 @@ use std::io::{Read, Write}; use std::path::PathBuf; -use tokio::io::AsyncWriteExt; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use serde::{Deserialize, Serialize}; @@ -51,6 +51,16 @@ where Ok(()) } + pub async fn load_async(&self) -> Result { + let mut file = tokio::fs::File::open(&self.path).await?; + + let mut bytes = vec![]; + file.read_to_end(&mut bytes).await?; + + let value = rmp_serde::decode::from_read_ref(&bytes[..])?; + Ok(value) + } + pub async fn save_async(&self, t: &T) -> Result<(), Error> { let bytes = rmp_to_vec_all_named(t)?; From fa11cb746af0107b40ae23e8f6de9e089bf89a31 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 20:35:26 +0200 Subject: [PATCH 06/11] Cargo fmt --- src/rpc/membership.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index f8223420..921ecb14 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -669,7 +669,9 @@ impl System { } if list.len() > 0 { - self.persist_status.save_async(&list).await + self.persist_status + .save_async(&list) + .await .expect("Unable to persist peer list"); } } From c5d8dc7d6d32cfd0ff46b2d0ee5df8c532580f86 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 20:42:46 +0200 Subject: [PATCH 07/11] Print stats --- src/rpc/membership.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 921ecb14..c9253bc2 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -652,7 +652,6 @@ impl System { async fn update_status(self: &Arc, updaters: &Updaters, status: Status) { if status.hash != self.status.borrow().hash { - info!("Persisting new peer list"); let mut list = status.to_serializable_membership(&self); @@ -669,6 +668,7 @@ impl System { } if list.len() > 0 { + info!("Persisting new peer list ({} peers)", list.len()); self.persist_status .save_async(&list) .await From 7fd1f9a8693894b66db6cfa78c0407ff1c188e60 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 20:42:57 +0200 Subject: [PATCH 08/11] cargo fmt --- src/rpc/membership.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index c9253bc2..721b7c39 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -652,7 +652,6 @@ impl System { async fn update_status(self: &Arc, updaters: &Updaters, status: Status) { if status.hash != self.status.borrow().hash { - let mut list = status.to_serializable_membership(&self); // Combine with old peer list to make sure no peer is lost From 7b85056942974b76b80df8e7a0267e1350e05c1f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 23:04:08 +0200 Subject: [PATCH 09/11] Merge discovery loop with consul --- src/rpc/membership.rs | 59 ++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 37 deletions(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 721b7c39..bcf98357 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -328,7 +328,7 @@ impl System { let self2 = self.clone(); self.background .spawn_worker(format!("discovery loop"), |stop_signal| { - self2.discovery_loop(peers, stop_signal) + self2.discovery_loop(peers, consul_host, consul_service_name, stop_signal) }); let self2 = self.clone(); @@ -336,14 +336,6 @@ impl System { .spawn_worker(format!("ping loop"), |stop_signal| { self2.ping_loop(stop_signal) }); - - if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { - let self2 = self.clone(); - self.background - .spawn_worker(format!("Consul loop"), |stop_signal| { - self2.consul_loop(stop_signal, consul_host, consul_service_name) - }); - } } async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { @@ -561,8 +553,15 @@ impl System { async fn discovery_loop( self: Arc, bootstrap_peers: Vec, + consul_host: Option, + consul_service_name: Option, mut stop_signal: watch::Receiver, ) { + let consul_config = match (consul_host, consul_service_name) { + (Some(ch), Some(csn)) => Some((ch, csn)), + _ => None, + }; + while !*stop_signal.borrow() { let not_configured = self.ring.borrow().config.members.len() == 0; let no_peers = self.status.borrow().nodes.len() < 3; @@ -571,50 +570,36 @@ impl System { .borrow() .nodes .iter() - .filter(|(_, v)| !v.is_up()) + .filter(|(_, v)| v.is_up()) .count() != self.ring.borrow().config.members.len(); if not_configured || no_peers || bad_peers { info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers); - let mut bp2 = bootstrap_peers + let mut ping_list = bootstrap_peers .iter() .map(|ip| (*ip, None)) .collect::>(); match self.persist_status.load_async().await { Ok(peers) => { - bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); + ping_list.extend(peers.iter().map(|x| (x.addr, Some(x.id)))); } _ => (), } - self.clone().ping_nodes(bp2).await; - } - - let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); - select! { - _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => (), - } - } - } - - async fn consul_loop( - self: Arc, - mut stop_signal: watch::Receiver, - consul_host: String, - consul_service_name: String, - ) { - while !*stop_signal.borrow() { - match get_consul_nodes(&consul_host, &consul_service_name).await { - Ok(mut node_list) => { - let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::>(); - self.clone().ping_nodes(ping_addrs).await; - } - Err(e) => { - warn!("Could not retrieve node list from Consul: {}", e); + if let Some((consul_host, consul_service_name)) = &consul_config { + match get_consul_nodes(consul_host, consul_service_name).await { + Ok(node_list) => { + ping_list.extend(node_list.iter().map(|a| (*a, None))); + } + Err(e) => { + warn!("Could not retrieve node list from Consul: {}", e); + } + } } + + self.clone().ping_nodes(ping_list).await; } let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); From 0f192a96b51a7606c2a993637b11da047ce97884 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 23:21:25 +0200 Subject: [PATCH 10/11] small simplify --- src/rpc/membership.rs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index bcf98357..9fb24ad4 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -522,7 +522,6 @@ impl System { .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .map(Ok), ); - self.background.spawn(self.clone().save_network_config()); } @@ -660,10 +659,9 @@ impl System { } } - let status = Arc::new(status); updaters .update_status - .send(status.clone()) + .send(Arc::new(status)) .expect("Could not update internal membership status"); } } From ab67bd88decc4f27876aaf7a1448070875c90258 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 5 Apr 2021 23:41:50 +0200 Subject: [PATCH 11/11] Try to fix Drone --- .drone.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.drone.yml b/.drone.yml index dd52945b..3a4660b2 100644 --- a/.drone.yml +++ b/.drone.yml @@ -109,6 +109,8 @@ steps: endpoint: https://garage.deuxfleurs.fr region: garage when: + event: + - push branch: - main repo: @@ -116,6 +118,6 @@ steps: --- kind: signature -hmac: 0fd1c04714fee173821840a1b8383b105339dc2510067cbad8d16937d6dc15f4 +hmac: bfe75f47e5eecdd1f6dd8fd3cf1ea359b0215243d06ac767c51a4b4e363e963e ...