Fix ping timeout and interval #4

Merged
lx merged 8 commits from fix-ping into main 2022-09-02 12:22:57 +00:00
Showing only changes of commit 81b2ff3a4e - Show all commits

View file

@ -60,11 +60,26 @@ struct PeerInfoInternal {
all_addrs: Vec<SocketAddr>, all_addrs: Vec<SocketAddr>,
state: PeerConnState, state: PeerConnState,
last_send_ping: Option<Instant>,
last_seen: Option<Instant>, last_seen: Option<Instant>,
ping: VecDeque<Duration>, ping: VecDeque<Duration>,
failed_pings: usize, failed_pings: usize,
} }
impl PeerInfoInternal {
fn new(addr: SocketAddr, state: PeerConnState) -> Self {
Self {
addr,
all_addrs: vec![addr],
state,
last_send_ping: None,
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
}
}
}
#[derive(Copy, Clone, Debug)] #[derive(Copy, Clone, Debug)]
pub struct PeerInfo { pub struct PeerInfo {
/// The node's identifier (its public key) /// The node's identifier (its public key)
@ -184,14 +199,7 @@ impl FullMeshPeeringStrategy {
if id != netapp.id { if id != netapp.id {
known_hosts.list.insert( known_hosts.list.insert(
id, id,
PeerInfoInternal { PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
addr,
all_addrs: vec![addr],
state: PeerConnState::Waiting(0, Instant::now()),
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
},
); );
} }
} }
@ -199,14 +207,7 @@ impl FullMeshPeeringStrategy {
if let Some(addr) = our_addr { if let Some(addr) = our_addr {
known_hosts.list.insert( known_hosts.list.insert(
netapp.id, netapp.id,
PeerInfoInternal { PeerInfoInternal::new(addr, PeerConnState::Ourself),
addr,
all_addrs: vec![addr],
state: PeerConnState::Ourself,
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
},
); );
} }
@ -258,7 +259,11 @@ impl FullMeshPeeringStrategy {
None => true, None => true,
Some(t) => Instant::now() - t > PING_INTERVAL, Some(t) => Instant::now() - t > PING_INTERVAL,
}; };
if must_ping { let pinged_recently = match info.last_send_ping {
None => false,
Some(t) => Instant::now() - t < PING_TIMEOUT,
};
if must_ping && !pinged_recently {
to_ping.push(*id); to_ping.push(*id);
} }
} }
@ -274,10 +279,17 @@ impl FullMeshPeeringStrategy {
}; };
// 2. Dispatch ping to hosts // 2. Dispatch ping to hosts
trace!("to_ping: {} peers", to_retry.len()); trace!("to_ping: {} peers", to_ping.len());
if !to_ping.is_empty() {
let mut known_hosts = self.known_hosts.write().unwrap();
for id in to_ping.iter() {
known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
}
drop(known_hosts);
for id in to_ping { for id in to_ping {
tokio::spawn(self.clone().ping(id)); tokio::spawn(self.clone().ping(id));
} }
}
// 3. Try reconnects // 3. Try reconnects
trace!("to_retry: {} peers", to_retry.len()); trace!("to_retry: {} peers", to_retry.len());
@ -534,17 +546,9 @@ impl FullMeshPeeringStrategy {
host.all_addrs.push(addr); host.all_addrs.push(addr);
} }
} else { } else {
known_hosts.list.insert( known_hosts
id, .list
PeerInfoInternal { .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
state: PeerConnState::Connected,
addr,
all_addrs: vec![addr],
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
},
);
} }
} }
known_hosts.update_hash(); known_hosts.update_hash();
@ -569,14 +573,7 @@ impl FullMeshPeeringStrategy {
} else { } else {
PeerConnState::Waiting(0, Instant::now()) PeerConnState::Waiting(0, Instant::now())
}; };
PeerInfoInternal { PeerInfoInternal::new(addr, state)
addr,
all_addrs: vec![addr],
state,
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
}
} }
} }