Ability to dynamically set resync tranquility

This commit is contained in:
Alex 2022-09-02 15:34:21 +02:00
parent 532eca7ff9
commit 943d76c583
Signed by: lx
GPG key ID: 0E496D15096376BE
6 changed files with 108 additions and 23 deletions

View file

@ -3,7 +3,7 @@ use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -25,6 +25,7 @@ use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
@ -55,6 +56,10 @@ const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour) // The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour)
const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
// Resync tranquility is initially set to 2, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_RESYNC_TRANQUILITY: u32 = 2;
// The delay between the moment when the reference counter // The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves // drops to zero, and the moment where we allow ourselves
// to delete the block locally. // to delete the block locally.
@ -90,7 +95,6 @@ pub struct BlockManager {
pub data_dir: PathBuf, pub data_dir: PathBuf,
compression_level: Option<i32>, compression_level: Option<i32>,
background_tranquility: u32,
mutation_lock: Mutex<BlockManagerLocked>, mutation_lock: Mutex<BlockManagerLocked>,
@ -100,6 +104,9 @@ pub struct BlockManager {
resync_notify: Notify, resync_notify: Notify,
resync_errors: CountedTree, resync_errors: CountedTree,
resync_persister: Persister<ResyncPersistedConfig>,
resync_persisted: ArcSwap<ResyncPersistedConfig>,
pub(crate) system: Arc<System>, pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>, endpoint: Arc<Endpoint<BlockRpc, Self>>,
@ -124,7 +131,6 @@ impl BlockManager {
db: &db::Db, db: &db::Db,
data_dir: PathBuf, data_dir: PathBuf,
compression_level: Option<i32>, compression_level: Option<i32>,
background_tranquility: u32,
replication: TableShardedReplication, replication: TableShardedReplication,
system: Arc<System>, system: Arc<System>,
) -> Arc<Self> { ) -> Arc<Self> {
@ -145,6 +151,14 @@ impl BlockManager {
let resync_errors = let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors"); CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
let resync_persister = Persister::new(&system.metadata_dir, "resync_cfg");
let resync_persisted = match resync_persister.load() {
Ok(v) => v,
Err(_) => ResyncPersistedConfig {
tranquility: INITIAL_RESYNC_TRANQUILITY,
},
};
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint("garage_block/manager.rs/Rpc".to_string()); .endpoint("garage_block/manager.rs/Rpc".to_string());
@ -157,12 +171,13 @@ impl BlockManager {
replication, replication,
data_dir, data_dir,
compression_level, compression_level,
background_tranquility,
mutation_lock: Mutex::new(manager_locked), mutation_lock: Mutex::new(manager_locked),
rc, rc,
resync_queue, resync_queue,
resync_notify: Notify::new(), resync_notify: Notify::new(),
resync_errors, resync_errors,
resync_persister,
resync_persisted: ArcSwap::new(Arc::new(resync_persisted)),
system, system,
endpoint, endpoint,
metrics, metrics,
@ -716,6 +731,23 @@ impl BlockManager {
Ok(()) Ok(())
} }
async fn update_resync_persisted(
&self,
update: impl Fn(&mut ResyncPersistedConfig),
) -> Result<(), Error> {
let mut cfg: ResyncPersistedConfig = *self.resync_persisted.load().as_ref();
update(&mut cfg);
self.resync_persister.save_async(&cfg).await?;
self.resync_persisted.store(Arc::new(cfg));
self.resync_notify.notify_one();
Ok(())
}
pub async fn set_resync_tranquility(&self, tranquility: u32) -> Result<(), Error> {
self.update_resync_persisted(|cfg| cfg.tranquility = tranquility)
.await
}
} }
#[async_trait] #[async_trait]
@ -734,6 +766,11 @@ impl EndpointHandler<BlockRpc> for BlockManager {
} }
} }
#[derive(Serialize, Deserialize, Clone, Copy)]
struct ResyncPersistedConfig {
tranquility: u32,
}
struct ResyncWorker { struct ResyncWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
@ -758,19 +795,22 @@ impl Worker for ResyncWorker {
fn info(&self) -> Option<String> { fn info(&self) -> Option<String> {
let mut ret = vec![]; let mut ret = vec![];
ret.push(format!(
"tranquility = {}",
self.manager.resync_persisted.load().tranquility
));
let qlen = self.manager.resync_queue_len().unwrap_or(0); let qlen = self.manager.resync_queue_len().unwrap_or(0);
let elen = self.manager.resync_errors_len().unwrap_or(0);
if qlen > 0 { if qlen > 0 {
ret.push(format!("{} blocks in queue", qlen)); ret.push(format!("{} blocks in queue", qlen));
} }
let elen = self.manager.resync_errors_len().unwrap_or(0);
if elen > 0 { if elen > 0 {
ret.push(format!("{} blocks in error state", elen)); ret.push(format!("{} blocks in error state", elen));
} }
if !ret.is_empty() {
Some(ret.join(", ")) Some(ret.join(", "))
} else {
None
}
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
@ -778,7 +818,7 @@ impl Worker for ResyncWorker {
match self.manager.resync_iter().await { match self.manager.resync_iter().await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self Ok(ResyncIterResult::BusyDidSomething) => Ok(self
.tranquilizer .tranquilizer
.tranquilize_worker(self.manager.background_tranquility)), .tranquilize_worker(self.manager.resync_persisted.load().tranquility)),
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy), Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => { Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay; self.next_delay = delay;

View file

@ -19,7 +19,17 @@ use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*; use crate::manager::*;
const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days // Full scrub every 30 days
const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30);
// Scrub tranquility is initially set to 4, but can be changed in the CLI
// and the updated version is persisted over Garage restarts
const INITIAL_SCRUB_TRANQUILITY: u32 = 4;
// ---- ---- ----
// FIRST KIND OF REPAIR: FINDING MISSING BLOCKS/USELESS BLOCKS
// This is a one-shot repair operation that can be launched,
// checks everything, and then exits.
// ---- ---- ----
pub struct RepairWorker { pub struct RepairWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
@ -128,7 +138,13 @@ impl Worker for RepairWorker {
} }
} }
// ---- // ---- ---- ----
// SECOND KIND OF REPAIR: SCRUBBING THE DATASTORE
// This is significantly more complex than the process above,
// as it is a continuously-running task that triggers automatically
// every SCRUB_INTERVAL, but can also be triggered manually
// and whose parameter (esp. speed) can be controlled at runtime.
// ---- ---- ----
pub struct ScrubWorker { pub struct ScrubWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
@ -176,7 +192,7 @@ impl ScrubWorker {
Ok(v) => v, Ok(v) => v,
Err(_) => ScrubWorkerPersisted { Err(_) => ScrubWorkerPersisted {
time_last_complete_scrub: 0, time_last_complete_scrub: 0,
tranquility: 4, tranquility: INITIAL_SCRUB_TRANQUILITY,
corruptions_detected: 0, corruptions_detected: 0,
}, },
}; };
@ -343,7 +359,9 @@ impl Worker for ScrubWorker {
} }
} }
// ---- // ---- ---- ----
// UTILITY FOR ENUMERATING THE BLOCK STORE
// ---- ---- ----
struct BlockStoreIterator { struct BlockStoreIterator {
path: Vec<ReadingDir>, path: Vec<ReadingDir>,

View file

@ -15,6 +15,8 @@ use garage_table::*;
use garage_rpc::*; use garage_rpc::*;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*; use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*; use garage_model::bucket_table::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
@ -836,6 +838,23 @@ impl AdminRpcHandler {
let workers = self.garage.background.get_worker_info(); let workers = self.garage.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, opt)) Ok(AdminRpc::WorkerList(workers, opt))
} }
WorkerCmd::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
.await;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
}
WorkerSetCmd::ResyncTranquility { tranquility } => {
self.garage
.block_manager
.set_resync_tranquility(tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
} }
} }
} }

View file

@ -501,6 +501,12 @@ pub enum WorkerCmd {
#[structopt(flatten)] #[structopt(flatten)]
opt: WorkerListOpt, opt: WorkerListOpt,
}, },
/// Set worker parameter
#[structopt(name = "set", version = version::garage())]
Set {
#[structopt(subcommand)]
opt: WorkerSetCmd,
},
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] #[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
@ -512,3 +518,13 @@ pub struct WorkerListOpt {
#[structopt(short = "e", long = "errors")] #[structopt(short = "e", long = "errors")]
pub errors: bool, pub errors: bool,
} }
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerSetCmd {
/// Set tranquility of scrub operations
#[structopt(name = "scrub-tranquility", version = version::garage())]
ScrubTranquility { tranquility: u32 },
/// Set tranquility of resync operations
#[structopt(name = "resync-tranquility", version = version::garage())]
ResyncTranquility { tranquility: u32 },
}

View file

@ -159,7 +159,6 @@ impl Garage {
&db, &db,
config.data_dir.clone(), config.data_dir.clone(),
config.compression_level, config.compression_level,
config.block_manager_background_tranquility,
data_rep_param, data_rep_param,
system.clone(), system.clone(),
); );

View file

@ -23,10 +23,6 @@ pub struct Config {
#[serde(default = "default_block_size")] #[serde(default = "default_block_size")]
pub block_size: usize, pub block_size: usize,
/// Size of data blocks to save to disk
#[serde(default = "default_block_manager_background_tranquility")]
pub block_manager_background_tranquility: u32,
/// Replication mode. Supported values: /// Replication mode. Supported values:
/// - none, 1 -> no replication /// - none, 1 -> no replication
/// - 2 -> 2-way replication /// - 2 -> 2-way replication
@ -147,9 +143,6 @@ fn default_sled_flush_every_ms() -> u64 {
fn default_block_size() -> usize { fn default_block_size() -> usize {
1048576 1048576
} }
fn default_block_manager_background_tranquility() -> u32 {
2
}
/// Read and parse configuration /// Read and parse configuration
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> { pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {