Merge pull request 's3 api: refactoring and bug fix in ListObjects' (#655) from fix-list-objects into main
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
Reviewed-on: #655
This commit is contained in:
commit
a0fa50dfcd
1 changed files with 41 additions and 31 deletions
|
@ -426,8 +426,10 @@ where
|
|||
// Drop the first key if needed
|
||||
// Only AfterKey requires it according to the S3 spec and our implem.
|
||||
match (&cursor, iter.peek()) {
|
||||
(RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => iter.next(),
|
||||
(_, _) => None,
|
||||
(RangeBegin::AfterKey { key }, Some(object)) if &object.key == key => {
|
||||
iter.next();
|
||||
}
|
||||
_ => (),
|
||||
};
|
||||
|
||||
while let Some(object) = iter.peek() {
|
||||
|
@ -436,16 +438,22 @@ where
|
|||
return Ok(None);
|
||||
}
|
||||
|
||||
cursor = match acc.extract(query, &cursor, &mut iter) {
|
||||
ExtractionResult::Extracted { key } => RangeBegin::AfterKey { key },
|
||||
match acc.extract(query, &cursor, &mut iter) {
|
||||
ExtractionResult::Extracted { key } => {
|
||||
cursor = RangeBegin::AfterKey { key };
|
||||
}
|
||||
ExtractionResult::SkipTo { key, fallback_key } => {
|
||||
RangeBegin::IncludingKey { key, fallback_key }
|
||||
cursor = RangeBegin::IncludingKey { key, fallback_key };
|
||||
}
|
||||
ExtractionResult::FilledAtUpload { key, upload } => {
|
||||
return Ok(Some(RangeBegin::AfterUpload { key, upload }))
|
||||
return Ok(Some(RangeBegin::AfterUpload { key, upload }));
|
||||
}
|
||||
ExtractionResult::Filled => {
|
||||
return Ok(Some(cursor));
|
||||
}
|
||||
ExtractionResult::NoMore => {
|
||||
return Ok(None);
|
||||
}
|
||||
ExtractionResult::Filled => return Ok(Some(cursor)),
|
||||
ExtractionResult::NoMore => return Ok(None),
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -519,8 +527,8 @@ fn fetch_part_info<'a>(
|
|||
/// This key can be the prefix in the base case, or intermediate
|
||||
/// points in the dataset if we are continuing a previous listing.
|
||||
impl ListObjectsQuery {
|
||||
fn build_accumulator(&self) -> Accumulator<String, ObjectInfo> {
|
||||
Accumulator::<String, ObjectInfo>::new(self.common.page_size)
|
||||
fn build_accumulator(&self) -> ObjectAccumulator {
|
||||
ObjectAccumulator::new(self.common.page_size)
|
||||
}
|
||||
|
||||
fn begin(&self) -> Result<RangeBegin, Error> {
|
||||
|
@ -529,9 +537,10 @@ impl ListObjectsQuery {
|
|||
// In V2 mode, the continuation token is defined as an opaque
|
||||
// string in the spec, so we can do whatever we want with it.
|
||||
// In our case, it is defined as either [ or ] (for include
|
||||
// or exclude), followed by a base64-encoded string
|
||||
// representing the key to start with.
|
||||
(Some(token), _) => match &token[..1] {
|
||||
"[" => Ok(RangeBegin::IncludingKey {
|
||||
(Some(token), _) => match &token.get(..1) {
|
||||
Some("[") => Ok(RangeBegin::IncludingKey {
|
||||
key: String::from_utf8(
|
||||
BASE64_STANDARD
|
||||
.decode(token[1..].as_bytes())
|
||||
|
@ -539,7 +548,7 @@ impl ListObjectsQuery {
|
|||
)?,
|
||||
fallback_key: None,
|
||||
}),
|
||||
"]" => Ok(RangeBegin::AfterKey {
|
||||
Some("]") => Ok(RangeBegin::AfterKey {
|
||||
key: String::from_utf8(
|
||||
BASE64_STANDARD
|
||||
.decode(token[1..].as_bytes())
|
||||
|
@ -580,8 +589,8 @@ impl ListObjectsQuery {
|
|||
}
|
||||
|
||||
impl ListMultipartUploadsQuery {
|
||||
fn build_accumulator(&self) -> Accumulator<Uuid, UploadInfo> {
|
||||
Accumulator::<Uuid, UploadInfo>::new(self.common.page_size)
|
||||
fn build_accumulator(&self) -> UploadAccumulator {
|
||||
UploadAccumulator::new(self.common.page_size)
|
||||
}
|
||||
|
||||
fn begin(&self) -> Result<RangeBegin, Error> {
|
||||
|
@ -665,6 +674,7 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
|
|||
Some(p) => p,
|
||||
None => return None,
|
||||
};
|
||||
assert!(pfx.starts_with(&query.prefix));
|
||||
|
||||
// Try to register this prefix
|
||||
// If not possible, we can return early
|
||||
|
@ -675,8 +685,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
|
|||
// We consume the whole common prefix from the iterator
|
||||
let mut last_pfx_key = &object.key;
|
||||
loop {
|
||||
last_pfx_key = match objects.peek() {
|
||||
Some(o) if o.key.starts_with(pfx) => &o.key,
|
||||
match objects.peek() {
|
||||
Some(o) if o.key.starts_with(pfx) => {
|
||||
last_pfx_key = &o.key;
|
||||
objects.next();
|
||||
}
|
||||
Some(_) => {
|
||||
return Some(ExtractionResult::Extracted {
|
||||
key: last_pfx_key.to_owned(),
|
||||
|
@ -692,8 +705,6 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
|
|||
}
|
||||
}
|
||||
};
|
||||
|
||||
objects.next();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -708,12 +719,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
|
|||
}
|
||||
|
||||
// Otherwise, we need to check if we can add it
|
||||
match self.is_full() {
|
||||
true => false,
|
||||
false => {
|
||||
self.common_prefixes.insert(key);
|
||||
true
|
||||
}
|
||||
if self.is_full() {
|
||||
false
|
||||
} else {
|
||||
self.common_prefixes.insert(key);
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -721,12 +731,11 @@ impl<K: std::cmp::Ord, V> Accumulator<K, V> {
|
|||
// It is impossible to add twice a key, this is an error
|
||||
assert!(!self.keys.contains_key(&key));
|
||||
|
||||
match self.is_full() {
|
||||
true => false,
|
||||
false => {
|
||||
self.keys.insert(key, value);
|
||||
true
|
||||
}
|
||||
if self.is_full() {
|
||||
false
|
||||
} else {
|
||||
self.keys.insert(key, value);
|
||||
true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -743,6 +752,7 @@ impl ExtractAccumulator for ObjectAccumulator {
|
|||
}
|
||||
|
||||
let object = objects.next().expect("This iterator can not be empty as it is checked earlier in the code. This is a logic bug, please report it.");
|
||||
assert!(object.key.starts_with(&query.prefix));
|
||||
|
||||
let version = match object.versions().iter().find(|x| x.is_data()) {
|
||||
Some(v) => v,
|
||||
|
|
Loading…
Reference in a new issue