diff --git a/src/model/garage.rs b/src/model/garage.rs index 3d538ecd..0ea4bc4a 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -13,6 +13,7 @@ use garage_table::replication::TableFullReplication; use garage_table::replication::TableShardedReplication; use garage_table::*; +use crate::k2v::counter_table::*; use crate::k2v::item_table::*; use crate::k2v::rpc::*; use crate::s3::block_ref_table::*; @@ -22,6 +23,7 @@ use crate::s3::version_table::*; use crate::bucket_alias_table::*; use crate::bucket_table::*; use crate::helper; +use crate::index_counter::*; use crate::key_table::*; /// An entire Garage full of data @@ -54,6 +56,8 @@ pub struct Garage { /// Table containing K2V items pub k2v_item_table: Arc>, + /// Indexing table containing K2V item counters + pub k2v_counter_table: Arc>, /// K2V RPC handler pub k2v_rpc: Arc, } @@ -154,7 +158,15 @@ impl Garage { ); // ---- K2V ---- - let k2v_item_table = Table::new(K2VItemTable {}, meta_rep_param, system.clone(), &db); + let k2v_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + let k2v_item_table = Table::new( + K2VItemTable { + counter_table: k2v_counter_table.clone(), + }, + meta_rep_param, + system.clone(), + &db, + ); let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone()); info!("Initialize Garage..."); @@ -172,6 +184,7 @@ impl Garage { version_table, block_ref_table, k2v_item_table, + k2v_counter_table, k2v_rpc, }) } diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index 1292afef..48d69939 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -4,8 +4,8 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use garage_rpc::system::System; use garage_rpc::ring::Ring; +use garage_rpc::system::System; use garage_util::data::*; use garage_util::error::*; @@ -42,13 +42,14 @@ impl Entry for CounterEntry { } impl CounterEntry { - pub fn filtered_values(&self, sys: System) -> HashMap { - let ring: Arc = sys.ring.borrow().clone(); + pub fn filtered_values(&self, ring: &Ring) -> HashMap { let nodes = &ring.layout.node_id_vec; - + let mut ret = HashMap::new(); for (name, vals) in self.values.iter() { - let new_vals = vals.node_values.iter() + let new_vals = vals + .node_values + .iter() .filter(|(n, _)| nodes.contains(n)) .map(|(_, (_, v))| v) .collect::>(); @@ -56,7 +57,7 @@ impl CounterEntry { ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| a + *b)); } } - + ret } } @@ -123,8 +124,12 @@ pub struct IndexCounter { } impl IndexCounter { - pub fn new(system: Arc, replication: TableShardedReplication, db: &sled::Db) -> Self { - Self { + pub fn new( + system: Arc, + replication: TableShardedReplication, + db: &sled::Db, + ) -> Arc { + Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter:{}", T::NAME)) @@ -137,10 +142,10 @@ impl IndexCounter { system, db, ), - } + }) } - pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(String, i64)]) -> Result<(), Error> { + pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> { let tree_key = self.table.data.tree_key(pk, sk); let new_entry = self.local_counter.transaction(|tx| { @@ -156,7 +161,7 @@ impl IndexCounter { }; for (s, inc) in counts.iter() { - let mut ent = entry.values.entry(s.clone()).or_insert((0, 0)); + let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); ent.0 += 1; ent.1 += *inc; } diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs new file mode 100644 index 00000000..a257e4fb --- /dev/null +++ b/src/model/k2v/counter_table.rs @@ -0,0 +1,15 @@ +use garage_util::data::*; + +use crate::index_counter::*; + +#[derive(PartialEq, Clone)] +pub struct K2VCounterTable; + +impl CounterSchema for K2VCounterTable { + const NAME: &'static str = "k2v_index_counter"; + + // Partition key = bucket id + type P = Uuid; + // Sort key = K2V item's partition key + type S = String; +} diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index b369df49..8f771643 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -1,12 +1,15 @@ use serde::{Deserialize, Serialize}; use std::collections::BTreeMap; +use std::sync::Arc; use garage_util::data::*; use garage_table::crdt::*; use garage_table::*; +use crate::index_counter::*; use crate::k2v::causality::*; +use crate::k2v::counter_table::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { @@ -105,6 +108,25 @@ impl K2VItem { ent.discard(); } } + + // returns counters: (non-deleted entries, non-tombstone values, bytes used) + fn stats(&self) -> (i64, i64, i64) { + let n_entries = if self.is_tombstone() { 0 } else { 1 }; + let n_values = self + .values() + .iter() + .filter(|v| matches!(v, DvvsValue::Value(_))) + .count() as i64; + let n_bytes = self + .values() + .iter() + .map(|v| match v { + DvvsValue::Deleted => 0, + DvvsValue::Value(v) => v.len() as i64, + }) + .sum(); + (n_entries, n_values, n_bytes) + } } impl DvvsEntry { @@ -175,7 +197,9 @@ impl Entry for K2VItem { } } -pub struct K2VItemTable {} +pub struct K2VItemTable { + pub(crate) counter_table: Arc>, +} #[derive(Clone, Copy, Debug, Serialize, Deserialize)] pub struct ItemFilter { @@ -191,8 +215,34 @@ impl TableSchema for K2VItemTable { type E = K2VItem; type Filter = ItemFilter; - fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) { - // nothing for now + fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { + let (old_entries, old_values, old_bytes) = match old { + None => (0, 0, 0), + Some(e) => e.stats(), + }; + let (new_entries, new_values, new_bytes) = match new { + None => (0, 0, 0), + Some(e) => e.stats(), + }; + + let count_pk = old + .map(|e| e.partition.bucket_id) + .unwrap_or_else(|| new.unwrap().partition.bucket_id); + let count_sk = old + .map(|e| &e.partition.partition_key) + .unwrap_or_else(|| &new.unwrap().partition.partition_key); + + if let Err(e) = self.counter_table.count( + &count_pk, + count_sk, + &[ + ("entries", new_entries - old_entries), + ("values", new_values - old_values), + ("bytes", new_bytes - old_bytes), + ], + ) { + error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); + } } #[allow(clippy::nonminimal_bool)] diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index d6531764..cfac965b 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,5 +1,6 @@ pub mod causality; +pub mod counter_table; pub mod item_table; pub mod rpc;