netapp/src/endpoint.rs

202 lines
5.2 KiB
Rust
Raw Normal View History

2021-10-12 15:59:46 +00:00
use std::marker::PhantomData;
use std::sync::Arc;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use crate::error::Error;
2022-07-21 15:34:53 +00:00
use crate::message::*;
2021-10-12 15:59:46 +00:00
use crate::netapp::*;
2021-10-14 09:35:05 +00:00
/// This trait should be implemented by an object of your application
/// that can handle a message of type `M`, if it wishes to handle
/// streams attached to the request and/or to send back streams
/// attached to the response..
2021-10-14 09:35:05 +00:00
///
/// The handler object should be in an Arc, see `Endpoint::set_handler`
2021-10-12 15:59:46 +00:00
#[async_trait]
pub trait StreamingEndpointHandler<M>: Send + Sync
2021-10-12 15:59:46 +00:00
where
M: Message,
{
2022-07-21 18:22:56 +00:00
async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
2021-10-12 15:59:46 +00:00
}
2021-10-14 09:35:05 +00:00
/// If one simply wants to use an endpoint in a client fashion,
/// without locally serving requests to that endpoint,
/// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle a request.
#[async_trait]
impl<M: Message> EndpointHandler<M> for () {
    /// Never produces a response.
    ///
    /// # Panics
    ///
    /// Always panics: the unit handler stands for "no local handler", so
    /// reaching this method means a request was routed to an endpoint that
    /// was only meant to be used as a client.
    async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
        panic!("This endpoint should not have a local handler.");
    }
}
// ----
2022-07-22 11:27:56 +00:00
/// Stream-less request handler for messages of type `M`.
///
/// Implement this trait on an object of your application when neither the
/// request nor the response needs an attached stream. Any type implementing
/// it also gets `StreamingEndpointHandler<M>` through a blanket
/// implementation, so it can be used directly as an endpoint handler.
#[async_trait]
pub trait EndpointHandler<M: Message>: Send + Sync {
    /// Handles the message `m` that was issued by node `from` and returns
    /// the response message.
    async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
}
#[async_trait]
impl<T, M> StreamingEndpointHandler<M> for T
where
T: EndpointHandler<M>,
M: Message,
{
2022-07-22 11:32:08 +00:00
async fn handle(self: &Arc<Self>, mut m: Req<M>, from: NodeID) -> Resp<M> {
// Immediately drop stream to avoid backpressure if a stream was sent
// (this will make all data sent to the stream be ignored immediately)
drop(m.take_stream());
Resp::new(EndpointHandler::handle(self, m.msg(), from).await)
}
}
// ----
2021-10-14 09:35:05 +00:00
/// This struct represents an endpoint for message of type `M`.
///
/// Creating a new endpoint is done by calling `NetApp::endpoint`.
/// An endpoint is identified primarily by its path, which is specified
/// at creation time.
///
/// An `Endpoint` is used both to send requests to remote nodes,
/// and to specify the handler for such requests on the local node.
/// The type `H` represents the type of the handler object for
/// endpoint messages (see `StreamingEndpointHandler`).
2021-10-12 15:59:46 +00:00
pub struct Endpoint<M, H>
where
M: Message,
H: StreamingEndpointHandler<M>,
2021-10-12 15:59:46 +00:00
{
_phantom: PhantomData<M>,
2021-10-12 15:59:46 +00:00
netapp: Arc<NetApp>,
path: String,
handler: ArcSwapOption<H>,
}
impl<M, H> Endpoint<M, H>
where
M: Message,
H: StreamingEndpointHandler<M>,
2021-10-12 15:59:46 +00:00
{
pub(crate) fn new(netapp: Arc<NetApp>, path: String) -> Self {
Self {
_phantom: PhantomData::default(),
2021-10-12 15:59:46 +00:00
netapp,
path,
handler: ArcSwapOption::from(None),
}
}
2021-10-14 09:35:05 +00:00
2022-02-16 12:00:26 +00:00
/// Get the path of this endpoint
pub fn path(&self) -> &str {
&self.path
}
2021-10-14 09:35:05 +00:00
/// Set the object that is responsible of handling requests to
/// this endpoint on the local node.
2021-10-12 15:59:46 +00:00
pub fn set_handler(&self, h: Arc<H>) {
self.handler.swap(Some(h));
}
2021-10-14 09:35:05 +00:00
/// Call this endpoint on a remote node (or on the local node,
/// for that matter). This function invokes the full version that
/// allows to attach a stream to the request and to
/// receive such a stream attached to the response.
pub async fn call_streaming<T>(
2021-10-12 15:59:46 +00:00
&self,
target: &NodeID,
2022-07-21 18:22:56 +00:00
req: T,
2021-10-12 15:59:46 +00:00
prio: RequestPriority,
2022-07-21 18:22:56 +00:00
) -> Result<Resp<M>, Error>
where
T: IntoReq<M>,
{
2021-10-12 15:59:46 +00:00
if *target == self.netapp.id {
match self.handler.load_full() {
None => Err(Error::NoHandler),
2022-07-21 18:22:56 +00:00
Some(h) => Ok(h.handle(req.into_req_local(), self.netapp.id).await),
2021-10-12 15:59:46 +00:00
}
} else {
let conn = self
.netapp
.client_conns
.read()
.unwrap()
.get(target)
.cloned();
match conn {
None => Err(Error::Message(format!(
"Not connected: {}",
2022-02-21 15:57:07 +00:00
hex::encode(&target[..8])
2021-10-12 15:59:46 +00:00
))),
2022-07-21 18:22:56 +00:00
Some(c) => c.call(req.into_req()?, self.path.as_str(), prio).await,
2021-10-12 15:59:46 +00:00
}
}
}
2022-07-21 18:22:56 +00:00
/// Call this endpoint on a remote node. This function is the simplified
/// version that doesn't allow to have streams attached to the request
/// or the response; see `call_streaming` for the full version.
2022-07-21 18:22:56 +00:00
pub async fn call(
&self,
target: &NodeID,
req: M,
prio: RequestPriority,
) -> Result<<M as Message>::Response, Error> {
Ok(self.call_streaming(target, req, prio).await?.into_msg())
2022-07-21 18:22:56 +00:00
}
2021-10-12 15:59:46 +00:00
}
2021-10-14 09:35:05 +00:00
// ---- Internal stuff ----
/// Boxed, type-erased endpoint: any `GenericEndpoint` implementation that is
/// also `Send + Sync`, with its message and handler types hidden.
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
2021-10-12 15:59:46 +00:00
#[async_trait]
pub(crate) trait GenericEndpoint {
2022-07-22 10:45:38 +00:00
async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>;
fn drop_handler(&self);
2021-10-12 15:59:46 +00:00
fn clone_endpoint(&self) -> DynEndpoint;
}
/// Newtype around a shared `Endpoint<M, H>`; this is the type that
/// implements `GenericEndpoint` for a concrete message/handler pair.
///
/// NOTE(review): `#[derive(Clone)]` generates an impl bounded on `M: Clone`
/// and `H: Clone`; `clone_endpoint` clones the inner `Arc` directly and so
/// does not depend on those bounds.
#[derive(Clone)]
pub(crate) struct EndpointArc<M, H>(pub(crate) Arc<Endpoint<M, H>>)
where
M: Message,
H: StreamingEndpointHandler<M>;
2021-10-12 15:59:46 +00:00
#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H>
where
M: Message,
H: StreamingEndpointHandler<M> + 'static,
2021-10-12 15:59:46 +00:00
{
2022-07-22 10:45:38 +00:00
async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> {
2021-10-12 15:59:46 +00:00
match self.0.handler.load_full() {
None => Err(Error::NoHandler),
Some(h) => {
2022-07-22 10:45:38 +00:00
let req = Req::from_enc(req_enc)?;
let res = h.handle(req, from).await;
2022-07-22 10:45:38 +00:00
Ok(res.into_enc()?)
2021-10-12 15:59:46 +00:00
}
}
}
fn drop_handler(&self) {
2021-10-12 15:59:46 +00:00
self.0.handler.swap(None);
}
fn clone_endpoint(&self) -> DynEndpoint {
Box::new(Self(self.0.clone()))
}
}