diff --git a/Cargo.lock b/Cargo.lock index f61e2506..d3cc004e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -966,6 +966,8 @@ version = "0.7.0" dependencies = [ "arc-swap", "async-trait", + "base64", + "blake2", "err-derive 0.3.1", "futures", "futures-util", diff --git a/src/api/s3_bucket.rs b/src/api/s3_bucket.rs index bca41569..d27b8379 100644 --- a/src/api/s3_bucket.rs +++ b/src/api/s3_bucket.rs @@ -7,8 +7,8 @@ use garage_model::bucket_alias_table::*; use garage_model::bucket_table::Bucket; use garage_model::garage::Garage; use garage_model::key_table::Key; -use garage_model::s3::object_table::ObjectFilter; use garage_model::permission::BucketKeyPerm; +use garage_model::s3::object_table::ObjectFilter; use garage_table::util::*; use garage_util::crdt::*; use garage_util::data::*; diff --git a/src/api/s3_copy.rs b/src/api/s3_copy.rs index 19ad84cd..a4d55390 100644 --- a/src/api/s3_copy.rs +++ b/src/api/s3_copy.rs @@ -11,9 +11,9 @@ use garage_table::*; use garage_util::data::*; use garage_util::time::*; -use garage_model::s3::block_ref_table::*; use garage_model::garage::Garage; use garage_model::key_table::Key; +use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; diff --git a/src/api/s3_list.rs b/src/api/s3_list.rs index 3002f782..4f011597 100644 --- a/src/api/s3_list.rs +++ b/src/api/s3_list.rs @@ -955,7 +955,7 @@ fn key_after_prefix(pfx: &str) -> Option { #[cfg(test)] mod tests { use super::*; - use garage_model::version_table::*; + use garage_model::s3::version_table::*; use garage_util::*; use std::iter::FromIterator; diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 3b8bfb22..868347fe 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -14,8 +14,8 @@ use garage_util::error::Error as GarageError; use garage_util::time::*; use garage_block::manager::INLINE_THRESHOLD; -use garage_model::s3::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; diff --git a/src/garage/admin.rs b/src/garage/admin.rs index aecc3ac6..de628f1d 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -21,8 +21,8 @@ use garage_model::garage::Garage; use garage_model::helper::error::{Error, OkOrBadRequest}; use garage_model::key_table::*; use garage_model::migrate::Migrate; -use garage_model::s3::object_table::ObjectFilter; use garage_model::permission::*; +use garage_model::s3::object_table::ObjectFilter; use crate::cli::*; use crate::repair::Repair; diff --git a/src/garage/repair.rs b/src/garage/repair.rs index bd7e87d2..830eac71 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -2,8 +2,8 @@ use std::sync::Arc; use tokio::sync::watch; -use garage_model::s3::block_ref_table::*; use garage_model::garage::Garage; +use garage_model::s3::block_ref_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; use garage_table::*; diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 007cec89..a2cedfb0 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -22,8 +22,10 @@ garage_model_050 = { package = "garage_model", version = "0.5.1" } async-trait = "0.1.7" arc-swap = "1.0" +blake2 = "0.9" err-derive = "0.3" hex = "0.4" +base64 = "0.13" tracing = "0.1.30" rand = "0.8" zstd = { version = "0.9", default-features = false } diff --git a/src/model/k2v/causality.rs b/src/model/k2v/causality.rs index 822134d5..848b200e 100644 --- a/src/model/k2v/causality.rs +++ b/src/model/k2v/causality.rs @@ -1,6 +1,8 @@ use std::collections::BTreeMap; +use std::convert::TryInto; use garage_util::data::*; +use garage_util::error::*; /// Node IDs used in K2V are u64 integers that are the abbreviation /// of full Garage node IDs which are 256-bit UUIDs. @@ -12,12 +14,12 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId { u64::from_be_bytes(tmp) } - -pub struct CausalityContext { +#[derive(PartialEq, Debug)] +pub struct CausalContext { pub vector_clock: BTreeMap, } -impl CausalityContext { +impl CausalContext { /// Empty causality context pub fn new_empty() -> Self { Self { @@ -26,10 +28,65 @@ impl CausalityContext { } /// Make binary representation and encode in base64 pub fn serialize(&self) -> String { - unimplemented!(); //TODO + let mut ints = Vec::with_capacity(2 * self.vector_clock.len()); + for (node, time) in self.vector_clock.iter() { + ints.push(*node); + ints.push(*time); + } + let checksum = ints.iter().fold(0, |acc, v| acc ^ *v); + + let mut bytes = u64::to_be_bytes(checksum).to_vec(); + for i in ints { + bytes.extend(u64::to_be_bytes(i)); + } + + base64::encode(bytes) } /// Parse from base64-encoded binary representation - pub fn parse(s: &str) -> Self { - unimplemented!(); //TODO + pub fn parse(s: &str) -> Result { + let bytes = base64::decode(s).ok_or_message("Invalid causality token (bad base64)")?; + if bytes.len() % 16 != 8 || bytes.len() < 8 { + return Err(Error::Message( + "Invalid causality token (bad length)".into(), + )); + } + + let checksum = u64::from_be_bytes(bytes[..8].try_into().unwrap()); + let mut ret = CausalContext { + vector_clock: BTreeMap::new(), + }; + + for i in 0..(bytes.len() / 16) { + let node_id = u64::from_be_bytes(bytes[8 + i * 16..16 + i * 16].try_into().unwrap()); + let time = u64::from_be_bytes(bytes[16 + i * 16..24 + i * 16].try_into().unwrap()); + ret.vector_clock.insert(node_id, time); + } + + let check = ret.vector_clock.iter().fold(0, |acc, (n, t)| acc ^ *n ^ *t); + + if check != checksum { + return Err(Error::Message( + "Invalid causality token (bad checksum)".into(), + )); + } + + Ok(ret) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_causality_token_serialization() { + let ct = CausalContext { + vector_clock: [(4, 42), (1928131023, 76), (0xefc0c1c47f9de433, 2)] + .iter() + .cloned() + .collect(), + }; + + assert_eq!(CausalContext::parse(&ct.serialize()).unwrap(), ct); } } diff --git a/src/model/k2v/item_table.rs b/src/model/k2v/item_table.rs index f40829cb..0fa9e0ac 100644 --- a/src/model/k2v/item_table.rs +++ b/src/model/k2v/item_table.rs @@ -10,13 +10,18 @@ use crate::k2v::causality::*; #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct K2VItem { - pub bucket_id: Uuid, - pub partition_key: String, + pub partition: K2VItemPartition, pub sort_key: String, items: BTreeMap, } +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct K2VItemPartition { + pub bucket_id: Uuid, + pub partition_key: String, +} + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] struct DvvsEntry { t_discard: u64, @@ -31,33 +36,200 @@ pub enum DvvsValue { impl K2VItem { /// Creates a new K2VItem when no previous entry existed in the db - pub fn new(this_node: Uuid, value: DvvsValue) -> Self { - unimplemented!(); // TODO + pub fn new( + bucket_id: Uuid, + partition_key: String, + sort_key: String, + this_node: Uuid, + value: DvvsValue, + ) -> Self { + let mut ret = Self { + partition: K2VItemPartition { + bucket_id, + partition_key, + }, + sort_key, + items: BTreeMap::new(), + }; + let node_id = make_node_id(this_node); + ret.items.insert( + node_id, + DvvsEntry { + t_discard: 0, + values: vec![(1, value)], + }, + ); + ret } /// Updates a K2VItem with a new value or a deletion event - pub fn update(&mut self, this_node: Uuid, context: CausalityContext, new_value: DvvsValue) { - unimplemented!(); // TODO + pub fn update(&mut self, this_node: Uuid, context: CausalContext, new_value: DvvsValue) { + for (node, t_discard) in context.vector_clock.iter() { + if let Some(e) = self.items.get_mut(node) { + e.t_discard = std::cmp::max(e.t_discard, *t_discard); + } else { + self.items.insert( + *node, + DvvsEntry { + t_discard: *t_discard, + values: vec![], + }, + ); + } + } + + self.discard(); + + let node_id = make_node_id(this_node); + let e = self.items.entry(node_id).or_insert(DvvsEntry { + t_discard: 0, + values: vec![], + }); + let t_prev = e.max_time(); + e.values.push((t_prev + 1, new_value)); } /// Extract the causality context of a K2V Item - pub fn causality_context(&self) -> CausalityContext { - unimplemented!(); // TODO + pub fn causality_context(&self) -> CausalContext { + let mut cc = CausalContext::new_empty(); + for (node, ent) in self.items.iter() { + cc.vector_clock.insert(*node, ent.max_time()); + } + cc } /// Extract the list of values pub fn values(&'_ self) -> Vec<&'_ DvvsValue> { - unimplemented!(); // TODO + let mut ret = vec![]; + for (_, ent) in self.items.iter() { + for (_, v) in ent.values.iter() { + ret.push(v); + } + } + ret + } + + fn discard(&mut self) { + for (_, ent) in self.items.iter_mut() { + ent.discard(); + } + } +} + +impl DvvsEntry { + fn max_time(&self) -> u64 { + self.values + .iter() + .fold(self.t_discard, |acc, (vts, _)| std::cmp::max(acc, *vts)) + } + + fn discard(&mut self) { + self.values = std::mem::take(&mut self.values) + .into_iter() + .filter(|(t, _)| *t > self.t_discard) + .collect::>(); } } impl Crdt for K2VItem { fn merge(&mut self, other: &Self) { - unimplemented!(); // TODO + for (node, e2) in other.items.iter() { + if let Some(e) = self.items.get_mut(node) { + e.merge(e2); + } else { + self.items.insert(*node, e2.clone()); + } + } } } impl Crdt for DvvsEntry { fn merge(&mut self, other: &Self) { - unimplemented!(); // TODO + self.t_discard = std::cmp::max(self.t_discard, other.t_discard); + self.discard(); + + let t_max = self.max_time(); + for (vt, vv) in other.values.iter() { + if *vt > t_max { + self.values.push((*vt, vv.clone())); + } + } + } +} + +impl PartitionKey for K2VItemPartition { + fn hash(&self) -> Hash { + use blake2::{Blake2b, Digest}; + + let mut hasher = Blake2b::new(); + hasher.update(self.bucket_id.as_slice()); + hasher.update(self.partition_key.as_bytes()); + let mut hash = [0u8; 32]; + hash.copy_from_slice(&hasher.finalize()[..32]); + hash.into() + } +} + +impl Entry for K2VItem { + fn partition_key(&self) -> &K2VItemPartition { + &self.partition + } + fn sort_key(&self) -> &String { + &self.sort_key + } + fn is_tombstone(&self) -> bool { + self.values() + .iter() + .all(|v| matches!(v, DvvsValue::Deleted)) + } +} + +pub struct K2VItemTable {} + +#[derive(Clone, Copy, Debug, Serialize, Deserialize)] +pub struct ItemFilter { + pub exclude_only_tombstones: bool, + pub conflicts_only: bool, +} + +impl TableSchema for K2VItemTable { + const TABLE_NAME: &'static str = "k2v_item"; + + type P = K2VItemPartition; + type S = String; + type E = K2VItem; + type Filter = ItemFilter; + + fn updated(&self, _old: Option, _new: Option) { + // nothing for now + } + + fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { + let v = entry.values(); + !(filter.conflicts_only && v.len() < 2) + && !(filter.exclude_only_tombstones && entry.is_tombstone()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_dvvsentry_merge_simple() { + let e1 = DvvsEntry { + t_discard: 4, + values: vec![ + (5, DvvsValue::Value(vec![15])), + (6, DvvsValue::Value(vec![16])), + ], + }; + let e2 = DvvsEntry { + t_discard: 5, + values: vec![(6, DvvsValue::Value(vec![16])), (7, DvvsValue::Deleted)], + }; + + let mut e3 = e1.clone(); + e3.merge(&e2); + assert_eq!(e2, e3); } } diff --git a/src/model/lib.rs b/src/model/lib.rs index 6c69c8e2..0abf8c85 100644 --- a/src/model/lib.rs +++ b/src/model/lib.rs @@ -7,8 +7,8 @@ pub mod bucket_alias_table; pub mod bucket_table; pub mod key_table; -pub mod s3; pub mod k2v; +pub mod s3; pub mod garage; pub mod helper;