diff --git a/src/block/manager.rs b/src/block/manager.rs
index 537e1fc1..572bdadd 100644
--- a/src/block/manager.rs
+++ b/src/block/manager.rs
@@ -370,7 +370,7 @@ impl BlockManager {
         prevent_compression: bool,
         order_tag: Option<OrderTag>,
     ) -> Result<(), Error> {
-        let who = self.replication.write_sets(&hash);
+        let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
 
         let compression_level = self.compression_level.filter(|_| !prevent_compression);
         let (header, bytes) = DataBlock::from_buffer(data, compression_level)
@@ -396,7 +396,7 @@ impl BlockManager {
             .rpc_helper()
             .try_write_many_sets(
                 &self.endpoint,
-                who.as_ref(),
+                &[who],
                 put_block_rpc,
                 RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
                     .with_drop_on_completion(permit)
diff --git a/src/block/resync.rs b/src/block/resync.rs
index 947c68de..b476a0b8 100644
--- a/src/block/resync.rs
+++ b/src/block/resync.rs
@@ -377,7 +377,10 @@ impl BlockResyncManager {
         info!("Resync block {:?}: offloading and deleting", hash);
         let existing_path = existing_path.unwrap();
 
-        let mut who = manager.replication.storage_nodes(hash);
+        let mut who = manager
+            .system
+            .cluster_layout()
+            .current_storage_nodes_of(hash);
         if who.len() < manager.replication.write_quorum() {
             return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
         }
@@ -455,6 +458,25 @@ impl BlockResyncManager {
         }
 
         if rc.is_nonzero() && !exists {
+            // The refcount is > 0, and the block is not present locally.
+            // We might need to fetch it from another node.
+
+            // First, check whether we are still supposed to store that
+            // block in the latest cluster layout version.
+            let storage_nodes = manager
+                .system
+                .cluster_layout()
+                .current_storage_nodes_of(&hash);
+
+            if !storage_nodes.contains(&manager.system.id) {
+                info!(
+                    "Resync block {:?}: block is absent with refcount > 0, but it will drop to zero after all metadata is synced. Not fetching the block.",
+                    hash
+                );
+                return Ok(());
+            }
+
+            // We know we need the block. Fetch it.
             info!(
                 "Resync block {:?}: fetching absent but needed block (refcount > 0)",
                 hash
diff --git a/src/rpc/layout/helper.rs b/src/rpc/layout/helper.rs
index 44c826f9..c08a5629 100644
--- a/src/rpc/layout/helper.rs
+++ b/src/rpc/layout/helper.rs
@@ -219,6 +219,11 @@ impl LayoutHelper {
         ret
     }
 
+    pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
+        let ver = self.current();
+        ver.nodes_of(position, ver.replication_factor).collect()
+    }
+
     pub fn trackers_hash(&self) -> Hash {
         self.trackers_hash
     }
diff --git a/src/rpc/rpc_helper.rs b/src/rpc/rpc_helper.rs
index b8ca8120..2505c2ce 100644
--- a/src/rpc/rpc_helper.rs
+++ b/src/rpc/rpc_helper.rs
@@ -540,19 +540,73 @@ impl RpcHelper {
     // ---- functions not related to MAKING RPCs, but just determining to what nodes
     // they should be made and in which order ----
 
+    /// Determine to what nodes, and in what order, requests to read a data block
+    /// should be sent. All nodes in the Vec returned by this function are tried
+    /// one by one until there is one that returns the block (in block/manager.rs).
+    ///
+    /// We want to have the best chance of finding the block in as few requests
+    /// as possible, and we want to avoid nodes that answer slowly.
+    ///
+    /// Note that when there are several active layout versions, the block might
+    /// be stored only by nodes of the latest version (in case of a block that was
+    /// written after the layout change), or only by nodes of the oldest active
+    /// version (for all blocks that were written before). So we have to try nodes
+    /// of all layout versions. We also want to try nodes of all layout versions
+    /// fast, so as to optimize the chance of finding the block fast.
+    ///
+    /// Therefore, the strategy is the following:
+    ///
+    /// 1. ask first all nodes of all currently active layout versions
+    ///    -> ask the preferred node in all layout versions (older to newer),
+    ///       then the second preferred node in all versions, etc.
+    ///    -> we start with the oldest active layout version, because a majority
+    ///       of blocks will have been saved before the layout change
+    /// 2. ask all nodes of historical layout versions, for blocks which have not
+    ///    yet been transferred to their new storage nodes
+    ///
+    /// The preference order, for each layout version, is given by `request_order`,
+    /// based on factors such as nodes being in the same datacenter,
+    /// having low ping, etc.
     pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
         let layout = self.0.layout.read().unwrap();
 
-        let mut ret = Vec::with_capacity(12);
-        let ver_iter = layout
-            .versions()
-            .iter()
-            .rev()
-            .chain(layout.inner().old_versions.iter().rev());
-        for ver in ver_iter {
-            if ver.version > layout.sync_map_min() {
-                continue;
+        // Compute, for each layout version, the set of nodes that might store
+        // the block, and put them in their preferred order as given by `request_order`.
+        let mut vernodes = layout.versions().iter().map(|ver| {
+            let nodes = ver.nodes_of(position, ver.replication_factor);
+            rpc_helper.request_order(layout.current(), nodes)
+        });
+
+        let mut ret = if layout.versions().len() == 1 {
+            // If we have only one active layout version, then these are the
+            // only nodes we ask in step 1
+            vernodes.next().unwrap()
+        } else {
+            let vernodes = vernodes.collect::<Vec<_>>();
+
+            let mut nodes = Vec::<Uuid>::with_capacity(12);
+            for i in 0..layout.current().replication_factor {
+                for vn in vernodes.iter() {
+                    if let Some(n) = vn.get(i) {
+                        if !nodes.contains(&n) {
+                            if *n == self.0.our_node_id {
+                                // it's always fast (almost free) to ask locally,
+                                // so always put that as first choice
+                                nodes.insert(0, *n);
+                            } else {
+                                nodes.push(*n);
+                            }
+                        }
+                    }
+                }
             }
+
+            nodes
+        };
+
+        // Second step: add nodes of older layout versions
+        let old_ver_iter = layout.inner().old_versions.iter().rev();
+        for ver in old_ver_iter {
             let nodes = ver.nodes_of(position, ver.replication_factor);
             for node in rpc_helper.request_order(layout.current(), nodes) {
                 if !ret.contains(&node) {
@@ -560,6 +614,7 @@ impl RpcHelper {
                 }
             }
         }
+
         ret
     }
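
For reference, a minimal self-contained sketch of the step-1 interleaving described in the `block_read_nodes_of` doc comment above. This is not Garage's actual API: node ids are simplified to `u64`, the per-version preference lists are passed in directly (in Garage they come from `ver.nodes_of(..)` reordered by `request_order`), and `interleave_read_nodes` is a hypothetical name used only for illustration.

```rust
type NodeId = u64;

/// Interleave per-layout-version preference lists column by column:
/// every version's preferred node first, then every version's second
/// choice, etc., deduplicating, and always promoting the local node
/// to the front since asking locally is almost free.
fn interleave_read_nodes(
    per_version: &[Vec<NodeId>], // oldest active version first
    replication_factor: usize,
    our_node: NodeId,
) -> Vec<NodeId> {
    let mut nodes = Vec::with_capacity(12);
    for i in 0..replication_factor {
        for vn in per_version {
            if let Some(n) = vn.get(i) {
                if !nodes.contains(n) {
                    if *n == our_node {
                        nodes.insert(0, *n);
                    } else {
                        nodes.push(*n);
                    }
                }
            }
        }
    }
    nodes
}

fn main() {
    // Two active layout versions; node 2 is the local node.
    let old_ver = vec![1, 2, 3];
    let new_ver = vec![4, 2, 5];
    // Both versions' first choices (1 and 4) come before any second
    // choice, and the local node 2 is promoted to the front.
    assert_eq!(
        interleave_read_nodes(&[old_ver, new_ver], 3, 2),
        vec![2, 1, 4, 3, 5]
    );
}
```

The point of the column-by-column order is that with several active layout versions, every version's first-choice node is contacted before any second-choice node, so the block is likely found within the first couple of requests no matter which side of the layout change it was written on.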
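Similarly, a sketch of the resync-side decision added in src/block/resync.rs, under the same simplified `NodeId` assumption; `should_fetch_missing_block` is a hypothetical helper, not a function in the codebase.

```rust
type NodeId = u64;

/// A block has refcount > 0 but is absent locally. Fetch it only if this
/// node is among the block's storage nodes in the *current* layout version;
/// otherwise the local refcount will drop to zero by itself once the
/// metadata tables finish syncing, and fetching would be wasted work.
fn should_fetch_missing_block(current_storage_nodes: &[NodeId], our_id: NodeId) -> bool {
    current_storage_nodes.contains(&our_id)
}

fn main() {
    // After a layout change, node 7 no longer stores this hash:
    // it skips the fetch and lets the refcount drain to zero.
    assert!(!should_fetch_missing_block(&[1, 2, 3], 7));
    assert!(should_fetch_missing_block(&[1, 2, 3], 2));
}
```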