Uniform framework for bg variable management
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Alex 2023-01-04 13:07:13 +01:00
parent 13c5549886
commit f3f27293df
Signed by: lx
GPG Key ID: 0E496D15096376BE
12 changed files with 315 additions and 130 deletions

View File

@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db; use garage_db as db;
use garage_util::background::BackgroundRunner; use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::persister::PersisterShared;
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag; use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -89,6 +91,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics, pub(crate) metrics: BlockManagerMetrics,
pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
} }
@ -128,6 +131,8 @@ impl BlockManager {
let metrics = let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self { let block_manager = Arc::new(Self {
replication, replication,
data_dir, data_dir,
@ -138,6 +143,7 @@ impl BlockManager {
system, system,
endpoint, endpoint,
metrics, metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None), tx_scrub_command: ArcSwapOption::new(None),
}); });
block_manager.endpoint.set_handler(block_manager.clone()); block_manager.endpoint.set_handler(block_manager.clone());
@ -155,7 +161,28 @@ impl BlockManager {
// Spawn scrub worker // Spawn scrub worker
let (scrub_tx, scrub_rx) = mpsc::channel(1); let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); bg.spawn_worker(ScrubWorker::new(
self.clone(),
scrub_rx,
self.scrub_persister.clone(),
));
}
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
self.resync.register_bg_vars(vars);
vars.register_rw(
&self.scrub_persister,
"scrub-tranquility",
|p| p.get_with(|x| x.tranquility),
|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
);
vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
});
vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
p.get_with(|x| x.corruptions_detected)
});
} }
/// Ask nodes that might have a (possibly compressed) block for it /// Ask nodes that might have a (possibly compressed) block for it

View File

@ -13,7 +13,7 @@ use tokio::sync::watch;
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::persister::Persister; use garage_util::persister::PersisterShared;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -168,17 +168,25 @@ pub struct ScrubWorker {
work: ScrubWorkerState, work: ScrubWorkerState,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
persister: Persister<ScrubWorkerPersisted>, persister: PersisterShared<ScrubWorkerPersisted>,
persisted: ScrubWorkerPersisted,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct ScrubWorkerPersisted { pub struct ScrubWorkerPersisted {
tranquility: u32, pub tranquility: u32,
time_last_complete_scrub: u64, pub(crate) time_last_complete_scrub: u64,
corruptions_detected: u64, pub(crate) corruptions_detected: u64,
} }
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
impl Default for ScrubWorkerPersisted {
fn default() -> Self {
ScrubWorkerPersisted {
time_last_complete_scrub: 0,
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
}
}
}
enum ScrubWorkerState { enum ScrubWorkerState {
Running(BlockStoreIterator), Running(BlockStoreIterator),
@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration), Pause(Duration),
Resume, Resume,
Cancel, Cancel,
SetTranquility(u32),
} }
impl ScrubWorker { impl ScrubWorker {
pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { pub(crate) fn new(
let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); manager: Arc<BlockManager>,
let persisted = match persister.load() { rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
Ok(v) => v, persister: PersisterShared<ScrubWorkerPersisted>,
Err(_) => ScrubWorkerPersisted { ) -> Self {
time_last_complete_scrub: 0,
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
},
};
Self { Self {
manager, manager,
rx_cmd, rx_cmd,
work: ScrubWorkerState::Finished, work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
persister, persister,
persisted,
} }
} }
@ -267,12 +268,6 @@ impl ScrubWorker {
} }
} }
} }
ScrubWorkerCommand::SetTranquility(t) => {
self.persisted.tranquility = t;
if let Err(e) = self.persister.save_async(&self.persisted).await {
error!("Could not save new tranquilitiy value: {}", e);
}
}
} }
} }
} }
@ -284,9 +279,18 @@ impl Worker for ScrubWorker {
} }
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
let (corruptions_detected, tranquility, time_last_complete_scrub) =
self.persister.get_with(|p| {
(
p.corruptions_detected,
p.tranquility,
p.time_last_complete_scrub,
)
});
let mut s = WorkerStatus { let mut s = WorkerStatus {
persistent_errors: Some(self.persisted.corruptions_detected), persistent_errors: Some(corruptions_detected),
tranquility: Some(self.persisted.tranquility), tranquility: Some(tranquility),
..Default::default() ..Default::default()
}; };
match &self.work { match &self.work {
@ -300,7 +304,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Finished => { ScrubWorkerState::Finished => {
s.freeform = vec![format!( s.freeform = vec![format!(
"Last scrub completed at {}", "Last scrub completed at {}",
msec_to_rfc3339(self.persisted.time_last_complete_scrub) msec_to_rfc3339(time_last_complete_scrub)
)]; )];
} }
} }
@ -321,18 +325,17 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await { match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => { Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash); error!("Found corrupt data block during scrub: {:?}", hash);
self.persisted.corruptions_detected += 1; self.persister.set_with(|p| p.corruptions_detected += 1)?;
self.persister.save_async(&self.persisted).await?;
} }
Err(e) => return Err(e), Err(e) => return Err(e),
_ => (), _ => (),
}; };
Ok(self Ok(self
.tranquilizer .tranquilizer
.tranquilize_worker(self.persisted.tranquility)) .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else { } else {
self.persisted.time_last_complete_scrub = now_msec(); self.persister
self.persister.save_async(&self.persisted).await?; .set_with(|p| p.time_last_complete_scrub = now_msec())?;
self.work = ScrubWorkerState::Finished; self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear(); self.tranquilizer.clear();
Ok(WorkerState::Idle) Ok(WorkerState::Idle)
@ -347,7 +350,8 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => ( ScrubWorkerState::Finished => (
self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, self.persister.get_with(|p| p.time_last_complete_scrub)
+ SCRUB_INTERVAL.as_millis() as u64,
ScrubWorkerCommand::Start, ScrubWorkerCommand::Start,
), ),
}; };

View File

@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwap;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::persister::Persister; use garage_util::persister::PersisterShared;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager { pub struct BlockResyncManager {
pub(crate) queue: CountedTree, pub(crate) queue: CountedTree,
pub(crate) notify: Notify, pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree, pub(crate) errors: CountedTree,
busy_set: BusySet, busy_set: BusySet,
persister: Persister<ResyncPersistedConfig>, persister: PersisterShared<ResyncPersistedConfig>,
persisted: ArcSwap<ResyncPersistedConfig>,
} }
#[derive(Serialize, Deserialize, Clone, Copy)] #[derive(Serialize, Deserialize, Clone, Copy)]
@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32, tranquility: u32,
} }
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
impl Default for ResyncPersistedConfig {
fn default() -> Self {
ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY,
}
}
}
enum ResyncIterResult { enum ResyncIterResult {
BusyDidSomething, BusyDidSomething,
@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree"); .expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
let persister = Persister::new(&system.metadata_dir, "resync_cfg"); let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
let persisted = match persister.load() {
Ok(v) => v,
Err(_) => ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY,
},
};
Self { Self {
queue, queue,
notify: Notify::new(), notify: Arc::new(Notify::new()),
errors, errors,
busy_set: Arc::new(Mutex::new(HashSet::new())), busy_set: Arc::new(Mutex::new(HashSet::new())),
persister, persister,
persisted: ArcSwap::new(Arc::new(persisted)),
} }
} }
@ -142,6 +140,38 @@ impl BlockResyncManager {
))) )))
} }
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
let notify = self.notify.clone();
vars.register_rw(
&self.persister,
"resync-worker-count",
|p| p.get_with(|x| x.n_workers),
move |p, n_workers| {
if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
p.set_with(|x| x.n_workers = n_workers)?;
notify.notify_waiters();
Ok(())
},
);
let notify = self.notify.clone();
vars.register_rw(
&self.persister,
"resync-tranquility",
|p| p.get_with(|x| x.tranquility),
move |p, tranquility| {
p.set_with(|x| x.tranquility = tranquility)?;
notify.notify_waiters();
Ok(())
},
);
}
// ---- Resync loop ---- // ---- Resync loop ----
// This part manages a queue of blocks that need to be // This part manages a queue of blocks that need to be
@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(()) Ok(())
} }
async fn update_persisted(
&self,
update: impl Fn(&mut ResyncPersistedConfig),
) -> Result<(), Error> {
let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
update(&mut cfg);
self.persister.save_async(&cfg).await?;
self.persisted.store(Arc::new(cfg));
self.notify.notify_waiters();
Ok(())
}
pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
self.update_persisted(|cfg| cfg.n_workers = n_workers).await
}
pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
self.update_persisted(|cfg| cfg.tranquility = tranquility)
.await
}
} }
impl Drop for BusyBlock { impl Drop for BusyBlock {
@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
next_delay: Duration, next_delay: Duration,
persister: PersisterShared<ResyncPersistedConfig>,
} }
impl ResyncWorker { impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self { pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
let persister = manager.resync.persister.clone();
Self { Self {
index, index,
manager, manager,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10), next_delay: Duration::from_secs(10),
persister,
} }
} }
} }
@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
} }
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load(); let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
if self.index >= persisted.n_workers { if self.index >= n_workers {
return WorkerStatus { return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()], freeform: vec!["This worker is currently disabled".into()],
..Default::default() ..Default::default()
@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus { WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
tranquility: Some(persisted.tranquility), tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default() ..Default::default()
} }
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if self.index >= self.manager.resync.persisted.load().n_workers { let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
if self.index >= n_workers {
return Ok(WorkerState::Idle); return Ok(WorkerState::Idle);
} }
self.tranquilizer.reset(); self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await { match self.manager.resync.resync_iter(&self.manager).await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self Ok(ResyncIterResult::BusyDidSomething) => {
.tranquilizer Ok(self.tranquilizer.tranquilize_worker(tranquility))
.tranquilize_worker(self.manager.resync.persisted.load().tranquility)), }
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => { Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay; self.next_delay = delay;
@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
} }
async fn wait_for_work(&mut self) -> WorkerState { async fn wait_for_work(&mut self) -> WorkerState {
while self.index >= self.manager.resync.persisted.load().n_workers { while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await self.manager.resync.notify.notified().await
} }

View File

@ -18,7 +18,6 @@ use garage_table::*;
use garage_rpc::*; use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo; use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*; use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
@ -60,6 +59,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>, HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt, WorkerListOpt,
), ),
WorkerVars(Vec<(String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo), WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>), BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo { BlockInfo {
@ -943,32 +943,27 @@ impl AdminRpcHandler {
.clone(); .clone();
Ok(AdminRpc::WorkerInfo(*tid, info)) Ok(AdminRpc::WorkerInfo(*tid, info))
} }
WorkerOperation::Set { opt } => match opt { WorkerOperation::Get { variable } => {
WorkerSetCmd::ScrubTranquility { tranquility } => { if let Some(v) = variable {
let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); Ok(AdminRpc::WorkerVars(vec![(
self.garage v.clone(),
.block_manager self.garage.bg_vars.get(&v)?,
.send_scrub_command(scrub_command) )]))
.await?; } else {
Ok(AdminRpc::Ok("Scrub tranquility updated".into())) Ok(AdminRpc::WorkerVars(
self.garage
.bg_vars
.get_all()
.into_iter()
.map(|(k, v)| (k.to_string(), v))
.collect(),
))
} }
WorkerSetCmd::ResyncWorkerCount { worker_count } => { }
self.garage WorkerOperation::Set { variable, value } => {
.block_manager self.garage.bg_vars.set(&variable, &value)?;
.resync Ok(AdminRpc::Ok(format!("{} was set to {}", variable, value)))
.set_n_workers(*worker_count) }
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
WorkerSetCmd::ResyncTranquility { tranquility } => {
self.garage
.block_manager
.resync
.set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
} }
} }

