rework bucket helper functions to use local access where relevant
This commit is contained in:
parent
2c9e849bbf
commit
1e13a66b42
15 changed files with 228 additions and 151 deletions
|
@ -79,18 +79,24 @@ impl RequestHandler for GetBucketInfoRequest {
|
|||
garage: &Arc<Garage>,
|
||||
_admin: &Admin,
|
||||
) -> Result<GetBucketInfoResponse, Error> {
|
||||
let bucket_id = match (self.id, self.global_alias, self.search) {
|
||||
(Some(id), None, None) => parse_bucket_id(&id)?,
|
||||
(None, Some(ga), None) => garage
|
||||
.bucket_alias_table
|
||||
.get(&EmptyKey, &ga)
|
||||
.await?
|
||||
.and_then(|x| *x.state.get())
|
||||
.ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
|
||||
let bucket = match (self.id, self.global_alias, self.search) {
|
||||
(Some(id), None, None) => {
|
||||
let id = parse_bucket_id(&id)?;
|
||||
garage.bucket_helper().get_existing_bucket(id).await?
|
||||
}
|
||||
(None, Some(ga), None) => {
|
||||
let id = garage
|
||||
.bucket_alias_table
|
||||
.get(&EmptyKey, &ga)
|
||||
.await?
|
||||
.and_then(|x| *x.state.get())
|
||||
.ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?;
|
||||
garage.bucket_helper().get_existing_bucket(id).await?
|
||||
}
|
||||
(None, None, Some(search)) => {
|
||||
let helper = garage.bucket_helper();
|
||||
if let Some(uuid) = helper.resolve_global_bucket_name(&search).await? {
|
||||
uuid
|
||||
if let Some(bucket) = helper.resolve_global_bucket(&search).await? {
|
||||
bucket
|
||||
} else {
|
||||
let hexdec = if search.len() >= 2 {
|
||||
search
|
||||
|
@ -124,7 +130,7 @@ impl RequestHandler for GetBucketInfoRequest {
|
|||
if candidates.is_empty() {
|
||||
return Err(Error::Common(CommonError::NoSuchBucket(search.clone())));
|
||||
} else if candidates.len() == 1 {
|
||||
candidates.into_iter().next().unwrap().id
|
||||
candidates.into_iter().next().unwrap()
|
||||
} else {
|
||||
return Err(Error::bad_request(format!(
|
||||
"Several matching buckets: {}",
|
||||
|
@ -140,23 +146,18 @@ impl RequestHandler for GetBucketInfoRequest {
|
|||
}
|
||||
};
|
||||
|
||||
bucket_info_results(garage, bucket_id).await
|
||||
bucket_info_results(garage, bucket).await
|
||||
}
|
||||
}
|
||||
|
||||
async fn bucket_info_results(
|
||||
garage: &Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
bucket: Bucket,
|
||||
) -> Result<GetBucketInfoResponse, Error> {
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.get_existing_bucket(bucket_id)
|
||||
.await?;
|
||||
|
||||
let counters = garage
|
||||
.object_counter_table
|
||||
.table
|
||||
.get(&bucket_id, &EmptyKey)
|
||||
.get(&bucket.id, &EmptyKey)
|
||||
.await?
|
||||
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
|
||||
.unwrap_or_default();
|
||||
|
@ -164,7 +165,7 @@ async fn bucket_info_results(
|
|||
let mpu_counters = garage
|
||||
.mpu_counter_table
|
||||
.table
|
||||
.get(&bucket_id, &EmptyKey)
|
||||
.get(&bucket.id, &EmptyKey)
|
||||
.await?
|
||||
.map(|x| x.filtered_values(&garage.system.cluster_layout()))
|
||||
.unwrap_or_default();
|
||||
|
@ -336,7 +337,7 @@ impl RequestHandler for CreateBucketRequest {
|
|||
}
|
||||
|
||||
Ok(CreateBucketResponse(
|
||||
bucket_info_results(garage, bucket.id).await?,
|
||||
bucket_info_results(garage, bucket).await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -444,7 +445,7 @@ impl RequestHandler for UpdateBucketRequest {
|
|||
garage.bucket_table.insert(&bucket).await?;
|
||||
|
||||
Ok(UpdateBucketResponse(
|
||||
bucket_info_results(garage, bucket_id).await?,
|
||||
bucket_info_results(garage, bucket).await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -534,7 +535,7 @@ pub async fn handle_bucket_change_key_perm(
|
|||
.set_bucket_key_permissions(bucket.id, &key.key_id, perm)
|
||||
.await?;
|
||||
|
||||
bucket_info_results(garage, bucket.id).await
|
||||
bucket_info_results(garage, bucket).await
|
||||
}
|
||||
|
||||
// ---- BUCKET ALIASES ----
|
||||
|
@ -551,11 +552,11 @@ impl RequestHandler for AddBucketAliasRequest {
|
|||
|
||||
let helper = garage.locked_helper().await;
|
||||
|
||||
match self.alias {
|
||||
let bucket = match self.alias {
|
||||
BucketAliasEnum::Global { global_alias } => {
|
||||
helper
|
||||
.set_global_bucket_alias(bucket_id, &global_alias)
|
||||
.await?;
|
||||
.await?
|
||||
}
|
||||
BucketAliasEnum::Local {
|
||||
local_alias,
|
||||
|
@ -563,12 +564,12 @@ impl RequestHandler for AddBucketAliasRequest {
|
|||
} => {
|
||||
helper
|
||||
.set_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
|
||||
.await?;
|
||||
.await?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(AddBucketAliasResponse(
|
||||
bucket_info_results(garage, bucket_id).await?,
|
||||
bucket_info_results(garage, bucket).await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
@ -585,11 +586,11 @@ impl RequestHandler for RemoveBucketAliasRequest {
|
|||
|
||||
let helper = garage.locked_helper().await;
|
||||
|
||||
match self.alias {
|
||||
let bucket = match self.alias {
|
||||
BucketAliasEnum::Global { global_alias } => {
|
||||
helper
|
||||
.unset_global_bucket_alias(bucket_id, &global_alias)
|
||||
.await?;
|
||||
.await?
|
||||
}
|
||||
BucketAliasEnum::Local {
|
||||
local_alias,
|
||||
|
@ -597,12 +598,12 @@ impl RequestHandler for RemoveBucketAliasRequest {
|
|||
} => {
|
||||
helper
|
||||
.unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
|
||||
.await?;
|
||||
.await?
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(RemoveBucketAliasResponse(
|
||||
bucket_info_results(garage, bucket_id).await?,
|
||||
bucket_info_results(garage, bucket).await?,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -151,12 +151,11 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
|
|||
(domain.to_string(), true)
|
||||
};
|
||||
|
||||
let bucket_id = match garage
|
||||
let bucket = match garage
|
||||
.bucket_helper()
|
||||
.resolve_global_bucket_name(&bucket_name)
|
||||
.await?
|
||||
.resolve_global_bucket_fast(&bucket_name)?
|
||||
{
|
||||
Some(bucket_id) => bucket_id,
|
||||
Some(b) => b,
|
||||
None => return Ok(false),
|
||||
};
|
||||
|
||||
|
@ -164,11 +163,6 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
|
|||
return Ok(true);
|
||||
}
|
||||
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.get_existing_bucket(bucket_id)
|
||||
.await?;
|
||||
|
||||
let bucket_state = bucket.state.as_option().unwrap();
|
||||
let bucket_website_config = bucket_state.website_config.get();
|
||||
|
||||
|
|
|
@ -9,9 +9,7 @@ use hyper::{body::Body, body::Incoming as IncomingBody, Request, Response, Statu
|
|||
use garage_model::bucket_table::{BucketParams, CorsRule as GarageCorsRule};
|
||||
use garage_model::garage::Garage;
|
||||
|
||||
use crate::common_error::{
|
||||
helper_error_as_internal, CommonError, OkOrBadRequest, OkOrInternalError,
|
||||
};
|
||||
use crate::common_error::{CommonError, OkOrBadRequest, OkOrInternalError};
|
||||
use crate::helpers::*;
|
||||
|
||||
pub fn find_matching_cors_rule<'a, B>(
|
||||
|
@ -76,7 +74,7 @@ pub fn add_cors_headers(
|
|||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn handle_options_api(
|
||||
pub fn handle_options_api(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<IncomingBody>,
|
||||
bucket_name: Option<String>,
|
||||
|
@ -93,16 +91,8 @@ pub async fn handle_options_api(
|
|||
// OPTIONS calls are not auhtenticated).
|
||||
if let Some(bn) = bucket_name {
|
||||
let helper = garage.bucket_helper();
|
||||
let bucket_id = helper
|
||||
.resolve_global_bucket_name(&bn)
|
||||
.await
|
||||
.map_err(helper_error_as_internal)?;
|
||||
if let Some(id) = bucket_id {
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.get_existing_bucket(id)
|
||||
.await
|
||||
.map_err(helper_error_as_internal)?;
|
||||
let bucket_opt = helper.resolve_global_bucket_fast(&bn)?;
|
||||
if let Some(bucket) = bucket_opt {
|
||||
let bucket_params = bucket.state.into_option().unwrap();
|
||||
handle_options_for_bucket(req, &bucket_params)
|
||||
} else {
|
||||
|
|
|
@ -64,12 +64,12 @@ pub struct VerifiedRequest {
|
|||
pub content_sha256_header: ContentSha256Header,
|
||||
}
|
||||
|
||||
pub async fn verify_request(
|
||||
pub fn verify_request(
|
||||
garage: &Garage,
|
||||
mut req: Request<IncomingBody>,
|
||||
service: &'static str,
|
||||
) -> Result<VerifiedRequest, Error> {
|
||||
let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?;
|
||||
let checked_signature = payload::check_payload_signature(&garage, &mut req, service)?;
|
||||
|
||||
let request = streaming::parse_streaming_body(
|
||||
req,
|
||||
|
|
|
@ -32,7 +32,7 @@ pub struct CheckedSignature {
|
|||
pub signature_header: Option<String>,
|
||||
}
|
||||
|
||||
pub async fn check_payload_signature(
|
||||
pub fn check_payload_signature(
|
||||
garage: &Garage,
|
||||
request: &mut Request<IncomingBody>,
|
||||
service: &'static str,
|
||||
|
@ -43,9 +43,9 @@ pub async fn check_payload_signature(
|
|||
// We check for presigned-URL-style authentication first, because
|
||||
// the browser or something else could inject an Authorization header
|
||||
// that is totally unrelated to AWS signatures.
|
||||
check_presigned_signature(garage, service, request, query).await
|
||||
check_presigned_signature(garage, service, request, query)
|
||||
} else if request.headers().contains_key(AUTHORIZATION) {
|
||||
check_standard_signature(garage, service, request, query).await
|
||||
check_standard_signature(garage, service, request, query)
|
||||
} else {
|
||||
// Unsigned (anonymous) request
|
||||
let content_sha256 = request
|
||||
|
@ -93,7 +93,7 @@ fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Heade
|
|||
}
|
||||
}
|
||||
|
||||
async fn check_standard_signature(
|
||||
fn check_standard_signature(
|
||||
garage: &Garage,
|
||||
service: &'static str,
|
||||
request: &Request<IncomingBody>,
|
||||
|
@ -128,7 +128,7 @@ async fn check_standard_signature(
|
|||
trace!("canonical request:\n{}", canonical_request);
|
||||
trace!("string to sign:\n{}", string_to_sign);
|
||||
|
||||
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
|
||||
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?;
|
||||
|
||||
let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?;
|
||||
|
||||
|
@ -139,7 +139,7 @@ async fn check_standard_signature(
|
|||
})
|
||||
}
|
||||
|
||||
async fn check_presigned_signature(
|
||||
fn check_presigned_signature(
|
||||
garage: &Garage,
|
||||
service: &'static str,
|
||||
request: &mut Request<IncomingBody>,
|
||||
|
@ -178,7 +178,7 @@ async fn check_presigned_signature(
|
|||
trace!("canonical request (presigned url):\n{}", canonical_request);
|
||||
trace!("string to sign (presigned url):\n{}", string_to_sign);
|
||||
|
||||
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
|
||||
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes())?;
|
||||
|
||||
// In the page on presigned URLs, AWS specifies that if a signed query
|
||||
// parameter and a signed header of the same name have different values,
|
||||
|
@ -378,7 +378,7 @@ pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
|
|||
Ok(Utc.from_utc_datetime(&date))
|
||||
}
|
||||
|
||||
pub async fn verify_v4(
|
||||
pub fn verify_v4(
|
||||
garage: &Garage,
|
||||
service: &str,
|
||||
auth: &Authorization,
|
||||
|
@ -391,8 +391,7 @@ pub async fn verify_v4(
|
|||
|
||||
let key = garage
|
||||
.key_table
|
||||
.get(&EmptyKey, &auth.key_id)
|
||||
.await?
|
||||
.get_local(&EmptyKey, &auth.key_id)?
|
||||
.filter(|k| !k.state.is_deleted())
|
||||
.ok_or_else(|| Error::forbidden(format!("No such key: {}", &auth.key_id)))?;
|
||||
let key_p = key.params().unwrap();
|
||||
|
|
|
@ -77,25 +77,19 @@ impl ApiHandler for K2VApiServer {
|
|||
// The OPTIONS method is processed early, before we even check for an API key
|
||||
if let Endpoint::Options = endpoint {
|
||||
let options_res = handle_options_api(garage, &req, Some(bucket_name))
|
||||
.await
|
||||
.ok_or_bad_request("Error handling OPTIONS")?;
|
||||
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
|
||||
}
|
||||
|
||||
let verified_request = verify_request(&garage, req, "k2v").await?;
|
||||
let verified_request = verify_request(&garage, req, "k2v")?;
|
||||
let req = verified_request.request;
|
||||
let api_key = verified_request.access_key;
|
||||
|
||||
let bucket_id = garage
|
||||
.bucket_helper()
|
||||
.resolve_bucket(&bucket_name, &api_key)
|
||||
.await
|
||||
.map_err(pass_helper_error)?;
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.get_existing_bucket(bucket_id)
|
||||
.await
|
||||
.map_err(helper_error_as_internal)?;
|
||||
.resolve_bucket_fast(&bucket_name, &api_key)
|
||||
.map_err(pass_helper_error)?;
|
||||
let bucket_id = bucket.id;
|
||||
let bucket_params = bucket.state.into_option().unwrap();
|
||||
|
||||
let allowed = match endpoint.authorization_type() {
|
||||
|
|
|
@ -2,8 +2,8 @@ use err_derive::Error;
|
|||
use hyper::header::HeaderValue;
|
||||
use hyper::{HeaderMap, StatusCode};
|
||||
|
||||
pub(crate) use garage_api_common::common_error::pass_helper_error;
|
||||
use garage_api_common::common_error::{commonErrorDerivative, CommonError};
|
||||
pub(crate) use garage_api_common::common_error::{helper_error_as_internal, pass_helper_error};
|
||||
pub use garage_api_common::common_error::{
|
||||
CommonErrorDerivative, OkOrBadRequest, OkOrInternalError,
|
||||
};
|
||||
|
|
|
@ -118,11 +118,11 @@ impl ApiHandler for S3ApiServer {
|
|||
return handle_post_object(garage, req, bucket_name.unwrap()).await;
|
||||
}
|
||||
if let Endpoint::Options = endpoint {
|
||||
let options_res = handle_options_api(garage, &req, bucket_name).await?;
|
||||
let options_res = handle_options_api(garage, &req, bucket_name)?;
|
||||
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
|
||||
}
|
||||
|
||||
let verified_request = verify_request(&garage, req, "s3").await?;
|
||||
let verified_request = verify_request(&garage, req, "s3")?;
|
||||
let req = verified_request.request;
|
||||
let api_key = verified_request.access_key;
|
||||
|
||||
|
@ -140,15 +140,11 @@ impl ApiHandler for S3ApiServer {
|
|||
return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await;
|
||||
}
|
||||
|
||||
let bucket_id = garage
|
||||
.bucket_helper()
|
||||
.resolve_bucket(&bucket_name, &api_key)
|
||||
.await
|
||||
.map_err(pass_helper_error)?;
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.get_existing_bucket(bucket_id)
|
||||
.await?;
|
||||
.resolve_bucket_fast(&bucket_name, &api_key)
|
||||
.map_err(pass_helper_error)?;
|
||||
let bucket_id = bucket.id;
|
||||
let bucket_params = bucket.state.into_option().unwrap();
|
||||
|
||||
let allowed = match endpoint.authorization_type() {
|
||||
|
|
|
@ -143,21 +143,16 @@ pub async fn handle_create_bucket(
|
|||
let api_key = helper.key().get_existing_key(api_key_id).await?;
|
||||
let key_params = api_key.params().unwrap();
|
||||
|
||||
let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
|
||||
{
|
||||
Some(*bucket_id)
|
||||
} else {
|
||||
helper
|
||||
.bucket()
|
||||
.resolve_global_bucket_name(&bucket_name)
|
||||
.await?
|
||||
};
|
||||
let existing_bucket = helper
|
||||
.bucket()
|
||||
.resolve_bucket(&bucket_name, &api_key.key_id)
|
||||
.await?;
|
||||
|
||||
if let Some(bucket_id) = existing_bucket {
|
||||
if let Some(bucket) = existing_bucket {
|
||||
// Check we have write or owner permission on the bucket,
|
||||
// in that case it's fine, return 200 OK, bucket exists;
|
||||
// otherwise return a forbidden error.
|
||||
let kp = api_key.bucket_permissions(&bucket_id);
|
||||
let kp = api_key.bucket_permissions(&bucket.id);
|
||||
if !(kp.allow_write || kp.allow_owner) {
|
||||
return Err(CommonError::BucketAlreadyExists.into());
|
||||
}
|
||||
|
|
|
@ -683,16 +683,15 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object,
|
|||
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
|
||||
|
||||
let (source_bucket, source_key) = parse_bucket_key(©_source, None)?;
|
||||
let source_bucket_id = garage
|
||||
let source_bucket = garage
|
||||
.bucket_helper()
|
||||
.resolve_bucket(&source_bucket.to_string(), api_key)
|
||||
.await
|
||||
.resolve_bucket_fast(&source_bucket.to_string(), api_key)
|
||||
.map_err(pass_helper_error)?;
|
||||
|
||||
if !api_key.allow_read(&source_bucket_id) {
|
||||
if !api_key.allow_read(&source_bucket.id) {
|
||||
return Err(Error::forbidden(format!(
|
||||
"Reading from bucket {} not allowed for this key",
|
||||
source_bucket
|
||||
"Reading from bucket {:?} not allowed for this key",
|
||||
source_bucket.id
|
||||
)));
|
||||
}
|
||||
|
||||
|
@ -700,7 +699,7 @@ async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object,
|
|||
|
||||
let source_object = garage
|
||||
.object_table
|
||||
.get(&source_bucket_id, &source_key.to_string())
|
||||
.get(&source_bucket.id, &source_key.to_string())
|
||||
.await?
|
||||
.ok_or(Error::NoSuchKey)?;
|
||||
|
||||
|
|
|
@ -104,22 +104,18 @@ pub async fn handle_post_object(
|
|||
key.to_owned()
|
||||
};
|
||||
|
||||
let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes()).await?;
|
||||
let api_key = verify_v4(&garage, "s3", &authorization, policy.as_bytes())?;
|
||||
|
||||
let bucket_id = garage
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.resolve_bucket(&bucket_name, &api_key)
|
||||
.await
|
||||
.resolve_bucket_fast(&bucket_name, &api_key)
|
||||
.map_err(pass_helper_error)?;
|
||||
let bucket_id = bucket.id;
|
||||
|
||||
if !api_key.allow_write(&bucket_id) {
|
||||
return Err(Error::forbidden("Operation is not allowed for this key."));
|
||||
}
|
||||
|
||||
let bucket = garage
|
||||
.bucket_helper()
|
||||
.get_existing_bucket(bucket_id)
|
||||
.await?;
|
||||
let bucket_params = bucket.state.into_option().unwrap();
|
||||
let matching_cors_rule = find_matching_cors_rule(
|
||||
&bucket_params,
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::time::Duration;
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::OkOrMessage;
|
||||
use garage_util::error::{Error as GarageError, OkOrMessage};
|
||||
use garage_util::time::*;
|
||||
|
||||
use garage_table::util::*;
|
||||
|
@ -16,10 +16,25 @@ pub struct BucketHelper<'a>(pub(crate) &'a Garage);
|
|||
|
||||
#[allow(clippy::ptr_arg)]
|
||||
impl<'a> BucketHelper<'a> {
|
||||
pub async fn resolve_global_bucket_name(
|
||||
// ================
|
||||
// Local functions to find buckets FAST.
|
||||
// This is only for the fast path in API requests.
|
||||
// They do not conserve the read-after-write guarantee.
|
||||
// ================
|
||||
|
||||
/// Return bucket ID corresponding to global bucket name.
|
||||
///
|
||||
/// The name can be of two forms:
|
||||
/// 1. A global bucket alias
|
||||
/// 2. The full ID of a bucket encoded in hex
|
||||
///
|
||||
/// This will not do any network interaction to check the alias table,
|
||||
/// it will only check the local copy of the table.
|
||||
/// As a consequence, it does not conserve read-after-write guarantees.
|
||||
pub fn resolve_global_bucket_fast(
|
||||
&self,
|
||||
bucket_name: &String,
|
||||
) -> Result<Option<Uuid>, Error> {
|
||||
) -> Result<Option<Bucket>, GarageError> {
|
||||
// Bucket names in Garage are aliases, true bucket identifiers
|
||||
// are 32-byte UUIDs. This function resolves bucket names into
|
||||
// their full identifier by looking up in the bucket_alias_table.
|
||||
|
@ -32,38 +47,129 @@ impl<'a> BucketHelper<'a> {
|
|||
let hexbucket = hex::decode(bucket_name.as_str())
|
||||
.ok()
|
||||
.and_then(|by| Uuid::try_from(&by));
|
||||
if let Some(bucket_id) = hexbucket {
|
||||
Ok(self
|
||||
.0
|
||||
.bucket_table
|
||||
.get(&EmptyKey, &bucket_id)
|
||||
.await?
|
||||
.filter(|x| !x.state.is_deleted())
|
||||
.map(|_| bucket_id))
|
||||
} else {
|
||||
Ok(self
|
||||
.0
|
||||
.bucket_alias_table
|
||||
.get(&EmptyKey, bucket_name)
|
||||
.await?
|
||||
.and_then(|x| *x.state.get()))
|
||||
}
|
||||
let bucket_id = match hexbucket {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
let alias = self
|
||||
.0
|
||||
.bucket_alias_table
|
||||
.get_local(&EmptyKey, bucket_name)?
|
||||
.and_then(|x| *x.state.get());
|
||||
match alias {
|
||||
Some(id) => id,
|
||||
None => return Ok(None),
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(self
|
||||
.0
|
||||
.bucket_table
|
||||
.get_local(&EmptyKey, &bucket_id)?
|
||||
.filter(|x| !x.state.is_deleted()))
|
||||
}
|
||||
|
||||
/// Return bucket ID corresponding to a bucket name from the perspective of
|
||||
/// a given access key.
|
||||
///
|
||||
/// The name can be of three forms:
|
||||
/// 1. A global bucket alias
|
||||
/// 2. A local bucket alias
|
||||
/// 3. The full ID of a bucket encoded in hex
|
||||
///
|
||||
/// This will not do any network interaction to check the alias table,
|
||||
/// it will only check the local copy of the table.
|
||||
/// As a consequence, it does not conserve read-after-write guarantees.
|
||||
///
|
||||
/// This function transforms non-existing buckets in a NoSuchBucket error.
|
||||
#[allow(clippy::ptr_arg)]
|
||||
pub async fn resolve_bucket(&self, bucket_name: &String, api_key: &Key) -> Result<Uuid, Error> {
|
||||
pub fn resolve_bucket_fast(
|
||||
&self,
|
||||
bucket_name: &String,
|
||||
api_key: &Key,
|
||||
) -> Result<Bucket, Error> {
|
||||
let api_key_params = api_key
|
||||
.state
|
||||
.as_option()
|
||||
.ok_or_message("Key should not be deleted at this point")?;
|
||||
|
||||
if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
|
||||
Ok(*bucket_id)
|
||||
} else {
|
||||
let bucket_opt =
|
||||
if let Some(Some(bucket_id)) = api_key_params.local_aliases.get(bucket_name) {
|
||||
self.0
|
||||
.bucket_table
|
||||
.get_local(&EmptyKey, &bucket_id)?
|
||||
.filter(|x| !x.state.is_deleted())
|
||||
} else {
|
||||
self.resolve_global_bucket_fast(bucket_name)?
|
||||
};
|
||||
bucket_opt.ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))
|
||||
}
|
||||
|
||||
// ================
|
||||
// Global functions that do quorum reads/writes,
|
||||
// for admin operations.
|
||||
// ================
|
||||
|
||||
/// See resolve_global_bucket_fast,
|
||||
/// but this one does a quorum read to ensure consistency
|
||||
pub async fn resolve_global_bucket(
|
||||
&self,
|
||||
bucket_name: &String,
|
||||
) -> Result<Option<Bucket>, GarageError> {
|
||||
let hexbucket = hex::decode(bucket_name.as_str())
|
||||
.ok()
|
||||
.and_then(|by| Uuid::try_from(&by));
|
||||
let bucket_id = match hexbucket {
|
||||
Some(id) => id,
|
||||
None => {
|
||||
let alias = self
|
||||
.0
|
||||
.bucket_alias_table
|
||||
.get(&EmptyKey, bucket_name)
|
||||
.await?
|
||||
.and_then(|x| *x.state.get());
|
||||
match alias {
|
||||
Some(id) => id,
|
||||
None => return Ok(None),
|
||||
}
|
||||
}
|
||||
};
|
||||
Ok(self
|
||||
.0
|
||||
.bucket_table
|
||||
.get(&EmptyKey, &bucket_id)
|
||||
.await?
|
||||
.filter(|x| !x.state.is_deleted()))
|
||||
}
|
||||
|
||||
/// See resolve_bucket_fast, but this one does a quorum read to ensure consistency.
|
||||
/// Also, this function does not return a HelperError::NoSuchBucket if bucket is absent.
|
||||
#[allow(clippy::ptr_arg)]
|
||||
pub async fn resolve_bucket(
|
||||
&self,
|
||||
bucket_name: &String,
|
||||
key_id: &String,
|
||||
) -> Result<Option<Bucket>, GarageError> {
|
||||
let local_alias = self
|
||||
.0
|
||||
.key_table
|
||||
.get(&EmptyKey, &key_id)
|
||||
.await?
|
||||
.and_then(|k| k.state.into_option())
|
||||
.ok_or_else(|| GarageError::Message(format!("access key {} has been deleted", key_id)))?
|
||||
.local_aliases
|
||||
.get(bucket_name)
|
||||
.copied()
|
||||
.flatten();
|
||||
|
||||
if let Some(bucket_id) = local_alias {
|
||||
Ok(self
|
||||
.resolve_global_bucket_name(bucket_name)
|
||||
.0
|
||||
.bucket_table
|
||||
.get(&EmptyKey, &bucket_id)
|
||||
.await?
|
||||
.ok_or_else(|| Error::NoSuchBucket(bucket_name.to_string()))?)
|
||||
.filter(|x| !x.state.is_deleted()))
|
||||
} else {
|
||||
Ok(self.resolve_global_bucket(bucket_name).await?)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -6,6 +6,7 @@ use garage_util::time::*;
|
|||
use garage_table::util::*;
|
||||
|
||||
use crate::bucket_alias_table::*;
|
||||
use crate::bucket_table::*;
|
||||
use crate::garage::Garage;
|
||||
use crate::helper::bucket::BucketHelper;
|
||||
use crate::helper::error::*;
|
||||
|
@ -56,7 +57,7 @@ impl<'a> LockedHelper<'a> {
|
|||
&self,
|
||||
bucket_id: Uuid,
|
||||
alias_name: &String,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<Bucket, Error> {
|
||||
if !is_valid_bucket_name(alias_name) {
|
||||
return Err(Error::InvalidBucketName(alias_name.to_string()));
|
||||
}
|
||||
|
@ -100,7 +101,7 @@ impl<'a> LockedHelper<'a> {
|
|||
bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
|
||||
self.0.bucket_table.insert(&bucket).await?;
|
||||
|
||||
Ok(())
|
||||
Ok(bucket)
|
||||
}
|
||||
|
||||
/// Unsets an alias for a bucket in global namespace.
|
||||
|
@ -112,7 +113,7 @@ impl<'a> LockedHelper<'a> {
|
|||
&self,
|
||||
bucket_id: Uuid,
|
||||
alias_name: &String,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<Bucket, Error> {
|
||||
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
|
||||
let bucket_state = bucket.state.as_option_mut().unwrap();
|
||||
|
||||
|
@ -156,7 +157,7 @@ impl<'a> LockedHelper<'a> {
|
|||
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
|
||||
self.0.bucket_table.insert(&bucket).await?;
|
||||
|
||||
Ok(())
|
||||
Ok(bucket)
|
||||
}
|
||||
|
||||
/// Ensures a bucket does not have a certain global alias.
|
||||
|
@ -215,7 +216,7 @@ impl<'a> LockedHelper<'a> {
|
|||
bucket_id: Uuid,
|
||||
key_id: &String,
|
||||
alias_name: &String,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<Bucket, Error> {
|
||||
let key_helper = KeyHelper(self.0);
|
||||
|
||||
if !is_valid_bucket_name(alias_name) {
|
||||
|
@ -257,7 +258,7 @@ impl<'a> LockedHelper<'a> {
|
|||
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
|
||||
self.0.bucket_table.insert(&bucket).await?;
|
||||
|
||||
Ok(())
|
||||
Ok(bucket)
|
||||
}
|
||||
|
||||
/// Unsets an alias for a bucket in the local namespace of a key.
|
||||
|
@ -271,7 +272,7 @@ impl<'a> LockedHelper<'a> {
|
|||
bucket_id: Uuid,
|
||||
key_id: &String,
|
||||
alias_name: &String,
|
||||
) -> Result<(), Error> {
|
||||
) -> Result<Bucket, Error> {
|
||||
let key_helper = KeyHelper(self.0);
|
||||
|
||||
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
|
||||
|
@ -330,7 +331,7 @@ impl<'a> LockedHelper<'a> {
|
|||
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
|
||||
self.0.bucket_table.insert(&bucket).await?;
|
||||
|
||||
Ok(())
|
||||
Ok(bucket)
|
||||
}
|
||||
|
||||
/// Sets permissions for a key on a bucket.
|
||||
|
|
|
@ -451,10 +451,7 @@ impl K2VRpcHandler {
|
|||
|
||||
let mut value = self
|
||||
.item_table
|
||||
.data
|
||||
.read_entry(&key.partition, &key.sort_key)?
|
||||
.map(|bytes| self.item_table.data.decode_entry(&bytes[..]))
|
||||
.transpose()?
|
||||
.get_local(&key.partition, &key.sort_key)?
|
||||
.unwrap_or_else(|| {
|
||||
K2VItem::new(
|
||||
key.partition.bucket_id,
|
||||
|
|
|
@ -482,6 +482,15 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
|||
Ok(ret_vec)
|
||||
}
|
||||
|
||||
pub fn get_local(
|
||||
self: &Arc<Self>,
|
||||
partition_key: &F::P,
|
||||
sort_key: &F::S,
|
||||
) -> Result<Option<F::E>, Error> {
|
||||
let bytes = self.data.read_entry(partition_key, sort_key)?;
|
||||
bytes.map(|b| self.data.decode_entry(&b)).transpose()
|
||||
}
|
||||
|
||||
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
|
||||
|
||||
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
|
||||
|
|
Loading…
Add table
Reference in a new issue