block/repair.rs: Added migration for ScrubWorkerPersisted's time_next_run_scrub. #523
2 changed files with 49 additions and 14 deletions
|
@ -152,6 +152,7 @@ impl BlockManager {
|
||||||
tx_scrub_command: ArcSwapOption::new(None),
|
tx_scrub_command: ArcSwapOption::new(None),
|
||||||
});
|
});
|
||||||
block_manager.endpoint.set_handler(block_manager.clone());
|
block_manager.endpoint.set_handler(block_manager.clone());
|
||||||
|
block_manager.scrub_persister.set_with(|_| ()).unwrap();
|
||||||
|
|
||||||
block_manager
|
block_manager
|
||||||
}
|
}
|
||||||
|
|
|
@ -5,7 +5,6 @@ use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use rand::Rng;
|
use rand::Rng;
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
@ -162,6 +161,50 @@ impl Worker for RepairWorker {
|
||||||
// and whose parameter (esp. speed) can be controlled at runtime.
|
// and whose parameter (esp. speed) can be controlled at runtime.
|
||||||
// ---- ---- ----
|
// ---- ---- ----
|
||||||
|
|
||||||
|
mod v081 {
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct ScrubWorkerPersisted {
|
||||||
|
pub tranquility: u32,
|
||||||
|
pub(crate) time_last_complete_scrub: u64,
|
||||||
|
pub(crate) corruptions_detected: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
|
||||||
|
}
|
||||||
|
|
||||||
|
mod v082 {
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use super::v081;
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
pub struct ScrubWorkerPersisted {
|
||||||
|
pub tranquility: u32,
|
||||||
|
pub(crate) time_last_complete_scrub: u64,
|
||||||
|
pub(crate) time_next_run_scrub: u64,
|
||||||
|
pub(crate) corruptions_detected: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl garage_util::migrate::Migrate for ScrubWorkerPersisted {
|
||||||
|
type Previous = v081::ScrubWorkerPersisted;
|
||||||
|
|
||||||
|
fn migrate(old: v081::ScrubWorkerPersisted) -> ScrubWorkerPersisted {
|
||||||
|
use crate::repair::randomize_next_scrub_run_time;
|
||||||
|
|
||||||
|
ScrubWorkerPersisted {
|
||||||
|
tranquility: old.tranquility,
|
||||||
|
time_last_complete_scrub: old.time_last_complete_scrub,
|
||||||
|
time_next_run_scrub: randomize_next_scrub_run_time(old.time_last_complete_scrub),
|
||||||
|
corruptions_detected: old.corruptions_detected,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub use v082::*;
|
||||||
|
|
||||||
pub struct ScrubWorker {
|
pub struct ScrubWorker {
|
||||||
manager: Arc<BlockManager>,
|
manager: Arc<BlockManager>,
|
||||||
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
|
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
|
||||||
|
@ -172,19 +215,11 @@ pub struct ScrubWorker {
|
||||||
persister: PersisterShared<ScrubWorkerPersisted>,
|
persister: PersisterShared<ScrubWorkerPersisted>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
fn randomize_next_scrub_run_time(timestamp: u64) -> u64 {
|
||||||
pub struct ScrubWorkerPersisted {
|
|
||||||
pub tranquility: u32,
|
|
||||||
pub(crate) time_last_complete_scrub: u64,
|
|
||||||
pub(crate) time_next_run_scrub: u64,
|
|
||||||
pub(crate) corruptions_detected: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
fn randomize_next_scrub_run_time() -> u64 {
|
|
||||||
// Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
|
// Take SCRUB_INTERVAL and mix in a random interval of 10 days to attempt to
|
||||||
// balance scrub load across different cluster nodes.
|
// balance scrub load across different cluster nodes.
|
||||||
|
|
||||||
let next_run_timestamp = now_msec()
|
let next_run_timestamp = timestamp
|
||||||
+ SCRUB_INTERVAL
|
+ SCRUB_INTERVAL
|
||||||
.saturating_add(Duration::from_secs(
|
.saturating_add(Duration::from_secs(
|
||||||
rand::thread_rng().gen_range(0..3600 * 24 * 10),
|
rand::thread_rng().gen_range(0..3600 * 24 * 10),
|
||||||
|
@ -194,12 +229,11 @@ fn randomize_next_scrub_run_time() -> u64 {
|
||||||
next_run_timestamp
|
next_run_timestamp
|
||||||
}
|
}
|
||||||
|
|
||||||
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
|
|
||||||
impl Default for ScrubWorkerPersisted {
|
impl Default for ScrubWorkerPersisted {
|
||||||
fn default() -> Self {
|
fn default() -> Self {
|
||||||
ScrubWorkerPersisted {
|
ScrubWorkerPersisted {
|
||||||
time_last_complete_scrub: 0,
|
time_last_complete_scrub: 0,
|
||||||
time_next_run_scrub: randomize_next_scrub_run_time(),
|
time_next_run_scrub: randomize_next_scrub_run_time(now_msec()),
|
||||||
tranquility: INITIAL_SCRUB_TRANQUILITY,
|
tranquility: INITIAL_SCRUB_TRANQUILITY,
|
||||||
corruptions_detected: 0,
|
corruptions_detected: 0,
|
||||||
}
|
}
|
||||||
|
@ -361,7 +395,7 @@ impl Worker for ScrubWorker {
|
||||||
} else {
|
} else {
|
||||||
self.persister.set_with(|p| {
|
self.persister.set_with(|p| {
|
||||||
p.time_last_complete_scrub = now_msec();
|
p.time_last_complete_scrub = now_msec();
|
||||||
p.time_next_run_scrub = randomize_next_scrub_run_time();
|
p.time_next_run_scrub = randomize_next_scrub_run_time(now_msec());
|
||||||
})?;
|
})?;
|
||||||
self.work = ScrubWorkerState::Finished;
|
self.work = ScrubWorkerState::Finished;
|
||||||
self.tranquilizer.clear();
|
self.tranquilizer.clear();
|
||||||
|
|
Loading…
Reference in a new issue