2021-03-12 14:40:54 +00:00
use std ::collections ::HashMap ;
2021-03-12 17:16:03 +00:00
use std ::fmt ::Write ;
use std ::sync ::Arc ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
use async_trait ::async_trait ;
2020-04-19 15:15:48 +00:00
use serde ::{ Deserialize , Serialize } ;
2021-12-14 12:55:11 +00:00
use garage_util ::crdt ::* ;
use garage_util ::data ::* ;
2022-01-03 16:22:40 +00:00
use garage_util ::error ::Error as GarageError ;
2022-12-13 14:43:22 +00:00
use garage_util ::formater ::format_table_to_string ;
2021-12-14 12:55:11 +00:00
use garage_util ::time ::* ;
2020-04-23 17:05:46 +00:00
2021-03-12 14:40:54 +00:00
use garage_table ::replication ::* ;
2021-03-12 17:16:03 +00:00
use garage_table ::* ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
use garage_rpc ::* ;
2020-04-23 17:05:46 +00:00
2022-12-13 13:23:45 +00:00
use garage_block ::manager ::BlockResyncErrorInfo ;
2022-09-02 13:34:21 +00:00
use garage_block ::repair ::ScrubWorkerCommand ;
2021-12-14 12:55:11 +00:00
use garage_model ::bucket_alias_table ::* ;
2020-07-07 11:59:22 +00:00
use garage_model ::bucket_table ::* ;
use garage_model ::garage ::Garage ;
2022-01-03 12:58:05 +00:00
use garage_model ::helper ::error ::{ Error , OkOrBadRequest } ;
2020-07-07 11:59:22 +00:00
use garage_model ::key_table ::* ;
2021-12-16 12:17:09 +00:00
use garage_model ::migrate ::Migrate ;
2021-12-14 12:55:11 +00:00
use garage_model ::permission ::* ;
2022-12-13 14:02:42 +00:00
use garage_model ::s3 ::object_table ::* ;
2022-12-13 13:23:45 +00:00
use garage_model ::s3 ::version_table ::Version ;
2020-04-23 17:05:46 +00:00
2021-03-12 17:12:31 +00:00
use crate ::cli ::* ;
2022-07-08 11:30:26 +00:00
use crate ::repair ::online ::launch_online_repair ;
2020-04-19 15:15:48 +00:00
2021-10-14 09:50:12 +00:00
/// Path string under which the admin RPC endpoint is registered on the
/// node's netapp instance (see `AdminRpcHandler::new`).
pub const ADMIN_RPC_PATH: &str = " garage/admin_rpc.rs/Rpc ";
2020-04-19 15:15:48 +00:00
#[ derive(Debug, Serialize, Deserialize) ]
2022-06-15 18:20:28 +00:00
#[ allow(clippy::large_enum_variant) ]
2021-04-23 20:41:24 +00:00
pub enum AdminRpc {
2020-04-19 15:15:48 +00:00
BucketOperation ( BucketOperation ) ,
2020-04-23 18:36:12 +00:00
KeyOperation ( KeyOperation ) ,
2020-04-21 16:40:17 +00:00
LaunchRepair ( RepairOpt ) ,
2021-12-16 12:17:09 +00:00
Migrate ( MigrateOpt ) ,
2021-03-12 14:40:54 +00:00
Stats ( StatsOpt ) ,
2022-12-13 13:23:45 +00:00
Worker ( WorkerOperation ) ,
BlockOperation ( BlockOperation ) ,
2020-04-19 15:15:48 +00:00
// Replies
2020-04-19 17:59:59 +00:00
Ok ( String ) ,
2022-01-03 18:06:04 +00:00
BucketList ( Vec < Bucket > ) ,
2022-06-15 18:20:28 +00:00
BucketInfo {
bucket : Bucket ,
relevant_keys : HashMap < String , Key > ,
counters : HashMap < String , i64 > ,
} ,
2020-04-23 20:25:45 +00:00
KeyList ( Vec < ( String , String ) > ) ,
2021-12-16 15:17:51 +00:00
KeyInfo ( Key , HashMap < Uuid , Bucket > ) ,
2022-07-08 11:30:26 +00:00
WorkerList (
HashMap < usize , garage_util ::background ::WorkerInfo > ,
WorkerListOpt ,
) ,
2022-12-13 11:24:30 +00:00
WorkerInfo ( usize , garage_util ::background ::WorkerInfo ) ,
2022-12-13 13:23:45 +00:00
BlockErrorList ( Vec < BlockResyncErrorInfo > ) ,
BlockInfo {
hash : Hash ,
refcount : u64 ,
versions : Vec < Result < Version , Uuid > > ,
} ,
2020-04-19 15:15:48 +00:00
}
2021-10-15 09:05:09 +00:00
impl Rpc for AdminRpc {
type Response = Result < AdminRpc , Error > ;
2021-10-14 09:50:12 +00:00
}
2020-04-19 15:15:48 +00:00
pub struct AdminRpcHandler {
garage : Arc < Garage > ,
2021-10-14 09:50:12 +00:00
endpoint : Arc < Endpoint < AdminRpc , Self > > ,
2020-04-19 15:15:48 +00:00
}
impl AdminRpcHandler {
pub fn new ( garage : Arc < Garage > ) -> Arc < Self > {
2021-10-14 09:50:12 +00:00
let endpoint = garage . system . netapp . endpoint ( ADMIN_RPC_PATH . into ( ) ) ;
let admin = Arc ::new ( Self { garage , endpoint } ) ;
admin . endpoint . set_handler ( admin . clone ( ) ) ;
admin
2020-04-19 15:15:48 +00:00
}
2022-12-13 13:23:45 +00:00
// ================ BUCKET COMMANDS ====================
2021-10-14 09:50:12 +00:00
async fn handle_bucket_cmd ( & self , cmd : & BucketOperation ) -> Result < AdminRpc , Error > {
2020-04-19 15:15:48 +00:00
match cmd {
2021-12-14 12:55:11 +00:00
BucketOperation ::List = > self . handle_list_buckets ( ) . await ,
2021-12-16 15:17:51 +00:00
BucketOperation ::Info ( query ) = > self . handle_bucket_info ( query ) . await ,
2021-12-14 12:55:11 +00:00
BucketOperation ::Create ( query ) = > self . handle_create_bucket ( & query . name ) . await ,
BucketOperation ::Delete ( query ) = > self . handle_delete_bucket ( query ) . await ,
2021-12-15 17:36:15 +00:00
BucketOperation ::Alias ( query ) = > self . handle_alias_bucket ( query ) . await ,
BucketOperation ::Unalias ( query ) = > self . handle_unalias_bucket ( query ) . await ,
2021-12-14 12:55:11 +00:00
BucketOperation ::Allow ( query ) = > self . handle_bucket_allow ( query ) . await ,
BucketOperation ::Deny ( query ) = > self . handle_bucket_deny ( query ) . await ,
BucketOperation ::Website ( query ) = > self . handle_bucket_website ( query ) . await ,
2022-06-15 18:20:28 +00:00
BucketOperation ::SetQuotas ( query ) = > self . handle_bucket_set_quotas ( query ) . await ,
2022-11-04 10:55:59 +00:00
BucketOperation ::CleanupIncompleteUploads ( query ) = > {
self . handle_bucket_cleanup_incomplete_uploads ( query ) . await
}
2021-12-14 12:55:11 +00:00
}
}
async fn handle_list_buckets ( & self ) -> Result < AdminRpc , Error > {
2022-01-03 18:06:04 +00:00
let buckets = self
2021-12-14 12:55:11 +00:00
. garage
2022-01-03 18:06:04 +00:00
. bucket_table
2022-05-10 11:16:57 +00:00
. get_range (
& EmptyKey ,
None ,
Some ( DeletedFilter ::NotDeleted ) ,
10000 ,
EnumerationOrder ::Forward ,
)
2021-12-14 12:55:11 +00:00
. await ? ;
2022-06-15 18:20:28 +00:00
2022-01-03 18:06:04 +00:00
Ok ( AdminRpc ::BucketList ( buckets ) )
2021-12-14 12:55:11 +00:00
}
2020-12-15 11:48:24 +00:00
2021-12-16 15:17:51 +00:00
async fn handle_bucket_info ( & self , query : & BucketOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . name )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-16 15:17:51 +00:00
let bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
2022-06-15 18:20:28 +00:00
let counters = self
. garage
. object_counter_table
. table
. get ( & bucket_id , & EmptyKey )
. await ?
. map ( | x | x . filtered_values ( & self . garage . system . ring . borrow ( ) ) )
. unwrap_or_default ( ) ;
2021-12-16 15:17:51 +00:00
let mut relevant_keys = HashMap ::new ( ) ;
for ( k , _ ) in bucket
. state
. as_option ( )
. unwrap ( )
. authorized_keys
. items ( )
. iter ( )
{
2022-01-04 17:59:17 +00:00
if let Some ( key ) = self
. garage
. key_table
. get ( & EmptyKey , k )
. await ?
. filter ( | k | ! k . is_deleted ( ) )
{
2021-12-16 15:17:51 +00:00
relevant_keys . insert ( k . clone ( ) , key ) ;
}
}
for ( ( k , _ ) , _ , _ ) in bucket
. state
. as_option ( )
. unwrap ( )
. local_aliases
. items ( )
. iter ( )
{
if relevant_keys . contains_key ( k ) {
continue ;
}
if let Some ( key ) = self . garage . key_table . get ( & EmptyKey , k ) . await ? {
relevant_keys . insert ( k . clone ( ) , key ) ;
}
}
2022-06-15 18:20:28 +00:00
Ok ( AdminRpc ::BucketInfo {
bucket ,
relevant_keys ,
counters ,
} )
2021-12-16 15:17:51 +00:00
}
2021-12-14 12:55:11 +00:00
#[ allow(clippy::ptr_arg) ]
async fn handle_create_bucket ( & self , name : & String ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
if ! is_valid_bucket_name ( name ) {
return Err ( Error ::BadRequest ( format! (
" {}: {} " ,
name , INVALID_BUCKET_NAME_MESSAGE
) ) ) ;
}
if let Some ( alias ) = self . garage . bucket_alias_table . get ( & EmptyKey , name ) . await ? {
2022-01-03 17:32:15 +00:00
if alias . state . get ( ) . is_some ( ) {
2022-01-03 16:22:40 +00:00
return Err ( Error ::BadRequest ( format! ( " Bucket {} already exists " , name ) ) ) ;
2021-12-14 12:55:11 +00:00
}
2022-01-03 16:22:40 +00:00
}
// ---- done checking, now commit ----
let bucket = Bucket ::new ( ) ;
2021-12-14 12:55:11 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2022-01-03 16:22:40 +00:00
self . garage
. bucket_helper ( )
. set_global_bucket_alias ( bucket . id , name )
. await ? ;
2021-12-14 12:55:11 +00:00
Ok ( AdminRpc ::Ok ( format! ( " Bucket {} was created. " , name ) ) )
}
2020-12-15 11:48:24 +00:00
2021-12-14 12:55:11 +00:00
async fn handle_delete_bucket ( & self , query : & DeleteBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
let bucket_id = helper
. resolve_global_bucket_name ( & query . name )
. await ?
. ok_or_bad_request ( " Bucket not found " ) ? ;
// Get the alias, but keep in minde here the bucket name
// given in parameter can also be directly the bucket's ID.
// In that case bucket_alias will be None, and
// we can still delete the bucket if it has zero aliases
// (a condition which we try to prevent but that could still happen somehow).
// We just won't try to delete an alias entry because there isn't one.
let bucket_alias = self
2021-12-14 12:55:11 +00:00
. garage
. bucket_alias_table
. get ( & EmptyKey , & query . name )
2022-01-03 16:22:40 +00:00
. await ? ;
2021-12-14 12:55:11 +00:00
// Check bucket doesn't have other aliases
2022-01-03 16:22:40 +00:00
let mut bucket = helper . get_existing_bucket ( bucket_id ) . await ? ;
2021-12-14 12:55:11 +00:00
let bucket_state = bucket . state . as_option ( ) . unwrap ( ) ;
if bucket_state
. aliases
. items ( )
. iter ( )
. filter ( | ( _ , _ , active ) | * active )
. any ( | ( name , _ , _ ) | name ! = & query . name )
{
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! ( " Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one. " , query . name ) ) ) ;
2021-12-14 12:55:11 +00:00
}
if bucket_state
. local_aliases
. items ( )
. iter ( )
. any ( | ( _ , _ , active ) | * active )
{
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! ( " Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one. " , query . name ) ) ) ;
2021-12-14 12:55:11 +00:00
}
// Check bucket is empty
2022-05-24 10:16:39 +00:00
if ! helper . is_bucket_empty ( bucket_id ) . await ? {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! (
" Bucket {} is not empty " ,
query . name
) ) ) ;
2021-12-14 12:55:11 +00:00
}
if ! query . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-14 12:55:11 +00:00
" Add --yes flag to really perform this operation " . to_string ( ) ,
) ) ;
}
// --- done checking, now commit ---
// 1. delete authorization from keys that had access
for ( key_id , _ ) in bucket . authorized_keys ( ) {
2022-01-03 16:22:40 +00:00
helper
2022-01-04 17:59:17 +00:00
. set_bucket_key_permissions ( bucket . id , key_id , BucketKeyPerm ::NO_PERMISSIONS )
2022-01-03 16:22:40 +00:00
. await ? ;
2020-04-19 15:15:48 +00:00
}
2022-01-03 16:22:40 +00:00
2021-12-14 12:55:11 +00:00
// 2. delete bucket alias
2022-01-03 16:22:40 +00:00
if bucket_alias . is_some ( ) {
helper
2022-01-03 17:32:15 +00:00
. purge_global_bucket_alias ( bucket_id , & query . name )
2022-01-03 16:22:40 +00:00
. await ? ;
}
2021-12-17 10:53:13 +00:00
// 3. delete bucket
2021-12-14 12:55:11 +00:00
bucket . state = Deletable ::delete ( ) ;
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! ( " Bucket {} was deleted. " , query . name ) ) )
}
2021-12-15 17:36:15 +00:00
async fn handle_alias_bucket ( & self , query : & AliasBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
2022-05-24 10:16:39 +00:00
let key_helper = self . garage . key_helper ( ) ;
2022-01-03 16:22:40 +00:00
let bucket_id = helper
2021-12-15 17:36:15 +00:00
. resolve_global_bucket_name ( & query . existing_bucket )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
if let Some ( key_pattern ) = & query . local {
2022-05-24 10:16:39 +00:00
let key = key_helper . get_existing_matching_key ( key_pattern ) . await ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
helper
. set_local_bucket_alias ( bucket_id , & key . key_id , & query . new_name )
. await ? ;
2021-12-15 17:36:15 +00:00
Ok ( AdminRpc ::Ok ( format! (
2022-01-03 16:22:40 +00:00
" Alias {} now points to bucket {:?} in namespace of key {} " ,
2021-12-15 17:36:15 +00:00
query . new_name , bucket_id , key . key_id
) ) )
} else {
2022-01-03 16:22:40 +00:00
helper
. set_global_bucket_alias ( bucket_id , & query . new_name )
2021-12-17 10:53:13 +00:00
. await ? ;
2021-12-15 17:36:15 +00:00
Ok ( AdminRpc ::Ok ( format! (
2022-01-03 16:22:40 +00:00
" Alias {} now points to bucket {:?} " ,
2021-12-15 17:36:15 +00:00
query . new_name , bucket_id
) ) )
}
}
async fn handle_unalias_bucket ( & self , query : & UnaliasBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
2022-05-24 10:16:39 +00:00
let key_helper = self . garage . key_helper ( ) ;
2022-01-03 16:22:40 +00:00
if let Some ( key_pattern ) = & query . local {
2022-05-24 10:16:39 +00:00
let key = key_helper . get_existing_matching_key ( key_pattern ) . await ? ;
2021-12-15 17:36:15 +00:00
let bucket_id = key
. state
. as_option ( )
. unwrap ( )
. local_aliases
. get ( & query . name )
2022-01-03 17:32:15 +00:00
. cloned ( )
2021-12-15 17:36:15 +00:00
. flatten ( )
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
helper
. unset_local_bucket_alias ( bucket_id , & key . key_id , & query . name )
. await ? ;
2021-12-15 17:36:15 +00:00
Ok ( AdminRpc ::Ok ( format! (
2022-01-03 16:22:40 +00:00
" Alias {} no longer points to bucket {:?} in namespace of key {} " ,
& query . name , bucket_id , key . key_id
2021-12-15 17:36:15 +00:00
) ) )
} else {
2022-01-03 16:22:40 +00:00
let bucket_id = helper
2021-12-15 17:36:15 +00:00
. resolve_global_bucket_name ( & query . name )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
helper
. unset_global_bucket_alias ( bucket_id , & query . name )
. await ? ;
2021-12-15 17:36:15 +00:00
2022-01-03 16:22:40 +00:00
Ok ( AdminRpc ::Ok ( format! (
" Alias {} no longer points to bucket {:?} " ,
& query . name , bucket_id
) ) )
2021-12-15 17:36:15 +00:00
}
}
2021-12-14 12:55:11 +00:00
async fn handle_bucket_allow ( & self , query : & PermBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
2022-05-24 10:16:39 +00:00
let key_helper = self . garage . key_helper ( ) ;
2022-01-03 16:22:40 +00:00
let bucket_id = helper
2021-12-14 12:55:11 +00:00
. resolve_global_bucket_name ( & query . bucket )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2022-05-24 10:16:39 +00:00
let key = key_helper
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2021-12-14 12:55:11 +00:00
let allow_read = query . read | | key . allow_read ( & bucket_id ) ;
let allow_write = query . write | | key . allow_write ( & bucket_id ) ;
2021-12-16 10:47:58 +00:00
let allow_owner = query . owner | | key . allow_owner ( & bucket_id ) ;
2021-12-14 12:55:11 +00:00
2022-01-03 16:22:40 +00:00
helper
. set_bucket_key_permissions (
bucket_id ,
& key . key_id ,
BucketKeyPerm {
timestamp : now_msec ( ) ,
allow_read ,
allow_write ,
allow_owner ,
} ,
)
2021-12-14 12:55:11 +00:00
. await ? ;
Ok ( AdminRpc ::Ok ( format! (
2021-12-16 10:47:58 +00:00
" New permissions for {} on {}: read {}, write {}, owner {}. " ,
& key . key_id , & query . bucket , allow_read , allow_write , allow_owner
2021-12-14 12:55:11 +00:00
) ) )
}
async fn handle_bucket_deny ( & self , query : & PermBucketOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let helper = self . garage . bucket_helper ( ) ;
2022-05-24 10:16:39 +00:00
let key_helper = self . garage . key_helper ( ) ;
2022-01-03 16:22:40 +00:00
let bucket_id = helper
2021-12-14 12:55:11 +00:00
. resolve_global_bucket_name ( & query . bucket )
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2022-05-24 10:16:39 +00:00
let key = key_helper
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2021-12-14 12:55:11 +00:00
let allow_read = ! query . read & & key . allow_read ( & bucket_id ) ;
let allow_write = ! query . write & & key . allow_write ( & bucket_id ) ;
2021-12-16 10:47:58 +00:00
let allow_owner = ! query . owner & & key . allow_owner ( & bucket_id ) ;
2021-12-14 12:55:11 +00:00
2022-01-03 16:22:40 +00:00
helper
. set_bucket_key_permissions (
bucket_id ,
& key . key_id ,
BucketKeyPerm {
timestamp : now_msec ( ) ,
allow_read ,
allow_write ,
allow_owner ,
} ,
)
2021-12-14 12:55:11 +00:00
. await ? ;
Ok ( AdminRpc ::Ok ( format! (
2021-12-16 10:47:58 +00:00
" New permissions for {} on {}: read {}, write {}, owner {}. " ,
& key . key_id , & query . bucket , allow_read , allow_write , allow_owner
2021-12-14 12:55:11 +00:00
) ) )
}
async fn handle_bucket_website ( & self , query : & WebsiteOpt ) -> Result < AdminRpc , Error > {
2021-12-16 10:47:58 +00:00
let bucket_id = self
2021-12-14 12:55:11 +00:00
. garage
2021-12-16 10:47:58 +00:00
. bucket_helper ( )
. resolve_global_bucket_name ( & query . bucket )
2021-12-14 12:55:11 +00:00
. await ?
2022-01-03 12:58:05 +00:00
. ok_or_bad_request ( " Bucket not found " ) ? ;
2021-12-14 12:55:11 +00:00
2021-12-16 10:47:58 +00:00
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let bucket_state = bucket . state . as_option_mut ( ) . unwrap ( ) ;
2021-12-14 12:55:11 +00:00
if ! ( query . allow ^ query . deny ) {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-14 12:55:11 +00:00
" You must specify exactly one flag, either --allow or --deny " . to_string ( ) ,
) ) ;
}
2021-12-17 10:53:13 +00:00
let website = if query . allow {
2022-01-03 14:06:19 +00:00
Some ( WebsiteConfig {
2022-01-06 11:58:21 +00:00
index_document : query . index_document . clone ( ) ,
error_document : query . error_document . clone ( ) ,
2021-12-22 17:50:08 +00:00
} )
2021-12-17 10:53:13 +00:00
} else {
None
} ;
bucket_state . website_config . update ( website ) ;
2021-12-16 10:47:58 +00:00
self . garage . bucket_table . insert ( & bucket ) . await ? ;
2021-12-14 12:55:11 +00:00
let msg = if query . allow {
format! ( " Website access allowed for {} " , & query . bucket )
} else {
format! ( " Website access denied for {} " , & query . bucket )
} ;
Ok ( AdminRpc ::Ok ( msg ) )
2020-04-19 15:15:48 +00:00
}
2020-04-19 20:36:36 +00:00
2022-06-15 18:20:28 +00:00
async fn handle_bucket_set_quotas ( & self , query : & SetQuotasOpt ) -> Result < AdminRpc , Error > {
let bucket_id = self
. garage
. bucket_helper ( )
. resolve_global_bucket_name ( & query . bucket )
. await ?
. ok_or_bad_request ( " Bucket not found " ) ? ;
let mut bucket = self
. garage
. bucket_helper ( )
. get_existing_bucket ( bucket_id )
. await ? ;
let bucket_state = bucket . state . as_option_mut ( ) . unwrap ( ) ;
if query . max_size . is_none ( ) & & query . max_objects . is_none ( ) {
return Err ( Error ::BadRequest (
" You must specify either --max-size or --max-objects (or both) for this command to do something. " . to_string ( ) ,
) ) ;
}
let mut quotas = bucket_state . quotas . get ( ) . clone ( ) ;
match query . max_size . as_ref ( ) . map ( String ::as_ref ) {
Some ( " none " ) = > quotas . max_size = None ,
Some ( v ) = > {
let bs = v
. parse ::< bytesize ::ByteSize > ( )
. ok_or_bad_request ( format! ( " Invalid size specified: {} " , v ) ) ? ;
quotas . max_size = Some ( bs . as_u64 ( ) ) ;
}
_ = > ( ) ,
}
match query . max_objects . as_ref ( ) . map ( String ::as_ref ) {
Some ( " none " ) = > quotas . max_objects = None ,
Some ( v ) = > {
let mo = v
. parse ::< u64 > ( )
. ok_or_bad_request ( format! ( " Invalid number specified: {} " , v ) ) ? ;
quotas . max_objects = Some ( mo ) ;
}
_ = > ( ) ,
}
bucket_state . quotas . update ( quotas ) ;
self . garage . bucket_table . insert ( & bucket ) . await ? ;
Ok ( AdminRpc ::Ok ( format! (
" Quotas updated for {} " ,
& query . bucket
) ) )
}
2022-11-04 10:55:59 +00:00
async fn handle_bucket_cleanup_incomplete_uploads (
& self ,
query : & CleanupIncompleteUploadsOpt ,
) -> Result < AdminRpc , Error > {
let mut bucket_ids = vec! [ ] ;
for b in query . buckets . iter ( ) {
bucket_ids . push (
self . garage
. bucket_helper ( )
. resolve_global_bucket_name ( b )
. await ?
2022-11-04 15:07:33 +00:00
. ok_or_bad_request ( format! ( " Bucket not found: {} " , b ) ) ? ,
2022-11-04 10:55:59 +00:00
) ;
}
let duration = parse_duration ::parse ::parse ( & query . older_than )
2022-11-04 15:07:33 +00:00
. ok_or_bad_request ( " Invalid duration passed for --older-than parameter " ) ? ;
2022-11-04 10:55:59 +00:00
let mut ret = String ::new ( ) ;
for bucket in bucket_ids {
let count = self
. garage
. bucket_helper ( )
. cleanup_incomplete_uploads ( & bucket , duration )
. await ? ;
writeln! (
& mut ret ,
" Bucket {:?}: {} incomplete uploads aborted " ,
bucket , count
)
. unwrap ( ) ;
}
Ok ( AdminRpc ::Ok ( ret ) )
}
2022-12-13 13:23:45 +00:00
// ================ KEY COMMANDS ====================
2021-10-14 09:50:12 +00:00
async fn handle_key_cmd ( & self , cmd : & KeyOperation ) -> Result < AdminRpc , Error > {
2020-04-23 20:25:45 +00:00
match cmd {
2021-12-14 12:55:11 +00:00
KeyOperation ::List = > self . handle_list_keys ( ) . await ,
2021-12-16 15:17:51 +00:00
KeyOperation ::Info ( query ) = > self . handle_key_info ( query ) . await ,
2021-12-14 12:55:11 +00:00
KeyOperation ::New ( query ) = > self . handle_create_key ( query ) . await ,
KeyOperation ::Rename ( query ) = > self . handle_rename_key ( query ) . await ,
KeyOperation ::Delete ( query ) = > self . handle_delete_key ( query ) . await ,
2022-01-05 14:12:59 +00:00
KeyOperation ::Allow ( query ) = > self . handle_allow_key ( query ) . await ,
KeyOperation ::Deny ( query ) = > self . handle_deny_key ( query ) . await ,
2021-12-14 12:55:11 +00:00
KeyOperation ::Import ( query ) = > self . handle_import_key ( query ) . await ,
}
}
async fn handle_list_keys ( & self ) -> Result < AdminRpc , Error > {
let key_ids = self
. garage
. key_table
. get_range (
& EmptyKey ,
None ,
Some ( KeyFilter ::Deleted ( DeletedFilter ::NotDeleted ) ) ,
10000 ,
2022-05-10 11:16:57 +00:00
EnumerationOrder ::Forward ,
2021-12-14 12:55:11 +00:00
)
. await ?
. iter ( )
2022-01-04 17:59:17 +00:00
. map ( | k | ( k . key_id . to_string ( ) , k . params ( ) . unwrap ( ) . name . get ( ) . clone ( ) ) )
2021-12-14 12:55:11 +00:00
. collect ::< Vec < _ > > ( ) ;
Ok ( AdminRpc ::KeyList ( key_ids ) )
}
2021-12-16 15:17:51 +00:00
async fn handle_key_info ( & self , query : & KeyOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let key = self
. garage
2022-05-24 10:16:39 +00:00
. key_helper ( )
2022-01-03 16:22:40 +00:00
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
}
2021-12-14 12:55:11 +00:00
async fn handle_create_key ( & self , query : & KeyNewOpt ) -> Result < AdminRpc , Error > {
2022-01-04 17:59:17 +00:00
let key = Key ::new ( & query . name ) ;
2021-12-14 12:55:11 +00:00
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
2021-12-14 12:55:11 +00:00
}
async fn handle_rename_key ( & self , query : & KeyRenameOpt ) -> Result < AdminRpc , Error > {
2022-01-03 16:22:40 +00:00
let mut key = self
. garage
2022-05-24 10:16:39 +00:00
. key_helper ( )
2022-01-03 16:22:40 +00:00
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2022-01-04 17:59:17 +00:00
key . params_mut ( )
. unwrap ( )
. name
. update ( query . new_name . clone ( ) ) ;
2021-12-14 12:55:11 +00:00
self . garage . key_table . insert ( & key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( key ) . await
2021-12-14 12:55:11 +00:00
}
async fn handle_delete_key ( & self , query : & KeyDeleteOpt ) -> Result < AdminRpc , Error > {
2022-05-24 10:16:39 +00:00
let key_helper = self . garage . key_helper ( ) ;
2022-01-03 16:22:40 +00:00
2022-05-24 10:16:39 +00:00
let mut key = key_helper
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2022-01-03 16:22:40 +00:00
2021-12-14 12:55:11 +00:00
if ! query . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-14 12:55:11 +00:00
" Add --yes flag to really perform this operation " . to_string ( ) ,
) ) ;
}
2022-01-03 16:22:40 +00:00
2022-05-24 10:16:39 +00:00
key_helper . delete_key ( & mut key ) . await ? ;
2021-12-14 12:55:11 +00:00
Ok ( AdminRpc ::Ok ( format! (
" Key {} was deleted successfully. " ,
key . key_id
) ) )
2020-04-23 20:25:45 +00:00
}
2022-01-05 14:12:59 +00:00
async fn handle_allow_key ( & self , query : & KeyPermOpt ) -> Result < AdminRpc , Error > {
let mut key = self
. garage
2022-05-24 10:16:39 +00:00
. key_helper ( )
2022-01-05 14:12:59 +00:00
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2022-01-10 11:38:33 +00:00
if query . create_bucket {
key . params_mut ( ) . unwrap ( ) . allow_create_bucket . update ( true ) ;
}
2022-01-05 14:12:59 +00:00
self . garage . key_table . insert ( & key ) . await ? ;
self . key_info_result ( key ) . await
}
async fn handle_deny_key ( & self , query : & KeyPermOpt ) -> Result < AdminRpc , Error > {
let mut key = self
. garage
2022-05-24 10:16:39 +00:00
. key_helper ( )
2022-01-05 14:12:59 +00:00
. get_existing_matching_key ( & query . key_pattern )
. await ? ;
2022-01-10 11:38:33 +00:00
if query . create_bucket {
key . params_mut ( ) . unwrap ( ) . allow_create_bucket . update ( false ) ;
}
2022-01-05 14:12:59 +00:00
self . garage . key_table . insert ( & key ) . await ? ;
self . key_info_result ( key ) . await
}
2021-12-14 12:55:11 +00:00
async fn handle_import_key ( & self , query : & KeyImportOpt ) -> Result < AdminRpc , Error > {
let prev_key = self . garage . key_table . get ( & EmptyKey , & query . key_id ) . await ? ;
if prev_key . is_some ( ) {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest ( format! ( " Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry. " , query . key_id ) ) ) ;
2021-12-14 12:55:11 +00:00
}
let imported_key = Key ::import ( & query . key_id , & query . secret_key , & query . name ) ;
self . garage . key_table . insert ( & imported_key ) . await ? ;
2021-12-16 15:17:51 +00:00
self . key_info_result ( imported_key ) . await
2020-04-23 20:25:45 +00:00
}
2021-12-16 15:17:51 +00:00
async fn key_info_result ( & self , key : Key ) -> Result < AdminRpc , Error > {
let mut relevant_buckets = HashMap ::new ( ) ;
for ( id , _ ) in key
. state
. as_option ( )
. unwrap ( )
. authorized_buckets
. items ( )
. iter ( )
{
2022-01-03 18:06:04 +00:00
if let Some ( b ) = self . garage . bucket_table . get ( & EmptyKey , id ) . await ? {
2021-12-16 15:17:51 +00:00
relevant_buckets . insert ( * id , b ) ;
}
}
Ok ( AdminRpc ::KeyInfo ( key , relevant_buckets ) )
}
2022-12-13 13:23:45 +00:00
// ================ MIGRATION COMMANDS ====================
2021-12-16 12:17:09 +00:00
async fn handle_migrate ( self : & Arc < Self > , opt : MigrateOpt ) -> Result < AdminRpc , Error > {
if ! opt . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-12-16 12:17:09 +00:00
" Please provide the --yes flag to initiate migration operation. " . to_string ( ) ,
) ) ;
}
let m = Migrate {
garage : self . garage . clone ( ) ,
} ;
match opt . what {
MigrateWhat ::Buckets050 = > m . migrate_buckets050 ( ) . await ,
} ? ;
Ok ( AdminRpc ::Ok ( " Migration successfull. " . into ( ) ) )
}
2022-12-13 13:23:45 +00:00
// ================ REPAIR COMMANDS ====================
2021-04-23 20:41:24 +00:00
async fn handle_launch_repair ( self : & Arc < Self > , opt : RepairOpt ) -> Result < AdminRpc , Error > {
2020-04-21 16:40:17 +00:00
if ! opt . yes {
2022-01-03 12:58:05 +00:00
return Err ( Error ::BadRequest (
2021-04-23 20:41:24 +00:00
" Please provide the --yes flag to initiate repair operations. " . to_string ( ) ,
) ) ;
2020-04-21 16:40:17 +00:00
}
if opt . all_nodes {
let mut opt_to_send = opt . clone ( ) ;
opt_to_send . all_nodes = false ;
2020-04-19 20:36:36 +00:00
let mut failures = vec! [ ] ;
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
2021-11-09 11:24:04 +00:00
for node in ring . layout . node_ids ( ) . iter ( ) {
2021-10-15 09:05:09 +00:00
let node = ( * node ) . into ( ) ;
let resp = self
2021-10-14 09:50:12 +00:00
. endpoint
2020-04-21 16:40:17 +00:00
. call (
2021-10-14 09:50:12 +00:00
& node ,
2022-07-22 13:20:00 +00:00
AdminRpc ::LaunchRepair ( opt_to_send . clone ( ) ) ,
2021-10-14 09:50:12 +00:00
PRIO_NORMAL ,
2020-04-21 16:40:17 +00:00
)
2021-10-15 09:05:09 +00:00
. await ;
2021-10-19 14:16:10 +00:00
if ! matches! ( resp , Ok ( Ok ( _ ) ) ) {
2021-10-14 09:50:12 +00:00
failures . push ( node ) ;
2020-04-19 20:36:36 +00:00
}
}
if failures . is_empty ( ) {
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( " Repair launched on all nodes " . to_string ( ) ) )
2020-04-19 20:36:36 +00:00
} else {
2022-01-03 12:58:05 +00:00
Err ( Error ::BadRequest ( format! (
2020-04-19 20:36:36 +00:00
" Could not launch repair on nodes: {:?} (launched successfully on other nodes) " ,
failures
) ) )
}
} else {
2022-07-08 11:30:26 +00:00
launch_online_repair ( self . garage . clone ( ) , opt ) . await ;
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( format! (
2020-04-19 20:36:36 +00:00
" Repair launched on {:?} " ,
self . garage . system . id
) ) )
}
}
2021-03-12 14:40:54 +00:00
2022-12-13 13:23:45 +00:00
// ================ STATS COMMANDS ====================
2021-04-23 20:41:24 +00:00
async fn handle_stats ( & self , opt : StatsOpt ) -> Result < AdminRpc , Error > {
2021-03-12 14:40:54 +00:00
if opt . all_nodes {
let mut ret = String ::new ( ) ;
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
2021-11-09 11:24:04 +00:00
for node in ring . layout . node_ids ( ) . iter ( ) {
2021-03-12 14:40:54 +00:00
let mut opt = opt . clone ( ) ;
opt . all_nodes = false ;
writeln! ( & mut ret , " \n ====================== " ) . unwrap ( ) ;
writeln! ( & mut ret , " Stats for node {:?}: " , node ) . unwrap ( ) ;
2021-10-15 09:05:09 +00:00
let node_id = ( * node ) . into ( ) ;
2021-03-12 14:40:54 +00:00
match self
2021-10-14 09:50:12 +00:00
. endpoint
2022-07-22 13:20:00 +00:00
. call ( & node_id , AdminRpc ::Stats ( opt ) , PRIO_NORMAL )
2021-10-15 09:05:09 +00:00
. await ?
2021-03-12 14:40:54 +00:00
{
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( s ) ) = > writeln! ( & mut ret , " {} " , s ) . unwrap ( ) ,
2021-03-12 14:40:54 +00:00
Ok ( x ) = > writeln! ( & mut ret , " Bad answer: {:?} " , x ) . unwrap ( ) ,
Err ( e ) = > writeln! ( & mut ret , " Error: {} " , e ) . unwrap ( ) ,
}
}
2021-04-23 20:41:24 +00:00
Ok ( AdminRpc ::Ok ( ret ) )
2021-03-12 14:40:54 +00:00
} else {
2022-06-08 08:01:44 +00:00
Ok ( AdminRpc ::Ok ( self . gather_stats_local ( opt ) ? ) )
2021-03-12 14:40:54 +00:00
}
}
2022-06-08 08:01:44 +00:00
fn gather_stats_local ( & self , opt : StatsOpt ) -> Result < String , Error > {
2021-03-12 14:40:54 +00:00
let mut ret = String ::new ( ) ;
2021-03-12 17:16:03 +00:00
writeln! (
& mut ret ,
2022-09-07 15:05:21 +00:00
" \n Garage version: {} [features: {}] " ,
2022-09-07 16:36:46 +00:00
garage_util ::version ::garage_version ( ) ,
garage_util ::version ::garage_features ( )
2022-09-07 15:05:21 +00:00
. map ( | list | list . join ( " , " ) )
. unwrap_or_else ( | | " (unknown) " . into ( ) ) ,
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2022-12-13 14:43:22 +00:00
2022-06-08 08:01:44 +00:00
writeln! ( & mut ret , " \n Database engine: {} " , self . garage . db . engine ( ) ) . unwrap ( ) ;
2021-03-12 14:40:54 +00:00
// Gather ring statistics
let ring = self . garage . system . ring . borrow ( ) . clone ( ) ;
let mut ring_nodes = HashMap ::new ( ) ;
2021-05-28 10:36:22 +00:00
for ( _i , loc ) in ring . partitions ( ) . iter ( ) {
for n in ring . get_nodes ( loc , ring . replication_factor ) . iter ( ) {
2021-03-12 14:40:54 +00:00
if ! ring_nodes . contains_key ( n ) {
ring_nodes . insert ( * n , 0 usize ) ;
}
* ring_nodes . get_mut ( n ) . unwrap ( ) + = 1 ;
}
}
writeln! ( & mut ret , " \n Ring nodes & partition count: " ) . unwrap ( ) ;
for ( n , c ) in ring_nodes . iter ( ) {
writeln! ( & mut ret , " {:?} {} " , n , c ) . unwrap ( ) ;
}
2022-12-13 14:43:22 +00:00
// Gather table statistics
let mut table = vec! [ " Table \t Items \t MklItems \t MklTodo \t GcTodo " . into ( ) ] ;
table . push ( self . gather_table_stats ( & self . garage . bucket_table , opt . detailed ) ? ) ;
table . push ( self . gather_table_stats ( & self . garage . key_table , opt . detailed ) ? ) ;
table . push ( self . gather_table_stats ( & self . garage . object_table , opt . detailed ) ? ) ;
table . push ( self . gather_table_stats ( & self . garage . version_table , opt . detailed ) ? ) ;
table . push ( self . gather_table_stats ( & self . garage . block_ref_table , opt . detailed ) ? ) ;
write! (
& mut ret ,
" \n Table stats: \n {} " ,
format_table_to_string ( table )
)
. unwrap ( ) ;
2021-03-12 14:40:54 +00:00
2022-12-13 14:43:22 +00:00
// Gather block manager statistics
2021-03-12 14:40:54 +00:00
writeln! ( & mut ret , " \n Block manager stats: " ) . unwrap ( ) ;
2022-12-13 14:43:22 +00:00
let rc_len = if opt . detailed {
self . garage . block_manager . rc_len ( ) ? . to_string ( )
} else {
self . garage
. block_manager
. rc_fast_len ( ) ?
. map ( | x | x . to_string ( ) )
. unwrap_or_else ( | | " NC " . into ( ) )
} ;
writeln! (
& mut ret ,
" number of RC entries (~= number of blocks): {} " ,
rc_len
)
. unwrap ( ) ;
2021-03-12 17:16:03 +00:00
writeln! (
& mut ret ,
" resync queue length: {} " ,
2022-09-02 14:47:15 +00:00
self . garage . block_manager . resync . queue_len ( ) ?
2021-03-12 17:16:03 +00:00
)
. unwrap ( ) ;
2022-03-28 13:47:23 +00:00
writeln! (
& mut ret ,
" blocks with resync errors: {} " ,
2022-09-02 14:47:15 +00:00
self . garage . block_manager . resync . errors_len ( ) ?
2022-03-28 13:47:23 +00:00
)
. unwrap ( ) ;
2021-03-12 14:40:54 +00:00
2022-12-13 14:43:22 +00:00
if ! opt . detailed {
writeln! ( & mut ret , " \n If values are missing (marked as NC), consider adding the --detailed flag - this will be slow. " ) . unwrap ( ) ;
}
2022-06-08 08:01:44 +00:00
Ok ( ret )
2021-03-12 14:40:54 +00:00
}
2022-06-08 08:01:44 +00:00
fn gather_table_stats < F , R > (
& self ,
t : & Arc < Table < F , R > > ,
2022-12-13 14:43:22 +00:00
detailed : bool ,
) -> Result < String , Error >
2021-03-16 10:43:58 +00:00
where
F : TableSchema + 'static ,
R : TableReplication + 'static ,
{
2022-12-13 14:43:22 +00:00
let ( data_len , mkl_len ) = if detailed {
(
t . data . store . len ( ) . map_err ( GarageError ::from ) ? . to_string ( ) ,
t . merkle_updater . merkle_tree_len ( ) ? . to_string ( ) ,
2022-06-08 08:01:44 +00:00
)
2022-12-13 14:43:22 +00:00
} else {
(
t . data
. store
. fast_len ( )
. map_err ( GarageError ::from ) ?
. map ( | x | x . to_string ( ) )
. unwrap_or_else ( | | " NC " . into ( ) ) ,
t . merkle_updater
. merkle_tree_fast_len ( ) ?
. map ( | x | x . to_string ( ) )
. unwrap_or_else ( | | " NC " . into ( ) ) ,
2021-03-16 15:35:10 +00:00
)
2022-12-13 14:43:22 +00:00
} ;
2022-06-08 08:01:44 +00:00
2022-12-13 14:43:22 +00:00
Ok ( format! (
" {} \t {} \t {} \t {} \t {} " ,
F ::TABLE_NAME ,
data_len ,
mkl_len ,
t . merkle_updater . todo_len ( ) ? ,
t . data . gc_todo_len ( ) ?
) )
2021-03-12 14:40:54 +00:00
}
2022-07-08 11:30:26 +00:00
2022-12-13 13:23:45 +00:00
// ================ WORKER COMMANDS ====================
2022-07-08 11:30:26 +00:00
2022-12-13 13:23:45 +00:00
async fn handle_worker_cmd ( & self , cmd : & WorkerOperation ) -> Result < AdminRpc , Error > {
match cmd {
WorkerOperation ::List { opt } = > {
2022-07-08 11:30:26 +00:00
let workers = self . garage . background . get_worker_info ( ) ;
2022-12-13 13:23:45 +00:00
Ok ( AdminRpc ::WorkerList ( workers , * opt ) )
2022-07-08 11:30:26 +00:00
}
2022-12-13 13:23:45 +00:00
WorkerOperation ::Info { tid } = > {
2022-12-13 11:24:30 +00:00
let info = self
. garage
. background
. get_worker_info ( )
2022-12-13 13:23:45 +00:00
. get ( tid )
2022-12-13 11:24:30 +00:00
. ok_or_bad_request ( format! ( " No worker with TID {} " , tid ) ) ?
. clone ( ) ;
2022-12-13 13:23:45 +00:00
Ok ( AdminRpc ::WorkerInfo ( * tid , info ) )
2022-12-13 11:24:30 +00:00
}
2022-12-13 13:23:45 +00:00
WorkerOperation ::Set { opt } = > match opt {
2022-09-02 13:34:21 +00:00
WorkerSetCmd ::ScrubTranquility { tranquility } = > {
2022-12-13 13:23:45 +00:00
let scrub_command = ScrubWorkerCommand ::SetTranquility ( * tranquility ) ;
2022-09-02 13:34:21 +00:00
self . garage
. block_manager
. send_scrub_command ( scrub_command )
. await ;
Ok ( AdminRpc ::Ok ( " Scrub tranquility updated " . into ( ) ) )
}
2022-12-13 10:44:11 +00:00
WorkerSetCmd ::ResyncWorkerCount { worker_count } = > {
2022-09-02 15:18:13 +00:00
self . garage
. block_manager
. resync
2022-12-13 13:23:45 +00:00
. set_n_workers ( * worker_count )
2022-09-02 15:18:13 +00:00
. await ? ;
Ok ( AdminRpc ::Ok ( " Number of resync workers updated " . into ( ) ) )
}
2022-09-02 13:34:21 +00:00
WorkerSetCmd ::ResyncTranquility { tranquility } = > {
self . garage
. block_manager
2022-09-02 14:47:15 +00:00
. resync
2022-12-13 13:23:45 +00:00
. set_tranquility ( * tranquility )
2022-09-02 13:34:21 +00:00
. await ? ;
Ok ( AdminRpc ::Ok ( " Resync tranquility updated " . into ( ) ) )
}
} ,
2022-07-08 11:30:26 +00:00
}
}
2022-12-13 13:23:45 +00:00
// ================ BLOCK COMMANDS ====================
async fn handle_block_cmd ( & self , cmd : & BlockOperation ) -> Result < AdminRpc , Error > {
match cmd {
BlockOperation ::ListErrors = > Ok ( AdminRpc ::BlockErrorList (
self . garage . block_manager . list_resync_errors ( ) ? ,
) ) ,
BlockOperation ::Info { hash } = > {
let hash = hex ::decode ( hash ) . ok_or_bad_request ( " invalid hash " ) ? ;
let hash = Hash ::try_from ( & hash ) . ok_or_bad_request ( " invalid hash " ) ? ;
let refcount = self . garage . block_manager . get_block_rc ( & hash ) ? ;
let block_refs = self
. garage
. block_ref_table
. get_range ( & hash , None , None , 10000 , Default ::default ( ) )
. await ? ;
let mut versions = vec! [ ] ;
for br in block_refs {
if let Some ( v ) = self
. garage
. version_table
. get ( & br . version , & EmptyKey )
. await ?
{
versions . push ( Ok ( v ) ) ;
} else {
versions . push ( Err ( br . version ) ) ;
}
}
Ok ( AdminRpc ::BlockInfo {
hash ,
refcount ,
versions ,
} )
}
2022-12-13 14:02:42 +00:00
BlockOperation ::RetryNow { all , blocks } = > {
if * all {
if ! blocks . is_empty ( ) {
return Err ( GarageError ::Message (
" --all was specified, cannot also specify blocks " . into ( ) ,
)
. into ( ) ) ;
}
let blocks = self . garage . block_manager . list_resync_errors ( ) ? ;
for b in blocks . iter ( ) {
self . garage . block_manager . resync . clear_backoff ( & b . hash ) ? ;
}
Ok ( AdminRpc ::Ok ( format! (
" {} blocks returned in queue for a retry now (check logs to see results) " ,
blocks . len ( )
) ) )
} else {
for hash in blocks {
let hash = hex ::decode ( hash ) . ok_or_bad_request ( " invalid hash " ) ? ;
let hash = Hash ::try_from ( & hash ) . ok_or_bad_request ( " invalid hash " ) ? ;
self . garage . block_manager . resync . clear_backoff ( & hash ) ? ;
}
Ok ( AdminRpc ::Ok ( format! (
" {} blocks returned in queue for a retry now (check logs to see results) " ,
blocks . len ( )
) ) )
}
2022-12-13 13:23:45 +00:00
}
2022-12-13 14:02:42 +00:00
BlockOperation ::Purge { yes , blocks } = > {
if ! yes {
return Err ( GarageError ::Message (
" Pass the --yes flag to confirm block purge operation. " . into ( ) ,
)
. into ( ) ) ;
}
let mut obj_dels = 0 ;
let mut ver_dels = 0 ;
for hash in blocks {
let hash = hex ::decode ( hash ) . ok_or_bad_request ( " invalid hash " ) ? ;
let hash = Hash ::try_from ( & hash ) . ok_or_bad_request ( " invalid hash " ) ? ;
let block_refs = self
. garage
. block_ref_table
. get_range ( & hash , None , None , 10000 , Default ::default ( ) )
. await ? ;
for br in block_refs {
let version = match self
. garage
. version_table
. get ( & br . version , & EmptyKey )
. await ?
{
Some ( v ) = > v ,
None = > continue ,
} ;
if let Some ( object ) = self
. garage
. object_table
. get ( & version . bucket_id , & version . key )
. await ?
{
let ov = object . versions ( ) . iter ( ) . rev ( ) . find ( | v | v . is_complete ( ) ) ;
if let Some ( ov ) = ov {
if ov . uuid = = br . version {
let del_uuid = gen_uuid ( ) ;
let deleted_object = Object ::new (
version . bucket_id ,
version . key . clone ( ) ,
vec! [ ObjectVersion {
uuid : del_uuid ,
timestamp : ov . timestamp + 1 ,
state : ObjectVersionState ::Complete (
ObjectVersionData ::DeleteMarker ,
) ,
} ] ,
) ;
self . garage . object_table . insert ( & deleted_object ) . await ? ;
obj_dels + = 1 ;
}
}
}
if ! version . deleted . get ( ) {
let deleted_version = Version ::new (
version . uuid ,
version . bucket_id ,
version . key . clone ( ) ,
true ,
) ;
self . garage . version_table . insert ( & deleted_version ) . await ? ;
ver_dels + = 1 ;
}
}
}
Ok ( AdminRpc ::Ok ( format! (
" {} blocks were purged: {} object deletion markers added, {} versions marked deleted " ,
blocks . len ( ) ,
obj_dels ,
ver_dels
) ) )
2022-12-13 13:23:45 +00:00
}
}
}
2021-10-15 09:05:09 +00:00
}
2021-10-14 09:50:12 +00:00
2021-10-15 09:05:09 +00:00
#[ async_trait ]
impl EndpointHandler < AdminRpc > for AdminRpcHandler {
async fn handle (
self : & Arc < Self > ,
message : & AdminRpc ,
_from : NodeID ,
) -> Result < AdminRpc , Error > {
match message {
2021-10-14 09:50:12 +00:00
AdminRpc ::BucketOperation ( bo ) = > self . handle_bucket_cmd ( bo ) . await ,
AdminRpc ::KeyOperation ( ko ) = > self . handle_key_cmd ( ko ) . await ,
2021-12-16 12:17:09 +00:00
AdminRpc ::Migrate ( opt ) = > self . handle_migrate ( opt . clone ( ) ) . await ,
2021-10-14 09:50:12 +00:00
AdminRpc ::LaunchRepair ( opt ) = > self . handle_launch_repair ( opt . clone ( ) ) . await ,
AdminRpc ::Stats ( opt ) = > self . handle_stats ( opt . clone ( ) ) . await ,
2022-12-13 13:23:45 +00:00
AdminRpc ::Worker ( wo ) = > self . handle_worker_cmd ( wo ) . await ,
AdminRpc ::BlockOperation ( bo ) = > self . handle_block_cmd ( bo ) . await ,
2022-01-03 12:58:05 +00:00
m = > Err ( GarageError ::unexpected_rpc_message ( m ) . into ( ) ) ,
2021-10-14 09:50:12 +00:00
}
}
}