Background task manager (#332)
All checks were successful
continuous-integration/drone/push Build is passing

- [x] New background worker trait
- [x] Adapt all current workers to use new API
- [x] Command to list currently running workers, and whether they are active, idle, or dead
- [x] Error reporting
- Optimizations
  - [x] Merkle updater: several items per iteration
  - [ ] Use `tokio::task::spawn_blocking` where appropriate so that CPU-intensive tasks don't block other concurrently running tasks
- scrub:
  - [x] have only one worker with a channel to start/pause/cancel
  - [x] automatic scrub
  - [x] ability to view and change tranquility from CLI
  - [x] persistence of a few pieces of information
- [ ] Testing

Co-authored-by: Alex Auvolat <alex@adnab.me>
Reviewed-on: #332
Co-authored-by: Alex <alex@adnab.me>
Co-committed-by: Alex <alex@adnab.me>
This commit is contained in:
Alex 2022-07-08 13:30:26 +02:00
parent aab34bfe54
commit 4f38cadf6e
27 changed files with 2050 additions and 738 deletions

195
Cargo.lock generated
View file

@ -93,6 +93,15 @@ dependencies = [
"winapi",
]
[[package]]
name = "autocfg"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"
dependencies = [
"autocfg 1.1.0",
]
[[package]]
name = "autocfg"
version = "1.1.0"
@ -543,7 +552,7 @@ version = "0.9.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1145cf131a2c6ba0615079ab6a638f7e1973ac9c2634fcbeaaad6114246efe8c"
dependencies = [
"autocfg",
"autocfg 1.1.0",
"cfg-if 1.0.0",
"crossbeam-utils 0.8.8",
"lazy_static",
@ -959,6 +968,7 @@ dependencies = [
"futures",
"futures-util",
"garage_api",
"garage_block",
"garage_db",
"garage_model 0.7.0",
"garage_rpc 0.7.0",
@ -984,6 +994,7 @@ dependencies = [
"sha2",
"static_init",
"structopt",
"timeago",
"tokio",
"toml",
"tracing",
@ -1038,6 +1049,7 @@ dependencies = [
name = "garage_block"
version = "0.7.0"
dependencies = [
"arc-swap",
"async-trait",
"bytes 1.1.0",
"futures",
@ -1065,11 +1077,11 @@ dependencies = [
"err-derive 0.3.1",
"heed",
"hexdump",
"log",
"mktemp",
"pretty_env_logger",
"rusqlite",
"sled",
"tracing",
]
[[package]]
@ -1258,6 +1270,7 @@ dependencies = [
name = "garage_util"
version = "0.7.0"
dependencies = [
"async-trait",
"blake2",
"chrono",
"err-derive 0.3.1",
@ -1629,7 +1642,7 @@ version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "282a6247722caba404c065016bbfa522806e51714c34f5dfc3e4a3a46fcb4223"
dependencies = [
"autocfg",
"autocfg 1.1.0",
"hashbrown",
]
@ -1651,6 +1664,16 @@ dependencies = [
"serde",
]
[[package]]
name = "isolang"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "265ef164908329e47e753c769b14cbb27434abf0c41984dca201484022f09ce5"
dependencies = [
"phf",
"phf_codegen",
]
[[package]]
name = "itertools"
version = "0.4.19"
@ -1714,7 +1737,7 @@ dependencies = [
[[package]]
name = "k2v-client"
version = "0.1.0"
version = "0.0.1"
dependencies = [
"base64",
"clap 3.1.18",
@ -1975,7 +1998,7 @@ version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aa361d4faea93603064a027415f07bd8e1d5c88c9fbf68bf56a285428fd79ce"
dependencies = [
"autocfg",
"autocfg 1.1.0",
]
[[package]]
@ -2137,7 +2160,7 @@ version = "0.1.44"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d2cc698a63b549a70bc047073d2949cce27cd1c7b0a4a862d08a8031bc2801db"
dependencies = [
"autocfg",
"autocfg 1.1.0",
"num-traits",
]
@ -2147,7 +2170,7 @@ version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290"
dependencies = [
"autocfg",
"autocfg 1.1.0",
]
[[package]]
@ -2216,7 +2239,7 @@ version = "0.9.72"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e46109c383602735fa0a2e48dd2b7c892b048e1bf69e5c3b1d804b7d9c203cb"
dependencies = [
"autocfg",
"autocfg 1.1.0",
"cc",
"libc",
"openssl-src",
@ -2386,6 +2409,44 @@ dependencies = [
"indexmap",
]
[[package]]
name = "phf"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"
dependencies = [
"phf_shared",
]
[[package]]
name = "phf_codegen"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"
dependencies = [
"phf_generator",
"phf_shared",
]
[[package]]
name = "phf_generator"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"
dependencies = [
"phf_shared",
"rand 0.6.5",
]
[[package]]
name = "phf_shared"
version = "0.7.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"
dependencies = [
"siphasher",
]
[[package]]
name = "pin-project"
version = "0.4.29"
@ -2640,6 +2701,25 @@ dependencies = [
"winapi",
]
[[package]]
name = "rand"
version = "0.6.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"
dependencies = [
"autocfg 0.1.8",
"libc",
"rand_chacha 0.1.1",
"rand_core 0.4.2",
"rand_hc",
"rand_isaac",
"rand_jitter",
"rand_os",
"rand_pcg",
"rand_xorshift",
"winapi",
]
[[package]]
name = "rand"
version = "0.8.5"
@ -2647,10 +2727,20 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
dependencies = [
"libc",
"rand_chacha",
"rand_chacha 0.3.1",
"rand_core 0.6.3",
]
[[package]]
name = "rand_chacha"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"
dependencies = [
"autocfg 0.1.8",
"rand_core 0.3.1",
]
[[package]]
name = "rand_chacha"
version = "0.3.1"
@ -2685,6 +2775,77 @@ dependencies = [
"getrandom",
]
[[package]]
name = "rand_hc"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_isaac"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rand_jitter"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"
dependencies = [
"libc",
"rand_core 0.4.2",
"winapi",
]
[[package]]
name = "rand_os"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"
dependencies = [
"cloudabi",
"fuchsia-cprng",
"libc",
"rand_core 0.4.2",
"rdrand",
"winapi",
]
[[package]]
name = "rand_pcg"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"
dependencies = [
"autocfg 0.1.8",
"rand_core 0.4.2",
]
[[package]]
name = "rand_xorshift"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "rdrand"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"
dependencies = [
"rand_core 0.3.1",
]
[[package]]
name = "redox_syscall"
version = "0.2.11"
@ -3107,6 +3268,12 @@ dependencies = [
"libc",
]
[[package]]
name = "siphasher"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"
[[package]]
name = "slab"
version = "0.4.5"
@ -3355,6 +3522,16 @@ dependencies = [
"num_threads",
]
[[package]]
name = "timeago"
version = "0.3.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6ec32dde57efb15c035ac074118d7f32820451395f28cb0524a01d4e94983b26"
dependencies = [
"chrono",
"isolang",
]
[[package]]
name = "tinyvec"
version = "1.5.1"

254
Cargo.nix
View file

@ -172,6 +172,16 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" = overridableMkRustCrate (profileName: rec {
name = "autocfg";
version = "0.1.8";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "0dde43e75fd43e8a1bf86103336bc699aa8d17ad1be60c76c0bdfd4828e19b78"; };
dependencies = {
autocfg = rustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".autocfg."1.1.0" = overridableMkRustCrate (profileName: rec {
name = "autocfg";
version = "1.1.0";
@ -1364,6 +1374,7 @@ in
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; };
futures_util = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures-util."0.3.21" { inherit profileName; };
garage_api = rustPackages."unknown".garage_api."0.7.0" { inherit profileName; };
garage_block = rustPackages."unknown".garage_block."0.7.0" { inherit profileName; };
garage_db = rustPackages."unknown".garage_db."0.8.0" { inherit profileName; };
garage_model = rustPackages."unknown".garage_model."0.7.0" { inherit profileName; };
garage_rpc = rustPackages."unknown".garage_rpc."0.7.0" { inherit profileName; };
@ -1383,6 +1394,7 @@ in
serde = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.137" { inherit profileName; };
serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; };
structopt = rustPackages."registry+https://github.com/rust-lang/crates.io-index".structopt."0.3.26" { inherit profileName; };
timeago = rustPackages."registry+https://github.com/rust-lang/crates.io-index".timeago."0.3.1" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.17.0" { inherit profileName; };
toml = rustPackages."registry+https://github.com/rust-lang/crates.io-index".toml."0.5.8" { inherit profileName; };
tracing = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; };
@ -1458,6 +1470,7 @@ in
registry = "unknown";
src = fetchCrateLocal (workspaceSrc + "/src/block");
dependencies = {
arc_swap = rustPackages."registry+https://github.com/rust-lang/crates.io-index".arc-swap."1.5.0" { inherit profileName; };
async_trait = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; };
bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bytes."1.1.0" { inherit profileName; };
futures = rustPackages."registry+https://github.com/rust-lang/crates.io-index".futures."0.3.21" { inherit profileName; };
@ -1493,10 +1506,10 @@ in
err_derive = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; };
heed = rustPackages."registry+https://github.com/rust-lang/crates.io-index".heed."0.11.0" { inherit profileName; };
hexdump = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hexdump."0.1.1" { inherit profileName; };
log = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; };
${ if rootFeatures' ? "garage_db" then "pretty_env_logger" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pretty_env_logger."0.4.0" { inherit profileName; };
rusqlite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rusqlite."0.27.0" { inherit profileName; };
sled = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sled."0.34.7" { inherit profileName; };
tracing = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing."0.1.32" { inherit profileName; };
};
devDependencies = {
mktemp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".mktemp."0.4.1" { inherit profileName; };
@ -1717,6 +1730,7 @@ in
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_util") "k2v")
];
dependencies = {
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "async_trait" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".async-trait."0.1.52" { profileName = "__noProfile"; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "blake2" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".blake2."0.9.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "chrono" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "err_derive" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".err-derive."0.3.1" { profileName = "__noProfile"; };
@ -2219,6 +2233,22 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".isolang."1.0.0" = overridableMkRustCrate (profileName: rec {
name = "isolang";
version = "1.0.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "265ef164908329e47e753c769b14cbb27434abf0c41984dca201484022f09ce5"; };
features = builtins.concatLists [
[ "default" ]
];
dependencies = {
phf = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf."0.7.24" { inherit profileName; };
};
buildDependencies = {
phf_codegen = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_codegen."0.7.24" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".itertools."0.4.19" = overridableMkRustCrate (profileName: rec {
name = "itertools";
version = "0.4.19";
@ -3242,6 +3272,48 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".phf."0.7.24" = overridableMkRustCrate (profileName: rec {
name = "phf";
version = "0.7.24";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "b3da44b85f8e8dfaec21adae67f95d93244b2ecf6ad2a692320598dcc8e6dd18"; };
dependencies = {
phf_shared = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".phf_codegen."0.7.24" = overridableMkRustCrate (profileName: rec {
name = "phf_codegen";
version = "0.7.24";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "b03e85129e324ad4166b06b2c7491ae27fe3ec353af72e72cd1654c7225d517e"; };
dependencies = {
phf_generator = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_generator."0.7.24" { inherit profileName; };
phf_shared = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".phf_generator."0.7.24" = overridableMkRustCrate (profileName: rec {
name = "phf_generator";
version = "0.7.24";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "09364cc93c159b8b06b1f4dd8a4398984503483891b0c26b867cf431fb132662"; };
dependencies = {
phf_shared = rustPackages."registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" { inherit profileName; };
rand = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.6.5" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".phf_shared."0.7.24" = overridableMkRustCrate (profileName: rec {
name = "phf_shared";
version = "0.7.24";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "234f71a15de2288bcb7e3b6515828d22af7ec8598ee6d24c3b526fa0a80b67a0"; };
dependencies = {
siphasher = rustPackages."registry+https://github.com/rust-lang/crates.io-index".siphasher."0.2.3" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".pin-project."0.4.29" = overridableMkRustCrate (profileName: rec {
name = "pin-project";
version = "0.4.29";
@ -3568,6 +3640,34 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand."0.6.5" = overridableMkRustCrate (profileName: rec {
name = "rand";
version = "0.6.5";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "6d71dacdc3c88c1fde3885a3be3fbab9f35724e6ce99467f7d9c5026132184ca"; };
features = builtins.concatLists [
[ "alloc" ]
[ "default" ]
[ "rand_os" ]
[ "std" ]
];
dependencies = {
${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
rand_chacha = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_chacha."0.1.1" { inherit profileName; };
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
rand_hc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_hc."0.1.0" { inherit profileName; };
rand_isaac = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_isaac."0.1.1" { inherit profileName; };
rand_jitter = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_jitter."0.1.4" { inherit profileName; };
rand_os = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_os."0.1.3" { inherit profileName; };
rand_pcg = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_pcg."0.1.2" { inherit profileName; };
rand_xorshift = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_xorshift."0.1.1" { inherit profileName; };
${ if hostPlatform.isWindows then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; };
};
buildDependencies = {
autocfg = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" = overridableMkRustCrate (profileName: rec {
name = "rand";
version = "0.8.5";
@ -3590,6 +3690,19 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_chacha."0.1.1" = overridableMkRustCrate (profileName: rec {
name = "rand_chacha";
version = "0.1.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "556d3a1ca6600bfcbab7c7c91ccb085ac7fbbcd70e008a98742e7847f4f7bcef"; };
dependencies = {
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
};
buildDependencies = {
autocfg = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_chacha."0.3.1" = overridableMkRustCrate (profileName: rec {
name = "rand_chacha";
version = "0.3.1";
@ -3644,6 +3757,93 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_hc."0.1.0" = overridableMkRustCrate (profileName: rec {
name = "rand_hc";
version = "0.1.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "7b40677c7be09ae76218dc623efbf7b18e34bced3f38883af07bb75630a21bc4"; };
dependencies = {
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_isaac."0.1.1" = overridableMkRustCrate (profileName: rec {
name = "rand_isaac";
version = "0.1.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "ded997c9d5f13925be2a6fd7e66bf1872597f759fd9dd93513dd7e92e5a5ee08"; };
dependencies = {
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_jitter."0.1.4" = overridableMkRustCrate (profileName: rec {
name = "rand_jitter";
version = "0.1.4";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "1166d5c91dc97b88d1decc3285bb0a99ed84b05cfd0bc2341bdf2d43fc41e39b"; };
features = builtins.concatLists [
[ "std" ]
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "darwin" || hostPlatform.parsed.kernel.name == "ios" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "windows" then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_os."0.1.3" = overridableMkRustCrate (profileName: rec {
name = "rand_os";
version = "0.1.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "7b75f676a1e053fc562eafbb47838d67c84801e38fc1ba459e8f180deabd5071"; };
dependencies = {
${ if hostPlatform.parsed.kernel.name == "cloudabi" then "cloudabi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cloudabi."0.0.3" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "fuchsia" then "fuchsia_cprng" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".fuchsia-cprng."0.1.1" { inherit profileName; };
${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
${ if hostPlatform.parsed.abi.name == "sgx" then "rdrand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rdrand."0.4.0" { inherit profileName; };
${ if hostPlatform.isWindows then "winapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".winapi."0.3.9" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_pcg."0.1.2" = overridableMkRustCrate (profileName: rec {
name = "rand_pcg";
version = "0.1.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "abf9b09b01790cfe0364f52bf32995ea3c39f4d2dd011eac241d2914146d0b44"; };
dependencies = {
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.4.2" { inherit profileName; };
};
buildDependencies = {
autocfg = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".autocfg."0.1.8" { profileName = "__noProfile"; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rand_xorshift."0.1.1" = overridableMkRustCrate (profileName: rec {
name = "rand_xorshift";
version = "0.1.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "cbf7e9e623549b0e21f6e97cf8ecf247c1a8fd2e8a992ae265314300b2455d5c"; };
dependencies = {
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".rdrand."0.4.0" = overridableMkRustCrate (profileName: rec {
name = "rdrand";
version = "0.4.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2"; };
features = builtins.concatLists [
[ "default" ]
[ "std" ]
];
dependencies = {
rand_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand_core."0.3.1" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".redox_syscall."0.2.11" = overridableMkRustCrate (profileName: rec {
name = "redox_syscall";
version = "0.2.11";
@ -3738,7 +3938,7 @@ in
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; };
untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; };
@ -4213,6 +4413,13 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".siphasher."0.2.3" = overridableMkRustCrate (profileName: rec {
name = "siphasher";
version = "0.2.3";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "0b8de496cf83d4ed58b6be86c3a275b8602f6ffe98d3024a869e124147a9a3ac"; };
});
"registry+https://github.com/rust-lang/crates.io-index".slab."0.4.5" = overridableMkRustCrate (profileName: rec {
name = "slab";
version = "0.4.5";
@ -4327,7 +4534,7 @@ in
];
dependencies = {
bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; };
static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; };
@ -4410,7 +4617,7 @@ in
[ "proc-macro" ]
[ "quote" ]
[ "visit" ]
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "visit-mut")
[ "visit-mut" ]
];
dependencies = {
proc_macro2 = rustPackages."registry+https://github.com/rust-lang/crates.io-index".proc-macro2."1.0.36" { inherit profileName; };
@ -4539,6 +4746,23 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".timeago."0.3.1" = overridableMkRustCrate (profileName: rec {
name = "timeago";
version = "0.3.1";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "6ec32dde57efb15c035ac074118d7f32820451395f28cb0524a01d4e94983b26"; };
features = builtins.concatLists [
[ "chrono" ]
[ "default" ]
[ "isolang" ]
[ "translations" ]
];
dependencies = {
chrono = rustPackages."registry+https://github.com/rust-lang/crates.io-index".chrono."0.4.19" { inherit profileName; };
isolang = rustPackages."registry+https://github.com/rust-lang/crates.io-index".isolang."1.0.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".tinyvec."1.5.1" = overridableMkRustCrate (profileName: rec {
name = "tinyvec";
version = "1.5.1";
@ -4883,19 +5107,19 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "4a1bdf54a7c28a2bbf701e1d2233f6c77f473486b94bee4f9678da5a148dca7f"; };
features = builtins.concatLists [
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "attributes")
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default")
[ "attributes" ]
[ "default" ]
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web") "log")
(lib.optional (rootFeatures' ? "garage") "log-always")
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "std")
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "tracing-attributes")
[ "std" ]
[ "tracing-attributes" ]
];
dependencies = {
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "cfg_if" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
cfg_if = rustPackages."registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_web" then "log" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".log."0.4.16" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "pin_project_lite" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tracing_attributes" else null } = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-attributes."0.1.20" { profileName = "__noProfile"; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "tracing_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-core."0.1.23" { inherit profileName; };
pin_project_lite = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pin-project-lite."0.2.8" { inherit profileName; };
tracing_attributes = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-attributes."0.1.20" { profileName = "__noProfile"; };
tracing_core = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tracing-core."0.1.23" { inherit profileName; };
};
});
@ -5330,10 +5554,10 @@ in
[ "default" ]
];
dependencies = {
${ if hostPlatform.config == "aarch64-uwp-windows-msvc" || hostPlatform.config == "aarch64-pc-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "aarch64-pc-windows-msvc" || hostPlatform.config == "aarch64-uwp-windows-msvc" then "windows_aarch64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_aarch64_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-uwp-windows-gnu" || hostPlatform.config == "i686-pc-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-uwp-windows-msvc" || hostPlatform.config == "i686-pc-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-pc-windows-gnu" || hostPlatform.config == "x86_64-uwp-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
};
});

View file

@ -1,13 +1,27 @@
.PHONY: doc all release shell
.PHONY: doc all release shell run1 run2 run3
all:
clear; cargo build --all-features
doc:
cd doc/book; mdbook build
release:
nix-build --arg release true
shell:
nix-shell
# ----
run1:
RUST_LOG=garage=debug ./target/debug/garage -c tmp/config1.toml server
run1rel:
RUST_LOG=garage=debug ./target/release/garage -c tmp/config1.toml server
run2:
RUST_LOG=garage=debug ./target/debug/garage -c tmp/config2.toml server
run2rel:
RUST_LOG=garage=debug ./target/release/garage -c tmp/config2.toml server
run3:
RUST_LOG=garage=debug ./target/debug/garage -c tmp/config3.toml server
run3rel:
RUST_LOG=garage=debug ./target/release/garage -c tmp/config3.toml server

View file

@ -21,6 +21,7 @@ garage_table = { version = "0.7.0", path = "../table" }
opentelemetry = "0.17"
arc-swap = "1.5"
async-trait = "0.1.7"
bytes = "1.0"
hex = "0.4"

View file

@ -2,6 +2,7 @@
extern crate tracing;
pub mod manager;
pub mod repair;
mod block;
mod metrics;

View file

@ -1,18 +1,17 @@
use core::ops::Bound;
use std::convert::TryInto;
use std::path::{Path, PathBuf};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use futures::future::*;
use futures::select;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use tokio::select;
use tokio::sync::{mpsc, watch, Mutex, Notify};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@ -22,6 +21,7 @@ use opentelemetry::{
use garage_db as db;
use garage_db::counted_tree_hack::CountedTree;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::metrics::RecordDuration;
@ -36,6 +36,7 @@ use garage_table::replication::{TableReplication, TableShardedReplication};
use crate::block::*;
use crate::metrics::*;
use crate::rc::*;
use crate::repair::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
@ -93,16 +94,18 @@ pub struct BlockManager {
mutation_lock: Mutex<BlockManagerLocked>,
rc: BlockRc,
pub(crate) rc: BlockRc,
resync_queue: CountedTree,
resync_notify: Notify,
resync_errors: CountedTree,
system: Arc<System>,
pub(crate) system: Arc<System>,
endpoint: Arc<Endpoint<BlockRpc, Self>>,
metrics: BlockManagerMetrics,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
// This custom struct contains functions that must only be ran
@ -110,6 +113,12 @@ pub struct BlockManager {
// it INSIDE a Mutex.
struct BlockManagerLocked();
/// Outcome of a single call to `BlockManager::resync_iter`.
enum ResyncIterResult {
/// An entry of the resync queue was processed during this iteration.
BusyDidSomething,
/// Nothing was processed, but the next iteration can run immediately.
BusyDidNothing,
/// Nothing is due right now; the worker may wait for up to this duration
/// (or until it is notified) before trying again.
IdleFor(Duration),
}
impl BlockManager {
pub fn new(
db: &db::Db,
@ -157,10 +166,11 @@ impl BlockManager {
system,
endpoint,
metrics,
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
block_manager.clone().spawn_background_worker();
block_manager.clone().spawn_background_workers();
block_manager
}
@ -218,90 +228,6 @@ impl BlockManager {
Ok(())
}
/// Launch the repair procedure on the data store
///
/// This will list all blocks locally present, as well as those
/// that are required because of refcount > 0, and will try
/// to fix any mismatch between the two.
pub async fn repair_data_store(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
// 1. Repair blocks from RC table.
let mut next_start: Option<Hash> = None;
loop {
// We have to do this complicated two-step process where we first read a bunch
// of hashes from the RC table, and then insert them in the to-resync queue,
// because of SQLite. Basically, as long as we have an iterator on a DB table,
// we can't do anything else on the DB. The naive approach (which we had previously)
// of just iterating on the RC table and inserting items one to one in the resync
// queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
// This is mostly because the Rust bindings for SQLite assume a worst-case scenario
// where SQLite is not compiled in thread-safe mode, so we have to wrap everything
// in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
let mut batch_of_hashes = vec![];
let start_bound = match next_start.as_ref() {
None => Bound::Unbounded,
Some(x) => Bound::Excluded(x.as_slice()),
};
for entry in self
.rc
.rc
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
batch_of_hashes.push(hash);
if batch_of_hashes.len() >= 1000 {
break;
}
}
if batch_of_hashes.is_empty() {
break;
}
for hash in batch_of_hashes.into_iter() {
self.put_to_resync(&hash, Duration::from_secs(0))?;
next_start = Some(hash)
}
if *must_exit.borrow() {
return Ok(());
}
}
// 2. Repair blocks actually on disk
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
self.for_each_file(
(),
move |_, hash| async move {
self.put_to_resync(&hash, Duration::from_secs(0))
.map_err(Into::into)
},
must_exit,
)
.await
}
/// Verify integrity of each block on disk. Use `tranquility` to limit the load generated by
/// this function.
pub async fn scrub_data_store(
&self,
must_exit: &watch::Receiver<bool>,
tranquility: u32,
) -> Result<(), Error> {
let tranquilizer = Tranquilizer::new(30);
self.for_each_file(
tranquilizer,
move |mut tranquilizer, hash| async move {
let _ = self.read_block(&hash).await;
tranquilizer.tranquilize(tranquility).await;
Ok(tranquilizer)
},
must_exit,
)
.await
}
/// Get length of resync queue
pub fn resync_queue_len(&self) -> Result<usize, Error> {
// This currently can't return an error because the CountedTree hack
@ -321,6 +247,17 @@ impl BlockManager {
Ok(self.rc.rc.len()?)
}
/// Send a command (start, pause, resume, cancel, set tranquility) to the scrub worker
///
/// Panics if no command channel has been stored in `tx_scrub_command` yet.
/// An error returned by the channel's `send` is deliberately ignored.
pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
    let sender = self.tx_scrub_command.load();
    let sender = sender.as_ref().unwrap();
    let _ = sender.send(cmd).await;
}
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is
@ -390,7 +327,7 @@ impl BlockManager {
}
/// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let data = self
.read_block_internal(hash)
.bound_record_duration(&self.metrics.block_read_duration)
@ -554,18 +491,23 @@ impl BlockManager {
// for times that are earlier than the exponential back-off delay
// is a natural condition that is handled properly).
fn spawn_background_worker(self: Arc<Self>) {
fn spawn_background_workers(self: Arc<Self>) {
// Launch a background workers for background resync loop processing
let background = self.system.background.clone();
let worker = ResyncWorker::new(self.clone());
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(10)).await;
background.spawn_worker("block resync worker".into(), move |must_exit| {
self.resync_loop(must_exit)
});
background.spawn_worker(worker);
});
// Launch a background worker for data store scrubs
let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
let scrub_worker = ScrubWorker::new(self.clone(), scrub_rx);
self.system.background.spawn_worker(scrub_worker);
}
fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
pub(crate) fn put_to_resync(&self, hash: &Hash, delay: Duration) -> db::Result<()> {
let when = now_msec() + delay.as_millis() as u64;
self.put_to_resync_at(hash, when)
}
@ -579,37 +521,7 @@ impl BlockManager {
Ok(())
}
async fn resync_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
let mut tranquilizer = Tranquilizer::new(30);
while !*must_exit.borrow() {
match self.resync_iter(&mut must_exit).await {
Ok(true) => {
tranquilizer.tranquilize(self.background_tranquility).await;
}
Ok(false) => {
tranquilizer.reset();
}
Err(e) => {
// The errors that we have here are only Sled errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
// (there is kind of an assumption that Sled won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
error!(
"Could not do a resync iteration: {} (this is a very bad error)",
e
);
tranquilizer.reset();
}
}
}
}
// The result of resync_iter is:
// - Ok(true) -> a block was processed (successfully or not)
// - Ok(false) -> no block was processed, but we are ready for the next iteration
// - Err(_) -> a Sled error occurred when reading/writing from resync_queue/resync_errors
async fn resync_iter(&self, must_exit: &mut watch::Receiver<bool>) -> Result<bool, db::Error> {
async fn resync_iter(&self) -> Result<ResyncIterResult, db::Error> {
if let Some((time_bytes, hash_bytes)) = self.resync_queue.first()? {
let time_msec = u64::from_be_bytes(time_bytes[0..8].try_into().unwrap());
let now = now_msec();
@ -629,7 +541,7 @@ impl BlockManager {
// (we want to do the remove after the insert to ensure
// that the item is not lost if we crash in-between)
self.resync_queue.remove(time_bytes)?;
return Ok(false);
return Ok(ResyncIterResult::BusyDidNothing);
}
}
@ -676,15 +588,11 @@ impl BlockManager {
self.resync_queue.remove(time_bytes)?;
}
Ok(true)
Ok(ResyncIterResult::BusyDidSomething)
} else {
let delay = tokio::time::sleep(Duration::from_millis(time_msec - now));
select! {
_ = delay.fuse() => {},
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
Ok(false)
Ok(ResyncIterResult::IdleFor(Duration::from_millis(
time_msec - now,
)))
}
} else {
// Here we wait either for a notification that an item has been
@ -693,13 +601,7 @@ impl BlockManager {
// between the time we checked the queue and the first poll
// to resync_notify.notified(): if that happens, we'll just loop
// back 10 seconds later, which is fine.
let delay = tokio::time::sleep(Duration::from_secs(10));
select! {
_ = delay.fuse() => {},
_ = self.resync_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
Ok(false)
Ok(ResyncIterResult::IdleFor(Duration::from_secs(10)))
}
}
@ -814,72 +716,6 @@ impl BlockManager {
Ok(())
}
// ---- Utility: iteration on files in the data directory ----
async fn for_each_file<F, Fut, State>(
&self,
state: State,
mut f: F,
must_exit: &watch::Receiver<bool>,
) -> Result<(), Error>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send,
{
self.for_each_file_rec(&self.data_dir, state, &mut f, must_exit)
.await
.map(|_| ())
}
fn for_each_file_rec<'a, F, Fut, State>(
&'a self,
path: &'a Path,
mut state: State,
f: &'a mut F,
must_exit: &'a watch::Receiver<bool>,
) -> BoxFuture<'a, Result<State, Error>>
where
F: FnMut(State, Hash) -> Fut + Send,
Fut: Future<Output = Result<State, Error>> + Send,
State: Send + 'a,
{
async move {
let mut ls_data_dir = fs::read_dir(path).await?;
while let Some(data_dir_ent) = ls_data_dir.next_entry().await? {
if *must_exit.borrow() {
break;
}
let name = data_dir_ent.file_name();
let name = if let Ok(n) = name.into_string() {
n
} else {
continue;
};
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
state = self
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
.await?;
} else if name.len() == 64 {
let hash_bytes = if let Ok(h) = hex::decode(&name) {
h
} else {
continue;
};
let mut hash = [0u8; 32];
hash.copy_from_slice(&hash_bytes[..]);
state = f(state, hash.into()).await?;
}
}
Ok(state)
}
.boxed()
}
}
#[async_trait]
@ -898,6 +734,77 @@ impl EndpointHandler<BlockRpc> for BlockManager {
}
}
/// Background worker that drives the block resync queue of a `BlockManager`.
struct ResyncWorker {
// Block manager whose resync queue is processed
manager: Arc<BlockManager>,
// Throttles the worker after an iteration that processed a block
tranquilizer: Tranquilizer,
// How long `wait_for_work` sleeps at most before resuming work
next_delay: Duration,
}
impl ResyncWorker {
    /// Create a resync worker for `manager`, with a tranquilizer built with
    /// parameter 30 and an initial idle delay of 10 seconds.
    fn new(manager: Arc<BlockManager>) -> Self {
        let tranquilizer = Tranquilizer::new(30);
        let next_delay = Duration::from_secs(10);
        Self {
            manager,
            tranquilizer,
            next_delay,
        }
    }
}
#[async_trait]
impl Worker for ResyncWorker {
fn name(&self) -> String {
"Block resync worker".into()
}
fn info(&self) -> Option<String> {
let mut ret = vec![];
let qlen = self.manager.resync_queue_len().unwrap_or(0);
let elen = self.manager.resync_errors_len().unwrap_or(0);
if qlen > 0 {
ret.push(format!("{} blocks in queue", qlen));
}
if elen > 0 {
ret.push(format!("{} blocks in error state", elen));
}
if !ret.is_empty() {
Some(ret.join(", "))
} else {
None
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
self.tranquilizer.reset();
match self.manager.resync_iter().await {
Ok(ResyncIterResult::BusyDidSomething) => Ok(self
.tranquilizer
.tranquilize_worker(self.manager.background_tranquility)),
Ok(ResyncIterResult::BusyDidNothing) => Ok(WorkerState::Busy),
Ok(ResyncIterResult::IdleFor(delay)) => {
self.next_delay = delay;
Ok(WorkerState::Idle)
}
Err(e) => {
// The errors that we have here are only Sled errors
// We don't really know how to handle them so just ¯\_(ツ)_/¯
// (there is kind of an assumption that Sled won't error on us,
// if it does there is not much we can do -- TODO should we just panic?)
// Here we just give the error to the worker manager,
// it will print it to the logs and increment a counter
Err(e.into())
}
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
select! {
_ = tokio::time::sleep(self.next_delay) => (),
_ = self.manager.resync_notify.notified() => (),
};
WorkerState::Busy
}
}
struct BlockStatus {
exists: bool,
needed: RcEntry,

444
src/block/repair.rs Normal file
View file

@ -0,0 +1,444 @@
use core::ops::Bound;
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::select;
use tokio::sync::mpsc;
use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::persister::Persister;
use garage_util::time::*;
use garage_util::tranquilizer::Tranquilizer;
use crate::manager::*;
/// Time after the last completed scrub at which the scrub worker starts a new one by itself.
const SCRUB_INTERVAL: Duration = Duration::from_secs(3600 * 24 * 30); // full scrub every 30 days
/// Worker that puts every known block in the resync queue: first the blocks
/// listed in the RC table (phase 1), then the blocks found on disk (phase 2).
pub struct RepairWorker {
// Block manager whose RC table and data directory are walked
manager: Arc<BlockManager>,
// Phase 1 cursor: last hash queued from the RC table (None = start from the beginning)
next_start: Option<Hash>,
// Phase 2 iterator over the block files; None as long as phase 1 is running
block_iter: Option<BlockStoreIterator>,
}
impl RepairWorker {
    /// Create a repair worker positioned at the very beginning of phase 1
    /// (no RC-table cursor yet, no on-disk iterator yet).
    pub fn new(manager: Arc<BlockManager>) -> Self {
        let next_start = None;
        let block_iter = None;
        Self {
            manager,
            next_start,
            block_iter,
        }
    }
}
#[async_trait]
impl Worker for RepairWorker {
fn name(&self) -> String {
"Block repair worker".into()
}
fn info(&self) -> Option<String> {
match self.block_iter.as_ref() {
None => {
let idx_bytes = self
.next_start
.as_ref()
.map(|x| x.as_slice())
.unwrap_or(&[]);
let idx_bytes = if idx_bytes.len() > 4 {
&idx_bytes[..4]
} else {
idx_bytes
};
Some(format!("Phase 1: {}", hex::encode(idx_bytes)))
}
Some(bi) => Some(format!("Phase 2: {:.2}% done", bi.progress() * 100.)),
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
match self.block_iter.as_mut() {
None => {
// Phase 1: Repair blocks from RC table.
// We have to do this complicated two-step process where we first read a bunch
// of hashes from the RC table, and then insert them in the to-resync queue,
// because of SQLite. Basically, as long as we have an iterator on a DB table,
// we can't do anything else on the DB. The naive approach (which we had previously)
// of just iterating on the RC table and inserting items one to one in the resync
// queue can't work here, it would just provoke a deadlock in the SQLite adapter code.
// This is mostly because the Rust bindings for SQLite assume a worst-case scenario
// where SQLite is not compiled in thread-safe mode, so we have to wrap everything
// in a mutex (see db/sqlite_adapter.rs and discussion in PR #322).
// TODO: maybe do this with tokio::task::spawn_blocking ?
let mut batch_of_hashes = vec![];
let start_bound = match self.next_start.as_ref() {
None => Bound::Unbounded,
Some(x) => Bound::Excluded(x.as_slice()),
};
for entry in self
.manager
.rc
.rc
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
{
let (hash, _) = entry?;
let hash = Hash::try_from(&hash[..]).unwrap();
batch_of_hashes.push(hash);
if batch_of_hashes.len() >= 1000 {
break;
}
}
if batch_of_hashes.is_empty() {
// move on to phase 2
self.block_iter = Some(BlockStoreIterator::new(&self.manager));
return Ok(WorkerState::Busy);
}
for hash in batch_of_hashes.into_iter() {
self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
self.next_start = Some(hash)
}
Ok(WorkerState::Busy)
}
Some(bi) => {
// Phase 2: Repair blocks actually on disk
// Lists all blocks on disk and adds them to the resync queue.
// This allows us to find blocks we are storing but don't actually need,
// so that we can offload them if necessary and then delete them locally.
if let Some(hash) = bi.next().await? {
self.manager.put_to_resync(&hash, Duration::from_secs(0))?;
Ok(WorkerState::Busy)
} else {
Ok(WorkerState::Done)
}
}
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
unreachable!()
}
}
// ----
/// Worker that reads every block of the data store to detect corruption,
/// controlled through a channel of `ScrubWorkerCommand`s.
pub struct ScrubWorker {
// Block manager whose blocks are read
manager: Arc<BlockManager>,
// Receiving end of the command channel
rx_cmd: mpsc::Receiver<ScrubWorkerCommand>,
// Current state of the scrub (running, paused or finished)
work: ScrubWorkerState,
// Throttles the worker between two block reads
tranquilizer: Tranquilizer,
// Loads and saves `persisted` (created with the name "scrub_info" in the metadata directory)
persister: Persister<ScrubWorkerPersisted>,
// Values that are saved through `persister`
persisted: ScrubWorkerPersisted,
}
/// Scrub information that is saved through the worker's `Persister`.
#[derive(Serialize, Deserialize)]
struct ScrubWorkerPersisted {
// Tranquility value passed to the tranquilizer after each block read
tranquility: u32,
// Value of `now_msec()` when the last full scrub completed (0 if none is recorded)
time_last_complete_scrub: u64,
// Number of corrupt blocks found by scrubs so far
corruptions_detected: u64,
}
/// State of the scrub currently handled by a `ScrubWorker`.
enum ScrubWorkerState {
// A scrub is in progress; the iterator holds its position in the data directory
Running(BlockStoreIterator),
Paused(BlockStoreIterator, u64), // u64 = time when to resume scrub
// No scrub in progress
Finished,
}
impl Default for ScrubWorkerState {
fn default() -> Self {
ScrubWorkerState::Finished
}
}
/// Commands that can be sent to a `ScrubWorker` through its command channel.
#[derive(Debug)]
pub enum ScrubWorkerCommand {
/// Start a new scrub; only accepted when no scrub is running or paused.
Start,
/// Pause the running (or already paused) scrub for the given duration.
Pause(Duration),
/// Resume a paused scrub immediately.
Resume,
/// Abandon the running or paused scrub.
Cancel,
/// Change the tranquility value and persist it.
SetTranquility(u32),
}
impl ScrubWorker {
pub fn new(manager: Arc<BlockManager>, rx_cmd: mpsc::Receiver<ScrubWorkerCommand>) -> Self {
let persister = Persister::new(&manager.system.metadata_dir, "scrub_info");
let persisted = match persister.load() {
Ok(v) => v,
Err(_) => ScrubWorkerPersisted {
time_last_complete_scrub: 0,
tranquility: 4,
corruptions_detected: 0,
},
};
Self {
manager,
rx_cmd,
work: ScrubWorkerState::Finished,
tranquilizer: Tranquilizer::new(30),
persister,
persisted,
}
}
async fn handle_cmd(&mut self, cmd: ScrubWorkerCommand) {
match cmd {
ScrubWorkerCommand::Start => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Finished => {
let iterator = BlockStoreIterator::new(&self.manager);
ScrubWorkerState::Running(iterator)
}
work => {
error!("Cannot start scrub worker: already running!");
work
}
};
}
ScrubWorkerCommand::Pause(dur) => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Running(it) | ScrubWorkerState::Paused(it, _) => {
ScrubWorkerState::Paused(it, now_msec() + dur.as_millis() as u64)
}
work => {
error!("Cannot pause scrub worker: not running!");
work
}
};
}
ScrubWorkerCommand::Resume => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Paused(it, _) => ScrubWorkerState::Running(it),
work => {
error!("Cannot resume scrub worker: not paused!");
work
}
};
}
ScrubWorkerCommand::Cancel => {
self.work = match std::mem::take(&mut self.work) {
ScrubWorkerState::Running(_) | ScrubWorkerState::Paused(_, _) => {
ScrubWorkerState::Finished
}
work => {
error!("Cannot cancel scrub worker: not running!");
work
}
}
}
ScrubWorkerCommand::SetTranquility(t) => {
self.persisted.tranquility = t;
if let Err(e) = self.persister.save_async(&self.persisted).await {
error!("Could not save new tranquilitiy value: {}", e);
}
}
}
}
}
#[async_trait]
impl Worker for ScrubWorker {
    /// Human-readable name of this worker.
    fn name(&self) -> String {
        String::from("Block scrub worker")
    }

    /// Status line: scrub progress (or time of the last completed scrub),
    /// followed by the number of corruptions detected.
    fn info(&self) -> Option<String> {
        let status = match &self.work {
            ScrubWorkerState::Finished => {
                let last = msec_to_rfc3339(self.persisted.time_last_complete_scrub);
                format!("Last completed scrub: {}", last)
            }
            ScrubWorkerState::Paused(bsi, resume_at) => {
                let percent = bsi.progress() * 100.;
                format!(
                    "Paused, {:.2}% done, resumes at {}",
                    percent,
                    msec_to_rfc3339(*resume_at)
                )
            }
            ScrubWorkerState::Running(bsi) => {
                let percent = bsi.progress() * 100.;
                format!(
                    "{:.2}% done (tranquility = {})",
                    percent, self.persisted.tranquility
                )
            }
        };
        let summary = format!(
            "{} ; corruptions detected: {}",
            status, self.persisted.corruptions_detected
        );
        Some(summary)
    }

    /// Handle at most one pending command, then check one block if a scrub is running.
    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        match self.rx_cmd.try_recv() {
            Ok(cmd) => self.handle_cmd(cmd).await,
            Err(mpsc::error::TryRecvError::Empty) => (),
            // All senders are gone: nobody can control this worker any more
            Err(mpsc::error::TryRecvError::Disconnected) => return Ok(WorkerState::Done),
        }

        let bsi = match &mut self.work {
            ScrubWorkerState::Running(bsi) => bsi,
            ScrubWorkerState::Paused(_, _) | ScrubWorkerState::Finished => {
                return Ok(WorkerState::Idle)
            }
        };

        self.tranquilizer.reset();
        let hash = match bsi.next().await? {
            Some(hash) => hash,
            None => {
                // The iterator is exhausted: record the completion time
                self.persisted.time_last_complete_scrub = now_msec();
                self.persister.save_async(&self.persisted).await?;
                self.work = ScrubWorkerState::Finished;
                self.tranquilizer.clear();
                return Ok(WorkerState::Idle);
            }
        };

        match self.manager.read_block(&hash).await {
            Err(Error::CorruptData(_)) => {
                error!("Found corrupt data block during scrub: {:?}", hash);
                self.persisted.corruptions_detected += 1;
                self.persister.save_async(&self.persisted).await?;
            }
            Err(e) => return Err(e),
            Ok(_) => (),
        }

        Ok(self
            .tranquilizer
            .tranquilize_worker(self.persisted.tranquility))
    }

    /// Wait until a command arrives or until the scrub must resume (when
    /// paused) or start again (`SCRUB_INTERVAL` after the last completed one).
    async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
        // Deadline, and the command to apply to ourselves once it is reached
        let (wait_until, command) = match &self.work {
            ScrubWorkerState::Running(_) => return WorkerState::Busy,
            ScrubWorkerState::Paused(_, resume_time) => (*resume_time, ScrubWorkerCommand::Resume),
            ScrubWorkerState::Finished => {
                let next_scrub =
                    self.persisted.time_last_complete_scrub + SCRUB_INTERVAL.as_millis() as u64;
                (next_scrub, ScrubWorkerCommand::Start)
            }
        };

        let delay = match wait_until.checked_sub(now_msec()) {
            // Deadline already reached (now >= wait_until)
            None | Some(0) => {
                self.handle_cmd(command).await;
                return WorkerState::Busy;
            }
            Some(remaining_msec) => Duration::from_millis(remaining_msec),
        };

        select! {
            _ = tokio::time::sleep(delay) => self.handle_cmd(command).await,
            received = self.rx_cmd.recv() => match received {
                Some(cmd) => self.handle_cmd(cmd).await,
                None => return WorkerState::Done,
            },
        }

        if matches!(self.work, ScrubWorkerState::Running(_)) {
            WorkerState::Busy
        } else {
            WorkerState::Idle
        }
    }
}
// ----
/// Resumable depth-first walk over the block files of the data directory.
struct BlockStoreIterator {
// Stack of directories being visited: first element is the root data
// directory, last element is the directory currently being read
path: Vec<ReadingDir>,
}
/// One level of the directory stack of a `BlockStoreIterator`.
enum ReadingDir {
// Directory whose content has not been listed yet
Pending(PathBuf),
// Directory whose content has been listed
Read {
// All entries of the directory
subpaths: Vec<fs::DirEntry>,
// Index of the next entry of `subpaths` to visit
pos: usize,
},
}
impl BlockStoreIterator {
/// Create an iterator that starts at the root of the manager's data directory.
fn new(manager: &BlockManager) -> Self {
let root_dir = manager.data_dir.clone();
Self {
path: vec![ReadingDir::Pending(root_dir)],
}
}
/// Returns progress done, between 0 and 1
///
/// Each already-listed level of the stack contributes the fraction of its
/// entries that are fully visited, scaled by the number of entries of all
/// the levels above it. Levels that are not listed yet contribute nothing.
fn progress(&self) -> f32 {
if self.path.is_empty() {
1.0
} else {
let mut ret = 0.0;
let mut next_div = 1;
for p in self.path.iter() {
match p {
ReadingDir::Pending(_) => break,
ReadingDir::Read { subpaths, pos } => {
next_div *= subpaths.len();
// `pos` is the index of the NEXT entry, so `pos - 1` entries are done.
// No underflow: `next` either pops a freshly listed directory or
// bumps its `pos` to 1 before it reaches any await point.
ret += ((*pos - 1) as f32) / (next_div as f32);
}
}
}
ret
}
}
/// Advance the walk and return the hash of the next block file, or
/// `Ok(None)` once the whole tree has been visited.
///
/// I/O errors from listing a directory or reading an entry's type are
/// returned to the caller.
async fn next(&mut self) -> Result<Option<Hash>, Error> {
loop {
// An empty stack means the walk is over
let last_path = match self.path.last_mut() {
None => return Ok(None),
Some(lp) => lp,
};
// List the directory on top of the stack if this was not done yet
if let ReadingDir::Pending(path) = last_path {
let mut reader = fs::read_dir(&path).await?;
let mut subpaths = vec![];
while let Some(ent) = reader.next_entry().await? {
subpaths.push(ent);
}
*last_path = ReadingDir::Read { subpaths, pos: 0 };
}
let (subpaths, pos) = match *last_path {
ReadingDir::Read {
ref subpaths,
ref mut pos,
} => (subpaths, pos),
ReadingDir::Pending(_) => unreachable!(),
};
// Take the next entry, or go back to the parent directory when
// every entry of this one has been visited
let data_dir_ent = match subpaths.get(*pos) {
None => {
self.path.pop();
continue;
}
Some(ent) => {
*pos += 1;
ent
}
};
// Entries whose name is not valid Unicode are skipped
let name = data_dir_ent.file_name();
let name = if let Ok(n) = name.into_string() {
n
} else {
continue;
};
let ent_type = data_dir_ent.file_type().await?;
// A `.zst` suffix is ignored when matching the name
// (presumably compressed block files -- confirm against the block writer)
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
// Sub-directory named after one hex-encoded byte: descend into it
let path = data_dir_ent.path();
self.path.push(ReadingDir::Pending(path));
} else if name.len() == 64 {
// 64 hex characters decode to exactly the 32 bytes of a hash;
// names that are not valid hex are ignored
if let Ok(h) = hex::decode(&name) {
let mut hash = [0u8; 32];
hash.copy_from_slice(&h);
return Ok(Some(hash.into()));
}
}
}
}
}

View file

@ -19,7 +19,7 @@ required-features = ["cli"]
[dependencies]
err-derive = "0.3"
hexdump = "0.1"
log = "0.4"
tracing = "0.1.30"
heed = "0.11"
rusqlite = { version = "0.27", features = ["bundled"] }

View file

@ -6,7 +6,7 @@ use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard};
use log::trace;
use tracing::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction};

View file

@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
garage_api = { version = "0.7.0", path = "../api" }
garage_block = { version = "0.7.0", path = "../block" }
garage_model = { version = "0.7.0", path = "../model" }
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_table = { version = "0.7.0", path = "../table" }
@ -31,6 +32,7 @@ garage_web = { version = "0.7.0", path = "../web" }
bytes = "1.0"
bytesize = "1.1"
timeago = "0.3"
hex = "0.4"
tracing = { version = "0.1.30", features = ["log-always"] }
pretty_env_logger = "0.4"

View file

@ -24,7 +24,7 @@ use garage_model::migrate::Migrate;
use garage_model::permission::*;
use crate::cli::*;
use crate::repair::online::OnlineRepair;
use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
@ -36,6 +36,7 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
Worker(WorkerOpt),
// Replies
Ok(String),
@ -47,6 +48,10 @@ pub enum AdminRpc {
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
WorkerList(
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
}
impl Rpc for AdminRpc {
@ -693,15 +698,7 @@ impl AdminRpcHandler {
)))
}
} else {
let repair = OnlineRepair {
garage: self.garage.clone(),
};
self.garage
.system
.background
.spawn_worker("Repair worker".into(), move |must_exit| async move {
repair.repair_worker(opt, must_exit).await
});
launch_online_repair(self.garage.clone(), opt).await;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@ -830,6 +827,17 @@ impl AdminRpcHandler {
Ok(())
}
// ----
async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
match opt.cmd {
WorkerCmd::List { opt } => {
let workers = self.garage.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, opt))
}
}
}
}
#[async_trait]
@ -845,6 +853,7 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}

View file

@ -1,4 +1,5 @@
use std::collections::HashSet;
use std::time::Duration;
use garage_util::error::*;
use garage_util::formater::format_table;
@ -39,6 +40,7 @@ pub async fn cli_command_dispatch(
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
_ => unreachable!(),
}
}
@ -100,6 +102,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.iter().filter(|adv| !adv.is_up) {
if let Some(NodeRoleV(Some(cfg))) = layout.roles.get(&adv.id) {
let tf = timeago::Formatter::new();
failed_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = adv.id,
@ -110,7 +113,7 @@ pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) ->
capacity = cfg.capacity_string(),
last_seen = adv
.last_seen_secs_ago
.map(|s| format!("{}s ago", s))
.map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
));
}
@ -182,6 +185,9 @@ pub async fn cmd_admin(
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
AdminRpc::WorkerList(wi, wlo) => {
print_worker_info(wi, wlo);
}
r => {
error!("Unexpected response: {:?}", r);
}

View file

@ -45,6 +45,10 @@ pub enum Command {
/// Gather node statistics
#[structopt(name = "stats")]
Stats(StatsOpt),
/// Manage background workers
#[structopt(name = "worker")]
Worker(WorkerOpt),
}
#[derive(StructOpt, Debug)]
@ -423,8 +427,29 @@ pub enum RepairWhat {
/// Verify integrity of all blocks on disc (extremely slow, i/o intensive)
#[structopt(name = "scrub")]
Scrub {
/// Tranquility factor (see tranquilizer documentation)
#[structopt(name = "tranquility", default_value = "2")]
#[structopt(subcommand)]
cmd: ScrubCmd,
},
}
// Subcommands of the `scrub` repair operation (`RepairWhat::Scrub`).
// `launch_online_repair` translates each variant into a `ScrubWorkerCommand`
// and sends it to the block manager.
// NOTE(review): the `///` lines below are presumably picked up by structopt as
// CLI help text, so they are runtime strings rather than plain comments.
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum ScrubCmd {
/// Start scrub
#[structopt(name = "start")]
Start,
/// Pause scrub (it will resume automatically after 24 hours)
#[structopt(name = "pause")]
Pause,
/// Resume paused scrub
#[structopt(name = "resume")]
Resume,
/// Cancel scrub in progress
#[structopt(name = "cancel")]
Cancel,
/// Set tranquility level for in-progress and future scrubs
#[structopt(name = "set-tranquility")]
SetTranquility {
// Positional argument: the new tranquility value.
#[structopt()]
tranquility: u32,
},
}
@ -460,3 +485,29 @@ pub struct StatsOpt {
#[structopt(short = "d", long = "detailed")]
pub detailed: bool,
}
// Arguments of the `worker` CLI command; carried over RPC as
// `AdminRpc::Worker`, hence the Serialize/Deserialize derives.
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct WorkerOpt {
// Which worker subcommand to run.
#[structopt(subcommand)]
pub cmd: WorkerCmd,
}
// Subcommands of `worker`. `List` is the only one defined here.
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerCmd {
/// List all workers on Garage node
#[structopt(name = "list")]
List {
// Filtering flags, flattened into the subcommand's own arguments.
#[structopt(flatten)]
opt: WorkerListOpt,
},
}
// Filters applied by `print_worker_info` when listing workers.
// Both flags can be set together; a worker is then shown only if it passes both.
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
// Workers in the `Busy` or `Throttled` state pass this filter.
#[structopt(short = "b", long = "busy")]
pub busy: bool,
/// Show only workers with errors
// Workers whose total error count is non-zero pass this filter.
#[structopt(short = "e", long = "errors")]
pub errors: bool,
}

View file

@ -1,14 +1,19 @@
use std::collections::HashMap;
use std::time::Duration;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::error::*;
use garage_util::formater::format_table;
use garage_util::time::*;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
use crate::cli::structs::WorkerListOpt;
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
@ -235,3 +240,56 @@ pub fn find_matching_node(
Ok(candidates[0])
}
}
/// Builds the worker listing and hands it to `format_table` for display.
///
/// Workers are ordered by state (busy or throttled first, then idle, then
/// done) and by task id within a state. When `wlo.busy` is set, only busy or
/// throttled workers are listed; when `wlo.errors` is set, only workers with
/// at least one recorded error are listed.
///
/// Each listed worker yields one row `id<TAB>state<TAB>name`, optionally
/// followed by its info string and by a summary of its errors. The text of
/// the last error is shown when the worker has consecutive errors, or when
/// `wlo.errors` was requested.
pub fn print_worker_info(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
    let mut wi = wi.into_iter().collect::<Vec<_>>();
    wi.sort_by_key(|(tid, info)| {
        (
            match info.state {
                WorkerState::Busy | WorkerState::Throttled(_) => 0,
                WorkerState::Idle => 1,
                WorkerState::Done => 2,
            },
            *tid,
        )
    });

    // The formatter does not depend on the row: build it once.
    let tf = timeago::Formatter::new();

    let mut table = vec![];
    for (tid, info) in wi.iter() {
        if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
            continue;
        }
        if wlo.errors && info.errors == 0 {
            continue;
        }

        table.push(format!("{}\t{}\t{}", tid, info.state, info.name));
        if let Some(i) = &info.info {
            table.push(format!("\t\t {}", i));
        }

        let (err_ago, err_msg) = info
            .last_error
            .as_ref()
            .map(|(m, t)| {
                // The timestamp was taken on the node that ran the worker; if
                // its clock is ahead of ours, a plain subtraction would
                // underflow. Clamp the age to zero instead.
                let age_msec = now_msec().saturating_sub(*t);
                (tf.convert(Duration::from_millis(age_msec)), m.as_str())
            })
            .unwrap_or(("(?) ago".into(), "(?)"));

        if info.consecutive_errors > 0 {
            table.push(format!(
                "\t\t {} consecutive errors ({} total), last {}",
                info.consecutive_errors, info.errors, err_ago,
            ));
            table.push(format!("\t\t {}", err_msg));
        } else if info.errors > 0 {
            table.push(format!("\t\t ({} errors, last {})", info.errors, err_ago,));
            if wlo.errors {
                table.push(format!("\t\t {}", err_msg));
            }
        }
    }
    format_table(table);
}

View file

@ -1,89 +1,110 @@
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::garage::Garage;
use garage_model::s3::block_ref_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::error::Error;
use crate::*;
pub struct OnlineRepair {
pub garage: Arc<Garage>,
/// Starts the online repair operation selected by `opt.what`.
///
/// * `Tables`: requests a full sync on the syncer of each of the five tables.
/// * `Versions`, `BlockRefs`, `Blocks`: spawns a dedicated background worker.
/// * `Scrub`: translates the CLI subcommand into a `ScrubWorkerCommand` and
///   sends it to the block manager (a pause lasts 24 hours).
pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
    match opt.what {
        RepairWhat::Tables => {
            info!("Launching a full sync of tables");
            garage.bucket_table.syncer.add_full_sync();
            garage.object_table.syncer.add_full_sync();
            garage.version_table.syncer.add_full_sync();
            garage.block_ref_table.syncer.add_full_sync();
            garage.key_table.syncer.add_full_sync();
        }
        RepairWhat::Versions => {
            info!("Repairing the versions table");
            let worker = RepairVersionsWorker::new(garage.clone());
            garage.background.spawn_worker(worker);
        }
        RepairWhat::BlockRefs => {
            info!("Repairing the block refs table");
            let worker = RepairBlockrefsWorker::new(garage.clone());
            garage.background.spawn_worker(worker);
        }
        RepairWhat::Blocks => {
            info!("Repairing the stored blocks");
            let worker = garage_block::repair::RepairWorker::new(garage.block_manager.clone());
            garage.background.spawn_worker(worker);
        }
        RepairWhat::Scrub { cmd } => {
            // Map the CLI-level subcommand onto the scrub worker's own command type.
            let worker_cmd = match cmd {
                ScrubCmd::Start => ScrubWorkerCommand::Start,
                ScrubCmd::Pause => {
                    let one_day = Duration::from_secs(3600 * 24);
                    ScrubWorkerCommand::Pause(one_day)
                }
                ScrubCmd::Resume => ScrubWorkerCommand::Resume,
                ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
                ScrubCmd::SetTranquility { tranquility } => {
                    ScrubWorkerCommand::SetTranquility(tranquility)
                }
            };
            info!("Sending command to scrub worker: {:?}", worker_cmd);
            garage.block_manager.send_scrub_command(worker_cmd).await;
        }
    }
}
impl OnlineRepair {
pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver<bool>) {
if let Err(e) = self.repair_worker_aux(opt, must_exit).await {
warn!("Repair worker failed with error: {}", e);
// ----
/// State of the background worker that walks the version table one entry
/// per `work()` call.
struct RepairVersionsWorker {
/// Handle to the node's tables.
garage: Arc<Garage>,
/// Key of the last entry read; the next entry is looked up with `get_gt` on it.
pos: Vec<u8>,
/// Number of entries read so far; reported by `info()`.
counter: usize,
}
impl RepairVersionsWorker {
    /// Creates a worker that starts from an empty key position with a zero
    /// item count.
    fn new(garage: Arc<Garage>) -> Self {
        let pos = Vec::new();
        let counter = 0;
        Self {
            garage,
            pos,
            counter,
        }
    }
}
async fn repair_worker_aux(
&self,
opt: RepairOpt,
must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
self.garage.bucket_table.syncer.add_full_sync();
self.garage.object_table.syncer.add_full_sync();
self.garage.version_table.syncer.add_full_sync();
self.garage.block_ref_table.syncer.add_full_sync();
self.garage.key_table.syncer.add_full_sync();
}
RepairWhat::Versions => {
info!("Repairing the versions table");
self.repair_versions(&must_exit).await?;
}
RepairWhat::BlockRefs => {
info!("Repairing the block refs table");
self.repair_block_ref(&must_exit).await?;
}
RepairWhat::Blocks => {
info!("Repairing the stored blocks");
self.garage
.block_manager
.repair_data_store(&must_exit)
.await?;
}
RepairWhat::Scrub { tranquility } => {
info!("Verifying integrity of stored blocks");
self.garage
.block_manager
.scrub_data_store(&must_exit, tranquility)
.await?;
}
}
Ok(())
#[async_trait]
impl Worker for RepairVersionsWorker {
fn name(&self) -> String {
"Version repair worker".into()
}
async fn repair_versions(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
let mut i = 0;
fn info(&self) -> Option<String> {
Some(format!("{} items done", self.counter))
}
while !*must_exit.borrow() {
let item_bytes = match self.garage.version_table.data.store.get_gt(pos)? {
Some((k, v)) => {
pos = k;
v
}
None => break,
};
i += 1;
if i % 1000 == 0 {
info!("repair_versions: {}", i);
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.version_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => {
self.pos = k;
v
}
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if version.deleted.get() {
continue;
None => {
info!("repair_versions: finished, done {}", self.counter);
return Ok(WorkerState::Done);
}
};
self.counter += 1;
let version = rmp_serde::decode::from_read_ref::<_, Version>(&item_bytes)?;
if !version.deleted.get() {
let object = self
.garage
.object_table
@ -109,32 +130,59 @@ impl OnlineRepair {
.await?;
}
}
info!("repair_versions: finished, done {}", i);
Ok(())
Ok(WorkerState::Busy)
}
async fn repair_block_ref(&self, must_exit: &watch::Receiver<bool>) -> Result<(), Error> {
let mut pos = vec![];
let mut i = 0;
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
unreachable!()
}
}
while !*must_exit.borrow() {
let item_bytes = match self.garage.block_ref_table.data.store.get_gt(pos)? {
Some((k, v)) => {
pos = k;
v
}
None => break,
};
// ----
i += 1;
if i % 1000 == 0 {
info!("repair_block_ref: {}", i);
/// State of the background worker that walks the block reference table one
/// entry per `work()` call.
struct RepairBlockrefsWorker {
/// Handle to the node's tables.
garage: Arc<Garage>,
/// Key of the last entry read; the next entry is looked up with `get_gt` on it.
pos: Vec<u8>,
/// Number of entries read so far; reported by `info()`.
counter: usize,
}
impl RepairBlockrefsWorker {
    /// Creates a worker that starts from an empty key position with a zero
    /// item count.
    fn new(garage: Arc<Garage>) -> Self {
        let pos = Vec::new();
        let counter = 0;
        Self {
            garage,
            pos,
            counter,
        }
    }
}
#[async_trait]
impl Worker for RepairBlockrefsWorker {
fn name(&self) -> String {
"Block refs repair worker".into()
}
fn info(&self) -> Option<String> {
Some(format!("{} items done", self.counter))
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
let item_bytes = match self.garage.block_ref_table.data.store.get_gt(&self.pos)? {
Some((k, v)) => {
self.pos = k;
v
}
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if block_ref.deleted.get() {
continue;
None => {
info!("repair_block_ref: finished, done {}", self.counter);
return Ok(WorkerState::Done);
}
};
self.counter += 1;
let block_ref = rmp_serde::decode::from_read_ref::<_, BlockRef>(&item_bytes)?;
if !block_ref.deleted.get() {
let version = self
.garage
.version_table
@ -157,7 +205,11 @@ impl OnlineRepair {
.await?;
}
}
info!("repair_block_ref: finished, done {}", i);
Ok(())
Ok(WorkerState::Busy)
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
unreachable!()
}
}

View file

@ -2,8 +2,8 @@ use core::ops::Bound;
use std::collections::{hash_map, BTreeMap, HashMap};
use std::marker::PhantomData;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
@ -11,6 +11,7 @@ use garage_db as db;
use garage_rpc::ring::Ring;
use garage_rpc::system::System;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@ -171,11 +172,13 @@ impl<T: CountedItem> IndexCounter<T> {
),
});
let this2 = this.clone();
background.spawn_worker(
format!("{} index counter propagator", T::COUNTER_TABLE_NAME),
move |must_exit| this2.clone().propagate_loop(propagate_rx, must_exit),
);
background.spawn_worker(IndexPropagatorWorker {
index_counter: this.clone(),
propagate_rx,
buf: HashMap::new(),
errors: 0,
});
this
}
@ -239,68 +242,6 @@ impl<T: CountedItem> IndexCounter<T> {
Ok(())
}
async fn propagate_loop(
self: Arc<Self>,
mut propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
must_exit: watch::Receiver<bool>,
) {
// This loop batches updates to counters to be sent all at once.
// They are sent once the propagate_rx channel has been emptied (or is closed).
let mut buf = HashMap::new();
let mut errors = 0;
loop {
let (ent, closed) = match propagate_rx.try_recv() {
Ok(ent) => (Some(ent), false),
Err(mpsc::error::TryRecvError::Empty) if buf.is_empty() => {
match propagate_rx.recv().await {
Some(ent) => (Some(ent), false),
None => (None, true),
}
}
Err(mpsc::error::TryRecvError::Empty) => (None, false),
Err(mpsc::error::TryRecvError::Disconnected) => (None, true),
};
if let Some((pk, sk, counters)) = ent {
let tree_key = self.table.data.tree_key(&pk, &sk);
let dist_entry = counters.into_counter_entry(self.this_node);
match buf.entry(tree_key) {
hash_map::Entry::Vacant(e) => {
e.insert(dist_entry);
}
hash_map::Entry::Occupied(mut e) => {
e.get_mut().merge(&dist_entry);
}
}
// As long as we can add entries, loop back and add them to batch
// before sending batch to other nodes
continue;
}
if !buf.is_empty() {
let entries = buf.iter().map(|(_k, v)| v);
if let Err(e) = self.table.insert_many(entries).await {
errors += 1;
if errors >= 2 && *must_exit.borrow() {
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, buf.len(), e);
break;
}
warn!("({}) Could not propagate {} counter values: {}, retrying in 5 seconds (retry #{})", T::COUNTER_TABLE_NAME, buf.len(), e, errors);
tokio::time::sleep(Duration::from_secs(5)).await;
continue;
}
buf.clear();
errors = 0;
}
if closed || *must_exit.borrow() {
break;
}
}
}
pub fn offline_recount_all<TS, TR>(
&self,
counted_table: &Arc<Table<TS, TR>>,
@ -437,6 +378,98 @@ impl<T: CountedItem> IndexCounter<T> {
}
}
/// Background worker that batches local counter updates received over a
/// channel and inserts them into the counter table.
struct IndexPropagatorWorker<T: CountedItem> {
/// Index counter whose table receives the batched entries.
index_counter: Arc<IndexCounter<T>>,
/// Incoming updates: partition key, sort key and local counter values.
propagate_rx: mpsc::UnboundedReceiver<(T::CP, T::CS, LocalCounterEntry<T>)>,
/// Pending entries, keyed by tree key; updates for the same key are merged.
buf: HashMap<Vec<u8>, CounterEntry<T>>,
/// Failed insert attempts since the last successful one.
errors: usize,
}
impl<T: CountedItem> IndexPropagatorWorker<T> {
    /// Adds one counter update to the pending batch, merging it into the
    /// entry already buffered under the same tree key if there is one.
    fn add_ent(&mut self, pk: T::CP, sk: T::CS, counters: LocalCounterEntry<T>) {
        let counter = &self.index_counter;
        let key = counter.table.data.tree_key(&pk, &sk);
        let incoming = counters.into_counter_entry(counter.this_node);
        match self.buf.entry(key) {
            hash_map::Entry::Occupied(mut slot) => {
                slot.get_mut().merge(&incoming);
            }
            hash_map::Entry::Vacant(slot) => {
                slot.insert(incoming);
            }
        }
    }
}
#[async_trait]
impl<T: CountedItem> Worker for IndexPropagatorWorker<T> {
fn name(&self) -> String {
format!("{} index counter propagator", T::COUNTER_TABLE_NAME)
}
fn info(&self) -> Option<String> {
if !self.buf.is_empty() {
Some(format!("{} items in queue", self.buf.len()))
} else {
None
}
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
// This loop batches updates to counters to be sent all at once.
// They are sent once the propagate_rx channel has been emptied (or is closed).
let closed = loop {
match self.propagate_rx.try_recv() {
Ok((pk, sk, counters)) => {
self.add_ent(pk, sk, counters);
}
Err(mpsc::error::TryRecvError::Empty) => break false,
Err(mpsc::error::TryRecvError::Disconnected) => break true,
}
};
if !self.buf.is_empty() {
let entries_k = self.buf.keys().take(100).cloned().collect::<Vec<_>>();
let entries = entries_k.iter().map(|k| self.buf.get(k).unwrap());
if let Err(e) = self.index_counter.table.insert_many(entries).await {
self.errors += 1;
if self.errors >= 2 && *must_exit.borrow() {
error!("({}) Could not propagate {} counter values: {}, these counters will not be updated correctly.", T::COUNTER_TABLE_NAME, self.buf.len(), e);
return Ok(WorkerState::Done);
}
// Propagate error up to worker manager, it will log it, increment a counter,
// and sleep for a certain delay (with exponential backoff), waiting for
// things to go back to normal
return Err(e);
} else {
for k in entries_k {
self.buf.remove(&k);
}
self.errors = 0;
}
return Ok(WorkerState::Busy);
} else if closed {
return Ok(WorkerState::Done);
} else {
return Ok(WorkerState::Idle);
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerState {
match self.propagate_rx.recv().await {
Some((pk, sk, counters)) => {
self.add_ent(pk, sk, counters);
WorkerState::Busy
}
None => match self.buf.is_empty() {
false => WorkerState::Busy,
true => WorkerState::Done,
},
}
}
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
struct LocalCounterEntry<T: CountedItem> {
pk: T::CP,

View file

@ -2,7 +2,7 @@
use std::collections::HashMap;
use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr};
use std::path::Path;
use std::path::{Path, PathBuf};
use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant};
@ -104,6 +104,9 @@ pub struct System {
/// The job runner of this node
pub background: Arc<BackgroundRunner>,
/// Path to metadata directory
pub metadata_dir: PathBuf,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -295,6 +298,7 @@ impl System {
ring,
update_ring: Mutex::new(update_ring),
background,
metadata_dir: config.metadata_dir.clone(),
});
sys.system_endpoint.set_handler(sys.clone());
sys

View file

@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use tokio::sync::watch;
use garage_db::counted_tree_hack::CountedTree;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@ -69,35 +68,11 @@ where
gc.endpoint.set_handler(gc.clone());
let gc1 = gc.clone();
system.background.spawn_worker(
format!("GC loop for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
);
system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(None) => {
// Stuff was done, loop immediately
}
Ok(Some(wait_delay)) => {
// Nothing was done, wait specified delay.
select! {
_ = tokio::time::sleep(wait_delay).fuse() => {},
_ = must_exit.changed().fuse() => {},
}
}
Err(e) => {
warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
}
}
}
}
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@ -328,6 +303,66 @@ where
}
}
/// Background worker that drives the garbage collection loop of one table.
struct GcWorker<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
/// The table GC whose `gc_loop_iter` is called on each `work()`.
gc: Arc<TableGc<F, R>>,
/// Delay returned by the last idle GC iteration; slept in `wait_for_work()`.
wait_delay: Duration,
}
impl<F, R> GcWorker<F, R>
where
    F: TableSchema + 'static,
    R: TableReplication + 'static,
{
    /// Creates a GC worker with a zero initial wait delay.
    fn new(gc: Arc<TableGc<F, R>>) -> Self {
        let wait_delay = Duration::from_secs(0);
        Self { gc, wait_delay }
    }
}
// Worker implementation for table garbage collection.
#[async_trait]
impl<F, R> Worker for GcWorker<F, R>
where
    F: TableSchema + 'static,
    R: TableReplication + 'static,
{
    fn name(&self) -> String {
        format!("{} GC", F::TABLE_NAME)
    }

    /// Reports the length of the GC todo queue, or nothing when it is empty
    /// or cannot be read.
    fn info(&self) -> Option<String> {
        match self.gc.data.gc_todo_len().unwrap_or(0) {
            0 => None,
            pending => Some(format!("{} items in queue", pending)),
        }
    }

    /// Runs one GC iteration. When the iteration returns a delay, the delay
    /// is remembered and the worker goes idle; otherwise it stays busy.
    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        if let Some(delay) = self.gc.gc_loop_iter().await? {
            self.wait_delay = delay;
            return Ok(WorkerState::Idle);
        }
        Ok(WorkerState::Busy)
    }

    /// Sleeps for the delay recorded by the last idle iteration, unless the
    /// exit flag is already set, in which case the worker is done.
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        let exiting = *must_exit.borrow();
        if exiting {
            return WorkerState::Done;
        }
        tokio::time::sleep(self.wait_delay).await;
        WorkerState::Busy
    }
}
/// An entry stored in the gc_todo Sled tree associated with the table
/// Contains helper function for parsing, saving, and removing
/// such entry in Sled

View file

@ -1,14 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
use futures::select;
use futures_util::future::*;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::BackgroundRunner;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@ -78,43 +77,17 @@ where
empty_node_hash,
});
let ret2 = ret.clone();
background.spawn_worker(
format!("Merkle tree updater for {}", F::TABLE_NAME),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.updater_loop_iter() {
Ok(true) => (),
Ok(false) => {
select! {
_ = self.data.merkle_todo_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
}
Err(e) => {
warn!(
"({}) Error while updating Merkle tree item: {}",
F::TABLE_NAME,
e
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
fn updater_loop_iter(&self) -> Result<bool, Error> {
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
Ok(true)
Ok(WorkerState::Busy)
} else {
Ok(false)
Ok(WorkerState::Idle)
}
}
@ -325,6 +298,54 @@ where
}
}
/// Background worker wrapping the Merkle tree updater of one table; the
/// wrapped updater is cloned into a blocking task on each `work()` call.
struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
where
F: TableSchema + 'static,
R: TableReplication + 'static;
// Worker implementation for the Merkle tree updater.
#[async_trait]
impl<F, R> Worker for MerkleWorker<F, R>
where
    F: TableSchema + 'static,
    R: TableReplication + 'static,
{
    fn name(&self) -> String {
        format!("{} Merkle tree updater", F::TABLE_NAME)
    }

    /// Reports the length of the updater's todo queue, or nothing when it is
    /// empty or cannot be read.
    fn info(&self) -> Option<String> {
        match self.0.todo_len().unwrap_or(0) {
            0 => None,
            pending => Some(format!("{} items in queue", pending)),
        }
    }

    /// Processes up to 100 todo items on a blocking thread. Stops early and
    /// returns the iteration's result as soon as one is not `Ok(Busy)`
    /// (idle or error); returns `Busy` if all 100 iterations did work.
    ///
    /// Panics if the blocking task itself panicked or was cancelled.
    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        let updater = self.0.clone();
        let run_batch = move || {
            for _ in 0..100 {
                match updater.updater_loop_iter() {
                    Ok(WorkerState::Busy) => {}
                    other => return other,
                }
            }
            Ok(WorkerState::Busy)
        };
        tokio::task::spawn_blocking(run_batch).await.unwrap()
    }

    /// Sleeps ten seconds before reporting `Busy` again, unless the exit flag
    /// is already set, in which case the worker is done.
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        let exiting = *must_exit.borrow();
        if exiting {
            return WorkerState::Done;
        }
        tokio::time::sleep(Duration::from_secs(10)).await;
        WorkerState::Busy
    }
}
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());

View file

@ -1,17 +1,17 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@ -52,10 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
struct SyncTodo {
todo: Vec<TodoPartition>,
}
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@ -80,118 +76,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
let todo = SyncTodo { todo: vec![] };
let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
todo: Mutex::new(todo),
add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
system.background.spawn_worker(
format!("table sync watcher for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
system.background.spawn_worker(
format!("table syncer for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
let s3 = syncer.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(20)).await;
s3.add_full_sync();
system.background.spawn_worker(SyncWorker {
syncer: syncer.clone(),
ring_recv: system.ring.clone(),
ring: system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) {
let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
select! {
_ = ring_recv.changed().fuse() => {
let new_ring = ring_recv.borrow();
if !Arc::ptr_eq(&new_ring, &prev_ring) {
debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
prev_ring = new_ring.clone();
}
}
busy_opt = busy_rx.recv().fuse() => {
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
} else if nothing_to_do_since.is_none() {
nothing_to_do_since = Some(Instant::now());
}
}
}
_ = must_exit.changed().fuse() => {},
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
}
}
}
}
}
pub fn add_full_sync(&self) {
self.todo
.lock()
.unwrap()
.add_full_sync(&self.data, &self.system);
}
async fn syncer_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
busy_tx: mpsc::UnboundedSender<bool>,
) {
while !*must_exit.borrow() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
busy_tx.send(true).unwrap();
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
.await;
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
F::TABLE_NAME,
partition,
e
);
}
} else {
busy_tx.send(false).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
if self.add_full_sync_tx.send(()).is_err() {
error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
// ----
async fn sync_partition(
self: Arc<Self>,
self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@ -577,12 +495,22 @@ where
}
}
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
data: &TableData<F, R>,
system: &System,
) {
// -------- Sync Worker ---------
/// Background worker that syncs the partitions of one table, one partition
/// per `work()` call.
struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
/// The table syncer that performs the actual partition sync.
syncer: Arc<TableSyncer<F, R>>,
/// Watch channel on which ring changes are observed.
ring_recv: watch::Receiver<Arc<Ring>>,
/// Last ring seen; compared by pointer against the watched value.
ring: Arc<Ring>,
/// Receives the requests sent by `TableSyncer::add_full_sync`.
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
/// Partitions that remain to be synced.
todo: Vec<TodoPartition>,
/// Time at which the next full sync is queued automatically.
next_full_sync: Instant,
}
impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
let my_id = system.id;
self.todo.clear();
@ -623,6 +551,8 @@ impl SyncTodo {
retain,
});
}
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@ -641,6 +571,62 @@ impl SyncTodo {
}
}
// Worker implementation for table synchronization: syncs one partition per
// `work()` call and, while idle, waits for something that queues a new full sync.
#[async_trait]
impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
    fn name(&self) -> String {
        format!("{} sync", F::TABLE_NAME)
    }

    /// Reports how many partitions are left in the todo list, or nothing
    /// when the list is empty.
    fn info(&self) -> Option<String> {
        let l = self.todo.len();
        if l > 0 {
            Some(format!("{} partitions remaining", l))
        } else {
            None
        }
    }

    /// Syncs the next partition of the todo list, if any. Returns `Busy`
    /// after syncing a partition, `Idle` when the list is empty, and the
    /// sync error if the partition could not be synced.
    async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        if let Some(partition) = self.pop_task() {
            self.syncer.sync_partition(&partition, must_exit).await?;
            Ok(WorkerState::Busy)
        } else {
            Ok(WorkerState::Idle)
        }
    }

    /// Waits for the first of: an explicit full sync request, a ring change,
    /// or the scheduled full sync deadline. Returns `Done` if the exit flag
    /// is already set, otherwise `Busy` when the todo list is non-empty after
    /// the wait and `Idle` when it is still empty.
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        if *must_exit.borrow() {
            return WorkerState::Done;
        }

        // The deadline may already be in the past, e.g. after a sync round
        // that lasted longer than the anti-entropy interval. Subtracting a
        // later `Instant` from an earlier one is not guaranteed to be safe,
        // so clamp the remaining time to zero explicitly.
        let until_full_sync = self
            .next_full_sync
            .saturating_duration_since(Instant::now());

        select! {
            s = self.add_full_sync_rx.recv() => {
                if let Some(()) = s {
                    self.add_full_sync();
                }
            },
            _ = self.ring_recv.changed() => {
                let new_ring = self.ring_recv.borrow();
                if !Arc::ptr_eq(&new_ring, &self.ring) {
                    self.ring = new_ring.clone();
                    // Release the watch borrow before mutating `self` further.
                    drop(new_ring);
                    debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
                    self.add_full_sync();
                }
            },
            _ = tokio::time::sleep(until_full_sync) => {
                self.add_full_sync();
            }
        }

        if self.todo.is_empty() {
            WorkerState::Idle
        } else {
            WorkerState::Busy
        }
    }
}
// ---- UTIL ----
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}