View File

@ -191,6 +191,9 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => { AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo); print_worker_list(wi, wlo);
} }
AdminRpc::WorkerVars(wv) => {
print_worker_vars(wv);
}
AdminRpc::WorkerInfo(tid, wi) => { AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi); print_worker_info(tid, wi);
} }

View File

@ -517,12 +517,12 @@ pub enum WorkerOperation {
/// Get detailed information about a worker /// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())] #[structopt(name = "info", version = garage_version())]
Info { tid: usize }, Info { tid: usize },
/// Get worker parameter
#[structopt(name = "get", version = garage_version())]
Get { variable: Option<String> },
/// Set worker parameter /// Set worker parameter
#[structopt(name = "set", version = garage_version())] #[structopt(name = "set", version = garage_version())]
Set { Set { variable: String, value: String },
#[structopt(subcommand)]
opt: WorkerSetCmd,
},
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
@ -535,19 +535,6 @@ pub struct WorkerListOpt {
pub errors: bool, pub errors: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerSetCmd {
/// Set tranquility of scrub operations
#[structopt(name = "scrub-tranquility", version = garage_version())]
ScrubTranquility { tranquility: u32 },
/// Set number of concurrent block resync workers
#[structopt(name = "resync-worker-count", version = garage_version())]
ResyncWorkerCount { worker_count: usize },
/// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation { pub enum BlockOperation {
/// List all blocks that currently have a resync error /// List all blocks that currently have a resync error

View File

@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table); format_table(table);
} }
pub fn print_worker_vars(wv: Vec<(String, String)>) {
let table = wv
.into_iter()
.map(|(k, v)| format!("{}\t{}", k, v))
.collect::<Vec<_>>();
format_table(table);
}
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec(); let now = now_msec();
let tf = timeago::Formatter::new(); let tf = timeago::Formatter::new();

