Fix ping timeout and interval #4
6 changed files with 91 additions and 56 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -428,7 +428,7 @@ dependencies = [
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.4.4"
|
version = "0.4.5"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
[package]
|
[package]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.4.4"
|
version = "0.4.5"
|
||||||
authors = ["Alex Auvolat <alex@adnab.me>"]
|
authors = ["Alex Auvolat <alex@adnab.me>"]
|
||||||
edition = "2018"
|
edition = "2018"
|
||||||
license-file = "LICENSE"
|
license-file = "LICENSE"
|
||||||
|
|
|
@ -102,13 +102,16 @@ impl ClientConn {
|
||||||
|
|
||||||
netapp.connected_as_client(peer_id, conn.clone());
|
netapp.connected_as_client(peer_id, conn.clone());
|
||||||
|
|
||||||
|
let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write));
|
let debug_name_2 = debug_name.clone();
|
||||||
|
let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
|
||||||
|
|
||||||
let conn2 = conn.clone();
|
let conn2 = conn.clone();
|
||||||
let recv_future = tokio::spawn(async move {
|
let recv_future = tokio::spawn(async move {
|
||||||
select! {
|
select! {
|
||||||
r = conn2.recv_loop(read) => r,
|
r = conn2.recv_loop(read, debug_name) => r,
|
||||||
_ = await_exit(stop_recv_loop_recv) => Ok(())
|
_ = await_exit(stop_recv_loop_recv) => Ok(())
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
|
@ -22,10 +22,10 @@ use crate::NodeID;
|
||||||
|
|
||||||
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
|
const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
|
||||||
const CONN_MAX_RETRIES: usize = 10;
|
const CONN_MAX_RETRIES: usize = 10;
|
||||||
const PING_INTERVAL: Duration = Duration::from_secs(10);
|
const PING_INTERVAL: Duration = Duration::from_secs(15);
|
||||||
const LOOP_DELAY: Duration = Duration::from_secs(1);
|
const LOOP_DELAY: Duration = Duration::from_secs(1);
|
||||||
const PING_TIMEOUT: Duration = Duration::from_secs(5);
|
const PING_TIMEOUT: Duration = Duration::from_secs(10);
|
||||||
const FAILED_PING_THRESHOLD: usize = 3;
|
const FAILED_PING_THRESHOLD: usize = 4;
|
||||||
|
|
||||||
// -- Protocol messages --
|
// -- Protocol messages --
|
||||||
|
|
||||||
|
@ -60,11 +60,26 @@ struct PeerInfoInternal {
|
||||||
all_addrs: Vec<SocketAddr>,
|
all_addrs: Vec<SocketAddr>,
|
||||||
|
|
||||||
state: PeerConnState,
|
state: PeerConnState,
|
||||||
|
last_send_ping: Option<Instant>,
|
||||||
last_seen: Option<Instant>,
|
last_seen: Option<Instant>,
|
||||||
ping: VecDeque<Duration>,
|
ping: VecDeque<Duration>,
|
||||||
failed_pings: usize,
|
failed_pings: usize,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl PeerInfoInternal {
|
||||||
|
fn new(addr: SocketAddr, state: PeerConnState) -> Self {
|
||||||
|
Self {
|
||||||
|
addr,
|
||||||
|
all_addrs: vec![addr],
|
||||||
|
state,
|
||||||
|
last_send_ping: None,
|
||||||
|
last_seen: None,
|
||||||
|
ping: VecDeque::new(),
|
||||||
|
failed_pings: 0,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub struct PeerInfo {
|
pub struct PeerInfo {
|
||||||
/// The node's identifier (its public key)
|
/// The node's identifier (its public key)
|
||||||
|
@ -184,14 +199,7 @@ impl FullMeshPeeringStrategy {
|
||||||
if id != netapp.id {
|
if id != netapp.id {
|
||||||
known_hosts.list.insert(
|
known_hosts.list.insert(
|
||||||
id,
|
id,
|
||||||
PeerInfoInternal {
|
PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
|
||||||
addr,
|
|
||||||
all_addrs: vec![addr],
|
|
||||||
state: PeerConnState::Waiting(0, Instant::now()),
|
|
||||||
last_seen: None,
|
|
||||||
ping: VecDeque::new(),
|
|
||||||
failed_pings: 0,
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -199,14 +207,7 @@ impl FullMeshPeeringStrategy {
|
||||||
if let Some(addr) = our_addr {
|
if let Some(addr) = our_addr {
|
||||||
known_hosts.list.insert(
|
known_hosts.list.insert(
|
||||||
netapp.id,
|
netapp.id,
|
||||||
PeerInfoInternal {
|
PeerInfoInternal::new(addr, PeerConnState::Ourself),
|
||||||
addr,
|
|
||||||
all_addrs: vec![addr],
|
|
||||||
state: PeerConnState::Ourself,
|
|
||||||
last_seen: None,
|
|
||||||
ping: VecDeque::new(),
|
|
||||||
failed_pings: 0,
|
|
||||||
},
|
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -254,7 +255,7 @@ impl FullMeshPeeringStrategy {
|
||||||
trace!("{}, {:?}", hex::encode(&id[..8]), info);
|
trace!("{}, {:?}", hex::encode(&id[..8]), info);
|
||||||
match info.state {
|
match info.state {
|
||||||
PeerConnState::Connected => {
|
PeerConnState::Connected => {
|
||||||
let must_ping = match info.last_seen {
|
let must_ping = match info.last_send_ping {
|
||||||
None => true,
|
None => true,
|
||||||
Some(t) => Instant::now() - t > PING_INTERVAL,
|
Some(t) => Instant::now() - t > PING_INTERVAL,
|
||||||
};
|
};
|
||||||
|
@ -274,10 +275,17 @@ impl FullMeshPeeringStrategy {
|
||||||
};
|
};
|
||||||
|
|
||||||
// 2. Dispatch ping to hosts
|
// 2. Dispatch ping to hosts
|
||||||
trace!("to_ping: {} peers", to_retry.len());
|
trace!("to_ping: {} peers", to_ping.len());
|
||||||
|
if !to_ping.is_empty() {
|
||||||
|
let mut known_hosts = self.known_hosts.write().unwrap();
|
||||||
|
for id in to_ping.iter() {
|
||||||
|
known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
|
||||||
|
}
|
||||||
|
drop(known_hosts);
|
||||||
for id in to_ping {
|
for id in to_ping {
|
||||||
tokio::spawn(self.clone().ping(id));
|
tokio::spawn(self.clone().ping(id));
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// 3. Try reconnects
|
// 3. Try reconnects
|
||||||
trace!("to_retry: {} peers", to_retry.len());
|
trace!("to_retry: {} peers", to_retry.len());
|
||||||
|
@ -534,17 +542,9 @@ impl FullMeshPeeringStrategy {
|
||||||
host.all_addrs.push(addr);
|
host.all_addrs.push(addr);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
known_hosts.list.insert(
|
known_hosts
|
||||||
id,
|
.list
|
||||||
PeerInfoInternal {
|
.insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
|
||||||
state: PeerConnState::Connected,
|
|
||||||
addr,
|
|
||||||
all_addrs: vec![addr],
|
|
||||||
last_seen: None,
|
|
||||||
ping: VecDeque::new(),
|
|
||||||
failed_pings: 0,
|
|
||||||
},
|
|
||||||
);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
known_hosts.update_hash();
|
known_hosts.update_hash();
|
||||||
|
@ -569,14 +569,7 @@ impl FullMeshPeeringStrategy {
|
||||||
} else {
|
} else {
|
||||||
PeerConnState::Waiting(0, Instant::now())
|
PeerConnState::Waiting(0, Instant::now())
|
||||||
};
|
};
|
||||||
PeerInfoInternal {
|
PeerInfoInternal::new(addr, state)
|
||||||
addr,
|
|
||||||
all_addrs: vec![addr],
|
|
||||||
state,
|
|
||||||
last_seen: None,
|
|
||||||
ping: VecDeque::new(),
|
|
||||||
failed_pings: 0,
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
52
src/proto.rs
52
src/proto.rs
|
@ -1,4 +1,5 @@
|
||||||
use std::collections::{HashMap, VecDeque};
|
use std::collections::{HashMap, VecDeque};
|
||||||
|
use std::fmt::Write;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use log::trace;
|
use log::trace;
|
||||||
|
@ -94,6 +95,22 @@ impl SendQueue {
|
||||||
fn is_empty(&self) -> bool {
|
fn is_empty(&self) -> bool {
|
||||||
self.items.iter().all(|(_k, v)| v.is_empty())
|
self.items.iter().all(|(_k, v)| v.is_empty())
|
||||||
}
|
}
|
||||||
|
fn dump(&self) -> String {
|
||||||
|
let mut ret = String::new();
|
||||||
|
for (prio, q) in self.items.iter() {
|
||||||
|
for item in q.iter() {
|
||||||
|
write!(
|
||||||
|
&mut ret,
|
||||||
|
" [{} {} ({})]",
|
||||||
|
prio,
|
||||||
|
item.data.len() - item.cursor,
|
||||||
|
item.id
|
||||||
|
)
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The SendLoop trait, which is implemented both by the client and the server
|
/// The SendLoop trait, which is implemented both by the client and the server
|
||||||
|
@ -110,6 +127,7 @@ pub(crate) trait SendLoop: Sync {
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
|
mut msg_recv: mpsc::UnboundedReceiver<(RequestID, RequestPriority, Vec<u8>)>,
|
||||||
mut write: BoxStreamWrite<W>,
|
mut write: BoxStreamWrite<W>,
|
||||||
|
debug_name: String,
|
||||||
) -> Result<(), Error>
|
) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
W: AsyncWriteExt + Unpin + Send + Sync,
|
W: AsyncWriteExt + Unpin + Send + Sync,
|
||||||
|
@ -117,8 +135,15 @@ pub(crate) trait SendLoop: Sync {
|
||||||
let mut sending = SendQueue::new();
|
let mut sending = SendQueue::new();
|
||||||
let mut should_exit = false;
|
let mut should_exit = false;
|
||||||
while !should_exit || !sending.is_empty() {
|
while !should_exit || !sending.is_empty() {
|
||||||
|
trace!("send_loop({}): queue = {}", debug_name, sending.dump());
|
||||||
if let Ok((id, prio, data)) = msg_recv.try_recv() {
|
if let Ok((id, prio, data)) = msg_recv.try_recv() {
|
||||||
trace!("send_loop: got {}, {} bytes", id, data.len());
|
trace!(
|
||||||
|
"send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
|
||||||
|
debug_name,
|
||||||
|
id,
|
||||||
|
prio,
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
sending.push(SendQueueItem {
|
sending.push(SendQueueItem {
|
||||||
id,
|
id,
|
||||||
prio,
|
prio,
|
||||||
|
@ -127,7 +152,8 @@ pub(crate) trait SendLoop: Sync {
|
||||||
});
|
});
|
||||||
} else if let Some(mut item) = sending.pop() {
|
} else if let Some(mut item) = sending.pop() {
|
||||||
trace!(
|
trace!(
|
||||||
"send_loop: sending bytes for {} ({} bytes, {} already sent)",
|
"send_loop({}): sending bytes for {} ({} bytes, {} already sent)",
|
||||||
|
debug_name,
|
||||||
item.id,
|
item.id,
|
||||||
item.data.len(),
|
item.data.len(),
|
||||||
item.cursor
|
item.cursor
|
||||||
|
@ -157,7 +183,13 @@ pub(crate) trait SendLoop: Sync {
|
||||||
} else {
|
} else {
|
||||||
let sth = msg_recv.recv().await;
|
let sth = msg_recv.recv().await;
|
||||||
if let Some((id, prio, data)) = sth {
|
if let Some((id, prio, data)) = sth {
|
||||||
trace!("send_loop: got {}, {} bytes", id, data.len());
|
trace!(
|
||||||
|
"send_loop({}): new message to send, id = {}, prio = {}, {} bytes",
|
||||||
|
debug_name,
|
||||||
|
id,
|
||||||
|
prio,
|
||||||
|
data.len()
|
||||||
|
);
|
||||||
sending.push(SendQueueItem {
|
sending.push(SendQueueItem {
|
||||||
id,
|
id,
|
||||||
prio,
|
prio,
|
||||||
|
@ -186,13 +218,12 @@ pub(crate) trait SendLoop: Sync {
|
||||||
pub(crate) trait RecvLoop: Sync + 'static {
|
pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
|
fn recv_handler(self: &Arc<Self>, id: RequestID, msg: Vec<u8>);
|
||||||
|
|
||||||
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
|
async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
|
||||||
where
|
where
|
||||||
R: AsyncReadExt + Unpin + Send + Sync,
|
R: AsyncReadExt + Unpin + Send + Sync,
|
||||||
{
|
{
|
||||||
let mut receiving = HashMap::new();
|
let mut receiving = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
trace!("recv_loop: reading packet");
|
|
||||||
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
let mut header_id = [0u8; RequestID::BITS as usize / 8];
|
||||||
match read.read_exact(&mut header_id[..]).await {
|
match read.read_exact(&mut header_id[..]).await {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
|
@ -200,19 +231,24 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
Err(e) => return Err(e.into()),
|
Err(e) => return Err(e.into()),
|
||||||
};
|
};
|
||||||
let id = RequestID::from_be_bytes(header_id);
|
let id = RequestID::from_be_bytes(header_id);
|
||||||
trace!("recv_loop: got header id: {:04x}", id);
|
|
||||||
|
|
||||||
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
|
let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
|
||||||
read.read_exact(&mut header_size[..]).await?;
|
read.read_exact(&mut header_size[..]).await?;
|
||||||
let size = ChunkLength::from_be_bytes(header_size);
|
let size = ChunkLength::from_be_bytes(header_size);
|
||||||
trace!("recv_loop: got header size: {:04x}", size);
|
trace!(
|
||||||
|
"recv_loop({}): got header id = {}, size = 0x{:04x} ({} bytes)",
|
||||||
|
debug_name,
|
||||||
|
id,
|
||||||
|
size,
|
||||||
|
size & !CHUNK_HAS_CONTINUATION
|
||||||
|
);
|
||||||
|
|
||||||
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
|
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
|
||||||
let size = size & !CHUNK_HAS_CONTINUATION;
|
let size = size & !CHUNK_HAS_CONTINUATION;
|
||||||
|
|
||||||
let mut next_slice = vec![0; size as usize];
|
let mut next_slice = vec![0; size as usize];
|
||||||
read.read_exact(&mut next_slice[..]).await?;
|
read.read_exact(&mut next_slice[..]).await?;
|
||||||
trace!("recv_loop: read {} bytes", next_slice.len());
|
trace!("recv_loop({}): read {} bytes", debug_name, next_slice.len());
|
||||||
|
|
||||||
let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
|
let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
|
||||||
msg_bytes.extend_from_slice(&next_slice[..]);
|
msg_bytes.extend_from_slice(&next_slice[..]);
|
||||||
|
|
|
@ -105,14 +105,17 @@ impl ServerConn {
|
||||||
|
|
||||||
netapp.connected_as_server(peer_id, conn.clone());
|
netapp.connected_as_server(peer_id, conn.clone());
|
||||||
|
|
||||||
|
let debug_name = format!("SRV {}", hex::encode(&peer_id[..8]));
|
||||||
|
let debug_name_2 = debug_name.clone();
|
||||||
|
|
||||||
let conn2 = conn.clone();
|
let conn2 = conn.clone();
|
||||||
let recv_future = tokio::spawn(async move {
|
let recv_future = tokio::spawn(async move {
|
||||||
select! {
|
select! {
|
||||||
r = conn2.recv_loop(read) => r,
|
r = conn2.recv_loop(read, debug_name_2) => r,
|
||||||
_ = await_exit(must_exit) => Ok(())
|
_ = await_exit(must_exit) => Ok(())
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write));
|
let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));
|
||||||
|
|
||||||
recv_future.await.log_err("ServerConn recv_loop");
|
recv_future.await.log_err("ServerConn recv_loop");
|
||||||
conn.resp_send.store(None);
|
conn.resp_send.store(None);
|
||||||
|
|
Loading…
Reference in a new issue