Improve garage worker set and add garage worker get #464

Merged
lx merged 3 commits from worker-get into main 2023-01-04 13:47:43 +00:00
14 changed files with 406 additions and 168 deletions

View file

@ -23,10 +23,12 @@ use garage_rpc::rpc_helper::netapp::stream::{stream_asyncread, ByteStream};
use garage_db as db; use garage_db as db;
use garage_util::background::BackgroundRunner; use garage_util::background::{vars, BackgroundRunner};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::persister::PersisterShared;
use garage_util::time::msec_to_rfc3339;
use garage_rpc::rpc_helper::OrderTag; use garage_rpc::rpc_helper::OrderTag;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -89,6 +91,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics, pub(crate) metrics: BlockManagerMetrics,
pub scrub_persister: PersisterShared<ScrubWorkerPersisted>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>, tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
} }
@ -128,6 +131,8 @@ impl BlockManager {
let metrics = let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone()); BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
let block_manager = Arc::new(Self { let block_manager = Arc::new(Self {
replication, replication,
data_dir, data_dir,
@ -138,6 +143,7 @@ impl BlockManager {
system, system,
endpoint, endpoint,
metrics, metrics,
scrub_persister,
tx_scrub_command: ArcSwapOption::new(None), tx_scrub_command: ArcSwapOption::new(None),
}); });
block_manager.endpoint.set_handler(block_manager.clone()); block_manager.endpoint.set_handler(block_manager.clone());
@ -155,7 +161,28 @@ impl BlockManager {
// Spawn scrub worker // Spawn scrub worker
let (scrub_tx, scrub_rx) = mpsc::channel(1); let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
bg.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx)); bg.spawn_worker(ScrubWorker::new(
self.clone(),
scrub_rx,
self.scrub_persister.clone(),
));
}
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
self.resync.register_bg_vars(vars);
vars.register_rw(
&self.scrub_persister,
"scrub-tranquility",
|p| p.get_with(|x| x.tranquility),
|p, tranquility| p.set_with(|x| x.tranquility = tranquility),
);
vars.register_ro(&self.scrub_persister, "scrub-last-completed", |p| {
p.get_with(|x| msec_to_rfc3339(x.time_last_complete_scrub))
});
vars.register_ro(&self.scrub_persister, "scrub-corruptions_detected", |p| {
p.get_with(|x| x.corruptions_detected)
});
} }
/// Ask nodes that might have a (possibly compressed) block for it /// Ask nodes that might have a (possibly compressed) block for it

View file

