From 620dc58560c7e1509ee9e56ce03d15ec502c34c8 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 5 Feb 2025 20:22:16 +0100 Subject: [PATCH 1/3] remove async_trait for traits declared in garage_net --- src/block/manager.rs | 2 -- src/garage/admin/mod.rs | 31 ++++++++++++++++++------------- src/model/k2v/rpc.rs | 2 -- src/net/Cargo.toml | 2 +- src/net/client.rs | 2 -- src/net/endpoint.rs | 32 +++++++++++++++----------------- src/net/netapp.rs | 2 -- src/net/peering.rs | 3 --- src/net/recv.rs | 2 -- src/net/send.rs | 2 -- src/net/server.rs | 2 -- src/rpc/system.rs | 2 -- src/table/gc.rs | 2 +- src/table/sync.rs | 1 - src/table/table.rs | 2 -- 15 files changed, 35 insertions(+), 54 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 40b177a2..537e1fc1 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::{ArcSwap, ArcSwapOption}; -use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; @@ -688,7 +687,6 @@ impl BlockManager { } } -#[async_trait] impl StreamingEndpointHandler for BlockManager { async fn handle(self: &Arc, mut message: Req, _from: NodeID) -> Resp { match message.msg() { diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index e2468143..ea414b56 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -4,9 +4,11 @@ mod key; use std::collections::HashMap; use std::fmt::Write; +use std::future::Future; use std::sync::Arc; -use async_trait::async_trait; +use futures::future::FutureExt; + use serde::{Deserialize, Serialize}; use format_table::format_table_to_string; @@ -505,22 +507,25 @@ impl AdminRpcHandler { } } -#[async_trait] impl EndpointHandler for AdminRpcHandler { - async fn handle( + fn handle( self: &Arc, message: &AdminRpc, _from: NodeID, - ) -> Result { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), + ) -> impl Future> + Send { + let self2 = self.clone(); + async move { + match message { + AdminRpc::BucketOperation(bo) => self2.handle_bucket_cmd(bo).await, + AdminRpc::KeyOperation(ko) => self2.handle_key_cmd(ko).await, + AdminRpc::LaunchRepair(opt) => self2.handle_launch_repair(opt.clone()).await, + AdminRpc::Stats(opt) => self2.handle_stats(opt.clone()).await, + AdminRpc::Worker(wo) => self2.handle_worker_cmd(wo).await, + AdminRpc::BlockOperation(bo) => self2.handle_block_cmd(bo).await, + AdminRpc::MetaOperation(mo) => self2.handle_meta_cmd(mo).await, + m => Err(GarageError::unexpected_rpc_message(m).into()), + } } + .boxed() } } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index a1bf6ee0..821f4549 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -10,7 +10,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; -use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::StreamExt; use serde::{Deserialize, Serialize}; @@ -537,7 +536,6 @@ impl K2VRpcHandler { } } -#[async_trait] impl EndpointHandler for K2VRpcHandler { async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { match message { diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index 686aaaea..c2a869bb 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -22,6 +22,7 @@ tokio.workspace = true tokio-util.workspace = true tokio-stream.workspace = true +async-trait.workspace = true serde.workspace = true rmp-serde.workspace = true hex.workspace = true @@ -30,7 +31,6 @@ rand.workspace = true log.workspace = true arc-swap.workspace = true -async-trait.workspace = true err-derive.workspace = true bytes.workspace = true cfg-if.workspace = true diff --git a/src/net/client.rs b/src/net/client.rs index 607dd173..20e1dacd 100644 --- a/src/net/client.rs +++ b/src/net/client.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex}; use std::task::Poll; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; @@ -220,7 +219,6 @@ impl ClientConn { impl SendLoop for ClientConn {} -#[async_trait] impl RecvLoop for ClientConn { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 3cafafeb..d46acc42 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -1,8 +1,9 @@ +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use arc_swap::ArcSwapOption; -use async_trait::async_trait; +use futures::future::{BoxFuture, FutureExt}; use crate::error::Error; use crate::message::*; @@ -14,19 +15,17 @@ use crate::netapp::*; /// attached to the response.. /// /// The handler object should be in an Arc, see `Endpoint::set_handler` -#[async_trait] pub trait StreamingEndpointHandler: Send + Sync where M: Message, { - async fn handle(self: &Arc, m: Req, from: NodeID) -> Resp; + fn handle(self: &Arc, m: Req, from: NodeID) -> impl Future> + Send; } /// If one simply wants to use an endpoint in a client fashion, /// without locally serving requests to that endpoint, /// use the unit type `()` as the handler type: /// it will panic if it is ever made to handle request. -#[async_trait] impl EndpointHandler for () { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { panic!("This endpoint should not have a local handler."); @@ -38,15 +37,13 @@ impl EndpointHandler for () { /// This trait should be implemented by an object of your application /// that can handle a message of type `M`, in the cases where it doesn't /// care about attached stream in the request nor in the response. -#[async_trait] pub trait EndpointHandler: Send + Sync where M: Message, { - async fn handle(self: &Arc, m: &M, from: NodeID) -> M::Response; + fn handle(self: &Arc, m: &M, from: NodeID) -> impl Future + Send; } -#[async_trait] impl StreamingEndpointHandler for T where T: EndpointHandler, @@ -161,9 +158,8 @@ where pub(crate) type DynEndpoint = Box; -#[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result; + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -174,21 +170,23 @@ where M: Message, H: StreamingEndpointHandler; -#[async_trait] impl GenericEndpoint for EndpointArc where M: Message, H: StreamingEndpointHandler + 'static, { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result { - match self.0.handler.load_full() { - None => Err(Error::NoHandler), - Some(h) => { - let req = Req::from_enc(req_enc)?; - let res = h.handle(req, from).await; - Ok(res.into_enc()?) + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture> { + async move { + match self.0.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => { + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) + } } } + .boxed() } fn drop_handler(&self) { diff --git a/src/net/netapp.rs b/src/net/netapp.rs index 77e55774..36c6fc88 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; @@ -457,7 +456,6 @@ impl NetApp { } } -#[async_trait] impl EndpointHandler for NetApp { async fn handle(self: &Arc, msg: &HelloMessage, from: NodeID) { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); diff --git a/src/net/peering.rs b/src/net/peering.rs index a8d271ec..08378a08 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; -use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -592,7 +591,6 @@ impl PeeringManager { } } -#[async_trait] impl EndpointHandler for PeeringManager { async fn handle(self: &Arc, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { @@ -604,7 +602,6 @@ impl EndpointHandler for PeeringManager { } } -#[async_trait] impl EndpointHandler for PeeringManager { async fn handle( self: &Arc, diff --git a/src/net/recv.rs b/src/net/recv.rs index 0de7bef2..35a6d71a 100644 --- a/src/net/recv.rs +++ b/src/net/recv.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; use bytes::Bytes; use log::*; @@ -50,7 +49,6 @@ impl Drop for Sender { /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. -#[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream); fn cancel_handler(self: &Arc, _id: RequestID) {} diff --git a/src/net/send.rs b/src/net/send.rs index 1454eeb7..6f1ac02c 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use log::*; @@ -273,7 +272,6 @@ impl DataFrame { /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. -#[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, diff --git a/src/net/server.rs b/src/net/server.rs index 36dccb2f..fb6c6366 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; @@ -174,7 +173,6 @@ impl ServerConn { impl SendLoop for ServerConn {} -#[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { let resp_send = match self.resp_send.load_full() { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 0fa68218..2a52ae5d 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; @@ -749,7 +748,6 @@ impl System { } } -#[async_trait] impl EndpointHandler for System { async fn handle(self: &Arc, msg: &SystemRpc, from: NodeID) -> Result { match msg { diff --git a/src/table/gc.rs b/src/table/gc.rs index 9e060390..28ea119d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; + use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -272,7 +273,6 @@ impl TableGc { } } -#[async_trait] impl EndpointHandler for TableGc { async fn handle(self: &Arc, message: &GcRpc, _from: NodeID) -> Result { match message { diff --git a/src/table/sync.rs b/src/table/sync.rs index 234ee8ea..2d43b9fc 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -444,7 +444,6 @@ impl TableSyncer { // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== -#[async_trait] impl EndpointHandler for TableSyncer { async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { diff --git a/src/table/table.rs b/src/table/table.rs index 255947e7..c96f4731 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,7 +2,6 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -500,7 +499,6 @@ impl Table { } } -#[async_trait] impl EndpointHandler> for Table { async fn handle( self: &Arc, -- 2.45.3 From 5475da8ea8184f60b0c54586668f204fe68d1113 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 5 Feb 2025 20:31:34 +0100 Subject: [PATCH 2/3] remove async_trait used in generic_server.rs --- Cargo.lock | 4 ---- src/api/admin/api_server.rs | 2 -- src/api/common/Cargo.toml | 1 - src/api/common/generic_server.rs | 12 +++--------- src/api/k2v/Cargo.toml | 1 - src/api/k2v/api_server.rs | 3 --- src/api/s3/Cargo.toml | 1 - src/api/s3/api_server.rs | 3 --- src/net/Cargo.toml | 1 - 9 files changed, 3 insertions(+), 25 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 9ec6d2c2..ad5d098d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1301,7 +1301,6 @@ dependencies = [ name = "garage_api_common" version = "1.0.1" dependencies = [ - "async-trait", "bytes", "chrono", "crypto-common", @@ -1332,7 +1331,6 @@ dependencies = [ name = "garage_api_k2v" version = "1.0.1" dependencies = [ - "async-trait", "base64 0.21.7", "err-derive", "futures", @@ -1358,7 +1356,6 @@ version = "1.0.1" dependencies = [ "aes-gcm", "async-compression", - "async-trait", "base64 0.21.7", "bytes", "chrono", @@ -1468,7 +1465,6 @@ name = "garage_net" version = "1.0.1" dependencies = [ "arc-swap", - "async-trait", "bytes", "cfg-if", "err-derive", diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index e39fa1ba..6f0c474f 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -2,7 +2,6 @@ use std::collections::HashMap; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use async_trait::async_trait; use http::header::{ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; @@ -221,7 +220,6 @@ impl AdminApiServer { } } -#[async_trait] impl ApiHandler for AdminApiServer { const API_NAME: &'static str = "admin"; const API_NAME_DISPLAY: &'static str = "Admin"; diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml index 842662c4..5b9cf479 100644 --- a/src/api/common/Cargo.toml +++ b/src/api/common/Cargo.toml @@ -18,7 +18,6 @@ garage_model.workspace = true garage_table.workspace = true garage_util.workspace = true -async-trait.workspace = true bytes.workspace = true chrono.workspace = true crypto-common.workspace = true diff --git a/src/api/common/generic_server.rs b/src/api/common/generic_server.rs index d92a3465..6ddc2ff2 100644 --- a/src/api/common/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -4,8 +4,6 @@ use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; - use futures::future::Future; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; @@ -47,7 +45,6 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } -#[async_trait] pub trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; @@ -56,11 +53,11 @@ pub trait ApiHandler: Send + Sync + 'static { type Error: ApiError; fn parse_endpoint(&self, r: &Request) -> Result; - async fn handle( + fn handle( &self, req: Request, endpoint: Self::Endpoint, - ) -> Result>, Self::Error>; + ) -> impl Future>, Self::Error>> + Send; } pub struct ApiServer { @@ -248,13 +245,11 @@ impl ApiServer { // ==== helper functions ==== -#[async_trait] pub trait Accept: Send + Sync + 'static { type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; + fn accept(&self) -> impl Future> + Send; } -#[async_trait] impl Accept for TcpListener { type Stream = TcpStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { @@ -266,7 +261,6 @@ impl Accept for TcpListener { pub struct UnixListenerOn(pub UnixListener, pub String); -#[async_trait] impl Accept for UnixListenerOn { type Stream = UnixStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { diff --git a/src/api/k2v/Cargo.toml b/src/api/k2v/Cargo.toml index d4e26efa..e3ebedca 100644 --- a/src/api/k2v/Cargo.toml +++ b/src/api/k2v/Cargo.toml @@ -19,7 +19,6 @@ garage_table.workspace = true garage_util = { workspace = true, features = [ "k2v" ] } garage_api_common.workspace = true -async-trait.workspace = true base64.workspace = true err-derive.workspace = true tracing.workspace = true diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index 31e07762..eb276f5b 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use tokio::sync::watch; @@ -48,7 +46,6 @@ impl K2VApiServer { } } -#[async_trait] impl ApiHandler for K2VApiServer { const API_NAME: &'static str = "k2v"; const API_NAME_DISPLAY: &'static str = "K2V"; diff --git a/src/api/s3/Cargo.toml b/src/api/s3/Cargo.toml index a1751c9f..387e45db 100644 --- a/src/api/s3/Cargo.toml +++ b/src/api/s3/Cargo.toml @@ -24,7 +24,6 @@ garage_api_common.workspace = true aes-gcm.workspace = true async-compression.workspace = true -async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index ed71b108..bf48bba1 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,7 +1,5 @@ use std::sync::Arc; -use async_trait::async_trait; - use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; use tokio::sync::watch; @@ -70,7 +68,6 @@ impl S3ApiServer { } } -#[async_trait] impl ApiHandler for S3ApiServer { const API_NAME: &'static str = "s3"; const API_NAME_DISPLAY: &'static str = "S3"; diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index c2a869bb..c0b47a6e 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -22,7 +22,6 @@ tokio.workspace = true tokio-util.workspace = true tokio-stream.workspace = true -async-trait.workspace = true serde.workspace = true rmp-serde.workspace = true hex.workspace = true -- 2.45.3 From af67626ab2bd32e94ab521607574737939a7edf3 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 5 Feb 2025 20:39:43 +0100 Subject: [PATCH 3/3] remove async_trait for TableRepair --- src/garage/repair/online.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/garage/repair/online.rs b/src/garage/repair/online.rs index 2c5227d2..47883f97 100644 --- a/src/garage/repair/online.rs +++ b/src/garage/repair/online.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -93,17 +94,16 @@ pub async fn launch_online_repair( // ---- -#[async_trait] trait TableRepair: Send + Sync + 'static { type T: TableSchema; fn table(garage: &Garage) -> &Table; - async fn process( + fn process( &mut self, garage: &Garage, entry: <::T as TableSchema>::E, - ) -> Result; + ) -> impl Future> + Send; } struct TableRepairWorker { @@ -174,7 +174,6 @@ impl Worker for TableRepairWorker { struct RepairVersions; -#[async_trait] impl TableRepair for RepairVersions { type T = VersionTable; @@ -221,7 +220,6 @@ impl TableRepair for RepairVersions { struct RepairBlockRefs; -#[async_trait] impl TableRepair for RepairBlockRefs { type T = BlockRefTable; @@ -257,7 +255,6 @@ impl TableRepair for RepairBlockRefs { struct RepairMpu; -#[async_trait] impl TableRepair for RepairMpu { type T = MultipartUploadTable; -- 2.45.3