use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc;

use hyper::{Body, Response};

use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;

use garage_model::garage::Garage;
use garage_model::object_table::*;

use garage_table::DeletedFilter;

use crate::encoding::*;
use crate::error::*;
use crate::s3_xml;
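
/// Parameters of a ListObjects or ListObjectsV2 request,
/// as parsed from the HTTP query string.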
#[derive(Debug)]
pub struct ListObjectsQuery {
    pub is_v2: bool,
    pub bucket_name: String,
    pub bucket_id: Uuid,
    pub delimiter: Option<String>,
    pub max_keys: usize,
    pub prefix: String,
    pub marker: Option<String>,
    pub continuation_token: Option<String>,
    pub start_after: Option<String>,
    pub urlencode_resp: bool,
}

#[derive(Debug)]
struct ListResultInfo {
    last_modified: u64,
    size: u64,
    etag: String,
}
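
/// Handle a ListObjects or ListObjectsV2 request,
/// returning the XML response body.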
pub async fn handle_list(
    garage: Arc<Garage>,
    query: &ListObjectsQuery,
) -> Result<Response<Body>, Error> {
    let mut result_keys = BTreeMap::<String, ListResultInfo>::new();
    let mut result_common_prefixes = BTreeSet::<String>::new();

    // Determine the key from which we want to start fetching objects
    // from the database, and whether the object at this key must
    // be included in or excluded from the response.
    // This key is the prefix in the base case, or an intermediate
    // point in the dataset if we are continuing a previous listing.
    #[allow(clippy::collapsible_else_if)]
    let (mut next_chunk_start, mut next_chunk_exclude_start) = if query.is_v2 {
        if let Some(ct) = &query.continuation_token {
            // In V2 mode, the continuation token is defined as an opaque
            // string in the spec, so we can do whatever we want with it.
            // In our case, it is defined as either [ or ] (for include
            // and exclude, respectively), followed by a base64 string
            // representing the key to start with.
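            // For example (an illustrative value, not from the spec):
            // "[Zm9v" means "start listing at key `foo`, inclusive",
            // since base64("foo") == "Zm9v".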
            let exclude = match &ct[..1] {
                "[" => false,
                "]" => true,
                _ => return Err(Error::BadRequest("Invalid continuation token".to_string())),
            };
            (
                String::from_utf8(base64::decode(ct[1..].as_bytes())?)?,
                exclude,
            )
        } else if let Some(sa) = &query.start_after {
            // StartAfter has defined semantics in the spec:
            // start listing at the first key immediately after it.
            (sa.clone(), true)
        } else {
            // When neither is specified, we start listing at the
            // specified prefix. If an object has this exact key,
            // we include it. (TODO is this correct?)
            (query.prefix.clone(), false)
        }
    } else {
        if let Some(mk) = &query.marker {
            // In V1 mode, the spec defines the Marker value to mean
            // the same thing as the StartAfter value in V2 mode.
            (mk.clone(), true)
        } else {
            // Base case, same as in V2 mode
            (query.prefix.clone(), false)
        }
    };

    debug!(
        "List request: `{:?}` {} `{}`, start from {}, exclude first {}",
        query.delimiter, query.max_keys, query.prefix, next_chunk_start, next_chunk_exclude_start
    );

    // `truncated` tells whether the listing was truncated, i.e.
    // whether more items remain to be listed after this response.
    let truncated;
    // `last_processed_item` is the key of the last item
    // that was included in the listing before truncating.
    let mut last_processed_item = None;

    'query_loop: loop {
        // Fetch objects
        let objects = garage
            .object_table
            .get_range(
                &query.bucket_id,
                Some(next_chunk_start.clone()),
                Some(DeletedFilter::NotDeleted),
                query.max_keys + 1,
            )
            .await?;
        debug!(
            "List: get range {} (max {}), results: {}",
            next_chunk_start,
            query.max_keys + 1,
            objects.len()
        );
        let current_chunk_start = next_chunk_start.clone();

        // Iterate over returned objects and add them to the response.
        // If a delimiter is specified, we take care of grouping objects
        // into CommonPrefixes.
        for object in objects.iter() {
            // If we have retrieved an object that doesn't start with
            // the prefix, we know we have finished listing our stuff.
            if !object.key.starts_with(&query.prefix) {
                truncated = false;
                break 'query_loop;
            }

            // Exclude the starting key if we have to.
            if object.key == next_chunk_start && next_chunk_exclude_start {
                continue;
            }

            // Find whether this object has a currently valid (non-deleted,
            // non-still-uploading) version. If not, skip it.
            let version = match object.versions().iter().find(|x| x.is_data()) {
                Some(v) => v,
                None => continue,
            };

            // If we don't have space to add this object to our response,
            // we will need to stop here and mark the key of this object
            // as the marker from which we want to start again
            // in the next list call.
            let cannot_add = result_keys.len() + result_common_prefixes.len() >= query.max_keys;

            // Determine whether this object should be grouped inside
            // a CommonPrefix because its key contains the delimiter,
            // or whether it should be returned as a regular object.
            let common_prefix = match &query.delimiter {
                Some(delimiter) => object.key[query.prefix.len()..]
                    .find(delimiter)
                    .map(|i| &object.key[..query.prefix.len() + i + delimiter.len()]),
                None => None,
            };
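            // For example (illustrative values): with prefix "photos/" and
            // delimiter "/", the key "photos/2021/jan.jpg" is grouped under
            // the common prefix "photos/2021/".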
            if let Some(pfx) = common_prefix {
                // This object must be grouped into a common prefix:
                // handle it here.
                if !result_common_prefixes.contains(pfx) {
                    // Determine the first listing key that sorts after
                    // the common prefix, by computing the next possible
                    // string in lexicographic order.
                    let mut first_key_after_prefix = pfx.to_string();
                    let tail = first_key_after_prefix.pop().unwrap();
                    first_key_after_prefix.push(((tail as u8) + 1) as char);
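                    // For example (illustrative value): for the common prefix
                    // "photos/2021/", this yields "photos/20210", the first
                    // string that sorts after every key starting with the
                    // prefix ('0' is the character just after '/' in ASCII).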

                    // If this were the end of the chunk,
                    // the next chunk should start after this prefix
                    next_chunk_start = first_key_after_prefix;
                    next_chunk_exclude_start = false;

                    if cannot_add {
                        truncated = true;
                        break 'query_loop;
                    }
                    result_common_prefixes.insert(pfx.to_string());
                }
                last_processed_item = Some(object.key.clone());
                continue;
            };

            // This is not a common prefix: we want to add this object
            // to our response directly.
            next_chunk_start = object.key.clone();

            if cannot_add {
                truncated = true;
                next_chunk_exclude_start = false;
                break 'query_loop;
            }

            let meta = match &version.state {
                ObjectVersionState::Complete(ObjectVersionData::Inline(meta, _)) => meta,
                ObjectVersionState::Complete(ObjectVersionData::FirstBlock(meta, _)) => meta,
                // Unreachable: `version` was selected with is_data(),
                // so it is a complete version that holds actual data.
                _ => unreachable!(),
            };
            let info = match result_keys.get(&object.key) {
                None => ListResultInfo {
                    last_modified: version.timestamp,
                    size: meta.size,
                    etag: meta.etag.to_string(),
                },
                Some(_lri) => {
                    return Err(Error::InternalError(GarageError::Message(format!(
                        "Duplicate key?? {} (this is a bug, please report it)",
                        object.key
                    ))))
                }
            };
            result_keys.insert(object.key.clone(), info);
            last_processed_item = Some(object.key.clone());
            next_chunk_exclude_start = true;
        }

        // If the database returned fewer objects than we asked for,
        // there are no more objects in the bucket, so we stop here.
        if objects.len() < query.max_keys + 1 {
            truncated = false;
            break 'query_loop;
        }

        // Sanity check: we should have added at least one object
        // or one common prefix to our result.
        if next_chunk_start == current_chunk_start || last_processed_item.is_none() {
            return Err(Error::InternalError(GarageError::Message(format!(
                "S3 ListObject: made no progress, still starting at {} (this is a bug, please report it)",
                next_chunk_start
            ))));
        }

        // Loop and fetch more objects
    }

    let mut result = s3_xml::ListBucketResult {
        xmlns: (),
        name: s3_xml::Value(query.bucket_name.to_string()),
        prefix: uriencode_maybe(&query.prefix, query.urlencode_resp),
        marker: None,
        next_marker: None,
        start_after: None,
        continuation_token: None,
        next_continuation_token: None,
        max_keys: s3_xml::IntValue(query.max_keys as i64),
        delimiter: query
            .delimiter
            .as_ref()
            .map(|x| uriencode_maybe(x, query.urlencode_resp)),
        encoding_type: match query.urlencode_resp {
            true => Some(s3_xml::Value("url".to_string())),
            false => None,
        },
        key_count: Some(s3_xml::IntValue(
            result_keys.len() as i64 + result_common_prefixes.len() as i64,
        )),
        is_truncated: s3_xml::Value(format!("{}", truncated)),
        contents: vec![],
        common_prefixes: vec![],
    };

    if query.is_v2 {
        if let Some(ct) = &query.continuation_token {
            result.continuation_token = Some(s3_xml::Value(ct.to_string()));
        }
        if let Some(sa) = &query.start_after {
            result.start_after = Some(uriencode_maybe(sa, query.urlencode_resp));
        }
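
        // If the listing was truncated, generate the continuation token
        // the client will send back to resume it. For example (illustrative
        // value): if the next chunk starts at key "bar", excluded, the token
        // is "]YmFy", since base64("bar") == "YmFy".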
        if truncated {
            let b64 = base64::encode(next_chunk_start.as_bytes());
            let nct = if next_chunk_exclude_start {
                format!("]{}", b64)
            } else {
                format!("[{}", b64)
            };
            result.next_continuation_token = Some(s3_xml::Value(nct));
        }
    } else {
        // TODO: are these supposed to be urlencoded when encoding-type is URL??
        if let Some(mkr) = &query.marker {
            result.marker = Some(uriencode_maybe(mkr, query.urlencode_resp));
        }
        if truncated {
            if let Some(lpi) = last_processed_item {
                result.next_marker = Some(uriencode_maybe(&lpi, query.urlencode_resp));
            } else {
                return Err(Error::InternalError(GarageError::Message(
                    "S3 ListObject: last_processed_item is None but the response was truncated, indicating that many items were processed (this is a bug, please report it)".to_string(),
                )));
            }
        }
    }

    for (key, info) in result_keys.iter() {
        result.contents.push(s3_xml::ListBucketItem {
            key: uriencode_maybe(key, query.urlencode_resp),
            last_modified: s3_xml::Value(msec_to_rfc3339(info.last_modified)),
            size: s3_xml::IntValue(info.size as i64),
            etag: s3_xml::Value(info.etag.to_string()),
            storage_class: s3_xml::Value("STANDARD".to_string()),
        });
    }

    for pfx in result_common_prefixes.iter() {
        result.common_prefixes.push(s3_xml::CommonPrefix {
            prefix: uriencode_maybe(pfx, query.urlencode_resp),
        });
    }

    let xml = s3_xml::to_xml_with_header(&result)?;

    Ok(Response::builder()
        .header("Content-Type", "application/xml")
        .body(Body::from(xml.into_bytes()))?)
}
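
/// URL-encode a value, but only if the client requested URL
/// encoding of the response (encoding-type=url).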
fn uriencode_maybe(s: &str, yes: bool) -> s3_xml::Value {
    if yes {
        s3_xml::Value(uri_encode(s, true))
    } else {
        s3_xml::Value(s.to_string())
    }
}