forked from Deuxfleurs/garage
wip: use netapp streaming #1
17 changed files with 216 additions and 51 deletions
5
Cargo.lock
generated
5
Cargo.lock
generated
|
@ -939,6 +939,7 @@ dependencies = [
|
||||||
"garage_table 0.7.0",
|
"garage_table 0.7.0",
|
||||||
"garage_util 0.7.0",
|
"garage_util 0.7.0",
|
||||||
"hex",
|
"hex",
|
||||||
|
"netapp 0.4.4",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"rand 0.8.5",
|
"rand 0.8.5",
|
||||||
"rmp-serde 0.15.5",
|
"rmp-serde 0.15.5",
|
||||||
|
@ -1136,6 +1137,7 @@ dependencies = [
|
||||||
name = "garage_util"
|
name = "garage_util"
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"async-trait",
|
||||||
"blake2",
|
"blake2",
|
||||||
"chrono",
|
"chrono",
|
||||||
"err-derive 0.3.1",
|
"err-derive 0.3.1",
|
||||||
|
@ -1851,8 +1853,6 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "netapp"
|
name = "netapp"
|
||||||
version = "0.4.4"
|
version = "0.4.4"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
|
||||||
checksum = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee"
|
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"arc-swap",
|
"arc-swap",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
|
@ -1866,6 +1866,7 @@ dependencies = [
|
||||||
"log",
|
"log",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
"opentelemetry-contrib",
|
"opentelemetry-contrib",
|
||||||
|
"pin-project 1.0.10",
|
||||||
"rand 0.5.6",
|
"rand 0.5.6",
|
||||||
"rmp-serde 0.14.4",
|
"rmp-serde 0.14.4",
|
||||||
"serde",
|
"serde",
|
||||||
|
|
|
@ -16,3 +16,6 @@ lto = "off"
|
||||||
|
|
||||||
[profile.release]
|
[profile.release]
|
||||||
debug = true
|
debug = true
|
||||||
|
|
||||||
|
[patch.crates-io]
|
||||||
|
netapp = { path = "../netapp" }
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use err_derive::Error;
|
use err_derive::Error;
|
||||||
use hyper::header::HeaderValue;
|
use hyper::header::HeaderValue;
|
||||||
|
@ -15,7 +16,7 @@ pub enum Error {
|
||||||
// Category: internal error
|
// Category: internal error
|
||||||
/// Error related to deeper parts of Garage
|
/// Error related to deeper parts of Garage
|
||||||
#[error(display = "Internal error: {}", _0)]
|
#[error(display = "Internal error: {}", _0)]
|
||||||
InternalError(#[error(source)] GarageError),
|
InternalError(Arc<GarageError>),
|
||||||
|
|
||||||
/// Error related to Hyper
|
/// Error related to Hyper
|
||||||
#[error(display = "Internal error (Hyper error): {}", _0)]
|
#[error(display = "Internal error (Hyper error): {}", _0)]
|
||||||
|
@ -109,6 +110,12 @@ pub enum Error {
|
||||||
NotImplemented(String),
|
NotImplemented(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<GarageError> for Error {
|
||||||
|
fn from(err: GarageError) -> Self {
|
||||||
|
Self::InternalError(Arc::new(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl From<roxmltree::Error> for Error {
|
impl From<roxmltree::Error> for Error {
|
||||||
fn from(err: roxmltree::Error) -> Self {
|
fn from(err: roxmltree::Error) -> Self {
|
||||||
Self::InvalidXml(format!("{}", err))
|
Self::InvalidXml(format!("{}", err))
|
||||||
|
@ -145,14 +152,13 @@ impl Error {
|
||||||
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
|
Error::PreconditionFailed => StatusCode::PRECONDITION_FAILED,
|
||||||
Error::Forbidden(_) => StatusCode::FORBIDDEN,
|
Error::Forbidden(_) => StatusCode::FORBIDDEN,
|
||||||
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
|
Error::NotAcceptable(_) => StatusCode::NOT_ACCEPTABLE,
|
||||||
Error::InternalError(
|
Error::InternalError(e) => match **e {
|
||||||
GarageError::Timeout
|
GarageError::Timeout
|
||||||
| GarageError::RemoteError(_)
|
| GarageError::RemoteError(_)
|
||||||
| GarageError::Quorum(_, _, _, _),
|
| GarageError::Quorum(_, _, _, _) => StatusCode::SERVICE_UNAVAILABLE,
|
||||||
) => StatusCode::SERVICE_UNAVAILABLE,
|
_ => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => {
|
},
|
||||||
StatusCode::INTERNAL_SERVER_ERROR
|
Error::Hyper(_) | Error::Http(_) => StatusCode::INTERNAL_SERVER_ERROR,
|
||||||
}
|
|
||||||
Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
|
Error::InvalidRange(_) => StatusCode::RANGE_NOT_SATISFIABLE,
|
||||||
Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
|
Error::NotImplemented(_) => StatusCode::NOT_IMPLEMENTED,
|
||||||
_ => StatusCode::BAD_REQUEST,
|
_ => StatusCode::BAD_REQUEST,
|
||||||
|
@ -173,12 +179,13 @@ impl Error {
|
||||||
Error::Forbidden(_) => "AccessDenied",
|
Error::Forbidden(_) => "AccessDenied",
|
||||||
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
|
Error::AuthorizationHeaderMalformed(_) => "AuthorizationHeaderMalformed",
|
||||||
Error::NotImplemented(_) => "NotImplemented",
|
Error::NotImplemented(_) => "NotImplemented",
|
||||||
Error::InternalError(
|
Error::InternalError(e) => match **e {
|
||||||
GarageError::Timeout
|
GarageError::Timeout
|
||||||
| GarageError::RemoteError(_)
|
| GarageError::RemoteError(_)
|
||||||
| GarageError::Quorum(_, _, _, _),
|
| GarageError::Quorum(_, _, _, _) => "ServiceUnavailable",
|
||||||
) => "ServiceUnavailable",
|
_ => "InternalError",
|
||||||
Error::InternalError(_) | Error::Hyper(_) | Error::Http(_) => "InternalError",
|
},
|
||||||
|
Error::Hyper(_) | Error::Http(_) => "InternalError",
|
||||||
_ => "InvalidRequest",
|
_ => "InvalidRequest",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -262,10 +269,8 @@ where
|
||||||
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Ok(x) => Ok(x),
|
Ok(x) => Ok(x),
|
||||||
Err(e) => Err(Error::InternalError(GarageError::Message(format!(
|
Err(e) => Err(Error::InternalError(Arc::new(GarageError::Message(
|
||||||
"{}: {}",
|
format!("{}: {}", reason.as_ref(), e),
|
||||||
reason.as_ref(),
|
|
||||||
e
|
|
||||||
)))),
|
)))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -276,9 +281,9 @@ impl<T> OkOrInternalError for Option<T> {
|
||||||
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
fn ok_or_internal_error<M: AsRef<str>>(self, reason: M) -> Result<T, Error> {
|
||||||
match self {
|
match self {
|
||||||
Some(x) => Ok(x),
|
Some(x) => Ok(x),
|
||||||
None => Err(Error::InternalError(GarageError::Message(
|
None => Err(Error::InternalError(Arc::new(GarageError::Message(
|
||||||
reason.as_ref().to_string(),
|
reason.as_ref().to_string(),
|
||||||
))),
|
)))),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,6 +17,7 @@ path = "lib.rs"
|
||||||
garage_rpc = { version = "0.7.0", path = "../rpc" }
|
garage_rpc = { version = "0.7.0", path = "../rpc" }
|
||||||
garage_util = { version = "0.7.0", path = "../util" }
|
garage_util = { version = "0.7.0", path = "../util" }
|
||||||
garage_table = { version = "0.7.0", path = "../table" }
|
garage_table = { version = "0.7.0", path = "../table" }
|
||||||
|
netapp = "0.4"
|
||||||
|
|
||||||
opentelemetry = "0.17"
|
opentelemetry = "0.17"
|
||||||
|
|
||||||
|
|
|
@ -1,16 +1,15 @@
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
|
||||||
/// A possibly compressed block of data
|
/// A possibly compressed block of data
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug)]
|
||||||
pub enum DataBlock {
|
pub enum DataBlock {
|
||||||
/// Uncompressed data
|
/// Uncompressed data
|
||||||
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
|
Plain(Vec<u8>),
|
||||||
/// Data compressed with zstd
|
/// Data compressed with zstd
|
||||||
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
|
Compressed(Vec<u8>),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DataBlock {
|
impl DataBlock {
|
||||||
|
|
|
@ -6,8 +6,12 @@ use std::time::Duration;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use netapp::endpoint::SerializeMessage;
|
||||||
|
use netapp::util::AssociatedStream;
|
||||||
|
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
use futures::select;
|
use futures::select;
|
||||||
|
use futures::StreamExt;
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::sync::{watch, Mutex, Notify};
|
use tokio::sync::{watch, Mutex, Notify};
|
||||||
|
@ -56,7 +60,7 @@ const RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER: u64 = 6;
|
||||||
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
||||||
|
|
||||||
/// RPC messages used to share blocks of data between nodes
|
/// RPC messages used to share blocks of data between nodes
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug)]
|
||||||
pub enum BlockRpc {
|
pub enum BlockRpc {
|
||||||
Ok,
|
Ok,
|
||||||
/// Message to ask for a block of data, by hash
|
/// Message to ask for a block of data, by hash
|
||||||
|
@ -73,10 +77,74 @@ pub enum BlockRpc {
|
||||||
NeedBlockReply(bool),
|
NeedBlockReply(bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub enum BlockRpcSer {
|
||||||
|
Ok,
|
||||||
|
GetBlock(Hash),
|
||||||
|
PutBlock {
|
||||||
|
hash: Hash,
|
||||||
|
compressed: bool,
|
||||||
|
len: usize,
|
||||||
|
},
|
||||||
|
NeedBlockQuery(Hash),
|
||||||
|
NeedBlockReply(bool),
|
||||||
|
}
|
||||||
|
|
||||||
impl Rpc for BlockRpc {
|
impl Rpc for BlockRpc {
|
||||||
type Response = Result<BlockRpc, Error>;
|
type Response = Result<BlockRpc, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl SerializeMessage for BlockRpc {
|
||||||
|
type SerializableSelf = BlockRpcSer;
|
||||||
|
|
||||||
|
fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) {
|
||||||
|
let ser_self = match self {
|
||||||
|
BlockRpc::Ok => BlockRpcSer::Ok,
|
||||||
|
BlockRpc::GetBlock(h) => BlockRpcSer::GetBlock(*h),
|
||||||
|
BlockRpc::PutBlock { hash, data } => {
|
||||||
|
let buf = data.inner_buffer().to_vec();
|
||||||
|
let ser_self = BlockRpcSer::PutBlock {
|
||||||
|
hash: *hash,
|
||||||
|
compressed: data.is_compressed(),
|
||||||
|
len: buf.len(),
|
||||||
|
};
|
||||||
|
let stream = Box::pin(futures::stream::once(async move { buf }));
|
||||||
|
return (ser_self, Some(stream));
|
||||||
|
}
|
||||||
|
BlockRpc::NeedBlockQuery(h) => BlockRpcSer::NeedBlockQuery(*h),
|
||||||
|
BlockRpc::NeedBlockReply(b) => BlockRpcSer::NeedBlockReply(*b),
|
||||||
|
};
|
||||||
|
(ser_self, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deserialize_msg(self_ser: Self::SerializableSelf, stream: AssociatedStream) -> Self {
|
||||||
|
let de_self = match self_ser {
|
||||||
|
BlockRpcSer::Ok => BlockRpc::Ok,
|
||||||
|
BlockRpcSer::GetBlock(h) => BlockRpc::GetBlock(h),
|
||||||
|
BlockRpcSer::PutBlock {
|
||||||
|
hash,
|
||||||
|
compressed,
|
||||||
|
len,
|
||||||
|
} => {
|
||||||
|
// TODO this is very useless. It should store the stream so it can be accessed as a
|
||||||
|
// stream instead
|
||||||
|
let raw_data = stream.concat().await;
|
||||||
|
assert_eq!(len, raw_data.len());
|
||||||
|
let data = if compressed {
|
||||||
|
DataBlock::Compressed(raw_data)
|
||||||
|
} else {
|
||||||
|
DataBlock::Plain(raw_data)
|
||||||
|
};
|
||||||
|
BlockRpc::PutBlock { hash, data }
|
||||||
|
}
|
||||||
|
BlockRpcSer::NeedBlockQuery(h) => BlockRpc::NeedBlockQuery(h),
|
||||||
|
BlockRpcSer::NeedBlockReply(b) => BlockRpc::NeedBlockReply(b),
|
||||||
|
};
|
||||||
|
de_self
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// The block manager, handling block exchange between nodes, and block storage on local node
|
/// The block manager, handling block exchange between nodes, and block storage on local node
|
||||||
pub struct BlockManager {
|
pub struct BlockManager {
|
||||||
/// Replication strategy, allowing to find on which node blocks should be located
|
/// Replication strategy, allowing to find on which node blocks should be located
|
||||||
|
@ -694,7 +762,7 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
m => {
|
m => {
|
||||||
return Err(Error::unexpected_rpc_message(m));
|
return Err(Error::unexpected_rpc_message(m.serialize_msg().0));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -836,7 +904,7 @@ impl EndpointHandler<BlockRpc> for BlockManager {
|
||||||
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
|
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
|
||||||
BlockRpc::GetBlock(h) => self.read_block(h).await,
|
BlockRpc::GetBlock(h) => self.read_block(h).await,
|
||||||
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
|
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
|
||||||
m => Err(Error::unexpected_rpc_message(m)),
|
m => Err(Error::unexpected_rpc_message(m.serialize_msg().0)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -29,7 +29,7 @@ use crate::repair::Repair;
|
||||||
|
|
||||||
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
|
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize, Clone)]
|
||||||
pub enum AdminRpc {
|
pub enum AdminRpc {
|
||||||
BucketOperation(BucketOperation),
|
BucketOperation(BucketOperation),
|
||||||
KeyOperation(KeyOperation),
|
KeyOperation(KeyOperation),
|
||||||
|
@ -49,6 +49,8 @@ impl Rpc for AdminRpc {
|
||||||
type Response = Result<AdminRpc, Error>;
|
type Response = Result<AdminRpc, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AutoSerialize for AdminRpc {}
|
||||||
|
|
||||||
pub struct AdminRpcHandler {
|
pub struct AdminRpcHandler {
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
endpoint: Arc<Endpoint<AdminRpc, Self>>,
|
endpoint: Arc<Endpoint<AdminRpc, Self>>,
|
||||||
|
|
|
@ -138,7 +138,7 @@ pub struct RevertLayoutOpt {
|
||||||
pub(crate) version: Option<u64>,
|
pub(crate) version: Option<u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub enum BucketOperation {
|
pub enum BucketOperation {
|
||||||
/// List buckets
|
/// List buckets
|
||||||
#[structopt(name = "list")]
|
#[structopt(name = "list")]
|
||||||
|
@ -177,7 +177,7 @@ pub enum BucketOperation {
|
||||||
Website(WebsiteOpt),
|
Website(WebsiteOpt),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct WebsiteOpt {
|
pub struct WebsiteOpt {
|
||||||
/// Create
|
/// Create
|
||||||
#[structopt(long = "allow")]
|
#[structopt(long = "allow")]
|
||||||
|
@ -199,13 +199,13 @@ pub struct WebsiteOpt {
|
||||||
pub error_document: Option<String>,
|
pub error_document: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct BucketOpt {
|
pub struct BucketOpt {
|
||||||
/// Bucket name
|
/// Bucket name
|
||||||
pub name: String,
|
pub name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct DeleteBucketOpt {
|
pub struct DeleteBucketOpt {
|
||||||
/// Bucket name
|
/// Bucket name
|
||||||
pub name: String,
|
pub name: String,
|
||||||
|
@ -215,7 +215,7 @@ pub struct DeleteBucketOpt {
|
||||||
pub yes: bool,
|
pub yes: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct AliasBucketOpt {
|
pub struct AliasBucketOpt {
|
||||||
/// Existing bucket name (its alias in global namespace or its full hex uuid)
|
/// Existing bucket name (its alias in global namespace or its full hex uuid)
|
||||||
pub existing_bucket: String,
|
pub existing_bucket: String,
|
||||||
|
@ -228,7 +228,7 @@ pub struct AliasBucketOpt {
|
||||||
pub local: Option<String>,
|
pub local: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct UnaliasBucketOpt {
|
pub struct UnaliasBucketOpt {
|
||||||
/// Bucket name
|
/// Bucket name
|
||||||
pub name: String,
|
pub name: String,
|
||||||
|
@ -238,7 +238,7 @@ pub struct UnaliasBucketOpt {
|
||||||
pub local: Option<String>,
|
pub local: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct PermBucketOpt {
|
pub struct PermBucketOpt {
|
||||||
/// Access key name or ID
|
/// Access key name or ID
|
||||||
#[structopt(long = "key")]
|
#[structopt(long = "key")]
|
||||||
|
@ -261,7 +261,7 @@ pub struct PermBucketOpt {
|
||||||
pub bucket: String,
|
pub bucket: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub enum KeyOperation {
|
pub enum KeyOperation {
|
||||||
/// List keys
|
/// List keys
|
||||||
#[structopt(name = "list")]
|
#[structopt(name = "list")]
|
||||||
|
@ -296,20 +296,20 @@ pub enum KeyOperation {
|
||||||
Import(KeyImportOpt),
|
Import(KeyImportOpt),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct KeyOpt {
|
pub struct KeyOpt {
|
||||||
/// ID or name of the key
|
/// ID or name of the key
|
||||||
pub key_pattern: String,
|
pub key_pattern: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct KeyNewOpt {
|
pub struct KeyNewOpt {
|
||||||
/// Name of the key
|
/// Name of the key
|
||||||
#[structopt(long = "name", default_value = "Unnamed key")]
|
#[structopt(long = "name", default_value = "Unnamed key")]
|
||||||
pub name: String,
|
pub name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct KeyRenameOpt {
|
pub struct KeyRenameOpt {
|
||||||
/// ID or name of the key
|
/// ID or name of the key
|
||||||
pub key_pattern: String,
|
pub key_pattern: String,
|
||||||
|
@ -318,7 +318,7 @@ pub struct KeyRenameOpt {
|
||||||
pub new_name: String,
|
pub new_name: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct KeyDeleteOpt {
|
pub struct KeyDeleteOpt {
|
||||||
/// ID or name of the key
|
/// ID or name of the key
|
||||||
pub key_pattern: String,
|
pub key_pattern: String,
|
||||||
|
@ -328,7 +328,7 @@ pub struct KeyDeleteOpt {
|
||||||
pub yes: bool,
|
pub yes: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct KeyPermOpt {
|
pub struct KeyPermOpt {
|
||||||
/// ID or name of the key
|
/// ID or name of the key
|
||||||
pub key_pattern: String,
|
pub key_pattern: String,
|
||||||
|
@ -338,7 +338,7 @@ pub struct KeyPermOpt {
|
||||||
pub create_bucket: bool,
|
pub create_bucket: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, StructOpt, Debug)]
|
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
|
||||||
pub struct KeyImportOpt {
|
pub struct KeyImportOpt {
|
||||||
/// Access key ID
|
/// Access key ID
|
||||||
pub key_id: String,
|
pub key_id: String,
|
||||||
|
|
|
@ -1,20 +1,29 @@
|
||||||
use err_derive::Error;
|
use err_derive::Error;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
|
|
||||||
#[derive(Debug, Error, Serialize, Deserialize)]
|
#[derive(Debug, Error, Serialize, Deserialize, Clone)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
#[error(display = "Internal error: {}", _0)]
|
#[error(display = "Internal error: {}", _0)]
|
||||||
Internal(#[error(source)] GarageError),
|
Internal(Arc<GarageError>),
|
||||||
|
|
||||||
#[error(display = "Bad request: {}", _0)]
|
#[error(display = "Bad request: {}", _0)]
|
||||||
BadRequest(String),
|
BadRequest(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl From<GarageError> for Error {
|
||||||
|
fn from(e: GarageError) -> Error {
|
||||||
|
Error::Internal(Arc::new(e))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl netapp::endpoint::AutoSerialize for Error {}
|
||||||
|
|
||||||
impl From<netapp::error::Error> for Error {
|
impl From<netapp::error::Error> for Error {
|
||||||
fn from(e: netapp::error::Error) -> Self {
|
fn from(e: netapp::error::Error) -> Self {
|
||||||
Error::Internal(GarageError::Netapp(e))
|
Error::Internal(Arc::new(GarageError::Netapp(e)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -15,7 +15,9 @@ use opentelemetry::{
|
||||||
Context,
|
Context,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc};
|
pub use netapp::endpoint::{
|
||||||
|
AutoSerialize, Endpoint, EndpointHandler, Message as Rpc, SerializeMessage,
|
||||||
|
};
|
||||||
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
|
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
|
||||||
pub use netapp::proto::*;
|
pub use netapp::proto::*;
|
||||||
pub use netapp::{NetApp, NodeID};
|
pub use netapp::{NetApp, NodeID};
|
||||||
|
@ -118,7 +120,9 @@ impl RpcHelper {
|
||||||
) -> Result<S, Error>
|
) -> Result<S, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
Result<S, Error>: SerializeMessage,
|
||||||
H: EndpointHandler<M>,
|
H: EndpointHandler<M>,
|
||||||
|
S: Send,
|
||||||
{
|
{
|
||||||
self.call_arc(endpoint, to, Arc::new(msg), strat).await
|
self.call_arc(endpoint, to, Arc::new(msg), strat).await
|
||||||
}
|
}
|
||||||
|
@ -132,7 +136,9 @@ impl RpcHelper {
|
||||||
) -> Result<S, Error>
|
) -> Result<S, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
Result<S, Error>: SerializeMessage,
|
||||||
H: EndpointHandler<M>,
|
H: EndpointHandler<M>,
|
||||||
|
S: Send,
|
||||||
{
|
{
|
||||||
let metric_tags = [
|
let metric_tags = [
|
||||||
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
|
KeyValue::new("rpc_endpoint", endpoint.path().to_string()),
|
||||||
|
@ -140,7 +146,9 @@ impl RpcHelper {
|
||||||
KeyValue::new("to", format!("{:?}", to)),
|
KeyValue::new("to", format!("{:?}", to)),
|
||||||
];
|
];
|
||||||
|
|
||||||
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
|
//let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
|
||||||
|
// some value to make things works while I figure out how to do better
|
||||||
|
let msg_size = 8192;
|
||||||
let permit = self
|
let permit = self
|
||||||
.0
|
.0
|
||||||
.request_buffer_semaphore
|
.request_buffer_semaphore
|
||||||
|
@ -151,6 +159,7 @@ impl RpcHelper {
|
||||||
self.0.metrics.rpc_counter.add(1, &metric_tags);
|
self.0.metrics.rpc_counter.add(1, &metric_tags);
|
||||||
|
|
||||||
let node_id = to.into();
|
let node_id = to.into();
|
||||||
|
|
||||||
let rpc_call = endpoint
|
let rpc_call = endpoint
|
||||||
.call(&node_id, msg, strat.rs_priority)
|
.call(&node_id, msg, strat.rs_priority)
|
||||||
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
|
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
|
||||||
|
@ -187,7 +196,9 @@ impl RpcHelper {
|
||||||
) -> Vec<(Uuid, Result<S, Error>)>
|
) -> Vec<(Uuid, Result<S, Error>)>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
Result<S, Error>: SerializeMessage,
|
||||||
H: EndpointHandler<M>,
|
H: EndpointHandler<M>,
|
||||||
|
S: Send,
|
||||||
{
|
{
|
||||||
let msg = Arc::new(msg);
|
let msg = Arc::new(msg);
|
||||||
let resps = join_all(
|
let resps = join_all(
|
||||||
|
@ -209,7 +220,9 @@ impl RpcHelper {
|
||||||
) -> Vec<(Uuid, Result<S, Error>)>
|
) -> Vec<(Uuid, Result<S, Error>)>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
Result<S, Error>: SerializeMessage,
|
||||||
H: EndpointHandler<M>,
|
H: EndpointHandler<M>,
|
||||||
|
S: Send,
|
||||||
{
|
{
|
||||||
let to = self
|
let to = self
|
||||||
.0
|
.0
|
||||||
|
@ -232,6 +245,7 @@ impl RpcHelper {
|
||||||
) -> Result<Vec<S>, Error>
|
) -> Result<Vec<S>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>> + 'static,
|
M: Rpc<Response = Result<S, Error>> + 'static,
|
||||||
|
Result<S, Error>: SerializeMessage,
|
||||||
H: EndpointHandler<M> + 'static,
|
H: EndpointHandler<M> + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
{
|
{
|
||||||
|
@ -272,6 +286,7 @@ impl RpcHelper {
|
||||||
) -> Result<Vec<S>, Error>
|
) -> Result<Vec<S>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>> + 'static,
|
M: Rpc<Response = Result<S, Error>> + 'static,
|
||||||
|
Result<S, Error>: SerializeMessage,
|
||||||
H: EndpointHandler<M> + 'static,
|
H: EndpointHandler<M> + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
{
|
{
|
||||||
|
|
|
@ -71,6 +71,8 @@ impl Rpc for SystemRpc {
|
||||||
type Response = Result<SystemRpc, Error>;
|
type Response = Result<SystemRpc, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AutoSerialize for SystemRpc {}
|
||||||
|
|
||||||
/// This node's membership manager
|
/// This node's membership manager
|
||||||
pub struct System {
|
pub struct System {
|
||||||
/// The id of this node
|
/// The id of this node
|
||||||
|
|
|
@ -39,7 +39,7 @@ pub(crate) struct TableGc<F: TableSchema + 'static, R: TableReplication + 'stati
|
||||||
endpoint: Arc<Endpoint<GcRpc, Self>>,
|
endpoint: Arc<Endpoint<GcRpc, Self>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
enum GcRpc {
|
enum GcRpc {
|
||||||
Update(Vec<ByteBuf>),
|
Update(Vec<ByteBuf>),
|
||||||
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
|
DeleteIfEqualHash(Vec<(ByteBuf, Hash)>),
|
||||||
|
@ -50,6 +50,8 @@ impl Rpc for GcRpc {
|
||||||
type Response = Result<GcRpc, Error>;
|
type Response = Result<GcRpc, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AutoSerialize for GcRpc {}
|
||||||
|
|
||||||
impl<F, R> TableGc<F, R>
|
impl<F, R> TableGc<F, R>
|
||||||
where
|
where
|
||||||
F: TableSchema + 'static,
|
F: TableSchema + 'static,
|
||||||
|
|
|
@ -52,7 +52,7 @@ pub struct MerkleNodeKey {
|
||||||
pub prefix: Vec<u8>,
|
pub prefix: Vec<u8>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
|
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize, Clone)]
|
||||||
pub enum MerkleNode {
|
pub enum MerkleNode {
|
||||||
// The empty Merkle node
|
// The empty Merkle node
|
||||||
Empty,
|
Empty,
|
||||||
|
|
|
@ -38,7 +38,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
|
||||||
endpoint: Arc<Endpoint<SyncRpc, Self>>,
|
endpoint: Arc<Endpoint<SyncRpc, Self>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
pub(crate) enum SyncRpc {
|
pub(crate) enum SyncRpc {
|
||||||
RootCkHash(Partition, Hash),
|
RootCkHash(Partition, Hash),
|
||||||
RootCkDifferent(bool),
|
RootCkDifferent(bool),
|
||||||
|
@ -52,6 +52,8 @@ impl Rpc for SyncRpc {
|
||||||
type Response = Result<SyncRpc, Error>;
|
type Response = Result<SyncRpc, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl AutoSerialize for SyncRpc {}
|
||||||
|
|
||||||
struct SyncTodo {
|
struct SyncTodo {
|
||||||
todo: Vec<TodoPartition>,
|
todo: Vec<TodoPartition>,
|
||||||
}
|
}
|
||||||
|
|
|
@ -58,10 +58,47 @@ pub(crate) enum TableRpc<F: TableSchema> {
|
||||||
Update(Vec<Arc<ByteBuf>>),
|
Update(Vec<Arc<ByteBuf>>),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<F: TableSchema> Clone for TableRpc<F>
|
||||||
|
where
|
||||||
|
F::P: Clone,
|
||||||
|
F::S: Clone,
|
||||||
|
F::Filter: Clone,
|
||||||
|
{
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
use TableRpc::*;
|
||||||
|
match self {
|
||||||
|
Ok => Ok,
|
||||||
|
ReadEntry(f, s) => ReadEntry(f.clone(), s.clone()),
|
||||||
|
ReadEntryResponse(b) => ReadEntryResponse(b.clone()),
|
||||||
|
ReadRange {
|
||||||
|
partition,
|
||||||
|
begin_sort_key,
|
||||||
|
filter,
|
||||||
|
limit,
|
||||||
|
enumeration_order,
|
||||||
|
} => ReadRange {
|
||||||
|
partition: partition.clone(),
|
||||||
|
begin_sort_key: begin_sort_key.clone(),
|
||||||
|
filter: filter.clone(),
|
||||||
|
limit: *limit,
|
||||||
|
enumeration_order: enumeration_order.clone(),
|
||||||
|
},
|
||||||
|
Update(v) => Update(v.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl<F: TableSchema> Rpc for TableRpc<F> {
|
impl<F: TableSchema> Rpc for TableRpc<F> {
|
||||||
type Response = Result<TableRpc<F>, Error>;
|
type Response = Result<TableRpc<F>, Error>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl<F: TableSchema> AutoSerialize for TableRpc<F>
|
||||||
|
where
|
||||||
|
F::P: Clone,
|
||||||
|
F::S: Clone,
|
||||||
|
{
|
||||||
|
}
|
||||||
|
|
||||||
impl<F, R> Table<F, R>
|
impl<F, R> Table<F, R>
|
||||||
where
|
where
|
||||||
F: TableSchema + 'static,
|
F: TableSchema + 'static,
|
||||||
|
|
|
@ -14,6 +14,7 @@ path = "lib.rs"
|
||||||
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
async-trait = "0.1.7"
|
||||||
blake2 = "0.9"
|
blake2 = "0.9"
|
||||||
err-derive = "0.3"
|
err-derive = "0.3"
|
||||||
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
|
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
|
||||||
|
|
|
@ -4,7 +4,11 @@ use std::io;
|
||||||
|
|
||||||
use err_derive::Error;
|
use err_derive::Error;
|
||||||
|
|
||||||
use serde::{de::Visitor, Deserialize, Deserializer, Serialize, Serializer};
|
use serde::de::Visitor;
|
||||||
|
use serde::{Deserialize, Deserializer, Serialize, Serializer};
|
||||||
|
|
||||||
|
use netapp::endpoint::SerializeMessage;
|
||||||
|
use netapp::util::AssociatedStream;
|
||||||
|
|
||||||
use crate::data::*;
|
use crate::data::*;
|
||||||
|
|
||||||
|
@ -162,12 +166,26 @@ impl<T> OkOrMessage for Option<T> {
|
||||||
// Upon deserialization, they all become a RemoteError with the
|
// Upon deserialization, they all become a RemoteError with the
|
||||||
// given representation.
|
// given representation.
|
||||||
|
|
||||||
|
// can't use AutoSerialize because it requires Clone
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl SerializeMessage for Error {
|
||||||
|
type SerializableSelf = String;
|
||||||
|
fn serialize_msg(&self) -> (Self::SerializableSelf, Option<AssociatedStream>) {
|
||||||
|
(self.to_string(), None)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deserialize_msg(ser_self: Self::SerializableSelf, stream: AssociatedStream) -> Self {
|
||||||
|
// TODO verify no stream
|
||||||
|
Error::RemoteError(ser_self)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
impl Serialize for Error {
|
impl Serialize for Error {
|
||||||
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
|
||||||
where
|
where
|
||||||
S: Serializer,
|
S: Serializer,
|
||||||
{
|
{
|
||||||
serializer.serialize_str(&format!("{}", self))
|
serializer.serialize_str(&self.to_string())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue