Refactor model stuff, including cleaner CRDTs

This commit is contained in:
Alex 2021-03-10 16:21:56 +01:00
parent 6a3dcf3974
commit f319a7d374
20 changed files with 591 additions and 457 deletions

View file

@ -66,25 +66,28 @@ pub async fn handle_copy(
.await?; .await?;
let source_version = source_version.ok_or(Error::NotFound)?; let source_version = source_version.ok_or(Error::NotFound)?;
let dest_version = Version::new( let mut dest_version = Version::new(
new_uuid, new_uuid,
dest_bucket.to_string(), dest_bucket.to_string(),
dest_key.to_string(), dest_key.to_string(),
false, false,
source_version.blocks().to_vec(),
); );
for (bk, bv) in source_version.blocks.items().iter() {
dest_version.blocks.put(*bk, *bv);
}
let dest_object = Object::new( let dest_object = Object::new(
dest_bucket.to_string(), dest_bucket.to_string(),
dest_key.to_string(), dest_key.to_string(),
vec![dest_object_version], vec![dest_object_version],
); );
let dest_block_refs = dest_version let dest_block_refs = dest_version
.blocks() .blocks
.items()
.iter() .iter()
.map(|b| BlockRef { .map(|b| BlockRef {
block: b.hash, block: b.1.hash,
version: new_uuid, version: new_uuid,
deleted: false, deleted: false.into(),
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
futures::try_join!( futures::try_join!(

View file

@ -146,9 +146,10 @@ pub async fn handle_get(
let version = version.ok_or(Error::NotFound)?; let version = version.ok_or(Error::NotFound)?;
let mut blocks = version let mut blocks = version
.blocks() .blocks
.items()
.iter() .iter()
.map(|vb| (vb.hash, None)) .map(|(_, vb)| (vb.hash, None))
.collect::<Vec<_>>(); .collect::<Vec<_>>();
blocks[0].1 = Some(first_block); blocks[0].1 = Some(first_block);
@ -219,11 +220,12 @@ pub async fn handle_get_range(
// file (whereas block.offset designates the offset of the block WITHIN THE PART // file (whereas block.offset designates the offset of the block WITHIN THE PART
// block.part_number, which is not the same in the case of a multipart upload) // block.part_number, which is not the same in the case of a multipart upload)
let mut blocks = Vec::with_capacity(std::cmp::min( let mut blocks = Vec::with_capacity(std::cmp::min(
version.blocks().len(), version.blocks.len(),
4 + ((end - begin) / std::cmp::max(version.blocks()[0].size as u64, 1024)) as usize, 4 + ((end - begin) / std::cmp::max(version.blocks.items()[0].1.size as u64, 1024))
as usize,
)); ));
let mut true_offset = 0; let mut true_offset = 0;
for b in version.blocks().iter() { for (_, b) in version.blocks.items().iter() {
if true_offset >= end { if true_offset >= end {
break; break;
} }

View file

@ -94,7 +94,7 @@ pub async fn handle_put(
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
// Initialize corresponding entry in version table // Initialize corresponding entry in version table
let version = Version::new(version_uuid, bucket.into(), key.into(), false, vec![]); let version = Version::new(version_uuid, bucket.into(), key.into(), false);
let first_block_hash = sha256sum(&first_block[..]); let first_block_hash = sha256sum(&first_block[..]);
// Transfer data and verify checksum // Transfer data and verify checksum
@ -242,19 +242,18 @@ async fn put_block_meta(
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
// TODO: don't clone, restart from empty block list ?? // TODO: don't clone, restart from empty block list ??
let mut version = version.clone(); let mut version = version.clone();
version version.blocks.put(
.add_block(VersionBlock { VersionBlockKey {
part_number, part_number,
offset, offset,
hash, },
size, VersionBlock { hash, size },
}) );
.unwrap();
let block_ref = BlockRef { let block_ref = BlockRef {
block: hash, block: hash,
version: version.uuid, version: version.uuid,
deleted: false, deleted: false.into(),
}; };
futures::try_join!( futures::try_join!(
@ -389,7 +388,7 @@ pub async fn handle_put_part(
} }
// Copy block to store // Copy block to store
let version = Version::new(version_uuid, bucket, key, false, vec![]); let version = Version::new(version_uuid, bucket, key, false);
let first_block_hash = sha256sum(&first_block[..]); let first_block_hash = sha256sum(&first_block[..]);
let (_, md5sum_arr, sha256sum) = read_and_put_blocks( let (_, md5sum_arr, sha256sum) = read_and_put_blocks(
&garage, &garage,
@ -454,7 +453,7 @@ pub async fn handle_complete_multipart_upload(
}; };
let version = version.ok_or(Error::BadRequest(format!("Version not found")))?; let version = version.ok_or(Error::BadRequest(format!("Version not found")))?;
if version.blocks().len() == 0 { if version.blocks.len() == 0 {
return Err(Error::BadRequest(format!("No data was uploaded"))); return Err(Error::BadRequest(format!("No data was uploaded")));
} }
@ -466,9 +465,10 @@ pub async fn handle_complete_multipart_upload(
// Check that the list of parts they gave us corresponds to the parts we have here // Check that the list of parts they gave us corresponds to the parts we have here
// TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere... // TODO: check MD5 sum of all uploaded parts? but that would mean we have to store them somewhere...
let mut parts = version let mut parts = version
.blocks() .blocks
.items()
.iter() .iter()
.map(|x| x.part_number) .map(|x| x.0.part_number)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
parts.dedup(); parts.dedup();
let same_parts = body_list_of_parts let same_parts = body_list_of_parts
@ -485,8 +485,8 @@ pub async fn handle_complete_multipart_upload(
// shouldn't impact compatibility as the S3 docs specify that // shouldn't impact compatibility as the S3 docs specify that
// the ETag is an opaque value in case of a multipart upload. // the ETag is an opaque value in case of a multipart upload.
// See also: https://teppen.io/2018/06/23/aws_s3_etags/ // See also: https://teppen.io/2018/06/23/aws_s3_etags/
let num_parts = version.blocks().last().unwrap().part_number let num_parts = version.blocks.items().last().unwrap().0.part_number
- version.blocks().first().unwrap().part_number - version.blocks.items().first().unwrap().0.part_number
+ 1; + 1;
let etag = format!( let etag = format!(
"{}-{}", "{}-{}",
@ -495,17 +495,18 @@ pub async fn handle_complete_multipart_upload(
); );
let total_size = version let total_size = version
.blocks() .blocks
.items()
.iter() .iter()
.map(|x| x.size) .map(|x| x.1.size)
.fold(0, |x, y| x + y); .fold(0, |x, y| x + y);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta { ObjectVersionMeta {
headers, headers,
size: total_size, size: total_size,
etag: etag, etag,
}, },
version.blocks()[0].hash, version.blocks.items()[0].1.hash,
)); ));
let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]); let final_object = Object::new(bucket.clone(), key.clone(), vec![object_version]);

View file

@ -97,7 +97,7 @@ impl Repair {
pos = item_key.to_vec(); pos = item_key.to_vec();
let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?; let version = rmp_serde::decode::from_read_ref::<_, Version>(item_bytes.as_ref())?;
if version.deleted { if version.deleted.get() {
continue; continue;
} }
let object = self let object = self
@ -127,7 +127,6 @@ impl Repair {
version.bucket, version.bucket,
version.key, version.key,
true, true,
vec![],
)) ))
.await?; .await?;
} }
@ -146,7 +145,7 @@ impl Repair {
pos = item_key.to_vec(); pos = item_key.to_vec();
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(item_bytes.as_ref())?;
if block_ref.deleted { if block_ref.deleted.get() {
continue; continue;
} }
let version = self let version = self
@ -155,7 +154,7 @@ impl Repair {
.get(&block_ref.version, &EmptyKey) .get(&block_ref.version, &EmptyKey)
.await?; .await?;
let ref_exists = match version { let ref_exists = match version {
Some(v) => !v.deleted, Some(v) => !v.deleted.get(),
None => { None => {
warn!( warn!(
"Block ref repair: version for block ref {:?} not found, skipping.", "Block ref repair: version for block ref {:?} not found, skipping.",
@ -174,7 +173,7 @@ impl Repair {
.insert(&BlockRef { .insert(&BlockRef {
block: block_ref.block, block: block_ref.block,
version: block_ref.version, version: block_ref.version,
deleted: true, deleted: true.into(),
}) })
.await?; .await?;
} }

View file

@ -420,7 +420,7 @@ impl BlockManager {
if Some(&block_ref.block) == last_hash.as_ref() { if Some(&block_ref.block) == last_hash.as_ref() {
continue; continue;
} }
if !block_ref.deleted { if !block_ref.deleted.get() {
last_hash = Some(block_ref.block); last_hash = Some(block_ref.block);
self.put_to_resync(&block_ref.block, 0)?; self.put_to_resync(&block_ref.block, 0)?;
} }

View file

@ -1,9 +1,9 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::CRDT;
use garage_table::*; use garage_table::*;
use crate::block::*; use crate::block::*;
@ -17,7 +17,7 @@ pub struct BlockRef {
pub version: UUID, pub version: UUID,
// Keep track of deleted status // Keep track of deleted status
pub deleted: bool, pub deleted: crdt::Bool,
} }
impl Entry<Hash, UUID> for BlockRef { impl Entry<Hash, UUID> for BlockRef {
@ -27,16 +27,15 @@ impl Entry<Hash, UUID> for BlockRef {
fn sort_key(&self) -> &UUID { fn sort_key(&self) -> &UUID {
&self.version &self.version
} }
}
impl CRDT for BlockRef {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
if other.deleted { self.deleted.merge(&other.deleted);
self.deleted = true;
}
} }
} }
pub struct BlockRefTable { pub struct BlockRefTable {
pub background: Arc<BackgroundRunner>,
pub block_manager: Arc<BlockManager>, pub block_manager: Arc<BlockManager>,
} }
@ -48,8 +47,8 @@ impl TableSchema for BlockRefTable {
fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) { fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) {
let block = &old.as_ref().or(new.as_ref()).unwrap().block; let block = &old.as_ref().or(new.as_ref()).unwrap().block;
let was_before = old.as_ref().map(|x| !x.deleted).unwrap_or(false); let was_before = old.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
let is_after = new.as_ref().map(|x| !x.deleted).unwrap_or(false); let is_after = new.as_ref().map(|x| !x.deleted.get()).unwrap_or(false);
if is_after && !was_before { if is_after && !was_before {
if let Err(e) = self.block_manager.block_incref(block) { if let Err(e) = self.block_manager.block_incref(block) {
warn!("block_incref failed for block {:?}: {}", block, e); warn!("block_incref failed for block {:?}: {}", block, e);
@ -63,6 +62,6 @@ impl TableSchema for BlockRefTable {
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted) filter.apply(entry.deleted.get())
} }
} }

View file

@ -89,7 +89,9 @@ impl Entry<EmptyKey, String> for Bucket {
fn sort_key(&self) -> &String { fn sort_key(&self) -> &String {
&self.name &self.name
} }
}
impl CRDT for Bucket {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
self.state.merge(&other.state); self.state.merge(&other.state);
} }

View file

@ -79,7 +79,6 @@ impl Garage {
info!("Initialize block_ref_table..."); info!("Initialize block_ref_table...");
let block_ref_table = Table::new( let block_ref_table = Table::new(
BlockRefTable { BlockRefTable {
background: background.clone(),
block_manager: block_manager.clone(), block_manager: block_manager.clone(),
}, },
data_rep_param.clone(), data_rep_param.clone(),

View file

@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
use model010::key_table as prev; use model010::key_table as prev;
@ -66,6 +66,10 @@ pub struct PermissionSet {
pub allow_write: bool, pub allow_write: bool,
} }
impl AutoCRDT for PermissionSet {
const WARN_IF_DIFFERENT: bool = true;
}
impl Entry<EmptyKey, String> for Key { impl Entry<EmptyKey, String> for Key {
fn partition_key(&self) -> &EmptyKey { fn partition_key(&self) -> &EmptyKey {
&EmptyKey &EmptyKey
@ -73,18 +77,19 @@ impl Entry<EmptyKey, String> for Key {
fn sort_key(&self) -> &String { fn sort_key(&self) -> &String {
&self.key_id &self.key_id
} }
}
impl CRDT for Key {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
self.name.merge(&other.name); self.name.merge(&other.name);
self.deleted.merge(&other.deleted); self.deleted.merge(&other.deleted);
if self.deleted.get() { if self.deleted.get() {
self.authorized_buckets.clear(); self.authorized_buckets.clear();
return; } else {
}
self.authorized_buckets.merge(&other.authorized_buckets); self.authorized_buckets.merge(&other.authorized_buckets);
} }
}
} }
pub struct KeyTable; pub struct KeyTable;

View file

@ -5,6 +5,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::table_sharded::*; use garage_table::table_sharded::*;
use garage_table::*; use garage_table::*;
@ -70,7 +71,7 @@ pub enum ObjectVersionState {
Aborted, Aborted,
} }
impl ObjectVersionState { impl CRDT for ObjectVersionState {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
use ObjectVersionState::*; use ObjectVersionState::*;
match other { match other {
@ -91,37 +92,30 @@ impl ObjectVersionState {
} }
} }
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData { pub enum ObjectVersionData {
DeleteMarker, DeleteMarker,
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>), Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
FirstBlock(ObjectVersionMeta, Hash), FirstBlock(ObjectVersionMeta, Hash),
} }
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] impl AutoCRDT for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta { pub struct ObjectVersionMeta {
pub headers: ObjectVersionHeaders, pub headers: ObjectVersionHeaders,
pub size: u64, pub size: u64,
pub etag: String, pub etag: String,
} }
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders { pub struct ObjectVersionHeaders {
pub content_type: String, pub content_type: String,
pub other: BTreeMap<String, String>, pub other: BTreeMap<String, String>,
} }
impl ObjectVersionData {
fn merge(&mut self, b: &Self) {
if *self != *b {
warn!(
"Inconsistent object version data: {:?} (local) vs {:?} (remote)",
self, b
);
}
}
}
impl ObjectVersion { impl ObjectVersion {
fn cmp_key(&self) -> (u64, UUID) { fn cmp_key(&self) -> (u64, UUID) {
(self.timestamp, self.uuid) (self.timestamp, self.uuid)
@ -154,8 +148,11 @@ impl Entry<String, String> for Object {
fn sort_key(&self) -> &String { fn sort_key(&self) -> &String {
&self.key &self.key
} }
}
impl CRDT for Object {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
// Merge versions from other into here
for other_v in other.versions.iter() { for other_v in other.versions.iter() {
match self match self
.versions .versions
@ -169,6 +166,9 @@ impl Entry<String, String> for Object {
} }
} }
} }
// Remove versions which are obsolete, i.e. those that come
// before the last version which .is_complete().
let last_complete = self let last_complete = self
.versions .versions
.iter() .iter()
@ -212,13 +212,8 @@ impl TableSchema for ObjectTable {
} }
}; };
if newly_deleted { if newly_deleted {
let deleted_version = Version::new( let deleted_version =
v.uuid, Version::new(v.uuid, old_v.bucket.clone(), old_v.key.clone(), true);
old_v.bucket.clone(),
old_v.key.clone(),
true,
vec![],
);
version_table.insert(&deleted_version).await?; version_table.insert(&deleted_version).await?;
} }
} }

View file

@ -4,6 +4,7 @@ use std::sync::Arc;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::table_sharded::*; use garage_table::table_sharded::*;
use garage_table::*; use garage_table::*;
@ -15,8 +16,8 @@ pub struct Version {
pub uuid: UUID, pub uuid: UUID,
// Actual data: the blocks for this version // Actual data: the blocks for this version
pub deleted: bool, pub deleted: crdt::Bool,
blocks: Vec<VersionBlock>, pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
// Back link to bucket+key so that we can figure if // Back link to bucket+key so that we can figure if
// this was deleted later on // this was deleted later on
@ -25,56 +26,45 @@ pub struct Version {
} }
impl Version { impl Version {
pub fn new( pub fn new(uuid: UUID, bucket: String, key: String, deleted: bool) -> Self {
uuid: UUID, Self {
bucket: String,
key: String,
deleted: bool,
blocks: Vec<VersionBlock>,
) -> Self {
let mut ret = Self {
uuid, uuid,
deleted, deleted: deleted.into(),
blocks: vec![], blocks: crdt::Map::new(),
bucket, bucket,
key, key,
};
for b in blocks {
ret.add_block(b)
.expect("Twice the same VersionBlock in Version constructor");
} }
ret
}
/// Adds a block if it wasn't already present
pub fn add_block(&mut self, new: VersionBlock) -> Result<(), ()> {
match self
.blocks
.binary_search_by(|b| b.cmp_key().cmp(&new.cmp_key()))
{
Err(i) => {
self.blocks.insert(i, new);
Ok(())
}
Ok(_) => Err(()),
}
}
pub fn blocks(&self) -> &[VersionBlock] {
&self.blocks[..]
} }
} }
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock { pub struct VersionBlockKey {
pub part_number: u64, pub part_number: u64,
pub offset: u64, pub offset: u64,
}
impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
.cmp(&other.part_number)
.then(self.offset.cmp(&other.offset))
}
}
impl PartialOrd for VersionBlockKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
pub hash: Hash, pub hash: Hash,
pub size: u64, pub size: u64,
} }
impl VersionBlock { impl AutoCRDT for VersionBlock {
fn cmp_key(&self) -> (u64, u64) { const WARN_IF_DIFFERENT: bool = true;
(self.part_number, self.offset)
}
} }
impl Entry<Hash, EmptyKey> for Version { impl Entry<Hash, EmptyKey> for Version {
@ -84,23 +74,16 @@ impl Entry<Hash, EmptyKey> for Version {
fn sort_key(&self) -> &EmptyKey { fn sort_key(&self) -> &EmptyKey {
&EmptyKey &EmptyKey
} }
}
impl CRDT for Version {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
if other.deleted { self.deleted.merge(&other.deleted);
self.deleted = true;
if self.deleted.get() {
self.blocks.clear(); self.blocks.clear();
} else if !self.deleted { } else {
for bi in other.blocks.iter() { self.blocks.merge(&other.blocks);
match self
.blocks
.binary_search_by(|x| x.cmp_key().cmp(&bi.cmp_key()))
{
Ok(_) => (),
Err(pos) => {
self.blocks.insert(pos, bi.clone());
}
}
}
} }
} }
} }
@ -121,14 +104,15 @@ impl TableSchema for VersionTable {
self.background.spawn(async move { self.background.spawn(async move {
if let (Some(old_v), Some(new_v)) = (old, new) { if let (Some(old_v), Some(new_v)) = (old, new) {
// Propagate deletion of version blocks // Propagate deletion of version blocks
if new_v.deleted && !old_v.deleted { if new_v.deleted.get() && !old_v.deleted.get() {
let deleted_block_refs = old_v let deleted_block_refs = old_v
.blocks .blocks
.items()
.iter() .iter()
.map(|vb| BlockRef { .map(|(_k, vb)| BlockRef {
block: vb.hash, block: vb.hash,
version: old_v.uuid, version: old_v.uuid,
deleted: true, deleted: true.into(),
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
block_ref_table.insert_many(&deleted_block_refs[..]).await?; block_ref_table.insert_many(&deleted_block_refs[..]).await?;
@ -139,6 +123,6 @@ impl TableSchema for VersionTable {
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted) filter.apply(entry.deleted.get())
} }
} }

View file

@ -1,327 +0,0 @@
//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
//!
//! CRDTs are a type of data structures that do not require coordination. In other words, we can
//! edit them in parallel, we will always find a way to merge it.
//!
//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
//! it is easy to merge their counters, order does not count: we always get 4.
//!
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
use serde::{Deserialize, Serialize};
use garage_util::data::*;
/// Definition of a CRDT - all CRDT Rust types implement this.
///
/// A CRDT is defined as a merge operator that respects a certain set of axioms.
///
/// In particular, the merge operator must be commutative, associative,
/// idempotent, and monotonic.
/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
/// the following axioms must apply:
///
/// ```text
/// a ⊔ b = b ⊔ a (commutativity)
/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
/// ```
///
/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
/// as this would imply a cycle in the partial order.
pub trait CRDT {
/// Merge the two datastructures according to the CRDT rules.
/// `self` is modified to contain the merged CRDT value. `other` is not modified.
///
/// # Arguments
///
/// * `other` - the other CRDT we wish to merge with
fn merge(&mut self, other: &Self);
}
/// All types that implement `Ord` (a total order) also implement a trivial CRDT
/// defined by the merge rule: `a ⊔ b = max(a, b)`.
impl<T> CRDT for T
where
T: Ord + Clone,
{
fn merge(&mut self, other: &Self) {
if other > self {
*self = other.clone();
}
}
}
// ---- LWW Register ----
/// Last Write Win (LWW)
///
/// An LWW CRDT associates a timestamp with a value, in order to implement a
/// time-based reconciliation rule: the most recent write wins.
/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
/// with the same timestamp but different values.
///
/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
/// but different inner values, as the rule to keep the maximum value isn't generally the desired
/// semantics.)
///
/// As multiple computers clocks are always desynchronized,
/// when operations are close enough, it is equivalent to
/// take one copy and drop the other one.
///
/// Given that clocks are not too desynchronized, this assumption
/// is enough for most cases, as there is few chance that two humans
/// coordonate themself faster than the time difference between two NTP servers.
///
/// As a more concret example, let's suppose you want to upload a file
/// with the same key (path) in the same bucket at the very same time.
/// For each request, the file will be timestamped by the receiving server
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> {
ts: u64,
v: T,
}
impl<T> LWW<T>
where
T: CRDT,
{
/// Creates a new CRDT
///
/// CRDT's internal timestamp is set with current node's clock.
pub fn new(value: T) -> Self {
Self {
ts: now_msec(),
v: value,
}
}
/// Build a new CRDT from a previous non-compatible one
///
/// Compared to new, the CRDT's timestamp is not set to now
/// but must be set to the previous, non-compatible, CRDT's timestamp.
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
Self { ts, v: value }
}
/// Update the LWW CRDT while keeping some causal ordering.
///
/// The timestamp of the LWW CRDT is updated to be the current node's clock
/// at time of update, or the previous timestamp + 1 if that's bigger,
/// so that the new timestamp is always strictly larger than the previous one.
/// This ensures that merging the update with the old value will result in keeping
/// the updated value.
pub fn update(&mut self, new_value: T) {
self.ts = std::cmp::max(self.ts + 1, now_msec());
self.v = new_value;
}
/// Get the CRDT value
pub fn get(&self) -> &T {
&self.v
}
/// Get a mutable reference to the CRDT's value
///
/// This is usefull to mutate the inside value without changing the LWW timestamp.
/// When such mutation is done, the merge between two LWW values is done using the inner
/// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
/// data type, such as a map, and we only want to change a single item in the map.
/// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
/// This delta consists in a LWW with the same timestamp, and the map
/// inside only contains the updated value.
/// The advantage of such a delta is that it is much smaller than the whole map.
///
/// Avoid using this if the inner data type is a primitive type such as a number or a string,
/// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
/// of both values.
pub fn get_mut(&mut self) -> &mut T {
&mut self.v
}
}
impl<T> CRDT for LWW<T>
where
T: Clone + CRDT,
{
fn merge(&mut self, other: &Self) {
if other.ts > self.ts {
self.ts = other.ts;
self.v = other.v.clone();
} else if other.ts == self.ts {
self.v.merge(&other.v);
}
}
}
/// Boolean, where `true` is an absorbing state
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct Bool(bool);
impl Bool {
/// Create a new boolean with the specified value
pub fn new(b: bool) -> Self {
Self(b)
}
/// Set the boolean to true
pub fn set(&mut self) {
self.0 = true;
}
/// Get the boolean value
pub fn get(&self) -> bool {
self.0
}
}
impl CRDT for Bool {
fn merge(&mut self, other: &Self) {
self.0 = self.0 || other.0;
}
}
/// Last Write Win Map
///
/// This types defines a CRDT for a map from keys to values.
/// The values have an associated timestamp, such that the last written value
/// takes precedence over previous ones. As for the simpler `LWW` type, the value
/// type `V` is also required to implement the CRDT trait.
/// We do not encourage mutating the values associated with a given key
/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
/// method that would allow that.
///
/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
/// such that two values can be compared for equality based on their hashes). As a consequence,
/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWWMap<K, V> {
vals: Vec<(K, u64, V)>,
}
impl<K, V> LWWMap<K, V>
where
K: Ord,
V: CRDT,
{
/// Create a new empty map CRDT
pub fn new() -> Self {
Self { vals: vec![] }
}
/// Used to migrate from a map defined in an incompatible format. This produces
/// a map that contains a single item with the specified timestamp (copied from
/// the incompatible format). Do this as many times as you have items to migrate,
/// and put them all together using the CRDT merge operator.
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
Self {
vals: vec![(k, ts, v)],
}
}
/// Returns a map that contains a single mapping from the specified key to the specified value.
/// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
/// the previous value will be replaced with the one specified here.
/// The timestamp in the provided mutator is set to the maximum of the current system's clock
/// and 1 + the previous value's timestamp (if there is one), so that the new value will always
/// take precedence (LWW rule).
///
/// Typically, to update the value associated to a key in the map, you would do the following:
///
/// ```ignore
/// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
/// my_crdt.merge(&my_update);
/// ```
///
/// However extracting the mutator on its own and only sending that on the network is very
/// interesting as it is much smaller than the whole map.
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
let (_, old_ts, _) = self.vals[i];
let new_ts = std::cmp::max(old_ts + 1, now_msec());
vec![(k, new_ts, new_v)]
}
Err(_) => vec![(k, now_msec(), new_v)],
};
Self { vals: new_vals }
}
/// Takes all of the values of the map and returns them. The current map is reset to the
/// empty map. This is very usefull to produce in-place a new map that contains only a delta
/// that modifies a certain value:
///
/// ```ignore
/// let mut a = get_my_crdt_value();
/// let old_a = a.take_and_clear();
/// a.merge(&old_a.update_mutator(key_to_modify, new_value));
/// put_my_crdt_value(a);
/// ```
///
/// Of course in this simple example we could have written simply
/// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
/// but in the case where the map is a field in a struct for instance (as is always the case),
/// this becomes very handy:
///
/// ```ignore
/// let mut a = get_my_crdt_value();
/// let old_a_map = a.map_field.take_and_clear();
/// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
/// put_my_crdt_value(a);
/// ```
pub fn take_and_clear(&mut self) -> Self {
let vals = std::mem::replace(&mut self.vals, vec![]);
Self { vals }
}
/// Removes all values from the map
pub fn clear(&mut self) {
self.vals.clear();
}
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => Some(&self.vals[i].2),
Err(_) => None,
}
}
/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
/// In most case you will want to ignore the timestamp (second item of the tuple).
pub fn items(&self) -> &[(K, u64, V)] {
&self.vals[..]
}
}
impl<K, V> CRDT for LWWMap<K, V>
where
K: Clone + Ord,
V: Clone + CRDT,
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
let (_, ts1, _v1) = &self.vals[i];
if ts2 > ts1 {
self.vals[i].1 = *ts2;
self.vals[i].2 = v2.clone();
} else if ts1 == ts2 {
self.vals[i].2.merge(&v2);
}
}
Err(i) => {
self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
}
}
}
}
}

34
src/table/crdt/bool.rs Normal file
View file

@ -0,0 +1,34 @@
use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*;
/// Boolean, where `true` is an absorbing state
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
pub struct Bool(bool);
impl Bool {
/// Create a new boolean with the specified value
pub fn new(b: bool) -> Self {
Self(b)
}
/// Set the boolean to true
pub fn set(&mut self) {
self.0 = true;
}
/// Get the boolean value
pub fn get(&self) -> bool {
self.0
}
}
impl From<bool> for Bool {
fn from(b: bool) -> Bool {
Bool::new(b)
}
}
impl CRDT for Bool {
fn merge(&mut self, other: &Self) {
self.0 = self.0 || other.0;
}
}

73
src/table/crdt/crdt.rs Normal file
View file

@ -0,0 +1,73 @@
use garage_util::data::*;
/// Definition of a CRDT - all CRDT Rust types implement this.
///
/// A CRDT is defined as a merge operator that respects a certain set of axioms.
///
/// In particular, the merge operator must be commutative, associative,
/// idempotent, and monotonic.
/// In other words, if `a`, `b` and `c` are CRDTs, and `⊔` denotes the merge operator,
/// the following axioms must apply:
///
/// ```text
/// a ⊔ b = b ⊔ a (commutativity)
/// (a ⊔ b) ⊔ c = a ⊔ (b ⊔ c) (associativity)
/// (a ⊔ b) ⊔ b = a ⊔ b (idempotence)
/// ```
///
/// Moreover, the relationship `≥` defined by `a ≥ b ⇔ ∃c. a = b ⊔ c` must be a partial order.
/// This implies a few properties such as: if `a ⊔ b ≠ a`, then there is no `c` such that `(a ⊔ b) ⊔ c = a`,
/// as this would imply a cycle in the partial order.
pub trait CRDT {
/// Merge the two datastructures according to the CRDT rules.
/// `self` is modified to contain the merged CRDT value. `other` is not modified.
///
/// # Arguments
///
/// * `other` - the other CRDT we wish to merge with
fn merge(&mut self, other: &Self);
}
/// All types that implement `Ord` (a total order) can also implement a trivial CRDT
/// defined by the merge rule: `a ⊔ b = max(a, b)`. Implement this trait for your type
/// to enable this behavior.
pub trait AutoCRDT: Ord + Clone + std::fmt::Debug {
/// WARN_IF_DIFFERENT: emit a warning when values differ. Set this to true if
/// different values in your application should never happen. Set this to false
/// if you are actually relying on the semantics of `a ⊔ b = max(a, b)`.
const WARN_IF_DIFFERENT: bool;
}
impl<T> CRDT for T
where
T: AutoCRDT,
{
fn merge(&mut self, other: &Self) {
if Self::WARN_IF_DIFFERENT && self != other {
warn!(
"Different CRDT values should be the same (logic error!): {:?} vs {:?}",
self, other
);
if other > self {
*self = other.clone();
}
warn!("Making an arbitrary choice: {:?}", self);
} else {
if other > self {
*self = other.clone();
}
}
}
}
impl AutoCRDT for String {
const WARN_IF_DIFFERENT: bool = true;
}
impl AutoCRDT for bool {
const WARN_IF_DIFFERENT: bool = true;
}
impl AutoCRDT for FixedBytes32 {
const WARN_IF_DIFFERENT: bool = true;
}

114
src/table/crdt/lww.rs Normal file
View file

@ -0,0 +1,114 @@
use serde::{Deserialize, Serialize};
use garage_util::data::now_msec;
use crate::crdt::crdt::*;
/// Last Write Win (LWW)
///
/// An LWW CRDT associates a timestamp with a value, in order to implement a
/// time-based reconciliation rule: the most recent write wins.
/// For completeness, the LWW reconciliation rule must also be defined for two LWW CRDTs
/// with the same timestamp but different values.
///
/// In our case, we add the constraint that the value that is wrapped inside the LWW CRDT must
/// itself be a CRDT: in the case when the timestamp does not allow us to decide on which value to
/// keep, the merge rule of the inner CRDT is applied on the wrapped values. (Note that all types
/// that implement the `Ord` trait get a default CRDT implemetnation that keeps the maximum value.
/// This enables us to use LWW directly with primitive data types such as numbers or strings. It is
/// generally desirable in this case to never explicitly produce LWW values with the same timestamp
/// but different inner values, as the rule to keep the maximum value isn't generally the desired
/// semantics.)
///
/// As multiple computers clocks are always desynchronized,
/// when operations are close enough, it is equivalent to
/// take one copy and drop the other one.
///
/// Given that clocks are not too desynchronized, this assumption
/// is enough for most cases, as there is few chance that two humans
/// coordonate themself faster than the time difference between two NTP servers.
///
/// As a more concret example, let's suppose you want to upload a file
/// with the same key (path) in the same bucket at the very same time.
/// For each request, the file will be timestamped by the receiving server
/// and may differ from what you observed with your atomic clock!
///
/// This scheme is used by AWS S3 or Soundcloud and often without knowing
/// in entreprise when reconciliating databases with ad-hoc scripts.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> {
ts: u64,
v: T,
}
impl<T> LWW<T>
where
T: CRDT,
{
/// Creates a new CRDT
///
/// CRDT's internal timestamp is set with current node's clock.
pub fn new(value: T) -> Self {
Self {
ts: now_msec(),
v: value,
}
}
/// Build a new CRDT from a previous non-compatible one
///
/// Compared to new, the CRDT's timestamp is not set to now
/// but must be set to the previous, non-compatible, CRDT's timestamp.
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
Self { ts, v: value }
}
/// Update the LWW CRDT while keeping some causal ordering.
///
/// The timestamp of the LWW CRDT is updated to be the current node's clock
/// at time of update, or the previous timestamp + 1 if that's bigger,
/// so that the new timestamp is always strictly larger than the previous one.
/// This ensures that merging the update with the old value will result in keeping
/// the updated value.
pub fn update(&mut self, new_value: T) {
self.ts = std::cmp::max(self.ts + 1, now_msec());
self.v = new_value;
}
/// Get the CRDT value
pub fn get(&self) -> &T {
&self.v
}
/// Get a mutable reference to the CRDT's value
///
/// This is usefull to mutate the inside value without changing the LWW timestamp.
/// When such mutation is done, the merge between two LWW values is done using the inner
/// CRDT's merge operation. This is usefull in the case where the inner CRDT is a large
/// data type, such as a map, and we only want to change a single item in the map.
/// To do this, we can produce a "CRDT delta", i.e. a LWW that contains only the modification.
/// This delta consists in a LWW with the same timestamp, and the map
/// inside only contains the updated value.
/// The advantage of such a delta is that it is much smaller than the whole map.
///
/// Avoid using this if the inner data type is a primitive type such as a number or a string,
/// as you will then rely on the merge function defined on `Ord` types by keeping the maximum
/// of both values.
pub fn get_mut(&mut self) -> &mut T {
&mut self.v
}
}
impl<T> CRDT for LWW<T>
where
T: Clone + CRDT,
{
fn merge(&mut self, other: &Self) {
if other.ts > self.ts {
self.ts = other.ts;
self.v = other.v.clone();
} else if other.ts == self.ts {
self.v.merge(&other.v);
}
}
}

145
src/table/crdt/lww_map.rs Normal file
View file

@ -0,0 +1,145 @@
use serde::{Deserialize, Serialize};
use garage_util::data::now_msec;
use crate::crdt::crdt::*;
/// Last Write Win Map
///
/// This types defines a CRDT for a map from keys to values.
/// The values have an associated timestamp, such that the last written value
/// takes precedence over previous ones. As for the simpler `LWW` type, the value
/// type `V` is also required to implement the CRDT trait.
/// We do not encourage mutating the values associated with a given key
/// without updating the timestamp, in fact at the moment we do not provide a `.get_mut()`
/// method that would allow that.
///
/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
/// such that two values can be compared for equality based on their hashes). As a consequence,
/// insertions take `O(n)` time. This means that LWWMap should be used for reasonably small maps.
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWWMap<K, V> {
vals: Vec<(K, u64, V)>,
}
impl<K, V> LWWMap<K, V>
where
K: Ord,
V: CRDT,
{
/// Create a new empty map CRDT
pub fn new() -> Self {
Self { vals: vec![] }
}
/// Used to migrate from a map defined in an incompatible format. This produces
/// a map that contains a single item with the specified timestamp (copied from
/// the incompatible format). Do this as many times as you have items to migrate,
/// and put them all together using the CRDT merge operator.
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
Self {
vals: vec![(k, ts, v)],
}
}
/// Returns a map that contains a single mapping from the specified key to the specified value.
/// This map is a mutator, or a delta-CRDT, such that when it is merged with the original map,
/// the previous value will be replaced with the one specified here.
/// The timestamp in the provided mutator is set to the maximum of the current system's clock
/// and 1 + the previous value's timestamp (if there is one), so that the new value will always
/// take precedence (LWW rule).
///
/// Typically, to update the value associated to a key in the map, you would do the following:
///
/// ```ignore
/// let my_update = my_crdt.update_mutator(key_to_modify, new_value);
/// my_crdt.merge(&my_update);
/// ```
///
/// However extracting the mutator on its own and only sending that on the network is very
/// interesting as it is much smaller than the whole map.
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
let (_, old_ts, _) = self.vals[i];
let new_ts = std::cmp::max(old_ts + 1, now_msec());
vec![(k, new_ts, new_v)]
}
Err(_) => vec![(k, now_msec(), new_v)],
};
Self { vals: new_vals }
}
/// Takes all of the values of the map and returns them. The current map is reset to the
/// empty map. This is very usefull to produce in-place a new map that contains only a delta
/// that modifies a certain value:
///
/// ```ignore
/// let mut a = get_my_crdt_value();
/// let old_a = a.take_and_clear();
/// a.merge(&old_a.update_mutator(key_to_modify, new_value));
/// put_my_crdt_value(a);
/// ```
///
/// Of course in this simple example we could have written simply
/// `pyt_my_crdt_value(a.update_mutator(key_to_modify, new_value))`,
/// but in the case where the map is a field in a struct for instance (as is always the case),
/// this becomes very handy:
///
/// ```ignore
/// let mut a = get_my_crdt_value();
/// let old_a_map = a.map_field.take_and_clear();
/// a.map_field.merge(&old_a_map.update_mutator(key_to_modify, new_value));
/// put_my_crdt_value(a);
/// ```
pub fn take_and_clear(&mut self) -> Self {
let vals = std::mem::replace(&mut self.vals, vec![]);
Self { vals }
}
/// Removes all values from the map
pub fn clear(&mut self) {
self.vals.clear();
}
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => Some(&self.vals[i].2),
Err(_) => None,
}
}
/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
/// In most case you will want to ignore the timestamp (second item of the tuple).
pub fn items(&self) -> &[(K, u64, V)] {
&self.vals[..]
}
/// Returns the number of items in the map
pub fn len(&self) -> usize {
self.vals.len()
}
}
impl<K, V> CRDT for LWWMap<K, V>
where
K: Clone + Ord,
V: Clone + CRDT,
{
fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() {
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
Ok(i) => {
let (_, ts1, _v1) = &self.vals[i];
if ts2 > ts1 {
self.vals[i].1 = *ts2;
self.vals[i].2 = v2.clone();
} else if ts1 == ts2 {
self.vals[i].2.merge(&v2);
}
}
Err(i) => {
self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
}
}
}
}
}

