forked from Deuxfleurs/garage
105 lines
2.9 KiB
Rust
105 lines
2.9 KiB
Rust
//! Implements a RangeSeenMarker, a data type used in the PollRange API
|
|
//! to indicate which items in the range have already been seen
|
|
//! and which have not been seen yet.
|
|
//!
|
|
//! It consists of a vector clock that indicates that for each node,
|
|
//! all items produced by that node with timestamps <= the value in the
|
|
//! vector clock have been seen, as well as a set of causal contexts for
|
|
//! individual items.
|
|
|
|
use std::collections::BTreeMap;
|
|
|
|
use base64::prelude::*;
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
use garage_util::data::Uuid;
|
|
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
|
|
use garage_util::error::Error;
|
|
|
|
use crate::helper::error::{Error as HelperError, OkOrBadRequest};
|
|
use crate::k2v::causality::*;
|
|
use crate::k2v::item_table::*;
|
|
use crate::k2v::sub::*;
|
|
|
|
#[derive(Debug, Serialize, Deserialize, Default)]
|
|
pub struct RangeSeenMarker {
|
|
vector_clock: VectorClock,
|
|
items: BTreeMap<String, VectorClock>,
|
|
}
|
|
|
|
impl RangeSeenMarker {
|
|
pub fn new() -> Self {
|
|
Self::default()
|
|
}
|
|
|
|
pub fn restrict(&mut self, range: &PollRange) {
|
|
if let Some(start) = &range.start {
|
|
self.items = self.items.split_off(start);
|
|
}
|
|
if let Some(end) = &range.end {
|
|
self.items.split_off(end);
|
|
}
|
|
if let Some(pfx) = &range.prefix {
|
|
self.items.retain(|k, _v| k.starts_with(pfx));
|
|
}
|
|
}
|
|
|
|
pub fn mark_seen_node_items<'a, I: IntoIterator<Item = &'a K2VItem>>(
|
|
&mut self,
|
|
node: Uuid,
|
|
items: I,
|
|
) {
|
|
let node = make_node_id(node);
|
|
for item in items.into_iter() {
|
|
let cc = item.causal_context();
|
|
|
|
if let Some(ts) = cc.vector_clock.get(&node) {
|
|
let ent = self.vector_clock.entry(node).or_insert(0);
|
|
*ent = std::cmp::max(*ent, *ts);
|
|
}
|
|
|
|
if vclock_gt(&cc.vector_clock, &self.vector_clock) {
|
|
match self.items.get_mut(&item.sort_key) {
|
|
None => {
|
|
self.items.insert(item.sort_key.clone(), cc.vector_clock);
|
|
}
|
|
Some(ent) => *ent = vclock_max(ent, &cc.vector_clock),
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
pub fn canonicalize(&mut self) {
|
|
let self_vc = &self.vector_clock;
|
|
self.items.retain(|_sk, vc| vclock_gt(vc, self_vc))
|
|
}
|
|
|
|
pub fn encode(&mut self) -> Result<String, Error> {
|
|
self.canonicalize();
|
|
|
|
let bytes = nonversioned_encode(&self)?;
|
|
let bytes = zstd::stream::encode_all(&mut &bytes[..], zstd::DEFAULT_COMPRESSION_LEVEL)?;
|
|
Ok(BASE64_STANDARD.encode(bytes))
|
|
}
|
|
|
|
/// Decode from msgpack+zstd+b64 representation, returns None on error.
|
|
pub fn decode(s: &str) -> Option<Self> {
|
|
let bytes = BASE64_STANDARD.decode(s).ok()?;
|
|
let bytes = zstd::stream::decode_all(&mut &bytes[..]).ok()?;
|
|
nonversioned_decode(&bytes).ok()
|
|
}
|
|
|
|
pub fn decode_helper(s: &str) -> Result<Self, HelperError> {
|
|
Self::decode(s).ok_or_bad_request("Invalid causality token")
|
|
}
|
|
|
|
pub fn is_new_item(&self, item: &K2VItem) -> bool {
|
|
let cc = item.causal_context();
|
|
vclock_gt(&cc.vector_clock, &self.vector_clock)
|
|
&& self
|
|
.items
|
|
.get(&item.sort_key)
|
|
.map(|vc| vclock_gt(&cc.vector_clock, vc))
|
|
.unwrap_or(true)
|
|
}
|
|
}
|