From 02ba9016ab6eca5bb4964549de002573d5a3a07a Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Fri, 5 May 2023 16:18:24 -0600 Subject: [PATCH 01/10] register consul services against local agent instead of catalog api --- doc/book/development/devenv.md | 2 +- src/garage/Cargo.toml | 1 + src/garage/main.rs | 2 + src/rpc/Cargo.toml | 1 + src/rpc/consul_services.rs | 166 +++++++++++++++++++++++++++++++++ src/rpc/lib.rs | 2 + src/rpc/system.rs | 69 +++++++++++++- src/util/config.rs | 25 ++++- 8 files changed, 262 insertions(+), 6 deletions(-) create mode 100644 src/rpc/consul_services.rs diff --git a/doc/book/development/devenv.md b/doc/book/development/devenv.md index 8d7d2e95..dd3bdec0 100644 --- a/doc/book/development/devenv.md +++ b/doc/book/development/devenv.md @@ -25,7 +25,7 @@ git clone https://git.deuxfleurs.fr/Deuxfleurs/garage cd garage ``` -*Optionnaly, you can use our nix.conf file to speed up compilations:* +*Optionally, you can use our nix.conf file to speed up compilations:* ```bash sudo mkdir -p /etc/nix diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 52d0ea79..aaf028a8 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -88,6 +88,7 @@ sqlite = [ "garage_model/sqlite" ] # Automatic registration and discovery via Consul API consul-discovery = [ "garage_rpc/consul-discovery" ] +consul-service-discovery = [ "garage_rpc/consul-service-discovery" ] # Automatic registration and discovery via Kubernetes API kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] # Prometheus exporter (/metrics endpoint). diff --git a/src/garage/main.rs b/src/garage/main.rs index e8aee892..103c027d 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -95,6 +95,8 @@ async fn main() { "sqlite", #[cfg(feature = "consul-discovery")] "consul-discovery", + #[cfg(feature = "consul-service-discovery")] + "consul-service-discovery", #[cfg(feature = "kubernetes-discovery")] "kubernetes-discovery", #[cfg(feature = "metrics")] diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index f0fde7a7..d168e246 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -50,4 +50,5 @@ netapp = { version = "0.5.2", features = ["telemetry"] } [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] consul-discovery = [ "reqwest", "err-derive" ] +consul-service-discovery = [ "reqwest", "err-derive" ] system-libs = [ "sodiumoxide/use-pkg-config" ] diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs new file mode 100644 index 00000000..461bb195 --- /dev/null +++ b/src/rpc/consul_services.rs @@ -0,0 +1,166 @@ +use std::collections::HashMap; +use std::fs::File; +use std::io::Read; +use std::net::{IpAddr, SocketAddr}; + +use err_derive::Error; +use serde::{Deserialize, Serialize}; + +use netapp::NodeID; + +use garage_util::config::ConsulServiceConfig; + +const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; + +#[derive(Deserialize, Clone, Debug)] +struct ConsulQueryEntry { + #[serde(rename = "Address")] + address: String, + #[serde(rename = "ServicePort")] + service_port: u16, + #[serde(rename = "ServiceMeta")] + service_meta: HashMap, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Name")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, + #[serde(rename = "Meta")] + meta: HashMap, +} + +// ---- + +pub struct ConsulServiceDiscovery { + config: ConsulServiceConfig, + client: reqwest::Client, +} + +impl ConsulServiceDiscovery { + pub fn new(config: ConsulServiceConfig) -> Result { + let mut builder: reqwest::ClientBuilder = match &config.ca_cert { + Some(client_ca) => { + let mut ca_cert_buf = vec![]; + File::open(client_ca)?.read_to_end(&mut ca_cert_buf)?; + + let req: reqwest::ClientBuilder = reqwest::Client::builder() + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) + .use_rustls_tls(); + + if config.tls_skip_verify { + req.danger_accept_invalid_certs(true) + } else { + req + } + } + None => reqwest::Client::builder(), + }; + + if let Some(token) = &config.consul_http_token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert("x-consul-token", reqwest::header::HeaderValue::from_str(&token)?); + builder = builder.default_headers(headers); + } + + let client = builder.build()?; + + Ok(Self { client, config }) + } + + // ---- READING FROM CONSUL CATALOG ---- + + pub async fn get_consul_services(&self) -> Result, ConsulError> { + let url = format!( + "{}/v1/catalog/service/{}", + self.config.consul_http_addr, self.config.service_name + ); + + let req = self.client.get(&url); + let http = req.send().await?; + let entries: Vec = http.json().await?; + + let mut ret = vec![]; + for ent in entries { + let ip = ent.address.parse::().ok(); + let pubkey = ent + .service_meta + .get(&format!("{}-pubkey", META_PREFIX)) + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); + if let (Some(ip), Some(pubkey)) = (ip, pubkey) { + ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); + } else { + warn!( + "Could not process node spec from Consul: {:?} (invalid IP or public key)", + ent + ); + } + } + debug!("Got nodes from Consul: {:?}", ret); + + Ok(ret) + } + + // ---- PUBLISHING TO CONSUL CATALOG ---- + + pub async fn publish_consul_service( + &self, + node_id: NodeID, + hostname: &str, + rpc_public_addr: SocketAddr, + ) -> Result<(), ConsulError> { + let node = format!("garage:{}", hex::encode(&node_id[..8])); + + let tags = [ + vec!["advertised-by-garage".into(), hostname.into()], + self.config.tags.clone(), + ] + .concat(); + + let advertisement: ConsulPublishService = ConsulPublishService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + meta: [ + (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), + (format!("{}-hostname", META_PREFIX), hostname.to_string()), + ] + .iter() + .cloned() + .collect(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }; + + let url = format!( + "{}/v1/agent/service/register?replace-existing-checks", + self.config.consul_http_addr + ); + + let req = self.client.put(&url); + let http = req.json(&advertisement).send().await?; + http.error_for_status()?; + + Ok(()) + } +} + +/// Regroup all Consul discovery errors +#[derive(Debug, Error)] +pub enum ConsulError { + #[error(display = "IO error: {}", _0)] + Io(#[error(source)] std::io::Error), + #[error(display = "HTTP error: {}", _0)] + Reqwest(#[error(source)] reqwest::Error), + #[error(display = "Invalid HTTP header error: {}", _0)] + HeaderValue(#[error(source)] reqwest::header::InvalidHeaderValue), +} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 5aec92c0..2c5ed107 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -8,6 +8,8 @@ mod system_metrics; #[cfg(feature = "consul-discovery")] mod consul; +#[cfg(feature = "consul-service-discovery")] +mod consul_services; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index b42e49fc..b0fef6f2 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -32,6 +32,8 @@ use garage_util::time::*; #[cfg(feature = "consul-discovery")] use crate::consul::ConsulDiscovery; +#[cfg(feature = "consul-service-discovery")] +use crate::consul_services::ConsulServiceDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -98,12 +100,14 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, - #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] + #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] rpc_public_addr: Option, bootstrap_peers: Vec, #[cfg(feature = "consul-discovery")] consul_discovery: Option, + #[cfg(feature = "consul-service-discovery")] + consul_service_discovery: Option, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, @@ -346,6 +350,19 @@ impl System { warn!("Consul discovery is not enabled in this build."); } + #[cfg(feature = "consul-service-discovery")] + let consul_service_discovery = match &config.consul_service_discovery { + Some(cfg) => Some( + ConsulServiceDiscovery::new(cfg.clone()) + .ok_or_message("Invalid Consul service discovery configuration")?, + ), + None => None, + }; + #[cfg(not(feature = "consul-service-discovery"))] + if config.consul_service_discovery.is_some() { + warn!("Consul service discovery is not enabled in this build."); + } + #[cfg(not(feature = "kubernetes-discovery"))] if config.kubernetes_discovery.is_some() { warn!("Kubernetes discovery is not enabled in this build."); @@ -369,11 +386,13 @@ impl System { replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] + #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] consul_discovery, + #[cfg(feature = "consul-service-discovery")] + consul_service_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, @@ -555,6 +574,34 @@ impl System { } } + #[cfg(feature = "consul-service-discovery")] + async fn advertise_to_consul(self: Arc) { + let c = match &self.consul_service_discovery { + Some(c) => c, + _ => return, + }; + + let rpc_public_addr = match self.rpc_public_addr { + Some(addr) => addr, + None => { + warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); + return; + } + }; + + if let Err(e) = c + .publish_consul_service( + self.netapp.id, + &self.local_status.load_full().hostname, + rpc_public_addr, + ) + .await + { + error!("Error while publishing Consul service: {}", e); + } + } + + #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc) { let k = match &self.kubernetes_discovery { @@ -744,7 +791,7 @@ impl System { ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } - // Fetch peer list from Consul + // Fetch peer list from Consul Nodes #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { match c.get_consul_nodes().await { @@ -757,6 +804,19 @@ impl System { } } + // Fetch peer list from Consul Services + #[cfg(feature = "consul-service-discovery")] + if let Some(c) = &self.consul_service_discovery { + match c.get_consul_services().await { + Ok(node_list) => { + ping_list.extend(node_list); + } + Err(e) => { + warn!("Could not retrieve service list from Consul: {}", e); + } + } + } + // Fetch peer list from Kubernetes #[cfg(feature = "kubernetes-discovery")] if let Some(k) = &self.kubernetes_discovery { @@ -796,6 +856,9 @@ impl System { #[cfg(feature = "consul-discovery")] tokio::spawn(self.clone().advertise_to_consul()); + #[cfg(feature = "consul-service-discovery")] + tokio::spawn(self.clone().advertise_to_consul()); + #[cfg(feature = "kubernetes-discovery")] tokio::spawn(self.clone().advertise_to_kubernetes()); diff --git a/src/util/config.rs b/src/util/config.rs index 95835bbb..4b32f8ba 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -56,6 +56,9 @@ pub struct Config { /// Configuration for automatic node discovery through Consul #[serde(default)] pub consul_discovery: Option, + /// Configuration for automatic node discovery through Consul + #[serde(default)] + pub consul_service_discovery: Option, /// Configuration for automatic node discovery through Kubernetes #[serde(default)] pub kubernetes_discovery: Option, @@ -152,6 +155,24 @@ pub struct ConsulDiscoveryConfig { pub tls_skip_verify: bool, } +#[derive(Deserialize, Debug, Clone)] +pub struct ConsulServiceConfig { + /// Consul http or https address to connect to to discover more peers + pub consul_http_addr: String, + /// Token to use for connecting to consul + pub consul_http_token: Option, + /// Consul service name to use + pub service_name: String, + /// CA TLS certificate to use when connecting to Consul + pub ca_cert: Option, + // Additional tags to add to the service + #[serde(default)] + pub tags: Vec, + /// Skip TLS hostname verification + #[serde(default)] + pub tls_skip_verify: bool, +} + #[derive(Deserialize, Debug, Clone)] pub struct KubernetesDiscoveryConfig { /// Kubernetes namespace the service discovery resources are be created in @@ -344,7 +365,7 @@ mod tests { replication_mode = "3" rpc_bind_addr = "[::]:3901" rpc_secret_file = "{}" - + [s3_api] s3_region = "garage" api_bind_addr = "[::]:3900" @@ -388,7 +409,7 @@ mod tests { rpc_bind_addr = "[::]:3901" rpc_secret= "dummy" rpc_secret_file = "dummy" - + [s3_api] s3_region = "garage" api_bind_addr = "[::]:3900" From 4d6e6fc155bb263a04f7f6dfbb77933f5d2d0b2e Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 8 May 2023 13:10:23 -0600 Subject: [PATCH 02/10] cargo fmt --- src/rpc/consul_services.rs | 5 ++++- src/rpc/system.rs | 13 ++++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs index 461bb195..928c7691 100644 --- a/src/rpc/consul_services.rs +++ b/src/rpc/consul_services.rs @@ -67,7 +67,10 @@ impl ConsulServiceDiscovery { if let Some(token) = &config.consul_http_token { let mut headers = reqwest::header::HeaderMap::new(); - headers.insert("x-consul-token", reqwest::header::HeaderValue::from_str(&token)?); + headers.insert( + "x-consul-token", + reqwest::header::HeaderValue::from_str(&token)?, + ); builder = builder.default_headers(headers); } diff --git a/src/rpc/system.rs b/src/rpc/system.rs index b0fef6f2..287cd666 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -100,7 +100,11 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, - #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] + #[cfg(any( + feature = "consul-discovery", + feature = "consul-service-discovery", + feature = "kubernetes-discovery" + ))] rpc_public_addr: Option, bootstrap_peers: Vec, @@ -386,7 +390,11 @@ impl System { replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - #[cfg(any(feature = "consul-discovery", feature = "consul-service-discovery", feature = "kubernetes-discovery"))] + #[cfg(any( + feature = "consul-discovery", + feature = "consul-service-discovery", + feature = "kubernetes-discovery" + ))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] @@ -601,7 +609,6 @@ impl System { } } - #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc) { let k = match &self.kubernetes_discovery { From bd6485565e78c0bbb9ee830c4e5b114c6248dc97 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 8 May 2023 19:29:47 -0600 Subject: [PATCH 03/10] allow additional ServiceMeta, docs --- doc/book/reference-manual/configuration.md | 15 +++++++++++++-- src/rpc/consul_services.rs | 19 ++++++++++++------- src/util/config.rs | 3 +++ 3 files changed, 28 insertions(+), 9 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 38062bab..348a352a 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -42,6 +42,17 @@ client_cert = "/etc/consul/consul-client.crt" client_key = "/etc/consul/consul-key.crt" tls_skip_verify = false +[consul_service_discovery] +consul_http_addr = "https://127.0.0.1:8501" +consul_http_token = "abcdef-01234-56789" +service_name = "garage" +ca_cert = "/etc/consul/consul-ca.crt" +tls_skip_verify = false +# tags to add to the published service +tags = [ "dns-enabled" ] +# additional service meta to send along registration +meta = { dns-acl = "allow trusted" } + [kubernetes_discovery] namespace = "garage" service_name = "garage-daemon" @@ -201,7 +212,7 @@ Garage supports the following replication modes: that should probably never be used. Note that in modes `2` and `3`, -if at least the same number of zones are available, an arbitrary number of failures in +if at least the same number of zones are available, an arbitrary number of failures in any given zone is tolerated as copies of data will be spread over several zones. **Make sure `replication_mode` is the same in the configuration files of all nodes. @@ -245,7 +256,7 @@ Values between `1` (faster compression) and `19` (smaller file) are standard com levels for zstd. From `20` to `22`, compression levels are referred as "ultra" and must be used with extra care as it will use lot of memory. A value of `0` will let zstd choose a default value (currently `3`). Finally, zstd has also compression designed to be faster -than default compression levels, they range from `-1` (smaller file) to `-99` (faster +than default compression levels, they range from `-1` (smaller file) to `-99` (faster compression). If you do not specify a `compression_level` entry, Garage will set it to `1` for you. With diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs index 928c7691..aaf3c4a1 100644 --- a/src/rpc/consul_services.rs +++ b/src/rpc/consul_services.rs @@ -129,17 +129,22 @@ impl ConsulServiceDiscovery { ] .concat(); + let mut meta = HashMap::from([ + (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), + (format!("{}-hostname", META_PREFIX), hostname.to_string()), + ]); + + if let Some(global_meta) = &self.config.meta { + for (key, value) in global_meta.into_iter() { + meta.insert(key.clone(), value.clone()); + } + } + let advertisement: ConsulPublishService = ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, - meta: [ - (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), - (format!("{}-hostname", META_PREFIX), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), + meta, address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }; diff --git a/src/util/config.rs b/src/util/config.rs index 4b32f8ba..84a8e34f 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -168,6 +168,9 @@ pub struct ConsulServiceConfig { // Additional tags to add to the service #[serde(default)] pub tags: Vec, + // Additional service metadata to add + #[serde(default)] + pub meta: Option>, /// Skip TLS hostname verification #[serde(default)] pub tls_skip_verify: bool, From fd7dbea5b86ed8757e76e1114e2154538c5a3c16 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Wed, 10 May 2023 13:20:39 -0600 Subject: [PATCH 04/10] follow feedback, fold into existing feature --- doc/book/reference-manual/configuration.md | 45 +++-- src/garage/Cargo.toml | 1 - src/garage/main.rs | 2 - src/rpc/consul.rs | 195 +++++++++++++++------ src/rpc/consul_services.rs | 174 ------------------ src/rpc/lib.rs | 2 - src/rpc/system.rs | 76 +------- src/util/config.rs | 39 ++--- 8 files changed, 194 insertions(+), 340 deletions(-) delete mode 100644 src/rpc/consul_services.rs diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 348a352a..819a5b88 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -35,24 +35,19 @@ bootstrap_peers = [ [consul_discovery] +mode = "node" consul_http_addr = "http://127.0.0.1:8500" service_name = "garage-daemon" ca_cert = "/etc/consul/consul-ca.crt" client_cert = "/etc/consul/consul-client.crt" client_key = "/etc/consul/consul-key.crt" +# for `service` mode, unset client_cert and client_key, and optionally enable `consul_http_token` +# consul_http_token = "abcdef-01234-56789" tls_skip_verify = false - -[consul_service_discovery] -consul_http_addr = "https://127.0.0.1:8501" -consul_http_token = "abcdef-01234-56789" -service_name = "garage" -ca_cert = "/etc/consul/consul-ca.crt" -tls_skip_verify = false -# tags to add to the published service tags = [ "dns-enabled" ] -# additional service meta to send along registration meta = { dns-acl = "allow trusted" } + [kubernetes_discovery] namespace = "garage" service_name = "garage-daemon" @@ -323,6 +318,12 @@ Garage supports discovering other nodes of the cluster using Consul. For this to work correctly, nodes need to know their IP address by which they can be reached by other nodes of the cluster, which should be set in `rpc_public_addr`. +### `mode` + +Two modes of service discovery are supported: `node` and `service`. `node`, the default will register a service using +the `/v1/catalog` endpoints and mTLS (if `client_cert` and `client_key` are provided). `service` mode uses the +`v1/agent` endpoints instead, where an optional `consul_http_token` may be provided. + ### `consul_http_addr` and `service_name` The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server. @@ -334,7 +335,8 @@ RPC ports are announced. ### `client_cert`, `client_key` -TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so. +`node` mode only. TLS client certificate and client key to use when communicating with Consul over TLS. +Both are mandatory when doing so. ### `ca_cert` @@ -345,6 +347,29 @@ TLS CA certificate to use when communicating with Consul over TLS. Skip server hostname verification in TLS handshake. `ca_cert` is ignored when this is set. +### `consul_http_token` + +`service` mode only. Uses the provided token for communication with Consul. The policy assigned to this token +should at least have these rules: + +```hcl +// the `service_name` specified above +service "garage" { + policy = "write" +} + +service_prefix "" { + policy = "read" +} + +node_prefix "" { + policy = "read" +} +``` + +### `tags` and `meta` + +Additional list of tags and map of service meta to add during service registration. ## The `[kubernetes_discovery]` section diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index aaf028a8..52d0ea79 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -88,7 +88,6 @@ sqlite = [ "garage_model/sqlite" ] # Automatic registration and discovery via Consul API consul-discovery = [ "garage_rpc/consul-discovery" ] -consul-service-discovery = [ "garage_rpc/consul-service-discovery" ] # Automatic registration and discovery via Kubernetes API kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] # Prometheus exporter (/metrics endpoint). diff --git a/src/garage/main.rs b/src/garage/main.rs index 103c027d..e8aee892 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -95,8 +95,6 @@ async fn main() { "sqlite", #[cfg(feature = "consul-discovery")] "consul-discovery", - #[cfg(feature = "consul-service-discovery")] - "consul-service-discovery", #[cfg(feature = "kubernetes-discovery")] "kubernetes-discovery", #[cfg(feature = "metrics")] diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index f85f789c..cd29893f 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; use garage_util::config::ConsulDiscoveryConfig; +use garage_util::config::ConsulDiscoveryMode; + +const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; #[derive(Deserialize, Clone, Debug)] struct ConsulQueryEntry { @@ -18,6 +21,8 @@ struct ConsulQueryEntry { service_port: u16, #[serde(rename = "NodeMeta")] node_meta: HashMap, + #[serde(rename = "ServiceMeta")] + service_meta: HashMap, } #[derive(Serialize, Clone, Debug)] @@ -29,14 +34,30 @@ struct ConsulPublishEntry { #[serde(rename = "NodeMeta")] node_meta: HashMap, #[serde(rename = "Service")] - service: ConsulPublishService, + service: ConsulPublishCatalogService, +} + +#[derive(Serialize, Clone, Debug)] +struct ConsulPublishCatalogService { + #[serde(rename = "ID")] + service_id: String, + #[serde(rename = "Service")] + service_name: String, + #[serde(rename = "Tags")] + tags: Vec, + #[serde(rename = "Meta")] + service_meta: HashMap, + #[serde(rename = "Address")] + address: IpAddr, + #[serde(rename = "Port")] + port: u16, } #[derive(Serialize, Clone, Debug)] struct ConsulPublishService { #[serde(rename = "ID")] service_id: String, - #[serde(rename = "Service")] + #[serde(rename = "Name")] service_name: String, #[serde(rename = "Tags")] tags: Vec, @@ -44,10 +65,11 @@ struct ConsulPublishService { address: IpAddr, #[serde(rename = "Port")] port: u16, + #[serde(rename = "Meta")] + meta: HashMap, } // ---- - pub struct ConsulDiscovery { config: ConsulDiscoveryConfig, client: reqwest::Client, @@ -55,42 +77,60 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result { - let client = match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); + builder = builder.danger_accept_invalid_certs(config.tls_skip_verify); - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; - - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; - - if config.tls_skip_verify { - reqwest::Client::builder() - .use_rustls_tls() - .danger_accept_invalid_certs(true) - .identity(identity) - .build()? - } else if let Some(ca_cert) = &config.ca_cert { + let client: reqwest::Client = match &config.mode { + ConsulDiscoveryMode::Node => { + if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - - reqwest::Client::builder() - .use_rustls_tls() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .identity(identity) - .build()? - } else { - reqwest::Client::builder() - .use_rustls_tls() - .identity(identity) - .build()? + builder = builder.use_rustls_tls(); + builder = builder + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); } + + match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; + + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; + + builder = builder.use_rustls_tls(); + builder = builder.identity(identity); + } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + } + + builder.build()? + } + ConsulDiscoveryMode::Service => { + if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder + .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + builder = builder.use_rustls_tls(); + } + + if let Some(token) = &config.consul_http_token { + let mut headers = reqwest::header::HeaderMap::new(); + headers.insert( + "x-consul-token", + reqwest::header::HeaderValue::from_str(&token)?, + ); + builder = builder.default_headers(headers); + } + + builder.build()? } - (None, None) => reqwest::Client::new(), - _ => return Err(ConsulError::InvalidTLSConfig), }; Ok(Self { client, config }) @@ -110,11 +150,14 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::().ok(); - let pubkey = ent - .node_meta - .get("pubkey") - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); + let pubkey = match &self.config.mode { + ConsulDiscoveryMode::Node => ent.node_meta.get("pubkey"), + ConsulDiscoveryMode::Service => { + ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) + } + } + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { @@ -138,29 +181,63 @@ impl ConsulDiscovery { rpc_public_addr: SocketAddr, ) -> Result<(), ConsulError> { let node = format!("garage:{}", hex::encode(&node_id[..8])); + let tags = [ + vec!["advertised-by-garage".into(), hostname.into()], + self.config.tags.clone(), + ] + .concat(); - let advertisement = ConsulPublishEntry { - node: node.clone(), - address: rpc_public_addr.ip(), - node_meta: [ - ("pubkey".to_string(), hex::encode(node_id)), - ("hostname".to_string(), hostname.to_string()), - ] - .iter() - .cloned() - .collect(), - service: ConsulPublishService { - service_id: node.clone(), - service_name: self.config.service_name.clone(), - tags: vec!["advertised-by-garage".into(), hostname.into()], - address: rpc_public_addr.ip(), - port: rpc_public_addr.port(), - }, + let meta_prefix: String = match &self.config.mode { + ConsulDiscoveryMode::Node => "".to_string(), + ConsulDiscoveryMode::Service => format!("{}-", META_PREFIX), }; - let url = format!("{}/v1/catalog/register", self.config.consul_http_addr); + let mut meta = HashMap::from([ + (format!("{}pubkey", meta_prefix), hex::encode(node_id)), + (format!("{}hostname", meta_prefix), hostname.to_string()), + ]); - let http = self.client.put(&url).json(&advertisement).send().await?; + if let Some(global_meta) = &self.config.meta { + for (key, value) in global_meta.into_iter() { + meta.insert(key.clone(), value.clone()); + } + } + + let url = format!( + "{}/v1/{}", + self.config.consul_http_addr, + (match &self.config.mode { + ConsulDiscoveryMode::Node => "catalog/register", + ConsulDiscoveryMode::Service => "agent/service/register?replace-existing-checks", + }) + ); + + let req = self.client.put(&url); + let http = (match &self.config.mode { + ConsulDiscoveryMode::Node => req.json(&ConsulPublishEntry { + node: node.clone(), + address: rpc_public_addr.ip(), + node_meta: meta.clone(), + service: ConsulPublishCatalogService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + service_meta: meta.clone(), + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }, + }), + ConsulDiscoveryMode::Service => req.json(&ConsulPublishService { + service_id: node.clone(), + service_name: self.config.service_name.clone(), + tags, + meta, + address: rpc_public_addr.ip(), + port: rpc_public_addr.port(), + }), + }) + .send() + .await?; http.error_for_status()?; Ok(()) @@ -176,4 +253,6 @@ pub enum ConsulError { Reqwest(#[error(source)] reqwest::Error), #[error(display = "Invalid Consul TLS configuration")] InvalidTLSConfig, + #[error(display = "Token error: {}", _0)] + Token(#[error(source)] reqwest::header::InvalidHeaderValue), } diff --git a/src/rpc/consul_services.rs b/src/rpc/consul_services.rs deleted file mode 100644 index aaf3c4a1..00000000 --- a/src/rpc/consul_services.rs +++ /dev/null @@ -1,174 +0,0 @@ -use std::collections::HashMap; -use std::fs::File; -use std::io::Read; -use std::net::{IpAddr, SocketAddr}; - -use err_derive::Error; -use serde::{Deserialize, Serialize}; - -use netapp::NodeID; - -use garage_util::config::ConsulServiceConfig; - -const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; - -#[derive(Deserialize, Clone, Debug)] -struct ConsulQueryEntry { - #[serde(rename = "Address")] - address: String, - #[serde(rename = "ServicePort")] - service_port: u16, - #[serde(rename = "ServiceMeta")] - service_meta: HashMap, -} - -#[derive(Serialize, Clone, Debug)] -struct ConsulPublishService { - #[serde(rename = "ID")] - service_id: String, - #[serde(rename = "Name")] - service_name: String, - #[serde(rename = "Tags")] - tags: Vec, - #[serde(rename = "Address")] - address: IpAddr, - #[serde(rename = "Port")] - port: u16, - #[serde(rename = "Meta")] - meta: HashMap, -} - -// ---- - -pub struct ConsulServiceDiscovery { - config: ConsulServiceConfig, - client: reqwest::Client, -} - -impl ConsulServiceDiscovery { - pub fn new(config: ConsulServiceConfig) -> Result { - let mut builder: reqwest::ClientBuilder = match &config.ca_cert { - Some(client_ca) => { - let mut ca_cert_buf = vec![]; - File::open(client_ca)?.read_to_end(&mut ca_cert_buf)?; - - let req: reqwest::ClientBuilder = reqwest::Client::builder() - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?) - .use_rustls_tls(); - - if config.tls_skip_verify { - req.danger_accept_invalid_certs(true) - } else { - req - } - } - None => reqwest::Client::builder(), - }; - - if let Some(token) = &config.consul_http_token { - let mut headers = reqwest::header::HeaderMap::new(); - headers.insert( - "x-consul-token", - reqwest::header::HeaderValue::from_str(&token)?, - ); - builder = builder.default_headers(headers); - } - - let client = builder.build()?; - - Ok(Self { client, config }) - } - - // ---- READING FROM CONSUL CATALOG ---- - - pub async fn get_consul_services(&self) -> Result, ConsulError> { - let url = format!( - "{}/v1/catalog/service/{}", - self.config.consul_http_addr, self.config.service_name - ); - - let req = self.client.get(&url); - let http = req.send().await?; - let entries: Vec = http.json().await?; - - let mut ret = vec![]; - for ent in entries { - let ip = ent.address.parse::().ok(); - let pubkey = ent - .service_meta - .get(&format!("{}-pubkey", META_PREFIX)) - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); - if let (Some(ip), Some(pubkey)) = (ip, pubkey) { - ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); - } else { - warn!( - "Could not process node spec from Consul: {:?} (invalid IP or public key)", - ent - ); - } - } - debug!("Got nodes from Consul: {:?}", ret); - - Ok(ret) - } - - // ---- PUBLISHING TO CONSUL CATALOG ---- - - pub async fn publish_consul_service( - &self, - node_id: NodeID, - hostname: &str, - rpc_public_addr: SocketAddr, - ) -> Result<(), ConsulError> { - let node = format!("garage:{}", hex::encode(&node_id[..8])); - - let tags = [ - vec!["advertised-by-garage".into(), hostname.into()], - self.config.tags.clone(), - ] - .concat(); - - let mut meta = HashMap::from([ - (format!("{}-pubkey", META_PREFIX), hex::encode(node_id)), - (format!("{}-hostname", META_PREFIX), hostname.to_string()), - ]); - - if let Some(global_meta) = &self.config.meta { - for (key, value) in global_meta.into_iter() { - meta.insert(key.clone(), value.clone()); - } - } - - let advertisement: ConsulPublishService = ConsulPublishService { - service_id: node.clone(), - service_name: self.config.service_name.clone(), - tags, - meta, - address: rpc_public_addr.ip(), - port: rpc_public_addr.port(), - }; - - let url = format!( - "{}/v1/agent/service/register?replace-existing-checks", - self.config.consul_http_addr - ); - - let req = self.client.put(&url); - let http = req.json(&advertisement).send().await?; - http.error_for_status()?; - - Ok(()) - } -} - -/// Regroup all Consul discovery errors -#[derive(Debug, Error)] -pub enum ConsulError { - #[error(display = "IO error: {}", _0)] - Io(#[error(source)] std::io::Error), - #[error(display = "HTTP error: {}", _0)] - Reqwest(#[error(source)] reqwest::Error), - #[error(display = "Invalid HTTP header error: {}", _0)] - HeaderValue(#[error(source)] reqwest::header::InvalidHeaderValue), -} diff --git a/src/rpc/lib.rs b/src/rpc/lib.rs index 2c5ed107..5aec92c0 100644 --- a/src/rpc/lib.rs +++ b/src/rpc/lib.rs @@ -8,8 +8,6 @@ mod system_metrics; #[cfg(feature = "consul-discovery")] mod consul; -#[cfg(feature = "consul-service-discovery")] -mod consul_services; #[cfg(feature = "kubernetes-discovery")] mod kubernetes; diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 287cd666..b42e49fc 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -32,8 +32,6 @@ use garage_util::time::*; #[cfg(feature = "consul-discovery")] use crate::consul::ConsulDiscovery; -#[cfg(feature = "consul-service-discovery")] -use crate::consul_services::ConsulServiceDiscovery; #[cfg(feature = "kubernetes-discovery")] use crate::kubernetes::*; use crate::layout::*; @@ -100,18 +98,12 @@ pub struct System { system_endpoint: Arc>, rpc_listen_addr: SocketAddr, - #[cfg(any( - feature = "consul-discovery", - feature = "consul-service-discovery", - feature = "kubernetes-discovery" - ))] + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr: Option, bootstrap_peers: Vec, #[cfg(feature = "consul-discovery")] consul_discovery: Option, - #[cfg(feature = "consul-service-discovery")] - consul_service_discovery: Option, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: Option, @@ -354,19 +346,6 @@ impl System { warn!("Consul discovery is not enabled in this build."); } - #[cfg(feature = "consul-service-discovery")] - let consul_service_discovery = match &config.consul_service_discovery { - Some(cfg) => Some( - ConsulServiceDiscovery::new(cfg.clone()) - .ok_or_message("Invalid Consul service discovery configuration")?, - ), - None => None, - }; - #[cfg(not(feature = "consul-service-discovery"))] - if config.consul_service_discovery.is_some() { - warn!("Consul service discovery is not enabled in this build."); - } - #[cfg(not(feature = "kubernetes-discovery"))] if config.kubernetes_discovery.is_some() { warn!("Kubernetes discovery is not enabled in this build."); @@ -390,17 +369,11 @@ impl System { replication_mode, replication_factor, rpc_listen_addr: config.rpc_bind_addr, - #[cfg(any( - feature = "consul-discovery", - feature = "consul-service-discovery", - feature = "kubernetes-discovery" - ))] + #[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] rpc_public_addr, bootstrap_peers: config.bootstrap_peers.clone(), #[cfg(feature = "consul-discovery")] consul_discovery, - #[cfg(feature = "consul-service-discovery")] - consul_service_discovery, #[cfg(feature = "kubernetes-discovery")] kubernetes_discovery: config.kubernetes_discovery.clone(), metrics, @@ -582,33 +555,6 @@ impl System { } } - #[cfg(feature = "consul-service-discovery")] - async fn advertise_to_consul(self: Arc) { - let c = match &self.consul_service_discovery { - Some(c) => c, - _ => return, - }; - - let rpc_public_addr = match self.rpc_public_addr { - Some(addr) => addr, - None => { - warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected."); - return; - } - }; - - if let Err(e) = c - .publish_consul_service( - self.netapp.id, - &self.local_status.load_full().hostname, - rpc_public_addr, - ) - .await - { - error!("Error while publishing Consul service: {}", e); - } - } - #[cfg(feature = "kubernetes-discovery")] async fn advertise_to_kubernetes(self: Arc) { let k = match &self.kubernetes_discovery { @@ -798,7 +744,7 @@ impl System { ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) } - // Fetch peer list from Consul Nodes + // Fetch peer list from Consul #[cfg(feature = "consul-discovery")] if let Some(c) = &self.consul_discovery { match c.get_consul_nodes().await { @@ -811,19 +757,6 @@ impl System { } } - // Fetch peer list from Consul Services - #[cfg(feature = "consul-service-discovery")] - if let Some(c) = &self.consul_service_discovery { - match c.get_consul_services().await { - Ok(node_list) => { - ping_list.extend(node_list); - } - Err(e) => { - warn!("Could not retrieve service list from Consul: {}", e); - } - } - } - // Fetch peer list from Kubernetes #[cfg(feature = "kubernetes-discovery")] if let Some(k) = &self.kubernetes_discovery { @@ -863,9 +796,6 @@ impl System { #[cfg(feature = "consul-discovery")] tokio::spawn(self.clone().advertise_to_consul()); - #[cfg(feature = "consul-service-discovery")] - tokio::spawn(self.clone().advertise_to_consul()); - #[cfg(feature = "kubernetes-discovery")] tokio::spawn(self.clone().advertise_to_kubernetes()); diff --git a/src/util/config.rs b/src/util/config.rs index 84a8e34f..632d22ef 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -56,9 +56,6 @@ pub struct Config { /// Configuration for automatic node discovery through Consul #[serde(default)] pub consul_discovery: Option, - /// Configuration for automatic node discovery through Consul - #[serde(default)] - pub consul_service_discovery: Option, /// Configuration for automatic node discovery through Kubernetes #[serde(default)] pub kubernetes_discovery: Option, @@ -138,8 +135,23 @@ pub struct AdminConfig { pub trace_sink: Option, } +#[derive(Deserialize, Debug, Clone)] +pub enum ConsulDiscoveryMode { + #[serde(rename_all = "lowercase")] + Node, + Service, +} +impl ConsulDiscoveryMode { + fn default() -> Self { + ConsulDiscoveryMode::Node + } +} + #[derive(Deserialize, Debug, Clone)] pub struct ConsulDiscoveryConfig { + /// Mode of consul operation: either `node` (the default) or `service` + #[serde(default = "ConsulDiscoveryMode::default")] + pub mode: ConsulDiscoveryMode, /// Consul http or https address to connect to to discover more peers pub consul_http_addr: String, /// Consul service name to use @@ -150,30 +162,17 @@ pub struct ConsulDiscoveryConfig { pub client_cert: Option, /// Client TLS key to use when connecting to Consul pub client_key: Option, + /// /// Token to use for connecting to consul + pub consul_http_token: Option, /// Skip TLS hostname verification #[serde(default)] pub tls_skip_verify: bool, -} - -#[derive(Deserialize, Debug, Clone)] -pub struct ConsulServiceConfig { - /// Consul http or https address to connect to to discover more peers - pub consul_http_addr: String, - /// Token to use for connecting to consul - pub consul_http_token: Option, - /// Consul service name to use - pub service_name: String, - /// CA TLS certificate to use when connecting to Consul - pub ca_cert: Option, - // Additional tags to add to the service + /// Additional tags to add to the service #[serde(default)] pub tags: Vec, - // Additional service metadata to add + /// Additional service metadata to add #[serde(default)] pub meta: Option>, - /// Skip TLS hostname verification - #[serde(default)] - pub tls_skip_verify: bool, } #[derive(Deserialize, Debug, Clone)] From 011f4730483dc7b75720c7b800702ea7c1a925c1 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Wed, 10 May 2023 13:22:14 -0600 Subject: [PATCH 05/10] revert rpc/Cargo.toml --- src/rpc/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index d168e246..f0fde7a7 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -50,5 +50,4 @@ netapp = { version = "0.5.2", features = ["telemetry"] } [features] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] consul-discovery = [ "reqwest", "err-derive" ] -consul-service-discovery = [ "reqwest", "err-derive" ] system-libs = [ "sodiumoxide/use-pkg-config" ] From 6b69404f1a53b927b4ce3cabdbb41f58e832a963 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Wed, 10 May 2023 20:11:14 -0600 Subject: [PATCH 06/10] rename mode to consul_http_api --- doc/book/reference-manual/configuration.md | 20 ++++---- src/rpc/consul.rs | 58 ++++++++++------------ src/util/config.rs | 16 +++--- 3 files changed, 43 insertions(+), 51 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 819a5b88..50921824 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -318,16 +318,16 @@ Garage supports discovering other nodes of the cluster using Consul. For this to work correctly, nodes need to know their IP address by which they can be reached by other nodes of the cluster, which should be set in `rpc_public_addr`. -### `mode` - -Two modes of service discovery are supported: `node` and `service`. `node`, the default will register a service using -the `/v1/catalog` endpoints and mTLS (if `client_cert` and `client_key` are provided). `service` mode uses the -`v1/agent` endpoints instead, where an optional `consul_http_token` may be provided. - ### `consul_http_addr` and `service_name` The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server. +### `consul_http_api` + +Two APIs for service registration are supported: `catalog` and `agent`. `catalog`, the default, will register a service using +the `/v1/catalog` endpoints and mTLS (if `client_cert` and `client_key` are provided). The `agent` API uses the +`v1/agent` endpoints instead, where an optional `consul_http_token` may be provided. + ### `service_name` `service_name` should be set to the service name under which Garage's @@ -335,8 +335,8 @@ RPC ports are announced. ### `client_cert`, `client_key` -`node` mode only. TLS client certificate and client key to use when communicating with Consul over TLS. -Both are mandatory when doing so. +TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so. +Only available when `consul_http_api = "catalog"`. ### `ca_cert` @@ -349,8 +349,8 @@ Skip server hostname verification in TLS handshake. ### `consul_http_token` -`service` mode only. Uses the provided token for communication with Consul. The policy assigned to this token -should at least have these rules: +Uses the provided token for communication with Consul. Only available when `consul_http_api = "agent"`. +The policy assigned to this token should at least have these rules: ```hcl // the `service_name` specified above diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index cd29893f..08fb0418 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -8,8 +8,8 @@ use serde::{Deserialize, Serialize}; use netapp::NodeID; +use garage_util::config::ConsulDiscoveryAPI; use garage_util::config::ConsulDiscoveryConfig; -use garage_util::config::ConsulDiscoveryMode; const META_PREFIX: &str = "fr-deuxfleurs-garagehq"; @@ -78,18 +78,18 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result { let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); - builder = builder.danger_accept_invalid_certs(config.tls_skip_verify); - - let client: reqwest::Client = match &config.mode { - ConsulDiscoveryMode::Node => { - if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder.use_rustls_tls(); - builder = builder - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); - } + if config.tls_skip_verify { + builder = builder.danger_accept_invalid_certs(true); + } else if let Some(ca_cert) = &config.ca_cert { + let mut ca_cert_buf = vec![]; + File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; + builder = builder.use_rustls_tls(); + builder = + builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); + } + let client: reqwest::Client = match &config.consul_http_api { + ConsulDiscoveryAPI::Catalog => { match (&config.client_cert, &config.client_key) { (Some(client_cert), Some(client_key)) => { let mut client_cert_buf = vec![]; @@ -111,15 +111,7 @@ impl ConsulDiscovery { builder.build()? } - ConsulDiscoveryMode::Service => { - if let Some(ca_cert) = &config.ca_cert { - let mut ca_cert_buf = vec![]; - File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder - .add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); - builder = builder.use_rustls_tls(); - } - + ConsulDiscoveryAPI::Agent => { if let Some(token) = &config.consul_http_token { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( @@ -150,9 +142,9 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::().ok(); - let pubkey = match &self.config.mode { - ConsulDiscoveryMode::Node => ent.node_meta.get("pubkey"), - ConsulDiscoveryMode::Service => { + let pubkey = match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => ent.node_meta.get("pubkey"), + ConsulDiscoveryAPI::Agent => { ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) } } @@ -187,9 +179,9 @@ impl ConsulDiscovery { ] .concat(); - let meta_prefix: String = match &self.config.mode { - ConsulDiscoveryMode::Node => "".to_string(), - ConsulDiscoveryMode::Service => format!("{}-", META_PREFIX), + let meta_prefix: String = match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => "".to_string(), + ConsulDiscoveryAPI::Agent => format!("{}-", META_PREFIX), }; let mut meta = HashMap::from([ @@ -206,15 +198,15 @@ impl ConsulDiscovery { let url = format!( "{}/v1/{}", self.config.consul_http_addr, - (match &self.config.mode { - ConsulDiscoveryMode::Node => "catalog/register", - ConsulDiscoveryMode::Service => "agent/service/register?replace-existing-checks", + (match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => "catalog/register", + ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks", }) ); let req = self.client.put(&url); - let http = (match &self.config.mode { - ConsulDiscoveryMode::Node => req.json(&ConsulPublishEntry { + let http = (match &self.config.consul_http_api { + ConsulDiscoveryAPI::Catalog => req.json(&ConsulPublishEntry { node: node.clone(), address: rpc_public_addr.ip(), node_meta: meta.clone(), @@ -227,7 +219,7 @@ impl ConsulDiscovery { port: rpc_public_addr.port(), }, }), - ConsulDiscoveryMode::Service => req.json(&ConsulPublishService { + ConsulDiscoveryAPI::Agent => req.json(&ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, diff --git a/src/util/config.rs b/src/util/config.rs index 632d22ef..8b723e47 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -136,22 +136,22 @@ pub struct AdminConfig { } #[derive(Deserialize, Debug, Clone)] -pub enum ConsulDiscoveryMode { +pub enum ConsulDiscoveryAPI { #[serde(rename_all = "lowercase")] - Node, - Service, + Catalog, + Agent, } -impl ConsulDiscoveryMode { +impl ConsulDiscoveryAPI { fn default() -> Self { - ConsulDiscoveryMode::Node + ConsulDiscoveryAPI::Catalog } } #[derive(Deserialize, Debug, Clone)] pub struct ConsulDiscoveryConfig { - /// Mode of consul operation: either `node` (the default) or `service` - #[serde(default = "ConsulDiscoveryMode::default")] - pub mode: ConsulDiscoveryMode, + /// The consul api to use when registering: either `catalog` (the default) or `agent` + #[serde(default = "ConsulDiscoveryAPI::default")] + pub consul_http_api: ConsulDiscoveryAPI, /// Consul http or https address to connect to to discover more peers pub consul_http_addr: String, /// Consul service name to use From b7705041268e49f2a5ba9a719372048f85c3de83 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 15 May 2023 16:15:56 -0600 Subject: [PATCH 07/10] simplify code according to feedback --- doc/book/reference-manual/configuration.md | 6 +- src/rpc/consul.rs | 104 +++++++++------------ src/util/config.rs | 15 +-- 3 files changed, 50 insertions(+), 75 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 50921824..2fdfce8f 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -35,14 +35,14 @@ bootstrap_peers = [ [consul_discovery] -mode = "node" +api = "catalog" consul_http_addr = "http://127.0.0.1:8500" service_name = "garage-daemon" ca_cert = "/etc/consul/consul-ca.crt" client_cert = "/etc/consul/consul-client.crt" client_key = "/etc/consul/consul-key.crt" -# for `service` mode, unset client_cert and client_key, and optionally enable `consul_http_token` -# consul_http_token = "abcdef-01234-56789" +# for `catalog` API mode, unset client_cert and client_key, and optionally enable `token` +# token = "abcdef-01234-56789" tls_skip_verify = false tags = [ "dns-enabled" ] meta = { dns-acl = "allow trusted" } diff --git a/src/rpc/consul.rs b/src/rpc/consul.rs index 08fb0418..ab8d1112 100644 --- a/src/rpc/consul.rs +++ b/src/rpc/consul.rs @@ -19,10 +19,15 @@ struct ConsulQueryEntry { address: String, #[serde(rename = "ServicePort")] service_port: u16, - #[serde(rename = "NodeMeta")] - node_meta: HashMap, #[serde(rename = "ServiceMeta")] - service_meta: HashMap, + meta: HashMap, +} + +#[derive(Serialize, Clone, Debug)] +#[serde(untagged)] +enum PublishRequest { + Catalog(ConsulPublishEntry), + Service(ConsulPublishService), } #[derive(Serialize, Clone, Debug)] @@ -31,8 +36,6 @@ struct ConsulPublishEntry { node: String, #[serde(rename = "Address")] address: IpAddr, - #[serde(rename = "NodeMeta")] - node_meta: HashMap, #[serde(rename = "Service")] service: ConsulPublishCatalogService, } @@ -46,7 +49,7 @@ struct ConsulPublishCatalogService { #[serde(rename = "Tags")] tags: Vec, #[serde(rename = "Meta")] - service_meta: HashMap, + meta: HashMap, #[serde(rename = "Address")] address: IpAddr, #[serde(rename = "Port")] @@ -77,42 +80,36 @@ pub struct ConsulDiscovery { impl ConsulDiscovery { pub fn new(config: ConsulDiscoveryConfig) -> Result { - let mut builder: reqwest::ClientBuilder = reqwest::Client::builder(); + let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls(); if config.tls_skip_verify { builder = builder.danger_accept_invalid_certs(true); } else if let Some(ca_cert) = &config.ca_cert { let mut ca_cert_buf = vec![]; File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?; - builder = builder.use_rustls_tls(); builder = builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?); } - let client: reqwest::Client = match &config.consul_http_api { - ConsulDiscoveryAPI::Catalog => { - match (&config.client_cert, &config.client_key) { - (Some(client_cert), Some(client_key)) => { - let mut client_cert_buf = vec![]; - File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; + match &config.api { + ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) { + (Some(client_cert), Some(client_key)) => { + let mut client_cert_buf = vec![]; + File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; - let mut client_key_buf = vec![]; - File::open(client_key)?.read_to_end(&mut client_key_buf)?; + let mut client_key_buf = vec![]; + File::open(client_key)?.read_to_end(&mut client_key_buf)?; - let identity = reqwest::Identity::from_pem( - &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], - )?; + let identity = reqwest::Identity::from_pem( + &[&client_cert_buf[..], &client_key_buf[..]].concat()[..], + )?; - builder = builder.use_rustls_tls(); - builder = builder.identity(identity); - } - (None, None) => {} - _ => return Err(ConsulError::InvalidTLSConfig), + builder = builder.identity(identity); } - - builder.build()? - } + (None, None) => {} + _ => return Err(ConsulError::InvalidTLSConfig), + }, ConsulDiscoveryAPI::Agent => { - if let Some(token) = &config.consul_http_token { + if let Some(token) = &config.token { let mut headers = reqwest::header::HeaderMap::new(); headers.insert( "x-consul-token", @@ -120,11 +117,11 @@ impl ConsulDiscovery { ); builder = builder.default_headers(headers); } - - builder.build()? } }; + let client: reqwest::Client = builder.build()?; + Ok(Self { client, config }) } @@ -142,14 +139,11 @@ impl ConsulDiscovery { let mut ret = vec![]; for ent in entries { let ip = ent.address.parse::().ok(); - let pubkey = match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => ent.node_meta.get("pubkey"), - ConsulDiscoveryAPI::Agent => { - ent.service_meta.get(&format!("{}-pubkey", META_PREFIX)) - } - } - .and_then(|k| hex::decode(k).ok()) - .and_then(|k| NodeID::from_slice(&k[..])); + let pubkey = ent + .meta + .get(&format!("{}-pubkey", META_PREFIX)) + .and_then(|k| hex::decode(k).ok()) + .and_then(|k| NodeID::from_slice(&k[..])); if let (Some(ip), Some(pubkey)) = (ip, pubkey) { ret.push((pubkey, SocketAddr::new(ip, ent.service_port))); } else { @@ -179,47 +173,34 @@ impl ConsulDiscovery { ] .concat(); - let meta_prefix: String = match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => "".to_string(), - ConsulDiscoveryAPI::Agent => format!("{}-", META_PREFIX), - }; - - let mut meta = HashMap::from([ - (format!("{}pubkey", meta_prefix), hex::encode(node_id)), - (format!("{}hostname", meta_prefix), hostname.to_string()), - ]); - - if let Some(global_meta) = &self.config.meta { - for (key, value) in global_meta.into_iter() { - meta.insert(key.clone(), value.clone()); - } - } + let mut meta = self.config.meta.clone().unwrap_or_default(); + meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id)); + meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string()); let url = format!( "{}/v1/{}", self.config.consul_http_addr, - (match &self.config.consul_http_api { + (match &self.config.api { ConsulDiscoveryAPI::Catalog => "catalog/register", ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks", }) ); let req = self.client.put(&url); - let http = (match &self.config.consul_http_api { - ConsulDiscoveryAPI::Catalog => req.json(&ConsulPublishEntry { + let advertisement: PublishRequest = match &self.config.api { + ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry { node: node.clone(), address: rpc_public_addr.ip(), - node_meta: meta.clone(), service: ConsulPublishCatalogService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, - service_meta: meta.clone(), + meta: meta.clone(), address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }, }), - ConsulDiscoveryAPI::Agent => req.json(&ConsulPublishService { + ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService { service_id: node.clone(), service_name: self.config.service_name.clone(), tags, @@ -227,9 +208,8 @@ impl ConsulDiscovery { address: rpc_public_addr.ip(), port: rpc_public_addr.port(), }), - }) - .send() - .await?; + }; + let http = req.json(&advertisement).send().await?; http.error_for_status()?; Ok(()) diff --git a/src/util/config.rs b/src/util/config.rs index 8b723e47..647c2659 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -135,23 +135,18 @@ pub struct AdminConfig { pub trace_sink: Option, } -#[derive(Deserialize, Debug, Clone)] +#[derive(Deserialize, Debug, Clone, Default)] +#[serde(rename_all = "lowercase")] pub enum ConsulDiscoveryAPI { - #[serde(rename_all = "lowercase")] + #[default] Catalog, Agent, } -impl ConsulDiscoveryAPI { - fn default() -> Self { - ConsulDiscoveryAPI::Catalog - } -} #[derive(Deserialize, Debug, Clone)] pub struct ConsulDiscoveryConfig { /// The consul api to use when registering: either `catalog` (the default) or `agent` - #[serde(default = "ConsulDiscoveryAPI::default")] - pub consul_http_api: ConsulDiscoveryAPI, + pub api: ConsulDiscoveryAPI, /// Consul http or https address to connect to to discover more peers pub consul_http_addr: String, /// Consul service name to use @@ -163,7 +158,7 @@ pub struct ConsulDiscoveryConfig { /// Client TLS key to use when connecting to Consul pub client_key: Option, /// /// Token to use for connecting to consul - pub consul_http_token: Option, + pub token: Option, /// Skip TLS hostname verification #[serde(default)] pub tls_skip_verify: bool, From 2d46d24d06849584e751ffcf6842b4d3016b6f77 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 15 May 2023 20:02:28 -0600 Subject: [PATCH 08/10] update docs --- doc/book/reference-manual/configuration.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 2fdfce8f..5322b755 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -322,10 +322,10 @@ reached by other nodes of the cluster, which should be set in `rpc_public_addr`. The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server. -### `consul_http_api` +### `api` Two APIs for service registration are supported: `catalog` and `agent`. `catalog`, the default, will register a service using -the `/v1/catalog` endpoints and mTLS (if `client_cert` and `client_key` are provided). The `agent` API uses the +the `/v1/catalog` endpoints, enabling mTLS if `client_cert` and `client_key` are provided. The `agent` API uses the `v1/agent` endpoints instead, where an optional `consul_http_token` may be provided. ### `service_name` @@ -336,7 +336,7 @@ RPC ports are announced. ### `client_cert`, `client_key` TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so. -Only available when `consul_http_api = "catalog"`. +Only available when `api = "catalog"`. ### `ca_cert` @@ -347,9 +347,9 @@ TLS CA certificate to use when communicating with Consul over TLS. Skip server hostname verification in TLS handshake. `ca_cert` is ignored when this is set. -### `consul_http_token` +### `token` -Uses the provided token for communication with Consul. Only available when `consul_http_api = "agent"`. +Uses the provided token for communication with Consul. Only available when `api = "agent"`. The policy assigned to this token should at least have these rules: ```hcl From ef8a7add0865b593836736af64b8577c5375d531 Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Sat, 20 May 2023 21:25:57 -0600 Subject: [PATCH 09/10] set default for [consul-services] api --- src/util/config.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/util/config.rs b/src/util/config.rs index 647c2659..1da95b2f 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -146,6 +146,7 @@ pub enum ConsulDiscoveryAPI { #[derive(Deserialize, Debug, Clone)] pub struct ConsulDiscoveryConfig { /// The consul api to use when registering: either `catalog` (the default) or `agent` + #[serde(default)] pub api: ConsulDiscoveryAPI, /// Consul http or https address to connect to to discover more peers pub consul_http_addr: String, From 32ad4538eec9e844edab7e04e03dee9d594ec8fb Mon Sep 17 00:00:00 2001 From: Roberto Hidalgo Date: Mon, 22 May 2023 08:47:06 -0600 Subject: [PATCH 10/10] fix references to old config names --- doc/book/reference-manual/configuration.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index 5322b755..20a79aa6 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -41,7 +41,7 @@ service_name = "garage-daemon" ca_cert = "/etc/consul/consul-ca.crt" client_cert = "/etc/consul/consul-client.crt" client_key = "/etc/consul/consul-key.crt" -# for `catalog` API mode, unset client_cert and client_key, and optionally enable `token` +# for `agent` API mode, unset client_cert and client_key, and optionally enable `token` # token = "abcdef-01234-56789" tls_skip_verify = false tags = [ "dns-enabled" ] @@ -326,7 +326,7 @@ The `consul_http_addr` parameter should be set to the full HTTP(S) address of th Two APIs for service registration are supported: `catalog` and `agent`. `catalog`, the default, will register a service using the `/v1/catalog` endpoints, enabling mTLS if `client_cert` and `client_key` are provided. The `agent` API uses the -`v1/agent` endpoints instead, where an optional `consul_http_token` may be provided. +`v1/agent` endpoints instead, where an optional `token` may be provided. ### `service_name`