Admin API refactoring: convert existing commands to API requests (step 3) #945

Merged
lx merged 12 commits from refactor-admin into next-v2 2025-02-05 19:54:42 +00:00
8 changed files with 134 additions and 66 deletions
Showing only changes of commit bdaf55ab3f - Show all commits

View file

@ -826,6 +826,46 @@ paths:
schema:
$ref: '#/components/schemas/BucketInfo'
/CleanupIncompleteUploads:
post:
tags:
- Bucket
operationId: "CleanupIncompleteUploads"
summary: "Cleanup incomplete uploads in a bucket"
description: |
Cleanup all incomplete uploads in a bucket that are older than a specified number of seconds
requestBody:
description: |
Bucket id and minimum age of uploads to delete (in seconds)
required: true
content:
application/json:
schema:
type: object
required: [bucketId, olderThanSecs]
properties:
bucketId:
type: string
example: "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b"
olderThanSecs:
type: integer
example: "3600"
responses:
'500':
description: "The server can not handle your request. Check your connectivity with the rest of the cluster."
'400':
description: "The payload is not formatted correctly"
'200':
description: "The bucket was cleaned up successfully"
content:
application/json:
schema:
type: object
properties:
uploadsDeleted:
type: integer
example: 12
/AllowBucketKey:
post:
tags:

View file

@ -702,6 +702,28 @@ Deletes a storage bucket. A bucket cannot be deleted if it is not empty.
Warning: this will delete all aliases associated with the bucket!
#### CleanupIncompleteUploads `POST /v2/CleanupIncompleteUploads`
Cleanup all incomplete uploads in a bucket that are older than a specified number
of seconds.
Request body format:
```json
{
"bucketId": "e6a14cd6a27f48684579ec6b381c078ab11697e6bc8513b72b2f5307e25fff9b",
"olderThanSecs": 3600
}
```
Response format
```json
{
"uploadsDeleted": 12
}
```
### Operations on permissions for keys on buckets

View file

@ -62,6 +62,7 @@ admin_endpoints![
CreateBucket,
UpdateBucket,
DeleteBucket,
CleanupIncompleteUploads,
// Operations on permissions for keys on buckets
AllowBucketKey,
@ -497,6 +498,19 @@ pub struct DeleteBucketRequest {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeleteBucketResponse;
// ---- CleanupIncompleteUploads ----
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupIncompleteUploadsRequest {
pub bucket_id: String,
pub older_than_secs: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CleanupIncompleteUploadsResponse {
pub uploads_deleted: u64,
}
// **********************************************
// Operations on permissions for keys on buckets
// **********************************************

View file

@ -1,5 +1,6 @@
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use async_trait::async_trait;
@ -388,6 +389,26 @@ impl EndpointHandler for UpdateBucketRequest {
}
}
#[async_trait]
impl EndpointHandler for CleanupIncompleteUploadsRequest {
type Response = CleanupIncompleteUploadsResponse;
async fn handle(self, garage: &Arc<Garage>) -> Result<CleanupIncompleteUploadsResponse, Error> {
let duration = Duration::from_secs(self.older_than_secs);
let bucket_id = parse_bucket_id(&self.bucket_id)?;
let count = garage
.bucket_helper()
.cleanup_incomplete_uploads(&bucket_id, duration)
.await?;
Ok(CleanupIncompleteUploadsResponse {
uploads_deleted: count as u64,
})
}
}
// ---- BUCKET/KEY PERMISSIONS ----
#[async_trait]

View file

@ -52,6 +52,7 @@ impl AdminApiRequest {
POST CreateBucket (body),
POST DeleteBucket (query::id),
POST UpdateBucket (body_field, query::id),
POST CleanupIncompleteUploads (body),
// Bucket-key permissions
POST AllowBucketKey (body),
POST DenyBucketKey (body),

View file

@ -1,53 +0,0 @@
use std::fmt::Write;
use garage_model::helper::error::{Error, OkOrBadRequest};
use crate::cli::*;
use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::CleanupIncompleteUploads(query) => {
self.handle_bucket_cleanup_incomplete_uploads(query).await
}
_ => unreachable!(),
}
}
async fn handle_bucket_cleanup_incomplete_uploads(
&self,
query: &CleanupIncompleteUploadsOpt,
) -> Result<AdminRpc, Error> {
let mut bucket_ids = vec![];
for b in query.buckets.iter() {
bucket_ids.push(
self.garage
.bucket_helper()
.admin_get_existing_matching_bucket(b)
.await?,
);
}
let duration = parse_duration::parse::parse(&query.older_than)
.ok_or_bad_request("Invalid duration passed for --older-than parameter")?;
let mut ret = String::new();
for bucket in bucket_ids {
let count = self
.garage
.bucket_helper()
.cleanup_incomplete_uploads(&bucket, duration)
.await?;
writeln!(
&mut ret,
"Bucket {:?}: {} incomplete uploads aborted",
bucket, count
)
.unwrap();
}
Ok(AdminRpc::Ok(ret))
}
}

View file

@ -1,5 +1,4 @@
mod block;
mod bucket;
use std::collections::HashMap;
use std::fmt::Write;
@ -39,7 +38,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
#[derive(Debug, Serialize, Deserialize)]
#[allow(clippy::large_enum_variant)]
pub enum AdminRpc {
BucketOperation(BucketOperation),
LaunchRepair(RepairOpt),
Stats(StatsOpt),
Worker(WorkerOperation),
@ -532,7 +530,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
_from: NodeID,
) -> Result<AdminRpc, Error> {
match message {
AdminRpc::BucketOperation(bo) => self.handle_bucket_cmd(bo).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,

View file

@ -5,7 +5,6 @@ use garage_util::error::*;
use garage_api_admin::api::*;
use crate::cli as cli_v1;
use crate::cli::structs::*;
use crate::cli_v2::*;
@ -22,15 +21,9 @@ impl Cli {
BucketOperation::Deny(query) => self.cmd_bucket_deny(query).await,
BucketOperation::Website(query) => self.cmd_bucket_website(query).await,
BucketOperation::SetQuotas(query) => self.cmd_bucket_set_quotas(query).await,
// TODO
x => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BucketOperation(x),
)
.await
.ok_or_message("old error"),
BucketOperation::CleanupIncompleteUploads(query) => {
self.cmd_cleanup_incomplete_uploads(query).await
}
}
}
@ -520,4 +513,37 @@ impl Cli {
Ok(())
}
pub async fn cmd_cleanup_incomplete_uploads(
&self,
opt: CleanupIncompleteUploadsOpt,
) -> Result<(), Error> {
let older_than = parse_duration::parse::parse(&opt.older_than)
.ok_or_message("Invalid duration passed for --older-than parameter")?;
for b in opt.buckets.iter() {
let bucket = self
.api_request(GetBucketInfoRequest {
id: None,
global_alias: None,
search: Some(b.clone()),
})
.await?;
let res = self
.api_request(CleanupIncompleteUploadsRequest {
bucket_id: bucket.id.clone(),
older_than_secs: older_than.as_secs(),
})
.await?;
if res.uploads_deleted > 0 {
println!("{:.16}: {} uploads deleted", bucket.id, res.uploads_deleted);
} else {
println!("{:.16}: no uploads deleted", bucket.id);
}
}
Ok(())
}
}