Replace pk,pubkey,PublicKey by id,NodeID,etc

This commit is contained in:
Alex 2020-12-12 21:14:15 +01:00
parent 5040198972
commit 6742638c81
8 changed files with 150 additions and 180 deletions

View file

@ -15,6 +15,7 @@ use netapp::message::*;
use netapp::peering::basalt::*; use netapp::peering::basalt::*;
use netapp::proto::*; use netapp::proto::*;
use netapp::NetApp; use netapp::NetApp;
use netapp::NodeID;
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
#[structopt(name = "netapp")] #[structopt(name = "netapp")]
@ -91,7 +92,7 @@ async fn main() {
for peer in opt.bootstrap_peers.iter() { for peer in opt.bootstrap_peers.iter() {
if let Some(delim) = peer.find('@') { if let Some(delim) = peer.find('@') {
let (key, ip) = peer.split_at(delim); let (key, ip) = peer.split_at(delim);
let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); let pubkey = NodeID::from_slice(&hex::decode(&key).unwrap()).unwrap();
let ip = ip[1..].parse::<SocketAddr>().unwrap(); let ip = ip[1..].parse::<SocketAddr>().unwrap();
bootstrap_peers.push((pubkey, ip)); bootstrap_peers.push((pubkey, ip));
} }
@ -106,16 +107,14 @@ async fn main() {
}; };
let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params); let peering = Basalt::new(netapp.clone(), bootstrap_peers, basalt_params);
netapp.add_msg_handler::<ExampleMessage, _, _>( netapp.add_msg_handler::<ExampleMessage, _, _>(|_from: NodeID, msg: ExampleMessage| {
|_from: ed25519::PublicKey, msg: ExampleMessage| { debug!("Got example message: {:?}, sending example response", msg);
debug!("Got example message: {:?}, sending example response", msg); async {
async { ExampleResponse {
ExampleResponse { example_field: false,
example_field: false,
}
} }
}, }
); });
let listen_addr = opt.listen_addr.parse().unwrap(); let listen_addr = opt.listen_addr.parse().unwrap();
let public_addr = opt.public_addr.map(|x| x.parse().unwrap()); let public_addr = opt.public_addr.map(|x| x.parse().unwrap());

View file

@ -10,6 +10,7 @@ use sodiumoxide::crypto::sign::ed25519;
use netapp::peering::fullmesh::*; use netapp::peering::fullmesh::*;
use netapp::NetApp; use netapp::NetApp;
use netapp::NodeID;
#[derive(StructOpt, Debug)] #[derive(StructOpt, Debug)]
#[structopt(name = "netapp")] #[structopt(name = "netapp")]
@ -71,7 +72,7 @@ async fn main() {
for peer in opt.bootstrap_peers.iter() { for peer in opt.bootstrap_peers.iter() {
if let Some(delim) = peer.find('@') { if let Some(delim) = peer.find('@') {
let (key, ip) = peer.split_at(delim); let (key, ip) = peer.split_at(delim);
let pubkey = ed25519::PublicKey::from_slice(&hex::decode(&key).unwrap()).unwrap(); let pubkey = NodeID::from_slice(&hex::decode(&key).unwrap()).unwrap();
let ip = ip[1..].parse::<SocketAddr>().unwrap(); let ip = ip[1..].parse::<SocketAddr>().unwrap();
bootstrap_peers.push((pubkey, ip)); bootstrap_peers.push((pubkey, ip));
} }

View file

@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex};
use bytes::Bytes; use bytes::Bytes;
use log::{debug, error, trace}; use log::{debug, error, trace};
use sodiumoxide::crypto::sign::ed25519;
use tokio::io::split; use tokio::io::split;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
@ -26,7 +25,7 @@ use crate::util::*;
pub(crate) struct ServerConn { pub(crate) struct ServerConn {
pub(crate) remote_addr: SocketAddr, pub(crate) remote_addr: SocketAddr,
pub(crate) peer_pk: ed25519::PublicKey, pub(crate) peer_id: NodeID,
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
@ -40,18 +39,18 @@ impl ServerConn {
let handshake = handshake_server( let handshake = handshake_server(
&mut asyncstd_socket, &mut asyncstd_socket,
netapp.netid.clone(), netapp.netid.clone(),
netapp.pubkey.clone(), netapp.id.clone(),
netapp.privkey.clone(), netapp.privkey.clone(),
) )
.await?; .await?;
let peer_pk = handshake.peer_pk.clone(); let peer_id = handshake.peer_pk.clone();
let tokio_socket = asyncstd_socket.into_inner(); let tokio_socket = asyncstd_socket.into_inner();
let remote_addr = tokio_socket.peer_addr().unwrap(); let remote_addr = tokio_socket.peer_addr().unwrap();
debug!( debug!(
"Handshake complete (server) with {}@{}", "Handshake complete (server) with {}@{}",
hex::encode(&peer_pk), hex::encode(&peer_id),
remote_addr remote_addr
); );
@ -70,12 +69,12 @@ impl ServerConn {
let conn = Arc::new(ServerConn { let conn = Arc::new(ServerConn {
netapp: netapp.clone(), netapp: netapp.clone(),
remote_addr, remote_addr,
peer_pk: peer_pk.clone(), peer_id: peer_id.clone(),
resp_send, resp_send,
close_send, close_send,
}); });
netapp.connected_as_server(peer_pk.clone(), conn.clone()); netapp.connected_as_server(peer_id.clone(), conn.clone());
let conn2 = conn.clone(); let conn2 = conn.clone();
let conn3 = conn.clone(); let conn3 = conn.clone();
@ -97,7 +96,7 @@ impl ServerConn {
.map(|_| ()) .map(|_| ())
.log_err("ServerConn recv_loop/send_loop"); .log_err("ServerConn recv_loop/send_loop");
netapp.disconnected_as_server(&peer_pk, conn); netapp.disconnected_as_server(&peer_id, conn);
Ok(()) Ok(())
} }
@ -124,7 +123,7 @@ impl RecvLoop for ServerConn {
if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) { if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) {
let net_handler = &handler.net_handler; let net_handler = &handler.net_handler;
let resp = net_handler(self.peer_pk.clone(), bytes.slice(5..)).await; let resp = net_handler(self.peer_id.clone(), bytes.slice(5..)).await;
self.resp_send self.resp_send
.send(Some((id, prio, resp))) .send(Some((id, prio, resp)))
.log_err("ServerConn recv_handler send resp"); .log_err("ServerConn recv_handler send resp");
@ -133,7 +132,7 @@ impl RecvLoop for ServerConn {
} }
pub(crate) struct ClientConn { pub(crate) struct ClientConn {
pub(crate) remote_addr: SocketAddr, pub(crate) remote_addr: SocketAddr,
pub(crate) peer_pk: ed25519::PublicKey, pub(crate) peer_id: NodeID,
query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>, query_send: mpsc::UnboundedSender<Option<(RequestID, RequestPriority, Vec<u8>)>>,
@ -147,16 +146,16 @@ impl ClientConn {
pub(crate) async fn init( pub(crate) async fn init(
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
socket: TcpStream, socket: TcpStream,
remote_pk: ed25519::PublicKey, peer_id: NodeID,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut asyncstd_socket = TokioCompatExt::wrap(socket); let mut asyncstd_socket = TokioCompatExt::wrap(socket);
let handshake = handshake_client( let handshake = handshake_client(
&mut asyncstd_socket, &mut asyncstd_socket,
netapp.netid.clone(), netapp.netid.clone(),
netapp.pubkey.clone(), netapp.id.clone(),
netapp.privkey.clone(), netapp.privkey.clone(),
remote_pk.clone(), peer_id.clone(),
) )
.await?; .await?;
@ -165,7 +164,7 @@ impl ClientConn {
debug!( debug!(
"Handshake complete (client) with {}@{}", "Handshake complete (client) with {}@{}",
hex::encode(&remote_pk), hex::encode(&peer_id),
remote_addr remote_addr
); );
@ -183,7 +182,7 @@ impl ClientConn {
let conn = Arc::new(ClientConn { let conn = Arc::new(ClientConn {
remote_addr, remote_addr,
peer_pk: remote_pk.clone(), peer_id: peer_id.clone(),
next_query_number: AtomicU16::from(0u16), next_query_number: AtomicU16::from(0u16),
query_send, query_send,
inflight: Mutex::new(HashMap::new()), inflight: Mutex::new(HashMap::new()),
@ -191,7 +190,7 @@ impl ClientConn {
stop_recv_loop, stop_recv_loop,
}); });
netapp.connected_as_client(remote_pk.clone(), conn.clone()); netapp.connected_as_client(peer_id.clone(), conn.clone());
tokio::spawn(async move { tokio::spawn(async move {
let conn2 = conn.clone(); let conn2 = conn.clone();
@ -205,7 +204,7 @@ impl ClientConn {
.map(|_| ()) .map(|_| ())
.log_err("ClientConn send_loop/recv_loop/dispatch_loop"); .log_err("ClientConn send_loop/recv_loop/dispatch_loop");
netapp.disconnected_as_client(&remote_pk, conn); netapp.disconnected_as_client(&peer_id, conn);
}); });
Ok(()) Ok(())

View file

@ -27,3 +27,4 @@ pub mod netapp;
pub mod peering; pub mod peering;
pub use netapp::*; pub use netapp::*;
pub use util::NodeID;

View file

@ -28,9 +28,7 @@ pub(crate) struct Handler {
pub(crate) local_handler: pub(crate) local_handler:
Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>, Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>,
pub(crate) net_handler: Box< pub(crate) net_handler: Box<
dyn Fn(ed25519::PublicKey, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send,
+ Sync
+ Send,
>, >,
} }
@ -49,17 +47,19 @@ pub(crate) struct Handler {
pub struct NetApp { pub struct NetApp {
listen_params: ArcSwapOption<ListenParams>, listen_params: ArcSwapOption<ListenParams>,
/// Network secret key
pub netid: auth::Key, pub netid: auth::Key,
pub pubkey: ed25519::PublicKey, /// Our peer ID
pub id: NodeID,
/// Private key associated with our peer ID
pub privkey: ed25519::SecretKey, pub privkey: ed25519::SecretKey,
server_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ServerConn>>>, server_conns: RwLock<HashMap<NodeID, Arc<ServerConn>>>,
client_conns: RwLock<HashMap<ed25519::PublicKey, Arc<ClientConn>>>, client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,
pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>, pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
on_connected_handler: on_connected_handler: ArcSwapOption<Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>>,
ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, SocketAddr, bool) + Send + Sync>>, on_disconnected_handler: ArcSwapOption<Box<dyn Fn(NodeID, bool) + Send + Sync>>,
on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
} }
struct ListenParams { struct ListenParams {
@ -67,14 +67,10 @@ struct ListenParams {
public_addr: Option<IpAddr>, public_addr: Option<IpAddr>,
} }
async fn net_handler_aux<M, F, R>( async fn net_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, bytes: Bytes) -> Vec<u8>
handler: Arc<F>,
remote: ed25519::PublicKey,
bytes: Bytes,
) -> Vec<u8>
where where
M: Message + 'static, M: Message + 'static,
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, F: Fn(NodeID, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync, R: Future<Output = <M as Message>::Response> + Send + Sync,
{ {
debug!( debug!(
@ -97,14 +93,10 @@ where
rmp_to_vec_all_named(&res).unwrap_or(vec![]) rmp_to_vec_all_named(&res).unwrap_or(vec![])
} }
async fn local_handler_aux<M, F, R>( async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg
handler: Arc<F>,
remote: ed25519::PublicKey,
msg: DynMsg,
) -> DynMsg
where where
M: Message + 'static, M: Message + 'static,
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, F: Fn(NodeID, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync, R: Future<Output = <M as Message>::Response> + Send + Sync,
{ {
debug!("Handling message of kind {:08x} from ourself", M::KIND); debug!("Handling message of kind {:08x} from ourself", M::KIND);
@ -117,12 +109,14 @@ impl NetApp {
/// Creates a new instance of NetApp, which can serve either as a full p2p node, /// Creates a new instance of NetApp, which can serve either as a full p2p node,
/// or just as a passive client. To upgrade to a full p2p node, spawn a listener /// or just as a passive client. To upgrade to a full p2p node, spawn a listener
/// using `.listen()` /// using `.listen()`
///
/// Our Peer ID is the public key associated to the secret key given here.
pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> { pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
let pubkey = privkey.public_key(); let id = privkey.public_key();
let netapp = Arc::new(Self { let netapp = Arc::new(Self {
listen_params: ArcSwapOption::new(None), listen_params: ArcSwapOption::new(None),
netid, netid,
pubkey, id,
privkey, privkey,
server_conns: RwLock::new(HashMap::new()), server_conns: RwLock::new(HashMap::new()),
client_conns: RwLock::new(HashMap::new()), client_conns: RwLock::new(HashMap::new()),
@ -132,12 +126,10 @@ impl NetApp {
}); });
let netapp2 = netapp.clone(); let netapp2 = netapp.clone();
netapp.add_msg_handler::<HelloMessage, _, _>( netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| {
move |from: ed25519::PublicKey, msg: HelloMessage| { netapp2.handle_hello_message(from, msg);
netapp2.handle_hello_message(from, msg); async { () }
async { () } });
},
);
netapp netapp
} }
@ -147,7 +139,7 @@ impl NetApp {
/// as the peering strategy will need to set this itself. /// as the peering strategy will need to set this itself.
pub fn on_connected<F>(&self, handler: F) pub fn on_connected<F>(&self, handler: F)
where where
F: Fn(ed25519::PublicKey, SocketAddr, bool) + Sized + Send + Sync + 'static, F: Fn(NodeID, SocketAddr, bool) + Sized + Send + Sync + 'static,
{ {
self.on_connected_handler self.on_connected_handler
.store(Some(Arc::new(Box::new(handler)))); .store(Some(Arc::new(Box::new(handler))));
@ -158,7 +150,7 @@ impl NetApp {
/// as the peering strategy will need to set this itself. /// as the peering strategy will need to set this itself.
pub fn on_disconnected<F>(&self, handler: F) pub fn on_disconnected<F>(&self, handler: F)
where where
F: Fn(ed25519::PublicKey, bool) + Sized + Send + Sync + 'static, F: Fn(NodeID, bool) + Sized + Send + Sync + 'static,
{ {
self.on_disconnected_handler self.on_disconnected_handler
.store(Some(Arc::new(Box::new(handler)))); .store(Some(Arc::new(Box::new(handler))));
@ -169,19 +161,19 @@ impl NetApp {
pub fn add_msg_handler<M, F, R>(&self, handler: F) pub fn add_msg_handler<M, F, R>(&self, handler: F)
where where
M: Message + 'static, M: Message + 'static,
F: Fn(ed25519::PublicKey, M) -> R + Send + Sync + 'static, F: Fn(NodeID, M) -> R + Send + Sync + 'static,
R: Future<Output = <M as Message>::Response> + Send + Sync + 'static, R: Future<Output = <M as Message>::Response> + Send + Sync + 'static,
{ {
let handler = Arc::new(handler); let handler = Arc::new(handler);
let handler2 = handler.clone(); let handler2 = handler.clone();
let net_handler = Box::new(move |remote: ed25519::PublicKey, bytes: Bytes| { let net_handler = Box::new(move |remote: NodeID, bytes: Bytes| {
let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> = let fun: Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> =
Box::pin(net_handler_aux(handler2.clone(), remote, bytes)); Box::pin(net_handler_aux(handler2.clone(), remote, bytes));
fun fun
}); });
let self_id = self.pubkey.clone(); let self_id = self.id.clone();
let local_handler = Box::new(move |msg: DynMsg| { let local_handler = Box::new(move |msg: DynMsg| {
let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> = let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> =
Box::pin(local_handler_aux(handler.clone(), self_id, msg)); Box::pin(local_handler_aux(handler.clone(), self_id, msg));
@ -232,43 +224,39 @@ impl NetApp {
/// This function returns once the connection has been established and a /// This function returns once the connection has been established and a
/// successfull handshake was made. At this point we can send messages to /// successfull handshake was made. At this point we can send messages to
/// the other node with `Netapp::request` /// the other node with `Netapp::request`
pub async fn try_connect( pub async fn try_connect(self: Arc<Self>, ip: SocketAddr, id: NodeID) -> Result<(), Error> {
self: Arc<Self>,
ip: SocketAddr,
pk: ed25519::PublicKey,
) -> Result<(), Error> {
// Don't connect to ourself, we don't care // Don't connect to ourself, we don't care
// but pretend we did // but pretend we did
if pk == self.pubkey { if id == self.id {
tokio::spawn(async move { tokio::spawn(async move {
if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(h) = self.on_connected_handler.load().as_ref() {
h(pk, ip, false); h(id, ip, false);
} }
}); });
return Ok(()); return Ok(());
} }
// Don't connect if already connected // Don't connect if already connected
if self.client_conns.read().unwrap().contains_key(&pk) { if self.client_conns.read().unwrap().contains_key(&id) {
return Ok(()); return Ok(());
} }
let socket = TcpStream::connect(ip).await?; let socket = TcpStream::connect(ip).await?;
info!("Connected to {}, negotiating handshake...", ip); info!("Connected to {}, negotiating handshake...", ip);
ClientConn::init(self, socket, pk.clone()).await?; ClientConn::init(self, socket, id.clone()).await?;
Ok(()) Ok(())
} }
/// Close the outgoing connection we have to a node specified by its public key, /// Close the outgoing connection we have to a node specified by its public key,
/// if such a connection is currently open. /// if such a connection is currently open.
pub fn disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) { pub fn disconnect(self: &Arc<Self>, id: &NodeID) {
// If pk is ourself, we're not supposed to have a connection open // If id is ourself, we're not supposed to have a connection open
if *pk != self.pubkey { if *id != self.id {
let conn = self.client_conns.write().unwrap().remove(pk); let conn = self.client_conns.write().unwrap().remove(id);
if let Some(c) = conn { if let Some(c) = conn {
debug!( debug!(
"Closing connection to {} ({})", "Closing connection to {} ({})",
hex::encode(c.peer_pk), hex::encode(c.peer_id),
c.remote_addr c.remote_addr
); );
c.close(); c.close();
@ -279,24 +267,24 @@ impl NetApp {
// call on_disconnected_handler immediately, since the connection // call on_disconnected_handler immediately, since the connection
// was removed // was removed
// (if pk == self.pubkey, we pretend we disconnected) // (if id == self.id, we pretend we disconnected)
let pk = *pk; let id = *id;
let self2 = self.clone(); let self2 = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
if let Some(h) = self2.on_disconnected_handler.load().as_ref() { if let Some(h) = self2.on_disconnected_handler.load().as_ref() {
h(pk, false); h(id, false);
} }
}); });
} }
/// Close the incoming connection from a certain client to us, /// Close the incoming connection from a certain client to us,
/// if such a connection is currently open. /// if such a connection is currently open.
pub fn server_disconnect(self: &Arc<Self>, pk: &ed25519::PublicKey) { pub fn server_disconnect(self: &Arc<Self>, id: &NodeID) {
let conn = self.server_conns.read().unwrap().get(pk).cloned(); let conn = self.server_conns.read().unwrap().get(id).cloned();
if let Some(c) = conn { if let Some(c) = conn {
debug!( debug!(
"Closing incoming connection from {} ({})", "Closing incoming connection from {} ({})",
hex::encode(c.peer_pk), hex::encode(c.peer_id),
c.remote_addr c.remote_addr
); );
c.close(); c.close();
@ -308,7 +296,7 @@ impl NetApp {
// Do not yet call the on_connected handler, because we don't know if the remote // Do not yet call the on_connected handler, because we don't know if the remote
// has an actual IP address and port we can call them back on. // has an actual IP address and port we can call them back on.
// We will know this when they send a Hello message, which is handled below. // We will know this when they send a Hello message, which is handled below.
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) { pub(crate) fn connected_as_server(&self, id: NodeID, conn: Arc<ServerConn>) {
info!("Accepted connection from {}", hex::encode(id)); info!("Accepted connection from {}", hex::encode(id));
self.server_conns.write().unwrap().insert(id, conn); self.server_conns.write().unwrap().insert(id, conn);
@ -319,7 +307,7 @@ impl NetApp {
// At this point we know they are a full network member, and not just a client, // At this point we know they are a full network member, and not just a client,
// and we call the on_connected handler so that the peering strategy knows // and we call the on_connected handler so that the peering strategy knows
// we have a new potential peer // we have a new potential peer
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) { fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) {
if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(h) = self.on_connected_handler.load().as_ref() {
if let Some(c) = self.server_conns.read().unwrap().get(&id) { if let Some(c) = self.server_conns.read().unwrap().get(&id) {
let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip()); let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip());
@ -332,7 +320,7 @@ impl NetApp {
// Called from conn.rs when an incoming connection is closed. // Called from conn.rs when an incoming connection is closed.
// We deregister the connection from server_conns and call the // We deregister the connection from server_conns and call the
// handler registered by on_disconnected // handler registered by on_disconnected
pub(crate) fn disconnected_as_server(&self, id: &ed25519::PublicKey, conn: Arc<ServerConn>) { pub(crate) fn disconnected_as_server(&self, id: &NodeID, conn: Arc<ServerConn>) {
info!("Connection from {} closed", hex::encode(id)); info!("Connection from {} closed", hex::encode(id));
let mut conn_list = self.server_conns.write().unwrap(); let mut conn_list = self.server_conns.write().unwrap();
@ -342,7 +330,7 @@ impl NetApp {
drop(conn_list); drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() { if let Some(h) = self.on_disconnected_handler.load().as_ref() {
h(conn.peer_pk, true); h(conn.peer_id, true);
} }
} }
} }
@ -355,7 +343,7 @@ impl NetApp {
// Since we are ourself listening, we send them a Hello message so that // Since we are ourself listening, we send them a Hello message so that
// they know on which port to call us back. (TODO: don't do this if we are // they know on which port to call us back. (TODO: don't do this if we are
// just a simple client and not a full p2p node) // just a simple client and not a full p2p node)
pub(crate) fn connected_as_client(&self, id: ed25519::PublicKey, conn: Arc<ClientConn>) { pub(crate) fn connected_as_client(&self, id: NodeID, conn: Arc<ClientConn>) {
info!("Connection established to {}", hex::encode(id)); info!("Connection established to {}", hex::encode(id));
{ {
@ -366,7 +354,7 @@ impl NetApp {
} }
if let Some(h) = self.on_connected_handler.load().as_ref() { if let Some(h) = self.on_connected_handler.load().as_ref() {
h(conn.peer_pk, conn.remote_addr, false); h(conn.peer_id, conn.remote_addr, false);
} }
if let Some(lp) = self.listen_params.load_full() { if let Some(lp) = self.listen_params.load_full() {
@ -389,7 +377,7 @@ impl NetApp {
// Called from conn.rs when an outgoinc connection is closed. // Called from conn.rs when an outgoinc connection is closed.
// The connection is removed from conn_list, and the on_disconnected handler // The connection is removed from conn_list, and the on_disconnected handler
// is called. // is called.
pub(crate) fn disconnected_as_client(&self, id: &ed25519::PublicKey, conn: Arc<ClientConn>) { pub(crate) fn disconnected_as_client(&self, id: &NodeID, conn: Arc<ClientConn>) {
info!("Connection to {} closed", hex::encode(id)); info!("Connection to {} closed", hex::encode(id));
let mut conn_list = self.client_conns.write().unwrap(); let mut conn_list = self.client_conns.write().unwrap();
if let Some(c) = conn_list.get(id) { if let Some(c) = conn_list.get(id) {
@ -398,7 +386,7 @@ impl NetApp {
drop(conn_list); drop(conn_list);
if let Some(h) = self.on_disconnected_handler.load().as_ref() { if let Some(h) = self.on_disconnected_handler.load().as_ref() {
h(conn.peer_pk, false); h(conn.peer_id, false);
} }
} }
} }
@ -412,14 +400,14 @@ impl NetApp {
/// The priority is an `u8`, with lower numbers meaning highest priority. /// The priority is an `u8`, with lower numbers meaning highest priority.
pub async fn request<T>( pub async fn request<T>(
&self, &self,
target: &ed25519::PublicKey, target: &NodeID,
rq: T, rq: T,
prio: RequestPriority, prio: RequestPriority,
) -> Result<<T as Message>::Response, Error> ) -> Result<<T as Message>::Response, Error>
where where
T: Message + 'static, T: Message + 'static,
{ {
if *target == self.pubkey { if *target == self.id {
let handler = self.msg_handlers.load().get(&T::KIND).cloned(); let handler = self.msg_handlers.load().get(&T::KIND).cloned();
match handler { match handler {
None => Err(Error::Message(format!( None => Err(Error::Message(format!(

View file

@ -9,11 +9,11 @@ use rand::{thread_rng, Rng};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash; use sodiumoxide::crypto::hash;
use sodiumoxide::crypto::sign::ed25519;
use crate::message::*; use crate::message::*;
use crate::netapp::*; use crate::netapp::*;
use crate::proto::*; use crate::proto::*;
use crate::NodeID;
// -- Protocol messages -- // -- Protocol messages --
@ -41,7 +41,7 @@ type Seed = [u8; 32];
#[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)] #[derive(Hash, Clone, Copy, Debug, PartialOrd, PartialEq, Eq, Serialize, Deserialize)]
struct Peer { struct Peer {
id: ed25519::PublicKey, id: NodeID,
addr: SocketAddr, addr: SocketAddr,
} }
@ -168,7 +168,7 @@ impl BasaltView {
} }
} }
fn disconnected(&mut self, id: ed25519::PublicKey) { fn disconnected(&mut self, id: NodeID) {
let mut cleared_slots = vec![]; let mut cleared_slots = vec![];
for i in 0..self.slots.len() { for i in 0..self.slots.len() {
if let Some(p) = self.slots[i].peer { if let Some(p) = self.slots[i].peer {
@ -248,7 +248,7 @@ pub struct Basalt {
impl Basalt { impl Basalt {
pub fn new( pub fn new(
netapp: Arc<NetApp>, netapp: Arc<NetApp>,
bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>, bootstrap_list: Vec<(NodeID, SocketAddr)>,
param: BasaltParams, param: BasaltParams,
) -> Arc<Self> { ) -> Arc<Self> {
let bootstrap_peers = bootstrap_list let bootstrap_peers = bootstrap_list
@ -272,37 +272,31 @@ impl Basalt {
}); });
let basalt2 = basalt.clone(); let basalt2 = basalt.clone();
netapp.on_connected( netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { basalt2.on_connected(id, addr, is_incoming);
basalt2.on_connected(pk, addr, is_incoming);
},
);
let basalt2 = basalt.clone();
netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| {
basalt2.on_disconnected(pk, is_incoming);
}); });
let basalt2 = basalt.clone(); let basalt2 = basalt.clone();
netapp.add_msg_handler::<PullMessage, _, _>( netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
move |_from: ed25519::PublicKey, _pullmsg: PullMessage| { basalt2.on_disconnected(id, is_incoming);
let push_msg = basalt2.make_push_message(); });
async move { push_msg }
},
);
let basalt2 = basalt.clone(); let basalt2 = basalt.clone();
netapp.add_msg_handler::<PushMessage, _, _>( netapp.add_msg_handler::<PullMessage, _, _>(move |_from: NodeID, _pullmsg: PullMessage| {
move |_from: ed25519::PublicKey, push_msg: PushMessage| { let push_msg = basalt2.make_push_message();
basalt2.handle_peer_list(&push_msg.peers[..]); async move { push_msg }
async move { () } });
},
); let basalt2 = basalt.clone();
netapp.add_msg_handler::<PushMessage, _, _>(move |_from: NodeID, push_msg: PushMessage| {
basalt2.handle_peer_list(&push_msg.peers[..]);
async move { () }
});
basalt basalt
} }
pub fn sample(&self, count: usize) -> Vec<ed25519::PublicKey> { pub fn sample(&self, count: usize) -> Vec<NodeID> {
self.view self.view
.read() .read()
.unwrap() .unwrap()
@ -337,7 +331,7 @@ impl Basalt {
} }
} }
async fn do_pull(self: Arc<Self>, peer: ed25519::PublicKey) { async fn do_pull(self: Arc<Self>, peer: NodeID) {
match self match self
.netapp .netapp
.request(&peer, PullMessage {}, PRIO_NORMAL) .request(&peer, PullMessage {}, PRIO_NORMAL)
@ -353,7 +347,7 @@ impl Basalt {
}; };
} }
async fn do_push(self: Arc<Self>, peer: ed25519::PublicKey) { async fn do_push(self: Arc<Self>, peer: NodeID) {
let push_msg = self.make_push_message(); let push_msg = self.make_push_message();
match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await { match self.netapp.request(&peer, push_msg, PRIO_NORMAL).await {
Ok(_) => { Ok(_) => {
@ -434,12 +428,12 @@ impl Basalt {
} }
} }
fn on_connected(self: &Arc<Self>, pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool) { fn on_connected(self: &Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
if is_incoming { if is_incoming {
self.handle_peer_list(&[Peer { id: pk, addr }][..]); self.handle_peer_list(&[Peer { id, addr }][..]);
} else { } else {
info!("KYEV C {} {}", hex::encode(pk), addr); info!("KYEV C {} {}", hex::encode(id), addr);
let peer = Peer { id: pk, addr }; let peer = Peer { id, addr };
let mut backlog = self.backlog.write().unwrap(); let mut backlog = self.backlog.write().unwrap();
if backlog.get(&peer).is_none() { if backlog.get(&peer).is_none() {
@ -459,10 +453,10 @@ impl Basalt {
} }
} }
fn on_disconnected(&self, pk: ed25519::PublicKey, is_incoming: bool) { fn on_disconnected(&self, id: NodeID, is_incoming: bool) {
if !is_incoming { if !is_incoming {
info!("KYEV D {}", hex::encode(pk)); info!("KYEV D {}", hex::encode(id));
self.view.write().unwrap().disconnected(pk); self.view.write().unwrap().disconnected(id);
} }
} }

View file

@ -8,11 +8,11 @@ use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::hash; use sodiumoxide::crypto::hash;
use sodiumoxide::crypto::sign::ed25519;
use crate::message::*; use crate::message::*;
use crate::netapp::*; use crate::netapp::*;
use crate::proto::*; use crate::proto::*;
use crate::NodeID;
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30); const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10; const CONN_MAX_RETRIES: usize = 10;
@ -34,7 +34,7 @@ impl Message for PingMessage {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct PeerListMessage { struct PeerListMessage {
pub list: Vec<(ed25519::PublicKey, SocketAddr)>, pub list: Vec<(NodeID, SocketAddr)>,
} }
impl Message for PeerListMessage { impl Message for PeerListMessage {
@ -54,7 +54,7 @@ struct PeerInfo {
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub struct PeerInfoPub { pub struct PeerInfoPub {
pub id: ed25519::PublicKey, pub id: NodeID,
pub addr: SocketAddr, pub addr: SocketAddr,
pub state: PeerConnState, pub state: PeerConnState,
pub last_seen: Option<Instant>, pub last_seen: Option<Instant>,
@ -86,7 +86,7 @@ pub enum PeerConnState {
} }
struct KnownHosts { struct KnownHosts {
list: HashMap<ed25519::PublicKey, PeerInfo>, list: HashMap<NodeID, PeerInfo>,
hash: hash::Digest, hash: hash::Digest,
} }
@ -99,9 +99,7 @@ impl KnownHosts {
fn update_hash(&mut self) { fn update_hash(&mut self) {
self.hash = Self::calculate_hash(&self.list); self.hash = Self::calculate_hash(&self.list);
} }
fn map_into_vec( fn map_into_vec(input: &HashMap<NodeID, PeerInfo>) -> Vec<(NodeID, SocketAddr)> {
input: &HashMap<ed25519::PublicKey, PeerInfo>,
) -> Vec<(ed25519::PublicKey, SocketAddr)> {
let mut list = Vec::with_capacity(input.len()); let mut list = Vec::with_capacity(input.len());
for (id, peer) in input.iter() { for (id, peer) in input.iter() {
if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself { if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
@ -110,7 +108,7 @@ impl KnownHosts {
} }
list list
} }
fn calculate_hash(input: &HashMap<ed25519::PublicKey, PeerInfo>) -> hash::Digest { fn calculate_hash(input: &HashMap<NodeID, PeerInfo>) -> hash::Digest {
let mut list = Self::map_into_vec(input); let mut list = Self::map_into_vec(input);
list.sort(); list.sort();
let mut hash_state = hash::State::new(); let mut hash_state = hash::State::new();
@ -129,15 +127,12 @@ pub struct FullMeshPeeringStrategy {
} }
impl FullMeshPeeringStrategy { impl FullMeshPeeringStrategy {
pub fn new( pub fn new(netapp: Arc<NetApp>, bootstrap_list: Vec<(NodeID, SocketAddr)>) -> Arc<Self> {
netapp: Arc<NetApp>,
bootstrap_list: Vec<(ed25519::PublicKey, SocketAddr)>,
) -> Arc<Self> {
let mut known_hosts = KnownHosts::new(); let mut known_hosts = KnownHosts::new();
for (pk, addr) in bootstrap_list { for (id, addr) in bootstrap_list {
if pk != netapp.pubkey { if id != netapp.id {
known_hosts.list.insert( known_hosts.list.insert(
pk, id,
PeerInfo { PeerInfo {
addr: addr, addr: addr,
state: PeerConnState::Waiting(0, Instant::now()), state: PeerConnState::Waiting(0, Instant::now()),
@ -155,20 +150,18 @@ impl FullMeshPeeringStrategy {
}); });
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.add_msg_handler::<PingMessage, _, _>( netapp.add_msg_handler::<PingMessage, _, _>(move |from: NodeID, ping: PingMessage| {
move |from: ed25519::PublicKey, ping: PingMessage| { let ping_resp = PingMessage {
let ping_resp = PingMessage { id: ping.id,
id: ping.id, peer_list_hash: strat2.known_hosts.read().unwrap().hash,
peer_list_hash: strat2.known_hosts.read().unwrap().hash, };
}; debug!("Ping from {}", hex::encode(&from));
debug!("Ping from {}", hex::encode(&from)); async move { ping_resp }
async move { ping_resp } });
},
);
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.add_msg_handler::<PeerListMessage, _, _>( netapp.add_msg_handler::<PeerListMessage, _, _>(
move |_from: ed25519::PublicKey, peer_list: PeerListMessage| { move |_from: NodeID, peer_list: PeerListMessage| {
strat2.handle_peer_list(&peer_list.list[..]); strat2.handle_peer_list(&peer_list.list[..]);
let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list); let peer_list = KnownHosts::map_into_vec(&strat2.known_hosts.read().unwrap().list);
let resp = PeerListMessage { list: peer_list }; let resp = PeerListMessage { list: peer_list };
@ -177,17 +170,15 @@ impl FullMeshPeeringStrategy {
); );
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.on_connected( netapp.on_connected(move |id: NodeID, addr: SocketAddr, is_incoming: bool| {
move |pk: ed25519::PublicKey, addr: SocketAddr, is_incoming: bool| { let strat2 = strat2.clone();
let strat2 = strat2.clone(); tokio::spawn(strat2.on_connected(id, addr, is_incoming));
tokio::spawn(strat2.on_connected(pk, addr, is_incoming)); });
},
);
let strat2 = strat.clone(); let strat2 = strat.clone();
netapp.on_disconnected(move |pk: ed25519::PublicKey, is_incoming: bool| { netapp.on_disconnected(move |id: NodeID, is_incoming: bool| {
let strat2 = strat2.clone(); let strat2 = strat2.clone();
tokio::spawn(strat2.on_disconnected(pk, is_incoming)); tokio::spawn(strat2.on_disconnected(id, is_incoming));
}); });
strat strat
@ -254,7 +245,7 @@ impl FullMeshPeeringStrategy {
} }
} }
async fn ping(self: Arc<Self>, id: ed25519::PublicKey) { async fn ping(self: Arc<Self>, id: NodeID) {
let peer_list_hash = self.known_hosts.read().unwrap().hash; let peer_list_hash = self.known_hosts.read().unwrap().hash;
let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed); let ping_id = self.next_ping_id.fetch_add(1u64, atomic::Ordering::Relaxed);
let ping_time = Instant::now(); let ping_time = Instant::now();
@ -295,7 +286,7 @@ impl FullMeshPeeringStrategy {
} }
} }
async fn exchange_peers(self: Arc<Self>, id: &ed25519::PublicKey) { async fn exchange_peers(self: Arc<Self>, id: &NodeID) {
let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list); let peer_list = KnownHosts::map_into_vec(&self.known_hosts.read().unwrap().list);
let pex_message = PeerListMessage { list: peer_list }; let pex_message = PeerListMessage { list: peer_list };
match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await { match self.netapp.request(id, pex_message, PRIO_BACKGROUND).await {
@ -306,7 +297,7 @@ impl FullMeshPeeringStrategy {
} }
} }
fn handle_peer_list(&self, list: &[(ed25519::PublicKey, SocketAddr)]) { fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
for (id, addr) in list.iter() { for (id, addr) in list.iter() {
if !known_hosts.list.contains_key(id) { if !known_hosts.list.contains_key(id) {
@ -315,7 +306,7 @@ impl FullMeshPeeringStrategy {
} }
} }
async fn try_connect(self: Arc<Self>, id: ed25519::PublicKey, addr: SocketAddr) { async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await; let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await;
if let Err(e) = conn_result { if let Err(e) = conn_result {
warn!("Error connecting to {}: {}", hex::encode(id), e); warn!("Error connecting to {}: {}", hex::encode(id), e);
@ -335,35 +326,30 @@ impl FullMeshPeeringStrategy {
} }
} }
async fn on_connected( async fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
self: Arc<Self>,
pk: ed25519::PublicKey,
addr: SocketAddr,
is_incoming: bool,
) {
if is_incoming { if is_incoming {
if !self.known_hosts.read().unwrap().list.contains_key(&pk) { if !self.known_hosts.read().unwrap().list.contains_key(&id) {
self.known_hosts self.known_hosts
.write() .write()
.unwrap() .unwrap()
.list .list
.insert(pk, self.new_peer(&pk, addr)); .insert(id, self.new_peer(&id, addr));
} }
} else { } else {
info!("Successfully connected to {} at {}", hex::encode(&pk), addr); info!("Successfully connected to {} at {}", hex::encode(&id), addr);
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
if let Some(host) = known_hosts.list.get_mut(&pk) { if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Connected; host.state = PeerConnState::Connected;
known_hosts.update_hash(); known_hosts.update_hash();
} }
} }
} }
async fn on_disconnected(self: Arc<Self>, pk: ed25519::PublicKey, is_incoming: bool) { async fn on_disconnected(self: Arc<Self>, id: NodeID, is_incoming: bool) {
if !is_incoming { if !is_incoming {
info!("Connection to {} was closed", hex::encode(pk)); info!("Connection to {} was closed", hex::encode(id));
let mut known_hosts = self.known_hosts.write().unwrap(); let mut known_hosts = self.known_hosts.write().unwrap();
if let Some(host) = known_hosts.list.get_mut(&pk) { if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Waiting(0, Instant::now()); host.state = PeerConnState::Waiting(0, Instant::now());
known_hosts.update_hash(); known_hosts.update_hash();
} }
@ -406,8 +392,8 @@ impl FullMeshPeeringStrategy {
ret ret
} }
fn new_peer(&self, id: &ed25519::PublicKey, addr: SocketAddr) -> PeerInfo { fn new_peer(&self, id: &NodeID, addr: SocketAddr) -> PeerInfo {
let state = if *id == self.netapp.pubkey { let state = if *id == self.netapp.id {
PeerConnState::Ourself PeerConnState::Ourself
} else { } else {
PeerConnState::Waiting(0, Instant::now()) PeerConnState::Waiting(0, Instant::now())

View file

@ -2,6 +2,8 @@ use serde::Serialize;
use tokio::sync::watch; use tokio::sync::watch;
pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey;
/// Utility function: encodes any serializable value in MessagePack binary format /// Utility function: encodes any serializable value in MessagePack binary format
/// using the RMP library. /// using the RMP library.
/// ///