Store data blocks only on nodes in the latest cluster layout version (fix #815) #956
4 changed files with 94 additions and 12 deletions
|
@ -370,7 +370,7 @@ impl BlockManager {
|
|||
prevent_compression: bool,
|
||||
order_tag: Option<OrderTag>,
|
||||
) -> Result<(), Error> {
|
||||
let who = self.replication.write_sets(&hash);
|
||||
let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
|
||||
|
||||
let compression_level = self.compression_level.filter(|_| !prevent_compression);
|
||||
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
|
||||
|
@ -396,7 +396,7 @@ impl BlockManager {
|
|||
.rpc_helper()
|
||||
.try_write_many_sets(
|
||||
&self.endpoint,
|
||||
who.as_ref(),
|
||||
&[who],
|
||||
put_block_rpc,
|
||||
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
||||
.with_drop_on_completion(permit)
|
||||
|
|
|
@ -377,7 +377,10 @@ impl BlockResyncManager {
|
|||
info!("Resync block {:?}: offloading and deleting", hash);
|
||||
let existing_path = existing_path.unwrap();
|
||||
|
||||
let mut who = manager.replication.storage_nodes(hash);
|
||||
let mut who = manager
|
||||
.system
|
||||
.cluster_layout()
|
||||
.current_storage_nodes_of(hash);
|
||||
if who.len() < manager.replication.write_quorum() {
|
||||
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
||||
}
|
||||
|
@ -455,6 +458,25 @@ impl BlockResyncManager {
|
|||
}
|
||||
|
||||
if rc.is_nonzero() && !exists {
|
||||
// The refcount is > 0, and the block is not present locally.
|
||||
// We might need to fetch it from another node.
|
||||
|
||||
// First, check whether we are still supposed to store that
|
||||
// block in the latest cluster layout version.
|
||||
let storage_nodes = manager
|
||||
.system
|
||||
.cluster_layout()
|
||||
.current_storage_nodes_of(&hash);
|
||||
|
||||
if !storage_nodes.contains(&manager.system.id) {
|
||||
info!(
|
||||
"Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
|
||||
hash
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// We know we need the block. Fetch it.
|
||||
info!(
|
||||
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
|
||||
hash
|
||||
|
|
|
@ -219,6 +219,11 @@ impl LayoutHelper {
|
|||
ret
|
||||
}
|
||||
|
||||
pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
|
||||
let ver = self.current();
|
||||
ver.nodes_of(position, ver.replication_factor).collect()
|
||||
}
|
||||
|
||||
pub fn trackers_hash(&self) -> Hash {
|
||||
self.trackers_hash
|
||||
}
|
||||
|
|
|
@ -540,19 +540,73 @@ impl RpcHelper {
|
|||
// ---- functions not related to MAKING RPCs, but just determining to what nodes
|
||||
// they should be made and in which order ----
|
||||
|
||||
/// Determine to what nodes, and in what order, requests to read a data block
|
||||
/// should be sent. All nodes in the Vec returned by this function are tried
|
||||
/// one by one until there is one that returns the block (in block/manager.rs).
|
||||
///
|
||||
/// We want to have the best chance of finding the block in as few requests
|
||||
/// as possible, and we want to avoid nodes that answer slowly.
|
||||
///
|
||||
/// Note that when there are several active layout versions, the block might
|
||||
/// be stored only by nodes of the latest version (in case of a block that was
|
||||
/// written after the layout change), or only by nodes of the oldest active
|
||||
/// version (for all blocks that were written before). So we have to try nodes
|
||||
/// of all layout versions. We also want to try nodes of all layout versions
|
||||
/// fast, so as to optimize the chance of finding the block fast.
|
||||
///
|
||||
/// Therefore, the strategy is the following:
|
||||
///
|
||||
/// 1. ask first all nodes of all currently active layout versions
|
||||
/// -> ask the preferred node in all layout versions (older to newer),
|
||||
/// then the second preferred onde in all verions, etc.
|
||||
/// -> we start by the oldest active layout version first, because a majority
|
||||
/// of blocks will have been saved before the layout change
|
||||
/// 2. ask all nodes of historical layout versions, for blocks which have not
|
||||
/// yet been transferred to their new storage nodes
|
||||
///
|
||||
/// The preference order, for each layout version, is given by `request_order`,
|
||||
/// based on factors such as nodes being in the same datacenter,
|
||||
/// having low ping, etc.
|
||||
pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
|
||||
let layout = self.0.layout.read().unwrap();
|
||||
|
||||
let mut ret = Vec::with_capacity(12);
|
||||
let ver_iter = layout
|
||||
.versions()
|
||||
.iter()
|
||||
.rev()
|
||||
.chain(layout.inner().old_versions.iter().rev());
|
||||
for ver in ver_iter {
|
||||
if ver.version > layout.sync_map_min() {
|
||||
continue;
|
||||
// Compute, for each layout version, the set of nodes that might store
|
||||
// the block, and put them in their preferred order as of `request_order`.
|
||||
let mut vernodes = layout.versions().iter().map(|ver| {
|
||||
let nodes = ver.nodes_of(position, ver.replication_factor);
|
||||
rpc_helper.request_order(layout.current(), nodes)
|
||||
});
|
||||
|
||||
let mut ret = if layout.versions().len() == 1 {
|
||||
// If we have only one active layout version, then these are the
|
||||
// only nodes we ask in step 1
|
||||
vernodes.next().unwrap()
|
||||
} else {
|
||||
let vernodes = vernodes.collect::<Vec<_>>();
|
||||
|
||||
let mut nodes = Vec::<Uuid>::with_capacity(12);
|
||||
for i in 0..layout.current().replication_factor {
|
||||
for vn in vernodes.iter() {
|
||||
if let Some(n) = vn.get(i) {
|
||||
if !nodes.contains(&n) {
|
||||
if *n == self.0.our_node_id {
|
||||
// it's always fast (almost free) to ask locally,
|
||||
// so always put that as first choice
|
||||
nodes.insert(0, *n);
|
||||
} else {
|
||||
nodes.push(*n);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nodes
|
||||
};
|
||||
|
||||
// Second step: add nodes of older layout versions
|
||||
let old_ver_iter = layout.inner().old_versions.iter().rev();
|
||||
for ver in old_ver_iter {
|
||||
let nodes = ver.nodes_of(position, ver.replication_factor);
|
||||
for node in rpc_helper.request_order(layout.current(), nodes) {
|
||||
if !ret.contains(&node) {
|
||||
|
@ -560,6 +614,7 @@ impl RpcHelper {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
ret
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue