forked from lx/netapp
try fix
This commit is contained in:
parent
e9add586a5
commit
94c01a3565
2 changed files with 20 additions and 7 deletions
|
@ -24,7 +24,7 @@ use crate::proto::*;
|
||||||
use crate::server::*;
|
use crate::server::*;
|
||||||
use crate::util::*;
|
use crate::util::*;
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize, Debug)]
|
||||||
pub(crate) struct HelloMessage {
|
pub(crate) struct HelloMessage {
|
||||||
pub server_addr: Option<IpAddr>,
|
pub server_addr: Option<IpAddr>,
|
||||||
pub server_port: u16,
|
pub server_port: u16,
|
||||||
|
@ -243,13 +243,16 @@ impl NetApp {
|
||||||
.log_err("Failed to await for connection collector");
|
.log_err("Failed to await for connection collector");
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Drop all endpoint handlers
|
/// Drop all endpoint handlers, as well as handlers for connection/disconnection
|
||||||
|
/// events. (This disables the peering strategy)
|
||||||
///
|
///
|
||||||
/// Use this when terminating to break reference cycles
|
/// Use this when terminating to break reference cycles
|
||||||
pub fn drop_all_handlers(&self) {
|
pub fn drop_all_handlers(&self) {
|
||||||
for (_, endpoint) in self.endpoints.read().unwrap().iter() {
|
for (_, endpoint) in self.endpoints.read().unwrap().iter() {
|
||||||
endpoint.drop_handler();
|
endpoint.drop_handler();
|
||||||
}
|
}
|
||||||
|
self.on_connected_handler.store(None);
|
||||||
|
self.on_disconnected_handler.store(None);
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Attempt to connect to a peer, given by its ip:port and its public key.
|
/// Attempt to connect to a peer, given by its ip:port and its public key.
|
||||||
|
@ -411,6 +414,7 @@ impl NetApp {
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler<HelloMessage> for NetApp {
|
impl EndpointHandler<HelloMessage> for NetApp {
|
||||||
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
|
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
|
||||||
|
debug!("Hello from {:?}: {:?}", hex::encode(from), msg);
|
||||||
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
if let Some(h) = self.on_connected_handler.load().as_ref() {
|
||||||
if let Some(c) = self.server_conns.read().unwrap().get(&from) {
|
if let Some(c) = self.server_conns.read().unwrap().get(&from) {
|
||||||
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
|
let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
|
||||||
|
|
|
@ -391,13 +391,20 @@ impl FullMeshPeeringStrategy {
|
||||||
|
|
||||||
fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
|
fn handle_peer_list(&self, list: &[(NodeID, SocketAddr)]) {
|
||||||
let mut known_hosts = self.known_hosts.write().unwrap();
|
let mut known_hosts = self.known_hosts.write().unwrap();
|
||||||
|
|
||||||
|
let mut changed = false;
|
||||||
for (id, addr) in list.iter() {
|
for (id, addr) in list.iter() {
|
||||||
if !known_hosts.list.contains_key(id) {
|
if !known_hosts.list.contains_key(id) {
|
||||||
known_hosts.list.insert(*id, self.new_peer(id, *addr));
|
known_hosts.list.insert(*id, self.new_peer(id, *addr));
|
||||||
|
changed = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if changed {
|
||||||
|
known_hosts.update_hash();
|
||||||
self.update_public_peer_list(&known_hosts);
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
|
async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
|
||||||
let conn_result = self.netapp.clone().try_connect(addr, id).await;
|
let conn_result = self.netapp.clone().try_connect(addr, id).await;
|
||||||
|
@ -422,18 +429,20 @@ impl FullMeshPeeringStrategy {
|
||||||
|
|
||||||
async fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
|
async fn on_connected(self: Arc<Self>, id: NodeID, addr: SocketAddr, is_incoming: bool) {
|
||||||
if is_incoming {
|
if is_incoming {
|
||||||
if !self.known_hosts.read().unwrap().list.contains_key(&id) {
|
let mut known_hosts = self.known_hosts.write().unwrap();
|
||||||
self.known_hosts
|
if !known_hosts.list.contains_key(&id) {
|
||||||
.write()
|
known_hosts
|
||||||
.unwrap()
|
|
||||||
.list
|
.list
|
||||||
.insert(id, self.new_peer(&id, addr));
|
.insert(id, self.new_peer(&id, addr));
|
||||||
|
known_hosts.update_hash();
|
||||||
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
info!("Successfully connected to {} at {}", hex::encode(&id), addr);
|
info!("Successfully connected to {} at {}", hex::encode(&id), addr);
|
||||||
let mut known_hosts = self.known_hosts.write().unwrap();
|
let mut known_hosts = self.known_hosts.write().unwrap();
|
||||||
if let Some(host) = known_hosts.list.get_mut(&id) {
|
if let Some(host) = known_hosts.list.get_mut(&id) {
|
||||||
host.state = PeerConnState::Connected;
|
host.state = PeerConnState::Connected;
|
||||||
|
host.addr = addr;
|
||||||
known_hosts.update_hash();
|
known_hosts.update_hash();
|
||||||
self.update_public_peer_list(&known_hosts);
|
self.update_public_peer_list(&known_hosts);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue