Convert bucket table to better CRDT representation

This commit is contained in:
Alex 2020-11-20 23:01:12 +01:00
parent e02e9e035e
commit f8a04852a2
8 changed files with 157 additions and 192 deletions

View file

@ -2,11 +2,10 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_table::*;
use garage_table::crdt::CRDT; use garage_table::crdt::CRDT;
use garage_table::*;
use garage_rpc::rpc_client::*; use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
@ -80,25 +79,26 @@ impl AdminRpcHandler {
Ok(AdminRPC::BucketInfo(bucket)) Ok(AdminRPC::BucketInfo(bucket))
} }
BucketOperation::Create(query) => { BucketOperation::Create(query) => {
let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?; let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
if bucket.as_ref().filter(|b| !b.deleted).is_some() { Some(mut bucket) => {
if !bucket.is_deleted() {
return Err(Error::BadRPC(format!( return Err(Error::BadRPC(format!(
"Bucket {} already exists", "Bucket {} already exists",
query.name query.name
))); )));
} }
let new_time = match bucket { bucket
Some(b) => std::cmp::max(b.timestamp + 1, now_msec()), .state
None => now_msec(), .update(BucketState::Present(crdt::LWWMap::new()));
bucket
}
None => Bucket::new(query.name.clone()),
}; };
self.garage self.garage.bucket_table.insert(&bucket).await?;
.bucket_table
.insert(&Bucket::new(query.name.clone(), new_time, false, vec![]))
.await?;
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name))) Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
} }
BucketOperation::Delete(query) => { BucketOperation::Delete(query) => {
let bucket = self.get_existing_bucket(&query.name).await?; let mut bucket = self.get_existing_bucket(&query.name).await?;
let objects = self let objects = self
.garage .garage
.object_table .object_table
@ -113,25 +113,18 @@ impl AdminRpcHandler {
))); )));
} }
// --- done checking, now commit --- // --- done checking, now commit ---
for ak in bucket.authorized_keys() { for (key_id, _, _) in bucket.authorized_keys() {
if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? { if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
if !key.deleted.get() { if !key.deleted.get() {
self.update_key_bucket(key, &bucket.name, false, false) self.update_key_bucket(key, &bucket.name, false, false)
.await?; .await?;
} }
} else { } else {
return Err(Error::Message(format!("Key not found: {}", ak.key_id))); return Err(Error::Message(format!("Key not found: {}", key_id)));
} }
} }
self.garage bucket.state.update(BucketState::Deleted);
.bucket_table self.garage.bucket_table.insert(&bucket).await?;
.insert(&Bucket::new(
query.name.clone(),
std::cmp::max(bucket.timestamp + 1, now_msec()),
true,
vec![],
))
.await?;
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name))) Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
} }
BucketOperation::Allow(query) => { BucketOperation::Allow(query) => {
@ -202,10 +195,8 @@ impl AdminRpcHandler {
} }
// --- done checking, now commit --- // --- done checking, now commit ---
for (ab_name, _, _) in key.authorized_buckets.items().iter() { for (ab_name, _, _) in key.authorized_buckets.items().iter() {
if let Some(bucket) = if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? {
self.garage.bucket_table.get(&EmptyKey, ab_name).await? if !bucket.is_deleted() {
{
if !bucket.deleted {
self.update_bucket_key(bucket, &key.key_id, false, false) self.update_bucket_key(bucket, &key.key_id, false, false)
.await?; .await?;
} }
@ -228,7 +219,7 @@ impl AdminRpcHandler {
.bucket_table .bucket_table
.get(&EmptyKey, bucket) .get(&EmptyKey, bucket)
.await? .await?
.filter(|b| !b.deleted) .filter(|b| !b.is_deleted())
.map(Ok) .map(Ok)
.unwrap_or(Err(Error::BadRPC(format!( .unwrap_or(Err(Error::BadRPC(format!(
"Bucket {} does not exist", "Bucket {} does not exist",
@ -253,24 +244,20 @@ impl AdminRpcHandler {
allow_read: bool, allow_read: bool,
allow_write: bool, allow_write: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
let timestamp = match bucket if let BucketState::Present(ak) = bucket.state.get_mut() {
.authorized_keys() let old_ak = ak.take_and_clear();
.iter() ak.merge(&old_ak.update_mutator(
.find(|x| x.key_id == *key_id) key_id.to_string(),
{ PermissionSet {
None => now_msec(),
Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
};
bucket.clear_keys();
bucket
.add_key(AllowedKey {
key_id: key_id.clone(),
timestamp,
allow_read, allow_read,
allow_write, allow_write,
}) },
.unwrap(); ));
self.garage.bucket_table.insert(&bucket).await?; } else {
return Err(Error::Message(format!(
"Bucket is deleted in update_bucket_key"
)));
}
Ok(()) Ok(())
} }
@ -282,12 +269,12 @@ impl AdminRpcHandler {
allow_write: bool, allow_write: bool,
) -> Result<(), Error> { ) -> Result<(), Error> {
let old_map = key.authorized_buckets.take_and_clear(); let old_map = key.authorized_buckets.take_and_clear();
key.authorized_buckets.merge( key.authorized_buckets.merge(&old_map.update_mutator(
&old_map.update_mutator(
bucket.clone(), bucket.clone(),
PermissionSet { PermissionSet {
allow_read, allow_write allow_read,
} allow_write,
},
)); ));
self.garage.key_table.insert(&key).await?; self.garage.key_table.insert(&key).await?;
Ok(()) Ok(())

View file

@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use garage_table::table_sharded::TableShardedReplication; use garage_table::table_sharded::TableShardedReplication;
use garage_table::{TableReplication, DeletedFilter}; use garage_table::{DeletedFilter, TableReplication};
use crate::block_ref_table::*; use crate::block_ref_table::*;

View file

@ -1,69 +1,59 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_table::crdt::CRDT;
use garage_table::*; use garage_table::*;
use garage_util::error::Error; use garage_util::error::Error;
use crate::key_table::PermissionSet;
use model010::bucket_table as prev;
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct Bucket { pub struct Bucket {
// Primary key // Primary key
pub name: String, pub name: String,
// Timestamp and deletion pub state: crdt::LWW<BucketState>,
// Upon version increment, all info is replaced
pub timestamp: u64,
pub deleted: bool,
// Authorized keys
authorized_keys: Vec<AllowedKey>,
}
impl Bucket {
pub fn new(
name: String,
timestamp: u64,
deleted: bool,
authorized_keys: Vec<AllowedKey>,
) -> Self {
let mut ret = Bucket {
name,
timestamp,
deleted,
authorized_keys: vec![],
};
for key in authorized_keys {
ret.add_key(key)
.expect("Duplicate AllowedKey in Bucket constructor");
}
ret
}
/// Add a key only if it is not already present
pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
match self
.authorized_keys
.binary_search_by(|k| k.key_id.cmp(&key.key_id))
{
Err(i) => {
self.authorized_keys.insert(i, key);
Ok(())
}
Ok(_) => Err(()),
}
}
pub fn authorized_keys(&self) -> &[AllowedKey] {
&self.authorized_keys[..]
}
pub fn clear_keys(&mut self) {
self.authorized_keys.clear();
}
} }
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
pub struct AllowedKey { pub enum BucketState {
pub key_id: String, Deleted,
pub timestamp: u64, Present(crdt::LWWMap<String, PermissionSet>),
pub allow_read: bool, }
pub allow_write: bool,
impl CRDT for BucketState {
fn merge(&mut self, o: &Self) {
match o {
BucketState::Deleted => *self = BucketState::Deleted,
BucketState::Present(other_ak) => {
if let BucketState::Present(ak) = self {
ak.merge(other_ak);
}
}
}
}
}
impl Bucket {
pub fn new(name: String) -> Self {
let ret = Bucket {
name,
state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())),
};
ret
}
pub fn is_deleted(&self) -> bool {
*self.state.get() == BucketState::Deleted
}
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
match self.state.get() {
BucketState::Deleted => &[],
BucketState::Present(ak) => ak.items(),
}
}
} }
impl Entry<EmptyKey, String> for Bucket { impl Entry<EmptyKey, String> for Bucket {
@ -75,36 +65,12 @@ impl Entry<EmptyKey, String> for Bucket {
} }
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
if other.timestamp > self.timestamp { self.state.merge(&other.state);
*self = other.clone();
return;
}
if self.timestamp > other.timestamp || self.deleted {
return;
}
for ak in other.authorized_keys.iter() {
match self
.authorized_keys
.binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
{
Ok(i) => {
let our_ak = &mut self.authorized_keys[i];
if ak.timestamp > our_ak.timestamp {
*our_ak = ak.clone();
}
}
Err(i) => {
self.authorized_keys.insert(i, ak.clone());
}
}
}
} }
} }
pub struct BucketTable; pub struct BucketTable;
#[async_trait] #[async_trait]
impl TableSchema for BucketTable { impl TableSchema for BucketTable {
type P = EmptyKey; type P = EmptyKey;
@ -117,6 +83,35 @@ impl TableSchema for BucketTable {
} }
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted) filter.apply(entry.is_deleted())
}
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) {
Ok(x) => x,
Err(_) => return None,
};
if old.deleted {
Some(Bucket {
name: old.name,
state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted),
})
} else {
let mut keys = crdt::LWWMap::new();
for ak in old.authorized_keys() {
keys.merge(&crdt::LWWMap::migrate_from_raw_item(
ak.key_id.clone(),
ak.timestamp,
PermissionSet {
allow_read: ak.allow_read,
allow_write: ak.allow_write,
},
));
}
Some(Bucket {
name: old.name,
state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)),
})
}
} }
} }

