admin api: fix things
This commit is contained in:
parent
e3d1571247
commit
bb36360c0b
7 changed files with 63 additions and 72 deletions
|
@ -73,10 +73,10 @@ admin_endpoints![
|
|||
RemoveBucketAlias,
|
||||
|
||||
// Worker operations
|
||||
GetWorkerParam,
|
||||
GetWorkerVariable,
|
||||
];
|
||||
|
||||
local_admin_endpoints![GetWorkerParam,];
|
||||
local_admin_endpoints![GetWorkerVariable,];
|
||||
|
||||
// **********************************************
|
||||
// Special endpoints
|
||||
|
@ -591,9 +591,9 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
|
|||
// **********************************************
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LocalGetWorkerParamRequest {
|
||||
pub param: Option<String>,
|
||||
pub struct LocalGetWorkerVariableRequest {
|
||||
pub variable: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct LocalGetWorkerParamResponse(pub HashMap<String, String>);
|
||||
pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
|
||||
|
|
|
@ -37,11 +37,15 @@ use crate::helpers::*;
|
|||
pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct AdminRpc(LocalAdminApiRequest);
|
||||
pub enum AdminRpc {
|
||||
Proxy(AdminApiRequest),
|
||||
Internal(LocalAdminApiRequest),
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum AdminRpcResponse {
|
||||
ApiOkResponse(LocalAdminApiResponse),
|
||||
ProxyApiOkResponse(TaggedAdminApiResponse),
|
||||
InternalApiOkResponse(LocalAdminApiResponse),
|
||||
ApiErrorResponse {
|
||||
http_code: u16,
|
||||
error_code: String,
|
||||
|
@ -60,11 +64,12 @@ impl EndpointHandler<AdminRpc> for AdminApiServer {
|
|||
message: &AdminRpc,
|
||||
_from: NodeID,
|
||||
) -> Result<AdminRpcResponse, GarageError> {
|
||||
let req = message.0.clone();
|
||||
match message {
|
||||
AdminRpc::Proxy(req) => {
|
||||
info!("Proxied admin API request: {}", req.name());
|
||||
let res = req.handle(&self.garage, &self).await;
|
||||
let res = req.clone().handle(&self.garage, &self).await;
|
||||
match res {
|
||||
Ok(res) => Ok(AdminRpcResponse::ApiOkResponse(res)),
|
||||
Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
|
||||
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
|
||||
http_code: e.http_status_code().as_u16(),
|
||||
error_code: e.code().to_string(),
|
||||
|
@ -72,6 +77,20 @@ impl EndpointHandler<AdminRpc> for AdminApiServer {
|
|||
}),
|
||||
}
|
||||
}
|
||||
AdminRpc::Internal(req) => {
|
||||
info!("Internal admin API request: {}", req.name());
|
||||
let res = req.clone().handle(&self.garage, &self).await;
|
||||
match res {
|
||||
Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
|
||||
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
|
||||
http_code: e.http_status_code().as_u16(),
|
||||
error_code: e.code().to_string(),
|
||||
message: e.to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ---- FOR HTTP ----
|
||||
|
@ -118,7 +137,7 @@ impl AdminApiServer {
|
|||
}
|
||||
|
||||
pub async fn run(
|
||||
self,
|
||||
self: Arc<Self>,
|
||||
bind_addr: UnixOrTCPSocketAddress,
|
||||
must_exit: watch::Receiver<bool>,
|
||||
) -> Result<(), GarageError> {
|
||||
|
@ -158,7 +177,7 @@ impl AdminApiServer {
|
|||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ApiHandler for AdminApiServer {
|
||||
impl ApiHandler for Arc<AdminApiServer> {
|
||||
const API_NAME: &'static str = "admin";
|
||||
const API_NAME_DISPLAY: &'static str = "Admin";
|
||||
|
||||
|
|
|
@ -10,23 +10,23 @@ use crate::admin::error::Error;
|
|||
use crate::admin::{Admin, RequestHandler};
|
||||
|
||||
#[async_trait]
|
||||
impl RequestHandler for LocalGetWorkerParamRequest {
|
||||
type Response = LocalGetWorkerParamResponse;
|
||||
impl RequestHandler for LocalGetWorkerVariableRequest {
|
||||
type Response = LocalGetWorkerVariableResponse;
|
||||
|
||||
async fn handle(
|
||||
self,
|
||||
garage: &Arc<Garage>,
|
||||
admin: &Admin,
|
||||
) -> Result<LocalGetWorkerParamResponse, Error> {
|
||||
_admin: &Admin,
|
||||
) -> Result<LocalGetWorkerVariableResponse, Error> {
|
||||
let mut res = HashMap::new();
|
||||
if let Some(k) = self.param {
|
||||
if let Some(k) = self.variable {
|
||||
res.insert(k.clone(), garage.bg_vars.get(&k)?);
|
||||
} else {
|
||||
let mut vars = garage.bg_vars.get_all();
|
||||
let vars = garage.bg_vars.get_all();
|
||||
for (k, v) in vars.iter() {
|
||||
res.insert(k.to_string(), v.to_string());
|
||||
}
|
||||
}
|
||||
Ok(LocalGetWorkerParamResponse(res))
|
||||
Ok(LocalGetWorkerVariableResponse(res))
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,10 +26,6 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
|
|||
use garage_model::s3::mpu_table::MultipartUpload;
|
||||
use garage_model::s3::version_table::Version;
|
||||
|
||||
use garage_api::admin::api::{AdminApiRequest, TaggedAdminApiResponse};
|
||||
use garage_api::admin::RequestHandler as AdminApiEndpoint;
|
||||
use garage_api::generic_server::ApiError;
|
||||
|
||||
use crate::cli::*;
|
||||
use crate::repair::online::launch_online_repair;
|
||||
|
||||
|
@ -59,15 +55,6 @@ pub enum AdminRpc {
|
|||
versions: Vec<Result<Version, Uuid>>,
|
||||
uploads: Vec<MultipartUpload>,
|
||||
},
|
||||
|
||||
// Proxying HTTP Admin API endpoints
|
||||
ApiRequest(AdminApiRequest),
|
||||
ApiOkResponse(TaggedAdminApiResponse),
|
||||
ApiErrorResponse {
|
||||
http_code: u16,
|
||||
error_code: String,
|
||||
message: String,
|
||||
},
|
||||
}
|
||||
|
||||
impl Rpc for AdminRpc {
|
||||
|
@ -501,25 +488,6 @@ impl AdminRpcHandler {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ================== PROXYING ADMIN API REQUESTS ===================
|
||||
|
||||
async fn handle_api_request(
|
||||
self: &Arc<Self>,
|
||||
req: &AdminApiRequest,
|
||||
) -> Result<AdminRpc, Error> {
|
||||
let req = req.clone();
|
||||
info!("Proxied admin API request: {}", req.name());
|
||||
let res = req.handle(&self.garage).await;
|
||||
match res {
|
||||
Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
|
||||
Err(e) => Ok(AdminRpc::ApiErrorResponse {
|
||||
http_code: e.http_status_code().as_u16(),
|
||||
error_code: e.code().to_string(),
|
||||
message: e.to_string(),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
@ -535,7 +503,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
|||
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
|
||||
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
||||
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
||||
AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
|
||||
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ use garage_rpc::system::*;
|
|||
use garage_rpc::*;
|
||||
|
||||
use garage_api::admin::api::*;
|
||||
use garage_api::admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
|
||||
use garage_api::admin::RequestHandler as AdminApiEndpoint;
|
||||
|
||||
use crate::admin::*;
|
||||
|
@ -23,6 +24,7 @@ use crate::cli::Command;
|
|||
pub struct Cli {
|
||||
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
|
||||
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
|
||||
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
|
||||
pub rpc_host: NodeID,
|
||||
}
|
||||
|
||||
|
@ -85,14 +87,16 @@ impl Cli {
|
|||
let req = AdminApiRequest::from(req);
|
||||
let req_name = req.name();
|
||||
match self
|
||||
.admin_rpc_endpoint
|
||||
.call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
|
||||
.await?
|
||||
.ok_or_message("rpc")?
|
||||
.proxy_rpc_endpoint
|
||||
.call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
|
||||
.await??
|
||||
{
|
||||
AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
|
||||
.map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
|
||||
AdminRpc::ApiErrorResponse {
|
||||
ProxyRpcResponse::ProxyApiOkResponse(resp) => {
|
||||
<T as AdminApiEndpoint>::Response::try_from(resp).map_err(|_| {
|
||||
Error::Message(format!("{} returned unexpected response", req_name))
|
||||
})
|
||||
}
|
||||
ProxyRpcResponse::ApiErrorResponse {
|
||||
http_code,
|
||||
error_code,
|
||||
message,
|
||||
|
|
|
@ -35,6 +35,8 @@ use garage_util::error::*;
|
|||
use garage_rpc::system::*;
|
||||
use garage_rpc::*;
|
||||
|
||||
use garage_api::admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
|
||||
|
||||
use admin::*;
|
||||
use cli::*;
|
||||
use secrets::Secrets;
|
||||
|
@ -282,10 +284,12 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
|
|||
|
||||
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
|
||||
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
|
||||
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
|
||||
|
||||
let cli = cli_v2::Cli {
|
||||
system_rpc_endpoint,
|
||||
admin_rpc_endpoint,
|
||||
proxy_rpc_endpoint,
|
||||
rpc_host: id,
|
||||
};
|
||||
|
||||
|
|
|
@ -1,4 +1,5 @@
|
|||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use tokio::sync::watch;
|
||||
|
||||
|
@ -64,7 +65,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
|||
}
|
||||
|
||||
info!("Initialize Admin API server and metrics collector...");
|
||||
let admin_server = AdminApiServer::new(
|
||||
let admin_server: Arc<AdminApiServer> = AdminApiServer::new(
|
||||
garage.clone(),
|
||||
background.clone(),
|
||||
#[cfg(feature = "metrics")]
|
||||
|
@ -125,11 +126,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
|||
info!("Launching Admin API server...");
|
||||
servers.push((
|
||||
"Admin",
|
||||
tokio::spawn(
|
||||
admin_server
|
||||
.clone()
|
||||
.run(admin_bind_addr.clone(), watch_cancel.clone()),
|
||||
),
|
||||
tokio::spawn(admin_server.run(admin_bind_addr.clone(), watch_cancel.clone())),
|
||||
));
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue