add proper request router for s3 api #163

Merged
lx merged 5 commits from trinity-1686a/garage:s3-router into main 2021-12-06 14:17:47 +00:00
7 changed files with 1476 additions and 185 deletions

View file

@ -1,4 +1,3 @@
use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
@ -6,7 +5,7 @@ use futures::future::Future;
use hyper::header; use hyper::header;
use hyper::server::conn::AddrStream; use hyper::server::conn::AddrStream;
use hyper::service::{make_service_fn, service_fn}; use hyper::service::{make_service_fn, service_fn};
use hyper::{Body, Method, Request, Response, Server}; use hyper::{Body, Request, Response, Server};
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
@ -22,6 +21,7 @@ use crate::s3_delete::*;
use crate::s3_get::*; use crate::s3_get::*;
use crate::s3_list::*; use crate::s3_list::*;
use crate::s3_put::*; use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint};
/// Run the S3 API server /// Run the S3 API server
pub async fn run_api_server( pub async fn run_api_server(
@ -86,8 +86,6 @@ async fn handler(
} }
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> { async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
let path = req.uri().path().to_string();
let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
let (api_key, content_sha256) = check_signature(&garage, &req).await?; let (api_key, content_sha256) = check_signature(&garage, &req).await?;
let authority = req let authority = req
@ -105,61 +103,43 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.as_ref() .as_ref()
.and_then(|root_domain| host_to_bucket(&host, root_domain)); .and_then(|root_domain| host_to_bucket(&host, root_domain));
if path == "/" && bucket.is_none() { let endpoint = Endpoint::from_request(&req, bucket.map(ToOwned::to_owned))?;
return handle_list_buckets(&api_key); let allowed = match endpoint.authorization_type() {
} Authorization::None => true,
Authorization::Read(bucket) => api_key.allow_read(bucket),
let (bucket, key) = parse_bucket_key(&path, bucket)?; Authorization::Write(bucket) => api_key.allow_write(bucket),
let allowed = match req.method() {
&Method::HEAD | &Method::GET => api_key.allow_read(bucket),
_ => api_key.allow_write(bucket),
}; };
if !allowed { if !allowed {
return Err(Error::Forbidden( return Err(Error::Forbidden(
"Operation is not allowed for this key.".to_string(), "Operation is not allowed for this key.".to_string(),
)); ));
} }
let mut params = HashMap::new(); match endpoint {
if let Some(query) = req.uri().query() { Endpoint::ListBuckets => handle_list_buckets(&api_key),
let query_pairs = url::form_urlencoded::parse(query.as_bytes()); Endpoint::HeadObject { bucket, key, .. } => handle_head(garage, &req, &bucket, &key).await,
for (key, val) in query_pairs { Endpoint::GetObject { bucket, key, .. } => handle_get(garage, &req, &bucket, &key).await,
params.insert(key.to_lowercase(), val.to_string()); Endpoint::UploadPart {
}
}
if let Some(key) = key {
match *req.method() {
Method::HEAD => {
// HeadObject query
Ok(handle_head(garage, &req, bucket, key).await?)
}
Method::GET => {
// GetObject query
Ok(handle_get(garage, &req, bucket, key).await?)
}
Method::PUT => {
if params.contains_key(&"partnumber".to_string())
&& params.contains_key(&"uploadid".to_string())
{
// UploadPart query
let part_number = params.get("partnumber").unwrap();
let upload_id = params.get("uploadid").unwrap();
Ok(handle_put_part(
garage,
req,
bucket, bucket,
key, key,
part_number, part_number,
upload_id, upload_id,
} => {
handle_put_part(
garage,
req,
&bucket,
&key,
part_number,
&upload_id,
content_sha256, content_sha256,
) )
.await?) .await
} else if req.headers().contains_key("x-amz-copy-source") { }
// CopyObject query Endpoint::CopyObject { bucket, key } => {
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?; let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
let copy_source = let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?; let (source_bucket, source_key) = parse_bucket_key(&copy_source, None)?;
if !api_key.allow_read(source_bucket) { if !api_key.allow_read(source_bucket) {
return Err(Error::Forbidden(format!( return Err(Error::Forbidden(format!(
@ -168,51 +148,29 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
))); )));
} }
let source_key = source_key.ok_or_bad_request("No source key specified")?; let source_key = source_key.ok_or_bad_request("No source key specified")?;
Ok(handle_copy(garage, &req, bucket, key, source_bucket, source_key).await?) handle_copy(garage, &req, &bucket, &key, source_bucket, source_key).await
} else {
// PutObject query
Ok(handle_put(garage, req, bucket, key, content_sha256).await?)
} }
Endpoint::PutObject { bucket, key } => {
handle_put(garage, req, &bucket, &key, content_sha256).await
} }
Method::DELETE => { Endpoint::AbortMultipartUpload {
if params.contains_key(&"uploadid".to_string()) {
// AbortMultipartUpload query
let upload_id = params.get("uploadid").unwrap();
Ok(handle_abort_multipart_upload(garage, bucket, key, upload_id).await?)
} else {
// DeleteObject query
Ok(handle_delete(garage, bucket, key).await?)
}
}
Method::POST => {
if params.contains_key(&"uploads".to_string()) {
// CreateMultipartUpload call
Ok(handle_create_multipart_upload(garage, &req, bucket, key).await?)
} else if params.contains_key(&"uploadid".to_string()) {
// CompleteMultipartUpload call
let upload_id = params.get("uploadid").unwrap();
Ok(handle_complete_multipart_upload(
garage,
req,
bucket, bucket,
key, key,
upload_id, upload_id,
content_sha256, } => handle_abort_multipart_upload(garage, &bucket, &key, &upload_id).await,
) Endpoint::DeleteObject { bucket, key, .. } => handle_delete(garage, &bucket, &key).await,
.await?) Endpoint::CreateMultipartUpload { bucket, key } => {
} else { handle_create_multipart_upload(garage, &req, &bucket, &key).await
Err(Error::BadRequest(
"Not a CreateMultipartUpload call, what is it?".to_string(),
))
} }
Endpoint::CompleteMultipartUpload {
bucket,
key,
upload_id,
} => {
handle_complete_multipart_upload(garage, req, &bucket, &key, &upload_id, content_sha256)
.await
} }
_ => Err(Error::BadRequest("Invalid method".to_string())), Endpoint::CreateBucket { bucket } => {
}
} else {
match *req.method() {
Method::PUT => {
// CreateBucket
// If we're here, the bucket already exists, so just answer ok
debug!( debug!(
"Body: {}", "Body: {}",
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?) std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
@ -225,46 +183,78 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.unwrap(); .unwrap();
Ok(response) Ok(response)
} }
Method::HEAD => { Endpoint::HeadBucket { .. } => {
// HeadBucket
let empty_body: Body = Body::from(vec![]); let empty_body: Body = Body::from(vec![]);
let response = Response::builder().body(empty_body).unwrap(); let response = Response::builder().body(empty_body).unwrap();
Ok(response) Ok(response)
} }
Method::DELETE => { Endpoint::DeleteBucket { .. } => Err(Error::Forbidden(
// DeleteBucket query
Err(Error::Forbidden(
"Cannot delete buckets using S3 api, please talk to Garage directly".into(), "Cannot delete buckets using S3 api, please talk to Garage directly".into(),
)) )),
Endpoint::GetBucketLocation { .. } => handle_get_bucket_location(garage),
Endpoint::GetBucketVersioning { .. } => handle_get_bucket_versioning(),
Endpoint::ListObjects {
bucket,
delimiter,
encoding_type,
marker,
max_keys,
prefix,
} => {
handle_list(
garage,
&ListObjectsQuery {
is_v2: false,
bucket,
delimiter: delimiter.map(|d| d.to_string()),
max_keys: max_keys.unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
marker,
continuation_token: None,
start_after: None,
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
} }
Method::GET => { Endpoint::ListObjectsV2 {
if params.contains_key("location") { bucket,
// GetBucketLocation call delimiter,
Ok(handle_get_bucket_location(garage)?) encoding_type,
} else if params.contains_key("versioning") { max_keys,
// GetBucketVersioning prefix,
Ok(handle_get_bucket_versioning()?) continuation_token,
start_after,
list_type,
..
} => {
if list_type == "2" {
handle_list(
garage,
&ListObjectsQuery {
is_v2: true,
bucket,
delimiter: delimiter.map(|d| d.to_string()),
max_keys: max_keys.unwrap_or(1000),
prefix: prefix.unwrap_or_default(),
marker: None,
continuation_token,
start_after,
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
},
)
.await
} else { } else {
// ListObjects or ListObjectsV2 query Err(Error::BadRequest(format!(
let q = parse_list_objects_query(bucket, &params)?; "Invalid endpoint: list-type={}",
Ok(handle_list(garage, &q).await?) list_type
)))
} }
} }
Method::POST => { Endpoint::DeleteObjects { bucket } => {
if params.contains_key(&"delete".to_string()) { handle_delete_objects(garage, &bucket, req, content_sha256).await
// DeleteObjects
Ok(handle_delete_objects(garage, bucket, req, content_sha256).await?)
} else {
debug!(
"Body: {}",
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
.unwrap_or("<invalid utf8>")
);
Err(Error::BadRequest("Unsupported call".to_string()))
}
}
_ => Err(Error::BadRequest("Invalid method".to_string())),
} }
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
} }
} }

