From a5fa2a136b4875fb45d990b2d94437e5d77a7ae5 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 8 Jul 2020 16:46:47 +0200 Subject: [PATCH] (WIP) New object table model, TODO: update API calls to use it --- src/model/object_table.rs | 119 +++++++++++++++++++++++++++++--------- 1 file changed, 92 insertions(+), 27 deletions(-) diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 01df70e65..2221c57e5 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -11,6 +11,8 @@ use garage_table::*; use crate::version_table::*; +use model010::object_table as prev; + #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub struct Object { // Primary key @@ -59,39 +61,59 @@ pub struct ObjectVersion { pub uuid: UUID, pub timestamp: u64, - pub mime_type: String, - pub size: u64, pub state: ObjectVersionState, - - pub data: ObjectVersionData, } -#[derive(PartialEq, Clone, Copy, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionState { Uploading, - Complete, + Complete(ObjectVersionData), Aborted, } impl ObjectVersionState { - fn max(self, other: Self) -> Self { + fn merge(&mut self, other: &Self) { use ObjectVersionState::*; - if self == Aborted || other == Aborted { - Aborted - } else if self == Complete || other == Complete { - Complete - } else { - Uploading - } + match other { + Aborted => { + *self = Aborted; + } + Complete(b) => { + match self { + Aborted => {}, + Complete(a) => { + a.merge(b); + } + Uploading => { + *self = Complete(b.clone()); + } + } + } + Uploading => {} + } } } #[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] pub enum ObjectVersionData { - Uploading, DeleteMarker, - Inline(#[serde(with = "serde_bytes")] Vec), - FirstBlock(Hash), + Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec), + FirstBlock(ObjectVersionMeta, Hash), +} + +#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)] +pub struct ObjectVersionMeta { + pub mime_type: String, + pub size: u64, + pub etag: String, +} + +impl ObjectVersionData { + fn merge(&mut self, b: &Self) { + if *self != *b { + warn!("Inconsistent object version data: {:?} (local) vs {:?} (remote)", self, b); + } + } } impl ObjectVersion { @@ -99,10 +121,17 @@ impl ObjectVersion { (self.timestamp, self.uuid) } pub fn is_complete(&self) -> bool { - self.state == ObjectVersionState::Complete + match self.state { + ObjectVersionState::Complete(_) => true, + _ => false, + } } pub fn is_data(&self) -> bool { - self.state == ObjectVersionState::Complete && self.data != ObjectVersionData::DeleteMarker + match self.state { + ObjectVersionState::Complete(ObjectVersionData::DeleteMarker) => false, + ObjectVersionState::Complete(_) => true, + _ => false, + } } } @@ -121,14 +150,7 @@ impl Entry for Object { .binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key())) { Ok(i) => { - let mut v = &mut self.versions[i]; - if other_v.size > v.size { - v.size = other_v.size; - } - v.state = v.state.max(other_v.state); - if v.data == ObjectVersionData::Uploading { - v.data = other_v.data.clone(); - } + self.versions[i].state.merge(&other_v.state); } Err(i) => { self.versions.insert(i, other_v.clone()); @@ -195,4 +217,47 @@ impl TableSchema for ObjectTable { fn matches_filter(entry: &Self::E, _filter: &Self::Filter) -> bool { entry.versions.iter().any(|v| v.is_data()) } + + fn try_migrate(bytes: &[u8]) -> Option { + let old = match rmp_serde::decode::from_read_ref::<_, prev::Object>(bytes) { + Ok(x) => x, + Err(_) => return None, + }; + let new_v = old.versions().iter() + .map(migrate_version) + .collect::>(); + let new = Object::new(old.bucket.clone(), old.key.clone(), new_v); + Some(new) + } +} + +fn migrate_version(old: &prev::ObjectVersion) -> ObjectVersion { + let meta = ObjectVersionMeta{ + size: old.size, + mime_type: old.mime_type.clone(), + etag: "".to_string(), + }; + let state = match old.state { + prev::ObjectVersionState::Uploading => ObjectVersionState::Uploading, + prev::ObjectVersionState::Aborted => ObjectVersionState::Aborted, + prev::ObjectVersionState::Complete => { + match &old.data { + prev::ObjectVersionData::Uploading => ObjectVersionState::Uploading, + prev::ObjectVersionData::DeleteMarker => ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + prev::ObjectVersionData::Inline(x) => ObjectVersionState::Complete(ObjectVersionData::Inline(meta, x.clone())), + prev::ObjectVersionData::FirstBlock(h) => { + let mut hash = [0u8; 32]; + hash.copy_from_slice(h.as_ref()); + ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, Hash::from(hash))) + } + } + } + }; + let mut uuid = [0u8; 32]; + uuid.copy_from_slice(old.uuid.as_ref()); + ObjectVersion{ + uuid: UUID::from(uuid), + timestamp: old.timestamp, + state, + } }