admin api: base infrastructure for local endpoints
admin api: rename EndpointHandler into RequestHandler to avoid confusion with RPC wip: infrastructure for local api calls admin api: fix things admin api: first local endpoint to work with new scheme admin api: implement SetWorkerVariable
This commit is contained in:
parent
bdaf55ab3f
commit
89ff9f5576
17 changed files with 619 additions and 236 deletions
|
@ -1,3 +1,4 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
@ -6,13 +7,17 @@ use async_trait::async_trait;
|
||||||
use paste::paste;
|
use paste::paste;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use garage_rpc::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
use garage_api_common::common_error::CommonErrorDerivative;
|
||||||
use garage_api_common::helpers::is_default;
|
use garage_api_common::helpers::is_default;
|
||||||
|
|
||||||
|
use crate::api_server::{AdminRpc, AdminRpcResponse};
|
||||||
use crate::error::Error;
|
use crate::error::Error;
|
||||||
use crate::macros::*;
|
use crate::macros::*;
|
||||||
use crate::EndpointHandler;
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
// This generates the following:
|
// This generates the following:
|
||||||
//
|
//
|
||||||
|
@ -71,8 +76,14 @@ admin_endpoints![
|
||||||
// Operations on bucket aliases
|
// Operations on bucket aliases
|
||||||
AddBucketAlias,
|
AddBucketAlias,
|
||||||
RemoveBucketAlias,
|
RemoveBucketAlias,
|
||||||
|
|
||||||
|
// Worker operations
|
||||||
|
GetWorkerVariable,
|
||||||
|
SetWorkerVariable,
|
||||||
];
|
];
|
||||||
|
|
||||||
|
local_admin_endpoints![GetWorkerVariable, SetWorkerVariable,];
|
||||||
|
|
||||||
// **********************************************
|
// **********************************************
|
||||||
// Special endpoints
|
// Special endpoints
|
||||||
//
|
//
|
||||||
|
@ -580,3 +591,31 @@ pub struct RemoveBucketAliasRequest {
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
|
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
|
||||||
|
|
||||||
|
// **********************************************
|
||||||
|
// Worker operations
|
||||||
|
// **********************************************
|
||||||
|
|
||||||
|
// ---- GetWorkerVariable ----
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalGetWorkerVariableRequest {
|
||||||
|
pub variable: Option<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
|
||||||
|
|
||||||
|
// ---- SetWorkerVariable ----
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalSetWorkerVariableRequest {
|
||||||
|
pub variable: String,
|
||||||
|
pub value: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct LocalSetWorkerVariableResponse {
|
||||||
|
pub variable: String,
|
||||||
|
pub value: String,
|
||||||
|
}
|
||||||
|
|
|
@ -6,6 +6,7 @@ use async_trait::async_trait;
|
||||||
|
|
||||||
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
|
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
|
||||||
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
|
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use opentelemetry::trace::SpanRef;
|
use opentelemetry::trace::SpanRef;
|
||||||
|
@ -16,6 +17,8 @@ use opentelemetry_prometheus::PrometheusExporter;
|
||||||
use prometheus::{Encoder, TextEncoder};
|
use prometheus::{Encoder, TextEncoder};
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
use garage_rpc::{Endpoint as RpcEndpoint, *};
|
||||||
|
use garage_util::background::BackgroundRunner;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
use garage_util::socket_address::UnixOrTCPSocketAddress;
|
||||||
|
|
||||||
|
@ -27,7 +30,70 @@ use crate::error::*;
|
||||||
use crate::router_v0;
|
use crate::router_v0;
|
||||||
use crate::router_v1;
|
use crate::router_v1;
|
||||||
use crate::Authorization;
|
use crate::Authorization;
|
||||||
use crate::EndpointHandler;
|
use crate::RequestHandler;
|
||||||
|
|
||||||
|
// ---- FOR RPC ----
|
||||||
|
|
||||||
|
pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub enum AdminRpc {
|
||||||
|
Proxy(AdminApiRequest),
|
||||||
|
Internal(LocalAdminApiRequest),
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub enum AdminRpcResponse {
|
||||||
|
ProxyApiOkResponse(TaggedAdminApiResponse),
|
||||||
|
InternalApiOkResponse(LocalAdminApiResponse),
|
||||||
|
ApiErrorResponse {
|
||||||
|
http_code: u16,
|
||||||
|
error_code: String,
|
||||||
|
message: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Rpc for AdminRpc {
|
||||||
|
type Response = Result<AdminRpcResponse, GarageError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl EndpointHandler<AdminRpc> for AdminApiServer {
|
||||||
|
async fn handle(
|
||||||
|
self: &Arc<Self>,
|
||||||
|
message: &AdminRpc,
|
||||||
|
_from: NodeID,
|
||||||
|
) -> Result<AdminRpcResponse, GarageError> {
|
||||||
|
match message {
|
||||||
|
AdminRpc::Proxy(req) => {
|
||||||
|
info!("Proxied admin API request: {}", req.name());
|
||||||
|
let res = req.clone().handle(&self.garage, &self).await;
|
||||||
|
match res {
|
||||||
|
Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
|
||||||
|
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
|
||||||
|
http_code: e.http_status_code().as_u16(),
|
||||||
|
error_code: e.code().to_string(),
|
||||||
|
message: e.to_string(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
AdminRpc::Internal(req) => {
|
||||||
|
info!("Internal admin API request: {}", req.name());
|
||||||
|
let res = req.clone().handle(&self.garage, &self).await;
|
||||||
|
match res {
|
||||||
|
Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
|
||||||
|
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
|
||||||
|
http_code: e.http_status_code().as_u16(),
|
||||||
|
error_code: e.code().to_string(),
|
||||||
|
message: e.to_string(),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- FOR HTTP ----
|
||||||
|
|
||||||
pub type ResBody = BoxBody<Error>;
|
pub type ResBody = BoxBody<Error>;
|
||||||
|
|
||||||
|
@ -37,37 +103,48 @@ pub struct AdminApiServer {
|
||||||
exporter: PrometheusExporter,
|
exporter: PrometheusExporter,
|
||||||
metrics_token: Option<String>,
|
metrics_token: Option<String>,
|
||||||
admin_token: Option<String>,
|
admin_token: Option<String>,
|
||||||
|
pub(crate) background: Arc<BackgroundRunner>,
|
||||||
|
pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Endpoint {
|
pub enum HttpEndpoint {
|
||||||
Old(router_v1::Endpoint),
|
Old(router_v1::Endpoint),
|
||||||
New(String),
|
New(String),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
struct ArcAdminApiServer(Arc<AdminApiServer>);
|
||||||
|
|
||||||
impl AdminApiServer {
|
impl AdminApiServer {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
garage: Arc<Garage>,
|
garage: Arc<Garage>,
|
||||||
|
background: Arc<BackgroundRunner>,
|
||||||
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
|
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
|
||||||
) -> Self {
|
) -> Arc<Self> {
|
||||||
let cfg = &garage.config.admin;
|
let cfg = &garage.config.admin;
|
||||||
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
|
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
|
||||||
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
|
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
|
||||||
Self {
|
|
||||||
|
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
|
||||||
|
let admin = Arc::new(Self {
|
||||||
garage,
|
garage,
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
exporter,
|
exporter,
|
||||||
metrics_token,
|
metrics_token,
|
||||||
admin_token,
|
admin_token,
|
||||||
}
|
background,
|
||||||
|
endpoint,
|
||||||
|
});
|
||||||
|
admin.endpoint.set_handler(admin.clone());
|
||||||
|
admin
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn run(
|
pub async fn run(
|
||||||
self,
|
self: Arc<Self>,
|
||||||
bind_addr: UnixOrTCPSocketAddress,
|
bind_addr: UnixOrTCPSocketAddress,
|
||||||
must_exit: watch::Receiver<bool>,
|
must_exit: watch::Receiver<bool>,
|
||||||
) -> Result<(), GarageError> {
|
) -> Result<(), GarageError> {
|
||||||
let region = self.garage.config.s3_api.s3_region.clone();
|
let region = self.garage.config.s3_api.s3_region.clone();
|
||||||
ApiServer::new(region, self)
|
ApiServer::new(region, ArcAdminApiServer(self))
|
||||||
.run_server(bind_addr, Some(0o220), must_exit)
|
.run_server(bind_addr, Some(0o220), must_exit)
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -102,36 +179,46 @@ impl AdminApiServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl ApiHandler for AdminApiServer {
|
impl ApiHandler for ArcAdminApiServer {
|
||||||
const API_NAME: &'static str = "admin";
|
const API_NAME: &'static str = "admin";
|
||||||
const API_NAME_DISPLAY: &'static str = "Admin";
|
const API_NAME_DISPLAY: &'static str = "Admin";
|
||||||
|
|
||||||
type Endpoint = Endpoint;
|
type Endpoint = HttpEndpoint;
|
||||||
type Error = Error;
|
type Error = Error;
|
||||||
|
|
||||||
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
|
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
|
||||||
if req.uri().path().starts_with("/v0/") {
|
if req.uri().path().starts_with("/v0/") {
|
||||||
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
|
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
|
||||||
let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
|
let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
|
||||||
Ok(Endpoint::Old(endpoint_v1))
|
Ok(HttpEndpoint::Old(endpoint_v1))
|
||||||
} else if req.uri().path().starts_with("/v1/") {
|
} else if req.uri().path().starts_with("/v1/") {
|
||||||
let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
|
let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
|
||||||
Ok(Endpoint::Old(endpoint_v1))
|
Ok(HttpEndpoint::Old(endpoint_v1))
|
||||||
} else {
|
} else {
|
||||||
Ok(Endpoint::New(req.uri().path().to_string()))
|
Ok(HttpEndpoint::New(req.uri().path().to_string()))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle(
|
async fn handle(
|
||||||
&self,
|
&self,
|
||||||
req: Request<IncomingBody>,
|
req: Request<IncomingBody>,
|
||||||
endpoint: Endpoint,
|
endpoint: HttpEndpoint,
|
||||||
|
) -> Result<Response<ResBody>, Error> {
|
||||||
|
self.0.handle_http_api(req, endpoint).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AdminApiServer {
|
||||||
|
async fn handle_http_api(
|
||||||
|
&self,
|
||||||
|
req: Request<IncomingBody>,
|
||||||
|
endpoint: HttpEndpoint,
|
||||||
) -> Result<Response<ResBody>, Error> {
|
) -> Result<Response<ResBody>, Error> {
|
||||||
let auth_header = req.headers().get(AUTHORIZATION).cloned();
|
let auth_header = req.headers().get(AUTHORIZATION).cloned();
|
||||||
|
|
||||||
let request = match endpoint {
|
let request = match endpoint {
|
||||||
Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
|
HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
|
||||||
Endpoint::New(_) => AdminApiRequest::from_request(req).await?,
|
HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
|
||||||
};
|
};
|
||||||
|
|
||||||
let required_auth_hash =
|
let required_auth_hash =
|
||||||
|
@ -156,12 +243,12 @@ impl ApiHandler for AdminApiServer {
|
||||||
}
|
}
|
||||||
|
|
||||||
match request {
|
match request {
|
||||||
AdminApiRequest::Options(req) => req.handle(&self.garage).await,
|
AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
|
||||||
AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await,
|
AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
|
||||||
AdminApiRequest::Health(req) => req.handle(&self.garage).await,
|
AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
|
||||||
AdminApiRequest::Metrics(_req) => self.handle_metrics(),
|
AdminApiRequest::Metrics(_req) => self.handle_metrics(),
|
||||||
req => {
|
req => {
|
||||||
let res = req.handle(&self.garage).await?;
|
let res = req.handle(&self.garage, &self).await?;
|
||||||
let mut res = json_ok_response(&res)?;
|
let mut res = json_ok_response(&res)?;
|
||||||
res.headers_mut()
|
res.headers_mut()
|
||||||
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
|
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
|
||||||
|
@ -171,7 +258,7 @@ impl ApiHandler for AdminApiServer {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ApiEndpoint for Endpoint {
|
impl ApiEndpoint for HttpEndpoint {
|
||||||
fn name(&self) -> Cow<'static, str> {
|
fn name(&self) -> Cow<'static, str> {
|
||||||
match self {
|
match self {
|
||||||
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),
|
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),
|
||||||
|
|
|
@ -21,13 +21,17 @@ use garage_api_common::common_error::CommonError;
|
||||||
|
|
||||||
use crate::api::*;
|
use crate::api::*;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::EndpointHandler;
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for ListBucketsRequest {
|
impl RequestHandler for ListBucketsRequest {
|
||||||
type Response = ListBucketsResponse;
|
type Response = ListBucketsResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<ListBucketsResponse, Error> {
|
||||||
let buckets = garage
|
let buckets = garage
|
||||||
.bucket_table
|
.bucket_table
|
||||||
.get_range(
|
.get_range(
|
||||||
|
@ -71,10 +75,14 @@ impl EndpointHandler for ListBucketsRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for GetBucketInfoRequest {
|
impl RequestHandler for GetBucketInfoRequest {
|
||||||
type Response = GetBucketInfoResponse;
|
type Response = GetBucketInfoResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetBucketInfoResponse, Error> {
|
||||||
let bucket_id = match (self.id, self.global_alias, self.search) {
|
let bucket_id = match (self.id, self.global_alias, self.search) {
|
||||||
(Some(id), None, None) => parse_bucket_id(&id)?,
|
(Some(id), None, None) => parse_bucket_id(&id)?,
|
||||||
(None, Some(ga), None) => garage
|
(None, Some(ga), None) => garage
|
||||||
|
@ -223,10 +231,14 @@ async fn bucket_info_results(
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for CreateBucketRequest {
|
impl RequestHandler for CreateBucketRequest {
|
||||||
type Response = CreateBucketResponse;
|
type Response = CreateBucketResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<CreateBucketResponse, Error> {
|
||||||
let helper = garage.locked_helper().await;
|
let helper = garage.locked_helper().await;
|
||||||
|
|
||||||
if let Some(ga) = &self.global_alias {
|
if let Some(ga) = &self.global_alias {
|
||||||
|
@ -294,10 +306,14 @@ impl EndpointHandler for CreateBucketRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for DeleteBucketRequest {
|
impl RequestHandler for DeleteBucketRequest {
|
||||||
type Response = DeleteBucketResponse;
|
type Response = DeleteBucketResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<DeleteBucketResponse, Error> {
|
||||||
let helper = garage.locked_helper().await;
|
let helper = garage.locked_helper().await;
|
||||||
|
|
||||||
let bucket_id = parse_bucket_id(&self.id)?;
|
let bucket_id = parse_bucket_id(&self.id)?;
|
||||||
|
@ -343,10 +359,14 @@ impl EndpointHandler for DeleteBucketRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for UpdateBucketRequest {
|
impl RequestHandler for UpdateBucketRequest {
|
||||||
type Response = UpdateBucketResponse;
|
type Response = UpdateBucketResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<UpdateBucketResponse, Error> {
|
||||||
let bucket_id = parse_bucket_id(&self.id)?;
|
let bucket_id = parse_bucket_id(&self.id)?;
|
||||||
|
|
||||||
let mut bucket = garage
|
let mut bucket = garage
|
||||||
|
@ -390,10 +410,14 @@ impl EndpointHandler for UpdateBucketRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for CleanupIncompleteUploadsRequest {
|
impl RequestHandler for CleanupIncompleteUploadsRequest {
|
||||||
type Response = CleanupIncompleteUploadsResponse;
|
type Response = CleanupIncompleteUploadsResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<CleanupIncompleteUploadsResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<CleanupIncompleteUploadsResponse, Error> {
|
||||||
let duration = Duration::from_secs(self.older_than_secs);
|
let duration = Duration::from_secs(self.older_than_secs);
|
||||||
|
|
||||||
let bucket_id = parse_bucket_id(&self.bucket_id)?;
|
let bucket_id = parse_bucket_id(&self.bucket_id)?;
|
||||||
|
@ -412,20 +436,28 @@ impl EndpointHandler for CleanupIncompleteUploadsRequest {
|
||||||
// ---- BUCKET/KEY PERMISSIONS ----
|
// ---- BUCKET/KEY PERMISSIONS ----
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for AllowBucketKeyRequest {
|
impl RequestHandler for AllowBucketKeyRequest {
|
||||||
type Response = AllowBucketKeyResponse;
|
type Response = AllowBucketKeyResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<AllowBucketKeyResponse, Error> {
|
||||||
let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
|
let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
|
||||||
Ok(AllowBucketKeyResponse(res))
|
Ok(AllowBucketKeyResponse(res))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for DenyBucketKeyRequest {
|
impl RequestHandler for DenyBucketKeyRequest {
|
||||||
type Response = DenyBucketKeyResponse;
|
type Response = DenyBucketKeyResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<DenyBucketKeyResponse, Error> {
|
||||||
let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
|
let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
|
||||||
Ok(DenyBucketKeyResponse(res))
|
Ok(DenyBucketKeyResponse(res))
|
||||||
}
|
}
|
||||||
|
@ -471,10 +503,14 @@ pub async fn handle_bucket_change_key_perm(
|
||||||
// ---- BUCKET ALIASES ----
|
// ---- BUCKET ALIASES ----
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for AddBucketAliasRequest {
|
impl RequestHandler for AddBucketAliasRequest {
|
||||||
type Response = AddBucketAliasResponse;
|
type Response = AddBucketAliasResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<AddBucketAliasResponse, Error> {
|
||||||
let bucket_id = parse_bucket_id(&self.bucket_id)?;
|
let bucket_id = parse_bucket_id(&self.bucket_id)?;
|
||||||
|
|
||||||
let helper = garage.locked_helper().await;
|
let helper = garage.locked_helper().await;
|
||||||
|
@ -502,10 +538,14 @@ impl EndpointHandler for AddBucketAliasRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for RemoveBucketAliasRequest {
|
impl RequestHandler for RemoveBucketAliasRequest {
|
||||||
type Response = RemoveBucketAliasResponse;
|
type Response = RemoveBucketAliasResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<RemoveBucketAliasResponse, Error> {
|
||||||
let bucket_id = parse_bucket_id(&self.bucket_id)?;
|
let bucket_id = parse_bucket_id(&self.bucket_id)?;
|
||||||
|
|
||||||
let helper = garage.locked_helper().await;
|
let helper = garage.locked_helper().await;
|
||||||
|
|
|
@ -12,13 +12,17 @@ use garage_model::garage::Garage;
|
||||||
|
|
||||||
use crate::api::*;
|
use crate::api::*;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::EndpointHandler;
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for GetClusterStatusRequest {
|
impl RequestHandler for GetClusterStatusRequest {
|
||||||
type Response = GetClusterStatusResponse;
|
type Response = GetClusterStatusResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetClusterStatusResponse, Error> {
|
||||||
let layout = garage.system.cluster_layout();
|
let layout = garage.system.cluster_layout();
|
||||||
let mut nodes = garage
|
let mut nodes = garage
|
||||||
.system
|
.system
|
||||||
|
@ -117,10 +121,14 @@ impl EndpointHandler for GetClusterStatusRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for GetClusterHealthRequest {
|
impl RequestHandler for GetClusterHealthRequest {
|
||||||
type Response = GetClusterHealthResponse;
|
type Response = GetClusterHealthResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetClusterHealthResponse, Error> {
|
||||||
use garage_rpc::system::ClusterHealthStatus;
|
use garage_rpc::system::ClusterHealthStatus;
|
||||||
let health = garage.system.health();
|
let health = garage.system.health();
|
||||||
let health = GetClusterHealthResponse {
|
let health = GetClusterHealthResponse {
|
||||||
|
@ -143,10 +151,14 @@ impl EndpointHandler for GetClusterHealthRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for ConnectClusterNodesRequest {
|
impl RequestHandler for ConnectClusterNodesRequest {
|
||||||
type Response = ConnectClusterNodesResponse;
|
type Response = ConnectClusterNodesResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<ConnectClusterNodesResponse, Error> {
|
||||||
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
|
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
|
||||||
.await
|
.await
|
||||||
.into_iter()
|
.into_iter()
|
||||||
|
@ -166,10 +178,14 @@ impl EndpointHandler for ConnectClusterNodesRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for GetClusterLayoutRequest {
|
impl RequestHandler for GetClusterLayoutRequest {
|
||||||
type Response = GetClusterLayoutResponse;
|
type Response = GetClusterLayoutResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetClusterLayoutResponse, Error> {
|
||||||
Ok(format_cluster_layout(
|
Ok(format_cluster_layout(
|
||||||
garage.system.cluster_layout().inner(),
|
garage.system.cluster_layout().inner(),
|
||||||
))
|
))
|
||||||
|
@ -226,10 +242,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
|
||||||
// ---- update functions ----
|
// ---- update functions ----
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for UpdateClusterLayoutRequest {
|
impl RequestHandler for UpdateClusterLayoutRequest {
|
||||||
type Response = UpdateClusterLayoutResponse;
|
type Response = UpdateClusterLayoutResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<UpdateClusterLayoutResponse, Error> {
|
||||||
let mut layout = garage.system.cluster_layout().inner().clone();
|
let mut layout = garage.system.cluster_layout().inner().clone();
|
||||||
|
|
||||||
let mut roles = layout.current().roles.clone();
|
let mut roles = layout.current().roles.clone();
|
||||||
|
@ -272,10 +292,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for ApplyClusterLayoutRequest {
|
impl RequestHandler for ApplyClusterLayoutRequest {
|
||||||
type Response = ApplyClusterLayoutResponse;
|
type Response = ApplyClusterLayoutResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<ApplyClusterLayoutResponse, Error> {
|
||||||
let layout = garage.system.cluster_layout().inner().clone();
|
let layout = garage.system.cluster_layout().inner().clone();
|
||||||
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
|
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
|
||||||
|
|
||||||
|
@ -293,10 +317,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for RevertClusterLayoutRequest {
|
impl RequestHandler for RevertClusterLayoutRequest {
|
||||||
type Response = RevertClusterLayoutResponse;
|
type Response = RevertClusterLayoutResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<RevertClusterLayoutResponse, Error> {
|
||||||
let layout = garage.system.cluster_layout().inner().clone();
|
let layout = garage.system.cluster_layout().inner().clone();
|
||||||
let layout = layout.revert_staged_changes()?;
|
let layout = layout.revert_staged_changes()?;
|
||||||
garage
|
garage
|
||||||
|
|
|
@ -10,13 +10,13 @@ use garage_model::key_table::*;
|
||||||
|
|
||||||
use crate::api::*;
|
use crate::api::*;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::EndpointHandler;
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for ListKeysRequest {
|
impl RequestHandler for ListKeysRequest {
|
||||||
type Response = ListKeysResponse;
|
type Response = ListKeysResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> {
|
async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
|
||||||
let res = garage
|
let res = garage
|
||||||
.key_table
|
.key_table
|
||||||
.get_range(
|
.get_range(
|
||||||
|
@ -39,10 +39,14 @@ impl EndpointHandler for ListKeysRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for GetKeyInfoRequest {
|
impl RequestHandler for GetKeyInfoRequest {
|
||||||
type Response = GetKeyInfoResponse;
|
type Response = GetKeyInfoResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<GetKeyInfoResponse, Error> {
|
||||||
let key = match (self.id, self.search) {
|
let key = match (self.id, self.search) {
|
||||||
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
|
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
|
||||||
(None, Some(search)) => {
|
(None, Some(search)) => {
|
||||||
|
@ -63,10 +67,14 @@ impl EndpointHandler for GetKeyInfoRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for CreateKeyRequest {
|
impl RequestHandler for CreateKeyRequest {
|
||||||
type Response = CreateKeyResponse;
|
type Response = CreateKeyResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<CreateKeyResponse, Error> {
|
||||||
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
|
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
|
||||||
garage.key_table.insert(&key).await?;
|
garage.key_table.insert(&key).await?;
|
||||||
|
|
||||||
|
@ -77,10 +85,14 @@ impl EndpointHandler for CreateKeyRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for ImportKeyRequest {
|
impl RequestHandler for ImportKeyRequest {
|
||||||
type Response = ImportKeyResponse;
|
type Response = ImportKeyResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<ImportKeyResponse, Error> {
|
||||||
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
|
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
|
||||||
if prev_key.is_some() {
|
if prev_key.is_some() {
|
||||||
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
|
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
|
||||||
|
@ -101,10 +113,14 @@ impl EndpointHandler for ImportKeyRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for UpdateKeyRequest {
|
impl RequestHandler for UpdateKeyRequest {
|
||||||
type Response = UpdateKeyResponse;
|
type Response = UpdateKeyResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<UpdateKeyResponse, Error> {
|
||||||
let mut key = garage.key_helper().get_existing_key(&self.id).await?;
|
let mut key = garage.key_helper().get_existing_key(&self.id).await?;
|
||||||
|
|
||||||
let key_state = key.state.as_option_mut().unwrap();
|
let key_state = key.state.as_option_mut().unwrap();
|
||||||
|
@ -132,10 +148,14 @@ impl EndpointHandler for UpdateKeyRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for DeleteKeyRequest {
|
impl RequestHandler for DeleteKeyRequest {
|
||||||
type Response = DeleteKeyResponse;
|
type Response = DeleteKeyResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<DeleteKeyResponse, Error> {
|
||||||
let helper = garage.locked_helper().await;
|
let helper = garage.locked_helper().await;
|
||||||
|
|
||||||
let mut key = helper.key().get_existing_key(&self.id).await?;
|
let mut key = helper.key().get_existing_key(&self.id).await?;
|
||||||
|
|
|
@ -15,12 +15,16 @@ mod cluster;
|
||||||
mod key;
|
mod key;
|
||||||
mod special;
|
mod special;
|
||||||
|
|
||||||
|
mod worker;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
pub use api_server::AdminApiServer as Admin;
|
||||||
|
|
||||||
pub enum Authorization {
|
pub enum Authorization {
|
||||||
None,
|
None,
|
||||||
MetricsToken,
|
MetricsToken,
|
||||||
|
@ -28,8 +32,12 @@ pub enum Authorization {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait EndpointHandler {
|
pub trait RequestHandler {
|
||||||
type Response;
|
type Response;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>;
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
admin: &Admin,
|
||||||
|
) -> Result<Self::Response, error::Error>;
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,10 +71,10 @@ macro_rules! admin_endpoints {
|
||||||
)*
|
)*
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for AdminApiRequest {
|
impl RequestHandler for AdminApiRequest {
|
||||||
type Response = AdminApiResponse;
|
type Response = AdminApiResponse;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> {
|
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
|
||||||
Ok(match self {
|
Ok(match self {
|
||||||
$(
|
$(
|
||||||
AdminApiRequest::$special_endpoint(_) => panic!(
|
AdminApiRequest::$special_endpoint(_) => panic!(
|
||||||
|
@ -82,7 +82,142 @@ macro_rules! admin_endpoints {
|
||||||
),
|
),
|
||||||
)*
|
)*
|
||||||
$(
|
$(
|
||||||
AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?),
|
AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
|
||||||
|
)*
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
macro_rules! local_admin_endpoints {
|
||||||
|
[
|
||||||
|
$($endpoint:ident,)*
|
||||||
|
] => {
|
||||||
|
paste! {
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum LocalAdminApiRequest {
|
||||||
|
$(
|
||||||
|
$endpoint( [<Local $endpoint Request>] ),
|
||||||
|
)*
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub enum LocalAdminApiResponse {
|
||||||
|
$(
|
||||||
|
$endpoint( [<Local $endpoint Response>] ),
|
||||||
|
)*
|
||||||
|
}
|
||||||
|
|
||||||
|
$(
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct [< $endpoint Request >] {
|
||||||
|
pub node: String,
|
||||||
|
pub body: [< Local $endpoint Request >],
|
||||||
|
}
|
||||||
|
|
||||||
|
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
|
pub struct [< $endpoint Response >] {
|
||||||
|
pub success: HashMap<String, [< Local $endpoint Response >] >,
|
||||||
|
pub error: HashMap<String, String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
|
||||||
|
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
|
||||||
|
LocalAdminApiRequest::$endpoint(req)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] {
|
||||||
|
type Error = LocalAdminApiResponse;
|
||||||
|
fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> {
|
||||||
|
match resp {
|
||||||
|
LocalAdminApiResponse::$endpoint(v) => Ok(v),
|
||||||
|
x => Err(x),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for [< $endpoint Request >] {
|
||||||
|
type Response = [< $endpoint Response >];
|
||||||
|
|
||||||
|
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
|
||||||
|
let to = match self.node.as_str() {
|
||||||
|
"*" => garage.system.cluster_layout().all_nodes().to_vec(),
|
||||||
|
id => {
|
||||||
|
let nodes = garage.system.cluster_layout().all_nodes()
|
||||||
|
.iter()
|
||||||
|
.filter(|x| hex::encode(x).starts_with(id))
|
||||||
|
.cloned()
|
||||||
|
.collect::<Vec<_>>();
|
||||||
|
if nodes.len() != 1 {
|
||||||
|
return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
|
||||||
|
}
|
||||||
|
nodes
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
|
||||||
|
&to,
|
||||||
|
AdminRpc::Internal(self.body.into()),
|
||||||
|
RequestStrategy::with_priority(PRIO_NORMAL),
|
||||||
|
).await?;
|
||||||
|
|
||||||
|
let mut ret = [< $endpoint Response >] {
|
||||||
|
success: HashMap::new(),
|
||||||
|
error: HashMap::new(),
|
||||||
|
};
|
||||||
|
for (node, resp) in resps {
|
||||||
|
match resp {
|
||||||
|
Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
|
||||||
|
match [< Local $endpoint Response >]::try_from(r) {
|
||||||
|
Ok(r) => {
|
||||||
|
ret.success.insert(hex::encode(node), r);
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
ret.error.insert(hex::encode(node), "returned invalid value".to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
|
||||||
|
ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
|
||||||
|
}
|
||||||
|
Ok(_) => {
|
||||||
|
ret.error.insert(hex::encode(node), "returned invalid value".to_string());
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
ret.error.insert(hex::encode(node), e.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(ret)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
)*
|
||||||
|
|
||||||
|
impl LocalAdminApiRequest {
|
||||||
|
pub fn name(&self) -> &'static str {
|
||||||
|
match self {
|
||||||
|
$(
|
||||||
|
Self::$endpoint(_) => stringify!($endpoint),
|
||||||
|
)*
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for LocalAdminApiRequest {
|
||||||
|
type Response = LocalAdminApiResponse;
|
||||||
|
|
||||||
|
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
|
||||||
|
Ok(match self {
|
||||||
|
$(
|
||||||
|
LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
|
||||||
)*
|
)*
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
@ -92,3 +227,4 @@ macro_rules! admin_endpoints {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) use admin_endpoints;
|
pub(crate) use admin_endpoints;
|
||||||
|
pub(crate) use local_admin_endpoints;
|
||||||
|
|
|
@ -59,6 +59,8 @@ impl AdminApiRequest {
|
||||||
// Bucket aliases
|
// Bucket aliases
|
||||||
POST AddBucketAlias (body),
|
POST AddBucketAlias (body),
|
||||||
POST RemoveBucketAlias (body),
|
POST RemoveBucketAlias (body),
|
||||||
|
// Worker APIs
|
||||||
|
POST GetWorkerVariable (body_field, query::node),
|
||||||
]);
|
]);
|
||||||
|
|
||||||
if let Some(message) = query.nonempty_message() {
|
if let Some(message) = query.nonempty_message() {
|
||||||
|
@ -240,6 +242,7 @@ impl AdminApiRequest {
|
||||||
generateQueryParameters! {
|
generateQueryParameters! {
|
||||||
keywords: [],
|
keywords: [],
|
||||||
fields: [
|
fields: [
|
||||||
|
"node" => node,
|
||||||
"domain" => domain,
|
"domain" => domain,
|
||||||
"format" => format,
|
"format" => format,
|
||||||
"id" => id,
|
"id" => id,
|
||||||
|
|
|
@ -15,13 +15,17 @@ use garage_api_common::helpers::*;
|
||||||
use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
|
use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
|
||||||
use crate::api_server::ResBody;
|
use crate::api_server::ResBody;
|
||||||
use crate::error::*;
|
use crate::error::*;
|
||||||
use crate::EndpointHandler;
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for OptionsRequest {
|
impl RequestHandler for OptionsRequest {
|
||||||
type Response = Response<ResBody>;
|
type Response = Response<ResBody>;
|
||||||
|
|
||||||
async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
_garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<Response<ResBody>, Error> {
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
.header(ALLOW, "OPTIONS,GET,POST")
|
.header(ALLOW, "OPTIONS,GET,POST")
|
||||||
|
@ -33,10 +37,14 @@ impl EndpointHandler for OptionsRequest {
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for CheckDomainRequest {
|
impl RequestHandler for CheckDomainRequest {
|
||||||
type Response = Response<ResBody>;
|
type Response = Response<ResBody>;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<Response<ResBody>, Error> {
|
||||||
if check_domain(garage, &self.domain).await? {
|
if check_domain(garage, &self.domain).await? {
|
||||||
Ok(Response::builder()
|
Ok(Response::builder()
|
||||||
.status(StatusCode::OK)
|
.status(StatusCode::OK)
|
||||||
|
@ -103,10 +111,14 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl EndpointHandler for HealthRequest {
|
impl RequestHandler for HealthRequest {
|
||||||
type Response = Response<ResBody>;
|
type Response = Response<ResBody>;
|
||||||
|
|
||||||
async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<Response<ResBody>, Error> {
|
||||||
let health = garage.system.health();
|
let health = garage.system.health();
|
||||||
|
|
||||||
let (status, status_str) = match health.status {
|
let (status, status_str) = match health.status {
|
||||||
|
|
50
src/api/admin/worker.rs
Normal file
50
src/api/admin/worker.rs
Normal file
|
@ -0,0 +1,50 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use garage_model::garage::Garage;
|
||||||
|
|
||||||
|
use crate::api::*;
|
||||||
|
use crate::error::Error;
|
||||||
|
use crate::{Admin, RequestHandler};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for LocalGetWorkerVariableRequest {
|
||||||
|
type Response = LocalGetWorkerVariableResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<LocalGetWorkerVariableResponse, Error> {
|
||||||
|
let mut res = HashMap::new();
|
||||||
|
if let Some(k) = self.variable {
|
||||||
|
res.insert(k.clone(), garage.bg_vars.get(&k)?);
|
||||||
|
} else {
|
||||||
|
let vars = garage.bg_vars.get_all();
|
||||||
|
for (k, v) in vars.iter() {
|
||||||
|
res.insert(k.to_string(), v.to_string());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(LocalGetWorkerVariableResponse(res))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl RequestHandler for LocalSetWorkerVariableRequest {
|
||||||
|
type Response = LocalSetWorkerVariableResponse;
|
||||||
|
|
||||||
|
async fn handle(
|
||||||
|
self,
|
||||||
|
garage: &Arc<Garage>,
|
||||||
|
_admin: &Admin,
|
||||||
|
) -> Result<LocalSetWorkerVariableResponse, Error> {
|
||||||
|
garage.bg_vars.set(&self.variable, &self.value)?;
|
||||||
|
|
||||||
|
Ok(LocalSetWorkerVariableResponse {
|
||||||
|
variable: self.variable,
|
||||||
|
value: self.value,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,7 +27,7 @@ use garage_model::s3::mpu_table::MultipartUpload;
|
||||||
use garage_model::s3::version_table::Version;
|
use garage_model::s3::version_table::Version;
|
||||||
|
|
||||||
use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse};
|
use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse};
|
||||||
use garage_api_admin::EndpointHandler as AdminApiEndpoint;
|
use garage_api_admin::RequestHandler as AdminApiEndpoint;
|
||||||
use garage_api_common::generic_server::ApiError;
|
use garage_api_common::generic_server::ApiError;
|
||||||
|
|
||||||
use crate::cli::*;
|
use crate::cli::*;
|
||||||
|
@ -50,7 +50,6 @@ pub enum AdminRpc {
|
||||||
HashMap<usize, garage_util::background::WorkerInfo>,
|
HashMap<usize, garage_util::background::WorkerInfo>,
|
||||||
WorkerListOpt,
|
WorkerListOpt,
|
||||||
),
|
),
|
||||||
WorkerVars(Vec<(Uuid, String, String)>),
|
|
||||||
WorkerInfo(usize, garage_util::background::WorkerInfo),
|
WorkerInfo(usize, garage_util::background::WorkerInfo),
|
||||||
BlockErrorList(Vec<BlockResyncErrorInfo>),
|
BlockErrorList(Vec<BlockResyncErrorInfo>),
|
||||||
BlockInfo {
|
BlockInfo {
|
||||||
|
@ -59,15 +58,6 @@ pub enum AdminRpc {
|
||||||
versions: Vec<Result<Version, Uuid>>,
|
versions: Vec<Result<Version, Uuid>>,
|
||||||
uploads: Vec<MultipartUpload>,
|
uploads: Vec<MultipartUpload>,
|
||||||
},
|
},
|
||||||
|
|
||||||
// Proxying HTTP Admin API endpoints
|
|
||||||
ApiRequest(AdminApiRequest),
|
|
||||||
ApiOkResponse(TaggedAdminApiResponse),
|
|
||||||
ApiErrorResponse {
|
|
||||||
http_code: u16,
|
|
||||||
error_code: String,
|
|
||||||
message: String,
|
|
||||||
},
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Rpc for AdminRpc {
|
impl Rpc for AdminRpc {
|
||||||
|
@ -367,101 +357,7 @@ impl AdminRpcHandler {
|
||||||
.clone();
|
.clone();
|
||||||
Ok(AdminRpc::WorkerInfo(*tid, info))
|
Ok(AdminRpc::WorkerInfo(*tid, info))
|
||||||
}
|
}
|
||||||
WorkerOperation::Get {
|
_ => unreachable!(),
|
||||||
all_nodes,
|
|
||||||
variable,
|
|
||||||
} => self.handle_get_var(*all_nodes, variable).await,
|
|
||||||
WorkerOperation::Set {
|
|
||||||
all_nodes,
|
|
||||||
variable,
|
|
||||||
value,
|
|
||||||
} => self.handle_set_var(*all_nodes, variable, value).await,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_get_var(
|
|
||||||
&self,
|
|
||||||
all_nodes: bool,
|
|
||||||
variable: &Option<String>,
|
|
||||||
) -> Result<AdminRpc, Error> {
|
|
||||||
if all_nodes {
|
|
||||||
let mut ret = vec![];
|
|
||||||
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
|
|
||||||
for node in all_nodes.iter() {
|
|
||||||
let node = (*node).into();
|
|
||||||
match self
|
|
||||||
.endpoint
|
|
||||||
.call(
|
|
||||||
&node,
|
|
||||||
AdminRpc::Worker(WorkerOperation::Get {
|
|
||||||
all_nodes: false,
|
|
||||||
variable: variable.clone(),
|
|
||||||
}),
|
|
||||||
PRIO_NORMAL,
|
|
||||||
)
|
|
||||||
.await??
|
|
||||||
{
|
|
||||||
AdminRpc::WorkerVars(v) => ret.extend(v),
|
|
||||||
m => return Err(GarageError::unexpected_rpc_message(m).into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(AdminRpc::WorkerVars(ret))
|
|
||||||
} else {
|
|
||||||
#[allow(clippy::collapsible_else_if)]
|
|
||||||
if let Some(v) = variable {
|
|
||||||
Ok(AdminRpc::WorkerVars(vec![(
|
|
||||||
self.garage.system.id,
|
|
||||||
v.clone(),
|
|
||||||
self.garage.bg_vars.get(v)?,
|
|
||||||
)]))
|
|
||||||
} else {
|
|
||||||
let mut vars = self.garage.bg_vars.get_all();
|
|
||||||
vars.sort();
|
|
||||||
Ok(AdminRpc::WorkerVars(
|
|
||||||
vars.into_iter()
|
|
||||||
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
|
|
||||||
.collect(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn handle_set_var(
|
|
||||||
&self,
|
|
||||||
all_nodes: bool,
|
|
||||||
variable: &str,
|
|
||||||
value: &str,
|
|
||||||
) -> Result<AdminRpc, Error> {
|
|
||||||
if all_nodes {
|
|
||||||
let mut ret = vec![];
|
|
||||||
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
|
|
||||||
for node in all_nodes.iter() {
|
|
||||||
let node = (*node).into();
|
|
||||||
match self
|
|
||||||
.endpoint
|
|
||||||
.call(
|
|
||||||
&node,
|
|
||||||
AdminRpc::Worker(WorkerOperation::Set {
|
|
||||||
all_nodes: false,
|
|
||||||
variable: variable.to_string(),
|
|
||||||
value: value.to_string(),
|
|
||||||
}),
|
|
||||||
PRIO_NORMAL,
|
|
||||||
)
|
|
||||||
.await??
|
|
||||||
{
|
|
||||||
AdminRpc::WorkerVars(v) => ret.extend(v),
|
|
||||||
m => return Err(GarageError::unexpected_rpc_message(m).into()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(AdminRpc::WorkerVars(ret))
|
|
||||||
} else {
|
|
||||||
self.garage.bg_vars.set(variable, value)?;
|
|
||||||
Ok(AdminRpc::WorkerVars(vec![(
|
|
||||||
self.garage.system.id,
|
|
||||||
variable.to_string(),
|
|
||||||
value.to_string(),
|
|
||||||
)]))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -501,25 +397,6 @@ impl AdminRpcHandler {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ================== PROXYING ADMIN API REQUESTS ===================
|
|
||||||
|
|
||||||
async fn handle_api_request(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
req: &AdminApiRequest,
|
|
||||||
) -> Result<AdminRpc, Error> {
|
|
||||||
let req = req.clone();
|
|
||||||
info!("Proxied admin API request: {}", req.name());
|
|
||||||
let res = req.handle(&self.garage).await;
|
|
||||||
match res {
|
|
||||||
Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
|
|
||||||
Err(e) => Ok(AdminRpc::ApiErrorResponse {
|
|
||||||
http_code: e.http_status_code().as_u16(),
|
|
||||||
error_code: e.code().to_string(),
|
|
||||||
message: e.to_string(),
|
|
||||||
}),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
@ -535,7 +412,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
||||||
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
|
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
|
||||||
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
||||||
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
||||||
AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
|
|
||||||
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,9 +20,6 @@ pub async fn cmd_admin(
|
||||||
AdminRpc::WorkerList(wi, wlo) => {
|
AdminRpc::WorkerList(wi, wlo) => {
|
||||||
print_worker_list(wi, wlo);
|
print_worker_list(wi, wlo);
|
||||||
}
|
}
|
||||||
AdminRpc::WorkerVars(wv) => {
|
|
||||||
print_worker_vars(wv);
|
|
||||||
}
|
|
||||||
AdminRpc::WorkerInfo(tid, wi) => {
|
AdminRpc::WorkerInfo(tid, wi) => {
|
||||||
print_worker_info(tid, wi);
|
print_worker_info(tid, wi);
|
||||||
}
|
}
|
||||||
|
|
|
@ -126,14 +126,6 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
|
||||||
format_table(table);
|
format_table(table);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
|
|
||||||
let table = wv
|
|
||||||
.into_iter()
|
|
||||||
.map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
format_table(table);
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
|
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
let tf = timeago::Formatter::new();
|
let tf = timeago::Formatter::new();
|
||||||
|
|
|
@ -3,6 +3,8 @@ pub mod cluster;
|
||||||
pub mod key;
|
pub mod key;
|
||||||
pub mod layout;
|
pub mod layout;
|
||||||
|
|
||||||
|
pub mod worker;
|
||||||
|
|
||||||
use std::convert::TryFrom;
|
use std::convert::TryFrom;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
@ -13,7 +15,8 @@ use garage_rpc::system::*;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
use garage_api_admin::api::*;
|
use garage_api_admin::api::*;
|
||||||
use garage_api_admin::EndpointHandler as AdminApiEndpoint;
|
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
|
||||||
|
use garage_api_admin::RequestHandler as AdminApiEndpoint;
|
||||||
|
|
||||||
use crate::admin::*;
|
use crate::admin::*;
|
||||||
use crate::cli as cli_v1;
|
use crate::cli as cli_v1;
|
||||||
|
@ -23,6 +26,7 @@ use crate::cli::Command;
|
||||||
pub struct Cli {
|
pub struct Cli {
|
||||||
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
|
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
|
||||||
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
|
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
|
||||||
|
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
|
||||||
pub rpc_host: NodeID,
|
pub rpc_host: NodeID,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -36,6 +40,7 @@ impl Cli {
|
||||||
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
|
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
|
||||||
Command::Bucket(bo) => self.cmd_bucket(bo).await,
|
Command::Bucket(bo) => self.cmd_bucket(bo).await,
|
||||||
Command::Key(ko) => self.cmd_key(ko).await,
|
Command::Key(ko) => self.cmd_key(ko).await,
|
||||||
|
Command::Worker(wo) => self.cmd_worker(wo).await,
|
||||||
|
|
||||||
// TODO
|
// TODO
|
||||||
Command::Repair(ro) => cli_v1::cmd_admin(
|
Command::Repair(ro) => cli_v1::cmd_admin(
|
||||||
|
@ -50,13 +55,6 @@ impl Cli {
|
||||||
.await
|
.await
|
||||||
.ok_or_message("cli_v1")
|
.ok_or_message("cli_v1")
|
||||||
}
|
}
|
||||||
Command::Worker(wo) => cli_v1::cmd_admin(
|
|
||||||
&self.admin_rpc_endpoint,
|
|
||||||
self.rpc_host,
|
|
||||||
AdminRpc::Worker(wo),
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.ok_or_message("cli_v1"),
|
|
||||||
Command::Block(bo) => cli_v1::cmd_admin(
|
Command::Block(bo) => cli_v1::cmd_admin(
|
||||||
&self.admin_rpc_endpoint,
|
&self.admin_rpc_endpoint,
|
||||||
self.rpc_host,
|
self.rpc_host,
|
||||||
|
@ -85,14 +83,16 @@ impl Cli {
|
||||||
let req = AdminApiRequest::from(req);
|
let req = AdminApiRequest::from(req);
|
||||||
let req_name = req.name();
|
let req_name = req.name();
|
||||||
match self
|
match self
|
||||||
.admin_rpc_endpoint
|
.proxy_rpc_endpoint
|
||||||
.call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
|
.call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
|
||||||
.await?
|
.await??
|
||||||
.ok_or_message("rpc")?
|
|
||||||
{
|
{
|
||||||
AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
|
ProxyRpcResponse::ProxyApiOkResponse(resp) => {
|
||||||
.map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
|
<T as AdminApiEndpoint>::Response::try_from(resp).map_err(|_| {
|
||||||
AdminRpc::ApiErrorResponse {
|
Error::Message(format!("{} returned unexpected response", req_name))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
ProxyRpcResponse::ApiErrorResponse {
|
||||||
http_code,
|
http_code,
|
||||||
error_code,
|
error_code,
|
||||||
message,
|
message,
|
||||||
|
|
89
src/garage/cli_v2/worker.rs
Normal file
89
src/garage/cli_v2/worker.rs
Normal file
|
@ -0,0 +1,89 @@
|
||||||
|
//use bytesize::ByteSize;
|
||||||
|
use format_table::format_table;
|
||||||
|
|
||||||
|
use garage_util::error::*;
|
||||||
|
|
||||||
|
use garage_api_admin::api::*;
|
||||||
|
|
||||||
|
use crate::cli::structs::*;
|
||||||
|
use crate::cli_v2::*;
|
||||||
|
|
||||||
|
impl Cli {
|
||||||
|
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
|
||||||
|
match cmd {
|
||||||
|
WorkerOperation::Get {
|
||||||
|
all_nodes,
|
||||||
|
variable,
|
||||||
|
} => self.cmd_get_var(all_nodes, variable).await,
|
||||||
|
WorkerOperation::Set {
|
||||||
|
all_nodes,
|
||||||
|
variable,
|
||||||
|
value,
|
||||||
|
} => self.cmd_set_var(all_nodes, variable, value).await,
|
||||||
|
wo => cli_v1::cmd_admin(
|
||||||
|
&self.admin_rpc_endpoint,
|
||||||
|
self.rpc_host,
|
||||||
|
AdminRpc::Worker(wo),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.ok_or_message("cli_v1"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
|
||||||
|
let res = self
|
||||||
|
.api_request(GetWorkerVariableRequest {
|
||||||
|
node: if all {
|
||||||
|
"*".to_string()
|
||||||
|
} else {
|
||||||
|
hex::encode(self.rpc_host)
|
||||||
|
},
|
||||||
|
body: LocalGetWorkerVariableRequest { variable: var },
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut table = vec![];
|
||||||
|
for (node, vars) in res.success.iter() {
|
||||||
|
for (key, val) in vars.0.iter() {
|
||||||
|
table.push(format!("{:.16}\t{}\t{}", node, key, val));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
format_table(table);
|
||||||
|
|
||||||
|
for (node, err) in res.error.iter() {
|
||||||
|
eprintln!("{:.16}: error: {}", node, err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn cmd_set_var(
|
||||||
|
&self,
|
||||||
|
all: bool,
|
||||||
|
variable: String,
|
||||||
|
value: String,
|
||||||
|
) -> Result<(), Error> {
|
||||||
|
let res = self
|
||||||
|
.api_request(SetWorkerVariableRequest {
|
||||||
|
node: if all {
|
||||||
|
"*".to_string()
|
||||||
|
} else {
|
||||||
|
hex::encode(self.rpc_host)
|
||||||
|
},
|
||||||
|
body: LocalSetWorkerVariableRequest { variable, value },
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let mut table = vec![];
|
||||||
|
for (node, kv) in res.success.iter() {
|
||||||
|
table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
|
||||||
|
}
|
||||||
|
format_table(table);
|
||||||
|
|
||||||
|
for (node, err) in res.error.iter() {
|
||||||
|
eprintln!("{:.16}: error: {}", node, err);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
|
@ -35,6 +35,8 @@ use garage_util::error::*;
|
||||||
use garage_rpc::system::*;
|
use garage_rpc::system::*;
|
||||||
use garage_rpc::*;
|
use garage_rpc::*;
|
||||||
|
|
||||||
|
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
|
||||||
|
|
||||||
use admin::*;
|
use admin::*;
|
||||||
use cli::*;
|
use cli::*;
|
||||||
use secrets::Secrets;
|
use secrets::Secrets;
|
||||||
|
@ -282,10 +284,12 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
|
||||||
|
|
||||||
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
|
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
|
||||||
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
|
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
|
||||||
|
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
|
||||||
|
|
||||||
let cli = cli_v2::Cli {
|
let cli = cli_v2::Cli {
|
||||||
system_rpc_endpoint,
|
system_rpc_endpoint,
|
||||||
admin_rpc_endpoint,
|
admin_rpc_endpoint,
|
||||||
|
proxy_rpc_endpoint,
|
||||||
rpc_host: id,
|
rpc_host: id,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,5 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
@ -64,8 +65,9 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
}
|
}
|
||||||
|
|
||||||
info!("Initialize Admin API server and metrics collector...");
|
info!("Initialize Admin API server and metrics collector...");
|
||||||
let admin_server = AdminApiServer::new(
|
let admin_server: Arc<AdminApiServer> = AdminApiServer::new(
|
||||||
garage.clone(),
|
garage.clone(),
|
||||||
|
background.clone(),
|
||||||
#[cfg(feature = "metrics")]
|
#[cfg(feature = "metrics")]
|
||||||
metrics_exporter,
|
metrics_exporter,
|
||||||
);
|
);
|
||||||
|
|
Loading…
Add table
Reference in a new issue