Compare commits

..

No commits in common. "1acf7e4c66aab0a0b5bcce8eb2965b35d587dd2a" and "1aed317818a423439292055279dbe1a7023c049a" have entirely different histories.

4 changed files with 61 additions and 168 deletions

View file

@ -139,6 +139,6 @@ steps:
--- ---
kind: signature kind: signature
hmac: f0f2e947c8aa8bc5b83d25b4da22f3eb711b3fe1cc80ead4f93428dbd3d44164 hmac: e919f8a66d20ebfeeec56b291a8a0fdd59a482601da987fcf533d96d24768744
... ...

View file

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Body, Request, Response};
use garage_util::data::*; use garage_util::data::*;
use garage_util::time::*; use garage_util::time::*;
@ -68,7 +68,6 @@ pub async fn handle_delete(
Ok(Response::builder() Ok(Response::builder()
.header("x-amz-version-id", hex::encode(delete_marker_version)) .header("x-amz-version-id", hex::encode(delete_marker_version))
.status(StatusCode::NO_CONTENT)
.body(Body::from(vec![])) .body(Body::from(vec![]))
.unwrap()) .unwrap())
} }

View file

@ -68,63 +68,26 @@ pub async fn handle_list(
let mut result_keys = BTreeMap::<String, ListResultInfo>::new(); let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
let mut result_common_prefixes = BTreeSet::<String>::new(); let mut result_common_prefixes = BTreeSet::<String>::new();
// Determine the key from where we want to start fetch objects let mut next_chunk_start = if query.is_v2 {
// from the database, and whether the object at this key must
// be included or excluded from the response.
// This key can be the prefix in the base case, or intermediate
// points in the dataset if we are continuing a previous listing.
#[allow(clippy::collapsible_else_if)]
let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 {
if let Some(ct) = &query.continuation_token { if let Some(ct) = &query.continuation_token {
// In V2 mode, the continuation token is defined as an opaque String::from_utf8(base64::decode(ct.as_bytes())?)?
// string in the spec, so we can do whatever we want with it.
// In our case, it is defined as either [ or ] (for include
// and exclude, respectively), followed by a base64 string
// representing the key to start with.
let exclude = match &ct[..1] {
"[" => false,
"]" => true,
_ => return Err(Error::BadRequest("Invalid continuation token".to_string())),
};
(
String::from_utf8(base64::decode(ct[1..].as_bytes())?)?,
exclude,
)
} else if let Some(sa) = &query.start_after {
// StartAfter has defined semantics in the spec:
// start listing at the first key immediately after.
(sa.clone(), true)
} else { } else {
// In the case where neither is specified, we start query
// listing at the specified prefix. If an object has this .start_after
// exact same key, we include it. (TODO is this correct?) .clone()
(query.prefix.clone(), false) .unwrap_or_else(|| query.prefix.clone())
} }
} else { } else {
if let Some(mk) = &query.marker { query.marker.clone().unwrap_or_else(|| query.prefix.clone())
// In V1 mode, the spec defines the Marker value to mean
// the same thing as the StartAfter value in V2 mode.
(mk.clone(), true)
} else {
// Base case, same as in V2 mode
(query.prefix.clone(), false)
}
}; };
debug!( debug!(
"List request: `{:?}` {} `{}`, start from {}, exclude first {}", "List request: `{:?}` {} `{}`",
query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start query.delimiter, query.max_keys, query.prefix
); );
// `truncated` is a boolean that determines whether there are
// more items to be added.
let truncated; let truncated;
// `last_processed_item` is the key of the last item
// that was included in the listing before truncating.
let mut last_processed_item = None;
'query_loop: loop { 'query_loop: loop {
// Fetch objects
let objects = garage let objects = garage
.object_table .object_table
.get_range( .get_range(
@ -140,120 +103,64 @@ pub async fn handle_list(
query.max_keys + 1, query.max_keys + 1,
objects.len() objects.len()
); );
let current_chunk_start = next_chunk_start.clone();
// Iterate on returned objects and add them to the response.
// If a delimiter is specified, we take care of grouping objects
// into CommonPrefixes.
for object in objects.iter() { for object in objects.iter() {
// If we have retrieved an object that doesn't start with
// the prefix, we know we have finished listing our stuff.
if !object.key.starts_with(&query.prefix) { if !object.key.starts_with(&query.prefix) {
truncated = false; truncated = None;
break 'query_loop; break 'query_loop;
} }
// Exclude the starting key if we have to. if query.is_v2 && query.start_after.as_ref() == Some(&object.key) {
if object.key == next_chunk_start && next_chunk_exclude_start {
continue; continue;
} }
// Find if this object has a currently valid (non-deleted, if let Some(version) = object.versions().iter().find(|x| x.is_data()) {
// non-still-uploading) version. If not, skip it. if result_keys.len() + result_common_prefixes.len() >= query.max_keys {
let version = match object.versions().iter().find(|x| x.is_data()) { truncated = Some(object.key.to_string());
Some(v) => v, break 'query_loop;
None => continue, }
}; let common_prefix = if let Some(delimiter) = &query.delimiter {
let relative_key = &object.key[query.prefix.len()..];
// If we don't have space to add this object to our response, relative_key
// we will need to stop here and mark the key of this object .find(delimiter)
// as the marker from where .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()])
// we want to start again in the next list call. } else {
let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys; None
};
// Determine whether this object should be grouped inside if let Some(pfx) = common_prefix {
// a CommonPrefix because it contains the delimiter,
// or if it should be returned as an object.
let common_prefix = match &query.delimiter {
Some(delimiter) => object.key[query.prefix.len()..]
.find(delimiter)
.map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
None => None,
};
if let Some(pfx) = common_prefix {
// In the case where this object must be grouped in a
// common prefix, handle it here.
if !result_common_prefixes.contains(pfx) {
// Determine the first listing key that starts after
// the common prefix, by finding the next possible
// string by alphabetical order.
let mut first_key_after_prefix = pfx.to_string();
let tail = first_key_after_prefix.pop().unwrap();
first_key_after_prefix.push(((tail as u8) + 1) as char);
// If this were the end of the chunk,
// the next chunk should start after this prefix
next_chunk_start = first_key_after_prefix;
next_chunk_exclude_start = false;
if cannot_add {
truncated = true;
break 'query_loop;
}
result_common_prefixes.insert(pfx.to_string()); result_common_prefixes.insert(pfx.to_string());
} } else {
last_processed_item = Some(object.key.clone()); let meta = match &version.state {
continue; ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
}; ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => {
meta
// This is not a common prefix, we want to add it to our }
// response directly. _ => unreachable!(),
next_chunk_start = object.key.clone(); };
let info = match result_keys.get(&object.key) {
if cannot_add { None => ListResultInfo {
truncated = true; last_modified: version.timestamp,
next_chunk_exclude_start = false; size: meta.size,
break 'query_loop; etag: meta.etag.to_string(),
},
Some(_lri) => {
return Err(Error::InternalError(GarageError::Message(format!(
"Duplicate key?? {}",
object.key
))))
}
};
result_keys.insert(object.key.clone(), info);
};
} }
let meta = match &version.state {
ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
_ => unreachable!(),
};
let info = match result_keys.get(&object.key) {
None => ListResultInfo {
last_modified: version.timestamp,
size: meta.size,
etag: meta.etag.to_string(),
},
Some(_lri) => {
return Err(Error::InternalError(GarageError::Message(format!(
"Duplicate key?? {} (this is a bug, please report it)",
object.key
))))
}
};
result_keys.insert(object.key.clone(), info);
last_processed_item = Some(object.key.clone());
next_chunk_exclude_start = true;
} }
// If our database returned less objects than what we were asking for,
// it means that no more objects are in the bucket. So we stop here.
if objects.len() < query.max_keys + 1 { if objects.len() < query.max_keys + 1 {
truncated = false; truncated = None;
break 'query_loop; break 'query_loop;
} }
if !objects.is_empty() {
// Sanity check: we should have added at least an object next_chunk_start = objects[objects.len() - 1].key.clone();
// or a prefix to our returned result.
if next_chunk_start == current_chunk_start || last_processed_item.is_none() {
return Err(Error::InternalError(GarageError::Message(format!(
"S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)", next_chunk_start))));
} }
// Loop and fetch more objects
} }
let mut result = s3_xml::ListBucketResult { let mut result = s3_xml::ListBucketResult {
@ -274,10 +181,11 @@ pub async fn handle_list(
true => Some(s3_xml::Value("url".to_string())), true => Some(s3_xml::Value("url".to_string())),
false => None, false => None,
}, },
key_count: Some(s3_xml::IntValue( key_count: Some(s3_xml::IntValue(
result_keys.len() as i64 + result_common_prefixes.len() as i64, result_keys.len() as i64 + result_common_prefixes.len() as i64,
)), )),
is_truncated: s3_xml::Value(format!("{}", truncated)), is_truncated: s3_xml::Value(format!("{}", truncated.is_some())),
contents: vec![], contents: vec![],
common_prefixes: vec![], common_prefixes: vec![],
}; };
@ -289,27 +197,16 @@ pub async fn handle_list(
if let Some(sa) = &query.start_after { if let Some(sa) = &query.start_after {
result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp)); result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp));
} }
if truncated { if let Some(nct) = truncated {
let b64 = base64::encode(next_chunk_start.as_bytes()); result.next_continuation_token = Some(s3_xml::Value(base64::encode(nct.as_bytes())));
let nct = if next_chunk_exclude_start {
format!("]{}", b64)
} else {
format!("[{}", b64)
};
result.next_continuation_token = Some(s3_xml::Value(nct));
} }
} else { } else {
// TODO: are these supposed to be urlencoded when encoding-type is URL?? // TODO: are these supposed to be urlencoded when encoding-type is URL??
if let Some(mkr) = &query.marker { if let Some(mkr) = &query.marker {
result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp)); result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp));
} }
if truncated { if let Some(next_marker) = truncated {
if let Some(lpi) = last_processed_item { result.next_marker = Some(uriencode_maybe(&next_marker, query.urlencode_resp));
result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp));
} else {
return Err(Error::InternalError(GarageError::Message(
"S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string())));
}
} }
} }
@ -324,6 +221,7 @@ pub async fn handle_list(
} }
for pfx in result_common_prefixes.iter() { for pfx in result_common_prefixes.iter() {
//TODO: in V1, are these urlencoded when urlencode_resp is true ?? (proably)
result.common_prefixes.push(s3_xml::CommonPrefix { result.common_prefixes.push(s3_xml::CommonPrefix {
prefix: uriencode_maybe(pfx, query.urlencode_resp), prefix: uriencode_maybe(pfx, query.urlencode_resp),
}); });

View file

@ -423,11 +423,7 @@ impl AdminRpcHandler {
writeln!( writeln!(
&mut ret, &mut ret,
"\nGarage version: {}", "\nGarage version: {}",
git_version::git_version!( git_version::git_version!()
prefix = "git:",
cargo_prefix = "cargo:",
fallback = "unknown"
)
) )
.unwrap(); .unwrap();