forked from Deuxfleurs/garage
[s3-checksum] implement x-amz-checksum-* headers
This commit is contained in:
parent
7e0107c47d
commit
74949c69cb
22 changed files with 1227 additions and 340 deletions
8
Cargo.lock
generated
8
Cargo.lock
generated
|
@ -905,9 +905,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "crc32fast"
|
||||
version = "1.3.2"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"
|
||||
checksum = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa"
|
||||
dependencies = [
|
||||
"cfg-if",
|
||||
]
|
||||
|
@ -1346,6 +1346,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_bytes",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"sha2",
|
||||
"static_init",
|
||||
"structopt",
|
||||
|
@ -1367,6 +1368,8 @@ dependencies = [
|
|||
"base64 0.21.7",
|
||||
"bytes",
|
||||
"chrono",
|
||||
"crc32c",
|
||||
"crc32fast",
|
||||
"crypto-common",
|
||||
"err-derive",
|
||||
"form_urlencoded",
|
||||
|
@ -1400,6 +1403,7 @@ dependencies = [
|
|||
"serde",
|
||||
"serde_bytes",
|
||||
"serde_json",
|
||||
"sha1",
|
||||
"sha2",
|
||||
"tokio",
|
||||
"tokio-stream",
|
||||
|
|
16
Cargo.nix
16
Cargo.nix
|
@ -34,7 +34,7 @@ args@{
|
|||
ignoreLockHash,
|
||||
}:
|
||||
let
|
||||
nixifiedLockHash = "f73523af24b5164222da0a1c326ba65fa4a01b55751dd9ddab251334cfe20d13";
|
||||
nixifiedLockHash = "a49da9d5ef560672a34c1e004c0122e706a74fac512300f20858f136cd00582e";
|
||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||
lockHashIgnored = if ignoreLockHash
|
||||
|
@ -674,7 +674,7 @@ in
|
|||
aws_smithy_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-types."1.1.4" { inherit profileName; }).out;
|
||||
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out;
|
||||
crc32c = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32c."0.6.4" { inherit profileName; }).out;
|
||||
crc32fast = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.3.2" { inherit profileName; }).out;
|
||||
crc32fast = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.4.0" { inherit profileName; }).out;
|
||||
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
|
||||
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.11" { inherit profileName; }).out;
|
||||
http_body = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body."0.4.6" { inherit profileName; }).out;
|
||||
|
@ -694,7 +694,7 @@ in
|
|||
dependencies = {
|
||||
aws_smithy_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aws-smithy-types."1.1.4" { inherit profileName; }).out;
|
||||
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out;
|
||||
crc32fast = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.3.2" { inherit profileName; }).out;
|
||||
crc32fast = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.4.0" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
|
@ -1287,11 +1287,11 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.3.2" = overridableMkRustCrate (profileName: rec {
|
||||
"registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.4.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "crc32fast";
|
||||
version = "1.3.2";
|
||||
version = "1.4.0";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "b540bd8bc810d3885c6ea91e2018302f68baba2129ab3e88f32389ee9370880d"; };
|
||||
src = fetchCratesIo { inherit name version; sha256 = "b3855a8a784b474f333699ef2bbca9db2c4a1f6d9088a90a2d25b1eb53111eaa"; };
|
||||
features = builtins.concatLists [
|
||||
[ "default" ]
|
||||
[ "std" ]
|
||||
|
@ -1958,6 +1958,7 @@ in
|
|||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
|
||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.14" { inherit profileName; }).out;
|
||||
sha1 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha1."0.10.6" { inherit profileName; }).out;
|
||||
structopt = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; }).out;
|
||||
timeago = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".timeago."0.4.2" { inherit profileName; }).out;
|
||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
|
||||
|
@ -2003,6 +2004,8 @@ in
|
|||
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out;
|
||||
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out;
|
||||
chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.33" { inherit profileName; }).out;
|
||||
crc32c = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32c."0.6.4" { inherit profileName; }).out;
|
||||
crc32fast = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crc32fast."1.4.0" { inherit profileName; }).out;
|
||||
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
|
||||
err_derive = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; }).out;
|
||||
form_urlencoded = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".form_urlencoded."1.2.1" { inherit profileName; }).out;
|
||||
|
@ -2036,6 +2039,7 @@ in
|
|||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
|
||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.14" { inherit profileName; }).out;
|
||||
serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.113" { inherit profileName; }).out;
|
||||
sha1 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha1."0.10.6" { inherit profileName; }).out;
|
||||
sha2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.10.8" { inherit profileName; }).out;
|
||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
|
||||
tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" { inherit profileName; }).out;
|
||||
|
|
|
@ -43,6 +43,8 @@ bytes = "1.0"
|
|||
bytesize = "1.1"
|
||||
cfg-if = "1.0"
|
||||
chrono = "0.4"
|
||||
crc32fast = "1.4"
|
||||
crc32c = "0.6"
|
||||
crypto-common = "0.1"
|
||||
digest = "0.10"
|
||||
err-derive = "0.3"
|
||||
|
@ -62,6 +64,7 @@ parse_duration = "2.1"
|
|||
pin-project = "1.0.12"
|
||||
pnet_datalink = "0.34"
|
||||
rand = "0.8"
|
||||
sha1 = "0.10"
|
||||
sha2 = "0.10"
|
||||
timeago = { version = "0.4", default-features = false }
|
||||
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
|
||||
|
|
|
@ -28,6 +28,8 @@ async-trait.workspace = true
|
|||
base64.workspace = true
|
||||
bytes.workspace = true
|
||||
chrono.workspace = true
|
||||
crc32fast.workspace = true
|
||||
crc32c.workspace = true
|
||||
crypto-common.workspace = true
|
||||
err-derive.workspace = true
|
||||
hex.workspace = true
|
||||
|
@ -37,6 +39,7 @@ tracing.workspace = true
|
|||
md-5.workspace = true
|
||||
nom.workspace = true
|
||||
pin-project.workspace = true
|
||||
sha1.workspace = true
|
||||
sha2.workspace = true
|
||||
|
||||
futures.workspace = true
|
||||
|
|
|
@ -325,7 +325,7 @@ impl ApiHandler for S3ApiServer {
|
|||
part_number_marker: part_number_marker.map(|p| p.min(10000)),
|
||||
max_parts: max_parts.unwrap_or(1000).clamp(1, 1000),
|
||||
};
|
||||
handle_list_parts(ctx, &query).await
|
||||
handle_list_parts(ctx, req, &query).await
|
||||
}
|
||||
Endpoint::DeleteObjects {} => handle_delete_objects(ctx, req, content_sha256).await,
|
||||
Endpoint::GetBucketWebsite {} => handle_get_website(ctx).await,
|
||||
|
|
406
src/api/s3/checksum.rs
Normal file
406
src/api/s3/checksum.rs
Normal file
|
@ -0,0 +1,406 @@
|
|||
use std::convert::{TryFrom, TryInto};
|
||||
use std::hash::Hasher;
|
||||
|
||||
use base64::prelude::*;
|
||||
use crc32c::Crc32cHasher as Crc32c;
|
||||
use crc32fast::Hasher as Crc32;
|
||||
use md5::{Digest, Md5};
|
||||
use sha1::Sha1;
|
||||
use sha2::Sha256;
|
||||
|
||||
use http::{HeaderMap, HeaderName, HeaderValue};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::OkOrMessage;
|
||||
|
||||
use garage_model::s3::object_table::*;
|
||||
|
||||
use crate::s3::error::*;
|
||||
|
||||
pub const X_AMZ_CHECKSUM_ALGORITHM: HeaderName =
|
||||
HeaderName::from_static("x-amz-checksum-algorithm");
|
||||
pub const X_AMZ_CHECKSUM_MODE: HeaderName = HeaderName::from_static("x-amz-checksum-mode");
|
||||
pub const X_AMZ_CHECKSUM_CRC32: HeaderName = HeaderName::from_static("x-amz-checksum-crc32");
|
||||
pub const X_AMZ_CHECKSUM_CRC32C: HeaderName = HeaderName::from_static("x-amz-checksum-crc32c");
|
||||
pub const X_AMZ_CHECKSUM_SHA1: HeaderName = HeaderName::from_static("x-amz-checksum-sha1");
|
||||
pub const X_AMZ_CHECKSUM_SHA256: HeaderName = HeaderName::from_static("x-amz-checksum-sha256");
|
||||
|
||||
pub type Crc32Checksum = [u8; 4];
|
||||
pub type Crc32cChecksum = [u8; 4];
|
||||
pub type Md5Checksum = [u8; 16];
|
||||
pub type Sha1Checksum = [u8; 20];
|
||||
pub type Sha256Checksum = [u8; 32];
|
||||
|
||||
#[derive(Debug, Default)]
|
||||
pub(crate) struct ExpectedChecksums {
|
||||
// base64-encoded md5 (content-md5 header)
|
||||
pub md5: Option<String>,
|
||||
// content_sha256 (as a Hash / FixedBytes32)
|
||||
pub sha256: Option<Hash>,
|
||||
// extra x-amz-checksum-* header
|
||||
pub extra: Option<ChecksumValue>,
|
||||
}
|
||||
|
||||
pub(crate) struct Checksummer {
|
||||
pub crc32: Option<Crc32>,
|
||||
pub crc32c: Option<Crc32c>,
|
||||
pub md5: Option<Md5>,
|
||||
pub sha1: Option<Sha1>,
|
||||
pub sha256: Option<Sha256>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct Checksums {
|
||||
pub crc32: Option<Crc32Checksum>,
|
||||
pub crc32c: Option<Crc32cChecksum>,
|
||||
pub md5: Option<Md5Checksum>,
|
||||
pub sha1: Option<Sha1Checksum>,
|
||||
pub sha256: Option<Sha256Checksum>,
|
||||
}
|
||||
|
||||
impl Checksummer {
|
||||
pub(crate) fn init(expected: &ExpectedChecksums, require_md5: bool) -> Self {
|
||||
let mut ret = Self {
|
||||
crc32: None,
|
||||
crc32c: None,
|
||||
md5: None,
|
||||
sha1: None,
|
||||
sha256: None,
|
||||
};
|
||||
|
||||
if expected.md5.is_some() || require_md5 {
|
||||
ret.md5 = Some(Md5::new());
|
||||
}
|
||||
if expected.sha256.is_some() || matches!(&expected.extra, Some(ChecksumValue::Sha256(_))) {
|
||||
ret.sha256 = Some(Sha256::new());
|
||||
}
|
||||
if matches!(&expected.extra, Some(ChecksumValue::Crc32(_))) {
|
||||
ret.crc32 = Some(Crc32::new());
|
||||
}
|
||||
if matches!(&expected.extra, Some(ChecksumValue::Crc32c(_))) {
|
||||
ret.crc32c = Some(Crc32c::default());
|
||||
}
|
||||
if matches!(&expected.extra, Some(ChecksumValue::Sha1(_))) {
|
||||
ret.sha1 = Some(Sha1::new());
|
||||
}
|
||||
ret
|
||||
}
|
||||
|
||||
pub(crate) fn add(mut self, algo: Option<ChecksumAlgorithm>) -> Self {
|
||||
match algo {
|
||||
Some(ChecksumAlgorithm::Crc32) => {
|
||||
self.crc32 = Some(Crc32::new());
|
||||
}
|
||||
Some(ChecksumAlgorithm::Crc32c) => {
|
||||
self.crc32c = Some(Crc32c::default());
|
||||
}
|
||||
Some(ChecksumAlgorithm::Sha1) => {
|
||||
self.sha1 = Some(Sha1::new());
|
||||
}
|
||||
Some(ChecksumAlgorithm::Sha256) => {
|
||||
self.sha256 = Some(Sha256::new());
|
||||
}
|
||||
None => (),
|
||||
}
|
||||
self
|
||||
}
|
||||
|
||||
pub(crate) fn update(&mut self, bytes: &[u8]) {
|
||||
if let Some(crc32) = &mut self.crc32 {
|
||||
crc32.update(bytes);
|
||||
}
|
||||
if let Some(crc32c) = &mut self.crc32c {
|
||||
crc32c.write(bytes);
|
||||
}
|
||||
if let Some(md5) = &mut self.md5 {
|
||||
md5.update(bytes);
|
||||
}
|
||||
if let Some(sha1) = &mut self.sha1 {
|
||||
sha1.update(bytes);
|
||||
}
|
||||
if let Some(sha256) = &mut self.sha256 {
|
||||
sha256.update(bytes);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn finalize(self) -> Checksums {
|
||||
Checksums {
|
||||
crc32: self.crc32.map(|x| u32::to_be_bytes(x.finalize())),
|
||||
crc32c: self
|
||||
.crc32c
|
||||
.map(|x| u32::to_be_bytes(u32::try_from(x.finish()).unwrap())),
|
||||
md5: self.md5.map(|x| x.finalize()[..].try_into().unwrap()),
|
||||
sha1: self.sha1.map(|x| x.finalize()[..].try_into().unwrap()),
|
||||
sha256: self.sha256.map(|x| x.finalize()[..].try_into().unwrap()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Checksums {
|
||||
pub fn verify(&self, expected: &ExpectedChecksums) -> Result<(), Error> {
|
||||
if let Some(expected_md5) = &expected.md5 {
|
||||
match self.md5 {
|
||||
Some(md5) if BASE64_STANDARD.encode(&md5) == expected_md5.trim_matches('"') => (),
|
||||
_ => {
|
||||
return Err(Error::InvalidDigest(
|
||||
"MD5 checksum verification failed (from content-md5)".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(expected_sha256) = &expected.sha256 {
|
||||
match self.sha256 {
|
||||
Some(sha256) if &sha256[..] == expected_sha256.as_slice() => (),
|
||||
_ => {
|
||||
return Err(Error::InvalidDigest(
|
||||
"SHA256 checksum verification failed (from x-amz-content-sha256)".into(),
|
||||
))
|
||||
}
|
||||
}
|
||||
}
|
||||
if let Some(extra) = expected.extra {
|
||||
let algo = extra.algorithm();
|
||||
if self.extract(Some(algo)) != Some(extra) {
|
||||
return Err(Error::InvalidDigest(format!(
|
||||
"Failed to validate checksum for algorithm {:?}",
|
||||
algo
|
||||
)));
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn extract(&self, algo: Option<ChecksumAlgorithm>) -> Option<ChecksumValue> {
|
||||
match algo {
|
||||
None => None,
|
||||
Some(ChecksumAlgorithm::Crc32) => Some(ChecksumValue::Crc32(self.crc32.unwrap())),
|
||||
Some(ChecksumAlgorithm::Crc32c) => Some(ChecksumValue::Crc32c(self.crc32c.unwrap())),
|
||||
Some(ChecksumAlgorithm::Sha1) => Some(ChecksumValue::Sha1(self.sha1.unwrap())),
|
||||
Some(ChecksumAlgorithm::Sha256) => Some(ChecksumValue::Sha256(self.sha256.unwrap())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
#[derive(Default)]
|
||||
pub(crate) struct MultipartChecksummer {
|
||||
pub md5: Md5,
|
||||
pub extra: Option<MultipartExtraChecksummer>,
|
||||
}
|
||||
|
||||
pub(crate) enum MultipartExtraChecksummer {
|
||||
Crc32(Crc32),
|
||||
Crc32c(Crc32c),
|
||||
Sha1(Sha1),
|
||||
Sha256(Sha256),
|
||||
}
|
||||
|
||||
impl MultipartChecksummer {
|
||||
pub(crate) fn init(algo: Option<ChecksumAlgorithm>) -> Self {
|
||||
Self {
|
||||
md5: Md5::new(),
|
||||
extra: match algo {
|
||||
None => None,
|
||||
Some(ChecksumAlgorithm::Crc32) => {
|
||||
Some(MultipartExtraChecksummer::Crc32(Crc32::new()))
|
||||
}
|
||||
Some(ChecksumAlgorithm::Crc32c) => {
|
||||
Some(MultipartExtraChecksummer::Crc32c(Crc32c::default()))
|
||||
}
|
||||
Some(ChecksumAlgorithm::Sha1) => Some(MultipartExtraChecksummer::Sha1(Sha1::new())),
|
||||
Some(ChecksumAlgorithm::Sha256) => {
|
||||
Some(MultipartExtraChecksummer::Sha256(Sha256::new()))
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn update(
|
||||
&mut self,
|
||||
etag: &str,
|
||||
checksum: Option<ChecksumValue>,
|
||||
) -> Result<(), Error> {
|
||||
self.md5
|
||||
.update(&hex::decode(&etag).ok_or_message("invalid etag hex")?);
|
||||
match (&mut self.extra, checksum) {
|
||||
(None, _) => (),
|
||||
(
|
||||
Some(MultipartExtraChecksummer::Crc32(ref mut crc32)),
|
||||
Some(ChecksumValue::Crc32(x)),
|
||||
) => {
|
||||
crc32.update(&x);
|
||||
}
|
||||
(
|
||||
Some(MultipartExtraChecksummer::Crc32c(ref mut crc32c)),
|
||||
Some(ChecksumValue::Crc32c(x)),
|
||||
) => {
|
||||
crc32c.write(&x);
|
||||
}
|
||||
(Some(MultipartExtraChecksummer::Sha1(ref mut sha1)), Some(ChecksumValue::Sha1(x))) => {
|
||||
sha1.update(&x);
|
||||
}
|
||||
(
|
||||
Some(MultipartExtraChecksummer::Sha256(ref mut sha256)),
|
||||
Some(ChecksumValue::Sha256(x)),
|
||||
) => {
|
||||
sha256.update(&x);
|
||||
}
|
||||
(Some(_), b) => {
|
||||
return Err(Error::internal_error(format!(
|
||||
"part checksum was not computed correctly, got: {:?}",
|
||||
b
|
||||
)))
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) fn finalize(self) -> (Md5Checksum, Option<ChecksumValue>) {
|
||||
let md5 = self.md5.finalize()[..].try_into().unwrap();
|
||||
let extra = match self.extra {
|
||||
None => None,
|
||||
Some(MultipartExtraChecksummer::Crc32(crc32)) => {
|
||||
Some(ChecksumValue::Crc32(u32::to_be_bytes(crc32.finalize())))
|
||||
}
|
||||
Some(MultipartExtraChecksummer::Crc32c(crc32c)) => Some(ChecksumValue::Crc32c(
|
||||
u32::to_be_bytes(u32::try_from(crc32c.finish()).unwrap()),
|
||||
)),
|
||||
Some(MultipartExtraChecksummer::Sha1(sha1)) => {
|
||||
Some(ChecksumValue::Sha1(sha1.finalize()[..].try_into().unwrap()))
|
||||
}
|
||||
Some(MultipartExtraChecksummer::Sha256(sha256)) => Some(ChecksumValue::Sha256(
|
||||
sha256.finalize()[..].try_into().unwrap(),
|
||||
)),
|
||||
};
|
||||
(md5, extra)
|
||||
}
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
/// Extract the value of the x-amz-checksum-algorithm header
|
||||
pub(crate) fn request_checksum_algorithm(
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
) -> Result<Option<ChecksumAlgorithm>, Error> {
|
||||
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
|
||||
None => Ok(None),
|
||||
Some(x) if x == "CRC32" => Ok(Some(ChecksumAlgorithm::Crc32)),
|
||||
Some(x) if x == "CRC32C" => Ok(Some(ChecksumAlgorithm::Crc32c)),
|
||||
Some(x) if x == "SHA1" => Ok(Some(ChecksumAlgorithm::Sha1)),
|
||||
Some(x) if x == "SHA256" => Ok(Some(ChecksumAlgorithm::Sha256)),
|
||||
_ => Err(Error::bad_request("invalid checksum algorithm")),
|
||||
}
|
||||
}
|
||||
|
||||
/// Extract the value of any of the x-amz-checksum-* headers
|
||||
pub(crate) fn request_checksum_value(
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
) -> Result<Option<ChecksumValue>, Error> {
|
||||
let mut ret = vec![];
|
||||
|
||||
if let Some(crc32_str) = headers.get(X_AMZ_CHECKSUM_CRC32) {
|
||||
let crc32 = BASE64_STANDARD
|
||||
.decode(&crc32_str)
|
||||
.ok()
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
|
||||
ret.push(ChecksumValue::Crc32(crc32))
|
||||
}
|
||||
if let Some(crc32c_str) = headers.get(X_AMZ_CHECKSUM_CRC32C) {
|
||||
let crc32c = BASE64_STANDARD
|
||||
.decode(&crc32c_str)
|
||||
.ok()
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
|
||||
ret.push(ChecksumValue::Crc32c(crc32c))
|
||||
}
|
||||
if let Some(sha1_str) = headers.get(X_AMZ_CHECKSUM_SHA1) {
|
||||
let sha1 = BASE64_STANDARD
|
||||
.decode(&sha1_str)
|
||||
.ok()
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
|
||||
ret.push(ChecksumValue::Sha1(sha1))
|
||||
}
|
||||
if let Some(sha256_str) = headers.get(X_AMZ_CHECKSUM_SHA256) {
|
||||
let sha256 = BASE64_STANDARD
|
||||
.decode(&sha256_str)
|
||||
.ok()
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
|
||||
ret.push(ChecksumValue::Sha256(sha256))
|
||||
}
|
||||
|
||||
if ret.len() > 1 {
|
||||
return Err(Error::bad_request(
|
||||
"multiple x-amz-checksum-* headers given",
|
||||
));
|
||||
}
|
||||
Ok(ret.pop())
|
||||
}
|
||||
|
||||
/// Checks for the presense of x-amz-checksum-algorithm
|
||||
/// if so extract the corrseponding x-amz-checksum-* value
|
||||
pub(crate) fn request_checksum_algorithm_value(
|
||||
headers: &HeaderMap<HeaderValue>,
|
||||
) -> Result<Option<ChecksumValue>, Error> {
|
||||
match headers.get(X_AMZ_CHECKSUM_ALGORITHM) {
|
||||
Some(x) if x == "CRC32" => {
|
||||
let crc32 = headers
|
||||
.get(X_AMZ_CHECKSUM_CRC32)
|
||||
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-crc32 header")?;
|
||||
Ok(Some(ChecksumValue::Crc32(crc32)))
|
||||
}
|
||||
Some(x) if x == "CRC32C" => {
|
||||
let crc32c = headers
|
||||
.get(X_AMZ_CHECKSUM_CRC32C)
|
||||
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-crc32c header")?;
|
||||
Ok(Some(ChecksumValue::Crc32c(crc32c)))
|
||||
}
|
||||
Some(x) if x == "SHA1" => {
|
||||
let sha1 = headers
|
||||
.get(X_AMZ_CHECKSUM_SHA1)
|
||||
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-sha1 header")?;
|
||||
Ok(Some(ChecksumValue::Sha1(sha1)))
|
||||
}
|
||||
Some(x) if x == "SHA256" => {
|
||||
let sha256 = headers
|
||||
.get(X_AMZ_CHECKSUM_SHA256)
|
||||
.and_then(|x| BASE64_STANDARD.decode(&x).ok())
|
||||
.and_then(|x| x.try_into().ok())
|
||||
.ok_or_bad_request("invalid x-amz-checksum-sha256 header")?;
|
||||
Ok(Some(ChecksumValue::Sha256(sha256)))
|
||||
}
|
||||
Some(_) => Err(Error::bad_request("invalid x-amz-checksum-algorithm")),
|
||||
None => Ok(None),
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn add_checksum_response_headers(
|
||||
checksum: &Option<ChecksumValue>,
|
||||
mut resp: http::response::Builder,
|
||||
) -> http::response::Builder {
|
||||
match checksum {
|
||||
Some(ChecksumValue::Crc32(crc32)) => {
|
||||
resp = resp.header(X_AMZ_CHECKSUM_CRC32, BASE64_STANDARD.encode(&crc32));
|
||||
}
|
||||
Some(ChecksumValue::Crc32c(crc32c)) => {
|
||||
resp = resp.header(X_AMZ_CHECKSUM_CRC32C, BASE64_STANDARD.encode(&crc32c));
|
||||
}
|
||||
Some(ChecksumValue::Sha1(sha1)) => {
|
||||
resp = resp.header(X_AMZ_CHECKSUM_SHA1, BASE64_STANDARD.encode(&sha1));
|
||||
}
|
||||
Some(ChecksumValue::Sha256(sha256)) => {
|
||||
resp = resp.header(X_AMZ_CHECKSUM_SHA256, BASE64_STANDARD.encode(&sha256));
|
||||
}
|
||||
None => (),
|
||||
}
|
||||
resp
|
||||
}
|
|
@ -2,7 +2,6 @@ use std::pin::Pin;
|
|||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
|
||||
use md5::{Digest as Md5Digest, Md5};
|
||||
|
||||
use bytes::Bytes;
|
||||
use hyper::{Request, Response};
|
||||
|
@ -23,11 +22,12 @@ use garage_model::s3::version_table::*;
|
|||
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::{ReqBody, ResBody};
|
||||
use crate::s3::checksum::*;
|
||||
use crate::s3::encryption::EncryptionParams;
|
||||
use crate::s3::error::*;
|
||||
use crate::s3::get::full_object_byte_stream;
|
||||
use crate::s3::multipart;
|
||||
use crate::s3::put::{get_headers, save_stream, SaveStreamResult};
|
||||
use crate::s3::put::{get_headers, save_stream, ChecksumMode, SaveStreamResult};
|
||||
use crate::s3::xml::{self as s3_xml, xmlns_tag};
|
||||
|
||||
// -------- CopyObject ---------
|
||||
|
@ -39,6 +39,8 @@ pub async fn handle_copy(
|
|||
) -> Result<Response<ResBody>, Error> {
|
||||
let copy_precondition = CopyPreconditionHeaders::parse(req)?;
|
||||
|
||||
let checksum_algorithm = request_checksum_algorithm(req.headers())?;
|
||||
|
||||
let source_object = get_copy_source(&ctx, req).await?;
|
||||
|
||||
let (source_version, source_version_data, source_version_meta) =
|
||||
|
@ -48,7 +50,7 @@ pub async fn handle_copy(
|
|||
copy_precondition.check(source_version, &source_version_meta.etag)?;
|
||||
|
||||
// Determine encryption parameters
|
||||
let (source_encryption, source_object_headers) =
|
||||
let (source_encryption, source_object_meta_inner) =
|
||||
EncryptionParams::check_decrypt_for_copy_source(
|
||||
&ctx.garage,
|
||||
req.headers(),
|
||||
|
@ -56,23 +58,54 @@ pub async fn handle_copy(
|
|||
)?;
|
||||
let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
|
||||
|
||||
// Determine headers of destination object
|
||||
let dest_object_headers = match req.headers().get("x-amz-metadata-directive") {
|
||||
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
|
||||
get_headers(req.headers())?
|
||||
}
|
||||
_ => source_object_headers.into_owned(),
|
||||
// Extract source checksum info before source_object_meta_inner is consumed
|
||||
let source_checksum = source_object_meta_inner.checksum;
|
||||
let source_checksum_algorithm = source_checksum.map(|x| x.algorithm());
|
||||
|
||||
// If source object has a checksum, the destination object must as well.
|
||||
// The x-amz-checksum-algorihtm header allows to change that algorithm,
|
||||
// but if it is absent, we must use the same as before
|
||||
let checksum_algorithm = checksum_algorithm.or(source_checksum_algorithm);
|
||||
|
||||
// Determine metadata of destination object
|
||||
let was_multipart = source_version_meta.etag.contains('-');
|
||||
let dest_object_meta = ObjectVersionMetaInner {
|
||||
headers: match req.headers().get("x-amz-metadata-directive") {
|
||||
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
|
||||
get_headers(req.headers())?
|
||||
}
|
||||
_ => source_object_meta_inner.into_owned().headers,
|
||||
},
|
||||
checksum: source_checksum,
|
||||
};
|
||||
|
||||
// Do actual object copying
|
||||
let res = if EncryptionParams::is_same(&source_encryption, &dest_encryption) {
|
||||
// If source and dest are both unencrypted, or if the encryption keys
|
||||
// are the same, we can just copy the metadata and link blocks of the
|
||||
//
|
||||
// In any of the following scenarios, we need to read the whole object
|
||||
// data and re-write it again:
|
||||
//
|
||||
// - the data needs to be decrypted or encrypted
|
||||
// - the requested checksum algorithm requires us to recompute a checksum
|
||||
// - the original object was a multipart upload and a checksum algorithm
|
||||
// is defined (AWS specifies that in this case, we must recompute the
|
||||
// checksum from scratch as if this was a single big object and not
|
||||
// a multipart object, as the checksums are not computed in the same way)
|
||||
//
|
||||
// In other cases, we can just copy the metadata and reference the same blocks.
|
||||
//
|
||||
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
|
||||
|
||||
let must_recopy = !EncryptionParams::is_same(&source_encryption, &dest_encryption)
|
||||
|| source_checksum_algorithm != checksum_algorithm
|
||||
|| (was_multipart && checksum_algorithm.is_some());
|
||||
|
||||
let res = if !must_recopy {
|
||||
// In most cases, we can just copy the metadata and link blocks of the
|
||||
// old object from the new object.
|
||||
handle_copy_metaonly(
|
||||
ctx,
|
||||
dest_key,
|
||||
dest_object_headers,
|
||||
dest_object_meta,
|
||||
dest_encryption,
|
||||
source_version,
|
||||
source_version_data,
|
||||
|
@ -80,16 +113,27 @@ pub async fn handle_copy(
|
|||
)
|
||||
.await?
|
||||
} else {
|
||||
let expected_checksum = ExpectedChecksums {
|
||||
md5: None,
|
||||
sha256: None,
|
||||
extra: source_checksum,
|
||||
};
|
||||
let checksum_mode = if was_multipart || source_checksum_algorithm != checksum_algorithm {
|
||||
ChecksumMode::Calculate(checksum_algorithm)
|
||||
} else {
|
||||
ChecksumMode::Verify(&expected_checksum)
|
||||
};
|
||||
// If source and dest encryption use different keys,
|
||||
// we must decrypt content and re-encrypt, so rewrite all data blocks.
|
||||
handle_copy_reencrypt(
|
||||
ctx,
|
||||
dest_key,
|
||||
dest_object_headers,
|
||||
dest_object_meta,
|
||||
dest_encryption,
|
||||
source_version,
|
||||
source_version_data,
|
||||
source_encryption,
|
||||
checksum_mode,
|
||||
)
|
||||
.await?
|
||||
};
|
||||
|
@ -115,7 +159,7 @@ pub async fn handle_copy(
|
|||
async fn handle_copy_metaonly(
|
||||
ctx: ReqCtx,
|
||||
dest_key: &str,
|
||||
dest_object_headers: ObjectVersionHeaders,
|
||||
dest_object_meta: ObjectVersionMetaInner,
|
||||
dest_encryption: EncryptionParams,
|
||||
source_version: &ObjectVersion,
|
||||
source_version_data: &ObjectVersionData,
|
||||
|
@ -132,7 +176,7 @@ async fn handle_copy_metaonly(
|
|||
let new_timestamp = now_msec();
|
||||
|
||||
let new_meta = ObjectVersionMeta {
|
||||
encryption: dest_encryption.encrypt_headers(dest_object_headers)?,
|
||||
encryption: dest_encryption.encrypt_meta(dest_object_meta)?,
|
||||
size: source_version_meta.size,
|
||||
etag: source_version_meta.etag.clone(),
|
||||
};
|
||||
|
@ -180,6 +224,7 @@ async fn handle_copy_metaonly(
|
|||
timestamp: new_timestamp,
|
||||
state: ObjectVersionState::Uploading {
|
||||
encryption: new_meta.encryption.clone(),
|
||||
checksum_algorithm: None,
|
||||
multipart: false,
|
||||
},
|
||||
};
|
||||
|
@ -252,11 +297,12 @@ async fn handle_copy_metaonly(
|
|||
async fn handle_copy_reencrypt(
|
||||
ctx: ReqCtx,
|
||||
dest_key: &str,
|
||||
dest_object_headers: ObjectVersionHeaders,
|
||||
dest_object_meta: ObjectVersionMetaInner,
|
||||
dest_encryption: EncryptionParams,
|
||||
source_version: &ObjectVersion,
|
||||
source_version_data: &ObjectVersionData,
|
||||
source_encryption: EncryptionParams,
|
||||
checksum_mode: ChecksumMode<'_>,
|
||||
) -> Result<SaveStreamResult, Error> {
|
||||
// basically we will read the source data (decrypt if necessary)
|
||||
// and save that in a new object (encrypt if necessary),
|
||||
|
@ -270,12 +316,11 @@ async fn handle_copy_reencrypt(
|
|||
|
||||
save_stream(
|
||||
&ctx,
|
||||
dest_object_headers,
|
||||
dest_object_meta,
|
||||
dest_encryption,
|
||||
source_stream.map_err(|e| Error::from(GarageError::from(e))),
|
||||
&dest_key.to_string(),
|
||||
None,
|
||||
None,
|
||||
checksum_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -313,8 +358,12 @@ pub async fn handle_upload_part_copy(
|
|||
req.headers(),
|
||||
&source_version_meta.encryption,
|
||||
)?;
|
||||
let dest_object_encryption = match dest_version.state {
|
||||
ObjectVersionState::Uploading { encryption, .. } => encryption,
|
||||
let (dest_object_encryption, dest_object_checksum_algorithm) = match dest_version.state {
|
||||
ObjectVersionState::Uploading {
|
||||
encryption,
|
||||
checksum_algorithm,
|
||||
..
|
||||
} => (encryption, checksum_algorithm),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let (dest_encryption, _) =
|
||||
|
@ -412,7 +461,9 @@ pub async fn handle_upload_part_copy(
|
|||
dest_mpu_part_key,
|
||||
MpuPart {
|
||||
version: dest_version_id,
|
||||
// These are all filled in later (bottom of this function)
|
||||
etag: None,
|
||||
checksum: None,
|
||||
size: None,
|
||||
},
|
||||
);
|
||||
|
@ -429,7 +480,8 @@ pub async fn handle_upload_part_copy(
|
|||
garage.version_table.insert(&dest_version).await?;
|
||||
|
||||
// Now, actually copy the blocks
|
||||
let mut md5hasher = Md5::new();
|
||||
let mut checksummer = Checksummer::init(&Default::default(), !dest_encryption.is_encrypted())
|
||||
.add(dest_object_checksum_algorithm);
|
||||
|
||||
// First, create a stream that is able to read the source blocks
|
||||
// and extract the subrange if necessary.
|
||||
|
@ -495,18 +547,24 @@ pub async fn handle_upload_part_copy(
|
|||
}
|
||||
|
||||
let data_len = data.len() as u64;
|
||||
md5hasher.update(&data[..]);
|
||||
|
||||
let (final_data, must_upload, final_hash) = match existing_block_hash {
|
||||
Some(hash) if same_encryption => (data, false, hash),
|
||||
_ => tokio::task::spawn_blocking(move || {
|
||||
let data_enc = dest_encryption.encrypt_block(data)?;
|
||||
let hash = blake2sum(&data_enc);
|
||||
Ok::<_, Error>((data_enc, true, hash))
|
||||
let (checksummer_updated, (data_to_upload, final_hash)) =
|
||||
tokio::task::spawn_blocking(move || {
|
||||
checksummer.update(&data[..]);
|
||||
|
||||
let tup = match existing_block_hash {
|
||||
Some(hash) if same_encryption => (None, hash),
|
||||
_ => {
|
||||
let data_enc = dest_encryption.encrypt_block(data)?;
|
||||
let hash = blake2sum(&data_enc);
|
||||
(Some(data_enc), hash)
|
||||
}
|
||||
};
|
||||
Ok::<_, Error>((checksummer, tup))
|
||||
})
|
||||
.await
|
||||
.unwrap()?,
|
||||
};
|
||||
.unwrap()?;
|
||||
checksummer = checksummer_updated;
|
||||
|
||||
dest_version.blocks.clear();
|
||||
dest_version.blocks.put(
|
||||
|
@ -531,7 +589,7 @@ pub async fn handle_upload_part_copy(
|
|||
// Thing 1: if the block is not exactly a block that existed before,
|
||||
// we need to insert that data as a new block.
|
||||
async {
|
||||
if must_upload {
|
||||
if let Some(final_data) = data_to_upload {
|
||||
garage
|
||||
.block_manager
|
||||
.rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
|
||||
|
@ -552,8 +610,9 @@ pub async fn handle_upload_part_copy(
|
|||
|
||||
assert_eq!(current_offset, source_range.length);
|
||||
|
||||
let data_md5sum = md5hasher.finalize();
|
||||
let etag = dest_encryption.etag_from_md5(&data_md5sum);
|
||||
let checksums = checksummer.finalize();
|
||||
let etag = dest_encryption.etag_from_md5(&checksums.md5);
|
||||
let checksum = checksums.extract(dest_object_checksum_algorithm);
|
||||
|
||||
// Put the part's ETag in the Versiontable
|
||||
dest_mpu.parts.put(
|
||||
|
@ -561,6 +620,7 @@ pub async fn handle_upload_part_copy(
|
|||
MpuPart {
|
||||
version: dest_version_id,
|
||||
etag: Some(etag.clone()),
|
||||
checksum,
|
||||
size: Some(current_offset),
|
||||
},
|
||||
);
|
||||
|
|
|
@ -26,9 +26,10 @@ use garage_util::error::Error as GarageError;
|
|||
use garage_util::migrate::Migrate;
|
||||
|
||||
use garage_model::garage::Garage;
|
||||
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders};
|
||||
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionMetaInner};
|
||||
|
||||
use crate::common_error::*;
|
||||
use crate::s3::checksum::Md5Checksum;
|
||||
use crate::s3::error::Error;
|
||||
|
||||
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
|
||||
|
@ -124,7 +125,7 @@ impl EncryptionParams {
|
|||
garage: &Garage,
|
||||
headers: &HeaderMap,
|
||||
obj_enc: &'a ObjectVersionEncryption,
|
||||
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
|
||||
) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> {
|
||||
let key = parse_request_headers(
|
||||
headers,
|
||||
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
|
||||
|
@ -138,7 +139,7 @@ impl EncryptionParams {
|
|||
garage: &Garage,
|
||||
headers: &HeaderMap,
|
||||
obj_enc: &'a ObjectVersionEncryption,
|
||||
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
|
||||
) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> {
|
||||
let key = parse_request_headers(
|
||||
headers,
|
||||
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
|
||||
|
@ -152,14 +153,11 @@ impl EncryptionParams {
|
|||
garage: &Garage,
|
||||
key: Option<(Key<Aes256Gcm>, Md5Output)>,
|
||||
obj_enc: &'a ObjectVersionEncryption,
|
||||
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
|
||||
) -> Result<(Self, Cow<'a, ObjectVersionMetaInner>), Error> {
|
||||
match (key, &obj_enc) {
|
||||
(
|
||||
Some((client_key, client_key_md5)),
|
||||
ObjectVersionEncryption::SseC {
|
||||
headers,
|
||||
compressed,
|
||||
},
|
||||
ObjectVersionEncryption::SseC { inner, compressed },
|
||||
) => {
|
||||
let enc = Self::SseC {
|
||||
client_key,
|
||||
|
@ -170,13 +168,13 @@ impl EncryptionParams {
|
|||
None
|
||||
},
|
||||
};
|
||||
let plaintext = enc.decrypt_blob(&headers)?;
|
||||
let headers = ObjectVersionHeaders::decode(&plaintext)
|
||||
.ok_or_internal_error("Could not decode encrypted headers")?;
|
||||
Ok((enc, Cow::Owned(headers)))
|
||||
let plaintext = enc.decrypt_blob(&inner)?;
|
||||
let inner = ObjectVersionMetaInner::decode(&plaintext)
|
||||
.ok_or_internal_error("Could not decode encrypted metadata")?;
|
||||
Ok((enc, Cow::Owned(inner)))
|
||||
}
|
||||
(None, ObjectVersionEncryption::Plaintext { headers }) => {
|
||||
Ok((Self::Plaintext, Cow::Borrowed(headers)))
|
||||
(None, ObjectVersionEncryption::Plaintext { inner }) => {
|
||||
Ok((Self::Plaintext, Cow::Borrowed(inner)))
|
||||
}
|
||||
(_, ObjectVersionEncryption::SseC { .. }) => {
|
||||
Err(Error::bad_request("Object is encrypted"))
|
||||
|
@ -188,29 +186,31 @@ impl EncryptionParams {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn encrypt_headers(
|
||||
pub fn encrypt_meta(
|
||||
&self,
|
||||
h: ObjectVersionHeaders,
|
||||
meta: ObjectVersionMetaInner,
|
||||
) -> Result<ObjectVersionEncryption, Error> {
|
||||
match self {
|
||||
Self::SseC {
|
||||
compression_level, ..
|
||||
} => {
|
||||
let plaintext = h.encode().map_err(GarageError::from)?;
|
||||
let plaintext = meta.encode().map_err(GarageError::from)?;
|
||||
let ciphertext = self.encrypt_blob(&plaintext)?;
|
||||
Ok(ObjectVersionEncryption::SseC {
|
||||
headers: ciphertext.into_owned(),
|
||||
inner: ciphertext.into_owned(),
|
||||
compressed: compression_level.is_some(),
|
||||
})
|
||||
}
|
||||
Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }),
|
||||
Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { inner: meta }),
|
||||
}
|
||||
}
|
||||
|
||||
// ---- generating object Etag values ----
|
||||
pub fn etag_from_md5(&self, md5sum: &[u8]) -> String {
|
||||
pub fn etag_from_md5(&self, md5sum: &Option<Md5Checksum>) -> String {
|
||||
match self {
|
||||
Self::Plaintext => hex::encode(md5sum),
|
||||
Self::Plaintext => md5sum
|
||||
.map(|x| hex::encode(&x[..]))
|
||||
.expect("md5 digest should have been computed"),
|
||||
Self::SseC { .. } => {
|
||||
// AWS specifies that for encrypted objects, the Etag is not
|
||||
// the md5sum of the data, but doesn't say what it is.
|
||||
|
@ -224,7 +224,7 @@ impl EncryptionParams {
|
|||
|
||||
// ---- generic function for encrypting / decrypting blobs ----
|
||||
// Prepends a randomly-generated nonce to the encrypted value.
|
||||
// This is used for encrypting object headers and inlined data for small objects.
|
||||
// This is used for encrypting object metadata and inlined data for small objects.
|
||||
// This does not compress anything.
|
||||
|
||||
pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
|
||||
|
|
|
@ -69,6 +69,10 @@ pub enum Error {
|
|||
#[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)]
|
||||
InvalidEncryptionAlgorithm(String),
|
||||
|
||||
/// The client sent invalid XML data
|
||||
#[error(display = "Invalid digest: {}", _0)]
|
||||
InvalidDigest(String),
|
||||
|
||||
/// The client sent a request for an action not supported by garage
|
||||
#[error(display = "Unimplemented action: {}", _0)]
|
||||
NotImplemented(String),
|
||||
|
@ -129,6 +133,7 @@ impl Error {
|
|||
Error::NotImplemented(_) => "NotImplemented",
|
||||
Error::InvalidXml(_) => "MalformedXML",
|
||||
Error::InvalidRange(_) => "InvalidRange",
|
||||
Error::InvalidDigest(_) => "InvalidDigest",
|
||||
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
|
||||
Error::InvalidEncryptionAlgorithm(_) => "InvalidEncryptionAlgorithmError",
|
||||
}
|
||||
|
@ -148,6 +153,7 @@ impl ApiError for Error {
|
|||
| Error::InvalidPart
|
||||
| Error::InvalidPartOrder
|
||||
| Error::EntityTooSmall
|
||||
| Error::InvalidDigest(_)
|
||||
| Error::InvalidEncryptionAlgorithm(_)
|
||||
| Error::InvalidXml(_)
|
||||
| Error::InvalidUtf8Str(_)
|
||||
|
|
|
@ -27,6 +27,7 @@ use garage_model::s3::version_table::*;
|
|||
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::ResBody;
|
||||
use crate::s3::checksum::{add_checksum_response_headers, X_AMZ_CHECKSUM_MODE};
|
||||
use crate::s3::encryption::EncryptionParams;
|
||||
use crate::s3::error::*;
|
||||
|
||||
|
@ -45,8 +46,9 @@ pub struct GetObjectOverrides {
|
|||
fn object_headers(
|
||||
version: &ObjectVersion,
|
||||
version_meta: &ObjectVersionMeta,
|
||||
headers: &ObjectVersionHeaders,
|
||||
meta_inner: &ObjectVersionMetaInner,
|
||||
encryption: EncryptionParams,
|
||||
checksum_mode: ChecksumMode,
|
||||
) -> http::response::Builder {
|
||||
debug!("Version meta: {:?}", version_meta);
|
||||
|
||||
|
@ -65,7 +67,7 @@ fn object_headers(
|
|||
// have the same name (ignoring case) into a comma-delimited list.
|
||||
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
|
||||
let mut headers_by_name = BTreeMap::new();
|
||||
for (name, value) in headers.0.iter() {
|
||||
for (name, value) in meta_inner.headers.iter() {
|
||||
match headers_by_name.get_mut(name) {
|
||||
None => {
|
||||
headers_by_name.insert(name, vec![value.as_str()]);
|
||||
|
@ -80,6 +82,10 @@ fn object_headers(
|
|||
resp = resp.header(name, values.join(","));
|
||||
}
|
||||
|
||||
if checksum_mode.enabled {
|
||||
resp = add_checksum_response_headers(&meta_inner.checksum, resp);
|
||||
}
|
||||
|
||||
encryption.add_response_headers(&mut resp);
|
||||
|
||||
resp
|
||||
|
@ -199,6 +205,8 @@ pub async fn handle_head_without_ctx(
|
|||
let (encryption, headers) =
|
||||
EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?;
|
||||
|
||||
let checksum_mode = checksum_mode(&req);
|
||||
|
||||
if let Some(pn) = part_number {
|
||||
match version_data {
|
||||
ObjectVersionData::Inline(_, _) => {
|
||||
|
@ -206,17 +214,21 @@ pub async fn handle_head_without_ctx(
|
|||
return Err(Error::InvalidPart);
|
||||
}
|
||||
let bytes_len = version_meta.size;
|
||||
Ok(
|
||||
object_headers(object_version, version_meta, &headers, encryption)
|
||||
.header(CONTENT_LENGTH, format!("{}", bytes_len))
|
||||
.header(
|
||||
CONTENT_RANGE,
|
||||
format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
|
||||
)
|
||||
.header(X_AMZ_MP_PARTS_COUNT, "1")
|
||||
.status(StatusCode::PARTIAL_CONTENT)
|
||||
.body(empty_body())?,
|
||||
Ok(object_headers(
|
||||
object_version,
|
||||
version_meta,
|
||||
&headers,
|
||||
encryption,
|
||||
checksum_mode,
|
||||
)
|
||||
.header(CONTENT_LENGTH, format!("{}", bytes_len))
|
||||
.header(
|
||||
CONTENT_RANGE,
|
||||
format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
|
||||
)
|
||||
.header(X_AMZ_MP_PARTS_COUNT, "1")
|
||||
.status(StatusCode::PARTIAL_CONTENT)
|
||||
.body(empty_body())?)
|
||||
}
|
||||
ObjectVersionData::FirstBlock(_, _) => {
|
||||
let version = garage
|
||||
|
@ -228,32 +240,40 @@ pub async fn handle_head_without_ctx(
|
|||
let (part_offset, part_end) =
|
||||
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
|
||||
|
||||
Ok(
|
||||
object_headers(object_version, version_meta, &headers, encryption)
|
||||
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
|
||||
.header(
|
||||
CONTENT_RANGE,
|
||||
format!(
|
||||
"bytes {}-{}/{}",
|
||||
part_offset,
|
||||
part_end - 1,
|
||||
version_meta.size
|
||||
),
|
||||
)
|
||||
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
|
||||
.status(StatusCode::PARTIAL_CONTENT)
|
||||
.body(empty_body())?,
|
||||
Ok(object_headers(
|
||||
object_version,
|
||||
version_meta,
|
||||
&headers,
|
||||
encryption,
|
||||
checksum_mode,
|
||||
)
|
||||
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
|
||||
.header(
|
||||
CONTENT_RANGE,
|
||||
format!(
|
||||
"bytes {}-{}/{}",
|
||||
part_offset,
|
||||
part_end - 1,
|
||||
version_meta.size
|
||||
),
|
||||
)
|
||||
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
|
||||
.status(StatusCode::PARTIAL_CONTENT)
|
||||
.body(empty_body())?)
|
||||
}
|
||||
_ => unreachable!(),
|
||||
}
|
||||
} else {
|
||||
Ok(
|
||||
object_headers(object_version, version_meta, &headers, encryption)
|
||||
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
||||
.status(StatusCode::OK)
|
||||
.body(empty_body())?,
|
||||
Ok(object_headers(
|
||||
object_version,
|
||||
version_meta,
|
||||
&headers,
|
||||
encryption,
|
||||
checksum_mode,
|
||||
)
|
||||
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
||||
.status(StatusCode::OK)
|
||||
.body(empty_body())?)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -307,12 +327,24 @@ pub async fn handle_get_without_ctx(
|
|||
let (enc, headers) =
|
||||
EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?;
|
||||
|
||||
let checksum_mode = checksum_mode(&req);
|
||||
|
||||
match (part_number, parse_range_header(req, last_v_meta.size)?) {
|
||||
(Some(_), Some(_)) => Err(Error::bad_request(
|
||||
"Cannot specify both partNumber and Range header",
|
||||
)),
|
||||
(Some(pn), None) => {
|
||||
handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await
|
||||
handle_get_part(
|
||||
garage,
|
||||
last_v,
|
||||
last_v_data,
|
||||
last_v_meta,
|
||||
enc,
|
||||
&headers,
|
||||
pn,
|
||||
checksum_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
(None, Some(range)) => {
|
||||
handle_get_range(
|
||||
|
@ -324,6 +356,7 @@ pub async fn handle_get_without_ctx(
|
|||
&headers,
|
||||
range.start,
|
||||
range.start + range.length,
|
||||
checksum_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -336,6 +369,7 @@ pub async fn handle_get_without_ctx(
|
|||
enc,
|
||||
&headers,
|
||||
overrides,
|
||||
checksum_mode,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
@ -348,12 +382,19 @@ async fn handle_get_full(
|
|||
version_data: &ObjectVersionData,
|
||||
version_meta: &ObjectVersionMeta,
|
||||
encryption: EncryptionParams,
|
||||
headers: &ObjectVersionHeaders,
|
||||
meta_inner: &ObjectVersionMetaInner,
|
||||
overrides: GetObjectOverrides,
|
||||
checksum_mode: ChecksumMode,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let mut resp_builder = object_headers(version, version_meta, &headers, encryption)
|
||||
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
||||
.status(StatusCode::OK);
|
||||
let mut resp_builder = object_headers(
|
||||
version,
|
||||
version_meta,
|
||||
&meta_inner,
|
||||
encryption,
|
||||
checksum_mode,
|
||||
)
|
||||
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
|
||||
.status(StatusCode::OK);
|
||||
getobject_override_headers(overrides, &mut resp_builder)?;
|
||||
|
||||
let stream = full_object_byte_stream(garage, version, version_data, encryption);
|
||||
|
@ -432,14 +473,15 @@ async fn handle_get_range(
|
|||
version_data: &ObjectVersionData,
|
||||
version_meta: &ObjectVersionMeta,
|
||||
encryption: EncryptionParams,
|
||||
headers: &ObjectVersionHeaders,
|
||||
meta_inner: &ObjectVersionMetaInner,
|
||||
begin: u64,
|
||||
end: u64,
|
||||
checksum_mode: ChecksumMode,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
// Here we do not use getobject_override_headers because we don't
|
||||
// want to add any overridden headers (those should not be added
|
||||
// when returning PARTIAL_CONTENT)
|
||||
let resp_builder = object_headers(version, version_meta, headers, encryption)
|
||||
let resp_builder = object_headers(version, version_meta, meta_inner, encryption, checksum_mode)
|
||||
.header(CONTENT_LENGTH, format!("{}", end - begin))
|
||||
.header(
|
||||
CONTENT_RANGE,
|
||||
|
@ -480,12 +522,19 @@ async fn handle_get_part(
|
|||
version_data: &ObjectVersionData,
|
||||
version_meta: &ObjectVersionMeta,
|
||||
encryption: EncryptionParams,
|
||||
headers: &ObjectVersionHeaders,
|
||||
meta_inner: &ObjectVersionMetaInner,
|
||||
part_number: u64,
|
||||
checksum_mode: ChecksumMode,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
// Same as for get_range, no getobject_override_headers
|
||||
let resp_builder = object_headers(object_version, version_meta, headers, encryption)
|
||||
.status(StatusCode::PARTIAL_CONTENT);
|
||||
let resp_builder = object_headers(
|
||||
object_version,
|
||||
version_meta,
|
||||
meta_inner,
|
||||
encryption,
|
||||
checksum_mode,
|
||||
)
|
||||
.status(StatusCode::PARTIAL_CONTENT);
|
||||
|
||||
match version_data {
|
||||
ObjectVersionData::Inline(_, bytes) => {
|
||||
|
@ -567,6 +616,20 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
|
|||
None
|
||||
}
|
||||
|
||||
struct ChecksumMode {
|
||||
enabled: bool,
|
||||
}
|
||||
|
||||
fn checksum_mode(req: &Request<impl Body>) -> ChecksumMode {
|
||||
ChecksumMode {
|
||||
enabled: req
|
||||
.headers()
|
||||
.get(X_AMZ_CHECKSUM_MODE)
|
||||
.map(|x| x == "ENABLED")
|
||||
.unwrap_or(false),
|
||||
}
|
||||
}
|
||||
|
||||
fn body_from_blocks_range(
|
||||
garage: Arc<Garage>,
|
||||
encryption: EncryptionParams,
|
||||
|
|
|
@ -2,7 +2,7 @@ use std::collections::{BTreeMap, BTreeSet};
|
|||
use std::iter::{Iterator, Peekable};
|
||||
|
||||
use base64::prelude::*;
|
||||
use hyper::Response;
|
||||
use hyper::{Request, Response};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
@ -15,7 +15,8 @@ use garage_table::EnumerationOrder;
|
|||
|
||||
use crate::encoding::*;
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::ResBody;
|
||||
use crate::s3::api_server::{ReqBody, ResBody};
|
||||
use crate::s3::encryption::EncryptionParams;
|
||||
use crate::s3::error::*;
|
||||
use crate::s3::multipart as s3_multipart;
|
||||
use crate::s3::xml as s3_xml;
|
||||
|
@ -271,13 +272,21 @@ pub async fn handle_list_multipart_upload(
|
|||
|
||||
pub async fn handle_list_parts(
|
||||
ctx: ReqCtx,
|
||||
req: Request<ReqBody>,
|
||||
query: &ListPartsQuery,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
debug!("ListParts {:?}", query);
|
||||
|
||||
let upload_id = s3_multipart::decode_upload_id(&query.upload_id)?;
|
||||
|
||||
let (_, _, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
|
||||
let (_, object_version, mpu) = s3_multipart::get_upload(&ctx, &query.key, &upload_id).await?;
|
||||
|
||||
let object_encryption = match object_version.state {
|
||||
ObjectVersionState::Uploading { encryption, .. } => encryption,
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let encryption_res =
|
||||
EncryptionParams::check_decrypt(&ctx.garage, req.headers(), &object_encryption);
|
||||
|
||||
let (info, next) = fetch_part_info(query, &mpu)?;
|
||||
|
||||
|
@ -296,11 +305,40 @@ pub async fn handle_list_parts(
|
|||
is_truncated: s3_xml::Value(format!("{}", next.is_some())),
|
||||
parts: info
|
||||
.iter()
|
||||
.map(|part| s3_xml::PartItem {
|
||||
etag: s3_xml::Value(format!("\"{}\"", part.etag)),
|
||||
last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
|
||||
part_number: s3_xml::IntValue(part.part_number as i64),
|
||||
size: s3_xml::IntValue(part.size as i64),
|
||||
.map(|part| {
|
||||
// hide checksum if object is encrypted and the decryption
|
||||
// keys are not provided
|
||||
let checksum = part.checksum.filter(|_| encryption_res.is_ok());
|
||||
s3_xml::PartItem {
|
||||
etag: s3_xml::Value(format!("\"{}\"", part.etag)),
|
||||
last_modified: s3_xml::Value(msec_to_rfc3339(part.timestamp)),
|
||||
part_number: s3_xml::IntValue(part.part_number as i64),
|
||||
size: s3_xml::IntValue(part.size as i64),
|
||||
checksum_crc32: match &checksum {
|
||||
Some(ChecksumValue::Crc32(x)) => {
|
||||
Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
checksum_crc32c: match &checksum {
|
||||
Some(ChecksumValue::Crc32c(x)) => {
|
||||
Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
checksum_sha1: match &checksum {
|
||||
Some(ChecksumValue::Sha1(x)) => {
|
||||
Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
checksum_sha256: match &checksum {
|
||||
Some(ChecksumValue::Sha256(x)) => {
|
||||
Some(s3_xml::Value(BASE64_STANDARD.encode(&x)))
|
||||
}
|
||||
_ => None,
|
||||
},
|
||||
}
|
||||
})
|
||||
.collect(),
|
||||
|
||||
|
@ -346,6 +384,7 @@ struct PartInfo<'a> {
|
|||
timestamp: u64,
|
||||
part_number: u64,
|
||||
size: u64,
|
||||
checksum: Option<ChecksumValue>,
|
||||
}
|
||||
|
||||
enum ExtractionResult {
|
||||
|
@ -486,6 +525,7 @@ fn fetch_part_info<'a>(
|
|||
timestamp: pk.timestamp,
|
||||
etag,
|
||||
size,
|
||||
checksum: p.checksum,
|
||||
};
|
||||
match parts.last_mut() {
|
||||
Some(lastpart) if lastpart.part_number == pk.part_number => {
|
||||
|
@ -945,8 +985,12 @@ mod tests {
|
|||
state: ObjectVersionState::Uploading {
|
||||
multipart: true,
|
||||
encryption: ObjectVersionEncryption::Plaintext {
|
||||
headers: ObjectVersionHeaders(vec![]),
|
||||
inner: ObjectVersionMetaInner {
|
||||
headers: vec![],
|
||||
checksum: None,
|
||||
},
|
||||
},
|
||||
checksum_algorithm: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
@ -1135,6 +1179,7 @@ mod tests {
|
|||
version: uuid,
|
||||
size: Some(3),
|
||||
etag: Some("etag1".into()),
|
||||
checksum: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
|
@ -1146,6 +1191,7 @@ mod tests {
|
|||
version: uuid,
|
||||
size: None,
|
||||
etag: None,
|
||||
checksum: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
|
@ -1157,6 +1203,7 @@ mod tests {
|
|||
version: uuid,
|
||||
size: Some(10),
|
||||
etag: Some("etag2".into()),
|
||||
checksum: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
|
@ -1168,6 +1215,7 @@ mod tests {
|
|||
version: uuid,
|
||||
size: Some(7),
|
||||
etag: Some("etag3".into()),
|
||||
checksum: None,
|
||||
},
|
||||
),
|
||||
(
|
||||
|
@ -1179,6 +1227,7 @@ mod tests {
|
|||
version: uuid,
|
||||
size: Some(5),
|
||||
etag: Some("etag4".into()),
|
||||
checksum: None,
|
||||
},
|
||||
),
|
||||
];
|
||||
|
@ -1217,12 +1266,14 @@ mod tests {
|
|||
etag: "etag1",
|
||||
timestamp: TS,
|
||||
part_number: 1,
|
||||
size: 3
|
||||
size: 3,
|
||||
checksum: None,
|
||||
},
|
||||
PartInfo {
|
||||
etag: "etag2",
|
||||
timestamp: TS,
|
||||
part_number: 3,
|
||||
checksum: None,
|
||||
size: 10
|
||||
},
|
||||
]
|
||||
|
@ -1238,12 +1289,14 @@ mod tests {
|
|||
PartInfo {
|
||||
etag: "etag3",
|
||||
timestamp: TS,
|
||||
checksum: None,
|
||||
part_number: 5,
|
||||
size: 7
|
||||
},
|
||||
PartInfo {
|
||||
etag: "etag4",
|
||||
timestamp: TS,
|
||||
checksum: None,
|
||||
part_number: 8,
|
||||
size: 5
|
||||
},
|
||||
|
@ -1267,24 +1320,28 @@ mod tests {
|
|||
PartInfo {
|
||||
etag: "etag1",
|
||||
timestamp: TS,
|
||||
checksum: None,
|
||||
part_number: 1,
|
||||
size: 3
|
||||
},
|
||||
PartInfo {
|
||||
etag: "etag2",
|
||||
timestamp: TS,
|
||||
checksum: None,
|
||||
part_number: 3,
|
||||
size: 10
|
||||
},
|
||||
PartInfo {
|
||||
etag: "etag3",
|
||||
timestamp: TS,
|
||||
checksum: None,
|
||||
part_number: 5,
|
||||
size: 7
|
||||
},
|
||||
PartInfo {
|
||||
etag: "etag4",
|
||||
timestamp: TS,
|
||||
checksum: None,
|
||||
part_number: 8,
|
||||
size: 5
|
||||
},
|
||||
|
|
|
@ -13,6 +13,7 @@ mod post_object;
|
|||
mod put;
|
||||
mod website;
|
||||
|
||||
mod checksum;
|
||||
mod encryption;
|
||||
mod router;
|
||||
pub mod xml;
|
||||
|
|
|
@ -1,9 +1,10 @@
|
|||
use std::collections::HashMap;
|
||||
use std::convert::TryInto;
|
||||
use std::sync::Arc;
|
||||
|
||||
use base64::prelude::*;
|
||||
use futures::prelude::*;
|
||||
use hyper::{Request, Response};
|
||||
use md5::{Digest as Md5Digest, Md5};
|
||||
|
||||
use garage_table::*;
|
||||
use garage_util::data::*;
|
||||
|
@ -16,6 +17,7 @@ use garage_model::s3::version_table::*;
|
|||
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::{ReqBody, ResBody};
|
||||
use crate::s3::checksum::*;
|
||||
use crate::s3::encryption::EncryptionParams;
|
||||
use crate::s3::error::*;
|
||||
use crate::s3::put::*;
|
||||
|
@ -41,10 +43,16 @@ pub async fn handle_create_multipart_upload(
|
|||
let timestamp = next_timestamp(existing_object.as_ref());
|
||||
|
||||
let headers = get_headers(req.headers())?;
|
||||
let meta = ObjectVersionMetaInner {
|
||||
headers,
|
||||
checksum: None,
|
||||
};
|
||||
|
||||
// Determine whether object should be encrypted, and if so the key
|
||||
let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
|
||||
let object_encryption = encryption.encrypt_headers(headers)?;
|
||||
let object_encryption = encryption.encrypt_meta(meta)?;
|
||||
|
||||
let checksum_algorithm = request_checksum_algorithm(req.headers())?;
|
||||
|
||||
// Create object in object table
|
||||
let object_version = ObjectVersion {
|
||||
|
@ -53,6 +61,7 @@ pub async fn handle_create_multipart_upload(
|
|||
state: ObjectVersionState::Uploading {
|
||||
multipart: true,
|
||||
encryption: object_encryption,
|
||||
checksum_algorithm,
|
||||
},
|
||||
};
|
||||
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
|
||||
|
@ -90,9 +99,13 @@ pub async fn handle_put_part(
|
|||
|
||||
let upload_id = decode_upload_id(upload_id)?;
|
||||
|
||||
let content_md5 = match req.headers().get("content-md5") {
|
||||
Some(x) => Some(x.to_str()?.to_string()),
|
||||
None => None,
|
||||
let expected_checksums = ExpectedChecksums {
|
||||
md5: match req.headers().get("content-md5") {
|
||||
Some(x) => Some(x.to_str()?.to_string()),
|
||||
None => None,
|
||||
},
|
||||
sha256: content_sha256,
|
||||
extra: request_checksum_value(req.headers())?,
|
||||
};
|
||||
|
||||
// Read first chuck, and at the same time try to get object to see if it exists
|
||||
|
@ -106,8 +119,12 @@ pub async fn handle_put_part(
|
|||
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
|
||||
|
||||
// Check encryption params
|
||||
let object_encryption = match object_version.state {
|
||||
ObjectVersionState::Uploading { encryption, .. } => encryption,
|
||||
let (object_encryption, checksum_algorithm) = match object_version.state {
|
||||
ObjectVersionState::Uploading {
|
||||
encryption,
|
||||
checksum_algorithm,
|
||||
..
|
||||
} => (encryption, checksum_algorithm),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
let (encryption, _) =
|
||||
|
@ -138,7 +155,9 @@ pub async fn handle_put_part(
|
|||
mpu_part_key,
|
||||
MpuPart {
|
||||
version: version_uuid,
|
||||
// all these are filled in later, at the end of this function
|
||||
etag: None,
|
||||
checksum: None,
|
||||
size: None,
|
||||
},
|
||||
);
|
||||
|
@ -152,32 +171,31 @@ pub async fn handle_put_part(
|
|||
garage.version_table.insert(&version).await?;
|
||||
|
||||
// Copy data to version
|
||||
let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks(
|
||||
let checksummer =
|
||||
Checksummer::init(&expected_checksums, !encryption.is_encrypted()).add(checksum_algorithm);
|
||||
let (total_size, checksums, _) = read_and_put_blocks(
|
||||
&ctx,
|
||||
&version,
|
||||
encryption,
|
||||
part_number,
|
||||
first_block,
|
||||
&mut chunker,
|
||||
checksummer,
|
||||
)
|
||||
.await?;
|
||||
|
||||
// Verify that checksums map
|
||||
ensure_checksum_matches(
|
||||
&data_md5sum,
|
||||
data_sha256sum,
|
||||
content_md5.as_deref(),
|
||||
content_sha256,
|
||||
)?;
|
||||
checksums.verify(&expected_checksums)?;
|
||||
|
||||
// Store part etag in version
|
||||
let etag = encryption.etag_from_md5(&data_md5sum);
|
||||
let etag = encryption.etag_from_md5(&checksums.md5);
|
||||
|
||||
mpu.parts.put(
|
||||
mpu_part_key,
|
||||
MpuPart {
|
||||
version: version_uuid,
|
||||
etag: Some(etag.clone()),
|
||||
checksum: checksums.extract(checksum_algorithm),
|
||||
size: Some(total_size),
|
||||
},
|
||||
);
|
||||
|
@ -189,6 +207,7 @@ pub async fn handle_put_part(
|
|||
|
||||
let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
|
||||
encryption.add_response_headers(&mut resp);
|
||||
let resp = add_checksum_response_headers(&expected_checksums.extra, resp);
|
||||
Ok(resp.body(empty_body())?)
|
||||
}
|
||||
|
||||
|
@ -236,10 +255,11 @@ pub async fn handle_complete_multipart_upload(
|
|||
bucket_name,
|
||||
..
|
||||
} = &ctx;
|
||||
let (req_head, req_body) = req.into_parts();
|
||||
|
||||
let body = http_body_util::BodyExt::collect(req.into_body())
|
||||
.await?
|
||||
.to_bytes();
|
||||
let expected_checksum = request_checksum_value(&req_head.headers)?;
|
||||
|
||||
let body = http_body_util::BodyExt::collect(req_body).await?.to_bytes();
|
||||
|
||||
if let Some(content_sha256) = content_sha256 {
|
||||
verify_signed_content(content_sha256, &body[..])?;
|
||||
|
@ -263,8 +283,12 @@ pub async fn handle_complete_multipart_upload(
|
|||
return Err(Error::bad_request("No data was uploaded"));
|
||||
}
|
||||
|
||||
let object_encryption = match object_version.state {
|
||||
ObjectVersionState::Uploading { encryption, .. } => encryption,
|
||||
let (object_encryption, checksum_algorithm) = match object_version.state {
|
||||
ObjectVersionState::Uploading {
|
||||
encryption,
|
||||
checksum_algorithm,
|
||||
..
|
||||
} => (encryption, checksum_algorithm),
|
||||
_ => unreachable!(),
|
||||
};
|
||||
|
||||
|
@ -292,6 +316,13 @@ pub async fn handle_complete_multipart_upload(
|
|||
for req_part in body_list_of_parts.iter() {
|
||||
match have_parts.get(&req_part.part_number) {
|
||||
Some(part) if part.etag.as_ref() == Some(&req_part.etag) && part.size.is_some() => {
|
||||
// alternative version: if req_part.checksum.is_some() && part.checksum != req_part.checksum {
|
||||
if part.checksum != req_part.checksum {
|
||||
return Err(Error::InvalidDigest(format!(
|
||||
"Invalid checksum for part {}: in request = {:?}, uploaded part = {:?}",
|
||||
req_part.part_number, req_part.checksum, part.checksum
|
||||
)));
|
||||
}
|
||||
parts.push(*part)
|
||||
}
|
||||
_ => return Err(Error::InvalidPart),
|
||||
|
@ -339,18 +370,23 @@ pub async fn handle_complete_multipart_upload(
|
|||
});
|
||||
garage.block_ref_table.insert_many(block_refs).await?;
|
||||
|
||||
// Calculate etag of final object
|
||||
// Calculate checksum and etag of final object
|
||||
// To understand how etags are calculated, read more here:
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/checking-object-integrity.html
|
||||
// https://teppen.io/2018/06/23/aws_s3_etags/
|
||||
let mut etag_md5_hasher = Md5::new();
|
||||
let mut checksummer = MultipartChecksummer::init(checksum_algorithm);
|
||||
for part in parts.iter() {
|
||||
etag_md5_hasher.update(part.etag.as_ref().unwrap().as_bytes());
|
||||
checksummer.update(part.etag.as_ref().unwrap(), part.checksum)?;
|
||||
}
|
||||
let etag = format!(
|
||||
"{}-{}",
|
||||
hex::encode(etag_md5_hasher.finalize()),
|
||||
parts.len()
|
||||
);
|
||||
let (checksum_md5, checksum_extra) = checksummer.finalize();
|
||||
|
||||
if expected_checksum.is_some() && checksum_extra != expected_checksum {
|
||||
return Err(Error::InvalidDigest(
|
||||
"Failed to validate x-amz-checksum-*".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let etag = format!("{}-{}", hex::encode(&checksum_md5[..]), parts.len());
|
||||
|
||||
// Calculate total size of final object
|
||||
let total_size = parts.iter().map(|x| x.size.unwrap()).sum();
|
||||
|
@ -363,6 +399,20 @@ pub async fn handle_complete_multipart_upload(
|
|||
return Err(e);
|
||||
}
|
||||
|
||||
// If there is a checksum algorithm, update metadata with checksum
|
||||
let object_encryption = match checksum_algorithm {
|
||||
None => object_encryption,
|
||||
Some(_) => {
|
||||
let (encryption, meta) =
|
||||
EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
|
||||
let new_meta = ObjectVersionMetaInner {
|
||||
headers: meta.into_owned().headers,
|
||||
checksum: checksum_extra,
|
||||
};
|
||||
encryption.encrypt_meta(new_meta)?
|
||||
}
|
||||
};
|
||||
|
||||
// Write final object version
|
||||
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
|
||||
ObjectVersionMeta {
|
||||
|
@ -383,10 +433,28 @@ pub async fn handle_complete_multipart_upload(
|
|||
bucket: s3_xml::Value(bucket_name.to_string()),
|
||||
key: s3_xml::Value(key),
|
||||
etag: s3_xml::Value(format!("\"{}\"", etag)),
|
||||
checksum_crc32: match &checksum_extra {
|
||||
Some(ChecksumValue::Crc32(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
|
||||
_ => None,
|
||||
},
|
||||
checksum_crc32c: match &checksum_extra {
|
||||
Some(ChecksumValue::Crc32c(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
|
||||
_ => None,
|
||||
},
|
||||
checksum_sha1: match &checksum_extra {
|
||||
Some(ChecksumValue::Sha1(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
|
||||
_ => None,
|
||||
},
|
||||
checksum_sha256: match &checksum_extra {
|
||||
Some(ChecksumValue::Sha256(x)) => Some(s3_xml::Value(BASE64_STANDARD.encode(&x))),
|
||||
_ => None,
|
||||
},
|
||||
};
|
||||
let xml = s3_xml::to_xml_with_header(&result)?;
|
||||
|
||||
Ok(Response::new(string_body(xml)))
|
||||
let resp = Response::builder();
|
||||
let resp = add_checksum_response_headers(&expected_checksum, resp);
|
||||
Ok(resp.body(string_body(xml))?)
|
||||
}
|
||||
|
||||
pub async fn handle_abort_multipart_upload(
|
||||
|
@ -455,6 +523,7 @@ pub fn decode_upload_id(id: &str) -> Result<Uuid, Error> {
|
|||
struct CompleteMultipartUploadPart {
|
||||
etag: String,
|
||||
part_number: u64,
|
||||
checksum: Option<ChecksumValue>,
|
||||
}
|
||||
|
||||
fn parse_complete_multipart_upload_body(
|
||||
|
@ -480,9 +549,41 @@ fn parse_complete_multipart_upload_body(
|
|||
.children()
|
||||
.find(|e| e.has_tag_name("PartNumber"))?
|
||||
.text()?;
|
||||
let checksum = if let Some(crc32) =
|
||||
item.children().find(|e| e.has_tag_name("ChecksumCRC32"))
|
||||
{
|
||||
Some(ChecksumValue::Crc32(
|
||||
BASE64_STANDARD.decode(crc32.text()?).ok()?[..]
|
||||
.try_into()
|
||||
.ok()?,
|
||||
))
|
||||
} else if let Some(crc32c) = item.children().find(|e| e.has_tag_name("ChecksumCRC32C"))
|
||||
{
|
||||
Some(ChecksumValue::Crc32c(
|
||||
BASE64_STANDARD.decode(crc32c.text()?).ok()?[..]
|
||||
.try_into()
|
||||
.ok()?,
|
||||
))
|
||||
} else if let Some(sha1) = item.children().find(|e| e.has_tag_name("ChecksumSHA1")) {
|
||||
Some(ChecksumValue::Sha1(
|
||||
BASE64_STANDARD.decode(sha1.text()?).ok()?[..]
|
||||
.try_into()
|
||||
.ok()?,
|
||||
))
|
||||
} else if let Some(sha256) = item.children().find(|e| e.has_tag_name("ChecksumSHA256"))
|
||||
{
|
||||
Some(ChecksumValue::Sha256(
|
||||
BASE64_STANDARD.decode(sha256.text()?).ok()?[..]
|
||||
.try_into()
|
||||
.ok()?,
|
||||
))
|
||||
} else {
|
||||
None
|
||||
};
|
||||
parts.push(CompleteMultipartUploadPart {
|
||||
etag: etag.trim_matches('"').to_string(),
|
||||
part_number: part_number.parse().ok()?,
|
||||
checksum,
|
||||
});
|
||||
} else {
|
||||
return None;
|
||||
|
|
|
@ -14,13 +14,15 @@ use multer::{Constraints, Multipart, SizeLimit};
|
|||
use serde::Deserialize;
|
||||
|
||||
use garage_model::garage::Garage;
|
||||
use garage_model::s3::object_table::*;
|
||||
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::ResBody;
|
||||
use crate::s3::checksum::*;
|
||||
use crate::s3::cors::*;
|
||||
use crate::s3::encryption::EncryptionParams;
|
||||
use crate::s3::error::*;
|
||||
use crate::s3::put::{get_headers, save_stream};
|
||||
use crate::s3::put::{get_headers, save_stream, ChecksumMode};
|
||||
use crate::s3::xml as s3_xml;
|
||||
use crate::signature::payload::{verify_v4, Authorization};
|
||||
|
||||
|
@ -98,10 +100,6 @@ pub async fn handle_post_object(
|
|||
.ok_or_bad_request("No policy was provided")?
|
||||
.to_str()?;
|
||||
let authorization = Authorization::parse_form(¶ms)?;
|
||||
let content_md5 = params
|
||||
.get("content-md5")
|
||||
.map(HeaderValue::to_str)
|
||||
.transpose()?;
|
||||
|
||||
let key = if key.contains("${filename}") {
|
||||
// if no filename is provided, don't replace. This matches the behavior of AWS.
|
||||
|
@ -226,6 +224,21 @@ pub async fn handle_post_object(
|
|||
|
||||
let headers = get_headers(¶ms)?;
|
||||
|
||||
let expected_checksums = ExpectedChecksums {
|
||||
md5: params
|
||||
.get("content-md5")
|
||||
.map(HeaderValue::to_str)
|
||||
.transpose()?
|
||||
.map(str::to_string),
|
||||
sha256: None,
|
||||
extra: request_checksum_algorithm_value(¶ms)?,
|
||||
};
|
||||
|
||||
let meta = ObjectVersionMetaInner {
|
||||
headers,
|
||||
checksum: expected_checksums.extra,
|
||||
};
|
||||
|
||||
let encryption = EncryptionParams::new_from_headers(&garage, ¶ms)?;
|
||||
|
||||
let stream = file_field.map(|r| r.map_err(Into::into));
|
||||
|
@ -239,12 +252,11 @@ pub async fn handle_post_object(
|
|||
|
||||
let res = save_stream(
|
||||
&ctx,
|
||||
headers,
|
||||
meta,
|
||||
encryption,
|
||||
StreamLimiter::new(stream, conditions.content_length),
|
||||
&key,
|
||||
content_md5.map(str::to_string),
|
||||
None,
|
||||
ChecksumMode::Verify(&expected_checksums),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
|
|
@ -1,12 +1,9 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use base64::prelude::*;
|
||||
use futures::prelude::*;
|
||||
use futures::stream::FuturesOrdered;
|
||||
use futures::try_join;
|
||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||
use sha2::Sha256;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
|
@ -22,7 +19,6 @@ use opentelemetry::{
|
|||
use garage_net::bytes_buf::BytesBuf;
|
||||
use garage_rpc::rpc_helper::OrderTag;
|
||||
use garage_table::*;
|
||||
use garage_util::async_hash::*;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::Error as GarageError;
|
||||
use garage_util::time::*;
|
||||
|
@ -36,16 +32,22 @@ use garage_model::s3::version_table::*;
|
|||
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::{ReqBody, ResBody};
|
||||
use crate::s3::checksum::*;
|
||||
use crate::s3::encryption::EncryptionParams;
|
||||
use crate::s3::error::*;
|
||||
|
||||
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
|
||||
|
||||
pub struct SaveStreamResult {
|
||||
pub version_uuid: Uuid,
|
||||
pub version_timestamp: u64,
|
||||
pub(crate) struct SaveStreamResult {
|
||||
pub(crate) version_uuid: Uuid,
|
||||
pub(crate) version_timestamp: u64,
|
||||
/// Etag WITHOUT THE QUOTES (just the hex value)
|
||||
pub etag: String,
|
||||
pub(crate) etag: String,
|
||||
}
|
||||
|
||||
pub(crate) enum ChecksumMode<'a> {
|
||||
Verify(&'a ExpectedChecksums),
|
||||
Calculate(Option<ChecksumAlgorithm>),
|
||||
}
|
||||
|
||||
pub async fn handle_put(
|
||||
|
@ -58,24 +60,32 @@ pub async fn handle_put(
|
|||
let headers = get_headers(req.headers())?;
|
||||
debug!("Object headers: {:?}", headers);
|
||||
|
||||
let expected_checksums = ExpectedChecksums {
|
||||
md5: match req.headers().get("content-md5") {
|
||||
Some(x) => Some(x.to_str()?.to_string()),
|
||||
None => None,
|
||||
},
|
||||
sha256: content_sha256,
|
||||
extra: request_checksum_value(req.headers())?,
|
||||
};
|
||||
|
||||
let meta = ObjectVersionMetaInner {
|
||||
headers,
|
||||
checksum: expected_checksums.extra,
|
||||
};
|
||||
|
||||
// Determine whether object should be encrypted, and if so the key
|
||||
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
|
||||
|
||||
let content_md5 = match req.headers().get("content-md5") {
|
||||
Some(x) => Some(x.to_str()?.to_string()),
|
||||
None => None,
|
||||
};
|
||||
|
||||
let stream = body_stream(req.into_body());
|
||||
|
||||
let res = save_stream(
|
||||
&ctx,
|
||||
headers,
|
||||
meta,
|
||||
encryption,
|
||||
stream,
|
||||
key,
|
||||
content_md5,
|
||||
content_sha256,
|
||||
ChecksumMode::Verify(&expected_checksums),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -83,17 +93,17 @@ pub async fn handle_put(
|
|||
.header("x-amz-version-id", hex::encode(res.version_uuid))
|
||||
.header("ETag", format!("\"{}\"", res.etag));
|
||||
encryption.add_response_headers(&mut resp);
|
||||
let resp = add_checksum_response_headers(&expected_checksums.extra, resp);
|
||||
Ok(resp.body(empty_body())?)
|
||||
}
|
||||
|
||||
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||
ctx: &ReqCtx,
|
||||
headers: ObjectVersionHeaders,
|
||||
mut meta: ObjectVersionMetaInner,
|
||||
encryption: EncryptionParams,
|
||||
body: S,
|
||||
key: &String,
|
||||
content_md5: Option<String>,
|
||||
content_sha256: Option<FixedBytes32>,
|
||||
checksum_mode: ChecksumMode<'_>,
|
||||
) -> Result<SaveStreamResult, Error> {
|
||||
let ReqCtx {
|
||||
garage, bucket_id, ..
|
||||
|
@ -107,32 +117,36 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
|
||||
let first_block = first_block_opt.unwrap_or_default();
|
||||
|
||||
let object_encryption = encryption.encrypt_headers(headers)?;
|
||||
|
||||
// Generate identity of new version
|
||||
let version_uuid = gen_uuid();
|
||||
let version_timestamp = next_timestamp(existing_object.as_ref());
|
||||
|
||||
let mut checksummer = match checksum_mode {
|
||||
ChecksumMode::Verify(expected) => Checksummer::init(expected, !encryption.is_encrypted()),
|
||||
ChecksumMode::Calculate(algo) => {
|
||||
Checksummer::init(&Default::default(), !encryption.is_encrypted()).add(algo)
|
||||
}
|
||||
};
|
||||
|
||||
// If body is small enough, store it directly in the object table
|
||||
// as "inline data". We can then return immediately.
|
||||
if first_block.len() < INLINE_THRESHOLD {
|
||||
let mut md5sum = Md5::new();
|
||||
md5sum.update(&first_block[..]);
|
||||
let data_md5sum = md5sum.finalize();
|
||||
checksummer.update(&first_block);
|
||||
let checksums = checksummer.finalize();
|
||||
|
||||
let data_sha256sum = sha256sum(&first_block[..]);
|
||||
|
||||
ensure_checksum_matches(
|
||||
&data_md5sum,
|
||||
data_sha256sum,
|
||||
content_md5.as_deref(),
|
||||
content_sha256,
|
||||
)?;
|
||||
match checksum_mode {
|
||||
ChecksumMode::Verify(expected) => {
|
||||
checksums.verify(&expected)?;
|
||||
}
|
||||
ChecksumMode::Calculate(algo) => {
|
||||
meta.checksum = checksums.extract(algo);
|
||||
}
|
||||
};
|
||||
|
||||
let size = first_block.len() as u64;
|
||||
check_quotas(ctx, size, existing_object.as_ref()).await?;
|
||||
|
||||
let etag = encryption.etag_from_md5(&data_md5sum);
|
||||
let etag = encryption.etag_from_md5(&checksums.md5);
|
||||
let inline_data = encryption.encrypt_blob(&first_block)?.to_vec();
|
||||
|
||||
let object_version = ObjectVersion {
|
||||
|
@ -140,7 +154,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
timestamp: version_timestamp,
|
||||
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
|
||||
ObjectVersionMeta {
|
||||
encryption: object_encryption,
|
||||
encryption: encryption.encrypt_meta(meta)?,
|
||||
size,
|
||||
etag: etag.clone(),
|
||||
},
|
||||
|
@ -175,7 +189,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
uuid: version_uuid,
|
||||
timestamp: version_timestamp,
|
||||
state: ObjectVersionState::Uploading {
|
||||
encryption: object_encryption.clone(),
|
||||
encryption: encryption.encrypt_meta(meta.clone())?,
|
||||
checksum_algorithm: None, // don't care; overwritten later
|
||||
multipart: false,
|
||||
},
|
||||
};
|
||||
|
@ -196,25 +211,37 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
);
|
||||
garage.version_table.insert(&version).await?;
|
||||
|
||||
// Transfer data and verify checksum
|
||||
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
|
||||
read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?;
|
||||
// Transfer data
|
||||
let (total_size, checksums, first_block_hash) = read_and_put_blocks(
|
||||
ctx,
|
||||
&version,
|
||||
encryption,
|
||||
1,
|
||||
first_block,
|
||||
&mut chunker,
|
||||
checksummer,
|
||||
)
|
||||
.await?;
|
||||
|
||||
ensure_checksum_matches(
|
||||
&data_md5sum,
|
||||
data_sha256sum,
|
||||
content_md5.as_deref(),
|
||||
content_sha256,
|
||||
)?;
|
||||
// Verify checksums are ok / add calculated checksum to metadata
|
||||
match checksum_mode {
|
||||
ChecksumMode::Verify(expected) => {
|
||||
checksums.verify(&expected)?;
|
||||
}
|
||||
ChecksumMode::Calculate(algo) => {
|
||||
meta.checksum = checksums.extract(algo);
|
||||
}
|
||||
};
|
||||
|
||||
// Verify quotas are respsected
|
||||
check_quotas(ctx, total_size, existing_object.as_ref()).await?;
|
||||
|
||||
// Save final object state, marked as Complete
|
||||
let etag = encryption.etag_from_md5(&data_md5sum);
|
||||
let etag = encryption.etag_from_md5(&checksums.md5);
|
||||
|
||||
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
|
||||
ObjectVersionMeta {
|
||||
encryption: object_encryption,
|
||||
encryption: encryption.encrypt_meta(meta)?,
|
||||
size: total_size,
|
||||
etag: etag.clone(),
|
||||
},
|
||||
|
@ -234,33 +261,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
|||
})
|
||||
}
|
||||
|
||||
/// Validate MD5 sum against content-md5 header
|
||||
/// and sha256sum against signed content-sha256
|
||||
pub(crate) fn ensure_checksum_matches(
|
||||
data_md5sum: &[u8],
|
||||
data_sha256sum: garage_util::data::FixedBytes32,
|
||||
content_md5: Option<&str>,
|
||||
content_sha256: Option<garage_util::data::FixedBytes32>,
|
||||
) -> Result<(), Error> {
|
||||
if let Some(expected_sha256) = content_sha256 {
|
||||
if expected_sha256 != data_sha256sum {
|
||||
return Err(Error::bad_request(
|
||||
"Unable to validate x-amz-content-sha256",
|
||||
));
|
||||
} else {
|
||||
trace!("Successfully validated x-amz-content-sha256");
|
||||
}
|
||||
}
|
||||
if let Some(expected_md5) = content_md5 {
|
||||
if expected_md5.trim_matches('"') != BASE64_STANDARD.encode(data_md5sum) {
|
||||
return Err(Error::bad_request("Unable to validate content-md5"));
|
||||
} else {
|
||||
trace!("Successfully validated content-md5");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Check that inserting this object with this size doesn't exceed bucket quotas
|
||||
pub(crate) async fn check_quotas(
|
||||
ctx: &ReqCtx,
|
||||
|
@ -332,7 +332,8 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
|||
part_number: u64,
|
||||
first_block: Bytes,
|
||||
chunker: &mut StreamChunker<S>,
|
||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash, Hash), Error> {
|
||||
checksummer: Checksummer,
|
||||
) -> Result<(u64, Checksums, Hash), Error> {
|
||||
let tracer = opentelemetry::global::tracer("garage");
|
||||
|
||||
let (block_tx, mut block_rx) = mpsc::channel::<Result<Bytes, Error>>(2);
|
||||
|
@ -360,20 +361,20 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
|||
|
||||
let (block_tx2, mut block_rx2) = mpsc::channel::<Result<Bytes, Error>>(1);
|
||||
let hash_stream = async {
|
||||
let md5hasher = AsyncHasher::<Md5>::new();
|
||||
let sha256hasher = AsyncHasher::<Sha256>::new();
|
||||
let mut checksummer = checksummer;
|
||||
while let Some(next) = block_rx.recv().await {
|
||||
match next {
|
||||
Ok(block) => {
|
||||
block_tx2.send(Ok(block.clone())).await?;
|
||||
futures::future::join(
|
||||
md5hasher.update(block.clone()),
|
||||
sha256hasher.update(block.clone()),
|
||||
)
|
||||
checksummer = tokio::task::spawn_blocking(move || {
|
||||
checksummer.update(&block);
|
||||
checksummer
|
||||
})
|
||||
.with_context(Context::current_with_span(
|
||||
tracer.start("Hash block (md5, sha256)"),
|
||||
))
|
||||
.await;
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
Err(e) => {
|
||||
block_tx2.send(Err(e)).await?;
|
||||
|
@ -382,10 +383,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
|||
}
|
||||
}
|
||||
drop(block_tx2);
|
||||
Ok::<_, mpsc::error::SendError<_>>(futures::join!(
|
||||
md5hasher.finalize(),
|
||||
sha256hasher.finalize()
|
||||
))
|
||||
Ok::<_, mpsc::error::SendError<_>>(checksummer)
|
||||
};
|
||||
|
||||
let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1);
|
||||
|
@ -395,33 +393,28 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
|||
match next {
|
||||
Ok(block) => {
|
||||
let unencrypted_len = block.len() as u64;
|
||||
let block = if encryption.is_encrypted() {
|
||||
let res =
|
||||
tokio::task::spawn_blocking(move || encryption.encrypt_block(block))
|
||||
.with_context(Context::current_with_span(
|
||||
tracer.start("Encrypt block"),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
match res {
|
||||
Ok(b) => b,
|
||||
Err(e) => {
|
||||
block_tx3.send(Err(e)).await?;
|
||||
break;
|
||||
let res = tokio::task::spawn_blocking(move || {
|
||||
let block = encryption.encrypt_block(block)?;
|
||||
let hash = blake2sum(&block);
|
||||
Ok((block, hash))
|
||||
})
|
||||
.with_context(Context::current_with_span(
|
||||
tracer.start("Encrypt and hash (blake2) block"),
|
||||
))
|
||||
.await
|
||||
.unwrap();
|
||||
match res {
|
||||
Ok((block, hash)) => {
|
||||
if first_block_hash.is_none() {
|
||||
first_block_hash = Some(hash);
|
||||
}
|
||||
block_tx3.send(Ok((block, unencrypted_len, hash))).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
block_tx3.send(Err(e)).await?;
|
||||
break;
|
||||
}
|
||||
} else {
|
||||
block
|
||||
};
|
||||
let hash = async_blake2sum(block.clone())
|
||||
.with_context(Context::current_with_span(
|
||||
tracer.start("Hash block (blake2)"),
|
||||
))
|
||||
.await;
|
||||
if first_block_hash.is_none() {
|
||||
first_block_hash = Some(hash);
|
||||
}
|
||||
block_tx3.send(Ok((block, unencrypted_len, hash))).await?;
|
||||
}
|
||||
Err(e) => {
|
||||
block_tx3.send(Err(e)).await?;
|
||||
|
@ -493,12 +486,10 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
|
|||
let total_size = final_result?;
|
||||
// unwrap here is ok, because if hasher failed, it is because something failed
|
||||
// later in the pipeline which already caused a return at the ? on previous line
|
||||
let (data_md5sum, data_sha256sum) = stream_hash_result.unwrap();
|
||||
let first_block_hash = block_hash_result.unwrap();
|
||||
let checksums = stream_hash_result.unwrap().finalize();
|
||||
|
||||
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
|
||||
|
||||
Ok((total_size, data_md5sum, data_sha256sum, first_block_hash))
|
||||
Ok((total_size, checksums, first_block_hash))
|
||||
}
|
||||
|
||||
async fn put_block_and_meta(
|
||||
|
@ -609,7 +600,7 @@ impl Drop for InterruptedCleanup {
|
|||
|
||||
// ============ helpers ============
|
||||
|
||||
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
|
||||
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<HeaderList, Error> {
|
||||
let mut ret = Vec::new();
|
||||
|
||||
// Preserve standard headers
|
||||
|
@ -637,7 +628,7 @@ pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVers
|
|||
}
|
||||
}
|
||||
|
||||
Ok(ObjectVersionHeaders(ret))
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {
|
||||
|
|
|
@ -131,6 +131,14 @@ pub struct CompleteMultipartUploadResult {
|
|||
pub key: Value,
|
||||
#[serde(rename = "ETag")]
|
||||
pub etag: Value,
|
||||
#[serde(rename = "ChecksumCRC32")]
|
||||
pub checksum_crc32: Option<Value>,
|
||||
#[serde(rename = "ChecksumCRC32C")]
|
||||
pub checksum_crc32c: Option<Value>,
|
||||
#[serde(rename = "ChecksumSHA1")]
|
||||
pub checksum_sha1: Option<Value>,
|
||||
#[serde(rename = "ChecksumSHA256")]
|
||||
pub checksum_sha256: Option<Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, PartialEq, Eq)]
|
||||
|
@ -197,6 +205,14 @@ pub struct PartItem {
|
|||
pub part_number: IntValue,
|
||||
#[serde(rename = "Size")]
|
||||
pub size: IntValue,
|
||||
#[serde(rename = "ChecksumCRC32")]
|
||||
pub checksum_crc32: Option<Value>,
|
||||
#[serde(rename = "ChecksumCRC32C")]
|
||||
pub checksum_crc32c: Option<Value>,
|
||||
#[serde(rename = "ChecksumSHA1")]
|
||||
pub checksum_sha1: Option<Value>,
|
||||
#[serde(rename = "ChecksumSHA256")]
|
||||
pub checksum_sha256: Option<Value>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, PartialEq, Eq)]
|
||||
|
@ -500,6 +516,10 @@ mod tests {
|
|||
bucket: Value("mybucket".to_string()),
|
||||
key: Value("a/plop".to_string()),
|
||||
etag: Value("\"3858f62230ac3c915f300c664312c11f-9\"".to_string()),
|
||||
checksum_crc32: None,
|
||||
checksum_crc32c: None,
|
||||
checksum_sha1: Some(Value("ZJAnHyG8PeKz9tI8UTcHrJos39A=".into())),
|
||||
checksum_sha256: None,
|
||||
};
|
||||
assert_eq!(
|
||||
to_xml_with_header(&result)?,
|
||||
|
@ -509,6 +529,7 @@ mod tests {
|
|||
<Bucket>mybucket</Bucket>\
|
||||
<Key>a/plop</Key>\
|
||||
<ETag>"3858f62230ac3c915f300c664312c11f-9"</ETag>\
|
||||
<ChecksumSHA1>ZJAnHyG8PeKz9tI8UTcHrJos39A=</ChecksumSHA1>\
|
||||
</CompleteMultipartUploadResult>"
|
||||
);
|
||||
Ok(())
|
||||
|
@ -780,12 +801,22 @@ mod tests {
|
|||
last_modified: Value("2010-11-10T20:48:34.000Z".to_string()),
|
||||
part_number: IntValue(2),
|
||||
size: IntValue(10485760),
|
||||
checksum_crc32: None,
|
||||
checksum_crc32c: None,
|
||||
checksum_sha256: Some(Value(
|
||||
"5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=".into(),
|
||||
)),
|
||||
checksum_sha1: None,
|
||||
},
|
||||
PartItem {
|
||||
etag: Value("\"aaaa18db4cc2f85cedef654fccc4a4x8\"".to_string()),
|
||||
last_modified: Value("2010-11-10T20:48:33.000Z".to_string()),
|
||||
part_number: IntValue(3),
|
||||
size: IntValue(10485760),
|
||||
checksum_sha256: None,
|
||||
checksum_crc32c: None,
|
||||
checksum_crc32: Some(Value("ZJAnHyG8=".into())),
|
||||
checksum_sha1: None,
|
||||
},
|
||||
],
|
||||
initiator: Initiator {
|
||||
|
@ -820,12 +851,14 @@ mod tests {
|
|||
<LastModified>2010-11-10T20:48:34.000Z</LastModified>\
|
||||
<PartNumber>2</PartNumber>\
|
||||
<Size>10485760</Size>\
|
||||
<ChecksumSHA256>5RQ3A5uk0w7ojNjvegohch4JRBBGN/cLhsNrPzfv/hA=</ChecksumSHA256>\
|
||||
</Part>\
|
||||
<Part>\
|
||||
<ETag>"aaaa18db4cc2f85cedef654fccc4a4x8"</ETag>\
|
||||
<LastModified>2010-11-10T20:48:33.000Z</LastModified>\
|
||||
<PartNumber>3</PartNumber>\
|
||||
<Size>10485760</Size>\
|
||||
<ChecksumCRC32>ZJAnHyG8=</ChecksumCRC32>\
|
||||
</Part>\
|
||||
<Initiator>\
|
||||
<DisplayName>umat-user-11116a31-17b5-4fb7-9df5-b288870f11xx</DisplayName>\
|
||||
|
|
|
@ -42,6 +42,7 @@ tracing.workspace = true
|
|||
tracing-subscriber.workspace = true
|
||||
rand.workspace = true
|
||||
async-trait.workspace = true
|
||||
sha1.workspace = true
|
||||
sodiumoxide.workspace = true
|
||||
structopt.workspace = true
|
||||
git-version.workspace = true
|
||||
|
|
|
@ -1,6 +1,7 @@
|
|||
use crate::common;
|
||||
use aws_sdk_s3::primitives::ByteStream;
|
||||
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
|
||||
use aws_sdk_s3::types::{ChecksumAlgorithm, CompletedMultipartUpload, CompletedPart};
|
||||
use base64::prelude::*;
|
||||
|
||||
const SZ_5MB: usize = 5 * 1024 * 1024;
|
||||
const SZ_10MB: usize = 10 * 1024 * 1024;
|
||||
|
@ -189,6 +190,153 @@ async fn test_multipart_upload() {
|
|||
}
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_multipart_with_checksum() {
|
||||
let ctx = common::context();
|
||||
let bucket = ctx.create_bucket("testmpu-cksum");
|
||||
|
||||
let u1 = vec![0x11; SZ_5MB];
|
||||
let u2 = vec![0x22; SZ_5MB];
|
||||
let u3 = vec![0x33; SZ_5MB];
|
||||
|
||||
let ck1 = calculate_sha1(&u1);
|
||||
let ck2 = calculate_sha1(&u2);
|
||||
let ck3 = calculate_sha1(&u3);
|
||||
|
||||
let up = ctx
|
||||
.client
|
||||
.create_multipart_upload()
|
||||
.bucket(&bucket)
|
||||
.checksum_algorithm(ChecksumAlgorithm::Sha1)
|
||||
.key("a")
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(up.upload_id.is_some());
|
||||
|
||||
let uid = up.upload_id.as_ref().unwrap();
|
||||
|
||||
let p1 = ctx
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket(&bucket)
|
||||
.key("a")
|
||||
.upload_id(uid)
|
||||
.part_number(1)
|
||||
.checksum_sha1(&ck1)
|
||||
.body(ByteStream::from(u1.clone()))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// wrong checksum value should return an error
|
||||
let err1 = ctx
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket(&bucket)
|
||||
.key("a")
|
||||
.upload_id(uid)
|
||||
.part_number(2)
|
||||
.checksum_sha1(&ck1)
|
||||
.body(ByteStream::from(u2.clone()))
|
||||
.send()
|
||||
.await;
|
||||
assert!(err1.is_err());
|
||||
|
||||
let p2 = ctx
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket(&bucket)
|
||||
.key("a")
|
||||
.upload_id(uid)
|
||||
.part_number(2)
|
||||
.checksum_sha1(&ck2)
|
||||
.body(ByteStream::from(u2))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
let p3 = ctx
|
||||
.client
|
||||
.upload_part()
|
||||
.bucket(&bucket)
|
||||
.key("a")
|
||||
.upload_id(uid)
|
||||
.part_number(3)
|
||||
.checksum_sha1(&ck3)
|
||||
.body(ByteStream::from(u3.clone()))
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
{
|
||||
let r = ctx
|
||||
.client
|
||||
.list_parts()
|
||||
.bucket(&bucket)
|
||||
.key("a")
|
||||
.upload_id(uid)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
let parts = r.parts.unwrap();
|
||||
assert_eq!(parts.len(), 3);
|
||||
assert!(parts[0].checksum_crc32.is_none());
|
||||
assert!(parts[0].checksum_crc32_c.is_none());
|
||||
assert!(parts[0].checksum_sha256.is_none());
|
||||
assert_eq!(parts[0].checksum_sha1.as_deref().unwrap(), ck1);
|
||||
assert_eq!(parts[1].checksum_sha1.as_deref().unwrap(), ck2);
|
||||
assert_eq!(parts[2].checksum_sha1.as_deref().unwrap(), ck3);
|
||||
}
|
||||
|
||||
let cmp = CompletedMultipartUpload::builder()
|
||||
.parts(
|
||||
CompletedPart::builder()
|
||||
.part_number(1)
|
||||
.checksum_sha1(&ck1)
|
||||
.e_tag(p1.e_tag.unwrap())
|
||||
.build(),
|
||||
)
|
||||
.parts(
|
||||
CompletedPart::builder()
|
||||
.part_number(2)
|
||||
.checksum_sha1(&ck2)
|
||||
.e_tag(p2.e_tag.unwrap())
|
||||
.build(),
|
||||
)
|
||||
.parts(
|
||||
CompletedPart::builder()
|
||||
.part_number(3)
|
||||
.checksum_sha1(&ck3)
|
||||
.e_tag(p3.e_tag.unwrap())
|
||||
.build(),
|
||||
)
|
||||
.build();
|
||||
|
||||
let expected_checksum = calculate_sha1(
|
||||
&vec![
|
||||
BASE64_STANDARD.decode(&ck1).unwrap(),
|
||||
BASE64_STANDARD.decode(&ck2).unwrap(),
|
||||
BASE64_STANDARD.decode(&ck3).unwrap(),
|
||||
]
|
||||
.concat(),
|
||||
);
|
||||
|
||||
let res = ctx
|
||||
.client
|
||||
.complete_multipart_upload()
|
||||
.bucket(&bucket)
|
||||
.key("a")
|
||||
.upload_id(uid)
|
||||
.checksum_sha1(expected_checksum.clone())
|
||||
.multipart_upload(cmp)
|
||||
.send()
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
assert_eq!(res.checksum_sha1, Some(expected_checksum));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_uploadlistpart() {
|
||||
let ctx = common::context();
|
||||
|
@ -624,3 +772,11 @@ async fn test_uploadpartcopy() {
|
|||
assert_eq!(real_obj.len(), exp_obj.len());
|
||||
assert_eq!(real_obj, exp_obj);
|
||||
}
|
||||
|
||||
fn calculate_sha1(bytes: &[u8]) -> String {
|
||||
use sha1::{Digest, Sha1};
|
||||
|
||||
let mut hasher = Sha1::new();
|
||||
hasher.update(bytes);
|
||||
BASE64_STANDARD.encode(&hasher.finalize()[..])
|
||||
}
|
||||
|
|
|
@ -17,6 +17,7 @@ pub const PARTS: &str = "parts";
|
|||
pub const BYTES: &str = "bytes";
|
||||
|
||||
mod v09 {
|
||||
use crate::s3::object_table::ChecksumValue;
|
||||
use garage_util::crdt;
|
||||
use garage_util::data::Uuid;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
@ -61,6 +62,9 @@ mod v09 {
|
|||
pub version: Uuid,
|
||||
/// ETag of the content of this part (known only once done uploading)
|
||||
pub etag: Option<String>,
|
||||
/// Checksum requested by x-amz-checksum-algorithm
|
||||
#[serde(default)]
|
||||
pub checksum: Option<ChecksumValue>,
|
||||
/// Size of this part (known only once done uploading)
|
||||
pub size: Option<u64>,
|
||||
}
|
||||
|
@ -155,6 +159,11 @@ impl Crdt for MpuPart {
|
|||
(Some(x), Some(y)) if x < y => other.size,
|
||||
(x, _) => x,
|
||||
};
|
||||
self.checksum = match (self.checksum.take(), &other.checksum) {
|
||||
(None, Some(_)) => other.checksum.clone(),
|
||||
(Some(x), Some(y)) if x < *y => other.checksum.clone(),
|
||||
(x, _) => x,
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -208,6 +208,8 @@ mod v010 {
|
|||
Uploading {
|
||||
/// Indicates whether this is a multipart upload
|
||||
multipart: bool,
|
||||
/// Checksum algorithm to use
|
||||
checksum_algorithm: Option<ChecksumAlgorithm>,
|
||||
/// Encryption params + headers to be included in the final object
|
||||
encryption: ObjectVersionEncryption,
|
||||
},
|
||||
|
@ -247,10 +249,10 @@ mod v010 {
|
|||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum ObjectVersionEncryption {
|
||||
SseC {
|
||||
/// Encrypted serialized ObjectVersionHeaders struct.
|
||||
/// Encrypted serialized ObjectVersionInner struct.
|
||||
/// This is never compressed, just encrypted using AES256-GCM.
|
||||
#[serde(with = "serde_bytes")]
|
||||
headers: Vec<u8>,
|
||||
inner: Vec<u8>,
|
||||
/// Whether data blocks are compressed in addition to being encrypted
|
||||
/// (compression happens before encryption, whereas for non-encrypted
|
||||
/// objects, compression is handled at the level of the block manager)
|
||||
|
@ -258,13 +260,35 @@ mod v010 {
|
|||
},
|
||||
Plaintext {
|
||||
/// Plain-text headers
|
||||
headers: ObjectVersionHeaders,
|
||||
inner: ObjectVersionMetaInner,
|
||||
},
|
||||
}
|
||||
|
||||
/// Vector of headers, as tuples of the format (header name, header value)
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct ObjectVersionHeaders(pub Vec<(String, String)>);
|
||||
pub struct ObjectVersionMetaInner {
|
||||
pub headers: HeaderList,
|
||||
pub checksum: Option<ChecksumValue>,
|
||||
}
|
||||
|
||||
pub type HeaderList = Vec<(String, String)>;
|
||||
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub enum ChecksumAlgorithm {
|
||||
Crc32,
|
||||
Crc32c,
|
||||
Sha1,
|
||||
Sha256,
|
||||
}
|
||||
|
||||
/// Checksum value for x-amz-checksum-algorithm
|
||||
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Copy, Debug, Serialize, Deserialize)]
|
||||
pub enum ChecksumValue {
|
||||
Crc32(#[serde(with = "serde_bytes")] [u8; 4]),
|
||||
Crc32c(#[serde(with = "serde_bytes")] [u8; 4]),
|
||||
Sha1(#[serde(with = "serde_bytes")] [u8; 20]),
|
||||
Sha256(#[serde(with = "serde_bytes")] [u8; 32]),
|
||||
}
|
||||
|
||||
impl garage_util::migrate::Migrate for Object {
|
||||
const VERSION_MARKER: &'static [u8] = b"G010s3ob";
|
||||
|
@ -288,6 +312,7 @@ mod v010 {
|
|||
v09::ObjectVersionState::Uploading { multipart, headers } => {
|
||||
ObjectVersionState::Uploading {
|
||||
multipart,
|
||||
checksum_algorithm: None,
|
||||
encryption: migrate_headers(headers),
|
||||
}
|
||||
}
|
||||
|
@ -331,15 +356,18 @@ mod v010 {
|
|||
}
|
||||
|
||||
ObjectVersionEncryption::Plaintext {
|
||||
headers: ObjectVersionHeaders(new_headers),
|
||||
inner: ObjectVersionMetaInner {
|
||||
headers: new_headers,
|
||||
checksum: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// Since ObjectVersionHeaders can now be serialized independently, for the
|
||||
// purpose of being encrypted, we need it to support migrations on its own
|
||||
// as well.
|
||||
impl garage_util::migrate::InitialFormat for ObjectVersionHeaders {
|
||||
const VERSION_MARKER: &'static [u8] = b"G010s3oh";
|
||||
impl garage_util::migrate::InitialFormat for ObjectVersionMetaInner {
|
||||
const VERSION_MARKER: &'static [u8] = b"G010s3om";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -454,6 +482,17 @@ impl Entry<Uuid, String> for Object {
|
|||
}
|
||||
}
|
||||
|
||||
impl ChecksumValue {
|
||||
pub fn algorithm(&self) -> ChecksumAlgorithm {
|
||||
match self {
|
||||
ChecksumValue::Crc32(_) => ChecksumAlgorithm::Crc32,
|
||||
ChecksumValue::Crc32c(_) => ChecksumAlgorithm::Crc32c,
|
||||
ChecksumValue::Sha1(_) => ChecksumAlgorithm::Sha1,
|
||||
ChecksumValue::Sha256(_) => ChecksumAlgorithm::Sha256,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Crdt for Object {
|
||||
fn merge(&mut self, other: &Self) {
|
||||
// Merge versions from other into here
|
||||
|
|
|
@ -1,61 +0,0 @@
|
|||
use bytes::Bytes;
|
||||
use digest::Digest;
|
||||
|
||||
use tokio::sync::mpsc;
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
use crate::data::*;
|
||||
|
||||
/// Compute the sha256 of a slice,
|
||||
/// spawning on a tokio thread for CPU-intensive processing
|
||||
/// The argument has to be an owned Bytes, as it is moved out to a new thread.
|
||||
pub async fn async_sha256sum(data: Bytes) -> Hash {
|
||||
tokio::task::spawn_blocking(move || sha256sum(&data))
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
/// Compute the blake2sum of a slice,
|
||||
/// spawning on a tokio thread for CPU-intensive processing.
|
||||
/// The argument has to be an owned Bytes, as it is moved out to a new thread.
|
||||
pub async fn async_blake2sum(data: Bytes) -> Hash {
|
||||
tokio::task::spawn_blocking(move || blake2sum(&data))
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
// ----
|
||||
|
||||
pub struct AsyncHasher<D: Digest> {
|
||||
sendblk: mpsc::Sender<Bytes>,
|
||||
task: JoinHandle<digest::Output<D>>,
|
||||
}
|
||||
|
||||
impl<D: Digest> AsyncHasher<D> {
|
||||
pub fn new() -> Self {
|
||||
let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1);
|
||||
let task = tokio::task::spawn_blocking(move || {
|
||||
let mut digest = D::new();
|
||||
while let Some(blk) = recvblk.blocking_recv() {
|
||||
digest.update(&blk[..]);
|
||||
}
|
||||
digest.finalize()
|
||||
});
|
||||
Self { sendblk, task }
|
||||
}
|
||||
|
||||
pub async fn update(&self, b: Bytes) {
|
||||
self.sendblk.send(b).await.unwrap();
|
||||
}
|
||||
|
||||
pub async fn finalize(self) -> digest::Output<D> {
|
||||
drop(self.sendblk);
|
||||
self.task.await.unwrap()
|
||||
}
|
||||
}
|
||||
|
||||
impl<D: Digest> Default for AsyncHasher<D> {
|
||||
fn default() -> Self {
|
||||
Self::new()
|
||||
}
|
||||
}
|
|
@ -3,7 +3,6 @@
|
|||
#[macro_use]
|
||||
extern crate tracing;
|
||||
|
||||
pub mod async_hash;
|
||||
pub mod background;
|
||||
pub mod config;
|
||||
pub mod crdt;
|
||||
|
|
Loading…
Add table
Reference in a new issue