From 58ec2abe1a805d0fe6c86ab009f6947adbd9ca2b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 7 Dec 2020 18:07:55 +0100 Subject: [PATCH] Small changes --- Cargo.lock | 1 - Cargo.toml | 4 ++-- src/conn.rs | 23 ++++++++++------------- src/netapp.rs | 24 +++++++++++++++++------- src/proto.rs | 21 ++++++++++----------- 5 files changed, 39 insertions(+), 34 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7f65bd7..e5a2551 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -592,7 +592,6 @@ dependencies = [ [[package]] name = "kuska-handshake" version = "0.1.1" -source = "git+https://github.com/kuska-ssb/handshake?branch=master#5bd0c3a7ce47f063fcb44303e809b1976afc2470" dependencies = [ "async-std", "futures", diff --git a/Cargo.toml b/Cargo.toml index e018af7..20f69f1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,6 @@ bytes = "0.6.0" lru = "0.6" sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" } -#kuska-handshake = { path = "../handshake", features = ["default", "tokio_compat"] } -kuska-handshake = { git = "https://github.com/kuska-ssb/handshake", branch = "master", features = ["default", "tokio_compat"] } +kuska-handshake = { path = "../handshake", features = ["default", "tokio_compat"] } +#kuska-handshake = { git = "https://github.com/kuska-ssb/handshake", branch = "master", features = ["default", "tokio_compat"] } diff --git a/src/conn.rs b/src/conn.rs index 7ba15f9..0aee952 100644 --- a/src/conn.rs +++ b/src/conn.rs @@ -60,7 +60,7 @@ impl ServerConn { let read = TokioCompatExtRead::wrap(read); let write = TokioCompatExtWrite::wrap(write); - let (box_stream_read, box_stream_write) = + let (read, write) = BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); let (resp_send, resp_recv) = mpsc::unbounded_channel(); @@ -83,13 +83,13 @@ impl ServerConn { tokio::try_join!( async move { tokio::select!( - r = conn2.recv_loop(box_stream_read) => r, + r = conn2.recv_loop(read) => r, _ = await_exit(close_recv) => Ok(()), ) }, async move { tokio::select!( - r = conn3.send_loop(resp_recv, box_stream_write) => r, + r = conn3.send_loop(resp_recv, write) => r, _ = await_exit(close_recv2) => Ok(()), ) }, @@ -174,7 +174,7 @@ impl ClientConn { let read = TokioCompatExtRead::wrap(read); let write = TokioCompatExtWrite::wrap(write); - let (box_stream_read, box_stream_write) = + let (read, write) = BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); let (query_send, query_recv) = mpsc::unbounded_channel(); @@ -196,15 +196,12 @@ impl ClientConn { tokio::spawn(async move { let conn2 = conn.clone(); let conn3 = conn.clone(); - tokio::try_join!( - conn2.send_loop(query_recv, box_stream_write), - async move { - tokio::select!( - r = conn3.recv_loop(box_stream_read) => r, - _ = await_exit(stop_recv_loop_recv) => Ok(()), - ) - } - ) + tokio::try_join!(conn2.send_loop(query_recv, write), async move { + tokio::select!( + r = conn3.recv_loop(read) => r, + _ = await_exit(stop_recv_loop_recv) => Ok(()), + ) + }) .map(|_| ()) .log_err("ClientConn send_loop/recv_loop/dispatch_loop"); diff --git a/src/netapp.rs b/src/netapp.rs index 967105e..6f6da5b 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -3,6 +3,7 @@ use std::collections::HashMap; use std::net::SocketAddr; use std::pin::Pin; use std::sync::{Arc, RwLock}; +use std::time::Instant; use std::future::Future; @@ -75,10 +76,18 @@ where M::KIND, hex::encode(remote) ); + let begin_time = Instant::now(); let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) { Ok(msg) => Ok(handler(remote, msg).await), Err(e) => Err(e.to_string()), }; + let end_time = Instant::now(); + debug!( + "Request {:08x} from {} handled in {}msec", + M::KIND, + hex::encode(remote), + (end_time - begin_time).as_millis() + ); rmp_to_vec_all_named(&res).unwrap_or(vec![]) } @@ -291,8 +300,7 @@ impl NetApp { pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc) { info!("Accepted connection from {}", hex::encode(id)); - let mut conn_list = self.server_conns.write().unwrap(); - conn_list.insert(id.clone(), conn); + self.server_conns.write().unwrap().insert(id, conn); } // Handle hello message from a client. This message is used for them to tell us @@ -319,10 +327,11 @@ impl NetApp { if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); - } + drop(conn_list); - if let Some(h) = self.on_disconnected_handler.load().as_ref() { - h(conn.peer_pk, true); + if let Some(h) = self.on_disconnected_handler.load().as_ref() { + h(conn.peer_pk, true); + } } } } @@ -338,8 +347,8 @@ impl NetApp { info!("Connection established to {}", hex::encode(id)); { - let mut conn_list = self.client_conns.write().unwrap(); - if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) { + let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone()); + if let Some(old_c) = old_c_opt { tokio::spawn(async move { old_c.close() }); } } @@ -365,6 +374,7 @@ impl NetApp { if let Some(c) = conn_list.get(id) { if Arc::ptr_eq(c, &conn) { conn_list.remove(id); + drop(conn_list); if let Some(h) = self.on_disconnected_handler.load().as_ref() { h(conn.peer_pk, false); diff --git a/src/proto.rs b/src/proto.rs index 3e9fe20..7b8aa4b 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -6,16 +6,12 @@ use log::trace; use async_std::io::prelude::WriteExt; use async_std::io::ReadExt; -use tokio::io::{ReadHalf, WriteHalf}; -use tokio::net::TcpStream; use tokio::sync::mpsc; use async_trait::async_trait; use crate::error::*; -use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat}; - /// Priority of a request (click to read more about priorities). /// /// This priority value is used to priorize messages @@ -92,11 +88,14 @@ impl SendQueue { #[async_trait] pub(crate) trait SendLoop: Sync { - async fn send_loop( + async fn send_loop( self: Arc, mut msg_recv: mpsc::UnboundedReceiver)>>, - mut write: BoxStreamWrite>>, - ) -> Result<(), Error> { + mut write: W, + ) -> Result<(), Error> + where + W: WriteExt + Unpin + Send + Sync, + { let mut sending = SendQueue::new(); let mut should_exit = false; while !should_exit || !sending.is_empty() { @@ -167,10 +166,10 @@ pub(crate) trait RecvLoop: Sync + 'static { // Returns true if we should stop receiving after this async fn recv_handler(self: Arc, id: RequestID, msg: Vec); - async fn recv_loop( - self: Arc, - mut read: BoxStreamRead>>, - ) -> Result<(), Error> { + async fn recv_loop(self: Arc, mut read: R) -> Result<(), Error> + where + R: ReadExt + Unpin + Send + Sync, + { let mut receiving = HashMap::new(); loop { trace!("recv_loop: reading packet");