use std::collections::HashMap;
use std::sync::Arc;

use futures::prelude::*;
use futures::stream::FuturesOrdered;
use futures::try_join;

use tokio::sync::mpsc;

use hyper::body::Bytes;
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};

use opentelemetry::{
	trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
	Context,
};

use garage_net::bytes_buf::BytesBuf;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;

use garage_block::manager::INLINE_THRESHOLD;

use garage_model::garage::Garage;
use garage_model::index_counter::CountedItem;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;

use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::checksum::*;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
2024-02-26 18:21:17 +01:00
/// Maximum number of block writes to storage nodes that may be in
/// flight simultaneously for one upload (see `read_and_put_blocks`).
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
2024-03-21 14:06:59 +01:00
pub ( crate ) struct SaveStreamResult {
pub ( crate ) version_uuid : Uuid ,
pub ( crate ) version_timestamp : u64 ,
2024-02-23 16:49:50 +01:00
/// Etag WITHOUT THE QUOTES (just the hex value)
2024-03-21 14:06:59 +01:00
pub ( crate ) etag : String ,
}
pub ( crate ) enum ChecksumMode < ' a > {
Verify ( & ' a ExpectedChecksums ) ,
Calculate ( Option < ChecksumAlgorithm > ) ,
2024-02-23 16:49:50 +01:00
}
2020-04-24 18:47:11 +00:00
pub async fn handle_put (
2024-03-03 14:56:52 +01:00
ctx : ReqCtx ,
2024-02-05 18:49:54 +01:00
req : Request < ReqBody > ,
2023-04-18 18:03:10 +02:00
key : & String ,
2022-02-18 17:05:19 +01:00
content_sha256 : Option < Hash > ,
2024-02-05 18:49:54 +01:00
) -> Result < Response < ResBody > , Error > {
2021-02-19 12:11:02 +01:00
// Retrieve interesting headers from request
2022-02-21 23:02:30 +01:00
let headers = get_headers ( req . headers ( ) ) ? ;
2021-02-19 12:38:22 +01:00
debug! ( " Object headers: {:?} " , headers ) ;
2024-03-21 14:06:59 +01:00
let expected_checksums = ExpectedChecksums {
md5 : match req . headers ( ) . get ( " content-md5 " ) {
Some ( x ) = > Some ( x . to_str ( ) ? . to_string ( ) ) ,
None = > None ,
} ,
sha256 : content_sha256 ,
extra : request_checksum_value ( req . headers ( ) ) ? ,
} ;
2024-02-23 16:49:50 +01:00
2024-03-21 14:06:59 +01:00
let meta = ObjectVersionMetaInner {
headers ,
checksum : expected_checksums . extra ,
2020-07-15 15:31:13 +02:00
} ;
2020-07-08 17:33:24 +02:00
2024-03-21 14:06:59 +01:00
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams ::new_from_headers ( & ctx . garage , req . headers ( ) ) ? ;
2024-02-07 15:25:49 +01:00
let stream = body_stream ( req . into_body ( ) ) ;
2022-01-17 10:55:31 +01:00
2024-02-23 16:49:50 +01:00
let res = save_stream (
& ctx ,
2024-03-21 14:06:59 +01:00
meta ,
2024-02-23 16:49:50 +01:00
encryption ,
stream ,
key ,
2024-03-21 14:06:59 +01:00
ChecksumMode ::Verify ( & expected_checksums ) ,
2024-02-23 16:49:50 +01:00
)
. await ? ;
let mut resp = Response ::builder ( )
. header ( " x-amz-version-id " , hex ::encode ( res . version_uuid ) )
. header ( " ETag " , format! ( " \" {} \" " , res . etag ) ) ;
encryption . add_response_headers ( & mut resp ) ;
2024-03-21 14:06:59 +01:00
let resp = add_checksum_response_headers ( & expected_checksums . extra , resp ) ;
2024-02-23 16:49:50 +01:00
Ok ( resp . body ( empty_body ( ) ) ? )
2022-02-21 23:02:30 +01:00
}
pub ( crate ) async fn save_stream < S : Stream < Item = Result < Bytes , Error > > + Unpin > (
2024-03-03 14:56:52 +01:00
ctx : & ReqCtx ,
2024-03-21 14:06:59 +01:00
mut meta : ObjectVersionMetaInner ,
2024-02-23 16:49:50 +01:00
encryption : EncryptionParams ,
2022-02-21 23:02:30 +01:00
body : S ,
2023-04-18 18:03:10 +02:00
key : & String ,
2024-03-21 14:06:59 +01:00
checksum_mode : ChecksumMode < '_ > ,
2024-02-23 16:49:50 +01:00
) -> Result < SaveStreamResult , Error > {
2024-03-03 14:56:52 +01:00
let ReqCtx {
garage , bucket_id , ..
} = ctx ;
2023-04-18 18:03:10 +02:00
let mut chunker = StreamChunker ::new ( body , garage . config . block_size ) ;
let ( first_block_opt , existing_object ) = try_join! (
chunker . next ( ) ,
2024-03-03 14:56:52 +01:00
garage . object_table . get ( bucket_id , key ) . map_err ( Error ::from ) ,
2023-04-18 18:03:10 +02:00
) ? ;
let first_block = first_block_opt . unwrap_or_default ( ) ;
2022-02-21 23:02:30 +01:00
// Generate identity of new version
let version_uuid = gen_uuid ( ) ;
2023-10-20 13:55:34 +02:00
let version_timestamp = next_timestamp ( existing_object . as_ref ( ) ) ;
2020-04-24 18:47:11 +00:00
2024-03-21 14:06:59 +01:00
let mut checksummer = match checksum_mode {
ChecksumMode ::Verify ( expected ) = > Checksummer ::init ( expected , ! encryption . is_encrypted ( ) ) ,
ChecksumMode ::Calculate ( algo ) = > {
Checksummer ::init ( & Default ::default ( ) , ! encryption . is_encrypted ( ) ) . add ( algo )
}
} ;
2021-02-19 12:11:02 +01:00
// If body is small enough, store it directly in the object table
// as "inline data". We can then return immediately.
2020-04-24 18:47:11 +00:00
if first_block . len ( ) < INLINE_THRESHOLD {
2024-03-21 14:06:59 +01:00
checksummer . update ( & first_block ) ;
let checksums = checksummer . finalize ( ) ;
2020-11-22 11:14:46 +01:00
2024-03-21 14:06:59 +01:00
match checksum_mode {
ChecksumMode ::Verify ( expected ) = > {
checksums . verify ( & expected ) ? ;
}
ChecksumMode ::Calculate ( algo ) = > {
meta . checksum = checksums . extract ( algo ) ;
}
} ;
2020-11-22 11:14:46 +01:00
2024-02-23 16:49:50 +01:00
let size = first_block . len ( ) as u64 ;
2024-03-03 14:56:52 +01:00
check_quotas ( ctx , size , existing_object . as_ref ( ) ) . await ? ;
2022-06-15 20:20:28 +02:00
2024-03-21 14:06:59 +01:00
let etag = encryption . etag_from_md5 ( & checksums . md5 ) ;
2024-02-23 16:49:50 +01:00
let inline_data = encryption . encrypt_blob ( & first_block ) ? . to_vec ( ) ;
2021-02-19 12:11:02 +01:00
let object_version = ObjectVersion {
uuid : version_uuid ,
timestamp : version_timestamp ,
state : ObjectVersionState ::Complete ( ObjectVersionData ::Inline (
ObjectVersionMeta {
2024-03-21 14:06:59 +01:00
encryption : encryption . encrypt_meta ( meta ) ? ,
2022-06-15 20:20:28 +02:00
size ,
2024-02-23 16:49:50 +01:00
etag : etag . clone ( ) ,
2021-02-19 12:11:02 +01:00
} ,
2024-02-23 16:49:50 +01:00
inline_data ,
2021-02-19 12:11:02 +01:00
) ) ,
} ;
2020-04-24 18:47:11 +00:00
2024-03-03 14:56:52 +01:00
let object = Object ::new ( * bucket_id , key . into ( ) , vec! [ object_version ] ) ;
2020-04-24 18:47:11 +00:00
garage . object_table . insert ( & object ) . await ? ;
2021-02-19 12:11:02 +01:00
2024-02-23 16:49:50 +01:00
return Ok ( SaveStreamResult {
version_uuid ,
version_timestamp ,
etag ,
} ) ;
2020-04-24 18:47:11 +00:00
}
2023-01-03 16:48:51 +01:00
// The following consists in many steps that can each fail.
// Keep track that some cleanup will be needed if things fail
// before everything is finished (cleanup is done using the Drop trait).
2023-06-06 15:18:45 +02:00
let mut interrupted_cleanup = InterruptedCleanup ( Some ( InterruptedCleanupInner {
garage : garage . clone ( ) ,
2024-03-03 14:56:52 +01:00
bucket_id : * bucket_id ,
2023-06-06 15:18:45 +02:00
key : key . into ( ) ,
2023-01-03 16:48:51 +01:00
version_uuid ,
version_timestamp ,
2023-06-06 15:18:45 +02:00
} ) ) ;
2023-01-03 16:48:51 +01:00
2021-02-19 12:11:02 +01:00
// Write version identifier in object table so that we have a trace
// that we are uploading something
let mut object_version = ObjectVersion {
uuid : version_uuid ,
2021-03-15 15:26:29 +01:00
timestamp : version_timestamp ,
2023-05-03 12:02:59 +02:00
state : ObjectVersionState ::Uploading {
2024-03-21 14:06:59 +01:00
encryption : encryption . encrypt_meta ( meta . clone ( ) ) ? ,
checksum_algorithm : None , // don't care; overwritten later
2023-05-03 12:02:59 +02:00
multipart : false ,
} ,
2021-02-19 12:11:02 +01:00
} ;
2024-03-03 14:56:52 +01:00
let object = Object ::new ( * bucket_id , key . into ( ) , vec! [ object_version . clone ( ) ] ) ;
2020-04-24 18:47:11 +00:00
garage . object_table . insert ( & object ) . await ? ;
2021-02-19 12:11:02 +01:00
// Initialize corresponding entry in version table
2021-03-15 15:26:29 +01:00
// Write this entry now, even with empty block list,
// to prevent block_ref entries from being deleted (they can be deleted
// if the reference a version that isn't found in the version table)
2023-05-03 12:02:59 +02:00
let version = Version ::new (
version_uuid ,
VersionBacklink ::Object {
2024-03-03 14:56:52 +01:00
bucket_id : * bucket_id ,
2023-05-03 12:02:59 +02:00
key : key . into ( ) ,
} ,
false ,
) ;
2021-03-15 15:26:29 +01:00
garage . version_table . insert ( & version ) . await ? ;
2021-02-19 12:11:02 +01:00
2024-03-21 14:06:59 +01:00
// Transfer data
let ( total_size , checksums , first_block_hash ) = read_and_put_blocks (
ctx ,
& version ,
encryption ,
1 ,
first_block ,
& mut chunker ,
checksummer ,
)
. await ? ;
2022-06-15 20:20:28 +02:00
2024-03-21 14:06:59 +01:00
// Verify checksums are ok / add calculated checksum to metadata
match checksum_mode {
ChecksumMode ::Verify ( expected ) = > {
checksums . verify ( & expected ) ? ;
}
ChecksumMode ::Calculate ( algo ) = > {
meta . checksum = checksums . extract ( algo ) ;
}
} ;
2021-02-19 12:11:02 +01:00
2024-03-21 14:06:59 +01:00
// Verify quotas are respsected
2024-03-03 14:56:52 +01:00
check_quotas ( ctx , total_size , existing_object . as_ref ( ) ) . await ? ;
2020-11-22 11:04:33 +01:00
2021-02-19 12:11:02 +01:00
// Save final object state, marked as Complete
2024-03-21 14:06:59 +01:00
let etag = encryption . etag_from_md5 ( & checksums . md5 ) ;
2024-02-23 16:49:50 +01:00
2020-11-22 11:04:33 +01:00
object_version . state = ObjectVersionState ::Complete ( ObjectVersionData ::FirstBlock (
ObjectVersionMeta {
2024-03-21 14:06:59 +01:00
encryption : encryption . encrypt_meta ( meta ) ? ,
2020-11-22 11:04:33 +01:00
size : total_size ,
2024-02-23 16:49:50 +01:00
etag : etag . clone ( ) ,
2020-11-22 11:04:33 +01:00
} ,
first_block_hash ,
) ) ;
2024-03-03 14:56:52 +01:00
let object = Object ::new ( * bucket_id , key . into ( ) , vec! [ object_version ] ) ;
2020-11-22 11:04:33 +01:00
garage . object_table . insert ( & object ) . await ? ;
2023-01-03 16:48:51 +01:00
// We were not interrupted, everything went fine.
// We won't have to clean up on drop.
interrupted_cleanup . cancel ( ) ;
2024-02-23 16:49:50 +01:00
Ok ( SaveStreamResult {
version_uuid ,
version_timestamp ,
etag ,
} )
2020-11-22 11:04:33 +01:00
}
2022-06-15 20:20:28 +02:00
/// Check that inserting this object with this size doesn't exceed bucket quotas
2023-05-03 12:02:59 +02:00
pub ( crate ) async fn check_quotas (
2024-03-03 14:56:52 +01:00
ctx : & ReqCtx ,
2022-06-15 20:20:28 +02:00
size : u64 ,
2023-10-18 16:36:48 +02:00
prev_object : Option < & Object > ,
2022-06-15 20:20:28 +02:00
) -> Result < ( ) , Error > {
2024-03-03 14:56:52 +01:00
let ReqCtx {
garage ,
bucket_id ,
bucket_params ,
..
} = ctx ;
let quotas = bucket_params . quotas . get ( ) ;
2022-06-15 20:20:28 +02:00
if quotas . max_objects . is_none ( ) & & quotas . max_size . is_none ( ) {
return Ok ( ( ) ) ;
} ;
2023-10-18 16:36:48 +02:00
let counters = garage
. object_counter_table
. table
2024-03-03 14:56:52 +01:00
. get ( bucket_id , & EmptyKey )
2023-10-18 16:36:48 +02:00
. await ? ;
2022-06-15 20:20:28 +02:00
let counters = counters
2023-11-08 16:41:00 +01:00
. map ( | x | x . filtered_values ( & garage . system . cluster_layout ( ) ) )
2022-06-15 20:20:28 +02:00
. unwrap_or_default ( ) ;
let ( prev_cnt_obj , prev_cnt_size ) = match prev_object {
Some ( o ) = > {
let prev_cnt = o . counts ( ) . into_iter ( ) . collect ::< HashMap < _ , _ > > ( ) ;
(
prev_cnt . get ( OBJECTS ) . cloned ( ) . unwrap_or_default ( ) ,
prev_cnt . get ( BYTES ) . cloned ( ) . unwrap_or_default ( ) ,
)
}
None = > ( 0 , 0 ) ,
} ;
let cnt_obj_diff = 1 - prev_cnt_obj ;
let cnt_size_diff = size as i64 - prev_cnt_size ;
if let Some ( mo ) = quotas . max_objects {
let current_objects = counters . get ( OBJECTS ) . cloned ( ) . unwrap_or_default ( ) ;
if cnt_obj_diff > 0 & & current_objects + cnt_obj_diff > mo as i64 {
return Err ( Error ::forbidden ( format! (
" Object quota is reached, maximum objects for this bucket: {} " ,
mo
) ) ) ;
}
}
if let Some ( ms ) = quotas . max_size {
let current_size = counters . get ( BYTES ) . cloned ( ) . unwrap_or_default ( ) ;
if cnt_size_diff > 0 & & current_size + cnt_size_diff > ms as i64 {
return Err ( Error ::forbidden ( format! (
" Bucket size quota is reached, maximum total size of objects for this bucket: {}. The bucket is already {} bytes, and this object would add {} bytes. " ,
2023-10-18 16:36:48 +02:00
ms , current_size , cnt_size_diff
2022-06-15 20:20:28 +02:00
) ) ) ;
}
}
Ok ( ( ) )
}
2023-05-03 12:02:59 +02:00
pub ( crate ) async fn read_and_put_blocks < S : Stream < Item = Result < Bytes , Error > > + Unpin > (
2024-03-03 14:56:52 +01:00
ctx : & ReqCtx ,
2021-03-10 17:01:05 +01:00
version : & Version ,
2024-02-23 16:49:50 +01:00
encryption : EncryptionParams ,
2020-04-26 20:39:32 +00:00
part_number : u64 ,
2022-07-18 17:18:47 +02:00
first_block : Bytes ,
2022-01-17 10:55:31 +01:00
chunker : & mut StreamChunker < S > ,
2024-03-21 14:06:59 +01:00
checksummer : Checksummer ,
) -> Result < ( u64 , Checksums , Hash ) , Error > {
2022-07-18 18:40:57 +02:00
let tracer = opentelemetry ::global ::tracer ( " garage " ) ;
2024-02-26 17:22:16 +01:00
let ( block_tx , mut block_rx ) = mpsc ::channel ::< Result < Bytes , Error > > ( 2 ) ;
let read_blocks = async {
block_tx . send ( Ok ( first_block ) ) . await ? ;
loop {
let res = chunker
. next ( )
. with_context ( Context ::current_with_span (
tracer . start ( " Read block from client " ) ,
) )
. await ;
match res {
Ok ( Some ( block ) ) = > block_tx . send ( Ok ( block ) ) . await ? ,
Ok ( None ) = > break ,
Err ( e ) = > {
block_tx . send ( Err ( e ) ) . await ? ;
break ;
}
}
}
drop ( block_tx ) ;
Ok ::< _ , mpsc ::error ::SendError < _ > > ( ( ) )
} ;
2022-07-18 18:40:57 +02:00
2024-02-26 17:22:16 +01:00
let ( block_tx2 , mut block_rx2 ) = mpsc ::channel ::< Result < Bytes , Error > > ( 1 ) ;
let hash_stream = async {
2024-03-21 14:06:59 +01:00
let mut checksummer = checksummer ;
2024-02-26 17:22:16 +01:00
while let Some ( next ) = block_rx . recv ( ) . await {
match next {
Ok ( block ) = > {
block_tx2 . send ( Ok ( block . clone ( ) ) ) . await ? ;
2024-03-21 14:06:59 +01:00
checksummer = tokio ::task ::spawn_blocking ( move | | {
checksummer . update ( & block ) ;
checksummer
} )
2024-02-26 17:22:16 +01:00
. with_context ( Context ::current_with_span (
tracer . start ( " Hash block (md5, sha256) " ) ,
) )
2024-03-21 14:06:59 +01:00
. await
. unwrap ( )
2024-02-26 17:22:16 +01:00
}
Err ( e ) = > {
block_tx2 . send ( Err ( e ) ) . await ? ;
break ;
}
}
}
drop ( block_tx2 ) ;
2024-03-21 14:06:59 +01:00
Ok ::< _ , mpsc ::error ::SendError < _ > > ( checksummer )
2024-02-26 17:22:16 +01:00
} ;
2020-07-13 16:51:30 +02:00
2024-02-23 16:49:50 +01:00
let ( block_tx3 , mut block_rx3 ) = mpsc ::channel ::< Result < ( Bytes , u64 , Hash ) , Error > > ( 1 ) ;
let encrypt_hash_blocks = async {
2024-02-26 17:22:16 +01:00
let mut first_block_hash = None ;
while let Some ( next ) = block_rx2 . recv ( ) . await {
match next {
Ok ( block ) = > {
2024-02-23 16:49:50 +01:00
let unencrypted_len = block . len ( ) as u64 ;
2024-03-21 14:06:59 +01:00
let res = tokio ::task ::spawn_blocking ( move | | {
let block = encryption . encrypt_block ( block ) ? ;
let hash = blake2sum ( & block ) ;
Ok ( ( block , hash ) )
} )
. with_context ( Context ::current_with_span (
tracer . start ( " Encrypt and hash (blake2) block " ) ,
) )
. await
. unwrap ( ) ;
match res {
Ok ( ( block , hash ) ) = > {
if first_block_hash . is_none ( ) {
first_block_hash = Some ( hash ) ;
2024-02-23 16:49:50 +01:00
}
2024-03-21 14:06:59 +01:00
block_tx3 . send ( Ok ( ( block , unencrypted_len , hash ) ) ) . await ? ;
}
Err ( e ) = > {
block_tx3 . send ( Err ( e ) ) . await ? ;
break ;
2024-02-23 16:49:50 +01:00
}
2024-02-26 17:22:16 +01:00
}
}
Err ( e ) = > {
block_tx3 . send ( Err ( e ) ) . await ? ;
break ;
}
}
2020-04-24 18:47:11 +00:00
}
2024-02-26 17:22:16 +01:00
drop ( block_tx3 ) ;
Ok ::< _ , mpsc ::error ::SendError < _ > > ( first_block_hash . unwrap ( ) )
} ;
2020-04-24 18:47:11 +00:00
2024-02-26 17:22:16 +01:00
let put_blocks = async {
2024-02-26 18:21:17 +01:00
// Structure for handling several concurrent writes to storage nodes
2024-02-26 18:34:52 +01:00
let order_stream = OrderTag ::stream ( ) ;
2024-02-26 18:21:17 +01:00
let mut write_futs = FuturesOrdered ::new ( ) ;
2024-02-26 17:22:16 +01:00
let mut written_bytes = 0 u64 ;
2024-02-26 18:21:17 +01:00
loop {
// Simultaneously write blocks to storage nodes & await for next block to be written
let currently_running = write_futs . len ( ) ;
let write_futs_next = async {
if write_futs . is_empty ( ) {
futures ::future ::pending ( ) . await
} else {
write_futs . next ( ) . await . unwrap ( )
}
} ;
let recv_next = async {
// If more than a maximum number of writes are in progress, don't add more for now
if currently_running > = PUT_BLOCKS_MAX_PARALLEL {
futures ::future ::pending ( ) . await
} else {
block_rx3 . recv ( ) . await
}
} ;
2024-02-23 16:49:50 +01:00
let ( block , unencrypted_len , hash ) = tokio ::select! {
2024-02-26 18:21:17 +01:00
result = write_futs_next = > {
result ? ;
continue ;
} ,
recv = recv_next = > match recv {
Some ( next ) = > next ? ,
None = > break ,
} ,
} ;
// For next block to be written: count its size and spawn future to write it
write_futs . push_back ( put_block_and_meta (
2024-03-03 14:56:52 +01:00
ctx ,
2021-10-26 10:20:05 +02:00
version ,
2020-04-28 10:18:14 +00:00
part_number ,
2024-02-23 16:49:50 +01:00
written_bytes ,
2024-02-26 18:21:17 +01:00
hash ,
block ,
2024-02-23 16:49:50 +01:00
unencrypted_len ,
encryption . is_encrypted ( ) ,
2024-02-26 18:34:52 +01:00
order_stream . order ( written_bytes ) ,
2024-02-26 18:21:17 +01:00
) ) ;
2024-02-23 16:49:50 +01:00
written_bytes + = unencrypted_len ;
2020-04-24 18:47:11 +00:00
}
2024-02-26 18:21:17 +01:00
while let Some ( res ) = write_futs . next ( ) . await {
res ? ;
2024-02-26 17:22:16 +01:00
}
Ok ::< _ , Error > ( written_bytes )
} ;
let ( _ , stream_hash_result , block_hash_result , final_result ) =
2024-02-23 16:49:50 +01:00
futures ::join! ( read_blocks , hash_stream , encrypt_hash_blocks , put_blocks ) ;
2020-04-24 18:47:11 +00:00
2024-02-26 17:22:16 +01:00
let total_size = final_result ? ;
// unwrap here is ok, because if hasher failed, it is because something failed
// later in the pipeline which already caused a return at the ? on previous line
let first_block_hash = block_hash_result . unwrap ( ) ;
2024-03-21 14:06:59 +01:00
let checksums = stream_hash_result . unwrap ( ) . finalize ( ) ;
2020-07-15 15:31:13 +02:00
2024-03-21 14:06:59 +01:00
Ok ( ( total_size , checksums , first_block_hash ) )
2020-04-24 18:47:11 +00:00
}
2024-02-26 17:22:16 +01:00
async fn put_block_and_meta (
2024-03-03 14:56:52 +01:00
ctx : & ReqCtx ,
2020-04-24 18:47:11 +00:00
version : & Version ,
2020-04-26 18:55:13 +00:00
part_number : u64 ,
2020-04-24 18:47:11 +00:00
offset : u64 ,
hash : Hash ,
2024-02-26 17:22:16 +01:00
block : Bytes ,
2024-02-23 16:49:50 +01:00
size : u64 ,
is_encrypted : bool ,
2024-02-26 18:34:52 +01:00
order_tag : OrderTag ,
2020-11-08 15:04:30 +01:00
) -> Result < ( ) , GarageError > {
2024-03-03 14:56:52 +01:00
let ReqCtx { garage , .. } = ctx ;
2020-04-24 18:47:11 +00:00
let mut version = version . clone ( ) ;
2021-03-10 16:21:56 +01:00
version . blocks . put (
VersionBlockKey {
2020-04-26 18:55:13 +00:00
part_number ,
offset ,
2021-03-10 16:21:56 +01:00
} ,
2024-02-23 16:49:50 +01:00
VersionBlock { hash , size } ,
2021-03-10 16:21:56 +01:00
) ;
2020-04-24 18:47:11 +00:00
let block_ref = BlockRef {
block : hash ,
version : version . uuid ,
2021-03-10 16:21:56 +01:00
deleted : false . into ( ) ,
2020-04-24 18:47:11 +00:00
} ;
futures ::try_join! (
2024-02-26 18:34:52 +01:00
garage
. block_manager
2024-02-23 16:49:50 +01:00
. rpc_put_block ( hash , block , is_encrypted , Some ( order_tag ) ) ,
2020-04-24 18:47:11 +00:00
garage . version_table . insert ( & version ) ,
garage . block_ref_table . insert ( & block_ref ) ,
) ? ;
Ok ( ( ) )
}
2023-05-03 12:02:59 +02:00
pub ( crate ) struct StreamChunker < S : Stream < Item = Result < Bytes , Error > > > {
2022-01-17 10:55:31 +01:00
stream : S ,
2020-04-24 18:47:11 +00:00
read_all : bool ,
block_size : usize ,
2022-09-02 13:46:42 +02:00
buf : BytesBuf ,
2020-04-24 18:47:11 +00:00
}
2022-01-17 10:55:31 +01:00
impl < S : Stream < Item = Result < Bytes , Error > > + Unpin > StreamChunker < S > {
2023-05-03 12:02:59 +02:00
pub ( crate ) fn new ( stream : S , block_size : usize ) -> Self {
2020-04-24 18:47:11 +00:00
Self {
2022-01-17 10:55:31 +01:00
stream ,
2020-04-24 18:47:11 +00:00
read_all : false ,
block_size ,
2022-09-02 13:46:42 +02:00
buf : BytesBuf ::new ( ) ,
2020-04-24 18:47:11 +00:00
}
}
2022-01-17 10:55:31 +01:00
2023-05-03 12:02:59 +02:00
pub ( crate ) async fn next ( & mut self ) -> Result < Option < Bytes > , Error > {
2022-09-02 13:46:42 +02:00
while ! self . read_all & & self . buf . len ( ) < self . block_size {
2022-01-17 10:55:31 +01:00
if let Some ( block ) = self . stream . next ( ) . await {
2020-04-24 18:47:11 +00:00
let bytes = block ? ;
trace! ( " Body next: {} bytes " , bytes . len ( ) ) ;
2022-09-02 13:46:42 +02:00
self . buf . extend ( bytes ) ;
2020-04-24 18:47:11 +00:00
} else {
self . read_all = true ;
}
}
2022-01-17 10:55:31 +01:00
2022-09-02 13:46:42 +02:00
if self . buf . is_empty ( ) {
2020-04-24 18:47:11 +00:00
Ok ( None )
} else {
2022-09-02 13:46:42 +02:00
Ok ( Some ( self . buf . take_max ( self . block_size ) ) )
2020-04-24 18:47:11 +00:00
}
}
}
2023-06-06 15:18:45 +02:00
/// Guard that, unless cancelled, marks the in-progress object version
/// as aborted when dropped (i.e. when the upload was interrupted).
struct InterruptedCleanup(Option<InterruptedCleanupInner>);

