2021-03-12 14:40:54 +00:00
use std ::collections ::HashMap ;
2021-03-12 17:16:03 +00:00
use std ::fmt ::Write ;
use std ::sync ::Arc ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
use async_trait ::async_trait ;
2020-04-19 15:15:48 +00:00
use serde ::{ Deserialize , Serialize } ;
2021-12-17 10:53:13 +00:00
use serde_bytes ::ByteBuf ;
2020-04-19 15:15:48 +00:00
2021-12-14 12:55:11 +00:00
use garage_util ::crdt ::* ;
use garage_util ::data ::* ;
use garage_util ::error ::* ;
use garage_util ::time ::* ;
2020-04-23 17:05:46 +00:00
2021-03-12 14:40:54 +00:00
use garage_table ::replication ::* ;
2021-03-12 17:16:03 +00:00
use garage_table ::* ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
use garage_rpc ::* ;
2020-04-23 17:05:46 +00:00
2021-12-14 12:55:11 +00:00
use garage_model ::bucket_alias_table ::* ;
2020-07-07 11:59:22 +00:00
use garage_model ::bucket_table ::* ;
use garage_model ::garage ::Garage ;
use garage_model ::key_table ::* ;
2021-12-16 12:17:09 +00:00
use garage_model ::migrate ::Migrate ;
2021-12-14 12:55:11 +00:00
use garage_model ::permission ::* ;
2020-04-23 17:05:46 +00:00
2021-03-12 17:12:31 +00:00
use crate ::cli ::* ;
2021-03-12 17:16:03 +00:00
use crate ::repair ::Repair ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
/// Path under which the admin RPC endpoint is registered on the node's netapp
/// instance (see `AdminRpcHandler::new`).
pub const ADMIN_RPC_PATH : & str = " garage/admin_rpc.rs/Rpc " ;
2020-04-19 15:15:48 +00:00
2021-12-17 10:53:13 +00:00
// Format string for the "invalid bucket name" error; it takes the offending name as its
// single `{}` argument. It is a macro rather than a constant because it is passed as the
// first argument of `format!`, which only accepts a string literal.
macro_rules ! INVALID_BUCKET_NAME_MESSAGE { ( ) = > { " Invalid bucket name: {}. See AWS documentation for constraints on S3 bucket names: \n https://docs.aws.amazon.com/AmazonS3/latest/userguide/bucketnamingrules.html " } ; }
2020-04-19 15:15:48 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
2021-04-23 20:41:24 +00:00
pub enum AdminRpc {
2020-04-19 15:15:48 +00:00
BucketOperation ( BucketOperation ) ,
2020-04-23 18:36:12 +00:00
KeyOperation ( KeyOperation ) ,
2020-04-21 16:40:17 +00:00
LaunchRepair ( RepairOpt ) ,
2021-12-16 12:17:09 +00:00
Migrate ( MigrateOpt ) ,
2021-03-12 14:40:54 +00:00
Stats ( StatsOpt ) ,
2020-04-19 15:15:48 +00:00
// Replies
2020-04-19 17:59:59 +00:00
Ok ( String ) ,
2021-12-14 12:55:11 +00:00
BucketList ( Vec < BucketAlias > ) ,
2021-12-16 15:17:51 +00:00
BucketInfo ( Bucket , HashMap < String , Key > ) ,
2020-04-23 20:25:45 +00:00
KeyList ( Vec < ( String , String ) > ) ,
2021-12-16 15:17:51 +00:00
KeyInfo ( Key , HashMap < Uuid , Bucket > ) ,
2020-04-19 15:15:48 +00:00
}
2021-10-15 09:05:09 +00:00
impl Rpc for AdminRpc {
type Response = Result < AdminRpc , Error > ;
2021-10-14 09:50:12 +00:00
}
2020-04-19 15:15:48 +00:00
pub struct AdminRpcHandler {
garage : Arc < Garage > ,
2021-10-14 09:50:12 +00:00
endpoint : Arc < Endpoint < AdminRpc , Self > > ,
2020-04-19 15:15:48 +00:00
}
impl AdminRpcHandler {
pub fn new ( garage : Arc < Garage > ) -> Arc < Self > {
2021-10-14 09:50:12 +00:00
let endpoint = garage . system . netapp . endpoint ( ADMIN_RPC_PATH . into ( ) ) ;
let admin = Arc ::new ( Self { garage , endpoint } ) ;
admin . endpoint . set_handler ( admin . clone ( ) ) ;
admin
2020-04-19 15:15:48 +00:00
}
2021-10-14 09:50:12 +00:00
async fn handle_bucket_cmd ( & self , cmd : & BucketOperation ) -> Result < AdminRpc , Error > {
2020-04-19 15:15:48 +00:00
match cmd {
2021-12-14 12:55:11 +00:00
BucketOperation ::List = > self . handle_list_buckets ( ) . await ,
2021-12-16 15:17:51 +00:00
BucketOperation ::Info ( query ) = > self . handle_bucket_info ( query ) . await ,
2021-12-14 12:55:11 +00:00
BucketOperation ::Create ( query ) = > self . handle_create_bucket ( & query . name ) . await ,
BucketOperation ::Delete ( query ) = > self . handle_delete_bucket ( query ) . await ,
2021-12-15 17:36:15 +00:00
BucketOperation ::Alias ( query ) = > self . handle_alias_bucket ( query ) . await ,
BucketOperation ::Unalias ( query ) = > self . handle_unalias_bucket ( query ) . await ,
2021-12-14 12:55:11 +00:00
BucketOperation ::Allow ( query ) = > self . handle_bucket_allow ( query ) . await ,
BucketOperation ::Deny ( query ) = > self . handle_bucket_deny ( query ) . await ,
BucketOperation ::Website ( query ) = > self . handle_bucket_website ( query ) . await ,
}
}
async fn handle_list_buckets ( & self ) -> Result < AdminRpc , Error > {
let bucket_aliases = self
. garage
. bucket_alias_table
. get_range ( & EmptyKey , None , Some ( DeletedFilter ::NotDeleted ) , 10000 )
. await ? ;
Ok ( AdminRpc ::BucketList ( bucket_aliases ) )
}
2020-12-15 11:48:24 +00:00
2021-12-16 15:17:51 +00:00
async fn handle_bucket_info ( & self , query : & BucketOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . name )
. await ?
. ok_or_message ( " Bucket not found " ) ? ;
let bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let mut relevant_keys = HashMap ::new ( ) ;
for ( k , _ ) in bucket
. state
. as_option ( )
. unwrap ( )
. authorized_keys
. items ( )
. iter ( )
{
if let Some ( key ) = self . garage . key_table . get ( & EmptyKey , k ) . await ? {
relevant_keys . insert ( k . clone ( ) , key ) ;
}
}
for ( ( k , _ ) , _ , _ ) in bucket
. state
. as_option ( )
. unwrap ( )
. local_aliases
. items ( )
. iter ( )
{
if relevant_keys . contains_key ( k ) {
continue ;
}
if let Some ( key ) = self . garage . key_table . get ( & EmptyKey , k ) . await ? {
relevant_keys . insert ( k . clone ( ) , key ) ;
}
}
Ok ( AdminRpc ::BucketInfo ( bucket , relevant_keys ) )
}
2021-12-14 12:55:11 +00:00
#[ allow(clippy::ptr_arg) ]
async fn handle_create_bucket ( & self , name : & String ) -> Result < AdminRpc , Error > {
let mut bucket = Bucket ::new ( ) ;
let alias = match self . garage . bucket_alias_table . get ( & EmptyKey , name ) . await ? {
Some ( mut alias ) = > {
if ! alias . state . get ( ) . is_deleted ( ) {
return Err ( Error ::BadRpc ( format! ( " Bucket {} already exists " , name ) ) ) ;
2020-12-12 16:00:31 +00:00
}
2021-12-14 12:55:11 +00:00
alias . state . update ( Deletable ::Present ( AliasParams {
bucket_id : bucket . id ,
} ) ) ;
alias
}
2021-12-17 10:53:13 +00:00
None = > BucketAlias ::new ( name . clone ( ) , bucket . id )
. ok_or_message ( format! ( INVALID_BUCKET_NAME_MESSAGE ! ( ) , name ) ) ? ,
2021-12-14 12:55:11 +00:00
} ;
2021-12-17 10:53:13 +00:00
bucket . state . as_option_mut ( ) . unwrap ( ) . aliases . merge_raw (
name ,
alias . state . timestamp ( ) ,
& true ,
) ;
2021-12-14 12:55:11 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
self . garage . bucket_alias_table . insert ( & alias ) . await ? ;
Ok ( AdminRpc ::Ok ( format! ( " Bucket {} was created. " , name ) ) )
}
2020-12-15 11:48:24 +00:00
2021-12-14 12:55:11 +00:00
async fn handle_delete_bucket ( & self , query : & DeleteBucketOpt ) -> Result < AdminRpc , Error > {
let mut bucket_alias = self
. garage
. bucket_alias_table
. get ( & EmptyKey , & query . name )
. await ?
. filter ( | a | ! a . is_deleted ( ) )
. ok_or_message ( format! ( " Bucket {} does not exist " , query . name ) ) ? ;
2020-12-12 16:00:31 +00:00
2021-12-14 12:55:11 +00:00
let bucket_id = bucket_alias . state . get ( ) . as_option ( ) . unwrap ( ) . bucket_id ;
// Check bucket doesn't have other aliases
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let bucket_state = bucket . state . as_option ( ) . unwrap ( ) ;
if bucket_state
. aliases
. items ( )
. iter ( )
. filter ( | ( _ , _ , active ) | * active )
. any ( | ( name , _ , _ ) | name ! = & query . name )
{
return Err ( Error ::Message ( format! ( " Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one. " , query . name ) ) ) ;
}
if bucket_state
. local_aliases
. items ( )
. iter ( )
. any ( | ( _ , _ , active ) | * active )
{
return Err ( Error ::Message ( format! ( " Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one. " , query . name ) ) ) ;
}
// Check bucket is empty
let objects = self
. garage
. object_table
. get_range ( & bucket_id , None , Some ( DeletedFilter ::NotDeleted ) , 10 )
. await ? ;
if ! objects . is_empty ( ) {
return Err ( Error ::BadRpc ( format! ( " Bucket {} is not empty " , query . name ) ) ) ;
}
if ! query . yes {
return Err ( Error ::BadRpc (
" Add --yes flag to really perform this operation " . to_string ( ) ,
) ) ;
}
// --- done checking, now commit ---
// 1. delete authorization from keys that had access
for ( key_id , _ ) in bucket . authorized_keys ( ) {
if let Some ( key ) = self . garage . key_table . get ( & EmptyKey , key_id ) . await ? {
if ! key . state . is_deleted ( ) {
2021-12-16 10:47:58 +00:00
self . update_key_bucket ( & key , bucket . id , false , false , false )
2021-12-14 12:55:11 +00:00
. await ? ;
2020-12-15 11:48:24 +00:00
}
2021-12-14 12:55:11 +00:00
} else {
return Err ( Error ::Message ( format! ( " Key not found: {} " , key_id ) ) ) ;
2020-12-10 17:13:32 +00:00
}
2020-04-19 15:15:48 +00:00
}
2021-12-14 12:55:11 +00:00
// 2. delete bucket alias
bucket_alias . state . update ( Deletable ::Deleted ) ;
self . garage . bucket_alias_table . insert ( & bucket_alias ) . await ? ;
2021-12-17 10:53:13 +00:00
// 3. delete bucket
2021-12-14 12:55:11 +00:00
bucket . state = Deletable ::delete ( ) ;
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! ( " Bucket {} was deleted. " , query . name ) ) )
}
2021-12-15 17:36:15 +00:00
async fn handle_alias_bucket ( & self , query : & AliasBucketOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . existing_bucket )
. await ?
. ok_or_message ( " Bucket not found " ) ? ;
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
if let Some ( key_local ) = & query . local {
let mut key = self . get_existing_key ( key_local ) . await ? ;
let mut key_param = key . state . as_option_mut ( ) . unwrap ( ) ;
if let Some ( Deletable ::Present ( existing_alias ) ) =
key_param . local_aliases . get ( & query . new_name )
{
if * existing_alias = = bucket_id {
return Ok ( AdminRpc ::Ok ( format! (
" Alias {} already points to bucket {:?} in namespace of key {} " ,
query . new_name , bucket_id , key . key_id
) ) ) ;
} else {
return Err ( Error ::Message ( format! ( " Alias {} already exists and points to different bucket: {:?} in namespace of key {} " , query . new_name , existing_alias , key . key_id ) ) ) ;
}
}
2021-12-17 10:53:13 +00:00
if ! is_valid_bucket_name ( & query . new_name ) {
return Err ( Error ::Message ( format! (
INVALID_BUCKET_NAME_MESSAGE ! ( ) ,
query . new_name
) ) ) ;
}
2021-12-15 17:36:15 +00:00
2021-12-17 10:53:13 +00:00
// Checks ok, add alias
2021-12-15 17:36:15 +00:00
let mut bucket_p = bucket . state . as_option_mut ( ) . unwrap ( ) ;
2021-12-17 10:53:13 +00:00
let bucket_p_local_alias_key = ( key . key_id . clone ( ) , query . new_name . clone ( ) ) ;
// Calculate the timestamp to assign to this aliasing in the two local_aliases maps
// (the one from key to bucket, and the reverse one stored in the bucket iself)
// so that merges on both maps in case of a concurrent operation resolve
// to the same alias being set
let alias_ts = increment_logical_clock_2 (
key_param . local_aliases . get_timestamp ( & query . new_name ) ,
bucket_p
. local_aliases
. get_timestamp ( & bucket_p_local_alias_key ) ,
) ;
key_param . local_aliases = LwwMap ::raw_item (
query . new_name . clone ( ) ,
alias_ts ,
Deletable ::present ( bucket_id ) ,
) ;
self . garage . key_table . insert ( & key ) . await ? ;
bucket_p . local_aliases = LwwMap ::raw_item ( bucket_p_local_alias_key , alias_ts , true ) ;
2021-12-15 17:36:15 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! (
" Alias {} created to bucket {:?} in namespace of key {} " ,
query . new_name , bucket_id , key . key_id
) ) )
} else {
2021-12-17 10:53:13 +00:00
let alias = self
2021-12-15 17:36:15 +00:00
. garage
. bucket_alias_table
. get ( & EmptyKey , & query . new_name )
2021-12-17 10:53:13 +00:00
. await ? ;
2021-12-15 17:36:15 +00:00
2021-12-17 10:53:13 +00:00
if let Some ( existing_alias ) = alias . as_ref ( ) {
if let Some ( p ) = existing_alias . state . get ( ) . as_option ( ) {
if p . bucket_id = = bucket_id {
return Ok ( AdminRpc ::Ok ( format! (
" Alias {} already points to bucket {:?} " ,
query . new_name , bucket_id
) ) ) ;
} else {
return Err ( Error ::Message ( format! (
" Alias {} already exists and points to different bucket: {:?} " ,
query . new_name , p . bucket_id
) ) ) ;
}
2021-12-15 17:36:15 +00:00
}
}
// Checks ok, add alias
2021-12-17 10:53:13 +00:00
let mut bucket_p = bucket . state . as_option_mut ( ) . unwrap ( ) ;
let alias_ts = increment_logical_clock_2 (
bucket_p . aliases . get_timestamp ( & query . new_name ) ,
alias . as_ref ( ) . map ( | a | a . state . timestamp ( ) ) . unwrap_or ( 0 ) ,
) ;
let alias = match alias {
None = > BucketAlias ::new ( query . new_name . clone ( ) , bucket_id )
. ok_or_message ( format! ( INVALID_BUCKET_NAME_MESSAGE ! ( ) , query . new_name ) ) ? ,
Some ( mut a ) = > {
a . state = Lww ::raw ( alias_ts , Deletable ::present ( AliasParams { bucket_id } ) ) ;
a
}
} ;
2021-12-15 17:36:15 +00:00
self . garage . bucket_alias_table . insert ( & alias ) . await ? ;
2021-12-17 10:53:13 +00:00
bucket_p . aliases = LwwMap ::raw_item ( query . new_name . clone ( ) , alias_ts , true ) ;
2021-12-15 17:36:15 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! (
" Alias {} created to bucket {:?} " ,
query . new_name , bucket_id
) ) )
}
}
async fn handle_unalias_bucket ( & self , query : & UnaliasBucketOpt ) -> Result < AdminRpc , Error > {
if let Some ( key_local ) = & query . local {
let mut key = self . get_existing_key ( key_local ) . await ? ;
let bucket_id = key
. state
. as_option ( )
. unwrap ( )
. local_aliases
. get ( & query . name )
. map ( | a | a . into_option ( ) )
. flatten ( )
. ok_or_message ( " Bucket not found " ) ? ;
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
2021-12-17 10:53:13 +00:00
let mut bucket_p = bucket . state . as_option_mut ( ) . unwrap ( ) ;
2021-12-15 17:36:15 +00:00
2021-12-17 10:53:13 +00:00
let has_other_aliases = bucket_p
2021-12-15 17:36:15 +00:00
. aliases
. items ( )
. iter ( )
. any ( | ( _ , _ , active ) | * active )
2021-12-17 10:53:13 +00:00
| | bucket_p
2021-12-15 17:36:15 +00:00
. local_aliases
. items ( )
. iter ( )
. any ( | ( ( k , n ) , _ , active ) | * k = = key . key_id & & * n = = query . name & & * active ) ;
if ! has_other_aliases {
return Err ( Error ::Message ( format! ( " Bucket {} doesn't have other aliases, please delete it instead of just unaliasing. " , query . name ) ) ) ;
}
2021-12-17 10:53:13 +00:00
// Checks ok, remove alias
2021-12-15 17:36:15 +00:00
let mut key_param = key . state . as_option_mut ( ) . unwrap ( ) ;
2021-12-17 10:53:13 +00:00
let bucket_p_local_alias_key = ( key . key_id . clone ( ) , query . name . clone ( ) ) ;
let alias_ts = increment_logical_clock_2 (
key_param . local_aliases . get_timestamp ( & query . name ) ,
bucket_p
. local_aliases
. get_timestamp ( & bucket_p_local_alias_key ) ,
) ;
key_param . local_aliases =
LwwMap ::raw_item ( query . name . clone ( ) , alias_ts , Deletable ::delete ( ) ) ;
2021-12-15 17:36:15 +00:00
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-17 10:53:13 +00:00
bucket_p . local_aliases = LwwMap ::raw_item ( bucket_p_local_alias_key , alias_ts , false ) ;
2021-12-15 17:36:15 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! (
" Bucket alias {} deleted from namespace of key {} " ,
query . name , key . key_id
) ) )
} else {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . name )
. await ?
. ok_or_message ( " Bucket not found " ) ? ;
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let mut bucket_state = bucket . state . as_option_mut ( ) . unwrap ( ) ;
let has_other_aliases = bucket_state
. aliases
. items ( )
. iter ( )
. any ( | ( name , _ , active ) | * name ! = query . name & & * active )
| | bucket_state
. local_aliases
. items ( )
. iter ( )
. any ( | ( _ , _ , active ) | * active ) ;
if ! has_other_aliases {
return Err ( Error ::Message ( format! ( " Bucket {} doesn't have other aliases, please delete it instead of just unaliasing. " , query . name ) ) ) ;
}
let mut alias = self
. garage
. bucket_alias_table
. get ( & EmptyKey , & query . name )
. await ?
. ok_or_message ( " Internal error: alias not found " ) ? ;
2021-12-17 10:53:13 +00:00
// Checks ok, remove alias
let alias_ts = increment_logical_clock_2 (
alias . state . timestamp ( ) ,
bucket_state . aliases . get_timestamp ( & query . name ) ,
) ;
alias . state = Lww ::raw ( alias_ts , Deletable ::delete ( ) ) ;
2021-12-15 17:36:15 +00:00
self . garage . bucket_alias_table . insert ( & alias ) . await ? ;
2021-12-17 10:53:13 +00:00
bucket_state . aliases = LwwMap ::raw_item ( query . name . clone ( ) , alias_ts , false ) ;
2021-12-15 17:36:15 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! ( " Bucket alias {} deleted " , query . name ) ) )
}
}
2021-12-14 12:55:11 +00:00
async fn handle_bucket_allow ( & self , query : & PermBucketOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . bucket )
. await ?
. ok_or_message ( " Bucket not found " ) ? ;
let bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let key = self . get_existing_key ( & query . key_pattern ) . await ? ;
let allow_read = query . read | | key . allow_read ( & bucket_id ) ;
let allow_write = query . write | | key . allow_write ( & bucket_id ) ;
2021-12-16 10:47:58 +00:00
let allow_owner = query . owner | | key . allow_owner ( & bucket_id ) ;
2021-12-14 12:55:11 +00:00
let new_perm = self
2021-12-16 10:47:58 +00:00
. update_key_bucket ( & key , bucket_id , allow_read , allow_write , allow_owner )
2021-12-14 12:55:11 +00:00
. await ? ;
self . update_bucket_key ( bucket , & key . key_id , new_perm )
. await ? ;
Ok ( AdminRpc ::Ok ( format! (
2021-12-16 10:47:58 +00:00
" New permissions for {} on {}: read {}, write {}, owner {}. " ,
& key . key_id , & query . bucket , allow_read , allow_write , allow_owner
2021-12-14 12:55:11 +00:00
) ) )
}
async fn handle_bucket_deny ( & self , query : & PermBucketOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . bucket )
. await ?
. ok_or_message ( " Bucket not found " ) ? ;
let bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let key = self . get_existing_key ( & query . key_pattern ) . await ? ;
let allow_read = ! query . read & & key . allow_read ( & bucket_id ) ;
let allow_write = ! query . write & & key . allow_write ( & bucket_id ) ;
2021-12-16 10:47:58 +00:00
let allow_owner = ! query . owner & & key . allow_owner ( & bucket_id ) ;
2021-12-14 12:55:11 +00:00
let new_perm = self
2021-12-16 10:47:58 +00:00
. update_key_bucket ( & key , bucket_id , allow_read , allow_write , allow_owner )
2021-12-14 12:55:11 +00:00
. await ? ;
self . update_bucket_key ( bucket , & key . key_id , new_perm )
. await ? ;
Ok ( AdminRpc ::Ok ( format! (
2021-12-16 10:47:58 +00:00
" New permissions for {} on {}: read {}, write {}, owner {}. " ,
& key . key_id , & query . bucket , allow_read , allow_write , allow_owner
2021-12-14 12:55:11 +00:00
) ) )
}
async fn handle_bucket_website ( & self , query : & WebsiteOpt ) -> Result < AdminRpc , Error > {
2021-12-16 10:47:58 +00:00
let bucket_id = self
2021-12-14 12:55:11 +00:00
. garage
2021-12-16 10:47:58 +00:00
. bucket_helper ( )
. resolve_global_bucket_name ( & query . bucket )
2021-12-14 12:55:11 +00:00
. await ?
2021-12-16 10:47:58 +00:00
. ok_or_message ( " Bucket not found " ) ? ;
2021-12-14 12:55:11 +00:00
2021-12-16 10:47:58 +00:00
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let bucket_state = bucket . state . as_option_mut ( ) . unwrap ( ) ;
2021-12-14 12:55:11 +00:00
if ! ( query . allow ^ query . deny ) {
return Err ( Error ::Message (
" You must specify exactly one flag, either --allow or --deny " . to_string ( ) ,
) ) ;
}
2021-12-17 10:53:13 +00:00
let website = if query . allow {
Some ( ByteBuf ::from ( DEFAULT_WEBSITE_CONFIGURATION . to_vec ( ) ) )
} else {
None
} ;
bucket_state . website_config . update ( website ) ;
2021-12-16 10:47:58 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2021-12-14 12:55:11 +00:00
let msg = if query . allow {
format! ( " Website access allowed for {} " , & query . bucket )
} else {
format! ( " Website access denied for {} " , & query . bucket )
} ;
Ok ( AdminRpc ::Ok ( msg ) )
2020-04-19 15:15:48 +00:00
}
2020-04-19 20:36:36 +00:00
2021-10-14 09:50:12 +00:00
async fn handle_key_cmd ( & self , cmd : & KeyOperation ) -> Result < AdminRpc , Error > {
2020-04-23 20:25:45 +00:00
match cmd {
2021-12-14 12:55:11 +00:00
KeyOperation ::List = > self . handle_list_keys ( ) . await ,
2021-12-16 15:17:51 +00:00
KeyOperation ::Info ( query ) = > self . handle_key_info ( query ) . await ,
2021-12-14 12:55:11 +00:00
KeyOperation ::New ( query ) = > self . handle_create_key ( query ) . await ,
KeyOperation ::Rename ( query ) = > self . handle_rename_key ( query ) . await ,
KeyOperation ::Delete ( query ) = > self . handle_delete_key ( query ) . await ,
KeyOperation ::Import ( query ) = > self . handle_import_key ( query ) . await ,
}
}
async fn handle_list_keys ( & self ) -> Result < AdminRpc , Error > {
let key_ids = self
. garage
. key_table
. get_range (
& EmptyKey ,
None ,
Some ( KeyFilter ::Deleted ( DeletedFilter ::NotDeleted ) ) ,
10000 ,
)
. await ?
. iter ( )
. map ( | k | ( k . key_id . to_string ( ) , k . name . get ( ) . clone ( ) ) )
. collect ::< Vec < _ > > ( ) ;
Ok ( AdminRpc ::KeyList ( key_ids ) )
}
2021-12-16 15:17:51 +00:00
async fn handle_key_info ( & self , query : & KeyOpt ) -> Result < AdminRpc , Error > {
let key = self . get_existing_key ( & query . key_pattern ) . await ? ;
self . key_info_result ( key ) . await
}
2021-12-14 12:55:11 +00:00
async fn handle_create_key ( & self , query : & KeyNewOpt ) -> Result < AdminRpc , Error > {
let key = Key ::new ( query . name . clone ( ) ) ;
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
2021-12-14 12:55:11 +00:00
}
async fn handle_rename_key ( & self , query : & KeyRenameOpt ) -> Result < AdminRpc , Error > {
let mut key = self . get_existing_key ( & query . key_pattern ) . await ? ;
key . name . update ( query . new_name . clone ( ) ) ;
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
2021-12-14 12:55:11 +00:00
}
async fn handle_delete_key ( & self , query : & KeyDeleteOpt ) -> Result < AdminRpc , Error > {
let mut key = self . get_existing_key ( & query . key_pattern ) . await ? ;
if ! query . yes {
return Err ( Error ::BadRpc (
" Add --yes flag to really perform this operation " . to_string ( ) ,
) ) ;
}
let state = key . state . as_option_mut ( ) . unwrap ( ) ;
// --- done checking, now commit ---
// 1. Delete local aliases
for ( alias , _ , to ) in state . local_aliases . items ( ) . iter ( ) {
if let Deletable ::Present ( bucket_id ) = to {
if let Some ( mut bucket ) = self . garage . bucket_table . get ( bucket_id , & EmptyKey ) . await ?
{
if let Deletable ::Present ( bucket_state ) = & mut bucket . state {
bucket_state . local_aliases = bucket_state
. local_aliases
. update_mutator ( ( key . key_id . to_string ( ) , alias . to_string ( ) ) , false ) ;
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2020-04-23 20:25:45 +00:00
}
2021-12-14 12:55:11 +00:00
} else {
// ignore
2020-04-23 20:25:45 +00:00
}
}
2021-12-14 12:55:11 +00:00
}
// 2. Delete authorized buckets
for ( ab_id , auth ) in state . authorized_buckets . items ( ) . iter ( ) {
if let Some ( bucket ) = self . garage . bucket_table . get ( ab_id , & EmptyKey ) . await ? {
let new_perm = BucketKeyPerm {
timestamp : increment_logical_clock ( auth . timestamp ) ,
allow_read : false ,
allow_write : false ,
2021-12-16 10:47:58 +00:00
allow_owner : false ,
2021-12-14 12:55:11 +00:00
} ;
if ! bucket . is_deleted ( ) {
self . update_bucket_key ( bucket , & key . key_id , new_perm )
. await ? ;
2021-03-18 18:24:59 +00:00
}
2021-12-14 12:55:11 +00:00
} else {
// ignore
2021-03-18 18:24:59 +00:00
}
2020-04-23 20:25:45 +00:00
}
2021-12-14 12:55:11 +00:00
// 3. Actually delete key
key . state = Deletable ::delete ( ) ;
self . garage . key_table . insert ( & key ) . await ? ;
Ok ( AdminRpc ::Ok ( format! (
" Key {} was deleted successfully. " ,
key . key_id
) ) )
2020-04-23 20:25:45 +00:00
}
2021-12-14 12:55:11 +00:00
async fn handle_import_key ( & self , query : & KeyImportOpt ) -> Result < AdminRpc , Error > {
let prev_key = self . garage . key_table . get ( & EmptyKey , & query . key_id ) . await ? ;
if prev_key . is_some ( ) {
return Err ( Error ::Message ( format! ( " Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry. " , query . key_id ) ) ) ;
}
let imported_key = Key ::import ( & query . key_id , & query . secret_key , & query . name ) ;
self . garage . key_table . insert ( & imported_key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( imported_key ) . await
2020-04-23 20:25:45 +00:00
}
2021-03-15 18:14:26 +00:00
async fn get_existing_key ( & self , pattern : & str ) -> Result < Key , Error > {
2021-03-15 22:14:12 +00:00
let candidates = self
. garage
2020-04-23 20:25:45 +00:00
. key_table
2021-03-15 22:14:12 +00:00
. get_range (
& EmptyKey ,
None ,
Some ( KeyFilter ::Matches ( pattern . to_string ( ) ) ) ,
10 ,
)
2020-04-23 20:25:45 +00:00
. await ?
2021-03-15 18:14:26 +00:00
. into_iter ( )
2021-12-14 12:55:11 +00:00
. filter ( | k | ! k . state . is_deleted ( ) )
2021-03-15 18:14:26 +00:00
. collect ::< Vec < _ > > ( ) ;
if candidates . len ( ) ! = 1 {
2021-03-15 22:14:12 +00:00
Err ( Error ::Message ( format! (
" {} matching keys " ,
candidates . len ( )
) ) )
2021-03-15 18:14:26 +00:00
} else {
Ok ( candidates . into_iter ( ) . next ( ) . unwrap ( ) )
}
2020-04-23 20:25:45 +00:00
}
2021-12-16 15:17:51 +00:00
async fn key_info_result ( & self , key : Key ) -> Result < AdminRpc , Error > {
let mut relevant_buckets = HashMap ::new ( ) ;
for ( id , _ ) in key
. state
. as_option ( )
. unwrap ( )
. authorized_buckets
. items ( )
. iter ( )
{
if let Some ( b ) = self . garage . bucket_table . get ( id , & EmptyKey ) . await ? {
relevant_buckets . insert ( * id , b ) ;
}
}
Ok ( AdminRpc ::KeyInfo ( key , relevant_buckets ) )
}
2020-12-12 16:00:31 +00:00
/// Update **key table** to inform of the new linked bucket
2020-04-23 20:25:45 +00:00
async fn update_key_bucket (
& self ,
2021-03-15 18:14:26 +00:00
key : & Key ,
2021-12-14 12:55:11 +00:00
bucket_id : Uuid ,
2020-04-23 20:25:45 +00:00
allow_read : bool ,
allow_write : bool ,
2021-12-16 10:47:58 +00:00
allow_owner : bool ,
2021-12-14 12:55:11 +00:00
) -> Result < BucketKeyPerm , Error > {
2021-03-15 18:14:26 +00:00
let mut key = key . clone ( ) ;
2021-12-14 12:55:11 +00:00
let mut key_state = key . state . as_option_mut ( ) . unwrap ( ) ;
let perm = key_state
. authorized_buckets
. get ( & bucket_id )
. cloned ( )
. map ( | old_perm | BucketKeyPerm {
timestamp : increment_logical_clock ( old_perm . timestamp ) ,
2020-11-20 22:01:12 +00:00
allow_read ,
allow_write ,
2021-12-16 10:47:58 +00:00
allow_owner ,
2021-12-14 12:55:11 +00:00
} )
. unwrap_or ( BucketKeyPerm {
timestamp : now_msec ( ) ,
allow_read ,
allow_write ,
2021-12-16 10:47:58 +00:00
allow_owner ,
2021-12-14 12:55:11 +00:00
} ) ;
key_state . authorized_buckets = Map ::put_mutator ( bucket_id , perm ) ;
2020-04-23 20:25:45 +00:00
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-14 12:55:11 +00:00
Ok ( perm )
}
/// Update **bucket table** to inform of the new linked key
async fn update_bucket_key (
& self ,
mut bucket : Bucket ,
key_id : & str ,
new_perm : BucketKeyPerm ,
) -> Result < ( ) , Error > {
bucket . state . as_option_mut ( ) . unwrap ( ) . authorized_keys =
Map ::put_mutator ( key_id . to_string ( ) , new_perm ) ;
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2020-04-23 20:25:45 +00:00
Ok ( ( ) )
2020-04-23 18:36:12 +00:00
}
2021-12-16 12:17:09 +00:00
async fn handle_migrate ( self : & Arc < Self > , opt : MigrateOpt ) -> Result < AdminRpc , Error > {
if ! opt . yes {
return Err ( Error ::BadRpc (
" Please provide the --yes flag to initiate migration operation. " . to_string ( ) ,
) ) ;
}
let m = Migrate {
garage : self . garage . clone ( ) ,
} ;
match opt . what {
MigrateWhat ::Buckets050 = > m . migrate_buckets050 ( ) . await ,
} ? ;
Ok ( AdminRpc ::Ok ( " Migration successfull. " . into ( ) ) )
}
2021-04-23 20:41:24 +00:00
async fn handle_launch_repair ( self : & Arc < Self > , opt : RepairOpt ) -> Result < AdminRpc , Error > {
2020-04-21 16:40:17 +00:00
if ! opt . yes {
2021-05-02 21:13:08 +00:00
return Err ( Error ::BadRpc (
2021-04-23 20:41:24 +00:00
" Please provide the --yes flag to initiate repair operations. " . to_string ( ) ,
) ) ;
2020-04-21 16:40:17 +00:00
}
if opt . all_nodes {
let mut opt_to_send = opt . clone ( ) ;
opt_to_send . all_nodes = false ;
2020-04-19 20:36:36 +00:00
let mut failures = vec! [ ] ;
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
2021-11-09 11:24:04 +00:00
for node in ring . layout . node_ids ( ) . iter ( ) {
2021-10-15 09:05:09 +00:00
let node = ( * node ) . into ( ) ;
let resp = self
2021-10-14 09:50:12 +00:00
. endpoint
2020-04-21 16:40:17 +00:00
. call (
2021-10-14 09:50:12 +00:00
& node ,
& AdminRpc ::LaunchRepair ( opt_to_send . clone ( ) ) ,
PRIO_NORMAL ,
2020-04-21 16:40:17 +00:00
)
2021-10-15 09:05:09 +00:00
. await ;
2021-10-19 14:16:10 +00:00
if ! matches! ( resp , Ok ( Ok ( _ ) ) ) {
2021-10-14 09:50:12 +00:00
failures . push ( node ) ;
2020-04-19 20:36:36 +00:00
}
}
if failures . is_empty ( ) {
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( " Repair launched on all nodes " . to_string ( ) ) )
2020-04-19 20:36:36 +00:00
} else {
Err ( Error ::Message ( format! (
" Could not launch repair on nodes: {:?} (launched successfully on other nodes) " ,
failures
) ) )
}
} else {
2020-04-23 18:36:12 +00:00
let repair = Repair {
garage : self . garage . clone ( ) ,
} ;
2020-04-19 21:27:08 +00:00
self . garage
. system
. background
2020-04-19 21:33:38 +00:00
. spawn_worker ( " Repair worker " . into ( ) , move | must_exit | async move {
2020-04-23 18:36:12 +00:00
repair . repair_worker ( opt , must_exit ) . await
2021-03-11 12:47:21 +00:00
} ) ;
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( format! (
2020-04-19 20:36:36 +00:00
" Repair launched on {:?} " ,
self . garage . system . id
) ) )
}
}
2021-03-12 14:40:54 +00:00
2021-04-23 20:41:24 +00:00
async fn handle_stats ( & self , opt : StatsOpt ) -> Result < AdminRpc , Error > {
2021-03-12 14:40:54 +00:00
if opt . all_nodes {
let mut ret = String ::new ( ) ;
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
2021-11-09 11:24:04 +00:00
for node in ring . layout . node_ids ( ) . iter ( ) {
2021-03-12 14:40:54 +00:00
let mut opt = opt . clone ( ) ;
opt . all_nodes = false ;
writeln! ( & mut ret , " \n ====================== " ) . unwrap ( ) ;
writeln! ( & mut ret , " Stats for node {:?}: " , node ) . unwrap ( ) ;
2021-10-15 09:05:09 +00:00
let node_id = ( * node ) . into ( ) ;
2021-03-12 14:40:54 +00:00
match self
2021-10-14 09:50:12 +00:00
. endpoint
2021-10-15 09:05:09 +00:00
. call ( & node_id , & AdminRpc ::Stats ( opt ) , PRIO_NORMAL )
. await ?
2021-03-12 14:40:54 +00:00
{
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( s ) ) = > writeln! ( & mut ret , " {} " , s ) . unwrap ( ) ,
2021-03-12 14:40:54 +00:00
Ok ( x ) = > writeln! ( & mut ret , " Bad answer: {:?} " , x ) . unwrap ( ) ,
Err ( e ) = > writeln! ( & mut ret , " Error: {} " , e ) . unwrap ( ) ,
}
}
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( ret ) )
2021-03-12 14:40:54 +00:00
} else {
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( self . gather_stats_local ( opt ) ) )
2021-03-12 14:40:54 +00:00
}
}
2021-04-23 20:41:24 +00:00
fn gather_stats_local ( & self , opt : StatsOpt ) -> String {
2021-03-12 14:40:54 +00:00
let mut ret = String ::new ( ) ;
2021-03-12 17:16:03 +00:00
writeln! (
& mut ret ,
" \n Garage version: {} " ,
2021-10-04 16:27:57 +00:00
option_env! ( " GIT_VERSION " ) . unwrap_or ( git_version ::git_version! (
2021-10-11 12:26:54 +00:00
prefix = " git: " ,
cargo_prefix = " cargo: " ,
fallback = " unknown "
2021-10-04 16:27:57 +00:00
) )
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2021-03-12 14:40:54 +00:00
// Gather ring statistics
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
let mut ring_nodes = HashMap ::new ( ) ;
2021-05-28 10:36:22 +00:00
for ( _i , loc ) in ring . partitions ( ) . iter ( ) {
for n in ring . get_nodes ( loc , ring . replication_factor ) . iter ( ) {
2021-03-12 14:40:54 +00:00
if ! ring_nodes . contains_key ( n ) {
ring_nodes . insert ( * n , 0 usize ) ;
}
* ring_nodes . get_mut ( n ) . unwrap ( ) + = 1 ;
}
}
writeln! ( & mut ret , " \n Ring nodes & partition count: " ) . unwrap ( ) ;
for ( n , c ) in ring_nodes . iter ( ) {
writeln! ( & mut ret , " {:?} {} " , n , c ) . unwrap ( ) ;
}
2021-04-23 20:41:24 +00:00
self . gather_table_stats ( & mut ret , & self . garage . bucket_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . key_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . object_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . version_table , & opt ) ;
self . gather_table_stats ( & mut ret , & self . garage . block_ref_table , & opt ) ;
2021-03-12 14:40:54 +00:00
writeln! ( & mut ret , " \n Block manager stats: " ) . unwrap ( ) ;
2021-03-16 15:35:10 +00:00
if opt . detailed {
writeln! (
& mut ret ,
2021-10-28 12:32:55 +00:00
" number of RC entries (~= number of blocks): {} " ,
2021-03-16 15:35:10 +00:00
self . garage . block_manager . rc_len ( )
)
. unwrap ( ) ;
}
2021-03-12 17:16:03 +00:00
writeln! (
& mut ret ,
" resync queue length: {} " ,
2021-03-15 18:51:16 +00:00
self . garage . block_manager . resync_queue_len ( )
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2021-03-12 14:40:54 +00:00
2021-04-23 20:41:24 +00:00
ret
2021-03-12 14:40:54 +00:00
}
2021-04-23 20:41:24 +00:00
fn gather_table_stats < F , R > ( & self , to : & mut String , t : & Arc < Table < F , R > > , opt : & StatsOpt )
2021-03-16 10:43:58 +00:00
where
F : TableSchema + 'static ,
R : TableReplication + 'static ,
{
2021-12-14 11:34:01 +00:00
writeln! ( to , " \n Table stats for {} " , F ::TABLE_NAME ) . unwrap ( ) ;
2021-03-16 15:35:10 +00:00
if opt . detailed {
writeln! ( to , " number of items: {} " , t . data . store . len ( ) ) . unwrap ( ) ;
writeln! (
to ,
" Merkle tree size: {} " ,
t . merkle_updater . merkle_tree_len ( )
)
. unwrap ( ) ;
}
2021-03-12 17:16:03 +00:00
writeln! (
to ,
" Merkle updater todo queue length: {} " ,
2021-03-16 10:43:58 +00:00
t . merkle_updater . todo_len ( )
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2021-03-15 22:14:12 +00:00
writeln! ( to , " GC todo queue length: {} " , t . data . gc_todo_len ( ) ) . unwrap ( ) ;
2021-03-12 14:40:54 +00:00
}
2021-10-15 09:05:09 +00:00
}
2021-10-14 09:50:12 +00:00
2021-10-15 09:05:09 +00:00
#[ async_trait ]
impl EndpointHandler < AdminRpc > for AdminRpcHandler {
async fn handle (
self : & Arc < Self > ,
message : & AdminRpc ,
_from : NodeID ,
) -> Result < AdminRpc , Error > {
match message {
2021-10-14 09:50:12 +00:00
AdminRpc ::BucketOperation ( bo ) = > self . handle_bucket_cmd ( bo ) . await ,
AdminRpc ::KeyOperation ( ko ) = > self . handle_key_cmd ( ko ) . await ,
2021-12-16 12:17:09 +00:00
AdminRpc ::Migrate ( opt ) = > self . handle_migrate ( opt . clone ( ) ) . await ,
2021-10-14 09:50:12 +00:00
AdminRpc ::LaunchRepair ( opt ) = > self . handle_launch_repair ( opt . clone ( ) ) . await ,
AdminRpc ::Stats ( opt ) = > self . handle_stats ( opt . clone ( ) ) . await ,
_ = > Err ( Error ::BadRpc ( " Invalid RPC " . to_string ( ) ) ) ,
}
}
}