feature: Register consul services with agent API #567

Merged
lx merged 10 commits from unrob/garage:roberto/consul-agent-registration into main 2023-06-02 14:35:00 +00:00
4 changed files with 167 additions and 61 deletions

View file

@ -25,7 +25,7 @@ git clone https://git.deuxfleurs.fr/Deuxfleurs/garage
cd garage
```
*Optionnaly, you can use our nix.conf file to speed up compilations:*
*Optionally, you can use our nix.conf file to speed up compilations:*
```bash
sudo mkdir -p /etc/nix

View file

@ -35,12 +35,18 @@ bootstrap_peers = [
[consul_discovery]
api = "catalog"
consul_http_addr = "http://127.0.0.1:8500"
service_name = "garage-daemon"
ca_cert = "/etc/consul/consul-ca.crt"
client_cert = "/etc/consul/consul-client.crt"
client_key = "/etc/consul/consul-key.crt"
# for `agent` API mode, unset client_cert and client_key, and optionally enable `token`
# token = "abcdef-01234-56789"
tls_skip_verify = false
tags = [ "dns-enabled" ]
meta = { dns-acl = "allow trusted" }
[kubernetes_discovery]
namespace = "garage"
@ -201,7 +207,7 @@ Garage supports the following replication modes:
that should probably never be used.
Note that in modes `2` and `3`,
if at least the same number of zones are available, an arbitrary number of failures in
if at least the same number of zones are available, an arbitrary number of failures in
any given zone is tolerated as copies of data will be spread over several zones.
**Make sure `replication_mode` is the same in the configuration files of all nodes.
@ -245,7 +251,7 @@ Values between `1` (faster compression) and `19` (smaller file) are standard com
levels for zstd. From `20` to `22`, compression levels are referred as "ultra" and must be
used with extra care as it will use lot of memory. A value of `0` will let zstd choose a
default value (currently `3`). Finally, zstd has also compression designed to be faster
than default compression levels, they range from `-1` (smaller file) to `-99` (faster
than default compression levels, they range from `-1` (smaller file) to `-99` (faster
compression).
If you do not specify a `compression_level` entry, Garage will set it to `1` for you. With
@ -316,6 +322,12 @@ reached by other nodes of the cluster, which should be set in `rpc_public_addr`.
The `consul_http_addr` parameter should be set to the full HTTP(S) address of the Consul server.
### `api`
Two APIs for service registration are supported: `catalog` and `agent`. `catalog`, the default, will register a service using
the `/v1/catalog` endpoints, enabling mTLS if `client_cert` and `client_key` are provided. The `agent` API uses the
`v1/agent` endpoints instead, where an optional `token` may be provided.
### `service_name`
`service_name` should be set to the service name under which Garage's
@ -324,6 +336,7 @@ RPC ports are announced.
### `client_cert`, `client_key`
TLS client certificate and client key to use when communicating with Consul over TLS. Both are mandatory when doing so.
Only available when `api = "catalog"`.
### `ca_cert`
@ -334,6 +347,29 @@ TLS CA certificate to use when communicating with Consul over TLS.
Skip server hostname verification in TLS handshake.
`ca_cert` is ignored when this is set.
### `token`
Uses the provided token for communication with Consul. Only available when `api = "agent"`.
The policy assigned to this token should at least have these rules:
```hcl
// the `service_name` specified above
service "garage" {
policy = "write"
}
service_prefix "" {
policy = "read"
}
node_prefix "" {
policy = "read"
}
```
### `tags` and `meta`
Additional list of tags and map of service meta to add during service registration.
## The `[kubernetes_discovery]` section

View file

@ -8,16 +8,26 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::config::ConsulDiscoveryAPI;
use garage_util::config::ConsulDiscoveryConfig;
const META_PREFIX: &str = "fr-deuxfleurs-garagehq";
#[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry {
#[serde(rename = "Address")]
address: String,
#[serde(rename = "ServicePort")]
service_port: u16,
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "ServiceMeta")]
meta: HashMap<String, String>,
}
#[derive(Serialize, Clone, Debug)]
#[serde(untagged)]
enum PublishRequest {
Catalog(ConsulPublishEntry),
Service(ConsulPublishService),
}
#[derive(Serialize, Clone, Debug)]
@ -26,17 +36,31 @@ struct ConsulPublishEntry {
node: String,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "NodeMeta")]
node_meta: HashMap<String, String>,
#[serde(rename = "Service")]
service: ConsulPublishService,
service: ConsulPublishCatalogService,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishCatalogService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
#[serde(rename = "Address")]
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
}
#[derive(Serialize, Clone, Debug)]
struct ConsulPublishService {
#[serde(rename = "ID")]
service_id: String,
#[serde(rename = "Service")]
#[serde(rename = "Name")]
service_name: String,
#[serde(rename = "Tags")]
tags: Vec<String>,
@ -44,10 +68,11 @@ struct ConsulPublishService {
address: IpAddr,
#[serde(rename = "Port")]
port: u16,
#[serde(rename = "Meta")]
meta: HashMap<String, String>,
}
// ----
pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig,
client: reqwest::Client,
@ -55,44 +80,48 @@ pub struct ConsulDiscovery {
impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
let client = match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
let mut builder: reqwest::ClientBuilder = reqwest::Client::builder().use_rustls_tls();
if config.tls_skip_verify {
builder = builder.danger_accept_invalid_certs(true);
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
builder =
builder.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?);
}
let mut client_key_buf = vec![];
File::open(client_key)?.read_to_end(&mut client_key_buf)?;
match &config.api {
ConsulDiscoveryAPI::Catalog => match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
let identity = reqwest::Identity::from_pem(
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
let mut client_key_buf = vec![];
File::open(client_key)?.read_to_end(&mut client_key_buf)?;
if config.tls_skip_verify {
reqwest::Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
let identity = reqwest::Identity::from_pem(
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?
} else {
reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?
builder = builder.identity(identity);
}
(None, None) => {}
_ => return Err(ConsulError::InvalidTLSConfig),
},
ConsulDiscoveryAPI::Agent => {
if let Some(token) = &config.token {
let mut headers = reqwest::header::HeaderMap::new();
headers.insert(
"x-consul-token",
reqwest::header::HeaderValue::from_str(&token)?,
);
builder = builder.default_headers(headers);
}
}
(None, None) => reqwest::Client::new(),
_ => return Err(ConsulError::InvalidTLSConfig),
};
let client: reqwest::Client = builder.build()?;
Ok(Self { client, config })
}
@ -111,8 +140,8 @@ impl ConsulDiscovery {
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.node_meta
.get("pubkey")
.meta
.get(&format!("{}-pubkey", META_PREFIX))
.and_then(|k| hex::decode(k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
@ -138,29 +167,49 @@ impl ConsulDiscovery {
rpc_public_addr: SocketAddr,
) -> Result<(), ConsulError> {
let node = format!("garage:{}", hex::encode(&node_id[..8]));
let tags = [
vec!["advertised-by-garage".into(), hostname.into()],
self.config.tags.clone(),
]
.concat();
let advertisement = ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
node_meta: [
("pubkey".to_string(), hex::encode(node_id)),
("hostname".to_string(), hostname.to_string()),
]
.iter()
.cloned()
.collect(),
service: ConsulPublishService {
let mut meta = self.config.meta.clone().unwrap_or_default();
meta.insert(format!("{}-pubkey", META_PREFIX), hex::encode(node_id));
meta.insert(format!("{}-hostname", META_PREFIX), hostname.to_string());
let url = format!(
"{}/v1/{}",
self.config.consul_http_addr,
(match &self.config.api {
ConsulDiscoveryAPI::Catalog => "catalog/register",
ConsulDiscoveryAPI::Agent => "agent/service/register?replace-existing-checks",
})
);
let req = self.client.put(&url);
let advertisement: PublishRequest = match &self.config.api {
ConsulDiscoveryAPI::Catalog => PublishRequest::Catalog(ConsulPublishEntry {
node: node.clone(),
address: rpc_public_addr.ip(),
service: ConsulPublishCatalogService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags,
meta: meta.clone(),
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
}),
ConsulDiscoveryAPI::Agent => PublishRequest::Service(ConsulPublishService {
service_id: node.clone(),
service_name: self.config.service_name.clone(),
tags: vec!["advertised-by-garage".into(), hostname.into()],
tags,
meta,
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
}),
};
let url = format!("{}/v1/catalog/register", self.config.consul_http_addr);
let http = self.client.put(&url).json(&advertisement).send().await?;
let http = req.json(&advertisement).send().await?;
http.error_for_status()?;
Ok(())
@ -176,4 +225,6 @@ pub enum ConsulError {
Reqwest(#[error(source)] reqwest::Error),
#[error(display = "Invalid Consul TLS configuration")]
InvalidTLSConfig,
#[error(display = "Token error: {}", _0)]
Token(#[error(source)] reqwest::header::InvalidHeaderValue),
}

View file

@ -135,8 +135,19 @@ pub struct AdminConfig {
pub trace_sink: Option<String>,
}
#[derive(Deserialize, Debug, Clone, Default)]
#[serde(rename_all = "lowercase")]
pub enum ConsulDiscoveryAPI {
#[default]
Catalog,
Agent,
}
#[derive(Deserialize, Debug, Clone)]
pub struct ConsulDiscoveryConfig {
/// The consul api to use when registering: either `catalog` (the default) or `agent`
#[serde(default)]
pub api: ConsulDiscoveryAPI,
/// Consul http or https address to connect to to discover more peers
pub consul_http_addr: String,
/// Consul service name to use
@ -147,9 +158,17 @@ pub struct ConsulDiscoveryConfig {
pub client_cert: Option<String>,
/// Client TLS key to use when connecting to Consul
pub client_key: Option<String>,
/// /// Token to use for connecting to consul
pub token: Option<String>,
/// Skip TLS hostname verification
#[serde(default)]
pub tls_skip_verify: bool,
/// Additional tags to add to the service
#[serde(default)]
pub tags: Vec<String>,
/// Additional service metadata to add
#[serde(default)]
pub meta: Option<std::collections::HashMap<String, String>>,
}
#[derive(Deserialize, Debug, Clone)]
@ -344,7 +363,7 @@ mod tests {
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_secret_file = "{}"
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
@ -388,7 +407,7 @@ mod tests {
rpc_bind_addr = "[::]:3901"
rpc_secret= "dummy"
rpc_secret_file = "dummy"
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"