Fix #204 (full Multipart Uploads semantics) #553

Merged
lx merged 20 commits from nlnet-task1 into next 2023-06-09 15:34:10 +00:00
8 changed files with 468 additions and 34 deletions
Showing only changes of commit 38d6ac4295 - Show all commits

View file

@ -17,6 +17,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::s3::block_ref_table::*;
use crate::s3::mpu_table::*;
use crate::s3::object_table::*;
use crate::s3::version_table::*;
@ -57,6 +58,10 @@ pub struct Garage {
pub object_table: Arc<Table<ObjectTable, TableShardedReplication>>,
/// Counting table containing object counters
pub object_counter_table: Arc<IndexCounter<Object>>,
/// Table containing S3 multipart uploads
pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>,
/// Counting table containing multipart object counters
pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>,
/// Table containing S3 object versions
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
/// Table containing S3 block references (not blocks themselves)
@ -261,6 +266,20 @@ impl Garage {
&db,
);
info!("Initialize multipart upload counter table...");
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
info!("Initialize multipart upload table...");
let mpu_table = Table::new(
MultipartUploadTable {
version_table: version_table.clone(),
mpu_counter_table: mpu_counter_table.clone(),
},
meta_rep_param.clone(),
system.clone(),
&db,
);
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
@ -269,6 +288,7 @@ impl Garage {
let object_table = Table::new(
ObjectTable {
version_table: version_table.clone(),
mpu_table: mpu_table.clone(),
object_counter_table: object_counter_table.clone(),
},
meta_rep_param.clone(),
@ -297,6 +317,8 @@ impl Garage {
key_table,
object_table,
object_counter_table,
mpu_table,
mpu_counter_table,
version_table,
block_ref_table,
#[cfg(feature = "k2v")]

View file

@ -496,7 +496,9 @@ impl<'a> BucketHelper<'a> {
.get_range(
bucket_id,
start,
Some(ObjectFilter::IsUploading),
Some(ObjectFilter::IsUploading {
check_multipart: None,
}),
1000,
EnumerationOrder::Forward,
)
@ -508,7 +510,7 @@ impl<'a> BucketHelper<'a> {
let aborted_versions = object
.versions()
.iter()
.filter(|v| v.is_uploading() && v.timestamp < older_than)
.filter(|v| v.is_uploading(None) && v.timestamp < older_than)
.map(|v| ObjectVersion {
state: ObjectVersionState::Aborted,
uuid: v.uuid,

View file

@ -1,3 +1,4 @@
pub mod block_ref_table;
pub mod mpu_table;
pub mod object_table;
pub mod version_table;

231
src/model/s3/mpu_table.rs Normal file
View file

@ -0,0 +1,231 @@
use std::sync::Arc;
use garage_db as db;
use garage_util::data::*;
use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::index_counter::*;
use crate::s3::version_table::*;
pub const UPLOADS: &str = "uploads";
pub const PARTS: &str = "parts";
pub const BYTES: &str = "bytes";
mod v09 {
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
pub use crate::s3::version_table::v09::VersionBlock;
/// A part of a multipart upload
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct MultipartUpload {
/// Partition key = Upload id = UUID of the object version
pub upload_id: Uuid,
/// Is this multipart upload deleted
pub deleted: crdt::Bool,
/// List of uploaded parts, key = (part number, timestamp)
/// In case of retries, all versions for each part are kept
/// Everything is cleaned up only once the multipart upload is completed or
/// aborted
pub parts: crdt::Map<MpuPartKey, MpuPart>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
/// Bucket in which the related object is stored
pub bucket_id: Uuid,
/// Key in which the related object is stored
pub key: String,
}
#[derive(PartialEq, Eq, Clone, Copy, Debug, Serialize, Deserialize)]
pub struct MpuPartKey {
/// Number of the part
pub part_number: u64,
/// Timestamp of part upload
pub timestamp: u64,
}
/// The version of an uploaded part
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct MpuPart {
/// Links to a Version in VersionTable
pub version: Uuid,
/// ETag of the content of this part (known only once done uploading)
pub etag: Option<String>,
/// Size of this part (known only once done uploading)
pub size: Option<u64>,
}
impl garage_util::migrate::InitialFormat for MultipartUpload {
const VERSION_MARKER: &'static [u8] = b"G09s3mpu";
}
}
pub use v09::*;
impl Ord for MpuPartKey {
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
self.part_number
.cmp(&other.part_number)
.then(self.timestamp.cmp(&other.timestamp))
}
}
impl PartialOrd for MpuPartKey {
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
Some(self.cmp(other))
}
}
impl MultipartUpload {
pub fn new(upload_id: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
Self {
upload_id,
deleted: crdt::Bool::new(deleted),
parts: crdt::Map::new(),
bucket_id,
key,
}
}
}
impl Entry<Uuid, EmptyKey> for MultipartUpload {
fn partition_key(&self) -> &Uuid {
&self.upload_id
}
fn sort_key(&self) -> &EmptyKey {
&EmptyKey
}
fn is_tombstone(&self) -> bool {
self.deleted.get()
}
}
impl Crdt for MultipartUpload {
fn merge(&mut self, other: &Self) {
self.deleted.merge(&other.deleted);
if self.deleted.get() {
self.parts.clear();
} else {
self.parts.merge(&other.parts);
}
}
}
impl Crdt for MpuPart {
fn merge(&mut self, other: &Self) {
self.etag = match (self.etag.take(), &other.etag) {
(None, Some(_)) => other.etag.clone(),
(Some(x), Some(y)) if x < *y => other.etag.clone(),
(x, _) => x,
};
self.size = match (self.size, other.size) {
(None, Some(_)) => other.size,
(Some(x), Some(y)) if x < y => other.size,
(x, _) => x,
};
}
}
pub struct MultipartUploadTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub mpu_counter_table: Arc<IndexCounter<MultipartUpload>>,
}
impl TableSchema for MultipartUploadTable {
const TABLE_NAME: &'static str = "multipart_upload";
type P = Uuid;
type S = EmptyKey;
type E = MultipartUpload;
type Filter = DeletedFilter;
fn updated(
&self,
tx: &mut db::Transaction,
old: Option<&Self::E>,
new: Option<&Self::E>,
) -> db::TxOpResult<()> {
// 1. Count
let counter_res = self.mpu_counter_table.count(tx, old, new);
if let Err(e) = db::unabort(counter_res)? {
error!(
"Unable to update multipart object part counter: {}. Index values will be wrong!",
e
);
}
// 2. Propagate deletions to version table
if let (Some(old_mpu), Some(new_mpu)) = (old, new) {
if new_mpu.deleted.get() && !old_mpu.deleted.get() {
let deleted_versions = old_mpu.parts.items().iter().map(|(_k, p)| {
Version::new(
p.version,
VersionBacklink::MultipartUpload {
upload_id: old_mpu.upload_id,
},
true,
)
});
for version in deleted_versions {
let res = self.version_table.queue_insert(tx, &version);
if let Err(e) = db::unabort(res)? {
error!("Unable to enqueue version deletion propagation: {}. A repair will be needed.", e);
}
}
}
}
Ok(())
}
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
filter.apply(entry.is_tombstone())
}
}
impl CountedItem for MultipartUpload {
const COUNTER_TABLE_NAME: &'static str = "bucket_mpu_part_counter";
// Partition key = bucket id
type CP = Uuid;
// Sort key = nothing
type CS = EmptyKey;
fn counter_partition_key(&self) -> &Uuid {
&self.bucket_id
}
fn counter_sort_key(&self) -> &EmptyKey {
&EmptyKey
}
fn counts(&self) -> Vec<(&'static str, i64)> {
let uploads = if self.deleted.get() { 0 } else { 1 };
let mut parts = self
.parts
.items()
.iter()
.map(|(k, _)| k.part_number)
.collect::<Vec<_>>();
parts.dedup();
let bytes = self
.parts
.items()
.iter()
.map(|(_, p)| p.size.unwrap_or(0))
.sum::<u64>();
vec![
(UPLOADS, uploads),
(PARTS, parts.len() as i64),
(BYTES, bytes as i64),
]
}
}

View file

@ -10,6 +10,7 @@ use garage_table::replication::TableShardedReplication;
use garage_table::*;
use crate::index_counter::*;
use crate::s3::mpu_table::*;
use crate::s3::version_table::*;
pub const OBJECTS: &str = "objects";
@ -130,7 +131,86 @@ mod v08 {
}
}
pub use v08::*;
mod v09 {
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use super::v08;
pub use v08::{ObjectVersionData, ObjectVersionHeaders, ObjectVersionMeta};
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
/// The list of currenty stored versions of the object
pub(super) versions: Vec<ObjectVersion>,
}
/// Informations about a version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading {
/// Indicates whether this is a multipart upload
multipart: bool,
/// Headers to be included in the final object
headers: ObjectVersionHeaders,
},
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
impl garage_util::migrate::Migrate for Object {
const VERSION_MARKER: &'static [u8] = b"G09s3o";
type Previous = v08::Object;
fn migrate(old: v08::Object) -> Object {
let versions = old
.versions
.into_iter()
.map(|x| ObjectVersion {
uuid: x.uuid,
timestamp: x.timestamp,
state: match x.state {
v08::ObjectVersionState::Uploading(h) => ObjectVersionState::Uploading {
multipart: false,
headers: h,
},
v08::ObjectVersionState::Complete(d) => ObjectVersionState::Complete(d),
v08::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
},
})
.collect();
Object {
bucket_id: old.bucket_id,
key: old.key,
versions,
}
}
}
}
pub use v09::*;
impl Object {
/// Initialize an Object struct from parts
@ -180,11 +260,11 @@ impl Crdt for ObjectVersionState {
Complete(a) => {
a.merge(b);
}
Uploading(_) => {
Uploading { .. } => {
*self = Complete(b.clone());
}
},
Uploading(_) => {}
Uploading { .. } => {}
}
}
}
@ -199,8 +279,17 @@ impl ObjectVersion {
}
/// Is the object version currently being uploaded
pub fn is_uploading(&self) -> bool {
matches!(self.state, ObjectVersionState::Uploading(_))
///
/// matches only multipart uploads if check_multipart is Some(true)
/// matches only non-multipart uploads if check_multipart is Some(false)
/// matches both if check_multipart is None
pub fn is_uploading(&self, check_multipart: Option<bool>) -> bool {
match &self.state {
ObjectVersionState::Uploading { multipart, .. } => {
check_multipart.map(|x| x == *multipart).unwrap_or(true)
}
_ => false,
}
}
/// Is the object version completely received
@ -267,13 +356,20 @@ impl Crdt for Object {
pub struct ObjectTable {
pub version_table: Arc<Table<VersionTable, TableShardedReplication>>,
pub mpu_table: Arc<Table<MultipartUploadTable, TableShardedReplication>>,
pub object_counter_table: Arc<IndexCounter<Object>>,
}
#[derive(Clone, Copy, Debug, Serialize, Deserialize)]
pub enum ObjectFilter {
/// Is the object version available (received and not a tombstone)
IsData,
IsUploading,
/// Is the object version currently being uploaded
///
/// matches only multipart uploads if check_multipart is Some(true)
/// matches only non-multipart uploads if check_multipart is Some(false)
/// matches both if check_multipart is None
IsUploading { check_multipart: Option<bool> },
}
impl TableSchema for ObjectTable {
@ -314,8 +410,29 @@ impl TableSchema for ObjectTable {
}
};
if newly_deleted {
let deleted_version =
Version::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
if let ObjectVersionState::Uploading {
multipart: true, ..
} = &v.state
{
let deleted_mpu =
MultipartUpload::new(v.uuid, old_v.bucket_id, old_v.key.clone(), true);
let res = self.mpu_table.queue_insert(tx, &deleted_mpu);
if let Err(e) = db::unabort(res)? {
error!(
"Unable to enqueue multipart upload deletion propagation: {}. A repair will be needed.",
e
);
}
}
let deleted_version = Version::new(
v.uuid,
VersionBacklink::Object {
bucket_id: old_v.bucket_id,
key: old_v.key.clone(),
},
true,
);
let res = self.version_table.queue_insert(tx, &deleted_version);
if let Err(e) = db::unabort(res)? {
error!(
@ -333,7 +450,10 @@ impl TableSchema for ObjectTable {
fn matches_filter(entry: &Self::E, filter: &Self::Filter) -> bool {
match filter {
ObjectFilter::IsData => entry.versions.iter().any(|v| v.is_data()),
ObjectFilter::IsUploading => entry.versions.iter().any(|v| v.is_uploading()),
ObjectFilter::IsUploading { check_multipart } => entry
.versions
.iter()
.any(|v| v.is_uploading(*check_multipart)),
}
}
}
@ -360,10 +480,7 @@ impl CountedItem for Object {
} else {
0
};
let n_unfinished_uploads = versions
.iter()
.filter(|v| matches!(v.state, ObjectVersionState::Uploading(_)))
.count();
let n_unfinished_uploads = versions.iter().filter(|v| v.is_uploading(None)).count();
let n_bytes = versions
.iter()
.map(|v| match &v.state {

View file

@ -66,6 +66,8 @@ mod v08 {
use super::v05;
pub use v05::{VersionBlock, VersionBlockKey};
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
@ -90,8 +92,6 @@ mod v08 {
pub key: String,
}
pub use v05::{VersionBlock, VersionBlockKey};
impl garage_util::migrate::Migrate for Version {
type Previous = v05::Version;
@ -110,32 +110,83 @@ mod v08 {
}
}
pub use v08::*;
pub(crate) mod v09 {
use garage_util::crdt;
use garage_util::data::Uuid;
use serde::{Deserialize, Serialize};
use super::v08;
pub use v08::{VersionBlock, VersionBlockKey};
/// A version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Version {
/// UUID of the version, used as partition key
pub uuid: Uuid,
// Actual data: the blocks for this version
// In the case of a multipart upload, also store the etags
// of individual parts and check them when doing CompleteMultipartUpload
/// Is this version deleted
pub deleted: crdt::Bool,
/// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
// Back link to bucket+key so that we can figure if
// this was deleted later on
pub backlink: VersionBacklink,
}
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum VersionBacklink {
Object {
/// Bucket in which the related object is stored
bucket_id: Uuid,
/// Key in which the related object is stored
key: String,
},
MultipartUpload {
upload_id: Uuid,
},
}
impl garage_util::migrate::Migrate for Version {
const VERSION_MARKER: &'static [u8] = b"G09s3v";
type Previous = v08::Version;
fn migrate(old: v08::Version) -> Version {
Version {
uuid: old.uuid,
deleted: old.deleted,
blocks: old.blocks,
backlink: VersionBacklink::Object {
bucket_id: old.bucket_id,
key: old.key,
},
}
}
}
}
pub use v09::*;
impl Version {
pub fn new(uuid: Uuid, bucket_id: Uuid, key: String, deleted: bool) -> Self {
pub fn new(uuid: Uuid, backlink: VersionBacklink, deleted: bool) -> Self {
Self {
uuid,
deleted: deleted.into(),
blocks: crdt::Map::new(),
parts_etags: crdt::Map::new(),
bucket_id,
key,
backlink,
}
}
pub fn has_part_number(&self, part_number: u64) -> bool {
let case1 = self
.parts_etags
.items()
.binary_search_by(|(k, _)| k.cmp(&part_number))
.is_ok();
let case2 = self
.blocks
self.blocks
.items()
.binary_search_by(|(k, _)| k.part_number.cmp(&part_number))
.is_ok();
case1 || case2
.is_ok()
}
}
@ -175,10 +226,8 @@ impl Crdt for Version {
if self.deleted.get() {
self.blocks.clear();
self.parts_etags.clear();
} else {
self.blocks.merge(&other.blocks);
self.parts_etags.merge(&other.parts_etags);
}
}
}

View file

@ -119,7 +119,7 @@ mod v09 {
}
impl garage_util::migrate::Migrate for ClusterLayout {
const VERSION_MARKER: &'static [u8] = b"Glayout09";
const VERSION_MARKER: &'static [u8] = b"G09layout";
type Previous = v08::ClusterLayout;

View file

@ -8,6 +8,8 @@ use garage_util::migrate::Migrate;
use crate::crdt::Crdt;
// =================================== PARTITION KEYS
/// Trait for field used to partition data
pub trait PartitionKey:
Clone + PartialEq + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
@ -31,6 +33,8 @@ impl PartitionKey for FixedBytes32 {
}
}
// =================================== SORT KEYS
/// Trait for field used to sort data
pub trait SortKey: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
/// Get the key used to sort
@ -49,6 +53,14 @@ impl SortKey for FixedBytes32 {
}
}
impl SortKey for u32 {
fn sort_key(&self) -> Cow<'_, [u8]> {
Cow::from(u32::to_be_bytes(*self).to_vec())
}
}
// =================================== SCHEMA
/// Trait for an entry in a table. It must be sortable and partitionnable.
pub trait Entry<P: PartitionKey, S: SortKey>:
Crdt + PartialEq + Clone + Migrate + Send + Sync + 'static