Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
13 changed files with 274 additions and 49 deletions
Showing only changes of commit 95ffba343f - Show all commits

188
Cargo.lock generated
View file

@ -93,6 +93,15 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "autocfg"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"
dependencies = [
"autocfg 1.1.0",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.1.0" version = "1.1.0"
@ -543,7 +552,7 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c" checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [ dependencies = [
"autocfg", "autocfg 1.1.0",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"crossbeam-utils 0.8.8", "crossbeam-utils 0.8.8",
"lazy_static", "lazy_static",
@ -985,6 +994,7 @@ dependencies = [
"sha2", "sha2",
"static_init", "static_init",
"structopt", "structopt",
"timeago",
"tokio", "tokio",
"toml", "toml",
"tracing", "tracing",
@ -1631,7 +1641,7 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223" checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [ dependencies = [
"autocfg", "autocfg 1.1.0",
"hashbrown", "hashbrown",
] ]
@ -1653,6 +1663,16 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "isolang"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "265ef164908329e47e753c769b14cbb27434abf0c41984dca201484022f09ce5"
dependencies = [
"phf",
"phf_codegen",
]
[[package]] [[package]]
name = "itertools" name = "itertools"
version = "0.4.19" version = "0.4.19"
@ -1977,7 +1997,7 @@ version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce" checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [ dependencies = [
"autocfg", "autocfg 1.1.0",
] ]
[[package]] [[package]]
@ -2139,7 +2159,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db" checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [ dependencies = [
"autocfg", "autocfg 1.1.0",
"num-traits", "num-traits",
] ]
@ -2149,7 +2169,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [ dependencies = [
"autocfg", "autocfg 1.1.0",
] ]
[[package]] [[package]]
@ -2218,7 +2238,7 @@ version = "0.9.72"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb" checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
dependencies = [ dependencies = [
"autocfg", "autocfg 1.1.0",
"cc", "cc",
"libc", "libc",
"openssl-src", "openssl-src",
@ -2388,6 +2408,44 @@ dependencies = [
"indexmap", "indexmap",
] ]
[[package]]
name = "phf"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_codegen"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"
dependencies = [
"phf_generator",
"phf_shared",
]
[[package]]
name = "phf_generator"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"
dependencies = [
"phf_shared",
"rand 0.6.5",
]
[[package]]
name = "phf_shared"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"
dependencies = [
"siphasher",
]
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "0.4.29" version = "0.4.29"
@ -2642,6 +2700,25 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "rand"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
dependencies = [
"autocfg 0.1.8",
"libc",
"rand_chacha 0.1.1",
"rand_core 0.4.2",
"rand_hc",
"rand_isaac",
"rand_jitter",
"rand_os",
"rand_pcg",
"rand_xorshift",
"winapi",
]
[[package]] [[package]]
name = "rand" name = "rand"
version = "0.8.5" version = "0.8.5"
@ -2649,10 +2726,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [ dependencies = [
"libc", "libc",
"rand_chacha", "rand_chacha 0.3.1",
"rand_core 0.6.3", "rand_core 0.6.3",
] ]
[[package]]
name = "rand_chacha"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
dependencies = [
"autocfg 0.1.8",
"rand_core 0.3.1",
]
[[package]] [[package]]
name = "rand_chacha" name = "rand_chacha"
version = "0.3.1" version = "0.3.1"
@ -2687,6 +2774,77 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "rand_hc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_isaac"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_jitter"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"
dependencies = [
"libc",
"rand_core 0.4.2",
"winapi",
]
[[package]]
name = "rand_os"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
dependencies = [
"cloudabi",
"fuchsia-cprng",
"libc",
"rand_core 0.4.2",
"rdrand",
"winapi",
]
[[package]]
name = "rand_pcg"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"
dependencies = [
"autocfg 0.1.8",
"rand_core 0.4.2",
]
[[package]]
name = "rand_xorshift"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]] [[package]]
name = "redox_syscall" name = "redox_syscall"
version = "0.2.11" version = "0.2.11"
@ -3109,6 +3267,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "siphasher"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"
[[package]] [[package]]
name = "slab" name = "slab"
version = "0.4.5" version = "0.4.5"
@ -3357,6 +3521,16 @@ dependencies = [
"num_threads", "num_threads",
] ]
[[package]]
name = "timeago"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ec32dde57efb15c035ac074118d7f32820451395f28cb0524a01d4e94983b26"
dependencies = [
"chrono",
"isolang",
]
[[package]] [[package]]
name = "tinyvec" name = "tinyvec"
version = "1.5.1" version = "1.5.1"

View file

