add support for kubernetes service discovery
All checks were successful
continuous-integration/drone/pr Build is passing
continuous-integration/drone/push Build is passing

This commit adds support to discover garage instances running in
kubernetes.

Once enabled by setting `kubernetes_namespace` and
`kubernetes_service_name` garage will create a Custom Resources
`garagenodes.deuxfleurs.fr` with nodes public key as the resource name.
and IP and Port information as spec in the namespace configured by
`kubernetes_namespace`.

For discovering nodes the resources are filtered with the optionally set
`kubernetes_service_name` which sets a label
`garage.deuxfleurs.fr/service` on the resources.

This allows to separate multiple garage deployments in a single
namespace.

the `kubernetes_skip_crd` variable allows to disable the creation of the
CRD by garage itself. The user must deploy this manually.
This commit is contained in:
Max Audron 2022-03-06 14:50:00 +01:00
parent c00b2c9948
commit 9d44127245
Signed by untrusted user: audron
GPG key ID: DF87081FA2C1945A
11 changed files with 2676 additions and 927 deletions

1104
Cargo.lock generated

File diff suppressed because it is too large Load diff

2260
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -29,6 +29,10 @@ bootstrap_peers = [
consul_host = "consul.service" consul_host = "consul.service"
consul_service_name = "garage-daemon" consul_service_name = "garage-daemon"
kubernetes_namespace = "garage"
kubernetes_service_name = "garage-daemon"
kubernetes_skip_crd = false
sled_cache_capacity = 134217728 sled_cache_capacity = 134217728
sled_flush_every_ms = 2000 sled_flush_every_ms = 2000
@ -181,6 +185,20 @@ RPC ports are announced.
Garage does not yet support talking to Consul over TLS. Garage does not yet support talking to Consul over TLS.
### `kubernetes_namespace`, `kubernetes_service_name` and `kubernetes_skip_crd`
Garage supports discovering other nodes of the cluster using kubernetes custom
resources. For this to work `kubernetes_namespace` and `kubernetes_service_name`
need to be configured.
`kubernetes_namespace` sets the namespace in which the custom resources are
configured. `kubernetes_service_name` is added as a label to these resources to
filter them, to allow for multiple deployments in a single namespace.
`kubernetes_skip_crd` can be set to true to disable the automatic creation and
patching of the `garagenodes.deuxfleurs.fr` CRD. You will need to create the CRD
manually.
### `sled_cache_capacity` ### `sled_cache_capacity`
This parameter can be used to tune the capacity of the cache used by This parameter can be used to tune the capacity of the cache used by

View file

@ -77,6 +77,7 @@ function refresh_toolchain {
pkgs.rustPlatform.rust.cargo pkgs.rustPlatform.rust.cargo
pkgs.clippy pkgs.clippy
pkgs.rustfmt pkgs.rustfmt
pkgs.perl
cargo2nix.packages.x86_64-linux.cargo2nix cargo2nix.packages.x86_64-linux.cargo2nix
] else []) ] else [])
++ ++

View file

@ -30,6 +30,15 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"]
serde_bytes = "0.11" serde_bytes = "0.11"
serde_json = "1.0" serde_json = "1.0"
# newer version requires rust edition 2021
kube = { version = "0.62", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.13", features = ["v1_22"] }
openssl = { version = "0.10", features = ["vendored"] }
schemars = "0.8"
# newer version requires rust edition 2021
pnet = "0.28"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }

118
src/rpc/kubernetes.rs Normal file
View file

@ -0,0 +1,118 @@
use std::collections::BTreeMap;
use std::net::{IpAddr, SocketAddr};
use kube::{
api::{ListParams, Patch, PatchParams, PostParams},
Api, Client, CustomResource, CustomResourceExt,
};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use netapp::NodeID;
use garage_util::error::Error;
static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
group = "deuxfleurs.fr",
version = "v1",
kind = "GarageNode",
namespaced
)]
pub struct Node {
hostname: String,
address: IpAddr,
port: u16,
}
pub async fn create_kubernetes_crd() -> Result<(), Error> {
let client = Client::try_default().await?;
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
let params = PatchParams::apply(&format!("garage.{}", K8S_GROUP));
let crd = GarageNode::crd();
let patch = Patch::Apply(crd);
crds.patch(&format!("garagenodes.{}", K8S_GROUP), &params, &patch)
.await?;
Ok(())
}
pub async fn get_kubernetes_nodes(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
let lp = ListParams::default().labels(&format!(
"garage.{}/service={}",
K8S_GROUP, kubernetes_service_name
));
let nodes = nodes.list(&lp).await?;
let mut ret = Vec::with_capacity(nodes.items.len());
for node in nodes {
println!("Found Pod: {:?}", node.metadata.name);
let pubkey = &node
.metadata
.name
.map(|k| hex::decode(&k).ok())
.flatten()
.map(|k| NodeID::from_slice(&k[..]))
.flatten();
if let Some(pubkey) = pubkey {
ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
}
}
Ok(ret)
}
pub async fn publish_kubernetes_node(
kubernetes_service_name: &str,
kubernetes_namespace: &str,
node_id: NodeID,
hostname: &str,
rpc_public_addr: SocketAddr,
) -> Result<(), Error> {
let node_pubkey = hex::encode(node_id);
let mut node = GarageNode::new(
&node_pubkey,
Node {
hostname: hostname.to_string(),
address: rpc_public_addr.ip(),
port: rpc_public_addr.port(),
},
);
let labels = node.metadata.labels.insert(BTreeMap::new());
labels.insert(
format!("garage.{}/service", K8S_GROUP),
kubernetes_service_name.to_string(),
);
debug!("Node object to be applied: {:#?}", node);
let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
if let Ok(old_node) = nodes.get(&node_pubkey).await {
node.metadata.resource_version = old_node.metadata.resource_version;
nodes
.replace(&node_pubkey, &PostParams::default(), &node)
.await?;
} else {
nodes.create(&PostParams::default(), &node).await?;
};
Ok(())
}

