Compare commits
6 commits
25c196f34d
...
8bfc16ba7d
Author | SHA1 | Date | |
---|---|---|---|
8bfc16ba7d | |||
ecf641d88c | |||
75cd14926d | |||
e1dc84e123 | |||
85f580cbde | |||
0d3e285d13 |
12 changed files with 228 additions and 60 deletions
|
@ -35,6 +35,7 @@ steps:
|
|||
- ./result/bin/garage_web-*
|
||||
- ./result/bin/garage-*
|
||||
- GARAGE_TEST_INTEGRATION_DB_ENGINE=lmdb ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
||||
- nix-shell --attr ci --run "killall -9 garage" || true
|
||||
- GARAGE_TEST_INTEGRATION_DB_ENGINE=sqlite ./result/bin/integration-* || (cat tmp-garage-integration/stderr.log; false)
|
||||
- rm result
|
||||
- rm -rv tmp-garage-integration
|
||||
|
|
|
@ -21,6 +21,7 @@ metadata_auto_snapshot_interval = "6h"
|
|||
db_engine = "lmdb"
|
||||
|
||||
block_size = "1M"
|
||||
block_ram_buffer_max = "256MiB"
|
||||
|
||||
lmdb_map_size = "1T"
|
||||
|
||||
|
@ -87,6 +88,7 @@ The following gives details about each available configuration option.
|
|||
|
||||
Top-level configuration options:
|
||||
[`allow_world_readable_secrets`](#allow_world_readable_secrets),
|
||||
[`block_ram_buffer_max`](#block_ram_buffer_max),
|
||||
[`block_size`](#block_size),
|
||||
[`bootstrap_peers`](#bootstrap_peers),
|
||||
[`compression_level`](#compression_level),
|
||||
|
@ -434,6 +436,37 @@ files will remain available. This however means that chunks from existing files
|
|||
will not be deduplicated with chunks from newly uploaded files, meaning you
|
||||
might use more storage space that is optimally possible.
|
||||
|
||||
#### `block_ram_buffer_max` (since v0.9.4) {#block_ram_buffer_max}
|
||||
|
||||
A limit on the total size of data blocks kept in RAM by S3 API nodes awaiting
|
||||
to be sent to storage nodes asynchronously.
|
||||
|
||||
Explanation: since Garage wants to tolerate node failures, it uses quorum
|
||||
writes to send data blocks to storage nodes: try to write the block to three
|
||||
nodes, and return ok as soon as two writes complete. So even if all three nodes
|
||||
are online, the third write always completes asynchronously. In general, there
|
||||
are not many writes to a cluster, and the third asynchronous write can
|
||||
terminate early enough so as to not cause unbounded RAM growth. However, if
|
||||
the S3 API node is continuously receiving large quantities of data and the
|
||||
third node is never able to catch up, many data blocks will be kept buffered in
|
||||
RAM as they are awaiting transfer to the third node.
|
||||
|
||||
The `block_ram_buffer_max` sets a limit to the size of buffers that can be kept
|
||||
in RAM in this process. When the limit is reached, backpressure is applied
|
||||
back to the S3 client.
|
||||
|
||||
Note that this only counts buffers that have arrived to a certain stage of
|
||||
processing (received from the client + encrypted and/or compressed as
|
||||
necessary) and are ready to send to the storage nodes. Many other buffers will
|
||||
not be counted and this is not a hard limit on RAM consumption. In particular,
|
||||
if many clients send requests simultaneously with large objects, the RAM
|
||||
consumption will always grow linearly with the number of concurrent requests,
|
||||
as each request will use a few buffers of size `block_size` for receiving and
|
||||
intermediate processing before even trying to send the data to the storage
|
||||
node.
|
||||
|
||||
The default value is 256MiB.
|
||||
|
||||
#### `lmdb_map_size` {#lmdb_map_size}
|
||||
|
||||
This parameters can be used to set the map size used by LMDB,
|
||||
|
|
|
@ -225,6 +225,17 @@ block_bytes_read 120586322022
|
|||
block_bytes_written 3386618077
|
||||
```
|
||||
|
||||
#### `block_ram_buffer_free_kb` (gauge)
|
||||
|
||||
Kibibytes available for buffering blocks that have to be sent to remote nodes.
|
||||
When clients send too much data to this node and a storage node is not receiving
|
||||
data fast enough due to slower network conditions, this will decrease down to
|
||||
zero and backpressure will be applied.
|
||||
|
||||
```
|
||||
block_ram_buffer_free_kb 219829
|
||||
```
|
||||
|
||||
#### `block_compression_level` (counter)
|
||||
|
||||
Exposes the block compression level configured for the Garage node.
|
||||
|
|
|
@ -1,3 +1,4 @@
|
|||
use std::convert::TryInto;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize};
|
|||
|
||||
use tokio::fs;
|
||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
||||
use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore};
|
||||
|
||||
use opentelemetry::{
|
||||
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
|
||||
|
@ -93,6 +94,7 @@ pub struct BlockManager {
|
|||
|
||||
pub(crate) system: Arc<System>,
|
||||
pub(crate) endpoint: Arc<Endpoint<BlockRpc, Self>>,
|
||||
buffer_kb_semaphore: Arc<Semaphore>,
|
||||
|
||||
pub(crate) metrics: BlockManagerMetrics,
|
||||
|
||||
|
@ -152,11 +154,14 @@ impl BlockManager {
|
|||
.netapp
|
||||
.endpoint("garage_block/manager.rs/Rpc".to_string());
|
||||
|
||||
let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024));
|
||||
|
||||
let metrics = BlockManagerMetrics::new(
|
||||
config.compression_level,
|
||||
rc.rc_table.clone(),
|
||||
resync.queue.clone(),
|
||||
resync.errors.clone(),
|
||||
buffer_kb_semaphore.clone(),
|
||||
);
|
||||
|
||||
let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info");
|
||||
|
@ -176,6 +181,7 @@ impl BlockManager {
|
|||
resync,
|
||||
system,
|
||||
endpoint,
|
||||
buffer_kb_semaphore,
|
||||
metrics,
|
||||
scrub_persister,
|
||||
tx_scrub_command: ArcSwapOption::new(None),
|
||||
|
@ -238,10 +244,16 @@ impl BlockManager {
|
|||
async fn rpc_get_raw_block_streaming(
|
||||
&self,
|
||||
hash: &Hash,
|
||||
priority: RequestPriority,
|
||||
order_tag: Option<OrderTag>,
|
||||
) -> Result<DataBlockStream, Error> {
|
||||
self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
|
||||
.await
|
||||
self.rpc_get_raw_block_internal(
|
||||
hash,
|
||||
priority,
|
||||
order_tag,
|
||||
|stream| async move { Ok(stream) },
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Ask nodes that might have a (possibly compressed) block for it
|
||||
|
@ -249,9 +261,10 @@ impl BlockManager {
|
|||
pub(crate) async fn rpc_get_raw_block(
|
||||
&self,
|
||||
hash: &Hash,
|
||||
priority: RequestPriority,
|
||||
order_tag: Option<OrderTag>,
|
||||
) -> Result<DataBlock, Error> {
|
||||
self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
|
||||
self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move {
|
||||
let (header, stream) = block_stream.into_parts();
|
||||
read_stream_to_end(stream)
|
||||
.await
|
||||
|
@ -264,6 +277,7 @@ impl BlockManager {
|
|||
async fn rpc_get_raw_block_internal<F, Fut, T>(
|
||||
&self,
|
||||
hash: &Hash,
|
||||
priority: RequestPriority,
|
||||
order_tag: Option<OrderTag>,
|
||||
f: F,
|
||||
) -> Result<T, Error>
|
||||
|
@ -281,7 +295,7 @@ impl BlockManager {
|
|||
let rpc = self.endpoint.call_streaming(
|
||||
&node_id,
|
||||
BlockRpc::GetBlock(*hash, order_tag),
|
||||
PRIO_NORMAL | PRIO_SECONDARY,
|
||||
priority,
|
||||
);
|
||||
tokio::select! {
|
||||
res = rpc => {
|
||||
|
@ -333,7 +347,9 @@ impl BlockManager {
|
|||
hash: &Hash,
|
||||
order_tag: Option<OrderTag>,
|
||||
) -> Result<ByteStream, Error> {
|
||||
let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
||||
let block_stream = self
|
||||
.rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag)
|
||||
.await?;
|
||||
let (header, stream) = block_stream.into_parts();
|
||||
match header {
|
||||
DataBlockHeader::Plain => Ok(stream),
|
||||
|
@ -361,6 +377,14 @@ impl BlockManager {
|
|||
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
|
||||
.await
|
||||
.into_parts();
|
||||
|
||||
let permit = self
|
||||
.buffer_kb_semaphore
|
||||
.clone()
|
||||
.acquire_many_owned((bytes.len() / 1024).try_into().unwrap())
|
||||
.await
|
||||
.ok_or_message("could not reserve space for buffer of data to send to remote nodes")?;
|
||||
|
||||
let put_block_rpc =
|
||||
Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes);
|
||||
let put_block_rpc = if let Some(tag) = order_tag {
|
||||
|
@ -376,6 +400,7 @@ impl BlockManager {
|
|||
who.as_ref(),
|
||||
put_block_rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
||||
.with_drop_on_completion(permit)
|
||||
.with_quorum(self.replication.write_quorum()),
|
||||
)
|
||||
.await?;
|
||||
|
|
|
@ -1,3 +1,7 @@
|
|||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::Semaphore;
|
||||
|
||||
use opentelemetry::{global, metrics::*};
|
||||
|
||||
use garage_db as db;
|
||||
|
@ -8,6 +12,7 @@ pub struct BlockManagerMetrics {
|
|||
pub(crate) _rc_size: ValueObserver<u64>,
|
||||
pub(crate) _resync_queue_len: ValueObserver<u64>,
|
||||
pub(crate) _resync_errored_blocks: ValueObserver<u64>,
|
||||
pub(crate) _buffer_free_kb: ValueObserver<u64>,
|
||||
|
||||
pub(crate) resync_counter: BoundCounter<u64>,
|
||||
pub(crate) resync_error_counter: BoundCounter<u64>,
|
||||
|
@ -30,6 +35,7 @@ impl BlockManagerMetrics {
|
|||
rc_tree: db::Tree,
|
||||
resync_queue: db::Tree,
|
||||
resync_errors: db::Tree,
|
||||
buffer_semaphore: Arc<Semaphore>,
|
||||
) -> Self {
|
||||
let meter = global::meter("garage_model/block");
|
||||
Self {
|
||||
|
@ -69,6 +75,15 @@ impl BlockManagerMetrics {
|
|||
.with_description("Number of block hashes whose last resync resulted in an error")
|
||||
.init(),
|
||||
|
||||
_buffer_free_kb: meter
|
||||
.u64_value_observer("block.ram_buffer_free_kb", move |observer| {
|
||||
observer.observe(buffer_semaphore.available_permits() as u64, &[])
|
||||
})
|
||||
.with_description(
|
||||
"Available RAM in KiB to use for buffering data blocks to be written to remote nodes",
|
||||
)
|
||||
.init(),
|
||||
|
||||
resync_counter: meter
|
||||
.u64_counter("block.resync_counter")
|
||||
.with_description("Number of calls to resync_block")
|
||||
|
|
|
@ -436,7 +436,7 @@ impl BlockResyncManager {
|
|||
&manager.endpoint,
|
||||
&need_nodes,
|
||||
put_block_message,
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND)
|
||||
RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY)
|
||||
.with_quorum(need_nodes.len()),
|
||||
)
|
||||
.await
|
||||
|
@ -460,7 +460,9 @@ impl BlockResyncManager {
|
|||
hash
|
||||
);
|
||||
|
||||
let block_data = manager.rpc_get_raw_block(hash, None).await;
|
||||
let block_data = manager
|
||||
.rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None)
|
||||
.await;
|
||||
if matches!(block_data, Err(Error::MissingBlock(_))) {
|
||||
warn!(
|
||||
"Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
|
||||
|
|
|
@ -300,7 +300,7 @@ impl K2VRpcHandler {
|
|||
.map(|node| {
|
||||
self.system
|
||||
.rpc_helper()
|
||||
.call(&self.endpoint, *node, msg.clone(), rs)
|
||||
.call(&self.endpoint, *node, msg.clone(), rs.clone())
|
||||
})
|
||||
.collect::<FuturesUnordered<_>>();
|
||||
|
||||
|
|
|
@ -28,12 +28,30 @@ use crate::util::*;
|
|||
/// The same priority value is given to a request and to its associated response.
|
||||
pub type RequestPriority = u8;
|
||||
|
||||
// Usage of priority levels in Garage:
|
||||
//
|
||||
// PRIO_HIGH
|
||||
// for liveness check events such as pings and important
|
||||
// reconfiguration events such as layout changes
|
||||
//
|
||||
// PRIO_NORMAL
|
||||
// for standard interactive requests to exchange metadata
|
||||
//
|
||||
// PRIO_NORMAL | PRIO_SECONDARY
|
||||
// for standard interactive requests to exchange block data
|
||||
//
|
||||
// PRIO_BACKGROUND
|
||||
// for background resync requests to exchange metadata
|
||||
// PRIO_BACKGROUND | PRIO_SECONDARY
|
||||
// for background resync requests to exchange block data
|
||||
|
||||
/// Priority class: high
|
||||
pub const PRIO_HIGH: RequestPriority = 0x20;
|
||||
/// Priority class: normal
|
||||
pub const PRIO_NORMAL: RequestPriority = 0x40;
|
||||
/// Priority class: background
|
||||
pub const PRIO_BACKGROUND: RequestPriority = 0x80;
|
||||
|
||||
/// Priority: primary among given class
|
||||
pub const PRIO_PRIMARY: RequestPriority = 0x00;
|
||||
/// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`)
|
||||
|
|
|
@ -109,7 +109,7 @@ impl SendQueuePriority {
|
|||
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
||||
order_vec.insert(i, order);
|
||||
}
|
||||
self.items.push_front(item);
|
||||
self.items.push_back(item);
|
||||
}
|
||||
fn remove(&mut self, id: RequestID) {
|
||||
if let Some(i) = self.items.iter().position(|x| x.id == id) {
|
||||
|
@ -128,51 +128,56 @@ impl SendQueuePriority {
|
|||
self.items.is_empty()
|
||||
}
|
||||
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
|
||||
for (j, item) in self.items.iter_mut().enumerate() {
|
||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
||||
// in step 1: poll only streams that have sent 0 bytes, we want to send them in priority
|
||||
// as they most likely represent small requests to be sent first
|
||||
// in step 2: poll all streams
|
||||
for step in 0..2 {
|
||||
for (j, item) in self.items.iter_mut().enumerate() {
|
||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if step == 0 && item.sent > 0 {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
||||
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
||||
let id = item.id;
|
||||
let eos = item.data.eos();
|
||||
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
||||
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
||||
let id = item.id;
|
||||
let eos = item.data.eos();
|
||||
|
||||
let packet = bytes_or_err.map_err(|e| match e {
|
||||
ReadExactError::Stream(err) => err,
|
||||
_ => unreachable!(),
|
||||
});
|
||||
let packet = bytes_or_err.map_err(|e| match e {
|
||||
ReadExactError::Stream(err) => err,
|
||||
_ => unreachable!(),
|
||||
});
|
||||
|
||||
let is_err = packet.is_err();
|
||||
let data_frame = DataFrame::from_packet(packet, !eos);
|
||||
item.sent += data_frame.data().len();
|
||||
let is_err = packet.is_err();
|
||||
let data_frame = DataFrame::from_packet(packet, !eos);
|
||||
item.sent += data_frame.data().len();
|
||||
|
||||
if eos || is_err {
|
||||
// If item had an order tag, remove it from the corresponding ordering list
|
||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||
let order_stream = self.order.get_mut(&stream).unwrap();
|
||||
assert_eq!(order_stream.pop_front(), Some(order));
|
||||
if order_stream.is_empty() {
|
||||
self.order.remove(&stream);
|
||||
}
|
||||
}
|
||||
// Remove item from sending queue
|
||||
self.items.remove(j);
|
||||
} else {
|
||||
// Move item later in send queue to implement LAS scheduling
|
||||
// (LAS = Least Attained Service)
|
||||
for k in j..self.items.len() - 1 {
|
||||
if self.items[k].sent >= self.items[k + 1].sent {
|
||||
self.items.swap(k, k + 1);
|
||||
} else {
|
||||
break;
|
||||
if eos || is_err {
|
||||
// If item had an order tag, remove it from the corresponding ordering list
|
||||
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||
let order_stream = self.order.get_mut(&stream).unwrap();
|
||||
assert_eq!(order_stream.pop_front(), Some(order));
|
||||
if order_stream.is_empty() {
|
||||
self.order.remove(&stream);
|
||||
}
|
||||
}
|
||||
// Remove item from sending queue
|
||||
self.items.remove(j);
|
||||
} else if step == 0 {
|
||||
// Step 0 means that this stream had not sent any bytes yet.
|
||||
// Now that it has, and it was not an EOS, we know that it is bigger
|
||||
// than one chunk so move it at the end of the queue.
|
||||
let item = self.items.remove(j).unwrap();
|
||||
self.items.push_back(item);
|
||||
}
|
||||
|
||||
return Poll::Ready((id, data_frame));
|
||||
}
|
||||
|
||||
return Poll::Ready((id, data_frame));
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -190,7 +190,7 @@ impl RecvLoop for ServerConn {
|
|||
|
||||
let (prio, resp_enc_result) = match ReqEnc::decode(stream).await {
|
||||
Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await),
|
||||
Err(e) => (PRIO_HIGH, Err(e)),
|
||||
Err(e) => (PRIO_NORMAL, Err(e)),
|
||||
};
|
||||
|
||||
debug!("server: sending response to {}", id);
|
||||
|
|
|
@ -33,8 +33,7 @@ use crate::metrics::RpcMetrics;
|
|||
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
|
||||
/// Strategy to apply when making RPC
|
||||
#[derive(Copy, Clone)]
|
||||
pub struct RequestStrategy {
|
||||
pub struct RequestStrategy<T> {
|
||||
/// Min number of response to consider the request successful
|
||||
rs_quorum: Option<usize>,
|
||||
/// Send all requests at once
|
||||
|
@ -43,6 +42,8 @@ pub struct RequestStrategy {
|
|||
rs_priority: RequestPriority,
|
||||
/// Custom timeout for this request
|
||||
rs_timeout: Timeout,
|
||||
/// Data to drop when everything completes
|
||||
rs_drop_on_complete: T,
|
||||
}
|
||||
|
||||
#[derive(Copy, Clone)]
|
||||
|
@ -52,7 +53,19 @@ enum Timeout {
|
|||
Custom(Duration),
|
||||
}
|
||||
|
||||
impl RequestStrategy {
|
||||
impl Clone for RequestStrategy<()> {
|
||||
fn clone(&self) -> Self {
|
||||
RequestStrategy {
|
||||
rs_quorum: self.rs_quorum,
|
||||
rs_send_all_at_once: self.rs_send_all_at_once,
|
||||
rs_priority: self.rs_priority,
|
||||
rs_timeout: self.rs_timeout,
|
||||
rs_drop_on_complete: (),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl RequestStrategy<()> {
|
||||
/// Create a RequestStrategy with default timeout and not interrupting when quorum reached
|
||||
pub fn with_priority(prio: RequestPriority) -> Self {
|
||||
RequestStrategy {
|
||||
|
@ -60,8 +73,22 @@ impl RequestStrategy {
|
|||
rs_send_all_at_once: None,
|
||||
rs_priority: prio,
|
||||
rs_timeout: Timeout::Default,
|
||||
rs_drop_on_complete: (),
|
||||
}
|
||||
}
|
||||
/// Add an item to be dropped on completion
|
||||
pub fn with_drop_on_completion<T>(self, drop_on_complete: T) -> RequestStrategy<T> {
|
||||
RequestStrategy {
|
||||
rs_quorum: self.rs_quorum,
|
||||
rs_send_all_at_once: self.rs_send_all_at_once,
|
||||
rs_priority: self.rs_priority,
|
||||
rs_timeout: self.rs_timeout,
|
||||
rs_drop_on_complete: drop_on_complete,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> RequestStrategy<T> {
|
||||
/// Set quorum to be reached for request
|
||||
pub fn with_quorum(mut self, quorum: usize) -> Self {
|
||||
self.rs_quorum = Some(quorum);
|
||||
|
@ -82,6 +109,19 @@ impl RequestStrategy {
|
|||
self.rs_timeout = Timeout::Custom(timeout);
|
||||
self
|
||||
}
|
||||
/// Extract drop_on_complete item
|
||||
fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) {
|
||||
(
|
||||
RequestStrategy {
|
||||
rs_quorum: self.rs_quorum,
|
||||
rs_send_all_at_once: self.rs_send_all_at_once,
|
||||
rs_priority: self.rs_priority,
|
||||
rs_timeout: self.rs_timeout,
|
||||
rs_drop_on_complete: (),
|
||||
},
|
||||
self.rs_drop_on_complete,
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
|
@ -122,7 +162,7 @@ impl RpcHelper {
|
|||
endpoint: &Endpoint<M, H>,
|
||||
to: Uuid,
|
||||
msg: N,
|
||||
strat: RequestStrategy,
|
||||
strat: RequestStrategy<()>,
|
||||
) -> Result<S, Error>
|
||||
where
|
||||
M: Rpc<Response = Result<S, Error>>,
|
||||
|
@ -182,7 +222,7 @@ impl RpcHelper {
|
|||
endpoint: &Endpoint<M, H>,
|
||||
to: &[Uuid],
|
||||
msg: N,
|
||||
strat: RequestStrategy,
|
||||
strat: RequestStrategy<()>,
|
||||
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
||||
where
|
||||
M: Rpc<Response = Result<S, Error>>,
|
||||
|
@ -197,7 +237,7 @@ impl RpcHelper {
|
|||
|
||||
let resps = join_all(
|
||||
to.iter()
|
||||
.map(|to| self.call(endpoint, *to, msg.clone(), strat)),
|
||||
.map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())),
|
||||
)
|
||||
.with_context(Context::current_with_span(span))
|
||||
.await;
|
||||
|
@ -212,7 +252,7 @@ impl RpcHelper {
|
|||
&self,
|
||||
endpoint: &Endpoint<M, H>,
|
||||
msg: N,
|
||||
strat: RequestStrategy,
|
||||
strat: RequestStrategy<()>,
|
||||
) -> Result<Vec<(Uuid, Result<S, Error>)>, Error>
|
||||
where
|
||||
M: Rpc<Response = Result<S, Error>>,
|
||||
|
@ -252,7 +292,7 @@ impl RpcHelper {
|
|||
endpoint: &Arc<Endpoint<M, H>>,
|
||||
to: &[Uuid],
|
||||
msg: N,
|
||||
strategy: RequestStrategy,
|
||||
strategy: RequestStrategy<()>,
|
||||
) -> Result<Vec<S>, Error>
|
||||
where
|
||||
M: Rpc<Response = Result<S, Error>> + 'static,
|
||||
|
@ -285,7 +325,7 @@ impl RpcHelper {
|
|||
endpoint: &Arc<Endpoint<M, H>>,
|
||||
to: &[Uuid],
|
||||
msg: N,
|
||||
strategy: RequestStrategy,
|
||||
strategy: RequestStrategy<()>,
|
||||
quorum: usize,
|
||||
) -> Result<Vec<S>, Error>
|
||||
where
|
||||
|
@ -316,6 +356,7 @@ impl RpcHelper {
|
|||
let self2 = self.clone();
|
||||
let msg = msg.clone();
|
||||
let endpoint2 = endpoint.clone();
|
||||
let strategy = strategy.clone();
|
||||
async move { self2.call(&endpoint2, to, msg, strategy).await }
|
||||
});
|
||||
|
||||
|
@ -388,18 +429,19 @@ impl RpcHelper {
|
|||
/// changes, where data has to be written both in the old layout and in the
|
||||
/// new one as long as all nodes have not successfully tranisitionned and
|
||||
/// moved all data to the new layout.
|
||||
pub async fn try_write_many_sets<M, N, H, S>(
|
||||
pub async fn try_write_many_sets<M, N, H, S, T>(
|
||||
&self,
|
||||
endpoint: &Arc<Endpoint<M, H>>,
|
||||
to_sets: &[Vec<Uuid>],
|
||||
msg: N,
|
||||
strategy: RequestStrategy,
|
||||
strategy: RequestStrategy<T>,
|
||||
) -> Result<Vec<S>, Error>
|
||||
where
|
||||
M: Rpc<Response = Result<S, Error>> + 'static,
|
||||
N: IntoReq<M>,
|
||||
H: StreamingEndpointHandler<M> + 'static,
|
||||
S: Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
let quorum = strategy
|
||||
.rs_quorum
|
||||
|
@ -423,12 +465,12 @@ impl RpcHelper {
|
|||
.await
|
||||
}
|
||||
|
||||
async fn try_write_many_sets_inner<M, N, H, S>(
|
||||
async fn try_write_many_sets_inner<M, N, H, S, T>(
|
||||
&self,
|
||||
endpoint: &Arc<Endpoint<M, H>>,
|
||||
to_sets: &[Vec<Uuid>],
|
||||
msg: N,
|
||||
strategy: RequestStrategy,
|
||||
strategy: RequestStrategy<T>,
|
||||
quorum: usize,
|
||||
) -> Result<Vec<S>, Error>
|
||||
where
|
||||
|
@ -436,11 +478,14 @@ impl RpcHelper {
|
|||
N: IntoReq<M>,
|
||||
H: StreamingEndpointHandler<M> + 'static,
|
||||
S: Send + 'static,
|
||||
T: Send + 'static,
|
||||
{
|
||||
// Peers may appear in many quorum sets. Here, build a list of peers,
|
||||
// mapping to the index of the quorum sets in which they appear.
|
||||
let mut result_tracker = QuorumSetResultTracker::new(to_sets, quorum);
|
||||
|
||||
let (strategy, drop_on_complete) = strategy.extract_drop_on_complete();
|
||||
|
||||
// Send one request to each peer of the quorum sets
|
||||
let msg = msg.into_req().map_err(garage_net::error::Error::from)?;
|
||||
let requests = result_tracker.nodes.keys().map(|peer| {
|
||||
|
@ -448,6 +493,7 @@ impl RpcHelper {
|
|||
let msg = msg.clone();
|
||||
let endpoint2 = endpoint.clone();
|
||||
let to = *peer;
|
||||
let strategy = strategy.clone();
|
||||
async move { (to, self2.call(&endpoint2, to, msg, strategy).await) }
|
||||
});
|
||||
let mut resp_stream = requests.collect::<FuturesUnordered<_>>();
|
||||
|
@ -463,6 +509,7 @@ impl RpcHelper {
|
|||
// Continue all other requets in background
|
||||
tokio::spawn(async move {
|
||||
resp_stream.collect::<Vec<(Uuid, Result<_, _>)>>().await;
|
||||
drop(drop_on_complete);
|
||||
});
|
||||
|
||||
return Ok(result_tracker.success_values());
|
||||
|
|
|
@ -60,6 +60,14 @@ pub struct Config {
|
|||
)]
|
||||
pub compression_level: Option<i32>,
|
||||
|
||||
/// Maximum amount of block data to buffer in RAM for sending to
|
||||
/// remote nodes when these nodes are on slower links
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_capacity",
|
||||
default = "default_block_ram_buffer_max"
|
||||
)]
|
||||
pub block_ram_buffer_max: usize,
|
||||
|
||||
/// Skip the permission check of secret files. Useful when
|
||||
/// POSIX ACLs (or more complex chmods) are used.
|
||||
#[serde(default)]
|
||||
|
@ -247,6 +255,9 @@ fn default_db_engine() -> String {
|
|||
fn default_block_size() -> usize {
|
||||
1048576
|
||||
}
|
||||
fn default_block_ram_buffer_max() -> usize {
|
||||
256 * 1024 * 1024
|
||||
}
|
||||
|
||||
fn default_consistency_mode() -> String {
|
||||
"consistent".into()
|
||||
|
|
Loading…
Reference in a new issue