@ -1,4 +1,4 @@
.PHONY: doc all release shell .PHONY: doc all release shell run1 run2 run3
all: all:
clear; cargo build --all-features clear; cargo build --all-features
@ -11,3 +11,12 @@ release:
shell: shell:
nix-shell nix-shell
run1:
RUST_LOG=garage=debug ./target/debug/garage -c tmp/config.1.toml server
run2:
RUST_LOG=garage=debug ./target/debug/garage -c tmp/config.2.toml server
run3:
RUST_LOG=garage=debug ./target/debug/garage -c tmp/config.3.toml server

View file

@ -32,6 +32,7 @@ garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0" bytes = "1.0"
bytesize = "1.1" bytesize = "1.1"
timeago = "0.3"
hex = "0.4" hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] } tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4" pretty_env_logger = "0.4"

View file

@ -5,7 +5,6 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::background::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
@ -49,7 +48,10 @@ pub enum AdminRpc {
}, },
KeyList(Vec<(String, String)>), KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>), KeyInfo(Key, HashMap<Uuid, Bucket>),
WorkerList(HashMap<usize, garage_util::background::WorkerInfo>), WorkerList(
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
} }
impl Rpc for AdminRpc { impl Rpc for AdminRpc {
@ -830,19 +832,9 @@ impl AdminRpcHandler {
async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> { async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
match opt.cmd { match opt.cmd {
WorkerCmd::List { busy } => { WorkerCmd::List { opt } => {
let workers = self.garage.background.get_worker_info(); let workers = self.garage.background.get_worker_info();
let workers = if busy { Ok(AdminRpc::WorkerList(workers, opt))
workers
.into_iter()
.filter(|(_, w)| {
matches!(w.status, WorkerStatus::Busy | WorkerStatus::Throttled(_))
})
.collect()
} else {
workers
};
Ok(AdminRpc::WorkerList(workers))
} }
} }
} }

View file

@ -1,4 +1,5 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::time::Duration;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table; use garage_util::formater::format_table;
@ -101,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()]; vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) { for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) { if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
let tf = timeago::Formatter::new();
failed_nodes.push(format!( failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}", "{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id, id = adv.id,
@ -111,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(), capacity = cfg.capacity_string(),
last_seen = adv last_seen = adv
.last_seen_secs_ago .last_seen_secs_ago
.map(|s| format!("{}s ago", s)) .map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()), .unwrap_or_else(|| "never seen".into()),
)); ));
} }
@ -183,8 +185,8 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => { AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb); print_key_info(&key, &rb);
} }
AdminRpc::WorkerList(wi) => { AdminRpc::WorkerList(wi, wlo) => {
print_worker_info(wi); print_worker_info(wi, wlo);
} }
r => { r => {
error!("Unexpected response: {:?}", r); error!("Unexpected response: {:?}", r);

View file

@ -476,8 +476,17 @@ pub enum WorkerCmd {
/// List all workers on Garage node /// List all workers on Garage node
#[structopt(name = "list")] #[structopt(name = "list")]
List { List {
/// Show only busy workers #[structopt(flatten)]
#[structopt(short = "b", long = "busy")] opt: WorkerListOpt,
busy: bool,
}, },
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
pub busy: bool,
/// Show only workers with errors
#[structopt(short = "e", long = "errors")]
pub errors: bool,
}

View file

@ -1,15 +1,19 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::time::Duration;
use garage_util::background::*; use garage_util::background::*;
use garage_util::crdt::*; use garage_util::crdt::*;
use garage_util::data::Uuid; use garage_util::data::Uuid;
use garage_util::error::*; use garage_util::error::*;
use garage_util::formater::format_table; use garage_util::formater::format_table;
use garage_util::time::*;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
use crate::cli::structs::WorkerListOpt;
pub fn print_bucket_list(bl: Vec<Bucket>) { pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:"); println!("List of buckets:");
@ -237,7 +241,7 @@ pub fn find_matching_node(
} }
} }
pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) { pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>(); let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| { wi.sort_by_key(|(tid, info)| {
( (
@ -252,23 +256,39 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>) {
let mut table = vec![]; let mut table = vec![];
for (tid, info) in wi.iter() { for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.status, WorkerStatus::Busy | WorkerStatus::Throttled(_)) {
continue;
}
if wlo.errors && info.errors == 0 {
continue;
}
table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name)); table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name));
if let Some(i) = &info.info { if let Some(i) = &info.info {
table.push(format!("\t\t{}", i)); table.push(format!("\t\t {}", i));
} }
let tf = timeago::Formatter::new();
let (err_ago, err_msg) = info
.last_error
.as_ref()
.map(|(m, t)| {
(
tf.convert(Duration::from_millis(now_msec() - t)),
m.as_str(),
)
})
.unwrap_or(("(?) ago".into(), "(?)"));
if info.consecutive_errors > 0 { if info.consecutive_errors > 0 {
table.push(format!( table.push(format!(
"\t\t{} CONSECUTIVE ERRORS ({} total), last: {}", "\t\t {} consecutive errors ({} total), last {}",
info.consecutive_errors, info.consecutive_errors, info.errors, err_ago,
info.errors,
info.last_error.as_deref().unwrap_or("(?)")
)); ));
table.push(format!("\t\t {}", err_msg));
} else if info.errors > 0 { } else if info.errors > 0 {
table.push(format!( table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
"\t\t{} errors, last: {}", if wlo.errors {
info.errors, table.push(format!("\t\t {}", err_msg));
info.last_error.as_deref().unwrap_or("(?)") }
));
} }
} }
format_table(table); format_table(table);

