forked from Deuxfleurs/garage
Merge pull request 'Fix unbounded buffering when one node has slower network' (#792) from fix-buffering into main
Reviewed-on: Deuxfleurs/garage#792
This commit is contained in:
commit
ecf641d88c
11 changed files with 228 additions and 58 deletions
|
@ -20,6 +20,7 @@ metadata_auto_snapshot_interval = "6h"
|
||||||
db_engine = "lmdb"
|
db_engine = "lmdb"
|
||||||
|
|
||||||
block_size = "1M"
|
block_size = "1M"
|
||||||
|
block_ram_buffer_max = "256MiB"
|
||||||
|
|
||||||
sled_cache_capacity = "128MiB"
|
sled_cache_capacity = "128MiB"
|
||||||
sled_flush_every_ms = 2000
|
sled_flush_every_ms = 2000
|
||||||
|
@ -88,6 +89,7 @@ The following gives details about each available configuration option.
|
||||||
|
|
||||||
Top-level configuration options:
|
Top-level configuration options:
|
||||||
[`allow_world_readable_secrets`](#allow_world_readable_secrets),
|
[`allow_world_readable_secrets`](#allow_world_readable_secrets),
|
||||||
|
[`block_ram_buffer_max`](#block_ram_buffer_max),
|
||||||
[`block_size`](#block_size),
|
[`block_size`](#block_size),
|
||||||
[`bootstrap_peers`](#bootstrap_peers),
|
[`bootstrap_peers`](#bootstrap_peers),
|
||||||
[`compression_level`](#compression_level),
|
[`compression_level`](#compression_level),
|
||||||
|
@ -420,6 +422,37 @@ files will remain available. This however means that chunks from existing files
|
||||||
will not be deduplicated with chunks from newly uploaded files, meaning you
|
will not be deduplicated with chunks from newly uploaded files, meaning you
|
||||||
might use more storage space that is optimally possible.
|
might use more storage space that is optimally possible.
|
||||||
|
|
||||||
|
#### `block_ram_buffer_max` (since v0.9.4) {#block_ram_buffer_max}
|
||||||
|
|
||||||
|
A limit on the total size of data blocks kept in RAM by S3 API nodes awaiting
|
||||||
|
to be sent to storage nodes asynchronously.
|
||||||
|
|
||||||
|
Explanation: since Garage wants to tolerate node failures, it uses quorum
|
||||||
|
writes to send data blocks to storage nodes: try to write the block to three
|
||||||
|
nodes, and return ok as soon as two writes complete. So even if all three nodes
|
||||||
|
are online, the third write always completes asynchronously. In general, there
|
||||||
|
are not many writes to a cluster, and the third asynchronous write can
|
||||||
|
terminate early enough so as to not cause unbounded RAM growth. However, if
|
||||||
|
the S3 API node is continuously receiving large quantities of data and the
|
||||||
|
third node is never able to catch up, many data blocks will be kept buffered in
|
||||||
|
RAM as they are awaiting transfer to the third node.
|
||||||
|
|
||||||
|
The `block_ram_buffer_max` sets a limit to the size of buffers that can be kept
|
||||||
|
in RAM in this process. When the limit is reached, backpressure is applied
|
||||||
|
back to the S3 client.
|
||||||
|
|
||||||
|
Note that this only counts buffers that have arrived to a certain stage of
|
||||||
|
processing (received from the client + encrypted and/or compressed as
|
||||||
|
necessary) and are ready to send to the storage nodes. Many other buffers will
|
||||||
|
not be counted and this is not a hard limit on RAM consumption. In particular,
|
||||||
|
if many clients send requests simultaneously with large objects, the RAM
|
||||||
|
consumption will always grow linearly with the number of concurrent requests,
|
||||||
|
as each request will use a few buffers of size `block_size` for receiving and
|
||||||
|
intermediate processing before even trying to send the data to the storage
|
||||||
|
node.
|
||||||
|
|
||||||
|
The default value is 256MiB.
|
||||||
|
|
||||||
#### `sled_cache_capacity` {#sled_cache_capacity}
|
#### `sled_cache_capacity` {#sled_cache_capacity}
|
||||||
|
|
||||||
This parameter can be used to tune the capacity of the cache used by
|
This parameter can be used to tune the capacity of the cache used by
|
||||||
|
|
|
@ -225,6 +225,17 @@ block_bytes_read 120586322022
|
||||||
block_bytes_written 3386618077
|
block_bytes_written 3386618077
|
||||||
```
|
```
|
||||||
|
|
||||||
|
#### `block_ram_buffer_free_kb` (gauge)
|
||||||
|
|
||||||
|
Kibibytes available for buffering blocks that have to be sent to remote nodes.
|
||||||
|
When clients send too much data to this node and a storage node is not receiving
|
||||||
|
data fast enough due to slower network conditions, this will decrease down to
|
||||||
|
zero and backpressure will be applied.
|
||||||
|
|
||||||
|
```
|
||||||
|
block_ram_buffer_free_kb 219829
|
||||||
|
```
|
||||||
|
|
||||||
#### `block_compression_level` (counter)
|
#### `block_compression_level` (counter)
|
||||||
|
|
||||||
Exposes the block compression level configured for the Garage node.
|
Exposes the block compression level configured for the Garage node.
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use std::convert::TryInto;
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
|
||||||
|
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
|
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
|
||||||
|
@ -93,6 +94,7 @@ pub struct BlockManager {
|
||||||
|
|
||||||
pub(crate) system: Arc<System>,
|
pub(crate) system: Arc<System>,
|
||||||
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
|
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
|
||||||
|
buffer_kb_semaphore: Arc<Semaphore>,
|
||||||
|
|
||||||
pub(crate) metrics: BlockManagerMetrics,
|
pub(crate) metrics: BlockManagerMetrics,
|
||||||
|
|
||||||
|
@ -152,11 +154,14 @@ impl BlockManager {
|
||||||
.netapp
|
.netapp
|
||||||
.endpoint("garage_block/manager.rs/Rpc".to_string());
|
.endpoint("garage_block/manager.rs/Rpc".to_string());
|
||||||
|
|
||||||
|
let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
|
||||||
|
|
||||||
let metrics = BlockManagerMetrics::new(
|
let metrics = BlockManagerMetrics::new(
|
||||||
config.compression_level,
|
config.compression_level,
|
||||||
rc.rc.clone(),
|
rc.rc.clone(),
|
||||||
resync.queue.clone(),
|
resync.queue.clone(),
|
||||||
resync.errors.clone(),
|
resync.errors.clone(),
|
||||||
|
buffer_kb_semaphore.clone(),
|
||||||
);
|
);
|
||||||
|
|
||||||
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
|
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
|
||||||
|
@ -176,6 +181,7 @@ impl BlockManager {
|
||||||
resync,
|
resync,
|
||||||
system,
|
system,
|
||||||
endpoint,
|
endpoint,
|
||||||
|
buffer_kb_semaphore,
|
||||||
metrics,
|
metrics,
|
||||||
scrub_persister,
|
scrub_persister,
|
||||||
tx_scrub_command: ArcSwapOption::new(None),
|
tx_scrub_command: ArcSwapOption::new(None),
|
||||||
|
@ -232,9 +238,15 @@ impl BlockManager {
|
||||||
async fn rpc_get_raw_block_streaming(
|
async fn rpc_get_raw_block_streaming(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
priority: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<DataBlockStream, Error> {
|
) -> Result<DataBlockStream, Error> {
|
||||||
self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
|
self.rpc_get_raw_block_internal(
|
||||||
|
hash,
|
||||||
|
priority,
|
||||||
|
order_tag,
|
||||||
|
|stream| async move { Ok(stream) },
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -243,9 +255,10 @@ impl BlockManager {
|
||||||
pub(crate) async fn rpc_get_raw_block(
|
pub(crate) async fn rpc_get_raw_block(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
priority: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<DataBlock, Error> {
|
) -> Result<DataBlock, Error> {
|
||||||
self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
|
self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
|
||||||
let (header, stream) = block_stream.into_parts();
|
let (header, stream) = block_stream.into_parts();
|
||||||
read_stream_to_end(stream)
|
read_stream_to_end(stream)
|
||||||
.await
|
.await
|
||||||
|
@ -258,6 +271,7 @@ impl BlockManager {
|
||||||
async fn rpc_get_raw_block_internal<F, Fut, T>(
|
async fn rpc_get_raw_block_internal<F, Fut, T>(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
priority: RequestPriority,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
f: F,
|
f: F,
|
||||||
) -> Result<T, Error>
|
) -> Result<T, Error>
|
||||||
|
@ -273,7 +287,7 @@ impl BlockManager {
|
||||||
let rpc = self.endpoint.call_streaming(
|
let rpc = self.endpoint.call_streaming(
|
||||||
&node_id,
|
&node_id,
|
||||||
BlockRpc::GetBlock(*hash, order_tag),
|
BlockRpc::GetBlock(*hash, order_tag),
|
||||||
PRIO_NORMAL | PRIO_SECONDARY,
|
priority,
|
||||||
);
|
);
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
res = rpc => {
|
res = rpc => {
|
||||||
|
@ -325,7 +339,9 @@ impl BlockManager {
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<ByteStream, Error> {
|
) -> Result<ByteStream, Error> {
|
||||||
let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
let block_stream = self
|
||||||
|
.rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
|
||||||
|
.await?;
|
||||||
let (header, stream) = block_stream.into_parts();
|
let (header, stream) = block_stream.into_parts();
|
||||||
match header {
|
match header {
|
||||||
DataBlockHeader::Plain => Ok(stream),
|
DataBlockHeader::Plain => Ok(stream),
|
||||||
|
@ -361,6 +377,14 @@ impl BlockManager {
|
||||||
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
|
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
|
||||||
.await
|
.await
|
||||||
.into_parts();
|
.into_parts();
|
||||||
|
|
||||||
|
let permit = self
|
||||||
|
.buffer_kb_semaphore
|
||||||
|
.clone()
|
||||||
|
.acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
|
||||||
|
.await
|
||||||
|
.ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
|
||||||
|
|
||||||
let put_block_rpc =
|
let put_block_rpc =
|
||||||
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
|
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
|
||||||
let put_block_rpc = if let Some(tag) = order_tag {
|
let put_block_rpc = if let Some(tag) = order_tag {
|
||||||
|
@ -376,6 +400,7 @@ impl BlockManager {
|
||||||
&who[..],
|
&who[..],
|
||||||
put_block_rpc,
|
put_block_rpc,
|
||||||
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
||||||
|
.with_drop_on_completion(permit)
|
||||||
.with_quorum(self.replication.write_quorum()),
|
.with_quorum(self.replication.write_quorum()),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
|
@ -1,3 +1,7 @@
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use tokio::sync::Semaphore;
|
||||||
|
|
||||||
use opentelemetry::{global, metrics::*};
|
use opentelemetry::{global, metrics::*};
|
||||||
|
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
|
@ -9,6 +13,7 @@ pub struct BlockManagerMetrics {
|
||||||
pub(crate) _rc_size: ValueObserver<u64>,
|
pub(crate) _rc_size: ValueObserver<u64>,
|
||||||
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
||||||
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
|
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
|
||||||
|
pub(crate) _buffer_free_kb: ValueObserver<u64>,
|
||||||
|
|
||||||
pub(crate) resync_counter: BoundCounter<u64>,
|
pub(crate) resync_counter: BoundCounter<u64>,
|
||||||
pub(crate) resync_error_counter: BoundCounter<u64>,
|
pub(crate) resync_error_counter: BoundCounter<u64>,
|
||||||
|
@ -31,6 +36,7 @@ impl BlockManagerMetrics {
|
||||||
rc_tree: db::Tree,
|
rc_tree: db::Tree,
|
||||||
resync_queue: CountedTree,
|
resync_queue: CountedTree,
|
||||||
resync_errors: CountedTree,
|
resync_errors: CountedTree,
|
||||||
|
buffer_semaphore: Arc<Semaphore>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let meter = global::meter("garage_model/block");
|
let meter = global::meter("garage_model/block");
|
||||||
Self {
|
Self {
|
||||||
|
@ -66,6 +72,15 @@ impl BlockManagerMetrics {
|
||||||
.with_description("Number of block hashes whose last resync resulted in an error")
|
.with_description("Number of block hashes whose last resync resulted in an error")
|
||||||
.init(),
|
.init(),
|
||||||
|
|
||||||
|
_buffer_free_kb: meter
|
||||||
|
.u64_value_observer("block.ram_buffer_free_kb", move |observer| {
|
||||||
|
observer.observe(buffer_semaphore.available_permits() as u64, &[])
|
||||||
|
})
|
||||||
|
.with_description(
|
||||||
|
"Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
|
||||||
|
)
|
||||||
|
.init(),
|
||||||
|
|
||||||
resync_counter: meter
|
resync_counter: meter
|
||||||
.u64_counter("block.resync_counter")
|
.u64_counter("block.resync_counter")
|
||||||
.with_description("Number of calls to resync_block")
|
.with_description("Number of calls to resync_block")
|
||||||
|
|
|
@ -436,7 +436,7 @@ impl BlockResyncManager {
|
||||||
&manager.endpoint,
|
&manager.endpoint,
|
||||||
&need_nodes[..],
|
&need_nodes[..],
|
||||||
put_block_message,
|
put_block_message,
|
||||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
|
||||||
.with_quorum(need_nodes.len()),
|
.with_quorum(need_nodes.len()),
|
||||||
)
|
)
|
||||||
.await
|
.await
|
||||||
|
@ -460,7 +460,9 @@ impl BlockResyncManager {
|
||||||
hash
|
hash
|
||||||
);
|
);
|
||||||
|
|
||||||
let block_data = manager.rpc_get_raw_block(hash, None).await?;
|
let block_data = manager
|
||||||
|
.rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
|
||||||
|
.await?;
|
||||||
|
|
||||||
manager.metrics.resync_recv_counter.add(1);
|
manager.metrics.resync_recv_counter.add(1);
|
||||||
|
|
||||||
|
|
|
@ -300,7 +300,11 @@ impl K2VRpcHandler {
|
||||||
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
|
let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout();
|
||||||
let mut requests = nodes
|
let mut requests = nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs))
|
.map(|node| {
|
||||||
|
self.system
|
||||||
|
.rpc
|
||||||
|
.call(&self.endpoint, *node, msg.clone(), rs.clone())
|
||||||
|
})
|
||||||
.collect::<FuturesUnordered<_>>();
|
.collect::<FuturesUnordered<_>>();
|
||||||
|
|
||||||
// Fetch responses. This procedure stops fetching responses when any of the following
|
// Fetch responses. This procedure stops fetching responses when any of the following
|
||||||
|
|
|
@ -28,12 +28,30 @@ use crate::util::*;
|
||||||
/// The same priority value is given to a request and to its associated response.
|
/// The same priority value is given to a request and to its associated response.
|
||||||
pub type RequestPriority = u8;
|
pub type RequestPriority = u8;
|
||||||
|
|
||||||
|
// Usage of priority levels in Garage:
|
||||||
|
//
|
||||||
|
// PRIO_HIGH
|
||||||
|
// for liveness check events such as pings and important
|
||||||
|
// reconfiguration events such as layout changes
|
||||||
|
//
|
||||||
|
// PRIO_NORMAL
|
||||||
|
// for standard interactive requests to exchange metadata
|
||||||
|
//
|
||||||
|
// PRIO_NORMAL | PRIO_SECONDARY
|
||||||
|
// for standard interactive requests to exchange block data
|
||||||
|
//
|
||||||
|
// PRIO_BACKGROUND
|
||||||
|
// for background resync requests to exchange metadata
|
||||||
|
// PRIO_BACKGROUND | PRIO_SECONDARY
|
||||||
|
// for background resync requests to exchange block data
|
||||||
|
|
||||||
/// Priority class: high
|
/// Priority class: high
|
||||||
pub const PRIO_HIGH: RequestPriority = 0x20;
|
pub const PRIO_HIGH: RequestPriority = 0x20;
|
||||||
/// Priority class: normal
|
/// Priority class: normal
|
||||||
pub const PRIO_NORMAL: RequestPriority = 0x40;
|
pub const PRIO_NORMAL: RequestPriority = 0x40;
|
||||||
/// Priority class: background
|
/// Priority class: background
|
||||||
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
|
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
|
||||||
|
|
||||||
/// Priority: primary among given class
|
/// Priority: primary among given class
|
||||||
pub const PRIO_PRIMARY: RequestPriority = 0x00;
|
pub const PRIO_PRIMARY: RequestPriority = 0x00;
|
||||||
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
|
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
|
||||||
|
|
|
@ -109,7 +109,7 @@ impl SendQueuePriority {
|
||||||
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
||||||
order_vec.insert(i, order);
|
order_vec.insert(i, order);
|
||||||
}
|
}
|
||||||
self.items.push_front(item);
|
self.items.push_back(item);
|
||||||
}
|
}
|
||||||
fn remove(&mut self, id: RequestID) {
|
fn remove(&mut self, id: RequestID) {
|
||||||
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
||||||
|
@ -128,6 +128,10 @@ impl SendQueuePriority {
|
||||||
self.items.is_empty()
|
self.items.is_empty()
|
||||||
}
|
}
|
||||||
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
|
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
|
||||||
|
// in step 1: poll only streams that have sent 0 bytes, we want to send them in priority
|
||||||
|
// as they most likely represent small requests to be sent first
|
||||||
|
// in step 2: poll all streams
|
||||||
|
for step in 0..2 {
|
||||||
for (j, item) in self.items.iter_mut().enumerate() {
|
for (j, item) in self.items.iter_mut().enumerate() {
|
||||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
||||||
|
@ -135,6 +139,10 @@ impl SendQueuePriority {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if step == 0 && item.sent > 0 {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
||||||
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
||||||
let id = item.id;
|
let id = item.id;
|
||||||
|
@ -160,21 +168,18 @@ impl SendQueuePriority {
|
||||||
}
|
}
|
||||||
// Remove item from sending queue
|
// Remove item from sending queue
|
||||||
self.items.remove(j);
|
self.items.remove(j);
|
||||||
} else {
|
} else if step == 0 {
|
||||||
// Move item later in send queue to implement LAS scheduling
|
// Step 0 means that this stream had not sent any bytes yet.
|
||||||
// (LAS = Least Attained Service)
|
// Now that it has, and it was not an EOS, we know that it is bigger
|
||||||
for k in j..self.items.len() - 1 {
|
// than one chunk so move it at the end of the queue.
|
||||||
if self.items[k].sent >= self.items[k + 1].sent {
|
let item = self.items.remove(j).unwrap();
|
||||||
self.items.swap(k, k + 1);
|
self.items.push_back(item);
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return Poll::Ready((id, data_frame));
|
return Poll::Ready((id, data_frame));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Poll::Pending
|
Poll::Pending
|
||||||
}
|
}
|
||||||
|
|
|
@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
|
||||||
|
|
||||||
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
|
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
|
||||||
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
|
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
|
||||||
Err(e) => (PRIO_HIGH, Err(e)),
|
Err(e) => (PRIO_NORMAL, Err(e)),
|
||||||
};
|
};
|
||||||
|
|
||||||
debug!("server: sending response to {}", id);
|
debug!("server: sending response to {}", id);
|
||||||
|
|
|
@ -33,8 +33,7 @@ use crate::ring::Ring;
|
||||||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
|
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
|
||||||
|
|
||||||
/// Strategy to apply when making RPC
|
/// Strategy to apply when making RPC
|
||||||
#[derive(Copy, Clone)]
|
pub struct RequestStrategy<T> {
|
||||||
pub struct RequestStrategy {
|
|
||||||
/// Min number of response to consider the request successful
|
/// Min number of response to consider the request successful
|
||||||
pub rs_quorum: Option<usize>,
|
pub rs_quorum: Option<usize>,
|
||||||
/// Should requests be dropped after enough response are received
|
/// Should requests be dropped after enough response are received
|
||||||
|
@ -43,6 +42,8 @@ pub struct RequestStrategy {
|
||||||
pub rs_priority: RequestPriority,
|
pub rs_priority: RequestPriority,
|
||||||
/// Custom timeout for this request
|
/// Custom timeout for this request
|
||||||
rs_timeout: Timeout,
|
rs_timeout: Timeout,
|
||||||
|
/// Data to drop when everything completes
|
||||||
|
rs_drop_on_complete: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Copy, Clone)]
|
#[derive(Copy, Clone)]
|
||||||
|
@ -52,7 +53,19 @@ enum Timeout {
|
||||||
Custom(Duration),
|
Custom(Duration),
|
||||||
}
|
}
|
||||||
|
|
||||||
impl RequestStrategy {
|
impl Clone for RequestStrategy<()> {
|
||||||
|
fn clone(&self) -> Self {
|
||||||
|
RequestStrategy {
|
||||||
|
rs_quorum: self.rs_quorum,
|
||||||
|
rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
|
||||||
|
rs_priority: self.rs_priority,
|
||||||
|
rs_timeout: self.rs_timeout,
|
||||||
|
rs_drop_on_complete: (),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl RequestStrategy<()> {
|
||||||
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||||
pub fn with_priority(prio: RequestPriority) -> Self {
|
pub fn with_priority(prio: RequestPriority) -> Self {
|
||||||
RequestStrategy {
|
RequestStrategy {
|
||||||
|
@ -60,8 +73,22 @@ impl RequestStrategy {
|
||||||
rs_interrupt_after_quorum: false,
|
rs_interrupt_after_quorum: false,
|
||||||
rs_priority: prio,
|
rs_priority: prio,
|
||||||
rs_timeout: Timeout::Default,
|
rs_timeout: Timeout::Default,
|
||||||
|
rs_drop_on_complete: (),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
/// Add an item to be dropped on completion
|
||||||
|
pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
|
||||||
|
RequestStrategy {
|
||||||
|
rs_quorum: self.rs_quorum,
|
||||||
|
rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
|
||||||
|
rs_priority: self.rs_priority,
|
||||||
|
rs_timeout: self.rs_timeout,
|
||||||
|
rs_drop_on_complete: drop_on_complete,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> RequestStrategy<T> {
|
||||||
/// Set quorum to be reached for request
|
/// Set quorum to be reached for request
|
||||||
pub fn with_quorum(mut self, quorum: usize) -> Self {
|
pub fn with_quorum(mut self, quorum: usize) -> Self {
|
||||||
self.rs_quorum = Some(quorum);
|
self.rs_quorum = Some(quorum);
|
||||||
|
@ -83,6 +110,19 @@ impl RequestStrategy {
|
||||||
self.rs_timeout = Timeout::Custom(timeout);
|
self.rs_timeout = Timeout::Custom(timeout);
|
||||||
self
|
self
|
||||||
}
|
}
|
||||||
|
/// Extract drop_on_complete item
|
||||||
|
fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
|
||||||
|
(
|
||||||
|
RequestStrategy {
|
||||||
|
rs_quorum: self.rs_quorum,
|
||||||
|
rs_interrupt_after_quorum: self.rs_interrupt_after_quorum,
|
||||||
|
rs_priority: self.rs_priority,
|
||||||
|
rs_timeout: self.rs_timeout,
|
||||||
|
rs_drop_on_complete: (),
|
||||||
|
},
|
||||||
|
self.rs_drop_on_complete,
|
||||||
|
)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -123,7 +163,7 @@ impl RpcHelper {
|
||||||
endpoint: &Endpoint<M, H>,
|
endpoint: &Endpoint<M, H>,
|
||||||
to: Uuid,
|
to: Uuid,
|
||||||
msg: N,
|
msg: N,
|
||||||
strat: RequestStrategy,
|
strat: RequestStrategy<()>,
|
||||||
) -> Result<S, Error>
|
) -> Result<S, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
@ -176,7 +216,7 @@ impl RpcHelper {
|
||||||
endpoint: &Endpoint<M, H>,
|
endpoint: &Endpoint<M, H>,
|
||||||
to: &[Uuid],
|
to: &[Uuid],
|
||||||
msg: N,
|
msg: N,
|
||||||
strat: RequestStrategy,
|
strat: RequestStrategy<()>,
|
||||||
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
@ -187,7 +227,7 @@ impl RpcHelper {
|
||||||
|
|
||||||
let resps = join_all(
|
let resps = join_all(
|
||||||
to.iter()
|
to.iter()
|
||||||
.map(|to| self.call(endpoint, *to, msg.clone(), strat)),
|
.map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
|
||||||
)
|
)
|
||||||
.await;
|
.await;
|
||||||
Ok(to
|
Ok(to
|
||||||
|
@ -201,7 +241,7 @@ impl RpcHelper {
|
||||||
&self,
|
&self,
|
||||||
endpoint: &Endpoint<M, H>,
|
endpoint: &Endpoint<M, H>,
|
||||||
msg: N,
|
msg: N,
|
||||||
strat: RequestStrategy,
|
strat: RequestStrategy<()>,
|
||||||
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>>,
|
M: Rpc<Response = Result<S, Error>>,
|
||||||
|
@ -220,18 +260,19 @@ impl RpcHelper {
|
||||||
|
|
||||||
/// Make a RPC call to multiple servers, returning either a Vec of responses,
|
/// Make a RPC call to multiple servers, returning either a Vec of responses,
|
||||||
/// or an error if quorum could not be reached due to too many errors
|
/// or an error if quorum could not be reached due to too many errors
|
||||||
pub async fn try_call_many<M, N, H, S>(
|
pub async fn try_call_many<M, N, H, S, T>(
|
||||||
&self,
|
&self,
|
||||||
endpoint: &Arc<Endpoint<M, H>>,
|
endpoint: &Arc<Endpoint<M, H>>,
|
||||||
to: &[Uuid],
|
to: &[Uuid],
|
||||||
msg: N,
|
msg: N,
|
||||||
strategy: RequestStrategy,
|
strategy: RequestStrategy<T>,
|
||||||
) -> Result<Vec<S>, Error>
|
) -> Result<Vec<S>, Error>
|
||||||
where
|
where
|
||||||
M: Rpc<Response = Result<S, Error>> + 'static,
|
M: Rpc<Response = Result<S, Error>> + 'static,
|
||||||
N: IntoReq<M>,
|
N: IntoReq<M>,
|
||||||
H: StreamingEndpointHandler<M> + 'static,
|
H: StreamingEndpointHandler<M> + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
let quorum = strategy.rs_quorum.unwrap_or(to.len());
|
let quorum = strategy.rs_quorum.unwrap_or(to.len());
|
||||||
|
|
||||||
|
@ -260,12 +301,12 @@ impl RpcHelper {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn try_call_many_internal<M, N, H, S>(
|
async fn try_call_many_internal<M, N, H, S, T>(
|
||||||
&self,
|
&self,
|
||||||
endpoint: &Arc<Endpoint<M, H>>,
|
endpoint: &Arc<Endpoint<M, H>>,
|
||||||
to: &[Uuid],
|
to: &[Uuid],
|
||||||
msg: N,
|
msg: N,
|
||||||
strategy: RequestStrategy,
|
strategy: RequestStrategy<T>,
|
||||||
quorum: usize,
|
quorum: usize,
|
||||||
) -> Result<Vec<S>, Error>
|
) -> Result<Vec<S>, Error>
|
||||||
where
|
where
|
||||||
|
@ -273,9 +314,12 @@ impl RpcHelper {
|
||||||
N: IntoReq<M>,
|
N: IntoReq<M>,
|
||||||
H: StreamingEndpointHandler<M> + 'static,
|
H: StreamingEndpointHandler<M> + 'static,
|
||||||
S: Send + 'static,
|
S: Send + 'static,
|
||||||
|
T: Send + 'static,
|
||||||
{
|
{
|
||||||
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
|
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
|
||||||
|
|
||||||
|
let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
|
||||||
|
|
||||||
// Build future for each request
|
// Build future for each request
|
||||||
// They are not started now: they are added below in a FuturesUnordered
|
// They are not started now: they are added below in a FuturesUnordered
|
||||||
// object that will take care of polling them (see below)
|
// object that will take care of polling them (see below)
|
||||||
|
@ -283,6 +327,7 @@ impl RpcHelper {
|
||||||
let self2 = self.clone();
|
let self2 = self.clone();
|
||||||
let msg = msg.clone();
|
let msg = msg.clone();
|
||||||
let endpoint2 = endpoint.clone();
|
let endpoint2 = endpoint.clone();
|
||||||
|
let strategy = strategy.clone();
|
||||||
(to, async move {
|
(to, async move {
|
||||||
self2.call(&endpoint2, to, msg, strategy).await
|
self2.call(&endpoint2, to, msg, strategy).await
|
||||||
})
|
})
|
||||||
|
@ -377,6 +422,7 @@ impl RpcHelper {
|
||||||
// they have to be put in a proper queue that is persisted to disk.
|
// they have to be put in a proper queue that is persisted to disk.
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
resp_stream.collect::<Vec<Result<_, _>>>().await;
|
resp_stream.collect::<Vec<Result<_, _>>>().await;
|
||||||
|
drop(drop_on_complete);
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,6 +52,14 @@ pub struct Config {
|
||||||
)]
|
)]
|
||||||
pub compression_level: Option<i32>,
|
pub compression_level: Option<i32>,
|
||||||
|
|
||||||
|
/// Maximum amount of block data to buffer in RAM for sending to
|
||||||
|
/// remote nodes when these nodes are on slower links
|
||||||
|
#[serde(
|
||||||
|
deserialize_with = "deserialize_capacity",
|
||||||
|
default = "default_block_ram_buffer_max"
|
||||||
|
)]
|
||||||
|
pub block_ram_buffer_max: usize,
|
||||||
|
|
||||||
/// Skip the permission check of secret files. Useful when
|
/// Skip the permission check of secret files. Useful when
|
||||||
/// POSIX ACLs (or more complex chmods) are used.
|
/// POSIX ACLs (or more complex chmods) are used.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
@ -255,6 +263,9 @@ fn default_sled_flush_every_ms() -> u64 {
|
||||||
fn default_block_size() -> usize {
|
fn default_block_size() -> usize {
|
||||||
1048576
|
1048576
|
||||||
}
|
}
|
||||||
|
fn default_block_ram_buffer_max() -> usize {
|
||||||
|
256 * 1024 * 1024
|
||||||
|
}
|
||||||
|
|
||||||
fn default_compression() -> Option<i32> {
|
fn default_compression() -> Option<i32> {
|
||||||
Some(1)
|
Some(1)
|
||||||
|
|
Loading…
Reference in a new issue