Improve bootstraping: do it regularly; persist peer list

This commit is contained in:
Alex 2021-04-05 19:55:53 +02:00
parent 3a449badb1
commit 9ced9f78dc
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
12 changed files with 226 additions and 80 deletions

View file

@ -37,6 +37,7 @@ steps:
- apt-get update - apt-get update
- apt-get install --yes libsodium-dev - apt-get install --yes libsodium-dev
- pwd - pwd
- cargo fmt -- --check
- cargo build - cargo build
- name: cargo-test - name: cargo-test

View file

@ -2,7 +2,7 @@ BIN=target/release/garage
DOCKER=lxpz/garage_amd64 DOCKER=lxpz/garage_amd64
all: all:
clear; RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --no-default-features clear; cargo build
$(BIN): $(BIN):
RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features RUSTFLAGS="-C link-arg=-fuse-ld=lld -C target-cpu=x86-64 -C target-feature=+sse2" cargo build --release --no-default-features

View file

@ -246,15 +246,13 @@ impl AdminRpcHandler {
))) )))
} }
KeyOperation::Import(query) => { KeyOperation::Import(query) => {
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id) let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
.await?;
if prev_key.is_some() { if prev_key.is_some() {
return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id))); return Err(Error::Message(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
} }
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name); let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name);
self.garage.key_table.insert(&imported_key).await?; self.garage.key_table.insert(&imported_key).await?;
Ok(AdminRPC::KeyInfo(imported_key)) Ok(AdminRPC::KeyInfo(imported_key))
} }
} }
} }

View file

@ -5,8 +5,8 @@ use std::path::PathBuf;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use structopt::StructOpt; use structopt::StructOpt;
use garage_util::error::Error;
use garage_util::data::UUID; use garage_util::data::UUID;
use garage_util::error::Error;
use garage_util::time::*; use garage_util::time::*;
use garage_rpc::membership::*; use garage_rpc::membership::*;
@ -384,7 +384,10 @@ pub async fn cmd_status(
Ok(()) Ok(())
} }
pub fn find_matching_node(cand: impl std::iter::Iterator<Item=UUID>, pattern: &str) -> Result<UUID, Error> { pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = UUID>,
pattern: &str,
) -> Result<UUID, Error> {
let mut candidates = vec![]; let mut candidates = vec![];
for c in cand { for c in cand {
if hex::encode(&c).starts_with(&pattern) { if hex::encode(&c).starts_with(&pattern) {
@ -428,7 +431,10 @@ pub async fn cmd_configure(
for replaced in args.replace.iter() { for replaced in args.replace.iter() {
let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?; let replaced_node = find_matching_node(config.members.keys().cloned(), replaced)?;
if config.members.remove(&replaced_node).is_none() { if config.members.remove(&replaced_node).is_none() {
return Err(Error::Message(format!("Cannot replace node {:?} as it is not in current configuration", replaced_node))); return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not in current configuration",
replaced_node
)));
} }
} }

View file

@ -52,7 +52,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store..."); info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), db, background, &mut rpc_server); let garage = Garage::new(config.clone(), db, background, &mut rpc_server);
let bootstrap = garage.system.clone().bootstrap( let bootstrap = garage.system.clone().bootstrap(
&config.bootstrap_peers[..], config.bootstrap_peers,
config.consul_host, config.consul_host,
config.consul_service_name, config.consul_service_name,
); );

View file

@ -147,7 +147,9 @@ impl Entry<String, String> for Object {
&self.key &self.key
} }
fn is_tombstone(&self) -> bool { fn is_tombstone(&self) -> bool {
self.versions.len() == 1 && self.versions[0].state == ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) self.versions.len() == 1
&& self.versions[0].state
== ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
} }
} }

View file

