NLnet task 3 #667
3 changed files with 27 additions and 8 deletions
|
@ -51,20 +51,37 @@ impl LayoutHelper {
|
||||||
pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
|
pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
|
||||||
layout.cleanup_old_versions();
|
layout.cleanup_old_versions();
|
||||||
|
|
||||||
|
let all_nodes = layout.get_all_nodes();
|
||||||
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
|
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
|
||||||
layout.clamp_update_trackers(&all_nongateway_nodes);
|
|
||||||
|
layout.clamp_update_trackers(&all_nodes);
|
||||||
|
|
||||||
let min_version = layout.min_stored();
|
let min_version = layout.min_stored();
|
||||||
|
|
||||||
|
// ack_map_min is the minimum value of ack_map among all nodes
|
||||||
|
// in the cluster (gateway, non-gateway, current and previous layouts).
|
||||||
|
// It is the highest layout version which all of these nodes have
|
||||||
|
// acknowledged, indicating that they are aware of it and are no
|
||||||
|
// longer processing write operations that did not take it into account.
|
||||||
let ack_map_min = layout
|
let ack_map_min = layout
|
||||||
.update_trackers
|
.update_trackers
|
||||||
.ack_map
|
.ack_map
|
||||||
.min(&all_nongateway_nodes, min_version);
|
.min_among(&all_nodes, min_version);
|
||||||
|
|
||||||
|
// sync_map_min is the minimum value of sync_map among all storage nodes
|
||||||
|
// in the cluster (non-gateway nodes only, current and previous layouts).
|
||||||
|
// It is the highest layout version for which we know that all relevant
|
||||||
|
// storage nodes have fullfilled a sync, and therefore it is safe to
|
||||||
|
// use a read quorum within that layout to ensure consistency.
|
||||||
|
// Gateway nodes are excluded here because they hold no relevant data
|
||||||
|
// (they store the bucket and access key tables, but we don't have
|
||||||
|
// consistency on those).
|
||||||
|
// TODO: this value could take quorums into account instead.
|
||||||
let sync_map_min = layout
|
let sync_map_min = layout
|
||||||
.update_trackers
|
.update_trackers
|
||||||
.sync_map
|
.sync_map
|
||||||
.min(&all_nongateway_nodes, min_version);
|
.min_among(&all_nongateway_nodes, min_version);
|
||||||
|
|
||||||
let all_nodes = layout.get_all_nodes();
|
|
||||||
let trackers_hash = layout.calculate_trackers_hash();
|
let trackers_hash = layout.calculate_trackers_hash();
|
||||||
let staging_hash = layout.calculate_staging_hash();
|
let staging_hash = layout.calculate_staging_hash();
|
||||||
|
|
||||||
|
|
|
@ -77,14 +77,16 @@ impl LayoutHistory {
|
||||||
}
|
}
|
||||||
|
|
||||||
// If there are old versions that no one is reading from anymore,
|
// If there are old versions that no one is reading from anymore,
|
||||||
// remove them
|
// remove them (keep them in self.old_versions).
|
||||||
|
// ASSUMPTION: we only care about where nodes in the current layout version
|
||||||
|
// are reading from, as we assume older nodes are being discarded.
|
||||||
while self.versions.len() > 1 {
|
while self.versions.len() > 1 {
|
||||||
let all_nongateway_nodes = self.get_all_nongateway_nodes();
|
let current_nodes = &self.current().node_id_vec;
|
||||||
let min_version = self.min_stored();
|
let min_version = self.min_stored();
|
||||||
let sync_ack_map_min = self
|
let sync_ack_map_min = self
|
||||||
.update_trackers
|
.update_trackers
|
||||||
.sync_ack_map
|
.sync_ack_map
|
||||||
.min(&all_nongateway_nodes, min_version);
|
.min_among(¤t_nodes, min_version);
|
||||||
if self.min_stored() < sync_ack_map_min {
|
if self.min_stored() < sync_ack_map_min {
|
||||||
let removed = self.versions.remove(0);
|
let removed = self.versions.remove(0);
|
||||||
info!(
|
info!(
|
||||||
|
|
|
@ -408,7 +408,7 @@ impl UpdateTracker {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn min(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
|
pub(crate) fn min_among(&self, storage_nodes: &[Uuid], min_version: u64) -> u64 {
|
||||||
storage_nodes
|
storage_nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|x| self.0.get(x).copied().unwrap_or(min_version))
|
.map(|x| self.0.get(x).copied().unwrap_or(min_version))
|
||||||
|
|
Loading…
Reference in a new issue