Support for PostObject #222

Merged
lx merged 14 commits from trinity-1686a/garage:post-object into main 2022-02-21 22:02:31 +00:00
12 changed files with 745 additions and 71 deletions

44
Cargo.lock generated
View file

@ -469,6 +469,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df"
dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.7.1"
@ -690,6 +699,7 @@ dependencies = [
"chrono",
"crypto-mac 0.10.1",
"err-derive 0.3.0",
"form_urlencoded",
"futures",
"futures-util",
"garage_model 0.6.0",
@ -704,6 +714,7 @@ dependencies = [
"idna",
"log",
"md-5",
"multer",
"nom",
"percent-encoding",
"pin-project",
@ -711,6 +722,7 @@ dependencies = [
"roxmltree",
"serde",
"serde_bytes",
"serde_json",
"sha2",
"tokio",
"url",
@ -1314,6 +1326,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -1342,6 +1360,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "multer"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836"
dependencies = [
"bytes 1.1.0",
"encoding_rs",
"futures-util",
"http",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.2",
"version_check",
]
[[package]]
name = "netapp"
version = "0.3.0"
@ -1670,7 +1706,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -1933,6 +1969,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
[[package]]
name = "static_init"
version = "1.0.2"

View file

@ -560,7 +560,7 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"; };
dependencies = {
${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.115" { inherit profileName; };
${ if hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.115" { inherit profileName; };
};
});
@ -674,6 +674,20 @@ in
];
});
"registry+https://github.com/rust-lang/crates.io-index".encoding_rs."0.8.30" = overridableMkRustCrate (profileName: rec {
name = "encoding_rs";
version = "0.8.30";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
];
dependencies = {
cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".env_logger."0.7.1" = overridableMkRustCrate (profileName: rec {
name = "env_logger";
version = "0.7.1";
@ -1000,6 +1014,7 @@ in
chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; };
crypto_mac = rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-mac."0.10.1" { inherit profileName; };
err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.0" { profileName = "__noProfile"; };
form_urlencoded = rustPackages."registry+https://github.com/rust-lang/crates.io-index".form_urlencoded."1.0.1" { inherit profileName; };
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.17" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.17" { inherit profileName; };
garage_model = rustPackages."unknown".garage_model."0.6.0" { inherit profileName; };
@ -1014,6 +1029,7 @@ in
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
multer = rustPackages."registry+https://github.com/rust-lang/crates.io-index".multer."2.0.2" { inherit profileName; };
nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; };
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; };
@ -1021,6 +1037,7 @@ in
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; };
serde_json = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.68" { inherit profileName; };
sha2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.9.8" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; };
url = rustPackages."registry+https://github.com/rust-lang/crates.io-index".url."2.2.2" { inherit profileName; };
@ -1768,6 +1785,13 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".mime."0.3.16" = overridableMkRustCrate (profileName: rec {
name = "mime";
version = "0.3.16";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"; };
});
"registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec {
name = "minimal-lexical";
version = "0.2.1";
@ -1812,6 +1836,30 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".multer."2.0.2" = overridableMkRustCrate (profileName: rec {
name = "multer";
version = "2.0.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836"; };
features = builtins.concatLists [
[ "default" ]
];
dependencies = {
bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; };
encoding_rs = rustPackages."registry+https://github.com/rust-lang/crates.io-index".encoding_rs."0.8.30" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.17" { inherit profileName; };
http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.5" { inherit profileName; };
httparse = rustPackages."registry+https://github.com/rust-lang/crates.io-index".httparse."1.5.1" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; };
mime = rustPackages."registry+https://github.com/rust-lang/crates.io-index".mime."0.3.16" { inherit profileName; };
spin = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.9.2" { inherit profileName; };
};
buildDependencies = {
version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.0" = overridableMkRustCrate (profileName: rec {
name = "netapp";
version = "0.3.0";
@ -2635,6 +2683,17 @@ in
src = fetchCratesIo { inherit name version; sha256 = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"; };
});
"registry+https://github.com/rust-lang/crates.io-index".spin."0.9.2" = overridableMkRustCrate (profileName: rec {
name = "spin";
version = "0.9.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"; };
features = builtins.concatLists [
[ "mutex" ]
[ "spin_mutex" ]
];
});
"registry+https://github.com/rust-lang/crates.io-index".static_init."1.0.2" = overridableMkRustCrate (profileName: rec {
name = "static_init";
version = "1.0.2";

View file

@ -36,13 +36,16 @@ futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
form_urlencoded = "1.0.0"
http = "0.2"
httpdate = "0.3"
http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
multer = "2.0"
percent-encoding = "2.1.0"
roxmltree = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0"
quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"

View file

@ -25,6 +25,7 @@ use crate::s3_cors::*;
use crate::s3_delete::*;
use crate::s3_get::*;
use crate::s3_list::*;
use crate::s3_post_object::handle_post_object;
use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*;
@ -92,11 +93,6 @@ async fn handler(
}
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
let authority = req
.headers()
.get(header::HOST)
@ -115,6 +111,15 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?;
debug!("Endpoint: {:?}", endpoint);
if let Endpoint::PostObject {} = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
let bucket_name = match bucket_name {
None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
Some(bucket) => bucket.to_string(),

View file

@ -126,6 +126,12 @@ impl From<HelperError> for Error {
}
}
impl From<multer::Error> for Error {
fn from(err: multer::Error) -> Self {
Self::BadRequest(err.to_string())
}
}
impl Error {
/// Get the HTTP status code that best represents the meaning of the error for the client
pub fn http_status_code(&self) -> StatusCode {

View file

@ -19,6 +19,7 @@ pub mod s3_cors;
mod s3_delete;
pub mod s3_get;
mod s3_list;
mod s3_post_object;
mod s3_put;
mod s3_router;
mod s3_website;

View file

@ -46,7 +46,7 @@ pub async fn handle_copy(
// Implement x-amz-metadata-directive: REPLACE
let new_meta = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
headers: get_headers(req)?,
headers: get_headers(req.headers())?,
size: source_version_meta.size,
etag: source_version_meta.etag.clone(),
},

499
src/api/s3_post_object.rs Normal file
View file

@ -0,0 +1,499 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
use hyper::{Body, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit};
use serde::Deserialize;
use garage_model::garage::Garage;
use crate::api_server::resolve_bucket;
use crate::error::*;
use crate::s3_put::{get_headers, save_stream};
use crate::s3_xml;
use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
req: Request<Body>,
bucket: String,
) -> Result<Response<Body>, Error> {
let boundary = req
.headers()
.get(header::CONTENT_TYPE)
.and_then(|ct| ct.to_str().ok())
.and_then(|ct| multer::parse_boundary(ct).ok())
.ok_or_bad_request("Counld not get multipart boundary")?;
// 16k seems plenty for a header. 5G is the max size of a single part, so it seems reasonable
// for a PostObject
let constraints = Constraints::new().size_limit(
SizeLimit::new()
.per_field(16 * 1024)
.for_field("file", 5 * 1024 * 1024 * 1024),
);
let (head, body) = req.into_parts();
let mut multipart = Multipart::with_constraints(body, boundary, constraints);
let mut params = HeaderMap::new();
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

I think we can avoid putting almost the entire code of this function in the while loop (and remove 1 indentation level almost everywhere) by doing something like this:

let file = loop {
    let field = match multipart.next_field().await? {
        None => return Err(no file field found),
        Some(x) => x,
    };
    let name = ...;
    if name == "file" {
         break field;
    }
    // here handle header field adding it to the headermap
};
// here do all of the rest of the processing once we have all headers and are now reading the file body

This looks much nicer to me, especially as in the current version we have a for inside the while, which looks a bit like a nested loop but is in fact not at all.

I think we can avoid putting almost the entire code of this function in the `while` loop (and remove 1 indentation level almost everywhere) by doing something like this: ```rust let file = loop { let field = match multipart.next_field().await? { None => return Err(no file field found), Some(x) => x, }; let name = ...; if name == "file" { break field; } // here handle header field adding it to the headermap }; // here do all of the rest of the processing once we have all headers and are now reading the file body ``` This looks much nicer to me, especially as in the current version we have a `for` inside the `while`, which looks a bit like a nested loop but is in fact not at all.
let field = loop {
let field = if let Some(field) = multipart.next_field().await? {
field
} else {
return Err(Error::BadRequest(
"Request did not contain a file".to_owned(),
));
};
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
name
} else {
continue;
};
if name == "file" {
break field;
}
if let Ok(content) = HeaderValue::from_str(&field.text().await?) {
match name.as_str() {
"tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
"acl" => {
if params.insert("x-amz-acl", content).is_some() {
return Err(Error::BadRequest(
"Field 'acl' provided more than one time".to_string(),
));
}
}
_ => {
if params.insert(&name, content).is_some() {
return Err(Error::BadRequest(format!(
"Field '{}' provided more than one time",
name
)));
}
}
}
}
};
// Current part is file. Do some checks before handling to PutObject code
let key = params
.get("key")
.ok_or_bad_request("No key was provided")?
.to_str()?;
let credential = params
.get("x-amz-credential")
.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?
.to_str()?;
let policy = params
.get("policy")
.ok_or_bad_request("No policy was provided")?
.to_str()?;
let signature = params
.get("x-amz-signature")
.ok_or_bad_request("No signature was provided")?
.to_str()?;
let date = params
.get("x-amz-date")
.ok_or_bad_request("No date was provided")?
.to_str()?;
let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS.
if let Some(filename) = field.file_name() {
key.replace("${filename}", filename)
} else {
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

This looks like it should rather be an error case: if the application builder put ${filename} in the key field but the browser for some reason didn't include the name of the uploaded file, we probably want to reject the request.

This looks like it should rather be an error case: if the application builder put `${filename}` in the key field but the browser for some reason didn't include the name of the uploaded file, we probably want to reject the request.

that was actually how a previous iteration worked. This behavior was added because it's exactly how AWS behave (not that I can say I agree with that behavior)

that was actually how a previous iteration worked. This behavior was added because it's exactly how AWS behave (not that I can say I agree with that behavior)
Outdated
Review

:/

:/
key.to_owned()
}
} else {
key.to_owned()
};
let date = parse_date(date)?;
let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?;
let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
if !api_key.allow_write(&bucket_id) {
return Err(Error::Forbidden(
"Operation is not allowed for this key.".to_string(),
));
}
let decoded_policy = base64::decode(&policy)?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
let expiration: DateTime<Utc> = DateTime::parse_from_rfc3339(&decoded_policy.expiration)
.ok_or_bad_request("Invalid expiration date")?
.into();
if Utc::now() - expiration > Duration::zero() {
return Err(Error::BadRequest(
"Expiration date is in the paste".to_string(),
));
}
lx marked this conversation as resolved Outdated
Outdated
Review

I feel like we should add unit tests for the policy decoding logic (not just for into_conditions but end-to-end starting with JSON)

I feel like we should add unit tests for the policy decoding logic (not just for into_conditions but end-to-end starting with JSON)
Outdated
Review

I'll write a test and post it in the comments of the PR so that you can copy and paste

I'll write a test and post it in the comments of the PR so that you can copy and paste
let mut conditions = decoded_policy.into_conditions()?;
for (param_key, value) in params.iter() {
let mut param_key = param_key.to_string();
param_key.make_ascii_lowercase();
match param_key.as_str() {
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| {
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
Operation::Equal(s) => s.as_str() == value,
Operation::StartsWith(s) => {
value.to_str()?.split(',').all(|v| v.starts_with(&s))
}
};
if !ok {
return Err(Error::BadRequest(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
}
}
}
lx marked this conversation as resolved Outdated
Outdated
Review

Can you point me to the documentation section which says that there must be a policy specified for the key field?

Can you point me to the documentation section which says that there must be a policy specified for the `key` field?

this is not specified, it's however the behavior of AWS. policy and x-amz-signature are the only two fields I found to not be required in the policy. Even x-amz-credential, which is definitelly required to make a valid v4 signature, must be allowed in policy
this is in fact specified somewhere

Each form field that you specify in a form (except x-amz-signature, file, policy, and field names that have an x-ignore- prefix) must appear in the list of conditions.

Which means I have to add some code to ignore x-ignore-*, others are already ignored

~~this is not specified, it's however the behavior of AWS. `policy` and `x-amz-signature` are the only two fields I found to not be required in the policy. Even `x-amz-credential`, which is definitelly required to make a valid v4 signature, must be allowed in policy~~ this is in fact [specified somewhere](https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-HTTPPOSTConstructPolicy.html#sigv4-PolicyConditions) > Each form field that you specify in a form (except x-amz-signature, file, policy, and field names that have an x-ignore- prefix) must appear in the list of conditions. Which means I have to add some code to ignore `x-ignore-*`, others are already ignored
"key" => {
let conds = conditions.params.remove("key").ok_or_else(|| {
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
Operation::Equal(s) => s == key,
Operation::StartsWith(s) => key.starts_with(&s),
};
if !ok {
return Err(Error::BadRequest(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
}
}
}
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Same here, can you point me to the doc which says a policy must be given for all fields?

Same here, can you point me to the doc which says a policy must be given for all fields?

see comment on key

see comment on `key`
_ => {
if param_key.starts_with("x-ignore-") {
// if a x-ignore is provided in policy, it's not removed here, so it will be
// rejected as provided in policy but not in the request. As odd as it is, it's
// how aws seems to behave.
continue;
}
let conds = conditions.params.remove(&param_key).ok_or_else(|| {
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
Operation::Equal(s) => s.as_str() == value,
Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()),
};
if !ok {
return Err(Error::BadRequest(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
}
}
}
}
}
if let Some((param_key, _)) = conditions.params.iter().next() {
return Err(Error::BadRequest(format!(
"Key '{}' is required in policy, but no value was provided",
param_key
)));
}
lx marked this conversation as resolved Outdated
Outdated
Review

This definitely looks like it should have been done before the policy check

This definitely looks like it should have been done before the policy check

turns out AWS ignore this, and only consider content type set in what I called param, not in the field metadata

turns out AWS ignore this, and only consider content type set in what I called param, not in the field metadata

the actual response should be

<PostResponse>
    <Location>https://bucketname.garage.tld/key</Location>
    <Bucket>bucketname</Bucket>
    <Key>key</Key>
    <ETag>"0123456789abcdef0123456789abcdef"</ETag>
</PostResponse>

with corresponding etag and location http headers (these headers are also here for 200 and 204, but not the body)

When using success_action_redirect, etag is set as usual, and location is set to ${success_action_redirect}?bucket=bucketname&key=key&etag=%220123456789abcdef0123456789abcdef%22

the actual response should be ```xml <PostResponse> <Location>https://bucketname.garage.tld/key</Location> <Bucket>bucketname</Bucket> <Key>key</Key> <ETag>"0123456789abcdef0123456789abcdef"</ETag> </PostResponse> ``` with corresponding `etag` and `location` http headers (these headers are also here for 200 and 204, but not the body) When using `success_action_redirect`, `etag` is set as usual, and location is set to `${success_action_redirect}?bucket=bucketname&key=key&etag=%220123456789abcdef0123456789abcdef%22`
let headers = get_headers(&params)?;
let stream = field.map(|r| r.map_err(Into::into));
let (_, md5) = save_stream(
garage,
headers,
StreamLimiter::new(stream, conditions.content_length),
bucket_id,
&key,
None,
None,
)
.await?;
let etag = format!("\"{}\"", md5);
let resp = if let Some(mut target) = params
.get("success_action_redirect")
trinity-1686a marked this conversation as resolved Outdated
Outdated
Review

Looks like we don't need this .status() as we are calling it in all branches below

Looks like we don't need this `.status()` as we are calling it in all branches below
.and_then(|h| h.to_str().ok())
.and_then(|u| url::Url::parse(u).ok())
.filter(|u| u.scheme() == "https" || u.scheme() == "http")
{
target
.query_pairs_mut()
.append_pair("bucket", &bucket)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
Response::builder()
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
.header(header::ETAG, etag)
.body(target.into())?
} else {
let path = head
.uri
.into_parts()
.path_and_query
.map(|paq| paq.path().to_string())
.unwrap_or_else(|| "/".to_string());
let authority = head
.headers
.get(header::HOST)
.and_then(|h| h.to_str().ok())
.unwrap_or_default();
let proto = if !authority.is_empty() {
"https://"
} else {
""
};
let url_key: String = form_urlencoded::byte_serialize(key.as_bytes())
.flat_map(str::chars)
.collect();
let location = format!("{}{}{}{}", proto, authority, path, url_key);
let action = params
.get("success_action_status")
.and_then(|h| h.to_str().ok())
.unwrap_or("204");
let builder = Response::builder()
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
match action {
"200" => builder.status(StatusCode::OK).body(Body::empty())?,
"201" => {
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
bucket: s3_xml::Value(bucket),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
let body = s3_xml::to_xml_with_header(&xml)?;
builder
.status(StatusCode::CREATED)
.body(Body::from(body.into_bytes()))?
}
_ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
}
};
Ok(resp)
}
#[derive(Deserialize)]
struct Policy {
expiration: String,
conditions: Vec<PolicyCondition>,
}
impl Policy {
fn into_conditions(self) -> Result<Conditions, Error> {
let mut params = HashMap::<_, Vec<_>>::new();
let mut length = (0, u64::MAX);
for condition in self.conditions {
match condition {
PolicyCondition::Equal(map) => {
if map.len() != 1 {
return Err(Error::BadRequest("Invalid policy item".to_owned()));
}
let (mut k, v) = map.into_iter().next().expect("size was verified");
k.make_ascii_lowercase();
params.entry(k).or_default().push(Operation::Equal(v));
}
PolicyCondition::OtherOp([cond, mut key, value]) => {
if key.remove(0) != '$' {
return Err(Error::BadRequest("Invalid policy item".to_owned()));
}
key.make_ascii_lowercase();
match cond.as_str() {
"eq" => {
params.entry(key).or_default().push(Operation::Equal(value));
}
"starts-with" => {
params
.entry(key)
.or_default()
.push(Operation::StartsWith(value));
}
_ => return Err(Error::BadRequest("Invalid policy item".to_owned())),
}
}
PolicyCondition::SizeRange(key, min, max) => {
if key == "content-length-range" {
length.0 = length.0.max(min);
length.1 = length.1.min(max);
} else {
return Err(Error::BadRequest("Invalid policy item".to_owned()));
}
}
}
}
Ok(Conditions {
params,
content_length: RangeInclusive::new(length.0, length.1),
})
}
}
/// A single condition from a policy
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum PolicyCondition {
// will contain a single key-value pair
Equal(HashMap<String, String>),
OtherOp([String; 3]),
SizeRange(String, u64, u64),
}
#[derive(Debug)]
struct Conditions {
params: HashMap<String, Vec<Operation>>,
content_length: RangeInclusive<u64>,
}
#[derive(Debug, PartialEq, Eq)]
enum Operation {
Equal(String),
StartsWith(String),
}
struct StreamLimiter<T> {
inner: T,
length: RangeInclusive<u64>,
read: u64,
lx marked this conversation as resolved Outdated
Outdated
Review

Why do we need content_type to be set apart here? If it has different treatment I think that's applied at a later moment when we check the set of headers against the policy, here it could just be added to the map with the rest (the same that is done with key).

Why do we need content_type to be set apart here? If it has different treatment I think that's applied at a later moment when we check the set of headers against the policy, here it could just be added to the map with the rest (the same that is done with `key`).
}
impl<T> StreamLimiter<T> {
fn new(stream: T, length: RangeInclusive<u64>) -> Self {
StreamLimiter {
inner: stream,
length,
read: 0,
}
}
}
impl<T> Stream for StreamLimiter<T>
where
T: Stream<Item = Result<Bytes, Error>> + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let res = std::pin::Pin::new(&mut self.inner).poll_next(ctx);
match &res {
Poll::Ready(Some(Ok(bytes))) => {
self.read += bytes.len() as u64;
// optimization to fail early when we know before the end it's too long
if self.length.end() < &self.read {
return Poll::Ready(Some(Err(Error::BadRequest(
"File size does not match policy".to_owned(),
))));
}
}
Poll::Ready(None) => {
if !self.length.contains(&self.read) {
return Poll::Ready(Some(Err(Error::BadRequest(
"File size does not match policy".to_owned(),
))));
}
}
_ => {}
}
res
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_policy_1() {
let policy_json = br#"
{ "expiration": "2007-12-01T12:00:00.000Z",
"conditions": [
{"acl": "public-read" },
{"bucket": "johnsmith" },
["starts-with", "$key", "user/eric/"]
]
}
"#;
let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap();
let mut conditions = policy_2.into_conditions().unwrap();
assert_eq!(
conditions.params.remove(&"acl".to_string()),
Some(vec![Operation::Equal("public-read".into())])
);
assert_eq!(
conditions.params.remove(&"bucket".to_string()),
Some(vec![Operation::Equal("johnsmith".into())])
);
assert_eq!(
conditions.params.remove(&"key".to_string()),
Some(vec![Operation::StartsWith("user/eric/".into())])
);
assert!(conditions.params.is_empty());
assert_eq!(conditions.content_length, 0..=u64::MAX);
}
#[test]
fn test_policy_2() {
let policy_json = br#"
{ "expiration": "2007-12-01T12:00:00.000Z",
"conditions": [
[ "eq", "$acl", "public-read" ],
["starts-with", "$Content-Type", "image/"],
["starts-with", "$success_action_redirect", ""],
["content-length-range", 1048576, 10485760]
]
}
"#;
let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap();
let mut conditions = policy_2.into_conditions().unwrap();
assert_eq!(
conditions.params.remove(&"acl".to_string()),
Some(vec![Operation::Equal("public-read".into())])
);
assert_eq!(
conditions.params.remove("content-type").unwrap(),
vec![Operation::StartsWith("image/".into())]
);
assert_eq!(
conditions
.params
.remove(&"success_action_redirect".to_string()),
Some(vec![Operation::StartsWith("".into())])
);
assert!(conditions.params.is_empty());
assert_eq!(conditions.content_length, 1048576..=10485760);
}
}

View file

@ -4,6 +4,7 @@ use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::{prelude::*, TryFutureExt};
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@ -34,12 +35,8 @@ pub async fn handle_put(
api_key: &Key,
mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
let version_timestamp = now_msec();
// Retrieve interesting headers from request
let headers = get_headers(&req)?;
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
let content_md5 = match req.headers().get("content-md5") {
@ -92,6 +89,32 @@ pub async fn handle_put(
body.boxed()
};
save_stream(
garage,
headers,
body,
bucket_id,
key,
content_md5,
content_sha256,
)
.await
.map(|(uuid, md5)| put_response(uuid, md5))
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: Arc<Garage>,
headers: ObjectVersionHeaders,
body: S,
bucket_id: Uuid,
key: &str,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
let version_timestamp = now_msec();
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
@ -128,7 +151,7 @@ pub async fn handle_put(
let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok(put_response(version_uuid, data_md5sum_hex));
return Ok((version_uuid, data_md5sum_hex));
}
// Write version identifier in object table so that we have a trace
@ -194,7 +217,7 @@ pub async fn handle_put(
let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
Ok(put_response(version_uuid, md5sum_hex))
Ok((version_uuid, md5sum_hex))
}
/// Validate MD5 sum against content-md5 header
@ -373,7 +396,7 @@ pub async fn handle_create_multipart_upload(
key: &str,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
let headers = get_headers(req)?;
let headers = get_headers(req.headers())?;
// Create object in object table
let object_version = ObjectVersion {
@ -490,7 +513,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::from(vec![]))
.body(Body::empty())
.unwrap();
Ok(response)
}
@ -672,17 +695,16 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![])))
}
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
Ok(req
.headers()
fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
Ok(headers
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(headers)?;
let mut other = BTreeMap::new();
// Preserve standard headers
@ -694,7 +716,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
if let Some(v) = headers.get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
@ -707,7 +729,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
for (k, v) in headers.iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {

View file

@ -410,6 +410,12 @@ pub enum Endpoint {
part_number: u64,
upload_id: String,
},
// This endpoint is not documented with others because it has special use case :
// It's intended to be used with HTML forms, using a multipart/form-data body.
// It works a lot like presigned requests, but everything is in the form instead
// of being query parameters of the URL, so authenticating it is a bit different.
PostObject {
},
}}
impl Endpoint {
@ -543,6 +549,7 @@ impl Endpoint {
UPLOADS => CreateMultipartUpload,
],
no_key: [
EMPTY => PostObject,
DELETE => DeleteObjects,
]
}
@ -1165,6 +1172,7 @@ mod tests {
POST "/{Key+}?restore&versionId=VersionId" => RestoreObject
PUT "/my-movie.m2ts?partNumber=1&uploadId=VCVsb2FkIElEIGZvciBlbZZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZR" => UploadPart
PUT "/Key+?partNumber=2&uploadId=UploadId" => UploadPart
POST "/" => PostObject
);
// no bucket, won't work with the rest of the test suite
assert!(matches!(

View file

@ -289,6 +289,20 @@ pub struct VersioningConfiguration {
pub status: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct PostObject {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
#[serde(rename = "Location")]
pub location: Value,
#[serde(rename = "Bucket")]
pub bucket: Value,
#[serde(rename = "Key")]
pub key: Value,
#[serde(rename = "ETag")]
pub etag: Value,
}
#[cfg(test)]
mod tests {
use super::*;

View file

@ -49,23 +49,6 @@ pub async fn check_payload_signature(
}
};
let scope = format!(
"{}/{}/s3/aws4_request",
authorization.date.format(SHORT_DATE),
garage.config.s3_api.s3_region
);
if authorization.scope != scope {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
}
let key = garage
.key_table
.get(&EmptyKey, &authorization.key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?;
let key_p = key.params().unwrap();
let canonical_request = canonical_request(
request.method(),
&request.uri().path().to_string(),
@ -74,24 +57,17 @@ pub async fn check_payload_signature(
&authorization.signed_headers,
&authorization.content_sha256,
);
let (_, scope) = parse_credential(&authorization.credential)?;
let string_to_sign = string_to_sign(&authorization.date, &scope, &canonical_request);
let mut hmac = signing_hmac(
let key = verify_v4(
garage,
&authorization.credential,
&authorization.date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
"s3",
&authorization.signature,
string_to_sign.as_bytes(),
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(string_to_sign.as_bytes());
let signature = hex::encode(hmac.finalize().into_bytes());
if authorization.signature != signature {
trace!("Canonical request: ``{}``", canonical_request);
trace!("String to sign: ``{}``", string_to_sign);
trace!("Expected: {}, got: {}", signature, authorization.signature);
return Err(Error::Forbidden("Invalid signature".to_string()));
}
.await?;
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
None
@ -108,8 +84,7 @@ pub async fn check_payload_signature(
}
struct Authorization {
key_id: String,
scope: String,
credential: String,
signed_headers: String,
signature: String,
content_sha256: String,
@ -142,7 +117,6 @@ fn parse_authorization(
let cred = auth_params
.get("Credential")
.ok_or_bad_request("Could not find Credential in Authorization field")?;
let (key_id, scope) = parse_credential(cred)?;
let content_sha256 = headers
.get("x-amz-content-sha256")
@ -150,18 +124,15 @@ fn parse_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?;
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
.ok_or_bad_request("Missing X-Amz-Date field")
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::hours(24) {
return Err(Error::BadRequest("Date is too old".to_string()));
}
let auth = Authorization {
key_id,
scope,
credential: cred.to_string(),
signed_headers: auth_params
.get("SignedHeaders")
.ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
@ -189,7 +160,6 @@ fn parse_query_authorization(
let cred = headers
.get("x-amz-credential")
.ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
let (key_id, scope) = parse_credential(cred)?;
let signed_headers = headers
.get("x-amz-signedheaders")
.ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
@ -215,18 +185,15 @@ fn parse_query_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?;
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
.ok_or_bad_request("Missing X-Amz-Date field")
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::seconds(duration) {
return Err(Error::BadRequest("Date is too old".to_string()));
}
Ok(Authorization {
key_id,
scope,
credential: cred.to_string(),
signed_headers: signed_headers.to_string(),
signature: signature.to_string(),
content_sha256: content_sha256.to_string(),
@ -304,3 +271,51 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
"".to_string()
}
}
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
Ok(DateTime::from_utc(date, Utc))
}
pub async fn verify_v4(
garage: &Garage,
credential: &str,
date: &DateTime<Utc>,
signature: &str,
payload: &[u8],
) -> Result<Key, Error> {
let (key_id, scope) = parse_credential(credential)?;
let scope_expected = format!(
"{}/{}/s3/aws4_request",
date.format(SHORT_DATE),
garage.config.s3_api.s3_region
);
if scope != scope_expected {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
}
let key = garage
.key_table
.get(&EmptyKey, &key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::Forbidden(format!("No such key: {}", &key_id)))?;
let key_p = key.params().unwrap();
let mut hmac = signing_hmac(
date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(payload);
let our_signature = hex::encode(hmac.finalize().into_bytes());
if signature != our_signature {
return Err(Error::Forbidden("Invalid signature".to_string()));
}
Ok(key)
}