remove async_trait for traits declared in garage_net
All checks were successful
ci/woodpecker/pr/debug Pipeline was successful
ci/woodpecker/push/debug Pipeline was successful

This commit is contained in:
Alex 2025-02-05 20:22:16 +01:00
parent 47e87c8739
commit 620dc58560
15 changed files with 35 additions and 54 deletions

View file

@ -4,7 +4,6 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use arc_swap::{ArcSwap, ArcSwapOption}; use arc_swap::{ArcSwap, ArcSwapOption};
use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use rand::prelude::*; use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -688,7 +687,6 @@ impl BlockManager {
} }
} }
#[async_trait]
impl StreamingEndpointHandler<BlockRpc> for BlockManager { impl StreamingEndpointHandler<BlockRpc> for BlockManager {
async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> { async fn handle(self: &Arc<Self>, mut message: Req<BlockRpc>, _from: NodeID) -> Resp<BlockRpc> {
match message.msg() { match message.msg() {

View file

@ -4,9 +4,11 @@ mod key;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt::Write; use std::fmt::Write;
use std::future::Future;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use futures::future::FutureExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use format_table::format_table_to_string; use format_table::format_table_to_string;
@ -505,22 +507,25 @@ impl AdminRpcHandler {
} }
} }
#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler { impl EndpointHandler<AdminRpc> for AdminRpcHandler {
async fn handle( fn handle(
self: &Arc<Self>, self: &Arc<Self>,
message: &AdminRpc, message: &AdminRpc,
_from: NodeID, _from: NodeID,
) -> Result<AdminRpc, Error> { ) -> impl Future<Output = Result<AdminRpc, Error>> + Send {
match message { let self2 = self.clone();
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, async move {
AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, match message {
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()), AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
} }
.boxed()
} }
} }

View file

@ -10,7 +10,6 @@ use std::convert::TryInto;
use std::sync::{Arc, Mutex, MutexGuard}; use std::sync::{Arc, Mutex, MutexGuard};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::stream::FuturesUnordered; use futures::stream::FuturesUnordered;
use futures::StreamExt; use futures::StreamExt;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -537,7 +536,6 @@ impl K2VRpcHandler {
} }
} }
#[async_trait]
impl EndpointHandler<K2VRpc> for K2VRpcHandler { impl EndpointHandler<K2VRpc> for K2VRpcHandler {
async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> { async fn handle(self: &Arc<Self>, message: &K2VRpc, _from: NodeID) -> Result<K2VRpc, Error> {
match message { match message {

View file

@ -22,6 +22,7 @@ tokio.workspace = true
tokio-util.workspace = true tokio-util.workspace = true
tokio-stream.workspace = true tokio-stream.workspace = true
async-trait.workspace = true
serde.workspace = true serde.workspace = true
rmp-serde.workspace = true rmp-serde.workspace = true
hex.workspace = true hex.workspace = true
@ -30,7 +31,6 @@ rand.workspace = true
log.workspace = true log.workspace = true
arc-swap.workspace = true arc-swap.workspace = true
async-trait.workspace = true
err-derive.workspace = true err-derive.workspace = true
bytes.workspace = true bytes.workspace = true
cfg-if.workspace = true cfg-if.workspace = true

View file

@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex};
use std::task::Poll; use std::task::Poll;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use log::{debug, error, trace}; use log::{debug, error, trace};
@ -220,7 +219,6 @@ impl ClientConn {
impl SendLoop for ClientConn {} impl SendLoop for ClientConn {}
#[async_trait]
impl RecvLoop for ClientConn { impl RecvLoop for ClientConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
trace!("ClientConn recv_handler {}", id); trace!("ClientConn recv_handler {}", id);

View file

@ -1,8 +1,9 @@
use std::future::Future;
use std::marker::PhantomData; use std::marker::PhantomData;
use std::sync::Arc; use std::sync::Arc;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use futures::future::{BoxFuture, FutureExt};
use crate::error::Error; use crate::error::Error;
use crate::message::*; use crate::message::*;
@ -14,19 +15,17 @@ use crate::netapp::*;
/// attached to the response.. /// attached to the response..
/// ///
/// The handler object should be in an Arc, see `Endpoint::set_handler` /// The handler object should be in an Arc, see `Endpoint::set_handler`
#[async_trait]
pub trait StreamingEndpointHandler<M>: Send + Sync pub trait StreamingEndpointHandler<M>: Send + Sync
where where
M: Message, M: Message,
{ {
async fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> Resp<M>; fn handle(self: &Arc<Self>, m: Req<M>, from: NodeID) -> impl Future<Output = Resp<M>> + Send;
} }
/// If one simply wants to use an endpoint in a client fashion, /// If one simply wants to use an endpoint in a client fashion,
/// without locally serving requests to that endpoint, /// without locally serving requests to that endpoint,
/// use the unit type `()` as the handler type: /// use the unit type `()` as the handler type:
/// it will panic if it is ever made to handle request. /// it will panic if it is ever made to handle request.
#[async_trait]
impl<M: Message> EndpointHandler<M> for () { impl<M: Message> EndpointHandler<M> for () {
async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response {
panic!("This endpoint should not have a local handler."); panic!("This endpoint should not have a local handler.");
@ -38,15 +37,13 @@ impl<M: Message> EndpointHandler<M> for () {
/// This trait should be implemented by an object of your application /// This trait should be implemented by an object of your application
/// that can handle a message of type `M`, in the cases where it doesn't /// that can handle a message of type `M`, in the cases where it doesn't
/// care about attached stream in the request nor in the response. /// care about attached stream in the request nor in the response.
#[async_trait]
pub trait EndpointHandler<M>: Send + Sync pub trait EndpointHandler<M>: Send + Sync
where where
M: Message, M: Message,
{ {
async fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> M::Response; fn handle(self: &Arc<Self>, m: &M, from: NodeID) -> impl Future<Output = M::Response> + Send;
} }
#[async_trait]
impl<T, M> StreamingEndpointHandler<M> for T impl<T, M> StreamingEndpointHandler<M> for T
where where
T: EndpointHandler<M>, T: EndpointHandler<M>,
@ -161,9 +158,8 @@ where
pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>; pub(crate) type DynEndpoint = Box<dyn GenericEndpoint + Send + Sync>;
#[async_trait]
pub(crate) trait GenericEndpoint { pub(crate) trait GenericEndpoint {
async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error>; fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>>;
fn drop_handler(&self); fn drop_handler(&self);
fn clone_endpoint(&self) -> DynEndpoint; fn clone_endpoint(&self) -> DynEndpoint;
} }
@ -174,21 +170,23 @@ where
M: Message, M: Message,
H: StreamingEndpointHandler<M>; H: StreamingEndpointHandler<M>;
#[async_trait]
impl<M, H> GenericEndpoint for EndpointArc<M, H> impl<M, H> GenericEndpoint for EndpointArc<M, H>
where where
M: Message, M: Message,
H: StreamingEndpointHandler<M> + 'static, H: StreamingEndpointHandler<M> + 'static,
{ {
async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result<RespEnc, Error> { fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture<Result<RespEnc, Error>> {
match self.0.handler.load_full() { async move {
None => Err(Error::NoHandler), match self.0.handler.load_full() {
Some(h) => { None => Err(Error::NoHandler),
let req = Req::from_enc(req_enc)?; Some(h) => {
let res = h.handle(req, from).await; let req = Req::from_enc(req_enc)?;
Ok(res.into_enc()?) let res = h.handle(req, from).await;
Ok(res.into_enc()?)
}
} }
} }
.boxed()
} }
fn drop_handler(&self) { fn drop_handler(&self) {

View file

@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::auth; use sodiumoxide::crypto::auth;
@ -457,7 +456,6 @@ impl NetApp {
} }
} }
#[async_trait]
impl EndpointHandler<HelloMessage> for NetApp { impl EndpointHandler<HelloMessage> for NetApp {
async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) { async fn handle(self: &Arc<Self>, msg: &HelloMessage, from: NodeID) {
debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg);

View file

@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
use async_trait::async_trait;
use log::{debug, info, trace, warn}; use log::{debug, info, trace, warn};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@ -592,7 +591,6 @@ impl PeeringManager {
} }
} }
#[async_trait]
impl EndpointHandler<PingMessage> for PeeringManager { impl EndpointHandler<PingMessage> for PeeringManager {
async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage { async fn handle(self: &Arc<Self>, ping: &PingMessage, from: NodeID) -> PingMessage {
let ping_resp = PingMessage { let ping_resp = PingMessage {
@ -604,7 +602,6 @@ impl EndpointHandler<PingMessage> for PeeringManager {
} }
} }
#[async_trait]
impl EndpointHandler<PeerListMessage> for PeeringManager { impl EndpointHandler<PeerListMessage> for PeeringManager {
async fn handle( async fn handle(
self: &Arc<Self>, self: &Arc<Self>,

View file

@ -1,7 +1,6 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use log::*; use log::*;
@ -50,7 +49,6 @@ impl Drop for Sender {
/// according to the protocol defined above: chunks of message in progress of being /// according to the protocol defined above: chunks of message in progress of being
/// received are stored in a buffer, and when the last chunk of a message is received, /// received are stored in a buffer, and when the last chunk of a message is received,
/// the full message is passed to the receive handler. /// the full message is passed to the receive handler.
#[async_trait]
pub(crate) trait RecvLoop: Sync + 'static { pub(crate) trait RecvLoop: Sync + 'static {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream); fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream);
fn cancel_handler(self: &Arc<Self>, _id: RequestID) {} fn cancel_handler(self: &Arc<Self>, _id: RequestID) {}

View file

@ -3,7 +3,6 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::{BufMut, Bytes, BytesMut}; use bytes::{BufMut, Bytes, BytesMut};
use log::*; use log::*;
@ -273,7 +272,6 @@ impl DataFrame {
/// ///
/// The `.send_loop()` exits when the sending end of the channel is closed, /// The `.send_loop()` exits when the sending end of the channel is closed,
/// or if there is an error at any time writing to the async writer. /// or if there is an error at any time writing to the async writer.
#[async_trait]
pub(crate) trait SendLoop: Sync { pub(crate) trait SendLoop: Sync {
async fn send_loop<W>( async fn send_loop<W>(
self: Arc<Self>, self: Arc<Self>,

View file

@ -3,7 +3,6 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use log::*; use log::*;
use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::io::{AsyncReadExt, AsyncWriteExt};
@ -174,7 +173,6 @@ impl ServerConn {
impl SendLoop for ServerConn {} impl SendLoop for ServerConn {}
#[async_trait]
impl RecvLoop for ServerConn { impl RecvLoop for ServerConn {
fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) { fn recv_handler(self: &Arc<Self>, id: RequestID, stream: ByteStream) {
let resp_send = match self.resp_send.load_full() { let resp_send = match self.resp_send.load_full() {

View file

@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures::join; use futures::join;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519; use sodiumoxide::crypto::sign::ed25519;
@ -749,7 +748,6 @@ impl System {
} }
} }
#[async_trait]
impl EndpointHandler<SystemRpc> for System { impl EndpointHandler<SystemRpc> for System {
async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> { async fn handle(self: &Arc<Self>, msg: &SystemRpc, from: NodeID) -> Result<SystemRpc, Error> {
match msg { match msg {

View file

@ -4,6 +4,7 @@ use std::sync::Arc;
use std::time::Duration; use std::time::Duration;
use async_trait::async_trait; use async_trait::async_trait;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
@ -272,7 +273,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
} }
} }
#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> { impl<F: TableSchema, R: TableReplication> EndpointHandler<GcRpc> for TableGc<F, R> {
async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> { async fn handle(self: &Arc<Self>, message: &GcRpc, _from: NodeID) -> Result<GcRpc, Error> {
match message { match message {

View file

@ -444,7 +444,6 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
// ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ======
#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> { impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSyncer<F, R> {
async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> { async fn handle(self: &Arc<Self>, message: &SyncRpc, from: NodeID) -> Result<SyncRpc, Error> {
match message { match message {

View file

@ -2,7 +2,6 @@ use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait;
use futures::stream::*; use futures::stream::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
@ -500,7 +499,6 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
} }
} }
#[async_trait]
impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> { impl<F: TableSchema, R: TableReplication> EndpointHandler<TableRpc<F>> for Table<F, R> {
async fn handle( async fn handle(
self: &Arc<Self>, self: &Arc<Self>,