From 0d3e285d133459fd53e28f879a86c0de1a0c36df Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 15:26:08 +0100 Subject: [PATCH 1/2] [fix-buffering] implement `block_ram_buffer_max` to avoid excessive RAM usage --- doc/book/reference-manual/configuration.md | 33 +++++++++++ doc/book/reference-manual/monitoring.md | 11 ++++ src/block/manager.rs | 17 +++++- src/block/metrics.rs | 15 +++++ src/model/k2v/rpc.rs | 6 +- src/rpc/rpc_helper.rs | 68 ++++++++++++++++++---- src/util/config.rs | 11 ++++ 7 files changed, 148 insertions(+), 13 deletions(-) diff --git a/doc/book/reference-manual/configuration.md b/doc/book/reference-manual/configuration.md index e6aced6d..1ac2051e 100644 --- a/doc/book/reference-manual/configuration.md +++ b/doc/book/reference-manual/configuration.md @@ -20,6 +20,7 @@ metadata_auto_snapshot_interval = "6h" db_engine = "lmdb" block_size = "1M" +block_ram_buffer_max = "256MiB" sled_cache_capacity = "128MiB" sled_flush_every_ms = 2000 @@ -88,6 +89,7 @@ The following gives details about each available configuration option. Top-level configuration options: [`allow_world_readable_secrets`](#allow_world_readable_secrets), +[`block_ram_buffer_max`](#block_ram_buffer_max), [`block_size`](#block_size), [`bootstrap_peers`](#bootstrap_peers), [`compression_level`](#compression_level), @@ -420,6 +422,37 @@ files will remain available. This however means that chunks from existing files will not be deduplicated with chunks from newly uploaded files, meaning you might use more storage space that is optimally possible. +#### `block_ram_buffer_max` (since v0.9.4) {#block_ram_buffer_max} + +A limit on the total size of data blocks kept in RAM by S3 API nodes awaiting +to be sent to storage nodes asynchronously. + +Explanation: since Garage wants to tolerate node failures, it uses quorum +writes to send data blocks to storage nodes: try to write the block to three +nodes, and return ok as soon as two writes complete. So even if all three nodes +are online, the third write always completes asynchronously. In general, there +are not many writes to a cluster, and the third asynchronous write can +terminate early enough so as to not cause unbounded RAM growth. However, if +the S3 API node is continuously receiving large quantities of data and the +third node is never able to catch up, many data blocks will be kept buffered in +RAM as they are awaiting transfer to the third node. + +The `block_ram_buffer_max` sets a limit to the size of buffers that can be kept +in RAM in this process. When the limit is reached, backpressure is applied +back to the S3 client. + +Note that this only counts buffers that have arrived to a certain stage of +processing (received from the client + encrypted and/or compressed as +necessary) and are ready to send to the storage nodes. Many other buffers will +not be counted and this is not a hard limit on RAM consumption. In particular, +if many clients send requests simultaneously with large objects, the RAM +consumption will always grow linearly with the number of concurrent requests, +as each request will use a few buffers of size `block_size` for receiving and +intermediate processing before even trying to send the data to the storage +node. + +The default value is 256MiB. + #### `sled_cache_capacity` {#sled_cache_capacity} This parameter can be used to tune the capacity of the cache used by diff --git a/doc/book/reference-manual/monitoring.md b/doc/book/reference-manual/monitoring.md index f392c133..53608d95 100644 --- a/doc/book/reference-manual/monitoring.md +++ b/doc/book/reference-manual/monitoring.md @@ -225,6 +225,17 @@ block_bytes_read 120586322022 block_bytes_written 3386618077 ``` +#### `block_ram_buffer_free_kb` (gauge) + +Kibibytes available for buffering blocks that have to be sent to remote nodes. +When clients send too much data to this node and a storage node is not receiving +data fast enough due to slower network conditions, this will decrease down to +zero and backpressure will be applied. + +``` +block_ram_buffer_free_kb 219829 +``` + #### `block_compression_level` (counter) Exposes the block compression level configured for the Garage node. diff --git a/src/block/manager.rs b/src/block/manager.rs index 34d854b9..2c7c7aba 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -1,3 +1,4 @@ +use std::convert::TryInto; use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -10,7 +11,7 @@ use serde::{Deserialize, Serialize}; use tokio::fs; use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader}; -use tokio::sync::{mpsc, Mutex, MutexGuard}; +use tokio::sync::{mpsc, Mutex, MutexGuard, Semaphore}; use opentelemetry::{ trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer}, @@ -93,6 +94,7 @@ pub struct BlockManager { pub(crate) system: Arc, pub(crate) endpoint: Arc>, + buffer_kb_semaphore: Arc, pub(crate) metrics: BlockManagerMetrics, @@ -152,11 +154,14 @@ impl BlockManager { .netapp .endpoint("garage_block/manager.rs/Rpc".to_string()); + let buffer_kb_semaphore = Arc::new(Semaphore::new(config.block_ram_buffer_max / 1024)); + let metrics = BlockManagerMetrics::new( config.compression_level, rc.rc.clone(), resync.queue.clone(), resync.errors.clone(), + buffer_kb_semaphore.clone(), ); let scrub_persister = PersisterShared::new(&system.metadata_dir, "scrub_info"); @@ -176,6 +181,7 @@ impl BlockManager { resync, system, endpoint, + buffer_kb_semaphore, metrics, scrub_persister, tx_scrub_command: ArcSwapOption::new(None), @@ -361,6 +367,14 @@ impl BlockManager { let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await .into_parts(); + + let permit = self + .buffer_kb_semaphore + .clone() + .acquire_many_owned((bytes.len() / 1024).try_into().unwrap()) + .await + .ok_or_message("could not reserve space for buffer of data to send to remote nodes")?; + let put_block_rpc = Req::new(BlockRpc::PutBlock { hash, header })?.with_stream_from_buffer(bytes); let put_block_rpc = if let Some(tag) = order_tag { @@ -376,6 +390,7 @@ impl BlockManager { &who[..], put_block_rpc, RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY) + .with_drop_on_completion(permit) .with_quorum(self.replication.write_quorum()), ) .await?; diff --git a/src/block/metrics.rs b/src/block/metrics.rs index 6659df32..c989f940 100644 --- a/src/block/metrics.rs +++ b/src/block/metrics.rs @@ -1,3 +1,7 @@ +use std::sync::Arc; + +use tokio::sync::Semaphore; + use opentelemetry::{global, metrics::*}; use garage_db as db; @@ -9,6 +13,7 @@ pub struct BlockManagerMetrics { pub(crate) _rc_size: ValueObserver, pub(crate) _resync_queue_len: ValueObserver, pub(crate) _resync_errored_blocks: ValueObserver, + pub(crate) _buffer_free_kb: ValueObserver, pub(crate) resync_counter: BoundCounter, pub(crate) resync_error_counter: BoundCounter, @@ -31,6 +36,7 @@ impl BlockManagerMetrics { rc_tree: db::Tree, resync_queue: CountedTree, resync_errors: CountedTree, + buffer_semaphore: Arc, ) -> Self { let meter = global::meter("garage_model/block"); Self { @@ -66,6 +72,15 @@ impl BlockManagerMetrics { .with_description("Number of block hashes whose last resync resulted in an error") .init(), + _buffer_free_kb: meter + .u64_value_observer("block.ram_buffer_free_kb", move |observer| { + observer.observe(buffer_semaphore.available_permits() as u64, &[]) + }) + .with_description( + "Available RAM in KiB to use for buffering data blocks to be written to remote nodes", + ) + .init(), + resync_counter: meter .u64_counter("block.resync_counter") .with_description("Number of calls to resync_block") diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 4ab44c22..af7df341 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -300,7 +300,11 @@ impl K2VRpcHandler { let rs = RequestStrategy::with_priority(PRIO_NORMAL).without_timeout(); let mut requests = nodes .iter() - .map(|node| self.system.rpc.call(&self.endpoint, *node, msg.clone(), rs)) + .map(|node| { + self.system + .rpc + .call(&self.endpoint, *node, msg.clone(), rs.clone()) + }) .collect::>(); // Fetch responses. This procedure stops fetching responses when any of the following diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index c46e577f..a1b7951c 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -33,8 +33,7 @@ use crate::ring::Ring; const DEFAULT_TIMEOUT: Duration = Duration::from_secs(300); /// Strategy to apply when making RPC -#[derive(Copy, Clone)] -pub struct RequestStrategy { +pub struct RequestStrategy { /// Min number of response to consider the request successful pub rs_quorum: Option, /// Should requests be dropped after enough response are received @@ -43,6 +42,8 @@ pub struct RequestStrategy { pub rs_priority: RequestPriority, /// Custom timeout for this request rs_timeout: Timeout, + /// Data to drop when everything completes + rs_drop_on_complete: T, } #[derive(Copy, Clone)] @@ -52,7 +53,19 @@ enum Timeout { Custom(Duration), } -impl RequestStrategy { +impl Clone for RequestStrategy<()> { + fn clone(&self) -> Self { + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_interrupt_after_quorum: self.rs_interrupt_after_quorum, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: (), + } + } +} + +impl RequestStrategy<()> { /// Create a RequestStrategy with default timeout and not interrupting when quorum reached pub fn with_priority(prio: RequestPriority) -> Self { RequestStrategy { @@ -60,8 +73,22 @@ impl RequestStrategy { rs_interrupt_after_quorum: false, rs_priority: prio, rs_timeout: Timeout::Default, + rs_drop_on_complete: (), } } + /// Add an item to be dropped on completion + pub fn with_drop_on_completion(self, drop_on_complete: T) -> RequestStrategy { + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_interrupt_after_quorum: self.rs_interrupt_after_quorum, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: drop_on_complete, + } + } +} + +impl RequestStrategy { /// Set quorum to be reached for request pub fn with_quorum(mut self, quorum: usize) -> Self { self.rs_quorum = Some(quorum); @@ -83,6 +110,19 @@ impl RequestStrategy { self.rs_timeout = Timeout::Custom(timeout); self } + /// Extract drop_on_complete item + fn extract_drop_on_complete(self) -> (RequestStrategy<()>, T) { + ( + RequestStrategy { + rs_quorum: self.rs_quorum, + rs_interrupt_after_quorum: self.rs_interrupt_after_quorum, + rs_priority: self.rs_priority, + rs_timeout: self.rs_timeout, + rs_drop_on_complete: (), + }, + self.rs_drop_on_complete, + ) + } } #[derive(Clone)] @@ -123,7 +163,7 @@ impl RpcHelper { endpoint: &Endpoint, to: Uuid, msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result where M: Rpc>, @@ -176,7 +216,7 @@ impl RpcHelper { endpoint: &Endpoint, to: &[Uuid], msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result)>, Error> where M: Rpc>, @@ -187,7 +227,7 @@ impl RpcHelper { let resps = join_all( to.iter() - .map(|to| self.call(endpoint, *to, msg.clone(), strat)), + .map(|to| self.call(endpoint, *to, msg.clone(), strat.clone())), ) .await; Ok(to @@ -201,7 +241,7 @@ impl RpcHelper { &self, endpoint: &Endpoint, msg: N, - strat: RequestStrategy, + strat: RequestStrategy<()>, ) -> Result)>, Error> where M: Rpc>, @@ -220,18 +260,19 @@ impl RpcHelper { /// Make a RPC call to multiple servers, returning either a Vec of responses, /// or an error if quorum could not be reached due to too many errors - pub async fn try_call_many( + pub async fn try_call_many( &self, endpoint: &Arc>, to: &[Uuid], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy, ) -> Result, Error> where M: Rpc> + 'static, N: IntoReq, H: StreamingEndpointHandler + 'static, S: Send + 'static, + T: Send + 'static, { let quorum = strategy.rs_quorum.unwrap_or(to.len()); @@ -260,12 +301,12 @@ impl RpcHelper { .await } - async fn try_call_many_internal( + async fn try_call_many_internal( &self, endpoint: &Arc>, to: &[Uuid], msg: N, - strategy: RequestStrategy, + strategy: RequestStrategy, quorum: usize, ) -> Result, Error> where @@ -273,9 +314,12 @@ impl RpcHelper { N: IntoReq, H: StreamingEndpointHandler + 'static, S: Send + 'static, + T: Send + 'static, { let msg = msg.into_req().map_err(garage_net::error::Error::from)?; + let (strategy, drop_on_complete) = strategy.extract_drop_on_complete(); + // Build future for each request // They are not started now: they are added below in a FuturesUnordered // object that will take care of polling them (see below) @@ -283,6 +327,7 @@ impl RpcHelper { let self2 = self.clone(); let msg = msg.clone(); let endpoint2 = endpoint.clone(); + let strategy = strategy.clone(); (to, async move { self2.call(&endpoint2, to, msg, strategy).await }) @@ -377,6 +422,7 @@ impl RpcHelper { // they have to be put in a proper queue that is persisted to disk. tokio::spawn(async move { resp_stream.collect::>>().await; + drop(drop_on_complete); }); } } diff --git a/src/util/config.rs b/src/util/config.rs index 8ecbdfbb..5372a1ec 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -52,6 +52,14 @@ pub struct Config { )] pub compression_level: Option, + /// Maximum amount of block data to buffer in RAM for sending to + /// remote nodes when these nodes are on slower links + #[serde( + deserialize_with = "deserialize_capacity", + default = "default_block_ram_buffer_max" + )] + pub block_ram_buffer_max: usize, + /// Skip the permission check of secret files. Useful when /// POSIX ACLs (or more complex chmods) are used. #[serde(default)] @@ -255,6 +263,9 @@ fn default_sled_flush_every_ms() -> u64 { fn default_block_size() -> usize { 1048576 } +fn default_block_ram_buffer_max() -> usize { + 256 * 1024 * 1024 +} fn default_compression() -> Option { Some(1) From 85f580cbde4913fe8382316ff3c27b8443c61dd7 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 27 Mar 2024 16:00:46 +0100 Subject: [PATCH 2/2] [fix-buffering] change request sending strategy and fix priorities remove LAS, priorize new requests but otherwise just do standard queuing --- src/block/manager.rs | 20 ++++++++--- src/block/resync.rs | 6 ++-- src/net/message.rs | 18 ++++++++++ src/net/send.rs | 79 +++++++++++++++++++++++--------------------- src/net/server.rs | 2 +- 5 files changed, 80 insertions(+), 45 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 2c7c7aba..62829a24 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -238,10 +238,16 @@ impl BlockManager { async fn rpc_get_raw_block_streaming( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) }) - .await + self.rpc_get_raw_block_internal( + hash, + priority, + order_tag, + |stream| async move { Ok(stream) }, + ) + .await } /// Ask nodes that might have a (possibly compressed) block for it @@ -249,9 +255,10 @@ impl BlockManager { pub(crate) async fn rpc_get_raw_block( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option, ) -> Result { - self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move { + self.rpc_get_raw_block_internal(hash, priority, order_tag, |block_stream| async move { let (header, stream) = block_stream.into_parts(); read_stream_to_end(stream) .await @@ -264,6 +271,7 @@ impl BlockManager { async fn rpc_get_raw_block_internal( &self, hash: &Hash, + priority: RequestPriority, order_tag: Option, f: F, ) -> Result @@ -279,7 +287,7 @@ impl BlockManager { let rpc = self.endpoint.call_streaming( &node_id, BlockRpc::GetBlock(*hash, order_tag), - PRIO_NORMAL | PRIO_SECONDARY, + priority, ); tokio::select! { res = rpc => { @@ -331,7 +339,9 @@ impl BlockManager { hash: &Hash, order_tag: Option, ) -> Result { - let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?; + let block_stream = self + .rpc_get_raw_block_streaming(hash, PRIO_NORMAL | PRIO_SECONDARY, order_tag) + .await?; let (header, stream) = block_stream.into_parts(); match header { DataBlockHeader::Plain => Ok(stream), diff --git a/src/block/resync.rs b/src/block/resync.rs index 9c1da4a7..7221b093 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -436,7 +436,7 @@ impl BlockResyncManager { &manager.endpoint, &need_nodes[..], put_block_message, - RequestStrategy::with_priority(PRIO_BACKGROUND) + RequestStrategy::with_priority(PRIO_BACKGROUND | PRIO_SECONDARY) .with_quorum(need_nodes.len()), ) .await @@ -460,7 +460,9 @@ impl BlockResyncManager { hash ); - let block_data = manager.rpc_get_raw_block(hash, None).await?; + let block_data = manager + .rpc_get_raw_block(hash, PRIO_BACKGROUND | PRIO_SECONDARY, None) + .await?; manager.metrics.resync_recv_counter.add(1); diff --git a/src/net/message.rs b/src/net/message.rs index b0d255c6..af98ca12 100644 --- a/src/net/message.rs +++ b/src/net/message.rs @@ -28,12 +28,30 @@ use crate::util::*; /// The same priority value is given to a request and to its associated response. pub type RequestPriority = u8; +// Usage of priority levels in Garage: +// +// PRIO_HIGH +// for liveness check events such as pings and important +// reconfiguration events such as layout changes +// +// PRIO_NORMAL +// for standard interactive requests to exchange metadata +// +// PRIO_NORMAL | PRIO_SECONDARY +// for standard interactive requests to exchange block data +// +// PRIO_BACKGROUND +// for background resync requests to exchange metadata +// PRIO_BACKGROUND | PRIO_SECONDARY +// for background resync requests to exchange block data + /// Priority class: high pub const PRIO_HIGH: RequestPriority = 0x20; /// Priority class: normal pub const PRIO_NORMAL: RequestPriority = 0x40; /// Priority class: background pub const PRIO_BACKGROUND: RequestPriority = 0x80; + /// Priority: primary among given class pub const PRIO_PRIMARY: RequestPriority = 0x00; /// Priority: secondary among given class (ex: `PRIO_HIGH | PRIO_SECONDARY`) diff --git a/src/net/send.rs b/src/net/send.rs index 0db0ba77..c60fc6b2 100644 --- a/src/net/send.rs +++ b/src/net/send.rs @@ -109,7 +109,7 @@ impl SendQueuePriority { let i = order_vec.iter().take_while(|o2| **o2 < order).count(); order_vec.insert(i, order); } - self.items.push_front(item); + self.items.push_back(item); } fn remove(&mut self, id: RequestID) { if let Some(i) = self.items.iter().position(|x| x.id == id) { @@ -128,51 +128,56 @@ impl SendQueuePriority { self.items.is_empty() } fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> { - for (j, item) in self.items.iter_mut().enumerate() { - if let Some(OrderTag(stream, order)) = item.order_tag { - if order > *self.order.get(&stream).unwrap().front().unwrap() { + // in step 1: poll only streams that have sent 0 bytes, we want to send them in priority + // as they most likely represent small requests to be sent first + // in step 2: poll all streams + for step in 0..2 { + for (j, item) in self.items.iter_mut().enumerate() { + if let Some(OrderTag(stream, order)) = item.order_tag { + if order > *self.order.get(&stream).unwrap().front().unwrap() { + continue; + } + } + + if step == 0 && item.sent > 0 { continue; } - } - let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); - if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) { - let id = item.id; - let eos = item.data.eos(); + let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); + if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) { + let id = item.id; + let eos = item.data.eos(); - let packet = bytes_or_err.map_err(|e| match e { - ReadExactError::Stream(err) => err, - _ => unreachable!(), - }); + let packet = bytes_or_err.map_err(|e| match e { + ReadExactError::Stream(err) => err, + _ => unreachable!(), + }); - let is_err = packet.is_err(); - let data_frame = DataFrame::from_packet(packet, !eos); - item.sent += data_frame.data().len(); + let is_err = packet.is_err(); + let data_frame = DataFrame::from_packet(packet, !eos); + item.sent += data_frame.data().len(); - if eos || is_err { - // If item had an order tag, remove it from the corresponding ordering list - if let Some(OrderTag(stream, order)) = item.order_tag { - let order_stream = self.order.get_mut(&stream).unwrap(); - assert_eq!(order_stream.pop_front(), Some(order)); - if order_stream.is_empty() { - self.order.remove(&stream); - } - } - // Remove item from sending queue - self.items.remove(j); - } else { - // Move item later in send queue to implement LAS scheduling - // (LAS = Least Attained Service) - for k in j..self.items.len() - 1 { - if self.items[k].sent >= self.items[k + 1].sent { - self.items.swap(k, k + 1); - } else { - break; + if eos || is_err { + // If item had an order tag, remove it from the corresponding ordering list + if let Some(OrderTag(stream, order)) = item.order_tag { + let order_stream = self.order.get_mut(&stream).unwrap(); + assert_eq!(order_stream.pop_front(), Some(order)); + if order_stream.is_empty() { + self.order.remove(&stream); + } } + // Remove item from sending queue + self.items.remove(j); + } else if step == 0 { + // Step 0 means that this stream had not sent any bytes yet. + // Now that it has, and it was not an EOS, we know that it is bigger + // than one chunk so move it at the end of the queue. + let item = self.items.remove(j).unwrap(); + self.items.push_back(item); } + + return Poll::Ready((id, data_frame)); } - - return Poll::Ready((id, data_frame)); } } diff --git a/src/net/server.rs b/src/net/server.rs index 55b9e678..36dccb2f 100644 --- a/src/net/server.rs +++ b/src/net/server.rs @@ -190,7 +190,7 @@ impl RecvLoop for ServerConn { let (prio, resp_enc_result) = match ReqEnc::decode(stream).await { Ok(req_enc) => (req_enc.prio, self2.recv_handler_aux(req_enc).await), - Err(e) => (PRIO_HIGH, Err(e)), + Err(e) => (PRIO_NORMAL, Err(e)), }; debug!("server: sending response to {}", id);