Small changes
This commit is contained in:
parent
32a0fbcbd9
commit
58ec2abe1a
5 changed files with 39 additions and 34 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -592,7 +592,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "kuska-handshake"
|
name = "kuska-handshake"
|
||||||
version = "0.1.1"
|
version = "0.1.1"
|
||||||
source = "git+https://github.com/kuska-ssb/handshake?branch=master#5bd0c3a7ce47f063fcb44303e809b1976afc2470"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-std",
|
"async-std",
|
||||||
"futures",
|
"futures",
|
||||||
|
|
|
@ -28,6 +28,6 @@ bytes = "0.6.0"
|
||||||
lru = "0.6"
|
lru = "0.6"
|
||||||
|
|
||||||
sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" }
|
sodiumoxide = { git = "https://github.com/Dhole/sodiumoxidez", branch = "extra" }
|
||||||
#kuska-handshake = { path = "../handshake", features = ["default", "tokio_compat"] }
|
kuska-handshake = { path = "../handshake", features = ["default", "tokio_compat"] }
|
||||||
kuska-handshake = { git = "https://github.com/kuska-ssb/handshake", branch = "master", features = ["default", "tokio_compat"] }
|
#kuska-handshake = { git = "https://github.com/kuska-ssb/handshake", branch = "master", features = ["default", "tokio_compat"] }
|
||||||
|
|
||||||
|
|
17
src/conn.rs
17
src/conn.rs
|
@ -60,7 +60,7 @@ impl ServerConn {
|
||||||
let read = TokioCompatExtRead::wrap(read);
|
let read = TokioCompatExtRead::wrap(read);
|
||||||
let write = TokioCompatExtWrite::wrap(write);
|
let write = TokioCompatExtWrite::wrap(write);
|
||||||
|
|
||||||
let (box_stream_read, box_stream_write) =
|
let (read, write) =
|
||||||
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
||||||
|
|
||||||
let (resp_send, resp_recv) = mpsc::unbounded_channel();
|
let (resp_send, resp_recv) = mpsc::unbounded_channel();
|
||||||
|
@ -83,13 +83,13 @@ impl ServerConn {
|
||||||
tokio::try_join!(
|
tokio::try_join!(
|
||||||
async move {
|
async move {
|
||||||
tokio::select!(
|
tokio::select!(
|
||||||
r = conn2.recv_loop(box_stream_read) => r,
|
r = conn2.recv_loop(read) => r,
|
||||||
_ = await_exit(close_recv) => Ok(()),
|
_ = await_exit(close_recv) => Ok(()),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
async move {
|
async move {
|
||||||
tokio::select!(
|
tokio::select!(
|
||||||
r = conn3.send_loop(resp_recv, box_stream_write) => r,
|
r = conn3.send_loop(resp_recv, write) => r,
|
||||||
_ = await_exit(close_recv2) => Ok(()),
|
_ = await_exit(close_recv2) => Ok(()),
|
||||||
)
|
)
|
||||||
},
|
},
|
||||||
|
@ -174,7 +174,7 @@ impl ClientConn {
|
||||||
let read = TokioCompatExtRead::wrap(read);
|
let read = TokioCompatExtRead::wrap(read);
|
||||||
let write = TokioCompatExtWrite::wrap(write);
|
let write = TokioCompatExtWrite::wrap(write);
|
||||||
|
|
||||||
let (box_stream_read, box_stream_write) =
|
let (read, write) =
|
||||||
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
|
||||||
|
|
||||||
let (query_send, query_recv) = mpsc::unbounded_channel();
|
let (query_send, query_recv) = mpsc::unbounded_channel();
|
||||||
|
@ -196,15 +196,12 @@ impl ClientConn {
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
let conn2 = conn.clone();
|
let conn2 = conn.clone();
|
||||||
let conn3 = conn.clone();
|
let conn3 = conn.clone();
|
||||||
tokio::try_join!(
|
tokio::try_join!(conn2.send_loop(query_recv, write), async move {
|
||||||
conn2.send_loop(query_recv, box_stream_write),
|
|
||||||
async move {
|
|
||||||
tokio::select!(
|
tokio::select!(
|
||||||
r = conn3.recv_loop(box_stream_read) => r,
|
r = conn3.recv_loop(read) => r,
|
||||||
_ = await_exit(stop_recv_loop_recv) => Ok(()),
|
_ = await_exit(stop_recv_loop_recv) => Ok(()),
|
||||||
)
|
)
|
||||||
}
|
})
|
||||||
)
|
|
||||||
.map(|_| ())
|
.map(|_| ())
|
||||||
.log_err("ClientConn send_loop/recv_loop/dispatch_loop");
|
.log_err("ClientConn send_loop/recv_loop/dispatch_loop");
|
||||||
|
|
||||||
|
|
|
@ -3,6 +3,7 @@ use std::collections::HashMap;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::{Arc, RwLock};
|
||||||
|
use std::time::Instant;
|
||||||
|
|
||||||
use std::future::Future;
|
use std::future::Future;
|
||||||
|
|
||||||
|
@ -75,10 +76,18 @@ where
|
||||||
M::KIND,
|
M::KIND,
|
||||||
hex::encode(remote)
|
hex::encode(remote)
|
||||||
);
|
);
|
||||||
|
let begin_time = Instant::now();
|
||||||
let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
|
let res = match rmp_serde::decode::from_read_ref::<_, M>(&bytes[..]) {
|
||||||
Ok(msg) => Ok(handler(remote, msg).await),
|
Ok(msg) => Ok(handler(remote, msg).await),
|
||||||
Err(e) => Err(e.to_string()),
|
Err(e) => Err(e.to_string()),
|
||||||
};
|
};
|
||||||
|
let end_time = Instant::now();
|
||||||
|
debug!(
|
||||||
|
"Request {:08x} from {} handled in {}msec",
|
||||||
|
M::KIND,
|
||||||
|
hex::encode(remote),
|
||||||
|
(end_time - begin_time).as_millis()
|
||||||
|
);
|
||||||
rmp_to_vec_all_named(&res).unwrap_or(vec![])
|
rmp_to_vec_all_named(&res).unwrap_or(vec![])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -291,8 +300,7 @@ impl NetApp {
|
||||||
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
|
pub(crate) fn connected_as_server(&self, id: ed25519::PublicKey, conn: Arc<ServerConn>) {
|
||||||
info!("Accepted connection from {}", hex::encode(id));
|
info!("Accepted connection from {}", hex::encode(id));
|
||||||
|
|
||||||
let mut conn_list = self.server_conns.write().unwrap();
|
self.server_conns.write().unwrap().insert(id, conn);
|
||||||
conn_list.insert(id.clone(), conn);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handle hello message from a client. This message is used for them to tell us
|
// Handle hello message from a client. This message is used for them to tell us
|
||||||
|
@ -319,13 +327,14 @@ impl NetApp {
|
||||||
if let Some(c) = conn_list.get(id) {
|
if let Some(c) = conn_list.get(id) {
|
||||||
if Arc::ptr_eq(c, &conn) {
|
if Arc::ptr_eq(c, &conn) {
|
||||||
conn_list.remove(id);
|
conn_list.remove(id);
|
||||||
}
|
drop(conn_list);
|
||||||
|
|
||||||
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
||||||
h(conn.peer_pk, true);
|
h(conn.peer_pk, true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// Called from conn.rs when an outgoinc connection is successfully established.
|
// Called from conn.rs when an outgoinc connection is successfully established.
|
||||||
// The connection is registered in self.client_conns, and the
|
// The connection is registered in self.client_conns, and the
|
||||||
|
@ -338,8 +347,8 @@ impl NetApp {
|
||||||
info!("Connection established to {}", hex::encode(id));
|
info!("Connection established to {}", hex::encode(id));
|
||||||
|
|
||||||
{
|
{
|
||||||
let mut conn_list = self.client_conns.write().unwrap();
|
let old_c_opt = self.client_conns.write().unwrap().insert(id, conn.clone());
|
||||||
if let Some(old_c) = conn_list.insert(id.clone(), conn.clone()) {
|
if let Some(old_c) = old_c_opt {
|
||||||
tokio::spawn(async move { old_c.close() });
|
tokio::spawn(async move { old_c.close() });
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -365,6 +374,7 @@ impl NetApp {
|
||||||
if let Some(c) = conn_list.get(id) {
|
if let Some(c) = conn_list.get(id) {
|
||||||
if Arc::ptr_eq(c, &conn) {
|
if Arc::ptr_eq(c, &conn) {
|
||||||
conn_list.remove(id);
|
conn_list.remove(id);
|
||||||
|
drop(conn_list);
|
||||||
|
|
||||||
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
if let Some(h) = self.on_disconnected_handler.load().as_ref() {
|
||||||
h(conn.peer_pk, false);
|
h(conn.peer_pk, false);
|
||||||
|
|
21
src/proto.rs
21
src/proto.rs
|
@ -6,16 +6,12 @@ use log::trace;
|
||||||
use async_std::io::prelude::WriteExt;
|
use async_std::io::prelude::WriteExt;
|
||||||
use async_std::io::ReadExt;
|
use async_std::io::ReadExt;
|
||||||
|
|
||||||
use tokio::io::{ReadHalf, WriteHalf};
|
|
||||||
use tokio::net::TcpStream;
|
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
|
|
||||||
use kuska_handshake::async_std::{BoxStreamRead, BoxStreamWrite, TokioCompat};
|
|
||||||
|
|
||||||
/// Priority of a request (click to read more about priorities).
|
/// Priority of a request (click to read more about priorities).
|
||||||
///
|
///
|
||||||
/// This priority value is used to priorize messages
|
/// This priority value is used to priorize messages
|
||||||
|
@ -92,11 +88,14 @@ impl SendQueue {
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub(crate) trait SendLoop: Sync {
|
pub(crate) trait SendLoop: Sync {
|
||||||
async fn send_loop(
|
async fn send_loop<W>(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>,
|
mut msg_recv: mpsc::UnboundedReceiver<Option<(RequestID, RequestPriority, Vec<u8>)>>,
|
||||||
mut write: BoxStreamWrite<TokioCompat<WriteHalf<TcpStream>>>,
|
mut write: W,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error>
|
||||||
|
where
|
||||||
|
W: WriteExt + Unpin + Send + Sync,
|
||||||
|
{
|
||||||
let mut sending = SendQueue::new();
|
let mut sending = SendQueue::new();
|
||||||
let mut should_exit = false;
|
let mut should_exit = false;
|
||||||
while !should_exit || !sending.is_empty() {
|
while !should_exit || !sending.is_empty() {
|
||||||
|
@ -167,10 +166,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
// Returns true if we should stop receiving after this
|
// Returns true if we should stop receiving after this
|
||||||
async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>);
|
async fn recv_handler(self: Arc<Self>, id: RequestID, msg: Vec<u8>);
|
||||||
|
|
||||||
async fn recv_loop(
|
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
|
||||||
self: Arc<Self>,
|
where
|
||||||
mut read: BoxStreamRead<TokioCompat<ReadHalf<TcpStream>>>,
|
R: ReadExt + Unpin + Send + Sync,
|
||||||
) -> Result<(), Error> {
|
{
|
||||||
let mut receiving = HashMap::new();
|
let mut receiving = HashMap::new();
|
||||||
loop {
|
loop {
|
||||||
trace!("recv_loop: reading packet");
|
trace!("recv_loop: reading packet");
|
||||||
|
|
Loading…
Reference in a new issue