Load TLS certificates only once

This commit is contained in:
Alex 2022-10-18 19:11:16 +02:00
parent 002b9fc50c
commit 5d8d393054
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
2 changed files with 135 additions and 134 deletions

View file

@ -1,9 +1,8 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File;
use std::io::Read;
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use tokio::fs::File;
use tokio::io::AsyncReadExt;
use err_derive::Error; use err_derive::Error;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -11,59 +10,6 @@ use netapp::NodeID;
use garage_util::config::ConsulDiscoveryConfig; use garage_util::config::ConsulDiscoveryConfig;
async fn make_consul_client(
config: &ConsulDiscoveryConfig,
) -> Result<reqwest::Client, ConsulError> {
match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)
.await?
.read_to_end(&mut client_cert_buf)
.await?;
let mut client_key_buf = vec![];
File::open(client_key)
.await?
.read_to_end(&mut client_key_buf)
.await?;
let identity = reqwest::Identity::from_pem(
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
if config.tls_skip_verify {
Ok(reqwest::Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?)
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)
.await?
.read_to_end(&mut ca_cert_buf)
.await?;
Ok(reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?)
} else {
Ok(reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?)
}
}
(None, None) => Ok(reqwest::Client::new()),
_ => Err(ConsulError::InvalidTLSConfig),
}
}
// ---- READING FROM CONSUL CATALOG ----
#[derive(Deserialize, Clone, Debug)] #[derive(Deserialize, Clone, Debug)]
struct ConsulQueryEntry { struct ConsulQueryEntry {
#[serde(rename = "Address")] #[serde(rename = "Address")]
@ -74,42 +20,6 @@ struct ConsulQueryEntry {
node_meta: HashMap<String, String>, node_meta: HashMap<String, String>,
} }
pub async fn get_consul_nodes(
consul_config: &ConsulDiscoveryConfig,
) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
let url = format!(
"http://{}/v1/catalog/service/{}",
consul_config.consul_host, consul_config.service_name
);
let client = make_consul_client(consul_config).await?;
let http = client.get(&url).send().await?;
let entries: Vec<ConsulQueryEntry> = http.json().await?;
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.node_meta
.get("pubkey")
.and_then(|k| hex::decode(&k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
"Could not process node spec from Consul: {:?} (invalid IP or public key)",
ent
);
}
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
// ---- PUBLISHING TO CONSUL CATALOG ----
#[derive(Serialize, Clone, Debug)] #[derive(Serialize, Clone, Debug)]
struct ConsulPublishEntry { struct ConsulPublishEntry {
#[serde(rename = "Node")] #[serde(rename = "Node")]
@ -136,8 +46,93 @@ struct ConsulPublishService {
port: u16, port: u16,
} }
// ----
pub struct ConsulDiscovery {
config: ConsulDiscoveryConfig,
client: reqwest::Client,
}
impl ConsulDiscovery {
pub fn new(config: ConsulDiscoveryConfig) -> Result<Self, ConsulError> {
let client = match (&config.client_cert, &config.client_key) {
(Some(client_cert), Some(client_key)) => {
let mut client_cert_buf = vec![];
File::open(client_cert)?.read_to_end(&mut client_cert_buf)?;
let mut client_key_buf = vec![];
File::open(client_key)?.read_to_end(&mut client_key_buf)?;
let identity = reqwest::Identity::from_pem(
&[&client_cert_buf[..], &client_key_buf[..]].concat()[..],
)?;
if config.tls_skip_verify {
reqwest::Client::builder()
.use_rustls_tls()
.danger_accept_invalid_certs(true)
.identity(identity)
.build()?
} else if let Some(ca_cert) = &config.ca_cert {
let mut ca_cert_buf = vec![];
File::open(ca_cert)?.read_to_end(&mut ca_cert_buf)?;
reqwest::Client::builder()
.use_rustls_tls()
.add_root_certificate(reqwest::Certificate::from_pem(&ca_cert_buf[..])?)
.identity(identity)
.build()?
} else {
reqwest::Client::builder()
.use_rustls_tls()
.identity(identity)
.build()?
}
}
(None, None) => reqwest::Client::new(),
_ => return Err(ConsulError::InvalidTLSConfig),
};
Ok(Self { client, config })
}
// ---- READING FROM CONSUL CATALOG ----
pub async fn get_consul_nodes(&self) -> Result<Vec<(NodeID, SocketAddr)>, ConsulError> {
let url = format!(
"http://{}/v1/catalog/service/{}",
self.config.consul_host, self.config.service_name
);
let http = self.client.get(&url).send().await?;
let entries: Vec<ConsulQueryEntry> = http.json().await?;
let mut ret = vec![];
for ent in entries {
let ip = ent.address.parse::<IpAddr>().ok();
let pubkey = ent
.node_meta
.get("pubkey")
.and_then(|k| hex::decode(&k).ok())
.and_then(|k| NodeID::from_slice(&k[..]));
if let (Some(ip), Some(pubkey)) = (ip, pubkey) {
ret.push((pubkey, SocketAddr::new(ip, ent.service_port)));
} else {
warn!(
"Could not process node spec from Consul: {:?} (invalid IP or public key)",
ent
);
}
}
debug!("Got nodes from Consul: {:?}", ret);
Ok(ret)
}
// ---- PUBLISHING TO CONSUL CATALOG ----
pub async fn publish_consul_service( pub async fn publish_consul_service(
consul_config: &ConsulDiscoveryConfig, &self,
node_id: NodeID, node_id: NodeID,
hostname: &str, hostname: &str,
rpc_public_addr: SocketAddr, rpc_public_addr: SocketAddr,
@ -156,23 +151,23 @@ pub async fn publish_consul_service(
.collect(), .collect(),
service: ConsulPublishService { service: ConsulPublishService {
service_id: node.clone(), service_id: node.clone(),
service_name: consul_config.service_name.clone(), service_name: self.config.service_name.clone(),
tags: vec!["advertised-by-garage".into(), hostname.into()], tags: vec!["advertised-by-garage".into(), hostname.into()],
address: rpc_public_addr.ip(), address: rpc_public_addr.ip(),
port: rpc_public_addr.port(), port: rpc_public_addr.port(),
}, },
}; };
let url = format!("http://{}/v1/catalog/register", consul_config.consul_host); let url = format!("http://{}/v1/catalog/register", self.config.consul_host);
let client = make_consul_client(consul_config).await?; let http = self.client.put(&url).json(&advertisement).send().await?;
let http = client.put(&url).json(&advertisement).send().await?;
http.error_for_status()?; http.error_for_status()?;
Ok(()) Ok(())
} }
}
/// Regroup all Garage errors /// Regroup all Consul discovery errors
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum ConsulError { pub enum ConsulError {
#[error(display = "IO error: {}", _0)] #[error(display = "IO error: {}", _0)]

View file

@ -23,8 +23,6 @@ use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::config::Config; use garage_util::config::Config;
#[cfg(feature = "consul-discovery")]
use garage_util::config::ConsulDiscoveryConfig;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig; use garage_util::config::KubernetesDiscoveryConfig;
use garage_util::data::*; use garage_util::data::*;
@ -33,7 +31,7 @@ use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
use crate::consul::{get_consul_nodes, publish_consul_service}; use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::*; use crate::layout::*;
@ -100,7 +98,7 @@ pub struct System {
bootstrap_peers: Vec<String>, bootstrap_peers: Vec<String>,
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
consul_discovery: Option<ConsulDiscoveryConfig>, consul_discovery: Option<ConsulDiscovery>,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>, kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
@ -302,6 +300,15 @@ impl System {
warn!("Kubernetes discovery is not enabled in this build."); warn!("Kubernetes discovery is not enabled in this build.");
} }
#[cfg(feature = "consul-discovery")]
let consul_discovery = match &config.consul_discovery {
Some(cfg) => Some(
ConsulDiscovery::new(cfg.clone())
.ok_or_message("Invalid Consul discovery configuration")?,
),
None => None,
};
let sys = Arc::new(System { let sys = Arc::new(System {
id: netapp.id.into(), id: netapp.id.into(),
persist_cluster_layout, persist_cluster_layout,
@ -324,7 +331,7 @@ impl System {
rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
consul_discovery: config.consul_discovery.clone(), consul_discovery,
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: config.kubernetes_discovery.clone(), kubernetes_discovery: config.kubernetes_discovery.clone(),
@ -440,8 +447,7 @@ impl System {
} }
}; };
publish_consul_service( c.publish_consul_service(
c,
self.netapp.id, self.netapp.id,
&self.local_status.load_full().hostname, &self.local_status.load_full().hostname,
rpc_public_addr, rpc_public_addr,
@ -638,7 +644,7 @@ impl System {
// Fetch peer list from Consul // Fetch peer list from Consul
#[cfg(feature = "consul-discovery")] #[cfg(feature = "consul-discovery")]
if let Some(c) = &self.consul_discovery { if let Some(c) = &self.consul_discovery {
match get_consul_nodes(c).await { match c.get_consul_nodes().await {
Ok(node_list) => { Ok(node_list) => {
ping_list.extend(node_list); ping_list.extend(node_list);
} }