2023-05-03 10:02:59 +00:00
use std ::collections ::{ BTreeMap , HashMap } ;
2020-04-24 18:47:11 +00:00
use std ::sync ::Arc ;
2023-01-23 19:14:07 +00:00
use base64 ::prelude ::* ;
2022-02-18 16:05:19 +00:00
use futures ::prelude ::* ;
2023-04-18 16:03:10 +00:00
use futures ::try_join ;
2022-01-17 09:55:31 +00:00
use hyper ::body ::{ Body , Bytes } ;
2022-02-21 22:02:30 +00:00
use hyper ::header ::{ HeaderMap , HeaderValue } ;
2022-01-17 09:55:31 +00:00
use hyper ::{ Request , Response } ;
2020-11-22 10:04:33 +00:00
use md5 ::{ digest ::generic_array ::* , Digest as Md5Digest , Md5 } ;
2021-03-16 14:58:40 +00:00
use sha2 ::Sha256 ;
2020-04-24 18:47:11 +00:00
2022-07-18 16:40:57 +00:00
use opentelemetry ::{
trace ::{ FutureExt as OtelFutureExt , TraceContextExt , Tracer } ,
Context ,
} ;
2022-09-02 11:46:42 +00:00
use garage_rpc ::netapp ::bytes_buf ::BytesBuf ;
2020-04-28 10:18:14 +00:00
use garage_table ::* ;
2022-07-18 15:18:47 +00:00
use garage_util ::async_hash ::* ;
2020-04-24 18:47:11 +00:00
use garage_util ::data ::* ;
2020-11-08 14:04:30 +00:00
use garage_util ::error ::Error as GarageError ;
2021-03-15 15:21:41 +00:00
use garage_util ::time ::* ;
2020-04-24 18:47:11 +00:00
2022-03-15 11:04:12 +00:00
use garage_block ::manager ::INLINE_THRESHOLD ;
2022-06-15 18:20:28 +00:00
use garage_model ::bucket_table ::Bucket ;
2020-07-07 11:59:22 +00:00
use garage_model ::garage ::Garage ;
2022-06-15 18:20:28 +00:00
use garage_model ::index_counter ::CountedItem ;
2022-05-10 11:16:57 +00:00
use garage_model ::s3 ::block_ref_table ::* ;
use garage_model ::s3 ::object_table ::* ;
use garage_model ::s3 ::version_table ::* ;
2020-04-24 18:47:11 +00:00
2022-05-24 10:16:39 +00:00
use crate ::s3 ::error ::* ;
2020-04-26 20:39:32 +00:00
2020-04-24 18:47:11 +00:00
pub async fn handle_put (
garage : Arc < Garage > ,
2020-04-26 20:39:32 +00:00
req : Request < Body > ,
2022-06-15 18:20:28 +00:00
bucket : & Bucket ,
2023-04-18 16:03:10 +00:00
key : & String ,
2022-02-18 16:05:19 +00:00
content_sha256 : Option < Hash > ,
2020-07-07 15:15:53 +00:00
) -> Result < Response < Body > , Error > {
2021-02-19 11:11:02 +00:00
// Retrieve interesting headers from request
2022-02-21 22:02:30 +00:00
let headers = get_headers ( req . headers ( ) ) ? ;
2021-02-19 11:38:22 +00:00
debug! ( " Object headers: {:?} " , headers ) ;
2020-07-15 13:31:13 +00:00
let content_md5 = match req . headers ( ) . get ( " content-md5 " ) {
Some ( x ) = > Some ( x . to_str ( ) ? . to_string ( ) ) ,
None = > None ,
} ;
2020-07-08 15:33:24 +00:00
2022-02-18 16:05:19 +00:00
let ( _head , body ) = req . into_parts ( ) ;
2022-01-17 09:55:31 +00:00
let body = body . map_err ( Error ::from ) ;
2022-02-21 22:02:30 +00:00
save_stream (
garage ,
headers ,
body ,
2022-06-15 18:20:28 +00:00
bucket ,
2022-02-21 22:02:30 +00:00
key ,
content_md5 ,
content_sha256 ,
)
. await
. map ( | ( uuid , md5 ) | put_response ( uuid , md5 ) )
}
/// Save a complete object, streamed from `body`, under `key` in `bucket`.
///
/// Small bodies (below `INLINE_THRESHOLD`) are stored inline in the object
/// table; larger bodies are cut into blocks that are stored separately.
///
/// Returns the UUID of the new object version together with the hex-encoded
/// MD5 sum of the data (used as the ETag by callers).
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
    garage: Arc<Garage>,
    headers: ObjectVersionHeaders,
    body: S,
    bucket: &Bucket,
    key: &String,
    content_md5: Option<String>,
    content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
    let mut chunker = StreamChunker::new(body, garage.config.block_size);
    // Read the first chunk of the body and fetch the pre-existing object (if
    // any) concurrently; the existing object is needed both for timestamping
    // and for quota accounting.
    let (first_block_opt, existing_object) = try_join!(
        chunker.next(),
        garage
            .object_table
            .get(&bucket.id, key)
            .map_err(Error::from),
    )?;

    // An empty body yields no chunk at all; treat it as an empty first block
    let first_block = first_block_opt.unwrap_or_default();

    // Generate identity of new version
    let version_uuid = gen_uuid();
    let version_timestamp = next_timestamp(existing_object.as_ref());

    // If body is small enough, store it directly in the object table
    // as "inline data". We can then return immediately.
    if first_block.len() < INLINE_THRESHOLD {
        let mut md5sum = Md5::new();
        md5sum.update(&first_block[..]);
        let data_md5sum = md5sum.finalize();
        let data_md5sum_hex = hex::encode(data_md5sum);

        let data_sha256sum = sha256sum(&first_block[..]);
        let size = first_block.len() as u64;

        // Validate data against the checksums supplied by the client, if any
        ensure_checksum_matches(
            data_md5sum.as_slice(),
            data_sha256sum,
            content_md5.as_deref(),
            content_sha256,
        )?;

        check_quotas(&garage, bucket, size, existing_object.as_ref()).await?;

        let object_version = ObjectVersion {
            uuid: version_uuid,
            timestamp: version_timestamp,
            state: ObjectVersionState::Complete(ObjectVersionData::Inline(
                ObjectVersionMeta {
                    headers,
                    size,
                    etag: data_md5sum_hex.clone(),
                },
                first_block.to_vec(),
            )),
        };

        let object = Object::new(bucket.id, key.into(), vec![object_version]);
        garage.object_table.insert(&object).await?;

        return Ok((version_uuid, data_md5sum_hex));
    }

    // The following consists in many steps that can each fail.
    // Keep track that some cleanup will be needed if things fail
    // before everything is finished (cleanup is done using the Drop trait).
    let mut interrupted_cleanup = InterruptedCleanup(Some(InterruptedCleanupInner {
        garage: garage.clone(),
        bucket_id: bucket.id,
        key: key.into(),
        version_uuid,
        version_timestamp,
    }));

    // Write version identifier in object table so that we have a trace
    // that we are uploading something
    let mut object_version = ObjectVersion {
        uuid: version_uuid,
        timestamp: version_timestamp,
        state: ObjectVersionState::Uploading {
            headers: headers.clone(),
            multipart: false,
        },
    };
    let object = Object::new(bucket.id, key.into(), vec![object_version.clone()]);
    garage.object_table.insert(&object).await?;

    // Initialize corresponding entry in version table
    // Write this entry now, even with empty block list,
    // to prevent block_ref entries from being deleted (they can be deleted
    // if the reference a version that isn't found in the version table)
    let version = Version::new(
        version_uuid,
        VersionBacklink::Object {
            bucket_id: bucket.id,
            key: key.into(),
        },
        false,
    );
    garage.version_table.insert(&version).await?;

    // Transfer data and verify checksum
    let first_block_hash = async_blake2sum(first_block.clone()).await;

    let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
        &garage,
        &version,
        1,
        first_block,
        first_block_hash,
        &mut chunker,
    )
    .await?;

    ensure_checksum_matches(
        data_md5sum.as_slice(),
        data_sha256sum,
        content_md5.as_deref(),
        content_sha256,
    )?;

    // Quotas can only be checked now, as the total size was unknown until
    // the whole body had been read
    check_quotas(&garage, bucket, total_size, existing_object.as_ref()).await?;

    // Save final object state, marked as Complete
    let md5sum_hex = hex::encode(data_md5sum);
    object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
        ObjectVersionMeta {
            headers,
            size: total_size,
            etag: md5sum_hex.clone(),
        },
        first_block_hash,
    ));
    let object = Object::new(bucket.id, key.into(), vec![object_version]);
    garage.object_table.insert(&object).await?;

    // We were not interrupted, everything went fine.
    // We won't have to clean up on drop.
    interrupted_cleanup.cancel();

    Ok((version_uuid, md5sum_hex))
}
/// Validate MD5 sum against content-md5 header
/// and sha256sum against signed content-sha256
2023-05-03 10:02:59 +00:00
pub ( crate ) fn ensure_checksum_matches (
2021-03-10 15:33:31 +00:00
data_md5sum : & [ u8 ] ,
data_sha256sum : garage_util ::data ::FixedBytes32 ,
2020-11-22 10:04:33 +00:00
content_md5 : Option < & str > ,
content_sha256 : Option < garage_util ::data ::FixedBytes32 > ,
) -> Result < ( ) , Error > {
2020-07-15 13:31:13 +00:00
if let Some ( expected_sha256 ) = content_sha256 {
2021-03-10 15:33:31 +00:00
if expected_sha256 ! = data_sha256sum {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request (
" Unable to validate x-amz-content-sha256 " ,
2021-04-23 20:18:00 +00:00
) ) ;
2020-07-15 13:41:49 +00:00
} else {
trace! ( " Successfully validated x-amz-content-sha256 " ) ;
2020-07-15 13:31:13 +00:00
}
}
if let Some ( expected_md5 ) = content_md5 {
2023-01-23 19:14:07 +00:00
if expected_md5 . trim_matches ( '"' ) ! = BASE64_STANDARD . encode ( data_md5sum ) {
2022-05-24 10:16:39 +00:00
return Err ( Error ::bad_request ( " Unable to validate content-md5 " ) ) ;
2020-07-15 13:41:49 +00:00
} else {
trace! ( " Successfully validated content-md5 " ) ;
2020-07-15 13:31:13 +00:00
}
}
2020-11-22 10:04:33 +00:00
Ok ( ( ) )
2020-04-26 20:39:32 +00:00
}
2022-06-15 18:20:28 +00:00
/// Check that inserting this object with this size doesn't exceed bucket quotas
2023-05-03 10:02:59 +00:00
pub ( crate ) async fn check_quotas (
2022-06-15 18:20:28 +00:00
garage : & Arc < Garage > ,
bucket : & Bucket ,
size : u64 ,
2023-10-18 14:36:48 +00:00
prev_object : Option < & Object > ,
2022-06-15 18:20:28 +00:00
) -> Result < ( ) , Error > {
let quotas = bucket . state . as_option ( ) . unwrap ( ) . quotas . get ( ) ;
if quotas . max_objects . is_none ( ) & & quotas . max_size . is_none ( ) {
return Ok ( ( ) ) ;
} ;
2023-10-18 14:36:48 +00:00
let counters = garage
. object_counter_table
. table
. get ( & bucket . id , & EmptyKey )
. await ? ;
2022-06-15 18:20:28 +00:00
let counters = counters
. map ( | x | x . filtered_values ( & garage . system . ring . borrow ( ) ) )
. unwrap_or_default ( ) ;
let ( prev_cnt_obj , prev_cnt_size ) = match prev_object {
Some ( o ) = > {
let prev_cnt = o . counts ( ) . into_iter ( ) . collect ::< HashMap < _ , _ > > ( ) ;
(
prev_cnt . get ( OBJECTS ) . cloned ( ) . unwrap_or_default ( ) ,
prev_cnt . get ( BYTES ) . cloned ( ) . unwrap_or_default ( ) ,
)
}
None = > ( 0 , 0 ) ,
} ;
let cnt_obj_diff = 1 - prev_cnt_obj ;
let cnt_size_diff = size as i64 - prev_cnt_size ;
if let Some ( mo ) = quotas . max_objects {
let current_objects = counters . get ( OBJECTS ) . cloned ( ) . unwrap_or_default ( ) ;
if cnt_obj_diff > 0 & & current_objects + cnt_obj_diff > mo as i64 {
return Err ( Error ::forbidden ( format! (
" Object quota is reached, maximum objects for this bucket: {} " ,
mo
) ) ) ;
}
}
if let Some ( ms ) = quotas . max_size {
let current_size = counters . get ( BYTES ) . cloned ( ) . unwrap_or_default ( ) ;
if cnt_size_diff > 0 & & current_size + cnt_size_diff > ms as i64 {
return Err ( Error ::forbidden ( format! (
" Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes. " ,
2023-10-18 14:36:48 +00:00
ms , current_size , cnt_size_diff
2022-06-15 18:20:28 +00:00
) ) ) ;
}
}
Ok ( ( ) )
}
2023-05-03 10:02:59 +00:00
/// Read the remaining blocks from the chunker and store each block together
/// with its metadata (version-table block list + block_ref entry).
///
/// Each loop iteration pipelines three operations: storing the current block's
/// data, storing its metadata, and fetching the next block from the chunker.
/// MD5 and SHA-256 digests of the whole payload are accumulated along the way.
///
/// Returns `(total size in bytes, md5 digest, sha256 hash)` of the data.
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
    garage: &Garage,
    version: &Version,
    part_number: u64,
    first_block: Bytes,
    first_block_hash: Hash,
    chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
    let tracer = opentelemetry::global::tracer("garage");

    // NOTE(review): AsyncHasher::update returns a future that is awaited, so
    // hashing presumably proceeds off the current task — see garage_util::async_hash
    let md5hasher = AsyncHasher::<Md5>::new();
    let sha256hasher = AsyncHasher::<Sha256>::new();

    // Feed the first block (already read by the caller) into both hashers
    futures::future::join(
        md5hasher.update(first_block.clone()),
        sha256hasher.update(first_block.clone()),
    )
    .with_context(Context::current_with_span(
        tracer.start("Hash first block (md5, sha256)"),
    ))
    .await;

    let mut next_offset = first_block.len();
    let mut put_curr_version_block = put_block_meta(
        garage,
        version,
        part_number,
        0,
        first_block_hash,
        first_block.len() as u64,
    );
    let mut put_curr_block = garage
        .block_manager
        .rpc_put_block(first_block_hash, first_block);

    loop {
        // Wait for the current block's data and metadata to be stored, while
        // simultaneously reading the next block from the body
        let (_, _, next_block) = futures::try_join!(
            put_curr_block.map_err(Error::from),
            put_curr_version_block.map_err(Error::from),
            chunker.next(),
        )?;
        if let Some(block) = next_block {
            // Compute md5, sha256 (whole-payload digests) and blake2
            // (per-block identifier) concurrently
            let (_, _, block_hash) = futures::future::join3(
                md5hasher.update(block.clone()),
                sha256hasher.update(block.clone()),
                async_blake2sum(block.clone()),
            )
            .with_context(Context::current_with_span(
                tracer.start("Hash block (md5, sha256, blake2)"),
            ))
            .await;
            let block_len = block.len();
            put_curr_version_block = put_block_meta(
                garage,
                version,
                part_number,
                next_offset as u64,
                block_hash,
                block_len as u64,
            );
            put_curr_block = garage.block_manager.rpc_put_block(block_hash, block);
            next_offset += block_len;
        } else {
            // Stream exhausted: the last block was stored by the try_join! above
            break;
        }
    }

    let total_size = next_offset as u64;
    let data_md5sum = md5hasher.finalize().await;

    let data_sha256sum = sha256hasher.finalize().await;
    // A sha256 digest is always 32 bytes, so this conversion cannot fail
    let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();

    Ok((total_size, data_md5sum, data_sha256sum))
}
async fn put_block_meta (
2021-03-15 14:26:29 +00:00
garage : & Garage ,
2020-04-24 18:47:11 +00:00
version : & Version ,
2020-04-26 18:55:13 +00:00
part_number : u64 ,
2020-04-24 18:47:11 +00:00
offset : u64 ,
hash : Hash ,
2020-04-26 20:39:32 +00:00
size : u64 ,
2020-11-08 14:04:30 +00:00
) -> Result < ( ) , GarageError > {
2020-04-24 18:47:11 +00:00
let mut version = version . clone ( ) ;
2021-03-10 15:21:56 +00:00
version . blocks . put (
VersionBlockKey {
2020-04-26 18:55:13 +00:00
part_number ,
offset ,
2021-03-10 15:21:56 +00:00
} ,
VersionBlock { hash , size } ,
) ;
2020-04-24 18:47:11 +00:00
let block_ref = BlockRef {
block : hash ,
version : version . uuid ,
2021-03-10 15:21:56 +00:00
deleted : false . into ( ) ,
2020-04-24 18:47:11 +00:00
} ;
futures ::try_join! (
garage . version_table . insert ( & version ) ,
garage . block_ref_table . insert ( & block_ref ) ,
) ? ;
Ok ( ( ) )
}
2023-05-03 10:02:59 +00:00
/// Cuts a stream of `Bytes` into chunks of at most `block_size` bytes.
pub(crate) struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
    // Underlying byte stream
    stream: S,
    // True once the underlying stream has been exhausted
    read_all: bool,
    // Maximum size of the chunks to produce
    block_size: usize,
    // Bytes read from the stream but not yet returned as a chunk
    buf: BytesBuf,
}
2022-01-17 09:55:31 +00:00
impl < S : Stream < Item = Result < Bytes , Error > > + Unpin > StreamChunker < S > {
2023-05-03 10:02:59 +00:00
pub ( crate ) fn new ( stream : S , block_size : usize ) -> Self {
2020-04-24 18:47:11 +00:00
Self {
2022-01-17 09:55:31 +00:00
stream ,
2020-04-24 18:47:11 +00:00
read_all : false ,
block_size ,
2022-09-02 11:46:42 +00:00
buf : BytesBuf ::new ( ) ,
2020-04-24 18:47:11 +00:00
}
}
2022-01-17 09:55:31 +00:00
2023-05-03 10:02:59 +00:00
pub ( crate ) async fn next ( & mut self ) -> Result < Option < Bytes > , Error > {
2022-09-02 11:46:42 +00:00
while ! self . read_all & & self . buf . len ( ) < self . block_size {
2022-01-17 09:55:31 +00:00
if let Some ( block ) = self . stream . next ( ) . await {
2020-04-24 18:47:11 +00:00
let bytes = block ? ;
trace! ( " Body next: {} bytes " , bytes . len ( ) ) ;
2022-09-02 11:46:42 +00:00
self . buf . extend ( bytes ) ;
2020-04-24 18:47:11 +00:00
} else {
self . read_all = true ;
}
}
2022-01-17 09:55:31 +00:00
2022-09-02 11:46:42 +00:00
if self . buf . is_empty ( ) {
2020-04-24 18:47:11 +00:00
Ok ( None )
} else {
2022-09-02 11:46:42 +00:00
Ok ( Some ( self . buf . take_max ( self . block_size ) ) )
2020-04-24 18:47:11 +00:00
}
}
}
2021-05-02 21:13:08 +00:00
pub fn put_response ( version_uuid : Uuid , md5sum_hex : String ) -> Response < Body > {
2020-05-01 14:30:50 +00:00
Response ::builder ( )
. header ( " x-amz-version-id " , hex ::encode ( version_uuid ) )
2020-11-29 15:38:01 +00:00
. header ( " ETag " , format! ( " \" {} \" " , md5sum_hex ) )
2020-07-07 15:15:53 +00:00
. body ( Body ::from ( vec! [ ] ) )
2020-05-01 14:30:50 +00:00
. unwrap ( )
2020-04-26 20:39:32 +00:00
}
2023-06-06 13:18:45 +00:00
// Drop guard: while it still holds its inner state, the PutObject operation
// is considered interrupted and the Drop impl will record the aborted version.
// Call `cancel()` on success to disarm it.
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
// Everything needed to write the Aborted version marker from the Drop impl.
struct InterruptedCleanupInner {
    garage: Arc<Garage>,
    bucket_id: Uuid,
    key: String,
    version_uuid: Uuid,
    version_timestamp: u64,
}
2023-01-03 15:48:51 +00:00
impl InterruptedCleanup {
    /// Disarm the guard: the operation completed successfully, so the Drop
    /// impl must not record an aborted version.
    fn cancel(&mut self) {
        self.0 = None;
    }
}
impl Drop for InterruptedCleanup {
    fn drop(&mut self) {
        // If cancel() was never called, the upload was interrupted mid-way:
        // spawn a task that marks the version as Aborted in the object table.
        // (Drop cannot be async, hence the tokio::spawn.)
        if let Some(info) = self.0.take() {
            tokio::spawn(async move {
                let object_version = ObjectVersion {
                    uuid: info.version_uuid,
                    timestamp: info.version_timestamp,
                    state: ObjectVersionState::Aborted,
                };
                let object = Object::new(info.bucket_id, info.key, vec![object_version]);
                // Best-effort: a failed cleanup is only logged, never propagated
                if let Err(e) = info.garage.object_table.insert(&object).await {
                    warn!("Cannot cleanup after aborted PutObject: {}", e);
                }
            });
        }
    }
}
2023-05-03 10:02:59 +00:00
// ============ helpers ============
2021-02-19 11:11:02 +00:00
2023-05-03 10:02:59 +00:00
pub ( crate ) fn get_mime_type ( headers : & HeaderMap < HeaderValue > ) -> Result < String , Error > {
2022-02-21 22:02:30 +00:00
Ok ( headers
2020-04-26 20:39:32 +00:00
. get ( hyper ::header ::CONTENT_TYPE )
. map ( | x | x . to_str ( ) )
. unwrap_or ( Ok ( " blob " ) ) ?
. to_string ( ) )
}
2022-02-21 22:02:30 +00:00
pub ( crate ) fn get_headers ( headers : & HeaderMap < HeaderValue > ) -> Result < ObjectVersionHeaders , Error > {
let content_type = get_mime_type ( headers ) ? ;
2021-03-15 15:21:41 +00:00
let mut other = BTreeMap ::new ( ) ;
// Preserve standard headers
let standard_header = vec! [
2020-07-09 15:04:43 +00:00
hyper ::header ::CACHE_CONTROL ,
hyper ::header ::CONTENT_DISPOSITION ,
hyper ::header ::CONTENT_ENCODING ,
hyper ::header ::CONTENT_LANGUAGE ,
hyper ::header ::EXPIRES ,
] ;
2021-03-15 15:21:41 +00:00
for h in standard_header . iter ( ) {
2022-02-21 22:02:30 +00:00
if let Some ( v ) = headers . get ( h ) {
2021-02-19 11:11:02 +00:00
match v . to_str ( ) {
Ok ( v_str ) = > {
other . insert ( h . to_string ( ) , v_str . to_string ( ) ) ;
}
Err ( e ) = > {
warn! ( " Discarding header {}, error in .to_str(): {} " , h , e ) ;
}
2020-07-09 15:04:43 +00:00
}
}
}
2021-03-15 15:21:41 +00:00
// Preserve x-amz-meta- headers
2022-02-21 22:02:30 +00:00
for ( k , v ) in headers . iter ( ) {
2021-03-15 15:21:41 +00:00
if k . as_str ( ) . starts_with ( " x-amz-meta- " ) {
match v . to_str ( ) {
Ok ( v_str ) = > {
other . insert ( k . to_string ( ) , v_str . to_string ( ) ) ;
}
Err ( e ) = > {
warn! ( " Discarding header {}, error in .to_str(): {} " , k , e ) ;
}
}
}
}
2020-07-09 15:04:43 +00:00
Ok ( ObjectVersionHeaders {
content_type ,
2021-02-19 11:38:22 +00:00
other ,
2020-07-09 15:04:43 +00:00
} )
}
2023-10-20 11:37:37 +00:00
2023-10-20 11:55:34 +00:00
pub ( crate ) fn next_timestamp ( existing_object : Option < & Object > ) -> u64 {
2023-10-20 11:37:37 +00:00
existing_object
. as_ref ( )
. and_then ( | obj | obj . versions ( ) . iter ( ) . map ( | v | v . timestamp ) . max ( ) )
. map ( | t | std ::cmp ::max ( t + 1 , now_msec ( ) ) )
. unwrap_or_else ( now_msec )
}