Implement block list-errors and block info
All checks were successful
continuous-integration/drone/push Build is passing
continuous-integration/drone/pr Build is passing

This commit is contained in:
Alex 2022-12-13 14:23:45 +01:00
parent 9d82196945
commit 687660b27f
Signed by: lx
GPG key ID: 0E496D15096376BE
8 changed files with 240 additions and 28 deletions

View file

@ -90,6 +90,15 @@ pub struct BlockManager {
tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
pub struct BlockResyncErrorInfo {
pub hash: Hash,
pub refcount: u64,
pub error_count: u64,
pub last_try: u64,
pub next_try: u64,
}
// This custom struct contains functions that must only be ran
// when the lock is held. We ensure that it is the case by storing
// it INSIDE a Mutex.
@ -314,6 +323,31 @@ impl BlockManager {
let _ = self.tx_scrub_command.send(cmd).await;
}
/// Get the reference count of a block
pub fn get_block_rc(&self, hash: &Hash) -> Result<u64, Error> {
Ok(self.rc.get_block_rc(hash)?.as_u64())
}
/// List all resync errors
pub fn list_resync_errors(&self) -> Result<Vec<BlockResyncErrorInfo>, Error> {
let mut blocks = Vec::with_capacity(self.resync.errors.len());
for ent in self.resync.errors.iter()? {
let (hash, cnt) = ent?;
let cnt = ErrorCounter::decode(&cnt);
blocks.push(BlockResyncErrorInfo {
hash: Hash::try_from(&hash).unwrap(),
refcount: 0,
error_count: cnt.errors,
last_try: cnt.last_try,
next_try: cnt.next_try(),
});
}
for block in blocks.iter_mut() {
block.refcount = self.get_block_rc(&block.hash)?;
}
Ok(blocks)
}
//// ----- Managing the reference counter ----
/// Increment the number of time a block is used, putting it to resynchronization if it is

View file

@ -169,4 +169,11 @@ impl RcEntry {
pub(crate) fn is_needed(&self) -> bool {
!self.is_deletable()
}
pub(crate) fn as_u64(&self) -> u64 {
match self {
RcEntry::Present { count } => *count,
_ => 0,
}
}
}

View file

@ -540,9 +540,9 @@ impl Worker for ResyncWorker {
/// and the time of the last try.
/// Used to implement exponential backoff.
#[derive(Clone, Copy, Debug)]
struct ErrorCounter {
errors: u64,
last_try: u64,
pub(crate) struct ErrorCounter {
pub(crate) errors: u64,
pub(crate) last_try: u64,
}
impl ErrorCounter {
@ -553,12 +553,13 @@ impl ErrorCounter {
}
}
fn decode(data: &[u8]) -> Self {
pub(crate) fn decode(data: &[u8]) -> Self {
Self {
errors: u64::from_be_bytes(data[0..8].try_into().unwrap()),
last_try: u64::from_be_bytes(data[8..16].try_into().unwrap()),
}
}
fn encode(&self) -> Vec<u8> {
[
u64::to_be_bytes(self.errors),
@ -578,7 +579,8 @@ impl ErrorCounter {
(RESYNC_RETRY_DELAY.as_millis() as u64)
<< std::cmp::min(self.errors - 1, RESYNC_RETRY_DELAY_MAX_BACKOFF_POWER)
}
fn next_try(&self) -> u64 {
pub(crate) fn next_try(&self) -> u64 {
self.last_try + self.delay_msec()
}
}

View file

@ -15,6 +15,7 @@ use garage_table::*;
use garage_rpc::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_block::repair::ScrubWorkerCommand;
use garage_model::bucket_alias_table::*;
@ -24,6 +25,7 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*;
use garage_model::migrate::Migrate;
use garage_model::permission::*;
use garage_model::s3::version_table::Version;
use crate::cli::*;
use crate::repair::online::launch_online_repair;
@ -38,7 +40,8 @@ pub enum AdminRpc {
LaunchRepair(RepairOpt),
Migrate(MigrateOpt),
Stats(StatsOpt),
Worker(WorkerOpt),
Worker(WorkerOperation),
BlockOperation(BlockOperation),
// Replies
Ok(String),
@ -55,6 +58,12 @@ pub enum AdminRpc {
WorkerListOpt,
),
WorkerInfo(usize, garage_util::background::WorkerInfo),
BlockErrorList(Vec<BlockResyncErrorInfo>),
BlockInfo {
hash: Hash,
refcount: u64,
versions: Vec<Result<Version, Uuid>>,
},
}
impl Rpc for AdminRpc {
@ -74,6 +83,8 @@ impl AdminRpcHandler {
admin
}
// ================ BUCKET COMMANDS ====================
async fn handle_bucket_cmd(&self, cmd: &BucketOperation) -> Result<AdminRpc, Error> {
match cmd {
BucketOperation::List => self.handle_list_buckets().await,
@ -552,6 +563,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok(ret))
}
// ================ KEY COMMANDS ====================
async fn handle_key_cmd(&self, cmd: &KeyOperation) -> Result<AdminRpc, Error> {
match cmd {
KeyOperation::List => self.handle_list_keys().await,
@ -689,6 +702,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::KeyInfo(key, relevant_buckets))
}
// ================ MIGRATION COMMANDS ====================
async fn handle_migrate(self: &Arc<Self>, opt: MigrateOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@ -705,6 +720,8 @@ impl AdminRpcHandler {
Ok(AdminRpc::Ok("Migration successfull.".into()))
}
// ================ REPAIR COMMANDS ====================
async fn handle_launch_repair(self: &Arc<Self>, opt: RepairOpt) -> Result<AdminRpc, Error> {
if !opt.yes {
return Err(Error::BadRequest(
@ -748,6 +765,8 @@ impl AdminRpcHandler {
}
}
// ================ STATS COMMANDS ====================
async fn handle_stats(&self, opt: StatsOpt) -> Result<AdminRpc, Error> {
if opt.all_nodes {
let mut ret = String::new();
@ -873,27 +892,27 @@ impl AdminRpcHandler {
Ok(())
}
// ----
// ================ WORKER COMMANDS ====================
async fn handle_worker_cmd(&self, opt: WorkerOpt) -> Result<AdminRpc, Error> {
match opt.cmd {
WorkerCmd::List { opt } => {
async fn handle_worker_cmd(&self, cmd: &WorkerOperation) -> Result<AdminRpc, Error> {
match cmd {
WorkerOperation::List { opt } => {
let workers = self.garage.background.get_worker_info();
Ok(AdminRpc::WorkerList(workers, opt))
Ok(AdminRpc::WorkerList(workers, *opt))
}
WorkerCmd::Info { tid } => {
WorkerOperation::Info { tid } => {
let info = self
.garage
.background
.get_worker_info()
.get(&tid)
.get(tid)
.ok_or_bad_request(format!("No worker with TID {}", tid))?
.clone();
Ok(AdminRpc::WorkerInfo(tid, info))
Ok(AdminRpc::WorkerInfo(*tid, info))
}
WorkerCmd::Set { opt } => match opt {
WorkerOperation::Set { opt } => match opt {
WorkerSetCmd::ScrubTranquility { tranquility } => {
let scrub_command = ScrubWorkerCommand::SetTranquility(tranquility);
let scrub_command = ScrubWorkerCommand::SetTranquility(*tranquility);
self.garage
.block_manager
.send_scrub_command(scrub_command)
@ -904,7 +923,7 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
.set_n_workers(worker_count)
.set_n_workers(*worker_count)
.await?;
Ok(AdminRpc::Ok("Number of resync workers updated".into()))
}
@ -912,13 +931,57 @@ impl AdminRpcHandler {
self.garage
.block_manager
.resync
.set_tranquility(tranquility)
.set_tranquility(*tranquility)
.await?;
Ok(AdminRpc::Ok("Resync tranquility updated".into()))
}
},
}
}
// ================ BLOCK COMMANDS ====================
async fn handle_block_cmd(&self, cmd: &BlockOperation) -> Result<AdminRpc, Error> {
match cmd {
BlockOperation::ListErrors => Ok(AdminRpc::BlockErrorList(
self.garage.block_manager.list_resync_errors()?,
)),
BlockOperation::Info { hash } => {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let refcount = self.garage.block_manager.get_block_rc(&hash)?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
let mut versions = vec![];
for br in block_refs {
if let Some(v) = self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
versions.push(Ok(v));
} else {
versions.push(Err(br.version));
}
}
Ok(AdminRpc::BlockInfo {
hash,
refcount,
versions,
})
}
BlockOperation::RetryNow { .. } => {
Err(GarageError::Message("not implemented".into()).into())
}
BlockOperation::Purge { .. } => {
Err(GarageError::Message("not implemented".into()).into())
}
}
}
}
#[async_trait]
@ -934,7 +997,8 @@ impl EndpointHandler<AdminRpc> for AdminRpcHandler {
AdminRpc::Migrate(opt) => self.handle_migrate(opt.clone()).await,
AdminRpc::LaunchRepair(opt) => self.handle_launch_repair(opt.clone()).await,
AdminRpc::Stats(opt) => self.handle_stats(opt.clone()).await,
AdminRpc::Worker(opt) => self.handle_worker_cmd(opt.clone()).await,
AdminRpc::Worker(wo) => self.handle_worker_cmd(wo).await,
AdminRpc::BlockOperation(bo) => self.handle_block_cmd(bo).await,
m => Err(GarageError::unexpected_rpc_message(m).into()),
}
}

View file

@ -41,6 +41,9 @@ pub async fn cli_command_dispatch(
}
Command::Stats(so) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Stats(so)).await,
Command::Worker(wo) => cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::Worker(wo)).await,
Command::Block(bo) => {
cmd_admin(admin_rpc_endpoint, rpc_host, AdminRpc::BlockOperation(bo)).await
}
_ => unreachable!(),
}
}
@ -191,6 +194,16 @@ pub async fn cmd_admin(
AdminRpc::WorkerInfo(tid, wi) => {
print_worker_info(tid, wi);
}
AdminRpc::BlockErrorList(el) => {
print_block_error_list(el);
}
AdminRpc::BlockInfo {
hash,
refcount,
versions,
} => {
print_block_info(hash, refcount, versions);
}
r => {
error!("Unexpected response: {:?}", r);
}

View file

@ -49,7 +49,11 @@ pub enum Command {
/// Manage background workers
#[structopt(name = "worker", version = garage_version())]
Worker(WorkerOpt),
Worker(WorkerOperation),
/// Low-level debug operations on data blocks
#[structopt(name = "block", version = garage_version())]
Block(BlockOperation),
}
#[derive(StructOpt, Debug)]
@ -502,14 +506,8 @@ pub struct StatsOpt {
pub detailed: bool,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Clone)]
pub struct WorkerOpt {
#[structopt(subcommand)]
pub cmd: WorkerCmd,
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum WorkerCmd {
pub enum WorkerOperation {
/// List all workers on Garage node
#[structopt(name = "list", version = garage_version())]
List {
@ -549,3 +547,34 @@ pub enum WorkerSetCmd {
#[structopt(name = "resync-tranquility", version = garage_version())]
ResyncTranquility { tranquility: u32 },
}
#[derive(Serialize, Deserialize, StructOpt, Debug, Eq, PartialEq, Clone)]
pub enum BlockOperation {
/// List all blocks that currently have a resync error
#[structopt(name = "list-errors", version = garage_version())]
ListErrors,
/// Get detailed information about a single block
#[structopt(name = "info", version = garage_version())]
Info {
/// Hash of the block for which to retrieve information
hash: String,
},
/// Retry now the resync of one or many blocks
#[structopt(name = "retry-now", version = garage_version())]
RetryNow {
/// Retry all blocks that have a resync error
#[structopt(long = "all")]
all: bool,
/// Hashes of the block to retry to resync now
blocks: Vec<String>,
},
/// Delete all objects referencing a missing block
#[structopt(name = "purge", version = garage_version())]
Purge {
/// Mandatory to confirm this operation
#[structopt(long = "yes")]
yes: bool,
/// Hashes of the block to purge
blocks: Vec<String>,
},
}

View file

@ -3,14 +3,17 @@ use std::time::Duration;
use garage_util::background::*;
use garage_util::crdt::*;
use garage_util::data::Uuid;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::formater::format_table;
use garage_util::time::*;
use garage_block::manager::BlockResyncErrorInfo;
use garage_model::bucket_table::*;
use garage_model::key_table::*;
use garage_model::s3::object_table::{BYTES, OBJECTS, UNFINISHED_UPLOADS};
use garage_model::s3::version_table::Version;
use crate::cli::structs::WorkerListOpt;
@ -353,3 +356,57 @@ pub fn print_worker_info(tid: usize, info: WorkerInfo) {
}
format_table(table);
}
pub fn print_block_error_list(el: Vec<BlockResyncErrorInfo>) {
let now = now_msec();
let tf = timeago::Formatter::new();
let mut tf2 = timeago::Formatter::new();
tf2.ago("");
let mut table = vec!["Hash\tRC\tErrors\tLast error\tNext try".into()];
for e in el {
table.push(format!(
"{}\t{}\t{}\t{}\tin {}",
hex::encode(e.hash.as_slice()),
e.refcount,
e.error_count,
tf.convert(Duration::from_millis(now - e.last_try)),
tf2.convert(Duration::from_millis(e.next_try - now))
));
}
format_table(table);
}
pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version, Uuid>>) {
println!("Block hash: {}", hex::encode(hash.as_slice()));
println!("Refcount: {}", refcount);
println!();
let mut table = vec!["Version\tBucket\tPath\tDeleted".into()];
let mut nondeleted_count = 0;
for v in versions.iter() {
match v {
Ok(ver) => {
table.push(format!(
"{:?}\t{:?}\t{}\t{:?}",
ver.uuid,
ver.bucket_id,
ver.key,
ver.deleted.get()
));
if !ver.deleted.get() {
nondeleted_count += 1;
}
}
Err(vh) => {
table.push(format!("{:?}\t\t\tyes", vh));
}
}
}
format_table(table);
if refcount != nondeleted_count {
println!();
println!("Warning: refcount does not match number of non-deleted versions");
}
}

View file

@ -49,3 +49,9 @@ impl EnumerationOrder {
}
}
}
impl Default for EnumerationOrder {
fn default() -> Self {
EnumerationOrder::Forward
}
}