Admin API refactoring: convert existing commands to API requests (step 3) #945

Merged
lx merged 12 commits from refactor-admin into next-v2 2025-02-05 19:54:42 +00:00
6 changed files with 212 additions and 165 deletions
Showing only changes of commit b1629dd355 - Show all commits

View file

@ -86,6 +86,8 @@ admin_endpoints![
// Block operations
ListBlockErrors,
GetBlockInfo,
RetryBlockResync,
PurgeBlocks,
];
local_admin_endpoints![
@ -97,6 +99,8 @@ local_admin_endpoints![
// Block operations
ListBlockErrors,
GetBlockInfo,
RetryBlockResync,
PurgeBlocks,
];
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -765,3 +769,35 @@ pub enum BlockVersionBacklink {
key: Option<String>,
},
}
// ---- RetryBlockResync ----
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(untagged)]
pub enum LocalRetryBlockResyncRequest {
#[serde(rename_all = "camelCase")]
All { all: bool },
#[serde(rename_all = "camelCase")]
Blocks { block_hashes: Vec<String> },
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalRetryBlockResyncResponse {
pub count: u64,
}
// ---- PurgeBlocks ----
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalPurgeBlocksRequest(pub Vec<String>);
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalPurgeBlocksResponse {
pub blocks_purged: u64,
pub objects_deleted: u64,
pub uploads_deleted: u64,
pub versions_deleted: u64,
}

View file

@ -9,6 +9,7 @@ use garage_util::time::now_msec;
use garage_table::EmptyKey;
use garage_model::garage::Garage;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use crate::admin::api::*;
@ -107,6 +108,89 @@ impl RequestHandler for LocalGetBlockInfoRequest {
}
}
#[async_trait]
impl RequestHandler for LocalRetryBlockResyncRequest {
type Response = LocalRetryBlockResyncResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalRetryBlockResyncResponse, Error> {
match self {
Self::All { all: true } => {
let blocks = garage.block_manager.list_resync_errors()?;
for b in blocks.iter() {
garage.block_manager.resync.clear_backoff(&b.hash)?;
}
Ok(LocalRetryBlockResyncResponse {
count: blocks.len() as u64,
})
}
Self::All { all: false } => Err(Error::bad_request("nonsense")),
Self::Blocks { block_hashes } => {
for hash in block_hashes.iter() {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
garage.block_manager.resync.clear_backoff(&hash)?;
}
Ok(LocalRetryBlockResyncResponse {
count: block_hashes.len() as u64,
})
}
}
}
}
#[async_trait]
impl RequestHandler for LocalPurgeBlocksRequest {
type Response = LocalPurgeBlocksResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalPurgeBlocksResponse, Error> {
let mut obj_dels = 0;
let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in self.0.iter() {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let block_refs = garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
for br in block_refs {
if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? {
handle_block_purge_version_backlink(
garage,
&version,
&mut obj_dels,
&mut mpu_dels,
)
.await?;
if !version.deleted.get() {
let deleted_version = Version::new(version.uuid, version.backlink, true);
garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
}
Ok(LocalPurgeBlocksResponse {
blocks_purged: self.0.len() as u64,
versions_deleted: ver_dels,
objects_deleted: obj_dels,
uploads_deleted: mpu_dels,
})
}
}
fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
if prefix.len() < 4 {
return Err(Error::bad_request(
@ -147,3 +231,49 @@ fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash,
found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
}
async fn handle_block_purge_version_backlink(
garage: &Arc<Garage>,
version: &Version,
obj_dels: &mut u64,
mpu_dels: &mut u64,
) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = garage.object_table.get(&bucket_id, &key).await? {
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
}

View file

@ -67,6 +67,8 @@ impl AdminApiRequest {
// Block APIs
GET ListBlockErrors (default::body, query::node),
POST GetBlockInfo (body_field, query::node),
POST RetryBlockResync (body_field, query::node),
POST PurgeBlocks (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {

View file

@ -1,153 +0,0 @@
use garage_util::data::*;
use garage_table::*;
use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::*;
use crate::cli::*;
use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
match cmd {
BlockOperation::RetryNow { all, blocks } => {
self.handle_block_retry_now(*all, blocks).await
}
BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
_ => unreachable!(),
}
}
async fn handle_block_retry_now(
&self,
all: bool,
blocks: &[String],
) -> Result<AdminRpc, Error> {
if all {
if !blocks.is_empty() {
return Err(Error::BadRequest(
"--all was specified, cannot also specify blocks".into(),
));
}
let blocks = self.garage.block_manager.list_resync_errors()?;
for b in blocks.iter() {
self.garage.block_manager.resync.clear_backoff(&b.hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
} else {
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
self.garage.block_manager.resync.clear_backoff(&hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
}
}
async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
if !yes {
return Err(Error::BadRequest(
"Pass the --yes flag to confirm block purge operation.".into(),
));
}
let mut obj_dels = 0;
let mut mpu_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
for br in block_refs {
if let Some(version) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
self.handle_block_purge_version_backlink(
&version,
&mut obj_dels,
&mut mpu_dels,
)
.await?;
if !version.deleted.get() {
let deleted_version = Version::new(version.uuid, version.backlink, true);
self.garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
}
Ok(AdminRpc::Ok(format!(
"Purged {} blocks, {} versions, {} objects, {} multipart uploads",
blocks.len(),
ver_dels,
obj_dels,
mpu_dels,
)))
}
async fn handle_block_purge_version_backlink(
&self,
version: &Version,
obj_dels: &mut usize,
mpu_dels: &mut usize,
) -> Result<(), Error> {
let (bucket_id, key, ov_id) = match &version.backlink {
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
if !mpu.deleted.get() {
mpu.parts.clear();
mpu.deleted.set();
self.garage.mpu_table.insert(&mpu).await?;
*mpu_dels += 1;
}
(mpu.bucket_id, mpu.key.clone(), *upload_id)
} else {
return Ok(());
}
}
};
if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == ov_id {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
bucket_id,
key,
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
*obj_dels += 1;
}
}
}
Ok(())
}
}

View file

@ -1,5 +1,3 @@
mod block;
use std::collections::HashMap;
use std::fmt::Write;
use std::sync::Arc;
@ -36,7 +34,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
pub enum AdminRpc {
LaunchRepair(RepairOpt),
Stats(StatsOpt),
BlockOperation(BlockOperation),
MetaOperation(MetaOperation),
// Replies
@ -371,7 +368,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
match message {
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}

View file

@ -13,14 +13,8 @@ impl Cli {
match cmd {
BlockOperation::ListErrors => self.cmd_list_block_errors().await,
BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
bo => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BlockOperation(bo),
)
.await
.ok_or_message("cli_v1"),
BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await,
BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await,
}
}
@ -106,4 +100,46 @@ impl Cli {
Ok(())
}
pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> {
let req = match (all, blocks.len()) {
(true, 0) => LocalRetryBlockResyncRequest::All { all: true },
(false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks {
block_hashes: blocks,
},
_ => {
return Err(Error::Message(
"Please specify block hashes or --all (not both)".into(),
))
}
};
let res = self.local_api_request(req).await?;
println!(
"{} blocks returned in queue for a retry now (check logs to see results)",
res.count
);
Ok(())
}
pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> {
if !yes {
return Err(Error::Message(
"Pass the --yes flag to confirm block purge operation.".into(),
));
}
let res = self
.local_api_request(LocalPurgeBlocksRequest(blocks))
.await?;
println!(
"Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads",
res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted,
);
Ok(())
}
}