Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
17 changed files with 30 additions and 46 deletions
Showing only changes of commit a77efd7ca6 - Show all commits

34
Cargo.lock generated
View file

@ -970,7 +970,7 @@ dependencies = [
"pretty_env_logger", "pretty_env_logger",
"prometheus", "prometheus",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 1.1.0", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"serde_json", "serde_json",
@ -1042,7 +1042,7 @@ dependencies = [
"hex", "hex",
"opentelemetry", "opentelemetry",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 1.1.0", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"tokio", "tokio",
@ -1111,7 +1111,7 @@ dependencies = [
"netapp 0.4.4", "netapp 0.4.4",
"opentelemetry", "opentelemetry",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 1.1.0", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"tokio", "tokio",
@ -1168,7 +1168,7 @@ dependencies = [
"opentelemetry", "opentelemetry",
"pnet_datalink", "pnet_datalink",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 1.1.0", "rmp-serde 0.15.5",
"schemars", "schemars",
"serde", "serde",
"serde_bytes", "serde_bytes",
@ -1214,7 +1214,7 @@ dependencies = [
"hexdump", "hexdump",
"opentelemetry", "opentelemetry",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 1.1.0", "rmp-serde 0.15.5",
"serde", "serde",
"serde_bytes", "serde_bytes",
"tokio", "tokio",
@ -1262,7 +1262,7 @@ dependencies = [
"netapp 0.4.4", "netapp 0.4.4",
"opentelemetry", "opentelemetry",
"rand 0.8.5", "rand 0.8.5",
"rmp-serde 1.1.0", "rmp-serde 0.15.5",
"serde", "serde",
"serde_json", "serde_json",
"sha2", "sha2",
@ -2351,12 +2351,6 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "paste"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0c520e05135d6e763148b6426a837e239041653ba7becd2e538c076c738025fc"
[[package]] [[package]]
name = "pem" name = "pem"
version = "0.8.3" version = "0.8.3"
@ -2746,13 +2740,12 @@ dependencies = [
[[package]] [[package]]
name = "rmp" name = "rmp"
version = "0.8.11" version = "0.8.10"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44519172358fd6d58656c86ab8e7fbc9e1490c3e8f14d35ed78ca0dd07403c9f" checksum = "4f55e5fa1446c4d5dd1f5daeed2a4fe193071771a2636274d0d7a3b082aa7ad6"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"num-traits", "num-traits",
"paste",
] ]
[[package]] [[package]]
@ -2777,17 +2770,6 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "rmp-serde"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25786b0d276110195fa3d6f3f31299900cf71dfbd6c28450f3f58a0e7f7a347e"
dependencies = [
"byteorder",
"rmp",
"serde",
]
[[package]] [[package]]
name = "roxmltree" name = "roxmltree"
version = "0.14.1" version = "0.14.1"

View file

@ -28,7 +28,7 @@ tracing = "0.1.30"
rand = "0.8" rand = "0.8"
zstd = { version = "0.9", default-features = false } zstd = { version = "0.9", default-features = false }
rmp-serde = "1.1" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"

View file

@ -37,7 +37,7 @@ rand = "0.8"
async-trait = "0.1.7" async-trait = "0.1.7"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
rmp-serde = "1.1" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"
structopt = { version = "0.3", default-features = false } structopt = { version = "0.3", default-features = false }

View file

@ -81,7 +81,7 @@ impl Repair {
info!("repair_versions: {}", i); info!("repair_versions: {}", i);
} }
let version = rmp_serde::decode::from_slice::<Version>(&item_bytes)?; let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() { if version.deleted.get() {
continue; continue;
} }
@ -133,7 +133,7 @@ impl Repair {
info!("repair_block_ref: {}", i); info!("repair_block_ref: {}", i);
} }
let block_ref = rmp_serde::decode::from_slice::<BlockRef>(&item_bytes)?; let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() { if block_ref.deleted.get() {
continue; continue;
} }

View file

@ -31,7 +31,7 @@ tracing = "0.1.30"
rand = "0.8" rand = "0.8"
zstd = { version = "0.9", default-features = false } zstd = { version = "0.9", default-features = false }
rmp-serde = "1.1" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"

View file

@ -181,9 +181,11 @@ impl<T: CounterSchema> IndexCounter<T> {
let new_entry = self.local_counter.db().transaction(|mut tx| { let new_entry = self.local_counter.db().transaction(|mut tx| {
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => rmp_serde::decode::from_slice::<LocalCounterEntry>(&old_bytes) Some(old_bytes) => {
.map_err(Error::RmpDecode) rmp_serde::decode::from_read_ref::<_, LocalCounterEntry>(&old_bytes)
.map_err(db::TxError::Abort)?, .map_err(Error::RmpDecode)
.map_err(db::TxError::Abort)?
}
None => LocalCounterEntry { None => LocalCounterEntry {
values: BTreeMap::new(), values: BTreeMap::new(),
}, },

View file

@ -175,7 +175,7 @@ impl TableSchema for KeyTable {
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> { fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old_k = rmp_serde::decode::from_slice::<old::Key>(bytes).ok()?; let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?;
let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone()); let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
let state = if old_k.deleted.get() { let state = if old_k.deleted.get() {

View file

@ -28,7 +28,7 @@ impl Migrate {
let mut old_buckets = vec![]; let mut old_buckets = vec![];
for res in tree.iter().map_err(GarageError::from)? { for res in tree.iter().map_err(GarageError::from)? {
let (_k, v) = res.map_err(GarageError::from)?; let (_k, v) = res.map_err(GarageError::from)?;
let bucket = rmp_serde::decode::from_slice::<old_bucket::Bucket>(&v[..]) let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
.map_err(GarageError::from)?; .map_err(GarageError::from)?;
old_buckets.push(bucket); old_buckets.push(bucket);
} }

View file

@ -270,7 +270,7 @@ impl TableSchema for ObjectTable {
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> { fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old_obj = rmp_serde::decode::from_slice::<old::Object>(bytes).ok()?; let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?;
Some(migrate_object(old_obj)) Some(migrate_object(old_obj))
} }
} }

View file

@ -168,7 +168,7 @@ impl TableSchema for VersionTable {
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> { fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old = rmp_serde::decode::from_slice::<old::Version>(bytes).ok()?; let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
let blocks = old let blocks = old
.blocks .blocks

View file

@ -26,7 +26,7 @@ rand = "0.8"
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
async-trait = "0.1.7" async-trait = "0.1.7"
rmp-serde = "1.1" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"
serde_json = "1.0" serde_json = "1.0"

View file

@ -26,7 +26,7 @@ hexdump = "0.1"
tracing = "0.1.30" tracing = "0.1.30"
rand = "0.8" rand = "0.8"
rmp-serde = "1.1" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_bytes = "0.11" serde_bytes = "0.11"

View file

@ -300,7 +300,7 @@ where
} }
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_slice::<F::E>(bytes) { match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) {
Ok(x) => Ok(x), Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) { Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x), Some(x) => Ok(x),

View file

@ -354,7 +354,7 @@ impl MerkleNode {
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> { fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
match ent { match ent {
None => Ok(MerkleNode::Empty), None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_slice::<MerkleNode>(&v[..])?), Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
} }
} }

View file

@ -25,7 +25,7 @@ rand = "0.8"
sha2 = "0.9" sha2 = "0.9"
chrono = "0.4" chrono = "0.4"
rmp-serde = "1.1" rmp-serde = "0.15"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
serde_json = "1.0" serde_json = "1.0"
toml = "0.5" toml = "0.5"

View file

@ -151,7 +151,7 @@ where
let mut wr = Vec::with_capacity(128); let mut wr = Vec::with_capacity(128);
let mut se = rmp_serde::Serializer::new(&mut wr) let mut se = rmp_serde::Serializer::new(&mut wr)
.with_struct_map() .with_struct_map()
.with_binary(); .with_string_variants();
val.serialize(&mut se)?; val.serialize(&mut se)?;
Ok(wr) Ok(wr)
} }

View file

@ -33,7 +33,7 @@ where
let mut bytes = vec![]; let mut bytes = vec![];
file.read_to_end(&mut bytes)?; file.read_to_end(&mut bytes)?;
let value = rmp_serde::decode::from_slice(&bytes[..])?; let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value) Ok(value)
} }
@ -57,7 +57,7 @@ where
let mut bytes = vec![]; let mut bytes = vec![];
file.read_to_end(&mut bytes).await?; file.read_to_end(&mut bytes).await?;
let value = rmp_serde::decode::from_slice(&bytes[..])?; let value = rmp_serde::decode::from_read_ref(&bytes[..])?;
Ok(value) Ok(value)
} }