Refactor how things are migrated #461

Merged
lx merged 9 commits from format-migration into main 2023-01-03 15:28:25 +00:00
27 changed files with 638 additions and 709 deletions
Showing only changes of commit cdb2a591e9 - Show all commits

1
Cargo.lock generated
View file

@ -1277,6 +1277,7 @@ dependencies = [
"garage_db", "garage_db",
"git-version", "git-version",
"hex", "hex",
"hexdump",
"http", "http",
"hyper", "hyper",
"lazy_static", "lazy_static",

View file

@ -178,6 +178,7 @@ struct ScrubWorkerPersisted {
time_last_complete_scrub: u64, time_last_complete_scrub: u64,
corruptions_detected: u64, corruptions_detected: u64,
} }
impl garage_util::migrate::InitialFormat for ScrubWorkerPersisted {}
enum ScrubWorkerState { enum ScrubWorkerState {
Running(BlockStoreIterator), Running(BlockStoreIterator),

View file

@ -63,6 +63,7 @@ struct ResyncPersistedConfig {
n_workers: usize, n_workers: usize,
tranquility: u32, tranquility: u32,
} }
impl garage_util::migrate::InitialFormat for ResyncPersistedConfig {}
enum ResyncIterResult { enum ResyncIterResult {
BusyDidSomething, BusyDidSomething,

View file

@ -74,7 +74,7 @@ base64 = "0.13"
[features] [features]
default = [ "bundled-libs", "metrics", "sled" ] default = [ "bundled-libs", "metrics", "sled", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ] k2v = [ "garage_util/k2v", "garage_api/k2v" ]

View file

@ -5,14 +5,22 @@ use garage_util::data::*;
use garage_table::crdt::*; use garage_table::crdt::*;
use garage_table::*; use garage_table::*;
mod v08 {
use super::*;
/// The bucket alias table holds the names given to buckets /// The bucket alias table holds the names given to buckets
/// in the global namespace. /// in the global namespace.
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BucketAlias { pub struct BucketAlias {
name: String, pub(super) name: String,
pub state: crdt::Lww<Option<Uuid>>, pub state: crdt::Lww<Option<Uuid>>,
} }
impl garage_util::migrate::InitialFormat for BucketAlias {}
}
pub use v08::*;
impl BucketAlias { impl BucketAlias {
pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> { pub fn new(name: String, ts: u64, bucket_id: Option<Uuid>) -> Option<Self> {
if !is_valid_bucket_name(&name) { if !is_valid_bucket_name(&name) {

View file

@ -7,6 +7,9 @@ use garage_util::time::*;
use crate::permission::BucketKeyPerm; use crate::permission::BucketKeyPerm;
mod v08 {
use super::*;
/// A bucket is a collection of objects /// A bucket is a collection of objects
/// ///
/// Its parameters are not directly accessible as: /// Its parameters are not directly accessible as:
@ -73,6 +76,11 @@ pub struct BucketQuotas {
pub max_objects: Option<u64>, pub max_objects: Option<u64>,
} }
impl garage_util::migrate::InitialFormat for Bucket {}
}
pub use v08::*;
impl AutoCrdt for BucketQuotas { impl AutoCrdt for BucketQuotas {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }

View file

@ -12,6 +12,7 @@ use garage_rpc::system::System;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::migrate::Migrate;
use garage_util::time::*; use garage_util::time::*;
use garage_table::crdt::*; use garage_table::crdt::*;
@ -29,6 +30,9 @@ pub trait CountedItem: Clone + PartialEq + Send + Sync + 'static {
fn counts(&self) -> Vec<(&'static str, i64)>; fn counts(&self) -> Vec<(&'static str, i64)>;
} }
mod v08 {
use super::*;
/// A counter entry in the global table /// A counter entry in the global table
#[derive(Clone, PartialEq, Debug, Serialize, Deserialize)] #[derive(Clone, PartialEq, Debug, Serialize, Deserialize)]
pub struct CounterEntry<T: CountedItem> { pub struct CounterEntry<T: CountedItem> {
@ -37,6 +41,17 @@ pub struct CounterEntry<T: CountedItem> {
pub values: BTreeMap<String, CounterValue>, pub values: BTreeMap<String, CounterValue>,
} }
/// A counter entry in the global table
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
impl<T: CountedItem> garage_util::migrate::InitialFormat for CounterEntry<T> {}
}
pub use v08::*;
impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> { impl<T: CountedItem> Entry<T::CP, T::CS> for CounterEntry<T> {
fn partition_key(&self) -> &T::CP { fn partition_key(&self) -> &T::CP {
&self.pk &self.pk
@ -78,12 +93,6 @@ impl<T: CountedItem> CounterEntry<T> {
} }
} }
/// A counter entry in the global table
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct CounterValue {
pub node_values: BTreeMap<Uuid, (u64, i64)>,
}
impl<T: CountedItem> Crdt for CounterEntry<T> { impl<T: CountedItem> Crdt for CounterEntry<T> {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
for (name, e2) in other.values.iter() { for (name, e2) in other.values.iter() {
@ -195,11 +204,9 @@ impl<T: CountedItem> IndexCounter<T> {
let tree_key = self.table.data.tree_key(pk, sk); let tree_key = self.table.data.tree_key(pk, sk);
let mut entry = match tx.get(&self.local_counter, &tree_key[..])? { let mut entry = match tx.get(&self.local_counter, &tree_key[..])? {
Some(old_bytes) => { Some(old_bytes) => LocalCounterEntry::<T>::decode(&old_bytes)
rmp_serde::decode::from_read_ref::<_, LocalCounterEntry<T>>(&old_bytes) .ok_or_message("Cannot decode local counter entry")
.map_err(Error::RmpDecode) .map_err(db::TxError::Abort)?,
.map_err(db::TxError::Abort)?
}
None => LocalCounterEntry { None => LocalCounterEntry {
pk: pk.clone(), pk: pk.clone(),
sk: sk.clone(), sk: sk.clone(),
@ -214,7 +221,8 @@ impl<T: CountedItem> IndexCounter<T> {
ent.1 += *inc; ent.1 += *inc;
} }
let new_entry_bytes = rmp_to_vec_all_named(&entry) let new_entry_bytes = entry
.encode()
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?; .map_err(db::TxError::Abort)?;
tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?; tx.insert(&self.local_counter, &tree_key[..], new_entry_bytes)?;
@ -263,7 +271,7 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 = 0; tv.1 = 0;
} }
let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; let local_counter_bytes = local_counter.encode()?;
self.local_counter self.local_counter
.insert(&local_counter_k, &local_counter_bytes)?; .insert(&local_counter_k, &local_counter_bytes)?;
@ -330,7 +338,7 @@ impl<T: CountedItem> IndexCounter<T> {
tv.1 += v; tv.1 += v;
} }
let local_counter_bytes = rmp_to_vec_all_named(&local_counter)?; let local_counter_bytes = local_counter.encode()?;
self.local_counter self.local_counter
.insert(&local_counter_key, local_counter_bytes)?; .insert(&local_counter_key, local_counter_bytes)?;
@ -357,6 +365,8 @@ struct LocalCounterEntry<T: CountedItem> {
values: BTreeMap<String, (u64, i64)>, values: BTreeMap<String, (u64, i64)>,
} }
impl<T: CountedItem> garage_util::migrate::InitialFormat for LocalCounterEntry<T> {}
impl<T: CountedItem> LocalCounterEntry<T> { impl<T: CountedItem> LocalCounterEntry<T> {
fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> { fn into_counter_entry(self, this_node: Uuid) -> CounterEntry<T> {
CounterEntry { CounterEntry {

View file

@ -1,7 +1,8 @@
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize, Serialize};
use garage_db as db; use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
@ -17,12 +18,18 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values"; pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes"; pub const BYTES: &str = "bytes";
mod v08 {
use crate::k2v::causality::K2VNodeId;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem { pub struct K2VItem {
pub partition: K2VItemPartition, pub partition: K2VItemPartition,
pub sort_key: String, pub sort_key: String,
items: BTreeMap<K2VNodeId, DvvsEntry>, pub(super) items: BTreeMap<K2VNodeId, DvvsEntry>,
} }
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
@ -32,9 +39,9 @@ pub struct K2VItemPartition {
} }
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
struct DvvsEntry { pub struct DvvsEntry {
t_discard: u64, pub(super) t_discard: u64,
values: Vec<(u64, DvvsValue)>, pub(super) values: Vec<(u64, DvvsValue)>,
} }
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@ -43,6 +50,11 @@ pub enum DvvsValue {
Deleted, Deleted,
} }
impl garage_util::migrate::InitialFormat for K2VItem {}
}
pub use v08::*;
impl K2VItem { impl K2VItem {
/// Creates a new K2VItem when no previous entry existed in the db /// Creates a new K2VItem when no previous entry existed in the db
pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self { pub fn new(bucket_id: Uuid, partition_key: String, sort_key: String) -> Self {

View file

@ -1,12 +1,58 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_table::crdt::*; use garage_util::crdt::{self, Crdt};
use garage_table::*;
use garage_util::data::*; use garage_util::data::*;
use garage_table::{DeletedFilter, EmptyKey, Entry, TableSchema};
use crate::permission::BucketKeyPerm; use crate::permission::BucketKeyPerm;
use crate::prev::v051::key_table as old; pub(crate) mod v05 {
use garage_util::crdt;
use serde::{Deserialize, Serialize};
/// An api key
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
/// The id of the key (immutable), used as partition key
pub key_id: String,
/// The secret_key associated
pub secret_key: String,
/// Name for the key
pub name: crdt::Lww<String>,
/// Is the key deleted
pub deleted: crdt::Bool,
/// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
}
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool,
}
impl crdt::AutoCrdt for PermissionSet {
const WARN_IF_DIFFERENT: bool = true;
}
impl garage_util::migrate::InitialFormat for Key {}
}
mod v08 {
use super::v05;
use crate::permission::BucketKeyPerm;
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
/// An api key /// An api key
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@ -40,6 +86,36 @@ pub struct KeyParams {
pub local_aliases: crdt::LwwMap<String, Option<Uuid>>, pub local_aliases: crdt::LwwMap<String, Option<Uuid>>,
} }
impl garage_util::migrate::Migrate for Key {
type Previous = v05::Key;
fn migrate(old_k: v05::Key) -> Key {
let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
let state = if old_k.deleted.get() {
crdt::Deletable::Deleted
} else {
// Authorized buckets is ignored here,
// migration is performed in specific migration code in
// garage/migrate.rs
crdt::Deletable::Present(KeyParams {
secret_key: old_k.secret_key,
name,
allow_create_bucket: crdt::Lww::new(false),
authorized_buckets: crdt::Map::new(),
local_aliases: crdt::LwwMap::new(),
})
};
Key {
key_id: old_k.key_id,
state,
}
}
}
}
pub use v08::*;
impl KeyParams { impl KeyParams {
fn new(secret_key: &str, name: &str) -> Self { fn new(secret_key: &str, name: &str) -> Self {
KeyParams { KeyParams {
@ -173,28 +249,4 @@ impl TableSchema for KeyTable {
} }
} }
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old_k = rmp_serde::decode::from_read_ref::<_, old::Key>(bytes).ok()?;
let name = crdt::Lww::raw(old_k.name.timestamp(), old_k.name.get().clone());
let state = if old_k.deleted.get() {
crdt::Deletable::Deleted
} else {
// Authorized buckets is ignored here,
// migration is performed in specific migration code in
// garage/migrate.rs
crdt::Deletable::Present(KeyParams {
secret_key: old_k.secret_key,
name,
allow_create_bucket: crdt::Lww::new(false),
authorized_buckets: crdt::Map::new(),
local_aliases: crdt::LwwMap::new(),
})
};
Some(Key {
key_id: old_k.key_id,
state,
})
}
} }

View file

@ -3,7 +3,7 @@ use serde::{Deserialize, Serialize};
use garage_table::crdt::Crdt; use garage_table::crdt::Crdt;
use garage_table::*; use garage_table::*;
use super::key_table::PermissionSet; use crate::key_table::v05::PermissionSet;
/// A bucket is a collection of objects /// A bucket is a collection of objects
/// ///

View file

@ -1,50 +0,0 @@
use serde::{Deserialize, Serialize};
use garage_table::crdt::*;
use garage_table::*;
/// An api key
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Key {
/// The id of the key (immutable), used as partition key
pub key_id: String,
/// The secret_key associated
pub secret_key: String,
/// Name for the key
pub name: crdt::Lww<String>,
/// Is the key deleted
pub deleted: crdt::Bool,
/// Buckets in which the key is authorized. Empty if `Key` is deleted
// CRDT interaction: deleted implies authorized_buckets is empty
pub authorized_buckets: crdt::LwwMap<String, PermissionSet>,
}
/// Permission given to a key in a bucket
#[derive(PartialOrd, Ord, PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct PermissionSet {
/// The key can be used to read the bucket
pub allow_read: bool,
/// The key can be used to write in the bucket
pub allow_write: bool,
}
impl AutoCrdt for PermissionSet {
const WARN_IF_DIFFERENT: bool = true;
}
impl Crdt for Key {
fn merge(&mut self, other: &Self) {
self.name.merge(&other.name);
self.deleted.merge(&other.deleted);
if self.deleted.get() {
self.authorized_buckets.clear();
} else {
self.authorized_buckets.merge(&other.authorized_buckets);
}
}
}

View file

@ -1,4 +1 @@
pub(crate) mod bucket_table; pub(crate) mod bucket_table;
pub(crate) mod key_table;
pub(crate) mod object_table;
pub(crate) mod version_table;

View file

@ -1,149 +0,0 @@
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use garage_util::data::*;
use garage_table::crdt::*;
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket: String,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
/// The list of currenty stored versions of the object
versions: Vec<ObjectVersion>,
}
impl Object {
/// Get a list of currently stored versions of `Object`
pub fn versions(&self) -> &[ObjectVersion] {
&self.versions[..]
}
}
/// Informations about a version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
impl Crdt for ObjectVersionState {
fn merge(&mut self, other: &Self) {
use ObjectVersionState::*;
match other {
Aborted => {
*self = Aborted;
}
Complete(b) => match self {
Aborted => {}
Complete(a) => {
a.merge(b);
}
Uploading(_) => {
*self = Complete(b.clone());
}
},
Uploading(_) => {}
}
}
}
/// Data stored in object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
impl AutoCrdt for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true;
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64,
/// etag of the object
pub etag: String,
}
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>,
}
impl ObjectVersion {
fn cmp_key(&self) -> (u64, Uuid) {
(self.timestamp, self.uuid)
}
/// Is the object version completely received
pub fn is_complete(&self) -> bool {
matches!(self.state, ObjectVersionState::Complete(_))
}
}
impl Crdt for Object {
fn merge(&mut self, other: &Self) {
// Merge versions from other into here
for other_v in other.versions.iter() {
match self
.versions
.binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
{
Ok(i) => {
self.versions[i].state.merge(&other_v.state);
}
Err(i) => {
self.versions.insert(i, other_v.clone());
}
}
}
// Remove versions which are obsolete, i.e. those that come
// before the last version which .is_complete().
let last_complete = self
.versions
.iter()
.enumerate()
.rev()
.find(|(_, v)| v.is_complete())
.map(|(vi, _)| vi);
if let Some(last_vi) = last_complete {
self.versions = self.versions.drain(last_vi..).collect::<Vec<_>>();
}
}
}

View file

@ -1,79 +0,0 @@
use serde::{Deserialize, Serialize};
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::*;
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String,
/// Key in which the related object is stored
pub key: String,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64,
}
impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
.cmp(&other.part_number)
.then(self.offset.cmp(&other.offset))
}
}
impl PartialOrd for VersionBlockKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block
pub size: u64,
}
impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true;
}
impl Crdt for Version {
fn merge(&mut self, other: &Self) {
self.deleted.merge(&other.deleted);
if self.deleted.get() {
self.blocks.clear();
self.parts_etags.clear();
} else {
self.blocks.merge(&other.blocks);
self.parts_etags.merge(&other.parts_etags);
}
}
}

View file

@ -1,4 +1,3 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_db as db; use garage_db as db;
@ -10,6 +9,11 @@ use garage_table::*;
use garage_block::manager::*; use garage_block::manager::*;
mod v08 {
use garage_util::crdt;
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct BlockRef { pub struct BlockRef {
/// Hash (blake2 sum) of the block, used as partition key /// Hash (blake2 sum) of the block, used as partition key
@ -23,6 +27,11 @@ pub struct BlockRef {
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
} }
impl garage_util::migrate::InitialFormat for BlockRef {}
}
pub use v08::*;
impl Entry<Hash, Uuid> for BlockRef { impl Entry<Hash, Uuid> for BlockRef {
fn partition_key(&self) -> &Hash { fn partition_key(&self) -> &Hash {
&self.block &self.block

View file

@ -1,5 +1,4 @@
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use garage_db as db; use garage_db as db;
@ -13,12 +12,96 @@ use garage_table::*;
use crate::index_counter::*; use crate::index_counter::*;
use crate::s3::version_table::*; use crate::s3::version_table::*;
use crate::prev::v051::object_table as old;
pub const OBJECTS: &str = "objects"; pub const OBJECTS: &str = "objects";
pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads"; pub const UNFINISHED_UPLOADS: &str = "unfinished_uploads";
pub const BYTES: &str = "bytes"; pub const BYTES: &str = "bytes";
mod v05 {
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket: String,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
/// The list of currenty stored versions of the object
pub(super) versions: Vec<ObjectVersion>,
}
/// Informations about a version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
/// Data stored in object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64,
/// etag of the object
pub etag: String,
}
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>,
}
impl garage_util::migrate::InitialFormat for Object {}
}
mod v08 {
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use super::v05;
pub use v05::{
ObjectVersion, ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta,
ObjectVersionState,
};
/// An object /// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object { pub struct Object {
@ -29,9 +112,26 @@ pub struct Object {
pub key: String, pub key: String,
/// The list of currenty stored versions of the object /// The list of currenty stored versions of the object
versions: Vec<ObjectVersion>, pub(super) versions: Vec<ObjectVersion>,
} }
impl garage_util::migrate::Migrate for Object {
type Previous = v05::Object;
fn migrate(old: v05::Object) -> Object {
use garage_util::data::blake2sum;
Object {
bucket_id: blake2sum(old.bucket.as_bytes()),
key: old.key,
versions: old.versions,
}
}
}
}
pub use v08::*;
impl Object { impl Object {
/// Initialize an Object struct from parts /// Initialize an Object struct from parts
pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self { pub fn new(bucket_id: Uuid, key: String, versions: Vec<ObjectVersion>) -> Self {
@ -68,28 +168,6 @@ impl Object {
} }
} }
/// Informations about a version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading(ObjectVersionHeaders),
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
impl Crdt for ObjectVersionState { impl Crdt for ObjectVersionState {
fn merge(&mut self, other: &Self) { fn merge(&mut self, other: &Self) {
use ObjectVersionState::*; use ObjectVersionState::*;
@ -111,42 +189,10 @@ impl Crdt for ObjectVersionState {
} }
} }
/// Data stored in object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
/// The object is short, it's stored inlined
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
impl AutoCrdt for ObjectVersionData { impl AutoCrdt for ObjectVersionData {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Headers to send to the client
pub headers: ObjectVersionHeaders,
/// Size of the object
pub size: u64,
/// etag of the object
pub etag: String,
}
/// Additional headers for an object
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders {
/// Content type of the object
pub content_type: String,
/// Any other http headers to send
pub other: BTreeMap<String, String>,
}
impl ObjectVersion { impl ObjectVersion {
fn cmp_key(&self) -> (u64, Uuid) { fn cmp_key(&self) -> (u64, Uuid) {
(self.timestamp, self.uuid) (self.timestamp, self.uuid)
@ -290,11 +336,6 @@ impl TableSchema for ObjectTable {
ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()), ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
} }
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old_obj = rmp_serde::decode::from_read_ref::<_, old::Object>(bytes).ok()?;
Some(migrate_object(old_obj))
}
} }
impl CountedItem for Object { impl CountedItem for Object {
@ -342,61 +383,3 @@ impl CountedItem for Object {
// vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv // vvvvvvvv migration code, stupid stuff vvvvvvvvvvvv
// (we just want to change bucket into bucket_id by hashing it) // (we just want to change bucket into bucket_id by hashing it)
fn migrate_object(o: old::Object) -> Object {
let versions = o
.versions()
.iter()
.cloned()
.map(migrate_object_version)
.collect();
Object {
bucket_id: blake2sum(o.bucket.as_bytes()),
key: o.key,
versions,
}
}
fn migrate_object_version(v: old::ObjectVersion) -> ObjectVersion {
ObjectVersion {
uuid: Uuid::try_from(v.uuid.as_slice()).unwrap(),
timestamp: v.timestamp,
state: match v.state {
old::ObjectVersionState::Uploading(h) => {
ObjectVersionState::Uploading(migrate_object_version_headers(h))
}
old::ObjectVersionState::Complete(d) => {
ObjectVersionState::Complete(migrate_object_version_data(d))
}
old::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
},
}
}
fn migrate_object_version_headers(h: old::ObjectVersionHeaders) -> ObjectVersionHeaders {
ObjectVersionHeaders {
content_type: h.content_type,
other: h.other,
}
}
fn migrate_object_version_data(d: old::ObjectVersionData) -> ObjectVersionData {
match d {
old::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
old::ObjectVersionData::Inline(m, b) => {
ObjectVersionData::Inline(migrate_object_version_meta(m), b)
}
old::ObjectVersionData::FirstBlock(m, h) => ObjectVersionData::FirstBlock(
migrate_object_version_meta(m),
Hash::try_from(h.as_slice()).unwrap(),
),
}
}
fn migrate_object_version_meta(m: old::ObjectVersionMeta) -> ObjectVersionMeta {
ObjectVersionMeta {
headers: migrate_object_version_headers(m.headers),
size: m.size,
etag: m.etag,
}
}

View file

@ -1,4 +1,3 @@
use serde::{Deserialize, Serialize};
use std::sync::Arc; use std::sync::Arc;
use garage_db as db; use garage_db as db;
@ -11,7 +10,61 @@ use garage_table::*;
use crate::s3::block_ref_table::*; use crate::s3::block_ref_table::*;
use crate::prev::v051::version_table as old; mod v05 {
use garage_util::crdt;
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
/// Etag of each part in case of a multipart upload, empty otherwise
pub parts_etags: crdt::Map<u64, String>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket: String,
/// Key in which the related object is stored
pub key: String,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64,
}
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block
pub size: u64,
}
impl garage_util::migrate::InitialFormat for Version {}
}
mod v08 {
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use super::v05;
/// A version of an object /// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
@ -37,6 +90,28 @@ pub struct Version {
pub key: String, pub key: String,
} }
pub use v05::{VersionBlock, VersionBlockKey};
impl garage_util::migrate::Migrate for Version {
type Previous = v05::Version;
fn migrate(old: v05::Version) -> Version {
use garage_util::data::blake2sum;
Version {
uuid: old.uuid,
deleted: old.deleted,
blocks: old.blocks,
parts_etags: old.parts_etags,
bucket_id: blake2sum(old.bucket.as_bytes()),
key: old.key,
}
}
}
}
pub use v08::*;
impl Version { impl Version {
pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self { pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self { Self {
@ -64,14 +139,6 @@ impl Version {
} }
} }
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part
pub offset: u64,
}
impl Ord for VersionBlockKey { impl Ord for VersionBlockKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering { fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number self.part_number
@ -86,15 +153,6 @@ impl PartialOrd for VersionBlockKey {
} }
} }
/// Informations about a single block
#[derive(PartialEq, Eq, Ord, PartialOrd, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block
pub size: u64,
}
impl AutoCrdt for VersionBlock { impl AutoCrdt for VersionBlock {
const WARN_IF_DIFFERENT: bool = true; const WARN_IF_DIFFERENT: bool = true;
} }
@ -166,42 +224,4 @@ impl TableSchema for VersionTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool { fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.deleted.get()) filter.apply(entry.deleted.get())
} }
fn try_migrate(bytes: &[u8]) -> Option<Self::E> {
let old = rmp_serde::decode::from_read_ref::<_, old::Version>(bytes).ok()?;
let blocks = old
.blocks
.items()
.iter()
.map(|(k, v)| {
(
VersionBlockKey {
part_number: k.part_number,
offset: k.offset,
},
VersionBlock {
hash: Hash::try_from(v.hash.as_slice()).unwrap(),
size: v.size,
},
)
})
.collect::<crdt::Map<_, _>>();
let parts_etags = old
.parts_etags
.items()
.iter()
.map(|(k, v)| (*k, v.clone()))
.collect::<crdt::Map<_, _>>();
Some(Version {
uuid: Hash::try_from(old.uuid.as_slice()).unwrap(),
deleted: crdt::Bool::new(old.deleted.get()),
blocks,
parts_etags,
bucket_id: blake2sum(old.bucket.as_bytes()),
key: old.key,
})
}
} }

View file

@ -35,6 +35,8 @@ pub struct ClusterLayout {
pub staging_hash: Hash, pub staging_hash: Hash,
} }
impl garage_util::migrate::InitialFormat for ClusterLayout {}
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct NodeRoleV(pub Option<NodeRole>); pub struct NodeRoleV(pub Option<NodeRole>);

View file

@ -73,13 +73,17 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>; type Response = Result<SystemRpc, Error>;
} }
#[derive(Serialize, Deserialize)]
pub struct PeerList(Vec<(Uuid, SocketAddr)>);
impl garage_util::migrate::InitialFormat for PeerList {}
/// This node's membership manager /// This node's membership manager
pub struct System { pub struct System {
/// The id of this node /// The id of this node
pub id: Uuid, pub id: Uuid,
persist_cluster_layout: Persister<ClusterLayout>, persist_cluster_layout: Persister<ClusterLayout>,
persist_peer_list: Persister<Vec<(Uuid, SocketAddr)>>, persist_peer_list: Persister<PeerList>,
local_status: ArcSwap<NodeStatus>, local_status: ArcSwap<NodeStatus>,
node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>, node_status: RwLock<HashMap<Uuid, (u64, NodeStatus)>>,
@ -721,7 +725,7 @@ impl System {
// Add peer list from list stored on disk // Add peer list from list stored on disk
if let Ok(peers) = self.persist_peer_list.load_async().await { if let Ok(peers) = self.persist_peer_list.load_async().await {
ping_list.extend(peers.iter().map(|(id, addr)| ((*id).into(), *addr))) ping_list.extend(peers.0.iter().map(|(id, addr)| ((*id).into(), *addr)))
} }
// Fetch peer list from Consul // Fetch peer list from Consul
@ -801,12 +805,16 @@ impl System {
// and append it to the list we are about to save, // and append it to the list we are about to save,
// so that no peer ID gets lost in the process. // so that no peer ID gets lost in the process.
if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await { if let Ok(mut prev_peer_list) = self.persist_peer_list.load_async().await {
prev_peer_list.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id)); prev_peer_list
peer_list.extend(prev_peer_list); .0
.retain(|(id, _ip)| peer_list.iter().all(|(id2, _ip2)| id2 != id));
peer_list.extend(prev_peer_list.0);
} }
// Save new peer list to file // Save new peer list to file
self.persist_peer_list.save_async(&peer_list).await self.persist_peer_list
.save_async(&PeerList(peer_list))
.await
} }
async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) { async fn pull_cluster_layout(self: Arc<Self>, peer: Uuid) {

View file

@ -10,6 +10,7 @@ use garage_db::counted_tree_hack::CountedTree;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
use garage_util::migrate::Migrate;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -219,7 +220,8 @@ where
// data format, the messagepack encoding changed. In this case, // data format, the messagepack encoding changed. In this case,
// we also have to write the migrated value in the table and update // we also have to write the migrated value in the table and update
// the associated Merkle tree entry. // the associated Merkle tree entry.
let new_bytes = rmp_to_vec_all_named(&new_entry) let new_bytes = new_entry
.encode()
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
.map_err(db::TxError::Abort)?; .map_err(db::TxError::Abort)?;
let changed = Some(&new_bytes[..]) != old_bytes.as_deref(); let changed = Some(&new_bytes[..]) != old_bytes.as_deref();
@ -329,9 +331,9 @@ where
Some(old_v) => { Some(old_v) => {
let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?; let mut entry = self.decode_entry(&old_v).map_err(db::TxError::Abort)?;
entry.merge(ins); entry.merge(ins);
rmp_to_vec_all_named(&entry) entry.encode()
} }
None => rmp_to_vec_all_named(ins), None => ins.encode(),
}; };
let new_entry = new_entry let new_entry = new_entry
.map_err(Error::RmpEncode) .map_err(Error::RmpEncode)
@ -351,18 +353,18 @@ where
} }
pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> { pub fn decode_entry(&self, bytes: &[u8]) -> Result<F::E, Error> {
match rmp_serde::decode::from_read_ref::<_, F::E>(bytes) { match F::E::decode(bytes) {
Ok(x) => Ok(x),
Err(e) => match F::try_migrate(bytes) {
Some(x) => Ok(x), Some(x) => Ok(x),
None => { None => {
warn!("Unable to decode entry of {}: {}", F::TABLE_NAME, e); error!("Unable to decode entry of {}", F::TABLE_NAME);
for line in hexdump::hexdump_iter(bytes) { for line in hexdump::hexdump_iter(bytes) {
debug!("{}", line); debug!("{}", line);
} }
Err(e.into()) Err(Error::Message(format!(
"Unable to decode entry of {}",
F::TABLE_NAME
)))
} }
},
} }
} }

View file

@ -2,6 +2,7 @@ use serde::{Deserialize, Serialize};
use garage_db as db; use garage_db as db;
use garage_util::data::*; use garage_util::data::*;
use garage_util::migrate::Migrate;
use crate::crdt::Crdt; use crate::crdt::Crdt;
@ -46,7 +47,7 @@ impl SortKey for FixedBytes32 {
/// Trait for an entry in a table. It must be sortable and partitionnable. /// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>: pub trait Entry<P: PartitionKey, S: SortKey>:
Crdt + PartialEq + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static
{ {
/// Get the key used to partition /// Get the key used to partition
fn partition_key(&self) -> &P; fn partition_key(&self) -> &P;
@ -65,23 +66,23 @@ pub trait TableSchema: Send + Sync + 'static {
const TABLE_NAME: &'static str; const TABLE_NAME: &'static str;
/// The partition key used in that table /// The partition key used in that table
type P: PartitionKey + Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync; type P: PartitionKey
+ Clone
+ PartialEq
+ Serialize
+ for<'de> Deserialize<'de>
+ Send
+ Sync
+ 'static;
/// The sort key used int that table /// The sort key used int that table
type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type S: SortKey + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
/// They type for an entry in that table /// They type for an entry in that table
type E: Entry<Self::P, Self::S>; type E: Entry<Self::P, Self::S>;
/// The type for a filter that can be applied to select entries /// The type for a filter that can be applied to select entries
/// (e.g. filter out deleted entries) /// (e.g. filter out deleted entries)
type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync; type Filter: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
// Action to take if not able to decode current version:
// try loading from an older version
/// Try migrating an entry from an older version
fn try_migrate(_bytes: &[u8]) -> Option<Self::E> {
None
}
/// Actions triggered by data changing in a table. If such actions /// Actions triggered by data changing in a table. If such actions
/// include updates to the local database that should be applied /// include updates to the local database that should be applied

View file

@ -302,7 +302,7 @@ where
); );
return Ok(()); return Ok(());
} }
let root_ck_hash = hash_of::<MerkleNode>(&root_ck)?; let root_ck_hash = hash_of_merkle_node(&root_ck)?;
// Check if they have the same root checksum // Check if they have the same root checksum
// If so, do nothing. // If so, do nothing.
@ -468,7 +468,7 @@ where
match message { match message {
SyncRpc::RootCkHash(range, h) => { SyncRpc::RootCkHash(range, h) => {
let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; let (_root_ck_key, root_ck) = self.get_root_ck(*range)?;
let hash = hash_of::<MerkleNode>(&root_ck)?; let hash = hash_of_merkle_node(&root_ck)?;
Ok(SyncRpc::RootCkDifferent(hash != *h)) Ok(SyncRpc::RootCkDifferent(hash != *h))
} }
SyncRpc::GetNode(k) => { SyncRpc::GetNode(k) => {
@ -622,7 +622,7 @@ impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWor
// ---- UTIL ---- // ---- UTIL ----
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> { fn hash_of_merkle_node(x: &MerkleNode) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..])) Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
} }