83
src/table/crdt/map.rs Normal file
View file

@ -0,0 +1,83 @@
use serde::{Deserialize, Serialize};
use crate::crdt::crdt::*;
/// Simple CRDT Map
///
/// This types defines a CRDT for a map from keys to values. Values are CRDT types which
/// can have their own updating logic.
///
/// Internally, the map is stored as a vector of keys and values, sorted by ascending key order.
/// This is why the key type `K` must implement `Ord` (and also to ensure a unique serialization,
/// such that two values can be compared for equality based on their hashes). As a consequence,
/// insertions take `O(n)` time. This means that Map should be used for reasonably small maps.
/// However, note that even if we were using a more efficient data structure such as a `BTreeMap`,
/// the serialization cost `O(n)` would still have to be paid at each modification, so we are
/// actually not losing anything here.
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct Map<K, V> {
vals: Vec<(K, V)>,
}
impl<K, V> Map<K, V>
where
K: Clone + Ord,
V: Clone + CRDT,
{
/// Create a new empty map CRDT
pub fn new() -> Self {
Self { vals: vec![] }
}
/// Returns a map that contains a single mapping from the specified key to the specified value.
/// This can be used to build a delta-mutator:
/// when merged with another map, the value will be added or CRDT-merged if a previous
/// value already exists.
pub fn put_mutator(k: K, v: V) -> Self {
Self { vals: vec![(k, v)] }
}
pub fn put(&mut self, k: K, v: V) {
self.merge(&Self::put_mutator(k, v));
}
/// Removes all values from the map
pub fn clear(&mut self) {
self.vals.clear();
}
/// Get a reference to the value assigned to a key
pub fn get(&self, k: &K) -> Option<&V> {
match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
Ok(i) => Some(&self.vals[i].1),
Err(_) => None,
}
}
/// Gets a reference to all of the items, as a slice. Usefull to iterate on all map values.
pub fn items(&self) -> &[(K, V)] {
&self.vals[..]
}
/// Returns the number of items in the map
pub fn len(&self) -> usize {
self.vals.len()
}
}
impl<K, V> CRDT for Map<K, V>
where
K: Clone + Ord,
V: Clone + CRDT,
{
fn merge(&mut self, other: &Self) {
for (k, v2) in other.vals.iter() {
match self.vals.binary_search_by(|(k2, _)| k2.cmp(&k)) {
Ok(i) => {
self.vals[i].1.merge(&v2);
}
Err(i) => {
self.vals.insert(i, (k.clone(), v2.clone()));
}
}
}
}
}

