Documentate
This commit is contained in:
parent
46fae5d138
commit
14d34e76f4
11 changed files with 209 additions and 69 deletions
|
@ -9,7 +9,7 @@ use structopt::StructOpt;
|
||||||
use sodiumoxide::crypto::auth;
|
use sodiumoxide::crypto::auth;
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
|
||||||
use netapp::netapp::*;
|
use netapp::NetApp;
|
||||||
use netapp::peering::basalt::*;
|
use netapp::peering::basalt::*;
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
|
|
|
@ -7,8 +7,8 @@ use structopt::StructOpt;
|
||||||
use sodiumoxide::crypto::auth;
|
use sodiumoxide::crypto::auth;
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
|
||||||
use netapp::netapp::*;
|
|
||||||
use netapp::peering::fullmesh::*;
|
use netapp::peering::fullmesh::*;
|
||||||
|
use netapp::NetApp;
|
||||||
|
|
||||||
#[derive(StructOpt, Debug)]
|
#[derive(StructOpt, Debug)]
|
||||||
#[structopt(name = "netapp")]
|
#[structopt(name = "netapp")]
|
||||||
|
|
17
src/conn.rs
17
src/conn.rs
|
@ -23,10 +23,12 @@ use crate::netapp::*;
|
||||||
use crate::proto::*;
|
use crate::proto::*;
|
||||||
use crate::util::*;
|
use crate::util::*;
|
||||||
|
|
||||||
pub struct ServerConn {
|
pub(crate) struct ServerConn {
|
||||||
|
pub(crate) remote_addr: SocketAddr,
|
||||||
|
pub(crate) peer_pk: ed25519::PublicKey,
|
||||||
|
|
||||||
netapp: Arc<NetApp>,
|
netapp: Arc<NetApp>,
|
||||||
pub remote_addr: SocketAddr,
|
|
||||||
pub peer_pk: ed25519::PublicKey,
|
|
||||||
resp_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>,
|
resp_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>,
|
||||||
close_send: watch::Sender<bool>,
|
close_send: watch::Sender<bool>,
|
||||||
}
|
}
|
||||||
|
@ -115,10 +117,10 @@ impl RecvLoop for ServerConn {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
pub struct ClientConn {
|
pub(crate) struct ClientConn {
|
||||||
pub netapp: Arc<NetApp>,
|
pub(crate) remote_addr: SocketAddr,
|
||||||
pub remote_addr: SocketAddr,
|
pub(crate) peer_pk: ed25519::PublicKey,
|
||||||
pub peer_pk: ed25519::PublicKey,
|
|
||||||
query_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>,
|
query_send: mpsc::UnboundedSender<(RequestID, RequestPriority, Vec<u8>)>,
|
||||||
next_query_number: AtomicU16,
|
next_query_number: AtomicU16,
|
||||||
resp_send: mpsc::UnboundedSender<(RequestID, Vec<u8>)>,
|
resp_send: mpsc::UnboundedSender<(RequestID, Vec<u8>)>,
|
||||||
|
@ -167,7 +169,6 @@ impl ClientConn {
|
||||||
let (close_send, close_recv) = watch::channel(false);
|
let (close_send, close_recv) = watch::channel(false);
|
||||||
|
|
||||||
let conn = Arc::new(ClientConn {
|
let conn = Arc::new(ClientConn {
|
||||||
netapp: netapp.clone(),
|
|
||||||
remote_addr,
|
remote_addr,
|
||||||
peer_pk: remote_pk.clone(),
|
peer_pk: remote_pk.clone(),
|
||||||
next_query_number: AtomicU16::from(0u16),
|
next_query_number: AtomicU16::from(0u16),
|
||||||
|
|
|
@ -41,6 +41,8 @@ impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Ths trait adds a `.log_err()` method on `Result<(), E>` types,
|
||||||
|
/// which dismisses the error by logging it to stderr.
|
||||||
pub trait LogError {
|
pub trait LogError {
|
||||||
fn log_err(self, msg: &'static str);
|
fn log_err(self, msg: &'static str);
|
||||||
}
|
}
|
||||||
|
|
26
src/lib.rs
26
src/lib.rs
|
@ -1,9 +1,29 @@
|
||||||
|
//! Netapp is a Rust library that takes care of a few common tasks in distributed software:
|
||||||
|
//!
|
||||||
|
//! - establishing secure connections
|
||||||
|
//! - managing connection lifetime, reconnecting on failure
|
||||||
|
//! - checking peer's state
|
||||||
|
//! - peer discovery
|
||||||
|
//! - query/response message passing model for communications
|
||||||
|
//! - multiplexing transfers over a connection
|
||||||
|
//! - overlay networks: full mesh or random peer sampling using Basalt
|
||||||
|
//!
|
||||||
|
//! Of particular interest, read the documentation for the `netapp::NetApp` type,
|
||||||
|
//! the `message::Message` trait, and `proto::RequestPriority` to learn more
|
||||||
|
//! about message priorization.
|
||||||
|
//! Also check out the examples to learn how to use this crate.
|
||||||
|
|
||||||
#![feature(map_first_last)]
|
#![feature(map_first_last)]
|
||||||
|
|
||||||
pub mod conn;
|
|
||||||
pub mod error;
|
pub mod error;
|
||||||
|
pub mod util;
|
||||||
|
|
||||||
|
pub mod proto;
|
||||||
pub mod message;
|
pub mod message;
|
||||||
|
|
||||||
|
mod conn;
|
||||||
|
|
||||||
pub mod netapp;
|
pub mod netapp;
|
||||||
pub mod peering;
|
pub mod peering;
|
||||||
pub mod proto;
|
|
||||||
pub mod util;
|
pub use netapp::*;
|
||||||
|
|
|
@ -2,6 +2,21 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
pub type MessageKind = u32;
|
pub type MessageKind = u32;
|
||||||
|
|
||||||
|
/// This trait should be implemented by all messages your application
|
||||||
|
/// wants to handle (click to read more).
|
||||||
|
///
|
||||||
|
/// It defines a `KIND`, which should be a **unique**
|
||||||
|
/// `u32` that distinguishes these messages from other types of messages
|
||||||
|
/// (it is used by our communication protocol), as well as an associated
|
||||||
|
/// `Response` type that defines the type of the response that is given
|
||||||
|
/// to the message. It is your responsibility to ensure that `KIND` is a
|
||||||
|
/// unique `u32` that is not used by any other protocol messages.
|
||||||
|
/// All `KIND` values of the form `0x42xxxxxx` are reserved by the netapp
|
||||||
|
/// crate for internal purposes.
|
||||||
|
///
|
||||||
|
/// A handler for this message has type `Self -> Self::Response`.
|
||||||
|
/// If you need to return an error, the `Response` type should be
|
||||||
|
/// a `Result<_, _>`.
|
||||||
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
|
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync {
|
||||||
const KIND: MessageKind;
|
const KIND: MessageKind;
|
||||||
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync;
|
||||||
|
|
129
src/netapp.rs
129
src/netapp.rs
|
@ -33,17 +33,31 @@ pub(crate) struct Handler {
|
||||||
>,
|
>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// NetApp is the main class that handles incoming and outgoing connections.
|
||||||
|
///
|
||||||
|
/// The `request()` method can be used to send a message to any peer to which we have
|
||||||
|
/// an outgoing connection, or to ourself. On the server side, these messages are
|
||||||
|
/// processed by the handlers that have been defined using `add_msg_handler()`.
|
||||||
|
///
|
||||||
|
/// NetApp can be used in a stand-alone fashion or together with a peering strategy.
|
||||||
|
/// If using it alone, you will want to set `on_connect` and `on_disconnect` events
|
||||||
|
/// in order to manage information about the current peer list.
|
||||||
|
///
|
||||||
|
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
|
||||||
|
/// and RPS peering strategies take care of the most common use cases.
|
||||||
pub struct NetApp {
|
pub struct NetApp {
|
||||||
pub listen_addr: SocketAddr,
|
pub listen_addr: SocketAddr,
|
||||||
pub netid: auth::Key,
|
pub netid: auth::Key,
|
||||||
pub pubkey: ed25519::PublicKey,
|
pub pubkey: ed25519::PublicKey,
|
||||||
pub privkey: ed25519::SecretKey,
|
pub privkey: ed25519::SecretKey,
|
||||||
pub server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
|
|
||||||
pub client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
|
server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>,
|
||||||
|
client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>,
|
||||||
|
|
||||||
pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
|
pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
|
||||||
pub(crate) on_connected:
|
on_connected_handler:
|
||||||
ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>,
|
ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>,
|
||||||
pub(crate) on_disconnected: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
|
on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn net_handler_aux<M, F, R>(
|
async fn net_handler_aux<M, F, R>(
|
||||||
|
@ -78,13 +92,14 @@ where
|
||||||
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
|
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static,
|
||||||
R: Future<Output = <M as Message>::Response> + Send + Sync,
|
R: Future<Output = <M as Message>::Response> + Send + Sync,
|
||||||
{
|
{
|
||||||
debug!("Handling message of kind {:08x} from ourself", M::KIND,);
|
debug!("Handling message of kind {:08x} from ourself", M::KIND);
|
||||||
let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
|
let msg = (msg as Box<dyn Any + 'static>).downcast::<M>().unwrap();
|
||||||
let res = handler(remote, *msg).await;
|
let res = handler(remote, *msg).await;
|
||||||
Box::new(res)
|
Box::new(res)
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetApp {
|
impl NetApp {
|
||||||
|
/// Creates a new instance of NetApp. No background process is
|
||||||
pub fn new(
|
pub fn new(
|
||||||
listen_addr: SocketAddr,
|
listen_addr: SocketAddr,
|
||||||
netid: auth::Key,
|
netid: auth::Key,
|
||||||
|
@ -99,8 +114,8 @@ impl NetApp {
|
||||||
server_conns: RwLock::new(HashMap::new()),
|
server_conns: RwLock::new(HashMap::new()),
|
||||||
client_conns: RwLock::new(HashMap::new()),
|
client_conns: RwLock::new(HashMap::new()),
|
||||||
msg_handlers: ArcSwap::new(Arc::new(HashMap::new())),
|
msg_handlers: ArcSwap::new(Arc::new(HashMap::new())),
|
||||||
on_connected: ArcSwapOption::new(None),
|
on_connected_handler: ArcSwapOption::new(None),
|
||||||
on_disconnected: ArcSwapOption::new(None),
|
on_disconnected_handler: ArcSwapOption::new(None),
|
||||||
});
|
});
|
||||||
|
|
||||||
let netapp2 = netapp.clone();
|
let netapp2 = netapp.clone();
|
||||||
|
@ -114,6 +129,26 @@ impl NetApp {
|
||||||
netapp
|
netapp
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Set the handler to be called when a new connection (incoming or outgoing) has
|
||||||
|
/// been successfully established. Do not set this if using a peering strategy,
|
||||||
|
/// as the peering strategy will need to set this itself.
|
||||||
|
pub fn on_connected<F>(&self, handler: F)
|
||||||
|
where F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
self.on_connected_handler.store(Some(Arc::new(Box::new(handler))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Set the handler to be called when an existing connection (incoming or outgoing) has
|
||||||
|
/// been closed by either party. Do not set this if using a peering strategy,
|
||||||
|
/// as the peering strategy will need to set this itself.
|
||||||
|
pub fn on_disconnected<F>(&self, handler: F)
|
||||||
|
where F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static
|
||||||
|
{
|
||||||
|
self.on_disconnected_handler.store(Some(Arc::new(Box::new(handler))));
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Add a handler for a certain message type. Note that only one handler
|
||||||
|
/// can be specified for each message type.
|
||||||
pub fn add_msg_handler<M, F, R>(&self, handler: F)
|
pub fn add_msg_handler<M, F, R>(&self, handler: F)
|
||||||
where
|
where
|
||||||
M: Message + 'static,
|
M: Message + 'static,
|
||||||
|
@ -122,10 +157,10 @@ impl NetApp {
|
||||||
{
|
{
|
||||||
let handler = Arc::new(handler);
|
let handler = Arc::new(handler);
|
||||||
|
|
||||||
let handler1 = handler.clone();
|
let handler2 = handler.clone();
|
||||||
let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
|
let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| {
|
||||||
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
|
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
|
||||||
Box::pin(net_handler_aux(handler1.clone(), remote, bytes));
|
Box::pin(net_handler_aux(handler2.clone(), remote, bytes));
|
||||||
fun
|
fun
|
||||||
});
|
});
|
||||||
|
|
||||||
|
@ -146,6 +181,8 @@ impl NetApp {
|
||||||
self.msg_handlers.store(Arc::new(handlers));
|
self.msg_handlers.store(Arc::new(handlers));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Main listening process for our app. This future runs during the whole
|
||||||
|
/// run time of our application.
|
||||||
pub async fn listen(self: Arc<Self>) {
|
pub async fn listen(self: Arc<Self>) {
|
||||||
let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
|
let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
|
||||||
info!("Listening on {}", self.listen_addr);
|
info!("Listening on {}", self.listen_addr);
|
||||||
|
@ -166,16 +203,21 @@ impl NetApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attempt to connect to a peer, given by its ip:port and its public key.
|
||||||
|
/// The public key will be checked during the secret handshake process.
|
||||||
|
/// This function returns once the connection has been established and a
|
||||||
|
/// successfull handshake was made. At this point we can send messages to
|
||||||
|
/// the other node with `Netapp::request`
|
||||||
pub async fn try_connect(
|
pub async fn try_connect(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
ip: SocketAddr,
|
ip: SocketAddr,
|
||||||
pk: ed25519::PublicKey,
|
pk: ed25519::PublicKey,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
if pk == self.pubkey {
|
|
||||||
// Don't connect to ourself, we don't care
|
// Don't connect to ourself, we don't care
|
||||||
// but pretend we did
|
// but pretend we did
|
||||||
|
if pk == self.pubkey {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Some(h) = self.on_connected.load().as_ref() {
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||||||
h(pk, ip, false);
|
h(pk, ip, false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -193,11 +235,16 @@ impl NetApp {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn disconnect(self: Arc<Self>, pk: &ed25519::PublicKey) {
|
/// Close the outgoing connection we have to a node specified by its public key,
|
||||||
|
/// if such a connection is currently open.
|
||||||
|
pub fn disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) {
|
||||||
|
// Don't disconnect from ourself (we aren't connected anyways)
|
||||||
|
// but pretend we did
|
||||||
if *pk == self.pubkey {
|
if *pk == self.pubkey {
|
||||||
let pk = *pk;
|
let pk = *pk;
|
||||||
|
let self2 = self.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
if let Some(h) = self.on_disconnected.load().as_ref() {
|
if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
|
||||||
h(pk, false);
|
h(pk, false);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
@ -206,10 +253,30 @@ impl NetApp {
|
||||||
|
|
||||||
let conn = self.client_conns.read().unwrap().get(pk).cloned();
|
let conn = self.client_conns.read().unwrap().get(pk).cloned();
|
||||||
if let Some(c) = conn {
|
if let Some(c) = conn {
|
||||||
|
debug!("Closing connection to {} ({})",
|
||||||
|
hex::encode(c.peer_pk),
|
||||||
|
c.remote_addr);
|
||||||
c.close();
|
c.close();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Close the incoming connection from a certain client to us,
|
||||||
|
/// if such a connection is currently open.
|
||||||
|
pub fn server_disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) {
|
||||||
|
let conn = self.server_conns.read().unwrap().get(pk).cloned();
|
||||||
|
if let Some(c) = conn {
|
||||||
|
debug!("Closing incoming connection from {} ({})",
|
||||||
|
hex::encode(c.peer_pk),
|
||||||
|
c.remote_addr);
|
||||||
|
c.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Called from conn.rs when an incoming connection is successfully established
|
||||||
|
// Registers the connection in our list of connections
|
||||||
|
// Do not yet call the on_connected handler, because we don't know if the remote
|
||||||
|
// has an actual IP address and port we can call them back on.
|
||||||
|
// We will know this when they send a Hello message, which is handled below.
|
||||||
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
|
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
|
||||||
info!("Accepted connection from {}", hex::encode(id));
|
info!("Accepted connection from {}", hex::encode(id));
|
||||||
|
|
||||||
|
@ -217,8 +284,13 @@ impl NetApp {
|
||||||
conn_list.insert(id.clone(), conn);
|
conn_list.insert(id.clone(), conn);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Handle hello message from a client. This message is used for them to tell us
|
||||||
|
// that they are listening on a certain port number on which we can call them back.
|
||||||
|
// At this point we know they are a full network member, and not just a client,
|
||||||
|
// and we call the on_connected handler so that the peering strategy knows
|
||||||
|
// we have a new potential peer
|
||||||
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
|
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
|
||||||
if let Some(h) = self.on_connected.load().as_ref() {
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||||||
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
|
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
|
||||||
let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port);
|
let remote_addr = SocketAddr::new(c.remote_addr.ip(), msg.server_port);
|
||||||
h(id, remote_addr, true);
|
h(id, remote_addr, true);
|
||||||
|
@ -226,6 +298,9 @@ impl NetApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Called from conn.rs when an incoming connection is closed.
|
||||||
|
// We deregister the connection from server_conns and call the
|
||||||
|
// handler registered by on_disconnected
|
||||||
pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) {
|
pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) {
|
||||||
info!("Connection from {} closed", hex::encode(id));
|
info!("Connection from {} closed", hex::encode(id));
|
||||||
|
|
||||||
|
@ -235,12 +310,19 @@ impl NetApp {
|
||||||
conn_list.remove(id);
|
conn_list.remove(id);
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(h) = self.on_disconnected.load().as_ref() {
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
||||||
h(conn.peer_pk, true);
|
h(conn.peer_pk, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Called from conn.rs when an outgoinc connection is successfully established.
|
||||||
|
// The connection is registered in self.client_conns, and the
|
||||||
|
// on_connected handler is called.
|
||||||
|
//
|
||||||
|
// Since we are ourself listening, we send them a Hello message so that
|
||||||
|
// they know on which port to call us back. (TODO: don't do this if we are
|
||||||
|
// just a simple client and not a full p2p node)
|
||||||
pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) {
|
pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) {
|
||||||
info!("Connection established to {}", hex::encode(id));
|
info!("Connection established to {}", hex::encode(id));
|
||||||
|
|
||||||
|
@ -251,32 +333,39 @@ impl NetApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(h) = self.on_connected.load().as_ref() {
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||||||
h(conn.peer_pk, conn.remote_addr, false);
|
h(conn.peer_pk, conn.remote_addr, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let server_port = self.listen_addr.port();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let server_port = conn.netapp.listen_addr.port();
|
conn.request(HelloMessage { server_port }, PRIO_NORMAL)
|
||||||
conn.request(HelloMessage { server_port }, prio::NORMAL)
|
|
||||||
.await
|
.await
|
||||||
.log_err("Sending hello message");
|
.log_err("Sending hello message");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Called from conn.rs when an outgoinc connection is closed.
|
||||||
|
// The connection is removed from conn_list, and the on_disconnected handler
|
||||||
|
// is called.
|
||||||
pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) {
|
pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) {
|
||||||
info!("Connection to {} closed", hex::encode(id));
|
info!("Connection to {} closed", hex::encode(id));
|
||||||
let mut conn_list = self.client_conns.write().unwrap();
|
let mut conn_list = self.client_conns.write().unwrap();
|
||||||
if let Some(c) = conn_list.get(id) {
|
if let Some(c) = conn_list.get(id) {
|
||||||
if Arc::ptr_eq(c, &conn) {
|
if Arc::ptr_eq(c, &conn) {
|
||||||
conn_list.remove(id);
|
conn_list.remove(id);
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(h) = self.on_disconnected.load().as_ref() {
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
||||||
h(conn.peer_pk, false);
|
h(conn.peer_pk, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Send a message to a remote host to which a client connection is already
|
||||||
|
/// established, and await their response. The target is the id of the peer we
|
||||||
|
/// want to send the message to.
|
||||||
|
/// The priority is an `u8`, with lower numbers meaning highest priority.
|
||||||
pub async fn request<T>(
|
pub async fn request<T>(
|
||||||
&self,
|
&self,
|
||||||
target: &ed25519::PublicKey,
|
target: &ed25519::PublicKey,
|
||||||
|
|
|
@ -264,18 +264,18 @@ impl Basalt {
|
||||||
});
|
});
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
let basalt2 = basalt.clone();
|
||||||
netapp.on_connected.store(Some(Arc::new(Box::new(
|
netapp.on_connected(
|
||||||
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
|
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
|
||||||
basalt2.on_connected(pk, addr, is_incoming);
|
basalt2.on_connected(pk, addr, is_incoming);
|
||||||
},
|
}
|
||||||
))));
|
);
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
let basalt2 = basalt.clone();
|
||||||
netapp.on_disconnected.store(Some(Arc::new(Box::new(
|
netapp.on_disconnected(
|
||||||
move |pk: ed25519::PublicKey, is_incoming: bool| {
|
move |pk: ed25519::PublicKey, is_incoming: bool| {
|
||||||
basalt2.on_disconnected(pk, is_incoming);
|
basalt2.on_disconnected(pk, is_incoming);
|
||||||
},
|
},
|
||||||
))));
|
);
|
||||||
|
|
||||||
let basalt2 = basalt.clone();
|
let basalt2 = basalt.clone();
|
||||||
netapp.add_msg_handler::<PullMessage, _, _>(
|
netapp.add_msg_handler::<PullMessage, _, _>(
|
||||||
|
@ -331,7 +331,7 @@ impl Basalt {
|
||||||
async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) {
|
async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) {
|
||||||
match self
|
match self
|
||||||
.netapp
|
.netapp
|
||||||
.request(&peer, PullMessage {}, prio::NORMAL)
|
.request(&peer, PullMessage {}, PRIO_NORMAL)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
|
@ -345,7 +345,7 @@ impl Basalt {
|
||||||
|
|
||||||
async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) {
|
async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) {
|
||||||
let push_msg = self.make_push_message();
|
let push_msg = self.make_push_message();
|
||||||
if let Err(e) = self.netapp.request(&peer, push_msg, prio::NORMAL).await {
|
if let Err(e) = self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
|
||||||
warn!("Error during push exchange: {}", e);
|
warn!("Error during push exchange: {}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -448,17 +448,9 @@ impl Basalt {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
|
fn close_all_diff(&self, prev_peers: &HashSet<Peer>, new_peers: &HashSet<Peer>) {
|
||||||
let client_conns = self.netapp.client_conns.read().unwrap();
|
|
||||||
for peer in prev_peers.iter() {
|
for peer in prev_peers.iter() {
|
||||||
if !new_peers.contains(peer) {
|
if !new_peers.contains(peer) {
|
||||||
if let Some(c) = client_conns.get(&peer.id) {
|
self.netapp.disconnect(&peer.id);
|
||||||
debug!(
|
|
||||||
"Closing connection to {} ({})",
|
|
||||||
hex::encode(peer.id),
|
|
||||||
peer.addr
|
|
||||||
);
|
|
||||||
c.close();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -177,20 +177,20 @@ impl FullMeshPeeringStrategy {
|
||||||
);
|
);
|
||||||
|
|
||||||
let strat2 = strat.clone();
|
let strat2 = strat.clone();
|
||||||
netapp.on_connected.store(Some(Arc::new(Box::new(
|
netapp.on_connected(
|
||||||
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
|
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| {
|
||||||
let strat2 = strat2.clone();
|
let strat2 = strat2.clone();
|
||||||
tokio::spawn(strat2.on_connected(pk, addr, is_incoming));
|
tokio::spawn(strat2.on_connected(pk, addr, is_incoming));
|
||||||
},
|
},
|
||||||
))));
|
);
|
||||||
|
|
||||||
let strat2 = strat.clone();
|
let strat2 = strat.clone();
|
||||||
netapp.on_disconnected.store(Some(Arc::new(Box::new(
|
netapp.on_disconnected(
|
||||||
move |pk: ed25519::PublicKey, is_incoming: bool| {
|
move |pk: ed25519::PublicKey, is_incoming: bool| {
|
||||||
let strat2 = strat2.clone();
|
let strat2 = strat2.clone();
|
||||||
tokio::spawn(strat2.on_disconnected(pk, is_incoming));
|
tokio::spawn(strat2.on_disconnected(pk, is_incoming));
|
||||||
},
|
},
|
||||||
))));
|
);
|
||||||
|
|
||||||
strat
|
strat
|
||||||
}
|
}
|
||||||
|
@ -271,7 +271,7 @@ impl FullMeshPeeringStrategy {
|
||||||
hex::encode(id),
|
hex::encode(id),
|
||||||
ping_time
|
ping_time
|
||||||
);
|
);
|
||||||
match self.netapp.request(&id, ping_msg, prio::HIGH).await {
|
match self.netapp.request(&id, ping_msg, PRIO_HIGH).await {
|
||||||
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
|
Err(e) => warn!("Error pinging {}: {}", hex::encode(id), e),
|
||||||
Ok(ping_resp) => {
|
Ok(ping_resp) => {
|
||||||
let resp_time = Instant::now();
|
let resp_time = Instant::now();
|
||||||
|
@ -300,7 +300,7 @@ impl FullMeshPeeringStrategy {
|
||||||
async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) {
|
async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) {
|
||||||
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
|
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
|
||||||
let pex_message = PeerListMessage { list: peer_list };
|
let pex_message = PeerListMessage { list: peer_list };
|
||||||
match self.netapp.request(id, pex_message, prio::BACKGROUND).await {
|
match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await {
|
||||||
Err(e) => warn!("Error doing peer exchange: {}", e),
|
Err(e) => warn!("Error doing peer exchange: {}", e),
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
self.handle_peer_list(&resp.list[..]);
|
self.handle_peer_list(&resp.list[..]);
|
||||||
|
|
39
src/proto.rs
39
src/proto.rs
|
@ -16,19 +16,36 @@ use crate::error::*;
|
||||||
|
|
||||||
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat};
|
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat};
|
||||||
|
|
||||||
|
/// Priority of a request (click to read more about priorities).
|
||||||
|
///
|
||||||
|
/// This priority value is used to priorize messages
|
||||||
|
/// in the send queue of the client, and their responses in the send queue of the
|
||||||
|
/// server. Lower values mean higher priority.
|
||||||
|
///
|
||||||
|
/// This mechanism is usefull for messages bigger than the maximum chunk size
|
||||||
|
/// (set at `0x4000` bytes), such as large file transfers.
|
||||||
|
/// In such case, all of the messages in the send queue with the highest priority
|
||||||
|
/// will take turns to send individual chunks, in a round-robin fashion.
|
||||||
|
/// Once all highest priority messages are sent successfully, the messages with
|
||||||
|
/// the next highest priority will begin being sent in the same way.
|
||||||
|
///
|
||||||
|
/// The same priority value is given to a request and to its associated response.
|
||||||
|
pub type RequestPriority = u8;
|
||||||
|
|
||||||
|
/// Priority class: high
|
||||||
|
pub const PRIO_HIGH: RequestPriority = 0x20;
|
||||||
|
/// Priority class: normal
|
||||||
|
pub const PRIO_NORMAL: RequestPriority = 0x40;
|
||||||
|
/// Priority class: background
|
||||||
|
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
|
||||||
|
/// Priority: primary among given class
|
||||||
|
pub const PRIO_PRIMARY: RequestPriority = 0x00;
|
||||||
|
/// Priority: secondary among given class (ex: `PRIO_HIGH || PRIO_SECONDARY`)
|
||||||
|
pub const PRIO_SECONDARY: RequestPriority = 0x01;
|
||||||
|
|
||||||
const MAX_CHUNK_SIZE: usize = 0x4000;
|
const MAX_CHUNK_SIZE: usize = 0x4000;
|
||||||
|
|
||||||
pub mod prio {
|
pub(crate) type RequestID = u16;
|
||||||
pub const HIGH: u8 = 0x20;
|
|
||||||
pub const NORMAL: u8 = 0x40;
|
|
||||||
pub const BACKGROUND: u8 = 0x80;
|
|
||||||
|
|
||||||
pub const PRIMARY: u8 = 0x00;
|
|
||||||
pub const SECONDARY: u8 = 0x01;
|
|
||||||
}
|
|
||||||
|
|
||||||
pub type RequestID = u16;
|
|
||||||
pub type RequestPriority = u8;
|
|
||||||
|
|
||||||
struct SendQueueItem {
|
struct SendQueueItem {
|
||||||
id: RequestID,
|
id: RequestID,
|
||||||
|
|
|
@ -1,6 +1,10 @@
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
// util
|
/// Utility function: encodes any serializable value in MessagePack binary format
|
||||||
|
/// using the RMP library.
|
||||||
|
///
|
||||||
|
/// Field names and variant names are included in the serialization.
|
||||||
|
/// This is used internally by the netapp communication protocol.
|
||||||
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
pub fn rmp_to_vec_all_named<T>(val: &T) -> Result<Vec<u8>, rmp_serde::encode::Error>
|
||||||
where
|
where
|
||||||
T: Serialize + ?Sized,
|
T: Serialize + ?Sized,
|
||||||
|
|
Loading…
Reference in a new issue