forked from Deuxfleurs/garage
Begin improve model to use better CRDTs
This commit is contained in:
parent
5dc304ac41
commit
e02e9e035e
5 changed files with 245 additions and 96 deletions
|
@ -68,7 +68,7 @@ pub async fn check_signature(
|
|||
.key_table
|
||||
.get(&EmptyKey, &authorization.key_id)
|
||||
.await?
|
||||
.filter(|k| !k.deleted)
|
||||
.filter(|k| !k.deleted.get())
|
||||
.ok_or(Error::Forbidden(format!(
|
||||
"No such key: {}",
|
||||
authorization.key_id
|
||||
|
|
|
@ -6,6 +6,7 @@ use garage_util::data::*;
|
|||
use garage_util::error::Error;
|
||||
|
||||
use garage_table::*;
|
||||
use garage_table::crdt::CRDT;
|
||||
|
||||
use garage_rpc::rpc_client::*;
|
||||
use garage_rpc::rpc_server::*;
|
||||
|
@ -114,7 +115,7 @@ impl AdminRpcHandler {
|
|||
// --- done checking, now commit ---
|
||||
for ak in bucket.authorized_keys() {
|
||||
if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? {
|
||||
if !key.deleted {
|
||||
if !key.deleted.get() {
|
||||
self.update_key_bucket(key, &bucket.name, false, false)
|
||||
.await?;
|
||||
}
|
||||
|
@ -173,7 +174,7 @@ impl AdminRpcHandler {
|
|||
.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
|
||||
.await?
|
||||
.iter()
|
||||
.map(|k| (k.key_id.to_string(), k.name.to_string()))
|
||||
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
|
||||
.collect::<Vec<_>>();
|
||||
Ok(AdminRPC::KeyList(key_ids))
|
||||
}
|
||||
|
@ -182,14 +183,13 @@ impl AdminRpcHandler {
|
|||
Ok(AdminRPC::KeyInfo(key))
|
||||
}
|
||||
KeyOperation::New(query) => {
|
||||
let key = Key::new(query.name, vec![]);
|
||||
let key = Key::new(query.name);
|
||||
self.garage.key_table.insert(&key).await?;
|
||||
Ok(AdminRPC::KeyInfo(key))
|
||||
}
|
||||
KeyOperation::Rename(query) => {
|
||||
let mut key = self.get_existing_key(&query.key_id).await?;
|
||||
key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec());
|
||||
key.name = query.new_name;
|
||||
key.name.update(query.new_name);
|
||||
self.garage.key_table.insert(&key).await?;
|
||||
Ok(AdminRPC::KeyInfo(key))
|
||||
}
|
||||
|
@ -201,16 +201,16 @@ impl AdminRpcHandler {
|
|||
)));
|
||||
}
|
||||
// --- done checking, now commit ---
|
||||
for ab in key.authorized_buckets().iter() {
|
||||
for (ab_name, _, _) in key.authorized_buckets.items().iter() {
|
||||
if let Some(bucket) =
|
||||
self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await?
|
||||
self.garage.bucket_table.get(&EmptyKey, ab_name).await?
|
||||
{
|
||||
if !bucket.deleted {
|
||||
self.update_bucket_key(bucket, &key.key_id, false, false)
|
||||
.await?;
|
||||
}
|
||||
} else {
|
||||
return Err(Error::Message(format!("Bucket not found: {}", ab.bucket)));
|
||||
return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
|
||||
}
|
||||
}
|
||||
let del_key = Key::delete(key.key_id);
|
||||
|
@ -241,7 +241,7 @@ impl AdminRpcHandler {
|
|||
.key_table
|
||||
.get(&EmptyKey, id)
|
||||
.await?
|
||||
.filter(|k| !k.deleted)
|
||||
.filter(|k| !k.deleted.get())
|
||||
.map(Ok)
|
||||
.unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
|
||||
}
|
||||
|
@ -281,22 +281,14 @@ impl AdminRpcHandler {
|
|||
allow_read: bool,
|
||||
allow_write: bool,
|
||||
) -> Result<(), Error> {
|
||||
let timestamp = match key
|
||||
.authorized_buckets()
|
||||
.iter()
|
||||
.find(|x| x.bucket == *bucket)
|
||||
{
|
||||
None => now_msec(),
|
||||
Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
|
||||
};
|
||||
key.clear_buckets();
|
||||
key.add_bucket(AllowedBucket {
|
||||
bucket: bucket.clone(),
|
||||
timestamp,
|
||||
allow_read,
|
||||
allow_write,
|
||||
})
|
||||
.unwrap();
|
||||
let old_map = key.authorized_buckets.take_and_clear();
|
||||
key.authorized_buckets.merge(
|
||||
&old_map.update_mutator(
|
||||
bucket.clone(),
|
||||
PermissionSet{
|
||||
allow_read, allow_write
|
||||
}
|
||||
));
|
||||
self.garage.key_table.insert(&key).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -2,9 +2,12 @@ use async_trait::async_trait;
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_table::*;
|
||||
use garage_util::data::*;
|
||||
use garage_table::crdt::CRDT;
|
||||
|
||||
use garage_util::error::Error;
|
||||
|
||||
use model010::key_table as prev;
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct Key {
|
||||
// Primary key
|
||||
|
@ -14,83 +17,56 @@ pub struct Key {
|
|||
pub secret_key: String,
|
||||
|
||||
// Name
|
||||
pub name: String,
|
||||
pub name_timestamp: u64,
|
||||
pub name: crdt::LWW<String>,
|
||||
|
||||
// Deletion
|
||||
pub deleted: bool,
|
||||
pub deleted: crdt::Bool,
|
||||
|
||||
// Authorized keys
|
||||
authorized_buckets: Vec<AllowedBucket>,
|
||||
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
|
||||
|
||||
// CRDT interaction: deleted implies authorized_buckets is empty
|
||||
}
|
||||
|
||||
impl Key {
|
||||
pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
|
||||
pub fn new(name: String) -> Self {
|
||||
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
|
||||
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
|
||||
let mut ret = Self {
|
||||
let ret = Self {
|
||||
key_id,
|
||||
secret_key,
|
||||
name,
|
||||
name_timestamp: now_msec(),
|
||||
deleted: false,
|
||||
authorized_buckets: vec![],
|
||||
name: crdt::LWW::new(name),
|
||||
deleted: crdt::Bool::new(false),
|
||||
authorized_buckets: crdt::LWWMap::new(),
|
||||
};
|
||||
for b in buckets {
|
||||
ret.add_bucket(b)
|
||||
.expect("Duplicate AllowedBucket in Key constructor");
|
||||
}
|
||||
ret
|
||||
}
|
||||
pub fn delete(key_id: String) -> Self {
|
||||
Self {
|
||||
key_id,
|
||||
secret_key: "".into(),
|
||||
name: "".into(),
|
||||
name_timestamp: now_msec(),
|
||||
deleted: true,
|
||||
authorized_buckets: vec![],
|
||||
name: crdt::LWW::new("".to_string()),
|
||||
deleted: crdt::Bool::new(true),
|
||||
authorized_buckets: crdt::LWWMap::new(),
|
||||
}
|
||||
}
|
||||
/// Add an authorized bucket, only if it wasn't there before
|
||||
pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
|
||||
match self
|
||||
.authorized_buckets
|
||||
.binary_search_by(|b| b.bucket.cmp(&new.bucket))
|
||||
{
|
||||
Err(i) => {
|
||||
self.authorized_buckets.insert(i, new);
|
||||
Ok(())
|
||||
}
|
||||
Ok(_) => Err(()),
|
||||
}
|
||||
}
|
||||
pub fn authorized_buckets(&self) -> &[AllowedBucket] {
|
||||
&self.authorized_buckets[..]
|
||||
}
|
||||
pub fn clear_buckets(&mut self) {
|
||||
self.authorized_buckets.clear();
|
||||
}
|
||||
pub fn allow_read(&self, bucket: &str) -> bool {
|
||||
self.authorized_buckets
|
||||
.iter()
|
||||
.find(|x| x.bucket.as_str() == bucket)
|
||||
.get(&bucket.to_string())
|
||||
.map(|x| x.allow_read)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
pub fn allow_write(&self, bucket: &str) -> bool {
|
||||
self.authorized_buckets
|
||||
.iter()
|
||||
.find(|x| x.bucket.as_str() == bucket)
|
||||
.get(&bucket.to_string())
|
||||
.map(|x| x.allow_write)
|
||||
.unwrap_or(false)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct AllowedBucket {
|
||||
pub bucket: String,
|
||||
pub timestamp: u64,
|
||||
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct PermissionSet {
|
||||
pub allow_read: bool,
|
||||
pub allow_write: bool,
|
||||
}
|
||||
|
@ -104,35 +80,15 @@ impl Entry<EmptyKey, String> for Key {
|
|||
}
|
||||
|
||||
fn merge(&mut self, other: &Self) {
|
||||
if other.name_timestamp > self.name_timestamp {
|
||||
self.name_timestamp = other.name_timestamp;
|
||||
self.name = other.name.clone();
|
||||
}
|
||||
self.name.merge(&other.name);
|
||||
self.deleted.merge(&other.deleted);
|
||||
|
||||
if other.deleted {
|
||||
self.deleted = true;
|
||||
}
|
||||
if self.deleted {
|
||||
if self.deleted.get() {
|
||||
self.authorized_buckets.clear();
|
||||
return;
|
||||
}
|
||||
|
||||
for ab in other.authorized_buckets.iter() {
|
||||
match self
|
||||
.authorized_buckets
|
||||
.binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
|
||||
{
|
||||
Ok(i) => {
|
||||
let our_ab = &mut self.authorized_buckets[i];
|
||||
if ab.timestamp > our_ab.timestamp {
|
||||
*our_ab = ab.clone();
|
||||
}
|
||||
}
|
||||
Err(i) => {
|
||||
self.authorized_buckets.insert(i, ab.clone());
|
||||
}
|
||||
}
|
||||
}
|
||||
self.authorized_buckets.merge(&other.authorized_buckets);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -150,6 +106,31 @@ impl TableSchema for KeyTable {
|
|||
}
|
||||
|
||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||
filter.apply(entry.deleted)
|
||||
filter.apply(entry.deleted.get())
|
||||
}
|
||||
|
||||
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
||||
let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) {
|
||||
Ok(x) => x,
|
||||
Err(_) => return None,
|
||||
};
|
||||
let mut new = Self::E {
|
||||
key_id: old.key_id.clone(),
|
||||
secret_key: old.secret_key.clone(),
|
||||
name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()),
|
||||
deleted: crdt::Bool::new(old.deleted),
|
||||
authorized_buckets: crdt::LWWMap::new(),
|
||||
};
|
||||
for ab in old.authorized_buckets() {
|
||||
let it = crdt::LWWMap::migrate_from_raw_item(
|
||||
ab.bucket.clone(),
|
||||
ab.timestamp,
|
||||
PermissionSet{
|
||||
allow_read: ab.allow_read,
|
||||
allow_write: ab.allow_write,
|
||||
});
|
||||
new.authorized_buckets.merge(&it);
|
||||
}
|
||||
Some(new)
|
||||
}
|
||||
}
|
||||
|
|
175
src/table/crdt.rs
Normal file
175
src/table/crdt.rs
Normal file
|
@ -0,0 +1,175 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_util::data::*;
|
||||
|
||||
pub trait CRDT {
|
||||
fn merge(&mut self, other: &Self);
|
||||
}
|
||||
|
||||
impl<T> CRDT for T
|
||||
where T: Ord + Clone {
|
||||
fn merge(&mut self, other: &Self) {
|
||||
if other > self {
|
||||
*self = other.clone();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- LWW Register ----
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct LWW<T>
|
||||
{
|
||||
ts: u64,
|
||||
v: T,
|
||||
}
|
||||
|
||||
impl<T> LWW<T>
|
||||
where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord
|
||||
{
|
||||
pub fn new(value: T) -> Self {
|
||||
Self {
|
||||
ts: now_msec(),
|
||||
v: value,
|
||||
}
|
||||
}
|
||||
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
|
||||
Self {
|
||||
ts,
|
||||
v: value,
|
||||
}
|
||||
}
|
||||
pub fn update(&mut self, new_value: T) {
|
||||
self.ts = std::cmp::max(self.ts + 1, now_msec());
|
||||
self.v = new_value;
|
||||
}
|
||||
pub fn get(&self) -> &T {
|
||||
&self.v
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> CRDT for LWW<T>
|
||||
where T: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + CRDT
|
||||
{
|
||||
fn merge(&mut self, other: &Self) {
|
||||
if other.ts > self.ts {
|
||||
self.ts = other.ts;
|
||||
self.v = other.v.clone();
|
||||
} else if other.ts == self.ts {
|
||||
self.v.merge(&other.v);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ---- Boolean (true as absorbing state) ----
|
||||
|
||||
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct Bool(bool);
|
||||
|
||||
impl Bool {
|
||||
pub fn new(b: bool) -> Self {
|
||||
Self(b)
|
||||
}
|
||||
pub fn set(&mut self) {
|
||||
self.0 = true;
|
||||
}
|
||||
pub fn get(&self) -> bool {
|
||||
self.0
|
||||
}
|
||||
}
|
||||
|
||||
impl CRDT for Bool {
|
||||
fn merge(&mut self, other: &Self) {
|
||||
self.0 = self.0 || other.0;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// ---- LWW Map ----
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||
pub struct LWWMap<K, V>
|
||||
{
|
||||
vals: Vec<(K, u64, V)>,
|
||||
}
|
||||
|
||||
impl<K, V> LWWMap<K, V>
|
||||
where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
|
||||
V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + PartialEq + Ord,
|
||||
{
|
||||
pub fn new() -> Self {
|
||||
Self{
|
||||
vals: vec![],
|
||||
}
|
||||
}
|
||||
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
|
||||
Self{
|
||||
vals: vec![(k, ts, v)],
|
||||
}
|
||||
}
|
||||
pub fn take_and_clear(&mut self) -> Self {
|
||||
let vals = std::mem::replace(&mut self.vals, vec![]);
|
||||
Self{vals}
|
||||
}
|
||||
pub fn clear(&mut self) {
|
||||
self.vals.clear();
|
||||
}
|
||||
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
|
||||
let new_vals = match self
|
||||
.vals
|
||||
.binary_search_by(|(k2, _, _)| k2.cmp(&k))
|
||||
{
|
||||
Ok(i) => {
|
||||
let (_, old_ts, _) = self.vals[i];
|
||||
let new_ts = std::cmp::max(old_ts+1, now_msec());
|
||||
vec![(k, new_ts, new_v)]
|
||||
}
|
||||
Err(_) => {
|
||||
vec![(k, now_msec(), new_v)]
|
||||
}
|
||||
};
|
||||
Self{
|
||||
vals: new_vals,
|
||||
}
|
||||
}
|
||||
pub fn get(&self, k: &K) -> Option<&V> {
|
||||
match self
|
||||
.vals
|
||||
.binary_search_by(|(k2, _, _)| k2.cmp(&k))
|
||||
{
|
||||
Ok(i) => Some(&self.vals[i].2),
|
||||
Err(_) => None
|
||||
}
|
||||
}
|
||||
pub fn items(&self) -> &[(K, u64, V)] {
|
||||
&self.vals[..]
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> CRDT for LWWMap<K, V>
|
||||
where K: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + Ord,
|
||||
V: Serialize + for<'de> Deserialize<'de> + Clone + core::fmt::Debug + CRDT,
|
||||
{
|
||||
fn merge(&mut self, other: &Self) {
|
||||
for (k, ts2, v2) in other.vals.iter() {
|
||||
match self
|
||||
.vals
|
||||
.binary_search_by(|(k2, _, _)| k2.cmp(&k))
|
||||
{
|
||||
Ok(i) => {
|
||||
let (_, ts1, v1) = &self.vals[i];
|
||||
if ts2 > ts1 {
|
||||
self.vals[i].1 = *ts2;
|
||||
self.vals[i].2 = v2.clone();
|
||||
} else if ts1 == ts2 {
|
||||
self.vals[i].2.merge(&v2);
|
||||
}
|
||||
}
|
||||
Err(i) => {
|
||||
self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -5,6 +5,7 @@ extern crate log;
|
|||
|
||||
pub mod schema;
|
||||
pub mod util;
|
||||
pub mod crdt;
|
||||
|
||||
pub mod table;
|
||||
pub mod table_fullcopy;
|
||||
|
|
Loading…
Reference in a new issue