From 0cd5b2ae19965b8c1f3176afeb8f678c4d8366dd Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 15 Mar 2021 22:36:41 +0100 Subject: [PATCH] WIP migrate to tokio 1 --- Cargo.lock | 278 ++++++++++++------------------------- src/api/Cargo.toml | 4 +- src/garage/Cargo.toml | 2 +- src/garage/server.rs | 6 +- src/model/Cargo.toml | 3 +- src/model/block.rs | 23 +-- src/model/object_table.rs | 3 +- src/model/version_table.rs | 3 +- src/rpc/Cargo.toml | 11 +- src/rpc/membership.rs | 30 ++-- src/rpc/rpc_client.rs | 2 +- src/rpc/rpc_server.rs | 5 +- src/table/Cargo.toml | 2 +- src/table/data.rs | 6 +- src/table/gc.rs | 4 +- src/table/merkle.rs | 4 +- src/table/sync.rs | 35 ++--- src/util/Cargo.toml | 7 +- src/util/background.rs | 131 ++++++++--------- src/util/error.rs | 8 +- src/web/Cargo.toml | 2 +- 21 files changed, 233 insertions(+), 336 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9fb368e5..8e8641ef 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -17,6 +17,12 @@ version = "0.4.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dabe5a181f83789739c194cbe5a897dde195078fac08568d09221fd6137a7ba8" +[[package]] +name = "arc-swap" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d4d7d63395147b81a9e570bcc6243aaf71c017bd666d4909cfef0085bdda8d73" + [[package]] name = "arrayvec" version = "0.3.25" @@ -46,7 +52,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -55,12 +61,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" -[[package]] -name = "base64" -version = "0.11.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b41b7ea54a0c9d92199de89e20e58d49f02f8e699814ef3fdf266f6f748d15c7" - [[package]] name = "base64" version = "0.13.0" @@ -142,12 +142,6 @@ dependencies = [ "iovec", ] -[[package]] -name = "bytes" -version = "0.5.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" - [[package]] name = "bytes" version = "1.0.1" @@ -182,7 +176,7 @@ dependencies = [ "num-integer", "num-traits", "time", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -347,7 +341,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9564fc758e15025b46aa6643b1b77d047d1a56a1aea6e01002ac0c7026876213" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -356,22 +350,6 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" -[[package]] -name = "fuchsia-zircon" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" -dependencies = [ - "bitflags", - "fuchsia-zircon-sys", -] - -[[package]] -name = "fuchsia-zircon-sys" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" - [[package]] name = "futures" version = "0.3.12" @@ -460,7 +438,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.4", + "pin-project-lite", "pin-utils", "proc-macro-hack", "proc-macro-nested", @@ -507,7 +485,7 @@ dependencies = [ name = "garage_api" version = "0.1.1" dependencies = [ - "base64 0.13.0", + "base64", "bytes 0.4.12", "chrono", "crypto-mac 0.7.0", @@ -537,7 +515,7 @@ dependencies = [ name = "garage_model" version = "0.1.1" dependencies = [ - "arc-swap", + "arc-swap 0.4.8", "async-trait", "bytes 0.4.12", "futures", @@ -560,7 +538,7 @@ dependencies = [ name = "garage_rpc" version = "0.1.1" dependencies = [ - "arc-swap", + "arc-swap 0.4.8", "bytes 0.4.12", "futures", "futures-util", @@ -579,6 +557,7 @@ dependencies = [ "sha2", "tokio", "tokio-rustls", + "tokio-stream", "webpki", ] @@ -607,6 +586,7 @@ dependencies = [ name = "garage_util" version = "0.1.1" dependencies = [ + "arc-swap 1.2.0", "blake2", "chrono", "err-derive", @@ -664,7 +644,7 @@ dependencies = [ "libc", "log", "rustversion", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -693,7 +673,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e692e296bfac1d2533ef168d0b60ff5897b8b70a4009276834014dd8924cc028" dependencies = [ "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -731,11 +711,11 @@ dependencies = [ [[package]] name = "h2" -version = "0.2.7" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e4728fd124914ad25e99e3d15a9361a879f6620f63cb56bbb08f95abb97a535" +checksum = "d832b01df74254fe364568d6ddc294443f61cbec82816b60904303af87efae78" dependencies = [ - "bytes 0.5.6", + "bytes 1.0.1", "fnv", "futures-core", "futures-sink", @@ -746,7 +726,6 @@ dependencies = [ "tokio", "tokio-util", "tracing", - "tracing-futures", ] [[package]] @@ -812,11 +791,11 @@ dependencies = [ [[package]] name = "http-body" -version = "0.3.1" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13d5ff830006f7646652e057693569bfe0d51760c0085a071769d142a205111b" +checksum = "2861bd27ee074e5ee891e8b539837a9430012e249d7f0ca2d795650f579c1994" dependencies = [ - "bytes 0.5.6", + "bytes 1.0.1", "http", ] @@ -849,11 +828,11 @@ dependencies = [ [[package]] name = "hyper" -version = "0.13.9" +version = "0.14.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6ad767baac13b44d4529fcf58ba2cd0995e36e7b435bc5b039de6f47e880dbf" +checksum = "e8e946c2b1349055e0b72ae281b238baf1a3ea7307c7e9f9d64673bdd9c26ac7" dependencies = [ - "bytes 0.5.6", + "bytes 1.0.1", "futures-channel", "futures-core", "futures-util", @@ -863,7 +842,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project 1.0.4", + "pin-project", "socket2", "tokio", "tower-service", @@ -873,11 +852,10 @@ dependencies = [ [[package]] name = "hyper-rustls" -version = "0.20.0" +version = "0.22.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ac965ea399ec3a25ac7d13b8affd4b8f39325cca00858ddf5eb29b79e6b14b08" +checksum = "5f9f7a97316d44c0af9b0301e65010573a853a9fc97046d7331d7f6bc0fd5a64" dependencies = [ - "bytes 0.5.6", "futures-util", "hyper", "log", @@ -947,16 +925,6 @@ dependencies = [ "wasm-bindgen", ] -[[package]] -name = "kernel32-sys" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - [[package]] name = "lazy_static" version = "1.4.0" @@ -965,9 +933,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.82" +version = "0.2.89" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89203f3fba0a3795506acaad8ebce3c80c0af93f994d5a1d7a0b1eeb23271929" +checksum = "538c092e5586f4cdd7dd8078c4a79220e3e168880218124dcbce860f0ea938c6" [[package]] name = "lock_api" @@ -1032,55 +1000,25 @@ dependencies = [ [[package]] name = "mio" -version = "0.6.23" +version = "0.7.10" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +checksum = "2182a122f3b7f3f5329cb1972cee089ba2459a0a80a56935e6e674f096f8d839" dependencies = [ - "cfg-if 0.1.10", - "fuchsia-zircon", - "fuchsia-zircon-sys", - "iovec", - "kernel32-sys", "libc", "log", "miow", - "net2", - "slab", - "winapi 0.2.8", -] - -[[package]] -name = "mio-uds" -version = "0.6.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" -dependencies = [ - "iovec", - "libc", - "mio", + "ntapi", + "winapi", ] [[package]] name = "miow" -version = "0.2.2" +version = "0.3.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" dependencies = [ - "kernel32-sys", - "net2", - "winapi 0.2.8", - "ws2_32-sys", -] - -[[package]] -name = "net2" -version = "0.2.37" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" -dependencies = [ - "cfg-if 0.1.10", - "libc", - "winapi 0.3.9", + "socket2", + "winapi", ] [[package]] @@ -1089,6 +1027,15 @@ version = "0.1.14" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "72ef4a56884ca558e5ddb05a1d1e7e1bfd9a68d9ed024c21704cc98872dae1bb" +[[package]] +name = "ntapi" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f6bb902e437b6d86e03cce10a7e2af662292c5dfef23b65899ea3ac9354ad44" +dependencies = [ + "winapi", +] + [[package]] name = "num-integer" version = "0.1.44" @@ -1164,7 +1111,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1173,33 +1120,13 @@ version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d4fd5641d01c8f18a23da7b6fe29298ff4b55afcccdf78973b24cf3175fee32e" -[[package]] -name = "pin-project" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2ffbc8e94b38ea3d2d8ba92aea2983b503cd75d0888d75b86bb37970b5698e15" -dependencies = [ - "pin-project-internal 0.4.27", -] - [[package]] name = "pin-project" version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95b70b68509f17aa2857863b6fa00bf21fc93674c7a8893de2f469f6aa7ca2f2" dependencies = [ - "pin-project-internal 1.0.4", -] - -[[package]] -name = "pin-project-internal" -version = "0.4.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "65ad2ae56b6abe3a1ee25f15ee605bacadb9a764edaba9c2bf4103800d4a1895" -dependencies = [ - "proc-macro2", - "quote", - "syn", + "pin-project-internal", ] [[package]] @@ -1213,12 +1140,6 @@ dependencies = [ "syn", ] -[[package]] -name = "pin-project-lite" -version = "0.1.11" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" - [[package]] name = "pin-project-lite" version = "0.2.4" @@ -1317,7 +1238,7 @@ dependencies = [ "libc", "rand_core 0.3.1", "rdrand", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1424,7 +1345,7 @@ dependencies = [ "spin", "untrusted", "web-sys", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1459,11 +1380,11 @@ dependencies = [ [[package]] name = "rustls" -version = "0.17.0" +version = "0.19.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0d4a31f5d68413404705d6982529b0e11a9aacd4839d1d6222ee3b8cb4015e1" +checksum = "064fd21ff87c6e87ed4506e68beb42459caa4a0e2eb144932e6776768556980b" dependencies = [ - "base64 0.11.0", + "base64", "log", "ring", "sct", @@ -1607,7 +1528,7 @@ checksum = "122e570113d28d773067fab24266b66753f6ea915758651696b6e35e49f88d6e" dependencies = [ "cfg-if 1.0.0", "libc", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1710,7 +1631,7 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", "wasi 0.10.0+wasi-snapshot-preview1", - "winapi 0.3.9", + "winapi", ] [[package]] @@ -1730,32 +1651,28 @@ checksum = "cda74da7e1a664f795bb1f8a87ec406fb89a02522cf6e50620d016add6dbbf5c" [[package]] name = "tokio" -version = "0.2.24" +version = "1.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "099837d3464c16a808060bb3f02263b412f6fafcb5d01c533d309985fbeebe48" +checksum = "8d56477f6ed99e10225f38f9f75f872f29b8b8bd8c0b946f63345bb144e9eeda" dependencies = [ - "bytes 0.5.6", - "fnv", - "futures-core", - "iovec", - "lazy_static", + "autocfg", + "bytes 1.0.1", "libc", "memchr", "mio", - "mio-uds", "num_cpus", - "pin-project-lite 0.1.11", + "once_cell", + "pin-project-lite", "signal-hook-registry", - "slab", "tokio-macros", - "winapi 0.3.9", + "winapi", ] [[package]] name = "tokio-macros" -version = "0.2.6" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +checksum = "caf7b11a536f46a809a8a9f0bb4237020f70ecbf115b842360afb127ea2fda57" dependencies = [ "proc-macro2", "quote", @@ -1764,27 +1681,37 @@ dependencies = [ [[package]] name = "tokio-rustls" -version = "0.13.1" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "15cb62a0d2770787abc96e99c1cd98fcf17f94959f3af63ca85bdfb203f051b4" +checksum = "bc6844de72e57df1980054b38be3a9f4702aba4858be64dd700181a8a6d0e1b6" dependencies = [ - "futures-core", "rustls", "tokio", "webpki", ] [[package]] -name = "tokio-util" -version = "0.3.1" +name = "tokio-stream" +version = "0.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "be8242891f2b6cbef26a2d7e8605133c2c554cd35b3e4948ea892d6d68436499" +checksum = "c535f53c0cfa1acace62995a8994fc9cc1f12d202420da96ff306ee24d576469" dependencies = [ - "bytes 0.5.6", + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec31e5cc6b46e653cf57762f36f71d5e6386391d88a72fd6db4508f8f676fb29" +dependencies = [ + "bytes 1.0.1", "futures-core", "futures-sink", "log", - "pin-project-lite 0.1.11", + "pin-project-lite", "tokio", ] @@ -1810,8 +1737,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f47026cdc4080c07e49b37087de021820269d996f581aac150ef9e5583eefe3" dependencies = [ "cfg-if 1.0.0", - "log", - "pin-project-lite 0.2.4", + "pin-project-lite", "tracing-core", ] @@ -1824,16 +1750,6 @@ dependencies = [ "lazy_static", ] -[[package]] -name = "tracing-futures" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab7bb6f14721aa00656086e9335d363c5c8747bae02ebe32ea2c7dece5689b4c" -dependencies = [ - "pin-project 0.4.27", - "tracing", -] - [[package]] name = "try-lock" version = "0.2.3" @@ -2002,12 +1918,6 @@ dependencies = [ "untrusted", ] -[[package]] -name = "winapi" -version = "0.2.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" - [[package]] name = "winapi" version = "0.3.9" @@ -2018,12 +1928,6 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] -[[package]] -name = "winapi-build" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" - [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -2036,7 +1940,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi 0.3.9", + "winapi", ] [[package]] @@ -2045,16 +1949,6 @@ version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" -[[package]] -name = "ws2_32-sys" -version = "0.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" -dependencies = [ - "winapi 0.2.8", - "winapi-build", -] - [[package]] name = "xmlparser" version = "0.13.3" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index 45388eff..c3208b66 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -31,10 +31,10 @@ rand = "0.7" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } http = "0.2" -hyper = "^0.13.6" +hyper = "0.14" url = "2.1" httpdate = "0.3" percent-encoding = "2.1.0" diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 8c28394b..36bbcd50 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -38,4 +38,4 @@ serde = { version = "1.0", default-features = false, features = ["derive", "rc"] futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/garage/server.rs b/src/garage/server.rs index 29740feb..ce90ecab 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -21,13 +21,13 @@ async fn shutdown_signal(send_cancel: watch::Sender) -> Result<(), Error> .await .expect("failed to install CTRL+C signal handler"); info!("Received CTRL+C, shutting down."); - send_cancel.broadcast(true)?; + send_cancel.send(true)?; Ok(()) } async fn wait_from(mut chan: watch::Receiver) -> () { - while let Some(exit_now) = chan.recv().await { - if exit_now { + while !*chan.borrow() { + if chan.changed().await.is_err() { return; } } diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index caeed66c..8f36cf2e 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -33,5 +33,4 @@ serde_bytes = "0.11" async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } - +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/model/block.rs b/src/model/block.rs index 023ed3ab..7185372c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -5,10 +5,9 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use futures::future::*; use futures::select; -use futures::stream::*; use serde::{Deserialize, Serialize}; use tokio::fs; -use tokio::prelude::*; +use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; use garage_util::data::*; @@ -134,7 +133,7 @@ impl BlockManager { let bm2 = self.clone(); let background = self.system.background.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await; + tokio::time::sleep(Duration::from_secs(10 * (i + 1))).await; background.spawn_worker(format!("block resync worker {}", i), move |must_exit| { bm2.resync_loop(must_exit) }); @@ -251,7 +250,7 @@ impl BlockManager { let mut key = u64::to_be_bytes(when).to_vec(); key.extend(hash.as_ref()); self.resync_queue.insert(key, hash.as_ref())?; - self.resync_notify.notify(); + self.resync_notify.notify_waiters(); Ok(()) } @@ -262,7 +261,7 @@ impl BlockManager { while !*must_exit.borrow() { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); - tokio::time::delay_for(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } @@ -283,17 +282,17 @@ impl BlockManager { self.resync_queue.remove(&time_bytes)?; res?; // propagate error to delay main loop } else { - let delay = tokio::time::delay_for(Duration::from_millis(time_msec - now)); + let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { _ = delay.fuse() => (), _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } } else { select! { _ = self.resync_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } Ok(()) @@ -467,8 +466,12 @@ impl BlockManager { // so that we can offload them if necessary and then delete them locally. async move { let mut ls_data_dir = fs::read_dir(path).await?; - while let Some(data_dir_ent) = ls_data_dir.next().await { - let data_dir_ent = data_dir_ent?; + loop { + let data_dir_ent = ls_data_dir.next_entry().await?; + let data_dir_ent = match data_dir_ent { + Some(x) => x, + None => break, + }; let name = data_dir_ent.file_name(); let name = match name.into_string() { Ok(x) => x, diff --git a/src/model/object_table.rs b/src/model/object_table.rs index 99fad3ce..d08bba70 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -195,7 +195,8 @@ impl TableSchema for ObjectTable { fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); - self.background.spawn(async move { + // TODO not cancellable + self.background.spawn_cancellable(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions for v in old_v.versions.iter() { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 841fbfea..19343890 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -110,7 +110,8 @@ impl TableSchema for VersionTable { fn updated(&self, old: Option, new: Option) { let block_ref_table = self.block_ref_table.clone(); - self.background.spawn(async move { + // TODO not cancellable + self.background.spawn_cancellable(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks if new_v.deleted.get() && !old_v.deleted.get() { diff --git a/src/rpc/Cargo.toml b/src/rpc/Cargo.toml index 48f05755..fc066bef 100644 --- a/src/rpc/Cargo.toml +++ b/src/rpc/Cargo.toml @@ -29,13 +29,14 @@ serde_json = "1.0" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } +tokio-stream = {version = "0.1", features = ["net"] } http = "0.2" -hyper = "0.13" -rustls = "0.17" -tokio-rustls = "0.13" -hyper-rustls = { version = "0.20", default-features = false } +hyper = { version = "0.14", features = ["full"] } +rustls = "0.19" +tokio-rustls = "0.22" +hyper-rustls = { version = "0.22", default-features = false } webpki = "0.21" diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6749478a..6cc3ed2e 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,9 +11,9 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; -use tokio::prelude::*; use tokio::sync::watch; use tokio::sync::Mutex; +use tokio::io::AsyncWriteExt; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -395,7 +395,7 @@ impl System { if has_changes { status.recalculate_hash(); } - if let Err(e) = update_locked.0.broadcast(Arc::new(status)) { + if let Err(e) = update_locked.0.send(Arc::new(status)) { error!("In ping_nodes: could not save status update ({})", e); } drop(update_locked); @@ -421,7 +421,7 @@ impl System { let status_hash = status.hash; let config_version = self.ring.borrow().config.version; - update_locked.0.broadcast(Arc::new(status))?; + update_locked.0.send(Arc::new(status))?; drop(update_locked); if is_new || status_hash != ping.status_hash { @@ -503,7 +503,7 @@ impl System { if has_changed { status.recalculate_hash(); } - update_lock.0.broadcast(Arc::new(status))?; + update_lock.0.send(Arc::new(status))?; drop(update_lock); if to_ping.len() > 0 { @@ -523,7 +523,7 @@ impl System { if adv.version > ring.config.version { let ring = Ring::new(adv.clone()); - update_lock.1.broadcast(Arc::new(ring))?; + update_lock.1.send(Arc::new(ring))?; drop(update_lock); self.background.spawn_cancellable( @@ -531,7 +531,7 @@ impl System { .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .map(Ok), ); - self.background.spawn(self.clone().save_network_config()); + self.background.spawn(self.clone().save_network_config()).await; } Ok(Message::Ok) @@ -539,7 +539,7 @@ impl System { async fn ping_loop(self: Arc, mut stop_signal: watch::Receiver) { loop { - let restart_at = tokio::time::delay_for(PING_INTERVAL); + let restart_at = tokio::time::sleep(PING_INTERVAL); let status = self.status.borrow().clone(); let ping_addrs = status @@ -553,10 +553,9 @@ impl System { select! { _ = restart_at.fuse() => (), - must_exit = stop_signal.recv().fuse() => { - match must_exit { - None | Some(true) => return, - _ => (), + _ = stop_signal.changed().fuse() => { + if *stop_signal.borrow() { + return; } } } @@ -570,7 +569,7 @@ impl System { consul_service_name: String, ) { loop { - let restart_at = tokio::time::delay_for(CONSUL_INTERVAL); + let restart_at = tokio::time::sleep(CONSUL_INTERVAL); match get_consul_nodes(&consul_host, &consul_service_name).await { Ok(mut node_list) => { @@ -584,10 +583,9 @@ impl System { select! { _ = restart_at.fuse() => (), - must_exit = stop_signal.recv().fuse() => { - match must_exit { - None | Some(true) => return, - _ => (), + _ = stop_signal.changed().fuse() => { + if *stop_signal.borrow() { + return; } } } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index cffcf106..60286256 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -198,7 +198,7 @@ impl RpcClient { let wait_finished_fut = tokio::spawn(async move { resp_stream.collect::>().await; }); - self.background.spawn(wait_finished_fut.map(|_| Ok(()))); + self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await; } Ok(results) diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 4d14b790..3c5014c4 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -13,6 +13,7 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; +use tokio_stream::wrappers::TcpListenerStream; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; @@ -171,8 +172,8 @@ impl RpcServer { config.set_single_cert([&node_certs[..], &ca_certs[..]].concat(), node_key)?; let tls_acceptor = Arc::new(TlsAcceptor::from(Arc::new(config))); - let mut listener = TcpListener::bind(&self.bind_addr).await?; - let incoming = listener.incoming().filter_map(|socket| async { + let listener = TcpListener::bind(&self.bind_addr).await?; + let incoming = TcpListenerStream::new(listener).filter_map(|socket| async { match socket { Ok(stream) => match tls_acceptor.clone().accept(stream).await { Ok(x) => Some(Ok::<_, hyper::Error>(x)), diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index 6b3aaceb..8f73470e 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -31,5 +31,5 @@ serde_bytes = "0.11" async-trait = "0.1.30" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } diff --git a/src/table/data.rs b/src/table/data.rs index 0a7b2cec..0029b936 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -159,7 +159,7 @@ where if let Some((old_entry, new_entry, new_bytes_hash)) = changed { let is_tombstone = new_entry.is_tombstone(); self.instance.updated(old_entry, Some(new_entry)); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); if is_tombstone { self.gc_todo.insert(&tree_key, new_bytes_hash.as_slice())?; } @@ -184,7 +184,7 @@ where if removed { let old_entry = self.decode_entry(v)?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); } Ok(removed) } @@ -209,7 +209,7 @@ where if let Some(old_v) = removed { let old_entry = self.decode_entry(&old_v[..])?; self.instance.updated(Some(old_entry), None); - self.merkle_updater.todo_notify.notify(); + self.merkle_updater.todo_notify.notify_one(); Ok(true) } else { Ok(false) diff --git a/src/table/gc.rs b/src/table/gc.rs index fd9a26d1..d37fdf35 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -85,8 +85,8 @@ where } } select! { - _ = tokio::time::delay_for(Duration::from_secs(10)).fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = must_exit.changed().fuse() => (), } } } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 5ce9cee3..86289bf1 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -121,13 +121,13 @@ impl MerkleUpdater { "({}) Error while iterating on Merkle todo tree: {}", self.table_name, e ); - tokio::time::delay_for(Duration::from_secs(10)).await; + tokio::time::sleep(Duration::from_secs(10)).await; } } } else { select! { _ = self.todo_notify.notified().fuse() => (), - _ = must_exit.recv().fuse() => (), + _ = must_exit.changed().fuse() => (), } } } diff --git a/src/table/sync.rs b/src/table/sync.rs index b344eb88..65231cd5 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::{pin_mut, select}; +use futures::{select}; use futures_util::future::*; use futures_util::stream::*; use rand::Rng; @@ -110,7 +110,7 @@ where let s3 = syncer.clone(); tokio::spawn(async move { - tokio::time::delay_for(Duration::from_secs(20)).await; + tokio::time::sleep(Duration::from_secs(20)).await; s3.add_full_sync(); }); @@ -142,23 +142,16 @@ where let mut nothing_to_do_since = Some(Instant::now()); while !*must_exit.borrow() { - let s_ring_recv = ring_recv.recv().fuse(); - let s_busy = busy_rx.recv().fuse(); - let s_must_exit = must_exit.recv().fuse(); - let s_timeout = tokio::time::delay_for(Duration::from_secs(1)).fuse(); - pin_mut!(s_ring_recv, s_busy, s_must_exit, s_timeout); - select! { - new_ring_r = s_ring_recv => { - if let Some(new_ring) = new_ring_r { - if !Arc::ptr_eq(&new_ring, &prev_ring) { - debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); - self.add_full_sync(); - prev_ring = new_ring; - } + _ = ring_recv.changed().fuse() => { + let new_ring = ring_recv.borrow(); + if !Arc::ptr_eq(&new_ring, &prev_ring) { + debug!("({}) Ring changed, adding full sync to syncer todo list", self.data.name); + self.add_full_sync(); + prev_ring = new_ring.clone(); } } - busy_opt = s_busy => { + busy_opt = busy_rx.recv().fuse() => { if let Some(busy) = busy_opt { if busy { nothing_to_do_since = None; @@ -169,12 +162,8 @@ where } } } - must_exit_v = s_must_exit => { - if must_exit_v.unwrap_or(false) { - break; - } - } - _ = s_timeout => { + _ = must_exit.changed().fuse() => (), + _ = tokio::time::sleep(Duration::from_secs(1)).fuse() => { if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) { nothing_to_do_since = None; debug!("({}) Interval passed, adding full sync to syncer todo list", self.data.name); @@ -213,7 +202,7 @@ where } } else { busy_tx.send(false).unwrap(); - tokio::time::delay_for(Duration::from_secs(1)).await; + tokio::time::sleep(Duration::from_secs(1)).await; } } } diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 7bb7cb31..2ae4796c 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -28,14 +28,15 @@ rmp-serde = "0.14.3" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde_json = "1.0" chrono = "0.4" +arc-swap = "1.2" futures = "0.3" futures-util = "0.3" -tokio = { version = "0.2", default-features = false, features = ["rt-core", "rt-threaded", "io-driver", "net", "tcp", "time", "macros", "sync", "signal", "fs"] } +tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } http = "0.2" -hyper = "0.13" -rustls = "0.17" +hyper = "0.14" +rustls = "0.19" webpki = "0.21" roxmltree = "0.11" diff --git a/src/util/background.rs b/src/util/background.rs index 3e600fdf..0ec9779a 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -2,11 +2,9 @@ use core::future::Future; use std::pin::Pin; use std::sync::Mutex; -use futures::future::join_all; -use futures::select; -use futures_util::future::*; +use arc_swap::ArcSwapOption; use std::sync::Arc; -use tokio::sync::{mpsc, watch, Notify}; +use tokio::sync::{mpsc, watch}; use crate::error::Error; @@ -14,12 +12,9 @@ type JobOutput = Result<(), Error>; type Job = Pin + Send>>; pub struct BackgroundRunner { - n_runners: usize, pub stop_signal: watch::Receiver, - queue_in: mpsc::UnboundedSender<(Job, bool)>, - queue_out: Mutex>, - job_notify: Notify, + queue_in: ArcSwapOption>, workers: Mutex>>, } @@ -27,50 +22,91 @@ pub struct BackgroundRunner { impl BackgroundRunner { pub fn new(n_runners: usize, stop_signal: watch::Receiver) -> Arc { let (queue_in, queue_out) = mpsc::unbounded_channel(); + + let mut workers = vec![]; + let queue_out = Arc::new(tokio::sync::Mutex::new(queue_out)); + + for i in 0..n_runners { + let queue_out = queue_out.clone(); + let stop_signal = stop_signal.clone(); + + workers.push(tokio::spawn(async move { + while let Some((job, cancellable)) = queue_out.lock().await.recv().await { + if cancellable && *stop_signal.borrow() { + continue; + } + if let Err(e) = job.await { + error!("Job failed: {}", e) + } + } + info!("Worker {} exiting", i); + })); + } + Arc::new(Self { - n_runners, stop_signal, - queue_in, - queue_out: Mutex::new(queue_out), - job_notify: Notify::new(), - workers: Mutex::new(Vec::new()), + queue_in: ArcSwapOption::new(Some(Arc::new(queue_in))), + workers: Mutex::new(workers), }) } pub async fn run(self: Arc) { - let mut workers = self.workers.lock().unwrap(); - for i in 0..self.n_runners { - workers.push(tokio::spawn(self.clone().runner(i))); - } - drop(workers); - let mut stop_signal = self.stop_signal.clone(); - while let Some(exit_now) = stop_signal.recv().await { + + loop { + let exit_now = match stop_signal.changed().await { + Ok(()) => *stop_signal.borrow(), + Err(e) => { + error!("Watch .changed() error: {}", e); + true + } + }; if exit_now { - let mut workers = self.workers.lock().unwrap(); - let workers_vec = workers.drain(..).collect::>(); - join_all(workers_vec).await; - return; + break; + } + } + + info!("Closing background job queue_in..."); + drop(self.queue_in.swap(None)); + + info!("Waiting for all workers to terminate..."); + while let Some(task) = self.workers.lock().unwrap().pop() { + if let Err(e) = task.await { + warn!("Error awaiting task: {}", e); } } } - pub fn spawn(&self, job: T) + // Spawn a task to be run in background + pub async fn spawn(&self, job: T) where T: Future + Send + 'static, { - let boxed: Job = Box::pin(job); - let _: Result<_, _> = self.queue_in.clone().send((boxed, false)); - self.job_notify.notify(); + match self.queue_in.load().as_ref() { + Some(chan) => { + let boxed: Job = Box::pin(job); + chan.send((boxed, false)).map_err(|_| "send error").unwrap(); + } + None => { + warn!("Doing background job now because we are exiting..."); + if let Err(e) = job.await { + warn!("Task failed: {}", e); + } + } + } } pub fn spawn_cancellable(&self, job: T) where T: Future + Send + 'static, { - let boxed: Job = Box::pin(job); - let _: Result<_, _> = self.queue_in.clone().send((boxed, true)); - self.job_notify.notify(); + match self.queue_in.load().as_ref() { + Some(chan) => { + let boxed: Job = Box::pin(job); + chan.send((boxed, false)).map_err(|_| "send error").unwrap(); + } + None => (), // drop job if we are exiting + } } pub fn spawn_worker(&self, name: String, worker: F) @@ -85,37 +121,4 @@ impl BackgroundRunner { info!("Worker exited: {}", name); })); } - - async fn runner(self: Arc, i: usize) { - let mut stop_signal = self.stop_signal.clone(); - loop { - let must_exit: bool = *stop_signal.borrow(); - if let Some(job) = self.dequeue_job(must_exit) { - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } else { - if must_exit { - info!("Background runner {} exiting", i); - return; - } - select! { - _ = self.job_notify.notified().fuse() => (), - _ = stop_signal.recv().fuse() => (), - } - } - } - } - - fn dequeue_job(&self, must_exit: bool) -> Option { - let mut queue = self.queue_out.lock().unwrap(); - while let Ok((job, cancellable)) = queue.try_recv() { - if cancellable && must_exit { - continue; - } else { - return Some(job); - } - } - None - } } diff --git a/src/util/error.rs b/src/util/error.rs index dbf71ac1..a9bf0824 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -8,16 +8,22 @@ use crate::data::*; pub enum RPCError { #[error(display = "Node is down: {:?}.", _0)] NodeDown(UUID), + #[error(display = "Timeout: {}", _0)] - Timeout(#[error(source)] tokio::time::Elapsed), + Timeout(#[error(source)] tokio::time::error::Elapsed), + #[error(display = "HTTP error: {}", _0)] HTTP(#[error(source)] http::Error), + #[error(display = "Hyper error: {}", _0)] Hyper(#[error(source)] hyper::Error), + #[error(display = "Messagepack encode error: {}", _0)] RMPEncode(#[error(source)] rmp_serde::encode::Error), + #[error(display = "Messagepack decode error: {}", _0)] RMPDecode(#[error(source)] rmp_serde::decode::Error), + #[error(display = "Too many errors: {:?}", _0)] TooManyErrors(Vec), } diff --git a/src/web/Cargo.toml b/src/web/Cargo.toml index 5cc8683c..8c340f6b 100644 --- a/src/web/Cargo.toml +++ b/src/web/Cargo.toml @@ -22,7 +22,7 @@ err-derive = "0.2.3" log = "0.4" futures = "0.3" http = "0.2" -hyper = "0.13" +hyper = "0.14" percent-encoding = "2.1.0" roxmltree = "0.11" idna = "0.2"