feature: Register consul services with agent API #567

Merged
lx merged 10 commits from unrob/garage:roberto/consul-agent-registration into main 2023-06-02 14:35:00 +00:00
8 changed files with 194 additions and 340 deletions
Showing only changes of commit fd7dbea5b8 - Show all commits

View file

@ -35,24 +35,19 @@ bootstrap_peers = [
[consul_discovery]
mode = "node"
consul_http_addr = "http://127.0.0.1:8500"
service_name = "garage-daemon"
ca_cert = "/etc/consul/consul-ca.crt"
client_cert = "/etc/consul/consul-client.crt"
client_key = "/etc/consul/consul-key.crt"
# for `service` mode, unset client_cert and client_key, and optionally enable `consul_http_token`
# consul_http_token = "abcdef-01234-56789"
tls_skip_verify = false
[consul_service_discovery]
consul_http_addr = "https://127.0.0.1:8501"
consul_http_token = "abcdef-01234-56789"
service_name = "garage"
ca_cert = "/etc/consul/consul-ca.crt"
tls_skip_verify = false
# tags to add to the published service
tags = [ "dns-enabled" ]
# additional service meta to send along registration
meta = { dns-acl = "allow trusted" }
[kubernetes_discovery]
namespace = "garage"
service_name = "garage-daemon"
@ -323,6 +318,12 @@ Garage supports discovering other nodes of the cluster using Consul. For this
to work correctly, nodes need to know their IP address by which they can be
reached by other nodes of the cluster, which should be set in `rpc_public_addr`.
### `mode`
Two modes of service discovery are supported: `node` and `service`. `node`, the default will register a service using
the `/v1/catalog` endpoints and mTLS (if `client_cert` and `client_key` are provided). `service` mode uses the
`v1/agent` endpoints instead, where an optional `consul_http_token` may be provided.
### `consul_http_addr` and `service_name`
The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server.
@ -334,7 +335,8 @@ RPC ports are announced.
### `client_cert`, `client_key`
TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so.
`node` mode only. TLS client certificate and client key to use when communicating with Consul over TLS.
Both are mandatory when doing so.
### `ca_cert`
@ -345,6 +347,29 @@ TLS CA certificate to use when communicating with Consul over TLS.
Skip server hostname verification in TLS handshake.
`ca_cert` is ignored when this is set.
### `consul_http_token`
`service` mode only. Uses the provided token for communication with Consul. The policy assigned to this token
should at least have these rules:
```hcl
// the `service_name` specified above
service "garage" {
policy = "write"
}
service_prefix "" {
policy = "read"
}
node_prefix "" {
policy = "read"
}
```
### `tags` and `meta`
Additional list of tags and map of service meta to add during service registration.
## The `[kubernetes_discovery]` section

View file

@ -88,7 +88,6 @@ sqlite = [ "garage_model/sqlite" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]
consul-service-discovery = [ "garage_rpc/consul-service-discovery" ]
# Automatic registration and discovery via Kubernetes API
kubernetes-discovery = [ "garage_rpc/kubernetes-discovery" ]
# Prometheus exporter (/metrics endpoint).

View file

@ -95,8 +95,6 @@ async fn main() {
"sqlite",
#[cfg(feature = "consul-discovery")]
"consul-discovery",
#[cfg(feature = "consul-service-discovery")]
"consul-service-discovery",
#[cfg(feature = "kubernetes-discovery")]
"kubernetes-discovery",
#[cfg(feature = "metrics")]

View file

@ -9,6 +9,9 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::config::ConsulDiscoveryConfig;
use garage_util::config::ConsulDiscoveryMode;
const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
@ -18,6 +21,8 @@ struct ConsulQueryEntry {
service_port: u16,
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "ServiceMeta")]
service_meta: HashMap<String, String>,
}
#[derive(Serialize, Clone, Debug)]
@ -29,14 +34,30 @@ struct ConsulPublishEntry {
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "Service")]
service: ConsulPublishService,
service: ConsulPublishCatalogService,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishCatalogService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Meta")]
service_meta: HashMap<String, String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
#[serde(rename = "Name")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
@ -44,10 +65,11 @@ struct ConsulPublishService {
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
}
// ----
pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig,
client: reqwest::Client,
@ -55,7 +77,20 @@ pub struct ConsulDiscovery {
impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
let client = match (&config.client_cert, &config.client_key) {
let mut builder: reqwest::ClientBuilder = reqwest::Client::builder();
builder = builder.danger_accept_invalid_certs(config.tls_skip_verify);
let client: reqwest::Client = match &config.mode {
ConsulDiscoveryMode::Node => {
if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
builder = builder.use_rustls_tls();
builder = builder
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
}
match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
@ -67,30 +102,35 @@ impl ConsulDiscovery {
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
if config.tls_skip_verify {
reqwest::Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?
} else if let Some(ca_cert) = &config.ca_cert {
builder = builder.use_rustls_tls();
builder = builder.identity(identity);
}
(None, None) => {}
_ => return Err(ConsulError::InvalidTLSConfig),
}
builder.build()?
}
ConsulDiscoveryMode::Service => {
if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
builder = builder
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
builder = builder.use_rustls_tls();
}
reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?
} else {
reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?
if let Some(token) = &config.consul_http_token {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"x-consul-token",
reqwest::header::HeaderValue::from_str(&token)?,
);
builder = builder.default_headers(headers);
}
builder.build()?
}
(None, None) => reqwest::Client::new(),
_ => return Err(ConsulError::InvalidTLSConfig),
};
Ok(Self { client, config })
@ -110,9 +150,12 @@ impl ConsulDiscovery {
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.node_meta
.get("pubkey")
let pubkey = match &self.config.mode {
ConsulDiscoveryMode::Node => ent.node_meta.get("pubkey"),
ConsulDiscoveryMode::Service => {
ent.service_meta.get(&format!("{}-pubkey", META_PREFIX))
}
}
.and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
@ -138,29 +181,63 @@ impl ConsulDiscovery {
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
let tags = [
vec!["advertised-by-garage".into(), hostname.into()],
self.config.tags.clone(),
]
.concat();
let advertisement = ConsulPublishEntry {
let meta_prefix: String = match &self.config.mode {
ConsulDiscoveryMode::Node => "".to_string(),
ConsulDiscoveryMode::Service => format!("{}-", META_PREFIX),
};
let mut meta = HashMap::from([
(format!("{}pubkey", meta_prefix), hex::encode(node_id)),
(format!("{}hostname", meta_prefix), hostname.to_string()),
]);
if let Some(global_meta) = &self.config.meta {
for (key, value) in global_meta.into_iter() {
meta.insert(key.clone(), value.clone());
}
}
let url = format!(
"{}/v1/{}",
self.config.consul_http_addr,
(match &self.config.mode {
ConsulDiscoveryMode::Node => "catalog/register",
ConsulDiscoveryMode::Service => "agent/service/register?replace-existing-checks",
})
);
let req = self.client.put(&url);
let http = (match &self.config.mode {
ConsulDiscoveryMode::Node => req.json(&ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
node_meta: [
("pubkey".to_string(), hex::encode(node_id)),
("hostname".to_string(), hostname.to_string()),
]
.iter()
.cloned()
.collect(),
service: ConsulPublishService {
node_meta: meta.clone(),
service: ConsulPublishCatalogService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags: vec!["advertised-by-garage".into(), hostname.into()],
tags,
service_meta: meta.clone(),
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
};
let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
let http = self.client.put(&url).json(&advertisement).send().await?;
}),
ConsulDiscoveryMode::Service => req.json(&ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags,
meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
}),
})
.send()
.await?;
http.error_for_status()?;
Ok(())
@ -176,4 +253,6 @@ pub enum ConsulError {
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig,
#[error(display = "Token error: {}", _0)]
Token(#[error(source)] reqwest::header::InvalidHeaderValue),
}

View file

@ -1,174 +0,0 @@
use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::net::{IpAddr, SocketAddr};
use err_derive::Error;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::config::ConsulServiceConfig;
const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "ServicePort")]
service_port: u16,
#[serde(rename = "ServiceMeta")]
service_meta: HashMap<String, String>,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Name")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
}
// ----
pub struct ConsulServiceDiscovery {
config: ConsulServiceConfig,
client: reqwest::Client,
}
impl ConsulServiceDiscovery {
pub fn new(config: ConsulServiceConfig) -> Result<Self, ConsulError> {
let mut builder: reqwest::ClientBuilder = match &config.ca_cert {
Some(client_ca) => {
let mut ca_cert_buf = vec![];
File::open(client_ca)?.read_to_end(&mut ca_cert_buf)?;
let req: reqwest::ClientBuilder = reqwest::Client::builder()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.use_rustls_tls();
if config.tls_skip_verify {
req.danger_accept_invalid_certs(true)
} else {
req
}
}
None => reqwest::Client::builder(),
};
if let Some(token) = &config.consul_http_token {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"x-consul-token",
reqwest::header::HeaderValue::from_str(&token)?,
);
builder = builder.default_headers(headers);
}
let client = builder.build()?;
Ok(Self { client, config })
}
// ---- READING FROM CONSUL CATALOG ----
pub async fn get_consul_services(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
let url = format!(
"{}/v1/catalog/service/{}",
self.config.consul_http_addr, self.config.service_name
);
let req = self.client.get(&url);
let http = req.send().await?;
let entries: Vec<ConsulQueryEntry> = http.json().await?;
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.service_meta
.get(&format!("{}-pubkey", META_PREFIX))
.and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
"Could not process node spec from Consul: {:?} (invalid IP or public key)",
ent
);
}
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
// ---- PUBLISHING TO CONSUL CATALOG ----
pub async fn publish_consul_service(
&self,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
let tags = [
vec!["advertised-by-garage".into(), hostname.into()],
self.config.tags.clone(),
]
.concat();
let mut meta = HashMap::from([
(format!("{}-pubkey", META_PREFIX), hex::encode(node_id)),
(format!("{}-hostname", META_PREFIX), hostname.to_string()),
]);
if let Some(global_meta) = &self.config.meta {
for (key, value) in global_meta.into_iter() {
meta.insert(key.clone(), value.clone());
}
}
let advertisement: ConsulPublishService = ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags,
meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
};
let url = format!(
"{}/v1/agent/service/register?replace-existing-checks",
self.config.consul_http_addr
);
let req = self.client.put(&url);
let http = req.json(&advertisement).send().await?;
http.error_for_status()?;
Ok(())
}
}
/// Regroup all Consul discovery errors
#[derive(Debug, Error)]
pub enum ConsulError {
#[error(display = "IO error: {}", _0)]
Io(#[error(source)] std::io::Error),
#[error(display = "HTTP error: {}", _0)]
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid HTTP header error: {}", _0)]
HeaderValue(#[error(source)] reqwest::header::InvalidHeaderValue),
}

View file

@ -8,8 +8,6 @@ mod system_metrics;
#[cfg(feature = "consul-discovery")]
mod consul;
#[cfg(feature = "consul-service-discovery")]
mod consul_services;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes;

View file

@ -32,8 +32,6 @@ use garage_util::time::*;
#[cfg(feature = "consul-discovery")]
use crate::consul::ConsulDiscovery;
#[cfg(feature = "consul-service-discovery")]
use crate::consul_services::ConsulServiceDiscovery;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*;
use crate::layout::*;
@ -100,18 +98,12 @@ pub struct System {
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
rpc_listen_addr: SocketAddr,
#[cfg(any(
feature = "consul-discovery",
feature = "consul-service-discovery",
feature = "kubernetes-discovery"
))]
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<String>,
#[cfg(feature = "consul-discovery")]
consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "consul-service-discovery")]
consul_service_discovery: Option<ConsulServiceDiscovery>,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
@ -354,19 +346,6 @@ impl System {
warn!("Consul discovery is not enabled in this build.");
}
#[cfg(feature = "consul-service-discovery")]
let consul_service_discovery = match &config.consul_service_discovery {
Some(cfg) => Some(
ConsulServiceDiscovery::new(cfg.clone())
.ok_or_message("Invalid Consul service discovery configuration")?,
),
None => None,
};
#[cfg(not(feature = "consul-service-discovery"))]
if config.consul_service_discovery.is_some() {
warn!("Consul service discovery is not enabled in this build.");
}
#[cfg(not(feature = "kubernetes-discovery"))]
if config.kubernetes_discovery.is_some() {
warn!("Kubernetes discovery is not enabled in this build.");
@ -390,17 +369,11 @@ impl System {
replication_mode,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
#[cfg(any(
feature = "consul-discovery",
feature = "consul-service-discovery",
feature = "kubernetes-discovery"
))]
#[cfg(any(feature = "consul-discovery", feature = "kubernetes-discovery"))]
rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")]
consul_discovery,
#[cfg(feature = "consul-service-discovery")]
consul_service_discovery,
#[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(),
metrics,
@ -582,33 +555,6 @@ impl System {
}
}
#[cfg(feature = "consul-service-discovery")]
async fn advertise_to_consul(self: Arc<Self>) {
let c = match &self.consul_service_discovery {
Some(c) => c,
_ => return,
};
let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
return;
}
};
if let Err(e) = c
.publish_consul_service(
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
{
error!("Error while publishing Consul service: {}", e);
}
}
#[cfg(feature = "kubernetes-discovery")]
async fn advertise_to_kubernetes(self: Arc<Self>) {
let k = match &self.kubernetes_discovery {
@ -798,7 +744,7 @@ impl System {
ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
}
// Fetch peer list from Consul Nodes
// Fetch peer list from Consul
#[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery {
match c.get_consul_nodes().await {
@ -811,19 +757,6 @@ impl System {
}
}
// Fetch peer list from Consul Services
#[cfg(feature = "consul-service-discovery")]
if let Some(c) = &self.consul_service_discovery {
match c.get_consul_services().await {
Ok(node_list) => {
ping_list.extend(node_list);
}
Err(e) => {
warn!("Could not retrieve service list from Consul: {}", e);
}
}
}
// Fetch peer list from Kubernetes
#[cfg(feature = "kubernetes-discovery")]
if let Some(k) = &self.kubernetes_discovery {
@ -863,9 +796,6 @@ impl System {
#[cfg(feature = "consul-discovery")]
tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "consul-service-discovery")]
tokio::spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
tokio::spawn(self.clone().advertise_to_kubernetes());

View file

@ -56,9 +56,6 @@ pub struct Config {
/// Configuration for automatic node discovery through Consul
#[serde(default)]
pub consul_discovery: Option<ConsulDiscoveryConfig>,
/// Configuration for automatic node discovery through Consul
#[serde(default)]
pub consul_service_discovery: Option<ConsulServiceConfig>,
/// Configuration for automatic node discovery through Kubernetes
#[serde(default)]
pub kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
@ -138,8 +135,23 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
#[derive(Deserialize, Debug, Clone)]
pub enum ConsulDiscoveryMode {
#[serde(rename_all = "lowercase")]
Node,
Service,
}
impl ConsulDiscoveryMode {
fn default() -> Self {
ConsulDiscoveryMode::Node
}
}
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig {
/// Mode of consul operation: either `node` (the default) or `service`
#[serde(default = "ConsulDiscoveryMode::default")]
pub mode: ConsulDiscoveryMode,
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Consul service name to use
@ -150,30 +162,17 @@ pub struct ConsulDiscoveryConfig {
pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul
pub client_key: Option<String>,
/// /// Token to use for connecting to consul
pub consul_http_token: Option<String>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
}
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulServiceConfig {
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Token to use for connecting to consul
pub consul_http_token: Option<String>,
/// Consul service name to use
pub service_name: String,
/// CA TLS certificate to use when connecting to Consul
pub ca_cert: Option<String>,
// Additional tags to add to the service
/// Additional tags to add to the service
#[serde(default)]
pub tags: Vec<String>,
// Additional service metadata to add
/// Additional service metadata to add
#[serde(default)]
pub meta: Option<std::collections::HashMap<String, String>>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
}
#[derive(Deserialize, Debug, Clone)]