forked from Deuxfleurs/garage
Compare commits
2 commits
main
...
increasing
Author | SHA1 | Date | |
---|---|---|---|
cb5199aed0 | |||
32d5520e41 |
1 changed files with 29 additions and 14 deletions
|
@ -3,6 +3,7 @@ use std::sync::Arc;
|
|||
|
||||
use base64::prelude::*;
|
||||
use futures::prelude::*;
|
||||
use futures::try_join;
|
||||
use hyper::body::{Body, Bytes};
|
||||
use hyper::header::{HeaderMap, HeaderValue};
|
||||
use hyper::{Request, Response};
|
||||
|
@ -37,7 +38,7 @@ pub async fn handle_put(
|
|||
garage: Arc<Garage>,
|
||||
req: Request<Body>,
|
||||
bucket: &Bucket,
|
||||
key: &str,
|
||||
key: &String,
|
||||
content_sha256: Option<Hash>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
// Retrieve interesting headers from request
|
||||
|
@ -70,16 +71,28 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
headers: ObjectVersionHeaders,
|
||||
body: S,
|
||||
bucket: &Bucket,
|
||||
key: &str,
|
||||
key: &String,
|
||||
content_md5: Option<String>,
|
||||
content_sha256: Option<FixedBytes32>,
|
||||
) -> Result<(Uuid, String), Error> {
|
||||
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||
let (first_block_opt, existing_object) = try_join!(
|
||||
chunker.next(),
|
||||
garage
|
||||
.object_table
|
||||
.get(&bucket.id, key)
|
||||
.map_err(Error::from),
|
||||
)?;
|
||||
|
||||
let first_block = first_block_opt.unwrap_or_default();
|
||||
|
||||
// Generate identity of new version
|
||||
let version_uuid = gen_uuid();
|
||||
let version_timestamp = now_msec();
|
||||
|
||||
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||
let first_block = chunker.next().await?.unwrap_or_default();
|
||||
let version_timestamp = existing_object
|
||||
.as_ref()
|
||||
.and_then(|obj| obj.versions().iter().map(|v| v.timestamp).max())
|
||||
.map(|t| std::cmp::max(t + 1, now_msec()))
|
||||
.unwrap_or_else(now_msec);
|
||||
|
||||
// If body is small enough, store it directly in the object table
|
||||
// as "inline data". We can then return immediately.
|
||||
|
@ -99,7 +112,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
content_sha256,
|
||||
)?;
|
||||
|
||||
check_quotas(&garage, bucket, key, size).await?;
|
||||
check_quotas(&garage, bucket, key, size, existing_object.as_ref()).await?;
|
||||
|
||||
let object_version = ObjectVersion {
|
||||
uuid: version_uuid,
|
||||
|
@ -168,7 +181,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
content_sha256,
|
||||
)?;
|
||||
|
||||
check_quotas(&garage, bucket, key, total_size).await?;
|
||||
check_quotas(&garage, bucket, key, total_size, existing_object.as_ref()).await?;
|
||||
|
||||
// Save final object state, marked as Complete
|
||||
let md5sum_hex = hex::encode(data_md5sum);
|
||||
|
@ -223,6 +236,7 @@ async fn check_quotas(
|
|||
bucket: &Bucket,
|
||||
key: &str,
|
||||
size: u64,
|
||||
prev_object: Option<&Object>,
|
||||
) -> Result<(), Error> {
|
||||
let quotas = bucket.state.as_option().unwrap().quotas.get();
|
||||
if quotas.max_objects.is_none() && quotas.max_size.is_none() {
|
||||
|
@ -230,10 +244,11 @@ async fn check_quotas(
|
|||
};
|
||||
|
||||
let key = key.to_string();
|
||||
let (prev_object, counters) = futures::try_join!(
|
||||
garage.object_table.get(&bucket.id, &key),
|
||||
garage.object_counter_table.table.get(&bucket.id, &EmptyKey),
|
||||
)?;
|
||||
let counters = garage
|
||||
.object_counter_table
|
||||
.table
|
||||
.get(&bucket.id, &EmptyKey)
|
||||
.await?;
|
||||
|
||||
let counters = counters
|
||||
.map(|x| x.filtered_values(&garage.system.ring.borrow()))
|
||||
|
@ -267,7 +282,7 @@ async fn check_quotas(
|
|||
if cnt_size_diff > 0 && current_size + cnt_size_diff > ms as i64 {
|
||||
return Err(Error::forbidden(format!(
|
||||
"Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes.",
|
||||
ms, current_size, size
|
||||
ms, current_size, cnt_size_diff
|
||||
)));
|
||||
}
|
||||
}
|
||||
|
@ -705,7 +720,7 @@ pub async fn handle_complete_multipart_upload(
|
|||
// Calculate total size of final object
|
||||
let total_size = version.blocks.items().iter().map(|x| x.1.size).sum();
|
||||
|
||||
if let Err(e) = check_quotas(&garage, bucket, &key, total_size).await {
|
||||
if let Err(e) = check_quotas(&garage, bucket, &key, total_size, Some(&object)).await {
|
||||
object_version.state = ObjectVersionState::Aborted;
|
||||
let final_object = Object::new(bucket.id, key.clone(), vec![object_version]);
|
||||
garage.object_table.insert(&final_object).await?;
|
||||
|
|
Loading…
Reference in a new issue