Fix resync queue to not drop items

This commit is contained in:
Alex 2022-03-01 14:55:37 +01:00
parent f7e6f4616f
commit d78bf379fb
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE

View file

@ -562,9 +562,12 @@ impl BlockManager {
} }
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> { async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.pop_min()? { if let Some(first_pair_res) = self.resync_queue.iter().next() {
let (time_bytes, hash_bytes) = first_pair_res?;
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap()); let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec(); let now = now_msec();
if now >= time_msec { if now >= time_msec {
let hash = Hash::try_from(&hash_bytes[..]).unwrap(); let hash = Hash::try_from(&hash_bytes[..]).unwrap();
@ -575,6 +578,9 @@ impl BlockManager {
// don't do resync and return early, but still // don't do resync and return early, but still
// make sure the item is still in queue at expected time // make sure the item is still in queue at expected time
self.put_to_resync_at(&hash, ec.next_try())?; self.put_to_resync_at(&hash, ec.next_try())?;
// ec.next_try() > now >= time_msec, so this remove
// is not removing the one we added just above
self.resync_queue.remove(time_bytes)?;
return Ok(false); return Ok(false);
} }
} }
@ -605,20 +611,25 @@ impl BlockManager {
warn!("Error when resyncing {:?}: {}", hash, e); warn!("Error when resyncing {:?}: {}", hash, e);
let err_counter = match self.resync_errors.get(hash.as_slice())? { let err_counter = match self.resync_errors.get(hash.as_slice())? {
Some(ec) => ErrorCounter::decode(ec).add1(), Some(ec) => ErrorCounter::decode(ec).add1(now + 1),
None => ErrorCounter::new(), None => ErrorCounter::new(now + 1),
}; };
self.put_to_resync_at(&hash, err_counter.next_try())?;
self.resync_errors self.resync_errors
.insert(hash.as_slice(), err_counter.encode())?; .insert(hash.as_slice(), err_counter.encode())?;
self.put_to_resync_at(&hash, err_counter.next_try())?;
// err_counter.next_try() >= now + 1 > now,
// the entry we remove from the queue is not
// the entry we inserted with put_to_resync_at
self.resync_queue.remove(time_bytes)?;
} else { } else {
self.resync_errors.remove(hash.as_slice())?; self.resync_errors.remove(hash.as_slice())?;
self.resync_queue.remove(time_bytes)?;
} }
Ok(true) Ok(true)
} else { } else {
self.resync_queue.insert(time_bytes, hash_bytes)?;
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! { select! {
_ = delay.fuse() => {}, _ = delay.fuse() => {},
@ -1044,19 +1055,13 @@ struct ErrorCounter {
last_try: u64, last_try: u64,
} }
impl Default for ErrorCounter { impl ErrorCounter {
fn default() -> Self { fn new(now: u64) -> Self {
Self { Self {
errors: 1, errors: 1,
last_try: now_msec(), last_try: now,
} }
} }
}
impl ErrorCounter {
fn new() -> Self {
Self::default()
}
fn decode(data: sled::IVec) -> Self { fn decode(data: sled::IVec) -> Self {
Self { Self {
@ -1072,10 +1077,10 @@ impl ErrorCounter {
.concat() .concat()
} }
fn add1(self) -> Self { fn add1(self, now: u64) -> Self {
Self { Self {
errors: self.errors + 1, errors: self.errors + 1,
last_try: now_msec(), last_try: now,
} }
} }