From fb55682c66092921f766f82c16eb9e046f1bbb41 Mon Sep 17 00:00:00 2001 From: Yureka Date: Sun, 3 Mar 2024 14:56:52 +0100 Subject: [PATCH] add request context helper --- src/api/helpers.rs | 14 +++ src/api/k2v/api_server.rs | 45 ++++---- src/api/k2v/batch.rs | 51 ++++----- src/api/k2v/index.rs | 10 +- src/api/k2v/item.rs | 38 ++++--- src/api/s3/api_server.rs | 212 +++++++++++++++----------------------- src/api/s3/bucket.rs | 48 ++++----- src/api/s3/copy.rs | 35 ++++--- src/api/s3/cors.rs | 65 ++++++------ src/api/s3/delete.rs | 29 ++---- src/api/s3/get.rs | 21 ++++ src/api/s3/lifecycle.rs | 53 +++++----- src/api/s3/list.rs | 14 +-- src/api/s3/multipart.rs | 68 ++++++------ src/api/s3/post_object.rs | 28 +++-- src/api/s3/put.rs | 68 ++++++------ src/api/s3/website.rs | 54 +++++----- src/model/bucket_table.rs | 7 ++ src/web/web_server.rs | 29 +++--- 19 files changed, 458 insertions(+), 431 deletions(-) diff --git a/src/api/helpers.rs b/src/api/helpers.rs index 5f488912..cf60005d 100644 --- a/src/api/helpers.rs +++ b/src/api/helpers.rs @@ -1,4 +1,5 @@ use std::convert::Infallible; +use std::sync::Arc; use futures::{Stream, StreamExt, TryStreamExt}; @@ -10,6 +11,10 @@ use hyper::{ use idna::domain_to_unicode; use serde::{Deserialize, Serialize}; +use garage_model::bucket_table::BucketParams; +use garage_model::garage::Garage; +use garage_model::key_table::Key; +use garage_util::data::Uuid; use garage_util::error::Error as GarageError; use crate::common_error::{CommonError as Error, *}; @@ -27,6 +32,15 @@ pub enum Authorization { Owner, } +/// The values which are known for each request related to a bucket +pub struct ReqCtx { + pub garage: Arc, + pub bucket_id: Uuid, + pub bucket_name: String, + pub bucket_params: BucketParams, + pub api_key: Key, +} + /// Host to bucket /// /// Convert a host, like "bucket.garage-site.tld" to the corresponding bucket "bucket", diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index fdb5db4c..658cfcc8 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -95,6 +95,7 @@ impl ApiHandler for K2VApiServer { .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -112,40 +113,42 @@ impl ApiHandler for K2VApiServer { // are always preflighted, i.e. the browser should make // an OPTIONS call before to check it is allowed let matching_cors_rule = match *req.method() { - Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req) - .ok_or_internal_error("Error looking up CORS rule")?, + Method::GET | Method::HEAD | Method::POST => { + find_matching_cors_rule(&bucket_params, &req) + .ok_or_internal_error("Error looking up CORS rule")? + .cloned() + } _ => None, }; + let ctx = ReqCtx { + garage, + bucket_id, + bucket_name, + bucket_params, + api_key, + }; + let resp = match endpoint { Endpoint::DeleteItem { partition_key, sort_key, - } => handle_delete_item(garage, req, bucket_id, &partition_key, &sort_key).await, + } => handle_delete_item(ctx, req, &partition_key, &sort_key).await, Endpoint::InsertItem { partition_key, sort_key, - } => handle_insert_item(garage, req, bucket_id, &partition_key, &sort_key).await, + } => handle_insert_item(ctx, req, &partition_key, &sort_key).await, Endpoint::ReadItem { partition_key, sort_key, - } => handle_read_item(garage, &req, bucket_id, &partition_key, &sort_key).await, + } => handle_read_item(ctx, &req, &partition_key, &sort_key).await, Endpoint::PollItem { partition_key, sort_key, causality_token, timeout, } => { - handle_poll_item( - garage, - &req, - bucket_id, - partition_key, - sort_key, - causality_token, - timeout, - ) - .await + handle_poll_item(ctx, &req, partition_key, sort_key, causality_token, timeout).await } Endpoint::ReadIndex { prefix, @@ -153,12 +156,12 @@ impl ApiHandler for K2VApiServer { end, limit, reverse, - } => handle_read_index(garage, bucket_id, prefix, start, end, limit, reverse).await, - Endpoint::InsertBatch {} => handle_insert_batch(garage, bucket_id, req).await, - Endpoint::ReadBatch {} => handle_read_batch(garage, bucket_id, req).await, - Endpoint::DeleteBatch {} => handle_delete_batch(garage, bucket_id, req).await, + } => handle_read_index(ctx, prefix, start, end, limit, reverse).await, + Endpoint::InsertBatch {} => handle_insert_batch(ctx, req).await, + Endpoint::ReadBatch {} => handle_read_batch(ctx, req).await, + Endpoint::DeleteBatch {} => handle_delete_batch(ctx, req).await, Endpoint::PollRange { partition_key } => { - handle_poll_range(garage, bucket_id, &partition_key, req).await + handle_poll_range(ctx, &partition_key, req).await } Endpoint::Options => unreachable!(), }; @@ -167,7 +170,7 @@ impl ApiHandler for K2VApiServer { // add the corresponding CORS headers to the response let mut resp_ok = resp?; if let Some(rule) = matching_cors_rule { - add_cors_headers(&mut resp_ok, rule) + add_cors_headers(&mut resp_ok, &rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; } diff --git a/src/api/k2v/batch.rs b/src/api/k2v/batch.rs index ae2778b1..02b7ae8b 100644 --- a/src/api/k2v/batch.rs +++ b/src/api/k2v/batch.rs @@ -1,14 +1,9 @@ -use std::sync::Arc; - use base64::prelude::*; use hyper::{Request, Response, StatusCode}; use serde::{Deserialize, Serialize}; -use garage_util::data::*; - use garage_table::{EnumerationOrder, TableSchema}; -use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; @@ -18,10 +13,12 @@ use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_insert_batch( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let items = parse_json_body::, _, Error>(req).await?; let mut items2 = vec![]; @@ -38,7 +35,7 @@ pub async fn handle_insert_batch( items2.push((it.pk, it.sk, ct, v)); } - garage.k2v.rpc.insert_batch(bucket_id, items2).await?; + garage.k2v.rpc.insert_batch(*bucket_id, items2).await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -46,8 +43,7 @@ pub async fn handle_insert_batch( } pub async fn handle_read_batch( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, ) -> Result, Error> { let queries = parse_json_body::, _, Error>(req).await?; @@ -55,7 +51,7 @@ pub async fn handle_read_batch( let resp_results = futures::future::join_all( queries .into_iter() - .map(|q| handle_read_batch_query(&garage, bucket_id, q)), + .map(|q| handle_read_batch_query(&ctx, q)), ) .await; @@ -68,12 +64,15 @@ pub async fn handle_read_batch( } async fn handle_read_batch_query( - garage: &Arc, - bucket_id: Uuid, + ctx: &ReqCtx, query: ReadBatchQuery, ) -> Result { + let ReqCtx { + garage, bucket_id, .. + } = ctx; + let partition = K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; @@ -138,8 +137,7 @@ async fn handle_read_batch_query( } pub async fn handle_delete_batch( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, ) -> Result, Error> { let queries = parse_json_body::, _, Error>(req).await?; @@ -147,7 +145,7 @@ pub async fn handle_delete_batch( let resp_results = futures::future::join_all( queries .into_iter() - .map(|q| handle_delete_batch_query(&garage, bucket_id, q)), + .map(|q| handle_delete_batch_query(&ctx, q)), ) .await; @@ -160,12 +158,15 @@ pub async fn handle_delete_batch( } async fn handle_delete_batch_query( - garage: &Arc, - bucket_id: Uuid, + ctx: &ReqCtx, query: DeleteBatchQuery, ) -> Result { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let partition = K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: query.partition_key.clone(), }; @@ -195,7 +196,7 @@ async fn handle_delete_batch_query( .k2v .rpc .insert( - bucket_id, + *bucket_id, i.partition.partition_key, i.sort_key, Some(cc), @@ -235,7 +236,7 @@ async fn handle_delete_batch_query( .collect::>(); let n = items.len(); - garage.k2v.rpc.insert_batch(bucket_id, items).await?; + garage.k2v.rpc.insert_batch(*bucket_id, items).await?; n }; @@ -251,11 +252,13 @@ async fn handle_delete_batch_query( } pub(crate) async fn handle_poll_range( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, partition_key: &str, req: Request, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; use garage_model::k2v::sub::PollRange; let query = parse_json_body::(req).await?; diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs index 1baec1db..822bec44 100644 --- a/src/api/k2v/index.rs +++ b/src/api/k2v/index.rs @@ -3,12 +3,9 @@ use std::sync::Arc; use hyper::Response; use serde::Serialize; -use garage_util::data::*; - use garage_rpc::ring::Ring; use garage_table::util::*; -use garage_model::garage::Garage; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use crate::helpers::*; @@ -17,14 +14,17 @@ use crate::k2v::error::*; use crate::k2v::range::read_range; pub async fn handle_read_index( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, prefix: Option, start: Option, end: Option, limit: Option, reverse: Option, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let reverse = reverse.unwrap_or(false); let ring: Arc = garage.system.ring.borrow().clone(); diff --git a/src/api/k2v/item.rs b/src/api/k2v/item.rs index 0c5931a1..af3af4e4 100644 --- a/src/api/k2v/item.rs +++ b/src/api/k2v/item.rs @@ -1,13 +1,8 @@ -use std::sync::Arc; - use base64::prelude::*; use http::header; use hyper::{Request, Response, StatusCode}; -use garage_util::data::*; - -use garage_model::garage::Garage; use garage_model::k2v::causality::*; use garage_model::k2v::item_table::*; @@ -100,12 +95,15 @@ impl ReturnFormat { /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_read_item( - garage: Arc, + ctx: ReqCtx, req: &Request, - bucket_id: Uuid, partition_key: &str, sort_key: &String, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let format = ReturnFormat::from(req)?; let item = garage @@ -113,7 +111,7 @@ pub async fn handle_read_item( .item_table .get( &K2VItemPartition { - bucket_id, + bucket_id: *bucket_id, partition_key: partition_key.to_string(), }, sort_key, @@ -125,12 +123,14 @@ pub async fn handle_read_item( } pub async fn handle_insert_item( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket_id: Uuid, partition_key: &str, sort_key: &str, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -149,7 +149,7 @@ pub async fn handle_insert_item( .k2v .rpc .insert( - bucket_id, + *bucket_id, partition_key.to_string(), sort_key.to_string(), causal_context, @@ -163,12 +163,14 @@ pub async fn handle_insert_item( } pub async fn handle_delete_item( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket_id: Uuid, partition_key: &str, sort_key: &str, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let causal_context = req .headers() .get(X_GARAGE_CAUSALITY_TOKEN) @@ -183,7 +185,7 @@ pub async fn handle_delete_item( .k2v .rpc .insert( - bucket_id, + *bucket_id, partition_key.to_string(), sort_key.to_string(), causal_context, @@ -199,14 +201,16 @@ pub async fn handle_delete_item( /// Handle ReadItem request #[allow(clippy::ptr_arg)] pub async fn handle_poll_item( - garage: Arc, + ctx: ReqCtx, req: &Request, - bucket_id: Uuid, partition_key: String, sort_key: String, causality_token: String, timeout_secs: Option, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; let format = ReturnFormat::from(req)?; let causal_context = @@ -218,7 +222,7 @@ pub async fn handle_poll_item( .k2v .rpc .poll_item( - bucket_id, + *bucket_id, partition_key, sort_key, causal_context, diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 51f19554..1ed30996 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -155,6 +155,7 @@ impl ApiHandler for S3ApiServer { .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.into_option().unwrap(); let allowed = match endpoint.authorization_type() { Authorization::Read => api_key.allow_read(&bucket_id), @@ -167,12 +168,20 @@ impl ApiHandler for S3ApiServer { return Err(Error::forbidden("Operation is not allowed for this key.")); } - let matching_cors_rule = find_matching_cors_rule(&bucket, &req)?; + let matching_cors_rule = find_matching_cors_rule(&bucket_params, &req)?.cloned(); + + let ctx = ReqCtx { + garage, + bucket_id, + bucket_name, + bucket_params, + api_key, + }; let resp = match endpoint { Endpoint::HeadObject { key, part_number, .. - } => handle_head(garage, &req, bucket_id, &key, part_number).await, + } => handle_head(ctx, &req, &key, part_number).await, Endpoint::GetObject { key, part_number, @@ -192,74 +201,37 @@ impl ApiHandler for S3ApiServer { response_content_type, response_expires, }; - handle_get(garage, &req, bucket_id, &key, part_number, overrides).await + handle_get(ctx, &req, &key, part_number, overrides).await } Endpoint::UploadPart { key, part_number, upload_id, - } => { - handle_put_part( - garage, - req, - bucket_id, - &key, - part_number, - &upload_id, - content_sha256, - ) - .await - } - Endpoint::CopyObject { key } => { - handle_copy(garage, &api_key, &req, bucket_id, &key).await - } + } => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await, + Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await, Endpoint::UploadPartCopy { key, part_number, upload_id, - } => { - handle_upload_part_copy( - garage, - &api_key, - &req, - bucket_id, - &key, - part_number, - &upload_id, - ) - .await - } - Endpoint::PutObject { key } => { - handle_put(garage, req, &bucket, &key, content_sha256).await - } + } => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await, + Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await, Endpoint::AbortMultipartUpload { key, upload_id } => { - handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await + handle_abort_multipart_upload(ctx, &key, &upload_id).await } - Endpoint::DeleteObject { key, .. } => handle_delete(garage, bucket_id, &key).await, + Endpoint::DeleteObject { key, .. } => handle_delete(ctx, &key).await, Endpoint::CreateMultipartUpload { key } => { - handle_create_multipart_upload(garage, &req, &bucket_name, bucket_id, &key).await + handle_create_multipart_upload(ctx, &req, &key).await } Endpoint::CompleteMultipartUpload { key, upload_id } => { - handle_complete_multipart_upload( - garage, - req, - &bucket_name, - &bucket, - &key, - &upload_id, - content_sha256, - ) - .await + handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await } Endpoint::CreateBucket {} => unreachable!(), Endpoint::HeadBucket {} => { let response = Response::builder().body(empty_body()).unwrap(); Ok(response) } - Endpoint::DeleteBucket {} => { - handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await - } - Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage), + Endpoint::DeleteBucket {} => handle_delete_bucket(ctx).await, + Endpoint::GetBucketLocation {} => handle_get_bucket_location(ctx), Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(), Endpoint::ListObjects { delimiter, @@ -268,24 +240,21 @@ impl ApiHandler for S3ApiServer { max_keys, prefix, } => { - handle_list( - garage, - &ListObjectsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter, - page_size: max_keys.unwrap_or(1000).clamp(1, 1000), - prefix: prefix.unwrap_or_default(), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - }, - is_v2: false, - marker, - continuation_token: None, - start_after: None, + let query = ListObjectsQuery { + common: ListQueryCommon { + bucket_name: ctx.bucket_name.clone(), + bucket_id, + delimiter, + page_size: max_keys.unwrap_or(1000).clamp(1, 1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), }, - ) - .await + is_v2: false, + marker, + continuation_token: None, + start_after: None, + }; + handle_list(ctx, &query).await } Endpoint::ListObjectsV2 { delimiter, @@ -298,24 +267,21 @@ impl ApiHandler for S3ApiServer { .. } => { if list_type == "2" { - handle_list( - garage, - &ListObjectsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter, - page_size: max_keys.unwrap_or(1000).clamp(1, 1000), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - prefix: prefix.unwrap_or_default(), - }, - is_v2: true, - marker: None, - continuation_token, - start_after, + let query = ListObjectsQuery { + common: ListQueryCommon { + bucket_name: ctx.bucket_name.clone(), + bucket_id, + delimiter, + page_size: max_keys.unwrap_or(1000).clamp(1, 1000), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), + prefix: prefix.unwrap_or_default(), }, - ) - .await + is_v2: true, + marker: None, + continuation_token, + start_after, + }; + handle_list(ctx, &query).await } else { Err(Error::bad_request(format!( "Invalid endpoint: list-type={}", @@ -331,22 +297,19 @@ impl ApiHandler for S3ApiServer { prefix, upload_id_marker, } => { - handle_list_multipart_upload( - garage, - &ListMultipartUploadsQuery { - common: ListQueryCommon { - bucket_name, - bucket_id, - delimiter, - page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), - prefix: prefix.unwrap_or_default(), - urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), - }, - key_marker, - upload_id_marker, + let query = ListMultipartUploadsQuery { + common: ListQueryCommon { + bucket_name: ctx.bucket_name.clone(), + bucket_id, + delimiter, + page_size: max_uploads.unwrap_or(1000).clamp(1, 1000), + prefix: prefix.unwrap_or_default(), + urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false), }, - ) - .await + key_marker, + upload_id_marker, + }; + handle_list_multipart_upload(ctx, &query).await } Endpoint::ListParts { key, @@ -354,39 +317,28 @@ impl ApiHandler for S3ApiServer { part_number_marker, upload_id, } => { - handle_list_parts( - garage, - &ListPartsQuery { - bucket_name, - bucket_id, - key, - upload_id, - part_number_marker: part_number_marker.map(|p| p.min(10000)), - max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), - }, - ) - .await + let query = ListPartsQuery { + bucket_name: ctx.bucket_name.clone(), + bucket_id, + key, + upload_id, + part_number_marker: part_number_marker.map(|p| p.min(10000)), + max_parts: max_parts.unwrap_or(1000).clamp(1, 1000), + }; + handle_list_parts(ctx, &query).await } - Endpoint::DeleteObjects {} => { - handle_delete_objects(garage, bucket_id, req, content_sha256).await - } - Endpoint::GetBucketWebsite {} => handle_get_website(&bucket).await, - Endpoint::PutBucketWebsite {} => { - handle_put_website(garage, bucket.clone(), req, content_sha256).await - } - Endpoint::DeleteBucketWebsite {} => handle_delete_website(garage, bucket.clone()).await, - Endpoint::GetBucketCors {} => handle_get_cors(&bucket).await, - Endpoint::PutBucketCors {} => { - handle_put_cors(garage, bucket.clone(), req, content_sha256).await - } - Endpoint::DeleteBucketCors {} => handle_delete_cors(garage, bucket.clone()).await, - Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(&bucket).await, + Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await, + Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await, + Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await, + Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await, + Endpoint::GetBucketCors {} => handle_get_cors(ctx).await, + Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await, + Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await, + Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await, Endpoint::PutBucketLifecycleConfiguration {} => { - handle_put_lifecycle(garage, bucket.clone(), req, content_sha256).await - } - Endpoint::DeleteBucketLifecycle {} => { - handle_delete_lifecycle(garage, bucket.clone()).await + handle_put_lifecycle(ctx, req, content_sha256).await } + Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await, endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), }; @@ -394,7 +346,7 @@ impl ApiHandler for S3ApiServer { // add the corresponding CORS headers to the response let mut resp_ok = resp?; if let Some(rule) = matching_cors_rule { - add_cors_headers(&mut resp_ok, rule) + add_cors_headers(&mut resp_ok, &rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; } diff --git a/src/api/s3/bucket.rs b/src/api/s3/bucket.rs index fa337566..6a12aa9c 100644 --- a/src/api/s3/bucket.rs +++ b/src/api/s3/bucket.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::sync::Arc; use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; @@ -21,7 +20,8 @@ use crate::s3::error::*; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; -pub fn handle_get_bucket_location(garage: Arc) -> Result, Error> { +pub fn handle_get_bucket_location(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { garage, .. } = ctx; let loc = s3_xml::LocationConstraint { xmlns: (), region: garage.config.s3_api.s3_region.to_string(), @@ -204,21 +204,20 @@ pub async fn handle_create_bucket( .unwrap()) } -pub async fn handle_delete_bucket( - garage: &Garage, - bucket_id: Uuid, - bucket_name: String, - api_key_id: &String, -) -> Result, Error> { +pub async fn handle_delete_bucket(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + bucket_name, + bucket_params: bucket_state, + api_key, + .. + } = &ctx; let helper = garage.locked_helper().await; - let api_key = helper.key().get_existing_key(api_key_id).await?; let key_params = api_key.params().unwrap(); - let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_))); - - let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?; - let bucket_state = bucket.state.as_option().unwrap(); + let is_local_alias = matches!(key_params.local_aliases.get(bucket_name), Some(Some(_))); // If the bucket has no other aliases, this is a true deletion. // Otherwise, it is just an alias removal. @@ -228,20 +227,20 @@ pub async fn handle_delete_bucket( .items() .iter() .filter(|(_, _, active)| *active) - .any(|(n, _, _)| is_local_alias || (*n != bucket_name)); + .any(|(n, _, _)| is_local_alias || (*n != *bucket_name)); let has_other_local_aliases = bucket_state .local_aliases .items() .iter() .filter(|(_, _, active)| *active) - .any(|((k, n), _, _)| !is_local_alias || *n != bucket_name || *k != api_key.key_id); + .any(|((k, n), _, _)| !is_local_alias || *n != *bucket_name || *k != api_key.key_id); if !has_other_global_aliases && !has_other_local_aliases { // Delete bucket // Check bucket is empty - if !helper.bucket().is_bucket_empty(bucket_id).await? { + if !helper.bucket().is_bucket_empty(*bucket_id).await? { return Err(CommonError::BucketNotEmpty.into()); } @@ -249,33 +248,36 @@ pub async fn handle_delete_bucket( // 1. delete bucket alias if is_local_alias { helper - .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name) + .unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name) .await?; } else { helper - .unset_global_bucket_alias(bucket_id, &bucket_name) + .unset_global_bucket_alias(*bucket_id, bucket_name) .await?; } // 2. delete authorization from keys that had access - for (key_id, _) in bucket.authorized_keys() { + for (key_id, _) in bucket_state.authorized_keys.items() { helper - .set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS) + .set_bucket_key_permissions(*bucket_id, key_id, BucketKeyPerm::NO_PERMISSIONS) .await?; } + let bucket = Bucket { + id: *bucket_id, + state: Deletable::delete(), + }; // 3. delete bucket - bucket.state = Deletable::delete(); garage.bucket_table.insert(&bucket).await?; } else if is_local_alias { // Just unalias helper - .unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name) + .unset_local_bucket_alias(*bucket_id, &api_key.key_id, bucket_name) .await?; } else { // Just unalias (but from global namespace) helper - .unset_global_bucket_alias(bucket_id, &bucket_name) + .unset_global_bucket_alias(*bucket_id, bucket_name) .await?; } diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 880ce5f4..3c2bd483 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -1,5 +1,4 @@ use std::pin::Pin; -use std::sync::Arc; use std::time::{Duration, SystemTime, UNIX_EPOCH}; use futures::{stream, stream::Stream, StreamExt}; @@ -15,8 +14,6 @@ use garage_table::*; use garage_util::data::*; use garage_util::time::*; -use garage_model::garage::Garage; -use garage_model::key_table::Key; use garage_model::s3::block_ref_table::*; use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; @@ -30,15 +27,19 @@ use crate::s3::put::get_headers; use crate::s3::xml::{self as s3_xml, xmlns_tag}; pub async fn handle_copy( - garage: Arc, - api_key: &Key, + ctx: ReqCtx, req: &Request, - dest_bucket_id: Uuid, dest_key: &str, ) -> Result, Error> { let copy_precondition = CopyPreconditionHeaders::parse(req)?; - let source_object = get_copy_source(&garage, api_key, req).await?; + let source_object = get_copy_source(&ctx, req).await?; + + let ReqCtx { + garage, + bucket_id: dest_bucket_id, + .. + } = ctx; let (source_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -181,10 +182,8 @@ pub async fn handle_copy( } pub async fn handle_upload_part_copy( - garage: Arc, - api_key: &Key, + ctx: ReqCtx, req: &Request, - dest_bucket_id: Uuid, dest_key: &str, part_number: u64, upload_id: &str, @@ -195,10 +194,12 @@ pub async fn handle_upload_part_copy( let dest_key = dest_key.to_string(); let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( - get_copy_source(&garage, api_key, req), - multipart::get_upload(&garage, &dest_bucket_id, &dest_key, &dest_upload_id) + get_copy_source(&ctx, req), + multipart::get_upload(&ctx, &dest_key, &dest_upload_id) )?; + let ReqCtx { garage, .. } = ctx; + let (source_object_version, source_version_data, source_version_meta) = extract_source_info(&source_object)?; @@ -439,11 +440,11 @@ pub async fn handle_upload_part_copy( .body(string_body(resp_xml))?) } -async fn get_copy_source( - garage: &Garage, - api_key: &Key, - req: &Request, -) -> Result { +async fn get_copy_source(ctx: &ReqCtx, req: &Request) -> Result { + let ReqCtx { + garage, api_key, .. + } = ctx; + let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?; diff --git a/src/api/s3/cors.rs b/src/api/s3/cors.rs index e069cae4..173b7ffe 100644 --- a/src/api/s3/cors.rs +++ b/src/api/s3/cors.rs @@ -21,16 +21,13 @@ use crate::s3::error::*; use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; -use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule}; +use garage_model::bucket_table::{Bucket, BucketParams, CorsRule as GarageCorsRule}; use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_cors(bucket: &Bucket) -> Result, Error> { - let param = bucket - .params() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - if let Some(cors) = param.cors_config.get() { +pub async fn handle_get_cors(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { bucket_params, .. } = ctx; + if let Some(cors) = bucket_params.cors_config.get() { let wc = CorsConfiguration { xmlns: (), cors_rules: cors @@ -50,16 +47,18 @@ pub async fn handle_get_cors(bucket: &Bucket) -> Result, Error } } -pub async fn handle_delete_cors( - garage: Arc, - mut bucket: Bucket, -) -> Result, Error> { - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - param.cors_config.update(None); - garage.bucket_table.insert(&bucket).await?; +pub async fn handle_delete_cors(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + bucket_params.cors_config.update(None); + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -67,28 +66,33 @@ pub async fn handle_delete_cors( } pub async fn handle_put_cors( - garage: Arc, - mut bucket: Bucket, + ctx: ReqCtx, req: Request, content_sha256: Option, ) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; } - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - let conf: CorsConfiguration = from_reader(&body as &[u8])?; conf.validate()?; - param + bucket_params .cors_config .update(Some(conf.into_garage_cors_config()?)); - garage.bucket_table.insert(&bucket).await?; + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::OK) @@ -115,7 +119,8 @@ pub async fn handle_options_api( let bucket_id = helper.resolve_global_bucket_name(&bn).await?; if let Some(id) = bucket_id { let bucket = garage.bucket_helper().get_existing_bucket(id).await?; - handle_options_for_bucket(req, &bucket) + let bucket_params = bucket.state.into_option().unwrap(); + handle_options_for_bucket(req, &bucket_params) } else { // If there is a bucket name in the request, but that name // does not correspond to a global alias for a bucket, @@ -145,7 +150,7 @@ pub async fn handle_options_api( pub fn handle_options_for_bucket( req: &Request, - bucket: &Bucket, + bucket_params: &BucketParams, ) -> Result, CommonError> { let origin = req .headers() @@ -162,7 +167,7 @@ pub fn handle_options_for_bucket( None => vec![], }; - if let Some(cors_config) = bucket.params().unwrap().cors_config.get() { + if let Some(cors_config) = bucket_params.cors_config.get() { let matching_rule = cors_config .iter() .find(|rule| cors_rule_matches(rule, origin, request_method, request_headers.iter())); @@ -181,10 +186,10 @@ pub fn handle_options_for_bucket( } pub fn find_matching_cors_rule<'a>( - bucket: &'a Bucket, + bucket_params: &'a BucketParams, req: &Request, ) -> Result, Error> { - if let Some(cors_config) = bucket.params().unwrap().cors_config.get() { + if let Some(cors_config) = bucket_params.cors_config.get() { if let Some(origin) = req.headers().get("Origin") { let origin = origin.to_str()?; let request_headers = match req.headers().get(ACCESS_CONTROL_REQUEST_HEADERS) { diff --git a/src/api/s3/delete.rs b/src/api/s3/delete.rs index 3fb39147..57f6f948 100644 --- a/src/api/s3/delete.rs +++ b/src/api/s3/delete.rs @@ -1,11 +1,8 @@ -use std::sync::Arc; - use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; use garage_util::data::*; -use garage_model::garage::Garage; use garage_model::s3::object_table::*; use crate::helpers::*; @@ -15,14 +12,13 @@ use crate::s3::put::next_timestamp; use crate::s3::xml as s3_xml; use crate::signature::verify_signed_content; -async fn handle_delete_internal( - garage: &Garage, - bucket_id: Uuid, - key: &str, -) -> Result<(Uuid, Uuid), Error> { +async fn handle_delete_internal(ctx: &ReqCtx, key: &str) -> Result<(Uuid, Uuid), Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; let object = garage .object_table - .get(&bucket_id, &key.to_string()) + .get(bucket_id, &key.to_string()) .await? .ok_or(Error::NoSuchKey)?; // No need to delete @@ -44,7 +40,7 @@ async fn handle_delete_internal( }; let object = Object::new( - bucket_id, + *bucket_id, key.into(), vec![ObjectVersion { uuid: del_uuid, @@ -58,12 +54,8 @@ async fn handle_delete_internal( Ok((deleted_version, del_uuid)) } -pub async fn handle_delete( - garage: Arc, - bucket_id: Uuid, - key: &str, -) -> Result, Error> { - match handle_delete_internal(&garage, bucket_id, key).await { +pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result, Error> { + match handle_delete_internal(&ctx, key).await { Ok(_) | Err(Error::NoSuchKey) => Ok(Response::builder() .status(StatusCode::NO_CONTENT) .body(empty_body()) @@ -73,8 +65,7 @@ pub async fn handle_delete( } pub async fn handle_delete_objects( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, req: Request, content_sha256: Option, ) -> Result, Error> { @@ -91,7 +82,7 @@ pub async fn handle_delete_objects( let mut ret_errors = Vec::new(); for obj in cmd.objects.iter() { - match handle_delete_internal(&garage, bucket_id, &obj.key).await { + match handle_delete_internal(&ctx, &obj.key).await { Ok((deleted_version, delete_marker_version)) => { if cmd.quiet { continue; diff --git a/src/api/s3/get.rs b/src/api/s3/get.rs index 0d18e775..ed996fb1 100644 --- a/src/api/s3/get.rs +++ b/src/api/s3/get.rs @@ -131,6 +131,16 @@ fn try_answer_cached( /// Handle HEAD request pub async fn handle_head( + ctx: ReqCtx, + req: &Request, + key: &str, + part_number: Option, +) -> Result, Error> { + handle_head_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number).await +} + +/// Handle HEAD request for website +pub async fn handle_head_without_ctx( garage: Arc, req: &Request, bucket_id: Uuid, @@ -218,6 +228,17 @@ pub async fn handle_head( /// Handle GET request pub async fn handle_get( + ctx: ReqCtx, + req: &Request, + key: &str, + part_number: Option, + overrides: GetObjectOverrides, +) -> Result, Error> { + handle_get_without_ctx(ctx.garage, req, ctx.bucket_id, key, part_number, overrides).await +} + +/// Handle GET request +pub async fn handle_get_without_ctx( garage: Arc, req: &Request, bucket_id: Uuid, diff --git a/src/api/s3/lifecycle.rs b/src/api/s3/lifecycle.rs index 35757e8c..7eb1c2cb 100644 --- a/src/api/s3/lifecycle.rs +++ b/src/api/s3/lifecycle.rs @@ -1,5 +1,4 @@ use quick_xml::de::from_reader; -use std::sync::Arc; use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; @@ -16,15 +15,12 @@ use garage_model::bucket_table::{ parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration, LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule, }; -use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, Error> { - let param = bucket - .params() - .ok_or_internal_error("Bucket should not be deleted at this point")?; +pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { bucket_params, .. } = ctx; - if let Some(lifecycle) = param.lifecycle_config.get() { + if let Some(lifecycle) = bucket_params.lifecycle_config.get() { let wc = LifecycleConfiguration::from_garage_lifecycle_config(lifecycle); let xml = to_xml_with_header(&wc)?; Ok(Response::builder() @@ -38,16 +34,18 @@ pub async fn handle_get_lifecycle(bucket: &Bucket) -> Result, } } -pub async fn handle_delete_lifecycle( - garage: Arc, - mut bucket: Bucket, -) -> Result, Error> { - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - param.lifecycle_config.update(None); - garage.bucket_table.insert(&bucket).await?; +pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + bucket_params.lifecycle_config.update(None); + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -55,28 +53,33 @@ pub async fn handle_delete_lifecycle( } pub async fn handle_put_lifecycle( - garage: Arc, - mut bucket: Bucket, + ctx: ReqCtx, req: Request, content_sha256: Option, ) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; } - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - let conf: LifecycleConfiguration = from_reader(&body as &[u8])?; let config = conf .validate_into_garage_lifecycle_config() .ok_or_bad_request("Invalid lifecycle configuration")?; - param.lifecycle_config.update(Some(config)); - garage.bucket_table.insert(&bucket).await?; + bucket_params.lifecycle_config.update(Some(config)); + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::OK) diff --git a/src/api/s3/list.rs b/src/api/s3/list.rs index b832a4f4..302c03f4 100644 --- a/src/api/s3/list.rs +++ b/src/api/s3/list.rs @@ -1,6 +1,5 @@ use std::collections::{BTreeMap, BTreeSet}; use std::iter::{Iterator, Peekable}; -use std::sync::Arc; use base64::prelude::*; use hyper::Response; @@ -9,7 +8,6 @@ use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; -use garage_model::garage::Garage; use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; @@ -62,9 +60,10 @@ pub struct ListPartsQuery { } pub async fn handle_list( - garage: Arc, + ctx: ReqCtx, query: &ListObjectsQuery, ) -> Result, Error> { + let ReqCtx { garage, .. } = &ctx; let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -167,9 +166,11 @@ pub async fn handle_list( } pub async fn handle_list_multipart_upload( - garage: Arc, + ctx: ReqCtx, query: &ListMultipartUploadsQuery, ) -> Result, Error> { + let ReqCtx { garage, .. } = &ctx; + let io = |bucket, key, count| { let t = &garage.object_table; async move { @@ -269,15 +270,14 @@ pub async fn handle_list_multipart_upload( } pub async fn handle_list_parts( - garage: Arc, + ctx: ReqCtx, query: &ListPartsQuery, ) -> Result, Error> { debug!("ListParts {:?}", query); let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?; - let (_, _, mpu) = - s3_multipart::get_upload(&garage, &query.bucket_id, &query.key, &upload_id).await?; + let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?; let (info, next) = fetch_part_info(query, &mpu)?; diff --git a/src/api/s3/multipart.rs b/src/api/s3/multipart.rs index 5959bcd6..1d5aeb26 100644 --- a/src/api/s3/multipart.rs +++ b/src/api/s3/multipart.rs @@ -8,7 +8,6 @@ use md5::{Digest as Md5Digest, Md5}; use garage_table::*; use garage_util::data::*; -use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::s3::block_ref_table::*; use garage_model::s3::mpu_table::*; @@ -25,12 +24,16 @@ use crate::signature::verify_signed_content; // ---- pub async fn handle_create_multipart_upload( - garage: Arc, + ctx: ReqCtx, req: &Request, - bucket_name: &str, - bucket_id: Uuid, key: &String, ) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + bucket_name, + .. + } = &ctx; let existing_object = garage.object_table.get(&bucket_id, &key).await?; let upload_id = gen_uuid(); @@ -47,13 +50,13 @@ pub async fn handle_create_multipart_upload( headers, }, }; - let object = Object::new(bucket_id, key.to_string(), vec![object_version]); + let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&object).await?; // Create multipart upload in mpu table // This multipart upload will hold references to uploaded parts // (which are entries in the Version table) - let mpu = MultipartUpload::new(upload_id, timestamp, bucket_id, key.into(), false); + let mpu = MultipartUpload::new(upload_id, timestamp, *bucket_id, key.into(), false); garage.mpu_table.insert(&mpu).await?; // Send success response @@ -69,14 +72,15 @@ pub async fn handle_create_multipart_upload( } pub async fn handle_put_part( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket_id: Uuid, key: &str, part_number: u64, upload_id: &str, content_sha256: Option, ) -> Result, Error> { + let ReqCtx { garage, .. } = &ctx; + let upload_id = decode_upload_id(upload_id)?; let content_md5 = match req.headers().get("content-md5") { @@ -90,10 +94,8 @@ pub async fn handle_put_part( let stream = body_stream(req.into_body()); let mut chunker = StreamChunker::new(stream, garage.config.block_size); - let ((_, _, mut mpu), first_block) = futures::try_join!( - get_upload(&garage, &bucket_id, &key, &upload_id), - chunker.next(), - )?; + let ((_, _, mut mpu), first_block) = + futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; // Check object is valid and part can be accepted let first_block = first_block.ok_or_bad_request("Empty body")?; @@ -135,7 +137,7 @@ pub async fn handle_put_part( // Copy data to version let (total_size, data_md5sum, data_sha256sum, _) = - read_and_put_blocks(&garage, &version, part_number, first_block, &mut chunker).await?; + read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?; // Verify that checksums map ensure_checksum_matches( @@ -200,14 +202,19 @@ impl Drop for InterruptedCleanup { } pub async fn handle_complete_multipart_upload( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket_name: &str, - bucket: &Bucket, key: &str, upload_id: &str, content_sha256: Option, ) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + bucket_name, + .. + } = &ctx; + let body = http_body_util::BodyExt::collect(req.into_body()) .await? .to_bytes(); @@ -228,8 +235,7 @@ pub async fn handle_complete_multipart_upload( // Get object and multipart upload let key = key.to_string(); - let (object, mut object_version, mpu) = - get_upload(&garage, &bucket.id, &key, &upload_id).await?; + let (object, mut object_version, mpu) = get_upload(&ctx, &key, &upload_id).await?; if mpu.parts.is_empty() { return Err(Error::bad_request("No data was uploaded")); @@ -283,7 +289,7 @@ pub async fn handle_complete_multipart_upload( let mut final_version = Version::new( upload_id, VersionBacklink::Object { - bucket_id: bucket.id, + bucket_id: *bucket_id, key: key.to_string(), }, false, @@ -327,9 +333,9 @@ pub async fn handle_complete_multipart_upload( // Calculate total size of final object let total_size = parts.iter().map(|x| x.size.unwrap()).sum(); - if let Err(e) = check_quotas(&garage, bucket, total_size, Some(&object)).await { + if let Err(e) = check_quotas(&ctx, total_size, Some(&object)).await { object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; return Err(e); @@ -345,7 +351,7 @@ pub async fn handle_complete_multipart_upload( final_version.blocks.items()[0].1.hash, )); - let final_object = Object::new(bucket.id, key.clone(), vec![object_version]); + let final_object = Object::new(*bucket_id, key.clone(), vec![object_version]); garage.object_table.insert(&final_object).await?; // Send response saying ok we're done @@ -362,18 +368,20 @@ pub async fn handle_complete_multipart_upload( } pub async fn handle_abort_multipart_upload( - garage: Arc, - bucket_id: Uuid, + ctx: ReqCtx, key: &str, upload_id: &str, ) -> Result, Error> { + let ReqCtx { + garage, bucket_id, .. + } = &ctx; + let upload_id = decode_upload_id(upload_id)?; - let (_, mut object_version, _) = - get_upload(&garage, &bucket_id, &key.to_string(), &upload_id).await?; + let (_, mut object_version, _) = get_upload(&ctx, &key.to_string(), &upload_id).await?; object_version.state = ObjectVersionState::Aborted; - let final_object = Object::new(bucket_id, key.to_string(), vec![object_version]); + let final_object = Object::new(*bucket_id, key.to_string(), vec![object_version]); garage.object_table.insert(&final_object).await?; Ok(Response::new(empty_body())) @@ -383,11 +391,13 @@ pub async fn handle_abort_multipart_upload( #[allow(clippy::ptr_arg)] pub(crate) async fn get_upload( - garage: &Garage, - bucket_id: &Uuid, + ctx: &ReqCtx, key: &String, upload_id: &Uuid, ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; let (object, mpu) = futures::try_join!( garage.object_table.get(bucket_id, key).map_err(Error::from), garage diff --git a/src/api/s3/post_object.rs b/src/api/s3/post_object.rs index b542cc1a..66f8174c 100644 --- a/src/api/s3/post_object.rs +++ b/src/api/s3/post_object.rs @@ -120,6 +120,12 @@ pub async fn handle_post_object( .bucket_helper() .get_existing_bucket(bucket_id) .await?; + let bucket_params = bucket.state.into_option().unwrap(); + let matching_cors_rule = find_matching_cors_rule( + &bucket_params, + &Request::from_parts(head.clone(), empty_body::()), + )? + .cloned(); let decoded_policy = BASE64_STANDARD .decode(policy) @@ -213,11 +219,19 @@ pub async fn handle_post_object( let headers = get_headers(¶ms)?; let stream = field.map(|r| r.map_err(Into::into)); - let (_, md5) = save_stream( + + let ctx = ReqCtx { garage, + bucket_id, + bucket_name, + bucket_params, + api_key, + }; + + let (_, md5) = save_stream( + &ctx, headers, StreamLimiter::new(stream, conditions.content_length), - &bucket, &key, None, None, @@ -234,7 +248,7 @@ pub async fn handle_post_object( { target .query_pairs_mut() - .append_pair("bucket", &bucket_name) + .append_pair("bucket", &ctx.bucket_name) .append_pair("key", &key) .append_pair("etag", &etag); let target = target.to_string(); @@ -278,7 +292,7 @@ pub async fn handle_post_object( let xml = s3_xml::PostObject { xmlns: (), location: s3_xml::Value(location), - bucket: s3_xml::Value(bucket_name), + bucket: s3_xml::Value(ctx.bucket_name), key: s3_xml::Value(key), etag: s3_xml::Value(etag), }; @@ -291,12 +305,8 @@ pub async fn handle_post_object( } }; - let matching_cors_rule = find_matching_cors_rule( - &bucket, - &Request::from_parts(head, empty_body::()), - )?; if let Some(rule) = matching_cors_rule { - add_cors_headers(&mut resp, rule) + add_cors_headers(&mut resp, &rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 489f1136..36523b30 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -28,7 +28,6 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; -use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::index_counter::CountedItem; use garage_model::s3::block_ref_table::*; @@ -42,9 +41,8 @@ use crate::s3::error::*; const PUT_BLOCKS_MAX_PARALLEL: usize = 3; pub async fn handle_put( - garage: Arc, + ctx: ReqCtx, req: Request, - bucket: &Bucket, key: &String, content_sha256: Option, ) -> Result, Error> { @@ -59,35 +57,27 @@ pub async fn handle_put( let stream = body_stream(req.into_body()); - save_stream( - garage, - headers, - stream, - bucket, - key, - content_md5, - content_sha256, - ) - .await - .map(|(uuid, md5)| put_response(uuid, md5)) + save_stream(&ctx, headers, stream, key, content_md5, content_sha256) + .await + .map(|(uuid, md5)| put_response(uuid, md5)) } pub(crate) async fn save_stream> + Unpin>( - garage: Arc, + ctx: &ReqCtx, headers: ObjectVersionHeaders, body: S, - bucket: &Bucket, key: &String, content_md5: Option, content_sha256: Option, ) -> Result<(Uuid, String), Error> { + let ReqCtx { + garage, bucket_id, .. + } = ctx; + let mut chunker = StreamChunker::new(body, garage.config.block_size); let (first_block_opt, existing_object) = try_join!( chunker.next(), - garage - .object_table - .get(&bucket.id, key) - .map_err(Error::from), + garage.object_table.get(bucket_id, key).map_err(Error::from), )?; let first_block = first_block_opt.unwrap_or_default(); @@ -114,7 +104,7 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, size, existing_object.as_ref()).await?; + check_quotas(ctx, size, existing_object.as_ref()).await?; let object_version = ObjectVersion { uuid: version_uuid, @@ -129,7 +119,7 @@ pub(crate) async fn save_stream> + Unpin>( )), }; - let object = Object::new(bucket.id, key.into(), vec![object_version]); + let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; return Ok((version_uuid, data_md5sum_hex)); @@ -140,7 +130,7 @@ pub(crate) async fn save_stream> + Unpin>( // before everything is finished (cleanup is done using the Drop trait). let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner { garage: garage.clone(), - bucket_id: bucket.id, + bucket_id: *bucket_id, key: key.into(), version_uuid, version_timestamp, @@ -156,7 +146,7 @@ pub(crate) async fn save_stream> + Unpin>( multipart: false, }, }; - let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]); + let object = Object::new(*bucket_id, key.into(), vec![object_version.clone()]); garage.object_table.insert(&object).await?; // Initialize corresponding entry in version table @@ -166,7 +156,7 @@ pub(crate) async fn save_stream> + Unpin>( let version = Version::new( version_uuid, VersionBacklink::Object { - bucket_id: bucket.id, + bucket_id: *bucket_id, key: key.into(), }, false, @@ -175,7 +165,7 @@ pub(crate) async fn save_stream> + Unpin>( // Transfer data and verify checksum let (total_size, data_md5sum, data_sha256sum, first_block_hash) = - read_and_put_blocks(&garage, &version, 1, first_block, &mut chunker).await?; + read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?; ensure_checksum_matches( data_md5sum.as_slice(), @@ -184,7 +174,7 @@ pub(crate) async fn save_stream> + Unpin>( content_sha256, )?; - check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?; + check_quotas(ctx, total_size, existing_object.as_ref()).await?; // Save final object state, marked as Complete let md5sum_hex = hex::encode(data_md5sum); @@ -196,7 +186,7 @@ pub(crate) async fn save_stream> + Unpin>( }, first_block_hash, )); - let object = Object::new(bucket.id, key.into(), vec![object_version]); + let object = Object::new(*bucket_id, key.into(), vec![object_version]); garage.object_table.insert(&object).await?; // We were not interrupted, everything went fine. @@ -235,12 +225,18 @@ pub(crate) fn ensure_checksum_matches( /// Check that inserting this object with this size doesn't exceed bucket quotas pub(crate) async fn check_quotas( - garage: &Arc, - bucket: &Bucket, + ctx: &ReqCtx, size: u64, prev_object: Option<&Object>, ) -> Result<(), Error> { - let quotas = bucket.state.as_option().unwrap().quotas.get(); + let ReqCtx { + garage, + bucket_id, + bucket_params, + .. + } = ctx; + + let quotas = bucket_params.quotas.get(); if quotas.max_objects.is_none() && quotas.max_size.is_none() { return Ok(()); }; @@ -248,7 +244,7 @@ pub(crate) async fn check_quotas( let counters = garage .object_counter_table .table - .get(&bucket.id, &EmptyKey) + .get(bucket_id, &EmptyKey) .await?; let counters = counters @@ -292,7 +288,7 @@ pub(crate) async fn check_quotas( } pub(crate) async fn read_and_put_blocks> + Unpin>( - garage: &Garage, + ctx: &ReqCtx, version: &Version, part_number: u64, first_block: Bytes, @@ -417,7 +413,7 @@ pub(crate) async fn read_and_put_blocks> + let offset = written_bytes; written_bytes += block.len() as u64; write_futs.push_back(put_block_and_meta( - garage, + ctx, version, part_number, offset, @@ -447,7 +443,7 @@ pub(crate) async fn read_and_put_blocks> + } async fn put_block_and_meta( - garage: &Garage, + ctx: &ReqCtx, version: &Version, part_number: u64, offset: u64, @@ -455,6 +451,8 @@ async fn put_block_and_meta( block: Bytes, order_tag: OrderTag, ) -> Result<(), GarageError> { + let ReqCtx { garage, .. } = ctx; + let mut version = version.clone(); version.blocks.put( VersionBlockKey { diff --git a/src/api/s3/website.rs b/src/api/s3/website.rs index 1c1dbf20..6af55677 100644 --- a/src/api/s3/website.rs +++ b/src/api/s3/website.rs @@ -1,5 +1,4 @@ use quick_xml::de::from_reader; -use std::sync::Arc; use http_body_util::BodyExt; use hyper::{Request, Response, StatusCode}; @@ -12,15 +11,11 @@ use crate::s3::xml::{to_xml_with_header, xmlns_tag, IntValue, Value}; use crate::signature::verify_signed_content; use garage_model::bucket_table::*; -use garage_model::garage::Garage; use garage_util::data::*; -pub async fn handle_get_website(bucket: &Bucket) -> Result, Error> { - let param = bucket - .params() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - if let Some(website) = param.website_config.get() { +pub async fn handle_get_website(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { bucket_params, .. } = ctx; + if let Some(website) = bucket_params.website_config.get() { let wc = WebsiteConfiguration { xmlns: (), error_document: website.error_document.as_ref().map(|v| Key { @@ -44,16 +39,18 @@ pub async fn handle_get_website(bucket: &Bucket) -> Result, Er } } -pub async fn handle_delete_website( - garage: Arc, - mut bucket: Bucket, -) -> Result, Error> { - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - - param.website_config.update(None); - garage.bucket_table.insert(&bucket).await?; +pub async fn handle_delete_website(ctx: ReqCtx) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + bucket_params.website_config.update(None); + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::NO_CONTENT) @@ -61,28 +58,33 @@ pub async fn handle_delete_website( } pub async fn handle_put_website( - garage: Arc, - mut bucket: Bucket, + ctx: ReqCtx, req: Request, content_sha256: Option, ) -> Result, Error> { + let ReqCtx { + garage, + bucket_id, + mut bucket_params, + .. + } = ctx; + let body = BodyExt::collect(req.into_body()).await?.to_bytes(); if let Some(content_sha256) = content_sha256 { verify_signed_content(content_sha256, &body[..])?; } - let param = bucket - .params_mut() - .ok_or_internal_error("Bucket should not be deleted at this point")?; - let conf: WebsiteConfiguration = from_reader(&body as &[u8])?; conf.validate()?; - param + bucket_params .website_config .update(Some(conf.into_garage_website_config()?)); - garage.bucket_table.insert(&bucket).await?; + garage + .bucket_table + .insert(&Bucket::present(bucket_id, bucket_params)) + .await?; Ok(Response::builder() .status(StatusCode::OK) diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 4c48a76f..1dbdfac2 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -191,6 +191,13 @@ impl Bucket { } } + pub fn present(id: Uuid, params: BucketParams) -> Self { + Bucket { + id, + state: crdt::Deletable::present(params), + } + } + /// Returns true if this represents a deleted bucket pub fn is_deleted(&self) -> bool { self.state.is_deleted() diff --git a/src/web/web_server.rs b/src/web/web_server.rs index 0f9b5dc8..69939f65 100644 --- a/src/web/web_server.rs +++ b/src/web/web_server.rs @@ -26,7 +26,7 @@ use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_opt use garage_api::s3::error::{ CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError, }; -use garage_api::s3::get::{handle_get, handle_head}; +use garage_api::s3::get::{handle_get_without_ctx, handle_head_without_ctx}; use garage_model::garage::Garage; @@ -219,14 +219,13 @@ impl WebServer { // Check bucket isn't deleted and has website access enabled let bucket = self .garage - .bucket_table - .get(&EmptyKey, &bucket_id) - .await? - .ok_or(Error::NotFound)?; + .bucket_helper() + .get_existing_bucket(bucket_id) + .await + .map_err(|_| Error::NotFound)?; + let bucket_params = bucket.state.into_option().unwrap(); - let website_config = bucket - .params() - .ok_or(Error::NotFound)? + let website_config = bucket_params .website_config .get() .as_ref() @@ -243,14 +242,16 @@ impl WebServer { ); let ret_doc = match *req.method() { - Method::OPTIONS => handle_options_for_bucket(req, &bucket) + Method::OPTIONS => handle_options_for_bucket(req, &bucket_params) .map_err(ApiError::from) .map(|res| res.map(|_empty_body: EmptyBody| empty_body())), - Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, + Method::HEAD => { + handle_head_without_ctx(self.garage.clone(), req, bucket_id, &key, None).await + } Method::GET => { - handle_get( + handle_get_without_ctx( self.garage.clone(), - &req, + req, bucket_id, &key, None, @@ -301,7 +302,7 @@ impl WebServer { .body(empty_body::()) .unwrap(); - match handle_get( + match handle_get_without_ctx( self.garage.clone(), &req2, bucket_id, @@ -344,7 +345,7 @@ impl WebServer { } Ok(mut resp) => { // Maybe add CORS headers - if let Some(rule) = find_matching_cors_rule(&bucket, req)? { + if let Some(rule) = find_matching_cors_rule(&bucket_params, req)? { add_cors_headers(&mut resp, rule) .ok_or_internal_error("Invalid bucket CORS configuration")?; }