diff --git a/src/api/k2v/index.rs b/src/api/k2v/index.rs
index d5db906d..210950bf 100644
--- a/src/api/k2v/index.rs
+++ b/src/api/k2v/index.rs
@@ -10,7 +10,7 @@ use garage_rpc::ring::Ring;
use garage_table::util::*;
use garage_model::garage::Garage;
-use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
+use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::k2v::error::*;
use crate::k2v::range::read_range;
diff --git a/src/model/garage.rs b/src/model/garage.rs
index 280f3dc7..106ff858 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -27,7 +27,7 @@ use crate::key_table::*;
#[cfg(feature = "k2v")]
use crate::index_counter::*;
#[cfg(feature = "k2v")]
-use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
+use crate::k2v::{item_table::*, poll::*, rpc::*};
/// An entire Garage full of data
pub struct Garage {
@@ -66,7 +66,7 @@ pub struct GarageK2V {
/// Table containing K2V items
pub item_table: Arc
>,
/// Indexing table containing K2V item counters
- pub counter_table: Arc>,
+ pub counter_table: Arc>,
/// K2V RPC handler
pub rpc: Arc,
}
diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs
index 2602d5d9..109b9828 100644
--- a/src/model/index_counter.rs
+++ b/src/model/index_counter.rs
@@ -17,25 +17,30 @@ use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
-pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
- const NAME: &'static str;
- type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
- type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
+ const COUNTER_TABLE_NAME: &'static str;
+
+ type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+ type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
+
+ fn counter_partition_key(&self) -> &Self::CP;
+ fn counter_sort_key(&self) -> &Self::CS;
+ fn counts(&self) -> Vec<(&'static str, i64)>;
}
/// A counter entry in the global table
-#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
-pub struct CounterEntry {
- pub pk: T::P,
- pub sk: T::S,
+#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
+pub struct CounterEntry {
+ pub pk: T::CP,
+ pub sk: T::CS,
pub values: BTreeMap,
}
-impl Entry for CounterEntry {
- fn partition_key(&self) -> &T::P {
+impl Entry for CounterEntry {
+ fn partition_key(&self) -> &T::CP {
&self.pk
}
- fn sort_key(&self) -> &T::S {
+ fn sort_key(&self) -> &T::CS {
&self.sk
}
fn is_tombstone(&self) -> bool {
@@ -45,7 +50,7 @@ impl Entry for CounterEntry {
}
}
-impl CounterEntry {
+impl CounterEntry {
pub fn filtered_values(&self, ring: &Ring) -> HashMap {
let nodes = &ring.layout.node_id_vec[..];
self.filtered_values_with_nodes(nodes)
@@ -78,7 +83,7 @@ pub struct CounterValue {
pub node_values: BTreeMap,
}
-impl Crdt for CounterEntry {
+impl Crdt for CounterEntry {
fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() {
if let Some(e) = self.values.get_mut(name) {
@@ -104,15 +109,15 @@ impl Crdt for CounterValue {
}
}
-pub struct CounterTable {
+pub struct CounterTable {
_phantom_t: PhantomData,
}
-impl TableSchema for CounterTable {
- const TABLE_NAME: &'static str = T::NAME;
+impl TableSchema for CounterTable {
+ const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
- type P = T::P;
- type S = T::S;
+ type P = T::CP;
+ type S = T::CS;
type E = CounterEntry;
type Filter = (DeletedFilter, Vec);
@@ -131,14 +136,14 @@ impl TableSchema for CounterTable {
// ----
-pub struct IndexCounter {
+pub struct IndexCounter {
this_node: Uuid,
local_counter: db::Tree,
- propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
+ propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>,
pub table: Arc, TableShardedReplication>>,
}
-impl IndexCounter {
+impl IndexCounter {
pub fn new(
system: Arc,
replication: TableShardedReplication,
@@ -151,7 +156,7 @@ impl IndexCounter {
let this = Arc::new(Self {
this_node: system.id,
local_counter: db
- .open_tree(format!("local_counter:{}", T::NAME))
+ .open_tree(format!("local_counter:{}", T::COUNTER_TABLE_NAME))
.expect("Unable to open local counter tree"),
propagate_tx,
table: Table::new(
@@ -166,7 +171,7 @@ impl IndexCounter {
let this2 = this.clone();
background.spawn_worker(
- format!("{} index counter propagator", T::NAME),
+ format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
this
@@ -175,10 +180,26 @@ impl IndexCounter {
pub fn count(
&self,
tx: &mut db::Transaction,
- pk: &T::P,
- sk: &T::S,
- counts: &[(&str, i64)],
+ old: Option<&T>,
+ new: Option<&T>,
) -> db::TxResult<(), Error> {
+ let pk = old
+ .map(|e| e.counter_partition_key())
+ .unwrap_or_else(|| new.unwrap().counter_partition_key());
+ let sk = old
+ .map(|e| e.counter_sort_key())
+ .unwrap_or_else(|| new.unwrap().counter_sort_key());
+
+ // calculate counter differences
+ let mut counts = HashMap::new();
+ for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) -= v;
+ }
+ for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
+ *counts.entry(k).or_insert(0) += v;
+ }
+
+ // update local counter table
let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
@@ -213,7 +234,7 @@ impl IndexCounter {
async fn propagate_loop(
self: Arc,
- mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
+ mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>,
must_exit: watch::Receiver,
) {
// This loop batches updates to counters to be sent all at once.
@@ -255,10 +276,10 @@ impl IndexCounter {
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
- error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
+ error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
- warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
+ warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
@@ -280,11 +301,11 @@ struct LocalCounterEntry {
}
impl LocalCounterEntry {
- fn into_counter_entry(
+ fn into_counter_entry(
self,
this_node: Uuid,
- pk: T::P,
- sk: T::S,
+ pk: T::CP,
+ sk: T::CS,
) -> CounterEntry {
CounterEntry {
pk,
diff --git a/src/model/k2v/counter_table.rs b/src/model/k2v/counter_table.rs
deleted file mode 100644
index 4856eb2b..00000000
--- a/src/model/k2v/counter_table.rs
+++ /dev/null
@@ -1,20 +0,0 @@
-use garage_util::data::*;
-
-use crate::index_counter::*;
-
-pub const ENTRIES: &str = "entries";
-pub const CONFLICTS: &str = "conflicts";
-pub const VALUES: &str = "values";
-pub const BYTES: &str = "bytes";
-
-#[derive(PartialEq, Clone)]
-pub struct K2VCounterTable;
-
-impl CounterSchema for K2VCounterTable {
- const NAME: &'static str = "k2v_index_counter";
-
- // Partition key = bucket id
- type P = Uuid;
- // Sort key = K2V item's partition key
- type S = String;
-}
diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs
index 991fe66d..d02bdd26 100644
--- a/src/model/k2v/item_table.rs
+++ b/src/model/k2v/item_table.rs
@@ -10,9 +10,13 @@ use garage_table::*;
use crate::index_counter::*;
use crate::k2v::causality::*;
-use crate::k2v::counter_table::*;
use crate::k2v::poll::*;
+pub const ENTRIES: &str = "entries";
+pub const CONFLICTS: &str = "conflicts";
+pub const VALUES: &str = "values";
+pub const BYTES: &str = "bytes";
+
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
@@ -112,27 +116,6 @@ impl K2VItem {
ent.discard();
}
}
-
- // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
- fn stats(&self) -> (i64, i64, i64, i64) {
- let values = self.values();
-
- let n_entries = if self.is_tombstone() { 0 } else { 1 };
- let n_conflicts = if values.len() > 1 { 1 } else { 0 };
- let n_values = values
- .iter()
- .filter(|v| matches!(v, DvvsValue::Value(_)))
- .count() as i64;
- let n_bytes = values
- .iter()
- .map(|v| match v {
- DvvsValue::Deleted => 0,
- DvvsValue::Value(v) => v.len() as i64,
- })
- .sum();
-
- (n_entries, n_conflicts, n_values, n_bytes)
- }
}
impl DvvsEntry {
@@ -204,7 +187,7 @@ impl Entry for K2VItem {
}
pub struct K2VItemTable {
- pub(crate) counter_table: Arc>,
+ pub(crate) counter_table: Arc>,
pub(crate) subscriptions: Arc,
}
@@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable {
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
// 1. Count
- let (old_entries, old_conflicts, old_values, old_bytes) = match old {
- None => (0, 0, 0, 0),
- Some(e) => e.stats(),
- };
- let (new_entries, new_conflicts, new_values, new_bytes) = match new {
- None => (0, 0, 0, 0),
- Some(e) => e.stats(),
- };
-
- let count_pk = old
- .map(|e| e.partition.bucket_id)
- .unwrap_or_else(|| new.unwrap().partition.bucket_id);
- let count_sk = old
- .map(|e| &e.partition.partition_key)
- .unwrap_or_else(|| &new.unwrap().partition.partition_key);
-
- let counter_res = self.counter_table.count(
- tx,
- &count_pk,
- count_sk,
- &[
- (ENTRIES, new_entries - old_entries),
- (CONFLICTS, new_conflicts - old_conflicts),
- (VALUES, new_values - old_values),
- (BYTES, new_bytes - old_bytes),
- ],
- );
+ let counter_res = self.counter_table.count(tx, old, new);
if let Err(e) = db::unabort(counter_res)? {
// This result can be returned by `counter_table.count()` for instance
// if messagepack serialization or deserialization fails at some step.
// Warn admin but ignore this error for now, that's all we can do.
error!(
- "Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
- count_pk, count_sk, e
+ "Unable to update K2V item counter: {}. Index values will be wrong!",
+ e
);
}
@@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable {
}
}
+impl CountedItem for K2VItem {
+ const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = K2V item's partition key
+ type CS = String;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.partition.bucket_id
+ }
+ fn counter_sort_key(&self) -> &String {
+ &self.partition.partition_key
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let values = self.values();
+
+ let n_entries = if self.is_tombstone() { 0 } else { 1 };
+ let n_conflicts = if values.len() > 1 { 1 } else { 0 };
+ let n_values = values
+ .iter()
+ .filter(|v| matches!(v, DvvsValue::Value(_)))
+ .count() as i64;
+ let n_bytes = values
+ .iter()
+ .map(|v| match v {
+ DvvsValue::Deleted => 0,
+ DvvsValue::Value(v) => v.len() as i64,
+ })
+ .sum();
+
+ vec![
+ (ENTRIES, n_entries),
+ (CONFLICTS, n_conflicts),
+ (VALUES, n_values),
+ (BYTES, n_bytes),
+ ]
+ }
+}
+
#[cfg(test)]
mod tests {
use super::*;
diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs
index 664172a6..f6a96151 100644
--- a/src/model/k2v/mod.rs
+++ b/src/model/k2v/mod.rs
@@ -1,6 +1,5 @@
pub mod causality;
-pub mod counter_table;
pub mod item_table;
pub mod poll;