diff --git a/Cargo.lock b/Cargo.lock index 0b86147b..c0b00ffb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1304,7 +1304,6 @@ dependencies = [ name = "garage_api_common" version = "1.0.1" dependencies = [ - "async-trait", "bytes", "chrono", "crypto-common", @@ -1335,7 +1334,6 @@ dependencies = [ name = "garage_api_k2v" version = "1.0.1" dependencies = [ - "async-trait", "base64 0.21.7", "err-derive", "futures", @@ -1361,7 +1359,6 @@ version = "1.0.1" dependencies = [ "aes-gcm", "async-compression", - "async-trait", "base64 0.21.7", "bytes", "chrono", @@ -1471,7 +1468,6 @@ name = "garage_net" version = "1.0.1" dependencies = [ "arc-swap", - "async-trait", "bytes", "cfg-if", "err-derive", diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 1ab81be3..37574dcf 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -2,7 +2,6 @@ use std::borrow::Cow; use std::sync::Arc; use argon2::password_hash::PasswordHash; -use async_trait::async_trait; use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION}; use hyper::{body::Incoming as IncomingBody, Request, Response}; @@ -55,7 +54,6 @@ impl Rpc for AdminRpc { type Response = Result; } -#[async_trait] impl EndpointHandler for AdminApiServer { async fn handle( self: &Arc, @@ -196,7 +194,6 @@ impl AdminApiServer { struct ArcAdminApiServer(Arc); -#[async_trait] impl ApiHandler for ArcAdminApiServer { const API_NAME: &'static str = "admin"; const API_NAME_DISPLAY: &'static str = "Admin"; diff --git a/src/api/admin/repair.rs b/src/api/admin/repair.rs index 113ef636..a9b8c36a 100644 --- a/src/api/admin/repair.rs +++ b/src/api/admin/repair.rs @@ -1,3 +1,4 @@ +use std::future::Future; use std::sync::Arc; use std::time::Duration; @@ -104,7 +105,7 @@ trait TableRepair: Send + Sync + 'static { &mut self, garage: &Garage, entry: <::T as TableSchema>::E, - ) -> impl std::future::Future> + Send; + ) -> impl Future> + Send; } struct TableRepairWorker { diff --git a/src/api/common/Cargo.toml b/src/api/common/Cargo.toml index 842662c4..5b9cf479 100644 --- a/src/api/common/Cargo.toml +++ b/src/api/common/Cargo.toml @@ -18,7 +18,6 @@ garage_model.workspace = true garage_table.workspace = true garage_util.workspace = true -async-trait.workspace = true bytes.workspace = true chrono.workspace = true crypto-common.workspace = true diff --git a/src/api/common/generic_server.rs b/src/api/common/generic_server.rs index 133e3706..d7ee5692 100644 --- a/src/api/common/generic_server.rs +++ b/src/api/common/generic_server.rs @@ -5,8 +5,6 @@ use std::os::unix::fs::PermissionsExt; use std::sync::Arc; use std::time::Duration; -use async_trait::async_trait; - use futures::future::Future; use futures::stream::{futures_unordered::FuturesUnordered, StreamExt}; @@ -48,7 +46,6 @@ pub trait ApiError: std::error::Error + Send + Sync + 'static { fn http_body(&self, garage_region: &str, path: &str) -> ErrorBody; } -#[async_trait] pub trait ApiHandler: Send + Sync + 'static { const API_NAME: &'static str; const API_NAME_DISPLAY: &'static str; @@ -57,11 +54,11 @@ pub trait ApiHandler: Send + Sync + 'static { type Error: ApiError; fn parse_endpoint(&self, r: &Request) -> Result; - async fn handle( + fn handle( &self, req: Request, endpoint: Self::Endpoint, - ) -> Result>, Self::Error>; + ) -> impl Future>, Self::Error>> + Send; } pub struct ApiServer { @@ -249,13 +246,11 @@ impl ApiServer { // ==== helper functions ==== -#[async_trait] pub trait Accept: Send + Sync + 'static { type Stream: AsyncRead + AsyncWrite + Send + Sync + 'static; - async fn accept(&self) -> std::io::Result<(Self::Stream, String)>; + fn accept(&self) -> impl Future> + Send; } -#[async_trait] impl Accept for TcpListener { type Stream = TcpStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { @@ -267,7 +262,6 @@ impl Accept for TcpListener { pub struct UnixListenerOn(pub UnixListener, pub String); -#[async_trait] impl Accept for UnixListenerOn { type Stream = UnixStream; async fn accept(&self) -> std::io::Result<(Self::Stream, String)> { diff --git a/src/api/k2v/Cargo.toml b/src/api/k2v/Cargo.toml index d4e26efa..e3ebedca 100644 --- a/src/api/k2v/Cargo.toml +++ b/src/api/k2v/Cargo.toml @@ -19,7 +19,6 @@ garage_table.workspace = true garage_util = { workspace = true, features = [ "k2v" ] } garage_api_common.workspace = true -async-trait.workspace = true base64.workspace = true err-derive.workspace = true tracing.workspace = true diff --git a/src/api/k2v/api_server.rs b/src/api/k2v/api_server.rs index c562ffe0..015fd687 100644 --- a/src/api/k2v/api_server.rs +++ b/src/api/k2v/api_server.rs @@ -1,8 +1,6 @@ use std::borrow::Cow; use std::sync::Arc; -use async_trait::async_trait; - use hyper::{body::Incoming as IncomingBody, Method, Request, Response}; use tokio::sync::watch; @@ -49,7 +47,6 @@ impl K2VApiServer { } } -#[async_trait] impl ApiHandler for K2VApiServer { const API_NAME: &'static str = "k2v"; const API_NAME_DISPLAY: &'static str = "K2V"; diff --git a/src/api/s3/Cargo.toml b/src/api/s3/Cargo.toml index a1751c9f..387e45db 100644 --- a/src/api/s3/Cargo.toml +++ b/src/api/s3/Cargo.toml @@ -24,7 +24,6 @@ garage_api_common.workspace = true aes-gcm.workspace = true async-compression.workspace = true -async-trait.workspace = true base64.workspace = true bytes.workspace = true chrono.workspace = true diff --git a/src/api/s3/api_server.rs b/src/api/s3/api_server.rs index 171fbb9a..c8c28f3d 100644 --- a/src/api/s3/api_server.rs +++ b/src/api/s3/api_server.rs @@ -1,8 +1,6 @@ use std::borrow::Cow; use std::sync::Arc; -use async_trait::async_trait; - use hyper::header; use hyper::{body::Incoming as IncomingBody, Request, Response}; use tokio::sync::watch; @@ -71,7 +69,6 @@ impl S3ApiServer { } } -#[async_trait] impl ApiHandler for S3ApiServer { const API_NAME: &'static str = "s3"; const API_NAME_DISPLAY: &'static str = "S3"; diff --git a/src/block/manager.rs b/src/block/manager.rs index 40b177a2..537e1fc1 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -4,7 +4,6 @@ use std::sync::Arc; use std::time::Duration; use arc_swap::{ArcSwap, ArcSwapOption}; -use async_trait::async_trait; use bytes::Bytes; use rand::prelude::*; use serde::{Deserialize, Serialize}; @@ -688,7 +687,6 @@ impl BlockManager { } } -#[async_trait] impl StreamingEndpointHandler for BlockManager { async fn handle(self: &Arc, mut message: Req, _from: NodeID) -> Resp { match message.msg() { diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index a1bf6ee0..821f4549 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -10,7 +10,6 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex, MutexGuard}; use std::time::{Duration, Instant}; -use async_trait::async_trait; use futures::stream::FuturesUnordered; use futures::StreamExt; use serde::{Deserialize, Serialize}; @@ -537,7 +536,6 @@ impl K2VRpcHandler { } } -#[async_trait] impl EndpointHandler for K2VRpcHandler { async fn handle(self: &Arc, message: &K2VRpc, _from: NodeID) -> Result { match message { diff --git a/src/net/Cargo.toml b/src/net/Cargo.toml index 686aaaea..c0b47a6e 100644 --- a/src/net/Cargo.toml +++ b/src/net/Cargo.toml @@ -30,7 +30,6 @@ rand.workspace = true log.workspace = true arc-swap.workspace = true -async-trait.workspace = true err-derive.workspace = true bytes.workspace = true cfg-if.workspace = true diff --git a/src/net/client.rs b/src/net/client.rs index 607dd173..20e1dacd 100644 --- a/src/net/client.rs +++ b/src/net/client.rs @@ -6,7 +6,6 @@ use std::sync::{Arc, Mutex}; use std::task::Poll; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use bytes::Bytes; use log::{debug, error, trace}; @@ -220,7 +219,6 @@ impl ClientConn { impl SendLoop for ClientConn {} -#[async_trait] impl RecvLoop for ClientConn { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { trace!("ClientConn recv_handler {}", id); diff --git a/src/net/endpoint.rs b/src/net/endpoint.rs index 3cafafeb..d46acc42 100644 --- a/src/net/endpoint.rs +++ b/src/net/endpoint.rs @@ -1,8 +1,9 @@ +use std::future::Future; use std::marker::PhantomData; use std::sync::Arc; use arc_swap::ArcSwapOption; -use async_trait::async_trait; +use futures::future::{BoxFuture, FutureExt}; use crate::error::Error; use crate::message::*; @@ -14,19 +15,17 @@ use crate::netapp::*; /// attached to the response.. /// /// The handler object should be in an Arc, see `Endpoint::set_handler` -#[async_trait] pub trait StreamingEndpointHandler: Send + Sync where M: Message, { - async fn handle(self: &Arc, m: Req, from: NodeID) -> Resp; + fn handle(self: &Arc, m: Req, from: NodeID) -> impl Future> + Send; } /// If one simply wants to use an endpoint in a client fashion, /// without locally serving requests to that endpoint, /// use the unit type `()` as the handler type: /// it will panic if it is ever made to handle request. -#[async_trait] impl EndpointHandler for () { async fn handle(self: &Arc<()>, _m: &M, _from: NodeID) -> M::Response { panic!("This endpoint should not have a local handler."); @@ -38,15 +37,13 @@ impl EndpointHandler for () { /// This trait should be implemented by an object of your application /// that can handle a message of type `M`, in the cases where it doesn't /// care about attached stream in the request nor in the response. -#[async_trait] pub trait EndpointHandler: Send + Sync where M: Message, { - async fn handle(self: &Arc, m: &M, from: NodeID) -> M::Response; + fn handle(self: &Arc, m: &M, from: NodeID) -> impl Future + Send; } -#[async_trait] impl StreamingEndpointHandler for T where T: EndpointHandler, @@ -161,9 +158,8 @@ where pub(crate) type DynEndpoint = Box; -#[async_trait] pub(crate) trait GenericEndpoint { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result; + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture>; fn drop_handler(&self); fn clone_endpoint(&self) -> DynEndpoint; } @@ -174,21 +170,23 @@ where M: Message, H: StreamingEndpointHandler; -#[async_trait] impl GenericEndpoint for EndpointArc where M: Message, H: StreamingEndpointHandler + 'static, { - async fn handle(&self, req_enc: ReqEnc, from: NodeID) -> Result { - match self.0.handler.load_full() { - None => Err(Error::NoHandler), - Some(h) => { - let req = Req::from_enc(req_enc)?; - let res = h.handle(req, from).await; - Ok(res.into_enc()?) + fn handle(&self, req_enc: ReqEnc, from: NodeID) -> BoxFuture> { + async move { + match self.0.handler.load_full() { + None => Err(Error::NoHandler), + Some(h) => { + let req = Req::from_enc(req_enc)?; + let res = h.handle(req, from).await; + Ok(res.into_enc()?) + } } } + .boxed() } fn drop_handler(&self) { diff --git a/src/net/netapp.rs b/src/net/netapp.rs index 77e55774..36c6fc88 100644 --- a/src/net/netapp.rs +++ b/src/net/netapp.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use log::{debug, error, info, trace, warn}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::auth; @@ -457,7 +456,6 @@ impl NetApp { } } -#[async_trait] impl EndpointHandler for NetApp { async fn handle(self: &Arc, msg: &HelloMessage, from: NodeID) { debug!("Hello from {:?}: {:?}", hex::encode(&from[..8]), msg); diff --git a/src/net/peering.rs b/src/net/peering.rs index a8d271ec..08378a08 100644 --- a/src/net/peering.rs +++ b/src/net/peering.rs @@ -5,7 +5,6 @@ use std::sync::{Arc, RwLock}; use std::time::{Duration, Instant}; use arc_swap::ArcSwap; -use async_trait::async_trait; use log::{debug, info, trace, warn}; use serde::{Deserialize, Serialize}; @@ -592,7 +591,6 @@ impl PeeringManager { } } -#[async_trait] impl EndpointHandler for PeeringManager { async fn handle(self: &Arc, ping: &PingMessage, from: NodeID) -> PingMessage { let ping_resp = PingMessage { @@ -604,7 +602,6 @@ impl EndpointHandler for PeeringManager { } } -#[async_trait] impl EndpointHandler for PeeringManager { async fn handle( self: &Arc, diff --git a/src/net/recv.rs b/src/net/recv.rs index 0de7bef2..35a6d71a 100644 --- a/src/net/recv.rs +++ b/src/net/recv.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; use bytes::Bytes; use log::*; @@ -50,7 +49,6 @@ impl Drop for Sender { /// according to the protocol defined above: chunks of message in progress of being /// received are stored in a buffer, and when the last chunk of a message is received, /// the full message is passed to the receive handler. -#[async_trait] pub(crate) trait RecvLoop: Sync + 'static { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream); fn cancel_handler(self: &Arc, _id: RequestID) {} diff --git a/src/net/send.rs b/src/net/send.rs index 1454eeb7..6f1ac02c 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use async_trait::async_trait; use bytes::{BufMut, Bytes, BytesMut}; use log::*; @@ -273,7 +272,6 @@ impl DataFrame { /// /// The `.send_loop()` exits when the sending end of the channel is closed, /// or if there is an error at any time writing to the async writer. -#[async_trait] pub(crate) trait SendLoop: Sync { async fn send_loop( self: Arc, diff --git a/src/net/server.rs b/src/net/server.rs index 36dccb2f..fb6c6366 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -3,7 +3,6 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use log::*; use futures::io::{AsyncReadExt, AsyncWriteExt}; @@ -174,7 +173,6 @@ impl ServerConn { impl SendLoop for ServerConn {} -#[async_trait] impl RecvLoop for ServerConn { fn recv_handler(self: &Arc, id: RequestID, stream: ByteStream) { let resp_send = match self.resp_send.load_full() { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 0fa68218..2a52ae5d 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -7,7 +7,6 @@ use std::sync::{Arc, RwLock, RwLockReadGuard}; use std::time::{Duration, Instant}; use arc_swap::ArcSwapOption; -use async_trait::async_trait; use futures::join; use serde::{Deserialize, Serialize}; use sodiumoxide::crypto::sign::ed25519; @@ -749,7 +748,6 @@ impl System { } } -#[async_trait] impl EndpointHandler for System { async fn handle(self: &Arc, msg: &SystemRpc, from: NodeID) -> Result { match msg { diff --git a/src/table/gc.rs b/src/table/gc.rs index 9e060390..28ea119d 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; + use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -272,7 +273,6 @@ impl TableGc { } } -#[async_trait] impl EndpointHandler for TableGc { async fn handle(self: &Arc, message: &GcRpc, _from: NodeID) -> Result { match message { diff --git a/src/table/sync.rs b/src/table/sync.rs index 234ee8ea..2d43b9fc 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -444,7 +444,6 @@ impl TableSyncer { // ======= SYNCHRONIZATION PROCEDURE -- RECEIVER SIDE ====== -#[async_trait] impl EndpointHandler for TableSyncer { async fn handle(self: &Arc, message: &SyncRpc, from: NodeID) -> Result { match message { diff --git a/src/table/table.rs b/src/table/table.rs index 255947e7..c96f4731 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -2,7 +2,6 @@ use std::borrow::Borrow; use std::collections::{BTreeMap, BTreeSet, HashMap}; use std::sync::Arc; -use async_trait::async_trait; use futures::stream::*; use serde::{Deserialize, Serialize}; use serde_bytes::ByteBuf; @@ -500,7 +499,6 @@ impl Table { } } -#[async_trait] impl EndpointHandler> for Table { async fn handle( self: &Arc,