Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
6 changed files with 76 additions and 47 deletions
Showing only changes of commit b053fc0518 - Show all commits

View file

@ -88,8 +88,6 @@ pub struct BlockManager {
pub replication: TableShardedReplication, pub replication: TableShardedReplication,
/// Directory in which block are stored /// Directory in which block are stored
pub data_dir: PathBuf, pub data_dir: PathBuf,
/// State store (only used by scrub worker to store time of last scrub)
pub(crate) state_variables_store: db::Tree,
compression_level: Option<i32>, compression_level: Option<i32>,
background_tranquility: u32, background_tranquility: u32,
@ -102,7 +100,7 @@ pub struct BlockManager {
resync_notify: Notify, resync_notify: Notify,
resync_errors: CountedTree, resync_errors: CountedTree,
system: Arc<System>, pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>, endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics, metrics: BlockManagerMetrics,
@ -147,10 +145,6 @@ impl BlockManager {
let resync_errors = let resync_errors =
CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors"); CountedTree::new(resync_errors).expect("Could not count block_local_resync_errors");
let state_variables_store = db
.open_tree("state_variables")
.expect("Unable to open state_variables tree");
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint("garage_block/manager.rs/Rpc".to_string()); .endpoint("garage_block/manager.rs/Rpc".to_string());
@ -169,7 +163,6 @@ impl BlockManager {
resync_queue, resync_queue,
resync_notify: Notify::new(), resync_notify: Notify::new(),
resync_errors, resync_errors,
state_variables_store,
system, system,
endpoint, endpoint,
metrics, metrics,
@ -514,7 +507,7 @@ impl BlockManager {
// Launch a background worker for data store scrubs // Launch a background worker for data store scrubs
let (scrub_tx, scrub_rx) = mpsc::channel(1); let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx))); self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx, 4); let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx);
self.system.background.spawn_worker(scrub_worker); self.system.background.spawn_worker(scrub_worker);
} }

View file

