NLnet task 3 #667

Merged
lx merged 60 commits from nlnet-task3 into next-0.10 2024-01-11 10:58:08 +00:00
3 changed files with 57 additions and 31 deletions
Showing only changes of commit 65066c7064 - Show all commits

View file

@ -9,6 +9,7 @@ use garage_util::error::*;
use super::schema::*; use super::schema::*;
use super::*; use super::*;
impl LayoutHistory { impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self { pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor); let version = LayoutVersion::new(replication_factor);
@ -49,7 +50,7 @@ impl LayoutHistory {
// ------------------ who stores what now? --------------- // ------------------ who stores what now? ---------------
pub fn all_ack(&self) -> u64 { pub fn all_ack(&self) -> u64 {
self.calculate_global_min(&self.update_trackers.ack_map) self.update_trackers.ack_map.current_min
} }
pub fn min_stored(&self) -> u64 { pub fn min_stored(&self) -> u64 {
@ -91,7 +92,7 @@ impl LayoutHistory {
} }
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> { pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let sync_min = self.calculate_global_min(&self.update_trackers.sync_map); let sync_min = self.update_trackers.sync_map.current_min;
let version = self let version = self
.versions .versions
.iter() .iter()
@ -122,7 +123,7 @@ impl LayoutHistory {
// ------------------ update tracking --------------- // ------------------ update tracking ---------------
pub(crate) fn update_trackers(&mut self, node_id: Uuid) { pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date // Ensure trackers for this node's values are up-to-date
// 1. Acknowledge the last layout version in the history // 1. Acknowledge the last layout version in the history
@ -138,6 +139,9 @@ impl LayoutHistory {
// 4. Cleanup layout versions that are not needed anymore // 4. Cleanup layout versions that are not needed anymore
self.cleanup_old_versions(); self.cleanup_old_versions();
// 5. Recalculate global minima
self.update_trackers_min();
info!("ack_map: {:?}", self.update_trackers.ack_map); info!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map); info!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map); info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
@ -146,42 +150,41 @@ impl LayoutHistory {
self.update_hashes(); self.update_hashes();
} }
fn update_trackers_min(&mut self) {
// TODO: for TableFullReplication, counting gateway nodes might be
// necessary? Think about this more.
let storage_nodes = self.all_nongateway_nodes().into_owned();
let min_version = self.versions.first().unwrap().version;
self.update_trackers.update_min(&storage_nodes, min_version);
}
pub(crate) fn ack_last(&mut self, node: Uuid) { pub(crate) fn ack_last(&mut self, node: Uuid) {
let last_version = self.current().version; let last_version = self.current().version;
self.update_trackers.ack_map.set_max(node, last_version); self.update_trackers.ack_map.set_max(node, last_version);
self.update_trackers_min();
} }
pub(crate) fn sync_first(&mut self, node: Uuid) { pub(crate) fn sync_first(&mut self, node: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version; let first_version = self.versions.first().as_ref().unwrap().version;
self.update_trackers.sync_map.set_max(node, first_version); self.update_trackers.sync_map.set_max(node, first_version);
self.update_trackers_min();
} }
pub(crate) fn sync_ack(&mut self, node: Uuid) { pub(crate) fn sync_ack(&mut self, node: Uuid) {
self.update_trackers.sync_ack_map.set_max( self.update_trackers
node, .sync_ack_map
self.calculate_global_min(&self.update_trackers.sync_map), .set_max(node, self.update_trackers.sync_map.current_min);
); self.update_trackers_min();
} }
pub(crate) fn cleanup_old_versions(&mut self) { pub(crate) fn cleanup_old_versions(&mut self) {
let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map); let min_sync_ack = self.update_trackers.sync_ack_map.current_min;
while self.versions.first().as_ref().unwrap().version < min_sync_ack { while self.versions.first().as_ref().unwrap().version < min_sync_ack {
let removed = self.versions.remove(0); let removed = self.versions.remove(0);
info!("Layout history: pruning old version {}", removed.version); info!("Layout history: pruning old version {}", removed.version);
} }
} }
pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
// TODO: for TableFullReplication, counting gateway nodes might be
// necessary? Think about this more.
let storage_nodes = self.all_nongateway_nodes();
storage_nodes
.iter()
.map(|x| tracker.0.get(x).copied().unwrap_or(0))
.min()
.unwrap_or(0)
}
// ================== updates to layout, public interface =================== // ================== updates to layout, public interface ===================
pub fn merge(&mut self, other: &LayoutHistory) -> bool { pub fn merge(&mut self, other: &LayoutHistory) -> bool {
@ -229,6 +232,11 @@ impl LayoutHistory {
} }
} }
// Update the current_min value in trackers if anything changed
if changed {
self.update_trackers_min();
}
// Merge staged layout changes // Merge staged layout changes
if self.staging != other.staging { if self.staging != other.staging {
self.staging.merge(&other.staging); self.staging.merge(&other.staging);

View file

@ -74,7 +74,7 @@ impl LayoutManager {
} }
}; };
cluster_layout.update_trackers(node_id.into()); cluster_layout.update_trackers_of(node_id.into());
let layout = Arc::new(RwLock::new(cluster_layout)); let layout = Arc::new(RwLock::new(cluster_layout));
let change_notify = Arc::new(Notify::new()); let change_notify = Arc::new(Notify::new());
@ -158,7 +158,7 @@ impl LayoutManager {
if !prev_layout_check || adv.check().is_ok() { if !prev_layout_check || adv.check().is_ok() {
if layout.merge(adv) { if layout.merge(adv) {
layout.update_trackers(self.node_id); layout.update_trackers_of(self.node_id);
if prev_layout_check && layout.check().is_err() { if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout."); panic!("Merged two correct layouts and got an incorrect layout.");
} }
@ -172,7 +172,7 @@ impl LayoutManager {
let mut layout = self.layout.write().unwrap(); let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv { if layout.update_trackers != *adv {
if layout.update_trackers.merge(adv) { if layout.update_trackers.merge(adv) {
layout.update_trackers(self.node_id); layout.update_trackers_of(self.node_id);
return Some(layout.update_trackers.clone()); return Some(layout.update_trackers.clone());
} }
} }

View file

@ -260,7 +260,10 @@ mod v010 {
/// The history of cluster layouts /// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)] #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct UpdateTracker(pub BTreeMap<Uuid, u64>); pub struct UpdateTracker {
pub values: BTreeMap<Uuid, u64>,
pub current_min: u64,
}
impl garage_util::migrate::Migrate for LayoutHistory { impl garage_util::migrate::Migrate for LayoutHistory {
const VERSION_MARKER: &'static [u8] = b"G010lh"; const VERSION_MARKER: &'static [u8] = b"G010lh";
@ -290,14 +293,15 @@ mod v010 {
nongateway_node_count, nongateway_node_count,
ring_assignment_data: previous.ring_assignment_data, ring_assignment_data: previous.ring_assignment_data,
}; };
let update_tracker = UpdateTracker( let update_tracker = UpdateTracker {
version values: version
.nongateway_nodes() .nongateway_nodes()
.iter() .iter()
.copied() .copied()
.map(|x| (x, version.version)) .map(|x| (x, version.version))
.collect::<BTreeMap<Uuid, u64>>(), .collect::<BTreeMap<Uuid, u64>>(),
); current_min: 0,
};
let staging = LayoutStaging { let staging = LayoutStaging {
parameters: previous.staging_parameters, parameters: previous.staging_parameters,
roles: previous.staging_roles, roles: previous.staging_roles,
@ -378,14 +382,14 @@ impl core::str::FromStr for ZoneRedundancy {
impl UpdateTracker { impl UpdateTracker {
fn merge(&mut self, other: &UpdateTracker) -> bool { fn merge(&mut self, other: &UpdateTracker) -> bool {
let mut changed = false; let mut changed = false;
for (k, v) in other.0.iter() { for (k, v) in other.values.iter() {
if let Some(v_mut) = self.0.get_mut(k) { if let Some(v_mut) = self.values.get_mut(k) {
if *v > *v_mut { if *v > *v_mut {
*v_mut = *v; *v_mut = *v;
changed = true; changed = true;
} }
} else { } else {
self.0.insert(*k, *v); self.values.insert(*k, *v);
changed = true; changed = true;
} }
} }
@ -393,18 +397,26 @@ impl UpdateTracker {
} }
pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool { pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
match self.0.get_mut(&peer) { match self.values.get_mut(&peer) {
Some(e) if *e < value => { Some(e) if *e < value => {
*e = value; *e = value;
true true
} }
None => { None => {
self.0.insert(peer, value); self.values.insert(peer, value);
true true
} }
_ => false, _ => false,
} }
} }
fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) {
self.current_min = storage_nodes
.iter()
.map(|x| self.values.get(x).copied().unwrap_or(min_version))
.min()
.unwrap_or(min_version)
}
} }
impl UpdateTrackers { impl UpdateTrackers {
@ -414,4 +426,10 @@ impl UpdateTrackers {
let c3 = self.sync_ack_map.merge(&other.sync_ack_map); let c3 = self.sync_ack_map.merge(&other.sync_ack_map);
c1 || c2 || c3 c1 || c2 || c3
} }
pub(crate) fn update_min(&mut self, storage_nodes: &[Uuid], min_version: u64) {
self.ack_map.update_min(&storage_nodes, min_version);
self.sync_map.update_min(&storage_nodes, min_version);
self.sync_ack_map.update_min(&storage_nodes, min_version);
}
} }