Support for PostObject (#222)
continuous-integration/drone/push Build is passing Details

Add support for [PostObject](https://docs.aws.amazon.com/AmazonS3/latest/API/RESTObjectPOST.html)

- [x] routing PostObject properly
- [x] parsing multipart body
- [x] validating signature
- [x] validating policy
- [x] validating content length
- [x] actually saving data

Co-authored-by: trinity-1686a <trinity@deuxfleurs.fr>
Co-authored-by: Trinity Pointard <trinity.pointard@gmail.com>
Reviewed-on: #222
Reviewed-by: Alex <alex@adnab.me>
Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>
Co-committed-by: trinity-1686a <trinity.pointard@gmail.com>
This commit is contained in:
trinity-1686a 2022-02-21 23:02:30 +01:00 committed by Alex
parent e312ba977e
commit f6f8b7f1ad
12 changed files with 745 additions and 71 deletions

44
Cargo.lock generated
View File

@ -469,6 +469,15 @@ version = "1.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
[[package]]
name = "encoding_rs"
version = "0.8.30"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df"
dependencies = [
"cfg-if",
]
[[package]]
name = "env_logger"
version = "0.7.1"
@ -690,6 +699,7 @@ dependencies = [
"chrono",
"crypto-mac 0.10.1",
"err-derive 0.3.0",
"form_urlencoded",
"futures",
"futures-util",
"garage_model 0.6.0",
@ -704,6 +714,7 @@ dependencies = [
"idna",
"log",
"md-5",
"multer",
"nom",
"percent-encoding",
"pin-project",
@ -711,6 +722,7 @@ dependencies = [
"roxmltree",
"serde",
"serde_bytes",
"serde_json",
"sha2",
"tokio",
"url",
@ -1314,6 +1326,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "mime"
version = "0.3.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -1342,6 +1360,24 @@ dependencies = [
"winapi",
]
[[package]]
name = "multer"
version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836"
dependencies = [
"bytes 1.1.0",
"encoding_rs",
"futures-util",
"http",
"httparse",
"log",
"memchr",
"mime",
"spin 0.9.2",
"version_check",
]
[[package]]
name = "netapp"
version = "0.3.0"
@ -1670,7 +1706,7 @@ dependencies = [
"cc",
"libc",
"once_cell",
"spin",
"spin 0.5.2",
"untrusted",
"web-sys",
"winapi",
@ -1933,6 +1969,12 @@ version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"
[[package]]
name = "spin"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"
[[package]]
name = "static_init"
version = "1.0.2"

View File

@ -560,7 +560,7 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "95059428f66df56b63431fdb4e1947ed2190586af5c5a8a8b71122bdf5a7f469"; };
dependencies = {
${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.115" { inherit profileName; };
${ if hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.115" { inherit profileName; };
};
});
@ -674,6 +674,20 @@ in
];
});
"registry+https://github.com/rust-lang/crates.io-index".encoding_rs."0.8.30" = overridableMkRustCrate (profileName: rec {
name = "encoding_rs";
version = "0.8.30";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "7896dc8abb250ffdda33912550faa54c88ec8b998dec0b2c55ab224921ce11df"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
];
dependencies = {
cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".env_logger."0.7.1" = overridableMkRustCrate (profileName: rec {
name = "env_logger";
version = "0.7.1";
@ -1000,6 +1014,7 @@ in
chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; };
crypto_mac = rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-mac."0.10.1" { inherit profileName; };
err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.0" { profileName = "__noProfile"; };
form_urlencoded = rustPackages."registry+https://github.com/rust-lang/crates.io-index".form_urlencoded."1.0.1" { inherit profileName; };
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.17" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.17" { inherit profileName; };
garage_model = rustPackages."unknown".garage_model."0.6.0" { inherit profileName; };
@ -1014,6 +1029,7 @@ in
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
multer = rustPackages."registry+https://github.com/rust-lang/crates.io-index".multer."2.0.2" { inherit profileName; };
nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; };
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; };
@ -1021,6 +1037,7 @@ in
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; };
serde_json = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.68" { inherit profileName; };
sha2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.9.8" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; };
url = rustPackages."registry+https://github.com/rust-lang/crates.io-index".url."2.2.2" { inherit profileName; };
@ -1768,6 +1785,13 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".mime."0.3.16" = overridableMkRustCrate (profileName: rec {
name = "mime";
version = "0.3.16";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "2a60c7ce501c71e03a9c9c0d35b861413ae925bd979cc7a4e30d060069aaac8d"; };
});
"registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec {
name = "minimal-lexical";
version = "0.2.1";
@ -1812,6 +1836,30 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".multer."2.0.2" = overridableMkRustCrate (profileName: rec {
name = "multer";
version = "2.0.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836"; };
features = builtins.concatLists [
[ "default" ]
];
dependencies = {
bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; };
encoding_rs = rustPackages."registry+https://github.com/rust-lang/crates.io-index".encoding_rs."0.8.30" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.17" { inherit profileName; };
http = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.5" { inherit profileName; };
httparse = rustPackages."registry+https://github.com/rust-lang/crates.io-index".httparse."1.5.1" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; };
mime = rustPackages."registry+https://github.com/rust-lang/crates.io-index".mime."0.3.16" { inherit profileName; };
spin = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.9.2" { inherit profileName; };
};
buildDependencies = {
version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.3.0" = overridableMkRustCrate (profileName: rec {
name = "netapp";
version = "0.3.0";
@ -2635,6 +2683,17 @@ in
src = fetchCratesIo { inherit name version; sha256 = "6e63cff320ae2c57904679ba7cb63280a3dc4613885beafb148ee7bf9aa9042d"; };
});
"registry+https://github.com/rust-lang/crates.io-index".spin."0.9.2" = overridableMkRustCrate (profileName: rec {
name = "spin";
version = "0.9.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "511254be0c5bcf062b019a6c89c01a664aa359ded62f78aa72c6fc137c0590e5"; };
features = builtins.concatLists [
[ "mutex" ]
[ "spin_mutex" ]
];
});
"registry+https://github.com/rust-lang/crates.io-index".static_init."1.0.2" = overridableMkRustCrate (profileName: rec {
name = "static_init";
version = "1.0.2";

View File

@ -36,13 +36,16 @@ futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
form_urlencoded = "1.0.0"
http = "0.2"
httpdate = "0.3"
http-range = "0.1"
hyper = { version = "0.14", features = ["server", "http1", "runtime", "tcp", "stream"] }
multer = "2.0"
percent-encoding = "2.1.0"
roxmltree = "0.14"
serde = { version = "1.0", features = ["derive"] }
serde_bytes = "0.11"
serde_json = "1.0"
quick-xml = { version = "0.21", features = [ "serialize" ] }
url = "2.1"

View File

@ -25,6 +25,7 @@ use crate::s3_cors::*;
use crate::s3_delete::*;
use crate::s3_get::*;
use crate::s3_list::*;
use crate::s3_post_object::handle_post_object;
use crate::s3_put::*;
use crate::s3_router::{Authorization, Endpoint};
use crate::s3_website::*;
@ -92,11 +93,6 @@ async fn handler(
}
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
let authority = req
.headers()
.get(header::HOST)
@ -115,6 +111,15 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
let (endpoint, bucket_name) = Endpoint::from_request(&req, bucket_name.map(ToOwned::to_owned))?;
debug!("Endpoint: {:?}", endpoint);
if let Endpoint::PostObject {} = endpoint {
return handle_post_object(garage, req, bucket_name.unwrap()).await;
}
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let api_key = api_key.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?;
let bucket_name = match bucket_name {
None => return handle_request_without_bucket(garage, req, api_key, endpoint).await,
Some(bucket) => bucket.to_string(),

View File

@ -126,6 +126,12 @@ impl From<HelperError> for Error {
}
}
impl From<multer::Error> for Error {
fn from(err: multer::Error) -> Self {
Self::BadRequest(err.to_string())
}
}
impl Error {
/// Get the HTTP status code that best represents the meaning of the error for the client
pub fn http_status_code(&self) -> StatusCode {

View File

@ -19,6 +19,7 @@ pub mod s3_cors;
mod s3_delete;
pub mod s3_get;
mod s3_list;
mod s3_post_object;
mod s3_put;
mod s3_router;
mod s3_website;

View File

@ -46,7 +46,7 @@ pub async fn handle_copy(
// Implement x-amz-metadata-directive: REPLACE
let new_meta = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
headers: get_headers(req)?,
headers: get_headers(req.headers())?,
size: source_version_meta.size,
etag: source_version_meta.etag.clone(),
},

499
src/api/s3_post_object.rs Normal file
View File

@ -0,0 +1,499 @@
use std::collections::HashMap;
use std::convert::TryInto;
use std::ops::RangeInclusive;
use std::sync::Arc;
use std::task::{Context, Poll};
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt};
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
use hyper::{Body, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit};
use serde::Deserialize;
use garage_model::garage::Garage;
use crate::api_server::resolve_bucket;
use crate::error::*;
use crate::s3_put::{get_headers, save_stream};
use crate::s3_xml;
use crate::signature::payload::{parse_date, verify_v4};
pub async fn handle_post_object(
garage: Arc<Garage>,
req: Request<Body>,
bucket: String,
) -> Result<Response<Body>, Error> {
let boundary = req
.headers()
.get(header::CONTENT_TYPE)
.and_then(|ct| ct.to_str().ok())
.and_then(|ct| multer::parse_boundary(ct).ok())
.ok_or_bad_request("Counld not get multipart boundary")?;
// 16k seems plenty for a header. 5G is the max size of a single part, so it seems reasonable
// for a PostObject
let constraints = Constraints::new().size_limit(
SizeLimit::new()
.per_field(16 * 1024)
.for_field("file", 5 * 1024 * 1024 * 1024),
);
let (head, body) = req.into_parts();
let mut multipart = Multipart::with_constraints(body, boundary, constraints);
let mut params = HeaderMap::new();
let field = loop {
let field = if let Some(field) = multipart.next_field().await? {
field
} else {
return Err(Error::BadRequest(
"Request did not contain a file".to_owned(),
));
};
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
name
} else {
continue;
};
if name == "file" {
break field;
}
if let Ok(content) = HeaderValue::from_str(&field.text().await?) {
match name.as_str() {
"tag" => (/* tag need to be reencoded, but we don't support them yet anyway */),
"acl" => {
if params.insert("x-amz-acl", content).is_some() {
return Err(Error::BadRequest(
"Field 'acl' provided more than one time".to_string(),
));
}
}
_ => {
if params.insert(&name, content).is_some() {
return Err(Error::BadRequest(format!(
"Field '{}' provided more than one time",
name
)));
}
}
}
}
};
// Current part is file. Do some checks before handling to PutObject code
let key = params
.get("key")
.ok_or_bad_request("No key was provided")?
.to_str()?;
let credential = params
.get("x-amz-credential")
.ok_or_else(|| {
Error::Forbidden("Garage does not support anonymous access yet".to_string())
})?
.to_str()?;
let policy = params
.get("policy")
.ok_or_bad_request("No policy was provided")?
.to_str()?;
let signature = params
.get("x-amz-signature")
.ok_or_bad_request("No signature was provided")?
.to_str()?;
let date = params
.get("x-amz-date")
.ok_or_bad_request("No date was provided")?
.to_str()?;
let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS.
if let Some(filename) = field.file_name() {
key.replace("${filename}", filename)
} else {
key.to_owned()
}
} else {
key.to_owned()
};
let date = parse_date(date)?;
let api_key = verify_v4(&garage, credential, &date, signature, policy.as_bytes()).await?;
let bucket_id = resolve_bucket(&garage, &bucket, &api_key).await?;
if !api_key.allow_write(&bucket_id) {
return Err(Error::Forbidden(
"Operation is not allowed for this key.".to_string(),
));
}
let decoded_policy = base64::decode(&policy)?;
let decoded_policy: Policy =
serde_json::from_slice(&decoded_policy).ok_or_bad_request("Invalid policy")?;
let expiration: DateTime<Utc> = DateTime::parse_from_rfc3339(&decoded_policy.expiration)
.ok_or_bad_request("Invalid expiration date")?
.into();
if Utc::now() - expiration > Duration::zero() {
return Err(Error::BadRequest(
"Expiration date is in the paste".to_string(),
));
}
let mut conditions = decoded_policy.into_conditions()?;
for (param_key, value) in params.iter() {
let mut param_key = param_key.to_string();
param_key.make_ascii_lowercase();
match param_key.as_str() {
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| {
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
Operation::Equal(s) => s.as_str() == value,
Operation::StartsWith(s) => {
value.to_str()?.split(',').all(|v| v.starts_with(&s))
}
};
if !ok {
return Err(Error::BadRequest(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
}
}
}
"key" => {
let conds = conditions.params.remove("key").ok_or_else(|| {
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
Operation::Equal(s) => s == key,
Operation::StartsWith(s) => key.starts_with(&s),
};
if !ok {
return Err(Error::BadRequest(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
}
}
}
_ => {
if param_key.starts_with("x-ignore-") {
// if a x-ignore is provided in policy, it's not removed here, so it will be
// rejected as provided in policy but not in the request. As odd as it is, it's
// how aws seems to behave.
continue;
}
let conds = conditions.params.remove(&param_key).ok_or_else(|| {
Error::BadRequest(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
let ok = match cond {
Operation::Equal(s) => s.as_str() == value,
Operation::StartsWith(s) => value.to_str()?.starts_with(s.as_str()),
};
if !ok {
return Err(Error::BadRequest(format!(
"Key '{}' has value not allowed in policy",
param_key
)));
}
}
}
}
}
if let Some((param_key, _)) = conditions.params.iter().next() {
return Err(Error::BadRequest(format!(
"Key '{}' is required in policy, but no value was provided",
param_key
)));
}
let headers = get_headers(&params)?;
let stream = field.map(|r| r.map_err(Into::into));
let (_, md5) = save_stream(
garage,
headers,
StreamLimiter::new(stream, conditions.content_length),
bucket_id,
&key,
None,
None,
)
.await?;
let etag = format!("\"{}\"", md5);
let resp = if let Some(mut target) = params
.get("success_action_redirect")
.and_then(|h| h.to_str().ok())
.and_then(|u| url::Url::parse(u).ok())
.filter(|u| u.scheme() == "https" || u.scheme() == "http")
{
target
.query_pairs_mut()
.append_pair("bucket", &bucket)
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
Response::builder()
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
.header(header::ETAG, etag)
.body(target.into())?
} else {
let path = head
.uri
.into_parts()
.path_and_query
.map(|paq| paq.path().to_string())
.unwrap_or_else(|| "/".to_string());
let authority = head
.headers
.get(header::HOST)
.and_then(|h| h.to_str().ok())
.unwrap_or_default();
let proto = if !authority.is_empty() {
"https://"
} else {
""
};
let url_key: String = form_urlencoded::byte_serialize(key.as_bytes())
.flat_map(str::chars)
.collect();
let location = format!("{}{}{}{}", proto, authority, path, url_key);
let action = params
.get("success_action_status")
.and_then(|h| h.to_str().ok())
.unwrap_or("204");
let builder = Response::builder()
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
match action {
"200" => builder.status(StatusCode::OK).body(Body::empty())?,
"201" => {
let xml = s3_xml::PostObject {
xmlns: (),
location: s3_xml::Value(location),
bucket: s3_xml::Value(bucket),
key: s3_xml::Value(key),
etag: s3_xml::Value(etag),
};
let body = s3_xml::to_xml_with_header(&xml)?;
builder
.status(StatusCode::CREATED)
.body(Body::from(body.into_bytes()))?
}
_ => builder.status(StatusCode::NO_CONTENT).body(Body::empty())?,
}
};
Ok(resp)
}
#[derive(Deserialize)]
struct Policy {
expiration: String,
conditions: Vec<PolicyCondition>,
}
impl Policy {
fn into_conditions(self) -> Result<Conditions, Error> {
let mut params = HashMap::<_, Vec<_>>::new();
let mut length = (0, u64::MAX);
for condition in self.conditions {
match condition {
PolicyCondition::Equal(map) => {
if map.len() != 1 {
return Err(Error::BadRequest("Invalid policy item".to_owned()));
}
let (mut k, v) = map.into_iter().next().expect("size was verified");
k.make_ascii_lowercase();
params.entry(k).or_default().push(Operation::Equal(v));
}
PolicyCondition::OtherOp([cond, mut key, value]) => {
if key.remove(0) != '$' {
return Err(Error::BadRequest("Invalid policy item".to_owned()));
}
key.make_ascii_lowercase();
match cond.as_str() {
"eq" => {
params.entry(key).or_default().push(Operation::Equal(value));
}
"starts-with" => {
params
.entry(key)
.or_default()
.push(Operation::StartsWith(value));
}
_ => return Err(Error::BadRequest("Invalid policy item".to_owned())),
}
}
PolicyCondition::SizeRange(key, min, max) => {
if key == "content-length-range" {
length.0 = length.0.max(min);
length.1 = length.1.min(max);
} else {
return Err(Error::BadRequest("Invalid policy item".to_owned()));
}
}
}
}
Ok(Conditions {
params,
content_length: RangeInclusive::new(length.0, length.1),
})
}
}
/// A single condition from a policy
#[derive(Debug, Deserialize)]
#[serde(untagged)]
enum PolicyCondition {
// will contain a single key-value pair
Equal(HashMap<String, String>),
OtherOp([String; 3]),
SizeRange(String, u64, u64),
}
#[derive(Debug)]
struct Conditions {
params: HashMap<String, Vec<Operation>>,
content_length: RangeInclusive<u64>,
}
#[derive(Debug, PartialEq, Eq)]
enum Operation {
Equal(String),
StartsWith(String),
}
struct StreamLimiter<T> {
inner: T,
length: RangeInclusive<u64>,
read: u64,
}
impl<T> StreamLimiter<T> {
fn new(stream: T, length: RangeInclusive<u64>) -> Self {
StreamLimiter {
inner: stream,
length,
read: 0,
}
}
}
impl<T> Stream for StreamLimiter<T>
where
T: Stream<Item = Result<Bytes, Error>> + Unpin,
{
type Item = Result<Bytes, Error>;
fn poll_next(
mut self: std::pin::Pin<&mut Self>,
ctx: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
let res = std::pin::Pin::new(&mut self.inner).poll_next(ctx);
match &res {
Poll::Ready(Some(Ok(bytes))) => {
self.read += bytes.len() as u64;
// optimization to fail early when we know before the end it's too long
if self.length.end() < &self.read {
return Poll::Ready(Some(Err(Error::BadRequest(
"File size does not match policy".to_owned(),
))));
}
}
Poll::Ready(None) => {
if !self.length.contains(&self.read) {
return Poll::Ready(Some(Err(Error::BadRequest(
"File size does not match policy".to_owned(),
))));
}
}
_ => {}
}
res
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_policy_1() {
let policy_json = br#"
{ "expiration": "2007-12-01T12:00:00.000Z",
"conditions": [
{"acl": "public-read" },
{"bucket": "johnsmith" },
["starts-with", "$key", "user/eric/"]
]
}
"#;
let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap();
let mut conditions = policy_2.into_conditions().unwrap();
assert_eq!(
conditions.params.remove(&"acl".to_string()),
Some(vec![Operation::Equal("public-read".into())])
);
assert_eq!(
conditions.params.remove(&"bucket".to_string()),
Some(vec![Operation::Equal("johnsmith".into())])
);
assert_eq!(
conditions.params.remove(&"key".to_string()),
Some(vec![Operation::StartsWith("user/eric/".into())])
);
assert!(conditions.params.is_empty());
assert_eq!(conditions.content_length, 0..=u64::MAX);
}
#[test]
fn test_policy_2() {
let policy_json = br#"
{ "expiration": "2007-12-01T12:00:00.000Z",
"conditions": [
[ "eq", "$acl", "public-read" ],
["starts-with", "$Content-Type", "image/"],
["starts-with", "$success_action_redirect", ""],
["content-length-range", 1048576, 10485760]
]
}
"#;
let policy_2: Policy = serde_json::from_slice(&policy_json[..]).unwrap();
let mut conditions = policy_2.into_conditions().unwrap();
assert_eq!(
conditions.params.remove(&"acl".to_string()),
Some(vec![Operation::Equal("public-read".into())])
);
assert_eq!(
conditions.params.remove("content-type").unwrap(),
vec![Operation::StartsWith("image/".into())]
);
assert_eq!(
conditions
.params
.remove(&"success_action_redirect".to_string()),
Some(vec![Operation::StartsWith("".into())])
);
assert!(conditions.params.is_empty());
assert_eq!(conditions.content_length, 1048576..=10485760);
}
}

View File

@ -4,6 +4,7 @@ use std::sync::Arc;
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::{prelude::*, TryFutureExt};
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@ -34,12 +35,8 @@ pub async fn handle_put(
api_key: &Key,
mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
let version_timestamp = now_msec();
// Retrieve interesting headers from request
let headers = get_headers(&req)?;
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
let content_md5 = match req.headers().get("content-md5") {
@ -92,6 +89,32 @@ pub async fn handle_put(
body.boxed()
};
save_stream(
garage,
headers,
body,
bucket_id,
key,
content_md5,
content_sha256,
)
.await
.map(|(uuid, md5)| put_response(uuid, md5))
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: Arc<Garage>,
headers: ObjectVersionHeaders,
body: S,
bucket_id: Uuid,
key: &str,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
let version_timestamp = now_msec();
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
@ -128,7 +151,7 @@ pub async fn handle_put(
let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok(put_response(version_uuid, data_md5sum_hex));
return Ok((version_uuid, data_md5sum_hex));
}
// Write version identifier in object table so that we have a trace
@ -194,7 +217,7 @@ pub async fn handle_put(
let object = Object::new(bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
Ok(put_response(version_uuid, md5sum_hex))
Ok((version_uuid, md5sum_hex))
}
/// Validate MD5 sum against content-md5 header
@ -373,7 +396,7 @@ pub async fn handle_create_multipart_upload(
key: &str,
) -> Result<Response<Body>, Error> {
let version_uuid = gen_uuid();
let headers = get_headers(req)?;
let headers = get_headers(req.headers())?;
// Create object in object table
let object_version = ObjectVersion {
@ -490,7 +513,7 @@ pub async fn handle_put_part(
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(Body::from(vec![]))
.body(Body::empty())
.unwrap();
Ok(response)
}
@ -672,17 +695,16 @@ pub async fn handle_abort_multipart_upload(
Ok(Response::new(Body::from(vec![])))
}
fn get_mime_type(req: &Request<Body>) -> Result<String, Error> {
Ok(req
.headers()
fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
Ok(headers
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(req)?;
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(headers)?;
let mut other = BTreeMap::new();
// Preserve standard headers
@ -694,7 +716,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = req.headers().get(h) {
if let Some(v) = headers.get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
@ -707,7 +729,7 @@ pub(crate) fn get_headers(req: &Request<Body>) -> Result<ObjectVersionHeaders, E
}
// Preserve x-amz-meta- headers
for (k, v) in req.headers().iter() {
for (k, v) in headers.iter() {
if k.as_str().starts_with("x-amz-meta-") {
match v.to_str() {
Ok(v_str) => {

View File

@ -410,6 +410,12 @@ pub enum Endpoint {
part_number: u64,
upload_id: String,
},
// This endpoint is not documented with others because it has special use case :
// It's intended to be used with HTML forms, using a multipart/form-data body.
// It works a lot like presigned requests, but everything is in the form instead
// of being query parameters of the URL, so authenticating it is a bit different.
PostObject {
},
}}
impl Endpoint {
@ -543,6 +549,7 @@ impl Endpoint {
UPLOADS => CreateMultipartUpload,
],
no_key: [
EMPTY => PostObject,
DELETE => DeleteObjects,
]
}
@ -1165,6 +1172,7 @@ mod tests {
POST "/{Key+}?restore&versionId=VersionId" => RestoreObject
PUT "/my-movie.m2ts?partNumber=1&uploadId=VCVsb2FkIElEIGZvciBlbZZpbmcncyBteS1tb3ZpZS5tMnRzIHVwbG9hZR" => UploadPart
PUT "/Key+?partNumber=2&uploadId=UploadId" => UploadPart
POST "/" => PostObject
);
// no bucket, won't work with the rest of the test suite
assert!(matches!(

View File

@ -289,6 +289,20 @@ pub struct VersioningConfiguration {
pub status: Option<Value>,
}
#[derive(Debug, Serialize, PartialEq)]
pub struct PostObject {
#[serde(serialize_with = "xmlns_tag")]
pub xmlns: (),
#[serde(rename = "Location")]
pub location: Value,
#[serde(rename = "Bucket")]
pub bucket: Value,
#[serde(rename = "Key")]
pub key: Value,
#[serde(rename = "ETag")]
pub etag: Value,
}
#[cfg(test)]
mod tests {
use super::*;

View File

@ -49,23 +49,6 @@ pub async fn check_payload_signature(
}
};
let scope = format!(
"{}/{}/s3/aws4_request",
authorization.date.format(SHORT_DATE),
garage.config.s3_api.s3_region
);
if authorization.scope != scope {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
}
let key = garage
.key_table
.get(&EmptyKey, &authorization.key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::Forbidden(format!("No such key: {}", authorization.key_id)))?;
let key_p = key.params().unwrap();
let canonical_request = canonical_request(
request.method(),
&request.uri().path().to_string(),
@ -74,24 +57,17 @@ pub async fn check_payload_signature(
&authorization.signed_headers,
&authorization.content_sha256,
);
let (_, scope) = parse_credential(&authorization.credential)?;
let string_to_sign = string_to_sign(&authorization.date, &scope, &canonical_request);
let mut hmac = signing_hmac(
let key = verify_v4(
garage,
&authorization.credential,
&authorization.date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
"s3",
&authorization.signature,
string_to_sign.as_bytes(),
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(string_to_sign.as_bytes());
let signature = hex::encode(hmac.finalize().into_bytes());
if authorization.signature != signature {
trace!("Canonical request: ``{}``", canonical_request);
trace!("String to sign: ``{}``", string_to_sign);
trace!("Expected: {}, got: {}", signature, authorization.signature);
return Err(Error::Forbidden("Invalid signature".to_string()));
}
.await?;
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
None
@ -108,8 +84,7 @@ pub async fn check_payload_signature(
}
struct Authorization {
key_id: String,
scope: String,
credential: String,
signed_headers: String,
signature: String,
content_sha256: String,
@ -142,7 +117,6 @@ fn parse_authorization(
let cred = auth_params
.get("Credential")
.ok_or_bad_request("Could not find Credential in Authorization field")?;
let (key_id, scope) = parse_credential(cred)?;
let content_sha256 = headers
.get("x-amz-content-sha256")
@ -150,18 +124,15 @@ fn parse_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?;
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
.ok_or_bad_request("Missing X-Amz-Date field")
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::hours(24) {
return Err(Error::BadRequest("Date is too old".to_string()));
}
let auth = Authorization {
key_id,
scope,
credential: cred.to_string(),
signed_headers: auth_params
.get("SignedHeaders")
.ok_or_bad_request("Could not find SignedHeaders in Authorization field")?
@ -189,7 +160,6 @@ fn parse_query_authorization(
let cred = headers
.get("x-amz-credential")
.ok_or_bad_request("X-Amz-Credential not found in query parameters")?;
let (key_id, scope) = parse_credential(cred)?;
let signed_headers = headers
.get("x-amz-signedheaders")
.ok_or_bad_request("X-Amz-SignedHeaders not found in query parameters")?;
@ -215,18 +185,15 @@ fn parse_query_authorization(
let date = headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?;
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
.ok_or_bad_request("Missing X-Amz-Date field")
.and_then(|d| parse_date(d))?;
if Utc::now() - date > Duration::seconds(duration) {
return Err(Error::BadRequest("Date is too old".to_string()));
}
Ok(Authorization {
key_id,
scope,
credential: cred.to_string(),
signed_headers: signed_headers.to_string(),
signature: signature.to_string(),
content_sha256: content_sha256.to_string(),
@ -304,3 +271,51 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
"".to_string()
}
}
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
Ok(DateTime::from_utc(date, Utc))
}
pub async fn verify_v4(
garage: &Garage,
credential: &str,
date: &DateTime<Utc>,
signature: &str,
payload: &[u8],
) -> Result<Key, Error> {
let (key_id, scope) = parse_credential(credential)?;
let scope_expected = format!(
"{}/{}/s3/aws4_request",
date.format(SHORT_DATE),
garage.config.s3_api.s3_region
);
if scope != scope_expected {
return Err(Error::AuthorizationHeaderMalformed(scope.to_string()));
}
let key = garage
.key_table
.get(&EmptyKey, &key_id)
.await?
.filter(|k| !k.state.is_deleted())
.ok_or_else(|| Error::Forbidden(format!("No such key: {}", &key_id)))?;
let key_p = key.params().unwrap();
let mut hmac = signing_hmac(
date,
&key_p.secret_key,
&garage.config.s3_api.s3_region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
hmac.update(payload);
let our_signature = hex::encode(hmac.finalize().into_bytes());
if signature != our_signature {
return Err(Error::Forbidden("Invalid signature".to_string()));
}
Ok(key)
}