Merge pull request 'Store data blocks only on nodes in the latest cluster layout version (fix #815)' (#956) from fix-815 into main
All checks were successful
ci/woodpecker/push/debug Pipeline was successful

Reviewed-on: #956
This commit is contained in:
Alex 2025-02-14 15:53:16 +00:00
commit 9312c6bbcb
4 changed files with 94 additions and 12 deletions

View file

@ -370,7 +370,7 @@ impl BlockManager {
prevent_compression: bool,
order_tag: Option<OrderTag>,
) -> Result<(), Error> {
let who = self.replication.write_sets(&hash);
let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
let compression_level = self.compression_level.filter(|_| !prevent_compression);
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@ -396,7 +396,7 @@ impl BlockManager {
.rpc_helper()
.try_write_many_sets(
&self.endpoint,
who.as_ref(),
&[who],
put_block_rpc,
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
.with_drop_on_completion(permit)

View file

@ -377,7 +377,10 @@ impl BlockResyncManager {
info!("Resync block {:?}: offloading and deleting", hash);
let existing_path = existing_path.unwrap();
let mut who = manager.replication.storage_nodes(hash);
let mut who = manager
.system
.cluster_layout()
.current_storage_nodes_of(hash);
if who.len() < manager.replication.write_quorum() {
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
}
@ -455,6 +458,25 @@ impl BlockResyncManager {
}
if rc.is_nonzero() && !exists {
// The refcount is > 0, and the block is not present locally.
// We might need to fetch it from another node.
// First, check whether we are still supposed to store that
// block in the latest cluster layout version.
let storage_nodes = manager
.system
.cluster_layout()
.current_storage_nodes_of(&hash);
if !storage_nodes.contains(&manager.system.id) {
info!(
"Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
hash
);
return Ok(());
}
// We know we need the block. Fetch it.
info!(
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
hash

View file

@ -219,6 +219,11 @@ impl LayoutHelper {
ret
}
pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let ver = self.current();
ver.nodes_of(position, ver.replication_factor).collect()
}
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}

View file

@ -540,19 +540,73 @@ impl RpcHelper {
// ---- functions not related to MAKING RPCs, but just determining to what nodes
// they should be made and in which order ----
/// Determine to what nodes, and in what order, requests to read a data block
/// should be sent. All nodes in the Vec returned by this function are tried
/// one by one until there is one that returns the block (in block/manager.rs).
///
/// We want to have the best chance of finding the block in as few requests
/// as possible, and we want to avoid nodes that answer slowly.
///
/// Note that when there are several active layout versions, the block might
/// be stored only by nodes of the latest version (in case of a block that was
/// written after the layout change), or only by nodes of the oldest active
/// version (for all blocks that were written before). So we have to try nodes
/// of all layout versions. We also want to try nodes of all layout versions
/// fast, so as to optimize the chance of finding the block fast.
///
/// Therefore, the strategy is the following:
///
/// 1. ask first all nodes of all currently active layout versions
/// -> ask the preferred node in all layout versions (older to newer),
/// then the second preferred onde in all verions, etc.
/// -> we start by the oldest active layout version first, because a majority
/// of blocks will have been saved before the layout change
/// 2. ask all nodes of historical layout versions, for blocks which have not
/// yet been transferred to their new storage nodes
///
/// The preference order, for each layout version, is given by `request_order`,
/// based on factors such as nodes being in the same datacenter,
/// having low ping, etc.
pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
let layout = self.0.layout.read().unwrap();
let mut ret = Vec::with_capacity(12);
let ver_iter = layout
.versions()
.iter()
.rev()
.chain(layout.inner().old_versions.iter().rev());
for ver in ver_iter {
if ver.version > layout.sync_map_min() {
continue;
// Compute, for each layout version, the set of nodes that might store
// the block, and put them in their preferred order as of `request_order`.
let mut vernodes = layout.versions().iter().map(|ver| {
let nodes = ver.nodes_of(position, ver.replication_factor);
rpc_helper.request_order(layout.current(), nodes)
});
let mut ret = if layout.versions().len() == 1 {
// If we have only one active layout version, then these are the
// only nodes we ask in step 1
vernodes.next().unwrap()
} else {
let vernodes = vernodes.collect::<Vec<_>>();
let mut nodes = Vec::<Uuid>::with_capacity(12);
for i in 0..layout.current().replication_factor {
for vn in vernodes.iter() {
if let Some(n) = vn.get(i) {
if !nodes.contains(&n) {
if *n == self.0.our_node_id {
// it's always fast (almost free) to ask locally,
// so always put that as first choice
nodes.insert(0, *n);
} else {
nodes.push(*n);
}
}
}
}
}
nodes
};
// Second step: add nodes of older layout versions
let old_ver_iter = layout.inner().old_versions.iter().rev();
for ver in old_ver_iter {
let nodes = ver.nodes_of(position, ver.replication_factor);
for node in rpc_helper.request_order(layout.current(), nodes) {
if !ret.contains(&node) {
@ -560,6 +614,7 @@ impl RpcHelper {
}
}
}
ret
}