Implement block retry-now and block purge

This commit is contained in:
Alex 2022-12-13 15:02:42 +01:00
parent 687660b27f
commit d7f90cabb0
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
4 changed files with 124 additions and 5 deletions

View file

@ -123,6 +123,24 @@ impl BlockResyncManager {
Ok(self.errors.len()) Ok(self.errors.len())
} }
/// Clear the error counter for a block and put it in queue immediately
pub fn clear_backoff(&self, hash: &Hash) -> Result<(), Error> {
let now = now_msec();
if let Some(ec) = self.errors.get(hash)? {
let mut ec = ErrorCounter::decode(&ec);
if ec.errors > 0 {
ec.last_try = now - ec.delay_msec();
self.errors.insert(hash, ec.encode())?;
self.put_to_resync_at(hash, now)?;
return Ok(());
}
}
Err(Error::Message(format!(
"Block {:?} was not in an errored state",
hash
)))
}
// ---- Resync loop ---- // ---- Resync loop ----
// This part manages a queue of blocks that need to be // This part manages a queue of blocks that need to be

View file

@ -25,6 +25,7 @@ use garage_model::helper::error::{Error, OkOrBadRequest};
use garage_model::key_table::*; use garage_model::key_table::*;
use garage_model::migrate::Migrate; use garage_model::migrate::Migrate;
use garage_model::permission::*; use garage_model::permission::*;
use garage_model::s3::object_table::*;
use garage_model::s3::version_table::Version; use garage_model::s3::version_table::Version;
use crate::cli::*; use crate::cli::*;
@ -974,11 +975,110 @@ impl AdminRpcHandler {
versions, versions,
}) })
} }
BlockOperation::RetryNow { .. } => { BlockOperation::RetryNow { all, blocks } => {
Err(GarageError::Message("not implemented".into()).into()) if *all {
if !blocks.is_empty() {
return Err(GarageError::Message(
"--all was specified, cannot also specify blocks".into(),
)
.into());
}
let blocks = self.garage.block_manager.list_resync_errors()?;
for b in blocks.iter() {
self.garage.block_manager.resync.clear_backoff(&b.hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
} else {
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
self.garage.block_manager.resync.clear_backoff(&hash)?;
}
Ok(AdminRpc::Ok(format!(
"{} blocks returned in queue for a retry now (check logs to see results)",
blocks.len()
)))
}
} }
BlockOperation::Purge { .. } => { BlockOperation::Purge { yes, blocks } => {
Err(GarageError::Message("not implemented".into()).into()) if !yes {
return Err(GarageError::Message(
"Pass the --yes flag to confirm block purge operation.".into(),
)
.into());
}
let mut obj_dels = 0;
let mut ver_dels = 0;
for hash in blocks {
let hash = hex::decode(hash).ok_or_bad_request("invalid hash")?;
let hash = Hash::try_from(&hash).ok_or_bad_request("invalid hash")?;
let block_refs = self
.garage
.block_ref_table
.get_range(&hash, None, None, 10000, Default::default())
.await?;
for br in block_refs {
let version = match self
.garage
.version_table
.get(&br.version, &EmptyKey)
.await?
{
Some(v) => v,
None => continue,
};
if let Some(object) = self
.garage
.object_table
.get(&version.bucket_id, &version.key)
.await?
{
let ov = object.versions().iter().rev().find(|v| v.is_complete());
if let Some(ov) = ov {
if ov.uuid == br.version {
let del_uuid = gen_uuid();
let deleted_object = Object::new(
version.bucket_id,
version.key.clone(),
vec![ObjectVersion {
uuid: del_uuid,
timestamp: ov.timestamp + 1,
state: ObjectVersionState::Complete(
ObjectVersionData::DeleteMarker,
),
}],
);
self.garage.object_table.insert(&deleted_object).await?;
obj_dels += 1;
}
}
}
if !version.deleted.get() {
let deleted_version = Version::new(
version.uuid,
version.bucket_id,
version.key.clone(),
true,
);
self.garage.version_table.insert(&deleted_version).await?;
ver_dels += 1;
}
}
}
Ok(AdminRpc::Ok(format!(
"{} blocks were purged: {} object deletion markers added, {} versions marked deleted",
blocks.len(),
obj_dels,
ver_dels
)))
} }
} }
} }

View file

@ -575,6 +575,7 @@ pub enum BlockOperation {
#[structopt(long = "yes")] #[structopt(long = "yes")]
yes: bool, yes: bool,
/// Hashes of the block to purge /// Hashes of the block to purge
#[structopt(required = true)]
blocks: Vec<String>, blocks: Vec<String>,
}, },
} }

View file

@ -382,7 +382,7 @@ pub fn print_block_info(hash: Hash, refcount: u64, versions: Vec<Result<Version,
println!("Refcount: {}", refcount); println!("Refcount: {}", refcount);
println!(); println!();
let mut table = vec!["Version\tBucket\tPath\tDeleted".into()]; let mut table = vec!["Version\tBucket\tKey\tDeleted".into()];
let mut nondeleted_count = 0; let mut nondeleted_count = 0;
for v in versions.iter() { for v in versions.iter() {
match v { match v {