View file

@ -1,8 +1,8 @@
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_table::*;
use garage_table::crdt::CRDT; use garage_table::crdt::CRDT;
use garage_table::*;
use garage_util::error::Error; use garage_util::error::Error;
@ -24,7 +24,6 @@ pub struct Key {
// Authorized keys // Authorized keys
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>, pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
// CRDT interaction: deleted implies authorized_buckets is empty // CRDT interaction: deleted implies authorized_buckets is empty
} }
@ -128,7 +127,8 @@ impl TableSchema for KeyTable {
PermissionSet { PermissionSet {
allow_read: ab.allow_read, allow_read: ab.allow_read,
allow_write: ab.allow_write, allow_write: ab.allow_write,
}); },
);
new.authorized_buckets.merge(&it); new.authorized_buckets.merge(&it);
} }
Some(new) Some(new)

View file

@ -7,7 +7,9 @@ pub trait CRDT {
} }
impl<T> CRDT for T impl<T> CRDT for T
where T: Ord + Clone { where
T: Ord + Clone,
{
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
if other > self { if other > self {
*self = other.clone(); *self = other.clone();
@ -18,14 +20,14 @@ where T: Ord + Clone {
// ---- LWW Register ---- // ---- LWW Register ----
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWW<T> pub struct LWW<T> {
{
ts: u64, ts: u64,
v: T, v: T,
} }
impl<T> LWW<T> impl<T> LWW<T>
where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord where
T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT,
{ {
pub fn new(value: T) -> Self { pub fn new(value: T) -> Self {
Self { Self {
@ -34,10 +36,7 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
} }
} }
pub fn migrate_from_raw(ts: u64, value: T) -> Self { pub fn migrate_from_raw(ts: u64, value: T) -> Self {
Self { Self { ts, v: value }
ts,
v: value,
}
} }
pub fn update(&mut self, new_value: T) { pub fn update(&mut self, new_value: T) {
self.ts = std::cmp::max(self.ts + 1, now_msec()); self.ts = std::cmp::max(self.ts + 1, now_msec());
@ -46,10 +45,14 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
pub fn get(&self) -> &T { pub fn get(&self) -> &T {
&self.v &self.v
} }
pub fn get_mut(&mut self) -> &mut T {
&mut self.v
}
} }
impl<T> CRDT for LWW<T> impl<T> CRDT for LWW<T>
where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT where
T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT,
{ {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
if other.ts > self.ts { if other.ts > self.ts {
@ -61,7 +64,6 @@ where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
} }
} }
// ---- Boolean (true as absorbing state) ---- // ---- Boolean (true as absorbing state) ----
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
@ -85,23 +87,20 @@ impl CRDT for Bool {
} }
} }
// ---- LWW Map ---- // ---- LWW Map ----
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LWWMap<K, V> pub struct LWWMap<K, V> {
{
vals: Vec<(K, u64, V)>, vals: Vec<(K, u64, V)>,
} }
impl<K, V> LWWMap<K, V> impl<K, V> LWWMap<K, V>
where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, where
K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord, V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
{ {
pub fn new() -> Self { pub fn new() -> Self {
Self{ Self { vals: vec![] }
vals: vec![],
}
} }
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self { pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
Self { Self {
@ -116,30 +115,20 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
self.vals.clear(); self.vals.clear();
} }
pub fn update_mutator(&self, k: K, new_v: V) -> Self { pub fn update_mutator(&self, k: K, new_v: V) -> Self {
let new_vals = match self let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
.vals
.binary_search_by(|(k2, _, _)| k2.cmp(&k))
{
Ok(i) => { Ok(i) => {
let (_, old_ts, _) = self.vals[i]; let (_, old_ts, _) = self.vals[i];
let new_ts = std::cmp::max(old_ts + 1, now_msec()); let new_ts = std::cmp::max(old_ts + 1, now_msec());
vec![(k, new_ts, new_v)] vec![(k, new_ts, new_v)]
} }
Err(_) => { Err(_) => vec![(k, now_msec(), new_v)],
vec![(k, now_msec(), new_v)]
}
}; };
Self{ Self { vals: new_vals }
vals: new_vals,
}
} }
pub fn get(&self, k: &K) -> Option<&V> { pub fn get(&self, k: &K) -> Option<&V> {
match self match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
.vals
.binary_search_by(|(k2, _, _)| k2.cmp(&k))
{
Ok(i) => Some(&self.vals[i].2), Ok(i) => Some(&self.vals[i].2),
Err(_) => None Err(_) => None,
} }
} }
pub fn items(&self) -> &[(K, u64, V)] { pub fn items(&self) -> &[(K, u64, V)] {
@ -148,17 +137,15 @@ where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Part
} }
impl<K, V> CRDT for LWWMap<K, V> impl<K, V> CRDT for LWWMap<K, V>
where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord, where
K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT, V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
{ {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
for (k, ts2, v2) in other.vals.iter() { for (k, ts2, v2) in other.vals.iter() {
match self match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
.vals
.binary_search_by(|(k2, _, _)| k2.cmp(&k))
{
Ok(i) => { Ok(i) => {
let (_, ts1, v1) = &self.vals[i]; let (_, ts1, _v1) = &self.vals[i];
if ts2 > ts1 { if ts2 > ts1 {
self.vals[i].1 = *ts2; self.vals[i].1 = *ts2;
self.vals[i].2 = v2.clone(); self.vals[i].2 = v2.clone();

View file

@ -3,9 +3,9 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod crdt;
pub mod schema; pub mod schema;
pub mod util; pub mod util;
pub mod crdt;
pub mod table; pub mod table;
pub mod table_fullcopy; pub mod table_fullcopy;
@ -13,5 +13,5 @@ pub mod table_sharded;
pub mod table_sync; pub mod table_sync;
pub use schema::*; pub use schema::*;
pub use util::*;
pub use table::*; pub use table::*;
pub use util::*;

View file

@ -20,7 +20,6 @@ impl PartitionKey for Hash {
} }
} }
pub trait SortKey { pub trait SortKey {
fn sort_key(&self) -> &[u8]; fn sort_key(&self) -> &[u8];
} }
@ -37,7 +36,6 @@ impl SortKey for Hash {
} }
} }
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{ {
@ -47,7 +45,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
fn merge(&mut self, other: &Self); fn merge(&mut self, other: &Self);
} }
#[async_trait] #[async_trait]
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync {
true true
} }
} }