Avoid sending useless changes to downstream listener
This commit is contained in:
parent
ff14118db7
commit
a11ed03031
3 changed files with 17 additions and 13 deletions
|
@ -2,7 +2,7 @@
|
||||||
name = "df-consul"
|
name = "df-consul"
|
||||||
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
|
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
|
||||||
authors = [ "Alex Auvolat <alex@adnab.me>" ]
|
authors = [ "Alex Auvolat <alex@adnab.me>" ]
|
||||||
version = "0.3.2"
|
version = "0.3.3"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
license = "MIT"
|
license = "MIT"
|
||||||
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
|
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
|
||||||
|
|
|
@ -22,6 +22,7 @@ async fn main() {
|
||||||
if watch.changed().await.is_err() {
|
if watch.changed().await.is_err() {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
println!("\n{:?}", watch.borrow_and_update());
|
//println!("\n{:?}", watch.borrow_and_update());
|
||||||
|
println!("changed, {} values", watch.borrow_and_update().len());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ use crate::{Consul, WithIndex};
|
||||||
|
|
||||||
/// Node summary, as specified in response to "list nodes" API calls in
|
/// Node summary, as specified in response to "list nodes" API calls in
|
||||||
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes>
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct Node {
|
pub struct Node {
|
||||||
pub node: String,
|
pub node: String,
|
||||||
|
@ -31,7 +31,7 @@ pub struct Node {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// One of the services returned in a CatalogNode
|
/// One of the services returned in a CatalogNode
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct Service {
|
pub struct Service {
|
||||||
pub service: String,
|
pub service: String,
|
||||||
|
@ -42,7 +42,7 @@ pub struct Service {
|
||||||
|
|
||||||
/// Full node info, as specified in response to "retrieve map of services for a node" API call in
|
/// Full node info, as specified in response to "retrieve map of services for a node" API call in
|
||||||
/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#retrieve-map-of-services-for-a-node>
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct CatalogNode {
|
pub struct CatalogNode {
|
||||||
pub node: Node,
|
pub node: Node,
|
||||||
|
@ -55,7 +55,7 @@ pub type ServiceList = HashMap<String, Vec<String>>;
|
||||||
|
|
||||||
/// Node serving a service, as specified in response to "list nodes for a service" API call in
|
/// Node serving a service, as specified in response to "list nodes for a service" API call in
|
||||||
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
|
/// <https://developer.hashicorp.com/consul/api-docs/catalog#list-nodes-for-service>
|
||||||
#[derive(Serialize, Deserialize, Debug)]
|
#[derive(Serialize, Deserialize, Debug, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct ServiceNode {
|
pub struct ServiceNode {
|
||||||
pub node: String,
|
pub node: String,
|
||||||
|
@ -70,7 +70,7 @@ pub struct ServiceNode {
|
||||||
/// Node serving a service with health info,
|
/// Node serving a service with health info,
|
||||||
/// as specified in response to "list service instances for a service" health API call in
|
/// as specified in response to "list service instances for a service" health API call in
|
||||||
/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
|
/// <https://developer.hashicorp.com/consul/api-docs/health#list-service-instances-for-service>
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct HealthServiceNode {
|
pub struct HealthServiceNode {
|
||||||
pub node: Node,
|
pub node: Node,
|
||||||
|
@ -79,7 +79,7 @@ pub struct HealthServiceNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A health check as returned in HealthServiceNode
|
/// A health check as returned in HealthServiceNode
|
||||||
#[derive(Serialize, Deserialize, Debug, Clone)]
|
#[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
|
||||||
#[serde(rename_all = "PascalCase")]
|
#[serde(rename_all = "PascalCase")]
|
||||||
pub struct HealthCheck {
|
pub struct HealthCheck {
|
||||||
pub node: String,
|
pub node: String,
|
||||||
|
@ -244,7 +244,14 @@ async fn do_watch_all_service_health(
|
||||||
match watch_res {
|
match watch_res {
|
||||||
Ok(nodes) => {
|
Ok(nodes) => {
|
||||||
let index = nodes.index();
|
let index = nodes.index();
|
||||||
services.insert(service.clone(), nodes.into_inner().into());
|
|
||||||
|
let nodes = nodes.into_inner();
|
||||||
|
if services.get(&service).as_ref().map(|n| &n[..]) != Some(&nodes[..]) {
|
||||||
|
services.insert(service.clone(), nodes.into());
|
||||||
|
if tx.send(services.clone()).is_err() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let consul = &consul;
|
let consul = &consul;
|
||||||
service_watchers.push(Box::pin(async move {
|
service_watchers.push(Box::pin(async move {
|
||||||
|
@ -252,10 +259,6 @@ async fn do_watch_all_service_health(
|
||||||
.map_err(|e| (1, e));
|
.map_err(|e| (1, e));
|
||||||
(service, res)
|
(service, res)
|
||||||
}));
|
}));
|
||||||
|
|
||||||
if tx.send(services.clone()).is_err() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
Err((err_count, e)) => {
|
Err((err_count, e)) => {
|
||||||
warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
|
warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
|
||||||
|
|
Loading…
Reference in a new issue