From d405a9f839779b1454e47e4b53a418603061c5e9 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 31 Jan 2025 16:53:33 +0100 Subject: [PATCH] cli_v2: implement ListBlockErrors and GetBlockInfo --- src/api/admin/api.rs | 71 +++++++++++++++++ src/api/admin/block.rs | 149 ++++++++++++++++++++++++++++++++++++ src/api/admin/error.rs | 9 ++- src/api/admin/lib.rs | 1 + src/api/admin/router_v2.rs | 3 + src/api/admin/worker.rs | 4 +- src/garage/admin/block.rs | 84 +------------------- src/garage/admin/mod.rs | 11 --- src/garage/cli/cmd.rs | 12 --- src/garage/cli/mod.rs | 2 - src/garage/cli/util.rs | 91 ---------------------- src/garage/cli_v2/block.rs | 109 ++++++++++++++++++++++++++ src/garage/cli_v2/mod.rs | 9 +-- src/garage/cli_v2/worker.rs | 1 - 14 files changed, 346 insertions(+), 210 deletions(-) create mode 100644 src/api/admin/block.rs delete mode 100644 src/garage/cli/util.rs create mode 100644 src/garage/cli_v2/block.rs diff --git a/src/api/admin/api.rs b/src/api/admin/api.rs index cf136d28..42872ad0 100644 --- a/src/api/admin/api.rs +++ b/src/api/admin/api.rs @@ -82,6 +82,10 @@ admin_endpoints![ GetWorkerInfo, GetWorkerVariable, SetWorkerVariable, + + // Block operations + ListBlockErrors, + GetBlockInfo, ]; local_admin_endpoints![ @@ -90,6 +94,9 @@ local_admin_endpoints![ GetWorkerInfo, GetWorkerVariable, SetWorkerVariable, + // Block operations + ListBlockErrors, + GetBlockInfo, ]; #[derive(Debug, Clone, Serialize, Deserialize)] @@ -619,6 +626,7 @@ pub struct RemoveBucketAliasResponse(pub GetBucketInfoResponse); // ---- GetWorkerList ---- #[derive(Debug, Clone, Serialize, Deserialize, Default)] +#[serde(rename_all = "camelCase")] pub struct LocalListWorkersRequest { #[serde(default)] pub busy_only: bool, @@ -694,3 +702,66 @@ pub struct LocalSetWorkerVariableResponse { pub variable: String, pub value: String, } + +// ********************************************** +// Block operations +// ********************************************** + +// ---- ListBlockErrors ---- + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LocalListBlockErrorsRequest; + +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct LocalListBlockErrorsResponse(pub Vec); + +#[derive(Serialize, Deserialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct BlockError { + pub block_hash: String, + pub refcount: u64, + pub error_count: u64, + pub last_try_secs_ago: u64, + pub next_try_in_secs: u64, +} + +// ---- GetBlockInfo ---- + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoRequest { + pub block_hash: String, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct LocalGetBlockInfoResponse { + pub block_hash: String, + pub refcount: u64, + pub versions: Vec, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub struct BlockVersion { + pub version_id: String, + pub deleted: bool, + pub garbage_collected: bool, + pub backlink: Option, +} + +#[derive(Debug, Clone, Serialize, Deserialize)] +#[serde(rename_all = "camelCase")] +pub enum BlockVersionBacklink { + Object { + bucket_id: String, + key: String, + }, + Upload { + upload_id: String, + upload_deleted: bool, + upload_garbage_collected: bool, + bucket_id: Option, + key: Option, + }, +} diff --git a/src/api/admin/block.rs b/src/api/admin/block.rs new file mode 100644 index 00000000..157db5b5 --- /dev/null +++ b/src/api/admin/block.rs @@ -0,0 +1,149 @@ +use std::sync::Arc; + +use async_trait::async_trait; + +use garage_util::data::*; +use garage_util::error::Error as GarageError; +use garage_util::time::now_msec; + +use garage_table::EmptyKey; + +use garage_model::garage::Garage; +use garage_model::s3::version_table::*; + +use crate::admin::api::*; +use crate::admin::error::*; +use crate::admin::{Admin, RequestHandler}; +use crate::common_error::CommonErrorDerivative; + +#[async_trait] +impl RequestHandler for LocalListBlockErrorsRequest { + type Response = LocalListBlockErrorsResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let errors = garage.block_manager.list_resync_errors()?; + let now = now_msec(); + let errors = errors + .into_iter() + .map(|e| BlockError { + block_hash: hex::encode(&e.hash), + refcount: e.refcount, + error_count: e.error_count, + last_try_secs_ago: now.saturating_sub(e.last_try) / 1000, + next_try_in_secs: e.next_try.saturating_sub(now) / 1000, + }) + .collect(); + Ok(LocalListBlockErrorsResponse(errors)) + } +} + +#[async_trait] +impl RequestHandler for LocalGetBlockInfoRequest { + type Response = LocalGetBlockInfoResponse; + + async fn handle( + self, + garage: &Arc, + _admin: &Admin, + ) -> Result { + let hash = find_block_hash_by_prefix(garage, &self.block_hash)?; + let refcount = garage.block_manager.get_block_rc(&hash)?; + let block_refs = garage + .block_ref_table + .get_range(&hash, None, None, 10000, Default::default()) + .await?; + let mut versions = vec![]; + for br in block_refs { + if let Some(v) = garage.version_table.get(&br.version, &EmptyKey).await? { + let bl = match &v.backlink { + VersionBacklink::MultipartUpload { upload_id } => { + if let Some(u) = garage.mpu_table.get(upload_id, &EmptyKey).await? { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: u.deleted.get(), + upload_garbage_collected: false, + bucket_id: Some(hex::encode(&u.bucket_id)), + key: Some(u.key.to_string()), + } + } else { + BlockVersionBacklink::Upload { + upload_id: hex::encode(&upload_id), + upload_deleted: true, + upload_garbage_collected: true, + bucket_id: None, + key: None, + } + } + } + VersionBacklink::Object { bucket_id, key } => BlockVersionBacklink::Object { + bucket_id: hex::encode(&bucket_id), + key: key.to_string(), + }, + }; + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: v.deleted.get(), + garbage_collected: false, + backlink: Some(bl), + }); + } else { + versions.push(BlockVersion { + version_id: hex::encode(&br.version), + deleted: true, + garbage_collected: true, + backlink: None, + }); + } + } + Ok(LocalGetBlockInfoResponse { + block_hash: hex::encode(&hash), + refcount, + versions, + }) + } +} + +fn find_block_hash_by_prefix(garage: &Arc, prefix: &str) -> Result { + if prefix.len() < 4 { + return Err(Error::bad_request( + "Please specify at least 4 characters of the block hash", + )); + } + + let prefix_bin = hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; + + let iter = garage + .block_ref_table + .data + .store + .range(&prefix_bin[..]..) + .map_err(GarageError::from)?; + let mut found = None; + for item in iter { + let (k, _v) = item.map_err(GarageError::from)?; + let hash = Hash::try_from(&k[..32]).unwrap(); + if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { + break; + } + if hex::encode(hash.as_slice()).starts_with(prefix) { + match &found { + Some(x) if *x == hash => (), + Some(_) => { + return Err(Error::bad_request(format!( + "Several blocks match prefix `{}`", + prefix + ))); + } + None => { + found = Some(hash); + } + } + } + } + + found.ok_or_else(|| Error::NoSuchBlock(prefix.to_string())) +} diff --git a/src/api/admin/error.rs b/src/api/admin/error.rs index 354a3bab..d7ea7dc9 100644 --- a/src/api/admin/error.rs +++ b/src/api/admin/error.rs @@ -25,6 +25,10 @@ pub enum Error { #[error(display = "Access key not found: {}", _0)] NoSuchAccessKey(String), + /// The requested block does not exist + #[error(display = "Block not found: {}", _0)] + NoSuchBlock(String), + /// The requested worker does not exist #[error(display = "Worker not found: {}", _0)] NoSuchWorker(u64), @@ -58,6 +62,7 @@ impl Error { Error::Common(c) => c.aws_code(), Error::NoSuchAccessKey(_) => "NoSuchAccessKey", Error::NoSuchWorker(_) => "NoSuchWorker", + Error::NoSuchBlock(_) => "NoSuchBlock", Error::KeyAlreadyExists(_) => "KeyAlreadyExists", } } @@ -68,7 +73,9 @@ impl ApiError for Error { fn http_status_code(&self) -> StatusCode { match self { Error::Common(c) => c.http_status_code(), - Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) => StatusCode::NOT_FOUND, + Error::NoSuchAccessKey(_) | Error::NoSuchWorker(_) | Error::NoSuchBlock(_) => { + StatusCode::NOT_FOUND + } Error::KeyAlreadyExists(_) => StatusCode::CONFLICT, } } diff --git a/src/api/admin/lib.rs b/src/api/admin/lib.rs index 4ad10532..e7ee37af 100644 --- a/src/api/admin/lib.rs +++ b/src/api/admin/lib.rs @@ -15,6 +15,7 @@ mod cluster; mod key; mod special; +mod block; mod worker; use std::sync::Arc; diff --git a/src/api/admin/router_v2.rs b/src/api/admin/router_v2.rs index 6334b3b1..5c6cb29c 100644 --- a/src/api/admin/router_v2.rs +++ b/src/api/admin/router_v2.rs @@ -64,6 +64,9 @@ impl AdminApiRequest { POST GetWorkerInfo (body_field, query::node), POST GetWorkerVariable (body_field, query::node), POST SetWorkerVariable (body_field, query::node), + // Block APIs + GET ListBlockErrors (default::body, query::node), + POST GetBlockInfo (body_field, query::node), ]); if let Some(message) = query.nonempty_message() { diff --git a/src/api/admin/worker.rs b/src/api/admin/worker.rs index c7c75700..d143e5be 100644 --- a/src/api/admin/worker.rs +++ b/src/api/admin/worker.rs @@ -100,7 +100,7 @@ impl RequestHandler for LocalSetWorkerVariableRequest { fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { WorkerInfoResp { - id: id, + id, name: info.name, state: match info.state { WorkerState::Busy => WorkerStateResp::Busy, @@ -112,7 +112,7 @@ fn worker_info_to_api(id: u64, info: WorkerInfo) -> WorkerInfoResp { consecutive_errors: info.consecutive_errors as u64, last_error: info.last_error.map(|(message, t)| WorkerLastError { message, - secs_ago: (std::cmp::max(t, now_msec()) - t) / 1000, + secs_ago: now_msec().saturating_sub(t) / 1000, }), tranquility: info.status.tranquility, diff --git a/src/garage/admin/block.rs b/src/garage/admin/block.rs index edeb88c0..1138703a 100644 --- a/src/garage/admin/block.rs +++ b/src/garage/admin/block.rs @@ -13,52 +13,14 @@ use super::*; impl AdminRpcHandler { pub(super) async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result { match cmd { - BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList( - self.garage.block_manager.list_resync_errors()?, - )), - BlockOperation::Info { hash } => self.handle_block_info(hash).await, BlockOperation::RetryNow { all, blocks } => { self.handle_block_retry_now(*all, blocks).await } BlockOperation::Purge { yes, blocks } => self.handle_block_purge(*yes, blocks).await, + _ => unreachable!(), } } - async fn handle_block_info(&self, hash: &String) -> Result { - let hash = self.find_block_hash_by_prefix(hash)?; - let refcount = self.garage.block_manager.get_block_rc(&hash)?; - let block_refs = self - .garage - .block_ref_table - .get_range(&hash, None, None, 10000, Default::default()) - .await?; - let mut versions = vec![]; - let mut uploads = vec![]; - for br in block_refs { - if let Some(v) = self - .garage - .version_table - .get(&br.version, &EmptyKey) - .await? - { - if let VersionBacklink::MultipartUpload { upload_id } = &v.backlink { - if let Some(u) = self.garage.mpu_table.get(upload_id, &EmptyKey).await? { - uploads.push(u); - } - } - versions.push(Ok(v)); - } else { - versions.push(Err(br.version)); - } - } - Ok(AdminRpc::BlockInfo { - hash, - refcount, - versions, - uploads, - }) - } - async fn handle_block_retry_now( &self, all: bool, @@ -188,48 +150,4 @@ impl AdminRpcHandler { Ok(()) } - - // ---- helper function ---- - fn find_block_hash_by_prefix(&self, prefix: &str) -> Result { - if prefix.len() < 4 { - return Err(Error::BadRequest( - "Please specify at least 4 characters of the block hash".into(), - )); - } - - let prefix_bin = - hex::decode(&prefix[..prefix.len() & !1]).ok_or_bad_request("invalid hash")?; - - let iter = self - .garage - .block_ref_table - .data - .store - .range(&prefix_bin[..]..) - .map_err(GarageError::from)?; - let mut found = None; - for item in iter { - let (k, _v) = item.map_err(GarageError::from)?; - let hash = Hash::try_from(&k[..32]).unwrap(); - if &hash.as_slice()[..prefix_bin.len()] != prefix_bin { - break; - } - if hex::encode(hash.as_slice()).starts_with(prefix) { - match &found { - Some(x) if *x == hash => (), - Some(_) => { - return Err(Error::BadRequest(format!( - "Several blocks match prefix `{}`", - prefix - ))); - } - None => { - found = Some(hash); - } - } - } - } - - found.ok_or_else(|| Error::BadRequest("No matching block found".into())) - } } diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index c0e63524..1aa9482c 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -19,12 +19,8 @@ use garage_table::*; use garage_rpc::layout::PARTITION_BITS; use garage_rpc::*; -use garage_block::manager::BlockResyncErrorInfo; - use garage_model::garage::Garage; use garage_model::helper::error::Error; -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::Version; use garage_api_admin::api::{AdminApiRequest, TaggedAdminApiResponse}; use garage_api_admin::RequestHandler as AdminApiEndpoint; @@ -45,13 +41,6 @@ pub enum AdminRpc { // Replies Ok(String), - BlockErrorList(Vec), - BlockInfo { - hash: Hash, - refcount: u64, - versions: Vec>, - uploads: Vec, - }, } impl Rpc for AdminRpc { diff --git a/src/garage/cli/cmd.rs b/src/garage/cli/cmd.rs index bc34d014..e5af461c 100644 --- a/src/garage/cli/cmd.rs +++ b/src/garage/cli/cmd.rs @@ -6,7 +6,6 @@ use garage_rpc::*; use garage_model::helper::error::Error as HelperError; use crate::admin::*; -use crate::cli::*; pub async fn cmd_admin( rpc_cli: &Endpoint, @@ -17,17 +16,6 @@ pub async fn cmd_admin( AdminRpc::Ok(msg) => { println!("{}", msg); } - AdminRpc::BlockErrorList(el) => { - print_block_error_list(el); - } - AdminRpc::BlockInfo { - hash, - refcount, - versions, - uploads, - } => { - print_block_info(hash, refcount, versions, uploads); - } r => { error!("Unexpected response: {:?}", r); } diff --git a/src/garage/cli/mod.rs b/src/garage/cli/mod.rs index 30f566e2..c15afda1 100644 --- a/src/garage/cli/mod.rs +++ b/src/garage/cli/mod.rs @@ -2,11 +2,9 @@ pub(crate) mod cmd; pub(crate) mod init; pub(crate) mod layout; pub(crate) mod structs; -pub(crate) mod util; pub(crate) mod convert_db; pub(crate) use cmd::*; pub(crate) use init::*; pub(crate) use structs::*; -pub(crate) use util::*; diff --git a/src/garage/cli/util.rs b/src/garage/cli/util.rs deleted file mode 100644 index 43b28623..00000000 --- a/src/garage/cli/util.rs +++ /dev/null @@ -1,91 +0,0 @@ -use std::time::Duration; - -use format_table::format_table; -use garage_util::data::*; -use garage_util::time::*; - -use garage_block::manager::BlockResyncErrorInfo; - -use garage_model::s3::mpu_table::MultipartUpload; -use garage_model::s3::version_table::*; - -pub fn print_block_error_list(el: Vec) { - let now = now_msec(); - let tf = timeago::Formatter::new(); - let mut tf2 = timeago::Formatter::new(); - tf2.ago(""); - - let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; - for e in el { - let next_try = if e.next_try > now { - tf2.convert(Duration::from_millis(e.next_try - now)) - } else { - "asap".to_string() - }; - table.push(format!( - "{}\t{}\t{}\t{}\tin {}", - hex::encode(e.hash.as_slice()), - e.refcount, - e.error_count, - tf.convert(Duration::from_millis(now - e.last_try)), - next_try - )); - } - format_table(table); -} - -pub fn print_block_info( - hash: Hash, - refcount: u64, - versions: Vec>, - uploads: Vec, -) { - println!("Block hash: {}", hex::encode(hash.as_slice())); - println!("Refcount: {}", refcount); - println!(); - - let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; - let mut nondeleted_count = 0; - for v in versions.iter() { - match v { - Ok(ver) => { - match &ver.backlink { - VersionBacklink::Object { bucket_id, key } => { - table.push(format!( - "{:?}\t{:?}\t{}\t\t{:?}", - ver.uuid, - bucket_id, - key, - ver.deleted.get() - )); - } - VersionBacklink::MultipartUpload { upload_id } => { - let upload = uploads.iter().find(|x| x.upload_id == *upload_id); - table.push(format!( - "{:?}\t{:?}\t{}\t{:?}\t{:?}", - ver.uuid, - upload.map(|u| u.bucket_id).unwrap_or_default(), - upload.map(|u| u.key.as_str()).unwrap_or_default(), - upload_id, - ver.deleted.get() - )); - } - } - if !ver.deleted.get() { - nondeleted_count += 1; - } - } - Err(vh) => { - table.push(format!("{:?}\t\t\t\tyes", vh)); - } - } - } - format_table(table); - - if refcount != nondeleted_count { - println!(); - println!( - "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." - ); - } -} diff --git a/src/garage/cli_v2/block.rs b/src/garage/cli_v2/block.rs new file mode 100644 index 00000000..ff3c79e9 --- /dev/null +++ b/src/garage/cli_v2/block.rs @@ -0,0 +1,109 @@ +//use bytesize::ByteSize; +use format_table::format_table; + +use garage_util::error::*; + +use garage_api::admin::api::*; + +use crate::cli::structs::*; +use crate::cli_v2::*; + +impl Cli { + pub async fn cmd_block(&self, cmd: BlockOperation) -> Result<(), Error> { + match cmd { + BlockOperation::ListErrors => self.cmd_list_block_errors().await, + BlockOperation::Info { hash } => self.cmd_get_block_info(hash).await, + + bo => cli_v1::cmd_admin( + &self.admin_rpc_endpoint, + self.rpc_host, + AdminRpc::BlockOperation(bo), + ) + .await + .ok_or_message("cli_v1"), + } + } + + pub async fn cmd_list_block_errors(&self) -> Result<(), Error> { + let errors = self.local_api_request(LocalListBlockErrorsRequest).await?.0; + + let tf = timeago::Formatter::new(); + let mut tf2 = timeago::Formatter::new(); + tf2.ago(""); + + let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()]; + for e in errors { + let next_try = if e.next_try_in_secs > 0 { + tf2.convert(Duration::from_secs(e.next_try_in_secs)) + } else { + "asap".to_string() + }; + table.push(format!( + "{}\t{}\t{}\t{}\tin {}", + e.block_hash, + e.refcount, + e.error_count, + tf.convert(Duration::from_secs(e.last_try_secs_ago)), + next_try + )); + } + format_table(table); + + Ok(()) + } + + pub async fn cmd_get_block_info(&self, hash: String) -> Result<(), Error> { + let info = self + .local_api_request(LocalGetBlockInfoRequest { block_hash: hash }) + .await?; + + println!("Block hash: {}", info.block_hash); + println!("Refcount: {}", info.refcount); + println!(); + + let mut table = vec!["Version\tBucket\tKey\tMPU\tDeleted".into()]; + let mut nondeleted_count = 0; + for ver in info.versions.iter() { + match &ver.backlink { + Some(BlockVersionBacklink::Object { bucket_id, key }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t\t{:?}", + ver.version_id, bucket_id, key, ver.deleted + )); + } + Some(BlockVersionBacklink::Upload { + upload_id, + upload_deleted: _, + upload_garbage_collected: _, + bucket_id, + key, + }) => { + table.push(format!( + "{:.16}\t{:.16}\t{}\t{:.16}\t{:.16}", + ver.version_id, + bucket_id.as_deref().unwrap_or(""), + key.as_deref().unwrap_or(""), + upload_id, + ver.deleted + )); + } + None => { + table.push(format!("{:.16}\t\t\tyes", ver.version_id)); + } + } + if !ver.deleted { + nondeleted_count += 1; + } + } + format_table(table); + + if info.refcount != nondeleted_count { + println!(); + println!( + "Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`." + ); + } + + Ok(()) + } +} diff --git a/src/garage/cli_v2/mod.rs b/src/garage/cli_v2/mod.rs index b175ab38..462e5722 100644 --- a/src/garage/cli_v2/mod.rs +++ b/src/garage/cli_v2/mod.rs @@ -3,6 +3,7 @@ pub mod cluster; pub mod key; pub mod layout; +pub mod block; pub mod worker; use std::convert::TryFrom; @@ -41,6 +42,7 @@ impl Cli { Command::Bucket(bo) => self.cmd_bucket(bo).await, Command::Key(ko) => self.cmd_key(ko).await, Command::Worker(wo) => self.cmd_worker(wo).await, + Command::Block(bo) => self.cmd_block(bo).await, // TODO Command::Repair(ro) => cli_v1::cmd_admin( @@ -55,13 +57,6 @@ impl Cli { .await .ok_or_message("cli_v1") } - Command::Block(bo) => cli_v1::cmd_admin( - &self.admin_rpc_endpoint, - self.rpc_host, - AdminRpc::BlockOperation(bo), - ) - .await - .ok_or_message("cli_v1"), Command::Meta(mo) => cli_v1::cmd_admin( &self.admin_rpc_endpoint, self.rpc_host, diff --git a/src/garage/cli_v2/worker.rs b/src/garage/cli_v2/worker.rs index b94a4f68..9c248a39 100644 --- a/src/garage/cli_v2/worker.rs +++ b/src/garage/cli_v2/worker.rs @@ -1,4 +1,3 @@ -//use bytesize::ByteSize; use format_table::format_table; use garage_util::error::*;