View file

@ -16,6 +16,7 @@ path = "lib.rs"
[dependencies]
garage_db = { version = "0.8.0", path = "../db" }
async-trait = "0.1"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }

View file

@ -1,160 +0,0 @@
//! Job runner for futures and async functions
use core::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use futures::future::*;
use futures::select;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::sync::{mpsc, mpsc::error::TryRecvError, watch, Mutex};
use crate::error::Error;
type JobOutput = Result<(), Error>;
type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
pub struct BackgroundRunner {
// Stop flag; cloned into every worker started by `spawn_worker`.
stop_signal: watch::Receiver<bool>,
// Queue of jobs for the runner tasks; the bool marks a job as cancellable.
queue_in: mpsc::UnboundedSender<(Job, bool)>,
// Join handles of spawned worker tasks, sent to the supervising task.
worker_in: mpsc::UnboundedSender<tokio::task::JoinHandle<()>>,
}
impl BackgroundRunner {
/// Create a new BackgroundRunner
///
/// Spawns `n_runners` tasks that execute queued jobs, plus one supervising
/// task that collects the join handles of all worker tasks. Returns the
/// runner together with the join handle of the supervising task.
pub fn new(
n_runners: usize,
stop_signal: watch::Receiver<bool>,
) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
let (worker_in, mut worker_out) = mpsc::unbounded_channel();
let stop_signal_2 = stop_signal.clone();
// Supervising task: gathers worker handles and logs workers that exit with an error.
let await_all_done = tokio::spawn(async move {
let mut workers = FuturesUnordered::new();
// Counts one-second ticks seen while stopping or with the handle channel closed.
let mut shutdown_timer = 0;
loop {
// At most one new handle is picked up per loop iteration.
let closed = match worker_out.try_recv() {
Ok(wkr) => {
workers.push(wkr);
false
}
Err(TryRecvError::Empty) => false,
Err(TryRecvError::Disconnected) => true,
};
select! {
res = workers.next() => {
if let Some(Err(e)) = res {
error!("Worker exited with error: {}", e);
}
}
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if closed || *stop_signal_2.borrow() {
shutdown_timer += 1;
// Leave the loop after ten such ticks, whether or not workers remain.
if shutdown_timer >= 10 {
break;
}
}
}
}
}
});
let (queue_in, queue_out) = mpsc::unbounded_channel();
// The receiving end is shared by all runner tasks behind a mutex.
let queue_out = Arc::new(Mutex::new(queue_out));
for i in 0..n_runners {
let queue_out = queue_out.clone();
let stop_signal = stop_signal.clone();
// Runner tasks are registered with the supervising task like any other worker.
worker_in
.send(tokio::spawn(async move {
loop {
let (job, cancellable) = {
select! {
item = wait_job(&queue_out).fuse() => match item {
// We received a task, process it
Some(x) => x,
// We received a signal that no more tasks will ever be sent
// because the sending side was dropped. Exit now.
None => break,
},
_ = tokio::time::sleep(Duration::from_secs(5)).fuse() => {
if *stop_signal.borrow() {
// Nothing has been going on for 5 secs, and we are shutting
// down. Exit now.
break;
} else {
// Nothing is going on but we don't want to exit.
continue;
}
}
}
};
// Cancellable jobs are dropped without running once the stop flag is set.
if cancellable && *stop_signal.borrow() {
continue;
}
// A failing job is logged; the runner keeps going.
if let Err(e) = job.await {
error!("Job failed: {}", e)
}
}
info!("Background worker {} exiting", i);
}))
.unwrap();
}
let bgrunner = Arc::new(Self {
stop_signal,
queue_in,
worker_in,
});
(bgrunner, await_all_done)
}
/// Spawn a task to be run in background
pub fn spawn<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
self.queue_in
.send((boxed, false))
.map_err(|_| "could not put job in queue")
.unwrap();
}
/// Spawn a task to be run in background. It may get discarded before running if spawned while
/// the runner is stopping
pub fn spawn_cancellable<T>(&self, job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
let boxed: Job = Box::pin(job);
self.queue_in
.send((boxed, true))
.map_err(|_| "could not put job in queue")
.unwrap();
}
pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = ()> + Send + 'static,
{
let stop_signal = self.stop_signal.clone();
let task = tokio::spawn(async move {
info!("Worker started: {}", name);
worker(stop_signal).await;
info!("Worker exited: {}", name);
});
self.worker_in
.send(task)
.map_err(|_| "could not put job in queue")
.unwrap();
}
}
/// Waits for the next `(job, cancellable)` pair on the shared queue.
///
/// The queue's mutex is held for the whole wait. Yields `None` once the
/// channel reports that nothing more will arrive.
async fn wait_job(q: &Mutex<mpsc::UnboundedReceiver<(Job, bool)>>) -> Option<(Job, bool)> {
    let mut receiver = q.lock().await;
    receiver.recv().await
}

