Some refactoring of the index counter API
This commit is contained in:
parent
b44d3fc796
commit
425fe56be8
6 changed files with 106 additions and 108 deletions
|
@ -10,7 +10,7 @@ use garage_rpc::ring::Ring;
|
||||||
use garage_table::util::*;
|
use garage_table::util::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
|
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
|
||||||
|
|
||||||
use crate::k2v::error::*;
|
use crate::k2v::error::*;
|
||||||
use crate::k2v::range::read_range;
|
use crate::k2v::range::read_range;
|
||||||
|
|
|
@ -27,7 +27,7 @@ use crate::key_table::*;
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
use crate::index_counter::*;
|
use crate::index_counter::*;
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
use crate::k2v::{counter_table::*, item_table::*, poll::*, rpc::*};
|
use crate::k2v::{item_table::*, poll::*, rpc::*};
|
||||||
|
|
||||||
/// An entire Garage full of data
|
/// An entire Garage full of data
|
||||||
pub struct Garage {
|
pub struct Garage {
|
||||||
|
@ -66,7 +66,7 @@ pub struct GarageK2V {
|
||||||
/// Table containing K2V items
|
/// Table containing K2V items
|
||||||
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
pub item_table: Arc<Table<K2VItemTable, TableShardedReplication>>,
|
||||||
/// Indexing table containing K2V item counters
|
/// Indexing table containing K2V item counters
|
||||||
pub counter_table: Arc<IndexCounter<K2VCounterTable>>,
|
pub counter_table: Arc<IndexCounter<K2VItem>>,
|
||||||
/// K2V RPC handler
|
/// K2V RPC handler
|
||||||
pub rpc: Arc<K2VRpcHandler>,
|
pub rpc: Arc<K2VRpcHandler>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,25 +17,30 @@ use garage_table::crdt::*;
|
||||||
use garage_table::replication::TableShardedReplication;
|
use garage_table::replication::TableShardedReplication;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
pub trait CounterSchema: Clone + PartialEq + Send + Sync + 'static {
|
pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
|
||||||
const NAME: &'static str;
|
const COUNTER_TABLE_NAME: &'static str;
|
||||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
|
||||||
type S: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type CP: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
type CS: SortKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
|
||||||
|
fn counter_partition_key(&self) -> &Self::CP;
|
||||||
|
fn counter_sort_key(&self) -> &Self::CS;
|
||||||
|
fn counts(&self) -> Vec<(&'static str, i64)>;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A counter entry in the global table
|
/// A counter entry in the global table
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
|
||||||
pub struct CounterEntry<T: CounterSchema> {
|
pub struct CounterEntry<T: CountedItem> {
|
||||||
pub pk: T::P,
|
pub pk: T::CP,
|
||||||
pub sk: T::S,
|
pub sk: T::CS,
|
||||||
pub values: BTreeMap<String, CounterValue>,
|
pub values: BTreeMap<String, CounterValue>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
|
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
|
||||||
fn partition_key(&self) -> &T::P {
|
fn partition_key(&self) -> &T::CP {
|
||||||
&self.pk
|
&self.pk
|
||||||
}
|
}
|
||||||
fn sort_key(&self) -> &T::S {
|
fn sort_key(&self) -> &T::CS {
|
||||||
&self.sk
|
&self.sk
|
||||||
}
|
}
|
||||||
fn is_tombstone(&self) -> bool {
|
fn is_tombstone(&self) -> bool {
|
||||||
|
@ -45,7 +50,7 @@ impl<T: CounterSchema> Entry<T::P, T::S> for CounterEntry<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: CounterSchema> CounterEntry<T> {
|
impl<T: CountedItem> CounterEntry<T> {
|
||||||
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
|
pub fn filtered_values(&self, ring: &Ring) -> HashMap<String, i64> {
|
||||||
let nodes = &ring.layout.node_id_vec[..];
|
let nodes = &ring.layout.node_id_vec[..];
|
||||||
self.filtered_values_with_nodes(nodes)
|
self.filtered_values_with_nodes(nodes)
|
||||||
|
@ -78,7 +83,7 @@ pub struct CounterValue {
|
||||||
pub node_values: BTreeMap<Uuid, (u64, i64)>,
|
pub node_values: BTreeMap<Uuid, (u64, i64)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: CounterSchema> Crdt for CounterEntry<T> {
|
impl<T: CountedItem> Crdt for CounterEntry<T> {
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
for (name, e2) in other.values.iter() {
|
for (name, e2) in other.values.iter() {
|
||||||
if let Some(e) = self.values.get_mut(name) {
|
if let Some(e) = self.values.get_mut(name) {
|
||||||
|
@ -104,15 +109,15 @@ impl Crdt for CounterValue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct CounterTable<T: CounterSchema> {
|
pub struct CounterTable<T: CountedItem> {
|
||||||
_phantom_t: PhantomData<T>,
|
_phantom_t: PhantomData<T>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: CounterSchema> TableSchema for CounterTable<T> {
|
impl<T: CountedItem> TableSchema for CounterTable<T> {
|
||||||
const TABLE_NAME: &'static str = T::NAME;
|
const TABLE_NAME: &'static str = T::COUNTER_TABLE_NAME;
|
||||||
|
|
||||||
type P = T::P;
|
type P = T::CP;
|
||||||
type S = T::S;
|
type S = T::CS;
|
||||||
type E = CounterEntry<T>;
|
type E = CounterEntry<T>;
|
||||||
type Filter = (DeletedFilter, Vec<Uuid>);
|
type Filter = (DeletedFilter, Vec<Uuid>);
|
||||||
|
|
||||||
|
@ -131,14 +136,14 @@ impl<T: CounterSchema> TableSchema for CounterTable<T> {
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
pub struct IndexCounter<T: CounterSchema> {
|
pub struct IndexCounter<T: CountedItem> {
|
||||||
this_node: Uuid,
|
this_node: Uuid,
|
||||||
local_counter: db::Tree,
|
local_counter: db::Tree,
|
||||||
propagate_tx: mpsc::UnboundedSender<(T::P, T::S, LocalCounterEntry)>,
|
propagate_tx: mpsc::UnboundedSender<(T::CP, T::CS, LocalCounterEntry)>,
|
||||||
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
|
pub table: Arc<Table<CounterTable<T>, TableShardedReplication>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: CounterSchema> IndexCounter<T> {
|
impl<T: CountedItem> IndexCounter<T> {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
system: Arc<System>,
|
system: Arc<System>,
|
||||||
replication: TableShardedReplication,
|
replication: TableShardedReplication,
|
||||||
|
@ -151,7 +156,7 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
let this = Arc::new(Self {
|
let this = Arc::new(Self {
|
||||||
this_node: system.id,
|
this_node: system.id,
|
||||||
local_counter: db
|
local_counter: db
|
||||||
.open_tree(format!("local_counter:{}", T::NAME))
|
.open_tree(format!("local_counter:{}", T::COUNTER_TABLE_NAME))
|
||||||
.expect("Unable to open local counter tree"),
|
.expect("Unable to open local counter tree"),
|
||||||
propagate_tx,
|
propagate_tx,
|
||||||
table: Table::new(
|
table: Table::new(
|
||||||
|
@ -166,7 +171,7 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
|
|
||||||
let this2 = this.clone();
|
let this2 = this.clone();
|
||||||
background.spawn_worker(
|
background.spawn_worker(
|
||||||
format!("{} index counter propagator", T::NAME),
|
format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
|
||||||
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
|
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
|
||||||
);
|
);
|
||||||
this
|
this
|
||||||
|
@ -175,10 +180,26 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
pub fn count(
|
pub fn count(
|
||||||
&self,
|
&self,
|
||||||
tx: &mut db::Transaction,
|
tx: &mut db::Transaction,
|
||||||
pk: &T::P,
|
old: Option<&T>,
|
||||||
sk: &T::S,
|
new: Option<&T>,
|
||||||
counts: &[(&str, i64)],
|
|
||||||
) -> db::TxResult<(), Error> {
|
) -> db::TxResult<(), Error> {
|
||||||
|
let pk = old
|
||||||
|
.map(|e| e.counter_partition_key())
|
||||||
|
.unwrap_or_else(|| new.unwrap().counter_partition_key());
|
||||||
|
let sk = old
|
||||||
|
.map(|e| e.counter_sort_key())
|
||||||
|
.unwrap_or_else(|| new.unwrap().counter_sort_key());
|
||||||
|
|
||||||
|
// calculate counter differences
|
||||||
|
let mut counts = HashMap::new();
|
||||||
|
for (k, v) in old.map(|x| x.counts()).unwrap_or_default() {
|
||||||
|
*counts.entry(k).or_insert(0) -= v;
|
||||||
|
}
|
||||||
|
for (k, v) in new.map(|x| x.counts()).unwrap_or_default() {
|
||||||
|
*counts.entry(k).or_insert(0) += v;
|
||||||
|
}
|
||||||
|
|
||||||
|
// update local counter table
|
||||||
let tree_key = self.table.data.tree_key(pk, sk);
|
let tree_key = self.table.data.tree_key(pk, sk);
|
||||||
|
|
||||||
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
|
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
|
||||||
|
@ -213,7 +234,7 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
|
|
||||||
async fn propagate_loop(
|
async fn propagate_loop(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut propagate_rx: mpsc::UnboundedReceiver<(T::P, T::S, LocalCounterEntry)>,
|
mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry)>,
|
||||||
must_exit: watch::Receiver<bool>,
|
must_exit: watch::Receiver<bool>,
|
||||||
) {
|
) {
|
||||||
// This loop batches updates to counters to be sent all at once.
|
// This loop batches updates to counters to be sent all at once.
|
||||||
|
@ -255,10 +276,10 @@ impl<T: CounterSchema> IndexCounter<T> {
|
||||||
if let Err(e) = self.table.insert_many(entries).await {
|
if let Err(e) = self.table.insert_many(entries).await {
|
||||||
errors += 1;
|
errors += 1;
|
||||||
if errors >= 2 && *must_exit.borrow() {
|
if errors >= 2 && *must_exit.borrow() {
|
||||||
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::NAME, buf.len(), e);
|
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::NAME, buf.len(), e, errors);
|
warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
|
||||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -280,11 +301,11 @@ struct LocalCounterEntry {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LocalCounterEntry {
|
impl LocalCounterEntry {
|
||||||
fn into_counter_entry<T: CounterSchema>(
|
fn into_counter_entry<T: CountedItem>(
|
||||||
self,
|
self,
|
||||||
this_node: Uuid,
|
this_node: Uuid,
|
||||||
pk: T::P,
|
pk: T::CP,
|
||||||
sk: T::S,
|
sk: T::CS,
|
||||||
) -> CounterEntry<T> {
|
) -> CounterEntry<T> {
|
||||||
CounterEntry {
|
CounterEntry {
|
||||||
pk,
|
pk,
|
||||||
|
|
|
@ -1,20 +0,0 @@
|
||||||
use garage_util::data::*;
|
|
||||||
|
|
||||||
use crate::index_counter::*;
|
|
||||||
|
|
||||||
pub const ENTRIES: &str = "entries";
|
|
||||||
pub const CONFLICTS: &str = "conflicts";
|
|
||||||
pub const VALUES: &str = "values";
|
|
||||||
pub const BYTES: &str = "bytes";
|
|
||||||
|
|
||||||
#[derive(PartialEq, Clone)]
|
|
||||||
pub struct K2VCounterTable;
|
|
||||||
|
|
||||||
impl CounterSchema for K2VCounterTable {
|
|
||||||
const NAME: &'static str = "k2v_index_counter";
|
|
||||||
|
|
||||||
// Partition key = bucket id
|
|
||||||
type P = Uuid;
|
|
||||||
// Sort key = K2V item's partition key
|
|
||||||
type S = String;
|
|
||||||
}
|
|
|
@ -10,9 +10,13 @@ use garage_table::*;
|
||||||
|
|
||||||
use crate::index_counter::*;
|
use crate::index_counter::*;
|
||||||
use crate::k2v::causality::*;
|
use crate::k2v::causality::*;
|
||||||
use crate::k2v::counter_table::*;
|
|
||||||
use crate::k2v::poll::*;
|
use crate::k2v::poll::*;
|
||||||
|
|
||||||
|
pub const ENTRIES: &str = "entries";
|
||||||
|
pub const CONFLICTS: &str = "conflicts";
|
||||||
|
pub const VALUES: &str = "values";
|
||||||
|
pub const BYTES: &str = "bytes";
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct K2VItem {
|
pub struct K2VItem {
|
||||||
pub partition: K2VItemPartition,
|
pub partition: K2VItemPartition,
|
||||||
|
@ -112,27 +116,6 @@ impl K2VItem {
|
||||||
ent.discard();
|
ent.discard();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
|
|
||||||
fn stats(&self) -> (i64, i64, i64, i64) {
|
|
||||||
let values = self.values();
|
|
||||||
|
|
||||||
let n_entries = if self.is_tombstone() { 0 } else { 1 };
|
|
||||||
let n_conflicts = if values.len() > 1 { 1 } else { 0 };
|
|
||||||
let n_values = values
|
|
||||||
.iter()
|
|
||||||
.filter(|v| matches!(v, DvvsValue::Value(_)))
|
|
||||||
.count() as i64;
|
|
||||||
let n_bytes = values
|
|
||||||
.iter()
|
|
||||||
.map(|v| match v {
|
|
||||||
DvvsValue::Deleted => 0,
|
|
||||||
DvvsValue::Value(v) => v.len() as i64,
|
|
||||||
})
|
|
||||||
.sum();
|
|
||||||
|
|
||||||
(n_entries, n_conflicts, n_values, n_bytes)
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DvvsEntry {
|
impl DvvsEntry {
|
||||||
|
@ -204,7 +187,7 @@ impl Entry<K2VItemPartition, String> for K2VItem {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct K2VItemTable {
|
pub struct K2VItemTable {
|
||||||
pub(crate) counter_table: Arc<IndexCounter<K2VCounterTable>>,
|
pub(crate) counter_table: Arc<IndexCounter<K2VItem>>,
|
||||||
pub(crate) subscriptions: Arc<SubscriptionManager>,
|
pub(crate) subscriptions: Arc<SubscriptionManager>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -229,40 +212,14 @@ impl TableSchema for K2VItemTable {
|
||||||
new: Option<&Self::E>,
|
new: Option<&Self::E>,
|
||||||
) -> db::TxOpResult<()> {
|
) -> db::TxOpResult<()> {
|
||||||
// 1. Count
|
// 1. Count
|
||||||
let (old_entries, old_conflicts, old_values, old_bytes) = match old {
|
let counter_res = self.counter_table.count(tx, old, new);
|
||||||
None => (0, 0, 0, 0),
|
|
||||||
Some(e) => e.stats(),
|
|
||||||
};
|
|
||||||
let (new_entries, new_conflicts, new_values, new_bytes) = match new {
|
|
||||||
None => (0, 0, 0, 0),
|
|
||||||
Some(e) => e.stats(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let count_pk = old
|
|
||||||
.map(|e| e.partition.bucket_id)
|
|
||||||
.unwrap_or_else(|| new.unwrap().partition.bucket_id);
|
|
||||||
let count_sk = old
|
|
||||||
.map(|e| &e.partition.partition_key)
|
|
||||||
.unwrap_or_else(|| &new.unwrap().partition.partition_key);
|
|
||||||
|
|
||||||
let counter_res = self.counter_table.count(
|
|
||||||
tx,
|
|
||||||
&count_pk,
|
|
||||||
count_sk,
|
|
||||||
&[
|
|
||||||
(ENTRIES, new_entries - old_entries),
|
|
||||||
(CONFLICTS, new_conflicts - old_conflicts),
|
|
||||||
(VALUES, new_values - old_values),
|
|
||||||
(BYTES, new_bytes - old_bytes),
|
|
||||||
],
|
|
||||||
);
|
|
||||||
if let Err(e) = db::unabort(counter_res)? {
|
if let Err(e) = db::unabort(counter_res)? {
|
||||||
// This result can be returned by `counter_table.count()` for instance
|
// This result can be returned by `counter_table.count()` for instance
|
||||||
// if messagepack serialization or deserialization fails at some step.
|
// if messagepack serialization or deserialization fails at some step.
|
||||||
// Warn admin but ignore this error for now, that's all we can do.
|
// Warn admin but ignore this error for now, that's all we can do.
|
||||||
error!(
|
error!(
|
||||||
"Unable to update K2V item counter for bucket {:?} partition {}: {}. Index values will be wrong!",
|
"Unable to update K2V item counter: {}. Index values will be wrong!",
|
||||||
count_pk, count_sk, e
|
e
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -282,6 +239,47 @@ impl TableSchema for K2VItemTable {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl CountedItem for K2VItem {
|
||||||
|
const COUNTER_TABLE_NAME: &'static str = "k2v_index_counter";
|
||||||
|
|
||||||
|
// Partition key = bucket id
|
||||||
|
type CP = Uuid;
|
||||||
|
// Sort key = K2V item's partition key
|
||||||
|
type CS = String;
|
||||||
|
|
||||||
|
fn counter_partition_key(&self) -> &Uuid {
|
||||||
|
&self.partition.bucket_id
|
||||||
|
}
|
||||||
|
fn counter_sort_key(&self) -> &String {
|
||||||
|
&self.partition.partition_key
|
||||||
|
}
|
||||||
|
|
||||||
|
fn counts(&self) -> Vec<(&'static str, i64)> {
|
||||||
|
let values = self.values();
|
||||||
|
|
||||||
|
let n_entries = if self.is_tombstone() { 0 } else { 1 };
|
||||||
|
let n_conflicts = if values.len() > 1 { 1 } else { 0 };
|
||||||
|
let n_values = values
|
||||||
|
.iter()
|
||||||
|
.filter(|v| matches!(v, DvvsValue::Value(_)))
|
||||||
|
.count() as i64;
|
||||||
|
let n_bytes = values
|
||||||
|
.iter()
|
||||||
|
.map(|v| match v {
|
||||||
|
DvvsValue::Deleted => 0,
|
||||||
|
DvvsValue::Value(v) => v.len() as i64,
|
||||||
|
})
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
vec![
|
||||||
|
(ENTRIES, n_entries),
|
||||||
|
(CONFLICTS, n_conflicts),
|
||||||
|
(VALUES, n_values),
|
||||||
|
(BYTES, n_bytes),
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
|
|
@ -1,6 +1,5 @@
|
||||||
pub mod causality;
|
pub mod causality;
|
||||||
|
|
||||||
pub mod counter_table;
|
|
||||||
pub mod item_table;
|
pub mod item_table;
|
||||||
|
|
||||||
pub mod poll;
|
pub mod poll;
|
||||||
|
|
Loading…
Reference in a new issue