cli_v2: implement RetryBlockResync and PurgeBlocks
This commit is contained in:
parent
af648b4fae
commit
1533084241
6 changed files with 212 additions and 165 deletions
|
@ -85,6 +85,8 @@ admin_endpoints![
|
|||
// Block operations
|
||||
ListBlockErrors,
|
||||
GetBlockInfo,
|
||||
RetryBlockResync,
|
||||
PurgeBlocks,
|
||||
];
|
||||
|
||||
local_admin_endpoints![
|
||||
|
@ -96,6 +98,8 @@ local_admin_endpoints![
|
|||
// Block operations
|
||||
ListBlockErrors,
|
||||
GetBlockInfo,
|
||||
RetryBlockResync,
|
||||
PurgeBlocks,
|
||||
];
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
|
@ -764,3 +768,35 @@ pub enum BlockVersionBacklink {
|
|||
key: Option<String>,
|
||||
},
|
||||
}
|
||||
|
||||
// ---- RetryBlockResync ----
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(untagged)]
|
||||
pub enum LocalRetryBlockResyncRequest {
|
||||
#[serde(rename_all = "camelCase")]
|
||||
All { all: bool },
|
||||
#[serde(rename_all = "camelCase")]
|
||||
Blocks { block_hashes: Vec<String> },
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LocalRetryBlockResyncResponse {
|
||||
pub count: u64,
|
||||
}
|
||||
|
||||
// ---- PurgeBlocks ----
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LocalPurgeBlocksRequest(pub Vec<String>);
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LocalPurgeBlocksResponse {
|
||||
pub blocks_purged: u64,
|
||||
pub objects_deleted: u64,
|
||||
pub uploads_deleted: u64,
|
||||
pub versions_deleted: u64,
|
||||
}
|
||||
|
|
|
@ -9,6 +9,7 @@ use garage_util::time::now_msec;
|
|||
use garage_table::EmptyKey;
|
||||
|
||||
use garage_model::garage::Garage;
|
||||
use garage_model::s3::object_table::*;
|
||||
use garage_model::s3::version_table::*;
|
||||
|
||||
use crate::admin::api::*;
|
||||
|
@ -107,6 +108,89 @@ impl RequestHandler for LocalGetBlockInfoRequest {
|
|||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RequestHandler for LocalRetryBlockResyncRequest {
|
||||
type Response = LocalRetryBlockResyncResponse;
|
||||
|
||||
async fn handle(
|
||||
self,
|
||||
garage: &Arc<Garage>,
|
||||
_admin: &Admin,
|
||||
) -> Result<LocalRetryBlockResyncResponse, Error> {
|
||||
match self {
|
||||
Self::All { all: true } => {
|
||||
let blocks = garage.block_manager.list_resync_errors()?;
|
||||
for b in blocks.iter() {
|
||||
garage.block_manager.resync.clear_backoff(&b.hash)?;
|
||||
}
|
||||
Ok(LocalRetryBlockResyncResponse {
|
||||
count: blocks.len() as u64,
|
||||
})
|
||||
}
|
||||
Self::All { all: false } => Err(Error::bad_request("nonsense")),
|
||||
Self::Blocks { block_hashes } => {
|
||||
for hash in block_hashes.iter() {
|
||||
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
|
||||
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
|
||||
garage.block_manager.resync.clear_backoff(&hash)?;
|
||||
}
|
||||
Ok(LocalRetryBlockResyncResponse {
|
||||
count: block_hashes.len() as u64,
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl RequestHandler for LocalPurgeBlocksRequest {
|
||||
type Response = LocalPurgeBlocksResponse;
|
||||
|
||||
async fn handle(
|
||||
self,
|
||||
garage: &Arc<Garage>,
|
||||
_admin: &Admin,
|
||||
) -> Result<LocalPurgeBlocksResponse, Error> {
|
||||
let mut obj_dels = 0;
|
||||
let mut mpu_dels = 0;
|
||||
let mut ver_dels = 0;
|
||||
|
||||
for hash in self.0.iter() {
|
||||
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
|
||||
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
|
||||
let block_refs = garage
|
||||
.block_ref_table
|
||||
.get_range(&hash, None, None, 10000, Default::default())
|
||||
.await?;
|
||||
|
||||
for br in block_refs {
|
||||
if let Some(version) = garage.version_table.get(&br.version, &EmptyKey).await? {
|
||||
handle_block_purge_version_backlink(
|
||||
garage,
|
||||
&version,
|
||||
&mut obj_dels,
|
||||
&mut mpu_dels,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if !version.deleted.get() {
|
||||
let deleted_version = Version::new(version.uuid, version.backlink, true);
|
||||
garage.version_table.insert(&deleted_version).await?;
|
||||
ver_dels += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(LocalPurgeBlocksResponse {
|
||||
blocks_purged: self.0.len() as u64,
|
||||
versions_deleted: ver_dels,
|
||||
objects_deleted: obj_dels,
|
||||
uploads_deleted: mpu_dels,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
|
||||
if prefix.len() < 4 {
|
||||
return Err(Error::bad_request(
|
||||
|
@ -147,3 +231,49 @@ fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash,
|
|||
|
||||
found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
|
||||
}
|
||||
|
||||
async fn handle_block_purge_version_backlink(
|
||||
garage: &Arc<Garage>,
|
||||
version: &Version,
|
||||
obj_dels: &mut u64,
|
||||
mpu_dels: &mut u64,
|
||||
) -> Result<(), Error> {
|
||||
let (bucket_id, key, ov_id) = match &version.backlink {
|
||||
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
|
||||
VersionBacklink::MultipartUpload { upload_id } => {
|
||||
if let Some(mut mpu) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
|
||||
if !mpu.deleted.get() {
|
||||
mpu.parts.clear();
|
||||
mpu.deleted.set();
|
||||
garage.mpu_table.insert(&mpu).await?;
|
||||
*mpu_dels += 1;
|
||||
}
|
||||
(mpu.bucket_id, mpu.key.clone(), *upload_id)
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(object) = garage.object_table.get(&bucket_id, &key).await? {
|
||||
let ov = object.versions().iter().rev().find(|v| v.is_complete());
|
||||
if let Some(ov) = ov {
|
||||
if ov.uuid == ov_id {
|
||||
let del_uuid = gen_uuid();
|
||||
let deleted_object = Object::new(
|
||||
bucket_id,
|
||||
key,
|
||||
vec![ObjectVersion {
|
||||
uuid: del_uuid,
|
||||
timestamp: ov.timestamp + 1,
|
||||
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
|
||||
}],
|
||||
);
|
||||
garage.object_table.insert(&deleted_object).await?;
|
||||
*obj_dels += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
@ -66,6 +66,8 @@ impl AdminApiRequest {
|
|||
// Block APIs
|
||||
GET ListBlockErrors (default::body, query::node),
|
||||
POST GetBlockInfo (body_field, query::node),
|
||||
POST RetryBlockResync (body_field, query::node),
|
||||
POST PurgeBlocks (body_field, query::node),
|
||||
]);
|
||||
|
||||
if let Some(message) = query.nonempty_message() {
|
||||
|
|
|
@ -1,153 +0,0 @@
|
|||
use garage_util::data::*;
|
||||
|
||||
use garage_table::*;
|
||||
|
||||
use garage_model::helper::error::{Error, OkOrBadRequest};
|
||||
use garage_model::s3::object_table::*;
|
||||
use garage_model::s3::version_table::*;
|
||||
|
||||
use crate::cli::*;
|
||||
|
||||
use super::*;
|
||||
|
||||
impl AdminRpcHandler {
|
||||
pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
|
||||
match cmd {
|
||||
BlockOperation::RetryNow { all, blocks } => {
|
||||
self.handle_block_retry_now(*all, blocks).await
|
||||
}
|
||||
BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
|
||||
_ => unreachable!(),
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_block_retry_now(
|
||||
&self,
|
||||
all: bool,
|
||||
blocks: &[String],
|
||||
) -> Result<AdminRpc, Error> {
|
||||
if all {
|
||||
if !blocks.is_empty() {
|
||||
return Err(Error::BadRequest(
|
||||
"--all was specified, cannot also specify blocks".into(),
|
||||
));
|
||||
}
|
||||
let blocks = self.garage.block_manager.list_resync_errors()?;
|
||||
for b in blocks.iter() {
|
||||
self.garage.block_manager.resync.clear_backoff(&b.hash)?;
|
||||
}
|
||||
Ok(AdminRpc::Ok(format!(
|
||||
"{} blocks returned in queue for a retry now (check logs to see results)",
|
||||
blocks.len()
|
||||
)))
|
||||
} else {
|
||||
for hash in blocks {
|
||||
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
|
||||
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
|
||||
self.garage.block_manager.resync.clear_backoff(&hash)?;
|
||||
}
|
||||
Ok(AdminRpc::Ok(format!(
|
||||
"{} blocks returned in queue for a retry now (check logs to see results)",
|
||||
blocks.len()
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
async fn handle_block_purge(&self, yes: bool, blocks: &[String]) -> Result<AdminRpc, Error> {
|
||||
if !yes {
|
||||
return Err(Error::BadRequest(
|
||||
"Pass the --yes flag to confirm block purge operation.".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let mut obj_dels = 0;
|
||||
let mut mpu_dels = 0;
|
||||
let mut ver_dels = 0;
|
||||
|
||||
for hash in blocks {
|
||||
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
|
||||
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
|
||||
let block_refs = self
|
||||
.garage
|
||||
.block_ref_table
|
||||
.get_range(&hash, None, None, 10000, Default::default())
|
||||
.await?;
|
||||
|
||||
for br in block_refs {
|
||||
if let Some(version) = self
|
||||
.garage
|
||||
.version_table
|
||||
.get(&br.version, &EmptyKey)
|
||||
.await?
|
||||
{
|
||||
self.handle_block_purge_version_backlink(
|
||||
&version,
|
||||
&mut obj_dels,
|
||||
&mut mpu_dels,
|
||||
)
|
||||
.await?;
|
||||
|
||||
if !version.deleted.get() {
|
||||
let deleted_version = Version::new(version.uuid, version.backlink, true);
|
||||
self.garage.version_table.insert(&deleted_version).await?;
|
||||
ver_dels += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(AdminRpc::Ok(format!(
|
||||
"Purged {} blocks, {} versions, {} objects, {} multipart uploads",
|
||||
blocks.len(),
|
||||
ver_dels,
|
||||
obj_dels,
|
||||
mpu_dels,
|
||||
)))
|
||||
}
|
||||
|
||||
async fn handle_block_purge_version_backlink(
|
||||
&self,
|
||||
version: &Version,
|
||||
obj_dels: &mut usize,
|
||||
mpu_dels: &mut usize,
|
||||
) -> Result<(), Error> {
|
||||
let (bucket_id, key, ov_id) = match &version.backlink {
|
||||
VersionBacklink::Object { bucket_id, key } => (*bucket_id, key.clone(), version.uuid),
|
||||
VersionBacklink::MultipartUpload { upload_id } => {
|
||||
if let Some(mut mpu) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
|
||||
if !mpu.deleted.get() {
|
||||
mpu.parts.clear();
|
||||
mpu.deleted.set();
|
||||
self.garage.mpu_table.insert(&mpu).await?;
|
||||
*mpu_dels += 1;
|
||||
}
|
||||
(mpu.bucket_id, mpu.key.clone(), *upload_id)
|
||||
} else {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
if let Some(object) = self.garage.object_table.get(&bucket_id, &key).await? {
|
||||
let ov = object.versions().iter().rev().find(|v| v.is_complete());
|
||||
if let Some(ov) = ov {
|
||||
if ov.uuid == ov_id {
|
||||
let del_uuid = gen_uuid();
|
||||
let deleted_object = Object::new(
|
||||
bucket_id,
|
||||
key,
|
||||
vec![ObjectVersion {
|
||||
uuid: del_uuid,
|
||||
timestamp: ov.timestamp + 1,
|
||||
state: ObjectVersionState::Complete(ObjectVersionData::DeleteMarker),
|
||||
}],
|
||||
);
|
||||
self.garage.object_table.insert(&deleted_object).await?;
|
||||
*obj_dels += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
|
@ -1,5 +1,3 @@
|
|||
mod block;
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::fmt::Write;
|
||||
use std::sync::Arc;
|
||||
|
@ -32,7 +30,6 @@ pub const ADMIN_RPC_PATH: &str = "garage/admin_rpc.rs/Rpc";
|
|||
pub enum AdminRpc {
|
||||
LaunchRepair(RepairOpt),
|
||||
Stats(StatsOpt),
|
||||
BlockOperation(BlockOperation),
|
||||
MetaOperation(MetaOperation),
|
||||
|
||||
// Replies
|
||||
|
@ -367,7 +364,6 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
|
|||
match message {
|
||||
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
|
||||
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
|
||||
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
|
||||
AdminRpc::MetaOperation(mo) => self.handle_meta_cmd(mo).await,
|
||||
m => Err(GarageError::unexpected_rpc_message(m).into()),
|
||||
}
|
||||
|
|
|
@ -13,14 +13,8 @@ impl Cli {
|
|||
match cmd {
|
||||
BlockOperation::ListErrors => self.cmd_list_block_errors().await,
|
||||
BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
|
||||
|
||||
bo => cli_v1::cmd_admin(
|
||||
&self.admin_rpc_endpoint,
|
||||
self.rpc_host,
|
||||
AdminRpc::BlockOperation(bo),
|
||||
)
|
||||
.await
|
||||
.ok_or_message("cli_v1"),
|
||||
BlockOperation::RetryNow { all, blocks } => self.cmd_block_retry_now(all, blocks).await,
|
||||
BlockOperation::Purge { yes, blocks } => self.cmd_block_purge(yes, blocks).await,
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -106,4 +100,46 @@ impl Cli {
|
|||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_block_retry_now(&self, all: bool, blocks: Vec<String>) -> Result<(), Error> {
|
||||
let req = match (all, blocks.len()) {
|
||||
(true, 0) => LocalRetryBlockResyncRequest::All { all: true },
|
||||
(false, n) if n > 0 => LocalRetryBlockResyncRequest::Blocks {
|
||||
block_hashes: blocks,
|
||||
},
|
||||
_ => {
|
||||
return Err(Error::Message(
|
||||
"Please specify block hashes or --all (not both)".into(),
|
||||
))
|
||||
}
|
||||
};
|
||||
|
||||
let res = self.local_api_request(req).await?;
|
||||
|
||||
println!(
|
||||
"{} blocks returned in queue for a retry now (check logs to see results)",
|
||||
res.count
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn cmd_block_purge(&self, yes: bool, blocks: Vec<String>) -> Result<(), Error> {
|
||||
if !yes {
|
||||
return Err(Error::Message(
|
||||
"Pass the --yes flag to confirm block purge operation.".into(),
|
||||
));
|
||||
}
|
||||
|
||||
let res = self
|
||||
.local_api_request(LocalPurgeBlocksRequest(blocks))
|
||||
.await?;
|
||||
|
||||
println!(
|
||||
"Purged {} blocks: deleted {} versions, {} objects, {} multipart uploads",
|
||||
res.blocks_purged, res.versions_deleted, res.objects_deleted, res.uploads_deleted,
|
||||
);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Add table
Reference in a new issue