forked from lx/netapp
Fix clippy lints
This commit is contained in:
parent 7753b789b7
commit 74e661febe
5 changed files with 42 additions and 38 deletions
src/conn.rs (18 changed lines)
@@ -39,11 +39,11 @@ impl ServerConn {
 		let handshake = handshake_server(
 			&mut asyncstd_socket,
 			netapp.netid.clone(),
-			netapp.id.clone(),
+			netapp.id,
 			netapp.privkey.clone(),
 		)
 		.await?;
-		let peer_id = handshake.peer_pk.clone();
+		let peer_id = handshake.peer_pk;

 		let tokio_socket = asyncstd_socket.into_inner();
 		let remote_addr = tokio_socket.peer_addr()?;
@@ -69,12 +69,12 @@ impl ServerConn {
 		let conn = Arc::new(ServerConn {
 			netapp: netapp.clone(),
 			remote_addr,
-			peer_id: peer_id.clone(),
+			peer_id,
 			resp_send,
 			close_send,
 		});

-		netapp.connected_as_server(peer_id.clone(), conn.clone());
+		netapp.connected_as_server(peer_id, conn.clone());

 		let conn2 = conn.clone();
 		let conn3 = conn.clone();
@@ -123,7 +123,7 @@ impl RecvLoop for ServerConn {

 		if let Some(handler) = self.netapp.msg_handlers.load().get(&kind) {
 			let net_handler = &handler.net_handler;
-			let resp = net_handler(self.peer_id.clone(), bytes.slice(5..)).await;
+			let resp = net_handler(self.peer_id, bytes.slice(5..)).await;
 			self.resp_send
 				.send(Some((id, prio, resp)))
 				.log_err("ServerConn recv_handler send resp");
@@ -153,9 +153,9 @@ impl ClientConn {
 		let handshake = handshake_client(
 			&mut asyncstd_socket,
 			netapp.netid.clone(),
-			netapp.id.clone(),
+			netapp.id,
 			netapp.privkey.clone(),
-			peer_id.clone(),
+			peer_id,
 		)
 		.await?;

@@ -182,7 +182,7 @@ impl ClientConn {

 		let conn = Arc::new(ClientConn {
 			remote_addr,
-			peer_id: peer_id.clone(),
+			peer_id,
 			next_query_number: AtomicU16::from(0u16),
 			query_send,
 			inflight: Mutex::new(HashMap::new()),
@@ -190,7 +190,7 @@ impl ClientConn {
 			stop_recv_loop,
 		});

-		netapp.connected_as_client(peer_id.clone(), conn.clone());
+		netapp.connected_as_client(peer_id, conn.clone());

 		tokio::spawn(async move {
 			let conn2 = conn.clone();
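Note: the conn.rs changes are all instances of clippy's clone_on_copy lint. Node identifiers such as netapp.id and handshake.peer_pk implement Copy, so calling .clone() on them is just a verbose copy. A minimal standalone sketch of the pattern, where the NodeID alias below is a hypothetical stand-in for the crate's Copy node identifier, not this repo's actual definition:

// clippy::clone_on_copy: .clone() on a Copy type is a needless method call.
// `NodeID` here is a hypothetical stand-in, assumed to be a small Copy array.
type NodeID = [u8; 32];

fn register(id: NodeID) {
    println!("registered node {:02x?}", &id[..4]);
}

fn main() {
    let id: NodeID = [7u8; 32];
    register(id.clone()); // before: clippy warns `clone_on_copy`
    register(id); // after: a plain copy, same semantics
}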
src/error.rs

@@ -31,13 +31,13 @@ pub enum Error {

 impl<T> From<tokio::sync::watch::error::SendError<T>> for Error {
 	fn from(_e: tokio::sync::watch::error::SendError<T>) -> Error {
-		Error::Message(format!("Watch send error"))
+		Error::Message("Watch send error".into())
 	}
 }

 impl<T> From<tokio::sync::mpsc::error::SendError<T>> for Error {
 	fn from(_e: tokio::sync::mpsc::error::SendError<T>) -> Error {
-		Error::Message(format!("MPSC send error"))
+		Error::Message("MPSC send error".into())
 	}
 }

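Note: the error.rs change fixes clippy's useless_format lint. format! with a string literal and no interpolation just builds a String the long way round; "literal".into() (or .to_string()) says the same thing directly. A standalone sketch:

// clippy::useless_format: format! with no arguments is just a String conversion.
fn main() {
    let a: String = format!("Watch send error"); // lint: useless use of format!
    let b: String = "Watch send error".into(); // equivalent and more direct
    assert_eq!(a, b);
}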
src/netapp.rs

@@ -24,12 +24,16 @@ use crate::util::*;

 type DynMsg = Box<dyn Any + Send + Sync + 'static>;

+type OnConnectHandler = Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>;
+type OnDisconnectHandler = Box<dyn Fn(NodeID, bool) + Send + Sync>;
+
+pub(crate) type LocalHandler = Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>;
+pub(crate) type NetHandler = Box<
+	dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send>;
+
 pub(crate) struct Handler {
-	pub(crate) local_handler:
-		Box<dyn Fn(DynMsg) -> Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> + Sync + Send>,
-	pub(crate) net_handler: Box<
-		dyn Fn(NodeID, Bytes) -> Pin<Box<dyn Future<Output = Vec<u8>> + Sync + Send>> + Sync + Send,
-	>,
+	pub(crate) local_handler: LocalHandler,
+	pub(crate) net_handler: NetHandler,
 }

 /// NetApp is the main class that handles incoming and outgoing connections.
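Note: pulling the boxed handler signatures out into the OnConnectHandler, OnDisconnectHandler, LocalHandler and NetHandler aliases addresses clippy's type_complexity lint, which fires when a type expression nests too deeply to read at a glance. A reduced sketch of the same refactor, using a hypothetical Callback alias rather than this crate's real handler types:

// clippy::type_complexity: give a nested boxed type a name and reuse it.
// `Callback` is a hypothetical reduction of this commit's LocalHandler/NetHandler.
type Callback = Box<dyn Fn(u32) -> Vec<u8> + Send + Sync>;

struct Handler {
    callback: Callback, // before: the full Box<dyn Fn(...) ...> was inlined here
}

fn main() {
    let h = Handler {
        callback: Box::new(|n| n.to_be_bytes().to_vec()),
    };
    assert_eq!((h.callback)(258), vec![0, 0, 1, 2]);
}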
@@ -58,8 +62,8 @@ pub struct NetApp {
 	client_conns: RwLock<HashMap<NodeID, Arc<ClientConn>>>,

 	pub(crate) msg_handlers: ArcSwap<HashMap<MessageKind, Arc<Handler>>>,
-	on_connected_handler: ArcSwapOption<Box<dyn Fn(NodeID, SocketAddr, bool) + Send + Sync>>,
-	on_disconnected_handler: ArcSwapOption<Box<dyn Fn(NodeID, bool) + Send + Sync>>,
+	on_connected_handler: ArcSwapOption<OnConnectHandler>,
+	on_disconnected_handler: ArcSwapOption<OnDisconnectHandler>,
 }

 struct ListenParams {
@@ -90,7 +94,7 @@ where
 			hex::encode(remote),
 			(end_time - begin_time).as_millis()
 		);
-		rmp_to_vec_all_named(&res).unwrap_or(vec![])
+		rmp_to_vec_all_named(&res).unwrap_or_default()
 	}
}

async fn local_handler_aux<M, F, R>(handler: Arc<F>, remote: NodeID, msg: DynMsg) -> DynMsg
@@ -128,7 +132,7 @@ impl NetApp {
 		let netapp2 = netapp.clone();
 		netapp.add_msg_handler::<HelloMessage, _, _>(move |from: NodeID, msg: HelloMessage| {
 			netapp2.handle_hello_message(from, msg);
-			async { () }
+			async {}
 		});

 		netapp
@@ -175,7 +179,7 @@ impl NetApp {
 			fun
 		});

-		let self_id = self.id.clone();
+		let self_id = self.id;
 		let local_handler = Box::new(move |msg: DynMsg| {
 			let fun: Pin<Box<dyn Future<Output = DynMsg> + Sync + Send>> =
 				Box::pin(local_handler_aux(handler.clone(), self_id, msg));
@@ -248,7 +252,7 @@ impl NetApp {

 		let socket = TcpStream::connect(ip).await?;
 		info!("Connected to {}, negotiating handshake...", ip);
-		ClientConn::init(self, socket, id.clone()).await?;
+		ClientConn::init(self, socket, id).await?;
 		Ok(())
 	}

@@ -315,7 +319,7 @@ impl NetApp {
 	fn handle_hello_message(&self, id: NodeID, msg: HelloMessage) {
 		if let Some(h) = self.on_connected_handler.load().as_ref() {
 			if let Some(c) = self.server_conns.read().unwrap().get(&id) {
-				let remote_ip = msg.server_addr.unwrap_or(c.remote_addr.ip());
+				let remote_ip = msg.server_addr.unwrap_or_else(|| c.remote_addr.ip());
 				let remote_addr = SocketAddr::new(remote_ip, msg.server_port);
 				h(id, remote_addr, true);
 			}
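Note: several hunks above replace unwrap_or(...) with unwrap_or_else(...) or unwrap_or_default(). This is clippy's or_fun_call lint: the argument to unwrap_or is evaluated eagerly even when the Option is Some, so any fallback that allocates or computes should go behind a closure, and Default fallbacks get a dedicated shortcut. A small illustration:

// clippy::or_fun_call: eager vs. lazy fallback evaluation.
fn main() {
    let cached: Option<Vec<u8>> = Some(vec![1, 2, 3]);

    // Eager: vec![0; 1024] is built and thrown away even though `cached` is Some.
    let a = cached.clone().unwrap_or(vec![0; 1024]);

    // Lazy: the closure only runs when the Option is None.
    let b = cached.clone().unwrap_or_else(|| vec![0; 1024]);

    // For Default::default() fallbacks (like vec![]), use the shortcut.
    let c = cached.unwrap_or_default();

    assert_eq!(a, b);
    assert_eq!(b, c);
}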
src/peering/fullmesh.rs

@@ -103,7 +103,7 @@ impl KnownHosts {
 		let mut list = Vec::with_capacity(input.len());
 		for (id, peer) in input.iter() {
 			if peer.state == PeerConnState::Connected || peer.state == PeerConnState::Ourself {
-				list.push((id.clone(), peer.addr));
+				list.push((*id, peer.addr));
 			}
 		}
 		list
@@ -134,7 +134,7 @@ impl FullMeshPeeringStrategy {
 				known_hosts.list.insert(
 					id,
 					PeerInfo {
-						addr: addr,
+						addr,
 						state: PeerConnState::Waiting(0, Instant::now()),
 						last_seen: None,
 						ping: VecDeque::new(),
@@ -201,12 +201,12 @@ impl FullMeshPeeringStrategy {
 						Some(t) => Instant::now() - t > PING_INTERVAL,
 					};
 					if must_ping {
-						to_ping.push(id.clone());
+						to_ping.push(*id);
 					}
 				}
 				PeerConnState::Waiting(_, t) => {
 					if Instant::now() >= t {
-						to_retry.push(id.clone());
+						to_retry.push(*id);
 					}
 				}
 				_ => (),
@@ -234,7 +234,7 @@ impl FullMeshPeeringStrategy {
 					i + 1
 				);
 				h.state = PeerConnState::Trying(i);
-				tokio::spawn(self.clone().try_connect(id, h.addr.clone()));
+				tokio::spawn(self.clone().try_connect(id, h.addr));
 			}
 		}
 	}
@@ -307,7 +307,7 @@ impl FullMeshPeeringStrategy {
 	}

 	async fn try_connect(self: Arc<Self>, id: NodeID, addr: SocketAddr) {
-		let conn_result = self.netapp.clone().try_connect(addr, id.clone()).await;
+		let conn_result = self.netapp.clone().try_connect(addr, id).await;
 		if let Err(e) = conn_result {
 			warn!("Error connecting to {}: {}", hex::encode(id), e);
 			let mut known_hosts = self.known_hosts.write().unwrap();
@@ -362,9 +362,9 @@ impl FullMeshPeeringStrategy {
 		for (id, info) in known_hosts.list.iter() {
 			let mut pings = info.ping.iter().cloned().collect::<Vec<_>>();
 			pings.sort();
-			if pings.len() > 0 {
+			if !pings.is_empty() {
 				ret.push(PeerInfoPub {
-					id: id.clone(),
+					id: *id,
 					addr: info.addr,
 					state: info.state,
 					last_seen: info.last_seen,
@@ -379,7 +379,7 @@ impl FullMeshPeeringStrategy {
 				});
 			} else {
 				ret.push(PeerInfoPub {
-					id: id.clone(),
+					id: *id,
 					addr: info.addr,
 					state: info.state,
 					last_seen: info.last_seen,
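Note: the peering hunks collect three further lint classes: redundant_field_names (addr: addr shortens to addr), len_zero (pings.len() > 0 reads better as !pings.is_empty()), and more clone_on_copy fixes, where *id dereferences a Copy map key instead of cloning it. A compact sketch of all three, with hypothetical types rather than this crate's PeerInfo:

// redundant_field_names, len_zero and clone_on_copy in one place.
struct Peer {
    addr: u32,
}

fn main() {
    let addr = 8080u32;
    let peer = Peer { addr }; // not `addr: addr` (redundant_field_names)

    let pings: Vec<u64> = vec![12, 15];
    if !pings.is_empty() { // not `pings.len() > 0` (len_zero)
        let id: &u32 = &peer.addr;
        let copied = *id; // not `id.clone()` (clone_on_copy on a Copy value)
        println!("peer {} has {} pings", copied, pings.len());
    }
}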
src/proto.rs

@@ -65,7 +65,7 @@ impl SendQueue {
 		let mut items_at_prio = self
 			.items
 			.remove(&prio)
-			.unwrap_or(VecDeque::with_capacity(4));
+			.unwrap_or_else(|| VecDeque::with_capacity(4));
 		items_at_prio.push_back(item);
 		self.items.insert(prio, items_at_prio);
 	}
@@ -143,7 +143,7 @@ pub(crate) trait SendLoop: Sync {
 		let sth = msg_recv
 			.recv()
 			.await
-			.ok_or(Error::Message("Connection closed.".into()))?;
+			.ok_or_else(|| Error::Message("Connection closed.".into()))?;
 		if let Some((id, prio, data)) = sth {
 			trace!("send_loop: got {}, {} bytes", id, data.len());
 			sending.push(SendQueueItem {
@@ -190,7 +190,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
 			read.read_exact(&mut next_slice[..]).await?;
 			trace!("recv_loop: read {} bytes", next_slice.len());

-			let mut msg_bytes = receiving.remove(&id).unwrap_or(vec![]);
+			let mut msg_bytes: Vec<_> = receiving.remove(&id).unwrap_or_default();
 			msg_bytes.extend_from_slice(&next_slice[..]);

 			if has_cont {
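Note: the proto.rs hunks apply the same lazy-fallback reasoning to Option::ok_or. Clippy's or_fun_call flags ok_or(expr) when expr does work (here, building an Error::Message string), because that work happens even on the Some path. A small sketch with a hypothetical function, returning String instead of this crate's Error type:

// clippy::or_fun_call applied to ok_or: defer constructing the error value.
fn next_message(slot: Option<u32>) -> Result<u32, String> {
    // Lazy: the error String is only allocated when `slot` is None.
    slot.ok_or_else(|| "Connection closed.".to_string())
}

fn main() {
    assert_eq!(next_message(Some(7)), Ok(7));
    assert!(next_message(None).is_err());
}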