Support for PostObject #222

Merged
lx merged 14 commits from trinity-1686a/garage:post-object into main 2022-02-21 22:02:31 +00:00
2 changed files with 84 additions and 61 deletions
Showing only changes of commit 19ac5ce20f


@@ -39,7 +39,8 @@ pub async fn handle_post_object(
             .for_field("file", 5 * 1024 * 1024 * 1024),
     );
-    let mut multipart = Multipart::with_constraints(req.into_body(), boundary, constraints);
+    let (_head, body) = req.into_parts();
+    let mut multipart = Multipart::with_constraints(body, boundary, constraints);

     let mut params = HeaderMap::new();
     while let Some(field) = multipart.next_field().await? {
trinity-1686a marked this conversation as resolved (outdated)

I think we can avoid putting almost the entire code of this function in the `while` loop (and remove 1 indentation level almost everywhere) by doing something like this:

```rust
let file = loop {
    let field = match multipart.next_field().await? {
        None => return Err(no file field found),
        Some(x) => x,
    };
    let name = ...;
    if name == "file" {
        break field;
    }
    // here handle header field adding it to the headermap
};
// here do all of the rest of the processing once we have all headers and are now reading the file body
```

This looks much nicer to me, especially as in the current version we have a `for` inside the `while`, which looks a bit like a nested loop but in fact is not.
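For reference, a slightly more filled-in sketch of that restructuring. This is an illustration only, not this PR's code: it assumes multer's `Field::name()`/`Field::text()` API and the surrounding `params: HeaderMap`, and the error messages and header conversions are made up for the example.

```rust
let file = loop {
    let field = match multipart.next_field().await? {
        // the "file" field is expected to come last, so running out of fields
        // before finding it is an error
        None => return Err(Error::BadRequest("no file field found".to_owned())),
        Some(x) => x,
    };
    let name = field.name().unwrap_or_default().to_string();
    if name == "file" {
        break field;
    }
    // any other field is a form parameter: read its text value and record it
    // so the policy can be checked once all parameters are known
    let content = field.text().await?;
    let header_name = HeaderName::from_bytes(name.as_bytes())
        .map_err(|_| Error::BadRequest(format!("Invalid field name: {}", name)))?;
    let header_value = HeaderValue::from_str(&content)
        .map_err(|_| Error::BadRequest(format!("Invalid value for field: {}", name)))?;
    params.append(header_name, header_value);
};
// from here on, all parameters are in `params` and `file` is the field whose body is streamed
```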
@@ -56,6 +57,7 @@ pub async fn handle_post_object(
                 params.append("x-amz-acl", content);
             }
             _ => {
+                // TODO actually that's illegal to have the same param multiple times
                 params.append(name, content);
             }
         }
@@ -122,57 +124,65 @@ pub async fn handle_post_object(
     let conditions = decoded_policy.into_conditions()?;

     for (param_key, value) in params.iter() {
-        let param_key = param_key.as_str();
-        if param_key.eq_ignore_ascii_case("content-type") {
+        let mut param_key = param_key.to_string();
+        param_key.make_ascii_lowercase();
+        match param_key.as_str() {
+            "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
+            "content-type" => {
                 for cond in &conditions.content_type {
                     let ok = match cond {
                         Operation::Equal(s) => value == s,
                         Operation::StartsWith(s) => {
                             value.to_str()?.split(',').all(|v| v.starts_with(s))
                         }
                     };
                     if !ok {
                         return Err(Error::BadRequest(format!(
                             "Key '{}' has value not allowed in policy",
                             param_key
                         )));
                     }
lx marked this conversation as resolved (outdated)

I feel like we should add unit tests for the policy decoding logic (not just for into_conditions but end-to-end starting with JSON)

I'll write a test and post it in the comments of the PR so that you can copy and paste
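As an aside, here is a sketch of what such an end-to-end test could look like. It is illustrative only (not the test that was eventually posted) and assumes that `Policy` derives `Deserialize` with the AWS field names and that `Operation` and the conditions struct are wired as in the hunks shown here.

```rust
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_decode_policy_from_json() {
        // A policy document in the shape handled by into_conditions() above:
        // one map-form condition and two ["op", "$field", "value"] conditions.
        let policy_json = r#"{
            "expiration": "2022-03-01T12:00:00.000Z",
            "conditions": [
                {"bucket": "examplebucket"},
                ["starts-with", "$key", "user/user1/"],
                ["eq", "$content-type", "image/png"]
            ]
        }"#;
        let policy: Policy = serde_json::from_str(policy_json).unwrap();
        let conditions = policy.into_conditions().unwrap();

        let key_conds = conditions.params.get("key").unwrap();
        assert!(matches!(&key_conds[..], [Operation::StartsWith(s)] if s == "user/user1/"));
        assert!(matches!(
            &conditions.content_type[..],
            [Operation::Equal(s)] if s == "image/png"
        ));
    }
}
```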
                 }
             }
-        } else if param_key == "key" {
+            "key" => {
                 let conds = conditions.params.get("key").ok_or_else(|| {
                     Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
                 })?;
                 for cond in conds {
                     let ok = match cond {
                         Operation::Equal(s) => s == &key,
                         Operation::StartsWith(s) => key.starts_with(s),
                     };
                     if !ok {
                         return Err(Error::BadRequest(format!(
                             "Key '{}' has value not allowed in policy",
                             param_key
                         )));
                     }
                 }
-        } else {
-            let conds = conditions.params.get(param_key).ok_or_else(|| {
+            }
+            _ => {
+                let conds = conditions.params.get(&param_key).ok_or_else(|| {
                     Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
                 })?;
                 for cond in conds {
                     let ok = match cond {
                         Operation::Equal(s) => s == value,
lx marked this conversation as resolved (outdated)

Can you point me to the documentation section which says that there must be a policy specified for the `key` field?

~~this is not specified, it's however the behavior of AWS. `policy` and `x-amz-signature` are the only two fields I found to not be required in the policy. Even `x-amz-credential`, which is definitely required to make a valid v4 signature, must be allowed in policy~~

this is in fact [specified somewhere](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions):

> Each form field that you specify in a form (except x-amz-signature, file, policy, and field names that have an x-ignore- prefix) must appear in the list of conditions.

Which means I have to add some code to ignore `x-ignore-*`; the others are already ignored
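For illustration, the exemption rule quoted above boils down to something like this free-standing helper (an assumption, not code from this commit; the match below handles `policy` and `x-amz-signature` directly and does not yet skip `x-ignore-*`):

```rust
/// Illustrative helper: form fields that are exempt from appearing in the
/// policy's condition list, per the AWS rule quoted above.
fn is_exempt_from_policy(field_name: &str) -> bool {
    let name = field_name.to_ascii_lowercase();
    name == "policy"
        || name == "x-amz-signature"
        || name == "file"
        || name.starts_with("x-ignore-")
}
```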
                         Operation::StartsWith(s) => value.to_str()?.starts_with(s),
                     };
                     if !ok {
                         return Err(Error::BadRequest(format!(
                             "Key '{}' has value not allowed in policy",
                             param_key
                         )));
                     }
                 }
             }
+        }
     }

+    // TODO check that each policy item is used
     let content_type = field
         .content_type()
trinity-1686a marked this conversation as resolved (outdated)

Same here, can you point me to the doc which says a policy must be given for all fields?

see comment on `key`
         .map(AsRef::as_ref)
@@ -185,7 +195,7 @@ pub async fn handle_post_object(
     let headers = get_headers(&params)?;

     let stream = field.map(|r| r.map_err(Into::into));
-    let res = save_stream(
+    let (_, md5) = save_stream(
         garage,
         headers,
         StreamLimiter::new(stream, conditions.content_length),
@@ -196,35 +206,45 @@ pub async fn handle_post_object(
     )
     .await?;

-    let resp = if let Some(target) = params
+    let etag = format!("\"{}\"", md5);
+    // TODO get uri
+    // get Host
+    // append www-form-urlencoded key
+    let location = "todo";
+
+    let resp = if let Some(mut target) = params
         .get("success_action_redirect")
         .and_then(|h| h.to_str().ok())
         .and_then(|u| url::Url::parse(u).ok())
         .filter(|u| u.scheme() == "https" || u.scheme() == "http")
lx marked this conversation as resolved (outdated)

This definitely looks like it should have been done before the policy check

turns out AWS ignores this, and only considers the content type set in what I called param, not the one in the field metadata
     {

The actual response should be

```xml
<PostResponse>
    <Location>https://bucketname.garage.tld/key</Location>
    <Bucket>bucketname</Bucket>
    <Key>key</Key>
    <ETag>"0123456789abcdef0123456789abcdef"</ETag>
</PostResponse>
```

with the corresponding `etag` and `location` HTTP headers (these headers are also present for 200 and 204, but not the body).

When using `success_action_redirect`, `etag` is set as usual, and `location` is set to `${success_action_redirect}?bucket=bucketname&key=key&etag=%220123456789abcdef0123456789abcdef%22`
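For illustration, a minimal way to build that 201 body could look like the sketch below (not Garage's XML serialization; real code would also need to escape the values):

```rust
/// Illustrative sketch of the 201 response body described above; the values
/// are assumed to be already XML-safe.
fn post_response_body(location: &str, bucket: &str, key: &str, etag: &str) -> String {
    format!(
        "<PostResponse><Location>{}</Location><Bucket>{}</Bucket><Key>{}</Key><ETag>{}</ETag></PostResponse>",
        location, bucket, key, etag
    )
}
```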
+            target
+                .query_pairs_mut()
+                .append_pair("bucket", &bucket)
+                .append_pair("key", &key)
+                .append_pair("etag", &etag);
             let target = target.to_string();
             Response::builder()
                 .status(StatusCode::SEE_OTHER)
                 .header(header::LOCATION, target.clone())
+                .header(header::ETAG, etag)
                 .body(target.into())?
         } else {
             let action = params
                 .get("success_action_status")
                 .and_then(|h| h.to_str().ok())
                 .unwrap_or("204");
+            let builder = Response::builder()
+                .status(StatusCode::OK)
trinity-1686a marked this conversation as resolved (outdated)

Looks like we don't need this `.status()` as we are calling it in all branches below
+                .header(header::LOCATION, location)
+                .header(header::ETAG, etag);
             match action {
-                "200" => Response::builder()
-                    .status(StatusCode::OK)
-                    .body(Body::empty())?,
+                "200" => builder.status(StatusCode::OK).body(Body::empty())?,
                 "201" => {
                     // TODO body should be an XML document, not sure which yet
-                    Response::builder()
-                        .status(StatusCode::CREATED)
-                        .body(res.into_body())?
+                    builder.status(StatusCode::CREATED).body(Body::from(""))?
                 }
-                _ => Response::builder()
-                    .status(StatusCode::NO_CONTENT)
-                    .body(Body::empty())?,
+                _ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
             }
         };
@@ -254,8 +274,9 @@ impl Policy {
                 if map.len() != 1 {
                     return Err(Error::BadRequest("Invalid policy item".to_owned()));
                 }
-                let (k, v) = map.into_iter().next().expect("size was verified");
-                if k.eq_ignore_ascii_case("content-type") {
+                let (mut k, v) = map.into_iter().next().expect("size was verified");
+                k.make_ascii_lowercase();
+                if k == "content-type" {
                     content_type.push(Operation::Equal(v));
                 } else {
                     params.entry(k).or_default().push(Operation::Equal(v));
@@ -265,16 +286,17 @@ impl Policy {
                 if key.remove(0) != '$' {
                     return Err(Error::BadRequest("Invalid policy item".to_owned()));
                 }
+                key.make_ascii_lowercase();
                 match cond.as_str() {
                     "eq" => {
-                        if key.eq_ignore_ascii_case("content-type") {
+                        if key == "content-type" {
                             content_type.push(Operation::Equal(value));
                         } else {
                             params.entry(key).or_default().push(Operation::Equal(value));
                         }
                     }
                     "starts-with" => {
-                        if key.eq_ignore_ascii_case("content-type") {
+                        if key == "content-type" {
                             content_type.push(Operation::StartsWith(value));
                         } else {
                             params


@@ -99,6 +99,7 @@ pub async fn handle_put(
         content_sha256,
     )
     .await
+    .map(|(uuid, md5)| put_response(uuid, md5))
 }

 pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
@@ -109,7 +110,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
     key: &str,
     content_md5: Option<String>,
     content_sha256: Option<FixedBytes32>,
-) -> Result<Response<Body>, Error> {
+) -> Result<(Uuid, String), Error> {
     // Generate identity of new version
     let version_uuid = gen_uuid();
     let version_timestamp = now_msec();
@@ -150,7 +151,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
         let object = Object::new(bucket_id, key.into(), vec![object_version]);
         garage.object_table.insert(&object).await?;

-        return Ok(put_response(version_uuid, data_md5sum_hex));
+        return Ok((version_uuid, data_md5sum_hex));
     }

     // Write version identifier in object table so that we have a trace
@@ -216,7 +217,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
     let object = Object::new(bucket_id, key.into(), vec![object_version]);
     garage.object_table.insert(&object).await?;

-    Ok(put_response(version_uuid, md5sum_hex))
+    Ok((version_uuid, md5sum_hex))
 }

 /// Validate MD5 sum against content-md5 header
@@ -512,7 +513,7 @@ pub async fn handle_put_part(
     let response = Response::builder()
         .header("ETag", format!("\"{}\"", data_md5sum_hex))
-        .body(Body::from(vec![]))
+        .body(Body::empty())
         .unwrap();

     Ok(response)
 }