Refactor bucket emptiness check and add k2v check

This commit is contained in:
Alex 2022-05-18 10:09:51 +02:00
parent 30e393b439
commit 5367f8adb2
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
4 changed files with 49 additions and 38 deletions

View file

@ -14,7 +14,6 @@ use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::permission::*;
use garage_model::s3::object_table::ObjectFilter;
use crate::admin::error::*;
use crate::admin::key::ApiBucketKeyPerm;
@ -327,17 +326,7 @@ pub async fn handle_delete_bucket(
let state = bucket.state.as_option().unwrap();
// Check bucket is empty
let objects = garage
.object_table
.get_range(
&bucket_id,
None,
Some(ObjectFilter::IsData),
10,
EnumerationOrder::Forward,
)
.await?;
if !objects.is_empty() {
if !helper.is_bucket_empty(bucket_id).await? {
return Err(CommonError::BucketNotEmpty.into());
}

View file

@ -8,7 +8,6 @@ use garage_model::bucket_table::Bucket;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::permission::BucketKeyPerm;
use garage_model::s3::object_table::ObjectFilter;
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
@ -229,17 +228,7 @@ pub async fn handle_delete_bucket(
// Delete bucket
// Check bucket is empty
let objects = garage
.object_table
.get_range(
&bucket_id,
None,
Some(ObjectFilter::IsData),
10,
EnumerationOrder::Forward,
)
.await?;
if !objects.is_empty() {
if !garage.bucket_helper().is_bucket_empty(bucket_id).await? {
return Err(CommonError::BucketNotEmpty.into());
}

View file

@ -22,7 +22,6 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
use garage_model::s3::object_table::ObjectFilter;
use crate::cli::*;
use crate::repair::Repair;
@ -213,18 +212,7 @@ impl AdminRpcHandler {
}
// Check bucket is empty
let objects = self
.garage
.object_table
.get_range(
&bucket_id,
None,
Some(ObjectFilter::IsData),
10,
EnumerationOrder::Forward,
)
.await?;
if !objects.is_empty() {
if !helper.is_bucket_empty(bucket_id).await? {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name

View file

@ -1,9 +1,10 @@
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::time::*;
use garage_table::util::*;
use crate::bucket_alias_table::*;
use crate::bucket_table::*;
use crate::garage::Garage;
@ -11,6 +12,7 @@ use crate::helper::error::*;
use crate::helper::key::KeyHelper;
use crate::key_table::*;
use crate::permission::BucketKeyPerm;
use crate::s3::object_table::ObjectFilter;
pub struct BucketHelper<'a>(pub(crate) &'a Garage);
@ -427,4 +429,47 @@ impl<'a> BucketHelper<'a> {
Ok(())
}
pub async fn is_bucket_empty(&self, bucket_id: Uuid) -> Result<bool, Error> {
let objects = self
.0
.object_table
.get_range(
&bucket_id,
None,
Some(ObjectFilter::IsData),
10,
EnumerationOrder::Forward,
)
.await?;
if !objects.is_empty() {
return Ok(false);
}
#[cfg(feature = "k2v")]
{
use garage_rpc::ring::Ring;
use std::sync::Arc;
let ring: Arc<Ring> = self.0.system.ring.borrow().clone();
let k2vindexes = self
.0
.k2v
.counter_table
.table
.get_range(
&bucket_id,
None,
Some((DeletedFilter::NotDeleted, ring.layout.node_id_vec.clone())),
10,
EnumerationOrder::Forward,
)
.await?;
if !k2vindexes.is_empty() {
return Ok(false);
}
}
Ok(true)
}
}