diff --git a/src/block.rs b/src/block.rs index bc3e92922..ea4f7c101 100644 --- a/src/block.rs +++ b/src/block.rs @@ -4,11 +4,12 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use futures::future::*; +use futures::select; use futures::stream::*; use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::prelude::*; -use tokio::sync::{watch, Mutex}; +use tokio::sync::{watch, Mutex, Notify}; use crate::data; use crate::data::*; @@ -47,9 +48,13 @@ impl RpcMessage for Message {} pub struct BlockManager { pub data_dir: PathBuf, + pub data_dir_lock: Mutex<()>, + pub rc: sled::Tree, + pub resync_queue: sled::Tree, - pub lock: Mutex<()>, + pub resync_notify: Notify, + pub system: Arc, rpc_client: Arc>, pub garage: ArcSwapOption, @@ -75,10 +80,11 @@ impl BlockManager { let rpc_client = system.rpc_client::(rpc_path); let block_manager = Arc::new(Self { + data_dir, + data_dir_lock: Mutex::new(()), rc, resync_queue, - data_dir, - lock: Mutex::new(()), + resync_notify: Notify::new(), system, rpc_client, garage: ArcSwapOption::from(None), @@ -109,17 +115,20 @@ impl BlockManager { // Launch 2 simultaneous workers for background resync loop preprocessing for i in 0..2usize { let bm2 = self.clone(); - self.system - .background - .spawn_worker(format!("block resync worker {}", i), move |must_exit| { - bm2.resync_loop(must_exit) - }) - .await; + let background = self.system.background.clone(); + tokio::spawn(async move { + tokio::time::delay_for(Duration::from_secs(10)).await; + background + .spawn_worker(format!("block resync worker {}", i), move |must_exit| { + bm2.resync_loop(must_exit) + }) + .await; + }); } } pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result { - let _lock = self.lock.lock().await; + let _lock = self.data_dir_lock.lock().await; let mut path = self.block_dir(hash); fs::create_dir_all(&path).await?; @@ -152,7 +161,7 @@ impl BlockManager { drop(f); if data::hash(&data[..]) != *hash { - let _lock = self.lock.lock().await; + let _lock = self.data_dir_lock.lock().await; warn!("Block {:?} is corrupted. Deleting and resyncing.", hash); fs::remove_file(path).await?; self.put_to_resync(&hash, 0)?; @@ -212,19 +221,20 @@ impl BlockManager { let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); self.resync_queue.insert(key, hash.as_ref())?; + self.resync_notify.notify(); Ok(()) } - async fn resync_loop(self: Arc, must_exit: watch::Receiver) -> Result<(), Error> { + async fn resync_loop( + self: Arc, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + let mut n_failures = 0usize; while !*must_exit.borrow() { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { let time_msec = u64_from_bytes(&time_bytes[0..8]); - trace!( - "First in resync queue: {} (now = {})", - time_msec, - now_msec() - ); - if now_msec() >= time_msec { + let now = now_msec(); + if now >= time_msec { let mut hash = [0u8; 32]; hash.copy_from_slice(hash_bytes.as_ref()); let hash = Hash::from(hash); @@ -232,13 +242,29 @@ impl BlockManager { if let Err(e) = self.resync_iter(&hash).await { warn!("Failed to resync block {:?}, retrying later: {}", hash, e); self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?; + n_failures += 1; + if n_failures >= 10 { + warn!("Too many resync failures, throttling."); + tokio::time::delay_for(Duration::from_secs(1)).await; + } + } else { + n_failures = 0; } - continue; } else { self.resync_queue.insert(time_bytes, hash_bytes)?; + let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); + select! { + _ = delay.fuse() => (), + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), + } + } + } else { + select! { + _ = self.resync_notify.notified().fuse() => (), + _ = must_exit.recv().fuse() => (), } } - tokio::time::delay_for(Duration::from_secs(1)).await; } Ok(()) }