Merge pull request 'Improve garage worker set and add garage worker get' (#464) from worker-get into main
All checks were successful: continuous-integration/drone/push build is passing

Reviewed-on: #464
Commit 329c0e64f9 by Alex, 2023-01-04 13:47:42 +00:00
14 changed files with 406 additions and 168 deletions


@@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db;
use garage_util::background::BackgroundRunner;
use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
use garage_util::persister::PersisterShared;
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System;
@@ -89,6 +91,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
@@ -128,6 +131,8 @@ impl BlockManager {
let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self {
replication,
data_dir,
@@ -138,6 +143,7 @@ impl BlockManager {
system,
endpoint,
metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
@@ -155,7 +161,28 @@ impl BlockManager {
// Spawn scrub worker
let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
bg.spawn_worker(ScrubWorker::new(
self.clone(),
scrub_rx,
self.scrub_persister.clone(),
));
}
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
self.resync.register_bg_vars(vars);
vars.register_rw(
&self.scrub_persister,
"scrub-tranquility",
|p| p.get_with(|x| x.tranquility),
|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
);
vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
});
vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
p.get_with(|x| x.corruptions_detected)
});
}
/// Ask nodes that might have a (possibly compressed) block for it
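
The registrations above expose the scrub settings through the generic variable registry introduced later in this diff (src/util/background/vars.rs), under the names scrub-tranquility (read-write), scrub-last-completed and scrub-corruptions_detected (read-only). A minimal sketch of how they can then be read and written by name; the helper function and the bg_vars handle are hypothetical stand-ins for the registry held by the Garage struct:

use garage_util::background::vars::BgVars;
use garage_util::error::Error;

// Illustration only: `bg_vars` is assumed to be a registry on which
// BlockManager::register_bg_vars has already been called.
fn inspect_scrub_vars(bg_vars: &BgVars) -> Result<(), Error> {
    // Read-write variable: the string is parsed as u32 and persisted on set.
    println!("scrub-tranquility = {}", bg_vars.get("scrub-tranquility")?);
    bg_vars.set("scrub-tranquility", "4")?;

    // Read-only variables can be read, but reject writes.
    println!("last scrub: {}", bg_vars.get("scrub-last-completed")?);
    assert!(bg_vars.set("scrub-corruptions_detected", "0").is_err());
    Ok(())
}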


@@ -13,7 +13,7 @@ use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -168,17 +168,25 @@ pub struct ScrubWorker {
work: ScrubWorkerState,
tranquilizer: Tranquilizer,
persister: Persister<ScrubWorkerPersisted>,
persisted: ScrubWorkerPersisted,
persister: PersisterShared<ScrubWorkerPersisted>,
}
#[derive(Serialize, Deserialize)]
struct ScrubWorkerPersisted {
tranquility: u32,
time_last_complete_scrub: u64,
corruptions_detected: u64,
pub struct ScrubWorkerPersisted {
pub tranquility: u32,
pub(crate) time_last_complete_scrub: u64,
pub(crate) corruptions_detected: u64,
}
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
impl Default for ScrubWorkerPersisted {
fn default() -> Self {
ScrubWorkerPersisted {
time_last_complete_scrub: 0,
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
}
}
}
enum ScrubWorkerState {
Running(BlockStoreIterator),
@@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration),
Resume,
Cancel,
SetTranquility(u32),
}
impl ScrubWorker {
pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
let persisted = match persister.load() {
Ok(v) => v,
Err(_) => ScrubWorkerPersisted {
time_last_complete_scrub: 0,
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
},
};
pub(crate) fn new(
manager: Arc<BlockManager>,
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
persister: PersisterShared<ScrubWorkerPersisted>,
) -> Self {
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
persister,
persisted,
}
}
@@ -267,12 +268,6 @@ impl ScrubWorker {
}
}
}
ScrubWorkerCommand::SetTranquility(t) => {
self.persisted.tranquility = t;
if let Err(e) = self.persister.save_async(&self.persisted).await {
error!("Could not save new tranquilitiy value: {}", e);
}
}
}
}
}
@@ -284,9 +279,18 @@ impl Worker for ScrubWorker {
}
fn status(&self) -> WorkerStatus {
let (corruptions_detected, tranquility, time_last_complete_scrub) =
self.persister.get_with(|p| {
(
p.corruptions_detected,
p.tranquility,
p.time_last_complete_scrub,
)
});
let mut s = WorkerStatus {
persistent_errors: Some(self.persisted.corruptions_detected),
tranquility: Some(self.persisted.tranquility),
persistent_errors: Some(corruptions_detected),
tranquility: Some(tranquility),
..Default::default()
};
match &self.work {
@@ -300,7 +304,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Finished => {
s.freeform = vec![format!(
"Last scrub completed at {}",
msec_to_rfc3339(self.persisted.time_last_complete_scrub)
msec_to_rfc3339(time_last_complete_scrub)
)];
}
}
@@ -321,18 +325,17 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash);
self.persisted.corruptions_detected += 1;
self.persister.save_async(&self.persisted).await?;
self.persister.set_with(|p| p.corruptions_detected += 1)?;
}
Err(e) => return Err(e),
_ => (),
};
Ok(self
.tranquilizer
.tranquilize_worker(self.persisted.tranquility))
.tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else {
self.persisted.time_last_complete_scrub = now_msec();
self.persister.save_async(&self.persisted).await?;
self.persister
.set_with(|p| p.time_last_complete_scrub = now_msec())?;
self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear();
Ok(WorkerState::Idle)
@@ -347,7 +350,8 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => (
self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
self.persister.get_with(|p| p.time_last_complete_scrub)
+ SCRUB_INTERVAL.as_millis() as u64,
ScrubWorkerCommand::Start,
),
};


@@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use arc_swap::ArcSwap;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
@@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
use garage_util::persister::Persister;
use garage_util::persister::PersisterShared;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
@@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager {
pub(crate) queue: CountedTree,
pub(crate) notify: Notify,
pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree,
busy_set: BusySet,
persister: Persister<ResyncPersistedConfig>,
persisted: ArcSwap<ResyncPersistedConfig>,
persister: PersisterShared<ResyncPersistedConfig>,
}
#[derive(Serialize, Deserialize, Clone, Copy)]
@@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32,
}
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
impl Default for ResyncPersistedConfig {
fn default() -> Self {
ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY,
}
}
}
enum ResyncIterResult {
BusyDidSomething,
@@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
let persister = Persister::new(&system.metadata_dir, "resync_cfg");
let persisted = match persister.load() {
Ok(v) => v,
Err(_) => ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY,
},
};
let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
Self {
queue,
notify: Notify::new(),
notify: Arc::new(Notify::new()),
errors,
busy_set: Arc::new(Mutex::new(HashSet::new())),
persister,
persisted: ArcSwap::new(Arc::new(persisted)),
}
}
@@ -142,6 +140,38 @@ impl BlockResyncManager {
)))
}
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
let notify = self.notify.clone();
vars.register_rw(
&self.persister,
"resync-worker-count",
|p| p.get_with(|x| x.n_workers),
move |p, n_workers| {
if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
p.set_with(|x| x.n_workers = n_workers)?;
notify.notify_waiters();
Ok(())
},
);
let notify = self.notify.clone();
vars.register_rw(
&self.persister,
"resync-tranquility",
|p| p.get_with(|x| x.tranquility),
move |p, tranquility| {
p.set_with(|x| x.tranquility = tranquility)?;
notify.notify_waiters();
Ok(())
},
);
}
// ---- Resync loop ----
// This part manages a queue of blocks that need to be
@@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(())
}
async fn update_persisted(
&self,
update: impl Fn(&mut ResyncPersistedConfig),
) -> Result<(), Error> {
let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
update(&mut cfg);
self.persister.save_async(&cfg).await?;
self.persisted.store(Arc::new(cfg));
self.notify.notify_waiters();
Ok(())
}
pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
self.update_persisted(|cfg| cfg.n_workers = n_workers).await
}
pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
self.update_persisted(|cfg| cfg.tranquility = tranquility)
.await
}
}
impl Drop for BusyBlock {
@@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>,
tranquilizer: Tranquilizer,
next_delay: Duration,
persister: PersisterShared<ResyncPersistedConfig>,
}
impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
let persister = manager.resync.persister.clone();
Self {
index,
manager,
tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10),
persister,
}
}
}
@@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
}
fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load();
let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
if self.index >= persisted.n_workers {
if self.index >= n_workers {
return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()],
..Default::default()
@@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
tranquility: Some(persisted.tranquility),
tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default()
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if self.index >= self.manager.resync.persisted.load().n_workers {
let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
if self.index >= n_workers {
return Ok(WorkerState::Idle);
}
self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self
.tranquilizer
.tranquilize_worker(self.manager.resync.persisted.load().tranquility)),
Ok(ResyncIterResult::BusyDidSomething) => {
Ok(self.tranquilizer.tranquilize_worker(tranquility))
}
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
@@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
}
async fn wait_for_work(&mut self) -> WorkerState {
while self.index >= self.manager.resync.persisted.load().n_workers {
while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await
}
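
Both setter closures registered above clone the Notify (now wrapped in an Arc) and call notify_waiters(), so a change to resync-worker-count or resync-tranquility immediately wakes workers parked in wait_for_work. A minimal sketch of that wake-up pattern in plain tokio, independent of the Garage code:

use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Notify;

#[tokio::main]
async fn main() {
    let notify = Arc::new(Notify::new());
    let waiter_notify = notify.clone();

    let waiter = tokio::spawn(async move {
        // Corresponds to wait_for_work(): park until a variable change is signalled.
        waiter_notify.notified().await;
        println!("woken up: re-check n_workers and resume if enabled");
    });

    // notify_waiters() does not store a permit, so it only wakes tasks that are
    // already awaiting notified(); the sleep makes that likely in this toy example.
    tokio::time::sleep(Duration::from_millis(50)).await;
    notify.notify_waiters();

    waiter.await.unwrap();
}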


@@ -18,7 +18,6 @@ use garage_table::*;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
@@ -60,6 +59,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
@@ -943,32 +943,101 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
.await?;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
WorkerOperation::Get {
all_nodes,
variable,
} => self.handle_get_var(*all_nodes, variable).await,
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.handle_set_var(*all_nodes, variable, value).await,
}
}
async fn handle_get_var(
&self,
all_nodes: bool,
variable: &Option<String>,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.layout.node_ids().iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Get {
all_nodes: false,
variable: variable.clone(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
WorkerSetCmd::ResyncWorkerCount { worker_count } => {
self.garage
.block_manager
.resync
.set_n_workers(*worker_count)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
Ok(AdminRpc::WorkerVars(ret))
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(v) = variable {
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
v.clone(),
self.garage.bg_vars.get(v)?,
)]))
} else {
let mut vars = self.garage.bg_vars.get_all();
vars.sort();
Ok(AdminRpc::WorkerVars(
vars.into_iter()
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
.collect(),
))
}
}
}
async fn handle_set_var(
&self,
all_nodes: bool,
variable: &str,
value: &str,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.layout.node_ids().iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Set {
all_nodes: false,
variable: variable.to_string(),
value: value.to_string(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
WorkerSetCmd::ResyncTranquility { tranquility } => {
self.garage
.block_manager
.resync
.set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
}
Ok(AdminRpc::WorkerVars(ret))
} else {
self.garage.bg_vars.set(variable, value)?;
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
variable.to_string(),
value.to_string(),
)]))
}
}


@@ -191,6 +191,9 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
AdminRpc::WorkerVars(wv) => {
print_worker_vars(wv);
}
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}


@@ -517,11 +517,25 @@ pub enum WorkerOperation {
/// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())]
Info { tid: usize },
/// Get worker parameter
#[structopt(name = "get", version = garage_version())]
Get {
/// Gather variable values from all nodes
#[structopt(short = "a", long = "all-nodes")]
all_nodes: bool,
/// Variable name to get, or none to get all variables
variable: Option<String>,
},
/// Set worker parameter
#[structopt(name = "set", version = garage_version())]
Set {
#[structopt(subcommand)]
opt: WorkerSetCmd,
/// Set variable values on all nodes
#[structopt(short = "a", long = "all-nodes")]
all_nodes: bool,
/// Variable name to set
variable: String,
/// Value to set the variable to
value: String,
},
}
@@ -535,19 +549,6 @@ pub struct WorkerListOpt {
pub errors: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerSetCmd {
/// Set tranquility of scrub operations
#[structopt(name = "scrub-tranquility", version = garage_version())]
ScrubTranquility { tranquility: u32 },
/// Set number of concurrent block resync workers
#[structopt(name = "resync-worker-count", version = garage_version())]
ResyncWorkerCount { worker_count: usize },
/// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
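
A sketch of how the reworked get/set subcommands parse, assuming WorkerOperation carries the same StructOpt derive as the other command enums in this file; the test name and the argument vectors are illustrative only:

#[test]
fn worker_get_set_parsing() {
    use structopt::StructOpt;

    // garage worker get -a scrub-tranquility
    let op = WorkerOperation::from_iter(&["worker", "get", "-a", "scrub-tranquility"]);
    assert_eq!(
        op,
        WorkerOperation::Get {
            all_nodes: true,
            variable: Some("scrub-tranquility".to_string()),
        }
    );

    // garage worker set resync-worker-count 4
    let op = WorkerOperation::from_iter(&["worker", "set", "resync-worker-count", "4"]);
    assert_eq!(
        op,
        WorkerOperation::Set {
            all_nodes: false,
            variable: "resync-worker-count".to_string(),
            value: "4".to_string(),
        }
    );
}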


@@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table);
}
pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
let table = wv
.into_iter()
.map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
.collect::<Vec<_>>();
format_table(table);
}
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();


@@ -51,7 +51,11 @@ pub async fn launch_online_repair(
ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => {
ScrubWorkerCommand::SetTranquility(tranquility)
garage
.block_manager
.scrub_persister
.set_with(|x| x.tranquility = tranquility)?;
return Ok(());
}
};
info!("Sending command to scrub worker: {:?}", cmd);


@@ -33,6 +33,8 @@ use crate::k2v::{item_table::*, poll::*, rpc::*};
pub struct Garage {
/// The parsed configuration Garage is running
pub config: Config,
/// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars,
/// The replication mode of this cluster
pub replication_mode: ReplicationMode,
@@ -249,9 +251,14 @@ impl Garage {
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
// Initialize bg vars
let mut bg_vars = vars::BgVars::new();
block_manager.register_bg_vars(&mut bg_vars);
// -- done --
Ok(Arc::new(Self {
config,
bg_vars,
replication_mode,
db,
system,


@@ -1,5 +1,6 @@
//! Job runner for futures and async functions
pub mod vars;
pub mod worker;
use std::collections::HashMap;

src/util/background/vars.rs (new file, 113 lines)

@@ -0,0 +1,113 @@
use std::collections::HashMap;
use std::str::FromStr;
use crate::error::{Error, OkOrMessage};
use crate::migrate::Migrate;
use crate::persister::PersisterShared;
pub struct BgVars {
vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
}
impl BgVars {
pub fn new() -> Self {
Self {
vars: HashMap::new(),
}
}
pub fn register_rw<V, T, GF, SF>(
&mut self,
p: &PersisterShared<V>,
name: &'static str,
get_fn: GF,
set_fn: SF,
) where
V: Migrate + Default + Send + Sync,
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
{
let p1 = p.clone();
let get_fn = move || get_fn(&p1);
let p2 = p.clone();
let set_fn = move |v| set_fn(&p2, v);
self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
}
pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
where
V: Migrate + Default + Send + Sync,
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
{
let p1 = p.clone();
let get_fn = move || get_fn(&p1);
let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
}
pub fn get(&self, var: &str) -> Result<String, Error> {
Ok(self
.vars
.get(var)
.ok_or_message("variable does not exist")?
.get())
}
pub fn get_all(&self) -> Vec<(&'static str, String)> {
self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
}
pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
self.vars
.get(var)
.ok_or_message("variable does not exist")?
.set(val)
}
}
impl Default for BgVars {
fn default() -> Self {
Self::new()
}
}
// ----
trait BgVarTrait: Send + Sync + 'static {
fn get(&self) -> String;
fn set(&self, v: &str) -> Result<(), Error>;
}
struct BgVar<T, GF, SF>
where
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn() -> T + Send + Sync + 'static,
SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
{
get_fn: GF,
set_fn: SF,
}
impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
where
T: FromStr + ToString + Sync + Send + 'static,
GF: Fn() -> T + Sync + Send + 'static,
SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
{
fn get(&self) -> String {
(self.get_fn)().to_string()
}
fn set(&self, vstr: &str) -> Result<(), Error> {
let value = vstr
.parse()
.map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
(self.set_fn)(value)
}
}


@@ -15,6 +15,5 @@ pub mod metrics;
pub mod migrate;
pub mod persister;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;
pub mod version;


@@ -1,5 +1,6 @@
use std::io::{Read, Write};
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
@@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> {
Ok(())
}
}
pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
impl<V: Migrate + Default> Clone for PersisterShared<V> {
fn clone(&self) -> PersisterShared<V> {
PersisterShared(self.0.clone())
}
}
impl<V: Migrate + Default> PersisterShared<V> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let persister = Persister::new(base_dir, file_name);
let value = persister.load().unwrap_or_default();
Self(Arc::new((persister, RwLock::new(value))))
}
pub fn get_with<F, R>(&self, f: F) -> R
where
F: FnOnce(&V) -> R,
{
let value = self.0 .1.read().unwrap();
f(&value)
}
pub fn set_with<F>(&self, f: F) -> Result<(), Error>
where
F: FnOnce(&mut V),
{
let mut value = self.0 .1.write().unwrap();
f(&mut value);
self.0 .0.save(&value)
}
}
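
PersisterShared lets several owners (the scrub worker, the resync manager, the admin RPC handler, the repair path) share one persisted value behind an RwLock, while BgVars exposes it by name over the worker get/set commands. A minimal end-to-end sketch mirroring the ScrubWorkerPersisted pattern above; the DemoPersisted type, the demo-tranquility variable name and the /tmp plus demo_info locations are hypothetical, for illustration only:

use std::path::Path;

use serde::{Deserialize, Serialize};

use garage_util::background::vars::BgVars;
use garage_util::error::Error;
use garage_util::persister::PersisterShared;

#[derive(Serialize, Deserialize)]
struct DemoPersisted {
    tranquility: u32,
}

impl garage_util::migrate::InitialFormat for DemoPersisted {}

impl Default for DemoPersisted {
    fn default() -> Self {
        DemoPersisted { tranquility: 2 }
    }
}

fn main() -> Result<(), Error> {
    // Loads the persisted value if demo_info exists, falls back to Default otherwise.
    let p: PersisterShared<DemoPersisted> = PersisterShared::new(Path::new("/tmp"), "demo_info");

    let mut vars = BgVars::new();
    vars.register_rw(
        &p,
        "demo-tranquility",
        |p| p.get_with(|x| x.tranquility),
        |p, tranquility| p.set_with(|x| x.tranquility = tranquility),
    );

    // Values cross the BgVars interface as strings; set_with persists to disk
    // under the write lock.
    println!("demo-tranquility = {}", vars.get("demo-tranquility")?);
    vars.set("demo-tranquility", "5")?;
    assert_eq!(p.get_with(|x| x.tranquility), 5);
    Ok(())
}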


@@ -1,40 +0,0 @@
use std::time::{Duration, Instant};
use tokio::time::sleep;
pub struct TokenBucket {
// Replenish rate: number of tokens per second
replenish_rate: u64,
// Current number of tokens
tokens: u64,
// Last replenish time
last_replenish: Instant,
}
impl TokenBucket {
pub fn new(replenish_rate: u64) -> Self {
Self {
replenish_rate,
tokens: 0,
last_replenish: Instant::now(),
}
}
pub async fn take(&mut self, tokens: u64) {
while self.tokens < tokens {
let needed = tokens - self.tokens;
let delay = (needed as f64) / (self.replenish_rate as f64);
sleep(Duration::from_secs_f64(delay)).await;
self.replenish();
}
self.tokens -= tokens;
}
pub fn replenish(&mut self) {
let now = Instant::now();
let new_tokens =
((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
self.tokens += new_tokens;
self.last_replenish = now;
}
}