Merge pull request 'Ensure increasing version timestamps when writing new object versions' (#543) from increasing-timestamps into main
All checks were successful
continuous-integration/drone/push Build is passing

Reviewed-on: #543
This commit is contained in:
Alex 2023-10-24 10:07:16 +00:00
commit 75d5d08ee1
3 changed files with 58 additions and 42 deletions

View file

@ -3,12 +3,12 @@ use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response, StatusCode};
use garage_util::data::*; use garage_util::data::*;
use garage_util::time::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::s3::object_table::*; use garage_model::s3::object_table::*;
use crate::s3::error::*; use crate::s3::error::*;
use crate::s3::put::next_timestamp;
use crate::s3::xml as s3_xml; use crate::s3::xml as s3_xml;
use crate::signature::verify_signed_content; use crate::signature::verify_signed_content;
@ -23,40 +23,36 @@ async fn handle_delete_internal(
.await? .await?
.ok_or(Error::NoSuchKey)?; // No need to delete .ok_or(Error::NoSuchKey)?; // No need to delete
let interesting_versions = object.versions().iter().filter(|v| { let del_timestamp = next_timestamp(Some(&object));
!matches!( let del_uuid = gen_uuid();
v.state,
ObjectVersionState::Aborted
| ObjectVersionState::Complete(ObjectVersionData::DeleteMarker)
)
});
let mut version_to_delete = None; let deleted_version = object
let mut timestamp = now_msec(); .versions()
for v in interesting_versions { .iter()
if v.timestamp + 1 > timestamp || version_to_delete.is_none() { .rev()
version_to_delete = Some(v.uuid); .find(|v| !matches!(&v.state, ObjectVersionState::Aborted))
.or_else(|| object.versions().iter().rev().next());
let deleted_version = match deleted_version {
Some(dv) => dv.uuid,
None => {
warn!("Object has no versions: {:?}", object);
Uuid::from([0u8; 32])
} }
timestamp = std::cmp::max(timestamp, v.timestamp + 1); };
}
let deleted_version = version_to_delete.ok_or(Error::NoSuchKey)?;
let version_uuid = gen_uuid();
let object = Object::new( let object = Object::new(
bucket_id, bucket_id,
key.into(), key.into(),
vec![ObjectVersion { vec![ObjectVersion {
uuid: version_uuid, uuid: del_uuid,
timestamp, timestamp: del_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}], }],
); );
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
Ok((deleted_version, version_uuid)) Ok((deleted_version, del_uuid))
} }
pub async fn handle_delete( pub async fn handle_delete(

View file

@ -9,7 +9,6 @@ use md5::{Digest as Md5Digest, Md5};
use garage_table::*; use garage_table::*;
use garage_util::async_hash::*; use garage_util::async_hash::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::time::*;
use garage_model::bucket_table::Bucket; use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage; use garage_model::garage::Garage;
@ -30,10 +29,13 @@ pub async fn handle_create_multipart_upload(
req: &Request<Body>, req: &Request<Body>,
bucket_name: &str, bucket_name: &str,
bucket_id: Uuid, bucket_id: Uuid,
key: &str, key: &String,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
let upload_id = gen_uuid(); let upload_id = gen_uuid();
let timestamp = now_msec(); let timestamp = next_timestamp(existing_object.as_ref());
let headers = get_headers(req.headers())?; let headers = get_headers(req.headers())?;
// Create object in object table // Create object in object table
@ -233,7 +235,8 @@ pub async fn handle_complete_multipart_upload(
// Get object and multipart upload // Get object and multipart upload
let key = key.to_string(); let key = key.to_string();
let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?; let (object, mut object_version, mpu) =
get_upload(&garage, &bucket.id, &key, &upload_id).await?;
if mpu.parts.is_empty() { if mpu.parts.is_empty() {
return Err(Error::bad_request("No data was uploaded")); return Err(Error::bad_request("No data was uploaded"));
@ -331,7 +334,7 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object // Calculate total size of final object
let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted; object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?; garage.object_table.insert(&final_object).await?;

View file

@ -3,6 +3,7 @@ use std::sync::Arc;
use base64::prelude::*; use base64::prelude::*;
use futures::prelude::*; use futures::prelude::*;
use futures::try_join;
use hyper::body::{Body, Bytes}; use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue}; use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response}; use hyper::{Request, Response};
@ -35,7 +36,7 @@ pub async fn handle_put(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<Body>,
bucket: &Bucket, bucket: &Bucket,
key: &str, key: &String,
content_sha256: Option<Hash>, content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
// Retrieve interesting headers from request // Retrieve interesting headers from request
@ -68,16 +69,24 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
headers: ObjectVersionHeaders, headers: ObjectVersionHeaders,
body: S, body: S,
bucket: &Bucket, bucket: &Bucket,
key: &str, key: &String,
content_md5: Option<String>, content_md5: Option<String>,
content_sha256: Option<FixedBytes32>, content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> { ) -> Result<(Uuid, String), Error> {
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (first_block_opt, existing_object) = try_join!(
chunker.next(),
garage
.object_table
.get(&bucket.id, key)
.map_err(Error::from),
)?;
let first_block = first_block_opt.unwrap_or_default();
// Generate identity of new version // Generate identity of new version
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let version_timestamp = now_msec(); let version_timestamp = next_timestamp(existing_object.as_ref());
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
// If body is small enough, store it directly in the object table // If body is small enough, store it directly in the object table
// as "inline data". We can then return immediately. // as "inline data". We can then return immediately.
@ -97,7 +106,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256, content_sha256,
)?; )?;
check_quotas(&garage, bucket, key, size).await?; check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion { let object_version = ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
@ -176,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256, content_sha256,
)?; )?;
check_quotas(&garage, bucket, key, total_size).await?; check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete // Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum); let md5sum_hex = hex::encode(data_md5sum);
@ -229,19 +238,19 @@ pub(crate) fn ensure_checksum_matches(
pub(crate) async fn check_quotas( pub(crate) async fn check_quotas(
garage: &Arc<Garage>, garage: &Arc<Garage>,
bucket: &Bucket, bucket: &Bucket,
key: &str,
size: u64, size: u64,
prev_object: Option<&Object>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let quotas = bucket.state.as_option().unwrap().quotas.get(); let quotas = bucket.state.as_option().unwrap().quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() { if quotas.max_objects.is_none() && quotas.max_size.is_none() {
return Ok(()); return Ok(());
}; };
let key = key.to_string(); let counters = garage
let (prev_object, counters) = futures::try_join!( .object_counter_table
garage.object_table.get(&bucket.id, &key), .table
garage.object_counter_table.table.get(&bucket.id, &EmptyKey), .get(&bucket.id, &EmptyKey)
)?; .await?;
let counters = counters let counters = counters
.map(|x| x.filtered_values(&garage.system.ring.borrow())) .map(|x| x.filtered_values(&garage.system.ring.borrow()))
@ -275,7 +284,7 @@ pub(crate) async fn check_quotas(
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 { if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
return Err(Error::forbidden(format!( return Err(Error::forbidden(format!(
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
ms, current_size, size ms, current_size, cnt_size_diff
))); )));
} }
} }
@ -519,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
other, other,
}) })
} }
pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
existing_object
.as_ref()
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
.map(|t| std::cmp::max(t + 1, now_msec()))
.unwrap_or_else(now_msec)
}