//! Consul catalog and health endpoints: response types, client methods, and a
//! background watcher that tracks the health of every service.
use std::collections::HashMap;
|
|
use std::fmt::Write;
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::Result;
|
|
use futures::future::BoxFuture;
|
|
use futures::stream::futures_unordered::FuturesUnordered;
|
|
use futures::{FutureExt, StreamExt};
|
|
use log::*;
|
|
use serde::{Deserialize, Serialize};
|
|
use tokio::select;
|
|
use tokio::sync::watch;
|
|
|
|
use crate::{Consul, WithIndex};
|
|
|
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ConsulNode {
|
|
pub node: String,
|
|
pub address: String,
|
|
pub meta: HashMap<String, String>,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ConsulService {
|
|
pub service: String,
|
|
pub address: String,
|
|
pub port: u16,
|
|
pub tags: Vec<String>,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Debug)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ConsulCatalogNode {
|
|
pub node: ConsulNode,
|
|
pub services: HashMap<String, ConsulService>,
|
|
}
|
|
|
|
/// Payload returned by [`Consul::catalog_service_list`].
///
/// The keys are used as service names by the health watcher in this file.
/// NOTE(review): the `Vec<String>` values are presumably each service's tags;
/// confirm against the Consul catalog API.
pub type ConsulServiceList = HashMap<String, Vec<String>>;
|
|
|
|
#[derive(Serialize, Deserialize, Debug)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ConsulServiceNode {
|
|
pub node: String,
|
|
pub address: String,
|
|
pub node_meta: HashMap<String, String>,
|
|
pub service_name: String,
|
|
pub service_tags: Vec<String>,
|
|
pub service_address: String,
|
|
pub service_port: u16,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ConsulHealthServiceNode {
|
|
pub node: ConsulNode,
|
|
pub service: ConsulService,
|
|
pub checks: Vec<ConsulHealthCheck>,
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Debug, Clone)]
|
|
#[serde(rename_all = "PascalCase")]
|
|
pub struct ConsulHealthCheck {
|
|
pub node: String,
|
|
#[serde(rename = "CheckID")]
|
|
pub check_id: String,
|
|
pub name: String,
|
|
pub status: String,
|
|
pub output: String,
|
|
#[serde(rename = "Type")]
|
|
pub type_: String,
|
|
}
|
|
|
|
/// Snapshot published by [`Consul::watch_all_service_health`]: for each
/// service name, the most recently fetched list of health entries.
///
/// The lists are held in `Arc<[_]>`, so cloning the map does not copy them.
pub type AllServiceHealth = HashMap<String, Arc<[ConsulHealthServiceNode]>>;
|
|
|
|
impl Consul {
|
|
pub async fn catalog_node_list(
|
|
&self,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Vec<ConsulNode>>> {
|
|
self.get_with_index(format!("{}/v1/catalog/nodes", self.url), last_index)
|
|
.await
|
|
}
|
|
|
|
pub async fn catalog_node(
|
|
&self,
|
|
host: &str,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Option<ConsulCatalogNode>>> {
|
|
self.get_with_index(format!("{}/v1/catalog/node/{}", self.url, host), last_index)
|
|
.await
|
|
}
|
|
|
|
pub async fn catalog_service_list(
|
|
&self,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<ConsulServiceList>> {
|
|
self.get_with_index::<ConsulServiceList>(
|
|
format!("{}/v1/catalog/services", self.url),
|
|
last_index,
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub async fn catalog_service_nodes(
|
|
&self,
|
|
service: &str,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Vec<ConsulServiceNode>>> {
|
|
self.get_with_index(
|
|
format!("{}/v1/catalog/service/{}", self.url, service),
|
|
last_index,
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub async fn health_service_instances(
|
|
&self,
|
|
service: &str,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<Vec<ConsulHealthServiceNode>>> {
|
|
self.get_with_index(
|
|
format!("{}/v1/health/service/{}", self.url, service),
|
|
last_index,
|
|
)
|
|
.await
|
|
}
|
|
|
|
pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> {
|
|
let (tx, rx) = watch::channel(HashMap::new());
|
|
|
|
tokio::spawn(do_watch_all_service_health(self.clone(), tx));
|
|
|
|
rx
|
|
}
|
|
|
|
async fn get_with_index<T: for<'de> Deserialize<'de>>(
|
|
&self,
|
|
mut url: String,
|
|
last_index: Option<usize>,
|
|
) -> Result<WithIndex<T>> {
|
|
if let Some(i) = last_index {
|
|
if url.contains('?') {
|
|
write!(&mut url, "&index={}", i).unwrap();
|
|
} else {
|
|
write!(&mut url, "?index={}", i).unwrap();
|
|
}
|
|
}
|
|
debug!("GET {} as {}", url, std::any::type_name::<T>());
|
|
|
|
let http = self.client.get(&url).send().await?;
|
|
|
|
Ok(WithIndex::<T>::index_from(&http)?.value(http.json().await?))
|
|
}
|
|
}
|
|
|
|
async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) {
|
|
let mut services = AllServiceHealth::new();
|
|
let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new();
|
|
let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None));
|
|
|
|
loop {
|
|
select! {
|
|
list_res = &mut service_list => {
|
|
match list_res {
|
|
Ok(list) => {
|
|
let list_index = list.index();
|
|
for service in list.into_inner().keys() {
|
|
if !services.contains_key(service) {
|
|
services.insert(service.to_string(), Arc::new([]));
|
|
|
|
let service = service.to_string();
|
|
service_watchers.push(Box::pin(async {
|
|
let res = consul.health_service_instances(&service, None).await;
|
|
(service, res)
|
|
}));
|
|
}
|
|
}
|
|
service_list = Box::pin(consul.catalog_service_list(Some(list_index)));
|
|
}
|
|
Err(e) => {
|
|
warn!("Error listing services: {}", e);
|
|
service_list = Box::pin(async {
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
consul.catalog_service_list(None).await
|
|
});
|
|
}
|
|
}
|
|
}
|
|
(service, watch_res) = service_watchers.next().then(some_or_pending) => {
|
|
match watch_res {
|
|
Ok(nodes) => {
|
|
let index = nodes.index();
|
|
services.insert(service.clone(), nodes.into_inner().into());
|
|
|
|
let consul = &consul;
|
|
service_watchers.push(Box::pin(async move {
|
|
let res = consul.health_service_instances(&service, Some(index)).await;
|
|
(service, res)
|
|
}));
|
|
|
|
if tx.send(services.clone()).is_err() {
|
|
break;
|
|
}
|
|
}
|
|
Err(e) => {
|
|
warn!("Error getting service {}: {}", service, e);
|
|
service_watchers.push(Box::pin(async {
|
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
|
let res = consul.health_service_instances(&service, None).await;
|
|
(service, res)
|
|
}));
|
|
}
|
|
}
|
|
}
|
|
_ = tx.closed() => {
|
|
break;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Resolves to the contained value when given `Some`; when given `None`, the
/// returned future never completes.
///
/// This lets a `select!` branch fed by an exhausted stream stay idle instead
/// of firing with `None`.
async fn some_or_pending<T>(value: Option<T>) -> T {
    if let Some(inner) = value {
        return inner;
    }
    // Nothing to hand back: wait forever.
    std::future::pending().await
}
|