From 5e986c443f014d96bd0c27113fcf4571f2e1e881 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Oct 2021 00:29:50 +0200 Subject: [PATCH] Discovery via consul --- Cargo.lock | 2 +- src/rpc/consul.rs | 97 +++++++++++++++++++++++++++++++++++++++++++--- src/rpc/system.rs | 34 +++++++++++++++- src/util/config.rs | 2 +- 4 files changed, 125 insertions(+), 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0e53984a..5fca1c27 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -874,7 +874,7 @@ dependencies = [ [[package]] name = "netapp" version = "0.3.0" -source = "git+https://git.deuxfleurs.fr/lx/netapp#de981aace0e47a1fa65b38212ac21d91e52f7c15" +source = "git+https://git.deuxfleurs.fr/lx/netapp#e9add586a5fd6304473b9138b920e325629346f5" dependencies = [ "arc-swap", "async-trait", diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index fca4f517..82bf99ba 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -4,19 +4,21 @@ use std::net::{IpAddr, SocketAddr}; use hyper::client::Client; use hyper::StatusCode; use hyper::{Body, Method, Request}; -use serde::Deserialize; +use serde::{Deserialize, Serialize}; use netapp::NodeID; use garage_util::error::Error; +// ---- READING FROM CONSUL CATALOG ---- + #[derive(Deserialize, Clone, Debug)] -struct ConsulEntry { - #[serde(alias = "Address")] +struct ConsulQueryEntry { + #[serde(rename = "Address")] address: String, - #[serde(alias = "ServicePort")] + #[serde(rename = "ServicePort")] service_port: u16, - #[serde(alias = "NodeMeta")] + #[serde(rename = "NodeMeta")] node_meta: HashMap, } @@ -41,7 +43,7 @@ pub async fn get_consul_nodes( } let body = hyper::body::to_bytes(resp.into_body()).await?; - let entries = serde_json::from_slice::>(body.as_ref())?; + let entries = serde_json::from_slice::>(body.as_ref())?; let mut ret = vec![]; for ent in entries { @@ -66,3 +68,86 @@ pub async fn get_consul_nodes( Ok(ret) } + +// ---- PUBLISHING TO CONSUL CATALOG ---- + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishEntry { + #[serde(rename = "Node")] + node: String, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "NodeMeta")] + node_meta: HashMap, + #[serde(rename = "Service")] + service: ConsulPublishService, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Service")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, +} + +pub async fn publish_consul_service( + consul_host: &str, + consul_service_name: &str, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, +) -> Result<(), Error> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let advertisment = ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: [ + ("pubkey".to_string(), hex::encode(node_id)), + ("hostname".to_string(), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + service: ConsulPublishService { + service_id: node.clone(), + service_name: consul_service_name.to_string(), + tags: vec!["advertised-by-garage".into(), hostname.into()], + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }; + + let url = format!("http://{}/v1/catalog/register", consul_host); + let req_body = serde_json::to_string(&advertisment)?; + debug!("Request body for consul adv: {}", req_body); + + let req = Request::builder() + .uri(url) + .method(Method::PUT) + .body(Body::from(req_body))?; + + let client = Client::new(); + + let resp = client.request(req).await?; + debug!("Response of advertising to Consul: {:?}", resp); + let resp_code = resp.status(); + debug!( + "{}", + std::str::from_utf8(&hyper::body::to_bytes(resp.into_body()).await?) + .unwrap_or("") + ); + + if resp_code != StatusCode::OK { + return Err(Error::Message(format!("HTTP error {}", resp_code))); + } + + Ok(()) +} diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68edabdf..3a81bc3b 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -28,7 +28,7 @@ use garage_util::error::Error; use garage_util::persister::Persister; use garage_util::time::*; -use crate::consul::get_consul_nodes; +use crate::consul::*; use crate::ring::*; use crate::rpc_helper::*; @@ -80,6 +80,7 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, + rpc_public_addr: Option, bootstrap_peers: Vec<(NodeID, SocketAddr)>, consul_host: Option, consul_service_name: Option, @@ -199,6 +200,7 @@ impl System { system_endpoint, replication_factor, rpc_listen_addr: config.rpc_bind_addr, + rpc_public_addr: config.rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), consul_host: config.consul_host.clone(), consul_service_name: config.consul_service_name.clone(), @@ -224,6 +226,32 @@ impl System { // ---- INTERNALS ---- + async fn advertise_to_consul(self: Arc) -> Result<(), Error> { + let (consul_host, consul_service_name) = + match (&self.consul_host, &self.consul_service_name) { + (Some(ch), Some(csn)) => (ch, csn), + _ => return Ok(()), + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); + return Ok(()); + } + }; + + publish_consul_service( + consul_host, + consul_service_name, + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + .map_err(|e| Error::Message(format!("Error while publishing Consul service: {}", e))) + } + /// Save network configuration to disc async fn save_network_config(self: Arc) -> Result<(), Error> { let ring: Arc = self.ring.borrow().clone(); @@ -375,7 +403,7 @@ impl System { } } - async fn discovery_loop(&self, mut stop_signal: watch::Receiver) { + async fn discovery_loop(self: &Arc, mut stop_signal: watch::Receiver) { let consul_config = match (&self.consul_host, &self.consul_service_name) { (Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())), _ => None, @@ -419,6 +447,8 @@ impl System { } } + self.background.spawn(self.clone().advertise_to_consul()); + let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); select! { _ = restart_at.fuse() => {}, diff --git a/src/util/config.rs b/src/util/config.rs index ed17f13c..f1c5c019 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -41,7 +41,7 @@ pub struct Config { /// Bootstrap peers RPC address #[serde(deserialize_with = "deserialize_vec_addr")] pub bootstrap_peers: Vec<(NodeID, SocketAddr)>, - /// Consule host to connect to to discover more peers + /// Consul host to connect to to discover more peers pub consul_host: Option, /// Consul service name to use pub consul_service_name: Option,