diff --git a/src/api/s3_put.rs b/src/api/s3_put.rs index 7261332..a1681d7 100644 --- a/src/api/s3_put.rs +++ b/src/api/s3_put.rs @@ -322,7 +322,7 @@ pub async fn handle_put_part( let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?; // Check object is valid and multipart block can be accepted - let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; + let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?; let object = object.ok_or(Error::BadRequest(format!("Object not found")))?; if !object diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index bd9fca4..c2b2f22 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -2,11 +2,10 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; -use garage_util::data::*; use garage_util::error::Error; -use garage_table::*; use garage_table::crdt::CRDT; +use garage_table::*; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -80,25 +79,26 @@ impl AdminRpcHandler { Ok(AdminRPC::BucketInfo(bucket)) } BucketOperation::Create(query) => { - let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; - if bucket.as_ref().filter(|b| !b.deleted).is_some() { - return Err(Error::BadRPC(format!( - "Bucket {} already exists", - query.name - ))); - } - let new_time = match bucket { - Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), - None => now_msec(), + let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? { + Some(mut bucket) => { + if !bucket.is_deleted() { + return Err(Error::BadRPC(format!( + "Bucket {} already exists", + query.name + ))); + } + bucket + .state + .update(BucketState::Present(crdt::LWWMap::new())); + bucket + } + None => Bucket::new(query.name.clone()), }; - self.garage - .bucket_table - .insert(&Bucket::new(query.name.clone(), new_time, false, vec![])) - .await?; + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) } BucketOperation::Delete(query) => { - let bucket = self.get_existing_bucket(&query.name).await?; + let mut bucket = self.get_existing_bucket(&query.name).await?; let objects = self .garage .object_table @@ -113,25 +113,18 @@ impl AdminRpcHandler { ))); } // --- done checking, now commit --- - for ak in bucket.authorized_keys() { - if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { + for (key_id, _, _) in bucket.authorized_keys() { + if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? { if !key.deleted.get() { self.update_key_bucket(key, &bucket.name, false, false) .await?; } } else { - return Err(Error::Message(format!("Key not found: {}", ak.key_id))); + return Err(Error::Message(format!("Key not found: {}", key_id))); } } - self.garage - .bucket_table - .insert(&Bucket::new( - query.name.clone(), - std::cmp::max(bucket.timestamp + 1, now_msec()), - true, - vec![], - )) - .await?; + bucket.state.update(BucketState::Deleted); + self.garage.bucket_table.insert(&bucket).await?; Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) } BucketOperation::Allow(query) => { @@ -202,10 +195,8 @@ impl AdminRpcHandler { } // --- done checking, now commit --- for (ab_name, _, _) in key.authorized_buckets.items().iter() { - if let Some(bucket) = - self.garage.bucket_table.get(&EmptyKey, ab_name).await? - { - if !bucket.deleted { + if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? { + if !bucket.is_deleted() { self.update_bucket_key(bucket, &key.key_id, false, false) .await?; } @@ -228,7 +219,7 @@ impl AdminRpcHandler { .bucket_table .get(&EmptyKey, bucket) .await? - .filter(|b| !b.deleted) + .filter(|b| !b.is_deleted()) .map(Ok) .unwrap_or(Err(Error::BadRPC(format!( "Bucket {} does not exist", @@ -253,24 +244,20 @@ impl AdminRpcHandler { allow_read: bool, allow_write: bool, ) -> Result<(), Error> { - let timestamp = match bucket - .authorized_keys() - .iter() - .find(|x| x.key_id == *key_id) - { - None => now_msec(), - Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()), - }; - bucket.clear_keys(); - bucket - .add_key(AllowedKey { - key_id: key_id.clone(), - timestamp, - allow_read, - allow_write, - }) - .unwrap(); - self.garage.bucket_table.insert(&bucket).await?; + if let BucketState::Present(ak) = bucket.state.get_mut() { + let old_ak = ak.take_and_clear(); + ak.merge(&old_ak.update_mutator( + key_id.to_string(), + PermissionSet { + allow_read, + allow_write, + }, + )); + } else { + return Err(Error::Message(format!( + "Bucket is deleted in update_bucket_key" + ))); + } Ok(()) } @@ -282,13 +269,13 @@ impl AdminRpcHandler { allow_write: bool, ) -> Result<(), Error> { let old_map = key.authorized_buckets.take_and_clear(); - key.authorized_buckets.merge( - &old_map.update_mutator( - bucket.clone(), - PermissionSet{ - allow_read, allow_write - } - )); + key.authorized_buckets.merge(&old_map.update_mutator( + bucket.clone(), + PermissionSet { + allow_read, + allow_write, + }, + )); self.garage.key_table.insert(&key).await?; Ok(()) } diff --git a/src/model/block.rs b/src/model/block.rs index 6a5d9c5..8a513a3 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; use garage_table::table_sharded::TableShardedReplication; -use garage_table::{TableReplication, DeletedFilter}; +use garage_table::{DeletedFilter, TableReplication}; use crate::block_ref_table::*; diff --git a/src/model/bucket_table.rs b/src/model/bucket_table.rs index 35c0cc2..93421ac 100644 --- a/src/model/bucket_table.rs +++ b/src/model/bucket_table.rs @@ -1,69 +1,59 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use garage_table::crdt::CRDT; use garage_table::*; + use garage_util::error::Error; +use crate::key_table::PermissionSet; + +use model010::bucket_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Bucket { // Primary key pub name: String, - // Timestamp and deletion - // Upon version increment, all info is replaced - pub timestamp: u64, - pub deleted: bool, - - // Authorized keys - authorized_keys: Vec, -} - -impl Bucket { - pub fn new( - name: String, - timestamp: u64, - deleted: bool, - authorized_keys: Vec, - ) -> Self { - let mut ret = Bucket { - name, - timestamp, - deleted, - authorized_keys: vec![], - }; - for key in authorized_keys { - ret.add_key(key) - .expect("Duplicate AllowedKey in Bucket constructor"); - } - ret - } - /// Add a key only if it is not already present - pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> { - match self - .authorized_keys - .binary_search_by(|k| k.key_id.cmp(&key.key_id)) - { - Err(i) => { - self.authorized_keys.insert(i, key); - Ok(()) - } - Ok(_) => Err(()), - } - } - pub fn authorized_keys(&self) -> &[AllowedKey] { - &self.authorized_keys[..] - } - pub fn clear_keys(&mut self) { - self.authorized_keys.clear(); - } + pub state: crdt::LWW, } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] -pub struct AllowedKey { - pub key_id: String, - pub timestamp: u64, - pub allow_read: bool, - pub allow_write: bool, +pub enum BucketState { + Deleted, + Present(crdt::LWWMap), +} + +impl CRDT for BucketState { + fn merge(&mut self, o: &Self) { + match o { + BucketState::Deleted => *self = BucketState::Deleted, + BucketState::Present(other_ak) => { + if let BucketState::Present(ak) = self { + ak.merge(other_ak); + } + } + } + } +} + +impl Bucket { + pub fn new(name: String) -> Self { + let ret = Bucket { + name, + state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())), + }; + ret + } + pub fn is_deleted(&self) -> bool { + *self.state.get() == BucketState::Deleted + } + pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] { + match self.state.get() { + BucketState::Deleted => &[], + BucketState::Present(ak) => ak.items(), + } + } } impl Entry for Bucket { @@ -75,36 +65,12 @@ impl Entry for Bucket { } fn merge(&mut self, other: &Self) { - if other.timestamp > self.timestamp { - *self = other.clone(); - return; - } - if self.timestamp > other.timestamp || self.deleted { - return; - } - - for ak in other.authorized_keys.iter() { - match self - .authorized_keys - .binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id)) - { - Ok(i) => { - let our_ak = &mut self.authorized_keys[i]; - if ak.timestamp > our_ak.timestamp { - *our_ak = ak.clone(); - } - } - Err(i) => { - self.authorized_keys.insert(i, ak.clone()); - } - } - } + self.state.merge(&other.state); } } pub struct BucketTable; - #[async_trait] impl TableSchema for BucketTable { type P = EmptyKey; @@ -117,6 +83,35 @@ impl TableSchema for BucketTable { } fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { - filter.apply(entry.deleted) + filter.apply(entry.is_deleted()) + } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + if old.deleted { + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted), + }) + } else { + let mut keys = crdt::LWWMap::new(); + for ak in old.authorized_keys() { + keys.merge(&crdt::LWWMap::migrate_from_raw_item( + ak.key_id.clone(), + ak.timestamp, + PermissionSet { + allow_read: ak.allow_read, + allow_write: ak.allow_write, + }, + )); + } + Some(Bucket { + name: old.name, + state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)), + }) + } } } diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 2b825aa..ff9d7b7 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -1,8 +1,8 @@ use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use garage_table::*; use garage_table::crdt::CRDT; +use garage_table::*; use garage_util::error::Error; @@ -24,7 +24,6 @@ pub struct Key { // Authorized keys pub authorized_buckets: crdt::LWWMap, - // CRDT interaction: deleted implies authorized_buckets is empty } @@ -125,10 +124,11 @@ impl TableSchema for KeyTable { let it = crdt::LWWMap::migrate_from_raw_item( ab.bucket.clone(), ab.timestamp, - PermissionSet{ + PermissionSet { allow_read: ab.allow_read, allow_write: ab.allow_write, - }); + }, + ); new.authorized_buckets.merge(&it); } Some(new) diff --git a/src/table/crdt.rs b/src/table/crdt.rs index 8f5e4d7..7c888e3 100644 --- a/src/table/crdt.rs +++ b/src/table/crdt.rs @@ -7,7 +7,9 @@ pub trait CRDT { } impl CRDT for T -where T: Ord + Clone { +where + T: Ord + Clone, +{ fn merge(&mut self, other: &Self) { if other > self { *self = other.clone(); @@ -18,14 +20,14 @@ where T: Ord + Clone { // ---- LWW Register ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWW -{ +pub struct LWW { ts: u64, v: T, } impl LWW -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { pub fn new(value: T) -> Self { Self { @@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } pub fn migrate_from_raw(ts: u64, value: T) -> Self { - Self { - ts, - v: value, - } + Self { ts, v: value } } pub fn update(&mut self, new_value: T) { self.ts = std::cmp::max(self.ts + 1, now_msec()); @@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part pub fn get(&self) -> &T { &self.v } + pub fn get_mut(&mut self) -> &mut T { + &mut self.v + } } impl CRDT for LWW -where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT +where + T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT, { fn merge(&mut self, other: &Self) { if other.ts > self.ts { @@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } } - // ---- Boolean (true as absorbing state) ---- #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] @@ -85,61 +87,48 @@ impl CRDT for Bool { } } - // ---- LWW Map ---- #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] -pub struct LWWMap -{ +pub struct LWWMap { vals: Vec<(K, u64, V)>, } impl LWWMap -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, { pub fn new() -> Self { - Self{ - vals: vec![], - } + Self { vals: vec![] } } pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { - Self{ + Self { vals: vec![(k, ts, v)], } } pub fn take_and_clear(&mut self) -> Self { let vals = std::mem::replace(&mut self.vals, vec![]); - Self{vals} + Self { vals } } pub fn clear(&mut self) { self.vals.clear(); } pub fn update_mutator(&self, k: K, new_v: V) -> Self { - let new_vals = match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { let (_, old_ts, _) = self.vals[i]; - let new_ts = std::cmp::max(old_ts+1, now_msec()); + let new_ts = std::cmp::max(old_ts + 1, now_msec()); vec![(k, new_ts, new_v)] } - Err(_) => { - vec![(k, now_msec(), new_v)] - } + Err(_) => vec![(k, now_msec(), new_v)], }; - Self{ - vals: new_vals, - } + Self { vals: new_vals } } pub fn get(&self, k: &K) -> Option<&V> { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => Some(&self.vals[i].2), - Err(_) => None + Err(_) => None, } } pub fn items(&self) -> &[(K, u64, V)] { @@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part } impl CRDT for LWWMap -where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, - V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, +where + K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, + V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, { fn merge(&mut self, other: &Self) { for (k, ts2, v2) in other.vals.iter() { - match self - .vals - .binary_search_by(|(k2, _, _)| k2.cmp(&k)) - { + match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) { Ok(i) => { - let (_, ts1, v1) = &self.vals[i]; + let (_, ts1, _v1) = &self.vals[i]; if ts2 > ts1 { self.vals[i].1 = *ts2; self.vals[i].2 = v2.clone(); diff --git a/src/table/lib.rs b/src/table/lib.rs index e2bf1f4..704f8f1 100644 --- a/src/table/lib.rs +++ b/src/table/lib.rs @@ -3,9 +3,9 @@ #[macro_use] extern crate log; +pub mod crdt; pub mod schema; pub mod util; -pub mod crdt; pub mod table; pub mod table_fullcopy; @@ -13,5 +13,5 @@ pub mod table_sharded; pub mod table_sync; pub use schema::*; -pub use util::*; pub use table::*; +pub use util::*; diff --git a/src/table/schema.rs b/src/table/schema.rs index 49cede0..d2ec945 100644 --- a/src/table/schema.rs +++ b/src/table/schema.rs @@ -20,7 +20,6 @@ impl PartitionKey for Hash { } } - pub trait SortKey { fn sort_key(&self) -> &[u8]; } @@ -37,7 +36,6 @@ impl SortKey for Hash { } } - pub trait Entry: PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync { @@ -47,7 +45,6 @@ pub trait Entry: fn merge(&mut self, other: &Self); } - #[async_trait] pub trait TableSchema: Send + Sync { type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; @@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync { true } } -