diff --git a/src/client.rs b/src/client.rs
index df54810..9726125 100644
--- a/src/client.rs
+++ b/src/client.rs
@@ -100,13 +100,16 @@ impl ClientConn {
 
 		netapp.connected_as_client(peer_id, conn.clone());
 
+		let debug_name = format!("CLI {}", hex::encode(&peer_id[..8]));
+
 		tokio::spawn(async move {
-			let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write));
+			let debug_name_2 = debug_name.clone();
+			let send_future = tokio::spawn(conn.clone().send_loop(query_recv, write, debug_name_2));
 
 			let conn2 = conn.clone();
 			let recv_future = tokio::spawn(async move {
 				select! {
-					r = conn2.recv_loop(read) => r,
+					r = conn2.recv_loop(read, debug_name) => r,
 					_ = await_exit(stop_recv_loop_recv) => Ok(())
 				}
 			});
diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs
index ccbd0ba..7f1c065 100644
--- a/src/peering/fullmesh.rs
+++ b/src/peering/fullmesh.rs
@@ -23,10 +23,10 @@ use crate::NodeID;
 
 const CONN_RETRY_INTERVAL: Duration = Duration::from_secs(30);
 const CONN_MAX_RETRIES: usize = 10;
-const PING_INTERVAL: Duration = Duration::from_secs(10);
+const PING_INTERVAL: Duration = Duration::from_secs(15);
 const LOOP_DELAY: Duration = Duration::from_secs(1);
-const PING_TIMEOUT: Duration = Duration::from_secs(5);
-const FAILED_PING_THRESHOLD: usize = 3;
+const PING_TIMEOUT: Duration = Duration::from_secs(10);
+const FAILED_PING_THRESHOLD: usize = 4;
 
 // -- Protocol messages --
 
@@ -61,11 +61,26 @@ struct PeerInfoInternal {
 	all_addrs: Vec<SocketAddr>,
 
 	state: PeerConnState,
+	last_send_ping: Option<Instant>,
 	last_seen: Option<Instant>,
 	ping: VecDeque<Duration>,
 	failed_pings: usize,
 }
 
+impl PeerInfoInternal {
+	fn new(addr: SocketAddr, state: PeerConnState) -> Self {
+		Self {
+			addr,
+			all_addrs: vec![addr],
+			state,
+			last_send_ping: None,
+			last_seen: None,
+			ping: VecDeque::new(),
+			failed_pings: 0,
+		}
+	}
+}
+
 #[derive(Copy, Clone, Debug)]
 pub struct PeerInfo {
 	/// The node's identifier (its public key)
@@ -185,14 +200,7 @@ impl FullMeshPeeringStrategy {
 			if id != netapp.id {
 				known_hosts.list.insert(
 					id,
-					PeerInfoInternal {
-						addr,
-						all_addrs: vec![addr],
-						state: PeerConnState::Waiting(0, Instant::now()),
-						last_seen: None,
-						ping: VecDeque::new(),
-						failed_pings: 0,
-					},
+					PeerInfoInternal::new(addr, PeerConnState::Waiting(0, Instant::now())),
 				);
 			}
 		}
@@ -200,14 +208,7 @@ impl FullMeshPeeringStrategy {
 		if let Some(addr) = our_addr {
 			known_hosts.list.insert(
 				netapp.id,
-				PeerInfoInternal {
-					addr,
-					all_addrs: vec![addr],
-					state: PeerConnState::Ourself,
-					last_seen: None,
-					ping: VecDeque::new(),
-					failed_pings: 0,
-				},
+				PeerInfoInternal::new(addr, PeerConnState::Ourself),
 			);
 		}
 
@@ -255,7 +256,7 @@ impl FullMeshPeeringStrategy {
 			trace!("{}, {:?}", hex::encode(&id[..8]), info);
 			match info.state {
 				PeerConnState::Connected => {
-					let must_ping = match info.last_seen {
+					let must_ping = match info.last_send_ping {
 						None => true,
 						Some(t) => Instant::now() - t > PING_INTERVAL,
 					};
@@ -275,9 +276,16 @@ impl FullMeshPeeringStrategy {
 		};
 
 		// 2. Dispatch ping to hosts
-		trace!("to_ping: {} peers", to_retry.len());
-		for id in to_ping {
-			tokio::spawn(self.clone().ping(id));
+		trace!("to_ping: {} peers", to_ping.len());
+		if !to_ping.is_empty() {
+			let mut known_hosts = self.known_hosts.write().unwrap();
+			for id in to_ping.iter() {
+				known_hosts.list.get_mut(id).unwrap().last_send_ping = Some(Instant::now());
+			}
+			drop(known_hosts);
+			for id in to_ping {
+				tokio::spawn(self.clone().ping(id));
+			}
 		}
 
 		// 3. Try reconnects
@@ -535,17 +543,9 @@ impl FullMeshPeeringStrategy {
 					host.all_addrs.push(addr);
 				}
 			} else {
-				known_hosts.list.insert(
-					id,
-					PeerInfoInternal {
-						state: PeerConnState::Connected,
-						addr,
-						all_addrs: vec![addr],
-						last_seen: None,
-						ping: VecDeque::new(),
-						failed_pings: 0,
-					},
-				);
+				known_hosts
+					.list
+					.insert(id, PeerInfoInternal::new(addr, PeerConnState::Connected));
 			}
 		}
 		known_hosts.update_hash();
@@ -570,14 +570,7 @@ impl FullMeshPeeringStrategy {
 		} else {
 			PeerConnState::Waiting(0, Instant::now())
 		};
-		PeerInfoInternal {
-			addr,
-			all_addrs: vec![addr],
-			state,
-			last_seen: None,
-			ping: VecDeque::new(),
-			failed_pings: 0,
-		}
+		PeerInfoInternal::new(addr, state)
 	}
 }
diff --git a/src/recv.rs b/src/recv.rs
index b5289fb..ac93c4b 100644
--- a/src/recv.rs
+++ b/src/recv.rs
@@ -54,14 +54,15 @@ impl Drop for Sender {
 pub(crate) trait RecvLoop: Sync + 'static {
 	fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
 
-	async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
+	async fn recv_loop<R>(self: Arc<Self>, mut read: R, debug_name: String) -> Result<(), Error>
 	where
 		R: AsyncReadExt + Unpin + Send + Sync,
 	{
 		let mut streams: HashMap<RequestID, Sender> = HashMap::new();
 		loop {
-			debug!(
-				"Receiving: {:?}",
+			trace!(
+				"recv_loop({}): in_progress = {:?}",
+				debug_name,
 				streams.iter().map(|(id, _)| id).collect::<Vec<_>>()
 			);
 
@@ -87,11 +88,12 @@ pub(crate) trait RecvLoop: Sync + 'static {
 				let kind = u8_to_io_errorkind(next_slice[0]);
 				let msg =
 					std::str::from_utf8(&next_slice[1..]).unwrap_or("");
-				debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg);
+				debug!("recv_loop({}): got id {}, error {:?}: {}", debug_name, id, kind, msg);
 				Some(Err(std::io::Error::new(kind, msg.to_string())))
 			} else {
 				trace!(
-					"recv_loop: got id {}, size {}, has_cont {}",
+					"recv_loop({}): got id {}, size {}, has_cont {}",
+					debug_name,
 					id,
 					size,
 					has_cont
@@ -107,7 +109,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
 				send
 			} else {
 				let (send, recv) = mpsc::unbounded_channel();
-				trace!("recv_loop: id {} is new channel", id);
+				trace!("recv_loop({}): id {} is new channel", debug_name, id);
 				self.recv_handler(
 					id,
 					Box::pin(tokio_stream::wrappers::UnboundedReceiverStream::new(recv)),
@@ -126,7 +128,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
 				assert!(!is_error);
 				streams.insert(id, sender);
 			} else {
-				trace!("recv_loop: close channel id {}", id);
+				trace!("recv_loop({}): close channel id {}", debug_name, id);
 				sender.end();
 			}
 		}
diff --git a/src/send.rs b/src/send.rs
index ea6cf9f..3b01cb5 100644
--- a/src/send.rs
+++ b/src/send.rs
@@ -231,6 +231,7 @@ pub(crate) trait SendLoop: Sync {
 		self: Arc<Self>,
 		msg_recv: mpsc::UnboundedReceiver<SendItem>,
 		mut write: BoxStreamWrite<W>,
+		debug_name: String,
 	) -> Result<(), Error>
 	where
 		W: AsyncWriteExt + Unpin + Send + Sync,
@@ -238,8 +239,9 @@ pub(crate) trait SendLoop: Sync {
 		let mut sending = SendQueue::new();
 		let mut msg_recv = Some(msg_recv);
 		while msg_recv.is_some() || !sending.is_empty() {
-			debug!(
-				"Sending: {:?}",
+			trace!(
+				"send_loop({}): queue = {:?}",
+				debug_name,
 				sending
 					.items
 					.iter()
@@ -262,7 +264,7 @@ pub(crate) trait SendLoop: Sync {
 				biased;	// always read incomming channel first if it has data
 				sth = recv_fut => {
 					if let Some((id, prio, order_tag, data)) = sth {
-						trace!("send_loop: add stream {} to send", id);
+						trace!("send_loop({}): add stream {} to send", debug_name, id);
 						sending.push(SendQueueItem {
 							id,
 							prio,
@@ -275,7 +277,8 @@ pub(crate) trait SendLoop: Sync {
 				}
 				(id, data) = send_fut => {
 					trace!(
-						"send_loop: id {}, send {} bytes, header_size {}",
+						"send_loop({}): id {}, send {} bytes, header_size {}",
+						debug_name,
 						id,
 						data.data().len(),
 						hex::encode(data.header())
diff --git a/src/server.rs b/src/server.rs
index f8c3f98..2c12d9d 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -103,14 +103,17 @@ impl ServerConn {
 
 		netapp.connected_as_server(peer_id, conn.clone());
 
+		let debug_name = format!("SRV {}", hex::encode(&peer_id[..8]));
+		let debug_name_2 = debug_name.clone();
+
 		let conn2 = conn.clone();
 		let recv_future = tokio::spawn(async move {
 			select! {
-				r = conn2.recv_loop(read) => r,
+				r = conn2.recv_loop(read, debug_name_2) => r,
 				_ = await_exit(must_exit) => Ok(())
 			}
 		});
-		let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write));
+		let send_future = tokio::spawn(conn.clone().send_loop(resp_recv, write, debug_name));
 
 		recv_future.await.log_err("ServerConn recv_loop");
 		conn.resp_send.store(None);