View file

@ -0,0 +1,48 @@
//! Job worker: a generic worker that just processes incoming
//! jobs one by one
use std::sync::Arc;
use async_trait::async_trait;
use tokio::sync::{mpsc, Mutex};
use crate::background::worker::*;
use crate::background::*;
/// Generic worker that runs queued jobs one at a time.
pub(crate) struct JobWorker {
/// Index of this worker, used to build its name.
pub(crate) index: usize,
/// Shared receiving end of the job queue; each item is `(job, cancellable)`.
pub(crate) job_chan: Arc<Mutex<mpsc::UnboundedReceiver<(Job, bool)>>>,
/// Job fetched by `wait_for_work` and not yet run by `work`.
pub(crate) next_job: Option<Job>,
}
#[async_trait]
impl Worker for JobWorker {
    /// Human-readable identifier built from this worker's index.
    fn name(&self) -> String {
        format!("Job worker #{}", self.index)
    }

    /// Runs the job stashed by `wait_for_work`, if there is one.
    ///
    /// Reports `Idle` when no job is pending and `Busy` once a job has
    /// completed successfully; a failing job's error is propagated.
    async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
        if let Some(job) = self.next_job.take() {
            job.await?;
            Ok(WorkerState::Busy)
        } else {
            Ok(WorkerState::Idle)
        }
    }

    /// Blocks on the shared queue until a job worth running shows up.
    ///
    /// Cancellable jobs received while `must_exit` is set are dropped and the
    /// wait continues. Returns `Busy` after stashing a job in `next_job`, or
    /// `Done` once the queue yields `None`.
    async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState {
        while let Some((job, cancellable)) = self.job_chan.lock().await.recv().await {
            let discard = cancellable && *must_exit.borrow();
            if !discard {
                self.next_job = Some(job);
                return WorkerState::Busy;
            }
        }
        WorkerState::Done
    }
}

117
src/util/background/mod.rs Normal file
View file

@ -0,0 +1,117 @@
//! Job runner for futures and async functions
pub mod job_worker;
pub mod worker;
use core::future::Future;
use std::collections::HashMap;
use std::pin::Pin;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState};
/// Outcome of a background job: `()` on success, an `Error` on failure.
pub(crate) type JobOutput = Result<(), Error>;
/// A background job: a pinned, boxed, `Send` future resolving to a `JobOutput`.
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
/// Job runner for futures and async functions
///
/// Holds the sending ends of the job queue and of the worker registration
/// channel, plus the table of per-worker status shared with the processor.
pub struct BackgroundRunner {
/// Queue of pending jobs; the boolean flags a job as cancellable.
send_job: mpsc::UnboundedSender<(Job, bool)>,
/// Channel used to hand new workers over to the worker processor.
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
/// Status of each worker, keyed by the task id assigned by the processor.
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
/// Snapshot of a worker's status, as recorded by the worker processor after
/// each of the worker's steps.
#[derive(Clone, Serialize, Deserialize, Debug)]
pub struct WorkerInfo {
/// Value of `Worker::name()`, captured when the entry is first created.
pub name: String,
/// Latest value of `Worker::info()`.
pub info: Option<String>,
/// State the worker was in after its latest step.
pub state: WorkerState,
/// Total number of errors returned by the worker's `work()`.
pub errors: usize,
/// Number of errors since the last successful `work()` call.
pub consecutive_errors: usize,
/// Message of the most recent error and the `now_msec()` value taken when
/// it was recorded.
pub last_error: Option<(String, u64)>,
}
impl BackgroundRunner {
    /// Create a new BackgroundRunner
    ///
    /// Spawns a tokio task running the `WorkerProcessor` that drives all
    /// workers, then registers `n_runners` job workers that all read from a
    /// single shared job queue.
    ///
    /// Returns the runner together with the join handle of the processor
    /// task, which completes once `WorkerProcessor::run` returns.
    ///
    /// # Panics
    ///
    /// Panics if a job worker cannot be handed to the worker processor.
    pub fn new(
        n_runners: usize,
        stop_signal: watch::Receiver<bool>,
    ) -> (Arc<Self>, tokio::task::JoinHandle<()>) {
        let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();

        let worker_info = Arc::new(std::sync::Mutex::new(HashMap::new()));
        let mut worker_processor =
            WorkerProcessor::new(worker_out, stop_signal, worker_info.clone());

        let await_all_done = tokio::spawn(async move {
            worker_processor.run().await;
        });

        let (send_job, queue_out) = mpsc::unbounded_channel();
        // The single receiver is shared by all job workers behind an async mutex.
        let queue_out = Arc::new(Mutex::new(queue_out));

        for i in 0..n_runners {
            // One clone of the shared receiver per job worker is enough.
            send_worker
                .send(Box::new(job_worker::JobWorker {
                    index: i,
                    job_chan: queue_out.clone(),
                    next_job: None,
                }))
                .ok()
                .expect("Could not put job worker in queue");
        }

        let bgrunner = Arc::new(Self {
            send_job,
            send_worker,
            worker_info,
        });
        (bgrunner, await_all_done)
    }

    /// Returns a copy of the current status of every known worker, keyed by
    /// task id.
    ///
    /// # Panics
    ///
    /// Panics if the worker info mutex is poisoned.
    pub fn get_worker_info(&self) -> HashMap<usize, WorkerInfo> {
        self.worker_info.lock().unwrap().clone()
    }

    /// Spawn a task to be run in background
    ///
    /// # Panics
    ///
    /// Panics if the job queue is closed.
    pub fn spawn<T>(&self, job: T)
    where
        T: Future<Output = JobOutput> + Send + 'static,
    {
        self.enqueue_job(job, false);
    }

    /// Spawn a task to be run in background. It may get discarded before running if spawned while
    /// the runner is stopping
    ///
    /// # Panics
    ///
    /// Panics if the job queue is closed.
    pub fn spawn_cancellable<T>(&self, job: T)
    where
        T: Future<Output = JobOutput> + Send + 'static,
    {
        self.enqueue_job(job, true);
    }

    /// Register a worker with the worker processor, which will drive it
    /// until it reports `WorkerState::Done` or the runner shuts down.
    ///
    /// # Panics
    ///
    /// Panics if the worker channel is closed.
    pub fn spawn_worker<W>(&self, worker: W)
    where
        W: Worker + 'static,
    {
        self.send_worker
            .send(Box::new(worker))
            .ok()
            .expect("Could not put worker in queue");
    }

    /// Boxes `job` and pushes it on the shared job queue with the given
    /// `cancellable` flag; shared by `spawn` and `spawn_cancellable`.
    fn enqueue_job<T>(&self, job: T, cancellable: bool)
    where
        T: Future<Output = JobOutput> + Send + 'static,
    {
        let boxed: Job = Box::pin(job);
        self.send_job
            .send((boxed, cancellable))
            .ok()
            .expect("Could not put job in queue");
    }
}

View file

@ -0,0 +1,261 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
use tracing::*;
use crate::background::WorkerInfo;
use crate::error::Error;
use crate::time::now_msec;
/// State of a worker, which determines what its handler does on the next step.
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
pub enum WorkerState {
/// The worker has work to do: `work()` is called on the next step.
Busy,
/// The worker must pause for the given number of seconds before going
/// back to `Busy`.
Throttled(f32),
/// The worker has nothing to do: `wait_for_work()` is awaited on the next step.
Idle,
/// The worker has finished and is not scheduled again.
Done,
}
/// Short textual form of a worker state; a throttled state also shows its
/// delay with three decimals.
impl std::fmt::Display for WorkerState {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match *self {
            WorkerState::Throttled(delay) => write!(f, "Thr:{:.3}", delay),
            WorkerState::Busy => f.write_str("Busy"),
            WorkerState::Idle => f.write_str("Idle"),
            WorkerState::Done => f.write_str("Done"),
        }
    }
}
/// A background worker, driven one step at a time by the worker processor.
#[async_trait]
pub trait Worker: Send {
/// Name of the worker, used in log messages and in the worker info table.
fn name(&self) -> String;
/// Optional free-form status string, copied into the worker info table
/// after each step. Defaults to `None`.
fn info(&self) -> Option<String> {
None
}
/// Work: do a basic unit of work, if one is available (otherwise, should return
/// WorkerState::Idle immediately). We will do our best to not interrupt this future in the
/// middle of processing, it will only be interrupted at the last minute when Garage is trying
/// to exit and this hasn't returned yet. This function may return an error to indicate that
/// its unit of work could not be processed due to an error: the error will be logged and
/// .work() will be called again after a short delay.
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error>;
/// Wait for work: await for some task to become available. This future can be interrupted in
/// the middle for any reason. This future doesn't have to await on must_exit.changed(), we
/// are doing it for you. Therefore it only receives a read reference to must_exit which allows
/// it to check if we are exiting.
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerState;
}
/// Drives every registered worker, one step at a time, until shutdown.
pub(crate) struct WorkerProcessor {
/// Watch channel whose value is `true` once shutdown has been requested.
stop_signal: watch::Receiver<bool>,
/// Receiving end of the channel on which new workers are registered.
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
/// Status of each worker, keyed by task id, updated after each step.
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
impl WorkerProcessor {
/// Builds a processor from the worker registration channel, the stop
/// signal and the shared worker info table.
pub(crate) fn new(
worker_chan: mpsc::UnboundedReceiver<Box<dyn Worker>>,
stop_signal: watch::Receiver<bool>,
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
) -> Self {
Self {
stop_signal,
worker_chan,
worker_info,
}
}
/// Main loop: accepts new workers and runs every worker one step at a
/// time until the stop signal becomes `true`, then gives the remaining
/// workers a bounded amount of time to finish.
///
/// Panics if the worker info mutex is poisoned.
pub(crate) async fn run(&mut self) {
// Each entry is a future that performs one step of a worker and then
// yields the handler back so it can be inspected and re-queued.
let mut workers = FuturesUnordered::new();
// Task ids are handed out sequentially, starting at 1.
let mut next_task_id = 1;
while !*self.stop_signal.borrow() {
// With no worker registered, wait forever on this branch so that only
// the other `select!` branches can fire.
let await_next_worker = async {
if workers.is_empty() {
futures::future::pending().await
} else {
workers.next().await
}
};
select! {
new_worker_opt = self.worker_chan.recv() => {
if let Some(new_worker) = new_worker_opt {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
let stop_signal_worker = self.stop_signal.clone();
// New workers start in the Busy state, so their first step calls work().
let mut worker = WorkerHandler {
task_id,
stop_signal,
stop_signal_worker,
worker: new_worker,
state: WorkerState::Busy,
errors: 0,
consecutive_errors: 0,
last_error: None,
};
workers.push(async move {
worker.step().await;
worker
}.boxed());
}
}
worker = await_next_worker => {
if let Some(mut worker) = worker {
trace!("{} (TID {}): {:?}", worker.worker.name(), worker.task_id, worker.state);
// Save worker info
let mut wi = self.worker_info.lock().unwrap();
match wi.get_mut(&worker.task_id) {
Some(i) => {
i.state = worker.state;
i.info = worker.worker.info();
i.errors = worker.errors;
i.consecutive_errors = worker.consecutive_errors;
// Keep the previously recorded error unless this step produced a new one.
if worker.last_error.is_some() {
i.last_error = worker.last_error.take();
}
}
None => {
// First completed step of this worker: create its entry.
wi.insert(worker.task_id, WorkerInfo {
name: worker.worker.name(),
state: worker.state,
info: worker.worker.info(),
errors: worker.errors,
consecutive_errors: worker.consecutive_errors,
last_error: worker.last_error.take(),
});
}
}
if worker.state == WorkerState::Done {
// Finished workers are dropped instead of being scheduled again.
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
} else {
workers.push(async move {
worker.step().await;
worker
}.boxed());
}
}
}
// Wake up so that the loop condition is evaluated again.
_ = self.stop_signal.changed() => (),
}
}
// We are exiting, drain everything
// For the first 5 seconds workers keep being stepped; after that, a worker
// that completes a step is no longer re-queued. The worker info table is
// not updated during this phase.
let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
while let Some(mut worker) = workers.next().await {
if worker.state == WorkerState::Done {
info!(
"Worker {} (TID {}) exited",
worker.worker.name(),
worker.task_id
);
} else if Instant::now() > drain_half_time {
warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.state);
} else {
workers.push(
async move {
worker.step().await;
worker
}
.boxed(),
);
}
}
};
// Hard limit: after 9 seconds the drain future is dropped, which cancels
// whatever steps are still in progress.
select! {
_ = drain_everything => {
info!("All workers exited peacefully \\o/");
}
_ = tokio::time::sleep(Duration::from_secs(9)) => {
error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
}
}
/// A worker together with the bookkeeping the processor keeps about it.
struct WorkerHandler {
/// Identifier assigned by the processor when the worker was registered.
task_id: usize,
/// Stop signal awaited by the handler itself and passed mutably to `work()`.
stop_signal: watch::Receiver<bool>,
/// Separate clone of the stop signal, lent read-only to `wait_for_work()`.
stop_signal_worker: watch::Receiver<bool>,
/// The worker being driven.
worker: Box<dyn Worker>,
/// Current state; selects what the next call to `step()` does.
state: WorkerState,
/// Total number of errors returned by `work()`.
errors: usize,
/// Number of errors since the last successful `work()` call.
consecutive_errors: usize,
/// Message and `now_msec()` value of the latest error, until the
/// processor takes it.
last_error: Option<(String, u64)>,
}
impl WorkerHandler {
/// Performs one step of the worker according to its current state and
/// updates `state` (and the error counters) with the outcome.
///
/// Must not be called in the `Done` state.
async fn step(&mut self) {
match self.state {
// Busy: run one unit of work and adopt the state the worker reports.
WorkerState::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
self.state = s;
self.consecutive_errors = 0;
}
Err(e) => {
error!(
"Error in worker {} (TID {}): {}",
self.worker.name(),
self.task_id,
e
);
self.errors += 1;
self.consecutive_errors += 1;
self.last_error = Some((format!("{}", e), now_msec()));
// Sleep a bit so that error won't repeat immediately, exponential backoff
// strategy (min 1sec, max ~60sec)
self.state = WorkerState::Throttled(
(1.5f32).powf(std::cmp::min(10, self.consecutive_errors - 1) as f32),
);
}
},
WorkerState::Throttled(delay) => {
// Sleep for given delay and go back to busy state
// The sleep is skipped when shutdown was already requested, and cut
// short when the stop signal changes.
// NOTE(review): `Duration::from_secs_f32` panics on a negative or
// non-finite `delay` — confirm that no worker can return such a value.
if !*self.stop_signal.borrow() {
select! {
_ = tokio::time::sleep(Duration::from_secs_f32(delay)) => (),
_ = self.stop_signal.changed() => (),
}
}
self.state = WorkerState::Busy;
}
WorkerState::Idle => {
if *self.stop_signal.borrow() {
// Shutdown requested: wait for work for at most one second.
select! {
new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
self.state = new_st;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
// stay in Idle state
}
}
} else {
// Normal operation: wait until work arrives or the stop signal changes.
select! {
new_st = self.worker.wait_for_work(&self.stop_signal_worker) => {
self.state = new_st;
}
_ = self.stop_signal.changed() => {
// stay in Idle state
}
}
}
}
// The processor never schedules a step for a worker that is Done.
WorkerState::Done => unreachable!(),
}
}
}

View file

@ -11,7 +11,6 @@ pub mod error;
pub mod formater;
pub mod metrics;
pub mod persister;
//pub mod sled_counter;
pub mod time;
pub mod token_bucket;
pub mod tranquilizer;

View file

@ -3,6 +3,8 @@ use std::time::{Duration, Instant};
use tokio::time::sleep;
use crate::background::WorkerState;
/// A tranquilizer is a helper object that is used to make
/// background operations not take up too much time.
///
@ -33,7 +35,7 @@ impl Tranquilizer {
}
}
pub async fn tranquilize(&mut self, tranquility: u32) {
fn tranquilize_internal(&mut self, tranquility: u32) -> Option<Duration> {
let observation = Instant::now() - self.last_step_begin;
self.observations.push_back(observation);
@ -45,13 +47,32 @@ impl Tranquilizer {
if !self.observations.is_empty() {
let delay = (tranquility * self.sum_observations) / (self.observations.len() as u32);
sleep(delay).await;
Some(delay)
} else {
None
}
}
self.reset();
/// Pauses the caller for the delay computed by `tranquilize_internal`.
///
/// When a delay is returned, sleeps for it and then calls `reset()`;
/// when none is returned, does nothing further.
pub async fn tranquilize(&mut self, tranquility: u32) {
    match self.tranquilize_internal(tranquility) {
        Some(delay) => {
            sleep(delay).await;
            self.reset();
        }
        None => {}
    }
}
/// Non-sleeping variant of `tranquilize` for use by workers: the delay
/// computed by `tranquilize_internal` is returned as a `Throttled` state
/// (in seconds), or `Busy` when there is no delay.
#[must_use]
pub fn tranquilize_worker(&mut self, tranquility: u32) -> WorkerState {
    self.tranquilize_internal(tranquility)
        .map_or(WorkerState::Busy, |delay| {
            WorkerState::Throttled(delay.as_secs_f32())
        })
}
/// Marks the current instant as the beginning of the next step, i.e. the
/// reference point from which the next observation is measured.
pub fn reset(&mut self) {
self.last_step_begin = Instant::now();
}
/// Discards all recorded observations.
pub fn clear(&mut self) {
self.observations.clear();
}
}