use std::fmt; use std::marker::PhantomData; use std::sync::Arc; use bytes::{BufMut, Bytes, BytesMut}; use rand::prelude::*; use serde::{Deserialize, Serialize}; use futures::stream::StreamExt; use crate::error::*; use crate::stream::*; use crate::util::*; /// Priority of a request (click to read more about priorities). /// /// This priority value is used to priorize messages /// in the send queue of the client, and their responses in the send queue of the /// server. Lower values mean higher priority. /// /// This mechanism is usefull for messages bigger than the maximum chunk size /// (set at `0x4000` bytes), such as large file transfers. /// In such case, all of the messages in the send queue with the highest priority /// will take turns to send individual chunks, in a round-robin fashion. /// Once all highest priority messages are sent successfully, the messages with /// the next highest priority will begin being sent in the same way. /// /// The same priority value is given to a request and to its associated response. pub type RequestPriority = u8; /// Priority class: high pub const PRIO_HIGH: RequestPriority = 0x20; /// Priority class: normal pub const PRIO_NORMAL: RequestPriority = 0x40; /// Priority class: background pub const PRIO_BACKGROUND: RequestPriority = 0x80; /// Priority: primary among given class pub const PRIO_PRIMARY: RequestPriority = 0x00; /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) pub const PRIO_SECONDARY: RequestPriority = 0x01; // ---- /// An order tag can be added to a message or a response to indicate /// whether it should be sent after or before other messages with order tags /// referencing a same stream #[derive(Clone, Copy, Serialize, Deserialize, Debug)] pub struct OrderTag(pub(crate) u64, pub(crate) u64); /// A stream is an opaque identifier that defines a set of messages /// or responses that are ordered wrt one another using to order tags. #[derive(Clone, Copy)] pub struct OrderTagStream(u64); impl OrderTag { /// Create a new stream from which to generate order tags. Example: /// ```ignore /// let stream = OrderTag.stream(); /// let tag_1 = stream.order(1); /// let tag_2 = stream.order(2); /// ``` pub fn stream() -> OrderTagStream { OrderTagStream(thread_rng().gen()) } } impl OrderTagStream { /// Create the order tag for message `order` in this stream pub fn order(&self, order: u64) -> OrderTag { OrderTag(self.0, order) } } // ---- /// This trait should be implemented by all messages your application /// wants to handle. It specifies which data type should be sent /// as a response to this message in the RPC protocol. pub trait Message: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { /// The type of the response that is sent in response to this message type Response: Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static; } // ---- /// The Req is a helper object used to create requests and attach them /// a stream of data. If the stream is a fixed Bytes and not a ByteStream, /// Req is cheaply clonable to allow the request to be sent to different /// peers (Clone will panic if the stream is a ByteStream). pub struct Req { pub(crate) msg: Arc, pub(crate) msg_ser: Option, pub(crate) stream: AttachedStream, pub(crate) order_tag: Option, } impl Req { /// Creates a new request from a base message `M` pub fn new(v: M) -> Result { Ok(v.into_req()?) } /// Attach a stream to message in request, where the stream is streamed /// from a fixed `Bytes` buffer pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { stream: AttachedStream::Fixed(b), ..self } } /// Attach a stream to message in request, where the stream is /// an instance of `ByteStream`. Note than when a `Req` has an attached /// stream which is a `ByteStream` instance, it can no longer be cloned /// to be sent to different nodes (`.clone()` will panic) pub fn with_stream(self, b: ByteStream) -> Self { Self { stream: AttachedStream::Stream(b), ..self } } /// Add an order tag to this request to indicate in which order it should /// be sent. pub fn with_order_tag(self, order_tag: OrderTag) -> Self { Self { order_tag: Some(order_tag), ..self } } /// Get a reference to the message `M` contained in this request pub fn msg(&self) -> &M { &self.msg } /// Takes out the stream attached to this request, if any pub fn take_stream(&mut self) -> Option { std::mem::replace(&mut self.stream, AttachedStream::None).into_stream() } pub(crate) fn into_enc( self, prio: RequestPriority, path: Bytes, telemetry_id: Bytes, ) -> ReqEnc { ReqEnc { prio, path, telemetry_id, msg: self.msg_ser.unwrap(), stream: self.stream.into_stream(), order_tag: self.order_tag, } } pub(crate) fn from_enc(enc: ReqEnc) -> Result { let msg = rmp_serde::decode::from_read_ref(&enc.msg)?; Ok(Req { msg: Arc::new(msg), msg_ser: Some(enc.msg), stream: enc .stream .map(AttachedStream::Stream) .unwrap_or(AttachedStream::None), order_tag: enc.order_tag, }) } } /// `IntoReq` represents any object that can be transformed into `Req` pub trait IntoReq { /// Transform the object into a `Req`, serializing the message M /// to be sent to remote nodes fn into_req(self) -> Result, rmp_serde::encode::Error>; /// Transform the object into a `Req`, skipping the serialization /// of message M, in the case we are not sending this RPC message to /// a remote node fn into_req_local(self) -> Req; } impl IntoReq for M { fn into_req(self) -> Result, rmp_serde::encode::Error> { let msg_ser = rmp_to_vec_all_named(&self)?; Ok(Req { msg: Arc::new(self), msg_ser: Some(Bytes::from(msg_ser)), stream: AttachedStream::None, order_tag: None, }) } fn into_req_local(self) -> Req { Req { msg: Arc::new(self), msg_ser: None, stream: AttachedStream::None, order_tag: None, } } } impl IntoReq for Req { fn into_req(self) -> Result, rmp_serde::encode::Error> { Ok(self) } fn into_req_local(self) -> Req { self } } impl Clone for Req { fn clone(&self) -> Self { let stream = match &self.stream { AttachedStream::None => AttachedStream::None, AttachedStream::Fixed(b) => AttachedStream::Fixed(b.clone()), AttachedStream::Stream(_) => { panic!("Cannot clone a Req<_> with a non-buffer attached stream") } }; Self { msg: self.msg.clone(), msg_ser: self.msg_ser.clone(), stream, order_tag: self.order_tag, } } } impl fmt::Debug for Req where M: Message + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { write!(f, "Req[{:?}", self.msg)?; match &self.stream { AttachedStream::None => write!(f, "]"), AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()), AttachedStream::Stream(_) => write!(f, "; stream]"), } } } // ---- /// The Resp represents a full response from a RPC that may have /// an attached stream. pub struct Resp { pub(crate) _phantom: PhantomData, pub(crate) msg: M::Response, pub(crate) stream: AttachedStream, pub(crate) order_tag: Option, } impl Resp { /// Creates a new response from a base response message pub fn new(v: M::Response) -> Self { Resp { _phantom: Default::default(), msg: v, stream: AttachedStream::None, order_tag: None, } } /// Attach a stream to message in response, where the stream is streamed /// from a fixed `Bytes` buffer pub fn with_stream_from_buffer(self, b: Bytes) -> Self { Self { stream: AttachedStream::Fixed(b), ..self } } /// Attach a stream to message in response, where the stream is /// an instance of `ByteStream`. pub fn with_stream(self, b: ByteStream) -> Self { Self { stream: AttachedStream::Stream(b), ..self } } /// Add an order tag to this response to indicate in which order it should /// be sent. pub fn with_order_tag(self, order_tag: OrderTag) -> Self { Self { order_tag: Some(order_tag), ..self } } /// Get a reference to the response message contained in this request pub fn msg(&self) -> &M::Response { &self.msg } /// Transforms the `Resp` into the response message it contains, /// dropping everything else (including attached data stream) pub fn into_msg(self) -> M::Response { self.msg } /// Transforms the `Resp` into, on the one side, the response message /// it contains, and on the other side, the associated data stream /// if it exists pub fn into_parts(self) -> (M::Response, Option) { (self.msg, self.stream.into_stream()) } pub(crate) fn into_enc(self) -> Result { Ok(RespEnc { msg: rmp_to_vec_all_named(&self.msg)?.into(), stream: self.stream.into_stream(), order_tag: self.order_tag, }) } pub(crate) fn from_enc(enc: RespEnc) -> Result { let msg = rmp_serde::decode::from_read_ref(&enc.msg)?; Ok(Self { _phantom: Default::default(), msg, stream: enc .stream .map(AttachedStream::Stream) .unwrap_or(AttachedStream::None), order_tag: enc.order_tag, }) } } impl fmt::Debug for Resp where M: Message, ::Response: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { write!(f, "Resp[{:?}", self.msg)?; match &self.stream { AttachedStream::None => write!(f, "]"), AttachedStream::Fixed(b) => write!(f, "; stream=buf:{}]", b.len()), AttachedStream::Stream(_) => write!(f, "; stream]"), } } } // ---- pub(crate) enum AttachedStream { None, Fixed(Bytes), Stream(ByteStream), } impl AttachedStream { pub fn into_stream(self) -> Option { match self { AttachedStream::None => None, AttachedStream::Fixed(b) => Some(Box::pin(futures::stream::once(async move { Ok(b) }))), AttachedStream::Stream(s) => Some(s), } } } // ---- ---- /// Encoding for requests into a ByteStream: /// - priority: u8 /// - path length: u8 /// - path: [u8; path length] /// - telemetry id length: u8 /// - telemetry id: [u8; telemetry id length] /// - msg len: u32 /// - msg [u8; ..] /// - the attached stream as the rest of the encoded stream pub(crate) struct ReqEnc { pub(crate) prio: RequestPriority, pub(crate) path: Bytes, pub(crate) telemetry_id: Bytes, pub(crate) msg: Bytes, pub(crate) stream: Option, pub(crate) order_tag: Option, } impl ReqEnc { pub(crate) fn encode(self) -> (ByteStream, Option) { let mut buf = BytesMut::with_capacity( self.path.len() + self.telemetry_id.len() + self.msg.len() + 16, ); buf.put_u8(self.prio); buf.put_u8(self.path.len() as u8); buf.put(self.path); buf.put_u8(self.telemetry_id.len() as u8); buf.put(&self.telemetry_id[..]); buf.put_u32(self.msg.len() as u32); let header = buf.freeze(); let res_stream: ByteStream = if let Some(stream) = self.stream { Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)]).chain(stream)) } else { Box::pin(futures::stream::iter([Ok(header), Ok(self.msg)])) }; (res_stream, self.order_tag) } pub(crate) async fn decode(stream: ByteStream) -> Result { Self::decode_aux(stream) .await .map_err(read_exact_error_to_error) } async fn decode_aux(stream: ByteStream) -> Result { let mut reader = ByteStreamReader::new(stream); let prio = reader.read_u8().await?; let path_len = reader.read_u8().await?; let path = reader.read_exact(path_len as usize).await?; let telemetry_id_len = reader.read_u8().await?; let telemetry_id = reader.read_exact(telemetry_id_len as usize).await?; let msg_len = reader.read_u32().await?; let msg = reader.read_exact(msg_len as usize).await?; Ok(Self { prio, path, telemetry_id, msg, stream: Some(reader.into_stream()), order_tag: None, }) } } /// Encoding for responses into a ByteStream: /// IF SUCCESS: /// - 0: u8 /// - msg len: u32 /// - msg [u8; ..] /// - the attached stream as the rest of the encoded stream /// IF ERROR: /// - message length + 1: u8 /// - error code: u8 /// - message: [u8; message_length] pub(crate) struct RespEnc { msg: Bytes, stream: Option, order_tag: Option, } impl RespEnc { pub(crate) fn encode(resp: Result) -> (ByteStream, Option) { match resp { Ok(Self { msg, stream, order_tag, }) => { let mut buf = BytesMut::with_capacity(4); buf.put_u32(msg.len() as u32); let header = buf.freeze(); let res_stream: ByteStream = if let Some(stream) = stream { Box::pin(futures::stream::iter([Ok(header), Ok(msg)]).chain(stream)) } else { Box::pin(futures::stream::iter([Ok(header), Ok(msg)])) }; (res_stream, order_tag) } Err(err) => { let err = std::io::Error::new( std::io::ErrorKind::Other, format!("netapp error: {}", err), ); ( Box::pin(futures::stream::once(async move { Err(err) })), None, ) } } } pub(crate) async fn decode(stream: ByteStream) -> Result { Self::decode_aux(stream) .await .map_err(read_exact_error_to_error) } async fn decode_aux(stream: ByteStream) -> Result { let mut reader = ByteStreamReader::new(stream); let msg_len = reader.read_u32().await?; let msg = reader.read_exact(msg_len as usize).await?; // Check whether the response stream still has data or not. // If no more data is coming, this will defuse the request canceller. // If we didn't do this, and the client doesn't try to read from the stream, // the request canceller doesn't know that we read everything and // sends a cancellation message to the server (which they don't care about). reader.fill_buffer().await; Ok(Self { msg, stream: Some(reader.into_stream()), order_tag: None, }) } } fn read_exact_error_to_error(e: ReadExactError) -> Error { match e { ReadExactError::Stream(err) => Error::Remote(err.kind(), err.to_string()), ReadExactError::UnexpectedEos => Error::Framing, } }