layout: split helper in separate file; more precise difference tracking

This commit is contained in:
Alex 2023-11-16 13:26:43 +01:00
parent d4df03424f
commit ad5c6f779f
Signed by: lx
GPG key ID: 0E496D15096376BE
4 changed files with 256 additions and 254 deletions

224
src/rpc/layout/helper.rs Normal file
View file

@ -0,0 +1,224 @@
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use garage_util::data::*;
use super::schema::*;
pub struct LayoutHelper {
layout: Option<LayoutHistory>,
// cached values
ack_map_min: u64,
sync_map_min: u64,
all_nodes: Vec<Uuid>,
all_nongateway_nodes: Vec<Uuid>,
pub(crate) trackers_hash: Hash,
pub(crate) staging_hash: Hash,
// ack lock: counts in-progress write operations for each
// layout version ; we don't increase the ack update tracker
// while this lock is nonzero
pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
}
impl Deref for LayoutHelper {
type Target = LayoutHistory;
fn deref(&self) -> &LayoutHistory {
self.layout()
}
}
impl LayoutHelper {
pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
layout.cleanup_old_versions();
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
layout.clamp_update_trackers(&all_nongateway_nodes);
let min_version = layout.min_stored();
let ack_map_min = layout
.update_trackers
.ack_map
.min(&all_nongateway_nodes, min_version);
let sync_map_min = layout
.update_trackers
.sync_map
.min(&all_nongateway_nodes, min_version);
let all_nodes = layout.get_all_nodes();
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
ack_lock
.entry(layout.current().version)
.or_insert(AtomicUsize::new(0));
LayoutHelper {
layout: Some(layout),
ack_map_min,
sync_map_min,
all_nodes,
all_nongateway_nodes,
trackers_hash,
staging_hash,
ack_lock,
}
}
// ------------------ single updating function --------------
fn layout(&self) -> &LayoutHistory {
self.layout.as_ref().unwrap()
}
pub(crate) fn update<F>(&mut self, f: F) -> bool
where
F: FnOnce(&mut LayoutHistory) -> bool,
{
let changed = f(&mut self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),
);
}
changed
}
// ------------------ read helpers ---------------
pub fn all_nodes(&self) -> &[Uuid] {
&self.all_nodes
}
pub fn all_nongateway_nodes(&self) -> &[Uuid] {
&self.all_nongateway_nodes
}
pub fn all_ack(&self) -> u64 {
self.ack_map_min
}
pub fn sync_versions(&self) -> (u64, u64, u64) {
(
self.layout().current().version,
self.all_ack(),
self.layout().min_stored(),
)
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let sync_min = self.sync_map_min;
let version = self
.layout()
.versions
.iter()
.find(|x| x.version == sync_min)
.or(self.layout().versions.last())
.unwrap();
version
.nodes_of(position, version.replication_factor)
.collect()
}
pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout()
.versions
.iter()
.map(|x| x.nodes_of(position, x.replication_factor).collect())
.collect()
}
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let mut ret = vec![];
for version in self.layout().versions.iter() {
ret.extend(version.nodes_of(position, version.replication_factor));
}
ret.sort();
ret.dedup();
ret
}
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}
pub fn staging_hash(&self) -> Hash {
self.staging_hash
}
// ------------------ helpers for update tracking ---------------
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date
// 1. Acknowledge the last layout version which is not currently
// locked by an in-progress write operation
self.ack_max_free(local_node_id);
// 2. Assume the data on this node is sync'ed up at least to
// the first layout version in the history
self.sync_first(local_node_id);
// 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(local_node_id);
info!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
}
fn sync_first(&mut self, local_node_id: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version;
self.update(|layout| {
layout
.update_trackers
.sync_map
.set_max(local_node_id, first_version)
});
}
fn sync_ack(&mut self, local_node_id: Uuid) {
let sync_map_min = self.sync_map_min;
self.update(|layout| {
layout
.update_trackers
.sync_ack_map
.set_max(local_node_id, sync_map_min)
});
}
pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
let max_ack = self.max_free_ack();
let changed = self.update(|layout| {
layout
.update_trackers
.ack_map
.set_max(local_node_id, max_ack)
});
if changed {
info!("ack_until updated to {}", max_ack);
}
changed
}
pub(crate) fn max_free_ack(&self) -> u64 {
self.layout()
.versions
.iter()
.map(|x| x.version)
.take_while(|v| {
self.ack_lock
.get(v)
.map(|x| x.load(Ordering::Relaxed) == 0)
.unwrap_or(true)
})
.max()
.unwrap_or(self.min_stored())
}
}

View file

