use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::StreamExt; use tokio::net::{TcpListener, TcpStream}; use tokio::select; use tokio::sync::{mpsc, watch}; use crate::client::*; use crate::endpoint::*; use crate::error::*; use crate::message::*; use crate::server::*; use crate::util::*; /// Tag which is exchanged between client and server upon connection establishment /// to check that they are running compatible versions of Netapp, /// composed of 8 bytes for Netapp version and 8 bytes for client version pub(crate) type VersionTag = [u8; 16]; /// Value of the Netapp version used in the version tag pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700004; // netapp 0x0004 #[derive(Serialize, Deserialize, Debug, Clone)] pub(crate) struct HelloMessage { pub server_addr: Option, pub server_port: u16, } impl Message for HelloMessage { type Response = (); } type OnConnectHandler = Box; type OnDisconnectHandler = Box; /// NetApp is the main class that handles incoming and outgoing connections. /// /// NetApp can be used in a stand-alone fashion or together with a peering strategy. /// If using it alone, you will want to set `on_connect` and `on_disconnect` events /// in order to manage information about the current peer list. /// /// It is generally not necessary to use NetApp stand-alone, as the provided full mesh /// and RPS peering strategies take care of the most common use cases. pub struct NetApp { listen_params: ArcSwapOption, /// Version tag, 8 bytes for netapp version, 8 bytes for app version pub version_tag: VersionTag, /// Network secret key pub netid: auth::Key, /// Our peer ID pub id: NodeID, /// Private key associated with our peer ID pub privkey: ed25519::SecretKey, pub(crate) server_conns: RwLock>>, pub(crate) client_conns: RwLock>>, pub(crate) endpoints: RwLock>, hello_endpoint: ArcSwapOption>, on_connected_handler: ArcSwapOption, on_disconnected_handler: ArcSwapOption, } struct ListenParams { listen_addr: SocketAddr, public_addr: Option, } impl NetApp { /// Creates a new instance of NetApp, which can serve either as a full p2p node, /// or just as a passive client. To upgrade to a full p2p node, spawn a listener /// using `.listen()` /// /// Our Peer ID is the public key associated to the secret key given here. pub fn new(app_version_tag: u64, netid: auth::Key, privkey: ed25519::SecretKey) -> Arc { let mut version_tag = [0u8; 16]; version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]); version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]); let id = privkey.public_key(); let netapp = Arc::new(Self { listen_params: ArcSwapOption::new(None), version_tag, netid, id, privkey, server_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()), endpoints: RwLock::new(HashMap::new()), hello_endpoint: ArcSwapOption::new(None), on_connected_handler: ArcSwapOption::new(None), on_disconnected_handler: ArcSwapOption::new(None), }); netapp .hello_endpoint .swap(Some(netapp.endpoint("__netapp/netapp.rs/Hello".into()))); netapp .hello_endpoint .load_full() .unwrap() .set_handler(netapp.clone()); netapp } /// Set the handler to be called when a new connection (incoming or outgoing) has /// been successfully established. Do not set this if using a peering strategy, /// as the peering strategy will need to set this itself. pub fn on_connected(&self, handler: F) where F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static, { self.on_connected_handler .store(Some(Arc::new(Box::new(handler)))); } /// Set the handler to be called when an existing connection (incoming or outgoing) has /// been closed by either party. Do not set this if using a peering strategy, /// as the peering strategy will need to set this itself. pub fn on_disconnected(&self, handler: F) where F: Fn(NodeID, bool) + Sized + Send + Sync + 'static, { self.on_disconnected_handler .store(Some(Arc::new(Box::new(handler)))); } /// Create a new endpoint with path `path`, /// that handles messages of type `M`. /// `H` is the type of the object that should handle requests /// to this endpoint on the local node. If you don't want /// to handle request on the local node (e.g. if this node /// is only a client in the network), define the type `H` /// to be `()`. /// This function will panic if the endpoint has already been /// created. pub fn endpoint(self: &Arc, path: String) -> Arc> where M: Message + 'static, H: StreamingEndpointHandler + 'static, { let endpoint = Arc::new(Endpoint::::new(self.clone(), path.clone())); let endpoint_arc = EndpointArc(endpoint.clone()); if self .endpoints .write() .unwrap() .insert(path.clone(), Box::new(endpoint_arc)) .is_some() { panic!("Redefining endpoint: {}", path); }; endpoint } /// Main listening process for our app. This future runs during the whole /// run time of our application. /// If this is not called, the NetApp instance remains a passive client. pub async fn listen( self: Arc, listen_addr: SocketAddr, public_addr: Option, mut must_exit: watch::Receiver, ) { let listen_params = ListenParams { listen_addr, public_addr, }; if self .listen_params .swap(Some(Arc::new(listen_params))) .is_some() { error!("Trying to listen on NetApp but we're already listening!"); } let listener = TcpListener::bind(listen_addr).await.unwrap(); info!("Listening on {}", listen_addr); let (conn_in, mut conn_out) = mpsc::unbounded_channel(); let connection_collector = tokio::spawn(async move { let mut collection = FuturesUnordered::new(); loop { if collection.is_empty() { match conn_out.recv().await { Some(f) => collection.push(f), None => break, } } else { select! { new_fut = conn_out.recv() => { match new_fut { Some(f) => collection.push(f), None => break, } } result = collection.next() => { trace!("Collected connection: {:?}", result); } } } } debug!("Collecting last open server connections."); while let Some(conn_res) = collection.next().await { trace!("Collected connection: {:?}", conn_res); } debug!("No more server connections to collect"); }); while !*must_exit.borrow_and_update() { let (socket, peer_addr) = select! { sockres = listener.accept() => { match sockres { Ok(x) => x, Err(e) => { warn!("Error in listener.accept: {}", e); continue; } } }, _ = must_exit.changed() => continue, }; info!( "Incoming connection from {}, negotiating handshake...", peer_addr ); let self2 = self.clone(); let must_exit2 = must_exit.clone(); conn_in .send(tokio::spawn(async move { ServerConn::run(self2, socket, must_exit2) .await .log_err("ServerConn::run"); })) .log_err("Failed to send connection to connection collector"); } drop(conn_in); connection_collector .await .log_err("Failed to await for connection collector"); } /// Drop all endpoint handlers, as well as handlers for connection/disconnection /// events. (This disables the peering strategy) /// /// Use this when terminating to break reference cycles pub fn drop_all_handlers(&self) { for (_, endpoint) in self.endpoints.read().unwrap().iter() { endpoint.drop_handler(); } self.on_connected_handler.store(None); self.on_disconnected_handler.store(None); } /// Attempt to connect to a peer, given by its ip:port and its public key. /// The public key will be checked during the secret handshake process. /// This function returns once the connection has been established and a /// successfull handshake was made. At this point we can send messages to /// the other node with `Netapp::request` pub async fn try_connect(self: Arc, ip: SocketAddr, id: NodeID) -> Result<(), Error> { // Don't connect to ourself, we don't care // but pretend we did if id == self.id { tokio::spawn(async move { if let Some(h) = self.on_connected_handler.load().as_ref() { h(id, ip, false); } }); return Ok(()); } // Don't connect if already connected if self.client_conns.read().unwrap().contains_key(&id) { return Ok(()); } let socket = TcpStream::connect(ip).await?; info!("Connected to {}, negotiating handshake...", ip); ClientConn::init(self, socket, id).await?; Ok(()) } /// Close the outgoing connection we have to a node specified by its public key, /// if such a connection is currently open. pub fn disconnect(self: &Arc, id: &NodeID) { // If id is ourself, we're not supposed to have a connection open if *id != self.id { let conn = self.client_conns.write().unwrap().remove(id); if let Some(c) = conn { debug!( "Closing connection to {} ({})", hex::encode(&c.peer_id[..8]), c.remote_addr ); c.close(); } else { return; } } // call on_disconnected_handler immediately, since the connection // was removed // (if id == self.id, we pretend we disconnected) let id = *id; let self2 = self.clone(); tokio::spawn(async move { if let Some(h) = self2.on_disconnected_handler.load().as_ref() { h(id, false); } }); } // Called from conn.rs when an incoming connection is successfully established // Registers the connection in our list of connections // Do not yet call the on_connected handler, because we don't know if the remote // has an actual IP address and port we can call them back on. // We will know this when they send a Hello message, which is handled below. pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc) { info!( "Accepted connection from {} at {}", hex::encode(&id[..8]), conn.remote_addr ); self.server_conns.write().unwrap().insert(id, conn); } // Handle hello message from a client. This message is used for them to tell us // that they are listening on a certain port number on which we can call them back. // At this point we know they are a full network member, and not just a client, // and we call the on_connected handler so that the peering strategy knows // we have a new potential peer // Called from conn.rs when an incoming connection is closed. // We deregister the connection from server_conns and call the // handler registered by on_disconnected pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc) { info!("Connection from {} closed", hex::encode(&id[..8])); let mut conn_list = self.server_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_id, true); } } } } // Called from conn.rs when an outgoinc connection is successfully established. // The connection is registered in self.client_conns, and the // on_connected handler is called. // // Since we are ourself listening, we send them a Hello message so that // they know on which port to call us back. (TODO: don't do this if we are // just a simple client and not a full p2p node) pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc) { info!("Connection established to {}", hex::encode(&id[..8])); { let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone()); if let Some(old_c) = old_c_opt { tokio::spawn(async move { old_c.close() }); } } if let Some(h) = self.on_connected_handler.load().as_ref() { h(conn.peer_id, conn.remote_addr, false); } if let Some(lp) = self.listen_params.load_full() { let server_addr = lp.public_addr; let server_port = lp.listen_addr.port(); let hello_endpoint = self.hello_endpoint.load_full().unwrap(); tokio::spawn(async move { hello_endpoint .call( &conn.peer_id, HelloMessage { server_addr, server_port, }, PRIO_NORMAL, ) .await .map(|_| ()) .log_err("Sending hello message"); }); } } // Called from conn.rs when an outgoinc connection is closed. // The connection is removed from conn_list, and the on_disconnected handler // is called. pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc) { info!("Connection to {} closed", hex::encode(&id[..8])); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_id, false); } } } // else case: happens if connection was removed in .disconnect() // in which case on_disconnected_handler was already called } } #[async_trait] impl EndpointHandler for NetApp { async fn handle(self: &Arc, msg: &HelloMessage, from: NodeID) { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&from) { let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip()); let remote_addr = SocketAddr::new(remote_ip, msg.server_port); h(from, remote_addr, true); } } } }