Use tokio_util::compat instead of the one from kuska-handshake

This commit is contained in:
Alex 2021-10-12 14:51:28 +02:00
parent b14515a422
commit a4069d703c
No known key found for this signature in database
GPG key ID: EDABF9711E244EB1
4 changed files with 162 additions and 572 deletions

686
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -19,8 +19,9 @@ default = []
basalt = ["lru", "rand"] basalt = ["lru", "rand"]
[dependencies] [dependencies]
async-std = { version = "1.5.0", default-features = false } futures = "0.3.17"
tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros"] } tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros", "io-util"] }
tokio-util = { version = "0.6.8", default-features = false, features = ["compat"] }
serde = { version = "1.0", default-features = false, features = ["derive"] } serde = { version = "1.0", default-features = false, features = ["derive"] }
rmp-serde = "0.14.3" rmp-serde = "0.14.3"
@ -38,7 +39,7 @@ bytes = "0.6.0"
lru = { version = "0.6", optional = true } lru = { version = "0.6", optional = true }
sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" } sodiumoxide = { version = "0.2.5-0", package = "kuska-sodiumoxide" }
kuska-handshake = { version = "0.2.0", git = "https://github.com/Alexis211/handshake", branch = "tokio1.0", features = ["default", "tokio_compat"] } kuska-handshake = { version = "0.2.0", git = "https://github.com/Alexis211/handshake", branch = "tokio1.0", features = ["default", "async_std"] }
[dev-dependencies] [dev-dependencies]
structopt = { version = "0.3", default-features = false } structopt = { version = "0.3", default-features = false }

View file

@ -6,15 +6,16 @@ use std::sync::{Arc, Mutex};
use bytes::Bytes; use bytes::Bytes;
use log::{debug, error, trace}; use log::{debug, error, trace};
use tokio::io::split;
use tokio::net::TcpStream; use tokio::net::TcpStream;
use tokio::sync::{mpsc, oneshot, watch}; use tokio::sync::{mpsc, oneshot, watch};
use tokio_util::compat::*;
use futures::io::AsyncReadExt;
use async_trait::async_trait; use async_trait::async_trait;
use kuska_handshake::async_std::{ use kuska_handshake::async_std::{
handshake_client, handshake_server, BoxStream, TokioCompatExt, TokioCompatExtRead, handshake_client, handshake_server, BoxStream
TokioCompatExtWrite,
}; };
use crate::error::*; use crate::error::*;
@ -35,9 +36,11 @@ pub(crate) struct ServerConn {
impl ServerConn { impl ServerConn {
pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> { pub(crate) async fn run(netapp: Arc<NetApp>, socket: TcpStream) -> Result<(), Error> {
let mut asyncstd_socket = TokioCompatExt::wrap(socket); let remote_addr = socket.peer_addr()?;
let mut socket = socket.compat();
let handshake = handshake_server( let handshake = handshake_server(
&mut asyncstd_socket, &mut socket,
netapp.netid.clone(), netapp.netid.clone(),
netapp.id, netapp.id,
netapp.privkey.clone(), netapp.privkey.clone(),
@ -45,19 +48,13 @@ impl ServerConn {
.await?; .await?;
let peer_id = handshake.peer_pk; let peer_id = handshake.peer_pk;
let tokio_socket = asyncstd_socket.into_inner();
let remote_addr = tokio_socket.peer_addr()?;
debug!( debug!(
"Handshake complete (server) with {}@{}", "Handshake complete (server) with {}@{}",
hex::encode(&peer_id), hex::encode(&peer_id),
remote_addr remote_addr
); );
let (read, write) = split(tokio_socket); let (read, write) = socket.split();
let read = TokioCompatExtRead::wrap(read);
let write = TokioCompatExtWrite::wrap(write);
let (read, write) = let (read, write) =
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();
@ -148,10 +145,11 @@ impl ClientConn {
socket: TcpStream, socket: TcpStream,
peer_id: NodeID, peer_id: NodeID,
) -> Result<(), Error> { ) -> Result<(), Error> {
let mut asyncstd_socket = TokioCompatExt::wrap(socket); let remote_addr = socket.peer_addr()?;
let mut socket = socket.compat();
let handshake = handshake_client( let handshake = handshake_client(
&mut asyncstd_socket, &mut socket,
netapp.netid.clone(), netapp.netid.clone(),
netapp.id, netapp.id,
netapp.privkey.clone(), netapp.privkey.clone(),
@ -159,19 +157,13 @@ impl ClientConn {
) )
.await?; .await?;
let tokio_socket = asyncstd_socket.into_inner();
let remote_addr = tokio_socket.peer_addr()?;
debug!( debug!(
"Handshake complete (client) with {}@{}", "Handshake complete (client) with {}@{}",
hex::encode(&peer_id), hex::encode(&peer_id),
remote_addr remote_addr
); );
let (read, write) = split(tokio_socket); let (read, write) = socket.split();
let read = TokioCompatExtRead::wrap(read);
let write = TokioCompatExtWrite::wrap(write);
let (read, write) = let (read, write) =
BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write();

View file

@ -3,8 +3,7 @@ use std::sync::Arc;
use log::trace; use log::trace;
use async_std::io::prelude::WriteExt; use futures::{AsyncReadExt, AsyncWriteExt};
use async_std::io::ReadExt;
use tokio::sync::mpsc; use tokio::sync::mpsc;
@ -94,7 +93,7 @@ pub(crate) trait SendLoop: Sync {
mut write: W, mut write: W,
) -> Result<(), Error> ) -> Result<(), Error>
where where
W: WriteExt + Unpin + Send + Sync, W: AsyncWriteExt + Unpin + Send + Sync,
{ {
let mut sending = SendQueue::new(); let mut sending = SendQueue::new();
let mut should_exit = false; let mut should_exit = false;
@ -168,7 +167,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error> async fn recv_loop<R>(self: Arc<Self>, mut read: R) -> Result<(), Error>
where where
R: ReadExt + Unpin + Send + Sync, R: AsyncReadExt + Unpin + Send + Sync,
{ {
let mut receiving = HashMap::new(); let mut receiving = HashMap::new();
loop { loop {