add streaming body to requests and responses #3
3 changed files with 89 additions and 11 deletions
|
@ -41,17 +41,31 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01;
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
#[derive(Clone, Copy)]
|
/// An order tag can be added to a message or a response to indicate
|
||||||
pub struct OrderTagStream(u64);
|
/// whether it should be sent after or before other messages with order tags
|
||||||
|
/// referencing a same stream
|
||||||
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
|
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
|
||||||
pub struct OrderTag(pub(crate) u64, pub(crate) u64);
|
pub struct OrderTag(pub(crate) u64, pub(crate) u64);
|
||||||
|
|
||||||
|
/// A stream is an opaque identifier that defines a set of messages
|
||||||
|
/// or responses that are ordered wrt one another using to order tags.
|
||||||
|
#[derive(Clone, Copy)]
|
||||||
|
pub struct OrderTagStream(u64);
|
||||||
|
|
||||||
|
|
||||||
impl OrderTag {
|
impl OrderTag {
|
||||||
|
/// Create a new stream from which to generate order tags. Example:
|
||||||
|
/// ```
|
||||||
|
/// let stream = OrderTag.stream();
|
||||||
|
/// let tag_1 = stream.order(1);
|
||||||
|
/// let tag_2 = stream.order(2);
|
||||||
|
/// ```
|
||||||
pub fn stream() -> OrderTagStream {
|
pub fn stream() -> OrderTagStream {
|
||||||
OrderTagStream(thread_rng().gen())
|
OrderTagStream(thread_rng().gen())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
impl OrderTagStream {
|
impl OrderTagStream {
|
||||||
|
/// Create the order tag for message `order` in this stream
|
||||||
pub fn order(&self, order: u64) -> OrderTag {
|
pub fn order(&self, order: u64) -> OrderTag {
|
||||||
OrderTag(self.0, order)
|
OrderTag(self.0, order)
|
||||||
}
|
}
|
||||||
|
@ -60,8 +74,10 @@ impl OrderTagStream {
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
/// This trait should be implemented by all messages your application
|
/// This trait should be implemented by all messages your application
|
||||||
/// wants to handle
|
/// wants to handle. It specifies which data type should be sent
|
||||||
|
/// as a response to this message in the RPC protocol.
|
||||||
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
|
pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static {
|
||||||
|
/// The type of the response that is sent in response to this message
|
||||||
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
|
type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -79,10 +95,13 @@ pub struct Req<M: Message> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Message> Req<M> {
|
impl<M: Message> Req<M> {
|
||||||
|
/// Creates a new request from a base message `M`
|
||||||
pub fn new(v: M) -> Result<Self, Error> {
|
pub fn new(v: M) -> Result<Self, Error> {
|
||||||
Ok(v.into_req()?)
|
Ok(v.into_req()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach a stream to message in request, where the stream is streamed
|
||||||
|
/// from a fixed `Bytes` buffer
|
||||||
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
|
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stream: AttachedStream::Fixed(b),
|
stream: AttachedStream::Fixed(b),
|
||||||
|
@ -90,6 +109,10 @@ impl<M: Message> Req<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach a stream to message in request, where the stream is
|
||||||
|
/// an instance of `ByteStream`. Note than when a `Req<M>` has an attached
|
||||||
|
/// stream which is a `ByteStream` instance, it can no longer be cloned
|
||||||
|
/// to be sent to different nodes (`.clone()` will panic)
|
||||||
pub fn with_stream(self, b: ByteStream) -> Self {
|
pub fn with_stream(self, b: ByteStream) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stream: AttachedStream::Stream(b),
|
stream: AttachedStream::Stream(b),
|
||||||
|
@ -97,6 +120,8 @@ impl<M: Message> Req<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add an order tag to this request to indicate in which order it should
|
||||||
|
/// be sent.
|
||||||
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
|
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
|
||||||
Self {
|
Self {
|
||||||
order_tag: Some(order_tag),
|
order_tag: Some(order_tag),
|
||||||
|
@ -104,10 +129,12 @@ impl<M: Message> Req<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the message `M` contained in this request
|
||||||
pub fn msg(&self) -> &M {
|
pub fn msg(&self) -> &M {
|
||||||
&self.msg
|
&self.msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Takes out the stream attached to this request, if any
|
||||||
pub fn take_stream(&mut self) -> Option<ByteStream> {
|
pub fn take_stream(&mut self) -> Option<ByteStream> {
|
||||||
std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
|
std::mem::replace(&mut self.stream, AttachedStream::None).into_stream()
|
||||||
}
|
}
|
||||||
|
@ -142,8 +169,14 @@ impl<M: Message> Req<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// `IntoReq<M>` represents any object that can be transformed into `Req<M>`
|
||||||
pub trait IntoReq<M: Message> {
|
pub trait IntoReq<M: Message> {
|
||||||
|
/// Transform the object into a `Req<M>`, serializing the message M
|
||||||
|
/// to be sent to remote nodes
|
||||||
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>;
|
fn into_req(self) -> Result<Req<M>, rmp_serde::encode::Error>;
|
||||||
|
/// Transform the object into a `Req<M>`, skipping the serialization
|
||||||
|
/// of message M, in the case we are not sending this RPC message to
|
||||||
|
/// a remote node
|
||||||
fn into_req_local(self) -> Req<M>;
|
fn into_req_local(self) -> Req<M>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -220,6 +253,7 @@ pub struct Resp<M: Message> {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<M: Message> Resp<M> {
|
impl<M: Message> Resp<M> {
|
||||||
|
/// Creates a new response from a base response message
|
||||||
pub fn new(v: M::Response) -> Self {
|
pub fn new(v: M::Response) -> Self {
|
||||||
Resp {
|
Resp {
|
||||||
_phantom: Default::default(),
|
_phantom: Default::default(),
|
||||||
|
@ -229,6 +263,8 @@ impl<M: Message> Resp<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach a stream to message in response, where the stream is streamed
|
||||||
|
/// from a fixed `Bytes` buffer
|
||||||
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
|
pub fn with_stream_from_buffer(self, b: Bytes) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stream: AttachedStream::Fixed(b),
|
stream: AttachedStream::Fixed(b),
|
||||||
|
@ -236,6 +272,8 @@ impl<M: Message> Resp<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Attach a stream to message in response, where the stream is
|
||||||
|
/// an instance of `ByteStream`.
|
||||||
pub fn with_stream(self, b: ByteStream) -> Self {
|
pub fn with_stream(self, b: ByteStream) -> Self {
|
||||||
Self {
|
Self {
|
||||||
stream: AttachedStream::Stream(b),
|
stream: AttachedStream::Stream(b),
|
||||||
|
@ -243,6 +281,8 @@ impl<M: Message> Resp<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Add an order tag to this response to indicate in which order it should
|
||||||
|
/// be sent.
|
||||||
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
|
pub fn with_order_tag(self, order_tag: OrderTag) -> Self {
|
||||||
Self {
|
Self {
|
||||||
order_tag: Some(order_tag),
|
order_tag: Some(order_tag),
|
||||||
|
@ -250,14 +290,20 @@ impl<M: Message> Resp<M> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Get a reference to the response message contained in this request
|
||||||
pub fn msg(&self) -> &M::Response {
|
pub fn msg(&self) -> &M::Response {
|
||||||
&self.msg
|
&self.msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transforms the `Resp<M>` into the response message it contains,
|
||||||
|
/// dropping everything else (including attached data stream)
|
||||||
pub fn into_msg(self) -> M::Response {
|
pub fn into_msg(self) -> M::Response {
|
||||||
self.msg
|
self.msg
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transforms the `Resp<M>` into, on the one side, the response message
|
||||||
|
/// it contains, and on the other side, the associated data stream
|
||||||
|
/// if it exists
|
||||||
pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
|
pub fn into_parts(self) -> (M::Response, Option<ByteStream>) {
|
||||||
(self.msg, self.stream.into_stream())
|
(self.msg, self.stream.into_stream())
|
||||||
}
|
}
|
||||||
|
|
|
@ -81,6 +81,7 @@ impl PeerInfoInternal {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Information that the full mesh peering strategy can return about the peers it knows of
|
||||||
#[derive(Copy, Clone, Debug)]
|
#[derive(Copy, Clone, Debug)]
|
||||||
pub struct PeerInfo {
|
pub struct PeerInfo {
|
||||||
/// The node's identifier (its public key)
|
/// The node's identifier (its public key)
|
||||||
|
|
|
@ -9,19 +9,23 @@ use tokio::io::AsyncRead;
|
||||||
|
|
||||||
use crate::bytes_buf::BytesBuf;
|
use crate::bytes_buf::BytesBuf;
|
||||||
|
|
||||||
/// A stream of associated data.
|
/// A stream of bytes (click to read more).
|
||||||
///
|
///
|
||||||
/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
|
/// When sent through Netapp, the Vec may be split in smaller chunk in such a way
|
||||||
/// consecutive Vec may get merged, but Vec and error code may not be reordered
|
/// consecutive Vec may get merged, but Vec and error code may not be reordered
|
||||||
///
|
///
|
||||||
/// Error code 255 means the stream was cut before its end. Other codes have no predefined
|
/// Items sent in the ByteStream may be errors of type `std::io::Error`.
|
||||||
/// meaning, it's up to your application to define their semantic.
|
/// An error indicates the end of the ByteStream: a reader should no longer read
|
||||||
|
/// after recieving an error, and a writer should stop writing after sending an error.
|
||||||
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
|
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
|
||||||
|
|
||||||
|
/// A packet sent in a ByteStream, which may contain either
|
||||||
|
/// a Bytes object or an error
|
||||||
pub type Packet = Result<Bytes, std::io::Error>;
|
pub type Packet = Result<Bytes, std::io::Error>;
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
|
/// A helper struct to read defined lengths of data from a BytesStream
|
||||||
pub struct ByteStreamReader {
|
pub struct ByteStreamReader {
|
||||||
stream: ByteStream,
|
stream: ByteStream,
|
||||||
buf: BytesBuf,
|
buf: BytesBuf,
|
||||||
|
@ -30,6 +34,7 @@ pub struct ByteStreamReader {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ByteStreamReader {
|
impl ByteStreamReader {
|
||||||
|
/// Creates a new `ByteStreamReader` from a `ByteStream`
|
||||||
pub fn new(stream: ByteStream) -> Self {
|
pub fn new(stream: ByteStream) -> Self {
|
||||||
ByteStreamReader {
|
ByteStreamReader {
|
||||||
stream,
|
stream,
|
||||||
|
@ -39,6 +44,8 @@ impl ByteStreamReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read exactly `read_len` bytes from the underlying stream
|
||||||
|
/// (returns a future)
|
||||||
pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
|
pub fn read_exact(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
|
||||||
ByteStreamReadExact {
|
ByteStreamReadExact {
|
||||||
reader: self,
|
reader: self,
|
||||||
|
@ -47,6 +54,8 @@ impl ByteStreamReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read at most `read_len` bytes from the underlying stream, or less
|
||||||
|
/// if the end of the stream is reached (returns a future)
|
||||||
pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
|
pub fn read_exact_or_eos(&mut self, read_len: usize) -> ByteStreamReadExact<'_> {
|
||||||
ByteStreamReadExact {
|
ByteStreamReadExact {
|
||||||
reader: self,
|
reader: self,
|
||||||
|
@ -55,10 +64,14 @@ impl ByteStreamReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read exactly one byte from the underlying stream and returns it
|
||||||
|
/// as an u8
|
||||||
pub async fn read_u8(&mut self) -> Result<u8, ReadExactError> {
|
pub async fn read_u8(&mut self) -> Result<u8, ReadExactError> {
|
||||||
Ok(self.read_exact(1).await?[0])
|
Ok(self.read_exact(1).await?[0])
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read exactly two bytes from the underlying stream and returns them as an u16 (using
|
||||||
|
/// big-endian decoding)
|
||||||
pub async fn read_u16(&mut self) -> Result<u16, ReadExactError> {
|
pub async fn read_u16(&mut self) -> Result<u16, ReadExactError> {
|
||||||
let bytes = self.read_exact(2).await?;
|
let bytes = self.read_exact(2).await?;
|
||||||
let mut b = [0u8; 2];
|
let mut b = [0u8; 2];
|
||||||
|
@ -66,6 +79,8 @@ impl ByteStreamReader {
|
||||||
Ok(u16::from_be_bytes(b))
|
Ok(u16::from_be_bytes(b))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Read exactly four bytes from the underlying stream and returns them as an u32 (using
|
||||||
|
/// big-endian decoding)
|
||||||
pub async fn read_u32(&mut self) -> Result<u32, ReadExactError> {
|
pub async fn read_u32(&mut self) -> Result<u32, ReadExactError> {
|
||||||
let bytes = self.read_exact(4).await?;
|
let bytes = self.read_exact(4).await?;
|
||||||
let mut b = [0u8; 4];
|
let mut b = [0u8; 4];
|
||||||
|
@ -73,6 +88,8 @@ impl ByteStreamReader {
|
||||||
Ok(u32::from_be_bytes(b))
|
Ok(u32::from_be_bytes(b))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Transforms the stream reader back into the underlying stream (starting
|
||||||
|
/// after everything that the reader has read)
|
||||||
pub fn into_stream(self) -> ByteStream {
|
pub fn into_stream(self) -> ByteStream {
|
||||||
let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
|
let buf_stream = futures::stream::iter(self.buf.into_slices().into_iter().map(Ok));
|
||||||
if let Some(err) = self.err {
|
if let Some(err) = self.err {
|
||||||
|
@ -84,10 +101,21 @@ impl ByteStreamReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Tries to fill the internal read buffer from the underlying stream.
|
||||||
|
/// Calling this might be necessary to ensure that `.eos()` returns a correct
|
||||||
|
/// result, otherwise the reader might not be aware that the underlying
|
||||||
|
/// stream has nothing left to return.
|
||||||
|
pub async fn fill_buffer(&mut self) {
|
||||||
|
let packet = self.stream.next().await;
|
||||||
|
self.add_stream_next(packet);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Clears the internal read buffer and returns its content
|
||||||
pub fn take_buffer(&mut self) -> Bytes {
|
pub fn take_buffer(&mut self) -> Bytes {
|
||||||
self.buf.take_all()
|
self.buf.take_all()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Returns true if the end of the underlying stream has been reached
|
||||||
pub fn eos(&self) -> bool {
|
pub fn eos(&self) -> bool {
|
||||||
self.buf.is_empty() && self.eos
|
self.buf.is_empty() && self.eos
|
||||||
}
|
}
|
||||||
|
@ -110,18 +138,19 @@ impl ByteStreamReader {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn fill_buffer(&mut self) {
|
|
||||||
let packet = self.stream.next().await;
|
|
||||||
self.add_stream_next(packet);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The error kind that can be returned by `ByteStreamReader::read_exact` and
|
||||||
|
/// `ByteStreamReader::read_exact_or_eos`
|
||||||
pub enum ReadExactError {
|
pub enum ReadExactError {
|
||||||
|
/// The end of the stream was reached before the requested number of bytes could be read
|
||||||
UnexpectedEos,
|
UnexpectedEos,
|
||||||
|
/// The underlying data stream returned an IO error when trying to read
|
||||||
Stream(std::io::Error),
|
Stream(std::io::Error),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// The future returned by `ByteStreamReader::read_exact` and
|
||||||
|
/// `ByteStreamReader::read_exact_or_eos`
|
||||||
#[pin_project::pin_project]
|
#[pin_project::pin_project]
|
||||||
pub struct ByteStreamReadExact<'a> {
|
pub struct ByteStreamReadExact<'a> {
|
||||||
#[pin]
|
#[pin]
|
||||||
|
@ -160,10 +189,12 @@ impl<'a> Future for ByteStreamReadExact<'a> {
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
|
/// Turns a `tokio::io::AsyncRead` asynchronous reader into a `ByteStream`
|
||||||
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
|
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
|
||||||
Box::pin(tokio_util::io::ReaderStream::new(reader))
|
Box::pin(tokio_util::io::ReaderStream::new(reader))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Turns a `ByteStream` into a `tokio::io::AsyncRead` asynchronous reader
|
||||||
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
|
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
|
||||||
tokio_util::io::StreamReader::new(stream)
|
tokio_util::io::StreamReader::new(stream)
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue