check_quotas: avoid re-fetching object from object table
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
Alex 2023-06-13 15:12:39 +02:00
parent 32d5520e41
commit cb5199aed0

View file

@ -89,6 +89,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Generate identity of new version // Generate identity of new version
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let version_timestamp = existing_object let version_timestamp = existing_object
.as_ref()
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max()) .and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
.map(|t| std::cmp::max(t + 1, now_msec())) .map(|t| std::cmp::max(t + 1, now_msec()))
.unwrap_or_else(now_msec); .unwrap_or_else(now_msec);
@ -111,7 +112,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256, content_sha256,
)?; )?;
check_quotas(&garage, bucket, key, size).await?; check_quotas(&garage, bucket, key, size, existing_object.as_ref()).await?;
let object_version = ObjectVersion { let object_version = ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
@ -180,7 +181,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
content_sha256, content_sha256,
)?; )?;
check_quotas(&garage, bucket, key, total_size).await?; check_quotas(&garage, bucket, key, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete // Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum); let md5sum_hex = hex::encode(data_md5sum);
@ -235,6 +236,7 @@ async fn check_quotas(
bucket: &Bucket, bucket: &Bucket,
key: &str, key: &str,
size: u64, size: u64,
prev_object: Option<&Object>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let quotas = bucket.state.as_option().unwrap().quotas.get(); let quotas = bucket.state.as_option().unwrap().quotas.get();
if quotas.max_objects.is_none() && quotas.max_size.is_none() { if quotas.max_objects.is_none() && quotas.max_size.is_none() {
@ -242,10 +244,11 @@ async fn check_quotas(
}; };
let key = key.to_string(); let key = key.to_string();
let (prev_object, counters) = futures::try_join!( let counters = garage
garage.object_table.get(&bucket.id, &key), .object_counter_table
garage.object_counter_table.table.get(&bucket.id, &EmptyKey), .table
)?; .get(&bucket.id, &EmptyKey)
.await?;
let counters = counters let counters = counters
.map(|x| x.filtered_values(&garage.system.ring.borrow())) .map(|x| x.filtered_values(&garage.system.ring.borrow()))
@ -279,7 +282,7 @@ async fn check_quotas(
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 { if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
return Err(Error::forbidden(format!( return Err(Error::forbidden(format!(
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.", "Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
ms, current_size, size ms, current_size, cnt_size_diff
))); )));
} }
} }
@ -717,7 +720,7 @@ pub async fn handle_complete_multipart_upload(
// Calculate total size of final object // Calculate total size of final object
let total_size = version.blocks.items().iter().map(|x| x.1.size).sum(); let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await { if let Err(e) = check_quotas(&garage, bucket, &key, total_size, Some(&object)).await {
object_version.state = ObjectVersionState::Aborted; object_version.state = ObjectVersionState::Aborted;
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
garage.object_table.insert(&final_object).await?; garage.object_table.insert(&final_object).await?;