Support for PostObject #222
3 changed files with 93 additions and 104 deletions
|
@ -46,7 +46,7 @@ pub async fn handle_copy(
|
||||||
// Implement x-amz-metadata-directive: REPLACE
|
// Implement x-amz-metadata-directive: REPLACE
|
||||||
let new_meta = match req.headers().get("x-amz-metadata-directive") {
|
let new_meta = match req.headers().get("x-amz-metadata-directive") {
|
||||||
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
|
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
|
||||||
headers: get_headers(req)?,
|
headers: get_headers(req.headers())?,
|
||||||
size: source_version_meta.size,
|
size: source_version_meta.size,
|
||||||
etag: source_version_meta.etag.clone(),
|
etag: source_version_meta.etag.clone(),
|
||||||
},
|
},
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
use std::collections::{BTreeMap, HashMap};
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use hyper::{header, Body, Request, Response, StatusCode};
|
use futures::StreamExt;
|
||||||
|
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
|
||||||
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
use multer::{Constraints, Multipart, SizeLimit};
|
use multer::{Constraints, Multipart, SizeLimit};
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
|
|
||||||
|
@ -9,7 +12,7 @@ use garage_model::garage::Garage;
|
||||||
|
|
||||||
use crate::api_server::resolve_bucket;
|
use crate::api_server::resolve_bucket;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::s3_put::save_stream;
|
use crate::s3_put::{get_headers, save_stream};
|
||||||
use crate::signature::payload::{parse_date, verify_v4};
|
use crate::signature::payload::{parse_date, verify_v4};
|
||||||
|
|
||||||
pub async fn handle_post_object(
|
pub async fn handle_post_object(
|
||||||
|
@ -24,99 +27,72 @@ pub async fn handle_post_object(
|
||||||
.and_then(|ct| multer::parse_boundary(ct).ok())
|
.and_then(|ct| multer::parse_boundary(ct).ok())
|
||||||
.ok_or_bad_request("Counld not get multipart boundary")?;
|
.ok_or_bad_request("Counld not get multipart boundary")?;
|
||||||
|
|
||||||
// these limits are rather arbitrary
|
// 16k seems plenty for a header. 5G is the max size of a single part, so it seemrs reasonable
|
||||||
|
// for a PostObject
|
||||||
let constraints = Constraints::new().size_limit(
|
let constraints = Constraints::new().size_limit(
|
||||||
SizeLimit::new()
|
SizeLimit::new()
|
||||||
.per_field(32 * 1024)
|
.per_field(16 * 1024)
|
||||||
.for_field("file", 5 * 1024 * 1024 * 1024),
|
.for_field("file", 5 * 1024 * 1024 * 1024),
|
||||||
);
|
);
|
||||||
|
|
||||||
let mut multipart = Multipart::with_constraints(req.into_body(), boundary, constraints);
|
let mut multipart = Multipart::with_constraints(req.into_body(), boundary, constraints);
|
||||||
|
|
||||||
let mut headers = BTreeMap::new();
|
let mut params = HeaderMap::new();
|
||||||
let mut credential = None;
|
|
||||||
let mut key = None;
|
|
||||||
let mut policy = None;
|
|
||||||
let mut signature = None;
|
|
||||||
let mut date = None;
|
|
||||||
let mut redirect = Err(204);
|
|
||||||
while let Some(field) = multipart.next_field().await? {
|
while let Some(field) = multipart.next_field().await? {
|
||||||
let name = if let Some(name) = field.name() {
|
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
|
||||||
name.to_owned()
|
name
|
||||||
} else {
|
} else {
|
||||||
continue;
|
continue;
|
||||||
};
|
};
|
||||||
|
|
||||||
if name != "file" {
|
if name != "file" {
|
||||||
let content = field.text().await?;
|
if let Ok(content) = HeaderValue::from_str(&field.text().await?) {
|
||||||
// TODO wouldn't a header map be better?
|
match name.as_str() {
|
||||||
match name.to_ascii_lowercase().as_str() {
|
"tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
|
||||||
// main fields
|
"acl" => {
|
||||||
"key" => {
|
params.append("x-amz-acl", content);
|
||||||
key = Some(content);
|
}
|
||||||
}
|
_ => {
|
||||||
"policy" => {
|
params.append(name, content);
|
||||||
policy = Some(content);
|
}
|
||||||
}
|
|
||||||
"x-amz-credential" => {
|
|
||||||
credential = Some(content);
|
|
||||||
}
|
|
||||||
"x-amz-signature" => {
|
|
||||||
signature = Some(content);
|
|
||||||
}
|
|
||||||
"x-amz-date" => {
|
|
||||||
date = Some(content);
|
|
||||||
}
|
|
||||||
// special handling
|
|
||||||
"success_action_redirect" | "redirect" => {
|
|
||||||
// TODO should verify it's a valid looking URI
|
|
||||||
redirect = Ok(content);
|
|
||||||
}
|
|
||||||
"success_action_status" => {
|
|
||||||
let code = name.parse::<u16>().unwrap_or(204);
|
|
||||||
redirect = Err(code);
|
|
||||||
}
|
|
||||||
"tagging" => {
|
|
||||||
// TODO Garage does not support tagging so this can be left empty. It's essentially
|
|
||||||
// a header except it must be parsed from xml to x-www-form-urlencoded
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
// headers to PutObject
|
|
||||||
"cache-control" | "content-type" | "content-encoding" | "expires" => {
|
|
||||||
headers.insert(name, content);
|
|
||||||
}
|
|
||||||
"acl" => {
|
|
||||||
headers.insert("x-amz-acl".to_owned(), content);
|
|
||||||
}
|
|
||||||
_ if name.starts_with("x-amz-") => {
|
|
||||||
headers.insert(name, content);
|
|
||||||
}
|
|
||||||
_ => {
|
|
||||||
// TODO should we ignore, error or process?
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Current part is file. Do some checks before handling to PutObject code
|
// Current part is file. Do some checks before handling to PutObject code
|
||||||
let credential = credential.ok_or_else(|| {
|
let key = params
|
||||||
Error::Forbidden("Garage does not support anonymous access yet".to_string())
|
.get("key")
|
||||||
})?;
|
.ok_or_bad_request("No key was provided")?
|
||||||
let policy = policy.ok_or_bad_request("No policy was provided")?;
|
.to_str()?;
|
||||||
let signature = signature.ok_or_bad_request("No signature was provided")?;
|
let credential = params
|
||||||
let date = date.ok_or_bad_request("No date was provided")?;
|
.get("x-amz-credential")
|
||||||
let key = key.ok_or_bad_request("No key was provided")?;
|
.ok_or_else(|| {
|
||||||
|
Error::Forbidden("Garage does not support anonymous access yet".to_string())
|
||||||
|
})?
|
||||||
|
.to_str()?;
|
||||||
|
let policy = params
|
||||||
|
.get("policy")
|
||||||
|
.ok_or_bad_request("No policy was provided")?
|
||||||
|
.to_str()?;
|
||||||
|
let signature = params
|
||||||
|
.get("x-amz-signature")
|
||||||
|
.ok_or_bad_request("No signature was provided")?
|
||||||
|
.to_str()?;
|
||||||
|
let date = params
|
||||||
|
.get("x-amz-date")
|
||||||
|
.ok_or_bad_request("No date was provided")?
|
||||||
|
.to_str()?;
|
||||||
|
|
||||||
let key = if key.contains("${filename}") {
|
let key = if key.contains("${filename}") {
|
||||||
let filename = field.file_name();
|
let filename = field.file_name();
|
||||||
// is this correct? Maybe we should error instead of default?
|
// is this correct? Maybe we should error instead of default?
|
||||||
key.replace("${filename}", filename.unwrap_or_default())
|
key.replace("${filename}", filename.unwrap_or_default())
|
||||||
} else {
|
} else {
|
||||||
key
|
key.to_owned()
|
||||||
};
|
};
|
||||||
|
|
||||||
let date = parse_date(&date)?;
|
let date = parse_date(date)?;
|
||||||
let api_key = verify_v4(&garage, &credential, &date, &signature, policy.as_bytes()).await?;
|
let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?;
|
||||||
|
|
||||||
let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
|
let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
|
||||||
|
|
||||||
|
@ -128,19 +104,21 @@ pub async fn handle_post_object(
|
||||||
|
|
||||||
let decoded_policy = base64::decode(&policy)?;
|
let decoded_policy = base64::decode(&policy)?;
|
||||||
let _decoded_policy: Policy = serde_json::from_slice(&decoded_policy).unwrap();
|
let _decoded_policy: Policy = serde_json::from_slice(&decoded_policy).unwrap();
|
||||||
|
|
||||||
// TODO validate policy against request
|
// TODO validate policy against request
|
||||||
// unsafe to merge until implemented
|
// unsafe to merge until implemented
|
||||||
|
|
||||||
let content_type = field
|
let content_type = field
|
||||||
.content_type()
|
.content_type()
|
||||||
.map(ToString::to_string)
|
.map(AsRef::as_ref)
|
||||||
.unwrap_or_else(|| "blob".to_owned());
|
.map(HeaderValue::from_str)
|
||||||
let headers = garage_model::object_table::ObjectVersionHeaders {
|
.transpose()
|
||||||
content_type,
|
.ok_or_bad_request("Invalid content type")?
|
||||||
other: headers,
|
.unwrap_or_else(|| HeaderValue::from_static("blob"));
|
||||||
};
|
|
||||||
|
params.append(header::CONTENT_TYPE, content_type);
|
||||||
|
let headers = get_headers(¶ms)?;
|
||||||
|
|
||||||
use futures::StreamExt;
|
|
||||||
let res = save_stream(
|
let res = save_stream(
|
||||||
garage,
|
garage,
|
||||||
headers,
|
headers,
|
||||||
|
@ -152,24 +130,35 @@ pub async fn handle_post_object(
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let resp = match redirect {
|
let resp = if let Some(target) = params
|
||||||
Err(200) => Response::builder()
|
.get("success_action_redirect")
|
||||||
.status(StatusCode::OK)
|
.and_then(|h| h.to_str().ok())
|
||||||
.body(Body::empty())?,
|
{
|
||||||
Err(201) => {
|
// TODO should validate it's a valid url
|
||||||
// TODO body should be an XML document, not sure which yet
|
let target = target.to_owned();
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.status(StatusCode::CREATED)
|
|
||||||
.body(res.into_body())?
|
|
||||||
}
|
|
||||||
// invalid codes are handled as 204
|
|
||||||
Err(_) => Response::builder()
|
|
||||||
.status(StatusCode::NO_CONTENT)
|
|
||||||
.body(Body::empty())?,
|
|
||||||
Ok(uri) => Response::builder()
|
|
||||||
.status(StatusCode::SEE_OTHER)
|
.status(StatusCode::SEE_OTHER)
|
||||||
.header(header::LOCATION, uri.clone())
|
.header(header::LOCATION, target.clone())
|
||||||
.body(uri.into())?,
|
.body(target.into())?
|
||||||
|
} else {
|
||||||
|
let action = params
|
||||||
|
.get("success_action_status")
|
||||||
|
.and_then(|h| h.to_str().ok())
|
||||||
|
.unwrap_or("204");
|
||||||
|
match action {
|
||||||
|
"200" => Response::builder()
|
||||||
|
.status(StatusCode::OK)
|
||||||
|
.body(Body::empty())?,
|
||||||
|
"201" => {
|
||||||
|
// TODO body should be an XML document, not sure which yet
|
||||||
|
Response::builder()
|
||||||
|
.status(StatusCode::CREATED)
|
||||||
|
.body(res.into_body())?
|
||||||
|
}
|
||||||
|
_ => Response::builder()
|
||||||
|
.status(StatusCode::NO_CONTENT)
|
||||||
|
.body(Body::empty())?,
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
return Ok(resp);
|
return Ok(resp);
|
||||||
|
|
|
@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
use futures::{prelude::*, TryFutureExt};
|
use futures::{prelude::*, TryFutureExt};
|
||||||
use hyper::body::{Body, Bytes};
|
use hyper::body::{Body, Bytes};
|
||||||
|
use hyper::header::{HeaderMap, HeaderValue};
|
||||||
use hyper::{Request, Response};
|
use hyper::{Request, Response};
|
||||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
@ -35,7 +36,7 @@ pub async fn handle_put(
|
||||||
mut content_sha256: Option<Hash>,
|
mut content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
// Retrieve interesting headers from request
|
// Retrieve interesting headers from request
|
||||||
let headers = get_headers(&req)?;
|
let headers = get_headers(req.headers())?;
|
||||||
debug!("Object headers: {:?}", headers);
|
debug!("Object headers: {:?}", headers);
|
||||||
|
|
||||||
let content_md5 = match req.headers().get("content-md5") {
|
let content_md5 = match req.headers().get("content-md5") {
|
||||||
|
@ -394,7 +395,7 @@ pub async fn handle_create_multipart_upload(
|
||||||
key: &str,
|
key: &str,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
let headers = get_headers(req)?;
|
let headers = get_headers(req.headers())?;
|
||||||
|
|
||||||
// Create object in object table
|
// Create object in object table
|
||||||
let object_version = ObjectVersion {
|
let object_version = ObjectVersion {
|
||||||
|
@ -693,17 +694,16 @@ pub async fn handle_abort_multipart_upload(
|
||||||
Ok(Response::new(Body::from(vec![])))
|
Ok(Response::new(Body::from(vec![])))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
|
fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
|
||||||
Ok(req
|
Ok(headers
|
||||||
.headers()
|
|
||||||
.get(hyper::header::CONTENT_TYPE)
|
.get(hyper::header::CONTENT_TYPE)
|
||||||
.map(|x| x.to_str())
|
.map(|x| x.to_str())
|
||||||
.unwrap_or(Ok("blob"))?
|
.unwrap_or(Ok("blob"))?
|
||||||
.to_string())
|
.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
|
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
|
||||||
let content_type = get_mime_type(req)?;
|
let content_type = get_mime_type(headers)?;
|
||||||
let mut other = BTreeMap::new();
|
let mut other = BTreeMap::new();
|
||||||
|
|
||||||
// Preserve standard headers
|
// Preserve standard headers
|
||||||
|
@ -715,7 +715,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
|
||||||
hyper::header::EXPIRES,
|
hyper::header::EXPIRES,
|
||||||
];
|
];
|
||||||
for h in standard_header.iter() {
|
for h in standard_header.iter() {
|
||||||
if let Some(v) = req.headers().get(h) {
|
if let Some(v) = headers.get(h) {
|
||||||
match v.to_str() {
|
match v.to_str() {
|
||||||
Ok(v_str) => {
|
Ok(v_str) => {
|
||||||
other.insert(h.to_string(), v_str.to_string());
|
other.insert(h.to_string(), v_str.to_string());
|
||||||
|
@ -728,7 +728,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
|
||||||
}
|
}
|
||||||
|
|
||||||
// Preserve x-amz-meta- headers
|
// Preserve x-amz-meta- headers
|
||||||
for (k, v) in req.headers().iter() {
|
for (k, v) in headers.iter() {
|
||||||
if k.as_str().starts_with("x-amz-meta-") {
|
if k.as_str().starts_with("x-amz-meta-") {
|
||||||
match v.to_str() {
|
match v.to_str() {
|
||||||
Ok(v_str) => {
|
Ok(v_str) => {
|
||||||
|
|
Loading…
Reference in a new issue