diff --git a/src/block/manager.rs b/src/block/manager.rs index 62ef96b98..9240db255 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -125,13 +125,11 @@ impl BlockManager { }); block_manager.endpoint.set_handler(block_manager.clone()); - // Spawn one resync worker - let background = block_manager.system.background.clone(); - let worker = ResyncWorker::new(block_manager.clone()); - tokio::spawn(async move { - tokio::time::sleep(Duration::from_secs(10)).await; - background.spawn_worker(worker); - }); + // Spawn a bunch of resync workers + for index in 0..MAX_RESYNC_WORKERS { + let worker = ResyncWorker::new(index, block_manager.clone()); + block_manager.system.background.spawn_worker(worker); + } // Spawn scrub worker let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx); diff --git a/src/block/resync.rs b/src/block/resync.rs index dab083380..0f358d48b 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -1,5 +1,6 @@ +use std::collections::HashSet; use std::convert::TryInto; -use std::sync::Arc; +use std::sync::{Arc, Mutex}; use std::time::Duration; use arc_swap::ArcSwap; @@ -44,6 +45,9 @@ pub(crate) const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); // The minimum retry delay is 60 seconds = 1 minute // The maximum retry delay is 60 seconds * 2^6 = 60 seconds << 6 = 64 minutes (~1 hour) pub(crate) const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; + +// No more than 4 resync workers can be running in the system +pub(crate) const MAX_RESYNC_WORKERS: usize = 4; // Resync tranquility is initially set to 2, but can be changed in the CLI // and the updated version is persisted over Garage restarts const INITIAL_RESYNC_TRANQUILITY: u32 = 2; @@ -53,12 +57,15 @@ pub struct BlockResyncManager { pub(crate) notify: Notify, pub(crate) errors: CountedTree, + busy_set: BusySet, + persister: Persister, persisted: ArcSwap, } #[derive(Serialize, Deserialize, Clone, Copy)] struct ResyncPersistedConfig { + n_workers: usize, tranquility: u32, } @@ -68,6 +75,14 @@ enum ResyncIterResult { IdleFor(Duration), } +type BusySet = Arc>>>; + +struct BusyBlock { + time_bytes: Vec, + hash_bytes: Vec, + busy_set: BusySet, +} + impl BlockResyncManager { pub(crate) fn new(db: &db::Db, system: &System) -> Self { let queue = db @@ -84,6 +99,7 @@ impl BlockResyncManager { let persisted = match persister.load() { Ok(v) => v, Err(_) => ResyncPersistedConfig { + n_workers: 1, tranquility: INITIAL_RESYNC_TRANQUILITY, }, }; @@ -92,6 +108,7 @@ impl BlockResyncManager { queue, notify: Notify::new(), errors, + busy_set: Arc::new(Mutex::new(HashSet::new())), persister, persisted: ArcSwap::new(Arc::new(persisted)), } @@ -199,12 +216,12 @@ impl BlockResyncManager { } async fn resync_iter(&self, manager: &BlockManager) -> Result { - if let Some((time_bytes, hash_bytes)) = self.queue.first()? { - let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); + if let Some(block) = self.get_block_to_resync()? { + let time_msec = u64::from_be_bytes(block.time_bytes[0..8].try_into().unwrap()); let now = now_msec(); if now >= time_msec { - let hash = Hash::try_from(&hash_bytes[..]).unwrap(); + let hash = Hash::try_from(&block.hash_bytes[..]).unwrap(); if let Some(ec) = self.errors.get(hash.as_slice())? { let ec = ErrorCounter::decode(&ec); @@ -217,7 +234,7 @@ impl BlockResyncManager { // is not removing the one we added just above // (we want to do the remove after the insert to ensure // that the item is not lost if we crash in-between) - self.queue.remove(time_bytes)?; + self.queue.remove(&block.time_bytes)?; return Ok(ResyncIterResult::BusyDidNothing); } } @@ -258,10 +275,10 @@ impl BlockResyncManager { // err_counter.next_try() >= now + 1 > now, // the entry we remove from the queue is not // the entry we inserted with put_to_resync_at - self.queue.remove(time_bytes)?; + self.queue.remove(&block.time_bytes)?; } else { self.errors.remove(hash.as_slice())?; - self.queue.remove(time_bytes)?; + self.queue.remove(&block.time_bytes)?; } Ok(ResyncIterResult::BusyDidSomething) @@ -281,6 +298,22 @@ impl BlockResyncManager { } } + fn get_block_to_resync(&self) -> Result, db::Error> { + let mut busy = self.busy_set.lock().unwrap(); + for it in self.queue.iter()? { + let (time_bytes, hash_bytes) = it?; + if !busy.contains(&time_bytes) { + busy.insert(time_bytes.clone()); + return Ok(Some(BusyBlock { + time_bytes, + hash_bytes, + busy_set: self.busy_set.clone(), + })); + } + } + return Ok(None); + } + async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> { let BlockStatus { exists, needed } = manager.check_block_status(hash).await?; @@ -394,25 +427,44 @@ impl BlockResyncManager { update(&mut cfg); self.persister.save_async(&cfg).await?; self.persisted.store(Arc::new(cfg)); - self.notify.notify_one(); + self.notify.notify_waiters(); Ok(()) } + pub async fn set_n_workers(&self, n_workers: usize) -> Result<(), Error> { + if n_workers < 1 || n_workers > MAX_RESYNC_WORKERS { + return Err(Error::Message(format!( + "Invalid number of resync workers, must be between 1 and {}", + MAX_RESYNC_WORKERS + ))); + } + self.update_persisted(|cfg| cfg.n_workers = n_workers).await + } + pub async fn set_tranquility(&self, tranquility: u32) -> Result<(), Error> { self.update_persisted(|cfg| cfg.tranquility = tranquility) .await } } +impl Drop for BusyBlock { + fn drop(&mut self) { + let mut busy = self.busy_set.lock().unwrap(); + busy.remove(&self.time_bytes); + } +} + pub(crate) struct ResyncWorker { + index: usize, manager: Arc, tranquilizer: Tranquilizer, next_delay: Duration, } impl ResyncWorker { - pub(crate) fn new(manager: Arc) -> Self { + pub(crate) fn new(index: usize, manager: Arc) -> Self { Self { + index, manager, tranquilizer: Tranquilizer::new(30), next_delay: Duration::from_secs(10), @@ -423,15 +475,18 @@ impl ResyncWorker { #[async_trait] impl Worker for ResyncWorker { fn name(&self) -> String { - "Block resync worker".into() + format!("Block resync worker #{}", self.index + 1) } fn info(&self) -> Option { + let persisted = self.manager.resync.persisted.load(); + + if self.index >= persisted.n_workers { + return Some("(unused)".into()); + } + let mut ret = vec![]; - ret.push(format!( - "tranquility = {}", - self.manager.resync.persisted.load().tranquility - )); + ret.push(format!("tranquility = {}", persisted.tranquility)); let qlen = self.manager.resync.queue_len().unwrap_or(0); if qlen > 0 { @@ -447,6 +502,10 @@ impl Worker for ResyncWorker { } async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + if self.index >= self.manager.resync.persisted.load().n_workers { + return Ok(WorkerState::Idle); + } + self.tranquilizer.reset(); match self.manager.resync.resync_iter(&self.manager).await { Ok(ResyncIterResult::BusyDidSomething) => Ok(self @@ -470,10 +529,15 @@ impl Worker for ResyncWorker { } async fn wait_for_work(&mut self, _must_exit: &watch::Receiver) -> WorkerState { + while self.index >= self.manager.resync.persisted.load().n_workers { + self.manager.resync.notify.notified().await + } + select! { _ = tokio::time::sleep(self.next_delay) => (), _ = self.manager.resync.notify.notified() => (), }; + WorkerState::Busy } } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index 9f4764df2..762610506 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -847,6 +847,14 @@ impl AdminRpcHandler { .await; Ok(AdminRpc::Ok("Scrub tranquility updated".into())) } + WorkerSetCmd::ResyncNWorkers { n_workers } => { + self.garage + .block_manager + .resync + .set_n_workers(n_workers) + .await?; + Ok(AdminRpc::Ok("Number of resync workers updated".into())) + } WorkerSetCmd::ResyncTranquility { tranquility } => { self.garage .block_manager diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 1fba934f0..0388cef5d 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -524,7 +524,10 @@ pub enum WorkerSetCmd { /// Set tranquility of scrub operations #[structopt(name = "scrub-tranquility", version = version::garage())] ScrubTranquility { tranquility: u32 }, - /// Set tranquility of resync operations + /// Set number of concurrent block resync workers + #[structopt(name = "resync-n-workers", version = version::garage())] + ResyncNWorkers { n_workers: usize }, + /// Set tranquility of block resync operations #[structopt(name = "resync-tranquility", version = version::garage())] ResyncTranquility { tranquility: u32 }, }