online repair workers: retry on error

This commit is contained in:
Alex 2022-12-14 16:31:31 +01:00
parent 13c8662126
commit 1fcd0b371b
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE

View file

@ -92,19 +92,14 @@ impl Worker for RepairVersionsWorker {
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? { let (item_bytes, next_pos) = match self.garage.version_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => { Some((k, v)) => (v, k),
self.pos = k;
v
}
None => { None => {
info!("repair_versions: finished, done {}", self.counter); info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerState::Done); return Ok(WorkerState::Done);
} }
}; };
self.counter += 1;
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?; let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if !version.deleted.get() { if !version.deleted.get() {
let object = self let object = self
@ -133,6 +128,9 @@ impl Worker for RepairVersionsWorker {
} }
} }
self.counter += 1;
self.pos = next_pos;
Ok(WorkerState::Busy) Ok(WorkerState::Busy)
} }
@ -173,19 +171,14 @@ impl Worker for RepairBlockrefsWorker {
} }
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> { async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? { let (item_bytes, next_pos) = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => { Some((k, v)) => (v, k),
self.pos = k;
v
}
None => { None => {
info!("repair_block_ref: finished, done {}", self.counter); info!("repair_block_ref: finished, done {}", self.counter);
return Ok(WorkerState::Done); return Ok(WorkerState::Done);
} }
}; };
self.counter += 1;
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if !block_ref.deleted.get() { if !block_ref.deleted.get() {
let version = self let version = self
@ -211,6 +204,9 @@ impl Worker for RepairBlockrefsWorker {
} }
} }
self.counter += 1;
self.pos = next_pos;
Ok(WorkerState::Busy) Ok(WorkerState::Busy)
} }