@ -1,7 +1,4 @@
use std::collections::HashMap;
use std::collections::HashSet;
use std::ops::Deref;
use std::sync::atomic::{AtomicUsize, Ordering};
use garage_util::crdt::{Crdt, Lww, LwwMap};
use garage_util::data::*;
@ -11,225 +8,6 @@ use garage_util::error::*;
use super::schema::*;
use super::*;
pub struct LayoutHelper {
layout: Option<LayoutHistory>,
// cached values
ack_map_min: u64,
sync_map_min: u64,
all_nodes: Vec<Uuid>,
all_nongateway_nodes: Vec<Uuid>,
trackers_hash: Hash,
staging_hash: Hash,
// ack lock: counts in-progress write operations for each
// layout version ; we don't increase the ack update tracker
// while this lock is nonzero
pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
}
impl Deref for LayoutHelper {
type Target = LayoutHistory;
fn deref(&self) -> &LayoutHistory {
self.layout()
}
}
impl LayoutHelper {
pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
layout.cleanup_old_versions();
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
layout.clamp_update_trackers(&all_nongateway_nodes);
let min_version = layout.min_stored();
let ack_map_min = layout
.update_trackers
.ack_map
.min(&all_nongateway_nodes, min_version);
let sync_map_min = layout
.update_trackers
.sync_map
.min(&all_nongateway_nodes, min_version);
let all_nodes = layout.get_all_nodes();
let trackers_hash = layout.calculate_trackers_hash();
let staging_hash = layout.calculate_staging_hash();
ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
ack_lock
.entry(layout.current().version)
.or_insert(AtomicUsize::new(0));
LayoutHelper {
layout: Some(layout),
ack_map_min,
sync_map_min,
all_nodes,
all_nongateway_nodes,
trackers_hash,
staging_hash,
ack_lock,
}
}
// ------------------ single updating function --------------
fn layout(&self) -> &LayoutHistory {
self.layout.as_ref().unwrap()
}
pub(crate) fn update<F>(&mut self, f: F) -> bool
where
F: FnOnce(&mut LayoutHistory) -> bool,
{
let changed = f(&mut self.layout.as_mut().unwrap());
if changed {
*self = Self::new(
self.layout.take().unwrap(),
std::mem::take(&mut self.ack_lock),
);
}
changed
}
// ------------------ read helpers ---------------
pub fn all_nodes(&self) -> &[Uuid] {
&self.all_nodes
}
pub fn all_nongateway_nodes(&self) -> &[Uuid] {
&self.all_nongateway_nodes
}
pub fn all_ack(&self) -> u64 {
self.ack_map_min
}
pub fn sync_versions(&self) -> (u64, u64, u64) {
(
self.layout().current().version,
self.all_ack(),
self.layout().min_stored(),
)
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let sync_min = self.sync_map_min;
let version = self
.layout()
.versions
.iter()
.find(|x| x.version == sync_min)
.or(self.layout().versions.last())
.unwrap();
version
.nodes_of(position, version.replication_factor)
.collect()
}
pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.layout()
.versions
.iter()
.map(|x| x.nodes_of(position, x.replication_factor).collect())
.collect()
}
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let mut ret = vec![];
for version in self.layout().versions.iter() {
ret.extend(version.nodes_of(position, version.replication_factor));
}
ret.sort();
ret.dedup();
ret
}
pub fn trackers_hash(&self) -> Hash {
self.trackers_hash
}
pub fn staging_hash(&self) -> Hash {
self.staging_hash
}
// ------------------ helpers for update tracking ---------------
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
// Ensure trackers for this node's values are up-to-date
// 1. Acknowledge the last layout version which is not currently
// locked by an in-progress write operation
self.ack_max_free(local_node_id);
// 2. Assume the data on this node is sync'ed up at least to
// the first layout version in the history
self.sync_first(local_node_id);
// 3. Acknowledge everyone has synced up to min(self.sync_map)
self.sync_ack(local_node_id);
info!("ack_map: {:?}", self.update_trackers.ack_map);
info!("sync_map: {:?}", self.update_trackers.sync_map);
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
}
fn sync_first(&mut self, local_node_id: Uuid) {
let first_version = self.versions.first().as_ref().unwrap().version;
self.update(|layout| {
layout
.update_trackers
.sync_map
.set_max(local_node_id, first_version)
});
}
fn sync_ack(&mut self, local_node_id: Uuid) {
let sync_map_min = self.sync_map_min;
self.update(|layout| {
layout
.update_trackers
.sync_ack_map
.set_max(local_node_id, sync_map_min)
});
}
pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
let max_ack = self.max_free_ack();
let changed = self.update(|layout| {
layout
.update_trackers
.ack_map
.set_max(local_node_id, max_ack)
});
if changed {
info!("ack_until updated to {}", max_ack);
}
changed
}
pub(crate) fn max_free_ack(&self) -> u64 {
self.layout()
.versions
.iter()
.map(|x| x.version)
.take_while(|v| {
self.ack_lock
.get(v)
.map(|x| x.load(Ordering::Relaxed) == 0)
.unwrap_or(true)
})
.max()
.unwrap_or(self.min_stored())
}
}
// ----
impl LayoutHistory {
pub fn new(replication_factor: usize) -> Self {
let version = LayoutVersion::new(replication_factor);
@ -270,7 +48,7 @@ impl LayoutHistory {
}
}
fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
pub(crate) fn get_all_nongateway_nodes(&self) -> Vec<Uuid> {
if self.versions.len() == 1 {
self.versions[0].nongateway_nodes().to_vec()
} else {
@ -286,8 +64,21 @@ impl LayoutHistory {
// ---- housekeeping (all invoked by LayoutHelper) ----
fn cleanup_old_versions(&mut self) {
loop {
pub(crate) fn cleanup_old_versions(&mut self) {
// If there are invalid versions before valid versions, remove them
if self.versions.len() > 1 && self.current().check().is_ok() {
while self.versions.len() > 1 && self.versions.first().unwrap().check().is_err() {
let removed = self.versions.remove(0);
info!(
"Layout history: pruning old invalid version {}",
removed.version
);
}
}
// If there are old versions that no one is reading from anymore,
// remove them
while self.versions.len() > 1 {
let all_nongateway_nodes = self.get_all_nongateway_nodes();
let min_version = self.min_stored();
let sync_ack_map_min = self
@ -303,7 +94,7 @@ impl LayoutHistory {
}
}
fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
let min_v = self.min_stored();
for node in nodes {
self.update_trackers.ack_map.set_max(*node, min_v);
@ -312,11 +103,11 @@ impl LayoutHistory {
}
}
fn calculate_trackers_hash(&self) -> Hash {
pub(crate) fn calculate_trackers_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.update_trackers).unwrap()[..])
}
fn calculate_staging_hash(&self) -> Hash {
pub(crate) fn calculate_staging_hash(&self) -> Hash {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
@ -328,6 +119,7 @@ impl LayoutHistory {
// Add any new versions to history
for v2 in other.versions.iter() {
if let Some(v1) = self.versions.iter().find(|v| v.version == v2.version) {
// Version is already present, check consistency
if v1 != v2 {
error!("Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced.", v2.version);
}
@ -344,24 +136,14 @@ impl LayoutHistory {
}
// Merge trackers
if self.update_trackers != other.update_trackers {
let c = self.update_trackers.merge(&other.update_trackers);
changed = changed || c;
}
// If there are invalid versions before valid versions, remove them,
// and increment update trackers
if self.versions.len() > 1 && self.current().check().is_ok() {
while self.versions.first().unwrap().check().is_err() {
self.versions.remove(0);
changed = true;
}
}
let c = self.update_trackers.merge(&other.update_trackers);
changed = changed || c;
// Merge staged layout changes
if self.staging != other.staging {
let prev_staging = self.staging.clone();
self.staging.merge(&other.staging);
changed = true;
changed = changed || self.staging != prev_staging;
}
changed
@ -390,11 +172,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
.calculate_next_version(&self.staging.get())?;
self.versions.push(new_version);
if self.current().check().is_ok() {
while self.versions.first().unwrap().check().is_err() {
self.versions.remove(0);
}
}
self.cleanup_old_versions();
// Reset the staged layout changes
self.staging.update(LayoutStaging {
@ -415,11 +193,7 @@ To know the correct value of the new layout version, invoke `garage layout show`
}
pub fn check(&self) -> Result<(), String> {
for version in self.versions.iter() {
version.check()?;
}
// TODO: anything more ?
Ok(())
self.current().check()
}
}

View file

@ -184,17 +184,20 @@ impl LayoutManager {
return Some(layout.clone());
}
}
None
}
fn merge_layout_trackers(&self, adv: &UpdateTrackers) -> Option<UpdateTrackers> {
let mut layout = self.layout.write().unwrap();
if layout.update_trackers != *adv {
if layout.update(|l| l.update_trackers.merge(adv)) {
layout.update_trackers(self.node_id);
return Some(layout.update_trackers.clone());
}
}
None
}
@ -284,7 +287,7 @@ impl LayoutManager {
}
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
let layout = self.layout.read().unwrap().clone();
SystemRpc::AdvertiseClusterLayout(layout)
}

View file

@ -1,4 +1,5 @@
mod graph_algo;
mod helper;
mod history;
mod schema;
mod version;
@ -10,7 +11,7 @@ pub mod manager;
// ---- re-exports ----
pub use history::*;
pub use helper::LayoutHelper;
pub use manager::WriteLock;
pub use schema::*;
pub use version::*;