Merge pull request 'RPC performance changes' (#387) from configurable-timeouts into main
Reviewed-on: #387
This commit is contained in:
commit
7a901f7aab
15 changed files with 115 additions and 142 deletions
4
Cargo.lock
generated
4
Cargo.lock
generated
|
@ -2034,9 +2034,9 @@ dependencies = [
|
|||
|
||||
[[package]]
|
||||
name = "netapp"
|
||||
version = "0.5.1"
|
||||
version = "0.5.2"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "06c7cbf05d7cd6e4bc51340934d60c798010bf1856769cf8508caaac1db23db6"
|
||||
checksum = "4ffe47ac46d3b2ce2f736a70865492df082e042eb2bfdddfca3b8dd66bd9469d"
|
||||
dependencies = [
|
||||
"arc-swap",
|
||||
"async-trait",
|
||||
|
|
22
Cargo.nix
22
Cargo.nix
|
@ -780,7 +780,7 @@ in
|
|||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "59a6001667ab124aebae2a495118e11d30984c3a653e99d86d58971708cf5e4b"; };
|
||||
dependencies = {
|
||||
${ if hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.config == "aarch64-apple-darwin" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
|
||||
${ if hostPlatform.config == "aarch64-apple-darwin" || hostPlatform.config == "aarch64-linux-android" || hostPlatform.parsed.cpu.name == "aarch64" && hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
|
||||
};
|
||||
});
|
||||
|
||||
|
@ -1428,7 +1428,7 @@ in
|
|||
garage_web = rustPackages."unknown".garage_web."0.8.0" { inherit profileName; };
|
||||
hex = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
|
||||
sodiumoxide = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
|
||||
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
|
||||
netapp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
|
||||
opentelemetry = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
|
||||
opentelemetry_otlp = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-otlp."0.10.0" { inherit profileName; };
|
||||
opentelemetry_prometheus = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry-prometheus."0.10.0" { inherit profileName; };
|
||||
|
@ -1600,7 +1600,7 @@ in
|
|||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_table" else null } = rustPackages."unknown".garage_table."0.8.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "garage_util" else null } = rustPackages."unknown".garage_util."0.8.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "hex" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_web" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
|
||||
|
@ -1638,7 +1638,7 @@ in
|
|||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "k8s_openapi" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".k8s-openapi."0.13.1" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "kube" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kube."0.62.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "sodiumoxide" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".kuska-sodiumoxide."0.2.5-0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_rpc" then "openssl" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".openssl."0.10.38" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web" then "pnet_datalink" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".pnet_datalink."0.28.0" { inherit profileName; };
|
||||
|
@ -1702,7 +1702,7 @@ in
|
|||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "http" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".http."0.2.6" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "hyper" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".hyper."0.14.18" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "lazy_static" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".lazy_static."1.4.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "netapp" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "opentelemetry" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rand" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; };
|
||||
${ if rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client" then "rmp_serde" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".rmp-serde."0.15.5" { inherit profileName; };
|
||||
|
@ -2823,11 +2823,11 @@ in
|
|||
};
|
||||
});
|
||||
|
||||
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.1" = overridableMkRustCrate (profileName: rec {
|
||||
"registry+https://github.com/rust-lang/crates.io-index".netapp."0.5.2" = overridableMkRustCrate (profileName: rec {
|
||||
name = "netapp";
|
||||
version = "0.5.1";
|
||||
version = "0.5.2";
|
||||
registry = "registry+https://github.com/rust-lang/crates.io-index";
|
||||
src = fetchCratesIo { inherit name version; sha256 = "06c7cbf05d7cd6e4bc51340934d60c798010bf1856769cf8508caaac1db23db6"; };
|
||||
src = fetchCratesIo { inherit name version; sha256 = "4ffe47ac46d3b2ce2f736a70865492df082e042eb2bfdddfca3b8dd66bd9469d"; };
|
||||
features = builtins.concatLists [
|
||||
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_util" || rootFeatures' ? "garage_web" || rootFeatures' ? "k2v-client") "default")
|
||||
(lib.optional (rootFeatures' ? "garage" || rootFeatures' ? "garage_api" || rootFeatures' ? "garage_block" || rootFeatures' ? "garage_model" || rootFeatures' ? "garage_rpc" || rootFeatures' ? "garage_table" || rootFeatures' ? "garage_web") "opentelemetry")
|
||||
|
@ -3888,7 +3888,7 @@ in
|
|||
];
|
||||
dependencies = {
|
||||
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
|
||||
${ if hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" || hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
|
||||
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "dragonfly" || hostPlatform.parsed.kernel.name == "freebsd" || hostPlatform.parsed.kernel.name == "illumos" || hostPlatform.parsed.kernel.name == "netbsd" || hostPlatform.parsed.kernel.name == "openbsd" || hostPlatform.parsed.kernel.name == "solaris" then "once_cell" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".once_cell."1.10.0" { inherit profileName; };
|
||||
${ if hostPlatform.parsed.cpu.name == "i686" || hostPlatform.parsed.cpu.name == "x86_64" || (hostPlatform.parsed.cpu.name == "aarch64" || hostPlatform.parsed.cpu.name == "armv6l" || hostPlatform.parsed.cpu.name == "armv7l") && (hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "fuchsia" || hostPlatform.parsed.kernel.name == "linux") then "spin" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".spin."0.5.2" { inherit profileName; };
|
||||
untrusted = rustPackages."registry+https://github.com/rust-lang/crates.io-index".untrusted."0.7.1" { inherit profileName; };
|
||||
${ if hostPlatform.parsed.cpu.name == "wasm32" && hostPlatform.parsed.vendor.name == "unknown" && hostPlatform.parsed.kernel.name == "unknown" && hostPlatform.parsed.abi.name == "" then "web_sys" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".web-sys."0.3.56" { inherit profileName; };
|
||||
|
@ -4498,7 +4498,7 @@ in
|
|||
];
|
||||
dependencies = {
|
||||
bitflags = rustPackages."registry+https://github.com/rust-lang/crates.io-index".bitflags."1.3.2" { inherit profileName; };
|
||||
${ if hostPlatform.parsed.kernel.name == "android" || hostPlatform.parsed.kernel.name == "linux" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
|
||||
${ if hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android" then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.121" { inherit profileName; };
|
||||
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot."0.11.2" { inherit profileName; };
|
||||
${ if !(hostPlatform.parsed.kernel.name == "linux" || hostPlatform.parsed.kernel.name == "android") then "parking_lot_core" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".parking_lot_core."0.8.5" { inherit profileName; };
|
||||
static_init_macro = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".static_init_macro."1.0.2" { profileName = "__noProfile"; };
|
||||
|
@ -5602,7 +5602,7 @@ in
|
|||
${ if hostPlatform.config == "i686-pc-windows-gnu" || hostPlatform.config == "i686-uwp-windows-gnu" then "windows_i686_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_gnu."0.32.0" { inherit profileName; };
|
||||
${ if hostPlatform.config == "i686-pc-windows-msvc" || hostPlatform.config == "i686-uwp-windows-msvc" then "windows_i686_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_i686_msvc."0.32.0" { inherit profileName; };
|
||||
${ if hostPlatform.config == "x86_64-uwp-windows-gnu" || hostPlatform.config == "x86_64-pc-windows-gnu" then "windows_x86_64_gnu" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_gnu."0.32.0" { inherit profileName; };
|
||||
${ if hostPlatform.config == "x86_64-pc-windows-msvc" || hostPlatform.config == "x86_64-uwp-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
|
||||
${ if hostPlatform.config == "x86_64-uwp-windows-msvc" || hostPlatform.config == "x86_64-pc-windows-msvc" then "windows_x86_64_msvc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".windows_x86_64_msvc."0.32.0" { inherit profileName; };
|
||||
};
|
||||
});
|
||||
|
||||
|
|
|
@ -41,9 +41,6 @@ use crate::resync::*;
|
|||
/// Size under which data will be stored inlined in database instead of as files
|
||||
pub const INLINE_THRESHOLD: usize = 3072;
|
||||
|
||||
// Timeout for RPCs that read and write blocks to remote nodes
|
||||
pub(crate) const BLOCK_RW_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
// The delay between the moment when the reference counter
|
||||
// drops to zero, and the moment where we allow ourselves
|
||||
// to delete the block locally.
|
||||
|
@ -183,7 +180,7 @@ impl BlockManager {
|
|||
};
|
||||
return Ok((header, stream));
|
||||
}
|
||||
_ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
|
||||
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
|
||||
debug!("Node {:?} didn't return block in time, trying next.", node);
|
||||
}
|
||||
};
|
||||
|
@ -235,7 +232,7 @@ impl BlockManager {
|
|||
}
|
||||
}
|
||||
}
|
||||
_ = tokio::time::sleep(BLOCK_RW_TIMEOUT) => {
|
||||
_ = tokio::time::sleep(self.system.rpc.rpc_timeout()) => {
|
||||
debug!("Node {:?} didn't return block in time, trying next.", node);
|
||||
}
|
||||
};
|
||||
|
@ -300,8 +297,7 @@ impl BlockManager {
|
|||
&who[..],
|
||||
put_block_rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
||||
.with_quorum(self.replication.write_quorum())
|
||||
.with_timeout(BLOCK_RW_TIMEOUT),
|
||||
.with_quorum(self.replication.write_quorum()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -336,7 +332,10 @@ impl BlockManager {
|
|||
// we will fecth it from someone.
|
||||
let this = self.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = this.resync.put_to_resync(&hash, 2 * BLOCK_RW_TIMEOUT) {
|
||||
if let Err(e) = this
|
||||
.resync
|
||||
.put_to_resync(&hash, 2 * this.system.rpc.rpc_timeout())
|
||||
{
|
||||
error!("Block {:?} could not be put in resync queue: {}.", hash, e);
|
||||
}
|
||||
});
|
||||
|
@ -444,7 +443,8 @@ impl BlockManager {
|
|||
Ok(c) => c,
|
||||
Err(e) => {
|
||||
// Not found but maybe we should have had it ??
|
||||
self.resync.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
|
||||
self.resync
|
||||
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
|
||||
return Err(Into::into(e));
|
||||
}
|
||||
};
|
||||
|
|
|
@ -33,14 +33,6 @@ use garage_table::replication::TableReplication;
|
|||
|
||||
use crate::manager::*;
|
||||
|
||||
// Timeout for RPCs that ask other nodes whether they need a copy
|
||||
// of a given block before we delete it locally
|
||||
// The timeout here is relatively low because we don't want to block
|
||||
// the entire resync loop when some nodes are not responding.
|
||||
// Nothing will be deleted if the nodes don't answer the queries,
|
||||
// we will just retry later.
|
||||
const NEED_BLOCK_QUERY_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
|
||||
// The delay between the time where a resync operation fails
|
||||
// and the time when it is retried, with exponential backoff
|
||||
// (multiplied by 2, 4, 8, 16, etc. for every consecutive failure).
|
||||
|
@ -346,8 +338,7 @@ impl BlockResyncManager {
|
|||
&manager.endpoint,
|
||||
&who,
|
||||
BlockRpc::NeedBlockQuery(*hash),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -394,8 +385,7 @@ impl BlockResyncManager {
|
|||
&need_nodes[..],
|
||||
put_block_message,
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_quorum(need_nodes.len())
|
||||
.with_timeout(BLOCK_RW_TIMEOUT),
|
||||
.with_quorum(need_nodes.len()),
|
||||
)
|
||||
.await
|
||||
.err_context("PutBlock RPC")?;
|
||||
|
|
|
@ -15,7 +15,7 @@ pub fn make_node_id(node_id: Uuid) -> K2VNodeId {
|
|||
u64::from_be_bytes(tmp)
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Debug, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
|
||||
pub struct CausalContext {
|
||||
pub vector_clock: BTreeMap<K2VNodeId, u64>,
|
||||
}
|
||||
|
|
|
@ -17,7 +17,7 @@ pub const CONFLICTS: &str = "conflicts";
|
|||
pub const VALUES: &str = "values";
|
||||
pub const BYTES: &str = "bytes";
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct K2VItem {
|
||||
pub partition: K2VItemPartition,
|
||||
pub sort_key: String,
|
||||
|
@ -25,19 +25,19 @@ pub struct K2VItem {
|
|||
items: BTreeMap<K2VNodeId, DvvsEntry>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize, Hash, Eq)]
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize, Hash)]
|
||||
pub struct K2VItemPartition {
|
||||
pub bucket_id: Uuid,
|
||||
pub partition_key: String,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||
struct DvvsEntry {
|
||||
t_discard: u64,
|
||||
values: Vec<(u64, DvvsValue)>,
|
||||
}
|
||||
|
||||
#[derive(PartialEq, Clone, Debug, Serialize, Deserialize)]
|
||||
#[derive(PartialEq, Eq, Clone, Debug, Serialize, Deserialize)]
|
||||
pub enum DvvsValue {
|
||||
Value(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
Deleted,
|
||||
|
|
|
@ -23,7 +23,6 @@ use garage_rpc::system::System;
|
|||
use garage_rpc::*;
|
||||
|
||||
use garage_table::replication::{TableReplication, TableShardedReplication};
|
||||
use garage_table::table::TABLE_RPC_TIMEOUT;
|
||||
use garage_table::{PartitionKey, Table};
|
||||
|
||||
use crate::k2v::causality::*;
|
||||
|
@ -117,7 +116,6 @@ impl K2VRpcHandler {
|
|||
}),
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(1)
|
||||
.with_timeout(TABLE_RPC_TIMEOUT)
|
||||
.interrupt_after_quorum(true),
|
||||
)
|
||||
.await?;
|
||||
|
@ -169,7 +167,6 @@ impl K2VRpcHandler {
|
|||
K2VRpc::InsertManyItems(items),
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(1)
|
||||
.with_timeout(TABLE_RPC_TIMEOUT)
|
||||
.interrupt_after_quorum(true),
|
||||
)
|
||||
.await?;
|
||||
|
@ -205,22 +202,23 @@ impl K2VRpcHandler {
|
|||
.replication
|
||||
.write_nodes(&poll_key.partition.hash());
|
||||
|
||||
let resps = self
|
||||
.system
|
||||
.rpc
|
||||
.try_call_many(
|
||||
&self.endpoint,
|
||||
&nodes[..],
|
||||
K2VRpc::PollItem {
|
||||
key: poll_key,
|
||||
causal_context,
|
||||
timeout_msec,
|
||||
},
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(self.item_table.data.replication.read_quorum())
|
||||
.with_timeout(Duration::from_millis(timeout_msec) + TABLE_RPC_TIMEOUT),
|
||||
)
|
||||
.await?;
|
||||
let rpc = self.system.rpc.try_call_many(
|
||||
&self.endpoint,
|
||||
&nodes[..],
|
||||
K2VRpc::PollItem {
|
||||
key: poll_key,
|
||||
causal_context,
|
||||
timeout_msec,
|
||||
},
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(self.item_table.data.replication.read_quorum())
|
||||
.without_timeout(),
|
||||
);
|
||||
let timeout_duration = Duration::from_millis(timeout_msec) + self.system.rpc.rpc_timeout();
|
||||
let resps = select! {
|
||||
r = rpc => r?,
|
||||
_ = tokio::time::sleep(timeout_duration) => return Ok(None),
|
||||
};
|
||||
|
||||
let mut resp: Option<K2VItem> = None;
|
||||
for v in resps {
|
||||
|
|
|
@ -45,7 +45,7 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
|
|||
tokio-stream = { version = "0.1", features = ["net"] }
|
||||
opentelemetry = "0.17"
|
||||
|
||||
netapp = { version = "0.5.1", features = ["telemetry"] }
|
||||
netapp = { version = "0.5.2", features = ["telemetry"] }
|
||||
|
||||
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }
|
||||
|
||||
|
|
|
@ -1,31 +1,18 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use opentelemetry::{global, metrics::*};
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
/// TableMetrics reference all counter used for metrics
|
||||
pub struct RpcMetrics {
|
||||
pub(crate) _rpc_available_permits: ValueObserver<u64>,
|
||||
|
||||
pub(crate) rpc_counter: Counter<u64>,
|
||||
pub(crate) rpc_timeout_counter: Counter<u64>,
|
||||
pub(crate) rpc_netapp_error_counter: Counter<u64>,
|
||||
pub(crate) rpc_garage_error_counter: Counter<u64>,
|
||||
|
||||
pub(crate) rpc_duration: ValueRecorder<f64>,
|
||||
pub(crate) rpc_queueing_time: ValueRecorder<f64>,
|
||||
}
|
||||
impl RpcMetrics {
|
||||
pub fn new(sem: Arc<Semaphore>) -> Self {
|
||||
pub fn new() -> Self {
|
||||
let meter = global::meter("garage_rpc");
|
||||
RpcMetrics {
|
||||
_rpc_available_permits: meter
|
||||
.u64_value_observer("rpc.available_permits", move |observer| {
|
||||
observer.observe(sem.available_permits() as u64, &[])
|
||||
})
|
||||
.with_description("Number of available RPC permits")
|
||||
.init(),
|
||||
|
||||
rpc_counter: meter
|
||||
.u64_counter("rpc.request_counter")
|
||||
.with_description("Number of RPC requests emitted")
|
||||
|
@ -46,10 +33,6 @@ impl RpcMetrics {
|
|||
.f64_value_recorder("rpc.duration")
|
||||
.with_description("Duration of RPCs")
|
||||
.init(),
|
||||
rpc_queueing_time: meter
|
||||
.f64_value_recorder("rpc.queueing_time")
|
||||
.with_description("Time RPC requests were queued for before being sent")
|
||||
.init(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -7,7 +7,7 @@ use futures::stream::futures_unordered::FuturesUnordered;
|
|||
use futures::stream::StreamExt;
|
||||
use futures_util::future::FutureExt;
|
||||
use tokio::select;
|
||||
use tokio::sync::{watch, Semaphore};
|
||||
use tokio::sync::watch;
|
||||
|
||||
use opentelemetry::KeyValue;
|
||||
use opentelemetry::{
|
||||
|
@ -32,32 +32,37 @@ use garage_util::metrics::RecordDuration;
|
|||
use crate::metrics::RpcMetrics;
|
||||
use crate::ring::Ring;
|
||||
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
// Don't allow more than 100 concurrent outgoing RPCs.
|
||||
const MAX_CONCURRENT_REQUESTS: usize = 100;
|
||||
// Default RPC timeout = 5 minutes
|
||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
|
||||
/// Strategy to apply when making RPC
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct RequestStrategy {
|
||||
/// Max time to wait for reponse
|
||||
pub rs_timeout: Duration,
|
||||
/// Min number of response to consider the request successful
|
||||
pub rs_quorum: Option<usize>,
|
||||
/// Should requests be dropped after enough response are received
|
||||
pub rs_interrupt_after_quorum: bool,
|
||||
/// Request priority
|
||||
pub rs_priority: RequestPriority,
|
||||
/// Custom timeout for this request
|
||||
rs_timeout: Timeout,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
enum Timeout {
|
||||
None,
|
||||
Default,
|
||||
Custom(Duration),
|
||||
}
|
||||
|
||||
impl RequestStrategy {
|
||||
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||
pub fn with_priority(prio: RequestPriority) -> Self {
|
||||
RequestStrategy {
|
||||
rs_timeout: DEFAULT_TIMEOUT,
|
||||
rs_quorum: None,
|
||||
rs_interrupt_after_quorum: false,
|
||||
rs_priority: prio,
|
||||
rs_timeout: Timeout::Default,
|
||||
}
|
||||
}
|
||||
/// Set quorum to be reached for request
|
||||
|
@ -65,17 +70,22 @@ impl RequestStrategy {
|
|||
self.rs_quorum = Some(quorum);
|
||||
self
|
||||
}
|
||||
/// Set timeout of the strategy
|
||||
pub fn with_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.rs_timeout = timeout;
|
||||
self
|
||||
}
|
||||
/// Set if requests can be dropped after quorum has been reached
|
||||
/// In general true for read requests, and false for write
|
||||
pub fn interrupt_after_quorum(mut self, interrupt: bool) -> Self {
|
||||
self.rs_interrupt_after_quorum = interrupt;
|
||||
self
|
||||
}
|
||||
/// Deactivate timeout for this request
|
||||
pub fn without_timeout(mut self) -> Self {
|
||||
self.rs_timeout = Timeout::None;
|
||||
self
|
||||
}
|
||||
/// Set custom timeout for this request
|
||||
pub fn with_custom_timeout(mut self, timeout: Duration) -> Self {
|
||||
self.rs_timeout = Timeout::Custom(timeout);
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -86,8 +96,8 @@ struct RpcHelperInner {
|
|||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
||||
background: Arc<BackgroundRunner>,
|
||||
ring: watch::Receiver<Arc<Ring>>,
|
||||
request_buffer_semaphore: Arc<Semaphore>,
|
||||
metrics: RpcMetrics,
|
||||
rpc_timeout: Duration,
|
||||
}
|
||||
|
||||
impl RpcHelper {
|
||||
|
@ -96,21 +106,24 @@ impl RpcHelper {
|
|||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
||||
background: Arc<BackgroundRunner>,
|
||||
ring: watch::Receiver<Arc<Ring>>,
|
||||
rpc_timeout: Option<Duration>,
|
||||
) -> Self {
|
||||
let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
|
||||
|
||||
let metrics = RpcMetrics::new(sem.clone());
|
||||
let metrics = RpcMetrics::new();
|
||||
|
||||
Self(Arc::new(RpcHelperInner {
|
||||
our_node_id,
|
||||
fullmesh,
|
||||
background,
|
||||
ring,
|
||||
request_buffer_semaphore: sem,
|
||||
metrics,
|
||||
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
|
||||
}))
|
||||
}
|
||||
|
||||
pub fn rpc_timeout(&self) -> Duration {
|
||||
self.0.rpc_timeout
|
||||
}
|
||||
|
||||
pub async fn call<M, N, H, S>(
|
||||
&self,
|
||||
endpoint: &Endpoint<M, H>,
|
||||
|
@ -129,13 +142,6 @@ impl RpcHelper {
|
|||
KeyValue::new("to", format!("{:?}", to)),
|
||||
];
|
||||
|
||||
let permit = self
|
||||
.0
|
||||
.request_buffer_semaphore
|
||||
.acquire()
|
||||
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
|
||||
.await?;
|
||||
|
||||
self.0.metrics.rpc_counter.add(1, &metric_tags);
|
||||
|
||||
let node_id = to.into();
|
||||
|
@ -143,10 +149,16 @@ impl RpcHelper {
|
|||
.call_streaming(&node_id, msg, strat.rs_priority)
|
||||
.record_duration(&self.0.metrics.rpc_duration, &metric_tags);
|
||||
|
||||
let timeout = async {
|
||||
match strat.rs_timeout {
|
||||
Timeout::None => futures::future::pending().await,
|
||||
Timeout::Default => tokio::time::sleep(self.0.rpc_timeout).await,
|
||||
Timeout::Custom(t) => tokio::time::sleep(t).await,
|
||||
}
|
||||
};
|
||||
|
||||
select! {
|
||||
res = rpc_call => {
|
||||
drop(permit);
|
||||
|
||||
if res.is_err() {
|
||||
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
|
||||
}
|
||||
|
@ -158,8 +170,7 @@ impl RpcHelper {
|
|||
|
||||
Ok(res?)
|
||||
}
|
||||
_ = tokio::time::sleep(strat.rs_timeout) => {
|
||||
drop(permit);
|
||||
() = timeout => {
|
||||
self.0.metrics.rpc_timeout_counter.add(1, &metric_tags);
|
||||
Err(Error::Timeout)
|
||||
}
|
||||
|
@ -413,7 +424,7 @@ impl RpcHelper {
|
|||
.iter()
|
||||
.find(|x| x.id.as_ref() == to.as_slice())
|
||||
.and_then(|pi| pi.avg_ping)
|
||||
.unwrap_or_else(|| Duration::from_secs(1));
|
||||
.unwrap_or_else(|| Duration::from_secs(10));
|
||||
(
|
||||
*to != self.0.our_node_id,
|
||||
peer_zone != our_zone,
|
||||
|
|
|
@ -37,7 +37,6 @@ use crate::rpc_helper::*;
|
|||
|
||||
const DISCOVERY_INTERVAL: Duration = Duration::from_secs(60);
|
||||
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);
|
||||
const SYSTEM_RPC_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
|
||||
/// Version tag used for version check upon Netapp connection.
|
||||
/// Cluster nodes with different version tags are deemed
|
||||
|
@ -280,6 +279,9 @@ impl System {
|
|||
|
||||
let netapp = NetApp::new(GARAGE_VERSION_TAG, network_key, node_key);
|
||||
let fullmesh = FullMeshPeeringStrategy::new(netapp.clone(), vec![], rpc_public_addr);
|
||||
if let Some(ping_timeout) = config.rpc_ping_timeout_msec {
|
||||
fullmesh.set_ping_timeout_millis(ping_timeout);
|
||||
}
|
||||
|
||||
let system_endpoint = netapp.endpoint(SYSTEM_RPC_PATH.into());
|
||||
|
||||
|
@ -317,7 +319,13 @@ impl System {
|
|||
node_status: RwLock::new(HashMap::new()),
|
||||
netapp: netapp.clone(),
|
||||
fullmesh: fullmesh.clone(),
|
||||
rpc: RpcHelper::new(netapp.id.into(), fullmesh, background.clone(), ring.clone()),
|
||||
rpc: RpcHelper::new(
|
||||
netapp.id.into(),
|
||||
fullmesh,
|
||||
background.clone(),
|
||||
ring.clone(),
|
||||
config.rpc_timeout_msec.map(Duration::from_millis),
|
||||
),
|
||||
system_endpoint,
|
||||
replication_factor,
|
||||
rpc_listen_addr: config.rpc_bind_addr,
|
||||
|
@ -600,7 +608,7 @@ impl System {
|
|||
.broadcast(
|
||||
&self.system_endpoint,
|
||||
SystemRpc::AdvertiseStatus(local_status),
|
||||
RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_HIGH),
|
||||
)
|
||||
.await;
|
||||
|
||||
|
@ -724,7 +732,7 @@ impl System {
|
|||
&self.system_endpoint,
|
||||
peer,
|
||||
SystemRpc::PullClusterLayout,
|
||||
RequestStrategy::with_priority(PRIO_HIGH).with_timeout(SYSTEM_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_HIGH),
|
||||
)
|
||||
.await;
|
||||
if let Ok(SystemRpc::AdvertiseClusterLayout(layout)) = resp {
|
||||
|
|
|
@ -25,8 +25,6 @@ use crate::replication::*;
|
|||
use crate::schema::*;
|
||||
|
||||
const TABLE_GC_BATCH_SIZE: usize = 1024;
|
||||
// Same timeout as NEED_BLOCK_QUERY_TIMEOUT in block manager
|
||||
const TABLE_GC_RPC_TIMEOUT: Duration = Duration::from_secs(15);
|
||||
|
||||
// GC delay for table entries: 1 day (24 hours)
|
||||
// (the delay before the entry is added in the GC todo list
|
||||
|
@ -237,9 +235,7 @@ where
|
|||
&self.endpoint,
|
||||
&nodes[..],
|
||||
GcRpc::Update(updates),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_quorum(nodes.len())
|
||||
.with_timeout(TABLE_GC_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
|
||||
)
|
||||
.await
|
||||
.err_context("GC: send tombstones")?;
|
||||
|
@ -260,9 +256,7 @@ where
|
|||
&self.endpoint,
|
||||
&nodes[..],
|
||||
GcRpc::DeleteIfEqualHash(deletes),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_quorum(nodes.len())
|
||||
.with_timeout(TABLE_GC_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
|
||||
)
|
||||
.await
|
||||
.err_context("GC: remote delete tombstones")?;
|
||||
|
|
|
@ -24,9 +24,6 @@ use crate::merkle::*;
|
|||
use crate::replication::*;
|
||||
use crate::*;
|
||||
|
||||
// Sync RPC can contain a lot of data, so have a 1min timeout
|
||||
const TABLE_SYNC_RPC_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
// Do anti-entropy every 10 minutes
|
||||
const ANTI_ENTROPY_INTERVAL: Duration = Duration::from_secs(10 * 60);
|
||||
|
||||
|
@ -248,9 +245,7 @@ where
|
|||
&self.endpoint,
|
||||
nodes,
|
||||
SyncRpc::Items(values),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_quorum(nodes.len())
|
||||
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND).with_quorum(nodes.len()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -311,8 +306,7 @@ where
|
|||
&self.endpoint,
|
||||
who,
|
||||
SyncRpc::RootCkHash(partition.partition, root_ck_hash),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -368,8 +362,7 @@ where
|
|||
&self.endpoint,
|
||||
who,
|
||||
SyncRpc::GetNode(key.clone()),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
||||
)
|
||||
.await?
|
||||
{
|
||||
|
@ -445,8 +438,7 @@ where
|
|||
&self.endpoint,
|
||||
who,
|
||||
SyncRpc::Items(values),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
.with_timeout(TABLE_SYNC_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND),
|
||||
)
|
||||
.await?;
|
||||
if let SyncRpc::Ok = rpc_resp {
|
||||
|
|
|
@ -1,7 +1,6 @@
|
|||
use std::borrow::Borrow;
|
||||
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use futures::stream::*;
|
||||
|
@ -31,8 +30,6 @@ use crate::schema::*;
|
|||
use crate::sync::*;
|
||||
use crate::util::*;
|
||||
|
||||
pub const TABLE_RPC_TIMEOUT: Duration = Duration::from_secs(30);
|
||||
|
||||
pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
|
||||
pub system: Arc<System>,
|
||||
pub data: Arc<TableData<F, R>>,
|
||||
|
@ -124,8 +121,7 @@ where
|
|||
&who[..],
|
||||
rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(self.data.replication.write_quorum())
|
||||
.with_timeout(TABLE_RPC_TIMEOUT),
|
||||
.with_quorum(self.data.replication.write_quorum()),
|
||||
)
|
||||
.await?;
|
||||
|
||||
|
@ -177,7 +173,7 @@ where
|
|||
&self.endpoint,
|
||||
node,
|
||||
rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL).with_timeout(TABLE_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_NORMAL),
|
||||
)
|
||||
.await?;
|
||||
Ok::<_, Error>((node, resp))
|
||||
|
@ -234,7 +230,6 @@ where
|
|||
rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(self.data.replication.read_quorum())
|
||||
.with_timeout(TABLE_RPC_TIMEOUT)
|
||||
.interrupt_after_quorum(true),
|
||||
)
|
||||
.await?;
|
||||
|
@ -329,7 +324,6 @@ where
|
|||
rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(self.data.replication.read_quorum())
|
||||
.with_timeout(TABLE_RPC_TIMEOUT)
|
||||
.interrupt_after_quorum(true),
|
||||
)
|
||||
.await?;
|
||||
|
@ -406,9 +400,7 @@ where
|
|||
&self.endpoint,
|
||||
who,
|
||||
TableRpc::<F>::Update(vec![what_enc]),
|
||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||
.with_quorum(who.len())
|
||||
.with_timeout(TABLE_RPC_TIMEOUT),
|
||||
RequestStrategy::with_priority(PRIO_NORMAL).with_quorum(who.len()),
|
||||
)
|
||||
.await?;
|
||||
Ok(())
|
||||
|
|
|
@ -41,6 +41,11 @@ pub struct Config {
|
|||
/// Public IP address of this node
|
||||
pub rpc_public_addr: Option<String>,
|
||||
|
||||
/// Timeout for Netapp's ping messagess
|
||||
pub rpc_ping_timeout_msec: Option<u64>,
|
||||
/// Timeout for Netapp RPC calls
|
||||
pub rpc_timeout_msec: Option<u64>,
|
||||
|
||||
/// Bootstrap peers RPC address
|
||||
#[serde(default)]
|
||||
pub bootstrap_peers: Vec<String>,
|
||||
|
|
Loading…
Reference in a new issue