cli_v2: implement ListBlockErrors and GetBlockInfo

This commit is contained in:
Alex 2025-01-31 16:53:33 +01:00
parent 7b9c047b11
commit d405a9f839
14 changed files with 346 additions and 210 deletions

View file

@ -82,6 +82,10 @@ admin_endpoints![
GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
// Block operations
ListBlockErrors,
GetBlockInfo,
];
local_admin_endpoints![
@ -90,6 +94,9 @@ local_admin_endpoints![
GetWorkerInfo,
GetWorkerVariable,
SetWorkerVariable,
// Block operations
ListBlockErrors,
GetBlockInfo,
];
#[derive(Debug, Clone, Serialize, Deserialize)]
@ -619,6 +626,7 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse);
// ---- GetWorkerList ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
#[serde(rename_all = "camelCase")]
pub struct LocalListWorkersRequest {
#[serde(default)]
pub busy_only: bool,
@ -694,3 +702,66 @@ pub struct LocalSetWorkerVariableResponse {
pub variable: String,
pub value: String,
}
// **********************************************
// Block operations
// **********************************************
// ---- ListBlockErrors ----
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LocalListBlockErrorsRequest;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LocalListBlockErrorsResponse(pub Vec<BlockError>);
#[derive(Serialize, Deserialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct BlockError {
pub block_hash: String,
pub refcount: u64,
pub error_count: u64,
pub last_try_secs_ago: u64,
pub next_try_in_secs: u64,
}
// ---- GetBlockInfo ----
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalGetBlockInfoRequest {
pub block_hash: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct LocalGetBlockInfoResponse {
pub block_hash: String,
pub refcount: u64,
pub versions: Vec<BlockVersion>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct BlockVersion {
pub version_id: String,
pub deleted: bool,
pub garbage_collected: bool,
pub backlink: Option<BlockVersionBacklink>,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum BlockVersionBacklink {
Object {
bucket_id: String,
key: String,
},
Upload {
upload_id: String,
upload_deleted: bool,
upload_garbage_collected: bool,
bucket_id: Option<String>,
key: Option<String>,
},
}

149
src/api/admin/block.rs Normal file
View file

@ -0,0 +1,149 @@
use std::sync::Arc;
use async_trait::async_trait;
use garage_util::data::*;
use garage_util::error::Error as GarageError;
use garage_util::time::now_msec;
use garage_table::EmptyKey;
use garage_model::garage::Garage;
use garage_model::s3::version_table::*;
use crate::admin::api::*;
use crate::admin::error::*;
use crate::admin::{Admin, RequestHandler};
use crate::common_error::CommonErrorDerivative;
#[async_trait]
impl RequestHandler for LocalListBlockErrorsRequest {
type Response = LocalListBlockErrorsResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalListBlockErrorsResponse, Error> {
let errors = garage.block_manager.list_resync_errors()?;
let now = now_msec();
let errors = errors
.into_iter()
.map(|e| BlockError {
block_hash: hex::encode(&e.hash),
refcount: e.refcount,
error_count: e.error_count,
last_try_secs_ago: now.saturating_sub(e.last_try) / 1000,
next_try_in_secs: e.next_try.saturating_sub(now) / 1000,
})
.collect();
Ok(LocalListBlockErrorsResponse(errors))
}
}
#[async_trait]
impl RequestHandler for LocalGetBlockInfoRequest {
type Response = LocalGetBlockInfoResponse;
async fn handle(
self,
garage: &Arc<Garage>,
_admin: &Admin,
) -> Result<LocalGetBlockInfoResponse, Error> {
let hash = find_block_hash_by_prefix(garage, &self.block_hash)?;
let refcount = garage.block_manager.get_block_rc(&hash)?;
let block_refs = garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
for br in block_refs {
if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? {
let bl = match &v.backlink {
VersionBacklink::MultipartUpload { upload_id } => {
if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? {
BlockVersionBacklink::Upload {
upload_id: hex::encode(&upload_id),
upload_deleted: u.deleted.get(),
upload_garbage_collected: false,
bucket_id: Some(hex::encode(&u.bucket_id)),
key: Some(u.key.to_string()),
}
} else {
BlockVersionBacklink::Upload {
upload_id: hex::encode(&upload_id),
upload_deleted: true,
upload_garbage_collected: true,
bucket_id: None,
key: None,
}
}
}
VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object {
bucket_id: hex::encode(&bucket_id),
key: key.to_string(),
},
};
versions.push(BlockVersion {
version_id: hex::encode(&br.version),
deleted: v.deleted.get(),
garbage_collected: false,
backlink: Some(bl),
});
} else {
versions.push(BlockVersion {
version_id: hex::encode(&br.version),
deleted: true,
garbage_collected: true,
backlink: None,
});
}
}
Ok(LocalGetBlockInfoResponse {
block_hash: hex::encode(&hash),
refcount,
versions,
})
}
}
fn find_block_hash_by_prefix(garage: &Arc<Garage>, prefix: &str) -> Result<Hash, Error> {
if prefix.len() < 4 {
return Err(Error::bad_request(
"Please specify at least 4 characters of the block hash",
));
}
let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
let iter = garage
.block_ref_table
.data
.store
.range(&prefix_bin[..]..)
.map_err(GarageError::from)?;
let mut found = None;
for item in iter {
let (k, _v) = item.map_err(GarageError::from)?;
let hash = Hash::try_from(&k[..32]).unwrap();
if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
break;
}
if hex::encode(hash.as_slice()).starts_with(prefix) {
match &found {
Some(x) if *x == hash => (),
Some(_) => {
return Err(Error::bad_request(format!(
"Several blocks match prefix `{}`",
prefix
)));
}
None => {
found = Some(hash);
}
}
}
}
found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string()))
}

