2021-11-09 11:24:04 +00:00
use garage_util ::crdt ::Crdt ;
use garage_util ::data ::* ;
use garage_util ::error ::* ;
use garage_rpc ::layout ::* ;
use garage_rpc ::system ::* ;
use garage_rpc ::* ;
use crate ::cli ::* ;
pub async fn cli_layout_command_dispatch (
cmd : LayoutOperation ,
system_rpc_endpoint : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
) -> Result < ( ) , Error > {
match cmd {
LayoutOperation ::Assign ( configure_opt ) = > {
cmd_assign_role ( system_rpc_endpoint , rpc_host , configure_opt ) . await
}
LayoutOperation ::Remove ( remove_opt ) = > {
cmd_remove_role ( system_rpc_endpoint , rpc_host , remove_opt ) . await
}
LayoutOperation ::Show = > cmd_show_layout ( system_rpc_endpoint , rpc_host ) . await ,
LayoutOperation ::Apply ( apply_opt ) = > {
cmd_apply_layout ( system_rpc_endpoint , rpc_host , apply_opt ) . await
}
LayoutOperation ::Revert ( revert_opt ) = > {
cmd_revert_layout ( system_rpc_endpoint , rpc_host , revert_opt ) . await
}
}
}
pub async fn cmd_assign_role (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
args : AssignRoleOpt ,
) -> Result < ( ) , Error > {
let status = match rpc_cli
. call ( & rpc_host , & SystemRpc ::GetKnownNodes , PRIO_NORMAL )
. await ? ?
{
SystemRpc ::ReturnKnownNodes ( nodes ) = > nodes ,
resp = > return Err ( Error ::Message ( format! ( " Invalid RPC response: {:?} " , resp ) ) ) ,
} ;
let added_node = find_matching_node ( status . iter ( ) . map ( | adv | adv . id ) , & args . node_id ) ? ;
let mut layout = fetch_layout ( rpc_cli , rpc_host ) . await ? ;
let mut roles = layout . roles . clone ( ) ;
roles . merge ( & layout . staging ) ;
for replaced in args . replace . iter ( ) {
let replaced_node = find_matching_node ( layout . node_ids ( ) . iter ( ) . cloned ( ) , replaced ) ? ;
match roles . get ( & replaced_node ) {
Some ( NodeRoleV ( Some ( _ ) ) ) = > {
layout
. staging
. merge ( & roles . update_mutator ( replaced_node , NodeRoleV ( None ) ) ) ;
}
_ = > {
return Err ( Error ::Message ( format! (
" Cannot replace node {:?} as it is not currently in planned layout " ,
replaced_node
) ) ) ;
}
}
}
if args . capacity . is_some ( ) & & args . gateway {
return Err ( Error ::Message (
" -c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node " . into ( ) ) ) ;
}
if args . capacity = = Some ( 0 ) {
return Err ( Error ::Message ( " Invalid capacity value: 0 " . into ( ) ) ) ;
}
let new_entry = match roles . get ( & added_node ) {
Some ( NodeRoleV ( Some ( old ) ) ) = > {
let capacity = match args . capacity {
Some ( c ) = > Some ( c ) ,
None if args . gateway = > None ,
None = > old . capacity ,
} ;
let tags = if args . tags . is_empty ( ) {
old . tags . clone ( )
} else {
args . tags
} ;
NodeRole {
zone : args . zone . unwrap_or_else ( | | old . zone . to_string ( ) ) ,
capacity ,
tags ,
}
}
_ = > {
let capacity = match args . capacity {
Some ( c ) = > Some ( c ) ,
None if args . gateway = > None ,
None = > return Err ( Error ::Message (
" Please specify a capacity with the -c flag, or set node explicitly as gateway with -g " . into ( ) ) ) ,
} ;
NodeRole {
zone : args . zone . ok_or ( " Please specifiy a zone with the -z flag " ) ? ,
capacity ,
tags : args . tags ,
}
}
} ;
layout
. staging
. merge ( & roles . update_mutator ( added_node , NodeRoleV ( Some ( new_entry ) ) ) ) ;
send_layout ( rpc_cli , rpc_host , layout ) . await ? ;
println! ( " Role change is staged but not yet commited. " ) ;
println! ( " Use `garage layout show` to view staged role changes, " ) ;
println! ( " and `garage layout apply` to enact staged changes. " ) ;
Ok ( ( ) )
}
pub async fn cmd_remove_role (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
args : RemoveRoleOpt ,
) -> Result < ( ) , Error > {
let mut layout = fetch_layout ( rpc_cli , rpc_host ) . await ? ;
let mut roles = layout . roles . clone ( ) ;
roles . merge ( & layout . staging ) ;
let deleted_node =
find_matching_node ( roles . items ( ) . iter ( ) . map ( | ( id , _ , _ ) | * id ) , & args . node_id ) ? ;
layout
. staging
. merge ( & roles . update_mutator ( deleted_node , NodeRoleV ( None ) ) ) ;
send_layout ( rpc_cli , rpc_host , layout ) . await ? ;
println! ( " Role removal is staged but not yet commited. " ) ;
println! ( " Use `garage layout show` to view staged role changes, " ) ;
println! ( " and `garage layout apply` to enact staged changes. " ) ;
Ok ( ( ) )
}
pub async fn cmd_show_layout (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
) -> Result < ( ) , Error > {
let mut layout = fetch_layout ( rpc_cli , rpc_host ) . await ? ;
println! ( " ==== CURRENT CLUSTER LAYOUT ==== " ) ;
if ! print_cluster_layout ( & layout ) {
println! ( " No nodes currently have a role in the cluster. " ) ;
println! ( " See `garage status` to view available nodes. " ) ;
}
println! ( ) ;
println! ( " Current cluster layout version: {} " , layout . version ) ;
if print_staging_role_changes ( & layout ) {
layout . roles . merge ( & layout . staging ) ;
println! ( ) ;
println! ( " ==== NEW CLUSTER LAYOUT AFTER APPLYING CHANGES ==== " ) ;
if ! print_cluster_layout ( & layout ) {
println! ( " No nodes have a role in the new layout. " ) ;
}
println! ( ) ;
// this will print the stats of what partitions
// will move around when we apply
if layout . calculate_partition_assignation ( ) {
println! ( " To enact the staged role changes, type: " ) ;
println! ( ) ;
println! ( " garage layout apply --version {} " , layout . version + 1 ) ;
println! ( ) ;
println! (
" You can also revert all proposed changes with: garage layout revert --version {} " ,
layout . version + 1
) ;
} else {
println! ( " Not enough nodes have an assigned role to maintain enough copies of data. " ) ;
println! ( " This new layout cannot yet be applied. " ) ;
}
}
Ok ( ( ) )
}
pub async fn cmd_apply_layout (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
apply_opt : ApplyLayoutOpt ,
) -> Result < ( ) , Error > {
let mut layout = fetch_layout ( rpc_cli , rpc_host ) . await ? ;
2022-02-10 15:10:21 +00:00
layout . roles . merge ( & layout . staging ) ;
if ! layout . calculate_partition_assignation ( ) {
return Err ( Error ::Message ( " Could not calculate new assignation of partitions to nodes. This can happen if there are less nodes than the desired number of copies of your data (see the replication_mode configuration parameter). " . into ( ) ) ) ;
}
layout . staging . clear ( ) ;
layout . staging_hash = blake2sum ( & rmp_to_vec_all_named ( & layout . staging ) . unwrap ( ) [ .. ] ) ;
2021-11-09 11:24:04 +00:00
match apply_opt . version {
None = > {
println! ( " Please pass the --version flag to ensure that you are writing the correct version of the cluster layout. " ) ;
println! ( " To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes. " ) ;
return Err ( Error ::Message ( " --version flag is missing " . into ( ) ) ) ;
}
Some ( v ) = > {
if v ! = layout . version + 1 {
return Err ( Error ::Message ( " Invalid value of --version flag " . into ( ) ) ) ;
}
}
}
layout . version + = 1 ;
send_layout ( rpc_cli , rpc_host , layout ) . await ? ;
println! ( " New cluster layout with updated role assignation has been applied in cluster. " ) ;
println! ( " Data will now be moved around between nodes accordingly. " ) ;
Ok ( ( ) )
}
pub async fn cmd_revert_layout (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
revert_opt : RevertLayoutOpt ,
) -> Result < ( ) , Error > {
let mut layout = fetch_layout ( rpc_cli , rpc_host ) . await ? ;
match revert_opt . version {
None = > {
println! ( " Please pass the --version flag to ensure that you are writing the correct version of the cluster layout. " ) ;
println! ( " To know the correct value of the --version flag, invoke `garage layout show` and review the proposed changes. " ) ;
return Err ( Error ::Message ( " --version flag is missing " . into ( ) ) ) ;
}
Some ( v ) = > {
if v ! = layout . version + 1 {
return Err ( Error ::Message ( " Invalid value of --version flag " . into ( ) ) ) ;
}
}
}
layout . staging . clear ( ) ;
layout . staging_hash = blake2sum ( & rmp_to_vec_all_named ( & layout . staging ) . unwrap ( ) [ .. ] ) ;
layout . version + = 1 ;
send_layout ( rpc_cli , rpc_host , layout ) . await ? ;
println! ( " All proposed role changes in cluster layout have been canceled. " ) ;
Ok ( ( ) )
}
// --- utility ---
pub async fn fetch_layout (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
) -> Result < ClusterLayout , Error > {
match rpc_cli
. call ( & rpc_host , & SystemRpc ::PullClusterLayout , PRIO_NORMAL )
. await ? ?
{
SystemRpc ::AdvertiseClusterLayout ( t ) = > Ok ( t ) ,
resp = > Err ( Error ::Message ( format! ( " Invalid RPC response: {:?} " , resp ) ) ) ,
}
}
pub async fn send_layout (
rpc_cli : & Endpoint < SystemRpc , ( ) > ,
rpc_host : NodeID ,
layout : ClusterLayout ,
) -> Result < ( ) , Error > {
rpc_cli
. call (
& rpc_host ,
& SystemRpc ::AdvertiseClusterLayout ( layout ) ,
PRIO_NORMAL ,
)
. await ? ? ;
Ok ( ( ) )
}
pub fn print_cluster_layout ( layout : & ClusterLayout ) -> bool {
let mut table = vec! [ " ID \t Tags \t Zone \t Capacity " . to_string ( ) ] ;
for ( id , _ , role ) in layout . roles . items ( ) . iter ( ) {
let role = match & role . 0 {
Some ( r ) = > r ,
_ = > continue ,
} ;
let tags = role . tags . join ( " , " ) ;
table . push ( format! (
" {:?} \t {} \t {} \t {} " ,
id ,
tags ,
role . zone ,
role . capacity_string ( )
) ) ;
}
if table . len ( ) = = 1 {
false
} else {
format_table ( table ) ;
true
}
}
/// Prints a table of the role changes staged in `layout.staging`.
///
/// A staged entry with a role is shown with its tags, zone and capacity; a
/// staged entry without one is shown as removed. Returns `false` without
/// printing anything when the staging area has no items, `true` otherwise.
pub fn print_staging_role_changes(layout: &ClusterLayout) -> bool {
    let staged = layout.staging.items();
    if staged.is_empty() {
        return false;
    }

    println!();
    println!(" ==== STAGED ROLE CHANGES ==== ");

    let mut rows = vec![" ID \t Tags \t Zone \t Capacity ".to_string()];
    for (id, _, role) in staged.iter() {
        let row = match &role.0 {
            Some(assigned) => {
                let tags = assigned.tags.join(" , ");
                format!(
                    " {:?} \t {} \t {} \t {} ",
                    id,
                    tags,
                    assigned.zone,
                    assigned.capacity_string()
                )
            }
            None => format!(" {:?} \t REMOVED ", id),
        };
        rows.push(row);
    }

    format_table(rows);
    true
}