Add automatic peer discovery from Consul

This commit is contained in:
Alex 2020-06-30 18:33:14 +02:00
parent ade29cf63a
commit fbe8fe81f2
7 changed files with 116 additions and 4 deletions

1
Cargo.lock generated
View file

@ -424,6 +424,7 @@ dependencies = [
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)", "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-rustls 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-rustls 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)",

View file

@ -61,7 +61,11 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
garage garage
.system .system
.clone() .clone()
.bootstrap(&garage.config.bootstrap_peers[..]) .bootstrap(
&garage.config.bootstrap_peers[..],
garage.config.consul_host.clone(),
garage.config.consul_service_name.clone()
)
.map(|rv| { .map(|rv| {
info!("Bootstrap done"); info!("Bootstrap done");
Ok(rv) Ok(rv)

View file

@ -22,6 +22,7 @@ log = "0.4"
rmp-serde = "0.14.3" rmp-serde = "0.14.3"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"

52
src/rpc/consul.rs Normal file
View file

@ -0,0 +1,52 @@
use std::net::{IpAddr, SocketAddr};
use hyper::client::Client;
use hyper::StatusCode;
use hyper::{Body, Method, Request};
use serde::Deserialize;
use garage_util::error::Error;
#[derive(Deserialize, Clone)]
struct ConsulEntry {
#[serde(alias = "Address")]
address: String,
#[serde(alias = "ServicePort")]
service_port: u16,
}
pub async fn get_consul_nodes(
consul_host: &str,
consul_service_name: &str,
) -> Result<Vec<SocketAddr>, Error> {
let url = format!(
"http://{}/v1/catalog/service/{}",
consul_host, consul_service_name
);
let req = Request::builder()
.uri(url)
.method(Method::GET)
.body(Body::default())?;
let client = Client::new();
let resp = client.request(req).await?;
if resp.status() != StatusCode::OK {
return Err(Error::Message(format!("HTTP error {}", resp.status())));
}
let body = hyper::body::to_bytes(resp.into_body()).await?;
let entries = serde_json::from_slice::<Vec<ConsulEntry>>(body.as_ref())?;
let mut ret = vec![];
for ent in entries {
let ip = ent
.address
.parse::<IpAddr>()
.map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?;
ret.push(SocketAddr::new(ip, ent.service_port));
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}

View file

@ -1,6 +1,7 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod consul;
pub mod membership; pub mod membership;
pub mod rpc_client; pub mod rpc_client;
pub mod rpc_server; pub mod rpc_server;

View file

@ -21,10 +21,12 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use crate::consul::get_consul_nodes;
use crate::rpc_client::*; use crate::rpc_client::*;
use crate::rpc_server::*; use crate::rpc_server::*;
const PING_INTERVAL: Duration = Duration::from_secs(10); const PING_INTERVAL: Duration = Duration::from_secs(10);
const CONSUL_INTERVAL: Duration = Duration::from_secs(60);
const PING_TIMEOUT: Duration = Duration::from_secs(2); const PING_TIMEOUT: Duration = Duration::from_secs(2);
const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5;
@ -420,16 +422,34 @@ impl System {
self.rpc_client.call_many(&to[..], msg, timeout).await; self.rpc_client.call_many(&to[..], msg, timeout).await;
} }
pub async fn bootstrap(self: Arc<Self>, peers: &[SocketAddr]) { pub async fn bootstrap(
self: Arc<Self>,
peers: &[SocketAddr],
consul_host: Option<String>,
consul_service_name: Option<String>,
) {
let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>(); let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::<Vec<_>>();
self.clone().ping_nodes(bootstrap_peers).await; self.clone().ping_nodes(bootstrap_peers).await;
let self2 = self.clone();
self.clone() self.clone()
.background .background
.spawn_worker(format!("ping loop"), |stop_signal| { .spawn_worker(format!("ping loop"), |stop_signal| {
self.ping_loop(stop_signal).map(Ok) self2.ping_loop(stop_signal).map(Ok)
}) })
.await; .await;
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone();
self.clone()
.background
.spawn_worker(format!("Consul loop"), |stop_signal| {
self2
.consul_loop(stop_signal, consul_host, consul_service_name)
.map(Ok)
})
.await;
}
} }
async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) { async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@ -639,7 +659,7 @@ impl System {
Ok(Message::Ok) Ok(Message::Ok)
} }
pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) { async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
loop { loop {
let restart_at = tokio::time::delay_for(PING_INTERVAL); let restart_at = tokio::time::delay_for(PING_INTERVAL);
@ -665,6 +685,37 @@ impl System {
} }
} }
async fn consul_loop(
self: Arc<Self>,
mut stop_signal: watch::Receiver<bool>,
consul_host: String,
consul_service_name: String,
) {
loop {
let restart_at = tokio::time::delay_for(CONSUL_INTERVAL);
match get_consul_nodes(&consul_host, &consul_service_name).await {
Ok(mut node_list) => {
let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::<Vec<_>>();
self.clone().ping_nodes(ping_addrs).await;
}
Err(e) => {
warn!("Could not retrieve node list from Consul: {}", e);
}
}
select! {
_ = restart_at.fuse() => (),
must_exit = stop_signal.recv().fuse() => {
match must_exit {
None | Some(true) => return,
_ => (),
}
}
}
}
}
pub fn pull_status( pub fn pull_status(
self: Arc<Self>, self: Arc<Self>,
peer: UUID, peer: UUID,

View file

@ -14,6 +14,8 @@ pub struct Config {
pub rpc_bind_addr: SocketAddr, pub rpc_bind_addr: SocketAddr,
pub bootstrap_peers: Vec<SocketAddr>, pub bootstrap_peers: Vec<SocketAddr>,
pub consul_host: Option<String>,
pub consul_service_name: Option<String>,
#[serde(default = "default_max_concurrent_rpc_requests")] #[serde(default = "default_max_concurrent_rpc_requests")]
pub max_concurrent_rpc_requests: usize, pub max_concurrent_rpc_requests: usize,