From f294458f204752ca0bfd32a8661b4f9626ddc35f Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 14 Apr 2022 14:41:47 +0200 Subject: [PATCH] Ready to add K2V endpoint implementations --- src/api/k2v/api_server.rs | 166 +++++++++++++++++++++++ src/api/k2v/mod.rs | 2 + src/api/k2v/router.rs | 236 +++++++++++++++++++++++++++++++++ src/api/router_macros.rs | 4 +- src/api/s3/api_server.rs | 48 +------ src/api/signature/streaming.rs | 66 ++++++++- 6 files changed, 468 insertions(+), 54 deletions(-) create mode 100644 src/api/k2v/router.rs diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index bd4b6c26..87e7b873 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1 +1,167 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use chrono::{DateTime, NaiveDateTime, Utc}; +use futures::future::Future; +use futures::prelude::*; +use hyper::header; +use hyper::{Body, Method, Request, Response}; + +use opentelemetry::{trace::SpanRef, KeyValue}; + +use garage_table::util::*; +use garage_util::error::Error as GarageError; + +use garage_model::garage::Garage; +use garage_model::key_table::Key; + +use crate::error::*; use crate::generic_server::*; +use crate::signature::compute_scope; +use crate::signature::payload::check_payload_signature; +use crate::signature::streaming::*; +use crate::signature::LONG_DATETIME; + +use crate::helpers::*; +use crate::k2v::router::{Endpoint}; +use crate::s3::cors::*; + +pub struct K2VApiServer { + garage: Arc, +} + +pub(crate) struct K2VApiEndpoint { + bucket_name: String, + endpoint: Endpoint, +} + +impl K2VApiServer { + pub async fn run( + garage: Arc, + shutdown_signal: impl Future, + ) -> Result<(), GarageError> { + let addr = garage.config.s3_api.api_bind_addr; + + ApiServer::new( + garage.config.s3_api.s3_region.clone(), + K2VApiServer { garage }, + ) + .run_server(addr, shutdown_signal) + .await + } +} + +#[async_trait] +impl ApiHandler for K2VApiServer { + const API_NAME: &'static str = "k2v"; + const API_NAME_DISPLAY: &'static str = "K2V"; + + type Endpoint = K2VApiEndpoint; + + fn parse_endpoint(&self, req: &Request) -> Result { + let authority = req + .headers() + .get(header::HOST) + .ok_or_bad_request("Host header required")? + .to_str()?; + + let host = authority_to_host(authority)?; + + let bucket_name = self + .garage + .config + .s3_api + .root_domain + .as_ref() + .and_then(|root_domain| host_to_bucket(&host, root_domain)); + + let (endpoint, bucket_name) = Endpoint::from_request(req)?; + + Ok(K2VApiEndpoint { + bucket_name, + endpoint, + }) + } + + async fn handle( + &self, + req: Request, + endpoint: K2VApiEndpoint, + ) -> Result, Error> { + let K2VApiEndpoint { + bucket_name, + endpoint, + } = endpoint; + let garage = self.garage.clone(); + + // The OPTIONS method is procesed early, before we even check for an API key + if let Endpoint::Options = endpoint { + return handle_options_s3api(garage, &req, Some(bucket_name)).await; + } + + let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?; + let api_key = api_key.ok_or_else(|| { + Error::Forbidden("Garage does not support anonymous access yet".to_string()) + })?; + + let req = parse_streaming_body(&api_key, req, &mut content_sha256, &garage.config.s3_api.s3_region)?; + + let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?; + let bucket = garage + .bucket_table + .get(&EmptyKey, &bucket_id) + .await? + .filter(|b| !b.state.is_deleted()) + .ok_or(Error::NoSuchBucket)?; + + let allowed = match endpoint.authorization_type() { + Authorization::Read => api_key.allow_read(&bucket_id), + Authorization::Write => api_key.allow_write(&bucket_id), + Authorization::Owner => api_key.allow_owner(&bucket_id), + _ => unreachable!(), + }; + + if !allowed { + return Err(Error::Forbidden( + "Operation is not allowed for this key.".to_string(), + )); + } + + // Look up what CORS rule might apply to response. + // Requests for methods different than GET, HEAD or POST + // are always preflighted, i.e. the browser should make + // an OPTIONS call before to check it is allowed + let matching_cors_rule = match *req.method() { + Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?, + _ => None, + }; + + let resp = match endpoint { + //TODO + endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())), + }; + + // If request was a success and we have a CORS rule that applies to it, + // add the corresponding CORS headers to the response + let mut resp_ok = resp?; + if let Some(rule) = matching_cors_rule { + add_cors_headers(&mut resp_ok, rule) + .ok_or_internal_error("Invalid bucket CORS configuration")?; + } + + Ok(resp_ok) + } +} + +impl ApiEndpoint for K2VApiEndpoint { + fn name(&self) -> &'static str { + self.endpoint.name() + } + + fn add_span_attributes(&self, span: SpanRef<'_>) { + span.set_attribute(KeyValue::new( + "bucket", + self.bucket_name.clone(), + )); + } +} diff --git a/src/api/k2v/mod.rs b/src/api/k2v/mod.rs index c086767d..23f24444 100644 --- a/src/api/k2v/mod.rs +++ b/src/api/k2v/mod.rs @@ -1 +1,3 @@ pub mod api_server; + +mod router; diff --git a/src/api/k2v/router.rs b/src/api/k2v/router.rs new file mode 100644 index 00000000..50c01237 --- /dev/null +++ b/src/api/k2v/router.rs @@ -0,0 +1,236 @@ +use crate::error::*; + +use std::borrow::Cow; + +use hyper::header::HeaderValue; +use hyper::{HeaderMap, Method, Request}; + +use crate::router_macros::{router_match, generateQueryParameters}; +use crate::helpers::Authorization; + +router_match! {@func + + +/// List of all K2V API endpoints. +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum Endpoint { + DeleteBatch { + }, + DeleteItem { + partition_key: String, + sort_key: String, + }, + InsertBatch { + }, + InsertItem { + partition_key: String, + sort_key: String, + }, + Options, + PollItem { + partition_key: String, + sort_key: String, + causality_token: String, + }, + ReadBatch { + }, + ReadIndex { + start: Option, + end: Option, + limit: Option, + }, + ReadItem { + partition_key: String, + sort_key: String, + }, +}} + +impl Endpoint { + /// Determine which S3 endpoint a request is for using the request, and a bucket which was + /// possibly extracted from the Host header. + /// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets + pub fn from_request( + req: &Request, + ) -> Result<(Self, String), Error> { + let uri = req.uri(); + let path = uri.path().trim_start_matches('/'); + let query = uri.query(); + + let (bucket, partition_key) = + path.split_once('/') .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/'))) + .unwrap_or((path.to_owned(), "")); + + if bucket.is_empty() { + return Err(Error::BadRequest("Missing bucket name".to_owned())); + } + + if *req.method() == Method::OPTIONS { + return Ok((Self::Options, bucket)); + } + + let partition_key = percent_encoding::percent_decode_str(partition_key) + .decode_utf8()? + .into_owned(); + + let mut query = QueryParameters::from_query(query.unwrap_or_default())?; + + let res = match *req.method() { + Method::GET => Self::from_get(partition_key, &mut query)?, + //Method::HEAD => Self::from_head(partition_key, &mut query)?, + Method::POST => Self::from_post(partition_key, &mut query)?, + Method::PUT => Self::from_put(partition_key, &mut query)?, + Method::DELETE => Self::from_delete(partition_key, &mut query)?, + _ => return Err(Error::BadRequest("Unknown method".to_owned())), + }; + + if let Some(message) = query.nonempty_message() { + debug!("Unused query parameter: {}", message) + } + Ok((res, bucket)) + } + + /// Determine which endpoint a request is for, knowing it is a GET. + fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY if causality_token => PollItem (query::sort_key, query::causality_token), + EMPTY => ReadItem (query::sort_key), + ], + no_key: [ + EMPTY => ReadIndex (query_opt::start, query_opt::end, opt_parse::limit), + ] + } + } + + /* + /// Determine which endpoint a request is for, knowing it is a HEAD. + fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id), + ], + no_key: [ + EMPTY => HeadBucket, + ] + } + } + */ + + /// Determine which endpoint a request is for, knowing it is a POST. + fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + ], + no_key: [ + EMPTY => InsertBatch, + DELETE => DeleteBatch, + SEARCH => ReadBatch, + ] + } + } + + /// Determine which endpoint a request is for, knowing it is a PUT. + fn from_put( + partition_key: String, + query: &mut QueryParameters<'_>, + ) -> Result { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY => InsertItem (query::sort_key), + + ], + no_key: [ + ] + } + } + + /// Determine which endpoint a request is for, knowing it is a DELETE. + fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result { + router_match! { + @gen_parser + (query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None), + key: [ + EMPTY => DeleteItem (query::sort_key), + ], + no_key: [ + ] + } + } + + /// Get the partition key the request target. Returns None for requests which don't use a partition key. + #[allow(dead_code)] + pub fn get_partition_key(&self) -> Option<&str> { + router_match! { + @extract + self, + partition_key, + [ + DeleteItem, + InsertItem, + PollItem, + ReadItem, + ] + } + } + + /// Get the sort key the request target. Returns None for requests which don't use a sort key. + #[allow(dead_code)] + pub fn get_sort_key(&self) -> Option<&str> { + router_match! { + @extract + self, + sort_key, + [ + DeleteItem, + InsertItem, + PollItem, + ReadItem, + ] + } + } + + /// Get the kind of authorization which is required to perform the operation. + pub fn authorization_type(&self) -> Authorization { + let readonly = router_match! { + @match + self, + [ + PollItem, + ReadBatch, + ReadIndex, + ReadItem, + ] + }; + if readonly { + Authorization::Read + } else { + Authorization::Write + } + } +} + +// parameter name => struct field +generateQueryParameters! { + "start" => start, + "causality_token" => causality_token, + "end" => end, + "limit" => limit, + "sort_key" => sort_key +} + +mod keywords { + //! This module contain all query parameters with no associated value + //! used to differentiate endpoints. + pub const EMPTY: &str = ""; + + pub const DELETE: &str = "delete"; + pub const SEARCH: &str = "search"; +} diff --git a/src/api/router_macros.rs b/src/api/router_macros.rs index bdbad9c0..fb7a8f6d 100644 --- a/src/api/router_macros.rs +++ b/src/api/router_macros.rs @@ -25,7 +25,7 @@ macro_rules! router_match { _ => None } }}; - (@gen_parser ($keyword:expr, $key:expr, $query:expr, $header:expr), + (@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr), key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*], no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{ // usage: router_match {@gen_parser (keyword, key, query, header), @@ -44,7 +44,7 @@ macro_rules! router_match { match ($keyword, !$key.is_empty()){ $( ($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k { - key: $key, + $key, $($( $param_k: router_match!(@@parse_param $query, $conv_k, $param_k), )*)? diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 80fde224..cec755ba 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -19,7 +19,7 @@ use crate::error::*; use crate::generic_server::*; use crate::signature::compute_scope; use crate::signature::payload::check_payload_signature; -use crate::signature::streaming::SignedPayloadStream; +use crate::signature::streaming::*; use crate::signature::LONG_DATETIME; use crate::helpers::*; @@ -128,51 +128,7 @@ impl ApiHandler for S3ApiServer { Error::Forbidden("Garage does not support anonymous access yet".to_string()) })?; - let req = match req.headers().get("x-amz-content-sha256") { - Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { - let signature = content_sha256 - .take() - .ok_or_bad_request("No signature provided")?; - - let secret_key = &api_key - .state - .as_option() - .ok_or_internal_error("Deleted key state")? - .secret_key; - - let date = req - .headers() - .get("x-amz-date") - .ok_or_bad_request("Missing X-Amz-Date field")? - .to_str()?; - let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) - .ok_or_bad_request("Invalid date")?; - let date: DateTime = DateTime::from_utc(date, Utc); - - let scope = compute_scope(&date, &garage.config.s3_api.s3_region); - let signing_hmac = crate::signature::signing_hmac( - &date, - secret_key, - &garage.config.s3_api.s3_region, - "s3", - ) - .ok_or_internal_error("Unable to build signing HMAC")?; - - req.map(move |body| { - Body::wrap_stream( - SignedPayloadStream::new( - body.map_err(Error::from), - signing_hmac, - date, - &scope, - signature, - ) - .map_err(Error::from), - ) - }) - } - _ => req, - }; + let req = parse_streaming_body(&api_key, req, &mut content_sha256, &garage.config.s3_api.s3_region)?; let bucket_name = match bucket_name { None => { diff --git a/src/api/signature/streaming.rs b/src/api/signature/streaming.rs index 969a45d6..30fafa62 100644 --- a/src/api/signature/streaming.rs +++ b/src/api/signature/streaming.rs @@ -1,19 +1,73 @@ use std::pin::Pin; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, NaiveDateTime, Utc}; use futures::prelude::*; use futures::task; use hyper::body::Bytes; - -use garage_util::data::Hash; +use hyper::{Body, Method, Request, Response}; +use garage_model::key_table::Key; use hmac::Mac; -use super::sha256sum; -use super::HmacSha256; -use super::LONG_DATETIME; +use garage_util::data::Hash; + +use super::{sha256sum, HmacSha256, LONG_DATETIME, compute_scope}; use crate::error::*; +pub fn parse_streaming_body( + api_key: &Key, + req: Request, + content_sha256: &mut Option, + region: &str, + ) -> Result, Error> { + match req.headers().get("x-amz-content-sha256") { + Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => { + let signature = content_sha256 + .take() + .ok_or_bad_request("No signature provided")?; + + let secret_key = &api_key + .state + .as_option() + .ok_or_internal_error("Deleted key state")? + .secret_key; + + let date = req + .headers() + .get("x-amz-date") + .ok_or_bad_request("Missing X-Amz-Date field")? + .to_str()?; + let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) + .ok_or_bad_request("Invalid date")?; + let date: DateTime = DateTime::from_utc(date, Utc); + + let scope = compute_scope(&date, region); + let signing_hmac = crate::signature::signing_hmac( + &date, + secret_key, + region, + "s3", + ) + .ok_or_internal_error("Unable to build signing HMAC")?; + + Ok(req.map(move |body| { + Body::wrap_stream( + SignedPayloadStream::new( + body.map_err(Error::from), + signing_hmac, + date, + &scope, + signature, + ) + .map_err(Error::from), + ) + })) + } + _ => Ok(req), + } +} + + /// Result of `sha256("")` const EMPTY_STRING_HEX_DIGEST: &str = "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";