From ff14118db7ac838910f4ace03bc24f6dbafc62b3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 2 Feb 2023 16:10:56 +0100 Subject: [PATCH] Exponential backoff retry on catalog watcher --- Cargo.toml | 3 +- examples/test.rs | 13 +++----- examples/watch_test.rs | 27 +++++++++++++++ src/catalog.rs | 74 ++++++++++++++++++++++++++++-------------- src/lib.rs | 2 +- 5 files changed, 83 insertions(+), 36 deletions(-) create mode 100644 examples/watch_test.rs diff --git a/Cargo.toml b/Cargo.toml index fc616df..b844a1f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,7 @@ name = "df-consul" description = "Deuxfleurs' async Rust bindings for (a subset of) the Consul HTTP API" authors = [ "Alex Auvolat " ] -version = "0.3.1" +version = "0.3.2" edition = "2021" license = "MIT" repository = "https://git.deuxfleurs.fr/Deuxfleurs/df-consul" @@ -20,3 +20,4 @@ futures = "0.3.25" [dev-dependencies] tokio = { version = "1.23", features = ["rt", "rt-multi-thread", "macros"] } +pretty_env_logger = "0.4.0" diff --git a/examples/test.rs b/examples/test.rs index b3e6455..7e38c66 100644 --- a/examples/test.rs +++ b/examples/test.rs @@ -1,7 +1,11 @@ +use std::time::Duration; + use df_consul::*; #[tokio::main] async fn main() { + pretty_env_logger::init(); + let config = Config { addr: "http://localhost:8500".into(), ca_cert: None, @@ -41,13 +45,4 @@ async fn main() { .unwrap() ); } - - println!("== WATCHING EVERYTHING =="); - let mut watch = consul.watch_all_service_health(); - loop { - if watch.changed().await.is_err() { - break; - } - println!("\n{:?}", watch.borrow_and_update()); - } } diff --git a/examples/watch_test.rs b/examples/watch_test.rs new file mode 100644 index 0000000..d4d48a0 --- /dev/null +++ b/examples/watch_test.rs @@ -0,0 +1,27 @@ +use std::time::Duration; + +use df_consul::*; + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let config = Config { + addr: "http://localhost:8500".into(), + ca_cert: None, + tls_skip_verify: false, + client_cert: None, + client_key: None, + }; + + let consul = Consul::new(config, "").unwrap(); + + println!("== WATCHING EVERYTHING =="); + let mut watch = consul.watch_all_service_health(Duration::from_secs(30)); + loop { + if watch.changed().await.is_err() { + break; + } + println!("\n{:?}", watch.borrow_and_update()); + } +} diff --git a/src/catalog.rs b/src/catalog.rs index 0f0ecf5..cad7f5b 100644 --- a/src/catalog.rs +++ b/src/catalog.rs @@ -3,6 +3,7 @@ //! See //! for the full definition of the API. +use std::cmp; use std::collections::HashMap; use std::fmt::Write; use std::sync::Arc; @@ -11,7 +12,7 @@ use std::time::Duration; use anyhow::Result; use futures::future::BoxFuture; use futures::stream::futures_unordered::FuturesUnordered; -use futures::{FutureExt, StreamExt}; +use futures::{FutureExt, StreamExt, TryFutureExt}; use log::*; use serde::{Deserialize, Serialize}; use tokio::select; @@ -19,7 +20,7 @@ use tokio::sync::watch; use crate::{Consul, WithIndex}; -/// Node summary, as specified in response to "list nodes" API calls in +/// Node summary, as specified in response to "list nodes" API calls in /// #[derive(Serialize, Deserialize, Debug, Clone)] #[serde(rename_all = "PascalCase")] @@ -126,11 +127,8 @@ impl Consul { &self, last_index: Option, ) -> Result> { - self.get_with_index::( - format!("{}/v1/catalog/services", self.url), - last_index, - ) - .await + self.get_with_index::(format!("{}/v1/catalog/services", self.url), last_index) + .await } /// The "list nodes for a service" API call of the Catalog api @@ -166,10 +164,17 @@ impl Consul { /// Launches a background task that watches all services and the nodes that serve them, /// and make that info available in a tokio watch channel. /// The worker terminates when the channel is dropped. - pub fn watch_all_service_health(&self) -> watch::Receiver { + pub fn watch_all_service_health( + &self, + max_retry_interval: Duration, + ) -> watch::Receiver { let (tx, rx) = watch::channel(HashMap::new()); - tokio::spawn(do_watch_all_service_health(self.clone(), tx)); + tokio::spawn(do_watch_all_service_health( + self.clone(), + tx, + max_retry_interval, + )); rx } @@ -194,10 +199,16 @@ impl Consul { } } -async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender) { +async fn do_watch_all_service_health( + consul: Consul, + tx: watch::Sender, + max_retry_interval: Duration, +) { let mut services = AllServiceHealth::new(); - let mut service_watchers = FuturesUnordered::)>>::new(); - let mut service_list: BoxFuture> = Box::pin(consul.catalog_service_list(None)); + let mut service_watchers = + FuturesUnordered::)>>::new(); + let mut service_list: BoxFuture> = + Box::pin(consul.catalog_service_list(None).map_err(|e| (1, e))); loop { select! { @@ -211,18 +222,20 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender { - warn!("Error listing services: {}", e); - service_list = Box::pin(async { - tokio::time::sleep(Duration::from_secs(30)).await; - consul.catalog_service_list(None).await + Err((err_count, e)) => { + warn!("Error listing services: {} ({} consecutive errors)", e, err_count); + let consul = &consul; + service_list = Box::pin(async move { + tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; + consul.catalog_service_list(None).await.map_err(|e| (err_count + 1, e)) }); } } @@ -235,7 +248,8 @@ async fn do_watch_all_service_health(consul: Consul, tx: watch::Sender { - warn!("Error getting service {}: {}", service, e); - service_watchers.push(Box::pin(async { - tokio::time::sleep(Duration::from_secs(30)).await; - let res = consul.health_service_instances(&service, None).await; + Err((err_count, e)) => { + warn!("Error getting service {}: {} ({} consecutive errors)", service, e, err_count); + let consul = &consul; + service_watchers.push(Box::pin(async move { + tokio::time::sleep(retry_to_time(err_count, max_retry_interval)).await; + let res = consul.health_service_instances(&service, None).await.map_err(|e| (err_count + 1, e)); (service, res) })); } @@ -266,3 +281,12 @@ async fn some_or_pending(value: Option) -> T { None => futures::future::pending().await, } } + +fn retry_to_time(retries: usize, max_time: Duration) -> Duration { + // Exponential retry interval, starting at 2 seconds, maxing out at max_time, + // with exponential increase of *1.5 each time + cmp::min( + max_time, + Duration::from_secs_f64(2.0f64 * 1.5f64.powf(retries as f64)), + ) +} diff --git a/src/lib.rs b/src/lib.rs index c320936..583f106 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,6 +1,6 @@ pub mod catalog; -pub mod locking; mod kv; +pub mod locking; mod with_index; use std::fs::File;