Compare commits

...

10 commits

39 changed files with 716 additions and 420 deletions

102
Cargo.lock generated
View file

@ -1299,7 +1299,9 @@ dependencies = [
"hex", "hex",
"hmac", "hmac",
"http 1.0.0", "http 1.0.0",
"http-body-util",
"hyper 1.1.0", "hyper 1.1.0",
"hyper-util",
"k2v-client", "k2v-client",
"kuska-sodiumoxide", "kuska-sodiumoxide",
"mktemp", "mktemp",
@ -1906,7 +1908,7 @@ dependencies = [
"hyper 0.14.28", "hyper 0.14.28",
"log", "log",
"rustls 0.20.9", "rustls 0.20.9",
"rustls-native-certs", "rustls-native-certs 0.6.3",
"tokio", "tokio",
"tokio-rustls 0.23.4", "tokio-rustls 0.23.4",
] ]
@ -1922,11 +1924,30 @@ dependencies = [
"hyper 0.14.28", "hyper 0.14.28",
"log", "log",
"rustls 0.21.10", "rustls 0.21.10",
"rustls-native-certs", "rustls-native-certs 0.6.3",
"tokio", "tokio",
"tokio-rustls 0.24.1", "tokio-rustls 0.24.1",
] ]
[[package]]
name = "hyper-rustls"
version = "0.26.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
dependencies = [
"futures-util",
"http 1.0.0",
"hyper 1.1.0",
"hyper-util",
"log",
"rustls 0.22.2",
"rustls-native-certs 0.7.0",
"rustls-pki-types",
"tokio",
"tokio-rustls 0.25.0",
"tower-service",
]
[[package]] [[package]]
name = "hyper-timeout" name = "hyper-timeout"
version = "0.4.1" version = "0.4.1"
@ -2123,8 +2144,10 @@ dependencies = [
"format_table", "format_table",
"hex", "hex",
"http 1.0.0", "http 1.0.0",
"http-body-util",
"hyper 1.1.0", "hyper 1.1.0",
"hyper-rustls 0.24.2", "hyper-rustls 0.26.0",
"hyper-util",
"log", "log",
"percent-encoding", "percent-encoding",
"serde", "serde",
@ -2189,7 +2212,7 @@ dependencies = [
"pem", "pem",
"pin-project", "pin-project",
"rustls 0.20.9", "rustls 0.20.9",
"rustls-pemfile", "rustls-pemfile 1.0.4",
"secrecy", "secrecy",
"serde", "serde",
"serde_json", "serde_json",
@ -3188,7 +3211,7 @@ dependencies = [
"percent-encoding", "percent-encoding",
"pin-project-lite", "pin-project-lite",
"rustls 0.21.10", "rustls 0.21.10",
"rustls-pemfile", "rustls-pemfile 1.0.4",
"serde", "serde",
"serde_json", "serde_json",
"serde_urlencoded", "serde_urlencoded",
@ -3334,10 +3357,24 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
dependencies = [ dependencies = [
"log", "log",
"ring 0.17.7", "ring 0.17.7",
"rustls-webpki", "rustls-webpki 0.101.7",
"sct", "sct",
] ]
[[package]]
name = "rustls"
version = "0.22.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
dependencies = [
"log",
"ring 0.17.7",
"rustls-pki-types",
"rustls-webpki 0.102.2",
"subtle",
"zeroize",
]
[[package]] [[package]]
name = "rustls-native-certs" name = "rustls-native-certs"
version = "0.6.3" version = "0.6.3"
@ -3345,7 +3382,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00" checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
dependencies = [ dependencies = [
"openssl-probe", "openssl-probe",
"rustls-pemfile", "rustls-pemfile 1.0.4",
"schannel",
"security-framework",
]
[[package]]
name = "rustls-native-certs"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
dependencies = [
"openssl-probe",
"rustls-pemfile 2.0.0",
"rustls-pki-types",
"schannel", "schannel",
"security-framework", "security-framework",
] ]
@ -3359,6 +3409,22 @@ dependencies = [
"base64 0.21.7", "base64 0.21.7",
] ]
[[package]]
name = "rustls-pemfile"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
dependencies = [
"base64 0.21.7",
"rustls-pki-types",
]
[[package]]
name = "rustls-pki-types"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf"
[[package]] [[package]]
name = "rustls-webpki" name = "rustls-webpki"
version = "0.101.7" version = "0.101.7"
@ -3369,6 +3435,17 @@ dependencies = [
"untrusted 0.9.0", "untrusted 0.9.0",
] ]
[[package]]
name = "rustls-webpki"
version = "0.102.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"
dependencies = [
"ring 0.17.7",
"rustls-pki-types",
"untrusted 0.9.0",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.14" version = "1.0.14"
@ -3998,6 +4075,17 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-rustls"
version = "0.25.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
dependencies = [
"rustls 0.22.2",
"rustls-pki-types",
"tokio",
]
[[package]] [[package]]
name = "tokio-stream" name = "tokio-stream"
version = "0.1.14" version = "0.1.14"

134
Cargo.nix
View file

@ -33,7 +33,7 @@ args@{
ignoreLockHash, ignoreLockHash,
}: }:
let let
nixifiedLockHash = "b09e8e1592cb6ec8175708b13ee4a2578aa697c18a94d5a545328078ab263b2f"; nixifiedLockHash = "ae22401ab1ffcc653dc14ce17eaece13d9fb2dad1422ae9ada91c1757cc85575";
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc; workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock); currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
lockHashIgnored = if ignoreLockHash lockHashIgnored = if ignoreLockHash
@ -1917,7 +1917,9 @@ in
chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.33" { inherit profileName; }).out; chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.33" { inherit profileName; }).out;
hmac = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hmac."0.12.1" { inherit profileName; }).out; hmac = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hmac."0.12.1" { inherit profileName; }).out;
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
http_body_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" { inherit profileName; }).out;
hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out;
hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out;
k2v_client = (rustPackages."unknown".k2v-client."0.0.4" { inherit profileName; }).out; k2v_client = (rustPackages."unknown".k2v-client."0.0.4" { inherit profileName; }).out;
mktemp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.5.1" { inherit profileName; }).out; mktemp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.5.1" { inherit profileName; }).out;
serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.113" { inherit profileName; }).out; serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.113" { inherit profileName; }).out;
@ -2731,6 +2733,37 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.26.0" = overridableMkRustCrate (profileName: rec {
name = "hyper-rustls";
version = "0.26.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"; };
features = builtins.concatLists [
[ "default" ]
[ "http1" ]
[ "http2" ]
[ "log" ]
[ "logging" ]
[ "native-tokio" ]
[ "ring" ]
[ "rustls-native-certs" ]
[ "tls12" ]
];
dependencies = {
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out;
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out;
hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out;
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out;
rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls."0.22.2" { inherit profileName; }).out;
rustls_native_certs = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.7.0" { inherit profileName; }).out;
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
tokio_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-rustls."0.25.0" { inherit profileName; }).out;
tower_service = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-service."0.3.2" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".hyper-timeout."0.4.1" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".hyper-timeout."0.4.1" = overridableMkRustCrate (profileName: rec {
name = "hyper-timeout"; name = "hyper-timeout";
version = "0.4.1"; version = "0.4.1";
@ -3007,8 +3040,10 @@ in
${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/format_table" then "format_table" else null } = (rustPackages."unknown".format_table."0.1.1" { inherit profileName; }).out; ${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/format_table" then "format_table" else null } = (rustPackages."unknown".format_table."0.1.1" { inherit profileName; }).out;
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out; hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out; http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
http_body_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" { inherit profileName; }).out;
hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out; hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out;
hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.24.2" { inherit profileName; }).out; hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.26.0" { inherit profileName; }).out;
hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out;
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out; log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out;
percent_encoding = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.3.1" { inherit profileName; }).out; percent_encoding = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.3.1" { inherit profileName; }).out;
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out; serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
@ -4789,6 +4824,27 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".rustls."0.22.2" = overridableMkRustCrate (profileName: rec {
name = "rustls";
version = "0.22.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"; };
features = builtins.concatLists [
[ "log" ]
[ "logging" ]
[ "ring" ]
[ "tls12" ]
];
dependencies = {
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out;
ring = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ring."0.17.7" { inherit profileName; }).out;
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
webpki = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.102.2" { inherit profileName; }).out;
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
zeroize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".zeroize."1.7.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.6.3" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.6.3" = overridableMkRustCrate (profileName: rec {
name = "rustls-native-certs"; name = "rustls-native-certs";
version = "0.6.3"; version = "0.6.3";
@ -4802,6 +4858,20 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.7.0" = overridableMkRustCrate (profileName: rec {
name = "rustls-native-certs";
version = "0.7.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"; };
dependencies = {
${ if hostPlatform.isUnix && !(hostPlatform.parsed.kernel.name == "darwin") then "openssl_probe" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl-probe."0.1.5" { inherit profileName; }).out;
rustls_pemfile = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."2.0.0" { inherit profileName; }).out;
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
${ if hostPlatform.isWindows then "schannel" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".schannel."0.1.23" { inherit profileName; }).out;
${ if hostPlatform.parsed.kernel.name == "darwin" then "security_framework" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".security-framework."2.9.2" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."1.0.4" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."1.0.4" = overridableMkRustCrate (profileName: rec {
name = "rustls-pemfile"; name = "rustls-pemfile";
version = "1.0.4"; version = "1.0.4";
@ -4812,6 +4882,33 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."2.0.0" = overridableMkRustCrate (profileName: rec {
name = "rustls-pemfile";
version = "2.0.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"; };
features = builtins.concatLists [
[ "default" ]
[ "std" ]
];
dependencies = {
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out;
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" = overridableMkRustCrate (profileName: rec {
name = "rustls-pki-types";
version = "1.2.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
[ "std" ]
];
});
"registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.101.7" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.101.7" = overridableMkRustCrate (profileName: rec {
name = "rustls-webpki"; name = "rustls-webpki";
version = "0.101.7"; version = "0.101.7";
@ -4828,6 +4925,23 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.102.2" = overridableMkRustCrate (profileName: rec {
name = "rustls-webpki";
version = "0.102.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"; };
features = builtins.concatLists [
[ "alloc" ]
[ "ring" ]
[ "std" ]
];
dependencies = {
ring = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ring."0.17.7" { inherit profileName; }).out;
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
untrusted = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.9.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".rustversion."1.0.14" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".rustversion."1.0.14" = overridableMkRustCrate (profileName: rec {
name = "rustversion"; name = "rustversion";
version = "1.0.14"; version = "1.0.14";
@ -5701,6 +5815,22 @@ in
}; };
}); });
"registry+https://github.com/rust-lang/crates.io-index".tokio-rustls."0.25.0" = overridableMkRustCrate (profileName: rec {
name = "tokio-rustls";
version = "0.25.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"; };
features = builtins.concatLists [
[ "logging" ]
[ "tls12" ]
];
dependencies = {
rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls."0.22.2" { inherit profileName; }).out;
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
};
});
"registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" = overridableMkRustCrate (profileName: rec {
name = "tokio-stream"; name = "tokio-stream";
version = "0.1.14"; version = "0.1.14";

View file

@ -98,7 +98,7 @@ httpdate = "1.0"
http-range = "0.1" http-range = "0.1"
http-body-util = "0.1" http-body-util = "0.1"
hyper = { version = "1.0", features = ["server", "http1"] } hyper = { version = "1.0", features = ["server", "http1"] }
hyper-util = { verion = "0.1", features = [ "full" ]} hyper-util = { version = "0.1", features = [ "full" ] }
multer = "3.0" multer = "3.0"
percent-encoding = "2.2" percent-encoding = "2.2"
roxmltree = "0.19" roxmltree = "0.19"
@ -116,8 +116,8 @@ opentelemetry-otlp = "0.10"
prometheus = "0.13" prometheus = "0.13"
# used by the k2v-client crate only # used by the k2v-client crate only
aws-sigv4 = {version = "1.1", features = ["http0-compat"] } aws-sigv4 = {version = "1.1" }
hyper-rustls = { version = "0.24", features = ["http2"] } hyper-rustls = { version = "0.26", features = ["http2"] }
log = "0.4" log = "0.4"
thiserror = "1.0" thiserror = "1.0"
@ -131,7 +131,10 @@ aws-sdk-config = "1.13"
aws-sdk-s3 = "1.14" aws-sdk-s3 = "1.14"
[profile.dev] [profile.dev]
lto = "off" lto = "thin"
[profile.release] [profile.release]
debug = true lto = true
codegen-units = 1
opt-level = "s"
strip = true

View file

@ -271,7 +271,7 @@ pub async fn handle_create_bucket(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<CreateBucketRequest>(req).await?; let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
if let Some(ga) = &req.global_alias { if let Some(ga) = &req.global_alias {
if !is_valid_bucket_name(ga) { if !is_valid_bucket_name(ga) {
@ -412,7 +412,7 @@ pub async fn handle_update_bucket(
id: String, id: String,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<UpdateBucketRequest>(req).await?; let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?;
let bucket_id = parse_bucket_id(&id)?; let bucket_id = parse_bucket_id(&id)?;
let mut bucket = garage let mut bucket = garage
@ -474,7 +474,7 @@ pub async fn handle_bucket_change_key_perm(
req: Request<IncomingBody>, req: Request<IncomingBody>,
new_perm_flag: bool, new_perm_flag: bool,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?; let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
let bucket_id = parse_bucket_id(&req.bucket_id)?; let bucket_id = parse_bucket_id(&req.bucket_id)?;

View file

@ -64,7 +64,7 @@ pub async fn handle_connect_cluster_nodes(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<Vec<String>>(req).await?; let req = parse_json_body::<Vec<String>, _, Error>(req).await?;
let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node))) let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
.await .await
@ -206,7 +206,7 @@ pub async fn handle_update_cluster_layout(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
let mut layout = garage.system.get_cluster_layout(); let mut layout = garage.system.get_cluster_layout();
@ -246,7 +246,7 @@ pub async fn handle_apply_cluster_layout(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
let layout = garage.system.get_cluster_layout(); let layout = garage.system.get_cluster_layout();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
@ -264,7 +264,7 @@ pub async fn handle_revert_cluster_layout(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?; let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
let layout = garage.system.get_cluster_layout(); let layout = garage.system.get_cluster_layout();
let layout = layout.revert_staged_changes(Some(param.version))?; let layout = layout.revert_staged_changes(Some(param.version))?;

View file

@ -7,7 +7,7 @@ pub use garage_model::helper::error::Error as HelperError;
use crate::common_error::CommonError; use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError; use crate::generic_server::ApiError;
use crate::helpers::{BytesBody, CustomApiErrorBody}; use crate::helpers::*;
/// Errors of this crate /// Errors of this crate
#[derive(Debug, Error)] #[derive(Debug, Error)]
@ -40,18 +40,6 @@ where
impl CommonErrorDerivative for Error {} impl CommonErrorDerivative for Error {}
impl From<HelperError> for Error {
fn from(err: HelperError) -> Self {
match err {
HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
}
}
}
impl Error { impl Error {
fn code(&self) -> &'static str { fn code(&self) -> &'static str {
match self { match self {
@ -77,7 +65,7 @@ impl ApiError for Error {
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
} }
fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = CustomApiErrorBody { let error = CustomApiErrorBody {
code: self.code().to_string(), code: self.code().to_string(),
message: format!("{}", self), message: format!("{}", self),
@ -93,6 +81,6 @@ impl ApiError for Error {
"# "#
.into() .into()
}); });
BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) error_body(error_str)
} }
} }

View file

@ -65,7 +65,7 @@ pub async fn handle_create_key(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<CreateKeyRequest>(req).await?; let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key")); let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?; garage.key_table.insert(&key).await?;
@ -83,7 +83,7 @@ pub async fn handle_import_key(
garage: &Arc<Garage>, garage: &Arc<Garage>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<ImportKeyRequest>(req).await?; let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?; let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
if prev_key.is_some() { if prev_key.is_some() {
@ -114,7 +114,7 @@ pub async fn handle_update_key(
id: String, id: String,
req: Request<IncomingBody>, req: Request<IncomingBody>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let req = parse_json_body::<UpdateKeyRequest>(req).await?; let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?;
let mut key = garage.key_helper().get_existing_key(&id).await?; let mut key = garage.key_helper().get_existing_key(&id).await?;

View file

@ -3,6 +3,8 @@ use hyper::StatusCode;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_model::helper::error::Error as HelperError;
/// Errors of this crate /// Errors of this crate
#[derive(Debug, Error)] #[derive(Debug, Error)]
pub enum CommonError { pub enum CommonError {
@ -28,6 +30,10 @@ pub enum CommonError {
#[error(display = "Bad request: {}", _0)] #[error(display = "Bad request: {}", _0)]
BadRequest(String), BadRequest(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
// ---- SPECIFIC ERROR CONDITIONS ---- // ---- SPECIFIC ERROR CONDITIONS ----
// These have to be error codes referenced in the S3 spec here: // These have to be error codes referenced in the S3 spec here:
// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList // https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
@ -64,7 +70,9 @@ impl CommonError {
CommonError::Forbidden(_) => StatusCode::FORBIDDEN, CommonError::Forbidden(_) => StatusCode::FORBIDDEN,
CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND, CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND,
CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT, CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT,
CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST, CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => {
StatusCode::BAD_REQUEST
}
} }
} }
@ -84,6 +92,7 @@ impl CommonError {
CommonError::BucketAlreadyExists => "BucketAlreadyExists", CommonError::BucketAlreadyExists => "BucketAlreadyExists",
CommonError::BucketNotEmpty => "BucketNotEmpty", CommonError::BucketNotEmpty => "BucketNotEmpty",
CommonError::InvalidBucketName(_) => "InvalidBucketName", CommonError::InvalidBucketName(_) => "InvalidBucketName",
CommonError::InvalidHeader(_) => "InvalidHeaderValue",
} }
} }
@ -92,6 +101,18 @@ impl CommonError {
} }
} }
impl From<HelperError> for CommonError {
fn from(err: HelperError) -> Self {
match err {
HelperError::Internal(i) => Self::InternalError(i),
HelperError::BadRequest(b) => Self::BadRequest(b),
HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n),
HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n),
e => Self::bad_request(format!("{}", e)),
}
}
}
pub trait CommonErrorDerivative: From<CommonError> { pub trait CommonErrorDerivative: From<CommonError> {
fn internal_error<M: ToString>(msg: M) -> Self { fn internal_error<M: ToString>(msg: M) -> Self {
Self::from(CommonError::InternalError(GarageError::Message( Self::from(CommonError::InternalError(GarageError::Message(

View file

@ -1,3 +1,4 @@
use std::convert::Infallible;
use std::fs::{self, Permissions}; use std::fs::{self, Permissions};
use std::os::unix::fs::PermissionsExt; use std::os::unix::fs::PermissionsExt;
use std::sync::Arc; use std::sync::Arc;
@ -5,6 +6,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::future::Future; use futures::future::Future;
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
use http_body_util::BodyExt; use http_body_util::BodyExt;
use hyper::header::HeaderValue; use hyper::header::HeaderValue;
@ -15,7 +17,7 @@ use hyper::{HeaderMap, StatusCode};
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite}; use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, UnixListener}; use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
use opentelemetry::{ use opentelemetry::{
global, global,
@ -29,7 +31,7 @@ use garage_util::forwarded_headers;
use garage_util::metrics::{gen_trace_id, RecordDuration}; use garage_util::metrics::{gen_trace_id, RecordDuration};
use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_util::socket_address::UnixOrTCPSocketAddress;
use crate::helpers::{BoxBody, BytesBody}; use crate::helpers::{BoxBody, ErrorBody};
pub(crate) trait ApiEndpoint: Send + Sync + 'static { pub(crate) trait ApiEndpoint: Send + Sync + 'static {
fn name(&self) -> &'static str; fn name(&self) -> &'static str;
@ -39,7 +41,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static {
pub trait ApiError: std::error::Error + Send + Sync + 'static { pub trait ApiError: std::error::Error + Send + Sync + 'static {
fn http_status_code(&self) -> StatusCode; fn http_status_code(&self) -> StatusCode;
fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>); fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>);
fn http_body(&self, garage_region: &str, path: &str) -> BytesBody; fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
} }
#[async_trait] #[async_trait]
@ -110,20 +112,12 @@ impl<A: ApiHandler> ApiServer<A> {
bind_addr bind_addr
); );
tokio::pin!(shutdown_signal);
match bind_addr { match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => { UnixOrTCPSocketAddress::TCPSocket(addr) => {
let listener = TcpListener::bind(addr).await?; let listener = TcpListener::bind(addr).await?;
loop { let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
let (stream, client_addr) = tokio::select! { server_loop(listener, handler, shutdown_signal).await
acc = listener.accept() => acc?,
_ = &mut shutdown_signal => break,
};
self.launch_handler(stream, client_addr.to_string());
}
} }
UnixOrTCPSocketAddress::UnixSocket(ref path) => { UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() { if path.exists() {
@ -131,52 +125,24 @@ impl<A: ApiHandler> ApiServer<A> {
} }
let listener = UnixListener::bind(path)?; let listener = UnixListener::bind(path)?;
let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions( fs::set_permissions(
path, path,
Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)), Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)),
)?; )?;
loop { let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
let (stream, _) = tokio::select! { server_loop(listener, handler, shutdown_signal).await
acc = listener.accept() => acc?,
_ = &mut shutdown_signal => break,
};
self.launch_handler(stream, path.display().to_string());
}
} }
}; }
Ok(())
}
fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String)
where
S: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
let this = self.clone();
let io = TokioIo::new(stream);
let serve =
move |req: Request<IncomingBody>| this.clone().handler(req, client_addr.to_string());
tokio::task::spawn(async move {
let io = Box::pin(io);
if let Err(e) = http1::Builder::new()
.serve_connection(io, service_fn(serve))
.await
{
debug!("Error handling HTTP connection: {}", e);
}
});
} }
async fn handler( async fn handler(
self: Arc<Self>, self: Arc<Self>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
addr: String, addr: String,
) -> Result<Response<BoxBody<A::Error>>, GarageError> { ) -> Result<Response<BoxBody<A::Error>>, http::Error> {
let uri = req.uri().clone(); let uri = req.uri().clone();
if let Ok(forwarded_for_ip_addr) = if let Ok(forwarded_for_ip_addr) =
@ -229,7 +195,8 @@ impl<A: ApiHandler> ApiServer<A> {
} else { } else {
info!("Response: error {}, {}", e.http_status_code(), e); info!("Response: error {}, {}", e.http_status_code(), e);
} }
Ok(http_error.map(|body| BoxBody::new(body.map_err(|_| unreachable!())))) Ok(http_error
.map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!()))))
} }
} }
} }
@ -278,3 +245,105 @@ impl<A: ApiHandler> ApiServer<A> {
res res
} }
} }
// ==== helper functions ====
#[async_trait]
pub trait Accept: Send + Sync + 'static {
type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
}
#[async_trait]
impl Accept for TcpListener {
type Stream = TcpStream;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
self.accept()
.await
.map(|(stream, addr)| (stream, addr.to_string()))
}
}
pub struct UnixListenerOn(pub UnixListener, pub String);
#[async_trait]
impl Accept for UnixListenerOn {
type Stream = UnixStream;
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
self.0
.accept()
.await
.map(|(stream, _addr)| (stream, self.1.clone()))
}
}
pub async fn server_loop<A, H, F, E>(
listener: A,
handler: H,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError>
where
A: Accept,
H: Fn(Request<IncomingBody>, String) -> F + Send + Sync + Clone + 'static,
F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static,
E: Send + Sync + std::error::Error + 'static,
{
tokio::pin!(shutdown_signal);
let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel();
let connection_collector = tokio::spawn(async move {
let mut collection = FuturesUnordered::new();
loop {
let collect_next = async {
if collection.is_empty() {
futures::future::pending().await
} else {
collection.next().await
}
};
tokio::select! {
result = collect_next => {
trace!("HTTP connection finished: {:?}", result);
}
new_fut = conn_out.recv() => {
match new_fut {
Some(f) => collection.push(f),
None => break,
}
}
}
}
debug!("Collecting last open HTTP connections.");
while let Some(conn_res) = collection.next().await {
trace!("HTTP connection finished: {:?}", conn_res);
}
debug!("No more HTTP connections to collect");
});
loop {
let (stream, client_addr) = tokio::select! {
acc = listener.accept() => acc?,
_ = &mut shutdown_signal => break,
};
let io = TokioIo::new(stream);
let handler = handler.clone();
let serve = move |req: Request<IncomingBody>| handler(req, client_addr.clone());
let fut = tokio::task::spawn(async move {
let io = Box::pin(io);
if let Err(e) = http1::Builder::new()
.serve_connection(io, service_fn(serve))
.await
{
debug!("Error handling HTTP connection: {}", e);
}
});
conn_in.send(fut)?;
}
connection_collector.await?;
Ok(())
}

View file

@ -1,8 +1,17 @@
use std::convert::Infallible;
use futures::{Stream, StreamExt, TryStreamExt};
use http_body_util::{BodyExt, Full as FullBody}; use http_body_util::{BodyExt, Full as FullBody};
use hyper::{body::Incoming as IncomingBody, Request, Response}; use hyper::{
body::{Body, Bytes},
Request, Response,
};
use idna::domain_to_unicode; use idna::domain_to_unicode;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::error::Error as GarageError;
use crate::common_error::{CommonError as Error, *}; use crate::common_error::{CommonError as Error, *};
/// What kind of authorization is required to perform a given action /// What kind of authorization is required to perform a given action
@ -141,34 +150,62 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
// =============== body helpers ================= // =============== body helpers =================
pub type BytesBody = FullBody<bytes::Bytes>; pub type EmptyBody = http_body_util::Empty<bytes::Bytes>;
pub type ErrorBody = FullBody<bytes::Bytes>;
pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>; pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>;
pub fn string_body<E>(s: String) -> BoxBody<E> { pub fn string_body<E>(s: String) -> BoxBody<E> {
bytes_body(bytes::Bytes::from(s.into_bytes())) bytes_body(bytes::Bytes::from(s.into_bytes()))
} }
pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> { pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> {
BoxBody::new(FullBody::new(b).map_err(|_| unreachable!())) BoxBody::new(FullBody::new(b).map_err(|_: Infallible| unreachable!()))
} }
pub fn empty_body<E>() -> BoxBody<E> { pub fn empty_body<E>() -> BoxBody<E> {
BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!())) BoxBody::new(http_body_util::Empty::new().map_err(|_: Infallible| unreachable!()))
}
pub fn error_body(s: String) -> ErrorBody {
ErrorBody::from(bytes::Bytes::from(s.into_bytes()))
} }
pub async fn parse_json_body<T>(req: Request<IncomingBody>) -> Result<T, Error> pub async fn parse_json_body<T, B, E>(req: Request<B>) -> Result<T, E>
where where
T: for<'de> Deserialize<'de>, T: for<'de> Deserialize<'de>,
B: Body,
E: From<<B as Body>::Error> + From<Error>,
{ {
let body = req.into_body().collect().await?.to_bytes(); let body = req.into_body().collect().await?.to_bytes();
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?; let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
Ok(resp) Ok(resp)
} }
pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, Error> { pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, E>
let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?; where
E: From<Error>,
{
let resp_json = serde_json::to_string_pretty(res)
.map_err(GarageError::from)
.map_err(Error::from)?;
Ok(Response::builder() Ok(Response::builder()
.status(hyper::StatusCode::OK) .status(hyper::StatusCode::OK)
.header(http::header::CONTENT_TYPE, "application/json") .header(http::header::CONTENT_TYPE, "application/json")
.body(string_body(resp_json))?) .body(string_body(resp_json))
.unwrap())
}
pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
where
B: Body<Data = Bytes>,
<B as Body>::Error: Into<E>,
E: From<Error>,
{
let stream = http_body_util::BodyStream::new(body);
let stream = TryStreamExt::map_err(stream, Into::into);
stream.map(|x| {
x.and_then(|f| {
f.into_data()
.map_err(|_| E::from(Error::bad_request("non-data frame")))
})
})
} }
pub fn is_default<T: Default + PartialEq>(v: &T) -> bool { pub fn is_default<T: Default + PartialEq>(v: &T) -> bool {

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use futures::future::Future; use futures::future::Future;
use hyper::{Body, Method, Request, Response}; use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
use opentelemetry::{trace::SpanRef, KeyValue}; use opentelemetry::{trace::SpanRef, KeyValue};
@ -25,6 +25,9 @@ use crate::k2v::item::*;
use crate::k2v::router::Endpoint; use crate::k2v::router::Endpoint;
use crate::s3::cors::*; use crate::s3::cors::*;
pub use crate::signature::streaming::ReqBody;
pub type ResBody = BoxBody<Error>;
pub struct K2VApiServer { pub struct K2VApiServer {
garage: Arc<Garage>, garage: Arc<Garage>,
} }
@ -55,7 +58,7 @@ impl ApiHandler for K2VApiServer {
type Endpoint = K2VApiEndpoint; type Endpoint = K2VApiEndpoint;
type Error = Error; type Error = Error;
fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> { fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> {
let (endpoint, bucket_name) = Endpoint::from_request(req)?; let (endpoint, bucket_name) = Endpoint::from_request(req)?;
Ok(K2VApiEndpoint { Ok(K2VApiEndpoint {
@ -66,9 +69,9 @@ impl ApiHandler for K2VApiServer {
async fn handle( async fn handle(
&self, &self,
req: Request<Body>, req: Request<IncomingBody>,
endpoint: K2VApiEndpoint, endpoint: K2VApiEndpoint,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let K2VApiEndpoint { let K2VApiEndpoint {
bucket_name, bucket_name,
endpoint, endpoint,
@ -77,9 +80,10 @@ impl ApiHandler for K2VApiServer {
// The OPTIONS method is procesed early, before we even check for an API key // The OPTIONS method is procesed early, before we even check for an API key
if let Endpoint::Options = endpoint { if let Endpoint::Options = endpoint {
return Ok(handle_options_s3api(garage, &req, Some(bucket_name)) let options_res = handle_options_api(garage, &req, Some(bucket_name))
.await .await
.ok_or_bad_request("Error handling OPTIONS")?); .ok_or_bad_request("Error handling OPTIONS")?;
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
} }
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?; let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;

View file

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use base64::prelude::*; use base64::prelude::*;
use hyper::{Body, Request, Response, StatusCode}; use hyper::{Request, Response, StatusCode};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_util::data::*; use garage_util::data::*;
@ -13,15 +13,16 @@ use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*; use garage_model::k2v::item_table::*;
use crate::helpers::*; use crate::helpers::*;
use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*; use crate::k2v::error::*;
use crate::k2v::range::read_range; use crate::k2v::range::read_range;
pub async fn handle_insert_batch( pub async fn handle_insert_batch(
garage: Arc<Garage>, garage: Arc<Garage>,
bucket_id: Uuid, bucket_id: Uuid,
req: Request<Body>, req: Request<ReqBody>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?; let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
let mut items2 = vec![]; let mut items2 = vec![];
for it in items { for it in items {
@ -41,15 +42,15 @@ pub async fn handle_insert_batch(
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::NO_CONTENT) .status(StatusCode::NO_CONTENT)
.body(Body::empty())?) .body(empty_body())?)
} }
pub async fn handle_read_batch( pub async fn handle_read_batch(
garage: Arc<Garage>, garage: Arc<Garage>,
bucket_id: Uuid, bucket_id: Uuid,
req: Request<Body>, req: Request<ReqBody>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?; let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all( let resp_results = futures::future::join_all(
queries queries
@ -139,9 +140,9 @@ async fn handle_read_batch_query(
pub async fn handle_delete_batch( pub async fn handle_delete_batch(
garage: Arc<Garage>, garage: Arc<Garage>,
bucket_id: Uuid, bucket_id: Uuid,
req: Request<Body>, req: Request<ReqBody>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?; let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
let resp_results = futures::future::join_all( let resp_results = futures::future::join_all(
queries queries
@ -253,11 +254,11 @@ pub(crate) async fn handle_poll_range(
garage: Arc<Garage>, garage: Arc<Garage>,
bucket_id: Uuid, bucket_id: Uuid,
partition_key: &str, partition_key: &str,
req: Request<Body>, req: Request<ReqBody>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
use garage_model::k2v::sub::PollRange; use garage_model::k2v::sub::PollRange;
let query = parse_json_body::<PollRangeQuery>(req).await?; let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000; let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
@ -292,7 +293,7 @@ pub(crate) async fn handle_poll_range(
} else { } else {
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED) .status(StatusCode::NOT_MODIFIED)
.body(Body::empty())?) .body(empty_body())?)
} }
} }

View file

@ -1,13 +1,11 @@
use err_derive::Error; use err_derive::Error;
use hyper::header::HeaderValue; use hyper::header::HeaderValue;
use hyper::{Body, HeaderMap, StatusCode}; use hyper::{HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
use crate::common_error::CommonError; use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError; use crate::generic_server::ApiError;
use crate::helpers::CustomApiErrorBody; use crate::helpers::*;
use crate::signature::error::Error as SignatureError; use crate::signature::error::Error as SignatureError;
/// Errors of this crate /// Errors of this crate
@ -30,10 +28,6 @@ pub enum Error {
#[error(display = "Invalid base64: {}", _0)] #[error(display = "Invalid base64: {}", _0)]
InvalidBase64(#[error(source)] base64::DecodeError), InvalidBase64(#[error(source)] base64::DecodeError),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client asked for an invalid return format (invalid Accept header) /// The client asked for an invalid return format (invalid Accept header)
#[error(display = "Not acceptable: {}", _0)] #[error(display = "Not acceptable: {}", _0)]
NotAcceptable(String), NotAcceptable(String),
@ -54,18 +48,6 @@ where
impl CommonErrorDerivative for Error {} impl CommonErrorDerivative for Error {}
impl From<HelperError> for Error {
fn from(err: HelperError) -> Self {
match err {
HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
e => Self::Common(CommonError::BadRequest(format!("{}", e))),
}
}
}
impl From<SignatureError> for Error { impl From<SignatureError> for Error {
fn from(err: SignatureError) -> Self { fn from(err: SignatureError) -> Self {
match err { match err {
@ -74,7 +56,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c) Self::AuthorizationHeaderMalformed(c)
} }
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
} }
} }
} }
@ -90,7 +71,6 @@ impl Error {
Error::NotAcceptable(_) => "NotAcceptable", Error::NotAcceptable(_) => "NotAcceptable",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::InvalidBase64(_) => "InvalidBase64", Error::InvalidBase64(_) => "InvalidBase64",
Error::InvalidHeader(_) => "InvalidHeaderValue",
Error::InvalidUtf8Str(_) => "InvalidUtf8String", Error::InvalidUtf8Str(_) => "InvalidUtf8String",
} }
} }
@ -105,7 +85,6 @@ impl ApiError for Error {
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
Error::AuthorizationHeaderMalformed(_) Error::AuthorizationHeaderMalformed(_)
| Error::InvalidBase64(_) | Error::InvalidBase64(_)
| Error::InvalidHeader(_)
| Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST, | Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
} }
} }
@ -115,14 +94,14 @@ impl ApiError for Error {
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap()); header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
} }
fn http_body(&self, garage_region: &str, path: &str) -> Body { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = CustomApiErrorBody { let error = CustomApiErrorBody {
code: self.code().to_string(), code: self.code().to_string(),
message: format!("{}", self), message: format!("{}", self),
path: path.to_string(), path: path.to_string(),
region: garage_region.to_string(), region: garage_region.to_string(),
}; };
Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| { let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
r#" r#"
{ {
"code": "InternalError", "code": "InternalError",
@ -130,6 +109,7 @@ impl ApiError for Error {
} }
"# "#
.into() .into()
})) });
error_body(error_str)
} }
} }

View file

@ -1,6 +1,6 @@
use std::sync::Arc; use std::sync::Arc;
use hyper::{Body, Response}; use hyper::Response;
use serde::Serialize; use serde::Serialize;
use garage_util::data::*; use garage_util::data::*;
@ -12,6 +12,7 @@ use garage_model::garage::Garage;
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES}; use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
use crate::helpers::*; use crate::helpers::*;
use crate::k2v::api_server::ResBody;
use crate::k2v::error::*; use crate::k2v::error::*;
use crate::k2v::range::read_range; use crate::k2v::range::read_range;
@ -23,7 +24,7 @@ pub async fn handle_read_index(
end: Option<String>, end: Option<String>,
limit: Option<u64>, limit: Option<u64>,
reverse: Option<bool>, reverse: Option<bool>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let reverse = reverse.unwrap_or(false); let reverse = reverse.unwrap_or(false);
let ring: Arc<Ring> = garage.system.ring.borrow().clone(); let ring: Arc<Ring> = garage.system.ring.borrow().clone();
@ -68,7 +69,7 @@ pub async fn handle_read_index(
next_start, next_start,
}; };
Ok(json_ok_response(&resp)?) json_ok_response::<Error, _>(&resp)
} }
#[derive(Serialize)] #[derive(Serialize)]

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use base64::prelude::*; use base64::prelude::*;
use http::header; use http::header;
use hyper::{body::HttpBody, Body, Request, Response, StatusCode}; use hyper::{Request, Response, StatusCode};
use garage_util::data::*; use garage_util::data::*;
@ -11,6 +11,8 @@ use garage_model::garage::Garage;
use garage_model::k2v::causality::*; use garage_model::k2v::causality::*;
use garage_model::k2v::item_table::*; use garage_model::k2v::item_table::*;
use crate::helpers::*;
use crate::k2v::api_server::{ReqBody, ResBody};
use crate::k2v::error::*; use crate::k2v::error::*;
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token"; pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
@ -22,7 +24,7 @@ pub enum ReturnFormat {
} }
impl ReturnFormat { impl ReturnFormat {
pub fn from(req: &Request<Body>) -> Result<Self, Error> { pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> {
let accept = match req.headers().get(header::ACCEPT) { let accept = match req.headers().get(header::ACCEPT) {
Some(a) => a.to_str()?, Some(a) => a.to_str()?,
None => return Ok(Self::Json), None => return Ok(Self::Json),
@ -40,7 +42,7 @@ impl ReturnFormat {
} }
} }
pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> { pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> {
let vals = item.values(); let vals = item.values();
if vals.is_empty() { if vals.is_empty() {
@ -52,7 +54,7 @@ impl ReturnFormat {
Self::Binary if vals.len() > 1 => Ok(Response::builder() Self::Binary if vals.len() > 1 => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(X_GARAGE_CAUSALITY_TOKEN, ct)
.status(StatusCode::CONFLICT) .status(StatusCode::CONFLICT)
.body(Body::empty())?), .body(empty_body())?),
Self::Binary => { Self::Binary => {
assert!(vals.len() == 1); assert!(vals.len() == 1);
Self::make_binary_response(ct, vals[0]) Self::make_binary_response(ct, vals[0])
@ -62,22 +64,22 @@ impl ReturnFormat {
} }
} }
fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> { fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> {
match v { match v {
DvvsValue::Deleted => Ok(Response::builder() DvvsValue::Deleted => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::NO_CONTENT) .status(StatusCode::NO_CONTENT)
.body(Body::empty())?), .body(empty_body())?),
DvvsValue::Value(v) => Ok(Response::builder() DvvsValue::Value(v) => Ok(Response::builder()
.header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/octet-stream") .header(header::CONTENT_TYPE, "application/octet-stream")
.status(StatusCode::OK) .status(StatusCode::OK)
.body(Body::from(v.to_vec()))?), .body(bytes_body(v.to_vec().into()))?),
} }
} }
fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> { fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> {
let items = v let items = v
.iter() .iter()
.map(|v| match v { .map(|v| match v {
@ -91,7 +93,7 @@ impl ReturnFormat {
.header(X_GARAGE_CAUSALITY_TOKEN, ct) .header(X_GARAGE_CAUSALITY_TOKEN, ct)
.header(header::CONTENT_TYPE, "application/json") .header(header::CONTENT_TYPE, "application/json")
.status(StatusCode::OK) .status(StatusCode::OK)
.body(Body::from(json_body))?) .body(string_body(json_body))?)
} }
} }
@ -99,11 +101,11 @@ impl ReturnFormat {
#[allow(clippy::ptr_arg)] #[allow(clippy::ptr_arg)]
pub async fn handle_read_item( pub async fn handle_read_item(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<ReqBody>,
bucket_id: Uuid, bucket_id: Uuid,
partition_key: &str, partition_key: &str,
sort_key: &String, sort_key: &String,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?; let format = ReturnFormat::from(req)?;
let item = garage let item = garage
@ -124,11 +126,11 @@ pub async fn handle_read_item(
pub async fn handle_insert_item( pub async fn handle_insert_item(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<ReqBody>,
bucket_id: Uuid, bucket_id: Uuid,
partition_key: &str, partition_key: &str,
sort_key: &str, sort_key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let causal_context = req let causal_context = req
.headers() .headers()
.get(X_GARAGE_CAUSALITY_TOKEN) .get(X_GARAGE_CAUSALITY_TOKEN)
@ -137,7 +139,9 @@ pub async fn handle_insert_item(
.map(CausalContext::parse_helper) .map(CausalContext::parse_helper)
.transpose()?; .transpose()?;
let body = req.into_body().collect().await?.to_bytes(); let body = http_body_util::BodyExt::collect(req.into_body())
.await?
.to_bytes();
let value = DvvsValue::Value(body.to_vec()); let value = DvvsValue::Value(body.to_vec());
@ -155,16 +159,16 @@ pub async fn handle_insert_item(
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::NO_CONTENT) .status(StatusCode::NO_CONTENT)
.body(Body::empty())?) .body(empty_body())?)
} }
pub async fn handle_delete_item( pub async fn handle_delete_item(
garage: Arc<Garage>, garage: Arc<Garage>,
req: Request<Body>, req: Request<ReqBody>,
bucket_id: Uuid, bucket_id: Uuid,
partition_key: &str, partition_key: &str,
sort_key: &str, sort_key: &str,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let causal_context = req let causal_context = req
.headers() .headers()
.get(X_GARAGE_CAUSALITY_TOKEN) .get(X_GARAGE_CAUSALITY_TOKEN)
@ -189,20 +193,20 @@ pub async fn handle_delete_item(
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::NO_CONTENT) .status(StatusCode::NO_CONTENT)
.body(Body::empty())?) .body(empty_body())?)
} }
/// Handle ReadItem request /// Handle ReadItem request
#[allow(clippy::ptr_arg)] #[allow(clippy::ptr_arg)]
pub async fn handle_poll_item( pub async fn handle_poll_item(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<Body>, req: &Request<ReqBody>,
bucket_id: Uuid, bucket_id: Uuid,
partition_key: String, partition_key: String,
sort_key: String, sort_key: String,
causality_token: String, causality_token: String,
timeout_secs: Option<u64>, timeout_secs: Option<u64>,
) -> Result<Response<Body>, Error> { ) -> Result<Response<ResBody>, Error> {
let format = ReturnFormat::from(req)?; let format = ReturnFormat::from(req)?;
let causal_context = let causal_context =
@ -227,6 +231,6 @@ pub async fn handle_poll_item(
} else { } else {
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::NOT_MODIFIED) .status(StatusCode::NOT_MODIFIED)
.body(Body::empty())?) .body(empty_body())?)
} }
} }

View file

@ -121,7 +121,8 @@ impl ApiHandler for S3ApiServer {
return handle_post_object(garage, req, bucket_name.unwrap()).await; return handle_post_object(garage, req, bucket_name.unwrap()).await;
} }
if let Endpoint::Options = endpoint { if let Endpoint::Options = endpoint {
return handle_options_s3api(garage, &req, bucket_name).await; let options_res = handle_options_api(garage, &req, bucket_name).await?;
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
} }
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?; let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;

View file

@ -14,6 +14,7 @@ use http_body_util::BodyExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::common_error::CommonError;
use crate::helpers::*; use crate::helpers::*;
use crate::s3::api_server::{ReqBody, ResBody}; use crate::s3::api_server::{ReqBody, ResBody};
use crate::s3::error::*; use crate::s3::error::*;
@ -94,11 +95,11 @@ pub async fn handle_put_cors(
.body(empty_body())?) .body(empty_body())?)
} }
pub async fn handle_options_s3api( pub async fn handle_options_api(
garage: Arc<Garage>, garage: Arc<Garage>,
req: &Request<IncomingBody>, req: &Request<IncomingBody>,
bucket_name: Option<String>, bucket_name: Option<String>,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<EmptyBody>, CommonError> {
// FIXME: CORS rules of buckets with local aliases are // FIXME: CORS rules of buckets with local aliases are
// not taken into account. // not taken into account.
@ -128,7 +129,7 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "*") .header(ACCESS_CONTROL_ALLOW_METHODS, "*")
.status(StatusCode::OK) .status(StatusCode::OK)
.body(empty_body())?) .body(EmptyBody::new())?)
} }
} else { } else {
// If there is no bucket name in the request, // If there is no bucket name in the request,
@ -138,14 +139,14 @@ pub async fn handle_options_s3api(
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*") .header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET") .header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
.status(StatusCode::OK) .status(StatusCode::OK)
.body(empty_body())?) .body(EmptyBody::new())?)
} }
} }
pub fn handle_options_for_bucket( pub fn handle_options_for_bucket(
req: &Request<IncomingBody>, req: &Request<IncomingBody>,
bucket: &Bucket, bucket: &Bucket,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<EmptyBody>, CommonError> {
let origin = req let origin = req
.headers() .headers()
.get("Origin") .get("Origin")
@ -168,13 +169,15 @@ pub fn handle_options_for_bucket(
if let Some(rule) = matching_rule { if let Some(rule) = matching_rule {
let mut resp = Response::builder() let mut resp = Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.body(empty_body())?; .body(EmptyBody::new())?;
add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?; add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
return Ok(resp); return Ok(resp);
} }
} }
Err(Error::forbidden("This CORS request is not allowed.")) Err(CommonError::Forbidden(
"This CORS request is not allowed.".into(),
))
} }
pub fn find_matching_cors_rule<'a>( pub fn find_matching_cors_rule<'a>(
@ -216,7 +219,7 @@ where
} }
pub fn add_cors_headers( pub fn add_cors_headers(
resp: &mut Response<ResBody>, resp: &mut Response<impl Body>,
rule: &GarageCorsRule, rule: &GarageCorsRule,
) -> Result<(), http::header::InvalidHeaderValue> { ) -> Result<(), http::header::InvalidHeaderValue> {
let h = resp.headers_mut(); let h = resp.headers_mut();

View file

@ -4,8 +4,6 @@ use err_derive::Error;
use hyper::header::HeaderValue; use hyper::header::HeaderValue;
use hyper::{HeaderMap, StatusCode}; use hyper::{HeaderMap, StatusCode};
use garage_model::helper::error::Error as HelperError;
use crate::common_error::CommonError; use crate::common_error::CommonError;
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError}; pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
use crate::generic_server::ApiError; use crate::generic_server::ApiError;
@ -63,10 +61,6 @@ pub enum Error {
#[error(display = "Invalid XML: {}", _0)] #[error(display = "Invalid XML: {}", _0)]
InvalidXml(String), InvalidXml(String),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
/// The client sent a range header with invalid value /// The client sent a range header with invalid value
#[error(display = "Invalid HTTP range: {:?}", _0)] #[error(display = "Invalid HTTP range: {:?}", _0)]
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)), InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
@ -87,18 +81,6 @@ where
impl CommonErrorDerivative for Error {} impl CommonErrorDerivative for Error {}
impl From<HelperError> for Error {
fn from(err: HelperError) -> Self {
match err {
HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
e => Self::bad_request(format!("{}", e)),
}
}
}
impl From<roxmltree::Error> for Error { impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self { fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err)) Self::InvalidXml(format!("{}", err))
@ -119,7 +101,6 @@ impl From<SignatureError> for Error {
Self::AuthorizationHeaderMalformed(c) Self::AuthorizationHeaderMalformed(c)
} }
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i), SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
} }
} }
} }
@ -144,9 +125,7 @@ impl Error {
Error::NotImplemented(_) => "NotImplemented", Error::NotImplemented(_) => "NotImplemented",
Error::InvalidXml(_) => "MalformedXML", Error::InvalidXml(_) => "MalformedXML",
Error::InvalidRange(_) => "InvalidRange", Error::InvalidRange(_) => "InvalidRange",
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => { Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
"InvalidRequest"
}
} }
} }
} }
@ -166,8 +145,7 @@ impl ApiError for Error {
| Error::EntityTooSmall | Error::EntityTooSmall
| Error::InvalidXml(_) | Error::InvalidXml(_)
| Error::InvalidUtf8Str(_) | Error::InvalidUtf8Str(_)
| Error::InvalidUtf8String(_) | Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,
| Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
} }
} }
@ -190,7 +168,7 @@ impl ApiError for Error {
} }
} }
fn http_body(&self, garage_region: &str, path: &str) -> BytesBody { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
let error = s3_xml::Error { let error = s3_xml::Error {
code: s3_xml::Value(self.aws_code().to_string()), code: s3_xml::Value(self.aws_code().to_string()),
message: s3_xml::Value(format!("{}", self)), message: s3_xml::Value(format!("{}", self)),
@ -207,6 +185,6 @@ impl ApiError for Error {
"# "#
.into() .into()
}); });
BytesBody::from(bytes::Bytes::from(error_str.into_bytes())) error_body(error_str)
} }
} }

View file

@ -11,7 +11,8 @@ use http::header::{
use hyper::{body::Body, Request, Response, StatusCode}; use hyper::{body::Body, Request, Response, StatusCode};
use tokio::sync::mpsc; use tokio::sync::mpsc;
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag}; use garage_block::manager::BlockStream;
use garage_rpc::rpc_helper::OrderTag;
use garage_table::EmptyKey; use garage_table::EmptyKey;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::OkOrMessage; use garage_util::error::OkOrMessage;
@ -245,7 +246,7 @@ pub async fn handle_get(
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?) Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
} }
ObjectVersionData::FirstBlock(_, first_block_hash) => { ObjectVersionData::FirstBlock(_, first_block_hash) => {
let (tx, rx) = mpsc::channel(2); let (tx, rx) = mpsc::channel::<BlockStream>(2);
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();
let first_block_hash = *first_block_hash; let first_block_hash = *first_block_hash;
@ -283,25 +284,13 @@ pub async fn handle_get(
{ {
Ok(()) => (), Ok(()) => (),
Err(e) => { Err(e) => {
let err = std::io::Error::new( let _ = tx.send(error_stream_item(e)).await;
std::io::ErrorKind::Other,
format!("Error while getting object data: {}", e),
);
let _ = tx
.send(Box::pin(stream::once(future::ready(Err(err)))))
.await;
} }
} }
}); });
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) let body = response_body_from_block_stream(rx);
.flatten() Ok(resp_builder.body(body)?)
.map(|x| {
x.map(hyper::body::Frame::data)
.map_err(|e| Error::from(garage_util::error::Error::from(e)))
});
let body = http_body_util::StreamBody::new(body_stream);
Ok(resp_builder.body(ResBody::new(body))?)
} }
} }
} }
@ -461,67 +450,75 @@ fn body_from_blocks_range(
} }
let order_stream = OrderTag::stream(); let order_stream = OrderTag::stream();
let mut body_stream = let (tx, rx) = mpsc::channel::<BlockStream>(2);
futures::stream::iter(blocks)
.enumerate()
.map(move |(i, (block, block_offset))| {
let garage = garage.clone();
async move {
garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
.await
.unwrap_or_else(|e| error_stream(i, e))
.scan(block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
let r = if *chunk_offset >= end {
// The current chunk is after the part we want to read.
// Returning None here will stop the scan, the rest of the
// stream will be ignored
None
} else if *chunk_offset + chunk_len <= begin {
// The current chunk is before the part we want to read.
// We return a None that will be removed by the filter_map
// below.
Some(None)
} else {
// The chunk has an intersection with the requested range
let start_in_chunk = if *chunk_offset > begin {
0
} else {
begin - *chunk_offset
};
let end_in_chunk = if *chunk_offset + chunk_len < end {
chunk_len
} else {
end - *chunk_offset
};
Some(Some(Ok(chunk_bytes.slice(
start_in_chunk as usize..end_in_chunk as usize,
))))
};
*chunk_offset += chunk_bytes.len() as u64;
r
}
Err(e) => Some(Some(Err(e))),
};
futures::future::ready(r)
})
.filter_map(futures::future::ready)
}
});
let (tx, rx) = mpsc::channel(2);
tokio::spawn(async move { tokio::spawn(async move {
while let Some(item) = body_stream.next().await { match async {
if tx.send(item.await).await.is_err() { let garage = garage.clone();
break; // connection closed by client for (i, (block, block_offset)) in blocks.iter().enumerate() {
let block_stream = garage
.block_manager
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
.await?
.scan(*block_offset, move |chunk_offset, chunk| {
let r = match chunk {
Ok(chunk_bytes) => {
let chunk_len = chunk_bytes.len() as u64;
let r = if *chunk_offset >= end {
// The current chunk is after the part we want to read.
// Returning None here will stop the scan, the rest of the
// stream will be ignored
None
} else if *chunk_offset + chunk_len <= begin {
// The current chunk is before the part we want to read.
// We return a None that will be removed by the filter_map
// below.
Some(None)
} else {
// The chunk has an intersection with the requested range
let start_in_chunk = if *chunk_offset > begin {
0
} else {
begin - *chunk_offset
};
let end_in_chunk = if *chunk_offset + chunk_len < end {
chunk_len
} else {
end - *chunk_offset
};
Some(Some(Ok(chunk_bytes
.slice(start_in_chunk as usize..end_in_chunk as usize))))
};
*chunk_offset += chunk_bytes.len() as u64;
r
}
Err(e) => Some(Some(Err(e))),
};
futures::future::ready(r)
})
.filter_map(futures::future::ready);
let block_stream: BlockStream = Box::pin(block_stream);
tx.send(Box::pin(block_stream))
.await
.ok_or_message("channel closed")?;
}
Ok::<(), Error>(())
}
.await
{
Ok(()) => (),
Err(e) => {
let _ = tx.send(error_stream_item(e)).await;
} }
} }
}); });
response_body_from_block_stream(rx)
}
fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx) let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
.flatten() .flatten()
.map(|x| { .map(|x| {
@ -531,11 +528,10 @@ fn body_from_blocks_range(
ResBody::new(http_body_util::StreamBody::new(body_stream)) ResBody::new(http_body_util::StreamBody::new(body_stream))
} }
fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream { fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
Box::pin(futures::stream::once(async move { let err = std::io::Error::new(
Err(std::io::Error::new( std::io::ErrorKind::Other,
std::io::ErrorKind::Other, format!("Error while getting object data: {}", e),
format!("Could not get block {}: {}", i, e), );
)) Box::pin(stream::once(future::ready(Err(err))))
}))
} }

View file

@ -1,8 +1,7 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use futures::{prelude::*, TryStreamExt}; use futures::prelude::*;
use http_body_util::BodyStream;
use hyper::{Request, Response}; use hyper::{Request, Response};
use md5::{Digest as Md5Digest, Md5}; use md5::{Digest as Md5Digest, Md5};
@ -89,10 +88,8 @@ pub async fn handle_put_part(
// Read first chuck, and at the same time try to get object to see if it exists // Read first chuck, and at the same time try to get object to see if it exists
let key = key.to_string(); let key = key.to_string();
let body_stream = BodyStream::new(req.into_body()) let stream = body_stream(req.into_body());
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap let mut chunker = StreamChunker::new(stream, garage.config.block_size);
.map_err(Error::from);
let mut chunker = StreamChunker::new(body_stream, garage.config.block_size);
let ((_, _, mut mpu), first_block) = futures::try_join!( let ((_, _, mut mpu), first_block) = futures::try_join!(
get_upload(&garage, &bucket_id, &key, &upload_id), get_upload(&garage, &bucket_id, &key, &upload_id),

View file

@ -7,8 +7,7 @@ use std::task::{Context, Poll};
use base64::prelude::*; use base64::prelude::*;
use bytes::Bytes; use bytes::Bytes;
use chrono::{DateTime, Duration, Utc}; use chrono::{DateTime, Duration, Utc};
use futures::{Stream, StreamExt, TryStreamExt}; use futures::{Stream, StreamExt};
use http_body_util::BodyStream;
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue}; use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use multer::{Constraints, Multipart, SizeLimit}; use multer::{Constraints, Multipart, SizeLimit};
@ -45,10 +44,8 @@ pub async fn handle_post_object(
); );
let (head, body) = req.into_parts(); let (head, body) = req.into_parts();
let body_stream = BodyStream::new(body) let stream = body_stream::<_, Error>(body);
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
.map_err(Error::from);
let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints);
let mut params = HeaderMap::new(); let mut params = HeaderMap::new();
let field = loop { let field = loop {

View file

@ -4,13 +4,13 @@ use std::sync::Arc;
use base64::prelude::*; use base64::prelude::*;
use futures::prelude::*; use futures::prelude::*;
use futures::try_join; use futures::try_join;
use http_body_util::BodyStream;
use hyper::body::Bytes;
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256; use sha2::Sha256;
use hyper::body::{Body, Bytes};
use hyper::header::{HeaderMap, HeaderValue};
use hyper::{Request, Response};
use opentelemetry::{ use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context, Context,
@ -51,14 +51,12 @@ pub async fn handle_put(
None => None, None => None,
}; };
let body_stream = BodyStream::new(req.into_body()) let stream = body_stream(req.into_body());
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
.map_err(Error::from);
save_stream( save_stream(
garage, garage,
headers, headers,
body_stream, stream,
bucket, bucket,
key, key,
content_md5, content_md5,

View file

@ -18,10 +18,6 @@ pub enum Error {
/// The request contained an invalid UTF-8 sequence in its path or in other parameters /// The request contained an invalid UTF-8 sequence in its path or in other parameters
#[error(display = "Invalid UTF-8: {}", _0)] #[error(display = "Invalid UTF-8: {}", _0)]
InvalidUtf8Str(#[error(source)] std::str::Utf8Error), InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
/// The client sent a header with invalid value
#[error(display = "Invalid header value: {}", _0)]
InvalidHeader(#[error(source)] hyper::header::ToStrError),
} }
impl<T> From<T> for Error impl<T> From<T> for Error

View file

@ -1,6 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use chrono::{DateTime, Duration, NaiveDateTime, Utc}; use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc};
use hmac::Mac; use hmac::Mac;
use hyper::{body::Incoming as IncomingBody, Method, Request}; use hyper::{body::Incoming as IncomingBody, Method, Request};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
@ -316,7 +316,7 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> { pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
let date: NaiveDateTime = let date: NaiveDateTime =
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?; NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
Ok(DateTime::from_utc(date, Utc)) Ok(Utc.from_utc_datetime(&date))
} }
pub async fn verify_v4( pub async fn verify_v4(

View file

@ -1,11 +1,11 @@
use std::pin::Pin; use std::pin::Pin;
use chrono::{DateTime, NaiveDateTime, Utc}; use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
use futures::prelude::*; use futures::prelude::*;
use futures::task; use futures::task;
use garage_model::key_table::Key; use garage_model::key_table::Key;
use hmac::Mac; use hmac::Mac;
use http_body_util::{BodyStream, StreamBody}; use http_body_util::StreamBody;
use hyper::body::{Bytes, Incoming as IncomingBody}; use hyper::body::{Bytes, Incoming as IncomingBody};
use hyper::Request; use hyper::Request;
@ -44,18 +44,16 @@ pub fn parse_streaming_body(
.to_str()?; .to_str()?;
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME) let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
.ok_or_bad_request("Invalid date")?; .ok_or_bad_request("Invalid date")?;
let date: DateTime<Utc> = DateTime::from_utc(date, Utc); let date: DateTime<Utc> = Utc.from_utc_datetime(&date);
let scope = compute_scope(&date, region, service); let scope = compute_scope(&date, region, service);
let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service) let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
.ok_or_internal_error("Unable to build signing HMAC")?; .ok_or_internal_error("Unable to build signing HMAC")?;
Ok(req.map(move |body| { Ok(req.map(move |body| {
let body_stream = BodyStream::new(body) let stream = body_stream::<_, Error>(body);
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
.map_err(Error::from);
let signed_payload_stream = let signed_payload_stream =
SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature) SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
.map(|x| x.map(hyper::body::Frame::data)) .map(|x| x.map(hyper::body::Frame::data))
.map_err(Error::from); .map_err(Error::from);
ReqBody::new(StreamBody::new(signed_payload_stream)) ReqBody::new(StreamBody::new(signed_payload_stream))

View file

@ -53,6 +53,9 @@ pub const INLINE_THRESHOLD: usize = 3072;
// to delete the block locally. // to delete the block locally.
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
pub type BlockStream =
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
/// RPC messages used to share blocks of data between nodes /// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpc { pub enum BlockRpc {
@ -324,10 +327,7 @@ impl BlockManager {
&self, &self,
hash: &Hash, hash: &Hash,
order_tag: Option<OrderTag>, order_tag: Option<OrderTag>,
) -> Result< ) -> Result<BlockStream, Error> {
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
Error,
> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?; let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header { match header {
DataBlockHeader::Plain => Ok(stream), DataBlockHeader::Plain => Ok(stream),

View file

@ -66,7 +66,9 @@ aws-sdk-s3.workspace = true
chrono.workspace = true chrono.workspace = true
http.workspace = true http.workspace = true
hmac.workspace = true hmac.workspace = true
http-body-util.workspace = true
hyper.workspace = true hyper.workspace = true
hyper-util.workspace = true
mktemp.workspace = true mktemp.workspace = true
sha2.workspace = true sha2.workspace = true

View file

@ -113,12 +113,11 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
if let Some(web_config) = &config.s3_web { if let Some(web_config) = &config.s3_web {
info!("Initializing web server..."); info!("Initializing web server...");
let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone());
servers.push(( servers.push((
"Web", "Web",
tokio::spawn(WebServer::run( tokio::spawn(web_server.run(
garage.clone(),
web_config.bind_addr.clone(), web_config.bind_addr.clone(),
web_config.root_domain.clone(),
wait_from(watch_cancel.clone()), wait_from(watch_cancel.clone()),
)), )),
)); ));

View file

@ -5,12 +5,17 @@ use std::convert::TryFrom;
use chrono::{offset::Utc, DateTime}; use chrono::{offset::Utc, DateTime};
use hmac::{Hmac, Mac}; use hmac::{Hmac, Mac};
use hyper::client::HttpConnector; use http_body_util::BodyExt;
use hyper::{Body, Client, Method, Request, Response, Uri}; use http_body_util::Full as FullBody;
use hyper::{Method, Request, Response, Uri};
use hyper_util::client::legacy::{connect::HttpConnector, Client};
use hyper_util::rt::TokioExecutor;
use super::garage::{Instance, Key}; use super::garage::{Instance, Key};
use garage_api::signature; use garage_api::signature;
pub type Body = FullBody<hyper::body::Bytes>;
/// You should ever only use this to send requests AWS sdk won't send, /// You should ever only use this to send requests AWS sdk won't send,
/// like to reproduce behavior of unusual implementations found to be /// like to reproduce behavior of unusual implementations found to be
/// problematic. /// problematic.
@ -19,7 +24,7 @@ pub struct CustomRequester {
key: Key, key: Key,
uri: Uri, uri: Uri,
service: &'static str, service: &'static str,
client: Client<HttpConnector>, client: Client<HttpConnector, Body>,
} }
impl CustomRequester { impl CustomRequester {
@ -28,7 +33,7 @@ impl CustomRequester {
key: key.clone(), key: key.clone(),
uri: instance.s3_uri(), uri: instance.s3_uri(),
service: "s3", service: "s3",
client: Client::new(), client: Client::builder(TokioExecutor::new()).build_http(),
} }
} }
@ -37,7 +42,7 @@ impl CustomRequester {
key: key.clone(), key: key.clone(),
uri: instance.k2v_uri(), uri: instance.k2v_uri(),
service: "k2v", service: "k2v",
client: Client::new(), client: Client::builder(TokioExecutor::new()).build_http(),
} }
} }
@ -139,7 +144,7 @@ impl<'a> RequestBuilder<'a> {
self self
} }
pub async fn send(&mut self) -> hyper::Result<Response<Body>> { pub async fn send(&mut self) -> Result<Response<Body>, String> {
// TODO this is a bit incorrect in that path and query params should be url-encoded and // TODO this is a bit incorrect in that path and query params should be url-encoded and
// aren't, but this is good enought for now. // aren't, but this is good enought for now.
@ -242,7 +247,22 @@ impl<'a> RequestBuilder<'a> {
.method(self.method.clone()) .method(self.method.clone())
.body(Body::from(body)) .body(Body::from(body))
.unwrap(); .unwrap();
self.requester.client.request(request).await
let result = self
.requester
.client
.request(request)
.await
.map_err(|err| format!("hyper client error: {}", err))?;
let (head, body) = result.into_parts();
let body = Body::new(
body.collect()
.await
.map_err(|err| format!("hyper client error in body.collect: {}", err))?
.to_bytes(),
);
Ok(Response::from_parts(head, body))
} }
} }

View file

@ -7,7 +7,8 @@ use base64::prelude::*;
use serde_json::json; use serde_json::json;
use crate::json_body; use crate::json_body;
use hyper::{body::HttpBody, Method, StatusCode}; use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
#[tokio::test] #[tokio::test]
async fn test_batch() { async fn test_batch() {

View file

@ -7,7 +7,8 @@ use base64::prelude::*;
use serde_json::json; use serde_json::json;
use crate::json_body; use crate::json_body;
use hyper::{body::HttpBody, Method, StatusCode}; use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
#[tokio::test] #[tokio::test]
async fn test_items_and_indices() { async fn test_items_and_indices() {

View file

@ -1,5 +1,6 @@
use base64::prelude::*; use base64::prelude::*;
use hyper::{body::HttpBody, Method, StatusCode}; use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
use std::time::Duration; use std::time::Duration;
use assert_json_diff::assert_json_eq; use assert_json_diff::assert_json_eq;

View file

@ -1,6 +1,7 @@
use crate::common; use crate::common;
use hyper::{body::HttpBody, Method, StatusCode}; use http_body_util::BodyExt;
use hyper::{Method, StatusCode};
#[tokio::test] #[tokio::test]
async fn test_simple() { async fn test_simple() {

View file

@ -11,9 +11,14 @@ mod k2v;
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
mod k2v_client; mod k2v_client;
use hyper::{body::HttpBody, Body, Response}; use http_body_util::BodyExt;
use hyper::{body::Body, Response};
pub async fn json_body(res: Response<Body>) -> serde_json::Value { pub async fn json_body<B>(res: Response<B>) -> serde_json::Value
where
B: Body,
<B as Body>::Error: std::fmt::Debug,
{
let body = res.into_body().collect().await.unwrap().to_bytes(); let body = res.into_body().collect().await.unwrap().to_bytes();
let res_body: serde_json::Value = serde_json::from_slice(&body).unwrap(); let res_body: serde_json::Value = serde_json::from_slice(&body).unwrap();
res_body res_body

View file

@ -8,15 +8,18 @@ use aws_sdk_s3::{
types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration}, types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
}; };
use http::{Request, StatusCode}; use http::{Request, StatusCode};
use hyper::{ use http_body_util::BodyExt;
body::{Body, HttpBody}, use http_body_util::Full as FullBody;
Client, use hyper::body::Bytes;
}; use hyper_util::client::legacy::Client;
use hyper_util::rt::TokioExecutor;
use serde_json::json; use serde_json::json;
const BODY: &[u8; 16] = b"<h1>bonjour</h1>"; const BODY: &[u8; 16] = b"<h1>bonjour</h1>";
const BODY_ERR: &[u8; 6] = b"erreur"; const BODY_ERR: &[u8; 6] = b"erreur";
pub type Body = FullBody<Bytes>;
#[tokio::test] #[tokio::test]
async fn test_website() { async fn test_website() {
const BCKT_NAME: &str = "my-website"; const BCKT_NAME: &str = "my-website";
@ -34,14 +37,14 @@ async fn test_website() {
.await .await
.unwrap(); .unwrap();
let client = Client::new(); let client = Client::builder(TokioExecutor::new()).build_http();
let req = || { let req = || {
Request::builder() Request::builder()
.method("GET") .method("GET")
.uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port)) .uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -49,7 +52,7 @@ async fn test_website() {
assert_eq!(resp.status(), StatusCode::NOT_FOUND); assert_eq!(resp.status(), StatusCode::NOT_FOUND);
assert_ne!( assert_ne!(
resp.into_body().collect().await.unwrap().to_bytes(), BodyExt::collect(resp.into_body()).await.unwrap().to_bytes(),
BODY.as_ref() BODY.as_ref()
); /* check that we do not leak body */ ); /* check that we do not leak body */
@ -61,7 +64,7 @@ async fn test_website() {
ctx.garage.admin_port, ctx.garage.admin_port,
BCKT_NAME.to_string() BCKT_NAME.to_string()
)) ))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -103,7 +106,7 @@ async fn test_website() {
"http://127.0.0.1:{0}/check?domain={1}", "http://127.0.0.1:{0}/check?domain={1}",
ctx.garage.admin_port, bname ctx.garage.admin_port, bname
)) ))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -136,7 +139,7 @@ async fn test_website() {
ctx.garage.admin_port, ctx.garage.admin_port,
BCKT_NAME.to_string() BCKT_NAME.to_string()
)) ))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -248,7 +251,7 @@ async fn test_website_s3_api() {
); );
} }
let client = Client::new(); let client = Client::builder(TokioExecutor::new()).build_http();
// Test direct requests with CORS // Test direct requests with CORS
{ {
@ -257,7 +260,7 @@ async fn test_website_s3_api() {
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com") .header("Origin", "https://example.com")
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap(); .unwrap();
let resp = client.request(req).await.unwrap(); let resp = client.request(req).await.unwrap();
@ -282,7 +285,7 @@ async fn test_website_s3_api() {
ctx.garage.web_port ctx.garage.web_port
)) ))
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap(); .unwrap();
let resp = client.request(req).await.unwrap(); let resp = client.request(req).await.unwrap();
@ -302,7 +305,7 @@ async fn test_website_s3_api() {
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com") .header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT") .header("Access-Control-Request-Method", "PUT")
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap(); .unwrap();
let resp = client.request(req).await.unwrap(); let resp = client.request(req).await.unwrap();
@ -326,7 +329,7 @@ async fn test_website_s3_api() {
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com") .header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "DELETE") .header("Access-Control-Request-Method", "DELETE")
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap(); .unwrap();
let resp = client.request(req).await.unwrap(); let resp = client.request(req).await.unwrap();
@ -367,7 +370,7 @@ async fn test_website_s3_api() {
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.header("Origin", "https://example.com") .header("Origin", "https://example.com")
.header("Access-Control-Request-Method", "PUT") .header("Access-Control-Request-Method", "PUT")
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap(); .unwrap();
let resp = client.request(req).await.unwrap(); let resp = client.request(req).await.unwrap();
@ -393,7 +396,7 @@ async fn test_website_s3_api() {
.method("GET") .method("GET")
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port)) .uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
.header("Host", format!("{}.web.garage", BCKT_NAME)) .header("Host", format!("{}.web.garage", BCKT_NAME))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap(); .unwrap();
let resp = client.request(req).await.unwrap(); let resp = client.request(req).await.unwrap();
@ -409,13 +412,13 @@ async fn test_website_s3_api() {
async fn test_website_check_domain() { async fn test_website_check_domain() {
let ctx = common::context(); let ctx = common::context();
let client = Client::new(); let client = Client::builder(TokioExecutor::new()).build_http();
let admin_req = || { let admin_req = || {
Request::builder() Request::builder()
.method("GET") .method("GET")
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port)) .uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -439,7 +442,7 @@ async fn test_website_check_domain() {
"http://127.0.0.1:{}/check?domain=", "http://127.0.0.1:{}/check?domain=",
ctx.garage.admin_port ctx.garage.admin_port
)) ))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -463,7 +466,7 @@ async fn test_website_check_domain() {
"http://127.0.0.1:{}/check?domain=foobar", "http://127.0.0.1:{}/check?domain=foobar",
ctx.garage.admin_port ctx.garage.admin_port
)) ))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };
@ -487,7 +490,7 @@ async fn test_website_check_domain() {
"http://127.0.0.1:{}/check?domain=%E2%98%B9", "http://127.0.0.1:{}/check?domain=%E2%98%B9",
ctx.garage.admin_port ctx.garage.admin_port
)) ))
.body(Body::empty()) .body(Body::new(Bytes::new()))
.unwrap() .unwrap()
}; };

View file

@ -13,11 +13,13 @@ base64.workspace = true
sha2.workspace = true sha2.workspace = true
hex.workspace = true hex.workspace = true
http.workspace = true http.workspace = true
http-body-util.workspace = true
log.workspace = true log.workspace = true
aws-sigv4.workspace = true aws-sigv4.workspace = true
aws-sdk-config.workspace = true aws-sdk-config.workspace = true
percent-encoding.workspace = true percent-encoding.workspace = true
hyper = { workspace = true, default-features = false, features = ["client", "http1", "http2"] } hyper = { workspace = true, default-features = false, features = ["http1", "http2"] }
hyper-util.workspace = true
hyper-rustls.workspace = true hyper-rustls.workspace = true
serde.workspace = true serde.workspace = true
serde_json.workspace = true serde_json.workspace = true

View file

@ -22,6 +22,8 @@ pub enum Error {
Http(#[from] http::Error), Http(#[from] http::Error),
#[error("hyper error: {0}")] #[error("hyper error: {0}")]
Hyper(#[from] hyper::Error), Hyper(#[from] hyper::Error),
#[error("hyper client error: {0}")]
HyperClient(#[from] hyper_util::client::legacy::Error),
#[error("invalid header: {0}")] #[error("invalid header: {0}")]
Header(#[from] hyper::header::ToStrError), Header(#[from] hyper::header::ToStrError),
#[error("deserialization error: {0}")] #[error("deserialization error: {0}")]

View file

@ -9,9 +9,11 @@ use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
use http::header::{ACCEPT, CONTENT_TYPE}; use http::header::{ACCEPT, CONTENT_TYPE};
use http::status::StatusCode; use http::status::StatusCode;
use http::{HeaderName, HeaderValue, Request}; use http::{HeaderName, HeaderValue, Request};
use hyper::{body::Bytes, body::HttpBody, Body}; use http_body_util::{BodyExt, Full as FullBody};
use hyper::{client::connect::HttpConnector, Client as HttpClient}; use hyper::{body::Body as BodyTrait, body::Bytes};
use hyper_rustls::HttpsConnector; use hyper_rustls::HttpsConnector;
use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
use hyper_util::rt::TokioExecutor;
use aws_sdk_config::config::Credentials; use aws_sdk_config::config::Credentials;
use aws_sigv4::http_request::{sign, SignableBody, SignableRequest, SigningSettings}; use aws_sigv4::http_request::{sign, SignableBody, SignableRequest, SigningSettings};
@ -24,6 +26,8 @@ mod error;
pub use error::Error; pub use error::Error;
pub type Body = FullBody<Bytes>;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300); const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
const SERVICE: &str = "k2v"; const SERVICE: &str = "k2v";
@ -55,19 +59,19 @@ pub struct K2vClientConfig {
pub struct K2vClient { pub struct K2vClient {
config: K2vClientConfig, config: K2vClientConfig,
user_agent: HeaderValue, user_agent: HeaderValue,
client: HttpClient<HttpsConnector<HttpConnector>>, client: HttpClient<HttpsConnector<HttpConnector>, Body>,
} }
impl K2vClient { impl K2vClient {
/// Create a new K2V client. /// Create a new K2V client.
pub fn new(config: K2vClientConfig) -> Result<Self, Error> { pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
let connector = hyper_rustls::HttpsConnectorBuilder::new() let connector = hyper_rustls::HttpsConnectorBuilder::new()
.with_native_roots() .with_native_roots()?
.https_or_http() .https_or_http()
.enable_http1() .enable_http1()
.enable_http2() .enable_http2()
.build(); .build();
let client = HttpClient::builder().build(connector); let client = HttpClient::builder(TokioExecutor::new()).build(connector);
let user_agent: std::borrow::Cow<str> = match &config.user_agent { let user_agent: std::borrow::Cow<str> = match &config.user_agent {
Some(ua) => ua.into(), Some(ua) => ua.into(),
None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(), None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
@ -395,7 +399,7 @@ impl K2vClient {
// Sign and then apply the signature to the request // Sign and then apply the signature to the request
let (signing_instructions, _signature) = let (signing_instructions, _signature) =
sign(signable_request, &signing_params)?.into_parts(); sign(signable_request, &signing_params)?.into_parts();
signing_instructions.apply_to_request_http0x(&mut req); signing_instructions.apply_to_request_http1x(&mut req);
// Send and wait for timeout // Send and wait for timeout
let res = tokio::select! { let res = tokio::select! {
@ -416,7 +420,7 @@ impl K2vClient {
}; };
let body = match res.status { let body = match res.status {
StatusCode::OK => body.collect().await?.to_bytes(), StatusCode::OK => BodyExt::collect(body).await?.to_bytes(),
StatusCode::NO_CONTENT => Bytes::new(), StatusCode::NO_CONTENT => Bytes::new(),
StatusCode::NOT_FOUND => return Err(Error::NotFound), StatusCode::NOT_FOUND => return Err(Error::NotFound),
StatusCode::NOT_MODIFIED => Bytes::new(), StatusCode::NOT_MODIFIED => Bytes::new(),

View file

@ -4,16 +4,12 @@ use std::{convert::Infallible, sync::Arc};
use futures::future::Future; use futures::future::Future;
use hyper::server::conn::http1;
use hyper::{ use hyper::{
body::Incoming as IncomingBody, body::Incoming as IncomingBody,
header::{HeaderValue, HOST}, header::{HeaderValue, HOST},
service::service_fn,
Method, Request, Response, StatusCode, Method, Request, Response, StatusCode,
}; };
use hyper_util::rt::TokioIo;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::{TcpListener, UnixListener}; use tokio::net::{TcpListener, UnixListener};
use opentelemetry::{ use opentelemetry::{
@ -25,6 +21,7 @@ use opentelemetry::{
use crate::error::*; use crate::error::*;
use garage_api::generic_server::{server_loop, UnixListenerOn};
use garage_api::helpers::*; use garage_api::helpers::*;
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket}; use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
use garage_api::s3::error::{ use garage_api::s3::error::{
@ -75,35 +72,29 @@ pub struct WebServer {
impl WebServer { impl WebServer {
/// Run a web server /// Run a web server
pub async fn run( pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> {
garage: Arc<Garage>,
bind_addr: UnixOrTCPSocketAddress,
root_domain: String,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
let metrics = Arc::new(WebMetrics::new()); let metrics = Arc::new(WebMetrics::new());
let web_server = Arc::new(WebServer { Arc::new(WebServer {
garage, garage,
metrics, metrics,
root_domain, root_domain,
}); })
}
pub async fn run(
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
shutdown_signal: impl Future<Output = ()>,
) -> Result<(), GarageError> {
info!("Web server listening on {}", bind_addr); info!("Web server listening on {}", bind_addr);
tokio::pin!(shutdown_signal);
match bind_addr { match bind_addr {
UnixOrTCPSocketAddress::TCPSocket(addr) => { UnixOrTCPSocketAddress::TCPSocket(addr) => {
let listener = TcpListener::bind(addr).await?; let listener = TcpListener::bind(addr).await?;
loop { let handler =
let (stream, client_addr) = tokio::select! { move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
acc = listener.accept() => acc?, server_loop(listener, handler, shutdown_signal).await
_ = &mut shutdown_signal => break,
};
web_server.launch_handler(stream, client_addr.to_string());
}
} }
UnixOrTCPSocketAddress::UnixSocket(ref path) => { UnixOrTCPSocketAddress::UnixSocket(ref path) => {
if path.exists() { if path.exists() {
@ -111,50 +102,22 @@ impl WebServer {
} }
let listener = UnixListener::bind(path)?; let listener = UnixListener::bind(path)?;
let listener = UnixListenerOn(listener, path.display().to_string());
fs::set_permissions(path, Permissions::from_mode(0o222))?; fs::set_permissions(path, Permissions::from_mode(0o222))?;
loop { let handler =
let (stream, _) = tokio::select! { move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
acc = listener.accept() => acc?, server_loop(listener, handler, shutdown_signal).await
_ = &mut shutdown_signal => break,
};
web_server.launch_handler(stream, path.display().to_string());
}
} }
}; }
Ok(())
}
fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String)
where
S: AsyncRead + AsyncWrite + Send + Sync + 'static,
{
let this = self.clone();
let io = TokioIo::new(stream);
let serve = move |req: Request<IncomingBody>| {
this.clone().handle_request(req, client_addr.to_string())
};
tokio::task::spawn(async move {
let io = Box::pin(io);
if let Err(e) = http1::Builder::new()
.serve_connection(io, service_fn(serve))
.await
{
debug!("Error handling HTTP connection: {}", e);
}
});
} }
async fn handle_request( async fn handle_request(
self: Arc<Self>, self: Arc<Self>,
req: Request<IncomingBody>, req: Request<IncomingBody>,
addr: String, addr: String,
) -> Result<Response<BoxBody<Error>>, Infallible> { ) -> Result<Response<BoxBody<Error>>, http::Error> {
if let Ok(forwarded_for_ip_addr) = if let Ok(forwarded_for_ip_addr) =
forwarded_headers::handle_forwarded_for_headers(req.headers()) forwarded_headers::handle_forwarded_for_headers(req.headers())
{ {
@ -280,7 +243,9 @@ impl WebServer {
); );
let ret_doc = match *req.method() { let ret_doc = match *req.method() {
Method::OPTIONS => handle_options_for_bucket(req, &bucket), Method::OPTIONS => handle_options_for_bucket(req, &bucket)
.map_err(ApiError::from)
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await, Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await, Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await,
_ => Err(ApiError::bad_request("HTTP method not supported")), _ => Err(ApiError::bad_request("HTTP method not supported")),