2022-06-15 18:20:28 +00:00
use std ::collections ::{ BTreeMap , BTreeSet , HashMap , VecDeque } ;
2020-04-24 18:47:11 +00:00
use std ::sync ::Arc ;
2022-02-18 16:05:19 +00:00
use futures ::prelude ::* ;
2022-01-17 09:55:31 +00:00
use hyper ::body ::{ Body , Bytes } ;
2022-02-21 22:02:30 +00:00
use hyper ::header ::{ HeaderMap , HeaderValue } ;
2022-01-17 09:55:31 +00:00
use hyper ::{ Request , Response } ;
2020-11-22 10:04:33 +00:00
use md5 ::{ digest ::generic_array ::* , Digest as Md5Digest , Md5 } ;
2021-03-16 14:58:40 +00:00
use sha2 ::Sha256 ;
2020-04-24 18:47:11 +00:00
2022-07-18 16:40:57 +00:00
use opentelemetry ::{
trace ::{ FutureExt as OtelFutureExt , TraceContextExt , Tracer } ,
Context ,
} ;
2020-04-28 10:18:14 +00:00
use garage_table ::* ;
2022-07-18 15:18:47 +00:00
use garage_util ::async_hash ::* ;
2020-04-24 18:47:11 +00:00
use garage_util ::data ::* ;
2020-11-08 14:04:30 +00:00
use garage_util ::error ::Error as GarageError ;
2021-03-15 15:21:41 +00:00
use garage_util ::time ::* ;
2020-04-24 18:47:11 +00:00
2022-03-15 11:04:12 +00:00
use garage_block ::manager ::INLINE_THRESHOLD ;
2022-06-15 18:20:28 +00:00
use garage_model ::bucket_table ::Bucket ;
2020-07-07 11:59:22 +00:00
use garage_model ::garage ::Garage ;
2022-06-15 18:20:28 +00:00
use garage_model ::index_counter ::CountedItem ;
2022-05-10 11:16:57 +00:00
use garage_model ::s3 ::block_ref_table ::* ;
use garage_model ::s3 ::object_table ::* ;
use garage_model ::s3 ::version_table ::* ;
2020-04-24 18:47:11 +00:00
2022-05-24 10:16:39 +00:00
use crate ::s3 ::error ::* ;
2022-05-10 11:16:57 +00:00
use crate ::s3 ::xml as s3_xml ;
2022-02-18 16:05:19 +00:00
use crate ::signature ::verify_signed_content ;
2020-04-26 20:39:32 +00:00
2020-04-24 18:47:11 +00:00
pub async fn handle_put (
garage : Arc < Garage > ,
2020-04-26 20:39:32 +00:00
req : Request < Body > ,
2022-06-15 18:20:28 +00:00
bucket : & Bucket ,
2020-04-24 18:47:11 +00:00
key : & str ,
2022-02-18 16:05:19 +00:00
content_sha256 : Option < Hash > ,
2020-07-07 15:15:53 +00:00
) -> Result < Response < Body > , Error > {
2021-02-19 11:11:02 +00:00
// Retrieve interesting headers from request
2022-02-21 22:02:30 +00:00
let headers = get_headers ( req . headers ( ) ) ? ;
2021-02-19 11:38:22 +00:00
debug! ( " Object headers: {:?} " , headers ) ;
2020-07-15 13:31:13 +00:00
let content_md5 = match req . headers ( ) . get ( " content-md5 " ) {
Some ( x ) = > Some ( x . to_str ( ) ? . to_string ( ) ) ,
None = > None ,
} ;
2020-07-08 15:33:24 +00:00
2022-02-18 16:05:19 +00:00
let ( _head , body ) = req . into_parts ( ) ;
2022-01-17 09:55:31 +00:00
let body = body . map_err ( Error ::from ) ;
2022-02-21 22:02:30 +00:00
save_stream (
garage ,
headers ,
body ,
2022-06-15 18:20:28 +00:00
bucket ,
2022-02-21 22:02:30 +00:00
key ,
content_md5 ,
content_sha256 ,
)
. await
. map ( | ( uuid , md5 ) | put_response ( uuid , md5 ) )
}
pub ( crate ) async fn save_stream < S : Stream < Item = Result < Bytes , Error > > + Unpin > (
garage : Arc < Garage > ,
headers : ObjectVersionHeaders ,
body : S ,
2022-06-15 18:20:28 +00:00
bucket : & Bucket ,
2022-02-21 22:02:30 +00:00
key : & str ,
content_md5 : Option < String > ,
content_sha256 : Option < FixedBytes32 > ,
) -> Result < ( Uuid , String ) , Error > {
// Generate identity of new version
let version_uuid = gen_uuid ( ) ;
let version_timestamp = now_msec ( ) ;
2022-01-17 09:55:31 +00:00
let mut chunker = StreamChunker ::new ( body , garage . config . block_size ) ;
2021-04-23 20:18:00 +00:00
let first_block = chunker . next ( ) . await ? . unwrap_or_default ( ) ;
2020-04-24 18:47:11 +00:00
2021-02-19 11:11:02 +00:00
// If body is small enough, store it directly in the object table
// as "inline data". We can then return immediately.
2020-04-24 18:47:11 +00:00
if first_block . len ( ) < INLINE_THRESHOLD {
2020-07-15 13:31:13 +00:00
let mut md5sum = Md5 ::new ( ) ;
md5sum . update ( & first_block [ .. ] ) ;
2021-03-10 15:33:31 +00:00
let data_md5sum = md5sum . finalize ( ) ;
let data_md5sum_hex = hex ::encode ( data_md5sum ) ;
2020-07-13 14:51:30 +00:00
2021-03-10 15:33:31 +00:00
let data_sha256sum = sha256sum ( & first_block [ .. ] ) ;
2022-06-15 18:20:28 +00:00
let size = first_block . len ( ) as u64 ;
2020-11-22 10:14:46 +00:00
ensure_checksum_matches (
2021-03-10 15:33:31 +00:00
data_md5sum . as_slice ( ) ,
data_sha256sum ,
2020-11-22 10:14:46 +00:00
content_md5 . as_deref ( ) ,
content_sha256 ,
) ? ;
2022-06-15 18:20:28 +00:00
check_quotas ( & garage , bucket , key , size ) . await ? ;
2021-02-19 11:11:02 +00:00
let object_version = ObjectVersion {
uuid : version_uuid ,
timestamp : version_timestamp ,
state : ObjectVersionState ::Complete ( ObjectVersionData ::Inline (
ObjectVersionMeta {
headers ,
2022-06-15 18:20:28 +00:00
size ,
2021-03-10 15:33:31 +00:00
etag : data_md5sum_hex . clone ( ) ,
2021-02-19 11:11:02 +00:00
} ,
first_block ,
) ) ,
} ;
2020-04-24 18:47:11 +00:00
2022-06-15 18:20:28 +00:00
let object = Object ::new ( bucket . id , key . into ( ) , vec! [ object_version ] ) ;
2020-04-24 18:47:11 +00:00
garage . object_table . insert ( & object ) . await ? ;
2021-02-19 11:11:02 +00:00
2022-02-21 22:02:30 +00:00
return Ok ( ( version_uuid , data_md5sum_hex ) ) ;
2020-04-24 18:47:11 +00:00
}
2021-02-19 11:11:02 +00:00
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
uuid : version_uuid ,
2021-03-15 14:26:29 +00:00
timestamp : version_timestamp ,
2021-02-19 11:11:02 +00:00
state : ObjectVersionState ::Uploading ( headers . clone ( ) ) ,
} ;
2022-06-15 18:20:28 +00:00
let object = Object ::new ( bucket . id , key . into ( ) , vec! [ object_version . clone ( ) ] ) ;
2020-04-24 18:47:11 +00:00
garage . object_table . insert ( & object ) . await ? ;
2021-02-19 11:11:02 +00:00
// Initialize corresponding entry in version table
2021-03-15 14:26:29 +00:00
// Write this entry now, even with empty block list,
// to prevent block_ref entries from being deleted (they can be deleted
// if the reference a version that isn't found in the version table)
2022-06-15 18:20:28 +00:00
let version = Version ::new ( version_uuid , bucket . id , key . into ( ) , false ) ;
2021-03-15 14:26:29 +00:00
garage . version_table . insert ( & version ) . await ? ;
2021-02-19 11:11:02 +00:00
// Transfer data and verify checksum
2022-07-18 15:18:47 +00:00
let first_block = Bytes ::from ( first_block ) ;
let first_block_hash = async_blake2sum ( first_block . clone ( ) ) . await ;
2022-06-15 18:20:28 +00:00
let tx_result = ( | | async {
let ( total_size , data_md5sum , data_sha256sum ) = read_and_put_blocks (
& garage ,
& version ,
1 ,
first_block ,
first_block_hash ,
& mut chunker ,
)
. await ? ;
2021-02-19 11:11:02 +00:00
ensure_checksum_matches (
2021-03-10 15:33:31 +00:00
data_md5sum . as_slice ( ) ,
data_sha256sum ,
2021-02-19 11:11:02 +00:00
content_md5 . as_deref ( ) ,
content_sha256 ,
2022-06-15 18:20:28 +00:00
) ? ;
check_quotas ( & garage , bucket , key , total_size ) . await ? ;
Ok ( ( total_size , data_md5sum ) )
} ) ( )
. await ;
2021-02-19 11:11:02 +00:00
// If something went wrong, clean up
let ( total_size , md5sum_arr ) = match tx_result {
Ok ( rv ) = > rv ,
Err ( e ) = > {
// Mark object as aborted, this will free the blocks further down
object_version . state = ObjectVersionState ::Aborted ;
2022-06-15 18:20:28 +00:00
let object = Object ::new ( bucket . id , key . into ( ) , vec! [ object_version . clone ( ) ] ) ;
2021-02-19 11:11:02 +00:00
garage . object_table . insert ( & object ) . await ? ;
return Err ( e ) ;
}
} ;
2020-11-22 10:04:33 +00:00
2021-02-19 11:11:02 +00:00
// Save final object state, marked as Complete
2020-11-22 10:04:33 +00:00
let md5sum_hex = hex ::encode ( md5sum_arr ) ;
object_version . state = ObjectVersionState ::Complete ( ObjectVersionData ::FirstBlock (
ObjectVersionMeta {
headers ,
size : total_size ,
etag : md5sum_hex . clone ( ) ,
} ,
first_block_hash ,
) ) ;
2022-06-15 18:20:28 +00:00
let object = Object ::new ( bucket . id , key . into ( ) , vec! [ object_version ] ) ;
2020-11-22 10:04:33 +00:00
garage . object_table . insert ( & object ) . await ? ;
2022-02-21 22:02:30 +00:00
Ok ( ( version_uuid , md5sum_hex ) )
2020-11-22 10:04:33 +00:00
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
fn ensure_checksum_matches (
2021-03-10 15:33:31 +00:00
data_md5sum : & [ u8 ] ,
data_sha256sum : garage_util ::data ::FixedBytes32 ,
2020-11-22 10:04:33 +00:00
content_md5 : Option < & str > ,
content_sha256 : Option < garage_util ::data ::FixedBytes32 > ,
) -> Result < ( ) , Error > {
2020-07-15 13:31:13 +00:00
if let Some ( expected_sha256 ) = content_sha256 {
2021-03-10 15:33:31 +00:00
if expected_sha256 ! = data_sha256sum {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request (
" Unable to validate x-amz-content-sha256 " ,
2021-04-23 20:18:00 +00:00
) ) ;
2020-07-15 13:41:49 +00:00
} else {
trace! ( " Successfully validated x-amz-content-sha256 " ) ;
2020-07-15 13:31:13 +00:00
}
}
if let Some ( expected_md5 ) = content_md5 {
2021-03-10 15:33:31 +00:00
if expected_md5 . trim_matches ( '"' ) ! = base64 ::encode ( data_md5sum ) {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request ( " Unable to validate content-md5 " ) ) ;
2020-07-15 13:41:49 +00:00
} else {
trace! ( " Successfully validated content-md5 " ) ;
2020-07-15 13:31:13 +00:00
}
}
2020-11-22 10:04:33 +00:00
Ok ( ( ) )
2020-04-26 20:39:32 +00:00
}
2022-06-15 18:20:28 +00:00
/// Check that inserting this object with this size doesn't exceed bucket quotas
async fn check_quotas (
garage : & Arc < Garage > ,
bucket : & Bucket ,
key : & str ,
size : u64 ,
) -> Result < ( ) , Error > {
let quotas = bucket . state . as_option ( ) . unwrap ( ) . quotas . get ( ) ;
if quotas . max_objects . is_none ( ) & & quotas . max_size . is_none ( ) {
return Ok ( ( ) ) ;
} ;
let key = key . to_string ( ) ;
let ( prev_object , counters ) = futures ::try_join! (
garage . object_table . get ( & bucket . id , & key ) ,
garage . object_counter_table . table . get ( & bucket . id , & EmptyKey ) ,
) ? ;
let counters = counters
. map ( | x | x . filtered_values ( & garage . system . ring . borrow ( ) ) )
. unwrap_or_default ( ) ;
let ( prev_cnt_obj , prev_cnt_size ) = match prev_object {
Some ( o ) = > {
let prev_cnt = o . counts ( ) . into_iter ( ) . collect ::< HashMap < _ , _ > > ( ) ;
(
prev_cnt . get ( OBJECTS ) . cloned ( ) . unwrap_or_default ( ) ,
prev_cnt . get ( BYTES ) . cloned ( ) . unwrap_or_default ( ) ,
)
}
None = > ( 0 , 0 ) ,
} ;
let cnt_obj_diff = 1 - prev_cnt_obj ;
let cnt_size_diff = size as i64 - prev_cnt_size ;
if let Some ( mo ) = quotas . max_objects {
let current_objects = counters . get ( OBJECTS ) . cloned ( ) . unwrap_or_default ( ) ;
if cnt_obj_diff > 0 & & current_objects + cnt_obj_diff > mo as i64 {
return Err ( Error ::forbidden ( format! (
" Object quota is reached, maximum objects for this bucket: {} " ,
mo
) ) ) ;
}
}
if let Some ( ms ) = quotas . max_size {
let current_size = counters . get ( BYTES ) . cloned ( ) . unwrap_or_default ( ) ;
if cnt_size_diff > 0 & & current_size + cnt_size_diff > ms as i64 {
return Err ( Error ::forbidden ( format! (
" Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes. " ,
ms , current_size , size
) ) ) ;
}
}
Ok ( ( ) )
}
2022-01-17 09:55:31 +00:00
async fn read_and_put_blocks < S : Stream < Item = Result < Bytes , Error > > + Unpin > (
2021-03-15 14:26:29 +00:00
garage : & Garage ,
2021-03-10 16:01:05 +00:00
version : & Version ,
2020-04-26 20:39:32 +00:00
part_number : u64 ,
2022-07-18 15:18:47 +00:00
first_block : Bytes ,
2020-04-26 20:39:32 +00:00
first_block_hash : Hash ,
2022-01-17 09:55:31 +00:00
chunker : & mut StreamChunker < S > ,
2020-11-22 10:04:33 +00:00
) -> Result < ( u64 , GenericArray < u8 , typenum ::U16 > , Hash ) , Error > {
2022-07-18 16:40:57 +00:00
let tracer = opentelemetry ::global ::tracer ( " garage " ) ;
2022-07-18 15:18:47 +00:00
let md5hasher = AsyncHasher ::< Md5 > ::new ( ) ;
let sha256hasher = AsyncHasher ::< Sha256 > ::new ( ) ;
2022-07-18 16:40:57 +00:00
futures ::future ::join (
md5hasher . update ( first_block . clone ( ) ) ,
sha256hasher . update ( first_block . clone ( ) ) ,
)
. with_context ( Context ::current_with_span (
tracer . start ( " Hash first block (md5, sha256) " ) ,
) )
. await ;
2020-07-13 14:51:30 +00:00
2020-04-24 18:47:11 +00:00
let mut next_offset = first_block . len ( ) ;
2020-04-28 10:18:14 +00:00
let mut put_curr_version_block = put_block_meta (
2021-10-26 08:20:05 +00:00
garage ,
version ,
2020-04-28 10:18:14 +00:00
part_number ,
0 ,
first_block_hash ,
first_block . len ( ) as u64 ,
) ;
2020-04-24 18:47:11 +00:00
let mut put_curr_block = garage
. block_manager
. rpc_put_block ( first_block_hash , first_block ) ;
loop {
2022-01-17 09:55:31 +00:00
let ( _ , _ , next_block ) = futures ::try_join! (
put_curr_block . map_err ( Error ::from ) ,
put_curr_version_block . map_err ( Error ::from ) ,
chunker . next ( ) ,
) ? ;
2020-04-24 18:47:11 +00:00
if let Some ( block ) = next_block {
2022-07-18 15:18:47 +00:00
let block = Bytes ::from ( block ) ;
2022-07-18 16:40:57 +00:00
let ( _ , _ , block_hash ) = futures ::future ::join3 (
md5hasher . update ( block . clone ( ) ) ,
sha256hasher . update ( block . clone ( ) ) ,
async_blake2sum ( block . clone ( ) ) ,
)
. with_context ( Context ::current_with_span (
tracer . start ( " Hash block (md5, sha256, blake2) " ) ,
) )
. await ;
2020-04-24 18:47:11 +00:00
let block_len = block . len ( ) ;
2020-04-28 10:18:14 +00:00
put_curr_version_block = put_block_meta (
2021-10-26 08:20:05 +00:00
garage ,
version ,
2020-04-28 10:18:14 +00:00
part_number ,
next_offset as u64 ,
block_hash ,
block_len as u64 ,
) ;
2020-04-24 18:47:11 +00:00
put_curr_block = garage . block_manager . rpc_put_block ( block_hash , block ) ;
next_offset + = block_len ;
} else {
break ;
}
}
2020-07-15 13:31:13 +00:00
let total_size = next_offset as u64 ;
2022-07-18 15:18:47 +00:00
let data_md5sum = md5hasher . finalize ( ) . await ;
2020-07-15 13:31:13 +00:00
2022-07-18 15:18:47 +00:00
let data_sha256sum = sha256hasher . finalize ( ) . await ;
2021-03-12 18:57:37 +00:00
let data_sha256sum = Hash ::try_from ( & data_sha256sum [ .. ] ) . unwrap ( ) ;
2020-07-15 13:31:13 +00:00
2021-03-10 15:33:31 +00:00
Ok ( ( total_size , data_md5sum , data_sha256sum ) )
2020-04-24 18:47:11 +00:00
}
async fn put_block_meta (
2021-03-15 14:26:29 +00:00
garage : & Garage ,
2020-04-24 18:47:11 +00:00
version : & Version ,
2020-04-26 18:55:13 +00:00
part_number : u64 ,
2020-04-24 18:47:11 +00:00
offset : u64 ,
hash : Hash ,
2020-04-26 20:39:32 +00:00
size : u64 ,
2020-11-08 14:04:30 +00:00
) -> Result < ( ) , GarageError > {
2020-04-24 18:47:11 +00:00
let mut version = version . clone ( ) ;
2021-03-10 15:21:56 +00:00
version . blocks . put (
VersionBlockKey {
2020-04-26 18:55:13 +00:00
part_number ,
offset ,
2021-03-10 15:21:56 +00:00
} ,
VersionBlock { hash , size } ,
) ;
2020-04-24 18:47:11 +00:00
let block_ref = BlockRef {
block : hash ,
version : version . uuid ,
2021-03-10 15:21:56 +00:00
deleted : false . into ( ) ,
2020-04-24 18:47:11 +00:00
} ;
futures ::try_join! (
garage . version_table . insert ( & version ) ,
garage . block_ref_table . insert ( & block_ref ) ,
) ? ;
Ok ( ( ) )
}
2022-01-17 09:55:31 +00:00
struct StreamChunker < S : Stream < Item = Result < Bytes , Error > > > {
stream : S ,
2020-04-24 18:47:11 +00:00
read_all : bool ,
block_size : usize ,
buf : VecDeque < u8 > ,
}
2022-01-17 09:55:31 +00:00
impl < S : Stream < Item = Result < Bytes , Error > > + Unpin > StreamChunker < S > {
fn new ( stream : S , block_size : usize ) -> Self {
2020-04-24 18:47:11 +00:00
Self {
2022-01-17 09:55:31 +00:00
stream ,
2020-04-24 18:47:11 +00:00
read_all : false ,
block_size ,
2020-11-29 16:06:55 +00:00
buf : VecDeque ::with_capacity ( 2 * block_size ) ,
2020-04-24 18:47:11 +00:00
}
}
2022-01-17 09:55:31 +00:00
async fn next ( & mut self ) -> Result < Option < Vec < u8 > > , Error > {
2020-04-24 18:47:11 +00:00
while ! self . read_all & & self . buf . len ( ) < self . block_size {
2022-01-17 09:55:31 +00:00
if let Some ( block ) = self . stream . next ( ) . await {
2020-04-24 18:47:11 +00:00
let bytes = block ? ;
trace! ( " Body next: {} bytes " , bytes . len ( ) ) ;
2022-01-17 09:55:31 +00:00
self . buf . extend ( bytes ) ;
2020-04-24 18:47:11 +00:00
} else {
self . read_all = true ;
}
}
2022-01-17 09:55:31 +00:00
2021-04-23 20:18:00 +00:00
if self . buf . is_empty ( ) {
2020-04-24 18:47:11 +00:00
Ok ( None )
} else if self . buf . len ( ) < = self . block_size {
let block = self . buf . drain ( .. ) . collect ::< Vec < u8 > > ( ) ;
Ok ( Some ( block ) )
} else {
let block = self . buf . drain ( .. self . block_size ) . collect ::< Vec < u8 > > ( ) ;
Ok ( Some ( block ) )
}
}
}
2021-05-02 21:13:08 +00:00
pub fn put_response ( version_uuid : Uuid , md5sum_hex : String ) -> Response < Body > {
2020-05-01 14:30:50 +00:00
Response ::builder ( )
. header ( " x-amz-version-id " , hex ::encode ( version_uuid ) )
2020-11-29 15:38:01 +00:00
. header ( " ETag " , format! ( " \" {} \" " , md5sum_hex ) )
2020-07-07 15:15:53 +00:00
. body ( Body ::from ( vec! [ ] ) )
2020-05-01 14:30:50 +00:00
. unwrap ( )
2020-04-26 20:39:32 +00:00
}
pub async fn handle_create_multipart_upload (
garage : Arc < Garage > ,
req : & Request < Body > ,
2021-12-14 12:55:11 +00:00
bucket_name : & str ,
bucket_id : Uuid ,
2020-04-26 20:39:32 +00:00
key : & str ,
2020-07-07 15:15:53 +00:00
) -> Result < Response < Body > , Error > {
2020-04-26 20:39:32 +00:00
let version_uuid = gen_uuid ( ) ;
2022-02-21 22:02:30 +00:00
let headers = get_headers ( req . headers ( ) ) ? ;
2020-04-26 20:39:32 +00:00
2021-03-15 14:26:29 +00:00
// Create object in object table
2020-04-26 20:39:32 +00:00
let object_version = ObjectVersion {
uuid : version_uuid ,
timestamp : now_msec ( ) ,
2020-07-08 15:33:24 +00:00
state : ObjectVersionState ::Uploading ( headers ) ,
2020-04-26 20:39:32 +00:00
} ;
2021-12-14 12:55:11 +00:00
let object = Object ::new ( bucket_id , key . to_string ( ) , vec! [ object_version ] ) ;
2020-04-26 20:39:32 +00:00
garage . object_table . insert ( & object ) . await ? ;
2021-03-15 14:26:29 +00:00
// Insert empty version so that block_ref entries refer to something
// (they are inserted concurrently with blocks in the version table, so
// there is the possibility that they are inserted before the version table
// is created, in which case it is allowed to delete them, e.g. in repair_*)
2021-12-14 12:55:11 +00:00
let version = Version ::new ( version_uuid , bucket_id , key . into ( ) , false ) ;
2021-03-15 14:26:29 +00:00
garage . version_table . insert ( & version ) . await ? ;
// Send success response
2021-05-03 20:45:42 +00:00
let result = s3_xml ::InitiateMultipartUploadResult {
xmlns : ( ) ,
2021-12-14 12:55:11 +00:00
bucket : s3_xml ::Value ( bucket_name . to_string ( ) ) ,
2021-05-03 20:45:42 +00:00
key : s3_xml ::Value ( key . to_string ( ) ) ,
upload_id : s3_xml ::Value ( hex ::encode ( version_uuid ) ) ,
} ;
let xml = s3_xml ::to_xml_with_header ( & result ) ? ;
2020-04-26 20:39:32 +00:00
2020-07-07 15:15:53 +00:00
Ok ( Response ::new ( Body ::from ( xml . into_bytes ( ) ) ) )
2020-04-26 20:39:32 +00:00
}
pub async fn handle_put_part (
garage : Arc < Garage > ,
req : Request < Body > ,
2021-12-14 12:55:11 +00:00
bucket_id : Uuid ,
2020-04-26 20:39:32 +00:00
key : & str ,
2021-12-06 14:17:47 +00:00
part_number : u64 ,
2020-04-26 20:39:32 +00:00
upload_id : & str ,
2020-07-15 13:31:13 +00:00
content_sha256 : Option < Hash > ,
2020-07-07 15:15:53 +00:00
) -> Result < Response < Body > , Error > {
2020-11-08 14:04:30 +00:00
let version_uuid = decode_upload_id ( upload_id ) ? ;
2020-04-28 10:18:14 +00:00
2020-07-15 13:31:13 +00:00
let content_md5 = match req . headers ( ) . get ( " content-md5 " ) {
Some ( x ) = > Some ( x . to_str ( ) ? . to_string ( ) ) ,
None = > None ,
} ;
2020-04-26 20:39:32 +00:00
// Read first chuck, and at the same time try to get object to see if it exists
let key = key . to_string ( ) ;
2022-01-17 09:55:31 +00:00
let body = req . into_body ( ) . map_err ( Error ::from ) ;
let mut chunker = StreamChunker ::new ( body , garage . config . block_size ) ;
2021-02-19 11:11:02 +00:00
2022-01-11 16:31:09 +00:00
let ( object , version , first_block ) = futures ::try_join! (
2022-01-17 09:55:31 +00:00
garage
. object_table
. get ( & bucket_id , & key )
. map_err ( Error ::from ) ,
garage
. version_table
. get ( & version_uuid , & EmptyKey )
. map_err ( Error ::from ) ,
chunker . next ( ) ,
2022-01-11 16:31:09 +00:00
) ? ;
2020-04-26 20:39:32 +00:00
// Check object is valid and multipart block can be accepted
2022-01-11 16:31:09 +00:00
let first_block = first_block . ok_or_bad_request ( " Empty body " ) ? ;
let object = object . ok_or_bad_request ( " Object not found " ) ? ;
2020-11-11 15:12:42 +00:00
2020-07-08 15:34:37 +00:00
if ! object
. versions ( )
. iter ( )
. any ( | v | v . uuid = = version_uuid & & v . is_uploading ( ) )
{
2022-01-05 16:07:36 +00:00
return Err ( Error ::NoSuchUpload ) ;
2020-04-26 20:39:32 +00:00
}
2022-01-11 16:31:09 +00:00
// Check part hasn't already been uploaded
if let Some ( v ) = version {
if v . has_part_number ( part_number ) {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request ( format! (
2022-01-11 16:31:09 +00:00
" Part number {} has already been uploaded " ,
part_number
) ) ) ;
}
}
2020-04-26 20:39:32 +00:00
// Copy block to store
2021-12-14 12:55:11 +00:00
let version = Version ::new ( version_uuid , bucket_id , key , false ) ;
2022-07-18 15:18:47 +00:00
let first_block = Bytes ::from ( first_block ) ;
let first_block_hash = async_blake2sum ( first_block . clone ( ) ) . await ;
2021-03-10 15:33:31 +00:00
let ( _ , data_md5sum , data_sha256sum ) = read_and_put_blocks (
2020-04-28 10:18:14 +00:00
& garage ,
2021-03-10 16:01:05 +00:00
& version ,
2020-04-28 10:18:14 +00:00
part_number ,
first_block ,
first_block_hash ,
& mut chunker ,
)
. await ? ;
2020-04-26 20:39:32 +00:00
2021-03-10 16:01:05 +00:00
// Verify that checksums map
2020-11-22 10:04:33 +00:00
ensure_checksum_matches (
2021-03-10 15:33:31 +00:00
data_md5sum . as_slice ( ) ,
data_sha256sum ,
2020-11-22 10:04:33 +00:00
content_md5 . as_deref ( ) ,
content_sha256 ,
) ? ;
2020-07-15 13:31:13 +00:00
2021-03-10 16:01:05 +00:00
// Store part etag in version
let data_md5sum_hex = hex ::encode ( data_md5sum ) ;
let mut version = version ;
2021-03-10 20:50:09 +00:00
version
. parts_etags
. put ( part_number , data_md5sum_hex . clone ( ) ) ;
2021-03-10 16:01:05 +00:00
garage . version_table . insert ( & version ) . await ? ;
2020-11-29 15:38:01 +00:00
let response = Response ::builder ( )
2021-03-10 16:01:05 +00:00
. header ( " ETag " , format! ( " \" {} \" " , data_md5sum_hex ) )
2022-02-21 22:02:30 +00:00
. body ( Body ::empty ( ) )
2020-11-29 15:38:01 +00:00
. unwrap ( ) ;
Ok ( response )
2020-04-26 20:39:32 +00:00
}
pub async fn handle_complete_multipart_upload (
garage : Arc < Garage > ,
2021-02-19 23:13:07 +00:00
req : Request < Body > ,
2021-12-14 12:55:11 +00:00
bucket_name : & str ,
2022-06-15 18:20:28 +00:00
bucket : & Bucket ,
2020-04-26 20:39:32 +00:00
key : & str ,
upload_id : & str ,
2021-02-19 23:13:07 +00:00
content_sha256 : Option < Hash > ,
2020-07-07 15:15:53 +00:00
) -> Result < Response < Body > , Error > {
2021-02-19 23:13:07 +00:00
let body = hyper ::body ::to_bytes ( req . into_body ( ) ) . await ? ;
2022-01-17 09:55:31 +00:00
if let Some ( content_sha256 ) = content_sha256 {
verify_signed_content ( content_sha256 , & body [ .. ] ) ? ;
}
2021-02-19 23:13:07 +00:00
2021-10-26 08:20:05 +00:00
let body_xml = roxmltree ::Document ::parse ( std ::str ::from_utf8 ( & body ) ? ) ? ;
2022-01-17 10:18:40 +00:00
let body_list_of_parts = parse_complete_multipart_upload_body ( & body_xml )
2021-02-23 17:46:25 +00:00
. ok_or_bad_request ( " Invalid CompleteMultipartUpload XML " ) ? ;
debug! (
" CompleteMultipartUpload list of parts: {:?} " ,
body_list_of_parts
) ;
2021-02-19 23:13:07 +00:00
2020-11-08 14:04:30 +00:00
let version_uuid = decode_upload_id ( upload_id ) ? ;
2020-04-26 20:39:32 +00:00
2022-01-12 11:43:33 +00:00
// Get object and version
2020-04-26 20:39:32 +00:00
let key = key . to_string ( ) ;
let ( object , version ) = futures ::try_join! (
2022-06-15 18:20:28 +00:00
garage . object_table . get ( & bucket . id , & key ) ,
2020-04-26 20:39:32 +00:00
garage . version_table . get ( & version_uuid , & EmptyKey ) ,
) ? ;
2020-11-11 15:12:42 +00:00
2022-01-05 16:07:36 +00:00
let object = object . ok_or ( Error ::NoSuchKey ) ? ;
2021-03-15 14:26:29 +00:00
let mut object_version = object
2020-07-08 15:34:37 +00:00
. versions ( )
. iter ( )
2021-03-15 14:26:29 +00:00
. find ( | v | v . uuid = = version_uuid & & v . is_uploading ( ) )
. cloned ( )
2022-01-05 16:07:36 +00:00
. ok_or ( Error ::NoSuchUpload ) ? ;
2020-11-11 15:12:42 +00:00
2022-01-05 16:07:36 +00:00
let version = version . ok_or ( Error ::NoSuchKey ) ? ;
2021-04-23 20:18:00 +00:00
if version . blocks . is_empty ( ) {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request ( " No data was uploaded " ) ) ;
2020-04-26 20:39:32 +00:00
}
2021-02-19 11:11:02 +00:00
2020-07-08 15:34:37 +00:00
let headers = match object_version . state {
2021-04-23 20:18:00 +00:00
ObjectVersionState ::Uploading ( headers ) = > headers ,
2020-07-08 15:34:37 +00:00
_ = > unreachable! ( ) ,
} ;
2020-04-26 20:39:32 +00:00
2022-01-12 11:43:33 +00:00
// Check that part numbers are an increasing sequence.
// (it doesn't need to start at 1 nor to be a continuous sequence,
// see discussion in #192)
if body_list_of_parts . is_empty ( ) {
return Err ( Error ::EntityTooSmall ) ;
}
if ! body_list_of_parts
. iter ( )
. zip ( body_list_of_parts . iter ( ) . skip ( 1 ) )
. all ( | ( p1 , p2 ) | p1 . part_number < p2 . part_number )
{
return Err ( Error ::InvalidPartOrder ) ;
}
2022-01-25 11:25:29 +00:00
// Garage-specific restriction, see #204: part numbers must be
// consecutive starting at 1
if body_list_of_parts [ 0 ] . part_number ! = 1
| | ! body_list_of_parts
. iter ( )
. zip ( body_list_of_parts . iter ( ) . skip ( 1 ) )
. all ( | ( p1 , p2 ) | p1 . part_number + 1 = = p2 . part_number )
{
return Err ( Error ::NotImplemented ( " Garage does not support completing a Multipart upload with non-consecutive part numbers. This is a restriction of Garage's data model, which might be fixed in a future release. See issue #204 for more information on this topic. " . into ( ) ) ) ;
}
2021-02-19 23:13:07 +00:00
// Check that the list of parts they gave us corresponds to the parts we have here
2021-03-12 13:37:46 +00:00
debug! ( " Expected parts from request: {:?} " , body_list_of_parts ) ;
debug! ( " Parts stored in version: {:?} " , version . parts_etags . items ( ) ) ;
2021-03-10 16:01:05 +00:00
let parts = version
. parts_etags
2021-03-10 15:21:56 +00:00
. items ( )
2021-02-23 17:46:25 +00:00
. iter ( )
2021-03-10 16:01:05 +00:00
. map ( | pair | ( & pair . 0 , & pair . 1 ) ) ;
2021-02-23 17:46:25 +00:00
let same_parts = body_list_of_parts
. iter ( )
2021-03-10 16:01:05 +00:00
. map ( | x | ( & x . part_number , & x . etag ) )
. eq ( parts ) ;
2022-01-12 11:43:33 +00:00
if ! same_parts {
return Err ( Error ::InvalidPart ) ;
}
// Check that all blocks belong to one of the parts
let block_parts = version
. blocks
. items ( )
. iter ( )
. map ( | ( bk , _ ) | bk . part_number )
. collect ::< BTreeSet < _ > > ( ) ;
let same_parts = body_list_of_parts
. iter ( )
. map ( | x | x . part_number )
. eq ( block_parts . into_iter ( ) ) ;
2021-02-19 23:13:07 +00:00
if ! same_parts {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request (
" Part numbers in block list and part list do not match. This can happen if a part was partially uploaded. Please abort the multipart upload and try again. "
2021-04-23 20:18:00 +00:00
) ) ;
2021-02-19 23:13:07 +00:00
}
2021-03-10 16:01:05 +00:00
// Calculate etag of final object
// To understand how etags are calculated, read more here:
// https://teppen.io/2018/06/23/aws_s3_etags/
2022-01-12 11:43:33 +00:00
let num_parts = body_list_of_parts . len ( ) ;
2021-03-10 16:01:05 +00:00
let mut etag_md5_hasher = Md5 ::new ( ) ;
for ( _ , etag ) in version . parts_etags . items ( ) . iter ( ) {
etag_md5_hasher . update ( etag . as_bytes ( ) ) ;
}
2021-03-10 20:50:09 +00:00
let etag = format! ( " {} - {} " , hex ::encode ( etag_md5_hasher . finalize ( ) ) , num_parts ) ;
2020-12-05 18:23:46 +00:00
2021-03-10 16:01:05 +00:00
// Calculate total size of final object
2021-03-15 14:26:29 +00:00
let total_size = version . blocks . items ( ) . iter ( ) . map ( | x | x . 1. size ) . sum ( ) ;
2021-03-10 16:01:05 +00:00
2022-06-15 18:20:28 +00:00
if let Err ( e ) = check_quotas ( & garage , bucket , & key , total_size ) . await {
object_version . state = ObjectVersionState ::Aborted ;
let final_object = Object ::new ( bucket . id , key . clone ( ) , vec! [ object_version ] ) ;
garage . object_table . insert ( & final_object ) . await ? ;
return Err ( e ) ;
}
2021-03-10 16:01:05 +00:00
// Write final object version
2020-07-08 15:34:37 +00:00
object_version . state = ObjectVersionState ::Complete ( ObjectVersionData ::FirstBlock (
ObjectVersionMeta {
headers ,
size : total_size ,
2021-05-03 20:45:42 +00:00
etag : etag . clone ( ) ,
2020-07-08 15:34:37 +00:00
} ,
2021-03-10 15:21:56 +00:00
version . blocks . items ( ) [ 0 ] . 1. hash ,
2020-07-08 15:34:37 +00:00
) ) ;
2020-07-08 15:33:24 +00:00
2022-06-15 18:20:28 +00:00
let final_object = Object ::new ( bucket . id , key . clone ( ) , vec! [ object_version ] ) ;
2020-04-26 20:39:32 +00:00
garage . object_table . insert ( & final_object ) . await ? ;
2021-03-10 16:01:05 +00:00
// Send response saying ok we're done
2021-05-03 20:45:42 +00:00
let result = s3_xml ::CompleteMultipartUploadResult {
xmlns : ( ) ,
location : None ,
2021-12-14 12:55:11 +00:00
bucket : s3_xml ::Value ( bucket_name . to_string ( ) ) ,
2021-05-03 20:45:42 +00:00
key : s3_xml ::Value ( key ) ,
2022-01-12 10:41:20 +00:00
etag : s3_xml ::Value ( format! ( " \" {} \" " , etag ) ) ,
2021-05-03 20:45:42 +00:00
} ;
let xml = s3_xml ::to_xml_with_header ( & result ) ? ;
2020-04-26 20:39:32 +00:00
2020-07-07 15:15:53 +00:00
Ok ( Response ::new ( Body ::from ( xml . into_bytes ( ) ) ) )
2020-04-26 20:39:32 +00:00
}
2020-04-26 20:46:21 +00:00
pub async fn handle_abort_multipart_upload (
garage : Arc < Garage > ,
2021-12-14 12:55:11 +00:00
bucket_id : Uuid ,
2020-04-26 20:46:21 +00:00
key : & str ,
upload_id : & str ,
2020-07-07 15:15:53 +00:00
) -> Result < Response < Body > , Error > {
2020-11-08 14:04:30 +00:00
let version_uuid = decode_upload_id ( upload_id ) ? ;
2020-04-26 20:46:21 +00:00
2020-04-28 10:18:14 +00:00
let object = garage
. object_table
2021-12-14 12:55:11 +00:00
. get ( & bucket_id , & key . to_string ( ) )
2020-04-28 10:18:14 +00:00
. await ? ;
2022-01-05 16:07:36 +00:00
let object = object . ok_or ( Error ::NoSuchKey ) ? ;
2020-11-11 15:12:42 +00:00
2020-07-08 15:34:37 +00:00
let object_version = object
. versions ( )
. iter ( )
. find ( | v | v . uuid = = version_uuid & & v . is_uploading ( ) ) ;
2020-04-26 20:46:21 +00:00
let mut object_version = match object_version {
2022-01-05 16:07:36 +00:00
None = > return Err ( Error ::NoSuchUpload ) ,
2020-04-26 20:46:21 +00:00
Some ( x ) = > x . clone ( ) ,
} ;
object_version . state = ObjectVersionState ::Aborted ;
2021-12-14 12:55:11 +00:00
let final_object = Object ::new ( bucket_id , key . to_string ( ) , vec! [ object_version ] ) ;
2020-04-26 20:46:21 +00:00
garage . object_table . insert ( & final_object ) . await ? ;
2020-07-07 15:15:53 +00:00
Ok ( Response ::new ( Body ::from ( vec! [ ] ) ) )
2020-04-26 20:46:21 +00:00
}
2022-02-21 22:02:30 +00:00
fn get_mime_type ( headers : & HeaderMap < HeaderValue > ) -> Result < String , Error > {
Ok ( headers
2020-04-26 20:39:32 +00:00
. get ( hyper ::header ::CONTENT_TYPE )
. map ( | x | x . to_str ( ) )
. unwrap_or ( Ok ( " blob " ) ) ?
. to_string ( ) )
}
2022-02-21 22:02:30 +00:00
pub ( crate ) fn get_headers ( headers : & HeaderMap < HeaderValue > ) -> Result < ObjectVersionHeaders , Error > {
let content_type = get_mime_type ( headers ) ? ;
2021-03-15 15:21:41 +00:00
let mut other = BTreeMap ::new ( ) ;
// Preserve standard headers
let standard_header = vec! [
2020-07-09 15:04:43 +00:00
hyper ::header ::CACHE_CONTROL ,
hyper ::header ::CONTENT_DISPOSITION ,
hyper ::header ::CONTENT_ENCODING ,
hyper ::header ::CONTENT_LANGUAGE ,
hyper ::header ::EXPIRES ,
] ;
2021-03-15 15:21:41 +00:00
for h in standard_header . iter ( ) {
2022-02-21 22:02:30 +00:00
if let Some ( v ) = headers . get ( h ) {
2021-02-19 11:11:02 +00:00
match v . to_str ( ) {
Ok ( v_str ) = > {
other . insert ( h . to_string ( ) , v_str . to_string ( ) ) ;
}
Err ( e ) = > {
warn! ( " Discarding header {}, error in .to_str(): {} " , h , e ) ;
}
2020-07-09 15:04:43 +00:00
}
}
}
2021-03-15 15:21:41 +00:00
// Preserve x-amz-meta- headers
2022-02-21 22:02:30 +00:00
for ( k , v ) in headers . iter ( ) {
2021-03-15 15:21:41 +00:00
if k . as_str ( ) . starts_with ( " x-amz-meta- " ) {
match v . to_str ( ) {
Ok ( v_str ) = > {
other . insert ( k . to_string ( ) , v_str . to_string ( ) ) ;
}
Err ( e ) = > {
warn! ( " Discarding header {}, error in .to_str(): {} " , k , e ) ;
}
}
}
}
2020-07-09 15:04:43 +00:00
Ok ( ObjectVersionHeaders {
content_type ,
2021-02-19 11:38:22 +00:00
other ,
2020-07-09 15:04:43 +00:00
} )
}
2022-01-12 18:04:55 +00:00
pub fn decode_upload_id ( id : & str ) -> Result < Uuid , Error > {
2022-01-05 16:07:36 +00:00
let id_bin = hex ::decode ( id ) . map_err ( | _ | Error ::NoSuchUpload ) ? ;
2020-04-26 20:39:32 +00:00
if id_bin . len ( ) ! = 32 {
2022-01-05 16:07:36 +00:00
return Err ( Error ::NoSuchUpload ) ;
2020-04-26 20:39:32 +00:00
}
let mut uuid = [ 0 u8 ; 32 ] ;
uuid . copy_from_slice ( & id_bin [ .. ] ) ;
2021-05-02 21:13:08 +00:00
Ok ( Uuid ::from ( uuid ) )
2020-04-26 20:39:32 +00:00
}
2021-02-19 23:13:07 +00:00
#[ derive(Debug) ]
struct CompleteMultipartUploadPart {
etag : String ,
part_number : u64 ,
}
2022-01-17 10:18:40 +00:00
fn parse_complete_multipart_upload_body (
2021-02-23 17:46:25 +00:00
xml : & roxmltree ::Document ,
) -> Option < Vec < CompleteMultipartUploadPart > > {
2021-02-19 23:13:07 +00:00
let mut parts = vec! [ ] ;
let root = xml . root ( ) ;
let cmu = root . first_child ( ) ? ;
if ! cmu . has_tag_name ( " CompleteMultipartUpload " ) {
return None ;
}
for item in cmu . children ( ) {
2022-01-17 10:18:40 +00:00
// Only parse <Part> nodes
if ! item . is_element ( ) {
continue ;
}
2021-02-19 23:13:07 +00:00
if item . has_tag_name ( " Part " ) {
let etag = item . children ( ) . find ( | e | e . has_tag_name ( " ETag " ) ) ? . text ( ) ? ;
2021-02-23 17:46:25 +00:00
let part_number = item
. children ( )
. find ( | e | e . has_tag_name ( " PartNumber " ) ) ?
. text ( ) ? ;
parts . push ( CompleteMultipartUploadPart {
2021-02-19 23:13:07 +00:00
etag : etag . trim_matches ( '"' ) . to_string ( ) ,
part_number : part_number . parse ( ) . ok ( ) ? ,
} ) ;
} else {
return None ;
}
}
Some ( parts )
}