lifecycle worker: implement main functionality
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
Alex 2023-08-30 14:28:48 +02:00
parent a2e0e34db5
commit 2996dc875f
3 changed files with 101 additions and 9 deletions

View file

@ -239,8 +239,8 @@ impl Filter {
fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter { fn internal_into_garage_lifecycle_filter(self) -> GarageLifecycleFilter {
GarageLifecycleFilter { GarageLifecycleFilter {
prefix: self.prefix.map(|x| x.0), prefix: self.prefix.map(|x| x.0),
size_gt: self.size_gt.map(|x| x.0 as usize), size_gt: self.size_gt.map(|x| x.0 as u64),
size_lt: self.size_lt.map(|x| x.0 as usize), size_lt: self.size_lt.map(|x| x.0 as u64),
} }
} }

View file

@ -95,9 +95,9 @@ mod v08 {
/// If Some(x), object key has to start with prefix x /// If Some(x), object key has to start with prefix x
pub prefix: Option<String>, pub prefix: Option<String>,
/// If Some(x), object size has to be more than x /// If Some(x), object size has to be more than x
pub size_gt: Option<usize>, pub size_gt: Option<u64>,
/// If Some(x), object size has to be less than x /// If Some(x), object size has to be less than x
pub size_lt: Option<usize>, pub size_lt: Option<u64>,
} }
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]

View file

@ -6,6 +6,7 @@ use std::time::{Duration, Instant};
use tokio::sync::watch; use tokio::sync::watch;
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::{Error, OkOrMessage}; use garage_util::error::{Error, OkOrMessage};
use garage_util::persister::PersisterShared; use garage_util::persister::PersisterShared;
use garage_util::time::*; use garage_util::time::*;
@ -165,6 +166,7 @@ impl Worker for LifecycleWorker {
let object = self.garage.object_table.data.decode_entry(&object_bytes)?; let object = self.garage.object_table.data.decode_entry(&object_bytes)?;
process_object( process_object(
&self.garage, &self.garage,
*date,
object, object,
objects_expired, objects_expired,
mpu_aborted, mpu_aborted,
@ -184,7 +186,7 @@ impl Worker for LifecycleWorker {
match &self.state { match &self.state {
State::Completed(d) => { State::Completed(d) => {
let now = now_msec(); let now = now_msec();
let next_start = midnight_ts(d.succ()); let next_start = midnight_ts(d.succ_opt().expect("no next day"));
if now < next_start { if now < next_start {
tokio::time::sleep_until( tokio::time::sleep_until(
(Instant::now() + Duration::from_millis(next_start - now)).into(), (Instant::now() + Duration::from_millis(next_start - now)).into(),
@ -208,6 +210,7 @@ impl Worker for LifecycleWorker {
async fn process_object( async fn process_object(
garage: &Arc<Garage>, garage: &Arc<Garage>,
now_date: NaiveDate,
object: Object, object: Object,
objects_expired: &mut usize, objects_expired: &mut usize,
mpu_aborted: &mut usize, mpu_aborted: &mut usize,
@ -229,24 +232,113 @@ async fn process_object(
.unwrap_or_default(); .unwrap_or_default();
for rule in lifecycle_policy.iter() { for rule in lifecycle_policy.iter() {
todo!() if let Some(pfx) = &rule.filter.prefix {
if !object.key.starts_with(pfx) {
continue;
}
}
if let Some(expire) = &rule.expiration {
if let Some(current_version) = object.versions().iter().rev().find(|v| v.is_data()) {
let version_date = next_date(current_version.timestamp);
let current_version_data = match &current_version.state {
ObjectVersionState::Complete(c) => c,
_ => unreachable!(),
};
let size_match = check_size_filter(current_version_data, &rule.filter);
let date_match = match expire {
LifecycleExpiration::AfterDays(n_days) => {
(now_date - version_date) >= chrono::Duration::days(*n_days as i64)
}
LifecycleExpiration::AtDate(exp_date) => now_date >= *exp_date,
};
if size_match && date_match {
// Delete expired version
let deleted_object = Object::new(
object.bucket_id,
object.key.clone(),
vec![ObjectVersion {
uuid: gen_uuid(),
timestamp: std::cmp::max(now_msec(), current_version.timestamp + 1),
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
garage.object_table.insert(&deleted_object).await?;
*objects_expired += 1;
}
}
}
if let Some(abort_mpu_days) = &rule.abort_incomplete_mpu_days {
let aborted_versions = object
.versions()
.iter()
.filter_map(|v| {
let version_date = next_date(v.timestamp);
match &v.state {
ObjectVersionState::Uploading { .. }
if (now_date - version_date)
>= chrono::Duration::days(*abort_mpu_days as i64) =>
{
Some(ObjectVersion {
state: ObjectVersionState::Aborted,
..*v
})
}
_ => None,
}
})
.collect::<Vec<_>>();
if !aborted_versions.is_empty() {
// Insert aborted mpu info
let n_aborted = aborted_versions.len();
let aborted_object =
Object::new(object.bucket_id, object.key.clone(), aborted_versions);
garage.object_table.insert(&aborted_object).await?;
*mpu_aborted += n_aborted;
}
}
} }
*last_bucket = Some(bucket); *last_bucket = Some(bucket);
Ok(()) Ok(())
} }
fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter) -> bool {
let size = match version_data {
ObjectVersionData::Inline(meta, _) | ObjectVersionData::FirstBlock(meta, _) => meta.size,
_ => unreachable!(),
};
if let Some(size_gt) = filter.size_gt {
if !(size > size_gt) {
return false;
}
}
if let Some(size_lt) = filter.size_lt {
if !(size < size_lt) {
return false;
}
}
return true;
}
fn midnight_ts(date: NaiveDate) -> u64 { fn midnight_ts(date: NaiveDate) -> u64 {
date.and_hms(0, 0, 0).timestamp_millis() as u64 date.and_hms_opt(0, 0, 0)
.expect("midnight does not exist")
.timestamp_millis() as u64
} }
fn next_date(ts: u64) -> NaiveDate { fn next_date(ts: u64) -> NaiveDate {
NaiveDateTime::from_timestamp_millis(ts as i64) NaiveDateTime::from_timestamp_millis(ts as i64)
.expect("bad timestamp") .expect("bad timestamp")
.date() .date()
.succ() .succ_opt()
.expect("no next day")
} }
fn today() -> NaiveDate { fn today() -> NaiveDate {
Utc::today().naive_utc() Utc::now().naive_utc().date()
} }