diff --git a/src/model/garage.rs b/src/model/garage.rs
index 9b7121db..31da1715 100644
--- a/src/model/garage.rs
+++ b/src/model/garage.rs
@@ -17,6 +17,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::s3::block_ref_table::*;
+use crate::s3::mpu_table::*;
use crate::s3::object_table::*;
use crate::s3::version_table::*;
@@ -57,6 +58,10 @@ pub struct Garage {
pub object_table: Arc
>,
/// Counting table containing object counters
pub object_counter_table: Arc>,
+ /// Table containing S3 multipart uploads
+ pub mpu_table: Arc>,
+ /// Counting table containing multipart object counters
+ pub mpu_counter_table: Arc>,
/// Table containing S3 object versions
pub version_table: Arc>,
/// Table containing S3 block references (not blocks themselves)
@@ -261,6 +266,20 @@ impl Garage {
&db,
);
+ info!("Initialize multipart upload counter table...");
+ let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
+
+ info!("Initialize multipart upload table...");
+ let mpu_table = Table::new(
+ MultipartUploadTable {
+ version_table: version_table.clone(),
+ mpu_counter_table: mpu_counter_table.clone(),
+ },
+ meta_rep_param.clone(),
+ system.clone(),
+ &db,
+ );
+
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
@@ -269,6 +288,7 @@ impl Garage {
let object_table = Table::new(
ObjectTable {
version_table: version_table.clone(),
+ mpu_table: mpu_table.clone(),
object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
@@ -297,6 +317,8 @@ impl Garage {
key_table,
object_table,
object_counter_table,
+ mpu_table,
+ mpu_counter_table,
version_table,
block_ref_table,
#[cfg(feature = "k2v")]
diff --git a/src/model/helper/bucket.rs b/src/model/helper/bucket.rs
index 4a488d7f..ff711d29 100644
--- a/src/model/helper/bucket.rs
+++ b/src/model/helper/bucket.rs
@@ -496,7 +496,9 @@ impl<'a> BucketHelper<'a> {
.get_range(
bucket_id,
start,
- Some(ObjectFilter::IsUploading),
+ Some(ObjectFilter::IsUploading {
+ check_multipart: None,
+ }),
1000,
EnumerationOrder::Forward,
)
@@ -508,7 +510,7 @@ impl<'a> BucketHelper<'a> {
let aborted_versions = object
.versions()
.iter()
- .filter(|v| v.is_uploading() && v.timestamp < older_than)
+ .filter(|v| v.is_uploading(None) && v.timestamp < older_than)
.map(|v| ObjectVersion {
state: ObjectVersionState::Aborted,
uuid: v.uuid,
diff --git a/src/model/s3/mod.rs b/src/model/s3/mod.rs
index 4e94337d..36d67093 100644
--- a/src/model/s3/mod.rs
+++ b/src/model/s3/mod.rs
@@ -1,3 +1,4 @@
pub mod block_ref_table;
+pub mod mpu_table;
pub mod object_table;
pub mod version_table;
diff --git a/src/model/s3/mpu_table.rs b/src/model/s3/mpu_table.rs
new file mode 100644
index 00000000..dc5b5a82
--- /dev/null
+++ b/src/model/s3/mpu_table.rs
@@ -0,0 +1,231 @@
+use std::sync::Arc;
+
+use garage_db as db;
+
+use garage_util::data::*;
+
+use garage_table::crdt::*;
+use garage_table::replication::TableShardedReplication;
+use garage_table::*;
+
+use crate::index_counter::*;
+use crate::s3::version_table::*;
+
+pub const UPLOADS: &str = "uploads";
+pub const PARTS: &str = "parts";
+pub const BYTES: &str = "bytes";
+
+mod v09 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ pub use crate::s3::version_table::v09::VersionBlock;
+
+ /// A part of a multipart upload
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct MultipartUpload {
+ /// Partition key = Upload id = UUID of the object version
+ pub upload_id: Uuid,
+
+ /// Is this multipart upload deleted
+ pub deleted: crdt::Bool,
+ /// List of uploaded parts, key = (part number, timestamp)
+ /// In case of retries, all versions for each part are kept
+ /// Everything is cleaned up only once the multipart upload is completed or
+ /// aborted
+ pub parts: crdt::Map,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ /// Bucket in which the related object is stored
+ pub bucket_id: Uuid,
+ /// Key in which the related object is stored
+ pub key: String,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
+ pub struct MpuPartKey {
+ /// Number of the part
+ pub part_number: u64,
+ /// Timestamp of part upload
+ pub timestamp: u64,
+ }
+
+ /// The version of an uploaded part
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct MpuPart {
+ /// Links to a Version in VersionTable
+ pub version: Uuid,
+ /// ETag of the content of this part (known only once done uploading)
+ pub etag: Option,
+ /// Size of this part (known only once done uploading)
+ pub size: Option,
+ }
+
+ impl garage_util::migrate::InitialFormat for MultipartUpload {
+ const VERSION_MARKER: &'static [u8] = b"G09s3mpu";
+ }
+}
+
+pub use v09::*;
+
+impl Ord for MpuPartKey {
+ fn cmp(&self, other: &Self) -> std::cmp::Ordering {
+ self.part_number
+ .cmp(&other.part_number)
+ .then(self.timestamp.cmp(&other.timestamp))
+ }
+}
+
+impl PartialOrd for MpuPartKey {
+ fn partial_cmp(&self, other: &Self) -> Option {
+ Some(self.cmp(other))
+ }
+}
+
+impl MultipartUpload {
+ pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
+ Self {
+ upload_id,
+ deleted: crdt::Bool::new(deleted),
+ parts: crdt::Map::new(),
+ bucket_id,
+ key,
+ }
+ }
+}
+
+impl Entry for MultipartUpload {
+ fn partition_key(&self) -> &Uuid {
+ &self.upload_id
+ }
+ fn sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+ fn is_tombstone(&self) -> bool {
+ self.deleted.get()
+ }
+}
+
+impl Crdt for MultipartUpload {
+ fn merge(&mut self, other: &Self) {
+ self.deleted.merge(&other.deleted);
+
+ if self.deleted.get() {
+ self.parts.clear();
+ } else {
+ self.parts.merge(&other.parts);
+ }
+ }
+}
+
+impl Crdt for MpuPart {
+ fn merge(&mut self, other: &Self) {
+ self.etag = match (self.etag.take(), &other.etag) {
+ (None, Some(_)) => other.etag.clone(),
+ (Some(x), Some(y)) if x < *y => other.etag.clone(),
+ (x, _) => x,
+ };
+ self.size = match (self.size, other.size) {
+ (None, Some(_)) => other.size,
+ (Some(x), Some(y)) if x < y => other.size,
+ (x, _) => x,
+ };
+ }
+}
+
+pub struct MultipartUploadTable {
+ pub version_table: Arc>,
+ pub mpu_counter_table: Arc>,
+}
+
+impl TableSchema for MultipartUploadTable {
+ const TABLE_NAME: &'static str = "multipart_upload";
+
+ type P = Uuid;
+ type S = EmptyKey;
+ type E = MultipartUpload;
+ type Filter = DeletedFilter;
+
+ fn updated(
+ &self,
+ tx: &mut db::Transaction,
+ old: Option<&Self::E>,
+ new: Option<&Self::E>,
+ ) -> db::TxOpResult<()> {
+ // 1. Count
+ let counter_res = self.mpu_counter_table.count(tx, old, new);
+ if let Err(e) = db::unabort(counter_res)? {
+ error!(
+ "Unable to update multipart object part counter: {}. Index values will be wrong!",
+ e
+ );
+ }
+
+ // 2. Propagate deletions to version table
+ if let (Some(old_mpu), Some(new_mpu)) = (old, new) {
+ if new_mpu.deleted.get() && !old_mpu.deleted.get() {
+ let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| {
+ Version::new(
+ p.version,
+ VersionBacklink::MultipartUpload {
+ upload_id: old_mpu.upload_id,
+ },
+ true,
+ )
+ });
+ for version in deleted_versions {
+ let res = self.version_table.queue_insert(tx, &version);
+ if let Err(e) = db::unabort(res)? {
+ error!("Unable to enqueue version deletion propagation: {}. A repair will be needed.", e);
+ }
+ }
+ }
+ }
+
+ Ok(())
+ }
+
+ fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
+ filter.apply(entry.is_tombstone())
+ }
+}
+
+impl CountedItem for MultipartUpload {
+ const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter";
+
+ // Partition key = bucket id
+ type CP = Uuid;
+ // Sort key = nothing
+ type CS = EmptyKey;
+
+ fn counter_partition_key(&self) -> &Uuid {
+ &self.bucket_id
+ }
+ fn counter_sort_key(&self) -> &EmptyKey {
+ &EmptyKey
+ }
+
+ fn counts(&self) -> Vec<(&'static str, i64)> {
+ let uploads = if self.deleted.get() { 0 } else { 1 };
+ let mut parts = self
+ .parts
+ .items()
+ .iter()
+ .map(|(k, _)| k.part_number)
+ .collect::>();
+ parts.dedup();
+ let bytes = self
+ .parts
+ .items()
+ .iter()
+ .map(|(_, p)| p.size.unwrap_or(0))
+ .sum::();
+ vec![
+ (UPLOADS, uploads),
+ (PARTS, parts.len() as i64),
+ (BYTES, bytes as i64),
+ ]
+ }
+}
diff --git a/src/model/s3/object_table.rs b/src/model/s3/object_table.rs
index 518acc95..a69069b2 100644
--- a/src/model/s3/object_table.rs
+++ b/src/model/s3/object_table.rs
@@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::index_counter::*;
+use crate::s3::mpu_table::*;
use crate::s3::version_table::*;
pub const OBJECTS: &str = "objects";
@@ -130,7 +131,86 @@ mod v08 {
}
}
-pub use v08::*;
+mod v09 {
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v08;
+
+ pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
+
+ /// An object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Object {
+ /// The bucket in which the object is stored, used as partition key
+ pub bucket_id: Uuid,
+
+ /// The key at which the object is stored in its bucket, used as sorting key
+ pub key: String,
+
+ /// The list of currenty stored versions of the object
+ pub(super) versions: Vec,
+ }
+
+ /// Informations about a version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct ObjectVersion {
+ /// Id of the version
+ pub uuid: Uuid,
+ /// Timestamp of when the object was created
+ pub timestamp: u64,
+ /// State of the version
+ pub state: ObjectVersionState,
+ }
+
+ /// State of an object version
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum ObjectVersionState {
+ /// The version is being received
+ Uploading {
+ /// Indicates whether this is a multipart upload
+ multipart: bool,
+ /// Headers to be included in the final object
+ headers: ObjectVersionHeaders,
+ },
+ /// The version is fully received
+ Complete(ObjectVersionData),
+ /// The version uploaded containded errors or the upload was explicitly aborted
+ Aborted,
+ }
+
+ impl garage_util::migrate::Migrate for Object {
+ const VERSION_MARKER: &'static [u8] = b"G09s3o";
+
+ type Previous = v08::Object;
+
+ fn migrate(old: v08::Object) -> Object {
+ let versions = old
+ .versions
+ .into_iter()
+ .map(|x| ObjectVersion {
+ uuid: x.uuid,
+ timestamp: x.timestamp,
+ state: match x.state {
+ v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading {
+ multipart: false,
+ headers: h,
+ },
+ v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d),
+ v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
+ },
+ })
+ .collect();
+ Object {
+ bucket_id: old.bucket_id,
+ key: old.key,
+ versions,
+ }
+ }
+ }
+}
+
+pub use v09::*;
impl Object {
/// Initialize an Object struct from parts
@@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState {
Complete(a) => {
a.merge(b);
}
- Uploading(_) => {
+ Uploading { .. } => {
*self = Complete(b.clone());
}
},
- Uploading(_) => {}
+ Uploading { .. } => {}
}
}
}
@@ -199,8 +279,17 @@ impl ObjectVersion {
}
/// Is the object version currently being uploaded
- pub fn is_uploading(&self) -> bool {
- matches!(self.state, ObjectVersionState::Uploading(_))
+ ///
+ /// matches only multipart uploads if check_multipart is Some(true)
+ /// matches only non-multipart uploads if check_multipart is Some(false)
+ /// matches both if check_multipart is None
+ pub fn is_uploading(&self, check_multipart: Option) -> bool {
+ match &self.state {
+ ObjectVersionState::Uploading { multipart, .. } => {
+ check_multipart.map(|x| x == *multipart).unwrap_or(true)
+ }
+ _ => false,
+ }
}
/// Is the object version completely received
@@ -267,13 +356,20 @@ impl Crdt for Object {
pub struct ObjectTable {
pub version_table: Arc>,
+ pub mpu_table: Arc>,
pub object_counter_table: Arc>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ObjectFilter {
+ /// Is the object version available (received and not a tombstone)
IsData,
- IsUploading,
+ /// Is the object version currently being uploaded
+ ///
+ /// matches only multipart uploads if check_multipart is Some(true)
+ /// matches only non-multipart uploads if check_multipart is Some(false)
+ /// matches both if check_multipart is None
+ IsUploading { check_multipart: Option },
}
impl TableSchema for ObjectTable {
@@ -314,8 +410,29 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
- let deleted_version =
- Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ if let ObjectVersionState::Uploading {
+ multipart: true, ..
+ } = &v.state
+ {
+ let deleted_mpu =
+ MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
+ let res = self.mpu_table.queue_insert(tx, &deleted_mpu);
+ if let Err(e) = db::unabort(res)? {
+ error!(
+ "Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.",
+ e
+ );
+ }
+ }
+
+ let deleted_version = Version::new(
+ v.uuid,
+ VersionBacklink::Object {
+ bucket_id: old_v.bucket_id,
+ key: old_v.key.clone(),
+ },
+ true,
+ );
let res = self.version_table.queue_insert(tx, &deleted_version);
if let Err(e) = db::unabort(res)? {
error!(
@@ -333,7 +450,10 @@ impl TableSchema for ObjectTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
match filter {
ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()),
- ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
+ ObjectFilter::IsUploading { check_multipart } => entry
+ .versions
+ .iter()
+ .any(|v| v.is_uploading(*check_multipart)),
}
}
}
@@ -360,10 +480,7 @@ impl CountedItem for Object {
} else {
0
};
- let n_unfinished_uploads = versions
- .iter()
- .filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
- .count();
+ let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count();
let n_bytes = versions
.iter()
.map(|v| match &v.state {
diff --git a/src/model/s3/version_table.rs b/src/model/s3/version_table.rs
index 6edc83f4..6cf1cc75 100644
--- a/src/model/s3/version_table.rs
+++ b/src/model/s3/version_table.rs
@@ -66,6 +66,8 @@ mod v08 {
use super::v05;
+ pub use v05::{VersionBlock, VersionBlockKey};
+
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
@@ -90,8 +92,6 @@ mod v08 {
pub key: String,
}
- pub use v05::{VersionBlock, VersionBlockKey};
-
impl garage_util::migrate::Migrate for Version {
type Previous = v05::Version;
@@ -110,32 +110,83 @@ mod v08 {
}
}
-pub use v08::*;
+pub(crate) mod v09 {
+ use garage_util::crdt;
+ use garage_util::data::Uuid;
+ use serde::{Deserialize, Serialize};
+
+ use super::v08;
+
+ pub use v08::{VersionBlock, VersionBlockKey};
+
+ /// A version of an object
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub struct Version {
+ /// UUID of the version, used as partition key
+ pub uuid: Uuid,
+
+ // Actual data: the blocks for this version
+ // In the case of a multipart upload, also store the etags
+ // of individual parts and check them when doing CompleteMultipartUpload
+ /// Is this version deleted
+ pub deleted: crdt::Bool,
+ /// list of blocks of data composing the version
+ pub blocks: crdt::Map,
+
+ // Back link to bucket+key so that we can figure if
+ // this was deleted later on
+ pub backlink: VersionBacklink,
+ }
+
+ #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
+ pub enum VersionBacklink {
+ Object {
+ /// Bucket in which the related object is stored
+ bucket_id: Uuid,
+ /// Key in which the related object is stored
+ key: String,
+ },
+ MultipartUpload {
+ upload_id: Uuid,
+ },
+ }
+
+ impl garage_util::migrate::Migrate for Version {
+ const VERSION_MARKER: &'static [u8] = b"G09s3v";
+
+ type Previous = v08::Version;
+
+ fn migrate(old: v08::Version) -> Version {
+ Version {
+ uuid: old.uuid,
+ deleted: old.deleted,
+ blocks: old.blocks,
+ backlink: VersionBacklink::Object {
+ bucket_id: old.bucket_id,
+ key: old.key,
+ },
+ }
+ }
+ }
+}
+
+pub use v09::*;
impl Version {
- pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
+ pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self {
Self {
uuid,
deleted: deleted.into(),
blocks: crdt::Map::new(),
- parts_etags: crdt::Map::new(),
- bucket_id,
- key,
+ backlink,
}
}
pub fn has_part_number(&self, part_number: u64) -> bool {
- let case1 = self
- .parts_etags
- .items()
- .binary_search_by(|(k, _)| k.cmp(&part_number))
- .is_ok();
- let case2 = self
- .blocks
+ self.blocks
.items()
.binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
- .is_ok();
- case1 || case2
+ .is_ok()
}
}
@@ -175,10 +226,8 @@ impl Crdt for Version {
if self.deleted.get() {
self.blocks.clear();
- self.parts_etags.clear();
} else {
self.blocks.merge(&other.blocks);
- self.parts_etags.merge(&other.parts_etags);
}
}
}
diff --git a/src/rpc/layout.rs b/src/rpc/layout.rs
index b6c2fd27..c2655e59 100644
--- a/src/rpc/layout.rs
+++ b/src/rpc/layout.rs
@@ -119,7 +119,7 @@ mod v09 {
}
impl garage_util::migrate::Migrate for ClusterLayout {
- const VERSION_MARKER: &'static [u8] = b"Glayout09";
+ const VERSION_MARKER: &'static [u8] = b"G09layout";
type Previous = v08::ClusterLayout;
diff --git a/src/table/schema.rs b/src/table/schema.rs
index 44b10898..7daf3000 100644
--- a/src/table/schema.rs
+++ b/src/table/schema.rs
@@ -8,6 +8,8 @@ use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
+// =================================== PARTITION KEYS
+
/// Trait for field used to partition data
pub trait PartitionKey:
Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
@@ -31,6 +33,8 @@ impl PartitionKey for FixedBytes32 {
}
}
+// =================================== SORT KEYS
+
/// Trait for field used to sort data
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
@@ -49,6 +53,14 @@ impl SortKey for FixedBytes32 {
}
}
+impl SortKey for u32 {
+ fn sort_key(&self) -> Cow<'_, [u8]> {
+ Cow::from(u32::to_be_bytes(*self).to_vec())
+ }
+}
+
+// =================================== SCHEMA
+
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry:
Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static