Use cleaner CRDT data types for objects to avoid accidents like #16 #18

Merged
lx merged 7 commits from feature/better-crdt into master 2020-11-21 15:01:30 +00:00
Showing only changes of commit f9be964c3f - Show all commits

View file

@ -2,6 +2,8 @@ use std::collections::{BTreeMap, HashMap};
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use log::warn;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use futures::stream::*; use futures::stream::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -185,7 +187,7 @@ where
for resp in resps { for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp { if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value { if let Some(v_bytes) = value {
let v = Self::decode_entry(v_bytes.as_slice())?; let v = self.decode_entry(v_bytes.as_slice())?;
ret = match ret { ret = match ret {
None => Some(v), None => Some(v),
Some(mut x) => { Some(mut x) => {
@ -241,7 +243,7 @@ where
for resp in resps { for resp in resps {
if let TableRPC::Update(entries) = resp { if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() { for entry_bytes in entries.iter() {
let entry = Self::decode_entry(entry_bytes.as_slice())?; let entry = self.decode_entry(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) { match ret.remove(&entry_key) {
None => { None => {
@ -363,7 +365,7 @@ where
let keep = match filter { let keep = match filter {
None => true, None => true,
Some(f) => { Some(f) => {
let entry = Self::decode_entry(value.as_ref())?; let entry = self.decode_entry(value.as_ref())?;
F::matches_filter(&entry, f) F::matches_filter(&entry, f)
} }
}; };
@ -382,14 +384,14 @@ where
let mut epidemic_propagate = vec![]; let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() { for update_bytes in entries.iter() {
let update = Self::decode_entry(update_bytes.as_slice())?; let update = self.decode_entry(update_bytes.as_slice())?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? { let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let old_entry = Self::decode_entry(&prev_bytes) let old_entry = self.decode_entry(&prev_bytes)
.map_err(sled::ConflictableTransactionError::Abort)?; .map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone(); let mut new_entry = old_entry.clone();
new_entry.merge(&update); new_entry.merge(&update);
@ -437,7 +439,7 @@ where
break; break;
} }
if let Some(old_val) = self.store.remove(&key)? { if let Some(old_val) = self.store.remove(&key)? {
let old_entry = Self::decode_entry(&old_val)?; let old_entry = self.decode_entry(&old_val)?;
self.instance.updated(Some(old_entry), None).await?; self.instance.updated(Some(old_entry), None).await?;
self.system self.system
.background .background
@ -455,12 +457,15 @@ where
ret ret
} }
fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> { fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) { Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x), Some(x) => Ok(x),
None => Err(e.into()), None => {
warn!("Unable to decode entry of {}: {}", self.name, e);
Err(e.into())
}
}, },
} }
} }