Implement ListMultipartUploads #171

Merged
lx merged 2 commits from feature/s3-multipart-compat into main 2022-01-12 18:04:56 +00:00
Showing only changes of commit 3bbc263027 - Show all commits

View file

@ -251,8 +251,9 @@ struct UploadInfo {
} }
enum ExtractionResult { enum ExtractionResult {
Stopped, NoMore,
StoppedAtUpload { Filled,
FilledAtUpload {
key: String, key: String,
upload: Uuid, upload: Uuid,
}, },
@ -343,10 +344,11 @@ where
ExtractionResult::SkipTo { key, fallback_key } => { ExtractionResult::SkipTo { key, fallback_key } => {
RangeBegin::IncludingKey { key, fallback_key } RangeBegin::IncludingKey { key, fallback_key }
} }
ExtractionResult::StoppedAtUpload { key, upload } => { ExtractionResult::FilledAtUpload { key, upload } => {
return Ok(Some(RangeBegin::AfterUpload { key, upload })) return Ok(Some(RangeBegin::AfterUpload { key, upload }))
} }
ExtractionResult::Stopped => return Ok(Some(cursor)), ExtractionResult::Filled => return Ok(Some(cursor)),
ExtractionResult::NoMore => return Ok(None),
}; };
} }
@ -515,7 +517,7 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
// Try to register this prefix // Try to register this prefix
// If not possible, we can return early // If not possible, we can return early
if !self.try_insert_common_prefix(pfx.to_string()) { if !self.try_insert_common_prefix(pfx.to_string()) {
return Some(ExtractionResult::Stopped); return Some(ExtractionResult::Filled);
} }
// We consume the whole common prefix from the iterator // We consume the whole common prefix from the iterator
@ -529,10 +531,13 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
}) })
} }
None => { None => {
return Some(ExtractionResult::SkipTo { return match key_after_prefix(pfx) {
key: key_after_prefix(pfx), Some(next) => Some(ExtractionResult::SkipTo {
fallback_key: Some(last_pfx_key.to_owned()), key: next,
}) fallback_key: Some(last_pfx_key.to_owned()),
}),
None => Some(ExtractionResult::NoMore),
}
} }
}; };
@ -609,7 +614,7 @@ impl ExtractAccumulator for ObjectAccumulator {
true => ExtractionResult::Extracted { true => ExtractionResult::Extracted {
key: object.key.clone(), key: object.key.clone(),
}, },
false => ExtractionResult::Stopped, false => ExtractionResult::Filled,
} }
} }
} }
@ -676,7 +681,7 @@ impl ExtractAccumulator for UploadAccumulator {
timestamp: first_upload.timestamp, timestamp: first_upload.timestamp,
}; };
if !self.try_insert_entry(first_upload.uuid, first_up_info) { if !self.try_insert_entry(first_upload.uuid, first_up_info) {
return ExtractionResult::Stopped; return ExtractionResult::Filled;
} }
// We can then collect the remaining uploads in a loop // We can then collect the remaining uploads in a loop
@ -690,7 +695,7 @@ impl ExtractAccumulator for UploadAccumulator {
// Insert data in our accumulator // Insert data in our accumulator
// If it is full, return information to paginate. // If it is full, return information to paginate.
if !self.try_insert_entry(upload.uuid, up_info) { if !self.try_insert_entry(upload.uuid, up_info) {
return ExtractionResult::StoppedAtUpload { return ExtractionResult::FilledAtUpload {
key: object.key.clone(), key: object.key.clone(),
upload: prev_uuid, upload: prev_uuid,
}; };
@ -729,12 +734,30 @@ fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
} }
} }
const UTF8_BEFORE_LAST_CHAR: char = '\u{10FFFE}';
/// Compute the key after the prefix /// Compute the key after the prefix
fn key_after_prefix(pfx: &str) -> String { fn key_after_prefix(pfx: &str) -> Option<String> {
let mut first_key_after_prefix = pfx.to_string(); let mut next = pfx.to_string();
let tail = first_key_after_prefix.pop().unwrap(); while !next.is_empty() {
first_key_after_prefix.push(((tail as u8) + 1) as char); let tail = next.pop().unwrap();
first_key_after_prefix if tail >= char::MAX {
continue;
}
// Circumvent a limitation of RangeFrom that overflow earlier than needed
// See: https://doc.rust-lang.org/core/ops/struct.RangeFrom.html
let new_tail = if tail == UTF8_BEFORE_LAST_CHAR {
char::MAX
} else {
(tail..).nth(1).unwrap()
};
next.push(new_tail);
return Some(next);
}
None
} }
/* /*
@ -743,6 +766,7 @@ fn key_after_prefix(pfx: &str) -> String {
#[cfg(test)] #[cfg(test)]
mod tests { mod tests {
use super::*; use super::*;
use std::iter::FromIterator;
const TS: u64 = 1641394898314; const TS: u64 = 1641394898314;
@ -787,6 +811,39 @@ mod tests {
} }
} }
#[test]
fn test_key_after_prefix() {
assert_eq!(UTF8_BEFORE_LAST_CHAR as u32, (char::MAX as u32) - 1);
assert_eq!(key_after_prefix("a/b/").unwrap().as_str(), "a/b0");
assert_eq!(key_after_prefix("").unwrap().as_str(), "");
assert_eq!(
key_after_prefix("􏿽").unwrap().as_str(),
String::from(char::from_u32(0x10FFFE).unwrap())
);
// When the last character is the biggest UTF8 char
let a = String::from_iter(['a', char::MAX].iter());
assert_eq!(key_after_prefix(a.as_str()).unwrap().as_str(), "b");
// When all characters are the biggest UTF8 char
let b = String::from_iter([char::MAX; 3].iter());
assert!(key_after_prefix(b.as_str()).is_none());
// Check utf8 surrogates
let c = String::from('\u{D7FF}');
assert_eq!(
key_after_prefix(c.as_str()).unwrap().as_str(),
String::from('\u{E000}')
);
// Check the character before the biggest one
let d = String::from('\u{10FFFE}');
assert_eq!(
key_after_prefix(d.as_str()).unwrap().as_str(),
String::from(char::MAX)
);
}
#[test] #[test]
fn test_common_prefixes() { fn test_common_prefixes() {
let mut query = query(); let mut query = query();
@ -844,7 +901,7 @@ mod tests {
// Check the case where we skip some uploads // Check the case where we skip some uploads
match acc.extract(&(query().common), &start, &mut iter) { match acc.extract(&(query().common), &start, &mut iter) {
ExtractionResult::StoppedAtUpload { key, upload } => { ExtractionResult::FilledAtUpload { key, upload } => {
assert_eq!(key, "b"); assert_eq!(key, "b");
assert_eq!(upload, Uuid::from([0x8f; 32])); assert_eq!(upload, Uuid::from([0x8f; 32]));
} }