Support for PostObject #222
|
@ -2,9 +2,11 @@ use std::collections::HashMap;
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
use std::ops::RangeInclusive;
|
use std::ops::RangeInclusive;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
|
use bytes::Bytes;
|
||||||
use chrono::{DateTime, Duration, Utc};
|
use chrono::{DateTime, Duration, Utc};
|
||||||
use futures::StreamExt;
|
use futures::{Stream, StreamExt};
|
||||||
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
|
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
|
||||||
use hyper::{Body, Request, Response, StatusCode};
|
use hyper::{Body, Request, Response, StatusCode};
|
||||||
use multer::{Constraints, Multipart, SizeLimit};
|
use multer::{Constraints, Multipart, SizeLimit};
|
||||||
|
@ -29,7 +31,7 @@ pub async fn handle_post_object(
|
||||||
.and_then(|ct| multer::parse_boundary(ct).ok())
|
.and_then(|ct| multer::parse_boundary(ct).ok())
|
||||||
.ok_or_bad_request("Counld not get multipart boundary")?;
|
.ok_or_bad_request("Counld not get multipart boundary")?;
|
||||||
|
|
||||||
// 16k seems plenty for a header. 5G is the max size of a single part, so it seemrs reasonable
|
// 16k seems plenty for a header. 5G is the max size of a single part, so it seems reasonable
|
||||||
// for a PostObject
|
// for a PostObject
|
||||||
let constraints = Constraints::new().size_limit(
|
let constraints = Constraints::new().size_limit(
|
||||||
SizeLimit::new()
|
SizeLimit::new()
|
||||||
|
@ -119,9 +121,9 @@ pub async fn handle_post_object(
|
||||||
|
|
||||||
let conditions = decoded_policy.into_conditions()?;
|
let conditions = decoded_policy.into_conditions()?;
|
||||||
|
|
||||||
for (key, value) in params.iter() {
|
for (param_key, value) in params.iter() {
|
||||||
let key = key.as_str();
|
let param_key = param_key.as_str();
|
||||||
if key.eq_ignore_ascii_case("content-type") {
|
if param_key.eq_ignore_ascii_case("content-type") {
|
||||||
for cond in &conditions.content_type {
|
for cond in &conditions.content_type {
|
||||||
let ok = match cond {
|
let ok = match cond {
|
||||||
Operation::Equal(s) => value == s,
|
Operation::Equal(s) => value == s,
|
||||||
|
@ -132,13 +134,29 @@ pub async fn handle_post_object(
|
||||||
if !ok {
|
if !ok {
|
||||||
return Err(Error::BadRequest(format!(
|
return Err(Error::BadRequest(format!(
|
||||||
"Key '{}' has value not allowed in policy",
|
"Key '{}' has value not allowed in policy",
|
||||||
key
|
param_key
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if param_key == "key" {
|
||||||
|
let conds = conditions.params.get("key").ok_or_else(|| {
|
||||||
|
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
|
||||||
|
})?;
|
||||||
lx marked this conversation as resolved
Outdated
|
|||||||
|
for cond in conds {
|
||||||
|
let ok = match cond {
|
||||||
|
Operation::Equal(s) => s == &key,
|
||||||
|
Operation::StartsWith(s) => key.starts_with(s),
|
||||||
|
};
|
||||||
|
if !ok {
|
||||||
|
return Err(Error::BadRequest(format!(
|
||||||
|
"Key '{}' has value not allowed in policy",
|
||||||
|
param_key
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
let conds = conditions.params.get(key).ok_or_else(|| {
|
let conds = conditions.params.get(param_key).ok_or_else(|| {
|
||||||
Error::BadRequest(format!("Key '{}' is not allowed in policy", key))
|
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
|
||||||
})?;
|
})?;
|
||||||
for cond in conds {
|
for cond in conds {
|
||||||
let ok = match cond {
|
let ok = match cond {
|
||||||
|
@ -148,16 +166,13 @@ pub async fn handle_post_object(
|
||||||
if !ok {
|
if !ok {
|
||||||
return Err(Error::BadRequest(format!(
|
return Err(Error::BadRequest(format!(
|
||||||
"Key '{}' has value not allowed in policy",
|
"Key '{}' has value not allowed in policy",
|
||||||
key
|
param_key
|
||||||
)));
|
)));
|
||||||
lx marked this conversation as resolved
Outdated
lx
commented
Can you point me to the documentation section which says that there must be a policy specified for the Can you point me to the documentation section which says that there must be a policy specified for the `key` field?
trinity-1686a
commented
Which means I have to add some code to ignore ~~this is not specified, it's however the behavior of AWS. `policy` and `x-amz-signature` are the only two fields I found to not be required in the policy. Even `x-amz-credential`, which is definitelly required to make a valid v4 signature, must be allowed in policy~~
this is in fact [specified somewhere](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions)
> Each form field that you specify in a form (except x-amz-signature, file, policy, and field names that have an x-ignore- prefix) must appear in the list of conditions.
Which means I have to add some code to ignore `x-ignore-*`, others are already ignored
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO validate policy against request
|
|
||||||
// unsafe to merge until implemented
|
|
||||||
|
|
||||||
let content_type = field
|
let content_type = field
|
||||||
.content_type()
|
.content_type()
|
||||||
.map(AsRef::as_ref)
|
.map(AsRef::as_ref)
|
||||||
|
@ -169,10 +184,11 @@ pub async fn handle_post_object(
|
||||||
params.append(header::CONTENT_TYPE, content_type);
|
params.append(header::CONTENT_TYPE, content_type);
|
||||||
let headers = get_headers(¶ms)?;
|
let headers = get_headers(¶ms)?;
|
||||||
|
|
||||||
|
let stream = field.map(|r| r.map_err(Into::into));
|
||||||
trinity-1686a marked this conversation as resolved
Outdated
lx
commented
Same here, can you point me to the doc which says a policy must be given for all fields? Same here, can you point me to the doc which says a policy must be given for all fields?
trinity-1686a
commented
see comment on see comment on `key`
|
|||||||
let res = save_stream(
|
let res = save_stream(
|
||||||
garage,
|
garage,
|
||||||
headers,
|
headers,
|
||||||
field.map(|r| r.map_err(Into::into)),
|
StreamLimiter::new(stream, conditions.content_length),
|
||||||
bucket_id,
|
bucket_id,
|
||||||
&key,
|
&key,
|
||||||
None,
|
None,
|
||||||
|
@ -183,9 +199,10 @@ pub async fn handle_post_object(
|
||||||
let resp = if let Some(target) = params
|
let resp = if let Some(target) = params
|
||||||
.get("success_action_redirect")
|
.get("success_action_redirect")
|
||||||
.and_then(|h| h.to_str().ok())
|
.and_then(|h| h.to_str().ok())
|
||||||
|
.and_then(|u| url::Url::parse(u).ok())
|
||||||
|
.filter(|u| u.scheme() == "https" || u.scheme() == "http")
|
||||||
{
|
{
|
||||||
// TODO should validate it's a valid url
|
let target = target.to_string();
|
||||||
let target = target.to_owned();
|
|
||||||
Response::builder()
|
Response::builder()
|
||||||
.status(StatusCode::SEE_OTHER)
|
.status(StatusCode::SEE_OTHER)
|
||||||
.header(header::LOCATION, target.clone())
|
.header(header::LOCATION, target.clone())
|
||||||
|
@ -309,3 +326,52 @@ enum Operation {
|
||||||
Equal(String),
|
Equal(String),
|
||||||
StartsWith(String),
|
StartsWith(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct StreamLimiter<T> {
|
||||||
|
inner: T,
|
||||||
|
length: RangeInclusive<u64>,
|
||||||
|
read: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> StreamLimiter<T> {
|
||||||
|
fn new(stream: T, length: RangeInclusive<u64>) -> Self {
|
||||||
|
StreamLimiter {
|
||||||
|
inner: stream,
|
||||||
|
length,
|
||||||
|
read: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Stream for StreamLimiter<T>
|
||||||
|
where
|
||||||
|
T: Stream<Item = Result<Bytes, Error>> + Unpin,
|
||||||
|
{
|
||||||
|
type Item = Result<Bytes, Error>;
|
||||||
|
fn poll_next(
|
||||||
|
mut self: std::pin::Pin<&mut Self>,
|
||||||
|
ctx: &mut Context<'_>,
|
||||||
|
) -> Poll<Option<Self::Item>> {
|
||||||
|
let res = std::pin::Pin::new(&mut self.inner).poll_next(ctx);
|
||||||
|
match &res {
|
||||||
|
Poll::Ready(Some(Ok(bytes))) => {
|
||||||
|
self.read += bytes.len() as u64;
|
||||||
|
// optimization to fail early when we know before the end it's too long
|
||||||
|
if self.length.end() < &self.read {
|
||||||
|
return Poll::Ready(Some(Err(Error::BadRequest(
|
||||||
|
"File size does not match policy".to_owned(),
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Poll::Ready(None) => {
|
||||||
|
if !self.length.contains(&self.read) {
|
||||||
|
return Poll::Ready(Some(Err(Error::BadRequest(
|
||||||
|
"File size does not match policy".to_owned(),
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ => {}
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
I feel like we should add unit tests for the policy decoding logic (not just for into_conditions but end-to-end starting with JSON)
I'll write a test and post it in the comments of the PR so that you can copy and paste