From d9bd1182f7b980df8e631ae8eeca444f5d997909 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 12 Oct 2021 18:13:07 +0200 Subject: [PATCH] Move out things from conn.rs into two separate files --- Cargo.lock | 2 +- Makefile | 2 +- src/{conn.rs => client.rs} | 149 ++------------------------------ src/lib.rs | 3 +- src/netapp.rs | 3 +- src/proto.rs | 14 +-- src/server.rs | 171 +++++++++++++++++++++++++++++++++++++ 7 files changed, 190 insertions(+), 154 deletions(-) rename src/{conn.rs => client.rs} (56%) create mode 100644 src/server.rs diff --git a/Cargo.lock b/Cargo.lock index 4045aaf..d00d37f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -403,7 +403,7 @@ dependencies = [ [[package]] name = "netapp" -version = "0.2.0" +version = "0.3.0" dependencies = [ "arc-swap", "async-trait", diff --git a/Makefile b/Makefile index 569f4b4..0ef8cef 100644 --- a/Makefile +++ b/Makefile @@ -1,5 +1,5 @@ all: cargo build cargo build --example fullmesh - #RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7 + RUST_LOG=netapp=debug cargo run --example fullmesh -- -n 3242ce79e05e8b6a0e43441fbd140a906e13f335f298ae3a52f29784abbab500 -p 6c304114a0e1018bbe60502a34d33f4f439f370856c3333dda2726da01eb93a4894b7ef7249a71f11d342b69702f1beb7c93ec95fbcf122ad1eca583bb0629e7 diff --git a/src/conn.rs b/src/client.rs similarity index 56% rename from src/conn.rs rename to src/client.rs index 64318dc..a436d53 100644 --- a/src/conn.rs +++ b/src/client.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::sync::atomic::{self, AtomicBool, AtomicU32}; use std::sync::{Arc, Mutex}; -use bytes::Bytes; use log::{debug, error, trace}; use tokio::net::TcpStream; @@ -14,7 +13,7 @@ use futures::io::AsyncReadExt; use async_trait::async_trait; -use kuska_handshake::async_std::{handshake_client, handshake_server, BoxStream}; +use kuska_handshake::async_std::{handshake_client, BoxStream}; use crate::endpoint::*; use crate::error::*; @@ -22,149 +21,7 @@ use crate::netapp::*; use crate::proto::*; use crate::util::*; -// Request message format (client -> server): -// - u8 priority -// - u8 path length -// - [u8; path length] path -// - [u8; *] data -// Response message format (server -> client): -// - u8 response code -// - [u8; *] response - -pub(crate) struct ServerConn { - pub(crate) remote_addr: SocketAddr, - pub(crate) peer_id: NodeID, - - netapp: Arc, - - resp_send: mpsc::UnboundedSender)>>, - close_send: watch::Sender, -} - -impl ServerConn { - pub(crate) async fn run(netapp: Arc, socket: TcpStream) -> Result<(), Error> { - let remote_addr = socket.peer_addr()?; - let mut socket = socket.compat(); - - let handshake = handshake_server( - &mut socket, - netapp.netid.clone(), - netapp.id, - netapp.privkey.clone(), - ) - .await?; - let peer_id = handshake.peer_pk; - - debug!( - "Handshake complete (server) with {}@{}", - hex::encode(&peer_id), - remote_addr - ); - - let (read, write) = socket.split(); - - let (read, write) = - BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); - - let (resp_send, resp_recv) = mpsc::unbounded_channel(); - - let (close_send, close_recv) = watch::channel(false); - - let conn = Arc::new(ServerConn { - netapp: netapp.clone(), - remote_addr, - peer_id, - resp_send, - close_send, - }); - - netapp.connected_as_server(peer_id, conn.clone()); - - let conn2 = conn.clone(); - let conn3 = conn.clone(); - let close_recv2 = close_recv.clone(); - tokio::try_join!( - async move { - tokio::select!( - r = conn2.recv_loop(read) => r, - _ = await_exit(close_recv) => Ok(()), - ) - }, - async move { - tokio::select!( - r = conn3.send_loop(resp_recv, write) => r, - _ = await_exit(close_recv2) => Ok(()), - ) - }, - ) - .map(|_| ()) - .log_err("ServerConn recv_loop/send_loop"); - - netapp.disconnected_as_server(&peer_id, conn); - - Ok(()) - } - - pub fn close(&self) { - self.close_send.send(true).unwrap(); - } - - async fn recv_handler_aux(self: &Arc, bytes: &[u8]) -> Result, Error> { - if bytes.len() < 2 { - return Err(Error::Message("Invalid protocol message".into())); - } - - // byte 0 is the request priority, we don't care here - let path_length = bytes[1] as usize; - if bytes.len() < 2 + path_length { - return Err(Error::Message("Invalid protocol message".into())); - } - - let path = &bytes[2..2 + path_length]; - let path = String::from_utf8(path.to_vec())?; - let data = &bytes[2 + path_length..]; - - let handler_opt = { - let endpoints = self.netapp.endpoints.read().unwrap(); - endpoints.get(&path).map(|e| e.clone_endpoint()) - }; - - if let Some(handler) = handler_opt { - handler.handle(data, self.peer_id).await - } else { - Err(Error::NoHandler) - } - } -} - -impl SendLoop for ServerConn {} - -#[async_trait] -impl RecvLoop for ServerConn { - async fn recv_handler(self: Arc, id: RequestID, bytes: Vec) { - trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); - let bytes: Bytes = bytes.into(); - - let resp = self.recv_handler_aux(&bytes[..]).await; - let prio = bytes[0]; - - let mut resp_bytes = vec![]; - match resp { - Ok(rb) => { - resp_bytes.push(0u8); - resp_bytes.extend(&rb[..]); - } - Err(e) => { - resp_bytes.push(e.code()); - } - } - - self.resp_send - .send(Some((id, prio, resp_bytes))) - .log_err("ServerConn recv_handler send resp"); - } -} pub(crate) struct ClientConn { pub(crate) remote_addr: SocketAddr, pub(crate) peer_id: NodeID, @@ -264,6 +121,7 @@ impl ClientConn { let id = self .next_query_number .fetch_add(1, atomic::Ordering::Relaxed); + let mut bytes = vec![prio, path.as_bytes().len() as u8]; bytes.extend_from_slice(path.as_bytes()); bytes.extend_from_slice(&rmp_to_vec_all_named(&rq)?[..]); @@ -283,6 +141,9 @@ impl ClientConn { self.query_send.send(Some((id, prio, bytes)))?; let resp = resp_recv.await?; + if resp.len() == 0 { + return Err(Error::Message("Response is 0 bytes, either a collision or a protocol error".into())); + } let code = resp[0]; if code == 0 { diff --git a/src/lib.rs b/src/lib.rs index e5251c5..f24b7ac 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -21,7 +21,8 @@ pub mod util; pub mod endpoint; pub mod proto; -mod conn; +mod server; +mod client; pub mod netapp; pub mod peering; diff --git a/src/netapp.rs b/src/netapp.rs index 8415c58..afdd3c9 100644 --- a/src/netapp.rs +++ b/src/netapp.rs @@ -12,7 +12,8 @@ use sodiumoxide::crypto::auth; use sodiumoxide::crypto::sign::ed25519; use tokio::net::{TcpListener, TcpStream}; -use crate::conn::*; +use crate::client::*; +use crate::server::*; use crate::endpoint::*; use crate::error::*; use crate::proto::*; diff --git a/src/proto.rs b/src/proto.rs index 5b71ba5..3811e3f 100644 --- a/src/proto.rs +++ b/src/proto.rs @@ -38,11 +38,19 @@ pub const PRIO_PRIMARY: RequestPriority = 0x00; /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) pub const PRIO_SECONDARY: RequestPriority = 0x01; +// Messages are sent by chunks +// Chunk format: +// - u32 BE: request id (same for request and response) +// - u16 BE: chunk length, possibly with CHUNK_HAS_CONTINUATION flag +// when this is not the last chunk of the message +// - [u8; chunk_length] chunk data + pub(crate) type RequestID = u32; type ChunkLength = u16; const MAX_CHUNK_LENGTH: ChunkLength = 0x4000; const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; + struct SendQueueItem { id: RequestID, prio: RequestPriority, @@ -86,12 +94,6 @@ impl SendQueue { } } -// Messages are sent by chunks -// Chunk format: -// - u32 BE: request id (same for request and response) -// - u16 BE: chunk length -// - [u8; chunk_length] chunk data - #[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( diff --git a/src/server.rs b/src/server.rs new file mode 100644 index 0000000..73ae267 --- /dev/null +++ b/src/server.rs @@ -0,0 +1,171 @@ +use std::net::SocketAddr; +use std::sync::{Arc}; + +use bytes::Bytes; +use log::{debug, trace}; + +use tokio::net::TcpStream; +use tokio::sync::{mpsc, watch}; +use tokio_util::compat::*; + +use futures::io::AsyncReadExt; + +use async_trait::async_trait; + +use kuska_handshake::async_std::{handshake_server, BoxStream}; + +use crate::error::*; +use crate::netapp::*; +use crate::proto::*; +use crate::util::*; + +// The client and server connection structs (client.rs and server.rs) +// build upon the chunking mechanism which is exclusively contained +// in proto.rs. +// Here, we just care about sending big messages without size limit. +// The format of these messages is described below. +// Chunking happens independently. + +// Request message format (client -> server): +// - u8 priority +// - u8 path length +// - [u8; path length] path +// - [u8; *] data + +// Response message format (server -> client): +// - u8 response code +// - [u8; *] response + +pub(crate) struct ServerConn { + pub(crate) remote_addr: SocketAddr, + pub(crate) peer_id: NodeID, + + netapp: Arc, + + resp_send: mpsc::UnboundedSender)>>, + close_send: watch::Sender, +} + +impl ServerConn { + pub(crate) async fn run(netapp: Arc, socket: TcpStream) -> Result<(), Error> { + let remote_addr = socket.peer_addr()?; + let mut socket = socket.compat(); + + let handshake = handshake_server( + &mut socket, + netapp.netid.clone(), + netapp.id, + netapp.privkey.clone(), + ) + .await?; + let peer_id = handshake.peer_pk; + + debug!( + "Handshake complete (server) with {}@{}", + hex::encode(&peer_id), + remote_addr + ); + + let (read, write) = socket.split(); + + let (read, write) = + BoxStream::from_handshake(read, write, handshake, 0x8000).split_read_write(); + + let (resp_send, resp_recv) = mpsc::unbounded_channel(); + + let (close_send, close_recv) = watch::channel(false); + + let conn = Arc::new(ServerConn { + netapp: netapp.clone(), + remote_addr, + peer_id, + resp_send, + close_send, + }); + + netapp.connected_as_server(peer_id, conn.clone()); + + let conn2 = conn.clone(); + let conn3 = conn.clone(); + let close_recv2 = close_recv.clone(); + tokio::try_join!( + async move { + tokio::select!( + r = conn2.recv_loop(read) => r, + _ = await_exit(close_recv) => Ok(()), + ) + }, + async move { + tokio::select!( + r = conn3.send_loop(resp_recv, write) => r, + _ = await_exit(close_recv2) => Ok(()), + ) + }, + ) + .map(|_| ()) + .log_err("ServerConn recv_loop/send_loop"); + + netapp.disconnected_as_server(&peer_id, conn); + + Ok(()) + } + + pub fn close(&self) { + self.close_send.send(true).unwrap(); + } + + async fn recv_handler_aux(self: &Arc, bytes: &[u8]) -> Result, Error> { + if bytes.len() < 2 { + return Err(Error::Message("Invalid protocol message".into())); + } + + // byte 0 is the request priority, we don't care here + let path_length = bytes[1] as usize; + if bytes.len() < 2 + path_length { + return Err(Error::Message("Invalid protocol message".into())); + } + + let path = &bytes[2..2 + path_length]; + let path = String::from_utf8(path.to_vec())?; + let data = &bytes[2 + path_length..]; + + let handler_opt = { + let endpoints = self.netapp.endpoints.read().unwrap(); + endpoints.get(&path).map(|e| e.clone_endpoint()) + }; + + if let Some(handler) = handler_opt { + handler.handle(data, self.peer_id).await + } else { + Err(Error::NoHandler) + } + } +} + +impl SendLoop for ServerConn {} + +#[async_trait] +impl RecvLoop for ServerConn { + async fn recv_handler(self: Arc, id: RequestID, bytes: Vec) { + trace!("ServerConn recv_handler {} ({} bytes)", id, bytes.len()); + let bytes: Bytes = bytes.into(); + + let resp = self.recv_handler_aux(&bytes[..]).await; + let prio = bytes[0]; + + let mut resp_bytes = vec![]; + match resp { + Ok(rb) => { + resp_bytes.push(0u8); + resp_bytes.extend(&rb[..]); + } + Err(e) => { + resp_bytes.push(e.code()); + } + } + + self.resp_send + .send(Some((id, prio, resp_bytes))) + .log_err("ServerConn recv_handler send resp"); + } +}