//! Implements a RangeSeenMarker, a data type used in the PollRange API //! to indicate which items in the range have already been seen //! and which have not been seen yet. //! //! It consists of a vector clock that indicates that for each node, //! all items produced by that node with timestamps <= the value in the //! vector clock has been seen, as well as a set of causal contexts for //! individual items. use std::collections::BTreeMap; use serde::{Deserialize, Serialize}; use garage_util::data::Uuid; use garage_util::encode::{nonversioned_decode, nonversioned_encode}; use garage_util::error::{Error, OkOrMessage}; use crate::k2v::causality::*; use crate::k2v::item_table::*; #[derive(Debug, Serialize, Deserialize, Default)] pub struct RangeSeenMarker { vector_clock: VectorClock, items: BTreeMap, } impl RangeSeenMarker { pub fn new() -> Self { Self::default() } pub fn mark_seen_node_items<'a, I: IntoIterator>( &mut self, node: Uuid, items: I, ) { let node = make_node_id(node); for item in items.into_iter() { let cc = item.causal_context(); if let Some(ts) = cc.vector_clock.get(&node) { let ent = self.vector_clock.entry(node).or_insert(0); *ent = std::cmp::max(*ent, *ts); } if vclock_gt(&cc.vector_clock, &self.vector_clock) { match self.items.get_mut(&item.sort_key) { None => { self.items.insert(item.sort_key.clone(), cc.vector_clock); } Some(ent) => *ent = vclock_max(&ent, &cc.vector_clock), } } } } pub fn canonicalize(&mut self) { let self_vc = &self.vector_clock; self.items.retain(|_sk, vc| vclock_gt(&vc, self_vc)) } pub fn encode(&mut self) -> Result { self.canonicalize(); let bytes = nonversioned_encode(&self)?; let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?; Ok(base64::encode(&bytes)) } pub fn decode(s: &str) -> Result { let bytes = base64::decode(&s).ok_or_message("invalid base64")?; let bytes = zstd::stream::decode_all(&mut &bytes[..])?; Ok(nonversioned_decode(&bytes)?) } pub fn is_new_item(&self, item: &K2VItem) -> bool { let cc = item.causal_context(); vclock_gt(&cc.vector_clock, &self.vector_clock) && self .items .get(&item.sort_key) .map(|vc| vclock_gt(&cc.vector_clock, &vc)) .unwrap_or(true) } }