wip: infrastructure for local api calls
Some checks failed
ci/woodpecker/pr/debug Pipeline failed
ci/woodpecker/push/debug Pipeline failed

This commit is contained in:
Alex 2025-01-30 19:38:18 +01:00
parent de7bbe8be0
commit e3d1571247
11 changed files with 241 additions and 137 deletions

View file

@ -11,7 +11,7 @@ use garage_model::garage::Garage;
use crate::admin::error::Error; use crate::admin::error::Error;
use crate::admin::macros::*; use crate::admin::macros::*;
use crate::admin::RequestHandler; use crate::admin::{Admin, RequestHandler};
use crate::helpers::is_default; use crate::helpers::is_default;
// This generates the following: // This generates the following:
@ -592,8 +592,8 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerParamRequest { pub struct LocalGetWorkerParamRequest {
param: Option<String>, pub param: Option<String>,
} }
#[derive(Debug, Clone, Serialize, Deserialize)] #[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerParamResponse(HashMap<String, String>); pub struct LocalGetWorkerParamResponse(pub HashMap<String, String>);

View file

@ -6,6 +6,7 @@ use async_trait::async_trait;
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION}; use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use serde::{Deserialize, Serialize};
use tokio::sync::watch; use tokio::sync::watch;
use opentelemetry::trace::SpanRef; use opentelemetry::trace::SpanRef;
@ -16,6 +17,8 @@ use opentelemetry_prometheus::PrometheusExporter;
use prometheus::{Encoder, TextEncoder}; use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage; use garage_model::garage::Garage;
use garage_rpc::{Endpoint as RpcEndpoint, *};
use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError; use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress; use garage_util::socket_address::UnixOrTCPSocketAddress;
@ -29,6 +32,50 @@ use crate::admin::Authorization;
use crate::admin::RequestHandler; use crate::admin::RequestHandler;
use crate::helpers::*; use crate::helpers::*;
// ---- FOR RPC ----
pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
pub struct AdminRpc(LocalAdminApiRequest);
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpcResponse {
ApiOkResponse(LocalAdminApiResponse),
ApiErrorResponse {
http_code: u16,
error_code: String,
message: String,
},
}
impl Rpc for AdminRpc {
type Response = Result<AdminRpcResponse, GarageError>;
}
#[async_trait]
impl EndpointHandler<AdminRpc> for AdminApiServer {
async fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
) -> Result<AdminRpcResponse, GarageError> {
let req = message.0.clone();
info!("Proxied admin API request: {}", req.name());
let res = req.handle(&self.garage, &self).await;
match res {
Ok(res) => Ok(AdminRpcResponse::ApiOkResponse(res)),
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
http_code: e.http_status_code().as_u16(),
error_code: e.code().to_string(),
message: e.to_string(),
}),
}
}
}
// ---- FOR HTTP ----
pub type ResBody = BoxBody<Error>; pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer { pub struct AdminApiServer {
@ -37,9 +84,11 @@ pub struct AdminApiServer {
exporter: PrometheusExporter, exporter: PrometheusExporter,
metrics_token: Option<String>, metrics_token: Option<String>,
admin_token: Option<String>, admin_token: Option<String>,
background: Arc<BackgroundRunner>,
endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
} }
pub enum Endpoint { pub enum HttpEndpoint {
Old(router_v1::Endpoint), Old(router_v1::Endpoint),
New(String), New(String),
} }
@ -47,18 +96,25 @@ pub enum Endpoint {
impl AdminApiServer { impl AdminApiServer {
pub fn new( pub fn new(
garage: Arc<Garage>, garage: Arc<Garage>,
background: Arc<BackgroundRunner>,
#[cfg(feature = "metrics")] exporter: PrometheusExporter, #[cfg(feature = "metrics")] exporter: PrometheusExporter,
) -> Self { ) -> Arc<Self> {
let cfg = &garage.config.admin; let cfg = &garage.config.admin;
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token); let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token); let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
Self {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
let admin = Arc::new(Self {
garage, garage,
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
exporter, exporter,
metrics_token, metrics_token,
admin_token, admin_token,
} background,
endpoint,
});
admin.endpoint.set_handler(admin.clone());
admin
} }
pub async fn run( pub async fn run(
@ -106,32 +162,32 @@ impl ApiHandler for AdminApiServer {
const API_NAME: &'static str = "admin"; const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin"; const API_NAME_DISPLAY: &'static str = "Admin";
type Endpoint = Endpoint; type Endpoint = HttpEndpoint;
type Error = Error; type Error = Error;
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> { fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
if req.uri().path().starts_with("/v0/") { if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?; let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?; let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
Ok(Endpoint::Old(endpoint_v1)) Ok(HttpEndpoint::Old(endpoint_v1))
} else if req.uri().path().starts_with("/v1/") { } else if req.uri().path().starts_with("/v1/") {
let endpoint_v1 = router_v1::Endpoint::from_request(req)?; let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
Ok(Endpoint::Old(endpoint_v1)) Ok(HttpEndpoint::Old(endpoint_v1))
} else { } else {
Ok(Endpoint::New(req.uri().path().to_string())) Ok(HttpEndpoint::New(req.uri().path().to_string()))
} }
} }
async fn handle( async fn handle(
&self, &self,
req: Request<IncomingBody>, req: Request<IncomingBody>,
endpoint: Endpoint, endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> { ) -> Result<Response<ResBody>, Error> {
let auth_header = req.headers().get(AUTHORIZATION).cloned(); let auth_header = req.headers().get(AUTHORIZATION).cloned();
let request = match endpoint { let request = match endpoint {
Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?, HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
Endpoint::New(_) => AdminApiRequest::from_request(req).await?, HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
}; };
let required_auth_hash = let required_auth_hash =
@ -156,12 +212,12 @@ impl ApiHandler for AdminApiServer {
} }
match request { match request {
AdminApiRequest::Options(req) => req.handle(&self.garage).await, AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await, AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::Health(req) => req.handle(&self.garage).await, AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::Metrics(_req) => self.handle_metrics(), AdminApiRequest::Metrics(_req) => self.handle_metrics(),
req => { req => {
let res = req.handle(&self.garage).await?; let res = req.handle(&self.garage, &self).await?;
let mut res = json_ok_response(&res)?; let mut res = json_ok_response(&res)?;
res.headers_mut() res.headers_mut()
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*")); .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
@ -171,7 +227,7 @@ impl ApiHandler for AdminApiServer {
} }
} }
impl ApiEndpoint for Endpoint { impl ApiEndpoint for HttpEndpoint {
fn name(&self) -> Cow<'static, str> { fn name(&self) -> Cow<'static, str> {
match self { match self {
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()), Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),

View file

@ -19,14 +19,18 @@ use garage_model::s3::object_table::*;
use crate::admin::api::*; use crate::admin::api::*;
use crate::admin::error::*; use crate::admin::error::*;
use crate::admin::RequestHandler; use crate::admin::{Admin, RequestHandler};
use crate::common_error::CommonError; use crate::common_error::CommonError;
#[async_trait] #[async_trait]
impl RequestHandler for ListBucketsRequest { impl RequestHandler for ListBucketsRequest {
type Response = ListBucketsResponse; type Response = ListBucketsResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ListBucketsResponse, Error> {
let buckets = garage let buckets = garage
.bucket_table .bucket_table
.get_range( .get_range(
@ -73,7 +77,11 @@ impl RequestHandler for ListBucketsRequest {
impl RequestHandler for GetBucketInfoRequest { impl RequestHandler for GetBucketInfoRequest {
type Response = GetBucketInfoResponse; type Response = GetBucketInfoResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias, self.search) { let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?, (Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage (None, Some(ga), None) => garage
@ -225,7 +233,11 @@ async fn bucket_info_results(
impl RequestHandler for CreateBucketRequest { impl RequestHandler for CreateBucketRequest {
type Response = CreateBucketResponse; type Response = CreateBucketResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<CreateBucketResponse, Error> {
let helper = garage.locked_helper().await; let helper = garage.locked_helper().await;
if let Some(ga) = &self.global_alias { if let Some(ga) = &self.global_alias {
@ -296,7 +308,11 @@ impl RequestHandler for CreateBucketRequest {
impl RequestHandler for DeleteBucketRequest { impl RequestHandler for DeleteBucketRequest {
type Response = DeleteBucketResponse; type Response = DeleteBucketResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<DeleteBucketResponse, Error> {
let helper = garage.locked_helper().await; let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&self.id)?; let bucket_id = parse_bucket_id(&self.id)?;
@ -345,7 +361,11 @@ impl RequestHandler for DeleteBucketRequest {
impl RequestHandler for UpdateBucketRequest { impl RequestHandler for UpdateBucketRequest {
type Response = UpdateBucketResponse; type Response = UpdateBucketResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<UpdateBucketResponse, Error> {
let bucket_id = parse_bucket_id(&self.id)?; let bucket_id = parse_bucket_id(&self.id)?;
let mut bucket = garage let mut bucket = garage
@ -392,7 +412,11 @@ impl RequestHandler for UpdateBucketRequest {
impl RequestHandler for CleanupIncompleteUploadsRequest { impl RequestHandler for CleanupIncompleteUploadsRequest {
type Response = CleanupIncompleteUploadsResponse; type Response = CleanupIncompleteUploadsResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<CleanupIncompleteUploadsResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<CleanupIncompleteUploadsResponse, Error> {
let duration = Duration::from_secs(self.older_than_secs); let duration = Duration::from_secs(self.older_than_secs);
let bucket_id = parse_bucket_id(&self.bucket_id)?; let bucket_id = parse_bucket_id(&self.bucket_id)?;
@ -414,7 +438,11 @@ impl RequestHandler for CleanupIncompleteUploadsRequest {
impl RequestHandler for AllowBucketKeyRequest { impl RequestHandler for AllowBucketKeyRequest {
type Response = AllowBucketKeyResponse; type Response = AllowBucketKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<AllowBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, true).await?; let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
Ok(AllowBucketKeyResponse(res)) Ok(AllowBucketKeyResponse(res))
} }
@ -424,7 +452,11 @@ impl RequestHandler for AllowBucketKeyRequest {
impl RequestHandler for DenyBucketKeyRequest { impl RequestHandler for DenyBucketKeyRequest {
type Response = DenyBucketKeyResponse; type Response = DenyBucketKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<DenyBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, false).await?; let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
Ok(DenyBucketKeyResponse(res)) Ok(DenyBucketKeyResponse(res))
} }
@ -473,7 +505,11 @@ pub async fn handle_bucket_change_key_perm(
impl RequestHandler for AddBucketAliasRequest { impl RequestHandler for AddBucketAliasRequest {
type Response = AddBucketAliasResponse; type Response = AddBucketAliasResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<AddBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?; let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await; let helper = garage.locked_helper().await;
@ -504,7 +540,11 @@ impl RequestHandler for AddBucketAliasRequest {
impl RequestHandler for RemoveBucketAliasRequest { impl RequestHandler for RemoveBucketAliasRequest {
type Response = RemoveBucketAliasResponse; type Response = RemoveBucketAliasResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<RemoveBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?; let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await; let helper = garage.locked_helper().await;

View file

@ -12,13 +12,17 @@ use garage_model::garage::Garage;
use crate::admin::api::*; use crate::admin::api::*;
use crate::admin::error::*; use crate::admin::error::*;
use crate::admin::RequestHandler; use crate::admin::{Admin, RequestHandler};
#[async_trait] #[async_trait]
impl RequestHandler for GetClusterStatusRequest { impl RequestHandler for GetClusterStatusRequest {
type Response = GetClusterStatusResponse; type Response = GetClusterStatusResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterStatusResponse, Error> {
let layout = garage.system.cluster_layout(); let layout = garage.system.cluster_layout();
let mut nodes = garage let mut nodes = garage
.system .system
@ -120,7 +124,11 @@ impl RequestHandler for GetClusterStatusRequest {
impl RequestHandler for GetClusterHealthRequest { impl RequestHandler for GetClusterHealthRequest {
type Response = GetClusterHealthResponse; type Response = GetClusterHealthResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterHealthResponse, Error> {
use garage_rpc::system::ClusterHealthStatus; use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health(); let health = garage.system.health();
let health = GetClusterHealthResponse { let health = GetClusterHealthResponse {
@ -146,7 +154,11 @@ impl RequestHandler for GetClusterHealthRequest {
impl RequestHandler for ConnectClusterNodesRequest { impl RequestHandler for ConnectClusterNodesRequest {
type Response = ConnectClusterNodesResponse; type Response = ConnectClusterNodesResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ConnectClusterNodesResponse, Error> {
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node))) let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
.await .await
.into_iter() .into_iter()
@ -169,7 +181,11 @@ impl RequestHandler for ConnectClusterNodesRequest {
impl RequestHandler for GetClusterLayoutRequest { impl RequestHandler for GetClusterLayoutRequest {
type Response = GetClusterLayoutResponse; type Response = GetClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterLayoutResponse, Error> {
Ok(format_cluster_layout( Ok(format_cluster_layout(
garage.system.cluster_layout().inner(), garage.system.cluster_layout().inner(),
)) ))
@ -229,7 +245,11 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
impl RequestHandler for UpdateClusterLayoutRequest { impl RequestHandler for UpdateClusterLayoutRequest {
type Response = UpdateClusterLayoutResponse; type Response = UpdateClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<UpdateClusterLayoutResponse, Error> {
let mut layout = garage.system.cluster_layout().inner().clone(); let mut layout = garage.system.cluster_layout().inner().clone();
let mut roles = layout.current().roles.clone(); let mut roles = layout.current().roles.clone();
@ -275,7 +295,11 @@ impl RequestHandler for UpdateClusterLayoutRequest {
impl RequestHandler for ApplyClusterLayoutRequest { impl RequestHandler for ApplyClusterLayoutRequest {
type Response = ApplyClusterLayoutResponse; type Response = ApplyClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ApplyClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone(); let layout = garage.system.cluster_layout().inner().clone();
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?; let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
@ -296,7 +320,11 @@ impl RequestHandler for ApplyClusterLayoutRequest {
impl RequestHandler for RevertClusterLayoutRequest { impl RequestHandler for RevertClusterLayoutRequest {
type Response = RevertClusterLayoutResponse; type Response = RevertClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<RevertClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone(); let layout = garage.system.cluster_layout().inner().clone();
let layout = layout.revert_staged_changes()?; let layout = layout.revert_staged_changes()?;
garage garage

View file

@ -10,13 +10,13 @@ use garage_model::key_table::*;
use crate::admin::api::*; use crate::admin::api::*;
use crate::admin::error::*; use crate::admin::error::*;
use crate::admin::RequestHandler; use crate::admin::{Admin, RequestHandler};
#[async_trait] #[async_trait]
impl RequestHandler for ListKeysRequest { impl RequestHandler for ListKeysRequest {
type Response = ListKeysResponse; type Response = ListKeysResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> { async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
let res = garage let res = garage
.key_table .key_table
.get_range( .get_range(
@ -42,7 +42,11 @@ impl RequestHandler for ListKeysRequest {
impl RequestHandler for GetKeyInfoRequest { impl RequestHandler for GetKeyInfoRequest {
type Response = GetKeyInfoResponse; type Response = GetKeyInfoResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetKeyInfoResponse, Error> {
let key = match (self.id, self.search) { let key = match (self.id, self.search) {
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?, (Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
(None, Some(search)) => { (None, Some(search)) => {
@ -66,7 +70,11 @@ impl RequestHandler for GetKeyInfoRequest {
impl RequestHandler for CreateKeyRequest { impl RequestHandler for CreateKeyRequest {
type Response = CreateKeyResponse; type Response = CreateKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<CreateKeyResponse, Error> {
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key")); let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?; garage.key_table.insert(&key).await?;
@ -80,7 +88,11 @@ impl RequestHandler for CreateKeyRequest {
impl RequestHandler for ImportKeyRequest { impl RequestHandler for ImportKeyRequest {
type Response = ImportKeyResponse; type Response = ImportKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ImportKeyResponse, Error> {
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?; let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
if prev_key.is_some() { if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string())); return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
@ -104,7 +116,11 @@ impl RequestHandler for ImportKeyRequest {
impl RequestHandler for UpdateKeyRequest { impl RequestHandler for UpdateKeyRequest {
type Response = UpdateKeyResponse; type Response = UpdateKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<UpdateKeyResponse, Error> {
let mut key = garage.key_helper().get_existing_key(&self.id).await?; let mut key = garage.key_helper().get_existing_key(&self.id).await?;
let key_state = key.state.as_option_mut().unwrap(); let key_state = key.state.as_option_mut().unwrap();
@ -135,7 +151,11 @@ impl RequestHandler for UpdateKeyRequest {
impl RequestHandler for DeleteKeyRequest { impl RequestHandler for DeleteKeyRequest {
type Response = DeleteKeyResponse; type Response = DeleteKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<DeleteKeyResponse, Error> {
let helper = garage.locked_helper().await; let helper = garage.locked_helper().await;
let mut key = helper.key().get_existing_key(&self.id).await?; let mut key = helper.key().get_existing_key(&self.id).await?;

View file

@ -74,7 +74,7 @@ macro_rules! admin_endpoints {
impl RequestHandler for AdminApiRequest { impl RequestHandler for AdminApiRequest {
type Response = AdminApiResponse; type Response = AdminApiResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> { async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
Ok(match self { Ok(match self {
$( $(
AdminApiRequest::$special_endpoint(_) => panic!( AdminApiRequest::$special_endpoint(_) => panic!(
@ -82,7 +82,7 @@ macro_rules! admin_endpoints {
), ),
)* )*
$( $(
AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?), AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)* )*
}) })
} }
@ -140,7 +140,7 @@ macro_rules! local_admin_endpoints {
impl RequestHandler for [< $endpoint Request >] { impl RequestHandler for [< $endpoint Request >] {
type Response = [< $endpoint Response >]; type Response = [< $endpoint Response >];
async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, Error> { async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
todo!() todo!()
} }
} }
@ -160,10 +160,10 @@ macro_rules! local_admin_endpoints {
impl RequestHandler for LocalAdminApiRequest { impl RequestHandler for LocalAdminApiRequest {
type Response = LocalAdminApiResponse; type Response = LocalAdminApiResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<LocalAdminApiResponse, Error> { async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
Ok(match self { Ok(match self {
$( $(
LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage).await?), LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)* )*
}) })
} }

View file

@ -7,8 +7,6 @@ mod router_v0;
mod router_v1; mod router_v1;
mod router_v2; mod router_v2;
pub mod rpc;
mod bucket; mod bucket;
mod cluster; mod cluster;
mod key; mod key;
@ -22,6 +20,8 @@ use async_trait::async_trait;
use garage_model::garage::Garage; use garage_model::garage::Garage;
pub use api_server::AdminApiServer as Admin;
pub enum Authorization { pub enum Authorization {
None, None,
MetricsToken, MetricsToken,
@ -32,5 +32,9 @@ pub enum Authorization {
pub trait RequestHandler { pub trait RequestHandler {
type Response; type Response;
async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>; async fn handle(
self,
garage: &Arc<Garage>,
admin: &Admin,
) -> Result<Self::Response, error::Error>;
} }

View file

@ -1,75 +0,0 @@
use std::fmt::Write;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use garage_util::background::BackgroundRunner;
use garage_util::error::Error;
use garage_rpc::*;
use garage_model::garage::Garage;
use crate::admin::api::{LocalAdminApiRequest, LocalAdminApiResponse};
use crate::admin::RequestHandler as AdminApiEndpoint;
use crate::generic_server::ApiError;
pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
pub struct AdminRpc(LocalAdminApiRequest);
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpcResponse {
ApiOkResponse(LocalAdminApiResponse),
ApiErrorResponse {
http_code: u16,
error_code: String,
message: String,
},
}
impl Rpc for AdminRpc {
type Response = Result<AdminRpcResponse, Error>;
}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
background: Arc<BackgroundRunner>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
let admin = Arc::new(Self {
garage,
background,
endpoint,
});
admin.endpoint.set_handler(admin.clone());
admin
}
}
#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
async fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
) -> Result<AdminRpcResponse, Error> {
let req = message.0.clone();
info!("Proxied admin API request: {}", req.name());
let res = req.handle(&self.garage).await;
match res {
Ok(res) => Ok(AdminRpcResponse::ApiOkResponse(res)),
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
http_code: e.http_status_code().as_u16(),
error_code: e.code().to_string(),
message: e.to_string(),
}),
}
}
}

View file

@ -13,14 +13,18 @@ use garage_rpc::system::ClusterHealthStatus;
use crate::admin::api::{CheckDomainRequest, HealthRequest, OptionsRequest}; use crate::admin::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
use crate::admin::api_server::ResBody; use crate::admin::api_server::ResBody;
use crate::admin::error::*; use crate::admin::error::*;
use crate::admin::RequestHandler; use crate::admin::{Admin, RequestHandler};
use crate::helpers::*; use crate::helpers::*;
#[async_trait] #[async_trait]
impl RequestHandler for OptionsRequest { impl RequestHandler for OptionsRequest {
type Response = Response<ResBody>; type Response = Response<ResBody>;
async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { async fn handle(
self,
_garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<Response<ResBody>, Error> {
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
.header(ALLOW, "OPTIONS,GET,POST") .header(ALLOW, "OPTIONS,GET,POST")
@ -35,7 +39,11 @@ impl RequestHandler for OptionsRequest {
impl RequestHandler for CheckDomainRequest { impl RequestHandler for CheckDomainRequest {
type Response = Response<ResBody>; type Response = Response<ResBody>;
async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<Response<ResBody>, Error> {
if check_domain(garage, &self.domain).await? { if check_domain(garage, &self.domain).await? {
Ok(Response::builder() Ok(Response::builder()
.status(StatusCode::OK) .status(StatusCode::OK)
@ -105,7 +113,11 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
impl RequestHandler for HealthRequest { impl RequestHandler for HealthRequest {
type Response = Response<ResBody>; type Response = Response<ResBody>;
async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> { async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<Response<ResBody>, Error> {
let health = garage.system.health(); let health = garage.system.health();
let (status, status_str) = match health.status { let (status, status_str) = match health.status {

View file

@ -1,3 +1,4 @@
use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
@ -6,13 +7,26 @@ use garage_model::garage::Garage;
use crate::admin::api::*; use crate::admin::api::*;
use crate::admin::error::Error; use crate::admin::error::Error;
use crate::admin::RequestHandler; use crate::admin::{Admin, RequestHandler};
#[async_trait] #[async_trait]
impl RequestHandler for LocalGetWorkerParamRequest { impl RequestHandler for LocalGetWorkerParamRequest {
type Response = LocalGetWorkerParamResponse; type Response = LocalGetWorkerParamResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<LocalGetWorkerParamResponse, Error> { async fn handle(
todo!() self,
garage: &Arc<Garage>,
admin: &Admin,
) -> Result<LocalGetWorkerParamResponse, Error> {
let mut res = HashMap::new();
if let Some(k) = self.param {
res.insert(k.clone(), garage.bg_vars.get(&k)?);
} else {
let mut vars = garage.bg_vars.get_all();
for (k, v) in vars.iter() {
res.insert(k.to_string(), v.to_string());
}
}
Ok(LocalGetWorkerParamResponse(res))
} }
} }

View file

@ -66,6 +66,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Initialize Admin API server and metrics collector..."); info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new( let admin_server = AdminApiServer::new(
garage.clone(), garage.clone(),
background.clone(),
#[cfg(feature = "metrics")] #[cfg(feature = "metrics")]
metrics_exporter, metrics_exporter,
); );
@ -124,7 +125,11 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching Admin API server..."); info!("Launching Admin API server...");
servers.push(( servers.push((
"Admin", "Admin",
tokio::spawn(admin_server.run(admin_bind_addr.clone(), watch_cancel.clone())), tokio::spawn(
admin_server
.clone()
.run(admin_bind_addr.clone(), watch_cancel.clone()),
),
)); ));
} }