2021-03-12 14:40:54 +00:00
use std ::collections ::HashMap ;
2021-03-12 17:16:03 +00:00
use std ::fmt ::Write ;
use std ::sync ::Arc ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
use async_trait ::async_trait ;
2020-04-19 15:15:48 +00:00
use serde ::{ Deserialize , Serialize } ;
2021-12-14 12:55:11 +00:00
use garage_util ::crdt ::* ;
use garage_util ::data ::* ;
2022-01-03 16:22:40 +00:00
use garage_util ::error ::Error as GarageError ;
2021-12-14 12:55:11 +00:00
use garage_util ::time ::* ;
2020-04-23 17:05:46 +00:00
2021-03-12 14:40:54 +00:00
use garage_table ::replication ::* ;
2021-03-12 17:16:03 +00:00
use garage_table ::* ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
use garage_rpc ::* ;
2020-04-23 17:05:46 +00:00
2021-12-14 12:55:11 +00:00
use garage_model ::bucket_alias_table ::* ;
2020-07-07 11:59:22 +00:00
use garage_model ::bucket_table ::* ;
use garage_model ::garage ::Garage ;
2022-01-03 12:58:05 +00:00
use garage_model ::helper ::error ::{ Error , OkOrBadRequest } ;
2020-07-07 11:59:22 +00:00
use garage_model ::key_table ::* ;
2021-12-16 12:17:09 +00:00
use garage_model ::migrate ::Migrate ;
2021-12-14 12:55:11 +00:00
use garage_model ::permission ::* ;
2020-04-23 17:05:46 +00:00
2021-03-12 17:12:31 +00:00
use crate ::cli ::* ;
2021-03-12 17:16:03 +00:00
use crate ::repair ::Repair ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
/// Path string handed to `netapp.endpoint(...)` in `AdminRpcHandler::new` to
/// obtain the endpoint over which `AdminRpc` messages are exchanged.
pub const ADMIN_RPC_PATH : & str = " garage/admin_rpc.rs/Rpc " ;
2020-04-19 15:15:48 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
2021-04-23 20:41:24 +00:00
pub enum AdminRpc {
2020-04-19 15:15:48 +00:00
BucketOperation ( BucketOperation ) ,
2020-04-23 18:36:12 +00:00
KeyOperation ( KeyOperation ) ,
2020-04-21 16:40:17 +00:00
LaunchRepair ( RepairOpt ) ,
2021-12-16 12:17:09 +00:00
Migrate ( MigrateOpt ) ,
2021-03-12 14:40:54 +00:00
Stats ( StatsOpt ) ,
2020-04-19 15:15:48 +00:00
// Replies
2020-04-19 17:59:59 +00:00
Ok ( String ) ,
2022-01-03 18:06:04 +00:00
BucketList ( Vec < Bucket > ) ,
2021-12-16 15:17:51 +00:00
BucketInfo ( Bucket , HashMap < String , Key > ) ,
2020-04-23 20:25:45 +00:00
KeyList ( Vec < ( String , String ) > ) ,
2021-12-16 15:17:51 +00:00
KeyInfo ( Key , HashMap < Uuid , Bucket > ) ,
2020-04-19 15:15:48 +00:00
}
2021-10-15 09:05:09 +00:00
impl Rpc for AdminRpc {
type Response = Result < AdminRpc , Error > ;
2021-10-14 09:50:12 +00:00
}
2020-04-19 15:15:48 +00:00
pub struct AdminRpcHandler {
garage : Arc < Garage > ,
2021-10-14 09:50:12 +00:00
endpoint : Arc < Endpoint < AdminRpc , Self > > ,
2020-04-19 15:15:48 +00:00
}
impl AdminRpcHandler {
pub fn new ( garage : Arc < Garage > ) -> Arc < Self > {
2021-10-14 09:50:12 +00:00
let endpoint = garage . system . netapp . endpoint ( ADMIN_RPC_PATH . into ( ) ) ;
let admin = Arc ::new ( Self { garage , endpoint } ) ;
admin . endpoint . set_handler ( admin . clone ( ) ) ;
admin
2020-04-19 15:15:48 +00:00
}
2021-10-14 09:50:12 +00:00
async fn handle_bucket_cmd ( & self , cmd : & BucketOperation ) -> Result < AdminRpc , Error > {
2020-04-19 15:15:48 +00:00
match cmd {
2021-12-14 12:55:11 +00:00
BucketOperation ::List = > self . handle_list_buckets ( ) . await ,
2021-12-16 15:17:51 +00:00
BucketOperation ::Info ( query ) = > self . handle_bucket_info ( query ) . await ,
2021-12-14 12:55:11 +00:00
BucketOperation ::Create ( query ) = > self . handle_create_bucket ( & query . name ) . await ,
BucketOperation ::Delete ( query ) = > self . handle_delete_bucket ( query ) . await ,
2021-12-15 17:36:15 +00:00
BucketOperation ::Alias ( query ) = > self . handle_alias_bucket ( query ) . await ,
BucketOperation ::Unalias ( query ) = > self . handle_unalias_bucket ( query ) . await ,
2021-12-14 12:55:11 +00:00
BucketOperation ::Allow ( query ) = > self . handle_bucket_allow ( query ) . await ,
BucketOperation ::Deny ( query ) = > self . handle_bucket_deny ( query ) . await ,
BucketOperation ::Website ( query ) = > self . handle_bucket_website ( query ) . await ,
}
}
async fn handle_list_buckets ( & self ) -> Result < AdminRpc , Error > {
2022-01-03 18:06:04 +00:00
let buckets = self
2021-12-14 12:55:11 +00:00
. garage
2022-01-03 18:06:04 +00:00
. bucket_table
2021-12-14 12:55:11 +00:00
. get_range ( & EmptyKey , None , Some ( DeletedFilter ::NotDeleted ) , 10000 )
. await ? ;
2022-01-03 18:06:04 +00:00
Ok ( AdminRpc ::BucketList ( buckets ) )
2021-12-14 12:55:11 +00:00
}
2020-12-15 11:48:24 +00:00
2021-12-16 15:17:51 +00:00
async fn handle_bucket_info ( & self , query : & BucketOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . name )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-16 15:17:51 +00:00
let bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let mut relevant_keys = HashMap ::new ( ) ;
for ( k , _ ) in bucket
. state
. as_option ( )
. unwrap ( )
. authorized_keys
. items ( )
. iter ( )
{
if let Some ( key ) = self . garage . key_table . get ( & EmptyKey , k ) . await ? {
relevant_keys . insert ( k . clone ( ) , key ) ;
}
}
for ( ( k , _ ) , _ , _ ) in bucket
. state
. as_option ( )
. unwrap ( )
. local_aliases
. items ( )
. iter ( )
{
if relevant_keys . contains_key ( k ) {
continue ;
}
if let Some ( key ) = self . garage . key_table . get ( & EmptyKey , k ) . await ? {
relevant_keys . insert ( k . clone ( ) , key ) ;
}
}
Ok ( AdminRpc ::BucketInfo ( bucket , relevant_keys ) )
}
2021-12-14 12:55:11 +00:00
#[ allow(clippy::ptr_arg) ]
async fn handle_create_bucket ( & self , name : & String ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
if ! is_valid_bucket_name ( name ) {
return Err ( Error ::BadRequest ( format! (
" {}: {} " ,
name , INVALID_BUCKET_NAME_MESSAGE
) ) ) ;
}
if let Some ( alias ) = self . garage . bucket_alias_table . get ( & EmptyKey , name ) . await ? {
2022-01-03 17:32:15 +00:00
if alias . state . get ( ) . is_some ( ) {
2022-01-03 16:22:40 +00:00
return Err ( Error ::BadRequest ( format! ( " Bucket {} already exists " , name ) ) ) ;
2021-12-14 12:55:11 +00:00
}
2022-01-03 16:22:40 +00:00
}
// ---- done checking, now commit ----
let bucket = Bucket ::new ( ) ;
2021-12-14 12:55:11 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2022-01-03 16:22:40 +00:00
self . garage
. bucket_helper ( )
. set_global_bucket_alias ( bucket . id , name )
. await ? ;
2021-12-14 12:55:11 +00:00
Ok ( AdminRpc ::Ok ( format! ( " Bucket {} was created. " , name ) ) )
}
2020-12-15 11:48:24 +00:00
2021-12-14 12:55:11 +00:00
async fn handle_delete_bucket ( & self , query : & DeleteBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
let bucket_id = helper
. resolve_global_bucket_name ( & query . name )
. await ?
. ok_or_bad_request ( " Bucket not found " ) ? ;
// Get the alias, but keep in minde here the bucket name
// given in parameter can also be directly the bucket's ID.
// In that case bucket_alias will be None, and
// we can still delete the bucket if it has zero aliases
// (a condition which we try to prevent but that could still happen somehow).
// We just won't try to delete an alias entry because there isn't one.
let bucket_alias = self
2021-12-14 12:55:11 +00:00
. garage
. bucket_alias_table
. get ( & EmptyKey , & query . name )
2022-01-03 16:22:40 +00:00
. await ? ;
2021-12-14 12:55:11 +00:00
// Check bucket doesn't have other aliases
2022-01-03 16:22:40 +00:00
let mut bucket = helper . get_existing_bucket ( bucket_id ) . await ? ;
2021-12-14 12:55:11 +00:00
let bucket_state = bucket . state . as_option ( ) . unwrap ( ) ;
if bucket_state
. aliases
. items ( )
. iter ( )
. filter ( | ( _ , _ , active ) | * active )
. any ( | ( name , _ , _ ) | name ! = & query . name )
{
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! ( " Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one. " , query . name ) ) ) ;
2021-12-14 12:55:11 +00:00
}
if bucket_state
. local_aliases
. items ( )
. iter ( )
. any ( | ( _ , _ , active ) | * active )
{
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! ( " Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one. " , query . name ) ) ) ;
2021-12-14 12:55:11 +00:00
}
// Check bucket is empty
let objects = self
. garage
. object_table
. get_range ( & bucket_id , None , Some ( DeletedFilter ::NotDeleted ) , 10 )
. await ? ;
if ! objects . is_empty ( ) {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! (
" Bucket {} is not empty " ,
query . name
) ) ) ;
2021-12-14 12:55:11 +00:00
}
if ! query . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-14 12:55:11 +00:00
" Add --yes flag to really perform this operation " . to_string ( ) ,
) ) ;
}
// --- done checking, now commit ---
// 1. delete authorization from keys that had access
for ( key_id , _ ) in bucket . authorized_keys ( ) {
2022-01-03 16:22:40 +00:00
helper
. set_bucket_key_permissions ( bucket . id , key_id , BucketKeyPerm ::no_permissions ( ) )
. await ? ;
2020-04-19 15:15:48 +00:00
}
2022-01-03 16:22:40 +00:00
2021-12-14 12:55:11 +00:00
// 2. delete bucket alias
2022-01-03 16:22:40 +00:00
if bucket_alias . is_some ( ) {
helper
2022-01-03 17:32:15 +00:00
. purge_global_bucket_alias ( bucket_id , & query . name )
2022-01-03 16:22:40 +00:00
. await ? ;
}
2021-12-17 10:53:13 +00:00
// 3. delete bucket
2021-12-14 12:55:11 +00:00
bucket . state = Deletable ::delete ( ) ;
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! ( " Bucket {} was deleted. " , query . name ) ) )
}
2021-12-15 17:36:15 +00:00
async fn handle_alias_bucket ( & self , query : & AliasBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
let bucket_id = helper
2021-12-15 17:36:15 +00:00
. resolve_global_bucket_name ( & query . existing_bucket )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
if let Some ( key_pattern ) = & query . local {
let key = helper . get_existing_matching_key ( key_pattern ) . await ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
helper
. set_local_bucket_alias ( bucket_id , & key . key_id , & query . new_name )
. await ? ;
2021-12-15 17:36:15 +00:00
Ok ( AdminRpc ::Ok ( format! (
2022-01-03 16:22:40 +00:00
" Alias {} now points to bucket {:?} in namespace of key {} " ,
2021-12-15 17:36:15 +00:00
query . new_name , bucket_id , key . key_id
) ) )
} else {
2022-01-03 16:22:40 +00:00
helper
. set_global_bucket_alias ( bucket_id , & query . new_name )
2021-12-17 10:53:13 +00:00
. await ? ;
2021-12-15 17:36:15 +00:00
Ok ( AdminRpc ::Ok ( format! (
2022-01-03 16:22:40 +00:00
" Alias {} now points to bucket {:?} " ,
2021-12-15 17:36:15 +00:00
query . new_name , bucket_id
) ) )
}
}
async fn handle_unalias_bucket ( & self , query : & UnaliasBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
if let Some ( key_pattern ) = & query . local {
let key = helper . get_existing_matching_key ( key_pattern ) . await ? ;
2021-12-15 17:36:15 +00:00
let bucket_id = key
. state
. as_option ( )
. unwrap ( )
. local_aliases
. get ( & query . name )
2022-01-03 17:32:15 +00:00
. cloned ( )
2021-12-15 17:36:15 +00:00
. flatten ( )
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
helper
. unset_local_bucket_alias ( bucket_id , & key . key_id , & query . name )
. await ? ;
2021-12-15 17:36:15 +00:00
Ok ( AdminRpc ::Ok ( format! (
2022-01-03 16:22:40 +00:00
" Alias {} no longer points to bucket {:?} in namespace of key {} " ,
& query . name , bucket_id , key . key_id
2021-12-15 17:36:15 +00:00
) ) )
} else {
2022-01-03 16:22:40 +00:00
let bucket_id = helper
2021-12-15 17:36:15 +00:00
. resolve_global_bucket_name ( & query . name )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
helper
. unset_global_bucket_alias ( bucket_id , & query . name )
. await ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
Ok ( AdminRpc ::Ok ( format! (
" Alias {} no longer points to bucket {:?} " ,
& query . name , bucket_id
) ) )
2021-12-15 17:36:15 +00:00
}
}
2021-12-14 12:55:11 +00:00
async fn handle_bucket_allow ( & self , query : & PermBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
let bucket_id = helper
2021-12-14 12:55:11 +00:00
. resolve_global_bucket_name ( & query . bucket )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2022-01-03 16:22:40 +00:00
let key = helper . get_existing_matching_key ( & query . key_pattern ) . await ? ;
2021-12-14 12:55:11 +00:00
let allow_read = query . read | | key . allow_read ( & bucket_id ) ;
let allow_write = query . write | | key . allow_write ( & bucket_id ) ;
2021-12-16 10:47:58 +00:00
let allow_owner = query . owner | | key . allow_owner ( & bucket_id ) ;
2021-12-14 12:55:11 +00:00
2022-01-03 16:22:40 +00:00
helper
. set_bucket_key_permissions (
bucket_id ,
& key . key_id ,
BucketKeyPerm {
timestamp : now_msec ( ) ,
allow_read ,
allow_write ,
allow_owner ,
} ,
)
2021-12-14 12:55:11 +00:00
. await ? ;
Ok ( AdminRpc ::Ok ( format! (
2021-12-16 10:47:58 +00:00
" New permissions for {} on {}: read {}, write {}, owner {}. " ,
& key . key_id , & query . bucket , allow_read , allow_write , allow_owner
2021-12-14 12:55:11 +00:00
) ) )
}
async fn handle_bucket_deny ( & self , query : & PermBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
let bucket_id = helper
2021-12-14 12:55:11 +00:00
. resolve_global_bucket_name ( & query . bucket )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2022-01-03 16:22:40 +00:00
let key = helper . get_existing_matching_key ( & query . key_pattern ) . await ? ;
2021-12-14 12:55:11 +00:00
let allow_read = ! query . read & & key . allow_read ( & bucket_id ) ;
let allow_write = ! query . write & & key . allow_write ( & bucket_id ) ;
2021-12-16 10:47:58 +00:00
let allow_owner = ! query . owner & & key . allow_owner ( & bucket_id ) ;
2021-12-14 12:55:11 +00:00
2022-01-03 16:22:40 +00:00
helper
. set_bucket_key_permissions (
bucket_id ,
& key . key_id ,
BucketKeyPerm {
timestamp : now_msec ( ) ,
allow_read ,
allow_write ,
allow_owner ,
} ,
)
2021-12-14 12:55:11 +00:00
. await ? ;
Ok ( AdminRpc ::Ok ( format! (
2021-12-16 10:47:58 +00:00
" New permissions for {} on {}: read {}, write {}, owner {}. " ,
& key . key_id , & query . bucket , allow_read , allow_write , allow_owner
2021-12-14 12:55:11 +00:00
) ) )
}
async fn handle_bucket_website ( & self , query : & WebsiteOpt ) -> Result < AdminRpc , Error > {
2021-12-16 10:47:58 +00:00
let bucket_id = self
2021-12-14 12:55:11 +00:00
. garage
2021-12-16 10:47:58 +00:00
. bucket_helper ( )
. resolve_global_bucket_name ( & query . bucket )
2021-12-14 12:55:11 +00:00
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-14 12:55:11 +00:00
2021-12-16 10:47:58 +00:00
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let bucket_state = bucket . state . as_option_mut ( ) . unwrap ( ) ;
2021-12-14 12:55:11 +00:00
if ! ( query . allow ^ query . deny ) {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-14 12:55:11 +00:00
" You must specify exactly one flag, either --allow or --deny " . to_string ( ) ,
) ) ;
}
2021-12-17 10:53:13 +00:00
let website = if query . allow {
2022-01-03 14:06:19 +00:00
Some ( WebsiteConfig {
2021-12-22 17:50:08 +00:00
index_document : " index.html " . into ( ) ,
error_document : None ,
} )
2021-12-17 10:53:13 +00:00
} else {
None
} ;
bucket_state . website_config . update ( website ) ;
2021-12-16 10:47:58 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2021-12-14 12:55:11 +00:00
let msg = if query . allow {
format! ( " Website access allowed for {} " , & query . bucket )
} else {
format! ( " Website access denied for {} " , & query . bucket )
} ;
Ok ( AdminRpc ::Ok ( msg ) )
2020-04-19 15:15:48 +00:00
}
2020-04-19 20:36:36 +00:00
2021-10-14 09:50:12 +00:00
async fn handle_key_cmd ( & self , cmd : & KeyOperation ) -> Result < AdminRpc , Error > {
2020-04-23 20:25:45 +00:00
match cmd {
2021-12-14 12:55:11 +00:00
KeyOperation ::List = > self . handle_list_keys ( ) . await ,
2021-12-16 15:17:51 +00:00
KeyOperation ::Info ( query ) = > self . handle_key_info ( query ) . await ,
2021-12-14 12:55:11 +00:00
KeyOperation ::New ( query ) = > self . handle_create_key ( query ) . await ,
KeyOperation ::Rename ( query ) = > self . handle_rename_key ( query ) . await ,
KeyOperation ::Delete ( query ) = > self . handle_delete_key ( query ) . await ,
KeyOperation ::Import ( query ) = > self . handle_import_key ( query ) . await ,
}
}
async fn handle_list_keys ( & self ) -> Result < AdminRpc , Error > {
let key_ids = self
. garage
. key_table
. get_range (
& EmptyKey ,
None ,
Some ( KeyFilter ::Deleted ( DeletedFilter ::NotDeleted ) ) ,
10000 ,
)
. await ?
. iter ( )
. map ( | k | ( k . key_id . to_string ( ) , k . name . get ( ) . clone ( ) ) )
. collect ::< Vec < _ > > ( ) ;
Ok ( AdminRpc ::KeyList ( key_ids ) )
}
2021-12-16 15:17:51 +00:00
async fn handle_key_info ( & self , query : & KeyOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let key = self
. garage
. bucket_helper ( )
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
}
2021-12-14 12:55:11 +00:00
async fn handle_create_key ( & self , query : & KeyNewOpt ) -> Result < AdminRpc , Error > {
let key = Key ::new ( query . name . clone ( ) ) ;
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
2021-12-14 12:55:11 +00:00
}
async fn handle_rename_key ( & self , query : & KeyRenameOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let mut key = self
. garage
. bucket_helper ( )
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2021-12-14 12:55:11 +00:00
key . name . update ( query . new_name . clone ( ) ) ;
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
2021-12-14 12:55:11 +00:00
}
async fn handle_delete_key ( & self , query : & KeyDeleteOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
let mut key = helper . get_existing_matching_key ( & query . key_pattern ) . await ? ;
2021-12-14 12:55:11 +00:00
if ! query . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-14 12:55:11 +00:00
" Add --yes flag to really perform this operation " . to_string ( ) ,
) ) ;
}
2022-01-03 16:22:40 +00:00
2021-12-14 12:55:11 +00:00
let state = key . state . as_option_mut ( ) . unwrap ( ) ;
// --- done checking, now commit ---
2022-01-03 17:32:15 +00:00
// (the step at unset_local_bucket_alias will fail if a bucket
// does not have another alias, the deletion will be
// interrupted in the middle if that happens)
2021-12-14 12:55:11 +00:00
// 1. Delete local aliases
for ( alias , _ , to ) in state . local_aliases . items ( ) . iter ( ) {
2022-01-03 17:32:15 +00:00
if let Some ( bucket_id ) = to {
2022-01-03 16:22:40 +00:00
helper
. unset_local_bucket_alias ( * bucket_id , & key . key_id , alias )
. await ? ;
2020-04-23 20:25:45 +00:00
}
2021-12-14 12:55:11 +00:00
}
2022-01-03 17:32:15 +00:00
// 2. Remove permissions on all authorized buckets
2022-01-03 16:22:40 +00:00
for ( ab_id , _auth ) in state . authorized_buckets . items ( ) . iter ( ) {
helper
. set_bucket_key_permissions ( * ab_id , & key . key_id , BucketKeyPerm ::no_permissions ( ) )
. await ? ;
2020-04-23 20:25:45 +00:00
}
2022-01-03 17:32:15 +00:00
2021-12-14 12:55:11 +00:00
// 3. Actually delete key
key . state = Deletable ::delete ( ) ;
self . garage . key_table . insert ( & key ) . await ? ;
Ok ( AdminRpc ::Ok ( format! (
" Key {} was deleted successfully. " ,
key . key_id
) ) )
2020-04-23 20:25:45 +00:00
}
2021-12-14 12:55:11 +00:00
async fn handle_import_key ( & self , query : & KeyImportOpt ) -> Result < AdminRpc , Error > {
let prev_key = self . garage . key_table . get ( & EmptyKey , & query . key_id ) . await ? ;
if prev_key . is_some ( ) {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! ( " Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry. " , query . key_id ) ) ) ;
2021-12-14 12:55:11 +00:00
}
let imported_key = Key ::import ( & query . key_id , & query . secret_key , & query . name ) ;
self . garage . key_table . insert ( & imported_key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( imported_key ) . await
2020-04-23 20:25:45 +00:00
}
2021-12-16 15:17:51 +00:00
async fn key_info_result ( & self , key : Key ) -> Result < AdminRpc , Error > {
let mut relevant_buckets = HashMap ::new ( ) ;
for ( id , _ ) in key
. state
. as_option ( )
. unwrap ( )
. authorized_buckets
. items ( )
. iter ( )
{
2022-01-03 18:06:04 +00:00
if let Some ( b ) = self . garage . bucket_table . get ( & EmptyKey , id ) . await ? {
2021-12-16 15:17:51 +00:00
relevant_buckets . insert ( * id , b ) ;
}
}
Ok ( AdminRpc ::KeyInfo ( key , relevant_buckets ) )
}
2021-12-16 12:17:09 +00:00
async fn handle_migrate ( self : & Arc < Self > , opt : MigrateOpt ) -> Result < AdminRpc , Error > {
if ! opt . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-16 12:17:09 +00:00
" Please provide the --yes flag to initiate migration operation. " . to_string ( ) ,
) ) ;
}
let m = Migrate {
garage : self . garage . clone ( ) ,
} ;
match opt . what {
MigrateWhat ::Buckets050 = > m . migrate_buckets050 ( ) . await ,
} ? ;
Ok ( AdminRpc ::Ok ( " Migration successfull. " . into ( ) ) )
}
2021-04-23 20:41:24 +00:00
async fn handle_launch_repair ( self : & Arc < Self > , opt : RepairOpt ) -> Result < AdminRpc , Error > {
2020-04-21 16:40:17 +00:00
if ! opt . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-04-23 20:41:24 +00:00
" Please provide the --yes flag to initiate repair operations. " . to_string ( ) ,
) ) ;
2020-04-21 16:40:17 +00:00
}
if opt . all_nodes {
let mut opt_to_send = opt . clone ( ) ;
opt_to_send . all_nodes = false ;
2020-04-19 20:36:36 +00:00
let mut failures = vec! [ ] ;
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
2021-11-09 11:24:04 +00:00
for node in ring . layout . node_ids ( ) . iter ( ) {
2021-10-15 09:05:09 +00:00
let node = ( * node ) . into ( ) ;
let resp = self
2021-10-14 09:50:12 +00:00
. endpoint
2020-04-21 16:40:17 +00:00
. call (
2021-10-14 09:50:12 +00:00
& node ,
& AdminRpc ::LaunchRepair ( opt_to_send . clone ( ) ) ,
PRIO_NORMAL ,
2020-04-21 16:40:17 +00:00
)
2021-10-15 09:05:09 +00:00
. await ;
2021-10-19 14:16:10 +00:00
if ! matches! ( resp , Ok ( Ok ( _ ) ) ) {
2021-10-14 09:50:12 +00:00
failures . push ( node ) ;
2020-04-19 20:36:36 +00:00
}
}
if failures . is_empty ( ) {
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( " Repair launched on all nodes " . to_string ( ) ) )
2020-04-19 20:36:36 +00:00
} else {
2022-01-03 12:58:05 +00:00
Err ( Error ::BadRequest ( format! (
2020-04-19 20:36:36 +00:00
" Could not launch repair on nodes: {:?} (launched successfully on other nodes) " ,
failures
) ) )
}
} else {
2020-04-23 18:36:12 +00:00
let repair = Repair {
garage : self . garage . clone ( ) ,
} ;
2020-04-19 21:27:08 +00:00
self . garage
. system
. background
2020-04-19 21:33:38 +00:00
. spawn_worker ( " Repair worker " . into ( ) , move | must_exit | async move {
2020-04-23 18:36:12 +00:00
repair . repair_worker ( opt , must_exit ) . await
2021-03-11 12:47:21 +00:00
} ) ;
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( format! (
2020-04-19 20:36:36 +00:00
" Repair launched on {:?} " ,
self . garage . system . id
) ) )
}
}
2021-03-12 14:40:54 +00:00
2021-04-23 20:41:24 +00:00
async fn handle_stats ( & self , opt : StatsOpt ) -> Result < AdminRpc , Error > {
2021-03-12 14:40:54 +00:00
if opt . all_nodes {
let mut ret = String ::new ( ) ;
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
2021-11-09 11:24:04 +00:00
for node in ring . layout . node_ids ( ) . iter ( ) {
2021-03-12 14:40:54 +00:00
let mut opt = opt . clone ( ) ;
opt . all_nodes = false ;
writeln! ( & mut ret , " \n ====================== " ) . unwrap ( ) ;
writeln! ( & mut ret , " Stats for node {:?}: " , node ) . unwrap ( ) ;
2021-10-15 09:05:09 +00:00
let node_id = ( * node ) . into ( ) ;
2021-03-12 14:40:54 +00:00
match self
2021-10-14 09:50:12 +00:00
. endpoint
2021-10-15 09:05:09 +00:00
. call ( & node_id , & AdminRpc ::Stats ( opt ) , PRIO_NORMAL )
. await ?
2021-03-12 14:40:54 +00:00
{
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( s ) ) = > writeln! ( & mut ret , " {} " , s ) . unwrap ( ) ,
2021-03-12 14:40:54 +00:00
Ok ( x ) = > writeln! ( & mut ret , " Bad answer: {:?} " , x ) . unwrap ( ) ,
Err ( e ) = > writeln! ( & mut ret , " Error: {} " , e ) . unwrap ( ) ,
}
}
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( ret ) )
2021-03-12 14:40:54 +00:00
} else {
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( self . gather_stats_local ( opt ) ) )
2021-03-12 14:40:54 +00:00
}
}
2021-04-23 20:41:24 +00:00
fn gather_stats_local ( & self , opt : StatsOpt ) -> String {
2021-03-12 14:40:54 +00:00
let mut ret = String ::new ( ) ;
2021-03-12 17:16:03 +00:00
writeln! (
& mut ret ,
" \n Garage version: {} " ,
2021-10-04 16:27:57 +00:00
option_env! ( " GIT_VERSION " ) . unwrap_or ( git_version ::git_version! (
2021-10-11 12:26:54 +00:00
prefix = " git: " ,
cargo_prefix = " cargo: " ,
fallback = " unknown "
2021-10-04 16:27:57 +00:00
) )
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2021-03-12 14:40:54 +00:00
// Gather ring statistics
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
let mut ring_nodes = HashMap ::new ( ) ;
2021-05-28 10:36:22 +00:00
for ( _i , loc ) in ring . partitions ( ) . iter ( ) {
for n in ring . get_nodes ( loc , ring . replication_factor ) . iter ( ) {
2021-03-12 14:40:54 +00:00
if ! ring_nodes . contains_key ( n ) {
ring_nodes . insert ( * n , 0 usize ) ;
}
* ring_nodes . get_mut ( n ) . unwrap ( ) + = 1 ;
}
}
writeln! ( & mut ret , " \n Ring nodes & partition count: " ) . unwrap ( ) ;
for ( n , c ) in ring_nodes . iter ( ) {
writeln! ( & mut ret , " {:?} {} " , n , c ) . unwrap ( ) ;
}
2021-04-23 20:41:24 +00:00
self . gather_table_stats ( & mut ret , & self . garage . bucket_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . key_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . object_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . version_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . block_ref_table , & opt ) ;
2021-03-12 14:40:54 +00:00
writeln! ( & mut ret , " \n Block manager stats: " ) . unwrap ( ) ;
2021-03-16 15:35:10 +00:00
if opt . detailed {
writeln! (
& mut ret ,
2021-10-28 12:32:55 +00:00
" number of RC entries (~= number of blocks): {} " ,
2021-03-16 15:35:10 +00:00
self . garage . block_manager . rc_len ( )
)
. unwrap ( ) ;
}
2021-03-12 17:16:03 +00:00
writeln! (
& mut ret ,
" resync queue length: {} " ,
2021-03-15 18:51:16 +00:00
self . garage . block_manager . resync_queue_len ( )
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2021-03-12 14:40:54 +00:00
2021-04-23 20:41:24 +00:00
ret
2021-03-12 14:40:54 +00:00
}
2021-04-23 20:41:24 +00:00
fn gather_table_stats < F , R > ( & self , to : & mut String , t : & Arc < Table < F , R > > , opt : & StatsOpt )
2021-03-16 10:43:58 +00:00
where
F : TableSchema + 'static ,
R : TableReplication + 'static ,
{
2021-12-14 11:34:01 +00:00
writeln! ( to , " \n Table stats for {} " , F ::TABLE_NAME ) . unwrap ( ) ;
2021-03-16 15:35:10 +00:00
if opt . detailed {
writeln! ( to , " number of items: {} " , t . data . store . len ( ) ) . unwrap ( ) ;
writeln! (
to ,
" Merkle tree size: {} " ,
t . merkle_updater . merkle_tree_len ( )
)
. unwrap ( ) ;
}
2021-03-12 17:16:03 +00:00
writeln! (
to ,
" Merkle updater todo queue length: {} " ,
2021-03-16 10:43:58 +00:00
t . merkle_updater . todo_len ( )
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2021-03-15 22:14:12 +00:00
writeln! ( to , " GC todo queue length: {} " , t . data . gc_todo_len ( ) ) . unwrap ( ) ;
2021-03-12 14:40:54 +00:00
}
2021-10-15 09:05:09 +00:00
}
2021-10-14 09:50:12 +00:00
2021-10-15 09:05:09 +00:00
#[ async_trait ]
impl EndpointHandler < AdminRpc > for AdminRpcHandler {
async fn handle (
self : & Arc < Self > ,
message : & AdminRpc ,
_from : NodeID ,
) -> Result < AdminRpc , Error > {
match message {
2021-10-14 09:50:12 +00:00
AdminRpc ::BucketOperation ( bo ) = > self . handle_bucket_cmd ( bo ) . await ,
AdminRpc ::KeyOperation ( ko ) = > self . handle_key_cmd ( ko ) . await ,
2021-12-16 12:17:09 +00:00
AdminRpc ::Migrate ( opt ) = > self . handle_migrate ( opt . clone ( ) ) . await ,
2021-10-14 09:50:12 +00:00
AdminRpc ::LaunchRepair ( opt ) = > self . handle_launch_repair ( opt . clone ( ) ) . await ,
AdminRpc ::Stats ( opt ) = > self . handle_stats ( opt . clone ( ) ) . await ,
2022-01-03 12:58:05 +00:00
m = > Err ( GarageError ::unexpected_rpc_message ( m ) . into ( ) ) ,
2021-10-14 09:50:12 +00:00
}
}
}