forked from Deuxfleurs/garage
Compare commits
No commits in common. "620664ee9c550636fdbcdf57e428569ac1694603" and "0bb5b77530ad432e4c77f13b395fe74613812337" have entirely different histories.
620664ee9c
...
0bb5b77530
39 changed files with 420 additions and 716 deletions
102
Cargo.lock
generated
102
Cargo.lock
generated
|
@ -1299,9 +1299,7 @@ dependencies = [
|
|||
"hex",
|
||||
"hmac",
|
||||
"http 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.1.0",
|
||||
"hyper-util",
|
||||
"k2v-client",
|
||||
"kuska-sodiumoxide",
|
||||
"mktemp",
|
||||
|
@ -1908,7 +1906,7 @@ dependencies = [
|
|||
"hyper 0.14.28",
|
||||
"log",
|
||||
"rustls 0.20.9",
|
||||
"rustls-native-certs 0.6.3",
|
||||
"rustls-native-certs",
|
||||
"tokio",
|
||||
"tokio-rustls 0.23.4",
|
||||
]
|
||||
|
@ -1924,30 +1922,11 @@ dependencies = [
|
|||
"hyper 0.14.28",
|
||||
"log",
|
||||
"rustls 0.21.10",
|
||||
"rustls-native-certs 0.6.3",
|
||||
"rustls-native-certs",
|
||||
"tokio",
|
||||
"tokio-rustls 0.24.1",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-rustls"
|
||||
version = "0.26.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"http 1.0.0",
|
||||
"hyper 1.1.0",
|
||||
"hyper-util",
|
||||
"log",
|
||||
"rustls 0.22.2",
|
||||
"rustls-native-certs 0.7.0",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
"tokio-rustls 0.25.0",
|
||||
"tower-service",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "hyper-timeout"
|
||||
version = "0.4.1"
|
||||
|
@ -2144,10 +2123,8 @@ dependencies = [
|
|||
"format_table",
|
||||
"hex",
|
||||
"http 1.0.0",
|
||||
"http-body-util",
|
||||
"hyper 1.1.0",
|
||||
"hyper-rustls 0.26.0",
|
||||
"hyper-util",
|
||||
"hyper-rustls 0.24.2",
|
||||
"log",
|
||||
"percent-encoding",
|
||||
"serde",
|
||||
|
@ -2212,7 +2189,7 @@ dependencies = [
|
|||
"pem",
|
||||
"pin-project",
|
||||
"rustls 0.20.9",
|
||||
"rustls-pemfile 1.0.4",
|
||||
"rustls-pemfile",
|
||||
"secrecy",
|
||||
"serde",
|
||||
"serde_json",
|
||||
|
@ -3211,7 +3188,7 @@ dependencies = [
|
|||
"percent-encoding",
|
||||
"pin-project-lite",
|
||||
"rustls 0.21.10",
|
||||
"rustls-pemfile 1.0.4",
|
||||
"rustls-pemfile",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_urlencoded",
|
||||
|
@ -3357,24 +3334,10 @@ checksum = "f9d5a6813c0759e4609cd494e8e725babae6a2ca7b62a5536a13daaec6fcb7ba"
|
|||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.7",
|
||||
"rustls-webpki 0.101.7",
|
||||
"rustls-webpki",
|
||||
"sct",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls"
|
||||
version = "0.22.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"
|
||||
dependencies = [
|
||||
"log",
|
||||
"ring 0.17.7",
|
||||
"rustls-pki-types",
|
||||
"rustls-webpki 0.102.2",
|
||||
"subtle",
|
||||
"zeroize",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.6.3"
|
||||
|
@ -3382,20 +3345,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
|||
checksum = "a9aace74cb666635c918e9c12bc0d348266037aa8eb599b5cba565709a8dff00"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile 1.0.4",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-native-certs"
|
||||
version = "0.7.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"
|
||||
dependencies = [
|
||||
"openssl-probe",
|
||||
"rustls-pemfile 2.0.0",
|
||||
"rustls-pki-types",
|
||||
"rustls-pemfile",
|
||||
"schannel",
|
||||
"security-framework",
|
||||
]
|
||||
|
@ -3409,22 +3359,6 @@ dependencies = [
|
|||
"base64 0.21.7",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pemfile"
|
||||
version = "2.0.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"
|
||||
dependencies = [
|
||||
"base64 0.21.7",
|
||||
"rustls-pki-types",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-pki-types"
|
||||
version = "1.2.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf"
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.101.7"
|
||||
|
@ -3435,17 +3369,6 @@ dependencies = [
|
|||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustls-webpki"
|
||||
version = "0.102.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"
|
||||
dependencies = [
|
||||
"ring 0.17.7",
|
||||
"rustls-pki-types",
|
||||
"untrusted 0.9.0",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "rustversion"
|
||||
version = "1.0.14"
|
||||
|
@ -4075,17 +3998,6 @@ dependencies = [
|
|||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-rustls"
|
||||
version = "0.25.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"
|
||||
dependencies = [
|
||||
"rustls 0.22.2",
|
||||
"rustls-pki-types",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "tokio-stream"
|
||||
version = "0.1.14"
|
||||
|
|
134
Cargo.nix
134
Cargo.nix
|
@ -33,7 +33,7 @@ args@{
|
|||
ignoreLockHash,
|
||||
}:
|
||||
let
|
||||
nixifiedLockHash = "ae22401ab1ffcc653dc14ce17eaece13d9fb2dad1422ae9ada91c1757cc85575";
|
||||
nixifiedLockHash = "b09e8e1592cb6ec8175708b13ee4a2578aa697c18a94d5a545328078ab263b2f";
|
||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||
lockHashIgnored = if ignoreLockHash
|
||||
|
@ -1917,9 +1917,7 @@ in
|
|||
chrono = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.33" { inherit profileName; }).out;
|
||||
hmac = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hmac."0.12.1" { inherit profileName; }).out;
|
||||
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
|
||||
http_body_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" { inherit profileName; }).out;
|
||||
hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out;
|
||||
hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out;
|
||||
k2v_client = (rustPackages."unknown".k2v-client."0.0.4" { inherit profileName; }).out;
|
||||
mktemp = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.5.1" { inherit profileName; }).out;
|
||||
serde_json = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_json."1.0.113" { inherit profileName; }).out;
|
||||
|
@ -2733,37 +2731,6 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.26.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "hyper-rustls";
|
||||
version = "0.26.0";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "a0bea761b46ae2b24eb4aef630d8d1c398157b6fc29e6350ecf090a0b70c952c"; };
|
||||
features = builtins.concatLists [
|
||||
[ "default" ]
|
||||
[ "http1" ]
|
||||
[ "http2" ]
|
||||
[ "log" ]
|
||||
[ "logging" ]
|
||||
[ "native-tokio" ]
|
||||
[ "ring" ]
|
||||
[ "rustls-native-certs" ]
|
||||
[ "tls12" ]
|
||||
];
|
||||
dependencies = {
|
||||
futures_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.30" { inherit profileName; }).out;
|
||||
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
|
||||
hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out;
|
||||
hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out;
|
||||
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out;
|
||||
rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls."0.22.2" { inherit profileName; }).out;
|
||||
rustls_native_certs = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.7.0" { inherit profileName; }).out;
|
||||
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
|
||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
|
||||
tokio_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio-rustls."0.25.0" { inherit profileName; }).out;
|
||||
tower_service = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tower-service."0.3.2" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".hyper-timeout."0.4.1" = overridableMkRustCrate (profileName: rec {
|
||||
name = "hyper-timeout";
|
||||
version = "0.4.1";
|
||||
|
@ -3040,10 +3007,8 @@ in
|
|||
${ if rootFeatures' ? "k2v-client/cli" || rootFeatures' ? "k2v-client/format_table" then "format_table" else null } = (rustPackages."unknown".format_table."0.1.1" { inherit profileName; }).out;
|
||||
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
|
||||
http = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."1.0.0" { inherit profileName; }).out;
|
||||
http_body_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".http-body-util."0.1.0" { inherit profileName; }).out;
|
||||
hyper = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."1.1.0" { inherit profileName; }).out;
|
||||
hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.26.0" { inherit profileName; }).out;
|
||||
hyper_util = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-util."0.1.3" { inherit profileName; }).out;
|
||||
hyper_rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper-rustls."0.24.2" { inherit profileName; }).out;
|
||||
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out;
|
||||
percent_encoding = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".percent-encoding."2.3.1" { inherit profileName; }).out;
|
||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
|
||||
|
@ -4824,27 +4789,6 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls."0.22.2" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls";
|
||||
version = "0.22.2";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "e87c9956bd9807afa1f77e0f7594af32566e830e088a5576d27c5b6f30f49d41"; };
|
||||
features = builtins.concatLists [
|
||||
[ "log" ]
|
||||
[ "logging" ]
|
||||
[ "ring" ]
|
||||
[ "tls12" ]
|
||||
];
|
||||
dependencies = {
|
||||
log = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.20" { inherit profileName; }).out;
|
||||
ring = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ring."0.17.7" { inherit profileName; }).out;
|
||||
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
|
||||
webpki = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.102.2" { inherit profileName; }).out;
|
||||
subtle = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".subtle."2.5.0" { inherit profileName; }).out;
|
||||
zeroize = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".zeroize."1.7.0" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.6.3" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-native-certs";
|
||||
version = "0.6.3";
|
||||
|
@ -4858,20 +4802,6 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-native-certs."0.7.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-native-certs";
|
||||
version = "0.7.0";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "8f1fb85efa936c42c6d5fc28d2629bb51e4b2f4b8a5211e297d599cc5a093792"; };
|
||||
dependencies = {
|
||||
${ if hostPlatform.isUnix && !(hostPlatform.parsed.kernel.name == "darwin") then "openssl_probe" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl-probe."0.1.5" { inherit profileName; }).out;
|
||||
rustls_pemfile = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."2.0.0" { inherit profileName; }).out;
|
||||
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
|
||||
${ if hostPlatform.isWindows then "schannel" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".schannel."0.1.23" { inherit profileName; }).out;
|
||||
${ if hostPlatform.parsed.kernel.name == "darwin" then "security_framework" else null } = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".security-framework."2.9.2" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."1.0.4" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-pemfile";
|
||||
version = "1.0.4";
|
||||
|
@ -4882,33 +4812,6 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-pemfile."2.0.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-pemfile";
|
||||
version = "2.0.0";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "35e4980fa29e4c4b212ffb3db068a564cbf560e51d3944b7c88bd8bf5bec64f4"; };
|
||||
features = builtins.concatLists [
|
||||
[ "default" ]
|
||||
[ "std" ]
|
||||
];
|
||||
dependencies = {
|
||||
base64 = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".base64."0.21.7" { inherit profileName; }).out;
|
||||
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-pki-types";
|
||||
version = "1.2.0";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "0a716eb65e3158e90e17cd93d855216e27bde02745ab842f2cab4a39dba1bacf"; };
|
||||
features = builtins.concatLists [
|
||||
[ "alloc" ]
|
||||
[ "default" ]
|
||||
[ "std" ]
|
||||
];
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.101.7" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-webpki";
|
||||
version = "0.101.7";
|
||||
|
@ -4925,23 +4828,6 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustls-webpki."0.102.2" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustls-webpki";
|
||||
version = "0.102.2";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "faaa0a62740bedb9b2ef5afa303da42764c012f743917351dc9a237ea1663610"; };
|
||||
features = builtins.concatLists [
|
||||
[ "alloc" ]
|
||||
[ "ring" ]
|
||||
[ "std" ]
|
||||
];
|
||||
dependencies = {
|
||||
ring = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".ring."0.17.7" { inherit profileName; }).out;
|
||||
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
|
||||
untrusted = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.9.0" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".rustversion."1.0.14" = overridableMkRustCrate (profileName: rec {
|
||||
name = "rustversion";
|
||||
version = "1.0.14";
|
||||
|
@ -5815,22 +5701,6 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".tokio-rustls."0.25.0" = overridableMkRustCrate (profileName: rec {
|
||||
name = "tokio-rustls";
|
||||
version = "0.25.0";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "775e0c0f0adb3a2f22a00c4745d728b479985fc15ee7ca6a2608388c5569860f"; };
|
||||
features = builtins.concatLists [
|
||||
[ "logging" ]
|
||||
[ "tls12" ]
|
||||
];
|
||||
dependencies = {
|
||||
rustls = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls."0.22.2" { inherit profileName; }).out;
|
||||
pki_types = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rustls-pki-types."1.2.0" { inherit profileName; }).out;
|
||||
tokio = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.36.0" { inherit profileName; }).out;
|
||||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".tokio-stream."0.1.14" = overridableMkRustCrate (profileName: rec {
|
||||
name = "tokio-stream";
|
||||
version = "0.1.14";
|
||||
|
|
13
Cargo.toml
13
Cargo.toml
|
@ -98,7 +98,7 @@ httpdate = "1.0"
|
|||
http-range = "0.1"
|
||||
http-body-util = "0.1"
|
||||
hyper = { version = "1.0", features = ["server", "http1"] }
|
||||
hyper-util = { version = "0.1", features = [ "full" ] }
|
||||
hyper-util = { verion = "0.1", features = [ "full" ]}
|
||||
multer = "3.0"
|
||||
percent-encoding = "2.2"
|
||||
roxmltree = "0.19"
|
||||
|
@ -116,8 +116,8 @@ opentelemetry-otlp = "0.10"
|
|||
prometheus = "0.13"
|
||||
|
||||
# used by the k2v-client crate only
|
||||
aws-sigv4 = {version = "1.1" }
|
||||
hyper-rustls = { version = "0.26", features = ["http2"] }
|
||||
aws-sigv4 = {version = "1.1", features = ["http0-compat"] }
|
||||
hyper-rustls = { version = "0.24", features = ["http2"] }
|
||||
log = "0.4"
|
||||
thiserror = "1.0"
|
||||
|
||||
|
@ -131,10 +131,7 @@ aws-sdk-config = "1.13"
|
|||
aws-sdk-s3 = "1.14"
|
||||
|
||||
[profile.dev]
|
||||
lto = "thin"
|
||||
lto = "off"
|
||||
|
||||
[profile.release]
|
||||
lto = true
|
||||
codegen-units = 1
|
||||
opt-level = "s"
|
||||
strip = true
|
||||
debug = true
|
||||
|
|
|
@ -271,7 +271,7 @@ pub async fn handle_create_bucket(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<CreateBucketRequest, _, Error>(req).await?;
|
||||
let req = parse_json_body::<CreateBucketRequest>(req).await?;
|
||||
|
||||
if let Some(ga) = &req.global_alias {
|
||||
if !is_valid_bucket_name(ga) {
|
||||
|
@ -412,7 +412,7 @@ pub async fn handle_update_bucket(
|
|||
id: String,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<UpdateBucketRequest, _, Error>(req).await?;
|
||||
let req = parse_json_body::<UpdateBucketRequest>(req).await?;
|
||||
let bucket_id = parse_bucket_id(&id)?;
|
||||
|
||||
let mut bucket = garage
|
||||
|
@ -474,7 +474,7 @@ pub async fn handle_bucket_change_key_perm(
|
|||
req: Request<IncomingBody>,
|
||||
new_perm_flag: bool,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<BucketKeyPermChangeRequest, _, Error>(req).await?;
|
||||
let req = parse_json_body::<BucketKeyPermChangeRequest>(req).await?;
|
||||
|
||||
let bucket_id = parse_bucket_id(&req.bucket_id)?;
|
||||
|
||||
|
|
|
@ -64,7 +64,7 @@ pub async fn handle_connect_cluster_nodes(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<Vec<String>, _, Error>(req).await?;
|
||||
let req = parse_json_body::<Vec<String>>(req).await?;
|
||||
|
||||
let res = futures::future::join_all(req.iter().map(|node| garage.system.connect(node)))
|
||||
.await
|
||||
|
@ -206,7 +206,7 @@ pub async fn handle_update_cluster_layout(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let updates = parse_json_body::<UpdateClusterLayoutRequest, _, Error>(req).await?;
|
||||
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
|
||||
|
||||
let mut layout = garage.system.get_cluster_layout();
|
||||
|
||||
|
@ -246,7 +246,7 @@ pub async fn handle_apply_cluster_layout(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
|
||||
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
||||
|
||||
let layout = garage.system.get_cluster_layout();
|
||||
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
||||
|
@ -264,7 +264,7 @@ pub async fn handle_revert_cluster_layout(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let param = parse_json_body::<ApplyRevertLayoutRequest, _, Error>(req).await?;
|
||||
let param = parse_json_body::<ApplyRevertLayoutRequest>(req).await?;
|
||||
|
||||
let layout = garage.system.get_cluster_layout();
|
||||
let layout = layout.revert_staged_changes(Some(param.version))?;
|
||||
|
|
|
@ -7,7 +7,7 @@ pub use garage_model::helper::error::Error as HelperError;
|
|||
use crate::common_error::CommonError;
|
||||
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
|
||||
use crate::generic_server::ApiError;
|
||||
use crate::helpers::*;
|
||||
use crate::helpers::{BytesBody, CustomApiErrorBody};
|
||||
|
||||
/// Errors of this crate
|
||||
#[derive(Debug, Error)]
|
||||
|
@ -40,6 +40,18 @@ where
|
|||
|
||||
impl CommonErrorDerivative for Error {}
|
||||
|
||||
impl From<HelperError> for Error {
|
||||
fn from(err: HelperError) -> Self {
|
||||
match err {
|
||||
HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
|
||||
HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
|
||||
HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
|
||||
HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
|
||||
HelperError::NoSuchAccessKey(n) => Self::NoSuchAccessKey(n),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Error {
|
||||
fn code(&self) -> &'static str {
|
||||
match self {
|
||||
|
@ -65,7 +77,7 @@ impl ApiError for Error {
|
|||
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
}
|
||||
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> BytesBody {
|
||||
let error = CustomApiErrorBody {
|
||||
code: self.code().to_string(),
|
||||
message: format!("{}", self),
|
||||
|
@ -81,6 +93,6 @@ impl ApiError for Error {
|
|||
"#
|
||||
.into()
|
||||
});
|
||||
error_body(error_str)
|
||||
BytesBody::from(bytes::Bytes::from(error_str.into_bytes()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -65,7 +65,7 @@ pub async fn handle_create_key(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<CreateKeyRequest, _, Error>(req).await?;
|
||||
let req = parse_json_body::<CreateKeyRequest>(req).await?;
|
||||
|
||||
let key = Key::new(req.name.as_deref().unwrap_or("Unnamed key"));
|
||||
garage.key_table.insert(&key).await?;
|
||||
|
@ -83,7 +83,7 @@ pub async fn handle_import_key(
|
|||
garage: &Arc<Garage>,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<ImportKeyRequest, _, Error>(req).await?;
|
||||
let req = parse_json_body::<ImportKeyRequest>(req).await?;
|
||||
|
||||
let prev_key = garage.key_table.get(&EmptyKey, &req.access_key_id).await?;
|
||||
if prev_key.is_some() {
|
||||
|
@ -114,7 +114,7 @@ pub async fn handle_update_key(
|
|||
id: String,
|
||||
req: Request<IncomingBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let req = parse_json_body::<UpdateKeyRequest, _, Error>(req).await?;
|
||||
let req = parse_json_body::<UpdateKeyRequest>(req).await?;
|
||||
|
||||
let mut key = garage.key_helper().get_existing_key(&id).await?;
|
||||
|
||||
|
|
|
@ -3,8 +3,6 @@ use hyper::StatusCode;
|
|||
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
use garage_model::helper::error::Error as HelperError;
|
||||
|
||||
/// Errors of this crate
|
||||
#[derive(Debug, Error)]
|
||||
pub enum CommonError {
|
||||
|
@ -30,10 +28,6 @@ pub enum CommonError {
|
|||
#[error(display = "Bad request: {}", _0)]
|
||||
BadRequest(String),
|
||||
|
||||
/// The client sent a header with invalid value
|
||||
#[error(display = "Invalid header value: {}", _0)]
|
||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||
|
||||
// ---- SPECIFIC ERROR CONDITIONS ----
|
||||
// These have to be error codes referenced in the S3 spec here:
|
||||
// https://docs.aws.amazon.com/AmazonS3/latest/API/ErrorResponses.html#ErrorCodeList
|
||||
|
@ -70,9 +64,7 @@ impl CommonError {
|
|||
CommonError::Forbidden(_) => StatusCode::FORBIDDEN,
|
||||
CommonError::NoSuchBucket(_) => StatusCode::NOT_FOUND,
|
||||
CommonError::BucketNotEmpty | CommonError::BucketAlreadyExists => StatusCode::CONFLICT,
|
||||
CommonError::InvalidBucketName(_) | CommonError::InvalidHeader(_) => {
|
||||
StatusCode::BAD_REQUEST
|
||||
}
|
||||
CommonError::InvalidBucketName(_) => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -92,7 +84,6 @@ impl CommonError {
|
|||
CommonError::BucketAlreadyExists => "BucketAlreadyExists",
|
||||
CommonError::BucketNotEmpty => "BucketNotEmpty",
|
||||
CommonError::InvalidBucketName(_) => "InvalidBucketName",
|
||||
CommonError::InvalidHeader(_) => "InvalidHeaderValue",
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,18 +92,6 @@ impl CommonError {
|
|||
}
|
||||
}
|
||||
|
||||
impl From<HelperError> for CommonError {
|
||||
fn from(err: HelperError) -> Self {
|
||||
match err {
|
||||
HelperError::Internal(i) => Self::InternalError(i),
|
||||
HelperError::BadRequest(b) => Self::BadRequest(b),
|
||||
HelperError::InvalidBucketName(n) => Self::InvalidBucketName(n),
|
||||
HelperError::NoSuchBucket(n) => Self::NoSuchBucket(n),
|
||||
e => Self::bad_request(format!("{}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub trait CommonErrorDerivative: From<CommonError> {
|
||||
fn internal_error<M: ToString>(msg: M) -> Self {
|
||||
Self::from(CommonError::InternalError(GarageError::Message(
|
||||
|
|
|
@ -1,4 +1,3 @@
|
|||
use std::convert::Infallible;
|
||||
use std::fs::{self, Permissions};
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
use std::sync::Arc;
|
||||
|
@ -6,7 +5,6 @@ use std::sync::Arc;
|
|||
use async_trait::async_trait;
|
||||
|
||||
use futures::future::Future;
|
||||
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
|
||||
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::header::HeaderValue;
|
||||
|
@ -17,7 +15,7 @@ use hyper::{HeaderMap, StatusCode};
|
|||
use hyper_util::rt::TokioIo;
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::{TcpListener, TcpStream, UnixListener, UnixStream};
|
||||
use tokio::net::{TcpListener, UnixListener};
|
||||
|
||||
use opentelemetry::{
|
||||
global,
|
||||
|
@ -31,7 +29,7 @@ use garage_util::forwarded_headers;
|
|||
use garage_util::metrics::{gen_trace_id, RecordDuration};
|
||||
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||
|
||||
use crate::helpers::{BoxBody, ErrorBody};
|
||||
use crate::helpers::{BoxBody, BytesBody};
|
||||
|
||||
pub(crate) trait ApiEndpoint: Send + Sync + 'static {
|
||||
fn name(&self) -> &'static str;
|
||||
|
@ -41,7 +39,7 @@ pub(crate) trait ApiEndpoint: Send + Sync + 'static {
|
|||
pub trait ApiError: std::error::Error + Send + Sync + 'static {
|
||||
fn http_status_code(&self) -> StatusCode;
|
||||
fn add_http_headers(&self, header_map: &mut HeaderMap<HeaderValue>);
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> BytesBody;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -112,12 +110,20 @@ impl<A: ApiHandler> ApiServer<A> {
|
|||
bind_addr
|
||||
);
|
||||
|
||||
tokio::pin!(shutdown_signal);
|
||||
|
||||
match bind_addr {
|
||||
UnixOrTCPSocketAddress::TCPSocket(addr) => {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
|
||||
server_loop(listener, handler, shutdown_signal).await
|
||||
loop {
|
||||
let (stream, client_addr) = tokio::select! {
|
||||
acc = listener.accept() => acc?,
|
||||
_ = &mut shutdown_signal => break,
|
||||
};
|
||||
|
||||
self.launch_handler(stream, client_addr.to_string());
|
||||
}
|
||||
}
|
||||
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
|
||||
if path.exists() {
|
||||
|
@ -125,24 +131,52 @@ impl<A: ApiHandler> ApiServer<A> {
|
|||
}
|
||||
|
||||
let listener = UnixListener::bind(path)?;
|
||||
let listener = UnixListenerOn(listener, path.display().to_string());
|
||||
|
||||
fs::set_permissions(
|
||||
path,
|
||||
Permissions::from_mode(unix_bind_addr_mode.unwrap_or(0o222)),
|
||||
)?;
|
||||
|
||||
let handler = move |request, socketaddr| self.clone().handler(request, socketaddr);
|
||||
server_loop(listener, handler, shutdown_signal).await
|
||||
loop {
|
||||
let (stream, _) = tokio::select! {
|
||||
acc = listener.accept() => acc?,
|
||||
_ = &mut shutdown_signal => break,
|
||||
};
|
||||
|
||||
self.launch_handler(stream, path.display().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String)
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Send + Sync + 'static,
|
||||
{
|
||||
let this = self.clone();
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
let serve =
|
||||
move |req: Request<IncomingBody>| this.clone().handler(req, client_addr.to_string());
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let io = Box::pin(io);
|
||||
if let Err(e) = http1::Builder::new()
|
||||
.serve_connection(io, service_fn(serve))
|
||||
.await
|
||||
{
|
||||
debug!("Error handling HTTP connection: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn handler(
|
||||
self: Arc<Self>,
|
||||
req: Request<IncomingBody>,
|
||||
addr: String,
|
||||
) -> Result<Response<BoxBody<A::Error>>, http::Error> {
|
||||
) -> Result<Response<BoxBody<A::Error>>, GarageError> {
|
||||
let uri = req.uri().clone();
|
||||
|
||||
if let Ok(forwarded_for_ip_addr) =
|
||||
|
@ -195,8 +229,7 @@ impl<A: ApiHandler> ApiServer<A> {
|
|||
} else {
|
||||
info!("Response: error {}, {}", e.http_status_code(), e);
|
||||
}
|
||||
Ok(http_error
|
||||
.map(|body| BoxBody::new(body.map_err(|_: Infallible| unreachable!()))))
|
||||
Ok(http_error.map(|body| BoxBody::new(body.map_err(|_| unreachable!()))))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -245,105 +278,3 @@ impl<A: ApiHandler> ApiServer<A> {
|
|||
res
|
||||
}
|
||||
}
|
||||
|
||||
// ==== helper functions ====
|
||||
|
||||
#[async_trait]
|
||||
pub trait Accept: Send + Sync + 'static {
|
||||
type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
|
||||
async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Accept for TcpListener {
|
||||
type Stream = TcpStream;
|
||||
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
|
||||
self.accept()
|
||||
.await
|
||||
.map(|(stream, addr)| (stream, addr.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
pub struct UnixListenerOn(pub UnixListener, pub String);
|
||||
|
||||
#[async_trait]
|
||||
impl Accept for UnixListenerOn {
|
||||
type Stream = UnixStream;
|
||||
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
|
||||
self.0
|
||||
.accept()
|
||||
.await
|
||||
.map(|(stream, _addr)| (stream, self.1.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn server_loop<A, H, F, E>(
|
||||
listener: A,
|
||||
handler: H,
|
||||
shutdown_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), GarageError>
|
||||
where
|
||||
A: Accept,
|
||||
H: Fn(Request<IncomingBody>, String) -> F + Send + Sync + Clone + 'static,
|
||||
F: Future<Output = Result<Response<BoxBody<E>>, http::Error>> + Send + 'static,
|
||||
E: Send + Sync + std::error::Error + 'static,
|
||||
{
|
||||
tokio::pin!(shutdown_signal);
|
||||
|
||||
let (conn_in, mut conn_out) = tokio::sync::mpsc::unbounded_channel();
|
||||
let connection_collector = tokio::spawn(async move {
|
||||
let mut collection = FuturesUnordered::new();
|
||||
loop {
|
||||
let collect_next = async {
|
||||
if collection.is_empty() {
|
||||
futures::future::pending().await
|
||||
} else {
|
||||
collection.next().await
|
||||
}
|
||||
};
|
||||
tokio::select! {
|
||||
result = collect_next => {
|
||||
trace!("HTTP connection finished: {:?}", result);
|
||||
}
|
||||
new_fut = conn_out.recv() => {
|
||||
match new_fut {
|
||||
Some(f) => collection.push(f),
|
||||
None => break,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("Collecting last open HTTP connections.");
|
||||
while let Some(conn_res) = collection.next().await {
|
||||
trace!("HTTP connection finished: {:?}", conn_res);
|
||||
}
|
||||
debug!("No more HTTP connections to collect");
|
||||
});
|
||||
|
||||
loop {
|
||||
let (stream, client_addr) = tokio::select! {
|
||||
acc = listener.accept() => acc?,
|
||||
_ = &mut shutdown_signal => break,
|
||||
};
|
||||
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
let handler = handler.clone();
|
||||
let serve = move |req: Request<IncomingBody>| handler(req, client_addr.clone());
|
||||
|
||||
let fut = tokio::task::spawn(async move {
|
||||
let io = Box::pin(io);
|
||||
if let Err(e) = http1::Builder::new()
|
||||
.serve_connection(io, service_fn(serve))
|
||||
.await
|
||||
{
|
||||
debug!("Error handling HTTP connection: {}", e);
|
||||
}
|
||||
});
|
||||
conn_in.send(fut)?;
|
||||
}
|
||||
|
||||
connection_collector.await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -1,17 +1,8 @@
|
|||
use std::convert::Infallible;
|
||||
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
|
||||
use http_body_util::{BodyExt, Full as FullBody};
|
||||
use hyper::{
|
||||
body::{Body, Bytes},
|
||||
Request, Response,
|
||||
};
|
||||
use hyper::{body::Incoming as IncomingBody, Request, Response};
|
||||
use idna::domain_to_unicode;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_util::error::Error as GarageError;
|
||||
|
||||
use crate::common_error::{CommonError as Error, *};
|
||||
|
||||
/// What kind of authorization is required to perform a given action
|
||||
|
@ -150,62 +141,34 @@ pub fn key_after_prefix(pfx: &str) -> Option<String> {
|
|||
|
||||
// =============== body helpers =================
|
||||
|
||||
pub type EmptyBody = http_body_util::Empty<bytes::Bytes>;
|
||||
pub type ErrorBody = FullBody<bytes::Bytes>;
|
||||
pub type BytesBody = FullBody<bytes::Bytes>;
|
||||
pub type BoxBody<E> = http_body_util::combinators::BoxBody<bytes::Bytes, E>;
|
||||
|
||||
pub fn string_body<E>(s: String) -> BoxBody<E> {
|
||||
bytes_body(bytes::Bytes::from(s.into_bytes()))
|
||||
}
|
||||
pub fn bytes_body<E>(b: bytes::Bytes) -> BoxBody<E> {
|
||||
BoxBody::new(FullBody::new(b).map_err(|_: Infallible| unreachable!()))
|
||||
BoxBody::new(FullBody::new(b).map_err(|_| unreachable!()))
|
||||
}
|
||||
pub fn empty_body<E>() -> BoxBody<E> {
|
||||
BoxBody::new(http_body_util::Empty::new().map_err(|_: Infallible| unreachable!()))
|
||||
}
|
||||
pub fn error_body(s: String) -> ErrorBody {
|
||||
ErrorBody::from(bytes::Bytes::from(s.into_bytes()))
|
||||
BoxBody::new(http_body_util::Empty::new().map_err(|_| unreachable!()))
|
||||
}
|
||||
|
||||
pub async fn parse_json_body<T, B, E>(req: Request<B>) -> Result<T, E>
|
||||
pub async fn parse_json_body<T>(req: Request<IncomingBody>) -> Result<T, Error>
|
||||
where
|
||||
T: for<'de> Deserialize<'de>,
|
||||
B: Body,
|
||||
E: From<<B as Body>::Error> + From<Error>,
|
||||
{
|
||||
let body = req.into_body().collect().await?.to_bytes();
|
||||
let resp: T = serde_json::from_slice(&body).ok_or_bad_request("Invalid JSON")?;
|
||||
Ok(resp)
|
||||
}
|
||||
|
||||
pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, E>
|
||||
where
|
||||
E: From<Error>,
|
||||
{
|
||||
let resp_json = serde_json::to_string_pretty(res)
|
||||
.map_err(GarageError::from)
|
||||
.map_err(Error::from)?;
|
||||
pub fn json_ok_response<E, T: Serialize>(res: &T) -> Result<Response<BoxBody<E>>, Error> {
|
||||
let resp_json = serde_json::to_string_pretty(res).map_err(garage_util::error::Error::from)?;
|
||||
Ok(Response::builder()
|
||||
.status(hyper::StatusCode::OK)
|
||||
.header(http::header::CONTENT_TYPE, "application/json")
|
||||
.body(string_body(resp_json))
|
||||
.unwrap())
|
||||
}
|
||||
|
||||
pub fn body_stream<B, E>(body: B) -> impl Stream<Item = Result<Bytes, E>>
|
||||
where
|
||||
B: Body<Data = Bytes>,
|
||||
<B as Body>::Error: Into<E>,
|
||||
E: From<Error>,
|
||||
{
|
||||
let stream = http_body_util::BodyStream::new(body);
|
||||
let stream = TryStreamExt::map_err(stream, Into::into);
|
||||
stream.map(|x| {
|
||||
x.and_then(|f| {
|
||||
f.into_data()
|
||||
.map_err(|_| E::from(Error::bad_request("non-data frame")))
|
||||
})
|
||||
})
|
||||
.body(string_body(resp_json))?)
|
||||
}
|
||||
|
||||
pub fn is_default<T: Default + PartialEq>(v: &T) -> bool {
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use async_trait::async_trait;
|
||||
|
||||
use futures::future::Future;
|
||||
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
|
||||
use hyper::{Body, Method, Request, Response};
|
||||
|
||||
use opentelemetry::{trace::SpanRef, KeyValue};
|
||||
|
||||
|
@ -25,9 +25,6 @@ use crate::k2v::item::*;
|
|||
use crate::k2v::router::Endpoint;
|
||||
use crate::s3::cors::*;
|
||||
|
||||
pub use crate::signature::streaming::ReqBody;
|
||||
pub type ResBody = BoxBody<Error>;
|
||||
|
||||
pub struct K2VApiServer {
|
||||
garage: Arc<Garage>,
|
||||
}
|
||||
|
@ -58,7 +55,7 @@ impl ApiHandler for K2VApiServer {
|
|||
type Endpoint = K2VApiEndpoint;
|
||||
type Error = Error;
|
||||
|
||||
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<K2VApiEndpoint, Error> {
|
||||
fn parse_endpoint(&self, req: &Request<Body>) -> Result<K2VApiEndpoint, Error> {
|
||||
let (endpoint, bucket_name) = Endpoint::from_request(req)?;
|
||||
|
||||
Ok(K2VApiEndpoint {
|
||||
|
@ -69,9 +66,9 @@ impl ApiHandler for K2VApiServer {
|
|||
|
||||
async fn handle(
|
||||
&self,
|
||||
req: Request<IncomingBody>,
|
||||
req: Request<Body>,
|
||||
endpoint: K2VApiEndpoint,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let K2VApiEndpoint {
|
||||
bucket_name,
|
||||
endpoint,
|
||||
|
@ -80,10 +77,9 @@ impl ApiHandler for K2VApiServer {
|
|||
|
||||
// The OPTIONS method is procesed early, before we even check for an API key
|
||||
if let Endpoint::Options = endpoint {
|
||||
let options_res = handle_options_api(garage, &req, Some(bucket_name))
|
||||
return Ok(handle_options_s3api(garage, &req, Some(bucket_name))
|
||||
.await
|
||||
.ok_or_bad_request("Error handling OPTIONS")?;
|
||||
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
|
||||
.ok_or_bad_request("Error handling OPTIONS")?);
|
||||
}
|
||||
|
||||
let (api_key, mut content_sha256) = check_payload_signature(&garage, "k2v", &req).await?;
|
||||
|
|
|
@ -1,7 +1,7 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use base64::prelude::*;
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use hyper::{Body, Request, Response, StatusCode};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use garage_util::data::*;
|
||||
|
@ -13,16 +13,15 @@ use garage_model::k2v::causality::*;
|
|||
use garage_model::k2v::item_table::*;
|
||||
|
||||
use crate::helpers::*;
|
||||
use crate::k2v::api_server::{ReqBody, ResBody};
|
||||
use crate::k2v::error::*;
|
||||
use crate::k2v::range::read_range;
|
||||
|
||||
pub async fn handle_insert_batch(
|
||||
garage: Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
req: Request<ReqBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let items = parse_json_body::<Vec<InsertBatchItem>, _, Error>(req).await?;
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let items = parse_json_body::<Vec<InsertBatchItem>>(req).await?;
|
||||
|
||||
let mut items2 = vec![];
|
||||
for it in items {
|
||||
|
@ -42,15 +41,15 @@ pub async fn handle_insert_batch(
|
|||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(empty_body())?)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
|
||||
pub async fn handle_read_batch(
|
||||
garage: Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
req: Request<ReqBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let queries = parse_json_body::<Vec<ReadBatchQuery>, _, Error>(req).await?;
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let queries = parse_json_body::<Vec<ReadBatchQuery>>(req).await?;
|
||||
|
||||
let resp_results = futures::future::join_all(
|
||||
queries
|
||||
|
@ -140,9 +139,9 @@ async fn handle_read_batch_query(
|
|||
pub async fn handle_delete_batch(
|
||||
garage: Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
req: Request<ReqBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let queries = parse_json_body::<Vec<DeleteBatchQuery>, _, Error>(req).await?;
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let queries = parse_json_body::<Vec<DeleteBatchQuery>>(req).await?;
|
||||
|
||||
let resp_results = futures::future::join_all(
|
||||
queries
|
||||
|
@ -254,11 +253,11 @@ pub(crate) async fn handle_poll_range(
|
|||
garage: Arc<Garage>,
|
||||
bucket_id: Uuid,
|
||||
partition_key: &str,
|
||||
req: Request<ReqBody>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>, Error> {
|
||||
use garage_model::k2v::sub::PollRange;
|
||||
|
||||
let query = parse_json_body::<PollRangeQuery, _, Error>(req).await?;
|
||||
let query = parse_json_body::<PollRangeQuery>(req).await?;
|
||||
|
||||
let timeout_msec = query.timeout.unwrap_or(300).clamp(1, 600) * 1000;
|
||||
|
||||
|
@ -293,7 +292,7 @@ pub(crate) async fn handle_poll_range(
|
|||
} else {
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NOT_MODIFIED)
|
||||
.body(empty_body())?)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1,11 +1,13 @@
|
|||
use err_derive::Error;
|
||||
use hyper::header::HeaderValue;
|
||||
use hyper::{HeaderMap, StatusCode};
|
||||
use hyper::{Body, HeaderMap, StatusCode};
|
||||
|
||||
use garage_model::helper::error::Error as HelperError;
|
||||
|
||||
use crate::common_error::CommonError;
|
||||
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
|
||||
use crate::generic_server::ApiError;
|
||||
use crate::helpers::*;
|
||||
use crate::helpers::CustomApiErrorBody;
|
||||
use crate::signature::error::Error as SignatureError;
|
||||
|
||||
/// Errors of this crate
|
||||
|
@ -28,6 +30,10 @@ pub enum Error {
|
|||
#[error(display = "Invalid base64: {}", _0)]
|
||||
InvalidBase64(#[error(source)] base64::DecodeError),
|
||||
|
||||
/// The client sent a header with invalid value
|
||||
#[error(display = "Invalid header value: {}", _0)]
|
||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||
|
||||
/// The client asked for an invalid return format (invalid Accept header)
|
||||
#[error(display = "Not acceptable: {}", _0)]
|
||||
NotAcceptable(String),
|
||||
|
@ -48,6 +54,18 @@ where
|
|||
|
||||
impl CommonErrorDerivative for Error {}
|
||||
|
||||
impl From<HelperError> for Error {
|
||||
fn from(err: HelperError) -> Self {
|
||||
match err {
|
||||
HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
|
||||
HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
|
||||
HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
|
||||
HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
|
||||
e => Self::Common(CommonError::BadRequest(format!("{}", e))),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<SignatureError> for Error {
|
||||
fn from(err: SignatureError) -> Self {
|
||||
match err {
|
||||
|
@ -56,6 +74,7 @@ impl From<SignatureError> for Error {
|
|||
Self::AuthorizationHeaderMalformed(c)
|
||||
}
|
||||
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
|
||||
SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -71,6 +90,7 @@ impl Error {
|
|||
Error::NotAcceptable(_) => "NotAcceptable",
|
||||
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
|
||||
Error::InvalidBase64(_) => "InvalidBase64",
|
||||
Error::InvalidHeader(_) => "InvalidHeaderValue",
|
||||
Error::InvalidUtf8Str(_) => "InvalidUtf8String",
|
||||
}
|
||||
}
|
||||
|
@ -85,6 +105,7 @@ impl ApiError for Error {
|
|||
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
|
||||
Error::AuthorizationHeaderMalformed(_)
|
||||
| Error::InvalidBase64(_)
|
||||
| Error::InvalidHeader(_)
|
||||
| Error::InvalidUtf8Str(_) => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
@ -94,14 +115,14 @@ impl ApiError for Error {
|
|||
header_map.append(header::CONTENT_TYPE, "application/json".parse().unwrap());
|
||||
}
|
||||
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> Body {
|
||||
let error = CustomApiErrorBody {
|
||||
code: self.code().to_string(),
|
||||
message: format!("{}", self),
|
||||
path: path.to_string(),
|
||||
region: garage_region.to_string(),
|
||||
};
|
||||
let error_str = serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
|
||||
Body::from(serde_json::to_string_pretty(&error).unwrap_or_else(|_| {
|
||||
r#"
|
||||
{
|
||||
"code": "InternalError",
|
||||
|
@ -109,7 +130,6 @@ impl ApiError for Error {
|
|||
}
|
||||
"#
|
||||
.into()
|
||||
});
|
||||
error_body(error_str)
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use hyper::Response;
|
||||
use hyper::{Body, Response};
|
||||
use serde::Serialize;
|
||||
|
||||
use garage_util::data::*;
|
||||
|
@ -12,7 +12,6 @@ use garage_model::garage::Garage;
|
|||
use garage_model::k2v::item_table::{BYTES, CONFLICTS, ENTRIES, VALUES};
|
||||
|
||||
use crate::helpers::*;
|
||||
use crate::k2v::api_server::ResBody;
|
||||
use crate::k2v::error::*;
|
||||
use crate::k2v::range::read_range;
|
||||
|
||||
|
@ -24,7 +23,7 @@ pub async fn handle_read_index(
|
|||
end: Option<String>,
|
||||
limit: Option<u64>,
|
||||
reverse: Option<bool>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let reverse = reverse.unwrap_or(false);
|
||||
|
||||
let ring: Arc<Ring> = garage.system.ring.borrow().clone();
|
||||
|
@ -69,7 +68,7 @@ pub async fn handle_read_index(
|
|||
next_start,
|
||||
};
|
||||
|
||||
json_ok_response::<Error, _>(&resp)
|
||||
Ok(json_ok_response(&resp)?)
|
||||
}
|
||||
|
||||
#[derive(Serialize)]
|
||||
|
|
|
@ -3,7 +3,7 @@ use std::sync::Arc;
|
|||
use base64::prelude::*;
|
||||
use http::header;
|
||||
|
||||
use hyper::{Request, Response, StatusCode};
|
||||
use hyper::{body::HttpBody, Body, Request, Response, StatusCode};
|
||||
|
||||
use garage_util::data::*;
|
||||
|
||||
|
@ -11,8 +11,6 @@ use garage_model::garage::Garage;
|
|||
use garage_model::k2v::causality::*;
|
||||
use garage_model::k2v::item_table::*;
|
||||
|
||||
use crate::helpers::*;
|
||||
use crate::k2v::api_server::{ReqBody, ResBody};
|
||||
use crate::k2v::error::*;
|
||||
|
||||
pub const X_GARAGE_CAUSALITY_TOKEN: &str = "X-Garage-Causality-Token";
|
||||
|
@ -24,7 +22,7 @@ pub enum ReturnFormat {
|
|||
}
|
||||
|
||||
impl ReturnFormat {
|
||||
pub fn from(req: &Request<ReqBody>) -> Result<Self, Error> {
|
||||
pub fn from(req: &Request<Body>) -> Result<Self, Error> {
|
||||
let accept = match req.headers().get(header::ACCEPT) {
|
||||
Some(a) => a.to_str()?,
|
||||
None => return Ok(Self::Json),
|
||||
|
@ -42,7 +40,7 @@ impl ReturnFormat {
|
|||
}
|
||||
}
|
||||
|
||||
pub fn make_response(&self, item: &K2VItem) -> Result<Response<ResBody>, Error> {
|
||||
pub fn make_response(&self, item: &K2VItem) -> Result<Response<Body>, Error> {
|
||||
let vals = item.values();
|
||||
|
||||
if vals.is_empty() {
|
||||
|
@ -54,7 +52,7 @@ impl ReturnFormat {
|
|||
Self::Binary if vals.len() > 1 => Ok(Response::builder()
|
||||
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||
.status(StatusCode::CONFLICT)
|
||||
.body(empty_body())?),
|
||||
.body(Body::empty())?),
|
||||
Self::Binary => {
|
||||
assert!(vals.len() == 1);
|
||||
Self::make_binary_response(ct, vals[0])
|
||||
|
@ -64,22 +62,22 @@ impl ReturnFormat {
|
|||
}
|
||||
}
|
||||
|
||||
fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<ResBody>, Error> {
|
||||
fn make_binary_response(ct: String, v: &DvvsValue) -> Result<Response<Body>, Error> {
|
||||
match v {
|
||||
DvvsValue::Deleted => Ok(Response::builder()
|
||||
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(empty_body())?),
|
||||
.body(Body::empty())?),
|
||||
DvvsValue::Value(v) => Ok(Response::builder()
|
||||
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||
.header(header::CONTENT_TYPE, "application/octet-stream")
|
||||
.status(StatusCode::OK)
|
||||
.body(bytes_body(v.to_vec().into()))?),
|
||||
.body(Body::from(v.to_vec()))?),
|
||||
}
|
||||
}
|
||||
|
||||
fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<ResBody>, Error> {
|
||||
fn make_json_response(ct: String, v: &[&DvvsValue]) -> Result<Response<Body>, Error> {
|
||||
let items = v
|
||||
.iter()
|
||||
.map(|v| match v {
|
||||
|
@ -93,7 +91,7 @@ impl ReturnFormat {
|
|||
.header(X_GARAGE_CAUSALITY_TOKEN, ct)
|
||||
.header(header::CONTENT_TYPE, "application/json")
|
||||
.status(StatusCode::OK)
|
||||
.body(string_body(json_body))?)
|
||||
.body(Body::from(json_body))?)
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -101,11 +99,11 @@ impl ReturnFormat {
|
|||
#[allow(clippy::ptr_arg)]
|
||||
pub async fn handle_read_item(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<ReqBody>,
|
||||
req: &Request<Body>,
|
||||
bucket_id: Uuid,
|
||||
partition_key: &str,
|
||||
sort_key: &String,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let format = ReturnFormat::from(req)?;
|
||||
|
||||
let item = garage
|
||||
|
@ -126,11 +124,11 @@ pub async fn handle_read_item(
|
|||
|
||||
pub async fn handle_insert_item(
|
||||
garage: Arc<Garage>,
|
||||
req: Request<ReqBody>,
|
||||
req: Request<Body>,
|
||||
bucket_id: Uuid,
|
||||
partition_key: &str,
|
||||
sort_key: &str,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let causal_context = req
|
||||
.headers()
|
||||
.get(X_GARAGE_CAUSALITY_TOKEN)
|
||||
|
@ -139,9 +137,7 @@ pub async fn handle_insert_item(
|
|||
.map(CausalContext::parse_helper)
|
||||
.transpose()?;
|
||||
|
||||
let body = http_body_util::BodyExt::collect(req.into_body())
|
||||
.await?
|
||||
.to_bytes();
|
||||
let body = req.into_body().collect().await?.to_bytes();
|
||||
|
||||
let value = DvvsValue::Value(body.to_vec());
|
||||
|
||||
|
@ -159,16 +155,16 @@ pub async fn handle_insert_item(
|
|||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(empty_body())?)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
|
||||
pub async fn handle_delete_item(
|
||||
garage: Arc<Garage>,
|
||||
req: Request<ReqBody>,
|
||||
req: Request<Body>,
|
||||
bucket_id: Uuid,
|
||||
partition_key: &str,
|
||||
sort_key: &str,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let causal_context = req
|
||||
.headers()
|
||||
.get(X_GARAGE_CAUSALITY_TOKEN)
|
||||
|
@ -193,20 +189,20 @@ pub async fn handle_delete_item(
|
|||
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NO_CONTENT)
|
||||
.body(empty_body())?)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
|
||||
/// Handle ReadItem request
|
||||
#[allow(clippy::ptr_arg)]
|
||||
pub async fn handle_poll_item(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<ReqBody>,
|
||||
req: &Request<Body>,
|
||||
bucket_id: Uuid,
|
||||
partition_key: String,
|
||||
sort_key: String,
|
||||
causality_token: String,
|
||||
timeout_secs: Option<u64>,
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
) -> Result<Response<Body>, Error> {
|
||||
let format = ReturnFormat::from(req)?;
|
||||
|
||||
let causal_context =
|
||||
|
@ -231,6 +227,6 @@ pub async fn handle_poll_item(
|
|||
} else {
|
||||
Ok(Response::builder()
|
||||
.status(StatusCode::NOT_MODIFIED)
|
||||
.body(empty_body())?)
|
||||
.body(Body::empty())?)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -121,8 +121,7 @@ impl ApiHandler for S3ApiServer {
|
|||
return handle_post_object(garage, req, bucket_name.unwrap()).await;
|
||||
}
|
||||
if let Endpoint::Options = endpoint {
|
||||
let options_res = handle_options_api(garage, &req, bucket_name).await?;
|
||||
return Ok(options_res.map(|_empty_body: EmptyBody| empty_body()));
|
||||
return handle_options_s3api(garage, &req, bucket_name).await;
|
||||
}
|
||||
|
||||
let (api_key, mut content_sha256) = check_payload_signature(&garage, "s3", &req).await?;
|
||||
|
|
|
@ -14,7 +14,6 @@ use http_body_util::BodyExt;
|
|||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::common_error::CommonError;
|
||||
use crate::helpers::*;
|
||||
use crate::s3::api_server::{ReqBody, ResBody};
|
||||
use crate::s3::error::*;
|
||||
|
@ -95,11 +94,11 @@ pub async fn handle_put_cors(
|
|||
.body(empty_body())?)
|
||||
}
|
||||
|
||||
pub async fn handle_options_api(
|
||||
pub async fn handle_options_s3api(
|
||||
garage: Arc<Garage>,
|
||||
req: &Request<IncomingBody>,
|
||||
bucket_name: Option<String>,
|
||||
) -> Result<Response<EmptyBody>, CommonError> {
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
// FIXME: CORS rules of buckets with local aliases are
|
||||
// not taken into account.
|
||||
|
||||
|
@ -129,7 +128,7 @@ pub async fn handle_options_api(
|
|||
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.header(ACCESS_CONTROL_ALLOW_METHODS, "*")
|
||||
.status(StatusCode::OK)
|
||||
.body(EmptyBody::new())?)
|
||||
.body(empty_body())?)
|
||||
}
|
||||
} else {
|
||||
// If there is no bucket name in the request,
|
||||
|
@ -139,14 +138,14 @@ pub async fn handle_options_api(
|
|||
.header(ACCESS_CONTROL_ALLOW_ORIGIN, "*")
|
||||
.header(ACCESS_CONTROL_ALLOW_METHODS, "GET")
|
||||
.status(StatusCode::OK)
|
||||
.body(EmptyBody::new())?)
|
||||
.body(empty_body())?)
|
||||
}
|
||||
}
|
||||
|
||||
pub fn handle_options_for_bucket(
|
||||
req: &Request<IncomingBody>,
|
||||
bucket: &Bucket,
|
||||
) -> Result<Response<EmptyBody>, CommonError> {
|
||||
) -> Result<Response<ResBody>, Error> {
|
||||
let origin = req
|
||||
.headers()
|
||||
.get("Origin")
|
||||
|
@ -169,15 +168,13 @@ pub fn handle_options_for_bucket(
|
|||
if let Some(rule) = matching_rule {
|
||||
let mut resp = Response::builder()
|
||||
.status(StatusCode::OK)
|
||||
.body(EmptyBody::new())?;
|
||||
.body(empty_body())?;
|
||||
add_cors_headers(&mut resp, rule).ok_or_internal_error("Invalid CORS configuration")?;
|
||||
return Ok(resp);
|
||||
}
|
||||
}
|
||||
|
||||
Err(CommonError::Forbidden(
|
||||
"This CORS request is not allowed.".into(),
|
||||
))
|
||||
Err(Error::forbidden("This CORS request is not allowed."))
|
||||
}
|
||||
|
||||
pub fn find_matching_cors_rule<'a>(
|
||||
|
@ -219,7 +216,7 @@ where
|
|||
}
|
||||
|
||||
pub fn add_cors_headers(
|
||||
resp: &mut Response<impl Body>,
|
||||
resp: &mut Response<ResBody>,
|
||||
rule: &GarageCorsRule,
|
||||
) -> Result<(), http::header::InvalidHeaderValue> {
|
||||
let h = resp.headers_mut();
|
||||
|
|
|
@ -4,6 +4,8 @@ use err_derive::Error;
|
|||
use hyper::header::HeaderValue;
|
||||
use hyper::{HeaderMap, StatusCode};
|
||||
|
||||
use garage_model::helper::error::Error as HelperError;
|
||||
|
||||
use crate::common_error::CommonError;
|
||||
pub use crate::common_error::{CommonErrorDerivative, OkOrBadRequest, OkOrInternalError};
|
||||
use crate::generic_server::ApiError;
|
||||
|
@ -61,6 +63,10 @@ pub enum Error {
|
|||
#[error(display = "Invalid XML: {}", _0)]
|
||||
InvalidXml(String),
|
||||
|
||||
/// The client sent a header with invalid value
|
||||
#[error(display = "Invalid header value: {}", _0)]
|
||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||
|
||||
/// The client sent a range header with invalid value
|
||||
#[error(display = "Invalid HTTP range: {:?}", _0)]
|
||||
InvalidRange(#[error(from)] (http_range::HttpRangeParseError, u64)),
|
||||
|
@ -81,6 +87,18 @@ where
|
|||
|
||||
impl CommonErrorDerivative for Error {}
|
||||
|
||||
impl From<HelperError> for Error {
|
||||
fn from(err: HelperError) -> Self {
|
||||
match err {
|
||||
HelperError::Internal(i) => Self::Common(CommonError::InternalError(i)),
|
||||
HelperError::BadRequest(b) => Self::Common(CommonError::BadRequest(b)),
|
||||
HelperError::InvalidBucketName(n) => Self::Common(CommonError::InvalidBucketName(n)),
|
||||
HelperError::NoSuchBucket(n) => Self::Common(CommonError::NoSuchBucket(n)),
|
||||
e => Self::bad_request(format!("{}", e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl From<roxmltree::Error> for Error {
|
||||
fn from(err: roxmltree::Error) -> Self {
|
||||
Self::InvalidXml(format!("{}", err))
|
||||
|
@ -101,6 +119,7 @@ impl From<SignatureError> for Error {
|
|||
Self::AuthorizationHeaderMalformed(c)
|
||||
}
|
||||
SignatureError::InvalidUtf8Str(i) => Self::InvalidUtf8Str(i),
|
||||
SignatureError::InvalidHeader(h) => Self::InvalidHeader(h),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -125,7 +144,9 @@ impl Error {
|
|||
Error::NotImplemented(_) => "NotImplemented",
|
||||
Error::InvalidXml(_) => "MalformedXML",
|
||||
Error::InvalidRange(_) => "InvalidRange",
|
||||
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) => "InvalidRequest",
|
||||
Error::InvalidUtf8Str(_) | Error::InvalidUtf8String(_) | Error::InvalidHeader(_) => {
|
||||
"InvalidRequest"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -145,7 +166,8 @@ impl ApiError for Error {
|
|||
| Error::EntityTooSmall
|
||||
| Error::InvalidXml(_)
|
||||
| Error::InvalidUtf8Str(_)
|
||||
| Error::InvalidUtf8String(_) => StatusCode::BAD_REQUEST,
|
||||
| Error::InvalidUtf8String(_)
|
||||
| Error::InvalidHeader(_) => StatusCode::BAD_REQUEST,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -168,7 +190,7 @@ impl ApiError for Error {
|
|||
}
|
||||
}
|
||||
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody {
|
||||
fn http_body(&self, garage_region: &str, path: &str) -> BytesBody {
|
||||
let error = s3_xml::Error {
|
||||
code: s3_xml::Value(self.aws_code().to_string()),
|
||||
message: s3_xml::Value(format!("{}", self)),
|
||||
|
@ -185,6 +207,6 @@ impl ApiError for Error {
|
|||
"#
|
||||
.into()
|
||||
});
|
||||
error_body(error_str)
|
||||
BytesBody::from(bytes::Bytes::from(error_str.into_bytes()))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,8 +11,7 @@ use http::header::{
|
|||
use hyper::{body::Body, Request, Response, StatusCode};
|
||||
use tokio::sync::mpsc;
|
||||
|
||||
use garage_block::manager::BlockStream;
|
||||
use garage_rpc::rpc_helper::OrderTag;
|
||||
use garage_rpc::rpc_helper::{netapp::stream::ByteStream, OrderTag};
|
||||
use garage_table::EmptyKey;
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::OkOrMessage;
|
||||
|
@ -246,7 +245,7 @@ pub async fn handle_get(
|
|||
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
|
||||
}
|
||||
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
||||
let (tx, rx) = mpsc::channel::<BlockStream>(2);
|
||||
let (tx, rx) = mpsc::channel(2);
|
||||
|
||||
let order_stream = OrderTag::stream();
|
||||
let first_block_hash = *first_block_hash;
|
||||
|
@ -284,13 +283,25 @@ pub async fn handle_get(
|
|||
{
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
let _ = tx.send(error_stream_item(e)).await;
|
||||
let err = std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Error while getting object data: {}", e),
|
||||
);
|
||||
let _ = tx
|
||||
.send(Box::pin(stream::once(future::ready(Err(err)))))
|
||||
.await;
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
let body = response_body_from_block_stream(rx);
|
||||
Ok(resp_builder.body(body)?)
|
||||
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
|
||||
.flatten()
|
||||
.map(|x| {
|
||||
x.map(hyper::body::Frame::data)
|
||||
.map_err(|e| Error::from(garage_util::error::Error::from(e)))
|
||||
});
|
||||
let body = http_body_util::StreamBody::new(body_stream);
|
||||
Ok(resp_builder.body(ResBody::new(body))?)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -450,75 +461,67 @@ fn body_from_blocks_range(
|
|||
}
|
||||
|
||||
let order_stream = OrderTag::stream();
|
||||
let (tx, rx) = mpsc::channel::<BlockStream>(2);
|
||||
let mut body_stream =
|
||||
futures::stream::iter(blocks)
|
||||
.enumerate()
|
||||
.map(move |(i, (block, block_offset))| {
|
||||
let garage = garage.clone();
|
||||
async move {
|
||||
garage
|
||||
.block_manager
|
||||
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
|
||||
.await
|
||||
.unwrap_or_else(|e| error_stream(i, e))
|
||||
.scan(block_offset, move |chunk_offset, chunk| {
|
||||
let r = match chunk {
|
||||
Ok(chunk_bytes) => {
|
||||
let chunk_len = chunk_bytes.len() as u64;
|
||||
let r = if *chunk_offset >= end {
|
||||
// The current chunk is after the part we want to read.
|
||||
// Returning None here will stop the scan, the rest of the
|
||||
// stream will be ignored
|
||||
None
|
||||
} else if *chunk_offset + chunk_len <= begin {
|
||||
// The current chunk is before the part we want to read.
|
||||
// We return a None that will be removed by the filter_map
|
||||
// below.
|
||||
Some(None)
|
||||
} else {
|
||||
// The chunk has an intersection with the requested range
|
||||
let start_in_chunk = if *chunk_offset > begin {
|
||||
0
|
||||
} else {
|
||||
begin - *chunk_offset
|
||||
};
|
||||
let end_in_chunk = if *chunk_offset + chunk_len < end {
|
||||
chunk_len
|
||||
} else {
|
||||
end - *chunk_offset
|
||||
};
|
||||
Some(Some(Ok(chunk_bytes.slice(
|
||||
start_in_chunk as usize..end_in_chunk as usize,
|
||||
))))
|
||||
};
|
||||
*chunk_offset += chunk_bytes.len() as u64;
|
||||
r
|
||||
}
|
||||
Err(e) => Some(Some(Err(e))),
|
||||
};
|
||||
futures::future::ready(r)
|
||||
})
|
||||
.filter_map(futures::future::ready)
|
||||
}
|
||||
});
|
||||
|
||||
let (tx, rx) = mpsc::channel(2);
|
||||
tokio::spawn(async move {
|
||||
match async {
|
||||
let garage = garage.clone();
|
||||
for (i, (block, block_offset)) in blocks.iter().enumerate() {
|
||||
let block_stream = garage
|
||||
.block_manager
|
||||
.rpc_get_block_streaming(&block.hash, Some(order_stream.order(i as u64)))
|
||||
.await?
|
||||
.scan(*block_offset, move |chunk_offset, chunk| {
|
||||
let r = match chunk {
|
||||
Ok(chunk_bytes) => {
|
||||
let chunk_len = chunk_bytes.len() as u64;
|
||||
let r = if *chunk_offset >= end {
|
||||
// The current chunk is after the part we want to read.
|
||||
// Returning None here will stop the scan, the rest of the
|
||||
// stream will be ignored
|
||||
None
|
||||
} else if *chunk_offset + chunk_len <= begin {
|
||||
// The current chunk is before the part we want to read.
|
||||
// We return a None that will be removed by the filter_map
|
||||
// below.
|
||||
Some(None)
|
||||
} else {
|
||||
// The chunk has an intersection with the requested range
|
||||
let start_in_chunk = if *chunk_offset > begin {
|
||||
0
|
||||
} else {
|
||||
begin - *chunk_offset
|
||||
};
|
||||
let end_in_chunk = if *chunk_offset + chunk_len < end {
|
||||
chunk_len
|
||||
} else {
|
||||
end - *chunk_offset
|
||||
};
|
||||
Some(Some(Ok(chunk_bytes
|
||||
.slice(start_in_chunk as usize..end_in_chunk as usize))))
|
||||
};
|
||||
*chunk_offset += chunk_bytes.len() as u64;
|
||||
r
|
||||
}
|
||||
Err(e) => Some(Some(Err(e))),
|
||||
};
|
||||
futures::future::ready(r)
|
||||
})
|
||||
.filter_map(futures::future::ready);
|
||||
|
||||
let block_stream: BlockStream = Box::pin(block_stream);
|
||||
tx.send(Box::pin(block_stream))
|
||||
.await
|
||||
.ok_or_message("channel closed")?;
|
||||
}
|
||||
|
||||
Ok::<(), Error>(())
|
||||
}
|
||||
.await
|
||||
{
|
||||
Ok(()) => (),
|
||||
Err(e) => {
|
||||
let _ = tx.send(error_stream_item(e)).await;
|
||||
while let Some(item) = body_stream.next().await {
|
||||
if tx.send(item.await).await.is_err() {
|
||||
break; // connection closed by client
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
response_body_from_block_stream(rx)
|
||||
}
|
||||
|
||||
fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
|
||||
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
|
||||
.flatten()
|
||||
.map(|x| {
|
||||
|
@ -528,10 +531,11 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
|
|||
ResBody::new(http_body_util::StreamBody::new(body_stream))
|
||||
}
|
||||
|
||||
fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
|
||||
let err = std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Error while getting object data: {}", e),
|
||||
);
|
||||
Box::pin(stream::once(future::ready(Err(err))))
|
||||
fn error_stream(i: usize, e: garage_util::error::Error) -> ByteStream {
|
||||
Box::pin(futures::stream::once(async move {
|
||||
Err(std::io::Error::new(
|
||||
std::io::ErrorKind::Other,
|
||||
format!("Could not get block {}: {}", i, e),
|
||||
))
|
||||
}))
|
||||
}
|
||||
|
|
|
@ -1,7 +1,8 @@
|
|||
use std::collections::HashMap;
|
||||
use std::sync::Arc;
|
||||
|
||||
use futures::prelude::*;
|
||||
use futures::{prelude::*, TryStreamExt};
|
||||
use http_body_util::BodyStream;
|
||||
use hyper::{Request, Response};
|
||||
use md5::{Digest as Md5Digest, Md5};
|
||||
|
||||
|
@ -88,8 +89,10 @@ pub async fn handle_put_part(
|
|||
// Read first chuck, and at the same time try to get object to see if it exists
|
||||
let key = key.to_string();
|
||||
|
||||
let stream = body_stream(req.into_body());
|
||||
let mut chunker = StreamChunker::new(stream, garage.config.block_size);
|
||||
let body_stream = BodyStream::new(req.into_body())
|
||||
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
|
||||
.map_err(Error::from);
|
||||
let mut chunker = StreamChunker::new(body_stream, garage.config.block_size);
|
||||
|
||||
let ((_, _, mut mpu), first_block) = futures::try_join!(
|
||||
get_upload(&garage, &bucket_id, &key, &upload_id),
|
||||
|
|
|
@ -7,7 +7,8 @@ use std::task::{Context, Poll};
|
|||
use base64::prelude::*;
|
||||
use bytes::Bytes;
|
||||
use chrono::{DateTime, Duration, Utc};
|
||||
use futures::{Stream, StreamExt};
|
||||
use futures::{Stream, StreamExt, TryStreamExt};
|
||||
use http_body_util::BodyStream;
|
||||
use hyper::header::{self, HeaderMap, HeaderName, HeaderValue};
|
||||
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
|
||||
use multer::{Constraints, Multipart, SizeLimit};
|
||||
|
@ -44,8 +45,10 @@ pub async fn handle_post_object(
|
|||
);
|
||||
|
||||
let (head, body) = req.into_parts();
|
||||
let stream = body_stream::<_, Error>(body);
|
||||
let mut multipart = Multipart::with_constraints(stream, boundary, constraints);
|
||||
let body_stream = BodyStream::new(body)
|
||||
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
|
||||
.map_err(Error::from);
|
||||
let mut multipart = Multipart::with_constraints(body_stream, boundary, constraints);
|
||||
|
||||
let mut params = HeaderMap::new();
|
||||
let field = loop {
|
||||
|
|
|
@ -4,12 +4,12 @@ use std::sync::Arc;
|
|||
use base64::prelude::*;
|
||||
use futures::prelude::*;
|
||||
use futures::try_join;
|
||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||
use sha2::Sha256;
|
||||
|
||||
use hyper::body::{Body, Bytes};
|
||||
use http_body_util::BodyStream;
|
||||
use hyper::body::Bytes;
|
||||
use hyper::header::{HeaderMap, HeaderValue};
|
||||
use hyper::{Request, Response};
|
||||
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||
use sha2::Sha256;
|
||||
|
||||
use opentelemetry::{
|
||||
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
|
||||
|
@ -51,12 +51,14 @@ pub async fn handle_put(
|
|||
None => None,
|
||||
};
|
||||
|
||||
let stream = body_stream(req.into_body());
|
||||
let body_stream = BodyStream::new(req.into_body())
|
||||
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
|
||||
.map_err(Error::from);
|
||||
|
||||
save_stream(
|
||||
garage,
|
||||
headers,
|
||||
stream,
|
||||
body_stream,
|
||||
bucket,
|
||||
key,
|
||||
content_md5,
|
||||
|
|
|
@ -18,6 +18,10 @@ pub enum Error {
|
|||
/// The request contained an invalid UTF-8 sequence in its path or in other parameters
|
||||
#[error(display = "Invalid UTF-8: {}", _0)]
|
||||
InvalidUtf8Str(#[error(source)] std::str::Utf8Error),
|
||||
|
||||
/// The client sent a header with invalid value
|
||||
#[error(display = "Invalid header value: {}", _0)]
|
||||
InvalidHeader(#[error(source)] hyper::header::ToStrError),
|
||||
}
|
||||
|
||||
impl<T> From<T> for Error
|
||||
|
|
|
@ -1,6 +1,6 @@
|
|||
use std::collections::HashMap;
|
||||
|
||||
use chrono::{DateTime, Duration, NaiveDateTime, TimeZone, Utc};
|
||||
use chrono::{DateTime, Duration, NaiveDateTime, Utc};
|
||||
use hmac::Mac;
|
||||
use hyper::{body::Incoming as IncomingBody, Method, Request};
|
||||
use sha2::{Digest, Sha256};
|
||||
|
@ -316,7 +316,7 @@ fn canonical_query_string(uri: &hyper::Uri) -> String {
|
|||
pub fn parse_date(date: &str) -> Result<DateTime<Utc>, Error> {
|
||||
let date: NaiveDateTime =
|
||||
NaiveDateTime::parse_from_str(date, LONG_DATETIME).ok_or_bad_request("Invalid date")?;
|
||||
Ok(Utc.from_utc_datetime(&date))
|
||||
Ok(DateTime::from_utc(date, Utc))
|
||||
}
|
||||
|
||||
pub async fn verify_v4(
|
||||
|
|
|
@ -1,11 +1,11 @@
|
|||
use std::pin::Pin;
|
||||
|
||||
use chrono::{DateTime, NaiveDateTime, TimeZone, Utc};
|
||||
use chrono::{DateTime, NaiveDateTime, Utc};
|
||||
use futures::prelude::*;
|
||||
use futures::task;
|
||||
use garage_model::key_table::Key;
|
||||
use hmac::Mac;
|
||||
use http_body_util::StreamBody;
|
||||
use http_body_util::{BodyStream, StreamBody};
|
||||
use hyper::body::{Bytes, Incoming as IncomingBody};
|
||||
use hyper::Request;
|
||||
|
||||
|
@ -44,16 +44,18 @@ pub fn parse_streaming_body(
|
|||
.to_str()?;
|
||||
let date: NaiveDateTime = NaiveDateTime::parse_from_str(date, LONG_DATETIME)
|
||||
.ok_or_bad_request("Invalid date")?;
|
||||
let date: DateTime<Utc> = Utc.from_utc_datetime(&date);
|
||||
let date: DateTime<Utc> = DateTime::from_utc(date, Utc);
|
||||
|
||||
let scope = compute_scope(&date, region, service);
|
||||
let signing_hmac = crate::signature::signing_hmac(&date, secret_key, region, service)
|
||||
.ok_or_internal_error("Unable to build signing HMAC")?;
|
||||
|
||||
Ok(req.map(move |body| {
|
||||
let stream = body_stream::<_, Error>(body);
|
||||
let body_stream = BodyStream::new(body)
|
||||
.map(|x| x.map(|f| f.into_data().unwrap())) //TODO remove unwrap
|
||||
.map_err(Error::from);
|
||||
let signed_payload_stream =
|
||||
SignedPayloadStream::new(stream, signing_hmac, date, &scope, signature)
|
||||
SignedPayloadStream::new(body_stream, signing_hmac, date, &scope, signature)
|
||||
.map(|x| x.map(hyper::body::Frame::data))
|
||||
.map_err(Error::from);
|
||||
ReqBody::new(StreamBody::new(signed_payload_stream))
|
||||
|
|
|
@ -53,9 +53,6 @@ pub const INLINE_THRESHOLD: usize = 3072;
|
|||
// to delete the block locally.
|
||||
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
||||
|
||||
pub type BlockStream =
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
|
||||
|
||||
/// RPC messages used to share blocks of data between nodes
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum BlockRpc {
|
||||
|
@ -327,7 +324,10 @@ impl BlockManager {
|
|||
&self,
|
||||
hash: &Hash,
|
||||
order_tag: Option<OrderTag>,
|
||||
) -> Result<BlockStream, Error> {
|
||||
) -> Result<
|
||||
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>,
|
||||
Error,
|
||||
> {
|
||||
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
||||
match header {
|
||||
DataBlockHeader::Plain => Ok(stream),
|
||||
|
|
|
@ -66,9 +66,7 @@ aws-sdk-s3.workspace = true
|
|||
chrono.workspace = true
|
||||
http.workspace = true
|
||||
hmac.workspace = true
|
||||
http-body-util.workspace = true
|
||||
hyper.workspace = true
|
||||
hyper-util.workspace = true
|
||||
mktemp.workspace = true
|
||||
sha2.workspace = true
|
||||
|
||||
|
|
|
@ -113,11 +113,12 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
|||
|
||||
if let Some(web_config) = &config.s3_web {
|
||||
info!("Initializing web server...");
|
||||
let web_server = WebServer::new(garage.clone(), web_config.root_domain.clone());
|
||||
servers.push((
|
||||
"Web",
|
||||
tokio::spawn(web_server.run(
|
||||
tokio::spawn(WebServer::run(
|
||||
garage.clone(),
|
||||
web_config.bind_addr.clone(),
|
||||
web_config.root_domain.clone(),
|
||||
wait_from(watch_cancel.clone()),
|
||||
)),
|
||||
));
|
||||
|
|
|
@ -5,17 +5,12 @@ use std::convert::TryFrom;
|
|||
|
||||
use chrono::{offset::Utc, DateTime};
|
||||
use hmac::{Hmac, Mac};
|
||||
use http_body_util::BodyExt;
|
||||
use http_body_util::Full as FullBody;
|
||||
use hyper::{Method, Request, Response, Uri};
|
||||
use hyper_util::client::legacy::{connect::HttpConnector, Client};
|
||||
use hyper_util::rt::TokioExecutor;
|
||||
use hyper::client::HttpConnector;
|
||||
use hyper::{Body, Client, Method, Request, Response, Uri};
|
||||
|
||||
use super::garage::{Instance, Key};
|
||||
use garage_api::signature;
|
||||
|
||||
pub type Body = FullBody<hyper::body::Bytes>;
|
||||
|
||||
/// You should ever only use this to send requests AWS sdk won't send,
|
||||
/// like to reproduce behavior of unusual implementations found to be
|
||||
/// problematic.
|
||||
|
@ -24,7 +19,7 @@ pub struct CustomRequester {
|
|||
key: Key,
|
||||
uri: Uri,
|
||||
service: &'static str,
|
||||
client: Client<HttpConnector, Body>,
|
||||
client: Client<HttpConnector>,
|
||||
}
|
||||
|
||||
impl CustomRequester {
|
||||
|
@ -33,7 +28,7 @@ impl CustomRequester {
|
|||
key: key.clone(),
|
||||
uri: instance.s3_uri(),
|
||||
service: "s3",
|
||||
client: Client::builder(TokioExecutor::new()).build_http(),
|
||||
client: Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -42,7 +37,7 @@ impl CustomRequester {
|
|||
key: key.clone(),
|
||||
uri: instance.k2v_uri(),
|
||||
service: "k2v",
|
||||
client: Client::builder(TokioExecutor::new()).build_http(),
|
||||
client: Client::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -144,7 +139,7 @@ impl<'a> RequestBuilder<'a> {
|
|||
self
|
||||
}
|
||||
|
||||
pub async fn send(&mut self) -> Result<Response<Body>, String> {
|
||||
pub async fn send(&mut self) -> hyper::Result<Response<Body>> {
|
||||
// TODO this is a bit incorrect in that path and query params should be url-encoded and
|
||||
// aren't, but this is good enought for now.
|
||||
|
||||
|
@ -247,22 +242,7 @@ impl<'a> RequestBuilder<'a> {
|
|||
.method(self.method.clone())
|
||||
.body(Body::from(body))
|
||||
.unwrap();
|
||||
|
||||
let result = self
|
||||
.requester
|
||||
.client
|
||||
.request(request)
|
||||
.await
|
||||
.map_err(|err| format!("hyper client error: {}", err))?;
|
||||
|
||||
let (head, body) = result.into_parts();
|
||||
let body = Body::new(
|
||||
body.collect()
|
||||
.await
|
||||
.map_err(|err| format!("hyper client error in body.collect: {}", err))?
|
||||
.to_bytes(),
|
||||
);
|
||||
Ok(Response::from_parts(head, body))
|
||||
self.requester.client.request(request).await
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -7,8 +7,7 @@ use base64::prelude::*;
|
|||
use serde_json::json;
|
||||
|
||||
use crate::json_body;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Method, StatusCode};
|
||||
use hyper::{body::HttpBody, Method, StatusCode};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_batch() {
|
||||
|
|
|
@ -7,8 +7,7 @@ use base64::prelude::*;
|
|||
use serde_json::json;
|
||||
|
||||
use crate::json_body;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Method, StatusCode};
|
||||
use hyper::{body::HttpBody, Method, StatusCode};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_items_and_indices() {
|
||||
|
|
|
@ -1,6 +1,5 @@
|
|||
use base64::prelude::*;
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Method, StatusCode};
|
||||
use hyper::{body::HttpBody, Method, StatusCode};
|
||||
use std::time::Duration;
|
||||
|
||||
use assert_json_diff::assert_json_eq;
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use crate::common;
|
||||
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{Method, StatusCode};
|
||||
use hyper::{body::HttpBody, Method, StatusCode};
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_simple() {
|
||||
|
|
|
@ -11,14 +11,9 @@ mod k2v;
|
|||
#[cfg(feature = "k2v")]
|
||||
mod k2v_client;
|
||||
|
||||
use http_body_util::BodyExt;
|
||||
use hyper::{body::Body, Response};
|
||||
use hyper::{body::HttpBody, Body, Response};
|
||||
|
||||
pub async fn json_body<B>(res: Response<B>) -> serde_json::Value
|
||||
where
|
||||
B: Body,
|
||||
<B as Body>::Error: std::fmt::Debug,
|
||||
{
|
||||
pub async fn json_body(res: Response<Body>) -> serde_json::Value {
|
||||
let body = res.into_body().collect().await.unwrap().to_bytes();
|
||||
let res_body: serde_json::Value = serde_json::from_slice(&body).unwrap();
|
||||
res_body
|
||||
|
|
|
@ -8,18 +8,15 @@ use aws_sdk_s3::{
|
|||
types::{CorsConfiguration, CorsRule, ErrorDocument, IndexDocument, WebsiteConfiguration},
|
||||
};
|
||||
use http::{Request, StatusCode};
|
||||
use http_body_util::BodyExt;
|
||||
use http_body_util::Full as FullBody;
|
||||
use hyper::body::Bytes;
|
||||
use hyper_util::client::legacy::Client;
|
||||
use hyper_util::rt::TokioExecutor;
|
||||
use hyper::{
|
||||
body::{Body, HttpBody},
|
||||
Client,
|
||||
};
|
||||
use serde_json::json;
|
||||
|
||||
const BODY: &[u8; 16] = b"<h1>bonjour</h1>";
|
||||
const BODY_ERR: &[u8; 6] = b"erreur";
|
||||
|
||||
pub type Body = FullBody<Bytes>;
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_website() {
|
||||
const BCKT_NAME: &str = "my-website";
|
||||
|
@ -37,14 +34,14 @@ async fn test_website() {
|
|||
.await
|
||||
.unwrap();
|
||||
|
||||
let client = Client::builder(TokioExecutor::new()).build_http();
|
||||
let client = Client::new();
|
||||
|
||||
let req = || {
|
||||
Request::builder()
|
||||
.method("GET")
|
||||
.uri(format!("http://127.0.0.1:{}/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -52,7 +49,7 @@ async fn test_website() {
|
|||
|
||||
assert_eq!(resp.status(), StatusCode::NOT_FOUND);
|
||||
assert_ne!(
|
||||
BodyExt::collect(resp.into_body()).await.unwrap().to_bytes(),
|
||||
resp.into_body().collect().await.unwrap().to_bytes(),
|
||||
BODY.as_ref()
|
||||
); /* check that we do not leak body */
|
||||
|
||||
|
@ -64,7 +61,7 @@ async fn test_website() {
|
|||
ctx.garage.admin_port,
|
||||
BCKT_NAME.to_string()
|
||||
))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -106,7 +103,7 @@ async fn test_website() {
|
|||
"http://127.0.0.1:{0}/check?domain={1}",
|
||||
ctx.garage.admin_port, bname
|
||||
))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -139,7 +136,7 @@ async fn test_website() {
|
|||
ctx.garage.admin_port,
|
||||
BCKT_NAME.to_string()
|
||||
))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -251,7 +248,7 @@ async fn test_website_s3_api() {
|
|||
);
|
||||
}
|
||||
|
||||
let client = Client::builder(TokioExecutor::new()).build_http();
|
||||
let client = Client::new();
|
||||
|
||||
// Test direct requests with CORS
|
||||
{
|
||||
|
@ -260,7 +257,7 @@ async fn test_website_s3_api() {
|
|||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let resp = client.request(req).await.unwrap();
|
||||
|
@ -285,7 +282,7 @@ async fn test_website_s3_api() {
|
|||
ctx.garage.web_port
|
||||
))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let resp = client.request(req).await.unwrap();
|
||||
|
@ -305,7 +302,7 @@ async fn test_website_s3_api() {
|
|||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.header("Access-Control-Request-Method", "PUT")
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let resp = client.request(req).await.unwrap();
|
||||
|
@ -329,7 +326,7 @@ async fn test_website_s3_api() {
|
|||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.header("Access-Control-Request-Method", "DELETE")
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let resp = client.request(req).await.unwrap();
|
||||
|
@ -370,7 +367,7 @@ async fn test_website_s3_api() {
|
|||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.header("Origin", "https://example.com")
|
||||
.header("Access-Control-Request-Method", "PUT")
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let resp = client.request(req).await.unwrap();
|
||||
|
@ -396,7 +393,7 @@ async fn test_website_s3_api() {
|
|||
.method("GET")
|
||||
.uri(format!("http://127.0.0.1:{}/site/", ctx.garage.web_port))
|
||||
.header("Host", format!("{}.web.garage", BCKT_NAME))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap();
|
||||
|
||||
let resp = client.request(req).await.unwrap();
|
||||
|
@ -412,13 +409,13 @@ async fn test_website_s3_api() {
|
|||
async fn test_website_check_domain() {
|
||||
let ctx = common::context();
|
||||
|
||||
let client = Client::builder(TokioExecutor::new()).build_http();
|
||||
let client = Client::new();
|
||||
|
||||
let admin_req = || {
|
||||
Request::builder()
|
||||
.method("GET")
|
||||
.uri(format!("http://127.0.0.1:{}/check", ctx.garage.admin_port))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -442,7 +439,7 @@ async fn test_website_check_domain() {
|
|||
"http://127.0.0.1:{}/check?domain=",
|
||||
ctx.garage.admin_port
|
||||
))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -466,7 +463,7 @@ async fn test_website_check_domain() {
|
|||
"http://127.0.0.1:{}/check?domain=foobar",
|
||||
ctx.garage.admin_port
|
||||
))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
@ -490,7 +487,7 @@ async fn test_website_check_domain() {
|
|||
"http://127.0.0.1:{}/check?domain=%E2%98%B9",
|
||||
ctx.garage.admin_port
|
||||
))
|
||||
.body(Body::new(Bytes::new()))
|
||||
.body(Body::empty())
|
||||
.unwrap()
|
||||
};
|
||||
|
||||
|
|
|
@ -13,13 +13,11 @@ base64.workspace = true
|
|||
sha2.workspace = true
|
||||
hex.workspace = true
|
||||
http.workspace = true
|
||||
http-body-util.workspace = true
|
||||
log.workspace = true
|
||||
aws-sigv4.workspace = true
|
||||
aws-sdk-config.workspace = true
|
||||
percent-encoding.workspace = true
|
||||
hyper = { workspace = true, default-features = false, features = ["http1", "http2"] }
|
||||
hyper-util.workspace = true
|
||||
hyper = { workspace = true, default-features = false, features = ["client", "http1", "http2"] }
|
||||
hyper-rustls.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
|
|
|
@ -22,8 +22,6 @@ pub enum Error {
|
|||
Http(#[from] http::Error),
|
||||
#[error("hyper error: {0}")]
|
||||
Hyper(#[from] hyper::Error),
|
||||
#[error("hyper client error: {0}")]
|
||||
HyperClient(#[from] hyper_util::client::legacy::Error),
|
||||
#[error("invalid header: {0}")]
|
||||
Header(#[from] hyper::header::ToStrError),
|
||||
#[error("deserialization error: {0}")]
|
||||
|
|
|
@ -9,11 +9,9 @@ use percent_encoding::{utf8_percent_encode, AsciiSet, NON_ALPHANUMERIC};
|
|||
use http::header::{ACCEPT, CONTENT_TYPE};
|
||||
use http::status::StatusCode;
|
||||
use http::{HeaderName, HeaderValue, Request};
|
||||
use http_body_util::{BodyExt, Full as FullBody};
|
||||
use hyper::{body::Body as BodyTrait, body::Bytes};
|
||||
use hyper::{body::Bytes, body::HttpBody, Body};
|
||||
use hyper::{client::connect::HttpConnector, Client as HttpClient};
|
||||
use hyper_rustls::HttpsConnector;
|
||||
use hyper_util::client::legacy::{connect::HttpConnector, Client as HttpClient};
|
||||
use hyper_util::rt::TokioExecutor;
|
||||
|
||||
use aws_sdk_config::config::Credentials;
|
||||
use aws_sigv4::http_request::{sign, SignableBody, SignableRequest, SigningSettings};
|
||||
|
@ -26,8 +24,6 @@ mod error;
|
|||
|
||||
pub use error::Error;
|
||||
|
||||
pub type Body = FullBody<Bytes>;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(5);
|
||||
const DEFAULT_POLL_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
const SERVICE: &str = "k2v";
|
||||
|
@ -59,19 +55,19 @@ pub struct K2vClientConfig {
|
|||
pub struct K2vClient {
|
||||
config: K2vClientConfig,
|
||||
user_agent: HeaderValue,
|
||||
client: HttpClient<HttpsConnector<HttpConnector>, Body>,
|
||||
client: HttpClient<HttpsConnector<HttpConnector>>,
|
||||
}
|
||||
|
||||
impl K2vClient {
|
||||
/// Create a new K2V client.
|
||||
pub fn new(config: K2vClientConfig) -> Result<Self, Error> {
|
||||
let connector = hyper_rustls::HttpsConnectorBuilder::new()
|
||||
.with_native_roots()?
|
||||
.with_native_roots()
|
||||
.https_or_http()
|
||||
.enable_http1()
|
||||
.enable_http2()
|
||||
.build();
|
||||
let client = HttpClient::builder(TokioExecutor::new()).build(connector);
|
||||
let client = HttpClient::builder().build(connector);
|
||||
let user_agent: std::borrow::Cow<str> = match &config.user_agent {
|
||||
Some(ua) => ua.into(),
|
||||
None => format!("k2v/{}", env!("CARGO_PKG_VERSION")).into(),
|
||||
|
@ -399,7 +395,7 @@ impl K2vClient {
|
|||
// Sign and then apply the signature to the request
|
||||
let (signing_instructions, _signature) =
|
||||
sign(signable_request, &signing_params)?.into_parts();
|
||||
signing_instructions.apply_to_request_http1x(&mut req);
|
||||
signing_instructions.apply_to_request_http0x(&mut req);
|
||||
|
||||
// Send and wait for timeout
|
||||
let res = tokio::select! {
|
||||
|
@ -420,7 +416,7 @@ impl K2vClient {
|
|||
};
|
||||
|
||||
let body = match res.status {
|
||||
StatusCode::OK => BodyExt::collect(body).await?.to_bytes(),
|
||||
StatusCode::OK => body.collect().await?.to_bytes(),
|
||||
StatusCode::NO_CONTENT => Bytes::new(),
|
||||
StatusCode::NOT_FOUND => return Err(Error::NotFound),
|
||||
StatusCode::NOT_MODIFIED => Bytes::new(),
|
||||
|
|
|
@ -4,12 +4,16 @@ use std::{convert::Infallible, sync::Arc};
|
|||
|
||||
use futures::future::Future;
|
||||
|
||||
use hyper::server::conn::http1;
|
||||
use hyper::{
|
||||
body::Incoming as IncomingBody,
|
||||
header::{HeaderValue, HOST},
|
||||
service::service_fn,
|
||||
Method, Request, Response, StatusCode,
|
||||
};
|
||||
use hyper_util::rt::TokioIo;
|
||||
|
||||
use tokio::io::{AsyncRead, AsyncWrite};
|
||||
use tokio::net::{TcpListener, UnixListener};
|
||||
|
||||
use opentelemetry::{
|
||||
|
@ -21,7 +25,6 @@ use opentelemetry::{
|
|||
|
||||
use crate::error::*;
|
||||
|
||||
use garage_api::generic_server::{server_loop, UnixListenerOn};
|
||||
use garage_api::helpers::*;
|
||||
use garage_api::s3::cors::{add_cors_headers, find_matching_cors_rule, handle_options_for_bucket};
|
||||
use garage_api::s3::error::{
|
||||
|
@ -72,29 +75,35 @@ pub struct WebServer {
|
|||
|
||||
impl WebServer {
|
||||
/// Run a web server
|
||||
pub fn new(garage: Arc<Garage>, root_domain: String) -> Arc<Self> {
|
||||
pub async fn run(
|
||||
garage: Arc<Garage>,
|
||||
bind_addr: UnixOrTCPSocketAddress,
|
||||
root_domain: String,
|
||||
shutdown_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), GarageError> {
|
||||
let metrics = Arc::new(WebMetrics::new());
|
||||
Arc::new(WebServer {
|
||||
let web_server = Arc::new(WebServer {
|
||||
garage,
|
||||
metrics,
|
||||
root_domain,
|
||||
})
|
||||
}
|
||||
});
|
||||
|
||||
pub async fn run(
|
||||
self: Arc<Self>,
|
||||
bind_addr: UnixOrTCPSocketAddress,
|
||||
shutdown_signal: impl Future<Output = ()>,
|
||||
) -> Result<(), GarageError> {
|
||||
info!("Web server listening on {}", bind_addr);
|
||||
|
||||
tokio::pin!(shutdown_signal);
|
||||
|
||||
match bind_addr {
|
||||
UnixOrTCPSocketAddress::TCPSocket(addr) => {
|
||||
let listener = TcpListener::bind(addr).await?;
|
||||
|
||||
let handler =
|
||||
move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
|
||||
server_loop(listener, handler, shutdown_signal).await
|
||||
loop {
|
||||
let (stream, client_addr) = tokio::select! {
|
||||
acc = listener.accept() => acc?,
|
||||
_ = &mut shutdown_signal => break,
|
||||
};
|
||||
|
||||
web_server.launch_handler(stream, client_addr.to_string());
|
||||
}
|
||||
}
|
||||
UnixOrTCPSocketAddress::UnixSocket(ref path) => {
|
||||
if path.exists() {
|
||||
|
@ -102,22 +111,50 @@ impl WebServer {
|
|||
}
|
||||
|
||||
let listener = UnixListener::bind(path)?;
|
||||
let listener = UnixListenerOn(listener, path.display().to_string());
|
||||
|
||||
fs::set_permissions(path, Permissions::from_mode(0o222))?;
|
||||
|
||||
let handler =
|
||||
move |stream, socketaddr| self.clone().handle_request(stream, socketaddr);
|
||||
server_loop(listener, handler, shutdown_signal).await
|
||||
loop {
|
||||
let (stream, _) = tokio::select! {
|
||||
acc = listener.accept() => acc?,
|
||||
_ = &mut shutdown_signal => break,
|
||||
};
|
||||
|
||||
web_server.launch_handler(stream, path.display().to_string());
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn launch_handler<S>(self: &Arc<Self>, stream: S, client_addr: String)
|
||||
where
|
||||
S: AsyncRead + AsyncWrite + Send + Sync + 'static,
|
||||
{
|
||||
let this = self.clone();
|
||||
let io = TokioIo::new(stream);
|
||||
|
||||
let serve = move |req: Request<IncomingBody>| {
|
||||
this.clone().handle_request(req, client_addr.to_string())
|
||||
};
|
||||
|
||||
tokio::task::spawn(async move {
|
||||
let io = Box::pin(io);
|
||||
if let Err(e) = http1::Builder::new()
|
||||
.serve_connection(io, service_fn(serve))
|
||||
.await
|
||||
{
|
||||
debug!("Error handling HTTP connection: {}", e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn handle_request(
|
||||
self: Arc<Self>,
|
||||
req: Request<IncomingBody>,
|
||||
addr: String,
|
||||
) -> Result<Response<BoxBody<Error>>, http::Error> {
|
||||
) -> Result<Response<BoxBody<Error>>, Infallible> {
|
||||
if let Ok(forwarded_for_ip_addr) =
|
||||
forwarded_headers::handle_forwarded_for_headers(req.headers())
|
||||
{
|
||||
|
@ -243,9 +280,7 @@ impl WebServer {
|
|||
);
|
||||
|
||||
let ret_doc = match *req.method() {
|
||||
Method::OPTIONS => handle_options_for_bucket(req, &bucket)
|
||||
.map_err(ApiError::from)
|
||||
.map(|res| res.map(|_empty_body: EmptyBody| empty_body())),
|
||||
Method::OPTIONS => handle_options_for_bucket(req, &bucket),
|
||||
Method::HEAD => handle_head(self.garage.clone(), &req, bucket_id, &key, None).await,
|
||||
Method::GET => handle_get(self.garage.clone(), &req, bucket_id, &key, None).await,
|
||||
_ => Err(ApiError::bad_request("HTTP method not supported")),
|
||||
|
|
Loading…
Reference in a new issue