admin api: first local endpoint to work with new scheme
All checks were successful
ci/woodpecker/pr/debug Pipeline was successful
ci/woodpecker/push/debug Pipeline was successful

This commit is contained in:
Alex 2025-01-30 20:30:49 +01:00
parent bb36360c0b
commit 783c1e1c70
7 changed files with 125 additions and 64 deletions

View file

@ -7,11 +7,15 @@ use async_trait::async_trait;
use paste::paste;
use serde::{Deserialize, Serialize};
use garage_rpc::*;
use garage_model::garage::Garage;
use crate::admin::api_server::{AdminRpc, AdminRpcResponse};
use crate::admin::error::Error;
use crate::admin::macros::*;
use crate::admin::{Admin, RequestHandler};
use crate::common_error::CommonErrorDerivative;
use crate::helpers::is_default;
// This generates the following:

View file

@ -103,8 +103,8 @@ pub struct AdminApiServer {
exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
background: Arc<BackgroundRunner>,
endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
pub(crate) background: Arc<BackgroundRunner>,
pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
}
pub enum HttpEndpoint {

View file

@ -113,12 +113,17 @@ macro_rules! local_admin_endpoints {
$(
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct [< $endpoint Request >] {
node: String,
body: [< Local $endpoint Request >],
pub node: String,
pub body: [< Local $endpoint Request >],
}
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct [< $endpoint Response >](HashMap<String, [< Local $endpoint Response >] >);
pub struct [< $endpoint Response >] {
pub success: HashMap<String, [< Local $endpoint Response >] >,
pub error: HashMap<String, String>,
}
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
@ -141,7 +146,56 @@ macro_rules! local_admin_endpoints {
type Response = [< $endpoint Response >];
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
todo!()
let to = match self.node.as_str() {
"*" => garage.system.cluster_layout().all_nodes().to_vec(),
id => {
let nodes = garage.system.cluster_layout().all_nodes()
.iter()
.filter(|x| hex::encode(x).starts_with(id))
.cloned()
.collect::<Vec<_>>();
if nodes.len() != 1 {
return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
}
nodes
}
};
let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
&to,
AdminRpc::Internal(self.body.into()),
RequestStrategy::with_priority(PRIO_NORMAL),
).await?;
let mut ret = [< $endpoint Response >] {
success: HashMap::new(),
error: HashMap::new(),
};
for (node, resp) in resps {
match resp {
Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
match [< Local $endpoint Response >]::try_from(r) {
Ok(r) => {
ret.success.insert(hex::encode(node), r);
}
Err(_) => {
ret.error.insert(hex::encode(node), "returned invalid value".to_string());
}
}
}
Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
}
Ok(_) => {
ret.error.insert(hex::encode(node), "returned invalid value".to_string());
}
Err(e) => {
ret.error.insert(hex::encode(node), e.to_string());
}
}
}
Ok(ret)
}
}
)*

View file

@ -58,6 +58,8 @@ impl AdminApiRequest {
// Bucket aliases
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
// Worker APIs
POST GetWorkerVariable (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
@ -239,6 +241,7 @@ impl AdminApiRequest {
generateQueryParameters! {
keywords: [],
fields: [
"node" => node,
"domain" => domain,
"format" => format,
"id" => id,

View file

@ -354,62 +354,12 @@ impl AdminRpcHandler {
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerOperation::Get {
all_nodes,
variable,
} => self.handle_get_var(*all_nodes, variable).await,
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.handle_set_var(*all_nodes, variable, value).await,
}
}
async fn handle_get_var(
&self,
all_nodes: bool,
variable: &Option<String>,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Get {
all_nodes: false,
variable: variable.clone(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(v) = variable {
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
v.clone(),
self.garage.bg_vars.get(v)?,
)]))
} else {
let mut vars = self.garage.bg_vars.get_all();
vars.sort();
Ok(AdminRpc::WorkerVars(
vars.into_iter()
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
.collect(),
))
}
_ => unreachable!(),
}
}

View file

@ -3,6 +3,8 @@ pub mod cluster;
pub mod key;
pub mod layout;
pub mod worker;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@ -38,6 +40,7 @@ impl Cli {
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
Command::Worker(wo) => self.cmd_worker(wo).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
@ -52,13 +55,6 @@ impl Cli {
.await
.ok_or_message("cli_v1")
}
Command::Worker(wo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::Worker(wo),
)
.await
.ok_or_message("cli_v1"),
Command::Block(bo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,

View file

@ -0,0 +1,54 @@
//use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api::admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
match cmd {
WorkerOperation::Get {
all_nodes,
variable,
} => self.cmd_get_var(all_nodes, variable).await,
wo => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::Worker(wo),
)
.await
.ok_or_message("cli_v1"),
}
}
pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
let res = self
.api_request(GetWorkerVariableRequest {
node: if all {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalGetWorkerVariableRequest { variable: var },
})
.await?;
let mut table = vec![];
for (node, vars) in res.success.iter() {
for (key, val) in vars.0.iter() {
table.push(format!("{:.16}\t{}\t{}", node, key, val));
}
}
format_table(table);
for (node, err) in res.error.iter() {
eprintln!("{:.16}: error: {}", node, err);
}
Ok(())
}
}