diff --git a/Cargo.lock b/Cargo.lock index 11aa070d..ecdf8a57 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -387,6 +387,12 @@ dependencies = [ "either", ] +[[package]] +name = "bytesize" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c58ec36aac5066d5ca17df51b3e70279f5670a72102f5752cb7e7c856adfc70" + [[package]] name = "cc" version = "1.0.73" @@ -948,6 +954,7 @@ dependencies = [ "aws-sdk-s3", "base64", "bytes 1.1.0", + "bytesize", "chrono", "futures", "futures-util", diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index eb643160..640e6975 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -30,6 +30,7 @@ garage_util = { version = "0.7.0", path = "../util" } garage_web = { version = "0.7.0", path = "../web" } bytes = "1.0" +bytesize = "1.1" hex = "0.4" tracing = { version = "0.1.30", features = ["log-always"] } pretty_env_logger = "0.4" diff --git a/src/garage/admin.rs b/src/garage/admin.rs index afe7fe7a..31305b51 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -39,7 +39,11 @@ pub enum AdminRpc { // Replies Ok(String), BucketList(Vec), - BucketInfo(Bucket, HashMap), + BucketInfo { + bucket: Bucket, + relevant_keys: HashMap, + counters: HashMap, + }, KeyList(Vec<(String, String)>), KeyInfo(Key, HashMap), } @@ -104,6 +108,15 @@ impl AdminRpcHandler { .get_existing_bucket(bucket_id) .await?; + let counters = self + .garage + .object_counter_table + .table + .get(&EmptyKey, &bucket_id) + .await? + .map(|x| x.filtered_values(&self.garage.system.ring.borrow())) + .unwrap_or_default(); + let mut relevant_keys = HashMap::new(); for (k, _) in bucket .state @@ -139,7 +152,11 @@ impl AdminRpcHandler { } } - Ok(AdminRpc::BucketInfo(bucket, relevant_keys)) + Ok(AdminRpc::BucketInfo { + bucket, + relevant_keys, + counters, + }) } #[allow(clippy::ptr_arg)] diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index b2dd8f14..3a0bd956 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -169,8 +169,12 @@ pub async fn cmd_admin( AdminRpc::BucketList(bl) => { print_bucket_list(bl); } - AdminRpc::BucketInfo(bucket, rk) => { - print_bucket_info(&bucket, &rk); + AdminRpc::BucketInfo { + bucket, + relevant_keys, + counters, + } => { + print_bucket_info(&bucket, &relevant_keys, &counters); } AdminRpc::KeyList(kl) => { print_key_list(kl); diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 575ac857..cdaa09be 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -425,6 +425,9 @@ pub enum OfflineRepairWhat { /// Repair K2V item counters #[structopt(name = "k2v_item_counters")] K2VItemCounters, + /// Repair object counters + #[structopt(name = "object_counters")] + ObjectCounters, } #[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs index 6d73be3a..23c669b9 100644 --- a/src/garage/cli/util.rs +++ b/src/garage/cli/util.rs @@ -7,6 +7,7 @@ use garage_util::formater::format_table; use garage_model::bucket_table::*; use garage_model::key_table::*; +use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS}; pub fn print_bucket_list(bl: Vec) { println!("List of buckets:"); @@ -121,7 +122,11 @@ pub fn print_key_info(key: &Key, relevant_buckets: &HashMap) { } } -pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap) { +pub fn print_bucket_info( + bucket: &Bucket, + relevant_keys: &HashMap, + counters: &HashMap, +) { let key_name = |k| { relevant_keys .get(k) @@ -133,6 +138,25 @@ pub fn print_bucket_info(bucket: &Bucket, relevant_keys: &HashMap) match &bucket.state { Deletable::Deleted => println!("Bucket is deleted."), Deletable::Present(p) => { + let size = + bytesize::ByteSize::b(counters.get(BYTES).cloned().unwrap_or_default() as u64); + println!( + "Size: {} ({})", + size.to_string_as(true), + size.to_string_as(false) + ); + println!( + "Objects: {}", + counters.get(OBJECTS).cloned().unwrap_or_default() + ); + println!( + "Unfinished multipart uploads: {}", + counters + .get(UNFINISHED_UPLOADS) + .cloned() + .unwrap_or_default() + ); + println!("Website access: {}", p.website_config.get().is_some()); println!("\nGlobal aliases:"); diff --git a/src/garage/repair/offline.rs b/src/garage/repair/offline.rs index 853bfdf3..ef56cc5c 100644 --- a/src/garage/repair/offline.rs +++ b/src/garage/repair/offline.rs @@ -38,6 +38,11 @@ pub async fn offline_repair(config_file: PathBuf, opt: OfflineRepairOpt) -> Resu #[cfg(not(feature = "k2v"))] error!("K2V not enabled in this build."); } + OfflineRepairWhat::ObjectCounters => { + garage + .object_counter_table + .offline_recount_all(&garage.object_table)?; + } } info!("Repair operation finished, shutting down Garage internals..."); diff --git a/src/model/garage.rs b/src/model/garage.rs index eed9445c..06ef25d1 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -23,10 +23,9 @@ use crate::s3::version_table::*; use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; +use crate::index_counter::*; use crate::key_table::*; -#[cfg(feature = "k2v")] -use crate::index_counter::*; #[cfg(feature = "k2v")] use crate::k2v::{item_table::*, poll::*, rpc::*}; @@ -53,6 +52,8 @@ pub struct Garage { /// Table containing S3 objects pub object_table: Arc>, + /// Counting table containing object counters + pub object_counter_table: Arc>, /// Table containing S3 object versions pub version_table: Arc>, /// Table containing S3 block references (not blocks themselves) @@ -205,12 +206,16 @@ impl Garage { &db, ); + info!("Initialize object counter table..."); + let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + info!("Initialize object_table..."); #[allow(clippy::redundant_clone)] let object_table = Table::new( ObjectTable { background: background.clone(), version_table: version_table.clone(), + object_counter_table: object_counter_table.clone(), }, meta_rep_param.clone(), system.clone(), @@ -232,6 +237,7 @@ impl Garage { bucket_alias_table, key_table, object_table, + object_counter_table, version_table, block_ref_table, #[cfg(feature = "k2v")] diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs index 62f5d8d9..027acea0 100644 --- a/src/model/s3/object_table.rs +++ b/src/model/s3/object_table.rs @@ -11,10 +11,15 @@ use garage_table::crdt::*; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::index_counter::*; use crate::s3::version_table::*; use garage_model_050::object_table as old; +pub const OBJECTS: &str = "objects"; +pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; +pub const BYTES: &str = "bytes"; + /// An object #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { @@ -218,6 +223,7 @@ impl Crdt for Object { pub struct ObjectTable { pub background: Arc, pub version_table: Arc>, + pub object_counter_table: Arc>, } #[derive(Clone, Copy, Debug, Serialize, Deserialize)] @@ -236,10 +242,20 @@ impl TableSchema for ObjectTable { fn updated( &self, - _tx: &mut db::Transaction, + tx: &mut db::Transaction, old: Option<&Self::E>, new: Option<&Self::E>, ) -> db::TxOpResult<()> { + // 1. Count + let counter_res = self.object_counter_table.count(tx, old, new); + if let Err(e) = db::unabort(counter_res)? { + error!( + "Unable to update object counter: {}. Index values will be wrong!", + e + ); + } + + // 2. Spawn threads that propagates deletions to version table let version_table = self.version_table.clone(); let old = old.cloned(); let new = new.cloned(); @@ -283,6 +299,46 @@ impl TableSchema for ObjectTable { } } +impl CountedItem for Object { + const COUNTER_TABLE_NAME: &'static str = "bucket_object_counter"; + + // Partition key = nothing + type CP = EmptyKey; + // Sort key = bucket id + type CS = Uuid; + + fn counter_partition_key(&self) -> &EmptyKey { + &EmptyKey + } + fn counter_sort_key(&self) -> &Uuid { + &self.bucket_id + } + + fn counts(&self) -> Vec<(&'static str, i64)> { + let n_objects = if self.is_tombstone() { 0 } else { 1 }; + + let versions = self.versions(); + let n_unfinished_uploads = versions + .iter() + .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_))) + .count(); + let n_bytes = versions + .iter() + .map(|v| match &v.state { + ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) + | ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta.size, + _ => 0, + }) + .sum::(); + + vec![ + (OBJECTS, n_objects), + (UNFINISHED_UPLOADS, n_unfinished_uploads as i64), + (BYTES, n_bytes as i64), + ] + } +} + // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // (we just want to change bucket into bucket_id by hashing it)