@ -1,10 +1,10 @@
use core::ops::Bound; use core::ops::Bound;
use std::convert::TryInto;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::select; use tokio::select;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -13,13 +13,13 @@ use tokio::sync::watch;
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*; use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer; use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*; use crate::manager::*;
const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
const TIME_LAST_COMPLETE_SCRUB: &[u8] = b"time_last_complete_scrub";
pub struct RepairWorker { pub struct RepairWorker {
manager: Arc<BlockManager>, manager: Arc<BlockManager>,
@ -139,8 +139,14 @@ pub struct ScrubWorker {
work: ScrubWorkerState, work: ScrubWorkerState,
tranquilizer: Tranquilizer, tranquilizer: Tranquilizer,
tranquility: u32,
persister: Persister<ScrubWorkerPersisted>,
persisted: ScrubWorkerPersisted,
}
#[derive(Serialize, Deserialize)]
struct ScrubWorkerPersisted {
tranquility: u32,
time_last_complete_scrub: u64, time_last_complete_scrub: u64,
} }
@ -156,6 +162,7 @@ impl Default for ScrubWorkerState {
} }
} }
#[derive(Debug)]
pub enum ScrubWorkerCommand { pub enum ScrubWorkerCommand {
Start, Start,
Pause(Duration), Pause(Duration),
@ -165,30 +172,26 @@ pub enum ScrubWorkerCommand {
} }
impl ScrubWorker { impl ScrubWorker {
pub fn new( pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
manager: Arc<BlockManager>, let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>, let persisted = match persister.load() {
tranquility: u32, Ok(v) => v,
) -> Self { Err(_) => ScrubWorkerPersisted {
let time_last_complete_scrub = match manager time_last_complete_scrub: 0,
.state_variables_store tranquility: 4,
.get(TIME_LAST_COMPLETE_SCRUB) },
.expect("DB error when initializing scrub worker")
{
Some(v) => u64::from_be_bytes(v.try_into().unwrap()),
None => 0,
}; };
Self { Self {
manager, manager,
rx_cmd, rx_cmd,
work: ScrubWorkerState::Finished, work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30), tranquilizer: Tranquilizer::new(30),
tranquility, persister,
time_last_complete_scrub, persisted,
} }
} }
fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) { async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
match cmd { match cmd {
ScrubWorkerCommand::Start => { ScrubWorkerCommand::Start => {
self.work = match std::mem::take(&mut self.work) { self.work = match std::mem::take(&mut self.work) {
@ -234,7 +237,10 @@ impl ScrubWorker {
} }
} }
ScrubWorkerCommand::SetTranquility(t) => { ScrubWorkerCommand::SetTranquility(t) => {
self.tranquility = t; self.persisted.tranquility = t;
if let Err(e) = self.persister.save_async(&self.persisted).await {
error!("Could not save new tranquilitiy value: {}", e);
}
} }
} }
} }
@ -248,13 +254,17 @@ impl Worker for ScrubWorker {
fn info(&self) -> Option<String> { fn info(&self) -> Option<String> {
match &self.work { match &self.work {
ScrubWorkerState::Running(bsi) => Some(format!("{:.2}% done", bsi.progress() * 100.)), ScrubWorkerState::Running(bsi) => Some(format!(
"{:.2}% done (tranquility = {})",
bsi.progress() * 100.,
self.persisted.tranquility
)),
ScrubWorkerState::Paused(_bsi, rt) => { ScrubWorkerState::Paused(_bsi, rt) => {
Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt))) Some(format!("Paused, resumes at {}", msec_to_rfc3339(*rt)))
} }
ScrubWorkerState::Finished => Some(format!( ScrubWorkerState::Finished => Some(format!(
"Last completed scrub: {}", "Last completed scrub: {}",
msec_to_rfc3339(self.time_last_complete_scrub) msec_to_rfc3339(self.persisted.time_last_complete_scrub)
)), )),
} }
} }
@ -264,7 +274,7 @@ impl Worker for ScrubWorker {
_must_exit: &mut watch::Receiver<bool>, _must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> { ) -> Result<WorkerStatus, Error> {
match self.rx_cmd.try_recv() { match self.rx_cmd.try_recv() {
Ok(cmd) => self.handle_cmd(cmd), Ok(cmd) => self.handle_cmd(cmd).await,
Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done), Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerStatus::Done),
Err(mpsc::error::TryRecvError::Empty) => (), Err(mpsc::error::TryRecvError::Empty) => (),
}; };
@ -274,13 +284,12 @@ impl Worker for ScrubWorker {
self.tranquilizer.reset(); self.tranquilizer.reset();
if let Some(hash) = bsi.next().await? { if let Some(hash) = bsi.next().await? {
let _ = self.manager.read_block(&hash).await; let _ = self.manager.read_block(&hash).await;
Ok(self.tranquilizer.tranquilize_worker(self.tranquility)) Ok(self
.tranquilizer
.tranquilize_worker(self.persisted.tranquility))
} else { } else {
self.time_last_complete_scrub = now_msec(); // TODO save to file self.persisted.time_last_complete_scrub = now_msec();
self.manager.state_variables_store.insert( self.persister.save_async(&self.persisted).await?;
TIME_LAST_COMPLETE_SCRUB,
u64::to_be_bytes(self.time_last_complete_scrub),
)?;
self.work = ScrubWorkerState::Finished; self.work = ScrubWorkerState::Finished;
self.tranquilizer.clear(); self.tranquilizer.clear();
Ok(WorkerStatus::Idle) Ok(WorkerStatus::Idle)
@ -294,23 +303,35 @@ impl Worker for ScrubWorker {
match &self.work { match &self.work {
ScrubWorkerState::Running(_) => return WorkerStatus::Busy, ScrubWorkerState::Running(_) => return WorkerStatus::Busy,
ScrubWorkerState::Paused(_, resume_time) => { ScrubWorkerState::Paused(_, resume_time) => {
let delay = Duration::from_millis(resume_time - now_msec()); let now = now_msec();
if now >= *resume_time {
self.handle_cmd(ScrubWorkerCommand::Resume).await;
return WorkerStatus::Busy;
}
let delay = Duration::from_millis(*resume_time - now);
select! { select! {
_ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume), _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Resume).await,
cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
self.handle_cmd(cmd); self.handle_cmd(cmd).await;
} else { } else {
return WorkerStatus::Done; return WorkerStatus::Done;
} }
} }
} }
ScrubWorkerState::Finished => { ScrubWorkerState::Finished => {
let now = now_msec();
if now - self.persisted.time_last_complete_scrub
>= SCRUB_INTERVAL.as_millis() as u64
{
self.handle_cmd(ScrubWorkerCommand::Start).await;
return WorkerStatus::Busy;
}
let delay = SCRUB_INTERVAL let delay = SCRUB_INTERVAL
- Duration::from_secs(now_msec() - self.time_last_complete_scrub); - Duration::from_millis(now - self.persisted.time_last_complete_scrub);
select! { select! {
_ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start), _ = tokio::time::sleep(delay) => self.handle_cmd(ScrubWorkerCommand::Start).await,
cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd { cmd = self.rx_cmd.recv() => if let Some(cmd) = cmd {
self.handle_cmd(cmd); self.handle_cmd(cmd).await;
} else { } else {
return WorkerStatus::Done; return WorkerStatus::Done;
} }

View file

@ -263,7 +263,7 @@ pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
continue; continue;
} }
table.push(format!("{}\t{:?}\t{}", tid, info.status, info.name)); table.push(format!("{}\t{}\t{}", tid, info.status, info.name));
if let Some(i) = &info.info { if let Some(i) = &info.info {
table.push(format!("\t\t {}", i)); table.push(format!("\t\t {}", i));
} }

View file

@ -46,7 +46,6 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
)); ));
} }
RepairWhat::Scrub { cmd } => { RepairWhat::Scrub { cmd } => {
info!("Verifying integrity of stored blocks");
let cmd = match cmd { let cmd = match cmd {
ScrubCmd::Start => ScrubWorkerCommand::Start, ScrubCmd::Start => ScrubWorkerCommand::Start,
ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
@ -56,6 +55,7 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
ScrubWorkerCommand::SetTranquility(tranquility) ScrubWorkerCommand::SetTranquility(tranquility)
} }
}; };
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await; garage.block_manager.send_scrub_command(cmd).await;
} }
} }

