layout: begin managing the update tracker values
This commit is contained in:
parent
94caf9c0c1
commit
03ebf18830
4 changed files with 109 additions and 33 deletions
|
@ -1,3 +1,5 @@
|
|||
use std::collections::HashSet;
|
||||
|
||||
use garage_util::crdt::{Crdt, Lww, LwwMap};
|
||||
use garage_util::data::*;
|
||||
use garage_util::encode::nonversioned_encode;
|
||||
|
@ -30,6 +32,14 @@ impl LayoutHistory {
|
|||
self.versions.last().as_ref().unwrap()
|
||||
}
|
||||
|
||||
pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
|
||||
self.versions
|
||||
.iter()
|
||||
.map(|x| x.nongateway_nodes())
|
||||
.flatten()
|
||||
.collect::<HashSet<_>>()
|
||||
}
|
||||
|
||||
pub(crate) fn update_hashes(&mut self) {
|
||||
self.trackers_hash = self.calculate_trackers_hash();
|
||||
self.staging_hash = self.calculate_staging_hash();
|
||||
|
@ -43,6 +53,65 @@ impl LayoutHistory {
|
|||
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
|
||||
}
|
||||
|
||||
// ------------------ update tracking ---------------
|
||||
|
||||
pub(crate) fn update_trackers(&mut self, node_id: Uuid) {
|
||||
// Ensure trackers for this node's values are up-to-date
|
||||
|
||||
// 1. Acknowledge the last layout version in the history
|
||||
self.ack_last(node_id);
|
||||
|
||||
// 2. Assume the data on this node is sync'ed up at least to
|
||||
// the first layout version in the history
|
||||
self.sync_first(node_id);
|
||||
|
||||
// 3. Acknowledge everyone has synced up to min(self.sync_map)
|
||||
self.sync_ack(node_id);
|
||||
|
||||
// 4. Cleanup layout versions that are not needed anymore
|
||||
self.cleanup_old_versions();
|
||||
|
||||
info!("ack_map: {:?}", self.update_trackers.ack_map);
|
||||
info!("sync_map: {:?}", self.update_trackers.sync_map);
|
||||
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
|
||||
|
||||
// Finally, update hashes
|
||||
self.update_hashes();
|
||||
}
|
||||
|
||||
pub(crate) fn ack_last(&mut self, node: Uuid) {
|
||||
let last_version = self.current().version;
|
||||
self.update_trackers.ack_map.set_max(node, last_version);
|
||||
}
|
||||
|
||||
pub(crate) fn sync_first(&mut self, node: Uuid) {
|
||||
let first_version = self.versions.first().as_ref().unwrap().version;
|
||||
self.update_trackers.sync_map.set_max(node, first_version);
|
||||
}
|
||||
|
||||
pub(crate) fn sync_ack(&mut self, node: Uuid) {
|
||||
self.update_trackers.sync_ack_map.set_max(
|
||||
node,
|
||||
self.calculate_global_min(&self.update_trackers.sync_map),
|
||||
);
|
||||
}
|
||||
|
||||
pub(crate) fn cleanup_old_versions(&mut self) {
|
||||
let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map);
|
||||
while self.versions.first().as_ref().unwrap().version < min_sync_ack {
|
||||
self.versions.remove(0);
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
|
||||
let storage_nodes = self.all_storage_nodes();
|
||||
storage_nodes
|
||||
.iter()
|
||||
.map(|x| tracker.0.get(x).copied().unwrap_or(0))
|
||||
.min()
|
||||
.unwrap_or(0)
|
||||
}
|
||||
|
||||
// ================== updates to layout, public interface ===================
|
||||
|
||||
pub fn merge(&mut self, other: &LayoutHistory) -> bool {
|
||||
|
@ -78,11 +147,6 @@ impl LayoutHistory {
|
|||
changed = true;
|
||||
}
|
||||
|
||||
// Update hashes if there are changes
|
||||
if changed {
|
||||
self.update_hashes();
|
||||
}
|
||||
|
||||
changed
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ impl LayoutManager {
|
|||
let persist_cluster_layout: Persister<LayoutHistory> =
|
||||
Persister::new(&config.metadata_dir, "cluster_layout");
|
||||
|
||||
let cluster_layout = match persist_cluster_layout.load() {
|
||||
let mut cluster_layout = match persist_cluster_layout.load() {
|
||||
Ok(x) => {
|
||||
if x.current().replication_factor != replication_factor {
|
||||
return Err(Error::Message(format!(
|
||||
|
@ -71,6 +71,8 @@ impl LayoutManager {
|
|||
}
|
||||
};
|
||||
|
||||
cluster_layout.update_trackers(node_id.into());
|
||||
|
||||
let layout = Arc::new(RwLock::new(cluster_layout));
|
||||
let change_notify = Arc::new(Notify::new());
|
||||
|
||||
|
@ -126,7 +128,7 @@ impl LayoutManager {
|
|||
if prev_layout_check && layout.check().is_err() {
|
||||
panic!("Merged two correct layouts and got an incorrect layout.");
|
||||
}
|
||||
|
||||
layout.update_trackers(self.node_id);
|
||||
return Some(layout.clone());
|
||||
}
|
||||
}
|
||||
|
@ -137,6 +139,7 @@ impl LayoutManager {
|
|||
let mut layout = self.layout.write().unwrap();
|
||||
if layout.update_trackers != *adv {
|
||||
if layout.update_trackers.merge(adv) {
|
||||
layout.update_trackers(self.node_id);
|
||||
return Some(layout.update_trackers.clone());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -3,6 +3,7 @@ use std::fmt;
|
|||
use bytesize::ByteSize;
|
||||
|
||||
use garage_util::crdt::{AutoCrdt, Crdt};
|
||||
use garage_util::data::Uuid;
|
||||
|
||||
mod v08 {
|
||||
use crate::layout::CompactNodeType;
|
||||
|
@ -276,8 +277,7 @@ mod v010 {
|
|||
let update_tracker = UpdateTracker(
|
||||
version
|
||||
.nongateway_nodes()
|
||||
.iter()
|
||||
.map(|x| (*x, version.version))
|
||||
.map(|x| (x, version.version))
|
||||
.collect::<HashMap<Uuid, u64>>(),
|
||||
);
|
||||
let staging = LayoutStaging {
|
||||
|
@ -375,8 +375,15 @@ impl UpdateTracker {
|
|||
changed
|
||||
}
|
||||
|
||||
pub(crate) fn min(&self) -> u64 {
|
||||
self.0.iter().map(|(_, v)| *v).min().unwrap_or(0)
|
||||
pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) {
|
||||
match self.0.get_mut(&peer) {
|
||||
Some(e) => {
|
||||
*e = std::cmp::max(*e, value);
|
||||
}
|
||||
None => {
|
||||
self.0.insert(peer, value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -134,15 +134,14 @@ impl LayoutVersion {
|
|||
// ===================== internal information extractors ======================
|
||||
|
||||
/// Returns the uuids of the non_gateway nodes in self.node_id_vec.
|
||||
pub(crate) fn nongateway_nodes(&self) -> Vec<Uuid> {
|
||||
let mut result = Vec::<Uuid>::new();
|
||||
for uuid in self.node_id_vec.iter() {
|
||||
match self.node_role(uuid) {
|
||||
Some(role) if role.capacity.is_some() => result.push(*uuid),
|
||||
_ => (),
|
||||
}
|
||||
}
|
||||
result
|
||||
pub(crate) fn nongateway_nodes(&self) -> impl Iterator<Item = Uuid> + '_ {
|
||||
self.node_id_vec
|
||||
.iter()
|
||||
.copied()
|
||||
.filter(move |uuid| match self.node_role(uuid) {
|
||||
Some(role) if role.capacity.is_some() => true,
|
||||
_ => false,
|
||||
})
|
||||
}
|
||||
|
||||
/// Given a node uuids, this function returns the label of its zone
|
||||
|
@ -158,8 +157,8 @@ impl LayoutVersion {
|
|||
/// Returns the sum of capacities of non gateway nodes in the cluster
|
||||
fn get_total_capacity(&self) -> Result<u64, Error> {
|
||||
let mut total_capacity = 0;
|
||||
for uuid in self.nongateway_nodes().iter() {
|
||||
total_capacity += self.get_node_capacity(uuid)?;
|
||||
for uuid in self.nongateway_nodes() {
|
||||
total_capacity += self.get_node_capacity(&uuid)?;
|
||||
}
|
||||
Ok(total_capacity)
|
||||
}
|
||||
|
@ -320,7 +319,7 @@ impl LayoutVersion {
|
|||
// to use them as indices in the flow graphs.
|
||||
let (id_to_zone, zone_to_id) = self.generate_nongateway_zone_ids()?;
|
||||
|
||||
let nb_nongateway_nodes = self.nongateway_nodes().len();
|
||||
let nb_nongateway_nodes = self.nongateway_nodes().count();
|
||||
if nb_nongateway_nodes < self.replication_factor {
|
||||
return Err(Error::Message(format!(
|
||||
"The number of nodes with positive \
|
||||
|
@ -479,7 +478,8 @@ impl LayoutVersion {
|
|||
let mut id_to_zone = Vec::<String>::new();
|
||||
let mut zone_to_id = HashMap::<String, usize>::new();
|
||||
|
||||
for uuid in self.nongateway_nodes().iter() {
|
||||
let nongateway_nodes = self.nongateway_nodes().collect::<Vec<_>>();
|
||||
for uuid in nongateway_nodes.iter() {
|
||||
let r = self.node_role(uuid).unwrap();
|
||||
if !zone_to_id.contains_key(&r.zone) && r.capacity.is_some() {
|
||||
zone_to_id.insert(r.zone.clone(), id_to_zone.len());
|
||||
|
@ -556,8 +556,10 @@ impl LayoutVersion {
|
|||
exclude_assoc: &HashSet<(usize, usize)>,
|
||||
zone_redundancy: usize,
|
||||
) -> Result<Graph<FlowEdge>, Error> {
|
||||
let vertices =
|
||||
LayoutVersion::generate_graph_vertices(zone_to_id.len(), self.nongateway_nodes().len());
|
||||
let vertices = LayoutVersion::generate_graph_vertices(
|
||||
zone_to_id.len(),
|
||||
self.nongateway_nodes().count(),
|
||||
);
|
||||
let mut g = Graph::<FlowEdge>::new(&vertices);
|
||||
let nb_zones = zone_to_id.len();
|
||||
for p in 0..NB_PARTITIONS {
|
||||
|
@ -576,7 +578,7 @@ impl LayoutVersion {
|
|||
)?;
|
||||
}
|
||||
}
|
||||
for n in 0..self.nongateway_nodes().len() {
|
||||
for n in 0..self.nongateway_nodes().count() {
|
||||
let node_capacity = self.get_node_capacity(&self.node_id_vec[n])?;
|
||||
let node_zone = zone_to_id[self.get_node_zone(&self.node_id_vec[n])?];
|
||||
g.add_edge(Vertex::N(n), Vertex::Sink, node_capacity / partition_size)?;
|
||||
|
@ -600,7 +602,7 @@ impl LayoutVersion {
|
|||
// previous assignment
|
||||
let mut exclude_edge = HashSet::<(usize, usize)>::new();
|
||||
if let Some(prev_assign) = prev_assign_opt {
|
||||
let nb_nodes = self.nongateway_nodes().len();
|
||||
let nb_nodes = self.nongateway_nodes().count();
|
||||
for (p, prev_assign_p) in prev_assign.iter().enumerate() {
|
||||
for n in 0..nb_nodes {
|
||||
exclude_edge.insert((p, n));
|
||||
|
@ -652,7 +654,7 @@ impl LayoutVersion {
|
|||
// We compute the maximal length of a simple path in gflow. It is used in the
|
||||
// Bellman-Ford algorithm in optimize_flow_with_cost to set the number
|
||||
// of iterations.
|
||||
let nb_nodes = self.nongateway_nodes().len();
|
||||
let nb_nodes = self.nongateway_nodes().count();
|
||||
let path_length = 4 * nb_nodes;
|
||||
gflow.optimize_flow_with_cost(&cost, path_length)?;
|
||||
|
||||
|
@ -730,7 +732,7 @@ impl LayoutVersion {
|
|||
}
|
||||
|
||||
// We define and fill in the following tables
|
||||
let storing_nodes = self.nongateway_nodes();
|
||||
let storing_nodes = self.nongateway_nodes().collect::<Vec<_>>();
|
||||
let mut new_partitions = vec![0; storing_nodes.len()];
|
||||
let mut stored_partitions = vec![0; storing_nodes.len()];
|
||||
|
||||
|
@ -873,9 +875,9 @@ mod tests {
|
|||
for z in zones.iter() {
|
||||
zone_token.insert(z.clone(), 0);
|
||||
}
|
||||
for uuid in cl.nongateway_nodes().iter() {
|
||||
let z = cl.get_node_zone(uuid)?;
|
||||
let c = cl.get_node_capacity(uuid)?;
|
||||
for uuid in cl.nongateway_nodes() {
|
||||
let z = cl.get_node_zone(&uuid)?;
|
||||
let c = cl.get_node_capacity(&uuid)?;
|
||||
zone_token.insert(
|
||||
z.clone(),
|
||||
zone_token[&z] + min(NB_PARTITIONS, (c / over_size) as usize),
|
||||
|
|
Loading…
Reference in a new issue