Integrate index counter with k2v item table
This commit is contained in:
parent
3a66cf4f7d
commit
e9b796356a
5 changed files with 99 additions and 15 deletions
|
@ -13,6 +13,7 @@ use garage_table::replication::TableFullReplication;
|
|||
use garage_table::replication::TableShardedReplication;
|
||||
use garage_table::*;
|
||||
|
||||
use crate::k2v::counter_table::*;
|
||||
use crate::k2v::item_table::*;
|
||||
use crate::k2v::rpc::*;
|
||||
use crate::s3::block_ref_table::*;
|
||||
|
@ -22,6 +23,7 @@ use crate::s3::version_table::*;
|
|||
use crate::bucket_alias_table::*;
|
||||
use crate::bucket_table::*;
|
||||
use crate::helper;
|
||||
use crate::index_counter::*;
|
||||
use crate::key_table::*;
|
||||
|
||||
/// An entire Garage full of data
|
||||
|
@ -54,6 +56,8 @@ pub struct Garage {
|
|||
|
||||
/// Table containing K2V items
|
||||
pub k2v_item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||
/// Indexing table containing K2V item counters
|
||||
pub k2v_counter_table: Arc<IndexCounter<K2VCounterTable>>,
|
||||
/// K2V RPC handler
|
||||
pub k2v_rpc: Arc<K2VRpcHandler>,
|
||||
}
|
||||
|
@ -154,7 +158,15 @@ impl Garage {
|
|||
);
|
||||
|
||||
// ---- K2V ----
|
||||
let k2v_item_table = Table::new(K2VItemTable {}, meta_rep_param, system.clone(), &db);
|
||||
let k2v_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
|
||||
let k2v_item_table = Table::new(
|
||||
K2VItemTable {
|
||||
counter_table: k2v_counter_table.clone(),
|
||||
},
|
||||
meta_rep_param,
|
||||
system.clone(),
|
||||
&db,
|
||||
);
|
||||
let k2v_rpc = K2VRpcHandler::new(system.clone(), k2v_item_table.clone());
|
||||
|
||||
info!("Initialize Garage...");
|
||||
|
@ -172,6 +184,7 @@ impl Garage {
|
|||
version_table,
|
||||
block_ref_table,
|
||||
k2v_item_table,
|
||||
k2v_counter_table,
|
||||
k2v_rpc,
|
||||
})
|
||||
}
|
||||
|
|
|
@ -4,8 +4,8 @@ use std::sync::Arc;
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_rpc::system::System;
|
||||
use garage_rpc::ring::Ring;
|
||||
use garage_rpc::system::System;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::*;
|
||||
|
||||
|
@ -42,13 +42,14 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
|
|||
}
|
||||
|
||||
impl<T: CounterSchema> CounterEntry<T> {
|
||||
pub fn filtered_values(&self, sys: System) -> HashMap<String, i64> {
|
||||
let ring: Arc<Ring> = sys.ring.borrow().clone();
|
||||
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
|
||||
let nodes = &ring.layout.node_id_vec;
|
||||
|
||||
|
||||
let mut ret = HashMap::new();
|
||||
for (name, vals) in self.values.iter() {
|
||||
let new_vals = vals.node_values.iter()
|
||||
let new_vals = vals
|
||||
.node_values
|
||||
.iter()
|
||||
.filter(|(n, _)| nodes.contains(n))
|
||||
.map(|(_, (_, v))| v)
|
||||
.collect::<Vec<_>>();
|
||||
|
@ -56,7 +57,7 @@ impl<T: CounterSchema> CounterEntry<T> {
|
|||
ret.insert(name.clone(), new_vals.iter().fold(i64::MIN, |a, b| a + *b));
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
@ -123,8 +124,12 @@ pub struct IndexCounter<T: CounterSchema> {
|
|||
}
|
||||
|
||||
impl<T: CounterSchema> IndexCounter<T> {
|
||||
pub fn new(system: Arc<System>, replication: TableShardedReplication, db: &sled::Db) -> Self {
|
||||
Self {
|
||||
pub fn new(
|
||||
system: Arc<System>,
|
||||
replication: TableShardedReplication,
|
||||
db: &sled::Db,
|
||||
) -> Arc<Self> {
|
||||
Arc::new(Self {
|
||||
this_node: system.id,
|
||||
local_counter: db
|
||||
.open_tree(format!("local_counter:{}", T::NAME))
|
||||
|
@ -137,10 +142,10 @@ impl<T: CounterSchema> IndexCounter<T> {
|
|||
system,
|
||||
db,
|
||||
),
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(String, i64)]) -> Result<(), Error> {
|
||||
pub fn count(&self, pk: &T::P, sk: &T::S, counts: &[(&str, i64)]) -> Result<(), Error> {
|
||||
let tree_key = self.table.data.tree_key(pk, sk);
|
||||
|
||||
let new_entry = self.local_counter.transaction(|tx| {
|
||||
|
@ -156,7 +161,7 @@ impl<T: CounterSchema> IndexCounter<T> {
|
|||
};
|
||||
|
||||
for (s, inc) in counts.iter() {
|
||||
let mut ent = entry.values.entry(s.clone()).or_insert((0, 0));
|
||||
let mut ent = entry.values.entry(s.to_string()).or_insert((0, 0));
|
||||
ent.0 += 1;
|
||||
ent.1 += *inc;
|
||||
}
|
||||
|
|
15
src/model/k2v/counter_table.rs
Normal file
15
src/model/k2v/counter_table.rs
Normal file
|
@ -0,0 +1,15 @@
|
|||
use garage_util::data::*;
|
||||
|
||||
use crate::index_counter::*;
|
||||
|
||||
#[derive(PartialEq, Clone)]
|
||||
pub struct K2VCounterTable;
|
||||
|
||||
impl CounterSchema for K2VCounterTable {
|
||||
const NAME: &'static str = "k2v_index_counter";
|
||||
|
||||
// Partition key = bucket id
|
||||
type P = Uuid;
|
||||
// Sort key = K2V item's partition key
|
||||
type S = String;
|
||||
}
|
|
@ -1,12 +1,15 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use garage_util::data::*;
|
||||
|
||||
use garage_table::crdt::*;
|
||||
use garage_table::*;
|
||||
|
||||
use crate::index_counter::*;
|
||||
use crate::k2v::causality::*;
|
||||
use crate::k2v::counter_table::*;
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct K2VItem {
|
||||
|
@ -105,6 +108,25 @@ impl K2VItem {
|
|||
ent.discard();
|
||||
}
|
||||
}
|
||||
|
||||
// returns counters: (non-deleted entries, non-tombstone values, bytes used)
|
||||
fn stats(&self) -> (i64, i64, i64) {
|
||||
let n_entries = if self.is_tombstone() { 0 } else { 1 };
|
||||
let n_values = self
|
||||
.values()
|
||||
.iter()
|
||||
.filter(|v| matches!(v, DvvsValue::Value(_)))
|
||||
.count() as i64;
|
||||
let n_bytes = self
|
||||
.values()
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
DvvsValue::Deleted => 0,
|
||||
DvvsValue::Value(v) => v.len() as i64,
|
||||
})
|
||||
.sum();
|
||||
(n_entries, n_values, n_bytes)
|
||||
}
|
||||
}
|
||||
|
||||
impl DvvsEntry {
|
||||
|
@ -175,7 +197,9 @@ impl Entry<K2VItemPartition, String> for K2VItem {
|
|||
}
|
||||
}
|
||||
|
||||
pub struct K2VItemTable {}
|
||||
pub struct K2VItemTable {
|
||||
pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub struct ItemFilter {
|
||||
|
@ -191,8 +215,34 @@ impl TableSchema for K2VItemTable {
|
|||
type E = K2VItem;
|
||||
type Filter = ItemFilter;
|
||||
|
||||
fn updated(&self, _old: Option<&Self::E>, _new: Option<&Self::E>) {
|
||||
// nothing for now
|
||||
fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
|
||||
let (old_entries, old_values, old_bytes) = match old {
|
||||
None => (0, 0, 0),
|
||||
Some(e) => e.stats(),
|
||||
};
|
||||
let (new_entries, new_values, new_bytes) = match new {
|
||||
None => (0, 0, 0),
|
||||
Some(e) => e.stats(),
|
||||
};
|
||||
|
||||
let count_pk = old
|
||||
.map(|e| e.partition.bucket_id)
|
||||
.unwrap_or_else(|| new.unwrap().partition.bucket_id);
|
||||
let count_sk = old
|
||||
.map(|e| &e.partition.partition_key)
|
||||
.unwrap_or_else(|| &new.unwrap().partition.partition_key);
|
||||
|
||||
if let Err(e) = self.counter_table.count(
|
||||
&count_pk,
|
||||
count_sk,
|
||||
&[
|
||||
("entries", new_entries - old_entries),
|
||||
("values", new_values - old_values),
|
||||
("bytes", new_bytes - old_bytes),
|
||||
],
|
||||
) {
|
||||
error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);
|
||||
}
|
||||
}
|
||||
|
||||
#[allow(clippy::nonminimal_bool)]
|
||||
|
|
|
@ -1,5 +1,6 @@
|
|||
pub mod causality;
|
||||
|
||||
pub mod counter_table;
|
||||
pub mod item_table;
|
||||
|
||||
pub mod rpc;
|
||||
|
|
Loading…
Add table
Reference in a new issue