Compare commits

..

1 commit

Author SHA1 Message Date
0e7ef87496 wip: add boto3 test for STREAMING-UNSIGNED-PAYLOAD-TRAILER
All checks were successful
ci/woodpecker/push/debug Pipeline was successful
ci/woodpecker/pr/debug Pipeline was successful
The version of boto3 packaged in nixpkgs does not enable this test yet.
2025-02-18 12:17:18 +01:00
40 changed files with 740 additions and 1710 deletions

103
Cargo.lock generated
View file

@ -238,9 +238,9 @@ checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26"
[[package]]
name = "aws-credential-types"
version = "1.2.1"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60e8f6b615cb5fc60a98132268508ad104310f0cfb25a1c22eee76efdf9154da"
checksum = "33cc49dcdd31c8b6e79850a179af4c367669150c7ac0135f176c61bec81a70f7"
dependencies = [
"aws-smithy-async",
"aws-smithy-runtime-api",
@ -250,16 +250,15 @@ dependencies = [
[[package]]
name = "aws-runtime"
version = "1.5.5"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76dd04d39cc12844c0994f2c9c5a6f5184c22e9188ec1ff723de41910a21dcad"
checksum = "eb031bff99877c26c28895766f7bb8484a05e24547e370768d6cc9db514662aa"
dependencies = [
"aws-credential-types",
"aws-sigv4",
"aws-smithy-async",
"aws-smithy-eventstream",
"aws-smithy-http",
"aws-smithy-runtime",
"aws-smithy-runtime-api",
"aws-smithy-types",
"aws-types",
@ -267,7 +266,6 @@ dependencies = [
"fastrand",
"http 0.2.12",
"http-body 0.4.6",
"once_cell",
"percent-encoding",
"pin-project-lite",
"tracing",
@ -276,9 +274,9 @@ dependencies = [
[[package]]
name = "aws-sdk-config"
version = "1.62.0"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0f94d79b8eef608af51b5415d13f5c670dec177880c6f78cd27bea968e5c9b76"
checksum = "4af4f5b0f64563ada272e009cc95027effb546110ed85d014611420ac0d97858"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -298,9 +296,9 @@ dependencies = [
[[package]]
name = "aws-sdk-s3"
version = "1.68.0"
version = "1.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bc5ddf1dc70287dc9a2f953766a1fe15e3e74aef02fd1335f2afa475c9b4f4fc"
checksum = "951f7730f51a2155c711c85c79f337fbc02a577fa99d2a0a8059acfce5392113"
dependencies = [
"aws-credential-types",
"aws-runtime",
@ -316,25 +314,20 @@ dependencies = [
"aws-smithy-xml",
"aws-types",
"bytes",
"fastrand",
"hex",
"hmac",
"http 0.2.12",
"http-body 0.4.6",
"lru",
"once_cell",
"percent-encoding",
"regex-lite",
"sha2",
"tracing",
"url",
]
[[package]]
name = "aws-sigv4"
version = "1.2.9"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bfe75fad52793ce6dec0dc3d4b1f388f038b5eb866c8d4d7f3a8e21b5ea5051"
checksum = "c371c6b0ac54d4605eb6f016624fb5c7c2925d315fdf600ac1bf21b19d5f1742"
dependencies = [
"aws-credential-types",
"aws-smithy-eventstream",
@ -361,9 +354,9 @@ dependencies = [
[[package]]
name = "aws-smithy-async"
version = "1.2.4"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fa59d1327d8b5053c54bf2eaae63bf629ba9e904434d0835a28ed3c0ed0a614e"
checksum = "72ee2d09cce0ef3ae526679b522835d63e75fb427aca5413cd371e490d52dcc6"
dependencies = [
"futures-util",
"pin-project-lite",
@ -372,9 +365,9 @@ dependencies = [
[[package]]
name = "aws-smithy-checksums"
version = "0.60.13"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba1a71073fca26775c8b5189175ea8863afb1c9ea2cceb02a5de5ad9dfbaa795"
checksum = "be2acd1b9c6ae5859999250ed5a62423aedc5cf69045b844432de15fa2f31f2b"
dependencies = [
"aws-smithy-http",
"aws-smithy-types",
@ -393,9 +386,9 @@ dependencies = [
[[package]]
name = "aws-smithy-eventstream"
version = "0.60.6"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b18559a41e0c909b77625adf2b8c50de480a8041e5e4a3f5f7d177db70abc5a"
checksum = "e6363078f927f612b970edf9d1903ef5cef9a64d1e8423525ebb1f0a1633c858"
dependencies = [
"aws-smithy-types",
"bytes",
@ -404,9 +397,9 @@ dependencies = [
[[package]]
name = "aws-smithy-http"
version = "0.60.12"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7809c27ad8da6a6a68c454e651d4962479e81472aa19ae99e59f9aba1f9713cc"
checksum = "dab56aea3cd9e1101a0a999447fb346afb680ab1406cebc44b32346e25b4117d"
dependencies = [
"aws-smithy-eventstream",
"aws-smithy-runtime-api",
@ -425,18 +418,18 @@ dependencies = [
[[package]]
name = "aws-smithy-json"
version = "0.61.2"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "623a51127f24c30776c8b374295f2df78d92517386f77ba30773f15a30ce1422"
checksum = "fd3898ca6518f9215f62678870064398f00031912390efd03f1f6ef56d83aa8e"
dependencies = [
"aws-smithy-types",
]
[[package]]
name = "aws-smithy-runtime"
version = "1.7.8"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d526a12d9ed61fadefda24abe2e682892ba288c2018bcb38b1b4c111d13f6d92"
checksum = "fafdab38f40ad7816e7da5dec279400dd505160780083759f01441af1bbb10ea"
dependencies = [
"aws-smithy-async",
"aws-smithy-http",
@ -447,8 +440,6 @@ dependencies = [
"h2 0.3.24",
"http 0.2.12",
"http-body 0.4.6",
"http-body 1.0.1",
"httparse",
"hyper 0.14.32",
"hyper-rustls 0.24.2",
"once_cell",
@ -461,36 +452,31 @@ dependencies = [
[[package]]
name = "aws-smithy-runtime-api"
version = "1.7.3"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "92165296a47a812b267b4f41032ff8069ab7ff783696d217f0994a0d7ab585cd"
checksum = "c18276dd28852f34b3bf501f4f3719781f4999a51c7bff1a5c6dc8c4529adc29"
dependencies = [
"aws-smithy-async",
"aws-smithy-types",
"bytes",
"http 0.2.12",
"http 1.2.0",
"pin-project-lite",
"tokio",
"tracing",
"zeroize",
]
[[package]]
name = "aws-smithy-types"
version = "1.2.13"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c7b8a53819e42f10d0821f56da995e1470b199686a1809168db6ca485665f042"
checksum = "bb3e134004170d3303718baa2a4eb4ca64ee0a1c0a7041dca31b38be0fb414f3"
dependencies = [
"base64-simd",
"bytes",
"bytes-utils",
"futures-core",
"http 0.2.12",
"http 1.2.0",
"http-body 0.4.6",
"http-body 1.0.1",
"http-body-util",
"itoa",
"num-integer",
"pin-project-lite",
@ -504,23 +490,24 @@ dependencies = [
[[package]]
name = "aws-smithy-xml"
version = "0.60.9"
version = "0.60.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab0b0166827aa700d3dc519f72f8b3a91c35d0b8d042dc5d643a91e6f80648fc"
checksum = "8604a11b25e9ecaf32f9aa56b9fe253c5e2f606a3477f0071e96d3155a5ed218"
dependencies = [
"xmlparser",
]
[[package]]
name = "aws-types"
version = "1.3.5"
version = "1.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dfbd0a668309ec1f66c0f6bda4840dd6d4796ae26d699ebc266d7cc95c6d040f"
checksum = "789bbe008e65636fe1b6dbbb374c40c8960d1232b96af5ff4aec349f9c4accf4"
dependencies = [
"aws-credential-types",
"aws-smithy-async",
"aws-smithy-runtime-api",
"aws-smithy-types",
"http 0.2.12",
"rustc_version",
"tracing",
]
@ -1129,12 +1116,6 @@ version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "foldhash"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0d2fde1f7b3d48b8395d5f2de76c18a528bd6a9cdde438df747bfcba3e05d6f"
[[package]]
name = "form_urlencoded"
version = "1.2.1"
@ -1249,7 +1230,6 @@ dependencies = [
"bytes",
"bytesize",
"chrono",
"crc32fast",
"format_table",
"futures",
"garage_api_admin",
@ -1321,11 +1301,8 @@ dependencies = [
name = "garage_api_common"
version = "1.0.1"
dependencies = [
"base64 0.21.7",
"bytes",
"chrono",
"crc32c",
"crc32fast",
"crypto-common",
"err-derive",
"futures",
@ -1339,13 +1316,11 @@ dependencies = [
"hyper 1.6.0",
"hyper-util",
"idna 0.5.0",
"md-5",
"nom",
"opentelemetry",
"pin-project",
"serde",
"serde_json",
"sha1",
"sha2",
"tokio",
"tracing",
@ -1763,11 +1738,6 @@ name = "hashbrown"
version = "0.15.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bf151400ff0baff5465007dd2f3e717f3fe502074ca563069ce3a6629d07b289"
dependencies = [
"allocator-api2",
"equivalent",
"foldhash",
]
[[package]]
name = "hashlink"
@ -2624,15 +2594,6 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "lru"
version = "0.12.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234cf4f4a04dc1f57e24b96cc0cd600cf2af460d4161ac5ecdd0af8e1f3b2a38"
dependencies = [
"hashbrown 0.15.2",
]
[[package]]
name = "matchers"
version = "0.1.0"
@ -4948,7 +4909,7 @@ version = "0.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb"
dependencies = [
"windows-sys 0.52.0",
"windows-sys 0.48.0",
]
[[package]]

View file

@ -141,8 +141,8 @@ thiserror = "1.0"
assert-json-diff = "2.0"
rustc_version = "0.4.0"
static_init = "1.0"
aws-sdk-config = "1.62"
aws-sdk-s3 = "=1.68"
aws-sdk-config = "1.13"
aws-sdk-s3 = "1.14"
[profile.dev]
#lto = "thin" # disabled for now, adds 2-4 min to each CI build

View file

@ -11,7 +11,7 @@ PATH="${GARAGE_DEBUG}:${GARAGE_RELEASE}:${NIX_RELEASE}:$PATH"
FANCYCOLORS=("41m" "42m" "44m" "45m" "100m" "104m")
export RUST_BACKTRACE=1
export RUST_LOG=garage=info,garage_api_common=debug,garage_api_s3=debug
export RUST_LOG=garage=info,garage_api=debug
MAIN_LABEL="\e[${FANCYCOLORS[0]}[main]\e[49m"
if [ -z "$GARAGE_BIN" ]; then

View file

@ -112,6 +112,19 @@ if [ -z "$SKIP_S3CMD" ]; then
done
fi
# BOTO3
if [ -z "$SKIP_BOTO3" ]; then
echo "🛠️ Testing with boto3 for STREAMING-UNSIGNED-PAYLOAD-TRAILER"
source ${SCRIPT_FOLDER}/dev-env-aws.sh
AWS_ENDPOINT_URL=https://localhost:4443 python <<EOF
import boto3
client = boto3.client('s3', verify=False)
client.put_object(Body=b'hello world', Bucket='eprouvette', Key='test.s3.txt')
client.delete_object(Bucket='eprouvette', Key='test.s3.txt')
print("OK!")
EOF
fi
# Minio Client
if [ -z "$SKIP_MC" ]; then
echo "🛠️ Testing with mc (minio client)"

View file

@ -26,6 +26,8 @@ in
s3cmd
minio-client
rclone
(python312.withPackages (ps: [ ps.boto3 ]))
socat
psmisc
which

View file

@ -18,21 +18,16 @@ garage_model.workspace = true
garage_table.workspace = true
garage_util.workspace = true
base64.workspace = true
bytes.workspace = true
chrono.workspace = true
crc32fast.workspace = true
crc32c.workspace = true
crypto-common.workspace = true
err-derive.workspace = true
hex.workspace = true
hmac.workspace = true
md-5.workspace = true
idna.workspace = true
tracing.workspace = true
nom.workspace = true
pin-project.workspace = true
sha1.workspace = true
sha2.workspace = true
futures.workspace = true

View file

@ -14,9 +14,9 @@ use crate::common_error::{
};
use crate::helpers::*;
pub fn find_matching_cors_rule<'a, B>(
pub fn find_matching_cors_rule<'a>(
bucket_params: &'a BucketParams,
req: &Request<B>,
req: &Request<impl Body>,
) -> Result<Option<&'a GarageCorsRule>, CommonError> {
if let Some(cors_config) = bucket_params.cors_config.get() {
if let Some(origin) = req.headers().get("Origin") {
@ -132,8 +132,8 @@ pub async fn handle_options_api(
}
}
pub fn handle_options_for_bucket<B>(
req: &Request<B>,
pub fn handle_options_for_bucket(
req: &Request<IncomingBody>,
bucket_params: &BucketParams,
) -> Result<Response<EmptyBody>, CommonError> {
let origin = req

View file

@ -1,135 +0,0 @@
use std::sync::Mutex;
use futures::prelude::*;
use futures::stream::BoxStream;
use http_body_util::{BodyExt, StreamBody};
use hyper::body::{Bytes, Frame};
use serde::Deserialize;
use tokio::sync::mpsc;
use tokio::task;
use super::*;
use crate::signature::checksum::*;
pub struct ReqBody {
// why need mutex to be sync??
pub(crate) stream: Mutex<BoxStream<'static, Result<Frame<Bytes>, Error>>>,
pub(crate) checksummer: Checksummer,
pub(crate) expected_checksums: ExpectedChecksums,
pub(crate) trailer_algorithm: Option<ChecksumAlgorithm>,
}
pub type StreamingChecksumReceiver = task::JoinHandle<Result<Checksums, Error>>;
impl ReqBody {
pub fn add_expected_checksums(&mut self, more: ExpectedChecksums) {
if more.md5.is_some() {
self.expected_checksums.md5 = more.md5;
}
if more.sha256.is_some() {
self.expected_checksums.sha256 = more.sha256;
}
if more.extra.is_some() {
self.expected_checksums.extra = more.extra;
}
self.checksummer.add_expected(&self.expected_checksums);
}
pub fn add_md5(&mut self) {
self.checksummer.add_md5();
}
// ============ non-streaming =============
pub async fn json<T: for<'a> Deserialize<'a>>(self) -> Result<T, Error> {
let body = self.collect().await?;
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp)
}
pub async fn collect(self) -> Result<Bytes, Error> {
self.collect_with_checksums().await.map(|(b, _)| b)
}
pub async fn collect_with_checksums(mut self) -> Result<(Bytes, Checksums), Error> {
let stream: BoxStream<_> = self.stream.into_inner().unwrap();
let bytes = BodyExt::collect(StreamBody::new(stream)).await?.to_bytes();
self.checksummer.update(&bytes);
let checksums = self.checksummer.finalize();
checksums.verify(&self.expected_checksums)?;
Ok((bytes, checksums))
}
// ============ streaming =============
pub fn streaming_with_checksums(
self,
) -> (
BoxStream<'static, Result<Bytes, Error>>,
StreamingChecksumReceiver,
) {
let Self {
stream,
mut checksummer,
mut expected_checksums,
trailer_algorithm,
} = self;
let (frame_tx, mut frame_rx) = mpsc::channel::<Frame<Bytes>>(5);
let join_checksums = tokio::spawn(async move {
while let Some(frame) = frame_rx.recv().await {
match frame.into_data() {
Ok(data) => {
checksummer = tokio::task::spawn_blocking(move || {
checksummer.update(&data);
checksummer
})
.await
.unwrap()
}
Err(frame) => {
let trailers = frame.into_trailers().unwrap();
let algo = trailer_algorithm.unwrap();
expected_checksums.extra = Some(extract_checksum_value(&trailers, algo)?);
break;
}
}
}
if trailer_algorithm.is_some() && expected_checksums.extra.is_none() {
return Err(Error::bad_request("trailing checksum was not sent"));
}
let checksums = checksummer.finalize();
checksums.verify(&expected_checksums)?;
Ok(checksums)
});
let stream: BoxStream<_> = stream.into_inner().unwrap();
let stream = stream.filter_map(move |x| {
let frame_tx = frame_tx.clone();
async move {
match x {
Err(e) => Some(Err(e)),
Ok(frame) => {
if frame.is_data() {
let data = frame.data_ref().unwrap().clone();
let _ = frame_tx.send(frame).await;
Some(Ok(data))
} else {
let _ = frame_tx.send(frame).await;
None
}
}
}
}
});
(stream.boxed(), join_checksums)
}
}

View file

@ -18,10 +18,6 @@ pub enum Error {
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)]
InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
/// The provided digest (checksum) value was invalid
#[error(display = "Invalid digest: {}", _0)]
InvalidDigest(String),
}
impl<T> From<T> for Error

View file

@ -2,7 +2,6 @@ use chrono::{DateTime, Utc};
use hmac::{Hmac, Mac};
use sha2::Sha256;
use hyper::header::HeaderName;
use hyper::{body::Incoming as IncomingBody, Request};
use garage_model::garage::Garage;
@ -11,8 +10,6 @@ use garage_util::data::{sha256sum, Hash};
use error::*;
pub mod body;
pub mod checksum;
pub mod error;
pub mod payload;
pub mod streaming;
@ -20,73 +17,36 @@ pub mod streaming;
pub const SHORT_DATE: &str = "%Y%m%d";
pub const LONG_DATETIME: &str = "%Y%m%dT%H%M%SZ";
// ---- Constants used in AWSv4 signatures ----
pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm");
pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential");
pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date");
pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires");
pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders");
pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature");
pub const X_AMZ_CONTENT_SHA256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
pub const X_AMZ_TRAILER: HeaderName = HeaderName::from_static("x-amz-trailer");
/// Result of `sha256("")`
pub(crate) const EMPTY_STRING_HEX_DIGEST: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
// Signature calculation algorithm
pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256";
type HmacSha256 = Hmac<Sha256>;
// Possible values for x-amz-content-sha256, in addition to the actual sha256
pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD";
pub const STREAMING_UNSIGNED_PAYLOAD_TRAILER: &str = "STREAMING-UNSIGNED-PAYLOAD-TRAILER";
pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
// Used in the computation of StringToSign
pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
// ---- enums to describe stuff going on in signature calculation ----
#[derive(Debug)]
pub enum ContentSha256Header {
UnsignedPayload,
Sha256Checksum(Hash),
StreamingPayload { trailer: bool, signed: bool },
}
// ---- top-level functions ----
pub struct VerifiedRequest {
pub request: Request<streaming::ReqBody>,
pub access_key: Key,
pub content_sha256_header: ContentSha256Header,
}
pub async fn verify_request(
garage: &Garage,
mut req: Request<IncomingBody>,
service: &'static str,
) -> Result<VerifiedRequest, Error> {
let checked_signature = payload::check_payload_signature(&garage, &mut req, service).await?;
) -> Result<(Request<streaming::ReqBody>, Key, Option<Hash>), Error> {
let (api_key, mut content_sha256) =
payload::check_payload_signature(&garage, &mut req, service).await?;
let api_key =
api_key.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
let request = streaming::parse_streaming_body(
let req = streaming::parse_streaming_body(
&api_key,
req,
&checked_signature,
&mut content_sha256,
&garage.config.s3_api.s3_region,
service,
)?;
let access_key = checked_signature
.key
.ok_or_else(|| Error::forbidden("Garage does not support anonymous access yet"))?;
Ok((req, api_key, content_sha256))
}
Ok(VerifiedRequest {
request,
access_key,
content_sha256_header: checked_signature.content_sha256_header,
})
pub fn verify_signed_content(expected_sha256: Hash, body: &[u8]) -> Result<(), Error> {
if expected_sha256 != sha256sum(body) {
return Err(Error::bad_request(
"Request content hash does not match signed hash".to_string(),
));
}
Ok(())
}
pub fn signing_hmac(

View file

@ -13,9 +13,23 @@ use garage_util::data::Hash;
use garage_model::garage::Garage;
use garage_model::key_table::*;
use super::*;
use super::LONG_DATETIME;
use super::{compute_scope, signing_hmac};
use crate::encoding::uri_encode;
use crate::signature::error::*;
pub const X_AMZ_ALGORITHM: HeaderName = HeaderName::from_static("x-amz-algorithm");
pub const X_AMZ_CREDENTIAL: HeaderName = HeaderName::from_static("x-amz-credential");
pub const X_AMZ_DATE: HeaderName = HeaderName::from_static("x-amz-date");
pub const X_AMZ_EXPIRES: HeaderName = HeaderName::from_static("x-amz-expires");
pub const X_AMZ_SIGNEDHEADERS: HeaderName = HeaderName::from_static("x-amz-signedheaders");
pub const X_AMZ_SIGNATURE: HeaderName = HeaderName::from_static("x-amz-signature");
pub const X_AMZ_CONTENT_SH256: HeaderName = HeaderName::from_static("x-amz-content-sha256");
pub const AWS4_HMAC_SHA256: &str = "AWS4-HMAC-SHA256";
pub const UNSIGNED_PAYLOAD: &str = "UNSIGNED-PAYLOAD";
pub const STREAMING_AWS4_HMAC_SHA256_PAYLOAD: &str = "STREAMING-AWS4-HMAC-SHA256-PAYLOAD";
pub type QueryMap = HeaderMap<QueryValue>;
pub struct QueryValue {
@ -25,18 +39,11 @@ pub struct QueryValue {
value: String,
}
#[derive(Debug)]
pub struct CheckedSignature {
pub key: Option<Key>,
pub content_sha256_header: ContentSha256Header,
pub signature_header: Option<String>,
}
pub async fn check_payload_signature(
garage: &Garage,
request: &mut Request<IncomingBody>,
service: &'static str,
) -> Result<CheckedSignature, Error> {
) -> Result<(Option<Key>, Option<Hash>), Error> {
let query = parse_query_map(request.uri())?;
if query.contains_key(&X_AMZ_ALGORITHM) {
@ -50,46 +57,17 @@ pub async fn check_payload_signature(
// Unsigned (anonymous) request
let content_sha256 = request
.headers()
.get(X_AMZ_CONTENT_SHA256)
.map(|x| x.to_str())
.transpose()?;
Ok(CheckedSignature {
key: None,
content_sha256_header: parse_x_amz_content_sha256(content_sha256)?,
signature_header: None,
})
}
}
fn parse_x_amz_content_sha256(header: Option<&str>) -> Result<ContentSha256Header, Error> {
let header = match header {
Some(x) => x,
None => return Ok(ContentSha256Header::UnsignedPayload),
};
if header == UNSIGNED_PAYLOAD {
Ok(ContentSha256Header::UnsignedPayload)
} else if let Some(rest) = header.strip_prefix("STREAMING-") {
let (trailer, algo) = if let Some(rest2) = rest.strip_suffix("-TRAILER") {
(true, rest2)
.get("x-amz-content-sha256")
.filter(|c| c.as_bytes() != UNSIGNED_PAYLOAD.as_bytes());
if let Some(content_sha256) = content_sha256 {
let sha256 = hex::decode(content_sha256)
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
Ok((None, Some(sha256)))
} else {
(false, rest)
};
let signed = match algo {
AWS4_HMAC_SHA256_PAYLOAD => true,
UNSIGNED_PAYLOAD => false,
_ => {
return Err(Error::bad_request(
"invalid or unsupported x-amz-content-sha256",
))
}
};
Ok(ContentSha256Header::StreamingPayload { trailer, signed })
} else {
let sha256 = hex::decode(header)
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid content sha256 hash")?;
Ok(ContentSha256Header::Sha256Checksum(sha256))
Ok((None, None))
}
}
}
@ -98,7 +76,7 @@ async fn check_standard_signature(
service: &'static str,
request: &Request<IncomingBody>,
query: QueryMap,
) -> Result<CheckedSignature, Error> {
) -> Result<(Option<Key>, Option<Hash>), Error> {
let authorization = Authorization::parse_header(request.headers())?;
// Verify that all necessary request headers are included in signed_headers
@ -130,13 +108,18 @@ async fn check_standard_signature(
let key = verify_v4(garage, service, &authorization, string_to_sign.as_bytes()).await?;
let content_sha256_header = parse_x_amz_content_sha256(Some(&authorization.content_sha256))?;
let content_sha256 = if authorization.content_sha256 == UNSIGNED_PAYLOAD {
None
} else if authorization.content_sha256 == STREAMING_AWS4_HMAC_SHA256_PAYLOAD {
let bytes = hex::decode(authorization.signature).ok_or_bad_request("Invalid signature")?;
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid signature")?)
} else {
let bytes = hex::decode(authorization.content_sha256)
.ok_or_bad_request("Invalid content sha256 hash")?;
Some(Hash::try_from(&bytes).ok_or_bad_request("Invalid content sha256 hash")?)
};
Ok(CheckedSignature {
key: Some(key),
content_sha256_header,
signature_header: Some(authorization.signature),
})
Ok((Some(key), content_sha256))
}
async fn check_presigned_signature(
@ -144,7 +127,7 @@ async fn check_presigned_signature(
service: &'static str,
request: &mut Request<IncomingBody>,
mut query: QueryMap,
) -> Result<CheckedSignature, Error> {
) -> Result<(Option<Key>, Option<Hash>), Error> {
let algorithm = query.get(&X_AMZ_ALGORITHM).unwrap();
let authorization = Authorization::parse_presigned(&algorithm.value, &query)?;
@ -210,11 +193,7 @@ async fn check_presigned_signature(
// Presigned URLs always use UNSIGNED-PAYLOAD,
// so there is no sha256 hash to return.
Ok(CheckedSignature {
key: Some(key),
content_sha256_header: ContentSha256Header::UnsignedPayload,
signature_header: Some(authorization.signature),
})
Ok((Some(key), None))
}
pub fn parse_query_map(uri: &http::uri::Uri) -> Result<QueryMap, Error> {
@ -463,7 +442,7 @@ impl Authorization {
.to_string();
let content_sha256 = headers
.get(X_AMZ_CONTENT_SHA256)
.get(X_AMZ_CONTENT_SH256)
.ok_or_bad_request("Missing X-Amz-Content-Sha256 field")?;
let date = headers

View file

@ -1,157 +1,84 @@
use std::pin::Pin;
use std::sync::Mutex;
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*;
use futures::task;
use garage_model::key_table::Key;
use hmac::Mac;
use http::header::{HeaderMap, HeaderValue, CONTENT_ENCODING};
use hyper::body::{Bytes, Frame, Incoming as IncomingBody};
use http_body_util::StreamBody;
use hyper::body::{Bytes, Incoming as IncomingBody};
use hyper::Request;
use garage_util::data::Hash;
use super::*;
use super::{compute_scope, sha256sum, HmacSha256, LONG_DATETIME};
use crate::helpers::body_stream;
use crate::signature::checksum::*;
use crate::signature::payload::CheckedSignature;
use crate::helpers::*;
use crate::signature::error::*;
use crate::signature::payload::{
STREAMING_AWS4_HMAC_SHA256_PAYLOAD, X_AMZ_CONTENT_SH256, X_AMZ_DATE,
};
pub use crate::signature::body::ReqBody;
pub const AWS4_HMAC_SHA256_PAYLOAD: &str = "AWS4-HMAC-SHA256-PAYLOAD";
pub type ReqBody = BoxBody<Error>;
pub fn parse_streaming_body(
mut req: Request<IncomingBody>,
checked_signature: &CheckedSignature,
api_key: &Key,
req: Request<IncomingBody>,
content_sha256: &mut Option<Hash>,
region: &str,
service: &str,
) -> Result<Request<ReqBody>, Error> {
debug!(
"Content signature mode: {:?}",
checked_signature.content_sha256_header
);
match req.headers().get(X_AMZ_CONTENT_SH256) {
Some(header) if header == STREAMING_AWS4_HMAC_SHA256_PAYLOAD => {
let signature = content_sha256
.take()
.ok_or_bad_request("No signature provided")?;
match checked_signature.content_sha256_header {
ContentSha256Header::StreamingPayload { signed, trailer } => {
// Sanity checks
if !signed && !trailer {
return Err(Error::bad_request(
"STREAMING-UNSIGNED-PAYLOAD without trailer is not a valid combination",
));
}
let secret_key = &api_key
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key;
// Remove the aws-chunked component in the content-encoding: header
// Note: this header is not properly sent by minio client, so don't fail
// if it is absent from the request.
if let Some(content_encoding) = req.headers_mut().remove(CONTENT_ENCODING) {
if let Some(rest) = content_encoding.as_bytes().strip_prefix(b"aws-chunked,") {
req.headers_mut()
.insert(CONTENT_ENCODING, HeaderValue::from_bytes(rest).unwrap());
} else if content_encoding != "aws-chunked" {
return Err(Error::bad_request(
"content-encoding does not contain aws-chunked for STREAMING-*-PAYLOAD",
));
}
}
let date = req
.headers()
.get(X_AMZ_DATE)
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = Utc.from_utc_datetime(&date);
// If trailer header is announced, add the calculation of the requested checksum
let mut checksummer = Checksummer::init(&Default::default(), false);
let trailer_algorithm = if trailer {
let algo = Some(
request_trailer_checksum_algorithm(req.headers())?
.ok_or_bad_request("Missing x-amz-trailer header")?,
);
checksummer = checksummer.add(algo);
algo
} else {
None
};
// For signed variants, determine signing parameters
let sign_params = if signed {
let signature = checked_signature
.signature_header
.clone()
.ok_or_bad_request("No signature provided")?;
let signature = hex::decode(signature)
.ok()
.and_then(|bytes| Hash::try_from(&bytes))
.ok_or_bad_request("Invalid signature")?;
let secret_key = checked_signature
.key
.as_ref()
.ok_or_bad_request("Cannot sign streaming payload without signing key")?
.state
.as_option()
.ok_or_internal_error("Deleted key state")?
.secret_key
.to_string();
let date = req
.headers()
.get(X_AMZ_DATE)
.ok_or_bad_request("Missing X-Amz-Date field")?
.to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = Utc.from_utc_datetime(&date);
let scope = compute_scope(&date, region, service);
let signing_hmac =
crate::signature::signing_hmac(&date, &secret_key, region, service)
.ok_or_internal_error("Unable to build signing HMAC")?;
Some(SignParams {
datetime: date,
scope,
signing_hmac,
previous_signature: signature,
})
} else {
None
};
let scope = compute_scope(&date, region, service);
let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
.ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| {
let stream = body_stream::<_, Error>(body);
let signed_payload_stream =
StreamingPayloadStream::new(stream, sign_params, trailer).map_err(Error::from);
ReqBody {
stream: Mutex::new(signed_payload_stream.boxed()),
checksummer,
expected_checksums: Default::default(),
trailer_algorithm,
}
SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
.map(|x| x.map(hyper::body::Frame::data))
.map_err(Error::from);
ReqBody::new(StreamBody::new(signed_payload_stream))
}))
}
_ => Ok(req.map(|body| {
let expected_checksums = ExpectedChecksums {
sha256: match &checked_signature.content_sha256_header {
ContentSha256Header::Sha256Checksum(sha256) => Some(*sha256),
_ => None,
},
..Default::default()
};
let checksummer = Checksummer::init(&expected_checksums, false);
let stream = http_body_util::BodyStream::new(body).map_err(Error::from);
ReqBody {
stream: Mutex::new(stream.boxed()),
checksummer,
expected_checksums,
trailer_algorithm: None,
}
})),
_ => Ok(req.map(|body| ReqBody::new(http_body_util::BodyExt::map_err(body, Error::from)))),
}
}
/// Result of `sha256("")`
const EMPTY_STRING_HEX_DIGEST: &str =
"e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855";
fn compute_streaming_payload_signature(
signing_hmac: &HmacSha256,
date: DateTime<Utc>,
scope: &str,
previous_signature: Hash,
content_sha256: Hash,
) -> Result<Hash, StreamingPayloadError> {
) -> Result<Hash, Error> {
let string_to_sign = [
AWS4_HMAC_SHA256_PAYLOAD,
&date.format(LONG_DATETIME).to_string(),
@ -165,49 +92,12 @@ fn compute_streaming_payload_signature(
let mut hmac = signing_hmac.clone();
hmac.update(string_to_sign.as_bytes());
Hash::try_from(&hmac.finalize().into_bytes())
.ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into()))
}
fn compute_streaming_trailer_signature(
signing_hmac: &HmacSha256,
date: DateTime<Utc>,
scope: &str,
previous_signature: Hash,
trailer_sha256: Hash,
) -> Result<Hash, StreamingPayloadError> {
let string_to_sign = [
AWS4_HMAC_SHA256_PAYLOAD,
&date.format(LONG_DATETIME).to_string(),
scope,
&hex::encode(previous_signature),
&hex::encode(trailer_sha256),
]
.join("\n");
let mut hmac = signing_hmac.clone();
hmac.update(string_to_sign.as_bytes());
Hash::try_from(&hmac.finalize().into_bytes())
.ok_or_else(|| StreamingPayloadError::Message("Could not build signature".into()))
Ok(Hash::try_from(&hmac.finalize().into_bytes()).ok_or_internal_error("Invalid signature")?)
}
mod payload {
use http::{HeaderName, HeaderValue};
use garage_util::data::Hash;
use nom::bytes::streaming::{tag, take_while};
use nom::character::streaming::hex_digit1;
use nom::combinator::{map_res, opt};
use nom::number::streaming::hex_u32;
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(|e| e.map(Error::Parser))?
};
}
pub enum Error<I> {
Parser(nom::error::Error<I>),
BadSignature,
@ -223,13 +113,24 @@ mod payload {
}
#[derive(Debug, Clone)]
pub struct ChunkHeader {
pub struct Header {
pub size: usize,
pub signature: Option<Hash>,
pub signature: Hash,
}
impl ChunkHeader {
pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
impl Header {
pub fn parse(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
use nom::bytes::streaming::tag;
use nom::character::streaming::hex_digit1;
use nom::combinator::map_res;
use nom::number::streaming::hex_u32;
macro_rules! try_parse {
($expr:expr) => {
$expr.map_err(|e| e.map(Error::Parser))?
};
}
let (input, size) = try_parse!(hex_u32(input));
let (input, _) = try_parse!(tag(";")(input));
@ -239,172 +140,96 @@ mod payload {
let (input, _) = try_parse!(tag("\r\n")(input));
let header = ChunkHeader {
let header = Header {
size: size as usize,
signature: Some(signature),
signature,
};
Ok((input, header))
}
pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
let (input, size) = try_parse!(hex_u32(input));
let (input, _) = try_parse!(tag("\r\n")(input));
let header = ChunkHeader {
size: size as usize,
signature: None,
};
Ok((input, header))
}
}
#[derive(Debug, Clone)]
pub struct TrailerChunk {
pub header_name: HeaderName,
pub header_value: HeaderValue,
pub signature: Option<Hash>,
}
impl TrailerChunk {
fn parse_content(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
let (input, header_name) = try_parse!(map_res(
take_while(|c: u8| c.is_ascii_alphanumeric() || c == b'-'),
HeaderName::from_bytes
)(input));
let (input, _) = try_parse!(tag(b":")(input));
let (input, header_value) = try_parse!(map_res(
take_while(|c: u8| c.is_ascii_alphanumeric() || b"+/=".contains(&c)),
HeaderValue::from_bytes
)(input));
// Possible '\n' after the header value, depends on clients
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
let (input, _) = try_parse!(opt(tag(b"\n"))(input));
let (input, _) = try_parse!(tag(b"\r\n")(input));
Ok((
input,
TrailerChunk {
header_name,
header_value,
signature: None,
},
))
}
pub fn parse_signed(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
let (input, trailer) = Self::parse_content(input)?;
let (input, _) = try_parse!(tag(b"x-amz-trailer-signature:")(input));
let (input, data) = try_parse!(map_res(hex_digit1, hex::decode)(input));
let signature = Hash::try_from(&data).ok_or(nom::Err::Failure(Error::BadSignature))?;
let (input, _) = try_parse!(tag(b"\r\n")(input));
Ok((
input,
TrailerChunk {
signature: Some(signature),
..trailer
},
))
}
pub fn parse_unsigned(input: &[u8]) -> nom::IResult<&[u8], Self, Error<&[u8]>> {
let (input, trailer) = Self::parse_content(input)?;
let (input, _) = try_parse!(tag(b"\r\n")(input));
Ok((input, trailer))
}
}
}
#[derive(Debug)]
pub enum StreamingPayloadError {
pub enum SignedPayloadStreamError {
Stream(Error),
InvalidSignature,
Message(String),
}
impl StreamingPayloadError {
impl SignedPayloadStreamError {
fn message(msg: &str) -> Self {
StreamingPayloadError::Message(msg.into())
SignedPayloadStreamError::Message(msg.into())
}
}
impl From<StreamingPayloadError> for Error {
fn from(err: StreamingPayloadError) -> Self {
impl From<SignedPayloadStreamError> for Error {
fn from(err: SignedPayloadStreamError) -> Self {
match err {
StreamingPayloadError::Stream(e) => e,
StreamingPayloadError::InvalidSignature => {
SignedPayloadStreamError::Stream(e) => e,
SignedPayloadStreamError::InvalidSignature => {
Error::bad_request("Invalid payload signature")
}
StreamingPayloadError::Message(e) => {
SignedPayloadStreamError::Message(e) => {
Error::bad_request(format!("Chunk format error: {}", e))
}
}
}
}
impl<I> From<payload::Error<I>> for StreamingPayloadError {
impl<I> From<payload::Error<I>> for SignedPayloadStreamError {
fn from(err: payload::Error<I>) -> Self {
Self::message(err.description())
}
}
impl<I> From<nom::error::Error<I>> for StreamingPayloadError {
impl<I> From<nom::error::Error<I>> for SignedPayloadStreamError {
fn from(err: nom::error::Error<I>) -> Self {
Self::message(err.code.description())
}
}
enum StreamingPayloadChunk {
Chunk {
header: payload::ChunkHeader,
data: Bytes,
},
Trailer(payload::TrailerChunk),
}
struct SignParams {
datetime: DateTime<Utc>,
scope: String,
signing_hmac: HmacSha256,
previous_signature: Hash,
struct SignedPayload {
header: payload::Header,
data: Bytes,
}
#[pin_project::pin_project]
pub struct StreamingPayloadStream<S>
pub struct SignedPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>>,
{
#[pin]
stream: S,
buf: bytes::BytesMut,
signing: Option<SignParams>,
has_trailer: bool,
done: bool,
datetime: DateTime<Utc>,
scope: String,
signing_hmac: HmacSha256,
previous_signature: Hash,
}
impl<S> StreamingPayloadStream<S>
impl<S> SignedPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>>,
{
fn new(stream: S, signing: Option<SignParams>, has_trailer: bool) -> Self {
pub fn new(
stream: S,
signing_hmac: HmacSha256,
datetime: DateTime<Utc>,
scope: &str,
seed_signature: Hash,
) -> Self {
Self {
stream,
buf: bytes::BytesMut::new(),
signing,
has_trailer,
done: false,
datetime,
scope: scope.into(),
signing_hmac,
previous_signature: seed_signature,
}
}
fn parse_next(
input: &[u8],
is_signed: bool,
has_trailer: bool,
) -> nom::IResult<&[u8], StreamingPayloadChunk, StreamingPayloadError> {
fn parse_next(input: &[u8]) -> nom::IResult<&[u8], SignedPayload, SignedPayloadStreamError> {
use nom::bytes::streaming::{tag, take};
macro_rules! try_parse {
@ -413,30 +238,17 @@ where
};
}
let (input, header) = if is_signed {
try_parse!(payload::ChunkHeader::parse_signed(input))
} else {
try_parse!(payload::ChunkHeader::parse_unsigned(input))
};
let (input, header) = try_parse!(payload::Header::parse(input));
// 0-sized chunk is the last
if header.size == 0 {
if has_trailer {
let (input, trailer) = if is_signed {
try_parse!(payload::TrailerChunk::parse_signed(input))
} else {
try_parse!(payload::TrailerChunk::parse_unsigned(input))
};
return Ok((input, StreamingPayloadChunk::Trailer(trailer)));
} else {
return Ok((
input,
StreamingPayloadChunk::Chunk {
header,
data: Bytes::new(),
},
));
}
return Ok((
input,
SignedPayload {
header,
data: Bytes::new(),
},
));
}
let (input, data) = try_parse!(take::<_, _, nom::error::Error<_>>(header.size)(input));
@ -444,15 +256,15 @@ where
let data = Bytes::from(data.to_vec());
Ok((input, StreamingPayloadChunk::Chunk { header, data }))
Ok((input, SignedPayload { header, data }))
}
}
impl<S> Stream for StreamingPayloadStream<S>
impl<S> Stream for SignedPayloadStream<S>
where
S: Stream<Item = Result<Bytes, Error>> + Unpin,
{
type Item = Result<Frame<Bytes>, StreamingPayloadError>;
type Item = Result<Bytes, SignedPayloadStreamError>;
fn poll_next(
self: Pin<&mut Self>,
@ -462,105 +274,56 @@ where
let mut this = self.project();
if *this.done {
return Poll::Ready(None);
}
loop {
let (input, payload) =
match Self::parse_next(this.buf, this.signing.is_some(), *this.has_trailer) {
Ok(res) => res,
Err(nom::Err::Incomplete(_)) => {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
continue;
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(StreamingPayloadError::Stream(e))))
}
None => {
return Poll::Ready(Some(Err(StreamingPayloadError::message(
"Unexpected EOF",
))));
}
let (input, payload) = match Self::parse_next(this.buf) {
Ok(res) => res,
Err(nom::Err::Incomplete(_)) => {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
continue;
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(SignedPayloadStreamError::Stream(e))))
}
None => {
return Poll::Ready(Some(Err(SignedPayloadStreamError::message(
"Unexpected EOF",
))));
}
}
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
return Poll::Ready(Some(Err(e)))
}
};
match payload {
StreamingPayloadChunk::Chunk { data, header } => {
if let Some(signing) = this.signing.as_mut() {
let data_sha256sum = sha256sum(&data);
let expected_signature = compute_streaming_payload_signature(
&signing.signing_hmac,
signing.datetime,
&signing.scope,
signing.previous_signature,
data_sha256sum,
)?;
if header.signature.unwrap() != expected_signature {
return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature)));
}
signing.previous_signature = header.signature.unwrap();
}
*this.buf = input.into();
// 0-sized chunk is the last
if data.is_empty() {
// if there was a trailer, it would have been returned by the parser
assert!(!*this.has_trailer);
*this.done = true;
return Poll::Ready(None);
}
return Poll::Ready(Some(Ok(Frame::data(data))));
}
StreamingPayloadChunk::Trailer(trailer) => {
trace!(
"In StreamingPayloadStream::poll_next: got trailer {:?}",
trailer
);
if let Some(signing) = this.signing.as_mut() {
let data = [
trailer.header_name.as_ref(),
&b":"[..],
trailer.header_value.as_ref(),
&b"\n"[..],
]
.concat();
let trailer_sha256sum = sha256sum(&data);
let expected_signature = compute_streaming_trailer_signature(
&signing.signing_hmac,
signing.datetime,
&signing.scope,
signing.previous_signature,
trailer_sha256sum,
)?;
if trailer.signature.unwrap() != expected_signature {
return Poll::Ready(Some(Err(StreamingPayloadError::InvalidSignature)));
}
}
*this.buf = input.into();
*this.done = true;
let mut trailers_map = HeaderMap::new();
trailers_map.insert(trailer.header_name, trailer.header_value);
return Poll::Ready(Some(Ok(Frame::trailers(trailers_map))));
Err(nom::Err::Error(e)) | Err(nom::Err::Failure(e)) => {
return Poll::Ready(Some(Err(e)))
}
};
// 0-sized chunk is the last
if payload.data.is_empty() {
return Poll::Ready(None);
}
let data_sha256sum = sha256sum(&payload.data);
let expected_signature = compute_streaming_payload_signature(
this.signing_hmac,
*this.datetime,
this.scope,
*this.previous_signature,
data_sha256sum,
)
.map_err(|e| {
SignedPayloadStreamError::Message(format!("Could not build signature: {}", e))
})?;
if payload.header.signature != expected_signature {
return Poll::Ready(Some(Err(SignedPayloadStreamError::InvalidSignature)));
}
*this.buf = input.into();
*this.previous_signature = payload.header.signature;
return Poll::Ready(Some(Ok(payload.data)));
}
}
@ -573,7 +336,7 @@ where
mod tests {
use futures::prelude::*;
use super::{SignParams, StreamingPayloadError, StreamingPayloadStream};
use super::{SignedPayloadStream, SignedPayloadStreamError};
#[tokio::test]
async fn test_interrupted_signed_payload_stream() {
@ -595,20 +358,12 @@ mod tests {
let seed_signature = Hash::default();
let mut stream = StreamingPayloadStream::new(
body,
Some(SignParams {
signing_hmac,
datetime,
scope,
previous_signature: seed_signature,
}),
false,
);
let mut stream =
SignedPayloadStream::new(body, signing_hmac, datetime, &scope, seed_signature);
assert!(stream.try_next().await.is_err());
match stream.try_next().await {
Err(StreamingPayloadError::Message(msg)) if msg == "Unexpected EOF" => {}
Err(SignedPayloadStreamError::Message(msg)) if msg == "Unexpected EOF" => {}
item => panic!(
"Unexpected result, expected early EOF error, got {:?}",
item

View file

@ -81,9 +81,7 @@ impl ApiHandler for K2VApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let verified_request = verify_request(&garage, req, "k2v").await?;
let req = verified_request.request;
let api_key = verified_request.access_key;
let (req, api_key, _content_sha256) = verify_request(&garage, req, "k2v").await?;
let bucket_id = garage
.bucket_helper()

View file

@ -20,7 +20,7 @@ pub async fn handle_insert_batch(
let ReqCtx {
garage, bucket_id, ..
} = &ctx;
let items = req.into_body().json::<Vec<InsertBatchItem>>().await?;
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![];
for it in items {
@ -47,7 +47,7 @@ pub async fn handle_read_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = req.into_body().json::<Vec<ReadBatchQuery>>().await?;
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all(
queries
@ -141,7 +141,7 @@ pub async fn handle_delete_batch(
ctx: ReqCtx,
req: Request<ReqBody>,
) -> Result<Response<ResBody>, Error> {
let queries = req.into_body().json::<Vec<DeleteBatchQuery>>().await?;
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all(
queries
@ -262,7 +262,7 @@ pub(crate) async fn handle_poll_range(
} = ctx;
use garage_model::k2v::sub::PollRange;
let query = req.into_body().json::<PollRangeQuery>().await?;
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;

View file

@ -23,10 +23,6 @@ pub enum Error {
#[error(display = "Authorization header malformed, unexpected scope: {}", _0)]
AuthorizationHeaderMalformed(String),
/// The provided digest (checksum) value was invalid
#[error(display = "Invalid digest: {}", _0)]
InvalidDigest(String),
/// The object requested don't exists
#[error(display = "Key not found")]
NoSuchKey,
@ -58,7 +54,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
SignatureError::InvalidDigest(d) => Self::InvalidDigest(d),
}
}
}
@ -76,7 +71,6 @@ impl Error {
Error::InvalidBase64(_) => "InvalidBase64",
Error::InvalidUtf8Str(_) => "InvalidUtf8String",
Error::InvalidCausalityToken => "CausalityToken",
Error::InvalidDigest(_) => "InvalidDigest",
}
}
}
@ -91,7 +85,6 @@ impl ApiError for Error {
Error::AuthorizationHeaderMalformed(_)
| Error::InvalidBase64(_)
| Error::InvalidUtf8Str(_)
| Error::InvalidDigest(_)
| Error::InvalidCausalityToken => StatusCode::BAD_REQUEST,
}
}

View file

@ -144,7 +144,9 @@ pub async fn handle_insert_item(
.map(parse_causality_token)
.transpose()?;
let body = req.into_body().collect().await?;
let body = http_body_util::BodyExt::collect(req.into_body())
.await?
.to_bytes();
let value = DvvsValue::Value(body.to_vec());

View file

@ -121,9 +121,7 @@ impl ApiHandler for S3ApiServer {
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
}
let verified_request = verify_request(&garage, req, "s3").await?;
let req = verified_request.request;
let api_key = verified_request.access_key;
let (req, api_key, content_sha256) = verify_request(&garage, req, "s3").await?;
let bucket_name = match bucket_name {
None => {
@ -136,7 +134,14 @@ impl ApiHandler for S3ApiServer {
// Special code path for CreateBucket API endpoint
if let Endpoint::CreateBucket {} = endpoint {
return handle_create_bucket(&garage, req, &api_key.key_id, bucket_name).await;
return handle_create_bucket(
&garage,
req,
content_sha256,
&api_key.key_id,
bucket_name,
)
.await;
}
let bucket_id = garage
@ -174,7 +179,7 @@ impl ApiHandler for S3ApiServer {
let resp = match endpoint {
Endpoint::HeadObject {
key, part_number, ..
} => handle_head(ctx, &req.map(|_| ()), &key, part_number).await,
} => handle_head(ctx, &req, &key, part_number).await,
Endpoint::GetObject {
key,
part_number,
@ -194,20 +199,20 @@ impl ApiHandler for S3ApiServer {
response_content_type,
response_expires,
};
handle_get(ctx, &req.map(|_| ()), &key, part_number, overrides).await
handle_get(ctx, &req, &key, part_number, overrides).await
}
Endpoint::UploadPart {
key,
part_number,
upload_id,
} => handle_put_part(ctx, req, &key, part_number, &upload_id).await,
} => handle_put_part(ctx, req, &key, part_number, &upload_id, content_sha256).await,
Endpoint::CopyObject { key } => handle_copy(ctx, &req, &key).await,
Endpoint::UploadPartCopy {
key,
part_number,
upload_id,
} => handle_upload_part_copy(ctx, &req, &key, part_number, &upload_id).await,
Endpoint::PutObject { key } => handle_put(ctx, req, &key).await,
Endpoint::PutObject { key } => handle_put(ctx, req, &key, content_sha256).await,
Endpoint::AbortMultipartUpload { key, upload_id } => {
handle_abort_multipart_upload(ctx, &key, &upload_id).await
}
@ -216,7 +221,7 @@ impl ApiHandler for S3ApiServer {
handle_create_multipart_upload(ctx, &req, &key).await
}
Endpoint::CompleteMultipartUpload { key, upload_id } => {
handle_complete_multipart_upload(ctx, req, &key, &upload_id).await
handle_complete_multipart_upload(ctx, req, &key, &upload_id, content_sha256).await
}
Endpoint::CreateBucket {} => unreachable!(),
Endpoint::HeadBucket {} => {
@ -319,15 +324,17 @@ impl ApiHandler for S3ApiServer {
};
handle_list_parts(ctx, req, &query).await
}
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req).await,
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req).await,
Endpoint::PutBucketWebsite {} => handle_put_website(ctx, req, content_sha256).await,
Endpoint::DeleteBucketWebsite {} => handle_delete_website(ctx).await,
Endpoint::GetBucketCors {} => handle_get_cors(ctx).await,
Endpoint::PutBucketCors {} => handle_put_cors(ctx, req).await,
Endpoint::PutBucketCors {} => handle_put_cors(ctx, req, content_sha256).await,
Endpoint::DeleteBucketCors {} => handle_delete_cors(ctx).await,
Endpoint::GetBucketLifecycleConfiguration {} => handle_get_lifecycle(ctx).await,
Endpoint::PutBucketLifecycleConfiguration {} => handle_put_lifecycle(ctx, req).await,
Endpoint::PutBucketLifecycleConfiguration {} => {
handle_put_lifecycle(ctx, req, content_sha256).await
}
Endpoint::DeleteBucketLifecycle {} => handle_delete_lifecycle(ctx).await,
endpoint => Err(Error::NotImplemented(endpoint.name().to_owned())),
};

View file

@ -1,5 +1,6 @@
use std::collections::HashMap;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_model::bucket_alias_table::*;
@ -9,10 +10,12 @@ use garage_model::key_table::Key;
use garage_model::permission::BucketKeyPerm;
use garage_table::util::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::time::*;
use garage_api_common::common_error::CommonError;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -119,10 +122,15 @@ pub async fn handle_list_buckets(
pub async fn handle_create_bucket(
garage: &Garage,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
api_key_id: &String,
bucket_name: String,
) -> Result<Response<ResBody>, Error> {
let body = req.into_body().collect().await?;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let cmd =
parse_create_bucket_xml(&body[..]).ok_or_bad_request("Invalid create bucket XML query")?;

View file

@ -11,12 +11,11 @@ use sha2::Sha256;
use http::{HeaderMap, HeaderName, HeaderValue};
use garage_util::data::*;
use garage_util::error::OkOrMessage;
use super::*;
use garage_model::s3::object_table::*;
pub use garage_model::s3::object_table::{ChecksumAlgorithm, ChecksumValue};
pub const CONTENT_MD5: HeaderName = HeaderName::from_static("content-md5");
use crate::error::*;
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-checksum-algorithm");
@ -32,8 +31,8 @@ pub type Md5Checksum = [u8; 16];
pub type Sha1Checksum = [u8; 20];
pub type Sha256Checksum = [u8; 32];
#[derive(Debug, Default, Clone)]
pub struct ExpectedChecksums {
#[derive(Debug, Default)]
pub(crate) struct ExpectedChecksums {
// base64-encoded md5 (content-md5 header)
pub md5: Option<String>,
// content_sha256 (as a Hash / FixedBytes32)
@ -42,7 +41,7 @@ pub struct ExpectedChecksums {
pub extra: Option<ChecksumValue>,
}
pub struct Checksummer {
pub(crate) struct Checksummer {
pub crc32: Option<Crc32>,
pub crc32c: Option<Crc32c>,
pub md5: Option<Md5>,
@ -51,7 +50,7 @@ pub struct Checksummer {
}
#[derive(Default)]
pub struct Checksums {
pub(crate) struct Checksums {
pub crc32: Option<Crc32Checksum>,
pub crc32c: Option<Crc32cChecksum>,
pub md5: Option<Md5Checksum>,
@ -60,48 +59,34 @@ pub struct Checksums {
}
impl Checksummer {
pub fn new() -> Self {
Self {
pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
let mut ret = Self {
crc32: None,
crc32c: None,
md5: None,
sha1: None,
sha256: None,
}
}
};
pub fn init(expected: &ExpectedChecksums, add_md5: bool) -> Self {
let mut ret = Self::new();
ret.add_expected(expected);
if add_md5 {
ret.add_md5();
if expected.md5.is_some() || require_md5 {
ret.md5 = Some(Md5::new());
}
if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
ret.sha256 = Some(Sha256::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
ret.crc32 = Some(Crc32::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
ret.crc32c = Some(Crc32c::default());
}
if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
ret.sha1 = Some(Sha1::new());
}
ret
}
pub fn add_md5(&mut self) {
self.md5 = Some(Md5::new());
}
pub fn add_expected(&mut self, expected: &ExpectedChecksums) {
if expected.md5.is_some() {
self.md5 = Some(Md5::new());
}
if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
self.sha256 = Some(Sha256::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
self.crc32 = Some(Crc32::new());
}
if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
self.crc32c = Some(Crc32c::default());
}
if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
self.sha1 = Some(Sha1::new());
}
}
pub fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self {
pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self {
match algo {
Some(ChecksumAlgorithm::Crc32) => {
self.crc32 = Some(Crc32::new());
@ -120,7 +105,7 @@ impl Checksummer {
self
}
pub fn update(&mut self, bytes: &[u8]) {
pub(crate) fn update(&mut self, bytes: &[u8]) {
if let Some(crc32) = &mut self.crc32 {
crc32.update(bytes);
}
@ -138,7 +123,7 @@ impl Checksummer {
}
}
pub fn finalize(self) -> Checksums {
pub(crate) fn finalize(self) -> Checksums {
Checksums {
crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())),
crc32c: self
@ -198,56 +183,153 @@ impl Checksums {
// ----
pub fn parse_checksum_algorithm(algo: &str) -> Result<ChecksumAlgorithm, Error> {
match algo {
"CRC32" => Ok(ChecksumAlgorithm::Crc32),
"CRC32C" => Ok(ChecksumAlgorithm::Crc32c),
"SHA1" => Ok(ChecksumAlgorithm::Sha1),
"SHA256" => Ok(ChecksumAlgorithm::Sha256),
_ => Err(Error::bad_request("invalid checksum algorithm")),
#[derive(Default)]
pub(crate) struct MultipartChecksummer {
pub md5: Md5,
pub extra: Option<MultipartExtraChecksummer>,
}
pub(crate) enum MultipartExtraChecksummer {
Crc32(Crc32),
Crc32c(Crc32c),
Sha1(Sha1),
Sha256(Sha256),
}
impl MultipartChecksummer {
pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self {
Self {
md5: Md5::new(),
extra: match algo {
None => None,
Some(ChecksumAlgorithm::Crc32) => {
Some(MultipartExtraChecksummer::Crc32(Crc32::new()))
}
Some(ChecksumAlgorithm::Crc32c) => {
Some(MultipartExtraChecksummer::Crc32c(Crc32c::default()))
}
Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())),
Some(ChecksumAlgorithm::Sha256) => {
Some(MultipartExtraChecksummer::Sha256(Sha256::new()))
}
},
}
}
pub(crate) fn update(
&mut self,
etag: &str,
checksum: Option<ChecksumValue>,
) -> Result<(), Error> {
self.md5
.update(&hex::decode(&etag).ok_or_message("invalid etag hex")?);
match (&mut self.extra, checksum) {
(None, _) => (),
(
Some(MultipartExtraChecksummer::Crc32(ref mut crc32)),
Some(ChecksumValue::Crc32(x)),
) => {
crc32.update(&x);
}
(
Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)),
Some(ChecksumValue::Crc32c(x)),
) => {
crc32c.write(&x);
}
(Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => {
sha1.update(&x);
}
(
Some(MultipartExtraChecksummer::Sha256(ref mut sha256)),
Some(ChecksumValue::Sha256(x)),
) => {
sha256.update(&x);
}
(Some(_), b) => {
return Err(Error::internal_error(format!(
"part checksum was not computed correctly, got: {:?}",
b
)))
}
}
Ok(())
}
pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) {
let md5 = self.md5.finalize()[..].try_into().unwrap();
let extra = match self.extra {
None => None,
Some(MultipartExtraChecksummer::Crc32(crc32)) => {
Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize())))
}
Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c(
u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()),
)),
Some(MultipartExtraChecksummer::Sha1(sha1)) => {
Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap()))
}
Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256(
sha256.finalize()[..].try_into().unwrap(),
)),
};
(md5, extra)
}
}
// ----
/// Extract the value of the x-amz-checksum-algorithm header
pub fn request_checksum_algorithm(
pub(crate) fn request_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
None => Ok(None),
Some(x) => parse_checksum_algorithm(x.to_str()?).map(Some),
}
}
pub fn request_trailer_checksum_algorithm(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumAlgorithm>, Error> {
match headers.get(X_AMZ_TRAILER).map(|x| x.to_str()).transpose()? {
None => Ok(None),
Some(x) if x == X_AMZ_CHECKSUM_CRC32 => Ok(Some(ChecksumAlgorithm::Crc32)),
Some(x) if x == X_AMZ_CHECKSUM_CRC32C => Ok(Some(ChecksumAlgorithm::Crc32c)),
Some(x) if x == X_AMZ_CHECKSUM_SHA1 => Ok(Some(ChecksumAlgorithm::Sha1)),
Some(x) if x == X_AMZ_CHECKSUM_SHA256 => Ok(Some(ChecksumAlgorithm::Sha256)),
Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
_ => Err(Error::bad_request("invalid checksum algorithm")),
}
}
/// Extract the value of any of the x-amz-checksum-* headers
pub fn request_checksum_value(
pub(crate) fn request_checksum_value(
headers: &HeaderMap<HeaderValue>,
) -> Result<Option<ChecksumValue>, Error> {
let mut ret = vec![];
if headers.contains_key(X_AMZ_CHECKSUM_CRC32) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32)?);
if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
let crc32 = BASE64_STANDARD
.decode(&crc32_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
ret.push(ChecksumValue::Crc32(crc32))
}
if headers.contains_key(X_AMZ_CHECKSUM_CRC32C) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Crc32c)?);
if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
let crc32c = BASE64_STANDARD
.decode(&crc32c_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
ret.push(ChecksumValue::Crc32c(crc32c))
}
if headers.contains_key(X_AMZ_CHECKSUM_SHA1) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha1)?);
if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
let sha1 = BASE64_STANDARD
.decode(&sha1_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
ret.push(ChecksumValue::Sha1(sha1))
}
if headers.contains_key(X_AMZ_CHECKSUM_SHA256) {
ret.push(extract_checksum_value(headers, ChecksumAlgorithm::Sha256)?);
if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
let sha256 = BASE64_STANDARD
.decode(&sha256_str)
.ok()
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
ret.push(ChecksumValue::Sha256(sha256))
}
if ret.len() > 1 {
@ -260,47 +342,48 @@ pub fn request_checksum_value(
/// Checks for the presence of x-amz-checksum-algorithm
/// if so extract the corresponding x-amz-checksum-* value
pub fn extract_checksum_value(
pub(crate) fn request_checksum_algorithm_value(
headers: &HeaderMap<HeaderValue>,
algo: ChecksumAlgorithm,
) -> Result<ChecksumValue, Error> {
match algo {
ChecksumAlgorithm::Crc32 => {
) -> Result<Option<ChecksumValue>, Error> {
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
Some(x) if x == "CRC32" => {
let crc32 = headers
.get(X_AMZ_CHECKSUM_CRC32)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
Ok(ChecksumValue::Crc32(crc32))
Ok(Some(ChecksumValue::Crc32(crc32)))
}
ChecksumAlgorithm::Crc32c => {
Some(x) if x == "CRC32C" => {
let crc32c = headers
.get(X_AMZ_CHECKSUM_CRC32C)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
Ok(ChecksumValue::Crc32c(crc32c))
Ok(Some(ChecksumValue::Crc32c(crc32c)))
}
ChecksumAlgorithm::Sha1 => {
Some(x) if x == "SHA1" => {
let sha1 = headers
.get(X_AMZ_CHECKSUM_SHA1)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
Ok(ChecksumValue::Sha1(sha1))
Ok(Some(ChecksumValue::Sha1(sha1)))
}
ChecksumAlgorithm::Sha256 => {
Some(x) if x == "SHA256" => {
let sha256 = headers
.get(X_AMZ_CHECKSUM_SHA256)
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
.and_then(|x| x.try_into().ok())
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
Ok(ChecksumValue::Sha256(sha256))
Ok(Some(ChecksumValue::Sha256(sha256)))
}
Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
None => Ok(None),
}
}
pub fn add_checksum_response_headers(
pub(crate) fn add_checksum_response_headers(
checksum: &Option<ChecksumValue>,
mut resp: http::response::Builder,
) -> http::response::Builder {

View file

@ -1,9 +1,9 @@
use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
use bytes::Bytes;
use http::header::HeaderName;
use hyper::{Request, Response};
use serde::Serialize;
@ -21,25 +21,16 @@ use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
use crate::api_server::{ReqBody, ResBody};
use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::get::{full_object_byte_stream, PreconditionHeaders};
use crate::get::full_object_byte_stream;
use crate::multipart;
use crate::put::{extract_metadata_headers, save_stream, ChecksumMode, SaveStreamResult};
use crate::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
use crate::xml::{self as s3_xml, xmlns_tag};
pub const X_AMZ_COPY_SOURCE_IF_MATCH: HeaderName =
HeaderName::from_static("x-amz-copy-source-if-match");
pub const X_AMZ_COPY_SOURCE_IF_NONE_MATCH: HeaderName =
HeaderName::from_static("x-amz-copy-source-if-none-match");
pub const X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE: HeaderName =
HeaderName::from_static("x-amz-copy-source-if-modified-since");
pub const X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE: HeaderName =
HeaderName::from_static("x-amz-copy-source-if-unmodified-since");
// -------- CopyObject ---------
pub async fn handle_copy(
@ -47,7 +38,7 @@ pub async fn handle_copy(
req: &Request<ReqBody>,
dest_key: &str,
) -> Result<Response<ResBody>, Error> {
let copy_precondition = PreconditionHeaders::parse_copy_source(req)?;
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let checksum_algorithm = request_checksum_algorithm(req.headers())?;
@ -57,7 +48,7 @@ pub async fn handle_copy(
extract_source_info(&source_object)?;
// Check precondition, e.g. x-amz-copy-source-if-match
copy_precondition.check_copy_source(source_version, &source_version_meta.etag)?;
copy_precondition.check(source_version, &source_version_meta.etag)?;
// Determine encryption parameters
let (source_encryption, source_object_meta_inner) =
@ -82,7 +73,7 @@ pub async fn handle_copy(
let dest_object_meta = ObjectVersionMetaInner {
headers: match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
extract_metadata_headers(req.headers())?
get_headers(req.headers())?
}
_ => source_object_meta_inner.into_owned().headers,
},
@ -344,7 +335,7 @@ pub async fn handle_upload_part_copy(
part_number: u64,
upload_id: &str,
) -> Result<Response<ResBody>, Error> {
let copy_precondition = PreconditionHeaders::parse_copy_source(req)?;
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
@ -360,7 +351,7 @@ pub async fn handle_upload_part_copy(
extract_source_info(&source_object)?;
// Check precondition on source, e.g. x-amz-copy-source-if-match
copy_precondition.check_copy_source(source_object_version, &source_version_meta.etag)?;
copy_precondition.check(source_object_version, &source_version_meta.etag)?;
// Determine encryption parameters
let (source_encryption, _) = EncryptionParams::check_decrypt_for_copy_source(
@ -712,6 +703,97 @@ fn extract_source_info(
Ok((source_version, source_version_data, source_version_meta))
}
struct CopyPreconditionHeaders {
copy_source_if_match: Option<Vec<String>>,
copy_source_if_modified_since: Option<SystemTime>,
copy_source_if_none_match: Option<Vec<String>>,
copy_source_if_unmodified_since: Option<SystemTime>,
}
impl CopyPreconditionHeaders {
fn parse(req: &Request<ReqBody>) -> Result<Self, Error> {
Ok(Self {
copy_source_if_match: req
.headers()
.get("x-amz-copy-source-if-match")
.map(|x| x.to_str())
.transpose()?
.map(|x| {
x.split(',')
.map(|m| m.trim().trim_matches('"').to_string())
.collect::<Vec<_>>()
}),
copy_source_if_modified_since: req
.headers()
.get("x-amz-copy-source-if-modified-since")
.map(|x| x.to_str())
.transpose()?
.map(httpdate::parse_http_date)
.transpose()
.ok_or_bad_request("Invalid date in x-amz-copy-source-if-modified-since")?,
copy_source_if_none_match: req
.headers()
.get("x-amz-copy-source-if-none-match")
.map(|x| x.to_str())
.transpose()?
.map(|x| {
x.split(',')
.map(|m| m.trim().trim_matches('"').to_string())
.collect::<Vec<_>>()
}),
copy_source_if_unmodified_since: req
.headers()
.get("x-amz-copy-source-if-unmodified-since")
.map(|x| x.to_str())
.transpose()?
.map(httpdate::parse_http_date)
.transpose()
.ok_or_bad_request("Invalid date in x-amz-copy-source-if-unmodified-since")?,
})
}
fn check(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> {
let v_date = UNIX_EPOCH + Duration::from_millis(v.timestamp);
let ok = match (
&self.copy_source_if_match,
&self.copy_source_if_unmodified_since,
&self.copy_source_if_none_match,
&self.copy_source_if_modified_since,
) {
// TODO I'm not sure all of the conditions are evaluated correctly here
// If we have both if-match and if-unmodified-since,
// basically we don't care about if-unmodified-since,
// because in the spec it says that if if-match evaluates to
// true but if-unmodified-since evaluates to false,
// the copy is still done.
(Some(im), _, None, None) => im.iter().any(|x| x == etag || x == "*"),
(None, Some(ius), None, None) => v_date <= *ius,
// If we have both if-none-match and if-modified-since,
// then both of the two conditions must evaluate to true
(None, None, Some(inm), Some(ims)) => {
!inm.iter().any(|x| x == etag || x == "*") && v_date > *ims
}
(None, None, Some(inm), None) => !inm.iter().any(|x| x == etag || x == "*"),
(None, None, None, Some(ims)) => v_date > *ims,
(None, None, None, None) => true,
_ => {
return Err(Error::bad_request(
"Invalid combination of x-amz-copy-source-if-xxxxx headers",
))
}
};
if ok {
Ok(())
} else {
Err(Error::PreconditionFailed)
}
}
}
type BlockStreamItemOk = (Bytes, Option<Hash>);
type BlockStreamItem = Result<BlockStreamItemOk, garage_util::error::Error>;

View file

@ -2,11 +2,15 @@ use quick_xml::de::from_reader;
use hyper::{header::HeaderName, Method, Request, Response, StatusCode};
use http_body_util::BodyExt;
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::{Bucket, CorsRule as GarageCorsRule};
use garage_util::data::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -55,6 +59,7 @@ pub async fn handle_delete_cors(ctx: ReqCtx) -> Result<Response<ResBody>, Error>
pub async fn handle_put_cors(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -63,7 +68,11 @@ pub async fn handle_put_cors(
..
} = ctx;
let body = req.into_body().collect().await?;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let conf: CorsConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;

View file

@ -1,3 +1,4 @@
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use garage_util::data::*;
@ -5,6 +6,7 @@ use garage_util::data::*;
use garage_model::s3::object_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -66,8 +68,13 @@ pub async fn handle_delete(ctx: ReqCtx, key: &str) -> Result<Response<ResBody>,
pub async fn handle_delete_objects(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let body = req.into_body().collect().await?;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let cmd_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let cmd = parse_delete_objects_xml(&cmd_xml).ok_or_bad_request("Invalid delete XML query")?;

View file

@ -29,8 +29,8 @@ use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner};
use garage_api_common::common_error::*;
use garage_api_common::signature::checksum::Md5Checksum;
use crate::checksum::Md5Checksum;
use crate::error::Error;
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =

View file

@ -80,7 +80,7 @@ pub enum Error {
#[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)]
InvalidEncryptionAlgorithm(String),
/// The provided digest (checksum) value was invalid
/// The client sent invalid XML data
#[error(display = "Invalid digest: {}", _0)]
InvalidDigest(String),
@ -119,7 +119,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c)
}
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
SignatureError::InvalidDigest(d) => Self::InvalidDigest(d),
}
}
}

View file

@ -2,17 +2,17 @@
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use std::time::{Duration, UNIX_EPOCH};
use bytes::Bytes;
use futures::future;
use futures::stream::{self, Stream, StreamExt};
use http::header::{
HeaderMap, HeaderName, ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING,
CONTENT_LANGUAGE, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MATCH,
IF_MODIFIED_SINCE, IF_NONE_MATCH, IF_UNMODIFIED_SINCE, LAST_MODIFIED, RANGE,
ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
LAST_MODIFIED, RANGE,
};
use hyper::{Request, Response, StatusCode};
use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc;
use garage_net::stream::ByteStream;
@ -26,14 +26,13 @@ use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
use crate::api_server::ResBody;
use crate::copy::*;
use crate::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
use crate::encryption::EncryptionParams;
use crate::error::*;
const X_AMZ_MP_PARTS_COUNT: HeaderName = HeaderName::from_static("x-amz-mp-parts-count");
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
#[derive(Default)]
pub struct GetObjectOverrides {
@ -116,29 +115,49 @@ fn getobject_override_headers(
Ok(())
}
fn handle_http_precondition(
fn try_answer_cached(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
req: &Request<()>,
) -> Result<Option<Response<ResBody>>, Error> {
let precondition_headers = PreconditionHeaders::parse(req)?;
req: &Request<impl Body>,
) -> Option<Response<ResBody>> {
// <trinity> It is possible, and is even usually the case, [that both If-None-Match and
// If-Modified-Since] are present in a request. In this situation If-None-Match takes
// precedence and If-Modified-Since is ignored (as per 6.Precedence from rfc7232). The rational
// being that etag based matching is more accurate, it has no issue with sub-second precision
// for instance (in case of very fast updates)
let cached = if let Some(none_match) = req.headers().get(IF_NONE_MATCH) {
let none_match = none_match.to_str().ok()?;
let expected = format!("\"{}\"", version_meta.etag);
let found = none_match
.split(',')
.map(str::trim)
.any(|etag| etag == expected || etag == "\"*\"");
found
} else if let Some(modified_since) = req.headers().get(IF_MODIFIED_SINCE) {
let modified_since = modified_since.to_str().ok()?;
let client_date = httpdate::parse_http_date(modified_since).ok()?;
let server_date = UNIX_EPOCH + Duration::from_millis(version.timestamp);
client_date >= server_date
} else {
false
};
if let Some(status_code) = precondition_headers.check(&version, &version_meta.etag)? {
Ok(Some(
if cached {
Some(
Response::builder()
.status(status_code)
.status(StatusCode::NOT_MODIFIED)
.body(empty_body())
.unwrap(),
))
)
} else {
Ok(None)
None
}
}
/// Handle HEAD request
pub async fn handle_head(
ctx: ReqCtx,
req: &Request<()>,
req: &Request<impl Body>,
key: &str,
part_number: Option<u64>,
) -> Result<Response<ResBody>, Error> {
@ -148,7 +167,7 @@ pub async fn handle_head(
/// Handle HEAD request for website
pub async fn handle_head_without_ctx(
garage: Arc<Garage>,
req: &Request<()>,
req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
@ -177,8 +196,8 @@ pub async fn handle_head_without_ctx(
_ => unreachable!(),
};
if let Some(res) = handle_http_precondition(object_version, version_meta, req)? {
return Ok(res);
if let Some(cached) = try_answer_cached(object_version, version_meta, req) {
return Ok(cached);
}
let (encryption, headers) =
@ -259,7 +278,7 @@ pub async fn handle_head_without_ctx(
/// Handle GET request
pub async fn handle_get(
ctx: ReqCtx,
req: &Request<()>,
req: &Request<impl Body>,
key: &str,
part_number: Option<u64>,
overrides: GetObjectOverrides,
@ -270,7 +289,7 @@ pub async fn handle_get(
/// Handle GET request
pub async fn handle_get_without_ctx(
garage: Arc<Garage>,
req: &Request<()>,
req: &Request<impl Body>,
bucket_id: Uuid,
key: &str,
part_number: Option<u64>,
@ -299,8 +318,8 @@ pub async fn handle_get_without_ctx(
ObjectVersionData::FirstBlock(meta, _) => meta,
};
if let Some(res) = handle_http_precondition(last_v, last_v_meta, req)? {
return Ok(res);
if let Some(cached) = try_answer_cached(last_v, last_v_meta, req) {
return Ok(cached);
}
let (enc, headers) =
@ -321,12 +340,7 @@ pub async fn handle_get_without_ctx(
enc,
&headers,
pn,
ChecksumMode {
// TODO: for multipart uploads, checksums of each part should be stored
// so that we can return the corresponding checksum here
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
enabled: false,
},
checksum_mode,
)
.await
}
@ -340,12 +354,7 @@ pub async fn handle_get_without_ctx(
&headers,
range.start,
range.start + range.length,
ChecksumMode {
// TODO: for range queries that align with part boundaries,
// we should return the saved checksum of the part
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
enabled: false,
},
checksum_mode,
)
.await
}
@ -568,7 +577,7 @@ async fn handle_get_part(
}
fn parse_range_header(
req: &Request<()>,
req: &Request<impl Body>,
total_size: u64,
) -> Result<Option<http_range::HttpRange>, Error> {
let range = match req.headers().get(RANGE) {
@ -609,7 +618,7 @@ struct ChecksumMode {
enabled: bool,
}
fn checksum_mode(req: &Request<()>) -> ChecksumMode {
fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode {
ChecksumMode {
enabled: req
.headers()
@ -742,116 +751,3 @@ fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error {
format!("Error while reading object data: {}", e),
)
}
// ----
pub struct PreconditionHeaders {
if_match: Option<Vec<String>>,
if_modified_since: Option<SystemTime>,
if_none_match: Option<Vec<String>>,
if_unmodified_since: Option<SystemTime>,
}
impl PreconditionHeaders {
fn parse<B>(req: &Request<B>) -> Result<Self, Error> {
Self::parse_with(
req.headers(),
&IF_MATCH,
&IF_NONE_MATCH,
&IF_MODIFIED_SINCE,
&IF_UNMODIFIED_SINCE,
)
}
pub(crate) fn parse_copy_source<B>(req: &Request<B>) -> Result<Self, Error> {
Self::parse_with(
req.headers(),
&X_AMZ_COPY_SOURCE_IF_MATCH,
&X_AMZ_COPY_SOURCE_IF_NONE_MATCH,
&X_AMZ_COPY_SOURCE_IF_MODIFIED_SINCE,
&X_AMZ_COPY_SOURCE_IF_UNMODIFIED_SINCE,
)
}
fn parse_with(
headers: &HeaderMap,
hdr_if_match: &HeaderName,
hdr_if_none_match: &HeaderName,
hdr_if_modified_since: &HeaderName,
hdr_if_unmodified_since: &HeaderName,
) -> Result<Self, Error> {
Ok(Self {
if_match: headers
.get(hdr_if_match)
.map(|x| x.to_str())
.transpose()?
.map(|x| {
x.split(',')
.map(|m| m.trim().trim_matches('"').to_string())
.collect::<Vec<_>>()
}),
if_none_match: headers
.get(hdr_if_none_match)
.map(|x| x.to_str())
.transpose()?
.map(|x| {
x.split(',')
.map(|m| m.trim().trim_matches('"').to_string())
.collect::<Vec<_>>()
}),
if_modified_since: headers
.get(hdr_if_modified_since)
.map(|x| x.to_str())
.transpose()?
.map(httpdate::parse_http_date)
.transpose()
.ok_or_bad_request("Invalid date in if-modified-since")?,
if_unmodified_since: headers
.get(hdr_if_unmodified_since)
.map(|x| x.to_str())
.transpose()?
.map(httpdate::parse_http_date)
.transpose()
.ok_or_bad_request("Invalid date in if-unmodified-since")?,
})
}
fn check(&self, v: &ObjectVersion, etag: &str) -> Result<Option<StatusCode>, Error> {
let v_date = UNIX_EPOCH + Duration::from_millis(v.timestamp);
// Implemented from https://datatracker.ietf.org/doc/html/rfc7232#section-6
if let Some(im) = &self.if_match {
// Step 1: if-match is present
if !im.iter().any(|x| x == etag || x == "*") {
return Ok(Some(StatusCode::PRECONDITION_FAILED));
}
} else if let Some(ius) = &self.if_unmodified_since {
// Step 2: if-unmodified-since is present, and if-match is absent
if v_date > *ius {
return Ok(Some(StatusCode::PRECONDITION_FAILED));
}
}
if let Some(inm) = &self.if_none_match {
// Step 3: if-none-match is present
if inm.iter().any(|x| x == etag || x == "*") {
return Ok(Some(StatusCode::NOT_MODIFIED));
}
} else if let Some(ims) = &self.if_modified_since {
// Step 4: if-modified-since is present, and if-none-match is absent
if v_date <= *ims {
return Ok(Some(StatusCode::NOT_MODIFIED));
}
}
Ok(None)
}
pub(crate) fn check_copy_source(&self, v: &ObjectVersion, etag: &str) -> Result<(), Error> {
match self.check(v, etag)? {
Some(_) => Err(Error::PreconditionFailed),
None => Ok(()),
}
}
}

View file

@ -14,8 +14,9 @@ mod list;
mod multipart;
mod post_object;
mod put;
pub mod website;
mod website;
mod checksum;
mod encryption;
mod router;
pub mod xml;

View file

@ -1,10 +1,12 @@
use quick_xml::de::from_reader;
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
@ -14,6 +16,7 @@ use garage_model::bucket_table::{
parse_lifecycle_date, Bucket, LifecycleExpiration as GarageLifecycleExpiration,
LifecycleFilter as GarageLifecycleFilter, LifecycleRule as GarageLifecycleRule,
};
use garage_util::data::*;
pub async fn handle_get_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
@ -53,6 +56,7 @@ pub async fn handle_delete_lifecycle(ctx: ReqCtx) -> Result<Response<ResBody>, E
pub async fn handle_put_lifecycle(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -61,7 +65,11 @@ pub async fn handle_put_lifecycle(
..
} = ctx;
let body = req.into_body().collect().await?;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let conf: LifecycleConfiguration = from_reader(&body as &[u8])?;
let config = conf

View file

@ -1,20 +1,13 @@
use std::collections::HashMap;
use std::convert::{TryFrom, TryInto};
use std::hash::Hasher;
use std::convert::TryInto;
use std::sync::Arc;
use base64::prelude::*;
use crc32c::Crc32cHasher as Crc32c;
use crc32fast::Hasher as Crc32;
use futures::prelude::*;
use hyper::{Request, Response};
use md5::{Digest, Md5};
use sha1::Sha1;
use sha2::Sha256;
use garage_table::*;
use garage_util::data::*;
use garage_util::error::OkOrMessage;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
@ -23,9 +16,10 @@ use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::put::*;
@ -49,7 +43,7 @@ pub async fn handle_create_multipart_upload(
let upload_id = gen_uuid();
let timestamp = next_timestamp(existing_object.as_ref());
let headers = extract_metadata_headers(req.headers())?;
let headers = get_headers(req.headers())?;
let meta = ObjectVersionMetaInner {
headers,
checksum: None,
@ -100,6 +94,7 @@ pub async fn handle_put_part(
key: &str,
part_number: u64,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx { garage, .. } = &ctx;
@ -110,30 +105,17 @@ pub async fn handle_put_part(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
sha256: None,
sha256: content_sha256,
extra: request_checksum_value(req.headers())?,
};
// Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string();
let (req_head, mut req_body) = req.into_parts();
// Before we stream the body, configure the needed checksums.
req_body.add_expected_checksums(expected_checksums.clone());
// TODO: avoid parsing encryption headers twice...
if !EncryptionParams::new_from_headers(&garage, &req_head.headers)?.is_encrypted() {
// For non-encrypted objects, we need to compute the md5sum in all cases
// (even if content-md5 is not set), because it is used as an etag of the
// part, which is in turn used in the etag computation of the whole object
req_body.add_md5();
}
let (stream, stream_checksums) = req_body.streaming_with_checksums();
let stream = stream.map_err(Error::from);
let (req_head, req_body) = req.into_parts();
let stream = body_stream(req_body);
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
// Read first chuck, and at the same time try to get object to see if it exists
let ((_, object_version, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
@ -190,21 +172,21 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
let (total_size, _, _) = read_and_put_blocks(
let checksummer =
Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm);
let (total_size, checksums, _) = read_and_put_blocks(
&ctx,
&version,
encryption,
part_number,
first_block,
chunker,
Checksummer::new(),
&mut chunker,
checksummer,
)
.await?;
// Verify that checksums match
let checksums = stream_checksums
.await
.ok_or_internal_error("checksum calculation")??;
// Verify that checksums map
checksums.verify(&expected_checksums)?;
// Store part etag in version
let etag = encryption.etag_from_md5(&checksums.md5);
@ -266,6 +248,7 @@ pub async fn handle_complete_multipart_upload(
req: Request<ReqBody>,
key: &str,
upload_id: &str,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -277,7 +260,11 @@ pub async fn handle_complete_multipart_upload(
let expected_checksum = request_checksum_value(&req_head.headers)?;
let body = req_body.collect().await?;
let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let body_xml = roxmltree::Document::parse(std::str::from_utf8(&body)?)?;
let body_list_of_parts = parse_complete_multipart_upload_body(&body_xml)
@ -615,99 +602,3 @@ fn parse_complete_multipart_upload_body(
Some(parts)
}
// ====== checksummer ====
#[derive(Default)]
pub(crate) struct MultipartChecksummer {
pub md5: Md5,
pub extra: Option<MultipartExtraChecksummer>,
}
pub(crate) enum MultipartExtraChecksummer {
Crc32(Crc32),
Crc32c(Crc32c),
Sha1(Sha1),
Sha256(Sha256),
}
impl MultipartChecksummer {
pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self {
Self {
md5: Md5::new(),
extra: match algo {
None => None,
Some(ChecksumAlgorithm::Crc32) => {
Some(MultipartExtraChecksummer::Crc32(Crc32::new()))
}
Some(ChecksumAlgorithm::Crc32c) => {
Some(MultipartExtraChecksummer::Crc32c(Crc32c::default()))
}
Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())),
Some(ChecksumAlgorithm::Sha256) => {
Some(MultipartExtraChecksummer::Sha256(Sha256::new()))
}
},
}
}
pub(crate) fn update(
&mut self,
etag: &str,
checksum: Option<ChecksumValue>,
) -> Result<(), Error> {
self.md5
.update(&hex::decode(&etag).ok_or_message("invalid etag hex")?);
match (&mut self.extra, checksum) {
(None, _) => (),
(
Some(MultipartExtraChecksummer::Crc32(ref mut crc32)),
Some(ChecksumValue::Crc32(x)),
) => {
crc32.update(&x);
}
(
Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)),
Some(ChecksumValue::Crc32c(x)),
) => {
crc32c.write(&x);
}
(Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => {
sha1.update(&x);
}
(
Some(MultipartExtraChecksummer::Sha256(ref mut sha256)),
Some(ChecksumValue::Sha256(x)),
) => {
sha256.update(&x);
}
(Some(_), b) => {
return Err(Error::internal_error(format!(
"part checksum was not computed correctly, got: {:?}",
b
)))
}
}
Ok(())
}
pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) {
let md5 = self.md5.finalize()[..].try_into().unwrap();
let extra = match self.extra {
None => None,
Some(MultipartExtraChecksummer::Crc32(crc32)) => {
Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize())))
}
Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c(
u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()),
)),
Some(MultipartExtraChecksummer::Sha1(sha1)) => {
Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap()))
}
Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256(
sha256.finalize()[..].try_into().unwrap(),
)),
};
(md5, extra)
}
}

View file

@ -18,13 +18,13 @@ use garage_model::s3::object_table::*;
use garage_api_common::cors::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::checksum::*;
use garage_api_common::signature::payload::{verify_v4, Authorization};
use crate::api_server::ResBody;
use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::put::{extract_metadata_headers, save_stream, ChecksumMode};
use crate::put::{get_headers, save_stream, ChecksumMode};
use crate::xml as s3_xml;
pub async fn handle_post_object(
@ -216,9 +216,8 @@ pub async fn handle_post_object(
// if we ever start supporting ACLs, we likely want to map "acl" to x-amz-acl" somewhere
// around here to make sure the rest of the machinery takes our acl into account.
let headers = extract_metadata_headers(&params)?;
let headers = get_headers(&params)?;
let checksum_algorithm = request_checksum_algorithm(&params)?;
let expected_checksums = ExpectedChecksums {
md5: params
.get("content-md5")
@ -226,9 +225,7 @@ pub async fn handle_post_object(
.transpose()?
.map(str::to_string),
sha256: None,
extra: checksum_algorithm
.map(|algo| extract_checksum_value(&params, algo))
.transpose()?,
extra: request_checksum_algorithm_value(&params)?,
};
let meta = ObjectVersionMetaInner {

View file

@ -31,13 +31,11 @@ use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::body::StreamingChecksumReceiver;
use garage_api_common::signature::checksum::*;
use crate::api_server::{ReqBody, ResBody};
use crate::checksum::*;
use crate::encryption::EncryptionParams;
use crate::error::*;
use crate::website::X_AMZ_WEBSITE_REDIRECT_LOCATION;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
@ -50,10 +48,6 @@ pub(crate) struct SaveStreamResult {
pub(crate) enum ChecksumMode<'a> {
Verify(&'a ExpectedChecksums),
VerifyFrom {
checksummer: StreamingChecksumReceiver,
trailer_algo: Option<ChecksumAlgorithm>,
},
Calculate(Option<ChecksumAlgorithm>),
}
@ -61,9 +55,10 @@ pub async fn handle_put(
ctx: ReqCtx,
req: Request<ReqBody>,
key: &String,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
// Retrieve interesting headers from request
let headers = extract_metadata_headers(req.headers())?;
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
let expected_checksums = ExpectedChecksums {
@ -71,10 +66,9 @@ pub async fn handle_put(
Some(x) => Some(x.to_str()?.to_string()),
None => None,
},
sha256: None,
sha256: content_sha256,
extra: request_checksum_value(req.headers())?,
};
let trailer_checksum_algorithm = request_trailer_checksum_algorithm(req.headers())?;
let meta = ObjectVersionMetaInner {
headers,
@ -84,19 +78,7 @@ pub async fn handle_put(
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
// The request body is a special ReqBody object (see garage_api_common::signature::body)
// which supports calculating checksums while streaming the data.
// Before we start streaming, we configure it to calculate all the checksums we need.
let mut req_body = req.into_body();
req_body.add_expected_checksums(expected_checksums.clone());
if !encryption.is_encrypted() {
// For non-encrypted objects, we need to compute the md5sum in all cases
// (even if content-md5 is not set), because it is used as the object etag
req_body.add_md5();
}
let (stream, checksummer) = req_body.streaming_with_checksums();
let stream = stream.map_err(Error::from);
let stream = body_stream(req.into_body());
let res = save_stream(
&ctx,
@ -104,10 +86,7 @@ pub async fn handle_put(
encryption,
stream,
key,
ChecksumMode::VerifyFrom {
checksummer,
trailer_algo: trailer_checksum_algorithm,
},
ChecksumMode::Verify(&expected_checksums),
)
.await?;
@ -143,15 +122,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref());
let mut checksummer = match &checksum_mode {
let mut checksummer = match checksum_mode {
ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()),
ChecksumMode::Calculate(algo) => {
Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(*algo)
}
ChecksumMode::VerifyFrom { .. } => {
// Checksums are calculated by the garage_api_common::signature module
// so here we can just have an empty checksummer that does nothing
Checksummer::new()
Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo)
}
};
@ -159,7 +133,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// as "inline data". We can then return immediately.
if first_block.len() < INLINE_THRESHOLD {
checksummer.update(&first_block);
let mut checksums = checksummer.finalize();
let checksums = checksummer.finalize();
match checksum_mode {
ChecksumMode::Verify(expected) => {
@ -168,18 +142,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo);
}
ChecksumMode::VerifyFrom {
checksummer,
trailer_algo,
} => {
drop(chunker);
checksums = checksummer
.await
.ok_or_internal_error("checksum calculation")??;
if let Some(algo) = trailer_algo {
meta.checksum = checksums.extract(Some(algo));
}
}
};
let size = first_block.len() as u64;
@ -251,13 +213,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
garage.version_table.insert(&version).await?;
// Transfer data
let (total_size, mut checksums, first_block_hash) = read_and_put_blocks(
let (total_size, checksums, first_block_hash) = read_and_put_blocks(
ctx,
&version,
encryption,
1,
first_block,
chunker,
&mut chunker,
checksummer,
)
.await?;
@ -270,17 +232,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ChecksumMode::Calculate(algo) => {
meta.checksum = checksums.extract(algo);
}
ChecksumMode::VerifyFrom {
checksummer,
trailer_algo,
} => {
checksums = checksummer
.await
.ok_or_internal_error("checksum calculation")??;
if let Some(algo) = trailer_algo {
meta.checksum = checksums.extract(Some(algo));
}
}
};
// Verify quotas are respsected
@ -381,7 +332,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
encryption: EncryptionParams,
part_number: u64,
first_block: Bytes,
mut chunker: StreamChunker<S>,
chunker: &mut StreamChunker<S>,
checksummer: Checksummer,
) -> Result<(u64, Checksums, Hash), Error> {
let tracer = opentelemetry::global::tracer("garage");
@ -650,9 +601,7 @@ impl Drop for InterruptedCleanup {
// ============ helpers ============
pub(crate) fn extract_metadata_headers(
headers: &HeaderMap<HeaderValue>,
) -> Result<HeaderList, Error> {
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> {
let mut ret = Vec::new();
// Preserve standard headers
@ -678,18 +627,6 @@ pub(crate) fn extract_metadata_headers(
std::str::from_utf8(value.as_bytes())?.to_string(),
));
}
if name == X_AMZ_WEBSITE_REDIRECT_LOCATION {
let value = std::str::from_utf8(value.as_bytes())?.to_string();
if !(value.starts_with("/")
|| value.starts_with("http://")
|| value.starts_with("https://"))
{
return Err(Error::bad_request(format!(
"Invalid {X_AMZ_WEBSITE_REDIRECT_LOCATION} header",
)));
}
ret.push((X_AMZ_WEBSITE_REDIRECT_LOCATION.to_string(), value));
}
}
Ok(ret)

View file

@ -1,19 +1,19 @@
use quick_xml::de::from_reader;
use hyper::{header::HeaderName, Request, Response, StatusCode};
use http_body_util::BodyExt;
use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use garage_model::bucket_table::*;
use garage_util::data::*;
use garage_api_common::helpers::*;
use garage_api_common::signature::verify_signed_content;
use crate::api_server::{ReqBody, ResBody};
use crate::error::*;
use crate::xml::{to_xml_with_header, xmlns_tag, IntValue, Value};
pub const X_AMZ_WEBSITE_REDIRECT_LOCATION: HeaderName =
HeaderName::from_static("x-amz-website-redirect-location");
pub async fn handle_get_website(ctx: ReqCtx) -> Result<Response<ResBody>, Error> {
let ReqCtx { bucket_params, .. } = ctx;
if let Some(website) = bucket_params.website_config.get() {
@ -61,6 +61,7 @@ pub async fn handle_delete_website(ctx: ReqCtx) -> Result<Response<ResBody>, Err
pub async fn handle_put_website(
ctx: ReqCtx,
req: Request<ReqBody>,
content_sha256: Option<Hash>,
) -> Result<Response<ResBody>, Error> {
let ReqCtx {
garage,
@ -69,7 +70,11 @@ pub async fn handle_put_website(
..
} = ctx;
let body = req.into_body().collect().await?;
let body = BodyExt::collect(req.into_body()).await?.to_bytes();
if let Some(content_sha256) = content_sha256 {
verify_signed_content(content_sha256, &body[..])?;
}
let conf: WebsiteConfiguration = from_reader(&body as &[u8])?;
conf.validate()?;

View file

@ -71,12 +71,10 @@ hyper-util.workspace = true
mktemp.workspace = true
sha2.workspace = true
static_init.workspace = true
assert-json-diff.workspace = true
serde_json.workspace = true
base64.workspace = true
crc32fast.workspace = true
k2v-client.workspace = true

View file

@ -12,7 +12,7 @@ pub fn build_client(key: &Key) -> Client {
.endpoint_url(format!("http://127.0.0.1:{}", DEFAULT_PORT))
.region(super::REGION)
.credentials_provider(credentials)
.behavior_version(BehaviorVersion::v2024_03_28())
.behavior_version(BehaviorVersion::v2023_11_09())
.build();
Client::from_conf(config)

View file

@ -192,13 +192,16 @@ impl<'a> RequestBuilder<'a> {
.collect::<HeaderMap>();
let date = now.format(signature::LONG_DATETIME).to_string();
all_headers.insert(signature::X_AMZ_DATE, HeaderValue::from_str(&date).unwrap());
all_headers.insert(
signature::payload::X_AMZ_DATE,
HeaderValue::from_str(&date).unwrap(),
);
all_headers.insert(HOST, HeaderValue::from_str(&host).unwrap());
let body_sha = match &self.body_signature {
let body_sha = match self.body_signature {
BodySignature::Unsigned => "UNSIGNED-PAYLOAD".to_owned(),
BodySignature::Classic => hex::encode(garage_util::data::sha256sum(&self.body)),
BodySignature::Streaming { chunk_size } => {
BodySignature::Streaming(size) => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
@ -213,59 +216,18 @@ impl<'a> RequestBuilder<'a> {
// code.
all_headers.insert(
CONTENT_LENGTH,
to_streaming_body(
&self.body,
*chunk_size,
String::new(),
signer.clone(),
now,
"",
)
.len()
.to_string()
.try_into()
.unwrap(),
to_streaming_body(&self.body, size, String::new(), signer.clone(), now, "")
.len()
.to_string()
.try_into()
.unwrap(),
);
"STREAMING-AWS4-HMAC-SHA256-PAYLOAD".to_owned()
}
BodySignature::StreamingUnsignedTrailer {
chunk_size,
trailer_algorithm,
trailer_value,
} => {
all_headers.insert(
CONTENT_ENCODING,
HeaderValue::from_str("aws-chunked").unwrap(),
);
all_headers.insert(
HeaderName::from_static("x-amz-decoded-content-length"),
HeaderValue::from_str(&self.body.len().to_string()).unwrap(),
);
all_headers.insert(
HeaderName::from_static("x-amz-trailer"),
HeaderValue::from_str(&trailer_algorithm).unwrap(),
);
all_headers.insert(
CONTENT_LENGTH,
to_streaming_unsigned_trailer_body(
&self.body,
*chunk_size,
&trailer_algorithm,
&trailer_value,
)
.len()
.to_string()
.try_into()
.unwrap(),
);
"STREAMING-UNSIGNED-PAYLOAD-TRAILER".to_owned()
}
};
all_headers.insert(
signature::X_AMZ_CONTENT_SHA256,
signature::payload::X_AMZ_CONTENT_SH256,
HeaderValue::from_str(&body_sha).unwrap(),
);
@ -314,26 +276,10 @@ impl<'a> RequestBuilder<'a> {
let mut request = Request::builder();
*request.headers_mut().unwrap() = all_headers;
let body = match &self.body_signature {
BodySignature::Streaming { chunk_size } => to_streaming_body(
&self.body,
*chunk_size,
signature,
streaming_signer,
now,
&scope,
),
BodySignature::StreamingUnsignedTrailer {
chunk_size,
trailer_algorithm,
trailer_value,
} => to_streaming_unsigned_trailer_body(
&self.body,
*chunk_size,
&trailer_algorithm,
&trailer_value,
),
_ => self.body.clone(),
let body = if let BodySignature::Streaming(size) = self.body_signature {
to_streaming_body(&self.body, size, signature, streaming_signer, now, &scope)
} else {
self.body.clone()
};
let request = request
.uri(uri)
@ -362,14 +308,7 @@ impl<'a> RequestBuilder<'a> {
pub enum BodySignature {
Unsigned,
Classic,
Streaming {
chunk_size: usize,
},
StreamingUnsignedTrailer {
chunk_size: usize,
trailer_algorithm: String,
trailer_value: String,
},
Streaming(usize),
}
fn query_param_to_string(params: &HashMap<String, Option<String>>) -> String {
@ -424,26 +363,3 @@ fn to_streaming_body(
res
}
fn to_streaming_unsigned_trailer_body(
body: &[u8],
chunk_size: usize,
trailer_algorithm: &str,
trailer_value: &str,
) -> Vec<u8> {
let mut res = Vec::with_capacity(body.len());
for chunk in body.chunks(chunk_size) {
let header = format!("{:x}\r\n", chunk.len());
res.extend_from_slice(header.as_bytes());
res.extend_from_slice(chunk);
res.extend_from_slice(b"\r\n");
}
res.extend_from_slice(b"0\r\n");
res.extend_from_slice(trailer_algorithm.as_bytes());
res.extend_from_slice(b":");
res.extend_from_slice(trailer_value.as_bytes());
res.extend_from_slice(b"\n\r\n\r\n");
res
}

View file

@ -99,10 +99,7 @@ api_bind_addr = "127.0.0.1:{admin_port}"
.arg("server")
.stdout(stdout)
.stderr(stderr)
.env(
"RUST_LOG",
"garage=debug,garage_api_common=trace,garage_api_s3=trace",
)
.env("RUST_LOG", "garage=debug,garage_api=trace")
.spawn()
.expect("Could not start garage");

View file

@ -1,6 +1,5 @@
use crate::common;
use aws_sdk_s3::error::SdkError;
use aws_sdk_s3::primitives::{ByteStream, DateTime};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{Delete, ObjectIdentifier};
const STD_KEY: &str = "hello world";
@ -126,129 +125,6 @@ async fn test_putobject() {
}
}
#[tokio::test]
async fn test_precondition() {
let ctx = common::context();
let bucket = ctx.create_bucket("precondition");
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let etag2 = "\"ae4984b984cd984fe98d4efa954dce98\"";
let data = ByteStream::from_static(BODY);
let r = ctx
.client
.put_object()
.bucket(&bucket)
.key(STD_KEY)
.body(data)
.send()
.await
.unwrap();
assert_eq!(r.e_tag.unwrap().as_str(), etag);
let last_modified;
{
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_match(etag)
.send()
.await
.unwrap();
assert_eq!(o.e_tag.as_ref().unwrap().as_str(), etag);
last_modified = o.last_modified.unwrap();
let err = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_match(etag2)
.send()
.await;
assert!(
matches!(err, Err(SdkError::ServiceError(se)) if se.raw().status().as_u16() == 412)
);
}
{
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_none_match(etag2)
.send()
.await
.unwrap();
assert_eq!(o.e_tag.as_ref().unwrap().as_str(), etag);
let err = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_none_match(etag)
.send()
.await;
assert!(
matches!(err, Err(SdkError::ServiceError(se)) if se.raw().status().as_u16() == 304)
);
}
let older_date = DateTime::from_secs_f64(last_modified.as_secs_f64() - 10.0);
let newer_date = DateTime::from_secs_f64(last_modified.as_secs_f64() + 10.0);
{
let err = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_modified_since(newer_date)
.send()
.await;
assert!(
matches!(err, Err(SdkError::ServiceError(se)) if se.raw().status().as_u16() == 304)
);
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_modified_since(older_date)
.send()
.await
.unwrap();
assert_eq!(o.e_tag.as_ref().unwrap().as_str(), etag);
}
{
let err = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_unmodified_since(older_date)
.send()
.await;
assert!(
matches!(err, Err(SdkError::ServiceError(se)) if se.raw().status().as_u16() == 412)
);
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.if_unmodified_since(newer_date)
.send()
.await
.unwrap();
assert_eq!(o.e_tag.as_ref().unwrap().as_str(), etag);
}
}
#[tokio::test]
async fn test_getobject() {
let ctx = common::context();
@ -313,14 +189,12 @@ async fn test_getobject() {
#[tokio::test]
async fn test_metadata() {
use aws_sdk_s3::primitives::{DateTime, DateTimeFormat};
let ctx = common::context();
let bucket = ctx.create_bucket("testmetadata");
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let exp = DateTime::from_secs(10000000000);
let exp2 = DateTime::from_secs(10000500000);
let exp = aws_sdk_s3::primitives::DateTime::from_secs(10000000000);
let exp2 = aws_sdk_s3::primitives::DateTime::from_secs(10000500000);
{
// Note. The AWS client SDK adds a Content-Type header
@ -353,7 +227,7 @@ async fn test_metadata() {
assert_eq!(o.content_disposition, None);
assert_eq!(o.content_encoding, None);
assert_eq!(o.content_language, None);
assert_eq!(o.expires_string, None);
assert_eq!(o.expires, None);
assert_eq!(o.metadata.unwrap_or_default().len(), 0);
let o = ctx
@ -376,10 +250,7 @@ async fn test_metadata() {
assert_eq!(o.content_disposition.unwrap().as_str(), "cddummy");
assert_eq!(o.content_encoding.unwrap().as_str(), "cedummy");
assert_eq!(o.content_language.unwrap().as_str(), "cldummy");
assert_eq!(
o.expires_string.unwrap(),
exp.fmt(DateTimeFormat::HttpDate).unwrap()
);
assert_eq!(o.expires.unwrap(), exp);
}
{
@ -417,10 +288,7 @@ async fn test_metadata() {
assert_eq!(o.content_disposition.unwrap().as_str(), "cdtest");
assert_eq!(o.content_encoding.unwrap().as_str(), "cetest");
assert_eq!(o.content_language.unwrap().as_str(), "cltest");
assert_eq!(
o.expires_string.unwrap(),
exp2.fmt(DateTimeFormat::HttpDate).unwrap()
);
assert_eq!(o.expires.unwrap(), exp2);
let mut meta = o.metadata.unwrap();
assert_eq!(meta.remove("testmeta").unwrap(), "hello people");
assert_eq!(meta.remove("nice-unicode-meta").unwrap(), "宅配便");
@ -446,10 +314,7 @@ async fn test_metadata() {
assert_eq!(o.content_disposition.unwrap().as_str(), "cddummy");
assert_eq!(o.content_encoding.unwrap().as_str(), "cedummy");
assert_eq!(o.content_language.unwrap().as_str(), "cldummy");
assert_eq!(
o.expires_string.unwrap(),
exp.fmt(DateTimeFormat::HttpDate).unwrap()
);
assert_eq!(o.expires.unwrap(), exp);
}
}

View file

@ -1,8 +1,5 @@
use std::collections::HashMap;
use base64::prelude::*;
use crc32fast::Hasher as Crc32;
use crate::common;
use crate::common::ext::CommandExt;
use common::custom_requester::BodySignature;
@ -24,7 +21,7 @@ async fn test_putobject_streaming() {
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
let res = ctx
let _ = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@ -32,11 +29,10 @@ async fn test_putobject_streaming() {
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
.body_signature(BodySignature::Streaming { chunk_size: 10 })
.body_signature(BodySignature::Streaming(10))
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
@ -69,14 +65,7 @@ async fn test_putobject_streaming() {
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let mut crc32 = Crc32::new();
crc32.update(&BODY[..]);
let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
let mut headers = HashMap::new();
headers.insert("x-amz-checksum-crc32".to_owned(), crc32.clone());
let res = ctx
let _ = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
@ -84,13 +73,11 @@ async fn test_putobject_streaming() {
//fail
.path("abc".to_owned())
.vhost_style(true)
.signed_headers(headers)
.body(BODY.to_vec())
.body_signature(BodySignature::Streaming { chunk_size: 16 })
.body_signature(BodySignature::Streaming(16))
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
@ -101,7 +88,6 @@ async fn test_putobject_streaming() {
.bucket(&bucket)
//.key(CTRL_KEY)
.key("abc")
.checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
.send()
.await
.unwrap();
@ -112,142 +98,6 @@ async fn test_putobject_streaming() {
assert_eq!(o.content_length.unwrap(), 62);
assert_eq!(o.parts_count, None);
assert_eq!(o.tag_count, None);
assert_eq!(o.checksum_crc32.unwrap(), crc32);
}
}
#[tokio::test]
async fn test_putobject_streaming_unsigned_trailer() {
let ctx = common::context();
let bucket = ctx.create_bucket("putobject-streaming-unsigned-trailer");
{
// Send an empty object (can serve as a directory marker)
// with a content type
let etag = "\"d41d8cd98f00b204e9800998ecf8427e\"";
let content_type = "text/csv";
let mut headers = HashMap::new();
headers.insert("content-type".to_owned(), content_type.to_owned());
let empty_crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(Crc32::new().finalize())[..]);
let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
.path(STD_KEY.to_owned())
.signed_headers(headers)
.vhost_style(true)
.body(vec![])
.body_signature(BodySignature::StreamingUnsignedTrailer {
chunk_size: 10,
trailer_algorithm: "x-amz-checksum-crc32".into(),
trailer_value: empty_crc32,
})
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// We return a version ID here
// We should check if Amazon is returning one when versioning is not enabled
// assert!(r.version_id.is_some());
//let _version = r.version_id.unwrap();
let o = ctx
.client
.get_object()
.bucket(&bucket)
.key(STD_KEY)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, b"");
assert_eq!(o.e_tag.unwrap(), etag);
// We do not return version ID
// We should check if Amazon is returning one when versioning is not enabled
// assert_eq!(o.version_id.unwrap(), _version);
assert_eq!(o.content_type.unwrap(), content_type);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length.unwrap(), 0);
assert_eq!(o.parts_count, None);
assert_eq!(o.tag_count, None);
}
{
let etag = "\"46cf18a9b447991b450cad3facf5937e\"";
let mut crc32 = Crc32::new();
crc32.update(&BODY[..]);
let crc32 = BASE64_STANDARD.encode(&u32::to_be_bytes(crc32.finalize())[..]);
// try sending with wrong crc32, check that it fails
let err_res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
//.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
//fail
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
.body_signature(BodySignature::StreamingUnsignedTrailer {
chunk_size: 16,
trailer_algorithm: "x-amz-checksum-crc32".into(),
trailer_value: "2Yp9Yw==".into(),
})
.send()
.await
.unwrap();
assert!(
err_res.status().is_client_error(),
"got response: {:?}",
err_res
);
let res = ctx
.custom_request
.builder(bucket.clone())
.method(Method::PUT)
//.path(CTRL_KEY.to_owned()) at the moment custom_request does not encode url so this
//fail
.path("abc".to_owned())
.vhost_style(true)
.body(BODY.to_vec())
.body_signature(BodySignature::StreamingUnsignedTrailer {
chunk_size: 16,
trailer_algorithm: "x-amz-checksum-crc32".into(),
trailer_value: crc32.clone(),
})
.send()
.await
.unwrap();
assert!(res.status().is_success(), "got response: {:?}", res);
// assert_eq!(r.e_tag.unwrap().as_str(), etag);
// assert!(r.version_id.is_some());
let o = ctx
.client
.get_object()
.bucket(&bucket)
//.key(CTRL_KEY)
.key("abc")
.checksum_mode(aws_sdk_s3::types::ChecksumMode::Enabled)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, BODY);
assert_eq!(o.e_tag.unwrap(), etag);
assert!(o.last_modified.is_some());
assert_eq!(o.content_length.unwrap(), 62);
assert_eq!(o.parts_count, None);
assert_eq!(o.tag_count, None);
assert_eq!(o.checksum_crc32.unwrap(), crc32);
}
}
@ -269,7 +119,7 @@ async fn test_create_bucket_streaming() {
.custom_request
.builder(bucket.to_owned())
.method(Method::PUT)
.body_signature(BodySignature::Streaming { chunk_size: 10 })
.body_signature(BodySignature::Streaming(10))
.send()
.await
.unwrap();
@ -324,7 +174,7 @@ async fn test_put_website_streaming() {
.method(Method::PUT)
.query_params(query)
.body(website_config.as_bytes().to_vec())
.body_signature(BodySignature::Streaming { chunk_size: 10 })
.body_signature(BodySignature::Streaming(10))
.send()
.await
.unwrap();

View file

@ -11,7 +11,6 @@ use http::{Request, StatusCode};
use http_body_util::BodyExt;
use http_body_util::Full as FullBody;
use hyper::body::Bytes;
use hyper::header::LOCATION;
use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use serde_json::json;
@ -296,33 +295,6 @@ async fn test_website_s3_api() {
);
}
// Test x-amz-website-redirect-location
{
ctx.client
.put_object()
.bucket(&bucket)
.key("test-redirect.html")
.website_redirect_location("https://perdu.com")
.send()
.await
.unwrap();
let req = Request::builder()
.method("GET")
.uri(format!(
"http://127.0.0.1:{}/test-redirect.html",
ctx.garage.web_port
))
.header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::new(Bytes::new()))
.unwrap();
let resp = client.request(req).await.unwrap();
assert_eq!(resp.status(), StatusCode::MOVED_PERMANENTLY);
assert_eq!(resp.headers().get(LOCATION).unwrap(), "https://perdu.com");
}
// Test CORS with an allowed preflight request
{
let req = Request::builder()

View file

@ -650,11 +650,8 @@ impl LayoutVersion {
let mut cost = CostFunction::new();
for (p, assoc_p) in prev_assign.iter().enumerate() {
for n in assoc_p.iter() {
if let Some(&node_zone) =
zone_to_id.get(self.expect_get_node_zone(&self.node_id_vec[*n]))
{
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
}
let node_zone = zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])];
cost.insert((Vertex::PZ(p, node_zone), Vertex::N(*n)), -1);
}
}
@ -754,11 +751,8 @@ impl LayoutVersion {
if let Some(prev_assign) = prev_assign_opt {
let mut old_zones_of_p = Vec::<usize>::new();
for n in prev_assign[p].iter() {
if let Some(&zone_id) =
zone_to_id.get(self.expect_get_node_zone(&self.node_id_vec[*n]))
{
old_zones_of_p.push(zone_id);
}
old_zones_of_p
.push(zone_to_id[self.expect_get_node_zone(&self.node_id_vec[*n])]);
}
if !old_zones_of_p.contains(&z) {
new_partitions_zone[z] += 1;

View file

@ -1,13 +1,13 @@
use std::fs::{self, Permissions};
use std::os::unix::prelude::PermissionsExt;
use std::sync::Arc;
use std::{convert::Infallible, sync::Arc};
use tokio::net::{TcpListener, UnixListener};
use tokio::sync::watch;
use hyper::{
body::Incoming as IncomingBody,
header::{HeaderValue, HOST, LOCATION},
header::{HeaderValue, HOST},
Method, Request, Response, StatusCode,
};
@ -29,7 +29,6 @@ use garage_api_s3::error::{
CommonErrorDerivative, Error as ApiError, OkOrBadRequest, OkOrInternalError,
};
use garage_api_s3::get::{handle_get_without_ctx, handle_head_without_ctx};
use garage_api_s3::website::X_AMZ_WEBSITE_REDIRECT_LOCATION;
use garage_model::garage::Garage;
@ -164,8 +163,6 @@ impl WebServer {
metrics_tags.push(KeyValue::new("host", host_header.clone()));
}
let req = req.map(|_| ());
// The actual handler
let res = self
.serve_file(&req)
@ -221,7 +218,7 @@ impl WebServer {
async fn serve_file(
self: &Arc<Self>,
req: &Request<()>,
req: &Request<IncomingBody>,
) -> Result<Response<BoxBody<ApiError>>, Error> {
// Get http authority string (eg. [::1]:3902 or garage.tld:80)
let authority = req
@ -295,15 +292,7 @@ impl WebServer {
{
Ok(Response::builder()
.status(StatusCode::FOUND)
.header(LOCATION, url)
.body(empty_body())
.unwrap())
}
(Ok(ret), _) if ret.headers().contains_key(X_AMZ_WEBSITE_REDIRECT_LOCATION) => {
let redirect_location = ret.headers().get(X_AMZ_WEBSITE_REDIRECT_LOCATION).unwrap();
Ok(Response::builder()
.status(StatusCode::MOVED_PERMANENTLY)
.header(LOCATION, redirect_location)
.header("Location", url)
.body(empty_body())
.unwrap())
}
@ -333,7 +322,7 @@ impl WebServer {
// Create a fake HTTP request with path = the error document
let req2 = Request::builder()
.uri(format!("http://{}/{}", host, &error_document))
.body(())
.body(empty_body::<Infallible>())
.unwrap();
match handle_get_without_ctx(