@ -13,7 +13,7 @@ use tokio::sync::watch;
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::persister::Persister; use garage_util::persister::PersisterShared;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -168,17 +168,25 @@ pub struct ScrubWorker {
work: ScrubWorkerState, work: ScrubWorkerState,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
persister: Persister<ScrubWorkerPersisted>, persister: PersisterShared<ScrubWorkerPersisted>,
persisted: ScrubWorkerPersisted,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct ScrubWorkerPersisted { pub struct ScrubWorkerPersisted {
tranquility: u32, pub tranquility: u32,
time_last_complete_scrub: u64, pub(crate) time_last_complete_scrub: u64,
corruptions_detected: u64, pub(crate) corruptions_detected: u64,
} }
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
impl Default for ScrubWorkerPersisted {
fn default() -> Self {
ScrubWorkerPersisted {
time_last_complete_scrub: 0,
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
}
}
}
enum ScrubWorkerState { enum ScrubWorkerState {
Running(BlockStoreIterator), Running(BlockStoreIterator),
@ -198,27 +206,20 @@ pub enum ScrubWorkerCommand {
Pause(Duration), Pause(Duration),
Resume, Resume,
Cancel, Cancel,
SetTranquility(u32),
} }
impl ScrubWorker { impl ScrubWorker {
pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self { pub(crate) fn new(
let persister = Persister::new(&manager.system.metadata_dir, "scrub_info"); manager: Arc<BlockManager>,
let persisted = match persister.load() { rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
Ok(v) => v, persister: PersisterShared<ScrubWorkerPersisted>,
Err(_) => ScrubWorkerPersisted { ) -> Self {
time_last_complete_scrub: 0,
tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0,
},
};
Self { Self {
manager, manager,
rx_cmd, rx_cmd,
work: ScrubWorkerState::Finished, work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
persister, persister,
persisted,
} }
} }
@ -267,12 +268,6 @@ impl ScrubWorker {
} }
} }
} }
ScrubWorkerCommand::SetTranquility(t) => {
self.persisted.tranquility = t;
if let Err(e) = self.persister.save_async(&self.persisted).await {
error!("Could not save new tranquilitiy value: {}", e);
}
}
} }
} }
} }
@ -284,9 +279,18 @@ impl Worker for ScrubWorker {
} }
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
let (corruptions_detected, tranquility, time_last_complete_scrub) =
self.persister.get_with(|p| {
(
p.corruptions_detected,
p.tranquility,
p.time_last_complete_scrub,
)
});
let mut s = WorkerStatus { let mut s = WorkerStatus {
persistent_errors: Some(self.persisted.corruptions_detected), persistent_errors: Some(corruptions_detected),
tranquility: Some(self.persisted.tranquility), tranquility: Some(tranquility),
..Default::default() ..Default::default()
}; };
match &self.work { match &self.work {
@ -300,7 +304,7 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Finished => { ScrubWorkerState::Finished => {
s.freeform = vec![format!( s.freeform = vec![format!(
"Last scrub completed at {}", "Last scrub completed at {}",
msec_to_rfc3339(self.persisted.time_last_complete_scrub) msec_to_rfc3339(time_last_complete_scrub)
)]; )];
} }
} }
@ -321,18 +325,17 @@ impl Worker for ScrubWorker {
match self.manager.read_block(&hash).await { match self.manager.read_block(&hash).await {
Err(Error::CorruptData(_)) => { Err(Error::CorruptData(_)) => {
error!("Found corrupt data block during scrub: {:?}", hash); error!("Found corrupt data block during scrub: {:?}", hash);
self.persisted.corruptions_detected += 1; self.persister.set_with(|p| p.corruptions_detected += 1)?;
self.persister.save_async(&self.persisted).await?;
} }
Err(e) => return Err(e), Err(e) => return Err(e),
_ => (), _ => (),
}; };
Ok(self Ok(self
.tranquilizer .tranquilizer
.tranquilize_worker(self.persisted.tranquility)) .tranquilize_worker(self.persister.get_with(|p| p.tranquility)))
} else { } else {
self.persisted.time_last_complete_scrub = now_msec(); self.persister
self.persister.save_async(&self.persisted).await?; .set_with(|p| p.time_last_complete_scrub = now_msec())?;
self.work = ScrubWorkerState::Finished; self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear(); self.tranquilizer.clear();
Ok(WorkerState::Idle) Ok(WorkerState::Idle)
@ -347,7 +350,8 @@ impl Worker for ScrubWorker {
ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Running(_) => return WorkerState::Busy,
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
ScrubWorkerState::Finished => ( ScrubWorkerState::Finished => (
self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64, self.persister.get_with(|p| p.time_last_complete_scrub)
+ SCRUB_INTERVAL.as_millis() as u64,
ScrubWorkerCommand::Start, ScrubWorkerCommand::Start,
), ),
}; };

View file