View file

@ -2,7 +2,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::io::{Read, Write}; use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::Path; use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -104,6 +104,9 @@ pub struct System {
/// The job runner of this node /// The job runner of this node
pub background: Arc<BackgroundRunner>, pub background: Arc<BackgroundRunner>,
/// Path to metadata directory (usefull)
pub metadata_dir: PathBuf,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
@ -295,6 +298,7 @@ impl System {
ring, ring,
update_ring: Mutex::new(update_ring), update_ring: Mutex::new(update_ring),
background, background,
metadata_dir: config.metadata_dir.clone(),
}); });
sys.system_endpoint.set_handler(sys.clone()); sys.system_endpoint.set_handler(sys.clone());
sys sys

View file

@ -15,7 +15,7 @@ use crate::background::WorkerInfo;
use crate::error::Error; use crate::error::Error;
use crate::time::now_msec; use crate::time::now_msec;
#[derive(PartialEq, Copy, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerStatus { pub enum WorkerStatus {
Busy, Busy,
Throttled(f32), Throttled(f32),
@ -23,6 +23,17 @@ pub enum WorkerStatus {
Done, Done,
} }
impl std::fmt::Display for WorkerStatus {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkerStatus::Busy => write!(f, "Busy"),
WorkerStatus::Throttled(t) => write!(f, "Thr:{:.3}", t),
WorkerStatus::Idle => write!(f, "Idle"),
WorkerStatus::Done => write!(f, "Done"),
}
}
}
#[async_trait] #[async_trait]
pub trait Worker: Send { pub trait Worker: Send {
fn name(&self) -> String; fn name(&self) -> String;