diff --git a/examples/basalt.rs b/examples/basalt.rs index bf00547..d6ce520 100644 --- a/examples/basalt.rs +++ b/examples/basalt.rs @@ -9,7 +9,7 @@ use structopt::StructOpt; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; -use netapp::netapp::*; +use netapp::NetApp; use netapp::peering::basalt::*; #[derive(StructOpt, Debug)] diff --git a/examples/fullmesh.rs b/examples/fullmesh.rs index 8e2ae07..af315fb 100644 --- a/examples/fullmesh.rs +++ b/examples/fullmesh.rs @@ -7,8 +7,8 @@ use structopt::StructOpt; use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; -use netapp::netapp::*; use netapp::peering::fullmesh::*; +use netapp::NetApp; #[derive(StructOpt, Debug)] #[structopt(name = "netapp")] diff --git a/src/conn.rs b/src/conn.rs index df3b7cf..7f869e8 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -23,10 +23,12 @@ use crate::netapp::*; use crate::proto::*; use crate::util::*; -pub struct ServerConn { +pub(crate) struct ServerConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_pk: ed25519::PublicKey, + netapp: Arc, - pub remote_addr: SocketAddr, - pub peer_pk: ed25519::PublicKey, + resp_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec)>, close_send: watch::Sender, } @@ -115,10 +117,10 @@ impl RecvLoop for ServerConn { } } } -pub struct ClientConn { - pub netapp: Arc, - pub remote_addr: SocketAddr, - pub peer_pk: ed25519::PublicKey, +pub(crate) struct ClientConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_pk: ed25519::PublicKey, + query_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec)>, next_query_number: AtomicU16, resp_send: mpsc::UnboundedSender<(RequestID, Vec)>, @@ -167,7 +169,6 @@ impl ClientConn { let (close_send, close_recv) = watch::channel(false); let conn = Arc::new(ClientConn { - netapp: netapp.clone(), remote_addr, peer_pk: remote_pk.clone(), next_query_number: AtomicU16::from(0u16), diff --git a/src/error.rs b/src/error.rs index b54423a..44a3d50 100644 --- a/src/error.rs +++ b/src/error.rs @@ -41,6 +41,8 @@ impl From> for Error { } } +/// Ths trait adds a `.log_err()` method on `Result<(), E>` types, +/// which dismisses the error by logging it to stderr. pub trait LogError { fn log_err(self, msg: &'static str); } diff --git a/src/lib.rs b/src/lib.rs index 2db8a22..ba365c7 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,9 +1,29 @@ +//! Netapp is a Rust library that takes care of a few common tasks in distributed software: +//! +//! - establishing secure connections +//! - managing connection lifetime, reconnecting on failure +//! - checking peer's state +//! - peer discovery +//! - query/response message passing model for communications +//! - multiplexing transfers over a connection +//! - overlay networks: full mesh or random peer sampling using Basalt +//! +//! Of particular interest, read the documentation for the `netapp::NetApp` type, +//! the `message::Message` trait, and `proto::RequestPriority` to learn more +//! about message priorization. +//! Also check out the examples to learn how to use this crate. + #![feature(map_first_last)] -pub mod conn; pub mod error; +pub mod util; + +pub mod proto; pub mod message; + +mod conn; + pub mod netapp; pub mod peering; -pub mod proto; -pub mod util; + +pub use netapp::*; diff --git a/src/message.rs b/src/message.rs index bcc5aac..bd54523 100644 --- a/src/message.rs +++ b/src/message.rs @@ -2,6 +2,21 @@ use serde::{Deserialize, Serialize}; pub type MessageKind = u32; +/// This trait should be implemented by all messages your application +/// wants to handle (click to read more). +/// +/// It defines a `KIND`, which should be a **unique** +/// `u32` that distinguishes these messages from other types of messages +/// (it is used by our communication protocol), as well as an associated +/// `Response` type that defines the type of the response that is given +/// to the message. It is your responsibility to ensure that `KIND` is a +/// unique `u32` that is not used by any other protocol messages. +/// All `KIND` values of the form `0x42xxxxxx` are reserved by the netapp +/// crate for internal purposes. +/// +/// A handler for this message has type `Self -> Self::Response`. +/// If you need to return an error, the `Response` type should be +/// a `Result<_, _>`. pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync { const KIND: MessageKind; type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync; diff --git a/src/netapp.rs b/src/netapp.rs index 25c3b5a..e903f44 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -33,17 +33,31 @@ pub(crate) struct Handler { >, } +/// NetApp is the main class that handles incoming and outgoing connections. +/// +/// The `request()` method can be used to send a message to any peer to which we have +/// an outgoing connection, or to ourself. On the server side, these messages are +/// processed by the handlers that have been defined using `add_msg_handler()`. +/// +/// NetApp can be used in a stand-alone fashion or together with a peering strategy. +/// If using it alone, you will want to set `on_connect` and `on_disconnect` events +/// in order to manage information about the current peer list. +/// +/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh +/// and RPS peering strategies take care of the most common use cases. pub struct NetApp { pub listen_addr: SocketAddr, pub netid: auth::Key, pub pubkey: ed25519::PublicKey, pub privkey: ed25519::SecretKey, - pub server_conns: RwLock>>, - pub client_conns: RwLock>>, + + server_conns: RwLock>>, + client_conns: RwLock>>, + pub(crate) msg_handlers: ArcSwap>>, - pub(crate) on_connected: + on_connected_handler: ArcSwapOption>, - pub(crate) on_disconnected: ArcSwapOption>, + on_disconnected_handler: ArcSwapOption>, } async fn net_handler_aux( @@ -78,13 +92,14 @@ where F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, R: Future::Response> + Send + Sync, { - debug!("Handling message of kind {:08x} from ourself", M::KIND,); + debug!("Handling message of kind {:08x} from ourself", M::KIND); let msg = (msg as Box).downcast::().unwrap(); let res = handler(remote, *msg).await; Box::new(res) } impl NetApp { + /// Creates a new instance of NetApp. No background process is pub fn new( listen_addr: SocketAddr, netid: auth::Key, @@ -99,8 +114,8 @@ impl NetApp { server_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()), msg_handlers: ArcSwap::new(Arc::new(HashMap::new())), - on_connected: ArcSwapOption::new(None), - on_disconnected: ArcSwapOption::new(None), + on_connected_handler: ArcSwapOption::new(None), + on_disconnected_handler: ArcSwapOption::new(None), }); let netapp2 = netapp.clone(); @@ -114,6 +129,26 @@ impl NetApp { netapp } + /// Set the handler to be called when a new connection (incoming or outgoing) has + /// been successfully established. Do not set this if using a peering strategy, + /// as the peering strategy will need to set this itself. + pub fn on_connected(&self, handler: F) + where F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static + { + self.on_connected_handler.store(Some(Arc::new(Box::new(handler)))); + } + + /// Set the handler to be called when an existing connection (incoming or outgoing) has + /// been closed by either party. Do not set this if using a peering strategy, + /// as the peering strategy will need to set this itself. + pub fn on_disconnected(&self, handler: F) + where F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static + { + self.on_disconnected_handler.store(Some(Arc::new(Box::new(handler)))); + } + + /// Add a handler for a certain message type. Note that only one handler + /// can be specified for each message type. pub fn add_msg_handler(&self, handler: F) where M: Message + 'static, @@ -122,10 +157,10 @@ impl NetApp { { let handler = Arc::new(handler); - let handler1 = handler.clone(); + let handler2 = handler.clone(); let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| { let fun: Pin> + Sync + Send>> = - Box::pin(net_handler_aux(handler1.clone(), remote, bytes)); + Box::pin(net_handler_aux(handler2.clone(), remote, bytes)); fun }); @@ -146,6 +181,8 @@ impl NetApp { self.msg_handlers.store(Arc::new(handlers)); } + /// Main listening process for our app. This future runs during the whole + /// run time of our application. pub async fn listen(self: Arc) { let mut listener = TcpListener::bind(self.listen_addr).await.unwrap(); info!("Listening on {}", self.listen_addr); @@ -166,16 +203,21 @@ impl NetApp { } } + /// Attempt to connect to a peer, given by its ip:port and its public key. + /// The public key will be checked during the secret handshake process. + /// This function returns once the connection has been established and a + /// successfull handshake was made. At this point we can send messages to + /// the other node with `Netapp::request` pub async fn try_connect( self: Arc, ip: SocketAddr, pk: ed25519::PublicKey, ) -> Result<(), Error> { + // Don't connect to ourself, we don't care + // but pretend we did if pk == self.pubkey { - // Don't connect to ourself, we don't care - // but pretend we did tokio::spawn(async move { - if let Some(h) = self.on_connected.load().as_ref() { + if let Some(h) = self.on_connected_handler.load().as_ref() { h(pk, ip, false); } }); @@ -193,11 +235,16 @@ impl NetApp { Ok(()) } - pub fn disconnect(self: Arc, pk: &ed25519::PublicKey) { + /// Close the outgoing connection we have to a node specified by its public key, + /// if such a connection is currently open. + pub fn disconnect(self: &Arc, pk: &ed25519::PublicKey) { + // Don't disconnect from ourself (we aren't connected anyways) + // but pretend we did if *pk == self.pubkey { let pk = *pk; + let self2 = self.clone(); tokio::spawn(async move { - if let Some(h) = self.on_disconnected.load().as_ref() { + if let Some(h) = self2.on_disconnected_handler.load().as_ref() { h(pk, false); } }); @@ -206,10 +253,30 @@ impl NetApp { let conn = self.client_conns.read().unwrap().get(pk).cloned(); if let Some(c) = conn { + debug!("Closing connection to {} ({})", + hex::encode(c.peer_pk), + c.remote_addr); c.close(); } } + /// Close the incoming connection from a certain client to us, + /// if such a connection is currently open. + pub fn server_disconnect(self: &Arc, pk: &ed25519::PublicKey) { + let conn = self.server_conns.read().unwrap().get(pk).cloned(); + if let Some(c) = conn { + debug!("Closing incoming connection from {} ({})", + hex::encode(c.peer_pk), + c.remote_addr); + c.close(); + } + } + + // Called from conn.rs when an incoming connection is successfully established + // Registers the connection in our list of connections + // Do not yet call the on_connected handler, because we don't know if the remote + // has an actual IP address and port we can call them back on. + // We will know this when they send a Hello message, which is handled below. pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc) { info!("Accepted connection from {}", hex::encode(id)); @@ -217,8 +284,13 @@ impl NetApp { conn_list.insert(id.clone(), conn); } + // Handle hello message from a client. This message is used for them to tell us + // that they are listening on a certain port number on which we can call them back. + // At this point we know they are a full network member, and not just a client, + // and we call the on_connected handler so that the peering strategy knows + // we have a new potential peer fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { - if let Some(h) = self.on_connected.load().as_ref() { + if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(c) = self.server_conns.read().unwrap().get(&id) { let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port); h(id, remote_addr, true); @@ -226,6 +298,9 @@ impl NetApp { } } + // Called from conn.rs when an incoming connection is closed. + // We deregister the connection from server_conns and call the + // handler registered by on_disconnected pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc) { info!("Connection from {} closed", hex::encode(id)); @@ -235,12 +310,19 @@ impl NetApp { conn_list.remove(id); } - if let Some(h) = self.on_disconnected.load().as_ref() { + if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_pk, true); } } } + // Called from conn.rs when an outgoinc connection is successfully established. + // The connection is registered in self.client_conns, and the + // on_connected handler is called. + // + // Since we are ourself listening, we send them a Hello message so that + // they know on which port to call us back. (TODO: don't do this if we are + // just a simple client and not a full p2p node) pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc) { info!("Connection established to {}", hex::encode(id)); @@ -251,32 +333,39 @@ impl NetApp { } } - if let Some(h) = self.on_connected.load().as_ref() { + if let Some(h) = self.on_connected_handler.load().as_ref() { h(conn.peer_pk, conn.remote_addr, false); } + let server_port = self.listen_addr.port(); tokio::spawn(async move { - let server_port = conn.netapp.listen_addr.port(); - conn.request(HelloMessage { server_port }, prio::NORMAL) + conn.request(HelloMessage { server_port }, PRIO_NORMAL) .await .log_err("Sending hello message"); }); } + // Called from conn.rs when an outgoinc connection is closed. + // The connection is removed from conn_list, and the on_disconnected handler + // is called. pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc) { info!("Connection to {} closed", hex::encode(id)); let mut conn_list = self.client_conns.write().unwrap(); if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); - } - if let Some(h) = self.on_disconnected.load().as_ref() { - h(conn.peer_pk, false); + if let Some(h) = self.on_disconnected_handler.load().as_ref() { + h(conn.peer_pk, false); + } } } } + /// Send a message to a remote host to which a client connection is already + /// established, and await their response. The target is the id of the peer we + /// want to send the message to. + /// The priority is an `u8`, with lower numbers meaning highest priority. pub async fn request( &self, target: &ed25519::PublicKey, diff --git a/src/peering/basalt.rs b/src/peering/basalt.rs index 27461ab..5da65e0 100644 --- a/src/peering/basalt.rs +++ b/src/peering/basalt.rs @@ -264,18 +264,18 @@ impl Basalt { }); let basalt2 = basalt.clone(); - netapp.on_connected.store(Some(Arc::new(Box::new( + netapp.on_connected( move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(pk, addr, is_incoming); - }, - )))); + } + ); let basalt2 = basalt.clone(); - netapp.on_disconnected.store(Some(Arc::new(Box::new( + netapp.on_disconnected( move |pk: ed25519::PublicKey, is_incoming: bool| { basalt2.on_disconnected(pk, is_incoming); }, - )))); + ); let basalt2 = basalt.clone(); netapp.add_msg_handler::( @@ -331,7 +331,7 @@ impl Basalt { async fn do_pull(self: Arc, peer: ed25519::PublicKey) { match self .netapp - .request(&peer, PullMessage {}, prio::NORMAL) + .request(&peer, PullMessage {}, PRIO_NORMAL) .await { Ok(resp) => { @@ -345,7 +345,7 @@ impl Basalt { async fn do_push(self: Arc, peer: ed25519::PublicKey) { let push_msg = self.make_push_message(); - if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await { + if let Err(e) = self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { warn!("Error during push exchange: {}", e); } } @@ -448,17 +448,9 @@ impl Basalt { } fn close_all_diff(&self, prev_peers: &HashSet, new_peers: &HashSet) { - let client_conns = self.netapp.client_conns.read().unwrap(); for peer in prev_peers.iter() { if !new_peers.contains(peer) { - if let Some(c) = client_conns.get(&peer.id) { - debug!( - "Closing connection to {} ({})", - hex::encode(peer.id), - peer.addr - ); - c.close(); - } + self.netapp.disconnect(&peer.id); } } } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 4e9a78d..1b26489 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -177,20 +177,20 @@ impl FullMeshPeeringStrategy { ); let strat2 = strat.clone(); - netapp.on_connected.store(Some(Arc::new(Box::new( + netapp.on_connected( move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { let strat2 = strat2.clone(); tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); }, - )))); + ); let strat2 = strat.clone(); - netapp.on_disconnected.store(Some(Arc::new(Box::new( + netapp.on_disconnected( move |pk: ed25519::PublicKey, is_incoming: bool| { let strat2 = strat2.clone(); tokio::spawn(strat2.on_disconnected(pk, is_incoming)); }, - )))); + ); strat } @@ -271,7 +271,7 @@ impl FullMeshPeeringStrategy { hex::encode(id), ping_time ); - match self.netapp.request(&id, ping_msg, prio::HIGH).await { + match self.netapp.request(&id, ping_msg, PRIO_HIGH).await { Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e), Ok(ping_resp) => { let resp_time = Instant::now(); @@ -300,7 +300,7 @@ impl FullMeshPeeringStrategy { async fn exchange_peers(self: Arc, id: &ed25519::PublicKey) { let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let pex_message = PeerListMessage { list: peer_list }; - match self.netapp.request(id, pex_message, prio::BACKGROUND).await { + match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await { Err(e) => warn!("Error doing peer exchange: {}", e), Ok(resp) => { self.handle_peer_list(&resp.list[..]); diff --git a/src/proto.rs b/src/proto.rs index 58c914e..b044280 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -16,19 +16,36 @@ use crate::error::*; use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat}; +/// Priority of a request (click to read more about priorities). +/// +/// This priority value is used to priorize messages +/// in the send queue of the client, and their responses in the send queue of the +/// server. Lower values mean higher priority. +/// +/// This mechanism is usefull for messages bigger than the maximum chunk size +/// (set at `0x4000` bytes), such as large file transfers. +/// In such case, all of the messages in the send queue with the highest priority +/// will take turns to send individual chunks, in a round-robin fashion. +/// Once all highest priority messages are sent successfully, the messages with +/// the next highest priority will begin being sent in the same way. +/// +/// The same priority value is given to a request and to its associated response. +pub type RequestPriority = u8; + +/// Priority class: high +pub const PRIO_HIGH: RequestPriority = 0x20; +/// Priority class: normal +pub const PRIO_NORMAL: RequestPriority = 0x40; +/// Priority class: background +pub const PRIO_BACKGROUND: RequestPriority = 0x80; +/// Priority: primary among given class +pub const PRIO_PRIMARY: RequestPriority = 0x00; +/// Priority: secondary among given class (ex: `PRIO_HIGH || PRIO_SECONDARY`) +pub const PRIO_SECONDARY: RequestPriority = 0x01; + const MAX_CHUNK_SIZE: usize = 0x4000; -pub mod prio { - pub const HIGH: u8 = 0x20; - pub const NORMAL: u8 = 0x40; - pub const BACKGROUND: u8 = 0x80; - - pub const PRIMARY: u8 = 0x00; - pub const SECONDARY: u8 = 0x01; -} - -pub type RequestID = u16; -pub type RequestPriority = u8; +pub(crate) type RequestID = u16; struct SendQueueItem { id: RequestID, diff --git a/src/util.rs b/src/util.rs index e83822e..f09a3bc 100644 --- a/src/util.rs +++ b/src/util.rs @@ -1,6 +1,10 @@ use serde::Serialize; -// util +/// Utility function: encodes any serializable value in MessagePack binary format +/// using the RMP library. +/// +/// Field names and variant names are included in the serialization. +/// This is used internally by the netapp communication protocol. pub fn rmp_to_vec_all_named(val: &T) -> Result, rmp_serde::encode::Error> where T: Serialize + ?Sized,