Implement exponential backoff for resync retries #252

Merged
lx merged 1 commit from bug/resync-exponential-backoff into feature/opentelemetry 2022-03-14 10:47:40 +00:00
3 changed files with 106 additions and 5 deletions
Showing only changes of commit fe62d01b7e - Show all commits

View file

@ -39,7 +39,7 @@ use crate::garage::Garage;
pub const INLINE_THRESHOLD: usize = 3072;
pub const BACKGROUND_WORKERS: u64 = 1;
pub const BACKGROUND_TRANQUILITY: u32 = 3;
pub const BACKGROUND_TRANQUILITY: u32 = 2;

Just curiosity, but is there a reason why you decreased it?

Just curiosity, but is there a reason why you decreased it?
// Timeout for RPCs that read and write blocks to remote nodes
const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);
// The delay between the time where a resync operation fails
// and the time when it is retried.
// and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
@ -158,6 +159,7 @@ pub struct BlockManager {
resync_queue: SledCountedTree,
resync_notify: Notify,
resync_errors: SledCountedTree,
system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
@ -187,13 +189,18 @@ impl BlockManager {
.expect("Unable to open block_local_resync_queue tree");
let resync_queue = SledCountedTree::new(resync_queue);
let resync_errors = db
.open_tree("block_local_resync_errors")
.expect("Unable to open block_local_resync_errors tree");
let resync_errors = SledCountedTree::new(resync_errors);
let endpoint = system
.netapp
.endpoint("garage_model/block.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked();
let metrics = BlockManagerMetrics::new(resync_queue.clone());
let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self {
replication,
@ -202,6 +209,7 @@ impl BlockManager {
rc,
resync_queue,
resync_notify: Notify::new(),
resync_errors,
system,
endpoint,
garage: ArcSwapOption::from(None),
@ -519,6 +527,10 @@ impl BlockManager {
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
trace!("Put resync_queue: {} {:?}", when, hash);
let mut key = u64::to_be_bytes(when).to_vec();
key.extend(hash.as_ref());
@ -560,6 +572,17 @@ impl BlockManager {
if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap();
if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
let ec = ErrorCounter::decode(ec);
if now < ec.next_try() {
// if next retry after an error is not yet,

I know understand that your correctness PR depends on this one.

I know understand that your correctness PR depends on this one.
// don't do resync and return early, but still
// make sure the item is still in queue at expected time
self.put_to_resync_at(&hash, ec.next_try())?;
return Ok(false);
}
}
let tracer = opentelemetry::global::tracer("garage");
let trace_id = gen_uuid();
let span = tracer
@ -584,8 +607,19 @@ impl BlockManager {
if let Err(e) = &res {
self.metrics.resync_error_counter.add(1);
warn!("Error when resyncing {:?}: {}", hash, e);
self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
let err_counter = match self.resync_errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(ec).add1(),
None => ErrorCounter::new(),
};
self.put_to_resync_at(&hash, err_counter.next_try())?;
self.resync_errors
.insert(hash.as_slice(), err_counter.encode())?;
} else {
self.resync_errors.remove(hash.as_slice())?;
}
Ok(true)
} else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
@ -994,6 +1028,58 @@ impl RcEntry {
}
}
/// Counts the number of errors when resyncing a block,
/// and the time of the last try.
/// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)]
struct ErrorCounter {
errors: u64,
last_try: u64,
}
impl Default for ErrorCounter {
fn default() -> Self {
Self {
errors: 1,
last_try: now_msec(),
}
}
}
impl ErrorCounter {
fn new() -> Self {
Self::default()
}
fn decode(data: sled::IVec) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
}
}
fn encode(&self) -> Vec<u8> {
[
u64::to_be_bytes(self.errors),
u64::to_be_bytes(self.last_try),
]
.concat()
}
fn add1(self) -> Self {
Self {
errors: self.errors + 1,
last_try: now_msec(),
}
}
fn delay_msec(&self) -> u64 {
(RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10)
}
fn next_try(&self) -> u64 {
self.last_try + self.delay_msec()
}
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;

View file

@ -5,6 +5,7 @@ use garage_util::sled_counter::SledCountedTree;
/// TableMetrics reference all counter used for metrics
pub struct BlockManagerMetrics {
pub(crate) _resync_queue_len: ValueObserver<u64>,
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
pub(crate) resync_counter: BoundCounter<u64>,
pub(crate) resync_error_counter: BoundCounter<u64>,
@ -22,7 +23,7 @@ pub struct BlockManagerMetrics {
}
impl BlockManagerMetrics {
pub fn new(resync_queue: SledCountedTree) -> Self {
pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
let meter = global::meter("garage_model/block");
Self {
_resync_queue_len: meter
@ -33,6 +34,12 @@ impl BlockManagerMetrics {
"Number of block hashes queued for local check and possible resync",
)
.init(),
_resync_errored_blocks: meter
.u64_value_observer("block.resync_errored_blocks", move |observer| {
observer.observe(resync_errors.len() as u64, &[])
})
.with_description("Number of block hashes whose last resync resulted in an error")
.init(),
resync_counter: meter
.u64_counter("block.resync_counter")

View file

@ -52,6 +52,14 @@ impl SledCountedTree {
res
}
pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
let res = self.0.tree.remove(key);
if matches!(res, Ok(Some(_))) {
self.0.len.fetch_sub(1, Ordering::Relaxed);
}
res
}
pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
let res = self.0.tree.pop_min();
if let Ok(Some(_)) = &res {