/// Everything the cleanup task needs to mark the version as aborted.
struct InterruptedCleanupInner {
	garage: Arc<Garage>,
	bucket_id: Uuid,
	key: String,
	version_uuid: Uuid,
	version_timestamp: u64,
}
2023-01-03 16:48:51 +01:00
impl InterruptedCleanup {
	/// Disarm the guard: the upload completed, no cleanup is needed.
	fn cancel(&mut self) {
		drop(self.0.take());
	}
}
impl Drop for InterruptedCleanup {
fn drop ( & mut self ) {
2023-06-06 15:18:45 +02:00
if let Some ( info ) = self . 0. take ( ) {
2023-01-03 16:48:51 +01:00
tokio ::spawn ( async move {
let object_version = ObjectVersion {
2023-06-06 15:18:45 +02:00
uuid : info . version_uuid ,
timestamp : info . version_timestamp ,
2023-01-03 16:48:51 +01:00
state : ObjectVersionState ::Aborted ,
} ;
2023-06-06 15:18:45 +02:00
let object = Object ::new ( info . bucket_id , info . key , vec! [ object_version ] ) ;
if let Err ( e ) = info . garage . object_table . insert ( & object ) . await {
2023-01-03 16:48:51 +01:00
warn! ( " Cannot cleanup after aborted PutObject: {} " , e ) ;
}
} ) ;
}
}
}
2023-05-03 12:02:59 +02:00
// ============ helpers ============
2021-02-19 12:11:02 +01:00
2024-03-21 14:06:59 +01:00
pub ( crate ) fn get_headers ( headers : & HeaderMap < HeaderValue > ) -> Result < HeaderList , Error > {
2024-03-06 19:23:36 +01:00
let mut ret = Vec ::new ( ) ;
2021-03-15 16:21:41 +01:00
// Preserve standard headers
let standard_header = vec! [
2024-03-06 19:23:36 +01:00
hyper ::header ::CONTENT_TYPE ,
2020-07-09 17:04:43 +02:00
hyper ::header ::CACHE_CONTROL ,
hyper ::header ::CONTENT_DISPOSITION ,
hyper ::header ::CONTENT_ENCODING ,
hyper ::header ::CONTENT_LANGUAGE ,
hyper ::header ::EXPIRES ,
] ;
2024-03-06 19:23:36 +01:00
for name in standard_header . iter ( ) {
if let Some ( value ) = headers . get ( name ) {
ret . push ( ( name . to_string ( ) , value . to_str ( ) ? . to_string ( ) ) ) ;
2020-07-09 17:04:43 +02:00
}
}
2021-03-15 16:21:41 +01:00
// Preserve x-amz-meta- headers
2024-03-06 19:23:36 +01:00
for ( name , value ) in headers . iter ( ) {
if name . as_str ( ) . starts_with ( " x-amz-meta- " ) {
ret . push ( (
name . to_string ( ) ,
std ::str ::from_utf8 ( value . as_bytes ( ) ) ? . to_string ( ) ,
) ) ;
2021-03-15 16:21:41 +01:00
}
}
2024-03-21 14:06:59 +01:00
Ok ( ret )
2020-07-09 17:04:43 +02:00
}
2023-10-20 13:37:37 +02:00
2023-10-20 13:55:34 +02:00
pub ( crate ) fn next_timestamp ( existing_object : Option < & Object > ) -> u64 {
2023-10-20 13:37:37 +02:00
existing_object
. as_ref ( )
. and_then ( | obj | obj . versions ( ) . iter ( ) . map ( | v | v . timestamp ) . max ( ) )
. map ( | t | std ::cmp ::max ( t + 1 , now_msec ( ) ) )
. unwrap_or_else ( now_msec )
}