Refactor how things are migrated #461
17 changed files with 57 additions and 57 deletions
5
Cargo.lock
generated
5
Cargo.lock
generated
|
@ -1080,7 +1080,6 @@ dependencies = [
|
||||||
"parse_duration",
|
"parse_duration",
|
||||||
"prometheus",
|
"prometheus",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rmp-serde",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
|
@ -1156,7 +1155,6 @@ dependencies = [
|
||||||
"hex",
|
"hex",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rmp-serde",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
@ -1200,7 +1198,6 @@ dependencies = [
|
||||||
"netapp",
|
"netapp",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rmp-serde",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
@ -1229,7 +1226,6 @@ dependencies = [
|
||||||
"pnet_datalink",
|
"pnet_datalink",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"reqwest",
|
"reqwest",
|
||||||
"rmp-serde",
|
|
||||||
"schemars",
|
"schemars",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
|
@ -1255,7 +1251,6 @@ dependencies = [
|
||||||
"hexdump",
|
"hexdump",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rmp-serde",
|
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
|
|
@ -32,7 +32,7 @@ args@{
|
||||||
ignoreLockHash,
|
ignoreLockHash,
|
||||||
}:
|
}:
|
||||||
let
|
let
|
||||||
nixifiedLockHash = "f277e0cfb8bb74ed78c99c8e75821c5066b4b8250e9c14d34d0cca70a38c6cab";
|
nixifiedLockHash = "b6aeefc112eb232904b24398f4e5da776c8ee2c13d427a26dbdf1732205d4fc9";
|
||||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||||
lockHashIgnored = if ignoreLockHash
|
lockHashIgnored = if ignoreLockHash
|
||||||
|
@ -1539,7 +1539,6 @@ in
|
||||||
parse_duration = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" { inherit profileName; }).out;
|
parse_duration = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" { inherit profileName; }).out;
|
||||||
${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage/prometheus" then "prometheus" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".prometheus."0.13.0" { inherit profileName; }).out;
|
${ if rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage/prometheus" then "prometheus" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".prometheus."0.13.0" { inherit profileName; }).out;
|
||||||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||||
rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out;
|
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
||||||
structopt = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; }).out;
|
structopt = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; }).out;
|
||||||
|
@ -1639,7 +1638,6 @@ in
|
||||||
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
|
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
|
||||||
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
||||||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||||
rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out;
|
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
||||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
||||||
|
@ -1710,7 +1708,6 @@ in
|
||||||
netapp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; }).out;
|
netapp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; }).out;
|
||||||
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
||||||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||||
rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out;
|
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
||||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
||||||
|
@ -1752,7 +1749,6 @@ in
|
||||||
pnet_datalink = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; }).out;
|
pnet_datalink = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; }).out;
|
||||||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||||
${ if rootFeatures' ? "garage/consul-discovery" || rootFeatures' ? "garage_rpc/consul-discovery" || rootFeatures' ? "garage_rpc/reqwest" then "reqwest" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".reqwest."0.11.12" { inherit profileName; }).out;
|
${ if rootFeatures' ? "garage/consul-discovery" || rootFeatures' ? "garage_rpc/consul-discovery" || rootFeatures' ? "garage_rpc/reqwest" then "reqwest" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".reqwest."0.11.12" { inherit profileName; }).out;
|
||||||
rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out;
|
|
||||||
${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage_rpc/kubernetes-discovery" || rootFeatures' ? "garage_rpc/schemars" then "schemars" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".schemars."0.8.8" { inherit profileName; }).out;
|
${ if rootFeatures' ? "garage/kubernetes-discovery" || rootFeatures' ? "garage_rpc/kubernetes-discovery" || rootFeatures' ? "garage_rpc/schemars" then "schemars" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".schemars."0.8.8" { inherit profileName; }).out;
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
||||||
|
@ -1781,7 +1777,6 @@ in
|
||||||
hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out;
|
hexdump = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; }).out;
|
||||||
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
||||||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||||
rmp_serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; }).out;
|
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }).out;
|
||||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; }).out;
|
||||||
|
|
|
@ -31,7 +31,6 @@ rand = "0.8"
|
||||||
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
|
async-compression = { version = "0.3", features = ["tokio", "zstd"] }
|
||||||
zstd = { version = "0.9", default-features = false }
|
zstd = { version = "0.9", default-features = false }
|
||||||
|
|
||||||
rmp-serde = "0.15"
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
|
|
||||||
|
|
|
@ -42,7 +42,6 @@ rand = "0.8"
|
||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
|
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
|
||||||
|
|
||||||
rmp-serde = "0.15"
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
structopt = { version = "0.3", default-features = false }
|
structopt = { version = "0.3", default-features = false }
|
||||||
|
|
|
@ -12,6 +12,7 @@ use garage_model::s3::version_table::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
use garage_util::migrate::Migrate;
|
||||||
|
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
||||||
|
@ -100,7 +101,7 @@ impl Worker for RepairVersionsWorker {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
|
let version = Version::decode(&item_bytes).ok_or_message("Cannot decode Version")?;
|
||||||
if !version.deleted.get() {
|
if !version.deleted.get() {
|
||||||
let object = self
|
let object = self
|
||||||
.garage
|
.garage
|
||||||
|
@ -180,7 +181,7 @@ impl Worker for RepairBlockrefsWorker {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
|
let block_ref = BlockRef::decode(&item_bytes).ok_or_message("Cannot decode BlockRef")?;
|
||||||
if !block_ref.deleted.get() {
|
if !block_ref.deleted.get() {
|
||||||
let version = self
|
let version = self
|
||||||
.garage
|
.garage
|
||||||
|
|
|
@ -30,7 +30,6 @@ tracing = "0.1.30"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
zstd = { version = "0.9", default-features = false }
|
zstd = { version = "0.9", default-features = false }
|
||||||
|
|
||||||
rmp-serde = "0.15"
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
|
|
||||||
|
|
|
@ -279,8 +279,8 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
|
|
||||||
info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
|
info!("zeroing old counters... ({})", hex::encode(&batch[0].0));
|
||||||
for (local_counter_k, local_counter) in batch {
|
for (local_counter_k, local_counter) in batch {
|
||||||
let mut local_counter =
|
let mut local_counter = LocalCounterEntry::<T>::decode(&local_counter)
|
||||||
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&local_counter)?;
|
.ok_or_message("Cannot decode local counter entry")?;
|
||||||
|
|
||||||
for (_, tv) in local_counter.values.iter_mut() {
|
for (_, tv) in local_counter.values.iter_mut() {
|
||||||
tv.0 = std::cmp::max(tv.0 + 1, now);
|
tv.0 = std::cmp::max(tv.0 + 1, now);
|
||||||
|
@ -335,9 +335,8 @@ impl<T: CountedItem> IndexCounter<T> {
|
||||||
let local_counter_key = self.table.data.tree_key(pk, sk);
|
let local_counter_key = self.table.data.tree_key(pk, sk);
|
||||||
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
|
let mut local_counter = match self.local_counter.get(&local_counter_key)? {
|
||||||
Some(old_bytes) => {
|
Some(old_bytes) => {
|
||||||
let ent = rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(
|
let ent = LocalCounterEntry::<T>::decode(&old_bytes)
|
||||||
&old_bytes,
|
.ok_or_message("Cannot decode local counter entry")?;
|
||||||
)?;
|
|
||||||
assert!(ent.pk == *pk);
|
assert!(ent.pk == *pk);
|
||||||
assert!(ent.sk == *sk);
|
assert!(ent.sk == *sk);
|
||||||
ent
|
ent
|
||||||
|
|
|
@ -2,6 +2,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use garage_util::crdt::*;
|
use garage_util::crdt::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
use garage_util::encode::nonversioned_decode;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
|
||||||
|
@ -28,8 +29,8 @@ impl Migrate {
|
||||||
let mut old_buckets = vec![];
|
let mut old_buckets = vec![];
|
||||||
for res in tree.iter().map_err(GarageError::from)? {
|
for res in tree.iter().map_err(GarageError::from)? {
|
||||||
let (_k, v) = res.map_err(GarageError::from)?;
|
let (_k, v) = res.map_err(GarageError::from)?;
|
||||||
let bucket = rmp_serde::decode::from_read_ref::<_, old_bucket::Bucket>(&v[..])
|
let bucket =
|
||||||
.map_err(GarageError::from)?;
|
nonversioned_decode::<old_bucket::Bucket>(&v[..]).map_err(GarageError::from)?;
|
||||||
old_buckets.push(bucket);
|
old_buckets.push(bucket);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,6 @@ rand = "0.8"
|
||||||
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
|
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
|
||||||
|
|
||||||
async-trait = "0.1.7"
|
async-trait = "0.1.7"
|
||||||
rmp-serde = "0.15"
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
|
|
@ -5,6 +5,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
|
use garage_util::crdt::{AutoCrdt, Crdt, LwwMap};
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
use garage_util::encode::nonversioned_encode;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
|
||||||
use crate::ring::*;
|
use crate::ring::*;
|
||||||
|
@ -70,7 +71,7 @@ impl NodeRole {
|
||||||
impl ClusterLayout {
|
impl ClusterLayout {
|
||||||
pub fn new(replication_factor: usize) -> Self {
|
pub fn new(replication_factor: usize) -> Self {
|
||||||
let empty_lwwmap = LwwMap::new();
|
let empty_lwwmap = LwwMap::new();
|
||||||
let empty_lwwmap_hash = blake2sum(&rmp_to_vec_all_named(&empty_lwwmap).unwrap()[..]);
|
let empty_lwwmap_hash = blake2sum(&nonversioned_encode(&empty_lwwmap).unwrap()[..]);
|
||||||
|
|
||||||
ClusterLayout {
|
ClusterLayout {
|
||||||
version: 0,
|
version: 0,
|
||||||
|
@ -92,7 +93,7 @@ impl ClusterLayout {
|
||||||
Ordering::Equal => {
|
Ordering::Equal => {
|
||||||
self.staging.merge(&other.staging);
|
self.staging.merge(&other.staging);
|
||||||
|
|
||||||
let new_staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
let new_staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
|
||||||
let changed = new_staging_hash != self.staging_hash;
|
let changed = new_staging_hash != self.staging_hash;
|
||||||
|
|
||||||
self.staging_hash = new_staging_hash;
|
self.staging_hash = new_staging_hash;
|
||||||
|
@ -127,7 +128,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
|
|
||||||
self.staging.clear();
|
self.staging.clear();
|
||||||
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
|
@ -151,7 +152,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
}
|
}
|
||||||
|
|
||||||
self.staging.clear();
|
self.staging.clear();
|
||||||
self.staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
self.staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
|
||||||
|
|
||||||
self.version += 1;
|
self.version += 1;
|
||||||
|
|
||||||
|
@ -180,7 +181,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
|
||||||
/// returns true if consistent, false if error
|
/// returns true if consistent, false if error
|
||||||
pub fn check(&self) -> bool {
|
pub fn check(&self) -> bool {
|
||||||
// Check that the hash of the staging data is correct
|
// Check that the hash of the staging data is correct
|
||||||
let staging_hash = blake2sum(&rmp_to_vec_all_named(&self.staging).unwrap()[..]);
|
let staging_hash = blake2sum(&nonversioned_encode(&self.staging).unwrap()[..]);
|
||||||
if staging_hash != self.staging_hash {
|
if staging_hash != self.staging_hash {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -28,7 +28,6 @@ hexdump = "0.1"
|
||||||
tracing = "0.1.30"
|
tracing = "0.1.30"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
|
|
||||||
rmp-serde = "0.15"
|
|
||||||
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
|
||||||
serde_bytes = "0.11"
|
serde_bytes = "0.11"
|
||||||
|
|
||||||
|
|
|
@ -10,6 +10,7 @@ use garage_db as db;
|
||||||
|
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
use garage_util::encode::{nonversioned_decode, nonversioned_encode};
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
|
|
||||||
use garage_rpc::ring::*;
|
use garage_rpc::ring::*;
|
||||||
|
@ -67,7 +68,7 @@ pub enum MerkleNode {
|
||||||
|
|
||||||
impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
|
impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
|
||||||
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
|
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
|
||||||
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
|
let empty_node_hash = blake2sum(&nonversioned_encode(&MerkleNode::Empty).unwrap()[..]);
|
||||||
|
|
||||||
Arc::new(Self {
|
Arc::new(Self {
|
||||||
data,
|
data,
|
||||||
|
@ -273,7 +274,7 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
|
||||||
tx.remove(&self.data.merkle_tree, k.encode())?;
|
tx.remove(&self.data.merkle_tree, k.encode())?;
|
||||||
Ok(self.empty_node_hash)
|
Ok(self.empty_node_hash)
|
||||||
} else {
|
} else {
|
||||||
let vby = rmp_to_vec_all_named(v).map_err(|e| db::TxError::Abort(e.into()))?;
|
let vby = nonversioned_encode(v).map_err(|e| db::TxError::Abort(e.into()))?;
|
||||||
let rethash = blake2sum(&vby[..]);
|
let rethash = blake2sum(&vby[..]);
|
||||||
tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
|
tx.insert(&self.data.merkle_tree, k.encode(), vby)?;
|
||||||
Ok(rethash)
|
Ok(rethash)
|
||||||
|
@ -364,7 +365,7 @@ impl MerkleNode {
|
||||||
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
|
fn decode_opt(ent: &Option<db::Value>) -> Result<Self, Error> {
|
||||||
match ent {
|
match ent {
|
||||||
None => Ok(MerkleNode::Empty),
|
None => Ok(MerkleNode::Empty),
|
||||||
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?),
|
Some(v) => Ok(nonversioned_decode::<MerkleNode>(&v[..])?),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -14,6 +14,7 @@ use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
use garage_util::encode::nonversioned_encode;
|
||||||
use garage_util::error::{Error, OkOrMessage};
|
use garage_util::error::{Error, OkOrMessage};
|
||||||
|
|
||||||
use garage_rpc::ring::*;
|
use garage_rpc::ring::*;
|
||||||
|
@ -615,7 +616,7 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
|
||||||
// ---- UTIL ----
|
// ---- UTIL ----
|
||||||
|
|
||||||
fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
|
fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
|
||||||
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
|
Ok(blake2sum(&nonversioned_encode(x)?[..]))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
|
fn join_ordered<'a, K: Ord + Eq, V1, V2>(
|
||||||
|
|
|
@ -141,21 +141,6 @@ pub fn gen_uuid() -> Uuid {
|
||||||
rand::thread_rng().gen::<[u8; 32]>().into()
|
rand::thread_rng().gen::<[u8; 32]>().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
// RMP serialization with names of fields and variants
|
|
||||||
|
|
||||||
/// Serialize to MessagePack
|
|
||||||
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
|
||||||
where
|
|
||||||
T: Serialize + ?Sized,
|
|
||||||
{
|
|
||||||
let mut wr = Vec::with_capacity(128);
|
|
||||||
let mut se = rmp_serde::Serializer::new(&mut wr)
|
|
||||||
.with_struct_map()
|
|
||||||
.with_string_variants();
|
|
||||||
val.serialize(&mut se)?;
|
|
||||||
Ok(wr)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Serialize to JSON, truncating long result
|
/// Serialize to JSON, truncating long result
|
||||||
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
pub fn debug_serialize<T: Serialize>(x: T) -> String {
|
||||||
match serde_json::to_string(&x) {
|
match serde_json::to_string(&x) {
|
||||||
|
|
26
src/util/encode.rs
Normal file
26
src/util/encode.rs
Normal file
|
@ -0,0 +1,26 @@
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// Serialize to MessagePacki, without versionning
|
||||||
|
/// (see garage_util::migrate for functions that manage versionned
|
||||||
|
/// data formats)
|
||||||
|
pub fn nonversioned_encode<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
||||||
|
where
|
||||||
|
T: Serialize + ?Sized,
|
||||||
|
{
|
||||||
|
let mut wr = Vec::with_capacity(128);
|
||||||
|
let mut se = rmp_serde::Serializer::new(&mut wr)
|
||||||
|
.with_struct_map()
|
||||||
|
.with_string_variants();
|
||||||
|
val.serialize(&mut se)?;
|
||||||
|
Ok(wr)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Deserialize from MessagePacki, without versionning
|
||||||
|
/// (see garage_util::migrate for functions that manage versionned
|
||||||
|
/// data formats)
|
||||||
|
pub fn nonversioned_decode<T>(bytes: &[u8]) -> Result<T, rmp_serde::decode::Error>
|
||||||
|
where
|
||||||
|
T: for<'de> Deserialize<'de> + ?Sized,
|
||||||
|
{
|
||||||
|
rmp_serde::decode::from_read_ref::<_, T>(bytes)
|
||||||
|
}
|
|
@ -8,6 +8,7 @@ pub mod background;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod crdt;
|
pub mod crdt;
|
||||||
pub mod data;
|
pub mod data;
|
||||||
|
pub mod encode;
|
||||||
pub mod error;
|
pub mod error;
|
||||||
pub mod formater;
|
pub mod formater;
|
||||||
pub mod metrics;
|
pub mod metrics;
|
||||||
|
|
|
@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
|
||||||
pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
|
pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
|
||||||
/// A sequence of bytes to add at the beginning of the serialized
|
/// A sequence of bytes to add at the beginning of the serialized
|
||||||
/// string, to identify that the data is of this version.
|
/// string, to identify that the data is of this version.
|
||||||
const MARKER: &'static [u8] = b"";
|
const VERSION_MARKER: &'static [u8] = b"";
|
||||||
|
|
||||||
/// The previous version of this data type, from which items of this version
|
/// The previous version of this data type, from which items of this version
|
||||||
/// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype
|
/// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype
|
||||||
|
@ -15,10 +15,9 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
|
||||||
fn migrate(previous: Self::Previous) -> Self;
|
fn migrate(previous: Self::Previous) -> Self;
|
||||||
|
|
||||||
fn decode(bytes: &[u8]) -> Option<Self> {
|
fn decode(bytes: &[u8]) -> Option<Self> {
|
||||||
if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER {
|
let marker_len = Self::VERSION_MARKER.len();
|
||||||
if let Ok(value) =
|
if bytes.len() >= marker_len && &bytes[..marker_len] == Self::VERSION_MARKER {
|
||||||
rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..])
|
if let Ok(value) = rmp_serde::decode::from_read_ref::<_, Self>(&bytes[marker_len..]) {
|
||||||
{
|
|
||||||
return Some(value);
|
return Some(value);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -28,7 +27,7 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
|
||||||
|
|
||||||
fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
|
fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
|
||||||
let mut wr = Vec::with_capacity(128);
|
let mut wr = Vec::with_capacity(128);
|
||||||
wr.extend_from_slice(Self::MARKER);
|
wr.extend_from_slice(Self::VERSION_MARKER);
|
||||||
let mut se = rmp_serde::Serializer::new(&mut wr)
|
let mut se = rmp_serde::Serializer::new(&mut wr)
|
||||||
.with_struct_map()
|
.with_struct_map()
|
||||||
.with_string_variants();
|
.with_string_variants();
|
||||||
|
@ -40,13 +39,13 @@ pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
|
||||||
pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
|
pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
|
||||||
/// A sequence of bytes to add at the beginning of the serialized
|
/// A sequence of bytes to add at the beginning of the serialized
|
||||||
/// string, to identify that the data is of this version.
|
/// string, to identify that the data is of this version.
|
||||||
const MARKER: &'static [u8] = b"";
|
const VERSION_MARKER: &'static [u8] = b"";
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
impl<T: InitialFormat> Migrate for T {
|
impl<T: InitialFormat> Migrate for T {
|
||||||
const MARKER: &'static [u8] = <T as InitialFormat>::MARKER;
|
const VERSION_MARKER: &'static [u8] = <T as InitialFormat>::VERSION_MARKER;
|
||||||
|
|
||||||
type Previous = NoPrevious;
|
type Previous = NoPrevious;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue