diff --git a/src/message.rs b/src/message.rs index ec9433a..2b2b75f 100644 --- a/src/message.rs +++ b/src/message.rs @@ -41,17 +41,31 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01; // ---- -#[derive(Clone, Copy)] -pub struct OrderTagStream(u64); +/// An order tag can be added to a message or a response to indicate +/// whether it should be sent after or before other messages with order tags +/// referencing a same stream #[derive(Clone, Copy, Serialize, Deserialize, Debug)] pub struct OrderTag(pub(crate) u64, pub(crate) u64); +/// A stream is an opaque identifier that defines a set of messages +/// or responses that are ordered wrt one another using to order tags. +#[derive(Clone, Copy)] +pub struct OrderTagStream(u64); + + impl OrderTag { + /// Create a new stream from which to generate order tags. Example: + /// ``` + /// let stream = OrderTag.stream(); + /// let tag_1 = stream.order(1); + /// let tag_2 = stream.order(2); + /// ``` pub fn stream() -> OrderTagStream { OrderTagStream(thread_rng().gen()) } } impl OrderTagStream { + /// Create the order tag for message `order` in this stream pub fn order(&self, order: u64) -> OrderTag { OrderTag(self.0, order) } @@ -60,8 +74,10 @@ impl OrderTagStream { // ---- /// This trait should be implemented by all messages your application -/// wants to handle +/// wants to handle. It specifies which data type should be sent +/// as a response to this message in the RPC protocol. pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { + /// The type of the response that is sent in response to this message type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; } @@ -79,10 +95,13 @@ pub struct Req { } impl Req { + /// Creates a new request from a base message `M` pub fn new(v: M) -> Result { Ok(v.into_req()?) } + /// Attach a stream to message in request, where the stream is streamed + /// from a fixed `Bytes` buffer pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { stream: AttachedStream::Fixed(b), @@ -90,6 +109,10 @@ impl Req { } } + /// Attach a stream to message in request, where the stream is + /// an instance of `ByteStream`. Note than when a `Req` has an attached + /// stream which is a `ByteStream` instance, it can no longer be cloned + /// to be sent to different nodes (`.clone()` will panic) pub fn with_stream(self, b: ByteStream) -> Self { Self { stream: AttachedStream::Stream(b), @@ -97,6 +120,8 @@ impl Req { } } + /// Add an order tag to this request to indicate in which order it should + /// be sent. pub fn with_order_tag(self, order_tag: OrderTag) -> Self { Self { order_tag: Some(order_tag), @@ -104,10 +129,12 @@ impl Req { } } + /// Get a reference to the message `M` contained in this request pub fn msg(&self) -> &M { &self.msg } + /// Takes out the stream attached to this request, if any pub fn take_stream(&mut self) -> Option { std::mem::replace(&mut self.stream, AttachedStream::None).into_stream() } @@ -142,8 +169,14 @@ impl Req { } } +/// `IntoReq` represents any object that can be transformed into `Req` pub trait IntoReq { + /// Transform the object into a `Req`, serializing the message M + /// to be sent to remote nodes fn into_req(self) -> Result, rmp_serde::encode::Error>; + /// Transform the object into a `Req`, skipping the serialization + /// of message M, in the case we are not sending this RPC message to + /// a remote node fn into_req_local(self) -> Req; } @@ -220,6 +253,7 @@ pub struct Resp { } impl Resp { + /// Creates a new response from a base response message pub fn new(v: M::Response) -> Self { Resp { _phantom: Default::default(), @@ -229,6 +263,8 @@ impl Resp { } } + /// Attach a stream to message in response, where the stream is streamed + /// from a fixed `Bytes` buffer pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { stream: AttachedStream::Fixed(b), @@ -236,6 +272,8 @@ impl Resp { } } + /// Attach a stream to message in response, where the stream is + /// an instance of `ByteStream`. pub fn with_stream(self, b: ByteStream) -> Self { Self { stream: AttachedStream::Stream(b), @@ -243,6 +281,8 @@ impl Resp { } } + /// Add an order tag to this response to indicate in which order it should + /// be sent. pub fn with_order_tag(self, order_tag: OrderTag) -> Self { Self { order_tag: Some(order_tag), @@ -250,14 +290,20 @@ impl Resp { } } + /// Get a reference to the response message contained in this request pub fn msg(&self) -> &M::Response { &self.msg } + /// Transforms the `Resp` into the response message it contains, + /// dropping everything else (including attached data stream) pub fn into_msg(self) -> M::Response { self.msg } + /// Transforms the `Resp` into, on the one side, the response message + /// it contains, and on the other side, the associated data stream + /// if it exists pub fn into_parts(self) -> (M::Response, Option) { (self.msg, self.stream.into_stream()) } diff --git a/src/peering/fullmesh.rs b/src/peering/fullmesh.rs index 7f1c065..2f3330e 100644 --- a/src/peering/fullmesh.rs +++ b/src/peering/fullmesh.rs @@ -81,6 +81,7 @@ impl PeerInfoInternal { } } +/// Information that the full mesh peering strategy can return about the peers it knows of #[derive(Copy, Clone, Debug)] pub struct PeerInfo { /// The node's identifier (its public key) diff --git a/src/stream.rs b/src/stream.rs index 05ee051..82f7be3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -9,19 +9,23 @@ use tokio::io::AsyncRead; use crate::bytes_buf::BytesBuf; -/// A stream of associated data. +/// A stream of bytes (click to read more). /// /// When sent through Netapp, the Vec may be split in smaller chunk in such a way /// consecutive Vec may get merged, but Vec and error code may not be reordered /// -/// Error code 255 means the stream was cut before its end. Other codes have no predefined -/// meaning, it's up to your application to define their semantic. +/// Items sent in the ByteStream may be errors of type `std::io::Error`. +/// An error indicates the end of the ByteStream: a reader should no longer read +/// after recieving an error, and a writer should stop writing after sending an error. pub type ByteStream = Pin + Send + Sync>>; +/// A packet sent in a ByteStream, which may contain either +/// a Bytes object or an error pub type Packet = Result; // ---- +/// A helper struct to read defined lengths of data from a BytesStream pub struct ByteStreamReader { stream: ByteStream, buf: BytesBuf, @@ -30,6 +34,7 @@ pub struct ByteStreamReader { } impl ByteStreamReader { + /// Creates a new `ByteStreamReader` from a `ByteStream` pub fn new(stream: ByteStream) -> Self { ByteStreamReader { stream, @@ -39,6 +44,8 @@ impl ByteStreamReader { } } + /// Read exactly `read_len` bytes from the underlying stream + /// (returns a future) pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> { ByteStreamReadExact { reader: self, @@ -47,6 +54,8 @@ impl ByteStreamReader { } } + /// Read at most `read_len` bytes from the underlying stream, or less + /// if the end of the stream is reached (returns a future) pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> { ByteStreamReadExact { reader: self, @@ -55,10 +64,14 @@ impl ByteStreamReader { } } + /// Read exactly one byte from the underlying stream and returns it + /// as an u8 pub async fn read_u8(&mut self) -> Result { Ok(self.read_exact(1).await?[0]) } + /// Read exactly two bytes from the underlying stream and returns them as an u16 (using + /// big-endian decoding) pub async fn read_u16(&mut self) -> Result { let bytes = self.read_exact(2).await?; let mut b = [0u8; 2]; @@ -66,6 +79,8 @@ impl ByteStreamReader { Ok(u16::from_be_bytes(b)) } + /// Read exactly four bytes from the underlying stream and returns them as an u32 (using + /// big-endian decoding) pub async fn read_u32(&mut self) -> Result { let bytes = self.read_exact(4).await?; let mut b = [0u8; 4]; @@ -73,6 +88,8 @@ impl ByteStreamReader { Ok(u32::from_be_bytes(b)) } + /// Transforms the stream reader back into the underlying stream (starting + /// after everything that the reader has read) pub fn into_stream(self) -> ByteStream { let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok)); if let Some(err) = self.err { @@ -84,10 +101,21 @@ impl ByteStreamReader { } } + /// Tries to fill the internal read buffer from the underlying stream. + /// Calling this might be necessary to ensure that `.eos()` returns a correct + /// result, otherwise the reader might not be aware that the underlying + /// stream has nothing left to return. + pub async fn fill_buffer(&mut self) { + let packet = self.stream.next().await; + self.add_stream_next(packet); + } + + /// Clears the internal read buffer and returns its content pub fn take_buffer(&mut self) -> Bytes { self.buf.take_all() } + /// Returns true if the end of the underlying stream has been reached pub fn eos(&self) -> bool { self.buf.is_empty() && self.eos } @@ -110,18 +138,19 @@ impl ByteStreamReader { } } } - - pub async fn fill_buffer(&mut self) { - let packet = self.stream.next().await; - self.add_stream_next(packet); - } } +/// The error kind that can be returned by `ByteStreamReader::read_exact` and +/// `ByteStreamReader::read_exact_or_eos` pub enum ReadExactError { + /// The end of the stream was reached before the requested number of bytes could be read UnexpectedEos, + /// The underlying data stream returned an IO error when trying to read Stream(std::io::Error), } +/// The future returned by `ByteStreamReader::read_exact` and +/// `ByteStreamReader::read_exact_or_eos` #[pin_project::pin_project] pub struct ByteStreamReadExact<'a> { #[pin] @@ -160,10 +189,12 @@ impl<'a> Future for ByteStreamReadExact<'a> { // ---- +/// Turns a `tokio::io::AsyncRead` asynchronous reader into a `ByteStream` pub fn asyncread_stream(reader: R) -> ByteStream { Box::pin(tokio_util::io::ReaderStream::new(reader)) } +/// Turns a `ByteStream` into a `tokio::io::AsyncRead` asynchronous reader pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { tokio_util::io::StreamReader::new(stream) }