2023-11-14 11:48:38 +00:00
use std ::borrow ::Cow ;
2023-11-09 14:31:59 +00:00
use std ::collections ::HashSet ;
2023-11-08 16:49:06 +00:00
use garage_util ::crdt ::{ Crdt , Lww , LwwMap } ;
use garage_util ::data ::* ;
use garage_util ::encode ::nonversioned_encode ;
use garage_util ::error ::* ;
use super ::schema ::* ;
use super ::* ;
impl LayoutHistory {
pub fn new ( replication_factor : usize ) -> Self {
let version = LayoutVersion ::new ( replication_factor ) ;
2023-11-09 10:19:43 +00:00
let staging = LayoutStaging {
parameters : Lww ::< LayoutParameters > ::new ( version . parameters ) ,
roles : LwwMap ::new ( ) ,
} ;
2023-11-08 16:49:06 +00:00
let mut ret = LayoutHistory {
2023-11-11 12:10:59 +00:00
versions : vec ! [ version ] ,
2023-11-08 16:49:06 +00:00
update_trackers : Default ::default ( ) ,
2023-11-09 13:53:34 +00:00
trackers_hash : [ 0 u8 ; 32 ] . into ( ) ,
2023-11-09 10:19:43 +00:00
staging : Lww ::raw ( 0 , staging ) ,
2023-11-08 16:49:06 +00:00
staging_hash : [ 0 u8 ; 32 ] . into ( ) ,
} ;
2023-11-09 13:53:34 +00:00
ret . update_hashes ( ) ;
2023-11-08 16:49:06 +00:00
ret
}
pub fn current ( & self ) -> & LayoutVersion {
self . versions . last ( ) . as_ref ( ) . unwrap ( )
}
2023-11-09 14:42:10 +00:00
pub fn update_hashes ( & mut self ) {
2023-11-09 13:53:34 +00:00
self . trackers_hash = self . calculate_trackers_hash ( ) ;
self . staging_hash = self . calculate_staging_hash ( ) ;
}
pub ( crate ) fn calculate_trackers_hash ( & self ) -> Hash {
blake2sum ( & nonversioned_encode ( & self . update_trackers ) . unwrap ( ) [ .. ] )
}
2023-11-08 16:49:06 +00:00
pub ( crate ) fn calculate_staging_hash ( & self ) -> Hash {
2023-11-09 10:19:43 +00:00
blake2sum ( & nonversioned_encode ( & self . staging ) . unwrap ( ) [ .. ] )
2023-11-08 16:49:06 +00:00
}
2023-11-09 15:32:31 +00:00
// ------------------ who stores what now? ---------------
2023-11-11 11:08:32 +00:00
pub fn all_ack ( & self ) -> u64 {
2023-11-09 15:32:31 +00:00
self . calculate_global_min ( & self . update_trackers . ack_map )
}
2023-11-11 11:08:32 +00:00
pub fn min_stored ( & self ) -> u64 {
self . versions . first ( ) . as_ref ( ) . unwrap ( ) . version
}
pub fn sync_versions ( & self ) -> ( u64 , u64 , u64 ) {
( self . current ( ) . version , self . all_ack ( ) , self . min_stored ( ) )
}
2023-11-14 12:06:16 +00:00
pub fn all_nodes ( & self ) -> Cow < '_ , [ Uuid ] > {
// TODO: cache this
if self . versions . len ( ) = = 1 {
self . versions [ 0 ] . all_nodes ( ) . into ( )
} else {
let set = self
. versions
. iter ( )
. map ( | x | x . all_nodes ( ) )
. flatten ( )
. collect ::< HashSet < _ > > ( ) ;
set . into_iter ( ) . copied ( ) . collect ::< Vec < _ > > ( ) . into ( )
}
}
2023-11-14 11:48:38 +00:00
pub fn all_nongateway_nodes ( & self ) -> Cow < '_ , [ Uuid ] > {
2023-11-09 15:32:31 +00:00
// TODO: cache this
2023-11-14 11:48:38 +00:00
if self . versions . len ( ) = = 1 {
self . versions [ 0 ] . nongateway_nodes ( ) . into ( )
} else {
let set = self
. versions
. iter ( )
. map ( | x | x . nongateway_nodes ( ) )
. flatten ( )
. collect ::< HashSet < _ > > ( ) ;
set . into_iter ( ) . copied ( ) . collect ::< Vec < _ > > ( ) . into ( )
}
2023-11-09 15:32:31 +00:00
}
pub fn read_nodes_of ( & self , position : & Hash ) -> Vec < Uuid > {
let sync_min = self . calculate_global_min ( & self . update_trackers . sync_map ) ;
let version = self
. versions
. iter ( )
. find ( | x | x . version = = sync_min )
. or ( self . versions . last ( ) )
. unwrap ( ) ;
version . nodes_of ( position , version . replication_factor )
}
2023-11-11 11:08:32 +00:00
pub fn write_sets_of < ' a > ( & ' a self , position : & ' a Hash ) -> impl Iterator < Item = Vec < Uuid > > + ' a {
2023-11-09 15:32:31 +00:00
self . versions
. iter ( )
2023-11-11 11:08:32 +00:00
. map ( move | x | x . nodes_of ( position , x . replication_factor ) )
2023-11-09 15:32:31 +00:00
}
2023-11-09 14:31:59 +00:00
// ------------------ update tracking ---------------
pub ( crate ) fn update_trackers ( & mut self , node_id : Uuid ) {
// Ensure trackers for this node's values are up-to-date
// 1. Acknowledge the last layout version in the history
self . ack_last ( node_id ) ;
// 2. Assume the data on this node is sync'ed up at least to
// the first layout version in the history
self . sync_first ( node_id ) ;
// 3. Acknowledge everyone has synced up to min(self.sync_map)
self . sync_ack ( node_id ) ;
// 4. Cleanup layout versions that are not needed anymore
self . cleanup_old_versions ( ) ;
info! ( " ack_map: {:?} " , self . update_trackers . ack_map ) ;
info! ( " sync_map: {:?} " , self . update_trackers . sync_map ) ;
info! ( " sync_ack_map: {:?} " , self . update_trackers . sync_ack_map ) ;
// Finally, update hashes
self . update_hashes ( ) ;
}
pub ( crate ) fn ack_last ( & mut self , node : Uuid ) {
let last_version = self . current ( ) . version ;
self . update_trackers . ack_map . set_max ( node , last_version ) ;
}
pub ( crate ) fn sync_first ( & mut self , node : Uuid ) {
let first_version = self . versions . first ( ) . as_ref ( ) . unwrap ( ) . version ;
self . update_trackers . sync_map . set_max ( node , first_version ) ;
}
pub ( crate ) fn sync_ack ( & mut self , node : Uuid ) {
self . update_trackers . sync_ack_map . set_max (
node ,
self . calculate_global_min ( & self . update_trackers . sync_map ) ,
) ;
}
pub ( crate ) fn cleanup_old_versions ( & mut self ) {
let min_sync_ack = self . calculate_global_min ( & self . update_trackers . sync_ack_map ) ;
while self . versions . first ( ) . as_ref ( ) . unwrap ( ) . version < min_sync_ack {
2023-11-11 11:37:33 +00:00
let removed = self . versions . remove ( 0 ) ;
info! ( " Layout history: pruning old version {} " , removed . version ) ;
2023-11-09 14:31:59 +00:00
}
}
pub ( crate ) fn calculate_global_min ( & self , tracker : & UpdateTracker ) -> u64 {
2023-11-11 11:08:32 +00:00
// TODO: for TableFullReplication, counting gateway nodes might be
// necessary? Think about this more.
let storage_nodes = self . all_nongateway_nodes ( ) ;
2023-11-09 14:31:59 +00:00
storage_nodes
. iter ( )
. map ( | x | tracker . 0. get ( x ) . copied ( ) . unwrap_or ( 0 ) )
. min ( )
. unwrap_or ( 0 )
}
2023-11-08 16:49:06 +00:00
// ================== updates to layout, public interface ===================
pub fn merge ( & mut self , other : & LayoutHistory ) -> bool {
let mut changed = false ;
// Add any new versions to history
for v2 in other . versions . iter ( ) {
2023-11-08 18:28:36 +00:00
if let Some ( v1 ) = self . versions . iter ( ) . find ( | v | v . version = = v2 . version ) {
2023-11-08 16:49:06 +00:00
if v1 ! = v2 {
error! ( " Inconsistent layout histories: different layout compositions for version {}. Your cluster will be broken as long as this layout version is not replaced. " , v2 . version ) ;
}
2023-11-08 18:28:36 +00:00
} else if self . versions . iter ( ) . all ( | v | v . version ! = v2 . version - 1 ) {
2023-11-08 16:49:06 +00:00
error! (
" Cannot receive new layout version {}, version {} is missing " ,
v2 . version ,
v2 . version - 1
) ;
} else {
2023-11-08 18:28:36 +00:00
self . versions . push ( v2 . clone ( ) ) ;
2023-11-08 16:49:06 +00:00
changed = true ;
}
}
// Merge trackers
2023-11-09 13:53:34 +00:00
if self . update_trackers ! = other . update_trackers {
let c = self . update_trackers . merge ( & other . update_trackers ) ;
changed = changed | | c ;
}
// Merge staged layout changes
if self . staging ! = other . staging {
self . staging . merge ( & other . staging ) ;
changed = true ;
}
2023-11-08 16:49:06 +00:00
changed
}
pub fn apply_staged_changes ( mut self , version : Option < u64 > ) -> Result < ( Self , Message ) , Error > {
match version {
None = > {
let error = r #"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout .
To know the correct value of the new layout version , invoke ` garage layout show ` and review the proposed changes .
" #;
return Err ( Error ::Message ( error . into ( ) ) ) ;
}
Some ( v ) = > {
if v ! = self . current ( ) . version + 1 {
return Err ( Error ::Message ( " Invalid new layout version " . into ( ) ) ) ;
}
}
}
2023-11-09 10:19:43 +00:00
// Compute new version and add it to history
2023-11-14 11:48:38 +00:00
let ( new_version , msg ) = self
. current ( )
. clone ( )
. calculate_next_version ( & self . staging . get ( ) ) ? ;
2023-11-08 16:49:06 +00:00
2023-11-08 18:28:36 +00:00
self . versions . push ( new_version ) ;
2023-11-11 12:10:59 +00:00
if self . current ( ) . check ( ) . is_ok ( ) {
while self . versions . first ( ) . unwrap ( ) . check ( ) . is_err ( ) {
self . versions . remove ( 0 ) ;
}
}
2023-11-08 16:49:06 +00:00
2023-11-09 10:19:43 +00:00
// Reset the staged layout changes
self . staging . update ( LayoutStaging {
parameters : self . staging . get ( ) . parameters . clone ( ) ,
roles : LwwMap ::new ( ) ,
} ) ;
2023-11-09 13:53:34 +00:00
self . update_hashes ( ) ;
2023-11-09 10:19:43 +00:00
2023-11-08 16:49:06 +00:00
Ok ( ( self , msg ) )
}
2023-11-09 10:19:43 +00:00
pub fn revert_staged_changes ( mut self ) -> Result < Self , Error > {
self . staging . update ( LayoutStaging {
parameters : Lww ::new ( self . current ( ) . parameters . clone ( ) ) ,
roles : LwwMap ::new ( ) ,
} ) ;
2023-11-09 13:53:34 +00:00
self . update_hashes ( ) ;
2023-11-08 16:49:06 +00:00
Ok ( self )
}
pub fn check ( & self ) -> Result < ( ) , String > {
// Check that the hash of the staging data is correct
2023-11-09 13:53:34 +00:00
if self . trackers_hash ! = self . calculate_trackers_hash ( ) {
return Err ( " trackers_hash is incorrect " . into ( ) ) ;
}
if self . staging_hash ! = self . calculate_staging_hash ( ) {
2023-11-08 16:49:06 +00:00
return Err ( " staging_hash is incorrect " . into ( ) ) ;
}
2023-11-09 13:53:34 +00:00
for version in self . versions . iter ( ) {
version . check ( ) ? ;
}
2023-11-08 16:49:06 +00:00
2023-11-11 12:10:59 +00:00
// TODO: anything more ?
2023-11-09 13:53:34 +00:00
Ok ( ( ) )
2023-11-08 16:49:06 +00:00
}
}