layout: move block_read_nodes_of to rpc_helper to avoid double-locking

(in theory, this could have caused a deadlock)
Alex 2023-12-08 12:02:24 +01:00
parent 063294dd56
commit 5dd200c015
Signed by: lx
GPG Key ID: 0E496D15096376BE
3 changed files with 80 additions and 70 deletions
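
For context, the double-locking hazard alluded to above arises when the layout lock is taken once by the caller of block_read_nodes_of and then taken a second time inside request_order. Below is a minimal, hypothetical sketch of that pattern (types and names like Layout and Helper are made up for illustration, not Garage's actual code); with std::sync::RwLock, re-acquiring a read lock on the same thread can block forever if a writer queues up between the two acquisitions, which is the theoretical deadlock the commit removes.

// Minimal sketch of the hazard, with hypothetical names (Layout, Helper) -- not Garage code.
use std::sync::RwLock;

struct Layout {
    nodes: Vec<u64>,
}

struct Helper {
    layout: RwLock<Layout>,
}

impl Helper {
    // Old shape: the caller already holds a read guard on the layout...
    fn read_nodes_old(&self) -> Vec<u64> {
        let guard = self.layout.read().unwrap();
        // ...and the helper it calls takes the SAME lock again. A second read()
        // on one thread may block forever if a writer is waiting in between.
        self.request_order(&guard.nodes)
    }

    fn request_order(&self, nodes: &[u64]) -> Vec<u64> {
        let _layout_again = self.layout.read().unwrap(); // second acquisition: potential deadlock
        nodes.to_vec()
    }

    // New shape (what this commit does): lock once, pass the borrowed data down.
    fn read_nodes_new(&self) -> Vec<u64> {
        let guard = self.layout.read().unwrap();
        Self::request_order_with(&guard.nodes)
    }

    fn request_order_with(nodes: &[u64]) -> Vec<u64> {
        nodes.to_vec()
    }
}

fn main() {
    let helper = Helper {
        layout: RwLock::new(Layout { nodes: vec![1, 2, 3] }),
    };
    println!("{:?}", helper.read_nodes_new());
}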


@@ -266,7 +266,7 @@ impl BlockManager {
	{
		let who = self
			.system
			.cluster_layout()
			.rpc_helper()
			.block_read_nodes_of(hash, self.system.rpc_helper());

		for node in who.iter() {


@@ -8,7 +8,6 @@ use garage_util::data::*;
use super::schema::*;
use crate::replication_mode::ReplicationMode;
use crate::rpc_helper::RpcHelper;

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct LayoutDigest {
@@ -155,6 +154,10 @@ impl LayoutHelper {
		self.ack_map_min
	}

	pub fn all_sync(&self) -> u64 {
		self.sync_map_min
	}

	pub fn sync_versions(&self) -> (u64, u64, u64) {
		(
			self.layout().current().version,
@@ -177,28 +180,6 @@ impl LayoutHelper {
			.collect()
	}

	pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
		let mut ret = Vec::with_capacity(12);
		let ver_iter = self
			.layout()
			.versions
			.iter()
			.rev()
			.chain(self.layout().old_versions.iter().rev());
		for ver in ver_iter {
			if ver.version > self.sync_map_min {
				continue;
			}
			let nodes = ver.nodes_of(position, ver.replication_factor);
			for node in rpc_helper.request_order(nodes) {
				if !ret.contains(&node) {
					ret.push(node);
				}
			}
		}
		ret
	}

	pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
		self.layout()
			.versions


@@ -26,7 +26,7 @@ use garage_util::data::*;
use garage_util::error::Error;
use garage_util::metrics::RecordDuration;

use crate::layout::LayoutHelper;
use crate::layout::{LayoutHelper, LayoutHistory};
use crate::metrics::RpcMetrics;

// Default RPC timeout = 5 minutes
@@ -304,7 +304,7 @@ impl RpcHelper {
		// preemptively send an additional request to any remaining nodes.

		// Reorder requests to priorize closeness / low latency
		let request_order = self.request_order(to.iter().copied());
		let request_order = self.request_order(&self.0.layout.read().unwrap(), to.iter().copied());
		let send_all_at_once = strategy.rs_send_all_at_once.unwrap_or(false);

		// Build future for each request
@@ -368,50 +368,6 @@ impl RpcHelper {
		}
	}

	pub fn request_order(&self, nodes: impl Iterator<Item = Uuid>) -> Vec<Uuid> {
		// Retrieve some status variables that we will use to sort requests
		let peer_list = self.0.fullmesh.get_peer_list();
		let layout = self.0.layout.read().unwrap();
		let our_zone = match layout.current().node_role(&self.0.our_node_id) {
			Some(pc) => &pc.zone,
			None => "",
		};

		// Augment requests with some information used to sort them.
		// The tuples are as follows:
		//         (is another node?, is another zone?, latency, node ID, request future)
		// We store all of these tuples in a vec that we can sort.
		// By sorting this vec, we priorize ourself, then nodes in the same zone,
		// and within a same zone we priorize nodes with the lowest latency.
		let mut nodes = nodes
			.map(|to| {
				let peer_zone = match layout.current().node_role(&to) {
					Some(pc) => &pc.zone,
					None => "",
				};
				let peer_avg_ping = peer_list
					.iter()
					.find(|x| x.id.as_ref() == to.as_slice())
					.and_then(|pi| pi.avg_ping)
					.unwrap_or_else(|| Duration::from_secs(10));
				(
					to != self.0.our_node_id,
					peer_zone != our_zone,
					peer_avg_ping,
					to,
				)
			})
			.collect::<Vec<_>>();

		// Sort requests by (priorize ourself, priorize same zone, priorize low latency)
		nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));

		nodes
			.into_iter()
			.map(|(_, _, _, to)| to)
			.collect::<Vec<_>>()
	}

	/// Make a RPC call to multiple servers, returning either a Vec of responses,
	/// or an error if quorum could not be reached due to too many errors
	///
@@ -533,6 +489,79 @@ impl RpcHelper {
		// Failure, could not get quorum
		Err(result_tracker.quorum_error())
	}

	// ---- functions not related to MAKING RPCs, but just determining to what nodes
	//      they should be made and in which order ----

	pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
		let layout = self.0.layout.read().unwrap();
		let mut ret = Vec::with_capacity(12);
		let ver_iter = layout
			.versions
			.iter()
			.rev()
			.chain(layout.old_versions.iter().rev());
		for ver in ver_iter {
			if ver.version > layout.all_sync() {
				continue;
			}
			let nodes = ver.nodes_of(position, ver.replication_factor);
			for node in rpc_helper.request_order(&layout, nodes) {
				if !ret.contains(&node) {
					ret.push(node);
				}
			}
		}
		ret
	}

	fn request_order(
		&self,
		layout: &LayoutHistory,
		nodes: impl Iterator<Item = Uuid>,
	) -> Vec<Uuid> {
		// Retrieve some status variables that we will use to sort requests
		let peer_list = self.0.fullmesh.get_peer_list();
		let our_zone = match layout.current().node_role(&self.0.our_node_id) {
			Some(pc) => &pc.zone,
			None => "",
		};

		// Augment requests with some information used to sort them.
		// The tuples are as follows:
		//         (is another node?, is another zone?, latency, node ID, request future)
		// We store all of these tuples in a vec that we can sort.
		// By sorting this vec, we priorize ourself, then nodes in the same zone,
		// and within a same zone we priorize nodes with the lowest latency.
		let mut nodes = nodes
			.map(|to| {
				let peer_zone = match layout.current().node_role(&to) {
					Some(pc) => &pc.zone,
					None => "",
				};
				let peer_avg_ping = peer_list
					.iter()
					.find(|x| x.id.as_ref() == to.as_slice())
					.and_then(|pi| pi.avg_ping)
					.unwrap_or_else(|| Duration::from_secs(10));
				(
					to != self.0.our_node_id,
					peer_zone != our_zone,
					peer_avg_ping,
					to,
				)
			})
			.collect::<Vec<_>>();

		// Sort requests by (priorize ourself, priorize same zone, priorize low latency)
		nodes.sort_by_key(|(diffnode, diffzone, ping, _to)| (*diffnode, *diffzone, *ping));

		nodes
			.into_iter()
			.map(|(_, _, _, to)| to)
			.collect::<Vec<_>>()
	}
}

// ------- utility for tracking successes/errors among write sets --------
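
As an aside, the ordering logic in request_order above boils down to a lexicographic sort on the tuple (is another node?, is another zone?, latency). The following self-contained sketch illustrates that sorting technique in isolation, using made-up node IDs, zone names, and ping values (not Garage code or data):

// Standalone illustration of the tuple-sort used by request_order.
use std::time::Duration;

fn order(our_id: u64, our_zone: &str, peers: &[(u64, &str, Duration)]) -> Vec<u64> {
    let mut keyed: Vec<_> = peers
        .iter()
        .map(|&(id, zone, ping)| (id != our_id, zone != our_zone, ping, id))
        .collect();
    // Tuples sort lexicographically: false < true, then lowest ping first.
    keyed.sort_by_key(|&(diff_node, diff_zone, ping, _)| (diff_node, diff_zone, ping));
    keyed.into_iter().map(|(_, _, _, id)| id).collect()
}

fn main() {
    let peers = [
        (3, "zone-b", Duration::from_millis(5)),
        (1, "zone-a", Duration::from_millis(20)), // ourself
        (2, "zone-a", Duration::from_millis(50)),
    ];
    // Ourself first, then the same-zone node, then the other zone despite its lower ping.
    assert_eq!(order(1, "zone-a", &peers), vec![1, 2, 3]);
}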