object lifecycles (fix #309) #620

Merged
lx merged 25 commits from bucket-lifecycle into next 2023-09-04 09:45:11 +00:00
Showing only changes of commit da8b224e24 - Show all commits

View file

@ -54,6 +54,12 @@ enum State {
}, },
} }
#[derive(Clone, Copy, Eq, PartialEq)]
enum Skip {
SkipBucket,
NextObject,
}
pub fn register_bg_vars( pub fn register_bg_vars(
persister: &PersisterShared<LifecycleWorkerPersisted>, persister: &PersisterShared<LifecycleWorkerPersisted>,
vars: &mut vars::BgVars, vars: &mut vars::BgVars,
@ -164,10 +170,10 @@ impl Worker for LifecycleWorker {
}; };
let object = self.garage.object_table.data.decode_entry(&object_bytes)?; let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
process_object( let skip = process_object(
&self.garage, &self.garage,
*date, *date,
object, &object,
objects_expired, objects_expired,
mpu_aborted, mpu_aborted,
last_bucket, last_bucket,
@ -175,7 +181,13 @@ impl Worker for LifecycleWorker {
.await?; .await?;
*counter += 1; *counter += 1;
if skip == Skip::SkipBucket {
let bucket_id_len = object.bucket_id.as_slice().len();
assert_eq!(pos.get(..bucket_id_len), Some(object.bucket_id.as_slice()));
*pos = [&pos[..bucket_id_len], &[0xFFu8][..]].concat();
} else {
*pos = next_pos; *pos = next_pos;
}
Ok(WorkerState::Busy) Ok(WorkerState::Busy)
} }
@ -211,11 +223,11 @@ impl Worker for LifecycleWorker {
async fn process_object( async fn process_object(
garage: &Arc<Garage>, garage: &Arc<Garage>,
now_date: NaiveDate, now_date: NaiveDate,
object: Object, object: &Object,
objects_expired: &mut usize, objects_expired: &mut usize,
mpu_aborted: &mut usize, mpu_aborted: &mut usize,
last_bucket: &mut Option<Bucket>, last_bucket: &mut Option<Bucket>,
) -> Result<(), Error> { ) -> Result<Skip, Error> {
let bucket = match last_bucket.take() { let bucket = match last_bucket.take() {
Some(b) if b.id == object.bucket_id => b, Some(b) if b.id == object.bucket_id => b,
_ => garage _ => garage
@ -231,6 +243,10 @@ async fn process_object(
.and_then(|s| s.lifecycle_config.get().as_deref()) .and_then(|s| s.lifecycle_config.get().as_deref())
.unwrap_or_default(); .unwrap_or_default();
if lifecycle_policy.is_empty() {
return Ok(Skip::SkipBucket);
}
for rule in lifecycle_policy.iter() { for rule in lifecycle_policy.iter() {
if let Some(pfx) = &rule.filter.prefix { if let Some(pfx) = &rule.filter.prefix {
if !object.key.starts_with(pfx) { if !object.key.starts_with(pfx) {
@ -304,7 +320,7 @@ async fn process_object(
} }
*last_bucket = Some(bucket); *last_bucket = Some(bucket);
Ok(()) Ok(Skip::NextObject)
} }
fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool { fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {