Fix #204 (full Multipart Uploads semantics) #553

Merged
lx merged 20 commits from nlnet-task1 into next 2023-06-09 15:34:10 +00:00
Showing only changes of commit 7ad7dae5d4 - Show all commits

View file

@ -484,27 +484,25 @@ fn fetch_part_info<'a>(
} }
} }
// Cut the beginning and end // Cut the beginning if we have a marker
match &query.part_number_marker { if let Some(marker) = &query.part_number_marker {
Some(marker) => { let next = marker + 1;
let next = marker + 1; let part_idx = parts
let part_idx = .binary_search_by(|part| part.part_number.cmp(&next))
into_ok_or_err(parts.binary_search_by(|part| part.part_number.cmp(&next))); .unwrap_or_else(|x| x);
parts.truncate(part_idx + query.max_parts as usize); parts = parts.split_off(part_idx);
parts = parts.split_off(part_idx);
}
None => {
parts.truncate(query.max_parts as usize);
}
};
match parts.last() {
Some(part_info) => {
let pagination = Some(part_info.part_number);
Ok((parts, pagination))
}
None => Ok((parts, None)),
} }
// Cut the end if we have too many parts
if parts.len() > query.max_parts as usize {
parts.truncate(query.max_parts as usize);
if let Some(part_info) = parts.last() {
let pagination = Some(part_info.part_number);
return Ok((parts, pagination));
}
}
Ok((parts, None))
} }
/* /*
@ -866,14 +864,6 @@ impl ExtractAccumulator for UploadAccumulator {
* Utility functions * Utility functions
*/ */
/// This is a stub for Result::into_ok_or_err that is not yet in Rust stable
fn into_ok_or_err<T>(r: Result<T, T>) -> T {
match r {
Ok(r) => r,
Err(r) => r,
}
}
/// Returns the common prefix of the object given the query prefix and delimiter /// Returns the common prefix of the object given the query prefix and delimiter
fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> { fn common_prefix<'a>(object: &'a Object, query: &ListQueryCommon) -> Option<&'a str> {
match &query.delimiter { match &query.delimiter {
@ -899,7 +889,6 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use garage_model::s3::version_table::*;
use garage_util::*; use garage_util::*;
use std::iter::FromIterator; use std::iter::FromIterator;
@ -1120,85 +1109,76 @@ mod tests {
Ok(()) Ok(())
} }
fn version() -> Version { fn mpu() -> MultipartUpload {
let uuid = Uuid::from([0x08; 32]); let uuid = Uuid::from([0x08; 32]);
let blocks = vec![ let parts = vec![
( (
VersionBlockKey { MpuPartKey {
part_number: 1, part_number: 1,
offset: 1, timestamp: TS,
}, },
VersionBlock { MpuPart {
hash: uuid, version: uuid,
size: 3, size: Some(3),
etag: Some("etag1".into()),
}, },
), ),
( (
VersionBlockKey { MpuPartKey {
part_number: 1,
offset: 2,
},
VersionBlock {
hash: uuid,
size: 2,
},
),
(
VersionBlockKey {
part_number: 2, part_number: 2,
offset: 1, timestamp: TS,
}, },
VersionBlock { MpuPart {
hash: uuid, version: uuid,
size: 8, size: None,
etag: None,
}, },
), ),
( (
VersionBlockKey { MpuPartKey {
part_number: 3,
timestamp: TS,
},
MpuPart {
version: uuid,
size: Some(10),
etag: Some("etag2".into()),
},
),
(
MpuPartKey {
part_number: 5, part_number: 5,
offset: 1, timestamp: TS,
}, },
VersionBlock { MpuPart {
hash: uuid, version: uuid,
size: 7, size: Some(7),
etag: Some("etag3".into()),
}, },
), ),
( (
VersionBlockKey { MpuPartKey {
part_number: 8, part_number: 8,
offset: 1, timestamp: TS,
}, },
VersionBlock { MpuPart {
hash: uuid, version: uuid,
size: 5, size: Some(5),
etag: Some("etag4".into()),
}, },
), ),
]; ];
let etags = vec![
(1, "etag1".to_string()),
(3, "etag2".to_string()),
(5, "etag3".to_string()),
(8, "etag4".to_string()),
(9, "etag5".to_string()),
];
Version { MultipartUpload {
uuid, upload_id: uuid,
deleted: false.into(), deleted: false.into(),
blocks: crdt::Map::<VersionBlockKey, VersionBlock>::from_iter(blocks), parts: crdt::Map::<MpuPartKey, MpuPart>::from_iter(parts),
backlink: VersionBacklink::Object { bucket_id: uuid,
bucket_id: uuid, key: "a".into(),
key: "a".to_string(),
},
parts_etags: crdt::Map::<u64, String>::from_iter(etags),
} }
} }
fn obj() -> Object {
Object::new(bucket(), "d".to_string(), vec![objup_version([0x08; 32])])
}
#[test] #[test]
fn test_fetch_part_info() -> Result<(), Error> { fn test_fetch_part_info() -> Result<(), Error> {
let uuid = Uuid::from([0x08; 32]); let uuid = Uuid::from([0x08; 32]);
@ -1211,82 +1191,85 @@ mod tests {
max_parts: 2, max_parts: 2,
}; };
assert!( let mpu = mpu();
fetch_part_info(&query, None, None, uuid).is_err(),
"No object and version should fail"
);
assert!(
fetch_part_info(&query, Some(obj()), None, uuid).is_err(),
"No version should faild"
);
assert!(
fetch_part_info(&query, None, Some(version()), uuid).is_err(),
"No object should fail"
);
// Start from the beginning but with limited size to trigger pagination // Start from the beginning but with limited size to trigger pagination
let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert_eq!(pagination.unwrap(), 5); assert_eq!(pagination.unwrap(), 3);
assert_eq!( assert_eq!(
info, info,
vec![ vec![
PartInfo { PartInfo {
etag: "etag1".to_string(), etag: "etag1",
timestamp: TS, timestamp: TS,
part_number: 1, part_number: 1,
size: 5 size: 3
}, },
PartInfo { PartInfo {
etag: "etag3".to_string(), etag: "etag2",
timestamp: TS, timestamp: TS,
part_number: 5, part_number: 3,
size: 7 size: 10
}, },
] ]
); );
// Use previous pagination to make a new request // Use previous pagination to make a new request
query.part_number_marker = Some(pagination.unwrap()); query.part_number_marker = Some(pagination.unwrap());
let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert!(pagination.is_none()); assert!(pagination.is_none());
assert_eq!( assert_eq!(
info, info,
vec![PartInfo { vec![
etag: "etag4".to_string(), PartInfo {
timestamp: TS, etag: "etag3",
part_number: 8, timestamp: TS,
size: 5 part_number: 5,
},] size: 7
},
PartInfo {
etag: "etag4",
timestamp: TS,
part_number: 8,
size: 5
},
]
); );
// Trying to access a part that is way larger than registered ones // Trying to access a part that is way larger than registered ones
query.part_number_marker = Some(9999); query.part_number_marker = Some(9999);
let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert!(pagination.is_none()); assert!(pagination.is_none());
assert_eq!(info, vec![]); assert_eq!(info, vec![]);
// Try without any limitation // Try without any limitation
query.max_parts = 1000; query.max_parts = 1000;
query.part_number_marker = None; query.part_number_marker = None;
let (info, pagination) = fetch_part_info(&query, Some(obj()), Some(version()), uuid)?; let (info, pagination) = fetch_part_info(&query, &mpu)?;
assert!(pagination.is_none()); assert!(pagination.is_none());
assert_eq!( assert_eq!(
info, info,
vec![ vec![
PartInfo { PartInfo {
etag: "etag1".to_string(), etag: "etag1",
timestamp: TS, timestamp: TS,
part_number: 1, part_number: 1,
size: 5 size: 3
}, },
PartInfo { PartInfo {
etag: "etag3".to_string(), etag: "etag2",
timestamp: TS,
part_number: 3,
size: 10
},
PartInfo {
etag: "etag3",
timestamp: TS, timestamp: TS,
part_number: 5, part_number: 5,
size: 7 size: 7
}, },
PartInfo { PartInfo {
etag: "etag4".to_string(), etag: "etag4",
timestamp: TS, timestamp: TS,
part_number: 8, part_number: 8,
size: 5 size: 5