Compare commits

..

No commits in common. "8a945ee9963021594378095c4b949bf15e816439" and "bd6485565e78c0bbb9ee830c4e5b114c6248dc97" have entirely different histories.

10 changed files with 341 additions and 162 deletions

View file

@ -35,18 +35,23 @@ bootstrap_peers = [
[consul_discovery] [consul_discovery]
api = "catalog"
consul_http_addr = "http://127.0.0.1:8500" consul_http_addr = "http://127.0.0.1:8500"
service_name = "garage-daemon" service_name = "garage-daemon"
ca_cert = "/etc/consul/consul-ca.crt" ca_cert = "/etc/consul/consul-ca.crt"
client_cert = "/etc/consul/consul-client.crt" client_cert = "/etc/consul/consul-client.crt"
client_key = "/etc/consul/consul-key.crt" client_key = "/etc/consul/consul-key.crt"
# for `agent` API mode, unset client_cert and client_key, and optionally enable `token`
# token = "abcdef-01234-56789"
tls_skip_verify = false tls_skip_verify = false
tags = [ "dns-enabled" ]
meta = { dns-acl = "allow trusted" }
[consul_service_discovery]
consul_http_addr = "https://127.0.0.1:8501"
consul_http_token = "abcdef-01234-56789"
service_name = "garage"
ca_cert = "/etc/consul/consul-ca.crt"
tls_skip_verify = false
# tags to add to the published service
tags = [ "dns-enabled" ]
# additional service meta to send along registration
meta = { dns-acl = "allow trusted" }
[kubernetes_discovery] [kubernetes_discovery]
namespace = "garage" namespace = "garage"
@ -322,12 +327,6 @@ reached by other nodes of the cluster, which should be set in `rpc_public_addr`.
The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server. The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server.
### `api`
Two APIs for service registration are supported: `catalog` and `agent`. `catalog`, the default, will register a service using
the `/v1/catalog` endpoints, enabling mTLS if `client_cert` and `client_key` are provided. The `agent` API uses the
`v1/agent` endpoints instead, where an optional `token` may be provided.
### `service_name` ### `service_name`
`service_name` should be set to the service name under which Garage's `service_name` should be set to the service name under which Garage's
@ -336,7 +335,6 @@ RPC ports are announced.
### `client_cert`, `client_key` ### `client_cert`, `client_key`
TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so. TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so.
Only available when `api = "catalog"`.
### `ca_cert` ### `ca_cert`
@ -347,29 +345,6 @@ TLS CA certificate to use when communicating with Consul over TLS.
Skip server hostname verification in TLS handshake. Skip server hostname verification in TLS handshake.
`ca_cert` is ignored when this is set. `ca_cert` is ignored when this is set.
### `token`
Uses the provided token for communication with Consul. Only available when `api = "agent"`.
The policy assigned to this token should at least have these rules:
```hcl
// the `service_name` specified above
service "garage" {
policy = "write"
}
service_prefix "" {
policy = "read"
}
node_prefix "" {
policy = "read"
}
```
### `tags` and `meta`
Additional list of tags and map of service meta to add during service registration.
## The `[kubernetes_discovery]` section ## The `[kubernetes_discovery]` section

View file

@ -185,7 +185,7 @@ fn parse_query_authorization(
if duration > 7 * 24 * 3600 { if duration > 7 * 24 * 3600 {
return Err(Error::bad_request( return Err(Error::bad_request(
"X-Amz-Expires may not exceed a week".to_string(), "X-Amz-Exprires may not exceed a week".to_string(),
)); ));
} }
@ -211,7 +211,7 @@ fn parse_query_authorization(
fn parse_credential(cred: &str) -> Result<(String, String), Error> { fn parse_credential(cred: &str) -> Result<(String, String), Error> {
let first_slash = cred let first_slash = cred
.find('/') .find('/')
.ok_or_bad_request("Credentials does not contain '/' in authorization field")?; .ok_or_bad_request("Credentials does not contain / in authorization field")?;
let (key_id, scope) = cred.split_at(first_slash); let (key_id, scope) = cred.split_at(first_slash);
Ok(( Ok((
key_id.to_string(), key_id.to_string(),

View file

@ -88,6 +88,7 @@ sqlite = [ "garage_model/sqlite" ]
# Automatic registration and discovery via Consul API # Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ] consul-discovery = [ "garage_rpc/consul-discovery" ]
consul-service-discovery = [ "garage_rpc/consul-service-discovery" ]
# Automatic registration and discovery via Kubernetes API # Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ] kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint). # Prometheus exporter (/metrics endpoint).

View file

@ -95,6 +95,8 @@ async fn main() {
"sqlite", "sqlite",
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
"consul-discovery", "consul-discovery",
#[cfg(feature = "consul-service-discovery")]
"consul-service-discovery",
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
"kubernetes-discovery", "kubernetes-discovery",
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]

View file

@ -50,4 +50,5 @@ netapp = { version = "0.5.2", features = ["telemetry"] }
[features] [features]
kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ] kubernetes-discovery = [ "kube", "k8s-openapi", "schemars" ]
consul-discovery = [ "reqwest", "err-derive" ] consul-discovery = [ "reqwest", "err-derive" ]
consul-service-discovery = [ "reqwest", "err-derive" ]
system-libs = [ "sodiumoxide/use-pkg-config" ] system-libs = [ "sodiumoxide/use-pkg-config" ]

View file

@ -8,26 +8,16 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID; use netapp::NodeID;
use garage_util::config::ConsulDiscoveryAPI;
use garage_util::config::ConsulDiscoveryConfig; use garage_util::config::ConsulDiscoveryConfig;
const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
#[derive(Deserialize, Clone, Debug)] #[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry { struct ConsulQueryEntry {
#[serde(rename = "Address")] #[serde(rename = "Address")]
address: String, address: String,
#[serde(rename = "ServicePort")] #[serde(rename = "ServicePort")]
service_port: u16, service_port: u16,
#[serde(rename = "ServiceMeta")] #[serde(rename = "NodeMeta")]
meta: HashMap<String, String>, node_meta: HashMap<String, String>,
}
#[derive(Serialize, Clone, Debug)]
#[serde(untagged)]
enum PublishRequest {
Catalog(ConsulPublishEntry),
Service(ConsulPublishService),
} }
#[derive(Serialize, Clone, Debug)] #[derive(Serialize, Clone, Debug)]
@ -36,31 +26,17 @@ struct ConsulPublishEntry {
node: String, node: String,
#[serde(rename = "Address")] #[serde(rename = "Address")]
address: IpAddr, address: IpAddr,
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "Service")] #[serde(rename = "Service")]
service: ConsulPublishCatalogService, service: ConsulPublishService,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishCatalogService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
} }
#[derive(Serialize, Clone, Debug)] #[derive(Serialize, Clone, Debug)]
struct ConsulPublishService { struct ConsulPublishService {
#[serde(rename = "ID")] #[serde(rename = "ID")]
service_id: String, service_id: String,
#[serde(rename = "Name")] #[serde(rename = "Service")]
service_name: String, service_name: String,
#[serde(rename = "Tags")] #[serde(rename = "Tags")]
tags: Vec<String>, tags: Vec<String>,
@ -68,11 +44,10 @@ struct ConsulPublishService {
address: IpAddr, address: IpAddr,
#[serde(rename = "Port")] #[serde(rename = "Port")]
port: u16, port: u16,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
} }
// ---- // ----
pub struct ConsulDiscovery { pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig, config: ConsulDiscoveryConfig,
client: reqwest::Client, client: reqwest::Client,
@ -80,18 +55,7 @@ pub struct ConsulDiscovery {
impl ConsulDiscovery { impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> { pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls(); let client = match (&config.client_cert, &config.client_key) {
if config.tls_skip_verify {
builder = builder.danger_accept_invalid_certs(true);
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
builder =
builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
}
match &config.api {
ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => { (Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![]; let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?; File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
@ -103,24 +67,31 @@ impl ConsulDiscovery {
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..], &[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?; )?;
builder = builder.identity(identity); if config.tls_skip_verify {
} reqwest::Client::builder()
(None, None) => {} .use_rustls_tls()
_ => return Err(ConsulError::InvalidTLSConfig), .danger_accept_invalid_certs(true)
}, .identity(identity)
ConsulDiscoveryAPI::Agent => { .build()?
if let Some(token) = &config.token { } else if let Some(ca_cert) = &config.ca_cert {
let mut headers = reqwest::header::HeaderMap::new(); let mut ca_cert_buf = vec![];
headers.insert( File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
"x-consul-token",
reqwest::header::HeaderValue::from_str(&token)?,
);
builder = builder.default_headers(headers);
}
}
};
let client: reqwest::Client = builder.build()?; reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?
} else {
reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?
}
}
(None, None) => reqwest::Client::new(),
_ => return Err(ConsulError::InvalidTLSConfig),
};
Ok(Self { client, config }) Ok(Self { client, config })
} }
@ -140,8 +111,8 @@ impl ConsulDiscovery {
for ent in entries { for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok(); let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent let pubkey = ent
.meta .node_meta
.get(&format!("{}-pubkey", META_PREFIX)) .get("pubkey")
.and_then(|k| hex::decode(k).ok()) .and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..])); .and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) { if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
@ -167,49 +138,29 @@ impl ConsulDiscovery {
rpc_public_addr: SocketAddr, rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> { ) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8])); let node = format!("garage:{}", hex::encode(&node_id[..8]));
let tags = [
vec!["advertised-by-garage".into(), hostname.into()],
self.config.tags.clone(),
]
.concat();
let mut meta = self.config.meta.clone().unwrap_or_default(); let advertisement = ConsulPublishEntry {
meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id));
meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string());
let url = format!(
"{}/v1/{}",
self.config.consul_http_addr,
(match &self.config.api {
ConsulDiscoveryAPI::Catalog => "catalog/register",
ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks",
})
);
let req = self.client.put(&url);
let advertisement: PublishRequest = match &self.config.api {
ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry {
node: node.clone(), node: node.clone(),
address: rpc_public_addr.ip(), address: rpc_public_addr.ip(),
service: ConsulPublishCatalogService { node_meta: [
("pubkey".to_string(), hex::encode(node_id)),
("hostname".to_string(), hostname.to_string()),
]
.iter()
.cloned()
.collect(),
service: ConsulPublishService {
service_id: node.clone(), service_id: node.clone(),
service_name: self.config.service_name.clone(), service_name: self.config.service_name.clone(),
tags, tags: vec!["advertised-by-garage".into(), hostname.into()],
meta: meta.clone(),
address: rpc_public_addr.ip(), address: rpc_public_addr.ip(),
port: rpc_public_addr.port(), port: rpc_public_addr.port(),
}, },
}),
ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags,
meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
}),
}; };
let http = req.json(&advertisement).send().await?;
let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
let http = self.client.put(&url).json(&advertisement).send().await?;
http.error_for_status()?; http.error_for_status()?;
Ok(()) Ok(())
@ -225,6 +176,4 @@ pub enum ConsulError {
Reqwest(#[error(source)] reqwest::Error), Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")] #[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig, InvalidTLSConfig,
#[error(display = "Token error: {}", _0)]
Token(#[error(source)] reqwest::header::InvalidHeaderValue),
} }

174
src/rpc/consul_services.rs Normal file
View file

@ -0,0 +1,174 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use err_derive::Error;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::config::ConsulServiceConfig;
const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "ServicePort")]
service_port: u16,
#[serde(rename = "ServiceMeta")]
service_meta: HashMap<String, String>,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Name")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
}
// ----
pub struct ConsulServiceDiscovery {
config: ConsulServiceConfig,
client: reqwest::Client,
}
impl ConsulServiceDiscovery {
pub fn new(config: ConsulServiceConfig) -> Result<Self, ConsulError> {
let mut builder: reqwest::ClientBuilder = match &config.ca_cert {
Some(client_ca) => {
let mut ca_cert_buf = vec![];
File::open(client_ca)?.read_to_end(&mut ca_cert_buf)?;
let req: reqwest::ClientBuilder = reqwest::Client::builder()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.use_rustls_tls();
if config.tls_skip_verify {
req.danger_accept_invalid_certs(true)
} else {
req
}
}
None => reqwest::Client::builder(),
};
if let Some(token) = &config.consul_http_token {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"x-consul-token",
reqwest::header::HeaderValue::from_str(&token)?,
);
builder = builder.default_headers(headers);
}
let client = builder.build()?;
Ok(Self { client, config })
}
// ---- READING FROM CONSUL CATALOG ----
pub async fn get_consul_services(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
let url = format!(
"{}/v1/catalog/service/{}",
self.config.consul_http_addr, self.config.service_name
);
let req = self.client.get(&url);
let http = req.send().await?;
let entries: Vec<ConsulQueryEntry> = http.json().await?;
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.service_meta
.get(&format!("{}-pubkey", META_PREFIX))
.and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
"Could not process node spec from Consul: {:?} (invalid IP or public key)",
ent
);
}
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
// ---- PUBLISHING TO CONSUL CATALOG ----
pub async fn publish_consul_service(
&self,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
let tags = [
vec!["advertised-by-garage".into(), hostname.into()],
self.config.tags.clone(),
]
.concat();
let mut meta = HashMap::from([
(format!("{}-pubkey", META_PREFIX), hex::encode(node_id)),
(format!("{}-hostname", META_PREFIX), hostname.to_string()),
]);
if let Some(global_meta) = &self.config.meta {
for (key, value) in global_meta.into_iter() {
meta.insert(key.clone(), value.clone());
}
}
let advertisement: ConsulPublishService = ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags,
meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
};
let url = format!(
"{}/v1/agent/service/register?replace-existing-checks",
self.config.consul_http_addr
);
let req = self.client.put(&url);
let http = req.json(&advertisement).send().await?;
http.error_for_status()?;
Ok(())
}
}
/// Regroup all Consul discovery errors
#[derive(Debug, Error)]
pub enum ConsulError {
#[error(display = "IO error: {}", _0)]
Io(#[error(source)] std::io::Error),
#[error(display = "HTTP error: {}", _0)]
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid HTTP header error: {}", _0)]
HeaderValue(#[error(source)] reqwest::header::InvalidHeaderValue),
}

View file

@ -8,6 +8,8 @@ mod system_metrics;
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
mod consul; mod consul;
#[cfg(feature = "consul-service-discovery")]
mod consul_services;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
mod kubernetes; mod kubernetes;

View file

@ -32,6 +32,8 @@ use garage_util::time::*;
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
use crate::consul::ConsulDiscovery; use crate::consul::ConsulDiscovery;
#[cfg(feature = "consul-service-discovery")]
use crate::consul_services::ConsulServiceDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::*; use crate::layout::*;
@ -98,12 +100,18 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr, rpc_listen_addr: SocketAddr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] #[cfg(any(
feature = "consul-discovery",
feature = "consul-service-discovery",
feature = "kubernetes-discovery"
))]
rpc_public_addr: Option<SocketAddr>, rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>, bootstrap_peers: Vec<String>,
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
consul_discovery: Option<ConsulDiscovery>, consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "consul-service-discovery")]
consul_service_discovery: Option<ConsulServiceDiscovery>,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>, kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
@ -346,6 +354,19 @@ impl System {
warn!("Consul discovery is not enabled in this build."); warn!("Consul discovery is not enabled in this build.");
} }
#[cfg(feature = "consul-service-discovery")]
let consul_service_discovery = match &config.consul_service_discovery {
Some(cfg) => Some(
ConsulServiceDiscovery::new(cfg.clone())
.ok_or_message("Invalid Consul service discovery configuration")?,
),
None => None,
};
#[cfg(not(feature = "consul-service-discovery"))]
if config.consul_service_discovery.is_some() {
warn!("Consul service discovery is not enabled in this build.");
}
#[cfg(not(feature = "kubernetes-discovery"))] #[cfg(not(feature = "kubernetes-discovery"))]
if config.kubernetes_discovery.is_some() { if config.kubernetes_discovery.is_some() {
warn!("Kubernetes discovery is not enabled in this build."); warn!("Kubernetes discovery is not enabled in this build.");
@ -369,11 +390,17 @@ impl System {
replication_mode, replication_mode,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))] #[cfg(any(
feature = "consul-discovery",
feature = "consul-service-discovery",
feature = "kubernetes-discovery"
))]
rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
consul_discovery, consul_discovery,
#[cfg(feature = "consul-service-discovery")]
consul_service_discovery,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(), kubernetes_discovery: config.kubernetes_discovery.clone(),
metrics, metrics,
@ -555,6 +582,33 @@ impl System {
} }
} }
#[cfg(feature = "consul-service-discovery")]
async fn advertise_to_consul(self: Arc<Self>) {
let c = match &self.consul_service_discovery {
Some(c) => c,
_ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
return;
}
};
if let Err(e) = c
.publish_consul_service(
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
{
error!("Error while publishing Consul service: {}", e);
}
}
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
async fn advertise_to_kubernetes(self: Arc<Self>) { async fn advertise_to_kubernetes(self: Arc<Self>) {
let k = match &self.kubernetes_discovery { let k = match &self.kubernetes_discovery {
@ -744,7 +798,7 @@ impl System {
ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr))) ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
} }
// Fetch peer list from Consul // Fetch peer list from Consul Nodes
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery { if let Some(c) = &self.consul_discovery {
match c.get_consul_nodes().await { match c.get_consul_nodes().await {
@ -757,6 +811,19 @@ impl System {
} }
} }
// Fetch peer list from Consul Services
#[cfg(feature = "consul-service-discovery")]
if let Some(c) = &self.consul_service_discovery {
match c.get_consul_services().await {
Ok(node_list) => {
ping_list.extend(node_list);
}
Err(e) => {
warn!("Could not retrieve service list from Consul: {}", e);
}
}
}
// Fetch peer list from Kubernetes // Fetch peer list from Kubernetes
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
if let Some(k) = &self.kubernetes_discovery { if let Some(k) = &self.kubernetes_discovery {
@ -796,6 +863,9 @@ impl System {
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
tokio::spawn(self.clone().advertise_to_consul()); tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "consul-service-discovery")]
tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
tokio::spawn(self.clone().advertise_to_kubernetes()); tokio::spawn(self.clone().advertise_to_kubernetes());

View file

@ -56,6 +56,9 @@ pub struct Config {
/// Configuration for automatic node discovery through Consul /// Configuration for automatic node discovery through Consul
#[serde(default)] #[serde(default)]
pub consul_discovery: Option<ConsulDiscoveryConfig>, pub consul_discovery: Option<ConsulDiscoveryConfig>,
/// Configuration for automatic node discovery through Consul
#[serde(default)]
pub consul_service_discovery: Option<ConsulServiceConfig>,
/// Configuration for automatic node discovery through Kubernetes /// Configuration for automatic node discovery through Kubernetes
#[serde(default)] #[serde(default)]
pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>, pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
@ -135,19 +138,8 @@ pub struct AdminConfig {
pub trace_sink: Option<String>, pub trace_sink: Option<String>,
} }
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConsulDiscoveryAPI {
#[default]
Catalog,
Agent,
}
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig { pub struct ConsulDiscoveryConfig {
/// The consul api to use when registering: either `catalog` (the default) or `agent`
#[serde(default)]
pub api: ConsulDiscoveryAPI,
/// Consul http or https address to connect to to discover more peers /// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String, pub consul_http_addr: String,
/// Consul service name to use /// Consul service name to use
@ -158,17 +150,30 @@ pub struct ConsulDiscoveryConfig {
pub client_cert: Option<String>, pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul /// Client TLS key to use when connecting to Consul
pub client_key: Option<String>, pub client_key: Option<String>,
/// /// Token to use for connecting to consul
pub token: Option<String>,
/// Skip TLS hostname verification /// Skip TLS hostname verification
#[serde(default)] #[serde(default)]
pub tls_skip_verify: bool, pub tls_skip_verify: bool,
/// Additional tags to add to the service }
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulServiceConfig {
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Token to use for connecting to consul
pub consul_http_token: Option<String>,
/// Consul service name to use
pub service_name: String,
/// CA TLS certificate to use when connecting to Consul
pub ca_cert: Option<String>,
// Additional tags to add to the service
#[serde(default)] #[serde(default)]
pub tags: Vec<String>, pub tags: Vec<String>,
/// Additional service metadata to add // Additional service metadata to add
#[serde(default)] #[serde(default)]
pub meta: Option<std::collections::HashMap<String, String>>, pub meta: Option<std::collections::HashMap<String, String>>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
} }
#[derive(Deserialize, Debug, Clone)] #[derive(Deserialize, Debug, Clone)]