improve internal item counter mechanisms and implement bucket quotas #326

Merged
lx merged 23 commits from counters into main 2022-06-15 18:20:31 +00:00
9 changed files with 131 additions and 8 deletions
Showing only changes of commit 17e1111393 - Show all commits

Cargo.lock generated
View file

@@ -387,6 +387,12 @@ dependencies = [
  "either",
 ]

+[[package]]
+name = "bytesize"
+version = "1.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70"
+
 [[package]]
 name = "cc"
 version = "1.0.73"
@@ -948,6 +954,7 @@ dependencies = [
  "aws-sdk-s3",
  "base64",
  "bytes 1.1.0",
+ "bytesize",
  "chrono",
  "futures",
  "futures-util",

View file

@@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" }
 garage_web = { version = "0.7.0", path = "../web" }

 bytes = "1.0"
+bytesize = "1.1"
 hex = "0.4"
 tracing = { version = "0.1.30", features = ["log-always"] }
 pretty_env_logger = "0.4"

View file

@@ -39,7 +39,11 @@ pub enum AdminRpc {
     // Replies
     Ok(String),
     BucketList(Vec<Bucket>),
-    BucketInfo(Bucket, HashMap<String, Key>),
+    BucketInfo {
+        bucket: Bucket,
+        relevant_keys: HashMap<String, Key>,
+        counters: HashMap<String, i64>,
+    },
     KeyList(Vec<(String, String)>),
     KeyInfo(Key, HashMap<Uuid, Bucket>),
 }
@@ -104,6 +108,15 @@ impl AdminRpcHandler {
             .get_existing_bucket(bucket_id)
             .await?;

+        let counters = self
+            .garage
+            .object_counter_table
+            .table
+            .get(&EmptyKey, &bucket_id)
+            .await?
+            .map(|x| x.filtered_values(&self.garage.system.ring.borrow()))
+            .unwrap_or_default();
+
         let mut relevant_keys = HashMap::new();
         for (k, _) in bucket
             .state
@@ -139,7 +152,11 @@ impl AdminRpcHandler {
                 }
             }
         }
-        Ok(AdminRpc::BucketInfo(bucket, relevant_keys))
+        Ok(AdminRpc::BucketInfo {
+            bucket,
+            relevant_keys,
+            counters,
+        })
     }

     #[allow(clippy::ptr_arg)]

View file

@@ -169,8 +169,12 @@ pub async fn cmd_admin(
         AdminRpc::BucketList(bl) => {
             print_bucket_list(bl);
         }
-        AdminRpc::BucketInfo(bucket, rk) => {
-            print_bucket_info(&bucket, &rk);
+        AdminRpc::BucketInfo {
+            bucket,
+            relevant_keys,
+            counters,
+        } => {
+            print_bucket_info(&bucket, &relevant_keys, &counters);
         }
         AdminRpc::KeyList(kl) => {
             print_key_list(kl);

View file

@@ -425,6 +425,9 @@ pub enum OfflineRepairWhat {
     /// Repair K2V item counters
     #[structopt(name = "k2v_item_counters")]
     K2VItemCounters,
+    /// Repair object counters
+    #[structopt(name = "object_counters")]
+    ObjectCounters,
 }

 #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]

View file

@@ -7,6 +7,7 @@ use garage_util::formater::format_table;
 use garage_model::bucket_table::*;
 use garage_model::key_table::*;
+use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};

 pub fn print_bucket_list(bl: Vec<Bucket>) {
     println!("List of buckets:");
@@ -121,7 +122,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
     }
 }

-pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>) {
+pub fn print_bucket_info(
+    bucket: &Bucket,
+    relevant_keys: &HashMap<String, Key>,
+    counters: &HashMap<String, i64>,
+) {
     let key_name = |k| {
         relevant_keys
             .get(k)
@@ -133,6 +138,25 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap<String, Key>)
     match &bucket.state {
         Deletable::Deleted => println!("Bucket is deleted."),
         Deletable::Present(p) => {
+            let size =
+                bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64);
+            println!(
+                "Size: {} ({})",
+                size.to_string_as(true),
+                size.to_string_as(false)
+            );
+            println!(
+                "Objects: {}",
+                counters.get(OBJECTS).cloned().unwrap_or_default()
+            );
+            println!(
+                "Unfinished multipart uploads: {}",
+                counters
+                    .get(UNFINISHED_UPLOADS)
+                    .cloned()
+                    .unwrap_or_default()
+            );
+
             println!("Website access: {}", p.website_config.get().is_some());

             println!("\nGlobal aliases:");

View file

@@ -38,6 +38,11 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu
             #[cfg(not(feature = "k2v"))]
             error!("K2V not enabled in this build.");
         }
+        OfflineRepairWhat::ObjectCounters => {
+            garage
+                .object_counter_table
+                .offline_recount_all(&garage.object_table)?;
+        }
     }

     info!("Repair operation finished, shutting down Garage internals...");

View file

@@ -23,10 +23,9 @@ use crate::s3::version_table::*;
 use crate::bucket_alias_table::*;
 use crate::bucket_table::*;
 use crate::helper;
+use crate::index_counter::*;
 use crate::key_table::*;

-#[cfg(feature = "k2v")]
-use crate::index_counter::*;
 #[cfg(feature = "k2v")]
 use crate::k2v::{item_table::*, poll::*, rpc::*};
@@ -53,6 +52,8 @@ pub struct Garage {
     /// Table containing S3 objects
     pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
+    /// Counting table containing object counters
+    pub object_counter_table: Arc<IndexCounter<Object>>,
     /// Table containing S3 object versions
     pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
     /// Table containing S3 block references (not blocks themselves)
@@ -205,12 +206,16 @@ impl Garage {
             &db,
         );

+        info!("Initialize object counter table...");
+        let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
         info!("Initialize object_table...");
         #[allow(clippy::redundant_clone)]
         let object_table = Table::new(
             ObjectTable {
                 background: background.clone(),
                 version_table: version_table.clone(),
+                object_counter_table: object_counter_table.clone(),
             },
             meta_rep_param.clone(),
             system.clone(),
@@ -232,6 +237,7 @@ impl Garage {
             bucket_alias_table,
             key_table,
             object_table,
+            object_counter_table,
             version_table,
             block_ref_table,
             #[cfg(feature = "k2v")]

View file

@@ -11,10 +11,15 @@ use garage_table::crdt::*;
 use garage_table::replication::TableShardedReplication;
 use garage_table::*;

+use crate::index_counter::*;
 use crate::s3::version_table::*;

 use garage_model_050::object_table as old;

+pub const OBJECTS: &str = "objects";
+pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
+pub const BYTES: &str = "bytes";
+
 /// An object
 #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
 pub struct Object {
@@ -218,6 +223,7 @@ impl Crdt for Object {
 pub struct ObjectTable {
     pub background: Arc<BackgroundRunner>,
     pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
+    pub object_counter_table: Arc<IndexCounter<Object>>,
 }

 #[derive(Clone, Copy, Debug, Serialize, Deserialize)]
@@ -236,10 +242,20 @@ impl TableSchema for ObjectTable {
     fn updated(
         &self,
-        _tx: &mut db::Transaction,
+        tx: &mut db::Transaction,
         old: Option<&Self::E>,
         new: Option<&Self::E>,
     ) -> db::TxOpResult<()> {
+        // 1. Count
+        let counter_res = self.object_counter_table.count(tx, old, new);
+        if let Err(e) = db::unabort(counter_res)? {
+            error!(
+                "Unable to update object counter: {}. Index values will be wrong!",
+                e
+            );
+        }
+
+        // 2. Spawn threads that propagates deletions to version table
         let version_table = self.version_table.clone();
         let old = old.cloned();
         let new = new.cloned();
@@ -283,6 +299,46 @@ impl TableSchema for ObjectTable {
     }
 }

+impl CountedItem for Object {
+    const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter";
+
+    // Partition key = nothing
+    type CP = EmptyKey;
+    // Sort key = bucket id
+    type CS = Uuid;
+
+    fn counter_partition_key(&self) -> &EmptyKey {
+        &EmptyKey
+    }
+    fn counter_sort_key(&self) -> &Uuid {
+        &self.bucket_id
+    }
+
+    fn counts(&self) -> Vec<(&'static str, i64)> {
+        let n_objects = if self.is_tombstone() { 0 } else { 1 };
+        let versions = self.versions();
+        let n_unfinished_uploads = versions
+            .iter()
+            .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
+            .count();
+        let n_bytes = versions
+            .iter()
+            .map(|v| match &v.state {
+                ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _))
+                | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size,
+                _ => 0,
+            })
+            .sum::<u64>();
+
+        vec![
+            (OBJECTS, n_objects),
+            (UNFINISHED_UPLOADS, n_unfinished_uploads as i64),
+            (BYTES, n_bytes as i64),
+        ]
+    }
+}
+
 // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
 // (we just want to change bucket into bucket_id by hashing it)
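Side note (not part of the diff): the impl above is written against the CountedItem trait from index_counter.rs, which this commit does not touch. As a rough sketch, here is the assumed shape of that trait, inferred only from its usage in this file; the real definition very likely carries extra bounds (serde, garage_table key traits) and methods.

// Sketch only: inferred from `impl CountedItem for Object` above; not copied from
// index_counter.rs, whose actual definition may differ.
pub trait CountedItem: Clone + Send + Sync + 'static {
    // Name under which this counter table is stored.
    const COUNTER_TABLE_NAME: &'static str;

    // Partition key type of counter entries (EmptyKey for the object counter in this commit).
    type CP;
    // Sort key type of counter entries (the bucket Uuid here).
    type CS;

    fn counter_partition_key(&self) -> &Self::CP;
    fn counter_sort_key(&self) -> &Self::CS;

    // Per-entry contributions to each named counter. IndexCounter::count(tx, old, new) is
    // assumed to apply the difference between the old and the new entry's counts() inside
    // the same transaction as the object_table write (see ObjectTable::updated above).
    fn counts(&self) -> Vec<(&'static str, i64)>;
}

With counts() maintained incrementally on every object_table write, `garage bucket info` can report object, upload and byte totals without scanning the bucket, and the new offline object_counters repair recomputes those totals from scratch if they ever drift.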