Implement exponential backoff for resync retries #252
3 changed files with 106 additions and 5 deletions
@@ -39,7 +39,7 @@ use crate::garage::Garage;
 pub const INLINE_THRESHOLD: usize = 3072;

 pub const BACKGROUND_WORKERS: u64 = 1;
-pub const BACKGROUND_TRANQUILITY: u32 = 3;
+pub const BACKGROUND_TRANQUILITY: u32 = 2;

 // Timeout for RPCs that read and write blocks to remote nodes
 const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
@@ -48,7 +48,8 @@ const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(30);
 const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(5);

 // The delay between the time where a resync operation fails
-// and the time when it is retried.
+// and the time when it is retried, with exponential backoff
+// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
 const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);

 // The delay between the moment when the reference counter
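For reference, this is the retry schedule produced by ErrorCounter::delay_msec, which is added further down in this diff. The standalone sketch below is not part of the PR; the helper name retry_delay_msec and the main function exist only to print the schedule:

use std::time::Duration;

const RESYNC_RETRY_DELAY: Duration = Duration::from_secs(60);

// Mirrors ErrorCounter::delay_msec from this PR: the base delay is doubled
// for every consecutive failure, with the multiplier capped at 2^10.
fn retry_delay_msec(errors: u64) -> u64 {
    (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(errors - 1, 10)
}

fn main() {
    for errors in 1..=12 {
        // 1 failure -> 1 min, 2 -> 2 min, 3 -> 4 min, ..., 11 and more -> 1024 min (~17 h)
        println!("{} consecutive failures: wait {} min", errors, retry_delay_msec(errors) / 60_000);
    }
}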
@@ -158,6 +159,7 @@ pub struct BlockManager {

     resync_queue: SledCountedTree,
     resync_notify: Notify,
+    resync_errors: SledCountedTree,

     system: Arc<System>,
     endpoint: Arc<Endpoint<BlockRpc, Self>>,
@@ -187,13 +189,18 @@ impl BlockManager {
         .expect("Unable to open block_local_resync_queue tree");
     let resync_queue = SledCountedTree::new(resync_queue);

+    let resync_errors = db
+        .open_tree("block_local_resync_errors")
+        .expect("Unable to open block_local_resync_errors tree");
+    let resync_errors = SledCountedTree::new(resync_errors);
+
     let endpoint = system
         .netapp
         .endpoint("garage_model/block.rs/Rpc".to_string());

     let manager_locked = BlockManagerLocked();

-    let metrics = BlockManagerMetrics::new(resync_queue.clone());
+    let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());

     let block_manager = Arc::new(Self {
         replication,
@@ -202,6 +209,7 @@ impl BlockManager {
         rc,
         resync_queue,
         resync_notify: Notify::new(),
+        resync_errors,
         system,
         endpoint,
         garage: ArcSwapOption::from(None),
@@ -519,6 +527,10 @@ impl BlockManager {
     fn put_to_resync(&self, hash: &Hash, delay: Duration) -> Result<(), Error> {
         let when = now_msec() + delay.as_millis() as u64;
+        self.put_to_resync_at(hash, when)
+    }
+
+    fn put_to_resync_at(&self, hash: &Hash, when: u64) -> Result<(), Error> {
         trace!("Put resync_queue: {} {:?}", when, hash);
         let mut key = u64::to_be_bytes(when).to_vec();
         key.extend(hash.as_ref());
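A note on how put_to_resync_at addresses the queue (my reading of the code above, not spelled out in the PR): the sled key is the 8-byte big-endian retry timestamp followed by the block hash, so lexicographic key order is also chronological order, and rescheduling an entry simply means inserting it again under a later timestamp. A minimal sketch of that layout (the function name is made up for illustration):

// Key layout used by the resync queue: 8 bytes of big-endian timestamp,
// then the block hash bytes. Big-endian is what makes the ordering work.
fn resync_queue_key(when_msec: u64, hash: &[u8]) -> Vec<u8> {
    let mut key = u64::to_be_bytes(when_msec).to_vec();
    key.extend_from_slice(hash);
    key
}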
@@ -560,6 +572,17 @@ impl BlockManager {
         if now >= time_msec {
             let hash = Hash::try_from(&hash_bytes[..]).unwrap();

+            if let Some(ec) = self.resync_errors.get(hash.as_slice())? {
+                let ec = ErrorCounter::decode(ec);
+                if now < ec.next_try() {
+                    // if next retry after an error is not yet,
+                    // don't do resync and return early, but still
+                    // make sure the item is still in queue at expected time
+                    self.put_to_resync_at(&hash, ec.next_try())?;
+                    return Ok(false);
+                }
+            }
+
             let tracer = opentelemetry::global::tracer("garage");
             let trace_id = gen_uuid();
             let span = tracer

quentin commented: I now understand that your correctness PR depends on this one.
@@ -584,8 +607,19 @@ impl BlockManager {
             if let Err(e) = &res {
                 self.metrics.resync_error_counter.add(1);
                 warn!("Error when resyncing {:?}: {}", hash, e);
-                self.put_to_resync(&hash, RESYNC_RETRY_DELAY)?;
+                let err_counter = match self.resync_errors.get(hash.as_slice())? {
+                    Some(ec) => ErrorCounter::decode(ec).add1(),
+                    None => ErrorCounter::new(),
+                };
+
+                self.put_to_resync_at(&hash, err_counter.next_try())?;
+                self.resync_errors
+                    .insert(hash.as_slice(), err_counter.encode())?;
+            } else {
+                self.resync_errors.remove(hash.as_slice())?;
             }
+
             Ok(true)
         } else {
             self.resync_queue.insert(time_bytes, hash_bytes)?;
@@ -994,6 +1028,58 @@ impl RcEntry {
     }
 }

+/// Counts the number of errors when resyncing a block,
+/// and the time of the last try.
+/// Used to implement exponential backoff.
+#[derive(Clone, Copy, Debug)]
+struct ErrorCounter {
+    errors: u64,
+    last_try: u64,
+}
+
+impl Default for ErrorCounter {
+    fn default() -> Self {
+        Self {
+            errors: 1,
+            last_try: now_msec(),
+        }
+    }
+}
+
+impl ErrorCounter {
+    fn new() -> Self {
+        Self::default()
+    }
+
+    fn decode(data: sled::IVec) -> Self {
+        Self {
+            errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
+            last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
+        }
+    }
+    fn encode(&self) -> Vec<u8> {
+        [
+            u64::to_be_bytes(self.errors),
+            u64::to_be_bytes(self.last_try),
+        ]
+        .concat()
+    }
+
+    fn add1(self) -> Self {
+        Self {
+            errors: self.errors + 1,
+            last_try: now_msec(),
+        }
+    }
+
+    fn delay_msec(&self) -> u64 {
+        (RESYNC_RETRY_DELAY.as_millis() as u64) << std::cmp::min(self.errors - 1, 10)
+    }
+    fn next_try(&self) -> u64 {
+        self.last_try + self.delay_msec()
+    }
+}
+
 fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
     let mut result = Vec::<u8>::new();
     let mut encoder = Encoder::new(&mut result, level)?;
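To make the backoff concrete, here is a small illustrative test-style sketch of ErrorCounter (not taken from the repository; it only uses the items added in the hunk above and would have to live in the same module, since the type and its fields are private):

#[test]
fn error_counter_backoff_sketch() {
    // A fresh counter records the first failure.
    let first = ErrorCounter::new();
    assert_eq!(first.errors, 1);

    // encode()/decode() round-trip through the 16-byte sled value.
    let restored = ErrorCounter::decode(sled::IVec::from(first.encode()));
    assert_eq!(restored.errors, first.errors);
    assert_eq!(restored.last_try, first.last_try);

    // Each consecutive failure doubles the wait before next_try():
    // 60 s after the 1st failure, 2 min after the 2nd, 4 min after the 3rd,
    // and so on, capped at 60 s * 2^10 (about 17 hours) from the 11th failure on.
    let third = first.add1().add1();
    assert_eq!(third.errors, 3);
    assert_eq!(third.next_try(), third.last_try + 4 * 60 * 1000);
}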
@@ -5,6 +5,7 @@ use garage_util::sled_counter::SledCountedTree;
 /// TableMetrics reference all counter used for metrics
 pub struct BlockManagerMetrics {
     pub(crate) _resync_queue_len: ValueObserver<u64>,
+    pub(crate) _resync_errored_blocks: ValueObserver<u64>,

     pub(crate) resync_counter: BoundCounter<u64>,
     pub(crate) resync_error_counter: BoundCounter<u64>,
@@ -22,7 +23,7 @@ pub struct BlockManagerMetrics {
 }

 impl BlockManagerMetrics {
-    pub fn new(resync_queue: SledCountedTree) -> Self {
+    pub fn new(resync_queue: SledCountedTree, resync_errors: SledCountedTree) -> Self {
         let meter = global::meter("garage_model/block");
         Self {
             _resync_queue_len: meter
@@ -33,6 +34,12 @@ impl BlockManagerMetrics {
                 "Number of block hashes queued for local check and possible resync",
             )
             .init(),
+        _resync_errored_blocks: meter
+            .u64_value_observer("block.resync_errored_blocks", move |observer| {
+                observer.observe(resync_errors.len() as u64, &[])
+            })
+            .with_description("Number of block hashes whose last resync resulted in an error")
+            .init(),

         resync_counter: meter
             .u64_counter("block.resync_counter")
@@ -52,6 +52,14 @@ impl SledCountedTree {
         res
     }

+    pub fn remove<K: AsRef<[u8]>>(&self, key: K) -> Result<Option<IVec>> {
+        let res = self.0.tree.remove(key);
+        if matches!(res, Ok(Some(_))) {
+            self.0.len.fetch_sub(1, Ordering::Relaxed);
+        }
+        res
+    }
+
     pub fn pop_min(&self) -> Result<Option<(IVec, IVec)>> {
         let res = self.0.tree.pop_min();
         if let Ok(Some(_)) = &res {
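The new remove decrements the cached length only when a key was actually removed, so the count backing the block.resync_errored_blocks gauge cannot drift when the same hash is cleared twice. A minimal usage sketch, assuming a temporary sled database and the new/insert/len methods SledCountedTree already exposes (this is not taken from the repository's tests, and the exact insert signature is assumed from its call sites):

use garage_util::sled_counter::SledCountedTree;

fn main() -> sled::Result<()> {
    // Temporary database, discarded when dropped.
    let db = sled::Config::new().temporary(true).open()?;
    let tree = SledCountedTree::new(db.open_tree("example")?);

    tree.insert(&b"k1"[..], b"v1".to_vec())?;
    assert_eq!(tree.len(), 1);

    tree.remove(&b"k1"[..])?;      // key was present: length drops back to 0
    tree.remove(&b"missing"[..])?; // absent key: length is left untouched
    assert_eq!(tree.len(), 0);

    Ok(())
}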
Review comment (on the BACKGROUND_TRANQUILITY change above): Just curiosity, but is there a reason why you decreased it?