Improvements to block resync queue & worker
This commit is contained in:
parent
2556a1e383
commit
897fafa8db
1 changed files with 47 additions and 21 deletions
60
src/block.rs
60
src/block.rs
|
@ -4,11 +4,12 @@ use std::time::Duration;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
|
use futures::select;
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::prelude::*;
|
use tokio::prelude::*;
|
||||||
use tokio::sync::{watch, Mutex};
|
use tokio::sync::{watch, Mutex, Notify};
|
||||||
|
|
||||||
use crate::data;
|
use crate::data;
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
@ -47,9 +48,13 @@ impl RpcMessage for Message {}
|
||||||
|
|
||||||
pub struct BlockManager {
|
pub struct BlockManager {
|
||||||
pub data_dir: PathBuf,
|
pub data_dir: PathBuf,
|
||||||
|
pub data_dir_lock: Mutex<()>,
|
||||||
|
|
||||||
pub rc: sled::Tree,
|
pub rc: sled::Tree,
|
||||||
|
|
||||||
pub resync_queue: sled::Tree,
|
pub resync_queue: sled::Tree,
|
||||||
pub lock: Mutex<()>,
|
pub resync_notify: Notify,
|
||||||
|
|
||||||
pub system: Arc<System>,
|
pub system: Arc<System>,
|
||||||
rpc_client: Arc<RpcClient<Message>>,
|
rpc_client: Arc<RpcClient<Message>>,
|
||||||
pub garage: ArcSwapOption<Garage>,
|
pub garage: ArcSwapOption<Garage>,
|
||||||
|
@ -75,10 +80,11 @@ impl BlockManager {
|
||||||
let rpc_client = system.rpc_client::<Message>(rpc_path);
|
let rpc_client = system.rpc_client::<Message>(rpc_path);
|
||||||
|
|
||||||
let block_manager = Arc::new(Self {
|
let block_manager = Arc::new(Self {
|
||||||
|
data_dir,
|
||||||
|
data_dir_lock: Mutex::new(()),
|
||||||
rc,
|
rc,
|
||||||
resync_queue,
|
resync_queue,
|
||||||
data_dir,
|
resync_notify: Notify::new(),
|
||||||
lock: Mutex::new(()),
|
|
||||||
system,
|
system,
|
||||||
rpc_client,
|
rpc_client,
|
||||||
garage: ArcSwapOption::from(None),
|
garage: ArcSwapOption::from(None),
|
||||||
|
@ -109,17 +115,20 @@ impl BlockManager {
|
||||||
// Launch 2 simultaneous workers for background resync loop preprocessing
|
// Launch 2 simultaneous workers for background resync loop preprocessing
|
||||||
for i in 0..2usize {
|
for i in 0..2usize {
|
||||||
let bm2 = self.clone();
|
let bm2 = self.clone();
|
||||||
self.system
|
let background = self.system.background.clone();
|
||||||
.background
|
tokio::spawn(async move {
|
||||||
|
tokio::time::delay_for(Duration::from_secs(10)).await;
|
||||||
|
background
|
||||||
.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
|
.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
|
||||||
bm2.resync_loop(must_exit)
|
bm2.resync_loop(must_exit)
|
||||||
})
|
})
|
||||||
.await;
|
.await;
|
||||||
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
|
||||||
let _lock = self.lock.lock().await;
|
let _lock = self.data_dir_lock.lock().await;
|
||||||
|
|
||||||
let mut path = self.block_dir(hash);
|
let mut path = self.block_dir(hash);
|
||||||
fs::create_dir_all(&path).await?;
|
fs::create_dir_all(&path).await?;
|
||||||
|
@ -152,7 +161,7 @@ impl BlockManager {
|
||||||
drop(f);
|
drop(f);
|
||||||
|
|
||||||
if data::hash(&data[..]) != *hash {
|
if data::hash(&data[..]) != *hash {
|
||||||
let _lock = self.lock.lock().await;
|
let _lock = self.data_dir_lock.lock().await;
|
||||||
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
|
warn!("Block {:?} is corrupted. Deleting and resyncing.", hash);
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
self.put_to_resync(&hash, 0)?;
|
self.put_to_resync(&hash, 0)?;
|
||||||
|
@ -212,19 +221,20 @@ impl BlockManager {
|
||||||
let mut key = u64::to_be_bytes(when).to_vec();
|
let mut key = u64::to_be_bytes(when).to_vec();
|
||||||
key.extend(hash.as_ref());
|
key.extend(hash.as_ref());
|
||||||
self.resync_queue.insert(key, hash.as_ref())?;
|
self.resync_queue.insert(key, hash.as_ref())?;
|
||||||
|
self.resync_notify.notify();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
|
async fn resync_loop(
|
||||||
|
self: Arc<Self>,
|
||||||
|
mut must_exit: watch::Receiver<bool>,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let mut n_failures = 0usize;
|
||||||
while !*must_exit.borrow() {
|
while !*must_exit.borrow() {
|
||||||
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
|
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
|
||||||
let time_msec = u64_from_bytes(&time_bytes[0..8]);
|
let time_msec = u64_from_bytes(&time_bytes[0..8]);
|
||||||
trace!(
|
let now = now_msec();
|
||||||
"First in resync queue: {} (now = {})",
|
if now >= time_msec {
|
||||||
time_msec,
|
|
||||||
now_msec()
|
|
||||||
);
|
|
||||||
if now_msec() >= time_msec {
|
|
||||||
let mut hash = [0u8; 32];
|
let mut hash = [0u8; 32];
|
||||||
hash.copy_from_slice(hash_bytes.as_ref());
|
hash.copy_from_slice(hash_bytes.as_ref());
|
||||||
let hash = Hash::from(hash);
|
let hash = Hash::from(hash);
|
||||||
|
@ -232,13 +242,29 @@ impl BlockManager {
|
||||||
if let Err(e) = self.resync_iter(&hash).await {
|
if let Err(e) = self.resync_iter(&hash).await {
|
||||||
warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
|
warn!("Failed to resync block {:?}, retrying later: {}", hash, e);
|
||||||
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
|
self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
|
||||||
|
n_failures += 1;
|
||||||
|
if n_failures >= 10 {
|
||||||
|
warn!("Too many resync failures, throttling.");
|
||||||
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
n_failures = 0;
|
||||||
}
|
}
|
||||||
continue;
|
|
||||||
} else {
|
} else {
|
||||||
self.resync_queue.insert(time_bytes, hash_bytes)?;
|
self.resync_queue.insert(time_bytes, hash_bytes)?;
|
||||||
|
let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now));
|
||||||
|
select! {
|
||||||
|
_ = delay.fuse() => (),
|
||||||
|
_ = self.resync_notify.notified().fuse() => (),
|
||||||
|
_ = must_exit.recv().fuse() => (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
select! {
|
||||||
|
_ = self.resync_notify.notified().fuse() => (),
|
||||||
|
_ = must_exit.recv().fuse() => (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tokio::time::delay_for(Duration::from_secs(1)).await;
|
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue