diff --git a/Cargo.lock b/Cargo.lock index e5a2551..522fe30 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1,11 +1,5 @@ # This file is automatically @generated by Cargo. # It is not intended for manual editing. -[[package]] -name = "adler32" -version = "1.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "aae1277d39aeec15cb388266ecc24b11c80469deae6067e17a1a7aa9e5c1f234" - [[package]] name = "ahash" version = "0.4.6" @@ -288,15 +282,6 @@ dependencies = [ "cache-padded", ] -[[package]] -name = "crc32fast" -version = "1.2.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81156fece84ab6a9f2afdb109ce3ae577e42b1228441eded99bd77f627953b1a" -dependencies = [ - "cfg-if 1.0.0", -] - [[package]] name = "crossbeam-utils" version = "0.8.1" @@ -350,18 +335,6 @@ dependencies = [ "instant", ] -[[package]] -name = "filetime" -version = "0.2.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0c122a393ea57648015bf06fbd3d372378992e86b9ff5a7a497b076a28c79efe" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "redox_syscall", - "winapi 0.3.9", -] - [[package]] name = "fnv" version = "1.0.7" @@ -596,12 +569,23 @@ dependencies = [ "async-std", "futures", "hex", + "kuska-sodiumoxide", "log", - "sodiumoxide", "thiserror", "tokio", ] +[[package]] +name = "kuska-sodiumoxide" +version = "0.2.5-0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae0f8eafdd240b722243787b51fdaf8df6693fb8621d0f7061cdba574214cf88" +dependencies = [ + "libc", + "libsodium-sys", + "serde", +] + [[package]] name = "kv-log-macro" version = "1.0.7" @@ -623,29 +607,15 @@ version = "0.2.80" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58d1b70b004888f764dfbf6a26a3b0342a1632d33968e4a179d8011c760614" -[[package]] -name = "libflate" -version = "0.1.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d9135df43b1f5d0e333385cb6e7897ecd1a43d7d11b91ac003f4d2c2d2401fdd" -dependencies = [ - "adler32", - "crc32fast", - "rle-decode-fast", - "take_mut", -] - [[package]] name = "libsodium-sys" -version = "0.2.4" -source = "git+https://github.com/Dhole/sodiumoxidez?branch=extra#53c0fb16069309c35010eb568d9ed05f5bd52ce8" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a685b64f837b339074115f2e7f7b431ac73681d08d75b389db7498b8892b8a58" dependencies = [ "cc", "libc", - "libflate", "pkg-config", - "tar", - "vcpkg", ] [[package]] @@ -685,24 +655,12 @@ dependencies = [ "kernel32-sys", "libc", "log", - "miow 0.2.2", + "miow", "net2", "slab", "winapi 0.2.8", ] -[[package]] -name = "mio-named-pipes" -version = "0.1.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0840c1c50fd55e521b247f949c241c9997709f23bd7f023b9762cd561e935656" -dependencies = [ - "log", - "mio", - "miow 0.3.6", - "winapi 0.3.9", -] - [[package]] name = "mio-uds" version = "0.6.8" @@ -726,16 +684,6 @@ dependencies = [ "ws2_32-sys", ] -[[package]] -name = "miow" -version = "0.3.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5a33c1b55807fbed163481b5ba66db4b2fa6cde694a5027be10fb724206c5897" -dependencies = [ - "socket2", - "winapi 0.3.9", -] - [[package]] name = "nb-connect" version = "1.0.2" @@ -771,12 +719,12 @@ dependencies = [ "err-derive", "hex", "kuska-handshake", + "kuska-sodiumoxide", "log", "lru", "rand", "rmp-serde", "serde", - "sodiumoxide", "structopt", "tokio", ] @@ -955,12 +903,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" -[[package]] -name = "redox_syscall" -version = "0.1.57" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" - [[package]] name = "regex" version = "1.4.2" @@ -979,12 +921,6 @@ version = "0.6.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3b181ba2dcf07aaccad5448e8ead58db5b742cf85dfe035e2227f137a539a189" -[[package]] -name = "rle-decode-fast" -version = "1.0.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cabe4fa914dec5870285fa7f71f602645da47c486e68486d2b4ceb4a343e90ac" - [[package]] name = "rmp" version = "0.8.9" @@ -1032,43 +968,12 @@ dependencies = [ "syn", ] -[[package]] -name = "signal-hook-registry" -version = "1.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ce32ea0c6c56d5eacaeb814fbed9960547021d3edd010ded1425f180536b20ab" -dependencies = [ - "libc", -] - [[package]] name = "slab" version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" -[[package]] -name = "socket2" -version = "0.3.17" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c29947abdee2a218277abeca306f25789c938e500ea5a9d4b12a5a504466902" -dependencies = [ - "cfg-if 1.0.0", - "libc", - "redox_syscall", - "winapi 0.3.9", -] - -[[package]] -name = "sodiumoxide" -version = "0.2.4" -source = "git+https://github.com/Dhole/sodiumoxidez?branch=extra#53c0fb16069309c35010eb568d9ed05f5bd52ce8" -dependencies = [ - "libc", - "libsodium-sys", - "serde", -] - [[package]] name = "structopt" version = "0.3.21" @@ -1116,24 +1021,6 @@ dependencies = [ "unicode-xid", ] -[[package]] -name = "take_mut" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60" - -[[package]] -name = "tar" -version = "0.4.30" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "489997b7557e9a43e192c527face4feacc78bfbe6eed67fd55c4c9e381cba290" -dependencies = [ - "filetime", - "libc", - "redox_syscall", - "xattr", -] - [[package]] name = "termcolor" version = "1.1.2" @@ -1200,20 +1087,16 @@ checksum = "a6d7ad61edd59bfcc7e80dababf0f4aed2e6d5e0ba1659356ae889752dfc12ff" dependencies = [ "bytes 0.5.6", "fnv", - "futures-core", "iovec", "lazy_static", "libc", "memchr", "mio", - "mio-named-pipes", "mio-uds", "num_cpus", "pin-project-lite", - "signal-hook-registry", "slab", "tokio-macros", - "winapi 0.3.9", ] [[package]] @@ -1245,12 +1128,6 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f7fe0bb3479651439c9112f72b6c505038574c9fbb575ed1bf3b797fa39dd564" -[[package]] -name = "vcpkg" -version = "0.2.10" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6454029bf181f092ad1b853286f23e2c507d8e8194d01d92da4a55c274a5508c" - [[package]] name = "vec-arena" version = "1.0.0" @@ -1412,12 +1289,3 @@ dependencies = [ "winapi 0.2.8", "winapi-build", ] - -[[package]] -name = "xattr" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "244c3741f4240ef46274860397c7c74e50eb23624996930e484c16679633a54c" -dependencies = [ - "libc", -] diff --git a/Cargo.toml b/Cargo.toml index 20f69f1..3d0cd1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,16 +1,16 @@ [package] name = "netapp" version = "0.1.0" -authors = ["Alex Auvolat "] +authors = ["Alex Auvolat "] edition = "2018" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] -async-std = { version = "1.5.0", features=["unstable","attributes"] } -tokio = { version = "0.2", features = ["full"] } +async-std = { version = "1.5.0", default-features = false } +tokio = { version = "0.2", default-features = false, features = ["net", "tcp", "rt-core", "rt-threaded", "sync", "time", "macros"] } -serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } +serde = { version = "1.0", default-features = false, features = ["derive"] } rmp-serde = "0.14.3" hex = "0.4.2" base64 = "0.12.1" @@ -27,7 +27,7 @@ err-derive = "0.2.3" bytes = "0.6.0" lru = "0.6" -sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } -kuska-handshake = { path = "../handshake", features = ["default", "tokio_compat"] } -#kuska-handshake = { git = "https://github.com/kuska-ssb/handshake", branch = "master", features = ["default", "tokio_compat"] } +sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } +kuska-handshake = { version = "0.1.1", path = "../handshake", features = ["default", "tokio_compat"] } +#kuska-handshake = { version = "0.1.1", git = "https://github.com/kuska-ssb/handshake", branch = "master", features = ["default", "tokio_compat"] } diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 88784c1..acc0a7b 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -10,6 +10,7 @@ use sodiumoxide::crypto::sign::ed25519; use netapp::peering::fullmesh::*; use netapp::NetApp; +use netapp::NodeID; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] @@ -65,15 +66,13 @@ async fn main() { info!("Node private key: {}", hex::encode(&privkey)); info!("Node public key: {}", hex::encode(&privkey.public_key())); - let listen_addr = opt.listen_addr.parse().unwrap(); - let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); - let netapp = NetApp::new(listen_addr, public_addr, netid, privkey); + let netapp = NetApp::new(netid, privkey); let mut bootstrap_peers = vec![]; for peer in opt.bootstrap_peers.iter() { if let Some(delim) = peer.find('@') { let (key, ip) = peer.split_at(delim); - let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); + let pubkey = NodeID::from_slice(&hex::decode(&key).unwrap()).unwrap(); let ip = ip[1..].parse::().unwrap(); bootstrap_peers.push((pubkey, ip)); } @@ -81,5 +80,7 @@ async fn main() { let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers); - tokio::join!(netapp.listen(), peering.run(),); + let listen_addr = opt.listen_addr.parse().unwrap(); + let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); + tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),); } diff --git a/src/conn.rs b/src/conn.rs index e965fc9..a4c4f4e 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex}; use bytes::Bytes; use log::{debug, error, trace}; -use sodiumoxide::crypto::sign::ed25519; use tokio::io::split; use tokio::net::TcpStream; use tokio::sync::{mpsc, oneshot, watch}; @@ -26,7 +25,7 @@ use crate::util::*; pub(crate) struct ServerConn { pub(crate) remote_addr: SocketAddr, - pub(crate) peer_pk: ed25519::PublicKey, + pub(crate) peer_id: NodeID, netapp: Arc, @@ -40,18 +39,18 @@ impl ServerConn { let handshake = handshake_server( &mut asyncstd_socket, netapp.netid.clone(), - netapp.pubkey.clone(), + netapp.id.clone(), netapp.privkey.clone(), ) .await?; - let peer_pk = handshake.peer_pk.clone(); + let peer_id = handshake.peer_pk.clone(); let tokio_socket = asyncstd_socket.into_inner(); let remote_addr = tokio_socket.peer_addr().unwrap(); debug!( "Handshake complete (server) with {}@{}", - hex::encode(&peer_pk), + hex::encode(&peer_id), remote_addr ); @@ -70,12 +69,12 @@ impl ServerConn { let conn = Arc::new(ServerConn { netapp: netapp.clone(), remote_addr, - peer_pk: peer_pk.clone(), + peer_id: peer_id.clone(), resp_send, close_send, }); - netapp.connected_as_server(peer_pk.clone(), conn.clone()); + netapp.connected_as_server(peer_id.clone(), conn.clone()); let conn2 = conn.clone(); let conn3 = conn.clone(); @@ -97,7 +96,7 @@ impl ServerConn { .map(|_| ()) .log_err("ServerConn recv_loop/send_loop"); - netapp.disconnected_as_server(&peer_pk, conn); + netapp.disconnected_as_server(&peer_id, conn); Ok(()) } @@ -124,7 +123,7 @@ impl RecvLoop for ServerConn { if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) { let net_handler = &handler.net_handler; - let resp = net_handler(self.peer_pk.clone(), bytes.slice(5..)).await; + let resp = net_handler(self.peer_id.clone(), bytes.slice(5..)).await; self.resp_send .send(Some((id, prio, resp))) .log_err("ServerConn recv_handler send resp"); @@ -133,7 +132,7 @@ impl RecvLoop for ServerConn { } pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, - pub(crate) peer_pk: ed25519::PublicKey, + pub(crate) peer_id: NodeID, query_send: mpsc::UnboundedSender)>>, @@ -147,16 +146,16 @@ impl ClientConn { pub(crate) async fn init( netapp: Arc, socket: TcpStream, - remote_pk: ed25519::PublicKey, + peer_id: NodeID, ) -> Result<(), Error> { let mut asyncstd_socket = TokioCompatExt::wrap(socket); let handshake = handshake_client( &mut asyncstd_socket, netapp.netid.clone(), - netapp.pubkey.clone(), + netapp.id.clone(), netapp.privkey.clone(), - remote_pk.clone(), + peer_id.clone(), ) .await?; @@ -165,7 +164,7 @@ impl ClientConn { debug!( "Handshake complete (client) with {}@{}", - hex::encode(&remote_pk), + hex::encode(&peer_id), remote_addr ); @@ -183,7 +182,7 @@ impl ClientConn { let conn = Arc::new(ClientConn { remote_addr, - peer_pk: remote_pk.clone(), + peer_id: peer_id.clone(), next_query_number: AtomicU16::from(0u16), query_send, inflight: Mutex::new(HashMap::new()), @@ -191,7 +190,7 @@ impl ClientConn { stop_recv_loop, }); - netapp.connected_as_client(remote_pk.clone(), conn.clone()); + netapp.connected_as_client(peer_id.clone(), conn.clone()); tokio::spawn(async move { let conn2 = conn.clone(); @@ -205,7 +204,7 @@ impl ClientConn { .map(|_| ()) .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); - netapp.disconnected_as_client(&remote_pk, conn); + netapp.disconnected_as_client(&peer_id, conn); }); Ok(()) diff --git a/src/lib.rs b/src/lib.rs index af8fbb8..6987b54 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -27,3 +27,4 @@ pub mod netapp; pub mod peering; pub use netapp::*; +pub use util::NodeID; diff --git a/src/netapp.rs b/src/netapp.rs index 8397be9..9733fb7 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -28,9 +28,7 @@ pub(crate) struct Handler { pub(crate) local_handler: Box Pin + Sync + Send>> + Sync + Send>, pub(crate) net_handler: Box< - dyn Fn(ed25519::PublicKey, Bytes) -> Pin> + Sync + Send>> - + Sync - + Send, + dyn Fn(NodeID, Bytes) -> Pin> + Sync + Send>> + Sync + Send, >, } @@ -47,30 +45,32 @@ pub(crate) struct Handler { /// It is generally not necessary to use NetApp stand-alone, as the provided full mesh /// and RPS peering strategies take care of the most common use cases. pub struct NetApp { - pub listen_addr: SocketAddr, - pub public_addr: Option, + listen_params: ArcSwapOption, + /// Network secret key pub netid: auth::Key, - pub pubkey: ed25519::PublicKey, + /// Our peer ID + pub id: NodeID, + /// Private key associated with our peer ID pub privkey: ed25519::SecretKey, - server_conns: RwLock>>, - client_conns: RwLock>>, + server_conns: RwLock>>, + client_conns: RwLock>>, pub(crate) msg_handlers: ArcSwap>>, - on_connected_handler: - ArcSwapOption>, - on_disconnected_handler: ArcSwapOption>, + on_connected_handler: ArcSwapOption>, + on_disconnected_handler: ArcSwapOption>, } -async fn net_handler_aux( - handler: Arc, - remote: ed25519::PublicKey, - bytes: Bytes, -) -> Vec +struct ListenParams { + listen_addr: SocketAddr, + public_addr: Option, +} + +async fn net_handler_aux(handler: Arc, remote: NodeID, bytes: Bytes) -> Vec where M: Message + 'static, - F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future::Response> + Send + Sync, { debug!( @@ -93,14 +93,10 @@ where rmp_to_vec_all_named(&res).unwrap_or(vec![]) } -async fn local_handler_aux( - handler: Arc, - remote: ed25519::PublicKey, - msg: DynMsg, -) -> DynMsg +async fn local_handler_aux(handler: Arc, remote: NodeID, msg: DynMsg) -> DynMsg where M: Message + 'static, - F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future::Response> + Send + Sync, { debug!("Handling message of kind {:08x} from ourself", M::KIND); @@ -110,19 +106,17 @@ where } impl NetApp { - /// Creates a new instance of NetApp. No background process is - pub fn new( - listen_addr: SocketAddr, - public_addr: Option, - netid: auth::Key, - privkey: ed25519::SecretKey, - ) -> Arc { - let pubkey = privkey.public_key(); + /// Creates a new instance of NetApp, which can serve either as a full p2p node, + /// or just as a passive client. To upgrade to a full p2p node, spawn a listener + /// using `.listen()` + /// + /// Our Peer ID is the public key associated to the secret key given here. + pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc { + let id = privkey.public_key(); let netapp = Arc::new(Self { - listen_addr, - public_addr, + listen_params: ArcSwapOption::new(None), netid, - pubkey, + id, privkey, server_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()), @@ -132,12 +126,10 @@ impl NetApp { }); let netapp2 = netapp.clone(); - netapp.add_msg_handler::( - move |from: ed25519::PublicKey, msg: HelloMessage| { - netapp2.handle_hello_message(from, msg); - async { () } - }, - ); + netapp.add_msg_handler::(move |from: NodeID, msg: HelloMessage| { + netapp2.handle_hello_message(from, msg); + async { () } + }); netapp } @@ -147,7 +139,7 @@ impl NetApp { /// as the peering strategy will need to set this itself. pub fn on_connected(&self, handler: F) where - F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static, + F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static, { self.on_connected_handler .store(Some(Arc::new(Box::new(handler)))); @@ -158,7 +150,7 @@ impl NetApp { /// as the peering strategy will need to set this itself. pub fn on_disconnected(&self, handler: F) where - F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static, + F: Fn(NodeID, bool) + Sized + Send + Sync + 'static, { self.on_disconnected_handler .store(Some(Arc::new(Box::new(handler)))); @@ -169,19 +161,19 @@ impl NetApp { pub fn add_msg_handler(&self, handler: F) where M: Message + 'static, - F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, + F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future::Response> + Send + Sync + 'static, { let handler = Arc::new(handler); let handler2 = handler.clone(); - let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| { + let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| { let fun: Pin> + Sync + Send>> = Box::pin(net_handler_aux(handler2.clone(), remote, bytes)); fun }); - let self_id = self.pubkey.clone(); + let self_id = self.id.clone(); let local_handler = Box::new(move |msg: DynMsg| { let fun: Pin + Sync + Send>> = Box::pin(local_handler_aux(handler.clone(), self_id, msg)); @@ -200,9 +192,16 @@ impl NetApp { /// Main listening process for our app. This future runs during the whole /// run time of our application. - pub async fn listen(self: Arc) { - let mut listener = TcpListener::bind(self.listen_addr).await.unwrap(); - info!("Listening on {}", self.listen_addr); + /// If this is not called, the NetApp instance remains a passive client. + pub async fn listen(self: Arc, listen_addr: SocketAddr, public_addr: Option) { + let listen_params = ListenParams { + listen_addr, + public_addr, + }; + self.listen_params.store(Some(Arc::new(listen_params))); + + let mut listener = TcpListener::bind(listen_addr).await.unwrap(); + info!("Listening on {}", listen_addr); loop { // The second item contains the IP and port of the new connection. @@ -225,43 +224,39 @@ impl NetApp { /// This function returns once the connection has been established and a /// successfull handshake was made. At this point we can send messages to /// the other node with `Netapp::request` - pub async fn try_connect( - self: Arc, - ip: SocketAddr, - pk: ed25519::PublicKey, - ) -> Result<(), Error> { + pub async fn try_connect(self: Arc, ip: SocketAddr, id: NodeID) -> Result<(), Error> { // Don't connect to ourself, we don't care // but pretend we did - if pk == self.pubkey { + if id == self.id { tokio::spawn(async move { if let Some(h) = self.on_connected_handler.load().as_ref() { - h(pk, ip, false); + h(id, ip, false); } }); return Ok(()); } // Don't connect if already connected - if self.client_conns.read().unwrap().contains_key(&pk) { + if self.client_conns.read().unwrap().contains_key(&id) { return Ok(()); } let socket = TcpStream::connect(ip).await?; info!("Connected to {}, negotiating handshake...", ip); - ClientConn::init(self, socket, pk.clone()).await?; + ClientConn::init(self, socket, id.clone()).await?; Ok(()) } /// Close the outgoing connection we have to a node specified by its public key, /// if such a connection is currently open. - pub fn disconnect(self: &Arc, pk: &ed25519::PublicKey) { - // If pk is ourself, we're not supposed to have a connection open - if *pk != self.pubkey { - let conn = self.client_conns.write().unwrap().remove(pk); + pub fn disconnect(self: &Arc, id: &NodeID) { + // If id is ourself, we're not supposed to have a connection open + if *id != self.id { + let conn = self.client_conns.write().unwrap().remove(id); if let Some(c) = conn { debug!( "Closing connection to {} ({})", - hex::encode(c.peer_pk), + hex::encode(c.peer_id), c.remote_addr ); c.close(); @@ -272,24 +267,24 @@ impl NetApp { // call on_disconnected_handler immediately, since the connection // was removed - // (if pk == self.pubkey, we pretend we disconnected) - let pk = *pk; + // (if id == self.id, we pretend we disconnected) + let id = *id; let self2 = self.clone(); tokio::spawn(async move { if let Some(h) = self2.on_disconnected_handler.load().as_ref() { - h(pk, false); + h(id, false); } }); } /// Close the incoming connection from a certain client to us, /// if such a connection is currently open. - pub fn server_disconnect(self: &Arc, pk: &ed25519::PublicKey) { - let conn = self.server_conns.read().unwrap().get(pk).cloned(); + pub fn server_disconnect(self: &Arc, id: &NodeID) { + let conn = self.server_conns.read().unwrap().get(id).cloned(); if let Some(c) = conn { debug!( "Closing incoming connection from {} ({})", - hex::encode(c.peer_pk), + hex::encode(c.peer_id), c.remote_addr ); c.close(); @@ -301,7 +296,7 @@ impl NetApp { // Do not yet call the on_connected handler, because we don't know if the remote // has an actual IP address and port we can call them back on. // We will know this when they send a Hello message, which is handled below. - pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc) { + pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc) { info!("Accepted connection from {}", hex::encode(id)); self.server_conns.write().unwrap().insert(id, conn); @@ -312,11 +307,10 @@ impl NetApp { // At this point we know they are a full network member, and not just a client, // and we call the on_connected handler so that the peering strategy knows // we have a new potential peer - fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { + fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) { if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { - let remote_ip = msg.server_addr - .unwrap_or(c.remote_addr.ip()); + let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip()); let remote_addr = SocketAddr::new(remote_ip, msg.server_port); h(id, remote_addr, true); } @@ -326,7 +320,7 @@ impl NetApp { // Called from conn.rs when an incoming connection is closed. // We deregister the connection from server_conns and call the // handler registered by on_disconnected - pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc) { + pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc) { info!("Connection from {} closed", hex::encode(id)); let mut conn_list = self.server_conns.write().unwrap(); @@ -336,7 +330,7 @@ impl NetApp { drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { - h(conn.peer_pk, true); + h(conn.peer_id, true); } } } @@ -349,7 +343,7 @@ impl NetApp { // Since we are ourself listening, we send them a Hello message so that // they know on which port to call us back. (TODO: don't do this if we are // just a simple client and not a full p2p node) - pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc) { + pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc) { info!("Connection established to {}", hex::encode(id)); { @@ -360,22 +354,30 @@ impl NetApp { } if let Some(h) = self.on_connected_handler.load().as_ref() { - h(conn.peer_pk, conn.remote_addr, false); + h(conn.peer_id, conn.remote_addr, false); } - let server_addr = self.public_addr; - let server_port = self.listen_addr.port(); - tokio::spawn(async move { - conn.request(HelloMessage { server_addr, server_port }, PRIO_NORMAL) + if let Some(lp) = self.listen_params.load_full() { + let server_addr = lp.public_addr; + let server_port = lp.listen_addr.port(); + tokio::spawn(async move { + conn.request( + HelloMessage { + server_addr, + server_port, + }, + PRIO_NORMAL, + ) .await .log_err("Sending hello message"); - }); + }); + } } // Called from conn.rs when an outgoinc connection is closed. // The connection is removed from conn_list, and the on_disconnected handler // is called. - pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc) { + pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc) { info!("Connection to {} closed", hex::encode(id)); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { @@ -384,7 +386,7 @@ impl NetApp { drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { - h(conn.peer_pk, false); + h(conn.peer_id, false); } } } @@ -398,14 +400,14 @@ impl NetApp { /// The priority is an `u8`, with lower numbers meaning highest priority. pub async fn request( &self, - target: &ed25519::PublicKey, + target: &NodeID, rq: T, prio: RequestPriority, ) -> Result<::Response, Error> where T: Message + 'static, { - if *target == self.pubkey { + if *target == self.id { let handler = self.msg_handlers.load().get(&T::KIND).cloned(); match handler { None => Err(Error::Message(format!( diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index d6ca08a..a4b9248 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -8,11 +8,11 @@ use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::hash; -use sodiumoxide::crypto::sign::ed25519; use crate::message::*; use crate::netapp::*; use crate::proto::*; +use crate::NodeID; const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); const CONN_MAX_RETRIES: usize = 10; @@ -34,7 +34,7 @@ impl Message for PingMessage { #[derive(Serialize, Deserialize)] struct PeerListMessage { - pub list: Vec<(ed25519::PublicKey, SocketAddr)>, + pub list: Vec<(NodeID, SocketAddr)>, } impl Message for PeerListMessage { @@ -54,7 +54,7 @@ struct PeerInfo { #[derive(Copy, Clone, Debug)] pub struct PeerInfoPub { - pub id: ed25519::PublicKey, + pub id: NodeID, pub addr: SocketAddr, pub state: PeerConnState, pub last_seen: Option, @@ -86,7 +86,7 @@ pub enum PeerConnState { } struct KnownHosts { - list: HashMap, + list: HashMap, hash: hash::Digest, } @@ -99,9 +99,7 @@ impl KnownHosts { fn update_hash(&mut self) { self.hash = Self::calculate_hash(&self.list); } - fn map_into_vec( - input: &HashMap, - ) -> Vec<(ed25519::PublicKey, SocketAddr)> { + fn map_into_vec(input: &HashMap) -> Vec<(NodeID, SocketAddr)> { let mut list = Vec::with_capacity(input.len()); for (id, peer) in input.iter() { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { @@ -110,7 +108,7 @@ impl KnownHosts { } list } - fn calculate_hash(input: &HashMap) -> hash::Digest { + fn calculate_hash(input: &HashMap) -> hash::Digest { let mut list = Self::map_into_vec(input); list.sort(); let mut hash_state = hash::State::new(); @@ -129,15 +127,12 @@ pub struct FullMeshPeeringStrategy { } impl FullMeshPeeringStrategy { - pub fn new( - netapp: Arc, - bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, - ) -> Arc { + pub fn new(netapp: Arc, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc { let mut known_hosts = KnownHosts::new(); - for (pk, addr) in bootstrap_list { - if pk != netapp.pubkey { + for (id, addr) in bootstrap_list { + if id != netapp.id { known_hosts.list.insert( - pk, + id, PeerInfo { addr: addr, state: PeerConnState::Waiting(0, Instant::now()), @@ -155,20 +150,18 @@ impl FullMeshPeeringStrategy { }); let strat2 = strat.clone(); - netapp.add_msg_handler::( - move |from: ed25519::PublicKey, ping: PingMessage| { - let ping_resp = PingMessage { - id: ping.id, - peer_list_hash: strat2.known_hosts.read().unwrap().hash, - }; - debug!("Ping from {}", hex::encode(&from)); - async move { ping_resp } - }, - ); + netapp.add_msg_handler::(move |from: NodeID, ping: PingMessage| { + let ping_resp = PingMessage { + id: ping.id, + peer_list_hash: strat2.known_hosts.read().unwrap().hash, + }; + debug!("Ping from {}", hex::encode(&from)); + async move { ping_resp } + }); let strat2 = strat.clone(); netapp.add_msg_handler::( - move |_from: ed25519::PublicKey, peer_list: PeerListMessage| { + move |_from: NodeID, peer_list: PeerListMessage| { strat2.handle_peer_list(&peer_list.list[..]); let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); let resp = PeerListMessage { list: peer_list }; @@ -177,17 +170,15 @@ impl FullMeshPeeringStrategy { ); let strat2 = strat.clone(); - netapp.on_connected( - move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { - let strat2 = strat2.clone(); - tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); - }, - ); + netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| { + let strat2 = strat2.clone(); + tokio::spawn(strat2.on_connected(id, addr, is_incoming)); + }); let strat2 = strat.clone(); - netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| { + netapp.on_disconnected(move |id: NodeID, is_incoming: bool| { let strat2 = strat2.clone(); - tokio::spawn(strat2.on_disconnected(pk, is_incoming)); + tokio::spawn(strat2.on_disconnected(id, is_incoming)); }); strat @@ -254,7 +245,7 @@ impl FullMeshPeeringStrategy { } } - async fn ping(self: Arc, id: ed25519::PublicKey) { + async fn ping(self: Arc, id: NodeID) { let peer_list_hash = self.known_hosts.read().unwrap().hash; let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); let ping_time = Instant::now(); @@ -295,7 +286,7 @@ impl FullMeshPeeringStrategy { } } - async fn exchange_peers(self: Arc, id: &ed25519::PublicKey) { + async fn exchange_peers(self: Arc, id: &NodeID) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await { @@ -306,7 +297,7 @@ impl FullMeshPeeringStrategy { } } - fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) { + fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) { let mut known_hosts = self.known_hosts.write().unwrap(); for (id, addr) in list.iter() { if !known_hosts.list.contains_key(id) { @@ -315,7 +306,7 @@ impl FullMeshPeeringStrategy { } } - async fn try_connect(self: Arc, id: ed25519::PublicKey, addr: SocketAddr) { + async fn try_connect(self: Arc, id: NodeID, addr: SocketAddr) { let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await; if let Err(e) = conn_result { warn!("Error connecting to {}: {}", hex::encode(id), e); @@ -335,35 +326,30 @@ impl FullMeshPeeringStrategy { } } - async fn on_connected( - self: Arc, - pk: ed25519::PublicKey, - addr: SocketAddr, - is_incoming: bool, - ) { + async fn on_connected(self: Arc, id: NodeID, addr: SocketAddr, is_incoming: bool) { if is_incoming { - if !self.known_hosts.read().unwrap().list.contains_key(&pk) { + if !self.known_hosts.read().unwrap().list.contains_key(&id) { self.known_hosts .write() .unwrap() .list - .insert(pk, self.new_peer(&pk, addr)); + .insert(id, self.new_peer(&id, addr)); } } else { - info!("Successfully connected to {} at {}", hex::encode(&pk), addr); + info!("Successfully connected to {} at {}", hex::encode(&id), addr); let mut known_hosts = self.known_hosts.write().unwrap(); - if let Some(host) = known_hosts.list.get_mut(&pk) { + if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; known_hosts.update_hash(); } } } - async fn on_disconnected(self: Arc, pk: ed25519::PublicKey, is_incoming: bool) { + async fn on_disconnected(self: Arc, id: NodeID, is_incoming: bool) { if !is_incoming { - info!("Connection to {} was closed", hex::encode(pk)); + info!("Connection to {} was closed", hex::encode(id)); let mut known_hosts = self.known_hosts.write().unwrap(); - if let Some(host) = known_hosts.list.get_mut(&pk) { + if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Waiting(0, Instant::now()); known_hosts.update_hash(); } @@ -406,8 +392,8 @@ impl FullMeshPeeringStrategy { ret } - fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo { - let state = if *id == self.netapp.pubkey { + fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo { + let state = if *id == self.netapp.id { PeerConnState::Ourself } else { PeerConnState::Waiting(0, Instant::now()) diff --git a/src/util.rs b/src/util.rs index 017ef00..faeea79 100644 --- a/src/util.rs +++ b/src/util.rs @@ -2,6 +2,8 @@ use serde::Serialize; use tokio::sync::watch; +pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey; + /// Utility function: encodes any serializable value in MessagePack binary format /// using the RMP library. ///