add support for kubernetes service discovery #262
11 changed files with 2676 additions and 927 deletions
1104
Cargo.lock
generated
1104
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
@ -29,6 +29,10 @@ bootstrap_peers = [
|
||||||
consul_host = "consul.service"
|
consul_host = "consul.service"
|
||||||
consul_service_name = "garage-daemon"
|
consul_service_name = "garage-daemon"
|
||||||
|
|
||||||
|
kubernetes_namespace = "garage"
|
||||||
|
kubernetes_service_name = "garage-daemon"
|
||||||
|
kubernetes_skip_crd = false
|
||||||
|
|
||||||
sled_cache_capacity = 134217728
|
sled_cache_capacity = 134217728
|
||||||
sled_flush_every_ms = 2000
|
sled_flush_every_ms = 2000
|
||||||
|
|
||||||
|
@ -181,6 +185,20 @@ RPC ports are announced.
|
||||||
|
|
||||||
Garage does not yet support talking to Consul over TLS.
|
Garage does not yet support talking to Consul over TLS.
|
||||||
|
|
||||||
|
### `kubernetes_namespace`, `kubernetes_service_name` and `kubernetes_skip_crd`
|
||||||
|
|
||||||
|
Garage supports discovering other nodes of the cluster using Kubernetes custom
|
||||||
|
resources. For this to work `kubernetes_namespace` and `kubernetes_service_name`
|
||||||
|
need to be configured.
|
||||||
|
|
||||||
|
`kubernetes_namespace` sets the namespace in which the custom resources are
|
||||||
|
configured. `kubernetes_service_name` is added as a label to these resources to
|
||||||
|
filter them, to allow for multiple deployments in a single namespace.
|
||||||
|
|
||||||
|
`kubernetes_skip_crd` can be set to true to disable the automatic creation and
|
||||||
|
patching of the `garagenodes.deuxfleurs.fr` CRD. You will need to create the CRD
|
||||||
|
manually.
|
||||||
|
|
||||||
### `sled_cache_capacity`
|
### `sled_cache_capacity`
|
||||||
|
|
||||||
This parameter can be used to tune the capacity of the cache used by
|
This parameter can be used to tune the capacity of the cache used by
|
||||||
|
|
|
@ -77,6 +77,7 @@ function refresh_toolchain {
|
||||||
pkgs.rustPlatform.rust.cargo
|
pkgs.rustPlatform.rust.cargo
|
||||||
pkgs.clippy
|
pkgs.clippy
|
||||||
pkgs.rustfmt
|
pkgs.rustfmt
|
||||||
|
pkgs.perl
|
||||||
cargo2nix.packages.x86_64-linux.cargo2nix
|
cargo2nix.packages.x86_64-linux.cargo2nix
|
||||||
] else [])
|
] else [])
|
||||||
++
|
++
|
||||||
|
|
|
@ -30,6 +30,15 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"]
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
|
||||||
|
# newer version requires rust edition 2021
|
||||||
|
kube = { version = "0.62", features = ["runtime", "derive"] }
|
||||||
|
k8s-openapi = { version = "0.13", features = ["v1_22"] }
|
||||||
|
openssl = { version = "0.10", features = ["vendored"] }
|
||||||
audron marked this conversation as resolved
Outdated
|
|||||||
|
schemars = "0.8"
|
||||||
|
|
||||||
|
# newer version requires rust edition 2021
|
||||||
|
pnet = "0.28"
|
||||||
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
||||||
|
|
118
src/rpc/kubernetes.rs
Normal file
118
src/rpc/kubernetes.rs
Normal file
|
@ -0,0 +1,118 @@
|
||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::net::{IpAddr, SocketAddr};
|
||||||
|
|
||||||
|
use kube::{
|
||||||
|
api::{ListParams, Patch, PatchParams, PostParams},
|
||||||
|
Api, Client, CustomResource, CustomResourceExt,
|
||||||
|
};
|
||||||
|
|
||||||
|
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
|
||||||
|
use schemars::JsonSchema;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use netapp::NodeID;
|
||||||
|
|
||||||
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
static K8S_GROUP: &str = "deuxfleurs.fr";
|
||||||
|
|
||||||
|
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
|
||||||
|
#[kube(
|
||||||
|
group = "deuxfleurs.fr",
|
||||||
|
version = "v1",
|
||||||
|
kind = "GarageNode",
|
||||||
|
namespaced
|
||||||
|
)]
|
||||||
|
pub struct Node {
|
||||||
|
hostname: String,
|
||||||
|
address: IpAddr,
|
||||||
|
port: u16,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn create_kubernetes_crd() -> Result<(), Error> {
|
||||||
|
let client = Client::try_default().await?;
|
||||||
|
let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
|
||||||
|
|
||||||
|
let params = PatchParams::apply(&format!("garage.{}", K8S_GROUP));
|
||||||
|
let crd = GarageNode::crd();
|
||||||
|
let patch = Patch::Apply(crd);
|
||||||
|
crds.patch(&format!("garagenodes.{}", K8S_GROUP), ¶ms, &patch)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_kubernetes_nodes(
|
||||||
|
kubernetes_service_name: &str,
|
||||||
|
kubernetes_namespace: &str,
|
||||||
|
) -> Result<Vec<(NodeID, SocketAddr)>, Error> {
|
||||||
|
let client = Client::try_default().await?;
|
||||||
|
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
|
||||||
|
|
||||||
|
let lp = ListParams::default().labels(&format!(
|
||||||
|
"garage.{}/service={}",
|
||||||
|
K8S_GROUP, kubernetes_service_name
|
||||||
|
));
|
||||||
|
|
||||||
|
let nodes = nodes.list(&lp).await?;
|
||||||
|
let mut ret = Vec::with_capacity(nodes.items.len());
|
||||||
|
|
||||||
|
for node in nodes {
|
||||||
|
println!("Found Pod: {:?}", node.metadata.name);
|
||||||
|
|
||||||
|
let pubkey = &node
|
||||||
|
.metadata
|
||||||
|
.name
|
||||||
|
.map(|k| hex::decode(&k).ok())
|
||||||
|
.flatten()
|
||||||
|
.map(|k| NodeID::from_slice(&k[..]))
|
||||||
|
.flatten();
|
||||||
|
|
||||||
|
if let Some(pubkey) = pubkey {
|
||||||
|
ret.push((*pubkey, SocketAddr::new(node.spec.address, node.spec.port)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn publish_kubernetes_node(
|
||||||
|
kubernetes_service_name: &str,
|
||||||
|
kubernetes_namespace: &str,
|
||||||
|
node_id: NodeID,
|
||||||
|
hostname: &str,
|
||||||
|
rpc_public_addr: SocketAddr,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let node_pubkey = hex::encode(node_id);
|
||||||
|
|
||||||
|
let mut node = GarageNode::new(
|
||||||
|
&node_pubkey,
|
||||||
|
Node {
|
||||||
|
hostname: hostname.to_string(),
|
||||||
|
address: rpc_public_addr.ip(),
|
||||||
|
port: rpc_public_addr.port(),
|
||||||
|
},
|
||||||
|
);
|
||||||
|
|
||||||
|
let labels = node.metadata.labels.insert(BTreeMap::new());
|
||||||
|
labels.insert(
|
||||||
|
format!("garage.{}/service", K8S_GROUP),
|
||||||
|
kubernetes_service_name.to_string(),
|
||||||
|
);
|
||||||
|
|
||||||
|
debug!("Node object to be applied: {:#?}", node);
|
||||||
|
|
||||||
|
let client = Client::try_default().await?;
|
||||||
|
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
|
||||||
|
|
||||||
|
if let Ok(old_node) = nodes.get(&node_pubkey).await {
|
||||||
|
node.metadata.resource_version = old_node.metadata.resource_version;
|
||||||
|
nodes
|
||||||
|
.replace(&node_pubkey, &PostParams::default(), &node)
|
||||||
|
.await?;
|
||||||
|
} else {
|
||||||
|
nodes.create(&PostParams::default(), &node).await?;
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
|
@ -4,6 +4,7 @@
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
mod consul;
|
mod consul;
|
||||||
|
mod kubernetes;
|
||||||
|
|
||||||
pub mod layout;
|
pub mod layout;
|
||||||
pub mod ring;
|
pub mod ring;
|
||||||
|
|
|
@ -1,7 +1,7 @@
|
||||||
//! Module containing structs related to membership management
|
//! Module containing structs related to membership management
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::io::{Read, Write};
|
use std::io::{Read, Write};
|
||||||
use std::net::SocketAddr;
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
@ -29,6 +29,7 @@ use garage_util::persister::Persister;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
use crate::consul::*;
|
use crate::consul::*;
|
||||||
|
use crate::kubernetes::*;
|
||||||
use crate::layout::*;
|
use crate::layout::*;
|
||||||
use crate::ring::*;
|
use crate::ring::*;
|
||||||
use crate::rpc_helper::*;
|
use crate::rpc_helper::*;
|
||||||
|
@ -88,6 +89,11 @@ pub struct System {
|
||||||
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
|
bootstrap_peers: Vec<(NodeID, SocketAddr)>,
|
||||||
consul_host: Option<String>,
|
consul_host: Option<String>,
|
||||||
consul_service_name: Option<String>,
|
consul_service_name: Option<String>,
|
||||||
|
|
||||||
|
kubernetes_service_name: Option<String>,
|
||||||
|
kubernetes_namespace: Option<String>,
|
||||||
|
kubernetes_skip_crd: bool,
|
||||||
|
|
||||||
replication_factor: usize,
|
replication_factor: usize,
|
||||||
|
|
||||||
/// The ring
|
/// The ring
|
||||||
|
@ -247,6 +253,10 @@ impl System {
|
||||||
bootstrap_peers: config.bootstrap_peers.clone(),
|
bootstrap_peers: config.bootstrap_peers.clone(),
|
||||||
consul_host: config.consul_host.clone(),
|
consul_host: config.consul_host.clone(),
|
||||||
consul_service_name: config.consul_service_name.clone(),
|
consul_service_name: config.consul_service_name.clone(),
|
||||||
|
kubernetes_service_name: config.kubernetes_service_name.clone(),
|
||||||
|
kubernetes_namespace: config.kubernetes_namespace.clone(),
|
||||||
|
kubernetes_skip_crd: config.kubernetes_skip_crd,
|
||||||
|
|
||||||
ring,
|
ring,
|
||||||
update_ring: Mutex::new(update_ring),
|
update_ring: Mutex::new(update_ring),
|
||||||
background,
|
background,
|
||||||
|
@ -295,6 +305,44 @@ impl System {
|
||||||
.err_context("Error while publishing Consul service")
|
.err_context("Error while publishing Consul service")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn get_default_ip() -> IpAddr {
|
||||||
|
pnet::datalink::interfaces()
|
||||||
|
.iter()
|
||||||
|
.find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
|
||||||
|
.unwrap()
|
||||||
|
.ips
|
||||||
|
.first()
|
||||||
|
.unwrap()
|
||||||
|
.ip()
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
|
||||||
|
let (kubernetes_service_name, kubernetes_namespace) =
|
||||||
|
match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
|
||||||
|
(Some(ch), Some(csn)) => (ch, csn),
|
||||||
|
_ => return Ok(()),
|
||||||
|
};
|
||||||
|
|
||||||
|
let rpc_public_addr =
|
||||||
|
match self.rpc_public_addr {
|
||||||
|
Some(addr) => addr,
|
||||||
|
None => {
|
||||||
audron marked this conversation as resolved
Outdated
lx
commented
This should be put in a separate utility function to be also used in the function that advertises to Consul, it would be really helpful!
|
|||||||
|
warn!("No rpc_public_addr configured, using first address on first network interface");
|
||||||
|
SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port())
|
||||||
audron marked this conversation as resolved
Outdated
lx
commented
The port isn't always 3901, it should be derived from the `.port()` of the RPC bind addr that is set in the configuration file.
|
|||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
publish_kubernetes_node(
|
||||||
|
kubernetes_service_name,
|
||||||
|
kubernetes_namespace,
|
||||||
|
self.netapp.id,
|
||||||
|
&self.local_status.load_full().hostname,
|
||||||
|
rpc_public_addr,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.err_context("Error while publishing node to kubernetes")
|
||||||
|
}
|
||||||
|
|
||||||
/// Save network configuration to disc
|
/// Save network configuration to disc
|
||||||
async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
|
async fn save_cluster_layout(self: Arc<Self>) -> Result<(), Error> {
|
||||||
let ring: Arc<Ring> = self.ring.borrow().clone();
|
let ring: Arc<Ring> = self.ring.borrow().clone();
|
||||||
|
@ -470,6 +518,11 @@ impl System {
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
|
||||||
|
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
while !*stop_signal.borrow() {
|
while !*stop_signal.borrow() {
|
||||||
let not_configured = !self.ring.borrow().layout.check();
|
let not_configured = !self.ring.borrow().layout.check();
|
||||||
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
|
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
|
||||||
|
@ -503,6 +556,28 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Fetch peer list from Kubernetes
|
||||||
|
if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config {
|
||||||
|
if !self.kubernetes_skip_crd {
|
||||||
|
match create_kubernetes_crd().await {
|
||||||
|
Ok(()) => (),
|
||||||
|
Err(e) => {
|
||||||
|
error!("Failed to create kubernetes custom resource: {}", e)
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await
|
||||||
|
{
|
||||||
|
Ok(node_list) => {
|
||||||
|
ping_list.extend(node_list);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
warn!("Could not retrieve node list from Kubernetes: {}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
for (node_id, node_addr) in ping_list {
|
for (node_id, node_addr) in ping_list {
|
||||||
tokio::spawn(
|
tokio::spawn(
|
||||||
self.netapp
|
self.netapp
|
||||||
|
@ -518,6 +593,8 @@ impl System {
|
||||||
}
|
}
|
||||||
|
|
||||||
self.background.spawn(self.clone().advertise_to_consul());
|
self.background.spawn(self.clone().advertise_to_consul());
|
||||||
|
self.background
|
||||||
|
.spawn(self.clone().advertise_to_kubernetes());
|
||||||
|
|
||||||
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
|
||||||
select! {
|
select! {
|
||||||
|
|
|
@ -38,3 +38,6 @@ netapp = "0.3.0"
|
||||||
|
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
hyper = "0.14"
|
hyper = "0.14"
|
||||||
|
|
||||||
|
kube = { version = "0.62", features = ["runtime", "derive"] }
|
||||||
|
k8s-openapi = { version = "0.13", features = ["v1_22"] }
|
||||||
|
|
|
@ -52,6 +52,13 @@ pub struct Config {
|
||||||
pub consul_host: Option<String>,
|
pub consul_host: Option<String>,
|
||||||
/// Consul service name to use
|
/// Consul service name to use
|
||||||
pub consul_service_name: Option<String>,
|
pub consul_service_name: Option<String>,
|
||||||
|
/// Kubernetes namespace the service discovery resources are created in
|
||||||
|
pub kubernetes_namespace: Option<String>,
|
||||||
|
/// Service name to filter for in k8s custom resources
|
||||||
|
pub kubernetes_service_name: Option<String>,
|
||||||
|
/// Skip creation of the garagenodes CRD
|
||||||
|
#[serde(default)]
|
||||||
|
pub kubernetes_skip_crd: bool,
|
||||||
|
|
||||||
/// Sled cache size, in bytes
|
/// Sled cache size, in bytes
|
||||||
#[serde(default = "default_sled_cache_capacity")]
|
#[serde(default = "default_sled_cache_capacity")]
|
||||||
|
|
|
@ -23,6 +23,9 @@ pub enum Error {
|
||||||
#[error(display = "Invalid HTTP header value: {}", _0)]
|
#[error(display = "Invalid HTTP header value: {}", _0)]
|
||||||
HttpHeader(#[error(source)] http::header::ToStrError),
|
HttpHeader(#[error(source)] http::header::ToStrError),
|
||||||
|
|
||||||
|
#[error(display = "kubernetes error: {}", _0)]
|
||||||
|
Kubernetes(#[error(source)] kube::Error),
|
||||||
|
|
||||||
#[error(display = "Netapp error: {}", _0)]
|
#[error(display = "Netapp error: {}", _0)]
|
||||||
Netapp(#[error(source)] netapp::error::Error),
|
Netapp(#[error(source)] netapp::error::Error),
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue
could you add the following line please?
Garage tries to be cross-compilation friendly, and by default kube seems to use the system openssl, which poses a problem. Adding this feature flag makes compilation a bit slower as it requires compiling openssl, but makes cross-compilation possible.
kube also has rustls support, but I couldn't make it work for whatever reason right now.
I added your suggestion.