View File

@ -51,7 +51,11 @@ pub async fn launch_online_repair(
ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => { ScrubCmd::SetTranquility { tranquility } => {
ScrubWorkerCommand::SetTranquility(tranquility) garage
.block_manager
.scrub_persister
.set_with(|x| x.tranquility = tranquility)?;
return Ok(());
} }
}; };
info!("Sending command to scrub worker: {:?}", cmd); info!("Sending command to scrub worker: {:?}", cmd);

View File

@ -33,6 +33,8 @@ use crate::k2v::{item_table::*, poll::*, rpc::*};
pub struct Garage { pub struct Garage {
/// The parsed configuration Garage is running /// The parsed configuration Garage is running
pub config: Config, pub config: Config,
/// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars,
/// The replication mode of this cluster /// The replication mode of this cluster
pub replication_mode: ReplicationMode, pub replication_mode: ReplicationMode,
@ -249,9 +251,14 @@ impl Garage {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
// Initialize bg vars
let mut bg_vars = vars::BgVars::new();
block_manager.register_bg_vars(&mut bg_vars);
// -- done -- // -- done --
Ok(Arc::new(Self { Ok(Arc::new(Self {
config, config,
bg_vars,
replication_mode, replication_mode,
db, db,
system, system,

View File

@ -1,5 +1,6 @@
//! Job runner for futures and async functions //! Job runner for futures and async functions
pub mod vars;
pub mod worker; pub mod worker;
use std::collections::HashMap; use std::collections::HashMap;

107
src/util/background/vars.rs Normal file
View File

@ -0,0 +1,107 @@
use std::collections::HashMap;
use std::str::FromStr;
use crate::error::{Error, OkOrMessage};
use crate::migrate::Migrate;
use crate::persister::PersisterShared;
pub struct BgVars {
vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
}
impl BgVars {
pub fn new() -> Self {
Self {
vars: HashMap::new(),
}
}
pub fn register_rw<V, T, GF, SF>(
&mut self,
p: &PersisterShared<V>,
name: &'static str,
get_fn: GF,
set_fn: SF,
) where
V: Migrate + Default + Send + Sync,
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
{
let p1 = p.clone();
let get_fn = move || get_fn(&p1);
let p2 = p.clone();
let set_fn = move |v| set_fn(&p2, v);
self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
}
pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
where
V: Migrate + Default + Send + Sync,
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
{
let p1 = p.clone();
let get_fn = move || get_fn(&p1);
let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
}
pub fn get(&self, var: &str) -> Result<String, Error> {
Ok(self
.vars
.get(var)
.ok_or_message("variable does not exist")?
.get())
}
pub fn get_all(&self) -> Vec<(&'static str, String)> {
self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
}
pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
self.vars
.get(var)
.ok_or_message("variable does not exist")?
.set(val)
}
}
// ----
trait BgVarTrait: Send + Sync + 'static {
fn get(&self) -> String;
fn set(&self, v: &str) -> Result<(), Error>;
}
struct BgVar<T, GF, SF>
where
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn() -> T + Send + Sync + 'static,
SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
{
get_fn: GF,
set_fn: SF,
}
impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
where
T: FromStr + ToString + Sync + Send + 'static,
GF: Fn() -> T + Sync + Send + 'static,
SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
{
fn get(&self) -> String {
(self.get_fn)().to_string()
}
fn set(&self, vstr: &str) -> Result<(), Error> {
let value = vstr
.parse()
.map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
(self.set_fn)(value)
}
}

View File

@ -1,5 +1,6 @@
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> {
Ok(()) Ok(())
} }
} }
pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
impl<V: Migrate + Default> Clone for PersisterShared<V> {
fn clone(&self) -> PersisterShared<V> {
PersisterShared(self.0.clone())
}
}
impl<V: Migrate + Default> PersisterShared<V> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let persister = Persister::new(base_dir, file_name);
let value = persister.load().unwrap_or_default();
Self(Arc::new((persister, RwLock::new(value))))
}
pub fn get_with<F, R>(&self, f: F) -> R
where
F: FnOnce(&V) -> R,
{
let value = self.0 .1.read().unwrap();
f(&value)
}
pub fn set_with<F>(&self, f: F) -> Result<(), Error>
where
F: FnOnce(&mut V),
{
let mut value = self.0 .1.write().unwrap();
f(&mut value);
self.0 .0.save(&value)
}
}