[fix-buffering] implement block_ram_buffer_max
to avoid excessive RAM usage
This commit is contained in:
parent
95eb8808e8
commit
0d3e285d13
7 changed files with 148 additions and 13 deletions
|
@ -20,6 +20,7 @@ metadata_auto_snapshot_interval = "6h"
|
||||||
db_engine = "lmdb"
|
db_engine = "lmdb"
|
||||||
|
|
||||||
block_size = "1M"
|
block_size = "1M"
|
||||||
|
block_ram_buffer_max = "256MiB"
|
||||||
|
|
||||||
sled_cache_capacity = "128MiB"
|
sled_cache_capacity = "128MiB"
|
||||||
sled_flush_every_ms = 2000
|
sled_flush_every_ms = 2000
|
||||||
|
@ -88,6 +89,7 @@ The following gives details about each available configuration option.
|
||||||
|
|
||||||
Top-level configuration options:
|
Top-level configuration options:
|
||||||
[`allow_world_readable_secrets`](#allow_world_readable_secrets),
|
[`allow_world_readable_secrets`](#allow_world_readable_secrets),
|
||||||
|
[`block_ram_buffer_max`](#block_ram_buffer_max),
|
||||||
[`block_size`](#block_size),
|
[`block_size`](#block_size),
|
||||||
[`bootstrap_peers`](#bootstrap_peers),
|
[`bootstrap_peers`](#bootstrap_peers),
|
||||||
[`compression_level`](#compression_level),
|
[`compression_level`](#compression_level),
|
||||||
|
@ -420,6 +422,37 @@ files will remain available. This however means that chunks from existing files
|
||||||
will not be deduplicated with chunks from newly uploaded files, meaning you
|
will not be deduplicated with chunks from newly uploaded files, meaning you
|
||||||
might use more storage space that is optimally possible.
|
might use more storage space that is optimally possible.
|
||||||
|
|
||||||
|
#### `block_ram_buffer_max` (since v0.9.4) {#block_ram_buffer_max}
|
||||||
|
|
||||||
|
A limit on the total size of data blocks kept in RAM by S3 API nodes awaiting
|
||||||
|
to be sent to storage nodes asynchronously.
|
||||||
|
|
||||||
|
Explanation: since Garage wants to tolerate node failures, it uses quorum
|
||||||
|
writes to send data blocks to storage nodes: try to write the block to three
|
||||||
|
nodes, and return ok as soon as two writes complete. So even if all three nodes
|
||||||
|
are online, the third write always completes asynchronously. In general, there
|
||||||
|
are not many writes to a cluster, and the third asynchronous write can
|
||||||
|
terminate early enough so as to not cause unbounded RAM growth. However, if
|
||||||
|
the S3 API node is continuously receiving large quantities of data and the
|
||||||
|
third node is never able to catch up, many data blocks will be kept buffered in
|
||||||
|
RAM as they are awaiting transfer to the third node.
|
||||||
|
|
||||||
|
The `block_ram_buffer_max` sets a limit to the size of buffers that can be kept
|
||||||
|
in RAM in this process. When the limit is reached, backpressure is applied
|
||||||
|
back to the S3 client.
|
||||||
|
|
||||||
|
Note that this only counts buffers that have arrived to a certain stage of
|
||||||
|
processing (received from the client + encrypted and/or compressed as
|
||||||
|
necessary) and are ready to send to the storage nodes. Many other buffers will
|
||||||
|
not be counted and this is not a hard limit on RAM consumption. In particular,
|
||||||
|
if many clients send requests simultaneously with large objects, the RAM
|
||||||
|
consumption will always grow linearly with the number of concurrent requests,
|
||||||
|
as each request will use a few buffers of size `block_size` for receiving and
|
||||||
|
intermediate processing before even trying to send the data to the storage
|
||||||
|
node.
|
||||||
|
|
||||||
|
The default value is 256MiB.
|
||||||
|
|
||||||
#### `sled_cache_capacity` {#sled_cache_capacity}
|
#### `sled_cache_capacity` {#sled_cache_capacity}
|
||||||
|
|
||||||
This parameter can be used to tune the capacity of the cache used by
|
This parameter can be used to tune the capacity of the cache used by
|
||||||
|
|
|
@ -225,6 +225,17 @@ block_bytes_read 120586322022
|
||||||
block_bytes_written 3386618077
|
block_bytes_written 3386618077
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### `block_ram_buffer_free_kb` (gauge)
|
||||||
|
|
||||||
|
Kibibytes available for buffering blocks that have to be sent to remote nodes.
|
||||||
|
When clients send too much data to this node and a storage node is not receiving
|
||||||
|
data fast enough due to slower network conditions, this will decrease down to
|
||||||
|
zero and backpressure will be applied.
|
||||||
|
|
||||||
|
```
|
||||||
|
block_ram_buffer_free_kb 219829
|
||||||
|
```
|
||||||
|
|
||||||
#### `block_compression_level` (counter)
|
#### `block_compression_level` (counter)
|
||||||
|
|
||||||
Exposes the block compression level configured for the Garage node.
|
Exposes the block compression level configured for the Garage node.
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
|
||||||
|
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
|
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
|
||||||
|
@ -93,6 +94,7 @@ pub struct BlockManager {
|
||||||
|
|
||||||
pub(crate) system: Arc<System>,
|
pub(crate) system: Arc<System>,
|
||||||
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
|
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
|
||||||
|
buffer_kb_semaphore: Arc<Semaphore>,
|
||||||
|
|
||||||
pub(crate) metrics: BlockManagerMetrics,
|
pub(crate) metrics: BlockManagerMetrics,
|
||||||
|
|
||||||
|
@ -152,11 +154,14 @@ impl BlockManager {
|
||||||
.netapp
|
.netapp
|
||||||
.endpoint("garage_block/manager.rs/Rpc".to_string());
|
.endpoint("garage_block/manager.rs/Rpc".to_string());
|
||||||
|
|
||||||
|
let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
|
||||||
|
|
||||||
let metrics = BlockManagerMetrics::new(
|
let metrics = BlockManagerMetrics::new(
|
||||||
config.compression_level,
|
config.compression_level,
|
||||||
rc.rc.clone(),
|
rc.rc.clone(),
|
||||||
resync.queue.clone(),
|
resync.queue.clone(),
|
||||||
resync.errors.clone(),
|
resync.errors.clone(),
|
||||||
|
buffer_kb_semaphore.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
|
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
|
||||||
|
@ -176,6 +181,7 @@ impl BlockManager {
|
||||||
resync,
|
resync,
|
||||||
system,
|
system,
|
||||||
endpoint,
|
endpoint,
|
||||||
|
buffer_kb_semaphore,
|
||||||
metrics,
|
metrics,
|
||||||
scrub_persister,
|
scrub_persister,
|
||||||
tx_scrub_command: ArcSwapOption::new(None),
|
tx_scrub_command: ArcSwapOption::new(None),
|
||||||
|
@ -361,6 +367,14 @@ impl BlockManager {
|
||||||
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
|
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
|
||||||
.await
|
.await
|
||||||
.into_parts();
|
.into_parts();
|
||||||
|
|
||||||
|
let permit = self
|
||||||
|
.buffer_kb_semaphore
|
||||||
|
.clone()
|
||||||
|
.acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
|
||||||
|
.await
|
||||||
|
.ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
|
||||||
|
|
||||||
let put_block_rpc =
|
let put_block_rpc =
|
||||||
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
|
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
|
||||||
let put_block_rpc = if let Some(tag) = order_tag {
|
let put_block_rpc = if let Some(tag) = order_tag {
|
||||||
|
@ -376,6 +390,7 @@ impl BlockManager {
|
||||||
&who[..],
|
&who[..],
|
||||||
put_block_rpc,
|
put_block_rpc,
|
||||||
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
||||||
|
.with_drop_on_completion(permit)
|
||||||
.with_quorum(self.replication.write_quorum()),
|
.with_quorum(self.replication.write_quorum()),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
use opentelemetry::{global, metrics::*};
|
use opentelemetry::{global, metrics::*};
|
||||||
|
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
|
@ -9,6 +13,7 @@ pub struct BlockManagerMetrics {
|
||||||
pub(crate) _rc_size: ValueObserver<u64>,
|
pub(crate) _rc_size: ValueObserver<u64>,
|
||||||
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
||||||
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
|
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
|
||||||
|
pub(crate) _buffer_free_kb: ValueObserver<u64>,
|
||||||
|
|
||||||
pub(crate) resync_counter: BoundCounter<u64>,
|
pub(crate) resync_counter: BoundCounter<u64>,
|
||||||
pub(crate) resync_error_counter: BoundCounter<u64>,
|
pub(crate) resync_error_counter: BoundCounter<u64>,
|
||||||
|
@ -31,6 +36,7 @@ impl BlockManagerMetrics {
|
||||||
rc_tree: db::Tree,
|
rc_tree: db::Tree,
|
||||||
resync_queue: CountedTree,
|
resync_queue: CountedTree,
|
||||||
resync_errors: CountedTree,
|
resync_errors: CountedTree,
|
||||||
|
buffer_semaphore: Arc<Semaphore>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let meter = global::meter("garage_model/block");
|
let meter = global::meter("garage_model/block");
|
||||||
Self {
|
Self {
|
||||||
|
@ -66,6 +72,15 @@ impl BlockManagerMetrics {
|
||||||
.with_description("Number of block hashes whose last resync resulted in an error")
|
.with_description("Number of block hashes whose last resync resulted in an error")
|
||||||
.init(),
|
.init(),
|
||||||
|
|
||||||
|
_buffer_free_kb: meter
|
||||||
|
.u64_value_observer("block.ram_buffer_free_kb", move |observer| {
|
||||||
|
observer.observe(buffer_semaphore.available_permits() as u64, &[])
|
||||||
|
})
|
||||||
|
.with_description(
|
||||||
|
"Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
|
||||||
|
)
|
||||||
|
.init(),
|
||||||
|
|
||||||
resync_counter: meter
|
resync_counter: meter
|
||||||
.u64_counter("block.resync_counter")
|
.u64_counter("block.resync_counter")
|
||||||
.with_description("Number of calls to resync_block")
|
.with_description("Number of calls to resync_block")
|
||||||
|
|
|
@ -300,7 +300,11 @@ impl K2VRpcHandler {
|
||||||
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
|
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
|
||||||
let mut requests = nodes
|
let mut requests = nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
|
.map(|node| {
|
||||||
|
self.system
|
||||||
|
.rpc
|
||||||
|
.call(&self.endpoint, *node, msg.clone(), rs.clone())
|
||||||
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
// Fetch responses. This procedure stops fetching responses when any of the following
|
// Fetch responses. This procedure stops fetching responses when any of the following
|
||||||
|
|
|
@ -33,8 +33,7 @@ use crate::ring::Ring;
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
|
||||||
|
|
||||||
/// Strategy to apply when making RPC
|
/// Strategy to apply when making RPC
|
||||||
#[derive(Copy, Clone)]
|
pub struct RequestStrategy<T> {
|
||||||
pub struct RequestStrategy {
|
|
||||||
/// Min number of response to consider the request successful
|
/// Min number of response to consider the request successful
|
||||||
pub rs_quorum: Option<usize>,
|
pub rs_quorum: Option<usize>,
|
||||||
/// Should requests be dropped after enough response are received
|
/// Should requests be dropped after enough response are received
|
||||||
|
@ -43,6 +42,8 @@ pub struct RequestStrategy {
|
||||||
pub rs_priority: RequestPriority,
|
pub rs_priority: RequestPriority,
|
||||||
/// Custom timeout for this request
|
/// Custom timeout for this request
|
||||||
rs_timeout: Timeout,
|
rs_timeout: Timeout,
|
||||||
|
/// Data to drop when everything completes
|
||||||
|
rs_drop_on_complete: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
|
@ -52,7 +53,19 @@ enum Timeout {
|
||||||
Custom(Duration),
|
Custom(Duration),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestStrategy {
|
impl Clone for RequestStrategy<()> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
RequestStrategy {
|
||||||
|
rs_quorum: self.rs_quorum,
|
||||||
|
rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
|
||||||
|
rs_priority: self.rs_priority,
|
||||||
|
rs_timeout: self.rs_timeout,
|
||||||
|
rs_drop_on_complete: (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestStrategy<()> {
|
||||||
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||||
pub fn with_priority(prio: RequestPriority) -> Self {
|
pub fn with_priority(prio: RequestPriority) -> Self {
|
||||||
RequestStrategy {
|
RequestStrategy {
|
||||||
|
@ -60,8 +73,22 @@ impl RequestStrategy {
|
||||||
rs_interrupt_after_quorum: false,
|
rs_interrupt_after_quorum: false,
|
||||||
rs_priority: prio,
|
rs_priority: prio,
|
||||||
rs_timeout: Timeout::Default,
|
rs_timeout: Timeout::Default,
|
||||||
|
rs_drop_on_complete: (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Add an item to be dropped on completion
|
||||||
|
pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
|
||||||
|
RequestStrategy {
|
||||||
|
rs_quorum: self.rs_quorum,
|
||||||
|
rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
|
||||||
|
rs_priority: self.rs_priority,
|
||||||
|
rs_timeout: self.rs_timeout,
|
||||||
|
rs_drop_on_complete: drop_on_complete,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RequestStrategy<T> {
|
||||||
/// Set quorum to be reached for request
|
/// Set quorum to be reached for request
|
||||||
pub fn with_quorum(mut self, quorum: usize) -> Self {
|
pub fn with_quorum(mut self, quorum: usize) -> Self {
|
||||||
self.rs_quorum = Some(quorum);
|
self.rs_quorum = Some(quorum);
|
||||||
|
@ -83,6 +110,19 @@ impl RequestStrategy {
|
||||||
self.rs_timeout = Timeout::Custom(timeout);
|
self.rs_timeout = Timeout::Custom(timeout);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
/// Extract drop_on_complete item
|
||||||
|
fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
|
||||||
|
(
|
||||||
|
RequestStrategy {
|
||||||
|
rs_quorum: self.rs_quorum,
|
||||||
|
rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
|
||||||
|
rs_priority: self.rs_priority,
|
||||||
|
rs_timeout: self.rs_timeout,
|
||||||
|
rs_drop_on_complete: (),
|
||||||
|
},
|
||||||
|
self.rs_drop_on_complete,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -123,7 +163,7 @@ impl RpcHelper {
|
||||||
endpoint: &Endpoint<M, H>,
|
endpoint: &Endpoint<M, H>,
|
||||||
to: Uuid,
|
to: Uuid,
|
||||||
msg: N,
|
msg: N,
|
||||||
strat: RequestStrategy,
|
strat: RequestStrategy<()>,
|
||||||
) -> Result<S, Error>
|
) -> Result<S, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
@ -176,7 +216,7 @@ impl RpcHelper {
|
||||||
endpoint: &Endpoint<M, H>,
|
endpoint: &Endpoint<M, H>,
|
||||||
to: &[Uuid],
|
to: &[Uuid],
|
||||||
msg: N,
|
msg: N,
|
||||||
strat: RequestStrategy,
|
strat: RequestStrategy<()>,
|
||||||
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
@ -187,7 +227,7 @@ impl RpcHelper {
|
||||||
|
|
||||||
let resps = join_all(
|
let resps = join_all(
|
||||||
to.iter()
|
to.iter()
|
||||||
.map(|to| self.call(endpoint, *to, msg.clone(), strat)),
|
.map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
Ok(to
|
Ok(to
|
||||||
|
@ -201,7 +241,7 @@ impl RpcHelper {
|
||||||
&self,
|
&self,
|
||||||
endpoint: &Endpoint<M, H>,
|
endpoint: &Endpoint<M, H>,
|
||||||
msg: N,
|
msg: N,
|
||||||
strat: RequestStrategy,
|
strat: RequestStrategy<()>,
|
||||||
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
@ -220,18 +260,19 @@ impl RpcHelper {
|
||||||
|
|
||||||
/// Make a RPC call to multiple servers, returning either a Vec of responses,
|
/// Make a RPC call to multiple servers, returning either a Vec of responses,
|
||||||
/// or an error if quorum could not be reached due to too many errors
|
/// or an error if quorum could not be reached due to too many errors
|
||||||
pub async fn try_call_many<M, N, H, S>(
|
pub async fn try_call_many<M, N, H, S, T>(
|
||||||
&self,
|
&self,
|
||||||
endpoint: &Arc<Endpoint<M, H>>,
|
endpoint: &Arc<Endpoint<M, H>>,
|
||||||
to: &[Uuid],
|
to: &[Uuid],
|
||||||
msg: N,
|
msg: N,
|
||||||
strategy: RequestStrategy,
|
strategy: RequestStrategy<T>,
|
||||||
) -> Result<Vec<S>, Error>
|
) -> Result<Vec<S>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>> + 'static,
|
M: Rpc<Response = Result<S, Error>> + 'static,
|
||||||
N: IntoReq<M>,
|
N: IntoReq<M>,
|
||||||
H: StreamingEndpointHandler<M> + 'static,
|
H: StreamingEndpointHandler<M> + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
let quorum = strategy.rs_quorum.unwrap_or(to.len());
|
let quorum = strategy.rs_quorum.unwrap_or(to.len());
|
||||||
|
|
||||||
|
@ -260,12 +301,12 @@ impl RpcHelper {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_call_many_internal<M, N, H, S>(
|
async fn try_call_many_internal<M, N, H, S, T>(
|
||||||
&self,
|
&self,
|
||||||
endpoint: &Arc<Endpoint<M, H>>,
|
endpoint: &Arc<Endpoint<M, H>>,
|
||||||
to: &[Uuid],
|
to: &[Uuid],
|
||||||
msg: N,
|
msg: N,
|
||||||
strategy: RequestStrategy,
|
strategy: RequestStrategy<T>,
|
||||||
quorum: usize,
|
quorum: usize,
|
||||||
) -> Result<Vec<S>, Error>
|
) -> Result<Vec<S>, Error>
|
||||||
where
|
where
|
||||||
|
@ -273,9 +314,12 @@ impl RpcHelper {
|
||||||
N: IntoReq<M>,
|
N: IntoReq<M>,
|
||||||
H: StreamingEndpointHandler<M> + 'static,
|
H: StreamingEndpointHandler<M> + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
|
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
|
||||||
|
|
||||||
|
let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
|
||||||
|
|
||||||
// Build future for each request
|
// Build future for each request
|
||||||
// They are not started now: they are added below in a FuturesUnordered
|
// They are not started now: they are added below in a FuturesUnordered
|
||||||
// object that will take care of polling them (see below)
|
// object that will take care of polling them (see below)
|
||||||
|
@ -283,6 +327,7 @@ impl RpcHelper {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
let msg = msg.clone();
|
let msg = msg.clone();
|
||||||
let endpoint2 = endpoint.clone();
|
let endpoint2 = endpoint.clone();
|
||||||
|
let strategy = strategy.clone();
|
||||||
(to, async move {
|
(to, async move {
|
||||||
self2.call(&endpoint2, to, msg, strategy).await
|
self2.call(&endpoint2, to, msg, strategy).await
|
||||||
})
|
})
|
||||||
|
@ -377,6 +422,7 @@ impl RpcHelper {
|
||||||
// they have to be put in a proper queue that is persisted to disk.
|
// they have to be put in a proper queue that is persisted to disk.
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
resp_stream.collect::<Vec<Result<_, _>>>().await;
|
resp_stream.collect::<Vec<Result<_, _>>>().await;
|
||||||
|
drop(drop_on_complete);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,14 @@ pub struct Config {
|
||||||
)]
|
)]
|
||||||
pub compression_level: Option<i32>,
|
pub compression_level: Option<i32>,
|
||||||
|
|
||||||
|
/// Maximum amount of block data to buffer in RAM for sending to
|
||||||
|
/// remote nodes when these nodes are on slower links
|
||||||
|
#[serde(
|
||||||
|
deserialize_with = "deserialize_capacity",
|
||||||
|
default = "default_block_ram_buffer_max"
|
||||||
|
)]
|
||||||
|
pub block_ram_buffer_max: usize,
|
||||||
|
|
||||||
/// Skip the permission check of secret files. Useful when
|
/// Skip the permission check of secret files. Useful when
|
||||||
/// POSIX ACLs (or more complex chmods) are used.
|
/// POSIX ACLs (or more complex chmods) are used.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
@ -255,6 +263,9 @@ fn default_sled_flush_every_ms() -> u64 {
|
||||||
fn default_block_size() -> usize {
|
fn default_block_size() -> usize {
|
||||||
1048576
|
1048576
|
||||||
}
|
}
|
||||||
|
fn default_block_ram_buffer_max() -> usize {
|
||||||
|
256 * 1024 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
fn default_compression() -> Option<i32> {
|
fn default_compression() -> Option<i32> {
|
||||||
Some(1)
|
Some(1)
|
||||||
|
|
Loading…
Reference in a new issue