use netapp streaming body #343

Merged
lx merged 31 commits from netapp-stream-body into main 2022-09-13 13:26:09 +00:00
12 changed files with 97 additions and 112 deletions
Showing only changes of commit 8e7e680afe - Show all commits

74
Cargo.lock generated
View file

@ -154,7 +154,7 @@ dependencies = [
"aws-smithy-types", "aws-smithy-types",
"aws-smithy-xml", "aws-smithy-xml",
"aws-types", "aws-types",
"bytes 1.1.0", "bytes 1.2.0",
"http", "http",
"md5", "md5",
"tokio-stream", "tokio-stream",
@ -184,7 +184,7 @@ checksum = "51d371fb688d909e5b866ff1f297bbec4621eed4f9fcdac566fcc33541f0c6a6"
dependencies = [ dependencies = [
"aws-smithy-eventstream", "aws-smithy-eventstream",
"aws-smithy-http", "aws-smithy-http",
"bytes 1.1.0", "bytes 1.2.0",
"form_urlencoded", "form_urlencoded",
"hex", "hex",
"http", "http",
@ -218,7 +218,7 @@ dependencies = [
"aws-smithy-http", "aws-smithy-http",
"aws-smithy-http-tower", "aws-smithy-http-tower",
"aws-smithy-types", "aws-smithy-types",
"bytes 1.1.0", "bytes 1.2.0",
"fastrand", "fastrand",
"http", "http",
"http-body", "http-body",
@ -239,7 +239,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f972226c639e0dc1eca2cb0220c1b5799e2bfc62eda37845b662c5d0cb972371" checksum = "f972226c639e0dc1eca2cb0220c1b5799e2bfc62eda37845b662c5d0cb972371"
dependencies = [ dependencies = [
"aws-smithy-types", "aws-smithy-types",
"bytes 1.1.0", "bytes 1.2.0",
"crc32fast", "crc32fast",
] ]
@ -251,7 +251,7 @@ checksum = "12c787e24b757634453a60ff05948aa1b450f5b3a7a2094f22acff8a5022635b"
dependencies = [ dependencies = [
"aws-smithy-eventstream", "aws-smithy-eventstream",
"aws-smithy-types", "aws-smithy-types",
"bytes 1.1.0", "bytes 1.2.0",
"bytes-utils", "bytes-utils",
"futures-core", "futures-core",
"http", "http",
@ -271,7 +271,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64f80a2c56fc09fc9a2da3c63f286ec2a89465433219f8165e14e522283a5eb8" checksum = "64f80a2c56fc09fc9a2da3c63f286ec2a89465433219f8165e14e522283a5eb8"
dependencies = [ dependencies = [
"aws-smithy-http", "aws-smithy-http",
"bytes 1.1.0", "bytes 1.2.0",
"http", "http",
"http-body", "http-body",
"pin-project 1.0.10", "pin-project 1.0.10",
@ -391,9 +391,9 @@ checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.1.0" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e"
[[package]] [[package]]
name = "bytes-utils" name = "bytes-utils"
@ -401,7 +401,7 @@ version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1" checksum = "1934a3ef9cac8efde4966a92781e77713e1ba329f1d42e446c7d7eba340d8ef1"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"either", "either",
] ]
@ -982,7 +982,7 @@ dependencies = [
"async-trait", "async-trait",
"aws-sdk-s3", "aws-sdk-s3",
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"bytesize", "bytesize",
"chrono", "chrono",
"futures", "futures",
@ -1026,7 +1026,7 @@ version = "0.7.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"chrono", "chrono",
"crypto-common", "crypto-common",
"err-derive 0.3.1", "err-derive 0.3.1",
@ -1071,7 +1071,7 @@ version = "0.7.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.2.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_db", "garage_db",
@ -1166,7 +1166,7 @@ checksum = "81e693aa4582cfe7a7ce70c07880e3662544b5d0cd68bc4b59c53febfbb8d1ec"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.2.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_util 0.5.1", "garage_util 0.5.1",
@ -1191,7 +1191,7 @@ version = "0.7.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.2.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_util 0.7.0", "garage_util 0.7.0",
@ -1224,7 +1224,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c3557f3757e2acd29eaee86804d4e6c38d2abda81b4b349d8a0d2277044265c" checksum = "5c3557f3757e2acd29eaee86804d4e6c38d2abda81b4b349d8a0d2277044265c"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.2.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_rpc 0.5.1", "garage_rpc 0.5.1",
@ -1244,7 +1244,7 @@ name = "garage_table"
version = "0.7.0" version = "0.7.0"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"bytes 1.1.0", "bytes 1.2.0",
"futures", "futures",
"futures-util", "futures-util",
"garage_db", "garage_db",
@ -1390,7 +1390,7 @@ version = "0.3.12"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b" checksum = "62eeb471aa3e3c9197aa4bfeabfe02982f6dc96f750486c0bb0009ac58b26d2b"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"fnv", "fnv",
"futures-core", "futures-core",
"futures-sink", "futures-sink",
@ -1525,7 +1525,7 @@ version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03" checksum = "31f4c6746584866f0feabcc69893c5b51beef3831656a968ed7ae254cdc4fd03"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"fnv", "fnv",
"itoa", "itoa",
] ]
@ -1536,7 +1536,7 @@ version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6" checksum = "1ff4f84919677303da5f147645dbea6b1881f368d03ac84e1dc09031ebd7b2c6"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"http", "http",
"pin-project-lite", "pin-project-lite",
] ]
@ -1580,7 +1580,7 @@ version = "0.14.18"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2" checksum = "b26ae0a80afebe130861d90abf98e3814a4f28a4c6ffeb5ab8ebb2be311e0ef2"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"futures-channel", "futures-channel",
"futures-core", "futures-core",
"futures-util", "futures-util",
@ -1633,7 +1633,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"hyper", "hyper",
"native-tls", "native-tls",
"tokio", "tokio",
@ -1781,7 +1781,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4f8de9873b904e74b3533f77493731ee26742418077503683db44e1b3c54aa5c" checksum = "4f8de9873b904e74b3533f77493731ee26742418077503683db44e1b3c54aa5c"
dependencies = [ dependencies = [
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"chrono", "chrono",
"http", "http",
"percent-encoding", "percent-encoding",
@ -1810,7 +1810,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8cd12d68768b54bbd50547f4c7b57b73cff680ef8da3ba409463ee995cf0d707" checksum = "8cd12d68768b54bbd50547f4c7b57b73cff680ef8da3ba409463ee995cf0d707"
dependencies = [ dependencies = [
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"chrono", "chrono",
"dirs-next", "dirs-next",
"either", "either",
@ -2081,7 +2081,7 @@ version = "2.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836" checksum = "5f8f35e687561d5c1667590911e6698a8cb714a134a7505718a182e7bc9d3836"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"encoding_rs", "encoding_rs",
"futures-util", "futures-util",
"http", "http",
@ -2142,12 +2142,11 @@ dependencies = [
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.4.4" version = "0.4.4"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "git+https://git.deuxfleurs.fr/lx/netapp?branch=stream-body#cbc21e40acfc420a3e452a1fd488c6a96694b0f2"
checksum = "c6419a4b836774192e13fedb05c0e5f414ee8df9ca0c467456a0bacde06c29ee"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 0.6.0", "bytes 1.2.0",
"cfg-if 1.0.0", "cfg-if 1.0.0",
"err-derive 0.2.4", "err-derive 0.2.4",
"futures", "futures",
@ -2157,6 +2156,7 @@ dependencies = [
"log", "log",
"opentelemetry", "opentelemetry",
"opentelemetry-contrib", "opentelemetry-contrib",
"pin-project 1.0.10",
"rand 0.5.6", "rand 0.5.6",
"rmp-serde 0.14.4", "rmp-serde 0.14.4",
"serde", "serde",
@ -2640,7 +2640,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001" checksum = "444879275cb4fd84958b1a1d5420d15e6fcf7c235fe47f053c9c2a80aceb6001"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"prost-derive", "prost-derive",
] ]
@ -2650,7 +2650,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5" checksum = "62941722fb675d463659e49c4f3fe1fe792ff24fe5bbaa9c08cd3b98a1c354f5"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"heck 0.3.3", "heck 0.3.3",
"itertools 0.10.3", "itertools 0.10.3",
"lazy_static", "lazy_static",
@ -2683,7 +2683,7 @@ version = "0.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a" checksum = "534b7a0e836e3c482d2693070f982e39e7611da9695d4d1f5a4b186b51faef0a"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"prost", "prost",
] ]
@ -2986,7 +2986,7 @@ checksum = "1db30db44ea73551326269adcf7a2169428a054f14faf9e1768f2163494f2fa2"
dependencies = [ dependencies = [
"async-trait", "async-trait",
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"crc32fast", "crc32fast",
"futures", "futures",
"http", "http",
@ -3028,7 +3028,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272" checksum = "a5ae95491c8b4847931e291b151127eccd6ff8ca13f33603eb3d0035ecb05272"
dependencies = [ dependencies = [
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"chrono", "chrono",
"digest 0.9.0", "digest 0.9.0",
"futures", "futures",
@ -3594,7 +3594,7 @@ version = "1.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee" checksum = "2af73ac49756f3f7c01172e34a23e5d0216f6c32333757c2c61feb2bbff5a5ee"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"libc", "libc",
"memchr", "memchr",
"mio", "mio",
@ -3667,7 +3667,7 @@ version = "0.6.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0" checksum = "9e99e1983e5d376cd8eb4b66604d2e99e79f5bd988c3055891dcd8c9e2604cc0"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-sink", "futures-sink",
@ -3683,7 +3683,7 @@ version = "0.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1" checksum = "64910e1b9c1901aaf5375561e35b9c057d95ff41a44ede043a03e09279eabaf1"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes 1.2.0",
"futures-core", "futures-core",
"futures-sink", "futures-sink",
"log", "log",
@ -3709,7 +3709,7 @@ dependencies = [
"async-stream", "async-stream",
"async-trait", "async-trait",
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"futures-core", "futures-core",
"futures-util", "futures-util",
"h2", "h2",
@ -3770,7 +3770,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "81eca72647e58054bbfa41e6f297c23436f1c60aff6e5eb38455a0f9ca420bb5" checksum = "81eca72647e58054bbfa41e6f297c23436f1c60aff6e5eb38455a0f9ca420bb5"
dependencies = [ dependencies = [
"base64", "base64",
"bytes 1.1.0", "bytes 1.2.0",
"futures-core", "futures-core",
"futures-util", "futures-util",
"http", "http",

View file

@ -8,7 +8,6 @@ use async_trait::async_trait;
use bytes::Bytes; use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use futures::future::*;
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::select; use tokio::select;
@ -637,24 +636,24 @@ impl BlockManager {
} }
who.retain(|id| *id != self.system.id); who.retain(|id| *id != self.system.id);
let msg = Arc::new(BlockRpc::NeedBlockQuery(*hash)); let who_needs_resps = self
let who_needs_fut = who.iter().map(|to| { .system
self.system.rpc.call_arc( .rpc
.call_many(
&self.endpoint, &self.endpoint,
*to, &who,
msg.clone(), BlockRpc::NeedBlockQuery(*hash),
RequestStrategy::with_priority(PRIO_BACKGROUND) RequestStrategy::with_priority(PRIO_BACKGROUND)
.with_timeout(NEED_BLOCK_QUERY_TIMEOUT), .with_timeout(NEED_BLOCK_QUERY_TIMEOUT),
) )
}); .await?;
let who_needs_resps = join_all(who_needs_fut).await;
let mut need_nodes = vec![]; let mut need_nodes = vec![];
for (node, needed) in who.iter().zip(who_needs_resps.into_iter()) { for (node, needed) in who_needs_resps.into_iter() {
match needed.err_context("NeedBlockQuery RPC")? { match needed.err_context("NeedBlockQuery RPC")? {
BlockRpc::NeedBlockReply(needed) => { BlockRpc::NeedBlockReply(needed) => {
if needed { if needed {
need_nodes.push(*node); need_nodes.push(node);
} }
} }
m => { m => {

View file

@ -50,9 +50,8 @@ futures = "0.3"
futures-util = "0.3" futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.2", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = "0.4"
#netapp = { version = "0.4", path = "../../../netapp" } netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
netapp = "0.4"
opentelemetry = { version = "0.17", features = [ "rt-tokio" ] } opentelemetry = { version = "0.17", features = [ "rt-tokio" ] }
opentelemetry-prometheus = "0.10" opentelemetry-prometheus = "0.10"

View file

@ -681,7 +681,7 @@ impl AdminRpcHandler {
.endpoint .endpoint
.call( .call(
&node, &node,
&AdminRpc::LaunchRepair(opt_to_send.clone()), AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL, PRIO_NORMAL,
) )
.await; .await;
@ -721,7 +721,7 @@ impl AdminRpcHandler {
let node_id = (*node).into(); let node_id = (*node).into();
match self match self
.endpoint .endpoint
.call(&node_id, &AdminRpc::Stats(opt), PRIO_NORMAL) .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await? .await?
{ {
Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(), Ok(AdminRpc::Ok(s)) => writeln!(&mut ret, "{}", s).unwrap(),

View file

@ -47,7 +47,7 @@ pub async fn cli_command_dispatch(
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> { pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = match rpc_cli let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await?? .await??
{ {
SystemRpc::ReturnKnownNodes(nodes) => nodes, SystemRpc::ReturnKnownNodes(nodes) => nodes,
@ -149,7 +149,7 @@ pub async fn cmd_connect(
args: ConnectNodeOpt, args: ConnectNodeOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, &SystemRpc::Connect(args.node), PRIO_NORMAL) .call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.await?? .await??
{ {
SystemRpc::Ok => { SystemRpc::Ok => {
@ -165,7 +165,7 @@ pub async fn cmd_admin(
rpc_host: NodeID, rpc_host: NodeID,
args: AdminRpc, args: AdminRpc,
) -> Result<(), HelperError> { ) -> Result<(), HelperError> {
match rpc_cli.call(&rpc_host, &args, PRIO_NORMAL).await?? { match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => { AdminRpc::Ok(msg) => {
println!("{}", msg); println!("{}", msg);
} }

View file

@ -36,7 +36,7 @@ pub async fn cmd_assign_role(
args: AssignRoleOpt, args: AssignRoleOpt,
) -> Result<(), Error> { ) -> Result<(), Error> {
let status = match rpc_cli let status = match rpc_cli
.call(&rpc_host, &SystemRpc::GetKnownNodes, PRIO_NORMAL) .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await?? .await??
{ {
SystemRpc::ReturnKnownNodes(nodes) => nodes, SystemRpc::ReturnKnownNodes(nodes) => nodes,
@ -245,7 +245,7 @@ pub async fn fetch_layout(
rpc_host: NodeID, rpc_host: NodeID,
) -> Result<ClusterLayout, Error> { ) -> Result<ClusterLayout, Error> {
match rpc_cli match rpc_cli
.call(&rpc_host, &SystemRpc::PullClusterLayout, PRIO_NORMAL) .call(&rpc_host, SystemRpc::PullClusterLayout, PRIO_NORMAL)
.await?? .await??
{ {
SystemRpc::AdvertiseClusterLayout(t) => Ok(t), SystemRpc::AdvertiseClusterLayout(t) => Ok(t),
@ -261,7 +261,7 @@ pub async fn send_layout(
rpc_cli rpc_cli
.call( .call(
&rpc_host, &rpc_host,
&SystemRpc::AdvertiseClusterLayout(layout), SystemRpc::AdvertiseClusterLayout(layout),
PRIO_NORMAL, PRIO_NORMAL,
) )
.await??; .await??;

View file

@ -40,9 +40,8 @@ futures-util = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
opentelemetry = "0.17" opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = "0.4"
#netapp = { version = "0.4", path = "../../../netapp" } netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
netapp = "0.4"
[features] [features]
k2v = [ "garage_util/k2v" ] k2v = [ "garage_util/k2v" ]

View file

@ -46,9 +46,8 @@ tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi
tokio-stream = { version = "0.1", features = ["net"] } tokio-stream = { version = "0.1", features = ["net"] }
opentelemetry = "0.17" opentelemetry = "0.17"
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = { version = "0.4.4", features = ["telemetry"] }
#netapp = { version = "0.4", path = "../../../netapp", features = ["telemetry"] } netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
netapp = { version = "0.4.4", features = ["telemetry"] }
hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] } hyper = { version = "0.14", features = ["client", "http1", "runtime", "tcp"] }

View file

@ -15,9 +15,9 @@ use opentelemetry::{
Context, Context,
}; };
pub use netapp::endpoint::{Endpoint, EndpointHandler, Message as Rpc}; pub use netapp::endpoint::{Endpoint, EndpointHandler};
pub use netapp::message::{Message as Rpc, *};
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
pub use netapp::proto::*;
pub use netapp::{NetApp, NodeID}; pub use netapp::{NetApp, NodeID};
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
@ -30,10 +30,8 @@ use crate::ring::Ring;
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
// Try to never have more than 200MB of outgoing requests // Don't allow more than 100 concurrent outgoing RPCs.
// buffered at the same time. Other requests are queued until const MAX_CONCURRENT_REQUESTS: usize = 100;
// space is freed.
const REQUEST_BUFFER_SIZE: usize = 200 * 1024 * 1024;
/// Strategy to apply when making RPC /// Strategy to apply when making RPC
#[derive(Copy, Clone)] #[derive(Copy, Clone)]
@ -95,7 +93,7 @@ impl RpcHelper {
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
ring: watch::Receiver<Arc<Ring>>, ring: watch::Receiver<Arc<Ring>>,
) -> Self { ) -> Self {
let sem = Arc::new(Semaphore::new(REQUEST_BUFFER_SIZE)); let sem = Arc::new(Semaphore::new(MAX_CONCURRENT_REQUESTS));
let metrics = RpcMetrics::new(sem.clone()); let metrics = RpcMetrics::new(sem.clone());
@ -109,29 +107,16 @@ impl RpcHelper {
})) }))
} }
pub async fn call<M, H, S>( pub async fn call<M, N, H, S>(
&self, &self,
endpoint: &Endpoint<M, H>, endpoint: &Endpoint<M, H>,
to: Uuid, to: Uuid,
msg: M, msg: N,
strat: RequestStrategy,
) -> Result<S, Error>
where
M: Rpc<Response = Result<S, Error>>,
H: EndpointHandler<M>,
{
self.call_arc(endpoint, to, Arc::new(msg), strat).await
}
pub async fn call_arc<M, H, S>(
&self,
endpoint: &Endpoint<M, H>,
to: Uuid,
msg: Arc<M>,
strat: RequestStrategy, strat: RequestStrategy,
) -> Result<S, Error> ) -> Result<S, Error>
where where
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M> + Send,
H: EndpointHandler<M>, H: EndpointHandler<M>,
{ {
let metric_tags = [ let metric_tags = [
@ -140,11 +125,10 @@ impl RpcHelper {
KeyValue::new("to", format!("{:?}", to)), KeyValue::new("to", format!("{:?}", to)),
]; ];
let msg_size = rmp_to_vec_all_named(&msg)?.len() as u32;
let permit = self let permit = self
.0 .0
.request_buffer_semaphore .request_buffer_semaphore
.acquire_many(msg_size) .acquire()
.record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags) .record_duration(&self.0.metrics.rpc_queueing_time, &metric_tags)
.await?; .await?;
@ -152,7 +136,7 @@ impl RpcHelper {
let node_id = to.into(); let node_id = to.into();
let rpc_call = endpoint let rpc_call = endpoint
.call(&node_id, msg, strat.rs_priority) .call_streaming(&node_id, msg, strat.rs_priority)
.record_duration(&self.0.metrics.rpc_duration, &metric_tags); .record_duration(&self.0.metrics.rpc_duration, &metric_tags);
select! { select! {
@ -162,7 +146,7 @@ impl RpcHelper {
if res.is_err() { if res.is_err() {
self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags); self.0.metrics.rpc_netapp_error_counter.add(1, &metric_tags);
} }
let res = res?; let res = res?.into_msg();
if res.is_err() { if res.is_err() {
self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags); self.0.metrics.rpc_garage_error_counter.add(1, &metric_tags);
@ -178,37 +162,41 @@ impl RpcHelper {
} }
} }
pub async fn call_many<M, H, S>( pub async fn call_many<M, N, H, S>(
&self, &self,
endpoint: &Endpoint<M, H>, endpoint: &Endpoint<M, H>,
to: &[Uuid], to: &[Uuid],
msg: M, msg: N,
strat: RequestStrategy, strat: RequestStrategy,
) -> Vec<(Uuid, Result<S, Error>)> ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where where
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M>,
H: EndpointHandler<M>, H: EndpointHandler<M>,
{ {
let msg = Arc::new(msg); let msg = msg.into_req().map_err(netapp::error::Error::from)?;
let resps = join_all( let resps = join_all(
to.iter() to.iter()
.map(|to| self.call_arc(endpoint, *to, msg.clone(), strat)), .map(|to| self.call(endpoint, *to, msg.clone(), strat)),
) )
.await; .await;
to.iter() Ok(to
.iter()
.cloned() .cloned()
.zip(resps.into_iter()) .zip(resps.into_iter())
.collect::<Vec<_>>() .collect::<Vec<_>>())
} }
pub async fn broadcast<M, H, S>( pub async fn broadcast<M, N, H, S>(
&self, &self,
endpoint: &Endpoint<M, H>, endpoint: &Endpoint<M, H>,
msg: M, msg: N,
strat: RequestStrategy, strat: RequestStrategy,
) -> Vec<(Uuid, Result<S, Error>)> ) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
where where
M: Rpc<Response = Result<S, Error>>, M: Rpc<Response = Result<S, Error>>,
N: IntoReq<M>,
H: EndpointHandler<M>, H: EndpointHandler<M>,
{ {
let to = self let to = self
@ -262,20 +250,21 @@ impl RpcHelper {
.await .await
} }
async fn try_call_many_internal<M, H, S>( async fn try_call_many_internal<M, N, H, S>(
&self, &self,
endpoint: &Arc<Endpoint<M, H>>, endpoint: &Arc<Endpoint<M, H>>,
to: &[Uuid], to: &[Uuid],
msg: M, msg: N,
strategy: RequestStrategy, strategy: RequestStrategy,
quorum: usize, quorum: usize,
) -> Result<Vec<S>, Error> ) -> Result<Vec<S>, Error>
where where
M: Rpc<Response = Result<S, Error>> + 'static, M: Rpc<Response = Result<S, Error>> + 'static,
N: IntoReq<M>,
H: EndpointHandler<M> + 'static, H: EndpointHandler<M> + 'static,
S: Send + 'static, S: Send + 'static,
{ {
let msg = Arc::new(msg); let msg = msg.into_req().map_err(netapp::error::Error::from)?;
// Build future for each request // Build future for each request
// They are not started now: they are added below in a FuturesUnordered // They are not started now: they are added below in a FuturesUnordered
@ -285,7 +274,7 @@ impl RpcHelper {
let msg = msg.clone(); let msg = msg.clone();
let endpoint2 = endpoint.clone(); let endpoint2 = endpoint.clone();
(to, async move { (to, async move {
self2.call_arc(&endpoint2, to, msg, strategy).await self2.call(&endpoint2, to, msg, strategy).await
}) })
}); });

View file

@ -16,8 +16,8 @@ use tokio::sync::watch;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*;
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::proto::*;
use netapp::util::parse_and_resolve_peer_addr; use netapp::util::parse_and_resolve_peer_addr;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
@ -544,7 +544,7 @@ impl System {
SystemRpc::AdvertiseClusterLayout(layout), SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH), RequestStrategy::with_priority(PRIO_HIGH),
) )
.await; .await?;
Ok(()) Ok(())
}); });
self.background.spawn(self.clone().save_cluster_layout()); self.background.spawn(self.clone().save_cluster_layout());
@ -559,7 +559,8 @@ impl System {
self.update_local_status(); self.update_local_status();
let local_status: NodeStatus = self.local_status.load().as_ref().clone(); let local_status: NodeStatus = self.local_status.load().as_ref().clone();
self.rpc let _ = self
.rpc
.broadcast( .broadcast(
&self.system_endpoint, &self.system_endpoint,
SystemRpc::AdvertiseStatus(local_status), SystemRpc::AdvertiseStatus(local_status),

View file

@ -60,7 +60,7 @@ pub trait Entry<P: PartitionKey, S: SortKey>:
} }
/// Trait for the schema used in a table /// Trait for the schema used in a table
pub trait TableSchema: Send + Sync { pub trait TableSchema: Send + Sync + 'static {
/// The name of the table in the database /// The name of the table in the database
const TABLE_NAME: &'static str; const TABLE_NAME: &'static str;

View file

@ -36,9 +36,8 @@ toml = "0.5"
futures = "0.3" futures = "0.3"
tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] } tokio = { version = "1.0", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
#netapp = { version = "0.3.0", git = "https://git.deuxfleurs.fr/lx/netapp" } #netapp = "0.4"
#netapp = { version = "0.4", path = "../../../netapp" } netapp = { version = "0.4.4", git = "https://git.deuxfleurs.fr/lx/netapp", branch = "stream-body", features = ["telemetry"] }
netapp = "0.4"
http = "0.2" http = "0.2"
hyper = "0.14" hyper = "0.14"