One possibility, but I don't like it

This commit is contained in:
Alex 2022-07-21 19:25:07 +02:00
parent 44bbc1c00c
commit 7d148c7e76
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
8 changed files with 23 additions and 55 deletions

View file

@ -16,7 +16,6 @@ use tokio::sync::watch;
use netapp::endpoint::*; use netapp::endpoint::*;
use netapp::message::*; use netapp::message::*;
use netapp::peering::basalt::*; use netapp::peering::basalt::*;
use netapp::send::*;
use netapp::util::parse_peer_addr; use netapp::util::parse_peer_addr;
use netapp::{NetApp, NodeID}; use netapp::{NetApp, NodeID};
@ -146,7 +145,7 @@ impl Example {
tokio::spawn(async move { tokio::spawn(async move {
match self2 match self2
.example_endpoint .example_endpoint
.call(&p, &ExampleMessage { example_field: 42 }, PRIO_NORMAL) .call(&p, ExampleMessage { example_field: 42 }, PRIO_NORMAL)
.await .await
{ {
Ok(resp) => debug!("Got example response: {:?}", resp), Ok(resp) => debug!("Got example response: {:?}", resp),
@ -160,7 +159,7 @@ impl Example {
#[async_trait] #[async_trait]
impl EndpointHandler<ExampleMessage> for Example { impl EndpointHandler<ExampleMessage> for Example {
async fn handle(self: &Arc<Self>, msg: &ExampleMessage, _from: NodeID) -> ExampleResponse { async fn handle(self: &Arc<Self>, msg: ExampleMessage, _from: NodeID) -> ExampleResponse {
debug!("Got example message: {:?}, sending example response", msg); debug!("Got example message: {:?}, sending example response", msg);
ExampleResponse { ExampleResponse {
example_field: false, example_field: false,

View file

@ -1,4 +1,3 @@
use std::borrow::Borrow;
use std::collections::HashMap; use std::collections::HashMap;
use std::net::SocketAddr; use std::net::SocketAddr;
use std::sync::atomic::{self, AtomicU32}; use std::sync::atomic::{self, AtomicU32};

View file

@ -1,4 +1,3 @@
use std::borrow::Borrow;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;

View file

@ -1,4 +1,3 @@
use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -43,6 +42,10 @@ pub trait Message: SerializeMessage + Send + Sync {
} }
/// A trait for de/serializing messages, with possible associated stream. /// A trait for de/serializing messages, with possible associated stream.
/// This is default-implemented by anything that can already be serialized
/// and deserialized. Adapters are provided that implement this for
/// adding a body, either from a fixed Bytes buffer (which allows the thing
/// to be Clone), or from a streaming byte stream.
pub trait SerializeMessage: Sized { pub trait SerializeMessage: Sized {
type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send; type SerializableSelf: Serialize + for<'de> Deserialize<'de> + Send;
@ -53,38 +56,9 @@ pub trait SerializeMessage: Sized {
// ---- // ----
impl<T, E> SerializeMessage for Result<T, E>
where
T: SerializeMessage + Send,
E: Serialize + for<'de> Deserialize<'de> + Send,
{
type SerializableSelf = Result<T::SerializableSelf, E>;
fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
match self {
Ok(ok) => {
let (msg, stream) = ok.into_parts();
(Ok(msg), stream)
}
Err(err) => (Err(err), None),
}
}
fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self {
match ser_self {
Ok(ok) => Ok(T::from_parts(ok, stream)),
Err(err) => Err(err),
}
}
}
// ---
pub trait SimpleMessage: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync {}
impl<T> SerializeMessage for T impl<T> SerializeMessage for T
where where
T: SimpleMessage, T: Serialize + for<'de> Deserialize<'de> + Send,
{ {
type SerializableSelf = Self; type SerializableSelf = Self;
fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) { fn into_parts(self) -> (Self::SerializableSelf, Option<ByteStream>) {
@ -97,12 +71,15 @@ where
} }
} }
impl SimpleMessage for () {}
impl<T: SimpleMessage> SimpleMessage for std::sync::Arc<T> {}
// ---- // ----
/// An adapter that adds a body from a fixed Bytes buffer to a serializable message,
/// implementing the SerializeMessage trait. This allows for the SerializeMessage object
/// to be cloned, which is usefull for requests that must be sent to multiple servers.
/// Note that cloning the body is cheap thanks to Bytes; make sure that your serializable
/// part is also easily clonable (e.g. by wrapping it in an Arc).
/// Note that this CANNOT be used for a response type, as it cannot be reconstructed
/// from a remote stream.
#[derive(Clone)] #[derive(Clone)]
pub struct WithFixedBody<T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static>( pub struct WithFixedBody<T: Serialize + for<'de> Deserialize<'de> + Clone + Send + 'static>(
pub T, pub T,
@ -123,11 +100,14 @@ where
) )
} }
fn from_parts(ser_self: Self::SerializableSelf, stream: ByteStream) -> Self { fn from_parts(_ser_self: Self::SerializableSelf, _stream: ByteStream) -> Self {
panic!("Cannot reconstruct a WithFixedBody type from parts"); panic!("Cannot use a WithFixedBody as a response type");
} }
} }
/// An adapter that adds a body from a ByteStream. This is usefull for receiving
/// responses to requests that contain attached byte streams. This type is
/// not clonable.
pub struct WithStreamingBody<T: Serialize + for<'de> Deserialize<'de> + Send>( pub struct WithStreamingBody<T: Serialize + for<'de> Deserialize<'de> + Send>(
pub T, pub T,
pub ByteStream, pub ByteStream,

View file

@ -38,8 +38,6 @@ pub(crate) struct HelloMessage {
pub server_port: u16, pub server_port: u16,
} }
impl SimpleMessage for HelloMessage {}
impl Message for HelloMessage { impl Message for HelloMessage {
type Response = (); type Response = ();
} }

View file

@ -16,7 +16,6 @@ use tokio::sync::watch;
use crate::endpoint::*; use crate::endpoint::*;
use crate::message::*; use crate::message::*;
use crate::netapp::*; use crate::netapp::*;
use crate::send::*;
use crate::NodeID; use crate::NodeID;
// -- Protocol messages -- // -- Protocol messages --
@ -332,7 +331,7 @@ impl Basalt {
async fn do_pull(self: Arc<Self>, peer: NodeID) { async fn do_pull(self: Arc<Self>, peer: NodeID) {
match self match self
.pull_endpoint .pull_endpoint
.call(&peer, &PullMessage {}, PRIO_NORMAL) .call(&peer, PullMessage {}, PRIO_NORMAL)
.await .await
{ {
Ok(resp) => { Ok(resp) => {
@ -347,7 +346,7 @@ impl Basalt {
async fn do_push(self: Arc<Self>, peer: NodeID) { async fn do_push(self: Arc<Self>, peer: NodeID) {
let push_msg = self.make_push_message(); let push_msg = self.make_push_message();
match self.push_endpoint.call(&peer, &push_msg, PRIO_NORMAL).await { match self.push_endpoint.call(&peer, push_msg, PRIO_NORMAL).await {
Ok(_) => { Ok(_) => {
trace!("KYEV PEXo {}", hex::encode(peer)); trace!("KYEV PEXo {}", hex::encode(peer));
} }
@ -469,14 +468,14 @@ impl Basalt {
#[async_trait] #[async_trait]
impl EndpointHandler<PullMessage> for Basalt { impl EndpointHandler<PullMessage> for Basalt {
async fn handle(self: &Arc<Self>, _pullmsg: &PullMessage, _from: NodeID) -> PushMessage { async fn handle(self: &Arc<Self>, _pullmsg: PullMessage, _from: NodeID) -> PushMessage {
self.make_push_message() self.make_push_message()
} }
} }
#[async_trait] #[async_trait]
impl EndpointHandler<PushMessage> for Basalt { impl EndpointHandler<PushMessage> for Basalt {
async fn handle(self: &Arc<Self>, pushmsg: &PushMessage, _from: NodeID) { async fn handle(self: &Arc<Self>, pushmsg: PushMessage, _from: NodeID) {
self.handle_peer_list(&pushmsg.peers[..]); self.handle_peer_list(&pushmsg.peers[..]);
} }
} }

View file

@ -40,8 +40,6 @@ impl Message for PingMessage {
type Response = PingMessage; type Response = PingMessage;
} }
impl SimpleMessage for PingMessage {}
#[derive(Serialize, Deserialize, Clone)] #[derive(Serialize, Deserialize, Clone)]
struct PeerListMessage { struct PeerListMessage {
pub list: Vec<(NodeID, SocketAddr)>, pub list: Vec<(NodeID, SocketAddr)>,
@ -51,8 +49,6 @@ impl Message for PeerListMessage {
type Response = PeerListMessage; type Response = PeerListMessage;
} }
impl SimpleMessage for PeerListMessage {}
// -- Algorithm data structures -- // -- Algorithm data structures --
#[derive(Debug)] #[derive(Debug)]

View file

@ -9,8 +9,6 @@ use serde::Serialize;
use futures::Stream; use futures::Stream;
use tokio::sync::watch; use tokio::sync::watch;
use crate::message::SerializeMessage;
/// A node's identifier, which is also its public cryptographic key /// A node's identifier, which is also its public cryptographic key
pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey; pub type NodeID = sodiumoxide::crypto::sign::ed25519::PublicKey;
/// A node's secret key /// A node's secret key