diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 9a692870..665b02b2 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -1,3 +1,13 @@ +//! Implements a CausalContext, which is a set of timestamps for each +//! node -- a vector clock --, indicating that the versions with +//! timestamps <= these numbers have been seen and can be +//! overwritten by a subsequent write. +//! +//! The textual representation of a CausalContext, which we call a +//! "causality token", is used in the API and must be sent along with +//! each write or delete operation to indicate the previously seen +//! versions that we want to overwrite or delete. + use std::collections::BTreeMap; use std::convert::TryInto; @@ -9,23 +19,36 @@ use garage_util::data::*; /// of full Garage node IDs which are 256-bit UUIDs. pub type K2VNodeId = u64; +pub type VectorClock = BTreeMap; + pub fn make_node_id(node_id: Uuid) -> K2VNodeId { let mut tmp = [0u8; 8]; tmp.copy_from_slice(&node_id.as_slice()[..8]); u64::from_be_bytes(tmp) } -#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +pub fn vclock_gt(a: &VectorClock, b: &VectorClock) -> bool { + a.iter().any(|(n, ts)| ts > b.get(n).unwrap_or(&0)) +} + +pub fn vclock_max(a: &VectorClock, b: &VectorClock) -> VectorClock { + let mut ret = a.clone(); + for (n, ts) in b.iter() { + let ent = ret.entry(*n).or_insert(0); + *ent = std::cmp::max(*ts, *ent); + } + ret +} + +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Default)] pub struct CausalContext { - pub vector_clock: BTreeMap, + pub vector_clock: VectorClock, } impl CausalContext { /// Empty causality context - pub fn new_empty() -> Self { - Self { - vector_clock: BTreeMap::new(), - } + pub fn new() -> Self { + Self::default() } /// Make binary representation and encode in base64 pub fn serialize(&self) -> String { @@ -72,9 +95,7 @@ impl CausalContext { } /// Check if this causal context contains newer items than another one pub fn is_newer_than(&self, other: &Self) -> bool { - self.vector_clock - .iter() - .any(|(k, v)| v > other.vector_clock.get(k).unwrap_or(&0)) + vclock_gt(&self.vector_clock, &other.vector_clock) } } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index 1ba9bb46..bc2b1aef 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -106,7 +106,7 @@ impl K2VItem { /// Extract the causality context of a K2V Item pub fn causal_context(&self) -> CausalContext { - let mut cc = CausalContext::new_empty(); + let mut cc = CausalContext::new(); for (node, ent) in self.items.iter() { cc.vector_clock.insert(*node, ent.max_time()); } diff --git a/src/model/k2v/mod.rs b/src/model/k2v/mod.rs index 83ad2512..c488b4c6 100644 --- a/src/model/k2v/mod.rs +++ b/src/model/k2v/mod.rs @@ -1,4 +1,5 @@ pub mod causality; +pub mod seen; pub mod item_table; diff --git a/src/model/k2v/seen.rs b/src/model/k2v/seen.rs new file mode 100644 index 00000000..b8f4ff27 --- /dev/null +++ b/src/model/k2v/seen.rs @@ -0,0 +1,85 @@ +//! Implements a RangeSeenMarker, a data type used in the PollRange API +//! to indicate which items in the range have already been seen +//! and which have not been seen yet. +//! +//! It consists of a vector clock that indicates that for each node, +//! all items produced by that node with timestamps <= the value in the +//! vector clock has been seen, as well as a set of causal contexts for +//! individual items. + +use std::collections::BTreeMap; + +use serde::{Deserialize, Serialize}; + +use garage_util::data::Uuid; +use garage_util::encode::{nonversioned_decode, nonversioned_encode}; +use garage_util::error::{Error, OkOrMessage}; + +use crate::k2v::causality::*; +use crate::k2v::item_table::*; + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct RangeSeenMarker { + vector_clock: VectorClock, + items: BTreeMap, +} + +impl RangeSeenMarker { + pub fn new() -> Self { + Self::default() + } + + pub fn mark_seen_node_items<'a, I: IntoIterator>( + &mut self, + node: Uuid, + items: I, + ) { + let node = make_node_id(node); + for item in items.into_iter() { + let cc = item.causal_context(); + + if let Some(ts) = cc.vector_clock.get(&node) { + let ent = self.vector_clock.entry(node).or_insert(0); + *ent = std::cmp::max(*ent, *ts); + } + + if vclock_gt(&cc.vector_clock, &self.vector_clock) { + match self.items.get_mut(&item.sort_key) { + None => { + self.items.insert(item.sort_key.clone(), cc.vector_clock); + } + Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock), + } + } + } + } + + pub fn canonicalize(&mut self) { + let self_vc = &self.vector_clock; + self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc)) + } + + pub fn encode(&mut self) -> Result { + self.canonicalize(); + + let bytes = nonversioned_encode(&self)?; + let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; + Ok(base64::encode(&bytes)) + } + + pub fn decode(s: &str) -> Result { + let bytes = base64::decode(&s).ok_or_message("invalid base64")?; + let bytes = zstd::stream::decode_all(&mut &bytes[..])?; + Ok(nonversioned_decode(&bytes)?) + } + + pub fn is_new_item(&self, item: &K2VItem) -> bool { + let cc = item.causal_context(); + vclock_gt(&cc.vector_clock, &self.vector_clock) + && self + .items + .get(&item.sort_key) + .map(|vc| vclock_gt(&cc.vector_clock, &vc)) + .unwrap_or(true) + } +}