View file

@ -65,6 +65,10 @@ pub enum Error {
/// The client sent an invalid request /// The client sent an invalid request
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),
/// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String),
} }
impl From<roxmltree::Error> for Error { impl From<roxmltree::Error> for Error {
@ -94,6 +98,7 @@ impl Error {
StatusCode::INTERNAL_SERVER_ERROR StatusCode::INTERNAL_SERVER_ERROR
} }
Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE, Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
_ => StatusCode::BAD_REQUEST, _ => StatusCode::BAD_REQUEST,
} }
} }

View file

@ -26,7 +26,7 @@ pub fn host_to_bucket<'a>(host: &'a str, root: &str) -> Option<&'a str> {
/// The HTTP host contains both a host and a port. /// The HTTP host contains both a host and a port.
/// Extracting the port is more complex than just finding the colon (:) symbol due to IPv6 /// Extracting the port is more complex than just finding the colon (:) symbol due to IPv6
/// We do not use the collect pattern as there is no way in std rust to collect over a stack allocated value /// We do not use the collect pattern as there is no way in std rust to collect over a stack allocated value
/// check here: https://docs.rs/collect_slice/1.2.0/collect_slice/ /// check here: <https://docs.rs/collect_slice/1.2.0/collect_slice/>
pub fn authority_to_host(authority: &str) -> Result<String, Error> { pub fn authority_to_host(authority: &str) -> Result<String, Error> {
let mut iter = authority.chars().enumerate(); let mut iter = authority.chars().enumerate();
let (_, first_char) = iter let (_, first_char) = iter

View file

@ -19,4 +19,5 @@ mod s3_delete;
pub mod s3_get; pub mod s3_get;
mod s3_list; mod s3_list;
mod s3_put; mod s3_put;
mod s3_router;
mod s3_xml; mod s3_xml;

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet};
use std::sync::Arc; use std::sync::Arc;
use hyper::{Body, Response}; use hyper::{Body, Response};
@ -35,32 +35,6 @@ struct ListResultInfo {
etag: String, etag: String,
} }
pub fn parse_list_objects_query(
bucket: &str,
params: &HashMap<String, String>,
) -> Result<ListObjectsQuery, Error> {
Ok(ListObjectsQuery {
is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false),
bucket: bucket.to_string(),
delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(),
max_keys: params
.get("max-keys")
.map(|x| {
x.parse::<usize>()
.ok_or_bad_request("Invalid value for max-keys")
})
.unwrap_or(Ok(1000))?,
prefix: params.get("prefix").cloned().unwrap_or_default(),
marker: params.get("marker").cloned(),
continuation_token: params.get("continuation-token").cloned(),
start_after: params.get("start-after").cloned(),
urlencode_resp: params
.get("encoding-type")
.map(|x| x == "url")
.unwrap_or(false),
})
}
pub async fn handle_list( pub async fn handle_list(
garage: Arc<Garage>, garage: Arc<Garage>,
query: &ListObjectsQuery, query: &ListObjectsQuery,

View file

@ -354,15 +354,10 @@ pub async fn handle_put_part(
req: Request<Body>, req: Request<Body>,
bucket: &str, bucket: &str,
key: &str, key: &str,
part_number_str: &str, part_number: u64,
upload_id: &str, upload_id: &str,
content_sha256: Option<Hash>, content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
// Check parameters
let part_number = part_number_str
.parse::<u64>()
.ok_or_bad_request("Invalid part number")?;
let version_uuid = decode_upload_id(upload_id)?; let version_uuid = decode_upload_id(upload_id)?;
let content_md5 = match req.headers().get("content-md5") { let content_md5 = match req.headers().get("content-md5") {

1326
src/api/s3_router.rs Normal file

File diff suppressed because it is too large Load diff