@ -11,13 +11,13 @@ use futures::future::join_all;
use futures::select; use futures::select;
use futures_util::future::*; use futures_util::future::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::io::AsyncWriteExt;
use tokio::sync::watch; use tokio::sync::watch;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
use crate::consul::get_consul_nodes; use crate::consul::get_consul_nodes;
@ -26,7 +26,7 @@ use crate::rpc_client::*;
use crate::rpc_server::*; use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONSUL_INTERVAL: Duration = Duration::from_secs(60); const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@ -69,7 +69,8 @@ pub struct AdvertisedNode {
pub struct System { pub struct System {
pub id: UUID, pub id: UUID,
metadata_dir: PathBuf, persist_config: Persister<NetworkConfig>,
persist_status: Persister<Vec<AdvertisedNode>>,
rpc_local_port: u16, rpc_local_port: u16,
state_info: StateInfo, state_info: StateInfo,
@ -80,11 +81,16 @@ pub struct System {
pub(crate) status: watch::Receiver<Arc<Status>>, pub(crate) status: watch::Receiver<Arc<Status>>,
pub ring: watch::Receiver<Arc<Ring>>, pub ring: watch::Receiver<Arc<Ring>>,
update_lock: Mutex<(watch::Sender<Arc<Status>>, watch::Sender<Arc<Ring>>)>, update_lock: Mutex<Updaters>,
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
} }
struct Updaters {
update_status: watch::Sender<Arc<Status>>,
update_ring: watch::Sender<Arc<Ring>>,
}
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct Status { pub struct Status {
pub nodes: HashMap<UUID, Arc<StatusEntry>>, pub nodes: HashMap<UUID, Arc<StatusEntry>>,
@ -144,6 +150,25 @@ impl Status {
debug!("END --"); debug!("END --");
self.hash = blake2sum(nodes_txt.as_bytes()); self.hash = blake2sum(nodes_txt.as_bytes());
} }
fn to_serializable_membership(&self, system: &System) -> Vec<AdvertisedNode> {
let mut mem = vec![];
for (node, status) in self.nodes.iter() {
let state_info = if *node == system.id {
system.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
mem
}
} }
fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> { fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
@ -169,23 +194,6 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
} }
} }
fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
let mut path = metadata_dir.clone();
path.push("network_config");
let mut file = std::fs::OpenOptions::new()
.read(true)
.open(path.as_path())?;
let mut net_config_bytes = vec![];
file.read_to_end(&mut net_config_bytes)?;
let net_config = rmp_serde::decode::from_read_ref(&net_config_bytes[..])
.expect("Unable to parse network configuration file (has version format changed?).");
Ok(net_config)
}
impl System { impl System {
pub fn new( pub fn new(
metadata_dir: PathBuf, metadata_dir: PathBuf,
@ -196,7 +204,10 @@ impl System {
let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID"); let id = gen_node_id(&metadata_dir).expect("Unable to read or generate node ID");
info!("Node ID: {}", hex::encode(&id)); info!("Node ID: {}", hex::encode(&id));
let net_config = match read_network_config(&metadata_dir) { let persist_config = Persister::new(&metadata_dir, "network_config");
let persist_status = Persister::new(&metadata_dir, "peer_info");
let net_config = match persist_config.load() {
Ok(x) => x, Ok(x) => x,
Err(e) => { Err(e) => {
info!( info!(
@ -206,6 +217,7 @@ impl System {
NetworkConfig::new() NetworkConfig::new()
} }
}; };
let mut status = Status { let mut status = Status {
nodes: HashMap::new(), nodes: HashMap::new(),
hash: Hash::default(), hash: Hash::default(),
@ -231,14 +243,18 @@ impl System {
let sys = Arc::new(System { let sys = Arc::new(System {
id, id,
metadata_dir, persist_config,
persist_status,
rpc_local_port: rpc_server.bind_addr.port(), rpc_local_port: rpc_server.bind_addr.port(),
state_info, state_info,
rpc_http_client, rpc_http_client,
rpc_client, rpc_client,
status, status,
ring, ring,
update_lock: Mutex::new((update_status, update_ring)), update_lock: Mutex::new(Updaters {
update_status,
update_ring,
}),
background, background,
}); });
sys.clone().register_handler(rpc_server, rpc_path); sys.clone().register_handler(rpc_server, rpc_path);
@ -272,14 +288,11 @@ impl System {
} }
async fn save_network_config(self: Arc<Self>) -> Result<(), Error> { async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
let mut path = self.metadata_dir.clone();
path.push("network_config");
let ring = self.ring.borrow().clone(); let ring = self.ring.borrow().clone();
let data = rmp_to_vec_all_named(&ring.config)?; self.persist_config
.save_async(&ring.config)
let mut f = tokio::fs::File::create(path.as_path()).await?; .await
f.write_all(&data[..]).await?; .expect("Cannot save current cluster configuration");
Ok(()) Ok(())
} }
@ -308,12 +321,15 @@ impl System {
pub async fn bootstrap( pub async fn bootstrap(
self: Arc<Self>, self: Arc<Self>,
peers: &[SocketAddr], peers: Vec<SocketAddr>,
consul_host: Option<String>, consul_host: Option<String>,
consul_service_name: Option<String>, consul_service_name: Option<String>,
) { ) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>(); let self2 = self.clone();
self.clone().ping_nodes(bootstrap_peers).await; self.background
.spawn_worker(format!("discovery loop"), |stop_signal| {
self2.discovery_loop(peers, stop_signal)
});
let self2 = self.clone(); let self2 = self.clone();
self.background self.background
@ -394,9 +410,7 @@ impl System {
if has_changes { if has_changes {
status.recalculate_hash(); status.recalculate_hash();
} }
if let Err(e) = update_locked.0.send(Arc::new(status)) { self.update_status(&update_locked, status);
error!("In ping_nodes: could not save status update ({})", e);
}
drop(update_locked); drop(update_locked);
if to_advertise.len() > 0 { if to_advertise.len() > 0 {
@ -420,7 +434,7 @@ impl System {
let status_hash = status.hash; let status_hash = status.hash;
let config_version = self.ring.borrow().config.version; let config_version = self.ring.borrow().config.version;
update_locked.0.send(Arc::new(status))?; self.update_status(&update_locked, status);
drop(update_locked); drop(update_locked);
if is_new || status_hash != ping.status_hash { if is_new || status_hash != ping.status_hash {
@ -436,23 +450,9 @@ impl System {
} }
fn handle_pull_status(&self) -> Result<Message, Error> { fn handle_pull_status(&self) -> Result<Message, Error> {
let status = self.status.borrow().clone(); Ok(Message::AdvertiseNodesUp(
let mut mem = vec![]; self.status.borrow().to_serializable_membership(self),
for (node, status) in status.nodes.iter() { ))
let state_info = if *node == self.id {
self.state_info.clone()
} else {
status.state_info.clone()
};
mem.push(AdvertisedNode {
id: *node,
addr: status.addr,
is_up: status.is_up(),
last_seen: status.last_seen,
state_info,
});
}
Ok(Message::AdvertiseNodesUp(mem))
} }
fn handle_pull_config(&self) -> Result<Message, Error> { fn handle_pull_config(&self) -> Result<Message, Error> {
@ -502,7 +502,7 @@ impl System {
if has_changed { if has_changed {
status.recalculate_hash(); status.recalculate_hash();
} }
update_lock.0.send(Arc::new(status))?; self.update_status(&update_lock, status);
drop(update_lock); drop(update_lock);
if to_ping.len() > 0 { if to_ping.len() > 0 {
@ -522,7 +522,7 @@ impl System {
if adv.version > ring.config.version { if adv.version > ring.config.version {
let ring = Ring::new(adv.clone()); let ring = Ring::new(adv.clone());
update_lock.1.send(Arc::new(ring))?; update_lock.update_ring.send(Arc::new(ring))?;
drop(update_lock); drop(update_lock);
self.background.spawn_cancellable( self.background.spawn_cancellable(
@ -530,6 +530,7 @@ impl System {
.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
.map(Ok), .map(Ok),
); );
self.background.spawn(self.clone().save_network_config()); self.background.spawn(self.clone().save_network_config());
} }
@ -537,7 +538,7 @@ impl System {
} }
async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop { while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(PING_INTERVAL); let restart_at = tokio::time::sleep(PING_INTERVAL);
let status = self.status.borrow().clone(); let status = self.status.borrow().clone();
@ -552,12 +553,50 @@ impl System {
select! { select! {
_ = restart_at.fuse() => (), _ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => { _ = stop_signal.changed().fuse() => (),
if *stop_signal.borrow() {
return;
} }
} }
} }
async fn discovery_loop(
self: Arc<Self>,
bootstrap_peers: Vec<SocketAddr>,
mut stop_signal: watch::Receiver<bool>,
) {
while !*stop_signal.borrow() {
let not_configured = self.ring.borrow().config.members.len() == 0;
let no_peers = self.status.borrow().nodes.len() < 3;
let bad_peers = self
.status
.borrow()
.nodes
.iter()
.filter(|(_, v)| !v.is_up())
.count() != self.ring.borrow().config.members.len();
if not_configured || no_peers || bad_peers {
info!("Doing a bootstrap/discovery step (not_configured: {}, no_peers: {}, bad_peers: {})", not_configured, no_peers, bad_peers);
let mut bp2 = bootstrap_peers
.iter()
.map(|ip| (*ip, None))
.collect::<Vec<_>>();
match self.persist_status.load() {
Ok(peers) => {
bp2.extend(peers.iter().map(|x| (x.addr, Some(x.id))));
}
_ => (),
}
self.clone().ping_nodes(bp2).await;
}
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! {
_ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => (),
}
} }
} }
@ -568,8 +607,6 @@ impl System {
consul_service_name: String, consul_service_name: String,
) { ) {
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let restart_at = tokio::time::sleep(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await { match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => { Ok(mut node_list) => {
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>(); let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
@ -580,6 +617,7 @@ impl System {
} }
} }
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! { select! {
_ = restart_at.fuse() => (), _ = restart_at.fuse() => (),
_ = stop_signal.changed().fuse() => (), _ = stop_signal.changed().fuse() => (),
@ -611,4 +649,20 @@ impl System {
let _: Result<_, _> = self.handle_advertise_config(&config).await; let _: Result<_, _> = self.handle_advertise_config(&config).await;
} }
} }
fn update_status(self: &Arc<Self>, updaters: &Updaters, status: Status) {
let status = Arc::new(status);
updaters
.update_status
.send(status.clone())
.expect("Could not update internal membership status");
self.background
.spawn_cancellable(self.clone().persist_status(status));
}
async fn persist_status(self: Arc<Self>, status: Arc<Status>) -> Result<(), Error> {
let serializable_status = status.to_serializable_membership(&self);
self.persist_status.save_async(&serializable_status).await?;
Ok(())
}
} }

View file

@ -35,7 +35,13 @@ where
F: TableSchema, F: TableSchema,
R: TableReplication, R: TableReplication,
{ {
pub fn new(system: Arc<System>, name: String, instance: F, replication: R, db: &sled::Db) -> Arc<Self> { pub fn new(
system: Arc<System>,
name: String,
instance: F,
replication: R,
db: &sled::Db,
) -> Arc<Self> {
let store = db let store = db
.open_tree(&format!("{}:table", name)) .open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");

View file

@ -157,7 +157,12 @@ where
if errs.is_empty() { if errs.is_empty() {
Ok(true) Ok(true)
} else { } else {
Err(Error::Message(errs.into_iter().map(|x| format!("{}", x)).collect::<Vec<_>>().join(", "))) Err(Error::Message(
errs.into_iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
.join(", "),
))
} }
} }

View file

@ -200,12 +200,13 @@ where
let subnode = self.read_node_txn(tx, &key_sub)?; let subnode = self.read_node_txn(tx, &key_sub)?;
match subnode { match subnode {
MerkleNode::Empty => { MerkleNode::Empty => {
warn!("({}) Single subnode in tree is empty Merkle node", self.data.name); warn!(
"({}) Single subnode in tree is empty Merkle node",
self.data.name
);
Some(MerkleNode::Empty) Some(MerkleNode::Empty)
} }
MerkleNode::Intermediate(_) => { MerkleNode::Intermediate(_) => Some(MerkleNode::Intermediate(children)),
Some(MerkleNode::Intermediate(children))
}
x @ MerkleNode::Leaf(_, _) => { x @ MerkleNode::Leaf(_, _) => {
tx.remove(key_sub.encode())?; tx.remove(key_sub.encode())?;
Some(x) Some(x)
@ -239,14 +240,24 @@ where
{ {
let exlf_subkey = key.next_key(&exlf_khash); let exlf_subkey = key.next_key(&exlf_khash);
let exlf_sub_hash = self.update_item_rec(tx, &exlf_k[..], &exlf_khash, &exlf_subkey, Some(exlf_vhash))?.unwrap(); let exlf_sub_hash = self
.update_item_rec(
tx,
&exlf_k[..],
&exlf_khash,
&exlf_subkey,
Some(exlf_vhash),
)?
.unwrap();
intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash); intermediate_set_child(&mut int, exlf_subkey.prefix[i], exlf_sub_hash);
assert_eq!(int.len(), 1); assert_eq!(int.len(), 1);
} }
{ {
let key2 = key.next_key(khash); let key2 = key.next_key(khash);
let subhash = self.update_item_rec(tx, k, khash, &key2, new_vhash)?.unwrap(); let subhash = self
.update_item_rec(tx, k, khash, &key2, new_vhash)?
.unwrap();
intermediate_set_child(&mut int, key2.prefix[i], subhash); intermediate_set_child(&mut int, key2.prefix[i], subhash);
if exlf_khash.as_slice()[i] == khash.as_slice()[i] { if exlf_khash.as_slice()[i] == khash.as_slice()[i] {
assert_eq!(int.len(), 1); assert_eq!(int.len(), 1);

View file

@ -5,4 +5,5 @@ pub mod background;
pub mod config; pub mod config;
pub mod data; pub mod data;
pub mod error; pub mod error;
pub mod persister;
pub mod time; pub mod time;

62
src/util/persister.rs Normal file
View file

@ -0,0 +1,62 @@
use std::io::{Read, Write};
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;
use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error;
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> {
path: PathBuf,
_marker: std::marker::PhantomData<T>,
}
impl<T> Persister<T>
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub fn new(base_dir: &PathBuf, file_name: &str) -> Self {
let mut path = base_dir.clone();
path.push(file_name);
Self {
path,
_marker: Default::default(),
}
}
pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![];
file.read_to_end(&mut bytes)?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value)
}
pub fn save(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&self.path)?;
file.write_all(&bytes[..])?;
Ok(())
}
pub async fn save_async(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?;
let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?;
Ok(())
}
}