use core::ops::Bound; use std::collections::{BTreeMap, HashMap}; use std::marker::PhantomData; use std::sync::Arc; use serde::{Deserialize, Serialize}; use garage_db as db; use garage_rpc::ring::Ring; use garage_rpc::system::System; use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::*; use garage_util::time::*; use garage_table::crdt::*; use garage_table::replication::*; use garage_table::*; pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static { const COUNTER_TABLE_NAME: &'static str; type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; fn counter_partition_key(&self) -> &Self::CP; fn counter_sort_key(&self) -> &Self::CS; fn counts(&self) -> Vec<(&'static str, i64)>; } /// A counter entry in the global table #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] pub struct CounterEntry { pub pk: T::CP, pub sk: T::CS, pub values: BTreeMap, } impl Entry for CounterEntry { fn partition_key(&self) -> &T::CP { &self.pk } fn sort_key(&self) -> &T::CS { &self.sk } fn is_tombstone(&self) -> bool { self.values .iter() .all(|(_, v)| v.node_values.iter().all(|(_, (_, v))| *v == 0)) } } impl CounterEntry { pub fn filtered_values(&self, ring: &Ring) -> HashMap { let nodes = &ring.layout.node_id_vec[..]; self.filtered_values_with_nodes(nodes) } pub fn filtered_values_with_nodes(&self, nodes: &[Uuid]) -> HashMap { let mut ret = HashMap::new(); for (name, vals) in self.values.iter() { let new_vals = vals .node_values .iter() .filter(|(n, _)| nodes.contains(n)) .map(|(_, (_, v))| *v) .collect::>(); if !new_vals.is_empty() { ret.insert( name.clone(), new_vals.iter().fold(i64::MIN, |a, b| std::cmp::max(a, *b)), ); } } ret } } /// A counter entry in the global table #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] pub struct CounterValue { pub node_values: BTreeMap, } impl Crdt for CounterEntry { fn merge(&mut self, other: &Self) { for (name, e2) in other.values.iter() { if let Some(e) = self.values.get_mut(name) { e.merge(e2); } else { self.values.insert(name.clone(), e2.clone()); } } } } impl Crdt for CounterValue { fn merge(&mut self, other: &Self) { for (node, (t2, e2)) in other.node_values.iter() { if let Some((t, e)) = self.node_values.get_mut(node) { if t2 > t { *e = *e2; } } else { self.node_values.insert(*node, (*t2, *e2)); } } } } pub struct CounterTable { _phantom_t: PhantomData, } impl TableSchema for CounterTable { const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME; type P = T::CP; type S = T::CS; type E = CounterEntry; type Filter = (DeletedFilter, Vec); fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { if filter.0 == DeletedFilter::Any { return true; } let is_tombstone = entry .filtered_values_with_nodes(&filter.1[..]) .iter() .all(|(_, v)| *v == 0); filter.0.apply(is_tombstone) } } // ---- pub struct IndexCounter { this_node: Uuid, local_counter: db::Tree, pub table: Arc, TableShardedReplication>>, } impl IndexCounter { pub fn new( system: Arc, replication: TableShardedReplication, db: &db::Db, ) -> Arc { Arc::new(Self { this_node: system.id, local_counter: db .open_tree(format!("local_counter_v2:{}", T::COUNTER_TABLE_NAME)) .expect("Unable to open local counter tree"), table: Table::new( CounterTable { _phantom_t: Default::default(), }, replication, system, db, ), }) } pub fn spawn_workers(&self, bg: &BackgroundRunner) { self.table.spawn_workers(bg); } pub fn count( &self, tx: &mut db::Transaction, old: Option<&T>, new: Option<&T>, ) -> db::TxResult<(), Error> { let pk = old .map(|e| e.counter_partition_key()) .unwrap_or_else(|| new.unwrap().counter_partition_key()); let sk = old .map(|e| e.counter_sort_key()) .unwrap_or_else(|| new.unwrap().counter_sort_key()); // calculate counter differences let mut counts = HashMap::new(); for (k, v) in old.map(|x| x.counts()).unwrap_or_default() { *counts.entry(k).or_insert(0) -= v; } for (k, v) in new.map(|x| x.counts()).unwrap_or_default() { *counts.entry(k).or_insert(0) += v; } // update local counter table let tree_key = self.table.data.tree_key(pk, sk); let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { Some(old_bytes) => { rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes) .map_err(Error::RmpDecode) .map_err(db::TxError::Abort)? } None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), values: BTreeMap::new(), }, }; let now = now_msec(); for (s, inc) in counts.iter() { let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0)); ent.0 = std::cmp::max(ent.0 + 1, now); ent.1 += *inc; } let new_entry_bytes = rmp_to_vec_all_named(&entry) .map_err(Error::RmpEncode) .map_err(db::TxError::Abort)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; let dist_entry = entry.into_counter_entry(self.this_node); self.table.queue_insert(tx, &dist_entry)?; Ok(()) } pub fn offline_recount_all( &self, counted_table: &Arc>, ) -> Result<(), Error> where TS: TableSchema, TR: TableReplication, { // 1. Set all old local counters to zero let now = now_msec(); let mut next_start: Option> = None; loop { let low_bound = match next_start.take() { Some(v) => Bound::Excluded(v), None => Bound::Unbounded, }; let mut batch = vec![]; for item in self.local_counter.range((low_bound, Bound::Unbounded))? { batch.push(item?); if batch.len() > 1000 { break; } } if batch.is_empty() { break; } info!("zeroing old counters... ({})", hex::encode(&batch[0].0)); for (local_counter_k, local_counter) in batch { let mut local_counter = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&local_counter)?; for (_, tv) in local_counter.values.iter_mut() { tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 = 0; } let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; self.local_counter .insert(&local_counter_k, &local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); self.local_counter .db() .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(local_counter_k); } } // 2. Recount all table entries let now = now_msec(); let mut next_start: Option> = None; loop { let low_bound = match next_start.take() { Some(v) => Bound::Excluded(v), None => Bound::Unbounded, }; let mut batch = vec![]; for item in counted_table .data .store .range((low_bound, Bound::Unbounded))? { batch.push(item?); if batch.len() > 1000 { break; } } if batch.is_empty() { break; } info!("counting entries... ({})", hex::encode(&batch[0].0)); for (counted_entry_k, counted_entry) in batch { let counted_entry = counted_table.data.decode_entry(&counted_entry)?; let pk = counted_entry.counter_partition_key(); let sk = counted_entry.counter_sort_key(); let counts = counted_entry.counts(); let local_counter_key = self.table.data.tree_key(pk, sk); let mut local_counter = match self.local_counter.get(&local_counter_key)? { Some(old_bytes) => { let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>( &old_bytes, )?; assert!(ent.pk == *pk); assert!(ent.sk == *sk); ent } None => LocalCounterEntry { pk: pk.clone(), sk: sk.clone(), values: BTreeMap::new(), }, }; for (s, v) in counts.iter() { let mut tv = local_counter.values.entry(s.to_string()).or_insert((0, 0)); tv.0 = std::cmp::max(tv.0 + 1, now); tv.1 += v; } let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; self.local_counter .insert(&local_counter_key, local_counter_bytes)?; let counter_entry = local_counter.into_counter_entry(self.this_node); self.local_counter .db() .transaction(|mut tx| self.table.queue_insert(&mut tx, &counter_entry))?; next_start = Some(counted_entry_k); } } // Done Ok(()) } } // ---- #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct LocalCounterEntry { pk: T::CP, sk: T::CS, values: BTreeMap, } impl LocalCounterEntry { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry { CounterEntry { pk: self.pk, sk: self.sk, values: self .values .into_iter() .map(|(name, (ts, v))| { let mut node_values = BTreeMap::new(); node_values.insert(this_node, (ts, v)); (name, CounterValue { node_values }) }) .collect(), } } }