From d4de2ffc40fe9d003f12139053ca070eda0b7221 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 2 Dec 2020 13:30:47 +0100 Subject: [PATCH] First commit --- .gitignore | 1 + Cargo.lock | 1409 +++++++++++++++++++++++++++++++++++++++ Cargo.toml | 27 + Makefile | 3 + examples/basalt.rs | 76 +++ examples/fullmesh.rs | 68 ++ rustfmt.toml | 1 + src/conn.rs | 280 ++++++++ src/error.rs | 57 ++ src/lib.rs | 9 + src/message.rs | 18 + src/netapp.rs | 214 ++++++ src/peering/basalt.rs | 475 +++++++++++++ src/peering/fullmesh.rs | 437 ++++++++++++ src/peering/mod.rs | 2 + src/proto.rs | 251 +++++++ src/util.rs | 14 + 17 files changed, 3342 insertions(+) create mode 100644 .gitignore create mode 100644 Cargo.lock create mode 100644 Cargo.toml create mode 100644 Makefile create mode 100644 examples/basalt.rs create mode 100644 examples/fullmesh.rs create mode 100644 rustfmt.toml create mode 100644 src/conn.rs create mode 100644 src/error.rs create mode 100644 src/lib.rs create mode 100644 src/message.rs create mode 100644 src/netapp.rs create mode 100644 src/peering/basalt.rs create mode 100644 src/peering/fullmesh.rs create mode 100644 src/peering/mod.rs create mode 100644 src/proto.rs create mode 100644 src/util.rs diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..2f7896d --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +target/ diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..3dac6d0 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1409 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +[[package]] +name = "adler32" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" + +[[package]] +name = "ahash" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6789e291be47ace86a60303502173d84af8327e3627ecf334356ee0f87a164c" + +[[package]] +name = "aho-corasick" +version = "0.7.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7404febffaa47dac81aa44dba71523c9d069b1bdc50a77db41195149e17f68e5" +dependencies = [ + "memchr", +] + +[[package]] +name = "arc-swap" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0688b520bcc7498f6ca8fa006e8031d353e3fd4f51bd4a50fb03cc4230b28bd2" + +[[package]] +name = "async-attributes" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd3d156917d94862e779f356c5acae312b08fd3121e792c857d7928c8088423" +dependencies = [ + "quote", + "syn", +] + +[[package]] +name = "async-channel" +version = "1.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "59740d83946db6a5af71ae25ddf9562c2b176b2ca42cf99a455f09f4a220d6b9" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + +[[package]] +name = "async-executor" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eb877970c7b440ead138f6321a3b5395d6061183af779340b65e20c0fede9146" +dependencies = [ + "async-task", + "concurrent-queue", + "fastrand", + "futures-lite", + "once_cell", + "vec-arena", +] + +[[package]] +name = "async-global-executor" +version = "1.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73079b49cd26b8fd5a15f68fc7707fc78698dc2a3d61430f2a7a9430230dfa04" +dependencies = [ + "async-executor", + "async-io", + "futures-lite", + "num_cpus", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "40a0b2bb8ae20fede194e779150fe283f65a4a08461b496de546ec366b174ad9" +dependencies = [ + "concurrent-queue", + "fastrand", + "futures-lite", + "libc", + "log", + "nb-connect", + "once_cell", + "parking", + "polling", + "vec-arena", + "waker-fn", + "winapi 0.3.9", +] + +[[package]] +name = "async-mutex" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "479db852db25d9dbf6204e6cb6253698f175c15726470f78af0d918e99d6156e" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7e82538bc65a25dbdff70e4c5439d52f068048ab97cdea0acd73f131594caa1" +dependencies = [ + "async-attributes", + "async-global-executor", + "async-io", + "async-mutex", + "blocking", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "num_cpus", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + +[[package]] +name = "async-task" +version = "4.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91831deabf0d6d7ec49552e489aed63b7456a7a3c46cff62adad428110b0af0" + +[[package]] +name = "async-trait" +version = "0.1.41" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b246867b8b3b6ae56035f1eb1ed557c1d8eae97f0d53696138a50fa0e3a3b8c0" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "atomic-waker" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "065374052e7df7ee4047b1160cca5e1467a12351a40b3da123c870ba0b8eda2a" + +[[package]] +name = "atty" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" +dependencies = [ + "hermit-abi", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "autocfg" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdb031dd78e28731d87d56cc8ffef4a8f36ca26c38fe2de700543e627f8a464a" + +[[package]] +name = "base64" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3441f0f7b02788e948e47f457ca01f1d7e6d92c693bc132c22b087d3141c03ff" + +[[package]] +name = "bitflags" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf1de2fe8c75bc145a2f577add951f8134889b4795d47466a54a5c846d691693" + +[[package]] +name = "blocking" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c5e170dbede1f740736619b776d7251cb1b9095c435c34d8ca9f57fcd2f335e9" +dependencies = [ + "async-channel", + "async-task", + "atomic-waker", + "fastrand", + "futures-lite", + "once_cell", +] + +[[package]] +name = "bumpalo" +version = "3.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e8c087f005730276d1096a652e92a8bacee2e2472bcc9715a74d2bec38b5820" + +[[package]] +name = "byteorder" +version = "1.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c48aae112d48ed9f069b33538ea9e3e90aa263cfa3d1c24309612b1f7472de" + +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + +[[package]] +name = "bytes" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" + +[[package]] +name = "cache-padded" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "631ae5198c9be5e753e5cc215e1bd73c2b466a3565173db433f52bb9d3e66dba" + +[[package]] +name = "cc" +version = "1.0.63" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad9c6140b5a2c7db40ea56eb1821245e5362b44385c05b76288b1a599934ac87" + +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "clap" +version = "2.33.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37e58ac78573c40708d45522f0d80fa2f01cc4f9b4e2bf749807255454312002" +dependencies = [ + "bitflags", + "textwrap", + "unicode-width", +] + +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + +[[package]] +name = "concurrent-queue" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "30ed07550be01594c6026cff2a1d7fe9c8f683caa798e12b68694ac9e88286a3" +dependencies = [ + "cache-padded", +] + +[[package]] +name = "const_fn" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c478836e029dcef17fb47c89023448c64f781a046e0300e257ad8225ae59afab" + +[[package]] +name = "crc32fast" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec91540d98355f690a86367e566ecad2e9e579f230230eb7c21398372be73ea5" +dependencies = [ + "autocfg", + "cfg-if 1.0.0", + "const_fn", + "lazy_static", +] + +[[package]] +name = "env_logger" +version = "0.7.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44533bbbb3bb3c1fa17d9f2e4e38bbbaf8396ba82193c4cb1b6445d711445d36" +dependencies = [ + "atty", + "humantime", + "log", + "regex", + "termcolor", +] + +[[package]] +name = "err-derive" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22deed3a8124cff5fa835713fa105621e43bbdc46690c3a6b68328a012d350d4" +dependencies = [ + "proc-macro-error", + "proc-macro2", + "quote", + "rustversion", + "syn", + "synstructure", +] + +[[package]] +name = "event-listener" +version = "2.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7531096570974c3a9dcf9e4b8e1cede1ec26cf5046219fb3b9d897503b9be59" + +[[package]] +name = "fastrand" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca5faf057445ce5c9d4329e382b2ce7ca38550ef3b73a5348362d5f24e0c7fe3" +dependencies = [ + "instant", +] + +[[package]] +name = "filetime" +version = "0.2.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c122a393ea57648015bf06fbd3d372378992e86b9ff5a7a497b076a28c79efe" +dependencies = [ + "cfg-if 1.0.0", + "libc", + "redox_syscall", + "winapi 0.3.9", +] + +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "fuchsia-cprng" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + +[[package]] +name = "futures" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" + +[[package]] +name = "futures-executor" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" + +[[package]] +name = "futures-lite" +version = "1.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5e6c079abfac3ab269e2927ec048dabc89d009ebfdda6b8ee86624f30c689658" +dependencies = [ + "fastrand", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + +[[package]] +name = "futures-macro" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" + +[[package]] +name = "futures-task" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] + +[[package]] +name = "gloo-timers" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47204a46aaff920a1ea58b11d03dec6f704287d27561724a4631e450654a891f" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "hashbrown" +version = "0.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7afe4a420e3fe79967a00898cc1f4db7c8a49a9333a29f8a4bd76a253d5cd04" +dependencies = [ + "ahash", +] + +[[package]] +name = "heck" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "20564e78d53d2bb135c343b3f47714a56af2061f1c928fdb541dc7b9fdd94205" +dependencies = [ + "unicode-segmentation", +] + +[[package]] +name = "hermit-abi" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8" +dependencies = [ + "libc", +] + +[[package]] +name = "hex" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "644f9158b2f133fd50f5fb3242878846d9eb792e445c893805ff0e3824006e35" + +[[package]] +name = "humantime" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df004cfca50ef23c36850aaaa59ad52cc70d0e90243c3c7737a4dd32dc7a3c4f" +dependencies = [ + "quick-error", +] + +[[package]] +name = "instant" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb1fc4429a33e1f80d41dc9fea4d108a88bec1de8053878898ae448a0b52f613" +dependencies = [ + "cfg-if 1.0.0", +] + +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", +] + +[[package]] +name = "js-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca059e81d9486668f12d455a4ea6daa600bd408134cd17e3d3fb5a32d1f016f8" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + +[[package]] +name = "kuska-handshake" +version = "0.1.1" +dependencies = [ + "async-std", + "futures", + "hex", + "log", + "sodiumoxide", + "thiserror", + "tokio", +] + +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + +[[package]] +name = "lazy_static" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" + +[[package]] +name = "libc" +version = "0.2.80" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" + +[[package]] +name = "libflate" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d9135df43b1f5d0e333385cb6e7897ecd1a43d7d11b91ac003f4d2c2d2401fdd" +dependencies = [ + "adler32", + "crc32fast", + "rle-decode-fast", + "take_mut", +] + +[[package]] +name = "libsodium-sys" +version = "0.2.4" +source = "git+https://github.com/Dhole/sodiumoxidez?branch=extra#53c0fb16069309c35010eb568d9ed05f5bd52ce8" +dependencies = [ + "cc", + "libc", + "libflate", + "pkg-config", + "tar", + "vcpkg", +] + +[[package]] +name = "log" +version = "0.4.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fabed175da42fed1fa0746b0ea71f412aa9d35e76e95e59b192c64b9dc2bf8b" +dependencies = [ + "cfg-if 0.1.10", +] + +[[package]] +name = "lru" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be716eb6878ca2263eb5d00a781aa13264a794f519fe6af4fbb2668b2d5441c0" +dependencies = [ + "hashbrown", +] + +[[package]] +name = "memchr" +version = "2.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ee1c47aaa256ecabcaea351eae4a9b01ef39ed810004e298d2511ed284b1525" + +[[package]] +name = "mio" +version = "0.6.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fce347092656428bc8eaf6201042cb551b8d67855af7374542a92a0fbfcac430" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow 0.2.1", + "net2", + "slab", + "winapi 0.2.8", +] + +[[package]] +name = "mio-named-pipes" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" +dependencies = [ + "log", + "mio", + "miow 0.3.6", + "winapi 0.3.9", +] + +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio", +] + +[[package]] +name = "miow" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c1f2f3b1cf331de6896aabf6e9d55dca90356cc9960cca7eaaf408a355ae919" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + +[[package]] +name = "miow" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" +dependencies = [ + "socket2", + "winapi 0.3.9", +] + +[[package]] +name = "nb-connect" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8123a81538e457d44b933a02faf885d3fe8408806b23fa700e8f01c6c3a98998" +dependencies = [ + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "net2" +version = "0.2.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3ebc3ec692ed7c9a255596c67808dee269f64655d8baf7b4f0638e51ba1d6853" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + +[[package]] +name = "netapp" +version = "0.1.0" +dependencies = [ + "arc-swap", + "async-std", + "async-trait", + "base64", + "bytes 0.6.0", + "env_logger", + "err-derive", + "hex", + "kuska-handshake", + "log", + "lru", + "pretty_env_logger", + "rand", + "rmp-serde", + "serde", + "sodiumoxide", + "structopt", + "tokio", +] + +[[package]] +name = "num-traits" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a64b1ec5cda2586e284722486d802acf1f7dbdc623e2bfc57e65ca1cd099290" +dependencies = [ + "autocfg", +] + +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + +[[package]] +name = "once_cell" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" + +[[package]] +name = "parking" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "427c3892f9e783d91cc128285287e70a59e206ca452770ece88a76f7a3eddd72" + +[[package]] +name = "pin-project" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "pin-project-lite" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "pkg-config" +version = "0.3.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3831453b3449ceb48b6d9c7ad7c96d5ea673e9b470a1dc578c2ce6521230884c" + +[[package]] +name = "polling" +version = "2.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2a7bc6b2a29e632e45451c941832803a18cce6781db04de8a04696cdca8bde4" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "log", + "wepoll-sys", + "winapi 0.3.9", +] + +[[package]] +name = "pretty_env_logger" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "926d36b9553851b8b0005f1275891b392ee4d2d833852c417ed025477350fb9d" +dependencies = [ + "env_logger", + "log", +] + +[[package]] +name = "proc-macro-error" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da25490ff9892aab3fcf7c36f08cfb902dd3e71ca0f9f9517bea02a73a5ce38c" +dependencies = [ + "proc-macro-error-attr", + "proc-macro2", + "quote", + "syn", + "version_check", +] + +[[package]] +name = "proc-macro-error-attr" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1be40180e52ecc98ad80b184934baf3d0d29f979574e439af5a55274b35f869" +dependencies = [ + "proc-macro2", + "quote", + "version_check", +] + +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + +[[package]] +name = "proc-macro2" +version = "1.0.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e0704ee1a7e00d7bb417d0770ea303c1bccbabf0ef1667dae92b5967f5f8a71" +dependencies = [ + "unicode-xid", +] + +[[package]] +name = "quick-error" +version = "1.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a1d01941d82fa2ab50be1e79e6714289dd7cde78eba4c074bc5a4374f650dfe0" + +[[package]] +name = "quote" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa563d17ecb180e500da1cfd2b028310ac758de548efdd203e18f283af693f37" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c618c47cd3ebd209790115ab837de41425723956ad3ce2e6a7f09890947cacb9" +dependencies = [ + "cloudabi", + "fuchsia-cprng", + "libc", + "rand_core 0.3.1", + "winapi 0.3.9", +] + +[[package]] +name = "rand_core" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" +dependencies = [ + "rand_core 0.4.2", +] + +[[package]] +name = "rand_core" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" + +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + +[[package]] +name = "regex" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38cf2c13ed4745de91a5eb834e11c00bcc3709e773173b2ce4c56c9fbde04b9c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", + "thread_local", +] + +[[package]] +name = "regex-syntax" +version = "0.6.21" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" + +[[package]] +name = "rle-decode-fast" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" + +[[package]] +name = "rmp" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f10b46df14cf1ee1ac7baa4d2fbc2c52c0622a4b82fa8740e37bc452ac0184f" +dependencies = [ + "byteorder", + "num-traits", +] + +[[package]] +name = "rmp-serde" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4ce7d70c926fe472aed493b902010bccc17fa9f7284145cb8772fd22fdb052d8" +dependencies = [ + "byteorder", + "rmp", + "serde", +] + +[[package]] +name = "rustversion" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb5d2a036dc6d2d8fd16fde3498b04306e29bd193bf306a57427019b823d5acd" + +[[package]] +name = "serde" +version = "1.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b88fa983de7720629c9387e9f517353ed404164b1e482c970a90c1a4aaf7dc1a" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.117" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cbd1ae72adb44aab48f325a02444a5fc079349a8d804c1fc922aed3f7454c74e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "signal-hook-registry" +version = "1.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + +[[package]] +name = "socket2" +version = "0.3.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fd8b795c389288baa5f355489c65e71fd48a02104600d15c4cfbc561e9e429d" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "redox_syscall", + "winapi 0.3.9", +] + +[[package]] +name = "sodiumoxide" +version = "0.2.4" +source = "git+https://github.com/Dhole/sodiumoxidez?branch=extra#53c0fb16069309c35010eb568d9ed05f5bd52ce8" +dependencies = [ + "libc", + "libsodium-sys", + "serde", +] + +[[package]] +name = "structopt" +version = "0.3.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "126d630294ec449fae0b16f964e35bf3c74f940da9dca17ee9b905f7b3112eb8" +dependencies = [ + "clap", + "lazy_static", + "structopt-derive", +] + +[[package]] +name = "structopt-derive" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65e51c492f9e23a220534971ff5afc14037289de430e3c83f9daf6a1b6ae91e8" +dependencies = [ + "heck", + "proc-macro-error", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "syn" +version = "1.0.48" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cc371affeffc477f42a221a1e4297aedcea33d47d19b61455588bd9d8f6b19ac" +dependencies = [ + "proc-macro2", + "quote", + "unicode-xid", +] + +[[package]] +name = "synstructure" +version = "0.12.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b834f2d66f734cb897113e34aaff2f1ab4719ca946f9a7358dba8f8064148701" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "unicode-xid", +] + +[[package]] +name = "take_mut" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" + +[[package]] +name = "tar" +version = "0.4.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "489997b7557e9a43e192c527face4feacc78bfbe6eed67fd55c4c9e381cba290" +dependencies = [ + "filetime", + "libc", + "redox_syscall", + "xattr", +] + +[[package]] +name = "termcolor" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf11676eb135389f21fcda654382c4859bbfc1d2f36e4425a2f829bb41b1e20e" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "textwrap" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d326610f408c7a4eb6f51c37c330e496b08506c9457c9d34287ecc38809fb060" +dependencies = [ + "unicode-width", +] + +[[package]] +name = "thiserror" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e9ae34b84616eedaaf1e9dd6026dbe00dcafa92aa0c8077cb69df1fcfe5e53e" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ba20f23e85b10754cd195504aebf6a27e2e6cbe28c17778a0c930724628dd56" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "thread_local" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d40c6d1b69745a6ec6fb1ca717914848da4b44ae29d9b3080cbee91d72a69b14" +dependencies = [ + "lazy_static", +] + +[[package]] +name = "tokio" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" +dependencies = [ + "bytes 0.5.6", + "fnv", + "futures-core", + "iovec", + "lazy_static", + "libc", + "memchr", + "mio", + "mio-named-pipes", + "mio-uds", + "num_cpus", + "pin-project-lite", + "signal-hook-registry", + "slab", + "tokio-macros", + "winapi 0.3.9", +] + +[[package]] +name = "tokio-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "unicode-segmentation" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "db8716a166f290ff49dabc18b44aa407cb7c6dbe1aa0971b44b8a24b0ca35aae" + +[[package]] +name = "unicode-width" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9337591893a19b88d8d87f2cec1e73fad5cdfd10e5a6f349f498ad6ea2ffb1e3" + +[[package]] +name = "unicode-xid" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" + +[[package]] +name = "vcpkg" +version = "0.2.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" + +[[package]] +name = "vec-arena" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eafc1b9b2dfc6f5529177b62cf806484db55b32dc7c9658a118e11bbeb33061d" + +[[package]] +name = "version_check" +version = "0.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5a972e5669d67ba988ce3dc826706fb0a8b01471c088cb0b6110b805cc36aed" + +[[package]] +name = "waker-fn" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" + +[[package]] +name = "wasm-bindgen" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ac64ead5ea5f05873d7c12b545865ca2b8d28adfc50a49b84770a3a97265d42" +dependencies = [ + "cfg-if 0.1.10", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f22b422e2a757c35a73774860af8e112bff612ce6cb604224e8e47641a9e4f68" +dependencies = [ + "bumpalo", + "lazy_static", + "log", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-futures" +version = "0.4.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7866cab0aa01de1edf8b5d7936938a7e397ee50ce24119aef3e1eaa3b6171da" +dependencies = [ + "cfg-if 0.1.10", + "js-sys", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b13312a745c08c469f0b292dd2fcd6411dba5f7160f593da6ef69b64e407038" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f249f06ef7ee334cc3b8ff031bfc11ec99d00f34d86da7498396dc1e3b1498fe" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.68" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d649a3145108d7d3fbcde896a468d1bd636791823c9921135218ad89be08307" + +[[package]] +name = "web-sys" +version = "0.3.45" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bf6ef87ad7ae8008e15a355ce696bed26012b7caa21605188cfd8214ab51e2d" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "wepoll-sys" +version = "3.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fcb14dea929042224824779fbc82d9fab8d2e6d3cbc0ac404de8edf489e77ff" +dependencies = [ + "cc", +] + +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" +dependencies = [ + "winapi 0.3.9", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + +[[package]] +name = "xattr" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" +dependencies = [ + "libc", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..9c0eadb --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,27 @@ +[package] +name = "netapp" +version = "0.1.0" +authors = ["Alex Auvolat "] +edition = "2018" + +# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html + +[dependencies] +async-std = { version = "1.5.0", features=["unstable","attributes"] } +tokio = { version = "0.2", features = ["full"] } +kuska-handshake = { path = "../../handshake", features = ["default", "tokio_compat"] } +hex = "0.4.2" +log = "0.4.8" +pretty_env_logger = "0.4" +sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } +env_logger = "0.7.1" +base64 = "0.12.1" +rmp-serde = "0.14.3" +serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +arc-swap = "1.0" +structopt = { version = "0.3", default-features = false } +async-trait = "0.1.7" +err-derive = "0.2.3" +bytes = "0.6.0" +lru = "0.6" +rand = "0.5.5" diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..bf2a928 --- /dev/null +++ b/Makefile @@ -0,0 +1,3 @@ +all: + cargo build + RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7 diff --git a/examples/basalt.rs b/examples/basalt.rs new file mode 100644 index 0000000..e486e08 --- /dev/null +++ b/examples/basalt.rs @@ -0,0 +1,76 @@ +use std::net::SocketAddr; +use std::time::Duration; + +use log::info; + +use structopt::StructOpt; + +use sodiumoxide::crypto::auth; +use sodiumoxide::crypto::sign::ed25519; + +use netapp::netapp::*; +use netapp::peering::basalt::*; + +#[derive(StructOpt, Debug)] +#[structopt(name = "netapp")] +pub struct Opt { + #[structopt(long = "network-key", short = "n")] + network_key: Option, + + #[structopt(long = "private-key", short = "p")] + private_key: Option, + + #[structopt(long = "bootstrap-peer", short = "b")] + bootstrap_peers: Vec, + + #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")] + listen_addr: String, +} + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let opt = Opt::from_args(); + + let netid = match &opt.network_key { + Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => auth::gen_key(), + }; + info!("Network key: {}", hex::encode(&netid)); + + let privkey = match &opt.private_key { + Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => { + let (_pk, sk) = ed25519::gen_keypair(); + sk + } + }; + + info!("Node private key: {}", hex::encode(&privkey)); + info!("Node public key: {}", hex::encode(&privkey.public_key())); + + let listen_addr = opt.listen_addr.parse().unwrap(); + let netapp = NetApp::new(listen_addr, netid, privkey); + + let mut bootstrap_peers = vec![]; + for peer in opt.bootstrap_peers.iter() { + if let Some(delim) = peer.find('@') { + let (key, ip) = peer.split_at(delim); + let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); + let ip = ip[1..].parse::().unwrap(); + bootstrap_peers.push((pubkey, ip)); + } + } + + let basalt_params = BasaltParams{ + view_size: 100, + cache_size: 1000, + exchange_interval: Duration::from_secs(1), + reset_interval: Duration::from_secs(10), + reset_count: 20, + }; + let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); + + tokio::join!(netapp.listen(), peering.run(),); +} diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs new file mode 100644 index 0000000..8e2ae07 --- /dev/null +++ b/examples/fullmesh.rs @@ -0,0 +1,68 @@ +use std::net::SocketAddr; + +use log::info; + +use structopt::StructOpt; + +use sodiumoxide::crypto::auth; +use sodiumoxide::crypto::sign::ed25519; + +use netapp::netapp::*; +use netapp::peering::fullmesh::*; + +#[derive(StructOpt, Debug)] +#[structopt(name = "netapp")] +pub struct Opt { + #[structopt(long = "network-key", short = "n")] + network_key: Option, + + #[structopt(long = "private-key", short = "p")] + private_key: Option, + + #[structopt(long = "bootstrap-peer", short = "b")] + bootstrap_peers: Vec, + + #[structopt(long = "listen-addr", short = "l", default_value = "127.0.0.1:1980")] + listen_addr: String, +} + +#[tokio::main] +async fn main() { + pretty_env_logger::init(); + + let opt = Opt::from_args(); + + let netid = match &opt.network_key { + Some(k) => auth::Key::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => auth::gen_key(), + }; + info!("Network key: {}", hex::encode(&netid)); + + let privkey = match &opt.private_key { + Some(k) => ed25519::SecretKey::from_slice(&hex::decode(k).unwrap()).unwrap(), + None => { + let (_pk, sk) = ed25519::gen_keypair(); + sk + } + }; + + info!("Node private key: {}", hex::encode(&privkey)); + info!("Node public key: {}", hex::encode(&privkey.public_key())); + + let listen_addr = opt.listen_addr.parse().unwrap(); + let netapp = NetApp::new(listen_addr, netid, privkey); + + let mut bootstrap_peers = vec![]; + for peer in opt.bootstrap_peers.iter() { + if let Some(delim) = peer.find('@') { + let (key, ip) = peer.split_at(delim); + let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); + let ip = ip[1..].parse::().unwrap(); + bootstrap_peers.push((pubkey, ip)); + } + } + + let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers); + + tokio::join!(netapp.listen(), peering.run(),); +} diff --git a/rustfmt.toml b/rustfmt.toml new file mode 100644 index 0000000..218e203 --- /dev/null +++ b/rustfmt.toml @@ -0,0 +1 @@ +hard_tabs = true diff --git a/src/conn.rs b/src/conn.rs new file mode 100644 index 0000000..9b60d2a --- /dev/null +++ b/src/conn.rs @@ -0,0 +1,280 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::sync::atomic::{self, AtomicU16}; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use log::{debug, trace}; + +use sodiumoxide::crypto::sign::ed25519; +use tokio::io::split; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, oneshot, watch}; + +use kuska_handshake::async_std::{ + handshake_client, handshake_server, BoxStream, TokioCompatExt, TokioCompatExtRead, + TokioCompatExtWrite, +}; + +use crate::error::*; +use crate::message::*; +use crate::netapp::*; +use crate::proto::*; +use crate::util::*; + +pub struct ServerConn { + netapp: Arc, + pub remote_addr: SocketAddr, + pub peer_pk: ed25519::PublicKey, + resp_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec)>, + close_send: watch::Sender, +} + +impl ServerConn { + pub(crate) async fn run(netapp: Arc, socket: TcpStream) -> Result<(), Error> { + let mut asyncstd_socket = TokioCompatExt::wrap(socket); + let handshake = handshake_server( + &mut asyncstd_socket, + netapp.netid.clone(), + netapp.pubkey.clone(), + netapp.privkey.clone(), + ) + .await?; + let peer_pk = handshake.peer_pk.clone(); + + let tokio_socket = asyncstd_socket.into_inner(); + let remote_addr = tokio_socket.peer_addr().unwrap(); + + debug!( + "Handshake complete (server) with {}@{}", + hex::encode(&peer_pk), + remote_addr + ); + + let (read, write) = split(tokio_socket); + + let read = TokioCompatExtRead::wrap(read); + let write = TokioCompatExtWrite::wrap(write); + + let (box_stream_read, box_stream_write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (resp_send, resp_recv) = mpsc::unbounded_channel(); + + let (close_send, close_recv) = watch::channel(false); + + let conn = Arc::new(ServerConn { + netapp: netapp.clone(), + remote_addr, + peer_pk: peer_pk.clone(), + resp_send, + close_send, + }); + + netapp.connected_as_server(peer_pk.clone(), conn.clone()); + + let conn2 = conn.clone(); + let conn3 = conn.clone(); + tokio::try_join!( + conn2.recv_loop(box_stream_read, close_recv.clone()), + conn3.send_loop(resp_recv, box_stream_write, close_recv.clone()), + ) + .map(|_| ()) + .log_err("ServerConn recv_loop/send_loop"); + + netapp.disconnected_as_server(&peer_pk, conn); + + Ok(()) + } + + pub fn close(&self) { + self.close_send.broadcast(true).unwrap(); + } +} + +impl SendLoop for ServerConn {} + +#[async_trait] +impl RecvLoop for ServerConn { + async fn recv_handler(self: Arc, id: u16, bytes: Vec) { + let bytes: Bytes = bytes.into(); + + let prio = bytes[0]; + + let mut kind_bytes = [0u8; 4]; + kind_bytes.copy_from_slice(&bytes[1..5]); + let kind = u32::from_be_bytes(kind_bytes); + + if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) { + let resp = handler(self.peer_pk.clone(), bytes.slice(5..)).await; + self.resp_send + .send((id, prio, resp)) + .log_err("ServerConn recv_handler send resp"); + } + } +} +pub struct ClientConn { + pub netapp: Arc, + pub remote_addr: SocketAddr, + pub peer_pk: ed25519::PublicKey, + query_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec)>, + next_query_number: AtomicU16, + resp_send: mpsc::UnboundedSender<(RequestID, Vec)>, + resp_notify_send: mpsc::UnboundedSender<(RequestID, oneshot::Sender>)>, + close_send: watch::Sender, +} + +impl ClientConn { + pub(crate) async fn init( + netapp: Arc, + socket: TcpStream, + remote_pk: ed25519::PublicKey, + ) -> Result<(), Error> { + let mut asyncstd_socket = TokioCompatExt::wrap(socket); + + let handshake = handshake_client( + &mut asyncstd_socket, + netapp.netid.clone(), + netapp.pubkey.clone(), + netapp.privkey.clone(), + remote_pk.clone(), + ) + .await?; + + let tokio_socket = asyncstd_socket.into_inner(); + let remote_addr = tokio_socket.peer_addr().unwrap(); + + debug!( + "Handshake complete (client) with {}@{}", + hex::encode(&remote_pk), + remote_addr + ); + + let (read, write) = split(tokio_socket); + + let read = TokioCompatExtRead::wrap(read); + let write = TokioCompatExtWrite::wrap(write); + + let (box_stream_read, box_stream_write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (query_send, query_recv) = mpsc::unbounded_channel(); + let (resp_send, resp_recv) = mpsc::unbounded_channel(); + let (resp_notify_send, resp_notify_recv) = mpsc::unbounded_channel(); + + let (close_send, close_recv) = watch::channel(false); + + let conn = Arc::new(ClientConn { + netapp: netapp.clone(), + remote_addr, + peer_pk: remote_pk.clone(), + next_query_number: AtomicU16::from(0u16), + query_send, + resp_send, + resp_notify_send, + close_send, + }); + + netapp.connected_as_client(remote_pk.clone(), conn.clone()); + + tokio::spawn(async move { + let conn2 = conn.clone(); + let conn3 = conn.clone(); + let conn4 = conn.clone(); + tokio::try_join!( + conn2.send_loop(query_recv, box_stream_write, close_recv.clone()), + conn3.recv_loop(box_stream_read, close_recv.clone()), + conn4.dispatch_resp(resp_recv, resp_notify_recv, close_recv.clone()), + ) + .map(|_| ()) + .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); + + netapp.disconnected_as_client(&remote_pk, conn); + }); + + Ok(()) + } + + pub fn close(&self) { + self.close_send.broadcast(true).unwrap(); + } + + async fn dispatch_resp( + self: Arc, + mut resp_recv: mpsc::UnboundedReceiver<(RequestID, Vec)>, + mut resp_notify_recv: mpsc::UnboundedReceiver<(RequestID, oneshot::Sender>)>, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + let mut resps: HashMap> = HashMap::new(); + let mut resp_notify: HashMap>> = HashMap::new(); + while !*must_exit.borrow() { + tokio::select! { + resp = resp_recv.recv() => { + if let Some((id, resp)) = resp { + trace!("dispatch_resp: got resp to {}, {} bytes", id, resp.len()); + if let Some(ch) = resp_notify.remove(&id) { + ch.send(resp).map_err(|_| Error::Message("Could not dispatch reply".to_string()))?; + } else { + resps.insert(id, resp); + } + } + } + resp_ch = resp_notify_recv.recv() => { + if let Some((id, resp_ch)) = resp_ch { + trace!("dispatch_resp: got resp_ch {}", id); + if let Some(rs) = resps.remove(&id) { + resp_ch.send(rs).map_err(|_| Error::Message("Could not dispatch reply".to_string()))?; + } else { + resp_notify.insert(id, resp_ch); + } + } + } + exit = must_exit.recv() => { + if exit == Some(true) { + break; + } + } + } + } + Ok(()) + } + + pub async fn request( + self: Arc, + rq: T, + prio: RequestPriority, + ) -> Result<::Response, Error> + where + T: Message, + { + let id = self + .next_query_number + .fetch_add(1u16, atomic::Ordering::Relaxed); + let mut bytes = vec![prio]; + bytes.extend_from_slice(&u32::to_be_bytes(T::KIND)[..]); + bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); + + let (resp_send, resp_recv) = oneshot::channel(); + self.resp_notify_send.send((id, resp_send))?; + + trace!("request: query_send {}, {} bytes", id, bytes.len()); + self.query_send.send((id, prio, bytes))?; + + let resp = resp_recv.await?; + + rmp_serde::decode::from_read_ref::<_, Result<::Response, String>>(&resp[..])? + .map_err(Error::Remote) + } +} + +impl SendLoop for ClientConn {} + +#[async_trait] +impl RecvLoop for ClientConn { + async fn recv_handler(self: Arc, id: RequestID, msg: Vec) { + self.resp_send + .send((id, msg)) + .log_err("ClientConn::recv_handler"); + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..b54423a --- /dev/null +++ b/src/error.rs @@ -0,0 +1,57 @@ +use err_derive::Error; +use std::io; + +use log::error; + +#[derive(Debug, Error)] +pub enum Error { + #[error(display = "IO error: {}", _0)] + Io(#[error(source)] io::Error), + + #[error(display = "Messagepack encode error: {}", _0)] + RMPEncode(#[error(source)] rmp_serde::encode::Error), + #[error(display = "Messagepack decode error: {}", _0)] + RMPDecode(#[error(source)] rmp_serde::decode::Error), + + #[error(display = "Tokio join error: {}", _0)] + TokioJoin(#[error(source)] tokio::task::JoinError), + + #[error(display = "oneshot receive error: {}", _0)] + OneshotRecv(#[error(source)] tokio::sync::oneshot::error::RecvError), + + #[error(display = "Handshake error: {}", _0)] + Handshake(#[error(source)] kuska_handshake::async_std::Error), + + #[error(display = "{}", _0)] + Message(String), + + #[error(display = "Remote error: {}", _0)] + Remote(String), +} + +impl From> for Error { + fn from(_e: tokio::sync::watch::error::SendError) -> Error { + Error::Message(format!("Watch send error")) + } +} + +impl From> for Error { + fn from(_e: tokio::sync::mpsc::error::SendError) -> Error { + Error::Message(format!("MPSC send error")) + } +} + +pub trait LogError { + fn log_err(self, msg: &'static str); +} + +impl LogError for Result<(), E> +where + E: Into, +{ + fn log_err(self, msg: &'static str) { + if let Err(e) = self { + error!("Error: {}: {}", msg, Into::::into(e)); + }; + } +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..2db8a22 --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,9 @@ +#![feature(map_first_last)] + +pub mod conn; +pub mod error; +pub mod message; +pub mod netapp; +pub mod peering; +pub mod proto; +pub mod util; diff --git a/src/message.rs b/src/message.rs new file mode 100644 index 0000000..bcc5aac --- /dev/null +++ b/src/message.rs @@ -0,0 +1,18 @@ +use serde::{Deserialize, Serialize}; + +pub type MessageKind = u32; + +pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { + const KIND: MessageKind; + type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; +} + +#[derive(Serialize, Deserialize)] +pub(crate) struct HelloMessage { + pub server_port: u16, +} + +impl Message for HelloMessage { + const KIND: MessageKind = 0x42000001; + type Response = (); +} diff --git a/src/netapp.rs b/src/netapp.rs new file mode 100644 index 0000000..6f174b4 --- /dev/null +++ b/src/netapp.rs @@ -0,0 +1,214 @@ +use std::collections::HashMap; +use std::net::SocketAddr; +use std::pin::Pin; +use std::sync::{Arc, RwLock}; + +use std::future::Future; + +use log::{debug, info}; + +use arc_swap::{ArcSwap, ArcSwapOption}; +use bytes::Bytes; + +use sodiumoxide::crypto::auth; +use sodiumoxide::crypto::sign::ed25519; +use tokio::net::{TcpListener, TcpStream}; + +use crate::conn::*; +use crate::error::*; +use crate::message::*; +use crate::proto::*; +use crate::util::*; + +pub struct NetApp { + pub listen_addr: SocketAddr, + pub netid: auth::Key, + pub pubkey: ed25519::PublicKey, + pub privkey: ed25519::SecretKey, + pub server_conns: RwLock>>, + pub client_conns: RwLock>>, + pub(crate) msg_handlers: ArcSwap< + HashMap< + MessageKind, + Arc< + dyn Fn( + ed25519::PublicKey, + Bytes, + ) -> Pin> + Sync + Send>> + + Sync + + Send, + >, + >, + >, + pub(crate) on_connected: + ArcSwapOption>, + pub(crate) on_disconnected: ArcSwapOption>, +} + +async fn handler_aux(handler: Arc, remote: ed25519::PublicKey, bytes: Bytes) -> Vec +where + M: Message + 'static, + F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + R: Future::Response, Error>> + Send + Sync, +{ + debug!( + "Handling message of kind {:08x} from {}", + M::KIND, + hex::encode(remote) + ); + let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) { + Ok(msg) => handler(remote.clone(), msg).await, + Err(e) => Err(e.into()), + }; + let res = res.map_err(|e| format!("{}", e)); + rmp_to_vec_all_named(&res).unwrap_or(vec![]) +} + +impl NetApp { + pub fn new( + listen_addr: SocketAddr, + netid: auth::Key, + privkey: ed25519::SecretKey, + ) -> Arc { + let pubkey = privkey.public_key(); + let netapp = Arc::new(Self { + listen_addr, + netid, + pubkey, + privkey, + server_conns: RwLock::new(HashMap::new()), + client_conns: RwLock::new(HashMap::new()), + msg_handlers: ArcSwap::new(Arc::new(HashMap::new())), + on_connected: ArcSwapOption::new(None), + on_disconnected: ArcSwapOption::new(None), + }); + + let netapp2 = netapp.clone(); + netapp.add_msg_handler::( + move |from: ed25519::PublicKey, msg: HelloMessage| { + netapp2.handle_hello_message(from, msg); + async { Ok(()) } + }, + ); + + netapp + } + + pub fn add_msg_handler(&self, handler: F) + where + M: Message + 'static, + F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + R: Future::Response, Error>> + Send + Sync + 'static, + { + let handler = Arc::new(handler); + let fun = Arc::new(move |remote: ed25519::PublicKey, bytes: Bytes| { + let fun: Pin> + Sync + Send>> = + Box::pin(handler_aux(handler.clone(), remote, bytes)); + fun + }); + let mut handlers = self.msg_handlers.load().as_ref().clone(); + handlers.insert(M::KIND, fun); + self.msg_handlers.store(Arc::new(handlers)); + } + + pub async fn listen(self: Arc) { + let mut listener = TcpListener::bind(self.listen_addr).await.unwrap(); + info!("Listening on {}", self.listen_addr); + + loop { + // The second item contains the IP and port of the new connection. + let (socket, _) = listener.accept().await.unwrap(); + info!( + "Incoming connection from {}, negotiating handshake...", + socket.peer_addr().unwrap() + ); + let self2 = self.clone(); + tokio::spawn(async move { + ServerConn::run(self2, socket) + .await + .log_err("ServerConn::run"); + }); + } + } + + pub async fn try_connect( + self: Arc, + ip: SocketAddr, + pk: ed25519::PublicKey, + ) -> Result<(), Error> { + if self.client_conns.read().unwrap().contains_key(&pk) { + return Ok(()); + } + let socket = TcpStream::connect(ip).await?; + info!("Connected to {}, negotiating handshake...", ip); + ClientConn::init(self, socket, pk.clone()).await?; + Ok(()) + } + + pub fn disconnect(self: Arc, id: &ed25519::PublicKey) { + let conn = self.client_conns.read().unwrap().get(id).cloned(); + if let Some(c) = conn { + c.close(); + } + } + + pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc) { + let mut conn_list = self.server_conns.write().unwrap(); + conn_list.insert(id.clone(), conn); + } + + fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { + if let Some(h) = self.on_connected.load().as_ref() { + if let Some(c) = self.server_conns.read().unwrap().get(&id) { + let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port); + h(id, remote_addr, true); + } + } + } + + pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc) { + let mut conn_list = self.server_conns.write().unwrap(); + if let Some(c) = conn_list.get(id) { + if Arc::ptr_eq(c, &conn) { + conn_list.remove(id); + } + + if let Some(h) = self.on_disconnected.load().as_ref() { + h(conn.peer_pk, true); + } + } + } + + pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc) { + { + let mut conn_list = self.client_conns.write().unwrap(); + if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) { + tokio::spawn(async move { old_c.close() }); + } + } + + if let Some(h) = self.on_connected.load().as_ref() { + h(conn.peer_pk, conn.remote_addr, false); + } + + tokio::spawn(async move { + let server_port = conn.netapp.listen_addr.port(); + conn.request(HelloMessage { server_port }, prio::NORMAL) + .await + .log_err("Sending hello message"); + }); + } + + pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc) { + let mut conn_list = self.client_conns.write().unwrap(); + if let Some(c) = conn_list.get(id) { + if Arc::ptr_eq(c, &conn) { + conn_list.remove(id); + } + + if let Some(h) = self.on_disconnected.load().as_ref() { + h(conn.peer_pk, false); + } + } + } +} diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs new file mode 100644 index 0000000..be807a8 --- /dev/null +++ b/src/peering/basalt.rs @@ -0,0 +1,475 @@ +use std::collections::HashSet; +use std::net::SocketAddr; +use std::sync::{Arc, RwLock}; +use std::time::Duration; + +use log::{debug, warn}; +use lru::LruCache; +use rand::{thread_rng, Rng}; +use serde::{Deserialize, Serialize}; + +use sodiumoxide::crypto::hash; +use sodiumoxide::crypto::sign::ed25519; + +use crate::conn::*; +use crate::message::*; +use crate::netapp::*; +use crate::proto::*; + +// -- Protocol messages -- + +#[derive(Serialize, Deserialize)] +struct PullMessage {} + +impl Message for PullMessage { + const KIND: MessageKind = 0x42001100; + type Response = PushMessage; +} + +#[derive(Serialize, Deserialize)] +struct PushMessage { + peers: Vec, +} + +impl Message for PushMessage { + const KIND: MessageKind = 0x42001101; + type Response = (); +} + +// -- Algorithm data structures -- + +type Seed = [u8; 32]; + +#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] +struct Peer { + id: ed25519::PublicKey, + addr: SocketAddr, +} + +type Cost = [u8; 40]; +const MAX_COST: Cost = [0xffu8; 40]; + +impl Peer { + fn cost(&self, seed: &Seed) -> Cost { + let mut hasher = hash::State::new(); + hasher.update(&seed[..]); + + let mut cost = [0u8; 40]; + match self.addr { + SocketAddr::V4(v4addr) => { + let v4ip = v4addr.ip().octets(); + + for i in 0..4 { + let mut h = hasher.clone(); + h.update(&v4ip[..i + 1]); + cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + } + } + SocketAddr::V6(v6addr) => { + let v6ip = v6addr.ip().octets(); + + for i in 0..4 { + let mut h = hasher.clone(); + h.update(&v6ip[..i + 2]); + cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + } + } + } + + { + let mut h5 = hasher.clone(); + h5.update(&format!("{}", self.addr).into_bytes()[..]); + cost[32..40].copy_from_slice(&h5.finalize()[..8]); + } + + cost + } +} + +struct BasaltSlot { + seed: Seed, + peer: Option, +} + +impl BasaltSlot { + fn cost(&self) -> Cost { + self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST) + } +} + +struct BasaltView { + i_reset: usize, + slots: Vec, +} + +impl BasaltView { + fn new(size: usize) -> Self { + let slots = (0..size) + .map(|_| BasaltSlot { + seed: rand_seed(), + peer: None, + }) + .collect::>(); + Self { i_reset: 0, slots } + } + + fn current_peers(&self) -> HashSet { + self.slots + .iter() + .filter(|s| s.peer.is_some()) + .map(|s| s.peer.unwrap().clone()) + .collect::>() + } + fn current_peers_vec(&self) -> Vec { + self.current_peers().drain().collect::>() + } + + fn sample(&self, count: usize) -> Vec { + let possibles = self + .slots + .iter() + .enumerate() + .filter(|(_i, s)| s.peer.is_some()) + .map(|(i, _s)| i) + .collect::>(); + if possibles.len() == 0 { + vec![] + } else { + let mut ret = vec![]; + let mut rng = thread_rng(); + for _i in 0..count { + let idx = rng.gen_range(0, possibles.len()); + ret.push(self.slots[possibles[idx]].peer.unwrap()); + } + ret + } + } + + fn update_slot(&mut self, i: usize, peers: &[Peer]) { + let mut slot_cost = self.slots[i].cost(); + + for peer in peers.iter() { + let peer_cost = peer.cost(&self.slots[i].seed); + if self.slots[i].peer.is_none() || peer_cost < slot_cost { + self.slots[i].peer = Some(*peer); + slot_cost = peer_cost; + } + } + } + fn update_all_slots(&mut self, peers: &[Peer]) { + for i in 0..self.slots.len() { + self.update_slot(i, peers); + } + } + + fn disconnected(&mut self, id: ed25519::PublicKey) { + let mut cleared_slots = vec![]; + for i in 0..self.slots.len() { + if let Some(p) = self.slots[i].peer { + if p.id == id { + self.slots[i].peer = None; + cleared_slots.push(i); + } + } + } + + let remaining_peers = self.current_peers_vec(); + + for i in cleared_slots { + self.update_slot(i, &remaining_peers[..]); + } + } + + fn should_try_list(&self, peers: &[Peer]) -> Vec { + // Select peers that have lower cost than any of our slots + let mut ret = HashSet::new(); + + for i in 0..self.slots.len() { + if self.slots[i].peer.is_none() { + return peers.to_vec(); + } + let mut min_cost = self.slots[i].cost(); + let mut min_peer = None; + for peer in peers.iter() { + if ret.contains(peer) { + continue; + } + let peer_cost = peer.cost(&self.slots[i].seed); + if peer_cost < min_cost { + min_cost = peer_cost; + min_peer = Some(*peer); + } + } + if let Some(p) = min_peer { + ret.insert(p); + if ret.len() == peers.len() { + break; + } + } + } + + ret.drain().collect::>() + } + + fn reset_some_slots(&mut self, count: usize) { + for _i in 0..count { + self.slots[self.i_reset].seed = rand_seed(); + self.i_reset = (self.i_reset + 1) % self.slots.len(); + } + } +} + +pub struct BasaltParams { + pub view_size: usize, + pub cache_size: usize, + pub exchange_interval: Duration, + pub reset_interval: Duration, + pub reset_count: usize, +} + +pub struct Basalt { + netapp: Arc, + + param: BasaltParams, + bootstrap_peers: Vec, + + view: RwLock, + current_attempts: RwLock>, + backlog: RwLock>, +} + +impl Basalt { + pub fn new( + netapp: Arc, + bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, + param: BasaltParams, + ) -> Arc { + let bootstrap_peers = bootstrap_list + .iter() + .map(|(id, addr)| Peer { + id: *id, + addr: *addr, + }) + .collect::>(); + + let view = BasaltView::new(param.view_size); + let backlog = LruCache::new(param.cache_size); + + let basalt = Arc::new(Self { + netapp: netapp.clone(), + param, + bootstrap_peers, + view: RwLock::new(view), + current_attempts: RwLock::new(HashSet::new()), + backlog: RwLock::new(backlog), + }); + + let basalt2 = basalt.clone(); + netapp.on_connected.store(Some(Arc::new(Box::new( + move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { + basalt2.on_connected(pk, addr, is_incoming); + }, + )))); + + let basalt2 = basalt.clone(); + netapp.on_disconnected.store(Some(Arc::new(Box::new( + move |pk: ed25519::PublicKey, is_incoming: bool| { + basalt2.on_disconnected(pk, is_incoming); + }, + )))); + + let basalt2 = basalt.clone(); + netapp.add_msg_handler::( + move |_from: ed25519::PublicKey, _pullmsg: PullMessage| { + let push_msg = basalt2.make_push_message(); + async move { Ok(push_msg) } + }, + ); + + let basalt2 = basalt.clone(); + netapp.add_msg_handler::( + move |_from: ed25519::PublicKey, push_msg: PushMessage| { + basalt2.handle_peer_list(&push_msg.peers[..]); + async move { Ok(()) } + }, + ); + + basalt + } + + pub fn sample(&self, count: usize) -> Vec { + self.view + .read() + .unwrap() + .sample(count) + .iter() + .map(|p| p.id) + .collect::>() + } + + pub async fn run(self: Arc) { + for peer in self.bootstrap_peers.iter() { + tokio::spawn(self.clone().try_connect(*peer)); + } + + let pushpull_loop = self.clone().run_pushpull_loop(); + let reset_loop = self.run_reset_loop(); + tokio::join!(pushpull_loop, reset_loop); + } + + async fn run_pushpull_loop(self: Arc) { + loop { + tokio::time::delay_for(self.param.exchange_interval).await; + + let peers = self.view.read().unwrap().sample(2); + if peers.len() == 2 { + let (c1, c2) = { + let client_conns = self.netapp.client_conns.read().unwrap(); + ( + client_conns.get(&peers[0].id).cloned(), + client_conns.get(&peers[1].id).cloned(), + ) + }; + if let Some(c) = c1 { + tokio::spawn(self.clone().do_pull(c)); + } + if let Some(c) = c2 { + tokio::spawn(self.clone().do_push(c)); + } + } + } + } + + async fn do_pull(self: Arc, peer: Arc) { + match peer.request(PullMessage {}, prio::NORMAL).await { + Ok(resp) => { + self.handle_peer_list(&resp.peers[..]); + } + Err(e) => { + warn!("Error during pull exchange: {}", e); + } + }; + } + + async fn do_push(self: Arc, peer: Arc) { + let push_msg = self.make_push_message(); + if let Err(e) = peer.request(push_msg, prio::NORMAL).await { + warn!("Error during push exchange: {}", e); + } + } + + fn make_push_message(&self) -> PushMessage { + let current_peers = self.view.read().unwrap().current_peers_vec(); + PushMessage { + peers: current_peers, + } + } + + async fn run_reset_loop(self: Arc) { + loop { + tokio::time::delay_for(self.param.reset_interval).await; + + { + let mut view = self.view.write().unwrap(); + let prev_peers = view.current_peers(); + let prev_peers_vec = prev_peers.iter().cloned().collect::>(); + + view.reset_some_slots(self.param.reset_count); + view.update_all_slots(&prev_peers_vec[..]); + + let new_peers = view.current_peers(); + drop(view); + + self.close_all_diff(&prev_peers, &new_peers); + } + + let mut to_retry_maybe = self.bootstrap_peers.clone(); + for (peer, _) in self.backlog.read().unwrap().iter() { + if !self.bootstrap_peers.contains(peer) { + to_retry_maybe.push(*peer); + } + } + self.handle_peer_list(&to_retry_maybe[..]); + } + } + + fn handle_peer_list(self: &Arc, peers: &[Peer]) { + let to_connect = self.view.read().unwrap().should_try_list(peers); + + for peer in to_connect.iter() { + tokio::spawn(self.clone().try_connect(*peer)); + } + } + + async fn try_connect(self: Arc, peer: Peer) { + { + let view = self.view.read().unwrap(); + let mut attempts = self.current_attempts.write().unwrap(); + + if view.slots.iter().any(|x| x.peer == Some(peer)) { + return; + } + if attempts.contains(&peer) { + return; + } + + attempts.insert(peer); + } + let res = self.netapp.clone().try_connect(peer.addr, peer.id).await; + debug!("Connection attempt to {}: {:?}", peer.addr, res); + + self.current_attempts.write().unwrap().remove(&peer); + + if res.is_err() { + self.backlog.write().unwrap().pop(&peer); + } + } + + fn on_connected(self: &Arc, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) { + if is_incoming { + self.handle_peer_list(&[Peer{id: pk, addr}][..]); + } else { + let peer = Peer { id: pk, addr }; + + let mut backlog = self.backlog.write().unwrap(); + if backlog.get(&peer).is_none() { + backlog.put(peer, ()); + } + drop(backlog); + + let mut view = self.view.write().unwrap(); + let prev_peers = view.current_peers(); + + view.update_all_slots(&[peer][..]); + + let new_peers = view.current_peers(); + drop(view); + + self.close_all_diff(&prev_peers, &new_peers); + } + } + + fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) { + if !is_incoming { + self.view.write().unwrap().disconnected(pk); + } + } + + fn close_all_diff(&self, prev_peers: &HashSet, new_peers: &HashSet) { + let client_conns = self.netapp.client_conns.read().unwrap(); + for peer in prev_peers.iter() { + if !new_peers.contains(peer) { + if let Some(c) = client_conns.get(&peer.id) { + debug!("Closing connection to {} ({})", hex::encode(peer.id), peer.addr); + c.close(); + } + } + } + } +} + +fn rand_seed() -> Seed { + let mut seed = [0u8; 32]; + sodiumoxide::randombytes::randombytes_into(&mut seed[..]); + seed +} diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs new file mode 100644 index 0000000..e04beb6 --- /dev/null +++ b/src/peering/fullmesh.rs @@ -0,0 +1,437 @@ +use std::collections::{HashMap, VecDeque}; +use std::net::SocketAddr; +use std::sync::atomic::{self, AtomicU64}; +use std::sync::{Arc, RwLock}; +use std::time::{Duration, Instant}; + +use log::{debug, info, trace, warn}; +use serde::{Deserialize, Serialize}; + +use sodiumoxide::crypto::hash; +use sodiumoxide::crypto::sign::ed25519; + +use crate::conn::*; +use crate::message::*; +use crate::netapp::*; +use crate::proto::*; + +const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); +const CONN_MAX_RETRIES: usize = 10; +const PING_INTERVAL: Duration = Duration::from_secs(10); +const LOOP_DELAY: Duration = Duration::from_secs(1); + +// -- Protocol messages -- + +#[derive(Serialize, Deserialize)] +struct PingMessage { + pub id: u64, + pub peer_list_hash: hash::Digest, +} + +impl Message for PingMessage { + const KIND: MessageKind = 0x42001000; + type Response = PingMessage; +} + +#[derive(Serialize, Deserialize)] +struct PeerListMessage { + pub list: Vec<(ed25519::PublicKey, SocketAddr)>, +} + +impl Message for PeerListMessage { + const KIND: MessageKind = 0x42001001; + type Response = PeerListMessage; +} + +// -- Algorithm data structures -- + +#[derive(Debug)] +struct PeerInfo { + addr: SocketAddr, + state: PeerConnState, + last_seen: Option, + ping: VecDeque, +} + +#[derive(Copy, Clone, Debug)] +pub struct PeerInfoPub { + pub id: ed25519::PublicKey, + pub addr: SocketAddr, + pub state: PeerConnState, + pub last_seen: Option, + pub avg_ping: Option, + pub max_ping: Option, + pub med_ping: Option, +} + +// PeerConnState: possible states for our tentative connections to given peer +// This module is only interested in recording connection info for outgoing +// TCP connections +#[derive(Copy, Clone, Debug, PartialEq)] +pub enum PeerConnState { + // This entry represents ourself + Ourself, + + // We currently have a connection to this peer + Connected, + + // Our next connection tentative (the nth, where n is the first value) + // will be at given Instant + Waiting(usize, Instant), + + // A connection tentative is in progress + Trying(usize), + + // We abandonned trying to connect to this peer (too many failed attempts) + Abandonned, +} + +struct KnownHosts { + list: HashMap, + hash: hash::Digest, +} + +impl KnownHosts { + fn new() -> Self { + let list = HashMap::new(); + let hash = Self::calculate_hash(&list); + Self { list, hash } + } + fn update_hash(&mut self) { + self.hash = Self::calculate_hash(&self.list); + } + fn map_into_vec( + input: &HashMap, + ) -> Vec<(ed25519::PublicKey, SocketAddr)> { + let mut list = Vec::with_capacity(input.len()); + for (id, peer) in input.iter() { + if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { + list.push((id.clone(), peer.addr)); + } + } + list + } + fn calculate_hash(input: &HashMap) -> hash::Digest { + let mut list = Self::map_into_vec(input); + list.sort(); + let mut hash_state = hash::State::new(); + for (id, addr) in list { + hash_state.update(&id[..]); + hash_state.update(&format!("{}", addr).into_bytes()[..]); + } + hash_state.finalize() + } +} + +pub struct FullMeshPeeringStrategy { + netapp: Arc, + known_hosts: RwLock, + next_ping_id: AtomicU64, +} + +impl FullMeshPeeringStrategy { + pub fn new( + netapp: Arc, + bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, + ) -> Arc { + let mut known_hosts = KnownHosts::new(); + for (pk, addr) in bootstrap_list { + if pk != netapp.pubkey { + known_hosts.list.insert( + pk, + PeerInfo { + addr: addr, + state: PeerConnState::Waiting(0, Instant::now()), + last_seen: None, + ping: VecDeque::new(), + }, + ); + } + } + + let strat = Arc::new(Self { + netapp: netapp.clone(), + known_hosts: RwLock::new(known_hosts), + next_ping_id: AtomicU64::new(42), + }); + + let strat2 = strat.clone(); + netapp.add_msg_handler::( + move |from: ed25519::PublicKey, ping: PingMessage| { + let ping_resp = PingMessage { + id: ping.id, + peer_list_hash: strat2.known_hosts.read().unwrap().hash, + }; + async move { + debug!("Ping from {}", hex::encode(&from)); + Ok(ping_resp) + } + }, + ); + + let strat2 = strat.clone(); + netapp.add_msg_handler::( + move |_from: ed25519::PublicKey, peer_list: PeerListMessage| { + strat2.handle_peer_list(&peer_list.list[..]); + let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); + let resp = PeerListMessage { list: peer_list }; + async move { Ok(resp) } + }, + ); + + let strat2 = strat.clone(); + netapp.on_connected.store(Some(Arc::new(Box::new( + move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { + let strat2 = strat2.clone(); + tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); + }, + )))); + + let strat2 = strat.clone(); + netapp.on_disconnected.store(Some(Arc::new(Box::new( + move |pk: ed25519::PublicKey, is_incoming: bool| { + let strat2 = strat2.clone(); + tokio::spawn(strat2.on_disconnected(pk, is_incoming)); + }, + )))); + + strat + } + + pub async fn run(self: Arc) { + loop { + // 1. Read current state: get list of connected peers (ping them) + let known_hosts = self.known_hosts.read().unwrap(); + debug!("known_hosts: {} peers", known_hosts.list.len()); + + let mut to_ping = vec![]; + let mut to_retry = vec![]; + for (id, info) in known_hosts.list.iter() { + debug!("{}, {:?}", hex::encode(id), info); + match info.state { + PeerConnState::Connected => { + let must_ping = match info.last_seen { + None => true, + Some(t) => Instant::now() - t > PING_INTERVAL, + }; + if must_ping { + to_ping.push(id.clone()); + } + } + PeerConnState::Waiting(_, t) => { + if Instant::now() >= t { + to_retry.push(id.clone()); + } + } + _ => (), + } + } + drop(known_hosts); + + // 2. Dispatch ping to hosts + trace!("to_ping: {} peers", to_retry.len()); + for id in to_ping { + tokio::spawn(self.clone().ping(id)); + } + + // 3. Try reconnects + trace!("to_retry: {} peers", to_retry.len()); + if !to_retry.is_empty() { + let mut known_hosts = self.known_hosts.write().unwrap(); + for id in to_retry { + if let Some(h) = known_hosts.list.get_mut(&id) { + if let PeerConnState::Waiting(i, _) = h.state { + info!( + "Retrying connection to {} at {} ({})", + hex::encode(&id), + h.addr, + i + 1 + ); + h.state = PeerConnState::Trying(i); + tokio::spawn(self.clone().try_connect(id, h.addr.clone())); + } + } + } + } + + // 4. Sleep before next loop iteration + tokio::time::delay_for(LOOP_DELAY).await; + } + } + + async fn ping(self: Arc, id: ed25519::PublicKey) { + let peer = { + match self.netapp.client_conns.read().unwrap().get(&id) { + None => { + warn!("Should ping {}, but no connection", hex::encode(id)); + return; + } + Some(peer) => peer.clone(), + } + }; + + let peer_list_hash = self.known_hosts.read().unwrap().hash; + let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); + let ping_time = Instant::now(); + let ping_msg = PingMessage { + id: ping_id, + peer_list_hash, + }; + + debug!( + "Sending ping {} to {} at {:?}", + ping_id, + hex::encode(id), + ping_time + ); + match peer.clone().request(ping_msg, prio::HIGH).await { + Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), + Ok(ping_resp) => { + let resp_time = Instant::now(); + debug!( + "Got ping response from {} at {:?}", + hex::encode(id), + resp_time + ); + { + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&id) { + host.last_seen = Some(resp_time); + host.ping.push_back(resp_time - ping_time); + while host.ping.len() > 10 { + host.ping.pop_front(); + } + } + } + if ping_resp.peer_list_hash != peer_list_hash { + self.exchange_peers(peer).await; + } + } + } + } + + async fn exchange_peers(self: Arc, peer: Arc) { + let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); + let pex_message = PeerListMessage { list: peer_list }; + match peer.request(pex_message, prio::BACKGROUND).await { + Err(e) => warn!("Error doing peer exchange: {}", e), + Ok(resp) => { + self.handle_peer_list(&resp.list[..]); + } + } + } + + fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) { + let mut known_hosts = self.known_hosts.write().unwrap(); + for (id, addr) in list.iter() { + if !known_hosts.list.contains_key(id) { + known_hosts.list.insert(*id, self.new_peer(id, *addr)); + } + } + } + + async fn try_connect(self: Arc, id: ed25519::PublicKey, addr: SocketAddr) { + let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await; + if let Err(e) = conn_result { + warn!("Error connecting to {}: {}", hex::encode(id), e); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&id) { + host.state = match host.state { + PeerConnState::Trying(i) => { + if i >= CONN_MAX_RETRIES { + PeerConnState::Abandonned + } else { + PeerConnState::Waiting(i + 1, Instant::now() + CONN_RETRY_INTERVAL) + } + } + _ => PeerConnState::Waiting(0, Instant::now() + CONN_RETRY_INTERVAL), + }; + } + } + } + + async fn on_connected( + self: Arc, + pk: ed25519::PublicKey, + addr: SocketAddr, + is_incoming: bool, + ) { + if is_incoming { + if !self.known_hosts.read().unwrap().list.contains_key(&pk) { + self.known_hosts + .write() + .unwrap() + .list + .insert(pk, self.new_peer(&pk, addr)); + } + } else { + info!("Successfully connected to {} at {}", hex::encode(&pk), addr); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&pk) { + host.state = PeerConnState::Connected; + known_hosts.update_hash(); + } + } + } + + async fn on_disconnected(self: Arc, pk: ed25519::PublicKey, is_incoming: bool) { + if !is_incoming { + info!("Connection to {} was closed", hex::encode(pk)); + let mut known_hosts = self.known_hosts.write().unwrap(); + if let Some(host) = known_hosts.list.get_mut(&pk) { + host.state = PeerConnState::Waiting(0, Instant::now()); + known_hosts.update_hash(); + } + } + } + + pub fn get_peer_list(&self) -> Vec { + let known_hosts = self.known_hosts.read().unwrap(); + let mut ret = Vec::with_capacity(known_hosts.list.len()); + for (id, info) in known_hosts.list.iter() { + let mut pings = info.ping.iter().cloned().collect::>(); + pings.sort(); + if pings.len() > 0 { + ret.push(PeerInfoPub { + id: id.clone(), + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: Some( + pings + .iter() + .fold(Duration::from_secs(0), |x, y| x + *y) + .div_f64(pings.len() as f64), + ), + max_ping: pings.last().cloned(), + med_ping: Some(pings[pings.len() / 2]), + }); + } else { + ret.push(PeerInfoPub { + id: id.clone(), + addr: info.addr, + state: info.state, + last_seen: info.last_seen, + avg_ping: None, + max_ping: None, + med_ping: None, + }); + } + } + ret + } + + fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo { + let state = if *id == self.netapp.pubkey { + PeerConnState::Ourself + } else { + PeerConnState::Waiting(0, Instant::now()) + }; + PeerInfo { + addr, + state, + last_seen: None, + ping: VecDeque::new(), + } + } +} diff --git a/src/peering/mod.rs b/src/peering/mod.rs new file mode 100644 index 0000000..beb2e18 --- /dev/null +++ b/src/peering/mod.rs @@ -0,0 +1,2 @@ +pub mod basalt; +pub mod fullmesh; diff --git a/src/proto.rs b/src/proto.rs new file mode 100644 index 0000000..58c914e --- /dev/null +++ b/src/proto.rs @@ -0,0 +1,251 @@ +use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::sync::Arc; + +use log::trace; + +use async_trait::async_trait; + +use async_std::io::prelude::WriteExt; +use async_std::io::ReadExt; + +use tokio::io::{ReadHalf, WriteHalf}; +use tokio::net::TcpStream; +use tokio::sync::{mpsc, watch}; + +use crate::error::*; + +use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat}; + +const MAX_CHUNK_SIZE: usize = 0x4000; + +pub mod prio { + pub const HIGH: u8 = 0x20; + pub const NORMAL: u8 = 0x40; + pub const BACKGROUND: u8 = 0x80; + + pub const PRIMARY: u8 = 0x00; + pub const SECONDARY: u8 = 0x01; +} + +pub type RequestID = u16; +pub type RequestPriority = u8; + +struct SendQueueItem { + id: RequestID, + prio: RequestPriority, + data: Vec, + cursor: usize, +} + +struct SendQueue { + items: BTreeMap>, +} + +impl SendQueue { + fn new() -> Self { + Self { + items: BTreeMap::new(), + } + } + fn push(&mut self, item: SendQueueItem) { + let prio = item.prio; + let mut items_at_prio = self + .items + .remove(&prio) + .unwrap_or(VecDeque::with_capacity(4)); + items_at_prio.push_back(item); + self.items.insert(prio, items_at_prio); + } + fn pop(&mut self) -> Option { + match self.items.pop_first() { + None => None, + Some((prio, mut items_at_prio)) => { + let ret = items_at_prio.pop_front(); + if !items_at_prio.is_empty() { + self.items.insert(prio, items_at_prio); + } + ret + } + } + } +} + +#[async_trait] +pub(crate) trait SendLoop: Sync { + async fn send_loop( + self: Arc, + mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec)>, + mut write: BoxStreamWrite>>, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + let mut sending = SendQueue::new(); + while !*must_exit.borrow() { + if let Ok((id, prio, data)) = msg_recv.try_recv() { + trace!("send_loop: got {}, {} bytes", id, data.len()); + sending.push(SendQueueItem { + id, + prio, + data, + cursor: 0, + }); + } else if let Some(mut item) = sending.pop() { + trace!( + "send_loop: sending bytes for {} ({} bytes, {} already sent)", + item.id, + item.data.len(), + item.cursor + ); + let header_id = u16::to_be_bytes(item.id); + if write_all_or_exit(&header_id[..], &mut write, &mut must_exit) + .await? + .is_none() + { + break; + } + + if item.data.len() - item.cursor > MAX_CHUNK_SIZE { + let header_size = u16::to_be_bytes(MAX_CHUNK_SIZE as u16 | 0x8000); + if write_all_or_exit(&header_size[..], &mut write, &mut must_exit) + .await? + .is_none() + { + break; + } + + let new_cursor = item.cursor + MAX_CHUNK_SIZE as usize; + if write_all_or_exit( + &item.data[item.cursor..new_cursor], + &mut write, + &mut must_exit, + ) + .await? + .is_none() + { + break; + } + item.cursor = new_cursor; + + sending.push(item); + } else { + let send_len = (item.data.len() - item.cursor) as u16; + + let header_size = u16::to_be_bytes(send_len); + if write_all_or_exit(&header_size[..], &mut write, &mut must_exit) + .await? + .is_none() + { + break; + } + + if write_all_or_exit(&item.data[item.cursor..], &mut write, &mut must_exit) + .await? + .is_none() + { + break; + } + } + write.flush().await.log_err("Could not flush in send_loop"); + } else { + let (id, prio, data) = msg_recv + .recv() + .await + .ok_or(Error::Message("Connection closed.".into()))?; + trace!("send_loop: got {}, {} bytes", id, data.len()); + sending.push(SendQueueItem { + id, + prio, + data, + cursor: 0, + }); + } + } + Ok(()) + } +} + +#[async_trait] +pub(crate) trait RecvLoop: Sync + 'static { + async fn recv_handler(self: Arc, id: RequestID, msg: Vec); + + async fn recv_loop( + self: Arc, + mut read: BoxStreamRead>>, + mut must_exit: watch::Receiver, + ) -> Result<(), Error> { + let mut receiving = HashMap::new(); + while !*must_exit.borrow() { + trace!("recv_loop: reading packet"); + let mut header_id = [0u8; 2]; + if read_exact_or_exit(&mut header_id[..], &mut read, &mut must_exit) + .await? + .is_none() + { + break; + } + let id = RequestID::from_be_bytes(header_id); + trace!("recv_loop: got header id: {:04x}", id); + + let mut header_size = [0u8; 2]; + if read_exact_or_exit(&mut header_size[..], &mut read, &mut must_exit) + .await? + .is_none() + { + break; + } + let size = RequestID::from_be_bytes(header_size); + trace!("recv_loop: got header size: {:04x}", id); + + let has_cont = (size & 0x8000) != 0; + let size = size & !0x8000; + + let mut next_slice = vec![0; size as usize]; + if read_exact_or_exit(&mut next_slice[..], &mut read, &mut must_exit) + .await? + .is_none() + { + break; + } + trace!("recv_loop: read {} bytes", size); + + let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]); + msg_bytes.extend_from_slice(&next_slice[..]); + + if has_cont { + receiving.insert(id, msg_bytes); + } else { + tokio::spawn(self.clone().recv_handler(id, msg_bytes)); + } + } + Ok(()) + } +} + +async fn read_exact_or_exit( + buf: &mut [u8], + read: &mut BoxStreamRead>>, + must_exit: &mut watch::Receiver, +) -> Result, Error> { + tokio::select!( + res = read.read_exact(buf) => Ok(Some(res?)), + _ = await_exit(must_exit) => Ok(None), + ) +} + +async fn write_all_or_exit( + buf: &[u8], + write: &mut BoxStreamWrite>>, + must_exit: &mut watch::Receiver, +) -> Result, Error> { + tokio::select!( + res = write.write_all(buf) => Ok(Some(res?)), + _ = await_exit(must_exit) => Ok(None), + ) +} + +async fn await_exit(must_exit: &mut watch::Receiver) { + loop { + if must_exit.recv().await == Some(true) { + return; + } + } +} diff --git a/src/util.rs b/src/util.rs new file mode 100644 index 0000000..e83822e --- /dev/null +++ b/src/util.rs @@ -0,0 +1,14 @@ +use serde::Serialize; + +// util +pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> +where + T: Serialize + ?Sized, +{ + let mut wr = Vec::with_capacity(128); + let mut se = rmp_serde::Serializer::new(&mut wr) + .with_struct_map() + .with_string_variants(); + val.serialize(&mut se)?; + Ok(wr) +}