Exponential backoff retry on catalog watcher

This commit is contained in:
Alex 2023-02-02 16:10:56 +01:00
parent d0f40c02b9
commit ff14118db7
5 changed files with 83 additions and 36 deletions

View file

@ -2,7 +2,7 @@
name = "df-consul" name = "df-consul"
description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API" description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API"
authors = [ "Alex Auvolat <alex@adnab.me>" ] authors = [ "Alex Auvolat <alex@adnab.me>" ]
version = "0.3.1" version = "0.3.2"
edition = "2021" edition = "2021"
license = "MIT" license = "MIT"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul" repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul"
@ -20,3 +20,4 @@ futures = "0.3.25"
[dev-dependencies] [dev-dependencies]
tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] } tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] }
pretty_env_logger = "0.4.0"

View file

@ -1,7 +1,11 @@
use std::time::Duration;
use df_consul::*; use df_consul::*;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
pretty_env_logger::init();
let config = Config { let config = Config {
addr: "http://localhost:8500".into(), addr: "http://localhost:8500".into(),
ca_cert: None, ca_cert: None,
@ -41,13 +45,4 @@ async fn main() {
.unwrap() .unwrap()
); );
} }
println!("== WATCHING EVERYTHING ==");
let mut watch = consul.watch_all_service_health();
loop {
if watch.changed().await.is_err() {
break;
}
println!("\n{:?}", watch.borrow_and_update());
}
} }

27
examples/watch_test.rs Normal file
View file

@ -0,0 +1,27 @@
use std::time::Duration;
use df_consul::*;
#[tokio::main]
async fn main() {
pretty_env_logger::init();
let config = Config {
addr: "http://localhost:8500".into(),
ca_cert: None,
tls_skip_verify: false,
client_cert: None,
client_key: None,
};
let consul = Consul::new(config, "").unwrap();
println!("== WATCHING EVERYTHING ==");
let mut watch = consul.watch_all_service_health(Duration::from_secs(30));
loop {
if watch.changed().await.is_err() {
break;
}
println!("\n{:?}", watch.borrow_and_update());
}
}

View file

@ -3,6 +3,7 @@
//! See <https://developer.hashicorp.com/consul/api-docs/catalog> //! See <https://developer.hashicorp.com/consul/api-docs/catalog>
//! for the full definition of the API. //! for the full definition of the API.
use std::cmp;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write; use std::fmt::Write;
use std::sync::Arc; use std::sync::Arc;
@ -11,7 +12,7 @@ use std::time::Duration;
use anyhow::Result; use anyhow::Result;
use futures::future::BoxFuture; use futures::future::BoxFuture;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::{FutureExt, StreamExt}; use futures::{FutureExt, StreamExt, TryFutureExt};
use log::*; use log::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::select; use tokio::select;
@ -126,10 +127,7 @@ impl Consul {
&self, &self,
last_index: Option<usize>, last_index: Option<usize>,
) -> Result<WithIndex<ServiceList>> { ) -> Result<WithIndex<ServiceList>> {
self.get_with_index::<ServiceList>( self.get_with_index::<ServiceList>(format!("{}/v1/catalog/services", self.url), last_index)
format!("{}/v1/catalog/services", self.url),
last_index,
)
.await .await
} }
@ -166,10 +164,17 @@ impl Consul {
/// Launches a background task that watches all services and the nodes that serve them, /// Launches a background task that watches all services and the nodes that serve them,
/// and make that info available in a tokio watch channel. /// and make that info available in a tokio watch channel.
/// The worker terminates when the channel is dropped. /// The worker terminates when the channel is dropped.
pub fn watch_all_service_health(&self) -> watch::Receiver<AllServiceHealth> { pub fn watch_all_service_health(
&self,
max_retry_interval: Duration,
) -> watch::Receiver<AllServiceHealth> {
let (tx, rx) = watch::channel(HashMap::new()); let (tx, rx) = watch::channel(HashMap::new());
tokio::spawn(do_watch_all_service_health(self.clone(), tx)); tokio::spawn(do_watch_all_service_health(
self.clone(),
tx,
max_retry_interval,
));
rx rx
} }
@ -194,10 +199,16 @@ impl Consul {
} }
} }
async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServiceHealth>) { async fn do_watch_all_service_health(
consul: Consul,
tx: watch::Sender<AllServiceHealth>,
max_retry_interval: Duration,
) {
let mut services = AllServiceHealth::new(); let mut services = AllServiceHealth::new();
let mut service_watchers = FuturesUnordered::<BoxFuture<(String, Result<_>)>>::new(); let mut service_watchers =
let mut service_list: BoxFuture<Result<_>> = Box::pin(consul.catalog_service_list(None)); FuturesUnordered::<BoxFuture<(String, std::result::Result<_, (usize, _)>)>>::new();
let mut service_list: BoxFuture<std::result::Result<_, (usize, _)>> =
Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e)));
loop { loop {
select! { select! {
@ -211,18 +222,20 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
let service = service.to_string(); let service = service.to_string();
service_watchers.push(Box::pin(async { service_watchers.push(Box::pin(async {
let res = consul.health_service_instances(&service, None).await; let res = consul.health_service_instances(&service, None).await
.map_err(|e| (1, e));
(service, res) (service, res)
})); }));
} }
} }
service_list = Box::pin(consul.catalog_service_list(Some(list_index))); service_list = Box::pin(consul.catalog_service_list(Some(list_index)).map_err(|e| (1, e)));
} }
Err(e) => { Err((err_count, e)) => {
warn!("Error listing services: {}", e); warn!("Error listing services: {} ({} consecutive errors)", e, err_count);
service_list = Box::pin(async { let consul = &consul;
tokio::time::sleep(Duration::from_secs(30)).await; service_list = Box::pin(async move {
consul.catalog_service_list(None).await tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e))
}); });
} }
} }
@ -235,7 +248,8 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
let consul = &consul; let consul = &consul;
service_watchers.push(Box::pin(async move { service_watchers.push(Box::pin(async move {
let res = consul.health_service_instances(&service, Some(index)).await; let res = consul.health_service_instances(&service, Some(index)).await
.map_err(|e| (1, e));
(service, res) (service, res)
})); }));
@ -243,11 +257,12 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender<AllServic
break; break;
} }
} }
Err(e) => { Err((err_count, e)) => {
warn!("Error getting service {}: {}", service, e); warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count);
service_watchers.push(Box::pin(async { let consul = &consul;
tokio::time::sleep(Duration::from_secs(30)).await; service_watchers.push(Box::pin(async move {
let res = consul.health_service_instances(&service, None).await; tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await;
let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e));
(service, res) (service, res)
})); }));
} }
@ -266,3 +281,12 @@ async fn some_or_pending<T>(value: Option<T>) -> T {
None => futures::future::pending().await, None => futures::future::pending().await,
} }
} }
fn retry_to_time(retries: usize, max_time: Duration) -> Duration {
// Exponential retry interval, starting at 2 seconds, maxing out at max_time,
// with exponential increase of *1.5 each time
cmp::min(
max_time,
Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)),
)
}

View file

@ -1,6 +1,6 @@
pub mod catalog; pub mod catalog;
pub mod locking;
mod kv; mod kv;
pub mod locking;
mod with_index; mod with_index;
use std::fs::File; use std::fs::File;