Several resync workers; add delay on retry resync

This commit is contained in:
Alex 2020-04-17 20:58:10 +02:00
parent 29a1e94f23
commit 40c48e6a59
2 changed files with 20 additions and 21 deletions

View file

@ -18,6 +18,7 @@ use crate::rpc_client::*;
use crate::server::Garage; use crate::server::Garage;
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
const RESYNC_RETRY_TIMEOUT: Duration = Duration::from_secs(10);
pub struct BlockManager { pub struct BlockManager {
pub data_dir: PathBuf, pub data_dir: PathBuf,
@ -50,11 +51,14 @@ impl BlockManager {
} }
pub async fn spawn_background_worker(self: Arc<Self>) { pub async fn spawn_background_worker(self: Arc<Self>) {
let bm2 = self.clone(); // Launch 2 simultaneous workers for background resync loop preprocessing
self.system for _i in 0..2usize {
.background let bm2 = self.clone();
.spawn_worker(move |must_exit| bm2.resync_loop(must_exit)) self.system
.await; .background
.spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
.await;
}
} }
pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> { pub async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<Message, Error> {
@ -158,7 +162,7 @@ impl BlockManager {
async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> { async fn resync_loop(self: Arc<Self>, must_exit: watch::Receiver<bool>) -> Result<(), Error> {
while !*must_exit.borrow() { while !*must_exit.borrow() {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.get_gt(&[])? { if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? {
let time_msec = u64_from_bytes(&time_bytes[0..8]); let time_msec = u64_from_bytes(&time_bytes[0..8]);
eprintln!( eprintln!(
"First in resync queue: {} (now = {})", "First in resync queue: {} (now = {})",
@ -170,18 +174,13 @@ impl BlockManager {
hash.copy_from_slice(hash_bytes.as_ref()); hash.copy_from_slice(hash_bytes.as_ref());
let hash = Hash::from(hash); let hash = Hash::from(hash);
match self.resync_iter(&hash).await { if let Err(e) = self.resync_iter(&hash).await {
Ok(_) => { eprintln!("Failed to resync block {:?}, retrying later: {}", hash, e);
self.resync_queue.remove(&time_bytes)?; self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT.as_millis() as u64)?;
}
Err(e) => {
eprintln!(
"Failed to resync hash {:?}, leaving it in queue: {}",
hash, e
);
}
} }
continue; continue;
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
} }
} }
tokio::time::delay_for(Duration::from_secs(1)).await; tokio::time::delay_for(Duration::from_secs(1)).await;

View file

@ -326,7 +326,7 @@ impl<F: TableSchema + 'static> Table<F> {
rpc: &TableRPC<F>, rpc: &TableRPC<F>,
quorum: usize, quorum: usize,
) -> Result<Vec<TableRPC<F>>, Error> { ) -> Result<Vec<TableRPC<F>>, Error> {
eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?); //eprintln!("Table RPC to {:?}: {}", who, serde_json::to_string(&rpc)?);
let rpc_bytes = rmp_to_vec_all_named(rpc)?; let rpc_bytes = rmp_to_vec_all_named(rpc)?;
let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes); let rpc_msg = Message::TableRPC(self.name.to_string(), rpc_bytes);
@ -353,10 +353,10 @@ impl<F: TableSchema + 'static> Table<F> {
resp resp
))); )));
} }
eprintln!( //eprintln!(
"Table RPC responses: {}", // "Table RPC responses: {}",
serde_json::to_string(&resps_vals)? // serde_json::to_string(&resps_vals)?
); //);
Ok(resps_vals) Ok(resps_vals)
} }