use serde::{Deserialize, Serialize};

use garage_util::data::*;

/// Conflict-free replicated data type (CRDT)
///
/// CRDTs are data structures that do not require coordination.
/// In other words, copies can be edited in parallel and there is
/// always a way to merge them back together.
///
/// A general example is a counter. Its initial value is 0.
/// Alice and Bob each get a copy of the counter.
/// Alice does +1 on her copy, she reads 1.
/// Bob does +3 on his copy, he reads 3.
/// Now, it is easy to merge their counters, and the order does not matter:
/// we always get 4.
///
/// Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
pub trait CRDT {
    /// Merge the two data structures according to the CRDT rules
    ///
    /// After the call, `self` holds the merged state; `other` is left untouched.
    ///
    /// # Arguments
    ///
    /// * `other` - the other copy of the CRDT
    fn merge(&mut self, other: &Self);
}

/// Any totally ordered, clonable type is a CRDT: merging keeps the greater
/// of the two values.
impl<T> CRDT for T
where
    T: Ord + Clone,
{
    fn merge(&mut self, other: &Self) {
        // Compare the values themselves rather than a `&T` against a `&mut T`.
        if *other > *self {
            *self = other.clone();
        }
    }
}

// ---- LWW Register ----

/// Last Write Win (LWW)
///
/// LWW is based on time: the most recent write wins.
/// As the clocks of multiple computers are always desynchronized,
/// when operations are close enough, it is equivalent to
/// taking one copy and dropping the other one.
///
/// Given that clocks are not too desynchronized, this assumption
/// is enough for most cases, as there is little chance that two humans
/// coordinate themselves faster than the time difference between two NTP servers.
///
/// As a more concrete example, let's suppose you want to upload a file
/// with the same key (path) in the same bucket at the very same time.
/// For each request, the file will be timestamped by the receiving server
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often, without knowing it,
/// in enterprise when reconciling databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LWW { ts: u64, v: T, } impl LWW where T: CRDT, { /// Creates a new CRDT /// /// CRDT's internal timestamp is set with current node's clock. pub fn new(value: T) -> Self { Self { ts: now_msec(), v: value, } } /// Build a new CRDT from a previous non-compatible one /// /// Compared to new, the CRDT's timestamp is not set to now /// but must be set to the previous, non-compatible, CRDT's timestamp. pub fn migrate_from_raw(ts: u64, value: T) -> Self { Self { ts, v: value } } /// Update the LWW CRDT while keeping some causal ordering. pub fn update(&mut self, new_value: T) { self.ts = std::cmp::max(self.ts + 1, now_msec()); self.v = new_value; } /// Get the CRDT value pub fn get(&self) -> &T { &self.v } /// Get a mutable value for the CRDT pub fn get_mut(&mut self) -> &mut T { &mut self.v } } impl CRDT for LWW where T: Clone + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { self.ts = other.ts; self.v = other.v.clone(); } else if other.ts == self.ts { self.v.merge(&other.v); } } } /// Boolean /// /// with True as absorbing state #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] pub struct Bool(bool); impl Bool { pub fn new(b: bool) -> Self { Self(b) } pub fn set(&mut self) { self.0 = true; } pub fn get(&self) -> bool { self.0 } } impl CRDT for Bool { fn merge(&mut self, other: &Self) { self.0 = self.0 || other.0; } } /// Last Write Win Map /// /// #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LWWMap { vals: Vec<(K, u64, V)>, } impl LWWMap where K: Ord, V: CRDT, { pub fn new() -> Self { Self { vals: vec![] } } pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { Self { vals: vec![(k, ts, v)], } } pub fn take_and_clear(&mut self) -> Self { let vals = std::mem::replace(&mut self.vals, vec![]); Self { vals } } pub fn clear(&mut self) { self.vals.clear(); } pub fn update_mutator(&self, k: K, new_v: V) -> Self { let new_vals = match 
self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { let (_, old_ts, _) = self.vals[i]; let new_ts = std::cmp::max(old_ts + 1, now_msec()); vec![(k, new_ts, new_v)] } Err(_) => vec![(k, now_msec(), new_v)], }; Self { vals: new_vals } } pub fn get(&self, k: &K) -> Option<&V> { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => Some(&self.vals[i].2), Err(_) => None, } } pub fn items(&self) -> &[(K, u64, V)] { &self.vals[..] } } impl CRDT for LWWMap where K: Clone + Ord, V: Clone + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { let (_, ts1, _v1) = &self.vals[i]; if ts2 > ts1 { self.vals[i].1 = *ts2; self.vals[i].2 = v2.clone(); } else if ts1 == ts2 { self.vals[i].2.merge(&v2); } } Err(i) => { self.vals.insert(i, (k.clone(), *ts2, v2.clone())); } } } } }