Fix ping timeout and interval #4

Merged
lx merged 8 commits from fix-ping into main 2022-09-02 12:22:57 +00:00
6 changed files with 91 additions and 56 deletions

Cargo.lock (generated)

@@ -428,7 +428,7 @@ dependencies = [
 [[package]]
 name = "netapp"
-version = "0.4.4"
+version = "0.4.5"
 dependencies = [
  "arc-swap",
  "async-trait",

Cargo.toml

@@ -1,6 +1,6 @@
 [package]
 name = "netapp"
-version = "0.4.4"
+version = "0.4.5"
 authors = ["Alex Auvolat <alex@adnab.me>"]
 edition = "2018"
 license-file = "LICENSE"


@@ -102,13 +102,16 @@ impl ClientConn {
         netapp.connected_as_client(peer_id, conn.clone());

+        let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
+
         tokio::spawn(async move {
-            let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write));
+            let debug_name_2 = debug_name.clone();
+            let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));

             let conn2 = conn.clone();
             let recv_future = tokio::spawn(async move {
                 select! {
-                    r = conn2.recv_loop(read) => r,
+                    r = conn2.recv_loop(read, debug_name) => r,
                     _ = await_exit(stop_recv_loop_recv) => Ok(())
                 }
             });



@@ -22,10 +22,10 @@ use crate::NodeID;
 const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
 const CONN_MAX_RETRIES: usize = 10;
-const PING_INTERVAL: Duration = Duration::from_secs(10);
+const PING_INTERVAL: Duration = Duration::from_secs(15);
 const LOOP_DELAY: Duration = Duration::from_secs(1);
-const PING_TIMEOUT: Duration = Duration::from_secs(5);
-const FAILED_PING_THRESHOLD: usize = 3;
+const PING_TIMEOUT: Duration = Duration::from_secs(10);
+const FAILED_PING_THRESHOLD: usize = 4;

 // -- Protocol messages --
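Since this retuning is the point of the PR, here is a rough feel for what the new numbers mean in practice. The sketch below is not from this patch; it assumes, as the constant names suggest, that failed_pings counts consecutive ping timeouts and that a peer is only considered down once it reaches FAILED_PING_THRESHOLD:

use std::time::Duration;

// New values from this patch:
const PING_INTERVAL: Duration = Duration::from_secs(15);
const PING_TIMEOUT: Duration = Duration::from_secs(10);
const FAILED_PING_THRESHOLD: usize = 4;

fn main() {
    // Pings go out at t = 0, 15, 30, 45; each is given 10s to come back.
    // Under the stated assumption, the 4th consecutive failure lands at
    // 3 * PING_INTERVAL + PING_TIMEOUT = 55s (old values: 2 * 10 + 5 = 25s),
    // so flaky links get much more slack before a peer is marked down.
    let worst_case = PING_INTERVAL * (FAILED_PING_THRESHOLD as u32 - 1) + PING_TIMEOUT;
    println!("worst-case detection latency: {:?}", worst_case);
}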
@@ -60,11 +60,26 @@ struct PeerInfoInternal {
     all_addrs: Vec<SocketAddr>,
     state: PeerConnState,
+    last_send_ping: Option<Instant>,
     last_seen: Option<Instant>,
     ping: VecDeque<Duration>,
     failed_pings: usize,
 }

+impl PeerInfoInternal {
+    fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+        Self {
+            addr,
+            all_addrs: vec![addr],
+            state,
+            last_send_ping: None,
+            last_seen: None,
+            ping: VecDeque::new(),
+            failed_pings: 0,
+        }
+    }
+}
+
 #[derive(Copy, Clone, Debug)]
 pub struct PeerInfo {
     /// The node's identifier (its public key)
@@ -184,14 +199,7 @@ impl FullMeshPeeringStrategy {
             if id != netapp.id {
                 known_hosts.list.insert(
                     id,
-                    PeerInfoInternal {
-                        addr,
-                        all_addrs: vec![addr],
-                        state: PeerConnState::Waiting(0, Instant::now()),
-                        last_seen: None,
-                        ping: VecDeque::new(),
-                        failed_pings: 0,
-                    },
+                    PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
                 );
             }
         }
@@ -199,14 +207,7 @@ impl FullMeshPeeringStrategy {
         if let Some(addr) = our_addr {
             known_hosts.list.insert(
                 netapp.id,
-                PeerInfoInternal {
-                    addr,
-                    all_addrs: vec![addr],
-                    state: PeerConnState::Ourself,
-                    last_seen: None,
-                    ping: VecDeque::new(),
-                    failed_pings: 0,
-                },
+                PeerInfoInternal::new(addr, PeerConnState::Ourself),
             );
         }

@@ -254,7 +255,7 @@ impl FullMeshPeeringStrategy {
             trace!("{}, {:?}", hex::encode(&id[..8]), info);
             match info.state {
                 PeerConnState::Connected => {
-                    let must_ping = match info.last_seen {
+                    let must_ping = match info.last_send_ping {
                         None => true,
                         Some(t) => Instant::now() - t > PING_INTERVAL,
                     };
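A plausible reading of the bug this hunk fixes (an inference from the diff, not stated in the PR): last_seen only advances when the peer actually answers, so while a peer was unresponsive every pass of the peering loop saw a stale timestamp and dispatched yet another ping. Keying the check on last_send_ping, the time a ping was last *sent*, caps dispatch at one ping per PING_INTERVAL regardless of whether replies arrive. A self-contained restatement:

use std::time::{Duration, Instant};

const PING_INTERVAL: Duration = Duration::from_secs(15);

// Same decision as the hunk above, with the rationale spelled out:
fn must_ping(last_send_ping: Option<Instant>) -> bool {
    match last_send_ping {
        // Never pinged this peer yet: ping it now.
        None => true,
        // Re-ping only once PING_INTERVAL has elapsed since the last
        // *attempt*; the old code measured from the last *reply*
        // (last_seen), which never advances while a peer is down.
        Some(t) => Instant::now() - t > PING_INTERVAL,
    }
}

fn main() {
    assert!(must_ping(None)); // a fresh peer is pinged immediately
}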
@@ -274,10 +275,17 @@
             };

             // 2. Dispatch ping to hosts
-            trace!("to_ping: {} peers", to_retry.len());
+            trace!("to_ping: {} peers", to_ping.len());
+            if !to_ping.is_empty() {
+                let mut known_hosts = self.known_hosts.write().unwrap();
+                for id in to_ping.iter() {
+                    known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
+                }
+                drop(known_hosts);
                 for id in to_ping {
                     tokio::spawn(self.clone().ping(id));
                 }
+            }

             // 3. Try reconnects
             trace!("to_retry: {} peers", to_retry.len());
@@ -534,17 +542,9 @@
                 host.all_addrs.push(addr);
             }
         } else {
-            known_hosts.list.insert(
-                id,
-                PeerInfoInternal {
-                    state: PeerConnState::Connected,
-                    addr,
-                    all_addrs: vec![addr],
-                    last_seen: None,
-                    ping: VecDeque::new(),
-                    failed_pings: 0,
-                },
-            );
+            known_hosts
+                .list
+                .insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
         }
     }
     known_hosts.update_hash();
@@ -569,14 +569,7 @@
         } else {
             PeerConnState::Waiting(0, Instant::now())
         };
-        PeerInfoInternal {
-            addr,
-            all_addrs: vec![addr],
-            state,
-            last_seen: None,
-            ping: VecDeque::new(),
-            failed_pings: 0,
-        }
+        PeerInfoInternal::new(addr, state)
     }
 }



@@ -1,4 +1,5 @@
 use std::collections::{HashMap, VecDeque};
+use std::fmt::Write;
 use std::sync::Arc;

 use log::trace;
@@ -94,6 +95,22 @@ impl SendQueue {
     fn is_empty(&self) -> bool {
         self.items.iter().all(|(_k, v)| v.is_empty())
     }
+    fn dump(&self) -> String {
+        let mut ret = String::new();
+        for (prio, q) in self.items.iter() {
+            for item in q.iter() {
+                write!(
+                    &mut ret,
+                    " [{} {} ({})]",
+                    prio,
+                    item.data.len() - item.cursor,
+                    item.id
+                )
+                .unwrap();
+            }
+        }
+        ret
+    }
 }

 /// The SendLoop trait, which is implemented both by the client and the server
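Each entry dump renders is " [priority remaining_bytes (request_id)]", where the remaining byte count is data.len() - cursor. A tiny self-contained check of the format (the concrete integer types for priority and id are assumptions here):

use std::fmt::Write;

fn main() {
    // Mirror the write! format used by SendQueue::dump for one item:
    // priority 1, 500 of its bytes still unsent, request id 42.
    let (prio, remaining, id) = (1u8, 500usize, 42u16);
    let mut s = String::new();
    write!(&mut s, " [{} {} ({})]", prio, remaining, id).unwrap();
    assert_eq!(s, " [1 500 (42)]");
}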
@@ -110,6 +127,7 @@ pub(crate) trait SendLoop: Sync {
         self: Arc<Self>,
         mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
         mut write: BoxStreamWrite<W>,
+        debug_name: String,
     ) -> Result<(), Error>
     where
         W: AsyncWriteExt + Unpin + Send + Sync,
@@ -117,8 +135,15 @@
         let mut sending = SendQueue::new();
         let mut should_exit = false;
         while !should_exit || !sending.is_empty() {
+            trace!("send_loop({}): queue = {}", debug_name, sending.dump());
             if let Ok((id, prio, data)) = msg_recv.try_recv() {
-                trace!("send_loop: got {}, {} bytes", id, data.len());
+                trace!(
+                    "send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
+                    debug_name,
+                    id,
+                    prio,
+                    data.len()
+                );
                 sending.push(SendQueueItem {
                     id,
                     prio,
@@ -127,7 +152,8 @@
                 });
             } else if let Some(mut item) = sending.pop() {
                 trace!(
-                    "send_loop: sending bytes for {} ({} bytes, {} already sent)",
+                    "send_loop({}): sending bytes for {} ({} bytes, {} already sent)",
+                    debug_name,
                     item.id,
                     item.data.len(),
                     item.cursor
@@ -157,7 +183,13 @@
             } else {
                 let sth = msg_recv.recv().await;
                 if let Some((id, prio, data)) = sth {
-                    trace!("send_loop: got {}, {} bytes", id, data.len());
+                    trace!(
+                        "send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
+                        debug_name,
+                        id,
+                        prio,
+                        data.len()
+                    );
                     sending.push(SendQueueItem {
                         id,
                         prio,
@@ -186,13 +218,12 @@
 pub(crate) trait RecvLoop: Sync + 'static {
     fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);

-    async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
+    async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
     where
         R: AsyncReadExt + Unpin + Send + Sync,
     {
         let mut receiving = HashMap::new();
         loop {
-            trace!("recv_loop: reading packet");
             let mut header_id = [0u8; RequestID::BITS as usize / 8];
             match read.read_exact(&mut header_id[..]).await {
                 Ok(_) => (),
@@ -200,19 +231,24 @@ pub(crate) trait RecvLoop: Sync + 'static {
                 Err(e) => return Err(e.into()),
             };
             let id = RequestID::from_be_bytes(header_id);
-            trace!("recv_loop: got header id: {:04x}", id);

             let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
             read.read_exact(&mut header_size[..]).await?;
             let size = ChunkLength::from_be_bytes(header_size);
-            trace!("recv_loop: got header size: {:04x}", size);
+            trace!(
+                "recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)",
+                debug_name,
+                id,
+                size,
+                size & !CHUNK_HAS_CONTINUATION
+            );

             let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
             let size = size & !CHUNK_HAS_CONTINUATION;
             let mut next_slice = vec![0; size as usize];
             read.read_exact(&mut next_slice[..]).await?;
-            trace!("recv_loop: read {} bytes", next_slice.len());
+            trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len());

             let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
             msg_bytes.extend_from_slice(&next_slice[..]);
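For readers of the new combined trace line: recv_loop parses a header made of a big-endian request id followed by a big-endian chunk length whose top bit (CHUNK_HAS_CONTINUATION) says whether more chunks of the same request follow. A sketch of that parse, assuming RequestID and ChunkLength are both u16 (which the {:04x} formatting suggests) and that the flag is the high bit:

// Assumed constant and types; netapp's actual definitions are not in this diff.
const CHUNK_HAS_CONTINUATION: u16 = 0x8000;

struct ChunkHeader {
    id: u16,        // request this chunk belongs to
    len: u16,       // payload bytes to read next
    has_cont: bool, // true if another chunk of this request follows
}

fn parse_chunk_header(buf: [u8; 4]) -> ChunkHeader {
    let id = u16::from_be_bytes([buf[0], buf[1]]);
    let size = u16::from_be_bytes([buf[2], buf[3]]);
    ChunkHeader {
        id,
        len: size & !CHUNK_HAS_CONTINUATION,
        has_cont: (size & CHUNK_HAS_CONTINUATION) != 0,
    }
}

fn main() {
    // id = 0x002a, size word = 0x8064: a 100-byte chunk of request 42, more to come.
    let h = parse_chunk_header([0x00, 0x2a, 0x80, 0x64]);
    assert_eq!((h.id, h.len, h.has_cont), (42, 100, true));
}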


@@ -105,14 +105,17 @@ impl ServerConn {
         netapp.connected_as_server(peer_id, conn.clone());

+        let debug_name = format!("SRV {}", hex::encode(&peer_id[..8]));
+        let debug_name_2 = debug_name.clone();
+
         let conn2 = conn.clone();
         let recv_future = tokio::spawn(async move {
             select! {
-                r = conn2.recv_loop(read) => r,
+                r = conn2.recv_loop(read, debug_name_2) => r,
                 _ = await_exit(must_exit) => Ok(())
             }
         });

-        let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write));
+        let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));

         recv_future.await.log_err("ServerConn recv_loop");
         conn.resp_send.store(None);
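One usage note: debug_name only ever feeds trace! lines, so none of this output appears unless a log backend is installed and trace level is enabled for netapp. A hypothetical harness (env_logger is an assumption; the library itself only uses the log facade, as the use log::trace; import above shows):

fn main() {
    // Assumes the env_logger crate; any `log`-compatible backend works.
    // Run with RUST_LOG=netapp=trace to surface the new per-connection lines,
    // e.g. "send_loop(SRV ab12cd34): queue =  [1 500 (42)]" (illustrative).
    env_logger::init();
    // ... set up netapp endpoints and connections as usual ...
}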