View file

@ -4,6 +4,7 @@
extern crate log; extern crate log;
mod consul; mod consul;
mod kubernetes;
pub mod layout; pub mod layout;
pub mod ring; pub mod ring;

View file

@ -1,7 +1,7 @@
//! Module containing structs related to membership management //! Module containing structs related to membership management
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::SocketAddr; use std::net::{IpAddr, SocketAddr};
use std::path::Path; use std::path::Path;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -29,6 +29,7 @@ use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
use crate::consul::*; use crate::consul::*;
use crate::kubernetes::*;
use crate::layout::*; use crate::layout::*;
use crate::ring::*; use crate::ring::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -88,6 +89,11 @@ pub struct System {
bootstrap_peers: Vec<(NodeID, SocketAddr)>, bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>, consul_host: Option<String>,
consul_service_name: Option<String>, consul_service_name: Option<String>,
kubernetes_service_name: Option<String>,
kubernetes_namespace: Option<String>,
kubernetes_skip_crd: bool,
replication_factor: usize, replication_factor: usize,
/// The ring /// The ring
@ -247,6 +253,10 @@ impl System {
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(), consul_host: config.consul_host.clone(),
consul_service_name: config.consul_service_name.clone(), consul_service_name: config.consul_service_name.clone(),
kubernetes_service_name: config.kubernetes_service_name.clone(),
kubernetes_namespace: config.kubernetes_namespace.clone(),
kubernetes_skip_crd: config.kubernetes_skip_crd,
ring, ring,
update_ring: Mutex::new(update_ring), update_ring: Mutex::new(update_ring),
background, background,
@ -295,6 +305,44 @@ impl System {
.err_context("Error while publishing Consul service") .err_context("Error while publishing Consul service")
} }
fn get_default_ip() -> IpAddr {
pnet::datalink::interfaces()
.iter()
.find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
.unwrap()
.ips
.first()
.unwrap()
.ip()
}
async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
let (kubernetes_service_name, kubernetes_namespace) =
match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
(Some(ch), Some(csn)) => (ch, csn),
_ => return Ok(()),
};
let rpc_public_addr =
match self.rpc_public_addr {
Some(addr) => addr,
None => {
warn!("No rpc_public_addr configured, using first address on first network interface");
SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port())
}
};
publish_kubernetes_node(
kubernetes_service_name,
kubernetes_namespace,
self.netapp.id,
&self.local_status.load_full().hostname,
rpc_public_addr,
)
.await
.err_context("Error while publishing node to kubernetes")
}
/// Save network configuration to disc /// Save network configuration to disc
async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> { async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
let ring: Arc<Ring> = self.ring.borrow().clone(); let ring: Arc<Ring> = self.ring.borrow().clone();
@ -470,6 +518,11 @@ impl System {
_ => None, _ => None,
}; };
let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
};
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let not_configured = !self.ring.borrow().layout.check(); let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
@ -503,6 +556,28 @@ impl System {
} }
} }
// Fetch peer list from Kubernetes
if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config {
if !self.kubernetes_skip_crd {
match create_kubernetes_crd().await {
Ok(()) => (),
Err(e) => {
error!("Failed to create kubernetes custom resource: {}", e)
}
};
}
match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await
{
Ok(node_list) => {
ping_list.extend(node_list);
}
Err(e) => {
warn!("Could not retrieve node list from Kubernetes: {}", e);
}
}
}
for (node_id, node_addr) in ping_list { for (node_id, node_addr) in ping_list {
tokio::spawn( tokio::spawn(
self.netapp self.netapp
@ -518,6 +593,8 @@ impl System {
} }
self.background.spawn(self.clone().advertise_to_consul()); self.background.spawn(self.clone().advertise_to_consul());
self.background
.spawn(self.clone().advertise_to_kubernetes());
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL); let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
select! { select! {

View file

@ -38,3 +38,6 @@ netapp = "0.3.0"
http = "0.2" http = "0.2"
hyper = "0.14" hyper = "0.14"
kube = { version = "0.62", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.13", features = ["v1_22"] }

View file

@ -52,6 +52,13 @@ pub struct Config {
pub consul_host: Option<String>, pub consul_host: Option<String>,
/// Consul service name to use /// Consul service name to use
pub consul_service_name: Option<String>, pub consul_service_name: Option<String>,
/// Kubernetes namespace the service discovery resources are be created in
pub kubernetes_namespace: Option<String>,
/// Service name to filter for in k8s custom resources
pub kubernetes_service_name: Option<String>,
/// Skip creation of the garagenodes CRD
#[serde(default)]
pub kubernetes_skip_crd: bool,
/// Sled cache size, in bytes /// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")] #[serde(default = "default_sled_cache_capacity")]

View file

@ -23,6 +23,9 @@ pub enum Error {
#[error(display = "Invalid HTTP header value: {}", _0)] #[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError), HttpHeader(#[error(source)] http::header::ToStrError),
#[error(display = "kubernetes error: {}", _0)]
Kubernetes(#[error(source)] kube::Error),
#[error(display = "Netapp error: {}", _0)] #[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error), Netapp(#[error(source)] netapp::error::Error),