Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
8 changed files with 240 additions and 28 deletions
Showing only changes of commit 247dbcd598

Cargo.lock generated

@@ -1049,6 +1049,7 @@ dependencies = [
 name = "garage_block"
 version = "0.7.0"
 dependencies = [
+ "arc-swap",
  "async-trait",
  "bytes 1.1.0",
  "futures",

src/block/Cargo.toml

@@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
 opentelemetry = "0.17"
+arc-swap = "1.5"
 async-trait = "0.1.7"
 bytes = "1.0"
 hex = "0.4"

src/block/manager.rs

@@ -3,6 +3,7 @@ use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;

+use arc_swap::ArcSwapOption;
 use async_trait::async_trait;
 use serde::{Deserialize, Serialize};

@@ -10,7 +11,7 @@ use futures::future::*;
 use tokio::fs;
 use tokio::io::{AsyncReadExt, AsyncWriteExt};
 use tokio::select;
-use tokio::sync::{watch, Mutex, Notify};
+use tokio::sync::{mpsc, watch, Mutex, Notify};

 use opentelemetry::{
     trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},

@@ -35,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
 use crate::block::*;
 use crate::metrics::*;
 use crate::rc::*;
+use crate::repair::*;

 /// Size under which data will be stored inlined in database instead of as files
 pub const INLINE_THRESHOLD: usize = 3072;

@@ -86,6 +88,8 @@ pub struct BlockManager {
     pub replication: TableShardedReplication,
     /// Directory in which blocks are stored
     pub data_dir: PathBuf,
+    /// State store (only used by scrub worker to store time of last scrub)
+    pub(crate) state_variables_store: db::Tree,

     compression_level: Option<i32>,
     background_tranquility: u32,

@@ -102,6 +106,8 @@ pub struct BlockManager {
     endpoint: Arc<Endpoint<BlockRpc, Self>>,

     metrics: BlockManagerMetrics,
+
+    tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
 }

 // This custom struct contains functions that must only be run

@@ -141,6 +147,10 @@ impl BlockManager {
         let resync_errors =
             CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");

+        let state_variables_store = db
+            .open_tree("state_variables")
+            .expect("Unable to open state_variables tree");
+
         let endpoint = system
             .netapp
             .endpoint("garage_block/manager.rs/Rpc".to_string());

@@ -159,13 +169,15 @@ impl BlockManager {
             resync_queue,
             resync_notify: Notify::new(),
             resync_errors,
+            state_variables_store,
             system,
             endpoint,
             metrics,
+            tx_scrub_command: ArcSwapOption::new(None),
         });

         block_manager.endpoint.set_handler(block_manager.clone());
-        block_manager.clone().spawn_background_worker();
+        block_manager.clone().spawn_background_workers();

         block_manager
     }

@@ -242,6 +254,17 @@ impl BlockManager {
         Ok(self.rc.rc.len()?)
     }

+    /// Send command to start/stop/manage scrub worker
+    pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
+        let _ = self
+            .tx_scrub_command
+            .load()
+            .as_ref()
+            .unwrap()
+            .send(cmd)
+            .await;
+    }
+
     //// ----- Managing the reference counter ----

     /// Increment the number of times a block is used, putting it to resynchronization if it is

@@ -475,11 +498,11 @@ impl BlockManager {
     // for times that are earlier than the exponential back-off delay
     // is a natural condition that is handled properly).

-    fn spawn_background_worker(self: Arc<Self>) {
+    fn spawn_background_workers(self: Arc<Self>) {
        // Launch a background worker for background resync loop processing
         let background = self.system.background.clone();
         let worker = ResyncWorker {
-            manager: self,
+            manager: self.clone(),
             tranquilizer: Tranquilizer::new(30),
             next_delay: Duration::from_secs(10),
         };

@@ -487,6 +510,12 @@ impl BlockManager {
            tokio::time::sleep(Duration::from_secs(10)).await;
            background.spawn_worker(worker);
        });
+
+        // Launch a background worker for data store scrubs
+        let (scrub_tx, scrub_rx) = mpsc::channel(1);
+        self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
+        let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4);
+        self.system.background.spawn_worker(scrub_worker);
     }

     pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
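
A note on the tx_scrub_command field added above: the BlockManager is put inside an Arc before its background workers exist, so the scrub command channel has to be installed after construction and then read through a shared reference, which is what the ArcSwapOption gives us. Below is a minimal self-contained sketch of that pattern (not Garage code; the Manager struct and string commands are invented for illustration):

// Sketch of the ArcSwapOption pattern used for tx_scrub_command: the sender is
// stored after the Arc'd struct is built, and later loaded from &self to send.
use std::sync::Arc;

use arc_swap::ArcSwapOption;
use tokio::sync::mpsc;

struct Manager {
    tx_cmd: ArcSwapOption<mpsc::Sender<&'static str>>,
}

#[tokio::main]
async fn main() {
    let manager = Arc::new(Manager { tx_cmd: ArcSwapOption::new(None) });

    // Later, when the worker is spawned (cf. spawn_background_workers above):
    let (tx, mut rx) = mpsc::channel(1);
    manager.tx_cmd.store(Some(Arc::new(tx)));
    tokio::spawn(async move {
        while let Some(cmd) = rx.recv().await {
            println!("worker received: {}", cmd);
        }
    });

    // Rough equivalent of send_scrub_command(): load the sender, then send through it.
    if let Some(tx) = manager.tx_cmd.load_full() {
        let _ = tx.send("start").await;
    }

    // Give the spawned task a moment to print before the sketch exits.
    tokio::time::sleep(std::time::Duration::from_millis(50)).await;
}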

src/block/repair.rs

@@ -1,20 +1,26 @@
 use core::ops::Bound;
+use std::convert::TryInto;
 use std::path::PathBuf;
 use std::sync::Arc;
 use std::time::Duration;

 use async_trait::async_trait;
 use tokio::fs;
+use tokio::select;
+use tokio::sync::mpsc;
 use tokio::sync::watch;

 use garage_util::background::*;
 use garage_util::data::*;
 use garage_util::error::*;
+use garage_util::time::*;
 use garage_util::tranquilizer::Tranquilizer;

 use crate::manager::*;

+const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
+const TIME_LAST_COMPLETE_SCRUB: &[u8] = b"time_last_complete_scrub";
 pub struct RepairWorker {
     manager: Arc<BlockManager>,
     next_start: Option<Hash>,

@@ -129,19 +135,107 @@ impl Worker for RepairWorker {
 pub struct ScrubWorker {
     manager: Arc<BlockManager>,
-    iterator: BlockStoreIterator,
+    rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+    work: ScrubWorkerState,
     tranquilizer: Tranquilizer,
     tranquility: u32,
+    time_last_complete_scrub: u64,
+}
+
+enum ScrubWorkerState {
+    Running(BlockStoreIterator),
+    Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
+    Finished,
+}
+
+impl Default for ScrubWorkerState {
+    fn default() -> Self {
+        ScrubWorkerState::Finished
+    }
+}
+
+pub enum ScrubWorkerCommand {
+    Start,
+    Pause(Duration),
+    Resume,
+    Cancel,
+    SetTranquility(u32),
 }

 impl ScrubWorker {
-    pub fn new(manager: Arc<BlockManager>, tranquility: u32) -> Self {
-        let iterator = BlockStoreIterator::new(&manager);
+    pub fn new(
+        manager: Arc<BlockManager>,
+        rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
+        tranquility: u32,
+    ) -> Self {
+        let time_last_complete_scrub = match manager
+            .state_variables_store
+            .get(TIME_LAST_COMPLETE_SCRUB)
+            .expect("DB error when initializing scrub worker")
+        {
+            Some(v) => u64::from_be_bytes(v.try_into().unwrap()),
+            None => 0,
+        };
         Self {
             manager,
-            iterator,
+            rx_cmd,
+            work: ScrubWorkerState::Finished,
             tranquilizer: Tranquilizer::new(30),
             tranquility,
+            time_last_complete_scrub,
+        }
+    }
+
+    fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
+        match cmd {
+            ScrubWorkerCommand::Start => {
+                self.work = match std::mem::take(&mut self.work) {
+                    ScrubWorkerState::Finished => {
+                        let iterator = BlockStoreIterator::new(&self.manager);
+                        ScrubWorkerState::Running(iterator)
+                    }
+                    work => {
+                        error!("Cannot start scrub worker: already running!");
+                        work
+                    }
+                };
+            }
+            ScrubWorkerCommand::Pause(dur) => {
+                self.work = match std::mem::take(&mut self.work) {
+                    ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
+                        ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
+                    }
+                    work => {
+                        error!("Cannot pause scrub worker: not running!");
+                        work
+                    }
+                };
+            }
+            ScrubWorkerCommand::Resume => {
+                self.work = match std::mem::take(&mut self.work) {
+                    ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
+                    work => {
+                        error!("Cannot resume scrub worker: not paused!");
+                        work
+                    }
+                };
+            }
+            ScrubWorkerCommand::Cancel => {
+                self.work = match std::mem::take(&mut self.work) {
+                    ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
+                        ScrubWorkerState::Finished
+                    }
+                    work => {
+                        error!("Cannot cancel scrub worker: not running!");
+                        work
+                    }
+                }
+            }
+            ScrubWorkerCommand::SetTranquility(t) => {
+                self.tranquility = t;
+            }
         }
     }
 }

@@ -153,24 +247,80 @@ impl Worker for ScrubWorker {
     }

     fn info(&self) -> Option<String> {
-        Some(format!("{:.2}% done", self.iterator.progress() * 100.))
+        match &self.work {
+            ScrubWorkerState::Running(bsi) => Some(format!("{:.2}% done", bsi.progress() * 100.)),
+            ScrubWorkerState::Paused(_bsi, rt) => {
+                Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt)))
+            }
+            ScrubWorkerState::Finished => Some(format!(
+                "Last completed scrub: {}",
+                msec_to_rfc3339(self.time_last_complete_scrub)
+            )),
+        }
     }

     async fn work(
         &mut self,
         _must_exit: &mut watch::Receiver<bool>,
     ) -> Result<WorkerStatus, Error> {
-        self.tranquilizer.reset();
-        if let Some(hash) = self.iterator.next().await? {
-            let _ = self.manager.read_block(&hash).await;
-            Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
-        } else {
-            Ok(WorkerStatus::Done)
+        match self.rx_cmd.try_recv() {
+            Ok(cmd) => self.handle_cmd(cmd),
+            Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done),
+            Err(mpsc::error::TryRecvError::Empty) => (),
+        };
+
+        match &mut self.work {
+            ScrubWorkerState::Running(bsi) => {
+                self.tranquilizer.reset();
+                if let Some(hash) = bsi.next().await? {
+                    let _ = self.manager.read_block(&hash).await;
+                    Ok(self.tranquilizer.tranquilize_worker(self.tranquility))
+                } else {
+                    self.time_last_complete_scrub = now_msec();
+                    self.manager.state_variables_store.insert(
+                        TIME_LAST_COMPLETE_SCRUB,
+                        u64::to_be_bytes(self.time_last_complete_scrub),
+                    )?;
+                    self.work = ScrubWorkerState::Finished;
+                    self.tranquilizer.clear();
+                    Ok(WorkerStatus::Idle)
+                }
+            }
+            _ => Ok(WorkerStatus::Idle),
         }
     }

     async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
-        unreachable!()
+        match &self.work {
+            ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
+            ScrubWorkerState::Paused(_, resume_time) => {
+                let delay = Duration::from_millis(resume_time - now_msec());
+                select! {
+                    _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume),
+                    cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+                        self.handle_cmd(cmd);
+                    } else {
+                        return WorkerStatus::Done;
+                    }
+                }
+            }
+            ScrubWorkerState::Finished => {
+                let delay = SCRUB_INTERVAL
+                    - Duration::from_millis(now_msec() - self.time_last_complete_scrub);
+                select! {
+                    _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start),
+                    cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
+                        self.handle_cmd(cmd);
+                    } else {
+                        return WorkerStatus::Done;
+                    }
+                }
+            }
+        }
+        match &self.work {
+            ScrubWorkerState::Running(_) => WorkerStatus::Busy,
+            _ => WorkerStatus::Idle,
+        }
     }
 }
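
For reference, the pause/resume bookkeeping above works in absolute milliseconds: Pause(dur) stores now_msec() plus the pause duration as the resume time, and wait_for_work() sleeps for whatever remains. A tiny standalone illustration follows (the local now_msec mimics garage_util::time::now_msec, i.e. milliseconds since the Unix epoch; the saturating_sub is only there to keep the sketch panic-free):

use std::time::{Duration, SystemTime, UNIX_EPOCH};

// Same convention as garage_util::time::now_msec(): milliseconds since the Unix epoch.
fn now_msec() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap()
        .as_millis() as u64
}

fn main() {
    let pause_for = Duration::from_secs(3600 * 24);
    // What ScrubWorkerState::Paused stores as its u64 resume time:
    let resume_time = now_msec() + pause_for.as_millis() as u64;
    // What wait_for_work() sleeps before issuing ScrubWorkerCommand::Resume:
    let remaining = Duration::from_millis(resume_time.saturating_sub(now_msec()));
    println!("paused scrub would resume in {:?}", remaining);
}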

src/garage/admin.rs

@@ -698,7 +698,7 @@ impl AdminRpcHandler {
             )))
         }
     } else {
-        launch_online_repair(self.garage.clone(), opt);
+        launch_online_repair(self.garage.clone(), opt).await;
         Ok(AdminRpc::Ok(format!(
             "Repair launched on {:?}",
             self.garage.system.id

src/garage/cli/structs.rs

@@ -427,8 +427,29 @@ pub enum RepairWhat {
     /// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
     #[structopt(name = "scrub")]
     Scrub {
-        /// Tranquility factor (see tranquilizer documentation)
-        #[structopt(name = "tranquility", default_value = "2")]
+        #[structopt(subcommand)]
+        cmd: ScrubCmd,
+    },
+}
+
+#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
+pub enum ScrubCmd {
+    /// Start scrub
+    #[structopt(name = "start")]
+    Start,
+    /// Pause scrub (it will resume automatically after 24 hours)
+    #[structopt(name = "pause")]
+    Pause,
+    /// Resume paused scrub
+    #[structopt(name = "resume")]
+    Resume,
+    /// Cancel scrub in progress
+    #[structopt(name = "cancel")]
+    Cancel,
+    /// Set tranquility level for in-progress and future scrubs
+    #[structopt(name = "set-tranquility")]
+    SetTranquility {
+        #[structopt()]
         tranquility: u32,
     },
 }
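
Assuming the existing "garage repair" entry point and its confirmation flag (neither is shown in this diff, so the exact spelling is an assumption), these variants would be driven from the command line roughly as follows:

garage repair --yes scrub start
garage repair --yes scrub pause
garage repair --yes scrub set-tranquility 1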

src/garage/repair/online.rs

@@ -1,8 +1,10 @@
 use std::sync::Arc;
+use std::time::Duration;

 use async_trait::async_trait;
 use tokio::sync::watch;

+use garage_block::repair::ScrubWorkerCommand;
 use garage_model::garage::Garage;
 use garage_model::s3::block_ref_table::*;
 use garage_model::s3::object_table::*;

@@ -13,7 +15,7 @@ use garage_util::error::Error;
 use crate::*;

-pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
+pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
     match opt.what {
         RepairWhat::Tables => {
             info!("Launching a full sync of tables");

@@ -43,14 +45,18 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
                 garage.block_manager.clone(),
             ));
         }
-        RepairWhat::Scrub { tranquility } => {
+        RepairWhat::Scrub { cmd } => {
             info!("Verifying integrity of stored blocks");
-            garage
-                .background
-                .spawn_worker(garage_block::repair::ScrubWorker::new(
-                    garage.block_manager.clone(),
-                    tranquility,
-                ));
+            let cmd = match cmd {
+                ScrubCmd::Start => ScrubWorkerCommand::Start,
+                ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
+                ScrubCmd::Resume => ScrubWorkerCommand::Resume,
+                ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
+                ScrubCmd::SetTranquility { tranquility } => {
+                    ScrubWorkerCommand::SetTranquility(tranquility)
+                }
+            };
+            garage.block_manager.send_scrub_command(cmd).await;
         }
     }
 }

src/util/tranquilizer.rs

@@ -71,4 +71,8 @@ impl Tranquilizer {
     pub fn reset(&mut self) {
         self.last_step_begin = Instant::now();
     }
+
+    pub fn clear(&mut self) {
+        self.observations.clear();
+    }
 }