View file

@ -25,6 +25,10 @@ pub enum Error {
#[error(display = "Access key not found: {}", _0)]
NoSuchAccessKey(String),
/// The requested block does not exist
#[error(display = "Block not found: {}", _0)]
NoSuchBlock(String),
/// The requested worker does not exist
#[error(display = "Worker not found: {}", _0)]
NoSuchWorker(u64),
@ -58,6 +62,7 @@ impl Error {
Error::Common(c) => c.aws_code(),
Error::NoSuchAccessKey(_) => "NoSuchAccessKey",
Error::NoSuchWorker(_) => "NoSuchWorker",
Error::NoSuchBlock(_) => "NoSuchBlock",
Error::KeyAlreadyExists(_) => "KeyAlreadyExists",
}
}
@ -68,7 +73,9 @@ impl ApiError for Error {
fn http_status_code(&self) -> StatusCode {
match self {
Error::Common(c) => c.http_status_code(),
Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND,
Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => {
StatusCode::NOT_FOUND
}
Error::KeyAlreadyExists(_) => StatusCode::CONFLICT,
}
}

View file

@ -15,6 +15,7 @@ mod cluster;
mod key;
mod special;
mod block;
mod worker;
use std::sync::Arc;

View file

@ -64,6 +64,9 @@ impl AdminApiRequest {
POST GetWorkerInfo (body_field, query::node),
POST GetWorkerVariable (body_field, query::node),
POST SetWorkerVariable (body_field, query::node),
// Block APIs
GET ListBlockErrors (default::body, query::node),
POST GetBlockInfo (body_field, query::node),
]);
if let Some(message) = query.nonempty_message() {

View file

@ -100,7 +100,7 @@ impl RequestHandler for LocalSetWorkerVariableRequest {
fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
WorkerInfoResp {
id: id,
id,
name: info.name,
state: match info.state {
WorkerState::Busy => WorkerStateResp::Busy,
@ -112,7 +112,7 @@ fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp {
consecutive_errors: info.consecutive_errors as u64,
last_error: info.last_error.map(|(message, t)| WorkerLastError {
message,
secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000,
secs_ago: now_msec().saturating_sub(t) / 1000,
}),
tranquility: info.status.tranquility,

View file

@ -13,52 +13,14 @@ use super::*;
impl AdminRpcHandler {
pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
match cmd {
BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
self.garage.block_manager.list_resync_errors()?,
)),
BlockOperation::Info { hash } => self.handle_block_info(hash).await,
BlockOperation::RetryNow { all, blocks } => {
self.handle_block_retry_now(*all, blocks).await
}
BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await,
_ => unreachable!(),
}
}
async fn handle_block_info(&self, hash: &String) -> Result<AdminRpc, Error> {
let hash = self.find_block_hash_by_prefix(hash)?;
let refcount = self.garage.block_manager.get_block_rc(&hash)?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
let mut uploads = vec![];
for br in block_refs {
if let Some(v) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink {
if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? {
uploads.push(u);
}
}
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
}
}
Ok(AdminRpc::BlockInfo {
hash,
refcount,
versions,
uploads,
})
}
async fn handle_block_retry_now(
&self,
all: bool,
@ -188,48 +150,4 @@ impl AdminRpcHandler {
Ok(())
}
// ---- helper function ----
fn find_block_hash_by_prefix(&self, prefix: &str) -> Result<Hash, Error> {
if prefix.len() < 4 {
return Err(Error::BadRequest(
"Please specify at least 4 characters of the block hash".into(),
));
}
let prefix_bin =
hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?;
let iter = self
.garage
.block_ref_table
.data
.store
.range(&prefix_bin[..]..)
.map_err(GarageError::from)?;
let mut found = None;
for item in iter {
let (k, _v) = item.map_err(GarageError::from)?;
let hash = Hash::try_from(&k[..32]).unwrap();
if &hash.as_slice()[..prefix_bin.len()] != prefix_bin {
break;
}
if hex::encode(hash.as_slice()).starts_with(prefix) {
match &found {
Some(x) if *x == hash => (),
Some(_) => {
return Err(Error::BadRequest(format!(
"Several blocks match prefix `{}`",
prefix
)));
}
None => {
found = Some(hash);
}
}
}
}
found.ok_or_else(|| Error::BadRequest("No matching block found".into()))
}
}

View file

@ -19,12 +19,8 @@ use garage_table::*;
use garage_rpc::layout::PARTITION_BITS;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::garage::Garage;
use garage_model::helper::error::Error;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::Version;
use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse};
use garage_api_admin::RequestHandler as AdminApiEndpoint;
@ -45,13 +41,6 @@ pub enum AdminRpc {
// Replies
Ok(String),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
},
}
impl Rpc for AdminRpc {

View file

@ -6,7 +6,6 @@ use garage_rpc::*;
use garage_model::helper::error::Error as HelperError;
use crate::admin::*;
use crate::cli::*;
pub async fn cmd_admin(
rpc_cli: &Endpoint<AdminRpc, ()>,
@ -17,17 +16,6 @@ pub async fn cmd_admin(
AdminRpc::Ok(msg) => {
println!("{}", msg);
}
AdminRpc::BlockErrorList(el) => {
print_block_error_list(el);
}
AdminRpc::BlockInfo {
hash,
refcount,
versions,
uploads,
} => {
print_block_info(hash, refcount, versions, uploads);
}
r => {
error!("Unexpected response: {:?}", r);
}

View file

@ -2,11 +2,9 @@ pub(crate) mod cmd;
pub(crate) mod init;
pub(crate) mod layout;
pub(crate) mod structs;
pub(crate) mod util;
pub(crate) mod convert_db;
pub(crate) use cmd::*;
pub(crate) use init::*;
pub(crate) use structs::*;
pub(crate) use util::*;

View file

@ -1,91 +0,0 @@
use std::time::Duration;
use format_table::format_table;
use garage_util::data::*;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::s3::mpu_table::MultipartUpload;
use garage_model::s3::version_table::*;
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
let mut tf2 = timeago::Formatter::new();
tf2.ago("");
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
let next_try = if e.next_try > now {
tf2.convert(Duration::from_millis(e.next_try - now))
} else {
"asap".to_string()
};
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
next_try
));
}
format_table(table);
}
pub fn print_block_info(
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
uploads: Vec<MultipartUpload>,
) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
match &ver.backlink {
VersionBacklink::Object { bucket_id, key } => {
table.push(format!(
"{:?}\t{:?}\t{}\t\t{:?}",
ver.uuid,
bucket_id,
key,
ver.deleted.get()
));
}
VersionBacklink::MultipartUpload { upload_id } => {
let upload = uploads.iter().find(|x| x.upload_id == *upload_id);
table.push(format!(
"{:?}\t{:?}\t{}\t{:?}\t{:?}",
ver.uuid,
upload.map(|u| u.bucket_id).unwrap_or_default(),
upload.map(|u| u.key.as_str()).unwrap_or_default(),
upload_id,
ver.deleted.get()
));
}
}
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
table.push(format!("{:?}\t\t\t\tyes", vh));
}
}
}
format_table(table);
if refcount != nondeleted_count {
println!();
println!(
"Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
);
}
}

