Support for PostObject #222
6 changed files with 178 additions and 32 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -721,6 +721,7 @@ dependencies = [
|
|||
"roxmltree",
|
||||
"serde",
|
||||
"serde_bytes",
|
||||
"serde_json",
|
||||
"sha2",
|
||||
"tokio",
|
||||
"url",
|
||||
|
|
|
@ -45,5 +45,6 @@ percent-encoding = "2.1.0"
|
|||
roxmltree = "0.14"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_bytes = "0.11"
|
||||
serde_json = "1.0"
|
||||
quick-xml = { version = "0.21", features = [ "serialize" ] }
|
||||
url = "2.1"
|
||||
|
|
|
@ -126,6 +126,12 @@ impl From<HelperError> for Error {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<multer::Error> for Error {
|
||||
fn from(err: multer::Error) -> Self {
|
||||
Self::BadRequest(err.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl Error {
|
||||
/// Get the HTTP status code that best represents the meaning of the error for the client
|
||||
pub fn http_status_code(&self) -> StatusCode {
|
||||
|
|
|
@ -1,13 +1,19 @@
|
|||
use std::collections::HashMap;
|
||||
use std::collections::{BTreeMap, HashMap};
|
||||
use std::sync::Arc;
|
||||
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
use hmac::Mac;
|
||||
use hyper::{header, Body, Request, Response, StatusCode};
|
||||
use multer::{Constraints, Multipart, SizeLimit};
|
||||
use serde::Deserialize;
|
||||
|
||||
use garage_model::garage::Garage;
|
||||
|
||||
use crate::error::Error;
|
||||
|
||||
use multer::{Constraints, Multipart, SizeLimit};
|
||||
use crate::api_server::resolve_bucket;
|
||||
use crate::error::*;
|
||||
use crate::s3_put::save_stream;
|
||||
use crate::signature::payload::parse_credential;
|
||||
use crate::signature::{signing_hmac, LONG_DATETIME};
|
||||
|
||||
pub async fn handle_post_object(
|
||||
garage: Arc<Garage>,
|
||||
|
@ -19,7 +25,7 @@ pub async fn handle_post_object(
|
|||
.get(header::CONTENT_TYPE)
|
||||
.and_then(|ct| ct.to_str().ok())
|
||||
.and_then(|ct| multer::parse_boundary(ct).ok())
|
||||
.ok_or_else(|| Error::BadRequest("Counld not get multipart boundary".to_owned()))?;
|
||||
.ok_or_bad_request("Counld not get multipart boundary")?;
|
||||
|
||||
// these limits are rather arbitrary
|
||||
let constraints = Constraints::new().size_limit(
|
||||
|
@ -30,12 +36,14 @@ pub async fn handle_post_object(
|
|||
|
||||
let mut multipart = Multipart::with_constraints(req.into_body(), boundary, constraints);
|
||||
|
||||
let mut headers = HashMap::new();
|
||||
let mut key_id = None;
|
||||
let mut headers = BTreeMap::new();
|
||||
let mut credential = None;
|
||||
let mut key = None;
|
||||
let mut policy = None;
|
||||
let mut signature = None;
|
||||
let mut date = None;
|
||||
let mut redirect = Err(204);
|
||||
while let Some(mut field) = multipart.next_field().await.unwrap() {
|
||||
while let Some(field) = multipart.next_field().await? {
|
||||
let name = if let Some(name) = field.name() {
|
||||
name.to_owned()
|
||||
} else {
|
||||
|
@ -43,18 +51,25 @@ pub async fn handle_post_object(
|
|||
};
|
||||
|
||||
if name != "file" {
|
||||
let content = field.text().await.unwrap();
|
||||
match name.as_str() {
|
||||
let content = field.text().await?;
|
||||
// TODO wouldn't a header map be better?
|
||||
match name.to_ascii_lowercase().as_str() {
|
||||
// main fields
|
||||
"AWSAccessKeyId" => {
|
||||
key_id = Some(content);
|
||||
}
|
||||
"key" => {
|
||||
key = Some(content);
|
||||
}
|
||||
"policy" => {
|
||||
policy = Some(content);
|
||||
}
|
||||
"x-amz-credential" => {
|
||||
credential = Some(content);
|
||||
}
|
||||
"x-amz-signature" => {
|
||||
signature = Some(content);
|
||||
}
|
||||
"x-amz-date" => {
|
||||
date = Some(content);
|
||||
}
|
||||
// special handling
|
||||
"success_action_redirect" | "redirect" => {
|
||||
// TODO should verify it's a valid looking URI
|
||||
|
@ -70,44 +85,118 @@ pub async fn handle_post_object(
|
|||
continue;
|
||||
}
|
||||
// headers to PutObject
|
||||
"acl" | "Cache-Control" | "Content-Type" | "Content-Encoding" | "Expires" => {
|
||||
"cache-control" | "content-type" | "content-encoding" | "expires" => {
|
||||
headers.insert(name, content);
|
||||
}
|
||||
"acl" => {
|
||||
headers.insert("x-amz-acl".to_owned(), content);
|
||||
}
|
||||
_ if name.starts_with("x-amz-") => {
|
||||
headers.insert(name, content);
|
||||
}
|
||||
_ => {
|
||||
// TODO should we ignore or error?
|
||||
// TODO should we ignore, error or process?
|
||||
}
|
||||
}
|
||||
continue;
|
||||
}
|
||||
|
||||
let _file_name = field.file_name();
|
||||
let _content_type = field.content_type();
|
||||
while let Some(_chunk) = field.chunk().await.unwrap() {}
|
||||
// Current part is file. Do some checks before handling to PutObject code
|
||||
let credential = credential.ok_or_else(|| {
|
||||
Error::Forbidden("Garage does not support anonymous access yet".to_string())
|
||||
})?;
|
||||
let policy = policy.ok_or_bad_request("No policy was provided")?;
|
||||
let signature = signature.ok_or_bad_request("No signature was provided")?;
|
||||
let date = date.ok_or_bad_request("No date was provided")?;
|
||||
let key = key.ok_or_bad_request("No key was provided")?;
|
||||
|
||||
let key = if key.contains("${filename}") {
|
||||
let filename = field.file_name();
|
||||
// is this correct? Maybe we should error instead of default?
|
||||
key.replace("${filename}", &filename.unwrap_or_default())
|
||||
} else {
|
||||
key
|
||||
};
|
||||
|
||||
// TODO verify scope against bucket&date?
|
||||
let (key_id, scope) = parse_credential(&credential)?;
|
||||
// TODO duplicated from signature/*
|
||||
let date: NaiveDateTime = NaiveDateTime::parse_from_str(&date, LONG_DATETIME)
|
||||
.ok_or_bad_request("invalid date")?;
|
||||
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
||||
|
||||
// TODO counldn't this be a garage.get_key?
|
||||
let api_key = garage
|
||||
.key_table
|
||||
.get(&garage_table::EmptyKey, &key_id)
|
||||
.await?
|
||||
.filter(|k| !k.state.is_deleted())
|
||||
.ok_or_else(|| Error::Forbidden(format!("No such key: {}", key_id)))?;
|
||||
|
||||
// TODO duplicated from signature/*
|
||||
let key_p = api_key.params().unwrap();
|
||||
let secret_key = &key_p.secret_key;
|
||||
|
||||
let mut hmac = signing_hmac(&date, secret_key, &garage.config.s3_api.s3_region, "s3")
|
||||
.ok_or_internal_error("Unable to build signing HMAC")?;
|
||||
hmac.update(policy.as_bytes());
|
||||
let our_signature = hex::encode(hmac.finalize().into_bytes());
|
||||
if signature != our_signature {
|
||||
return Err(Error::Forbidden("Invalid signature".to_string()));
|
||||
}
|
||||
|
||||
let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
|
||||
|
||||
if !api_key.allow_write(&bucket_id) {
|
||||
return Err(Error::Forbidden(
|
||||
"Operation is not allowed for this key.".to_string(),
|
||||
));
|
||||
}
|
||||
|
||||
let decoded_policy = base64::decode(&policy)?;
|
||||
let _decoded_policy: Policy = serde_json::from_slice(&decoded_policy).unwrap();
|
||||
// TODO validate policy against request
|
||||
// unsafe to merge until implemented
|
||||
|
||||
let content_type = field
|
||||
.content_type()
|
||||
.map(ToString::to_string)
|
||||
.unwrap_or_else(|| "blob".to_owned());
|
||||
let headers = garage_model::object_table::ObjectVersionHeaders {
|
||||
content_type,
|
||||
other: headers,
|
||||
};
|
||||
|
||||
use futures::StreamExt;
|
||||
let res = save_stream(
|
||||
garage,
|
||||
headers,
|
||||
field.map(|r| r.map_err(Into::into)),
|
||||
bucket_id,
|
||||
&key,
|
||||
None,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let resp = match redirect {
|
||||
Err(200) => Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(Body::empty())?,
|
||||
Err(201) => {
|
||||
// body should be an XML document, not sure which yet
|
||||
// TODO body should be an XML document, not sure which yet
|
||||
Response::builder()
|
||||
.status(StatusCode::CREATED)
|
||||
.body(todo!())?
|
||||
.body(res.into_body())?
|
||||
}
|
||||
// invalid codes are handled as 204
|
||||
Err(_) => Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(Body::empty())?,
|
||||
Ok(uri) => {
|
||||
// TODO maybe body should contain a link to the ressource?
|
||||
Response::builder()
|
||||
Ok(uri) => Response::builder()
|
||||
.status(StatusCode::SEE_OTHER)
|
||||
.header(header::LOCATION, uri)
|
||||
.body(Body::empty())?
|
||||
}
|
||||
.header(header::LOCATION, uri.clone())
|
||||
.body(uri.into())?,
|
||||
};
|
||||
|
||||
return Ok(resp);
|
||||
|
@ -117,3 +206,31 @@ pub async fn handle_post_object(
|
|||
"Request did not contain a file".to_owned(),
|
||||
));
|
||||
}
|
||||
|
||||
// TODO remove allow(dead_code) when policy is verified
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(Deserialize)]
|
||||
struct Policy {
|
||||
expiration: String,
|
||||
conditions: Vec<PolicyCondition>,
|
||||
}
|
||||
|
||||
/// A single condition from a policy
|
||||
#[derive(Deserialize)]
|
||||
#[serde(untagged)]
|
||||
enum PolicyCondition {
|
||||
// will contain a single key-value pair
|
||||
Equal(HashMap<String, String>),
|
||||
OtherOp([String; 3]),
|
||||
SizeRange(String, u64, u64),
|
||||
}
|
||||
|
||||
#[allow(dead_code)]
|
||||
#[derive(PartialEq, Eq)]
|
||||
enum Operation {
|
||||
Equal,
|
||||
StartsWith,
|
||||
StartsWithCT,
|
||||
SizeRange,
|
||||
}
|
||||
|
|
|
@ -34,10 +34,6 @@ pub async fn handle_put(
|
|||
api_key: &Key,
|
||||
mut content_sha256: Option<Hash>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
// Generate identity of new version
|
||||
let version_uuid = gen_uuid();
|
||||
let version_timestamp = now_msec();
|
||||
|
||||
// Retrieve interesting headers from request
|
||||
let headers = get_headers(&req)?;
|
||||
debug!("Object headers: {:?}", headers);
|
||||
|
@ -92,6 +88,31 @@ pub async fn handle_put(
|
|||
body.boxed()
|
||||
};
|
||||
|
||||
save_stream(
|
||||
garage,
|
||||
headers,
|
||||
body,
|
||||
bucket_id,
|
||||
key,
|
||||
content_md5,
|
||||
content_sha256,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||
garage: Arc<Garage>,
|
||||
headers: ObjectVersionHeaders,
|
||||
body: S,
|
||||
bucket_id: Uuid,
|
||||
key: &str,
|
||||
content_md5: Option<String>,
|
||||
content_sha256: Option<FixedBytes32>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
// Generate identity of new version
|
||||
let version_uuid = gen_uuid();
|
||||
let version_timestamp = now_msec();
|
||||
|
||||
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||
let first_block = chunker.next().await?.unwrap_or_default();
|
||||
|
||||
|
|
|
@ -234,7 +234,7 @@ fn parse_query_authorization(
|
|||
})
|
||||
}
|
||||
|
||||
fn parse_credential(cred: &str) -> Result<(String, String), Error> {
|
||||
pub(crate) fn parse_credential(cred: &str) -> Result<(String, String), Error> {
|
||||
let first_slash = cred
|
||||
.find('/')
|
||||
.ok_or_bad_request("Credentials does not contain / in authorization field")?;
|
||||
|
|
Loading…
Reference in a new issue