diff --git a/Cargo.lock b/Cargo.lock index c389056f..1b0aad78 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -424,6 +424,7 @@ dependencies = [ "rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)", "rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)", "serde 1.0.106 (registry+https://github.com/rust-lang/crates.io-index)", + "serde_json 1.0.51 (registry+https://github.com/rust-lang/crates.io-index)", "sha2 0.8.1 (registry+https://github.com/rust-lang/crates.io-index)", "tokio 0.2.18 (registry+https://github.com/rust-lang/crates.io-index)", "tokio-rustls 0.13.0 (registry+https://github.com/rust-lang/crates.io-index)", diff --git a/src/garage/server.rs b/src/garage/server.rs index 52d03464..2b618c1a 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -61,7 +61,11 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { garage .system .clone() - .bootstrap(&garage.config.bootstrap_peers[..]) + .bootstrap( + &garage.config.bootstrap_peers[..], + garage.config.consul_host.clone(), + garage.config.consul_service_name.clone() + ) .map(|rv| { info!("Bootstrap done"); Ok(rv) diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index d7a09255..b55e9e90 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -22,6 +22,7 @@ log = "0.4" rmp-serde = "0.14.3" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde_json = "1.0" futures = "0.3" futures-util = "0.3" diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs new file mode 100644 index 00000000..63051a6b --- /dev/null +++ b/src/rpc/consul.rs @@ -0,0 +1,52 @@ +use std::net::{IpAddr, SocketAddr}; + +use hyper::client::Client; +use hyper::StatusCode; +use hyper::{Body, Method, Request}; +use serde::Deserialize; + +use garage_util::error::Error; + +#[derive(Deserialize, Clone)] +struct ConsulEntry { + #[serde(alias = "Address")] + address: String, + #[serde(alias = "ServicePort")] + service_port: u16, +} + +pub async fn get_consul_nodes( + consul_host: &str, + consul_service_name: &str, +) -> Result, Error> { + let url = format!( + "http://{}/v1/catalog/service/{}", + consul_host, consul_service_name + ); + let req = Request::builder() + .uri(url) + .method(Method::GET) + .body(Body::default())?; + + let client = Client::new(); + + let resp = client.request(req).await?; + if resp.status() != StatusCode::OK { + return Err(Error::Message(format!("HTTP error {}", resp.status()))); + } + + let body = hyper::body::to_bytes(resp.into_body()).await?; + let entries = serde_json::from_slice::>(body.as_ref())?; + + let mut ret = vec![]; + for ent in entries { + let ip = ent + .address + .parse::() + .map_err(|e| Error::Message(format!("Could not parse IP address: {}", e)))?; + ret.push(SocketAddr::new(ip, ent.service_port)); + } + debug!("Got nodes from Consul: {:?}", ret); + + Ok(ret) +} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 3fae6c3e..4c5f6e31 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -1,6 +1,7 @@ #[macro_use] extern crate log; +pub mod consul; pub mod membership; pub mod rpc_client; pub mod rpc_server; diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index dcda2c40..d19c1eb7 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -21,10 +21,12 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use crate::consul::get_consul_nodes; use crate::rpc_client::*; use crate::rpc_server::*; const PING_INTERVAL: Duration = Duration::from_secs(10); +const CONSUL_INTERVAL: Duration = Duration::from_secs(60); const PING_TIMEOUT: Duration = Duration::from_secs(2); const MAX_FAILURES_BEFORE_CONSIDERED_DOWN: usize = 5; @@ -420,16 +422,34 @@ impl System { self.rpc_client.call_many(&to[..], msg, timeout).await; } - pub async fn bootstrap(self: Arc, peers: &[SocketAddr]) { + pub async fn bootstrap( + self: Arc, + peers: &[SocketAddr], + consul_host: Option, + consul_service_name: Option, + ) { let bootstrap_peers = peers.iter().map(|ip| (*ip, None)).collect::>(); self.clone().ping_nodes(bootstrap_peers).await; + let self2 = self.clone(); self.clone() .background .spawn_worker(format!("ping loop"), |stop_signal| { - self.ping_loop(stop_signal).map(Ok) + self2.ping_loop(stop_signal).map(Ok) }) .await; + + if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { + let self2 = self.clone(); + self.clone() + .background + .spawn_worker(format!("Consul loop"), |stop_signal| { + self2 + .consul_loop(stop_signal, consul_host, consul_service_name) + .map(Ok) + }) + .await; + } } async fn ping_nodes(self: Arc, peers: Vec<(SocketAddr, Option)>) { @@ -639,7 +659,7 @@ impl System { Ok(Message::Ok) } - pub async fn ping_loop(self: Arc, mut stop_signal: watch::Receiver) { + async fn ping_loop(self: Arc, mut stop_signal: watch::Receiver) { loop { let restart_at = tokio::time::delay_for(PING_INTERVAL); @@ -665,6 +685,37 @@ impl System { } } + async fn consul_loop( + self: Arc, + mut stop_signal: watch::Receiver, + consul_host: String, + consul_service_name: String, + ) { + loop { + let restart_at = tokio::time::delay_for(CONSUL_INTERVAL); + + match get_consul_nodes(&consul_host, &consul_service_name).await { + Ok(mut node_list) => { + let ping_addrs = node_list.drain(..).map(|a| (a, None)).collect::>(); + self.clone().ping_nodes(ping_addrs).await; + } + Err(e) => { + warn!("Could not retrieve node list from Consul: {}", e); + } + } + + select! { + _ = restart_at.fuse() => (), + must_exit = stop_signal.recv().fuse() => { + match must_exit { + None | Some(true) => return, + _ => (), + } + } + } + } + } + pub fn pull_status( self: Arc, peer: UUID, diff --git a/src/util/config.rs b/src/util/config.rs index 5c01712b..b985114d 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -14,6 +14,8 @@ pub struct Config { pub rpc_bind_addr: SocketAddr, pub bootstrap_peers: Vec, + pub consul_host: Option, + pub consul_service_name: Option, #[serde(default = "default_max_concurrent_rpc_requests")] pub max_concurrent_rpc_requests: usize,