diff --git a/Cargo.lock b/Cargo.lock index 659e2fe7..0b86147b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1258,7 +1258,6 @@ dependencies = [ "opentelemetry-otlp", "opentelemetry-prometheus", "parse_duration", - "serde", "serde_json", "sha1", "sha2", @@ -1277,9 +1276,12 @@ version = "1.0.1" dependencies = [ "argon2", "async-trait", + "bytesize", "err-derive", + "format_table", "futures", "garage_api_common", + "garage_block", "garage_model", "garage_rpc", "garage_table", diff --git a/doc/api/garage-admin-v2.yml b/doc/api/garage-admin-v2.yml index 725c1d01..f9e3c10c 100644 --- a/doc/api/garage-admin-v2.yml +++ b/doc/api/garage-admin-v2.yml @@ -826,6 +826,46 @@ paths: schema: $ref: '#/components/schemas/BucketInfo' + /CleanupIncompleteUploads: + post: + tags: + - Bucket + operationId: "CleanupIncompleteUploads" + summary: "Cleanup incomplete uploads in a bucket" + description: | + Cleanup all incomplete uploads in a bucket that are older than a specified number of seconds + requestBody: + description: | + Bucket id and minimum age of uploads to delete (in seconds) + required: true + content: + application/json: + schema: + type: object + required: [bucketId, olderThanSecs] + properties: + bucketId: + type: string + example: "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b" + olderThanSecs: + type: integer + example: "3600" + responses: + '500': + description: "The server can not handle your request. Check your connectivity with the rest of the cluster." + '400': + description: "The payload is not formatted correctly" + '200': + description: "The bucket was cleaned up successfully" + content: + application/json: + schema: + type: object + properties: + uploadsDeleted: + type: integer + example: 12 + /AllowBucketKey: post: tags: diff --git a/doc/drafts/admin-api.md b/doc/drafts/admin-api.md index eb327307..029c7ddd 100644 --- a/doc/drafts/admin-api.md +++ b/doc/drafts/admin-api.md @@ -702,6 +702,28 @@ Deletes a storage bucket. A bucket cannot be deleted if it is not empty. Warning: this will delete all aliases associated with the bucket! +#### CleanupIncompleteUploads `POST /v2/CleanupIncompleteUploads` + +Cleanup all incomplete uploads in a bucket that are older than a specified number +of seconds. + +Request body format: + +```json +{ + "bucketId": "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b", + "olderThanSecs": 3600 +} +``` + +Response format + +```json +{ + "uploadsDeleted": 12 +} +``` + ### Operations on permissions for keys on buckets diff --git a/src/api/admin/Cargo.toml b/src/api/admin/Cargo.toml index 94a321a6..9ac099e8 100644 --- a/src/api/admin/Cargo.toml +++ b/src/api/admin/Cargo.toml @@ -14,7 +14,9 @@ path = "lib.rs" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +format_table.workspace = true garage_model.workspace = true +garage_block.workspace = true garage_table.workspace = true garage_util.workspace = true garage_rpc.workspace = true @@ -22,6 +24,7 @@ garage_api_common.workspace = true argon2.workspace = true async-trait.workspace = true +bytesize.workspace = true err-derive.workspace = true hex.workspace = true paste.workspace = true diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index 99832564..97cde158 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -1,18 +1,22 @@ +use std::collections::HashMap; use std::convert::TryFrom; use std::net::SocketAddr; use std::sync::Arc; -use async_trait::async_trait; use paste::paste; use serde::{Deserialize, Serialize}; +use garage_rpc::*; + use garage_model::garage::Garage; +use garage_api_common::common_error::CommonErrorDerivative; use garage_api_common::helpers::is_default; +use crate::api_server::{AdminRpc, AdminRpcResponse}; use crate::error::Error; use crate::macros::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; // This generates the following: // @@ -62,6 +66,7 @@ admin_endpoints![ CreateBucket, UpdateBucket, DeleteBucket, + CleanupIncompleteUploads, // Operations on permissions for keys on buckets AllowBucketKey, @@ -70,8 +75,55 @@ admin_endpoints![ // Operations on bucket aliases AddBucketAlias, RemoveBucketAlias, + + // Node operations + CreateMetadataSnapshot, + GetNodeStatistics, + GetClusterStatistics, + LaunchRepairOperation, + + // Worker operations + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, + + // Block operations + ListBlockErrors, + GetBlockInfo, + RetryBlockResync, + PurgeBlocks, ]; +local_admin_endpoints![ + // Node operations + CreateMetadataSnapshot, + GetNodeStatistics, + LaunchRepairOperation, + // Background workers + ListWorkers, + GetWorkerInfo, + GetWorkerVariable, + SetWorkerVariable, + // Block operations + ListBlockErrors, + GetBlockInfo, + RetryBlockResync, + PurgeBlocks, +]; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiRequest { + pub node: String, + pub body: RB, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct MultiResponse { + pub success: HashMap, + pub error: HashMap, +} + // ********************************************** // Special endpoints // @@ -497,6 +549,19 @@ pub struct DeleteBucketRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct DeleteBucketResponse; +// ---- CleanupIncompleteUploads ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CleanupIncompleteUploadsRequest { + pub bucket_id: String, + pub older_than_secs: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct CleanupIncompleteUploadsResponse { + pub uploads_deleted: u64, +} + // ********************************************** // Operations on permissions for keys on buckets // ********************************************** @@ -566,3 +631,246 @@ pub struct RemoveBucketAliasRequest { #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse); + +// ********************************************** +// Node operations +// ********************************************** + +// ---- CreateMetadataSnapshot ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalCreateMetadataSnapshotRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalCreateMetadataSnapshotResponse; + +// ---- GetNodeStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalGetNodeStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetNodeStatisticsResponse { + pub freeform: String, +} + +// ---- GetClusterStatistics ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct GetClusterStatisticsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct GetClusterStatisticsResponse { + pub freeform: String, +} + +// ---- LaunchRepairOperation ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationRequest { + pub repair_type: RepairType, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum RepairType { + Tables, + Blocks, + Versions, + MultipartUploads, + BlockRefs, + BlockRc, + Rebalance, + Scrub(ScrubCommand), +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum ScrubCommand { + Start, + Pause, + Resume, + Cancel, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalLaunchRepairOperationResponse; + +// ********************************************** +// Worker operations +// ********************************************** + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] +pub struct LocalListWorkersRequest { + #[serde(default)] + pub busy_only: bool, + #[serde(default)] + pub error_only: bool, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListWorkersResponse(pub Vec); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerInfoResp { + pub id: u64, + pub name: String, + pub state: WorkerStateResp, + pub errors: u64, + pub consecutive_errors: u64, + pub last_error: Option, + pub tranquility: Option, + pub progress: Option, + pub queue_length: Option, + pub persistent_errors: Option, + pub freeform: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum WorkerStateResp { + Busy, + Throttled { duration_secs: f32 }, + Idle, + Done, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct WorkerLastError { + pub message: String, + pub secs_ago: u64, +} + +// ---- GetWorkerList ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoRequest { + pub id: u64, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerInfoResponse(pub WorkerInfoResp); + +// ---- GetWorkerVariable ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerVariableRequest { + pub variable: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalGetWorkerVariableResponse(pub HashMap); + +// ---- SetWorkerVariable ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalSetWorkerVariableRequest { + pub variable: String, + pub value: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalSetWorkerVariableResponse { + pub variable: String, + pub value: String, +} + +// ********************************************** +// Block operations +// ********************************************** + +// ---- ListBlockErrors ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalListBlockErrorsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListBlockErrorsResponse(pub Vec); + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BlockError { + pub block_hash: String, + pub refcount: u64, + pub error_count: u64, + pub last_try_secs_ago: u64, + pub next_try_in_secs: u64, +} + +// ---- GetBlockInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoRequest { + pub block_hash: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoResponse { + pub block_hash: String, + pub refcount: u64, + pub versions: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockVersion { + pub version_id: String, + pub deleted: bool, + pub garbage_collected: bool, + pub backlink: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BlockVersionBacklink { + Object { + bucket_id: String, + key: String, + }, + Upload { + upload_id: String, + upload_deleted: bool, + upload_garbage_collected: bool, + bucket_id: Option, + key: Option, + }, +} + +// ---- RetryBlockResync ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(untagged)] +pub enum LocalRetryBlockResyncRequest { + #[serde(rename_all = "camelCase")] + All { all: bool }, + #[serde(rename_all = "camelCase")] + Blocks { block_hashes: Vec }, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalRetryBlockResyncResponse { + pub count: u64, +} + +// ---- PurgeBlocks ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalPurgeBlocksRequest(pub Vec); + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalPurgeBlocksResponse { + pub blocks_purged: u64, + pub objects_deleted: u64, + pub uploads_deleted: u64, + pub versions_deleted: u64, +} diff --git a/src/api/admin/api_server.rs b/src/api/admin/api_server.rs index be29e617..1ab81be3 100644 --- a/src/api/admin/api_server.rs +++ b/src/api/admin/api_server.rs @@ -5,17 +5,18 @@ use argon2::password_hash::PasswordHash; use async_trait::async_trait; use http::header::{HeaderValue, ACCESS_CONTROL_ALLOW_ORIGIN, AUTHORIZATION}; -use hyper::{body::Incoming as IncomingBody, Request, Response, StatusCode}; +use hyper::{body::Incoming as IncomingBody, Request, Response}; +use serde::{Deserialize, Serialize}; use tokio::sync::watch; use opentelemetry::trace::SpanRef; #[cfg(feature = "metrics")] use opentelemetry_prometheus::PrometheusExporter; -#[cfg(feature = "metrics")] -use prometheus::{Encoder, TextEncoder}; use garage_model::garage::Garage; +use garage_rpc::{Endpoint as RpcEndpoint, *}; +use garage_util::background::BackgroundRunner; use garage_util::error::Error as GarageError; use garage_util::socket_address::UnixOrTCPSocketAddress; @@ -27,19 +28,84 @@ use crate::error::*; use crate::router_v0; use crate::router_v1; use crate::Authorization; -use crate::EndpointHandler; +use crate::RequestHandler; + +// ---- FOR RPC ---- + +pub const ADMIN_RPC_PATH: &str = "garage_api/admin/rpc.rs/Rpc"; + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpc { + Proxy(AdminApiRequest), + Internal(LocalAdminApiRequest), +} + +#[derive(Debug, Serialize, Deserialize)] +pub enum AdminRpcResponse { + ProxyApiOkResponse(TaggedAdminApiResponse), + InternalApiOkResponse(LocalAdminApiResponse), + ApiErrorResponse { + http_code: u16, + error_code: String, + message: String, + }, +} + +impl Rpc for AdminRpc { + type Response = Result; +} + +#[async_trait] +impl EndpointHandler for AdminApiServer { + async fn handle( + self: &Arc, + message: &AdminRpc, + _from: NodeID, + ) -> Result { + match message { + AdminRpc::Proxy(req) => { + info!("Proxied admin API request: {}", req.name()); + let res = req.clone().handle(&self.garage, &self).await; + match res { + Ok(res) => Ok(AdminRpcResponse::ProxyApiOkResponse(res.tagged())), + Err(e) => Ok(AdminRpcResponse::ApiErrorResponse { + http_code: e.http_status_code().as_u16(), + error_code: e.code().to_string(), + message: e.to_string(), + }), + } + } + AdminRpc::Internal(req) => { + info!("Internal admin API request: {}", req.name()); + let res = req.clone().handle(&self.garage, &self).await; + match res { + Ok(res) => Ok(AdminRpcResponse::InternalApiOkResponse(res)), + Err(e) => Ok(AdminRpcResponse::ApiErrorResponse { + http_code: e.http_status_code().as_u16(), + error_code: e.code().to_string(), + message: e.to_string(), + }), + } + } + } + } +} + +// ---- FOR HTTP ---- pub type ResBody = BoxBody; pub struct AdminApiServer { garage: Arc, #[cfg(feature = "metrics")] - exporter: PrometheusExporter, + pub(crate) exporter: PrometheusExporter, metrics_token: Option, admin_token: Option, + pub(crate) background: Arc, + pub(crate) endpoint: Arc>, } -pub enum Endpoint { +pub enum HttpEndpoint { Old(router_v1::Endpoint), New(String), } @@ -47,91 +113,48 @@ pub enum Endpoint { impl AdminApiServer { pub fn new( garage: Arc, + background: Arc, #[cfg(feature = "metrics")] exporter: PrometheusExporter, - ) -> Self { + ) -> Arc { let cfg = &garage.config.admin; let metrics_token = cfg.metrics_token.as_deref().map(hash_bearer_token); let admin_token = cfg.admin_token.as_deref().map(hash_bearer_token); - Self { + + let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); + let admin = Arc::new(Self { garage, #[cfg(feature = "metrics")] exporter, metrics_token, admin_token, - } + background, + endpoint, + }); + admin.endpoint.set_handler(admin.clone()); + admin } pub async fn run( - self, + self: Arc, bind_addr: UnixOrTCPSocketAddress, must_exit: watch::Receiver, ) -> Result<(), GarageError> { let region = self.garage.config.s3_api.s3_region.clone(); - ApiServer::new(region, self) + ApiServer::new(region, ArcAdminApiServer(self)) .run_server(bind_addr, Some(0o220), must_exit) .await } - fn handle_metrics(&self) -> Result, Error> { - #[cfg(feature = "metrics")] - { - use opentelemetry::trace::Tracer; - - let mut buffer = vec![]; - let encoder = TextEncoder::new(); - - let tracer = opentelemetry::global::tracer("garage"); - let metric_families = tracer.in_span("admin/gather_metrics", |_| { - self.exporter.registry().gather() - }); - - encoder - .encode(&metric_families, &mut buffer) - .ok_or_internal_error("Could not serialize metrics")?; - - Ok(Response::builder() - .status(StatusCode::OK) - .header(http::header::CONTENT_TYPE, encoder.format_type()) - .body(bytes_body(buffer.into()))?) - } - #[cfg(not(feature = "metrics"))] - Err(Error::bad_request( - "Garage was built without the metrics feature".to_string(), - )) - } -} - -#[async_trait] -impl ApiHandler for AdminApiServer { - const API_NAME: &'static str = "admin"; - const API_NAME_DISPLAY: &'static str = "Admin"; - - type Endpoint = Endpoint; - type Error = Error; - - fn parse_endpoint(&self, req: &Request) -> Result { - if req.uri().path().starts_with("/v0/") { - let endpoint_v0 = router_v0::Endpoint::from_request(req)?; - let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?; - Ok(Endpoint::Old(endpoint_v1)) - } else if req.uri().path().starts_with("/v1/") { - let endpoint_v1 = router_v1::Endpoint::from_request(req)?; - Ok(Endpoint::Old(endpoint_v1)) - } else { - Ok(Endpoint::New(req.uri().path().to_string())) - } - } - - async fn handle( + async fn handle_http_api( &self, req: Request, - endpoint: Endpoint, + endpoint: HttpEndpoint, ) -> Result, Error> { let auth_header = req.headers().get(AUTHORIZATION).cloned(); let request = match endpoint { - Endpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?, - Endpoint::New(_) => AdminApiRequest::from_request(req).await?, + HttpEndpoint::Old(endpoint_v1) => AdminApiRequest::from_v1(endpoint_v1, req).await?, + HttpEndpoint::New(_) => AdminApiRequest::from_request(req).await?, }; let required_auth_hash = @@ -156,12 +179,12 @@ impl ApiHandler for AdminApiServer { } match request { - AdminApiRequest::Options(req) => req.handle(&self.garage).await, - AdminApiRequest::CheckDomain(req) => req.handle(&self.garage).await, - AdminApiRequest::Health(req) => req.handle(&self.garage).await, - AdminApiRequest::Metrics(_req) => self.handle_metrics(), + AdminApiRequest::Options(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::CheckDomain(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::Health(req) => req.handle(&self.garage, &self).await, + AdminApiRequest::Metrics(req) => req.handle(&self.garage, &self).await, req => { - let res = req.handle(&self.garage).await?; + let res = req.handle(&self.garage, &self).await?; let mut res = json_ok_response(&res)?; res.headers_mut() .insert(ACCESS_CONTROL_ALLOW_ORIGIN, HeaderValue::from_static("*")); @@ -171,7 +194,39 @@ impl ApiHandler for AdminApiServer { } } -impl ApiEndpoint for Endpoint { +struct ArcAdminApiServer(Arc); + +#[async_trait] +impl ApiHandler for ArcAdminApiServer { + const API_NAME: &'static str = "admin"; + const API_NAME_DISPLAY: &'static str = "Admin"; + + type Endpoint = HttpEndpoint; + type Error = Error; + + fn parse_endpoint(&self, req: &Request) -> Result { + if req.uri().path().starts_with("/v0/") { + let endpoint_v0 = router_v0::Endpoint::from_request(req)?; + let endpoint_v1 = router_v1::Endpoint::from_v0(endpoint_v0)?; + Ok(HttpEndpoint::Old(endpoint_v1)) + } else if req.uri().path().starts_with("/v1/") { + let endpoint_v1 = router_v1::Endpoint::from_request(req)?; + Ok(HttpEndpoint::Old(endpoint_v1)) + } else { + Ok(HttpEndpoint::New(req.uri().path().to_string())) + } + } + + async fn handle( + &self, + req: Request, + endpoint: HttpEndpoint, + ) -> Result, Error> { + self.0.handle_http_api(req, endpoint).await + } +} + +impl ApiEndpoint for HttpEndpoint { fn name(&self) -> Cow<'static, str> { match self { Self::Old(endpoint_v1) => Cow::Borrowed(endpoint_v1.name()), diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs new file mode 100644 index 00000000..73d186a6 --- /dev/null +++ b/src/api/admin/block.rs @@ -0,0 +1,274 @@ +use std::sync::Arc; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::time::now_msec; + +use garage_table::EmptyKey; + +use garage_model::garage::Garage; +use garage_model::s3::object_table::*; +use garage_model::s3::version_table::*; + +use garage_api_common::common_error::CommonErrorDerivative; + +use crate::api::*; +use crate::error::*; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalListBlockErrorsRequest { + type Response = LocalListBlockErrorsResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let errors = garage.block_manager.list_resync_errors()?; + let now = now_msec(); + let errors = errors + .into_iter() + .map(|e| BlockError { + block_hash: hex::encode(&e.hash), + refcount: e.refcount, + error_count: e.error_count, + last_try_secs_ago: now.saturating_sub(e.last_try) / 1000, + next_try_in_secs: e.next_try.saturating_sub(now) / 1000, + }) + .collect(); + Ok(LocalListBlockErrorsResponse(errors)) + } +} + +impl RequestHandler for LocalGetBlockInfoRequest { + type Response = LocalGetBlockInfoResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let hash = find_block_hash_by_prefix(garage, &self.block_hash)?; + let refcount = garage.block_manager.get_block_rc(&hash)?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? { + let bl = match &v.backlink { + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: u.deleted.get(), + upload_garbage_collected: false, + bucket_id: Some(hex::encode(&u.bucket_id)), + key: Some(u.key.to_string()), + } + } else { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: true, + upload_garbage_collected: true, + bucket_id: None, + key: None, + } + } + } + VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object { + bucket_id: hex::encode(&bucket_id), + key: key.to_string(), + }, + }; + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: v.deleted.get(), + garbage_collected: false, + backlink: Some(bl), + }); + } else { + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: true, + garbage_collected: true, + backlink: None, + }); + } + } + Ok(LocalGetBlockInfoResponse { + block_hash: hex::encode(&hash), + refcount, + versions, + }) + } +} + +impl RequestHandler for LocalRetryBlockResyncRequest { + type Response = LocalRetryBlockResyncResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + match self { + Self::All { all: true } => { + let blocks = garage.block_manager.list_resync_errors()?; + for b in blocks.iter() { + garage.block_manager.resync.clear_backoff(&b.hash)?; + } + Ok(LocalRetryBlockResyncResponse { + count: blocks.len() as u64, + }) + } + Self::All { all: false } => Err(Error::bad_request("nonsense")), + Self::Blocks { block_hashes } => { + for hash in block_hashes.iter() { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + garage.block_manager.resync.clear_backoff(&hash)?; + } + Ok(LocalRetryBlockResyncResponse { + count: block_hashes.len() as u64, + }) + } + } + } +} + +impl RequestHandler for LocalPurgeBlocksRequest { + type Response = LocalPurgeBlocksResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let mut obj_dels = 0; + let mut mpu_dels = 0; + let mut ver_dels = 0; + + for hash in self.0.iter() { + let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; + let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + + for br in block_refs { + if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? { + handle_block_purge_version_backlink( + garage, + &version, + &mut obj_dels, + &mut mpu_dels, + ) + .await?; + + if !version.deleted.get() { + let deleted_version = Version::new(version.uuid, version.backlink, true); + garage.version_table.insert(&deleted_version).await?; + ver_dels += 1; + } + } + } + } + + Ok(LocalPurgeBlocksResponse { + blocks_purged: self.0.len() as u64, + versions_deleted: ver_dels, + objects_deleted: obj_dels, + uploads_deleted: mpu_dels, + }) + } +} + +fn find_block_hash_by_prefix(garage: &Arc, prefix: &str) -> Result { + if prefix.len() < 4 { + return Err(Error::bad_request( + "Please specify at least 4 characters of the block hash", + )); + } + + let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; + + let iter = garage + .block_ref_table + .data + .store + .range(&prefix_bin[..]..) + .map_err(GarageError::from)?; + let mut found = None; + for item in iter { + let (k, _v) = item.map_err(GarageError::from)?; + let hash = Hash::try_from(&k[..32]).unwrap(); + if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { + break; + } + if hex::encode(hash.as_slice()).starts_with(prefix) { + match &found { + Some(x) if *x == hash => (), + Some(_) => { + return Err(Error::bad_request(format!( + "Several blocks match prefix `{}`", + prefix + ))); + } + None => { + found = Some(hash); + } + } + } + } + + found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string())) +} + +async fn handle_block_purge_version_backlink( + garage: &Arc, + version: &Version, + obj_dels: &mut u64, + mpu_dels: &mut u64, +) -> Result<(), Error> { + let (bucket_id, key, ov_id) = match &version.backlink { + VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + if !mpu.deleted.get() { + mpu.parts.clear(); + mpu.deleted.set(); + garage.mpu_table.insert(&mpu).await?; + *mpu_dels += 1; + } + (mpu.bucket_id, mpu.key.clone(), *upload_id) + } else { + return Ok(()); + } + } + }; + + if let Some(object) = garage.object_table.get(&bucket_id, &key).await? { + let ov = object.versions().iter().rev().find(|v| v.is_complete()); + if let Some(ov) = ov { + if ov.uuid == ov_id { + let del_uuid = gen_uuid(); + let deleted_object = Object::new( + bucket_id, + key, + vec![ObjectVersion { + uuid: del_uuid, + timestamp: ov.timestamp + 1, + state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), + }], + ); + garage.object_table.insert(&deleted_object).await?; + *obj_dels += 1; + } + } + } + + Ok(()) +} diff --git a/src/api/admin/bucket.rs b/src/api/admin/bucket.rs index 123956ca..d2bb62e0 100644 --- a/src/api/admin/bucket.rs +++ b/src/api/admin/bucket.rs @@ -1,7 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; - -use async_trait::async_trait; +use std::time::Duration; use garage_util::crdt::*; use garage_util::data::*; @@ -20,13 +19,16 @@ use garage_api_common::common_error::CommonError; use crate::api::*; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for ListBucketsRequest { +impl RequestHandler for ListBucketsRequest { type Response = ListBucketsResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let buckets = garage .bucket_table .get_range( @@ -69,11 +71,14 @@ impl EndpointHandler for ListBucketsRequest { } } -#[async_trait] -impl EndpointHandler for GetBucketInfoRequest { +impl RequestHandler for GetBucketInfoRequest { type Response = GetBucketInfoResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let bucket_id = match (self.id, self.global_alias, self.search) { (Some(id), None, None) => parse_bucket_id(&id)?, (None, Some(ga), None) => garage @@ -221,11 +226,14 @@ async fn bucket_info_results( Ok(res) } -#[async_trait] -impl EndpointHandler for CreateBucketRequest { +impl RequestHandler for CreateBucketRequest { type Response = CreateBucketResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let helper = garage.locked_helper().await; if let Some(ga) = &self.global_alias { @@ -292,11 +300,14 @@ impl EndpointHandler for CreateBucketRequest { } } -#[async_trait] -impl EndpointHandler for DeleteBucketRequest { +impl RequestHandler for DeleteBucketRequest { type Response = DeleteBucketResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let helper = garage.locked_helper().await; let bucket_id = parse_bucket_id(&self.id)?; @@ -341,11 +352,14 @@ impl EndpointHandler for DeleteBucketRequest { } } -#[async_trait] -impl EndpointHandler for UpdateBucketRequest { +impl RequestHandler for UpdateBucketRequest { type Response = UpdateBucketResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let bucket_id = parse_bucket_id(&self.id)?; let mut bucket = garage @@ -388,23 +402,52 @@ impl EndpointHandler for UpdateBucketRequest { } } +impl RequestHandler for CleanupIncompleteUploadsRequest { + type Response = CleanupIncompleteUploadsResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let duration = Duration::from_secs(self.older_than_secs); + + let bucket_id = parse_bucket_id(&self.bucket_id)?; + + let count = garage + .bucket_helper() + .cleanup_incomplete_uploads(&bucket_id, duration) + .await?; + + Ok(CleanupIncompleteUploadsResponse { + uploads_deleted: count as u64, + }) + } +} + // ---- BUCKET/KEY PERMISSIONS ---- -#[async_trait] -impl EndpointHandler for AllowBucketKeyRequest { +impl RequestHandler for AllowBucketKeyRequest { type Response = AllowBucketKeyResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let res = handle_bucket_change_key_perm(garage, self.0, true).await?; Ok(AllowBucketKeyResponse(res)) } } -#[async_trait] -impl EndpointHandler for DenyBucketKeyRequest { +impl RequestHandler for DenyBucketKeyRequest { type Response = DenyBucketKeyResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let res = handle_bucket_change_key_perm(garage, self.0, false).await?; Ok(DenyBucketKeyResponse(res)) } @@ -449,11 +492,14 @@ pub async fn handle_bucket_change_key_perm( // ---- BUCKET ALIASES ---- -#[async_trait] -impl EndpointHandler for AddBucketAliasRequest { +impl RequestHandler for AddBucketAliasRequest { type Response = AddBucketAliasResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let bucket_id = parse_bucket_id(&self.bucket_id)?; let helper = garage.locked_helper().await; @@ -480,11 +526,14 @@ impl EndpointHandler for AddBucketAliasRequest { } } -#[async_trait] -impl EndpointHandler for RemoveBucketAliasRequest { +impl RequestHandler for RemoveBucketAliasRequest { type Response = RemoveBucketAliasResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let bucket_id = parse_bucket_id(&self.bucket_id)?; let helper = garage.locked_helper().await; diff --git a/src/api/admin/cluster.rs b/src/api/admin/cluster.rs index dc16bd50..cb1fa493 100644 --- a/src/api/admin/cluster.rs +++ b/src/api/admin/cluster.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; - use garage_util::crdt::*; use garage_util::data::*; @@ -12,13 +10,16 @@ use garage_model::garage::Garage; use crate::api::*; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for GetClusterStatusRequest { +impl RequestHandler for GetClusterStatusRequest { type Response = GetClusterStatusResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let layout = garage.system.cluster_layout(); let mut nodes = garage .system @@ -116,11 +117,14 @@ impl EndpointHandler for GetClusterStatusRequest { } } -#[async_trait] -impl EndpointHandler for GetClusterHealthRequest { +impl RequestHandler for GetClusterHealthRequest { type Response = GetClusterHealthResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { use garage_rpc::system::ClusterHealthStatus; let health = garage.system.health(); let health = GetClusterHealthResponse { @@ -142,11 +146,14 @@ impl EndpointHandler for GetClusterHealthRequest { } } -#[async_trait] -impl EndpointHandler for ConnectClusterNodesRequest { +impl RequestHandler for ConnectClusterNodesRequest { type Response = ConnectClusterNodesResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let res = futures::future::join_all(self.0.iter().map(|node| garage.system.connect(node))) .await .into_iter() @@ -165,11 +172,14 @@ impl EndpointHandler for ConnectClusterNodesRequest { } } -#[async_trait] -impl EndpointHandler for GetClusterLayoutRequest { +impl RequestHandler for GetClusterLayoutRequest { type Response = GetClusterLayoutResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { Ok(format_cluster_layout( garage.system.cluster_layout().inner(), )) @@ -225,11 +235,14 @@ fn format_cluster_layout(layout: &layout::LayoutHistory) -> GetClusterLayoutResp // ---- update functions ---- -#[async_trait] -impl EndpointHandler for UpdateClusterLayoutRequest { +impl RequestHandler for UpdateClusterLayoutRequest { type Response = UpdateClusterLayoutResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let mut layout = garage.system.cluster_layout().inner().clone(); let mut roles = layout.current().roles.clone(); @@ -271,11 +284,14 @@ impl EndpointHandler for UpdateClusterLayoutRequest { } } -#[async_trait] -impl EndpointHandler for ApplyClusterLayoutRequest { +impl RequestHandler for ApplyClusterLayoutRequest { type Response = ApplyClusterLayoutResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let layout = garage.system.cluster_layout().inner().clone(); let (layout, msg) = layout.apply_staged_changes(Some(self.version))?; @@ -292,11 +308,14 @@ impl EndpointHandler for ApplyClusterLayoutRequest { } } -#[async_trait] -impl EndpointHandler for RevertClusterLayoutRequest { +impl RequestHandler for RevertClusterLayoutRequest { type Response = RevertClusterLayoutResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let layout = garage.system.cluster_layout().inner().clone(); let layout = layout.revert_staged_changes()?; garage diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 3712ee7d..d7ea7dc9 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -25,6 +25,14 @@ pub enum Error { #[error(display = "Access key not found: {}", _0)] NoSuchAccessKey(String), + /// The requested block does not exist + #[error(display = "Block not found: {}", _0)] + NoSuchBlock(String), + + /// The requested worker does not exist + #[error(display = "Worker not found: {}", _0)] + NoSuchWorker(u64), + /// In Import key, the key already exists #[error( display = "Key {} already exists in data store. Even if it is deleted, we can't let you create a new key with the same ID. Sorry.", @@ -53,6 +61,8 @@ impl Error { match self { Error::Common(c) => c.aws_code(), Error::NoSuchAccessKey(_) => "NoSuchAccessKey", + Error::NoSuchWorker(_) => "NoSuchWorker", + Error::NoSuchBlock(_) => "NoSuchBlock", Error::KeyAlreadyExists(_) => "KeyAlreadyExists", } } @@ -63,7 +73,9 @@ impl ApiError for Error { fn http_status_code(&self) -> StatusCode { match self { Error::Common(c) => c.http_status_code(), - Error::NoSuchAccessKey(_) => StatusCode::NOT_FOUND, + Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => { + StatusCode::NOT_FOUND + } Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, } } diff --git a/src/api/admin/key.rs b/src/api/admin/key.rs index 5b7de075..dc6ae4e9 100644 --- a/src/api/admin/key.rs +++ b/src/api/admin/key.rs @@ -1,8 +1,6 @@ use std::collections::HashMap; use std::sync::Arc; -use async_trait::async_trait; - use garage_table::*; use garage_model::garage::Garage; @@ -10,13 +8,12 @@ use garage_model::key_table::*; use crate::api::*; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for ListKeysRequest { +impl RequestHandler for ListKeysRequest { type Response = ListKeysResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle(self, garage: &Arc, _admin: &Admin) -> Result { let res = garage .key_table .get_range( @@ -38,11 +35,14 @@ impl EndpointHandler for ListKeysRequest { } } -#[async_trait] -impl EndpointHandler for GetKeyInfoRequest { +impl RequestHandler for GetKeyInfoRequest { type Response = GetKeyInfoResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let key = match (self.id, self.search) { (Some(id), None) => garage.key_helper().get_existing_key(&id).await?, (None, Some(search)) => { @@ -62,11 +62,14 @@ impl EndpointHandler for GetKeyInfoRequest { } } -#[async_trait] -impl EndpointHandler for CreateKeyRequest { +impl RequestHandler for CreateKeyRequest { type Response = CreateKeyResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let key = Key::new(self.name.as_deref().unwrap_or("Unnamed key")); garage.key_table.insert(&key).await?; @@ -76,11 +79,14 @@ impl EndpointHandler for CreateKeyRequest { } } -#[async_trait] -impl EndpointHandler for ImportKeyRequest { +impl RequestHandler for ImportKeyRequest { type Response = ImportKeyResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let prev_key = garage.key_table.get(&EmptyKey, &self.access_key_id).await?; if prev_key.is_some() { return Err(Error::KeyAlreadyExists(self.access_key_id.to_string())); @@ -100,11 +106,14 @@ impl EndpointHandler for ImportKeyRequest { } } -#[async_trait] -impl EndpointHandler for UpdateKeyRequest { +impl RequestHandler for UpdateKeyRequest { type Response = UpdateKeyResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let mut key = garage.key_helper().get_existing_key(&self.id).await?; let key_state = key.state.as_option_mut().unwrap(); @@ -131,11 +140,14 @@ impl EndpointHandler for UpdateKeyRequest { } } -#[async_trait] -impl EndpointHandler for DeleteKeyRequest { +impl RequestHandler for DeleteKeyRequest { type Response = DeleteKeyResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { let helper = garage.locked_helper().await; let mut key = helper.key().get_existing_key(&self.id).await?; diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs index 31b3874d..dd9b7ffd 100644 --- a/src/api/admin/lib.rs +++ b/src/api/admin/lib.rs @@ -15,21 +15,29 @@ mod cluster; mod key; mod special; +mod block; +mod node; +mod repair; +mod worker; + use std::sync::Arc; -use async_trait::async_trait; - use garage_model::garage::Garage; +pub use api_server::AdminApiServer as Admin; + pub enum Authorization { None, MetricsToken, AdminToken, } -#[async_trait] -pub trait EndpointHandler { +pub trait RequestHandler { type Response; - async fn handle(self, garage: &Arc) -> Result; + fn handle( + self, + garage: &Arc, + admin: &Admin, + ) -> impl std::future::Future> + Send; } diff --git a/src/api/admin/macros.rs b/src/api/admin/macros.rs index 9521616e..df2762fe 100644 --- a/src/api/admin/macros.rs +++ b/src/api/admin/macros.rs @@ -70,11 +70,10 @@ macro_rules! admin_endpoints { } )* - #[async_trait] - impl EndpointHandler for AdminApiRequest { + impl RequestHandler for AdminApiRequest { type Response = AdminApiResponse; - async fn handle(self, garage: &Arc) -> Result { + async fn handle(self, garage: &Arc, admin: &Admin) -> Result { Ok(match self { $( AdminApiRequest::$special_endpoint(_) => panic!( @@ -82,7 +81,132 @@ macro_rules! admin_endpoints { ), )* $( - AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage).await?), + AdminApiRequest::$endpoint(req) => AdminApiResponse::$endpoint(req.handle(garage, admin).await?), + )* + }) + } + } + } + }; +} + +macro_rules! local_admin_endpoints { + [ + $($endpoint:ident,)* + ] => { + paste! { + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum LocalAdminApiRequest { + $( + $endpoint( [] ), + )* + } + + #[derive(Debug, Clone, Serialize, Deserialize)] + pub enum LocalAdminApiResponse { + $( + $endpoint( [] ), + )* + } + + $( + pub type [< $endpoint Request >] = MultiRequest< [< Local $endpoint Request >] >; + + pub type [< $endpoint RequestBody >] = [< Local $endpoint Request >]; + + pub type [< $endpoint Response >] = MultiResponse< [< Local $endpoint Response >] >; + + impl From< [< Local $endpoint Request >] > for LocalAdminApiRequest { + fn from(req: [< Local $endpoint Request >]) -> LocalAdminApiRequest { + LocalAdminApiRequest::$endpoint(req) + } + } + + impl TryFrom for [< Local $endpoint Response >] { + type Error = LocalAdminApiResponse; + fn try_from(resp: LocalAdminApiResponse) -> Result< [< Local $endpoint Response >], LocalAdminApiResponse> { + match resp { + LocalAdminApiResponse::$endpoint(v) => Ok(v), + x => Err(x), + } + } + } + + impl RequestHandler for [< $endpoint Request >] { + type Response = [< $endpoint Response >]; + + async fn handle(self, garage: &Arc, admin: &Admin) -> Result { + let to = match self.node.as_str() { + "*" => garage.system.cluster_layout().all_nodes().to_vec(), + id => { + let nodes = garage.system.cluster_layout().all_nodes() + .iter() + .filter(|x| hex::encode(x).starts_with(id)) + .cloned() + .collect::>(); + if nodes.len() != 1 { + return Err(Error::bad_request(format!("Zero or multiple nodes matching {}: {:?}", id, nodes))); + } + nodes + } + }; + + let resps = garage.system.rpc_helper().call_many(&admin.endpoint, + &to, + AdminRpc::Internal(self.body.into()), + RequestStrategy::with_priority(PRIO_NORMAL), + ).await?; + + let mut ret = [< $endpoint Response >] { + success: HashMap::new(), + error: HashMap::new(), + }; + for (node, resp) in resps { + match resp { + Ok(AdminRpcResponse::InternalApiOkResponse(r)) => { + match [< Local $endpoint Response >]::try_from(r) { + Ok(r) => { + ret.success.insert(hex::encode(node), r); + } + Err(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + } + } + Ok(AdminRpcResponse::ApiErrorResponse{error_code, http_code, message}) => { + ret.error.insert(hex::encode(node), format!("{} ({}): {}", error_code, http_code, message)); + } + Ok(_) => { + ret.error.insert(hex::encode(node), "returned invalid value".to_string()); + } + Err(e) => { + ret.error.insert(hex::encode(node), e.to_string()); + } + } + } + + Ok(ret) + } + } + )* + + impl LocalAdminApiRequest { + pub fn name(&self) -> &'static str { + match self { + $( + Self::$endpoint(_) => stringify!($endpoint), + )* + } + } + } + + impl RequestHandler for LocalAdminApiRequest { + type Response = LocalAdminApiResponse; + + async fn handle(self, garage: &Arc, admin: &Admin) -> Result { + Ok(match self { + $( + LocalAdminApiRequest::$endpoint(req) => LocalAdminApiResponse::$endpoint(req.handle(garage, admin).await?), )* }) } @@ -92,3 +216,4 @@ macro_rules! admin_endpoints { } pub(crate) use admin_endpoints; +pub(crate) use local_admin_endpoints; diff --git a/src/api/admin/node.rs b/src/api/admin/node.rs new file mode 100644 index 00000000..f6f43d95 --- /dev/null +++ b/src/api/admin/node.rs @@ -0,0 +1,216 @@ +use std::collections::HashMap; +use std::fmt::Write; +use std::sync::Arc; + +use format_table::format_table_to_string; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; + +use garage_table::replication::*; +use garage_table::*; + +use garage_rpc::layout::PARTITION_BITS; + +use garage_model::garage::Garage; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalCreateMetadataSnapshotRequest { + type Response = LocalCreateMetadataSnapshotResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + garage_model::snapshot::async_snapshot_metadata(garage).await?; + Ok(LocalCreateMetadataSnapshotResponse) + } +} + +impl RequestHandler for LocalGetNodeStatisticsRequest { + type Response = LocalGetNodeStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let mut ret = String::new(); + writeln!( + &mut ret, + "Garage version: {} [features: {}]\nRust compiler version: {}", + garage_util::version::garage_version(), + garage_util::version::garage_features() + .map(|list| list.join(", ")) + .unwrap_or_else(|| "(unknown)".into()), + garage_util::version::rust_version(), + ) + .unwrap(); + + writeln!(&mut ret, "\nDatabase engine: {}", garage.db.engine()).unwrap(); + + // Gather table statistics + let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; + table.push(gather_table_stats(&garage.bucket_table)?); + table.push(gather_table_stats(&garage.key_table)?); + table.push(gather_table_stats(&garage.object_table)?); + table.push(gather_table_stats(&garage.version_table)?); + table.push(gather_table_stats(&garage.block_ref_table)?); + write!( + &mut ret, + "\nTable stats:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + // Gather block manager statistics + writeln!(&mut ret, "\nBlock manager stats:").unwrap(); + let rc_len = garage.block_manager.rc_len()?.to_string(); + + writeln!( + &mut ret, + " number of RC entries (~= number of blocks): {}", + rc_len + ) + .unwrap(); + writeln!( + &mut ret, + " resync queue length: {}", + garage.block_manager.resync.queue_len()? + ) + .unwrap(); + writeln!( + &mut ret, + " blocks with resync errors: {}", + garage.block_manager.resync.errors_len()? + ) + .unwrap(); + + Ok(LocalGetNodeStatisticsResponse { freeform: ret }) + } +} + +impl RequestHandler for GetClusterStatisticsRequest { + type Response = GetClusterStatisticsResponse; + + // FIXME: return this as a JSON struct instead of text + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let mut ret = String::new(); + + // Gather storage node and free space statistics for current nodes + let layout = &garage.system.cluster_layout(); + let mut node_partition_count = HashMap::::new(); + for short_id in layout.current().ring_assignment_data.iter() { + let id = layout.current().node_id_vec[*short_id as usize]; + *node_partition_count.entry(id).or_default() += 1; + } + let node_info = garage + .system + .get_known_nodes() + .into_iter() + .map(|n| (n.id, n)) + .collect::>(); + + let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; + for (id, parts) in node_partition_count.iter() { + let info = node_info.get(id); + let status = info.map(|x| &x.status); + let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); + let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); + let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); + let capacity = role + .map(|x| x.capacity_string()) + .unwrap_or_else(|| "?".into()); + let avail_str = |x| match x { + Some((avail, total)) => { + let pct = (avail as f64) / (total as f64) * 100.; + let avail = bytesize::ByteSize::b(avail); + let total = bytesize::ByteSize::b(total); + format!("{}/{} ({:.1}%)", avail, total, pct) + } + None => "?".into(), + }; + let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); + let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); + table.push(format!( + " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", + id, hostname, zone, capacity, parts, data_avail, meta_avail + )); + } + write!( + &mut ret, + "Storage nodes:\n{}", + format_table_to_string(table) + ) + .unwrap(); + + let meta_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.meta_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::>(); + let data_part_avail = node_partition_count + .iter() + .filter_map(|(id, parts)| { + node_info + .get(id) + .and_then(|x| x.status.data_disk_avail) + .map(|c| c.0 / *parts) + }) + .collect::>(); + if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { + let meta_avail = + bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + let data_avail = + bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); + writeln!( + &mut ret, + "\nEstimated available storage space cluster-wide (might be lower in practice):" + ) + .unwrap(); + if meta_part_avail.len() < node_partition_count.len() + || data_part_avail.len() < node_partition_count.len() + { + writeln!(&mut ret, " data: < {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); + writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); + } else { + writeln!(&mut ret, " data: {}", data_avail).unwrap(); + writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); + } + } + + Ok(GetClusterStatisticsResponse { freeform: ret }) + } +} + +fn gather_table_stats(t: &Arc>) -> Result +where + F: TableSchema + 'static, + R: TableReplication + 'static, +{ + let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); + let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); + + Ok(format!( + " {}\t{}\t{}\t{}\t{}", + F::TABLE_NAME, + data_len, + mkl_len, + t.merkle_updater.todo_len()?, + t.data.gc_todo_len()? + )) +} diff --git a/src/garage/repair/online.rs b/src/api/admin/repair.rs similarity index 69% rename from src/garage/repair/online.rs rename to src/api/admin/repair.rs index 2c5227d2..113ef636 100644 --- a/src/garage/repair/online.rs +++ b/src/api/admin/repair.rs @@ -4,6 +4,14 @@ use std::time::Duration; use async_trait::async_trait; use tokio::sync::watch; +use garage_util::background::*; +use garage_util::data::*; +use garage_util::error::{Error as GarageError, OkOrMessage}; +use garage_util::migrate::Migrate; + +use garage_table::replication::*; +use garage_table::*; + use garage_block::manager::BlockManager; use garage_block::repair::ScrubWorkerCommand; @@ -13,97 +21,90 @@ use garage_model::s3::mpu_table::*; use garage_model::s3::object_table::*; use garage_model::s3::version_table::*; -use garage_table::replication::*; -use garage_table::*; - -use garage_util::background::*; -use garage_util::data::*; -use garage_util::error::Error; -use garage_util::migrate::Migrate; - -use crate::*; +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; const RC_REPAIR_ITER_COUNT: usize = 64; -pub async fn launch_online_repair( - garage: &Arc, - bg: &BackgroundRunner, - opt: RepairOpt, -) -> Result<(), Error> { - match opt.what { - RepairWhat::Tables => { - info!("Launching a full sync of tables"); - garage.bucket_table.syncer.add_full_sync()?; - garage.object_table.syncer.add_full_sync()?; - garage.version_table.syncer.add_full_sync()?; - garage.block_ref_table.syncer.add_full_sync()?; - garage.key_table.syncer.add_full_sync()?; - } - RepairWhat::Versions => { - info!("Repairing the versions table"); - bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); - } - RepairWhat::MultipartUploads => { - info!("Repairing the multipart uploads table"); - bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); - } - RepairWhat::BlockRefs => { - info!("Repairing the block refs table"); - bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); - } - RepairWhat::BlockRc => { - info!("Repairing the block reference counters"); - bg.spawn_worker(BlockRcRepair::new( - garage.block_manager.clone(), - garage.block_ref_table.clone(), - )); - } - RepairWhat::Blocks => { - info!("Repairing the stored blocks"); - bg.spawn_worker(garage_block::repair::RepairWorker::new( - garage.block_manager.clone(), - )); - } - RepairWhat::Scrub { cmd } => { - let cmd = match cmd { - ScrubCmd::Start => ScrubWorkerCommand::Start, - ScrubCmd::Pause => ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)), - ScrubCmd::Resume => ScrubWorkerCommand::Resume, - ScrubCmd::Cancel => ScrubWorkerCommand::Cancel, - ScrubCmd::SetTranquility { tranquility } => { - garage - .block_manager - .scrub_persister - .set_with(|x| x.tranquility = tranquility)?; - return Ok(()); - } - }; - info!("Sending command to scrub worker: {:?}", cmd); - garage.block_manager.send_scrub_command(cmd).await?; - } - RepairWhat::Rebalance => { - info!("Rebalancing the stored blocks among storage locations"); - bg.spawn_worker(garage_block::repair::RebalanceWorker::new( - garage.block_manager.clone(), - )); +impl RequestHandler for LocalLaunchRepairOperationRequest { + type Response = LocalLaunchRepairOperationResponse; + + async fn handle( + self, + garage: &Arc, + admin: &Admin, + ) -> Result { + let bg = &admin.background; + match self.repair_type { + RepairType::Tables => { + info!("Launching a full sync of tables"); + garage.bucket_table.syncer.add_full_sync()?; + garage.object_table.syncer.add_full_sync()?; + garage.version_table.syncer.add_full_sync()?; + garage.block_ref_table.syncer.add_full_sync()?; + garage.key_table.syncer.add_full_sync()?; + } + RepairType::Versions => { + info!("Repairing the versions table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairVersions)); + } + RepairType::MultipartUploads => { + info!("Repairing the multipart uploads table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairMpu)); + } + RepairType::BlockRefs => { + info!("Repairing the block refs table"); + bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs)); + } + RepairType::BlockRc => { + info!("Repairing the block reference counters"); + bg.spawn_worker(BlockRcRepair::new( + garage.block_manager.clone(), + garage.block_ref_table.clone(), + )); + } + RepairType::Blocks => { + info!("Repairing the stored blocks"); + bg.spawn_worker(garage_block::repair::RepairWorker::new( + garage.block_manager.clone(), + )); + } + RepairType::Scrub(cmd) => { + let cmd = match cmd { + ScrubCommand::Start => ScrubWorkerCommand::Start, + ScrubCommand::Pause => { + ScrubWorkerCommand::Pause(Duration::from_secs(3600 * 24)) + } + ScrubCommand::Resume => ScrubWorkerCommand::Resume, + ScrubCommand::Cancel => ScrubWorkerCommand::Cancel, + }; + info!("Sending command to scrub worker: {:?}", cmd); + garage.block_manager.send_scrub_command(cmd).await?; + } + RepairType::Rebalance => { + info!("Rebalancing the stored blocks among storage locations"); + bg.spawn_worker(garage_block::repair::RebalanceWorker::new( + garage.block_manager.clone(), + )); + } } + Ok(LocalLaunchRepairOperationResponse) } - Ok(()) } // ---- -#[async_trait] trait TableRepair: Send + Sync + 'static { type T: TableSchema; fn table(garage: &Garage) -> &Table; - async fn process( + fn process( &mut self, garage: &Garage, entry: <::T as TableSchema>::E, - ) -> Result; + ) -> impl std::future::Future> + Send; } struct TableRepairWorker { @@ -139,7 +140,10 @@ impl Worker for TableRepairWorker { } } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { let (item_bytes, next_pos) = match R::table(&self.garage).data.store.get_gt(&self.pos)? { Some((k, v)) => (v, k), None => { @@ -174,7 +178,6 @@ impl Worker for TableRepairWorker { struct RepairVersions; -#[async_trait] impl TableRepair for RepairVersions { type T = VersionTable; @@ -182,7 +185,7 @@ impl TableRepair for RepairVersions { &garage.version_table } - async fn process(&mut self, garage: &Garage, version: Version) -> Result { + async fn process(&mut self, garage: &Garage, version: Version) -> Result { if !version.deleted.get() { let ref_exists = match &version.backlink { VersionBacklink::Object { bucket_id, key } => garage @@ -221,7 +224,6 @@ impl TableRepair for RepairVersions { struct RepairBlockRefs; -#[async_trait] impl TableRepair for RepairBlockRefs { type T = BlockRefTable; @@ -229,7 +231,11 @@ impl TableRepair for RepairBlockRefs { &garage.block_ref_table } - async fn process(&mut self, garage: &Garage, mut block_ref: BlockRef) -> Result { + async fn process( + &mut self, + garage: &Garage, + mut block_ref: BlockRef, + ) -> Result { if !block_ref.deleted.get() { let ref_exists = garage .version_table @@ -257,7 +263,6 @@ impl TableRepair for RepairBlockRefs { struct RepairMpu; -#[async_trait] impl TableRepair for RepairMpu { type T = MultipartUploadTable; @@ -265,7 +270,11 @@ impl TableRepair for RepairMpu { &garage.mpu_table } - async fn process(&mut self, garage: &Garage, mut mpu: MultipartUpload) -> Result { + async fn process( + &mut self, + garage: &Garage, + mut mpu: MultipartUpload, + ) -> Result { if !mpu.deleted.get() { let ref_exists = garage .object_table @@ -332,7 +341,10 @@ impl Worker for BlockRcRepair { } } - async fn work(&mut self, _must_exit: &mut watch::Receiver) -> Result { + async fn work( + &mut self, + _must_exit: &mut watch::Receiver, + ) -> Result { for _i in 0..RC_REPAIR_ITER_COUNT { let next1 = self .block_manager diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index b36bca34..4d5c015e 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -52,12 +52,28 @@ impl AdminApiRequest { POST CreateBucket (body), POST DeleteBucket (query::id), POST UpdateBucket (body_field, query::id), + POST CleanupIncompleteUploads (body), // Bucket-key permissions POST AllowBucketKey (body), POST DenyBucketKey (body), // Bucket aliases POST AddBucketAlias (body), POST RemoveBucketAlias (body), + // Node APIs + POST CreateMetadataSnapshot (default::body, query::node), + GET GetNodeStatistics (default::body, query::node), + GET GetClusterStatistics (), + POST LaunchRepairOperation (body_field, query::node), + // Worker APIs + POST ListWorkers (body_field, query::node), + POST GetWorkerInfo (body_field, query::node), + POST GetWorkerVariable (body_field, query::node), + POST SetWorkerVariable (body_field, query::node), + // Block APIs + GET ListBlockErrors (default::body, query::node), + POST GetBlockInfo (body_field, query::node), + POST RetryBlockResync (body_field, query::node), + POST PurgeBlocks (body_field, query::node), ]); if let Some(message) = query.nonempty_message() { @@ -239,6 +255,7 @@ impl AdminApiRequest { generateQueryParameters! { keywords: [], fields: [ + "node" => node, "domain" => domain, "format" => format, "id" => id, diff --git a/src/api/admin/special.rs b/src/api/admin/special.rs index 0b26fe32..0ecf82bc 100644 --- a/src/api/admin/special.rs +++ b/src/api/admin/special.rs @@ -1,27 +1,31 @@ use std::sync::Arc; -use async_trait::async_trait; - use http::header::{ ACCESS_CONTROL_ALLOW_HEADERS, ACCESS_CONTROL_ALLOW_METHODS, ACCESS_CONTROL_ALLOW_ORIGIN, ALLOW, }; use hyper::{Response, StatusCode}; +#[cfg(feature = "metrics")] +use prometheus::{Encoder, TextEncoder}; + use garage_model::garage::Garage; use garage_rpc::system::ClusterHealthStatus; use garage_api_common::helpers::*; -use crate::api::{CheckDomainRequest, HealthRequest, OptionsRequest}; +use crate::api::{CheckDomainRequest, HealthRequest, MetricsRequest, OptionsRequest}; use crate::api_server::ResBody; use crate::error::*; -use crate::EndpointHandler; +use crate::{Admin, RequestHandler}; -#[async_trait] -impl EndpointHandler for OptionsRequest { +impl RequestHandler for OptionsRequest { type Response = Response; - async fn handle(self, _garage: &Arc) -> Result, Error> { + async fn handle( + self, + _garage: &Arc, + _admin: &Admin, + ) -> Result, Error> { Ok(Response::builder() .status(StatusCode::OK) .header(ALLOW, "OPTIONS,GET,POST") @@ -32,11 +36,83 @@ impl EndpointHandler for OptionsRequest { } } -#[async_trait] -impl EndpointHandler for CheckDomainRequest { +impl RequestHandler for MetricsRequest { type Response = Response; - async fn handle(self, garage: &Arc) -> Result, Error> { + async fn handle( + self, + _garage: &Arc, + admin: &Admin, + ) -> Result, Error> { + #[cfg(feature = "metrics")] + { + use opentelemetry::trace::Tracer; + + let mut buffer = vec![]; + let encoder = TextEncoder::new(); + + let tracer = opentelemetry::global::tracer("garage"); + let metric_families = tracer.in_span("admin/gather_metrics", |_| { + admin.exporter.registry().gather() + }); + + encoder + .encode(&metric_families, &mut buffer) + .ok_or_internal_error("Could not serialize metrics")?; + + Ok(Response::builder() + .status(StatusCode::OK) + .header(http::header::CONTENT_TYPE, encoder.format_type()) + .body(bytes_body(buffer.into()))?) + } + #[cfg(not(feature = "metrics"))] + Err(Error::bad_request( + "Garage was built without the metrics feature".to_string(), + )) + } +} + +impl RequestHandler for HealthRequest { + type Response = Response; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result, Error> { + let health = garage.system.health(); + + let (status, status_str) = match health.status { + ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), + ClusterHealthStatus::Degraded => ( + StatusCode::OK, + "Garage is operational but some storage nodes are unavailable", + ), + ClusterHealthStatus::Unavailable => ( + StatusCode::SERVICE_UNAVAILABLE, + "Quorum is not available for some/all partitions, reads and writes will fail", + ), + }; + let status_str = format!( + "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n", + status_str + ); + + Ok(Response::builder() + .status(status) + .header(http::header::CONTENT_TYPE, "text/plain") + .body(string_body(status_str))?) + } +} + +impl RequestHandler for CheckDomainRequest { + type Response = Response; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result, Error> { if check_domain(garage, &self.domain).await? { Ok(Response::builder() .status(StatusCode::OK) @@ -101,33 +177,3 @@ async fn check_domain(garage: &Arc, domain: &str) -> Result None => Ok(false), } } - -#[async_trait] -impl EndpointHandler for HealthRequest { - type Response = Response; - - async fn handle(self, garage: &Arc) -> Result, Error> { - let health = garage.system.health(); - - let (status, status_str) = match health.status { - ClusterHealthStatus::Healthy => (StatusCode::OK, "Garage is fully operational"), - ClusterHealthStatus::Degraded => ( - StatusCode::OK, - "Garage is operational but some storage nodes are unavailable", - ), - ClusterHealthStatus::Unavailable => ( - StatusCode::SERVICE_UNAVAILABLE, - "Quorum is not available for some/all partitions, reads and writes will fail", - ), - }; - let status_str = format!( - "{}\nConsult the full health check API endpoint at /v2/GetClusterHealth for more details\n", - status_str - ); - - Ok(Response::builder() - .status(status) - .header(http::header::CONTENT_TYPE, "text/plain") - .body(string_body(status_str))?) - } -} diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs new file mode 100644 index 00000000..b3f4537b --- /dev/null +++ b/src/api/admin/worker.rs @@ -0,0 +1,118 @@ +use std::collections::HashMap; +use std::sync::Arc; + +use garage_util::background::*; +use garage_util::time::now_msec; + +use garage_model::garage::Garage; + +use crate::api::*; +use crate::error::Error; +use crate::{Admin, RequestHandler}; + +impl RequestHandler for LocalListWorkersRequest { + type Response = LocalListWorkersResponse; + + async fn handle( + self, + _garage: &Arc, + admin: &Admin, + ) -> Result { + let workers = admin.background.get_worker_info(); + let info = workers + .into_iter() + .filter(|(_, w)| { + (!self.busy_only + || matches!(w.state, WorkerState::Busy | WorkerState::Throttled(_))) + && (!self.error_only || w.errors > 0) + }) + .map(|(id, w)| worker_info_to_api(id as u64, w)) + .collect::>(); + Ok(LocalListWorkersResponse(info)) + } +} + +impl RequestHandler for LocalGetWorkerInfoRequest { + type Response = LocalGetWorkerInfoResponse; + + async fn handle( + self, + _garage: &Arc, + admin: &Admin, + ) -> Result { + let info = admin + .background + .get_worker_info() + .get(&(self.id as usize)) + .ok_or(Error::NoSuchWorker(self.id))? + .clone(); + Ok(LocalGetWorkerInfoResponse(worker_info_to_api( + self.id, info, + ))) + } +} + +impl RequestHandler for LocalGetWorkerVariableRequest { + type Response = LocalGetWorkerVariableResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let mut res = HashMap::new(); + if let Some(k) = self.variable { + res.insert(k.clone(), garage.bg_vars.get(&k)?); + } else { + let vars = garage.bg_vars.get_all(); + for (k, v) in vars.iter() { + res.insert(k.to_string(), v.to_string()); + } + } + Ok(LocalGetWorkerVariableResponse(res)) + } +} + +impl RequestHandler for LocalSetWorkerVariableRequest { + type Response = LocalSetWorkerVariableResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + garage.bg_vars.set(&self.variable, &self.value)?; + + Ok(LocalSetWorkerVariableResponse { + variable: self.variable, + value: self.value, + }) + } +} + +// ---- helper functions ---- + +fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { + WorkerInfoResp { + id, + name: info.name, + state: match info.state { + WorkerState::Busy => WorkerStateResp::Busy, + WorkerState::Throttled(t) => WorkerStateResp::Throttled { duration_secs: t }, + WorkerState::Idle => WorkerStateResp::Idle, + WorkerState::Done => WorkerStateResp::Done, + }, + errors: info.errors as u64, + consecutive_errors: info.consecutive_errors as u64, + last_error: info.last_error.map(|(message, t)| WorkerLastError { + message, + secs_ago: now_msec().saturating_sub(t) / 1000, + }), + + tranquility: info.status.tranquility, + progress: info.status.progress, + queue_length: info.status.queue_length, + persistent_errors: info.status.persistent_errors, + freeform: info.status.freeform, + } +} diff --git a/src/api/common/router_macros.rs b/src/api/common/router_macros.rs index 299420f7..f4a93c67 100644 --- a/src/api/common/router_macros.rs +++ b/src/api/common/router_macros.rs @@ -141,6 +141,9 @@ macro_rules! router_match { } }}; + (@@parse_param $query:expr, default, $param:ident) => {{ + Default::default() + }}; (@@parse_param $query:expr, query_opt, $param:ident) => {{ // extract optional query parameter $query.$param.take().map(|param| param.into_owned()) diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 4f823fc6..c566c3e0 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -49,8 +49,6 @@ sodiumoxide.workspace = true structopt.workspace = true git-version.workspace = true -serde.workspace = true - futures.workspace = true tokio.workspace = true diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs deleted file mode 100644 index edeb88c0..00000000 --- a/src/garage/admin/block.rs +++ /dev/null @@ -1,235 +0,0 @@ -use garage_util::data::*; - -use garage_table::*; - -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::s3::object_table::*; -use garage_model::s3::version_table::*; - -use crate::cli::*; - -use super::*; - -impl AdminRpcHandler { - pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result { - match cmd { - BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList( - self.garage.block_manager.list_resync_errors()?, - )), - BlockOperation::Info { hash } => self.handle_block_info(hash).await, - BlockOperation::RetryNow { all, blocks } => { - self.handle_block_retry_now(*all, blocks).await - } - BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await, - } - } - - async fn handle_block_info(&self, hash: &String) -> Result { - let hash = self.find_block_hash_by_prefix(hash)?; - let refcount = self.garage.block_manager.get_block_rc(&hash)?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - let mut versions = vec![]; - let mut uploads = vec![]; - for br in block_refs { - if let Some(v) = self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? - { - if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { - if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { - uploads.push(u); - } - } - versions.push(Ok(v)); - } else { - versions.push(Err(br.version)); - } - } - Ok(AdminRpc::BlockInfo { - hash, - refcount, - versions, - uploads, - }) - } - - async fn handle_block_retry_now( - &self, - all: bool, - blocks: &[String], - ) -> Result { - if all { - if !blocks.is_empty() { - return Err(Error::BadRequest( - "--all was specified, cannot also specify blocks".into(), - )); - } - let blocks = self.garage.block_manager.list_resync_errors()?; - for b in blocks.iter() { - self.garage.block_manager.resync.clear_backoff(&b.hash)?; - } - Ok(AdminRpc::Ok(format!( - "{} blocks returned in queue for a retry now (check logs to see results)", - blocks.len() - ))) - } else { - for hash in blocks { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - self.garage.block_manager.resync.clear_backoff(&hash)?; - } - Ok(AdminRpc::Ok(format!( - "{} blocks returned in queue for a retry now (check logs to see results)", - blocks.len() - ))) - } - } - - async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result { - if !yes { - return Err(Error::BadRequest( - "Pass the --yes flag to confirm block purge operation.".into(), - )); - } - - let mut obj_dels = 0; - let mut mpu_dels = 0; - let mut ver_dels = 0; - - for hash in blocks { - let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?; - let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - - for br in block_refs { - if let Some(version) = self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? - { - self.handle_block_purge_version_backlink( - &version, - &mut obj_dels, - &mut mpu_dels, - ) - .await?; - - if !version.deleted.get() { - let deleted_version = Version::new(version.uuid, version.backlink, true); - self.garage.version_table.insert(&deleted_version).await?; - ver_dels += 1; - } - } - } - } - - Ok(AdminRpc::Ok(format!( - "Purged {} blocks, {} versions, {} objects, {} multipart uploads", - blocks.len(), - ver_dels, - obj_dels, - mpu_dels, - ))) - } - - async fn handle_block_purge_version_backlink( - &self, - version: &Version, - obj_dels: &mut usize, - mpu_dels: &mut usize, - ) -> Result<(), Error> { - let (bucket_id, key, ov_id) = match &version.backlink { - VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid), - VersionBacklink::MultipartUpload { upload_id } => { - if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { - if !mpu.deleted.get() { - mpu.parts.clear(); - mpu.deleted.set(); - self.garage.mpu_table.insert(&mpu).await?; - *mpu_dels += 1; - } - (mpu.bucket_id, mpu.key.clone(), *upload_id) - } else { - return Ok(()); - } - } - }; - - if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? { - let ov = object.versions().iter().rev().find(|v| v.is_complete()); - if let Some(ov) = ov { - if ov.uuid == ov_id { - let del_uuid = gen_uuid(); - let deleted_object = Object::new( - bucket_id, - key, - vec![ObjectVersion { - uuid: del_uuid, - timestamp: ov.timestamp + 1, - state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker), - }], - ); - self.garage.object_table.insert(&deleted_object).await?; - *obj_dels += 1; - } - } - } - - Ok(()) - } - - // ---- helper function ---- - fn find_block_hash_by_prefix(&self, prefix: &str) -> Result { - if prefix.len() < 4 { - return Err(Error::BadRequest( - "Please specify at least 4 characters of the block hash".into(), - )); - } - - let prefix_bin = - hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; - - let iter = self - .garage - .block_ref_table - .data - .store - .range(&prefix_bin[..]..) - .map_err(GarageError::from)?; - let mut found = None; - for item in iter { - let (k, _v) = item.map_err(GarageError::from)?; - let hash = Hash::try_from(&k[..32]).unwrap(); - if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { - break; - } - if hex::encode(hash.as_slice()).starts_with(prefix) { - match &found { - Some(x) if *x == hash => (), - Some(_) => { - return Err(Error::BadRequest(format!( - "Several blocks match prefix `{}`", - prefix - ))); - } - None => { - found = Some(hash); - } - } - } - } - - found.ok_or_else(|| Error::BadRequest("No matching block found".into())) - } -} diff --git a/src/garage/admin/bucket.rs b/src/garage/admin/bucket.rs deleted file mode 100644 index 26d54084..00000000 --- a/src/garage/admin/bucket.rs +++ /dev/null @@ -1,53 +0,0 @@ -use std::fmt::Write; - -use garage_model::helper::error::{Error, OkOrBadRequest}; - -use crate::cli::*; - -use super::*; - -impl AdminRpcHandler { - pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result { - match cmd { - BucketOperation::CleanupIncompleteUploads(query) => { - self.handle_bucket_cleanup_incomplete_uploads(query).await - } - _ => unreachable!(), - } - } - - async fn handle_bucket_cleanup_incomplete_uploads( - &self, - query: &CleanupIncompleteUploadsOpt, - ) -> Result { - let mut bucket_ids = vec![]; - for b in query.buckets.iter() { - bucket_ids.push( - self.garage - .bucket_helper() - .admin_get_existing_matching_bucket(b) - .await?, - ); - } - - let duration = parse_duration::parse::parse(&query.older_than) - .ok_or_bad_request("Invalid duration passed for --older-than parameter")?; - - let mut ret = String::new(); - for bucket in bucket_ids { - let count = self - .garage - .bucket_helper() - .cleanup_incomplete_uploads(&bucket, duration) - .await?; - writeln!( - &mut ret, - "Bucket {:?}: {} incomplete uploads aborted", - bucket, count - ) - .unwrap(); - } - - Ok(AdminRpc::Ok(ret)) - } -} diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs deleted file mode 100644 index 70f8ec67..00000000 --- a/src/garage/admin/mod.rs +++ /dev/null @@ -1,545 +0,0 @@ -mod block; -mod bucket; - -use std::collections::HashMap; -use std::fmt::Write; -use std::sync::Arc; - -use async_trait::async_trait; -use serde::{Deserialize, Serialize}; - -use format_table::format_table_to_string; - -use garage_util::background::BackgroundRunner; -use garage_util::data::*; -use garage_util::error::Error as GarageError; - -use garage_table::replication::*; -use garage_table::*; - -use garage_rpc::layout::PARTITION_BITS; -use garage_rpc::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::garage::Garage; -use garage_model::helper::error::{Error, OkOrBadRequest}; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::Version; - -use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse}; -use garage_api_admin::EndpointHandler as AdminApiEndpoint; -use garage_api_common::generic_server::ApiError; - -use crate::cli::*; -use crate::repair::online::launch_online_repair; - -pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc"; - -#[derive(Debug, Serialize, Deserialize)] -#[allow(clippy::large_enum_variant)] -pub enum AdminRpc { - BucketOperation(BucketOperation), - LaunchRepair(RepairOpt), - Stats(StatsOpt), - Worker(WorkerOperation), - BlockOperation(BlockOperation), - MetaOperation(MetaOperation), - - // Replies - Ok(String), - WorkerList( - HashMap, - WorkerListOpt, - ), - WorkerVars(Vec<(Uuid, String, String)>), - WorkerInfo(usize, garage_util::background::WorkerInfo), - BlockErrorList(Vec), - BlockInfo { - hash: Hash, - refcount: u64, - versions: Vec>, - uploads: Vec, - }, - - // Proxying HTTP Admin API endpoints - ApiRequest(AdminApiRequest), - ApiOkResponse(TaggedAdminApiResponse), - ApiErrorResponse { - http_code: u16, - error_code: String, - message: String, - }, -} - -impl Rpc for AdminRpc { - type Response = Result; -} - -pub struct AdminRpcHandler { - garage: Arc, - background: Arc, - endpoint: Arc>, -} - -impl AdminRpcHandler { - pub fn new(garage: Arc, background: Arc) -> Arc { - let endpoint = garage.system.netapp.endpoint(ADMIN_RPC_PATH.into()); - let admin = Arc::new(Self { - garage, - background, - endpoint, - }); - admin.endpoint.set_handler(admin.clone()); - admin - } - - // ================ REPAIR COMMANDS ==================== - - async fn handle_launch_repair(self: &Arc, opt: RepairOpt) -> Result { - if !opt.yes { - return Err(Error::BadRequest( - "Please provide the --yes flag to initiate repair operations.".to_string(), - )); - } - if opt.all_nodes { - let mut opt_to_send = opt.clone(); - opt_to_send.all_nodes = false; - - let mut failures = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - let resp = self - .endpoint - .call( - &node, - AdminRpc::LaunchRepair(opt_to_send.clone()), - PRIO_NORMAL, - ) - .await; - if !matches!(resp, Ok(Ok(_))) { - failures.push(node); - } - } - if failures.is_empty() { - Ok(AdminRpc::Ok("Repair launched on all nodes".to_string())) - } else { - Err(Error::BadRequest(format!( - "Could not launch repair on nodes: {:?} (launched successfully on other nodes)", - failures - ))) - } - } else { - launch_online_repair(&self.garage, &self.background, opt).await?; - Ok(AdminRpc::Ok(format!( - "Repair launched on {:?}", - self.garage.system.id - ))) - } - } - - // ================ STATS COMMANDS ==================== - - async fn handle_stats(&self, opt: StatsOpt) -> Result { - if opt.all_nodes { - let mut ret = String::new(); - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - - for node in all_nodes.iter() { - let mut opt = opt.clone(); - opt.all_nodes = false; - opt.skip_global = true; - - writeln!(&mut ret, "\n======================").unwrap(); - writeln!(&mut ret, "Stats for node {:?}:", node).unwrap(); - - let node_id = (*node).into(); - match self - .endpoint - .call(&node_id, AdminRpc::Stats(opt), PRIO_NORMAL) - .await - { - Ok(Ok(AdminRpc::Ok(s))) => writeln!(&mut ret, "{}", s).unwrap(), - Ok(Ok(x)) => writeln!(&mut ret, "Bad answer: {:?}", x).unwrap(), - Ok(Err(e)) => writeln!(&mut ret, "Remote error: {}", e).unwrap(), - Err(e) => writeln!(&mut ret, "Network error: {}", e).unwrap(), - } - } - - writeln!(&mut ret, "\n======================").unwrap(); - write!( - &mut ret, - "Cluster statistics:\n\n{}", - self.gather_cluster_stats() - ) - .unwrap(); - - Ok(AdminRpc::Ok(ret)) - } else { - Ok(AdminRpc::Ok(self.gather_stats_local(opt)?)) - } - } - - fn gather_stats_local(&self, opt: StatsOpt) -> Result { - let mut ret = String::new(); - writeln!( - &mut ret, - "\nGarage version: {} [features: {}]\nRust compiler version: {}", - garage_util::version::garage_version(), - garage_util::version::garage_features() - .map(|list| list.join(", ")) - .unwrap_or_else(|| "(unknown)".into()), - garage_util::version::rust_version(), - ) - .unwrap(); - - writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); - - // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; - table.push(self.gather_table_stats(&self.garage.bucket_table)?); - table.push(self.gather_table_stats(&self.garage.key_table)?); - table.push(self.gather_table_stats(&self.garage.object_table)?); - table.push(self.gather_table_stats(&self.garage.version_table)?); - table.push(self.gather_table_stats(&self.garage.block_ref_table)?); - write!( - &mut ret, - "\nTable stats:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - // Gather block manager statistics - writeln!(&mut ret, "\nBlock manager stats:").unwrap(); - let rc_len = self.garage.block_manager.rc_len()?.to_string(); - - writeln!( - &mut ret, - " number of RC entries (~= number of blocks): {}", - rc_len - ) - .unwrap(); - writeln!( - &mut ret, - " resync queue length: {}", - self.garage.block_manager.resync.queue_len()? - ) - .unwrap(); - writeln!( - &mut ret, - " blocks with resync errors: {}", - self.garage.block_manager.resync.errors_len()? - ) - .unwrap(); - - if !opt.skip_global { - write!(&mut ret, "\n{}", self.gather_cluster_stats()).unwrap(); - } - - Ok(ret) - } - - fn gather_cluster_stats(&self) -> String { - let mut ret = String::new(); - - // Gather storage node and free space statistics for current nodes - let layout = &self.garage.system.cluster_layout(); - let mut node_partition_count = HashMap::::new(); - for short_id in layout.current().ring_assignment_data.iter() { - let id = layout.current().node_id_vec[*short_id as usize]; - *node_partition_count.entry(id).or_default() += 1; - } - let node_info = self - .garage - .system - .get_known_nodes() - .into_iter() - .map(|n| (n.id, n)) - .collect::>(); - - let mut table = vec![" ID\tHostname\tZone\tCapacity\tPart.\tDataAvail\tMetaAvail".into()]; - for (id, parts) in node_partition_count.iter() { - let info = node_info.get(id); - let status = info.map(|x| &x.status); - let role = layout.current().roles.get(id).and_then(|x| x.0.as_ref()); - let hostname = status.and_then(|x| x.hostname.as_deref()).unwrap_or("?"); - let zone = role.map(|x| x.zone.as_str()).unwrap_or("?"); - let capacity = role - .map(|x| x.capacity_string()) - .unwrap_or_else(|| "?".into()); - let avail_str = |x| match x { - Some((avail, total)) => { - let pct = (avail as f64) / (total as f64) * 100.; - let avail = bytesize::ByteSize::b(avail); - let total = bytesize::ByteSize::b(total); - format!("{}/{} ({:.1}%)", avail, total, pct) - } - None => "?".into(), - }; - let data_avail = avail_str(status.and_then(|x| x.data_disk_avail)); - let meta_avail = avail_str(status.and_then(|x| x.meta_disk_avail)); - table.push(format!( - " {:?}\t{}\t{}\t{}\t{}\t{}\t{}", - id, hostname, zone, capacity, parts, data_avail, meta_avail - )); - } - write!( - &mut ret, - "Storage nodes:\n{}", - format_table_to_string(table) - ) - .unwrap(); - - let meta_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.meta_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - let data_part_avail = node_partition_count - .iter() - .filter_map(|(id, parts)| { - node_info - .get(id) - .and_then(|x| x.status.data_disk_avail) - .map(|c| c.0 / *parts) - }) - .collect::>(); - if !meta_part_avail.is_empty() && !data_part_avail.is_empty() { - let meta_avail = - bytesize::ByteSize(meta_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - let data_avail = - bytesize::ByteSize(data_part_avail.iter().min().unwrap() * (1 << PARTITION_BITS)); - writeln!( - &mut ret, - "\nEstimated available storage space cluster-wide (might be lower in practice):" - ) - .unwrap(); - if meta_part_avail.len() < node_partition_count.len() - || data_part_avail.len() < node_partition_count.len() - { - writeln!(&mut ret, " data: < {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: < {}", meta_avail).unwrap(); - writeln!(&mut ret, "A precise estimate could not be given as information is missing for some storage nodes.").unwrap(); - } else { - writeln!(&mut ret, " data: {}", data_avail).unwrap(); - writeln!(&mut ret, " metadata: {}", meta_avail).unwrap(); - } - } - - ret - } - - fn gather_table_stats(&self, t: &Arc>) -> Result - where - F: TableSchema + 'static, - R: TableReplication + 'static, - { - let data_len = t.data.store.len().map_err(GarageError::from)?.to_string(); - let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); - - Ok(format!( - " {}\t{}\t{}\t{}\t{}", - F::TABLE_NAME, - data_len, - mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? - )) - } - - // ================ WORKER COMMANDS ==================== - - async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result { - match cmd { - WorkerOperation::List { opt } => { - let workers = self.background.get_worker_info(); - Ok(AdminRpc::WorkerList(workers, *opt)) - } - WorkerOperation::Info { tid } => { - let info = self - .background - .get_worker_info() - .get(tid) - .ok_or_bad_request(format!("No worker with TID {}", tid))? - .clone(); - Ok(AdminRpc::WorkerInfo(*tid, info)) - } - WorkerOperation::Get { - all_nodes, - variable, - } => self.handle_get_var(*all_nodes, variable).await, - WorkerOperation::Set { - all_nodes, - variable, - value, - } => self.handle_set_var(*all_nodes, variable, value).await, - } - } - - async fn handle_get_var( - &self, - all_nodes: bool, - variable: &Option, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Get { - all_nodes: false, - variable: variable.clone(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - #[allow(clippy::collapsible_else_if)] - if let Some(v) = variable { - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - v.clone(), - self.garage.bg_vars.get(v)?, - )])) - } else { - let mut vars = self.garage.bg_vars.get_all(); - vars.sort(); - Ok(AdminRpc::WorkerVars( - vars.into_iter() - .map(|(k, v)| (self.garage.system.id, k.to_string(), v)) - .collect(), - )) - } - } - } - - async fn handle_set_var( - &self, - all_nodes: bool, - variable: &str, - value: &str, - ) -> Result { - if all_nodes { - let mut ret = vec![]; - let all_nodes = self.garage.system.cluster_layout().all_nodes().to_vec(); - for node in all_nodes.iter() { - let node = (*node).into(); - match self - .endpoint - .call( - &node, - AdminRpc::Worker(WorkerOperation::Set { - all_nodes: false, - variable: variable.to_string(), - value: value.to_string(), - }), - PRIO_NORMAL, - ) - .await?? - { - AdminRpc::WorkerVars(v) => ret.extend(v), - m => return Err(GarageError::unexpected_rpc_message(m).into()), - } - } - Ok(AdminRpc::WorkerVars(ret)) - } else { - self.garage.bg_vars.set(variable, value)?; - Ok(AdminRpc::WorkerVars(vec![( - self.garage.system.id, - variable.to_string(), - value.to_string(), - )])) - } - } - - // ================ META DB COMMANDS ==================== - - async fn handle_meta_cmd(self: &Arc, mo: &MetaOperation) -> Result { - match mo { - MetaOperation::Snapshot { all: true } => { - let to = self.garage.system.cluster_layout().all_nodes().to_vec(); - - let resps = futures::future::join_all(to.iter().map(|to| async move { - let to = (*to).into(); - self.endpoint - .call( - &to, - AdminRpc::MetaOperation(MetaOperation::Snapshot { all: false }), - PRIO_NORMAL, - ) - .await - })) - .await; - - let mut ret = vec![]; - for (to, resp) in to.iter().zip(resps.iter()) { - let res_str = match resp { - Ok(_) => "ok".to_string(), - Err(e) => format!("error: {}", e), - }; - ret.push(format!("{:?}\t{}", to, res_str)); - } - - Ok(AdminRpc::Ok(format_table_to_string(ret))) - } - MetaOperation::Snapshot { all: false } => { - garage_model::snapshot::async_snapshot_metadata(&self.garage).await?; - Ok(AdminRpc::Ok("Snapshot has been saved.".into())) - } - } - } - - // ================== PROXYING ADMIN API REQUESTS =================== - - async fn handle_api_request( - self: &Arc, - req: &AdminApiRequest, - ) -> Result { - let req = req.clone(); - info!("Proxied admin API request: {}", req.name()); - let res = req.handle(&self.garage).await; - match res { - Ok(res) => Ok(AdminRpc::ApiOkResponse(res.tagged())), - Err(e) => Ok(AdminRpc::ApiErrorResponse { - http_code: e.http_status_code().as_u16(), - error_code: e.code().to_string(), - message: e.to_string(), - }), - } - } -} - -#[async_trait] -impl EndpointHandler for AdminRpcHandler { - async fn handle( - self: &Arc, - message: &AdminRpc, - _from: NodeID, - ) -> Result { - match message { - AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await, - AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await, - AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await, - AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await, - AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await, - AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await, - AdminRpc::ApiRequest(r) => self.handle_api_request(r).await, - m => Err(GarageError::unexpected_rpc_message(m).into()), - } - } -} diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs deleted file mode 100644 index a6540c65..00000000 --- a/src/garage/cli/cmd.rs +++ /dev/null @@ -1,60 +0,0 @@ -use garage_util::error::*; - -use garage_rpc::system::*; -use garage_rpc::*; - -use garage_model::helper::error::Error as HelperError; - -use crate::admin::*; -use crate::cli::*; - -pub async fn cmd_admin( - rpc_cli: &Endpoint, - rpc_host: NodeID, - args: AdminRpc, -) -> Result<(), HelperError> { - match rpc_cli.call(&rpc_host, args, PRIO_NORMAL).await?? { - AdminRpc::Ok(msg) => { - println!("{}", msg); - } - AdminRpc::WorkerList(wi, wlo) => { - print_worker_list(wi, wlo); - } - AdminRpc::WorkerVars(wv) => { - print_worker_vars(wv); - } - AdminRpc::WorkerInfo(tid, wi) => { - print_worker_info(tid, wi); - } - AdminRpc::BlockErrorList(el) => { - print_block_error_list(el); - } - AdminRpc::BlockInfo { - hash, - refcount, - versions, - uploads, - } => { - print_block_info(hash, refcount, versions, uploads); - } - r => { - error!("Unexpected response: {:?}", r); - } - } - Ok(()) -} - -// ---- utility ---- - -pub async fn fetch_status( - rpc_cli: &Endpoint, - rpc_host: NodeID, -) -> Result, Error> { - match rpc_cli - .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) - .await?? - { - SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), - resp => Err(Error::unexpected_rpc_message(resp)), - } -} diff --git a/src/garage/cli/layout.rs b/src/garage/cli/layout.rs index bb81d144..bb77cc2a 100644 --- a/src/garage/cli/layout.rs +++ b/src/garage/cli/layout.rs @@ -7,7 +7,7 @@ use garage_rpc::layout::*; use garage_rpc::system::*; use garage_rpc::*; -use crate::cli::*; +use crate::cli::structs::*; pub async fn cmd_show_layout( rpc_cli: &Endpoint, @@ -260,6 +260,19 @@ pub async fn cmd_layout_skip_dead_nodes( // --- utility --- +pub async fn fetch_status( + rpc_cli: &Endpoint, + rpc_host: NodeID, +) -> Result, Error> { + match rpc_cli + .call(&rpc_host, SystemRpc::GetKnownNodes, PRIO_NORMAL) + .await?? + { + SystemRpc::ReturnKnownNodes(nodes) => Ok(nodes), + resp => Err(Error::unexpected_rpc_message(resp)), + } +} + pub async fn fetch_layout( rpc_cli: &Endpoint, rpc_host: NodeID, diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index 30f566e2..e007808b 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -1,12 +1,7 @@ -pub(crate) mod cmd; -pub(crate) mod init; -pub(crate) mod layout; pub(crate) mod structs; -pub(crate) mod util; pub(crate) mod convert_db; +pub(crate) mod init; +pub(crate) mod repair; -pub(crate) use cmd::*; -pub(crate) use init::*; -pub(crate) use structs::*; -pub(crate) use util::*; +pub(crate) mod layout; diff --git a/src/garage/repair/offline.rs b/src/garage/cli/repair.rs similarity index 100% rename from src/garage/repair/offline.rs rename to src/garage/cli/repair.rs diff --git a/src/garage/cli/structs.rs b/src/garage/cli/structs.rs index 4ec35e68..c6471515 100644 --- a/src/garage/cli/structs.rs +++ b/src/garage/cli/structs.rs @@ -1,4 +1,3 @@ -use serde::{Deserialize, Serialize}; use structopt::StructOpt; use garage_util::version::garage_version; @@ -190,7 +189,7 @@ pub struct SkipDeadNodesOpt { pub(crate) allow_missing_data: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub enum BucketOperation { /// List buckets #[structopt(name = "list", version = garage_version())] @@ -237,7 +236,7 @@ pub enum BucketOperation { CleanupIncompleteUploads(CleanupIncompleteUploadsOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct WebsiteOpt { /// Create #[structopt(long = "allow")] @@ -259,13 +258,13 @@ pub struct WebsiteOpt { pub error_document: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct BucketOpt { /// Bucket name pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct DeleteBucketOpt { /// Bucket name pub name: String, @@ -275,7 +274,7 @@ pub struct DeleteBucketOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct AliasBucketOpt { /// Existing bucket name (its alias in global namespace or its full hex uuid) pub existing_bucket: String, @@ -288,7 +287,7 @@ pub struct AliasBucketOpt { pub local: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct UnaliasBucketOpt { /// Bucket name pub name: String, @@ -298,7 +297,7 @@ pub struct UnaliasBucketOpt { pub local: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct PermBucketOpt { /// Access key name or ID #[structopt(long = "key")] @@ -321,7 +320,7 @@ pub struct PermBucketOpt { pub bucket: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct SetQuotasOpt { /// Bucket name pub bucket: String, @@ -336,7 +335,7 @@ pub struct SetQuotasOpt { pub max_objects: Option, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct CleanupIncompleteUploadsOpt { /// Abort multipart uploads older than this value #[structopt(long = "older-than", default_value = "1d")] @@ -347,7 +346,7 @@ pub struct CleanupIncompleteUploadsOpt { pub buckets: Vec, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub enum KeyOperation { /// List keys #[structopt(name = "list", version = garage_version())] @@ -382,7 +381,7 @@ pub enum KeyOperation { Import(KeyImportOpt), } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyInfoOpt { /// ID or name of the key pub key_pattern: String, @@ -391,14 +390,14 @@ pub struct KeyInfoOpt { pub show_secret: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyNewOpt { /// Name of the key #[structopt(default_value = "Unnamed key")] pub name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyRenameOpt { /// ID or name of the key pub key_pattern: String, @@ -407,7 +406,7 @@ pub struct KeyRenameOpt { pub new_name: String, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyDeleteOpt { /// ID or name of the key pub key_pattern: String, @@ -417,7 +416,7 @@ pub struct KeyDeleteOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyPermOpt { /// ID or name of the key pub key_pattern: String, @@ -427,7 +426,7 @@ pub struct KeyPermOpt { pub create_bucket: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug)] +#[derive(StructOpt, Debug)] pub struct KeyImportOpt { /// Access key ID pub key_id: String, @@ -444,7 +443,7 @@ pub struct KeyImportOpt { pub yes: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct RepairOpt { /// Launch repair operation on all nodes #[structopt(short = "a", long = "all-nodes")] @@ -458,7 +457,7 @@ pub struct RepairOpt { pub what: RepairWhat, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum RepairWhat { /// Do a full sync of metadata tables #[structopt(name = "tables", version = garage_version())] @@ -489,7 +488,7 @@ pub enum RepairWhat { Rebalance, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum ScrubCmd { /// Start scrub #[structopt(name = "start", version = garage_version())] @@ -503,15 +502,9 @@ pub enum ScrubCmd { /// Cancel scrub in progress #[structopt(name = "cancel", version = garage_version())] Cancel, - /// Set tranquility level for in-progress and future scrubs - #[structopt(name = "set-tranquility", version = garage_version())] - SetTranquility { - #[structopt()] - tranquility: u32, - }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct OfflineRepairOpt { /// Confirm the launch of the repair operation #[structopt(long = "yes")] @@ -521,7 +514,7 @@ pub struct OfflineRepairOpt { pub what: OfflineRepairWhat, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum OfflineRepairWhat { /// Repair K2V item counters #[cfg(feature = "k2v")] @@ -532,19 +525,14 @@ pub enum OfflineRepairWhat { ObjectCounters, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)] +#[derive(StructOpt, Debug, Clone)] pub struct StatsOpt { /// Gather statistics from all nodes #[structopt(short = "a", long = "all-nodes")] pub all_nodes: bool, - - /// Don't show global cluster stats (internal use in RPC) - #[structopt(skip)] - #[serde(default)] - pub skip_global: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum WorkerOperation { /// List all workers on Garage node #[structopt(name = "list", version = garage_version())] @@ -577,7 +565,7 @@ pub enum WorkerOperation { }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)] pub struct WorkerListOpt { /// Show only busy workers #[structopt(short = "b", long = "busy")] @@ -587,7 +575,7 @@ pub struct WorkerListOpt { pub errors: bool, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone)] pub enum BlockOperation { /// List all blocks that currently have a resync error #[structopt(name = "list-errors", version = garage_version())] @@ -619,7 +607,7 @@ pub enum BlockOperation { }, } -#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone, Copy)] +#[derive(StructOpt, Debug, Eq, PartialEq, Clone, Copy)] pub enum MetaOperation { /// Save a snapshot of the metadata db file #[structopt(name = "snapshot", version = garage_version())] diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs deleted file mode 100644 index a3a1480e..00000000 --- a/src/garage/cli/util.rs +++ /dev/null @@ -1,216 +0,0 @@ -use std::collections::HashMap; -use std::time::Duration; - -use format_table::format_table; -use garage_util::background::*; -use garage_util::data::*; -use garage_util::time::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::*; - -use crate::cli::structs::WorkerListOpt; - -pub fn print_worker_list(wi: HashMap, wlo: WorkerListOpt) { - let mut wi = wi.into_iter().collect::>(); - wi.sort_by_key(|(tid, info)| { - ( - match info.state { - WorkerState::Busy | WorkerState::Throttled(_) => 0, - WorkerState::Idle => 1, - WorkerState::Done => 2, - }, - *tid, - ) - }); - - let mut table = vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; - for (tid, info) in wi.iter() { - if wlo.busy && !matches!(info.state, WorkerState::Busy | WorkerState::Throttled(_)) { - continue; - } - if wlo.errors && info.errors == 0 { - continue; - } - - let tf = timeago::Formatter::new(); - let err_ago = info - .last_error - .as_ref() - .map(|(_, t)| tf.convert(Duration::from_millis(now_msec() - t))) - .unwrap_or_default(); - let (total_err, consec_err) = if info.errors > 0 { - (info.errors.to_string(), info.consecutive_errors.to_string()) - } else { - ("-".into(), "-".into()) - }; - - table.push(format!( - "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", - tid, - info.state, - info.name, - info.status - .tranquility - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "-".into()), - info.status.progress.as_deref().unwrap_or("-"), - info.status - .queue_length - .as_ref() - .map(ToString::to_string) - .unwrap_or_else(|| "-".into()), - total_err, - consec_err, - err_ago, - )); - } - format_table(table); -} - -pub fn print_worker_info(tid: usize, info: WorkerInfo) { - let mut table = vec![]; - table.push(format!("Task id:\t{}", tid)); - table.push(format!("Worker name:\t{}", info.name)); - match info.state { - WorkerState::Throttled(t) => { - table.push(format!( - "Worker state:\tBusy (throttled, paused for {:.3}s)", - t - )); - } - s => { - table.push(format!("Worker state:\t{}", s)); - } - }; - if let Some(tql) = info.status.tranquility { - table.push(format!("Tranquility:\t{}", tql)); - } - - table.push("".into()); - table.push(format!("Total errors:\t{}", info.errors)); - table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); - if let Some((s, t)) = info.last_error { - table.push(format!("Last error:\t{}", s)); - let tf = timeago::Formatter::new(); - table.push(format!( - "Last error time:\t{}", - tf.convert(Duration::from_millis(now_msec() - t)) - )); - } - - table.push("".into()); - if let Some(p) = info.status.progress { - table.push(format!("Progress:\t{}", p)); - } - if let Some(ql) = info.status.queue_length { - table.push(format!("Queue length:\t{}", ql)); - } - if let Some(pe) = info.status.persistent_errors { - table.push(format!("Persistent errors:\t{}", pe)); - } - - for (i, s) in info.status.freeform.iter().enumerate() { - if i == 0 { - if table.last() != Some(&"".into()) { - table.push("".into()); - } - table.push(format!("Message:\t{}", s)); - } else { - table.push(format!("\t{}", s)); - } - } - format_table(table); -} - -pub fn print_worker_vars(wv: Vec<(Uuid, String, String)>) { - let table = wv - .into_iter() - .map(|(n, k, v)| format!("{:?}\t{}\t{}", n, k, v)) - .collect::>(); - format_table(table); -} - -pub fn print_block_error_list(el: Vec) { - let now = now_msec(); - let tf = timeago::Formatter::new(); - let mut tf2 = timeago::Formatter::new(); - tf2.ago(""); - - let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; - for e in el { - let next_try = if e.next_try > now { - tf2.convert(Duration::from_millis(e.next_try - now)) - } else { - "asap".to_string() - }; - table.push(format!( - "{}\t{}\t{}\t{}\tin {}", - hex::encode(e.hash.as_slice()), - e.refcount, - e.error_count, - tf.convert(Duration::from_millis(now - e.last_try)), - next_try - )); - } - format_table(table); -} - -pub fn print_block_info( - hash: Hash, - refcount: u64, - versions: Vec>, - uploads: Vec, -) { - println!("Block hash: {}", hex::encode(hash.as_slice())); - println!("Refcount: {}", refcount); - println!(); - - let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; - let mut nondeleted_count = 0; - for v in versions.iter() { - match v { - Ok(ver) => { - match &ver.backlink { - VersionBacklink::Object { bucket_id, key } => { - table.push(format!( - "{:?}\t{:?}\t{}\t\t{:?}", - ver.uuid, - bucket_id, - key, - ver.deleted.get() - )); - } - VersionBacklink::MultipartUpload { upload_id } => { - let upload = uploads.iter().find(|x| x.upload_id == *upload_id); - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}\t{:?}", - ver.uuid, - upload.map(|u| u.bucket_id).unwrap_or_default(), - upload.map(|u| u.key.as_str()).unwrap_or_default(), - upload_id, - ver.deleted.get() - )); - } - } - if !ver.deleted.get() { - nondeleted_count += 1; - } - } - Err(vh) => { - table.push(format!("{:?}\t\t\t\tyes", vh)); - } - } - } - format_table(table); - - if refcount != nondeleted_count { - println!(); - println!( - "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." - ); - } -} diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs new file mode 100644 index 00000000..bfc0db4a --- /dev/null +++ b/src/garage/cli_v2/block.rs @@ -0,0 +1,145 @@ +//use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> { + match cmd { + BlockOperation::ListErrors => self.cmd_list_block_errors().await, + BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await, + BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await, + BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await, + } + } + + pub async fn cmd_list_block_errors(&self) -> Result<(), Error> { + let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0; + + let tf = timeago::Formatter::new(); + let mut tf2 = timeago::Formatter::new(); + tf2.ago(""); + + let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; + for e in errors { + let next_try = if e.next_try_in_secs > 0 { + tf2.convert(Duration::from_secs(e.next_try_in_secs)) + } else { + "asap".to_string() + }; + table.push(format!( + "{}\t{}\t{}\t{}\tin {}", + e.block_hash, + e.refcount, + e.error_count, + tf.convert(Duration::from_secs(e.last_try_secs_ago)), + next_try + )); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> { + let info = self + .local_api_request(LocalGetBlockInfoRequest { block_hash: hash }) + .await?; + + println!("Block hash: {}", info.block_hash); + println!("Refcount: {}", info.refcount); + println!(); + + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; + let mut nondeleted_count = 0; + for ver in info.versions.iter() { + match &ver.backlink { + Some(BlockVersionBacklink::Object { bucket_id, key }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t\t{:?}", + ver.version_id, bucket_id, key, ver.deleted + )); + } + Some(BlockVersionBacklink::Upload { + upload_id, + upload_deleted: _, + upload_garbage_collected: _, + bucket_id, + key, + }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}", + ver.version_id, + bucket_id.as_deref().unwrap_or(""), + key.as_deref().unwrap_or(""), + upload_id, + ver.deleted + )); + } + None => { + table.push(format!("{:.16}\t\t\tyes", ver.version_id)); + } + } + if !ver.deleted { + nondeleted_count += 1; + } + } + format_table(table); + + if info.refcount != nondeleted_count { + println!(); + println!( + "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." + ); + } + + Ok(()) + } + + pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec) -> Result<(), Error> { + let req = match (all, blocks.len()) { + (true, 0) => LocalRetryBlockResyncRequest::All { all: true }, + (false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks { + block_hashes: blocks, + }, + _ => { + return Err(Error::Message( + "Please specify block hashes or --all (not both)".into(), + )) + } + }; + + let res = self.local_api_request(req).await?; + + println!( + "{} blocks returned in queue for a retry now (check logs to see results)", + res.count + ); + + Ok(()) + } + + pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec) -> Result<(), Error> { + if !yes { + return Err(Error::Message( + "Pass the --yes flag to confirm block purge operation.".into(), + )); + } + + let res = self + .local_api_request(LocalPurgeBlocksRequest(blocks)) + .await?; + + println!( + "Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads", + res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted, + ); + + Ok(()) + } +} diff --git a/src/garage/cli_v2/bucket.rs b/src/garage/cli_v2/bucket.rs index ee3b6800..c25c2c3e 100644 --- a/src/garage/cli_v2/bucket.rs +++ b/src/garage/cli_v2/bucket.rs @@ -5,7 +5,6 @@ use garage_util::error::*; use garage_api_admin::api::*; -use crate::cli as cli_v1; use crate::cli::structs::*; use crate::cli_v2::*; @@ -22,15 +21,9 @@ impl Cli { BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await, BucketOperation::Website(query) => self.cmd_bucket_website(query).await, BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await, - - // TODO - x => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BucketOperation(x), - ) - .await - .ok_or_message("old error"), + BucketOperation::CleanupIncompleteUploads(query) => { + self.cmd_cleanup_incomplete_uploads(query).await + } } } @@ -520,4 +513,37 @@ impl Cli { Ok(()) } + + pub async fn cmd_cleanup_incomplete_uploads( + &self, + opt: CleanupIncompleteUploadsOpt, + ) -> Result<(), Error> { + let older_than = parse_duration::parse::parse(&opt.older_than) + .ok_or_message("Invalid duration passed for --older-than parameter")?; + + for b in opt.buckets.iter() { + let bucket = self + .api_request(GetBucketInfoRequest { + id: None, + global_alias: None, + search: Some(b.clone()), + }) + .await?; + + let res = self + .api_request(CleanupIncompleteUploadsRequest { + bucket_id: bucket.id.clone(), + older_than_secs: older_than.as_secs(), + }) + .await?; + + if res.uploads_deleted > 0 { + println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted); + } else { + println!("{:.16}: no uploads deleted", bucket.id); + } + } + + Ok(()) + } } diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index 6cc13b2d..28c7c824 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -3,6 +3,10 @@ pub mod cluster; pub mod key; pub mod layout; +pub mod block; +pub mod node; +pub mod worker; + use std::convert::TryFrom; use std::sync::Arc; use std::time::Duration; @@ -13,16 +17,14 @@ use garage_rpc::system::*; use garage_rpc::*; use garage_api_admin::api::*; -use garage_api_admin::EndpointHandler as AdminApiEndpoint; +use garage_api_admin::api_server::{AdminRpc as ProxyRpc, AdminRpcResponse as ProxyRpcResponse}; +use garage_api_admin::RequestHandler; -use crate::admin::*; -use crate::cli as cli_v1; use crate::cli::structs::*; -use crate::cli::Command; pub struct Cli { pub system_rpc_endpoint: Arc>, - pub admin_rpc_endpoint: Arc>, + pub proxy_rpc_endpoint: Arc>, pub rpc_host: NodeID, } @@ -36,63 +38,35 @@ impl Cli { Command::Layout(layout_opt) => self.layout_command_dispatch(layout_opt).await, Command::Bucket(bo) => self.cmd_bucket(bo).await, Command::Key(ko) => self.cmd_key(ko).await, - - // TODO - Command::Repair(ro) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::LaunchRepair(ro), - ) - .await - .ok_or_message("cli_v1"), - Command::Stats(so) => { - cli_v1::cmd_admin(&self.admin_rpc_endpoint, self.rpc_host, AdminRpc::Stats(so)) - .await - .ok_or_message("cli_v1") - } - Command::Worker(wo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::Worker(wo), - ) - .await - .ok_or_message("cli_v1"), - Command::Block(bo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BlockOperation(bo), - ) - .await - .ok_or_message("cli_v1"), - Command::Meta(mo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::MetaOperation(mo), - ) - .await - .ok_or_message("cli_v1"), + Command::Worker(wo) => self.cmd_worker(wo).await, + Command::Block(bo) => self.cmd_block(bo).await, + Command::Meta(mo) => self.cmd_meta(mo).await, + Command::Stats(so) => self.cmd_stats(so).await, + Command::Repair(ro) => self.cmd_repair(ro).await, _ => unreachable!(), } } - pub async fn api_request(&self, req: T) -> Result<::Response, Error> + pub async fn api_request(&self, req: T) -> Result<::Response, Error> where - T: AdminApiEndpoint, + T: RequestHandler, AdminApiRequest: From, - ::Response: TryFrom, + ::Response: TryFrom, { let req = AdminApiRequest::from(req); let req_name = req.name(); match self - .admin_rpc_endpoint - .call(&self.rpc_host, AdminRpc::ApiRequest(req), PRIO_NORMAL) - .await? - .ok_or_message("rpc")? + .proxy_rpc_endpoint + .call(&self.rpc_host, ProxyRpc::Proxy(req), PRIO_NORMAL) + .await?? { - AdminRpc::ApiOkResponse(resp) => ::Response::try_from(resp) - .map_err(|_| Error::Message(format!("{} returned unexpected response", req_name))), - AdminRpc::ApiErrorResponse { + ProxyRpcResponse::ProxyApiOkResponse(resp) => { + ::Response::try_from(resp).map_err(|_| { + Error::Message(format!("{} returned unexpected response", req_name)) + }) + } + ProxyRpcResponse::ApiErrorResponse { http_code, error_code, message, @@ -103,4 +77,32 @@ impl Cli { m => Err(Error::unexpected_rpc_message(m)), } } + + pub async fn local_api_request( + &self, + req: T, + ) -> Result<::Response, Error> + where + T: RequestHandler, + MultiRequest: RequestHandler::Response>>, + AdminApiRequest: From>, + as RequestHandler>::Response: TryFrom, + { + let req = MultiRequest { + node: hex::encode(self.rpc_host), + body: req, + }; + let resp = self.api_request(req).await?; + + if let Some((_, e)) = resp.error.into_iter().next() { + return Err(Error::Message(e)); + } + if resp.success.len() != 1 { + return Err(Error::Message(format!( + "{} responses returned, expected 1", + resp.success.len() + ))); + } + Ok(resp.success.into_iter().next().unwrap().1) + } } diff --git a/src/garage/cli_v2/node.rs b/src/garage/cli_v2/node.rs new file mode 100644 index 00000000..c5d0cdea --- /dev/null +++ b/src/garage/cli_v2/node.rs @@ -0,0 +1,113 @@ +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_meta(&self, cmd: MetaOperation) -> Result<(), Error> { + let MetaOperation::Snapshot { all } = cmd; + + let res = self + .api_request(CreateMetadataSnapshotRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalCreateMetadataSnapshotRequest, + }) + .await?; + + let mut table = vec![]; + for (node, err) in res.error.iter() { + table.push(format!("{:.16}\tError: {}", node, err)); + } + for (node, _) in res.success.iter() { + table.push(format!("{:.16}\tSnapshot created", node)); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_stats(&self, cmd: StatsOpt) -> Result<(), Error> { + let res = self + .api_request(GetNodeStatisticsRequest { + node: if cmd.all_nodes { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetNodeStatisticsRequest, + }) + .await?; + + for (node, res) in res.success.iter() { + println!("======================"); + println!("Stats for node {:.16}:\n", node); + println!("{}\n", res.freeform); + } + + for (node, err) in res.error.iter() { + println!("======================"); + println!("Node {:.16}: error: {}\n", node, err); + } + + let res = self.api_request(GetClusterStatisticsRequest).await?; + println!("======================"); + println!("Cluster statistics:\n"); + println!("{}\n", res.freeform); + + Ok(()) + } + + pub async fn cmd_repair(&self, cmd: RepairOpt) -> Result<(), Error> { + if !cmd.yes { + return Err(Error::Message( + "Please add --yes to start the repair operation".into(), + )); + } + + let repair_type = match cmd.what { + RepairWhat::Tables => RepairType::Tables, + RepairWhat::Blocks => RepairType::Blocks, + RepairWhat::Versions => RepairType::Versions, + RepairWhat::MultipartUploads => RepairType::MultipartUploads, + RepairWhat::BlockRefs => RepairType::BlockRefs, + RepairWhat::BlockRc => RepairType::BlockRc, + RepairWhat::Rebalance => RepairType::Rebalance, + RepairWhat::Scrub { cmd } => RepairType::Scrub(match cmd { + ScrubCmd::Start => ScrubCommand::Start, + ScrubCmd::Cancel => ScrubCommand::Cancel, + ScrubCmd::Pause => ScrubCommand::Pause, + ScrubCmd::Resume => ScrubCommand::Resume, + }), + }; + + let res = self + .api_request(LaunchRepairOperationRequest { + node: if cmd.all_nodes { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalLaunchRepairOperationRequest { repair_type }, + }) + .await?; + + let mut table = vec![]; + for (node, err) in res.error.iter() { + table.push(format!("{:.16}\tError: {}", node, err)); + } + for (node, _) in res.success.iter() { + table.push(format!("{:.16}\tRepair launched", node)); + } + format_table(table); + + Ok(()) + } +} diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs new file mode 100644 index 00000000..9c248a39 --- /dev/null +++ b/src/garage/cli_v2/worker.rs @@ -0,0 +1,213 @@ +use format_table::format_table; + +use garage_util::error::*; + +use garage_api_admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_worker(&self, cmd: WorkerOperation) -> Result<(), Error> { + match cmd { + WorkerOperation::List { opt } => self.cmd_list_workers(opt).await, + WorkerOperation::Info { tid } => self.cmd_worker_info(tid).await, + WorkerOperation::Get { + all_nodes, + variable, + } => self.cmd_get_var(all_nodes, variable).await, + WorkerOperation::Set { + all_nodes, + variable, + value, + } => self.cmd_set_var(all_nodes, variable, value).await, + } + } + + pub async fn cmd_list_workers(&self, opt: WorkerListOpt) -> Result<(), Error> { + let mut list = self + .local_api_request(LocalListWorkersRequest { + busy_only: opt.busy, + error_only: opt.errors, + }) + .await? + .0; + + list.sort_by_key(|info| { + ( + match info.state { + WorkerStateResp::Busy | WorkerStateResp::Throttled { .. } => 0, + WorkerStateResp::Idle => 1, + WorkerStateResp::Done => 2, + }, + info.id, + ) + }); + + let mut table = + vec!["TID\tState\tName\tTranq\tDone\tQueue\tErrors\tConsec\tLast".to_string()]; + let tf = timeago::Formatter::new(); + for info in list.iter() { + let err_ago = info + .last_error + .as_ref() + .map(|x| tf.convert(Duration::from_secs(x.secs_ago))) + .unwrap_or_default(); + let (total_err, consec_err) = if info.errors > 0 { + (info.errors.to_string(), info.consecutive_errors.to_string()) + } else { + ("-".into(), "-".into()) + }; + + table.push(format!( + "{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}\t{}", + info.id, + format_worker_state(&info.state), + info.name, + info.tranquility + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + info.progress.as_deref().unwrap_or("-"), + info.queue_length + .as_ref() + .map(ToString::to_string) + .unwrap_or_else(|| "-".into()), + total_err, + consec_err, + err_ago, + )); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_worker_info(&self, tid: usize) -> Result<(), Error> { + let info = self + .local_api_request(LocalGetWorkerInfoRequest { id: tid as u64 }) + .await? + .0; + + let mut table = vec![]; + table.push(format!("Task id:\t{}", info.id)); + table.push(format!("Worker name:\t{}", info.name)); + match &info.state { + WorkerStateResp::Throttled { duration_secs } => { + table.push(format!( + "Worker state:\tBusy (throttled, paused for {:.3}s)", + duration_secs + )); + } + s => { + table.push(format!("Worker state:\t{}", format_worker_state(s))); + } + }; + if let Some(tql) = info.tranquility { + table.push(format!("Tranquility:\t{}", tql)); + } + + table.push("".into()); + table.push(format!("Total errors:\t{}", info.errors)); + table.push(format!("Consecutive errs:\t{}", info.consecutive_errors)); + if let Some(err) = info.last_error { + table.push(format!("Last error:\t{}", err.message)); + let tf = timeago::Formatter::new(); + table.push(format!( + "Last error time:\t{}", + tf.convert(Duration::from_secs(err.secs_ago)) + )); + } + + table.push("".into()); + if let Some(p) = info.progress { + table.push(format!("Progress:\t{}", p)); + } + if let Some(ql) = info.queue_length { + table.push(format!("Queue length:\t{}", ql)); + } + if let Some(pe) = info.persistent_errors { + table.push(format!("Persistent errors:\t{}", pe)); + } + + for (i, s) in info.freeform.iter().enumerate() { + if i == 0 { + if table.last() != Some(&"".into()) { + table.push("".into()); + } + table.push(format!("Message:\t{}", s)); + } else { + table.push(format!("\t{}", s)); + } + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_get_var(&self, all: bool, var: Option) -> Result<(), Error> { + let res = self + .api_request(GetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalGetWorkerVariableRequest { variable: var }, + }) + .await?; + + let mut table = vec![]; + for (node, vars) in res.success.iter() { + for (key, val) in vars.0.iter() { + table.push(format!("{:.16}\t{}\t{}", node, key, val)); + } + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } + + pub async fn cmd_set_var( + &self, + all: bool, + variable: String, + value: String, + ) -> Result<(), Error> { + let res = self + .api_request(SetWorkerVariableRequest { + node: if all { + "*".to_string() + } else { + hex::encode(self.rpc_host) + }, + body: LocalSetWorkerVariableRequest { variable, value }, + }) + .await?; + + let mut table = vec![]; + for (node, kv) in res.success.iter() { + table.push(format!("{:.16}\t{}\t{}", node, kv.variable, kv.value)); + } + format_table(table); + + for (node, err) in res.error.iter() { + eprintln!("{:.16}: error: {}", node, err); + } + + Ok(()) + } +} + +fn format_worker_state(s: &WorkerStateResp) -> &'static str { + match s { + WorkerStateResp::Busy => "Busy", + WorkerStateResp::Throttled { .. } => "Busy*", + WorkerStateResp::Idle => "Idle", + WorkerStateResp::Done => "Done", + } +} diff --git a/src/garage/main.rs b/src/garage/main.rs index 08c7cee7..2a88d760 100644 --- a/src/garage/main.rs +++ b/src/garage/main.rs @@ -4,10 +4,8 @@ #[macro_use] extern crate tracing; -mod admin; mod cli; mod cli_v2; -mod repair; mod secrets; mod server; #[cfg(feature = "telemetry-otlp")] @@ -35,8 +33,9 @@ use garage_util::error::*; use garage_rpc::system::*; use garage_rpc::*; -use admin::*; -use cli::*; +use garage_api_admin::api_server::{AdminRpc as ProxyRpc, ADMIN_RPC_PATH as PROXY_RPC_PATH}; + +use cli::structs::*; use secrets::Secrets; #[derive(StructOpt, Debug)] @@ -144,13 +143,13 @@ async fn main() { let res = match opt.cmd { Command::Server => server::run_server(opt.config_file, opt.secrets).await, Command::OfflineRepair(repair_opt) => { - repair::offline::offline_repair(opt.config_file, opt.secrets, repair_opt).await + cli::repair::offline_repair(opt.config_file, opt.secrets, repair_opt).await } Command::ConvertDb(conv_opt) => { cli::convert_db::do_conversion(conv_opt).map_err(From::from) } Command::Node(NodeOperation::NodeId(node_id_opt)) => { - node_id_command(opt.config_file, node_id_opt.quiet) + cli::init::node_id_command(opt.config_file, node_id_opt.quiet) } _ => cli_command(opt).await, }; @@ -251,7 +250,7 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { (id, addrs[0], false) } else { let node_id = garage_rpc::system::read_node_id(&config.as_ref().unwrap().metadata_dir) - .err_context(READ_KEY_ERROR)?; + .err_context(cli::init::READ_KEY_ERROR)?; if let Some(a) = config.as_ref().and_then(|c| c.rpc_public_addr.as_ref()) { use std::net::ToSocketAddrs; let a = a @@ -281,11 +280,11 @@ async fn cli_command(opt: Opt) -> Result<(), Error> { } let system_rpc_endpoint = netapp.endpoint::(SYSTEM_RPC_PATH.into()); - let admin_rpc_endpoint = netapp.endpoint::(ADMIN_RPC_PATH.into()); + let proxy_rpc_endpoint = netapp.endpoint::(PROXY_RPC_PATH.into()); let cli = cli_v2::Cli { system_rpc_endpoint, - admin_rpc_endpoint, + proxy_rpc_endpoint, rpc_host: id, }; diff --git a/src/garage/repair/mod.rs b/src/garage/repair/mod.rs deleted file mode 100644 index 4699ace5..00000000 --- a/src/garage/repair/mod.rs +++ /dev/null @@ -1,2 +0,0 @@ -pub mod offline; -pub mod online; diff --git a/src/garage/server.rs b/src/garage/server.rs index 9e58fa6d..131cc8aa 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -14,7 +14,6 @@ use garage_web::WebServer; #[cfg(feature = "k2v")] use garage_api_k2v::api_server::K2VApiServer; -use crate::admin::*; use crate::secrets::{fill_secrets, Secrets}; #[cfg(feature = "telemetry-otlp")] use crate::tracing_setup::*; @@ -66,6 +65,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Initialize Admin API server and metrics collector..."); let admin_server = AdminApiServer::new( garage.clone(), + background.clone(), #[cfg(feature = "metrics")] metrics_exporter, ); @@ -73,9 +73,6 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er info!("Launching internal Garage cluster communications..."); let run_system = tokio::spawn(garage.system.clone().run(watch_cancel.clone())); - info!("Create admin RPC handler..."); - AdminRpcHandler::new(garage.clone(), background.clone()); - // ---- Launch public-facing API servers ---- let mut servers = vec![]; diff --git a/src/util/background/mod.rs b/src/util/background/mod.rs index 607cd7a3..cae3a462 100644 --- a/src/util/background/mod.rs +++ b/src/util/background/mod.rs @@ -6,7 +6,6 @@ pub mod worker; use std::collections::HashMap; use std::sync::Arc; -use serde::{Deserialize, Serialize}; use tokio::sync::{mpsc, watch}; use worker::WorkerProcessor; @@ -18,7 +17,7 @@ pub struct BackgroundRunner { worker_info: Arc>>, } -#[derive(Clone, Serialize, Deserialize, Debug)] +#[derive(Clone, Debug)] pub struct WorkerInfo { pub name: String, pub status: WorkerStatus, @@ -30,7 +29,7 @@ pub struct WorkerInfo { /// WorkerStatus is a struct returned by the worker with a bunch of canonical /// fields to indicate their status to CLI users. All fields are optional. -#[derive(Clone, Serialize, Deserialize, Debug, Default)] +#[derive(Clone, Debug, Default)] pub struct WorkerStatus { pub tranquility: Option, pub progress: Option, diff --git a/src/util/background/worker.rs b/src/util/background/worker.rs index 76fb14e8..9028a052 100644 --- a/src/util/background/worker.rs +++ b/src/util/background/worker.rs @@ -6,7 +6,6 @@ use async_trait::async_trait; use futures::future::*; use futures::stream::FuturesUnordered; use futures::StreamExt; -use serde::{Deserialize, Serialize}; use tokio::select; use tokio::sync::{mpsc, watch}; @@ -18,7 +17,7 @@ use crate::time::now_msec; // will be interrupted in the middle of whatever they are doing. const EXIT_DEADLINE: Duration = Duration::from_secs(8); -#[derive(PartialEq, Copy, Clone, Serialize, Deserialize, Debug)] +#[derive(PartialEq, Copy, Clone, Debug)] pub enum WorkerState { Busy, Throttled(f32), @@ -26,17 +25,6 @@ pub enum WorkerState { Done, } -impl std::fmt::Display for WorkerState { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - match self { - WorkerState::Busy => write!(f, "Busy"), - WorkerState::Throttled(_) => write!(f, "Busy*"), - WorkerState::Idle => write!(f, "Idle"), - WorkerState::Done => write!(f, "Done"), - } - } -} - #[async_trait] pub trait Worker: Send { fn name(&self) -> String;