22
src/table/crdt/mod.rs Normal file
View file

@ -0,0 +1,22 @@
//! This package provides a simple implementation of conflict-free replicated data types (CRDTs)
//!
//! CRDTs are a type of data structures that do not require coordination. In other words, we can
//! edit them in parallel, we will always find a way to merge it.
//!
//! A general example is a counter. Its initial value is 0. Alice and Bob get a copy of the
//! counter. Alice does +1 on her copy, she reads 1. Bob does +3 on his copy, he reads 3. Now,
//! it is easy to merge their counters, order does not count: we always get 4.
//!
//! Learn more about CRDT [on Wikipedia](https://en.wikipedia.org/wiki/Conflict-free_replicated_data_type)
mod bool;
mod crdt;
mod lww;
mod lww_map;
mod map;
pub use self::bool::*;
pub use crdt::*;
pub use lww::*;
pub use lww_map::*;
pub use map::*;

View file

@ -2,6 +2,8 @@ use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
use crate::crdt::CRDT;
pub trait PartitionKey { pub trait PartitionKey {
fn hash(&self) -> Hash; fn hash(&self) -> Hash;
} }
@ -35,12 +37,10 @@ impl SortKey for Hash {
} }
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync CRDT + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{ {
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S; fn sort_key(&self) -> &S;
fn merge(&mut self, other: &Self);
} }
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {

View file

@ -17,6 +17,7 @@ use garage_rpc::ring::Ring;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use crate::crdt::CRDT;
use crate::schema::*; use crate::schema::*;
use crate::table_sync::*; use crate::table_sync::*;