Admin API refactoring: convert existing commands to API requests (step 3) #945

Merged
lx merged 12 commits from refactor-admin into next-v2 2025-02-05 19:54:42 +00:00
39 changed files with 2236 additions and 1530 deletions

4
Cargo.lock generated
View file

@ -1258,7 +1258,6 @@ dependencies = [
"opentelemetry-otlp",
"opentelemetry-prometheus",
"parse_duration",
"serde",
"serde_json",
"sha1",
"sha2",
@ -1277,9 +1276,12 @@ version = "1.0.1"
dependencies = [
"argon2",
"async-trait",
"bytesize",
"err-derive",
"format_table",
"futures",
"garage_api_common",
"garage_block",
"garage_model",
"garage_rpc",
"garage_table",

View file

@ -826,6 +826,46 @@ paths:
schema:
$ref: '#/components/schemas/BucketInfo'
/CleanupIncompleteUploads:
post:
tags:
- Bucket
operationId: "CleanupIncompleteUploads"
summary: "Cleanup incomplete uploads in a bucket"
description: |
Cleanup all incomplete uploads in a bucket that are older than a specified number of seconds
requestBody:
description: |
Bucket id and minimum age of uploads to delete (in seconds)
required: true
content:
application/json:
schema:
type: object
required: [bucketId, olderThanSecs]
properties:
bucketId:
type: string
example: "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b"
olderThanSecs:
type: integer
example: "3600"
responses:
'500':
description: "The server can not handle your request. Check your connectivity with the rest of the cluster."
'400':
description: "The payload is not formatted correctly"
'200':
description: "The bucket was cleaned up successfully"
content:
application/json:
schema:
type: object
properties:
uploadsDeleted:
type: integer
example: 12
/AllowBucketKey:
post:
tags:

View file

@ -702,6 +702,28 @@ Deletes a storage bucket. A bucket cannot be deleted if it is not empty.
Warning: this will delete all aliases associated with the bucket!
#### CleanupIncompleteUploads `POST /v2/CleanupIncompleteUploads`
Cleanup all incomplete uploads in a bucket that are older than a specified number
of seconds.
Request body format:
```json
{
"bucketId": "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b",
"olderThanSecs": 3600
}
```
Response format
```json
{
"uploadsDeleted": 12
}
```
### Operations on permissions for keys on buckets

View file

@ -14,7 +14,9 @@ path = "lib.rs"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
format_table.workspace = true
garage_model.workspace = true
garage_block.workspace = true
garage_table.workspace = true
garage_util.workspace = true
garage_rpc.workspace = true
@ -22,6 +24,7 @@ garage_api_common.workspace = true
argon2.workspace = true
async-trait.workspace = true
bytesize.workspace = true
err-derive.workspace = true
hex.workspace = true
paste.workspace = true

View file

@ -1,18 +1,22 @@
use std::collections::HashMap;
use std::convert::TryFrom;
use std::net::SocketAddr;
use std::sync::Arc;
use async_trait::async_trait;
use paste::paste;
use serde::{Deserialize, Serialize};
use garage_rpc::*;
use garage_model::garage::Garage;
use garage_api_common::common_error::CommonErrorDerivative;
use garage_api_common::helpers::is_default;
use crate::api_server::{AdminRpc, AdminRpcResponse};
use crate::error::Error;
use crate::macros::*;
use crate::EndpointHandler;
use crate::{Admin, RequestHandler};
// This generates the following:
//
@ -62,6 +66,7 @@ admin_endpoints![
CreateBucket,
UpdateBucket,
DeleteBucket,
CleanupIncompleteUploads,
// Operations on permissions for keys on buckets
AllowBucketKey,
@ -70,8 +75,55 @@ admin_endpoints![
// Operations on bucket aliases
AddBucketAlias,
RemoveBucketAlias,
// Node operations
CreateMetadataSnapshot,
GetNodeStatistics,
GetClusterStatistics,
LaunchRepairOperation,
// Worker operations
ListWorkers,
GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
// Block operations
ListBlockErrors,
GetBlockInfo,
RetryBlockResync,
PurgeBlocks,
];
local_admin_endpoints![
// Node operations
CreateMetadataSnapshot,
GetNodeStatistics,
LaunchRepairOperation,
// Background workers
ListWorkers,
GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
// Block operations
ListBlockErrors,
GetBlockInfo,
RetryBlockResync,
PurgeBlocks,
];
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiRequest<RB> {
pub node: String,
pub body: RB,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiResponse<RB> {
pub success: HashMap<String, RB>,
pub error: HashMap<String, String>,
}
// **********************************************
// Special endpoints
//
@ -497,6 +549,19 @@ pub struct DeleteBucketRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteBucketResponse;
// ---- CleanupIncompleteUploads ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupIncompleteUploadsRequest {
pub bucket_id: String,
pub older_than_secs: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupIncompleteUploadsResponse {
pub uploads_deleted: u64,
}
// **********************************************
// Operations on permissions for keys on buckets
// **********************************************
@ -566,3 +631,246 @@ pub struct RemoveBucketAliasRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
// **********************************************
// Node operations
// **********************************************
// ---- CreateMetadataSnapshot ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalCreateMetadataSnapshotRequest;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalCreateMetadataSnapshotResponse;
// ---- GetNodeStatistics ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalGetNodeStatisticsRequest;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetNodeStatisticsResponse {
pub freeform: String,
}
// ---- GetClusterStatistics ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct GetClusterStatisticsRequest;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GetClusterStatisticsResponse {
pub freeform: String,
}
// ---- LaunchRepairOperation ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalLaunchRepairOperationRequest {
pub repair_type: RepairType,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum RepairType {
Tables,
Blocks,
Versions,
MultipartUploads,
BlockRefs,
BlockRc,
Rebalance,
Scrub(ScrubCommand),
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum ScrubCommand {
Start,
Pause,
Resume,
Cancel,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalLaunchRepairOperationResponse;
// **********************************************
// Worker operations
// **********************************************
// ---- GetWorkerList ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct LocalListWorkersRequest {
#[serde(default)]
pub busy_only: bool,
#[serde(default)]
pub error_only: bool,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalListWorkersResponse(pub Vec<WorkerInfoResp>);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkerInfoResp {
pub id: u64,
pub name: String,
pub state: WorkerStateResp,
pub errors: u64,
pub consecutive_errors: u64,
pub last_error: Option<WorkerLastError>,
pub tranquility: Option<u32>,
pub progress: Option<String>,
pub queue_length: Option<u64>,
pub persistent_errors: Option<u64>,
pub freeform: Vec<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum WorkerStateResp {
Busy,
Throttled { duration_secs: f32 },
Idle,
Done,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct WorkerLastError {
pub message: String,
pub secs_ago: u64,
}
// ---- GetWorkerList ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerInfoRequest {
pub id: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp);
// ---- GetWorkerVariable ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerVariableRequest {
pub variable: Option<String>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalGetWorkerVariableResponse(pub HashMap<String, String>);
// ---- SetWorkerVariable ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSetWorkerVariableRequest {
pub variable: String,
pub value: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalSetWorkerVariableResponse {
pub variable: String,
pub value: String,
}
// **********************************************
// Block operations
// **********************************************
// ---- ListBlockErrors ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalListBlockErrorsRequest;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>);
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct BlockError {
pub block_hash: String,
pub refcount: u64,
pub error_count: u64,
pub last_try_secs_ago: u64,
pub next_try_in_secs: u64,
}
// ---- GetBlockInfo ----
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalGetBlockInfoRequest {
pub block_hash: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalGetBlockInfoResponse {
pub block_hash: String,
pub refcount: u64,
pub versions: Vec<BlockVersion>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlockVersion {
pub version_id: String,
pub deleted: bool,
pub garbage_collected: bool,
pub backlink: Option<BlockVersionBacklink>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BlockVersionBacklink {
Object {
bucket_id: String,
key: String,
},
Upload {
upload_id: String,
upload_deleted: bool,
upload_garbage_collected: bool,
bucket_id: Option<String>,
key: Option<String>,
},
}
// ---- RetryBlockResync ----
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum LocalRetryBlockResyncRequest {
#[serde(rename_all = "camelCase")]
All { all: bool },
#[serde(rename_all = "camelCase")]
Blocks { block_hashes: Vec<String> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalRetryBlockResyncResponse {
pub count: u64,
}
// ---- PurgeBlocks ----
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalPurgeBlocksRequest(pub Vec<String>);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalPurgeBlocksResponse {
pub blocks_purged: u64,
pub objects_deleted: u64,
pub uploads_deleted: u64,
pub versions_deleted: u64,
}

View file

@ -5,17 +5,18 @@ use argon2::password_hash::PasswordHash;
use async_trait::async_trait;
use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION};
use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode};
use hyper::{body::Incoming as IncomingBody, Request, Response};
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use opentelemetry::trace::SpanRef;
#[cfg(feature = "metrics")]
use opentelemetry_prometheus::PrometheusExporter;
#[cfg(feature = "metrics")]
use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
use garage_rpc::{Endpoint as RpcEndpoint, *};
use garage_util::background::BackgroundRunner;
use garage_util::error::Error as GarageError;
use garage_util::socket_address::UnixOrTCPSocketAddress;
@ -27,19 +28,84 @@ use crate::error::*;
use crate::router_v0;
use crate::router_v1;
use crate::Authorization;
use crate::EndpointHandler;
use crate::RequestHandler;
// ---- FOR RPC ----
pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpc {
Proxy(AdminApiRequest),
Internal(LocalAdminApiRequest),
}
#[derive(Debug, Serialize, Deserialize)]
pub enum AdminRpcResponse {
ProxyApiOkResponse(TaggedAdminApiResponse),
InternalApiOkResponse(LocalAdminApiResponse),
ApiErrorResponse {
http_code: u16,
error_code: String,
message: String,
},
}
impl Rpc for AdminRpc {
type Response = Result<AdminRpcResponse, GarageError>;
}
#[async_trait]
impl EndpointHandler<AdminRpc> for AdminApiServer {
async fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
) -> Result<AdminRpcResponse, GarageError> {
match message {
AdminRpc::Proxy(req) => {
info!("Proxied admin API request: {}", req.name());
let res = req.clone().handle(&self.garage, &self).await;
match res {
Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())),
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
http_code: e.http_status_code().as_u16(),
error_code: e.code().to_string(),
message: e.to_string(),
}),
}
}
AdminRpc::Internal(req) => {
info!("Internal admin API request: {}", req.name());
let res = req.clone().handle(&self.garage, &self).await;
match res {
Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)),
Err(e) => Ok(AdminRpcResponse::ApiErrorResponse {
http_code: e.http_status_code().as_u16(),
error_code: e.code().to_string(),
message: e.to_string(),
}),
}
}
}
}
}
// ---- FOR HTTP ----
pub type ResBody = BoxBody<Error>;
pub struct AdminApiServer {
garage: Arc<Garage>,
#[cfg(feature = "metrics")]
exporter: PrometheusExporter,
pub(crate) exporter: PrometheusExporter,
metrics_token: Option<String>,
admin_token: Option<String>,
pub(crate) background: Arc<BackgroundRunner>,
pub(crate) endpoint: Arc<RpcEndpoint<AdminRpc, Self>>,
}
pub enum Endpoint {
pub enum HttpEndpoint {
Old(router_v1::Endpoint),
New(String),
}
@ -47,91 +113,48 @@ pub enum Endpoint {
impl AdminApiServer {
pub fn new(
garage: Arc<Garage>,
background: Arc<BackgroundRunner>,
#[cfg(feature = "metrics")] exporter: PrometheusExporter,
) -> Self {
) -> Arc<Self> {
let cfg = &garage.config.admin;
let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token);
let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token);
Self {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
let admin = Arc::new(Self {
garage,
#[cfg(feature = "metrics")]
exporter,
metrics_token,
admin_token,
}
background,
endpoint,
});
admin.endpoint.set_handler(admin.clone());
admin
}
pub async fn run(
self,
self: Arc<Self>,
bind_addr: UnixOrTCPSocketAddress,
must_exit: watch::Receiver<bool>,
) -> Result<(), GarageError> {
let region = self.garage.config.s3_api.s3_region.clone();
ApiServer::new(region, self)
ApiServer::new(region, ArcAdminApiServer(self))
.run_server(bind_addr, Some(0o220), must_exit)
.await
}
fn handle_metrics(&self) -> Result<Response<ResBody>, Error> {
#[cfg(feature = "metrics")]
{
use opentelemetry::trace::Tracer;
let mut buffer = vec![];
let encoder = TextEncoder::new();
let tracer = opentelemetry::global::tracer("garage");
let metric_families = tracer.in_span("admin/gather_metrics", |_| {
self.exporter.registry().gather()
});
encoder
.encode(&metric_families, &mut buffer)
.ok_or_internal_error("Could not serialize metrics")?;
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, encoder.format_type())
.body(bytes_body(buffer.into()))?)
}
#[cfg(not(feature = "metrics"))]
Err(Error::bad_request(
"Garage was built without the metrics feature".to_string(),
))
}
}
#[async_trait]
impl ApiHandler for AdminApiServer {
const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin";
type Endpoint = Endpoint;
type Error = Error;
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<Endpoint, Error> {
if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
Ok(Endpoint::Old(endpoint_v1))
} else if req.uri().path().starts_with("/v1/") {
let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
Ok(Endpoint::Old(endpoint_v1))
} else {
Ok(Endpoint::New(req.uri().path().to_string()))
}
}
async fn handle(
async fn handle_http_api(
&self,
req: Request<IncomingBody>,
endpoint: Endpoint,
endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
let auth_header = req.headers().get(AUTHORIZATION).cloned();
let request = match endpoint {
Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
Endpoint::New(_) => AdminApiRequest::from_request(req).await?,
HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?,
HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?,
};
let required_auth_hash =
@ -156,12 +179,12 @@ impl ApiHandler for AdminApiServer {
}
match request {
AdminApiRequest::Options(req) => req.handle(&self.garage).await,
AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await,
AdminApiRequest::Health(req) => req.handle(&self.garage).await,
AdminApiRequest::Metrics(_req) => self.handle_metrics(),
AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await,
AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await,
req => {
let res = req.handle(&self.garage).await?;
let res = req.handle(&self.garage, &self).await?;
let mut res = json_ok_response(&res)?;
res.headers_mut()
.insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*"));
@ -171,7 +194,39 @@ impl ApiHandler for AdminApiServer {
}
}
impl ApiEndpoint for Endpoint {
struct ArcAdminApiServer(Arc<AdminApiServer>);
#[async_trait]
impl ApiHandler for ArcAdminApiServer {
const API_NAME: &'static str = "admin";
const API_NAME_DISPLAY: &'static str = "Admin";
type Endpoint = HttpEndpoint;
type Error = Error;
fn parse_endpoint(&self, req: &Request<IncomingBody>) -> Result<HttpEndpoint, Error> {
if req.uri().path().starts_with("/v0/") {
let endpoint_v0 = router_v0::Endpoint::from_request(req)?;
let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?;
Ok(HttpEndpoint::Old(endpoint_v1))
} else if req.uri().path().starts_with("/v1/") {
let endpoint_v1 = router_v1::Endpoint::from_request(req)?;
Ok(HttpEndpoint::Old(endpoint_v1))
} else {
Ok(HttpEndpoint::New(req.uri().path().to_string()))
}
}
async fn handle(
&self,
req: Request<IncomingBody>,
endpoint: HttpEndpoint,
) -> Result<Response<ResBody>, Error> {
self.0.handle_http_api(req, endpoint).await
}
}
impl ApiEndpoint for HttpEndpoint {
fn name(&self) -> Cow<'static, str> {
match self {
Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()),

274
src/api/admin/block.rs Normal file
View file

@ -0,0 +1,274 @@
use std::sync::Arc;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::now_msec;
use garage_table::EmptyKey;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_api_common::common_error::CommonErrorDerivative;
use crate::api::*;
use crate::error::*;
use crate::{Admin, RequestHandler};
impl RequestHandler for LocalListBlockErrorsRequest {
type Response = LocalListBlockErrorsResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalListBlockErrorsResponse, Error> {
let errors = garage.block_manager.list_resync_errors()?;
let now = now_msec();
let errors = errors
.into_iter()
.map(|e| BlockError {
block_hash: hex::encode(&e.hash),
refcount: e.refcount,
error_count: e.error_count,
last_try_secs_ago: now.saturating_sub(e.last_try) / 1000,
next_try_in_secs: e.next_try.saturating_sub(now) / 1000,
})
.collect();
Ok(LocalListBlockErrorsResponse(errors))
}
}
impl RequestHandler for LocalGetBlockInfoRequest {
type Response = LocalGetBlockInfoResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalGetBlockInfoResponse, Error> {
let hash = find_block_hash_by_prefix(garage, &self.block_hash)?;
let refcount = garage.block_manager.get_block_rc(&hash)?;
let block_refs = garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
for br in block_refs {
if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? {
let bl = match &v.backlink {
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
BlockVersionBacklink::Upload {
upload_id: hex::encode(&upload_id),
upload_deleted: u.deleted.get(),
upload_garbage_collected: false,
bucket_id: Some(hex::encode(&u.bucket_id)),
key: Some(u.key.to_string()),
}
} else {
BlockVersionBacklink::Upload {
upload_id: hex::encode(&upload_id),
upload_deleted: true,
upload_garbage_collected: true,
bucket_id: None,
key: None,
}
}
}
VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object {
bucket_id: hex::encode(&bucket_id),
key: key.to_string(),
},
};
versions.push(BlockVersion {
version_id: hex::encode(&br.version),
deleted: v.deleted.get(),
garbage_collected: false,
backlink: Some(bl),
});
} else {
versions.push(BlockVersion {
version_id: hex::encode(&br.version),
deleted: true,
garbage_collected: true,
backlink: None,
});
}
}
Ok(LocalGetBlockInfoResponse {
block_hash: hex::encode(&hash),
refcount,
versions,
})
}
}
impl RequestHandler for LocalRetryBlockResyncRequest {
type Response = LocalRetryBlockResyncResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalRetryBlockResyncResponse, Error> {
match self {
Self::All { all: true } => {
let blocks = garage.block_manager.list_resync_errors()?;
for b in blocks.iter() {
garage.block_manager.resync.clear_backoff(&b.hash)?;
}
Ok(LocalRetryBlockResyncResponse {
count: blocks.len() as u64,
})
}
Self::All { all: false } => Err(Error::bad_request("nonsense")),
Self::Blocks { block_hashes } => {
for hash in block_hashes.iter() {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
garage.block_manager.resync.clear_backoff(&hash)?;
}
Ok(LocalRetryBlockResyncResponse {
count: block_hashes.len() as u64,
})
}
}
}
}
impl RequestHandler for LocalPurgeBlocksRequest {
type Response = LocalPurgeBlocksResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalPurgeBlocksResponse, Error> {
let mut obj_dels = 0;
let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in self.0.iter() {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let block_refs = garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
for br in block_refs {
if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? {
handle_block_purge_version_backlink(
garage,
&version,
&mut obj_dels,
&mut mpu_dels,
)
.await?;
if !version.deleted.get() {
let deleted_version = Version::new(version.uuid, version.backlink, true);
garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
}
Ok(LocalPurgeBlocksResponse {
blocks_purged: self.0.len() as u64,
versions_deleted: ver_dels,
objects_deleted: obj_dels,
uploads_deleted: mpu_dels,
})
}
}
fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
if prefix.len() < 4 {
return Err(Error::bad_request(
"Please specify at least 4 characters of the block hash",
));
}
let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
let iter = garage
.block_ref_table
.data
.store
.range(&prefix_bin[..]..)
.map_err(GarageError::from)?;
let mut found = None;
for item in iter {
let (k, _v) = item.map_err(GarageError::from)?;
let hash = Hash::try_from(&k[..32]).unwrap();
if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
break;
}
if hex::encode(hash.as_slice()).starts_with(prefix) {
match &found {
Some(x) if *x == hash => (),
Some(_) => {
return Err(Error::bad_request(format!(
"Several blocks match prefix `{}`",
prefix
)));
}
None => {
found = Some(hash);
}
}
}
}
found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
}
async fn handle_block_purge_version_backlink(
garage: &Arc<Garage>,
version: &Version,
obj_dels: &mut u64,
mpu_dels: &mut u64,
) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = garage.object_table.get(&bucket_id, &key).await? {
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
}

View file

@ -1,7 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use std::time::Duration;
use garage_util::crdt::*;
use garage_util::data::*;
@ -20,13 +19,16 @@ use garage_api_common::common_error::CommonError;
use crate::api::*;
use crate::error::*;
use crate::EndpointHandler;
use crate::{Admin, RequestHandler};
#[async_trait]
impl EndpointHandler for ListBucketsRequest {
impl RequestHandler for ListBucketsRequest {
type Response = ListBucketsResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ListBucketsResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ListBucketsResponse, Error> {
let buckets = garage
.bucket_table
.get_range(
@ -69,11 +71,14 @@ impl EndpointHandler for ListBucketsRequest {
}
}
#[async_trait]
impl EndpointHandler for GetBucketInfoRequest {
impl RequestHandler for GetBucketInfoRequest {
type Response = GetBucketInfoResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetBucketInfoResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetBucketInfoResponse, Error> {
let bucket_id = match (self.id, self.global_alias, self.search) {
(Some(id), None, None) => parse_bucket_id(&id)?,
(None, Some(ga), None) => garage
@ -221,11 +226,14 @@ async fn bucket_info_results(
Ok(res)
}
#[async_trait]
impl EndpointHandler for CreateBucketRequest {
impl RequestHandler for CreateBucketRequest {
type Response = CreateBucketResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<CreateBucketResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<CreateBucketResponse, Error> {
let helper = garage.locked_helper().await;
if let Some(ga) = &self.global_alias {
@ -292,11 +300,14 @@ impl EndpointHandler for CreateBucketRequest {
}
}
#[async_trait]
impl EndpointHandler for DeleteBucketRequest {
impl RequestHandler for DeleteBucketRequest {
type Response = DeleteBucketResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteBucketResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<DeleteBucketResponse, Error> {
let helper = garage.locked_helper().await;
let bucket_id = parse_bucket_id(&self.id)?;
@ -341,11 +352,14 @@ impl EndpointHandler for DeleteBucketRequest {
}
}
#[async_trait]
impl EndpointHandler for UpdateBucketRequest {
impl RequestHandler for UpdateBucketRequest {
type Response = UpdateBucketResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateBucketResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<UpdateBucketResponse, Error> {
let bucket_id = parse_bucket_id(&self.id)?;
let mut bucket = garage
@ -388,23 +402,52 @@ impl EndpointHandler for UpdateBucketRequest {
}
}
impl RequestHandler for CleanupIncompleteUploadsRequest {
type Response = CleanupIncompleteUploadsResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<CleanupIncompleteUploadsResponse, Error> {
let duration = Duration::from_secs(self.older_than_secs);
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let count = garage
.bucket_helper()
.cleanup_incomplete_uploads(&bucket_id, duration)
.await?;
Ok(CleanupIncompleteUploadsResponse {
uploads_deleted: count as u64,
})
}
}
// ---- BUCKET/KEY PERMISSIONS ----
#[async_trait]
impl EndpointHandler for AllowBucketKeyRequest {
impl RequestHandler for AllowBucketKeyRequest {
type Response = AllowBucketKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<AllowBucketKeyResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<AllowBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
Ok(AllowBucketKeyResponse(res))
}
}
#[async_trait]
impl EndpointHandler for DenyBucketKeyRequest {
impl RequestHandler for DenyBucketKeyRequest {
type Response = DenyBucketKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<DenyBucketKeyResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<DenyBucketKeyResponse, Error> {
let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
Ok(DenyBucketKeyResponse(res))
}
@ -449,11 +492,14 @@ pub async fn handle_bucket_change_key_perm(
// ---- BUCKET ALIASES ----
#[async_trait]
impl EndpointHandler for AddBucketAliasRequest {
impl RequestHandler for AddBucketAliasRequest {
type Response = AddBucketAliasResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<AddBucketAliasResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<AddBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;
@ -480,11 +526,14 @@ impl EndpointHandler for AddBucketAliasRequest {
}
}
#[async_trait]
impl EndpointHandler for RemoveBucketAliasRequest {
impl RequestHandler for RemoveBucketAliasRequest {
type Response = RemoveBucketAliasResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<RemoveBucketAliasResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<RemoveBucketAliasResponse, Error> {
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let helper = garage.locked_helper().await;

View file

@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use garage_util::crdt::*;
use garage_util::data::*;
@ -12,13 +10,16 @@ use garage_model::garage::Garage;
use crate::api::*;
use crate::error::*;
use crate::EndpointHandler;
use crate::{Admin, RequestHandler};
#[async_trait]
impl EndpointHandler for GetClusterStatusRequest {
impl RequestHandler for GetClusterStatusRequest {
type Response = GetClusterStatusResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterStatusResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterStatusResponse, Error> {
let layout = garage.system.cluster_layout();
let mut nodes = garage
.system
@ -116,11 +117,14 @@ impl EndpointHandler for GetClusterStatusRequest {
}
}
#[async_trait]
impl EndpointHandler for GetClusterHealthRequest {
impl RequestHandler for GetClusterHealthRequest {
type Response = GetClusterHealthResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterHealthResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterHealthResponse, Error> {
use garage_rpc::system::ClusterHealthStatus;
let health = garage.system.health();
let health = GetClusterHealthResponse {
@ -142,11 +146,14 @@ impl EndpointHandler for GetClusterHealthRequest {
}
}
#[async_trait]
impl EndpointHandler for ConnectClusterNodesRequest {
impl RequestHandler for ConnectClusterNodesRequest {
type Response = ConnectClusterNodesResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ConnectClusterNodesResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ConnectClusterNodesResponse, Error> {
let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node)))
.await
.into_iter()
@ -165,11 +172,14 @@ impl EndpointHandler for ConnectClusterNodesRequest {
}
}
#[async_trait]
impl EndpointHandler for GetClusterLayoutRequest {
impl RequestHandler for GetClusterLayoutRequest {
type Response = GetClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetClusterLayoutResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterLayoutResponse, Error> {
Ok(format_cluster_layout(
garage.system.cluster_layout().inner(),
))
@ -225,11 +235,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp
// ---- update functions ----
#[async_trait]
impl EndpointHandler for UpdateClusterLayoutRequest {
impl RequestHandler for UpdateClusterLayoutRequest {
type Response = UpdateClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateClusterLayoutResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<UpdateClusterLayoutResponse, Error> {
let mut layout = garage.system.cluster_layout().inner().clone();
let mut roles = layout.current().roles.clone();
@ -271,11 +284,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest {
}
}
#[async_trait]
impl EndpointHandler for ApplyClusterLayoutRequest {
impl RequestHandler for ApplyClusterLayoutRequest {
type Response = ApplyClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ApplyClusterLayoutResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ApplyClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let (layout, msg) = layout.apply_staged_changes(Some(self.version))?;
@ -292,11 +308,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest {
}
}
#[async_trait]
impl EndpointHandler for RevertClusterLayoutRequest {
impl RequestHandler for RevertClusterLayoutRequest {
type Response = RevertClusterLayoutResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<RevertClusterLayoutResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<RevertClusterLayoutResponse, Error> {
let layout = garage.system.cluster_layout().inner().clone();
let layout = layout.revert_staged_changes()?;
garage

View file

@ -25,6 +25,14 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
/// The requested block does not exist
#[error(display = "Block not found: {}", _0)]
NoSuchBlock(String),
/// The requested worker does not exist
#[error(display = "Worker not found: {}", _0)]
NoSuchWorker(u64),
/// In Import key, the key already exists
#[error(
display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.",
@ -53,6 +61,8 @@ impl Error {
match self {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
Error::NoSuchWorker(_) => "NoSuchWorker",
Error::NoSuchBlock(_) => "NoSuchBlock",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@ -63,7 +73,9 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND,
Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => {
StatusCode::NOT_FOUND
}
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}

View file

@ -1,8 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use async_trait::async_trait;
use garage_table::*;
use garage_model::garage::Garage;
@ -10,13 +8,12 @@ use garage_model::key_table::*;
use crate::api::*;
use crate::error::*;
use crate::EndpointHandler;
use crate::{Admin, RequestHandler};
#[async_trait]
impl EndpointHandler for ListKeysRequest {
impl RequestHandler for ListKeysRequest {
type Response = ListKeysResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ListKeysResponse, Error> {
async fn handle(self, garage: &Arc<Garage>, _admin: &Admin) -> Result<ListKeysResponse, Error> {
let res = garage
.key_table
.get_range(
@ -38,11 +35,14 @@ impl EndpointHandler for ListKeysRequest {
}
}
#[async_trait]
impl EndpointHandler for GetKeyInfoRequest {
impl RequestHandler for GetKeyInfoRequest {
type Response = GetKeyInfoResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<GetKeyInfoResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetKeyInfoResponse, Error> {
let key = match (self.id, self.search) {
(Some(id), None) => garage.key_helper().get_existing_key(&id).await?,
(None, Some(search)) => {
@ -62,11 +62,14 @@ impl EndpointHandler for GetKeyInfoRequest {
}
}
#[async_trait]
impl EndpointHandler for CreateKeyRequest {
impl RequestHandler for CreateKeyRequest {
type Response = CreateKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<CreateKeyResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<CreateKeyResponse, Error> {
let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key"));
garage.key_table.insert(&key).await?;
@ -76,11 +79,14 @@ impl EndpointHandler for CreateKeyRequest {
}
}
#[async_trait]
impl EndpointHandler for ImportKeyRequest {
impl RequestHandler for ImportKeyRequest {
type Response = ImportKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<ImportKeyResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<ImportKeyResponse, Error> {
let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?;
if prev_key.is_some() {
return Err(Error::KeyAlreadyExists(self.access_key_id.to_string()));
@ -100,11 +106,14 @@ impl EndpointHandler for ImportKeyRequest {
}
}
#[async_trait]
impl EndpointHandler for UpdateKeyRequest {
impl RequestHandler for UpdateKeyRequest {
type Response = UpdateKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<UpdateKeyResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<UpdateKeyResponse, Error> {
let mut key = garage.key_helper().get_existing_key(&self.id).await?;
let key_state = key.state.as_option_mut().unwrap();
@ -131,11 +140,14 @@ impl EndpointHandler for UpdateKeyRequest {
}
}
#[async_trait]
impl EndpointHandler for DeleteKeyRequest {
impl RequestHandler for DeleteKeyRequest {
type Response = DeleteKeyResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<DeleteKeyResponse, Error> {
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<DeleteKeyResponse, Error> {
let helper = garage.locked_helper().await;
let mut key = helper.key().get_existing_key(&self.id).await?;

View file

@ -15,21 +15,29 @@ mod cluster;
mod key;
mod special;
mod block;
mod node;
mod repair;
mod worker;
use std::sync::Arc;
use async_trait::async_trait;
use garage_model::garage::Garage;
pub use api_server::AdminApiServer as Admin;
pub enum Authorization {
None,
MetricsToken,
AdminToken,
}
#[async_trait]
pub trait EndpointHandler {
pub trait RequestHandler {
type Response;
async fn handle(self, garage: &Arc<Garage>) -> Result<Self::Response, error::Error>;
fn handle(
self,
garage: &Arc<Garage>,
admin: &Admin,
) -> impl std::future::Future<Output = Result<Self::Response, error::Error>> + Send;
}

View file

@ -70,11 +70,10 @@ macro_rules! admin_endpoints {
}
)*
#[async_trait]
impl EndpointHandler for AdminApiRequest {
impl RequestHandler for AdminApiRequest {
type Response = AdminApiResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<AdminApiResponse, Error> {
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<AdminApiResponse, Error> {
Ok(match self {
$(
AdminApiRequest::$special_endpoint(_) => panic!(
@ -82,7 +81,132 @@ macro_rules! admin_endpoints {
),
)*
$(
AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?),
AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)*
})
}
}
}
};
}
macro_rules! local_admin_endpoints {
[
$($endpoint:ident,)*
] => {
paste! {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LocalAdminApiRequest {
$(
$endpoint( [<Local $endpoint Request>] ),
)*
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum LocalAdminApiResponse {
$(
$endpoint( [<Local $endpoint Response>] ),
)*
}
$(
pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >;
pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >];
pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >;
impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest {
fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest {
LocalAdminApiRequest::$endpoint(req)
}
}
impl TryFrom<LocalAdminApiResponse> for [< Local $endpoint Response >] {
type Error = LocalAdminApiResponse;
fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> {
match resp {
LocalAdminApiResponse::$endpoint(v) => Ok(v),
x => Err(x),
}
}
}
impl RequestHandler for [< $endpoint Request >] {
type Response = [< $endpoint Response >];
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<Self::Response, Error> {
let to = match self.node.as_str() {
"*" => garage.system.cluster_layout().all_nodes().to_vec(),
id => {
let nodes = garage.system.cluster_layout().all_nodes()
.iter()
.filter(|x| hex::encode(x).starts_with(id))
.cloned()
.collect::<Vec<_>>();
if nodes.len() != 1 {
return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes)));
}
nodes
}
};
let resps = garage.system.rpc_helper().call_many(&admin.endpoint,
&to,
AdminRpc::Internal(self.body.into()),
RequestStrategy::with_priority(PRIO_NORMAL),
).await?;
let mut ret = [< $endpoint Response >] {
success: HashMap::new(),
error: HashMap::new(),
};
for (node, resp) in resps {
match resp {
Ok(AdminRpcResponse::InternalApiOkResponse(r)) => {
match [< Local $endpoint Response >]::try_from(r) {
Ok(r) => {
ret.success.insert(hex::encode(node), r);
}
Err(_) => {
ret.error.insert(hex::encode(node), "returned invalid value".to_string());
}
}
}
Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => {
ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message));
}
Ok(_) => {
ret.error.insert(hex::encode(node), "returned invalid value".to_string());
}
Err(e) => {
ret.error.insert(hex::encode(node), e.to_string());
}
}
}
Ok(ret)
}
}
)*
impl LocalAdminApiRequest {
pub fn name(&self) -> &'static str {
match self {
$(
Self::$endpoint(_) => stringify!($endpoint),
)*
}
}
}
impl RequestHandler for LocalAdminApiRequest {
type Response = LocalAdminApiResponse;
async fn handle(self, garage: &Arc<Garage>, admin: &Admin) -> Result<LocalAdminApiResponse, Error> {
Ok(match self {
$(
LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?),
)*
})
}
@ -92,3 +216,4 @@ macro_rules! admin_endpoints {
}
pub(crate) use admin_endpoints;
pub(crate) use local_admin_endpoints;

216
src/api/admin/node.rs Normal file
View file

@ -0,0 +1,216 @@
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
use format_table::format_table_to_string;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_table::replication::*;
use garage_table::*;
use garage_rpc::layout::PARTITION_BITS;
use garage_model::garage::Garage;
use crate::api::*;
use crate::error::Error;
use crate::{Admin, RequestHandler};
impl RequestHandler for LocalCreateMetadataSnapshotRequest {
type Response = LocalCreateMetadataSnapshotResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalCreateMetadataSnapshotResponse, Error> {
garage_model::snapshot::async_snapshot_metadata(garage).await?;
Ok(LocalCreateMetadataSnapshotResponse)
}
}
impl RequestHandler for LocalGetNodeStatisticsRequest {
type Response = LocalGetNodeStatisticsResponse;
// FIXME: return this as a JSON struct instead of text
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalGetNodeStatisticsResponse, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
"Garage version: {} [features: {}]\nRust compiler version: {}",
garage_util::version::garage_version(),
garage_util::version::garage_features()
.map(|list| list.join(", "))
.unwrap_or_else(|| "(unknown)".into()),
garage_util::version::rust_version(),
)
.unwrap();
writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap();
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
table.push(gather_table_stats(&garage.bucket_table)?);
table.push(gather_table_stats(&garage.key_table)?);
table.push(gather_table_stats(&garage.object_table)?);
table.push(gather_table_stats(&garage.version_table)?);
table.push(gather_table_stats(&garage.block_ref_table)?);
write!(
&mut ret,
"\nTable stats:\n{}",
format_table_to_string(table)
)
.unwrap();
// Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
let rc_len = garage.block_manager.rc_len()?.to_string();
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
rc_len
)
.unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
garage.block_manager.resync.queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
garage.block_manager.resync.errors_len()?
)
.unwrap();
Ok(LocalGetNodeStatisticsResponse { freeform: ret })
}
}
impl RequestHandler for GetClusterStatisticsRequest {
type Response = GetClusterStatisticsResponse;
// FIXME: return this as a JSON struct instead of text
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<GetClusterStatisticsResponse, Error> {
let mut ret = String::new();
// Gather storage node and free space statistics for current nodes
let layout = &garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new();
for short_id in layout.current().ring_assignment_data.iter() {
let id = layout.current().node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1;
}
let node_info = garage
.system
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<_, _>>();
let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
for (id, parts) in node_partition_count.iter() {
let info = node_info.get(id);
let status = info.map(|x| &x.status);
let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role
.map(|x| x.capacity_string())
.unwrap_or_else(|| "?".into());
let avail_str = |x| match x {
Some((avail, total)) => {
let pct = (avail as f64) / (total as f64) * 100.;
let avail = bytesize::ByteSize::b(avail);
let total = bytesize::ByteSize::b(total);
format!("{}/{} ({:.1}%)", avail, total, pct)
}
None => "?".into(),
};
let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
table.push(format!(
" {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
id, hostname, zone, capacity, parts, data_avail, meta_avail
));
}
write!(
&mut ret,
"Storage nodes:\n{}",
format_table_to_string(table)
)
.unwrap();
let meta_part_avail = node_partition_count
.iter()
.filter_map(|(id, parts)| {
node_info
.get(id)
.and_then(|x| x.status.meta_disk_avail)
.map(|c| c.0 / *parts)
})
.collect::<Vec<_>>();
let data_part_avail = node_partition_count
.iter()
.filter_map(|(id, parts)| {
node_info
.get(id)
.and_then(|x| x.status.data_disk_avail)
.map(|c| c.0 / *parts)
})
.collect::<Vec<_>>();
if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
let meta_avail =
bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
let data_avail =
bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
writeln!(
&mut ret,
"\nEstimated available storage space cluster-wide (might be lower in practice):"
)
.unwrap();
if meta_part_avail.len() < node_partition_count.len()
|| data_part_avail.len() < node_partition_count.len()
{
writeln!(&mut ret, " data: < {}", data_avail).unwrap();
writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
} else {
writeln!(&mut ret, " data: {}", data_avail).unwrap();
writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
}
}
Ok(GetClusterStatisticsResponse { freeform: ret })
}
}
fn gather_table_stats<F, R>(t: &Arc<Table<F, R>>) -> Result<String, Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
Ok(format!(
" {}\t{}\t{}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
))
}

View file

@ -4,6 +4,14 @@ use std::time::Duration;
use async_trait::async_trait;
use tokio::sync::watch;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::{Error as GarageError, OkOrMessage};
use garage_util::migrate::Migrate;
use garage_table::replication::*;
use garage_table::*;
use garage_block::manager::BlockManager;
use garage_block::repair::ScrubWorkerCommand;
@ -13,25 +21,23 @@ use garage_model::s3::mpu_table::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use garage_table::replication::*;
use garage_table::*;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::migrate::Migrate;
use crate::*;
use crate::api::*;
use crate::error::Error;
use crate::{Admin, RequestHandler};
const RC_REPAIR_ITER_COUNT: usize = 64;
pub async fn launch_online_repair(
impl RequestHandler for LocalLaunchRepairOperationRequest {
type Response = LocalLaunchRepairOperationResponse;
async fn handle(
self,
garage: &Arc<Garage>,
bg: &BackgroundRunner,
opt: RepairOpt,
) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
admin: &Admin,
) -> Result<LocalLaunchRepairOperationResponse, Error> {
let bg = &admin.background;
match self.repair_type {
RepairType::Tables => {
info!("Launching a full sync of tables");
garage.bucket_table.syncer.add_full_sync()?;
garage.object_table.syncer.add_full_sync()?;
@ -39,71 +45,66 @@ pub async fn launch_online_repair(
garage.block_ref_table.syncer.add_full_sync()?;
garage.key_table.syncer.add_full_sync()?;
}
RepairWhat::Versions => {
RepairType::Versions => {
info!("Repairing the versions table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions));
}
RepairWhat::MultipartUploads => {
RepairType::MultipartUploads => {
info!("Repairing the multipart uploads table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu));
}
RepairWhat::BlockRefs => {
RepairType::BlockRefs => {
info!("Repairing the block refs table");
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
}
RepairWhat::BlockRc => {
RepairType::BlockRc => {
info!("Repairing the block reference counters");
bg.spawn_worker(BlockRcRepair::new(
garage.block_manager.clone(),
garage.block_ref_table.clone(),
));
}
RepairWhat::Blocks => {
RepairType::Blocks => {
info!("Repairing the stored blocks");
bg.spawn_worker(garage_block::repair::RepairWorker::new(
garage.block_manager.clone(),
));
}
RepairWhat::Scrub { cmd } => {
RepairType::Scrub(cmd) => {
let cmd = match cmd {
ScrubCmd::Start => ScrubWorkerCommand::Start,
ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)),
ScrubCmd::Resume => ScrubWorkerCommand::Resume,
ScrubCmd::Cancel => ScrubWorkerCommand::Cancel,
ScrubCmd::SetTranquility { tranquility } => {
garage
.block_manager
.scrub_persister
.set_with(|x| x.tranquility = tranquility)?;
return Ok(());
ScrubCommand::Start => ScrubWorkerCommand::Start,
ScrubCommand::Pause => {
ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24))
}
ScrubCommand::Resume => ScrubWorkerCommand::Resume,
ScrubCommand::Cancel => ScrubWorkerCommand::Cancel,
};
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await?;
}
RepairWhat::Rebalance => {
RepairType::Rebalance => {
info!("Rebalancing the stored blocks among storage locations");
bg.spawn_worker(garage_block::repair::RebalanceWorker::new(
garage.block_manager.clone(),
));
}
}
Ok(())
Ok(LocalLaunchRepairOperationResponse)
}
}
// ----
#[async_trait]
trait TableRepair: Send + Sync + 'static {
type T: TableSchema;
fn table(garage: &Garage) -> &Table<Self::T, TableShardedReplication>;
async fn process(
fn process(
&mut self,
garage: &Garage,
entry: <<Self as TableRepair>::T as TableSchema>::E,
) -> Result<bool, Error>;
) -> impl std::future::Future<Output = Result<bool, GarageError>> + Send;
}
struct TableRepairWorker<T: TableRepair> {
@ -139,7 +140,10 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, GarageError> {
let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? {
Some((k, v)) => (v, k),
None => {
@ -174,7 +178,6 @@ impl<R: TableRepair> Worker for TableRepairWorker<R> {
struct RepairVersions;
#[async_trait]
impl TableRepair for RepairVersions {
type T = VersionTable;
@ -182,7 +185,7 @@ impl TableRepair for RepairVersions {
&garage.version_table
}
async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, Error> {
async fn process(&mut self, garage: &Garage, version: Version) -> Result<bool, GarageError> {
if !version.deleted.get() {
let ref_exists = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => garage
@ -221,7 +224,6 @@ impl TableRepair for RepairVersions {
struct RepairBlockRefs;
#[async_trait]
impl TableRepair for RepairBlockRefs {
type T = BlockRefTable;
@ -229,7 +231,11 @@ impl TableRepair for RepairBlockRefs {
&garage.block_ref_table
}
async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result<bool, Error> {
async fn process(
&mut self,
garage: &Garage,
mut block_ref: BlockRef,
) -> Result<bool, GarageError> {
if !block_ref.deleted.get() {
let ref_exists = garage
.version_table
@ -257,7 +263,6 @@ impl TableRepair for RepairBlockRefs {
struct RepairMpu;
#[async_trait]
impl TableRepair for RepairMpu {
type T = MultipartUploadTable;
@ -265,7 +270,11 @@ impl TableRepair for RepairMpu {
&garage.mpu_table
}
async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result<bool, Error> {
async fn process(
&mut self,
garage: &Garage,
mut mpu: MultipartUpload,
) -> Result<bool, GarageError> {
if !mpu.deleted.get() {
let ref_exists = garage
.object_table
@ -332,7 +341,10 @@ impl Worker for BlockRcRepair {
}
}
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerState, GarageError> {
for _i in 0..RC_REPAIR_ITER_COUNT {
let next1 = self
.block_manager

View file

@ -52,12 +52,28 @@ impl AdminApiRequest {
POST CreateBucket (body),
POST DeleteBucket (query::id),
POST UpdateBucket (body_field, query::id),
POST CleanupIncompleteUploads (body),
// Bucket-key permissions
POST AllowBucketKey (body),
POST DenyBucketKey (body),
// Bucket aliases
POST AddBucketAlias (body),
POST RemoveBucketAlias (body),
// Node APIs
POST CreateMetadataSnapshot (default::body, query::node),
GET GetNodeStatistics (default::body, query::node),
GET GetClusterStatistics (),
POST LaunchRepairOperation (body_field, query::node),
// Worker APIs
POST ListWorkers (body_field, query::node),
POST GetWorkerInfo (body_field, query::node),
POST GetWorkerVariable (body_field, query::node),
POST SetWorkerVariable (body_field, query::node),
// Block APIs
GET ListBlockErrors (default::body, query::node),
POST GetBlockInfo (body_field, query::node),
POST RetryBlockResync (body_field, query::node),
POST PurgeBlocks (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {
@ -239,6 +255,7 @@ impl AdminApiRequest {
generateQueryParameters! {
keywords: [],
fields: [
"node" => node,
"domain" => domain,
"format" => format,
"id" => id,

View file

@ -1,27 +1,31 @@
use std::sync::Arc;
use async_trait::async_trait;
use http::header::{
ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW,
};
use hyper::{Response, StatusCode};
#[cfg(feature = "metrics")]
use prometheus::{Encoder, TextEncoder};
use garage_model::garage::Garage;
use garage_rpc::system::ClusterHealthStatus;
use garage_api_common::helpers::*;
use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest};
use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest};
use crate::api_server::ResBody;
use crate::error::*;
use crate::EndpointHandler;
use crate::{Admin, RequestHandler};
#[async_trait]
impl EndpointHandler for OptionsRequest {
impl RequestHandler for OptionsRequest {
type Response = Response<ResBody>;
async fn handle(self, _garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
async fn handle(
self,
_garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<Response<ResBody>, Error> {
Ok(Response::builder()
.status(StatusCode::OK)
.header(ALLOW, "OPTIONS,GET,POST")
@ -32,11 +36,83 @@ impl EndpointHandler for OptionsRequest {
}
}
#[async_trait]
impl EndpointHandler for CheckDomainRequest {
impl RequestHandler for MetricsRequest {
type Response = Response<ResBody>;
async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
async fn handle(
self,
_garage: &Arc<Garage>,
admin: &Admin,
) -> Result<Response<ResBody>, Error> {
#[cfg(feature = "metrics")]
{
use opentelemetry::trace::Tracer;
let mut buffer = vec![];
let encoder = TextEncoder::new();
let tracer = opentelemetry::global::tracer("garage");
let metric_families = tracer.in_span("admin/gather_metrics", |_| {
admin.exporter.registry().gather()
});
encoder
.encode(&metric_families, &mut buffer)
.ok_or_internal_error("Could not serialize metrics")?;
Ok(Response::builder()
.status(StatusCode::OK)
.header(http::header::CONTENT_TYPE, encoder.format_type())
.body(bytes_body(buffer.into()))?)
}
#[cfg(not(feature = "metrics"))]
Err(Error::bad_request(
"Garage was built without the metrics feature".to_string(),
))
}
}
impl RequestHandler for HealthRequest {
type Response = Response<ResBody>;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<Response<ResBody>, Error> {
let health = garage.system.health();
let (status, status_str) = match health.status {
ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
ClusterHealthStatus::Degraded => (
StatusCode::OK,
"Garage is operational but some storage nodes are unavailable",
),
ClusterHealthStatus::Unavailable => (
StatusCode::SERVICE_UNAVAILABLE,
"Quorum is not available for some/all partitions, reads and writes will fail",
),
};
let status_str = format!(
"{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
status_str
);
Ok(Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain")
.body(string_body(status_str))?)
}
}
impl RequestHandler for CheckDomainRequest {
type Response = Response<ResBody>;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<Response<ResBody>, Error> {
if check_domain(garage, &self.domain).await? {
Ok(Response::builder()
.status(StatusCode::OK)
@ -101,33 +177,3 @@ async fn check_domain(garage: &Arc<Garage>, domain: &str) -> Result<bool, Error>
None => Ok(false),
}
}
#[async_trait]
impl EndpointHandler for HealthRequest {
type Response = Response<ResBody>;
async fn handle(self, garage: &Arc<Garage>) -> Result<Response<ResBody>, Error> {
let health = garage.system.health();
let (status, status_str) = match health.status {
ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"),
ClusterHealthStatus::Degraded => (
StatusCode::OK,
"Garage is operational but some storage nodes are unavailable",
),
ClusterHealthStatus::Unavailable => (
StatusCode::SERVICE_UNAVAILABLE,
"Quorum is not available for some/all partitions, reads and writes will fail",
),
};
let status_str = format!(
"{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n",
status_str
);
Ok(Response::builder()
.status(status)
.header(http::header::CONTENT_TYPE, "text/plain")
.body(string_body(status_str))?)
}
}

118
src/api/admin/worker.rs Normal file
View file

@ -0,0 +1,118 @@
use std::collections::HashMap;
use std::sync::Arc;
use garage_util::background::*;
use garage_util::time::now_msec;
use garage_model::garage::Garage;
use crate::api::*;
use crate::error::Error;
use crate::{Admin, RequestHandler};
impl RequestHandler for LocalListWorkersRequest {
type Response = LocalListWorkersResponse;
async fn handle(
self,
_garage: &Arc<Garage>,
admin: &Admin,
) -> Result<LocalListWorkersResponse, Error> {
let workers = admin.background.get_worker_info();
let info = workers
.into_iter()
.filter(|(_, w)| {
(!self.busy_only
|| matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_)))
&& (!self.error_only || w.errors > 0)
})
.map(|(id, w)| worker_info_to_api(id as u64, w))
.collect::<Vec<_>>();
Ok(LocalListWorkersResponse(info))
}
}
impl RequestHandler for LocalGetWorkerInfoRequest {
type Response = LocalGetWorkerInfoResponse;
async fn handle(
self,
_garage: &Arc<Garage>,
admin: &Admin,
) -> Result<LocalGetWorkerInfoResponse, Error> {
let info = admin
.background
.get_worker_info()
.get(&(self.id as usize))
.ok_or(Error::NoSuchWorker(self.id))?
.clone();
Ok(LocalGetWorkerInfoResponse(worker_info_to_api(
self.id, info,
)))
}
}
impl RequestHandler for LocalGetWorkerVariableRequest {
type Response = LocalGetWorkerVariableResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalGetWorkerVariableResponse, Error> {
let mut res = HashMap::new();
if let Some(k) = self.variable {
res.insert(k.clone(), garage.bg_vars.get(&k)?);
} else {
let vars = garage.bg_vars.get_all();
for (k, v) in vars.iter() {
res.insert(k.to_string(), v.to_string());
}
}
Ok(LocalGetWorkerVariableResponse(res))
}
}
impl RequestHandler for LocalSetWorkerVariableRequest {
type Response = LocalSetWorkerVariableResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalSetWorkerVariableResponse, Error> {
garage.bg_vars.set(&self.variable, &self.value)?;
Ok(LocalSetWorkerVariableResponse {
variable: self.variable,
value: self.value,
})
}
}
// ---- helper functions ----
fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
WorkerInfoResp {
id,
name: info.name,
state: match info.state {
WorkerState::Busy => WorkerStateResp::Busy,
WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t },
WorkerState::Idle => WorkerStateResp::Idle,
WorkerState::Done => WorkerStateResp::Done,
},
errors: info.errors as u64,
consecutive_errors: info.consecutive_errors as u64,
last_error: info.last_error.map(|(message, t)| WorkerLastError {
message,
secs_ago: now_msec().saturating_sub(t) / 1000,
}),
tranquility: info.status.tranquility,
progress: info.status.progress,
queue_length: info.status.queue_length,
persistent_errors: info.status.persistent_errors,
freeform: info.status.freeform,
}
}

View file

@ -141,6 +141,9 @@ macro_rules! router_match {
}
}};
(@@parse_param $query:expr, default, $param:ident) => {{
Default::default()
}};
(@@parse_param $query:expr, query_opt, $param:ident) => {{
// extract optional query parameter
$query.$param.take().map(|param| param.into_owned())

View file

@ -49,8 +49,6 @@ sodiumoxide.workspace = true
structopt.workspace = true
git-version.workspace = true
serde.workspace = true
futures.workspace = true
tokio.workspace = true

View file

@ -1,235 +0,0 @@
use garage_util::data::*;
use garage_table::*;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use crate::cli::*;
use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
match cmd {
BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
self.garage.block_manager.list_resync_errors()?,
)),
BlockOperation::Info { hash } => self.handle_block_info(hash).await,
BlockOperation::RetryNow { all, blocks } => {
self.handle_block_retry_now(*all, blocks).await
}
BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
}
}
async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
let hash = self.find_block_hash_by_prefix(hash)?;
let refcount = self.garage.block_manager.get_block_rc(&hash)?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
let mut uploads = vec![];
for br in block_refs {
if let Some(v) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
uploads.push(u);
}
}
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
}
}
Ok(AdminRpc::BlockInfo {
hash,
refcount,
versions,
uploads,
})
}
async fn handle_block_retry_now(
&self,
all: bool,
blocks: &[String],
) -> Result<AdminRpc, Error> {
if all {
if !blocks.is_empty() {
return Err(Error::BadRequest(
"--all was specified, cannot also specify blocks".into(),
));
}
let blocks = self.garage.block_manager.list_resync_errors()?;
for b in blocks.iter() {
self.garage.block_manager.resync.clear_backoff(&b.hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
} else {
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
self.garage.block_manager.resync.clear_backoff(&hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
}
}
async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
if !yes {
return Err(Error::BadRequest(
"Pass the --yes flag to confirm block purge operation.".into(),
));
}
let mut obj_dels = 0;
let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
for br in block_refs {
if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
self.handle_block_purge_version_backlink(
&version,
&mut obj_dels,
&mut mpu_dels,
)
.await?;
if !version.deleted.get() {
let deleted_version = Version::new(version.uuid, version.backlink, true);
self.garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
}
Ok(AdminRpc::Ok(format!(
"Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(),
ver_dels,
obj_dels,
mpu_dels,
)))
}
async fn handle_block_purge_version_backlink(
&self,
version: &Version,
obj_dels: &mut usize,
mpu_dels: &mut usize,
) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
}
// ---- helper function ----
fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> {
if prefix.len() < 4 {
return Err(Error::BadRequest(
"Please specify at least 4 characters of the block hash".into(),
));
}
let prefix_bin =
hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
let iter = self
.garage
.block_ref_table
.data
.store
.range(&prefix_bin[..]..)
.map_err(GarageError::from)?;
let mut found = None;
for item in iter {
let (k, _v) = item.map_err(GarageError::from)?;
let hash = Hash::try_from(&k[..32]).unwrap();
if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
break;
}
if hex::encode(hash.as_slice()).starts_with(prefix) {
match &found {
Some(x) if *x == hash => (),
Some(_) => {
return Err(Error::BadRequest(format!(
"Several blocks match prefix `{}`",
prefix
)));
}
None => {
found = Some(hash);
}
}
}
}
found.ok_or_else(|| Error::BadRequest("No matching block found".into()))
}
}

View file

@ -1,53 +0,0 @@
use std::fmt::Write;
use garage_model::helper::error::{Error, OkOrBadRequest};
use crate::cli::*;
use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::CleanupIncompleteUploads(query) => {
self.handle_bucket_cleanup_incomplete_uploads(query).await
}
_ => unreachable!(),
}
}
async fn handle_bucket_cleanup_incomplete_uploads(
&self,
query: &CleanupIncompleteUploadsOpt,
) -> Result<AdminRpc, Error> {
let mut bucket_ids = vec![];
for b in query.buckets.iter() {
bucket_ids.push(
self.garage
.bucket_helper()
.admin_get_existing_matching_bucket(b)
.await?,
);
}
let duration = parse_duration::parse::parse(&query.older_than)
.ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
let mut ret = String::new();
for bucket in bucket_ids {
let count = self
.garage
.bucket_helper()
.cleanup_incomplete_uploads(&bucket, duration)
.await?;
writeln!(
&mut ret,
"Bucket {:?}: {} incomplete uploads aborted",
bucket, count
)
.unwrap();
}
Ok(AdminRpc::Ok(ret))
}
}

View file

@ -1,545 +0,0 @@
mod block;
mod bucket;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use format_table::format_table_to_string;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_table::replication::*;
use garage_table::*;
use garage_rpc::layout::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::garage::Garage;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse};
use garage_api_admin::EndpointHandler as AdminApiEndpoint;
use garage_api_common::generic_server::ApiError;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
LaunchRepair(RepairOpt),
Stats(StatsOpt),
Worker(WorkerOperation),
BlockOperation(BlockOperation),
MetaOperation(MetaOperation),
// Replies
Ok(String),
WorkerList(
HashMap<usize, garage_util::background::WorkerInfo>,
WorkerListOpt,
),
WorkerVars(Vec<(Uuid, String, String)>),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
},
// Proxying HTTP Admin API endpoints
ApiRequest(AdminApiRequest),
ApiOkResponse(TaggedAdminApiResponse),
ApiErrorResponse {
http_code: u16,
error_code: String,
message: String,
},
}
impl Rpc for AdminRpc {
type Response = Result<AdminRpc, Error>;
}
pub struct AdminRpcHandler {
garage: Arc<Garage>,
background: Arc<BackgroundRunner>,
endpoint: Arc<Endpoint<AdminRpc, Self>>,
}
impl AdminRpcHandler {
pub fn new(garage: Arc<Garage>, background: Arc<BackgroundRunner>) -> Arc<Self> {
let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into());
let admin = Arc::new(Self {
garage,
background,
endpoint,
});
admin.endpoint.set_handler(admin.clone());
admin
}
// ================ REPAIR COMMANDS ====================
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
"Please provide the --yes flag to initiate repair operations.".to_string(),
));
}
if opt.all_nodes {
let mut opt_to_send = opt.clone();
opt_to_send.all_nodes = false;
let mut failures = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
let resp = self
.endpoint
.call(
&node,
AdminRpc::LaunchRepair(opt_to_send.clone()),
PRIO_NORMAL,
)
.await;
if !matches!(resp, Ok(Ok(_))) {
failures.push(node);
}
}
if failures.is_empty() {
Ok(AdminRpc::Ok("Repair launched on all nodes".to_string()))
} else {
Err(Error::BadRequest(format!(
"Could not launch repair on nodes: {:?} (launched successfully on other nodes)",
failures
)))
}
} else {
launch_online_repair(&self.garage, &self.background, opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
)))
}
}
// ================ STATS COMMANDS ====================
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let mut opt = opt.clone();
opt.all_nodes = false;
opt.skip_global = true;
writeln!(&mut ret, "\n======================").unwrap();
writeln!(&mut ret, "Stats for node {:?}:", node).unwrap();
let node_id = (*node).into();
match self
.endpoint
.call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL)
.await
{
Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(),
Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(),
Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(),
Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(),
}
}
writeln!(&mut ret, "\n======================").unwrap();
write!(
&mut ret,
"Cluster statistics:\n\n{}",
self.gather_cluster_stats()
)
.unwrap();
Ok(AdminRpc::Ok(ret))
} else {
Ok(AdminRpc::Ok(self.gather_stats_local(opt)?))
}
}
fn gather_stats_local(&self, opt: StatsOpt) -> Result<String, Error> {
let mut ret = String::new();
writeln!(
&mut ret,
"\nGarage version: {} [features: {}]\nRust compiler version: {}",
garage_util::version::garage_version(),
garage_util::version::garage_features()
.map(|list| list.join(", "))
.unwrap_or_else(|| "(unknown)".into()),
garage_util::version::rust_version(),
)
.unwrap();
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table)?);
table.push(self.gather_table_stats(&self.garage.key_table)?);
table.push(self.gather_table_stats(&self.garage.object_table)?);
table.push(self.gather_table_stats(&self.garage.version_table)?);
table.push(self.gather_table_stats(&self.garage.block_ref_table)?);
write!(
&mut ret,
"\nTable stats:\n{}",
format_table_to_string(table)
)
.unwrap();
// Gather block manager statistics
writeln!(&mut ret, "\nBlock manager stats:").unwrap();
let rc_len = self.garage.block_manager.rc_len()?.to_string();
writeln!(
&mut ret,
" number of RC entries (~= number of blocks): {}",
rc_len
)
.unwrap();
writeln!(
&mut ret,
" resync queue length: {}",
self.garage.block_manager.resync.queue_len()?
)
.unwrap();
writeln!(
&mut ret,
" blocks with resync errors: {}",
self.garage.block_manager.resync.errors_len()?
)
.unwrap();
if !opt.skip_global {
write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap();
}
Ok(ret)
}
fn gather_cluster_stats(&self) -> String {
let mut ret = String::new();
// Gather storage node and free space statistics for current nodes
let layout = &self.garage.system.cluster_layout();
let mut node_partition_count = HashMap::<Uuid, u64>::new();
for short_id in layout.current().ring_assignment_data.iter() {
let id = layout.current().node_id_vec[*short_id as usize];
*node_partition_count.entry(id).or_default() += 1;
}
let node_info = self
.garage
.system
.get_known_nodes()
.into_iter()
.map(|n| (n.id, n))
.collect::<HashMap<_, _>>();
let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()];
for (id, parts) in node_partition_count.iter() {
let info = node_info.get(id);
let status = info.map(|x| &x.status);
let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref());
let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?");
let zone = role.map(|x| x.zone.as_str()).unwrap_or("?");
let capacity = role
.map(|x| x.capacity_string())
.unwrap_or_else(|| "?".into());
let avail_str = |x| match x {
Some((avail, total)) => {
let pct = (avail as f64) / (total as f64) * 100.;
let avail = bytesize::ByteSize::b(avail);
let total = bytesize::ByteSize::b(total);
format!("{}/{} ({:.1}%)", avail, total, pct)
}
None => "?".into(),
};
let data_avail = avail_str(status.and_then(|x| x.data_disk_avail));
let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail));
table.push(format!(
" {:?}\t{}\t{}\t{}\t{}\t{}\t{}",
id, hostname, zone, capacity, parts, data_avail, meta_avail
));
}
write!(
&mut ret,
"Storage nodes:\n{}",
format_table_to_string(table)
)
.unwrap();
let meta_part_avail = node_partition_count
.iter()
.filter_map(|(id, parts)| {
node_info
.get(id)
.and_then(|x| x.status.meta_disk_avail)
.map(|c| c.0 / *parts)
})
.collect::<Vec<_>>();
let data_part_avail = node_partition_count
.iter()
.filter_map(|(id, parts)| {
node_info
.get(id)
.and_then(|x| x.status.data_disk_avail)
.map(|c| c.0 / *parts)
})
.collect::<Vec<_>>();
if !meta_part_avail.is_empty() && !data_part_avail.is_empty() {
let meta_avail =
bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
let data_avail =
bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS));
writeln!(
&mut ret,
"\nEstimated available storage space cluster-wide (might be lower in practice):"
)
.unwrap();
if meta_part_avail.len() < node_partition_count.len()
|| data_part_avail.len() < node_partition_count.len()
{
writeln!(&mut ret, " data: < {}", data_avail).unwrap();
writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap();
writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap();
} else {
writeln!(&mut ret, " data: {}", data_avail).unwrap();
writeln!(&mut ret, " metadata: {}", meta_avail).unwrap();
}
}
ret
}
fn gather_table_stats<F, R>(&self, t: &Arc<Table<F, R>>) -> Result<String, Error>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
let data_len = t.data.store.len().map_err(GarageError::from)?.to_string();
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
Ok(format!(
" {}\t{}\t{}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
))
}
// ================ WORKER COMMANDS ====================
async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
match cmd {
WorkerOperation::List { opt } => {
let workers = self.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, *opt))
}
WorkerOperation::Info { tid } => {
let info = self
.background
.get_worker_info()
.get(tid)
.ok_or_bad_request(format!("No worker with TID {}", tid))?
.clone();
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerOperation::Get {
all_nodes,
variable,
} => self.handle_get_var(*all_nodes, variable).await,
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.handle_set_var(*all_nodes, variable, value).await,
}
}
async fn handle_get_var(
&self,
all_nodes: bool,
variable: &Option<String>,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Get {
all_nodes: false,
variable: variable.clone(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
#[allow(clippy::collapsible_else_if)]
if let Some(v) = variable {
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
v.clone(),
self.garage.bg_vars.get(v)?,
)]))
} else {
let mut vars = self.garage.bg_vars.get_all();
vars.sort();
Ok(AdminRpc::WorkerVars(
vars.into_iter()
.map(|(k, v)| (self.garage.system.id, k.to_string(), v))
.collect(),
))
}
}
}
async fn handle_set_var(
&self,
all_nodes: bool,
variable: &str,
value: &str,
) -> Result<AdminRpc, Error> {
if all_nodes {
let mut ret = vec![];
let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec();
for node in all_nodes.iter() {
let node = (*node).into();
match self
.endpoint
.call(
&node,
AdminRpc::Worker(WorkerOperation::Set {
all_nodes: false,
variable: variable.to_string(),
value: value.to_string(),
}),
PRIO_NORMAL,
)
.await??
{
AdminRpc::WorkerVars(v) => ret.extend(v),
m => return Err(GarageError::unexpected_rpc_message(m).into()),
}
}
Ok(AdminRpc::WorkerVars(ret))
} else {
self.garage.bg_vars.set(variable, value)?;
Ok(AdminRpc::WorkerVars(vec![(
self.garage.system.id,
variable.to_string(),
value.to_string(),
)]))
}
}
// ================ META DB COMMANDS ====================
async fn handle_meta_cmd(self: &Arc<Self>, mo: &MetaOperation) -> Result<AdminRpc, Error> {
match mo {
MetaOperation::Snapshot { all: true } => {
let to = self.garage.system.cluster_layout().all_nodes().to_vec();
let resps = futures::future::join_all(to.iter().map(|to| async move {
let to = (*to).into();
self.endpoint
.call(
&to,
AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }),
PRIO_NORMAL,
)
.await
}))
.await;
let mut ret = vec![];
for (to, resp) in to.iter().zip(resps.iter()) {
let res_str = match resp {
Ok(_) => "ok".to_string(),
Err(e) => format!("error: {}", e),
};
ret.push(format!("{:?}\t{}", to, res_str));
}
Ok(AdminRpc::Ok(format_table_to_string(ret)))
}
MetaOperation::Snapshot { all: false } => {
garage_model::snapshot::async_snapshot_metadata(&self.garage).await?;
Ok(AdminRpc::Ok("Snapshot has been saved.".into()))
}
}
}
// ================== PROXYING ADMIN API REQUESTS ===================
async fn handle_api_request(
self: &Arc<Self>,
req: &AdminApiRequest,
) -> Result<AdminRpc, Error> {
let req = req.clone();
info!("Proxied admin API request: {}", req.name());
let res = req.handle(&self.garage).await;
match res {
Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())),
Err(e) => Ok(AdminRpc::ApiErrorResponse {
http_code: e.http_status_code().as_u16(),
error_code: e.code().to_string(),
message: e.to_string(),
}),
}
}
}
#[async_trait]
impl EndpointHandler<AdminRpc> for AdminRpcHandler {
async fn handle(
self: &Arc<Self>,
message: &AdminRpc,
_from: NodeID,
) -> Result<AdminRpc, Error> {
match message {
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
AdminRpc::ApiRequest(r) => self.handle_api_request(r).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}
}

View file

@ -1,60 +0,0 @@
use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use garage_model::helper::error::Error as HelperError;
use crate::admin::*;
use crate::cli::*;
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
rpc_host: NodeID,
args: AdminRpc,
) -> Result<(), HelperError> {
match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? {
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::WorkerList(wi, wlo) => {
print_worker_list(wi, wlo);
}
AdminRpc::WorkerVars(wv) => {
print_worker_vars(wv);
}
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
AdminRpc::BlockErrorList(el) => {
print_block_error_list(el);
}
AdminRpc::BlockInfo {
hash,
refcount,
versions,
uploads,
} => {
print_block_info(hash, refcount, versions, uploads);
}
r => {
error!("Unexpected response: {:?}", r);
}
}
Ok(())
}
// ---- utility ----
pub async fn fetch_status(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}

View file

@ -7,7 +7,7 @@ use garage_rpc::layout::*;
use garage_rpc::system::*;
use garage_rpc::*;
use crate::cli::*;
use crate::cli::structs::*;
pub async fn cmd_show_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
@ -260,6 +260,19 @@ pub async fn cmd_layout_skip_dead_nodes(
// --- utility ---
pub async fn fetch_status(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,
) -> Result<Vec<KnownNodeInfo>, Error> {
match rpc_cli
.call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL)
.await??
{
SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes),
resp => Err(Error::unexpected_rpc_message(resp)),
}
}
pub async fn fetch_layout(
rpc_cli: &Endpoint<SystemRpc, ()>,
rpc_host: NodeID,

View file

@ -1,12 +1,7 @@
pub(crate) mod cmd;
pub(crate) mod init;
pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod util;
pub(crate) mod convert_db;
pub(crate) mod init;
pub(crate) mod repair;
pub(crate) use cmd::*;
pub(crate) use init::*;
pub(crate) use structs::*;
pub(crate) use util::*;
pub(crate) mod layout;

View file

@ -1,4 +1,3 @@
use serde::{Deserialize, Serialize};
use structopt::StructOpt;
use garage_util::version::garage_version;
@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt {
pub(crate) allow_missing_data: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub enum BucketOperation {
/// List buckets
#[structopt(name = "list", version = garage_version())]
@ -237,7 +236,7 @@ pub enum BucketOperation {
CleanupIncompleteUploads(CleanupIncompleteUploadsOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct WebsiteOpt {
/// Create
#[structopt(long = "allow")]
@ -259,13 +258,13 @@ pub struct WebsiteOpt {
pub error_document: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct BucketOpt {
/// Bucket name
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct DeleteBucketOpt {
/// Bucket name
pub name: String,
@ -275,7 +274,7 @@ pub struct DeleteBucketOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct AliasBucketOpt {
/// Existing bucket name (its alias in global namespace or its full hex uuid)
pub existing_bucket: String,
@ -288,7 +287,7 @@ pub struct AliasBucketOpt {
pub local: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct UnaliasBucketOpt {
/// Bucket name
pub name: String,
@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt {
pub local: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct PermBucketOpt {
/// Access key name or ID
#[structopt(long = "key")]
@ -321,7 +320,7 @@ pub struct PermBucketOpt {
pub bucket: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct SetQuotasOpt {
/// Bucket name
pub bucket: String,
@ -336,7 +335,7 @@ pub struct SetQuotasOpt {
pub max_objects: Option<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct CleanupIncompleteUploadsOpt {
/// Abort multipart uploads older than this value
#[structopt(long = "older-than", default_value = "1d")]
@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt {
pub buckets: Vec<String>,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub enum KeyOperation {
/// List keys
#[structopt(name = "list", version = garage_version())]
@ -382,7 +381,7 @@ pub enum KeyOperation {
Import(KeyImportOpt),
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyInfoOpt {
/// ID or name of the key
pub key_pattern: String,
@ -391,14 +390,14 @@ pub struct KeyInfoOpt {
pub show_secret: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyNewOpt {
/// Name of the key
#[structopt(default_value = "Unnamed key")]
pub name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyRenameOpt {
/// ID or name of the key
pub key_pattern: String,
@ -407,7 +406,7 @@ pub struct KeyRenameOpt {
pub new_name: String,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyDeleteOpt {
/// ID or name of the key
pub key_pattern: String,
@ -417,7 +416,7 @@ pub struct KeyDeleteOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyPermOpt {
/// ID or name of the key
pub key_pattern: String,
@ -427,7 +426,7 @@ pub struct KeyPermOpt {
pub create_bucket: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug)]
#[derive(StructOpt, Debug)]
pub struct KeyImportOpt {
/// Access key ID
pub key_id: String,
@ -444,7 +443,7 @@ pub struct KeyImportOpt {
pub yes: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
#[derive(StructOpt, Debug, Clone)]
pub struct RepairOpt {
/// Launch repair operation on all nodes
#[structopt(short = "a", long = "all-nodes")]
@ -458,7 +457,7 @@ pub struct RepairOpt {
pub what: RepairWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum RepairWhat {
/// Do a full sync of metadata tables
#[structopt(name = "tables", version = garage_version())]
@ -489,7 +488,7 @@ pub enum RepairWhat {
Rebalance,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum ScrubCmd {
/// Start scrub
#[structopt(name = "start", version = garage_version())]
@ -503,15 +502,9 @@ pub enum ScrubCmd {
/// Cancel scrub in progress
#[structopt(name = "cancel", version = garage_version())]
Cancel,
/// Set tranquility level for in-progress and future scrubs
#[structopt(name = "set-tranquility", version = garage_version())]
SetTranquility {
#[structopt()]
tranquility: u32,
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
#[derive(StructOpt, Debug, Clone)]
pub struct OfflineRepairOpt {
/// Confirm the launch of the repair operation
#[structopt(long = "yes")]
@ -521,7 +514,7 @@ pub struct OfflineRepairOpt {
pub what: OfflineRepairWhat,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum OfflineRepairWhat {
/// Repair K2V item counters
#[cfg(feature = "k2v")]
@ -532,19 +525,14 @@ pub enum OfflineRepairWhat {
ObjectCounters,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
#[derive(StructOpt, Debug, Clone)]
pub struct StatsOpt {
/// Gather statistics from all nodes
#[structopt(short = "a", long = "all-nodes")]
pub all_nodes: bool,
/// Don't show global cluster stats (internal use in RPC)
#[structopt(skip)]
#[serde(default)]
pub skip_global: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
@ -577,7 +565,7 @@ pub enum WorkerOperation {
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub struct WorkerListOpt {
/// Show only busy workers
#[structopt(short = "b", long = "busy")]
@ -587,7 +575,7 @@ pub struct WorkerListOpt {
pub errors: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
@ -619,7 +607,7 @@ pub enum BlockOperation {
},
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)]
pub enum MetaOperation {
/// Save a snapshot of the metadata db file
#[structopt(name = "snapshot", version = garage_version())]

View file

@ -1,216 +0,0 @@
use std::collections::HashMap;
use std::time::Duration;
use format_table::format_table;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::*;
use crate::cli::structs::WorkerListOpt;
pub fn print_worker_list(wi: HashMap<usize, WorkerInfo>, wlo: WorkerListOpt) {
let mut wi = wi.into_iter().collect::<Vec<_>>();
wi.sort_by_key(|(tid, info)| {
(
match info.state {
WorkerState::Busy | WorkerState::Throttled(_) => 0,
WorkerState::Idle => 1,
WorkerState::Done => 2,
},
*tid,
)
});
let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
for (tid, info) in wi.iter() {
if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) {
continue;
}
if wlo.errors && info.errors == 0 {
continue;
}
let tf = timeago::Formatter::new();
let err_ago = info
.last_error
.as_ref()
.map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t)))
.unwrap_or_default();
let (total_err, consec_err) = if info.errors > 0 {
(info.errors.to_string(), info.consecutive_errors.to_string())
} else {
("-".into(), "-".into())
};
table.push(format!(
"{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
tid,
info.state,
info.name,
info.status
.tranquility
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "-".into()),
info.status.progress.as_deref().unwrap_or("-"),
info.status
.queue_length
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "-".into()),
total_err,
consec_err,
err_ago,
));
}
format_table(table);
}
pub fn print_worker_info(tid: usize, info: WorkerInfo) {
let mut table = vec![];
table.push(format!("Task id:\t{}", tid));
table.push(format!("Worker name:\t{}", info.name));
match info.state {
WorkerState::Throttled(t) => {
table.push(format!(
"Worker state:\tBusy (throttled, paused for {:.3}s)",
t
));
}
s => {
table.push(format!("Worker state:\t{}", s));
}
};
if let Some(tql) = info.status.tranquility {
table.push(format!("Tranquility:\t{}", tql));
}
table.push("".into());
table.push(format!("Total errors:\t{}", info.errors));
table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
if let Some((s, t)) = info.last_error {
table.push(format!("Last error:\t{}", s));
let tf = timeago::Formatter::new();
table.push(format!(
"Last error time:\t{}",
tf.convert(Duration::from_millis(now_msec() - t))
));
}
table.push("".into());
if let Some(p) = info.status.progress {
table.push(format!("Progress:\t{}", p));
}
if let Some(ql) = info.status.queue_length {
table.push(format!("Queue length:\t{}", ql));
}
if let Some(pe) = info.status.persistent_errors {
table.push(format!("Persistent errors:\t{}", pe));
}
for (i, s) in info.status.freeform.iter().enumerate() {
if i == 0 {
if table.last() != Some(&"".into()) {
table.push("".into());
}
table.push(format!("Message:\t{}", s));
} else {
table.push(format!("\t{}", s));
}
}
format_table(table);
}
pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) {
let table = wv
.into_iter()
.map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v))
.collect::<Vec<_>>();
format_table(table);
}
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
let mut tf2 = timeago::Formatter::new();
tf2.ago("");
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
let next_try = if e.next_try > now {
tf2.convert(Duration::from_millis(e.next_try - now))
} else {
"asap".to_string()
};
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
next_try
));
}
format_table(table);
}
pub fn print_block_info(
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
match &ver.backlink {
VersionBacklink::Object { bucket_id, key } => {
table.push(format!(
"{:?}\t{:?}\t{}\t\t{:?}",
ver.uuid,
bucket_id,
key,
ver.deleted.get()
));
}
VersionBacklink::MultipartUpload { upload_id } => {
let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
table.push(format!(
"{:?}\t{:?}\t{}\t{:?}\t{:?}",
ver.uuid,
upload.map(|u| u.bucket_id).unwrap_or_default(),
upload.map(|u| u.key.as_str()).unwrap_or_default(),
upload_id,
ver.deleted.get()
));
}
}
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
table.push(format!("{:?}\t\t\t\tyes", vh));
}
}
}
format_table(table);
if refcount != nondeleted_count {
println!();
println!(
"Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
);
}
}

145
src/garage/cli_v2/block.rs Normal file
View file

@ -0,0 +1,145 @@
//use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> {
match cmd {
BlockOperation::ListErrors => self.cmd_list_block_errors().await,
BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await,
BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await,
}
}
pub async fn cmd_list_block_errors(&self) -> Result<(), Error> {
let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0;
let tf = timeago::Formatter::new();
let mut tf2 = timeago::Formatter::new();
tf2.ago("");
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in errors {
let next_try = if e.next_try_in_secs > 0 {
tf2.convert(Duration::from_secs(e.next_try_in_secs))
} else {
"asap".to_string()
};
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
e.block_hash,
e.refcount,
e.error_count,
tf.convert(Duration::from_secs(e.last_try_secs_ago)),
next_try
));
}
format_table(table);
Ok(())
}
pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> {
let info = self
.local_api_request(LocalGetBlockInfoRequest { block_hash: hash })
.await?;
println!("Block hash: {}", info.block_hash);
println!("Refcount: {}", info.refcount);
println!();
let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for ver in info.versions.iter() {
match &ver.backlink {
Some(BlockVersionBacklink::Object { bucket_id, key }) => {
table.push(format!(
"{:.16}\t{:.16}\t{}\t\t{:?}",
ver.version_id, bucket_id, key, ver.deleted
));
}
Some(BlockVersionBacklink::Upload {
upload_id,
upload_deleted: _,
upload_garbage_collected: _,
bucket_id,
key,
}) => {
table.push(format!(
"{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}",
ver.version_id,
bucket_id.as_deref().unwrap_or(""),
key.as_deref().unwrap_or(""),
upload_id,
ver.deleted
));
}
None => {
table.push(format!("{:.16}\t\t\tyes", ver.version_id));
}
}
if !ver.deleted {
nondeleted_count += 1;
}
}
format_table(table);
if info.refcount != nondeleted_count {
println!();
println!(
"Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
);
}
Ok(())
}
pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> {
let req = match (all, blocks.len()) {
(true, 0) => LocalRetryBlockResyncRequest::All { all: true },
(false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks {
block_hashes: blocks,
},
_ => {
return Err(Error::Message(
"Please specify block hashes or --all (not both)".into(),
))
}
};
let res = self.local_api_request(req).await?;
println!(
"{} blocks returned in queue for a retry now (check logs to see results)",
res.count
);
Ok(())
}
pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> {
if !yes {
return Err(Error::Message(
"Pass the --yes flag to confirm block purge operation.".into(),
));
}
let res = self
.local_api_request(LocalPurgeBlocksRequest(blocks))
.await?;
println!(
"Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads",
res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted,
);
Ok(())
}
}

View file

@ -5,7 +5,6 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
@ -22,15 +21,9 @@ impl Cli {
BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
// TODO
x => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BucketOperation(x),
)
.await
.ok_or_message("old error"),
BucketOperation::CleanupIncompleteUploads(query) => {
self.cmd_cleanup_incomplete_uploads(query).await
}
}
}
@ -520,4 +513,37 @@ impl Cli {
Ok(())
}
pub async fn cmd_cleanup_incomplete_uploads(
&self,
opt: CleanupIncompleteUploadsOpt,
) -> Result<(), Error> {
let older_than = parse_duration::parse::parse(&opt.older_than)
.ok_or_message("Invalid duration passed for --older-than parameter")?;
for b in opt.buckets.iter() {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(b.clone()),
})
.await?;
let res = self
.api_request(CleanupIncompleteUploadsRequest {
bucket_id: bucket.id.clone(),
older_than_secs: older_than.as_secs(),
})
.await?;
if res.uploads_deleted > 0 {
println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted);
} else {
println!("{:.16}: no uploads deleted", bucket.id);
}
}
Ok(())
}
}

View file

@ -3,6 +3,10 @@ pub mod cluster;
pub mod key;
pub mod layout;
pub mod block;
pub mod node;
pub mod worker;
use std::convert::TryFrom;
use std::sync::Arc;
use std::time::Duration;
@ -13,16 +17,14 @@ use garage_rpc::system::*;
use garage_rpc::*;
use garage_api_admin::api::*;
use garage_api_admin::EndpointHandler as AdminApiEndpoint;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse};
use garage_api_admin::RequestHandler;
use crate::admin::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli::Command;
pub struct Cli {
pub system_rpc_endpoint: Arc<Endpoint<SystemRpc, ()>>,
pub admin_rpc_endpoint: Arc<Endpoint<AdminRpc, ()>>,
pub proxy_rpc_endpoint: Arc<Endpoint<ProxyRpc, ()>>,
pub rpc_host: NodeID,
}
@ -36,63 +38,35 @@ impl Cli {
Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await,
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::LaunchRepair(ro),
)
.await
.ok_or_message("cli_v1"),
Command::Stats(so) => {
cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so))
.await
.ok_or_message("cli_v1")
}
Command::Worker(wo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::Worker(wo),
)
.await
.ok_or_message("cli_v1"),
Command::Block(bo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BlockOperation(bo),
)
.await
.ok_or_message("cli_v1"),
Command::Meta(mo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::MetaOperation(mo),
)
.await
.ok_or_message("cli_v1"),
Command::Worker(wo) => self.cmd_worker(wo).await,
Command::Block(bo) => self.cmd_block(bo).await,
Command::Meta(mo) => self.cmd_meta(mo).await,
Command::Stats(so) => self.cmd_stats(so).await,
Command::Repair(ro) => self.cmd_repair(ro).await,
_ => unreachable!(),
}
}
pub async fn api_request<T>(&self, req: T) -> Result<<T as AdminApiEndpoint>::Response, Error>
pub async fn api_request<T>(&self, req: T) -> Result<<T as RequestHandler>::Response, Error>
where
T: AdminApiEndpoint,
T: RequestHandler,
AdminApiRequest: From<T>,
<T as AdminApiEndpoint>::Response: TryFrom<TaggedAdminApiResponse>,
<T as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = AdminApiRequest::from(req);
let req_name = req.name();
match self
.admin_rpc_endpoint
.call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL)
.await?
.ok_or_message("rpc")?
.proxy_rpc_endpoint
.call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL)
.await??
{
AdminRpc::ApiOkResponse(resp) => <T as AdminApiEndpoint>::Response::try_from(resp)
.map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))),
AdminRpc::ApiErrorResponse {
ProxyRpcResponse::ProxyApiOkResponse(resp) => {
<T as RequestHandler>::Response::try_from(resp).map_err(|_| {
Error::Message(format!("{} returned unexpected response", req_name))
})
}
ProxyRpcResponse::ApiErrorResponse {
http_code,
error_code,
message,
@ -103,4 +77,32 @@ impl Cli {
m => Err(Error::unexpected_rpc_message(m)),
}
}
pub async fn local_api_request<T>(
&self,
req: T,
) -> Result<<T as RequestHandler>::Response, Error>
where
T: RequestHandler,
MultiRequest<T>: RequestHandler<Response = MultiResponse<<T as RequestHandler>::Response>>,
AdminApiRequest: From<MultiRequest<T>>,
<MultiRequest<T> as RequestHandler>::Response: TryFrom<TaggedAdminApiResponse>,
{
let req = MultiRequest {
node: hex::encode(self.rpc_host),
body: req,
};
let resp = self.api_request(req).await?;
if let Some((_, e)) = resp.error.into_iter().next() {
return Err(Error::Message(e));
}
if resp.success.len() != 1 {
return Err(Error::Message(format!(
"{} responses returned, expected 1",
resp.success.len()
)));
}
Ok(resp.success.into_iter().next().unwrap().1)
}
}

113
src/garage/cli_v2/node.rs Normal file
View file

@ -0,0 +1,113 @@
use format_table::format_table;
use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> {
let MetaOperation::Snapshot { all } = cmd;
let res = self
.api_request(CreateMetadataSnapshotRequest {
node: if all {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalCreateMetadataSnapshotRequest,
})
.await?;
let mut table = vec![];
for (node, err) in res.error.iter() {
table.push(format!("{:.16}\tError: {}", node, err));
}
for (node, _) in res.success.iter() {
table.push(format!("{:.16}\tSnapshot created", node));
}
format_table(table);
Ok(())
}
pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> {
let res = self
.api_request(GetNodeStatisticsRequest {
node: if cmd.all_nodes {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalGetNodeStatisticsRequest,
})
.await?;
for (node, res) in res.success.iter() {
println!("======================");
println!("Stats for node {:.16}:\n", node);
println!("{}\n", res.freeform);
}
for (node, err) in res.error.iter() {
println!("======================");
println!("Node {:.16}: error: {}\n", node, err);
}
let res = self.api_request(GetClusterStatisticsRequest).await?;
println!("======================");
println!("Cluster statistics:\n");
println!("{}\n", res.freeform);
Ok(())
}
pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> {
if !cmd.yes {
return Err(Error::Message(
"Please add --yes to start the repair operation".into(),
));
}
let repair_type = match cmd.what {
RepairWhat::Tables => RepairType::Tables,
RepairWhat::Blocks => RepairType::Blocks,
RepairWhat::Versions => RepairType::Versions,
RepairWhat::MultipartUploads => RepairType::MultipartUploads,
RepairWhat::BlockRefs => RepairType::BlockRefs,
RepairWhat::BlockRc => RepairType::BlockRc,
RepairWhat::Rebalance => RepairType::Rebalance,
RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd {
ScrubCmd::Start => ScrubCommand::Start,
ScrubCmd::Cancel => ScrubCommand::Cancel,
ScrubCmd::Pause => ScrubCommand::Pause,
ScrubCmd::Resume => ScrubCommand::Resume,
}),
};
let res = self
.api_request(LaunchRepairOperationRequest {
node: if cmd.all_nodes {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalLaunchRepairOperationRequest { repair_type },
})
.await?;
let mut table = vec![];
for (node, err) in res.error.iter() {
table.push(format!("{:.16}\tError: {}", node, err));
}
for (node, _) in res.success.iter() {
table.push(format!("{:.16}\tRepair launched", node));
}
format_table(table);
Ok(())
}
}

213
src/garage/cli_v2/worker.rs Normal file
View file

@ -0,0 +1,213 @@
use format_table::format_table;
use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> {
match cmd {
WorkerOperation::List { opt } => self.cmd_list_workers(opt).await,
WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await,
WorkerOperation::Get {
all_nodes,
variable,
} => self.cmd_get_var(all_nodes, variable).await,
WorkerOperation::Set {
all_nodes,
variable,
value,
} => self.cmd_set_var(all_nodes, variable, value).await,
}
}
pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> {
let mut list = self
.local_api_request(LocalListWorkersRequest {
busy_only: opt.busy,
error_only: opt.errors,
})
.await?
.0;
list.sort_by_key(|info| {
(
match info.state {
WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0,
WorkerStateResp::Idle => 1,
WorkerStateResp::Done => 2,
},
info.id,
)
});
let mut table =
vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()];
let tf = timeago::Formatter::new();
for info in list.iter() {
let err_ago = info
.last_error
.as_ref()
.map(|x| tf.convert(Duration::from_secs(x.secs_ago)))
.unwrap_or_default();
let (total_err, consec_err) = if info.errors > 0 {
(info.errors.to_string(), info.consecutive_errors.to_string())
} else {
("-".into(), "-".into())
};
table.push(format!(
"{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}",
info.id,
format_worker_state(&info.state),
info.name,
info.tranquility
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "-".into()),
info.progress.as_deref().unwrap_or("-"),
info.queue_length
.as_ref()
.map(ToString::to_string)
.unwrap_or_else(|| "-".into()),
total_err,
consec_err,
err_ago,
));
}
format_table(table);
Ok(())
}
pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> {
let info = self
.local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 })
.await?
.0;
let mut table = vec![];
table.push(format!("Task id:\t{}", info.id));
table.push(format!("Worker name:\t{}", info.name));
match &info.state {
WorkerStateResp::Throttled { duration_secs } => {
table.push(format!(
"Worker state:\tBusy (throttled, paused for {:.3}s)",
duration_secs
));
}
s => {
table.push(format!("Worker state:\t{}", format_worker_state(s)));
}
};
if let Some(tql) = info.tranquility {
table.push(format!("Tranquility:\t{}", tql));
}
table.push("".into());
table.push(format!("Total errors:\t{}", info.errors));
table.push(format!("Consecutive errs:\t{}", info.consecutive_errors));
if let Some(err) = info.last_error {
table.push(format!("Last error:\t{}", err.message));
let tf = timeago::Formatter::new();
table.push(format!(
"Last error time:\t{}",
tf.convert(Duration::from_secs(err.secs_ago))
));
}
table.push("".into());
if let Some(p) = info.progress {
table.push(format!("Progress:\t{}", p));
}
if let Some(ql) = info.queue_length {
table.push(format!("Queue length:\t{}", ql));
}
if let Some(pe) = info.persistent_errors {
table.push(format!("Persistent errors:\t{}", pe));
}
for (i, s) in info.freeform.iter().enumerate() {
if i == 0 {
if table.last() != Some(&"".into()) {
table.push("".into());
}
table.push(format!("Message:\t{}", s));
} else {
table.push(format!("\t{}", s));
}
}
format_table(table);
Ok(())
}
pub async fn cmd_get_var(&self, all: bool, var: Option<String>) -> Result<(), Error> {
let res = self
.api_request(GetWorkerVariableRequest {
node: if all {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalGetWorkerVariableRequest { variable: var },
})
.await?;
let mut table = vec![];
for (node, vars) in res.success.iter() {
for (key, val) in vars.0.iter() {
table.push(format!("{:.16}\t{}\t{}", node, key, val));
}
}
format_table(table);
for (node, err) in res.error.iter() {
eprintln!("{:.16}: error: {}", node, err);
}
Ok(())
}
pub async fn cmd_set_var(
&self,
all: bool,
variable: String,
value: String,
) -> Result<(), Error> {
let res = self
.api_request(SetWorkerVariableRequest {
node: if all {
"*".to_string()
} else {
hex::encode(self.rpc_host)
},
body: LocalSetWorkerVariableRequest { variable, value },
})
.await?;
let mut table = vec![];
for (node, kv) in res.success.iter() {
table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value));
}
format_table(table);
for (node, err) in res.error.iter() {
eprintln!("{:.16}: error: {}", node, err);
}
Ok(())
}
}
fn format_worker_state(s: &WorkerStateResp) -> &'static str {
match s {
WorkerStateResp::Busy => "Busy",
WorkerStateResp::Throttled { .. } => "Busy*",
WorkerStateResp::Idle => "Idle",
WorkerStateResp::Done => "Done",
}
}

View file

@ -4,10 +4,8 @@
#[macro_use]
extern crate tracing;
mod admin;
mod cli;
mod cli_v2;
mod repair;
mod secrets;
mod server;
#[cfg(feature = "telemetry-otlp")]
@ -35,8 +33,9 @@ use garage_util::error::*;
use garage_rpc::system::*;
use garage_rpc::*;
use admin::*;
use cli::*;
use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH};
use cli::structs::*;
use secrets::Secrets;
#[derive(StructOpt, Debug)]
@ -144,13 +143,13 @@ async fn main() {
let res = match opt.cmd {
Command::Server => server::run_server(opt.config_file, opt.secrets).await,
Command::OfflineRepair(repair_opt) => {
repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await
cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await
}
Command::ConvertDb(conv_opt) => {
cli::convert_db::do_conversion(conv_opt).map_err(From::from)
}
Command::Node(NodeOperation::NodeId(node_id_opt)) => {
node_id_command(opt.config_file, node_id_opt.quiet)
cli::init::node_id_command(opt.config_file, node_id_opt.quiet)
}
_ => cli_command(opt).await,
};
@ -251,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
(id, addrs[0], false)
} else {
let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir)
.err_context(READ_KEY_ERROR)?;
.err_context(cli::init::READ_KEY_ERROR)?;
if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) {
use std::net::ToSocketAddrs;
let a = a
@ -281,11 +280,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> {
}
let system_rpc_endpoint = netapp.endpoint::<SystemRpc, ()>(SYSTEM_RPC_PATH.into());
let admin_rpc_endpoint = netapp.endpoint::<AdminRpc, ()>(ADMIN_RPC_PATH.into());
let proxy_rpc_endpoint = netapp.endpoint::<ProxyRpc, ()>(PROXY_RPC_PATH.into());
let cli = cli_v2::Cli {
system_rpc_endpoint,
admin_rpc_endpoint,
proxy_rpc_endpoint,
rpc_host: id,
};

View file

@ -1,2 +0,0 @@
pub mod offline;
pub mod online;

View file

@ -14,7 +14,6 @@ use garage_web::WebServer;
#[cfg(feature = "k2v")]
use garage_api_k2v::api_server::K2VApiServer;
use crate::admin::*;
use crate::secrets::{fill_secrets, Secrets};
#[cfg(feature = "telemetry-otlp")]
use crate::tracing_setup::*;
@ -66,6 +65,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Initialize Admin API server and metrics collector...");
let admin_server = AdminApiServer::new(
garage.clone(),
background.clone(),
#[cfg(feature = "metrics")]
metrics_exporter,
);
@ -73,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
info!("Launching internal Garage cluster communications...");
let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone()));
info!("Create admin RPC handler...");
AdminRpcHandler::new(garage.clone(), background.clone());
// ---- Launch public-facing API servers ----
let mut servers = vec![];

View file

@ -6,7 +6,6 @@ pub mod worker;
use std::collections::HashMap;
use std::sync::Arc;
use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch};
use worker::WorkerProcessor;
@ -18,7 +17,7 @@ pub struct BackgroundRunner {
worker_info: Arc<std::sync::Mutex<HashMap<usize, WorkerInfo>>>,
}
#[derive(Clone, Serialize, Deserialize, Debug)]
#[derive(Clone, Debug)]
pub struct WorkerInfo {
pub name: String,
pub status: WorkerStatus,
@ -30,7 +29,7 @@ pub struct WorkerInfo {
/// WorkerStatus is a struct returned by the worker with a bunch of canonical
/// fields to indicate their status to CLI users. All fields are optional.
#[derive(Clone, Serialize, Deserialize, Debug, Default)]
#[derive(Clone, Debug, Default)]
pub struct WorkerStatus {
pub tranquility: Option<u32>,
pub progress: Option<String>,

View file

@ -6,7 +6,6 @@ use async_trait::async_trait;
use futures::future::*;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use tokio::select;
use tokio::sync::{mpsc, watch};
@ -18,7 +17,7 @@ use crate::time::now_msec;
// will be interrupted in the middle of whatever they are doing.
const EXIT_DEADLINE: Duration = Duration::from_secs(8);
#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)]
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerState {
Busy,
Throttled(f32),
@ -26,17 +25,6 @@ pub enum WorkerState {
Done,
}
impl std::fmt::Display for WorkerState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
WorkerState::Busy => write!(f, "Busy"),
WorkerState::Throttled(_) => write!(f, "Busy*"),
WorkerState::Idle => write!(f, "Idle"),
WorkerState::Done => write!(f, "Done"),
}
}
}
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;