View file

@ -408,7 +408,11 @@ impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
} }
fn info(&self) -> Option<String> { fn info(&self) -> Option<String> {
if !self.buf.is_empty() {
Some(format!("{} items in queue", self.buf.len())) Some(format!("{} items in queue", self.buf.len()))
} else {
None
}
} }
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> { async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {

View file

@ -357,7 +357,10 @@ where
} }
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
if *must_exit.borrow() {
return WorkerStatus::Done;
}
tokio::time::sleep(self.wait_delay).await; tokio::time::sleep(self.wait_delay).await;
WorkerStatus::Busy WorkerStatus::Busy
} }

View file

@ -329,7 +329,10 @@ where
self.0.updater_loop_iter() self.0.updater_loop_iter()
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
if *must_exit.borrow() {
return WorkerStatus::Done;
}
tokio::time::sleep(Duration::from_secs(10)).await; tokio::time::sleep(Duration::from_secs(10)).await;
WorkerStatus::Busy WorkerStatus::Busy
} }

View file

@ -595,7 +595,10 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
} }
} }
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus { async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
if *must_exit.borrow() {
return WorkerStatus::Done;
}
select! { select! {
s = self.add_full_sync_rx.recv() => { s = self.add_full_sync_rx.recv() => {
if let Some(()) = s { if let Some(()) = s {

View file

@ -33,7 +33,7 @@ pub struct WorkerInfo {
pub status: WorkerStatus, pub status: WorkerStatus,
pub errors: usize, pub errors: usize,
pub consecutive_errors: usize, pub consecutive_errors: usize,
pub last_error: Option<String>, pub last_error: Option<(String, u64)>,
} }
impl BackgroundRunner { impl BackgroundRunner {

View file

@ -13,6 +13,7 @@ use tracing::*;
use crate::background::WorkerInfo; use crate::background::WorkerInfo;
use crate::error::Error; use crate::error::Error;
use crate::time::now_msec;
#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)]
pub enum WorkerStatus { pub enum WorkerStatus {
@ -167,7 +168,7 @@ impl WorkerProcessor {
select! { select! {
_ = drain_everything => { _ = drain_everything => {
info!("All workers exited in time \\o/"); info!("All workers exited peacefully \\o/");
} }
_ = tokio::time::sleep(Duration::from_secs(9)) => { _ = tokio::time::sleep(Duration::from_secs(9)) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle"); error!("Some workers could not exit in time, we are cancelling some things in the middle");
@ -176,7 +177,6 @@ impl WorkerProcessor {
} }
} }
// TODO add tranquilizer
struct WorkerHandler { struct WorkerHandler {
task_id: usize, task_id: usize,
stop_signal: watch::Receiver<bool>, stop_signal: watch::Receiver<bool>,
@ -185,7 +185,7 @@ struct WorkerHandler {
status: WorkerStatus, status: WorkerStatus,
errors: usize, errors: usize,
consecutive_errors: usize, consecutive_errors: usize,
last_error: Option<String>, last_error: Option<(String, u64)>,
} }
impl WorkerHandler { impl WorkerHandler {
@ -205,7 +205,7 @@ impl WorkerHandler {
); );
self.errors += 1; self.errors += 1;
self.consecutive_errors += 1; self.consecutive_errors += 1;
self.last_error = Some(format!("{}", e)); self.last_error = Some((format!("{}", e), now_msec()));
// Sleep a bit so that error won't repeat immediately, exponential backoff // Sleep a bit so that error won't repeat immediately, exponential backoff
// strategy (min 1sec, max ~60sec) // strategy (min 1sec, max ~60sec)
self.status = WorkerStatus::Throttled( self.status = WorkerStatus::Throttled(
@ -215,7 +215,12 @@ impl WorkerHandler {
}, },
WorkerStatus::Throttled(delay) => { WorkerStatus::Throttled(delay) => {
// Sleep for given delay and go back to busy state // Sleep for given delay and go back to busy state
tokio::time::sleep(Duration::from_secs_f32(delay)).await; if !*self.stop_signal.borrow() {
select! {
_ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
_ = self.stop_signal.changed() => (),
}
}
self.status = WorkerStatus::Busy; self.status = WorkerStatus::Busy;
} }
WorkerStatus::Idle => { WorkerStatus::Idle => {