Fix ping timeout and interval #4

Merged
lx merged 8 commits from fix-ping into main 2022-09-02 12:22:57 +00:00
6 changed files with 91 additions and 56 deletions

2
Cargo.lock generated
View file

@ -428,7 +428,7 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.4.4"
version = "0.4.5"
dependencies = [
"arc-swap",
"async-trait",

View file

@ -1,6 +1,6 @@
[package]
name = "netapp"
version = "0.4.4"
version = "0.4.5"
authors = ["Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license-file = "LICENSE"

View file

@ -102,13 +102,16 @@ impl ClientConn {
netapp.connected_as_client(peer_id, conn.clone());
let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
tokio::spawn(async move {
let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write));
let debug_name_2 = debug_name.clone();
let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
let conn2 = conn.clone();
let recv_future = tokio::spawn(async move {
select! {
r = conn2.recv_loop(read) => r,
r = conn2.recv_loop(read, debug_name) => r,
_ = await_exit(stop_recv_loop_recv) => Ok(())
}
});

View file

@ -22,10 +22,10 @@ use crate::NodeID;
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
const CONN_MAX_RETRIES: usize = 10;
const PING_INTERVAL: Duration = Duration::from_secs(10);
const PING_INTERVAL: Duration = Duration::from_secs(15);
const LOOP_DELAY: Duration = Duration::from_secs(1);
const PING_TIMEOUT: Duration = Duration::from_secs(5);
const FAILED_PING_THRESHOLD: usize = 3;
const PING_TIMEOUT: Duration = Duration::from_secs(10);
const FAILED_PING_THRESHOLD: usize = 4;
// -- Protocol messages --
@ -60,11 +60,26 @@ struct PeerInfoInternal {
all_addrs: Vec<SocketAddr>,
state: PeerConnState,
last_send_ping: Option<Instant>,
last_seen: Option<Instant>,
ping: VecDeque<Duration>,
failed_pings: usize,
}
impl PeerInfoInternal {
fn new(addr: SocketAddr, state: PeerConnState) -> Self {
Self {
addr,
all_addrs: vec![addr],
state,
last_send_ping: None,
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
}
}
}
#[derive(Copy, Clone, Debug)]
pub struct PeerInfo {
/// The node's identifier (its public key)
@ -184,14 +199,7 @@ impl FullMeshPeeringStrategy {
if id != netapp.id {
known_hosts.list.insert(
id,
PeerInfoInternal {
addr,
all_addrs: vec![addr],
state: PeerConnState::Waiting(0, Instant::now()),
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
},
PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
);
}
}
@ -199,14 +207,7 @@ impl FullMeshPeeringStrategy {
if let Some(addr) = our_addr {
known_hosts.list.insert(
netapp.id,
PeerInfoInternal {
addr,
all_addrs: vec![addr],
state: PeerConnState::Ourself,
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
},
PeerInfoInternal::new(addr, PeerConnState::Ourself),
);
}
@ -254,7 +255,7 @@ impl FullMeshPeeringStrategy {
trace!("{}, {:?}", hex::encode(&id[..8]), info);
match info.state {
PeerConnState::Connected => {
let must_ping = match info.last_seen {
let must_ping = match info.last_send_ping {
None => true,
Some(t) => Instant::now() - t > PING_INTERVAL,
};
@ -274,9 +275,16 @@ impl FullMeshPeeringStrategy {
};
// 2. Dispatch ping to hosts
trace!("to_ping: {} peers", to_retry.len());
for id in to_ping {
tokio::spawn(self.clone().ping(id));
trace!("to_ping: {} peers", to_ping.len());
if !to_ping.is_empty() {
let mut known_hosts = self.known_hosts.write().unwrap();
for id in to_ping.iter() {
known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
}
drop(known_hosts);
for id in to_ping {
tokio::spawn(self.clone().ping(id));
}
}
// 3. Try reconnects
@ -534,17 +542,9 @@ impl FullMeshPeeringStrategy {
host.all_addrs.push(addr);
}
} else {
known_hosts.list.insert(
id,
PeerInfoInternal {
state: PeerConnState::Connected,
addr,
all_addrs: vec![addr],
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
},
);
known_hosts
.list
.insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
}
}
known_hosts.update_hash();
@ -569,14 +569,7 @@ impl FullMeshPeeringStrategy {
} else {
PeerConnState::Waiting(0, Instant::now())
};
PeerInfoInternal {
addr,
all_addrs: vec![addr],
state,
last_seen: None,
ping: VecDeque::new(),
failed_pings: 0,
}
PeerInfoInternal::new(addr, state)
}
}

View file

@ -1,4 +1,5 @@
use std::collections::{HashMap, VecDeque};
use std::fmt::Write;
use std::sync::Arc;
use log::trace;
@ -94,6 +95,22 @@ impl SendQueue {
fn is_empty(&self) -> bool {
self.items.iter().all(|(_k, v)| v.is_empty())
}
fn dump(&self) -> String {
let mut ret = String::new();
for (prio, q) in self.items.iter() {
for item in q.iter() {
write!(
&mut ret,
" [{} {} ({})]",
prio,
item.data.len() - item.cursor,
item.id
)
.unwrap();
}
}
ret
}
}
/// The SendLoop trait, which is implemented both by the client and the server
@ -110,6 +127,7 @@ pub(crate) trait SendLoop: Sync {
self: Arc<Self>,
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
mut write: BoxStreamWrite<W>,
debug_name: String,
) -> Result<(), Error>
where
W: AsyncWriteExt + Unpin + Send + Sync,
@ -117,8 +135,15 @@ pub(crate) trait SendLoop: Sync {
let mut sending = SendQueue::new();
let mut should_exit = false;
while !should_exit || !sending.is_empty() {
trace!("send_loop({}): queue = {}", debug_name, sending.dump());
if let Ok((id, prio, data)) = msg_recv.try_recv() {
trace!("send_loop: got {}, {} bytes", id, data.len());
trace!(
"send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
debug_name,
id,
prio,
data.len()
);
sending.push(SendQueueItem {
id,
prio,
@ -127,7 +152,8 @@ pub(crate) trait SendLoop: Sync {
});
} else if let Some(mut item) = sending.pop() {
trace!(
"send_loop: sending bytes for {} ({} bytes, {} already sent)",
"send_loop({}): sending bytes for {} ({} bytes, {} already sent)",
debug_name,
item.id,
item.data.len(),
item.cursor
@ -157,7 +183,13 @@ pub(crate) trait SendLoop: Sync {
} else {
let sth = msg_recv.recv().await;
if let Some((id, prio, data)) = sth {
trace!("send_loop: got {}, {} bytes", id, data.len());
trace!(
"send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
debug_name,
id,
prio,
data.len()
);
sending.push(SendQueueItem {
id,
prio,
@ -186,13 +218,12 @@ pub(crate) trait SendLoop: Sync {
pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
where
R: AsyncReadExt + Unpin + Send + Sync,
{
let mut receiving = HashMap::new();
loop {
trace!("recv_loop: reading packet");
let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await {
Ok(_) => (),
@ -200,19 +231,24 @@ pub(crate) trait RecvLoop: Sync + 'static {
Err(e) => return Err(e.into()),
};
let id = RequestID::from_be_bytes(header_id);
trace!("recv_loop: got header id: {:04x}", id);
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size);
trace!("recv_loop: got header size: {:04x}", size);
trace!(
"recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)",
debug_name,
id,
size,
size & !CHUNK_HAS_CONTINUATION
);
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?;
trace!("recv_loop: read {} bytes", next_slice.len());
trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len());
let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
msg_bytes.extend_from_slice(&next_slice[..]);

View file

@ -105,14 +105,17 @@ impl ServerConn {
netapp.connected_as_server(peer_id, conn.clone());
let debug_name = format!("SRV {}", hex::encode(&peer_id[..8]));
let debug_name_2 = debug_name.clone();
let conn2 = conn.clone();
let recv_future = tokio::spawn(async move {
select! {
r = conn2.recv_loop(read) => r,
r = conn2.recv_loop(read, debug_name_2) => r,
_ = await_exit(must_exit) => Ok(())
}
});
let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write));
let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));
recv_future.await.log_err("ServerConn recv_loop");
conn.resp_send.store(None);