From d6d239fc7909cbd017da6ea35cceb3d561a87cca Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 27 Nov 2023 11:52:57 +0100 Subject: [PATCH] block manager: read_block using old layout versions if necessary --- src/block/manager.rs | 6 ++++-- src/rpc/layout/helper.rs | 23 +++++++++++++++++++++++ src/rpc/layout/history.rs | 12 +++++++++++- src/rpc/layout/schema.rs | 7 +++++++ src/rpc/rpc_helper.rs | 11 +++++------ 5 files changed, 50 insertions(+), 9 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index be2e49513..471111609 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -264,8 +264,10 @@ impl BlockManager { F: Fn(DataBlockHeader, ByteStream) -> Fut, Fut: futures::Future>, { - let who = self.replication.read_nodes(hash); - let who = self.system.rpc_helper().request_order(&who); + let who = self + .system + .cluster_layout() + .block_read_nodes_of(hash, self.system.rpc_helper()); for node in who.iter() { let node_id = NodeID::from(*node); diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs index 0d746ea38..5d159f3e9 100644 --- a/src/rpc/layout/helper.rs +++ b/src/rpc/layout/helper.rs @@ -7,6 +7,7 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; use super::schema::*; +use crate::rpc_helper::RpcHelper; #[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)] pub struct LayoutDigest { @@ -140,6 +141,28 @@ impl LayoutHelper { .collect() } + pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec { + let mut ret = Vec::with_capacity(12); + let ver_iter = self + .layout() + .versions + .iter() + .rev() + .chain(self.layout().old_versions.iter().rev()); + for ver in ver_iter { + if ver.version > self.sync_map_min { + continue; + } + let nodes = ver.nodes_of(position, ver.replication_factor); + for node in rpc_helper.request_order(nodes) { + if !ret.contains(&node) { + ret.push(node); + } + } + } + ret + } + pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec> { self.layout() .versions diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 653d2a481..7d4a1b48c 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -18,6 +18,7 @@ impl LayoutHistory { LayoutHistory { versions: vec![version], + old_versions: vec![], update_trackers: Default::default(), staging: Lww::raw(0, staging), } @@ -86,11 +87,20 @@ impl LayoutHistory { .min(&all_nongateway_nodes, min_version); if self.min_stored() < sync_ack_map_min { let removed = self.versions.remove(0); - info!("Layout history: pruning old version {}", removed.version); + info!( + "Layout history: moving version {} to old_versions", + removed.version + ); + self.old_versions.push(removed); } else { break; } } + + while self.old_versions.len() > OLD_VERSION_COUNT { + let removed = self.old_versions.remove(0); + info!("Layout history: removing old_version {}", removed.version); + } } pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) { diff --git a/src/rpc/layout/schema.rs b/src/rpc/layout/schema.rs index 00a2c017f..08db44ca1 100644 --- a/src/rpc/layout/schema.rs +++ b/src/rpc/layout/schema.rs @@ -193,12 +193,18 @@ mod v010 { use std::collections::BTreeMap; pub use v09::{LayoutParameters, NodeRole, NodeRoleV, ZoneRedundancy}; + pub const OLD_VERSION_COUNT: usize = 5; + /// The history of cluster layouts, with trackers to keep a record /// of which nodes are up-to-date to current cluster data #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)] pub struct LayoutHistory { /// The versions currently in use in the cluster pub versions: Vec, + /// At most 5 of the previous versions, not used by the garage_table + /// module, but usefull for the garage_block module to find data blocks + /// that have not yet been moved + pub old_versions: Vec, /// Update trackers pub update_trackers: UpdateTrackers, @@ -300,6 +306,7 @@ mod v010 { }; Self { versions: vec![version], + old_versions: vec![], update_trackers: UpdateTrackers { ack_map: update_tracker.clone(), sync_map: update_tracker.clone(), diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs index 7e9fabd75..e9a9143f3 100644 --- a/src/rpc/rpc_helper.rs +++ b/src/rpc/rpc_helper.rs @@ -267,7 +267,7 @@ impl RpcHelper { // When there are errors, we start new requests to compensate. // Reorder requests to priorize closeness / low latency - let request_order = self.request_order(to); + let request_order = self.request_order(to.iter().copied()); let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false); // Build future for each request @@ -335,7 +335,7 @@ impl RpcHelper { } } - pub fn request_order(&self, nodes: &[Uuid]) -> Vec { + pub fn request_order(&self, nodes: impl Iterator) -> Vec { // Retrieve some status variables that we will use to sort requests let peer_list = self.0.fullmesh.get_peer_list(); let layout = self.0.layout.read().unwrap(); @@ -351,9 +351,8 @@ impl RpcHelper { // By sorting this vec, we priorize ourself, then nodes in the same zone, // and within a same zone we priorize nodes with the lowest latency. let mut nodes = nodes - .iter() .map(|to| { - let peer_zone = match layout.current().node_role(to) { + let peer_zone = match layout.current().node_role(&to) { Some(pc) => &pc.zone, None => "", }; @@ -363,10 +362,10 @@ impl RpcHelper { .and_then(|pi| pi.avg_ping) .unwrap_or_else(|| Duration::from_secs(10)); ( - *to != self.0.our_node_id, + to != self.0.our_node_id, peer_zone != our_zone, peer_avg_ping, - *to, + to, ) }) .collect::>();