diff --git a/src/block/manager.rs b/src/block/manager.rs index 3215d27e..9cf72019 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -86,7 +86,7 @@ pub struct BlockManager { mutation_lock: Mutex, - pub rc: BlockRc, + rc: BlockRc, resync_queue: SledCountedTree, resync_notify: Notify, @@ -231,7 +231,10 @@ impl BlockManager { // so that we can offload them if necessary and then delete them locally. self.for_each_file( (), - move |_, hash| async move { self.put_to_resync(&hash, Duration::from_secs(0)) }, + move |_, hash| async move { + self.put_to_resync(&hash, Duration::from_secs(0)) + .map_err(Into::into) + }, must_exit, ) .await @@ -410,6 +413,77 @@ impl BlockManager { // ---- Resync loop ---- + // This part manages a queue of blocks that need to be + // "resynchronized", i.e. that need to have a check that + // they are at present if we need them, or that they are + // deleted once the garbage collection delay has passed. + // + // Here are some explanations on how the resync queue works. + // There are two Sled trees that are used to have information + // about the status of blocks that need to be resynchronized: + // + // - resync_queue: a tree that is ordered first by a timestamp + // (in milliseconds since Unix epoch) that is the time at which + // the resync must be done, and second by block hash. + // The key in this tree is just: + // concat(timestamp (8 bytes), hash (32 bytes)) + // The value is the same 32-byte hash. + // + // - resync_errors: a tree that indicates for each block + // if the last resync resulted in an error, and if so, + // the following two informations (see the ErrorCounter struct): + // - how many consecutive resync errors for this block? + // - when was the last try? + // These two informations are used to implement an + // exponential backoff retry strategy. + // The key in this tree is the 32-byte hash of the block, + // and the value is the encoded ErrorCounter value. + // + // We need to have these two trees, because the resync queue + // is not just a queue of items to process, but a set of items + // that are waiting a specific delay until we can process them + // (the delay being necessary both internally for the exponential + // backoff strategy, and exposed as a parameter when adding items + // to the queue, e.g. to wait until the GC delay has passed). + // This is why we need one tree ordered by time, and one + // ordered by identifier of item to be processed (block hash). + // + // When the worker wants to process an item it takes from + // resync_queue, it checks in resync_errors that if there is an + // exponential back-off delay to await, it has passed before we + // process the item. If not, the item in the queue is skipped + // (but added back for later processing after the time of the + // delay). + // + // An alternative that would have seemed natural is to + // only add items to resync_queue with a processing time that is + // after the delay, but there are several issues with this: + // - This requires to synchronize updates to resync_queue and + // resync_errors (with the current model, there is only one thread, + // the worker thread, that accesses resync_errors, + // so no need to synchronize) by putting them both in a lock. + // This would mean that block_incref might need to take a lock + // before doing its thing, meaning it has much more chances of + // not completing successfully if something bad happens to Garage. + // Currently Garage is not able to recover from block_incref that + // doesn't complete successfully, because it is necessary to ensure + // the consistency between the state of the block manager and + // information in the BlockRef table. + // - If a resync fails, we put that block in the resync_errors table, + // and also add it back to resync_queue to be processed after + // the exponential back-off delay, + // but maybe the block is already scheduled to be resynced again + // at another time that is before the exponential back-off delay, + // and we have no way to check that easily. This means that + // in all cases, we need to check the resync_errors table + // in the resync loop at the time when a block is popped from + // the resync_queue. + // Overall, the current design is therefore simpler and more robust + // because it tolerates inconsistencies between the resync_queue + // and resync_errors table (items being scheduled in resync_queue + // for times that are earlier than the exponential back-off delay + // is a natural condition that is handled properly). + fn spawn_background_worker(self: Arc) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); @@ -421,12 +495,12 @@ impl BlockManager { }); } - fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { + fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), sled::Error> { let when = now_msec() + delay.as_millis() as u64; self.put_to_resync_at(hash, when) } - fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> { + fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), sled::Error> { trace!("Put resync_queue: {} {:?}", when, hash); let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); @@ -461,7 +535,14 @@ impl BlockManager { } } - async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result { + // The result of resync_iter is: + // - Ok(true) -> a block was processed (successfully or not) + // - Ok(false) -> no block was processed, but we are ready for the next iteration + // - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors + async fn resync_iter( + &self, + must_exit: &mut watch::Receiver, + ) -> Result { if let Some(first_pair_res) = self.resync_queue.iter().next() { let (time_bytes, hash_bytes) = first_pair_res?; @@ -480,6 +561,8 @@ impl BlockManager { self.put_to_resync_at(&hash, ec.next_try())?; // ec.next_try() > now >= time_msec, so this remove // is not removing the one we added just above + // (we want to do the remove after the insert to ensure + // that the item is not lost if we crash in-between) self.resync_queue.remove(time_bytes)?; return Ok(false); } @@ -539,7 +622,15 @@ impl BlockManager { Ok(false) } } else { + // Here we wait either for a notification that an item has been + // added to the queue, or for a constant delay of 10 secs to expire. + // The delay avoids a race condition where the notification happens + // between the time we checked the queue and the first poll + // to resync_notify.notified(): if that happens, we'll just loop + // back 10 seconds later, which is fine. + let delay = tokio::time::sleep(Duration::from_secs(10)); select! { + _ = delay.fuse() => {}, _ = self.resync_notify.notified().fuse() => {}, _ = must_exit.changed().fuse() => {}, }