Merge pull request 'admin refactoring: refactor CLI to use Admin API requests (step 2)' (#943) from refactor-admin into next-v2
All checks were successful
ci/woodpecker/push/debug Pipeline was successful

Reviewed-on: #943
This commit is contained in:
Alex 2025-01-30 16:18:38 +00:00
commit 3192088aac
19 changed files with 1485 additions and 1414 deletions

View file

@ -1,3 +1,4 @@
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::Arc;
@ -77,18 +78,18 @@ admin_endpoints![
// because they directly produce an http::Response
// **********************************************
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct OptionsRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CheckDomainRequest {
pub domain: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HealthRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MetricsRequest;
// **********************************************
@ -97,10 +98,10 @@ pub struct MetricsRequest;
// ---- GetClusterStatus ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetClusterStatusRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetClusterStatusResponse {
pub node: String,
@ -112,7 +113,7 @@ pub struct GetClusterStatusResponse {
pub nodes: Vec<NodeResp>,
}
#[derive(Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct NodeResp {
pub id: String,
@ -128,7 +129,7 @@ pub struct NodeResp {
pub metadata_partition: Option<FreeSpaceResp>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NodeRoleResp {
pub id: String,
@ -137,7 +138,7 @@ pub struct NodeRoleResp {
pub tags: Vec<String>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct FreeSpaceResp {
pub available: u64,
@ -146,7 +147,7 @@ pub struct FreeSpaceResp {
// ---- GetClusterHealth ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetClusterHealthRequest;
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -167,10 +168,10 @@ pub struct GetClusterHealthResponse {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectClusterNodesRequest(pub Vec<String>);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ConnectClusterNodesResponse(pub Vec<ConnectNodeResponse>);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ConnectNodeResponse {
pub success: bool,
@ -179,10 +180,10 @@ pub struct ConnectNodeResponse {
// ---- GetClusterLayout ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetClusterLayoutRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetClusterLayoutResponse {
pub version: u64,
@ -190,7 +191,7 @@ pub struct GetClusterLayoutResponse {
pub staged_role_changes: Vec<NodeRoleChange>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct NodeRoleChange {
pub id: String,
@ -198,7 +199,7 @@ pub struct NodeRoleChange {
pub action: NodeRoleChangeEnum,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum NodeRoleChangeEnum {
#[serde(rename_all = "camelCase")]
@ -213,21 +214,21 @@ pub enum NodeRoleChangeEnum {
// ---- UpdateClusterLayout ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateClusterLayoutRequest(pub Vec<NodeRoleChange>);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- ApplyClusterLayout ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApplyClusterLayoutRequest {
pub version: u64,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApplyClusterLayoutResponse {
pub message: Vec<String>,
@ -236,10 +237,10 @@ pub struct ApplyClusterLayoutResponse {
// ---- RevertClusterLayout ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevertClusterLayoutRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse);
// **********************************************
@ -248,13 +249,13 @@ pub struct RevertClusterLayoutResponse(pub GetClusterLayoutResponse);
// ---- ListKeys ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListKeysRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListKeysResponse(pub Vec<ListKeysResponseItem>);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListKeysResponseItem {
pub id: String,
@ -263,14 +264,14 @@ pub struct ListKeysResponseItem {
// ---- GetKeyInfo ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetKeyInfoRequest {
pub id: Option<String>,
pub search: Option<String>,
pub show_secret_key: bool,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetKeyInfoResponse {
pub name: String,
@ -281,14 +282,14 @@ pub struct GetKeyInfoResponse {
pub buckets: Vec<KeyInfoBucketResponse>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct KeyPerm {
#[serde(default)]
pub create_bucket: bool,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct KeyInfoBucketResponse {
pub id: String,
@ -297,7 +298,7 @@ pub struct KeyInfoBucketResponse {
pub permissions: ApiBucketKeyPerm,
}
#[derive(Serialize, Deserialize, Default)]
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct ApiBucketKeyPerm {
#[serde(default)]
@ -310,18 +311,18 @@ pub struct ApiBucketKeyPerm {
// ---- CreateKey ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateKeyRequest {
pub name: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateKeyResponse(pub GetKeyInfoResponse);
// ---- ImportKey ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ImportKeyRequest {
pub access_key_id: String,
@ -329,21 +330,21 @@ pub struct ImportKeyRequest {
pub name: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ImportKeyResponse(pub GetKeyInfoResponse);
// ---- UpdateKey ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateKeyRequest {
pub id: String,
pub body: UpdateKeyRequestBody,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateKeyResponse(pub GetKeyInfoResponse);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateKeyRequestBody {
pub name: Option<String>,
@ -353,12 +354,12 @@ pub struct UpdateKeyRequestBody {
// ---- DeleteKey ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteKeyRequest {
pub id: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteKeyResponse;
// **********************************************
@ -367,13 +368,13 @@ pub struct DeleteKeyResponse;
// ---- ListBuckets ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListBucketsRequest;
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ListBucketsResponse(pub Vec<ListBucketsResponseItem>);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ListBucketsResponseItem {
pub id: String,
@ -381,7 +382,7 @@ pub struct ListBucketsResponseItem {
pub local_aliases: Vec<BucketLocalAlias>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketLocalAlias {
pub access_key_id: String,
@ -390,13 +391,14 @@ pub struct BucketLocalAlias {
// ---- GetBucketInfo ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetBucketInfoRequest {
pub id: Option<String>,
pub global_alias: Option<String>,
pub search: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetBucketInfoResponse {
pub id: String,
@ -414,14 +416,14 @@ pub struct GetBucketInfoResponse {
pub quotas: ApiBucketQuotas,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetBucketInfoWebsiteResponse {
pub index_document: String,
pub error_document: Option<String>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct GetBucketInfoKey {
pub access_key_id: String,
@ -430,7 +432,7 @@ pub struct GetBucketInfoKey {
pub bucket_local_aliases: Vec<String>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct ApiBucketQuotas {
pub max_size: Option<u64>,
@ -439,17 +441,17 @@ pub struct ApiBucketQuotas {
// ---- CreateBucket ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateBucketRequest {
pub global_alias: Option<String>,
pub local_alias: Option<CreateBucketLocalAlias>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CreateBucketResponse(pub GetBucketInfoResponse);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct CreateBucketLocalAlias {
pub access_key_id: String,
@ -460,23 +462,23 @@ pub struct CreateBucketLocalAlias {
// ---- UpdateBucket ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateBucketRequest {
pub id: String,
pub body: UpdateBucketRequestBody,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct UpdateBucketResponse(pub GetBucketInfoResponse);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateBucketRequestBody {
pub website_access: Option<UpdateBucketWebsiteAccess>,
pub quotas: Option<ApiBucketQuotas>,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct UpdateBucketWebsiteAccess {
pub enabled: bool,
@ -486,12 +488,12 @@ pub struct UpdateBucketWebsiteAccess {
// ---- DeleteBucket ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteBucketRequest {
pub id: String,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteBucketResponse;
// **********************************************
@ -500,13 +502,13 @@ pub struct DeleteBucketResponse;
// ---- AllowBucketKey ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllowBucketKeyRequest(pub BucketKeyPermChangeRequest);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AllowBucketKeyResponse(pub GetBucketInfoResponse);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BucketKeyPermChangeRequest {
pub bucket_id: String,
@ -516,10 +518,10 @@ pub struct BucketKeyPermChangeRequest {
// ---- DenyBucketKey ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DenyBucketKeyRequest(pub BucketKeyPermChangeRequest);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DenyBucketKeyResponse(pub GetBucketInfoResponse);
// **********************************************
@ -528,7 +530,7 @@ pub struct DenyBucketKeyResponse(pub GetBucketInfoResponse);
// ---- AddBucketAlias ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AddBucketAliasRequest {
pub bucket_id: String,
@ -536,10 +538,10 @@ pub struct AddBucketAliasRequest {
pub alias: BucketAliasEnum,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AddBucketAliasResponse(pub GetBucketInfoResponse);
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum BucketAliasEnum {
#[serde(rename_all = "camelCase")]
@ -553,7 +555,7 @@ pub enum BucketAliasEnum {
// ---- RemoveBucketAlias ----
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct RemoveBucketAliasRequest {
pub bucket_id: String,
@ -561,5 +563,5 @@ pub struct RemoveBucketAliasRequest {
pub alias: BucketAliasEnum,
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);

View file

@ -73,16 +73,22 @@ impl EndpointHandler for GetBucketInfoRequest {
type Response = GetBucketInfoResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias) {
(Some(id), None) => parse_bucket_id(&id)?,
(None, Some(ga)) => garage
let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage
.bucket_helper()
.resolve_global_bucket_name(&ga)
.await?
.ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
(None, None, Some(search)) => {
garage
.bucket_helper()
.admin_get_existing_matching_bucket(&search)
.await?
}
_ => {
return Err(Error::bad_request(
"Either id or globalAlias must be provided (but not both)",
"Either id, globalAlias or search must be provided (but not several of them)",
));
}
};

View file

@ -56,7 +56,7 @@ impl From<HelperError> for Error {
impl CommonErrorDerivative for Error {}
impl Error {
fn code(&self) -> &'static str {
pub fn code(&self) -> &'static str {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",

View file

@ -4,7 +4,7 @@ macro_rules! admin_endpoints {
$($endpoint:ident,)*
] => {
paste! {
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum AdminApiRequest {
$(
$special_endpoint( [<$special_endpoint Request>] ),
@ -14,7 +14,7 @@ macro_rules! admin_endpoints {
)*
}
#[derive(Serialize)]
#[derive(Debug, Clone, Serialize)]
#[serde(untagged)]
pub enum AdminApiResponse {
$(
@ -22,7 +22,7 @@ macro_rules! admin_endpoints {
)*
}
#[derive(Serialize, Deserialize)]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum TaggedAdminApiResponse {
$(
$endpoint( [<$endpoint Response>] ),
@ -43,7 +43,7 @@ macro_rules! admin_endpoints {
}
impl AdminApiResponse {
fn tagged(self) -> TaggedAdminApiResponse {
pub fn tagged(self) -> TaggedAdminApiResponse {
match self {
$(
Self::$endpoint(res) => TaggedAdminApiResponse::$endpoint(res),
@ -52,6 +52,24 @@ macro_rules! admin_endpoints {
}
}
$(
impl From< [< $endpoint Request >] > for AdminApiRequest {
fn from(req: [< $endpoint Request >]) -> AdminApiRequest {
AdminApiRequest::$endpoint(req)
}
}
impl TryFrom<TaggedAdminApiResponse> for [< $endpoint Response >] {
type Error = TaggedAdminApiResponse;
fn try_from(resp: TaggedAdminApiResponse) -> Result< [< $endpoint Response >], TaggedAdminApiResponse> {
match resp {
TaggedAdminApiResponse::$endpoint(v) => Ok(v),
x => Err(x),
}
}
}
)*
#[async_trait]
impl EndpointHandler for AdminApiRequest {
type Response = AdminApiResponse;

View file

@ -46,7 +46,7 @@ impl AdminApiRequest {
POST DeleteKey (query::id),
GET ListKeys (),
// Bucket endpoints
GET GetBucketInfo (query_opt::id, query_opt::global_alias),
GET GetBucketInfo (query_opt::id, query_opt::global_alias, query_opt::search),
GET ListBuckets (),
POST CreateBucket (body),
POST DeleteBucket (query::id),
@ -141,6 +141,7 @@ impl AdminApiRequest {
Ok(AdminApiRequest::GetBucketInfo(GetBucketInfoRequest {
id,
global_alias,
search: None,
}))
}
Endpoint::CreateBucket => {

View file

@ -1,15 +1,6 @@
use std::collections::HashMap;
use std::fmt::Write;
use garage_util::crdt::*;
use garage_util::time::*;
use garage_table::*;
use garage_model::bucket_alias_table::*;
use garage_model::bucket_table::*;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::permission::*;
use crate::cli::*;
@ -18,451 +9,13 @@ use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => self.handle_list_buckets().await,
BucketOperation::Info(query) => self.handle_bucket_info(query).await,
BucketOperation::Create(query) => self.handle_create_bucket(&query.name).await,
BucketOperation::Delete(query) => self.handle_delete_bucket(query).await,
BucketOperation::Alias(query) => self.handle_alias_bucket(query).await,
BucketOperation::Unalias(query) => self.handle_unalias_bucket(query).await,
BucketOperation::Allow(query) => self.handle_bucket_allow(query).await,
BucketOperation::Deny(query) => self.handle_bucket_deny(query).await,
BucketOperation::Website(query) => self.handle_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.handle_bucket_set_quotas(query).await,
BucketOperation::CleanupIncompleteUploads(query) => {
self.handle_bucket_cleanup_incomplete_uploads(query).await
}
_ => unreachable!(),
}
}
async fn handle_list_buckets(&self) -> Result<AdminRpc, Error> {
let buckets = self
.garage
.bucket_table
.get_range(
&EmptyKey,
None,
Some(DeletedFilter::NotDeleted),
10000,
EnumerationOrder::Forward,
)
.await?;
Ok(AdminRpc::BucketList(buckets))
}
async fn handle_bucket_info(&self, query: &BucketOpt) -> Result<AdminRpc, Error> {
let bucket_id = self
.garage
.bucket_helper()
.admin_get_existing_matching_bucket(&query.name)
.await?;
let bucket = self
.garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let counters = self
.garage
.object_counter_table
.table
.get(&bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mpu_counters = self
.garage
.mpu_counter_table
.table
.get(&bucket_id, &EmptyKey)
.await?
.map(|x| x.filtered_values(&self.garage.system.cluster_layout()))
.unwrap_or_default();
let mut relevant_keys = HashMap::new();
for (k, _) in bucket
.state
.as_option()
.unwrap()
.authorized_keys
.items()
.iter()
{
if let Some(key) = self
.garage
.key_table
.get(&EmptyKey, k)
.await?
.filter(|k| !k.is_deleted())
{
relevant_keys.insert(k.clone(), key);
}
}
for ((k, _), _, _) in bucket
.state
.as_option()
.unwrap()
.local_aliases
.items()
.iter()
{
if relevant_keys.contains_key(k) {
continue;
}
if let Some(key) = self.garage.key_table.get(&EmptyKey, k).await? {
relevant_keys.insert(k.clone(), key);
}
}
Ok(AdminRpc::BucketInfo {
bucket,
relevant_keys,
counters,
mpu_counters,
})
}
#[allow(clippy::ptr_arg)]
async fn handle_create_bucket(&self, name: &String) -> Result<AdminRpc, Error> {
if !is_valid_bucket_name(name) {
return Err(Error::BadRequest(format!(
"{}: {}",
name, INVALID_BUCKET_NAME_MESSAGE
)));
}
let helper = self.garage.locked_helper().await;
if let Some(alias) = self.garage.bucket_alias_table.get(&EmptyKey, name).await? {
if alias.state.get().is_some() {
return Err(Error::BadRequest(format!("Bucket {} already exists", name)));
}
}
// ---- done checking, now commit ----
let bucket = Bucket::new();
self.garage.bucket_table.insert(&bucket).await?;
helper.set_global_bucket_alias(bucket.id, name).await?;
Ok(AdminRpc::Ok(format!("Bucket {} was created.", name)))
}
async fn handle_delete_bucket(&self, query: &DeleteBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.admin_get_existing_matching_bucket(&query.name)
.await?;
// Get the alias, but keep in minde here the bucket name
// given in parameter can also be directly the bucket's ID.
// In that case bucket_alias will be None, and
// we can still delete the bucket if it has zero aliases
// (a condition which we try to prevent but that could still happen somehow).
// We just won't try to delete an alias entry because there isn't one.
let bucket_alias = self
.garage
.bucket_alias_table
.get(&EmptyKey, &query.name)
.await?;
// Check bucket doesn't have other aliases
let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
let bucket_state = bucket.state.as_option().unwrap();
if bucket_state
.aliases
.items()
.iter()
.filter(|(_, _, active)| *active)
.any(|(name, _, _)| name != &query.name)
{
return Err(Error::BadRequest(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", query.name)));
}
if bucket_state
.local_aliases
.items()
.iter()
.any(|(_, _, active)| *active)
{
return Err(Error::BadRequest(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", query.name)));
}
// Check bucket is empty
if !helper.bucket().is_bucket_empty(bucket_id).await? {
return Err(Error::BadRequest(format!(
"Bucket {} is not empty",
query.name
)));
}
if !query.yes {
return Err(Error::BadRequest(
"Add --yes flag to really perform this operation".to_string(),
));
}
// --- done checking, now commit ---
// 1. delete authorization from keys that had access
for (key_id, _) in bucket.authorized_keys() {
helper
.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
.await?;
}
// 2. delete bucket alias
if bucket_alias.is_some() {
helper
.purge_global_bucket_alias(bucket_id, &query.name)
.await?;
}
// 3. delete bucket
bucket.state = Deletable::delete();
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!("Bucket {} was deleted.", query.name)))
}
async fn handle_alias_bucket(&self, query: &AliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.admin_get_existing_matching_bucket(&query.existing_bucket)
.await?;
if let Some(key_pattern) = &query.local {
let key = helper.key().get_existing_matching_key(key_pattern).await?;
helper
.set_local_bucket_alias(bucket_id, &key.key_id, &query.new_name)
.await?;
Ok(AdminRpc::Ok(format!(
"Alias {} now points to bucket {:?} in namespace of key {}",
query.new_name, bucket_id, key.key_id
)))
} else {
helper
.set_global_bucket_alias(bucket_id, &query.new_name)
.await?;
Ok(AdminRpc::Ok(format!(
"Alias {} now points to bucket {:?}",
query.new_name, bucket_id
)))
}
}
async fn handle_unalias_bucket(&self, query: &UnaliasBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.locked_helper().await;
if let Some(key_pattern) = &query.local {
let key = helper.key().get_existing_matching_key(key_pattern).await?;
let bucket_id = key
.state
.as_option()
.unwrap()
.local_aliases
.get(&query.name)
.cloned()
.flatten()
.ok_or_bad_request("Bucket not found")?;
helper
.unset_local_bucket_alias(bucket_id, &key.key_id, &query.name)
.await?;
Ok(AdminRpc::Ok(format!(
"Alias {} no longer points to bucket {:?} in namespace of key {}",
&query.name, bucket_id, key.key_id
)))
} else {
let bucket_id = helper
.bucket()
.resolve_global_bucket_name(&query.name)
.await?
.ok_or_bad_request("Bucket not found")?;
helper
.unset_global_bucket_alias(bucket_id, &query.name)
.await?;
Ok(AdminRpc::Ok(format!(
"Alias {} no longer points to bucket {:?}",
&query.name, bucket_id
)))
}
}
async fn handle_bucket_allow(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.admin_get_existing_matching_bucket(&query.bucket)
.await?;
let key = helper
.key()
.get_existing_matching_key(&query.key_pattern)
.await?;
let allow_read = query.read || key.allow_read(&bucket_id);
let allow_write = query.write || key.allow_write(&bucket_id);
let allow_owner = query.owner || key.allow_owner(&bucket_id);
helper
.set_bucket_key_permissions(
bucket_id,
&key.key_id,
BucketKeyPerm {
timestamp: now_msec(),
allow_read,
allow_write,
allow_owner,
},
)
.await?;
Ok(AdminRpc::Ok(format!(
"New permissions for {} on {}: read {}, write {}, owner {}.",
&key.key_id, &query.bucket, allow_read, allow_write, allow_owner
)))
}
async fn handle_bucket_deny(&self, query: &PermBucketOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.locked_helper().await;
let bucket_id = helper
.bucket()
.admin_get_existing_matching_bucket(&query.bucket)
.await?;
let key = helper
.key()
.get_existing_matching_key(&query.key_pattern)
.await?;
let allow_read = !query.read && key.allow_read(&bucket_id);
let allow_write = !query.write && key.allow_write(&bucket_id);
let allow_owner = !query.owner && key.allow_owner(&bucket_id);
helper
.set_bucket_key_permissions(
bucket_id,
&key.key_id,
BucketKeyPerm {
timestamp: now_msec(),
allow_read,
allow_write,
allow_owner,
},
)
.await?;
Ok(AdminRpc::Ok(format!(
"New permissions for {} on {}: read {}, write {}, owner {}.",
&key.key_id, &query.bucket, allow_read, allow_write, allow_owner
)))
}
async fn handle_bucket_website(&self, query: &WebsiteOpt) -> Result<AdminRpc, Error> {
let bucket_id = self
.garage
.bucket_helper()
.admin_get_existing_matching_bucket(&query.bucket)
.await?;
let mut bucket = self
.garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_state = bucket.state.as_option_mut().unwrap();
if !(query.allow ^ query.deny) {
return Err(Error::BadRequest(
"You must specify exactly one flag, either --allow or --deny".to_string(),
));
}
let website = if query.allow {
Some(WebsiteConfig {
index_document: query.index_document.clone(),
error_document: query.error_document.clone(),
})
} else {
None
};
bucket_state.website_config.update(website);
self.garage.bucket_table.insert(&bucket).await?;
let msg = if query.allow {
format!("Website access allowed for {}", &query.bucket)
} else {
format!("Website access denied for {}", &query.bucket)
};
Ok(AdminRpc::Ok(msg))
}
async fn handle_bucket_set_quotas(&self, query: &SetQuotasOpt) -> Result<AdminRpc, Error> {
let bucket_id = self
.garage
.bucket_helper()
.admin_get_existing_matching_bucket(&query.bucket)
.await?;
let mut bucket = self
.garage
.bucket_helper()
.get_existing_bucket(bucket_id)
.await?;
let bucket_state = bucket.state.as_option_mut().unwrap();
if query.max_size.is_none() && query.max_objects.is_none() {
return Err(Error::BadRequest(
"You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
));
}
let mut quotas = bucket_state.quotas.get().clone();
match query.max_size.as_ref().map(String::as_ref) {
Some("none") => quotas.max_size = None,
Some(v) => {
let bs = v
.parse::<bytesize::ByteSize>()
.ok_or_bad_request(format!("Invalid size specified: {}", v))?;
quotas.max_size = Some(bs.as_u64());
}
_ => (),
}
match query.max_objects.as_ref().map(String::as_ref) {
Some("none") => quotas.max_objects = None,
Some(v) => {
let mo = v
.parse::<u64>()
.ok_or_bad_request(format!("Invalid number specified: {}", v))?;
quotas.max_objects = Some(mo);
}
_ => (),
}
bucket_state.quotas.update(quotas);
self.garage.bucket_table.insert(&bucket).await?;
Ok(AdminRpc::Ok(format!(
"Quotas updated for {}",
&query.bucket
)))
}
async fn handle_bucket_cleanup_incomplete_uploads(
&self,
query: &CleanupIncompleteUploadsOpt,

View file

@ -1,161 +0,0 @@
use std::collections::HashMap;
use garage_table::*;
use garage_model::helper::error::*;
use garage_model::key_table::*;
use crate::cli::*;
use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
KeyOperation::Info(query) => self.handle_key_info(query).await,
KeyOperation::Create(query) => self.handle_create_key(query).await,
KeyOperation::Rename(query) => self.handle_rename_key(query).await,
KeyOperation::Delete(query) => self.handle_delete_key(query).await,
KeyOperation::Allow(query) => self.handle_allow_key(query).await,
KeyOperation::Deny(query) => self.handle_deny_key(query).await,
KeyOperation::Import(query) => self.handle_import_key(query).await,
}
}
async fn handle_list_keys(&self) -> Result<AdminRpc, Error> {
let key_ids = self
.garage
.key_table
.get_range(
&EmptyKey,
None,
Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)),
10000,
EnumerationOrder::Forward,
)
.await?
.iter()
.map(|k| (k.key_id.to_string(), k.params().unwrap().name.get().clone()))
.collect::<Vec<_>>();
Ok(AdminRpc::KeyList(key_ids))
}
async fn handle_key_info(&self, query: &KeyInfoOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
.key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if !query.show_secret {
key.state.as_option_mut().unwrap().secret_key = "(redacted)".into();
}
self.key_info_result(key).await
}
async fn handle_create_key(&self, query: &KeyNewOpt) -> Result<AdminRpc, Error> {
let key = Key::new(&query.name);
self.garage.key_table.insert(&key).await?;
self.key_info_result(key).await
}
async fn handle_rename_key(&self, query: &KeyRenameOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
.key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
key.params_mut()
.unwrap()
.name
.update(query.new_name.clone());
self.garage.key_table.insert(&key).await?;
self.key_info_result(key).await
}
async fn handle_delete_key(&self, query: &KeyDeleteOpt) -> Result<AdminRpc, Error> {
let helper = self.garage.locked_helper().await;
let mut key = helper
.key()
.get_existing_matching_key(&query.key_pattern)
.await?;
if !query.yes {
return Err(Error::BadRequest(
"Add --yes flag to really perform this operation".to_string(),
));
}
helper.delete_key(&mut key).await?;
Ok(AdminRpc::Ok(format!(
"Key {} was deleted successfully.",
key.key_id
)))
}
async fn handle_allow_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
.key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
key.params_mut().unwrap().allow_create_bucket.update(true);
}
self.garage.key_table.insert(&key).await?;
self.key_info_result(key).await
}
async fn handle_deny_key(&self, query: &KeyPermOpt) -> Result<AdminRpc, Error> {
let mut key = self
.garage
.key_helper()
.get_existing_matching_key(&query.key_pattern)
.await?;
if query.create_bucket {
key.params_mut().unwrap().allow_create_bucket.update(false);
}
self.garage.key_table.insert(&key).await?;
self.key_info_result(key).await
}
async fn handle_import_key(&self, query: &KeyImportOpt) -> Result<AdminRpc, Error> {
if !query.yes {
return Err(Error::BadRequest("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
}
let prev_key = self.garage.key_table.get(&EmptyKey, &query.key_id).await?;
if prev_key.is_some() {
return Err(Error::BadRequest(format!("Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", query.key_id)));
}
let imported_key = Key::import(&query.key_id, &query.secret_key, &query.name)
.ok_or_bad_request("Invalid key format")?;
self.garage.key_table.insert(&imported_key).await?;
self.key_info_result(imported_key).await
}
async fn key_info_result(&self, key: Key) -> Result<AdminRpc, Error> {
let mut relevant_buckets = HashMap::new();
for (id, _) in key
.state
.as_option()
.unwrap()
.authorized_buckets
.items()
.iter()
{
if let Some(b) = self.garage.bucket_table.get(&EmptyKey, id).await? {
relevant_buckets.insert(*id, b);
}
}
Ok(AdminRpc::KeyInfo(key, relevant_buckets))
}
}

View file

@ -1,6 +1,5 @@
mod block;
mod bucket;
mod key;
use std::collections::HashMap;
use std::fmt::Write;
@ -23,13 +22,15 @@ use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use garage_api::admin::api::{AdminApiRequest, TaggedAdminApiResponse};
use garage_api::admin::EndpointHandler as AdminApiEndpoint;
use garage_api::generic_server::ApiError;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
@ -39,7 +40,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
KeyOperation(KeyOperation),
LaunchRepair(RepairOpt),
Stats(StatsOpt),
Worker(WorkerOperation),
@ -48,15 +48,6 @@ pub enum AdminRpc {
// Replies
Ok(String),
BucketList(Vec<Bucket>),
BucketInfo {
bucket: Bucket,
relevant_keys: HashMap<String, Key>,
counters: HashMap<String, i64>,
mpu_counters: HashMap<String, i64>,
},
KeyList(Vec<(String, String)>),
KeyInfo(Key, HashMap<Uuid, Bucket>),
WorkerList(
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
@ -70,6 +61,15 @@ pub enum AdminRpc {
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
},
// Proxying HTTP Admin API endpoints
ApiRequest(AdminApiRequest),
ApiOkResponse(TaggedAdminApiResponse),
ApiErrorResponse {
http_code: u16,
error_code: String,
message: String,
},
}
impl Rpc for AdminRpc {
@ -503,6 +503,25 @@ impl AdminRpcHandler {
}
}
}
// ================== PROXYING ADMIN API REQUESTS ===================
async fn handle_api_request(
self: &Arc<Self>,
req: &AdminApiRequest,
) -> Result<AdminRpc, Error> {
let req = req.clone();
info!("Proxied admin API request: {}", req.name());
let res = req.handle(&self.garage).await;
match res {
Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
Err(e) => Ok(AdminRpc::ApiErrorResponse {
http_code: e.http_status_code().as_u16(),
error_code: e.code().to_string(),
message: e.to_string(),
}),
}
}
}
#[async_trait]
@ -514,12 +533,12 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
) -> Result<AdminRpc, Error> {
match message {
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
AdminRpc::KeyOperation(ko) => self.handle_key_cmd(ko).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}

View file

@ -1,10 +1,5 @@
use std::collections::{HashMap, HashSet};
use std::time::Duration;
use format_table::format_table;
use garage_util::error::*;
use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
@ -13,204 +8,6 @@ use garage_model::helper::error::Error as HelperError;
use crate::admin::*;
use crate::cli::*;
pub async fn cli_command_dispatch(
cmd: Command,
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
admin_rpc_endpoint: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
) -> Result<(), HelperError> {
match cmd {
Command::Status => Ok(cmd_status(system_rpc_endpoint, rpc_host).await?),
Command::Node(NodeOperation::Connect(connect_opt)) => {
Ok(cmd_connect(system_rpc_endpoint, rpc_host, connect_opt).await?)
}
Command::Layout(layout_opt) => {
Ok(cli_layout_command_dispatch(layout_opt, system_rpc_endpoint, rpc_host).await?)
}
Command::Bucket(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BucketOperation(bo)).await
}
Command::Key(ko) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::KeyOperation(ko)).await
}
Command::Repair(ro) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::LaunchRepair(ro)).await
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
Command::Block(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
}
Command::Meta(mo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::MetaOperation(mo)).await
}
_ => unreachable!(),
}
}
pub async fn cmd_status(rpc_cli: &Endpoint<SystemRpc, ()>, rpc_host: NodeID) -> Result<(), Error> {
let status = fetch_status(rpc_cli, rpc_host).await?;
let layout = fetch_layout(rpc_cli, rpc_host).await?;
println!("==== HEALTHY NODES ====");
let mut healthy_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.iter().filter(|adv| adv.is_up) {
let host = adv.status.hostname.as_deref().unwrap_or("?");
let addr = match adv.addr {
Some(addr) => addr.to_string(),
None => "N/A".to_string(),
};
if let Some(NodeRoleV(Some(cfg))) = layout.current().roles.get(&adv.id) {
let data_avail = match &adv.status.data_disk_avail {
_ if cfg.capacity.is_none() => "N/A".into(),
Some((avail, total)) => {
let pct = (*avail as f64) / (*total as f64) * 100.;
let avail = bytesize::ByteSize::b(*avail);
format!("{} ({:.1}%)", avail, pct)
}
None => "?".into(),
};
healthy_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
id = adv.id,
host = host,
addr = addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = cfg.capacity_string(),
data_avail = data_avail,
));
} else {
let prev_role = layout
.versions
.iter()
.rev()
.find_map(|x| match x.roles.get(&adv.id) {
Some(NodeRoleV(Some(cfg))) => Some(cfg),
_ => None,
});
if let Some(cfg) = prev_role {
healthy_nodes.push(format!(
"{id:?}\t{host}\t{addr}\t[{tags}]\t{zone}\tdraining metadata...",
id = adv.id,
host = host,
addr = addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
));
} else {
let new_role = match layout.staging.get().roles.get(&adv.id) {
Some(NodeRoleV(Some(_))) => "pending...",
_ => "NO ROLE ASSIGNED",
};
healthy_nodes.push(format!(
"{id:?}\t{h}\t{addr}\t\t\t{new_role}",
id = adv.id,
h = host,
addr = addr,
new_role = new_role,
));
}
}
}
format_table(healthy_nodes);
// Determine which nodes are unhealthy and print that to stdout
let status_map = status
.iter()
.map(|adv| (adv.id, adv))
.collect::<HashMap<_, _>>();
let tf = timeago::Formatter::new();
let mut drain_msg = false;
let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
let mut listed = HashSet::new();
for ver in layout.versions.iter().rev() {
for (node, _, role) in ver.roles.items().iter() {
let cfg = match role {
NodeRoleV(Some(role)) if role.capacity.is_some() => role,
_ => continue,
};
if listed.contains(node) {
continue;
}
listed.insert(*node);
let adv = status_map.get(node);
if adv.map(|x| x.is_up).unwrap_or(false) {
continue;
}
// Node is in a layout version, is not a gateway node, and is not up:
// it is in a failed state, add proper line to the output
let (host, last_seen) = match adv {
Some(adv) => (
adv.status.hostname.as_deref().unwrap_or("?"),
adv.last_seen_secs_ago
.map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into()),
),
None => ("??", "never seen".into()),
};
let capacity = if ver.version == layout.current().version {
cfg.capacity_string()
} else {
drain_msg = true;
"draining metadata...".to_string()
};
failed_nodes.push(format!(
"{id:?}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = node,
host = host,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = capacity,
last_seen = last_seen,
));
}
}
if failed_nodes.len() > 1 {
println!("\n==== FAILED NODES ====");
format_table(failed_nodes);
if drain_msg {
println!();
println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
println!("If these nodes are definitely dead, please review the layout history with");
println!(
"`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
);
}
}
if print_staging_role_changes(&layout) {
println!();
println!("Please use `garage layout show` to check the proposed new layout and apply it.");
println!();
}
Ok(())
}
pub async fn cmd_connect(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: ConnectNodeOpt,
) -> Result<(), Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::Connect(args.node), PRIO_NORMAL)
.await??
{
SystemRpc::Ok => {
println!("Success.");
Ok(())
}
m => Err(Error::unexpected_rpc_message(m)),
}
}
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
@ -220,23 +17,6 @@ pub async fn cmd_admin(
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::BucketList(bl) => {
print_bucket_list(bl);
}
AdminRpc::BucketInfo {
bucket,
relevant_keys,
counters,
mpu_counters,
} => {
print_bucket_info(&bucket, &relevant_keys, &counters, &mpu_counters);
}
AdminRpc::KeyList(kl) => {
print_key_list(kl);
}
AdminRpc::KeyInfo(key, rb) => {
print_key_info(&key, &rb);
}
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}

View file

@ -1,7 +1,6 @@
use bytesize::ByteSize;
use format_table::format_table;
use garage_util::crdt::Crdt;
use garage_util::error::*;
use garage_rpc::layout::*;
@ -10,174 +9,6 @@ use garage_rpc::*;
use crate::cli::*;
pub async fn cli_layout_command_dispatch(
cmd: LayoutOperation,
system_rpc_endpoint: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<(), Error> {
match cmd {
LayoutOperation::Assign(assign_opt) => {
cmd_assign_role(system_rpc_endpoint, rpc_host, assign_opt).await
}
LayoutOperation::Remove(remove_opt) => {
cmd_remove_role(system_rpc_endpoint, rpc_host, remove_opt).await
}
LayoutOperation::Show => cmd_show_layout(system_rpc_endpoint, rpc_host).await,
LayoutOperation::Apply(apply_opt) => {
cmd_apply_layout(system_rpc_endpoint, rpc_host, apply_opt).await
}
LayoutOperation::Revert(revert_opt) => {
cmd_revert_layout(system_rpc_endpoint, rpc_host, revert_opt).await
}
LayoutOperation::Config(config_opt) => {
cmd_config_layout(system_rpc_endpoint, rpc_host, config_opt).await
}
LayoutOperation::History => cmd_layout_history(system_rpc_endpoint, rpc_host).await,
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
cmd_layout_skip_dead_nodes(system_rpc_endpoint, rpc_host, assume_sync_opt).await
}
}
}
pub async fn cmd_assign_role(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: AssignRoleOpt,
) -> Result<(), Error> {
let status = match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => nodes,
resp => return Err(Error::Message(format!("Invalid RPC response: {:?}", resp))),
};
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let all_nodes = layout.get_all_nodes();
let added_nodes = args
.node_ids
.iter()
.map(|node_id| {
find_matching_node(
status
.iter()
.map(|adv| adv.id)
.chain(all_nodes.iter().cloned()),
node_id,
)
})
.collect::<Result<Vec<_>, _>>()?;
let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging.get().roles);
for replaced in args.replace.iter() {
let replaced_node = find_matching_node(all_nodes.iter().cloned(), replaced)?;
match roles.get(&replaced_node) {
Some(NodeRoleV(Some(_))) => {
layout
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(replaced_node, NodeRoleV(None)));
}
_ => {
return Err(Error::Message(format!(
"Cannot replace node {:?} as it is not currently in planned layout",
replaced_node
)));
}
}
}
if args.capacity.is_some() && args.gateway {
return Err(Error::Message(
"-c and -g are mutually exclusive, please configure node either with c>0 to act as a storage node or with -g to act as a gateway node".into()));
}
if args.capacity == Some(ByteSize::b(0)) {
return Err(Error::Message("Invalid capacity value: 0".into()));
}
for added_node in added_nodes {
let new_entry = match roles.get(&added_node) {
Some(NodeRoleV(Some(old))) => {
let capacity = match args.capacity {
Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => old.capacity,
};
let tags = if args.tags.is_empty() {
old.tags.clone()
} else {
args.tags.clone()
};
NodeRole {
zone: args.zone.clone().unwrap_or_else(|| old.zone.to_string()),
capacity,
tags,
}
}
_ => {
let capacity = match args.capacity {
Some(c) => Some(c.as_u64()),
None if args.gateway => None,
None => return Err(Error::Message(
"Please specify a capacity with the -c flag, or set node explicitly as gateway with -g".into())),
};
NodeRole {
zone: args
.zone
.clone()
.ok_or("Please specify a zone with the -z flag")?,
capacity,
tags: args.tags.clone(),
}
}
};
layout
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(added_node, NodeRoleV(Some(new_entry))));
}
send_layout(rpc_cli, rpc_host, layout).await?;
println!("Role changes are staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
pub async fn cmd_remove_role(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
args: RemoveRoleOpt,
) -> Result<(), Error> {
let mut layout = fetch_layout(rpc_cli, rpc_host).await?;
let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging.get().roles);
let deleted_node =
find_matching_node(roles.items().iter().map(|(id, _, _)| *id), &args.node_id)?;
layout
.staging
.get_mut()
.roles
.merge(&roles.update_mutator(deleted_node, NodeRoleV(None)));
send_layout(rpc_cli, rpc_host, layout).await?;
println!("Role removal is staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
@ -226,47 +57,6 @@ pub async fn cmd_show_layout(
Ok(())
}
pub async fn cmd_apply_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
apply_opt: ApplyLayoutOpt,
) -> Result<(), Error> {
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let (layout, msg) = layout.apply_staged_changes(apply_opt.version)?;
for line in msg.iter() {
println!("{}", line);
}
send_layout(rpc_cli, rpc_host, layout).await?;
println!("New cluster layout with updated role assignment has been applied in cluster.");
println!("Data will now be moved around between nodes accordingly.");
Ok(())
}
pub async fn cmd_revert_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
revert_opt: RevertLayoutOpt,
) -> Result<(), Error> {
if !revert_opt.yes {
return Err(Error::Message(
"Please add the --yes flag to run the layout revert operation".into(),
));
}
let layout = fetch_layout(rpc_cli, rpc_host).await?;
let layout = layout.revert_staged_changes()?;
send_layout(rpc_cli, rpc_host, layout).await?;
println!("All proposed role changes in cluster layout have been canceled.");
Ok(())
}
pub async fn cmd_config_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,

View file

@ -8,6 +8,5 @@ pub(crate) mod convert_db;
pub(crate) use cmd::*;
pub(crate) use init::*;
pub(crate) use layout::*;
pub(crate) use structs::*;
pub(crate) use util::*;

View file

@ -3,257 +3,16 @@ use std::time::Duration;
use format_table::format_table;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::mpu_table::{self, MultipartUpload};
use garage_model::s3::object_table;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt;
pub fn print_bucket_list(bl: Vec<Bucket>) {
println!("List of buckets:");
let mut table = vec![];
for bucket in bl {
let aliases = bucket
.aliases()
.iter()
.filter(|(_, _, active)| *active)
.map(|(name, _, _)| name.to_string())
.collect::<Vec<_>>();
let local_aliases_n = match &bucket
.local_aliases()
.iter()
.filter(|(_, _, active)| *active)
.collect::<Vec<_>>()[..]
{
[] => "".into(),
[((k, n), _, _)] => format!("{}:{}", k, n),
s => format!("[{} local aliases]", s.len()),
};
table.push(format!(
"\t{}\t{}\t{}",
aliases.join(","),
local_aliases_n,
hex::encode(bucket.id),
));
}
format_table(table);
}
pub fn print_key_list(kl: Vec<(String, String)>) {
println!("List of keys:");
let mut table = vec![];
for key in kl {
table.push(format!("\t{}\t{}", key.0, key.1));
}
format_table(table);
}
pub fn print_key_info(key: &Key, relevant_buckets: &HashMap<Uuid, Bucket>) {
let bucket_global_aliases = |b: &Uuid| {
if let Some(bucket) = relevant_buckets.get(b) {
if let Some(p) = bucket.state.as_option() {
return p
.aliases
.items()
.iter()
.filter(|(_, _, active)| *active)
.map(|(a, _, _)| a.clone())
.collect::<Vec<_>>()
.join(", ");
}
}
"".to_string()
};
match &key.state {
Deletable::Present(p) => {
println!("Key name: {}", p.name.get());
println!("Key ID: {}", key.key_id);
println!("Secret key: {}", p.secret_key);
println!("Can create buckets: {}", p.allow_create_bucket.get());
println!("\nKey-specific bucket aliases:");
let mut table = vec![];
for (alias_name, _, alias) in p.local_aliases.items().iter() {
if let Some(bucket_id) = alias {
table.push(format!(
"\t{}\t{}\t{}",
alias_name,
bucket_global_aliases(bucket_id),
hex::encode(bucket_id)
));
}
}
format_table(table);
println!("\nAuthorized buckets:");
let mut table = vec![];
for (bucket_id, perm) in p.authorized_buckets.items().iter() {
if !perm.is_any() {
continue;
}
let rflag = if perm.allow_read { "R" } else { " " };
let wflag = if perm.allow_write { "W" } else { " " };
let oflag = if perm.allow_owner { "O" } else { " " };
let local_aliases = p
.local_aliases
.items()
.iter()
.filter(|(_, _, a)| *a == Some(*bucket_id))
.map(|(a, _, _)| a.clone())
.collect::<Vec<_>>()
.join(", ");
table.push(format!(
"\t{}{}{}\t{}\t{}\t{:?}",
rflag,
wflag,
oflag,
bucket_global_aliases(bucket_id),
local_aliases,
bucket_id
));
}
format_table(table);
}
Deletable::Deleted => {
println!("Key {} is deleted.", key.key_id);
}
}
}
pub fn print_bucket_info(
bucket: &Bucket,
relevant_keys: &HashMap<String, Key>,
counters: &HashMap<String, i64>,
mpu_counters: &HashMap<String, i64>,
) {
let key_name = |k| {
relevant_keys
.get(k)
.map(|k| k.params().unwrap().name.get().as_str())
.unwrap_or("<deleted>")
};
println!("Bucket: {}", hex::encode(bucket.id));
match &bucket.state {
Deletable::Deleted => println!("Bucket is deleted."),
Deletable::Present(p) => {
let size =
bytesize::ByteSize::b(*counters.get(object_table::BYTES).unwrap_or(&0) as u64);
println!(
"\nSize: {} ({})",
size.to_string_as(true),
size.to_string_as(false)
);
println!(
"Objects: {}",
*counters.get(object_table::OBJECTS).unwrap_or(&0)
);
println!(
"Unfinished uploads (multipart and non-multipart): {}",
*counters.get(object_table::UNFINISHED_UPLOADS).unwrap_or(&0)
);
println!(
"Unfinished multipart uploads: {}",
*mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0)
);
let mpu_size =
bytesize::ByteSize::b(*mpu_counters.get(mpu_table::BYTES).unwrap_or(&0) as u64);
println!(
"Size of unfinished multipart uploads: {} ({})",
mpu_size.to_string_as(true),
mpu_size.to_string_as(false),
);
println!("\nWebsite access: {}", p.website_config.get().is_some());
let quotas = p.quotas.get();
if quotas.max_size.is_some() || quotas.max_objects.is_some() {
println!("\nQuotas:");
if let Some(ms) = quotas.max_size {
let ms = bytesize::ByteSize::b(ms);
println!(
" maximum size: {} ({})",
ms.to_string_as(true),
ms.to_string_as(false)
);
}
if let Some(mo) = quotas.max_objects {
println!(" maximum number of objects: {}", mo);
}
}
println!("\nGlobal aliases:");
for (alias, _, active) in p.aliases.items().iter() {
if *active {
println!(" {}", alias);
}
}
println!("\nKey-specific aliases:");
let mut table = vec![];
for ((key_id, alias), _, active) in p.local_aliases.items().iter() {
if *active {
table.push(format!("\t{} ({})\t{}", key_id, key_name(key_id), alias));
}
}
format_table(table);
println!("\nAuthorized keys:");
let mut table = vec![];
for (k, perm) in p.authorized_keys.items().iter() {
if !perm.is_any() {
continue;
}
let rflag = if perm.allow_read { "R" } else { " " };
let wflag = if perm.allow_write { "W" } else { " " };
let oflag = if perm.allow_owner { "O" } else { " " };
table.push(format!(
"\t{}{}{}\t{}\t{}",
rflag,
wflag,
oflag,
k,
key_name(k)
));
}
format_table(table);
}
};
}
pub fn find_matching_node(
cand: impl std::iter::Iterator<Item = Uuid>,
pattern: &str,
) -> Result<Uuid, Error> {
let mut candidates = vec![];
for c in cand {
if hex::encode(c).starts_with(pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0])
}
}
pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| {

523
src/garage/cli_v2/bucket.rs Normal file
View file

@ -0,0 +1,523 @@
//use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api::admin::api::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_bucket(&self, cmd: BucketOperation) -> Result<(), Error> {
match cmd {
BucketOperation::List => self.cmd_list_buckets().await,
BucketOperation::Info(query) => self.cmd_bucket_info(query).await,
BucketOperation::Create(query) => self.cmd_create_bucket(query).await,
BucketOperation::Delete(query) => self.cmd_delete_bucket(query).await,
BucketOperation::Alias(query) => self.cmd_alias_bucket(query).await,
BucketOperation::Unalias(query) => self.cmd_unalias_bucket(query).await,
BucketOperation::Allow(query) => self.cmd_bucket_allow(query).await,
BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
// TODO
x => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BucketOperation(x),
)
.await
.ok_or_message("old error"),
}
}
pub async fn cmd_list_buckets(&self) -> Result<(), Error> {
let buckets = self.api_request(ListBucketsRequest).await?;
println!("List of buckets:");
let mut table = vec![];
for bucket in buckets.0.iter() {
let local_aliases_n = match &bucket.local_aliases[..] {
[] => "".into(),
[alias] => format!("{}:{}", alias.access_key_id, alias.alias),
s => format!("[{} local aliases]", s.len()),
};
table.push(format!(
"\t{}\t{}\t{}",
bucket.global_aliases.join(","),
local_aliases_n,
bucket.id,
));
}
format_table(table);
Ok(())
}
pub async fn cmd_bucket_info(&self, opt: BucketOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.name),
})
.await?;
println!("Bucket: {}", bucket.id);
let size = bytesize::ByteSize::b(bucket.bytes as u64);
println!(
"\nSize: {} ({})",
size.to_string_as(true),
size.to_string_as(false)
);
println!("Objects: {}", bucket.objects);
println!(
"Unfinished uploads (multipart and non-multipart): {}",
bucket.unfinished_uploads,
);
println!(
"Unfinished multipart uploads: {}",
bucket.unfinished_multipart_uploads
);
let mpu_size = bytesize::ByteSize::b(bucket.unfinished_multipart_uploads as u64);
println!(
"Size of unfinished multipart uploads: {} ({})",
mpu_size.to_string_as(true),
mpu_size.to_string_as(false),
);
println!("\nWebsite access: {}", bucket.website_access);
if bucket.quotas.max_size.is_some() || bucket.quotas.max_objects.is_some() {
println!("\nQuotas:");
if let Some(ms) = bucket.quotas.max_size {
let ms = bytesize::ByteSize::b(ms);
println!(
" maximum size: {} ({})",
ms.to_string_as(true),
ms.to_string_as(false)
);
}
if let Some(mo) = bucket.quotas.max_objects {
println!(" maximum number of objects: {}", mo);
}
}
println!("\nGlobal aliases:");
for alias in bucket.global_aliases {
println!(" {}", alias);
}
println!("\nKey-specific aliases:");
let mut table = vec![];
for key in bucket.keys.iter() {
for alias in key.bucket_local_aliases.iter() {
table.push(format!("\t{} ({})\t{}", key.access_key_id, key.name, alias));
}
}
format_table(table);
println!("\nAuthorized keys:");
let mut table = vec![];
for key in bucket.keys.iter() {
if !(key.permissions.read || key.permissions.write || key.permissions.owner) {
continue;
}
let rflag = if key.permissions.read { "R" } else { " " };
let wflag = if key.permissions.write { "W" } else { " " };
let oflag = if key.permissions.owner { "O" } else { " " };
table.push(format!(
"\t{}{}{}\t{}\t{}",
rflag, wflag, oflag, key.access_key_id, key.name
));
}
format_table(table);
Ok(())
}
pub async fn cmd_create_bucket(&self, opt: BucketOpt) -> Result<(), Error> {
self.api_request(CreateBucketRequest {
global_alias: Some(opt.name.clone()),
local_alias: None,
})
.await?;
println!("Bucket {} was created.", opt.name);
Ok(())
}
pub async fn cmd_delete_bucket(&self, opt: DeleteBucketOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.name.clone()),
})
.await?;
// CLI-only checks: the bucket must not have other aliases
if bucket
.global_aliases
.iter()
.find(|a| **a != opt.name)
.is_some()
{
return Err(Error::Message(format!("Bucket {} still has other global aliases. Use `bucket unalias` to delete them one by one.", opt.name)));
}
if bucket
.keys
.iter()
.any(|k| !k.bucket_local_aliases.is_empty())
{
return Err(Error::Message(format!("Bucket {} still has other local aliases. Use `bucket unalias` to delete them one by one.", opt.name)));
}
if !opt.yes {
println!("About to delete bucket {}.", bucket.id);
return Err(Error::Message(
"Add --yes flag to really perform this operation".to_string(),
));
}
self.api_request(DeleteBucketRequest {
id: bucket.id.clone(),
})
.await?;
println!("Bucket {} has been deleted.", bucket.id);
Ok(())
}
pub async fn cmd_alias_bucket(&self, opt: AliasBucketOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.existing_bucket.clone()),
})
.await?;
if let Some(key_pat) = &opt.local {
let key = self
.api_request(GetKeyInfoRequest {
search: Some(key_pat.clone()),
id: None,
show_secret_key: false,
})
.await?;
self.api_request(AddBucketAliasRequest {
bucket_id: bucket.id.clone(),
alias: BucketAliasEnum::Local {
local_alias: opt.new_name.clone(),
access_key_id: key.access_key_id.clone(),
},
})
.await?;
println!(
"Alias {} now points to bucket {:.16} in namespace of key {}",
opt.new_name, bucket.id, key.access_key_id
)
} else {
self.api_request(AddBucketAliasRequest {
bucket_id: bucket.id.clone(),
alias: BucketAliasEnum::Global {
global_alias: opt.new_name.clone(),
},
})
.await?;
println!(
"Alias {} now points to bucket {:.16}",
opt.new_name, bucket.id
)
}
Ok(())
}
pub async fn cmd_unalias_bucket(&self, opt: UnaliasBucketOpt) -> Result<(), Error> {
if let Some(key_pat) = &opt.local {
let key = self
.api_request(GetKeyInfoRequest {
search: Some(key_pat.clone()),
id: None,
show_secret_key: false,
})
.await?;
let bucket = key
.buckets
.iter()
.find(|x| x.local_aliases.contains(&opt.name))
.ok_or_message(format!(
"No bucket called {} in namespace of key {}",
opt.name, key.access_key_id
))?;
self.api_request(RemoveBucketAliasRequest {
bucket_id: bucket.id.clone(),
alias: BucketAliasEnum::Local {
access_key_id: key.access_key_id.clone(),
local_alias: opt.name.clone(),
},
})
.await?;
println!(
"Alias {} no longer points to bucket {:.16} in namespace of key {}",
&opt.name, bucket.id, key.access_key_id
)
} else {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: Some(opt.name.clone()),
search: None,
})
.await?;
self.api_request(RemoveBucketAliasRequest {
bucket_id: bucket.id.clone(),
alias: BucketAliasEnum::Global {
global_alias: opt.name.clone(),
},
})
.await?;
println!(
"Alias {} no longer points to bucket {:.16}",
opt.name, bucket.id
)
}
Ok(())
}
pub async fn cmd_bucket_allow(&self, opt: PermBucketOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.bucket.clone()),
})
.await?;
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern.clone()),
show_secret_key: false,
})
.await?;
self.api_request(AllowBucketKeyRequest(BucketKeyPermChangeRequest {
bucket_id: bucket.id.clone(),
access_key_id: key.access_key_id.clone(),
permissions: ApiBucketKeyPerm {
read: opt.read,
write: opt.write,
owner: opt.owner,
},
}))
.await?;
let new_bucket = self
.api_request(GetBucketInfoRequest {
id: Some(bucket.id),
global_alias: None,
search: None,
})
.await?;
if let Some(new_key) = new_bucket
.keys
.iter()
.find(|k| k.access_key_id == key.access_key_id)
{
println!(
"New permissions for key {} on bucket {:.16}:\n read {}\n write {}\n owner {}",
key.access_key_id,
new_bucket.id,
new_key.permissions.read,
new_key.permissions.write,
new_key.permissions.owner
);
} else {
println!(
"Access key {} has no permissions on bucket {:.16}",
key.access_key_id, new_bucket.id
);
}
Ok(())
}
pub async fn cmd_bucket_deny(&self, opt: PermBucketOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.bucket.clone()),
})
.await?;
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern.clone()),
show_secret_key: false,
})
.await?;
self.api_request(DenyBucketKeyRequest(BucketKeyPermChangeRequest {
bucket_id: bucket.id.clone(),
access_key_id: key.access_key_id.clone(),
permissions: ApiBucketKeyPerm {
read: opt.read,
write: opt.write,
owner: opt.owner,
},
}))
.await?;
let new_bucket = self
.api_request(GetBucketInfoRequest {
id: Some(bucket.id),
global_alias: None,
search: None,
})
.await?;
if let Some(new_key) = new_bucket
.keys
.iter()
.find(|k| k.access_key_id == key.access_key_id)
{
println!(
"New permissions for key {} on bucket {:.16}:\n read {}\n write {}\n owner {}",
key.access_key_id,
new_bucket.id,
new_key.permissions.read,
new_key.permissions.write,
new_key.permissions.owner
);
} else {
println!(
"Access key {} no longer has permissions on bucket {:.16}",
key.access_key_id, new_bucket.id
);
}
Ok(())
}
pub async fn cmd_bucket_website(&self, opt: WebsiteOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.bucket.clone()),
})
.await?;
if !(opt.allow ^ opt.deny) {
return Err(Error::Message(
"You must specify exactly one flag, either --allow or --deny".to_string(),
));
}
let wa = if opt.allow {
UpdateBucketWebsiteAccess {
enabled: true,
index_document: Some(opt.index_document.clone()),
error_document: opt
.error_document
.or(bucket.website_config.and_then(|x| x.error_document.clone())),
}
} else {
UpdateBucketWebsiteAccess {
enabled: false,
index_document: None,
error_document: None,
}
};
self.api_request(UpdateBucketRequest {
id: bucket.id,
body: UpdateBucketRequestBody {
website_access: Some(wa),
quotas: None,
},
})
.await?;
if opt.allow {
println!("Website access allowed for {}", &opt.bucket);
} else {
println!("Website access denied for {}", &opt.bucket);
}
Ok(())
}
pub async fn cmd_bucket_set_quotas(&self, opt: SetQuotasOpt) -> Result<(), Error> {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(opt.bucket.clone()),
})
.await?;
if opt.max_size.is_none() && opt.max_objects.is_none() {
return Err(Error::Message(
"You must specify either --max-size or --max-objects (or both) for this command to do something.".to_string(),
));
}
let new_quotas = ApiBucketQuotas {
max_size: match opt.max_size.as_deref() {
Some("none") => None,
Some(v) => Some(
v.parse::<bytesize::ByteSize>()
.ok_or_message(format!("Invalid size specified: {}", v))?
.as_u64(),
),
None => bucket.quotas.max_size,
},
max_objects: match opt.max_objects.as_deref() {
Some("none") => None,
Some(v) => Some(
v.parse::<u64>()
.ok_or_message(format!("Invalid number: {}", v))?,
),
None => bucket.quotas.max_objects,
},
};
self.api_request(UpdateBucketRequest {
id: bucket.id.clone(),
body: UpdateBucketRequestBody {
website_access: None,
quotas: Some(new_quotas),
},
})
.await?;
println!("Quotas updated for bucket {:.16}", bucket.id);
Ok(())
}
}

View file

@ -0,0 +1,158 @@
use format_table::format_table;
use garage_util::error::*;
use garage_api::admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::layout::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_status(&self) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
println!("==== HEALTHY NODES ====");
let mut healthy_nodes =
vec!["ID\tHostname\tAddress\tTags\tZone\tCapacity\tDataAvail".to_string()];
for adv in status.nodes.iter().filter(|adv| adv.is_up) {
let host = adv.hostname.as_deref().unwrap_or("?");
let addr = match adv.addr {
Some(addr) => addr.to_string(),
None => "N/A".to_string(),
};
if let Some(cfg) = &adv.role {
let data_avail = match &adv.data_partition {
_ if cfg.capacity.is_none() => "N/A".into(),
Some(FreeSpaceResp { available, total }) => {
let pct = (*available as f64) / (*total as f64) * 100.;
let avail_str = bytesize::ByteSize::b(*available);
format!("{} ({:.1}%)", avail_str, pct)
}
None => "?".into(),
};
healthy_nodes.push(format!(
"{id:.16}\t{host}\t{addr}\t[{tags}]\t{zone}\t{capacity}\t{data_avail}",
id = adv.id,
host = host,
addr = addr,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = capacity_string(cfg.capacity),
data_avail = data_avail,
));
} else {
let status = match layout.staged_role_changes.iter().find(|x| x.id == adv.id) {
Some(NodeRoleChange {
action: NodeRoleChangeEnum::Update { .. },
..
}) => "pending...",
_ if adv.draining => "draining metadata..",
_ => "NO ROLE ASSIGNED",
};
healthy_nodes.push(format!(
"{id:.16}\t{h}\t{addr}\t\t\t{status}",
id = adv.id,
h = host,
addr = addr,
status = status,
));
}
}
format_table(healthy_nodes);
let tf = timeago::Formatter::new();
let mut drain_msg = false;
let mut failed_nodes = vec!["ID\tHostname\tTags\tZone\tCapacity\tLast seen".to_string()];
for adv in status.nodes.iter().filter(|x| !x.is_up) {
let node = &adv.id;
let host = adv.hostname.as_deref().unwrap_or("?");
let last_seen = adv
.last_seen_secs_ago
.map(|s| tf.convert(Duration::from_secs(s)))
.unwrap_or_else(|| "never seen".into());
if let Some(cfg) = &adv.role {
let capacity = capacity_string(cfg.capacity);
failed_nodes.push(format!(
"{id:.16}\t{host}\t[{tags}]\t{zone}\t{capacity}\t{last_seen}",
id = node,
host = host,
tags = cfg.tags.join(","),
zone = cfg.zone,
capacity = capacity,
last_seen = last_seen,
));
} else {
let status = match layout.staged_role_changes.iter().find(|x| x.id == adv.id) {
Some(NodeRoleChange {
action: NodeRoleChangeEnum::Update { .. },
..
}) => "pending...",
_ if adv.draining => {
drain_msg = true;
"draining metadata.."
}
_ => unreachable!(),
};
failed_nodes.push(format!(
"{id:.16}\t{host}\t\t\t{status}\t{last_seen}",
id = node,
host = host,
status = status,
last_seen = last_seen,
));
}
}
if failed_nodes.len() > 1 {
println!("\n==== FAILED NODES ====");
format_table(failed_nodes);
if drain_msg {
println!();
println!("Your cluster is expecting to drain data from nodes that are currently unavailable.");
println!(
"If these nodes are definitely dead, please review the layout history with"
);
println!(
"`garage layout history` and use `garage layout skip-dead-nodes` to force progress."
);
}
}
if print_staging_role_changes(&layout) {
println!();
println!(
"Please use `garage layout show` to check the proposed new layout and apply it."
);
println!();
}
Ok(())
}
pub async fn cmd_connect(&self, opt: ConnectNodeOpt) -> Result<(), Error> {
let res = self
.api_request(ConnectClusterNodesRequest(vec![opt.node]))
.await?;
if res.0.len() != 1 {
return Err(Error::Message(format!("unexpected response: {:?}", res)));
}
let res = res.0.into_iter().next().unwrap();
if res.success {
println!("Success.");
Ok(())
} else {
Err(Error::Message(format!(
"Failure: {}",
res.error.unwrap_or_default()
)))
}
}
}

227
src/garage/cli_v2/key.rs Normal file
View file

@ -0,0 +1,227 @@
use format_table::format_table;
use garage_util::error::*;
use garage_api::admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_key(&self, cmd: KeyOperation) -> Result<(), Error> {
match cmd {
KeyOperation::List => self.cmd_list_keys().await,
KeyOperation::Info(query) => self.cmd_key_info(query).await,
KeyOperation::Create(query) => self.cmd_create_key(query).await,
KeyOperation::Rename(query) => self.cmd_rename_key(query).await,
KeyOperation::Delete(query) => self.cmd_delete_key(query).await,
KeyOperation::Allow(query) => self.cmd_allow_key(query).await,
KeyOperation::Deny(query) => self.cmd_deny_key(query).await,
KeyOperation::Import(query) => self.cmd_import_key(query).await,
}
}
pub async fn cmd_list_keys(&self) -> Result<(), Error> {
let keys = self.api_request(ListKeysRequest).await?;
println!("List of keys:");
let mut table = vec![];
for key in keys.0.iter() {
table.push(format!("\t{}\t{}", key.id, key.name));
}
format_table(table);
Ok(())
}
pub async fn cmd_key_info(&self, opt: KeyInfoOpt) -> Result<(), Error> {
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern),
show_secret_key: opt.show_secret,
})
.await?;
print_key_info(&key);
Ok(())
}
pub async fn cmd_create_key(&self, opt: KeyNewOpt) -> Result<(), Error> {
let key = self
.api_request(CreateKeyRequest {
name: Some(opt.name),
})
.await?;
print_key_info(&key.0);
Ok(())
}
pub async fn cmd_rename_key(&self, opt: KeyRenameOpt) -> Result<(), Error> {
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern),
show_secret_key: false,
})
.await?;
let new_key = self
.api_request(UpdateKeyRequest {
id: key.access_key_id,
body: UpdateKeyRequestBody {
name: Some(opt.new_name),
allow: None,
deny: None,
},
})
.await?;
print_key_info(&new_key.0);
Ok(())
}
pub async fn cmd_delete_key(&self, opt: KeyDeleteOpt) -> Result<(), Error> {
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern),
show_secret_key: false,
})
.await?;
if !opt.yes {
println!("About to delete key {}...", key.access_key_id);
return Err(Error::Message(
"Add --yes flag to really perform this operation".to_string(),
));
}
self.api_request(DeleteKeyRequest {
id: key.access_key_id.clone(),
})
.await?;
println!("Access key {} has been deleted.", key.access_key_id);
Ok(())
}
pub async fn cmd_allow_key(&self, opt: KeyPermOpt) -> Result<(), Error> {
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern),
show_secret_key: false,
})
.await?;
let new_key = self
.api_request(UpdateKeyRequest {
id: key.access_key_id,
body: UpdateKeyRequestBody {
name: None,
allow: Some(KeyPerm {
create_bucket: opt.create_bucket,
}),
deny: None,
},
})
.await?;
print_key_info(&new_key.0);
Ok(())
}
pub async fn cmd_deny_key(&self, opt: KeyPermOpt) -> Result<(), Error> {
let key = self
.api_request(GetKeyInfoRequest {
id: None,
search: Some(opt.key_pattern),
show_secret_key: false,
})
.await?;
let new_key = self
.api_request(UpdateKeyRequest {
id: key.access_key_id,
body: UpdateKeyRequestBody {
name: None,
allow: None,
deny: Some(KeyPerm {
create_bucket: opt.create_bucket,
}),
},
})
.await?;
print_key_info(&new_key.0);
Ok(())
}
pub async fn cmd_import_key(&self, opt: KeyImportOpt) -> Result<(), Error> {
if !opt.yes {
return Err(Error::Message("This command is intended to re-import keys that were previously generated by Garage. If you want to create a new key, use `garage key new` instead. Add the --yes flag if you really want to re-import a key.".to_string()));
}
let new_key = self
.api_request(ImportKeyRequest {
name: Some(opt.name),
access_key_id: opt.key_id,
secret_access_key: opt.secret_key,
})
.await?;
print_key_info(&new_key.0);
Ok(())
}
}
fn print_key_info(key: &GetKeyInfoResponse) {
println!("Key name: {}", key.name);
println!("Key ID: {}", key.access_key_id);
println!(
"Secret key: {}",
key.secret_access_key.as_deref().unwrap_or("(redacted)")
);
println!("Can create buckets: {}", key.permissions.create_bucket);
println!("\nKey-specific bucket aliases:");
let mut table = vec![];
for bucket in key.buckets.iter() {
for la in bucket.local_aliases.iter() {
table.push(format!(
"\t{}\t{}\t{}",
la,
bucket.global_aliases.join(","),
bucket.id
));
}
}
format_table(table);
println!("\nAuthorized buckets:");
let mut table = vec![];
for bucket in key.buckets.iter() {
let rflag = if bucket.permissions.read { "R" } else { " " };
let wflag = if bucket.permissions.write { "W" } else { " " };
let oflag = if bucket.permissions.owner { "O" } else { " " };
table.push(format!(
"\t{}{}{}\t{}\t{}\t{:.16}",
rflag,
wflag,
oflag,
bucket.global_aliases.join(","),
bucket.local_aliases.join(","),
bucket.id
));
}
format_table(table);
}

284
src/garage/cli_v2/layout.rs Normal file
View file

@ -0,0 +1,284 @@
use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api::admin::api::*;
use crate::cli::layout as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn layout_command_dispatch(&self, cmd: LayoutOperation) -> Result<(), Error> {
match cmd {
LayoutOperation::Assign(assign_opt) => self.cmd_assign_role(assign_opt).await,
LayoutOperation::Remove(remove_opt) => self.cmd_remove_role(remove_opt).await,
LayoutOperation::Apply(apply_opt) => self.cmd_apply_layout(apply_opt).await,
LayoutOperation::Revert(revert_opt) => self.cmd_revert_layout(revert_opt).await,
// TODO
LayoutOperation::Show => {
cli_v1::cmd_show_layout(&self.system_rpc_endpoint, self.rpc_host).await
}
LayoutOperation::Config(config_opt) => {
cli_v1::cmd_config_layout(&self.system_rpc_endpoint, self.rpc_host, config_opt)
.await
}
LayoutOperation::History => {
cli_v1::cmd_layout_history(&self.system_rpc_endpoint, self.rpc_host).await
}
LayoutOperation::SkipDeadNodes(assume_sync_opt) => {
cli_v1::cmd_layout_skip_dead_nodes(
&self.system_rpc_endpoint,
self.rpc_host,
assume_sync_opt,
)
.await
}
}
}
pub async fn cmd_assign_role(&self, opt: AssignRoleOpt) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
let all_node_ids_iter = status
.nodes
.iter()
.map(|x| x.id.as_str())
.chain(layout.roles.iter().map(|x| x.id.as_str()));
let mut actions = vec![];
for node in opt.replace.iter() {
let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
actions.push(NodeRoleChange {
id,
action: NodeRoleChangeEnum::Remove { remove: true },
});
}
for node in opt.node_ids.iter() {
let id = find_matching_node(all_node_ids_iter.clone(), &node)?;
let current = get_staged_or_current_role(&id, &layout);
let zone = opt
.zone
.clone()
.or_else(|| current.as_ref().map(|c| c.zone.clone()))
.ok_or_message("Please specify a zone with the -z flag")?;
let capacity = if opt.gateway {
if opt.capacity.is_some() {
return Err(Error::Message("Please specify only -c or -g".into()));
}
None
} else if let Some(cap) = opt.capacity {
Some(cap.as_u64())
} else {
current.as_ref().ok_or_message("Please specify a capacity with the -c flag, or set node explicitly as gateway with -g")?.capacity
};
let tags = if !opt.tags.is_empty() {
opt.tags.clone()
} else if let Some(cur) = current.as_ref() {
cur.tags.clone()
} else {
vec![]
};
actions.push(NodeRoleChange {
id,
action: NodeRoleChangeEnum::Update {
zone,
capacity,
tags,
},
});
}
self.api_request(UpdateClusterLayoutRequest(actions))
.await?;
println!("Role changes are staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
pub async fn cmd_remove_role(&self, opt: RemoveRoleOpt) -> Result<(), Error> {
let status = self.api_request(GetClusterStatusRequest).await?;
let layout = self.api_request(GetClusterLayoutRequest).await?;
let all_node_ids_iter = status
.nodes
.iter()
.map(|x| x.id.as_str())
.chain(layout.roles.iter().map(|x| x.id.as_str()));
let id = find_matching_node(all_node_ids_iter.clone(), &opt.node_id)?;
let actions = vec![NodeRoleChange {
id,
action: NodeRoleChangeEnum::Remove { remove: true },
}];
self.api_request(UpdateClusterLayoutRequest(actions))
.await?;
println!("Role removal is staged but not yet committed.");
println!("Use `garage layout show` to view staged role changes,");
println!("and `garage layout apply` to enact staged changes.");
Ok(())
}
pub async fn cmd_apply_layout(&self, apply_opt: ApplyLayoutOpt) -> Result<(), Error> {
let missing_version_error = r#"
Please pass the new layout version number to ensure that you are writing the correct version of the cluster layout.
To know the correct value of the new layout version, invoke `garage layout show` and review the proposed changes.
"#;
let req = ApplyClusterLayoutRequest {
version: apply_opt.version.ok_or_message(missing_version_error)?,
};
let res = self.api_request(req).await?;
for line in res.message.iter() {
println!("{}", line);
}
println!("New cluster layout with updated role assignment has been applied in cluster.");
println!("Data will now be moved around between nodes accordingly.");
Ok(())
}
pub async fn cmd_revert_layout(&self, revert_opt: RevertLayoutOpt) -> Result<(), Error> {
if !revert_opt.yes {
return Err(Error::Message(
"Please add the --yes flag to run the layout revert operation".into(),
));
}
self.api_request(RevertClusterLayoutRequest).await?;
println!("All proposed role changes in cluster layout have been canceled.");
Ok(())
}
}
// --------------------------
// ---- helper functions ----
// --------------------------
pub fn capacity_string(v: Option<u64>) -> String {
match v {
Some(c) => ByteSize::b(c).to_string_as(false),
None => "gateway".to_string(),
}
}
pub fn get_staged_or_current_role(
id: &str,
layout: &GetClusterLayoutResponse,
) -> Option<NodeRoleResp> {
for node in layout.staged_role_changes.iter() {
if node.id == id {
return match &node.action {
NodeRoleChangeEnum::Remove { .. } => None,
NodeRoleChangeEnum::Update {
zone,
capacity,
tags,
} => Some(NodeRoleResp {
id: id.to_string(),
zone: zone.to_string(),
capacity: *capacity,
tags: tags.clone(),
}),
};
}
}
for node in layout.roles.iter() {
if node.id == id {
return Some(node.clone());
}
}
None
}
pub fn find_matching_node<'a>(
cand: impl std::iter::Iterator<Item = &'a str>,
pattern: &'a str,
) -> Result<String, Error> {
let mut candidates = vec![];
for c in cand {
if c.starts_with(pattern) && !candidates.contains(&c) {
candidates.push(c);
}
}
if candidates.len() != 1 {
Err(Error::Message(format!(
"{} nodes match '{}'",
candidates.len(),
pattern,
)))
} else {
Ok(candidates[0].to_string())
}
}
pub fn print_staging_role_changes(layout: &GetClusterLayoutResponse) -> bool {
let has_role_changes = !layout.staged_role_changes.is_empty();
// TODO!! Layout parameters
let has_layout_changes = false;
if has_role_changes || has_layout_changes {
println!();
println!("==== STAGED ROLE CHANGES ====");
if has_role_changes {
let mut table = vec!["ID\tTags\tZone\tCapacity".to_string()];
for change in layout.staged_role_changes.iter() {
match &change.action {
NodeRoleChangeEnum::Update {
tags,
zone,
capacity,
} => {
let tags = tags.join(",");
table.push(format!(
"{:.16}\t{}\t{}\t{}",
change.id,
tags,
zone,
capacity_string(*capacity),
));
}
NodeRoleChangeEnum::Remove { .. } => {
table.push(format!("{:.16}\tREMOVED", change.id));
}
}
}
format_table(table);
println!();
}
//TODO
/*
if has_layout_changes {
println!(
"Zone redundancy: {}",
staging.parameters.get().zone_redundancy
);
}
*/
true
} else {
false
}
}

106
src/garage/cli_v2/mod.rs Normal file
View file

@ -0,0 +1,106 @@
pub mod bucket;
pub mod cluster;
pub mod key;
pub mod layout;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_api::admin::api::*;
use garage_api::admin::EndpointHandler as AdminApiEndpoint;
use crate::admin::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
pub rpc_host: NodeID,
}
impl Cli {
pub async fn handle(&self, cmd: Command) -> Result<(), Error> {
match cmd {
Command::Status => self.cmd_status().await,
Command::Node(NodeOperation::Connect(connect_opt)) => {
self.cmd_connect(connect_opt).await
}
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::LaunchRepair(ro),
)
.await
.ok_or_message("cli_v1"),
Command::Stats(so) => {
cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
.await
.ok_or_message("cli_v1")
}
Command::Worker(wo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::Worker(wo),
)
.await
.ok_or_message("cli_v1"),
Command::Block(bo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BlockOperation(bo),
)
.await
.ok_or_message("cli_v1"),
Command::Meta(mo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::MetaOperation(mo),
)
.await
.ok_or_message("cli_v1"),
_ => unreachable!(),
}
}
pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
where
T: AdminApiEndpoint,
AdminApiRequest: From<T>,
<T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
.admin_rpc_endpoint
.call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
.await?
.ok_or_message("rpc")?
{
AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
.map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
AdminRpc::ApiErrorResponse {
http_code,
error_code,
message,
} => Err(Error::Message(format!(
"{} returned {} ({}): {}",
req_name, error_code, http_code, message
))),
m => Err(Error::unexpected_rpc_message(m)),
}
}
}

