2023-11-09 14:31:59 +00:00
use std ::collections ::HashSet ;
2023-11-08 16:49:06 +00:00
use garage_util ::crdt ::{ Crdt , Lww , LwwMap } ;
use garage_util ::data ::* ;
use garage_util ::encode ::nonversioned_encode ;
use garage_util ::error ::* ;
use super ::schema ::* ;
use super ::* ;
impl LayoutHistory {
pub fn new ( replication_factor : usize ) -> Self {
let version = LayoutVersion ::new ( replication_factor ) ;
2023-11-09 10:19:43 +00:00
let staging = LayoutStaging {
parameters : Lww ::< LayoutParameters > ::new ( version . parameters ) ,
roles : LwwMap ::new ( ) ,
} ;
2023-11-08 16:49:06 +00:00
let mut ret = LayoutHistory {
versions : vec ! [ version ] . into_boxed_slice ( ) . into ( ) ,
update_trackers : Default ::default ( ) ,
2023-11-09 13:53:34 +00:00
trackers_hash : [ 0 u8 ; 32 ] . into ( ) ,
2023-11-09 10:19:43 +00:00
staging : Lww ::raw ( 0 , staging ) ,
2023-11-08 16:49:06 +00:00
staging_hash : [ 0 u8 ; 32 ] . into ( ) ,
} ;
2023-11-09 13:53:34 +00:00
ret . update_hashes ( ) ;
2023-11-08 16:49:06 +00:00
ret
}
/// Returns the most recent layout version, i.e. the last entry of `versions`.
///
/// # Panics
///
/// Panics if `versions` is empty.
pub fn current(&self) -> &LayoutVersion {
    // `last()` already yields `Option<&LayoutVersion>`; no extra `as_ref()` needed.
    self.versions
        .last()
        .expect("layout history must contain at least one version")
}
2023-11-09 14:31:59 +00:00
/// Collects the union of `nongateway_nodes()` over every layout version
/// currently held in the history.
pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
    self.versions
        .iter()
        .flat_map(|version| version.nongateway_nodes())
        .collect()
}
2023-11-09 14:42:10 +00:00
pub fn update_hashes ( & mut self ) {
2023-11-09 13:53:34 +00:00
self . trackers_hash = self . calculate_trackers_hash ( ) ;
self . staging_hash = self . calculate_staging_hash ( ) ;
}
/// Hashes the update trackers: encodes them with `nonversioned_encode`
/// and passes the encoded bytes to `blake2sum`.
///
/// # Panics
///
/// Panics if encoding the trackers fails.
pub(crate) fn calculate_trackers_hash(&self) -> Hash {
    let encoded = nonversioned_encode(&self.update_trackers).unwrap();
    blake2sum(&encoded[..])
}
2023-11-08 16:49:06 +00:00
pub ( crate ) fn calculate_staging_hash ( & self ) -> Hash {
2023-11-09 10:19:43 +00:00
blake2sum ( & nonversioned_encode ( & self . staging ) . unwrap ( ) [ .. ] )
2023-11-08 16:49:06 +00:00
}
2023-11-09 14:31:59 +00:00
// ------------------ update tracking ---------------
/// Brings the update trackers up to date for `node_id`, prunes the version
/// history, logs the three tracker maps and refreshes the stored hashes.
///
/// The steps run in a fixed order because each one reads state written by
/// the previous ones.
pub(crate) fn update_trackers(&mut self, node_id: Uuid) {
    // Step 1: acknowledge the latest layout version in the history.
    self.ack_last(node_id);

    // Step 2: treat this node's data as synced at least up to the oldest
    // layout version still present in the history.
    self.sync_first(node_id);

    // Step 3: acknowledge the global minimum of the sync map.
    self.sync_ack(node_id);

    // Step 4: drop layout versions that are no longer needed.
    self.cleanup_old_versions();

    let trackers = &self.update_trackers;
    info!(" ack_map: {:?} ", trackers.ack_map);
    info!(" sync_map: {:?} ", trackers.sync_map);
    info!(" sync_ack_map: {:?} ", trackers.sync_ack_map);

    // The trackers changed above, so the stored hashes must be recomputed.
    self.update_hashes();
}
/// Passes the version number of the current (latest) layout version to
/// `ack_map.set_max` for `node`.
pub(crate) fn ack_last(&mut self, node: Uuid) {
    let latest = self.current().version;
    let ack_map = &mut self.update_trackers.ack_map;
    ack_map.set_max(node, latest);
}
/// Passes the version number of the oldest layout version still in the
/// history to `sync_map.set_max` for `node`.
///
/// # Panics
///
/// Panics if `versions` is empty.
pub(crate) fn sync_first(&mut self, node: Uuid) {
    // `first()` already yields `Option<&LayoutVersion>`; no extra `as_ref()` needed.
    let first_version = self
        .versions
        .first()
        .expect("layout history must contain at least one version")
        .version;
    self.update_trackers.sync_map.set_max(node, first_version);
}
/// Computes the global minimum of `sync_map` (see `calculate_global_min`)
/// and passes it to `sync_ack_map.set_max` for `node`.
pub(crate) fn sync_ack(&mut self, node: Uuid) {
    let global_sync_min = self.calculate_global_min(&self.update_trackers.sync_map);
    self.update_trackers
        .sync_ack_map
        .set_max(node, global_sync_min);
}
/// Drops the oldest layout versions whose version number is strictly below
/// the global minimum of `sync_ack_map`.
///
/// Removal stops at the first version that is not below that minimum. The
/// most recent version is never removed, so the history cannot become empty
/// here even if every stored version is below the minimum.
pub(crate) fn cleanup_old_versions(&mut self) {
    let min_sync_ack = self.calculate_global_min(&self.update_trackers.sync_ack_map);

    // Only versions before the last one are candidates: `current()` and
    // `sync_first()` rely on `versions` being non-empty.
    let removable = self.versions.len().saturating_sub(1);
    let stale = self.versions[..removable]
        .iter()
        .take_while(|v| v.version < min_sync_ack)
        .count();

    // Remove the whole stale prefix in one pass instead of repeated `remove(0)`.
    self.versions.drain(..stale);
}
/// Returns the smallest value recorded in `tracker` among all storage nodes
/// (as returned by `all_storage_nodes`).
///
/// A node with no entry in the tracker counts as 0, and the result is 0
/// when there are no storage nodes at all.
pub(crate) fn calculate_global_min(&self, tracker: &UpdateTracker) -> u64 {
    self.all_storage_nodes()
        .into_iter()
        .map(|node| tracker.0.get(&node).map_or(0, |value| *value))
        .min()
        .unwrap_or(0)
}
2023-11-08 16:49:06 +00:00
// ================== updates to layout, public interface ===================
pub fn merge ( & mut self , other : & LayoutHistory ) -> bool {
let mut changed = false ;
// Add any new versions to history
for v2 in other . versions . iter ( ) {
2023-11-08 18:28:36 +00:00
if let Some ( v1 ) = self . versions . iter ( ) . find ( | v | v . version = = v2 . version ) {
2023-11-08 16:49:06 +00:00
if v1 ! = v2 {
error! ( " Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced. " , v2 . version ) ;
}
2023-11-08 18:28:36 +00:00
} else if self . versions . iter ( ) . all ( | v | v . version ! = v2 . version - 1 ) {
2023-11-08 16:49:06 +00:00
error! (
" Cannot receive new layout version {}, version {} is missing " ,
v2 . version ,
v2 . version - 1
) ;
} else {
2023-11-08 18:28:36 +00:00
self . versions . push ( v2 . clone ( ) ) ;
2023-11-08 16:49:06 +00:00
changed = true ;
}
}
// Merge trackers
2023-11-09 13:53:34 +00:00
if self . update_trackers ! = other . update_trackers {
let c = self . update_trackers . merge ( & other . update_trackers ) ;
changed = changed | | c ;
}
// Merge staged layout changes
if self . staging ! = other . staging {
self . staging . merge ( & other . staging ) ;
changed = true ;
}
2023-11-08 16:49:06 +00:00
changed
}
pub fn apply_staged_changes ( mut self , version : Option < u64 > ) -> Result < ( Self , Message ) , Error > {
match version {
None = > {
let error = r #"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout .
To know the correct value of the new layout version , invoke ` garage layout show ` and review the proposed changes .
" #;
return Err ( Error ::Message ( error . into ( ) ) ) ;
}
Some ( v ) = > {
if v ! = self . current ( ) . version + 1 {
return Err ( Error ::Message ( " Invalid new layout version " . into ( ) ) ) ;
}
}
}
2023-11-09 10:19:43 +00:00
// Compute new version and add it to history
2023-11-08 16:49:06 +00:00
let mut new_version = self . current ( ) . clone ( ) ;
new_version . version + = 1 ;
2023-11-09 10:19:43 +00:00
new_version . roles . merge ( & self . staging . get ( ) . roles ) ;
2023-11-08 16:49:06 +00:00
new_version . roles . retain ( | ( _ , _ , v ) | v . 0. is_some ( ) ) ;
2023-11-09 10:19:43 +00:00
new_version . parameters = * self . staging . get ( ) . parameters . get ( ) ;
2023-11-08 16:49:06 +00:00
let msg = new_version . calculate_partition_assignment ( ) ? ;
2023-11-08 18:28:36 +00:00
self . versions . push ( new_version ) ;
2023-11-08 16:49:06 +00:00
2023-11-09 10:19:43 +00:00
// Reset the staged layout changes
self . staging . update ( LayoutStaging {
parameters : self . staging . get ( ) . parameters . clone ( ) ,
roles : LwwMap ::new ( ) ,
} ) ;
2023-11-09 13:53:34 +00:00
self . update_hashes ( ) ;
2023-11-09 10:19:43 +00:00
2023-11-08 16:49:06 +00:00
Ok ( ( self , msg ) )
}
2023-11-09 10:19:43 +00:00
pub fn revert_staged_changes ( mut self ) -> Result < Self , Error > {
self . staging . update ( LayoutStaging {
parameters : Lww ::new ( self . current ( ) . parameters . clone ( ) ) ,
roles : LwwMap ::new ( ) ,
} ) ;
2023-11-09 13:53:34 +00:00
self . update_hashes ( ) ;
2023-11-08 16:49:06 +00:00
Ok ( self )
}
pub fn check ( & self ) -> Result < ( ) , String > {
// Check that the hash of the staging data is correct
2023-11-09 13:53:34 +00:00
if self . trackers_hash ! = self . calculate_trackers_hash ( ) {
return Err ( " trackers_hash is incorrect " . into ( ) ) ;
}
if self . staging_hash ! = self . calculate_staging_hash ( ) {
2023-11-08 16:49:06 +00:00
return Err ( " staging_hash is incorrect " . into ( ) ) ;
}
2023-11-09 13:53:34 +00:00
for version in self . versions . iter ( ) {
version . check ( ) ? ;
}
2023-11-08 16:49:06 +00:00
2023-11-09 13:53:34 +00:00
// TODO: anythign more ?
Ok ( ( ) )
2023-11-08 16:49:06 +00:00
}
}