diff --git a/Cargo.lock b/Cargo.lock index 20c3393..819e614 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -433,7 +433,7 @@ dependencies = [ [[package]] name = "netapp" -version = "0.4.1" +version = "0.4.3" dependencies = [ "arc-swap", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index 5705342..87e641e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "netapp" -version = "0.4.2" +version = "0.4.3" authors = ["Alex Auvolat "] edition = "2018" license-file = "LICENSE" diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 52d7fda..bb0c9dc 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -52,7 +52,13 @@ impl Message for PeerListMessage { #[derive(Debug)] struct PeerInfoInternal { + // addr is the currently connected address, + // or the last address we were connected to, + // or an arbitrary address some other peer gave us addr: SocketAddr, + // all_addrs contains all of the addresses everyone gave us + all_addrs: Vec, + state: PeerConnState, last_seen: Option, ping: VecDeque, @@ -180,6 +186,7 @@ impl FullMeshPeeringStrategy { id, PeerInfoInternal { addr, + all_addrs: vec![addr], state: PeerConnState::Waiting(0, Instant::now()), last_seen: None, ping: VecDeque::new(), @@ -194,6 +201,7 @@ impl FullMeshPeeringStrategy { netapp.id, PeerInfoInternal { addr, + all_addrs: vec![addr], state: PeerConnState::Ourself, last_seen: None, ping: VecDeque::new(), @@ -281,11 +289,22 @@ impl FullMeshPeeringStrategy { info!( "Retrying connection to {} at {} ({})", hex::encode(&id[..8]), - h.addr, + h.all_addrs + .iter() + .map(|x| format!("{}", x)) + .collect::>() + .join(", "), i + 1 ); h.state = PeerConnState::Trying(i); - tokio::spawn(self.clone().try_connect(id, h.addr)); + + let alternate_addrs = h + .all_addrs + .iter() + .filter(|x| **x != h.addr) + .cloned() + .collect::>(); + tokio::spawn(self.clone().try_connect(id, h.addr, alternate_addrs)); } } } @@ -422,7 +441,12 @@ impl FullMeshPeeringStrategy { let mut changed = false; for (id, addr) in list.iter() { - if !known_hosts.list.contains_key(id) { + if let Some(kh) = known_hosts.list.get_mut(id) { + if !kh.all_addrs.contains(addr) { + kh.all_addrs.push(*addr); + changed = true; + } + } else { known_hosts.list.insert(*id, self.new_peer(id, *addr)); changed = true; } @@ -434,10 +458,42 @@ impl FullMeshPeeringStrategy { } } - async fn try_connect(self: Arc, id: NodeID, addr: SocketAddr) { - let conn_result = self.netapp.clone().try_connect(addr, id).await; - if let Err(e) = conn_result { - warn!("Error connecting to {}: {}", hex::encode(&id[..8]), e); + async fn try_connect( + self: Arc, + id: NodeID, + default_addr: SocketAddr, + alternate_addrs: Vec, + ) { + let conn_addr = { + let mut ret = None; + for addr in [default_addr].iter().chain(alternate_addrs.iter()) { + debug!("Trying address {} for peer {}", addr, hex::encode(&id[..8])); + match self.netapp.clone().try_connect(*addr, id).await { + Ok(()) => { + ret = Some(*addr); + break; + } + Err(e) => { + debug!( + "Error connecting to {} at {}: {}", + hex::encode(&id[..8]), + addr, + e + ); + } + } + } + ret + }; + + if let Some(ok_addr) = conn_addr { + self.on_connected(id, ok_addr, false); + } else { + warn!( + "Could not connect to peer {} ({} addresses tried)", + hex::encode(&id[..8]), + 1 + alternate_addrs.len() + ); let mut known_hosts = self.known_hosts.write().unwrap(); if let Some(host) = known_hosts.list.get_mut(&id) { host.state = match host.state { @@ -452,8 +508,6 @@ impl FullMeshPeeringStrategy { }; self.update_public_peer_list(&known_hosts); } - } else { - self.on_connected(id, addr, false); } } @@ -475,12 +529,16 @@ impl FullMeshPeeringStrategy { if let Some(host) = known_hosts.list.get_mut(&id) { host.state = PeerConnState::Connected; host.addr = addr; + if !host.all_addrs.contains(&addr) { + host.all_addrs.push(addr); + } } else { known_hosts.list.insert( id, PeerInfoInternal { state: PeerConnState::Connected, addr, + all_addrs: vec![addr], last_seen: None, ping: VecDeque::new(), failed_pings: 0, @@ -512,6 +570,7 @@ impl FullMeshPeeringStrategy { }; PeerInfoInternal { addr, + all_addrs: vec![addr], state, last_seen: None, ping: VecDeque::new(),