Add support for model migrations

This commit is contained in:
Alex 2020-07-08 16:10:53 +02:00
parent 86fb7bbba5
commit 86bf4dedac
8 changed files with 220 additions and 91 deletions

119
Cargo.lock generated
View file

@ -340,15 +340,15 @@ dependencies = [
[[package]] [[package]]
name = "garage" name = "garage"
version = "0.1.0" version = "0.1.1"
dependencies = [ dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_api 0.1.0", "garage_api 0.1.1",
"garage_model 0.1.0", "garage_model 0.1.1",
"garage_rpc 0.1.0", "garage_rpc 0.1.0",
"garage_table 0.1.0", "garage_table 0.1.1",
"garage_util 0.1.0", "garage_util 0.1.0",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -365,15 +365,15 @@ dependencies = [
[[package]] [[package]]
name = "garage_api" name = "garage_api"
version = "0.1.0" version = "0.1.1"
dependencies = [ dependencies = [
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"chrono 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)", "chrono 0.4.13 (registry+https://github.com/rust-lang/crates.io-index)",
"crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)", "crypto-mac 0.7.0 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_model 0.1.0", "garage_model 0.1.1",
"garage_table 0.1.0", "garage_table 0.1.1",
"garage_util 0.1.0", "garage_util 0.1.0",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)", "hmac 0.7.1 (registry+https://github.com/rust-lang/crates.io-index)",
@ -392,14 +392,39 @@ dependencies = [
[[package]] [[package]]
name = "garage_model" name = "garage_model"
version = "0.1.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [ dependencies = [
"arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)", "bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)", "futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_rpc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_table 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_bytes 0.11.5 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "garage_model"
version = "0.1.1"
dependencies = [
"arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_model 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_rpc 0.1.0", "garage_rpc 0.1.0",
"garage_table 0.1.0", "garage_table 0.1.1",
"garage_util 0.1.0", "garage_util 0.1.0",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)", "hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)", "log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
@ -438,9 +463,58 @@ dependencies = [
"webpki 0.21.3 (registry+https://github.com/rust-lang/crates.io-index)", "webpki 0.21.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "garage_rpc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.6 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper-rustls 0.20.0 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.56 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio-rustls 0.13.1 (registry+https://github.com/rust-lang/crates.io-index)",
"webpki 0.21.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "garage_table" name = "garage_table"
version = "0.1.0" version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
"bytes 0.4.12 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_rpc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"garage_util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_bytes 0.11.5 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]]
name = "garage_table"
version = "0.1.1"
dependencies = [ dependencies = [
"arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)", "arc-swap 0.4.7 (registry+https://github.com/rust-lang/crates.io-index)",
"async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)", "async-trait 0.1.36 (registry+https://github.com/rust-lang/crates.io-index)",
@ -483,6 +557,31 @@ dependencies = [
"webpki 0.21.3 (registry+https://github.com/rust-lang/crates.io-index)", "webpki 0.21.3 (registry+https://github.com/rust-lang/crates.io-index)",
] ]
[[package]]
name = "garage_util"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
dependencies = [
"err-derive 0.2.4 (registry+https://github.com/rust-lang/crates.io-index)",
"futures 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)",
"hex 0.3.2 (registry+https://github.com/rust-lang/crates.io-index)",
"http 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)",
"hyper 0.13.6 (registry+https://github.com/rust-lang/crates.io-index)",
"log 0.4.8 (registry+https://github.com/rust-lang/crates.io-index)",
"rand 0.7.3 (registry+https://github.com/rust-lang/crates.io-index)",
"rmp-serde 0.14.3 (registry+https://github.com/rust-lang/crates.io-index)",
"roxmltree 0.11.0 (registry+https://github.com/rust-lang/crates.io-index)",
"rustls 0.17.0 (registry+https://github.com/rust-lang/crates.io-index)",
"serde 1.0.114 (registry+https://github.com/rust-lang/crates.io-index)",
"serde_json 1.0.56 (registry+https://github.com/rust-lang/crates.io-index)",
"sha2 0.8.2 (registry+https://github.com/rust-lang/crates.io-index)",
"sled 0.31.0 (registry+https://github.com/rust-lang/crates.io-index)",
"tokio 0.2.21 (registry+https://github.com/rust-lang/crates.io-index)",
"toml 0.5.6 (registry+https://github.com/rust-lang/crates.io-index)",
"webpki 0.21.3 (registry+https://github.com/rust-lang/crates.io-index)",
]
[[package]] [[package]]
name = "generator" name = "generator"
version = "0.6.21" version = "0.6.21"
@ -1629,6 +1728,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
"checksum futures-task 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626" "checksum futures-task 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "bdb66b5f09e22019b1ab0830f7785bcea8e7a42148683f99214f73f8ec21a626"
"checksum futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6" "checksum futures-util 0.3.5 (registry+https://github.com/rust-lang/crates.io-index)" = "8764574ff08b701a084482c3c7031349104b07ac897393010494beaa18ce32c6"
"checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c" "checksum fxhash 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "c31b6d751ae2c7f11320402d34e41349dd1016f8d5d45e48c4312bc8625af50c"
"checksum garage_model 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "b6403312b28077fd585e5f96184d2d56142f10eb42709d2893294d70a892d73f"
"checksum garage_rpc 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "0ef794407808c89f300f73457bdc567c953b323d7fe30534aba686a28ebb4a0d"
"checksum garage_table 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "567d7e9f791a1d65d26e36840aa0570b3a2144302c32f62b5e63a39f147cbf57"
"checksum garage_util 0.1.0 (registry+https://github.com/rust-lang/crates.io-index)" = "77de76a4167c041094f3f3415c6d3d773373e0326668fbce70dfd3b024788800"
"checksum generator 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "add72f17bb81521258fcc8a7a3245b1e184e916bfbe34f0ea89558f440df5c68" "checksum generator 0.6.21 (registry+https://github.com/rust-lang/crates.io-index)" = "add72f17bb81521258fcc8a7a3245b1e184e916bfbe34f0ea89558f440df5c68"
"checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec" "checksum generic-array 0.12.3 (registry+https://github.com/rust-lang/crates.io-index)" = "c68f0274ae0e023facc3c97b2e00f076be70e254bc851d972503b328db79b2ec"
"checksum gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028" "checksum gethostname 0.2.1 (registry+https://github.com/rust-lang/crates.io-index)" = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_api" name = "garage_api"
version = "0.1.0" version = "0.1.1"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "GPL-3.0" license = "GPL-3.0"
@ -14,8 +14,8 @@ path = "lib.rs"
[dependencies] [dependencies]
garage_util = { version = "0.1", path = "../util" } garage_util = { version = "0.1", path = "../util" }
garage_table = { version = "0.1", path = "../table" } garage_table = { version = "0.1.1", path = "../table" }
garage_model = { version = "0.1", path = "../model" } garage_model = { version = "0.1.1", path = "../model" }
bytes = "0.4" bytes = "0.4"
hex = "0.3" hex = "0.3"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage" name = "garage"
version = "0.1.0" version = "0.1.1"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "GPL-3.0" license = "GPL-3.0"
@ -16,9 +16,9 @@ path = "main.rs"
[dependencies] [dependencies]
garage_util = { version = "0.1", path = "../util" } garage_util = { version = "0.1", path = "../util" }
garage_rpc = { version = "0.1", path = "../rpc" } garage_rpc = { version = "0.1", path = "../rpc" }
garage_table = { version = "0.1", path = "../table" } garage_table = { version = "0.1.1", path = "../table" }
garage_model = { version = "0.1", path = "../model" } garage_model = { version = "0.1.1", path = "../model" }
garage_api = { version = "0.1", path = "../api" } garage_api = { version = "0.1.1", path = "../api" }
bytes = "0.4" bytes = "0.4"
rand = "0.7" rand = "0.7"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_model" name = "garage_model"
version = "0.1.0" version = "0.1.1"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "GPL-3.0" license = "GPL-3.0"
@ -15,7 +15,8 @@ path = "lib.rs"
[dependencies] [dependencies]
garage_util = { version = "0.1", path = "../util" } garage_util = { version = "0.1", path = "../util" }
garage_rpc = { version = "0.1", path = "../rpc" } garage_rpc = { version = "0.1", path = "../rpc" }
garage_table = { version = "0.1", path = "../table" } garage_table = { version = "0.1.1", path = "../table" }
model010 = { package = "garage_model", version = "0.1.0" }
bytes = "0.4" bytes = "0.4"
rand = "0.7" rand = "0.7"

View file

@ -1,6 +1,6 @@
[package] [package]
name = "garage_table" name = "garage_table"
version = "0.1.0" version = "0.1.1"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license = "GPL-3.0" license = "GPL-3.0"

View file

@ -3,9 +3,11 @@
#[macro_use] #[macro_use]
extern crate log; extern crate log;
pub mod schema;
pub mod table; pub mod table;
pub mod table_fullcopy; pub mod table_fullcopy;
pub mod table_sharded; pub mod table_sharded;
pub mod table_sync; pub mod table_sync;
pub use table::*; pub use table::*;
pub use schema::*;

77
src/table/schema.rs Normal file
View file

@ -0,0 +1,77 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_util::error::Error;
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
fn merge(&mut self, other: &Self);
}
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
impl PartitionKey for EmptyKey {
fn hash(&self) -> Hash {
[0u8; 32].into()
}
}
impl PartitionKey for String {
fn hash(&self) -> Hash {
hash(self.as_bytes())
}
}
impl SortKey for String {
fn sort_key(&self) -> &[u8] {
self.as_bytes()
}
}
impl PartitionKey for Hash {
fn hash(&self) -> Hash {
self.clone()
}
}
impl SortKey for Hash {
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
}
#[async_trait]
pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
// Action to take if not able to decode current version:
// try loading from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None
}
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
}

View file

@ -3,7 +3,6 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures::stream::*; use futures::stream::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
@ -16,6 +15,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use crate::table_sync::*; use crate::table_sync::*;
use crate::schema::*;
const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10); const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(10);
@ -48,70 +48,6 @@ pub enum TableRPC<F: TableSchema> {
impl<F: TableSchema> RpcMessage for TableRPC<F> {} impl<F: TableSchema> RpcMessage for TableRPC<F> {}
pub trait PartitionKey {
fn hash(&self) -> Hash;
}
pub trait SortKey {
fn sort_key(&self) -> &[u8];
}
pub trait Entry<P: PartitionKey, S: SortKey>:
PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync
{
fn partition_key(&self) -> &P;
fn sort_key(&self) -> &S;
fn merge(&mut self, other: &Self);
}
#[derive(Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct EmptyKey;
impl SortKey for EmptyKey {
fn sort_key(&self) -> &[u8] {
&[]
}
}
impl PartitionKey for EmptyKey {
fn hash(&self) -> Hash {
[0u8; 32].into()
}
}
impl PartitionKey for String {
fn hash(&self) -> Hash {
hash(self.as_bytes())
}
}
impl SortKey for String {
fn sort_key(&self) -> &[u8] {
self.as_bytes()
}
}
impl PartitionKey for Hash {
fn hash(&self) -> Hash {
self.clone()
}
}
impl SortKey for Hash {
fn sort_key(&self) -> &[u8] {
self.as_slice()
}
}
#[async_trait]
pub trait TableSchema: Send + Sync {
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
type E: Entry<Self::P, Self::S>;
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync;
async fn updated(&self, old: Option<Self::E>, new: Option<Self::E>) -> Result<(), Error>;
fn matches_filter(_entry: &Self::E, _filter: &Self::Filter) -> bool {
true
}
}
pub trait TableReplication: Send + Sync { pub trait TableReplication: Send + Sync {
// See examples in table_sharded.rs and table_fullcopy.rs // See examples in table_sharded.rs and table_fullcopy.rs
@ -250,7 +186,7 @@ where
for resp in resps { for resp in resps {
if let TableRPC::ReadEntryResponse(value) = resp { if let TableRPC::ReadEntryResponse(value) = resp {
if let Some(v_bytes) = value { if let Some(v_bytes) = value {
let v = rmp_serde::decode::from_read_ref::<_, F::E>(v_bytes.as_slice())?; let v = Self::decode_entry(v_bytes.as_slice())?;
ret = match ret { ret = match ret {
None => Some(v), None => Some(v),
Some(mut x) => { Some(mut x) => {
@ -306,8 +242,7 @@ where
for resp in resps { for resp in resps {
if let TableRPC::Update(entries) = resp { if let TableRPC::Update(entries) = resp {
for entry_bytes in entries.iter() { for entry_bytes in entries.iter() {
let entry = let entry = Self::decode_entry(entry_bytes.as_slice())?;
rmp_serde::decode::from_read_ref::<_, F::E>(entry_bytes.as_slice())?;
let entry_key = self.tree_key(entry.partition_key(), entry.sort_key()); let entry_key = self.tree_key(entry.partition_key(), entry.sort_key());
match ret.remove(&entry_key) { match ret.remove(&entry_key) {
None => { None => {
@ -429,7 +364,7 @@ where
let keep = match filter { let keep = match filter {
None => true, None => true,
Some(f) => { Some(f) => {
let entry = rmp_serde::decode::from_read_ref::<_, F::E>(value.as_ref())?; let entry = Self::decode_entry(value.as_ref())?;
F::matches_filter(&entry, f) F::matches_filter(&entry, f)
} }
}; };
@ -448,15 +383,14 @@ where
let mut epidemic_propagate = vec![]; let mut epidemic_propagate = vec![];
for update_bytes in entries.iter() { for update_bytes in entries.iter() {
let update = rmp_serde::decode::from_read_ref::<_, F::E>(update_bytes.as_slice())?; let update = Self::decode_entry(update_bytes.as_slice())?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| { let (old_entry, new_entry) = self.store.transaction(|db| {
let (old_entry, new_entry) = match db.get(&tree_key)? { let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&prev_bytes) let old_entry = Self::decode_entry(&prev_bytes)
.map_err(Error::RMPDecode)
.map_err(sled::ConflictableTransactionError::Abort)?; .map_err(sled::ConflictableTransactionError::Abort)?;
let mut new_entry = old_entry.clone(); let mut new_entry = old_entry.clone();
new_entry.merge(&update); new_entry.merge(&update);
@ -504,7 +438,7 @@ where
break; break;
} }
if let Some(old_val) = self.store.remove(&key)? { if let Some(old_val) = self.store.remove(&key)? {
let old_entry = rmp_serde::decode::from_read_ref::<_, F::E>(&old_val)?; let old_entry = Self::decode_entry(&old_val)?;
self.instance.updated(Some(old_entry), None).await?; self.instance.updated(Some(old_entry), None).await?;
self.system self.system
.background .background
@ -521,4 +455,16 @@ where
ret.extend(s.sort_key()); ret.extend(s.sort_key());
ret ret
} }
fn decode_entry(bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x),
Err(e) => {
match F::try_migrate(bytes) {
Some(x) => Ok(x),
None => Err(e.into()),
}
}
}
}
} }