@ -3,7 +3,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwap;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -22,7 +21,7 @@ use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::persister::Persister; use garage_util::persister::PersisterShared;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -49,13 +48,12 @@ const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
pub struct BlockResyncManager { pub struct BlockResyncManager {
pub(crate) queue: CountedTree, pub(crate) queue: CountedTree,
pub(crate) notify: Notify, pub(crate) notify: Arc<Notify>,
pub(crate) errors: CountedTree, pub(crate) errors: CountedTree,
busy_set: BusySet, busy_set: BusySet,
persister: Persister<ResyncPersistedConfig>, persister: PersisterShared<ResyncPersistedConfig>,
persisted: ArcSwap<ResyncPersistedConfig>,
} }
#[derive(Serialize, Deserialize, Clone, Copy)] #[derive(Serialize, Deserialize, Clone, Copy)]
@ -64,6 +62,14 @@ struct ResyncPersistedConfig {
tranquility: u32, tranquility: u32,
} }
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {} impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
impl Default for ResyncPersistedConfig {
fn default() -> Self {
ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY,
}
}
}
enum ResyncIterResult { enum ResyncIterResult {
BusyDidSomething, BusyDidSomething,
@ -91,22 +97,14 @@ impl BlockResyncManager {
.expect("Unable to open block_local_resync_errors tree"); .expect("Unable to open block_local_resync_errors tree");
let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors"); let errors = CountedTree::new(errors).expect("Could not count block_local_resync_errors");
let persister = Persister::new(&system.metadata_dir, "resync_cfg"); let persister = PersisterShared::new(&system.metadata_dir, "resync_cfg");
let persisted = match persister.load() {
Ok(v) => v,
Err(_) => ResyncPersistedConfig {
n_workers: 1,
tranquility: INITIAL_RESYNC_TRANQUILITY,
},
};
Self { Self {
queue, queue,
notify: Notify::new(), notify: Arc::new(Notify::new()),
errors, errors,
busy_set: Arc::new(Mutex::new(HashSet::new())), busy_set: Arc::new(Mutex::new(HashSet::new())),
persister, persister,
persisted: ArcSwap::new(Arc::new(persisted)),
} }
} }
@ -142,6 +140,38 @@ impl BlockResyncManager {
))) )))
} }
pub fn register_bg_vars(&self, vars: &mut vars::BgVars) {
let notify = self.notify.clone();
vars.register_rw(
&self.persister,
"resync-worker-count",
|p| p.get_with(|x| x.n_workers),
move |p, n_workers| {
if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
p.set_with(|x| x.n_workers = n_workers)?;
notify.notify_waiters();
Ok(())
},
);
let notify = self.notify.clone();
vars.register_rw(
&self.persister,
"resync-tranquility",
|p| p.get_with(|x| x.tranquility),
move |p, tranquility| {
p.set_with(|x| x.tranquility = tranquility)?;
notify.notify_waiters();
Ok(())
},
);
}
// ---- Resync loop ---- // ---- Resync loop ----
// This part manages a queue of blocks that need to be // This part manages a queue of blocks that need to be
@ -436,33 +466,6 @@ impl BlockResyncManager {
Ok(()) Ok(())
} }
async fn update_persisted(
&self,
update: impl Fn(&mut ResyncPersistedConfig),
) -> Result<(), Error> {
let mut cfg: ResyncPersistedConfig = *self.persisted.load().as_ref();
update(&mut cfg);
self.persister.save_async(&cfg).await?;
self.persisted.store(Arc::new(cfg));
self.notify.notify_waiters();
Ok(())
}
pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> {
if !(1..=MAX_RESYNC_WORKERS).contains(&n_workers) {
return Err(Error::Message(format!(
"Invalid number of resync workers, must be between 1 and {}",
MAX_RESYNC_WORKERS
)));
}
self.update_persisted(|cfg| cfg.n_workers = n_workers).await
}
pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> {
self.update_persisted(|cfg| cfg.tranquility = tranquility)
.await
}
} }
impl Drop for BusyBlock { impl Drop for BusyBlock {
@ -477,15 +480,18 @@ pub(crate) struct ResyncWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
next_delay: Duration, next_delay: Duration,
persister: PersisterShared<ResyncPersistedConfig>,
} }
impl ResyncWorker { impl ResyncWorker {
pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self { pub(crate) fn new(index: usize, manager: Arc<BlockManager>) -> Self {
let persister = manager.resync.persister.clone();
Self { Self {
index, index,
manager, manager,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
next_delay: Duration::from_secs(10), next_delay: Duration::from_secs(10),
persister,
} }
} }
} }
@ -497,9 +503,9 @@ impl Worker for ResyncWorker {
} }
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
let persisted = self.manager.resync.persisted.load(); let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
if self.index >= persisted.n_workers { if self.index >= n_workers {
return WorkerStatus { return WorkerStatus {
freeform: vec!["This worker is currently disabled".into()], freeform: vec!["This worker is currently disabled".into()],
..Default::default() ..Default::default()
@ -508,22 +514,24 @@ impl Worker for ResyncWorker {
WorkerStatus { WorkerStatus {
queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64), queue_length: Some(self.manager.resync.queue_len().unwrap_or(0) as u64),
tranquility: Some(persisted.tranquility), tranquility: Some(tranquility),
persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64), persistent_errors: Some(self.manager.resync.errors_len().unwrap_or(0) as u64),
..Default::default() ..Default::default()
} }
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
if self.index >= self.manager.resync.persisted.load().n_workers { let (n_workers, tranquility) = self.persister.get_with(|x| (x.n_workers, x.tranquility));
if self.index >= n_workers {
return Ok(WorkerState::Idle); return Ok(WorkerState::Idle);
} }
self.tranquilizer.reset(); self.tranquilizer.reset();
match self.manager.resync.resync_iter(&self.manager).await { match self.manager.resync.resync_iter(&self.manager).await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self Ok(ResyncIterResult::BusyDidSomething) => {
.tranquilizer Ok(self.tranquilizer.tranquilize_worker(tranquility))
.tranquilize_worker(self.manager.resync.persisted.load().tranquility)), }
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => { Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay; self.next_delay = delay;
@ -542,7 +550,7 @@ impl Worker for ResyncWorker {
} }
async fn wait_for_work(&mut self) -> WorkerState { async fn wait_for_work(&mut self) -> WorkerState {
while self.index >= self.manager.resync.persisted.load().n_workers { while self.index >= self.persister.get_with(|x| x.n_workers) {
self.manager.resync.notify.notified().await self.manager.resync.notify.notified().await
} }

View file

@ -18,7 +18,6 @@ use garage_table::*;
use garage_rpc::*; use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo; use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*; use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
@ -60,6 +59,7 @@ pub enum AdminRpc {
HashMap<usize, garage_util::background::WorkerInfo>, HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt, WorkerListOpt,
), ),
WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo), WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>), BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo { BlockInfo {
@ -943,32 +943,101 @@ impl AdminRpcHandler {
.clone(); .clone();
Ok(AdminRpc::WorkerInfo(*tid, info)) Ok(AdminRpc::WorkerInfo(*tid, info))
} }
WorkerOperation::Set { opt } => match opt { WorkerOperation::Get {
WorkerSetCmd::ScrubTranquility { tranquility } => { all_nodes,
let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility); variable,
self.garage } => self.handle_get_var(*all_nodes, variable).await,
.block_manager WorkerOperation::Set {
.send_scrub_command(scrub_command) all_nodes,
.await?; variable,
Ok(AdminRpc::Ok("Scrub tranquility updated".into())) value,
} => self.handle_set_var(*all_nodes, variable, value).await,
}
}
async fn handle_get_var(
&self,
all_nodes: bool,
variable: &Option<String>,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.layout.node_ids().iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Get {
all_nodes: false,
variable: variable.clone(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
} }
WorkerSetCmd::ResyncWorkerCount { worker_count } => { }
self.garage Ok(AdminRpc::WorkerVars(ret))
.block_manager } else {
.resync #[allow(clippy::collapsible_else_if)]
.set_n_workers(*worker_count) if let Some(v) = variable {
.await?; Ok(AdminRpc::WorkerVars(vec![(
Ok(AdminRpc::Ok("Number of resync workers updated".into())) self.garage.system.id,
v.clone(),
self.garage.bg_vars.get(v)?,
)]))
} else {
let mut vars = self.garage.bg_vars.get_all();
vars.sort();
Ok(AdminRpc::WorkerVars(
vars.into_iter()
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
.collect(),
))
}
}
}
async fn handle_set_var(
&self,
all_nodes: bool,
variable: &str,
value: &str,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let ring = self.garage.system.ring.borrow().clone();
for node in ring.layout.node_ids().iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Set {
all_nodes: false,
variable: variable.to_string(),
value: value.to_string(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
} }
WorkerSetCmd::ResyncTranquility { tranquility } => { }
self.garage Ok(AdminRpc::WorkerVars(ret))
.block_manager } else {
.resync self.garage.bg_vars.set(variable, value)?;
.set_tranquility(*tranquility) Ok(AdminRpc::WorkerVars(vec![(
.await?; self.garage.system.id,
Ok(AdminRpc::Ok("Resync tranquility updated".into())) variable.to_string(),
} value.to_string(),
}, )]))
} }
} }

View file

@ -191,6 +191,9 @@ pub async fn cmd_admin(
AdminRpc::WorkerList(wi, wlo) => { AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo); print_worker_list(wi, wlo);
} }
AdminRpc::WorkerVars(wv) => {
print_worker_vars(wv);
}
AdminRpc::WorkerInfo(tid, wi) => { AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi); print_worker_info(tid, wi);
} }

View file

@ -517,11 +517,25 @@ pub enum WorkerOperation {
/// Get detailed information about a worker /// Get detailed information about a worker
#[structopt(name = "info", version = garage_version())] #[structopt(name = "info", version = garage_version())]
Info { tid: usize }, Info { tid: usize },
/// Get worker parameter
#[structopt(name = "get", version = garage_version())]
Get {
/// Gather variable values from all nodes
#[structopt(short = "a", long = "all-nodes")]
all_nodes: bool,
/// Variable name to get, or none to get all variables
variable: Option<String>,
},
/// Set worker parameter /// Set worker parameter
#[structopt(name = "set", version = garage_version())] #[structopt(name = "set", version = garage_version())]
Set { Set {
#[structopt(subcommand)] /// Set variable values on all nodes
opt: WorkerSetCmd, #[structopt(short = "a", long = "all-nodes")]
all_nodes: bool,
/// Variable node to set
variable: String,
/// Value to set the variable to
value: String,
}, },
} }
@ -535,19 +549,6 @@ pub struct WorkerListOpt {
pub errors: bool, pub errors: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerSetCmd {
/// Set tranquility of scrub operations
#[structopt(name = "scrub-tranquility", version = garage_version())]
ScrubTranquility { tranquility: u32 },
/// Set number of concurrent block resync workers
#[structopt(name = "resync-worker-count", version = garage_version())]
ResyncWorkerCount { worker_count: usize },
/// Set tranquility of block resync operations
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation { pub enum BlockOperation {
/// List all blocks that currently have a resync error /// List all blocks that currently have a resync error

View file

@ -357,6 +357,14 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
format_table(table); format_table(table);
} }
pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
let table = wv
.into_iter()
.map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
.collect::<Vec<_>>();
format_table(table);
}
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) { pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec(); let now = now_msec();
let tf = timeago::Formatter::new(); let tf = timeago::Formatter::new();

View file

@ -51,7 +51,11 @@ pub async fn launch_online_repair(
ScrubCmd::Resume => ScrubWorkerCommand::Resume, ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => { ScrubCmd::SetTranquility { tranquility } => {
ScrubWorkerCommand::SetTranquility(tranquility) garage
.block_manager
.scrub_persister
.set_with(|x| x.tranquility = tranquility)?;
return Ok(());
} }
}; };
info!("Sending command to scrub worker: {:?}", cmd); info!("Sending command to scrub worker: {:?}", cmd);

