Background task manager #332

Merged
lx merged 35 commits from background-task-manager into main 2022-07-08 11:30:32 +00:00
8 changed files with 237 additions and 184 deletions
Showing only changes of commit ba1ace6cf6

Cargo.lock generated

@@ -959,6 +959,7 @@ dependencies = [
"futures",
"futures-util",
"garage_api",
"garage_block",
"garage_db",
"garage_model 0.7.0",
"garage_rpc 0.7.0",


@@ -2,6 +2,7 @@
extern crate tracing;
pub mod manager;
pub mod repair;
mod block;
mod metrics;


@@ -1,7 +1,5 @@
use core::ops::Bound;
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
@@ -94,7 +92,7 @@ pub struct BlockManager
mutation_lock: Mutex<BlockManagerLocked>,
rc: BlockRc,
pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
@@ -225,90 +223,6 @@ impl BlockManager
Ok(())
}
/// Launch the repair procedure on the data store
///
/// This will list all blocks locally present, as well as those
/// that are required because of refcount > 0, and will try
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
let mut next_start: Option<Hash> = None;
loop {
// We have to do this complicated two-step process where we first read a bunch
// of hashes from the RC table, and then insert them in the to-resync queue,
// because of SQLite. Basically, as long as we have an iterator on a DB table,
// we can't do anything else on the DB. The naive approach (which we had previously)
// of just iterating on the RC table and inserting items one to one in the resync
// queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
// This is mostly because the Rust bindings for SQLite assume a worst-case scenario
// where SQLite is not compiled in thread-safe mode, so we have to wrap everything
// in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
let mut batch_of_hashes = vec![];
let start_bound = match next_start.as_ref() {
None => Bound::Unbounded,
Some(x) => Bound::Excluded(x.as_slice()),
};
for entry in self
.rc
.rc
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
batch_of_hashes.push(hash);
if batch_of_hashes.len() >= 1000 {
break;
}
}
if batch_of_hashes.is_empty() {
break;
}
for hash in batch_of_hashes.into_iter() {
self.put_to_resync(&hash, Duration::from_secs(0))?;
next_start = Some(hash)
}
if *must_exit.borrow() {
return Ok(());
}
}
// 2. Repair blocks actually on disk
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
self.for_each_file(
(),
move |_, hash| async move {
self.put_to_resync(&hash, Duration::from_secs(0))
.map_err(Into::into)
},
must_exit,
)
.await
}
/// Verify integrity of each block on disk. Use `tranquility` to limit the load generated by
/// this function.
pub async fn scrub_data_store(
&self,
must_exit: &watch::Receiver<bool>,
tranquility: u32,
) -> Result<(), Error> {
let tranquilizer = Tranquilizer::new(30);
self.for_each_file(
tranquilizer,
move |mut tranquilizer, hash| async move {
let _ = self.read_block(&hash).await;
tranquilizer.tranquilize(tranquility).await;
Ok(tranquilizer)
},
must_exit,
)
.await
}
/// Get length of the resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@@ -397,7 +311,7 @@ impl BlockManager
}
/// Read block from disk, verifying its integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@@ -575,7 +489,7 @@ impl BlockManager
});
}
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@@ -784,72 +698,6 @@ impl BlockManager
Ok(())
}
// ---- Utility: iteration on files in the data directory ----
async fn for_each_file<F, Fut, State>(
&self,
state: State,
mut f: F,
must_exit: &watch::Receiver<bool>,
) -> Result<(), Error>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send,
{
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
.await
.map(|_| ())
}
fn for_each_file_rec<'a, F, Fut, State>(
&'a self,
path: &'a Path,
mut state: State,
f: &'a mut F,
must_exit: &'a watch::Receiver<bool>,
) -> BoxFuture<'a, Result<State, Error>>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send + 'a,
{
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
if *must_exit.borrow() {
break;
}
let name = data_dir_ent.file_name();
let name = if let Ok(n) = name.into_string() {
n
} else {
continue;
};
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
state = self
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
.await?;
} else if name.len() == 64 {
let hash_bytes = if let Ok(h) = hex::decode(&name) {
h
} else {
continue;
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
state = f(state, hash.into()).await?;
}
}
Ok(state)
}
.boxed()
}
}
#[async_trait]

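The bulk of this commit moves the long-running repair and scrub loops out of BlockManager (hence the pub(crate) widening of rc, read_block and put_to_resync above) and reimplements them as resumable background workers in the new file below. For orientation, here is a sketch of the Worker contract those workers plug into, as inferred from the call sites in this PR (the real definitions live in garage_util::background and may differ in detail):

use async_trait::async_trait;
use tokio::sync::watch;
use garage_util::error::Error;

pub enum WorkerStatus {
    Busy, // made progress; the scheduler calls work() again immediately
    Idle, // nothing to do for now; the scheduler calls wait_for_work()
    Done, // the worker is finished and can be dropped
}

#[async_trait]
pub trait Worker: Send {
    fn name(&self) -> String;
    /// Perform one bounded unit of work, then yield back to the scheduler.
    async fn work(&mut self, must_exit: &mut watch::Receiver<bool>)
        -> Result<WorkerStatus, Error>;
    /// Wait until there is work to do again (only called after an Idle).
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
}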
src/block/repair.rs Normal file

@@ -0,0 +1,204 @@
use core::ops::Bound;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::fs;
use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
pub struct RepairWorker {
manager: Arc<BlockManager>,
next_start: Option<Hash>,
block_iter: Option<BlockStoreIterator>,
}
impl RepairWorker {
pub fn new(manager: Arc<BlockManager>) -> Self {
Self {
manager,
next_start: None,
block_iter: None,
}
}
}
#[async_trait]
impl Worker for RepairWorker {
fn name(&self) -> String {
"Block repair worker".into()
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> {
match self.block_iter.as_mut() {
None => {
// Phase 1: Repair blocks from RC table.
// We have to do this complicated two-step process where we first read a bunch
// of hashes from the RC table, and then insert them in the to-resync queue,
// because of SQLite. Basically, as long as we have an iterator on a DB table,
// we can't do anything else on the DB. The naive approach (which we had previously)
// of just iterating on the RC table and inserting items one to one in the resync
// queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
// This is mostly because the Rust bindings for SQLite assume a worst-case scenario
// where SQLite is not compiled in thread-safe mode, so we have to wrap everything
// in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
let mut batch_of_hashes = vec![];
let start_bound = match self.next_start.as_ref() {
None => Bound::Unbounded,
Some(x) => Bound::Excluded(x.as_slice()),
};
for entry in self
.manager
.rc
.rc
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
batch_of_hashes.push(hash);
if batch_of_hashes.len() >= 1000 {
break;
}
}
if batch_of_hashes.is_empty() {
// move on to phase 2
self.block_iter = Some(BlockStoreIterator::new(&self.manager).await?);
return Ok(WorkerStatus::Busy);
}
for hash in batch_of_hashes.into_iter() {
self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
self.next_start = Some(hash)
}
Ok(WorkerStatus::Busy)
}
Some(bi) => {
// Phase 2: Repair blocks actually on disk
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
if let Some(hash) = bi.next().await? {
self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
Ok(WorkerStatus::Busy)
} else {
Ok(WorkerStatus::Done)
}
}
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
unreachable!()
}
}
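For contrast, this is the naive single-pass approach that the phase 1 comment above warns against, as a hypothetical sketch (not code from this repository):

// ANTI-PATTERN: iterating the RC table and writing to the resync queue
// in the same pass. put_to_resync() accesses the database while the
// range iterator still holds it; with the mutex-wrapped SQLite adapter
// (see db/sqlite_adapter.rs and PR #322) this deadlocks.
for entry in manager.rc.rc.range::<&[u8], _>(..)? {
    let (hash, _) = entry?;
    let hash = Hash::try_from(&hash[..]).unwrap();
    manager.put_to_resync(&hash, Duration::from_secs(0))?;
}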
// ----
pub struct ScrubWorker {
manager: Arc<BlockManager>,
iterator: BlockStoreIterator,
tranquilizer: Tranquilizer,
tranquility: u32,
}
impl ScrubWorker {
pub async fn new(manager: Arc<BlockManager>, tranquility: u32) -> Result<Self, Error> {
let iterator = BlockStoreIterator::new(&manager).await?;
Ok(Self {
manager,
iterator,
tranquilizer: Tranquilizer::new(30),
tranquility,
})
}
}
#[async_trait]
impl Worker for ScrubWorker {
fn name(&self) -> String {
"Block scrub worker".into()
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> {
self.tranquilizer.reset();
if let Some(hash) = self.iterator.next().await? {
let _ = self.manager.read_block(&hash).await;
self.tranquilizer.tranquilize(self.tranquility).await;
Ok(WorkerStatus::Busy)
} else {
Ok(WorkerStatus::Done)
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
unreachable!()
}
}
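A note on the throttling above: the sketch below spells out the scrub loop as a standalone function, using only names visible in this diff, under the assumption (not verified against garage_util) that tranquilize(t) sleeps for roughly t times the time elapsed since the last reset(), smoothed over the window passed to Tranquilizer::new. Under that reading, tranquility = 0 runs at full speed and larger values trade scrub throughput for lower disk load.

// Hypothetical standalone equivalent of ScrubWorker's work() loop:
async fn scrub_all(manager: &Arc<BlockManager>, tranquility: u32) -> Result<(), Error> {
    let mut tranquilizer = Tranquilizer::new(30); // ~30-sample smoothing window
    let mut iter = BlockStoreIterator::new(manager).await?;
    while let Some(hash) = iter.next().await? {
        tranquilizer.reset();                        // start timing one unit of work
        let _ = manager.read_block(&hash).await;     // the work: read and verify one block
        tranquilizer.tranquilize(tranquility).await; // sleep in proportion to work time
    }
    Ok(())
}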
// ----
struct BlockStoreIterator {
path: Vec<fs::ReadDir>,
}
impl BlockStoreIterator {
async fn new(manager: &BlockManager) -> Result<Self, Error> {
let root_dir = manager.data_dir.clone();
let read_root_dir = fs::read_dir(&root_dir).await?;
Ok(Self {
path: vec![read_root_dir],
})
}
async fn next(&mut self) -> Result<Option<Hash>, Error> {
loop {
if let Some(reader) = self.path.last_mut() {
if let Some(data_dir_ent) = reader.next_entry().await? {
let name = data_dir_ent.file_name();
let name = if let Ok(n) = name.into_string() {
n
} else {
continue;
};
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
let read_child_dir = fs::read_dir(&data_dir_ent.path()).await?;
self.path.push(read_child_dir);
continue;
} else if name.len() == 64 {
let hash_bytes = if let Ok(h) = hex::decode(&name) {
h
} else {
continue;
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
return Ok(Some(hash.into()));
}
} else {
self.path.pop();
continue;
}
} else {
return Ok(None);
}
}
}
}
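BlockStoreIterator replaces the recursive for_each_file/for_each_file_rec pair deleted from manager.rs: instead of pushing every hash through a callback in one long call, it keeps an explicit stack of open directory readers (path: Vec<fs::ReadDir>) so that a worker can pull one hash per work() call and stay interruptible between calls. The layout it walks, as implied by next(): nested subdirectories named with two hex characters, containing block files named with the full 64-hex-character hash, optionally suffixed with ".zst" for compressed blocks. A usage sketch (the path in the comment is hypothetical):

// Each call to next() yields one block hash found on disk, e.g. from a
// file like <data_dir>/5f/a3/<64-hex-digit hash>.zst
let mut iter = BlockStoreIterator::new(&manager).await?;
while let Some(hash) = iter.next().await? {
    // hash can now be checked against the RC table, re-read, etc.
}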


@@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
garage_block = { version = "0.7.0", path = "../block" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }


@@ -693,7 +693,7 @@ impl AdminRpcHandler
)))
}
} else {
launch_online_repair(self.garage.clone(), opt)?;
launch_online_repair(self.garage.clone(), opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id


@@ -13,7 +13,7 @@ use garage_util::error::Error;
use crate::*;
pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
@@ -36,24 +36,19 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), E
.spawn_worker(RepairBlockrefsWorker::new(garage.clone()));
}
RepairWhat::Blocks => {
unimplemented!()
/*
info!("Repairing the stored blocks");
self.garage
.block_manager
.repair_data_store(&must_exit)
.await?;
*/
garage
.background
.spawn_worker(garage_block::repair::RepairWorker::new(
garage.block_manager.clone(),
));
}
RepairWhat::Scrub { tranquility } => {
unimplemented!()
/*
info!("Verifying integrity of stored blocks");
self.garage
.block_manager
.scrub_data_store(&must_exit, tranquility)
.await?;
*/
garage.background.spawn_worker(
garage_block::repair::ScrubWorker::new(garage.block_manager.clone(), tranquility)
.await?,
);
}
}
Ok(())
@@ -64,7 +59,7 @@ pub fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), E
struct RepairVersionsWorker {
garage: Arc<Garage>,
pos: Vec<u8>,
iter: usize,
counter: usize,
}
impl RepairVersionsWorker {
@@ -72,7 +67,7 @@ impl RepairVersionsWorker {
Self {
garage,
pos: vec![],
iter: 0,
counter: 0,
}
}
}
@@ -93,14 +88,14 @@ impl Worker for RepairVersionsWorker {
v
}
None => {
info!("repair_versions: finished, done {}", self.iter);
info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerStatus::Done);
}
};
self.iter += 1;
if self.iter % 1000 == 0 {
info!("repair_versions: {}", self.iter);
self.counter += 1;
if self.counter % 1000 == 0 {
info!("repair_versions: {}", self.counter);
}
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
@@ -144,7 +139,7 @@ impl Worker for RepairVersionsWorker {
struct RepairBlockrefsWorker {
garage: Arc<Garage>,
pos: Vec<u8>,
iter: usize,
counter: usize,
}
impl RepairBlockrefsWorker {
@@ -152,7 +147,7 @@ impl RepairBlockrefsWorker {
Self {
garage,
pos: vec![],
iter: 0,
counter: 0,
}
}
}
@@ -173,14 +168,14 @@ impl Worker for RepairBlockrefsWorker {
v
}
None => {
info!("repair_block_ref: finished, done {}", self.iter);
info!("repair_block_ref: finished, done {}", self.counter);
return Ok(WorkerStatus::Done);
}
};
self.iter += 1;
if self.iter % 1000 == 0 {
info!("repair_block_ref: {}", self.iter);
self.counter += 1;
if self.counter % 1000 == 0 {
info!("repair_block_ref: {}", self.counter);
}
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;


@@ -159,6 +159,9 @@ impl WorkerHandler {
self.task_id,
e
);
// Sleep a bit so that the error won't repeat immediately
// (TODO: find a good way to handle errors)
tokio::time::sleep(Duration::from_secs(10)).await;
}
},
WorkerStatus::Idle => {
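The 10-second sleep keeps a worker whose work() fails persistently from busy-looping on the same error. For context, here is a simplified sketch of the scheduler loop this hunk sits in, as implied by the visible states (the real WorkerHandler has more bookkeeping):

loop {
    match worker.work(&mut must_exit).await {
        Ok(WorkerStatus::Busy) => continue, // more work queued: call work() again
        Ok(WorkerStatus::Idle) => {
            // park until the worker reports that it has something to do
            worker.wait_for_work(&must_exit).await;
        }
        Ok(WorkerStatus::Done) => break, // worker finished: drop it
        Err(e) => {
            error!("Error in worker {}: {}", worker.name(), e);
            // Sleep a bit so that the error won't repeat immediately
            tokio::time::sleep(Duration::from_secs(10)).await;
        }
    }
}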