forked from Deuxfleurs/garage
453 lines
14 KiB
Rust
453 lines
14 KiB
Rust
|
use std::collections::HashMap;
|
||
|
use std::net::{IpAddr, SocketAddr};
|
||
|
use std::sync::{Arc, RwLock};
|
||
|
|
||
|
use log::{debug, error, info, trace, warn};
|
||
|
|
||
|
use arc_swap::ArcSwapOption;
|
||
|
use async_trait::async_trait;
|
||
|
|
||
|
use serde::{Deserialize, Serialize};
|
||
|
use sodiumoxide::crypto::auth;
|
||
|
use sodiumoxide::crypto::sign::ed25519;
|
||
|
|
||
|
use futures::stream::futures_unordered::FuturesUnordered;
|
||
|
use futures::stream::StreamExt;
|
||
|
use tokio::net::{TcpListener, TcpStream};
|
||
|
use tokio::select;
|
||
|
use tokio::sync::{mpsc, watch};
|
||
|
|
||
|
use crate::client::*;
|
||
|
use crate::endpoint::*;
|
||
|
use crate::error::*;
|
||
|
use crate::message::*;
|
||
|
use crate::server::*;
|
||
|
|
||
|
/// A node's identifier, which is also its public cryptographic key
|
||
|
pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey;
|
||
|
/// A node's secret key
|
||
|
pub type NodeKey = sodiumoxide::crypto::sign::ed25519::SecretKey;
|
||
|
/// A network key
|
||
|
pub type NetworkKey = sodiumoxide::crypto::auth::Key;
|
||
|
|
||
|
/// Tag which is exchanged between client and server upon connection establishment
|
||
|
/// to check that they are running compatible versions of Netapp,
|
||
|
/// composed of 8 bytes for Netapp version and 8 bytes for client version
|
||
|
pub(crate) type VersionTag = [u8; 16];
|
||
|
|
||
|
/// Value of the Netapp version used in the version tag
|
||
|
pub(crate) const NETAPP_VERSION_TAG: u64 = 0x6e65746170700005; // netapp 0x0005
|
||
|
|
||
|
#[derive(Serialize, Deserialize, Debug)]
|
||
|
pub(crate) struct HelloMessage {
|
||
|
pub server_addr: Option<IpAddr>,
|
||
|
pub server_port: u16,
|
||
|
}
|
||
|
|
||
|
impl Message for HelloMessage {
|
||
|
type Response = ();
|
||
|
}
|
||
|
|
||
|
type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
|
||
|
type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
|
||
|
|
||
|
/// NetApp is the main class that handles incoming and outgoing connections.
|
||
|
///
|
||
|
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
|
||
|
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
|
||
|
/// in order to manage information about the current peer list.
|
||
|
///
|
||
|
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
|
||
|
/// and RPS peering strategies take care of the most common use cases.
|
||
|
pub struct NetApp {
|
||
|
listen_params: ArcSwapOption<ListenParams>,
|
||
|
|
||
|
/// Version tag, 8 bytes for netapp version, 8 bytes for app version
|
||
|
pub version_tag: VersionTag,
|
||
|
/// Network secret key
|
||
|
pub netid: auth::Key,
|
||
|
/// Our peer ID
|
||
|
pub id: NodeID,
|
||
|
/// Private key associated with our peer ID
|
||
|
pub privkey: ed25519::SecretKey,
|
||
|
|
||
|
pub(crate) server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
|
||
|
pub(crate) client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
|
||
|
|
||
|
pub(crate) endpoints: RwLock<HashMap<String, DynEndpoint>>,
|
||
|
hello_endpoint: ArcSwapOption<Endpoint<HelloMessage, NetApp>>,
|
||
|
|
||
|
on_connected_handler: ArcSwapOption<OnConnectHandler>,
|
||
|
on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
|
||
|
}
|
||
|
|
||
|
struct ListenParams {
|
||
|
listen_addr: SocketAddr,
|
||
|
public_addr: Option<IpAddr>,
|
||
|
}
|
||
|
|
||
|
impl NetApp {
|
||
|
/// Creates a new instance of NetApp, which can serve either as a full p2p node,
|
||
|
/// or just as a passive client. To upgrade to a full p2p node, spawn a listener
|
||
|
/// using `.listen()`
|
||
|
///
|
||
|
/// Our Peer ID is the public key associated to the secret key given here.
|
||
|
pub fn new(app_version_tag: u64, netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
|
||
|
let mut version_tag = [0u8; 16];
|
||
|
version_tag[0..8].copy_from_slice(&u64::to_be_bytes(NETAPP_VERSION_TAG)[..]);
|
||
|
version_tag[8..16].copy_from_slice(&u64::to_be_bytes(app_version_tag)[..]);
|
||
|
|
||
|
let id = privkey.public_key();
|
||
|
let netapp = Arc::new(Self {
|
||
|
listen_params: ArcSwapOption::new(None),
|
||
|
version_tag,
|
||
|
netid,
|
||
|
id,
|
||
|
privkey,
|
||
|
server_conns: RwLock::new(HashMap::new()),
|
||
|
client_conns: RwLock::new(HashMap::new()),
|
||
|
endpoints: RwLock::new(HashMap::new()),
|
||
|
hello_endpoint: ArcSwapOption::new(None),
|
||
|
on_connected_handler: ArcSwapOption::new(None),
|
||
|
on_disconnected_handler: ArcSwapOption::new(None),
|
||
|
});
|
||
|
|
||
|
netapp
|
||
|
.hello_endpoint
|
||
|
.swap(Some(netapp.endpoint("__netapp/netapp.rs/Hello".into())));
|
||
|
netapp
|
||
|
.hello_endpoint
|
||
|
.load_full()
|
||
|
.unwrap()
|
||
|
.set_handler(netapp.clone());
|
||
|
|
||
|
netapp
|
||
|
}
|
||
|
|
||
|
/// Set the handler to be called when a new connection (incoming or outgoing) has
|
||
|
/// been successfully established. Do not set this if using a peering strategy,
|
||
|
/// as the peering strategy will need to set this itself.
|
||
|
pub fn on_connected<F>(&self, handler: F)
|
||
|
where
|
||
|
F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
|
||
|
{
|
||
|
self.on_connected_handler
|
||
|
.store(Some(Arc::new(Box::new(handler))));
|
||
|
}
|
||
|
|
||
|
/// Set the handler to be called when an existing connection (incoming or outgoing) has
|
||
|
/// been closed by either party. Do not set this if using a peering strategy,
|
||
|
/// as the peering strategy will need to set this itself.
|
||
|
pub fn on_disconnected<F>(&self, handler: F)
|
||
|
where
|
||
|
F: Fn(NodeID, bool) + Sized + Send + Sync + 'static,
|
||
|
{
|
||
|
self.on_disconnected_handler
|
||
|
.store(Some(Arc::new(Box::new(handler))));
|
||
|
}
|
||
|
|
||
|
/// Create a new endpoint with path `path`,
|
||
|
/// that handles messages of type `M`.
|
||
|
/// `H` is the type of the object that should handle requests
|
||
|
/// to this endpoint on the local node. If you don't want
|
||
|
/// to handle request on the local node (e.g. if this node
|
||
|
/// is only a client in the network), define the type `H`
|
||
|
/// to be `()`.
|
||
|
/// This function will panic if the endpoint has already been
|
||
|
/// created.
|
||
|
pub fn endpoint<M, H>(self: &Arc<Self>, path: String) -> Arc<Endpoint<M, H>>
|
||
|
where
|
||
|
M: Message + 'static,
|
||
|
H: StreamingEndpointHandler<M> + 'static,
|
||
|
{
|
||
|
let endpoint = Arc::new(Endpoint::<M, H>::new(self.clone(), path.clone()));
|
||
|
let endpoint_arc = EndpointArc(endpoint.clone());
|
||
|
if self
|
||
|
.endpoints
|
||
|
.write()
|
||
|
.unwrap()
|
||
|
.insert(path.clone(), Box::new(endpoint_arc))
|
||
|
.is_some()
|
||
|
{
|
||
|
panic!("Redefining endpoint: {}", path);
|
||
|
};
|
||
|
endpoint
|
||
|
}
|
||
|
|
||
|
/// Main listening process for our app. This future runs during the whole
|
||
|
/// run time of our application.
|
||
|
/// If this is not called, the NetApp instance remains a passive client.
|
||
|
pub async fn listen(
|
||
|
self: Arc<Self>,
|
||
|
listen_addr: SocketAddr,
|
||
|
public_addr: Option<IpAddr>,
|
||
|
mut must_exit: watch::Receiver<bool>,
|
||
|
) {
|
||
|
let listen_params = ListenParams {
|
||
|
listen_addr,
|
||
|
public_addr,
|
||
|
};
|
||
|
if self
|
||
|
.listen_params
|
||
|
.swap(Some(Arc::new(listen_params)))
|
||
|
.is_some()
|
||
|
{
|
||
|
error!("Trying to listen on NetApp but we're already listening!");
|
||
|
}
|
||
|
|
||
|
let listener = TcpListener::bind(listen_addr).await.unwrap();
|
||
|
info!("Listening on {}", listen_addr);
|
||
|
|
||
|
let (conn_in, mut conn_out) = mpsc::unbounded_channel();
|
||
|
let connection_collector = tokio::spawn(async move {
|
||
|
let mut collection = FuturesUnordered::new();
|
||
|
loop {
|
||
|
if collection.is_empty() {
|
||
|
match conn_out.recv().await {
|
||
|
Some(f) => collection.push(f),
|
||
|
None => break,
|
||
|
}
|
||
|
} else {
|
||
|
select! {
|
||
|
new_fut = conn_out.recv() => {
|
||
|
match new_fut {
|
||
|
Some(f) => collection.push(f),
|
||
|
None => break,
|
||
|
}
|
||
|
}
|
||
|
result = collection.next() => {
|
||
|
trace!("Collected connection: {:?}", result);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
debug!("Collecting last open server connections.");
|
||
|
while let Some(conn_res) = collection.next().await {
|
||
|
trace!("Collected connection: {:?}", conn_res);
|
||
|
}
|
||
|
debug!("No more server connections to collect");
|
||
|
});
|
||
|
|
||
|
while !*must_exit.borrow_and_update() {
|
||
|
let (socket, peer_addr) = select! {
|
||
|
sockres = listener.accept() => {
|
||
|
match sockres {
|
||
|
Ok(x) => x,
|
||
|
Err(e) => {
|
||
|
warn!("Error in listener.accept: {}", e);
|
||
|
continue;
|
||
|
}
|
||
|
}
|
||
|
},
|
||
|
_ = must_exit.changed() => continue,
|
||
|
};
|
||
|
|
||
|
info!(
|
||
|
"Incoming connection from {}, negotiating handshake...",
|
||
|
peer_addr
|
||
|
);
|
||
|
let self2 = self.clone();
|
||
|
let must_exit2 = must_exit.clone();
|
||
|
conn_in
|
||
|
.send(tokio::spawn(async move {
|
||
|
ServerConn::run(self2, socket, must_exit2)
|
||
|
.await
|
||
|
.log_err("ServerConn::run");
|
||
|
}))
|
||
|
.log_err("Failed to send connection to connection collector");
|
||
|
}
|
||
|
|
||
|
drop(conn_in);
|
||
|
|
||
|
connection_collector
|
||
|
.await
|
||
|
.log_err("Failed to await for connection collector");
|
||
|
}
|
||
|
|
||
|
/// Drop all endpoint handlers, as well as handlers for connection/disconnection
|
||
|
/// events. (This disables the peering strategy)
|
||
|
///
|
||
|
/// Use this when terminating to break reference cycles
|
||
|
pub fn drop_all_handlers(&self) {
|
||
|
for (_, endpoint) in self.endpoints.read().unwrap().iter() {
|
||
|
endpoint.drop_handler();
|
||
|
}
|
||
|
self.on_connected_handler.store(None);
|
||
|
self.on_disconnected_handler.store(None);
|
||
|
}
|
||
|
|
||
|
/// Attempt to connect to a peer, given by its ip:port and its public key.
|
||
|
/// The public key will be checked during the secret handshake process.
|
||
|
/// This function returns once the connection has been established and a
|
||
|
/// successfull handshake was made. At this point we can send messages to
|
||
|
/// the other node with `Netapp::request`
|
||
|
pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
|
||
|
// Don't connect to ourself, we don't care
|
||
|
// but pretend we did
|
||
|
if id == self.id {
|
||
|
tokio::spawn(async move {
|
||
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||
|
h(id, ip, false);
|
||
|
}
|
||
|
});
|
||
|
return Ok(());
|
||
|
}
|
||
|
|
||
|
// Don't connect if already connected
|
||
|
if self.client_conns.read().unwrap().contains_key(&id) {
|
||
|
return Ok(());
|
||
|
}
|
||
|
|
||
|
let socket = TcpStream::connect(ip).await?;
|
||
|
info!("Connected to {}, negotiating handshake...", ip);
|
||
|
ClientConn::init(self, socket, id).await?;
|
||
|
Ok(())
|
||
|
}
|
||
|
|
||
|
/// Close the outgoing connection we have to a node specified by its public key,
|
||
|
/// if such a connection is currently open.
|
||
|
pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
|
||
|
// If id is ourself, we're not supposed to have a connection open
|
||
|
if *id != self.id {
|
||
|
let conn = self.client_conns.write().unwrap().remove(id);
|
||
|
if let Some(c) = conn {
|
||
|
debug!(
|
||
|
"Closing connection to {} ({})",
|
||
|
hex::encode(&c.peer_id[..8]),
|
||
|
c.remote_addr
|
||
|
);
|
||
|
c.close();
|
||
|
} else {
|
||
|
return;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// call on_disconnected_handler immediately, since the connection
|
||
|
// was removed
|
||
|
// (if id == self.id, we pretend we disconnected)
|
||
|
let id = *id;
|
||
|
let self2 = self.clone();
|
||
|
tokio::spawn(async move {
|
||
|
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
|
||
|
h(id, false);
|
||
|
}
|
||
|
});
|
||
|
}
|
||
|
|
||
|
// Called from conn.rs when an incoming connection is successfully established
|
||
|
// Registers the connection in our list of connections
|
||
|
// Do not yet call the on_connected handler, because we don't know if the remote
|
||
|
// has an actual IP address and port we can call them back on.
|
||
|
// We will know this when they send a Hello message, which is handled below.
|
||
|
pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) {
|
||
|
info!(
|
||
|
"Accepted connection from {} at {}",
|
||
|
hex::encode(&id[..8]),
|
||
|
conn.remote_addr
|
||
|
);
|
||
|
|
||
|
self.server_conns.write().unwrap().insert(id, conn);
|
||
|
}
|
||
|
|
||
|
// Handle hello message from a client. This message is used for them to tell us
|
||
|
// that they are listening on a certain port number on which we can call them back.
|
||
|
// At this point we know they are a full network member, and not just a client,
|
||
|
// and we call the on_connected handler so that the peering strategy knows
|
||
|
// we have a new potential peer
|
||
|
|
||
|
// Called from conn.rs when an incoming connection is closed.
|
||
|
// We deregister the connection from server_conns and call the
|
||
|
// handler registered by on_disconnected
|
||
|
pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) {
|
||
|
info!("Connection from {} closed", hex::encode(&id[..8]));
|
||
|
|
||
|
let mut conn_list = self.server_conns.write().unwrap();
|
||
|
if let Some(c) = conn_list.get(id) {
|
||
|
if Arc::ptr_eq(c, &conn) {
|
||
|
conn_list.remove(id);
|
||
|
drop(conn_list);
|
||
|
|
||
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
||
|
h(conn.peer_id, true);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Called from conn.rs when an outgoinc connection is successfully established.
|
||
|
// The connection is registered in self.client_conns, and the
|
||
|
// on_connected handler is called.
|
||
|
//
|
||
|
// Since we are ourself listening, we send them a Hello message so that
|
||
|
// they know on which port to call us back. (TODO: don't do this if we are
|
||
|
// just a simple client and not a full p2p node)
|
||
|
pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) {
|
||
|
info!("Connection established to {}", hex::encode(&id[..8]));
|
||
|
|
||
|
{
|
||
|
let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone());
|
||
|
if let Some(old_c) = old_c_opt {
|
||
|
tokio::spawn(async move { old_c.close() });
|
||
|
}
|
||
|
}
|
||
|
|
||
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||
|
h(conn.peer_id, conn.remote_addr, false);
|
||
|
}
|
||
|
|
||
|
if let Some(lp) = self.listen_params.load_full() {
|
||
|
let server_addr = lp.public_addr;
|
||
|
let server_port = lp.listen_addr.port();
|
||
|
let hello_endpoint = self.hello_endpoint.load_full().unwrap();
|
||
|
tokio::spawn(async move {
|
||
|
hello_endpoint
|
||
|
.call(
|
||
|
&conn.peer_id,
|
||
|
HelloMessage {
|
||
|
server_addr,
|
||
|
server_port,
|
||
|
},
|
||
|
PRIO_NORMAL,
|
||
|
)
|
||
|
.await
|
||
|
.map(|_| ())
|
||
|
.log_err("Sending hello message");
|
||
|
});
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Called from conn.rs when an outgoinc connection is closed.
|
||
|
// The connection is removed from conn_list, and the on_disconnected handler
|
||
|
// is called.
|
||
|
pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) {
|
||
|
info!("Connection to {} closed", hex::encode(&id[..8]));
|
||
|
let mut conn_list = self.client_conns.write().unwrap();
|
||
|
if let Some(c) = conn_list.get(id) {
|
||
|
if Arc::ptr_eq(c, &conn) {
|
||
|
conn_list.remove(id);
|
||
|
drop(conn_list);
|
||
|
|
||
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
||
|
h(conn.peer_id, false);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
// else case: happens if connection was removed in .disconnect()
|
||
|
// in which case on_disconnected_handler was already called
|
||
|
}
|
||
|
}
|
||
|
|
||
|
#[async_trait]
|
||
|
impl EndpointHandler<HelloMessage> for NetApp {
|
||
|
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
|
||
|
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);
|
||
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||
|
if let Some(c) = self.server_conns.read().unwrap().get(&from) {
|
||
|
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
|
||
|
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
|
||
|
h(from, remote_addr, true);
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|