From c1b84ac459eca51a6cad6fa38a2c79c60e4049c0 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 8 Jun 2022 00:40:47 +0200 Subject: [PATCH] initial work on netapp streams - garage side --- Cargo.lock | 5 +-- Cargo.toml | 3 ++ src/api/error.rs | 39 ++++++++++++--------- src/block/Cargo.toml | 1 + src/block/block.rs | 7 ++-- src/block/manager.rs | 74 +++++++++++++++++++++++++++++++++++++-- src/garage/admin.rs | 4 ++- src/garage/cli/structs.rs | 28 +++++++-------- src/model/helper/error.rs | 15 ++++++-- src/rpc/rpc_helper.rs | 19 ++++++++-- src/rpc/system.rs | 2 ++ src/table/gc.rs | 4 ++- src/table/merkle.rs | 2 +- src/table/sync.rs | 4 ++- src/table/table.rs | 37 ++++++++++++++++++++ src/util/Cargo.toml | 1 + src/util/error.rs | 22 ++++++++++-- 17 files changed, 216 insertions(+), 51 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index de1ae5cd..c5f7b41d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -939,6 +939,7 @@ dependencies = [ "garage_table 0.7.0", "garage_util 0.7.0", "hex", + "netapp 0.4.4", "opentelemetry", "rand 0.8.5", "rmp-serde 0.15.5", @@ -1136,6 +1137,7 @@ dependencies = [ name = "garage_util" version = "0.7.0" dependencies = [ + "async-trait", "blake2", "chrono", "err-derive 0.3.1", @@ -1851,8 +1853,6 @@ dependencies = [ [[package]] name = "netapp" version = "0.4.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee" dependencies = [ "arc-swap", "async-trait", @@ -1866,6 +1866,7 @@ dependencies = [ "log", "opentelemetry", "opentelemetry-contrib", + "pin-project 1.0.10", "rand 0.5.6", "rmp-serde 0.14.4", "serde", diff --git a/Cargo.toml b/Cargo.toml index cfc48113..351b75a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -16,3 +16,6 @@ lto = "off" [profile.release] debug = true + +[patch.crates-io] +netapp = { path = "../netapp" } diff --git a/src/api/error.rs b/src/api/error.rs index 4b7254d2..2daacc1a 100644 --- a/src/api/error.rs +++ b/src/api/error.rs @@ -1,4 +1,5 @@ use std::convert::TryInto; +use std::sync::Arc; use err_derive::Error; use hyper::header::HeaderValue; @@ -15,7 +16,7 @@ pub enum Error { // Category: internal error /// Error related to deeper parts of Garage #[error(display = "Internal error: {}", _0)] - InternalError(#[error(source)] GarageError), + InternalError(Arc), /// Error related to Hyper #[error(display = "Internal error (Hyper error): {}", _0)] @@ -109,6 +110,12 @@ pub enum Error { NotImplemented(String), } +impl From for Error { + fn from(err: GarageError) -> Self { + Self::InternalError(Arc::new(err)) + } +} + impl From for Error { fn from(err: roxmltree::Error) -> Self { Self::InvalidXml(format!("{}", err)) @@ -145,14 +152,13 @@ impl Error { Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED, Error::Forbidden(_) => StatusCode::FORBIDDEN, Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE, - Error::InternalError( + Error::InternalError(e) => match **e { GarageError::Timeout | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => StatusCode::SERVICE_UNAVAILABLE, - Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => { - StatusCode::INTERNAL_SERVER_ERROR - } + | GarageError::Quorum(_, _, _, _) => StatusCode::SERVICE_UNAVAILABLE, + _ => StatusCode::INTERNAL_SERVER_ERROR, + }, + Error::Hyper(_) | Error::Http(_) => StatusCode::INTERNAL_SERVER_ERROR, Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE, Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED, _ => StatusCode::BAD_REQUEST, @@ -173,12 +179,13 @@ impl Error { Error::Forbidden(_) => "AccessDenied", Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed", Error::NotImplemented(_) => "NotImplemented", - Error::InternalError( + Error::InternalError(e) => match **e { GarageError::Timeout | GarageError::RemoteError(_) - | GarageError::Quorum(_, _, _, _), - ) => "ServiceUnavailable", - Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError", + | GarageError::Quorum(_, _, _, _) => "ServiceUnavailable", + _ => "InternalError", + }, + Error::Hyper(_) | Error::Http(_) => "InternalError", _ => "InvalidRequest", } } @@ -262,10 +269,8 @@ where fn ok_or_internal_error>(self, reason: M) -> Result { match self { Ok(x) => Ok(x), - Err(e) => Err(Error::InternalError(GarageError::Message(format!( - "{}: {}", - reason.as_ref(), - e + Err(e) => Err(Error::InternalError(Arc::new(GarageError::Message( + format!("{}: {}", reason.as_ref(), e), )))), } } @@ -276,9 +281,9 @@ impl OkOrInternalError for Option { fn ok_or_internal_error>(self, reason: M) -> Result { match self { Some(x) => Ok(x), - None => Err(Error::InternalError(GarageError::Message( + None => Err(Error::InternalError(Arc::new(GarageError::Message( reason.as_ref().to_string(), - ))), + )))), } } } diff --git a/src/block/Cargo.toml b/src/block/Cargo.toml index 9cba69ee..51ef0f08 100644 --- a/src/block/Cargo.toml +++ b/src/block/Cargo.toml @@ -17,6 +17,7 @@ path = "lib.rs" garage_rpc = { version = "0.7.0", path = "../rpc" } garage_util = { version = "0.7.0", path = "../util" } garage_table = { version = "0.7.0", path = "../table" } +netapp = "0.4" opentelemetry = "0.17" diff --git a/src/block/block.rs b/src/block/block.rs index 4d3fbcb8..afe6655a 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,16 +1,15 @@ -use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; use garage_util::data::*; use garage_util::error::*; /// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub enum DataBlock { /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec), + Plain(Vec), /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec), + Compressed(Vec), } impl DataBlock { diff --git a/src/block/manager.rs b/src/block/manager.rs index 9b2d9cad..7a45f6a9 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -6,8 +6,12 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; +use netapp::endpoint::SerializeMessage; +use netapp::util::AssociatedStream; + use futures::future::*; use futures::select; +use futures::StreamExt; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::sync::{watch, Mutex, Notify}; @@ -56,7 +60,7 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6; pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600); /// RPC messages used to share blocks of data between nodes -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug)] pub enum BlockRpc { Ok, /// Message to ask for a block of data, by hash @@ -73,10 +77,74 @@ pub enum BlockRpc { NeedBlockReply(bool), } +#[derive(Debug, Serialize, Deserialize)] +pub enum BlockRpcSer { + Ok, + GetBlock(Hash), + PutBlock { + hash: Hash, + compressed: bool, + len: usize, + }, + NeedBlockQuery(Hash), + NeedBlockReply(bool), +} + impl Rpc for BlockRpc { type Response = Result; } +#[async_trait] +impl SerializeMessage for BlockRpc { + type SerializableSelf = BlockRpcSer; + + fn serialize_msg(&self) -> (Self::SerializableSelf, Option) { + let ser_self = match self { + BlockRpc::Ok => BlockRpcSer::Ok, + BlockRpc::GetBlock(h) => BlockRpcSer::GetBlock(*h), + BlockRpc::PutBlock { hash, data } => { + let buf = data.inner_buffer().to_vec(); + let ser_self = BlockRpcSer::PutBlock { + hash: *hash, + compressed: data.is_compressed(), + len: buf.len(), + }; + let stream = Box::pin(futures::stream::once(async move { buf })); + return (ser_self, Some(stream)); + } + BlockRpc::NeedBlockQuery(h) => BlockRpcSer::NeedBlockQuery(*h), + BlockRpc::NeedBlockReply(b) => BlockRpcSer::NeedBlockReply(*b), + }; + (ser_self, None) + } + + async fn deserialize_msg(self_ser: Self::SerializableSelf, stream: AssociatedStream) -> Self { + let de_self = match self_ser { + BlockRpcSer::Ok => BlockRpc::Ok, + BlockRpcSer::GetBlock(h) => BlockRpc::GetBlock(h), + BlockRpcSer::PutBlock { + hash, + compressed, + len, + } => { + // TODO this is very useless. It should store the stream so it can be accessed as a + // stream instead + let raw_data = stream.concat().await; + assert_eq!(len, raw_data.len()); + let data = if compressed { + DataBlock::Compressed(raw_data) + } else { + DataBlock::Plain(raw_data) + }; + BlockRpc::PutBlock { hash, data } + } + BlockRpcSer::NeedBlockQuery(h) => BlockRpc::NeedBlockQuery(h), + BlockRpcSer::NeedBlockReply(b) => BlockRpc::NeedBlockReply(b), + }; + de_self + } +} + /// The block manager, handling block exchange between nodes, and block storage on local node pub struct BlockManager { /// Replication strategy, allowing to find on which node blocks should be located @@ -694,7 +762,7 @@ impl BlockManager { } } m => { - return Err(Error::unexpected_rpc_message(m)); + return Err(Error::unexpected_rpc_message(m.serialize_msg().0)); } } } @@ -836,7 +904,7 @@ impl EndpointHandler for BlockManager { BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await, BlockRpc::GetBlock(h) => self.read_block(h).await, BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), - m => Err(Error::unexpected_rpc_message(m)), + m => Err(Error::unexpected_rpc_message(m.serialize_msg().0)), } } } diff --git a/src/garage/admin.rs b/src/garage/admin.rs index af0c3f22..cf7dd05e 100644 --- a/src/garage/admin.rs +++ b/src/garage/admin.rs @@ -29,7 +29,7 @@ use crate::repair::Repair; pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Serialize, Deserialize, Clone)] pub enum AdminRpc { BucketOperation(BucketOperation), KeyOperation(KeyOperation), @@ -49,6 +49,8 @@ impl Rpc for AdminRpc { type Response = Result; } +impl AutoSerialize for AdminRpc {} + pub struct AdminRpcHandler { garage: Arc, endpoint: Arc>, diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index a0c49aeb..702d5bca 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -138,7 +138,7 @@ pub struct RevertLayoutOpt { pub(crate) version: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub enum BucketOperation { /// List buckets #[structopt(name = "list")] @@ -177,7 +177,7 @@ pub enum BucketOperation { Website(WebsiteOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct WebsiteOpt { /// Create #[structopt(long = "allow")] @@ -199,13 +199,13 @@ pub struct WebsiteOpt { pub error_document: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct BucketOpt { /// Bucket name pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct DeleteBucketOpt { /// Bucket name pub name: String, @@ -215,7 +215,7 @@ pub struct DeleteBucketOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct AliasBucketOpt { /// Existing bucket name (its alias in global namespace or its full hex uuid) pub existing_bucket: String, @@ -228,7 +228,7 @@ pub struct AliasBucketOpt { pub local: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct UnaliasBucketOpt { /// Bucket name pub name: String, @@ -238,7 +238,7 @@ pub struct UnaliasBucketOpt { pub local: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct PermBucketOpt { /// Access key name or ID #[structopt(long = "key")] @@ -261,7 +261,7 @@ pub struct PermBucketOpt { pub bucket: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub enum KeyOperation { /// List keys #[structopt(name = "list")] @@ -296,20 +296,20 @@ pub enum KeyOperation { Import(KeyImportOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct KeyOpt { /// ID or name of the key pub key_pattern: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct KeyNewOpt { /// Name of the key #[structopt(long = "name", default_value = "Unnamed key")] pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct KeyRenameOpt { /// ID or name of the key pub key_pattern: String, @@ -318,7 +318,7 @@ pub struct KeyRenameOpt { pub new_name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct KeyDeleteOpt { /// ID or name of the key pub key_pattern: String, @@ -328,7 +328,7 @@ pub struct KeyDeleteOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct KeyPermOpt { /// ID or name of the key pub key_pattern: String, @@ -338,7 +338,7 @@ pub struct KeyPermOpt { pub create_bucket: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] pub struct KeyImportOpt { /// Access key ID pub key_id: String, diff --git a/src/model/helper/error.rs b/src/model/helper/error.rs index 30b2ba32..1a6b16cb 100644 --- a/src/model/helper/error.rs +++ b/src/model/helper/error.rs @@ -1,20 +1,29 @@ use err_derive::Error; use serde::{Deserialize, Serialize}; +use std::sync::Arc; use garage_util::error::Error as GarageError; -#[derive(Debug, Error, Serialize, Deserialize)] +#[derive(Debug, Error, Serialize, Deserialize, Clone)] pub enum Error { #[error(display = "Internal error: {}", _0)] - Internal(#[error(source)] GarageError), + Internal(Arc), #[error(display = "Bad request: {}", _0)] BadRequest(String), } +impl From for Error { + fn from(e: GarageError) -> Error { + Error::Internal(Arc::new(e)) + } +} + +impl netapp::endpoint::AutoSerialize for Error {} + impl From for Error { fn from(e: netapp::error::Error) -> Self { - Error::Internal(GarageError::Netapp(e)) + Error::Internal(Arc::new(GarageError::Netapp(e))) } } diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 34717d3b..0ba90ba2 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -15,7 +15,9 @@ use opentelemetry::{ Context, }; -pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; +pub use netapp::endpoint::{ + AutoSerialize, Endpoint, EndpointHandler, Message as Rpc, SerializeMessage, +}; use netapp::peering::fullmesh::FullMeshPeeringStrategy; pub use netapp::proto::*; pub use netapp::{NetApp, NodeID}; @@ -118,7 +120,9 @@ impl RpcHelper { ) -> Result where M: Rpc>, + Result: SerializeMessage, H: EndpointHandler, + S: Send, { self.call_arc(endpoint, to, Arc::new(msg), strat).await } @@ -132,7 +136,9 @@ impl RpcHelper { ) -> Result where M: Rpc>, + Result: SerializeMessage, H: EndpointHandler, + S: Send, { let metric_tags = [ KeyValue::new("rpc_endpoint", endpoint.path().to_string()), @@ -140,7 +146,9 @@ impl RpcHelper { KeyValue::new("to", format!("{:?}", to)), ]; - let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; + //let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32; + // some value to make things works while I figure out how to do better + let msg_size = 8192; let permit = self .0 .request_buffer_semaphore @@ -151,6 +159,7 @@ impl RpcHelper { self.0.metrics.rpc_counter.add(1, &metric_tags); let node_id = to.into(); + let rpc_call = endpoint .call(&node_id, msg, strat.rs_priority) .record_duration(&self.0.metrics.rpc_duration, &metric_tags); @@ -187,7 +196,9 @@ impl RpcHelper { ) -> Vec<(Uuid, Result)> where M: Rpc>, + Result: SerializeMessage, H: EndpointHandler, + S: Send, { let msg = Arc::new(msg); let resps = join_all( @@ -209,7 +220,9 @@ impl RpcHelper { ) -> Vec<(Uuid, Result)> where M: Rpc>, + Result: SerializeMessage, H: EndpointHandler, + S: Send, { let to = self .0 @@ -232,6 +245,7 @@ impl RpcHelper { ) -> Result, Error> where M: Rpc> + 'static, + Result: SerializeMessage, H: EndpointHandler + 'static, S: Send + 'static, { @@ -272,6 +286,7 @@ impl RpcHelper { ) -> Result, Error> where M: Rpc> + 'static, + Result: SerializeMessage, H: EndpointHandler + 'static, S: Send + 'static, { diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 68d94ea5..a1b3ceb6 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -71,6 +71,8 @@ impl Rpc for SystemRpc { type Response = Result; } +impl AutoSerialize for SystemRpc {} + /// This node's membership manager pub struct System { /// The id of this node diff --git a/src/table/gc.rs b/src/table/gc.rs index 2a05b6ae..09ac549b 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -39,7 +39,7 @@ pub(crate) struct TableGc>, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] enum GcRpc { Update(Vec), DeleteIfEqualHash(Vec<(ByteBuf, Hash)>), @@ -50,6 +50,8 @@ impl Rpc for GcRpc { type Response = Result; } +impl AutoSerialize for GcRpc {} + impl TableGc where F: TableSchema + 'static, diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 93bf7e47..731a0869 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -52,7 +52,7 @@ pub struct MerkleNodeKey { pub prefix: Vec, } -#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)] +#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone)] pub enum MerkleNode { // The empty Merkle node Empty, diff --git a/src/table/sync.rs b/src/table/sync.rs index 08069ad0..b2d587bd 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -38,7 +38,7 @@ pub struct TableSyncer endpoint: Arc>, } -#[derive(Serialize, Deserialize)] +#[derive(Serialize, Deserialize, Clone)] pub(crate) enum SyncRpc { RootCkHash(Partition, Hash), RootCkDifferent(bool), @@ -52,6 +52,8 @@ impl Rpc for SyncRpc { type Response = Result; } +impl AutoSerialize for SyncRpc {} + struct SyncTodo { todo: Vec, } diff --git a/src/table/table.rs b/src/table/table.rs index 2a167604..ce9afa14 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -58,10 +58,47 @@ pub(crate) enum TableRpc { Update(Vec>), } +impl Clone for TableRpc +where + F::P: Clone, + F::S: Clone, + F::Filter: Clone, +{ + fn clone(&self) -> Self { + use TableRpc::*; + match self { + Ok => Ok, + ReadEntry(f, s) => ReadEntry(f.clone(), s.clone()), + ReadEntryResponse(b) => ReadEntryResponse(b.clone()), + ReadRange { + partition, + begin_sort_key, + filter, + limit, + enumeration_order, + } => ReadRange { + partition: partition.clone(), + begin_sort_key: begin_sort_key.clone(), + filter: filter.clone(), + limit: *limit, + enumeration_order: enumeration_order.clone(), + }, + Update(v) => Update(v.clone()), + } + } +} + impl Rpc for TableRpc { type Response = Result, Error>; } +impl AutoSerialize for TableRpc +where + F::P: Clone, + F::S: Clone, +{ +} + impl Table where F: TableSchema + 'static, diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 95cde531..c43bd24a 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -14,6 +14,7 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +async-trait = "0.1.7" blake2 = "0.9" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } diff --git a/src/util/error.rs b/src/util/error.rs index 8734a0c8..74ff2575 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -4,7 +4,11 @@ use std::io; use err_derive::Error; -use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer}; +use serde::de::Visitor; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; + +use netapp::endpoint::SerializeMessage; +use netapp::util::AssociatedStream; use crate::data::*; @@ -162,12 +166,26 @@ impl OkOrMessage for Option { // Upon deserialization, they all become a RemoteError with the // given representation. +// can't use AutoSerialize because it requires Clone +#[async_trait::async_trait] +impl SerializeMessage for Error { + type SerializableSelf = String; + fn serialize_msg(&self) -> (Self::SerializableSelf, Option) { + (self.to_string(), None) + } + + async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self { + // TODO verify no stream + Error::RemoteError(ser_self) + } +} + impl Serialize for Error { fn serialize(&self, serializer: S) -> Result where S: Serializer, { - serializer.serialize_str(&format!("{}", self)) + serializer.serialize_str(&self.to_string()) } }