Support STREAMING-AWS4-HMAC-SHA256-PAYLOAD (#64) (#156)
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
Closes #64. Reviewed-on: #156 Co-authored-by: Jill <kokakiwi@deuxfleurs.fr> Co-committed-by: Jill <kokakiwi@deuxfleurs.fr>
This commit is contained in:
parent
60c0033c8b
commit
b45dcc1925
13 changed files with 578 additions and 77 deletions
39
Cargo.lock
generated
39
Cargo.lock
generated
|
@ -433,7 +433,9 @@ dependencies = [
|
||||||
"idna",
|
"idna",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5",
|
||||||
|
"nom",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
|
"pin-project",
|
||||||
"quick-xml",
|
"quick-xml",
|
||||||
"roxmltree",
|
"roxmltree",
|
||||||
"serde",
|
"serde",
|
||||||
|
@ -967,6 +969,12 @@ dependencies = [
|
||||||
"autocfg",
|
"autocfg",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "minimal-lexical"
|
||||||
|
version = "0.2.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "mio"
|
name = "mio"
|
||||||
version = "0.7.13"
|
version = "0.7.13"
|
||||||
|
@ -1011,6 +1019,17 @@ dependencies = [
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "nom"
|
||||||
|
version = "7.1.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
"minimal-lexical",
|
||||||
|
"version_check",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ntapi"
|
name = "ntapi"
|
||||||
version = "0.3.6"
|
version = "0.3.6"
|
||||||
|
@ -1092,6 +1111,26 @@ version = "2.1.0"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project"
|
||||||
|
version = "1.0.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
|
||||||
|
dependencies = [
|
||||||
|
"pin-project-internal",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "pin-project-internal"
|
||||||
|
version = "1.0.8"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
|
||||||
|
dependencies = [
|
||||||
|
"proc-macro2",
|
||||||
|
"quote",
|
||||||
|
"syn",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "pin-project-lite"
|
name = "pin-project-lite"
|
||||||
version = "0.2.7"
|
version = "0.2.7"
|
||||||
|
|
53
Cargo.nix
53
Cargo.nix
|
@ -671,7 +671,9 @@ in
|
||||||
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
|
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
|
||||||
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
|
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
|
||||||
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
|
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
|
||||||
|
nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; };
|
||||||
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
|
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
|
||||||
|
pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; };
|
||||||
quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; };
|
quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; };
|
||||||
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
|
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
|
||||||
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
|
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
|
||||||
|
@ -1321,6 +1323,16 @@ in
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "minimal-lexical";
|
||||||
|
version = "0.2.1";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; };
|
||||||
|
features = builtins.concatLists [
|
||||||
|
[ "std" ]
|
||||||
|
];
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "mio";
|
name = "mio";
|
||||||
version = "0.7.13";
|
version = "0.7.13";
|
||||||
|
@ -1381,6 +1393,25 @@ in
|
||||||
};
|
};
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "nom";
|
||||||
|
version = "7.1.0";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; };
|
||||||
|
features = builtins.concatLists [
|
||||||
|
[ "alloc" ]
|
||||||
|
[ "default" ]
|
||||||
|
[ "std" ]
|
||||||
|
];
|
||||||
|
dependencies = {
|
||||||
|
memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; };
|
||||||
|
minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; };
|
||||||
|
};
|
||||||
|
buildDependencies = {
|
||||||
|
version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; };
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "ntapi";
|
name = "ntapi";
|
||||||
version = "0.3.6";
|
version = "0.3.6";
|
||||||
|
@ -1490,6 +1521,28 @@ in
|
||||||
src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; };
|
src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; };
|
||||||
});
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "pin-project";
|
||||||
|
version = "1.0.8";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; };
|
||||||
|
dependencies = {
|
||||||
|
pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; };
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
|
"registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec {
|
||||||
|
name = "pin-project-internal";
|
||||||
|
version = "1.0.8";
|
||||||
|
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||||
|
src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; };
|
||||||
|
dependencies = {
|
||||||
|
proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; };
|
||||||
|
quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; };
|
||||||
|
syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; };
|
||||||
|
};
|
||||||
|
});
|
||||||
|
|
||||||
"registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec {
|
"registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec {
|
||||||
name = "pin-project-lite";
|
name = "pin-project-lite";
|
||||||
version = "0.2.7";
|
version = "0.2.7";
|
||||||
|
|
|
@ -76,7 +76,7 @@ in let
|
||||||
*/
|
*/
|
||||||
''^(src|tests)'' # fixed default
|
''^(src|tests)'' # fixed default
|
||||||
''.*\.(rs|toml)$'' # fixed default
|
''.*\.(rs|toml)$'' # fixed default
|
||||||
''^(crdt|replication|cli|helper)'' # our crate submodules
|
''^(crdt|replication|cli|helper|signature)'' # our crate submodules
|
||||||
];
|
];
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@ cat > /tmp/garage.mc/config.json <<EOF
|
||||||
"version": "10",
|
"version": "10",
|
||||||
"aliases": {
|
"aliases": {
|
||||||
"garage": {
|
"garage": {
|
||||||
"url": "https://localhost:4443",
|
"url": "http://127.0.0.1:3911",
|
||||||
"accessKey": "$ACCESS_KEY",
|
"accessKey": "$ACCESS_KEY",
|
||||||
"secretKey": "$SECRET_KEY",
|
"secretKey": "$SECRET_KEY",
|
||||||
"api": "S3v4",
|
"api": "S3v4",
|
||||||
|
|
|
@ -28,10 +28,12 @@ hmac = "0.10"
|
||||||
idna = "0.2"
|
idna = "0.2"
|
||||||
log = "0.4"
|
log = "0.4"
|
||||||
md-5 = "0.9"
|
md-5 = "0.9"
|
||||||
|
nom = "7.1"
|
||||||
sha2 = "0.9"
|
sha2 = "0.9"
|
||||||
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
|
pin-project = "1.0"
|
||||||
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
|
||||||
|
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
|
|
|
@ -15,7 +15,7 @@ use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::Key;
|
use garage_model::key_table::Key;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::signature::check_signature;
|
use crate::signature::payload::check_payload_signature;
|
||||||
|
|
||||||
use crate::helpers::*;
|
use crate::helpers::*;
|
||||||
use crate::s3_bucket::*;
|
use crate::s3_bucket::*;
|
||||||
|
@ -90,7 +90,7 @@ async fn handler(
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
|
||||||
let (api_key, content_sha256) = check_signature(&garage, &req).await?;
|
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
|
||||||
|
|
||||||
let authority = req
|
let authority = req
|
||||||
.headers()
|
.headers()
|
||||||
|
@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
Endpoint::PutObject { key, .. } => {
|
Endpoint::PutObject { key, .. } => {
|
||||||
handle_put(garage, req, bucket_id, &key, content_sha256).await
|
handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await
|
||||||
}
|
}
|
||||||
Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
|
Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
|
||||||
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await
|
||||||
|
|
|
@ -120,7 +120,10 @@ pub async fn handle_create_bucket(
|
||||||
bucket_name: String,
|
bucket_name: String,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let cmd =
|
let cmd =
|
||||||
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
|
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;
|
||||||
|
|
|
@ -81,7 +81,10 @@ pub async fn handle_delete_objects(
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
use std::collections::{BTreeMap, VecDeque};
|
use std::collections::{BTreeMap, VecDeque};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::stream::*;
|
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||||
use hyper::{Body, Request, Response};
|
use futures::{prelude::*, TryFutureExt};
|
||||||
|
use hyper::body::{Body, Bytes};
|
||||||
|
use hyper::{Request, Response};
|
||||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
|
||||||
|
@ -14,19 +16,23 @@ use garage_util::time::*;
|
||||||
use garage_model::block::INLINE_THRESHOLD;
|
use garage_model::block::INLINE_THRESHOLD;
|
||||||
use garage_model::block_ref_table::*;
|
use garage_model::block_ref_table::*;
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_model::key_table::Key;
|
||||||
use garage_model::object_table::*;
|
use garage_model::object_table::*;
|
||||||
use garage_model::version_table::*;
|
use garage_model::version_table::*;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::s3_xml;
|
use crate::s3_xml;
|
||||||
use crate::signature::verify_signed_content;
|
use crate::signature::streaming::SignedPayloadStream;
|
||||||
|
use crate::signature::LONG_DATETIME;
|
||||||
|
use crate::signature::{compute_scope, verify_signed_content};
|
||||||
|
|
||||||
pub async fn handle_put(
|
pub async fn handle_put(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
req: Request<Body>,
|
req: Request<Body>,
|
||||||
bucket_id: Uuid,
|
bucket_id: Uuid,
|
||||||
key: &str,
|
key: &str,
|
||||||
content_sha256: Option<Hash>,
|
api_key: &Key,
|
||||||
|
mut content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
// Generate identity of new version
|
// Generate identity of new version
|
||||||
let version_uuid = gen_uuid();
|
let version_uuid = gen_uuid();
|
||||||
|
@ -40,11 +46,53 @@ pub async fn handle_put(
|
||||||
Some(x) => Some(x.to_str()?.to_string()),
|
Some(x) => Some(x.to_str()?.to_string()),
|
||||||
None => None,
|
None => None,
|
||||||
};
|
};
|
||||||
|
let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
|
||||||
|
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
|
||||||
|
let content_sha256 = content_sha256
|
||||||
|
.take()
|
||||||
|
.ok_or_bad_request("No signature provided")?;
|
||||||
|
Some(content_sha256)
|
||||||
|
}
|
||||||
|
_ => None,
|
||||||
|
};
|
||||||
|
|
||||||
// Parse body of uploaded file
|
// Parse body of uploaded file
|
||||||
let body = req.into_body();
|
let (head, body) = req.into_parts();
|
||||||
|
let body = body.map_err(Error::from);
|
||||||
|
|
||||||
let mut chunker = BodyChunker::new(body, garage.config.block_size);
|
let body = if let Some(signature) = payload_seed_signature {
|
||||||
|
let secret_key = &api_key
|
||||||
|
.state
|
||||||
|
.as_option()
|
||||||
|
.ok_or_internal_error("Deleted key state")?
|
||||||
|
.secret_key;
|
||||||
|
|
||||||
|
let date = head
|
||||||
|
.headers
|
||||||
|
.get("x-amz-date")
|
||||||
|
.ok_or_bad_request("Missing X-Amz-Date field")?
|
||||||
|
.to_str()?;
|
||||||
|
let date: NaiveDateTime =
|
||||||
|
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
|
||||||
|
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
||||||
|
|
||||||
|
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
|
||||||
|
let signing_hmac = crate::signature::signing_hmac(
|
||||||
|
&date,
|
||||||
|
secret_key,
|
||||||
|
&garage.config.s3_api.s3_region,
|
||||||
|
"s3",
|
||||||
|
)
|
||||||
|
.ok_or_internal_error("Unable to build signing HMAC")?;
|
||||||
|
|
||||||
|
SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
|
||||||
|
.map_err(Error::from)
|
||||||
|
.boxed()
|
||||||
|
} else {
|
||||||
|
body.boxed()
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
let first_block = chunker.next().await?.unwrap_or_default();
|
let first_block = chunker.next().await?.unwrap_or_default();
|
||||||
|
|
||||||
// If body is small enough, store it directly in the object table
|
// If body is small enough, store it directly in the object table
|
||||||
|
@ -178,13 +226,13 @@ fn ensure_checksum_matches(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_and_put_blocks(
|
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
first_block: Vec<u8>,
|
first_block: Vec<u8>,
|
||||||
first_block_hash: Hash,
|
first_block_hash: Hash,
|
||||||
chunker: &mut BodyChunker,
|
chunker: &mut StreamChunker<S>,
|
||||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
||||||
let mut md5hasher = Md5::new();
|
let mut md5hasher = Md5::new();
|
||||||
let mut sha256hasher = Sha256::new();
|
let mut sha256hasher = Sha256::new();
|
||||||
|
@ -205,8 +253,11 @@ async fn read_and_put_blocks(
|
||||||
.rpc_put_block(first_block_hash, first_block);
|
.rpc_put_block(first_block_hash, first_block);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
let (_, _, next_block) =
|
let (_, _, next_block) = futures::try_join!(
|
||||||
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
|
put_curr_block.map_err(Error::from),
|
||||||
|
put_curr_version_block.map_err(Error::from),
|
||||||
|
chunker.next(),
|
||||||
|
)?;
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
md5hasher.update(&block[..]);
|
md5hasher.update(&block[..]);
|
||||||
sha256hasher.update(&block[..]);
|
sha256hasher.update(&block[..]);
|
||||||
|
@ -266,32 +317,34 @@ async fn put_block_meta(
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
struct BodyChunker {
|
struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
|
||||||
body: Body,
|
stream: S,
|
||||||
read_all: bool,
|
read_all: bool,
|
||||||
block_size: usize,
|
block_size: usize,
|
||||||
buf: VecDeque<u8>,
|
buf: VecDeque<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BodyChunker {
|
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
|
||||||
fn new(body: Body, block_size: usize) -> Self {
|
fn new(stream: S, block_size: usize) -> Self {
|
||||||
Self {
|
Self {
|
||||||
body,
|
stream,
|
||||||
read_all: false,
|
read_all: false,
|
||||||
block_size,
|
block_size,
|
||||||
buf: VecDeque::with_capacity(2 * block_size),
|
buf: VecDeque::with_capacity(2 * block_size),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
|
|
||||||
|
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
|
||||||
while !self.read_all && self.buf.len() < self.block_size {
|
while !self.read_all && self.buf.len() < self.block_size {
|
||||||
if let Some(block) = self.body.next().await {
|
if let Some(block) = self.stream.next().await {
|
||||||
let bytes = block?;
|
let bytes = block?;
|
||||||
trace!("Body next: {} bytes", bytes.len());
|
trace!("Body next: {} bytes", bytes.len());
|
||||||
self.buf.extend(&bytes[..]);
|
self.buf.extend(bytes);
|
||||||
} else {
|
} else {
|
||||||
self.read_all = true;
|
self.read_all = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.buf.is_empty() {
|
if self.buf.is_empty() {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
} else if self.buf.len() <= self.block_size {
|
} else if self.buf.len() <= self.block_size {
|
||||||
|
@ -368,12 +421,20 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
// Read first chuck, and at the same time try to get object to see if it exists
|
// Read first chuck, and at the same time try to get object to see if it exists
|
||||||
let key = key.to_string();
|
let key = key.to_string();
|
||||||
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
|
|
||||||
|
let body = req.into_body().map_err(Error::from);
|
||||||
|
let mut chunker = StreamChunker::new(body, garage.config.block_size);
|
||||||
|
|
||||||
let (object, version, first_block) = futures::try_join!(
|
let (object, version, first_block) = futures::try_join!(
|
||||||
garage.object_table.get(&bucket_id, &key),
|
garage
|
||||||
garage.version_table.get(&version_uuid, &EmptyKey),
|
.object_table
|
||||||
chunker.next()
|
.get(&bucket_id, &key)
|
||||||
|
.map_err(Error::from),
|
||||||
|
garage
|
||||||
|
.version_table
|
||||||
|
.get(&version_uuid, &EmptyKey)
|
||||||
|
.map_err(Error::from),
|
||||||
|
chunker.next(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Check object is valid and multipart block can be accepted
|
// Check object is valid and multipart block can be accepted
|
||||||
|
@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload(
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
|
||||||
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
|
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)
|
||||||
|
|
|
@ -80,7 +80,10 @@ pub async fn handle_put_website(
|
||||||
content_sha256: Option<Hash>,
|
content_sha256: Option<Hash>,
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let body = hyper::body::to_bytes(req.into_body()).await?;
|
let body = hyper::body::to_bytes(req.into_body()).await?;
|
||||||
|
|
||||||
|
if let Some(content_sha256) = content_sha256 {
|
||||||
verify_signed_content(content_sha256, &body[..])?;
|
verify_signed_content(content_sha256, &body[..])?;
|
||||||
|
}
|
||||||
|
|
||||||
let mut bucket = garage
|
let mut bucket = garage
|
||||||
.bucket_table
|
.bucket_table
|
||||||
|
|
47
src/api/signature/mod.rs
Normal file
47
src/api/signature/mod.rs
Normal file
|
@ -0,0 +1,47 @@
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use hmac::{Hmac, Mac, NewMac};
|
||||||
|
use sha2::Sha256;
|
||||||
|
|
||||||
|
use garage_util::data::{sha256sum, Hash};
|
||||||
|
|
||||||
|
use crate::error::*;
|
||||||
|
|
||||||
|
pub mod payload;
|
||||||
|
pub mod streaming;
|
||||||
|
|
||||||
|
pub const SHORT_DATE: &str = "%Y%m%d";
|
||||||
|
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
|
||||||
|
|
||||||
|
type HmacSha256 = Hmac<Sha256>;
|
||||||
|
|
||||||
|
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
|
||||||
|
if expected_sha256 != sha256sum(body) {
|
||||||
|
return Err(Error::BadRequest(
|
||||||
|
"Request content hash does not match signed hash".to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn signing_hmac(
|
||||||
|
datetime: &DateTime<Utc>,
|
||||||
|
secret_key: &str,
|
||||||
|
region: &str,
|
||||||
|
service: &str,
|
||||||
|
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
|
||||||
|
let secret = String::from("AWS4") + secret_key;
|
||||||
|
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
|
||||||
|
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
|
||||||
|
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
|
||||||
|
region_hmac.update(region.as_bytes());
|
||||||
|
let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?;
|
||||||
|
service_hmac.update(service.as_bytes());
|
||||||
|
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
|
||||||
|
signing_hmac.update(b"aws4_request");
|
||||||
|
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
|
||||||
|
Ok(hmac)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String {
|
||||||
|
format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,)
|
||||||
|
}
|
|
@ -1,25 +1,23 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||||
use hmac::{Hmac, Mac, NewMac};
|
use hmac::Mac;
|
||||||
use hyper::{Body, Method, Request};
|
use hyper::{Body, Method, Request};
|
||||||
use sha2::{Digest, Sha256};
|
use sha2::{Digest, Sha256};
|
||||||
|
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::data::{sha256sum, Hash};
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
use garage_model::key_table::*;
|
use garage_model::key_table::*;
|
||||||
|
|
||||||
|
use super::signing_hmac;
|
||||||
|
use super::{LONG_DATETIME, SHORT_DATE};
|
||||||
|
|
||||||
use crate::encoding::uri_encode;
|
use crate::encoding::uri_encode;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
|
||||||
const SHORT_DATE: &str = "%Y%m%d";
|
pub async fn check_payload_signature(
|
||||||
const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
|
|
||||||
|
|
||||||
type HmacSha256 = Hmac<Sha256>;
|
|
||||||
|
|
||||||
pub async fn check_signature(
|
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
request: &Request<Body>,
|
request: &Request<Body>,
|
||||||
) -> Result<(Key, Option<Hash>), Error> {
|
) -> Result<(Key, Option<Hash>), Error> {
|
||||||
|
@ -97,13 +95,13 @@ pub async fn check_signature(
|
||||||
|
|
||||||
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
|
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
|
||||||
None
|
None
|
||||||
|
} else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
|
||||||
|
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
|
||||||
|
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
|
||||||
} else {
|
} else {
|
||||||
let bytes = hex::decode(authorization.content_sha256)
|
let bytes = hex::decode(authorization.content_sha256)
|
||||||
.ok_or_bad_request("Invalid content sha256 hash")?;
|
.ok_or_bad_request("Invalid content sha256 hash")?;
|
||||||
Some(
|
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?)
|
||||||
Hash::try_from(&bytes[..])
|
|
||||||
.ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?,
|
|
||||||
)
|
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((key, content_sha256))
|
Ok((key, content_sha256))
|
||||||
|
@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &
|
||||||
.join("\n")
|
.join("\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
fn signing_hmac(
|
|
||||||
datetime: &DateTime<Utc>,
|
|
||||||
secret_key: &str,
|
|
||||||
region: &str,
|
|
||||||
service: &str,
|
|
||||||
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
|
|
||||||
let secret = String::from("AWS4") + secret_key;
|
|
||||||
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
|
|
||||||
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
|
|
||||||
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
|
|
||||||
region_hmac.update(region.as_bytes());
|
|
||||||
let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?;
|
|
||||||
service_hmac.update(service.as_bytes());
|
|
||||||
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
|
|
||||||
signing_hmac.update(b"aws4_request");
|
|
||||||
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
|
|
||||||
Ok(hmac)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn canonical_request(
|
fn canonical_request(
|
||||||
method: &Method,
|
method: &Method,
|
||||||
url_path: &str,
|
url_path: &str,
|
||||||
|
@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
|
||||||
"".to_string()
|
"".to_string()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn verify_signed_content(content_sha256: Option<Hash>, body: &[u8]) -> Result<(), Error> {
|
|
||||||
let expected_sha256 =
|
|
||||||
content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?;
|
|
||||||
if expected_sha256 != sha256sum(body) {
|
|
||||||
return Err(Error::BadRequest(
|
|
||||||
"Request content hash does not match signed hash".to_string(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
319
src/api/signature/streaming.rs
Normal file
319
src/api/signature/streaming.rs
Normal file
|
@ -0,0 +1,319 @@
|
||||||
|
use std::pin::Pin;
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use futures::prelude::*;
|
||||||
|
use futures::task;
|
||||||
|
use hyper::body::Bytes;
|
||||||
|
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
use hmac::Mac;
|
||||||
|
|
||||||
|
use super::sha256sum;
|
||||||
|
use super::HmacSha256;
|
||||||
|
use super::LONG_DATETIME;
|
||||||
|
|
||||||
|
use crate::error::*;
|
||||||
|
|
||||||
|
/// Result of `sha256("")`
|
||||||
|
const EMPTY_STRING_HEX_DIGEST: &str =
|
||||||
|
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
|
||||||
|
|
||||||
|
fn compute_streaming_payload_signature(
|
||||||
|
signing_hmac: &HmacSha256,
|
||||||
|
date: DateTime<Utc>,
|
||||||
|
scope: &str,
|
||||||
|
previous_signature: Hash,
|
||||||
|
content_sha256: Hash,
|
||||||
|
) -> Result<Hash, Error> {
|
||||||
|
let string_to_sign = [
|
||||||
|
"AWS4-HMAC-SHA256-PAYLOAD",
|
||||||
|
&date.format(LONG_DATETIME).to_string(),
|
||||||
|
scope,
|
||||||
|
&hex::encode(previous_signature),
|
||||||
|
EMPTY_STRING_HEX_DIGEST,
|
||||||
|
&hex::encode(content_sha256),
|
||||||
|
]
|
||||||
|
.join("\n");
|
||||||
|
|
||||||
|
let mut hmac = signing_hmac.clone();
|
||||||
|
hmac.update(string_to_sign.as_bytes());
|
||||||
|
|
||||||
|
Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
|
||||||
|
}
|
||||||
|
|
||||||
|
mod payload {
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
|
pub enum Error<I> {
|
||||||
|
Parser(nom::error::Error<I>),
|
||||||
|
BadSignature,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> Error<I> {
|
||||||
|
pub fn description(&self) -> &str {
|
||||||
|
match *self {
|
||||||
|
Error::Parser(ref e) => e.code.description(),
|
||||||
|
Error::BadSignature => "Bad signature",
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct Header {
|
||||||
|
pub size: usize,
|
||||||
|
pub signature: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Header {
|
||||||
|
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
|
||||||
|
use nom::bytes::streaming::tag;
|
||||||
|
use nom::character::streaming::hex_digit1;
|
||||||
|
use nom::combinator::map_res;
|
||||||
|
use nom::number::streaming::hex_u32;
|
||||||
|
|
||||||
|
macro_rules! try_parse {
|
||||||
|
($expr:expr) => {
|
||||||
|
$expr.map_err(|e| e.map(Error::Parser))?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, size) = try_parse!(hex_u32(input));
|
||||||
|
let (input, _) = try_parse!(tag(";")(input));
|
||||||
|
|
||||||
|
let (input, _) = try_parse!(tag("chunk-signature=")(input));
|
||||||
|
let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
|
||||||
|
let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
|
||||||
|
|
||||||
|
let (input, _) = try_parse!(tag("\r\n")(input));
|
||||||
|
|
||||||
|
let header = Header {
|
||||||
|
size: size as usize,
|
||||||
|
signature,
|
||||||
|
};
|
||||||
|
|
||||||
|
Ok((input, header))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum SignedPayloadStreamError {
|
||||||
|
Stream(Error),
|
||||||
|
InvalidSignature,
|
||||||
|
Message(String),
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SignedPayloadStreamError {
|
||||||
|
fn message(msg: &str) -> Self {
|
||||||
|
SignedPayloadStreamError::Message(msg.into())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<SignedPayloadStreamError> for Error {
|
||||||
|
fn from(err: SignedPayloadStreamError) -> Self {
|
||||||
|
match err {
|
||||||
|
SignedPayloadStreamError::Stream(e) => e,
|
||||||
|
SignedPayloadStreamError::InvalidSignature => {
|
||||||
|
Error::BadRequest("Invalid payload signature".into())
|
||||||
|
}
|
||||||
|
SignedPayloadStreamError::Message(e) => {
|
||||||
|
Error::BadRequest(format!("Chunk format error: {}", e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> From<payload::Error<I>> for SignedPayloadStreamError {
|
||||||
|
fn from(err: payload::Error<I>) -> Self {
|
||||||
|
Self::message(err.description())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
|
||||||
|
fn from(err: nom::error::Error<I>) -> Self {
|
||||||
|
Self::message(err.code.description())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SignedPayload {
|
||||||
|
header: payload::Header,
|
||||||
|
data: Bytes,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[pin_project::pin_project]
|
||||||
|
pub struct SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>>,
|
||||||
|
{
|
||||||
|
#[pin]
|
||||||
|
stream: S,
|
||||||
|
buf: bytes::BytesMut,
|
||||||
|
datetime: DateTime<Utc>,
|
||||||
|
scope: String,
|
||||||
|
signing_hmac: HmacSha256,
|
||||||
|
previous_signature: Hash,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>>,
|
||||||
|
{
|
||||||
|
pub fn new(
|
||||||
|
stream: S,
|
||||||
|
signing_hmac: HmacSha256,
|
||||||
|
datetime: DateTime<Utc>,
|
||||||
|
scope: &str,
|
||||||
|
seed_signature: Hash,
|
||||||
|
) -> Result<Self, Error> {
|
||||||
|
Ok(Self {
|
||||||
|
stream,
|
||||||
|
buf: bytes::BytesMut::new(),
|
||||||
|
datetime,
|
||||||
|
scope: scope.into(),
|
||||||
|
signing_hmac,
|
||||||
|
previous_signature: seed_signature,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
|
||||||
|
use nom::bytes::streaming::{tag, take};
|
||||||
|
|
||||||
|
macro_rules! try_parse {
|
||||||
|
($expr:expr) => {
|
||||||
|
$expr.map_err(nom::Err::convert)?
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, header) = try_parse!(payload::Header::parse(input));
|
||||||
|
|
||||||
|
// 0-sized chunk is the last
|
||||||
|
if header.size == 0 {
|
||||||
|
return Ok((
|
||||||
|
input,
|
||||||
|
SignedPayload {
|
||||||
|
header,
|
||||||
|
data: Bytes::new(),
|
||||||
|
},
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
|
||||||
|
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
|
||||||
|
|
||||||
|
let data = Bytes::from(data.to_vec());
|
||||||
|
|
||||||
|
Ok((input, SignedPayload { header, data }))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<S> Stream for SignedPayloadStream<S>
|
||||||
|
where
|
||||||
|
S: Stream<Item = Result<Bytes, Error>> + Unpin,
|
||||||
|
{
|
||||||
|
type Item = Result<Bytes, SignedPayloadStreamError>;
|
||||||
|
|
||||||
|
fn poll_next(
|
||||||
|
self: Pin<&mut Self>,
|
||||||
|
cx: &mut task::Context<'_>,
|
||||||
|
) -> task::Poll<Option<Self::Item>> {
|
||||||
|
use std::task::Poll;
|
||||||
|
|
||||||
|
let mut this = self.project();
|
||||||
|
|
||||||
|
loop {
|
||||||
|
let (input, payload) = match Self::parse_next(this.buf) {
|
||||||
|
Ok(res) => res,
|
||||||
|
Err(nom::Err::Incomplete(_)) => {
|
||||||
|
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
|
||||||
|
Some(Ok(bytes)) => {
|
||||||
|
this.buf.extend(bytes);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Some(Err(e)) => {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
|
||||||
|
}
|
||||||
|
None => {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
|
||||||
|
"Unexpected EOF",
|
||||||
|
))));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
|
||||||
|
return Poll::Ready(Some(Err(e)))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// 0-sized chunk is the last
|
||||||
|
if payload.data.is_empty() {
|
||||||
|
return Poll::Ready(None);
|
||||||
|
}
|
||||||
|
|
||||||
|
let data_sha256sum = sha256sum(&payload.data);
|
||||||
|
|
||||||
|
let expected_signature = compute_streaming_payload_signature(
|
||||||
|
this.signing_hmac,
|
||||||
|
*this.datetime,
|
||||||
|
this.scope,
|
||||||
|
*this.previous_signature,
|
||||||
|
data_sha256sum,
|
||||||
|
)
|
||||||
|
.map_err(|e| {
|
||||||
|
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
if payload.header.signature != expected_signature {
|
||||||
|
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
|
||||||
|
}
|
||||||
|
|
||||||
|
*this.buf = input.into();
|
||||||
|
*this.previous_signature = payload.header.signature;
|
||||||
|
|
||||||
|
return Poll::Ready(Some(Ok(payload.data)));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn size_hint(&self) -> (usize, Option<usize>) {
|
||||||
|
self.stream.size_hint()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use futures::prelude::*;
|
||||||
|
|
||||||
|
use super::{SignedPayloadStream, SignedPayloadStreamError};
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn test_interrupted_signed_payload_stream() {
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
|
||||||
|
use garage_util::data::Hash;
|
||||||
|
|
||||||
|
let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0
|
||||||
|
.unwrap()
|
||||||
|
.with_timezone(&Utc);
|
||||||
|
let secret_key = "test";
|
||||||
|
let region = "test";
|
||||||
|
let scope = crate::signature::compute_scope(&datetime, region);
|
||||||
|
let signing_hmac =
|
||||||
|
crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();
|
||||||
|
|
||||||
|
let data: &[&[u8]] = &[b"1"];
|
||||||
|
let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into())));
|
||||||
|
|
||||||
|
let seed_signature = Hash::default();
|
||||||
|
|
||||||
|
let mut stream =
|
||||||
|
SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap();
|
||||||
|
|
||||||
|
assert!(stream.try_next().await.is_err());
|
||||||
|
match stream.try_next().await {
|
||||||
|
Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {}
|
||||||
|
item => panic!(
|
||||||
|
"Unexpected result, expected early EOF error, got {:?}",
|
||||||
|
item
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue