448 lines
14 KiB
Rust
448 lines
14 KiB
Rust
use std::any::Any;
|
|
use std::collections::HashMap;
|
|
use std::net::{IpAddr, SocketAddr};
|
|
use std::pin::Pin;
|
|
use std::sync::{Arc, RwLock};
|
|
use std::time::Instant;
|
|
|
|
use std::future::Future;
|
|
|
|
use log::{debug, info};
|
|
|
|
use arc_swap::{ArcSwap, ArcSwapOption};
|
|
use bytes::Bytes;
|
|
|
|
use sodiumoxide::crypto::auth;
|
|
use sodiumoxide::crypto::sign::ed25519;
|
|
use tokio::net::{TcpListener, TcpStream};
|
|
|
|
use crate::conn::*;
|
|
use crate::error::*;
|
|
use crate::message::*;
|
|
use crate::proto::*;
|
|
use crate::util::*;
|
|
|
|
/// Type-erased, heap-allocated value used to carry a request to a local handler
/// and to carry its response back; both sides recover the concrete type by downcasting.
type DynMsg = Box<dyn Any + Send + Sync + 'static>;
|
|
|
|
/// Callback invoked with `(peer id, peer address, is_incoming)`.
/// In this file the flag is `true` when called for an incoming (server-side) connection
/// and `false` when called for an outgoing (client-side) connection.
type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
|
|
/// Callback invoked with `(peer id, is_incoming)` when a connection goes away.
/// In this file the flag is `true` for incoming (server-side) connections
/// and `false` for outgoing (client-side) connections.
type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
|
|
|
|
pub(crate) type LocalHandler =
|
|
Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>;
|
|
pub(crate) type NetHandler = Box<
|
|
dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send,
|
|
>;
|
|
|
|
pub(crate) struct Handler {
|
|
pub(crate) local_handler: LocalHandler,
|
|
pub(crate) net_handler: NetHandler,
|
|
}
|
|
|
|
/// NetApp is the main class that handles incoming and outgoing connections.
|
|
///
|
|
/// The `request()` method can be used to send a message to any peer to which we have
|
|
/// an outgoing connection, or to ourself. On the server side, these messages are
|
|
/// processed by the handlers that have been defined using `add_msg_handler()`.
|
|
///
|
|
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
|
|
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
|
|
/// in order to manage information about the current peer list.
|
|
///
|
|
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
|
|
/// and RPS peering strategies take care of the most common use cases.
|
|
pub struct NetApp {
|
|
listen_params: ArcSwapOption<ListenParams>,
|
|
|
|
/// Network secret key
|
|
pub netid: auth::Key,
|
|
/// Our peer ID
|
|
pub id: NodeID,
|
|
/// Private key associated with our peer ID
|
|
pub privkey: ed25519::SecretKey,
|
|
|
|
server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
|
|
client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
|
|
|
|
pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
|
|
on_connected_handler: ArcSwapOption<OnConnectHandler>,
|
|
on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
|
|
}
|
|
|
|
/// Parameters recorded when `NetApp::listen()` is called.
struct ListenParams {
    /// Local address the TCP listener is bound to.
    listen_addr: SocketAddr,
    /// Optional IP address to advertise to peers in the hello message.
    public_addr: Option<IpAddr>,
}
|
|
|
|
async fn net_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, bytes: Bytes) -> Vec<u8>
|
|
where
|
|
M: Message + 'static,
|
|
F: Fn(NodeID, M) -> R + Send + Sync + 'static,
|
|
R: Future<Output = <M as Message>::Response> + Send + Sync,
|
|
{
|
|
debug!(
|
|
"Handling message of kind {:08x} from {}",
|
|
M::KIND,
|
|
hex::encode(remote)
|
|
);
|
|
let begin_time = Instant::now();
|
|
let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
|
|
Ok(msg) => Ok(handler(remote, msg).await),
|
|
Err(e) => Err(e.to_string()),
|
|
};
|
|
let end_time = Instant::now();
|
|
debug!(
|
|
"Request {:08x} from {} handled in {}msec",
|
|
M::KIND,
|
|
hex::encode(remote),
|
|
(end_time - begin_time).as_millis()
|
|
);
|
|
rmp_to_vec_all_named(&res).unwrap_or_default()
|
|
}
|
|
|
|
async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg
|
|
where
|
|
M: Message + 'static,
|
|
F: Fn(NodeID, M) -> R + Send + Sync + 'static,
|
|
R: Future<Output = <M as Message>::Response> + Send + Sync,
|
|
{
|
|
debug!("Handling message of kind {:08x} from ourself", M::KIND);
|
|
let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
|
|
let res = handler(remote, *msg).await;
|
|
Box::new(res)
|
|
}
|
|
|
|
impl NetApp {
|
|
/// Creates a new instance of NetApp, which can serve either as a full p2p node,
|
|
/// or just as a passive client. To upgrade to a full p2p node, spawn a listener
|
|
/// using `.listen()`
|
|
///
|
|
/// Our Peer ID is the public key associated to the secret key given here.
|
|
pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
|
|
let id = privkey.public_key();
|
|
let netapp = Arc::new(Self {
|
|
listen_params: ArcSwapOption::new(None),
|
|
netid,
|
|
id,
|
|
privkey,
|
|
server_conns: RwLock::new(HashMap::new()),
|
|
client_conns: RwLock::new(HashMap::new()),
|
|
msg_handlers: ArcSwap::new(Arc::new(HashMap::new())),
|
|
on_connected_handler: ArcSwapOption::new(None),
|
|
on_disconnected_handler: ArcSwapOption::new(None),
|
|
});
|
|
|
|
let netapp2 = netapp.clone();
|
|
netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| {
|
|
netapp2.handle_hello_message(from, msg);
|
|
async {}
|
|
});
|
|
|
|
netapp
|
|
}
|
|
|
|
/// Set the handler to be called when a new connection (incoming or outgoing) has
|
|
/// been successfully established. Do not set this if using a peering strategy,
|
|
/// as the peering strategy will need to set this itself.
|
|
pub fn on_connected<F>(&self, handler: F)
|
|
where
|
|
F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
|
|
{
|
|
self.on_connected_handler
|
|
.store(Some(Arc::new(Box::new(handler))));
|
|
}
|
|
|
|
/// Set the handler to be called when an existing connection (incoming or outgoing) has
|
|
/// been closed by either party. Do not set this if using a peering strategy,
|
|
/// as the peering strategy will need to set this itself.
|
|
pub fn on_disconnected<F>(&self, handler: F)
|
|
where
|
|
F: Fn(NodeID, bool) + Sized + Send + Sync + 'static,
|
|
{
|
|
self.on_disconnected_handler
|
|
.store(Some(Arc::new(Box::new(handler))));
|
|
}
|
|
|
|
/// Add a handler for a certain message type. Note that only one handler
|
|
/// can be specified for each message type.
|
|
/// The handler is an asynchronous function, i.e. a function that returns
|
|
/// a future.
|
|
pub fn add_msg_handler<M, F, R>(&self, handler: F)
|
|
where
|
|
M: Message + 'static,
|
|
F: Fn(NodeID, M) -> R + Send + Sync + 'static,
|
|
R: Future<Output = <M as Message>::Response> + Send + Sync + 'static,
|
|
{
|
|
let handler = Arc::new(handler);
|
|
|
|
let handler2 = handler.clone();
|
|
let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| {
|
|
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
|
|
Box::pin(net_handler_aux(handler2.clone(), remote, bytes));
|
|
fun
|
|
});
|
|
|
|
let self_id = self.id;
|
|
let local_handler = Box::new(move |msg: DynMsg| {
|
|
let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> =
|
|
Box::pin(local_handler_aux(handler.clone(), self_id, msg));
|
|
fun
|
|
});
|
|
|
|
let funs = Arc::new(Handler {
|
|
net_handler,
|
|
local_handler,
|
|
});
|
|
|
|
let mut handlers = self.msg_handlers.load().as_ref().clone();
|
|
handlers.insert(M::KIND, funs);
|
|
self.msg_handlers.store(Arc::new(handlers));
|
|
}
|
|
|
|
/// Main listening process for our app. This future runs during the whole
|
|
/// run time of our application.
|
|
/// If this is not called, the NetApp instance remains a passive client.
|
|
pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
|
|
let listen_params = ListenParams {
|
|
listen_addr,
|
|
public_addr,
|
|
};
|
|
self.listen_params.store(Some(Arc::new(listen_params)));
|
|
|
|
let listener = TcpListener::bind(listen_addr).await.unwrap();
|
|
info!("Listening on {}", listen_addr);
|
|
|
|
loop {
|
|
// The second item contains the IP and port of the new connection.
|
|
let (socket, _) = listener.accept().await.unwrap();
|
|
info!(
|
|
"Incoming connection from {}, negotiating handshake...",
|
|
match socket.peer_addr() {
|
|
Ok(x) => format!("{}", x),
|
|
Err(e) => format!("<invalid addr: {}>", e),
|
|
}
|
|
);
|
|
let self2 = self.clone();
|
|
tokio::spawn(async move {
|
|
ServerConn::run(self2, socket)
|
|
.await
|
|
.log_err("ServerConn::run");
|
|
});
|
|
}
|
|
}
|
|
|
|
/// Attempt to connect to a peer, given by its ip:port and its public key.
|
|
/// The public key will be checked during the secret handshake process.
|
|
/// This function returns once the connection has been established and a
|
|
/// successfull handshake was made. At this point we can send messages to
|
|
/// the other node with `Netapp::request`
|
|
pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
|
|
// Don't connect to ourself, we don't care
|
|
// but pretend we did
|
|
if id == self.id {
|
|
tokio::spawn(async move {
|
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
|
h(id, ip, false);
|
|
}
|
|
});
|
|
return Ok(());
|
|
}
|
|
|
|
// Don't connect if already connected
|
|
if self.client_conns.read().unwrap().contains_key(&id) {
|
|
return Ok(());
|
|
}
|
|
|
|
let socket = TcpStream::connect(ip).await?;
|
|
info!("Connected to {}, negotiating handshake...", ip);
|
|
ClientConn::init(self, socket, id).await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Close the outgoing connection we have to a node specified by its public key,
|
|
/// if such a connection is currently open.
|
|
pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
|
|
// If id is ourself, we're not supposed to have a connection open
|
|
if *id != self.id {
|
|
let conn = self.client_conns.write().unwrap().remove(id);
|
|
if let Some(c) = conn {
|
|
debug!(
|
|
"Closing connection to {} ({})",
|
|
hex::encode(c.peer_id),
|
|
c.remote_addr
|
|
);
|
|
c.close();
|
|
} else {
|
|
return;
|
|
}
|
|
}
|
|
|
|
// call on_disconnected_handler immediately, since the connection
|
|
// was removed
|
|
// (if id == self.id, we pretend we disconnected)
|
|
let id = *id;
|
|
let self2 = self.clone();
|
|
tokio::spawn(async move {
|
|
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
|
|
h(id, false);
|
|
}
|
|
});
|
|
}
|
|
|
|
/// Close the incoming connection from a certain client to us,
|
|
/// if such a connection is currently open.
|
|
pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) {
|
|
let conn = self.server_conns.read().unwrap().get(id).cloned();
|
|
if let Some(c) = conn {
|
|
debug!(
|
|
"Closing incoming connection from {} ({})",
|
|
hex::encode(c.peer_id),
|
|
c.remote_addr
|
|
);
|
|
c.close();
|
|
}
|
|
}
|
|
|
|
// Called from conn.rs when an incoming connection is successfully established
|
|
// Registers the connection in our list of connections
|
|
// Do not yet call the on_connected handler, because we don't know if the remote
|
|
// has an actual IP address and port we can call them back on.
|
|
// We will know this when they send a Hello message, which is handled below.
|
|
pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) {
|
|
info!("Accepted connection from {}", hex::encode(id));
|
|
|
|
self.server_conns.write().unwrap().insert(id, conn);
|
|
}
|
|
|
|
// Handle hello message from a client. This message is used for them to tell us
|
|
// that they are listening on a certain port number on which we can call them back.
|
|
// At this point we know they are a full network member, and not just a client,
|
|
// and we call the on_connected handler so that the peering strategy knows
|
|
// we have a new potential peer
|
|
fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) {
|
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
|
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
|
|
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
|
|
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
|
|
h(id, remote_addr, true);
|
|
}
|
|
}
|
|
}
|
|
|
|
// Called from conn.rs when an incoming connection is closed.
|
|
// We deregister the connection from server_conns and call the
|
|
// handler registered by on_disconnected
|
|
pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) {
|
|
info!("Connection from {} closed", hex::encode(id));
|
|
|
|
let mut conn_list = self.server_conns.write().unwrap();
|
|
if let Some(c) = conn_list.get(id) {
|
|
if Arc::ptr_eq(c, &conn) {
|
|
conn_list.remove(id);
|
|
drop(conn_list);
|
|
|
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
|
h(conn.peer_id, true);
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
// Called from conn.rs when an outgoinc connection is successfully established.
|
|
// The connection is registered in self.client_conns, and the
|
|
// on_connected handler is called.
|
|
//
|
|
// Since we are ourself listening, we send them a Hello message so that
|
|
// they know on which port to call us back. (TODO: don't do this if we are
|
|
// just a simple client and not a full p2p node)
|
|
pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) {
|
|
info!("Connection established to {}", hex::encode(id));
|
|
|
|
{
|
|
let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone());
|
|
if let Some(old_c) = old_c_opt {
|
|
tokio::spawn(async move { old_c.close() });
|
|
}
|
|
}
|
|
|
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
|
h(conn.peer_id, conn.remote_addr, false);
|
|
}
|
|
|
|
if let Some(lp) = self.listen_params.load_full() {
|
|
let server_addr = lp.public_addr;
|
|
let server_port = lp.listen_addr.port();
|
|
tokio::spawn(async move {
|
|
conn.request(
|
|
HelloMessage {
|
|
server_addr,
|
|
server_port,
|
|
},
|
|
PRIO_NORMAL,
|
|
)
|
|
.await
|
|
.log_err("Sending hello message");
|
|
});
|
|
}
|
|
}
|
|
|
|
// Called from conn.rs when an outgoinc connection is closed.
|
|
// The connection is removed from conn_list, and the on_disconnected handler
|
|
// is called.
|
|
pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) {
|
|
info!("Connection to {} closed", hex::encode(id));
|
|
let mut conn_list = self.client_conns.write().unwrap();
|
|
if let Some(c) = conn_list.get(id) {
|
|
if Arc::ptr_eq(c, &conn) {
|
|
conn_list.remove(id);
|
|
drop(conn_list);
|
|
|
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
|
h(conn.peer_id, false);
|
|
}
|
|
}
|
|
}
|
|
// else case: happens if connection was removed in .disconnect()
|
|
// in which case on_disconnected_handler was already called
|
|
}
|
|
|
|
/// Send a message to a remote host to which a client connection is already
|
|
/// established, and await their response. The target is the id of the peer we
|
|
/// want to send the message to.
|
|
/// The priority is an `u8`, with lower numbers meaning highest priority.
|
|
pub async fn request<T>(
|
|
&self,
|
|
target: &NodeID,
|
|
rq: T,
|
|
prio: RequestPriority,
|
|
) -> Result<<T as Message>::Response, Error>
|
|
where
|
|
T: Message + 'static,
|
|
{
|
|
if *target == self.id {
|
|
let handler = self.msg_handlers.load().get(&T::KIND).cloned();
|
|
match handler {
|
|
None => Err(Error::Message(format!(
|
|
"No handler registered for message kind {:08x}",
|
|
T::KIND
|
|
))),
|
|
Some(h) => {
|
|
let local_handler = &h.local_handler;
|
|
let res = local_handler(Box::new(rq)).await;
|
|
let res_t = (res as Box<dyn Any + 'static>)
|
|
.downcast::<<T as Message>::Response>()
|
|
.unwrap();
|
|
Ok(*res_t)
|
|
}
|
|
}
|
|
} else {
|
|
let conn = self.client_conns.read().unwrap().get(target).cloned();
|
|
match conn {
|
|
None => Err(Error::Message(format!(
|
|
"Not connected: {}",
|
|
hex::encode(target)
|
|
))),
|
|
Some(c) => c.request(rq, prio).await,
|
|
}
|
|
}
|
|
}
|
|
}
|