block manager: get rid of check_block_status
This commit is contained in:
parent
3199cab4c8
commit
1b8c265c14
2 changed files with 45 additions and 56 deletions
|
@ -543,8 +543,8 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
|
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
|
||||||
let block_path = match self.find_block(hash).await {
|
match self.find_block(hash).await {
|
||||||
Some(p) => p,
|
Some(p) => self.read_block_from(hash, &p).await,
|
||||||
None => {
|
None => {
|
||||||
// Not found but maybe we should have had it ??
|
// Not found but maybe we should have had it ??
|
||||||
self.resync
|
self.resync
|
||||||
|
@ -554,9 +554,15 @@ impl BlockManager {
|
||||||
hash
|
hash
|
||||||
)));
|
)));
|
||||||
}
|
}
|
||||||
};
|
}
|
||||||
|
}
|
||||||
|
|
||||||
let (path, compressed) = match &block_path {
|
pub(crate) async fn read_block_from(
|
||||||
|
&self,
|
||||||
|
hash: &Hash,
|
||||||
|
block_path: &DataBlockPath,
|
||||||
|
) -> Result<DataBlock, Error> {
|
||||||
|
let (path, compressed) = match block_path {
|
||||||
DataBlockPath::Plain(p) => (p, false),
|
DataBlockPath::Plain(p) => (p, false),
|
||||||
DataBlockPath::Compressed(p) => (p, true),
|
DataBlockPath::Compressed(p) => (p, true),
|
||||||
};
|
};
|
||||||
|
@ -581,7 +587,7 @@ impl BlockManager {
|
||||||
);
|
);
|
||||||
self.lock_mutate(hash)
|
self.lock_mutate(hash)
|
||||||
.await
|
.await
|
||||||
.move_block_to_corrupted(&block_path)
|
.move_block_to_corrupted(block_path)
|
||||||
.await?;
|
.await?;
|
||||||
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
|
self.resync.put_to_resync(hash, Duration::from_millis(0))?;
|
||||||
|
|
||||||
|
@ -591,18 +597,11 @@ impl BlockManager {
|
||||||
Ok(data)
|
Ok(data)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Check if this node has a block and whether it needs it
|
|
||||||
pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
|
|
||||||
self.lock_mutate(hash)
|
|
||||||
.await
|
|
||||||
.check_block_status(hash, self)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Check if this node should have a block, but don't actually have it
|
/// Check if this node should have a block, but don't actually have it
|
||||||
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
|
||||||
let BlockStatus { exists, needed } = self.check_block_status(hash).await?;
|
let rc = self.rc.get_block_rc(hash)?;
|
||||||
Ok(needed.is_nonzero() && !exists)
|
let exists = self.find_block(hash).await.is_some();
|
||||||
|
Ok(rc.is_nonzero() && !exists)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Delete block if it is not needed anymore
|
/// Delete block if it is not needed anymore
|
||||||
|
@ -613,8 +612,8 @@ impl BlockManager {
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Utility: check if block is stored compressed.
|
/// Find the path where a block is currently stored
|
||||||
async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
|
pub(crate) async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
|
||||||
let dirs = Some(self.data_layout.primary_block_dir(hash))
|
let dirs = Some(self.data_layout.primary_block_dir(hash))
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.chain(self.data_layout.secondary_block_dirs(hash));
|
.chain(self.data_layout.secondary_block_dirs(hash));
|
||||||
|
@ -687,17 +686,6 @@ pub(crate) struct BlockStatus {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl BlockManagerLocked {
|
impl BlockManagerLocked {
|
||||||
async fn check_block_status(
|
|
||||||
&self,
|
|
||||||
hash: &Hash,
|
|
||||||
mgr: &BlockManager,
|
|
||||||
) -> Result<BlockStatus, Error> {
|
|
||||||
let exists = mgr.find_block(hash).await.is_some();
|
|
||||||
let needed = mgr.rc.get_block_rc(hash)?;
|
|
||||||
|
|
||||||
Ok(BlockStatus { exists, needed })
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn write_block(
|
async fn write_block(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
|
@ -710,32 +698,32 @@ impl BlockManagerLocked {
|
||||||
let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
|
let mut tgt_path = mgr.data_layout.primary_block_dir(hash);
|
||||||
let directory = tgt_path.clone();
|
let directory = tgt_path.clone();
|
||||||
tgt_path.push(hex::encode(hash));
|
tgt_path.push(hex::encode(hash));
|
||||||
if compressed {
|
if compressed {
|
||||||
tgt_path.set_extension("zst");
|
tgt_path.set_extension("zst");
|
||||||
}
|
}
|
||||||
|
|
||||||
fs::create_dir_all(&directory).await?;
|
fs::create_dir_all(&directory).await?;
|
||||||
|
|
||||||
let to_delete = match (mgr.find_block(hash).await, compressed) {
|
let to_delete = match (mgr.find_block(hash).await, compressed) {
|
||||||
// If the block is stored in the wrong directory,
|
// If the block is stored in the wrong directory,
|
||||||
// write it again at the correct path and delete the old path
|
// write it again at the correct path and delete the old path
|
||||||
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
|
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
|
||||||
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
|
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
|
||||||
|
|
||||||
// If the block is already stored not compressed but we have a compressed
|
// If the block is already stored not compressed but we have a compressed
|
||||||
// copy, write the compressed copy and delete the uncompressed one
|
// copy, write the compressed copy and delete the uncompressed one
|
||||||
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
|
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
|
||||||
|
|
||||||
// If the block is already stored compressed,
|
// If the block is already stored compressed,
|
||||||
// keep the stored copy, we have nothing to do
|
// keep the stored copy, we have nothing to do
|
||||||
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
|
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
|
||||||
|
|
||||||
// If the block is already stored not compressed,
|
// If the block is already stored not compressed,
|
||||||
// and we don't have a compressed copy either,
|
// and we don't have a compressed copy either,
|
||||||
// keep the stored copy, we have nothing to do
|
// keep the stored copy, we have nothing to do
|
||||||
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
|
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
|
||||||
|
|
||||||
// If the block isn't stored already, just store what is given to us
|
// If the block isn't stored already, just store what is given to us
|
||||||
(None, _) => None,
|
(None, _) => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -799,14 +787,12 @@ impl BlockManagerLocked {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
|
async fn delete_if_unneeded(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> {
|
||||||
let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
|
let rc = mgr.rc.get_block_rc(hash)?;
|
||||||
|
if rc.is_deletable() {
|
||||||
if exists && needed.is_deletable() {
|
while let Some(path) = mgr.find_block(hash).await {
|
||||||
let path_opt = match mgr.find_block(hash).await {
|
let path = match path {
|
||||||
Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p),
|
DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
|
||||||
None => None,
|
};
|
||||||
};
|
|
||||||
if let Some(path) = path_opt {
|
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
mgr.metrics.delete_counter.add(1);
|
mgr.metrics.delete_counter.add(1);
|
||||||
}
|
}
|
||||||
|
|
|
@ -359,20 +359,23 @@ impl BlockResyncManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
|
async fn resync_block(&self, manager: &BlockManager, hash: &Hash) -> Result<(), Error> {
|
||||||
let BlockStatus { exists, needed } = manager.check_block_status(hash).await?;
|
let existing_path = manager.find_block(hash).await;
|
||||||
|
let exists = existing_path.is_some();
|
||||||
|
let rc = manager.rc.get_block_rc(hash)?;
|
||||||
|
|
||||||
if exists != needed.is_needed() || exists != needed.is_nonzero() {
|
if exists != rc.is_needed() || exists != rc.is_nonzero() {
|
||||||
debug!(
|
debug!(
|
||||||
"Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
|
"Resync block {:?}: exists {}, nonzero rc {}, deletable {}",
|
||||||
hash,
|
hash,
|
||||||
exists,
|
exists,
|
||||||
needed.is_nonzero(),
|
rc.is_nonzero(),
|
||||||
needed.is_deletable(),
|
rc.is_deletable(),
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
if exists && needed.is_deletable() {
|
if exists && rc.is_deletable() {
|
||||||
info!("Resync block {:?}: offloading and deleting", hash);
|
info!("Resync block {:?}: offloading and deleting", hash);
|
||||||
|
let existing_path = existing_path.unwrap();
|
||||||
|
|
||||||
let mut who = manager.replication.write_nodes(hash);
|
let mut who = manager.replication.write_nodes(hash);
|
||||||
if who.len() < manager.replication.write_quorum() {
|
if who.len() < manager.replication.write_quorum() {
|
||||||
|
@ -419,7 +422,7 @@ impl BlockResyncManager {
|
||||||
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
|
.add(1, &[KeyValue::new("to", format!("{:?}", node))]);
|
||||||
}
|
}
|
||||||
|
|
||||||
let block = manager.read_block(hash).await?;
|
let block = manager.read_block_from(hash, &existing_path).await?;
|
||||||
let (header, bytes) = block.into_parts();
|
let (header, bytes) = block.into_parts();
|
||||||
let put_block_message = Req::new(BlockRpc::PutBlock {
|
let put_block_message = Req::new(BlockRpc::PutBlock {
|
||||||
hash: *hash,
|
hash: *hash,
|
||||||
|
@ -451,7 +454,7 @@ impl BlockResyncManager {
|
||||||
manager.rc.clear_deleted_block_rc(hash)?;
|
manager.rc.clear_deleted_block_rc(hash)?;
|
||||||
}
|
}
|
||||||
|
|
||||||
if needed.is_nonzero() && !exists {
|
if rc.is_nonzero() && !exists {
|
||||||
info!(
|
info!(
|
||||||
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
|
"Resync block {:?}: fetching absent but needed block (refcount > 0)",
|
||||||
hash
|
hash
|
||||||
|
|
Loading…
Reference in a new issue