109
src/garage/cli_v2/block.rs Normal file
View file

@ -0,0 +1,109 @@
//use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;
use garage_api::admin::api::*;
use crate::cli::structs::*;
use crate::cli_v2::*;
impl Cli {
pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> {
match cmd {
BlockOperation::ListErrors => self.cmd_list_block_errors().await,
BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await,
bo => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BlockOperation(bo),
)
.await
.ok_or_message("cli_v1"),
}
}
pub async fn cmd_list_block_errors(&self) -> Result<(), Error> {
let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0;
let tf = timeago::Formatter::new();
let mut tf2 = timeago::Formatter::new();
tf2.ago("");
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in errors {
let next_try = if e.next_try_in_secs > 0 {
tf2.convert(Duration::from_secs(e.next_try_in_secs))
} else {
"asap".to_string()
};
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
e.block_hash,
e.refcount,
e.error_count,
tf.convert(Duration::from_secs(e.last_try_secs_ago)),
next_try
));
}
format_table(table);
Ok(())
}
pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> {
let info = self
.local_api_request(LocalGetBlockInfoRequest { block_hash: hash })
.await?;
println!("Block hash: {}", info.block_hash);
println!("Refcount: {}", info.refcount);
println!();
let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()];
let mut nondeleted_count = 0;
for ver in info.versions.iter() {
match &ver.backlink {
Some(BlockVersionBacklink::Object { bucket_id, key }) => {
table.push(format!(
"{:.16}\t{:.16}\t{}\t\t{:?}",
ver.version_id, bucket_id, key, ver.deleted
));
}
Some(BlockVersionBacklink::Upload {
upload_id,
upload_deleted: _,
upload_garbage_collected: _,
bucket_id,
key,
}) => {
table.push(format!(
"{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}",
ver.version_id,
bucket_id.as_deref().unwrap_or(""),
key.as_deref().unwrap_or(""),
upload_id,
ver.deleted
));
}
None => {
table.push(format!("{:.16}\t\t\tyes", ver.version_id));
}
}
if !ver.deleted {
nondeleted_count += 1;
}
}
format_table(table);
if info.refcount != nondeleted_count {
println!();
println!(
"Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
);
}
Ok(())
}
}

View file

@ -3,6 +3,7 @@ pub mod cluster;
pub mod key;
pub mod layout;
pub mod block;
pub mod worker;
use std::convert::TryFrom;
@ -41,6 +42,7 @@ impl Cli {
Command::Bucket(bo) => self.cmd_bucket(bo).await,
Command::Key(ko) => self.cmd_key(ko).await,
Command::Worker(wo) => self.cmd_worker(wo).await,
Command::Block(bo) => self.cmd_block(bo).await,
// TODO
Command::Repair(ro) => cli_v1::cmd_admin(
@ -55,13 +57,6 @@ impl Cli {
.await
.ok_or_message("cli_v1")
}
Command::Block(bo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,
AdminRpc::BlockOperation(bo),
)
.await
.ok_or_message("cli_v1"),
Command::Meta(mo) => cli_v1::cmd_admin(
&self.admin_rpc_endpoint,
self.rpc_host,

View file

@ -1,4 +1,3 @@
//use bytesize::ByteSize;
use format_table::format_table;
use garage_util::error::*;