Support STREAMING-AWS4-HMAC-SHA256-PAYLOAD (#64) #156

Merged
lx merged 11 commits from KokaKiwi/garage:aws4-payload-signing into main 2022-01-17 09:55:31 +00:00
13 changed files with 578 additions and 77 deletions

39
Cargo.lock generated
View file

@ -433,7 +433,9 @@ dependencies = [
"idna",
"log",
"md-5",
"nom",
"percent-encoding",
"pin-project",
"quick-xml",
"roxmltree",
"serde",
@ -967,6 +969,12 @@ dependencies = [
"autocfg",
]
[[package]]
name = "minimal-lexical"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"
[[package]]
name = "mio"
version = "0.7.13"
@ -1011,6 +1019,17 @@ dependencies = [
"tokio-util",
]
[[package]]
name = "nom"
version = "7.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"
dependencies = [
"memchr",
"minimal-lexical",
"version_check",
]
[[package]]
name = "ntapi"
version = "0.3.6"
@ -1092,6 +1111,26 @@ version = "2.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"
[[package]]
name = "pin-project"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"
dependencies = [
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "pin-project-lite"
version = "0.2.7"

View file

@ -671,7 +671,9 @@ in
idna = rustPackages."registry+https://github.com/rust-lang/crates.io-index".idna."0.2.3" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.14" { inherit profileName; };
md5 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".md-5."0.9.1" { inherit profileName; };
nom = rustPackages."registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" { inherit profileName; };
percent_encoding = rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.1.0" { inherit profileName; };
pin_project = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" { inherit profileName; };
quick_xml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quick-xml."0.21.0" { inherit profileName; };
roxmltree = rustPackages."registry+https://github.com/rust-lang/crates.io-index".roxmltree."0.14.1" { inherit profileName; };
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.130" { inherit profileName; };
@ -1321,6 +1323,16 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" = overridableMkRustCrate (profileName: rec {
name = "minimal-lexical";
version = "0.2.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a"; };
features = builtins.concatLists [
[ "std" ]
];
});
"registry+https://github.com/rust-lang/crates.io-index".mio."0.7.13" = overridableMkRustCrate (profileName: rec {
name = "mio";
version = "0.7.13";
@ -1381,6 +1393,25 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".nom."7.1.0" = overridableMkRustCrate (profileName: rec {
name = "nom";
version = "7.1.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "1b1d11e1ef389c76fe5b81bcaf2ea32cf88b62bc494e19f493d0b30e7a930109"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
[ "std" ]
];
dependencies = {
memchr = rustPackages."registry+https://github.com/rust-lang/crates.io-index".memchr."2.4.1" { inherit profileName; };
minimal_lexical = rustPackages."registry+https://github.com/rust-lang/crates.io-index".minimal-lexical."0.2.1" { inherit profileName; };
};
buildDependencies = {
version_check = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".version_check."0.9.3" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".ntapi."0.3.6" = overridableMkRustCrate (profileName: rec {
name = "ntapi";
version = "0.3.6";
@ -1490,6 +1521,28 @@ in
src = fetchCratesIo { inherit name version; sha256 = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e"; };
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project."1.0.8" = overridableMkRustCrate (profileName: rec {
name = "pin-project";
version = "1.0.8";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "576bc800220cc65dac09e99e97b08b358cfab6e17078de8dc5fee223bd2d0c08"; };
dependencies = {
pin_project_internal = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project-internal."1.0.8" = overridableMkRustCrate (profileName: rec {
name = "pin-project-internal";
version = "1.0.8";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "6e8fe8163d14ce7f0cdac2e040116f22eac817edabff0be91e8aff7e9accf389"; };
dependencies = {
proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.30" { inherit profileName; };
quote = rustPackages."registry+https://github.com/rust-lang/crates.io-index".quote."1.0.10" { inherit profileName; };
syn = rustPackages."registry+https://github.com/rust-lang/crates.io-index".syn."1.0.80" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.7" = overridableMkRustCrate (profileName: rec {
name = "pin-project-lite";
version = "0.2.7";

View file

@ -76,7 +76,7 @@ in let
*/
''^(src|tests)'' # fixed default
''.*\.(rs|toml)$'' # fixed default
''^(crdt|replication|cli|helper)'' # our crate submodules
''^(crdt|replication|cli|helper|signature)'' # our crate submodules
];
};

View file

@ -8,7 +8,7 @@ cat > /tmp/garage.mc/config.json <<EOF
"version": "10",
"aliases": {
"garage": {
"url": "https://localhost:4443",
"url": "http://127.0.0.1:3911",
"accessKey": "$ACCESS_KEY",
"secretKey": "$SECRET_KEY",
"api": "S3v4",

View file

@ -28,10 +28,12 @@ hmac = "0.10"
idna = "0.2"
log = "0.4"
md-5 = "0.9"
nom = "7.1"
sha2 = "0.9"
futures = "0.3"
futures-util = "0.3"
pin-project = "1.0"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
http = "0.2"

View file

@ -15,7 +15,7 @@ use garage_model::garage::Garage;
use garage_model::key_table::Key;
use crate::error::*;
use crate::signature::check_signature;
use crate::signature::payload::check_payload_signature;
use crate::helpers::*;
use crate::s3_bucket::*;
@ -90,7 +90,7 @@ async fn handler(
}
async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Response<Body>, Error> {
let (api_key, content_sha256) = check_signature(&garage, &req).await?;
let (api_key, content_sha256) = check_payload_signature(&garage, &req).await?;
let authority = req
.headers()
@ -176,7 +176,7 @@ async fn handler_inner(garage: Arc<Garage>, req: Request<Body>) -> Result<Respon
.await
}
Endpoint::PutObject { key, .. } => {
handle_put(garage, req, bucket_id, &key, content_sha256).await
handle_put(garage, req, bucket_id, &key, &api_key, content_sha256).await
}
Endpoint::AbortMultipartUpload { key, upload_id, .. } => {
handle_abort_multipart_upload(garage, bucket_id, &key, &upload_id).await

View file

@ -120,7 +120,10 @@ pub async fn handle_create_bucket(
bucket_name: String,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
verify_signed_content(content_sha256, &body[..])?;
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let cmd =
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;

View file

@ -81,7 +81,10 @@ pub async fn handle_delete_objects(
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
verify_signed_content(content_sha256, &body[..])?;
KokaKiwi marked this conversation as resolved Outdated
Outdated
Review

Is it mandatory to provide a signed content-sha256 header? I thought rather it was the opposite because it seems to cause issues on other endpoints (e.g. #164)

Is it mandatory to provide a signed content-sha256 header? I thought rather it was the opposite because it seems to cause issues on other endpoints (e.g. #164)
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;

View file

@ -1,8 +1,10 @@
use std::collections::{BTreeMap, VecDeque};
use std::sync::Arc;
use futures::stream::*;
use hyper::{Body, Request, Response};
use chrono::{DateTime, NaiveDateTime, Utc};
use futures::{prelude::*, TryFutureExt};
use hyper::body::{Body, Bytes};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256;
@ -14,19 +16,23 @@ use garage_util::time::*;
use garage_model::block::INLINE_THRESHOLD;
use garage_model::block_ref_table::*;
use garage_model::garage::Garage;
use garage_model::key_table::Key;
use garage_model::object_table::*;
use garage_model::version_table::*;
use crate::error::*;
use crate::s3_xml;
use crate::signature::verify_signed_content;
use crate::signature::streaming::SignedPayloadStream;
use crate::signature::LONG_DATETIME;
use crate::signature::{compute_scope, verify_signed_content};
pub async fn handle_put(
garage: Arc<Garage>,
req: Request<Body>,
bucket_id: Uuid,
key: &str,
content_sha256: Option<Hash>,
api_key: &Key,
mut content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
// Generate identity of new version
let version_uuid = gen_uuid();
@ -40,11 +46,53 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
};
let payload_seed_signature = match req.headers().get("x-amz-content-sha256") {
Some(header) if header == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" => {
let content_sha256 = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
Some(content_sha256)
}
_ => None,
};
// Parse body of uploaded file
let body = req.into_body();
let (head, body) = req.into_parts();
let body = body.map_err(Error::from);
let mut chunker = BodyChunker::new(body, garage.config.block_size);
let body = if let Some(signature) = payload_seed_signature {
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
let date = head
.headers
.get("x-amz-date")
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
let scope = compute_scope(&date, &garage.config.s3_api.s3_region);
let signing_hmac = crate::signature::signing_hmac(
&date,
secret_key,
&garage.config.s3_api.s3_region,
"s3",
)
.ok_or_internal_error("Unable to build signing HMAC")?;
SignedPayloadStream::new(body, signing_hmac, date, &scope, signature)?
.map_err(Error::from)
.boxed()
} else {
body.boxed()
};
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let first_block = chunker.next().await?.unwrap_or_default();
// If body is small enough, store it directly in the object table
@ -178,13 +226,13 @@ fn ensure_checksum_matches(
Ok(())
}
async fn read_and_put_blocks(
async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage: &Garage,
version: &Version,
part_number: u64,
first_block: Vec<u8>,
first_block_hash: Hash,
chunker: &mut BodyChunker,
chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let mut md5hasher = Md5::new();
let mut sha256hasher = Sha256::new();
@ -205,8 +253,11 @@ async fn read_and_put_blocks(
.rpc_put_block(first_block_hash, first_block);
loop {
let (_, _, next_block) =
futures::try_join!(put_curr_block, put_curr_version_block, chunker.next())?;
let (_, _, next_block) = futures::try_join!(
put_curr_block.map_err(Error::from),
put_curr_version_block.map_err(Error::from),
chunker.next(),
)?;
if let Some(block) = next_block {
md5hasher.update(&block[..]);
sha256hasher.update(&block[..]);
@ -266,32 +317,34 @@ async fn put_block_meta(
Ok(())
}
struct BodyChunker {
body: Body,
struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
stream: S,
read_all: bool,
block_size: usize,
buf: VecDeque<u8>,
}
impl BodyChunker {
fn new(body: Body, block_size: usize) -> Self {
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
fn new(stream: S, block_size: usize) -> Self {
Self {
body,
stream,
read_all: false,
block_size,
buf: VecDeque::with_capacity(2 * block_size),
}
}
async fn next(&mut self) -> Result<Option<Vec<u8>>, GarageError> {
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
while !self.read_all && self.buf.len() < self.block_size {
if let Some(block) = self.body.next().await {
if let Some(block) = self.stream.next().await {
let bytes = block?;
lx marked this conversation as resolved Outdated
Outdated
Review

If signature was a Hash we would benefit from the custom impl Debug for Hash and we could just derive Debug for Header and not implement it manually

If signature was a `Hash` we would benefit from the custom `impl Debug for Hash` and we could just derive Debug for Header and not implement it manually
trace!("Body next: {} bytes", bytes.len());
KokaKiwi marked this conversation as resolved Outdated
Outdated
Review

If signature was of type Hash it would already have its own debug that shows (part of) the bytes in hex, so we could derive Debug instead of having a custom impl.

If `signature` was of type `Hash` it would already have its own debug that shows (part of) the bytes in hex, so we could derive Debug instead of having a custom impl.
self.buf.extend(&bytes[..]);
self.buf.extend(bytes);
} else {
self.read_all = true;
}
}
if self.buf.is_empty() {
Ok(None)
} else if self.buf.len() <= self.block_size {
@ -368,12 +421,20 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
let mut chunker = BodyChunker::new(req.into_body(), garage.config.block_size);
let body = req.into_body().map_err(Error::from);
let mut chunker = StreamChunker::new(body, garage.config.block_size);
let (object, version, first_block) = futures::try_join!(
garage.object_table.get(&bucket_id, &key),
garage.version_table.get(&version_uuid, &EmptyKey),
chunker.next()
garage
.object_table
.get(&bucket_id, &key)
.map_err(Error::from),
garage
.version_table
.get(&version_uuid, &EmptyKey)
.map_err(Error::from),
chunker.next(),
)?;
// Check object is valid and multipart block can be accepted
@ -444,7 +505,10 @@ pub async fn handle_complete_multipart_upload(
content_sha256: Option<Hash>,
KokaKiwi marked this conversation as resolved Outdated
Outdated
Review

Is there a good reason to make this generic on error type E ? Most things in Garage aren't generic and just return an error::Error and that's simpler

Is there a good reason to make this generic on error type E ? Most things in Garage aren't generic and just return an `error::Error` and that's simpler
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
lx marked this conversation as resolved Outdated
Outdated
Review

In the current codebase, nothing is generic on the error type, we always use garage_api::error::Error. Is there a reason why StreamChunker needs a generic error type?

In the current codebase, nothing is generic on the error type, we always use `garage_api::error::Error`. Is there a reason why StreamChunker needs a generic error type?
verify_signed_content(content_sha256, &body[..])?;
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let body_list_of_parts = parse_complete_multpart_upload_body(&body_xml)

View file

@ -44,7 +44,10 @@ pub async fn handle_put_website(
content_sha256: Option<Hash>,
) -> Result<Response<Body>, Error> {
let body = hyper::body::to_bytes(req.into_body()).await?;
verify_signed_content(content_sha256, &body[..])?;
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let mut bucket = garage
.bucket_table

47
src/api/signature/mod.rs Normal file
View file

@ -0,0 +1,47 @@
use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac, NewMac};
use sha2::Sha256;
use garage_util::data::{sha256sum, Hash};
use crate::error::*;
pub mod payload;
pub mod streaming;
pub const SHORT_DATE: &str = "%Y%m%d";
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
type HmacSha256 = Hmac<Sha256>;
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
return Err(Error::BadRequest(
"Request content hash does not match signed hash".to_string(),
));
}
Ok(())
}
pub fn signing_hmac(
datetime: &DateTime<Utc>,
secret_key: &str,
region: &str,
service: &str,
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
let secret = String::from("AWS4") + secret_key;
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
region_hmac.update(region.as_bytes());
let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
service_hmac.update(service.as_bytes());
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
signing_hmac.update(b"aws4_request");
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
Ok(hmac)
}
pub fn compute_scope(datetime: &DateTime<Utc>, region: &str) -> String {
format!("{}/{}/s3/aws4_request", datetime.format(SHORT_DATE), region,)
}

View file

@ -1,25 +1,23 @@
use std::collections::HashMap;
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
use hmac::{Hmac, Mac, NewMac};
use hmac::Mac;
use hyper::{Body, Method, Request};
use sha2::{Digest, Sha256};
use garage_table::*;
use garage_util::data::{sha256sum, Hash};
use garage_util::data::Hash;
use garage_model::garage::Garage;
use garage_model::key_table::*;
use super::signing_hmac;
use super::{LONG_DATETIME, SHORT_DATE};
use crate::encoding::uri_encode;
use crate::error::*;
const SHORT_DATE: &str = "%Y%m%d";
const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
type HmacSha256 = Hmac<Sha256>;
pub async fn check_signature(
pub async fn check_payload_signature(
garage: &Garage,
request: &Request<Body>,
) -> Result<(Key, Option<Hash>), Error> {
@ -97,13 +95,13 @@ pub async fn check_signature(
let content_sha256 = if authorization.content_sha256 == "UNSIGNED-PAYLOAD" {
None
} else if authorization.content_sha256 == "STREAMING-AWS4-HMAC-SHA256-PAYLOAD" {
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
} else {
let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?;
Some(
Hash::try_from(&bytes[..])
.ok_or_else(|| Error::BadRequest("Invalid content sha256 hash".to_string()))?,
)
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?)
};
Ok((key, content_sha256))
@ -222,25 +220,6 @@ fn string_to_sign(datetime: &DateTime<Utc>, scope_string: &str, canonical_req: &
.join("\n")
}
fn signing_hmac(
datetime: &DateTime<Utc>,
secret_key: &str,
region: &str,
service: &str,
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
let secret = String::from("AWS4") + secret_key;
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
region_hmac.update(region.as_bytes());
let mut service_hmac = HmacSha256::new_varkey(&region_hmac.finalize().into_bytes())?;
service_hmac.update(service.as_bytes());
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
signing_hmac.update(b"aws4_request");
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
Ok(hmac)
}
fn canonical_request(
method: &Method,
url_path: &str,
@ -288,14 +267,3 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
"".to_string()
}
}
pub fn verify_signed_content(content_sha256: Option<Hash>, body: &[u8]) -> Result<(), Error> {
let expected_sha256 =
content_sha256.ok_or_bad_request("Request content hash not signed, aborting.")?;
if expected_sha256 != sha256sum(body) {
return Err(Error::BadRequest(
"Request content hash does not match signed hash".to_string(),
));
}
Ok(())
}

View file

@ -0,0 +1,319 @@
use std::pin::Pin;
use chrono::{DateTime, Utc};
use futures::prelude::*;
use futures::task;
use hyper::body::Bytes;
use garage_util::data::Hash;
use hmac::Mac;
use super::sha256sum;
use super::HmacSha256;
use super::LONG_DATETIME;
use crate::error::*;
KokaKiwi marked this conversation as resolved Outdated
Outdated
Review

This doesn't do a check, it returns the expected signature, so it shouldn't be called check_*

This doesn't do a check, it returns the expected signature, so it shouldn't be called `check_*`
/// Result of `sha256("")`
const EMPTY_STRING_HEX_DIGEST: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
fn compute_streaming_payload_signature(
signing_hmac: &HmacSha256,
date: DateTime<Utc>,
scope: &str,
previous_signature: Hash,
content_sha256: Hash,
) -> Result<Hash, Error> {
let string_to_sign = [
"AWS4-HMAC-SHA256-PAYLOAD",
&date.format(LONG_DATETIME).to_string(),
scope,
&hex::encode(previous_signature),
EMPTY_STRING_HEX_DIGEST,
&hex::encode(content_sha256),
]
.join("\n");
let mut hmac = signing_hmac.clone();
hmac.update(string_to_sign.as_bytes());
KokaKiwi marked this conversation as resolved Outdated

instead of recalculating the signing key from date, secret_key and region each time, couldn't check_streaming_payload_signature take the precomputed signing key? It would cut down on parameters/fieds here and in SignedPayloadChunker, and be (negligeably) faster due to less hmac computation

instead of recalculating the signing key from date, secret_key and region each time, couldn't `check_streaming_payload_signature` take the precomputed signing key? It would cut down on parameters/fieds here and in `SignedPayloadChunker`, and be (negligeably) faster due to less hmac computation
Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")
}
KokaKiwi marked this conversation as resolved Outdated
Outdated
Review

If hash::try_from fails here, I think it's an internal error and not a bad request (it can only happen if the hmac doesn't have the good number of bytes, which should never happen)

If `hash::try_from` fails here, I think it's an internal error and not a bad request (it can only happen if the hmac doesn't have the good number of bytes, which should never happen)
mod payload {
use garage_util::data::Hash;
pub enum Error<I> {
Parser(nom::error::Error<I>),
BadSignature,
}
impl<I> Error<I> {
pub fn description(&self) -> &str {
match *self {
Error::Parser(ref e) => e.code.description(),
Error::BadSignature => "Bad signature",
}
}
}
#[derive(Debug, Clone)]
pub struct Header {
pub size: usize,
pub signature: Hash,
}
impl Header {
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
use nom::bytes::streaming::tag;
use nom::character::streaming::hex_digit1;
use nom::combinator::map_res;
use nom::number::streaming::hex_u32;
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(|e| e.map(Error::Parser))?
};
}
let (input, size) = try_parse!(hex_u32(input));
let (input, _) = try_parse!(tag(";")(input));
let (input, _) = try_parse!(tag("chunk-signature=")(input));
let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
let (input, _) = try_parse!(tag("\r\n")(input));
let header = Header {
size: size as usize,
signature,
};
Ok((input, header))
}
}
}
#[derive(Debug)]
pub enum SignedPayloadStreamError {
Stream(Error),
InvalidSignature,
Message(String),
}
impl SignedPayloadStreamError {
fn message(msg: &str) -> Self {
SignedPayloadStreamError::Message(msg.into())
}
}
impl From<SignedPayloadStreamError> for Error {
fn from(err: SignedPayloadStreamError) -> Self {
match err {
SignedPayloadStreamError::Stream(e) => e,
SignedPayloadStreamError::InvalidSignature => {
Error::BadRequest("Invalid payload signature".into())
}
SignedPayloadStreamError::Message(e) => {
Error::BadRequest(format!("Chunk format error: {}", e))
}
}
}
}
impl<I> From<payload::Error<I>> for SignedPayloadStreamError {
fn from(err: payload::Error<I>) -> Self {
Self::message(err.description())
}
}
impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
fn from(err: nom::error::Error<I>) -> Self {
Self::message(err.code.description())
}
}
struct SignedPayload {
header: payload::Header,
data: Bytes,
}
#[pin_project::pin_project]
pub struct SignedPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>>,
{
#[pin]
stream: S,
buf: bytes::BytesMut,
datetime: DateTime<Utc>,
scope: String,
signing_hmac: HmacSha256,
previous_signature: Hash,
}
impl<S> SignedPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>>,
{
pub fn new(
stream: S,
signing_hmac: HmacSha256,
datetime: DateTime<Utc>,
scope: &str,
seed_signature: Hash,
) -> Result<Self, Error> {
Ok(Self {
stream,
buf: bytes::BytesMut::new(),
datetime,
scope: scope.into(),
signing_hmac,
previous_signature: seed_signature,
})
}
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
use nom::bytes::streaming::{tag, take};
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(nom::Err::convert)?
};
}
let (input, header) = try_parse!(payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
return Ok((
input,
SignedPayload {
header,
data: Bytes::new(),
},
));
}
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
let (input, _) = try_parse!(tag::<_, _, nom::error::Error<_>>("\r\n")(input));
let data = Bytes::from(data.to_vec());
Ok((input, SignedPayload { header, data }))
}
}
impl<S> Stream for SignedPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
type Item = Result<Bytes, SignedPayloadStreamError>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
use std::task::Poll;
KokaKiwi marked this conversation as resolved Outdated
Outdated
Review

What if the payload input stream is interrupted in the middle? Do we take care of exiting the infinite loop and return an error?

What if the payload input stream is interrupted in the middle? Do we take care of exiting the infinite loop and return an error?
let mut this = self.project();
loop {
let (input, payload) = match Self::parse_next(this.buf) {
Ok(res) => res,
Err(nom::Err::Incomplete(_)) => {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
continue;
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
}
None => {
return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
"Unexpected EOF",
))));
KokaKiwi marked this conversation as resolved Outdated

this bit has an invalid edge case : by cutting the stream just before a new chunk header, an attacker can truncate the file without it being rejected. Getting here (inner stream returns None and this.buf is empy) is either such a truncation, or a call to SignedPayloadStream::poll_next after it returned Ok(Ready(None)) once, which is a contract error (Ok(Ready(None)) means that the stream has terminated, and poll_next should not be invoked again), so this check and return can be safely removed

this bit has an invalid edge case : by cutting the stream just before a new chunk header, an attacker can truncate the file without it being rejected. Getting here (inner stream returns None and this.buf is empy) is either such a truncation, or a call to SignedPayloadStream::poll_next after it returned Ok(Ready(None)) once, which is a contract error ([`Ok(Ready(None)) means that the stream has terminated, and poll_next should not be invoked again`](https://docs.rs/futures/0.2.0/futures/stream/trait.Stream.html#return-value)), so this check and return can be safely removed
}
}
}
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
return Poll::Ready(Some(Err(e)))
}
};
// 0-sized chunk is the last
if payload.data.is_empty() {
return Poll::Ready(None);
}
let data_sha256sum = sha256sum(&payload.data);
let expected_signature = compute_streaming_payload_signature(
this.signing_hmac,
*this.datetime,
this.scope,
*this.previous_signature,
data_sha256sum,
)
.map_err(|e| {
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
})?;
if payload.header.signature != expected_signature {
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
}
*this.buf = input.into();
*this.previous_signature = payload.header.signature;
return Poll::Ready(Some(Ok(payload.data)));
}
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.stream.size_hint()
}
}
#[cfg(test)]
mod tests {
use futures::prelude::*;
use super::{SignedPayloadStream, SignedPayloadStreamError};
#[tokio::test]
async fn test_interrupted_signed_payload_stream() {
use chrono::{DateTime, Utc};
use garage_util::data::Hash;
let datetime = DateTime::parse_from_rfc3339("2021-12-13T13:12:42+01:00") // TODO UNIX 0
.unwrap()
.with_timezone(&Utc);
let secret_key = "test";
let region = "test";
let scope = crate::signature::compute_scope(&datetime, region);
let signing_hmac =
crate::signature::signing_hmac(&datetime, secret_key, region, "s3").unwrap();
let data: &[&[u8]] = &[b"1"];
let body = futures::stream::iter(data.iter().map(|block| Ok(block.as_ref().into())));
let seed_signature = Hash::default();
let mut stream =
SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature).unwrap();
assert!(stream.try_next().await.is_err());
match stream.try_next().await {
Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {}
item => panic!(
"Unexpected result, expected early EOF error, got {:?}",
item
),
}
}
}