relocalize logic into block manager
This commit is contained in:
parent
d25e631a4a
commit
c6bed26347
3 changed files with 16 additions and 19 deletions
|
@ -336,6 +336,19 @@ impl BlockManager {
|
|||
Err(err)
|
||||
}
|
||||
|
||||
/// Returns the set of nodes that should store a copy of a given block.
|
||||
/// These are the nodes assigned to the block's hash in the current
|
||||
/// layout version only: since blocks are immutable, we don't need to
|
||||
/// do complex logic when several layour versions are active at once,
|
||||
/// just move them directly to the new nodes.
|
||||
pub(crate) fn storage_nodes_of(&self, hash: &Hash) -> Vec<Uuid> {
|
||||
self.system
|
||||
.cluster_layout()
|
||||
.current()
|
||||
.nodes_of(hash)
|
||||
.collect()
|
||||
}
|
||||
|
||||
// ---- Public interface ----
|
||||
|
||||
/// Ask nodes that might have a block for it, return it as a stream
|
||||
|
@ -368,7 +381,7 @@ impl BlockManager {
|
|||
prevent_compression: bool,
|
||||
order_tag: Option<OrderTag>,
|
||||
) -> Result<(), Error> {
|
||||
let who = self.system.cluster_layout().current_storage_nodes_of(&hash);
|
||||
let who = self.storage_nodes_of(&hash);
|
||||
|
||||
let compression_level = self.compression_level.filter(|_| !prevent_compression);
|
||||
let (header, bytes) = DataBlock::from_buffer(data, compression_level)
|
||||
|
|
|
@ -375,10 +375,7 @@ impl BlockResyncManager {
|
|||
info!("Resync block {:?}: offloading and deleting", hash);
|
||||
let existing_path = existing_path.unwrap();
|
||||
|
||||
let mut who = manager
|
||||
.system
|
||||
.cluster_layout()
|
||||
.current_storage_nodes_of(hash);
|
||||
let mut who = manager.storage_nodes_of(hash);
|
||||
if who.len() < manager.write_quorum {
|
||||
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
||||
}
|
||||
|
@ -461,10 +458,7 @@ impl BlockResyncManager {
|
|||
|
||||
// First, check whether we are still supposed to store that
|
||||
// block in the latest cluster layout version.
|
||||
let storage_nodes = manager
|
||||
.system
|
||||
.cluster_layout()
|
||||
.current_storage_nodes_of(&hash);
|
||||
let storage_nodes = manager.storage_nodes_of(&hash);
|
||||
|
||||
if !storage_nodes.contains(&manager.system.id) {
|
||||
info!(
|
||||
|
|
|
@ -186,16 +186,6 @@ impl LayoutHelper {
|
|||
&self.all_nongateway_nodes
|
||||
}
|
||||
|
||||
/// Returns the set of nodes for storing this hash in the current layout version.
|
||||
///
|
||||
/// Used by the block maanger only: data blocks are immutable, so we don't have
|
||||
/// to coordinate between old set of nodes and new set of nodes when layout changes.
|
||||
/// As soon as the layout change is effective, blocks can be moved to the new
|
||||
/// set of nodes.
|
||||
pub fn current_storage_nodes_of(&self, hash: &Hash) -> Vec<Uuid> {
|
||||
self.current().nodes_of(hash).collect()
|
||||
}
|
||||
|
||||
pub fn ack_map_min(&self) -> u64 {
|
||||
self.ack_map_min
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue