Ready to add K2V endpoint implementations

This commit is contained in:
Alex 2022-04-14 14:41:47 +02:00
parent b8562d6e3c
commit f294458f20
Signed by: lx
GPG key ID: 0E496D15096376BE
6 changed files with 468 additions and 54 deletions

View file

@ -1 +1,167 @@
use std::sync::Arc;
use async_trait::async_trait;
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::future::Future;
use futures::prelude::*;
use hyper::header;
use hyper::{Body, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue};
use garage_table::util::*;
use garage_util::error::Error as GarageError;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use crate::error::*;
use crate::generic_server::*;
use crate::signature::compute_scope;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::*;
use crate::signature::LONG_DATETIME;
use crate::helpers::*;
use crate::k2v::router::{Endpoint};
use crate::s3::cors::*;
pub struct K2VApiServer {
garage: Arc<Garage>,
}
pub(crate) struct K2VApiEndpoint {
bucket_name: String,
endpoint: Endpoint,
}
impl K2VApiServer {
pub async fn run(
garage: Arc<Garage>,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let addr = garage.config.s3_api.api_bind_addr;
ApiServer::new(
garage.config.s3_api.s3_region.clone(),
K2VApiServer { garage },
)
.run_server(addr, shutdown_signal)
.await
}
}
#[async_trait]
impl ApiHandler for K2VApiServer {
const API_NAME: &'static str = "k2v";
const API_NAME_DISPLAY: &'static str = "K2V";
type Endpoint = K2VApiEndpoint;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
let authority = req
.headers()
.get(header::HOST)
.ok_or_bad_request("Host header required")?
.to_str()?;
let host = authority_to_host(authority)?;
let bucket_name = self
.garage
.config
.s3_api
.root_domain
.as_ref()
.and_then(|root_domain| host_to_bucket(&host, root_domain));
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
Ok(K2VApiEndpoint {
bucket_name,
endpoint,
})
}
async fn handle(
&self,
req: Request<Body>,
endpoint: K2VApiEndpoint,
) -> Result<Response<Body>, Error> {
let K2VApiEndpoint {
bucket_name,
endpoint,
} = endpoint;
let garage = self.garage.clone();
// The OPTIONS method is procesed early, before we even check for an API key
if let Endpoint::Options = endpoint {
return handle_options_s3api(garage, &req, Some(bucket_name)).await;
}
let (api_key, mut content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
let req = parse_streaming_body(&api_key, req, &mut content_sha256, &garage.config.s3_api.s3_region)?;
let bucket_id = resolve_bucket(&garage, &bucket_name, &api_key).await?;
let bucket = garage
.bucket_table
.get(&EmptyKey, &bucket_id)
.await?
.filter(|b| !b.state.is_deleted())
.ok_or(Error::NoSuchBucket)?;
let allowed = match endpoint.authorization_type() {
Authorization::Read => api_key.allow_read(&bucket_id),
Authorization::Write => api_key.allow_write(&bucket_id),
Authorization::Owner => api_key.allow_owner(&bucket_id),
_ => unreachable!(),
};
if !allowed {
return Err(Error::Forbidden(
"Operation is not allowed for this key.".to_string(),
));
}
// Look up what CORS rule might apply to response.
// Requests for methods different than GET, HEAD or POST
// are always preflighted, i.e. the browser should make
// an OPTIONS call before to check it is allowed
let matching_cors_rule = match *req.method() {
Method::GET | Method::HEAD | Method::POST => find_matching_cors_rule(&bucket, &req)?,
_ => None,
};
let resp = match endpoint {
//TODO
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};
// If request was a success and we have a CORS rule that applies to it,
// add the corresponding CORS headers to the response
let mut resp_ok = resp?;
if let Some(rule) = matching_cors_rule {
add_cors_headers(&mut resp_ok, rule)
.ok_or_internal_error("Invalid bucket CORS configuration")?;
}
Ok(resp_ok)
}
}
impl ApiEndpoint for K2VApiEndpoint {
fn name(&self) -> &'static str {
self.endpoint.name()
}
fn add_span_attributes(&self, span: SpanRef<'_>) {
span.set_attribute(KeyValue::new(
"bucket",
self.bucket_name.clone(),
));
}
}

View file

@ -1 +1,3 @@
pub mod api_server;
mod router;

236
src/api/k2v/router.rs Normal file
View file

@ -0,0 +1,236 @@
use crate::error::*;
use std::borrow::Cow;
use hyper::header::HeaderValue;
use hyper::{HeaderMap, Method, Request};
use crate::router_macros::{router_match, generateQueryParameters};
use crate::helpers::Authorization;
router_match! {@func
/// List of all K2V API endpoints.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Endpoint {
DeleteBatch {
},
DeleteItem {
partition_key: String,
sort_key: String,
},
InsertBatch {
},
InsertItem {
partition_key: String,
sort_key: String,
},
Options,
PollItem {
partition_key: String,
sort_key: String,
causality_token: String,
},
ReadBatch {
},
ReadIndex {
start: Option<String>,
end: Option<String>,
limit: Option<u64>,
},
ReadItem {
partition_key: String,
sort_key: String,
},
}}
impl Endpoint {
/// Determine which S3 endpoint a request is for using the request, and a bucket which was
/// possibly extracted from the Host header.
/// Returns Self plus bucket name, if endpoint is not Endpoint::ListBuckets
pub fn from_request<T>(
req: &Request<T>,
) -> Result<(Self, String), Error> {
let uri = req.uri();
let path = uri.path().trim_start_matches('/');
let query = uri.query();
let (bucket, partition_key) =
path.split_once('/') .map(|(b, p)| (b.to_owned(), p.trim_start_matches('/')))
.unwrap_or((path.to_owned(), ""));
if bucket.is_empty() {
return Err(Error::BadRequest("Missing bucket name".to_owned()));
}
if *req.method() == Method::OPTIONS {
return Ok((Self::Options, bucket));
}
let partition_key = percent_encoding::percent_decode_str(partition_key)
.decode_utf8()?
.into_owned();
let mut query = QueryParameters::from_query(query.unwrap_or_default())?;
let res = match *req.method() {
Method::GET => Self::from_get(partition_key, &mut query)?,
//Method::HEAD => Self::from_head(partition_key, &mut query)?,
Method::POST => Self::from_post(partition_key, &mut query)?,
Method::PUT => Self::from_put(partition_key, &mut query)?,
Method::DELETE => Self::from_delete(partition_key, &mut query)?,
_ => return Err(Error::BadRequest("Unknown method".to_owned())),
};
if let Some(message) = query.nonempty_message() {
debug!("Unused query parameter: {}", message)
}
Ok((res, bucket))
}
/// Determine which endpoint a request is for, knowing it is a GET.
fn from_get(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
key: [
EMPTY if causality_token => PollItem (query::sort_key, query::causality_token),
EMPTY => ReadItem (query::sort_key),
],
no_key: [
EMPTY => ReadIndex (query_opt::start, query_opt::end, opt_parse::limit),
]
}
}
/*
/// Determine which endpoint a request is for, knowing it is a HEAD.
fn from_head(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
key: [
EMPTY => HeadObject(opt_parse::part_number, query_opt::version_id),
],
no_key: [
EMPTY => HeadBucket,
]
}
}
*/
/// Determine which endpoint a request is for, knowing it is a POST.
fn from_post(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
key: [
],
no_key: [
EMPTY => InsertBatch,
DELETE => DeleteBatch,
SEARCH => ReadBatch,
]
}
}
/// Determine which endpoint a request is for, knowing it is a PUT.
fn from_put(
partition_key: String,
query: &mut QueryParameters<'_>,
) -> Result<Self, Error> {
router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
key: [
EMPTY => InsertItem (query::sort_key),
],
no_key: [
]
}
}
/// Determine which endpoint a request is for, knowing it is a DELETE.
fn from_delete(partition_key: String, query: &mut QueryParameters<'_>) -> Result<Self, Error> {
router_match! {
@gen_parser
(query.keyword.take().unwrap_or_default().as_ref(), partition_key, query, None),
key: [
EMPTY => DeleteItem (query::sort_key),
],
no_key: [
]
}
}
/// Get the partition key the request target. Returns None for requests which don't use a partition key.
#[allow(dead_code)]
pub fn get_partition_key(&self) -> Option<&str> {
router_match! {
@extract
self,
partition_key,
[
DeleteItem,
InsertItem,
PollItem,
ReadItem,
]
}
}
/// Get the sort key the request target. Returns None for requests which don't use a sort key.
#[allow(dead_code)]
pub fn get_sort_key(&self) -> Option<&str> {
router_match! {
@extract
self,
sort_key,
[
DeleteItem,
InsertItem,
PollItem,
ReadItem,
]
}
}
/// Get the kind of authorization which is required to perform the operation.
pub fn authorization_type(&self) -> Authorization {
let readonly = router_match! {
@match
self,
[
PollItem,
ReadBatch,
ReadIndex,
ReadItem,
]
};
if readonly {
Authorization::Read
} else {
Authorization::Write
}
}
}
// parameter name => struct field
generateQueryParameters! {
"start" => start,
"causality_token" => causality_token,
"end" => end,
"limit" => limit,
"sort_key" => sort_key
}
mod keywords {
//! This module contain all query parameters with no associated value
//! used to differentiate endpoints.
pub const EMPTY: &str = "";
pub const DELETE: &str = "delete";
pub const SEARCH: &str = "search";
}

View file

@ -25,7 +25,7 @@ macro_rules! router_match {
_ => None
}
}};
(@gen_parser ($keyword:expr, $key:expr, $query:expr, $header:expr),
(@gen_parser ($keyword:expr, $key:ident, $query:expr, $header:expr),
key: [$($kw_k:ident $(if $required_k:ident)? $(header $header_k:expr)? => $api_k:ident $(($($conv_k:ident :: $param_k:ident),*))?,)*],
no_key: [$($kw_nk:ident $(if $required_nk:ident)? $(if_header $header_nk:expr)? => $api_nk:ident $(($($conv_nk:ident :: $param_nk:ident),*))?,)*]) => {{
// usage: router_match {@gen_parser (keyword, key, query, header),
@ -44,7 +44,7 @@ macro_rules! router_match {
match ($keyword, !$key.is_empty()){
$(
($kw_k, true) if true $(&& $query.$required_k.is_some())? $(&& $header.contains_key($header_k))? => Ok($api_k {
key: $key,
$key,
$($(
$param_k: router_match!(@@parse_param $query, $conv_k, $param_k),
)*)?

View file

@ -19,7 +19,7 @@ use crate::error::*;
use crate::generic_server::*;
use crate::signature::compute_scope;
use crate::signature::payload::check_payload_signature;
use crate::signature::streaming::SignedPayloadStream;
use crate::signature::streaming::*;
use crate::signature::LONG_DATETIME;
use crate::helpers::*;
@ -128,51 +128,7 @@ impl ApiHandler for S3ApiServer {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
let req = match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
let date = req
.headers()
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
let signing_hmac = crate::signature::signing_hmac(
&date,
secret_key,
&garage.config.s3_api.s3_region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
req.map(move |body| {
Body::wrap_stream(
SignedPayloadStream::new(
body.map_err(Error::from),
signing_hmac,
date,
&scope,
signature,
)
.map_err(Error::from),
)
})
}
_ => req,
};
let req = parse_streaming_body(&api_key, req, &mut content_sha256, &garage.config.s3_api.s3_region)?;
let bucket_name = match bucket_name {
None => {

View file

@ -1,19 +1,73 @@
use std::pin::Pin;
use chrono::{DateTime, Utc};
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::prelude::*;
use futures::task;
use hyper::body::Bytes;
use garage_util::data::Hash;
use hyper::{Body, Method, Request, Response};
use garage_model::key_table::Key;
use hmac::Mac;
use super::sha256sum;
use super::HmacSha256;
use super::LONG_DATETIME;
use garage_util::data::Hash;
use super::{sha256sum, HmacSha256, LONG_DATETIME, compute_scope};
use crate::error::*;
pub fn parse_streaming_body(
api_key: &Key,
req: Request<Body>,
content_sha256: &mut Option<Hash>,
region: &str,
) -> Result<Request<Body>, Error> {
match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
let date = req
.headers()
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
let scope = compute_scope(&date, region);
let signing_hmac = crate::signature::signing_hmac(
&date,
secret_key,
region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| {
Body::wrap_stream(
SignedPayloadStream::new(
body.map_err(Error::from),
signing_hmac,
date,
&scope,
signature,
)
.map_err(Error::from),
)
}))
}
_ => Ok(req),
}
}
/// Result of `sha256("")`
const EMPTY_STRING_HEX_DIGEST: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";