Implement exponential backoff for resync retries

This commit is contained in:
Alex 2022-02-25 20:42:56 +01:00
parent 8c2a8ecd98
commit 6059ce31c8
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
3 changed files with 106 additions and 5 deletions

View file

@ -39,7 +39,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072; pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1; pub const BACKGROUND_WORKERS: u64 = 1;
pub const BACKGROUND_TRANQUILITY: u32 = 3; pub const BACKGROUND_TRANQUILITY: u32 = 2;
// Timeout for RPCs that read and write blocks to remote nodes // Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30); const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5); const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
// The delay between the time where a resync operation fails // The delay between the time where a resync operation fails
// and the time when it is retried. // and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60); const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter // The delay between the moment when the reference counter
@ -158,6 +159,7 @@ pub struct BlockManager {
resync_queue: SledCountedTree, resync_queue: SledCountedTree,
resync_notify: Notify, resync_notify: Notify,
resync_errors: SledCountedTree,
system: Arc<System>, system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>, endpoint: Arc<Endpoint<BlockRpc, Self>>,
@ -187,13 +189,18 @@ impl BlockManager {
.expect("Unable to open block_local_resync_queue tree"); .expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue); let resync_queue = SledCountedTree::new(resync_queue);
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
let resync_errors = SledCountedTree::new(resync_errors);
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint("garage_model/block.rs/Rpc".to_string()); .endpoint("garage_model/block.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked(); let manager_locked = BlockManagerLocked();
let metrics = BlockManagerMetrics::new(resync_queue.clone()); let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self { let block_manager = Arc::new(Self {
replication, replication,
@ -202,6 +209,7 @@ impl BlockManager {
rc, rc,
resync_queue, resync_queue,
resync_notify: Notify::new(), resync_notify: Notify::new(),
resync_errors,
system, system,
endpoint, endpoint,
garage: ArcSwapOption::from(None), garage: ArcSwapOption::from(None),
@ -519,6 +527,10 @@ impl BlockManager {
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> { fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay.as_millis() as u64; let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
trace!("Put resync_queue: {} {:?}", when, hash); trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec(); let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref()); key.extend(hash.as_ref());
@ -560,6 +572,17 @@ impl BlockManager {
if now >= time_msec { if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let hash = Hash::try_from(&hash_bytes[..]).unwrap();
if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
let ec = ErrorCounter::decode(ec);
if now < ec.next_try() {
// if next retry after an error is not yet,
// don't do resync and return early, but still
// make sure the item is still in queue at expected time
self.put_to_resync_at(&hash, ec.next_try())?;
return Ok(false);
}
}
let tracer = opentelemetry::global::tracer("garage"); let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid(); let trace_id = gen_uuid();
let span = tracer let span = tracer
@ -584,8 +607,19 @@ impl BlockManager {
if let Err(e) = &res { if let Err(e) = &res {
self.metrics.resync_error_counter.add(1); self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e); warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
let err_counter = match self.resync_errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(ec).add1(),
None => ErrorCounter::new(),
};
self.put_to_resync_at(&hash, err_counter.next_try())?;
self.resync_errors
.insert(hash.as_slice(), err_counter.encode())?;
} else {
self.resync_errors.remove(hash.as_slice())?;
} }
Ok(true) Ok(true)
} else { } else {
self.resync_queue.insert(time_bytes, hash_bytes)?; self.resync_queue.insert(time_bytes, hash_bytes)?;
@ -994,6 +1028,58 @@ impl RcEntry {
} }
} }
/// Counts the number of errors when resyncing a block,
/// and the time of the last try.
/// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)]
struct ErrorCounter {
errors: u64,
last_try: u64,
}
impl Default for ErrorCounter {
fn default() -> Self {
Self {
errors: 1,
last_try: now_msec(),
}
}
}
impl ErrorCounter {
fn new() -> Self {
Self::default()
}
fn decode(data: sled::IVec) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
}
}
fn encode(&self) -> Vec<u8> {
[
u64::to_be_bytes(self.errors),
u64::to_be_bytes(self.last_try),
]
.concat()
}
fn add1(self) -> Self {
Self {
errors: self.errors + 1,
last_try: now_msec(),
}
}
fn delay_msec(&self) -> u64 {
(RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10)
}
fn next_try(&self) -> u64 {
self.last_try + self.delay_msec()
}
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new(); let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?; let mut encoder = Encoder::new(&mut result, level)?;

View file

@ -5,6 +5,7 @@ use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics /// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics { pub struct BlockManagerMetrics {
pub(crate) _resync_queue_len: ValueObserver<u64>, pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>, pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>, pub(crate) resync_error_counter: BoundCounter<u64>,
@ -22,7 +23,7 @@ pub struct BlockManagerMetrics {
} }
impl BlockManagerMetrics { impl BlockManagerMetrics {
pub fn new(resync_queue: SledCountedTree) -> Self { pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
let meter = global::meter("garage_model/block"); let meter = global::meter("garage_model/block");
Self { Self {
_resync_queue_len: meter _resync_queue_len: meter
@ -33,6 +34,12 @@ impl BlockManagerMetrics {
"Number of block hashes queued for local check and possible resync", "Number of block hashes queued for local check and possible resync",
) )
.init(), .init(),
_resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| {
observer.observe(resync_errors.len() as u64, &[])
})
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
resync_counter: meter resync_counter: meter
.u64_counter("block.resync_counter") .u64_counter("block.resync_counter")

View file

@ -52,6 +52,14 @@ impl SledCountedTree {
res res
} }
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
let res = self.0.tree.remove(key);
if matches!(res, Ok(Some(_))) {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
res
}
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> { pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
let res = self.0.tree.pop_min(); let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res { if let Ok(Some(_)) = &res {