Implement exponential backoff for resync retries #252
3 changed files with 106 additions and 5 deletions
|
@ -39,7 +39,7 @@ use crate::garage::Garage;
|
|||
pub const INLINE_THRESHOLD: usize = 3072;
|
||||
|
||||
pub const BACKGROUND_WORKERS: u64 = 1;
|
||||
pub const BACKGROUND_TRANQUILITY: u32 = 3;
|
||||
pub const BACKGROUND_TRANQUILITY: u32 = 2;
|
||||
|
||||
// Timeout for RPCs that read and write blocks to remote nodes
|
||||
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
|
|||
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
|
||||
// The delay between the time where a resync operation fails
|
||||
// and the time when it is retried.
|
||||
// and the time when it is retried, with exponential backoff
|
||||
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
|
||||
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
|
||||
|
||||
// The delay between the moment when the reference counter
|
||||
|
@ -158,6 +159,7 @@ pub struct BlockManager {
|
|||
|
||||
resync_queue: SledCountedTree,
|
||||
resync_notify: Notify,
|
||||
resync_errors: SledCountedTree,
|
||||
|
||||
system: Arc<System>,
|
||||
endpoint: Arc<Endpoint<BlockRpc, Self>>,
|
||||
|
@ -187,13 +189,18 @@ impl BlockManager {
|
|||
.expect("Unable to open block_local_resync_queue tree");
|
||||
let resync_queue = SledCountedTree::new(resync_queue);
|
||||
|
||||
let resync_errors = db
|
||||
.open_tree("block_local_resync_errors")
|
||||
.expect("Unable to open block_local_resync_errors tree");
|
||||
let resync_errors = SledCountedTree::new(resync_errors);
|
||||
|
||||
let endpoint = system
|
||||
.netapp
|
||||
.endpoint("garage_model/block.rs/Rpc".to_string());
|
||||
|
||||
let manager_locked = BlockManagerLocked();
|
||||
|
||||
let metrics = BlockManagerMetrics::new(resync_queue.clone());
|
||||
let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
|
||||
|
||||
let block_manager = Arc::new(Self {
|
||||
replication,
|
||||
|
@ -202,6 +209,7 @@ impl BlockManager {
|
|||
rc,
|
||||
resync_queue,
|
||||
resync_notify: Notify::new(),
|
||||
resync_errors,
|
||||
system,
|
||||
endpoint,
|
||||
garage: ArcSwapOption::from(None),
|
||||
|
@ -519,6 +527,10 @@ impl BlockManager {
|
|||
|
||||
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
|
||||
let when = now_msec() + delay.as_millis() as u64;
|
||||
self.put_to_resync_at(hash, when)
|
||||
}
|
||||
|
||||
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
|
||||
trace!("Put resync_queue: {} {:?}", when, hash);
|
||||
let mut key = u64::to_be_bytes(when).to_vec();
|
||||
key.extend(hash.as_ref());
|
||||
|
@ -560,6 +572,17 @@ impl BlockManager {
|
|||
if now >= time_msec {
|
||||
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
|
||||
|
||||
if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
|
||||
let ec = ErrorCounter::decode(ec);
|
||||
if now < ec.next_try() {
|
||||
// if next retry after an error is not yet,
|
||||
// don't do resync and return early, but still
|
||||
// make sure the item is still in queue at expected time
|
||||
self.put_to_resync_at(&hash, ec.next_try())?;
|
||||
return Ok(false);
|
||||
}
|
||||
}
|
||||
|
||||
let tracer = opentelemetry::global::tracer("garage");
|
||||
let trace_id = gen_uuid();
|
||||
let span = tracer
|
||||
|
@ -584,8 +607,19 @@ impl BlockManager {
|
|||
if let Err(e) = &res {
|
||||
self.metrics.resync_error_counter.add(1);
|
||||
warn!("Error when resyncing {:?}: {}", hash, e);
|
||||
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
|
||||
|
||||
let err_counter = match self.resync_errors.get(hash.as_slice())? {
|
||||
Some(ec) => ErrorCounter::decode(ec).add1(),
|
||||
None => ErrorCounter::new(),
|
||||
};
|
||||
|
||||
self.put_to_resync_at(&hash, err_counter.next_try())?;
|
||||
self.resync_errors
|
||||
.insert(hash.as_slice(), err_counter.encode())?;
|
||||
} else {
|
||||
self.resync_errors.remove(hash.as_slice())?;
|
||||
}
|
||||
|
||||
Ok(true)
|
||||
} else {
|
||||
self.resync_queue.insert(time_bytes, hash_bytes)?;
|
||||
|
@ -994,6 +1028,58 @@ impl RcEntry {
|
|||
}
|
||||
}
|
||||
|
||||
/// Counts the number of errors when resyncing a block,
|
||||
/// and the time of the last try.
|
||||
/// Used to implement exponential backoff.
|
||||
#[derive(Clone, Copy, Debug)]
|
||||
struct ErrorCounter {
|
||||
errors: u64,
|
||||
last_try: u64,
|
||||
}
|
||||
|
||||
impl Default for ErrorCounter {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
errors: 1,
|
||||
last_try: now_msec(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl ErrorCounter {
|
||||
fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
fn decode(data: sled::IVec) -> Self {
|
||||
Self {
|
||||
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
|
||||
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
|
||||
}
|
||||
}
|
||||
fn encode(&self) -> Vec<u8> {
|
||||
[
|
||||
u64::to_be_bytes(self.errors),
|
||||
u64::to_be_bytes(self.last_try),
|
||||
]
|
||||
.concat()
|
||||
}
|
||||
|
||||
fn add1(self) -> Self {
|
||||
Self {
|
||||
errors: self.errors + 1,
|
||||
last_try: now_msec(),
|
||||
}
|
||||
}
|
||||
|
||||
fn delay_msec(&self) -> u64 {
|
||||
(RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10)
|
||||
}
|
||||
fn next_try(&self) -> u64 {
|
||||
self.last_try + self.delay_msec()
|
||||
}
|
||||
}
|
||||
|
||||
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
|
||||
let mut result = Vec::<u8>::new();
|
||||
let mut encoder = Encoder::new(&mut result, level)?;
|
||||
|
|
|
@ -5,6 +5,7 @@ use garage_util::sled_counter::SledCountedTree;
|
|||
/// TableMetrics reference all counter used for metrics
|
||||
pub struct BlockManagerMetrics {
|
||||
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
||||
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
|
||||
|
||||
pub(crate) resync_counter: BoundCounter<u64>,
|
||||
pub(crate) resync_error_counter: BoundCounter<u64>,
|
||||
|
@ -22,7 +23,7 @@ pub struct BlockManagerMetrics {
|
|||
}
|
||||
|
||||
impl BlockManagerMetrics {
|
||||
pub fn new(resync_queue: SledCountedTree) -> Self {
|
||||
pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
|
||||
let meter = global::meter("garage_model/block");
|
||||
Self {
|
||||
_resync_queue_len: meter
|
||||
|
@ -33,6 +34,12 @@ impl BlockManagerMetrics {
|
|||
"Number of block hashes queued for local check and possible resync",
|
||||
)
|
||||
.init(),
|
||||
_resync_errored_blocks: meter
|
||||
.u64_value_observer("block.resync_errored_blocks", move |observer| {
|
||||
observer.observe(resync_errors.len() as u64, &[])
|
||||
})
|
||||
.with_description("Number of block hashes whose last resync resulted in an error")
|
||||
.init(),
|
||||
|
||||
resync_counter: meter
|
||||
.u64_counter("block.resync_counter")
|
||||
|
|
|
@ -52,6 +52,14 @@ impl SledCountedTree {
|
|||
res
|
||||
}
|
||||
|
||||
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
|
||||
let res = self.0.tree.remove(key);
|
||||
if matches!(res, Ok(Some(_))) {
|
||||
self.0.len.fetch_sub(1, Ordering::Relaxed);
|
||||
}
|
||||
res
|
||||
}
|
||||
|
||||
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
|
||||
let res = self.0.tree.pop_min();
|
||||
if let Ok(Some(_)) = &res {
|
||||
|
|
Loading…
Reference in a new issue