Ensure increasing version timestamps when writing new object versions #543
3 changed files with 21 additions and 26 deletions
|
@ -3,12 +3,12 @@ use std::sync::Arc;
|
|||
use hyper::{Body, Request, Response, StatusCode};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::time::*;
|
||||
|
||||
use garage_model::garage::Garage;
|
||||
use garage_model::s3::object_table::*;
|
||||
|
||||
use crate::s3::error::*;
|
||||
use crate::s3::put::next_timestamp;
|
||||
use crate::s3::xml as s3_xml;
|
||||
use crate::signature::verify_signed_content;
|
||||
|
||||
|
@ -23,40 +23,36 @@ async fn handle_delete_internal(
|
|||
.await?
|
||||
.ok_or(Error::NoSuchKey)?; // No need to delete
|
||||
|
||||
let interesting_versions = object.versions().iter().filter(|v| {
|
||||
!matches!(
|
||||
v.state,
|
||||
ObjectVersionState::Aborted
|
||||
| ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
||||
)
|
||||
});
|
||||
let del_timestamp = next_timestamp(Some(&object));
|
||||
let del_uuid = gen_uuid();
|
||||
|
||||
let mut version_to_delete = None;
|
||||
let mut timestamp = now_msec();
|
||||
for v in interesting_versions {
|
||||
if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
|
||||
version_to_delete = Some(v.uuid);
|
||||
let deleted_version = object
|
||||
.versions()
|
||||
.iter()
|
||||
.rev()
|
||||
.find(|v| !matches!(&v.state, ObjectVersionState::Aborted))
|
||||
.or_else(|| object.versions().iter().rev().next());
|
||||
let deleted_version = match deleted_version {
|
||||
Some(dv) => dv.uuid,
|
||||
None => {
|
||||
warn!("Object has no versions: {:?}", object);
|
||||
Uuid::from([0u8; 32])
|
||||
}
|
||||
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
|
||||
}
|
||||
|
||||
let deleted_version = version_to_delete.ok_or(Error::NoSuchKey)?;
|
||||
|
||||
let version_uuid = gen_uuid();
|
||||
};
|
||||
|
||||
let object = Object::new(
|
||||
bucket_id,
|
||||
key.into(),
|
||||
vec![ObjectVersion {
|
||||
uuid: version_uuid,
|
||||
timestamp,
|
||||
uuid: del_uuid,
|
||||
timestamp: del_timestamp,
|
||||
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
|
||||
}],
|
||||
);
|
||||
|
||||
garage.object_table.insert(&object).await?;
|
||||
|
||||
Ok((deleted_version, version_uuid))
|
||||
Ok((deleted_version, del_uuid))
|
||||
}
|
||||
|
||||
pub async fn handle_delete(
|
||||
|
|
|
@ -9,7 +9,6 @@ use md5::{Digest as Md5Digest, Md5};
|
|||
use garage_table::*;
|
||||
use garage_util::async_hash::*;
|
||||
use garage_util::data::*;
|
||||
use garage_util::time::*;
|
||||
|
||||
use garage_model::bucket_table::Bucket;
|
||||
use garage_model::garage::Garage;
|
||||
|
@ -35,7 +34,7 @@ pub async fn handle_create_multipart_upload(
|
|||
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
|
||||
|
||||
let upload_id = gen_uuid();
|
||||
let timestamp = next_timestamp(&existing_object);
|
||||
let timestamp = next_timestamp(existing_object.as_ref());
|
||||
|
||||
let headers = get_headers(req.headers())?;
|
||||
|
||||
|
|
|
@ -86,7 +86,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
|
||||
// Generate identity of new version
|
||||
let version_uuid = gen_uuid();
|
||||
let version_timestamp = next_timestamp(&existing_object);
|
||||
let version_timestamp = next_timestamp(existing_object.as_ref());
|
||||
|
||||
// If body is small enough, store it directly in the object table
|
||||
// as "inline data". We can then return immediately.
|
||||
|
@ -529,7 +529,7 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
|
|||
})
|
||||
}
|
||||
|
||||
pub(crate) fn next_timestamp(existing_object: &Option<Object>) -> u64 {
|
||||
pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
|
||||
existing_object
|
||||
.as_ref()
|
||||
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
||||
|
|
Loading…
Reference in a new issue