RPC performance changes #387

Merged
lx merged 3 commits from configurable-timeouts into main 2022-09-20 14:17:23 +00:00
15 changed files with 115 additions and 142 deletions

4
Cargo.lock generated
View file

@ -2034,9 +2034,9 @@ dependencies = [
[[package]]
name = "netapp"
version = "0.5.1"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06c7cbf05d7cd6e4bc51340934d60c798010bf1856769cf8508caaac1db23db6"
checksum = "4ffe47ac46d3b2ce2f736a70865492df082e042eb2bfdddfca3b8dd66bd9469d"
dependencies = [
"arc-swap",
"async-trait",

View file

@ -780,7 +780,7 @@ in
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; };
dependencies = {
${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
};
});
@ -1428,7 +1428,7 @@ in
garage_web = rustPackages."unknown".garage_web."0.8.0" { inherit profileName; };
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
opentelemetry_otlp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-otlp."0.10.0" { inherit profileName; };
opentelemetry_prometheus = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-prometheus."0.10.0" { inherit profileName; };
@ -1600,7 +1600,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_table" else null } = rustPackages."unknown".garage_table."0.8.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_util" else null } = rustPackages."unknown".garage_util."0.8.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@ -1638,7 +1638,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "k8s_openapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".k8s-openapi."0.13.1" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "kube" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kube."0.62.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "openssl" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl."0.10.38" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "pnet_datalink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; };
@ -1702,7 +1702,7 @@ in
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "http" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hyper" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "lazy_static" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
@ -2823,11 +2823,11 @@ in
};
});
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" = overridableMkRustCrate (profileName: rec {
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" = overridableMkRustCrate (profileName: rec {
name = "netapp";
version = "0.5.1";
version = "0.5.2";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "06c7cbf05d7cd6e4bc51340934d60c798010bf1856769cf8508caaac1db23db6"; };
src = fetchCratesIo { inherit name version; sha256 = "4ffe47ac46d3b2ce2f736a70865492df082e042eb2bfdddfca3b8dd66bd9469d"; };
features = builtins.concatLists [
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default")
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry")
@ -3888,7 +3888,7 @@ in
];
dependencies = {
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; };
untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; };
${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; };
@ -4498,7 +4498,7 @@ in
];
dependencies = {
bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; };
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; };
static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; };
@ -5602,7 +5602,7 @@ in
${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
};
});

View file

@ -41,9 +41,6 @@ use crate::resync::*;
/// Size under which data will be stored inlined in database instead of as files
pub const INLINE_THRESHOLD: usize = 3072;
// Timeout for RPCs that read and write blocks to remote nodes
pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60);
// The delay between the moment when the reference counter
// drops to zero, and the moment where we allow ourselves
// to delete the block locally.
@ -183,7 +180,7 @@ impl BlockManager {
};
return Ok((header, stream));
}
_ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
@ -235,7 +232,7 @@ impl BlockManager {
}
}
}
_ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
debug!("Node {:?} didn't return block in time, trying next.", node);
}
};
@ -300,8 +297,7 @@ impl BlockManager {
&who[..],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
.with_quorum(self.replication.write_quorum()),
)
.await?;
@ -336,7 +332,10 @@ impl BlockManager {
// we will fecth it from someone.
let this = self.clone();
tokio::spawn(async move {
if let Err(e) = this.resync.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
if let Err(e) = this
.resync
.put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
{
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
}
});
@ -444,7 +443,8 @@ impl BlockManager {
Ok(c) => c,
Err(e) => {
// Not found but maybe we should have had it ??
self.resync.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
self.resync
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
return Err(Into::into(e));
}
};

View file

@ -33,14 +33,6 @@ use garage_table::replication::TableReplication;
use crate::manager::*;
// Timeout for RPCs that ask other nodes whether they need a copy
// of a given block before we delete it locally
// The timeout here is relatively low because we don't want to block
// the entire resync loop when some nodes are not responding.
// Nothing will be deleted if the nodes don't answer the queries,
// we will just retry later.
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
// The delay between the time where a resync operation fails
// and the time when it is retried, with exponential backoff
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
@ -346,8 +338,7 @@ impl BlockResyncManager {
&manager.endpoint,
&who,
BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@ -394,8 +385,7 @@ impl BlockResyncManager {
&need_nodes[..],
put_block_message,
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(need_nodes.len())
.with_timeout(BLOCK_RW_TIMEOUT),
.with_quorum(need_nodes.len()),
)
.await
.err_context("PutBlock RPC")?;

View file

@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
u64::from_be_bytes(tmp)
}
#[derive(PartialEq, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
pub struct CausalContext {
pub vector_clock: BTreeMap<K2VNodeId, u64>,
}

View file

@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
pub const VALUES: &str = "values";
pub const BYTES: &str = "bytes";
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub struct K2VItem {
pub partition: K2VItemPartition,
pub sort_key: String,
@ -25,19 +25,19 @@ pub struct K2VItem {
items: BTreeMap<K2VNodeId, DvvsEntry>,
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
pub struct K2VItemPartition {
pub bucket_id: Uuid,
pub partition_key: String,
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
struct DvvsEntry {
t_discard: u64,
values: Vec<(u64, DvvsValue)>,
}
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
pub enum DvvsValue {
Value(#[serde(with = "serde_bytes")] Vec<u8>),
Deleted,

View file

@ -23,7 +23,6 @@ use garage_rpc::system::System;
use garage_rpc::*;
use garage_table::replication::{TableReplication, TableShardedReplication};
use garage_table::table::TABLE_RPC_TIMEOUT;
use garage_table::{PartitionKey, Table};
use crate::k2v::causality::*;
@ -117,7 +116,6 @@ impl K2VRpcHandler {
}),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@ -169,7 +167,6 @@ impl K2VRpcHandler {
K2VRpc::InsertManyItems(items),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(1)
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@ -205,10 +202,7 @@ impl K2VRpcHandler {
.replication
.write_nodes(&poll_key.partition.hash());
let resps = self
.system
.rpc
.try_call_many(
let rpc = self.system.rpc.try_call_many(
&self.endpoint,
&nodes[..],
K2VRpc::PollItem {
@ -218,9 +212,13 @@ impl K2VRpcHandler {
},
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.item_table.data.replication.read_quorum())
.with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
)
.await?;
.without_timeout(),
);
let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
let resps = select! {
r = rpc => r?,
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
};
let mut resp: Option<K2VItem> = None;
for v in resps {

View file

@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17"
netapp = { version = "0.5.1", features = ["telemetry"] }
netapp = { version = "0.5.2", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }

View file

@ -1,31 +1,18 @@
use std::sync::Arc;
use opentelemetry::{global, metrics::*};
use tokio::sync::Semaphore;
/// TableMetrics reference all counter used for metrics
pub struct RpcMetrics {
pub(crate) _rpc_available_permits: ValueObserver<u64>,
pub(crate) rpc_counter: Counter<u64>,
pub(crate) rpc_timeout_counter: Counter<u64>,
pub(crate) rpc_netapp_error_counter: Counter<u64>,
pub(crate) rpc_garage_error_counter: Counter<u64>,
pub(crate) rpc_duration: ValueRecorder<f64>,
pub(crate) rpc_queueing_time: ValueRecorder<f64>,
}
impl RpcMetrics {
pub fn new(sem: Arc<Semaphore>) -> Self {
pub fn new() -> Self {
let meter = global::meter("garage_rpc");
RpcMetrics {
_rpc_available_permits: meter
.u64_value_observer("rpc.available_permits", move |observer| {
observer.observe(sem.available_permits() as u64, &[])
})
.with_description("Number of available RPC permits")
.init(),
rpc_counter: meter
.u64_counter("rpc.request_counter")
.with_description("Number of RPC requests emitted")
@ -46,10 +33,6 @@ impl RpcMetrics {
.f64_value_recorder("rpc.duration")
.with_description("Duration of RPCs")
.init(),
rpc_queueing_time: meter
.f64_value_recorder("rpc.queueing_time")
.with_description("Time RPC requests were queued for before being sent")
.init(),
}
}
}

View file

@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt;
use futures_util::future::FutureExt;
use tokio::select;
use tokio::sync::{watch, Semaphore};
use tokio::sync::watch;
use opentelemetry::KeyValue;
use opentelemetry::{
@ -32,32 +32,37 @@ use garage_util::metrics::RecordDuration;
use crate::metrics::RpcMetrics;
use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
// Don't allow more than 100 concurrent outgoing RPCs.
const MAX_CONCURRENT_REQUESTS: usize = 100;
// Default RPC timeout = 5 minutes
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
/// Strategy to apply when making RPC
#[derive(Copy, Clone)]
pub struct RequestStrategy {
/// Max time to wait for reponse
pub rs_timeout: Duration,
/// Min number of response to consider the request successful
pub rs_quorum: Option<usize>,
/// Should requests be dropped after enough response are received
pub rs_interrupt_after_quorum: bool,
/// Request priority
pub rs_priority: RequestPriority,
/// Custom timeout for this request
rs_timeout: Timeout,
}
#[derive(Copy, Clone)]
enum Timeout {
None,
Default,
Custom(Duration),
}
impl RequestStrategy {
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
pub fn with_priority(prio: RequestPriority) -> Self {
RequestStrategy {
rs_timeout: DEFAULT_TIMEOUT,
rs_quorum: None,
rs_interrupt_after_quorum: false,
rs_priority: prio,
rs_timeout: Timeout::Default,
}
}
/// Set quorum to be reached for request
@ -65,17 +70,22 @@ impl RequestStrategy {
self.rs_quorum = Some(quorum);
self
}
/// Set timeout of the strategy
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = timeout;
self
}
/// Set if requests can be dropped after quorum has been reached
/// In general true for read requests, and false for write
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
self.rs_interrupt_after_quorum = interrupt;
self
}
/// Deactivate timeout for this request
pub fn without_timeout(mut self) -> Self {
self.rs_timeout = Timeout::None;
self
}
/// Set custom timeout for this request
pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
self.rs_timeout = Timeout::Custom(timeout);
self
}
}
#[derive(Clone)]
@ -86,8 +96,8 @@ struct RpcHelperInner {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
request_buffer_semaphore: Arc<Semaphore>,
metrics: RpcMetrics,
rpc_timeout: Duration,
}
impl RpcHelper {
@ -96,21 +106,24 @@ impl RpcHelper {
fullmesh: Arc<FullMeshPeeringStrategy>,
background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>,
rpc_timeout: Option<Duration>,
) -> Self {
let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
let metrics = RpcMetrics::new(sem.clone());
let metrics = RpcMetrics::new();
Self(Arc::new(RpcHelperInner {
our_node_id,
fullmesh,
background,
ring,
request_buffer_semaphore: sem,
metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
}))
}
pub fn rpc_timeout(&self) -> Duration {
self.0.rpc_timeout
}
pub async fn call<M, N, H, S>(
&self,
endpoint: &Endpoint<M, H>,
@ -129,13 +142,6 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)),
];
let permit = self
.0
.request_buffer_semaphore
.acquire()
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?;
self.0.metrics.rpc_counter.add(1, &metric_tags);
let node_id = to.into();
@ -143,10 +149,16 @@ impl RpcHelper {
.call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
let timeout = async {
match strat.rs_timeout {
Timeout::None => futures::future::pending().await,
Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
Timeout::Custom(t) => tokio::time::sleep(t).await,
}
};
select! {
res = rpc_call => {
drop(permit);
if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
}
@ -158,8 +170,7 @@ impl RpcHelper {
Ok(res?)
}
_ = tokio::time::sleep(strat.rs_timeout) => {
drop(permit);
() = timeout => {
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
Err(Error::Timeout)
}
@ -413,7 +424,7 @@ impl RpcHelper {
.iter()
.find(|x| x.id.as_ref() == to.as_slice())
.and_then(|pi| pi.avg_ping)
.unwrap_or_else(|| Duration::from_secs(1));
.unwrap_or_else(|| Duration::from_secs(10));
(
*to != self.0.our_node_id,
peer_zone != our_zone,

View file

@ -37,7 +37,6 @@ use crate::rpc_helper::*;
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15);
/// Version tag used for version check upon Netapp connection.
/// Cluster nodes with different version tags are deemed
@ -280,6 +279,9 @@ impl System {
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
fullmesh.set_ping_timeout_millis(ping_timeout);
}
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
@ -317,7 +319,13 @@ impl System {
node_status: RwLock::new(HashMap::new()),
netapp: netapp.clone(),
fullmesh: fullmesh.clone(),
rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()),
rpc: RpcHelper::new(
netapp.id.into(),
fullmesh,
background.clone(),
ring.clone(),
config.rpc_timeout_msec.map(Duration::from_millis),
),
system_endpoint,
replication_factor,
rpc_listen_addr: config.rpc_bind_addr,
@ -600,7 +608,7 @@ impl System {
.broadcast(
&self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status),
RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
@ -724,7 +732,7 @@ impl System {
&self.system_endpoint,
peer,
SystemRpc::PullClusterLayout,
RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await;
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {

View file

@ -25,8 +25,6 @@ use crate::replication::*;
use crate::schema::*;
const TABLE_GC_BATCH_SIZE: usize = 1024;
// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15);
// GC delay for table entries: 1 day (24 hours)
// (the delay before the entry is added in the GC todo list
@ -237,9 +235,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::Update(updates),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(nodes.len())
.with_timeout(TABLE_GC_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: send tombstones")?;
@ -260,9 +256,7 @@ where
&self.endpoint,
&nodes[..],
GcRpc::DeleteIfEqualHash(deletes),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(nodes.len())
.with_timeout(TABLE_GC_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await
.err_context("GC: remote delete tombstones")?;

View file

@ -24,9 +24,6 @@ use crate::merkle::*;
use crate::replication::*;
use crate::*;
// Sync RPC can contain a lot of data, so have a 1min timeout
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60);
// Do anti-entropy every 10 minutes
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
@ -248,9 +245,7 @@ where
&self.endpoint,
nodes,
SyncRpc::Items(values),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_quorum(nodes.len())
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
)
.await?;
@ -311,8 +306,7 @@ where
&self.endpoint,
who,
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
@ -368,8 +362,7 @@ where
&self.endpoint,
who,
SyncRpc::GetNode(key.clone()),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?
{
@ -445,8 +438,7 @@ where
&self.endpoint,
who,
SyncRpc::Items(values),
RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_BACKGROUND),
)
.await?;
if let SyncRpc::Ok = rpc_resp {

View file

@ -1,7 +1,6 @@
use std::borrow::Borrow;
use std::collections::{BTreeMap, BTreeSet, HashMap};
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
use futures::stream::*;
@ -31,8 +30,6 @@ use crate::schema::*;
use crate::sync::*;
use crate::util::*;
pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30);
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub system: Arc<System>,
pub data: Arc<TableData<F, R>>,
@ -124,8 +121,7 @@ where
&who[..],
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.write_quorum())
.with_timeout(TABLE_RPC_TIMEOUT),
.with_quorum(self.data.replication.write_quorum()),
)
.await?;
@ -177,7 +173,7 @@ where
&self.endpoint,
node,
rpc,
RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_NORMAL),
)
.await?;
Ok::<_, Error>((node, resp))
@ -234,7 +230,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@ -329,7 +324,6 @@ where
rpc,
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.data.replication.read_quorum())
.with_timeout(TABLE_RPC_TIMEOUT)
.interrupt_after_quorum(true),
)
.await?;
@ -406,9 +400,7 @@ where
&self.endpoint,
who,
TableRpc::<F>::Update(vec![what_enc]),
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(who.len())
.with_timeout(TABLE_RPC_TIMEOUT),
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
)
.await?;
Ok(())

View file

@ -41,6 +41,11 @@ pub struct Config {
/// Public IP address of this node
pub rpc_public_addr: Option<String>,
/// Timeout for Netapp's ping messagess
pub rpc_ping_timeout_msec: Option<u64>,
/// Timeout for Netapp RPC calls
pub rpc_timeout_msec: Option<u64>,
/// Bootstrap peers RPC address
#[serde(default)]
pub bootstrap_peers: Vec<String>,