s3 api: refactoring and bug fix in ListObjects #655

Merged
lx merged 2 commits from fix-list-objects into main 2023-10-26 09:22:48 +00:00

View file

@ -426,8 +426,10 @@ where
// Drop the first key if needed // Drop the first key if needed
// Only AfterKey requires it according to the S3 spec and our implem. // Only AfterKey requires it according to the S3 spec and our implem.
match (&cursor, iter.peek()) { match (&cursor, iter.peek()) {
(RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(), (RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => {
(_, _) => None, iter.next();
}
_ => (),
}; };
while let Some(object) = iter.peek() { while let Some(object) = iter.peek() {
@ -436,16 +438,22 @@ where
return Ok(None); return Ok(None);
} }
cursor = match acc.extract(query, &cursor, &mut iter) { match acc.extract(query, &cursor, &mut iter) {
ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key }, ExtractionResult::Extracted { key } => {
cursor = RangeBegin::AfterKey { key };
}
ExtractionResult::SkipTo { key, fallback_key } => { ExtractionResult::SkipTo { key, fallback_key } => {
RangeBegin::IncludingKey { key, fallback_key } cursor = RangeBegin::IncludingKey { key, fallback_key };
} }
ExtractionResult::FilledAtUpload { key, upload } => { ExtractionResult::FilledAtUpload { key, upload } => {
return Ok(Some(RangeBegin::AfterUpload { key, upload })) return Ok(Some(RangeBegin::AfterUpload { key, upload }));
}
ExtractionResult::Filled => {
return Ok(Some(cursor));
}
ExtractionResult::NoMore => {
return Ok(None);
} }
ExtractionResult::Filled => return Ok(Some(cursor)),
ExtractionResult::NoMore => return Ok(None),
}; };
} }
@ -519,8 +527,8 @@ fn fetch_part_info<'a>(
/// This key can be the prefix in the base case, or intermediate /// This key can be the prefix in the base case, or intermediate
/// points in the dataset if we are continuing a previous listing. /// points in the dataset if we are continuing a previous listing.
impl ListObjectsQuery { impl ListObjectsQuery {
fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> { fn build_accumulator(&self) -> ObjectAccumulator {
Accumulator::<String, ObjectInfo>::new(self.common.page_size) ObjectAccumulator::new(self.common.page_size)
} }
fn begin(&self) -> Result<RangeBegin, Error> { fn begin(&self) -> Result<RangeBegin, Error> {
@ -529,9 +537,10 @@ impl ListObjectsQuery {
// In V2 mode, the continuation token is defined as an opaque // In V2 mode, the continuation token is defined as an opaque
// string in the spec, so we can do whatever we want with it. // string in the spec, so we can do whatever we want with it.
// In our case, it is defined as either [ or ] (for include // In our case, it is defined as either [ or ] (for include
// or exclude), followed by a base64-encoded string
// representing the key to start with. // representing the key to start with.
(Some(token), _) => match &token[..1] { (Some(token), _) => match &token.get(..1) {
"[" => Ok(RangeBegin::IncludingKey { Some("[") => Ok(RangeBegin::IncludingKey {
key: String::from_utf8( key: String::from_utf8(
BASE64_STANDARD BASE64_STANDARD
.decode(token[1..].as_bytes()) .decode(token[1..].as_bytes())
@ -539,7 +548,7 @@ impl ListObjectsQuery {
)?, )?,
fallback_key: None, fallback_key: None,
}), }),
"]" => Ok(RangeBegin::AfterKey { Some("]") => Ok(RangeBegin::AfterKey {
key: String::from_utf8( key: String::from_utf8(
BASE64_STANDARD BASE64_STANDARD
.decode(token[1..].as_bytes()) .decode(token[1..].as_bytes())
@ -580,8 +589,8 @@ impl ListObjectsQuery {
} }
impl ListMultipartUploadsQuery { impl ListMultipartUploadsQuery {
fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> { fn build_accumulator(&self) -> UploadAccumulator {
Accumulator::<Uuid, UploadInfo>::new(self.common.page_size) UploadAccumulator::new(self.common.page_size)
} }
fn begin(&self) -> Result<RangeBegin, Error> { fn begin(&self) -> Result<RangeBegin, Error> {
@ -665,6 +674,7 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
Some(p) => p, Some(p) => p,
None => return None, None => return None,
}; };
assert!(pfx.starts_with(&query.prefix));
// Try to register this prefix // Try to register this prefix
// If not possible, we can return early // If not possible, we can return early
@ -675,8 +685,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
// We consume the whole common prefix from the iterator // We consume the whole common prefix from the iterator
let mut last_pfx_key = &object.key; let mut last_pfx_key = &object.key;
loop { loop {
last_pfx_key = match objects.peek() { match objects.peek() {
Some(o) if o.key.starts_with(pfx) => &o.key, Some(o) if o.key.starts_with(pfx) => {
last_pfx_key = &o.key;
objects.next();
}
Some(_) => { Some(_) => {
return Some(ExtractionResult::Extracted { return Some(ExtractionResult::Extracted {
key: last_pfx_key.to_owned(), key: last_pfx_key.to_owned(),
@ -692,8 +705,6 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
} }
} }
}; };
objects.next();
} }
} }
@ -708,27 +719,25 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
} }
// Otherwise, we need to check if we can add it // Otherwise, we need to check if we can add it
match self.is_full() { if self.is_full() {
true => false, false
false => { } else {
self.common_prefixes.insert(key); self.common_prefixes.insert(key);
true true
} }
} }
}
fn try_insert_entry(&mut self, key: K, value: V) -> bool { fn try_insert_entry(&mut self, key: K, value: V) -> bool {
// It is impossible to add twice a key, this is an error // It is impossible to add twice a key, this is an error
assert!(!self.keys.contains_key(&key)); assert!(!self.keys.contains_key(&key));
match self.is_full() { if self.is_full() {
true => false, false
false => { } else {
self.keys.insert(key, value); self.keys.insert(key, value);
true true
} }
} }
}
} }
impl ExtractAccumulator for ObjectAccumulator { impl ExtractAccumulator for ObjectAccumulator {
@ -743,6 +752,7 @@ impl ExtractAccumulator for ObjectAccumulator {
} }
let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it."); let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
assert!(object.key.starts_with(&query.prefix));
let version = match object.versions().iter().find(|x| x.is_data()) { let version = match object.versions().iter().find(|x| x.is_data()) {
Some(v) => v, Some(v) => v,