Fix #204 (full Multipart Uploads semantics) #553

Merged
lx merged 20 commits from nlnet-task1 into next 2023-06-09 15:34:10 +00:00
9 changed files with 22 additions and 24 deletions
Showing only changes of commit 412ab77b08 - Show all commits

View file

@ -192,8 +192,8 @@ async fn bucket_info_results(
} }
}), }),
keys: relevant_keys keys: relevant_keys
.into_iter() .into_values()
.map(|(_, key)| { .map(|key| {
let p = key.state.as_option().unwrap(); let p = key.state.as_option().unwrap();
GetBucketInfoKey { GetBucketInfoKey {
access_key_id: key.key_id, access_key_id: key.key_id,

View file

@ -183,8 +183,8 @@ async fn key_info_results(garage: &Arc<Garage>, key: Key) -> Result<Response<Bod
create_bucket: *key_state.allow_create_bucket.get(), create_bucket: *key_state.allow_create_bucket.get(),
}, },
buckets: relevant_buckets buckets: relevant_buckets
.into_iter() .into_values()
.map(|(_, bucket)| { .map(|bucket| {
let state = bucket.state.as_option().unwrap(); let state = bucket.state.as_option().unwrap();
KeyInfoBucketResult { KeyInfoBucketResult {
id: hex::encode(bucket.id), id: hex::encode(bucket.id),

View file

@ -441,7 +441,7 @@ fn body_from_blocks_range(
// block.part_number, which is not the same in the case of a multipart upload) // block.part_number, which is not the same in the case of a multipart upload)
let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min( let mut blocks: Vec<(VersionBlock, u64)> = Vec::with_capacity(std::cmp::min(
all_blocks.len(), all_blocks.len(),
4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size as u64, 1024)) as usize, 4 + ((end - begin) / std::cmp::max(all_blocks[0].1.size, 1024)) as usize,
)); ));
let mut block_offset: u64 = 0; let mut block_offset: u64 = 0;
for (_, b) in all_blocks.iter() { for (_, b) in all_blocks.iter() {
@ -452,7 +452,7 @@ fn body_from_blocks_range(
if block_offset < end && block_offset + b.size > begin { if block_offset < end && block_offset + b.size > begin {
blocks.push((*b, block_offset)); blocks.push((*b, block_offset));
} }
block_offset += b.size as u64; block_offset += b.size;
} }
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();

View file

@ -340,6 +340,7 @@ pub async fn handle_abort_multipart_upload(
// ======== helpers ============ // ======== helpers ============
#[allow(clippy::ptr_arg)]
pub(crate) async fn get_upload( pub(crate) async fn get_upload(
garage: &Garage, garage: &Garage,
bucket_id: &Uuid, bucket_id: &Uuid,
@ -347,13 +348,10 @@ pub(crate) async fn get_upload(
upload_id: &Uuid, upload_id: &Uuid,
) -> Result<(Object, ObjectVersion, MultipartUpload), Error> { ) -> Result<(Object, ObjectVersion, MultipartUpload), Error> {
let (object, mpu) = futures::try_join!( let (object, mpu) = futures::try_join!(
garage garage.object_table.get(bucket_id, key).map_err(Error::from),
.object_table
.get(&bucket_id, &key)
.map_err(Error::from),
garage garage
.mpu_table .mpu_table
.get(&upload_id, &EmptyKey) .get(upload_id, &EmptyKey)
.map_err(Error::from), .map_err(Error::from),
)?; )?;

View file

@ -153,7 +153,7 @@ impl AdminRpcHandler {
let (bucket_id, key, ov_id) = match &version.backlink { let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => { VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = self.garage.mpu_table.get(&upload_id, &EmptyKey).await? { if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if !mpu.deleted.get() { if !mpu.deleted.get() {
mpu.parts.clear(); mpu.parts.clear();
mpu.deleted.set(); mpu.deleted.set();

View file

@ -170,7 +170,7 @@ impl TableRepair for RepairVersions {
let ref_exists = match &version.backlink { let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage VersionBacklink::Object { bucket_id, key } => garage
.object_table .object_table
.get(&bucket_id, &key) .get(bucket_id, key)
.await? .await?
.map(|o| { .map(|o| {
o.versions().iter().any(|x| { o.versions().iter().any(|x| {
@ -180,7 +180,7 @@ impl TableRepair for RepairVersions {
.unwrap_or(false), .unwrap_or(false),
VersionBacklink::MultipartUpload { upload_id } => garage VersionBacklink::MultipartUpload { upload_id } => garage
.mpu_table .mpu_table
.get(&upload_id, &EmptyKey) .get(upload_id, &EmptyKey)
.await? .await?
.map(|u| !u.deleted.get()) .map(|u| !u.deleted.get())
.unwrap_or(false), .unwrap_or(false),

View file

@ -2,10 +2,10 @@ use std::sync::Arc;
use garage_db as db; use garage_db as db;
use garage_util::crdt::Crdt;
use garage_util::data::*; use garage_util::data::*;
use garage_util::time::*; use garage_util::time::*;
use garage_table::crdt::*;
use garage_table::replication::TableShardedReplication; use garage_table::replication::TableShardedReplication;
use garage_table::*; use garage_table::*;
@ -21,8 +21,6 @@ mod v09 {
use garage_util::data::Uuid; use garage_util::data::Uuid;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
pub use crate::s3::version_table::v09::VersionBlock;
/// A part of a multipart upload /// A part of a multipart upload
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)] #[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct MultipartUpload { pub struct MultipartUpload {
@ -30,15 +28,16 @@ mod v09 {
pub upload_id: Uuid, pub upload_id: Uuid,
/// Is this multipart upload deleted /// Is this multipart upload deleted
/// The MultipartUpload is marked as deleted as soon as the
/// multipart upload is either completed or aborted
pub deleted: crdt::Bool, pub deleted: crdt::Bool,
/// List of uploaded parts, key = (part number, timestamp) /// List of uploaded parts, key = (part number, timestamp)
/// In case of retries, all versions for each part are kept /// In case of retries, all versions for each part are kept
/// Everything is cleaned up only once the multipart upload is completed or /// Everything is cleaned up only once the MultipartUpload is marked deleted
/// aborted
pub parts: crdt::Map<MpuPartKey, MpuPart>, pub parts: crdt::Map<MpuPartKey, MpuPart>,
// Back link to bucket+key so that we can figure if // Back link to bucket+key so that we can find the object this mpu
// this was deleted later on // belongs to and check whether it is still valid
/// Bucket in which the related object is stored /// Bucket in which the related object is stored
pub bucket_id: Uuid, pub bucket_id: Uuid,
/// Key in which the related object is stored /// Key in which the related object is stored

View file

@ -134,8 +134,9 @@ pub(crate) mod v09 {
/// list of blocks of data composing the version /// list of blocks of data composing the version
pub blocks: crdt::Map<VersionBlockKey, VersionBlock>, pub blocks: crdt::Map<VersionBlockKey, VersionBlock>,
// Back link to bucket+key so that we can figure if // Back link to owner of this version (either an object or a multipart
// this was deleted later on // upload), used to find whether it has been deleted and this version
// should in turn be deleted (see versions repair procedure)
pub backlink: VersionBacklink, pub backlink: VersionBacklink,
} }

View file

@ -106,7 +106,7 @@ impl WebServer {
addr: SocketAddr, addr: SocketAddr,
) -> Result<Response<Body>, Infallible> { ) -> Result<Response<Body>, Infallible> {
if let Ok(forwarded_for_ip_addr) = if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(&req.headers()) forwarded_headers::handle_forwarded_for_headers(req.headers())
{ {
info!( info!(
"{} (via {}) {} {}", "{} (via {}) {} {}",