forked from Deuxfleurs/garage
check_quotas: avoid re-fetching object from object table
This commit is contained in:
parent
45b0453d0f
commit
3d6ed63824
2 changed files with 12 additions and 10 deletions
|
@ -233,7 +233,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
|
|
||||||
// Get object and multipart upload
|
// Get object and multipart upload
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let (_, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?;
|
let (object, mut object_version, mpu) = get_upload(&garage, &bucket.id, &key, &upload_id).await?;
|
||||||
|
|
||||||
if mpu.parts.is_empty() {
|
if mpu.parts.is_empty() {
|
||||||
return Err(Error::bad_request("No data was uploaded"));
|
return Err(Error::bad_request("No data was uploaded"));
|
||||||
|
@ -331,7 +331,7 @@ pub async fn handle_complete_multipart_upload(
|
||||||
// Calculate total size of final object
|
// Calculate total size of final object
|
||||||
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
||||||
|
|
||||||
if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
|
if let Err(e) = check_quotas(&garage, bucket, &key, total_size, Some(&object)).await {
|
||||||
object_version.state = ObjectVersionState::Aborted;
|
object_version.state = ObjectVersionState::Aborted;
|
||||||
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
||||||
garage.object_table.insert(&final_object).await?;
|
garage.object_table.insert(&final_object).await?;
|
||||||
|
|
|
@ -87,6 +87,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
// Generate identity of new version
|
// Generate identity of new version
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let version_timestamp = existing_object
|
let version_timestamp = existing_object
|
||||||
|
.as_ref()
|
||||||
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
||||||
.map(|t| std::cmp::max(t + 1, now_msec()))
|
.map(|t| std::cmp::max(t + 1, now_msec()))
|
||||||
.unwrap_or_else(now_msec);
|
.unwrap_or_else(now_msec);
|
||||||
|
@ -109,7 +110,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, size).await?;
|
check_quotas(&garage, bucket, key, size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
let object_version = ObjectVersion {
|
let object_version = ObjectVersion {
|
||||||
uuid: version_uuid,
|
uuid: version_uuid,
|
||||||
|
@ -188,7 +189,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
content_sha256,
|
content_sha256,
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
check_quotas(&garage, bucket, key, total_size).await?;
|
check_quotas(&garage, bucket, key, total_size, existing_object.as_ref()).await?;
|
||||||
|
|
||||||
// Save final object state, marked as Complete
|
// Save final object state, marked as Complete
|
||||||
let md5sum_hex = hex::encode(data_md5sum);
|
let md5sum_hex = hex::encode(data_md5sum);
|
||||||
|
@ -243,17 +244,18 @@ pub(crate) async fn check_quotas(
|
||||||
bucket: &Bucket,
|
bucket: &Bucket,
|
||||||
key: &str,
|
key: &str,
|
||||||
size: u64,
|
size: u64,
|
||||||
|
prev_object: Option<&Object>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let quotas = bucket.state.as_option().unwrap().quotas.get();
|
let quotas = bucket.state.as_option().unwrap().quotas.get();
|
||||||
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
|
|
||||||
let key = key.to_string();
|
let counters = garage
|
||||||
let (prev_object, counters) = futures::try_join!(
|
.object_counter_table
|
||||||
garage.object_table.get(&bucket.id, &key),
|
.table
|
||||||
garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
|
.get(&bucket.id, &EmptyKey)
|
||||||
)?;
|
.await?;
|
||||||
|
|
||||||
let counters = counters
|
let counters = counters
|
||||||
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
||||||
|
@ -287,7 +289,7 @@ pub(crate) async fn check_quotas(
|
||||||
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
|
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
|
||||||
return Err(Error::forbidden(format!(
|
return Err(Error::forbidden(format!(
|
||||||
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
|
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
|
||||||
ms, current_size, size
|
ms, current_size, cnt_size_diff
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue