From 783c1e1c702988c00935d9665ab1db32338be370 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 30 Jan 2025 20:30:49 +0100 Subject: [PATCH] admin api: first local endpoint to work with new scheme --- src/api/admin/api.rs | 4 +++ src/api/admin/api_server.rs | 4 +-- src/api/admin/macros.rs | 62 ++++++++++++++++++++++++++++++++++--- src/api/admin/router_v2.rs | 3 ++ src/garage/admin/mod.rs | 52 +------------------------------ src/garage/cli_v2/mod.rs | 10 ++---- src/garage/cli_v2/worker.rs | 54 ++++++++++++++++++++++++++++++++ 7 files changed, 125 insertions(+), 64 deletions(-) create mode 100644 src/garage/cli_v2/worker.rs diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 2eb895a4..9663d53a 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -7,11 +7,15 @@ use async_trait::async_trait; use paste::paste; use serde::{Deserialize, Serialize}; +use garage_rpc::*; + use garage_model::garage::Garage; +use crate::admin::api_server::{AdminRpc, AdminRpcResponse}; use crate::admin::error::Error; use crate::admin::macros::*; use crate::admin::{Admin, RequestHandler}; +use crate::common_error::CommonErrorDerivative; use crate::helpers::is_default; // This generates the following: diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index 3ca54b4d..f8360fea 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -103,8 +103,8 @@ pub struct AdminApiServer { exporter: PrometheusExporter, metrics_token: Option, admin_token: Option, - background: Arc, - endpoint: Arc>, + pub(crate) background: Arc, + pub(crate) endpoint: Arc>, } pub enum HttpEndpoint { diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs index 5d351c65..bf7eede9 100644 --- a/src/api/admin/macros.rs +++ b/src/api/admin/macros.rs @@ -113,12 +113,17 @@ macro_rules! local_admin_endpoints { $( #[derive(Debug, Clone, Serialize, Deserialize)] pub struct [< $endpoint Request >] { - node: String, - body: [< Local $endpoint Request >], + pub node: String, + pub body: [< Local $endpoint Request >], } + pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >]; + #[derive(Debug, Clone, Serialize, Deserialize)] - pub struct [< $endpoint Response >](HashMap] >); + pub struct [< $endpoint Response >] { + pub success: HashMap] >, + pub error: HashMap, + } impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest { fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest { @@ -141,7 +146,56 @@ macro_rules! local_admin_endpoints { type Response = [< $endpoint Response >]; async fn handle(self, garage: &Arc, admin: &Admin) -> Result { - todo!() + let to = match self.node.as_str() { + "*" => garage.system.cluster_layout().all_nodes().to_vec(), + id => { + let nodes = garage.system.cluster_layout().all_nodes() + .iter() + .filter(|x| hex::encode(x).starts_with(id)) + .cloned() + .collect::>(); + if nodes.len() != 1 { + return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes))); + } + nodes + } + }; + + let resps = garage.system.rpc_helper().call_many(&admin.endpoint, + &to, + AdminRpc::Internal(self.body.into()), + RequestStrategy::with_priority(PRIO_NORMAL), + ).await?; + + let mut ret = [< $endpoint Response >] { + success: HashMap::new(), + error: HashMap::new(), + }; + for (node, resp) in resps { + match resp { + Ok(AdminRpcResponse::InternalApiOkResponse(r)) => { + match [< Local $endpoint Response >]::try_from(r) { + Ok(r) => { + ret.success.insert(hex::encode(node), r); + } + Err(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + } + } + Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => { + ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message)); + } + Ok(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + Err(e) => { + ret.error.insert(hex::encode(node), e.to_string()); + } + } + } + + Ok(ret) } } )* diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index dec58c01..b010651c 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -58,6 +58,8 @@ impl AdminApiRequest { // Bucket aliases POST AddBucketAlias (body), POST RemoveBucketAlias (body), + // Worker APIs + POST GetWorkerVariable (body_field, query::node), ]); if let Some(message) = query.nonempty_message() { @@ -239,6 +241,7 @@ impl AdminApiRequest { generateQueryParameters! { keywords: [], fields: [ + "node" => node, "domain" => domain, "format" => format, "id" => id, diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index 38cabb90..ea26f8fb 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -354,62 +354,12 @@ impl AdminRpcHandler { .clone(); Ok(AdminRpc::WorkerInfo(*tid, info)) } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, WorkerOperation::Set { all_nodes, variable, value, } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } + _ => unreachable!(), } } diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index ded6d686..0f0b3b34 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -3,6 +3,8 @@ pub mod cluster; pub mod key; pub mod layout; +pub mod worker; + use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; @@ -38,6 +40,7 @@ impl Cli { Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await, Command::Bucket(bo) => self.cmd_bucket(bo).await, Command::Key(ko) => self.cmd_key(ko).await, + Command::Worker(wo) => self.cmd_worker(wo).await, // TODO Command::Repair(ro) => cli_v1::cmd_admin( @@ -52,13 +55,6 @@ impl Cli { .await .ok_or_message("cli_v1") } - Command::Worker(wo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), - ) - .await - .ok_or_message("cli_v1"), Command::Block(bo) => cli_v1::cmd_admin( &self.admin_rpc_endpoint, self.rpc_host, diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs new file mode 100644 index 00000000..56a76dec --- /dev/null +++ b/src/garage/cli_v2/worker.rs @@ -0,0 +1,54 @@ +//use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api::admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { + match cmd { + WorkerOperation::Get { + all_nodes, + variable, + } => self.cmd_get_var(all_nodes, variable).await, + wo => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::Worker(wo), + ) + .await + .ok_or_message("cli_v1"), + } + } + + pub async fn cmd_get_var(&self, all: bool, var: Option) -> Result<(), Error> { + let res = self + .api_request(GetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetWorkerVariableRequest { variable: var }, + }) + .await?; + + let mut table = vec![]; + for (node, vars) in res.success.iter() { + for (key, val) in vars.0.iter() { + table.push(format!("{:.16}\t{}\t{}", node, key, val)); + } + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } +}