use core::ops::Bound; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; use rand::Rng; use tokio::fs; use tokio::select; use tokio::sync::mpsc; use tokio::sync::watch; use garage_util::background::*; use garage_util::data::*; use garage_util::error::*; use garage_util::persister::PersisterShared; use garage_util::time::*; use garage_util::tranquilizer::Tranquilizer; use crate::layout::*; use crate::manager::*; // Full scrub every 25 days with a random element of 10 days mixed in below const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 25); // Scrub tranquility is initially set to 4, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_SCRUB_TRANQUILITY: u32 = 4; // ---- ---- ---- // FIRST KIND OF REPAIR: FINDING MISSING BLOCKS/USELESS BLOCKS // This is a one-shot repair operation that can be launched, // checks everything, and then exits. // ---- ---- ---- pub struct RepairWorker { manager: Arc, next_start: Option, block_iter: Option, } impl RepairWorker { pub fn new(manager: Arc) -> Self { Self { manager, next_start: None, block_iter: None, } } } #[async_trait] impl Worker for RepairWorker { fn name(&self) -> String { "Block repair worker".into() } fn status(&self) -> WorkerStatus { match self.block_iter.as_ref() { None => { let idx_bytes = self .next_start .as_ref() .map(|x| x.as_slice()) .unwrap_or(&[]); let idx_bytes = if idx_bytes.len() > 4 { &idx_bytes[..4] } else { idx_bytes }; WorkerStatus { progress: Some("0.00%".into()), freeform: vec![format!( "Currently in phase 1, iterator position: {}", hex::encode(idx_bytes) )], ..Default::default() } } Some(bi) => WorkerStatus { progress: Some(format!("{:.2}%", bi.progress() * 100.)), freeform: vec!["Currently in phase 2".into()], ..Default::default() }, } } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { match self.block_iter.as_mut() { None => { // Phase 1: Repair blocks from RC table. // We have to do this complicated two-step process where we first read a bunch // of hashes from the RC table, and then insert them in the to-resync queue, // because of SQLite. Basically, as long as we have an iterator on a DB table, // we can't do anything else on the DB. The naive approach (which we had previously) // of just iterating on the RC table and inserting items one to one in the resync // queue can't work here, it would just provoke a deadlock in the SQLite adapter code. // This is mostly because the Rust bindings for SQLite assume a worst-case scenario // where SQLite is not compiled in thread-safe mode, so we have to wrap everything // in a mutex (see db/sqlite_adapter.rs and discussion in PR #322). // TODO: maybe do this with tokio::task::spawn_blocking ? let mut batch_of_hashes = vec![]; let start_bound = match self.next_start.as_ref() { None => Bound::Unbounded, Some(x) => Bound::Excluded(x.as_slice()), }; for entry in self .manager .rc .rc .range::<&[u8], _>((start_bound, Bound::Unbounded))? { let (hash, _) = entry?; let hash = Hash::try_from(&hash[..]).unwrap(); batch_of_hashes.push(hash); if batch_of_hashes.len() >= 1000 { break; } } if batch_of_hashes.is_empty() { // move on to phase 2 self.block_iter = Some(BlockStoreIterator::new(&self.manager)); return Ok(WorkerState::Busy); } for hash in batch_of_hashes.into_iter() { self.manager .resync .put_to_resync(&hash, Duration::from_secs(0))?; self.next_start = Some(hash) } Ok(WorkerState::Busy) } Some(bi) => { // Phase 2: Repair blocks actually on disk // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. if let Some((_path, hash)) = bi.next().await? { self.manager .resync .put_to_resync(&hash, Duration::from_secs(0))?; Ok(WorkerState::Busy) } else { Ok(WorkerState::Done) } } } } async fn wait_for_work(&mut self) -> WorkerState { unreachable!() } } // ---- ---- ---- // SECOND KIND OF REPAIR: SCRUBBING THE DATASTORE // This is significantly more complex than the process above, // as it is a continuously-running task that triggers automatically // every SCRUB_INTERVAL, but can also be triggered manually // and whose parameter (esp. speed) can be controlled at runtime. // ---- ---- ---- mod v081 { use serde::{Deserialize, Serialize}; #[derive(Serialize, Deserialize)] pub struct ScrubWorkerPersisted { pub tranquility: u32, pub(crate) time_last_complete_scrub: u64, pub(crate) corruptions_detected: u64, } impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {} } mod v082 { use serde::{Deserialize, Serialize}; use super::v081; #[derive(Serialize, Deserialize)] pub struct ScrubWorkerPersisted { pub tranquility: u32, pub(crate) time_last_complete_scrub: u64, pub(crate) time_next_run_scrub: u64, pub(crate) corruptions_detected: u64, } impl garage_util::migrate::Migrate for ScrubWorkerPersisted { type Previous = v081::ScrubWorkerPersisted; const VERSION_MARKER: &'static [u8] = b"G082bswp"; fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted { use crate::repair::randomize_next_scrub_run_time; ScrubWorkerPersisted { tranquility: old.tranquility, time_last_complete_scrub: old.time_last_complete_scrub, time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub), corruptions_detected: old.corruptions_detected, } } } } pub use v082::*; pub struct ScrubWorker { manager: Arc, rx_cmd: mpsc::Receiver, work: ScrubWorkerState, tranquilizer: Tranquilizer, persister: PersisterShared, } fn randomize_next_scrub_run_time(timestamp: u64) -> u64 { // Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to // balance scrub load across different cluster nodes. timestamp + SCRUB_INTERVAL .saturating_add(Duration::from_secs( rand::thread_rng().gen_range(0..3600 * 24 * 10), )) .as_millis() as u64 } impl Default for ScrubWorkerPersisted { fn default() -> Self { ScrubWorkerPersisted { time_last_complete_scrub: 0, time_next_run_scrub: randomize_next_scrub_run_time(now_msec()), tranquility: INITIAL_SCRUB_TRANQUILITY, corruptions_detected: 0, } } } #[derive(Default)] enum ScrubWorkerState { Running(BlockStoreIterator), Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub #[default] Finished, } #[derive(Debug)] pub enum ScrubWorkerCommand { Start, Pause(Duration), Resume, Cancel, } impl ScrubWorker { pub(crate) fn new( manager: Arc, rx_cmd: mpsc::Receiver, persister: PersisterShared, ) -> Self { Self { manager, rx_cmd, work: ScrubWorkerState::Finished, tranquilizer: Tranquilizer::new(30), persister, } } async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) { match cmd { ScrubWorkerCommand::Start => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Finished => { info!("Scrub worker initializing, now performing datastore scrub"); let iterator = BlockStoreIterator::new(&self.manager); ScrubWorkerState::Running(iterator) } work => { error!("Cannot start scrub worker: already running!"); work } }; } ScrubWorkerCommand::Pause(dur) => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => { ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64) } work => { error!("Cannot pause scrub worker: not running!"); work } }; } ScrubWorkerCommand::Resume => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it), work => { error!("Cannot resume scrub worker: not paused!"); work } }; } ScrubWorkerCommand::Cancel => { self.work = match std::mem::take(&mut self.work) { ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => { ScrubWorkerState::Finished } work => { error!("Cannot cancel scrub worker: not running!"); work } } } } } } #[async_trait] impl Worker for ScrubWorker { fn name(&self) -> String { "Block scrub worker".into() } fn status(&self) -> WorkerStatus { let (corruptions_detected, tranquility, time_last_complete_scrub, time_next_run_scrub) = self.persister.get_with(|p| { ( p.corruptions_detected, p.tranquility, p.time_last_complete_scrub, p.time_next_run_scrub, ) }); let mut s = WorkerStatus { persistent_errors: Some(corruptions_detected), tranquility: Some(tranquility), ..Default::default() }; match &self.work { ScrubWorkerState::Running(bsi) => { s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); } ScrubWorkerState::Paused(bsi, rt) => { s.progress = Some(format!("{:.2}%", bsi.progress() * 100.)); s.freeform = vec![format!("Scrub paused, resumes at {}", msec_to_rfc3339(*rt))]; } ScrubWorkerState::Finished => { s.freeform = vec![ format!( "Last scrub completed at {}", msec_to_rfc3339(time_last_complete_scrub), ), format!( "Next scrub scheduled for {}", msec_to_rfc3339(time_next_run_scrub) ), ]; } } s } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { match self.rx_cmd.try_recv() { Ok(cmd) => self.handle_cmd(cmd).await, Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done), Err(mpsc::error::TryRecvError::Empty) => (), }; match &mut self.work { ScrubWorkerState::Running(bsi) => { self.tranquilizer.reset(); if let Some((_path, hash)) = bsi.next().await? { match self.manager.read_block(&hash).await { Err(Error::CorruptData(_)) => { error!("Found corrupt data block during scrub: {:?}", hash); self.persister.set_with(|p| p.corruptions_detected += 1)?; } Err(e) => return Err(e), _ => (), }; Ok(self .tranquilizer .tranquilize_worker(self.persister.get_with(|p| p.tranquility))) } else { let now = now_msec(); let next_scrub_timestamp = randomize_next_scrub_run_time(now); self.persister.set_with(|p| { p.time_last_complete_scrub = now; p.time_next_run_scrub = next_scrub_timestamp; })?; self.work = ScrubWorkerState::Finished; self.tranquilizer.clear(); info!( "Datastore scrub completed, next scrub scheduled for {}", msec_to_rfc3339(next_scrub_timestamp) ); Ok(WorkerState::Idle) } } _ => Ok(WorkerState::Idle), } } async fn wait_for_work(&mut self) -> WorkerState { let (wait_until, command) = match &self.work { ScrubWorkerState::Running(_) => return WorkerState::Busy, ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume), ScrubWorkerState::Finished => ( self.persister.get_with(|p| p.time_next_run_scrub), ScrubWorkerCommand::Start, ), }; let now = now_msec(); if now >= wait_until { self.handle_cmd(command).await; return WorkerState::Busy; } let delay = Duration::from_millis(wait_until - now); select! { _ = tokio::time::sleep(delay) => self.handle_cmd(command).await, cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { self.handle_cmd(cmd).await; } else { return WorkerState::Done; } } match &self.work { ScrubWorkerState::Running(_) => WorkerState::Busy, _ => WorkerState::Idle, } } } // ---- ---- ---- // UTILITY FOR ENUMERATING THE BLOCK STORE // ---- ---- ---- const PROGRESS_FP: u64 = 1_000_000_000; struct BlockStoreIterator { todo: Vec, } enum BsiTodo { Directory { path: PathBuf, progress_min: u64, progress_max: u64, }, File { path: PathBuf, filename: String, progress: u64, }, } impl BlockStoreIterator { fn new(manager: &BlockManager) -> Self { let min_cap = manager .data_layout .data_dirs .iter() .filter_map(|x| match x.state { DataDirState::Active { capacity } => Some(capacity), _ => None, }) .min() .unwrap_or(0); let sum_cap = manager .data_layout .data_dirs .iter() .map(|x| match x.state { DataDirState::Active { capacity } => capacity, _ => min_cap, // approximation }) .sum::() as u128; let mut cum_cap = 0; let mut todo = vec![]; for dir in manager.data_layout.data_dirs.iter() { let cap = match dir.state { DataDirState::Active { capacity } => capacity, _ => min_cap, }; let progress_min = ((cum_cap as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; let progress_max = (((cum_cap + cap) as u128 * PROGRESS_FP as u128) / (sum_cap as u128)) as u64; cum_cap += cap; todo.push(BsiTodo::Directory { path: dir.path.clone(), progress_min, progress_max, }); } // entries are processed back-to-front (because of .pop()), // so reverse entries to process them in increasing progress bounds todo.reverse(); let ret = Self { todo }; debug_assert!(ret.progress_invariant()); ret } /// Returns progress done, between 0 and 1 fn progress(&self) -> f32 { self.todo .last() .map(|x| match x { BsiTodo::Directory { progress_min, .. } => *progress_min, BsiTodo::File { progress, .. } => *progress, }) .map(|x| x as f32 / PROGRESS_FP as f32) .unwrap_or(1.0) } async fn next(&mut self) -> Result, Error> { loop { match self.todo.pop() { None => return Ok(None), Some(BsiTodo::Directory { path, progress_min, progress_max, }) => { let istart = self.todo.len(); let mut reader = fs::read_dir(&path).await?; while let Some(ent) = reader.next_entry().await? { let name = if let Ok(n) = ent.file_name().into_string() { n } else { continue; }; let ft = ent.file_type().await?; if ft.is_dir() && hex::decode(&name).is_ok() { self.todo.push(BsiTodo::Directory { path: ent.path(), progress_min: 0, progress_max: 0, }); } else if ft.is_file() { self.todo.push(BsiTodo::File { path: ent.path(), filename: name, progress: 0, }); } } let count = self.todo.len() - istart; for (i, ent) in self.todo[istart..].iter_mut().enumerate() { let p1 = progress_min + ((progress_max - progress_min) * i as u64) / count as u64; let p2 = progress_min + ((progress_max - progress_min) * (i + 1) as u64) / count as u64; match ent { BsiTodo::Directory { progress_min, progress_max, .. } => { *progress_min = p1; *progress_max = p2; } BsiTodo::File { progress, .. } => { *progress = p1; } } } self.todo[istart..].reverse(); debug_assert!(self.progress_invariant()); } Some(BsiTodo::File { path, filename, .. }) => { let filename = filename.strip_suffix(".zst").unwrap_or(&filename); if filename.len() == 64 { if let Ok(h) = hex::decode(filename) { let mut hash = [0u8; 32]; hash.copy_from_slice(&h); return Ok(Some((path, hash.into()))); } } } } } } fn progress_invariant(&self) -> bool { let iter = self.todo.iter().map(|x| match x { BsiTodo::Directory { progress_min, .. } => progress_min, BsiTodo::File { progress, .. } => progress, }); let iter_1 = iter.clone().skip(1); iter.zip(iter_1).all(|(prev, next)| prev >= next) } }