Compare commits
2 commits
d146cdd5b6
...
8686cfd0b1
Author | SHA1 | Date | |
---|---|---|---|
8686cfd0b1 | |||
c6cde1f143 |
2 changed files with 17 additions and 11 deletions
|
@ -30,10 +30,13 @@ pub async fn handle_create_multipart_upload(
|
||||||
req: &Request<Body>,
|
req: &Request<Body>,
|
||||||
bucket_name: &str,
|
bucket_name: &str,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &String,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
|
let existing_object = garage.object_table.get(&bucket_id, &key).await?;
|
||||||
|
|
||||||
let upload_id = gen_uuid();
|
let upload_id = gen_uuid();
|
||||||
let timestamp = now_msec();
|
let timestamp = next_timestamp(&existing_object);
|
||||||
|
|
||||||
let headers = get_headers(req.headers())?;
|
let headers = get_headers(req.headers())?;
|
||||||
|
|
||||||
// Create object in object table
|
// Create object in object table
|
||||||
|
@ -332,7 +335,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
// Calculate total size of final object
|
// Calculate total size of final object
|
||||||
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
||||||
|
|
||||||
if let Err(e) = check_quotas(&garage, bucket, &key, total_size, Some(&object)).await {
|
if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await {
|
||||||
object_version.state = ObjectVersionState::Aborted;
|
object_version.state = ObjectVersionState::Aborted;
|
||||||
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
|
@ -86,11 +86,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
|
|
||||||
// Generate identity of new version
|
// Generate identity of new version
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let version_timestamp = existing_object
|
let version_timestamp = next_timestamp(&existing_object);
|
||||||
.as_ref()
|
|
||||||
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
|
||||||
.map(|t| std::cmp::max(t + 1, now_msec()))
|
|
||||||
.unwrap_or_else(now_msec);
|
|
||||||
|
|
||||||
// If body is small enough, store it directly in the object table
|
// If body is small enough, store it directly in the object table
|
||||||
// as "inline data". We can then return immediately.
|
// as "inline data". We can then return immediately.
|
||||||
|
@ -110,7 +106,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, size, existing_object.as_ref()).await?;
|
check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
let object_version = ObjectVersion {
|
let object_version = ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: version_uuid,
|
||||||
|
@ -189,7 +185,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, total_size, existing_object.as_ref()).await?;
|
check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
// Save final object state, marked as Complete
|
// Save final object state, marked as Complete
|
||||||
let md5sum_hex = hex::encode(data_md5sum);
|
let md5sum_hex = hex::encode(data_md5sum);
|
||||||
|
@ -242,7 +238,6 @@ pub(crate) fn ensure_checksum_matches(
|
||||||
pub(crate) async fn check_quotas(
|
pub(crate) async fn check_quotas(
|
||||||
garage: &Arc<Garage>,
|
garage: &Arc<Garage>,
|
||||||
bucket: &Bucket,
|
bucket: &Bucket,
|
||||||
key: &str,
|
|
||||||
size: u64,
|
size: u64,
|
||||||
prev_object: Option<&Object>,
|
prev_object: Option<&Object>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
|
@ -533,3 +528,11 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
|
||||||
other,
|
other,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) fn next_timestamp(existing_object: &Option<Object>) -> u64 {
|
||||||
|
existing_object
|
||||||
|
.as_ref()
|
||||||
|
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
||||||
|
.map(|t| std::cmp::max(t + 1, now_msec()))
|
||||||
|
.unwrap_or_else(now_msec)
|
||||||
|
}
|
||||||
|
|
Loading…
Add table
Reference in a new issue