forked from lx/netapp
Make it passive-client-friendly
This commit is contained in:
parent
54c7c50bb5
commit
5040198972
4 changed files with 50 additions and 30 deletions
|
@ -85,9 +85,7 @@ async fn main() {
|
||||||
info!("KYEV SK {}", hex::encode(&privkey));
|
info!("KYEV SK {}", hex::encode(&privkey));
|
||||||
info!("KYEV PK {}", hex::encode(&privkey.public_key()));
|
info!("KYEV PK {}", hex::encode(&privkey.public_key()));
|
||||||
|
|
||||||
let listen_addr = opt.listen_addr.parse().unwrap();
|
let netapp = NetApp::new(netid, privkey);
|
||||||
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
|
||||||
let netapp = NetApp::new(listen_addr, public_addr, netid, privkey);
|
|
||||||
|
|
||||||
let mut bootstrap_peers = vec![];
|
let mut bootstrap_peers = vec![];
|
||||||
for peer in opt.bootstrap_peers.iter() {
|
for peer in opt.bootstrap_peers.iter() {
|
||||||
|
@ -119,9 +117,11 @@ async fn main() {
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
|
|
||||||
|
let listen_addr = opt.listen_addr.parse().unwrap();
|
||||||
|
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
||||||
tokio::join!(
|
tokio::join!(
|
||||||
sampling_loop(netapp.clone(), peering.clone()),
|
sampling_loop(netapp.clone(), peering.clone()),
|
||||||
netapp.listen(),
|
netapp.listen(listen_addr, public_addr),
|
||||||
peering.run(),
|
peering.run(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
|
@ -65,9 +65,7 @@ async fn main() {
|
||||||
info!("Node private key: {}", hex::encode(&privkey));
|
info!("Node private key: {}", hex::encode(&privkey));
|
||||||
info!("Node public key: {}", hex::encode(&privkey.public_key()));
|
info!("Node public key: {}", hex::encode(&privkey.public_key()));
|
||||||
|
|
||||||
let listen_addr = opt.listen_addr.parse().unwrap();
|
let netapp = NetApp::new(netid, privkey);
|
||||||
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
|
||||||
let netapp = NetApp::new(listen_addr, public_addr, netid, privkey);
|
|
||||||
|
|
||||||
let mut bootstrap_peers = vec![];
|
let mut bootstrap_peers = vec![];
|
||||||
for peer in opt.bootstrap_peers.iter() {
|
for peer in opt.bootstrap_peers.iter() {
|
||||||
|
@ -81,5 +79,7 @@ async fn main() {
|
||||||
|
|
||||||
let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers);
|
let peering = FullMeshPeeringStrategy::new(netapp.clone(), bootstrap_peers);
|
||||||
|
|
||||||
tokio::join!(netapp.listen(), peering.run(),);
|
let listen_addr = opt.listen_addr.parse().unwrap();
|
||||||
|
let public_addr = opt.public_addr.map(|x| x.parse().unwrap());
|
||||||
|
tokio::join!(netapp.listen(listen_addr, public_addr), peering.run(),);
|
||||||
}
|
}
|
||||||
|
|
|
@ -47,8 +47,7 @@ pub(crate) struct Handler {
|
||||||
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
|
/// It is generally not necessary to use NetApp stand-alone, as the provided full mesh
|
||||||
/// and RPS peering strategies take care of the most common use cases.
|
/// and RPS peering strategies take care of the most common use cases.
|
||||||
pub struct NetApp {
|
pub struct NetApp {
|
||||||
pub listen_addr: SocketAddr,
|
listen_params: ArcSwapOption<ListenParams>,
|
||||||
pub public_addr: Option<IpAddr>,
|
|
||||||
|
|
||||||
pub netid: auth::Key,
|
pub netid: auth::Key,
|
||||||
pub pubkey: ed25519::PublicKey,
|
pub pubkey: ed25519::PublicKey,
|
||||||
|
@ -63,6 +62,11 @@ pub struct NetApp {
|
||||||
on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
|
on_disconnected_handler: ArcSwapOption<Box<dyn Fn(ed25519::PublicKey, bool) + Send + Sync>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ListenParams {
|
||||||
|
listen_addr: SocketAddr,
|
||||||
|
public_addr: Option<IpAddr>,
|
||||||
|
}
|
||||||
|
|
||||||
async fn net_handler_aux<M, F, R>(
|
async fn net_handler_aux<M, F, R>(
|
||||||
handler: Arc<F>,
|
handler: Arc<F>,
|
||||||
remote: ed25519::PublicKey,
|
remote: ed25519::PublicKey,
|
||||||
|
@ -110,17 +114,13 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NetApp {
|
impl NetApp {
|
||||||
/// Creates a new instance of NetApp. No background process is
|
/// Creates a new instance of NetApp, which can serve either as a full p2p node,
|
||||||
pub fn new(
|
/// or just as a passive client. To upgrade to a full p2p node, spawn a listener
|
||||||
listen_addr: SocketAddr,
|
/// using `.listen()`
|
||||||
public_addr: Option<IpAddr>,
|
pub fn new(netid: auth::Key, privkey: ed25519::SecretKey) -> Arc<Self> {
|
||||||
netid: auth::Key,
|
|
||||||
privkey: ed25519::SecretKey,
|
|
||||||
) -> Arc<Self> {
|
|
||||||
let pubkey = privkey.public_key();
|
let pubkey = privkey.public_key();
|
||||||
let netapp = Arc::new(Self {
|
let netapp = Arc::new(Self {
|
||||||
listen_addr,
|
listen_params: ArcSwapOption::new(None),
|
||||||
public_addr,
|
|
||||||
netid,
|
netid,
|
||||||
pubkey,
|
pubkey,
|
||||||
privkey,
|
privkey,
|
||||||
|
@ -200,9 +200,16 @@ impl NetApp {
|
||||||
|
|
||||||
/// Main listening process for our app. This future runs during the whole
|
/// Main listening process for our app. This future runs during the whole
|
||||||
/// run time of our application.
|
/// run time of our application.
|
||||||
pub async fn listen(self: Arc<Self>) {
|
/// If this is not called, the NetApp instance remains a passive client.
|
||||||
let mut listener = TcpListener::bind(self.listen_addr).await.unwrap();
|
pub async fn listen(self: Arc<Self>, listen_addr: SocketAddr, public_addr: Option<IpAddr>) {
|
||||||
info!("Listening on {}", self.listen_addr);
|
let listen_params = ListenParams {
|
||||||
|
listen_addr,
|
||||||
|
public_addr,
|
||||||
|
};
|
||||||
|
self.listen_params.store(Some(Arc::new(listen_params)));
|
||||||
|
|
||||||
|
let mut listener = TcpListener::bind(listen_addr).await.unwrap();
|
||||||
|
info!("Listening on {}", listen_addr);
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
// The second item contains the IP and port of the new connection.
|
// The second item contains the IP and port of the new connection.
|
||||||
|
@ -315,8 +322,7 @@ impl NetApp {
|
||||||
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
|
fn handle_hello_message(&self, id: ed25519::PublicKey, msg: HelloMessage) {
|
||||||
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||||||
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
|
if let Some(c) = self.server_conns.read().unwrap().get(&id) {
|
||||||
let remote_ip = msg.server_addr
|
let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip());
|
||||||
.unwrap_or(c.remote_addr.ip());
|
|
||||||
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
|
let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
|
||||||
h(id, remote_addr, true);
|
h(id, remote_addr, true);
|
||||||
}
|
}
|
||||||
|
@ -363,14 +369,22 @@ impl NetApp {
|
||||||
h(conn.peer_pk, conn.remote_addr, false);
|
h(conn.peer_pk, conn.remote_addr, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
let server_addr = self.public_addr;
|
if let Some(lp) = self.listen_params.load_full() {
|
||||||
let server_port = self.listen_addr.port();
|
let server_addr = lp.public_addr;
|
||||||
|
let server_port = lp.listen_addr.port();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
conn.request(HelloMessage { server_addr, server_port }, PRIO_NORMAL)
|
conn.request(
|
||||||
|
HelloMessage {
|
||||||
|
server_addr,
|
||||||
|
server_port,
|
||||||
|
},
|
||||||
|
PRIO_NORMAL,
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
.log_err("Sending hello message");
|
.log_err("Sending hello message");
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Called from conn.rs when an outgoinc connection is closed.
|
// Called from conn.rs when an outgoinc connection is closed.
|
||||||
// The connection is removed from conn_list, and the on_disconnected handler
|
// The connection is removed from conn_list, and the on_disconnected handler
|
||||||
|
|
|
@ -150,7 +150,13 @@ impl BasaltView {
|
||||||
for peer in peers.iter() {
|
for peer in peers.iter() {
|
||||||
let peer_cost = peer.cost(&self.slots[i].seed);
|
let peer_cost = peer.cost(&self.slots[i].seed);
|
||||||
if self.slots[i].peer.is_none() || peer_cost < slot_cost {
|
if self.slots[i].peer.is_none() || peer_cost < slot_cost {
|
||||||
trace!("Best match for slot {}: {}@{} (cost {})", i, hex::encode(peer.id), peer.addr, hex::encode(peer_cost));
|
trace!(
|
||||||
|
"Best match for slot {}: {}@{} (cost {})",
|
||||||
|
i,
|
||||||
|
hex::encode(peer.id),
|
||||||
|
peer.addr,
|
||||||
|
hex::encode(peer_cost)
|
||||||
|
);
|
||||||
self.slots[i].peer = Some(*peer);
|
self.slots[i].peer = Some(*peer);
|
||||||
slot_cost = peer_cost;
|
slot_cost = peer_cost;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue