Merge branch 'main' into next-0.10

This commit is contained in:
Alex 2024-02-22 15:45:45 +01:00
commit 59f61c966a
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
16 changed files with 523 additions and 488 deletions

View file

@ -18,7 +18,7 @@ api_bind_addr = "0.0.0.0:3903"
```
This will allow anyone to scrape Prometheus metrics by fetching
`http://localhost:3093/metrics`. If you want to restrict access
`http://localhost:3903/metrics`. If you want to restrict access
to the exported metrics, set the `metrics_token` configuration value
to a bearer token to be used when fetching the metrics endpoint.

View file

@ -250,7 +250,7 @@ garage bucket info nextcloud-bucket
```
## Uploading and downlading from Garage
## Uploading and downloading from Garage
To download and upload files on garage, we can use a third-party tool named `awscli`.

View file

@ -8,8 +8,8 @@ listen address is specified in the `[admin]` section of the configuration
file (see [configuration file
reference](@/documentation/reference-manual/configuration.md))
**WARNING.** At this point, there is no comittement to stability of the APIs described in this document.
We will bump the version numbers prefixed to each API endpoint at each time the syntax
**WARNING.** At this point, there is no commitment to the stability of the APIs described in this document.
We will bump the version numbers prefixed to each API endpoint each time the syntax
or semantics change, meaning that code that relies on these endpoint will break
when changes are introduced.
@ -22,7 +22,7 @@ Versions:
## Access control
The admin API uses two different tokens for acces control, that are specified in the config file's `[admin]` section:
The admin API uses two different tokens for access control, that are specified in the config file's `[admin]` section:
- `metrics_token`: the token for accessing the Metrics endpoint (if this token
is not set in the config file, the Metrics endpoint can be accessed without
@ -88,8 +88,8 @@ Consult the full health check API endpoint at /v0/health for more details
### On-demand TLS `GET /check`
To prevent abuses for on-demand TLS, Caddy developpers have specified an endpoint that can be queried by the reverse proxy
to know if a given domain is allowed to get a certificate. Garage implements this endpoints to tell if a given domain is handled by Garage or is garbage.
To prevent abuse for on-demand TLS, Caddy developers have specified an endpoint that can be queried by the reverse proxy
to know if a given domain is allowed to get a certificate. Garage implements these endpoints to tell if a given domain is handled by Garage or is garbage.
Garage responds with the following logic:
- If the domain matches the pattern `<bucket-name>.<s3_api.root_domain>`, returns 200 OK
@ -102,7 +102,7 @@ You must manually declare the domain in your reverse-proxy. Idem for K2V.*
*Note 2: buckets in a user's namespace are not supported yet by this endpoint. This is a limitation of this endpoint currently.*
**Example:** Suppose a Garage instance configured with `s3_api.root_domain = .s3.garage.localhost` and `s3_web.root_domain = .web.garage.localhost`.
**Example:** Suppose a Garage instance is configured with `s3_api.root_domain = .s3.garage.localhost` and `s3_web.root_domain = .web.garage.localhost`.
With a private `media` bucket (name in the global namespace, website is disabled), the endpoint will feature the following behavior:

View file

@ -8,9 +8,9 @@ listen address is specified in the `[admin]` section of the configuration
file (see [configuration file
reference](@/documentation/reference-manual/configuration.md))
**WARNING.** At this point, there is no comittement to stability of the APIs described in this document.
We will bump the version numbers prefixed to each API endpoint at each time the syntax
or semantics change, meaning that code that relies on these endpoint will break
**WARNING.** At this point, there is no commitment to the stability of the APIs described in this document.
We will bump the version numbers prefixed to each API endpoint each time the syntax
or semantics change, meaning that code that relies on these endpoints will break
when changes are introduced.
The Garage administration API was introduced in version 0.7.2, this document
@ -19,7 +19,7 @@ does not apply to older versions of Garage.
## Access control
The admin API uses two different tokens for acces control, that are specified in the config file's `[admin]` section:
The admin API uses two different tokens for access control, that are specified in the config file's `[admin]` section:
- `metrics_token`: the token for accessing the Metrics endpoint (if this token
is not set in the config file, the Metrics endpoint can be accessed without

View file

@ -273,6 +273,8 @@ pub async fn handle_create_bucket(
) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
let helper = garage.locked_helper().await;
if let Some(ga) = &req.global_alias {
if !is_valid_bucket_name(ga) {
return Err(Error::bad_request(format!(
@ -296,10 +298,7 @@ pub async fn handle_create_bucket(
)));
}
let key = garage
.key_helper()
.get_existing_key(&la.access_key_id)
.await?;
let key = helper.key().get_existing_key(&la.access_key_id).await?;
let state = key.state.as_option().unwrap();
if matches!(state.local_aliases.get(&la.alias), Some(_)) {
return Err(Error::bad_request("Local alias already exists"));
@ -310,21 +309,16 @@ pub async fn handle_create_bucket(
garage.bucket_table.insert(&bucket).await?;
if let Some(ga) = &req.global_alias {
garage
.bucket_helper()
.set_global_bucket_alias(bucket.id, ga)
.await?;
helper.set_global_bucket_alias(bucket.id, ga).await?;
}
if let Some(la) = &req.local_alias {
garage
.bucket_helper()
helper
.set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
.await?;
if la.allow.read || la.allow.write || la.allow.owner {
garage
.bucket_helper()
helper
.set_bucket_key_permissions(
bucket.id,
&la.access_key_id,
@ -362,15 +356,15 @@ pub async fn handle_delete_bucket(
garage: &Arc<Garage>,
id: String,
) -> Result<Response<ResBody>, Error> {
let helper = garage.bucket_helper();
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&id)?;
let mut bucket = helper.get_existing_bucket(bucket_id).await?;
let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
let state = bucket.state.as_option().unwrap();
// Check bucket is empty
if !helper.is_bucket_empty(bucket_id).await? {
if !helper.bucket().is_bucket_empty(bucket_id).await? {
return Err(CommonError::BucketNotEmpty.into());
}
@ -476,18 +470,14 @@ pub async fn handle_bucket_change_key_perm(
) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&req.bucket_id)?;
let bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
let state = bucket.state.as_option().unwrap();
let key = garage
.key_helper()
.get_existing_key(&req.access_key_id)
.await?;
let key = helper.key().get_existing_key(&req.access_key_id).await?;
let mut perm = state
.authorized_keys
@ -505,8 +495,7 @@ pub async fn handle_bucket_change_key_perm(
perm.allow_owner = new_perm_flag;
}
garage
.bucket_helper()
helper
.set_bucket_key_permissions(bucket.id, &key.key_id, perm)
.await?;
@ -530,10 +519,9 @@ pub async fn handle_global_alias_bucket(
) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
.bucket_helper()
.set_global_bucket_alias(bucket_id, &alias)
.await?;
let helper = garage.locked_helper().await;
helper.set_global_bucket_alias(bucket_id, &alias).await?;
bucket_info_results(garage, bucket_id).await
}
@ -545,10 +533,9 @@ pub async fn handle_global_unalias_bucket(
) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
.bucket_helper()
.unset_global_bucket_alias(bucket_id, &alias)
.await?;
let helper = garage.locked_helper().await;
helper.unset_global_bucket_alias(bucket_id, &alias).await?;
bucket_info_results(garage, bucket_id).await
}
@ -561,8 +548,9 @@ pub async fn handle_local_alias_bucket(
) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
.bucket_helper()
let helper = garage.locked_helper().await;
helper
.set_local_bucket_alias(bucket_id, &access_key_id, &alias)
.await?;
@ -577,8 +565,9 @@ pub async fn handle_local_unalias_bucket(
) -> Result<Response<ResBody>, Error> {
let bucket_id = parse_bucket_id(&bucket_id)?;
garage
.bucket_helper()
let helper = garage.locked_helper().await;
helper
.unset_local_bucket_alias(bucket_id, &access_key_id, &alias)
.await?;

View file

@ -151,11 +151,11 @@ pub async fn handle_delete_key(
garage: &Arc<Garage>,
id: String,
) -> Result<Response<ResBody>, Error> {
let mut key = garage.key_helper().get_existing_key(&id).await?;
let helper = garage.locked_helper().await;
key.state.as_option().unwrap();
let mut key = helper.key().get_existing_key(&id).await?;
garage.key_helper().delete_key(&mut key).await?;
helper.delete_key(&mut key).await?;
Ok(Response::builder()
.status(StatusCode::NO_CONTENT)

View file

@ -148,7 +148,14 @@ impl ApiHandler for S3ApiServer {
// Special code path for CreateBucket API endpoint
if let Endpoint::CreateBucket {} = endpoint {
return handle_create_bucket(&garage, req, content_sha256, api_key, bucket_name).await;
return handle_create_bucket(
&garage,
req,
content_sha256,
&api_key.key_id,
bucket_name,
)
.await;
}
let bucket_id = garage
@ -261,7 +268,7 @@ impl ApiHandler for S3ApiServer {
Ok(response)
}
Endpoint::DeleteBucket {} => {
handle_delete_bucket(&garage, bucket_id, bucket_name, api_key).await
handle_delete_bucket(&garage, bucket_id, bucket_name, &api_key.key_id).await
}
Endpoint::GetBucketLocation {} => handle_get_bucket_location(garage),
Endpoint::GetBucketVersioning {} => handle_get_bucket_versioning(),

View file

@ -122,7 +122,7 @@ pub async fn handle_create_bucket(
garage: &Garage,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
api_key: Key,
api_key_id: &String,
bucket_name: String,
) -> Result<Response<ResBody>, Error> {
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
@ -144,16 +144,18 @@ pub async fn handle_create_bucket(
}
}
let key_params = api_key
.params()
.ok_or_internal_error("Key should not be deleted at this point")?;
let helper = garage.locked_helper().await;
// refetch API key after taking lock to ensure up-to-date data
let api_key = helper.key().get_existing_key(api_key_id).await?;
let key_params = api_key.params().unwrap();
let existing_bucket = if let Some(Some(bucket_id)) = key_params.local_aliases.get(&bucket_name)
{
Some(*bucket_id)
} else {
garage
.bucket_helper()
helper
.bucket()
.resolve_global_bucket_name(&bucket_name)
.await?
};
@ -187,13 +189,11 @@ pub async fn handle_create_bucket(
let bucket = Bucket::new();
garage.bucket_table.insert(&bucket).await?;
garage
.bucket_helper()
helper
.set_bucket_key_permissions(bucket.id, &api_key.key_id, BucketKeyPerm::ALL_PERMISSIONS)
.await?;
garage
.bucket_helper()
helper
.set_local_bucket_alias(bucket.id, &api_key.key_id, &bucket_name)
.await?;
}
@ -208,18 +208,16 @@ pub async fn handle_delete_bucket(
garage: &Garage,
bucket_id: Uuid,
bucket_name: String,
api_key: Key,
api_key_id: &String,
) -> Result<Response<ResBody>, Error> {
let key_params = api_key
.params()
.ok_or_internal_error("Key should not be deleted at this point")?;
let helper = garage.locked_helper().await;
let api_key = helper.key().get_existing_key(api_key_id).await?;
let key_params = api_key.params().unwrap();
let is_local_alias = matches!(key_params.local_aliases.get(&bucket_name), Some(Some(_)));
let mut bucket = garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option().unwrap();
// If the bucket has no other aliases, this is a true deletion.
@ -243,28 +241,25 @@ pub async fn handle_delete_bucket(
// Delete bucket
// Check bucket is empty
if !garage.bucket_helper().is_bucket_empty(bucket_id).await? {
if !helper.bucket().is_bucket_empty(bucket_id).await? {
return Err(CommonError::BucketNotEmpty.into());
}
// --- done checking, now commit ---
// 1. delete bucket alias
if is_local_alias {
garage
.bucket_helper()
helper
.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
.await?;
} else {
garage
.bucket_helper()
helper
.unset_global_bucket_alias(bucket_id, &bucket_name)
.await?;
}
// 2. delete authorization from keys that had access
for (key_id, _) in bucket.authorized_keys() {
garage
.bucket_helper()
helper
.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
.await?;
}
@ -274,14 +269,12 @@ pub async fn handle_delete_bucket(
garage.bucket_table.insert(&bucket).await?;
} else if is_local_alias {
// Just unalias
garage
.bucket_helper()
helper
.unset_local_bucket_alias(bucket_id, &api_key.key_id, &bucket_name)
.await?;
} else {
// Just unalias (but from global namespace)
garage
.bucket_helper()
helper
.unset_global_bucket_alias(bucket_id, &bucket_name)
.await?;
}

View file

@ -134,6 +134,8 @@ impl AdminRpcHandler {
)));
}
let helper = self.garage.locked_helper().await;
if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
if alias.state.get().is_some() {
return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
@ -145,18 +147,16 @@ impl AdminRpcHandler {
let bucket = Bucket::new();
self.garage.bucket_table.insert(&bucket).await?;
self.garage
.bucket_helper()
.set_global_bucket_alias(bucket.id, name)
.await?;
helper.set_global_bucket_alias(bucket.id, name).await?;
Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
}
async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.resolve_global_bucket_name(&query.name)
.await?
.ok_or_bad_request("Bucket not found")?;
@ -174,7 +174,7 @@ impl AdminRpcHandler {
.await?;
// Check bucket doesn't have other aliases
let mut bucket = helper.get_existing_bucket(bucket_id).await?;
let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option().unwrap();
if bucket_state
.aliases
@ -195,7 +195,7 @@ impl AdminRpcHandler {
}
// Check bucket is empty
if !helper.is_bucket_empty(bucket_id).await? {
if !helper.bucket().is_bucket_empty(bucket_id).await? {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name
@ -231,16 +231,16 @@ impl AdminRpcHandler {
}
async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
let key_helper = self.garage.key_helper();
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.resolve_global_bucket_name(&query.existing_bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
if let Some(key_pattern) = &query.local {
let key = key_helper.get_existing_matching_key(key_pattern).await?;
let key = helper.key().get_existing_matching_key(key_pattern).await?;
helper
.set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
@ -261,11 +261,10 @@ impl AdminRpcHandler {
}
async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
let key_helper = self.garage.key_helper();
let helper = self.garage.locked_helper().await;
if let Some(key_pattern) = &query.local {
let key = key_helper.get_existing_matching_key(key_pattern).await?;
let key = helper.key().get_existing_matching_key(key_pattern).await?;
let bucket_id = key
.state
@ -287,6 +286,7 @@ impl AdminRpcHandler {
)))
} else {
let bucket_id = helper
.bucket()
.resolve_global_bucket_name(&query.name)
.await?
.ok_or_bad_request("Bucket not found")?;
@ -303,14 +303,15 @@ impl AdminRpcHandler {
}
async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
let key_helper = self.garage.key_helper();
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
let key = key_helper
let key = helper
.key()
.get_existing_matching_key(&query.key_pattern)
.await?;
@ -338,14 +339,15 @@ impl AdminRpcHandler {
}
async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.bucket_helper();
let key_helper = self.garage.key_helper();
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.resolve_global_bucket_name(&query.bucket)
.await?
.ok_or_bad_request("Bucket not found")?;
let key = key_helper
let key = helper
.key()
.get_existing_matching_key(&query.key_pattern)
.await?;

View file

@ -76,9 +76,10 @@ impl AdminRpcHandler {
}
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
let key_helper = self.garage.key_helper();
let helper = self.garage.locked_helper().await;
let mut key = key_helper
let mut key = helper
.key()
.get_existing_matching_key(&query.key_pattern)
.await?;
@ -88,7 +89,7 @@ impl AdminRpcHandler {
));
}
key_helper.delete_key(&mut key).await?;
helper.delete_key(&mut key).await?;
Ok(AdminRpc::Ok(format!(
"Key {} was deleted successfully.",

View file

@ -56,6 +56,9 @@ pub struct Garage {
/// Table containing api keys
pub key_table: Arc<Table<KeyTable, TableFullReplication>>,
/// Lock to prevent concurrent modification of buckets and access keys
bucket_lock: tokio::sync::Mutex<()>,
/// Table containing S3 objects
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
/// Counting table containing object counters
@ -343,6 +346,7 @@ impl Garage {
bucket_table,
bucket_alias_table,
key_table,
bucket_lock: tokio::sync::Mutex::new(()),
object_table,
object_counter_table,
mpu_table,
@ -385,6 +389,11 @@ impl Garage {
pub fn key_helper(&self) -> helper::key::KeyHelper {
helper::key::KeyHelper(self)
}
pub async fn locked_helper(&self) -> helper::locked::LockedHelper {
let lock = self.bucket_lock.lock().await;
helper::locked::LockedHelper(self, lock)
}
}
#[cfg(feature = "k2v")]

View file

@ -1,19 +1,15 @@
use std::time::Duration;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::error::OkOrMessage;
use garage_util::time::*;
use garage_table::util::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::garage::Garage;
use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
use crate::s3::object_table::*;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@ -96,341 +92,7 @@ impl<'a> BucketHelper<'a> {
.ok_or_else(|| Error::NoSuchBucket(hex::encode(bucket_id)))
}
/// Sets a new alias for a bucket in global namespace.
/// This function fails if:
/// - alias name is not valid according to S3 spec
/// - bucket does not exist or is deleted
/// - alias already exists and points to another bucket
pub async fn set_global_bucket_alias(
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
if !is_valid_bucket_name(alias_name) {
return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?;
if let Some(existing_alias) = alias.as_ref() {
if let Some(p_bucket) = existing_alias.state.get() {
if *p_bucket != bucket_id {
return Err(Error::BadRequest(format!(
"Alias {} already exists and points to different bucket: {:?}",
alias_name, p_bucket
)));
}
}
}
// Checks ok, add alias
let bucket_p = bucket.state.as_option_mut().unwrap();
let alias_ts = increment_logical_clock_2(
bucket_p.aliases.get_timestamp(alias_name),
alias.as_ref().map(|a| a.state.timestamp()).unwrap_or(0),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
let alias = match alias {
None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id))
.ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?,
Some(mut a) => {
a.state = Lww::raw(alias_ts, Some(bucket_id));
a
}
};
self.0.bucket_alias_table.insert(&alias).await?;
bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Unsets an alias for a bucket in global namespace.
/// This function fails if:
/// - bucket does not exist or is deleted
/// - alias does not exist or maps to another bucket (-> internal error)
/// - bucket has no other aliases (global or local)
pub async fn unset_global_bucket_alias(
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option_mut().unwrap();
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.await?
.filter(|a| a.state.get().map(|x| x == bucket_id).unwrap_or(false))
.ok_or_message(format!(
"Internal error: alias not found or does not point to bucket {:?}",
bucket_id
))?;
let has_other_global_aliases = bucket_state
.aliases
.items()
.iter()
.any(|(name, _, active)| name != alias_name && *active);
let has_other_local_aliases = bucket_state
.local_aliases
.items()
.iter()
.any(|(_, _, active)| *active);
if !has_other_global_aliases && !has_other_local_aliases {
return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name)));
}
// Checks ok, remove alias
let alias_ts = increment_logical_clock_2(
alias.state.timestamp(),
bucket_state.aliases.get_timestamp(alias_name),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Ensures a bucket does not have a certain global alias.
/// Contrarily to unset_global_bucket_alias, this does not
/// fail on any condition other than:
/// - bucket cannot be found (its fine if it is in deleted state)
/// - alias cannot be found (its fine if it points to nothing or
/// to another bucket)
pub async fn purge_global_bucket_alias(
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
let mut bucket = self.get_internal_bucket(bucket_id).await?;
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.await?
.ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?;
// Checks ok, remove alias
let alias_ts = match bucket.state.as_option() {
Some(bucket_state) => increment_logical_clock_2(
alias.state.timestamp(),
bucket_state.aliases.get_timestamp(alias_name),
),
None => increment_logical_clock(alias.state.timestamp()),
};
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
if alias.state.get() == &Some(bucket_id) {
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
}
if let Some(bucket_state) = bucket.state.as_option_mut() {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
}
Ok(())
}
/// Sets a new alias for a bucket in the local namespace of a key.
/// This function fails if:
/// - alias name is not valid according to S3 spec
/// - bucket does not exist or is deleted
/// - key does not exist or is deleted
/// - alias already exists and points to another bucket
pub async fn set_local_bucket_alias(
&self,
bucket_id: Uuid,
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
let key_helper = KeyHelper(self.0);
if !is_valid_bucket_name(alias_name) {
return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
let key_param = key.state.as_option_mut().unwrap();
if let Some(Some(existing_alias)) = key_param.local_aliases.get(alias_name) {
if *existing_alias != bucket_id {
return Err(Error::BadRequest(format!("Alias {} already exists in namespace of key {} and points to different bucket: {:?}", alias_name, key.key_id, existing_alias)));
}
}
// Checks ok, add alias
let bucket_p = bucket.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
// Calculate the timestamp to assign to this aliasing in the two local_aliases maps
// (the one from key to bucket, and the reverse one stored in the bucket iself)
// so that merges on both maps in case of a concurrent operation resolve
// to the same alias being set
let alias_ts = increment_logical_clock_2(
key_param.local_aliases.get_timestamp(alias_name),
bucket_p
.local_aliases
.get_timestamp(&bucket_p_local_alias_key),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, Some(bucket_id));
self.0.key_table.insert(&key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Unsets an alias for a bucket in the local namespace of a key.
/// This function fails if:
/// - bucket does not exist or is deleted
/// - key does not exist or is deleted
/// - alias does not exist or maps to another bucket (-> internal error)
/// - bucket has no other aliases (global or local)
pub async fn unset_local_bucket_alias(
&self,
bucket_id: Uuid,
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
let key_helper = KeyHelper(self.0);
let mut bucket = self.get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
let bucket_p = bucket.state.as_option_mut().unwrap();
if key
.state
.as_option()
.unwrap()
.local_aliases
.get(alias_name)
.cloned()
.flatten() != Some(bucket_id)
{
return Err(GarageError::Message(format!(
"Bucket {:?} does not have alias {} in namespace of key {}",
bucket_id, alias_name, key_id
))
.into());
}
let has_other_global_aliases = bucket_p
.aliases
.items()
.iter()
.any(|(_, _, active)| *active);
let has_other_local_aliases = bucket_p
.local_aliases
.items()
.iter()
.any(|((k, n), _, active)| *k == key.key_id && n == alias_name && *active);
if !has_other_global_aliases && !has_other_local_aliases {
return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name)));
}
// Checks ok, remove alias
let key_param = key.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
let alias_ts = increment_logical_clock_2(
key_param.local_aliases.get_timestamp(alias_name),
bucket_p
.local_aliases
.get_timestamp(&bucket_p_local_alias_key),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, None);
self.0.key_table.insert(&key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Sets permissions for a key on a bucket.
/// This function fails if:
/// - bucket or key cannot be found at all (its ok if they are in deleted state)
/// - bucket or key is in deleted state and we are trying to set permissions other than "deny
/// all"
pub async fn set_bucket_key_permissions(
&self,
bucket_id: Uuid,
key_id: &String,
mut perm: BucketKeyPerm,
) -> Result<(), Error> {
let key_helper = KeyHelper(self.0);
let mut bucket = self.get_internal_bucket(bucket_id).await?;
let mut key = key_helper.get_internal_key(key_id).await?;
if let Some(bstate) = bucket.state.as_option() {
if let Some(kp) = bstate.authorized_keys.get(key_id) {
perm.timestamp = increment_logical_clock_2(perm.timestamp, kp.timestamp);
}
} else if perm.is_any() {
return Err(Error::BadRequest(
"Trying to give permissions on a deleted bucket".into(),
));
}
if let Some(kstate) = key.state.as_option() {
if let Some(bp) = kstate.authorized_buckets.get(&bucket_id) {
perm.timestamp = increment_logical_clock_2(perm.timestamp, bp.timestamp);
}
} else if perm.is_any() {
return Err(Error::BadRequest(
"Trying to give permissions to a deleted key".into(),
));
}
// ---- timestamp-ensured causality barrier ----
if let Some(bstate) = bucket.state.as_option_mut() {
bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm);
self.0.bucket_table.insert(&bucket).await?;
}
if let Some(kstate) = key.state.as_option_mut() {
kstate.authorized_buckets = Map::put_mutator(bucket_id, perm);
self.0.key_table.insert(&key).await?;
}
Ok(())
}
// ----
pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> {
let objects = self

View file

@ -1,12 +1,9 @@
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::error::OkOrMessage;
use crate::garage::Garage;
use crate::helper::bucket::BucketHelper;
use crate::helper::error::*;
use crate::key_table::{Key, KeyFilter};
use crate::permission::BucketKeyPerm;
pub struct KeyHelper<'a>(pub(crate) &'a Garage);
@ -65,38 +62,4 @@ impl<'a> KeyHelper<'a> {
Ok(candidates.into_iter().next().unwrap())
}
}
/// Deletes an API access key
pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> {
let bucket_helper = BucketHelper(self.0);
let state = key.state.as_option_mut().unwrap();
// --- done checking, now commit ---
// (the step at unset_local_bucket_alias will fail if a bucket
// does not have another alias, the deletion will be
// interrupted in the middle if that happens)
// 1. Delete local aliases
for (alias, _, to) in state.local_aliases.items().iter() {
if let Some(bucket_id) = to {
bucket_helper
.unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
.await?;
}
}
// 2. Remove permissions on all authorized buckets
for (ab_id, _auth) in state.authorized_buckets.items().iter() {
bucket_helper
.set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
.await?;
}
// 3. Actually delete key
key.state = Deletable::delete();
self.0.key_table.insert(key).await?;
Ok(())
}
}

410
src/model/helper/locked.rs Normal file
View file

@ -0,0 +1,410 @@
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::time::*;
use garage_table::util::*;
use crate::bucket_alias_table::*;
use crate::garage::Garage;
use crate::helper::bucket::BucketHelper;
use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
/// A LockedHelper is the mandatory struct to hold when doing operations
/// that modify access keys or bucket aliases. This structure takes
/// a lock to a unit value that is in the globally-shared Garage struct.
///
/// This avoid several concurrent requests to modify the list of buckets
/// and aliases at the same time, ending up in inconsistent states.
/// This DOES NOT FIX THE FUNDAMENTAL ISSUE as CreateBucket requests handled
/// by different API nodes can still break the cluster, but it is a first
/// fix that allows consistency to be maintained if all such requests are
/// directed to a single node, which is doable for many deployments.
///
/// See issues: #649, #723
pub struct LockedHelper<'a>(
pub(crate) &'a Garage,
pub(crate) tokio::sync::MutexGuard<'a, ()>,
);
#[allow(clippy::ptr_arg)]
impl<'a> LockedHelper<'a> {
pub fn bucket(&self) -> BucketHelper<'a> {
BucketHelper(self.0)
}
pub fn key(&self) -> KeyHelper<'a> {
KeyHelper(self.0)
}
/// Sets a new alias for a bucket in global namespace.
/// This function fails if:
/// - alias name is not valid according to S3 spec
/// - bucket does not exist or is deleted
/// - alias already exists and points to another bucket
pub async fn set_global_bucket_alias(
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
if !is_valid_bucket_name(alias_name) {
return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let alias = self.0.bucket_alias_table.get(&EmptyKey, alias_name).await?;
if let Some(existing_alias) = alias.as_ref() {
if let Some(p_bucket) = existing_alias.state.get() {
if *p_bucket != bucket_id {
return Err(Error::BadRequest(format!(
"Alias {} already exists and points to different bucket: {:?}",
alias_name, p_bucket
)));
}
}
}
// Checks ok, add alias
let bucket_p = bucket.state.as_option_mut().unwrap();
let alias_ts = increment_logical_clock_2(
bucket_p.aliases.get_timestamp(alias_name),
alias.as_ref().map(|a| a.state.timestamp()).unwrap_or(0),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
let alias = match alias {
None => BucketAlias::new(alias_name.clone(), alias_ts, Some(bucket_id))
.ok_or_else(|| Error::InvalidBucketName(alias_name.clone()))?,
Some(mut a) => {
a.state = Lww::raw(alias_ts, Some(bucket_id));
a
}
};
self.0.bucket_alias_table.insert(&alias).await?;
bucket_p.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Unsets an alias for a bucket in global namespace.
/// This function fails if:
/// - bucket does not exist or is deleted
/// - alias does not exist or maps to another bucket (-> internal error)
/// - bucket has no other aliases (global or local)
pub async fn unset_global_bucket_alias(
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option_mut().unwrap();
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.await?
.filter(|a| a.state.get().map(|x| x == bucket_id).unwrap_or(false))
.ok_or_message(format!(
"Internal error: alias not found or does not point to bucket {:?}",
bucket_id
))?;
let has_other_global_aliases = bucket_state
.aliases
.items()
.iter()
.any(|(name, _, active)| name != alias_name && *active);
let has_other_local_aliases = bucket_state
.local_aliases
.items()
.iter()
.any(|(_, _, active)| *active);
if !has_other_global_aliases && !has_other_local_aliases {
return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name)));
}
// Checks ok, remove alias
let alias_ts = increment_logical_clock_2(
alias.state.timestamp(),
bucket_state.aliases.get_timestamp(alias_name),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Ensures a bucket does not have a certain global alias.
/// Contrarily to unset_global_bucket_alias, this does not
/// fail on any condition other than:
/// - bucket cannot be found (its fine if it is in deleted state)
/// - alias cannot be found (its fine if it points to nothing or
/// to another bucket)
pub async fn purge_global_bucket_alias(
&self,
bucket_id: Uuid,
alias_name: &String,
) -> Result<(), Error> {
let mut bucket = self.bucket().get_internal_bucket(bucket_id).await?;
let mut alias = self
.0
.bucket_alias_table
.get(&EmptyKey, alias_name)
.await?
.ok_or_else(|| Error::NoSuchBucket(alias_name.to_string()))?;
// Checks ok, remove alias
let alias_ts = match bucket.state.as_option() {
Some(bucket_state) => increment_logical_clock_2(
alias.state.timestamp(),
bucket_state.aliases.get_timestamp(alias_name),
),
None => increment_logical_clock(alias.state.timestamp()),
};
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
if alias.state.get() == &Some(bucket_id) {
alias.state = Lww::raw(alias_ts, None);
self.0.bucket_alias_table.insert(&alias).await?;
}
if let Some(bucket_state) = bucket.state.as_option_mut() {
bucket_state.aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
}
Ok(())
}
/// Sets a new alias for a bucket in the local namespace of a key.
/// This function fails if:
/// - alias name is not valid according to S3 spec
/// - bucket does not exist or is deleted
/// - key does not exist or is deleted
/// - alias already exists and points to another bucket
pub async fn set_local_bucket_alias(
&self,
bucket_id: Uuid,
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
let key_helper = KeyHelper(self.0);
if !is_valid_bucket_name(alias_name) {
return Err(Error::InvalidBucketName(alias_name.to_string()));
}
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
let key_param = key.state.as_option_mut().unwrap();
if let Some(Some(existing_alias)) = key_param.local_aliases.get(alias_name) {
if *existing_alias != bucket_id {
return Err(Error::BadRequest(format!("Alias {} already exists in namespace of key {} and points to different bucket: {:?}", alias_name, key.key_id, existing_alias)));
}
}
// Checks ok, add alias
let bucket_p = bucket.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
// Calculate the timestamp to assign to this aliasing in the two local_aliases maps
// (the one from key to bucket, and the reverse one stored in the bucket iself)
// so that merges on both maps in case of a concurrent operation resolve
// to the same alias being set
let alias_ts = increment_logical_clock_2(
key_param.local_aliases.get_timestamp(alias_name),
bucket_p
.local_aliases
.get_timestamp(&bucket_p_local_alias_key),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, Some(bucket_id));
self.0.key_table.insert(&key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, true);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Unsets an alias for a bucket in the local namespace of a key.
/// This function fails if:
/// - bucket does not exist or is deleted
/// - key does not exist or is deleted
/// - alias does not exist or maps to another bucket (-> internal error)
/// - bucket has no other aliases (global or local)
pub async fn unset_local_bucket_alias(
&self,
bucket_id: Uuid,
key_id: &String,
alias_name: &String,
) -> Result<(), Error> {
let key_helper = KeyHelper(self.0);
let mut bucket = self.bucket().get_existing_bucket(bucket_id).await?;
let mut key = key_helper.get_existing_key(key_id).await?;
let bucket_p = bucket.state.as_option_mut().unwrap();
if key
.state
.as_option()
.unwrap()
.local_aliases
.get(alias_name)
.cloned()
.flatten() != Some(bucket_id)
{
return Err(GarageError::Message(format!(
"Bucket {:?} does not have alias {} in namespace of key {}",
bucket_id, alias_name, key_id
))
.into());
}
let has_other_global_aliases = bucket_p
.aliases
.items()
.iter()
.any(|(_, _, active)| *active);
let has_other_local_aliases = bucket_p
.local_aliases
.items()
.iter()
.any(|((k, n), _, active)| *k == key.key_id && n == alias_name && *active);
if !has_other_global_aliases && !has_other_local_aliases {
return Err(Error::BadRequest(format!("Bucket {} doesn't have other aliases, please delete it instead of just unaliasing.", alias_name)));
}
// Checks ok, remove alias
let key_param = key.state.as_option_mut().unwrap();
let bucket_p_local_alias_key = (key.key_id.clone(), alias_name.clone());
let alias_ts = increment_logical_clock_2(
key_param.local_aliases.get_timestamp(alias_name),
bucket_p
.local_aliases
.get_timestamp(&bucket_p_local_alias_key),
);
// ---- timestamp-ensured causality barrier ----
// writes are now done and all writes use timestamp alias_ts
key_param.local_aliases = LwwMap::raw_item(alias_name.clone(), alias_ts, None);
self.0.key_table.insert(&key).await?;
bucket_p.local_aliases = LwwMap::raw_item(bucket_p_local_alias_key, alias_ts, false);
self.0.bucket_table.insert(&bucket).await?;
Ok(())
}
/// Sets permissions for a key on a bucket.
/// This function fails if:
/// - bucket or key cannot be found at all (its ok if they are in deleted state)
/// - bucket or key is in deleted state and we are trying to set permissions other than "deny
/// all"
pub async fn set_bucket_key_permissions(
&self,
bucket_id: Uuid,
key_id: &String,
mut perm: BucketKeyPerm,
) -> Result<(), Error> {
let key_helper = KeyHelper(self.0);
let mut bucket = self.bucket().get_internal_bucket(bucket_id).await?;
let mut key = key_helper.get_internal_key(key_id).await?;
if let Some(bstate) = bucket.state.as_option() {
if let Some(kp) = bstate.authorized_keys.get(key_id) {
perm.timestamp = increment_logical_clock_2(perm.timestamp, kp.timestamp);
}
} else if perm.is_any() {
return Err(Error::BadRequest(
"Trying to give permissions on a deleted bucket".into(),
));
}
if let Some(kstate) = key.state.as_option() {
if let Some(bp) = kstate.authorized_buckets.get(&bucket_id) {
perm.timestamp = increment_logical_clock_2(perm.timestamp, bp.timestamp);
}
} else if perm.is_any() {
return Err(Error::BadRequest(
"Trying to give permissions to a deleted key".into(),
));
}
// ---- timestamp-ensured causality barrier ----
if let Some(bstate) = bucket.state.as_option_mut() {
bstate.authorized_keys = Map::put_mutator(key_id.clone(), perm);
self.0.bucket_table.insert(&bucket).await?;
}
if let Some(kstate) = key.state.as_option_mut() {
kstate.authorized_buckets = Map::put_mutator(bucket_id, perm);
self.0.key_table.insert(&key).await?;
}
Ok(())
}
// ----
/// Deletes an API access key
pub async fn delete_key(&self, key: &mut Key) -> Result<(), Error> {
let state = key.state.as_option_mut().unwrap();
// --- done checking, now commit ---
// (the step at unset_local_bucket_alias will fail if a bucket
// does not have another alias, the deletion will be
// interrupted in the middle if that happens)
// 1. Delete local aliases
for (alias, _, to) in state.local_aliases.items().iter() {
if let Some(bucket_id) = to {
self.unset_local_bucket_alias(*bucket_id, &key.key_id, alias)
.await?;
}
}
// 2. Remove permissions on all authorized buckets
for (ab_id, _auth) in state.authorized_buckets.items().iter() {
self.set_bucket_key_permissions(*ab_id, &key.key_id, BucketKeyPerm::NO_PERMISSIONS)
.await?;
}
// 3. Actually delete key
key.state = Deletable::delete();
self.0.key_table.insert(key).await?;
Ok(())
}
}

View file

@ -1,3 +1,4 @@
pub mod bucket;
pub mod error;
pub mod key;
pub mod locked;

View file

@ -67,6 +67,8 @@ impl Migrate {
None
};
let helper = self.garage.locked_helper().await;
self.garage
.bucket_table
.insert(&Bucket {
@ -84,14 +86,10 @@ impl Migrate {
})
.await?;
self.garage
.bucket_helper()
.set_global_bucket_alias(bucket_id, &new_name)
.await?;
helper.set_global_bucket_alias(bucket_id, &new_name).await?;
for (k, ts, perm) in old_bucket_p.authorized_keys.items().iter() {
self.garage
.bucket_helper()
helper
.set_bucket_key_permissions(
bucket_id,
k,