Ensure increasing version timestamps when writing new object versions #543
3 changed files with 58 additions and 42 deletions
|
@ -3,12 +3,12 @@ use std::sync::Arc;
|
||||||
use hyper::{Body, Request, Response, StatusCode};
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::time::*;
|
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::s3::object_table::*;
|
use garage_model::s3::object_table::*;
|
||||||
|
|
||||||
use crate::s3::error::*;
|
use crate::s3::error::*;
|
||||||
|
use crate::s3::put::next_timestamp;
|
||||||
use crate::s3::xml as s3_xml;
|
use crate::s3::xml as s3_xml;
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::verify_signed_content;
|
||||||
|
|
||||||
|
@ -23,40 +23,36 @@ async fn handle_delete_internal(
|
||||||
.await?
|
.await?
|
||||||
.ok_or(Error::NoSuchKey)?; // No need to delete
|
.ok_or(Error::NoSuchKey)?; // No need to delete
|
||||||
|
|
||||||
let interesting_versions = object.versions().iter().filter(|v| {
|
let del_timestamp = next_timestamp(Some(&object));
|
||||||
!matches!(
|
let del_uuid = gen_uuid();
|
||||||
v.state,
|
|
||||||
ObjectVersionState::Aborted
|
|
||||||
| ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
|
|
||||||
)
|
|
||||||
});
|
|
||||||
|
|
||||||
let mut version_to_delete = None;
|
let deleted_version = object
|
||||||
let mut timestamp = now_msec();
|
.versions()
|
||||||
for v in interesting_versions {
|
.iter()
|
||||||
if v.timestamp + 1 > timestamp || version_to_delete.is_none() {
|
.rev()
|
||||||
version_to_delete = Some(v.uuid);
|
.find(|v| !matches!(&v.state, ObjectVersionState::Aborted))
|
||||||
|
.or_else(|| object.versions().iter().rev().next());
|
||||||
|
let deleted_version = match deleted_version {
|
||||||
|
Some(dv) => dv.uuid,
|
||||||
|
None => {
|
||||||
|
warn!("Object has no versions: {:?}", object);
|
||||||
|
Uuid::from([0u8; 32])
|
||||||
}
|
}
|
||||||
timestamp = std::cmp::max(timestamp, v.timestamp + 1);
|
};
|
||||||
}
|
|
||||||
|
|
||||||
let deleted_version = version_to_delete.ok_or(Error::NoSuchKey)?;
|
|
||||||
|
|
||||||
let version_uuid = gen_uuid();
|
|
||||||
|
|
||||||
let object = Object::new(
|
let object = Object::new(
|
||||||
bucket_id,
|
bucket_id,
|
||||||
key.into(),
|
key.into(),
|
||||||
vec![ObjectVersion {
|
vec![ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: del_uuid,
|
||||||
timestamp,
|
timestamp: del_timestamp,
|
||||||
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
|
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
|
||||||
}],
|
}],
|
||||||
);
|
);
|
||||||
|
|
||||||
garage.object_table.insert(&object).await?;
|
garage.object_table.insert(&object).await?;
|
||||||
|
|
||||||
Ok((deleted_version, version_uuid))
|
Ok((deleted_version, del_uuid))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_delete(
|
pub async fn handle_delete(
|
||||||
|
|
|
@ -9,7 +9,6 @@ use md5::{Digest as Md5Digest, Md5};
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::async_hash::*;
|
use garage_util::async_hash::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::time::*;
|
|
||||||
|
|
||||||
use garage_model::bucket_table::Bucket;
|
use garage_model::bucket_table::Bucket;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
@ -30,10 +29,13 @@ pub async fn handle_create_multipart_upload(
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
bucket_name: &str,
|
bucket_name: &str,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &String,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
|
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
|
||||||
|
|
||||||
let upload_id = gen_uuid();
|
let upload_id = gen_uuid();
|
||||||
let timestamp = now_msec();
|
let timestamp = next_timestamp(existing_object.as_ref());
|
||||||
|
|
||||||
let headers = get_headers(req.headers())?;
|
let headers = get_headers(req.headers())?;
|
||||||
|
|
||||||
// Create object in object table
|
// Create object in object table
|
||||||
|
@ -233,7 +235,8 @@ pub async fn handle_complete_multipart_upload(
|
||||||
|
|
||||||
// Get object and multipart upload
|
// Get object and multipart upload
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?;
|
let (object, mut object_version, mpu) =
|
||||||
|
get_upload(&garage, &bucket.id, &key, &upload_id).await?;
|
||||||
|
|
||||||
if mpu.parts.is_empty() {
|
if mpu.parts.is_empty() {
|
||||||
return Err(Error::bad_request("No data was uploaded"));
|
return Err(Error::bad_request("No data was uploaded"));
|
||||||
|
@ -331,7 +334,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
// Calculate total size of final object
|
// Calculate total size of final object
|
||||||
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
||||||
|
|
||||||
if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
|
if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
|
||||||
object_version.state = ObjectVersionState::Aborted;
|
object_version.state = ObjectVersionState::Aborted;
|
||||||
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
|
@ -3,6 +3,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use base64::prelude::*;
|
use base64::prelude::*;
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
use futures::try_join;
|
||||||
use hyper::body::{Body, Bytes};
|
use hyper::body::{Body, Bytes};
|
||||||
use hyper::header::{HeaderMap, HeaderValue};
|
use hyper::header::{HeaderMap, HeaderValue};
|
||||||
use hyper::{Request, Response};
|
use hyper::{Request, Response};
|
||||||
|
@ -35,7 +36,7 @@ pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
bucket: &Bucket,
|
bucket: &Bucket,
|
||||||
key: &str,
|
key: &String,
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
// Retrieve interesting headers from request
|
// Retrieve interesting headers from request
|
||||||
|
@ -68,16 +69,24 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
headers: ObjectVersionHeaders,
|
headers: ObjectVersionHeaders,
|
||||||
body: S,
|
body: S,
|
||||||
bucket: &Bucket,
|
bucket: &Bucket,
|
||||||
key: &str,
|
key: &String,
|
||||||
content_md5: Option<String>,
|
content_md5: Option<String>,
|
||||||
content_sha256: Option<FixedBytes32>,
|
content_sha256: Option<FixedBytes32>,
|
||||||
) -> Result<(Uuid, String), Error> {
|
) -> Result<(Uuid, String), Error> {
|
||||||
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
|
let (first_block_opt, existing_object) = try_join!(
|
||||||
|
chunker.next(),
|
||||||
|
garage
|
||||||
|
.object_table
|
||||||
|
.get(&bucket.id, key)
|
||||||
|
.map_err(Error::from),
|
||||||
|
)?;
|
||||||
|
|
||||||
|
let first_block = first_block_opt.unwrap_or_default();
|
||||||
|
|
||||||
// Generate identity of new version
|
// Generate identity of new version
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let version_timestamp = now_msec();
|
let version_timestamp = next_timestamp(existing_object.as_ref());
|
||||||
|
|
||||||
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
|
||||||
let first_block = chunker.next().await?.unwrap_or_default();
|
|
||||||
|
|
||||||
// If body is small enough, store it directly in the object table
|
// If body is small enough, store it directly in the object table
|
||||||
// as "inline data". We can then return immediately.
|
// as "inline data". We can then return immediately.
|
||||||
|
@ -97,7 +106,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, size).await?;
|
check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
let object_version = ObjectVersion {
|
let object_version = ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: version_uuid,
|
||||||
|
@ -176,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, total_size).await?;
|
check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
// Save final object state, marked as Complete
|
// Save final object state, marked as Complete
|
||||||
let md5sum_hex = hex::encode(data_md5sum);
|
let md5sum_hex = hex::encode(data_md5sum);
|
||||||
|
@ -229,19 +238,19 @@ pub(crate) fn ensure_checksum_matches(
|
||||||
pub(crate) async fn check_quotas(
|
pub(crate) async fn check_quotas(
|
||||||
garage: &Arc<Garage>,
|
garage: &Arc<Garage>,
|
||||||
bucket: &Bucket,
|
bucket: &Bucket,
|
||||||
key: &str,
|
|
||||||
size: u64,
|
size: u64,
|
||||||
|
prev_object: Option<&Object>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let quotas = bucket.state.as_option().unwrap().quotas.get();
|
let quotas = bucket.state.as_option().unwrap().quotas.get();
|
||||||
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let key = key.to_string();
|
let counters = garage
|
||||||
let (prev_object, counters) = futures::try_join!(
|
.object_counter_table
|
||||||
garage.object_table.get(&bucket.id, &key),
|
.table
|
||||||
garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
|
.get(&bucket.id, &EmptyKey)
|
||||||
)?;
|
.await?;
|
||||||
|
|
||||||
let counters = counters
|
let counters = counters
|
||||||
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
||||||
|
@ -275,7 +284,7 @@ pub(crate) async fn check_quotas(
|
||||||
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
|
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
|
||||||
return Err(Error::forbidden(format!(
|
return Err(Error::forbidden(format!(
|
||||||
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
|
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
|
||||||
ms, current_size, size
|
ms, current_size, cnt_size_diff
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -519,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
|
||||||
other,
|
other,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
|
||||||
|
existing_object
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
||||||
|
.map(|t| std::cmp::max(t + 1, now_msec()))
|
||||||
|
.unwrap_or_else(now_msec)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue