Compare commits

..

No commits in common. "unix-sockets-git-dependency" and "main" have entirely different histories.

12 changed files with 225 additions and 809 deletions

570
Cargo.lock generated
View file

@ -2,21 +2,6 @@
# It is not intended for manual editing. # It is not intended for manual editing.
version = 3 version = 3
[[package]]
name = "addr2line"
version = "0.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8a30b2e23b9e17a9f90641c7ab1549cd9b44f296d3ccbf309d2863cfe398a0cb"
dependencies = [
"gimli",
]
[[package]]
name = "adler"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe"
[[package]] [[package]]
name = "ahash" name = "ahash"
version = "0.7.6" version = "0.7.6"
@ -60,7 +45,7 @@ checksum = "76464446b8bc32758d7e88ee1a804d9914cd9b1cb264c029899680b0be29826f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
] ]
[[package]] [[package]]
@ -71,7 +56,7 @@ checksum = "d9b39be18770d11421cdb1b9947a45dd3f37e93092cbf377614828a319d5fee8"
dependencies = [ dependencies = [
"hermit-abi", "hermit-abi",
"libc", "libc",
"winapi 0.3.9", "winapi",
] ]
[[package]] [[package]]
@ -80,21 +65,6 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa" checksum = "d468802bab17cbc0cc575e9b053f41e72aa36bfa6b7f55e3529ffa43161b97fa"
[[package]]
name = "backtrace"
version = "0.3.69"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2089b7e3f35b9dd2d0ed921ead4f6d318c27680d4a5bd167b3ee120edb105837"
dependencies = [
"addr2line",
"cc",
"cfg-if 1.0.0",
"libc",
"miniz_oxide",
"object",
"rustc-demangle",
]
[[package]] [[package]]
name = "bitflags" name = "bitflags"
version = "1.3.2" version = "1.3.2"
@ -113,16 +83,6 @@ version = "1.4.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]]
name = "bytes"
version = "0.4.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "206fdffcfa2df7cbe15601ef46c813fce0965eb3286db6b56c583b814b51c81c"
dependencies = [
"byteorder",
"iovec",
]
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.2.1" version = "1.2.1"
@ -135,12 +95,6 @@ version = "1.0.73"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11" checksum = "2fff2a6927b3bb87f9595d67196a70493f627687a71d87a0d692242c33f58c11"
[[package]]
name = "cfg-if"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4785bdd1c96b2a846b2bd7cc02e86b6b3dbf14e7e53446c4f54c92a361040822"
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
version = "1.0.0" version = "1.0.0"
@ -159,7 +113,7 @@ dependencies = [
"num-traits", "num-traits",
"time", "time",
"wasm-bindgen", "wasm-bindgen",
"winapi 0.3.9", "winapi",
] ]
[[package]] [[package]]
@ -173,15 +127,6 @@ dependencies = [
"unicode-width", "unicode-width",
] ]
[[package]]
name = "cloudabi"
version = "0.0.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ddfc5b9aa5d4507acaf872de71051dfd0e309860e88966e1051e462a077aac4f"
dependencies = [
"bitflags",
]
[[package]] [[package]]
name = "core-foundation-sys" name = "core-foundation-sys"
version = "0.8.3" version = "0.8.3"
@ -194,19 +139,8 @@ version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521" checksum = "c2dd04ddaf88237dc3b8d8f9a3c1004b506b54b3313403944054d23c0870c521"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if",
"crossbeam-utils 0.8.11", "crossbeam-utils",
]
[[package]]
name = "crossbeam-utils"
version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c3c7c73a2d1e9fc0886a08b93e98eb643461230d5f1925e4036204d5f2e261a8"
dependencies = [
"autocfg",
"cfg-if 0.1.10",
"lazy_static",
] ]
[[package]] [[package]]
@ -215,7 +149,7 @@ version = "0.8.11"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc" checksum = "51887d4adc7b564537b15adcfb307936f8075dfcd5f00dde9a9f1d29383682bc"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if",
"once_cell", "once_cell",
] ]
@ -242,38 +176,10 @@ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"rustversion", "rustversion",
"syn 1.0.99", "syn",
"synstructure", "synstructure",
] ]
[[package]]
name = "fnv"
version = "1.0.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]]
name = "fuchsia-zircon"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.1.31"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678"
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.24" version = "0.3.24"
@ -330,7 +236,7 @@ checksum = "42cd15d1c7456c04dbdf7e88bcd69760d74f3a798d6444e16974b505b0e62f17"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
] ]
[[package]] [[package]]
@ -369,17 +275,11 @@ version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6" checksum = "4eb1a864a501629691edf6c15a593b7a51eebaa1e8468e9ddc623de7c9b58ec6"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if",
"libc", "libc",
"wasi 0.11.0+wasi-snapshot-preview1", "wasi 0.11.0+wasi-snapshot-preview1",
] ]
[[package]]
name = "gimli"
version = "0.28.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6fb8d784f27acf97159b40fc4db5ecd8aa23b9ad5ef69cdd136d3bc80665f0c0"
[[package]] [[package]]
name = "hashbrown" name = "hashbrown"
version = "0.12.3" version = "0.12.3"
@ -430,16 +330,7 @@ dependencies = [
"js-sys", "js-sys",
"once_cell", "once_cell",
"wasm-bindgen", "wasm-bindgen",
"winapi 0.3.9", "winapi",
]
[[package]]
name = "iovec"
version = "0.1.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b2b3ea6ff95e175473f8ffe6a7eb7c00d054240321b84c57051175fe3c1e075e"
dependencies = [
"libc",
] ]
[[package]] [[package]]
@ -451,23 +342,13 @@ dependencies = [
"wasm-bindgen", "wasm-bindgen",
] ]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]] [[package]]
name = "kuska-handshake" name = "kuska-handshake"
version = "0.2.0" version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e33da4b69f23c2ece0b3e729d079cebdc2c0206e493e42f510f500ad81c631d5" checksum = "e33da4b69f23c2ece0b3e729d079cebdc2c0206e493e42f510f500ad81c631d5"
dependencies = [ dependencies = [
"futures 0.3.24", "futures",
"hex", "hex",
"kuska-sodiumoxide", "kuska-sodiumoxide",
"log", "log",
@ -493,9 +374,9 @@ checksum = "e2abad23fbc42b3700f2f279844dc832adb2b2eb069b2df918f455c4e18cc646"
[[package]] [[package]]
name = "libc" name = "libc"
version = "0.2.149" version = "0.2.132"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a08173bc88b7955d1b3145aa561539096c421ac8debde8cbc3612ec635fee29b" checksum = "8371e4e5341c3a96db127eb2465ac681ced4c433e01dd0e938adbef26ba93ba5"
[[package]] [[package]]
name = "libsodium-sys" name = "libsodium-sys"
@ -509,22 +390,13 @@ dependencies = [
"walkdir", "walkdir",
] ]
[[package]]
name = "lock_api"
version = "0.3.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4da24a77a3d8a6d4862d95f72e6fdb9c09a643ecdb402d754004a557f2bec75"
dependencies = [
"scopeguard",
]
[[package]] [[package]]
name = "log" name = "log"
version = "0.4.17" version = "0.4.17"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if",
] ]
[[package]] [[package]]
@ -536,51 +408,17 @@ dependencies = [
"hashbrown", "hashbrown",
] ]
[[package]]
name = "maybe-uninit"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00"
[[package]] [[package]]
name = "memchr" name = "memchr"
version = "2.5.0" version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d" checksum = "2dffe52ecf27772e601905b7522cb4ef790d2cc203488bbd0e2fe85fcb74566d"
[[package]]
name = "miniz_oxide"
version = "0.7.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e7810e0be55b428ada41041c41f32c9f1a42817901b4ccf45fa3d4b6561e74c7"
dependencies = [
"adler",
]
[[package]] [[package]]
name = "mio" name = "mio"
version = "0.6.23" version = "0.8.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" checksum = "57ee1c23c7c63b0c9250c339ffdc69255f110b298b901b9f6c82547b7b87caaf"
dependencies = [
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio"
version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "927a765cd3fc26206e66b296465fa9d3e5ab003e651c1b3c060e7956d96b19d2"
dependencies = [ dependencies = [
"libc", "libc",
"log", "log",
@ -588,52 +426,18 @@ dependencies = [
"windows-sys", "windows-sys",
] ]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio 0.6.23",
]
[[package]]
name = "miow"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]]
name = "net2"
version = "0.2.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b13b648036a2339d06de780866fbdfda0dde886de7b3af2ddeba8b14f4ee34ac"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.10.0" version = "0.5.4"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 1.2.1", "bytes",
"cfg-if 1.0.0", "cfg-if",
"chrono", "chrono",
"env_logger", "env_logger",
"err-derive", "err-derive",
"futures 0.3.24", "futures",
"hex", "hex",
"kuska-handshake", "kuska-handshake",
"kuska-sodiumoxide", "kuska-sodiumoxide",
@ -648,7 +452,6 @@ dependencies = [
"structopt", "structopt",
"tokio", "tokio",
"tokio-stream", "tokio-stream",
"tokio-unix-tcp",
"tokio-util", "tokio-util",
] ]
@ -681,15 +484,6 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "object"
version = "0.32.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9cf5f9dd3933bd50a9e1f149ec995f39ae2c496d31fd772c1fd45ebc27e902b0"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "once_cell" name = "once_cell"
version = "1.13.1" version = "1.13.1"
@ -725,32 +519,6 @@ dependencies = [
"opentelemetry", "opentelemetry",
] ]
[[package]]
name = "parking_lot"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f842b1982eb6c2fe34036a4fbfb06dd185a3f5c8edfaacdf7d1ea10b07de6252"
dependencies = [
"lock_api",
"parking_lot_core",
"rustc_version",
]
[[package]]
name = "parking_lot_core"
version = "0.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bda66b810a62be75176a80873726630147a5ca780cd33921e0b5709033e66b0a"
dependencies = [
"cfg-if 0.1.10",
"cloudabi",
"libc",
"redox_syscall",
"rustc_version",
"smallvec",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "paste" name = "paste"
version = "1.0.9" version = "1.0.9"
@ -780,14 +548,14 @@ checksum = "069bdb1e05adc7a8990dce9cc75370895fbe4e3d58b9b73bf1aee56359344a55"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
] ]
[[package]] [[package]]
name = "pin-project-lite" name = "pin-project-lite"
version = "0.2.13" version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" checksum = "e0a7ae3ac2f1173085d398531c705756c94a4c56843785df85a60c1a0afac116"
[[package]] [[package]]
name = "pin-utils" name = "pin-utils"
@ -816,7 +584,7 @@ dependencies = [
"proc-macro-error-attr", "proc-macro-error-attr",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
"version_check", "version_check",
] ]
@ -833,18 +601,18 @@ dependencies = [
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "1.0.69" version = "1.0.43"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "134c189feb4956b20f6f547d2cf727d4c0fe06722b20a0eec87ed445a97f92da" checksum = "0a2ca2c61bc9f3d74d2886294ab7b9853abd9c1ad903a3ac7815c58989bb7bab"
dependencies = [ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]] [[package]]
name = "quote" name = "quote"
version = "1.0.33" version = "1.0.21"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5267fca4496028628a95160fc423a33e8b2e6af8a5302579e322e4b520293cae" checksum = "bbe448f377a7d6961e30f5955f9b8d106c3f5e449d493ee1b125c1d43c2b5179"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
] ]
@ -879,12 +647,6 @@ dependencies = [
"getrandom", "getrandom",
] ]
[[package]]
name = "redox_syscall"
version = "0.1.57"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce"
[[package]] [[package]]
name = "regex" name = "regex"
version = "1.6.0" version = "1.6.0"
@ -915,30 +677,15 @@ dependencies = [
[[package]] [[package]]
name = "rmp-serde" name = "rmp-serde"
version = "1.1.2" version = "0.15.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bffea85eea980d8a74453e5d02a8d93028f3c34725de143085a844ebe953258a" checksum = "723ecff9ad04f4ad92fe1c8ca6c20d2196d9286e9c60727c4cb5511629260e9d"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"rmp", "rmp",
"serde", "serde",
] ]
[[package]]
name = "rustc-demangle"
version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustc_version"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a"
dependencies = [
"semver",
]
[[package]] [[package]]
name = "rustversion" name = "rustversion"
version = "1.0.9" version = "1.0.9"
@ -954,45 +701,24 @@ dependencies = [
"winapi-util", "winapi-util",
] ]
[[package]]
name = "scopeguard"
version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
[[package]]
name = "semver"
version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403"
dependencies = [
"semver-parser",
]
[[package]]
name = "semver-parser"
version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3"
[[package]] [[package]]
name = "serde" name = "serde"
version = "1.0.189" version = "1.0.144"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e422a44e74ad4001bdc8eede9a4570ab52f71190e9c076d14369f38b9200537" checksum = "0f747710de3dcd43b88c9168773254e809d8ddbdf9653b84e2554ab219f17860"
dependencies = [ dependencies = [
"serde_derive", "serde_derive",
] ]
[[package]] [[package]]
name = "serde_derive" name = "serde_derive"
version = "1.0.189" version = "1.0.144"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e48d1f918009ce3145511378cf68d613e3b3d9137d67272562080d68a2b32d5" checksum = "94ed3a816fb1d101812f83e789f888322c34e291f894f19590dc310963e87a00"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn",
] ]
[[package]] [[package]]
@ -1013,23 +739,14 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "smallvec"
version = "0.6.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0"
dependencies = [
"maybe-uninit",
]
[[package]] [[package]]
name = "socket2" name = "socket2"
version = "0.5.4" version = "0.4.7"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4031e820eb552adee9295814c0ced9e5cf38ddf1e8b7d566d6de8e2538ea989e" checksum = "02e2d2db9033d13a1567121ddd7a095ee144db4e1ca1b1bda3419bc0da294ebd"
dependencies = [ dependencies = [
"libc", "libc",
"windows-sys", "winapi",
] ]
[[package]] [[package]]
@ -1053,7 +770,7 @@ dependencies = [
"proc-macro-error", "proc-macro-error",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
] ]
[[package]] [[package]]
@ -1067,17 +784,6 @@ dependencies = [
"unicode-ident", "unicode-ident",
] ]
[[package]]
name = "syn"
version = "2.0.38"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e96b79aaa137db8f61e26363a0c9b47d8b4ec75da28b7d1d614c2303e232408b"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]] [[package]]
name = "synstructure" name = "synstructure"
version = "0.12.6" version = "0.12.6"
@ -1086,7 +792,7 @@ checksum = "f36bdaa60a83aca3921b5259d5400cbf5e90fc51931376a9bd4a0eb79aa7210f"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
"unicode-xid", "unicode-xid",
] ]
@ -1125,7 +831,7 @@ checksum = "c251e90f708e16c49a16f4917dc2131e75222b72edfa9cb7f7c58ae56aae0c09"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
] ]
[[package]] [[package]]
@ -1136,87 +842,38 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [ dependencies = [
"libc", "libc",
"wasi 0.10.0+wasi-snapshot-preview1", "wasi 0.10.0+wasi-snapshot-preview1",
"winapi 0.3.9", "winapi",
] ]
[[package]] [[package]]
name = "tokio" name = "tokio"
version = "1.33.0" version = "1.21.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f38200e3ef7995e5ef13baec2f432a6da0aa9ac495b2c0e8f3b7eec2c92d653" checksum = "89797afd69d206ccd11fb0ea560a44bbb87731d020670e79416d442919257d42"
dependencies = [ dependencies = [
"backtrace", "autocfg",
"bytes 1.2.1", "bytes",
"libc", "libc",
"mio 0.8.8", "memchr",
"mio",
"num_cpus", "num_cpus",
"once_cell",
"pin-project-lite", "pin-project-lite",
"signal-hook-registry", "signal-hook-registry",
"socket2", "socket2",
"tokio-macros", "tokio-macros",
"windows-sys", "winapi",
]
[[package]]
name = "tokio-codec"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25b2998660ba0e70d18684de5d06b70b70a3a747469af9dea7618cc59e75976b"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"tokio-io",
]
[[package]]
name = "tokio-executor"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb2d1b8f4548dbf5e1f7818512e9c406860678f29c300cdf0ebac72d1a3a1671"
dependencies = [
"crossbeam-utils 0.7.2",
"futures 0.1.31",
]
[[package]]
name = "tokio-io"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "57fc868aae093479e3131e3d165c93b1c7474109d13c90ec0dda2a1bbfff0674"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"log",
] ]
[[package]] [[package]]
name = "tokio-macros" name = "tokio-macros"
version = "2.1.0" version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" checksum = "9724f9a975fb987ef7a3cd9be0350edcbe130698af5b8f7a631e23d42d052484"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 2.0.38", "syn",
]
[[package]]
name = "tokio-reactor"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "09bc590ec4ba8ba87652da2068d150dcada2cfa2e07faae270a5e0409aa51351"
dependencies = [
"crossbeam-utils 0.7.2",
"futures 0.1.31",
"lazy_static",
"log",
"mio 0.6.23",
"num_cpus",
"parking_lot",
"slab",
"tokio-executor",
"tokio-io",
"tokio-sync",
] ]
[[package]] [[package]]
@ -1230,53 +887,13 @@ dependencies = [
"tokio", "tokio",
] ]
[[package]]
name = "tokio-sync"
version = "0.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edfe50152bc8164fcc456dab7891fa9bf8beaf01c5ee7e1dd43a397c3cf87dee"
dependencies = [
"fnv",
"futures 0.1.31",
]
[[package]]
name = "tokio-uds"
version = "0.2.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ab57a4ac4111c8c9dbcf70779f6fc8bc35ae4b2454809febac840ad19bd7e4e0"
dependencies = [
"bytes 0.4.12",
"futures 0.1.31",
"iovec",
"libc",
"log",
"mio 0.6.23",
"mio-uds",
"tokio-codec",
"tokio-io",
"tokio-reactor",
]
[[package]]
name = "tokio-unix-tcp"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "25694660e312eccfaf8cdc29c7d1d9236a81ec14b968a77c5511940a142c30e2"
dependencies = [
"mio 0.8.8",
"serde",
"tokio",
"tokio-uds",
]
[[package]] [[package]]
name = "tokio-util" name = "tokio-util"
version = "0.7.3" version = "0.7.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45" checksum = "cc463cd8deddc3770d20f9852143d50bf6094e640b485cb2e189a2099085ff45"
dependencies = [ dependencies = [
"bytes 1.2.1", "bytes",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
@ -1321,7 +938,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56" checksum = "808cf2735cd4b6866113f648b791c6adc5714537bc222d9347bb203386ffda56"
dependencies = [ dependencies = [
"same-file", "same-file",
"winapi 0.3.9", "winapi",
"winapi-util", "winapi-util",
] ]
@ -1343,7 +960,7 @@ version = "0.2.82"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d" checksum = "fc7652e3f6c4706c8d9cd54832c4a4ccb9b5336e2c3bd154d5cccfbf1c1f5f7d"
dependencies = [ dependencies = [
"cfg-if 1.0.0", "cfg-if",
"wasm-bindgen-macro", "wasm-bindgen-macro",
] ]
@ -1358,7 +975,7 @@ dependencies = [
"once_cell", "once_cell",
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1380,7 +997,7 @@ checksum = "5be8e654bdd9b79216c2929ab90721aa82faf65c48cdf08bdc4e7f51357b80da"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
"syn 1.0.99", "syn",
"wasm-bindgen-backend", "wasm-bindgen-backend",
"wasm-bindgen-shared", "wasm-bindgen-shared",
] ]
@ -1391,12 +1008,6 @@ version = "0.2.82"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6598dd0bd3c7d51095ff6531a5b23e02acdc81804e30d8f07afb77b7215a140a" checksum = "6598dd0bd3c7d51095ff6531a5b23e02acdc81804e30d8f07afb77b7215a140a"
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@ -1407,12 +1018,6 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu", "winapi-x86_64-pc-windows-gnu",
] ]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]] [[package]]
name = "winapi-i686-pc-windows-gnu" name = "winapi-i686-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"
@ -1425,7 +1030,7 @@ version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178" checksum = "70ec6ce85bb158151cae5e5c87f95a8e97d2c0c4b001223f33a334e3ce5de178"
dependencies = [ dependencies = [
"winapi 0.3.9", "winapi",
] ]
[[package]] [[package]]
@ -1436,76 +1041,43 @@ checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "677d2418bec65e3338edb076e806bc1ec15693c5d0104683f2efe857f61056a9" checksum = "ea04155a16a59f9eab786fe12a4a450e75cdb175f9e0d80da1e17db09f55b8d2"
dependencies = [ dependencies = [
"windows-targets",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9a2fa6e2155d7247be68c096456083145c183cbbbc2764150dda45a87197940c"
dependencies = [
"windows_aarch64_gnullvm",
"windows_aarch64_msvc", "windows_aarch64_msvc",
"windows_i686_gnu", "windows_i686_gnu",
"windows_i686_msvc", "windows_i686_msvc",
"windows_x86_64_gnu", "windows_x86_64_gnu",
"windows_x86_64_gnullvm",
"windows_x86_64_msvc", "windows_x86_64_msvc",
] ]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b38e32f0abccf9987a4e3079dfb67dcd799fb61361e53e2882c3cbaf0d905d8"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.5" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dc35310971f3b2dbbf3f0690a219f40e2d9afcf64f9ab7cc1be722937c26b4bc" checksum = "9bb8c3fd39ade2d67e9874ac4f3db21f0d710bee00fe7cab16949ec184eeaa47"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.5" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a75915e7def60c94dcef72200b9a8e58e5091744960da64ec734a6c6e9b3743e" checksum = "180e6ccf01daf4c426b846dfc66db1fc518f074baa793aa7d9b9aaeffad6a3b6"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.5" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f55c233f70c4b27f66c523580f78f1004e8b5a8b659e05a4eb49d4166cca406" checksum = "e2e7917148b2812d1eeafaeb22a97e4813dfa60a3f8f78ebe204bcc88f12f024"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.5" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53d40abd2583d23e4718fddf1ebec84dbff8381c07cae67ff7768bbf19c6718e" checksum = "4dcd171b8776c41b97521e5da127a2d86ad280114807d0b2ab1e462bc764d9e1"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0b7b52767868a23d5bab768e390dc5f5c55825b6d30b86c844ff2dc7414044cc"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.36.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed94fce61571a4006852b7389a063ab983c02eb1bb37b47f8272ce92d06d9538" checksum = "c811ca4a8c853ef420abd8592ba53ddbbac90410fab6903b3e79972a631f7680"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]

View file

@ -1,6 +1,6 @@
[package] [package]
name = "netapp" name = "netapp"
version = "0.10.0" version = "0.5.4"
authors = ["Alex Auvolat <alex@adnab.me>"] authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018" edition = "2018"
license-file = "LICENSE" license-file = "LICENSE"
@ -27,7 +27,7 @@ tokio-util = { version = "0.7", default-features = false, features = ["compat",
tokio-stream = "0.1.7" tokio-stream = "0.1.7"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }
rmp-serde = "1.1" rmp-serde = "0.15"
hex = "0.4.2" hex = "0.4.2"
rand = { version = "0.8" } rand = { version = "0.8" }
@ -46,8 +46,6 @@ kuska-handshake = { version = "0.2.0", features = ["default", "async_std"] }
opentelemetry = { version = "0.17", optional = true } opentelemetry = { version = "0.17", optional = true }
opentelemetry-contrib = { version = "0.9", optional = true } opentelemetry-contrib = { version = "0.9", optional = true }
tokio-unix-tcp = { version = "0.2.0", features = ["serde"] }
[dev-dependencies] [dev-dependencies]
env_logger = "0.9" env_logger = "0.9"
structopt = { version = "0.3", default-features = false } structopt = { version = "0.3", default-features = false }

View file

@ -9,12 +9,10 @@ use serde::{Deserialize, Serialize};
use structopt::StructOpt; use structopt::StructOpt;
use sodiumoxide::crypto::auth; use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::{ed25519, PublicKey}; use sodiumoxide::crypto::sign::ed25519;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_unix_tcp::NamedSocketAddr;
use netapp::endpoint::*; use netapp::endpoint::*;
use netapp::message::*; use netapp::message::*;
use netapp::peering::basalt::*; use netapp::peering::basalt::*;
@ -98,13 +96,9 @@ async fn main() {
let netapp = NetApp::new(0u64, netid, privkey); let netapp = NetApp::new(0u64, netid, privkey);
let mut bootstrap_peers: Vec<(PublicKey, NamedSocketAddr)> = vec![]; let mut bootstrap_peers = vec![];
for peer in opt.bootstrap_peers.iter() { for peer in opt.bootstrap_peers.iter() {
bootstrap_peers.push( bootstrap_peers.push(parse_peer_addr(peer).expect("Invalid peer address"));
parse_peer_addr(peer)
.map(|(node_id, socket_bind_addr)| (node_id, socket_bind_addr.into()))
.expect("Invalid peer address"),
);
} }
let basalt_params = BasaltParams { let basalt_params = BasaltParams {

View file

@ -1,5 +1,5 @@
use std::io::Write; use std::io::Write;
use std::net; use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
@ -9,13 +9,10 @@ use futures::{stream, StreamExt};
use log::*; use log::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use structopt::StructOpt; use structopt::StructOpt;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_unix_tcp::NamedSocketAddr;
use sodiumoxide::crypto::auth; use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::{ed25519, PublicKey}; use sodiumoxide::crypto::sign::ed25519;
use netapp::endpoint::*; use netapp::endpoint::*;
use netapp::message::*; use netapp::message::*;
@ -78,25 +75,21 @@ async fn main() {
info!("Node public key: {}", hex::encode(privkey.public_key())); info!("Node public key: {}", hex::encode(privkey.public_key()));
let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
let listen_addr: net::SocketAddr = opt.listen_addr.parse().unwrap(); let listen_addr: SocketAddr = opt.listen_addr.parse().unwrap();
info!("Node public address: {:?}", public_addr); info!("Node public address: {:?}", public_addr);
info!("Node listen address: {}", listen_addr); info!("Node listen address: {}", listen_addr);
let netapp = NetApp::new(0u64, netid.clone(), privkey.clone()); let netapp = NetApp::new(0u64, netid.clone(), privkey.clone());
let mut bootstrap_peers: Vec<(PublicKey, NamedSocketAddr)> = vec![]; let mut bootstrap_peers = vec![];
for peer in opt.bootstrap_peers.iter() { for peer in opt.bootstrap_peers.iter() {
bootstrap_peers.push( bootstrap_peers.push(parse_peer_addr(peer).expect("Invalid peer address"));
parse_peer_addr(peer)
.map(|(node_id, socket_bind_addr)| (node_id, socket_bind_addr.into()))
.expect("Invalid peer address"),
);
} }
let peering = FullMeshPeeringStrategy::new( let peering = FullMeshPeeringStrategy::new(
netapp.clone(), netapp.clone(),
bootstrap_peers, bootstrap_peers,
public_addr.map(|a| net::SocketAddr::new(a, listen_addr.port()).into()), public_addr.map(|a| SocketAddr::new(a, listen_addr.port())),
); );
info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}", info!("Add more peers to this mesh by running: fullmesh -n {} -l 127.0.0.1:$((1000 + $RANDOM)) -b {}@{}",
@ -115,7 +108,7 @@ async fn main() {
tokio::join!( tokio::join!(
example.exchange_loop(watch_cancel.clone()), example.exchange_loop(watch_cancel.clone()),
netapp.listen(listen_addr.into(), public_addr, watch_cancel.clone()), netapp.listen(listen_addr, public_addr, watch_cancel.clone()),
peering.run(watch_cancel), peering.run(watch_cancel),
); );
} }

View file

@ -1,4 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr;
use std::pin::Pin; use std::pin::Pin;
use std::sync::atomic::{self, AtomicU32}; use std::sync::atomic::{self, AtomicU32};
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
@ -12,13 +13,11 @@ use log::{debug, error, trace};
use futures::io::AsyncReadExt; use futures::io::AsyncReadExt;
use futures::Stream; use futures::Stream;
use kuska_handshake::async_std::{handshake_client, BoxStream}; use kuska_handshake::async_std::{handshake_client, BoxStream};
use tokio::net::TcpStream;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::compat::*; use tokio_util::compat::*;
use tokio_unix_tcp::SocketAddr;
use tokio_unix_tcp::Stream as SocketStream;
#[cfg(feature = "telemetry")] #[cfg(feature = "telemetry")]
use opentelemetry::{ use opentelemetry::{
trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer}, trace::{FutureExt, Span, SpanKind, TraceContextExt, Tracer},
@ -48,7 +47,7 @@ pub(crate) struct ClientConn {
impl ClientConn { impl ClientConn {
pub(crate) async fn init( pub(crate) async fn init(
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
socket: SocketStream, socket: TcpStream,
peer_id: NodeID, peer_id: NodeID,
) -> Result<(), Error> { ) -> Result<(), Error> {
let remote_addr = socket.peer_addr()?; let remote_addr = socket.peer_addr()?;

View file

@ -155,7 +155,7 @@ impl<M: Message> Req<M> {
} }
pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> { pub(crate) fn from_enc(enc: ReqEnc) -> Result<Self, rmp_serde::decode::Error> {
let msg = rmp_serde::decode::from_slice(&enc.msg)?; let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
Ok(Req { Ok(Req {
msg: Arc::new(msg), msg: Arc::new(msg),
msg_ser: Some(enc.msg), msg_ser: Some(enc.msg),
@ -316,7 +316,7 @@ impl<M: Message> Resp<M> {
} }
pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> { pub(crate) fn from_enc(enc: RespEnc) -> Result<Self, Error> {
let msg = rmp_serde::decode::from_slice(&enc.msg)?; let msg = rmp_serde::decode::from_read_ref(&enc.msg)?;
Ok(Self { Ok(Self {
_phantom: Default::default(), _phantom: Default::default(),
msg, msg,

View file

@ -1,6 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::{self, IpAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::PathBuf;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
@ -14,11 +13,10 @@ use sodiumoxide::crypto::sign::ed25519;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::net::{TcpListener, TcpStream};
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tokio_unix_tcp::{Listener, NamedSocketAddr, SocketAddr, Stream, UnixSocketAddr};
use crate::client::*; use crate::client::*;
use crate::endpoint::*; use crate::endpoint::*;
use crate::error::*; use crate::error::*;
@ -41,19 +39,16 @@ pub(crate) type VersionTag = [u8; 16];
pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005 pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005
#[derive(Serialize, Deserialize, Debug)] #[derive(Serialize, Deserialize, Debug)]
pub(crate) enum HelloMessage { pub(crate) struct HelloMessage {
Tcp { pub server_addr: Option<IpAddr>,
server_addr: Option<IpAddr>, pub server_port: u16,
server_port: u16,
},
Unix(PathBuf),
} }
impl Message for HelloMessage { impl Message for HelloMessage {
type Response = (); type Response = ();
} }
type OnConnectHandler = Box<dyn Fn(NodeID, NamedSocketAddr, bool) + Send + Sync>; type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>; type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
/// NetApp is the main class that handles incoming and outgoing connections. /// NetApp is the main class that handles incoming and outgoing connections.
@ -86,12 +81,9 @@ pub struct NetApp {
on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>, on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
} }
enum ListenParams { struct ListenParams {
Tcp { listen_addr: SocketAddr,
listen_addr: net::SocketAddr, public_addr: Option<IpAddr>,
public_addr: Option<IpAddr>,
},
Unix(PathBuf),
} }
impl NetApp { impl NetApp {
@ -137,7 +129,7 @@ impl NetApp {
/// as the peering strategy will need to set this itself. /// as the peering strategy will need to set this itself.
pub fn on_connected<F>(&self, handler: F) pub fn on_connected<F>(&self, handler: F)
where where
F: Fn(NodeID, NamedSocketAddr, bool) + Sized + Send + Sync + 'static, F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
{ {
self.on_connected_handler self.on_connected_handler
.store(Some(Arc::new(Box::new(handler)))); .store(Some(Arc::new(Box::new(handler))));
@ -187,18 +179,14 @@ impl NetApp {
/// If this is not called, the NetApp instance remains a passive client. /// If this is not called, the NetApp instance remains a passive client.
pub async fn listen( pub async fn listen(
self: Arc<Self>, self: Arc<Self>,
listen_addr: NamedSocketAddr, listen_addr: SocketAddr,
public_addr: Option<IpAddr>, public_addr: Option<IpAddr>,
mut must_exit: watch::Receiver<bool>, mut must_exit: watch::Receiver<bool>,
) { ) {
let listen_params = match &listen_addr { let listen_params = ListenParams {
NamedSocketAddr::Inet(addr) => ListenParams::Tcp { listen_addr,
listen_addr: *addr, public_addr,
public_addr,
},
NamedSocketAddr::Unix(path) => ListenParams::Unix(path.to_path_buf()),
}; };
if self if self
.listen_params .listen_params
.swap(Some(Arc::new(listen_params))) .swap(Some(Arc::new(listen_params)))
@ -207,9 +195,7 @@ impl NetApp {
error!("Trying to listen on NetApp but we're already listening!"); error!("Trying to listen on NetApp but we're already listening!");
} }
let listener = Listener::bind_and_prepare_unix(&listen_addr, true, None) let listener = TcpListener::bind(listen_addr).await.unwrap();
.await
.unwrap();
info!("Listening on {}", listen_addr); info!("Listening on {}", listen_addr);
let (conn_in, mut conn_out) = mpsc::unbounded_channel(); let (conn_in, mut conn_out) = mpsc::unbounded_channel();
@ -290,22 +276,18 @@ impl NetApp {
self.on_disconnected_handler.store(None); self.on_disconnected_handler.store(None);
} }
/// Attempt to connect to a peer, given by its ip:port / unix socket path /// Attempt to connect to a peer, given by its ip:port and its public key.
/// and its public key. The public key will be checked during the secret /// The public key will be checked during the secret handshake process.
/// handshake process. This function returns once the connection has been /// This function returns once the connection has been established and a
/// established and a successful handshake was made. At this point we can /// successfull handshake was made. At this point we can send messages to
/// send messages to the other node with `Netapp::request` /// the other node with `Netapp::request`
pub async fn try_connect( pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
self: Arc<Self>,
addr: NamedSocketAddr,
id: NodeID,
) -> Result<(), Error> {
// Don't connect to ourself, we don't care // Don't connect to ourself, we don't care
// but pretend we did // but pretend we did
if id == self.id { if id == self.id {
tokio::spawn(async move { tokio::spawn(async move {
if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(h) = self.on_connected_handler.load().as_ref() {
h(id, addr, false); h(id, ip, false);
} }
}); });
return Ok(()); return Ok(());
@ -316,8 +298,8 @@ impl NetApp {
return Ok(()); return Ok(());
} }
let socket = Stream::connect(&addr).await?; let socket = TcpStream::connect(ip).await?;
info!("Connected to {}, negotiating handshake...", addr); info!("Connected to {}, negotiating handshake...", ip);
ClientConn::init(self, socket, id).await?; ClientConn::init(self, socket, id).await?;
Ok(()) Ok(())
} }
@ -410,29 +392,23 @@ impl NetApp {
} }
if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(h) = self.on_connected_handler.load().as_ref() {
h( h(conn.peer_id, conn.remote_addr, false);
conn.peer_id,
conn.remote_addr.clone().to_named_socket_addr().unwrap(),
false,
);
} }
if let Some(lp) = self.listen_params.load_full() { if let Some(lp) = self.listen_params.load_full() {
let hello_message: HelloMessage = match lp.as_ref() { let server_addr = lp.public_addr;
ListenParams::Tcp { let server_port = lp.listen_addr.port();
listen_addr,
public_addr,
} => HelloMessage::Tcp {
server_addr: *public_addr,
server_port: listen_addr.port(),
},
ListenParams::Unix(listen_path) => HelloMessage::Unix(listen_path.clone()),
};
let hello_endpoint = self.hello_endpoint.load_full().unwrap(); let hello_endpoint = self.hello_endpoint.load_full().unwrap();
tokio::spawn(async move { tokio::spawn(async move {
hello_endpoint hello_endpoint
.call(&conn.peer_id, hello_message, PRIO_NORMAL) .call(
&conn.peer_id,
HelloMessage {
server_addr,
server_port,
},
PRIO_NORMAL,
)
.await .await
.map(|_| ()) .map(|_| ())
.log_err("Sending hello message"); .log_err("Sending hello message");
@ -467,64 +443,9 @@ impl EndpointHandler<HelloMessage> for NetApp {
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);
if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(h) = self.on_connected_handler.load().as_ref() {
if let Some(c) = self.server_conns.read().unwrap().get(&from) { if let Some(c) = self.server_conns.read().unwrap().get(&from) {
let remote_addr = match (msg, c.remote_addr.clone()) { let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
( let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
HelloMessage::Tcp { h(from, remote_addr, true);
server_addr,
server_port,
},
SocketAddr::Inet(remote_addr),
) => {
let remote_ip = server_addr.unwrap_or_else(|| remote_addr.ip());
Some(NamedSocketAddr::Inet(net::SocketAddr::new(
remote_ip,
*server_port,
)))
}
(
HelloMessage::Tcp {
server_addr: Some(server_addr),
server_port,
},
SocketAddr::Unix(_),
) => Some(NamedSocketAddr::Inet(net::SocketAddr::new(
*server_addr,
*server_port,
))),
(
HelloMessage::Tcp {
server_addr: None,
server_port: _,
},
SocketAddr::Unix(UnixSocketAddr::Pathname(pathname)),
) => Some(NamedSocketAddr::Unix(pathname)),
(
HelloMessage::Tcp {
server_addr: None,
server_port: _,
},
SocketAddr::Unix(UnixSocketAddr::AbstractOrUnnamed),
) => {
error!("Remote client connected to us via a unix socket and is listening on TCP without a public address specified.");
error!("Their peer address is an unnamed unix socket, as such we have no way of connecting back to them. Disconnecting.");
None
}
(HelloMessage::Unix(server_path), _) => {
Some(NamedSocketAddr::Unix(server_path.clone()))
}
};
if let Some(remote_addr) = remote_addr {
h(from, remote_addr, true);
} else {
self.disconnect(&from);
}
} }
} }
} }

View file

@ -1,5 +1,5 @@
use std::collections::HashSet; use std::collections::HashSet;
use std::net; use std::net::SocketAddr;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
@ -13,8 +13,6 @@ use sodiumoxide::crypto::hash;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_unix_tcp::NamedSocketAddr;
use crate::endpoint::*; use crate::endpoint::*;
use crate::message::*; use crate::message::*;
use crate::netapp::*; use crate::netapp::*;
@ -42,10 +40,10 @@ impl Message for PushMessage {
type Seed = [u8; 32]; type Seed = [u8; 32];
#[derive(Hash, Clone, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] #[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
struct Peer { struct Peer {
id: NodeID, id: NodeID,
addr: NamedSocketAddr, addr: SocketAddr,
} }
type Cost = [u8; 40]; type Cost = [u8; 40];
@ -59,28 +57,24 @@ impl Peer {
let mut cost = [0u8; 40]; let mut cost = [0u8; 40];
match self.addr { match self.addr {
NamedSocketAddr::Inet(addr) => match addr { SocketAddr::V4(v4addr) => {
net::SocketAddr::V4(v4addr) => { let v4ip = v4addr.ip().octets();
let v4ip = v4addr.ip().octets();
for i in 0..4 { for i in 0..4 {
let mut h = hasher; let mut h = hasher;
h.update(&v4ip[..i + 1]); h.update(&v4ip[..i + 1]);
cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
}
} }
net::SocketAddr::V6(v6addr) => { }
let v6ip = v6addr.ip().octets(); SocketAddr::V6(v6addr) => {
let v6ip = v6addr.ip().octets();
for i in 0..4 { for i in 0..4 {
let mut h = hasher; let mut h = hasher;
h.update(&v6ip[..i + 2]); h.update(&v6ip[..i + 2]);
cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]); cost[i * 8..(i + 1) * 8].copy_from_slice(&h.finalize()[..8]);
}
} }
}, }
// FIXME: What should the cost calculation be here?
NamedSocketAddr::Unix(_) => {}
} }
{ {
@ -100,10 +94,7 @@ struct BasaltSlot {
impl BasaltSlot { impl BasaltSlot {
fn cost(&self) -> Cost { fn cost(&self) -> Cost {
self.peer self.peer.map(|p| p.cost(&self.seed)).unwrap_or(MAX_COST)
.clone()
.map(|p| p.cost(&self.seed))
.unwrap_or(MAX_COST)
} }
} }
@ -126,7 +117,7 @@ impl BasaltView {
fn current_peers(&self) -> HashSet<Peer> { fn current_peers(&self) -> HashSet<Peer> {
self.slots self.slots
.iter() .iter()
.filter_map(|s| s.peer.clone()) .filter_map(|s| s.peer)
.collect::<HashSet<_>>() .collect::<HashSet<_>>()
} }
fn current_peers_vec(&self) -> Vec<Peer> { fn current_peers_vec(&self) -> Vec<Peer> {
@ -148,7 +139,7 @@ impl BasaltView {
let mut rng = thread_rng(); let mut rng = thread_rng();
for _i in 0..count { for _i in 0..count {
let idx = rng.gen_range(0..possibles.len()); let idx = rng.gen_range(0..possibles.len());
ret.push(self.slots[possibles[idx]].peer.clone().unwrap()); ret.push(self.slots[possibles[idx]].peer.unwrap());
} }
ret ret
} }
@ -167,7 +158,7 @@ impl BasaltView {
peer.addr, peer.addr,
hex::encode(peer_cost) hex::encode(peer_cost)
); );
self.slots[i].peer = Some(peer.clone()); self.slots[i].peer = Some(*peer);
slot_cost = peer_cost; slot_cost = peer_cost;
} }
} }
@ -181,7 +172,7 @@ impl BasaltView {
fn disconnected(&mut self, id: NodeID) { fn disconnected(&mut self, id: NodeID) {
let mut cleared_slots = vec![]; let mut cleared_slots = vec![];
for i in 0..self.slots.len() { for i in 0..self.slots.len() {
if let Some(p) = self.slots[i].peer.clone() { if let Some(p) = self.slots[i].peer {
if p.id == id { if p.id == id {
self.slots[i].peer = None; self.slots[i].peer = None;
cleared_slots.push(i); cleared_slots.push(i);
@ -213,7 +204,7 @@ impl BasaltView {
let peer_cost = peer.cost(&self.slots[i].seed); let peer_cost = peer.cost(&self.slots[i].seed);
if peer_cost < min_cost { if peer_cost < min_cost {
min_cost = peer_cost; min_cost = peer_cost;
min_peer = Some(peer.clone()); min_peer = Some(*peer);
} }
} }
if let Some(p) = min_peer { if let Some(p) = min_peer {
@ -260,14 +251,14 @@ pub struct Basalt {
impl Basalt { impl Basalt {
pub fn new( pub fn new(
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
bootstrap_list: Vec<(NodeID, NamedSocketAddr)>, bootstrap_list: Vec<(NodeID, SocketAddr)>,
param: BasaltParams, param: BasaltParams,
) -> Arc<Self> { ) -> Arc<Self> {
let bootstrap_peers = bootstrap_list let bootstrap_peers = bootstrap_list
.iter() .iter()
.map(|(id, addr)| Peer { .map(|(id, addr)| Peer {
id: *id, id: *id,
addr: addr.clone(), addr: *addr,
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
@ -289,11 +280,9 @@ impl Basalt {
basalt.push_endpoint.set_handler(basalt.clone()); basalt.push_endpoint.set_handler(basalt.clone());
let basalt2 = basalt.clone(); let basalt2 = basalt.clone();
netapp.on_connected( netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
move |id: NodeID, addr: NamedSocketAddr, is_incoming: bool| { basalt2.on_connected(id, addr, is_incoming);
basalt2.on_connected(id, addr, is_incoming); });
},
);
let basalt2 = basalt.clone(); let basalt2 = basalt.clone();
netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
@ -318,7 +307,7 @@ impl Basalt {
pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) { pub async fn run(self: Arc<Self>, must_exit: watch::Receiver<bool>) {
for peer in self.bootstrap_peers.iter() { for peer in self.bootstrap_peers.iter() {
tokio::spawn(self.clone().try_connect(peer.clone())); tokio::spawn(self.clone().try_connect(*peer));
} }
tokio::join!( tokio::join!(
@ -397,7 +386,7 @@ impl Basalt {
let mut to_retry_maybe = self.bootstrap_peers.clone(); let mut to_retry_maybe = self.bootstrap_peers.clone();
for (peer, _) in self.backlog.read().unwrap().iter() { for (peer, _) in self.backlog.read().unwrap().iter() {
if !self.bootstrap_peers.contains(peer) { if !self.bootstrap_peers.contains(peer) {
to_retry_maybe.push(peer.clone()); to_retry_maybe.push(*peer);
} }
} }
self.handle_peer_list(&to_retry_maybe[..]); self.handle_peer_list(&to_retry_maybe[..]);
@ -408,7 +397,7 @@ impl Basalt {
let to_connect = self.view.read().unwrap().should_try_list(peers); let to_connect = self.view.read().unwrap().should_try_list(peers);
for peer in to_connect.iter() { for peer in to_connect.iter() {
tokio::spawn(self.clone().try_connect(peer.clone())); tokio::spawn(self.clone().try_connect(*peer));
} }
} }
@ -417,20 +406,16 @@ impl Basalt {
let view = self.view.read().unwrap(); let view = self.view.read().unwrap();
let mut attempts = self.current_attempts.write().unwrap(); let mut attempts = self.current_attempts.write().unwrap();
if view.slots.iter().any(|x| x.peer == Some(peer.clone())) { if view.slots.iter().any(|x| x.peer == Some(peer)) {
return; return;
} }
if attempts.contains(&peer) { if attempts.contains(&peer) {
return; return;
} }
attempts.insert(peer.clone()); attempts.insert(peer);
} }
let res = self let res = self.netapp.clone().try_connect(peer.addr, peer.id).await;
.netapp
.clone()
.try_connect(peer.addr.clone(), peer.id)
.await;
trace!("Connection attempt to {}: {:?}", peer.addr, res); trace!("Connection attempt to {}: {:?}", peer.addr, res);
self.current_attempts.write().unwrap().remove(&peer); self.current_attempts.write().unwrap().remove(&peer);
@ -440,7 +425,7 @@ impl Basalt {
} }
} }
fn on_connected(self: &Arc<Self>, id: NodeID, addr: NamedSocketAddr, is_incoming: bool) { fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
if is_incoming { if is_incoming {
self.handle_peer_list(&[Peer { id, addr }][..]); self.handle_peer_list(&[Peer { id, addr }][..]);
} else { } else {
@ -449,7 +434,7 @@ impl Basalt {
let mut backlog = self.backlog.write().unwrap(); let mut backlog = self.backlog.write().unwrap();
if backlog.get(&peer).is_none() { if backlog.get(&peer).is_none() {
backlog.put(peer.clone(), ()); backlog.put(peer, ());
} }
drop(backlog); drop(backlog);

View file

@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque}; use std::collections::{HashMap, VecDeque};
use std::net::SocketAddr;
use std::sync::atomic::{self, AtomicU64}; use std::sync::atomic::{self, AtomicU64};
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
@ -11,8 +12,6 @@ use serde::{Deserialize, Serialize};
use tokio::select; use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_unix_tcp::NamedSocketAddr;
use sodiumoxide::crypto::hash; use sodiumoxide::crypto::hash;
use crate::endpoint::*; use crate::endpoint::*;
@ -44,7 +43,7 @@ impl Message for PingMessage {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct PeerListMessage { struct PeerListMessage {
pub list: Vec<(NodeID, NamedSocketAddr)>, pub list: Vec<(NodeID, SocketAddr)>,
} }
impl Message for PeerListMessage { impl Message for PeerListMessage {
@ -58,9 +57,9 @@ struct PeerInfoInternal {
// addr is the currently connected address, // addr is the currently connected address,
// or the last address we were connected to, // or the last address we were connected to,
// or an arbitrary address some other peer gave us // or an arbitrary address some other peer gave us
addr: NamedSocketAddr, addr: SocketAddr,
// all_addrs contains all of the addresses everyone gave us // all_addrs contains all of the addresses everyone gave us
all_addrs: Vec<NamedSocketAddr>, all_addrs: Vec<SocketAddr>,
state: PeerConnState, state: PeerConnState,
last_send_ping: Option<Instant>, last_send_ping: Option<Instant>,
@ -70,9 +69,9 @@ struct PeerInfoInternal {
} }
impl PeerInfoInternal { impl PeerInfoInternal {
fn new(addr: NamedSocketAddr, state: PeerConnState) -> Self { fn new(addr: SocketAddr, state: PeerConnState) -> Self {
Self { Self {
addr: addr.clone(), addr,
all_addrs: vec![addr], all_addrs: vec![addr],
state, state,
last_send_ping: None, last_send_ping: None,
@ -84,12 +83,12 @@ impl PeerInfoInternal {
} }
/// Information that the full mesh peering strategy can return about the peers it knows of /// Information that the full mesh peering strategy can return about the peers it knows of
#[derive(Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub struct PeerInfo { pub struct PeerInfo {
/// The node's identifier (its public key) /// The node's identifier (its public key)
pub id: NodeID, pub id: NodeID,
/// The node's network address /// The node's network address
pub addr: NamedSocketAddr, pub addr: SocketAddr,
/// The current status of our connection to this node /// The current status of our connection to this node
pub state: PeerConnState, pub state: PeerConnState,
/// The last time at which the node was seen /// The last time at which the node was seen
@ -154,11 +153,11 @@ impl KnownHosts {
fn update_hash(&mut self) { fn update_hash(&mut self) {
self.hash = Self::calculate_hash(&self.list); self.hash = Self::calculate_hash(&self.list);
} }
fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, NamedSocketAddr)> { fn map_into_vec(input: &HashMap<NodeID, PeerInfoInternal>) -> Vec<(NodeID, SocketAddr)> {
let mut list = Vec::with_capacity(input.len()); let mut list = Vec::with_capacity(input.len());
for (id, peer) in input.iter() { for (id, peer) in input.iter() {
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
list.push((*id, peer.addr.clone())); list.push((*id, peer.addr));
} }
} }
list list
@ -197,8 +196,8 @@ impl FullMeshPeeringStrategy {
/// to all of the nodes specified in the bootstrap list. /// to all of the nodes specified in the bootstrap list.
pub fn new( pub fn new(
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
bootstrap_list: Vec<(NodeID, NamedSocketAddr)>, bootstrap_list: Vec<(NodeID, SocketAddr)>,
our_addr: Option<NamedSocketAddr>, our_addr: Option<SocketAddr>,
) -> Arc<Self> { ) -> Arc<Self> {
let mut known_hosts = KnownHosts::new(); let mut known_hosts = KnownHosts::new();
for (id, addr) in bootstrap_list { for (id, addr) in bootstrap_list {
@ -233,12 +232,10 @@ impl FullMeshPeeringStrategy {
strat.peer_list_endpoint.set_handler(strat.clone()); strat.peer_list_endpoint.set_handler(strat.clone());
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.on_connected( netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
move |id: NodeID, addr: NamedSocketAddr, is_incoming: bool| { let strat2 = strat2.clone();
let strat2 = strat2.clone(); strat2.on_connected(id, addr, is_incoming);
strat2.on_connected(id, addr, is_incoming); });
},
);
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
@ -321,11 +318,7 @@ impl FullMeshPeeringStrategy {
.filter(|x| **x != h.addr) .filter(|x| **x != h.addr)
.cloned() .cloned()
.collect::<Vec<_>>(); .collect::<Vec<_>>();
tokio::spawn(self.clone().try_connect( tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
id,
h.addr.clone(),
alternate_addrs,
));
} }
} }
} }
@ -358,7 +351,7 @@ impl FullMeshPeeringStrategy {
if !pings.is_empty() { if !pings.is_empty() {
pub_peer_list.push(PeerInfo { pub_peer_list.push(PeerInfo {
id: *id, id: *id,
addr: info.addr.clone(), addr: info.addr,
state: info.state, state: info.state,
last_seen: info.last_seen, last_seen: info.last_seen,
avg_ping: Some( avg_ping: Some(
@ -373,7 +366,7 @@ impl FullMeshPeeringStrategy {
} else { } else {
pub_peer_list.push(PeerInfo { pub_peer_list.push(PeerInfo {
id: *id, id: *id,
addr: info.addr.clone(), addr: info.addr,
state: info.state, state: info.state,
last_seen: info.last_seen, last_seen: info.last_seen,
avg_ping: None, avg_ping: None,
@ -465,20 +458,18 @@ impl FullMeshPeeringStrategy {
} }
} }
fn handle_peer_list(&self, list: &[(NodeID, NamedSocketAddr)]) { fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
let mut changed = false; let mut changed = false;
for (id, addr) in list.iter() { for (id, addr) in list.iter() {
if let Some(kh) = known_hosts.list.get_mut(id) { if let Some(kh) = known_hosts.list.get_mut(id) {
if !kh.all_addrs.contains(addr) { if !kh.all_addrs.contains(addr) {
kh.all_addrs.push(addr.clone()); kh.all_addrs.push(*addr);
changed = true; changed = true;
} }
} else { } else {
known_hosts known_hosts.list.insert(*id, self.new_peer(id, *addr));
.list
.insert(*id, self.new_peer(id, addr.clone()));
changed = true; changed = true;
} }
} }
@ -492,16 +483,16 @@ impl FullMeshPeeringStrategy {
async fn try_connect( async fn try_connect(
self: Arc<Self>, self: Arc<Self>,
id: NodeID, id: NodeID,
default_addr: NamedSocketAddr, default_addr: SocketAddr,
alternate_addrs: Vec<NamedSocketAddr>, alternate_addrs: Vec<SocketAddr>,
) { ) {
let conn_addr = { let conn_addr = {
let mut ret = None; let mut ret = None;
for addr in [default_addr].iter().chain(alternate_addrs.iter()) { for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8])); debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
match self.netapp.clone().try_connect(addr.clone(), id).await { match self.netapp.clone().try_connect(*addr, id).await {
Ok(()) => { Ok(()) => {
ret = Some(addr.clone()); ret = Some(*addr);
break; break;
} }
Err(e) => { Err(e) => {
@ -542,7 +533,7 @@ impl FullMeshPeeringStrategy {
} }
} }
fn on_connected(self: Arc<Self>, id: NodeID, addr: NamedSocketAddr, is_incoming: bool) { fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
if is_incoming { if is_incoming {
if let Some(host) = known_hosts.list.get_mut(&id) { if let Some(host) = known_hosts.list.get_mut(&id) {
@ -560,7 +551,7 @@ impl FullMeshPeeringStrategy {
); );
if let Some(host) = known_hosts.list.get_mut(&id) { if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Connected; host.state = PeerConnState::Connected;
host.addr = addr.clone(); host.addr = addr;
if !host.all_addrs.contains(&addr) { if !host.all_addrs.contains(&addr) {
host.all_addrs.push(addr); host.all_addrs.push(addr);
} }
@ -586,7 +577,7 @@ impl FullMeshPeeringStrategy {
} }
} }
fn new_peer(&self, id: &NodeID, addr: NamedSocketAddr) -> PeerInfoInternal { fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfoInternal {
let state = if *id == self.netapp.id { let state = if *id == self.netapp.id {
PeerConnState::Ourself PeerConnState::Ourself
} else { } else {

View file

@ -1,4 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
@ -7,12 +8,11 @@ use log::*;
use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::io::{AsyncReadExt, AsyncWriteExt};
use kuska_handshake::async_std::{handshake_server, BoxStream}; use kuska_handshake::async_std::{handshake_server, BoxStream};
use tokio::net::TcpStream;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use tokio_util::compat::*; use tokio_util::compat::*;
use tokio_unix_tcp::{SocketAddr, Stream};
#[cfg(feature = "telemetry")] #[cfg(feature = "telemetry")]
use opentelemetry::{ use opentelemetry::{
trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer}, trace::{FutureExt, Span, SpanKind, TraceContextExt, TraceId, Tracer},
@ -61,7 +61,7 @@ pub(crate) struct ServerConn {
impl ServerConn { impl ServerConn {
pub(crate) async fn run( pub(crate) async fn run(
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
socket: Stream, socket: TcpStream,
must_exit: watch::Receiver<bool>, must_exit: watch::Receiver<bool>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let remote_addr = socket.peer_addr()?; let remote_addr = socket.peer_addr()?;

View file

@ -1,11 +1,10 @@
use std::net::SocketAddr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use tokio::select; use tokio::select;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_unix_tcp::NamedSocketAddr;
use sodiumoxide::crypto::auth; use sodiumoxide::crypto::auth;
use sodiumoxide::crypto::sign::ed25519; use sodiumoxide::crypto::sign::ed25519;
@ -38,20 +37,14 @@ async fn run_test_inner() {
let (pk2, sk2) = ed25519::gen_keypair(); let (pk2, sk2) = ed25519::gen_keypair();
let (pk3, sk3) = ed25519::gen_keypair(); let (pk3, sk3) = ed25519::gen_keypair();
let addr1: NamedSocketAddr = "127.0.0.1:19991".parse().unwrap(); let addr1: SocketAddr = "127.0.0.1:19991".parse().unwrap();
let addr2: NamedSocketAddr = "127.0.0.1:19992".parse().unwrap(); let addr2: SocketAddr = "127.0.0.1:19992".parse().unwrap();
let addr3: NamedSocketAddr = "127.0.0.1:19993".parse().unwrap(); let addr3: SocketAddr = "127.0.0.1:19993".parse().unwrap();
let (stop_tx, stop_rx) = watch::channel(false); let (stop_tx, stop_rx) = watch::channel(false);
let (thread1, _netapp1, peering1) = run_netapp( let (thread1, _netapp1, peering1) =
netid.clone(), run_netapp(netid.clone(), pk1, sk1, addr1, vec![], stop_rx.clone());
pk1,
sk1,
addr1.clone(),
vec![],
stop_rx.clone(),
);
tokio::time::sleep(Duration::from_secs(2)).await; tokio::time::sleep(Duration::from_secs(2)).await;
// Connect second node and check it peers with everyone // Connect second node and check it peers with everyone
@ -59,8 +52,8 @@ async fn run_test_inner() {
netid.clone(), netid.clone(),
pk2, pk2,
sk2, sk2,
addr2.clone(), addr2,
vec![(pk1, addr1.into())], vec![(pk1, addr1)],
stop_rx.clone(), stop_rx.clone(),
); );
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
@ -74,14 +67,8 @@ async fn run_test_inner() {
assert_eq!(pl2.len(), 2); assert_eq!(pl2.len(), 2);
// Connect third ndoe and check it peers with everyone // Connect third ndoe and check it peers with everyone
let (thread3, _netapp3, peering3) = run_netapp( let (thread3, _netapp3, peering3) =
netid, run_netapp(netid, pk3, sk3, addr3, vec![(pk2, addr2)], stop_rx.clone());
pk3,
sk3,
addr3,
vec![(pk2, addr2.into())],
stop_rx.clone(),
);
tokio::time::sleep(Duration::from_secs(5)).await; tokio::time::sleep(Duration::from_secs(5)).await;
let pl1 = peering1.get_peer_list(); let pl1 = peering1.get_peer_list();
@ -107,8 +94,8 @@ fn run_netapp(
netid: auth::Key, netid: auth::Key,
_pk: NodeID, _pk: NodeID,
sk: ed25519::SecretKey, sk: ed25519::SecretKey,
listen_addr: NamedSocketAddr, listen_addr: SocketAddr,
bootstrap_peers: Vec<(NodeID, NamedSocketAddr)>, bootstrap_peers: Vec<(NodeID, SocketAddr)>,
must_exit: watch::Receiver<bool>, must_exit: watch::Receiver<bool>,
) -> ( ) -> (
tokio::task::JoinHandle<()>, tokio::task::JoinHandle<()>,

View file

@ -1,14 +1,10 @@
use std::net; use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use log::info; use log::info;
use serde::Serialize; use serde::Serialize;
use tokio::sync::watch; use tokio::sync::watch;
use tokio_unix_tcp::{NamedSocketAddr, UnixSocketAddr};
use crate::netapp::*; use crate::netapp::*;
/// Utility function: encodes any serializable value in MessagePack binary format /// Utility function: encodes any serializable value in MessagePack binary format
@ -21,7 +17,9 @@ where
T: Serialize + ?Sized, T: Serialize + ?Sized,
{ {
let mut wr = Vec::with_capacity(128); let mut wr = Vec::with_capacity(128);
let mut se = rmp_serde::Serializer::new(&mut wr).with_struct_map(); let mut se = rmp_serde::Serializer::new(&mut wr)
.with_struct_map()
.with_string_variants();
val.serialize(&mut se)?; val.serialize(&mut se)?;
Ok(wr) Ok(wr)
} }
@ -60,37 +58,24 @@ pub fn watch_ctrl_c() -> watch::Receiver<bool> {
} }
/// Parse a peer's address including public key, written in the format: /// Parse a peer's address including public key, written in the format:
/// `<public key hex>@<ip>:<port>` or /// `<public key hex>@<ip>:<port>`
/// `<public key hex>@<path>` for unix domain sockets pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, SocketAddr)> {
pub fn parse_peer_addr(peer: &str) -> Option<(NodeID, NamedSocketAddr)> {
let delim = peer.find('@')?; let delim = peer.find('@')?;
let (key, host) = peer.split_at(delim); let (key, ip) = peer.split_at(delim);
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
let host = NamedSocketAddr::from_str(&host[1..]).ok()?; let ip = ip[1..].parse::<SocketAddr>().ok()?;
Some((pubkey, host)) Some((pubkey, ip))
} }
/// Parse and resolve a peer's address including public key, written in the format: /// Parse and resolve a peer's address including public key, written in the format:
/// `<public key hex>@<ip or hostname>:<port>` or /// `<public key hex>@<ip or hostname>:<port>`
/// `<public key hex>@<path>` for unix domain sockets pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<NamedSocketAddr>)> { use std::net::ToSocketAddrs;
let delim = peer.find('@')?; let delim = peer.find('@')?;
let (key, host) = peer.split_at(delim); let (key, host) = peer.split_at(delim);
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
let host = &host[1..]; let hosts = host[1..].to_socket_addrs().ok()?.collect::<Vec<_>>();
let hosts = if UnixSocketAddr::is_pathname(host) {
vec![PathBuf::from_str(host).unwrap().into()]
} else {
use std::net::ToSocketAddrs;
host.parse::<net::SocketAddr>()
.ok()?
.to_socket_addrs()
.ok()?
.map(NamedSocketAddr::Inet)
.collect::<Vec<_>>()
};
if hosts.is_empty() { if hosts.is_empty() {
return None; return None;
} }
@ -98,23 +83,14 @@ pub fn parse_and_resolve_peer_addr(peer: &str) -> Option<(NodeID, Vec<NamedSocke
} }
/// async version of parse_and_resolve_peer_addr /// async version of parse_and_resolve_peer_addr
pub async fn parse_and_resolve_peer_addr_async( pub async fn parse_and_resolve_peer_addr_async(peer: &str) -> Option<(NodeID, Vec<SocketAddr>)> {
peer: &str,
) -> Option<(NodeID, Vec<NamedSocketAddr>)> {
let delim = peer.find('@')?; let delim = peer.find('@')?;
let (key, host) = peer.split_at(delim); let (key, host) = peer.split_at(delim);
let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?; let pubkey = NodeID::from_slice(&hex::decode(key).ok()?)?;
let host = &host[1..]; let hosts = tokio::net::lookup_host(&host[1..])
let hosts = if UnixSocketAddr::is_pathname(host) { .await
vec![PathBuf::from_str(host).unwrap().into()] .ok()?
} else { .collect::<Vec<_>>();
tokio::net::lookup_host(host)
.await
.ok()?
.map(NamedSocketAddr::Inet)
.collect::<Vec<_>>()
};
if hosts.is_empty() { if hosts.is_empty() {
return None; return None;
} }