View file

@ -6,6 +6,7 @@ extern crate tracing;
mod admin;
mod cli;
mod cli_v2;
mod repair;
mod secrets;
mod server;
@ -34,8 +35,6 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_model::helper::error::Error as HelperError;
use admin::*;
use cli::*;
use secrets::Secrets;
@ -284,10 +283,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
match cli_command_dispatch(opt.cmd, &system_rpc_endpoint, &admin_rpc_endpoint, id).await {
Err(HelperError::Internal(i)) => Err(Error::Message(format!("Internal error: {}", i))),
Err(HelperError::BadRequest(b)) => Err(Error::Message(b)),
Err(e) => Err(Error::Message(format!("{}", e))),
Ok(x) => Ok(x),
}
let cli = cli_v2::Cli {
system_rpc_endpoint,
admin_rpc_endpoint,
rpc_host: id,
};
cli.handle(opt.cmd).await
}

View file

@ -73,41 +73,48 @@ impl<'a> BucketHelper<'a> {
pattern: &String,
) -> Result<Uuid, Error> {
if let Some(uuid) = self.resolve_global_bucket_name(pattern).await? {
return Ok(uuid);
} else if pattern.len() >= 2 {
let hexdec = pattern
.get(..pattern.len() & !1)
.and_then(|x| hex::decode(x).ok());
if let Some(hex) = hexdec {
let mut start = [0u8; 32];
start
.as_mut_slice()
.get_mut(..hex.len())
.ok_or_bad_request("invalid length")?
.copy_from_slice(&hex);
let mut candidates = self
.0
.bucket_table
.get_range(
&EmptyKey,
Some(start.into()),
Some(DeletedFilter::NotDeleted),
10,
EnumerationOrder::Forward,
)
.await?
.into_iter()
.collect::<Vec<_>>();
candidates.retain(|x| hex::encode(x.id).starts_with(pattern));
if candidates.len() == 1 {
return Ok(candidates.into_iter().next().unwrap().id);
}
Ok(uuid)
} else {
let hexdec = if pattern.len() >= 2 {
pattern
.get(..pattern.len() & !1)
.and_then(|x| hex::decode(x).ok())
} else {
None
};
let hex = hexdec.ok_or_else(|| Error::NoSuchBucket(pattern.clone()))?;
let mut start = [0u8; 32];
start
.as_mut_slice()
.get_mut(..hex.len())
.ok_or_bad_request("invalid length")?
.copy_from_slice(&hex);
let mut candidates = self
.0
.bucket_table
.get_range(
&EmptyKey,
Some(start.into()),
Some(DeletedFilter::NotDeleted),
10,
EnumerationOrder::Forward,
)
.await?
.into_iter()
.collect::<Vec<_>>();
candidates.retain(|x| hex::encode(x.id).starts_with(pattern));
if candidates.is_empty() {
Err(Error::NoSuchBucket(pattern.clone()))
} else if candidates.len() == 1 {
Ok(candidates.into_iter().next().unwrap().id)
} else {
Err(Error::BadRequest(format!(
"Several matching buckets: {}",
pattern
)))
}
}
Err(Error::BadRequest(format!(
"Bucket not found / several matching buckets: {}",
pattern
)))
}
/// Returns a Bucket if it is present in bucket table,