diff --git a/Cargo.lock b/Cargo.lock index 6382d0355..330b50fc6 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15,6 +15,16 @@ version = "0.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034" +[[package]] +name = "arrayvec" +version = "0.3.25" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06f59fe10306bb78facd90d28c2038ad23ffaaefa85bac43c8a434cde383334f" +dependencies = [ + "nodrop", + "odds", +] + [[package]] name = "async-trait" version = "0.1.36" @@ -588,6 +598,7 @@ dependencies = [ "garage_rpc 0.1.0", "garage_util 0.1.0", "hex", + "hexdump", "log", "rand", "rmp-serde", @@ -743,6 +754,16 @@ version = "0.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77" +[[package]] +name = "hexdump" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "850f3f2c33d20c0f96c4485e087dd580ff041d720988ebf4c84a42acf739262b" +dependencies = [ + "arrayvec", + "itertools", +] + [[package]] name = "hmac" version = "0.7.1" @@ -870,6 +891,12 @@ dependencies = [ "libc", ] +[[package]] +name = "itertools" +version = "0.4.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f" + [[package]] name = "itoa" version = "0.4.6" @@ -1027,6 +1054,12 @@ dependencies = [ "winapi 0.3.9", ] +[[package]] +name = "nodrop" +version = "0.1.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" + [[package]] name = "num-integer" version = "0.1.43" @@ -1056,6 +1089,12 @@ dependencies = [ "libc", ] +[[package]] +name = "odds" +version = "0.2.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22" + [[package]] name = "once_cell" version = "1.4.0" diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 72613323d..a1681d77a 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -322,7 +322,7 @@ pub async fn handle_put_part( let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; + let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; if !object diff --git a/src/api/signature.rs b/src/api/signature.rs index 402b1881a..0ee47961a 100644 --- a/src/api/signature.rs +++ b/src/api/signature.rs @@ -68,7 +68,7 @@ pub async fn check_signature( .key_table .get(&EmptyKey, &authorization.key_id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .ok_or(Error::Forbidden(format!( "No such key: {}", authorization.key_id diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index 778e4a1d4..a23d3e952 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -2,9 +2,9 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use garage_util::data::*; use garage_util::error::Error; +use garage_table::crdt::CRDT; use garage_table::*; use garage_rpc::rpc_client::*; @@ -79,25 +79,26 @@ impl AdminRpcHandler { Ok(AdminRPC::BucketInfo(bucket)) } BucketOperation::Create(query) => { - let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; - if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRPC(format!( - "Bucket {} already exists", - query.name - ))); - } - let new_time = match bucket { - Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), - None => now_msec(), + let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { + Some(mut bucket) => { + if !bucket.is_deleted() { + return Err(Error::BadRPC(format!( + "Bucket {} already exists", + query.name + ))); + } + bucket + .state + .update(BucketState::Present(crdt::LWWMap::new())); + bucket + } + None => Bucket::new(query.name.clone()), }; - self.garage - .bucket_table - .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) - .await?; + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; + let mut bucket = self.get_existing_bucket(&query.name).await?; let objects = self .garage .object_table @@ -112,25 +113,18 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ak in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { - if !key.deleted { + for (key_id, _, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { + if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } } else { - return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + return Err(Error::Message(format!("Key not found: {}", key_id))); } } - self.garage - .bucket_table - .insert(&Bucket::new( - query.name.clone(), - std::cmp::max(bucket.timestamp + 1, now_msec()), - true, - vec![], - )) - .await?; + bucket.state.update(BucketState::Deleted); + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } BucketOperation::Allow(query) => { @@ -173,7 +167,7 @@ impl AdminRpcHandler { .get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000) .await? .iter() - .map(|k| (k.key_id.to_string(), k.name.to_string())) + .map(|k| (k.key_id.to_string(), k.name.get().clone())) .collect::>(); Ok(AdminRPC::KeyList(key_ids)) } @@ -182,14 +176,13 @@ impl AdminRpcHandler { Ok(AdminRPC::KeyInfo(key)) } KeyOperation::New(query) => { - let key = Key::new(query.name, vec![]); + let key = Key::new(query.name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } KeyOperation::Rename(query) => { let mut key = self.get_existing_key(&query.key_id).await?; - key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec()); - key.name = query.new_name; + key.name.update(query.new_name); self.garage.key_table.insert(&key).await?; Ok(AdminRPC::KeyInfo(key)) } @@ -201,16 +194,14 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ab in key.authorized_buckets().iter() { - if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await? - { - if !bucket.deleted { + for (ab_name, _, _) in key.authorized_buckets.items().iter() { + if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { + if !bucket.is_deleted() { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } } else { - return Err(Error::Message(format!("Bucket not found: {}", ab.bucket))); + return Err(Error::Message(format!("Bucket not found: {}", ab_name))); } } let del_key = Key::delete(key.key_id); @@ -228,7 +219,7 @@ impl AdminRpcHandler { .bucket_table .get(&EmptyKey, bucket) .await? - .filter(|b| !b.deleted) + .filter(|b| !b.is_deleted()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!( "Bucket {} does not exist", @@ -241,7 +232,7 @@ impl AdminRpcHandler { .key_table .get(&EmptyKey, id) .await? - .filter(|k| !k.deleted) + .filter(|k| !k.deleted.get()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id)))) } @@ -253,23 +244,20 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match bucket - .authorized_keys() - .iter() - .find(|x| x.key_id == *key_id) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - bucket.clear_keys(); - bucket - .add_key(AllowedKey { - key_id: key_id.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); + if let BucketState::Present(ak) = bucket.state.get_mut() { + let old_ak = ak.take_and_clear(); + ak.merge(&old_ak.update_mutator( + key_id.to_string(), + PermissionSet { + allow_read, + allow_write, + }, + )); + } else { + return Err(Error::Message(format!( + "Bucket is deleted in update_bucket_key" + ))); + } self.garage.bucket_table.insert(&bucket).await?; Ok(()) } @@ -281,22 +269,14 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match key - .authorized_buckets() - .iter() - .find(|x| x.bucket == *bucket) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - key.clear_buckets(); - key.add_bucket(AllowedBucket { - bucket: bucket.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); + let old_map = key.authorized_buckets.take_and_clear(); + key.authorized_buckets.merge(&old_map.update_mutator( + bucket.clone(), + PermissionSet { + allow_read, + allow_write, + }, + )); self.garage.key_table.insert(&key).await?; Ok(()) } diff --git a/src/model/block.rs b/src/model/block.rs index 6a5d9c5b8..8a513a3c8 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use garage_table::table_sharded::TableShardedReplication; -use garage_table::{TableReplication, DeletedFilter}; +use garage_table::{DeletedFilter, TableReplication}; use crate::block_ref_table::*; diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 35c0cc277..b7f24d714 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,69 +1,58 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; + use garage_util::error::Error; +use crate::key_table::PermissionSet; + +use model010::bucket_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { // Primary key pub name: String, - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, - - // Authorized keys - authorized_keys: Vec, -} - -impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec, - ) -> Self { - let mut ret = Bucket { - name, - timestamp, - deleted, - authorized_keys: vec![], - }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); - } - ret - } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] - } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); - } + pub state: crdt::LWW, } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, +pub enum BucketState { + Deleted, + Present(crdt::LWWMap), +} + +impl CRDT for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_ak) => { + if let BucketState::Present(ak) = self { + ak.merge(other_ak); + } + } + } + } +} + +impl Bucket { + pub fn new(name: String) -> Self { + Bucket { + name, + state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), + } + } + pub fn is_deleted(&self) -> bool { + *self.state.get() == BucketState::Deleted + } + pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { + match self.state.get() { + BucketState::Deleted => &[], + BucketState::Present(ak) => ak.items(), + } + } } impl Entry for Bucket { @@ -75,36 +64,12 @@ impl Entry for Bucket { } fn merge(&mut self, other: &Self) { - if other.timestamp > self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } + self.state.merge(&other.state); } } pub struct BucketTable; - #[async_trait] impl TableSchema for BucketTable { type P = EmptyKey; @@ -117,6 +82,35 @@ impl TableSchema for BucketTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.is_deleted()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + if old.deleted { + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), + }) + } else { + let mut keys = crdt::LWWMap::new(); + for ak in old.authorized_keys() { + keys.merge(&crdt::LWWMap::migrate_from_raw_item( + ak.key_id.clone(), + ak.timestamp, + PermissionSet { + allow_read: ak.allow_read, + allow_write: ak.allow_write, + }, + )); + } + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)), + }) + } } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 05b938cec..20da3cc69 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,10 +1,13 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; -use garage_util::data::*; + use garage_util::error::Error; +use model010::key_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Key { // Primary key @@ -14,83 +17,54 @@ pub struct Key { pub secret_key: String, // Name - pub name: String, - pub name_timestamp: u64, + pub name: crdt::LWW, // Deletion - pub deleted: bool, + pub deleted: crdt::Bool, // Authorized keys - authorized_buckets: Vec, + pub authorized_buckets: crdt::LWWMap, + // CRDT interaction: deleted implies authorized_buckets is empty } impl Key { - pub fn new(name: String, buckets: Vec) -> Self { + pub fn new(name: String) -> Self { let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..])); let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]); - let mut ret = Self { + Self { key_id, secret_key, - name, - name_timestamp: now_msec(), - deleted: false, - authorized_buckets: vec![], - }; - for b in buckets { - ret.add_bucket(b) - .expect("Duplicate AllowedBucket in Key constructor"); + name: crdt::LWW::new(name), + deleted: crdt::Bool::new(false), + authorized_buckets: crdt::LWWMap::new(), } - ret } pub fn delete(key_id: String) -> Self { Self { key_id, secret_key: "".into(), - name: "".into(), - name_timestamp: now_msec(), - deleted: true, - authorized_buckets: vec![], + name: crdt::LWW::new("".to_string()), + deleted: crdt::Bool::new(true), + authorized_buckets: crdt::LWWMap::new(), } } /// Add an authorized bucket, only if it wasn't there before - pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> { - match self - .authorized_buckets - .binary_search_by(|b| b.bucket.cmp(&new.bucket)) - { - Err(i) => { - self.authorized_buckets.insert(i, new); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_buckets(&self) -> &[AllowedBucket] { - &self.authorized_buckets[..] - } - pub fn clear_buckets(&mut self) { - self.authorized_buckets.clear(); - } pub fn allow_read(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_read) .unwrap_or(false) } pub fn allow_write(&self, bucket: &str) -> bool { self.authorized_buckets - .iter() - .find(|x| x.bucket.as_str() == bucket) + .get(&bucket.to_string()) .map(|x| x.allow_write) .unwrap_or(false) } } -#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedBucket { - pub bucket: String, - pub timestamp: u64, +#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] +pub struct PermissionSet { pub allow_read: bool, pub allow_write: bool, } @@ -104,35 +78,15 @@ impl Entry for Key { } fn merge(&mut self, other: &Self) { - if other.name_timestamp > self.name_timestamp { - self.name_timestamp = other.name_timestamp; - self.name = other.name.clone(); - } + self.name.merge(&other.name); + self.deleted.merge(&other.deleted); - if other.deleted { - self.deleted = true; - } - if self.deleted { + if self.deleted.get() { self.authorized_buckets.clear(); return; } - for ab in other.authorized_buckets.iter() { - match self - .authorized_buckets - .binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket)) - { - Ok(i) => { - let our_ab = &mut self.authorized_buckets[i]; - if ab.timestamp > our_ab.timestamp { - *our_ab = ab.clone(); - } - } - Err(i) => { - self.authorized_buckets.insert(i, ab.clone()); - } - } - } + self.authorized_buckets.merge(&other.authorized_buckets); } } @@ -150,6 +104,32 @@ impl TableSchema for KeyTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.deleted.get()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let mut new = Self::E { + key_id: old.key_id.clone(), + secret_key: old.secret_key.clone(), + name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()), + deleted: crdt::Bool::new(old.deleted), + authorized_buckets: crdt::LWWMap::new(), + }; + for ab in old.authorized_buckets() { + let it = crdt::LWWMap::migrate_from_raw_item( + ab.bucket.clone(), + ab.timestamp, + PermissionSet { + allow_read: ab.allow_read, + allow_write: ab.allow_write, + }, + ); + new.authorized_buckets.merge(&it); + } + Some(new) } } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 1963f3da4..945763fab 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -21,6 +21,7 @@ rand = "0.7" hex = "0.3" arc-swap = "0.4" log = "0.4" +hexdump = "0.1" sled = "0.31" diff --git a/src/table/crdt.rs b/src/table/crdt.rs new file mode 100644 index 000000000..2b903cf0c --- /dev/null +++ b/src/table/crdt.rs @@ -0,0 +1,162 @@ +use serde::{Deserialize, Serialize}; + +use garage_util::data::*; + +pub trait CRDT { + fn merge(&mut self, other: &Self); +} + +impl CRDT for T +where + T: Ord + Clone, +{ + fn merge(&mut self, other: &Self) { + if other > self { + *self = other.clone(); + } + } +} + +// ---- LWW Register ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWW { + ts: u64, + v: T, +} + +impl LWW +where + T: CRDT, +{ + pub fn new(value: T) -> Self { + Self { + ts: now_msec(), + v: value, + } + } + pub fn migrate_from_raw(ts: u64, value: T) -> Self { + Self { ts, v: value } + } + pub fn update(&mut self, new_value: T) { + self.ts = std::cmp::max(self.ts + 1, now_msec()); + self.v = new_value; + } + pub fn get(&self) -> &T { + &self.v + } + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } +} + +impl CRDT for LWW +where + T: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + if other.ts > self.ts { + self.ts = other.ts; + self.v = other.v.clone(); + } else if other.ts == self.ts { + self.v.merge(&other.v); + } + } +} + +// ---- Boolean (true as absorbing state) ---- + +#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] +pub struct Bool(bool); + +impl Bool { + pub fn new(b: bool) -> Self { + Self(b) + } + pub fn set(&mut self) { + self.0 = true; + } + pub fn get(&self) -> bool { + self.0 + } +} + +impl CRDT for Bool { + fn merge(&mut self, other: &Self) { + self.0 = self.0 || other.0; + } +} + +// ---- LWW Map ---- + +#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] +pub struct LWWMap { + vals: Vec<(K, u64, V)>, +} + +impl LWWMap +where + K: Ord, + V: CRDT, +{ + pub fn new() -> Self { + Self { vals: vec![] } + } + pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { + Self { + vals: vec![(k, ts, v)], + } + } + pub fn take_and_clear(&mut self) -> Self { + let vals = std::mem::replace(&mut self.vals, vec![]); + Self { vals } + } + pub fn clear(&mut self) { + self.vals.clear(); + } + pub fn update_mutator(&self, k: K, new_v: V) -> Self { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, old_ts, _) = self.vals[i]; + let new_ts = std::cmp::max(old_ts + 1, now_msec()); + vec![(k, new_ts, new_v)] + } + Err(_) => vec![(k, now_msec(), new_v)], + }; + Self { vals: new_vals } + } + pub fn get(&self, k: &K) -> Option<&V> { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => Some(&self.vals[i].2), + Err(_) => None, + } + } + pub fn items(&self) -> &[(K, u64, V)] { + &self.vals[..] + } +} + +impl CRDT for LWWMap +where + K: Clone + Ord, + V: Clone + CRDT, +{ + fn merge(&mut self, other: &Self) { + for (k, ts2, v2) in other.vals.iter() { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { + Ok(i) => { + let (_, ts1, _v1) = &self.vals[i]; + if ts2 > ts1 { + self.vals[i].1 = *ts2; + self.vals[i].2 = v2.clone(); + } else if ts1 == ts2 { + self.vals[i].2.merge(&v2); + } + } + Err(i) => { + self.vals.insert(i, (k.clone(), *ts2, v2.clone())); + } + } + } + } +} diff --git a/src/table/lib.rs b/src/table/lib.rs index 7684fe9da..704f8f1ef 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate log; +pub mod crdt; pub mod schema; pub mod util; @@ -12,5 +13,5 @@ pub mod table_sharded; pub mod table_sync; pub use schema::*; -pub use util::*; pub use table::*; +pub use util::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index 49cede0ad..d2ec94504 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -20,7 +20,6 @@ impl PartitionKey for Hash { } } - pub trait SortKey { fn sort_key(&self) -> &[u8]; } @@ -37,7 +36,6 @@ impl SortKey for Hash { } } - pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { @@ -47,7 +45,6 @@ pub trait Entry: fn merge(&mut self, other: &Self); } - #[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync { true } } - diff --git a/src/table/table.rs b/src/table/table.rs index 2beac3f4e..5dfee3c8a 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap}; use std::sync::Arc; use std::time::Duration; +use log::warn; + use arc_swap::ArcSwapOption; use futures::stream::*; use serde::{Deserialize, Serialize}; @@ -185,7 +187,7 @@ where for resp in resps { if let TableRPC::ReadEntryResponse(value) = resp { if let Some(v_bytes) = value { - let v = Self::decode_entry(v_bytes.as_slice())?; + let v = self.decode_entry(v_bytes.as_slice())?; ret = match ret { None => Some(v), Some(mut x) => { @@ -241,7 +243,7 @@ where for resp in resps { if let TableRPC::Update(entries) = resp { for entry_bytes in entries.iter() { - let entry = Self::decode_entry(entry_bytes.as_slice())?; + let entry = self.decode_entry(entry_bytes.as_slice())?; let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); match ret.remove(&entry_key) { None => { @@ -363,7 +365,7 @@ where let keep = match filter { None => true, Some(f) => { - let entry = Self::decode_entry(value.as_ref())?; + let entry = self.decode_entry(value.as_ref())?; F::matches_filter(&entry, f) } }; @@ -382,14 +384,14 @@ where let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { - let update = Self::decode_entry(update_bytes.as_slice())?; + let update = self.decode_entry(update_bytes.as_slice())?; let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = match db.get(&tree_key)? { Some(prev_bytes) => { - let old_entry = Self::decode_entry(&prev_bytes) + let old_entry = self.decode_entry(&prev_bytes) .map_err(sled::ConflictableTransactionError::Abort)?; let mut new_entry = old_entry.clone(); new_entry.merge(&update); @@ -437,7 +439,7 @@ where break; } if let Some(old_val) = self.store.remove(&key)? { - let old_entry = Self::decode_entry(&old_val)?; + let old_entry = self.decode_entry(&old_val)?; self.instance.updated(Some(old_entry), None).await?; self.system .background @@ -455,12 +457,18 @@ where ret } - fn decode_entry(bytes: &[u8]) -> Result { + fn decode_entry(&self, bytes: &[u8]) -> Result { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { Ok(x) => Ok(x), Err(e) => match F::try_migrate(bytes) { Some(x) => Ok(x), - None => Err(e.into()), + None => { + warn!("Unable to decode entry of {}: {}", self.name, e); + for line in hexdump::hexdump_iter(bytes) { + debug!("{}", line); + } + Err(e.into()) + } }, } }