445 lines
11 KiB
Rust
445 lines
11 KiB
Rust
|
use core::ops::Bound;
|
||
|
use std::path::PathBuf;
|
||
|
use std::sync::Arc;
|
||
|
use std::time::Duration;
|
||
|
|
||
|
use async_trait::async_trait;
|
||
|
use serde::{Deserialize, Serialize};
|
||
|
use tokio::fs;
|
||
|
use tokio::select;
|
||
|
use tokio::sync::mpsc;
|
||
|
use tokio::sync::watch;
|
||
|
|
||
|
use garage_util::background::*;
|
||
|
use garage_util::data::*;
|
||
|
use garage_util::error::*;
|
||
|
use garage_util::persister::Persister;
|
||
|
use garage_util::time::*;
|
||
|
use garage_util::tranquilizer::Tranquilizer;
|
||
|
|
||
|
use crate::manager::*;
|
||
|
|
||
|
/// Delay after the last completed scrub before a new full scrub is due: 30 days.
const SCRUB_INTERVAL: Duration = Duration::from_secs(30 * 24 * 60 * 60);
|
||
|
|
||
|
pub struct RepairWorker {
|
||
|
manager: Arc<BlockManager>,
|
||
|
next_start: Option<Hash>,
|
||
|
block_iter: Option<BlockStoreIterator>,
|
||
|
}
|
||
|
|
||
|
impl RepairWorker {
|
||
|
pub fn new(manager: Arc<BlockManager>) -> Self {
|
||
|
Self {
|
||
|
manager,
|
||
|
next_start: None,
|
||
|
block_iter: None,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[async_trait]
|
||
|
impl Worker for RepairWorker {
|
||
|
fn name(&self) -> String {
|
||
|
"Block repair worker".into()
|
||
|
}
|
||
|
|
||
|
fn info(&self) -> Option<String> {
|
||
|
match self.block_iter.as_ref() {
|
||
|
None => {
|
||
|
let idx_bytes = self
|
||
|
.next_start
|
||
|
.as_ref()
|
||
|
.map(|x| x.as_slice())
|
||
|
.unwrap_or(&[]);
|
||
|
let idx_bytes = if idx_bytes.len() > 4 {
|
||
|
&idx_bytes[..4]
|
||
|
} else {
|
||
|
idx_bytes
|
||
|
};
|
||
|
Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
|
||
|
}
|
||
|
Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||
|
match self.block_iter.as_mut() {
|
||
|
None => {
|
||
|
// Phase 1: Repair blocks from RC table.
|
||
|
|
||
|
// We have to do this complicated two-step process where we first read a bunch
|
||
|
// of hashes from the RC table, and then insert them in the to-resync queue,
|
||
|
// because of SQLite. Basically, as long as we have an iterator on a DB table,
|
||
|
// we can't do anything else on the DB. The naive approach (which we had previously)
|
||
|
// of just iterating on the RC table and inserting items one to one in the resync
|
||
|
// queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
|
||
|
// This is mostly because the Rust bindings for SQLite assume a worst-case scenario
|
||
|
// where SQLite is not compiled in thread-safe mode, so we have to wrap everything
|
||
|
// in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
|
||
|
// TODO: maybe do this with tokio::task::spawn_blocking ?
|
||
|
let mut batch_of_hashes = vec![];
|
||
|
let start_bound = match self.next_start.as_ref() {
|
||
|
None => Bound::Unbounded,
|
||
|
Some(x) => Bound::Excluded(x.as_slice()),
|
||
|
};
|
||
|
for entry in self
|
||
|
.manager
|
||
|
.rc
|
||
|
.rc
|
||
|
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
|
||
|
{
|
||
|
let (hash, _) = entry?;
|
||
|
let hash = Hash::try_from(&hash[..]).unwrap();
|
||
|
batch_of_hashes.push(hash);
|
||
|
if batch_of_hashes.len() >= 1000 {
|
||
|
break;
|
||
|
}
|
||
|
}
|
||
|
if batch_of_hashes.is_empty() {
|
||
|
// move on to phase 2
|
||
|
self.block_iter = Some(BlockStoreIterator::new(&self.manager));
|
||
|
return Ok(WorkerState::Busy);
|
||
|
}
|
||
|
|
||
|
for hash in batch_of_hashes.into_iter() {
|
||
|
self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
|
||
|
self.next_start = Some(hash)
|
||
|
}
|
||
|
|
||
|
Ok(WorkerState::Busy)
|
||
|
}
|
||
|
Some(bi) => {
|
||
|
// Phase 2: Repair blocks actually on disk
|
||
|
// Lists all blocks on disk and adds them to the resync queue.
|
||
|
// This allows us to find blocks we are storing but don't actually need,
|
||
|
// so that we can offload them if necessary and then delete them locally.
|
||
|
if let Some(hash) = bi.next().await? {
|
||
|
self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
|
||
|
Ok(WorkerState::Busy)
|
||
|
} else {
|
||
|
Ok(WorkerState::Done)
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
|
||
|
unreachable!()
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ----
|
||
|
|
||
|
pub struct ScrubWorker {
|
||
|
manager: Arc<BlockManager>,
|
||
|
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
|
||
|
|
||
|
work: ScrubWorkerState,
|
||
|
tranquilizer: Tranquilizer,
|
||
|
|
||
|
persister: Persister<ScrubWorkerPersisted>,
|
||
|
persisted: ScrubWorkerPersisted,
|
||
|
}
|
||
|
|
||
|
#[derive(Serialize, Deserialize)]
|
||
|
struct ScrubWorkerPersisted {
|
||
|
tranquility: u32,
|
||
|
time_last_complete_scrub: u64,
|
||
|
corruptions_detected: u64,
|
||
|
}
|
||
|
|
||
|
enum ScrubWorkerState {
|
||
|
Running(BlockStoreIterator),
|
||
|
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
|
||
|
Finished,
|
||
|
}
|
||
|
|
||
|
impl Default for ScrubWorkerState {
|
||
|
fn default() -> Self {
|
||
|
ScrubWorkerState::Finished
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Commands accepted by the scrub worker on its command channel.
#[derive(Debug)]
pub enum ScrubWorkerCommand {
    /// Begin a new scrub; only valid when no scrub is in progress.
    Start,
    /// Suspend the current scrub for the given duration.
    Pause(Duration),
    /// Resume a paused scrub immediately.
    Resume,
    /// Abort the current (running or paused) scrub.
    Cancel,
    /// Change the tranquility value and persist it.
    SetTranquility(u32),
}
|
||
|
|
||
|
impl ScrubWorker {
|
||
|
pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
|
||
|
let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
|
||
|
let persisted = match persister.load() {
|
||
|
Ok(v) => v,
|
||
|
Err(_) => ScrubWorkerPersisted {
|
||
|
time_last_complete_scrub: 0,
|
||
|
tranquility: 4,
|
||
|
corruptions_detected: 0,
|
||
|
},
|
||
|
};
|
||
|
Self {
|
||
|
manager,
|
||
|
rx_cmd,
|
||
|
work: ScrubWorkerState::Finished,
|
||
|
tranquilizer: Tranquilizer::new(30),
|
||
|
persister,
|
||
|
persisted,
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
|
||
|
match cmd {
|
||
|
ScrubWorkerCommand::Start => {
|
||
|
self.work = match std::mem::take(&mut self.work) {
|
||
|
ScrubWorkerState::Finished => {
|
||
|
let iterator = BlockStoreIterator::new(&self.manager);
|
||
|
ScrubWorkerState::Running(iterator)
|
||
|
}
|
||
|
work => {
|
||
|
error!("Cannot start scrub worker: already running!");
|
||
|
work
|
||
|
}
|
||
|
};
|
||
|
}
|
||
|
ScrubWorkerCommand::Pause(dur) => {
|
||
|
self.work = match std::mem::take(&mut self.work) {
|
||
|
ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
|
||
|
ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
|
||
|
}
|
||
|
work => {
|
||
|
error!("Cannot pause scrub worker: not running!");
|
||
|
work
|
||
|
}
|
||
|
};
|
||
|
}
|
||
|
ScrubWorkerCommand::Resume => {
|
||
|
self.work = match std::mem::take(&mut self.work) {
|
||
|
ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
|
||
|
work => {
|
||
|
error!("Cannot resume scrub worker: not paused!");
|
||
|
work
|
||
|
}
|
||
|
};
|
||
|
}
|
||
|
ScrubWorkerCommand::Cancel => {
|
||
|
self.work = match std::mem::take(&mut self.work) {
|
||
|
ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
|
||
|
ScrubWorkerState::Finished
|
||
|
}
|
||
|
work => {
|
||
|
error!("Cannot cancel scrub worker: not running!");
|
||
|
work
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
ScrubWorkerCommand::SetTranquility(t) => {
|
||
|
self.persisted.tranquility = t;
|
||
|
if let Err(e) = self.persister.save_async(&self.persisted).await {
|
||
|
error!("Could not save new tranquilitiy value: {}", e);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[async_trait]
|
||
|
impl Worker for ScrubWorker {
|
||
|
fn name(&self) -> String {
|
||
|
"Block scrub worker".into()
|
||
|
}
|
||
|
|
||
|
fn info(&self) -> Option<String> {
|
||
|
let s = match &self.work {
|
||
|
ScrubWorkerState::Running(bsi) => format!(
|
||
|
"{:.2}% done (tranquility = {})",
|
||
|
bsi.progress() * 100.,
|
||
|
self.persisted.tranquility
|
||
|
),
|
||
|
ScrubWorkerState::Paused(bsi, rt) => {
|
||
|
format!(
|
||
|
"Paused, {:.2}% done, resumes at {}",
|
||
|
bsi.progress() * 100.,
|
||
|
msec_to_rfc3339(*rt)
|
||
|
)
|
||
|
}
|
||
|
ScrubWorkerState::Finished => format!(
|
||
|
"Last completed scrub: {}",
|
||
|
msec_to_rfc3339(self.persisted.time_last_complete_scrub)
|
||
|
),
|
||
|
};
|
||
|
Some(format!(
|
||
|
"{} ; corruptions detected: {}",
|
||
|
s, self.persisted.corruptions_detected
|
||
|
))
|
||
|
}
|
||
|
|
||
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||
|
match self.rx_cmd.try_recv() {
|
||
|
Ok(cmd) => self.handle_cmd(cmd).await,
|
||
|
Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),
|
||
|
Err(mpsc::error::TryRecvError::Empty) => (),
|
||
|
};
|
||
|
|
||
|
match &mut self.work {
|
||
|
ScrubWorkerState::Running(bsi) => {
|
||
|
self.tranquilizer.reset();
|
||
|
if let Some(hash) = bsi.next().await? {
|
||
|
match self.manager.read_block(&hash).await {
|
||
|
Err(Error::CorruptData(_)) => {
|
||
|
error!("Found corrupt data block during scrub: {:?}", hash);
|
||
|
self.persisted.corruptions_detected += 1;
|
||
|
self.persister.save_async(&self.persisted).await?;
|
||
|
}
|
||
|
Err(e) => return Err(e),
|
||
|
_ => (),
|
||
|
};
|
||
|
Ok(self
|
||
|
.tranquilizer
|
||
|
.tranquilize_worker(self.persisted.tranquility))
|
||
|
} else {
|
||
|
self.persisted.time_last_complete_scrub = now_msec();
|
||
|
self.persister.save_async(&self.persisted).await?;
|
||
|
self.work = ScrubWorkerState::Finished;
|
||
|
self.tranquilizer.clear();
|
||
|
Ok(WorkerState::Idle)
|
||
|
}
|
||
|
}
|
||
|
_ => Ok(WorkerState::Idle),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
|
||
|
let (wait_until, command) = match &self.work {
|
||
|
ScrubWorkerState::Running(_) => return WorkerState::Busy,
|
||
|
ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
|
||
|
ScrubWorkerState::Finished => (
|
||
|
self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64,
|
||
|
ScrubWorkerCommand::Start,
|
||
|
),
|
||
|
};
|
||
|
|
||
|
let now = now_msec();
|
||
|
if now >= wait_until {
|
||
|
self.handle_cmd(command).await;
|
||
|
return WorkerState::Busy;
|
||
|
}
|
||
|
let delay = Duration::from_millis(wait_until - now);
|
||
|
select! {
|
||
|
_ = tokio::time::sleep(delay) => self.handle_cmd(command).await,
|
||
|
cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
|
||
|
self.handle_cmd(cmd).await;
|
||
|
} else {
|
||
|
return WorkerState::Done;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
match &self.work {
|
||
|
ScrubWorkerState::Running(_) => WorkerState::Busy,
|
||
|
_ => WorkerState::Idle,
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// ----
|
||
|
|
||
|
struct BlockStoreIterator {
|
||
|
path: Vec<ReadingDir>,
|
||
|
}
|
||
|
|
||
|
enum ReadingDir {
|
||
|
Pending(PathBuf),
|
||
|
Read {
|
||
|
subpaths: Vec<fs::DirEntry>,
|
||
|
pos: usize,
|
||
|
},
|
||
|
}
|
||
|
|
||
|
impl BlockStoreIterator {
|
||
|
fn new(manager: &BlockManager) -> Self {
|
||
|
let root_dir = manager.data_dir.clone();
|
||
|
Self {
|
||
|
path: vec![ReadingDir::Pending(root_dir)],
|
||
|
}
|
||
|
}
|
||
|
|
||
|
/// Returns progress done, between 0 and 1
|
||
|
fn progress(&self) -> f32 {
|
||
|
if self.path.is_empty() {
|
||
|
1.0
|
||
|
} else {
|
||
|
let mut ret = 0.0;
|
||
|
let mut next_div = 1;
|
||
|
for p in self.path.iter() {
|
||
|
match p {
|
||
|
ReadingDir::Pending(_) => break,
|
||
|
ReadingDir::Read { subpaths, pos } => {
|
||
|
next_div *= subpaths.len();
|
||
|
ret += ((*pos - 1) as f32) / (next_div as f32);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
ret
|
||
|
}
|
||
|
}
|
||
|
|
||
|
async fn next(&mut self) -> Result<Option<Hash>, Error> {
|
||
|
loop {
|
||
|
let last_path = match self.path.last_mut() {
|
||
|
None => return Ok(None),
|
||
|
Some(lp) => lp,
|
||
|
};
|
||
|
|
||
|
if let ReadingDir::Pending(path) = last_path {
|
||
|
let mut reader = fs::read_dir(&path).await?;
|
||
|
let mut subpaths = vec![];
|
||
|
while let Some(ent) = reader.next_entry().await? {
|
||
|
subpaths.push(ent);
|
||
|
}
|
||
|
*last_path = ReadingDir::Read { subpaths, pos: 0 };
|
||
|
}
|
||
|
|
||
|
let (subpaths, pos) = match *last_path {
|
||
|
ReadingDir::Read {
|
||
|
ref subpaths,
|
||
|
ref mut pos,
|
||
|
} => (subpaths, pos),
|
||
|
ReadingDir::Pending(_) => unreachable!(),
|
||
|
};
|
||
|
|
||
|
let data_dir_ent = match subpaths.get(*pos) {
|
||
|
None => {
|
||
|
self.path.pop();
|
||
|
continue;
|
||
|
}
|
||
|
Some(ent) => {
|
||
|
*pos += 1;
|
||
|
ent
|
||
|
}
|
||
|
};
|
||
|
|
||
|
let name = data_dir_ent.file_name();
|
||
|
let name = if let Ok(n) = name.into_string() {
|
||
|
n
|
||
|
} else {
|
||
|
continue;
|
||
|
};
|
||
|
let ent_type = data_dir_ent.file_type().await?;
|
||
|
|
||
|
let name = name.strip_suffix(".zst").unwrap_or(&name);
|
||
|
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
||
|
let path = data_dir_ent.path();
|
||
|
self.path.push(ReadingDir::Pending(path));
|
||
|
} else if name.len() == 64 {
|
||
|
if let Ok(h) = hex::decode(&name) {
|
||
|
let mut hash = [0u8; 32];
|
||
|
hash.copy_from_slice(&h);
|
||
|
return Ok(Some(hash.into()));
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|