Garage v1.0 #683

Merged
lx merged 119 commits from next-0.10 into main 2024-04-10 15:23:13 +00:00
18 changed files with 1530 additions and 235 deletions
Showing only changes of commit 57acc60082 - Show all commits

105
Cargo.lock generated
View file

@ -17,6 +17,41 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]]
name = "aead"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"
dependencies = [
"crypto-common",
"generic-array",
]
[[package]]
name = "aes"
version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"
dependencies = [
"cfg-if",
"cipher",
"cpufeatures",
]
[[package]]
name = "aes-gcm"
version = "0.10.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1"
dependencies = [
"aead",
"aes",
"cipher",
"ctr",
"ghash",
"subtle",
]
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.8.7" version = "0.8.7"
@ -761,6 +796,16 @@ dependencies = [
"windows-targets 0.52.0", "windows-targets 0.52.0",
] ]
[[package]]
name = "cipher"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"
dependencies = [
"crypto-common",
"inout",
]
[[package]] [[package]]
name = "clap" name = "clap"
version = "2.34.0" version = "2.34.0"
@ -929,9 +974,19 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
dependencies = [ dependencies = [
"generic-array", "generic-array",
"rand_core",
"typenum", "typenum",
] ]
[[package]]
name = "ctr"
version = "0.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"
dependencies = [
"cipher",
]
[[package]] [[package]]
name = "darling" name = "darling"
version = "0.20.5" version = "0.20.5"
@ -1333,7 +1388,9 @@ dependencies = [
name = "garage_api" name = "garage_api"
version = "0.10.0" version = "0.10.0"
dependencies = [ dependencies = [
"aes-gcm",
"argon2", "argon2",
"async-compression",
"async-trait", "async-trait",
"base64 0.21.7", "base64 0.21.7",
"bytes", "bytes",
@ -1374,6 +1431,7 @@ dependencies = [
"sha2", "sha2",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-util 0.7.10",
"tracing", "tracing",
"url", "url",
] ]
@ -1614,6 +1672,16 @@ dependencies = [
"wasi", "wasi",
] ]
[[package]]
name = "ghash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1"
dependencies = [
"opaque-debug",
"polyval",
]
[[package]] [[package]]
name = "gimli" name = "gimli"
version = "0.28.1" version = "0.28.1"
@ -2063,6 +2131,15 @@ dependencies = [
"hashbrown 0.14.3", "hashbrown 0.14.3",
] ]
[[package]]
name = "inout"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"
dependencies = [
"generic-array",
]
[[package]] [[package]]
name = "instant" name = "instant"
version = "0.1.12" version = "0.1.12"
@ -2643,6 +2720,12 @@ version = "1.19.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92" checksum = "3fdb12b2476b595f9358c5161aa467c2438859caa136dec86c26fdd2efe17b92"
[[package]]
name = "opaque-debug"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"
[[package]] [[package]]
name = "openssl-probe" name = "openssl-probe"
version = "0.1.5" version = "0.1.5"
@ -2980,6 +3063,18 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "polyval"
version = "0.6.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"
dependencies = [
"cfg-if",
"cpufeatures",
"opaque-debug",
"universal-hash",
]
[[package]] [[package]]
name = "powerfmt" name = "powerfmt"
version = "0.2.0" version = "0.2.0"
@ -4399,6 +4494,16 @@ version = "0.2.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c"
[[package]]
name = "universal-hash"
version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"
dependencies = [
"crypto-common",
"subtle",
]
[[package]] [[package]]
name = "unsafe-libyaml" name = "unsafe-libyaml"
version = "0.2.10" version = "0.2.10"

133
Cargo.nix
View file

@ -34,7 +34,7 @@ args@{
ignoreLockHash, ignoreLockHash,
}: }:
let let
nixifiedLockHash = "263873397c8aa960f9ef6a815187218ab9c58b5ab35bbeb9c3dc70d032dcc963"; nixifiedLockHash = "170b83bf5f94d624b1caf773805f52b36970c99f4db21088c4ac794dad02c53b";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash lockHashIgnored = if ignoreLockHash
@ -88,6 +88,58 @@ in
src = fetchCratesIo { inherit name version; sha256 = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"; }; src = fetchCratesIo { inherit name version; sha256 = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".aead."0.5.2" = overridableMkRustCrate (profileName: rec {
name = "aead";
version = "0.5.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "d122413f284cf2d62fb1b7db97e02edb8cda96d769b16e443a4f6195e35662b0"; };
features = builtins.concatLists [
[ "alloc" ]
[ "getrandom" ]
[ "rand_core" ]
[ "stream" ]
];
dependencies = {
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".aes."0.8.4" = overridableMkRustCrate (profileName: rec {
name = "aes";
version = "0.8.4";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0"; };
dependencies = {
cfg_if = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }).out;
cipher = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" { inherit profileName; }).out;
${ if hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "x86_64" || hostPlatform.parsed.cpu.name == "i686" then "cpufeatures" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cpufeatures."0.2.12" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".aes-gcm."0.10.3" = overridableMkRustCrate (profileName: rec {
name = "aes-gcm";
version = "0.10.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "831010a0f742e1209b3bcea8fab6a8e149051ba6099432c8cb2cc117dec3ead1"; };
features = builtins.concatLists [
[ "aes" ]
[ "alloc" ]
[ "default" ]
[ "getrandom" ]
[ "rand_core" ]
[ "stream" ]
];
dependencies = {
aead = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aead."0.5.2" { inherit profileName; }).out;
aes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aes."0.8.4" { inherit profileName; }).out;
cipher = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" { inherit profileName; }).out;
ctr = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ctr."0.9.2" { inherit profileName; }).out;
ghash = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ghash."0.5.1" { inherit profileName; }).out;
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".ahash."0.8.7" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".ahash."0.8.7" = overridableMkRustCrate (profileName: rec {
name = "ahash"; name = "ahash";
version = "0.8.7"; version = "0.8.7";
@ -1085,6 +1137,17 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" = overridableMkRustCrate (profileName: rec {
name = "cipher";
version = "0.4.4";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "773f3b9af64447d2ce9850330c473515014aa235e6a783b02db81ff39e4a3dad"; };
dependencies = {
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
inout = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".inout."0.1.3" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".clap."2.34.0" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".clap."2.34.0" = overridableMkRustCrate (profileName: rec {
name = "clap"; name = "clap";
version = "2.34.0"; version = "2.34.0";
@ -1333,14 +1396,27 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index"; registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"; }; src = fetchCratesIo { inherit name version; sha256 = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"; };
features = builtins.concatLists [ features = builtins.concatLists [
[ "getrandom" ]
[ "rand_core" ]
[ "std" ] [ "std" ]
]; ];
dependencies = { dependencies = {
generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out; generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out;
rand_core = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.6.4" { inherit profileName; }).out;
typenum = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".typenum."1.17.0" { inherit profileName; }).out; typenum = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".typenum."1.17.0" { inherit profileName; }).out;
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".ctr."0.9.2" = overridableMkRustCrate (profileName: rec {
name = "ctr";
version = "0.9.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "0369ee1ad671834580515889b80f2ea915f23b8be8d0daa4bbaf2ac5c7590835"; };
dependencies = {
cipher = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cipher."0.4.4" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".darling."0.20.5" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".darling."0.20.5" = overridableMkRustCrate (profileName: rec {
name = "darling"; name = "darling";
version = "0.20.5"; version = "0.20.5";
@ -1958,7 +2034,9 @@ in
(lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics" || rootFeatures' ? "garage_api/prometheus") "prometheus") (lib.optional (rootFeatures' ? "garage/default" || rootFeatures' ? "garage/metrics" || rootFeatures' ? "garage_api/metrics" || rootFeatures' ? "garage_api/prometheus") "prometheus")
]; ];
dependencies = { dependencies = {
aes_gcm = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".aes-gcm."0.10.3" { inherit profileName; }).out;
argon2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".argon2."0.5.3" { inherit profileName; }).out; argon2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".argon2."0.5.3" { inherit profileName; }).out;
async_compression = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".async-compression."0.4.6" { inherit profileName; }).out;
async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.77" { profileName = "__noProfile"; }).out; async_trait = (buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.77" { profileName = "__noProfile"; }).out;
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out; base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out;
bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out; bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.5.0" { inherit profileName; }).out;
@ -1999,6 +2077,7 @@ in
sha2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.10.8" { inherit profileName; }).out; sha2 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".sha2."0.10.8" { inherit profileName; }).out;
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out; tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" { inherit profileName; }).out; tokio_stream = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" { inherit profileName; }).out;
tokio_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-util."0.7.10" { inherit profileName; }).out;
tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out; tracing = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.40" { inherit profileName; }).out;
url = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".url."2.5.0" { inherit profileName; }).out; url = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".url."2.5.0" { inherit profileName; }).out;
}; };
@ -2321,6 +2400,17 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".ghash."0.5.1" = overridableMkRustCrate (profileName: rec {
name = "ghash";
version = "0.5.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "f0d8a4362ccb29cb0b265253fb0a2728f592895ee6854fd9bc13f2ffda266ff1"; };
dependencies = {
opaque_debug = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opaque-debug."0.3.1" { inherit profileName; }).out;
polyval = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".polyval."0.6.2" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".gimli."0.28.1" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".gimli."0.28.1" = overridableMkRustCrate (profileName: rec {
name = "gimli"; name = "gimli";
version = "0.28.1"; version = "0.28.1";
@ -2928,6 +3018,16 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".inout."0.1.3" = overridableMkRustCrate (profileName: rec {
name = "inout";
version = "0.1.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "a0c10553d664a4d0bcff9f4215d0aac67a639cc68ef660840afe309b807bc9f5"; };
dependencies = {
generic_array = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".generic-array."0.14.7" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".instant."0.1.12" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".instant."0.1.12" = overridableMkRustCrate (profileName: rec {
name = "instant"; name = "instant";
version = "0.1.12"; version = "0.1.12";
@ -3777,6 +3877,13 @@ in
]; ];
}); });
"registry+https://github.com/rust-lang/crates.io-index".opaque-debug."0.3.1" = overridableMkRustCrate (profileName: rec {
name = "opaque-debug";
version = "0.3.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "c08d65885ee38876c4f86fa503fb49d7b507c2b62552df7c70b2fce627e06381"; };
});
"registry+https://github.com/rust-lang/crates.io-index".openssl-probe."0.1.5" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".openssl-probe."0.1.5" = overridableMkRustCrate (profileName: rec {
name = "openssl-probe"; name = "openssl-probe";
version = "0.1.5"; version = "0.1.5";
@ -4236,6 +4343,19 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".polyval."0.6.2" = overridableMkRustCrate (profileName: rec {
name = "polyval";
version = "0.6.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "9d1fe60d06143b2430aa532c94cfe9e29783047f06c0d7fd359a9a51b729fa25"; };
dependencies = {
cfg_if = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; }).out;
${ if hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "x86_64" || hostPlatform.parsed.cpu.name == "i686" then "cpufeatures" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".cpufeatures."0.2.12" { inherit profileName; }).out;
opaque_debug = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opaque-debug."0.3.1" { inherit profileName; }).out;
universal_hash = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".universal-hash."0.5.1" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".powerfmt."0.2.0" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".powerfmt."0.2.0" = overridableMkRustCrate (profileName: rec {
name = "powerfmt"; name = "powerfmt";
version = "0.2.0"; version = "0.2.0";
@ -6314,6 +6434,17 @@ in
]; ];
}); });
"registry+https://github.com/rust-lang/crates.io-index".universal-hash."0.5.1" = overridableMkRustCrate (profileName: rec {
name = "universal-hash";
version = "0.5.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "fc1de2c688dc15305988b563c3854064043356019f97a4b46276fe734c4f07ea"; };
dependencies = {
crypto_common = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".crypto-common."0.1.6" { inherit profileName; }).out;
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".unsafe-libyaml."0.2.10" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".unsafe-libyaml."0.2.10" = overridableMkRustCrate (profileName: rec {
name = "unsafe-libyaml"; name = "unsafe-libyaml";
version = "0.2.10"; version = "0.2.10";

View file

@ -66,6 +66,7 @@ sha2 = "0.10"
timeago = { version = "0.4", default-features = false } timeago = { version = "0.4", default-features = false }
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
aes-gcm = { version = "0.10", features = ["aes", "stream"] }
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] } kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }

View file

@ -21,7 +21,9 @@ garage_net.workspace = true
garage_util.workspace = true garage_util.workspace = true
garage_rpc.workspace = true garage_rpc.workspace = true
aes-gcm.workspace = true
argon2.workspace = true argon2.workspace = true
async-compression.workspace = true
async-trait.workspace = true async-trait.workspace = true
base64.workspace = true base64.workspace = true
bytes.workspace = true bytes.workspace = true
@ -41,6 +43,7 @@ futures.workspace = true
futures-util.workspace = true futures-util.workspace = true
tokio.workspace = true tokio.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
tokio-util.workspace = true
form_urlencoded.workspace = true form_urlencoded.workspace = true
http.workspace = true http.workspace = true

View file

@ -1,7 +1,7 @@
use std::pin::Pin; use std::pin::Pin;
use std::time::{Duration, SystemTime, UNIX_EPOCH}; use std::time::{Duration, SystemTime, UNIX_EPOCH};
use futures::{stream, stream::Stream, StreamExt}; use futures::{stream, stream::Stream, StreamExt, TryStreamExt};
use md5::{Digest as Md5Digest, Md5}; use md5::{Digest as Md5Digest, Md5};
use bytes::Bytes; use bytes::Bytes;
@ -9,9 +9,11 @@ use hyper::{Request, Response};
use serde::Serialize; use serde::Serialize;
use garage_net::bytes_buf::BytesBuf; use garage_net::bytes_buf::BytesBuf;
use garage_net::stream::read_stream_to_end;
use garage_rpc::rpc_helper::OrderTag; use garage_rpc::rpc_helper::OrderTag;
use garage_table::*; use garage_table::*;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::*; use garage_util::time::*;
use garage_model::s3::block_ref_table::*; use garage_model::s3::block_ref_table::*;
@ -21,11 +23,15 @@ use garage_model::s3::version_table::*;
use crate::helpers::*; use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*; use crate::s3::error::*;
use crate::s3::get::full_object_byte_stream;
use crate::s3::multipart; use crate::s3::multipart;
use crate::s3::put::get_headers; use crate::s3::put::{get_headers, save_stream, SaveStreamResult};
use crate::s3::xml::{self as s3_xml, xmlns_tag}; use crate::s3::xml::{self as s3_xml, xmlns_tag};
// -------- CopyObject ---------
pub async fn handle_copy( pub async fn handle_copy(
ctx: ReqCtx, ctx: ReqCtx,
req: &Request<ReqBody>, req: &Request<ReqBody>,
@ -35,38 +41,114 @@ pub async fn handle_copy(
let source_object = get_copy_source(&ctx, req).await?; let source_object = get_copy_source(&ctx, req).await?;
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
..
} = ctx;
let (source_version, source_version_data, source_version_meta) = let (source_version, source_version_data, source_version_meta) =
extract_source_info(&source_object)?; extract_source_info(&source_object)?;
// Check precondition, e.g. x-amz-copy-source-if-match // Check precondition, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_version, &source_version_meta.etag)?; copy_precondition.check(source_version, &source_version_meta.etag)?;
// Determine encryption parameters
let (source_encryption, source_object_headers) =
EncryptionParams::check_decrypt_for_copy_source(
&ctx.garage,
req.headers(),
&source_version_meta.encryption,
)?;
let dest_encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
// Determine headers of destination object
let dest_object_headers = match req.headers().get("x-amz-metadata-directive") {
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => {
get_headers(req.headers())?
}
_ => source_object_headers.into_owned(),
};
// Do actual object copying
let res = if EncryptionParams::is_same(&source_encryption, &dest_encryption) {
// If source and dest are both unencrypted, or if the encryption keys
// are the same, we can just copy the metadata and link blocks of the
// old object from the new object.
handle_copy_metaonly(
ctx,
dest_key,
dest_object_headers,
dest_encryption,
source_version,
source_version_data,
source_version_meta,
)
.await?
} else {
// If source and dest encryption use different keys,
// we must decrypt content and re-encrypt, so rewrite all data blocks.
handle_copy_reencrypt(
ctx,
dest_key,
dest_object_headers,
dest_encryption,
source_version,
source_version_data,
source_encryption,
)
.await?
};
let last_modified = msec_to_rfc3339(res.version_timestamp);
let result = CopyObjectResult {
last_modified: s3_xml::Value(last_modified),
etag: s3_xml::Value(format!("\"{}\"", res.etag)),
};
let xml = s3_xml::to_xml_with_header(&result)?;
let mut resp = Response::builder()
.header("Content-Type", "application/xml")
.header("x-amz-version-id", hex::encode(res.version_uuid))
.header(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
);
dest_encryption.add_response_headers(&mut resp);
Ok(resp.body(string_body(xml))?)
}
async fn handle_copy_metaonly(
ctx: ReqCtx,
dest_key: &str,
dest_object_headers: ObjectVersionHeaders,
dest_encryption: EncryptionParams,
source_version: &ObjectVersion,
source_version_data: &ObjectVersionData,
source_version_meta: &ObjectVersionMeta,
) -> Result<SaveStreamResult, Error> {
let ReqCtx {
garage,
bucket_id: dest_bucket_id,
..
} = ctx;
// Generate parameters for copied object // Generate parameters for copied object
let new_uuid = gen_uuid(); let new_uuid = gen_uuid();
let new_timestamp = now_msec(); let new_timestamp = now_msec();
// Implement x-amz-metadata-directive: REPLACE let new_meta = ObjectVersionMeta {
let new_meta = match req.headers().get("x-amz-metadata-directive") { encryption: dest_encryption.encrypt_headers(dest_object_headers)?,
Some(v) if v == hyper::header::HeaderValue::from_static("REPLACE") => ObjectVersionMeta { size: source_version_meta.size,
headers: get_headers(req.headers())?, etag: source_version_meta.etag.clone(),
size: source_version_meta.size,
etag: source_version_meta.etag.clone(),
},
_ => source_version_meta.clone(),
}; };
let etag = new_meta.etag.to_string(); let res = SaveStreamResult {
version_uuid: new_uuid,
version_timestamp: new_timestamp,
etag: new_meta.etag.clone(),
};
// Save object copy // Save object copy
match source_version_data { match source_version_data {
ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => { ObjectVersionData::Inline(_meta, bytes) => {
// bytes is either plaintext before&after or encrypted with the
// same keys, so it's ok to just copy it as is
let dest_object_version = ObjectVersion { let dest_object_version = ObjectVersion {
uuid: new_uuid, uuid: new_uuid,
timestamp: new_timestamp, timestamp: new_timestamp,
@ -97,7 +179,7 @@ pub async fn handle_copy(
uuid: new_uuid, uuid: new_uuid,
timestamp: new_timestamp, timestamp: new_timestamp,
state: ObjectVersionState::Uploading { state: ObjectVersionState::Uploading {
headers: new_meta.headers.clone(), encryption: new_meta.encryption.clone(),
multipart: false, multipart: false,
}, },
}; };
@ -164,23 +246,42 @@ pub async fn handle_copy(
} }
} }
let last_modified = msec_to_rfc3339(new_timestamp); Ok(res)
let result = CopyObjectResult {
last_modified: s3_xml::Value(last_modified),
etag: s3_xml::Value(format!("\"{}\"", etag)),
};
let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::builder()
.header("Content-Type", "application/xml")
.header("x-amz-version-id", hex::encode(new_uuid))
.header(
"x-amz-copy-source-version-id",
hex::encode(source_version.uuid),
)
.body(string_body(xml))?)
} }
async fn handle_copy_reencrypt(
ctx: ReqCtx,
dest_key: &str,
dest_object_headers: ObjectVersionHeaders,
dest_encryption: EncryptionParams,
source_version: &ObjectVersion,
source_version_data: &ObjectVersionData,
source_encryption: EncryptionParams,
) -> Result<SaveStreamResult, Error> {
// basically we will read the source data (decrypt if necessary)
// and save that in a new object (encrypt if necessary),
// by combining the code used in getobject and putobject
let source_stream = full_object_byte_stream(
ctx.garage.clone(),
source_version,
source_version_data,
source_encryption,
);
save_stream(
&ctx,
dest_object_headers,
dest_encryption,
source_stream.map_err(|e| Error::from(GarageError::from(e))),
&dest_key.to_string(),
None,
None,
)
.await
}
// -------- UploadPartCopy ---------
pub async fn handle_upload_part_copy( pub async fn handle_upload_part_copy(
ctx: ReqCtx, ctx: ReqCtx,
req: &Request<ReqBody>, req: &Request<ReqBody>,
@ -193,7 +294,7 @@ pub async fn handle_upload_part_copy(
let dest_upload_id = multipart::decode_upload_id(upload_id)?; let dest_upload_id = multipart::decode_upload_id(upload_id)?;
let dest_key = dest_key.to_string(); let dest_key = dest_key.to_string();
let (source_object, (_, _, mut dest_mpu)) = futures::try_join!( let (source_object, (_, dest_version, mut dest_mpu)) = futures::try_join!(
get_copy_source(&ctx, req), get_copy_source(&ctx, req),
multipart::get_upload(&ctx, &dest_key, &dest_upload_id) multipart::get_upload(&ctx, &dest_key, &dest_upload_id)
)?; )?;
@ -206,6 +307,20 @@ pub async fn handle_upload_part_copy(
// Check precondition on source, e.g. x-amz-copy-source-if-match // Check precondition on source, e.g. x-amz-copy-source-if-match
copy_precondition.check(source_object_version, &source_version_meta.etag)?; copy_precondition.check(source_object_version, &source_version_meta.etag)?;
// Determine encryption parameters
let (source_encryption, _) = EncryptionParams::check_decrypt_for_copy_source(
&garage,
req.headers(),
&source_version_meta.encryption,
)?;
let dest_object_encryption = match dest_version.state {
ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(),
};
let (dest_encryption, _) =
EncryptionParams::check_decrypt(&garage, req.headers(), &dest_object_encryption)?;
let same_encryption = EncryptionParams::is_same(&source_encryption, &dest_encryption);
// Check source range is valid // Check source range is valid
let source_range = match req.headers().get("x-amz-copy-source-range") { let source_range = match req.headers().get("x-amz-copy-source-range") {
Some(range) => { Some(range) => {
@ -227,21 +342,16 @@ pub async fn handle_upload_part_copy(
}; };
// Check source version is not inlined // Check source version is not inlined
match source_version_data { if matches!(source_version_data, ObjectVersionData::Inline(_, _)) {
ObjectVersionData::DeleteMarker => unreachable!(), // This is only for small files, we don't bother handling this.
ObjectVersionData::Inline(_meta, _bytes) => { // (in AWS UploadPartCopy works for parts at least 5MB which
// This is only for small files, we don't bother handling this. // is never the case of an inline object)
// (in AWS UploadPartCopy works for parts at least 5MB which return Err(Error::bad_request(
// is never the case of an inline object) "Source object is too small (minimum part size is 5Mb)",
return Err(Error::bad_request( ));
"Source object is too small (minimum part size is 5Mb)", }
));
}
ObjectVersionData::FirstBlock(_meta, _first_block_hash) => (),
};
// Fetch source versin with its block list, // Fetch source version with its block list
// and destination version to check part hasn't yet been uploaded
let source_version = garage let source_version = garage
.version_table .version_table
.get(&source_object_version.uuid, &EmptyKey) .get(&source_object_version.uuid, &EmptyKey)
@ -251,7 +361,9 @@ pub async fn handle_upload_part_copy(
// We want to reuse blocks from the source version as much as possible. // We want to reuse blocks from the source version as much as possible.
// However, we still need to get the data from these blocks // However, we still need to get the data from these blocks
// because we need to know it to calculate the MD5sum of the part // because we need to know it to calculate the MD5sum of the part
// which is used as its ETag. // which is used as its ETag. For encrypted sources or destinations,
// we must always read(+decrypt) and then write(+encrypt), so we
// can never reuse data blocks as is.
// First, calculate what blocks we want to keep, // First, calculate what blocks we want to keep,
// and the subrange of the block to take, if the bounds of the // and the subrange of the block to take, if the bounds of the
@ -313,6 +425,8 @@ pub async fn handle_upload_part_copy(
}, },
false, false,
); );
// write an empty version now to be the parent of the block_ref entries
garage.version_table.insert(&dest_version).await?;
// Now, actually copy the blocks // Now, actually copy the blocks
let mut md5hasher = Md5::new(); let mut md5hasher = Md5::new();
@ -321,24 +435,44 @@ pub async fn handle_upload_part_copy(
// and extract the subrange if necessary. // and extract the subrange if necessary.
// The second returned value is an Option<Hash>, that is Some // The second returned value is an Option<Hash>, that is Some
// if and only if the block returned is a block that already existed // if and only if the block returned is a block that already existed
// in the Garage data store (thus we don't need to save it again). // in the Garage data store and can be reused as-is instead of having
// to save it again. This excludes encrypted source blocks that we had
// to decrypt.
let garage2 = garage.clone(); let garage2 = garage.clone();
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();
let source_blocks = stream::iter(blocks_to_copy) let source_blocks = stream::iter(blocks_to_copy)
.enumerate() .enumerate()
.flat_map(|(i, (block_hash, range_to_copy))| { .map(|(i, (block_hash, range_to_copy))| {
let garage3 = garage2.clone(); let garage3 = garage2.clone();
stream::once(async move { async move {
let data = garage3 let stream = source_encryption
.block_manager .get_block(&garage3, &block_hash, Some(order_stream.order(i as u64)))
.rpc_get_block(&block_hash, Some(order_stream.order(i as u64)))
.await?; .await?;
let data = read_stream_to_end(stream).await?.into_bytes();
// For each item, we return a tuple of:
// 1. the full data block (decrypted)
// 2. an Option<Hash> that indicates the hash of the block in the block store,
// only if it can be re-used as-is in the copied object
match range_to_copy { match range_to_copy {
Some(r) => Ok((data.slice(r), None)), Some(r) => {
None => Ok((data, Some(block_hash))), // If we are taking a subslice of the data, we cannot reuse the block as-is
Ok((data.slice(r), None))
}
None if same_encryption => {
// If the data is unencrypted before & after, or if we are using
// the same encryption key, we can reuse the stored block, no need
// to re-send it to storage nodes.
Ok((data, Some(block_hash)))
}
None => {
// If we are decrypting / (re)encrypting with different keys,
// we cannot reuse the block as-is
Ok((data, None))
}
} }
}) }
}) })
.buffered(2)
.peekable(); .peekable();
// The defragmenter is a custom stream (defined below) that concatenates // The defragmenter is a custom stream (defined below) that concatenates
@ -346,22 +480,33 @@ pub async fn handle_upload_part_copy(
// It returns a series of (Vec<u8>, Option<Hash>). // It returns a series of (Vec<u8>, Option<Hash>).
// When it is done, it returns an empty vec. // When it is done, it returns an empty vec.
// Same as the previous iterator, the Option is Some(_) if and only if // Same as the previous iterator, the Option is Some(_) if and only if
// it's an existing block of the Garage data store. // it's an existing block of the Garage data store that can be reused.
let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks)); let mut defragmenter = Defragmenter::new(garage.config.block_size, Box::pin(source_blocks));
let mut current_offset = 0; let mut current_offset = 0;
let mut next_block = defragmenter.next().await?; let mut next_block = defragmenter.next().await?;
// TODO this could be optimized similarly to read_and_put_blocks
// low priority because uploadpartcopy is rarely used
loop { loop {
let (data, existing_block_hash) = next_block; let (data, existing_block_hash) = next_block;
if data.is_empty() { if data.is_empty() {
break; break;
} }
let data_len = data.len() as u64;
md5hasher.update(&data[..]); md5hasher.update(&data[..]);
let must_upload = existing_block_hash.is_none(); let (final_data, must_upload, final_hash) = match existing_block_hash {
let final_hash = existing_block_hash.unwrap_or_else(|| blake2sum(&data[..])); Some(hash) if same_encryption => (data, false, hash),
_ => tokio::task::spawn_blocking(move || {
let data_enc = dest_encryption.encrypt_block(data)?;
let hash = blake2sum(&data_enc);
Ok::<_, Error>((data_enc, true, hash))
})
.await
.unwrap()?,
};
dest_version.blocks.clear(); dest_version.blocks.clear();
dest_version.blocks.put( dest_version.blocks.put(
@ -371,10 +516,10 @@ pub async fn handle_upload_part_copy(
}, },
VersionBlock { VersionBlock {
hash: final_hash, hash: final_hash,
size: data.len() as u64, size: data_len,
}, },
); );
current_offset += data.len() as u64; current_offset += data_len;
let block_ref = BlockRef { let block_ref = BlockRef {
block: final_hash, block: final_hash,
@ -382,36 +527,33 @@ pub async fn handle_upload_part_copy(
deleted: false.into(), deleted: false.into(),
}; };
let garage2 = garage.clone(); let (_, _, _, next) = futures::try_join!(
let res = futures::try_join!(
// Thing 1: if the block is not exactly a block that existed before, // Thing 1: if the block is not exactly a block that existed before,
// we need to insert that data as a new block. // we need to insert that data as a new block.
async move { async {
if must_upload { if must_upload {
garage2 garage
.block_manager .block_manager
.rpc_put_block(final_hash, data, None) .rpc_put_block(final_hash, final_data, dest_encryption.is_encrypted(), None)
.await .await
} else { } else {
Ok(()) Ok(())
} }
}, },
async { // Thing 2: we need to insert the block in the version
// Thing 2: we need to insert the block in the version garage.version_table.insert(&dest_version),
garage.version_table.insert(&dest_version).await?; // Thing 3: we need to add a block reference
// Thing 3: we need to add a block reference garage.block_ref_table.insert(&block_ref),
garage.block_ref_table.insert(&block_ref).await // Thing 4: we need to read the next block
},
// Thing 4: we need to prefetch the next block
defragmenter.next(), defragmenter.next(),
)?; )?;
next_block = res.2; next_block = next;
} }
assert_eq!(current_offset, source_range.length); assert_eq!(current_offset, source_range.length);
let data_md5sum = md5hasher.finalize(); let data_md5sum = md5hasher.finalize();
let etag = hex::encode(data_md5sum); let etag = dest_encryption.etag_from_md5(&data_md5sum);
// Put the part's ETag in the Versiontable // Put the part's ETag in the Versiontable
dest_mpu.parts.put( dest_mpu.parts.put(
@ -431,13 +573,14 @@ pub async fn handle_upload_part_copy(
last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)), last_modified: s3_xml::Value(msec_to_rfc3339(source_object_version.timestamp)),
})?; })?;
Ok(Response::builder() let mut resp = Response::builder()
.header("Content-Type", "application/xml") .header("Content-Type", "application/xml")
.header( .header(
"x-amz-copy-source-version-id", "x-amz-copy-source-version-id",
hex::encode(source_object_version.uuid), hex::encode(source_object_version.uuid),
) );
.body(string_body(resp_xml))?) dest_encryption.add_response_headers(&mut resp);
Ok(resp.body(string_body(resp_xml))?)
} }
async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> { async fn get_copy_source(ctx: &ReqCtx, req: &Request<ReqBody>) -> Result<Object, Error> {

595
src/api/s3/encryption.rs Normal file
View file

@ -0,0 +1,595 @@
use std::borrow::Cow;
use std::convert::TryInto;
use std::pin::Pin;
use aes_gcm::{
aead::stream::{DecryptorLE31, EncryptorLE31, StreamLE31},
aead::{Aead, AeadCore, KeyInit, OsRng},
aes::cipher::crypto_common::rand_core::RngCore,
aes::cipher::typenum::Unsigned,
Aes256Gcm, Key, Nonce,
};
use base64::prelude::*;
use bytes::Bytes;
use futures::stream::Stream;
use futures::task;
use tokio::io::BufReader;
use http::header::{HeaderMap, HeaderName, HeaderValue};
use garage_net::bytes_buf::BytesBuf;
use garage_net::stream::{stream_asyncread, ByteStream};
use garage_rpc::rpc_helper::OrderTag;
use garage_util::data::Hash;
use garage_util::error::Error as GarageError;
use garage_util::migrate::Migrate;
use garage_model::garage::Garage;
use garage_model::s3::object_table::{ObjectVersionEncryption, ObjectVersionHeaders};
use crate::common_error::*;
use crate::s3::error::Error;
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-algorithm");
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-key");
const X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
HeaderName::from_static("x-amz-server-side-encryption-customer-key-md5");
const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM: HeaderName =
HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-algorithm");
const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY: HeaderName =
HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key");
const X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5: HeaderName =
HeaderName::from_static("x-amz-copy-source-server-side-encryption-customer-key-md5");
const CUSTOMER_ALGORITHM_AES256: &[u8] = b"AES256";
type Md5Output = md5::digest::Output<md5::Md5Core>;
type StreamNonceSize = aes_gcm::aead::stream::NonceSize<Aes256Gcm, StreamLE31<Aes256Gcm>>;
// Data blocks are encrypted by smaller chunks of size 4096 bytes,
// so that data can be streamed when reading.
// This size has to be known and has to be constant, or data won't be
// readable anymore. DO NOT CHANGE THIS VALUE.
const STREAM_ENC_PLAIN_CHUNK_SIZE: usize = 0x1000; // 4096 bytes
const STREAM_ENC_CYPER_CHUNK_SIZE: usize = STREAM_ENC_PLAIN_CHUNK_SIZE + 16;
#[derive(Clone, Copy)]
pub enum EncryptionParams {
Plaintext,
SseC {
client_key: Key<Aes256Gcm>,
client_key_md5: Md5Output,
compression_level: Option<i32>,
},
}
impl EncryptionParams {
pub fn is_encrypted(&self) -> bool {
!matches!(self, Self::Plaintext)
}
pub fn is_same(a: &Self, b: &Self) -> bool {
let relevant_info = |x: &Self| match x {
Self::Plaintext => None,
Self::SseC {
client_key,
compression_level,
..
} => Some((*client_key, compression_level.is_some())),
};
relevant_info(a) == relevant_info(b)
}
pub fn new_from_headers(
garage: &Garage,
headers: &HeaderMap,
) -> Result<EncryptionParams, Error> {
let key = parse_request_headers(
headers,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
)?;
match key {
Some((client_key, client_key_md5)) => Ok(EncryptionParams::SseC {
client_key,
client_key_md5,
compression_level: garage.config.compression_level,
}),
None => Ok(EncryptionParams::Plaintext),
}
}
pub fn add_response_headers(&self, resp: &mut http::response::Builder) {
if let Self::SseC { client_key_md5, .. } = self {
let md5 = BASE64_STANDARD.encode(&client_key_md5);
resp.headers_mut().unwrap().insert(
X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
HeaderValue::from_bytes(CUSTOMER_ALGORITHM_AES256).unwrap(),
);
resp.headers_mut().unwrap().insert(
X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
HeaderValue::from_bytes(md5.as_bytes()).unwrap(),
);
}
}
pub fn check_decrypt<'a>(
garage: &Garage,
headers: &HeaderMap,
obj_enc: &'a ObjectVersionEncryption,
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
let key = parse_request_headers(
headers,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
&X_AMZ_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
)?;
Self::check_decrypt_common(garage, key, obj_enc)
}
pub fn check_decrypt_for_copy_source<'a>(
garage: &Garage,
headers: &HeaderMap,
obj_enc: &'a ObjectVersionEncryption,
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
let key = parse_request_headers(
headers,
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_ALGORITHM,
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY,
&X_AMZ_COPY_SOURCE_SERVER_SIDE_ENCRYPTION_CUSTOMER_KEY_MD5,
)?;
Self::check_decrypt_common(garage, key, obj_enc)
}
fn check_decrypt_common<'a>(
garage: &Garage,
key: Option<(Key<Aes256Gcm>, Md5Output)>,
obj_enc: &'a ObjectVersionEncryption,
) -> Result<(Self, Cow<'a, ObjectVersionHeaders>), Error> {
match (key, &obj_enc) {
(
Some((client_key, client_key_md5)),
ObjectVersionEncryption::SseC {
headers,
compressed,
},
) => {
let enc = Self::SseC {
client_key,
client_key_md5,
compression_level: if *compressed {
Some(garage.config.compression_level.unwrap_or(1))
} else {
None
},
};
let plaintext = enc.decrypt_blob(&headers)?;
let headers = ObjectVersionHeaders::decode(&plaintext)
.ok_or_internal_error("Could not decode encrypted headers")?;
Ok((enc, Cow::Owned(headers)))
}
(None, ObjectVersionEncryption::Plaintext { headers }) => {
Ok((Self::Plaintext, Cow::Borrowed(headers)))
}
(_, ObjectVersionEncryption::SseC { .. }) => {
Err(Error::bad_request("Object is encrypted"))
}
(Some(_), _) => {
// TODO: should this be an OK scenario?
Err(Error::bad_request("Trying to decrypt a plaintext object"))
}
}
}
pub fn encrypt_headers(
&self,
h: ObjectVersionHeaders,
) -> Result<ObjectVersionEncryption, Error> {
match self {
Self::SseC {
compression_level, ..
} => {
let plaintext = h.encode().map_err(GarageError::from)?;
let ciphertext = self.encrypt_blob(&plaintext)?;
Ok(ObjectVersionEncryption::SseC {
headers: ciphertext.into_owned(),
compressed: compression_level.is_some(),
})
}
Self::Plaintext => Ok(ObjectVersionEncryption::Plaintext { headers: h }),
}
}
// ---- generating object Etag values ----
pub fn etag_from_md5(&self, md5sum: &[u8]) -> String {
match self {
Self::Plaintext => hex::encode(md5sum),
Self::SseC { .. } => {
// AWS specifies that for encrypted objects, the Etag is not
// the md5sum of the data, but doesn't say what it is.
// So we just put some random bytes.
let mut random = [0u8; 16];
OsRng.fill_bytes(&mut random);
hex::encode(&random)
}
}
}
// ---- generic function for encrypting / decrypting blobs ----
// Prepends a randomly-generated nonce to the encrypted value.
// This is used for encrypting object headers and inlined data for small objects.
// This does not compress anything.
pub fn encrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
match self {
Self::SseC { client_key, .. } => {
let cipher = Aes256Gcm::new(&client_key);
let nonce = Aes256Gcm::generate_nonce(&mut OsRng);
let ciphertext = cipher
.encrypt(&nonce, blob)
.ok_or_internal_error("Encryption failed")?;
Ok(Cow::Owned([nonce.to_vec(), ciphertext].concat()))
}
Self::Plaintext => Ok(Cow::Borrowed(blob)),
}
}
pub fn decrypt_blob<'a>(&self, blob: &'a [u8]) -> Result<Cow<'a, [u8]>, Error> {
match self {
Self::SseC { client_key, .. } => {
let cipher = Aes256Gcm::new(&client_key);
let nonce_size = <Aes256Gcm as AeadCore>::NonceSize::to_usize();
let nonce = Nonce::from_slice(
blob.get(..nonce_size)
.ok_or_internal_error("invalid encrypted data")?,
);
let plaintext = cipher
.decrypt(nonce, &blob[nonce_size..])
.ok_or_bad_request(
"Invalid encryption key, could not decrypt object metadata.",
)?;
Ok(Cow::Owned(plaintext))
}
Self::Plaintext => Ok(Cow::Borrowed(blob)),
}
}
// ---- function for encrypting / decrypting byte streams ----
/// Get a data block from the storage node, and decrypt+decompress it
/// if necessary. If object is plaintext, just get it without any processing.
pub async fn get_block(
&self,
garage: &Garage,
hash: &Hash,
order: Option<OrderTag>,
) -> Result<ByteStream, GarageError> {
let raw_block = garage
.block_manager
.rpc_get_block_streaming(hash, order)
.await?;
Ok(self.decrypt_block_stream(raw_block))
}
pub fn decrypt_block_stream(&self, stream: ByteStream) -> ByteStream {
match self {
Self::Plaintext => stream,
Self::SseC {
client_key,
compression_level,
..
} => {
let plaintext = DecryptStream::new(stream, *client_key);
if compression_level.is_some() {
let reader = stream_asyncread(Box::pin(plaintext));
let reader = BufReader::new(reader);
let reader = async_compression::tokio::bufread::ZstdDecoder::new(reader);
Box::pin(tokio_util::io::ReaderStream::new(reader))
} else {
Box::pin(plaintext)
}
}
}
}
/// Encrypt a data block if encryption is set, for use before
/// putting the data blocks into storage
pub fn encrypt_block(&self, block: Bytes) -> Result<Bytes, Error> {
match self {
Self::Plaintext => Ok(block),
Self::SseC {
client_key,
compression_level,
..
} => {
let block = if let Some(level) = compression_level {
Cow::Owned(
garage_block::zstd_encode(block.as_ref(), *level)
.ok_or_internal_error("failed to compress data block")?,
)
} else {
Cow::Borrowed(block.as_ref())
};
let mut ret = Vec::with_capacity(block.len() + 32 + block.len() / 64);
let mut nonce: Nonce<StreamNonceSize> = Default::default();
OsRng.fill_bytes(&mut nonce);
ret.extend_from_slice(nonce.as_slice());
let mut cipher = EncryptorLE31::<Aes256Gcm>::new(&client_key, &nonce);
let mut iter = block.chunks(STREAM_ENC_PLAIN_CHUNK_SIZE).peekable();
if iter.peek().is_none() {
// Empty stream: we encrypt an empty last chunk
let chunk_enc = cipher
.encrypt_last(&[][..])
.ok_or_internal_error("failed to encrypt chunk")?;
ret.extend_from_slice(&chunk_enc);
} else {
loop {
let chunk = iter.next().unwrap();
if iter.peek().is_some() {
let chunk_enc = cipher
.encrypt_next(chunk)
.ok_or_internal_error("failed to encrypt chunk")?;
assert_eq!(chunk.len(), STREAM_ENC_PLAIN_CHUNK_SIZE);
assert_eq!(chunk_enc.len(), STREAM_ENC_CYPER_CHUNK_SIZE);
ret.extend_from_slice(&chunk_enc);
} else {
// use encrypt_last for the last chunk
let chunk_enc = cipher
.encrypt_last(chunk)
.ok_or_internal_error("failed to encrypt chunk")?;
ret.extend_from_slice(&chunk_enc);
break;
}
}
}
Ok(ret.into())
}
}
}
}
fn parse_request_headers(
headers: &HeaderMap,
alg_header: &HeaderName,
key_header: &HeaderName,
md5_header: &HeaderName,
) -> Result<Option<(Key<Aes256Gcm>, Md5Output)>, Error> {
let alg = headers.get(alg_header).map(HeaderValue::as_bytes);
let key = headers.get(key_header).map(HeaderValue::as_bytes);
let md5 = headers.get(md5_header).map(HeaderValue::as_bytes);
match alg {
Some(CUSTOMER_ALGORITHM_AES256) => {
use md5::{Digest, Md5};
let key_b64 =
key.ok_or_bad_request("Missing server-side-encryption-customer-key header")?;
let key_bytes: [u8; 32] = BASE64_STANDARD
.decode(&key_b64)
.ok_or_bad_request(
"Invalid server-side-encryption-customer-key header: invalid base64",
)?
.try_into()
.ok()
.ok_or_bad_request(
"Invalid server-side-encryption-customer-key header: invalid length",
)?;
let md5_b64 =
md5.ok_or_bad_request("Missing server-side-encryption-customer-key-md5 header")?;
let md5_bytes = BASE64_STANDARD.decode(&md5_b64).ok_or_bad_request(
"Invalid server-side-encryption-customer-key-md5 header: invalid bass64",
)?;
let mut hasher = Md5::new();
hasher.update(&key_bytes[..]);
let our_md5 = hasher.finalize();
if our_md5.as_slice() != md5_bytes.as_slice() {
return Err(Error::bad_request(
"Server-side encryption client key MD5 checksum does not match",
));
}
Ok(Some((key_bytes.into(), our_md5)))
}
Some(alg) => Err(Error::InvalidEncryptionAlgorithm(
String::from_utf8_lossy(alg).into_owned(),
)),
None => {
if key.is_some() || md5.is_some() {
Err(Error::bad_request(
"Unexpected server-side-encryption-customer-key{,-md5} header(s)",
))
} else {
Ok(None)
}
}
}
}
// ---- encrypt & decrypt streams ----
#[pin_project::pin_project]
struct DecryptStream {
#[pin]
stream: ByteStream,
done_reading: bool,
buf: BytesBuf,
key: Key<Aes256Gcm>,
state: DecryptStreamState,
}
enum DecryptStreamState {
Starting,
Running(DecryptorLE31<Aes256Gcm>),
Done,
}
impl DecryptStream {
fn new(stream: ByteStream, key: Key<Aes256Gcm>) -> Self {
Self {
stream,
done_reading: false,
buf: BytesBuf::new(),
key,
state: DecryptStreamState::Starting,
}
}
}
impl Stream for DecryptStream {
type Item = Result<Bytes, std::io::Error>;
fn poll_next(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> task::Poll<Option<Self::Item>> {
use std::task::Poll;
let mut this = self.project();
// The first bytes of the stream should contain the starting nonce.
// If we don't have a Running state, it means that we haven't
// yet read the nonce.
while matches!(this.state, DecryptStreamState::Starting) {
let nonce_size = StreamNonceSize::to_usize();
if let Some(nonce) = this.buf.take_exact(nonce_size) {
let nonce = Nonce::from_slice(nonce.as_ref());
*this.state = DecryptStreamState::Running(DecryptorLE31::new(&this.key, nonce));
break;
}
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}
None => {
return Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::UnexpectedEof,
"Decrypt: unexpected EOF, could not read nonce",
))));
}
}
}
// Read at least one byte more than the encrypted chunk size
// (if possible), so that we know if we are decrypting the
// last chunk or not.
while !*this.done_reading && this.buf.len() <= STREAM_ENC_CYPER_CHUNK_SIZE {
match futures::ready!(this.stream.as_mut().poll_next(cx)) {
Some(Ok(bytes)) => {
this.buf.extend(bytes);
}
Some(Err(e)) => {
return Poll::Ready(Some(Err(e)));
}
None => {
*this.done_reading = true;
break;
}
}
}
if matches!(this.state, DecryptStreamState::Done) {
if !this.buf.is_empty() {
return Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Decrypt: unexpected bytes after last encrypted chunk",
))));
}
return Poll::Ready(None);
}
let res = if this.buf.len() > STREAM_ENC_CYPER_CHUNK_SIZE {
// we have strictly more bytes than the encrypted chunk size,
// so we know this is not the last
let DecryptStreamState::Running(ref mut cipher) = this.state else {
unreachable!()
};
let chunk = this.buf.take_exact(STREAM_ENC_CYPER_CHUNK_SIZE).unwrap();
let chunk_dec = cipher.decrypt_next(chunk.as_ref());
if let Ok(c) = &chunk_dec {
assert_eq!(c.len(), STREAM_ENC_PLAIN_CHUNK_SIZE);
}
chunk_dec
} else {
// We have one encrypted chunk size or less, even though we tried
// to read more, so this is the last chunk. Decrypt using the
// appropriate decrypt_last() function that then destroys the cipher.
let state = std::mem::replace(this.state, DecryptStreamState::Done);
let DecryptStreamState::Running(cipher) = state else {
unreachable!()
};
let chunk = this.buf.take_all();
cipher.decrypt_last(chunk.as_ref())
};
match res {
Ok(bytes) if bytes.is_empty() => Poll::Ready(None),
Ok(bytes) => Poll::Ready(Some(Ok(bytes.into()))),
Err(_) => Poll::Ready(Some(Err(std::io::Error::new(
std::io::ErrorKind::Other,
"Decryption failed",
)))),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::stream::StreamExt;
use garage_net::stream::read_stream_to_end;
fn stream() -> ByteStream {
Box::pin(
futures::stream::iter(16usize..1024)
.map(|i| Ok(Bytes::from(vec![(i % 256) as u8; (i * 37) % 1024]))),
)
}
async fn test_block_enc(compression_level: Option<i32>) {
let enc = EncryptionParams::SseC {
client_key: Aes256Gcm::generate_key(&mut OsRng),
client_key_md5: Default::default(), // not needed
compression_level,
};
let block_plain = read_stream_to_end(stream()).await.unwrap().into_bytes();
let block_enc = enc.encrypt_block(block_plain.clone()).unwrap();
let block_dec =
enc.decrypt_block_stream(Box::pin(futures::stream::once(async { Ok(block_enc) })));
let block_dec = read_stream_to_end(block_dec).await.unwrap().into_bytes();
assert_eq!(block_plain, block_dec);
assert!(block_dec.len() > 128000);
}
#[tokio::test]
async fn test_encrypt_block() {
test_block_enc(None).await
}
#[tokio::test]
async fn test_encrypt_block_compressed() {
test_block_enc(Some(1)).await
}
}

View file

@ -65,6 +65,10 @@ pub enum Error {
#[error(display = "Invalid HTTP range: {:?}", _0)] #[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
/// The client sent a range header with invalid value
#[error(display = "Invalid encryption algorithm: {:?}, should be AES256", _0)]
InvalidEncryptionAlgorithm(String),
/// The client sent a request for an action not supported by garage /// The client sent a request for an action not supported by garage
#[error(display = "Unimplemented action: {}", _0)] #[error(display = "Unimplemented action: {}", _0)]
NotImplemented(String), NotImplemented(String),
@ -126,6 +130,7 @@ impl Error {
Error::InvalidXml(_) => "MalformedXML", Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange", Error::InvalidRange(_) => "InvalidRange",
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest", Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
Error::InvalidEncryptionAlgorithm(_) => "InvalidEncryptionAlgorithmError",
} }
} }
} }
@ -143,6 +148,7 @@ impl ApiError for Error {
| Error::InvalidPart | Error::InvalidPart
| Error::InvalidPartOrder | Error::InvalidPartOrder
| Error::EntityTooSmall | Error::EntityTooSmall
| Error::InvalidEncryptionAlgorithm(_)
| Error::InvalidXml(_) | Error::InvalidXml(_)
| Error::InvalidUtf8Str(_) | Error::InvalidUtf8Str(_)
| Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST, | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,

View file

@ -3,8 +3,9 @@ use std::convert::TryInto;
use std::sync::Arc; use std::sync::Arc;
use std::time::{Duration, UNIX_EPOCH}; use std::time::{Duration, UNIX_EPOCH};
use bytes::Bytes;
use futures::future; use futures::future;
use futures::stream::{self, StreamExt}; use futures::stream::{self, Stream, StreamExt};
use http::header::{ use http::header::{
ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE, ACCEPT_RANGES, CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_LANGUAGE,
CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH, CONTENT_LENGTH, CONTENT_RANGE, CONTENT_TYPE, ETAG, EXPIRES, IF_MODIFIED_SINCE, IF_NONE_MATCH,
@ -25,6 +26,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*; use crate::helpers::*;
use crate::s3::api_server::ResBody; use crate::s3::api_server::ResBody;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*; use crate::s3::error::*;
const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count"; const X_AMZ_MP_PARTS_COUNT: &str = "x-amz-mp-parts-count";
@ -42,6 +44,8 @@ pub struct GetObjectOverrides {
fn object_headers( fn object_headers(
version: &ObjectVersion, version: &ObjectVersion,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
headers: &ObjectVersionHeaders,
encryption: EncryptionParams,
) -> http::response::Builder { ) -> http::response::Builder {
debug!("Version meta: {:?}", version_meta); debug!("Version meta: {:?}", version_meta);
@ -49,7 +53,7 @@ fn object_headers(
let date_str = httpdate::fmt_http_date(date); let date_str = httpdate::fmt_http_date(date);
let mut resp = Response::builder() let mut resp = Response::builder()
.header(CONTENT_TYPE, version_meta.headers.content_type.to_string()) .header(CONTENT_TYPE, headers.content_type.to_string())
.header(LAST_MODIFIED, date_str) .header(LAST_MODIFIED, date_str)
.header(ACCEPT_RANGES, "bytes".to_string()); .header(ACCEPT_RANGES, "bytes".to_string());
@ -57,10 +61,12 @@ fn object_headers(
resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag)); resp = resp.header(ETAG, format!("\"{}\"", version_meta.etag));
} }
for (k, v) in version_meta.headers.other.iter() { for (k, v) in headers.other.iter() {
resp = resp.header(k, v.to_string()); resp = resp.header(k, v.to_string());
} }
encryption.add_response_headers(&mut resp);
resp resp
} }
@ -175,21 +181,27 @@ pub async fn handle_head_without_ctx(
return Ok(cached); return Ok(cached);
} }
let (encryption, headers) =
EncryptionParams::check_decrypt(&garage, req.headers(), &version_meta.encryption)?;
if let Some(pn) = part_number { if let Some(pn) = part_number {
match version_data { match version_data {
ObjectVersionData::Inline(_, bytes) => { ObjectVersionData::Inline(_, _) => {
if pn != 1 { if pn != 1 {
return Err(Error::InvalidPart); return Err(Error::InvalidPart);
} }
Ok(object_headers(object_version, version_meta) let bytes_len = version_meta.size;
.header(CONTENT_LENGTH, format!("{}", bytes.len())) Ok(
.header( object_headers(object_version, version_meta, &headers, encryption)
CONTENT_RANGE, .header(CONTENT_LENGTH, format!("{}", bytes_len))
format!("bytes 0-{}/{}", bytes.len() - 1, bytes.len()), .header(
) CONTENT_RANGE,
.header(X_AMZ_MP_PARTS_COUNT, "1") format!("bytes 0-{}/{}", bytes_len - 1, bytes_len),
.status(StatusCode::PARTIAL_CONTENT) )
.body(empty_body())?) .header(X_AMZ_MP_PARTS_COUNT, "1")
.status(StatusCode::PARTIAL_CONTENT)
.body(empty_body())?,
)
} }
ObjectVersionData::FirstBlock(_, _) => { ObjectVersionData::FirstBlock(_, _) => {
let version = garage let version = garage
@ -201,28 +213,32 @@ pub async fn handle_head_without_ctx(
let (part_offset, part_end) = let (part_offset, part_end) =
calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?; calculate_part_bounds(&version, pn).ok_or(Error::InvalidPart)?;
Ok(object_headers(object_version, version_meta) Ok(
.header(CONTENT_LENGTH, format!("{}", part_end - part_offset)) object_headers(object_version, version_meta, &headers, encryption)
.header( .header(CONTENT_LENGTH, format!("{}", part_end - part_offset))
CONTENT_RANGE, .header(
format!( CONTENT_RANGE,
"bytes {}-{}/{}", format!(
part_offset, "bytes {}-{}/{}",
part_end - 1, part_offset,
version_meta.size part_end - 1,
), version_meta.size
) ),
.header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?)) )
.status(StatusCode::PARTIAL_CONTENT) .header(X_AMZ_MP_PARTS_COUNT, format!("{}", version.n_parts()?))
.body(empty_body())?) .status(StatusCode::PARTIAL_CONTENT)
.body(empty_body())?,
)
} }
_ => unreachable!(), _ => unreachable!(),
} }
} else { } else {
Ok(object_headers(object_version, version_meta) Ok(
.header(CONTENT_LENGTH, format!("{}", version_meta.size)) object_headers(object_version, version_meta, &headers, encryption)
.status(StatusCode::OK) .header(CONTENT_LENGTH, format!("{}", version_meta.size))
.body(empty_body())?) .status(StatusCode::OK)
.body(empty_body())?,
)
} }
} }
@ -273,23 +289,41 @@ pub async fn handle_get_without_ctx(
return Ok(cached); return Ok(cached);
} }
let (enc, headers) =
EncryptionParams::check_decrypt(&garage, req.headers(), &last_v_meta.encryption)?;
match (part_number, parse_range_header(req, last_v_meta.size)?) { match (part_number, parse_range_header(req, last_v_meta.size)?) {
(Some(_), Some(_)) => Err(Error::bad_request( (Some(_), Some(_)) => Err(Error::bad_request(
"Cannot specify both partNumber and Range header", "Cannot specify both partNumber and Range header",
)), )),
(Some(pn), None) => handle_get_part(garage, last_v, last_v_data, last_v_meta, pn).await, (Some(pn), None) => {
handle_get_part(garage, last_v, last_v_data, last_v_meta, enc, &headers, pn).await
}
(None, Some(range)) => { (None, Some(range)) => {
handle_get_range( handle_get_range(
garage, garage,
last_v, last_v,
last_v_data, last_v_data,
last_v_meta, last_v_meta,
enc,
&headers,
range.start, range.start,
range.start + range.length, range.start + range.length,
) )
.await .await
} }
(None, None) => handle_get_full(garage, last_v, last_v_data, last_v_meta, overrides).await, (None, None) => {
handle_get_full(
garage,
last_v,
last_v_data,
last_v_meta,
enc,
&headers,
overrides,
)
.await
}
} }
} }
@ -298,17 +332,36 @@ async fn handle_get_full(
version: &ObjectVersion, version: &ObjectVersion,
version_data: &ObjectVersionData, version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
encryption: EncryptionParams,
headers: &ObjectVersionHeaders,
overrides: GetObjectOverrides, overrides: GetObjectOverrides,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let mut resp_builder = object_headers(version, version_meta) let mut resp_builder = object_headers(version, version_meta, &headers, encryption)
.header(CONTENT_LENGTH, format!("{}", version_meta.size)) .header(CONTENT_LENGTH, format!("{}", version_meta.size))
.status(StatusCode::OK); .status(StatusCode::OK);
getobject_override_headers(overrides, &mut resp_builder)?; getobject_override_headers(overrides, &mut resp_builder)?;
let stream = full_object_byte_stream(garage, version, version_data, encryption);
Ok(resp_builder.body(response_body_from_stream(stream))?)
}
pub fn full_object_byte_stream(
garage: Arc<Garage>,
version: &ObjectVersion,
version_data: &ObjectVersionData,
encryption: EncryptionParams,
) -> ByteStream {
match &version_data { match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_, bytes) => { ObjectVersionData::Inline(_, bytes) => {
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) let bytes = bytes.to_vec();
Box::pin(futures::stream::once(async move {
encryption
.decrypt_blob(&bytes)
.map(|x| Bytes::from(x.to_vec()))
.map_err(std_error_from_read_error)
}))
} }
ObjectVersionData::FirstBlock(_, first_block_hash) => { ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel::<ByteStream>(2); let (tx, rx) = mpsc::channel::<ByteStream>(2);
@ -324,19 +377,18 @@ async fn handle_get_full(
garage2.version_table.get(&version_uuid, &EmptyKey).await garage2.version_table.get(&version_uuid, &EmptyKey).await
}); });
let stream_block_0 = garage let stream_block_0 = encryption
.block_manager .get_block(&garage, &first_block_hash, Some(order_stream.order(0)))
.rpc_get_block_streaming(&first_block_hash, Some(order_stream.order(0)))
.await?; .await?;
tx.send(stream_block_0) tx.send(stream_block_0)
.await .await
.ok_or_message("channel closed")?; .ok_or_message("channel closed")?;
let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?; let version = version_fut.await.unwrap()?.ok_or(Error::NoSuchKey)?;
for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) { for (i, (_, vb)) in version.blocks.items().iter().enumerate().skip(1) {
let stream_block_i = garage let stream_block_i = encryption
.block_manager .get_block(&garage, &vb.hash, Some(order_stream.order(i as u64)))
.rpc_get_block_streaming(&vb.hash, Some(order_stream.order(i as u64)))
.await?; .await?;
tx.send(stream_block_i) tx.send(stream_block_i)
.await .await
@ -354,8 +406,7 @@ async fn handle_get_full(
} }
}); });
let body = response_body_from_block_stream(rx); Box::pin(tokio_stream::wrappers::ReceiverStream::new(rx).flatten())
Ok(resp_builder.body(body)?)
} }
} }
} }
@ -365,13 +416,15 @@ async fn handle_get_range(
version: &ObjectVersion, version: &ObjectVersion,
version_data: &ObjectVersionData, version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
encryption: EncryptionParams,
headers: &ObjectVersionHeaders,
begin: u64, begin: u64,
end: u64, end: u64,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
// Here we do not use getobject_override_headers because we don't // Here we do not use getobject_override_headers because we don't
// want to add any overridden headers (those should not be added // want to add any overridden headers (those should not be added
// when returning PARTIAL_CONTENT) // when returning PARTIAL_CONTENT)
let resp_builder = object_headers(version, version_meta) let resp_builder = object_headers(version, version_meta, headers, encryption)
.header(CONTENT_LENGTH, format!("{}", end - begin)) .header(CONTENT_LENGTH, format!("{}", end - begin))
.header( .header(
CONTENT_RANGE, CONTENT_RANGE,
@ -382,6 +435,7 @@ async fn handle_get_range(
match &version_data { match &version_data {
ObjectVersionData::DeleteMarker => unreachable!(), ObjectVersionData::DeleteMarker => unreachable!(),
ObjectVersionData::Inline(_meta, bytes) => { ObjectVersionData::Inline(_meta, bytes) => {
let bytes = encryption.decrypt_blob(&bytes)?;
if end as usize <= bytes.len() { if end as usize <= bytes.len() {
let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into()); let body = bytes_body(bytes[begin as usize..end as usize].to_vec().into());
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
@ -398,7 +452,8 @@ async fn handle_get_range(
.await? .await?
.ok_or(Error::NoSuchKey)?; .ok_or(Error::NoSuchKey)?;
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); let body =
body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder.body(body)?) Ok(resp_builder.body(body)?)
} }
} }
@ -409,17 +464,21 @@ async fn handle_get_part(
object_version: &ObjectVersion, object_version: &ObjectVersion,
version_data: &ObjectVersionData, version_data: &ObjectVersionData,
version_meta: &ObjectVersionMeta, version_meta: &ObjectVersionMeta,
encryption: EncryptionParams,
headers: &ObjectVersionHeaders,
part_number: u64, part_number: u64,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
// Same as for get_range, no getobject_override_headers // Same as for get_range, no getobject_override_headers
let resp_builder = let resp_builder = object_headers(object_version, version_meta, headers, encryption)
object_headers(object_version, version_meta).status(StatusCode::PARTIAL_CONTENT); .status(StatusCode::PARTIAL_CONTENT);
match version_data { match version_data {
ObjectVersionData::Inline(_, bytes) => { ObjectVersionData::Inline(_, bytes) => {
if part_number != 1 { if part_number != 1 {
return Err(Error::InvalidPart); return Err(Error::InvalidPart);
} }
let bytes = encryption.decrypt_blob(&bytes)?;
assert_eq!(bytes.len() as u64, version_meta.size);
Ok(resp_builder Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", bytes.len())) .header(CONTENT_LENGTH, format!("{}", bytes.len()))
.header( .header(
@ -427,7 +486,7 @@ async fn handle_get_part(
format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()), format!("bytes {}-{}/{}", 0, bytes.len() - 1, bytes.len()),
) )
.header(X_AMZ_MP_PARTS_COUNT, "1") .header(X_AMZ_MP_PARTS_COUNT, "1")
.body(bytes_body(bytes.to_vec().into()))?) .body(bytes_body(bytes.into_owned().into()))?)
} }
ObjectVersionData::FirstBlock(_, _) => { ObjectVersionData::FirstBlock(_, _) => {
let version = garage let version = garage
@ -439,7 +498,8 @@ async fn handle_get_part(
let (begin, end) = let (begin, end) =
calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?; calculate_part_bounds(&version, part_number).ok_or(Error::InvalidPart)?;
let body = body_from_blocks_range(garage, version.blocks.items(), begin, end); let body =
body_from_blocks_range(garage, encryption, version.blocks.items(), begin, end);
Ok(resp_builder Ok(resp_builder
.header(CONTENT_LENGTH, format!("{}", end - begin)) .header(CONTENT_LENGTH, format!("{}", end - begin))
@ -494,6 +554,7 @@ fn calculate_part_bounds(v: &Version, part_number: u64) -> Option<(u64, u64)> {
fn body_from_blocks_range( fn body_from_blocks_range(
garage: Arc<Garage>, garage: Arc<Garage>,
encryption: EncryptionParams,
all_blocks: &[(VersionBlockKey, VersionBlock)], all_blocks: &[(VersionBlockKey, VersionBlock)],
begin: u64, begin: u64,
end: u64, end: u64,
@ -523,12 +584,11 @@ fn body_from_blocks_range(
tokio::spawn(async move { tokio::spawn(async move {
match async { match async {
let garage = garage.clone();
for (i, (block, block_offset)) in blocks.iter().enumerate() { for (i, (block, block_offset)) in blocks.iter().enumerate() {
let block_stream = garage let block_stream = encryption
.block_manager .get_block(&garage, &block.hash, Some(order_stream.order(i as u64)))
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64))) .await?;
.await? let block_stream = block_stream
.scan(*block_offset, move |chunk_offset, chunk| { .scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk { let r = match chunk {
Ok(chunk_bytes) => { Ok(chunk_bytes) => {
@ -588,19 +648,30 @@ fn body_from_blocks_range(
} }
fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody { fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx).flatten();
.flatten() response_body_from_stream(body_stream)
.map(|x| { }
x.map(hyper::body::Frame::data)
.map_err(|e| Error::from(garage_util::error::Error::from(e))) fn response_body_from_stream<S>(stream: S) -> ResBody
}); where
S: Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static,
{
let body_stream = stream.map(|x| {
x.map(hyper::body::Frame::data)
.map_err(|e| Error::from(garage_util::error::Error::from(e)))
});
ResBody::new(http_body_util::StreamBody::new(body_stream)) ResBody::new(http_body_util::StreamBody::new(body_stream))
} }
fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream { fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
let err = std::io::Error::new( Box::pin(stream::once(future::ready(Err(std_error_from_read_error(
std::io::ErrorKind::Other, e,
format!("Error while getting object data: {}", e), )))))
); }
Box::pin(stream::once(future::ready(Err(err))))
fn std_error_from_read_error<E: std::fmt::Display>(e: E) -> std::io::Error {
std::io::Error::new(
std::io::ErrorKind::Other,
format!("Error while reading object data: {}", e),
)
} }

View file

@ -944,9 +944,11 @@ mod tests {
timestamp: TS, timestamp: TS,
state: ObjectVersionState::Uploading { state: ObjectVersionState::Uploading {
multipart: true, multipart: true,
headers: ObjectVersionHeaders { encryption: ObjectVersionEncryption::Plaintext {
content_type: "text/plain".to_string(), headers: ObjectVersionHeaders {
other: BTreeMap::<String, String>::new(), content_type: "text/plain".to_string(),
other: BTreeMap::<String, String>::new(),
},
}, },
}, },
} }

View file

@ -13,5 +13,6 @@ mod post_object;
mod put; mod put;
mod website; mod website;
mod encryption;
mod router; mod router;
pub mod xml; pub mod xml;

View file

@ -16,6 +16,7 @@ use garage_model::s3::version_table::*;
use crate::helpers::*; use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*; use crate::s3::error::*;
use crate::s3::put::*; use crate::s3::put::*;
use crate::s3::xml as s3_xml; use crate::s3::xml as s3_xml;
@ -41,13 +42,17 @@ pub async fn handle_create_multipart_upload(
let headers = get_headers(req.headers())?; let headers = get_headers(req.headers())?;
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&garage, req.headers())?;
let object_encryption = encryption.encrypt_headers(headers)?;
// Create object in object table // Create object in object table
let object_version = ObjectVersion { let object_version = ObjectVersion {
uuid: upload_id, uuid: upload_id,
timestamp, timestamp,
state: ObjectVersionState::Uploading { state: ObjectVersionState::Uploading {
multipart: true, multipart: true,
headers, encryption: object_encryption,
}, },
}; };
let object = Object::new(*bucket_id, key.to_string(), vec![object_version]); let object = Object::new(*bucket_id, key.to_string(), vec![object_version]);
@ -68,7 +73,9 @@ pub async fn handle_create_multipart_upload(
}; };
let xml = s3_xml::to_xml_with_header(&result)?; let xml = s3_xml::to_xml_with_header(&result)?;
Ok(Response::new(string_body(xml))) let mut resp = Response::builder();
encryption.add_response_headers(&mut resp);
Ok(resp.body(string_body(xml))?)
} }
pub async fn handle_put_part( pub async fn handle_put_part(
@ -91,12 +98,21 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists // Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string(); let key = key.to_string();
let stream = body_stream(req.into_body()); let (req_head, req_body) = req.into_parts();
let stream = body_stream(req_body);
let mut chunker = StreamChunker::new(stream, garage.config.block_size); let mut chunker = StreamChunker::new(stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = let ((_, object_version, mut mpu), first_block) =
futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?; futures::try_join!(get_upload(&ctx, &key, &upload_id), chunker.next(),)?;
// Check encryption params
let object_encryption = match object_version.state {
ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(),
};
let (encryption, _) =
EncryptionParams::check_decrypt(&garage, &req_head.headers, &object_encryption)?;
// Check object is valid and part can be accepted // Check object is valid and part can be accepted
let first_block = first_block.ok_or_bad_request("Empty body")?; let first_block = first_block.ok_or_bad_request("Empty body")?;
@ -136,24 +152,32 @@ pub async fn handle_put_part(
garage.version_table.insert(&version).await?; garage.version_table.insert(&version).await?;
// Copy data to version // Copy data to version
let (total_size, data_md5sum, data_sha256sum, _) = let (total_size, data_md5sum, data_sha256sum, _) = read_and_put_blocks(
read_and_put_blocks(&ctx, &version, part_number, first_block, &mut chunker).await?; &ctx,
&version,
encryption,
part_number,
first_block,
&mut chunker,
)
.await?;
// Verify that checksums map // Verify that checksums map
ensure_checksum_matches( ensure_checksum_matches(
data_md5sum.as_slice(), &data_md5sum,
data_sha256sum, data_sha256sum,
content_md5.as_deref(), content_md5.as_deref(),
content_sha256, content_sha256,
)?; )?;
// Store part etag in version // Store part etag in version
let data_md5sum_hex = hex::encode(data_md5sum); let etag = encryption.etag_from_md5(&data_md5sum);
mpu.parts.put( mpu.parts.put(
mpu_part_key, mpu_part_key,
MpuPart { MpuPart {
version: version_uuid, version: version_uuid,
etag: Some(data_md5sum_hex.clone()), etag: Some(etag.clone()),
size: Some(total_size), size: Some(total_size),
}, },
); );
@ -163,11 +187,9 @@ pub async fn handle_put_part(
// We won't have to clean up on drop. // We won't have to clean up on drop.
interrupted_cleanup.cancel(); interrupted_cleanup.cancel();
let response = Response::builder() let mut resp = Response::builder().header("ETag", format!("\"{}\"", etag));
.header("ETag", format!("\"{}\"", data_md5sum_hex)) encryption.add_response_headers(&mut resp);
.body(empty_body()) Ok(resp.body(empty_body())?)
.unwrap();
Ok(response)
} }
struct InterruptedCleanup(Option<InterruptedCleanupInner>); struct InterruptedCleanup(Option<InterruptedCleanupInner>);
@ -241,8 +263,8 @@ pub async fn handle_complete_multipart_upload(
return Err(Error::bad_request("No data was uploaded")); return Err(Error::bad_request("No data was uploaded"));
} }
let headers = match object_version.state { let object_encryption = match object_version.state {
ObjectVersionState::Uploading { headers, .. } => headers, ObjectVersionState::Uploading { encryption, .. } => encryption,
_ => unreachable!(), _ => unreachable!(),
}; };
@ -344,7 +366,7 @@ pub async fn handle_complete_multipart_upload(
// Write final object version // Write final object version
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta { ObjectVersionMeta {
headers, encryption: object_encryption,
size: total_size, size: total_size,
etag: etag.clone(), etag: etag.clone(),
}, },

View file

@ -18,6 +18,7 @@ use garage_model::garage::Garage;
use crate::helpers::*; use crate::helpers::*;
use crate::s3::api_server::ResBody; use crate::s3::api_server::ResBody;
use crate::s3::cors::*; use crate::s3::cors::*;
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*; use crate::s3::error::*;
use crate::s3::put::{get_headers, save_stream}; use crate::s3::put::{get_headers, save_stream};
use crate::s3::xml as s3_xml; use crate::s3::xml as s3_xml;
@ -48,13 +49,17 @@ pub async fn handle_post_object(
let mut multipart = Multipart::with_constraints(stream, boundary, constraints); let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
let mut params = HeaderMap::new(); let mut params = HeaderMap::new();
let field = loop { let file_field = loop {
let field = if let Some(field) = multipart.next_field().await? { let field = if let Some(field) = multipart.next_field().await? {
field field
} else { } else {
return Err(Error::bad_request("Request did not contain a file")); return Err(Error::bad_request("Request did not contain a file"));
}; };
let name: HeaderName = if let Some(Ok(name)) = field.name().map(TryInto::try_into) { let name: HeaderName = if let Some(Ok(name)) = field
.name()
.map(str::to_ascii_lowercase)
.map(TryInto::try_into)
{
name name
} else { } else {
continue; continue;
@ -93,10 +98,14 @@ pub async fn handle_post_object(
.ok_or_bad_request("No policy was provided")? .ok_or_bad_request("No policy was provided")?
.to_str()?; .to_str()?;
let authorization = Authorization::parse_form(&params)?; let authorization = Authorization::parse_form(&params)?;
let content_md5 = params
.get("content-md5")
.map(HeaderValue::to_str)
.transpose()?;
let key = if key.contains("${filename}") { let key = if key.contains("${filename}") {
// if no filename is provided, don't replace. This matches the behavior of AWS. // if no filename is provided, don't replace. This matches the behavior of AWS.
if let Some(filename) = field.file_name() { if let Some(filename) = file_field.file_name() {
key.replace("${filename}", filename) key.replace("${filename}", filename)
} else { } else {
key.to_owned() key.to_owned()
@ -143,9 +152,8 @@ pub async fn handle_post_object(
let mut conditions = decoded_policy.into_conditions()?; let mut conditions = decoded_policy.into_conditions()?;
for (param_key, value) in params.iter() { for (param_key, value) in params.iter() {
let mut param_key = param_key.to_string(); let param_key = param_key.as_str();
param_key.make_ascii_lowercase(); match param_key {
match param_key.as_str() {
"policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields "policy" | "x-amz-signature" => (), // this is always accepted, as it's required to validate other fields
"content-type" => { "content-type" => {
let conds = conditions.params.remove("content-type").ok_or_else(|| { let conds = conditions.params.remove("content-type").ok_or_else(|| {
@ -190,7 +198,7 @@ pub async fn handle_post_object(
// how aws seems to behave. // how aws seems to behave.
continue; continue;
} }
let conds = conditions.params.remove(&param_key).ok_or_else(|| { let conds = conditions.params.remove(param_key).ok_or_else(|| {
Error::bad_request(format!("Key '{}' is not allowed in policy", param_key)) Error::bad_request(format!("Key '{}' is not allowed in policy", param_key))
})?; })?;
for cond in conds { for cond in conds {
@ -218,8 +226,9 @@ pub async fn handle_post_object(
let headers = get_headers(&params)?; let headers = get_headers(&params)?;
let stream = field.map(|r| r.map_err(Into::into)); let encryption = EncryptionParams::new_from_headers(&garage, &params)?;
let stream = file_field.map(|r| r.map_err(Into::into));
let ctx = ReqCtx { let ctx = ReqCtx {
garage, garage,
bucket_id, bucket_id,
@ -228,17 +237,18 @@ pub async fn handle_post_object(
api_key, api_key,
}; };
let (_, md5) = save_stream( let res = save_stream(
&ctx, &ctx,
headers, headers,
encryption,
StreamLimiter::new(stream, conditions.content_length), StreamLimiter::new(stream, conditions.content_length),
&key, &key,
None, content_md5.map(str::to_string),
None, None,
) )
.await?; .await?;
let etag = format!("\"{}\"", md5); let etag = format!("\"{}\"", res.etag);
let mut resp = if let Some(mut target) = params let mut resp = if let Some(mut target) = params
.get("success_action_redirect") .get("success_action_redirect")
@ -252,11 +262,12 @@ pub async fn handle_post_object(
.append_pair("key", &key) .append_pair("key", &key)
.append_pair("etag", &etag); .append_pair("etag", &etag);
let target = target.to_string(); let target = target.to_string();
Response::builder() let mut resp = Response::builder()
.status(StatusCode::SEE_OTHER) .status(StatusCode::SEE_OTHER)
.header(header::LOCATION, target.clone()) .header(header::LOCATION, target.clone())
.header(header::ETAG, etag) .header(header::ETAG, etag);
.body(string_body(target))? encryption.add_response_headers(&mut resp);
resp.body(string_body(target))?
} else { } else {
let path = head let path = head
.uri .uri
@ -283,9 +294,10 @@ pub async fn handle_post_object(
.get("success_action_status") .get("success_action_status")
.and_then(|h| h.to_str().ok()) .and_then(|h| h.to_str().ok())
.unwrap_or("204"); .unwrap_or("204");
let builder = Response::builder() let mut builder = Response::builder()
.header(header::LOCATION, location.clone()) .header(header::LOCATION, location.clone())
.header(header::ETAG, etag.clone()); .header(header::ETAG, etag.clone());
encryption.add_response_headers(&mut builder);
match action { match action {
"200" => builder.status(StatusCode::OK).body(empty_body())?, "200" => builder.status(StatusCode::OK).body(empty_body())?,
"201" => { "201" => {

View file

@ -36,10 +36,18 @@ use garage_model::s3::version_table::*;
use crate::helpers::*; use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::encryption::EncryptionParams;
use crate::s3::error::*; use crate::s3::error::*;
const PUT_BLOCKS_MAX_PARALLEL: usize = 3; const PUT_BLOCKS_MAX_PARALLEL: usize = 3;
pub struct SaveStreamResult {
pub version_uuid: Uuid,
pub version_timestamp: u64,
/// Etag WITHOUT THE QUOTES (just the hex value)
pub etag: String,
}
pub async fn handle_put( pub async fn handle_put(
ctx: ReqCtx, ctx: ReqCtx,
req: Request<ReqBody>, req: Request<ReqBody>,
@ -50,6 +58,9 @@ pub async fn handle_put(
let headers = get_headers(req.headers())?; let headers = get_headers(req.headers())?;
debug!("Object headers: {:?}", headers); debug!("Object headers: {:?}", headers);
// Determine whether object should be encrypted, and if so the key
let encryption = EncryptionParams::new_from_headers(&ctx.garage, req.headers())?;
let content_md5 = match req.headers().get("content-md5") { let content_md5 = match req.headers().get("content-md5") {
Some(x) => Some(x.to_str()?.to_string()), Some(x) => Some(x.to_str()?.to_string()),
None => None, None => None,
@ -57,19 +68,33 @@ pub async fn handle_put(
let stream = body_stream(req.into_body()); let stream = body_stream(req.into_body());
save_stream(&ctx, headers, stream, key, content_md5, content_sha256) let res = save_stream(
.await &ctx,
.map(|(uuid, md5)| put_response(uuid, md5)) headers,
encryption,
stream,
key,
content_md5,
content_sha256,
)
.await?;
let mut resp = Response::builder()
.header("x-amz-version-id", hex::encode(res.version_uuid))
.header("ETag", format!("\"{}\"", res.etag));
encryption.add_response_headers(&mut resp);
Ok(resp.body(empty_body())?)
} }
pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>( pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ctx: &ReqCtx, ctx: &ReqCtx,
headers: ObjectVersionHeaders, headers: ObjectVersionHeaders,
encryption: EncryptionParams,
body: S, body: S,
key: &String, key: &String,
content_md5: Option<String>, content_md5: Option<String>,
content_sha256: Option<FixedBytes32>, content_sha256: Option<FixedBytes32>,
) -> Result<(Uuid, String), Error> { ) -> Result<SaveStreamResult, Error> {
let ReqCtx { let ReqCtx {
garage, bucket_id, .. garage, bucket_id, ..
} = ctx; } = ctx;
@ -82,6 +107,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let first_block = first_block_opt.unwrap_or_default(); let first_block = first_block_opt.unwrap_or_default();
let object_encryption = encryption.encrypt_headers(headers)?;
// Generate identity of new version // Generate identity of new version
let version_uuid = gen_uuid(); let version_uuid = gen_uuid();
let version_timestamp = next_timestamp(existing_object.as_ref()); let version_timestamp = next_timestamp(existing_object.as_ref());
@ -92,37 +119,43 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
let mut md5sum = Md5::new(); let mut md5sum = Md5::new();
md5sum.update(&first_block[..]); md5sum.update(&first_block[..]);
let data_md5sum = md5sum.finalize(); let data_md5sum = md5sum.finalize();
let data_md5sum_hex = hex::encode(data_md5sum);
let data_sha256sum = sha256sum(&first_block[..]); let data_sha256sum = sha256sum(&first_block[..]);
let size = first_block.len() as u64;
ensure_checksum_matches( ensure_checksum_matches(
data_md5sum.as_slice(), &data_md5sum,
data_sha256sum, data_sha256sum,
content_md5.as_deref(), content_md5.as_deref(),
content_sha256, content_sha256,
)?; )?;
let size = first_block.len() as u64;
check_quotas(ctx, size, existing_object.as_ref()).await?; check_quotas(ctx, size, existing_object.as_ref()).await?;
let etag = encryption.etag_from_md5(&data_md5sum);
let inline_data = encryption.encrypt_blob(&first_block)?.to_vec();
let object_version = ObjectVersion { let object_version = ObjectVersion {
uuid: version_uuid, uuid: version_uuid,
timestamp: version_timestamp, timestamp: version_timestamp,
state: ObjectVersionState::Complete(ObjectVersionData::Inline( state: ObjectVersionState::Complete(ObjectVersionData::Inline(
ObjectVersionMeta { ObjectVersionMeta {
headers, encryption: object_encryption,
size, size,
etag: data_md5sum_hex.clone(), etag: etag.clone(),
}, },
first_block.to_vec(), inline_data,
)), )),
}; };
let object = Object::new(*bucket_id, key.into(), vec![object_version]); let object = Object::new(*bucket_id, key.into(), vec![object_version]);
garage.object_table.insert(&object).await?; garage.object_table.insert(&object).await?;
return Ok((version_uuid, data_md5sum_hex)); return Ok(SaveStreamResult {
version_uuid,
version_timestamp,
etag,
});
} }
// The following consists in many steps that can each fail. // The following consists in many steps that can each fail.
@ -142,7 +175,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
uuid: version_uuid, uuid: version_uuid,
timestamp: version_timestamp, timestamp: version_timestamp,
state: ObjectVersionState::Uploading { state: ObjectVersionState::Uploading {
headers: headers.clone(), encryption: object_encryption.clone(),
multipart: false, multipart: false,
}, },
}; };
@ -165,10 +198,10 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// Transfer data and verify checksum // Transfer data and verify checksum
let (total_size, data_md5sum, data_sha256sum, first_block_hash) = let (total_size, data_md5sum, data_sha256sum, first_block_hash) =
read_and_put_blocks(ctx, &version, 1, first_block, &mut chunker).await?; read_and_put_blocks(ctx, &version, encryption, 1, first_block, &mut chunker).await?;
ensure_checksum_matches( ensure_checksum_matches(
data_md5sum.as_slice(), &data_md5sum,
data_sha256sum, data_sha256sum,
content_md5.as_deref(), content_md5.as_deref(),
content_sha256, content_sha256,
@ -177,12 +210,13 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
check_quotas(ctx, total_size, existing_object.as_ref()).await?; check_quotas(ctx, total_size, existing_object.as_ref()).await?;
// Save final object state, marked as Complete // Save final object state, marked as Complete
let md5sum_hex = hex::encode(data_md5sum); let etag = encryption.etag_from_md5(&data_md5sum);
object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock( object_version.state = ObjectVersionState::Complete(ObjectVersionData::FirstBlock(
ObjectVersionMeta { ObjectVersionMeta {
headers, encryption: object_encryption,
size: total_size, size: total_size,
etag: md5sum_hex.clone(), etag: etag.clone(),
}, },
first_block_hash, first_block_hash,
)); ));
@ -193,7 +227,11 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
// We won't have to clean up on drop. // We won't have to clean up on drop.
interrupted_cleanup.cancel(); interrupted_cleanup.cancel();
Ok((version_uuid, md5sum_hex)) Ok(SaveStreamResult {
version_uuid,
version_timestamp,
etag,
})
} }
/// Validate MD5 sum against content-md5 header /// Validate MD5 sum against content-md5 header
@ -290,6 +328,7 @@ pub(crate) async fn check_quotas(
pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>( pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
ctx: &ReqCtx, ctx: &ReqCtx,
version: &Version, version: &Version,
encryption: EncryptionParams,
part_number: u64, part_number: u64,
first_block: Bytes, first_block: Bytes,
chunker: &mut StreamChunker<S>, chunker: &mut StreamChunker<S>,
@ -349,12 +388,31 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
)) ))
}; };
let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, Hash), Error>>(1); let (block_tx3, mut block_rx3) = mpsc::channel::<Result<(Bytes, u64, Hash), Error>>(1);
let hash_blocks = async { let encrypt_hash_blocks = async {
let mut first_block_hash = None; let mut first_block_hash = None;
while let Some(next) = block_rx2.recv().await { while let Some(next) = block_rx2.recv().await {
match next { match next {
Ok(block) => { Ok(block) => {
let unencrypted_len = block.len() as u64;
let block = if encryption.is_encrypted() {
let res =
tokio::task::spawn_blocking(move || encryption.encrypt_block(block))
.with_context(Context::current_with_span(
tracer.start("Encrypt block"),
))
.await
.unwrap();
match res {
Ok(b) => b,
Err(e) => {
block_tx3.send(Err(e)).await?;
break;
}
}
} else {
block
};
let hash = async_blake2sum(block.clone()) let hash = async_blake2sum(block.clone())
.with_context(Context::current_with_span( .with_context(Context::current_with_span(
tracer.start("Hash block (blake2)"), tracer.start("Hash block (blake2)"),
@ -363,7 +421,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
if first_block_hash.is_none() { if first_block_hash.is_none() {
first_block_hash = Some(hash); first_block_hash = Some(hash);
} }
block_tx3.send(Ok((block, hash))).await?; block_tx3.send(Ok((block, unencrypted_len, hash))).await?;
} }
Err(e) => { Err(e) => {
block_tx3.send(Err(e)).await?; block_tx3.send(Err(e)).await?;
@ -398,7 +456,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
block_rx3.recv().await block_rx3.recv().await
} }
}; };
let (block, hash) = tokio::select! { let (block, unencrypted_len, hash) = tokio::select! {
result = write_futs_next => { result = write_futs_next => {
result?; result?;
continue; continue;
@ -410,17 +468,18 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
}; };
// For next block to be written: count its size and spawn future to write it // For next block to be written: count its size and spawn future to write it
let offset = written_bytes;
written_bytes += block.len() as u64;
write_futs.push_back(put_block_and_meta( write_futs.push_back(put_block_and_meta(
ctx, ctx,
version, version,
part_number, part_number,
offset, written_bytes,
hash, hash,
block, block,
unencrypted_len,
encryption.is_encrypted(),
order_stream.order(written_bytes), order_stream.order(written_bytes),
)); ));
written_bytes += unencrypted_len;
} }
while let Some(res) = write_futs.next().await { while let Some(res) = write_futs.next().await {
res?; res?;
@ -429,7 +488,7 @@ pub(crate) async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> +
}; };
let (_, stream_hash_result, block_hash_result, final_result) = let (_, stream_hash_result, block_hash_result, final_result) =
futures::join!(read_blocks, hash_stream, hash_blocks, put_blocks); futures::join!(read_blocks, hash_stream, encrypt_hash_blocks, put_blocks);
let total_size = final_result?; let total_size = final_result?;
// unwrap here is ok, because if hasher failed, it is because something failed // unwrap here is ok, because if hasher failed, it is because something failed
@ -449,6 +508,8 @@ async fn put_block_and_meta(
offset: u64, offset: u64,
hash: Hash, hash: Hash,
block: Bytes, block: Bytes,
size: u64,
is_encrypted: bool,
order_tag: OrderTag, order_tag: OrderTag,
) -> Result<(), GarageError> { ) -> Result<(), GarageError> {
let ReqCtx { garage, .. } = ctx; let ReqCtx { garage, .. } = ctx;
@ -459,10 +520,7 @@ async fn put_block_and_meta(
part_number, part_number,
offset, offset,
}, },
VersionBlock { VersionBlock { hash, size },
hash,
size: block.len() as u64,
},
); );
let block_ref = BlockRef { let block_ref = BlockRef {
@ -474,7 +532,7 @@ async fn put_block_and_meta(
futures::try_join!( futures::try_join!(
garage garage
.block_manager .block_manager
.rpc_put_block(hash, block, Some(order_tag)), .rpc_put_block(hash, block, is_encrypted, Some(order_tag)),
garage.version_table.insert(&version), garage.version_table.insert(&version),
garage.block_ref_table.insert(&block_ref), garage.block_ref_table.insert(&block_ref),
)?; )?;
@ -517,14 +575,6 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
} }
} }
pub fn put_response(version_uuid: Uuid, md5sum_hex: String) -> Response<ResBody> {
Response::builder()
.header("x-amz-version-id", hex::encode(version_uuid))
.header("ETag", format!("\"{}\"", md5sum_hex))
.body(empty_body())
.unwrap()
}
struct InterruptedCleanup(Option<InterruptedCleanupInner>); struct InterruptedCleanup(Option<InterruptedCleanupInner>);
struct InterruptedCleanupInner { struct InterruptedCleanupInner {
garage: Arc<Garage>, garage: Arc<Garage>,

View file

@ -96,7 +96,7 @@ impl DataBlock {
} }
} }
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> { pub fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new(); let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?; let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?; encoder.include_checksum(true)?;

View file

@ -9,3 +9,5 @@ mod block;
mod layout; mod layout;
mod metrics; mod metrics;
mod rc; mod rc;
pub use block::zstd_encode;

View file

@ -337,26 +337,18 @@ impl BlockManager {
} }
} }
/// Ask nodes that might have a block for it, return it as one big Bytes
pub async fn rpc_get_block(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<Bytes, Error> {
let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
Ok(read_stream_to_end(stream).await?.into_bytes())
}
/// Send block to nodes that should have it /// Send block to nodes that should have it
pub async fn rpc_put_block( pub async fn rpc_put_block(
&self, &self,
hash: Hash, hash: Hash,
data: Bytes, data: Bytes,
prevent_compression: bool,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let who = self.replication.write_sets(&hash); let who = self.replication.write_sets(&hash);
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) let compression_level = self.compression_level.filter(|_| !prevent_compression);
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
.await .await
.into_parts(); .into_parts();
let put_block_rpc = let put_block_rpc =

View file

@ -210,7 +210,165 @@ mod v09 {
} }
} }
pub use v09::*; mod v010 {
use garage_util::data::{Hash, Uuid};
use serde::{Deserialize, Serialize};
use super::v09;
pub use v09::ObjectVersionHeaders;
/// An object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct Object {
/// The bucket in which the object is stored, used as partition key
pub bucket_id: Uuid,
/// The key at which the object is stored in its bucket, used as sorting key
pub key: String,
/// The list of currenty stored versions of the object
pub(super) versions: Vec<ObjectVersion>,
}
/// Informations about a version of an object
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersion {
/// Id of the version
pub uuid: Uuid,
/// Timestamp of when the object was created
pub timestamp: u64,
/// State of the version
pub state: ObjectVersionState,
}
/// State of an object version
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionState {
/// The version is being received
Uploading {
/// Indicates whether this is a multipart upload
multipart: bool,
/// Encryption params + headers to be included in the final object
encryption: ObjectVersionEncryption,
},
/// The version is fully received
Complete(ObjectVersionData),
/// The version uploaded containded errors or the upload was explicitly aborted
Aborted,
}
/// Data stored in object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionData {
/// The object was deleted, this Version is a tombstone to mark it as such
DeleteMarker,
/// The object is short, it's stored inlined.
/// It is never compressed. For encrypted objects, it is encrypted using
/// AES256-GCM, like the encrypted headers.
Inline(ObjectVersionMeta, #[serde(with = "serde_bytes")] Vec<u8>),
/// The object is not short, Hash of first block is stored here, next segments hashes are
/// stored in the version table
FirstBlock(ObjectVersionMeta, Hash),
}
/// Metadata about the object version
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct ObjectVersionMeta {
/// Size of the object. If object is encrypted/compressed,
/// this is always the size of the unencrypted/uncompressed data
pub size: u64,
/// etag of the object
pub etag: String,
/// Encryption params + headers (encrypted or plaintext)
pub encryption: ObjectVersionEncryption,
}
/// Encryption information + metadata
#[derive(PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub enum ObjectVersionEncryption {
SseC {
/// Encrypted serialized ObjectVersionHeaders struct.
/// This is never compressed, just encrypted using AES256-GCM.
#[serde(with = "serde_bytes")]
headers: Vec<u8>,
/// Whether data blocks are compressed in addition to being encrypted
/// (compression happens before encryption, whereas for non-encrypted
/// objects, compression is handled at the level of the block manager)
compressed: bool,
},
Plaintext {
/// Plain-text headers
headers: ObjectVersionHeaders,
},
}
impl garage_util::migrate::Migrate for Object {
const VERSION_MARKER: &'static [u8] = b"G010s3ob";
type Previous = v09::Object;
fn migrate(old: v09::Object) -> Object {
Object {
bucket_id: old.bucket_id,
key: old.key,
versions: old.versions.into_iter().map(migrate_version).collect(),
}
}
}
fn migrate_version(old: v09::ObjectVersion) -> ObjectVersion {
ObjectVersion {
uuid: old.uuid,
timestamp: old.timestamp,
state: match old.state {
v09::ObjectVersionState::Uploading { multipart, headers } => {
ObjectVersionState::Uploading {
multipart,
encryption: migrate_headers(headers),
}
}
v09::ObjectVersionState::Complete(d) => {
ObjectVersionState::Complete(migrate_data(d))
}
v09::ObjectVersionState::Aborted => ObjectVersionState::Aborted,
},
}
}
fn migrate_data(old: v09::ObjectVersionData) -> ObjectVersionData {
match old {
v09::ObjectVersionData::DeleteMarker => ObjectVersionData::DeleteMarker,
v09::ObjectVersionData::Inline(meta, data) => {
ObjectVersionData::Inline(migrate_meta(meta), data)
}
v09::ObjectVersionData::FirstBlock(meta, fb) => {
ObjectVersionData::FirstBlock(migrate_meta(meta), fb)
}
}
}
fn migrate_meta(old: v09::ObjectVersionMeta) -> ObjectVersionMeta {
ObjectVersionMeta {
size: old.size,
etag: old.etag,
encryption: migrate_headers(old.headers),
}
}
fn migrate_headers(old: v09::ObjectVersionHeaders) -> ObjectVersionEncryption {
ObjectVersionEncryption::Plaintext { headers: old }
}
// Since ObjectVersionHeaders can now be serialized independently, for the
// purpose of being encrypted, we need it to support migrations on its own
// as well.
impl garage_util::migrate::InitialFormat for ObjectVersionHeaders {
const VERSION_MARKER: &'static [u8] = b"G010s3oh";
}
}
pub use v010::*;
impl Object { impl Object {
/// Initialize an Object struct from parts /// Initialize an Object struct from parts

View file

@ -44,7 +44,8 @@ mod v05 {
pub struct VersionBlockKey { pub struct VersionBlockKey {
/// Number of the part /// Number of the part
pub part_number: u64, pub part_number: u64,
/// Offset of this sub-segment in its part /// Offset of this sub-segment in its part as sent by the client
/// (before any kind of compression or encryption)
pub offset: u64, pub offset: u64,
} }
@ -53,7 +54,7 @@ mod v05 {
pub struct VersionBlock { pub struct VersionBlock {
/// Blake2 sum of the block /// Blake2 sum of the block
pub hash: Hash, pub hash: Hash,
/// Size of the block /// Size of the block, before any kind of compression or encryption
pub size: u64, pub size: u64,
} }