View file

@ -18,6 +18,7 @@ use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
use garage_util::migrate::Migrate;
use garage_rpc::system::System; use garage_rpc::system::System;
use garage_rpc::*; use garage_rpc::*;
@ -122,7 +123,7 @@ where
let hash = e.partition_key().hash(); let hash = e.partition_key().hash();
let who = self.data.replication.write_nodes(&hash); let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); let e_enc = Arc::new(ByteBuf::from(e.encode()?));
let rpc = TableRpc::<F>::Update(vec![e_enc]); let rpc = TableRpc::<F>::Update(vec![e_enc]);
self.system self.system
@ -173,7 +174,7 @@ where
let entry = entry.borrow(); let entry = entry.borrow();
let hash = entry.partition_key().hash(); let hash = entry.partition_key().hash();
let who = self.data.replication.write_nodes(&hash); let who = self.data.replication.write_nodes(&hash);
let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
for node in who { for node in who {
call_list.entry(node).or_default().push(e_enc.clone()); call_list.entry(node).or_default().push(e_enc.clone());
} }
@ -412,7 +413,7 @@ where
// =============== UTILITY FUNCTION FOR CLIENT OPERATIONS =============== // =============== UTILITY FUNCTION FOR CLIENT OPERATIONS ===============
async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> { async fn repair_on_read(&self, who: &[Uuid], what: F::E) -> Result<(), Error> {
let what_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(&what)?)); let what_enc = Arc::new(ByteBuf::from(what.encode()?));
self.system self.system
.rpc .rpc
.try_call_many( .try_call_many(

View file

@ -23,6 +23,7 @@ bytes = "1.0"
digest = "0.10" digest = "0.10"
err-derive = "0.3" err-derive = "0.3"
git-version = "0.3.4" git-version = "0.3.4"
hexdump = "0.1"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
hex = "0.4" hex = "0.4"
lazy_static = "1.4" lazy_static = "1.4"

View file

@ -11,6 +11,7 @@ pub mod data;
pub mod error; pub mod error;
pub mod formater; pub mod formater;
pub mod metrics; pub mod metrics;
pub mod migrate;
pub mod persister; pub mod persister;
pub mod time; pub mod time;
pub mod token_bucket; pub mod token_bucket;

75
src/util/migrate.rs Normal file
View file

@ -0,0 +1,75 @@
use serde::{Deserialize, Serialize};
pub trait Migrate: Serialize + for<'de> Deserialize<'de> + 'static {
/// A sequence of bytes to add at the beginning of the serialized
/// string, to identify that the data is of this version.
const MARKER: &'static [u8] = b"";
/// The previous version of this data type, from which items of this version
/// can be migrated. Set `type Previous = NoPrevious` to indicate that this datatype
/// is the initial schema and cannot be migrated.
type Previous: Migrate;
/// This function must be filled in by implementors to migrate from a previons iteration
/// of the data format.
fn migrate(previous: Self::Previous) -> Self;
fn decode(bytes: &[u8]) -> Option<Self> {
if bytes.len() >= Self::MARKER.len() && &bytes[..Self::MARKER.len()] == Self::MARKER {
if let Ok(value) =
rmp_serde::decode::from_read_ref::<_, Self>(&bytes[Self::MARKER.len()..])
{
return Some(value);
}
}
Self::Previous::decode(bytes).map(Self::migrate)
}
fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
let mut wr = Vec::with_capacity(128);
wr.extend_from_slice(Self::MARKER);
let mut se = rmp_serde::Serializer::new(&mut wr)
.with_struct_map()
.with_string_variants();
self.serialize(&mut se)?;
Ok(wr)
}
}
pub trait InitialFormat: Serialize + for<'de> Deserialize<'de> + 'static {
/// A sequence of bytes to add at the beginning of the serialized
/// string, to identify that the data is of this version.
const MARKER: &'static [u8] = b"";
}
// ----
impl<T: InitialFormat> Migrate for T {
const MARKER: &'static [u8] = <T as InitialFormat>::MARKER;
type Previous = NoPrevious;
fn migrate(_previous: Self::Previous) -> Self {
unreachable!();
}
}
#[derive(Serialize, Deserialize)]
pub struct NoPrevious;
impl Migrate for NoPrevious {
type Previous = NoPrevious;
fn migrate(_previous: Self::Previous) -> Self {
unreachable!();
}
fn decode(_bytes: &[u8]) -> Option<Self> {
None
}
fn encode(&self) -> Result<Vec<u8>, rmp_serde::encode::Error> {
unreachable!()
}
}

View file

@ -3,21 +3,16 @@ use std::path::{Path, PathBuf};
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use serde::{Deserialize, Serialize};
use crate::data::*;
use crate::error::Error; use crate::error::Error;
use crate::migrate::Migrate;
pub struct Persister<T: Serialize + for<'de> Deserialize<'de>> { pub struct Persister<T: Migrate> {
path: PathBuf, path: PathBuf,
_marker: std::marker::PhantomData<T>, _marker: std::marker::PhantomData<T>,
} }
impl<T> Persister<T> impl<T: Migrate> Persister<T> {
where
T: Serialize + for<'de> Deserialize<'de>,
{
pub fn new(base_dir: &Path, file_name: &str) -> Self { pub fn new(base_dir: &Path, file_name: &str) -> Self {
let mut path = base_dir.to_path_buf(); let mut path = base_dir.to_path_buf();
path.push(file_name); path.push(file_name);
@ -27,18 +22,37 @@ where
} }
} }
fn decode(&self, bytes: &[u8]) -> Result<T, Error> {
match T::decode(bytes) {
Some(v) => Ok(v),
None => {
error!(
"Unable to decode persisted data file {}",
self.path.display()
);
for line in hexdump::hexdump_iter(bytes) {
debug!("{}", line);
}
Err(Error::Message(format!(
"Unable to decode persisted data file {}",
self.path.display()
)))
}
}
}
pub fn load(&self) -> Result<T, Error> { pub fn load(&self) -> Result<T, Error> {
let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?; let mut file = std::fs::OpenOptions::new().read(true).open(&self.path)?;
let mut bytes = vec![]; let mut bytes = vec![];
file.read_to_end(&mut bytes)?; file.read_to_end(&mut bytes)?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?; let value = self.decode(&bytes[..])?;
Ok(value) Ok(value)
} }
pub fn save(&self, t: &T) -> Result<(), Error> { pub fn save(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?; let bytes = t.encode()?;
let mut file = std::fs::OpenOptions::new() let mut file = std::fs::OpenOptions::new()
.write(true) .write(true)
@ -57,12 +71,12 @@ where
let mut bytes = vec![]; let mut bytes = vec![];
file.read_to_end(&mut bytes).await?; file.read_to_end(&mut bytes).await?;
let value = rmp_serde::decode::from_read_ref(&bytes[..])?; let value = self.decode(&bytes[..])?;
Ok(value) Ok(value)
} }
pub async fn save_async(&self, t: &T) -> Result<(), Error> { pub async fn save_async(&self, t: &T) -> Result<(), Error> {
let bytes = rmp_to_vec_all_named(t)?; let bytes = t.encode()?;
let mut file = tokio::fs::File::create(&self.path).await?; let mut file = tokio::fs::File::create(&self.path).await?;
file.write_all(&bytes[..]).await?; file.write_all(&bytes[..]).await?;