wip: use netapp streaming #1

Draft
trinity-1686a wants to merge 1 commit from netapp-streams into main
17 changed files with 216 additions and 51 deletions
Showing only changes of commit c1b84ac459 - Show all commits

5
Cargo.lock generated
View file

@ -939,6 +939,7 @@ dependencies = [
"garage_table 0.7.0",
"garage_util 0.7.0",
"hex",
"netapp 0.4.4",
"opentelemetry",
"rand 0.8.5",
"rmp-serde 0.15.5",
@ -1136,6 +1137,7 @@ dependencies = [
name = "garage_util"
version = "0.7.0"
dependencies = [
"async-trait",
"blake2",
"chrono",
"err-derive 0.3.1",
@ -1851,8 +1853,6 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee"
dependencies = [
"arc-swap",
"async-trait",
@ -1866,6 +1866,7 @@ dependencies = [
"log",
"opentelemetry",
"opentelemetry-contrib",
"pin-project 1.0.10",
"rand 0.5.6",
"rmp-serde 0.14.4",
"serde",

View file

@ -16,3 +16,6 @@ lto = "off"
[profile.release]
debug = true
[patch.crates-io]
netapp = { path = "../netapp" }

View file

@ -1,4 +1,5 @@
use std::convert::TryInto;
use std::sync::Arc;
use err_derive::Error;
use hyper::header::HeaderValue;
@ -15,7 +16,7 @@ pub enum Error {
// Category: internal error
/// Error related to deeper parts of Garage
#[error(display = "Internal error: {}", _0)]
InternalError(#[error(source)] GarageError),
InternalError(Arc<GarageError>),
/// Error related to Hyper
#[error(display = "Internal error (Hyper error): {}", _0)]
@ -109,6 +110,12 @@ pub enum Error {
NotImplemented(String),
}
impl From<GarageError> for Error {
fn from(err: GarageError) -> Self {
Self::InternalError(Arc::new(err))
}
}
impl From<roxmltree::Error> for Error {
fn from(err: roxmltree::Error) -> Self {
Self::InvalidXml(format!("{}", err))
@ -145,14 +152,13 @@ impl Error {
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
Error::Forbidden(_) => StatusCode::FORBIDDEN,
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
Error::InternalError(
Error::InternalError(e) => match **e {
GarageError::Timeout
| GarageError::RemoteError(_)
| GarageError::Quorum(_, _, _, _),
) => StatusCode::SERVICE_UNAVAILABLE,
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
StatusCode::INTERNAL_SERVER_ERROR
}
| GarageError::Quorum(_, _, _, _) => StatusCode::SERVICE_UNAVAILABLE,
_ => StatusCode::INTERNAL_SERVER_ERROR,
},
Error::Hyper(_) | Error::Http(_) => StatusCode::INTERNAL_SERVER_ERROR,
Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
_ => StatusCode::BAD_REQUEST,
@ -173,12 +179,13 @@ impl Error {
Error::Forbidden(_) => "AccessDenied",
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
Error::NotImplemented(_) => "NotImplemented",
Error::InternalError(
Error::InternalError(e) => match **e {
GarageError::Timeout
| GarageError::RemoteError(_)
| GarageError::Quorum(_, _, _, _),
) => "ServiceUnavailable",
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
| GarageError::Quorum(_, _, _, _) => "ServiceUnavailable",
_ => "InternalError",
},
Error::Hyper(_) | Error::Http(_) => "InternalError",
_ => "InvalidRequest",
}
}
@ -262,10 +269,8 @@ where
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
match self {
Ok(x) => Ok(x),
Err(e) => Err(Error::InternalError(GarageError::Message(format!(
"{}: {}",
reason.as_ref(),
e
Err(e) => Err(Error::InternalError(Arc::new(GarageError::Message(
format!("{}: {}", reason.as_ref(), e),
)))),
}
}
@ -276,9 +281,9 @@ impl<T> OkOrInternalError for Option<T> {
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
match self {
Some(x) => Ok(x),
None => Err(Error::InternalError(GarageError::Message(
None => Err(Error::InternalError(Arc::new(GarageError::Message(
reason.as_ref().to_string(),
))),
)))),
}
}
}

View file

@ -17,6 +17,7 @@ path = "lib.rs"
garage_rpc = { version = "0.7.0", path = "../rpc" }
garage_util = { version = "0.7.0", path = "../util" }
garage_table = { version = "0.7.0", path = "../table" }
netapp = "0.4"
opentelemetry = "0.17"

View file

@ -1,16 +1,15 @@
use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*;
use garage_util::error::*;
/// A possibly compressed block of data
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub enum DataBlock {
/// Uncompressed data
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
Plain(Vec<u8>),
/// Data compressed with zstd
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
Compressed(Vec<u8>),
}
impl DataBlock {

View file

@ -6,8 +6,12 @@ use std::time::Duration;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use netapp::endpoint::SerializeMessage;
use netapp::util::AssociatedStream;
use futures::future::*;
use futures::select;
use futures::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
@ -56,7 +60,7 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
/// RPC messages used to share blocks of data between nodes
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug)]
pub enum BlockRpc {
Ok,
/// Message to ask for a block of data, by hash
@ -73,10 +77,74 @@ pub enum BlockRpc {
NeedBlockReply(bool),
}
/// Serializable counterpart of `BlockRpc`, used as its `SerializableSelf`
/// in the `SerializeMessage` implementation.
///
/// Each variant mirrors the `BlockRpc` variant of the same name. Only
/// `PutBlock` differs: it carries metadata about the block instead of the
/// block's bytes, which travel in the associated stream.
#[derive(Debug, Serialize, Deserialize)]
pub enum BlockRpcSer {
/// Mirrors `BlockRpc::Ok`
Ok,
/// Mirrors `BlockRpc::GetBlock`, carrying the same hash
GetBlock(Hash),
/// Header of a `BlockRpc::PutBlock` message; the block bytes are not
/// stored here but sent through the associated stream
PutBlock {
/// Hash copied from the `BlockRpc::PutBlock` message
hash: Hash,
/// `true` when the streamed bytes must be rebuilt as
/// `DataBlock::Compressed`, `false` for `DataBlock::Plain`
compressed: bool,
/// Length in bytes of the streamed buffer; compared against the
/// received data when the message is deserialized
len: usize,
},
/// Mirrors `BlockRpc::NeedBlockQuery`, carrying the same hash
NeedBlockQuery(Hash),
/// Mirrors `BlockRpc::NeedBlockReply`, carrying the same flag
NeedBlockReply(bool),
}
/// Declares `BlockRpc` as an RPC message: a request is answered with
/// another `BlockRpc` on success, or with an `Error`.
impl Rpc for BlockRpc {
type Response = Result<BlockRpc, Error>;
}
/// Splits a `BlockRpc` into a small serializable header (`BlockRpcSer`) and,
/// for `PutBlock`, an associated stream carrying the block's bytes.
#[async_trait]
impl SerializeMessage for BlockRpc {
    type SerializableSelf = BlockRpcSer;

    /// Builds the serializable header for this message.
    ///
    /// Only `PutBlock` yields a stream: the block's inner buffer is copied
    /// and emitted as a single stream item, while the header records the
    /// hash, the compression flag and the buffer length. Every other
    /// variant maps one-to-one and has no stream.
    fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) {
        match self {
            BlockRpc::Ok => (BlockRpcSer::Ok, None),
            BlockRpc::GetBlock(h) => (BlockRpcSer::GetBlock(*h), None),
            BlockRpc::PutBlock { hash, data } => {
                let payload = data.inner_buffer().to_vec();
                let header = BlockRpcSer::PutBlock {
                    hash: *hash,
                    compressed: data.is_compressed(),
                    len: payload.len(),
                };
                let body: AssociatedStream =
                    Box::pin(futures::stream::once(async move { payload }));
                (header, Some(body))
            }
            BlockRpc::NeedBlockQuery(h) => (BlockRpcSer::NeedBlockQuery(*h), None),
            BlockRpc::NeedBlockReply(b) => (BlockRpcSer::NeedBlockReply(*b), None),
        }
    }

    /// Rebuilds a `BlockRpc` from its header and associated stream.
    ///
    /// The stream is only read for `PutBlock`; for every other variant it is
    /// dropped untouched.
    ///
    /// # Panics
    ///
    /// Panics if the number of bytes received on the stream differs from the
    /// `len` announced in a `PutBlock` header.
    async fn deserialize_msg(self_ser: Self::SerializableSelf, stream: AssociatedStream) -> Self {
        match self_ser {
            BlockRpcSer::Ok => BlockRpc::Ok,
            BlockRpcSer::GetBlock(h) => BlockRpc::GetBlock(h),
            BlockRpcSer::PutBlock {
                hash,
                compressed,
                len,
            } => {
                // TODO: buffering the whole stream here defeats the purpose of
                // streaming; the stream should be kept so that it can be
                // consumed as a stream instead.
                let received = stream.concat().await;
                assert_eq!(len, received.len());
                let data = match compressed {
                    true => DataBlock::Compressed(received),
                    false => DataBlock::Plain(received),
                };
                BlockRpc::PutBlock { hash, data }
            }
            BlockRpcSer::NeedBlockQuery(h) => BlockRpc::NeedBlockQuery(h),
            BlockRpcSer::NeedBlockReply(b) => BlockRpc::NeedBlockReply(b),
        }
    }
}
/// The block manager, handling block exchange between nodes, and block storage on local node
pub struct BlockManager {
/// Replication strategy, allowing to find on which node blocks should be located
@ -694,7 +762,7 @@ impl BlockManager {
}
}
m => {
return Err(Error::unexpected_rpc_message(m));
return Err(Error::unexpected_rpc_message(m.serialize_msg().0));
}
}
}
@ -836,7 +904,7 @@ impl EndpointHandler<BlockRpc> for BlockManager {
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
BlockRpc::GetBlock(h) => self.read_block(h).await,
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
m => Err(Error::unexpected_rpc_message(m)),
m => Err(Error::unexpected_rpc_message(m.serialize_msg().0)),
}
}
}

View file

@ -29,7 +29,7 @@ use crate::repair::Repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
#[derive(Debug, Serialize, Deserialize, Clone)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
@ -49,6 +49,8 @@ impl Rpc for AdminRpc {
type Response = Result<AdminRpc, Error>;
}
impl AutoSerialize for AdminRpc {}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,

View file

@ -138,7 +138,7 @@ pub struct RevertLayoutOpt {
pub(crate) version: Option<u64>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list")]
@ -177,7 +177,7 @@ pub enum BucketOperation {
Website(WebsiteOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
@ -199,13 +199,13 @@ pub struct WebsiteOpt {
pub error_document: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
@ -215,7 +215,7 @@ pub struct DeleteBucketOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct AliasBucketOpt {
/// Existing bucket name (its alias in global namespace or its full hex uuid)
pub existing_bucket: String,
@ -228,7 +228,7 @@ pub struct AliasBucketOpt {
pub local: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct UnaliasBucketOpt {
/// Bucket name
pub name: String,
@ -238,7 +238,7 @@ pub struct UnaliasBucketOpt {
pub local: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
@ -261,7 +261,7 @@ pub struct PermBucketOpt {
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list")]
@ -296,20 +296,20 @@ pub enum KeyOperation {
Import(KeyImportOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct KeyOpt {
/// ID or name of the key
pub key_pattern: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(long = "name", default_value = "Unnamed key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
@ -318,7 +318,7 @@ pub struct KeyRenameOpt {
pub new_name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
@ -328,7 +328,7 @@ pub struct KeyDeleteOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct KeyPermOpt {
/// ID or name of the key
pub key_pattern: String,
@ -338,7 +338,7 @@ pub struct KeyPermOpt {
pub create_bucket: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,

View file

@ -1,20 +1,29 @@
use err_derive::Error;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use garage_util::error::Error as GarageError;
#[derive(Debug, Error, Serialize, Deserialize)]
#[derive(Debug, Error, Serialize, Deserialize, Clone)]
pub enum Error {
#[error(display = "Internal error: {}", _0)]
Internal(#[error(source)] GarageError),
Internal(Arc<GarageError>),
#[error(display = "Bad request: {}", _0)]
BadRequest(String),
}
impl From<GarageError> for Error {
fn from(e: GarageError) -> Error {
Error::Internal(Arc::new(e))
}
}
impl netapp::endpoint::AutoSerialize for Error {}
impl From<netapp::error::Error> for Error {
fn from(e: netapp::error::Error) -> Self {
Error::Internal(GarageError::Netapp(e))
Error::Internal(Arc::new(GarageError::Netapp(e)))
}
}

View file

@ -15,7 +15,9 @@ use opentelemetry::{
Context,
};
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
pub use netapp::endpoint::{
AutoSerialize, Endpoint, EndpointHandler, Message as Rpc, SerializeMessage,
};
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID};
@ -118,7 +120,9 @@ impl RpcHelper {
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
Result<S, Error>: SerializeMessage,
H: EndpointHandler<M>,
S: Send,
{
self.call_arc(endpoint, to, Arc::new(msg), strat).await
}
@ -132,7 +136,9 @@ impl RpcHelper {
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
Result<S, Error>: SerializeMessage,
H: EndpointHandler<M>,
S: Send,
{
let metric_tags = [
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
@ -140,7 +146,9 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
//let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
// placeholder value to make things work while I figure out how to do better
let msg_size = 8192;
let permit = self
.0
.request_buffer_semaphore
@ -151,6 +159,7 @@ impl RpcHelper {
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
let rpc_call = endpoint
.call(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
@ -187,7 +196,9 @@ impl RpcHelper {
) -> Vec<(Uuid, Result<S, Error>)>
where
M: Rpc<Response = Result<S, Error>>,
Result<S, Error>: SerializeMessage,
H: EndpointHandler<M>,
S: Send,
{
let msg = Arc::new(msg);
let resps = join_all(
@ -209,7 +220,9 @@ impl RpcHelper {
) -> Vec<(Uuid, Result<S, Error>)>
where
M: Rpc<Response = Result<S, Error>>,
Result<S, Error>: SerializeMessage,
H: EndpointHandler<M>,
S: Send,
{
let to = self
.0
@ -232,6 +245,7 @@ impl RpcHelper {
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
Result<S, Error>: SerializeMessage,
H: EndpointHandler<M> + 'static,
S: Send + 'static,
{
@ -272,6 +286,7 @@ impl RpcHelper {
) -> Result<Vec<S>, Error>
where
M: Rpc<Response = Result<S, Error>> + 'static,
Result<S, Error>: SerializeMessage,
H: EndpointHandler<M> + 'static,
S: Send + 'static,
{

View file

@ -71,6 +71,8 @@ impl Rpc for SystemRpc {
type Response = Result<SystemRpc, Error>;
}
impl AutoSerialize for SystemRpc {}
/// This node's membership manager
pub struct System {
/// The id of this node

View file

@ -39,7 +39,7 @@ pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'stati
endpoint: Arc<Endpoint<GcRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
enum GcRpc {
Update(Vec<ByteBuf>),
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
@ -50,6 +50,8 @@ impl Rpc for GcRpc {
type Response = Result<GcRpc, Error>;
}
impl AutoSerialize for GcRpc {}
impl<F, R> TableGc<F, R>
where
F: TableSchema + 'static,

View file

@ -52,7 +52,7 @@ pub struct MerkleNodeKey {
pub prefix: Vec<u8>,
}
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone)]
pub enum MerkleNode {
// The empty Merkle node
Empty,

View file

@ -38,7 +38,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
#[derive(Serialize, Deserialize)]
#[derive(Serialize, Deserialize, Clone)]
pub(crate) enum SyncRpc {
RootCkHash(Partition, Hash),
RootCkDifferent(bool),
@ -52,6 +52,8 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
impl AutoSerialize for SyncRpc {}
struct SyncTodo {
todo: Vec<TodoPartition>,
}

View file

@ -58,10 +58,47 @@ pub(crate) enum TableRpc<F: TableSchema> {
Update(Vec<Arc<ByteBuf>>),
}
impl<F: TableSchema> Clone for TableRpc<F>
where
F::P: Clone,
F::S: Clone,
F::Filter: Clone,
{
fn clone(&self) -> Self {
use TableRpc::*;
match self {
Ok => Ok,
ReadEntry(f, s) => ReadEntry(f.clone(), s.clone()),
ReadEntryResponse(b) => ReadEntryResponse(b.clone()),
ReadRange {
partition,
begin_sort_key,
filter,
limit,
enumeration_order,
} => ReadRange {
partition: partition.clone(),
begin_sort_key: begin_sort_key.clone(),
filter: filter.clone(),
limit: *limit,
enumeration_order: enumeration_order.clone(),
},
Update(v) => Update(v.clone()),
}
}
}
/// Declares `TableRpc<F>` as an RPC message: a request is answered with
/// another `TableRpc<F>` on success, or with an `Error`.
impl<F: TableSchema> Rpc for TableRpc<F> {
type Response = Result<TableRpc<F>, Error>;
}
/// Opts `TableRpc<F>` into `AutoSerialize`.
///
/// The bounds are the same as those of the `Clone` implementation for
/// `TableRpc<F>`, since `AutoSerialize` requires the type to be `Clone`.
impl<F: TableSchema> AutoSerialize for TableRpc<F>
where
    F::P: Clone,
    F::S: Clone,
    // Cloning a `ReadRange` message clones its filter, so this bound is
    // needed for `TableRpc<F>: Clone` to hold.
    F::Filter: Clone,
{
}
impl<F, R> Table<F, R>
where
F: TableSchema + 'static,

View file

@ -14,6 +14,7 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
async-trait = "0.1.7"
blake2 = "0.9"
err-derive = "0.3"
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }

View file

@ -4,7 +4,11 @@ use std::io;
use err_derive::Error;
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
use serde::de::Visitor;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use netapp::endpoint::SerializeMessage;
use netapp::util::AssociatedStream;
use crate::data::*;
@ -162,12 +166,26 @@ impl<T> OkOrMessage for Option<T> {
// Upon deserialization, they all become a RemoteError with the
// given representation.
// can't use AutoSerialize because it requires Clone
// Errors travel over the network as their display string only, without any
// associated stream.
#[async_trait::async_trait]
impl SerializeMessage for Error {
    type SerializableSelf = String;

    /// Serializes the error as its `Display` representation, with no stream.
    fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) {
        (self.to_string(), None)
    }

    /// Rebuilds the error as a `RemoteError` holding the received string.
    ///
    /// The stream is ignored, hence the leading underscore on its name.
    async fn deserialize_msg(ser_self: Self::SerializableSelf, _stream: AssociatedStream) -> Self {
        // TODO: verify that the peer did not attach a stream to the error.
        Error::RemoteError(ser_self)
    }
}
impl Serialize for Error {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&format!("{}", self))
serializer.serialize_str(&self.to_string())
}
}