forked from Deuxfleurs/garage
Merge pull request 'remove uses of #[async_trait]' (#952) from remove-async-trait into main
Reviewed-on: Deuxfleurs/garage#952
This commit is contained in:
commit
d3226bfa91
24 changed files with 40 additions and 84 deletions
4
Cargo.lock
generated
4
Cargo.lock
generated
|
@ -1301,7 +1301,6 @@ dependencies = [
|
||||||
name = "garage_api_common"
|
name = "garage_api_common"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
"crypto-common",
|
"crypto-common",
|
||||||
|
@ -1332,7 +1331,6 @@ dependencies = [
|
||||||
name = "garage_api_k2v"
|
name = "garage_api_k2v"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
|
||||||
"base64 0.21.7",
|
"base64 0.21.7",
|
||||||
"err-derive",
|
"err-derive",
|
||||||
"futures",
|
"futures",
|
||||||
|
@ -1358,7 +1356,6 @@ version = "1.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"aes-gcm",
|
"aes-gcm",
|
||||||
"async-compression",
|
"async-compression",
|
||||||
"async-trait",
|
|
||||||
"base64 0.21.7",
|
"base64 0.21.7",
|
||||||
"bytes",
|
"bytes",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
@ -1468,7 +1465,6 @@ name = "garage_net"
|
||||||
version = "1.0.1"
|
version = "1.0.1"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
|
||||||
"bytes",
|
"bytes",
|
||||||
"cfg-if",
|
"cfg-if",
|
||||||
"err-derive",
|
"err-derive",
|
||||||
|
|
|
@ -2,7 +2,6 @@ use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use argon2::password_hash::PasswordHash;
|
use argon2::password_hash::PasswordHash;
|
||||||
use async_trait::async_trait;
|
|
||||||
|
|
||||||
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
|
use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW};
|
||||||
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
|
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
|
||||||
|
@ -221,7 +220,6 @@ impl AdminApiServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ApiHandler for AdminApiServer {
|
impl ApiHandler for AdminApiServer {
|
||||||
const API_NAME: &'static str = "admin";
|
const API_NAME: &'static str = "admin";
|
||||||
const API_NAME_DISPLAY: &'static str = "Admin";
|
const API_NAME_DISPLAY: &'static str = "Admin";
|
||||||
|
|
|
@ -18,7 +18,6 @@ garage_model.workspace = true
|
||||||
garage_table.workspace = true
|
garage_table.workspace = true
|
||||||
garage_util.workspace = true
|
garage_util.workspace = true
|
||||||
|
|
||||||
async-trait.workspace = true
|
|
||||||
bytes.workspace = true
|
bytes.workspace = true
|
||||||
chrono.workspace = true
|
chrono.workspace = true
|
||||||
crypto-common.workspace = true
|
crypto-common.workspace = true
|
||||||
|
|
|
@ -4,8 +4,6 @@ use std::os::unix::fs::PermissionsExt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
|
|
||||||
use futures::future::Future;
|
use futures::future::Future;
|
||||||
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
|
use futures::stream::{futures_unordered::FuturesUnordered, StreamExt};
|
||||||
|
|
||||||
|
@ -47,7 +45,6 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static {
|
||||||
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
|
fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait ApiHandler: Send + Sync + 'static {
|
pub trait ApiHandler: Send + Sync + 'static {
|
||||||
const API_NAME: &'static str;
|
const API_NAME: &'static str;
|
||||||
const API_NAME_DISPLAY: &'static str;
|
const API_NAME_DISPLAY: &'static str;
|
||||||
|
@ -56,11 +53,11 @@ pub trait ApiHandler: Send + Sync + 'static {
|
||||||
type Error: ApiError;
|
type Error: ApiError;
|
||||||
|
|
||||||
fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>;
|
fn parse_endpoint(&self, r: &Request<IncomingBody>) -> Result<Self::Endpoint, Self::Error>;
|
||||||
async fn handle(
|
fn handle(
|
||||||
&self,
|
&self,
|
||||||
req: Request<IncomingBody>,
|
req: Request<IncomingBody>,
|
||||||
endpoint: Self::Endpoint,
|
endpoint: Self::Endpoint,
|
||||||
) -> Result<Response<BoxBody<Self::Error>>, Self::Error>;
|
) -> impl Future<Output = Result<Response<BoxBody<Self::Error>>, Self::Error>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ApiServer<A: ApiHandler> {
|
pub struct ApiServer<A: ApiHandler> {
|
||||||
|
@ -248,13 +245,11 @@ impl<A: ApiHandler> ApiServer<A> {
|
||||||
|
|
||||||
// ==== helper functions ====
|
// ==== helper functions ====
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait Accept: Send + Sync + 'static {
|
pub trait Accept: Send + Sync + 'static {
|
||||||
type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
|
type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static;
|
||||||
async fn accept(&self) -> std::io::Result<(Self::Stream, String)>;
|
fn accept(&self) -> impl Future<Output = std::io::Result<(Self::Stream, String)>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Accept for TcpListener {
|
impl Accept for TcpListener {
|
||||||
type Stream = TcpStream;
|
type Stream = TcpStream;
|
||||||
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
|
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
|
||||||
|
@ -266,7 +261,6 @@ impl Accept for TcpListener {
|
||||||
|
|
||||||
pub struct UnixListenerOn(pub UnixListener, pub String);
|
pub struct UnixListenerOn(pub UnixListener, pub String);
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Accept for UnixListenerOn {
|
impl Accept for UnixListenerOn {
|
||||||
type Stream = UnixStream;
|
type Stream = UnixStream;
|
||||||
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
|
async fn accept(&self) -> std::io::Result<(Self::Stream, String)> {
|
||||||
|
|
|
@ -19,7 +19,6 @@ garage_table.workspace = true
|
||||||
garage_util = { workspace = true, features = [ "k2v" ] }
|
garage_util = { workspace = true, features = [ "k2v" ] }
|
||||||
garage_api_common.workspace = true
|
garage_api_common.workspace = true
|
||||||
|
|
||||||
async-trait.workspace = true
|
|
||||||
base64.workspace = true
|
base64.workspace = true
|
||||||
err-derive.workspace = true
|
err-derive.workspace = true
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
|
|
||||||
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
|
use hyper::{body::Incoming as IncomingBody, Method, Request, Response};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
@ -48,7 +46,6 @@ impl K2VApiServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ApiHandler for K2VApiServer {
|
impl ApiHandler for K2VApiServer {
|
||||||
const API_NAME: &'static str = "k2v";
|
const API_NAME: &'static str = "k2v";
|
||||||
const API_NAME_DISPLAY: &'static str = "K2V";
|
const API_NAME_DISPLAY: &'static str = "K2V";
|
||||||
|
|
|
@ -24,7 +24,6 @@ garage_api_common.workspace = true
|
||||||
|
|
||||||
aes-gcm.workspace = true
|
aes-gcm.workspace = true
|
||||||
async-compression.workspace = true
|
async-compression.workspace = true
|
||||||
async-trait.workspace = true
|
|
||||||
base64.workspace = true
|
base64.workspace = true
|
||||||
bytes.workspace = true
|
bytes.workspace = true
|
||||||
chrono.workspace = true
|
chrono.workspace = true
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
|
|
||||||
use hyper::header;
|
use hyper::header;
|
||||||
use hyper::{body::Incoming as IncomingBody, Request, Response};
|
use hyper::{body::Incoming as IncomingBody, Request, Response};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
@ -70,7 +68,6 @@ impl S3ApiServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ApiHandler for S3ApiServer {
|
impl ApiHandler for S3ApiServer {
|
||||||
const API_NAME: &'static str = "s3";
|
const API_NAME: &'static str = "s3";
|
||||||
const API_NAME_DISPLAY: &'static str = "S3";
|
const API_NAME_DISPLAY: &'static str = "S3";
|
||||||
|
|
|
@ -4,7 +4,6 @@ use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use arc_swap::{ArcSwap, ArcSwapOption};
|
use arc_swap::{ArcSwap, ArcSwapOption};
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -688,7 +687,6 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl StreamingEndpointHandler<BlockRpc> for BlockManager {
|
impl StreamingEndpointHandler<BlockRpc> for BlockManager {
|
||||||
async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
|
async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
|
||||||
match message.msg() {
|
match message.msg() {
|
||||||
|
|
|
@ -4,9 +4,11 @@ mod key;
|
||||||
|
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::fmt::Write;
|
use std::fmt::Write;
|
||||||
|
use std::future::Future;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use futures::future::FutureExt;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use format_table::format_table_to_string;
|
use format_table::format_table_to_string;
|
||||||
|
@ -505,22 +507,25 @@ impl AdminRpcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
||||||
async fn handle(
|
fn handle(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
message: &AdminRpc,
|
message: &AdminRpc,
|
||||||
_from: NodeID,
|
_from: NodeID,
|
||||||
) -> Result<AdminRpc, Error> {
|
) -> impl Future<Output = Result<AdminRpc, Error>> + Send {
|
||||||
match message {
|
let self2 = self.clone();
|
||||||
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
|
async move {
|
||||||
AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
|
match message {
|
||||||
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
|
AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
|
||||||
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
|
AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
|
||||||
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
|
AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await,
|
||||||
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await,
|
||||||
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await,
|
||||||
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await,
|
||||||
|
AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await,
|
||||||
|
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::future::Future;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -93,17 +94,16 @@ pub async fn launch_online_repair(
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
trait TableRepair: Send + Sync + 'static {
|
trait TableRepair: Send + Sync + 'static {
|
||||||
type T: TableSchema;
|
type T: TableSchema;
|
||||||
|
|
||||||
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
|
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
|
||||||
|
|
||||||
async fn process(
|
fn process(
|
||||||
&mut self,
|
&mut self,
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
entry: <<Self as TableRepair>::T as TableSchema>::E,
|
entry: <<Self as TableRepair>::T as TableSchema>::E,
|
||||||
) -> Result<bool, Error>;
|
) -> impl Future<Output = Result<bool, Error>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
struct TableRepairWorker<T: TableRepair> {
|
struct TableRepairWorker<T: TableRepair> {
|
||||||
|
@ -174,7 +174,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
|
||||||
|
|
||||||
struct RepairVersions;
|
struct RepairVersions;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl TableRepair for RepairVersions {
|
impl TableRepair for RepairVersions {
|
||||||
type T = VersionTable;
|
type T = VersionTable;
|
||||||
|
|
||||||
|
@ -221,7 +220,6 @@ impl TableRepair for RepairVersions {
|
||||||
|
|
||||||
struct RepairBlockRefs;
|
struct RepairBlockRefs;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl TableRepair for RepairBlockRefs {
|
impl TableRepair for RepairBlockRefs {
|
||||||
type T = BlockRefTable;
|
type T = BlockRefTable;
|
||||||
|
|
||||||
|
@ -257,7 +255,6 @@ impl TableRepair for RepairBlockRefs {
|
||||||
|
|
||||||
struct RepairMpu;
|
struct RepairMpu;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl TableRepair for RepairMpu {
|
impl TableRepair for RepairMpu {
|
||||||
type T = MultipartUploadTable;
|
type T = MultipartUploadTable;
|
||||||
|
|
||||||
|
|
|
@ -10,7 +10,6 @@ use std::convert::TryInto;
|
||||||
use std::sync::{Arc, Mutex, MutexGuard};
|
use std::sync::{Arc, Mutex, MutexGuard};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use futures::stream::FuturesUnordered;
|
use futures::stream::FuturesUnordered;
|
||||||
use futures::StreamExt;
|
use futures::StreamExt;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -537,7 +536,6 @@ impl K2VRpcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
|
impl EndpointHandler<K2VRpc> for K2VRpcHandler {
|
||||||
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
|
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
|
||||||
match message {
|
match message {
|
||||||
|
|
|
@ -30,7 +30,6 @@ rand.workspace = true
|
||||||
|
|
||||||
log.workspace = true
|
log.workspace = true
|
||||||
arc-swap.workspace = true
|
arc-swap.workspace = true
|
||||||
async-trait.workspace = true
|
|
||||||
err-derive.workspace = true
|
err-derive.workspace = true
|
||||||
bytes.workspace = true
|
bytes.workspace = true
|
||||||
cfg-if.workspace = true
|
cfg-if.workspace = true
|
||||||
|
|
|
@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex};
|
||||||
use std::task::Poll;
|
use std::task::Poll;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use log::{debug, error, trace};
|
use log::{debug, error, trace};
|
||||||
|
|
||||||
|
@ -220,7 +219,6 @@ impl ClientConn {
|
||||||
|
|
||||||
impl SendLoop for ClientConn {}
|
impl SendLoop for ClientConn {}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl RecvLoop for ClientConn {
|
impl RecvLoop for ClientConn {
|
||||||
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
|
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
|
||||||
trace!("ClientConn recv_handler {}", id);
|
trace!("ClientConn recv_handler {}", id);
|
||||||
|
|
|
@ -1,8 +1,9 @@
|
||||||
|
use std::future::Future;
|
||||||
use std::marker::PhantomData;
|
use std::marker::PhantomData;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use async_trait::async_trait;
|
use futures::future::{BoxFuture, FutureExt};
|
||||||
|
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::message::*;
|
use crate::message::*;
|
||||||
|
@ -14,19 +15,17 @@ use crate::netapp::*;
|
||||||
/// attached to the response..
|
/// attached to the response..
|
||||||
///
|
///
|
||||||
/// The handler object should be in an Arc, see `Endpoint::set_handler`
|
/// The handler object should be in an Arc, see `Endpoint::set_handler`
|
||||||
#[async_trait]
|
|
||||||
pub trait StreamingEndpointHandler<M>: Send + Sync
|
pub trait StreamingEndpointHandler<M>: Send + Sync
|
||||||
where
|
where
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>;
|
fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
/// If one simply wants to use an endpoint in a client fashion,
|
/// If one simply wants to use an endpoint in a client fashion,
|
||||||
/// without locally serving requests to that endpoint,
|
/// without locally serving requests to that endpoint,
|
||||||
/// use the unit type `()` as the handler type:
|
/// use the unit type `()` as the handler type:
|
||||||
/// it will panic if it is ever made to handle request.
|
/// it will panic if it is ever made to handle request.
|
||||||
#[async_trait]
|
|
||||||
impl<M: Message> EndpointHandler<M> for () {
|
impl<M: Message> EndpointHandler<M> for () {
|
||||||
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
|
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
|
||||||
panic!("This endpoint should not have a local handler.");
|
panic!("This endpoint should not have a local handler.");
|
||||||
|
@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () {
|
||||||
/// This trait should be implemented by an object of your application
|
/// This trait should be implemented by an object of your application
|
||||||
/// that can handle a message of type `M`, in the cases where it doesn't
|
/// that can handle a message of type `M`, in the cases where it doesn't
|
||||||
/// care about attached stream in the request nor in the response.
|
/// care about attached stream in the request nor in the response.
|
||||||
#[async_trait]
|
|
||||||
pub trait EndpointHandler<M>: Send + Sync
|
pub trait EndpointHandler<M>: Send + Sync
|
||||||
where
|
where
|
||||||
M: Message,
|
M: Message,
|
||||||
{
|
{
|
||||||
async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response;
|
fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T, M> StreamingEndpointHandler<M> for T
|
impl<T, M> StreamingEndpointHandler<M> for T
|
||||||
where
|
where
|
||||||
T: EndpointHandler<M>,
|
T: EndpointHandler<M>,
|
||||||
|
@ -161,9 +158,8 @@ where
|
||||||
|
|
||||||
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
|
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub(crate) trait GenericEndpoint {
|
pub(crate) trait GenericEndpoint {
|
||||||
async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>;
|
fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>;
|
||||||
fn drop_handler(&self);
|
fn drop_handler(&self);
|
||||||
fn clone_endpoint(&self) -> DynEndpoint;
|
fn clone_endpoint(&self) -> DynEndpoint;
|
||||||
}
|
}
|
||||||
|
@ -174,21 +170,23 @@ where
|
||||||
M: Message,
|
M: Message,
|
||||||
H: StreamingEndpointHandler<M>;
|
H: StreamingEndpointHandler<M>;
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<M, H> GenericEndpoint for EndpointArc<M, H>
|
impl<M, H> GenericEndpoint for EndpointArc<M, H>
|
||||||
where
|
where
|
||||||
M: Message,
|
M: Message,
|
||||||
H: StreamingEndpointHandler<M> + 'static,
|
H: StreamingEndpointHandler<M> + 'static,
|
||||||
{
|
{
|
||||||
async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> {
|
fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> {
|
||||||
match self.0.handler.load_full() {
|
async move {
|
||||||
None => Err(Error::NoHandler),
|
match self.0.handler.load_full() {
|
||||||
Some(h) => {
|
None => Err(Error::NoHandler),
|
||||||
let req = Req::from_enc(req_enc)?;
|
Some(h) => {
|
||||||
let res = h.handle(req, from).await;
|
let req = Req::from_enc(req_enc)?;
|
||||||
Ok(res.into_enc()?)
|
let res = h.handle(req, from).await;
|
||||||
|
Ok(res.into_enc()?)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn drop_handler(&self) {
|
fn drop_handler(&self) {
|
||||||
|
|
|
@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use async_trait::async_trait;
|
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sodiumoxide::crypto::auth;
|
use sodiumoxide::crypto::auth;
|
||||||
|
@ -457,7 +456,6 @@ impl NetApp {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl EndpointHandler<HelloMessage> for NetApp {
|
impl EndpointHandler<HelloMessage> for NetApp {
|
||||||
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
|
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
|
||||||
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);
|
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);
|
||||||
|
|
|
@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
use async_trait::async_trait;
|
|
||||||
use log::{debug, info, trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
@ -592,7 +591,6 @@ impl PeeringManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl EndpointHandler<PingMessage> for PeeringManager {
|
impl EndpointHandler<PingMessage> for PeeringManager {
|
||||||
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
|
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
|
||||||
let ping_resp = PingMessage {
|
let ping_resp = PingMessage {
|
||||||
|
@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl EndpointHandler<PeerListMessage> for PeeringManager {
|
impl EndpointHandler<PeerListMessage> for PeeringManager {
|
||||||
async fn handle(
|
async fn handle(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
|
@ -50,7 +49,6 @@ impl Drop for Sender {
|
||||||
/// according to the protocol defined above: chunks of message in progress of being
|
/// according to the protocol defined above: chunks of message in progress of being
|
||||||
/// received are stored in a buffer, and when the last chunk of a message is received,
|
/// received are stored in a buffer, and when the last chunk of a message is received,
|
||||||
/// the full message is passed to the receive handler.
|
/// the full message is passed to the receive handler.
|
||||||
#[async_trait]
|
|
||||||
pub(crate) trait RecvLoop: Sync + 'static {
|
pub(crate) trait RecvLoop: Sync + 'static {
|
||||||
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
|
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
|
||||||
fn cancel_handler(self: &Arc<Self>, _id: RequestID) {}
|
fn cancel_handler(self: &Arc<Self>, _id: RequestID) {}
|
||||||
|
|
|
@ -3,7 +3,6 @@ use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
|
@ -273,7 +272,6 @@ impl DataFrame {
|
||||||
///
|
///
|
||||||
/// The `.send_loop()` exits when the sending end of the channel is closed,
|
/// The `.send_loop()` exits when the sending end of the channel is closed,
|
||||||
/// or if there is an error at any time writing to the async writer.
|
/// or if there is an error at any time writing to the async writer.
|
||||||
#[async_trait]
|
|
||||||
pub(crate) trait SendLoop: Sync {
|
pub(crate) trait SendLoop: Sync {
|
||||||
async fn send_loop<W>(
|
async fn send_loop<W>(
|
||||||
self: Arc<Self>,
|
self: Arc<Self>,
|
||||||
|
|
|
@ -3,7 +3,6 @@ use std::net::SocketAddr;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::{Arc, Mutex};
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use async_trait::async_trait;
|
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
use futures::io::{AsyncReadExt, AsyncWriteExt};
|
use futures::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
|
@ -174,7 +173,6 @@ impl ServerConn {
|
||||||
|
|
||||||
impl SendLoop for ServerConn {}
|
impl SendLoop for ServerConn {}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl RecvLoop for ServerConn {
|
impl RecvLoop for ServerConn {
|
||||||
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
|
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
|
||||||
let resp_send = match self.resp_send.load_full() {
|
let resp_send = match self.resp_send.load_full() {
|
||||||
|
|
|
@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use async_trait::async_trait;
|
|
||||||
use futures::join;
|
use futures::join;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
|
@ -749,7 +748,6 @@ impl System {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl EndpointHandler<SystemRpc> for System {
|
impl EndpointHandler<SystemRpc> for System {
|
||||||
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
|
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
|
||||||
match msg {
|
match msg {
|
||||||
|
|
|
@ -4,6 +4,7 @@ use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
|
|
||||||
|
@ -272,7 +273,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
|
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
|
||||||
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
|
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
|
||||||
match message {
|
match message {
|
||||||
|
|
|
@ -444,7 +444,6 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
||||||
|
|
||||||
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
|
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
|
impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
|
||||||
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
|
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
|
||||||
match message {
|
match message {
|
||||||
|
|
|
@ -2,7 +2,6 @@ use std::borrow::Borrow;
|
||||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use futures::stream::*;
|
use futures::stream::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
|
@ -500,7 +499,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
|
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
|
||||||
async fn handle(
|
async fn handle(
|
||||||
self: &Arc<Self>,
|
self: &Arc<Self>,
|
||||||
|
|
Loading…
Add table
Reference in a new issue