Merge pull request 'Use cleaner CRDT data types for objects to avoid accidents like #16' (#18) from feature/better-crdt into master
Reviewed-on: Deuxfleurs/garage#18
This commit is contained in:
commit
b3814b15cc
12 changed files with 400 additions and 239 deletions
39
Cargo.lock
generated
39
Cargo.lock
generated
|
@ -15,6 +15,16 @@ version = "0.4.7"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
|
checksum = "4d25d88fd6b8041580a654f9d0c581a047baee2b3efee13275f2fc392fc75034"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "arrayvec"
|
||||||
|
version = "0.3.25"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "06f59fe10306bb78facd90d28c2038ad23ffaaefa85bac43c8a434cde383334f"
|
||||||
|
dependencies = [
|
||||||
|
"nodrop",
|
||||||
|
"odds",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "async-trait"
|
name = "async-trait"
|
||||||
version = "0.1.36"
|
version = "0.1.36"
|
||||||
|
@ -588,6 +598,7 @@ dependencies = [
|
||||||
"garage_rpc 0.1.0",
|
"garage_rpc 0.1.0",
|
||||||
"garage_util 0.1.0",
|
"garage_util 0.1.0",
|
||||||
"hex",
|
"hex",
|
||||||
|
"hexdump",
|
||||||
"log",
|
"log",
|
||||||
"rand",
|
"rand",
|
||||||
"rmp-serde",
|
"rmp-serde",
|
||||||
|
@ -743,6 +754,16 @@ version = "0.3.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
|
checksum = "805026a5d0141ffc30abb3be3173848ad46a1b1664fe632428479619a3644d77"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hexdump"
|
||||||
|
version = "0.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "850f3f2c33d20c0f96c4485e087dd580ff041d720988ebf4c84a42acf739262b"
|
||||||
|
dependencies = [
|
||||||
|
"arrayvec",
|
||||||
|
"itertools",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "hmac"
|
name = "hmac"
|
||||||
version = "0.7.1"
|
version = "0.7.1"
|
||||||
|
@ -870,6 +891,12 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "itertools"
|
||||||
|
version = "0.4.19"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "c4a9b56eb56058f43dc66e58f40a214b2ccbc9f3df51861b63d51dec7b65bc3f"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "itoa"
|
name = "itoa"
|
||||||
version = "0.4.6"
|
version = "0.4.6"
|
||||||
|
@ -1027,6 +1054,12 @@ dependencies = [
|
||||||
"winapi 0.3.9",
|
"winapi 0.3.9",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nodrop"
|
||||||
|
version = "0.1.14"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "num-integer"
|
name = "num-integer"
|
||||||
version = "0.1.43"
|
version = "0.1.43"
|
||||||
|
@ -1056,6 +1089,12 @@ dependencies = [
|
||||||
"libc",
|
"libc",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "odds"
|
||||||
|
version = "0.2.26"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "4eae0151b9dacf24fcc170d9995e511669a082856a91f958a2fe380bfab3fb22"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "once_cell"
|
name = "once_cell"
|
||||||
version = "1.4.0"
|
version = "1.4.0"
|
||||||
|
|
|
@ -322,7 +322,7 @@ pub async fn handle_put_part(
|
||||||
let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?;
|
let (object, first_block) = futures::try_join!(get_object_fut, get_first_block_fut)?;
|
||||||
|
|
||||||
// Check object is valid and multipart block can be accepted
|
// Check object is valid and multipart block can be accepted
|
||||||
let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
|
let first_block = first_block.ok_or(Error::BadRequest(format!("Empty body")))?;
|
||||||
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
|
let object = object.ok_or(Error::BadRequest(format!("Object not found")))?;
|
||||||
|
|
||||||
if !object
|
if !object
|
||||||
|
|
|
@ -68,7 +68,7 @@ pub async fn check_signature(
|
||||||
.key_table
|
.key_table
|
||||||
.get(&EmptyKey, &authorization.key_id)
|
.get(&EmptyKey, &authorization.key_id)
|
||||||
.await?
|
.await?
|
||||||
.filter(|k| !k.deleted)
|
.filter(|k| !k.deleted.get())
|
||||||
.ok_or(Error::Forbidden(format!(
|
.ok_or(Error::Forbidden(format!(
|
||||||
"No such key: {}",
|
"No such key: {}",
|
||||||
authorization.key_id
|
authorization.key_id
|
||||||
|
|
|
@ -2,9 +2,9 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::data::*;
|
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
use garage_table::crdt::CRDT;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use garage_rpc::rpc_client::*;
|
use garage_rpc::rpc_client::*;
|
||||||
|
@ -79,25 +79,26 @@ impl AdminRpcHandler {
|
||||||
Ok(AdminRPC::BucketInfo(bucket))
|
Ok(AdminRPC::BucketInfo(bucket))
|
||||||
}
|
}
|
||||||
BucketOperation::Create(query) => {
|
BucketOperation::Create(query) => {
|
||||||
let bucket = self.garage.bucket_table.get(&EmptyKey, &query.name).await?;
|
let bucket = match self.garage.bucket_table.get(&EmptyKey, &query.name).await? {
|
||||||
if bucket.as_ref().filter(|b| !b.deleted).is_some() {
|
Some(mut bucket) => {
|
||||||
return Err(Error::BadRPC(format!(
|
if !bucket.is_deleted() {
|
||||||
"Bucket {} already exists",
|
return Err(Error::BadRPC(format!(
|
||||||
query.name
|
"Bucket {} already exists",
|
||||||
)));
|
query.name
|
||||||
}
|
)));
|
||||||
let new_time = match bucket {
|
}
|
||||||
Some(b) => std::cmp::max(b.timestamp + 1, now_msec()),
|
bucket
|
||||||
None => now_msec(),
|
.state
|
||||||
|
.update(BucketState::Present(crdt::LWWMap::new()));
|
||||||
|
bucket
|
||||||
|
}
|
||||||
|
None => Bucket::new(query.name.clone()),
|
||||||
};
|
};
|
||||||
self.garage
|
self.garage.bucket_table.insert(&bucket).await?;
|
||||||
.bucket_table
|
|
||||||
.insert(&Bucket::new(query.name.clone(), new_time, false, vec![]))
|
|
||||||
.await?;
|
|
||||||
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
|
Ok(AdminRPC::Ok(format!("Bucket {} was created.", query.name)))
|
||||||
}
|
}
|
||||||
BucketOperation::Delete(query) => {
|
BucketOperation::Delete(query) => {
|
||||||
let bucket = self.get_existing_bucket(&query.name).await?;
|
let mut bucket = self.get_existing_bucket(&query.name).await?;
|
||||||
let objects = self
|
let objects = self
|
||||||
.garage
|
.garage
|
||||||
.object_table
|
.object_table
|
||||||
|
@ -112,25 +113,18 @@ impl AdminRpcHandler {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
// --- done checking, now commit ---
|
// --- done checking, now commit ---
|
||||||
for ak in bucket.authorized_keys() {
|
for (key_id, _, _) in bucket.authorized_keys() {
|
||||||
if let Some(key) = self.garage.key_table.get(&EmptyKey, &ak.key_id).await? {
|
if let Some(key) = self.garage.key_table.get(&EmptyKey, key_id).await? {
|
||||||
if !key.deleted {
|
if !key.deleted.get() {
|
||||||
self.update_key_bucket(key, &bucket.name, false, false)
|
self.update_key_bucket(key, &bucket.name, false, false)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Message(format!("Key not found: {}", ak.key_id)));
|
return Err(Error::Message(format!("Key not found: {}", key_id)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
self.garage
|
bucket.state.update(BucketState::Deleted);
|
||||||
.bucket_table
|
self.garage.bucket_table.insert(&bucket).await?;
|
||||||
.insert(&Bucket::new(
|
|
||||||
query.name.clone(),
|
|
||||||
std::cmp::max(bucket.timestamp + 1, now_msec()),
|
|
||||||
true,
|
|
||||||
vec![],
|
|
||||||
))
|
|
||||||
.await?;
|
|
||||||
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
|
Ok(AdminRPC::Ok(format!("Bucket {} was deleted.", query.name)))
|
||||||
}
|
}
|
||||||
BucketOperation::Allow(query) => {
|
BucketOperation::Allow(query) => {
|
||||||
|
@ -173,7 +167,7 @@ impl AdminRpcHandler {
|
||||||
.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
|
.get_range(&EmptyKey, None, Some(DeletedFilter::NotDeleted), 10000)
|
||||||
.await?
|
.await?
|
||||||
.iter()
|
.iter()
|
||||||
.map(|k| (k.key_id.to_string(), k.name.to_string()))
|
.map(|k| (k.key_id.to_string(), k.name.get().clone()))
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
Ok(AdminRPC::KeyList(key_ids))
|
Ok(AdminRPC::KeyList(key_ids))
|
||||||
}
|
}
|
||||||
|
@ -182,14 +176,13 @@ impl AdminRpcHandler {
|
||||||
Ok(AdminRPC::KeyInfo(key))
|
Ok(AdminRPC::KeyInfo(key))
|
||||||
}
|
}
|
||||||
KeyOperation::New(query) => {
|
KeyOperation::New(query) => {
|
||||||
let key = Key::new(query.name, vec![]);
|
let key = Key::new(query.name);
|
||||||
self.garage.key_table.insert(&key).await?;
|
self.garage.key_table.insert(&key).await?;
|
||||||
Ok(AdminRPC::KeyInfo(key))
|
Ok(AdminRPC::KeyInfo(key))
|
||||||
}
|
}
|
||||||
KeyOperation::Rename(query) => {
|
KeyOperation::Rename(query) => {
|
||||||
let mut key = self.get_existing_key(&query.key_id).await?;
|
let mut key = self.get_existing_key(&query.key_id).await?;
|
||||||
key.name_timestamp = std::cmp::max(key.name_timestamp + 1, now_msec());
|
key.name.update(query.new_name);
|
||||||
key.name = query.new_name;
|
|
||||||
self.garage.key_table.insert(&key).await?;
|
self.garage.key_table.insert(&key).await?;
|
||||||
Ok(AdminRPC::KeyInfo(key))
|
Ok(AdminRPC::KeyInfo(key))
|
||||||
}
|
}
|
||||||
|
@ -201,16 +194,14 @@ impl AdminRpcHandler {
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
// --- done checking, now commit ---
|
// --- done checking, now commit ---
|
||||||
for ab in key.authorized_buckets().iter() {
|
for (ab_name, _, _) in key.authorized_buckets.items().iter() {
|
||||||
if let Some(bucket) =
|
if let Some(bucket) = self.garage.bucket_table.get(&EmptyKey, ab_name).await? {
|
||||||
self.garage.bucket_table.get(&EmptyKey, &ab.bucket).await?
|
if !bucket.is_deleted() {
|
||||||
{
|
|
||||||
if !bucket.deleted {
|
|
||||||
self.update_bucket_key(bucket, &key.key_id, false, false)
|
self.update_bucket_key(bucket, &key.key_id, false, false)
|
||||||
.await?;
|
.await?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return Err(Error::Message(format!("Bucket not found: {}", ab.bucket)));
|
return Err(Error::Message(format!("Bucket not found: {}", ab_name)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
let del_key = Key::delete(key.key_id);
|
let del_key = Key::delete(key.key_id);
|
||||||
|
@ -228,7 +219,7 @@ impl AdminRpcHandler {
|
||||||
.bucket_table
|
.bucket_table
|
||||||
.get(&EmptyKey, bucket)
|
.get(&EmptyKey, bucket)
|
||||||
.await?
|
.await?
|
||||||
.filter(|b| !b.deleted)
|
.filter(|b| !b.is_deleted())
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
.unwrap_or(Err(Error::BadRPC(format!(
|
.unwrap_or(Err(Error::BadRPC(format!(
|
||||||
"Bucket {} does not exist",
|
"Bucket {} does not exist",
|
||||||
|
@ -241,7 +232,7 @@ impl AdminRpcHandler {
|
||||||
.key_table
|
.key_table
|
||||||
.get(&EmptyKey, id)
|
.get(&EmptyKey, id)
|
||||||
.await?
|
.await?
|
||||||
.filter(|k| !k.deleted)
|
.filter(|k| !k.deleted.get())
|
||||||
.map(Ok)
|
.map(Ok)
|
||||||
.unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
|
.unwrap_or(Err(Error::BadRPC(format!("Key {} does not exist", id))))
|
||||||
}
|
}
|
||||||
|
@ -253,23 +244,20 @@ impl AdminRpcHandler {
|
||||||
allow_read: bool,
|
allow_read: bool,
|
||||||
allow_write: bool,
|
allow_write: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let timestamp = match bucket
|
if let BucketState::Present(ak) = bucket.state.get_mut() {
|
||||||
.authorized_keys()
|
let old_ak = ak.take_and_clear();
|
||||||
.iter()
|
ak.merge(&old_ak.update_mutator(
|
||||||
.find(|x| x.key_id == *key_id)
|
key_id.to_string(),
|
||||||
{
|
PermissionSet {
|
||||||
None => now_msec(),
|
allow_read,
|
||||||
Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
|
allow_write,
|
||||||
};
|
},
|
||||||
bucket.clear_keys();
|
));
|
||||||
bucket
|
} else {
|
||||||
.add_key(AllowedKey {
|
return Err(Error::Message(format!(
|
||||||
key_id: key_id.clone(),
|
"Bucket is deleted in update_bucket_key"
|
||||||
timestamp,
|
)));
|
||||||
allow_read,
|
}
|
||||||
allow_write,
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
self.garage.bucket_table.insert(&bucket).await?;
|
self.garage.bucket_table.insert(&bucket).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -281,22 +269,14 @@ impl AdminRpcHandler {
|
||||||
allow_read: bool,
|
allow_read: bool,
|
||||||
allow_write: bool,
|
allow_write: bool,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let timestamp = match key
|
let old_map = key.authorized_buckets.take_and_clear();
|
||||||
.authorized_buckets()
|
key.authorized_buckets.merge(&old_map.update_mutator(
|
||||||
.iter()
|
bucket.clone(),
|
||||||
.find(|x| x.bucket == *bucket)
|
PermissionSet {
|
||||||
{
|
allow_read,
|
||||||
None => now_msec(),
|
allow_write,
|
||||||
Some(ab) => std::cmp::max(ab.timestamp + 1, now_msec()),
|
},
|
||||||
};
|
));
|
||||||
key.clear_buckets();
|
|
||||||
key.add_bucket(AllowedBucket {
|
|
||||||
bucket: bucket.clone(),
|
|
||||||
timestamp,
|
|
||||||
allow_read,
|
|
||||||
allow_write,
|
|
||||||
})
|
|
||||||
.unwrap();
|
|
||||||
self.garage.key_table.insert(&key).await?;
|
self.garage.key_table.insert(&key).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,7 @@ use garage_rpc::rpc_client::*;
|
||||||
use garage_rpc::rpc_server::*;
|
use garage_rpc::rpc_server::*;
|
||||||
|
|
||||||
use garage_table::table_sharded::TableShardedReplication;
|
use garage_table::table_sharded::TableShardedReplication;
|
||||||
use garage_table::{TableReplication, DeletedFilter};
|
use garage_table::{DeletedFilter, TableReplication};
|
||||||
|
|
||||||
use crate::block_ref_table::*;
|
use crate::block_ref_table::*;
|
||||||
|
|
||||||
|
|
|
@ -1,69 +1,58 @@
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_table::crdt::CRDT;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
use crate::key_table::PermissionSet;
|
||||||
|
|
||||||
|
use model010::bucket_table as prev;
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Bucket {
|
pub struct Bucket {
|
||||||
// Primary key
|
// Primary key
|
||||||
pub name: String,
|
pub name: String,
|
||||||
|
|
||||||
// Timestamp and deletion
|
pub state: crdt::LWW<BucketState>,
|
||||||
// Upon version increment, all info is replaced
|
|
||||||
pub timestamp: u64,
|
|
||||||
pub deleted: bool,
|
|
||||||
|
|
||||||
// Authorized keys
|
|
||||||
authorized_keys: Vec<AllowedKey>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Bucket {
|
|
||||||
pub fn new(
|
|
||||||
name: String,
|
|
||||||
timestamp: u64,
|
|
||||||
deleted: bool,
|
|
||||||
authorized_keys: Vec<AllowedKey>,
|
|
||||||
) -> Self {
|
|
||||||
let mut ret = Bucket {
|
|
||||||
name,
|
|
||||||
timestamp,
|
|
||||||
deleted,
|
|
||||||
authorized_keys: vec![],
|
|
||||||
};
|
|
||||||
for key in authorized_keys {
|
|
||||||
ret.add_key(key)
|
|
||||||
.expect("Duplicate AllowedKey in Bucket constructor");
|
|
||||||
}
|
|
||||||
ret
|
|
||||||
}
|
|
||||||
/// Add a key only if it is not already present
|
|
||||||
pub fn add_key(&mut self, key: AllowedKey) -> Result<(), ()> {
|
|
||||||
match self
|
|
||||||
.authorized_keys
|
|
||||||
.binary_search_by(|k| k.key_id.cmp(&key.key_id))
|
|
||||||
{
|
|
||||||
Err(i) => {
|
|
||||||
self.authorized_keys.insert(i, key);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Ok(_) => Err(()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn authorized_keys(&self) -> &[AllowedKey] {
|
|
||||||
&self.authorized_keys[..]
|
|
||||||
}
|
|
||||||
pub fn clear_keys(&mut self) {
|
|
||||||
self.authorized_keys.clear();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AllowedKey {
|
pub enum BucketState {
|
||||||
pub key_id: String,
|
Deleted,
|
||||||
pub timestamp: u64,
|
Present(crdt::LWWMap<String, PermissionSet>),
|
||||||
pub allow_read: bool,
|
}
|
||||||
pub allow_write: bool,
|
|
||||||
|
impl CRDT for BucketState {
|
||||||
|
fn merge(&mut self, o: &Self) {
|
||||||
|
match o {
|
||||||
|
BucketState::Deleted => *self = BucketState::Deleted,
|
||||||
|
BucketState::Present(other_ak) => {
|
||||||
|
if let BucketState::Present(ak) = self {
|
||||||
|
ak.merge(other_ak);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Bucket {
|
||||||
|
pub fn new(name: String) -> Self {
|
||||||
|
Bucket {
|
||||||
|
name,
|
||||||
|
state: crdt::LWW::new(BucketState::Present(crdt::LWWMap::new())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn is_deleted(&self) -> bool {
|
||||||
|
*self.state.get() == BucketState::Deleted
|
||||||
|
}
|
||||||
|
pub fn authorized_keys(&self) -> &[(String, u64, PermissionSet)] {
|
||||||
|
match self.state.get() {
|
||||||
|
BucketState::Deleted => &[],
|
||||||
|
BucketState::Present(ak) => ak.items(),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Entry<EmptyKey, String> for Bucket {
|
impl Entry<EmptyKey, String> for Bucket {
|
||||||
|
@ -75,36 +64,12 @@ impl Entry<EmptyKey, String> for Bucket {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
if other.timestamp > self.timestamp {
|
self.state.merge(&other.state);
|
||||||
*self = other.clone();
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if self.timestamp > other.timestamp || self.deleted {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
for ak in other.authorized_keys.iter() {
|
|
||||||
match self
|
|
||||||
.authorized_keys
|
|
||||||
.binary_search_by(|our_ak| our_ak.key_id.cmp(&ak.key_id))
|
|
||||||
{
|
|
||||||
Ok(i) => {
|
|
||||||
let our_ak = &mut self.authorized_keys[i];
|
|
||||||
if ak.timestamp > our_ak.timestamp {
|
|
||||||
*our_ak = ak.clone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(i) => {
|
|
||||||
self.authorized_keys.insert(i, ak.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct BucketTable;
|
pub struct BucketTable;
|
||||||
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl TableSchema for BucketTable {
|
impl TableSchema for BucketTable {
|
||||||
type P = EmptyKey;
|
type P = EmptyKey;
|
||||||
|
@ -117,6 +82,35 @@ impl TableSchema for BucketTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||||
filter.apply(entry.deleted)
|
filter.apply(entry.is_deleted())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
||||||
|
let old = match rmp_serde::decode::from_read_ref::<_, prev::Bucket>(bytes) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(_) => return None,
|
||||||
|
};
|
||||||
|
if old.deleted {
|
||||||
|
Some(Bucket {
|
||||||
|
name: old.name,
|
||||||
|
state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Deleted),
|
||||||
|
})
|
||||||
|
} else {
|
||||||
|
let mut keys = crdt::LWWMap::new();
|
||||||
|
for ak in old.authorized_keys() {
|
||||||
|
keys.merge(&crdt::LWWMap::migrate_from_raw_item(
|
||||||
|
ak.key_id.clone(),
|
||||||
|
ak.timestamp,
|
||||||
|
PermissionSet {
|
||||||
|
allow_read: ak.allow_read,
|
||||||
|
allow_write: ak.allow_write,
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Some(Bucket {
|
||||||
|
name: old.name,
|
||||||
|
state: crdt::LWW::migrate_from_raw(old.timestamp, BucketState::Present(keys)),
|
||||||
|
})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,10 +1,13 @@
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_table::crdt::CRDT;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::*;
|
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
|
use model010::key_table as prev;
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct Key {
|
pub struct Key {
|
||||||
// Primary key
|
// Primary key
|
||||||
|
@ -14,83 +17,54 @@ pub struct Key {
|
||||||
pub secret_key: String,
|
pub secret_key: String,
|
||||||
|
|
||||||
// Name
|
// Name
|
||||||
pub name: String,
|
pub name: crdt::LWW<String>,
|
||||||
pub name_timestamp: u64,
|
|
||||||
|
|
||||||
// Deletion
|
// Deletion
|
||||||
pub deleted: bool,
|
pub deleted: crdt::Bool,
|
||||||
|
|
||||||
// Authorized keys
|
// Authorized keys
|
||||||
authorized_buckets: Vec<AllowedBucket>,
|
pub authorized_buckets: crdt::LWWMap<String, PermissionSet>,
|
||||||
|
// CRDT interaction: deleted implies authorized_buckets is empty
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Key {
|
impl Key {
|
||||||
pub fn new(name: String, buckets: Vec<AllowedBucket>) -> Self {
|
pub fn new(name: String) -> Self {
|
||||||
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
|
let key_id = format!("GK{}", hex::encode(&rand::random::<[u8; 12]>()[..]));
|
||||||
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
|
let secret_key = hex::encode(&rand::random::<[u8; 32]>()[..]);
|
||||||
let mut ret = Self {
|
Self {
|
||||||
key_id,
|
key_id,
|
||||||
secret_key,
|
secret_key,
|
||||||
name,
|
name: crdt::LWW::new(name),
|
||||||
name_timestamp: now_msec(),
|
deleted: crdt::Bool::new(false),
|
||||||
deleted: false,
|
authorized_buckets: crdt::LWWMap::new(),
|
||||||
authorized_buckets: vec![],
|
|
||||||
};
|
|
||||||
for b in buckets {
|
|
||||||
ret.add_bucket(b)
|
|
||||||
.expect("Duplicate AllowedBucket in Key constructor");
|
|
||||||
}
|
}
|
||||||
ret
|
|
||||||
}
|
}
|
||||||
pub fn delete(key_id: String) -> Self {
|
pub fn delete(key_id: String) -> Self {
|
||||||
Self {
|
Self {
|
||||||
key_id,
|
key_id,
|
||||||
secret_key: "".into(),
|
secret_key: "".into(),
|
||||||
name: "".into(),
|
name: crdt::LWW::new("".to_string()),
|
||||||
name_timestamp: now_msec(),
|
deleted: crdt::Bool::new(true),
|
||||||
deleted: true,
|
authorized_buckets: crdt::LWWMap::new(),
|
||||||
authorized_buckets: vec![],
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
/// Add an authorized bucket, only if it wasn't there before
|
/// Add an authorized bucket, only if it wasn't there before
|
||||||
pub fn add_bucket(&mut self, new: AllowedBucket) -> Result<(), ()> {
|
|
||||||
match self
|
|
||||||
.authorized_buckets
|
|
||||||
.binary_search_by(|b| b.bucket.cmp(&new.bucket))
|
|
||||||
{
|
|
||||||
Err(i) => {
|
|
||||||
self.authorized_buckets.insert(i, new);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Ok(_) => Err(()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pub fn authorized_buckets(&self) -> &[AllowedBucket] {
|
|
||||||
&self.authorized_buckets[..]
|
|
||||||
}
|
|
||||||
pub fn clear_buckets(&mut self) {
|
|
||||||
self.authorized_buckets.clear();
|
|
||||||
}
|
|
||||||
pub fn allow_read(&self, bucket: &str) -> bool {
|
pub fn allow_read(&self, bucket: &str) -> bool {
|
||||||
self.authorized_buckets
|
self.authorized_buckets
|
||||||
.iter()
|
.get(&bucket.to_string())
|
||||||
.find(|x| x.bucket.as_str() == bucket)
|
|
||||||
.map(|x| x.allow_read)
|
.map(|x| x.allow_read)
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
pub fn allow_write(&self, bucket: &str) -> bool {
|
pub fn allow_write(&self, bucket: &str) -> bool {
|
||||||
self.authorized_buckets
|
self.authorized_buckets
|
||||||
.iter()
|
.get(&bucket.to_string())
|
||||||
.find(|x| x.bucket.as_str() == bucket)
|
|
||||||
.map(|x| x.allow_write)
|
.map(|x| x.allow_write)
|
||||||
.unwrap_or(false)
|
.unwrap_or(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct AllowedBucket {
|
pub struct PermissionSet {
|
||||||
pub bucket: String,
|
|
||||||
pub timestamp: u64,
|
|
||||||
pub allow_read: bool,
|
pub allow_read: bool,
|
||||||
pub allow_write: bool,
|
pub allow_write: bool,
|
||||||
}
|
}
|
||||||
|
@ -104,35 +78,15 @@ impl Entry<EmptyKey, String> for Key {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge(&mut self, other: &Self) {
|
fn merge(&mut self, other: &Self) {
|
||||||
if other.name_timestamp > self.name_timestamp {
|
self.name.merge(&other.name);
|
||||||
self.name_timestamp = other.name_timestamp;
|
self.deleted.merge(&other.deleted);
|
||||||
self.name = other.name.clone();
|
|
||||||
}
|
|
||||||
|
|
||||||
if other.deleted {
|
if self.deleted.get() {
|
||||||
self.deleted = true;
|
|
||||||
}
|
|
||||||
if self.deleted {
|
|
||||||
self.authorized_buckets.clear();
|
self.authorized_buckets.clear();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
for ab in other.authorized_buckets.iter() {
|
self.authorized_buckets.merge(&other.authorized_buckets);
|
||||||
match self
|
|
||||||
.authorized_buckets
|
|
||||||
.binary_search_by(|our_ab| our_ab.bucket.cmp(&ab.bucket))
|
|
||||||
{
|
|
||||||
Ok(i) => {
|
|
||||||
let our_ab = &mut self.authorized_buckets[i];
|
|
||||||
if ab.timestamp > our_ab.timestamp {
|
|
||||||
*our_ab = ab.clone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(i) => {
|
|
||||||
self.authorized_buckets.insert(i, ab.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -150,6 +104,32 @@ impl TableSchema for KeyTable {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
|
||||||
filter.apply(entry.deleted)
|
filter.apply(entry.deleted.get())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
|
||||||
|
let old = match rmp_serde::decode::from_read_ref::<_, prev::Key>(bytes) {
|
||||||
|
Ok(x) => x,
|
||||||
|
Err(_) => return None,
|
||||||
|
};
|
||||||
|
let mut new = Self::E {
|
||||||
|
key_id: old.key_id.clone(),
|
||||||
|
secret_key: old.secret_key.clone(),
|
||||||
|
name: crdt::LWW::migrate_from_raw(old.name_timestamp, old.name.clone()),
|
||||||
|
deleted: crdt::Bool::new(old.deleted),
|
||||||
|
authorized_buckets: crdt::LWWMap::new(),
|
||||||
|
};
|
||||||
|
for ab in old.authorized_buckets() {
|
||||||
|
let it = crdt::LWWMap::migrate_from_raw_item(
|
||||||
|
ab.bucket.clone(),
|
||||||
|
ab.timestamp,
|
||||||
|
PermissionSet {
|
||||||
|
allow_read: ab.allow_read,
|
||||||
|
allow_write: ab.allow_write,
|
||||||
|
},
|
||||||
|
);
|
||||||
|
new.authorized_buckets.merge(&it);
|
||||||
|
}
|
||||||
|
Some(new)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -21,6 +21,7 @@ rand = "0.7"
|
||||||
hex = "0.3"
|
hex = "0.3"
|
||||||
arc-swap = "0.4"
|
arc-swap = "0.4"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
|
hexdump = "0.1"
|
||||||
|
|
||||||
sled = "0.31"
|
sled = "0.31"
|
||||||
|
|
||||||
|
|
162
src/table/crdt.rs
Normal file
162
src/table/crdt.rs
Normal file
|
@ -0,0 +1,162 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
pub trait CRDT {
|
||||||
|
fn merge(&mut self, other: &Self);
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> CRDT for T
|
||||||
|
where
|
||||||
|
T: Ord + Clone,
|
||||||
|
{
|
||||||
|
fn merge(&mut self, other: &Self) {
|
||||||
|
if other > self {
|
||||||
|
*self = other.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- LWW Register ----
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct LWW<T> {
|
||||||
|
ts: u64,
|
||||||
|
v: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> LWW<T>
|
||||||
|
where
|
||||||
|
T: CRDT,
|
||||||
|
{
|
||||||
|
pub fn new(value: T) -> Self {
|
||||||
|
Self {
|
||||||
|
ts: now_msec(),
|
||||||
|
v: value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn migrate_from_raw(ts: u64, value: T) -> Self {
|
||||||
|
Self { ts, v: value }
|
||||||
|
}
|
||||||
|
pub fn update(&mut self, new_value: T) {
|
||||||
|
self.ts = std::cmp::max(self.ts + 1, now_msec());
|
||||||
|
self.v = new_value;
|
||||||
|
}
|
||||||
|
pub fn get(&self) -> &T {
|
||||||
|
&self.v
|
||||||
|
}
|
||||||
|
pub fn get_mut(&mut self) -> &mut T {
|
||||||
|
&mut self.v
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> CRDT for LWW<T>
|
||||||
|
where
|
||||||
|
T: Clone + CRDT,
|
||||||
|
{
|
||||||
|
fn merge(&mut self, other: &Self) {
|
||||||
|
if other.ts > self.ts {
|
||||||
|
self.ts = other.ts;
|
||||||
|
self.v = other.v.clone();
|
||||||
|
} else if other.ts == self.ts {
|
||||||
|
self.v.merge(&other.v);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- Boolean (true as absorbing state) ----
|
||||||
|
|
||||||
|
#[derive(Clone, Copy, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct Bool(bool);
|
||||||
|
|
||||||
|
impl Bool {
|
||||||
|
pub fn new(b: bool) -> Self {
|
||||||
|
Self(b)
|
||||||
|
}
|
||||||
|
pub fn set(&mut self) {
|
||||||
|
self.0 = true;
|
||||||
|
}
|
||||||
|
pub fn get(&self) -> bool {
|
||||||
|
self.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl CRDT for Bool {
|
||||||
|
fn merge(&mut self, other: &Self) {
|
||||||
|
self.0 = self.0 || other.0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- LWW Map ----
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
||||||
|
pub struct LWWMap<K, V> {
|
||||||
|
vals: Vec<(K, u64, V)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<K, V> LWWMap<K, V>
|
||||||
|
where
|
||||||
|
K: Ord,
|
||||||
|
V: CRDT,
|
||||||
|
{
|
||||||
|
pub fn new() -> Self {
|
||||||
|
Self { vals: vec![] }
|
||||||
|
}
|
||||||
|
pub fn migrate_from_raw_item(k: K, ts: u64, v: V) -> Self {
|
||||||
|
Self {
|
||||||
|
vals: vec![(k, ts, v)],
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn take_and_clear(&mut self) -> Self {
|
||||||
|
let vals = std::mem::replace(&mut self.vals, vec![]);
|
||||||
|
Self { vals }
|
||||||
|
}
|
||||||
|
pub fn clear(&mut self) {
|
||||||
|
self.vals.clear();
|
||||||
|
}
|
||||||
|
pub fn update_mutator(&self, k: K, new_v: V) -> Self {
|
||||||
|
let new_vals = match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
||||||
|
Ok(i) => {
|
||||||
|
let (_, old_ts, _) = self.vals[i];
|
||||||
|
let new_ts = std::cmp::max(old_ts + 1, now_msec());
|
||||||
|
vec![(k, new_ts, new_v)]
|
||||||
|
}
|
||||||
|
Err(_) => vec![(k, now_msec(), new_v)],
|
||||||
|
};
|
||||||
|
Self { vals: new_vals }
|
||||||
|
}
|
||||||
|
pub fn get(&self, k: &K) -> Option<&V> {
|
||||||
|
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
||||||
|
Ok(i) => Some(&self.vals[i].2),
|
||||||
|
Err(_) => None,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pub fn items(&self) -> &[(K, u64, V)] {
|
||||||
|
&self.vals[..]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<K, V> CRDT for LWWMap<K, V>
|
||||||
|
where
|
||||||
|
K: Clone + Ord,
|
||||||
|
V: Clone + CRDT,
|
||||||
|
{
|
||||||
|
fn merge(&mut self, other: &Self) {
|
||||||
|
for (k, ts2, v2) in other.vals.iter() {
|
||||||
|
match self.vals.binary_search_by(|(k2, _, _)| k2.cmp(&k)) {
|
||||||
|
Ok(i) => {
|
||||||
|
let (_, ts1, _v1) = &self.vals[i];
|
||||||
|
if ts2 > ts1 {
|
||||||
|
self.vals[i].1 = *ts2;
|
||||||
|
self.vals[i].2 = v2.clone();
|
||||||
|
} else if ts1 == ts2 {
|
||||||
|
self.vals[i].2.merge(&v2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(i) => {
|
||||||
|
self.vals.insert(i, (k.clone(), *ts2, v2.clone()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate log;
|
extern crate log;
|
||||||
|
|
||||||
|
pub mod crdt;
|
||||||
pub mod schema;
|
pub mod schema;
|
||||||
pub mod util;
|
pub mod util;
|
||||||
|
|
||||||
|
@ -12,5 +13,5 @@ pub mod table_sharded;
|
||||||
pub mod table_sync;
|
pub mod table_sync;
|
||||||
|
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
pub use util::*;
|
|
||||||
pub use table::*;
|
pub use table::*;
|
||||||
|
pub use util::*;
|
||||||
|
|
|
@ -20,7 +20,6 @@ impl PartitionKey for Hash {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub trait SortKey {
|
pub trait SortKey {
|
||||||
fn sort_key(&self) -> &[u8];
|
fn sort_key(&self) -> &[u8];
|
||||||
}
|
}
|
||||||
|
@ -37,7 +36,6 @@ impl SortKey for Hash {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub trait Entry<P: PartitionKey, S: SortKey>:
|
pub trait Entry<P: PartitionKey, S: SortKey>:
|
||||||
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
|
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
|
||||||
{
|
{
|
||||||
|
@ -47,7 +45,6 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
|
||||||
fn merge(&mut self, other: &Self);
|
fn merge(&mut self, other: &Self);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait TableSchema: Send + Sync {
|
pub trait TableSchema: Send + Sync {
|
||||||
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
@ -66,4 +63,3 @@ pub trait TableSchema: Send + Sync {
|
||||||
true
|
true
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use log::warn;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -185,7 +187,7 @@ where
|
||||||
for resp in resps {
|
for resp in resps {
|
||||||
if let TableRPC::ReadEntryResponse(value) = resp {
|
if let TableRPC::ReadEntryResponse(value) = resp {
|
||||||
if let Some(v_bytes) = value {
|
if let Some(v_bytes) = value {
|
||||||
let v = Self::decode_entry(v_bytes.as_slice())?;
|
let v = self.decode_entry(v_bytes.as_slice())?;
|
||||||
ret = match ret {
|
ret = match ret {
|
||||||
None => Some(v),
|
None => Some(v),
|
||||||
Some(mut x) => {
|
Some(mut x) => {
|
||||||
|
@ -241,7 +243,7 @@ where
|
||||||
for resp in resps {
|
for resp in resps {
|
||||||
if let TableRPC::Update(entries) = resp {
|
if let TableRPC::Update(entries) = resp {
|
||||||
for entry_bytes in entries.iter() {
|
for entry_bytes in entries.iter() {
|
||||||
let entry = Self::decode_entry(entry_bytes.as_slice())?;
|
let entry = self.decode_entry(entry_bytes.as_slice())?;
|
||||||
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
|
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
|
||||||
match ret.remove(&entry_key) {
|
match ret.remove(&entry_key) {
|
||||||
None => {
|
None => {
|
||||||
|
@ -363,7 +365,7 @@ where
|
||||||
let keep = match filter {
|
let keep = match filter {
|
||||||
None => true,
|
None => true,
|
||||||
Some(f) => {
|
Some(f) => {
|
||||||
let entry = Self::decode_entry(value.as_ref())?;
|
let entry = self.decode_entry(value.as_ref())?;
|
||||||
F::matches_filter(&entry, f)
|
F::matches_filter(&entry, f)
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -382,14 +384,14 @@ where
|
||||||
let mut epidemic_propagate = vec![];
|
let mut epidemic_propagate = vec![];
|
||||||
|
|
||||||
for update_bytes in entries.iter() {
|
for update_bytes in entries.iter() {
|
||||||
let update = Self::decode_entry(update_bytes.as_slice())?;
|
let update = self.decode_entry(update_bytes.as_slice())?;
|
||||||
|
|
||||||
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
let tree_key = self.tree_key(update.partition_key(), update.sort_key());
|
||||||
|
|
||||||
let (old_entry, new_entry) = self.store.transaction(|db| {
|
let (old_entry, new_entry) = self.store.transaction(|db| {
|
||||||
let (old_entry, new_entry) = match db.get(&tree_key)? {
|
let (old_entry, new_entry) = match db.get(&tree_key)? {
|
||||||
Some(prev_bytes) => {
|
Some(prev_bytes) => {
|
||||||
let old_entry = Self::decode_entry(&prev_bytes)
|
let old_entry = self.decode_entry(&prev_bytes)
|
||||||
.map_err(sled::ConflictableTransactionError::Abort)?;
|
.map_err(sled::ConflictableTransactionError::Abort)?;
|
||||||
let mut new_entry = old_entry.clone();
|
let mut new_entry = old_entry.clone();
|
||||||
new_entry.merge(&update);
|
new_entry.merge(&update);
|
||||||
|
@ -437,7 +439,7 @@ where
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
if let Some(old_val) = self.store.remove(&key)? {
|
if let Some(old_val) = self.store.remove(&key)? {
|
||||||
let old_entry = Self::decode_entry(&old_val)?;
|
let old_entry = self.decode_entry(&old_val)?;
|
||||||
self.instance.updated(Some(old_entry), None).await?;
|
self.instance.updated(Some(old_entry), None).await?;
|
||||||
self.system
|
self.system
|
||||||
.background
|
.background
|
||||||
|
@ -455,12 +457,18 @@ where
|
||||||
ret
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> {
|
fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
|
||||||
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
|
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
|
||||||
Ok(x) => Ok(x),
|
Ok(x) => Ok(x),
|
||||||
Err(e) => match F::try_migrate(bytes) {
|
Err(e) => match F::try_migrate(bytes) {
|
||||||
Some(x) => Ok(x),
|
Some(x) => Ok(x),
|
||||||
None => Err(e.into()),
|
None => {
|
||||||
|
warn!("Unable to decode entry of {}: {}", self.name, e);
|
||||||
|
for line in hexdump::hexdump_iter(bytes) {
|
||||||
|
debug!("{}", line);
|
||||||
|
}
|
||||||
|
Err(e.into())
|
||||||
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue