diff --git a/src/block/manager.rs b/src/block/manager.rs index 1b5a5df0..19841d64 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream}; use garage_db as db; -use garage_util::background::BackgroundRunner; +use garage_util::background::{vars, BackgroundRunner}; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; +use garage_util::persister::PersisterShared; +use garage_util::time::msec_to_rfc3339; use garage_rpc::rpc_helper::OrderTag; use garage_rpc::system::System; @@ -89,6 +91,7 @@ pub struct BlockManager { pub(crate) metrics: BlockManagerMetrics, + pub scrub_persister: PersisterShared, tx_scrub_command: ArcSwapOption>, } @@ -128,6 +131,8 @@ impl BlockManager { let metrics = BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); + let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); + let block_manager = Arc::new(Self { replication, data_dir, @@ -138,6 +143,7 @@ impl BlockManager { system, endpoint, metrics, + scrub_persister, tx_scrub_command: ArcSwapOption::new(None), }); block_manager.endpoint.set_handler(block_manager.clone()); @@ -155,7 +161,28 @@ impl BlockManager { // Spawn scrub worker let (scrub_tx, scrub_rx) = mpsc::channel(1); self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); - bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); + bg.spawn_worker(ScrubWorker::new( + self.clone(), + scrub_rx, + self.scrub_persister.clone(), + )); + } + + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + self.resync.register_bg_vars(vars); + + vars.register_rw( + &self.scrub_persister, + "scrub-tranquility", + |p| p.get_with(|x| x.tranquility), + |p, tranquility| p.set_with(|x| x.tranquility = tranquility), + ); + vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| { + p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub)) + }); + vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| { + p.get_with(|x| x.corruptions_detected) + }); } /// Ask nodes that might have a (possibly compressed) block for it diff --git a/src/block/repair.rs b/src/block/repair.rs index a6ded65a..064cc005 100644 --- a/src/block/repair.rs +++ b/src/block/repair.rs @@ -13,7 +13,7 @@ use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -168,17 +168,25 @@ pub struct ScrubWorker { work: ScrubWorkerState, tranquilizer: Tranquilizer, - persister: Persister, - persisted: ScrubWorkerPersisted, + persister: PersisterShared, } #[derive(Serialize, Deserialize)] -struct ScrubWorkerPersisted { - tranquility: u32, - time_last_complete_scrub: u64, - corruptions_detected: u64, +pub struct ScrubWorkerPersisted { + pub tranquility: u32, + pub(crate) time_last_complete_scrub: u64, + pub(crate) corruptions_detected: u64, } impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} +impl Default for ScrubWorkerPersisted { + fn default() -> Self { + ScrubWorkerPersisted { + time_last_complete_scrub: 0, + tranquility: INITIAL_SCRUB_TRANQUILITY, + corruptions_detected: 0, + } + } +} enum ScrubWorkerState { Running(BlockStoreIterator), @@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand { Pause(Duration), Resume, Cancel, - SetTranquility(u32), } impl ScrubWorker { - pub fn new(manager: Arc, rx_cmd: mpsc::Receiver) -> Self { - let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ScrubWorkerPersisted { - time_last_complete_scrub: 0, - tranquility: INITIAL_SCRUB_TRANQUILITY, - corruptions_detected: 0, - }, - }; + pub(crate) fn new( + manager: Arc, + rx_cmd: mpsc::Receiver, + persister: PersisterShared, + ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, - persisted, } } @@ -267,12 +268,6 @@ impl ScrubWorker { } } } - ScrubWorkerCommand::SetTranquility(t) => { - self.persisted.tranquility = t; - if let Err(e) = self.persister.save_async(&self.persisted).await { - error!("Could not save new tranquilitiy value: {}", e); - } - } } } } @@ -284,9 +279,18 @@ impl Worker for ScrubWorker { } fn status(&self) -> WorkerStatus { + let (corruptions_detected, tranquility, time_last_complete_scrub) = + self.persister.get_with(|p| { + ( + p.corruptions_detected, + p.tranquility, + p.time_last_complete_scrub, + ) + }); + let mut s = WorkerStatus { - persistent_errors: Some(self.persisted.corruptions_detected), - tranquility: Some(self.persisted.tranquility), + persistent_errors: Some(corruptions_detected), + tranquility: Some(tranquility), ..Default::default() }; match &self.work { @@ -300,7 +304,7 @@ impl Worker for ScrubWorker { ScrubWorkerState::Finished => { s.freeform = vec![format!( "Last scrub completed at {}", - msec_to_rfc3339(self.persisted.time_last_complete_scrub) + msec_to_rfc3339(time_last_complete_scrub) )]; } } @@ -321,18 +325,17 @@ impl Worker for ScrubWorker { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); - self.persisted.corruptions_detected += 1; - self.persister.save_async(&self.persisted).await?; + self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer - .tranquilize_worker(self.persisted.tranquility)) + .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { - self.persisted.time_last_complete_scrub = now_msec(); - self.persister.save_async(&self.persisted).await?; + self.persister + .set_with(|p| p.time_last_complete_scrub = now_msec())?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); Ok(WorkerState::Idle) @@ -347,7 +350,8 @@ impl Worker for ScrubWorker { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( - self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, + self.persister.get_with(|p| p.time_last_complete_scrub) + + SCRUB_INTERVAL.as_millis() as u64, ScrubWorkerCommand::Start, ), }; diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c7b3b0e..ea280ad4 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -3,7 +3,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::Duration; -use arc_swap::ArcSwap; use async_trait::async_trait; use serde::{Deserialize, Serialize}; @@ -22,7 +21,7 @@ use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::metrics::RecordDuration; -use garage_util::persister::Persister; +use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; @@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2; pub struct BlockResyncManager { pub(crate) queue: CountedTree, - pub(crate) notify: Notify, + pub(crate) notify: Arc, pub(crate) errors: CountedTree, busy_set: BusySet, - persister: Persister, - persisted: ArcSwap, + persister: PersisterShared, } #[derive(Serialize, Deserialize, Clone, Copy)] @@ -64,6 +62,14 @@ struct ResyncPersistedConfig { tranquility: u32, } impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} +impl Default for ResyncPersistedConfig { + fn default() -> Self { + ResyncPersistedConfig { + n_workers: 1, + tranquility: INITIAL_RESYNC_TRANQUILITY, + } + } +} enum ResyncIterResult { BusyDidSomething, @@ -91,22 +97,14 @@ impl BlockResyncManager { .expect("Unable to open block_local_resync_errors tree"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); - let persister = Persister::new(&system.metadata_dir, "resync_cfg"); - let persisted = match persister.load() { - Ok(v) => v, - Err(_) => ResyncPersistedConfig { - n_workers: 1, - tranquility: INITIAL_RESYNC_TRANQUILITY, - }, - }; + let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg"); Self { queue, - notify: Notify::new(), + notify: Arc::new(Notify::new()), errors, busy_set: Arc::new(Mutex::new(HashSet::new())), persister, - persisted: ArcSwap::new(Arc::new(persisted)), } } @@ -142,6 +140,38 @@ impl BlockResyncManager { ))) } + pub fn register_bg_vars(&self, vars: &mut vars::BgVars) { + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-worker-count", + |p| p.get_with(|x| x.n_workers), + move |p, n_workers| { + if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { + return Err(Error::Message(format!( + "Invalid number of resync workers, must be between 1 and {}", + MAX_RESYNC_WORKERS + ))); + } + p.set_with(|x| x.n_workers = n_workers)?; + notify.notify_waiters(); + Ok(()) + }, + ); + + let notify = self.notify.clone(); + vars.register_rw( + &self.persister, + "resync-tranquility", + |p| p.get_with(|x| x.tranquility), + move |p, tranquility| { + p.set_with(|x| x.tranquility = tranquility)?; + notify.notify_waiters(); + Ok(()) + }, + ); + } + // ---- Resync loop ---- // This part manages a queue of blocks that need to be @@ -436,33 +466,6 @@ impl BlockResyncManager { Ok(()) } - - async fn update_persisted( - &self, - update: impl Fn(&mut ResyncPersistedConfig), - ) -> Result<(), Error> { - let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref(); - update(&mut cfg); - self.persister.save_async(&cfg).await?; - self.persisted.store(Arc::new(cfg)); - self.notify.notify_waiters(); - Ok(()) - } - - pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> { - if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) { - return Err(Error::Message(format!( - "Invalid number of resync workers, must be between 1 and {}", - MAX_RESYNC_WORKERS - ))); - } - self.update_persisted(|cfg| cfg.n_workers = n_workers).await - } - - pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { - self.update_persisted(|cfg| cfg.tranquility = tranquility) - .await - } } impl Drop for BusyBlock { @@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker { manager: Arc, tranquilizer: Tranquilizer, next_delay: Duration, + persister: PersisterShared, } impl ResyncWorker { pub(crate) fn new(index: usize, manager: Arc) -> Self { + let persister = manager.resync.persister.clone(); Self { index, manager, tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), + persister, } } } @@ -497,9 +503,9 @@ impl Worker for ResyncWorker { } fn status(&self) -> WorkerStatus { - let persisted = self.manager.resync.persisted.load(); + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); - if self.index >= persisted.n_workers { + if self.index >= n_workers { return WorkerStatus { freeform: vec!["This worker is currently disabled".into()], ..Default::default() @@ -508,22 +514,24 @@ impl Worker for ResyncWorker { WorkerStatus { queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), - tranquility: Some(persisted.tranquility), + tranquility: Some(tranquility), persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), ..Default::default() } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { - if self.index >= self.manager.resync.persisted.load().n_workers { + let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility)); + + if self.index >= n_workers { return Ok(WorkerState::Idle); } self.tranquilizer.reset(); match self.manager.resync.resync_iter(&self.manager).await { - Ok(ResyncIterResult::BusyDidSomething) => Ok(self - .tranquilizer - .tranquilize_worker(self.manager.resync.persisted.load().tranquility)), + Ok(ResyncIterResult::BusyDidSomething) => { + Ok(self.tranquilizer.tranquilize_worker(tranquility)) + } Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::IdleFor(delay)) => { self.next_delay = delay; @@ -542,7 +550,7 @@ impl Worker for ResyncWorker { } async fn wait_for_work(&mut self) -> WorkerState { - while self.index >= self.manager.resync.persisted.load().n_workers { + while self.index >= self.persister.get_with(|x| x.n_workers) { self.manager.resync.notify.notified().await } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index c669b5e6..305c5c65 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -18,7 +18,6 @@ use garage_table::*; use garage_rpc::*; use garage_block::manager::BlockResyncErrorInfo; -use garage_block::repair::ScrubWorkerCommand; use garage_model::bucket_alias_table::*; use garage_model::bucket_table::*; @@ -60,6 +59,7 @@ pub enum AdminRpc { HashMap, WorkerListOpt, ), + WorkerVars(Vec<(Uuid, String, String)>), WorkerInfo(usize, garage_util::background::WorkerInfo), BlockErrorList(Vec), BlockInfo { @@ -943,32 +943,101 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Set { opt } => match opt { - WorkerSetCmd::ScrubTranquility { tranquility } => { - let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); - self.garage - .block_manager - .send_scrub_command(scrub_command) - .await?; - Ok(AdminRpc::Ok("Scrub tranquility updated".into())) + WorkerOperation::Get { + all_nodes, + variable, + } => self.handle_get_var(*all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.handle_set_var(*all_nodes, variable, value).await, + } + } + + async fn handle_get_var( + &self, + all_nodes: bool, + variable: &Option, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Get { + all_nodes: false, + variable: variable.clone(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } - WorkerSetCmd::ResyncWorkerCount { worker_count } => { - self.garage - .block_manager - .resync - .set_n_workers(*worker_count) - .await?; - Ok(AdminRpc::Ok("Number of resync workers updated".into())) + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + #[allow(clippy::collapsible_else_if)] + if let Some(v) = variable { + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + v.clone(), + self.garage.bg_vars.get(v)?, + )])) + } else { + let mut vars = self.garage.bg_vars.get_all(); + vars.sort(); + Ok(AdminRpc::WorkerVars( + vars.into_iter() + .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) + .collect(), + )) + } + } + } + + async fn handle_set_var( + &self, + all_nodes: bool, + variable: &str, + value: &str, + ) -> Result { + if all_nodes { + let mut ret = vec![]; + let ring = self.garage.system.ring.borrow().clone(); + for node in ring.layout.node_ids().iter() { + let node = (*node).into(); + match self + .endpoint + .call( + &node, + AdminRpc::Worker(WorkerOperation::Set { + all_nodes: false, + variable: variable.to_string(), + value: value.to_string(), + }), + PRIO_NORMAL, + ) + .await?? + { + AdminRpc::WorkerVars(v) => ret.extend(v), + m => return Err(GarageError::unexpected_rpc_message(m).into()), } - WorkerSetCmd::ResyncTranquility { tranquility } => { - self.garage - .block_manager - .resync - .set_tranquility(*tranquility) - .await?; - Ok(AdminRpc::Ok("Resync tranquility updated".into())) - } - }, + } + Ok(AdminRpc::WorkerVars(ret)) + } else { + self.garage.bg_vars.set(variable, value)?; + Ok(AdminRpc::WorkerVars(vec![( + self.garage.system.id, + variable.to_string(), + value.to_string(), + )])) } } diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index 6c5598b1..46e9113c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -191,6 +191,9 @@ pub async fn cmd_admin( AdminRpc::WorkerList(wi, wlo) => { print_worker_list(wi, wlo); } + AdminRpc::WorkerVars(wv) => { + print_worker_vars(wv); + } AdminRpc::WorkerInfo(tid, wi) => { print_worker_info(tid, wi); } diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index e2f632f3..661a71f0 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -517,11 +517,25 @@ pub enum WorkerOperation { /// Get detailed information about a worker #[structopt(name = "info", version = garage_version())] Info { tid: usize }, + /// Get worker parameter + #[structopt(name = "get", version = garage_version())] + Get { + /// Gather variable values from all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable name to get, or none to get all variables + variable: Option, + }, /// Set worker parameter #[structopt(name = "set", version = garage_version())] Set { - #[structopt(subcommand)] - opt: WorkerSetCmd, + /// Set variable values on all nodes + #[structopt(short = "a", long = "all-nodes")] + all_nodes: bool, + /// Variable node to set + variable: String, + /// Value to set the variable to + value: String, }, } @@ -535,19 +549,6 @@ pub struct WorkerListOpt { pub errors: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] -pub enum WorkerSetCmd { - /// Set tranquility of scrub operations - #[structopt(name = "scrub-tranquility", version = garage_version())] - ScrubTranquility { tranquility: u32 }, - /// Set number of concurrent block resync workers - #[structopt(name = "resync-worker-count", version = garage_version())] - ResyncWorkerCount { worker_count: usize }, - /// Set tranquility of block resync operations - #[structopt(name = "resync-tranquility", version = garage_version())] - ResyncTranquility { tranquility: u32 }, -} - #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 63fd9eba..230ce3de 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) { format_table(table); } +pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { + let table = wv + .into_iter() + .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) + .collect::>(); + format_table(table); +} + pub fn print_block_error_list(el: Vec) { let now = now_msec(); let tf = timeago::Formatter::new(); diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 627e3bf3..0e14ed51 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -51,7 +51,11 @@ pub async fn launch_online_repair( ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::SetTranquility { tranquility } => { - ScrubWorkerCommand::SetTranquility(tranquility) + garage + .block_manager + .scrub_persister + .set_with(|x| x.tranquility = tranquility)?; + return Ok(()); } }; info!("Sending command to scrub worker: {:?}", cmd); diff --git a/src/model/garage.rs b/src/model/garage.rs index 5bea6b4f..ac1846ce 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -33,6 +33,8 @@ use crate::k2v::{item_table::*, poll::*, rpc::*}; pub struct Garage { /// The parsed configuration Garage is running pub config: Config, + /// The set of background variables that can be viewed/modified at runtime + pub bg_vars: vars::BgVars, /// The replication mode of this cluster pub replication_mode: ReplicationMode, @@ -249,9 +251,14 @@ impl Garage { #[cfg(feature = "k2v")] let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); + // Initialize bg vars + let mut bg_vars = vars::BgVars::new(); + block_manager.register_bg_vars(&mut bg_vars); + // -- done -- Ok(Arc::new(Self { config, + bg_vars, replication_mode, db, system, diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 41b48e93..607cd7a3 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -1,5 +1,6 @@ //! Job runner for futures and async functions +pub mod vars; pub mod worker; use std::collections::HashMap; diff --git a/src/util/background/vars.rs b/src/util/background/vars.rs new file mode 100644 index 00000000..7a449c95 --- /dev/null +++ b/src/util/background/vars.rs @@ -0,0 +1,113 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use crate::error::{Error, OkOrMessage}; +use crate::migrate::Migrate; +use crate::persister::PersisterShared; + +pub struct BgVars { + vars: HashMap<&'static str, Box>, +} + +impl BgVars { + pub fn new() -> Self { + Self { + vars: HashMap::new(), + } + } + + pub fn register_rw( + &mut self, + p: &PersisterShared, + name: &'static str, + get_fn: GF, + set_fn: SF, + ) where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared) -> T + Send + Sync + 'static, + SF: Fn(&PersisterShared, T) -> Result<(), Error> + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let p2 = p.clone(); + let set_fn = move |v| set_fn(&p2, v); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn register_ro(&mut self, p: &PersisterShared, name: &'static str, get_fn: GF) + where + V: Migrate + Default + Send + Sync, + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn(&PersisterShared) -> T + Send + Sync + 'static, + { + let p1 = p.clone(); + let get_fn = move || get_fn(&p1); + + let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name))); + + self.vars.insert(name, Box::new(BgVar { get_fn, set_fn })); + } + + pub fn get(&self, var: &str) -> Result { + Ok(self + .vars + .get(var) + .ok_or_message("variable does not exist")? + .get()) + } + + pub fn get_all(&self) -> Vec<(&'static str, String)> { + self.vars.iter().map(|(k, v)| (*k, v.get())).collect() + } + + pub fn set(&self, var: &str, val: &str) -> Result<(), Error> { + self.vars + .get(var) + .ok_or_message("variable does not exist")? + .set(val) + } +} + +impl Default for BgVars { + fn default() -> Self { + Self::new() + } +} + +// ---- + +trait BgVarTrait: Send + Sync + 'static { + fn get(&self) -> String; + fn set(&self, v: &str) -> Result<(), Error>; +} + +struct BgVar +where + T: FromStr + ToString + Send + Sync + 'static, + GF: Fn() -> T + Send + Sync + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + get_fn: GF, + set_fn: SF, +} + +impl BgVarTrait for BgVar +where + T: FromStr + ToString + Sync + Send + 'static, + GF: Fn() -> T + Sync + Send + 'static, + SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static, +{ + fn get(&self) -> String { + (self.get_fn)().to_string() + } + + fn set(&self, vstr: &str) -> Result<(), Error> { + let value = vstr + .parse() + .map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?; + (self.set_fn)(value) + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index be82061f..d35ca72f 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -15,6 +15,5 @@ pub mod metrics; pub mod migrate; pub mod persister; pub mod time; -pub mod token_bucket; pub mod tranquilizer; pub mod version; diff --git a/src/util/persister.rs b/src/util/persister.rs index 4b9adf51..5c66bbed 100644 --- a/src/util/persister.rs +++ b/src/util/persister.rs @@ -1,5 +1,6 @@ use std::io::{Read, Write}; use std::path::{Path, PathBuf}; +use std::sync::{Arc, RwLock}; use tokio::io::{AsyncReadExt, AsyncWriteExt}; @@ -84,3 +85,36 @@ impl Persister { Ok(()) } } + +pub struct PersisterShared(Arc<(Persister, RwLock)>); + +impl Clone for PersisterShared { + fn clone(&self) -> PersisterShared { + PersisterShared(self.0.clone()) + } +} + +impl PersisterShared { + pub fn new(base_dir: &Path, file_name: &str) -> Self { + let persister = Persister::new(base_dir, file_name); + let value = persister.load().unwrap_or_default(); + Self(Arc::new((persister, RwLock::new(value)))) + } + + pub fn get_with(&self, f: F) -> R + where + F: FnOnce(&V) -> R, + { + let value = self.0 .1.read().unwrap(); + f(&value) + } + + pub fn set_with(&self, f: F) -> Result<(), Error> + where + F: FnOnce(&mut V), + { + let mut value = self.0 .1.write().unwrap(); + f(&mut value); + self.0 .0.save(&value) + } +} diff --git a/src/util/token_bucket.rs b/src/util/token_bucket.rs deleted file mode 100644 index cc0dfa1f..00000000 --- a/src/util/token_bucket.rs +++ /dev/null @@ -1,40 +0,0 @@ -use std::time::{Duration, Instant}; - -use tokio::time::sleep; - -pub struct TokenBucket { - // Replenish rate: number of tokens per second - replenish_rate: u64, - // Current number of tokens - tokens: u64, - // Last replenish time - last_replenish: Instant, -} - -impl TokenBucket { - pub fn new(replenish_rate: u64) -> Self { - Self { - replenish_rate, - tokens: 0, - last_replenish: Instant::now(), - } - } - - pub async fn take(&mut self, tokens: u64) { - while self.tokens < tokens { - let needed = tokens - self.tokens; - let delay = (needed as f64) / (self.replenish_rate as f64); - sleep(Duration::from_secs_f64(delay)).await; - self.replenish(); - } - self.tokens -= tokens; - } - - pub fn replenish(&mut self) { - let now = Instant::now(); - let new_tokens = - ((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64; - self.tokens += new_tokens; - self.last_replenish = now; - } -}