View file

@ -33,6 +33,8 @@ use crate::k2v::{item_table::*, poll::*, rpc::*};
pub struct Garage { pub struct Garage {
/// The parsed configuration Garage is running /// The parsed configuration Garage is running
pub config: Config, pub config: Config,
/// The set of background variables that can be viewed/modified at runtime
pub bg_vars: vars::BgVars,
/// The replication mode of this cluster /// The replication mode of this cluster
pub replication_mode: ReplicationMode, pub replication_mode: ReplicationMode,
@ -249,9 +251,14 @@ impl Garage {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
// Initialize bg vars
let mut bg_vars = vars::BgVars::new();
block_manager.register_bg_vars(&mut bg_vars);
// -- done -- // -- done --
Ok(Arc::new(Self { Ok(Arc::new(Self {
config, config,
bg_vars,
replication_mode, replication_mode,
db, db,
system, system,

View file

@ -1,5 +1,6 @@
//! Job runner for futures and async functions //! Job runner for futures and async functions
pub mod vars;
pub mod worker; pub mod worker;
use std::collections::HashMap; use std::collections::HashMap;

113
src/util/background/vars.rs Normal file
View file

@ -0,0 +1,113 @@
use std::collections::HashMap;
use std::str::FromStr;
use crate::error::{Error, OkOrMessage};
use crate::migrate::Migrate;
use crate::persister::PersisterShared;
pub struct BgVars {
vars: HashMap<&'static str, Box<dyn BgVarTrait>>,
}
impl BgVars {
pub fn new() -> Self {
Self {
vars: HashMap::new(),
}
}
pub fn register_rw<V, T, GF, SF>(
&mut self,
p: &PersisterShared<V>,
name: &'static str,
get_fn: GF,
set_fn: SF,
) where
V: Migrate + Default + Send + Sync,
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
SF: Fn(&PersisterShared<V>, T) -> Result<(), Error> + Send + Sync + 'static,
{
let p1 = p.clone();
let get_fn = move || get_fn(&p1);
let p2 = p.clone();
let set_fn = move |v| set_fn(&p2, v);
self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
}
pub fn register_ro<V, T, GF>(&mut self, p: &PersisterShared<V>, name: &'static str, get_fn: GF)
where
V: Migrate + Default + Send + Sync,
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn(&PersisterShared<V>) -> T + Send + Sync + 'static,
{
let p1 = p.clone();
let get_fn = move || get_fn(&p1);
let set_fn = move |_| Err(Error::Message(format!("Cannot set value of {}", name)));
self.vars.insert(name, Box::new(BgVar { get_fn, set_fn }));
}
pub fn get(&self, var: &str) -> Result<String, Error> {
Ok(self
.vars
.get(var)
.ok_or_message("variable does not exist")?
.get())
}
pub fn get_all(&self) -> Vec<(&'static str, String)> {
self.vars.iter().map(|(k, v)| (*k, v.get())).collect()
}
pub fn set(&self, var: &str, val: &str) -> Result<(), Error> {
self.vars
.get(var)
.ok_or_message("variable does not exist")?
.set(val)
}
}
impl Default for BgVars {
fn default() -> Self {
Self::new()
}
}
// ----
trait BgVarTrait: Send + Sync + 'static {
fn get(&self) -> String;
fn set(&self, v: &str) -> Result<(), Error>;
}
struct BgVar<T, GF, SF>
where
T: FromStr + ToString + Send + Sync + 'static,
GF: Fn() -> T + Send + Sync + 'static,
SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
{
get_fn: GF,
set_fn: SF,
}
impl<T, GF, SF> BgVarTrait for BgVar<T, GF, SF>
where
T: FromStr + ToString + Sync + Send + 'static,
GF: Fn() -> T + Sync + Send + 'static,
SF: Fn(T) -> Result<(), Error> + Sync + Send + 'static,
{
fn get(&self) -> String {
(self.get_fn)().to_string()
}
fn set(&self, vstr: &str) -> Result<(), Error> {
let value = vstr
.parse()
.map_err(|_| Error::Message(format!("invalid value: {}", vstr)))?;
(self.set_fn)(value)
}
}

View file

@ -15,6 +15,5 @@ pub mod metrics;
pub mod migrate; pub mod migrate;
pub mod persister; pub mod persister;
pub mod time; pub mod time;
pub mod token_bucket;
pub mod tranquilizer; pub mod tranquilizer;
pub mod version; pub mod version;

View file

@ -1,5 +1,6 @@
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
@ -84,3 +85,36 @@ impl<T: Migrate> Persister<T> {
Ok(()) Ok(())
} }
} }
pub struct PersisterShared<V: Migrate + Default>(Arc<(Persister<V>, RwLock<V>)>);
impl<V: Migrate + Default> Clone for PersisterShared<V> {
fn clone(&self) -> PersisterShared<V> {
PersisterShared(self.0.clone())
}
}
impl<V: Migrate + Default> PersisterShared<V> {
pub fn new(base_dir: &Path, file_name: &str) -> Self {
let persister = Persister::new(base_dir, file_name);
let value = persister.load().unwrap_or_default();
Self(Arc::new((persister, RwLock::new(value))))
}
pub fn get_with<F, R>(&self, f: F) -> R
where
F: FnOnce(&V) -> R,
{
let value = self.0 .1.read().unwrap();
f(&value)
}
pub fn set_with<F>(&self, f: F) -> Result<(), Error>
where
F: FnOnce(&mut V),
{
let mut value = self.0 .1.write().unwrap();
f(&mut value);
self.0 .0.save(&value)
}
}

View file

@ -1,40 +0,0 @@
use std::time::{Duration, Instant};
use tokio::time::sleep;
pub struct TokenBucket {
// Replenish rate: number of tokens per second
replenish_rate: u64,
// Current number of tokens
tokens: u64,
// Last replenish time
last_replenish: Instant,
}
impl TokenBucket {
pub fn new(replenish_rate: u64) -> Self {
Self {
replenish_rate,
tokens: 0,
last_replenish: Instant::now(),
}
}
pub async fn take(&mut self, tokens: u64) {
while self.tokens < tokens {
let needed = tokens - self.tokens;
let delay = (needed as f64) / (self.replenish_rate as f64);
sleep(Duration::from_secs_f64(delay)).await;
self.replenish();
}
self.tokens -= tokens;
}
pub fn replenish(&mut self) {
let now = Instant::now();
let new_tokens =
((now - self.last_replenish).as_secs_f64() * (self.replenish_rate as f64)) as u64;
self.tokens += new_tokens;
self.last_replenish = now;
}
}