Garage v1.0 #683

Merged
lx merged 119 commits from next-0.10 into main 2024-04-10 15:23:13 +00:00
24 changed files with 2090 additions and 271 deletions
Showing only changes of commit 2fd13c7d13

106
Cargo.lock generated
View file

@ -17,6 +17,41 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "aes-gcm"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"ghash",
"subtle",
]
[[package]]
name = "ahash"
version = "0.8.7"
@ -761,6 +796,16 @@ dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]]
name = "clap"
version = "2.34.0"
@ -929,9 +974,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [
"generic-array",
"rand_core",
"typenum",
]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
]
[[package]]
name = "darling"
version = "0.20.5"
@ -1333,7 +1388,9 @@ dependencies = [
name = "garage_api"
version = "0.10.0"
dependencies = [
"aes-gcm",
"argon2",
"async-compression",
"async-trait",
"base64 0.21.7",
"bytes",
@ -1374,6 +1431,7 @@ dependencies = [
"sha2",
"tokio",
"tokio-stream",
"tokio-util 0.7.10",
"tracing",
"url",
]
@ -1437,6 +1495,7 @@ dependencies = [
"garage_table",
"garage_util",
"hex",
"http 1.0.0",
"opentelemetry",
"rand",
"serde",
@ -1614,6 +1673,16 @@ dependencies = [
"wasi",
]
[[package]]
name = "ghash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1"
dependencies = [
"opaque-debug",
"polyval",
]
[[package]]
name = "gimli"
version = "0.28.1"
@ -2063,6 +2132,15 @@ dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]]
name = "instant"
version = "0.1.12"
@ -2643,6 +2721,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]]
name = "openssl-probe"
version = "0.1.5"
@ -2980,6 +3064,18 @@ dependencies = [
"winapi",
]
[[package]]
name = "polyval"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]]
name = "powerfmt"
version = "0.2.0"
@ -4399,6 +4495,16 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "universal-hash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle",
]
[[package]]
name = "unsafe-libyaml"
version = "0.2.10"

134
Cargo.nix
View file

@ -34,7 +34,7 @@ args@{
ignoreLockHash,
}:
let
nixifiedLockHash = "263873397c8aa960f9ef6a815187218ab9c58b5ab35bbeb9c3dc70d032dcc963";
nixifiedLockHash = "c3296a54f1c6f385e0d4a4a937734f1fe0fee4405b44d7462249d72675f7ac40";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash
@ -88,6 +88,58 @@ in
src = fetchCratesIo { inherit name version; sha256 = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"; };
});
"registry+https://github.com/rust-lang/crates.io-index".aead."0.5.2" = overridableMkRustCrate (profileName: rec {
name = "aead";
version = "0.5.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"; };
features = builtins.concatLists [
[ "alloc" ]
[ "getrandom" ]
[ "rand_core" ]
[ "stream" ]
];
dependencies = {
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".aes."0.8.4" = overridableMkRustCrate (profileName: rec {
name = "aes";
version = "0.8.4";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"; };
dependencies = {
cfg_if = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }).out;
cipher = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" { inherit profileName; }).out;
${ if hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "x86_64" || hostPlatform.parsed.cpu.name == "i686" then "cpufeatures" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cpufeatures."0.2.12" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".aes-gcm."0.10.3" = overridableMkRustCrate (profileName: rec {
name = "aes-gcm";
version = "0.10.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1"; };
features = builtins.concatLists [
[ "aes" ]
[ "alloc" ]
[ "default" ]
[ "getrandom" ]
[ "rand_core" ]
[ "stream" ]
];
dependencies = {
aead = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aead."0.5.2" { inherit profileName; }).out;
aes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aes."0.8.4" { inherit profileName; }).out;
cipher = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" { inherit profileName; }).out;
ctr = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ctr."0.9.2" { inherit profileName; }).out;
ghash = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ghash."0.5.1" { inherit profileName; }).out;
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".ahash."0.8.7" = overridableMkRustCrate (profileName: rec {
name = "ahash";
version = "0.8.7";
@ -1085,6 +1137,17 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" = overridableMkRustCrate (profileName: rec {
name = "cipher";
version = "0.4.4";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"; };
dependencies = {
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
inout = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".inout."0.1.3" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".clap."2.34.0" = overridableMkRustCrate (profileName: rec {
name = "clap";
version = "2.34.0";
@ -1333,14 +1396,27 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"; };
features = builtins.concatLists [
[ "getrandom" ]
[ "rand_core" ]
[ "std" ]
];
dependencies = {
generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out;
rand_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.6.4" { inherit profileName; }).out;
typenum = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".typenum."1.17.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".ctr."0.9.2" = overridableMkRustCrate (profileName: rec {
name = "ctr";
version = "0.9.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"; };
dependencies = {
cipher = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".darling."0.20.5" = overridableMkRustCrate (profileName: rec {
name = "darling";
version = "0.20.5";
@ -1958,7 +2034,9 @@ in
(lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics" || rootFeatures' ? "garage_api/prometheus") "prometheus")
];
dependencies = {
aes_gcm = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aes-gcm."0.10.3" { inherit profileName; }).out;
argon2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".argon2."0.5.3" { inherit profileName; }).out;
async_compression = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-compression."0.4.6" { inherit profileName; }).out;
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.77" { profileName = "__noProfile"; }).out;
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out;
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out;
@ -1999,6 +2077,7 @@ in
sha2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.10.8" { inherit profileName; }).out;
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" { inherit profileName; }).out;
tokio_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.10" { inherit profileName; }).out;
tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out;
url = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".url."2.5.0" { inherit profileName; }).out;
};
@ -2092,6 +2171,7 @@ in
garage_table = (rustPackages."unknown".garage_table."0.10.0" { inherit profileName; }).out;
garage_util = (rustPackages."unknown".garage_util."0.10.0" { inherit profileName; }).out;
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
@ -2321,6 +2401,17 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".ghash."0.5.1" = overridableMkRustCrate (profileName: rec {
name = "ghash";
version = "0.5.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1"; };
dependencies = {
opaque_debug = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opaque-debug."0.3.1" { inherit profileName; }).out;
polyval = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".polyval."0.6.2" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".gimli."0.28.1" = overridableMkRustCrate (profileName: rec {
name = "gimli";
version = "0.28.1";
@ -2928,6 +3019,16 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".inout."0.1.3" = overridableMkRustCrate (profileName: rec {
name = "inout";
version = "0.1.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"; };
dependencies = {
generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".instant."0.1.12" = overridableMkRustCrate (profileName: rec {
name = "instant";
version = "0.1.12";
@ -3777,6 +3878,13 @@ in
];
});
"registry+https://github.com/rust-lang/crates.io-index".opaque-debug."0.3.1" = overridableMkRustCrate (profileName: rec {
name = "opaque-debug";
version = "0.3.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"; };
});
"registry+https://github.com/rust-lang/crates.io-index".openssl-probe."0.1.5" = overridableMkRustCrate (profileName: rec {
name = "openssl-probe";
version = "0.1.5";
@ -4236,6 +4344,19 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".polyval."0.6.2" = overridableMkRustCrate (profileName: rec {
name = "polyval";
version = "0.6.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"; };
dependencies = {
cfg_if = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }).out;
${ if hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "x86_64" || hostPlatform.parsed.cpu.name == "i686" then "cpufeatures" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cpufeatures."0.2.12" { inherit profileName; }).out;
opaque_debug = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opaque-debug."0.3.1" { inherit profileName; }).out;
universal_hash = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".universal-hash."0.5.1" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".powerfmt."0.2.0" = overridableMkRustCrate (profileName: rec {
name = "powerfmt";
version = "0.2.0";
@ -6314,6 +6435,17 @@ in
];
});
"registry+https://github.com/rust-lang/crates.io-index".universal-hash."0.5.1" = overridableMkRustCrate (profileName: rec {
name = "universal-hash";
version = "0.5.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"; };
dependencies = {
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".unsafe-libyaml."0.2.10" = overridableMkRustCrate (profileName: rec {
name = "unsafe-libyaml";
version = "0.2.10";

View file

@ -66,6 +66,7 @@ sha2 = "0.10"
timeago = { version = "0.4", default-features = false }
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
aes-gcm = { version = "0.10", features = ["aes", "stream"] }
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }

View file

@ -80,6 +80,53 @@ To test your new configuration, just reload your Nextcloud webpage and start sen
*External link:* [Nextcloud Documentation > Primary Storage](https://docs.nextcloud.com/server/latest/admin_manual/configuration_files/primary_storage.html)
#### SSE-C encryption (since Garage v1.0)
Since version 1.0, Garage supports server-side encryption with customer-provided
keys (SSE-C). In this mode, Garage is responsible for encrypting and decrypting
objects, but it does not store the encryption key itself: the key must be
provided by Nextcloud with each request. Nextcloud supports this mode of
operation, and it has been tested successfully with Garage.
To enable SSE-C encryption:
1. Make sure your Garage server is accessible via SSL through a reverse proxy
such as Nginx, and that it is using a valid public certificate (Nextcloud
might be able to connect to an S3 server that is using a self-signed
certificate, but you will lose many hours while trying, so don't).
Configure values for `use_ssl` and `port` accordingly in your `config.php`
file.
2. Generate an encryption key using the following command:
```
openssl rand -base64 32
```
Make sure to keep this key **secret**!
3. Add the encryption key in your `config.php` file as follows:
```php
<?php
$CONFIG = array(
'objectstore' => [
'class' => '\\OC\\Files\\ObjectStore\\S3',
'arguments' => [
...
'sse_c_key' => 'exampleencryptionkeyLbU+5fKYQcVoqnn+RaIOXgo=',
...
],
],
```
Nextcloud will now make Garage encrypt files at rest in the storage bucket.
These files will not be readable by an S3 client that has credentials to the
bucket but doesn't also know the secret encryption key.
### External Storage
**From the GUI.** Activate the "External storage support" app from the "Applications" page (click on your account icon on the top right corner of your screen to display the menu). Go to your parameters page (also located below your account icon). Click on external storage (or the corresponding translation in your language).

View file

@ -33,6 +33,7 @@ Feel free to open a PR to suggest fixes this table. Minio is missing because the
| [URL path-style](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#path-style-access) (eg. `host.tld/bucket/key`) | ✅ Implemented | ✅ | ✅ | ❓| ✅ |
| [URL vhost-style](https://docs.aws.amazon.com/AmazonS3/latest/userguide/VirtualHosting.html#virtual-hosted-style-access) URL (eg. `bucket.host.tld/key`) | ✅ Implemented | ❌| ✅| ✅ | ✅ |
| [Presigned URLs](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ShareObjectPreSignedURL.html) | ✅ Implemented | ❌| ✅ | ✅ | ✅(❓) |
| [SSE-C encryption](https://docs.aws.amazon.com/AmazonS3/latest/userguide/ServerSideEncryptionCustomerKeys.html) | ✅ Implemented | ❓ | ✅ | ❌ | ✅ |
*Note:* OpenIO does not say whether it supports presigned URLs. Since presigned
URLs are part of Signature v4, and OpenIO claims to support Signature v4 without
further qualification,

View file

@ -82,6 +82,19 @@ if [ -z "$SKIP_AWS" ]; then
exit 1
fi
aws s3api delete-object --bucket eprouvette --key upload
echo "🛠️ Test SSE-C with awscli (aws s3)"
SSEC_KEY="u8zCfnEyt5Imo/krN+sxA1DQXxLWtPJavU6T6gOVj1Y="
SSEC_KEY_MD5="jMGbs3GyZkYjJUP6q5jA7g=="
echo "$SSEC_KEY" | base64 -d > /tmp/garage.ssec-key
for idx in {1,2}.rnd; do
aws s3 cp --sse-c AES256 --sse-c-key fileb:///tmp/garage.ssec-key \
"/tmp/garage.$idx" "s3://eprouvette/garage.$idx.aws.sse-c"
aws s3 cp --sse-c AES256 --sse-c-key fileb:///tmp/garage.ssec-key \
"s3://eprouvette/garage.$idx.aws.sse-c" "/tmp/garage.$idx.dl.sse-c"
diff "/tmp/garage.$idx" "/tmp/garage.$idx.dl.sse-c"
aws s3api delete-object --bucket eprouvette --key "garage.$idx.aws.sse-c"
done
fi
# S3CMD

View file

@ -21,7 +21,9 @@ garage_net.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
aes-gcm.workspace = true
argon2.workspace = true
async-compression.workspace = true
async-trait.workspace = true
base64.workspace = true
bytes.workspace = true
@ -41,6 +43,7 @@ futures.workspace = true
futures-util.workspace = true
tokio.workspace = true
tokio-stream.workspace = true
tokio-util.workspace = true
form_urlencoded.workspace = true
http.workspace = true

View file

@ -1,7 +1,7 @@
use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt};
use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes;
@ -9,9 +9,11 @@ use hyper::{Request, Response};
use serde::Serialize;
use garage_net::bytes_buf::BytesBuf;
use garage_net::stream::read_stream_to_end;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::*;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*;
use garage_model::s3::block_ref_table::*;
@ -21,11 +23,15 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::get::full_object_byte_stream;
use crate::s3::multipart;
use crate::s3::put::get_headers;
use crate::s3::put::{get_headers, save_stream, SaveStreamResult};
use crate::s3::xml::{self as s3_xml, xmlns_tag};
// -------- CopyObject ---------
pub async fn handle_copy(
ctx: ReqCtx,
req: &Request<ReqBody>,
@ -35,38 +41,114 @@ pub async fn handle_copy(
let source_object = get_copy_source(&ctx, req).await?;
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
..
} = ctx;
let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?;
// Check precondition, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_version, &source_version_meta.etag)?;
// Determine encryption parameters
let (source_encryption, source_object_headers) =
EncryptionParams::check_decrypt_for_copy_source(
&ctx.garage,
req.headers(),
&source_version_meta.encryption,
)?;
let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
// Determine headers of destination object
let dest_object_headers = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
get_headers(req.headers())?
}
_ => source_object_headers.into_owned(),
};
// Do actual object copying
let res = if EncryptionParams::is_same(&source_encryption, &dest_encryption) {
// If source and dest are both unencrypted, or if the encryption keys
// are the same, we can just copy the metadata and have the new object
// reference the blocks of the old object.
handle_copy_metaonly(
ctx,
dest_key,
dest_object_headers,
dest_encryption,
source_version,
source_version_data,
source_version_meta,
)
.await?
} else {
// If source and dest encryption use different keys,
// we must decrypt the content and re-encrypt it, rewriting all data blocks.
handle_copy_reencrypt(
ctx,
dest_key,
dest_object_headers,
dest_encryption,
source_version,
source_version_data,
source_encryption,
)
.await?
};
let last_modified = msec_to_rfc3339(res.version_timestamp);
let result = CopyObjectResult {
last_modified: s3_xml::Value(last_modified),
etag: s3_xml::Value(format!("\"{}\"", res.etag)),
};
let xml = s3_xml::to_xml_with_header(&result)?;
let mut resp = Response::builder()
.header("Content-Type", "application/xml")
.header("x-amz-version-id", hex::encode(res.version_uuid))
.header(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
);
dest_encryption.add_response_headers(&mut resp);
Ok(resp.body(string_body(xml))?)
}
async fn handle_copy_metaonly(
ctx: ReqCtx,
dest_key: &str,
dest_object_headers: ObjectVersionHeaders,
dest_encryption: EncryptionParams,
source_version: &ObjectVersion,
source_version_data: &ObjectVersionData,
source_version_meta: &ObjectVersionMeta,
) -> Result<SaveStreamResult, Error> {
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
..
} = ctx;
// Generate parameters for copied object
let new_uuid = gen_uuid();
let new_timestamp = now_msec();
// Implement x-amz-metadata-directive: REPLACE
let new_meta = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta {
headers: get_headers(req.headers())?,
let new_meta = ObjectVersionMeta {
encryption: dest_encryption.encrypt_headers(dest_object_headers)?,
size: source_version_meta.size,
etag: source_version_meta.etag.clone(),
},
_ => source_version_meta.clone(),
};
let etag = new_meta.etag.to_string();
let res = SaveStreamResult {
version_uuid: new_uuid,
version_timestamp: new_timestamp,
etag: new_meta.etag.clone(),
};
// Save object copy
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
// bytes is either plaintext before&after or encrypted with the
// same keys, so it's ok to just copy it as is
let dest_object_version = ObjectVersion {
uuid: new_uuid,
timestamp: new_timestamp,
@ -97,7 +179,7 @@ pub async fn handle_copy(
uuid: new_uuid,
timestamp: new_timestamp,
state: ObjectVersionState::Uploading {
headers: new_meta.headers.clone(),
encryption: new_meta.encryption.clone(),
multipart: false,
},
};
@ -164,23 +246,42 @@ pub async fn handle_copy(
}
}
let last_modified = msec_to_rfc3339(new_timestamp);
let result = CopyObjectResult {
last_modified: s3_xml::Value(last_modified),
etag: s3_xml::Value(format!("\"{}\"", etag)),
};
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
.header("x-amz-version-id", hex::encode(new_uuid))
.header(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
)
.body(string_body(xml))?)
Ok(res)
}
async fn handle_copy_reencrypt(
ctx: ReqCtx,
dest_key: &str,
dest_object_headers: ObjectVersionHeaders,
dest_encryption: EncryptionParams,
source_version: &ObjectVersion,
source_version_data: &ObjectVersionData,
source_encryption: EncryptionParams,
) -> Result<SaveStreamResult, Error> {
// Read the source data (decrypting if necessary) and save it as a new
// object (encrypting if necessary), combining the code paths used in
// GetObject and PutObject.
let source_stream = full_object_byte_stream(
ctx.garage.clone(),
source_version,
source_version_data,
source_encryption,
);
save_stream(
&ctx,
dest_object_headers,
dest_encryption,
source_stream.map_err(|e| Error::from(GarageError::from(e))),
&dest_key.to_string(),
None,
None,
)
.await
}
// -------- UploadPartCopy ---------
pub async fn handle_upload_part_copy(
ctx: ReqCtx,
req: &Request<ReqBody>,
@ -193,7 +294,7 @@ pub async fn handle_upload_part_copy(
let dest_upload_id = multipart::decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string();
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!(
let (source_object, (_, dest_version, mut dest_mpu)) = futures::try_join!(
get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?;
@ -206,6 +307,20 @@ pub async fn handle_upload_part_copy(
// Check precondition on source, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_object_version, &source_version_meta.etag)?;
// Determine encryption parameters
let (source_encryption, _) = EncryptionParams::check_decrypt_for_copy_source(
&garage,
req.headers(),
&source_version_meta.encryption,
)?;
let dest_object_encryption = match dest_version.state {
ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(),
};
let (dest_encryption, _) =
EncryptionParams::check_decrypt(&garage, req.headers(), &dest_object_encryption)?;
let same_encryption = EncryptionParams::is_same(&source_encryption, &dest_encryption);
// Check source range is valid
let source_range = match req.headers().get("x-amz-copy-source-range") {
Some(range) => {
@ -227,9 +342,7 @@ pub async fn handle_upload_part_copy(
};
// Check source version is not inlined
match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, _bytes) => {
if matches!(source_version_data, ObjectVersionData::Inline(_, _)) {
// This is only for small files; we don't bother handling it.
// (In AWS, UploadPartCopy works only for parts of at least 5MB,
// which is never the case for an inline object.)
@ -237,11 +350,8 @@ pub async fn handle_upload_part_copy(
"Source object is too small (minimum part size is 5Mb)",
));
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
};
// Fetch source versin with its block list,
// and destination version to check part hasn't yet been uploaded
// Fetch source version with its block list
let source_version = garage
.version_table
.get(&source_object_version.uuid, &EmptyKey)
@ -251,7 +361,9 @@ pub async fn handle_upload_part_copy(
// We want to reuse blocks from the source version as much as possible.
// However, we still need to get the data from these blocks
// because we need to know it to calculate the MD5sum of the part
// which is used as its ETag.
// which is used as its ETag. For encrypted sources or destinations,
// we must always read(+decrypt) and then write(+encrypt), so we
// can never reuse data blocks as is.
// First, calculate what blocks we want to keep,
// and the subrange of the block to take, if the bounds of the
@ -313,6 +425,8 @@ pub async fn handle_upload_part_copy(
},
false,
);
// write an empty version now to be the parent of the block_ref entries
garage.version_table.insert(&dest_version).await?;
// Now, actually copy the blocks
let mut md5hasher = Md5::new();
@ -321,24 +435,44 @@ pub async fn handle_upload_part_copy(
// and extract the subrange if necessary.
// The second returned value is an Option<Hash>, that is Some
// if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again).
// in the Garage data store and can be reused as-is instead of having
// to save it again. This excludes encrypted source blocks that we had
// to decrypt.
let garage2 = garage.clone();
let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy)
.enumerate()
.flat_map(|(i, (block_hash, range_to_copy))| {
.map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone();
stream::once(async move {
let data = garage3
.block_manager
.rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
async move {
let stream = source_encryption
.get_block(&garage3, &block_hash, Some(order_stream.order(i as u64)))
.await?;
let data = read_stream_to_end(stream).await?.into_bytes();
// For each item, we return a tuple of:
// 1. the full data block (decrypted)
// 2. an Option<Hash> that indicates the hash of the block in the block store,
// only if it can be re-used as-is in the copied object
match range_to_copy {
Some(r) => Ok((data.slice(r), None)),
None => Ok((data, Some(block_hash))),
Some(r) => {
// If we are taking a subslice of the data, we cannot reuse the block as-is
Ok((data.slice(r), None))
}
None if same_encryption => {
// If the data is unencrypted before & after, or if we are using
// the same encryption key, we can reuse the stored block, no need
// to re-send it to storage nodes.
Ok((data, Some(block_hash)))
}
None => {
// If we are decrypting / (re)encrypting with different keys,
// we cannot reuse the block as-is
Ok((data, None))
}
}
}
})
})
.buffered(2)
.peekable();
// The defragmenter is a custom stream (defined below) that concatenates
@ -346,22 +480,33 @@ pub async fn handle_upload_part_copy(
// It returns a series of (Vec<u8>, Option<Hash>).
// When it is done, it returns an empty vec.
// Same as the previous iterator, the Option is Some(_) if and only if
// it's an existing block of the Garage data store.
// it's an existing block of the Garage data store that can be reused.
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
let mut current_offset = 0;
let mut next_block = defragmenter.next().await?;
// TODO this could be optimized similarly to read_and_put_blocks
// low priority because uploadpartcopy is rarely used
loop {
let (data, existing_block_hash) = next_block;
if data.is_empty() {
break;
}
let data_len = data.len() as u64;
md5hasher.update(&data[..]);
let must_upload = existing_block_hash.is_none();
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..]));
let (final_data, must_upload, final_hash) = match existing_block_hash {
Some(hash) if same_encryption => (data, false, hash),
_ => tokio::task::spawn_blocking(move || {
let data_enc = dest_encryption.encrypt_block(data)?;
let hash = blake2sum(&data_enc);
Ok::<_, Error>((data_enc, true, hash))
})
.await
.unwrap()?,
};
dest_version.blocks.clear();
dest_version.blocks.put(
@ -371,10 +516,10 @@ pub async fn handle_upload_part_copy(
},
VersionBlock {
hash: final_hash,
size: data.len() as u64,
size: data_len,
},
);
current_offset += data.len() as u64;
current_offset += data_len;
let block_ref = BlockRef {
block: final_hash,
@ -382,36 +527,33 @@ pub async fn handle_upload_part_copy(
deleted: false.into(),
};
let garage2 = garage.clone();
let res = futures::try_join!(
let (_, _, _, next) = futures::try_join!(
// Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block.
async move {
async {
if must_upload {
garage2
garage
.block_manager
.rpc_put_block(final_hash, data, None)
.rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
.await
} else {
Ok(())
}
},
async {
// Thing 2: we need to insert the block in the version
garage.version_table.insert(&dest_version).await?;
garage.version_table.insert(&dest_version),
// Thing 3: we need to add a block reference
garage.block_ref_table.insert(&block_ref).await
},
// Thing 4: we need to prefetch the next block
garage.block_ref_table.insert(&block_ref),
// Thing 4: we need to read the next block
defragmenter.next(),
)?;
next_block = res.2;
next_block = next;
}
assert_eq!(current_offset, source_range.length);
let data_md5sum = md5hasher.finalize();
let etag = hex::encode(data_md5sum);
let etag = dest_encryption.etag_from_md5(&data_md5sum);
// Put the part's ETag in the Versiontable
dest_mpu.parts.put(
@ -431,13 +573,14 @@ pub async fn handle_upload_part_copy(
last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)),
})?;
Ok(Response::builder()
let mut resp = Response::builder()
.header("Content-Type", "application/xml")
.header(
"x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid),
)
.body(string_body(resp_xml))?)
);
dest_encryption.add_response_headers(&mut resp);
Ok(resp.body(string_body(resp_xml))?)
}
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {

595
src/api/s3/encryption.rs Normal file
View file

@ -0,0 +1,595 @@
use std::borrow::Cow;
use std::convert::TryInto;
use std::pin::Pin;
use aes_gcm::{
aead::stream::{DecryptorLE31, EncryptorLE31, StreamLE31},
aead::{Aead, AeadCore, KeyInit, OsRng},
aes::cipher::crypto_common::rand_core::RngCore,
aes::cipher::typenum::Unsigned,
Aes256Gcm, Key, Nonce,
};
use base64::prelude::*;
use bytes::Bytes;
use futures::stream::Stream;
use futures::task;
use tokio::io::BufReader;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use garage_net::bytes_buf::BytesBuf;
use garage_net::stream::{stream_asyncread, ByteStream};
use garage_rpc::rpc_helper::OrderTag;
use garage_util::data::Hash;
use garage_util::error::Error as GarageError;
use garage_util::migrate::Migrate;
use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders};
use crate::common_error::*;
use crate::s3::error::Error;
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm");
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-key");
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-key-md5");
const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-algorithm");
const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName =
HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key");
const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-md5");
const CUSTOMER_ALGORITHM_AES256: &[u8] = b"AES256";
type Md5Output = md5::digest::Output<md5::Md5Core>;
type StreamNonceSize = aes_gcm::aead::stream::NonceSize<Aes256Gcm, StreamLE31<Aes256Gcm>>;
// Data blocks are encrypted by smaller chunks of size 4096 bytes,
// so that data can be streamed when reading.
// This size has to be known and has to be constant, or data won't be
// readable anymore. DO NOT CHANGE THIS VALUE.
const STREAM_ENC_PLAIN_CHUNK_SIZE: usize = 0x1000; // 4096 bytes
const STREAM_ENC_CYPER_CHUNK_SIZE: usize = STREAM_ENC_PLAIN_CHUNK_SIZE + 16;
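// A sketch (not part of the patch) making the framing above explicit: a block
// of `plain_len` bytes encrypts to a stream nonce, then full chunks of
// 4096+16 bytes, then a final (possibly empty or short) chunk that also
// carries a 16-byte AES-GCM tag.
#[allow(dead_code)]
fn stream_enc_ciphertext_len(plain_len: usize) -> usize {
let nonce_len = StreamNonceSize::to_usize();
let full_chunks = plain_len / STREAM_ENC_PLAIN_CHUNK_SIZE;
let rem = plain_len % STREAM_ENC_PLAIN_CHUNK_SIZE;
if rem > 0 || plain_len == 0 {
// A trailing partial chunk, or one empty last chunk for empty input.
nonce_len + full_chunks * STREAM_ENC_CYPER_CHUNK_SIZE + rem + 16
} else {
// plain_len is a non-zero multiple of the chunk size: the final
// full chunk is emitted with encrypt_last(), adding no extra chunk.
nonce_len + full_chunks * STREAM_ENC_CYPER_CHUNK_SIZE
}
}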
#[derive(Clone, Copy)]
pub enum EncryptionParams {
Plaintext,
SseC {
client_key: Key<Aes256Gcm>,
client_key_md5: Md5Output,
compression_level: Option<i32>,
},
}
impl EncryptionParams {
pub fn is_encrypted(&self) -> bool {
!matches!(self, Self::Plaintext)
}
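// Note: two parameter sets count as the same below only if they use the
// same client key *and* agree on compression: data blocks are
// zstd-compressed before encryption (see encrypt_block), so a stored
// block cannot be reused as-is across different compression settings.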
pub fn is_same(a: &Self, b: &Self) -> bool {
let relevant_info = |x: &Self| match x {
Self::Plaintext => None,
Self::SseC {
client_key,
compression_level,
..
} => Some((*client_key, compression_level.is_some())),
};
relevant_info(a) == relevant_info(b)
}
pub fn new_from_headers(
garage: &Garage,
headers: &HeaderMap,
) -> Result<EncryptionParams, Error> {
let key = parse_request_headers(
headers,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
)?;
match key {
Some((client_key, client_key_md5)) => Ok(EncryptionParams::SseC {
client_key,
client_key_md5,
compression_level: garage.config.compression_level,
}),
None => Ok(EncryptionParams::Plaintext),
}
}
pub fn add_response_headers(&self, resp: &mut http::response::Builder) {
if let Self::SseC { client_key_md5, .. } = self {
let md5 = BASE64_STANDARD.encode(&client_key_md5);
resp.headers_mut().unwrap().insert(
X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
HeaderValue::from_bytes(CUSTOMER_ALGORITHM_AES256).unwrap(),
);
resp.headers_mut().unwrap().insert(
X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
HeaderValue::from_bytes(md5.as_bytes()).unwrap(),
);
}
}
pub fn check_decrypt<'a>(
garage: &Garage,
headers: &HeaderMap,
obj_enc: &'a ObjectVersionEncryption,
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
let key = parse_request_headers(
headers,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
)?;
Self::check_decrypt_common(garage, key, obj_enc)
}
pub fn check_decrypt_for_copy_source<'a>(
garage: &Garage,
headers: &HeaderMap,
obj_enc: &'a ObjectVersionEncryption,
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
let key = parse_request_headers(
headers,
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
)?;
Self::check_decrypt_common(garage, key, obj_enc)
}
fn check_decrypt_common<'a>(
garage: &Garage,
key: Option<(Key<Aes256Gcm>, Md5Output)>,
obj_enc: &'a ObjectVersionEncryption,
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
match (key, &obj_enc) {
(
Some((client_key, client_key_md5)),
ObjectVersionEncryption::SseC {
headers,
compressed,
},
) => {
let enc = Self::SseC {
client_key,
client_key_md5,
compression_level: if *compressed {
Some(garage.config.compression_level.unwrap_or(1))
} else {
None
},
};
let plaintext = enc.decrypt_blob(&headers)?;
let headers = ObjectVersionHeaders::decode(&plaintext)
.ok_or_internal_error("Could not decode encrypted headers")?;
Ok((enc, Cow::Owned(headers)))
}
(None, ObjectVersionEncryption::Plaintext { headers }) => {
Ok((Self::Plaintext, Cow::Borrowed(headers)))
}
(_, ObjectVersionEncryption::SseC { .. }) => {
Err(Error::bad_request("Object is encrypted"))
}
(Some(_), _) => {
// TODO: should this be an OK scenario?
Err(Error::bad_request("Trying to decrypt a plaintext object"))
}
}
}
pub fn encrypt_headers(
&self,
h: ObjectVersionHeaders,
) -> Result<ObjectVersionEncryption, Error> {
match self {
Self::SseC {
compression_level, ..
} => {
let plaintext = h.encode().map_err(GarageError::from)?;
let ciphertext = self.encrypt_blob(&plaintext)?;
Ok(ObjectVersionEncryption::SseC {
headers: ciphertext.into_owned(),
compressed: compression_level.is_some(),
})
}
Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }),
}
}
// ---- generating object Etag values ----
pub fn etag_from_md5(&self, md5sum: &[u8]) -> String {
match self {
Self::Plaintext => hex::encode(md5sum),
Self::SseC { .. } => {
// AWS specifies that for encrypted objects, the Etag is not
// the md5sum of the data, but doesn't say what it is.
// So we just put some random bytes.
let mut random = [0u8; 16];
OsRng.fill_bytes(&mut random);
hex::encode(&random)
}
}
}
// ---- generic function for encrypting / decrypting blobs ----
// Prepends a randomly-generated nonce to the encrypted value.
// This is used for encrypting object headers and inlined data for small objects.
// This does not compress anything.
pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
match self {
Self::SseC { client_key, .. } => {
let cipher = Aes256Gcm::new(&client_key);
let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
let ciphertext = cipher
.encrypt(&nonce, blob)
.ok_or_internal_error("Encryption failed")?;
Ok(Cow::Owned([nonce.to_vec(), ciphertext].concat()))
}
Self::Plaintext => Ok(Cow::Borrowed(blob)),
}
}
pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
match self {
Self::SseC { client_key, .. } => {
let cipher = Aes256Gcm::new(&client_key);
let nonce_size = <Aes256Gcm as AeadCore>::NonceSize::to_usize();
let nonce = Nonce::from_slice(
blob.get(..nonce_size)
.ok_or_internal_error("invalid encrypted data")?,
);
let plaintext = cipher
.decrypt(nonce, &blob[nonce_size..])
.ok_or_bad_request(
"Invalid encryption key, could not decrypt object metadata.",
)?;
Ok(Cow::Owned(plaintext))
}
Self::Plaintext => Ok(Cow::Borrowed(blob)),
}
}
// ---- function for encrypting / decrypting byte streams ----
/// Get a data block from the storage node, and decrypt+decompress it
/// if necessary. If object is plaintext, just get it without any processing.
pub async fn get_block(
&self,
garage: &Garage,
hash: &Hash,
order: Option<OrderTag>,
) -> Result<ByteStream, GarageError> {
let raw_block = garage
.block_manager
.rpc_get_block_streaming(hash, order)
.await?;
Ok(self.decrypt_block_stream(raw_block))
}
pub fn decrypt_block_stream(&self, stream: ByteStream) -> ByteStream {
match self {
Self::Plaintext => stream,
Self::SseC {
client_key,
compression_level,
..
} => {
let plaintext = DecryptStream::new(stream, *client_key);
if compression_level.is_some() {
let reader = stream_asyncread(Box::pin(plaintext));
let reader = BufReader::new(reader);
let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
Box::pin(tokio_util::io::ReaderStream::new(reader))
} else {
Box::pin(plaintext)
}
}
}
}
/// Encrypt a data block if encryption is set, for use before
/// putting the data blocks into storage
pub fn encrypt_block(&self, block: Bytes) -> Result<Bytes, Error> {
match self {
Self::Plaintext => Ok(block),
Self::SseC {
client_key,
compression_level,
..
} => {
let block = if let Some(level) = compression_level {
Cow::Owned(
garage_block::zstd_encode(block.as_ref(), *level)
.ok_or_internal_error("failed to compress data block")?,
)
} else {
Cow::Borrowed(block.as_ref())
};
let mut ret = Vec::with_capacity(block.len() + 32 + block.len() / 64);
let mut nonce: Nonce<StreamNonceSize> = Default::default();
OsRng.fill_bytes(&mut nonce);
ret.extend_from_slice(nonce.as_slice());
let mut cipher = EncryptorLE31::<Aes256Gcm>::new(&client_key, &nonce);
let mut iter = block.chunks(STREAM_ENC_PLAIN_CHUNK_SIZE).peekable();
if iter.peek().is_none() {
// Empty stream: we encrypt an empty last chunk
let chunk_enc = cipher
.encrypt_last(&[][..])
.ok_or_internal_error("failed to encrypt chunk")?;
ret.extend_from_slice(&chunk_enc);
} else {
loop {
let chunk = iter.next().unwrap();
if iter.peek().is_some() {
let chunk_enc = cipher
.encrypt_next(chunk)
.ok_or_internal_error("failed to encrypt chunk")?;
assert_eq!(chunk.len(), STREAM_ENC_PLAIN_CHUNK_SIZE);
assert_eq!(chunk_enc.len(), STREAM_ENC_CYPER_CHUNK_SIZE);
ret.extend_from_slice(&chunk_enc);
} else {
// use encrypt_last for the last chunk
let chunk_enc = cipher
.encrypt_last(chunk)
.ok_or_internal_error("failed to encrypt chunk")?;
ret.extend_from_slice(&chunk_enc);
break;
}
}
}
Ok(ret.into())
}
}
}
}
fn parse_request_headers(
headers: &HeaderMap,
alg_header: &HeaderName,
key_header: &HeaderName,
md5_header: &HeaderName,
) -> Result<Option<(Key<Aes256Gcm>, Md5Output)>, Error> {
let alg = headers.get(alg_header).map(HeaderValue::as_bytes);
let key = headers.get(key_header).map(HeaderValue::as_bytes);
let md5 = headers.get(md5_header).map(HeaderValue::as_bytes);
match alg {
Some(CUSTOMER_ALGORITHM_AES256) => {
use md5::{Digest, Md5};
let key_b64 =
key.ok_or_bad_request("Missing server-side-encryption-customer-key header")?;
let key_bytes: [u8; 32] = BASE64_STANDARD
.decode(&key_b64)
.ok_or_bad_request(
"Invalid server-side-encryption-customer-key header: invalid base64",
)?
.try_into()
.ok()
.ok_or_bad_request(
"Invalid server-side-encryption-customer-key header: invalid length",
)?;
let md5_b64 =
md5.ok_or_bad_request("Missing server-side-encryption-customer-key-md5 header")?;
let md5_bytes = BASE64_STANDARD.decode(&md5_b64).ok_or_bad_request(
"Invalid server-side-encryption-customer-key-md5 header: invalid bass64",
)?;
let mut hasher = Md5::new();
hasher.update(&key_bytes[..]);
let our_md5 = hasher.finalize();
if our_md5.as_slice() != md5_bytes.as_slice() {
return Err(Error::bad_request(
"Server-side encryption client key MD5 checksum does not match",
));
}
Ok(Some((key_bytes.into(), our_md5)))
}
Some(alg) => Err(Error::InvalidEncryptionAlgorithm(
String::from_utf8_lossy(alg).into_owned(),
)),
None => {
if key.is_some() || md5.is_some() {
Err(Error::bad_request(
"Unexpected server-side-encryption-customer-key{,-md5} header(s)",
))
} else {
Ok(None)
}
}
}
}
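// For illustration, with the key pair used in this PR's smoke test, a
// client would send the following three headers on each request:
//
//   x-amz-server-side-encryption-customer-algorithm: AES256
//   x-amz-server-side-encryption-customer-key: u8zCfnEyt5Imo/krN+sxA1DQXxLWtPJavU6T6gOVj1Y=
//   x-amz-server-side-encryption-customer-key-md5: jMGbs3GyZkYjJUP6q5jA7g==
//
// The key header is the base64 encoding of the raw 32-byte key; the md5
// header is the base64-encoded MD5 of those raw key bytes, which is
// recomputed and verified above before the key is accepted.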
// ---- encrypt & decrypt streams ----
#[pin_project::pin_project]
struct DecryptStream {
#[pin]
stream: ByteStream,
done_reading: bool,
buf: BytesBuf,
key: Key<Aes256Gcm>,
state: DecryptStreamState,
}
enum DecryptStreamState {
Starting,
Running(DecryptorLE31<Aes256Gcm>),
Done,
}
impl DecryptStream {
fn new(stream: ByteStream, key: Key<Aes256Gcm>) -> Self {
Self {
stream,
done_reading: false,
buf: BytesBuf::new(),
key,
state: DecryptStreamState::Starting,
}
}
}
impl Stream for DecryptStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
use std::task::Poll;
let mut this = self.project();
// The first bytes of the stream should contain the starting nonce.
// If we don't have a Running state, it means that we haven't
// yet read the nonce.
while matches!(this.state, DecryptStreamState::Starting) {
let nonce_size = StreamNonceSize::to_usize();
if let Some(nonce) = this.buf.take_exact(nonce_size) {
let nonce = Nonce::from_slice(nonce.as_ref());
*this.state = DecryptStreamState::Running(DecryptorLE31::new(&this.key, nonce));
break;
}
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}
None => {
return Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Decrypt: unexpected EOF, could not read nonce",
))));
}
}
}
// Read at least one byte more than the encrypted chunk size
// (if possible), so that we know if we are decrypting the
// last chunk or not.
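// For example, with 4096+16 = 4112-byte encrypted chunks: if the buffer
// holds exactly 4112 bytes and the stream has ended, this is the last
// chunk (decrypt_last); if it holds 4113 bytes or more, the first 4112
// bytes form a full intermediate chunk (decrypt_next).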
while !*this.done_reading && this.buf.len() <= STREAM_ENC_CYPER_CHUNK_SIZE {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}
None => {
*this.done_reading = true;
break;
}
}
}
if matches!(this.state, DecryptStreamState::Done) {
if !this.buf.is_empty() {
return Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Decrypt: unexpected bytes after last encrypted chunk",
))));
}
return Poll::Ready(None);
}
let res = if this.buf.len() > STREAM_ENC_CYPER_CHUNK_SIZE {
// we have strictly more bytes than the encrypted chunk size,
// so we know this is not the last
let DecryptStreamState::Running(ref mut cipher) = this.state else {
unreachable!()
};
let chunk = this.buf.take_exact(STREAM_ENC_CYPER_CHUNK_SIZE).unwrap();
let chunk_dec = cipher.decrypt_next(chunk.as_ref());
if let Ok(c) = &chunk_dec {
assert_eq!(c.len(), STREAM_ENC_PLAIN_CHUNK_SIZE);
}
chunk_dec
} else {
// We have one encrypted chunk size or less, even though we tried
// to read more, so this is the last chunk. Decrypt using the
// appropriate decrypt_last() function that then destroys the cipher.
let state = std::mem::replace(this.state, DecryptStreamState::Done);
let DecryptStreamState::Running(cipher) = state else {
unreachable!()
};
let chunk = this.buf.take_all();
cipher.decrypt_last(chunk.as_ref())
};
match res {
Ok(bytes) if bytes.is_empty() => Poll::Ready(None),
Ok(bytes) => Poll::Ready(Some(Ok(bytes.into()))),
Err(_) => Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Decryption failed",
)))),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream::StreamExt;
use garage_net::stream::read_stream_to_end;
fn stream() -> ByteStream {
Box::pin(
futures::stream::iter(16usize..1024)
.map(|i| Ok(Bytes::from(vec![(i % 256) as u8; (i * 37) % 1024]))),
)
}
async fn test_block_enc(compression_level: Option<i32>) {
let enc = EncryptionParams::SseC {
client_key: Aes256Gcm::generate_key(&mut OsRng),
client_key_md5: Default::default(), // not needed
compression_level,
};
let block_plain = read_stream_to_end(stream()).await.unwrap().into_bytes();
let block_enc = enc.encrypt_block(block_plain.clone()).unwrap();
let block_dec =
enc.decrypt_block_stream(Box::pin(futures::stream::once(async { Ok(block_enc) })));
let block_dec = read_stream_to_end(block_dec).await.unwrap().into_bytes();
assert_eq!(block_plain, block_dec);
assert!(block_dec.len() > 128000);
}
#[tokio::test]
async fn test_encrypt_block() {
test_block_enc(None).await
}
#[tokio::test]
async fn test_encrypt_block_compressed() {
test_block_enc(Some(1)).await
}
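// A sketch of an analogous round-trip check for the blob helpers above;
// hypothetical, not part of the patch.
#[test]
fn test_encrypt_blob_roundtrip() {
let enc = EncryptionParams::SseC {
client_key: Aes256Gcm::generate_key(&mut OsRng),
client_key_md5: Default::default(), // not needed
compression_level: None,
};
let ciphertext = enc.encrypt_blob(b"hello").unwrap();
// layout: 12-byte AES-GCM nonce || ciphertext+tag
assert!(ciphertext.len() > b"hello".len());
let plaintext = enc.decrypt_blob(&ciphertext).unwrap();
assert_eq!(plaintext.as_ref(), b"hello");
}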
}

View file

@ -65,6 +65,10 @@ pub enum Error {
#[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
/// The client sent an invalid encryption algorithm (only AES256 is supported)
#[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)]
InvalidEncryptionAlgorithm(String),
/// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String),
@ -126,6 +130,7 @@ impl Error {
Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange",
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
Error::InvalidEncryptionAlgorithm(_) => "InvalidEncryptionAlgorithmError",
}
}
}
@ -143,6 +148,7 @@ impl ApiError for Error {
| Error::InvalidPart
| Error::InvalidPartOrder
| Error::EntityTooSmall
| Error::InvalidEncryptionAlgorithm(_)
| Error::InvalidXml(_)
| Error::InvalidUtf8Str(_)
| Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,

View file

@ -1,10 +1,12 @@
//! Function related to GET and HEAD requests
use std::collections::BTreeMap;
use std::convert::TryInto;
use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH};
use bytes::Bytes;
use futures::future;
use futures::stream::{self, StreamExt};
use futures::stream::{self, Stream, StreamExt};
use http::header::{
ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
@ -25,6 +27,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@ -42,6 +45,8 @@ pub struct GetObjectOverrides {
fn object_headers(
version: &ObjectVersion,
version_meta: &ObjectVersionMeta,
headers: &ObjectVersionHeaders,
encryption: EncryptionParams,
) -> http::response::Builder {
debug!("Version meta: {:?}", version_meta);
@ -49,7 +54,6 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder()
.header(CONTENT_TYPE, version_meta.headers.content_type.to_string())
.header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string());
@ -57,9 +61,26 @@ fn object_headers(
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
}
for (k, v) in version_meta.headers.other.iter() {
resp = resp.header(k, v.to_string());
// When metadata is retrieved through the REST API, Amazon S3 combines headers that
// have the same name (ignoring case) into a comma-delimited list.
// See: https://docs.aws.amazon.com/AmazonS3/latest/userguide/UsingMetadata.html
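// For example, two stored headers ("x-amz-meta-color", "red") and
// ("x-amz-meta-color", "blue") are returned as a single header:
//   x-amz-meta-color: red,blue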
let mut headers_by_name = BTreeMap::new();
for (name, value) in headers.0.iter() {
match headers_by_name.get_mut(name) {
None => {
headers_by_name.insert(name, vec![value.as_str()]);
}
Some(headers) => {
headers.push(value.as_str());
}
}
}
for (name, values) in headers_by_name {
resp = resp.header(name, values.join(","));
}
encryption.add_response_headers(&mut resp);
resp
}
@ -175,21 +196,27 @@ pub async fn handle_head_without_ctx(
return Ok(cached);
}
let (encryption, headers) =
EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?;
if let Some(pn) = part_number {
match version_data {
ObjectVersionData::Inline(_, bytes) => {
ObjectVersionData::Inline(_, _) => {
if pn != 1 {
return Err(Error::InvalidPart);
}
Ok(object_headers(object_version, version_meta)
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
let bytes_len = version_meta.size;
Ok(
object_headers(object_version, version_meta, &headers, encryption)
.header(CONTENT_LENGTH, format!("{}", bytes_len))
.header(
CONTENT_RANGE,
format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()),
format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
.body(empty_body())?)
.body(empty_body())?,
)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@ -201,7 +228,8 @@ pub async fn handle_head_without_ctx(
let (part_offset, part_end) =
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
Ok(object_headers(object_version, version_meta)
Ok(
object_headers(object_version, version_meta, &headers, encryption)
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
.header(
CONTENT_RANGE,
@ -214,15 +242,18 @@ pub async fn handle_head_without_ctx(
)
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
.status(StatusCode::PARTIAL_CONTENT)
.body(empty_body())?)
.body(empty_body())?,
)
}
_ => unreachable!(),
}
} else {
Ok(object_headers(object_version, version_meta)
Ok(
object_headers(object_version, version_meta, &headers, encryption)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK)
.body(empty_body())?)
.body(empty_body())?,
)
}
}
@ -273,23 +304,41 @@ pub async fn handle_get_without_ctx(
return Ok(cached);
}
let (enc, headers) =
EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?;
match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header",
)),
(Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await,
(Some(pn), None) => {
handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await
}
(None, Some(range)) => {
handle_get_range(
garage,
last_v,
last_v_data,
last_v_meta,
enc,
&headers,
range.start,
range.start + range.length,
)
.await
}
(None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await,
(None, None) => {
handle_get_full(
garage,
last_v,
last_v_data,
last_v_meta,
enc,
&headers,
overrides,
)
.await
}
}
}
@ -298,17 +347,36 @@ async fn handle_get_full(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
encryption: EncryptionParams,
headers: &ObjectVersionHeaders,
overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> {
let mut resp_builder = object_headers(version, version_meta)
let mut resp_builder = object_headers(version, version_meta, &headers, encryption)
.header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK);
getobject_override_headers(overrides, &mut resp_builder)?;
let stream = full_object_byte_stream(garage, version, version_data, encryption);
Ok(resp_builder.body(response_body_from_stream(stream))?)
}
pub fn full_object_byte_stream(
garage: Arc<Garage>,
version: &ObjectVersion,
version_data: &ObjectVersionData,
encryption: EncryptionParams,
) -> ByteStream {
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => {
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
let bytes = bytes.to_vec();
Box::pin(futures::stream::once(async move {
encryption
.decrypt_blob(&bytes)
.map(|x| Bytes::from(x.to_vec()))
.map_err(std_error_from_read_error)
}))
}
ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel::<ByteStream>(2);
@ -324,19 +392,18 @@ async fn handle_get_full(
garage2.version_table.get(&version_uuid, &EmptyKey).await
});
let stream_block_0 = garage
.block_manager
.rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
let stream_block_0 = encryption
.get_block(&garage, &first_block_hash, Some(order_stream.order(0)))
.await?;
tx.send(stream_block_0)
.await
.ok_or_message("channel closed")?;
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
let stream_block_i = garage
.block_manager
.rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
let stream_block_i = encryption
.get_block(&garage, &vb.hash, Some(order_stream.order(i as u64)))
.await?;
tx.send(stream_block_i)
.await
@ -354,8 +421,7 @@ async fn handle_get_full(
}
});
let body = response_body_from_block_stream(rx);
Ok(resp_builder.body(body)?)
Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).flatten())
}
}
}
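The channel-of-streams pattern used here can be shown in isolation. A minimal sketch (names are illustrative), assuming tokio, tokio-stream and the futures crate:

use futures::{Stream, StreamExt};
use tokio::sync::mpsc;
use tokio_stream::wrappers::ReceiverStream;

/// Flatten a bounded channel of byte streams into one ordered stream: the
/// producer task can prefetch the next blocks while the consumer drains the
/// current one, with the channel capacity bounding read-ahead.
fn flatten_block_streams<S>(rx: mpsc::Receiver<S>) -> impl Stream<Item = S::Item>
where
    S: Stream,
{
    ReceiverStream::new(rx).flatten()
}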
@ -365,13 +431,15 @@ async fn handle_get_range(
version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
encryption: EncryptionParams,
headers: &ObjectVersionHeaders,
begin: u64,
end: u64,
) -> Result<Response<ResBody>, Error> {
// Here we do not use getobject_override_headers because we don't
// want to add any overridden headers (those should not be added
// when returning PARTIAL_CONTENT)
let resp_builder = object_headers(version, version_meta)
let resp_builder = object_headers(version, version_meta, headers, encryption)
.header(CONTENT_LENGTH, format!("{}", end - begin))
.header(
CONTENT_RANGE,
@ -382,6 +450,7 @@ async fn handle_get_range(
match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => {
let bytes = encryption.decrypt_blob(&bytes)?;
if end as usize <= bytes.len() {
let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?)
@ -398,7 +467,8 @@ async fn handle_get_range(
.await?
.ok_or(Error::NoSuchKey)?;
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
let body =
body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?)
}
}
@ -409,17 +479,21 @@ async fn handle_get_part(
object_version: &ObjectVersion,
version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta,
encryption: EncryptionParams,
headers: &ObjectVersionHeaders,
part_number: u64,
) -> Result<Response<ResBody>, Error> {
// Same as for get_range, no getobject_override_headers
let resp_builder =
object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT);
let resp_builder = object_headers(object_version, version_meta, headers, encryption)
.status(StatusCode::PARTIAL_CONTENT);
match version_data {
ObjectVersionData::Inline(_, bytes) => {
if part_number != 1 {
return Err(Error::InvalidPart);
}
let bytes = encryption.decrypt_blob(&bytes)?;
assert_eq!(bytes.len() as u64, version_meta.size);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header(
@ -427,7 +501,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
)
.header(X_AMZ_MP_PARTS_COUNT, "1")
.body(bytes_body(bytes.to_vec().into()))?)
.body(bytes_body(bytes.into_owned().into()))?)
}
ObjectVersionData::FirstBlock(_, _) => {
let version = garage
@ -439,7 +513,8 @@ async fn handle_get_part(
let (begin, end) =
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end);
let body =
body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin))
@ -494,6 +569,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
fn body_from_blocks_range(
garage: Arc<Garage>,
encryption: EncryptionParams,
all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64,
end: u64,
@ -523,12 +599,11 @@ fn body_from_blocks_range(
tokio::spawn(async move {
match async {
let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() {
let block_stream = garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
.await?
let block_stream = encryption
.get_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
.await?;
let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
@ -588,9 +663,15 @@ fn body_from_blocks_range(
}
fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
.flatten()
.map(|x| {
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
response_body_from_stream(body_stream)
}
fn response_body_from_stream<S>(stream: S) -> ResBody
where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static,
{
let body_stream = stream.map(|x| {
x.map(hyper::body::Frame::data)
.map_err(|e| Error::from(garage_util::error::Error::from(e)))
});
@ -598,9 +679,14 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
}
fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
let err = std::io::Error::new(
std::io::ErrorKind::Other,
format!("Error while getting object data: {}", e),
);
Box::pin(stream::once(future::ready(Err(err))))
Box::pin(stream::once(future::ready(Err(std_error_from_read_error(
e,
)))))
}
fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Error while reading object data: {}", e),
)
}

View file

@ -944,9 +944,8 @@ mod tests {
timestamp: TS,
state: ObjectVersionState::Uploading {
multipart: true,
headers: ObjectVersionHeaders {
content_type: "text/plain".to_string(),
other: BTreeMap::<String, String>::new(),
encryption: ObjectVersionEncryption::Plaintext {
headers: ObjectVersionHeaders(vec![]),
},
},
}

View file

@ -13,5 +13,6 @@ mod post_object;
mod put;
mod website;
mod encryption;
mod router;
pub mod xml;

View file

@ -16,6 +16,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::put::*;
use crate::s3::xml as s3_xml;
@ -41,13 +42,17 @@ pub async fn handle_create_multipart_upload(
let headers = get_headers(req.headers())?;
// Determine whether object should be encrypted, and if so, the key
let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
let object_encryption = encryption.encrypt_headers(headers)?;
// Create object in object table
let object_version = ObjectVersion {
uuid: upload_id,
timestamp,
state: ObjectVersionState::Uploading {
multipart: true,
headers,
encryption: object_encryption,
},
};
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
@ -68,7 +73,9 @@ pub async fn handle_create_multipart_upload(
};
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::new(string_body(xml)))
let mut resp = Response::builder();
encryption.add_response_headers(&mut resp);
Ok(resp.body(string_body(xml))?)
}
pub async fn handle_put_part(
@ -91,12 +98,21 @@ pub async fn handle_put_part(
// Read first chunk, and at the same time try to get object to see if it exists
let key = key.to_string();
let stream = body_stream(req.into_body());
let (req_head, req_body) = req.into_parts();
let stream = body_stream(req_body);
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) =
let ((_, object_version, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
// Check encryption params
let object_encryption = match object_version.state {
ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(),
};
let (encryption, _) =
EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
// Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?;
@ -136,24 +152,32 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?;
// Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) =
read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?;
let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks(
&ctx,
&version,
encryption,
part_number,
first_block,
&mut chunker,
)
.await?;
// Verify that checksums match
ensure_checksum_matches(
data_md5sum.as_slice(),
&data_md5sum,
data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
// Store part etag in version
let data_md5sum_hex = hex::encode(data_md5sum);
let etag = encryption.etag_from_md5(&data_md5sum);
mpu.parts.put(
mpu_part_key,
MpuPart {
version: version_uuid,
etag: Some(data_md5sum_hex.clone()),
etag: Some(etag.clone()),
size: Some(total_size),
},
);
@ -163,11 +187,9 @@ pub async fn handle_put_part(
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
let response = Response::builder()
.header("ETag", format!("\"{}\"", data_md5sum_hex))
.body(empty_body())
.unwrap();
Ok(response)
let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
encryption.add_response_headers(&mut resp);
Ok(resp.body(empty_body())?)
}
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
@ -241,8 +263,8 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::bad_request("No data was uploaded"));
}
let headers = match object_version.state {
ObjectVersionState::Uploading { headers, .. } => headers,
let object_encryption = match object_version.state {
ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(),
};
@ -344,7 +366,7 @@ pub async fn handle_complete_multipart_upload(
// Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
encryption: object_encryption,
size: total_size,
etag: etag.clone(),
},

View file

@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use crate::helpers::*;
use crate::s3::api_server::ResBody;
use crate::s3::cors::*;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml;
@ -48,13 +49,17 @@ pub async fn handle_post_object(
let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new();
let field = loop {
let file_field = loop {
let field = if let Some(field) = multipart.next_field().await? {
field
} else {
return Err(Error::bad_request("Request did not contain a file"));
};
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) {
let name: HeaderName = if let Some(Ok(name)) = field
.name()
.map(str::to_ascii_lowercase)
.map(TryInto::try_into)
{
name
} else {
continue;
@ -93,10 +98,14 @@ pub async fn handle_post_object(
.ok_or_bad_request("No policy was provided")?
.to_str()?;
let authorization = Authorization::parse_form(&params)?;
let content_md5 = params
.get("content-md5")
.map(HeaderValue::to_str)
.transpose()?;
let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS.
if let Some(filename) = field.file_name() {
if let Some(filename) = file_field.file_name() {
key.replace("${filename}", filename)
} else {
key.to_owned()
@ -143,9 +152,8 @@ pub async fn handle_post_object(
let mut conditions = decoded_policy.into_conditions()?;
for (param_key, value) in params.iter() {
let mut param_key = param_key.to_string();
param_key.make_ascii_lowercase();
match param_key.as_str() {
let param_key = param_key.as_str();
match param_key {
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| {
@ -190,7 +198,7 @@ pub async fn handle_post_object(
// how aws seems to behave.
continue;
}
let conds = conditions.params.remove(&param_key).ok_or_else(|| {
let conds = conditions.params.remove(param_key).ok_or_else(|| {
Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?;
for cond in conds {
@ -218,8 +226,9 @@ pub async fn handle_post_object(
let headers = get_headers(&params)?;
let stream = field.map(|r| r.map_err(Into::into));
let encryption = EncryptionParams::new_from_headers(&garage, &params)?;
let stream = file_field.map(|r| r.map_err(Into::into));
let ctx = ReqCtx {
garage,
bucket_id,
@ -228,17 +237,18 @@ pub async fn handle_post_object(
api_key,
};
let (_, md5) = save_stream(
let res = save_stream(
&ctx,
headers,
encryption,
StreamLimiter::new(stream, conditions.content_length),
&key,
None,
content_md5.map(str::to_string),
None,
)
.await?;
let etag = format!("\"{}\"", md5);
let etag = format!("\"{}\"", res.etag);
let mut resp = if let Some(mut target) = params
.get("success_action_redirect")
@ -252,11 +262,12 @@ pub async fn handle_post_object(
.append_pair("key", &key)
.append_pair("etag", &etag);
let target = target.to_string();
Response::builder()
let mut resp = Response::builder()
.status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone())
.header(header::ETAG, etag)
.body(string_body(target))?
.header(header::ETAG, etag);
encryption.add_response_headers(&mut resp);
resp.body(string_body(target))?
} else {
let path = head
.uri
@ -283,9 +294,10 @@ pub async fn handle_post_object(
.get("success_action_status")
.and_then(|h| h.to_str().ok())
.unwrap_or("204");
let builder = Response::builder()
let mut builder = Response::builder()
.header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone());
encryption.add_response_headers(&mut builder);
match action {
"200" => builder.status(StatusCode::OK).body(empty_body())?,
"201" => {

View file

@ -1,4 +1,4 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use std::sync::Arc;
use base64::prelude::*;
@ -36,10 +36,18 @@ use garage_model::s3::version_table::*;
use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
pub struct SaveStreamResult {
pub version_uuid: Uuid,
pub version_timestamp: u64,
/// Etag WITHOUT THE QUOTES (just the hex value)
pub etag: String,
}
pub async fn handle_put(
ctx: ReqCtx,
req: Request<ReqBody>,
@ -50,6 +58,9 @@ pub async fn handle_put(
let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers);
// Determine whether object should be encrypted, and if so, the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
let content_md5 = match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()),
None => None,
@ -57,19 +68,33 @@ pub async fn handle_put(
let stream = body_stream(req.into_body());
save_stream(&ctx, headers, stream, key, content_md5, content_sha256)
.await
.map(|(uuid, md5)| put_response(uuid, md5))
let res = save_stream(
&ctx,
headers,
encryption,
stream,
key,
content_md5,
content_sha256,
)
.await?;
let mut resp = Response::builder()
.header("x-amz-version-id", hex::encode(res.version_uuid))
.header("ETag", format!("\"{}\"", res.etag));
encryption.add_response_headers(&mut resp);
Ok(resp.body(empty_body())?)
}
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ctx: &ReqCtx,
headers: ObjectVersionHeaders,
encryption: EncryptionParams,
body: S,
key: &String,
content_md5: Option<String>,
content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> {
) -> Result<SaveStreamResult, Error> {
let ReqCtx {
garage, bucket_id, ..
} = ctx;
@ -82,6 +107,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let first_block = first_block_opt.unwrap_or_default();
let object_encryption = encryption.encrypt_headers(headers)?;
// Generate identity of new version
let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref());
@ -92,37 +119,43 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let mut md5sum = Md5::new();
md5sum.update(&first_block[..]);
let data_md5sum = md5sum.finalize();
let data_md5sum_hex = hex::encode(data_md5sum);
let data_sha256sum = sha256sum(&first_block[..]);
let size = first_block.len() as u64;
ensure_checksum_matches(
data_md5sum.as_slice(),
&data_md5sum,
data_sha256sum,
content_md5.as_deref(),
content_sha256,
)?;
let size = first_block.len() as u64;
check_quotas(ctx, size, existing_object.as_ref()).await?;
let etag = encryption.etag_from_md5(&data_md5sum);
let inline_data = encryption.encrypt_blob(&first_block)?.to_vec();
let object_version = ObjectVersion {
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta {
headers,
encryption: object_encryption,
size,
etag: data_md5sum_hex.clone(),
etag: etag.clone(),
},
first_block.to_vec(),
inline_data,
)),
};
let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex));
return Ok(SaveStreamResult {
version_uuid,
version_timestamp,
etag,
});
}
// The following consists in many steps that can each fail.
@ -142,7 +175,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
uuid: version_uuid,
timestamp: version_timestamp,
state: ObjectVersionState::Uploading {
headers: headers.clone(),
encryption: object_encryption.clone(),
multipart: false,
},
};
@ -165,10 +198,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?;
read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?;
ensure_checksum_matches(
data_md5sum.as_slice(),
&data_md5sum,
data_sha256sum,
content_md5.as_deref(),
content_sha256,
@ -177,12 +210,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
check_quotas(ctx, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum);
let etag = encryption.etag_from_md5(&data_md5sum);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta {
headers,
encryption: object_encryption,
size: total_size,
etag: md5sum_hex.clone(),
etag: etag.clone(),
},
first_block_hash,
));
@ -193,7 +227,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// We won't have to clean up on drop.
interrupted_cleanup.cancel();
Ok((version_uuid, md5sum_hex))
Ok(SaveStreamResult {
version_uuid,
version_timestamp,
etag,
})
}
/// Validate MD5 sum against content-md5 header
@ -290,6 +328,7 @@ pub(crate) async fn check_quotas(
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ctx: &ReqCtx,
version: &Version,
encryption: EncryptionParams,
part_number: u64,
first_block: Bytes,
chunker: &mut StreamChunker<S>,
@ -349,12 +388,31 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
))
};
let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1);
let hash_blocks = async {
let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1);
let encrypt_hash_blocks = async {
let mut first_block_hash = None;
while let Some(next) = block_rx2.recv().await {
match next {
Ok(block) => {
let unencrypted_len = block.len() as u64;
let block = if encryption.is_encrypted() {
let res =
tokio::task::spawn_blocking(move || encryption.encrypt_block(block))
.with_context(Context::current_with_span(
tracer.start("Encrypt block"),
))
.await
.unwrap();
match res {
Ok(b) => b,
Err(e) => {
block_tx3.send(Err(e)).await?;
break;
}
}
} else {
block
};
let hash = async_blake2sum(block.clone())
.with_context(Context::current_with_span(
tracer.start("Hash block (blake2)"),
@ -363,7 +421,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
if first_block_hash.is_none() {
first_block_hash = Some(hash);
}
block_tx3.send(Ok((block, hash))).await?;
block_tx3.send(Ok((block, unencrypted_len, hash))).await?;
}
Err(e) => {
block_tx3.send(Err(e)).await?;
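The encrypt-then-hash stage above moves CPU-bound encryption off the async executor. A standalone sketch of the pattern (hypothetical names, assuming tokio; `hypothetical_encrypt` stands in for EncryptionParams::encrypt_block):

use bytes::Bytes;
use tokio::sync::mpsc;

/// One pipeline stage: receive plaintext blocks, encrypt them on a blocking
/// thread so the async executor is not stalled, forward ciphertext downstream.
/// The bounded channels preserve back-pressure end to end.
async fn encrypt_stage(
    mut rx: mpsc::Receiver<Bytes>,
    tx: mpsc::Sender<Bytes>,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    while let Some(block) = rx.recv().await {
        let enc = tokio::task::spawn_blocking(move || hypothetical_encrypt(block)).await??;
        tx.send(enc).await?;
    }
    Ok(())
}

fn hypothetical_encrypt(block: Bytes) -> std::io::Result<Bytes> {
    Ok(block) // placeholder: real code would AES256-GCM-seal the block here
}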
@ -398,7 +456,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
block_rx3.recv().await
}
};
let (block, hash) = tokio::select! {
let (block, unencrypted_len, hash) = tokio::select! {
result = write_futs_next => {
result?;
continue;
@ -410,17 +468,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
// For next block to be written: count its size and spawn future to write it
let offset = written_bytes;
written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta(
ctx,
version,
part_number,
offset,
written_bytes,
hash,
block,
unencrypted_len,
encryption.is_encrypted(),
order_stream.order(written_bytes),
));
written_bytes += unencrypted_len;
}
while let Some(res) = write_futs.next().await {
res?;
@ -429,7 +488,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
};
let (_, stream_hash_result, block_hash_result, final_result) =
futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks);
futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks);
let total_size = final_result?;
// unwrap here is ok, because if hasher failed, it is because something failed
@ -449,6 +508,8 @@ async fn put_block_and_meta(
offset: u64,
hash: Hash,
block: Bytes,
size: u64,
is_encrypted: bool,
order_tag: OrderTag,
) -> Result<(), GarageError> {
let ReqCtx { garage, .. } = ctx;
@ -459,10 +520,7 @@ async fn put_block_and_meta(
part_number,
offset,
},
VersionBlock {
hash,
size: block.len() as u64,
},
VersionBlock { hash, size },
);
let block_ref = BlockRef {
@ -474,7 +532,7 @@ async fn put_block_and_meta(
futures::try_join!(
garage
.block_manager
.rpc_put_block(hash, block, Some(order_tag)),
.rpc_put_block(hash, block, is_encrypted, Some(order_tag)),
garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref),
)?;
@ -517,14 +575,6 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
}
}
pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
.header("ETag", format!("\"{}\"", md5sum_hex))
.body(empty_body())
.unwrap()
}
struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner {
garage: Arc<Garage>,
@ -559,57 +609,35 @@ impl Drop for InterruptedCleanup {
// ============ helpers ============
pub(crate) fn get_mime_type(headers: &HeaderMap<HeaderValue>) -> Result<String, Error> {
Ok(headers
.get(hyper::header::CONTENT_TYPE)
.map(|x| x.to_str())
.unwrap_or(Ok("blob"))?
.to_string())
}
pub(crate) fn get_headers(headers: &HeaderMap<HeaderValue>) -> Result<ObjectVersionHeaders, Error> {
let content_type = get_mime_type(headers)?;
let mut other = BTreeMap::new();
let mut ret = Vec::new();
// Preserve standard headers
let standard_header = vec![
hyper::header::CONTENT_TYPE,
hyper::header::CACHE_CONTROL,
hyper::header::CONTENT_DISPOSITION,
hyper::header::CONTENT_ENCODING,
hyper::header::CONTENT_LANGUAGE,
hyper::header::EXPIRES,
];
for h in standard_header.iter() {
if let Some(v) = headers.get(h) {
match v.to_str() {
Ok(v_str) => {
other.insert(h.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", h, e);
}
}
for name in standard_header.iter() {
if let Some(value) = headers.get(name) {
ret.push((name.to_string(), value.to_str()?.to_string()));
}
}
// Preserve x-amz-meta- headers
for (k, v) in headers.iter() {
if k.as_str().starts_with("x-amz-meta-") {
match std::str::from_utf8(v.as_bytes()) {
Ok(v_str) => {
other.insert(k.to_string(), v_str.to_string());
}
Err(e) => {
warn!("Discarding header {}, error in .to_str(): {}", k, e);
}
}
for (name, value) in headers.iter() {
if name.as_str().starts_with("x-amz-meta-") {
ret.push((
name.to_string(),
std::str::from_utf8(value.as_bytes())?.to_string(),
));
}
}
Ok(ObjectVersionHeaders {
content_type,
other,
})
Ok(ObjectVersionHeaders(ret))
}
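For example (illustrative values, not from the patch):

// A PUT carrying `Content-Type: text/plain` and `x-amz-meta-foo: bar`
// produces:
// ObjectVersionHeaders(vec![
//     ("content-type".to_string(), "text/plain".to_string()),
//     ("x-amz-meta-foo".to_string(), "bar".to_string()),
// ])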
pub(crate) fn next_timestamp(existing_object: Option<&Object>) -> u64 {

View file

@ -96,7 +96,7 @@ impl DataBlock {
}
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
pub fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?;

View file

@ -9,3 +9,5 @@ mod block;
mod layout;
mod metrics;
mod rc;
pub use block::zstd_encode;

View file

@ -337,26 +337,18 @@ impl BlockManager {
}
}
/// Ask nodes that might have a block for it, return it as one big Bytes
pub async fn rpc_get_block(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<Bytes, Error> {
let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
Ok(read_stream_to_end(stream).await?.into_bytes())
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(
&self,
hash: Hash,
data: Bytes,
prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.replication.write_sets(&hash);
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
let compression_level = self.compression_level.filter(|_| !prevent_compression);
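// Encrypted blocks arrive already compressed (if at all) from the caller,
// so block-manager compression is skipped: compressing high-entropy
// ciphertext again would gain nothing.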
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await
.into_parts();
let put_block_rpc =

View file

@ -3,5 +3,6 @@ mod multipart;
mod objects;
mod presigned;
mod simple;
mod ssec;
mod streaming_signature;
mod website;

455
src/garage/tests/s3/ssec.rs Normal file
View file

@ -0,0 +1,455 @@
use crate::common::{self, Context};
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::{CompletedMultipartUpload, CompletedPart};
const SSEC_KEY: &str = "u8zCfnEyt5Imo/krN+sxA1DQXxLWtPJavU6T6gOVj1Y=";
const SSEC_KEY_MD5: &str = "jMGbs3GyZkYjJUP6q5jA7g==";
const SSEC_KEY2: &str = "XkYVk4Z3vVDO2yJaUqCAEZX6lL10voMxtV06d8my/eU=";
const SSEC_KEY2_MD5: &str = "kedo2ab8J1MCjHwJuLTJHw==";
const SZ_2MB: usize = 2 * 1024 * 1024;
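Test vectors like the constants above can be generated with a small helper: an SSE-C key is the base64 of 32 raw bytes, and the key MD5 header is the base64 of the MD5 digest of those same raw bytes. A hypothetical sketch, assuming the base64 (0.21) and md-5 crates:

use base64::prelude::*;
use md5::{Digest, Md5};

/// Return (sse_customer_key, sse_customer_key_md5) for a raw 256-bit key.
fn ssec_key_pair(raw: &[u8; 32]) -> (String, String) {
    let key_b64 = BASE64_STANDARD.encode(raw);
    let md5_b64 = BASE64_STANDARD.encode(Md5::digest(raw));
    (key_b64, md5_b64)
}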
#[tokio::test]
async fn test_ssec_object() {
let ctx = common::context();
let bucket = ctx.create_bucket("sse-c");
let bytes1 = b"0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz".to_vec();
let bytes2 = (0..400000)
.map(|x| ((x * 3792) % 256) as u8)
.collect::<Vec<u8>>();
for data in vec![bytes1, bytes2] {
let stream = ByteStream::new(data.clone().into());
// Write encrypted object
let r = ctx
.client
.put_object()
.bucket(&bucket)
.key("testobj")
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY)
.sse_customer_key_md5(SSEC_KEY_MD5)
.body(stream)
.send()
.await
.unwrap();
assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY_MD5.into()));
test_read_encrypted(
&ctx,
&bucket,
"testobj",
&data,
SSEC_KEY,
SSEC_KEY_MD5,
SSEC_KEY2,
SSEC_KEY2_MD5,
)
.await;
// Test copy from encrypted to non-encrypted
let r = ctx
.client
.copy_object()
.bucket(&bucket)
.key("test-copy-enc-dec")
.copy_source(format!("{}/{}", bucket, "testobj"))
.copy_source_sse_customer_algorithm("AES256")
.copy_source_sse_customer_key(SSEC_KEY)
.copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
.send()
.await
.unwrap();
assert_eq!(r.sse_customer_algorithm, None);
assert_eq!(r.sse_customer_key_md5, None);
// Test read decrypted file
let r = ctx
.client
.get_object()
.bucket(&bucket)
.key("test-copy-enc-dec")
.send()
.await
.unwrap();
assert_bytes_eq!(r.body, &data);
assert_eq!(r.sse_customer_algorithm, None);
assert_eq!(r.sse_customer_key_md5, None);
// Test copy from non-encrypted to encrypted
let r = ctx
.client
.copy_object()
.bucket(&bucket)
.key("test-copy-enc-dec-enc")
.copy_source(format!("{}/test-copy-enc-dec", bucket))
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.send()
.await
.unwrap();
assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY2_MD5.into()));
test_read_encrypted(
&ctx,
&bucket,
"test-copy-enc-dec-enc",
&data,
SSEC_KEY2,
SSEC_KEY2_MD5,
SSEC_KEY,
SSEC_KEY_MD5,
)
.await;
// Test copy from encrypted to encrypted with different keys
let r = ctx
.client
.copy_object()
.bucket(&bucket)
.key("test-copy-enc-enc")
.copy_source(format!("{}/{}", bucket, "testobj"))
.copy_source_sse_customer_algorithm("AES256")
.copy_source_sse_customer_key(SSEC_KEY)
.copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.send()
.await
.unwrap();
assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY2_MD5.into()));
test_read_encrypted(
&ctx,
&bucket,
"test-copy-enc-enc",
&data,
SSEC_KEY2,
SSEC_KEY2_MD5,
SSEC_KEY,
SSEC_KEY_MD5,
)
.await;
// Test copy from encrypted to encrypted with the same key
let r = ctx
.client
.copy_object()
.bucket(&bucket)
.key("test-copy-enc-enc-same")
.copy_source(format!("{}/{}", bucket, "testobj"))
.copy_source_sse_customer_algorithm("AES256")
.copy_source_sse_customer_key(SSEC_KEY)
.copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY)
.sse_customer_key_md5(SSEC_KEY_MD5)
.send()
.await
.unwrap();
assert_eq!(r.sse_customer_algorithm, Some("AES256".into()));
assert_eq!(r.sse_customer_key_md5, Some(SSEC_KEY_MD5.into()));
test_read_encrypted(
&ctx,
&bucket,
"test-copy-enc-enc-same",
&data,
SSEC_KEY,
SSEC_KEY_MD5,
SSEC_KEY2,
SSEC_KEY2_MD5,
)
.await;
}
}
#[tokio::test]
async fn test_multipart_upload() {
let ctx = common::context();
let bucket = ctx.create_bucket("test-ssec-mpu");
let u1 = vec![0x11; SZ_2MB];
let u2 = vec![0x22; SZ_2MB];
let u3 = vec![0x33; SZ_2MB];
let all = [&u1[..], &u2[..], &u3[..]].concat();
// Test simple encrypted mpu
{
let up = ctx
.client
.create_multipart_upload()
.bucket(&bucket)
.key("a")
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY)
.sse_customer_key_md5(SSEC_KEY_MD5)
.send()
.await
.unwrap();
assert!(up.upload_id.is_some());
assert_eq!(up.sse_customer_algorithm, Some("AES256".into()));
assert_eq!(up.sse_customer_key_md5, Some(SSEC_KEY_MD5.into()));
let uid = up.upload_id.as_ref().unwrap();
let mut etags = vec![];
for (i, part) in vec![&u1, &u2, &u3].into_iter().enumerate() {
let pu = ctx
.client
.upload_part()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.part_number((i + 1) as i32)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY)
.sse_customer_key_md5(SSEC_KEY_MD5)
.body(ByteStream::from(part.to_vec()))
.send()
.await
.unwrap();
etags.push(pu.e_tag.unwrap());
}
let mut cmp = CompletedMultipartUpload::builder();
for (i, etag) in etags.into_iter().enumerate() {
cmp = cmp.parts(
CompletedPart::builder()
.part_number((i + 1) as i32)
.e_tag(etag)
.build(),
);
}
ctx.client
.complete_multipart_upload()
.bucket(&bucket)
.key("a")
.upload_id(uid)
.multipart_upload(cmp.build())
.send()
.await
.unwrap();
test_read_encrypted(
&ctx,
&bucket,
"a",
&all,
SSEC_KEY,
SSEC_KEY_MD5,
SSEC_KEY2,
SSEC_KEY2_MD5,
)
.await;
}
// Test upload part copy from first object
{
// (setup) Upload a single part object
ctx.client
.put_object()
.bucket(&bucket)
.key("b")
.body(ByteStream::from(u1.clone()))
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.send()
.await
.unwrap();
let up = ctx
.client
.create_multipart_upload()
.bucket(&bucket)
.key("target")
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.send()
.await
.unwrap();
let uid = up.upload_id.as_ref().unwrap();
let p1 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(1)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.body(ByteStream::from(u3.clone()))
.send()
.await
.unwrap();
let p2 = ctx
.client
.upload_part_copy()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(2)
.copy_source(format!("{}/a", bucket))
.copy_source_range("bytes=500-550000")
.copy_source_sse_customer_algorithm("AES256")
.copy_source_sse_customer_key(SSEC_KEY)
.copy_source_sse_customer_key_md5(SSEC_KEY_MD5)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.send()
.await
.unwrap();
let p3 = ctx
.client
.upload_part()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(3)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.body(ByteStream::from(u2.clone()))
.send()
.await
.unwrap();
let p4 = ctx
.client
.upload_part_copy()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.part_number(4)
.copy_source(format!("{}/b", bucket))
.copy_source_range("bytes=1500-20500")
.copy_source_sse_customer_algorithm("AES256")
.copy_source_sse_customer_key(SSEC_KEY2)
.copy_source_sse_customer_key_md5(SSEC_KEY2_MD5)
.sse_customer_algorithm("AES256")
.sse_customer_key(SSEC_KEY2)
.sse_customer_key_md5(SSEC_KEY2_MD5)
.send()
.await
.unwrap();
let cmp = CompletedMultipartUpload::builder()
.parts(
CompletedPart::builder()
.part_number(1)
.e_tag(p1.e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(2)
.e_tag(p2.copy_part_result.unwrap().e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(3)
.e_tag(p3.e_tag.unwrap())
.build(),
)
.parts(
CompletedPart::builder()
.part_number(4)
.e_tag(p4.copy_part_result.unwrap().e_tag.unwrap())
.build(),
)
.build();
ctx.client
.complete_multipart_upload()
.bucket(&bucket)
.key("target")
.upload_id(uid)
.multipart_upload(cmp)
.send()
.await
.unwrap();
// (check) Get object
let expected = [&u3[..], &all[500..550001], &u2[..], &u1[1500..20501]].concat();
test_read_encrypted(
&ctx,
&bucket,
"target",
&expected,
SSEC_KEY2,
SSEC_KEY2_MD5,
SSEC_KEY,
SSEC_KEY_MD5,
)
.await;
}
}
async fn test_read_encrypted(
ctx: &Context,
bucket: &str,
obj_key: &str,
expected_data: &[u8],
enc_key: &str,
enc_key_md5: &str,
wrong_enc_key: &str,
wrong_enc_key_md5: &str,
) {
// Test read encrypted without key
let o = ctx
.client
.get_object()
.bucket(bucket)
.key(obj_key)
.send()
.await;
assert!(
o.is_err(),
"encrypted file could be read without encryption key"
);
// Test read encrypted with wrong key
let o = ctx
.client
.get_object()
.bucket(bucket)
.key(obj_key)
.sse_customer_key(wrong_enc_key)
.sse_customer_key_md5(wrong_enc_key_md5)
.send()
.await;
assert!(
o.is_err(),
"encrypted file could be read with incorrect encryption key"
);
// Test read encrypted with correct key
let o = ctx
.client
.get_object()
.bucket(bucket)
.key(obj_key)
.sse_customer_algorithm("AES256")
.sse_customer_key(enc_key)
.sse_customer_key_md5(enc_key_md5)
.send()
.await
.unwrap();
assert_bytes_eq!(o.body, expected_data);
assert_eq!(o.sse_customer_algorithm, Some("AES256".into()));
assert_eq!(o.sse_customer_key_md5, Some(enc_key_md5.to_string()));
}

View file

@ -27,6 +27,7 @@ blake2.workspace = true
chrono.workspace = true
err-derive.workspace = true
hex.workspace = true
http.workspace = true
base64.workspace = true
tracing.workspace = true
rand.workspace = true

View file

@ -210,7 +210,179 @@ mod v09 {
}
}
pub use v09::*;
mod v010 {
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
use super::v09;
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
/// The list of currently stored versions of the object
pub(super) versions: Vec<ObjectVersion>,
}
/// Information about a version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading {
/// Indicates whether this is a multipart upload
multipart: bool,
/// Encryption params + headers to be included in the final object
encryption: ObjectVersionEncryption,
},
/// The version is fully received
Complete(ObjectVersionData),
/// The uploaded version contained errors, or the upload was explicitly aborted
Aborted,
}
/// Data stored in object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
/// The object is short, it's stored inlined.
/// It is never compressed. For encrypted objects, it is encrypted using
/// AES256-GCM, like the encrypted headers.
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short: the hash of its first block is stored here, and the
/// hashes of the next segments are stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Size of the object. If object is encrypted/compressed,
/// this is always the size of the unencrypted/uncompressed data
pub size: u64,
/// etag of the object
pub etag: String,
/// Encryption params + headers (encrypted or plaintext)
pub encryption: ObjectVersionEncryption,
}
/// Encryption information + metadata
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionEncryption {
SseC {
/// Encrypted serialized ObjectVersionHeaders struct.
/// This is never compressed, just encrypted using AES256-GCM.
#[serde(with = "serde_bytes")]
headers: Vec<u8>,
/// Whether data blocks are compressed in addition to being encrypted
/// (compression happens before encryption, whereas for non-encrypted
/// objects, compression is handled at the level of the block manager)
compressed: bool,
},
Plaintext {
/// Plain-text headers
headers: ObjectVersionHeaders,
},
}
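A minimal sketch of how such an encrypted headers blob could be sealed and opened with the aes-gcm crate (0.10 API); the helper names are hypothetical and this is not necessarily the exact scheme used by garage_api::s3::encryption:

use aes_gcm::aead::{Aead, AeadCore, KeyInit, OsRng};
use aes_gcm::{Aes256Gcm, Key, Nonce};

/// Seal a serialized blob, prepending the random 96-bit nonce.
fn seal_blob(key: &Key<Aes256Gcm>, plain: &[u8]) -> Result<Vec<u8>, aes_gcm::Error> {
    let cipher = Aes256Gcm::new(key);
    let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
    let mut out = nonce.to_vec();
    out.extend(cipher.encrypt(&nonce, plain)?);
    Ok(out)
}

/// Open a blob produced by `seal_blob` (panics if shorter than a nonce).
fn open_blob(key: &Key<Aes256Gcm>, blob: &[u8]) -> Result<Vec<u8>, aes_gcm::Error> {
    let (nonce, ciphertext) = blob.split_at(12);
    Aes256Gcm::new(key).decrypt(Nonce::from_slice(nonce), ciphertext)
}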
/// Vector of headers, as tuples of the format (header name, header value)
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionHeaders(pub Vec<(String, String)>);
impl garage_util::migrate::Migrate for Object {
const VERSION_MARKER: &'static [u8] = b"G010s3ob";
type Previous = v09::Object;
fn migrate(old: v09::Object) -> Object {
Object {
bucket_id: old.bucket_id,
key: old.key,
versions: old.versions.into_iter().map(migrate_version).collect(),
}
}
}
fn migrate_version(old: v09::ObjectVersion) -> ObjectVersion {
ObjectVersion {
uuid: old.uuid,
timestamp: old.timestamp,
state: match old.state {
v09::ObjectVersionState::Uploading { multipart, headers } => {
ObjectVersionState::Uploading {
multipart,
encryption: migrate_headers(headers),
}
}
v09::ObjectVersionState::Complete(d) => {
ObjectVersionState::Complete(migrate_data(d))
}
v09::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
},
}
}
fn migrate_data(old: v09::ObjectVersionData) -> ObjectVersionData {
match old {
v09::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
v09::ObjectVersionData::Inline(meta, data) => {
ObjectVersionData::Inline(migrate_meta(meta), data)
}
v09::ObjectVersionData::FirstBlock(meta, fb) => {
ObjectVersionData::FirstBlock(migrate_meta(meta), fb)
}
}
}
fn migrate_meta(old: v09::ObjectVersionMeta) -> ObjectVersionMeta {
ObjectVersionMeta {
size: old.size,
etag: old.etag,
encryption: migrate_headers(old.headers),
}
}
fn migrate_headers(old: v09::ObjectVersionHeaders) -> ObjectVersionEncryption {
use http::header::CONTENT_TYPE;
let mut new_headers = Vec::with_capacity(old.other.len() + 1);
if old.content_type != "blob" {
new_headers.push((CONTENT_TYPE.as_str().to_string(), old.content_type));
}
for (name, value) in old.other.into_iter() {
new_headers.push((name, value));
}
ObjectVersionEncryption::Plaintext {
headers: ObjectVersionHeaders(new_headers),
}
}
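For example (hypothetical values):

// A v09 version with content_type = "text/plain" and
// other = {"x-amz-meta-a": "1"} migrates to:
// Plaintext { headers: ObjectVersionHeaders(vec![
//     ("content-type".to_string(), "text/plain".to_string()),
//     ("x-amz-meta-a".to_string(), "1".to_string()),
// ]) }
// The default content_type "blob" is dropped rather than carried over.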
// Since ObjectVersionHeaders can now be serialized independently, for the
// purpose of being encrypted, we need it to support migrations on its own
// as well.
impl garage_util::migrate::InitialFormat for ObjectVersionHeaders {
const VERSION_MARKER: &'static [u8] = b"G010s3oh";
}
}
pub use v010::*;
impl Object {
/// Initialize an Object struct from parts

View file

@ -44,7 +44,8 @@ mod v05 {
pub struct VersionBlockKey {
/// Number of the part
pub part_number: u64,
/// Offset of this sub-segment in its part
/// Offset of this sub-segment in its part as sent by the client
/// (before any kind of compression or encryption)
pub offset: u64,
}
@ -53,7 +54,7 @@ mod v05 {
pub struct VersionBlock {
/// Blake2 sum of the block
pub hash: Hash,
/// Size of the block
/// Size of the block, before any kind of compression or encryption
pub size: u64,
}