Support STREAMING-AWS4-HMAC-SHA256-PAYLOAD (#64) #156
39
Cargo.lock
generated
|
@ -433,7 +433,9 @@ dependencies = [
|
||||||
"idna",
|
"idna",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5",
|
||||||
|
"nom",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
|
"pin-project",
|
||||||
"quick-xml",
|
"quick-xml",
|
||||||
"roxmltree",
|
"roxmltree",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -967,6 +969,12 @@ dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "minimal-lexical"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.7.13"
|
version = "0.7.13"
|
||||||
|
@ -1011,6 +1019,17 @@ dependencies = [
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nom"
|
||||||
|
version = "7.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
"minimal-lexical",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ntapi"
|
name = "ntapi"
|
||||||
version = "0.3.6"
|
version = "0.3.6"
|
||||||
|
@ -1092,6 +1111,26 @@ version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project"
|
||||||
|
version = "1.0.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
|
||||||
|
dependencies = [
|
||||||
|
"pin-project-internal",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project-internal"
|
||||||
|
version = "1.0.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.7"
|
version = "0.2.7"
|
||||||
|
|
53
Cargo.nix
|
@ -671,7 +671,9 @@ in
|
||||||
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
|
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
|
||||||
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
|
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
|
||||||
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
|
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
|
||||||
|
nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; };
|
||||||
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
|
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
|
||||||
|
pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; };
|
||||||
quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; };
|
quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; };
|
||||||
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
|
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
|
||||||
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
|
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
|
||||||
|
@ -1321,6 +1323,16 @@ in
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "minimal-lexical";
|
||||||
|
version = "0.2.1";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; };
|
||||||
|
features = builtins.concatLists [
|
||||||
|
[ "std" ]
|
||||||
|
];
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "mio";
|
name = "mio";
|
||||||
version = "0.7.13";
|
version = "0.7.13";
|
||||||
|
@ -1381,6 +1393,25 @@ in
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "nom";
|
||||||
|
version = "7.1.0";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; };
|
||||||
|
features = builtins.concatLists [
|
||||||
|
[ "alloc" ]
|
||||||
|
[ "default" ]
|
||||||
|
[ "std" ]
|
||||||
|
];
|
||||||
|
dependencies = {
|
||||||
|
memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; };
|
||||||
|
minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; };
|
||||||
|
};
|
||||||
|
buildDependencies = {
|
||||||
|
version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; };
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "ntapi";
|
name = "ntapi";
|
||||||
version = "0.3.6";
|
version = "0.3.6";
|
||||||
|
@ -1490,6 +1521,28 @@ in
|
||||||
src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; };
|
src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; };
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "pin-project";
|
||||||
|
version = "1.0.8";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; };
|
||||||
|
dependencies = {
|
||||||
|
pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; };
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "pin-project-internal";
|
||||||
|
version = "1.0.8";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; };
|
||||||
|
dependencies = {
|
||||||
|
proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; };
|
||||||
|
quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; };
|
||||||
|
syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; };
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "pin-project-lite";
|
name = "pin-project-lite";
|
||||||
version = "0.2.7";
|
version = "0.2.7";
|
||||||
|
|
|
@ -76,7 +76,7 @@ in let
|
||||||
*/
|
*/
|
||||||
''^(src|tests)'' # fixed default
|
''^(src|tests)'' # fixed default
|
||||||
''.*\.(rs|toml)$'' # fixed default
|
''.*\.(rs|toml)$'' # fixed default
|
||||||
''^(crdt|replication|cli|helper)'' # our crate submodules
|
''^(crdt|replication|cli|helper|signature)'' # our crate submodules
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ cat > /tmp/garage.mc/config.json <<EOF
|
||||||
"version": "10",
|
"version": "10",
|
||||||
"aliases": {
|
"aliases": {
|
||||||
"garage": {
|
"garage": {
|
||||||
"url": "https://localhost:4443",
|
"url": "http://127.0.0.1:3911",
|
||||||
"accessKey": "$ACCESS_KEY",
|
"accessKey": "$ACCESS_KEY",
|
||||||
"secretKey": "$SECRET_KEY",
|
"secretKey": "$SECRET_KEY",
|
||||||
"api": "S3v4",
|
"api": "S3v4",
|
||||||
|
|
|
@ -28,10 +28,12 @@ hmac = "0.10"
|
||||||
idna = "0.2"
|
idna = "0.2"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
md-5 = "0.9"
|
md-5 = "0.9"
|
||||||
|
nom = "7.1"
|
||||||
sha2 = "0.9"
|
sha2 = "0.9"
|
||||||
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
|
pin-project = "1.0"
|
||||||
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
||||||
|
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
|
|
|
@ -15,7 +15,7 @@ use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::Key;
|
use garage_model::key_table::Key;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::signature::check_signature;
|
use crate::signature::payload::check_payload_signature;
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
use crate::s3_bucket::*;
|
use crate::s3_bucket::*;
|
||||||
|
@ -90,7 +90,7 @@ async fn handler(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||||
let (api_key, content_sha256) = check_signature(&garage, &req).await?;
|
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
|
||||||
|
|
||||||
let authority = req
|
let authority = req
|
||||||
.headers()
|
.headers()
|
||||||
|
@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Endpoint::PutObject { key, .. } => {
|
Endpoint::PutObject { key, .. } => {
|
||||||
handle_put(garage, req, bucket_id, &key, content_sha256).await
|
handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await
|
||||||
}
|
}
|
||||||
Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
|
Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
|
||||||
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
||||||
|
|
|
@ -120,7 +120,10 @@ pub async fn handle_create_bucket(
|
||||||
bucket_name: String,
|
bucket_name: String,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let cmd =
|
let cmd =
|
||||||
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
|
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
|
||||||
|
|
|
@ -81,7 +81,10 @@ pub async fn handle_delete_objects(
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
|
|||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::stream::*;
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
use hyper::{Body, Request, Response};
|
use futures::{prelude::*, TryFutureExt};
|
||||||
|
use hyper::body::{Body, Bytes};
|
||||||
|
use hyper::{Request, Response};
|
||||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
|
||||||
|
@ -14,19 +16,23 @@ use garage_util::time::*;
|
||||||
use garage_model::block::INLINE_THRESHOLD;
|
use garage_model::block::INLINE_THRESHOLD;
|
||||||
use garage_model::block_ref_table::*;
|
use garage_model::block_ref_table::*;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::key_table::Key;
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
use garage_model::version_table::*;
|
use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::s3_xml;
|
use crate::s3_xml;
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::streaming::SignedPayloadStream;
|
||||||
|
use crate::signature::LONG_DATETIME;
|
||||||
|
use crate::signature::{compute_scope, verify_signed_content};
|
||||||
|
|
||||||
pub async fn handle_put(
|
pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &str,
|
||||||
content_sha256: Option<Hash>,
|
api_key: &Key,
|
||||||
|
mut content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
// Generate identity of new version
|
// Generate identity of new version
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
|
@ -40,11 +46,53 @@ pub async fn handle_put(
|
||||||
Some(x) => Some(x.to_str()?.to_string()),
|
Some(x) => Some(x.to_str()?.to_string()),
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
|
||||||
|
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
|
||||||
|
let content_sha256 = content_sha256
|
||||||
|
.take()
|
||||||
|
.ok_or_bad_request("No signature provided")?;
|
||||||
|
Some(content_sha256)
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
// Parse body of uploaded file
|
// Parse body of uploaded file
|
||||||
let body = req.into_body();
|
let (head, body) = req.into_parts();
|
||||||
|
let body = body.map_err(Error::from);
|
||||||
|
|
||||||
let mut chunker = BodyChunker::new(body, garage.config.block_size);
|
let body = if let Some(signature) = payload_seed_signature {
|
||||||
|
let secret_key = &api_key
|
||||||
|
.state
|
||||||
|
.as_option()
|
||||||
|
.ok_or_internal_error("Deleted key state")?
|
||||||
|
.secret_key;
|
||||||
|
|
||||||
|
let date = head
|
||||||
|
.headers
|
||||||
|
.get("x-amz-date")
|
||||||
|
.ok_or_bad_request("Missing X-Amz-Date field")?
|
||||||
|
.to_str()?;
|
||||||
|
let date: NaiveDateTime =
|
||||||
|
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
|
||||||
|
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
||||||
|
|
||||||
|
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
|
||||||
|
let signing_hmac = crate::signature::signing_hmac(
|
||||||
|
&date,
|
||||||
|
secret_key,
|
||||||
|
&garage.config.s3_api.s3_region,
|
||||||
|
"s3",
|
||||||
|
)
|
||||||
|
.ok_or_internal_error("Unable to build signing HMAC")?;
|
||||||
|
|
||||||
|
SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
|
||||||
|
.map_err(Error::from)
|
||||||
|
.boxed()
|
||||||
|
} else {
|
||||||
|
body.boxed()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
let first_block = chunker.next().await?.unwrap_or_default();
|
let first_block = chunker.next().await?.unwrap_or_default();
|
||||||
|
|
||||||
// If body is small enough, store it directly in the object table
|
// If body is small enough, store it directly in the object table
|
||||||
|
@ -178,13 +226,13 @@ fn ensure_checksum_matches(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_and_put_blocks(
|
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
first_block: Vec<u8>,
|
first_block: Vec<u8>,
|
||||||
first_block_hash: Hash,
|
first_block_hash: Hash,
|
||||||
chunker: &mut BodyChunker,
|
chunker: &mut StreamChunker<S>,
|
||||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
||||||
let mut md5hasher = Md5::new();
|
let mut md5hasher = Md5::new();
|
||||||
let mut sha256hasher = Sha256::new();
|
let mut sha256hasher = Sha256::new();
|
||||||
|
@ -205,8 +253,11 @@ async fn read_and_put_blocks(
|
||||||
.rpc_put_block(first_block_hash, first_block);
|
.rpc_put_block(first_block_hash, first_block);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (_, _, next_block) =
|
let (_, _, next_block) = futures::try_join!(
|
||||||
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
|
put_curr_block.map_err(Error::from),
|
||||||
|
put_curr_version_block.map_err(Error::from),
|
||||||
|
chunker.next(),
|
||||||
|
)?;
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
md5hasher.update(&block[..]);
|
md5hasher.update(&block[..]);
|
||||||
sha256hasher.update(&block[..]);
|
sha256hasher.update(&block[..]);
|
||||||
|
@ -266,32 +317,34 @@ async fn put_block_meta(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BodyChunker {
|
struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
|
||||||
body: Body,
|
stream: S,
|
||||||
read_all: bool,
|
read_all: bool,
|
||||||
block_size: usize,
|
block_size: usize,
|
||||||
buf: VecDeque<u8>,
|
buf: VecDeque<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BodyChunker {
|
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
|
||||||
fn new(body: Body, block_size: usize) -> Self {
|
fn new(stream: S, block_size: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
body,
|
stream,
|
||||||
read_all: false,
|
read_all: false,
|
||||||
block_size,
|
block_size,
|
||||||
buf: VecDeque::with_capacity(2 * block_size),
|
buf: VecDeque::with_capacity(2 * block_size),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
|
|
||||||
|
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
|
||||||
while !self.read_all && self.buf.len() < self.block_size {
|
while !self.read_all && self.buf.len() < self.block_size {
|
||||||
if let Some(block) = self.body.next().await {
|
if let Some(block) = self.stream.next().await {
|
||||||
let bytes = block?;
|
let bytes = block?;
|
||||||
lx marked this conversation as resolved
Outdated
lx
commented
If signature was a If signature was a `Hash` we would benefit from the custom `impl Debug for Hash` and we could just derive Debug for Header and not implement it manually
|
|||||||
trace!("Body next: {} bytes", bytes.len());
|
trace!("Body next: {} bytes", bytes.len());
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
lx
commented
If If `signature` was of type `Hash` it would already have its own debug that shows (part of) the bytes in hex, so we could derive Debug instead of having a custom impl.
|
|||||||
self.buf.extend(&bytes[..]);
|
self.buf.extend(bytes);
|
||||||
} else {
|
} else {
|
||||||
self.read_all = true;
|
self.read_all = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.buf.is_empty() {
|
if self.buf.is_empty() {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
} else if self.buf.len() <= self.block_size {
|
} else if self.buf.len() <= self.block_size {
|
||||||
|
@ -368,12 +421,20 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
// Read first chuck, and at the same time try to get object to see if it exists
|
// Read first chuck, and at the same time try to get object to see if it exists
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
|
|
||||||
|
let body = req.into_body().map_err(Error::from);
|
||||||
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
|
|
||||||
let (object, version, first_block) = futures::try_join!(
|
let (object, version, first_block) = futures::try_join!(
|
||||||
garage.object_table.get(&bucket_id, &key),
|
garage
|
||||||
garage.version_table.get(&version_uuid, &EmptyKey),
|
.object_table
|
||||||
chunker.next()
|
.get(&bucket_id, &key)
|
||||||
|
.map_err(Error::from),
|
||||||
|
garage
|
||||||
|
.version_table
|
||||||
|
.get(&version_uuid, &EmptyKey)
|
||||||
|
.map_err(Error::from),
|
||||||
|
chunker.next(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Check object is valid and multipart block can be accepted
|
// Check object is valid and multipart block can be accepted
|
||||||
|
@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload(
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
lx
commented
Is there a good reason to make this generic on error type E ? Most things in Garage aren't generic and just return an Is there a good reason to make this generic on error type E ? Most things in Garage aren't generic and just return an `error::Error` and that's simpler
|
|||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
lx marked this conversation as resolved
Outdated
lx
commented
In the current codebase, nothing is generic on the error type, we always use In the current codebase, nothing is generic on the error type, we always use `garage_api::error::Error`. Is there a reason why StreamChunker needs a generic error type?
|
|||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
|
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
|
||||||
|
|
|
@ -44,7 +44,10 @@ pub async fn handle_put_website(
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let mut bucket = garage
|
let mut bucket = garage
|
||||||
.bucket_table
|
.bucket_table
|
||||||
|
|
47
src/api/signature/mod.rs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use hmac::{Hmac, Mac, NewMac};
|
||||||
|
use sha2::Sha256;
|
||||||
|
|
||||||
|
use garage_util::data::{sha256sum, Hash};
|
||||||
|
|
||||||
|
use crate::error::*;
|
||||||
|
|
||||||
|
pub mod payload;
|
||||||
|
pub mod streaming;
|
||||||
|
|
||||||
|
pub const SHORT_DATE: &str = "%Y%m%d";
|
||||||
|
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
|
||||||
|
|
||||||
|
type HmacSha256 = Hmac<Sha256>;
|
||||||
|
|
||||||
|
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
|
||||||
|
if expected_sha256 != sha256sum(body) {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
"Request content hash does not match signed hash".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn signing_hmac(
|
||||||
|
datetime: &DateTime<Utc>,
|
||||||
|
secret_key: &str,
|
||||||
|
region: &str,
|
||||||
|
service: &str,
|
||||||
|
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
|
||||||
|
let secret = String::from("AWS4") + secret_key;
|
||||||
|
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
|
||||||
|
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
|
||||||
|
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
|
||||||
|
region_hmac.update(region.as_bytes());
|
||||||
|
let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?;
|
||||||
|
service_hmac.update(service.as_bytes());
|
||||||
|
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
|
||||||
|
signing_hmac.update(b"aws4_request");
|
||||||
|
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
|
||||||
|
Ok(hmac)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String {
|
||||||
|
format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,)
|
||||||
|
}
|
|
@ -1,25 +1,23 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||||
use hmac::{Hmac, Mac, NewMac};
|
use hmac::Mac;
|
||||||
use hyper::{Body, Method, Request};
|
use hyper::{Body, Method, Request};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::{sha256sum, Hash};
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::*;
|
use garage_model::key_table::*;
|
||||||
|
|
||||||
|
use super::signing_hmac;
|
||||||
|
use super::{LONG_DATETIME, SHORT_DATE};
|
||||||
|
|
||||||
use crate::encoding::uri_encode;
|
use crate::encoding::uri_encode;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
|
||||||
const SHORT_DATE: &str = "%Y%m%d";
|
pub async fn check_payload_signature(
|
||||||
const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
|
|
||||||
|
|
||||||
type HmacSha256 = Hmac<Sha256>;
|
|
||||||
|
|
||||||
pub async fn check_signature(
|
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
request: &Request<Body>,
|
request: &Request<Body>,
|
||||||
) -> Result<(Key, Option<Hash>), Error> {
|
) -> Result<(Key, Option<Hash>), Error> {
|
||||||
|
@ -97,13 +95,13 @@ pub async fn check_signature(
|
||||||
|
|
||||||
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
|
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
|
||||||
None
|
None
|
||||||
|
} else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
|
||||||
|
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
|
||||||
|
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
|
||||||
} else {
|
} else {
|
||||||
let bytes = hex::decode(authorization.content_sha256)
|
let bytes = hex::decode(authorization.content_sha256)
|
||||||
.ok_or_bad_request("Invalid content sha256 hash")?;
|
.ok_or_bad_request("Invalid content sha256 hash")?;
|
||||||
Some(
|
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?)
|
||||||
Hash::try_from(&bytes[..])
|
|
||||||
.ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?,
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((key, content_sha256))
|
Ok((key, content_sha256))
|
||||||
|
@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &
|
||||||
.join("\n")
|
.join("\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn signing_hmac(
|
|
||||||
datetime: &DateTime<Utc>,
|
|
||||||
secret_key: &str,
|
|
||||||
region: &str,
|
|
||||||
service: &str,
|
|
||||||
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
|
|
||||||
let secret = String::from("AWS4") + secret_key;
|
|
||||||
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
|
|
||||||
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
|
|
||||||
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
|
|
||||||
region_hmac.update(region.as_bytes());
|
|
||||||
let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?;
|
|
||||||
service_hmac.update(service.as_bytes());
|
|
||||||
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
|
|
||||||
signing_hmac.update(b"aws4_request");
|
|
||||||
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
|
|
||||||
Ok(hmac)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn canonical_request(
|
fn canonical_request(
|
||||||
method: &Method,
|
method: &Method,
|
||||||
url_path: &str,
|
url_path: &str,
|
||||||
|
@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
|
||||||
"".to_string()
|
"".to_string()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn verify_signed_content(content_sha256: Option<Hash>, body: &[u8]) -> Result<(), Error> {
|
|
||||||
let expected_sha256 =
|
|
||||||
content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?;
|
|
||||||
if expected_sha256 != sha256sum(body) {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
"Request content hash does not match signed hash".to_string(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
319
src/api/signature/streaming.rs
Normal file
|
@ -0,0 +1,319 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::task;
|
||||||
|
use hyper::body::Bytes;
|
||||||
|
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
use hmac::Mac;
|
||||||
|
|
||||||
|
use super::sha256sum;
|
||||||
|
use super::HmacSha256;
|
||||||
|
use super::LONG_DATETIME;
|
||||||
|
|
||||||
|
use crate::error::*;
|
||||||
|
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
lx
commented
This doesn't do a check, it returns the expected signature, so it shouldn't be called This doesn't do a check, it returns the expected signature, so it shouldn't be called `check_*`
|
|||||||
|
/// Result of `sha256("")`
|
||||||
|
const EMPTY_STRING_HEX_DIGEST: &str =
|
||||||
|
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
|
||||||
|
|
||||||
|
fn compute_streaming_payload_signature(
|
||||||
|
signing_hmac: &HmacSha256,
|
||||||
|
date: DateTime<Utc>,
|
||||||
|
scope: &str,
|
||||||
|
previous_signature: Hash,
|
||||||
|
content_sha256: Hash,
|
||||||
|
) -> Result<Hash, Error> {
|
||||||
|
let string_to_sign = [
|
||||||
|
"AWS4-HMAC-SHA256-PAYLOAD",
|
||||||
|
&date.format(LONG_DATETIME).to_string(),
|
||||||
|
scope,
|
||||||
|
&hex::encode(previous_signature),
|
||||||
|
EMPTY_STRING_HEX_DIGEST,
|
||||||
|
&hex::encode(content_sha256),
|
||||||
|
]
|
||||||
|
.join("\n");
|
||||||
|
|
||||||
|
let mut hmac = signing_hmac.clone();
|
||||||
|
hmac.update(string_to_sign.as_bytes());
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
trinity-1686a
commented
instead of recalculating the signing key from date, secret_key and region each time, couldn't instead of recalculating the signing key from date, secret_key and region each time, couldn't `check_streaming_payload_signature` take the precomputed signing key? It would cut down on parameters/fieds here and in `SignedPayloadChunker`, and be (negligeably) faster due to less hmac computation
|
|||||||
|
|
||||||
|
Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
|
||||||
|
}
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
lx
commented
If If `hash::try_from` fails here, I think it's an internal error and not a bad request (it can only happen if the hmac doesn't have the good number of bytes, which should never happen)
|
|||||||
|
|
||||||
|
mod payload {
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
|
pub enum Error<I> {
|
||||||
|
Parser(nom::error::Error<I>),
|
||||||
|
BadSignature,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> Error<I> {
|
||||||
|
pub fn description(&self) -> &str {
|
||||||
|
match *self {
|
||||||
|
Error::Parser(ref e) => e.code.description(),
|
||||||
|
Error::BadSignature => "Bad signature",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Header {
|
||||||
|
pub size: usize,
|
||||||
|
pub signature: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Header {
|
||||||
|
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
|
||||||
|
use nom::bytes::streaming::tag;
|
||||||
|
use nom::character::streaming::hex_digit1;
|
||||||
|
use nom::combinator::map_res;
|
||||||
|
use nom::number::streaming::hex_u32;
|
||||||
|
|
||||||
|
macro_rules! try_parse {
|
||||||
|
($expr:expr) => {
|
||||||
|
$expr.map_err(|e| e.map(Error::Parser))?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, size) = try_parse!(hex_u32(input));
|
||||||
|
let (input, _) = try_parse!(tag(";")(input));
|
||||||
|
|
||||||
|
let (input, _) = try_parse!(tag("chunk-signature=")(input));
|
||||||
|
let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
|
||||||
|
let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
|
||||||
|
|
||||||
|
let (input, _) = try_parse!(tag("\r\n")(input));
|
||||||
|
|
||||||
|
let header = Header {
|
||||||
|
size: size as usize,
|
||||||
|
signature,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((input, header))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum SignedPayloadStreamError {
|
||||||
|
Stream(Error),
|
||||||
|
InvalidSignature,
|
||||||
|
Message(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SignedPayloadStreamError {
|
||||||
|
fn message(msg: &str) -> Self {
|
||||||
|
SignedPayloadStreamError::Message(msg.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SignedPayloadStreamError> for Error {
|
||||||
|
fn from(err: SignedPayloadStreamError) -> Self {
|
||||||
|
match err {
|
||||||
|
SignedPayloadStreamError::Stream(e) => e,
|
||||||
|
SignedPayloadStreamError::InvalidSignature => {
|
||||||
|
Error::BadRequest("Invalid payload signature".into())
|
||||||
|
}
|
||||||
|
SignedPayloadStreamError::Message(e) => {
|
||||||
|
Error::BadRequest(format!("Chunk format error: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> From<payload::Error<I>> for SignedPayloadStreamError {
|
||||||
|
fn from(err: payload::Error<I>) -> Self {
|
||||||
|
Self::message(err.description())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
|
||||||
|
fn from(err: nom::error::Error<I>) -> Self {
|
||||||
|
Self::message(err.code.description())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SignedPayload {
|
||||||
|
header: payload::Header,
|
||||||
|
data: Bytes,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>>,
|
||||||
|
{
|
||||||
|
#[pin]
|
||||||
|
stream: S,
|
||||||
|
buf: bytes::BytesMut,
|
||||||
|
datetime: DateTime<Utc>,
|
||||||
|
scope: String,
|
||||||
|
signing_hmac: HmacSha256,
|
||||||
|
previous_signature: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>>,
|
||||||
|
{
|
||||||
|
pub fn new(
|
||||||
|
stream: S,
|
||||||
|
signing_hmac: HmacSha256,
|
||||||
|
datetime: DateTime<Utc>,
|
||||||
|
scope: &str,
|
||||||
|
seed_signature: Hash,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
Ok(Self {
|
||||||
|
stream,
|
||||||
|
buf: bytes::BytesMut::new(),
|
||||||
|
datetime,
|
||||||
|
scope: scope.into(),
|
||||||
|
signing_hmac,
|
||||||
|
previous_signature: seed_signature,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
|
||||||
|
use nom::bytes::streaming::{tag, take};
|
||||||
|
|
||||||
|
macro_rules! try_parse {
|
||||||
|
($expr:expr) => {
|
||||||
|
$expr.map_err(nom::Err::convert)?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, header) = try_parse!(payload::Header::parse(input));
|
||||||
|
|
||||||
|
// 0-sized chunk is the last
|
||||||
|
if header.size == 0 {
|
||||||
|
return Ok((
|
||||||
|
input,
|
||||||
|
SignedPayload {
|
||||||
|
header,
|
||||||
|
data: Bytes::new(),
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
|
||||||
|
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
|
||||||
|
|
||||||
|
let data = Bytes::from(data.to_vec());
|
||||||
|
|
||||||
|
Ok((input, SignedPayload { header, data }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Stream for SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>> + Unpin,
|
||||||
|
{
|
||||||
|
type Item = Result<Bytes, SignedPayloadStreamError>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> task::Poll<Option<Self::Item>> {
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
lx
commented
What if the payload input stream is interrupted in the middle? Do we take care of exiting the infinite loop and return an error? What if the payload input stream is interrupted in the middle? Do we take care of exiting the infinite loop and return an error?
|
|||||||
|
let mut this = self.project();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (input, payload) = match Self::parse_next(this.buf) {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(nom::Err::Incomplete(_)) => {
|
||||||
|
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
|
||||||
|
Some(Ok(bytes)) => {
|
||||||
|
this.buf.extend(bytes);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
|
||||||
|
"Unexpected EOF",
|
||||||
|
))));
|
||||||
KokaKiwi marked this conversation as resolved
Outdated
trinity-1686a
commented
this bit has an invalid edge case : by cutting the stream just before a new chunk header, an attacker can truncate the file without it being rejected. Getting here (inner stream returns None and this.buf is empy) is either such a truncation, or a call to SignedPayloadStream::poll_next after it returned Ok(Ready(None)) once, which is a contract error ( this bit has an invalid edge case : by cutting the stream just before a new chunk header, an attacker can truncate the file without it being rejected. Getting here (inner stream returns None and this.buf is empy) is either such a truncation, or a call to SignedPayloadStream::poll_next after it returned Ok(Ready(None)) once, which is a contract error ([`Ok(Ready(None)) means that the stream has terminated, and poll_next should not be invoked again`](https://docs.rs/futures/0.2.0/futures/stream/trait.Stream.html#return-value)), so this check and return can be safely removed
|
|||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
|
||||||
|
return Poll::Ready(Some(Err(e)))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 0-sized chunk is the last
|
||||||
|
if payload.data.is_empty() {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let data_sha256sum = sha256sum(&payload.data);
|
||||||
|
|
||||||
|
let expected_signature = compute_streaming_payload_signature(
|
||||||
|
this.signing_hmac,
|
||||||
|
*this.datetime,
|
||||||
|
this.scope,
|
||||||
|
*this.previous_signature,
|
||||||
|
data_sha256sum,
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if payload.header.signature != expected_signature {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
|
||||||
|
}
|
||||||
|
|
||||||
|
*this.buf = input.into();
|
||||||
|
*this.previous_signature = payload.header.signature;
|
||||||
|
|
||||||
|
return Poll::Ready(Some(Ok(payload.data)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
self.stream.size_hint()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use futures::prelude::*;
|
||||||
|
|
||||||
|
use super::{SignedPayloadStream, SignedPayloadStreamError};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_interrupted_signed_payload_stream() {
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
|
let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0
|
||||||
|
.unwrap()
|
||||||
|
.with_timezone(&Utc);
|
||||||
|
let secret_key = "test";
|
||||||
|
let region = "test";
|
||||||
|
let scope = crate::signature::compute_scope(&datetime, region);
|
||||||
|
let signing_hmac =
|
||||||
|
crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();
|
||||||
|
|
||||||
|
let data: &[&[u8]] = &[b"1"];
|
||||||
|
let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into())));
|
||||||
|
|
||||||
|
let seed_signature = Hash::default();
|
||||||
|
|
||||||
|
let mut stream =
|
||||||
|
SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap();
|
||||||
|
|
||||||
|
assert!(stream.try_next().await.is_err());
|
||||||
|
match stream.try_next().await {
|
||||||
|
Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {}
|
||||||
|
item => panic!(
|
||||||
|
"Unexpected result, expected early EOF error, got {:?}",
|
||||||
|
item
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Is it mandatory to provide a signed content-sha256 header? I thought rather it was the opposite because it seems to cause issues on other endpoints (e.g. #164)