291 lines
11 KiB
Rust
291 lines
11 KiB
Rust
//! Contains structures and methods to interact with the Consul catalog API,
//! plus the "list service instances" call of the health API.
//!
//! See <https://developer.hashicorp.com/consul/api-docs/catalog>
//! for the full definition of the API.
|
|
|
|
use std::cmp;
|
|
use std::collections::HashMap;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::Result;
|
|
use futures::future::BoxFuture;
|
|
use futures::stream::futures_unordered::FuturesUnordered;
|
|
use futures::{FutureExt, StreamExt, TryFutureExt};
|
|
use log::*;
|
|
use serde::{Deserialize, Deserializer, Serialize};
|
|
use tokio::select;
|
|
use tokio::sync::watch;
|
|
|
|
use crate::{Consul, WithIndex};
|
|
|
|
/// Node summary, as specified in response to "list nodes" API calls in
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
|
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct Node {
|
|
pub node: String,
|
|
pub address: String,
|
|
#[serde(default, deserialize_with = "deserialize_null_default")]
|
|
pub meta: HashMap<String, String>,
|
|
}
|
|
|
|
/// One of the services returned in a CatalogNode
|
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct Service {
|
|
pub service: String,
|
|
pub address: String,
|
|
pub port: u16,
|
|
pub tags: Vec<String>,
|
|
#[serde(default, deserialize_with = "deserialize_null_default")]
|
|
pub meta: HashMap<String, String>,
|
|
}
|
|
|
|
/// Full node info, as specified in response to "retrieve map of services for a node" API call in
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
|
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct CatalogNode {
|
|
pub node: Node,
|
|
#[serde(default, deserialize_with = "deserialize_null_default")]
|
|
pub services: HashMap<String, Service>,
|
|
}
|
|
|
|
/// Concise service list, as specified in response to "list services" API call in
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-services>
///
/// Per that API documentation, keys are service names and each value is the list of
/// tags known for that service.
pub type ServiceList = HashMap<String, Vec<String>>;
|
|
|
|
/// Node serving a service, as specified in response to "list nodes for a service" API call in
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
|
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ServiceNode {
|
|
pub node: String,
|
|
pub address: String,
|
|
#[serde(default, deserialize_with = "deserialize_null_default")]
|
|
pub node_meta: HashMap<String, String>,
|
|
pub service_name: String,
|
|
pub service_tags: Vec<String>,
|
|
pub service_address: String,
|
|
pub service_port: u16,
|
|
#[serde(default, deserialize_with = "deserialize_null_default")]
|
|
pub service_meta: HashMap<String, String>,
|
|
}
|
|
|
|
/// Node serving a service with health info,
|
|
/// as specified in response to "list service instances for a service" health API call in
|
|
/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
|
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct HealthServiceNode {
|
|
pub node: Node,
|
|
pub service: Service,
|
|
pub checks: Vec<HealthCheck>,
|
|
}
|
|
|
|
/// A health check as returned in HealthServiceNode
|
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct HealthCheck {
|
|
pub node: String,
|
|
#[serde(rename = "CheckID")]
|
|
pub check_id: String,
|
|
pub name: String,
|
|
pub status: String,
|
|
pub output: String,
|
|
#[serde(rename = "Type")]
|
|
pub type_: String,
|
|
}
|
|
|
|
/// Map containing all services and their associated nodes, with health checks,
/// returned by `watch_all_service_health`
///
/// Keys are service names. The node lists are stored as `Arc<[_]>`, so cloning the map
/// does not copy the `HealthServiceNode` values themselves.
pub type AllServiceHealth = HashMap<String, Arc<[HealthServiceNode]>>;
|
|
|
|
impl Consul {
|
|
/// The "list nodes" API call of the Catalog API
|
|
///
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
|
|
pub async fn catalog_node_list(
|
|
&self,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Vec<Node>>> {
|
|
self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
|
|
.await
|
|
}
|
|
|
|
/// The "retrieve map of services for a node" API call of the Catalog API
|
|
///
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
|
|
pub async fn catalog_node(
|
|
&self,
|
|
host: &str,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Option<CatalogNode>>> {
|
|
self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
|
|
.await
|
|
}
|
|
|
|
/// The "list services" API call of the Catalog api
|
|
///
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-services>
|
|
pub async fn catalog_service_list(
|
|
&self,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<ServiceList>> {
|
|
self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
|
|
.await
|
|
}
|
|
|
|
/// The "list nodes for a service" API call of the Catalog api
|
|
///
|
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
|
|
pub async fn catalog_service_nodes(
|
|
&self,
|
|
service: &str,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Vec<ServiceNode>>> {
|
|
self.get_with_index(
|
|
format!("{}/v1/catalog/service/{}", self.url, service),
|
|
last_index,
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// The "list service instances for a service" API call of the Health api
|
|
///
|
|
/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
|
|
pub async fn health_service_instances(
|
|
&self,
|
|
service: &str,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Vec<HealthServiceNode>>> {
|
|
self.get_with_index(
|
|
format!("{}/v1/health/service/{}", self.url, service),
|
|
last_index,
|
|
)
|
|
.await
|
|
}
|
|
|
|
/// Launches a background task that watches all services and the nodes that serve them,
|
|
/// and make that info available in a tokio watch channel.
|
|
/// The worker terminates when the channel is dropped.
|
|
pub fn watch_all_service_health(
|
|
&self,
|
|
max_retry_interval: Duration,
|
|
) -> watch::Receiver<AllServiceHealth> {
|
|
let (tx, rx) = watch::channel(HashMap::new());
|
|
|
|
tokio::spawn(do_watch_all_service_health(
|
|
self.clone(),
|
|
tx,
|
|
max_retry_interval,
|
|
));
|
|
|
|
rx
|
|
}
|
|
}
|
|
|
|
/// Background worker behind `Consul::watch_all_service_health`.
///
/// Concurrently drives one "list services" query and one health query per known service.
/// Whenever the node list obtained for a service differs from the stored one, the stored map
/// is updated and a full copy of it is sent on `tx`.
///
/// Failed queries are retried after a delay computed by `retry_to_time` from the number of
/// consecutive errors and `max_retry_interval`.
///
/// The loop exits when sending on `tx` fails or when `tx.closed()` completes.
async fn do_watch_all_service_health(
    consul: Consul,
    tx: watch::Sender<AllServiceHealth>,
    max_retry_interval: Duration,
) {
    // Latest known state; entries are added or replaced below, never removed.
    let mut services = AllServiceHealth::new();
    // One in-flight health query per known service. Each future yields the service name and
    // either the query result or `(consecutive error count, error)`.
    let mut service_watchers =
        FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
    // The single in-flight "list services" query, with the same error convention.
    let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
        Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));

    loop {
        select! {
            list_res = &mut service_list => {
                match list_res {
                    Ok(list) => {
                        let list_index = list.index();
                        for service in list.into_inner().keys() {
                            // Only services not seen before get a watcher.
                            if !services.contains_key(service) {
                                // Placeholder entry; nothing is sent on `tx` at this point.
                                services.insert(service.to_string(), Arc::new([]));

                                // Owned copy of the name, moved into the future and handed back
                                // alongside the result.
                                let service = service.to_string();
                                service_watchers.push(Box::pin(async {
                                    let res = consul.health_service_instances(&service, None).await
                                        .map_err(|e| (1, e));
                                    (service, res)
                                }));
                            }
                        }
                        // Re-issue the listing, passing the index of the response just received.
                        // NOTE(review): presumably this turns it into a blocking query waiting
                        // for changes past that index -- confirm in `get_with_index`.
                        service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
                    }
                    Err((err_count, e)) => {
                        warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
                        // Shadow with a reference so that `async move` does not take ownership
                        // of the client.
                        let consul = &consul;
                        // Wait, then list again without an index; a new failure bumps the count.
                        service_list = Box::pin(async move {
                            tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
                            consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
                        });
                    }
                }
            }
            // `next()` yields `None` when there is no watcher; `some_or_pending` maps that to a
            // future that never resolves, so this branch simply does not fire in that case.
            (service, watch_res) = service_watchers.next().then(some_or_pending) => {
                match watch_res {
                    Ok(nodes) => {
                        let index = nodes.index();

                        let nodes = nodes.into_inner();
                        // Publish only when the node list actually differs from the stored one.
                        if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) {
                            services.insert(service.clone(), nodes.into());
                            // A send error ends the worker.
                            if tx.send(services.clone()).is_err() {
                                break;
                            }
                        }

                        // Queue the next query for this service, passing the index just received;
                        // the consecutive error count starts over at 1.
                        let consul = &consul;
                        service_watchers.push(Box::pin(async move {
                            let res = consul.health_service_instances(&service, Some(index)).await
                                .map_err(|e| (1, e));
                            (service, res)
                        }));
                    }
                    Err((err_count, e)) => {
                        warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
                        let consul = &consul;
                        // Wait, then query again without an index; a new failure bumps the count.
                        service_watchers.push(Box::pin(async move {
                            tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
                            let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
                            (service, res)
                        }));
                    }
                }
            }
            // Stop as soon as the sender reports the channel as closed.
            _ = tx.closed() => {
                break;
            }
        }
    }
}
|
|
|
|
/// Resolves to the contained value when `value` is `Some`; never resolves when it is `None`.
///
/// Used to turn an exhausted stream (`next()` yielding `None`) into a future that simply
/// stays pending, so that it can sit in a `select!` without ever winning.
async fn some_or_pending<T>(value: Option<T>) -> T {
    if let Some(inner) = value {
        return inner;
    }
    // Nothing to yield: park forever.
    std::future::pending().await
}
|
|
|
|
/// Computes how long to wait before the next attempt after `retries` consecutive failures.
///
/// The delay is `2 * 1.5^retries` seconds (2 s for `retries == 0`, 3 s for `retries == 1`,
/// and so on), capped at `max_time`.
///
/// The cap is applied on the floating-point value *before* it is converted to a `Duration`:
/// `Duration::from_secs_f64` panics on non-finite or out-of-range input, which the raw
/// exponential reaches after roughly a hundred consecutive failures.
fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
    let backoff_secs = 2.0f64 * 1.5f64.powf(retries as f64);

    // Past the cap (or overflowed to infinity): the answer is the cap, and converting the
    // raw value could panic.
    if !backoff_secs.is_finite() || backoff_secs >= max_time.as_secs_f64() {
        return max_time;
    }

    // `backoff_secs` is now finite and below the cap, so the conversion cannot overflow.
    // `min` is kept to guard against float rounding right at the boundary.
    cmp::min(max_time, Duration::from_secs_f64(backoff_secs))
}
|
|
|
|
fn deserialize_null_default<'de, D, T>(deserializer: D) -> Result<T, D::Error>
|
|
where
|
|
T: Default + Deserialize<'de>,
|
|
D: Deserializer<'de>,
|
|
{
|
|
Option::deserialize(deserializer).map(Option::unwrap_or_default)
|
|
}
|