2023-01-23 19:14:07 +00:00
use base64 ::prelude ::* ;
2022-05-10 11:16:57 +00:00
use http ::header ;
2024-02-05 18:27:12 +00:00
use hyper ::{ Request , Response , StatusCode } ;
2022-05-10 11:16:57 +00:00
use garage_model ::k2v ::causality ::* ;
use garage_model ::k2v ::item_table ::* ;
2024-02-05 18:27:12 +00:00
use crate ::helpers ::* ;
use crate ::k2v ::api_server ::{ ReqBody , ResBody } ;
2022-05-24 10:16:39 +00:00
use crate ::k2v ::error ::* ;
2022-05-10 11:16:57 +00:00
pub const X_GARAGE_CAUSALITY_TOKEN : & str = " X-Garage-Causality-Token " ;
pub enum ReturnFormat {
Json ,
Binary ,
Either ,
}
impl ReturnFormat {
2024-02-05 18:27:12 +00:00
pub fn from ( req : & Request < ReqBody > ) -> Result < Self , Error > {
2022-05-10 11:16:57 +00:00
let accept = match req . headers ( ) . get ( header ::ACCEPT ) {
Some ( a ) = > a . to_str ( ) ? ,
None = > return Ok ( Self ::Json ) ,
} ;
let accept = accept . split ( ',' ) . map ( | s | s . trim ( ) ) . collect ::< Vec < _ > > ( ) ;
let accept_json = accept . contains ( & " application/json " ) | | accept . contains ( & " */* " ) ;
let accept_binary = accept . contains ( & " application/octet-stream " ) | | accept . contains ( & " */* " ) ;
match ( accept_json , accept_binary ) {
( true , true ) = > Ok ( Self ::Either ) ,
( true , false ) = > Ok ( Self ::Json ) ,
( false , true ) = > Ok ( Self ::Binary ) ,
( false , false ) = > Err ( Error ::NotAcceptable ( " Invalid Accept: header value, must contain either application/json or application/octet-stream (or both) " . into ( ) ) ) ,
}
}
2024-02-05 18:27:12 +00:00
pub fn make_response ( & self , item : & K2VItem ) -> Result < Response < ResBody > , Error > {
2022-05-10 11:16:57 +00:00
let vals = item . values ( ) ;
if vals . is_empty ( ) {
return Err ( Error ::NoSuchKey ) ;
}
let ct = item . causal_context ( ) . serialize ( ) ;
match self {
Self ::Binary if vals . len ( ) > 1 = > Ok ( Response ::builder ( )
. header ( X_GARAGE_CAUSALITY_TOKEN , ct )
. status ( StatusCode ::CONFLICT )
2024-02-05 18:27:12 +00:00
. body ( empty_body ( ) ) ? ) ,
2022-05-10 11:16:57 +00:00
Self ::Binary = > {
assert! ( vals . len ( ) = = 1 ) ;
Self ::make_binary_response ( ct , vals [ 0 ] )
}
Self ::Either if vals . len ( ) = = 1 = > Self ::make_binary_response ( ct , vals [ 0 ] ) ,
_ = > Self ::make_json_response ( ct , & vals [ .. ] ) ,
}
}
2024-02-05 18:27:12 +00:00
fn make_binary_response ( ct : String , v : & DvvsValue ) -> Result < Response < ResBody > , Error > {
2022-05-10 11:16:57 +00:00
match v {
DvvsValue ::Deleted = > Ok ( Response ::builder ( )
. header ( X_GARAGE_CAUSALITY_TOKEN , ct )
. header ( header ::CONTENT_TYPE , " application/octet-stream " )
. status ( StatusCode ::NO_CONTENT )
2024-02-05 18:27:12 +00:00
. body ( empty_body ( ) ) ? ) ,
2022-05-10 11:16:57 +00:00
DvvsValue ::Value ( v ) = > Ok ( Response ::builder ( )
. header ( X_GARAGE_CAUSALITY_TOKEN , ct )
. header ( header ::CONTENT_TYPE , " application/octet-stream " )
. status ( StatusCode ::OK )
2024-02-05 18:27:12 +00:00
. body ( bytes_body ( v . to_vec ( ) . into ( ) ) ) ? ) ,
2022-05-10 11:16:57 +00:00
}
}
2024-02-05 18:27:12 +00:00
fn make_json_response ( ct : String , v : & [ & DvvsValue ] ) -> Result < Response < ResBody > , Error > {
2022-05-10 11:16:57 +00:00
let items = v
. iter ( )
. map ( | v | match v {
DvvsValue ::Deleted = > serde_json ::Value ::Null ,
2023-01-23 19:14:07 +00:00
DvvsValue ::Value ( v ) = > serde_json ::Value ::String ( BASE64_STANDARD . encode ( v ) ) ,
2022-05-10 11:16:57 +00:00
} )
. collect ::< Vec < _ > > ( ) ;
let json_body =
serde_json ::to_string_pretty ( & items ) . ok_or_internal_error ( " JSON encoding error " ) ? ;
Ok ( Response ::builder ( )
. header ( X_GARAGE_CAUSALITY_TOKEN , ct )
. header ( header ::CONTENT_TYPE , " application/json " )
. status ( StatusCode ::OK )
2024-02-05 18:27:12 +00:00
. body ( string_body ( json_body ) ) ? )
2022-05-10 11:16:57 +00:00
}
}
/// Handle ReadItem request
#[ allow(clippy::ptr_arg) ]
pub async fn handle_read_item (
2024-03-03 13:56:52 +00:00
ctx : ReqCtx ,
2024-02-05 18:27:12 +00:00
req : & Request < ReqBody > ,
2022-05-10 11:16:57 +00:00
partition_key : & str ,
sort_key : & String ,
2024-02-05 18:27:12 +00:00
) -> Result < Response < ResBody > , Error > {
2024-03-03 13:56:52 +00:00
let ReqCtx {
garage , bucket_id , ..
} = & ctx ;
2022-05-10 11:16:57 +00:00
let format = ReturnFormat ::from ( req ) ? ;
let item = garage
. k2v
. item_table
. get (
& K2VItemPartition {
2024-03-03 13:56:52 +00:00
bucket_id : * bucket_id ,
2022-05-10 11:16:57 +00:00
partition_key : partition_key . to_string ( ) ,
} ,
sort_key ,
)
. await ?
. ok_or ( Error ::NoSuchKey ) ? ;
format . make_response ( & item )
}
pub async fn handle_insert_item (
2024-03-03 13:56:52 +00:00
ctx : ReqCtx ,
2024-02-05 18:27:12 +00:00
req : Request < ReqBody > ,
2022-05-10 11:16:57 +00:00
partition_key : & str ,
sort_key : & str ,
2024-02-05 18:27:12 +00:00
) -> Result < Response < ResBody > , Error > {
2024-03-03 13:56:52 +00:00
let ReqCtx {
garage , bucket_id , ..
} = & ctx ;
2022-05-10 11:16:57 +00:00
let causal_context = req
. headers ( )
. get ( X_GARAGE_CAUSALITY_TOKEN )
. map ( | s | s . to_str ( ) )
. transpose ( ) ?
2023-01-11 11:27:19 +00:00
. map ( CausalContext ::parse_helper )
. transpose ( ) ? ;
2022-05-10 11:16:57 +00:00
2024-02-05 18:27:12 +00:00
let body = http_body_util ::BodyExt ::collect ( req . into_body ( ) )
. await ?
. to_bytes ( ) ;
2024-02-05 13:44:12 +00:00
2022-05-10 11:16:57 +00:00
let value = DvvsValue ::Value ( body . to_vec ( ) ) ;
garage
. k2v
. rpc
. insert (
2024-03-03 13:56:52 +00:00
* bucket_id ,
2022-05-10 11:16:57 +00:00
partition_key . to_string ( ) ,
sort_key . to_string ( ) ,
causal_context ,
value ,
)
. await ? ;
Ok ( Response ::builder ( )
2022-10-16 17:46:15 +00:00
. status ( StatusCode ::NO_CONTENT )
2024-02-05 18:27:12 +00:00
. body ( empty_body ( ) ) ? )
2022-05-10 11:16:57 +00:00
}
pub async fn handle_delete_item (
2024-03-03 13:56:52 +00:00
ctx : ReqCtx ,
2024-02-05 18:27:12 +00:00
req : Request < ReqBody > ,
2022-05-10 11:16:57 +00:00
partition_key : & str ,
sort_key : & str ,
2024-02-05 18:27:12 +00:00
) -> Result < Response < ResBody > , Error > {
2024-03-03 13:56:52 +00:00
let ReqCtx {
garage , bucket_id , ..
} = & ctx ;
2022-05-10 11:16:57 +00:00
let causal_context = req
. headers ( )
. get ( X_GARAGE_CAUSALITY_TOKEN )
. map ( | s | s . to_str ( ) )
. transpose ( ) ?
2023-01-11 11:27:19 +00:00
. map ( CausalContext ::parse_helper )
. transpose ( ) ? ;
2022-05-10 11:16:57 +00:00
let value = DvvsValue ::Deleted ;
garage
. k2v
. rpc
. insert (
2024-03-03 13:56:52 +00:00
* bucket_id ,
2022-05-10 11:16:57 +00:00
partition_key . to_string ( ) ,
sort_key . to_string ( ) ,
causal_context ,
value ,
)
. await ? ;
Ok ( Response ::builder ( )
. status ( StatusCode ::NO_CONTENT )
2024-02-05 18:27:12 +00:00
. body ( empty_body ( ) ) ? )
2022-05-10 11:16:57 +00:00
}
/// Handle ReadItem request
#[ allow(clippy::ptr_arg) ]
pub async fn handle_poll_item (
2024-03-03 13:56:52 +00:00
ctx : ReqCtx ,
2024-02-05 18:27:12 +00:00
req : & Request < ReqBody > ,
2022-05-10 11:16:57 +00:00
partition_key : String ,
sort_key : String ,
causality_token : String ,
timeout_secs : Option < u64 > ,
2024-02-05 18:27:12 +00:00
) -> Result < Response < ResBody > , Error > {
2024-03-03 13:56:52 +00:00
let ReqCtx {
garage , bucket_id , ..
} = & ctx ;
2022-05-10 11:16:57 +00:00
let format = ReturnFormat ::from ( req ) ? ;
let causal_context =
CausalContext ::parse ( & causality_token ) . ok_or_bad_request ( " Invalid causality token " ) ? ;
2023-01-11 14:03:08 +00:00
let timeout_msec = timeout_secs . unwrap_or ( 300 ) . clamp ( 1 , 600 ) * 1000 ;
2023-01-10 14:22:25 +00:00
2022-05-10 11:16:57 +00:00
let item = garage
. k2v
. rpc
2023-01-10 09:30:59 +00:00
. poll_item (
2024-03-03 13:56:52 +00:00
* bucket_id ,
2022-05-10 11:16:57 +00:00
partition_key ,
sort_key ,
causal_context ,
2023-01-10 14:22:25 +00:00
timeout_msec ,
2022-05-10 11:16:57 +00:00
)
. await ? ;
if let Some ( item ) = item {
format . make_response ( & item )
} else {
Ok ( Response ::builder ( )
. status ( StatusCode ::NOT_MODIFIED )
2024-02-05 18:27:12 +00:00
. body ( empty_body ( ) ) ? )
2022-05-10 11:16:57 +00:00
}
}