Handle the possibility of several alternative IP addresses for peers

This commit is contained in:
Alex 2022-05-09 11:54:34 +02:00
parent faecefc7a8
commit 677c471548
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
3 changed files with 70 additions and 11 deletions

2
Cargo.lock generated
View file

@ -433,7 +433,7 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.4.1"
version = "0.4.3"
dependencies = [
"arc-swap",
"async-trait",

View file

@ -1,6 +1,6 @@
[package]
name = "netapp"
version = "0.4.2"
version = "0.4.3"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "LICENSE"

View file

@ -52,7 +52,13 @@ impl Message for PeerListMessage {
#[derive(Debug)]
struct PeerInfoInternal {
// addr is the currently connected address,
// or the last address we were connected to,
// or an arbitrary address some other peer gave us
addr: SocketAddr,
// all_addrs contains all of the addresses everyone gave us
all_addrs: Vec<SocketAddr>,
state: PeerConnState,
last_seen: Option<Instant>,
ping: VecDeque<Duration>,
@ -180,6 +186,7 @@ impl FullMeshPeeringStrategy {
id,
PeerInfoInternal {
addr,
all_addrs: vec![addr],
state: PeerConnState::Waiting(0, Instant::now()),
last_seen: None,
ping: VecDeque::new(),
@ -194,6 +201,7 @@ impl FullMeshPeeringStrategy {
netapp.id,
PeerInfoInternal {
addr,
all_addrs: vec![addr],
state: PeerConnState::Ourself,
last_seen: None,
ping: VecDeque::new(),
@ -281,11 +289,22 @@ impl FullMeshPeeringStrategy {
info!(
"Retrying connection to {} at {} ({})",
hex::encode(&id[..8]),
h.addr,
h.all_addrs
.iter()
.map(|x| format!("{}", x))
.collect::<Vec<_>>()
.join(", "),
i + 1
);
h.state = PeerConnState::Trying(i);
tokio::spawn(self.clone().try_connect(id, h.addr));
let alternate_addrs = h
.all_addrs
.iter()
.filter(|x| **x != h.addr)
.cloned()
.collect::<Vec<_>>();
tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs));
}
}
}
@ -422,7 +441,12 @@ impl FullMeshPeeringStrategy {
let mut changed = false;
for (id, addr) in list.iter() {
if !known_hosts.list.contains_key(id) {
if let Some(kh) = known_hosts.list.get_mut(id) {
if !kh.all_addrs.contains(addr) {
kh.all_addrs.push(*addr);
changed = true;
}
} else {
known_hosts.list.insert(*id, self.new_peer(id, *addr));
changed = true;
}
@ -434,10 +458,42 @@ impl FullMeshPeeringStrategy {
}
}
async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
let conn_result = self.netapp.clone().try_connect(addr, id).await;
if let Err(e) = conn_result {
warn!("Error connecting to {}: {}", hex::encode(&id[..8]), e);
async fn try_connect(
self: Arc<Self>,
id: NodeID,
default_addr: SocketAddr,
alternate_addrs: Vec<SocketAddr>,
) {
let conn_addr = {
let mut ret = None;
for addr in [default_addr].iter().chain(alternate_addrs.iter()) {
debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8]));
match self.netapp.clone().try_connect(*addr, id).await {
Ok(()) => {
ret = Some(*addr);
break;
}
Err(e) => {
debug!(
"Error connecting to {} at {}: {}",
hex::encode(&id[..8]),
addr,
e
);
}
}
}
ret
};
if let Some(ok_addr) = conn_addr {
self.on_connected(id, ok_addr, false);
} else {
warn!(
"Could not connect to peer {} ({} addresses tried)",
hex::encode(&id[..8]),
1 + alternate_addrs.len()
);
let mut known_hosts = self.known_hosts.write().unwrap();
if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = match host.state {
@ -452,8 +508,6 @@ impl FullMeshPeeringStrategy {
};
self.update_public_peer_list(&known_hosts);
}
} else {
self.on_connected(id, addr, false);
}
}
@ -475,12 +529,16 @@ impl FullMeshPeeringStrategy {
if let Some(host) = known_hosts.list.get_mut(&id) {
host.state = PeerConnState::Connected;
host.addr = addr;
if !host.all_addrs.contains(&addr) {
host.all_addrs.push(addr);
}
} else {
known_hosts.list.insert(
id,
PeerInfoInternal {
state: PeerConnState::Connected,
addr,
all_addrs: vec![addr],
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
@ -512,6 +570,7 @@ impl FullMeshPeeringStrategy {
};
PeerInfoInternal {
addr,
all_addrs: vec![addr],
state,
last_seen: None,
ping: VecDeque::new(),