use std::any::Any; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr}; use std::pin::Pin; use std::sync::{Arc, RwLock}; use std::time::Instant; use std::future::Future; use log::{debug, info}; use arc_swap::{ArcSwap, ArcSwapOption}; use bytes::Bytes; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; use tokio::net::{TcpListener, TcpStream}; use crate::conn::*; use crate::error::*; use crate::message::*; use crate::proto::*; use crate::util::*; type DynMsg = Box; type OnConnectHandler = Box; type OnDisconnectHandler = Box; pub(crate) type LocalHandler = Box Pin + Sync + Send>> + Sync + Send>; pub(crate) type NetHandler = Box< dyn Fn(NodeID, Bytes) -> Pin> + Sync + Send>> + Sync + Send, >; pub(crate) struct Handler { pub(crate) local_handler: LocalHandler, pub(crate) net_handler: NetHandler, } /// NetApp is the main class that handles incoming and outgoing connections. /// /// The `request()` method can be used to send a message to any peer to which we have /// an outgoing connection, or to ourself. On the server side, these messages are /// processed by the handlers that have been defined using `add_msg_handler()`. /// /// NetApp can be used in a stand-alone fashion or together with a peering strategy. /// If using it alone, you will want to set `on_connect` and `on_disconnect` events /// in order to manage information about the current peer list. /// /// It is generally not necessary to use NetApp stand-alone, as the provided full mesh /// and RPS peering strategies take care of the most common use cases. 
pub struct NetApp {
    /// Listening parameters; `None` until `listen()` stores them.
    listen_params: ArcSwapOption<ListenParams>,

    /// Network secret key
    pub netid: auth::Key,
    /// Our peer ID
    pub id: NodeID,
    /// Private key associated with our peer ID
    pub privkey: ed25519::SecretKey,

    /// Incoming connections, indexed by the id of the remote peer.
    server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
    /// Outgoing connections, indexed by the id of the remote peer.
    client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,

    /// Registered handlers, indexed by `Message::KIND`.
    // NOTE(review): the key type is assumed to be the crate's `MessageKind`
    // alias (the type of `Message::KIND`) -- confirm against message.rs.
    pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
    on_connected_handler: ArcSwapOption<OnConnectHandler>,
    on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
}

/// Parameters recorded when `NetApp::listen` is called.
struct ListenParams {
    /// Address the TCP listener is bound to.
    listen_addr: SocketAddr,
    /// IP address advertised to peers in the Hello message, if any.
    public_addr: Option<IpAddr>,
}

/// Handles a request of type `M` received from the network.
///
/// The request is decoded from `bytes` with `rmp_serde` and passed to `handler`.
/// The value that gets encoded is a `Result`: `Ok(response)` on success, or
/// `Err(description)` if the request could not be decoded. If encoding that
/// value fails, an empty byte vector is returned.
async fn net_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, bytes: Bytes) -> Vec<u8>
where
    M: Message + 'static,
    F: Fn(NodeID, M) -> R + Send + Sync + 'static,
    R: Future<Output = <M as Message>::Response> + Send + Sync,
{
    debug!(
        "Handling message of kind {:08x} from {}",
        M::KIND,
        hex::encode(remote)
    );
    let begin_time = Instant::now();
    let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
        Ok(msg) => Ok(handler(remote, msg).await),
        Err(e) => Err(e.to_string()),
    };
    debug!(
        "Request {:08x} from {} handled in {}msec",
        M::KIND,
        hex::encode(remote),
        begin_time.elapsed().as_millis()
    );
    rmp_to_vec_all_named(&res).unwrap_or_default()
}

/// Handles a request of type `M` that we sent to ourself.
///
/// # Panics
///
/// Panics if `msg` is not actually a boxed `M`.
async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg
where
    M: Message + 'static,
    F: Fn(NodeID, M) -> R + Send + Sync + 'static,
    R: Future<Output = <M as Message>::Response> + Send + Sync,
{
    debug!("Handling message of kind {:08x} from ourself", M::KIND);
    let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
    let res = handler(remote, *msg).await;
    Box::new(res)
}

impl NetApp {
    /// Creates a new instance of NetApp, which can serve either as a full p2p node,
    /// or just as a passive client. To upgrade to a full p2p node, spawn a listener
    /// using `.listen()`
    ///
    /// Our Peer ID is the public key associated to the secret key given here.
pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc { let id = privkey.public_key(); let netapp = Arc::new(Self { listen_params: ArcSwapOption::new(None), netid, id, privkey, server_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()), msg_handlers: ArcSwap::new(Arc::new(HashMap::new())), on_connected_handler: ArcSwapOption::new(None), on_disconnected_handler: ArcSwapOption::new(None), }); let netapp2 = netapp.clone(); netapp.add_msg_handler::(move |from: NodeID, msg: HelloMessage| { netapp2.handle_hello_message(from, msg); async {} }); netapp } /// Set the handler to be called when a new connection (incoming or outgoing) has /// been successfully established. Do not set this if using a peering strategy, /// as the peering strategy will need to set this itself. pub fn on_connected(&self, handler: F) where F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static, { self.on_connected_handler .store(Some(Arc::new(Box::new(handler)))); } /// Set the handler to be called when an existing connection (incoming or outgoing) has /// been closed by either party. Do not set this if using a peering strategy, /// as the peering strategy will need to set this itself. pub fn on_disconnected(&self, handler: F) where F: Fn(NodeID, bool) + Sized + Send + Sync + 'static, { self.on_disconnected_handler .store(Some(Arc::new(Box::new(handler)))); } /// Add a handler for a certain message type. Note that only one handler /// can be specified for each message type. /// The handler is an asynchronous function, i.e. a function that returns /// a future. 
pub fn add_msg_handler(&self, handler: F) where M: Message + 'static, F: Fn(NodeID, M) -> R + Send + Sync + 'static, R: Future::Response> + Send + Sync + 'static, { let handler = Arc::new(handler); let handler2 = handler.clone(); let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| { let fun: Pin> + Sync + Send>> = Box::pin(net_handler_aux(handler2.clone(), remote, bytes)); fun }); let self_id = self.id; let local_handler = Box::new(move |msg: DynMsg| { let fun: Pin + Sync + Send>> = Box::pin(local_handler_aux(handler.clone(), self_id, msg)); fun }); let funs = Arc::new(Handler { net_handler, local_handler, }); let mut handlers = self.msg_handlers.load().as_ref().clone(); handlers.insert(M::KIND, funs); self.msg_handlers.store(Arc::new(handlers)); } /// Main listening process for our app. This future runs during the whole /// run time of our application. /// If this is not called, the NetApp instance remains a passive client. pub async fn listen(self: Arc, listen_addr: SocketAddr, public_addr: Option) { let listen_params = ListenParams { listen_addr, public_addr, }; self.listen_params.store(Some(Arc::new(listen_params))); let listener = TcpListener::bind(listen_addr).await.unwrap(); info!("Listening on {}", listen_addr); loop { // The second item contains the IP and port of the new connection. let (socket, _) = listener.accept().await.unwrap(); info!( "Incoming connection from {}, negotiating handshake...", match socket.peer_addr() { Ok(x) => format!("{}", x), Err(e) => format!("", e), } ); let self2 = self.clone(); tokio::spawn(async move { ServerConn::run(self2, socket) .await .log_err("ServerConn::run"); }); } } /// Attempt to connect to a peer, given by its ip:port and its public key. /// The public key will be checked during the secret handshake process. /// This function returns once the connection has been established and a /// successfull handshake was made. 
At this point we can send messages to /// the other node with `Netapp::request` pub async fn try_connect(self: Arc, ip: SocketAddr, id: NodeID) -> Result<(), Error> { // Don't connect to ourself, we don't care // but pretend we did if id == self.id { tokio::spawn(async move { if let Some(h) = self.on_connected_handler.load().as_ref() { h(id, ip, false); } }); return Ok(()); } // Don't connect if already connected if self.client_conns.read().unwrap().contains_key(&id) { return Ok(()); } let socket = TcpStream::connect(ip).await?; info!("Connected to {}, negotiating handshake...", ip); ClientConn::init(self, socket, id).await?; Ok(()) } /// Close the outgoing connection we have to a node specified by its public key, /// if such a connection is currently open. pub fn disconnect(self: &Arc, id: &NodeID) { // If id is ourself, we're not supposed to have a connection open if *id != self.id { let conn = self.client_conns.write().unwrap().remove(id); if let Some(c) = conn { debug!( "Closing connection to {} ({})", hex::encode(c.peer_id), c.remote_addr ); c.close(); } else { return; } } // call on_disconnected_handler immediately, since the connection // was removed // (if id == self.id, we pretend we disconnected) let id = *id; let self2 = self.clone(); tokio::spawn(async move { if let Some(h) = self2.on_disconnected_handler.load().as_ref() { h(id, false); } }); } /// Close the incoming connection from a certain client to us, /// if such a connection is currently open. 
pub fn server_disconnect(self: &Arc, id: &NodeID) { let conn = self.server_conns.read().unwrap().get(id).cloned(); if let Some(c) = conn { debug!( "Closing incoming connection from {} ({})", hex::encode(c.peer_id), c.remote_addr ); c.close(); } } // Called from conn.rs when an incoming connection is successfully established // Registers the connection in our list of connections // Do not yet call the on_connected handler, because we don't know if the remote // has an actual IP address and port we can call them back on. // We will know this when they send a Hello message, which is handled below. pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc) { info!("Accepted connection from {}", hex::encode(id)); self.server_conns.write().unwrap().insert(id, conn); } // Handle hello message from a client. This message is used for them to tell us // that they are listening on a certain port number on which we can call them back. // At this point we know they are a full network member, and not just a client, // and we call the on_connected handler so that the peering strategy knows // we have a new potential peer fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) { if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip()); let remote_addr = SocketAddr::new(remote_ip, msg.server_port); h(id, remote_addr, true); } } } // Called from conn.rs when an incoming connection is closed. 
// We deregister the connection from server_conns and call the // handler registered by on_disconnected pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc) { info!("Connection from {} closed", hex::encode(id)); let mut conn_list = self.server_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_id, true); } } } } // Called from conn.rs when an outgoinc connection is successfully established. // The connection is registered in self.client_conns, and the // on_connected handler is called. // // Since we are ourself listening, we send them a Hello message so that // they know on which port to call us back. (TODO: don't do this if we are // just a simple client and not a full p2p node) pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc) { info!("Connection established to {}", hex::encode(id)); { let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone()); if let Some(old_c) = old_c_opt { tokio::spawn(async move { old_c.close() }); } } if let Some(h) = self.on_connected_handler.load().as_ref() { h(conn.peer_id, conn.remote_addr, false); } if let Some(lp) = self.listen_params.load_full() { let server_addr = lp.public_addr; let server_port = lp.listen_addr.port(); tokio::spawn(async move { conn.request( HelloMessage { server_addr, server_port, }, PRIO_NORMAL, ) .await .log_err("Sending hello message"); }); } } // Called from conn.rs when an outgoinc connection is closed. // The connection is removed from conn_list, and the on_disconnected handler // is called. 
pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc) { info!("Connection to {} closed", hex::encode(id)); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_id, false); } } } // else case: happens if connection was removed in .disconnect() // in which case on_disconnected_handler was already called } /// Send a message to a remote host to which a client connection is already /// established, and await their response. The target is the id of the peer we /// want to send the message to. /// The priority is an `u8`, with lower numbers meaning highest priority. pub async fn request( &self, target: &NodeID, rq: T, prio: RequestPriority, ) -> Result<::Response, Error> where T: Message + 'static, { if *target == self.id { let handler = self.msg_handlers.load().get(&T::KIND).cloned(); match handler { None => Err(Error::Message(format!( "No handler registered for message kind {:08x}", T::KIND ))), Some(h) => { let local_handler = &h.local_handler; let res = local_handler(Box::new(rq)).await; let res_t = (res as Box) .downcast::<::Response>() .unwrap(); Ok(*res_t) } } } else { let conn = self.client_conns.read().unwrap().get(target).cloned(); match conn { None => Err(Error::Message(format!( "Not connected: {}", hex::encode(target) ))), Some(c) => c.request(rq, prio).await, } } } }