K2V #293

Merged
lx merged 68 commits from k2v into main 2022-05-10 11:16:58 +00:00
3 changed files with 29 additions and 17 deletions
Showing only changes of commit 99e7c3396c - Show all commits

View file

@ -10,6 +10,7 @@ use garage_rpc::ring::Ring;
use garage_table::util::*; use garage_table::util::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_model::k2v::counter_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::error::*; use crate::error::*;
use crate::k2v::range::read_range; use crate::k2v::range::read_range;
@ -35,9 +36,10 @@ pub async fn handle_read_index(
) )
.await?; .await?;
let s_entries = "entries".to_string(); let s_entries = ENTRIES.to_string();
let s_values = "values".to_string(); let s_conflicts = CONFLICTS.to_string();
let s_bytes = "bytes".to_string(); let s_values = VALUES.to_string();
let s_bytes = BYTES.to_string();
let resp = ReadIndexResponse { let resp = ReadIndexResponse {
prefix, prefix,
@ -51,6 +53,7 @@ pub async fn handle_read_index(
ReadIndexResponseEntry { ReadIndexResponseEntry {
pk: part.sk, pk: part.sk,
entries: *vals.get(&s_entries).unwrap_or(&0), entries: *vals.get(&s_entries).unwrap_or(&0),
conflicts: *vals.get(&s_conflicts).unwrap_or(&0),
values: *vals.get(&s_values).unwrap_or(&0), values: *vals.get(&s_values).unwrap_or(&0),
bytes: *vals.get(&s_bytes).unwrap_or(&0), bytes: *vals.get(&s_bytes).unwrap_or(&0),
} }
@ -85,6 +88,7 @@ struct ReadIndexResponse {
struct ReadIndexResponseEntry { struct ReadIndexResponseEntry {
pk: String, pk: String,
entries: i64, entries: i64,
conflicts: i64,
values: i64, values: i64,
bytes: i64, bytes: i64,
} }

View file

@ -2,6 +2,11 @@ use garage_util::data::*;
use crate::index_counter::*; use crate::index_counter::*;
pub const ENTRIES: &'static str = "entries";
pub const CONFLICTS: &'static str = "conflicts";
pub const VALUES: &'static str = "values";
pub const BYTES: &'static str = "bytes";
#[derive(PartialEq, Clone)] #[derive(PartialEq, Clone)]
pub struct K2VCounterTable; pub struct K2VCounterTable;

View file

@ -109,23 +109,25 @@ impl K2VItem {
} }
} }
// returns counters: (non-deleted entries, non-tombstone values, bytes used) // returns counters: (non-deleted entries, conflict entries, non-tombstone values, bytes used)
fn stats(&self) -> (i64, i64, i64) { fn stats(&self) -> (i64, i64, i64, i64) {
let values = self.values();
let n_entries = if self.is_tombstone() { 0 } else { 1 }; let n_entries = if self.is_tombstone() { 0 } else { 1 };
let n_values = self let n_conflicts = if values.len() > 1 { 1 } else { 0 };
.values() let n_values = values
.iter() .iter()
.filter(|v| matches!(v, DvvsValue::Value(_))) .filter(|v| matches!(v, DvvsValue::Value(_)))
.count() as i64; .count() as i64;
let n_bytes = self let n_bytes = values
.values()
.iter() .iter()
.map(|v| match v { .map(|v| match v {
DvvsValue::Deleted => 0, DvvsValue::Deleted => 0,
DvvsValue::Value(v) => v.len() as i64, DvvsValue::Value(v) => v.len() as i64,
}) })
.sum(); .sum();
(n_entries, n_values, n_bytes)
(n_entries, n_conflicts, n_values, n_bytes)
} }
} }
@ -216,12 +218,12 @@ impl TableSchema for K2VItemTable {
type Filter = ItemFilter; type Filter = ItemFilter;
fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) { fn updated(&self, old: Option<&Self::E>, new: Option<&Self::E>) {
let (old_entries, old_values, old_bytes) = match old { let (old_entries, old_conflicts, old_values, old_bytes) = match old {
None => (0, 0, 0), None => (0, 0, 0, 0),
Some(e) => e.stats(), Some(e) => e.stats(),
}; };
let (new_entries, new_values, new_bytes) = match new { let (new_entries, new_conflicts, new_values, new_bytes) = match new {
None => (0, 0, 0), None => (0, 0, 0, 0),
Some(e) => e.stats(), Some(e) => e.stats(),
}; };
@ -236,9 +238,10 @@ impl TableSchema for K2VItemTable {
&count_pk, &count_pk,
count_sk, count_sk,
&[ &[
("entries", new_entries - old_entries), (ENTRIES, new_entries - old_entries),
("values", new_values - old_values), (CONFLICTS, new_conflicts - old_conflicts),
("bytes", new_bytes - old_bytes), (VALUES, new_values - old_values),
(BYTES, new_bytes - old_bytes),
], ],
) { ) {
error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e); error!("Could not update K2V counter for bucket {:?} partition {}; counts will now be inconsistent. {}", count_pk, count_sk, e);