Add tranquilizer mechanism to improve on token bucket mechanism
continuous-integration/drone/pr Build is passing Details
continuous-integration/drone/push Build is passing Details

This commit is contained in:
Alex 2021-11-03 18:28:43 +01:00
parent 6f13d083ab
commit 2090a6187f
No known key found for this signature in database
GPG Key ID: EDABF9711E244EB1
5 changed files with 95 additions and 30 deletions

View File

@ -285,9 +285,9 @@ pub enum RepairWhat {
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
/// Limit on i/o speed, in B/s
#[structopt(name = "limit")]
limit: Option<usize>,
/// Tranquility factor (see tranquilizer documentation)
#[structopt(name = "tranquility", default_value = "2")]
tranquility: u32,
},
}

View File

@ -51,11 +51,11 @@ impl Repair {
.repair_data_store(&must_exit)
.await?;
}
RepairWhat::Scrub { limit } => {
RepairWhat::Scrub { tranquility } => {
info!("Verifying integrity of stored blocks");
self.garage
.block_manager
.scrub_data_store(&must_exit, limit)
.scrub_data_store(&must_exit, tranquility)
.await?;
}
}

View File

@ -14,7 +14,7 @@ use tokio::sync::{watch, Mutex, Notify};
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
use garage_util::token_bucket::TokenBucket;
use garage_util::tranquilizer::Tranquilizer;
use garage_rpc::system::System;
use garage_rpc::*;
@ -29,6 +29,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
pub const BACKGROUND_TRANQUILITY: u32 = 3;
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(42);
const BLOCK_GC_TIMEOUT: Duration = Duration::from_secs(60);
@ -214,24 +215,15 @@ impl BlockManager {
pub async fn scrub_data_store(
&self,
must_exit: &watch::Receiver<bool>,
speed_limit: Option<usize>,
tranquility: u32,
) -> Result<(), Error> {
let token_bucket = speed_limit.map(|rate| TokenBucket::new(rate as u64));
let tranquilizer = Tranquilizer::new(30);
self.for_each_file(
token_bucket,
move |mut token_bucket, hash| {
async move {
let len = match self.read_block(&hash).await {
Ok(BlockRpc::PutBlock(PutBlockMessage { data, .. })) => data.len(),
Ok(_) => unreachable!(),
Err(_) => 0, // resync and warn message made by read_block if necessary
};
if let Some(tb) = &mut token_bucket {
tb.take(len as u64).await;
}
Ok(token_bucket)
}
tranquilizer,
move |mut tranquilizer, hash| async move {
let _ = self.read_block(&hash).await;
tranquilizer.tranquilize(tranquility).await;
Ok(tranquilizer)
},
must_exit,
)
@ -381,18 +373,32 @@ impl BlockManager {
}
async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
let mut tranquilizer = Tranquilizer::new(30);
while !*must_exit.borrow() {
if let Err(e) = self.resync_iter(&mut must_exit).await {
warn!("Error in block resync loop: {}", e);
select! {
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {},
_ = must_exit.changed().fuse() => {},
match self.resync_iter(&mut must_exit).await {
Ok(true) => {
tranquilizer.tranquilize(BACKGROUND_TRANQUILITY).await;
}
Ok(false) => {
tranquilizer.reset();
}
Err(e) => {
// The errors that we have here are only Sled errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
// (there is kind of an assumption that Sled won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
error!(
"Could not do a resync iteration: {} (this is a very bad error)",
e
);
tranquilizer.reset();
}
}
}
}
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<(), Error> {
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_be_bytes(&time_bytes[0..8]);
let now = now_msec();
@ -403,7 +409,7 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?;
}
res?; // propagate error to delay main loop
Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
@ -412,14 +418,15 @@ impl BlockManager {
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
Ok(false)
}
} else {
select! {
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
Ok(false)
}
Ok(())
}
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {

View File

@ -10,3 +10,4 @@ pub mod error;
pub mod persister;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;

57
src/util/tranquilizer.rs Normal file
View File

@ -0,0 +1,57 @@
use std::collections::VecDeque;
use std::time::{Duration, Instant};
use tokio::time::sleep;
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
/// Background operations are done in a loop that does the following:
/// - do one step of the background process
/// - tranquilize, i.e. wait some time to not overload the system
///
/// The tranquilizer observes how long the steps take, and keeps
/// in memory a number of observations. The tranquilize operation
/// simply sleeps k * avg(observed step times), where k is
/// the tranquility factor. For instance with a tranquility of 2,
/// the tranquilizer will sleep on average 2 units of time for every
/// 1 unit of time spent doing the background task.
pub struct Tranquilizer {
n_observations: usize,
observations: VecDeque<Duration>,
sum_observations: Duration,
last_step_begin: Instant,
}
impl Tranquilizer {
pub fn new(n_observations: usize) -> Self {
Self {
n_observations,
observations: VecDeque::with_capacity(n_observations + 1),
sum_observations: Duration::ZERO,
last_step_begin: Instant::now(),
}
}
pub async fn tranquilize(&mut self, tranquility: u32) {
let observation = Instant::now() - self.last_step_begin;
self.observations.push_back(observation);
self.sum_observations += observation;
while self.observations.len() > self.n_observations {
self.sum_observations -= self.observations.pop_front().unwrap();
}
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
sleep(delay).await;
}
self.reset();
}
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
}