2023-11-09 15:31:59 +01:00
use std ::collections ::HashSet ;
2023-11-08 17:49:06 +01:00
use garage_util ::crdt ::{ Crdt , Lww , LwwMap } ;
use garage_util ::data ::* ;
use garage_util ::encode ::nonversioned_encode ;
use garage_util ::error ::* ;
use super ::* ;
2023-12-07 14:27:53 +01:00
use crate ::replication_mode ::ReplicationMode ;
2023-11-08 17:49:06 +01:00
2023-11-15 14:20:50 +01:00
impl LayoutHistory {
pub fn new ( replication_factor : usize ) -> Self {
let version = LayoutVersion ::new ( replication_factor ) ;
let staging = LayoutStaging {
parameters : Lww ::< LayoutParameters > ::new ( version . parameters ) ,
roles : LwwMap ::new ( ) ,
} ;
LayoutHistory {
versions : vec ! [ version ] ,
2023-11-27 11:52:57 +01:00
old_versions : vec ! [ ] ,
2023-11-15 14:20:50 +01:00
update_trackers : Default ::default ( ) ,
staging : Lww ::raw ( 0 , staging ) ,
}
2023-11-09 15:31:59 +01:00
}
2023-11-15 14:20:50 +01:00
// ------------------ who stores what now? ---------------
/// Returns the most recent layout version, i.e. the last entry of `self.versions`.
///
/// # Panics
///
/// Panics if `self.versions` is empty.
pub fn current(&self) -> &LayoutVersion {
    let latest = self.versions.last();
    latest.unwrap()
}
2023-11-15 14:20:50 +01:00
pub fn min_stored ( & self ) -> u64 {
self . versions . first ( ) . as_ref ( ) . unwrap ( ) . version
2023-11-09 15:31:59 +01:00
}
2023-11-15 14:20:50 +01:00
pub fn get_all_nodes ( & self ) -> Vec < Uuid > {
if self . versions . len ( ) = = 1 {
self . versions [ 0 ] . all_nodes ( ) . to_vec ( )
} else {
let set = self
. versions
. iter ( )
. map ( | x | x . all_nodes ( ) )
. flatten ( )
. collect ::< HashSet < _ > > ( ) ;
set . into_iter ( ) . copied ( ) . collect ::< Vec < _ > > ( )
}
2023-11-09 15:31:59 +01:00
}
2023-11-16 13:26:43 +01:00
pub ( crate ) fn get_all_nongateway_nodes ( & self ) -> Vec < Uuid > {
2023-11-15 14:20:50 +01:00
if self . versions . len ( ) = = 1 {
self . versions [ 0 ] . nongateway_nodes ( ) . to_vec ( )
} else {
let set = self
. versions
. iter ( )
. map ( | x | x . nongateway_nodes ( ) )
. flatten ( )
. collect ::< HashSet < _ > > ( ) ;
set . into_iter ( ) . copied ( ) . collect ::< Vec < _ > > ( )
}
2023-11-09 15:31:59 +01:00
}
2023-11-15 14:20:50 +01:00
// ---- housekeeping (all invoked by LayoutHelper) ----
2023-12-07 14:27:53 +01:00
/// Moves every layout version except the most recent one from
/// `self.versions` to the end of `self.old_versions`, oldest first.
///
/// Does nothing when `self.versions` holds zero or one entry.
/// `self.old_versions` is not trimmed here.
pub(crate) fn keep_current_version_only(&mut self) {
    // Move the whole stale prefix in one pass instead of shifting the vector
    // once per removed element. `saturating_sub` keeps the empty case a no-op.
    let stale = self.versions.len().saturating_sub(1);
    self.old_versions.extend(self.versions.drain(..stale));
}
2023-11-16 13:26:43 +01:00
pub ( crate ) fn cleanup_old_versions ( & mut self ) {
// If there are invalid versions before valid versions, remove them
if self . versions . len ( ) > 1 & & self . current ( ) . check ( ) . is_ok ( ) {
while self . versions . len ( ) > 1 & & self . versions . first ( ) . unwrap ( ) . check ( ) . is_err ( ) {
let removed = self . versions . remove ( 0 ) ;
info! (
" Layout history: pruning old invalid version {} " ,
removed . version
) ;
}
}
// If there are old versions that no one is reading from anymore,
2023-12-07 10:30:26 +01:00
// remove them (keep them in self.old_versions).
// ASSUMPTION: we only care about where nodes in the current layout version
// are reading from, as we assume older nodes are being discarded.
2023-11-16 13:26:43 +01:00
while self . versions . len ( ) > 1 {
2023-12-07 10:30:26 +01:00
let current_nodes = & self . current ( ) . node_id_vec ;
2023-11-15 14:20:50 +01:00
let min_version = self . min_stored ( ) ;
let sync_ack_map_min = self
. update_trackers
. sync_ack_map
2023-12-07 10:30:26 +01:00
. min_among ( & current_nodes , min_version ) ;
2023-11-15 14:20:50 +01:00
if self . min_stored ( ) < sync_ack_map_min {
let removed = self . versions . remove ( 0 ) ;
2023-11-27 11:52:57 +01:00
info! (
" Layout history: moving version {} to old_versions " ,
removed . version
) ;
self . old_versions . push ( removed ) ;
2023-11-15 14:20:50 +01:00
} else {
break ;
}
2023-11-09 15:31:59 +01:00
}
2023-11-27 11:52:57 +01:00
while self . old_versions . len ( ) > OLD_VERSION_COUNT {
let removed = self . old_versions . remove ( 0 ) ;
info! ( " Layout history: removing old_version {} " , removed . version ) ;
}
2023-11-09 15:31:59 +01:00
}
2023-11-16 13:26:43 +01:00
/// Calls `set_max(node, self.min_stored())` on the `ack_map`, `sync_map` and
/// `sync_ack_map` trackers for every node in `nodes`.
///
/// NOTE(review): `set_max` presumably raises a tracker value to at least the
/// given version and never lowers it; confirm against its definition.
pub(crate) fn clamp_update_trackers(&mut self, nodes: &[Uuid]) {
    let floor = self.min_stored();
    let trackers = &mut self.update_trackers;
    for &node in nodes {
        trackers.ack_map.set_max(node, floor);
        trackers.sync_map.set_max(node, floor);
        trackers.sync_ack_map.set_max(node, floor);
    }
}
2023-12-07 14:27:53 +01:00
pub ( crate ) fn calculate_sync_map_min_with_quorum (
& self ,
replication_mode : ReplicationMode ,
all_nongateway_nodes : & [ Uuid ] ,
) -> u64 {
// This function calculates the minimum layout version from which
// it is safe to read if we want to maintain read-after-write consistency.
// In the general case the computation can be a bit expensive so
// we try to optimize it in several ways.
// If there is only one layout version, we know that's the one
// we need to read from.
if self . versions . len ( ) = = 1 {
return self . current ( ) . version ;
}
let quorum = replication_mode . write_quorum ( ) ;
let min_version = self . min_stored ( ) ;
let global_min = self
. update_trackers
. sync_map
. min_among ( & all_nongateway_nodes , min_version ) ;
// If the write quorums are equal to the total number of nodes,
// i.e. no writes can succeed while they are not written to all nodes,
// then we must in all case wait for all nodes to complete a sync.
// This is represented by reading from the layout with version
// number global_min, the smallest layout version for which all nodes
// have completed a sync.
if quorum = = self . current ( ) . replication_factor {
return global_min ;
}
// In the general case, we need to look at all write sets for all partitions,
// and find a safe layout version to read for that partition. We then
// take the minimum value among all partition as the safe layout version
// to read in all cases (the layout version to which all reads are directed).
let mut current_min = self . current ( ) . version ;
let mut sets_done = HashSet ::< Vec < Uuid > > ::new ( ) ;
for ( _ , p_hash ) in self . current ( ) . partitions ( ) {
for v in self . versions . iter ( ) {
if v . version = = self . current ( ) . version {
// We don't care about whether nodes in the latest layout version
// have completed a sync or not, as the sync is push-only
// and by definition nodes in the latest layout version do not
// hold data that must be pushed to nodes in the latest layout
// version, since that's the same version (any data that's
// already in the latest version is assumed to have been written
// by an operation that ensured a quorum of writes within
// that version).
continue ;
}
// Determine set of nodes for partition p in layout version v.
// Sort the node set to avoid duplicate computations.
let mut set = v
. nodes_of ( & p_hash , v . replication_factor )
. collect ::< Vec < Uuid > > ( ) ;
set . sort ( ) ;
// If this set was already processed, skip it.
if sets_done . contains ( & set ) {
continue ;
}
// Find the value of the sync update trackers that is the
// highest possible minimum within a quorum of nodes.
let mut sync_values = set
. iter ( )
. map ( | x | self . update_trackers . sync_map . get ( x , min_version ) )
. collect ::< Vec < _ > > ( ) ;
sync_values . sort ( ) ;
let set_min = sync_values [ sync_values . len ( ) - quorum ] ;
if set_min < current_min {
current_min = set_min ;
}
// defavorable case, we know we are at the smallest possible version,
// so we can stop early
assert! ( current_min > = global_min ) ;
if current_min = = global_min {
return current_min ;
}
// Add set to already processed sets
sets_done . insert ( set ) ;
}
}
current_min
}
2023-11-16 13:26:43 +01:00
pub ( crate ) fn calculate_trackers_hash ( & self ) -> Hash {
2023-11-15 14:20:50 +01:00
blake2sum ( & nonversioned_encode ( & self . update_trackers ) . unwrap ( ) [ .. ] )
}
2023-11-16 13:26:43 +01:00
pub ( crate ) fn calculate_staging_hash ( & self ) -> Hash {
2023-11-15 14:20:50 +01:00
blake2sum ( & nonversioned_encode ( & self . staging ) . unwrap ( ) [ .. ] )
}
2023-11-08 17:49:06 +01:00
// ================== updates to layout, public interface ===================
pub fn merge ( & mut self , other : & LayoutHistory ) -> bool {
let mut changed = false ;
// Add any new versions to history
for v2 in other . versions . iter ( ) {
2023-11-08 19:28:36 +01:00
if let Some ( v1 ) = self . versions . iter ( ) . find ( | v | v . version = = v2 . version ) {
2023-11-16 13:26:43 +01:00
// Version is already present, check consistency
2023-11-08 17:49:06 +01:00
if v1 ! = v2 {
error! ( " Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced. " , v2 . version ) ;
}
2023-11-08 19:28:36 +01:00
} else if self . versions . iter ( ) . all ( | v | v . version ! = v2 . version - 1 ) {
2023-11-08 17:49:06 +01:00
error! (
" Cannot receive new layout version {}, version {} is missing " ,
v2 . version ,
v2 . version - 1
) ;
} else {
2023-11-08 19:28:36 +01:00
self . versions . push ( v2 . clone ( ) ) ;
2023-11-08 17:49:06 +01:00
changed = true ;
}
}
// Merge trackers
2023-11-16 13:26:43 +01:00
let c = self . update_trackers . merge ( & other . update_trackers ) ;
changed = changed | | c ;
2023-11-15 13:28:30 +01:00
2023-11-09 14:53:34 +01:00
// Merge staged layout changes
if self . staging ! = other . staging {
2023-11-16 13:26:43 +01:00
let prev_staging = self . staging . clone ( ) ;
2023-11-09 14:53:34 +01:00
self . staging . merge ( & other . staging ) ;
2023-11-16 13:26:43 +01:00
changed = changed | | self . staging ! = prev_staging ;
2023-11-09 14:53:34 +01:00
}
2023-11-08 17:49:06 +01:00
changed
}
pub fn apply_staged_changes ( mut self , version : Option < u64 > ) -> Result < ( Self , Message ) , Error > {
match version {
None = > {
let error = r #"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout .
To know the correct value of the new layout version , invoke ` garage layout show ` and review the proposed changes .
" #;
return Err ( Error ::Message ( error . into ( ) ) ) ;
}
Some ( v ) = > {
if v ! = self . current ( ) . version + 1 {
return Err ( Error ::Message ( " Invalid new layout version " . into ( ) ) ) ;
}
}
}
2023-11-09 11:19:43 +01:00
// Compute new version and add it to history
2023-11-14 12:48:38 +01:00
let ( new_version , msg ) = self
. current ( )
. clone ( )
. calculate_next_version ( & self . staging . get ( ) ) ? ;
2023-11-08 17:49:06 +01:00
2023-11-08 19:28:36 +01:00
self . versions . push ( new_version ) ;
2023-11-16 13:26:43 +01:00
self . cleanup_old_versions ( ) ;
2023-11-08 17:49:06 +01:00
2023-11-09 11:19:43 +01:00
// Reset the staged layout changes
self . staging . update ( LayoutStaging {
parameters : self . staging . get ( ) . parameters . clone ( ) ,
roles : LwwMap ::new ( ) ,
} ) ;
2023-11-08 17:49:06 +01:00
Ok ( ( self , msg ) )
}
2023-11-09 11:19:43 +01:00
pub fn revert_staged_changes ( mut self ) -> Result < Self , Error > {
self . staging . update ( LayoutStaging {
parameters : Lww ::new ( self . current ( ) . parameters . clone ( ) ) ,
roles : LwwMap ::new ( ) ,
} ) ;
2023-11-08 17:49:06 +01:00
Ok ( self )
}
pub fn check ( & self ) -> Result < ( ) , String > {
2023-11-11 13:10:59 +01:00
// TODO: anything more ?
2023-11-16 13:26:43 +01:00
self . current ( ) . check ( )
2023-11-08 17:49:06 +01:00
}
}