diff --git a/Cargo.lock b/Cargo.lock
index 8de73002..c45ee015 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1049,6 +1049,7 @@ dependencies = [
 name = "garage_block"
 version = "0.7.0"
 dependencies = [
+ "arc-swap",
  "async-trait",
  "bytes 1.1.0",
  "futures",
diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml
index 80346aca..2555a44a 100644
--- a/src/block/Cargo.toml
+++ b/src/block/Cargo.toml
@@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
 
 opentelemetry = "0.17"
 
+arc-swap = "1.5"
 async-trait = "0.1.7"
 bytes = "1.0"
 hex = "0.4"
diff --git a/src/block/manager.rs b/src/block/manager.rs
index 27f51ff8..015ac71b 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -3,6 +3,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;
 
+use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};
 
@@ -10,7 +11,7 @@ use futures::future::*;
 use tokio::fs;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::select;
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::sync::{mpsc, watch, Mutex, Notify};
 
 use opentelemetry::{
 	trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@@ -35,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
 use crate::block::*;
 use crate::metrics::*;
 use crate::rc::*;
+use crate::repair::*;
 
 /// Size under which data will be stored inlined in database instead of as files
 pub const INLINE_THRESHOLD: usize = 3072;
@@ -86,6 +88,8 @@ pub struct BlockManager {
 	pub replication: TableShardedReplication,
 	/// Directory in which block are stored
 	pub data_dir: PathBuf,
+	/// State store (only used by scrub worker to store time of last scrub)
+	pub(crate) state_variables_store: db::Tree,
 
 	compression_level: Option<i32>,
 	background_tranquility: u32,
@@ -102,6 +106,8 @@ pub struct BlockManager {
 	endpoint: Arc<Endpoint<BlockRpc, BlockManager>>,
 
 	metrics: BlockManagerMetrics,
+
+	tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
 }
 
 // This custom struct contains functions that must only be ran
@@ -141,6 +147,10 @@ impl BlockManager {
 		let resync_errors =
 			CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
 
+		let state_variables_store = db
+			.open_tree("state_variables")
+			.expect("Unable to open state_variables tree");
+
 		let endpoint = system
 			.netapp
 			.endpoint("garage_block/manager.rs/Rpc".to_string());
@@ -159,13 +169,15 @@ impl BlockManager {
 			resync_queue,
 			resync_notify: Notify::new(),
 			resync_errors,
+			state_variables_store,
 			system,
 			endpoint,
 			metrics,
+			tx_scrub_command: ArcSwapOption::new(None),
 		});
 		block_manager.endpoint.set_handler(block_manager.clone());
 
-		block_manager.clone().spawn_background_worker();
+		block_manager.clone().spawn_background_workers();
 
 		block_manager
 	}
@@ -242,6 +254,17 @@ impl BlockManager {
 		Ok(self.rc.rc.len()?)
 	}
 
+	/// Send command to start/stop/manage scrub worker
+	pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+		let _ = self
+			.tx_scrub_command
+			.load()
+			.as_ref()
+			.unwrap()
+			.send(cmd)
+			.await;
+	}
+
 	//// ----- Managing the reference counter ----
 
 	/// Increment the number of time a block is used, putting it to resynchronization if it is
@@ -475,11 +498,11 @@ impl BlockManager {
 	// for times that are earlier than the exponential back-off delay
 	// is a natural condition that is handled properly).
 
-	fn spawn_background_worker(self: Arc<Self>) {
+	fn spawn_background_workers(self: Arc<Self>) {
 		// Launch a background workers for background resync loop processing
 		let background = self.system.background.clone();
 		let worker = ResyncWorker {
-			manager: self,
+			manager: self.clone(),
 			tranquilizer: Tranquilizer::new(30),
 			next_delay: Duration::from_secs(10),
 		};
@@ -487,6 +510,12 @@ impl BlockManager {
 			tokio::time::sleep(Duration::from_secs(10)).await;
 			background.spawn_worker(worker);
 		});
+
+		// Launch a background worker for data store scrubs
+		let (scrub_tx, scrub_rx) = mpsc::channel(1);
+		self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+		let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4);
+		self.system.background.spawn_worker(scrub_worker);
 	}
 
 	pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
diff --git a/src/block/repair.rs b/src/block/repair.rs
index a2a8443e..8335de51 100644
--- a/src/block/repair.rs
+++ b/src/block/repair.rs
@@ -1,20 +1,26 @@
 use core::ops::Bound;
+use std::convert::TryInto;
 use std::path::PathBuf;
-
 use std::sync::Arc;
 use std::time::Duration;
 
 use async_trait::async_trait;
 use tokio::fs;
+use tokio::select;
+use tokio::sync::mpsc;
 use tokio::sync::watch;
 
 use garage_util::background::*;
 use garage_util::data::*;
 use garage_util::error::*;
+use garage_util::time::*;
 use garage_util::tranquilizer::Tranquilizer;
 
 use crate::manager::*;
 
+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+const TIME_LAST_COMPLETE_SCRUB: &[u8] = b"time_last_complete_scrub";
+
 pub struct RepairWorker {
 	manager: Arc<BlockManager>,
 	next_start: Option<Hash>,
@@ -129,19 +135,107 @@ impl Worker for RepairWorker {
 
 pub struct ScrubWorker {
 	manager: Arc<BlockManager>,
-	iterator: BlockStoreIterator,
+	rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+
+	work: ScrubWorkerState,
 	tranquilizer: Tranquilizer,
 	tranquility: u32,
+
+	time_last_complete_scrub: u64,
+}
+
+enum ScrubWorkerState {
+	Running(BlockStoreIterator),
+	Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+	Finished,
+}
+
+impl Default for ScrubWorkerState {
+	fn default() -> Self {
+		ScrubWorkerState::Finished
+	}
+}
+
+pub enum ScrubWorkerCommand {
+	Start,
+	Pause(Duration),
+	Resume,
+	Cancel,
+	SetTranquility(u32),
 }
 
 impl ScrubWorker {
-	pub fn new(manager: Arc<BlockManager>, tranquility: u32) -> Self {
-		let iterator = BlockStoreIterator::new(&manager);
+	pub fn new(
+		manager: Arc<BlockManager>,
+		rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+		tranquility: u32,
+	) -> Self {
+		let time_last_complete_scrub = match manager
+			.state_variables_store
+			.get(TIME_LAST_COMPLETE_SCRUB)
+			.expect("DB error when initializing scrub worker")
+		{
+			Some(v) => u64::from_be_bytes(v.try_into().unwrap()),
+			None => 0,
+		};
 		Self {
 			manager,
-			iterator,
+			rx_cmd,
+			work: ScrubWorkerState::Finished,
 			tranquilizer: Tranquilizer::new(30),
 			tranquility,
+			time_last_complete_scrub,
+		}
+	}
+
+	fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
+		match cmd {
+			ScrubWorkerCommand::Start => {
+				self.work = match std::mem::take(&mut self.work) {
+					ScrubWorkerState::Finished => {
+						let iterator = BlockStoreIterator::new(&self.manager);
+						ScrubWorkerState::Running(iterator)
+					}
+					work => {
+						error!("Cannot start scrub worker: already running!");
+						work
+					}
+				};
+			}
+			ScrubWorkerCommand::Pause(dur) => {
+				self.work = match std::mem::take(&mut self.work) {
+					ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
+						ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+					}
+					work => {
+						error!("Cannot pause scrub worker: not running!");
+						work
+					}
+				};
+			}
+			ScrubWorkerCommand::Resume => {
+				self.work = match std::mem::take(&mut self.work) {
+					ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+					work => {
+						error!("Cannot resume scrub worker: not paused!");
+						work
+					}
+				};
+			}
+			ScrubWorkerCommand::Cancel => {
+				self.work = match std::mem::take(&mut self.work) {
+					ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+						ScrubWorkerState::Finished
+					}
+					work => {
+						error!("Cannot cancel scrub worker: not running!");
+						work
+					}
+				}
+			}
+			ScrubWorkerCommand::SetTranquility(t) => {
+				self.tranquility = t;
+			}
 		}
 	}
 }
@@ -153,24 +247,80 @@ impl Worker for ScrubWorker {
 	}
 
 	fn info(&self) -> Option<String> {
-		Some(format!("{:.2}% done", self.iterator.progress() * 100.))
+		match &self.work {
+			ScrubWorkerState::Running(bsi) => Some(format!("{:.2}% done", bsi.progress() * 100.)),
+			ScrubWorkerState::Paused(_bsi, rt) => {
+				Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt)))
+			}
+			ScrubWorkerState::Finished => Some(format!(
+				"Last completed scrub: {}",
+				msec_to_rfc3339(self.time_last_complete_scrub)
+			)),
+		}
 	}
 
 	async fn work(
 		&mut self,
 		_must_exit: &mut watch::Receiver<bool>,
 	) -> Result<WorkerStatus, Error> {
-		self.tranquilizer.reset();
-		if let Some(hash) = self.iterator.next().await? {
-			let _ = self.manager.read_block(&hash).await;
-			Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
-		} else {
-			Ok(WorkerStatus::Done)
+		match self.rx_cmd.try_recv() {
+			Ok(cmd) => self.handle_cmd(cmd),
+			Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done),
+			Err(mpsc::error::TryRecvError::Empty) => (),
+		};
+
+		match &mut self.work {
+			ScrubWorkerState::Running(bsi) => {
+				self.tranquilizer.reset();
+				if let Some(hash) = bsi.next().await? {
+					let _ = self.manager.read_block(&hash).await;
+					Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
+				} else {
+					self.time_last_complete_scrub = now_msec();
+					self.manager.state_variables_store.insert(
+						TIME_LAST_COMPLETE_SCRUB,
+						u64::to_be_bytes(self.time_last_complete_scrub),
+					)?;
+					self.work = ScrubWorkerState::Finished;
+					self.tranquilizer.clear();
+					Ok(WorkerStatus::Idle)
+				}
+			}
+			_ => Ok(WorkerStatus::Idle),
 		}
 	}
 
 	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
-		unreachable!()
+		match &self.work {
+			ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
+			ScrubWorkerState::Paused(_, resume_time) => {
+				let delay = Duration::from_millis(resume_time.saturating_sub(now_msec()));
+				select! {
+					_ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume),
+					cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+						self.handle_cmd(cmd);
+					} else {
+						return WorkerStatus::Done;
+					}
+				}
+			}
+			ScrubWorkerState::Finished => {
+				let delay = SCRUB_INTERVAL
+					.saturating_sub(Duration::from_millis(now_msec() - self.time_last_complete_scrub));
+				select! {
+					_ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start),
+					cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+						self.handle_cmd(cmd);
+					} else {
+						return WorkerStatus::Done;
+					}
+				}
+			}
+		}
+		match &self.work {
+			ScrubWorkerState::Running(_) => WorkerStatus::Busy,
+			_ => WorkerStatus::Idle,
+		}
 	}
 }
diff --git a/src/garage/admin.rs b/src/garage/admin.rs
index de49331e..71ee608c 100644
--- a/src/garage/admin.rs
+++ b/src/garage/admin.rs
@@ -698,7 +698,7 @@ impl AdminRpcHandler {
 				)))
 			}
 		} else {
-			launch_online_repair(self.garage.clone(), opt);
+			launch_online_repair(self.garage.clone(), opt).await;
 			Ok(AdminRpc::Ok(format!(
 				"Repair launched on {:?}",
 				self.garage.system.id
diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs
index c1ee32ab..bc44b5ef 100644
--- a/src/garage/cli/structs.rs
+++ b/src/garage/cli/structs.rs
@@ -427,8 +427,29 @@ pub enum RepairWhat {
 	/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
 	#[structopt(name = "scrub")]
 	Scrub {
-		/// Tranquility factor (see tranquilizer documentation)
-		#[structopt(name = "tranquility", default_value = "2")]
+		#[structopt(subcommand)]
+		cmd: ScrubCmd,
+	},
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+	/// Start scrub
+	#[structopt(name = "start")]
+	Start,
+	/// Pause scrub (it will resume automatically after 24 hours)
+	#[structopt(name = "pause")]
+	Pause,
+	/// Resume paused scrub
+	#[structopt(name = "resume")]
+	Resume,
+	/// Cancel scrub in progress
+	#[structopt(name = "cancel")]
+	Cancel,
+	/// Set tranquility level for in-progress and future scrubs
+	#[structopt(name = "set-tranquility")]
+	SetTranquility {
+		#[structopt()]
 		tranquility: u32,
 	},
 }
diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs
index b0437c5e..8207a8b4 100644
--- a/src/garage/repair/online.rs
+++ b/src/garage/repair/online.rs
@@ -1,8 +1,10 @@
 use std::sync::Arc;
+use std::time::Duration;
 
 use async_trait::async_trait;
 use tokio::sync::watch;
 
+use garage_block::repair::ScrubWorkerCommand;
 use garage_model::garage::Garage;
 use garage_model::s3::block_ref_table::*;
 use garage_model::s3::object_table::*;
@@ -13,7 +15,7 @@ use garage_util::error::Error;
 
 use crate::*;
 
-pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
 	match opt.what {
 		RepairWhat::Tables => {
 			info!("Launching a full sync of tables");
@@ -43,14 +45,18 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
 				garage.block_manager.clone(),
 			));
 		}
-		RepairWhat::Scrub { tranquility } => {
+		RepairWhat::Scrub { cmd } => {
 			info!("Verifying integrity of stored blocks");
-			garage
-				.background
-				.spawn_worker(garage_block::repair::ScrubWorker::new(
-					garage.block_manager.clone(),
-					tranquility,
-				));
+			let cmd = match cmd {
+				ScrubCmd::Start => ScrubWorkerCommand::Start,
+				ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
+				ScrubCmd::Resume => ScrubWorkerCommand::Resume,
+				ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
+				ScrubCmd::SetTranquility { tranquility } => {
+					ScrubWorkerCommand::SetTranquility(tranquility)
+				}
+			};
+			garage.block_manager.send_scrub_command(cmd).await;
 		}
 	}
 }
diff --git a/src/util/tranquilizer.rs b/src/util/tranquilizer.rs
index f0c2b410..9c796f8b 100644
--- a/src/util/tranquilizer.rs
+++ b/src/util/tranquilizer.rs
@@ -71,4 +71,8 @@ impl Tranquilizer {
 	pub fn reset(&mut self) {
 		self.last_step_begin = Instant::now();
 	}
+
+	pub fn clear(&mut self) {
+		self.observations.clear();
+	}
 }
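
For reference, the scrub worker is driven entirely through BlockManager::send_scrub_command, as launch_online_repair does above. A minimal sketch of another call site, assuming the Garage struct from garage_model with its public block_manager field (the helper name pause_scrub_for_a_day is illustrative, not part of this patch):

	use std::time::Duration;

	use garage_block::repair::ScrubWorkerCommand;
	use garage_model::garage::Garage;

	// Ask the scrub worker to pause for 24 hours; per ScrubWorkerState::Paused,
	// it resumes automatically once the deadline has passed.
	async fn pause_scrub_for_a_day(garage: &Garage) {
		garage
			.block_manager
			.send_scrub_command(ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)))
			.await;
	}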