Compare commits
No commits in common. "65853a48634d662809eee5dafbe48535d400f4a0" and "0038ca8a78f147b9c0ec07ef0121773aaf110dc9" have entirely different histories.
65853a4863
...
0038ca8a78
13 changed files with 22 additions and 302 deletions
|
@ -141,7 +141,4 @@ blocks may still be held by Garage. If you suspect that such corruption has occu
|
||||||
in your cluster, you can run one of the following repair procedures:
|
in your cluster, you can run one of the following repair procedures:
|
||||||
|
|
||||||
- `garage repair versions`: checks that all versions belong to a non-deleted object, and purges any orphan version
|
- `garage repair versions`: checks that all versions belong to a non-deleted object, and purges any orphan version
|
||||||
|
- `garage repair block_refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected)
|
||||||
- `garage repair block-refs`: checks that all block references belong to a non-deleted object version, and purges any orphan block reference (this will then allow the blocks to be garbage-collected)
|
|
||||||
|
|
||||||
- `garage repair block-rc`: checks that the reference counters for blocks are in sync with the actual number of non-deleted entries in the block reference table
|
|
||||||
|
|
|
@ -11,4 +11,3 @@ mod metrics;
|
||||||
mod rc;
|
mod rc;
|
||||||
|
|
||||||
pub use block::zstd_encode;
|
pub use block::zstd_encode;
|
||||||
pub use rc::CalculateRefcount;
|
|
||||||
|
|
|
@ -88,7 +88,7 @@ pub struct BlockManager {
|
||||||
|
|
||||||
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
|
mutation_lock: Vec<Mutex<BlockManagerLocked>>,
|
||||||
|
|
||||||
pub rc: BlockRc,
|
pub(crate) rc: BlockRc,
|
||||||
pub resync: BlockResyncManager,
|
pub resync: BlockResyncManager,
|
||||||
|
|
||||||
pub(crate) system: Arc<System>,
|
pub(crate) system: Arc<System>,
|
||||||
|
@ -156,7 +156,7 @@ impl BlockManager {
|
||||||
|
|
||||||
let metrics = BlockManagerMetrics::new(
|
let metrics = BlockManagerMetrics::new(
|
||||||
config.compression_level,
|
config.compression_level,
|
||||||
rc.rc_table.clone(),
|
rc.rc.clone(),
|
||||||
resync.queue.clone(),
|
resync.queue.clone(),
|
||||||
resync.errors.clone(),
|
resync.errors.clone(),
|
||||||
);
|
);
|
||||||
|
@ -229,12 +229,6 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Initialization: set how block references are recalculated
|
|
||||||
/// for repair operations
|
|
||||||
pub fn set_recalc_rc(&self, recalc: Vec<CalculateRefcount>) {
|
|
||||||
self.rc.recalc_rc.store(Some(Arc::new(recalc)));
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Ask nodes that might have a (possibly compressed) block for it
|
/// Ask nodes that might have a (possibly compressed) block for it
|
||||||
/// Return it as a stream with a header
|
/// Return it as a stream with a header
|
||||||
async fn rpc_get_raw_block_streaming(
|
async fn rpc_get_raw_block_streaming(
|
||||||
|
@ -322,9 +316,9 @@ impl BlockManager {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
let err = Error::MissingBlock(*hash);
|
let msg = format!("Get block {:?}: no node returned a valid block", hash);
|
||||||
debug!("{}", err);
|
debug!("{}", msg);
|
||||||
Err(err)
|
Err(Error::Message(msg))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Public interface ----
|
// ---- Public interface ----
|
||||||
|
@ -387,7 +381,7 @@ impl BlockManager {
|
||||||
|
|
||||||
/// Get number of items in the refcount table
|
/// Get number of items in the refcount table
|
||||||
pub fn rc_len(&self) -> Result<usize, Error> {
|
pub fn rc_len(&self) -> Result<usize, Error> {
|
||||||
Ok(self.rc.rc_table.len()?)
|
Ok(self.rc.rc.len()?)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send command to start/stop/manager scrub worker
|
/// Send command to start/stop/manager scrub worker
|
||||||
|
|
|
@ -1,7 +1,5 @@
|
||||||
use std::convert::TryInto;
|
use std::convert::TryInto;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
|
||||||
|
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -10,20 +8,13 @@ use garage_util::time::*;
|
||||||
|
|
||||||
use crate::manager::BLOCK_GC_DELAY;
|
use crate::manager::BLOCK_GC_DELAY;
|
||||||
|
|
||||||
pub type CalculateRefcount =
|
|
||||||
Box<dyn Fn(&db::Transaction, &Hash) -> db::TxResult<usize, Error> + Send + Sync>;
|
|
||||||
|
|
||||||
pub struct BlockRc {
|
pub struct BlockRc {
|
||||||
pub rc_table: db::Tree,
|
pub(crate) rc: db::Tree,
|
||||||
pub(crate) recalc_rc: ArcSwapOption<Vec<CalculateRefcount>>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockRc {
|
impl BlockRc {
|
||||||
pub(crate) fn new(rc: db::Tree) -> Self {
|
pub(crate) fn new(rc: db::Tree) -> Self {
|
||||||
Self {
|
Self { rc }
|
||||||
rc_table: rc,
|
|
||||||
recalc_rc: ArcSwapOption::new(None),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Increment the reference counter associated to a hash.
|
/// Increment the reference counter associated to a hash.
|
||||||
|
@ -33,9 +24,9 @@ impl BlockRc {
|
||||||
tx: &mut db::Transaction,
|
tx: &mut db::Transaction,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
) -> db::TxOpResult<bool> {
|
) -> db::TxOpResult<bool> {
|
||||||
let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
|
let old_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
||||||
match old_rc.increment().serialize() {
|
match old_rc.increment().serialize() {
|
||||||
Some(x) => tx.insert(&self.rc_table, hash, x)?,
|
Some(x) => tx.insert(&self.rc, hash, x)?,
|
||||||
None => unreachable!(),
|
None => unreachable!(),
|
||||||
};
|
};
|
||||||
Ok(old_rc.is_zero())
|
Ok(old_rc.is_zero())
|
||||||
|
@ -48,28 +39,28 @@ impl BlockRc {
|
||||||
tx: &mut db::Transaction,
|
tx: &mut db::Transaction,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
) -> db::TxOpResult<bool> {
|
) -> db::TxOpResult<bool> {
|
||||||
let new_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?).decrement();
|
let new_rc = RcEntry::parse_opt(tx.get(&self.rc, hash)?).decrement();
|
||||||
match new_rc.serialize() {
|
match new_rc.serialize() {
|
||||||
Some(x) => tx.insert(&self.rc_table, hash, x)?,
|
Some(x) => tx.insert(&self.rc, hash, x)?,
|
||||||
None => tx.remove(&self.rc_table, hash)?,
|
None => tx.remove(&self.rc, hash)?,
|
||||||
};
|
};
|
||||||
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
|
Ok(matches!(new_rc, RcEntry::Deletable { .. }))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Read a block's reference count
|
/// Read a block's reference count
|
||||||
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
|
pub(crate) fn get_block_rc(&self, hash: &Hash) -> Result<RcEntry, Error> {
|
||||||
Ok(RcEntry::parse_opt(self.rc_table.get(hash.as_ref())?))
|
Ok(RcEntry::parse_opt(self.rc.get(hash.as_ref())?))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete an entry in the RC table if it is deletable and the
|
/// Delete an entry in the RC table if it is deletable and the
|
||||||
/// deletion time has passed
|
/// deletion time has passed
|
||||||
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
pub(crate) fn clear_deleted_block_rc(&self, hash: &Hash) -> Result<(), Error> {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
self.rc_table.db().transaction(|tx| {
|
self.rc.db().transaction(|tx| {
|
||||||
let rcval = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
|
let rcval = RcEntry::parse_opt(tx.get(&self.rc, hash)?);
|
||||||
match rcval {
|
match rcval {
|
||||||
RcEntry::Deletable { at_time } if now > at_time => {
|
RcEntry::Deletable { at_time } if now > at_time => {
|
||||||
tx.remove(&self.rc_table, hash)?;
|
tx.remove(&self.rc, hash)?;
|
||||||
}
|
}
|
||||||
_ => (),
|
_ => (),
|
||||||
};
|
};
|
||||||
|
@ -77,58 +68,6 @@ impl BlockRc {
|
||||||
})?;
|
})?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Recalculate the reference counter of a block
|
|
||||||
/// to fix potential inconsistencies
|
|
||||||
pub fn recalculate_rc(&self, hash: &Hash) -> Result<(usize, bool), Error> {
|
|
||||||
if let Some(recalc_fns) = self.recalc_rc.load().as_ref() {
|
|
||||||
trace!("Repair block RC for {:?}", hash);
|
|
||||||
let res = self
|
|
||||||
.rc_table
|
|
||||||
.db()
|
|
||||||
.transaction(|tx| {
|
|
||||||
let mut cnt = 0;
|
|
||||||
for f in recalc_fns.iter() {
|
|
||||||
cnt += f(&tx, hash)?;
|
|
||||||
}
|
|
||||||
let old_rc = RcEntry::parse_opt(tx.get(&self.rc_table, hash)?);
|
|
||||||
trace!(
|
|
||||||
"Block RC for {:?}: stored={}, calculated={}",
|
|
||||||
hash,
|
|
||||||
old_rc.as_u64(),
|
|
||||||
cnt
|
|
||||||
);
|
|
||||||
if cnt as u64 != old_rc.as_u64() {
|
|
||||||
warn!(
|
|
||||||
"Fixing inconsistent block RC for {:?}: was {}, should be {}",
|
|
||||||
hash,
|
|
||||||
old_rc.as_u64(),
|
|
||||||
cnt
|
|
||||||
);
|
|
||||||
let new_rc = if cnt > 0 {
|
|
||||||
RcEntry::Present { count: cnt as u64 }
|
|
||||||
} else {
|
|
||||||
RcEntry::Deletable {
|
|
||||||
at_time: now_msec() + BLOCK_GC_DELAY.as_millis() as u64,
|
|
||||||
}
|
|
||||||
};
|
|
||||||
tx.insert(&self.rc_table, hash, new_rc.serialize().unwrap())?;
|
|
||||||
Ok((cnt, true))
|
|
||||||
} else {
|
|
||||||
Ok((cnt, false))
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.map_err(Error::from);
|
|
||||||
if let Err(e) = &res {
|
|
||||||
error!("Failed to fix RC for block {:?}: {}", hash, e);
|
|
||||||
}
|
|
||||||
res
|
|
||||||
} else {
|
|
||||||
Err(Error::Message(
|
|
||||||
"Block RC recalculation is not available at this point".into(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Describes the state of the reference counter for a block
|
/// Describes the state of the reference counter for a block
|
||||||
|
|
|
@ -107,7 +107,7 @@ impl Worker for RepairWorker {
|
||||||
for entry in self
|
for entry in self
|
||||||
.manager
|
.manager
|
||||||
.rc
|
.rc
|
||||||
.rc_table
|
.rc
|
||||||
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
|
.range::<&[u8], _>((start_bound, Bound::Unbounded))?
|
||||||
{
|
{
|
||||||
let (hash, _) = entry?;
|
let (hash, _) = entry?;
|
||||||
|
|
|
@ -367,13 +367,6 @@ impl BlockResyncManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists && rc.is_deletable() {
|
if exists && rc.is_deletable() {
|
||||||
if manager.rc.recalculate_rc(hash)?.0 > 0 {
|
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"Refcount for block {:?} was inconsistent, retrying later",
|
|
||||||
hash
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Resync block {:?}: offloading and deleting", hash);
|
info!("Resync block {:?}: offloading and deleting", hash);
|
||||||
let existing_path = existing_path.unwrap();
|
let existing_path = existing_path.unwrap();
|
||||||
|
|
||||||
|
@ -460,15 +453,7 @@ impl BlockResyncManager {
|
||||||
hash
|
hash
|
||||||
);
|
);
|
||||||
|
|
||||||
let block_data = manager.rpc_get_raw_block(hash, None).await;
|
let block_data = manager.rpc_get_raw_block(hash, None).await?;
|
||||||
if matches!(block_data, Err(Error::MissingBlock(_))) {
|
|
||||||
warn!(
|
|
||||||
"Could not fetch needed block {:?}, no node returned valid data. Checking that refcount is correct.",
|
|
||||||
hash
|
|
||||||
);
|
|
||||||
manager.rc.recalculate_rc(hash)?;
|
|
||||||
}
|
|
||||||
let block_data = block_data?;
|
|
||||||
|
|
||||||
manager.metrics.resync_recv_counter.add(1);
|
manager.metrics.resync_recv_counter.add(1);
|
||||||
|
|
||||||
|
|
|
@ -473,11 +473,8 @@ pub enum RepairWhat {
|
||||||
#[structopt(name = "mpu", version = garage_version())]
|
#[structopt(name = "mpu", version = garage_version())]
|
||||||
MultipartUploads,
|
MultipartUploads,
|
||||||
/// Repropagate version deletions to the block ref table
|
/// Repropagate version deletions to the block ref table
|
||||||
#[structopt(name = "block-refs", version = garage_version())]
|
#[structopt(name = "block_refs", version = garage_version())]
|
||||||
BlockRefs,
|
BlockRefs,
|
||||||
/// Recalculate block reference counters
|
|
||||||
#[structopt(name = "block-rc", version = garage_version())]
|
|
||||||
BlockRc,
|
|
||||||
/// Verify integrity of all blocks on disc
|
/// Verify integrity of all blocks on disc
|
||||||
#[structopt(name = "scrub", version = garage_version())]
|
#[structopt(name = "scrub", version = garage_version())]
|
||||||
Scrub {
|
Scrub {
|
||||||
|
|
|
@ -451,7 +451,7 @@ pub fn print_block_info(
|
||||||
if refcount != nondeleted_count {
|
if refcount != nondeleted_count {
|
||||||
println!();
|
println!();
|
||||||
println!(
|
println!(
|
||||||
"Warning: refcount does not match number of non-deleted versions, you should try `garage repair block-rc`."
|
"Warning: refcount does not match number of non-deleted versions (see issue #644)."
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,7 +4,6 @@ use std::time::Duration;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use garage_block::manager::BlockManager;
|
|
||||||
use garage_block::repair::ScrubWorkerCommand;
|
use garage_block::repair::ScrubWorkerCommand;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
@ -17,14 +16,11 @@ use garage_table::replication::*;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::data::*;
|
|
||||||
use garage_util::error::Error;
|
use garage_util::error::Error;
|
||||||
use garage_util::migrate::Migrate;
|
use garage_util::migrate::Migrate;
|
||||||
|
|
||||||
use crate::*;
|
use crate::*;
|
||||||
|
|
||||||
const RC_REPAIR_ITER_COUNT: usize = 64;
|
|
||||||
|
|
||||||
pub async fn launch_online_repair(
|
pub async fn launch_online_repair(
|
||||||
garage: &Arc<Garage>,
|
garage: &Arc<Garage>,
|
||||||
bg: &BackgroundRunner,
|
bg: &BackgroundRunner,
|
||||||
|
@ -51,13 +47,6 @@ pub async fn launch_online_repair(
|
||||||
info!("Repairing the block refs table");
|
info!("Repairing the block refs table");
|
||||||
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
|
bg.spawn_worker(TableRepairWorker::new(garage.clone(), RepairBlockRefs));
|
||||||
}
|
}
|
||||||
RepairWhat::BlockRc => {
|
|
||||||
info!("Repairing the block reference counters");
|
|
||||||
bg.spawn_worker(BlockRcRepair::new(
|
|
||||||
garage.block_manager.clone(),
|
|
||||||
garage.block_ref_table.clone(),
|
|
||||||
));
|
|
||||||
}
|
|
||||||
RepairWhat::Blocks => {
|
RepairWhat::Blocks => {
|
||||||
info!("Repairing the stored blocks");
|
info!("Repairing the stored blocks");
|
||||||
bg.spawn_worker(garage_block::repair::RepairWorker::new(
|
bg.spawn_worker(garage_block::repair::RepairWorker::new(
|
||||||
|
@ -293,98 +282,3 @@ impl TableRepair for RepairMpu {
|
||||||
Ok(false)
|
Ok(false)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===== block reference counter repair =====
|
|
||||||
|
|
||||||
pub struct BlockRcRepair {
|
|
||||||
block_manager: Arc<BlockManager>,
|
|
||||||
block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
|
|
||||||
cursor: Hash,
|
|
||||||
counter: u64,
|
|
||||||
repairs: u64,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl BlockRcRepair {
|
|
||||||
fn new(
|
|
||||||
block_manager: Arc<BlockManager>,
|
|
||||||
block_ref_table: Arc<Table<BlockRefTable, TableShardedReplication>>,
|
|
||||||
) -> Self {
|
|
||||||
Self {
|
|
||||||
block_manager,
|
|
||||||
block_ref_table,
|
|
||||||
cursor: [0u8; 32].into(),
|
|
||||||
counter: 0,
|
|
||||||
repairs: 0,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Worker for BlockRcRepair {
|
|
||||||
fn name(&self) -> String {
|
|
||||||
format!("Block refcount repair worker")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn status(&self) -> WorkerStatus {
|
|
||||||
WorkerStatus {
|
|
||||||
progress: Some(format!("{} ({})", self.counter, self.repairs)),
|
|
||||||
..Default::default()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
|
||||||
for _i in 0..RC_REPAIR_ITER_COUNT {
|
|
||||||
let next1 = self
|
|
||||||
.block_manager
|
|
||||||
.rc
|
|
||||||
.rc_table
|
|
||||||
.range(self.cursor.as_slice()..)?
|
|
||||||
.next()
|
|
||||||
.transpose()?
|
|
||||||
.map(|(k, _)| Hash::try_from(k.as_slice()).unwrap());
|
|
||||||
let next2 = self
|
|
||||||
.block_ref_table
|
|
||||||
.data
|
|
||||||
.store
|
|
||||||
.range(self.cursor.as_slice()..)?
|
|
||||||
.next()
|
|
||||||
.transpose()?
|
|
||||||
.map(|(k, _)| Hash::try_from(&k[..32]).unwrap());
|
|
||||||
let next = match (next1, next2) {
|
|
||||||
(Some(k1), Some(k2)) => std::cmp::min(k1, k2),
|
|
||||||
(Some(k), None) | (None, Some(k)) => k,
|
|
||||||
(None, None) => {
|
|
||||||
info!(
|
|
||||||
"{}: finished, done {}, fixed {}",
|
|
||||||
self.name(),
|
|
||||||
self.counter,
|
|
||||||
self.repairs
|
|
||||||
);
|
|
||||||
return Ok(WorkerState::Done);
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
if self.block_manager.rc.recalculate_rc(&next)?.1 {
|
|
||||||
self.repairs += 1;
|
|
||||||
}
|
|
||||||
self.counter += 1;
|
|
||||||
if let Some(next_incr) = next.increment() {
|
|
||||||
self.cursor = next_incr;
|
|
||||||
} else {
|
|
||||||
info!(
|
|
||||||
"{}: finished, done {}, fixed {}",
|
|
||||||
self.name(),
|
|
||||||
self.counter,
|
|
||||||
self.repairs
|
|
||||||
);
|
|
||||||
return Ok(WorkerState::Done);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(WorkerState::Busy)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn wait_for_work(&mut self) -> WorkerState {
|
|
||||||
unreachable!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -247,14 +247,6 @@ impl Garage {
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
|
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
|
||||||
|
|
||||||
// ---- setup block refcount recalculation ----
|
|
||||||
// this function can be used to fix inconsistencies in the RC table
|
|
||||||
block_manager.set_recalc_rc(vec![
|
|
||||||
block_ref_recount_fn(&block_ref_table),
|
|
||||||
// other functions could be added here if we had other tables
|
|
||||||
// that hold references to data blocks
|
|
||||||
]);
|
|
||||||
|
|
||||||
// -- done --
|
// -- done --
|
||||||
Ok(Arc::new(Self {
|
Ok(Arc::new(Self {
|
||||||
config,
|
config,
|
||||||
|
|
|
@ -3,12 +3,8 @@ use std::sync::Arc;
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
|
||||||
use garage_util::migrate::Migrate;
|
|
||||||
|
|
||||||
use garage_block::CalculateRefcount;
|
|
||||||
use garage_table::crdt::Crdt;
|
use garage_table::crdt::Crdt;
|
||||||
use garage_table::replication::TableShardedReplication;
|
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
|
||||||
use garage_block::manager::*;
|
use garage_block::manager::*;
|
||||||
|
@ -88,38 +84,3 @@ impl TableSchema for BlockRefTable {
|
||||||
filter.apply(entry.deleted.get())
|
filter.apply(entry.deleted.get())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn block_ref_recount_fn(
|
|
||||||
block_ref_table: &Arc<Table<BlockRefTable, TableShardedReplication>>,
|
|
||||||
) -> CalculateRefcount {
|
|
||||||
let table = Arc::downgrade(block_ref_table);
|
|
||||||
Box::new(move |tx: &db::Transaction, block: &Hash| {
|
|
||||||
let table = table
|
|
||||||
.upgrade()
|
|
||||||
.ok_or_message("cannot upgrade weak ptr to block_ref_table")
|
|
||||||
.map_err(db::TxError::Abort)?;
|
|
||||||
Ok(calculate_refcount(&table, tx, block)?)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn calculate_refcount(
|
|
||||||
block_ref_table: &Table<BlockRefTable, TableShardedReplication>,
|
|
||||||
tx: &db::Transaction,
|
|
||||||
block: &Hash,
|
|
||||||
) -> db::TxResult<usize, Error> {
|
|
||||||
let mut result = 0;
|
|
||||||
for entry in tx.range(&block_ref_table.data.store, block.as_slice()..)? {
|
|
||||||
let (key, value) = entry?;
|
|
||||||
if &key[..32] != block.as_slice() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
let value = BlockRef::decode(&value)
|
|
||||||
.ok_or_message("could not decode block_ref")
|
|
||||||
.map_err(db::TxError::Abort)?;
|
|
||||||
assert_eq!(value.block, *block);
|
|
||||||
if !value.deleted.get() {
|
|
||||||
result += 1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(result)
|
|
||||||
}
|
|
||||||
|
|
|
@ -83,19 +83,6 @@ impl FixedBytes32 {
|
||||||
ret.copy_from_slice(by);
|
ret.copy_from_slice(by);
|
||||||
Some(Self(ret))
|
Some(Self(ret))
|
||||||
}
|
}
|
||||||
/// Return the next hash
|
|
||||||
pub fn increment(&self) -> Option<Self> {
|
|
||||||
let mut ret = *self;
|
|
||||||
for byte in ret.0.iter_mut().rev() {
|
|
||||||
if *byte == u8::MAX {
|
|
||||||
*byte = 0;
|
|
||||||
} else {
|
|
||||||
*byte = *byte + 1;
|
|
||||||
return Some(ret);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return None;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl From<garage_net::NodeID> for FixedBytes32 {
|
impl From<garage_net::NodeID> for FixedBytes32 {
|
||||||
|
@ -153,25 +140,3 @@ pub fn fasthash(data: &[u8]) -> FastHash {
|
||||||
pub fn gen_uuid() -> Uuid {
|
pub fn gen_uuid() -> Uuid {
|
||||||
rand::thread_rng().gen::<[u8; 32]>().into()
|
rand::thread_rng().gen::<[u8; 32]>().into()
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod test {
|
|
||||||
use super::*;
|
|
||||||
|
|
||||||
#[test]
|
|
||||||
fn test_increment() {
|
|
||||||
let zero: FixedBytes32 = [0u8; 32].into();
|
|
||||||
let mut one: FixedBytes32 = [0u8; 32].into();
|
|
||||||
one.0[31] = 1;
|
|
||||||
let max: FixedBytes32 = [0xFFu8; 32].into();
|
|
||||||
assert_eq!(zero.increment(), Some(one));
|
|
||||||
assert_eq!(max.increment(), None);
|
|
||||||
|
|
||||||
let mut test: FixedBytes32 = [0u8; 32].into();
|
|
||||||
let i = 0x198DF97209F8FFFFu64;
|
|
||||||
test.0[24..32].copy_from_slice(&u64::to_be_bytes(i));
|
|
||||||
let mut test2: FixedBytes32 = [0u8; 32].into();
|
|
||||||
test2.0[24..32].copy_from_slice(&u64::to_be_bytes(i + 1));
|
|
||||||
assert_eq!(test.increment(), Some(test2));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
|
@ -70,9 +70,6 @@ pub enum Error {
|
||||||
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
|
#[error(display = "Corrupt data: does not match hash {:?}", _0)]
|
||||||
CorruptData(Hash),
|
CorruptData(Hash),
|
||||||
|
|
||||||
#[error(display = "Missing block {:?}: no node returned a valid block", _0)]
|
|
||||||
MissingBlock(Hash),
|
|
||||||
|
|
||||||
#[error(display = "{}", _0)]
|
#[error(display = "{}", _0)]
|
||||||
Message(String),
|
Message(String),
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue