diff --git a/Cargo.lock b/Cargo.lock index 3d86fc5..4692977 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.21.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "ahash" version = "0.7.6" @@ -45,7 +60,7 @@ checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.99", ] [[package]] @@ -56,7 +71,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8" dependencies = [ "hermit-abi", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -65,6 +80,21 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" +[[package]] +name = "backtrace" +version = "0.3.69" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837" +dependencies = [ + "addr2line", + "cc", + "cfg-if 1.0.0", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bitflags" version = "1.3.2" @@ -83,6 +113,16 @@ version = "1.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" +[[package]] +name = "bytes" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c" +dependencies = [ + "byteorder", + "iovec", +] + [[package]] name = "bytes" version = "1.2.1" @@ -95,6 +135,12 @@ version = "1.0.73" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" +[[package]] +name = "cfg-if" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822" + [[package]] name = "cfg-if" version = "1.0.0" @@ -113,7 +159,7 @@ dependencies = [ "num-traits", "time", "wasm-bindgen", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -127,6 +173,15 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "cloudabi" +version = "0.0.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f" +dependencies = [ + "bitflags", +] + [[package]] name = "core-foundation-sys" version = "0.8.3" @@ -139,8 +194,19 @@ version = "0.5.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" dependencies = [ - "cfg-if", - "crossbeam-utils", + "cfg-if 1.0.0", + "crossbeam-utils 0.8.11", +] + +[[package]] +name = "crossbeam-utils" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8" +dependencies = [ + "autocfg", + "cfg-if 0.1.10", + "lazy_static", ] [[package]] @@ -149,7 +215,7 @@ version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "once_cell", ] @@ -176,10 +242,38 @@ dependencies = [ "proc-macro2", "quote", "rustversion", - "syn", + "syn 1.0.99", "synstructure", ] +[[package]] +name = "fnv" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" + +[[package]] +name = "fuchsia-zircon" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + +[[package]] +name = "futures" +version = "0.1.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" + [[package]] name = "futures" version = "0.3.24" @@ -236,7 +330,7 @@ checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.99", ] [[package]] @@ -275,11 +369,17 @@ version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "libc", "wasi 0.11.0+wasi-snapshot-preview1", ] +[[package]] +name = "gimli" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0" + [[package]] name = "hashbrown" version = "0.12.3" @@ -330,7 +430,16 @@ dependencies = [ "js-sys", "once_cell", "wasm-bindgen", - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "iovec" +version = "0.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e" +dependencies = [ + "libc", ] [[package]] @@ -342,13 +451,23 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "kuska-handshake" version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e33da4b69f23c2ece0b3e729d079cebdc2c0206e493e42f510f500ad81c631d5" dependencies = [ - "futures", + "futures 0.3.24", "hex", "kuska-sodiumoxide", "log", @@ -374,9 +493,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646" [[package]] name = "libc" -version = "0.2.132" +version = "0.2.149" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5" +checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" [[package]] name = "libsodium-sys" @@ -390,13 +509,22 @@ dependencies = [ "walkdir", ] +[[package]] +name = "lock_api" +version = "0.3.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75" +dependencies = [ + "scopeguard", +] + [[package]] name = "log" version = "0.4.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", ] [[package]] @@ -408,6 +536,12 @@ dependencies = [ "hashbrown", ] +[[package]] +name = "maybe-uninit" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" + [[package]] name = "memchr" version = "2.5.0" @@ -415,10 +549,38 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" [[package]] -name = "mio" -version = "0.8.4" +name = "miniz_oxide" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf" +checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7" +dependencies = [ + "adler", +] + +[[package]] +name = "mio" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow", + "net2", + "slab", + "winapi 0.2.8", +] + +[[package]] +name = "mio" +version = "0.8.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2" dependencies = [ "libc", "log", @@ -426,18 +588,52 @@ dependencies = [ "windows-sys", ] +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio 0.6.23", +] + +[[package]] +name = "miow" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + +[[package]] +name = "net2" +version = "0.2.39" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + [[package]] name = "netapp" version = "0.10.0" dependencies = [ "arc-swap", "async-trait", - "bytes", - "cfg-if", + "bytes 1.2.1", + "cfg-if 1.0.0", "chrono", "env_logger", "err-derive", - "futures", + "futures 0.3.24", "hex", "kuska-handshake", "kuska-sodiumoxide", @@ -452,6 +648,7 @@ dependencies = [ "structopt", "tokio", "tokio-stream", + "tokio-unix-tcp", "tokio-util", ] @@ -484,6 +681,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.32.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.13.1" @@ -519,6 +725,32 @@ dependencies = [ "opentelemetry", ] +[[package]] +name = "parking_lot" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252" +dependencies = [ + "lock_api", + "parking_lot_core", + "rustc_version", +] + +[[package]] +name = "parking_lot_core" +version = "0.6.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66b810a62be75176a80873726630147a5ca780cd33921e0b5709033e66b0a" +dependencies = [ + "cfg-if 0.1.10", + "cloudabi", + "libc", + "redox_syscall", + "rustc_version", + "smallvec", + "winapi 0.3.9", +] + [[package]] name = "paste" version = "1.0.9" @@ -548,14 +780,14 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.99", ] [[package]] name = "pin-project-lite" -version = "0.2.9" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pin-utils" @@ -584,7 +816,7 @@ dependencies = [ "proc-macro-error-attr", "proc-macro2", "quote", - "syn", + "syn 1.0.99", "version_check", ] @@ -601,18 +833,18 @@ dependencies = [ [[package]] name = "proc-macro2" -version = "1.0.43" +version = "1.0.69" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab" +checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" dependencies = [ "unicode-ident", ] [[package]] name = "quote" -version = "1.0.21" +version = "1.0.33" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179" +checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" dependencies = [ "proc-macro2", ] @@ -647,6 +879,12 @@ dependencies = [ "getrandom", ] +[[package]] +name = "redox_syscall" +version = "0.1.57" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" + [[package]] name = "regex" version = "1.6.0" @@ -686,6 +924,21 @@ dependencies = [ "serde", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + +[[package]] +name = "rustc_version" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver", +] + [[package]] name = "rustversion" version = "1.0.9" @@ -702,23 +955,44 @@ dependencies = [ ] [[package]] -name = "serde" -version = "1.0.144" +name = "scopeguard" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + +[[package]] +name = "serde" +version = "1.0.189" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" dependencies = [ "serde_derive", ] [[package]] name = "serde_derive" -version = "1.0.144" +version = "1.0.189" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00" +checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.38", ] [[package]] @@ -740,13 +1014,22 @@ dependencies = [ ] [[package]] -name = "socket2" -version = "0.4.7" +name = "smallvec" +version = "0.6.14" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd" +checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" +dependencies = [ + "maybe-uninit", +] + +[[package]] +name = "socket2" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" dependencies = [ "libc", - "winapi", + "windows-sys", ] [[package]] @@ -770,7 +1053,7 @@ dependencies = [ "proc-macro-error", "proc-macro2", "quote", - "syn", + "syn 1.0.99", ] [[package]] @@ -784,6 +1067,17 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "syn" +version = "2.0.38" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "synstructure" version = "0.12.6" @@ -792,7 +1086,7 @@ checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.99", "unicode-xid", ] @@ -831,7 +1125,7 @@ checksum = "c251e90f708e16c49a16f4917dc2131e75222b72edfa9cb7f7c58ae56aae0c09" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.99", ] [[package]] @@ -842,38 +1136,87 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", + "winapi 0.3.9", ] [[package]] name = "tokio" -version = "1.21.0" +version = "1.33.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89797afd69d206ccd11fb0ea560a44bbb87731d020670e79416d442919257d42" +checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" dependencies = [ - "autocfg", - "bytes", + "backtrace", + "bytes 1.2.1", "libc", - "memchr", - "mio", + "mio 0.8.8", "num_cpus", - "once_cell", "pin-project-lite", "signal-hook-registry", "socket2", "tokio-macros", - "winapi", + "windows-sys", +] + +[[package]] +name = "tokio-codec" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "tokio-io", +] + +[[package]] +name = "tokio-executor" +version = "0.1.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", +] + +[[package]] +name = "tokio-io" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "log", ] [[package]] name = "tokio-macros" -version = "1.8.0" +version = "2.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.38", +] + +[[package]] +name = "tokio-reactor" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351" +dependencies = [ + "crossbeam-utils 0.7.2", + "futures 0.1.31", + "lazy_static", + "log", + "mio 0.6.23", + "num_cpus", + "parking_lot", + "slab", + "tokio-executor", + "tokio-io", + "tokio-sync", ] [[package]] @@ -887,13 +1230,53 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-sync" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee" +dependencies = [ + "fnv", + "futures 0.1.31", +] + +[[package]] +name = "tokio-uds" +version = "0.2.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0" +dependencies = [ + "bytes 0.4.12", + "futures 0.1.31", + "iovec", + "libc", + "log", + "mio 0.6.23", + "mio-uds", + "tokio-codec", + "tokio-io", + "tokio-reactor", +] + +[[package]] +name = "tokio-unix-tcp" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "25694660e312eccfaf8cdc29c7d1d9236a81ec14b968a77c5511940a142c30e2" +dependencies = [ + "mio 0.8.8", + "serde", + "tokio", + "tokio-uds", +] + [[package]] name = "tokio-util" version = "0.7.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" dependencies = [ - "bytes", + "bytes 1.2.1", "futures-core", "futures-io", "futures-sink", @@ -938,7 +1321,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" dependencies = [ "same-file", - "winapi", + "winapi 0.3.9", "winapi-util", ] @@ -960,7 +1343,7 @@ version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" dependencies = [ - "cfg-if", + "cfg-if 1.0.0", "wasm-bindgen-macro", ] @@ -975,7 +1358,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 1.0.99", "wasm-bindgen-shared", ] @@ -997,7 +1380,7 @@ checksum = "5be8e654bdd9b79216c2929ab90721aa82faf65c48cdf08bdc4e7f51357b80da" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 1.0.99", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1008,6 +1391,12 @@ version = "0.2.82" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6598dd0bd3c7d51095ff6531a5b23e02acdc81804e30d8f07afb77b7215a140a" +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1018,6 +1407,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1030,7 +1425,7 @@ version = "0.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1041,43 +1436,76 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" [[package]] name = "windows-sys" -version = "0.36.1" +version = "0.48.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2" +checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c" +dependencies = [ + "windows_aarch64_gnullvm", "windows_aarch64_msvc", "windows_i686_gnu", "windows_i686_msvc", "windows_x86_64_gnu", + "windows_x86_64_gnullvm", "windows_x86_64_msvc", ] [[package]] -name = "windows_aarch64_msvc" -version = "0.36.1" +name = "windows_aarch64_gnullvm" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47" +checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" [[package]] name = "windows_i686_gnu" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6" +checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" [[package]] name = "windows_i686_msvc" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024" +checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" [[package]] name = "windows_x86_64_gnu" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1" +checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.48.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc" [[package]] name = "windows_x86_64_msvc" -version = "0.36.1" +version = "0.48.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680" +checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] diff --git a/Cargo.toml b/Cargo.toml index 281fc20..996df29 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -46,6 +46,8 @@ kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] } opentelemetry = { version = "0.17", optional = true } opentelemetry-contrib = { version = "0.9", optional = true } +tokio-unix-tcp = { version = "0.2.0", features = ["serde"] } + [dev-dependencies] env_logger = "0.9" structopt = { version = "0.3", default-features = false } diff --git a/examples/basalt.rs b/examples/basalt.rs index a5a25c3..4a68323 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -9,10 +9,12 @@ use serde::{Deserialize, Serialize}; use structopt::StructOpt; use sodiumoxide::crypto::auth; -use sodiumoxide::crypto::sign::ed25519; +use sodiumoxide::crypto::sign::{ed25519, PublicKey}; use tokio::sync::watch; +use tokio_unix_tcp::NamedSocketAddr; + use netapp::endpoint::*; use netapp::message::*; use netapp::peering::basalt::*; @@ -96,9 +98,13 @@ async fn main() { let netapp = NetApp::new(0u64, netid, privkey); - let mut bootstrap_peers = vec![]; + let mut bootstrap_peers: Vec<(PublicKey, NamedSocketAddr)> = vec![]; for peer in opt.bootstrap_peers.iter() { - bootstrap_peers.push(parse_peer_addr(peer).expect("Invalid peer address")); + bootstrap_peers.push( + parse_peer_addr(peer) + .map(|(node_id, socket_bind_addr)| (node_id, socket_bind_addr.into())) + .expect("Invalid peer address"), + ); } let basalt_params = BasaltParams { diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 5bbde73..ca20dc0 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -1,5 +1,5 @@ use std::io::Write; -use std::net::SocketAddr; +use std::net; use std::sync::Arc; use std::time::Duration; @@ -9,10 +9,13 @@ use futures::{stream, StreamExt}; use log::*; use serde::{Deserialize, Serialize}; use structopt::StructOpt; + use tokio::sync::watch; +use tokio_unix_tcp::NamedSocketAddr; + use sodiumoxide::crypto::auth; -use sodiumoxide::crypto::sign::ed25519; +use sodiumoxide::crypto::sign::{ed25519, PublicKey}; use netapp::endpoint::*; use netapp::message::*; @@ -75,21 +78,25 @@ async fn main() { info!("Node public key: {}", hex::encode(privkey.public_key())); let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); - let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap(); + let listen_addr: net::SocketAddr = opt.listen_addr.parse().unwrap(); info!("Node public address: {:?}", public_addr); info!("Node listen address: {}", listen_addr); let netapp = NetApp::new(0u64, netid.clone(), privkey.clone()); - let mut bootstrap_peers = vec![]; + let mut bootstrap_peers: Vec<(PublicKey, NamedSocketAddr)> = vec![]; for peer in opt.bootstrap_peers.iter() { - bootstrap_peers.push(parse_peer_addr(peer).expect("Invalid peer address")); + bootstrap_peers.push( + parse_peer_addr(peer) + .map(|(node_id, socket_bind_addr)| (node_id, socket_bind_addr.into())) + .expect("Invalid peer address"), + ); } let peering = FullMeshPeeringStrategy::new( netapp.clone(), bootstrap_peers, - public_addr.map(|a| SocketAddr::new(a, listen_addr.port())), + public_addr.map(|a| net::SocketAddr::new(a, listen_addr.port()).into()), ); info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}", @@ -108,7 +115,7 @@ async fn main() { tokio::join!( example.exchange_loop(watch_cancel.clone()), - netapp.listen(listen_addr, public_addr, watch_cancel.clone()), + netapp.listen(listen_addr.into(), public_addr, watch_cancel.clone()), peering.run(watch_cancel), ); } diff --git a/src/client.rs b/src/client.rs index 607dd17..ba5a8f3 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::net::SocketAddr; use std::pin::Pin; use std::sync::atomic::{self, AtomicU32}; use std::sync::{Arc, Mutex}; @@ -13,11 +12,13 @@ use log::{debug, error, trace}; use futures::io::AsyncReadExt; use futures::Stream; use kuska_handshake::async_std::{handshake_client, BoxStream}; -use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, oneshot, watch}; use tokio_util::compat::*; +use tokio_unix_tcp::SocketAddr; +use tokio_unix_tcp::Stream as SocketStream; + #[cfg(feature = "telemetry")] use opentelemetry::{ trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, @@ -47,7 +48,7 @@ pub(crate) struct ClientConn { impl ClientConn { pub(crate) async fn init( netapp: Arc, - socket: TcpStream, + socket: SocketStream, peer_id: NodeID, ) -> Result<(), Error> { let remote_addr = socket.peer_addr()?; diff --git a/src/netapp.rs b/src/netapp.rs index b1ad9db..229b1c4 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -1,5 +1,6 @@ use std::collections::HashMap; -use std::net::{IpAddr, SocketAddr}; +use std::net::{self, IpAddr}; +use std::path::PathBuf; use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; @@ -13,10 +14,11 @@ use sodiumoxide::crypto::sign::ed25519; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; -use tokio::net::{TcpListener, TcpStream}; use tokio::select; use tokio::sync::{mpsc, watch}; +use tokio_unix_tcp::{Listener, NamedSocketAddr, SocketAddr, Stream, UnixSocketAddr}; + use crate::client::*; use crate::endpoint::*; use crate::error::*; @@ -39,16 +41,19 @@ pub(crate) type VersionTag = [u8; 16]; pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005 #[derive(Serialize, Deserialize, Debug)] -pub(crate) struct HelloMessage { - pub server_addr: Option, - pub server_port: u16, +pub(crate) enum HelloMessage { + Tcp { + server_addr: Option, + server_port: u16, + }, + Unix(PathBuf), } impl Message for HelloMessage { type Response = (); } -type OnConnectHandler = Box; +type OnConnectHandler = Box; type OnDisconnectHandler = Box; /// NetApp is the main class that handles incoming and outgoing connections. @@ -81,9 +86,12 @@ pub struct NetApp { on_disconnected_handler: ArcSwapOption, } -struct ListenParams { - listen_addr: SocketAddr, - public_addr: Option, +enum ListenParams { + Tcp { + listen_addr: net::SocketAddr, + public_addr: Option, + }, + Unix(PathBuf), } impl NetApp { @@ -129,7 +137,7 @@ impl NetApp { /// as the peering strategy will need to set this itself. pub fn on_connected(&self, handler: F) where - F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static, + F: Fn(NodeID, NamedSocketAddr, bool) + Sized + Send + Sync + 'static, { self.on_connected_handler .store(Some(Arc::new(Box::new(handler)))); @@ -179,14 +187,18 @@ impl NetApp { /// If this is not called, the NetApp instance remains a passive client. pub async fn listen( self: Arc, - listen_addr: SocketAddr, + listen_addr: NamedSocketAddr, public_addr: Option, mut must_exit: watch::Receiver, ) { - let listen_params = ListenParams { - listen_addr, - public_addr, + let listen_params = match &listen_addr { + NamedSocketAddr::Inet(addr) => ListenParams::Tcp { + listen_addr: *addr, + public_addr, + }, + NamedSocketAddr::Unix(path) => ListenParams::Unix(path.to_path_buf()), }; + if self .listen_params .swap(Some(Arc::new(listen_params))) @@ -195,7 +207,9 @@ impl NetApp { error!("Trying to listen on NetApp but we're already listening!"); } - let listener = TcpListener::bind(listen_addr).await.unwrap(); + let listener = Listener::bind_and_prepare_unix(&listen_addr, true, None) + .await + .unwrap(); info!("Listening on {}", listen_addr); let (conn_in, mut conn_out) = mpsc::unbounded_channel(); @@ -276,18 +290,22 @@ impl NetApp { self.on_disconnected_handler.store(None); } - /// Attempt to connect to a peer, given by its ip:port and its public key. - /// The public key will be checked during the secret handshake process. - /// This function returns once the connection has been established and a - /// successfull handshake was made. At this point we can send messages to - /// the other node with `Netapp::request` - pub async fn try_connect(self: Arc, ip: SocketAddr, id: NodeID) -> Result<(), Error> { + /// Attempt to connect to a peer, given by its ip:port / unix socket path + /// and its public key. The public key will be checked during the secret + /// handshake process. This function returns once the connection has been + /// established and a successful handshake was made. At this point we can + /// send messages to the other node with `Netapp::request` + pub async fn try_connect( + self: Arc, + addr: NamedSocketAddr, + id: NodeID, + ) -> Result<(), Error> { // Don't connect to ourself, we don't care // but pretend we did if id == self.id { tokio::spawn(async move { if let Some(h) = self.on_connected_handler.load().as_ref() { - h(id, ip, false); + h(id, addr, false); } }); return Ok(()); @@ -298,8 +316,8 @@ impl NetApp { return Ok(()); } - let socket = TcpStream::connect(ip).await?; - info!("Connected to {}, negotiating handshake...", ip); + let socket = Stream::connect(&addr).await?; + info!("Connected to {}, negotiating handshake...", addr); ClientConn::init(self, socket, id).await?; Ok(()) } @@ -392,23 +410,29 @@ impl NetApp { } if let Some(h) = self.on_connected_handler.load().as_ref() { - h(conn.peer_id, conn.remote_addr, false); + h( + conn.peer_id, + conn.remote_addr.clone().to_named_socket_addr().unwrap(), + false, + ); } if let Some(lp) = self.listen_params.load_full() { - let server_addr = lp.public_addr; - let server_port = lp.listen_addr.port(); + let hello_message: HelloMessage = match lp.as_ref() { + ListenParams::Tcp { + listen_addr, + public_addr, + } => HelloMessage::Tcp { + server_addr: *public_addr, + server_port: listen_addr.port(), + }, + ListenParams::Unix(listen_path) => HelloMessage::Unix(listen_path.clone()), + }; + let hello_endpoint = self.hello_endpoint.load_full().unwrap(); tokio::spawn(async move { hello_endpoint - .call( - &conn.peer_id, - HelloMessage { - server_addr, - server_port, - }, - PRIO_NORMAL, - ) + .call(&conn.peer_id, hello_message, PRIO_NORMAL) .await .map(|_| ()) .log_err("Sending hello message"); @@ -443,9 +467,64 @@ impl EndpointHandler for NetApp { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&from) { - let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip()); - let remote_addr = SocketAddr::new(remote_ip, msg.server_port); - h(from, remote_addr, true); + let remote_addr = match (msg, c.remote_addr.clone()) { + ( + HelloMessage::Tcp { + server_addr, + server_port, + }, + SocketAddr::Inet(remote_addr), + ) => { + let remote_ip = server_addr.unwrap_or_else(|| remote_addr.ip()); + + Some(NamedSocketAddr::Inet(net::SocketAddr::new( + remote_ip, + *server_port, + ))) + } + + ( + HelloMessage::Tcp { + server_addr: Some(server_addr), + server_port, + }, + SocketAddr::Unix(_), + ) => Some(NamedSocketAddr::Inet(net::SocketAddr::new( + *server_addr, + *server_port, + ))), + + ( + HelloMessage::Tcp { + server_addr: None, + server_port: _, + }, + SocketAddr::Unix(UnixSocketAddr::Pathname(pathname)), + ) => Some(NamedSocketAddr::Unix(pathname)), + + ( + HelloMessage::Tcp { + server_addr: None, + server_port: _, + }, + SocketAddr::Unix(UnixSocketAddr::AbstractOrUnnamed), + ) => { + error!("Remote client connected to us via a unix socket and is listening on TCP without a public address specified."); + error!("Their peer address is an unnamed unix socket, as such we have no way of connecting back to them. Disconnecting."); + + None + } + + (HelloMessage::Unix(server_path), _) => { + Some(NamedSocketAddr::Unix(server_path.clone())) + } + }; + + if let Some(remote_addr) = remote_addr { + h(from, remote_addr, true); + } else { + self.disconnect(&from); + } } } } diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 2c27dba..da7157f 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -1,5 +1,5 @@ use std::collections::HashSet; -use std::net::SocketAddr; +use std::net; use std::sync::{Arc, RwLock}; use std::time::Duration; @@ -13,6 +13,8 @@ use sodiumoxide::crypto::hash; use tokio::sync::watch; +use tokio_unix_tcp::NamedSocketAddr; + use crate::endpoint::*; use crate::message::*; use crate::netapp::*; @@ -40,10 +42,10 @@ impl Message for PushMessage { type Seed = [u8; 32]; -#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] +#[derive(Hash, Clone, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] struct Peer { id: NodeID, - addr: SocketAddr, + addr: NamedSocketAddr, } type Cost = [u8; 40]; @@ -57,24 +59,28 @@ impl Peer { let mut cost = [0u8; 40]; match self.addr { - SocketAddr::V4(v4addr) => { - let v4ip = v4addr.ip().octets(); + NamedSocketAddr::Inet(addr) => match addr { + net::SocketAddr::V4(v4addr) => { + let v4ip = v4addr.ip().octets(); - for i in 0..4 { - let mut h = hasher; - h.update(&v4ip[..i + 1]); - cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + for i in 0..4 { + let mut h = hasher; + h.update(&v4ip[..i + 1]); + cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + } } - } - SocketAddr::V6(v6addr) => { - let v6ip = v6addr.ip().octets(); + net::SocketAddr::V6(v6addr) => { + let v6ip = v6addr.ip().octets(); - for i in 0..4 { - let mut h = hasher; - h.update(&v6ip[..i + 2]); - cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + for i in 0..4 { + let mut h = hasher; + h.update(&v6ip[..i + 2]); + cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); + } } - } + }, + // FIXME: What should the cost calculation be here? + NamedSocketAddr::Unix(_) => {} } { @@ -94,7 +100,10 @@ struct BasaltSlot { impl BasaltSlot { fn cost(&self) -> Cost { - self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST) + self.peer + .clone() + .map(|p| p.cost(&self.seed)) + .unwrap_or(MAX_COST) } } @@ -117,7 +126,7 @@ impl BasaltView { fn current_peers(&self) -> HashSet { self.slots .iter() - .filter_map(|s| s.peer) + .filter_map(|s| s.peer.clone()) .collect::>() } fn current_peers_vec(&self) -> Vec { @@ -139,7 +148,7 @@ impl BasaltView { let mut rng = thread_rng(); for _i in 0..count { let idx = rng.gen_range(0..possibles.len()); - ret.push(self.slots[possibles[idx]].peer.unwrap()); + ret.push(self.slots[possibles[idx]].peer.clone().unwrap()); } ret } @@ -158,7 +167,7 @@ impl BasaltView { peer.addr, hex::encode(peer_cost) ); - self.slots[i].peer = Some(*peer); + self.slots[i].peer = Some(peer.clone()); slot_cost = peer_cost; } } @@ -172,7 +181,7 @@ impl BasaltView { fn disconnected(&mut self, id: NodeID) { let mut cleared_slots = vec![]; for i in 0..self.slots.len() { - if let Some(p) = self.slots[i].peer { + if let Some(p) = self.slots[i].peer.clone() { if p.id == id { self.slots[i].peer = None; cleared_slots.push(i); @@ -204,7 +213,7 @@ impl BasaltView { let peer_cost = peer.cost(&self.slots[i].seed); if peer_cost < min_cost { min_cost = peer_cost; - min_peer = Some(*peer); + min_peer = Some(peer.clone()); } } if let Some(p) = min_peer { @@ -251,14 +260,14 @@ pub struct Basalt { impl Basalt { pub fn new( netapp: Arc, - bootstrap_list: Vec<(NodeID, SocketAddr)>, + bootstrap_list: Vec<(NodeID, NamedSocketAddr)>, param: BasaltParams, ) -> Arc { let bootstrap_peers = bootstrap_list .iter() .map(|(id, addr)| Peer { id: *id, - addr: *addr, + addr: addr.clone(), }) .collect::>(); @@ -280,9 +289,11 @@ impl Basalt { basalt.push_endpoint.set_handler(basalt.clone()); let basalt2 = basalt.clone(); - netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { - basalt2.on_connected(id, addr, is_incoming); - }); + netapp.on_connected( + move |id: NodeID, addr: NamedSocketAddr, is_incoming: bool| { + basalt2.on_connected(id, addr, is_incoming); + }, + ); let basalt2 = basalt.clone(); netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { @@ -307,7 +318,7 @@ impl Basalt { pub async fn run(self: Arc, must_exit: watch::Receiver) { for peer in self.bootstrap_peers.iter() { - tokio::spawn(self.clone().try_connect(*peer)); + tokio::spawn(self.clone().try_connect(peer.clone())); } tokio::join!( @@ -386,7 +397,7 @@ impl Basalt { let mut to_retry_maybe = self.bootstrap_peers.clone(); for (peer, _) in self.backlog.read().unwrap().iter() { if !self.bootstrap_peers.contains(peer) { - to_retry_maybe.push(*peer); + to_retry_maybe.push(peer.clone()); } } self.handle_peer_list(&to_retry_maybe[..]); @@ -397,7 +408,7 @@ impl Basalt { let to_connect = self.view.read().unwrap().should_try_list(peers); for peer in to_connect.iter() { - tokio::spawn(self.clone().try_connect(*peer)); + tokio::spawn(self.clone().try_connect(peer.clone())); } } @@ -406,16 +417,20 @@ impl Basalt { let view = self.view.read().unwrap(); let mut attempts = self.current_attempts.write().unwrap(); - if view.slots.iter().any(|x| x.peer == Some(peer)) { + if view.slots.iter().any(|x| x.peer == Some(peer.clone())) { return; } if attempts.contains(&peer) { return; } - attempts.insert(peer); + attempts.insert(peer.clone()); } - let res = self.netapp.clone().try_connect(peer.addr, peer.id).await; + let res = self + .netapp + .clone() + .try_connect(peer.addr.clone(), peer.id) + .await; trace!("Connection attempt to {}: {:?}", peer.addr, res); self.current_attempts.write().unwrap().remove(&peer); @@ -425,7 +440,7 @@ impl Basalt { } } - fn on_connected(self: &Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { + fn on_connected(self: &Arc, id: NodeID, addr: NamedSocketAddr, is_incoming: bool) { if is_incoming { self.handle_peer_list(&[Peer { id, addr }][..]); } else { @@ -434,7 +449,7 @@ impl Basalt { let mut backlog = self.backlog.write().unwrap(); if backlog.get(&peer).is_none() { - backlog.put(peer, ()); + backlog.put(peer.clone(), ()); } drop(backlog); diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 8e66604..f902e46 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -1,5 +1,4 @@ use std::collections::{HashMap, VecDeque}; -use std::net::SocketAddr; use std::sync::atomic::{self, AtomicU64}; use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; @@ -12,6 +11,8 @@ use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::watch; +use tokio_unix_tcp::NamedSocketAddr; + use sodiumoxide::crypto::hash; use crate::endpoint::*; @@ -43,7 +44,7 @@ impl Message for PingMessage { #[derive(Serialize, Deserialize)] struct PeerListMessage { - pub list: Vec<(NodeID, SocketAddr)>, + pub list: Vec<(NodeID, NamedSocketAddr)>, } impl Message for PeerListMessage { @@ -57,9 +58,9 @@ struct PeerInfoInternal { // addr is the currently connected address, // or the last address we were connected to, // or an arbitrary address some other peer gave us - addr: SocketAddr, + addr: NamedSocketAddr, // all_addrs contains all of the addresses everyone gave us - all_addrs: Vec, + all_addrs: Vec, state: PeerConnState, last_send_ping: Option, @@ -69,9 +70,9 @@ struct PeerInfoInternal { } impl PeerInfoInternal { - fn new(addr: SocketAddr, state: PeerConnState) -> Self { + fn new(addr: NamedSocketAddr, state: PeerConnState) -> Self { Self { - addr, + addr: addr.clone(), all_addrs: vec![addr], state, last_send_ping: None, @@ -83,12 +84,12 @@ impl PeerInfoInternal { } /// Information that the full mesh peering strategy can return about the peers it knows of -#[derive(Copy, Clone, Debug)] +#[derive(Clone, Debug)] pub struct PeerInfo { /// The node's identifier (its public key) pub id: NodeID, /// The node's network address - pub addr: SocketAddr, + pub addr: NamedSocketAddr, /// The current status of our connection to this node pub state: PeerConnState, /// The last time at which the node was seen @@ -153,11 +154,11 @@ impl KnownHosts { fn update_hash(&mut self) { self.hash = Self::calculate_hash(&self.list); } - fn map_into_vec(input: &HashMap) -> Vec<(NodeID, SocketAddr)> { + fn map_into_vec(input: &HashMap) -> Vec<(NodeID, NamedSocketAddr)> { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { - list.push((*id, peer.addr)); + list.push((*id, peer.addr.clone())); } } list @@ -196,8 +197,8 @@ impl FullMeshPeeringStrategy { /// to all of the nodes specified in the bootstrap list. pub fn new( netapp: Arc, - bootstrap_list: Vec<(NodeID, SocketAddr)>, - our_addr: Option, + bootstrap_list: Vec<(NodeID, NamedSocketAddr)>, + our_addr: Option, ) -> Arc { let mut known_hosts = KnownHosts::new(); for (id, addr) in bootstrap_list { @@ -232,10 +233,12 @@ impl FullMeshPeeringStrategy { strat.peer_list_endpoint.set_handler(strat.clone()); let strat2 = strat.clone(); - netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { - let strat2 = strat2.clone(); - strat2.on_connected(id, addr, is_incoming); - }); + netapp.on_connected( + move |id: NodeID, addr: NamedSocketAddr, is_incoming: bool| { + let strat2 = strat2.clone(); + strat2.on_connected(id, addr, is_incoming); + }, + ); let strat2 = strat.clone(); netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { @@ -318,7 +321,11 @@ impl FullMeshPeeringStrategy { .filter(|x| **x != h.addr) .cloned() .collect::>(); - tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs)); + tokio::spawn(self.clone().try_connect( + id, + h.addr.clone(), + alternate_addrs, + )); } } } @@ -351,7 +358,7 @@ impl FullMeshPeeringStrategy { if !pings.is_empty() { pub_peer_list.push(PeerInfo { id: *id, - addr: info.addr, + addr: info.addr.clone(), state: info.state, last_seen: info.last_seen, avg_ping: Some( @@ -366,7 +373,7 @@ impl FullMeshPeeringStrategy { } else { pub_peer_list.push(PeerInfo { id: *id, - addr: info.addr, + addr: info.addr.clone(), state: info.state, last_seen: info.last_seen, avg_ping: None, @@ -458,18 +465,20 @@ impl FullMeshPeeringStrategy { } } - fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) { + fn handle_peer_list(&self, list: &[(NodeID, NamedSocketAddr)]) { let mut known_hosts = self.known_hosts.write().unwrap(); let mut changed = false; for (id, addr) in list.iter() { if let Some(kh) = known_hosts.list.get_mut(id) { if !kh.all_addrs.contains(addr) { - kh.all_addrs.push(*addr); + kh.all_addrs.push(addr.clone()); changed = true; } } else { - known_hosts.list.insert(*id, self.new_peer(id, *addr)); + known_hosts + .list + .insert(*id, self.new_peer(id, addr.clone())); changed = true; } } @@ -483,16 +492,16 @@ impl FullMeshPeeringStrategy { async fn try_connect( self: Arc, id: NodeID, - default_addr: SocketAddr, - alternate_addrs: Vec, + default_addr: NamedSocketAddr, + alternate_addrs: Vec, ) { let conn_addr = { let mut ret = None; for addr in [default_addr].iter().chain(alternate_addrs.iter()) { debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8])); - match self.netapp.clone().try_connect(*addr, id).await { + match self.netapp.clone().try_connect(addr.clone(), id).await { Ok(()) => { - ret = Some(*addr); + ret = Some(addr.clone()); break; } Err(e) => { @@ -533,7 +542,7 @@ impl FullMeshPeeringStrategy { } } - fn on_connected(self: Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { + fn on_connected(self: Arc, id: NodeID, addr: NamedSocketAddr, is_incoming: bool) { let mut known_hosts = self.known_hosts.write().unwrap(); if is_incoming { if let Some(host) = known_hosts.list.get_mut(&id) { @@ -551,7 +560,7 @@ impl FullMeshPeeringStrategy { ); if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; - host.addr = addr; + host.addr = addr.clone(); if !host.all_addrs.contains(&addr) { host.all_addrs.push(addr); } @@ -577,7 +586,7 @@ impl FullMeshPeeringStrategy { } } - fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal { + fn new_peer(&self, id: &NodeID, addr: NamedSocketAddr) -> PeerInfoInternal { let state = if *id == self.netapp.id { PeerConnState::Ourself } else { diff --git a/src/server.rs b/src/server.rs index 55b9e67..f79c4e5 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,5 +1,4 @@ use std::collections::HashMap; -use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; @@ -8,11 +7,12 @@ use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; use kuska_handshake::async_std::{handshake_server, BoxStream}; -use tokio::net::TcpStream; use tokio::select; use tokio::sync::{mpsc, watch}; use tokio_util::compat::*; +use tokio_unix_tcp::{SocketAddr, Stream}; + #[cfg(feature = "telemetry")] use opentelemetry::{ trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, @@ -61,7 +61,7 @@ pub(crate) struct ServerConn { impl ServerConn { pub(crate) async fn run( netapp: Arc, - socket: TcpStream, + socket: Stream, must_exit: watch::Receiver, ) -> Result<(), Error> { let remote_addr = socket.peer_addr()?; diff --git a/src/test.rs b/src/test.rs index 19759cc..528b6eb 100644 --- a/src/test.rs +++ b/src/test.rs @@ -1,10 +1,11 @@ -use std::net::SocketAddr; use std::sync::Arc; use std::time::Duration; use tokio::select; use tokio::sync::watch; +use tokio_unix_tcp::NamedSocketAddr; + use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; @@ -37,14 +38,20 @@ async fn run_test_inner() { let (pk2, sk2) = ed25519::gen_keypair(); let (pk3, sk3) = ed25519::gen_keypair(); - let addr1: SocketAddr = "127.0.0.1:19991".parse().unwrap(); - let addr2: SocketAddr = "127.0.0.1:19992".parse().unwrap(); - let addr3: SocketAddr = "127.0.0.1:19993".parse().unwrap(); + let addr1: NamedSocketAddr = "127.0.0.1:19991".parse().unwrap(); + let addr2: NamedSocketAddr = "127.0.0.1:19992".parse().unwrap(); + let addr3: NamedSocketAddr = "127.0.0.1:19993".parse().unwrap(); let (stop_tx, stop_rx) = watch::channel(false); - let (thread1, _netapp1, peering1) = - run_netapp(netid.clone(), pk1, sk1, addr1, vec![], stop_rx.clone()); + let (thread1, _netapp1, peering1) = run_netapp( + netid.clone(), + pk1, + sk1, + addr1.clone(), + vec![], + stop_rx.clone(), + ); tokio::time::sleep(Duration::from_secs(2)).await; // Connect second node and check it peers with everyone @@ -52,8 +59,8 @@ async fn run_test_inner() { netid.clone(), pk2, sk2, - addr2, - vec![(pk1, addr1)], + addr2.clone(), + vec![(pk1, addr1.into())], stop_rx.clone(), ); tokio::time::sleep(Duration::from_secs(5)).await; @@ -67,8 +74,14 @@ async fn run_test_inner() { assert_eq!(pl2.len(), 2); // Connect third ndoe and check it peers with everyone - let (thread3, _netapp3, peering3) = - run_netapp(netid, pk3, sk3, addr3, vec![(pk2, addr2)], stop_rx.clone()); + let (thread3, _netapp3, peering3) = run_netapp( + netid, + pk3, + sk3, + addr3, + vec![(pk2, addr2.into())], + stop_rx.clone(), + ); tokio::time::sleep(Duration::from_secs(5)).await; let pl1 = peering1.get_peer_list(); @@ -94,8 +107,8 @@ fn run_netapp( netid: auth::Key, _pk: NodeID, sk: ed25519::SecretKey, - listen_addr: SocketAddr, - bootstrap_peers: Vec<(NodeID, SocketAddr)>, + listen_addr: NamedSocketAddr, + bootstrap_peers: Vec<(NodeID, NamedSocketAddr)>, must_exit: watch::Receiver, ) -> ( tokio::task::JoinHandle<()>, diff --git a/src/util.rs b/src/util.rs index 56230b7..a248c77 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,10 +1,14 @@ -use std::net::SocketAddr; +use std::net; +use std::path::PathBuf; +use std::str::FromStr; use log::info; use serde::Serialize; use tokio::sync::watch; +use tokio_unix_tcp::{NamedSocketAddr, UnixSocketAddr}; + use crate::netapp::*; /// Utility function: encodes any serializable value in MessagePack binary format @@ -56,24 +60,37 @@ pub fn watch_ctrl_c() -> watch::Receiver { } /// Parse a peer's address including public key, written in the format: -/// `@:` -pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> { - let delim = peer.find('@')?; - let (key, ip) = peer.split_at(delim); - let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; - let ip = ip[1..].parse::().ok()?; - Some((pubkey, ip)) -} - -/// Parse and resolve a peer's address including public key, written in the format: -/// `@:` -pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec)> { - use std::net::ToSocketAddrs; - +/// `@:` or +/// `@` for unix domain sockets +pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, NamedSocketAddr)> { let delim = peer.find('@')?; let (key, host) = peer.split_at(delim); let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; - let hosts = host[1..].to_socket_addrs().ok()?.collect::>(); + let host = NamedSocketAddr::from_str(&host[1..]).ok()?; + Some((pubkey, host)) +} + +/// Parse and resolve a peer's address including public key, written in the format: +/// `@:` or +/// `@` for unix domain sockets +pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec)> { + let delim = peer.find('@')?; + let (key, host) = peer.split_at(delim); + let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; + let host = &host[1..]; + let hosts = if UnixSocketAddr::is_pathname(host) { + vec![PathBuf::from_str(host).unwrap().into()] + } else { + use std::net::ToSocketAddrs; + + host.parse::() + .ok()? + .to_socket_addrs() + .ok()? + .map(NamedSocketAddr::Inet) + .collect::>() + }; + if hosts.is_empty() { return None; } @@ -81,14 +98,23 @@ pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec Option<(NodeID, Vec)> { +pub async fn parse_and_resolve_peer_addr_async( + peer: &str, +) -> Option<(NodeID, Vec)> { let delim = peer.find('@')?; let (key, host) = peer.split_at(delim); let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; - let hosts = tokio::net::lookup_host(&host[1..]) - .await - .ok()? - .collect::>(); + let host = &host[1..]; + let hosts = if UnixSocketAddr::is_pathname(host) { + vec![PathBuf::from_str(host).unwrap().into()] + } else { + tokio::net::lookup_host(host) + .await + .ok()? + .map(NamedSocketAddr::Inet) + .collect::>() + }; + if hosts.is_empty() { return None; }