forked from Deuxfleurs/garage
add proper request router for s3 api (#163)
fix #161 Current request router was organically grown, and is getting messier and messier with each addition. This router cover exaustively existing API endpoints (with exceptions listed in [#161(comment)](Deuxfleurs/garage#161 (comment)) either because new and old api endpoint can't feasabily be differentied, or it's more lambda than s3). Co-authored-by: Trinity Pointard <trinity.pointard@gmail.com> Reviewed-on: Deuxfleurs/garage#163 Reviewed-by: Alex <alex@adnab.me> Co-authored-by: trinity-1686a <trinity.pointard@gmail.com> Co-committed-by: trinity-1686a <trinity.pointard@gmail.com>
This commit is contained in:
parent
ccce75bc25
commit
c4ac8835d3
7 changed files with 1476 additions and 185 deletions
|
@ -1,4 +1,3 @@
|
|||
use std::collections::HashMap;
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
|
@ -6,7 +5,7 @@ use futures::future::Future;
|
|||
use hyper::header;
|
||||
use hyper::server::conn::AddrStream;
|
||||
use hyper::service::{make_service_fn, service_fn};
|
||||
use hyper::{Body, Method, Request, Response, Server};
|
||||
use hyper::{Body, Request, Response, Server};
|
||||
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
|
@ -22,6 +21,7 @@ use crate::s3_delete::*;
|
|||
use crate::s3_get::*;
|
||||
use crate::s3_list::*;
|
||||
use crate::s3_put::*;
|
||||
use crate::s3_router::{Authorization, Endpoint};
|
||||
|
||||
/// Run the S3 API server
|
||||
pub async fn run_api_server(
|
||||
|
@ -86,8 +86,6 @@ async fn handler(
|
|||
}
|
||||
|
||||
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||
let path = req.uri().path().to_string();
|
||||
let path = percent_encoding::percent_decode_str(&path).decode_utf8()?;
|
||||
let (api_key, content_sha256) = check_signature(&garage, &req).await?;
|
||||
|
||||
let authority = req
|
||||
|
@ -105,166 +103,158 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
|||
.as_ref()
|
||||
.and_then(|root_domain| host_to_bucket(&host, root_domain));
|
||||
|
||||
if path == "/" && bucket.is_none() {
|
||||
return handle_list_buckets(&api_key);
|
||||
}
|
||||
|
||||
let (bucket, key) = parse_bucket_key(&path, bucket)?;
|
||||
let allowed = match req.method() {
|
||||
&Method::HEAD | &Method::GET => api_key.allow_read(bucket),
|
||||
_ => api_key.allow_write(bucket),
|
||||
let endpoint = Endpoint::from_request(&req, bucket.map(ToOwned::to_owned))?;
|
||||
let allowed = match endpoint.authorization_type() {
|
||||
Authorization::None => true,
|
||||
Authorization::Read(bucket) => api_key.allow_read(bucket),
|
||||
Authorization::Write(bucket) => api_key.allow_write(bucket),
|
||||
};
|
||||
|
||||
if !allowed {
|
||||
return Err(Error::Forbidden(
|
||||
"Operation is not allowed for this key.".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut params = HashMap::new();
|
||||
if let Some(query) = req.uri().query() {
|
||||
let query_pairs = url::form_urlencoded::parse(query.as_bytes());
|
||||
for (key, val) in query_pairs {
|
||||
params.insert(key.to_lowercase(), val.to_string());
|
||||
match endpoint {
|
||||
Endpoint::ListBuckets => handle_list_buckets(&api_key),
|
||||
Endpoint::HeadObject { bucket, key, .. } => handle_head(garage, &req, &bucket, &key).await,
|
||||
Endpoint::GetObject { bucket, key, .. } => handle_get(garage, &req, &bucket, &key).await,
|
||||
Endpoint::UploadPart {
|
||||
bucket,
|
||||
key,
|
||||
part_number,
|
||||
upload_id,
|
||||
} => {
|
||||
handle_put_part(
|
||||
garage,
|
||||
req,
|
||||
&bucket,
|
||||
&key,
|
||||
part_number,
|
||||
&upload_id,
|
||||
content_sha256,
|
||||
)
|
||||
.await
|
||||
}
|
||||
}
|
||||
|
||||
if let Some(key) = key {
|
||||
match *req.method() {
|
||||
Method::HEAD => {
|
||||
// HeadObject query
|
||||
Ok(handle_head(garage, &req, bucket, key).await?)
|
||||
Endpoint::CopyObject { bucket, key } => {
|
||||
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
||||
let copy_source = percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
|
||||
let (source_bucket, source_key) = parse_bucket_key(©_source, None)?;
|
||||
if !api_key.allow_read(source_bucket) {
|
||||
return Err(Error::Forbidden(format!(
|
||||
"Reading from bucket {} not allowed for this key",
|
||||
source_bucket
|
||||
)));
|
||||
}
|
||||
Method::GET => {
|
||||
// GetObject query
|
||||
Ok(handle_get(garage, &req, bucket, key).await?)
|
||||
}
|
||||
Method::PUT => {
|
||||
if params.contains_key(&"partnumber".to_string())
|
||||
&& params.contains_key(&"uploadid".to_string())
|
||||
{
|
||||
// UploadPart query
|
||||
let part_number = params.get("partnumber").unwrap();
|
||||
let upload_id = params.get("uploadid").unwrap();
|
||||
Ok(handle_put_part(
|
||||
garage,
|
||||
req,
|
||||
let source_key = source_key.ok_or_bad_request("No source key specified")?;
|
||||
handle_copy(garage, &req, &bucket, &key, source_bucket, source_key).await
|
||||
}
|
||||
Endpoint::PutObject { bucket, key } => {
|
||||
handle_put(garage, req, &bucket, &key, content_sha256).await
|
||||
}
|
||||
Endpoint::AbortMultipartUpload {
|
||||
bucket,
|
||||
key,
|
||||
upload_id,
|
||||
} => handle_abort_multipart_upload(garage, &bucket, &key, &upload_id).await,
|
||||
Endpoint::DeleteObject { bucket, key, .. } => handle_delete(garage, &bucket, &key).await,
|
||||
Endpoint::CreateMultipartUpload { bucket, key } => {
|
||||
handle_create_multipart_upload(garage, &req, &bucket, &key).await
|
||||
}
|
||||
Endpoint::CompleteMultipartUpload {
|
||||
bucket,
|
||||
key,
|
||||
upload_id,
|
||||
} => {
|
||||
handle_complete_multipart_upload(garage, req, &bucket, &key, &upload_id, content_sha256)
|
||||
.await
|
||||
}
|
||||
Endpoint::CreateBucket { bucket } => {
|
||||
debug!(
|
||||
"Body: {}",
|
||||
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
|
||||
.unwrap_or("<invalid utf8>")
|
||||
);
|
||||
let empty_body: Body = Body::from(vec![]);
|
||||
let response = Response::builder()
|
||||
.header("Location", format!("/{}", bucket))
|
||||
.body(empty_body)
|
||||
.unwrap();
|
||||
Ok(response)
|
||||
}
|
||||
Endpoint::HeadBucket { .. } => {
|
||||
let empty_body: Body = Body::from(vec![]);
|
||||
let response = Response::builder().body(empty_body).unwrap();
|
||||
Ok(response)
|
||||
}
|
||||
Endpoint::DeleteBucket { .. } => Err(Error::Forbidden(
|
||||
"Cannot delete buckets using S3 api, please talk to Garage directly".into(),
|
||||
)),
|
||||
Endpoint::GetBucketLocation { .. } => handle_get_bucket_location(garage),
|
||||
Endpoint::GetBucketVersioning { .. } => handle_get_bucket_versioning(),
|
||||
Endpoint::ListObjects {
|
||||
bucket,
|
||||
delimiter,
|
||||
encoding_type,
|
||||
marker,
|
||||
max_keys,
|
||||
prefix,
|
||||
} => {
|
||||
handle_list(
|
||||
garage,
|
||||
&ListObjectsQuery {
|
||||
is_v2: false,
|
||||
bucket,
|
||||
delimiter: delimiter.map(|d| d.to_string()),
|
||||
max_keys: max_keys.unwrap_or(1000),
|
||||
prefix: prefix.unwrap_or_default(),
|
||||
marker,
|
||||
continuation_token: None,
|
||||
start_after: None,
|
||||
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
Endpoint::ListObjectsV2 {
|
||||
bucket,
|
||||
delimiter,
|
||||
encoding_type,
|
||||
max_keys,
|
||||
prefix,
|
||||
continuation_token,
|
||||
start_after,
|
||||
list_type,
|
||||
..
|
||||
} => {
|
||||
if list_type == "2" {
|
||||
handle_list(
|
||||
garage,
|
||||
&ListObjectsQuery {
|
||||
is_v2: true,
|
||||
bucket,
|
||||
key,
|
||||
part_number,
|
||||
upload_id,
|
||||
content_sha256,
|
||||
)
|
||||
.await?)
|
||||
} else if req.headers().contains_key("x-amz-copy-source") {
|
||||
// CopyObject query
|
||||
let copy_source = req.headers().get("x-amz-copy-source").unwrap().to_str()?;
|
||||
let copy_source =
|
||||
percent_encoding::percent_decode_str(copy_source).decode_utf8()?;
|
||||
let (source_bucket, source_key) = parse_bucket_key(©_source, None)?;
|
||||
if !api_key.allow_read(source_bucket) {
|
||||
return Err(Error::Forbidden(format!(
|
||||
"Reading from bucket {} not allowed for this key",
|
||||
source_bucket
|
||||
)));
|
||||
}
|
||||
let source_key = source_key.ok_or_bad_request("No source key specified")?;
|
||||
Ok(handle_copy(garage, &req, bucket, key, source_bucket, source_key).await?)
|
||||
} else {
|
||||
// PutObject query
|
||||
Ok(handle_put(garage, req, bucket, key, content_sha256).await?)
|
||||
}
|
||||
delimiter: delimiter.map(|d| d.to_string()),
|
||||
max_keys: max_keys.unwrap_or(1000),
|
||||
prefix: prefix.unwrap_or_default(),
|
||||
marker: None,
|
||||
continuation_token,
|
||||
start_after,
|
||||
urlencode_resp: encoding_type.map(|e| e == "url").unwrap_or(false),
|
||||
},
|
||||
)
|
||||
.await
|
||||
} else {
|
||||
Err(Error::BadRequest(format!(
|
||||
"Invalid endpoint: list-type={}",
|
||||
list_type
|
||||
)))
|
||||
}
|
||||
Method::DELETE => {
|
||||
if params.contains_key(&"uploadid".to_string()) {
|
||||
// AbortMultipartUpload query
|
||||
let upload_id = params.get("uploadid").unwrap();
|
||||
Ok(handle_abort_multipart_upload(garage, bucket, key, upload_id).await?)
|
||||
} else {
|
||||
// DeleteObject query
|
||||
Ok(handle_delete(garage, bucket, key).await?)
|
||||
}
|
||||
}
|
||||
Method::POST => {
|
||||
if params.contains_key(&"uploads".to_string()) {
|
||||
// CreateMultipartUpload call
|
||||
Ok(handle_create_multipart_upload(garage, &req, bucket, key).await?)
|
||||
} else if params.contains_key(&"uploadid".to_string()) {
|
||||
// CompleteMultipartUpload call
|
||||
let upload_id = params.get("uploadid").unwrap();
|
||||
Ok(handle_complete_multipart_upload(
|
||||
garage,
|
||||
req,
|
||||
bucket,
|
||||
key,
|
||||
upload_id,
|
||||
content_sha256,
|
||||
)
|
||||
.await?)
|
||||
} else {
|
||||
Err(Error::BadRequest(
|
||||
"Not a CreateMultipartUpload call, what is it?".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
_ => Err(Error::BadRequest("Invalid method".to_string())),
|
||||
}
|
||||
} else {
|
||||
match *req.method() {
|
||||
Method::PUT => {
|
||||
// CreateBucket
|
||||
// If we're here, the bucket already exists, so just answer ok
|
||||
debug!(
|
||||
"Body: {}",
|
||||
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
|
||||
.unwrap_or("<invalid utf8>")
|
||||
);
|
||||
let empty_body: Body = Body::from(vec![]);
|
||||
let response = Response::builder()
|
||||
.header("Location", format!("/{}", bucket))
|
||||
.body(empty_body)
|
||||
.unwrap();
|
||||
Ok(response)
|
||||
}
|
||||
Method::HEAD => {
|
||||
// HeadBucket
|
||||
let empty_body: Body = Body::from(vec![]);
|
||||
let response = Response::builder().body(empty_body).unwrap();
|
||||
Ok(response)
|
||||
}
|
||||
Method::DELETE => {
|
||||
// DeleteBucket query
|
||||
Err(Error::Forbidden(
|
||||
"Cannot delete buckets using S3 api, please talk to Garage directly".into(),
|
||||
))
|
||||
}
|
||||
Method::GET => {
|
||||
if params.contains_key("location") {
|
||||
// GetBucketLocation call
|
||||
Ok(handle_get_bucket_location(garage)?)
|
||||
} else if params.contains_key("versioning") {
|
||||
// GetBucketVersioning
|
||||
Ok(handle_get_bucket_versioning()?)
|
||||
} else {
|
||||
// ListObjects or ListObjectsV2 query
|
||||
let q = parse_list_objects_query(bucket, ¶ms)?;
|
||||
Ok(handle_list(garage, &q).await?)
|
||||
}
|
||||
}
|
||||
Method::POST => {
|
||||
if params.contains_key(&"delete".to_string()) {
|
||||
// DeleteObjects
|
||||
Ok(handle_delete_objects(garage, bucket, req, content_sha256).await?)
|
||||
} else {
|
||||
debug!(
|
||||
"Body: {}",
|
||||
std::str::from_utf8(&hyper::body::to_bytes(req.into_body()).await?)
|
||||
.unwrap_or("<invalid utf8>")
|
||||
);
|
||||
Err(Error::BadRequest("Unsupported call".to_string()))
|
||||
}
|
||||
}
|
||||
_ => Err(Error::BadRequest("Invalid method".to_string())),
|
||||
Endpoint::DeleteObjects { bucket } => {
|
||||
handle_delete_objects(garage, &bucket, req, content_sha256).await
|
||||
}
|
||||
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -65,6 +65,10 @@ pub enum Error {
|
|||
/// The client sent an invalid request
|
||||
#[error(display = "Bad request: {}", _0)]
|
||||
BadRequest(String),
|
||||
|
||||
/// The client sent a request for an action not supported by garage
|
||||
#[error(display = "Unimplemented action: {}", _0)]
|
||||
NotImplemented(String),
|
||||
}
|
||||
|
||||
impl From<roxmltree::Error> for Error {
|
||||
|
@ -94,6 +98,7 @@ impl Error {
|
|||
StatusCode::INTERNAL_SERVER_ERROR
|
||||
}
|
||||
Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
|
||||
Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
|
||||
_ => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ pub fn host_to_bucket<'a>(host: &'a str, root: &str) -> Option<&'a str> {
|
|||
/// The HTTP host contains both a host and a port.
|
||||
/// Extracting the port is more complex than just finding the colon (:) symbol due to IPv6
|
||||
/// We do not use the collect pattern as there is no way in std rust to collect over a stack allocated value
|
||||
/// check here: https://docs.rs/collect_slice/1.2.0/collect_slice/
|
||||
/// check here: <https://docs.rs/collect_slice/1.2.0/collect_slice/>
|
||||
pub fn authority_to_host(authority: &str) -> Result<String, Error> {
|
||||
let mut iter = authority.chars().enumerate();
|
||||
let (_, first_char) = iter
|
||||
|
|
|
@ -19,4 +19,5 @@ mod s3_delete;
|
|||
pub mod s3_get;
|
||||
mod s3_list;
|
||||
mod s3_put;
|
||||
mod s3_router;
|
||||
mod s3_xml;
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||
use std::collections::{BTreeMap, BTreeSet};
|
||||
use std::sync::Arc;
|
||||
|
||||
use hyper::{Body, Response};
|
||||
|
@ -35,32 +35,6 @@ struct ListResultInfo {
|
|||
etag: String,
|
||||
}
|
||||
|
||||
pub fn parse_list_objects_query(
|
||||
bucket: &str,
|
||||
params: &HashMap<String, String>,
|
||||
) -> Result<ListObjectsQuery, Error> {
|
||||
Ok(ListObjectsQuery {
|
||||
is_v2: params.get("list-type").map(|x| x == "2").unwrap_or(false),
|
||||
bucket: bucket.to_string(),
|
||||
delimiter: params.get("delimiter").filter(|x| !x.is_empty()).cloned(),
|
||||
max_keys: params
|
||||
.get("max-keys")
|
||||
.map(|x| {
|
||||
x.parse::<usize>()
|
||||
.ok_or_bad_request("Invalid value for max-keys")
|
||||
})
|
||||
.unwrap_or(Ok(1000))?,
|
||||
prefix: params.get("prefix").cloned().unwrap_or_default(),
|
||||
marker: params.get("marker").cloned(),
|
||||
continuation_token: params.get("continuation-token").cloned(),
|
||||
start_after: params.get("start-after").cloned(),
|
||||
urlencode_resp: params
|
||||
.get("encoding-type")
|
||||
.map(|x| x == "url")
|
||||
.unwrap_or(false),
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn handle_list(
|
||||
garage: Arc<Garage>,
|
||||
query: &ListObjectsQuery,
|
||||
|
|
|
@ -354,15 +354,10 @@ pub async fn handle_put_part(
|
|||
req: Request<Body>,
|
||||
bucket: &str,
|
||||
key: &str,
|
||||
part_number_str: &str,
|
||||
part_number: u64,
|
||||
upload_id: &str,
|
||||
content_sha256: Option<Hash>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
// Check parameters
|
||||
let part_number = part_number_str
|
||||
.parse::<u64>()
|
||||
.ok_or_bad_request("Invalid part number")?;
|
||||
|
||||
let version_uuid = decode_upload_id(upload_id)?;
|
||||
|
||||
let content_md5 = match req.headers().get("content-md5") {
|
||||
|
|
1326
src/api/s3_router.rs
Normal file
1326
src/api/s3_router.rs
Normal file
File diff suppressed because it is too large
Load diff
Loading…
Reference in a new issue