lifecycle worker: use queue_insert and process objects in batches
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Alex 2023-08-31 11:19:26 +02:00
parent 1cfcc61de8
commit adbf5925de
1 changed files with 45 additions and 36 deletions

View File

@ -152,41 +152,44 @@ impl Worker for LifecycleWorker {
pos,
last_bucket,
} => {
let (object_bytes, next_pos) = match self
.garage
.object_table
.data
.store
.get_gt(&pos)?
{
None => {
info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
self.persister
.set_with(|x| x.last_completed = Some(date.to_string()))?;
self.state = State::Completed(*date);
return Ok(WorkerState::Idle);
// Process a batch of 100 items before yielding to bg task scheduler
for _ in 0..100 {
let (object_bytes, next_pos) = match self
.garage
.object_table
.data
.store
.get_gt(&pos)?
{
None => {
info!("Lifecycle worker finished for {}, objects expired: {}, mpu aborted: {}", date, *objects_expired, *mpu_aborted);
self.persister
.set_with(|x| x.last_completed = Some(date.to_string()))?;
self.state = State::Completed(*date);
return Ok(WorkerState::Idle);
}
Some((k, v)) => (v, k),
};
let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
let skip = process_object(
&self.garage,
*date,
&object,
objects_expired,
mpu_aborted,
last_bucket,
)
.await?;
*counter += 1;
if skip == Skip::SkipBucket {
let bucket_id_len = object.bucket_id.as_slice().len();
assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
} else {
*pos = next_pos;
}
Some((k, v)) => (v, k),
};
let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
let skip = process_object(
&self.garage,
*date,
&object,
objects_expired,
mpu_aborted,
last_bucket,
)
.await?;
*counter += 1;
if skip == Skip::SkipBucket {
let bucket_id_len = object.bucket_id.as_slice().len();
assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
} else {
*pos = next_pos;
}
Ok(WorkerState::Busy)
@ -260,6 +263,8 @@ async fn process_object(
return Ok(Skip::SkipBucket);
}
let db = garage.object_table.data.store.db();
for rule in lifecycle_policy.iter() {
if !rule.enabled {
continue;
@ -310,7 +315,9 @@ async fn process_object(
"Lifecycle: expiring 1 object in bucket {:?}",
object.bucket_id
);
garage.object_table.insert(&deleted_object).await?;
db.transaction(|mut tx| {
garage.object_table.queue_insert(&mut tx, &deleted_object)
})?;
*objects_expired += 1;
}
}
@ -343,7 +350,9 @@ async fn process_object(
);
let aborted_object =
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
garage.object_table.insert(&aborted_object).await?;
db.transaction(|mut tx| {
garage.object_table.queue_insert(&mut tx, &aborted_object)
})?;
*mpu_aborted += n_aborted;
}
}