Add feature flag for Kubernetes discovery

This commit is contained in:
Alex 2022-03-16 12:09:50 +01:00 committed by Gitea
parent 509d256c58
commit 9d0ed78887
8 changed files with 489 additions and 592 deletions

160
Cargo.lock generated
View file

@ -162,7 +162,7 @@ dependencies = [
"percent-encoding", "percent-encoding",
"regex", "regex",
"ring", "ring",
"time 0.3.7", "time 0.3.9",
"tracing", "tracing",
] ]
@ -258,7 +258,7 @@ dependencies = [
"itoa", "itoa",
"num-integer", "num-integer",
"ryu", "ryu",
"time 0.3.7", "time 0.3.9",
] ]
[[package]] [[package]]
@ -343,9 +343,9 @@ checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]] [[package]]
name = "bytes-utils" name = "bytes-utils"
version = "0.1.1" version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4e314712951c43123e5920a446464929adc667a5eade7f8fb3997776c9df6e54" checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"either", "either",
@ -424,9 +424,9 @@ checksum = "5827cebf4670468b8772dd191856768aedcb1b0278a04f989f7766351917b9dc"
[[package]] [[package]]
name = "cpufeatures" name = "cpufeatures"
version = "0.2.1" version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469" checksum = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -442,9 +442,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-channel" name = "crossbeam-channel"
version = "0.5.2" version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e54ea8bc3fb1ee042f5aace6e3c6e025d3874866da222930f70ce62aceba0bfa" checksum = "5aaa7bd5fb665c6864b5f963dd9097905c54125909c7aa94c9e18507cdbe6c53"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
@ -452,10 +452,11 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-epoch" name = "crossbeam-epoch"
version = "0.9.7" version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c00d6d2ea26e8b151d99093005cb442fb9a37aeaca582a03ec70946f49ab5ed9" checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [ dependencies = [
"autocfg",
"cfg-if", "cfg-if",
"crossbeam-utils", "crossbeam-utils",
"lazy_static", "lazy_static",
@ -465,9 +466,9 @@ dependencies = [
[[package]] [[package]]
name = "crossbeam-utils" name = "crossbeam-utils"
version = "0.8.7" version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b5e5bed1f1c269533fa816a0a5492b3545209a205ca1a54842be180eb63a16a6" checksum = "0bf124c720b7686e3c2663cf54062ab0f68a88af2fb6a030e87e30bf721fcb38"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
"lazy_static", "lazy_static",
@ -596,9 +597,9 @@ checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]] [[package]]
name = "dyn-clone" name = "dyn-clone"
version = "1.0.4" version = "1.0.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee2626afccd7561a06cf1367e2950c4718ea04565e20fb5029b6c7d8ad09abcf" checksum = "21e50f3adc76d6a43f5ed73b698a87d0760ca74617f60f7c3b879003536fdd28"
[[package]] [[package]]
name = "either" name = "either"
@ -1033,7 +1034,7 @@ dependencies = [
"netapp 0.4.1", "netapp 0.4.1",
"openssl", "openssl",
"opentelemetry", "opentelemetry",
"pnet", "pnet_datalink",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 0.15.5", "rmp-serde 0.15.5",
"schemars", "schemars",
@ -1125,8 +1126,6 @@ dependencies = [
"hex", "hex",
"http", "http",
"hyper", "hyper",
"k8s-openapi",
"kube",
"netapp 0.4.1", "netapp 0.4.1",
"opentelemetry", "opentelemetry",
"rand 0.8.5", "rand 0.8.5",
@ -1170,9 +1169,9 @@ dependencies = [
[[package]] [[package]]
name = "gethostname" name = "gethostname"
version = "0.2.2" version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4addc164932852d066774c405dbbdb7914742d2b39e39e1a7ca949c856d054d1" checksum = "c1ebd34e35c46e00bb73e81363248d627782724609fe1b6396f553f68fe3862e"
dependencies = [ dependencies = [
"libc", "libc",
"winapi", "winapi",
@ -1211,12 +1210,6 @@ dependencies = [
"syn", "syn",
] ]
[[package]]
name = "glob"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b919933a397b79c37e33b77bb2aa3dc8eb6e165ad809e58ff75bc7db2e34574"
[[package]] [[package]]
name = "h2" name = "h2"
version = "0.3.12" version = "0.3.12"
@ -1343,9 +1336,9 @@ dependencies = [
[[package]] [[package]]
name = "hyper" name = "hyper"
version = "0.14.17" version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "043f0e083e9901b6cc658a77d1eb86f4fc650bbb977a4337dd63192826aa85dd" checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.1.0",
"futures-channel", "futures-channel",
@ -1661,9 +1654,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.119" version = "0.2.121"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bf2e165bb3457c8e098ea76f3e3bc9db55f87aa90d52d0e6be741470916aaa4" checksum = "efaa7b300f3b5fe8eb6bf21ce3895e1751d9665086af2d64b42f19701015ff4f"
[[package]] [[package]]
name = "libsodium-sys" name = "libsodium-sys"
@ -1694,9 +1687,9 @@ dependencies = [
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.14" version = "0.4.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51b9bbe6c47d51fc3e1a9b945965946b4c44142ab8792c50835a980d362c2710" checksum = "6389c490849ff5bc16be905ae24bc913a9c8892e19b2341dbc175e14c341c2b8"
dependencies = [ dependencies = [
"cfg-if", "cfg-if",
] ]
@ -1753,9 +1746,9 @@ checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.8.1" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7ba42135c6a5917b9db9cd7b293e5409e1c6b041e6f9825e92e55a894c63b6f8" checksum = "52da4364ffb0e4fe33a9841a98a3f3014fb964045ce4f7a45a398243c8d6b0c9"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@ -1866,13 +1859,12 @@ dependencies = [
[[package]] [[package]]
name = "nom" name = "nom"
version = "7.1.0" version = "7.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109" checksum = "a8903e5a29a317527874d0402f867152a3d21c908bb0b933e416c65e301d4c36"
dependencies = [ dependencies = [
"memchr", "memchr",
"minimal-lexical", "minimal-lexical",
"version_check",
] ]
[[package]] [[package]]
@ -1915,9 +1907,9 @@ dependencies = [
[[package]] [[package]]
name = "num_threads" name = "num_threads"
version = "0.1.4" version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c539a50b93a303167eded6e8dff5220cd39447409fb659f4cd24b1f72fe4f133" checksum = "aba1801fb138d8e85e11d0fc70baf4fe1cdfffda7c6cd34a854905df588e5ed0"
dependencies = [ dependencies = [
"libc", "libc",
] ]
@ -1956,9 +1948,9 @@ checksum = "ff011a302c396a5197692431fc1948019154afc178baf7d8e37367442a4601cf"
[[package]] [[package]]
name = "openssl-src" name = "openssl-src"
version = "111.17.0+1.1.1m" version = "111.18.0+1.1.1n"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05d6a336abd10814198f66e2a91ccd7336611f30334119ca8ce300536666fcf4" checksum = "7897a926e1e8d00219127dc020130eca4292e5ca666dd592480d72c3eca2ff6c"
dependencies = [ dependencies = [
"cc", "cc",
] ]
@ -2181,20 +2173,6 @@ version = "0.3.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe" checksum = "58893f751c9b0412871a09abd62ecd2a00298c6c83befa223ef98c52aef40cbe"
[[package]]
name = "pnet"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b6d2a0409666964722368ef5fb74b9f93fac11c18bef3308693c16c6733f103"
dependencies = [
"ipnetwork",
"pnet_base",
"pnet_datalink",
"pnet_packet",
"pnet_sys",
"pnet_transport",
]
[[package]] [[package]]
name = "pnet_base" name = "pnet_base"
version = "0.28.0" version = "0.28.0"
@ -2214,39 +2192,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "pnet_macros"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "30490e0852e58402b8fae0d39897b08a24f493023a4d6cf56b2e30f31ed57548"
dependencies = [
"proc-macro2",
"quote",
"regex",
"syn",
]
[[package]]
name = "pnet_macros_support"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4714e10f30cab023005adce048f2d30dd4ac4f093662abf2220855655ef8f90"
dependencies = [
"pnet_base",
]
[[package]]
name = "pnet_packet"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8588067671d03c9f4254b2e66fecb4d8b93b5d3e703195b84f311cd137e32130"
dependencies = [
"glob",
"pnet_base",
"pnet_macros",
"pnet_macros_support",
]
[[package]] [[package]]
name = "pnet_sys" name = "pnet_sys"
version = "0.28.0" version = "0.28.0"
@ -2257,18 +2202,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "pnet_transport"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "932b2916d693bcc5fa18443dc99142e0a6fd31a6ce75a511868f7174c17e2bce"
dependencies = [
"libc",
"pnet_base",
"pnet_packet",
"pnet_sys",
]
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.16" version = "0.2.16"
@ -2416,9 +2349,9 @@ dependencies = [
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.15" version = "1.0.16"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "864d3e96a899863136fc6e99f3d7cae289dafe43bf2c5ac19b70df7210c0a145" checksum = "b4af2ec4714533fcdf07e886f17025ace8b997b9ce51204ee69b6da831c3da57"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -2492,12 +2425,13 @@ dependencies = [
[[package]] [[package]]
name = "redox_users" name = "redox_users"
version = "0.4.0" version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "528532f3d801c87aec9def2add9ca802fe569e44a544afe633765267840abe64" checksum = "7776223e2696f1aa4c6b0170e83212f47296a00424305117d013dfe86fb0fe55"
dependencies = [ dependencies = [
"getrandom", "getrandom",
"redox_syscall", "redox_syscall",
"thiserror",
] ]
[[package]] [[package]]
@ -2951,9 +2885,9 @@ checksum = "6bdef32e8150c2a081110b42772ffe7d7c9032b606bc226c8260fd97e0976601"
[[package]] [[package]]
name = "syn" name = "syn"
version = "1.0.86" version = "1.0.89"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a65b3f4ffa0092e9887669db0eae07941f023991ab58ea44da8fe8e2d511c6b" checksum = "ea297be220d52398dcc07ce15a209fce436d361735ac1db700cab3b6cdfb9f54"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
@ -3037,9 +2971,9 @@ dependencies = [
[[package]] [[package]]
name = "time" name = "time"
version = "0.3.7" version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "004cbc98f30fa233c61a38bc77e96a9106e65c88f2d3bef182ae952027e5753d" checksum = "c2702e08a7a860f005826c6815dcac101b19b5eb330c27fe4a5928fec1d20ddd"
dependencies = [ dependencies = [
"libc", "libc",
"num_threads", "num_threads",
@ -3501,9 +3435,9 @@ dependencies = [
[[package]] [[package]]
name = "which" name = "which"
version = "4.2.4" version = "4.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a5a7e487e921cf220206864a94a89b6c6905bfc19f1057fa26a4cb360e5c1d2" checksum = "5c4fb54e6113b6a8772ee41c3404fb0301ac79604489467e0a9ce1f3e97c24ae"
dependencies = [ dependencies = [
"either", "either",
"lazy_static", "lazy_static",
@ -3592,9 +3526,9 @@ checksum = "114ba2b24d2167ef6d67d7d04c8cc86522b87f490025f39f0303b7db5bf5e3d8"
[[package]] [[package]]
name = "xxhash-rust" name = "xxhash-rust"
version = "0.8.3" version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "886f21441c6731b9e06a9fdacbc5e29319c17a4069c2a511d80349874864702d" checksum = "83a16b7b403377d61184bb601d8349a4ff2c4cec08a305d004f710b7eaafef24"
[[package]] [[package]]
name = "yaml-rust" name = "yaml-rust"
@ -3607,9 +3541,9 @@ dependencies = [
[[package]] [[package]]
name = "zeroize" name = "zeroize"
version = "1.5.3" version = "1.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "50344758e2f40e3a1fcfc8f6f91aa57b5f8ebd8d27919fe6451f15aaaf9ee608" checksum = "7eb5728b8afd3f280a869ce1d4c554ffaed35f45c231fc41bfbd0381bef50317"
[[package]] [[package]]
name = "zstd" name = "zstd"

736
Cargo.nix

File diff suppressed because it is too large Load diff

View file

@ -32,13 +32,13 @@ serde_bytes = "0.11"
serde_json = "1.0" serde_json = "1.0"
# newer version requires rust edition 2021 # newer version requires rust edition 2021
kube = { version = "0.62", features = ["runtime", "derive"] } kube = { version = "0.62", features = ["runtime", "derive"], optional = true }
k8s-openapi = { version = "0.13", features = ["v1_22"] } k8s-openapi = { version = "0.13", features = ["v1_22"], optional = true }
openssl = { version = "0.10", features = ["vendored"] } openssl = { version = "0.10", features = ["vendored"], optional = true }
schemars = "0.8" schemars = { version = "0.8", optional = true }
# newer version requires rust edition 2021 # newer version requires rust edition 2021
pnet = "0.28" pnet_datalink = "0.28"
futures = "0.3" futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
@ -52,3 +52,5 @@ netapp = { version = "0.4.1", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
[features]
kubernetes-discovery = [ "kube", "k8s-openapi", "openssl", "schemars" ]

View file

@ -12,8 +12,6 @@ use serde::{Deserialize, Serialize};
use netapp::NodeID; use netapp::NodeID;
use garage_util::error::Error;
static K8S_GROUP: &str = "deuxfleurs.fr"; static K8S_GROUP: &str = "deuxfleurs.fr";
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
@ -29,7 +27,7 @@ pub struct Node {
port: u16, port: u16,
} }
pub async fn create_kubernetes_crd() -> Result<(), Error> { pub async fn create_kubernetes_crd() -> Result<(), kube::Error> {
let client = Client::try_default().await?; let client = Client::try_default().await?;
let crds: Api<CustomResourceDefinition> = Api::all(client.clone()); let crds: Api<CustomResourceDefinition> = Api::all(client.clone());
@ -45,7 +43,7 @@ pub async fn create_kubernetes_crd() -> Result<(), Error> {
pub async fn get_kubernetes_nodes( pub async fn get_kubernetes_nodes(
kubernetes_service_name: &str, kubernetes_service_name: &str,
kubernetes_namespace: &str, kubernetes_namespace: &str,
) -> Result<Vec<(NodeID, SocketAddr)>, Error> { ) -> Result<Vec<(NodeID, SocketAddr)>, kube::Error> {
let client = Client::try_default().await?; let client = Client::try_default().await?;
let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace); let nodes: Api<GarageNode> = Api::namespaced(client.clone(), kubernetes_namespace);
@ -80,7 +78,7 @@ pub async fn publish_kubernetes_node(
node_id: NodeID, node_id: NodeID,
hostname: &str, hostname: &str,
rpc_public_addr: SocketAddr, rpc_public_addr: SocketAddr,
) -> Result<(), Error> { ) -> Result<(), kube::Error> {
let node_pubkey = hex::encode(node_id); let node_pubkey = hex::encode(node_id);
let mut node = GarageNode::new( let mut node = GarageNode::new(

View file

@ -4,6 +4,7 @@
extern crate tracing; extern crate tracing;
mod consul; mod consul;
#[cfg(feature = "kubernetes-discovery")]
mod kubernetes; mod kubernetes;
pub mod layout; pub mod layout;

View file

@ -29,6 +29,7 @@ use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
use crate::consul::*; use crate::consul::*;
#[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::*; use crate::layout::*;
use crate::ring::*; use crate::ring::*;
@ -90,12 +91,10 @@ pub struct System {
rpc_listen_addr: SocketAddr, rpc_listen_addr: SocketAddr,
rpc_public_addr: Option<SocketAddr>, rpc_public_addr: Option<SocketAddr>,
bootstrap_peers: Vec<(NodeID, SocketAddr)>, bootstrap_peers: Vec<(NodeID, SocketAddr)>,
consul_host: Option<String>,
consul_service_name: Option<String>,
kubernetes_service_name: Option<String>, consul_discovery: Option<ConsulDiscoveryParam>,
kubernetes_namespace: Option<String>, #[cfg(feature = "kubernetes-discovery")]
kubernetes_skip_crd: bool, kubernetes_discovery: Option<KubernetesDiscoveryParam>,
replication_factor: usize, replication_factor: usize,
@ -228,15 +227,53 @@ impl System {
let ring = Ring::new(cluster_layout, replication_factor); let ring = Ring::new(cluster_layout, replication_factor);
let (update_ring, ring) = watch::channel(Arc::new(ring)); let (update_ring, ring) = watch::channel(Arc::new(ring));
let rpc_public_addr = match config.rpc_public_addr {
Some(a) => Some(a),
None => {
let addr =
get_default_ip().map(|ip| SocketAddr::new(ip, config.rpc_bind_addr.port()));
if let Some(a) = addr {
warn!("Using autodetected rpc_public_addr: {}. Consider specifying it explicitly in configuration file if possible.", a);
}
addr
}
};
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key); let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new( let fullmesh = FullMeshPeeringStrategy::new(
netapp.clone(), netapp.clone(),
config.bootstrap_peers.clone(), config.bootstrap_peers.clone(),
config.rpc_public_addr, rpc_public_addr,
); );
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into()); let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
let consul_discovery = match (&config.consul_host, &config.consul_service_name) {
(Some(ch), Some(csn)) => Some(ConsulDiscoveryParam {
consul_host: ch.to_string(),
service_name: csn.to_string(),
}),
_ => None,
};
#[cfg(feature = "kubernetes-discovery")]
let kubernetes_discovery = match (
&config.kubernetes_service_name,
&config.kubernetes_namespace,
) {
(Some(ksn), Some(kn)) => Some(KubernetesDiscoveryParam {
service_name: ksn.to_string(),
namespace: kn.to_string(),
skip_crd: config.kubernetes_skip_crd,
}),
_ => None,
};
#[cfg(not(feature = "kubernetes-discovery"))]
if config.kubernetes_service_name.is_some() || config.kubernetes_namespace.is_some() {
warn!("Kubernetes discovery is not enabled in this build.");
}
let sys = Arc::new(System { let sys = Arc::new(System {
id: netapp.id.into(), id: netapp.id.into(),
persist_cluster_layout, persist_cluster_layout,
@ -249,13 +286,11 @@ impl System {
system_endpoint, system_endpoint,
replication_factor, replication_factor,
rpc_listen_addr: config.rpc_bind_addr, rpc_listen_addr: config.rpc_bind_addr,
rpc_public_addr: config.rpc_public_addr, rpc_public_addr,
bootstrap_peers: config.bootstrap_peers.clone(), bootstrap_peers: config.bootstrap_peers.clone(),
consul_host: config.consul_host.clone(), consul_discovery,
consul_service_name: config.consul_service_name.clone(), #[cfg(feature = "kubernetes-discovery")]
kubernetes_service_name: config.kubernetes_service_name.clone(), kubernetes_discovery,
kubernetes_namespace: config.kubernetes_namespace.clone(),
kubernetes_skip_crd: config.kubernetes_skip_crd,
ring, ring,
update_ring: Mutex::new(update_ring), update_ring: Mutex::new(update_ring),
@ -280,23 +315,22 @@ impl System {
// ---- INTERNALS ---- // ---- INTERNALS ----
async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> { async fn advertise_to_consul(self: Arc<Self>) -> Result<(), Error> {
let (consul_host, consul_service_name) = let c = match &self.consul_discovery {
match (&self.consul_host, &self.consul_service_name) { Some(c) => c,
(Some(ch), Some(csn)) => (ch, csn),
_ => return Ok(()), _ => return Ok(()),
}; };
let rpc_public_addr = match self.rpc_public_addr { let rpc_public_addr = match self.rpc_public_addr {
Some(addr) => addr, Some(addr) => addr,
None => { None => {
warn!("Not advertising to Consul because rpc_public_addr is not defined in config file."); warn!("Not advertising to Consul because rpc_public_addr is not defined in config file and could not be autodetected.");
return Ok(()); return Ok(());
} }
}; };
publish_consul_service( publish_consul_service(
consul_host, &c.consul_host,
consul_service_name, &c.service_name,
self.netapp.id, self.netapp.id,
&self.local_status.load_full().hostname, &self.local_status.load_full().hostname,
rpc_public_addr, rpc_public_addr,
@ -305,36 +339,24 @@ impl System {
.err_context("Error while publishing Consul service") .err_context("Error while publishing Consul service")
} }
fn get_default_ip() -> IpAddr { #[cfg(feature = "kubernetes-discovery")]
pnet::datalink::interfaces()
.iter()
.find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
.unwrap()
.ips
.first()
.unwrap()
.ip()
}
async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> { async fn advertise_to_kubernetes(self: Arc<Self>) -> Result<(), Error> {
let (kubernetes_service_name, kubernetes_namespace) = let k = match &self.kubernetes_discovery {
match (&self.kubernetes_service_name, &self.kubernetes_namespace) { Some(k) => k,
(Some(ch), Some(csn)) => (ch, csn),
_ => return Ok(()), _ => return Ok(()),
}; };
let rpc_public_addr = let rpc_public_addr = match self.rpc_public_addr {
match self.rpc_public_addr {
Some(addr) => addr, Some(addr) => addr,
None => { None => {
warn!("No rpc_public_addr configured, using first address on first network interface"); warn!("Not advertising to Kubernetes because rpc_public_addr is not defined in config file and could not be autodetected.");
SocketAddr::new(Self::get_default_ip(), self.rpc_listen_addr.port()) return Ok(());
} }
}; };
publish_kubernetes_node( publish_kubernetes_node(
kubernetes_service_name, &k.service_name,
kubernetes_namespace, &k.namespace,
self.netapp.id, self.netapp.id,
&self.local_status.load_full().hostname, &self.local_status.load_full().hostname,
rpc_public_addr, rpc_public_addr,
@ -513,16 +535,6 @@ impl System {
} }
async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) { async fn discovery_loop(self: &Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
let consul_config = match (&self.consul_host, &self.consul_service_name) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
};
let kubernetes_config = match (&self.kubernetes_service_name, &self.kubernetes_namespace) {
(Some(ch), Some(csn)) => Some((ch.clone(), csn.clone())),
_ => None,
};
while !*stop_signal.borrow() { while !*stop_signal.borrow() {
let not_configured = !self.ring.borrow().layout.check(); let not_configured = !self.ring.borrow().layout.check();
let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor; let no_peers = self.fullmesh.get_peer_list().len() < self.replication_factor;
@ -545,8 +557,8 @@ impl System {
} }
// Fetch peer list from Consul // Fetch peer list from Consul
if let Some((consul_host, consul_service_name)) = &consul_config { if let Some(c) = &self.consul_discovery {
match get_consul_nodes(consul_host, consul_service_name).await { match get_consul_nodes(&c.consul_host, &c.service_name).await {
Ok(node_list) => { Ok(node_list) => {
ping_list.extend(node_list); ping_list.extend(node_list);
} }
@ -557,8 +569,9 @@ impl System {
} }
// Fetch peer list from Kubernetes // Fetch peer list from Kubernetes
if let Some((kubernetes_service_name, kubernetes_namespace)) = &kubernetes_config { #[cfg(feature = "kubernetes-discovery")]
if !self.kubernetes_skip_crd { if let Some(k) = &self.kubernetes_discovery {
if !k.skip_crd {
match create_kubernetes_crd().await { match create_kubernetes_crd().await {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
@ -567,8 +580,7 @@ impl System {
}; };
} }
match get_kubernetes_nodes(kubernetes_service_name, kubernetes_namespace).await match get_kubernetes_nodes(&k.service_name, &k.namespace).await {
{
Ok(node_list) => { Ok(node_list) => {
ping_list.extend(node_list); ping_list.extend(node_list);
} }
@ -593,6 +605,8 @@ impl System {
} }
self.background.spawn(self.clone().advertise_to_consul()); self.background.spawn(self.clone().advertise_to_consul());
#[cfg(feature = "kubernetes-discovery")]
self.background self.background
.spawn(self.clone().advertise_to_kubernetes()); .spawn(self.clone().advertise_to_kubernetes());
@ -657,3 +671,23 @@ impl EndpointHandler<SystemRpc> for System {
} }
} }
} }
fn get_default_ip() -> Option<IpAddr> {
pnet_datalink::interfaces()
.iter()
.find(|e| e.is_up() && !e.is_loopback() && !e.ips.is_empty())
.and_then(|e| e.ips.first())
.map(|a| a.ip())
}
struct ConsulDiscoveryParam {
consul_host: String,
service_name: String,
}
#[cfg(feature = "kubernetes-discovery")]
struct KubernetesDiscoveryParam {
service_name: String,
namespace: String,
skip_crd: bool,
}

View file

@ -40,7 +40,4 @@ netapp = "0.4.1"
http = "0.2" http = "0.2"
hyper = "0.14" hyper = "0.14"
kube = { version = "0.62", features = ["runtime", "derive"] }
k8s-openapi = { version = "0.13", features = ["v1_22"] }
opentelemetry = "0.17" opentelemetry = "0.17"

View file

@ -23,9 +23,6 @@ pub enum Error {
#[error(display = "Invalid HTTP header value: {}", _0)] #[error(display = "Invalid HTTP header value: {}", _0)]
HttpHeader(#[error(source)] http::header::ToStrError), HttpHeader(#[error(source)] http::header::ToStrError),
#[error(display = "kubernetes error: {}", _0)]
Kubernetes(#[error(source)] kube::Error),
#[error(display = "Netapp error: {}", _0)] #[error(display = "Netapp error: {}", _0)]
Netapp(#[error(source)] netapp::error::Error), Netapp(#[error(source)] netapp::error::Error),