block manager: use data paths from layout

This commit is contained in:
Alex 2023-09-05 14:27:39 +02:00
parent 6c420c0880
commit 887b3233f4
3 changed files with 105 additions and 90 deletions

View file

@ -1,3 +1,5 @@
use std::path::PathBuf;
use bytes::Bytes; use bytes::Bytes;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use zstd::stream::{decode_all as zstd_decode, Encoder}; use zstd::stream::{decode_all as zstd_decode, Encoder};
@ -19,6 +21,13 @@ pub enum DataBlock {
Compressed(Bytes), Compressed(Bytes),
} }
pub enum DataBlockPath {
/// Uncompressed data fail
Plain(PathBuf),
/// Compressed data fail
Compressed(PathBuf),
}
impl DataBlock { impl DataBlock {
/// Query whether this block is compressed /// Query whether this block is compressed
pub fn is_compressed(&self) -> bool { pub fn is_compressed(&self) -> bool {

View file

@ -190,27 +190,30 @@ impl DataLayout {
}) })
} }
pub(crate) fn primary_data_dir(&self, hash: &Hash) -> PathBuf { pub(crate) fn primary_block_dir(&self, hash: &Hash) -> PathBuf {
let ipart = self.partition_from(hash); let ipart = self.partition_from(hash);
let idir = self.part_prim[ipart] as usize; let idir = self.part_prim[ipart] as usize;
self.data_dir_from(hash, &self.data_dirs[idir].path) self.block_dir_from(hash, &self.data_dirs[idir].path)
} }
pub(crate) fn secondary_data_dirs<'a>(&'a self, hash: &'a Hash) -> impl Iterator<Item=PathBuf> + 'a { pub(crate) fn secondary_block_dirs<'a>(
&'a self,
hash: &'a Hash,
) -> impl Iterator<Item = PathBuf> + 'a {
let ipart = self.partition_from(hash); let ipart = self.partition_from(hash);
self.part_sec[ipart] self.part_sec[ipart]
.iter() .iter()
.map(move |idir| self.data_dir_from(hash, &self.data_dirs[*idir as usize].path)) .map(move |idir| self.block_dir_from(hash, &self.data_dirs[*idir as usize].path))
} }
fn partition_from(&self, hash: &Hash) -> usize { fn partition_from(&self, hash: &Hash) -> usize {
u16::from_be_bytes([ u16::from_be_bytes([
hash.as_slice()[DPART_BYTES.0], hash.as_slice()[DPART_BYTES.0],
hash.as_slice()[DPART_BYTES.1] hash.as_slice()[DPART_BYTES.1],
]) as usize % DRIVE_NPART ]) as usize % DRIVE_NPART
} }
fn data_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf { fn block_dir_from(&self, hash: &Hash, dir: &PathBuf) -> PathBuf {
let mut path = dir.clone(); let mut path = dir.clone();
path.push(hex::encode(&hash.as_slice()[0..1])); path.push(hex::encode(&hash.as_slice()[0..1]));
path.push(hex::encode(&hash.as_slice()[1..2])); path.push(hex::encode(&hash.as_slice()[1..2]));

View file

@ -543,21 +543,25 @@ impl BlockManager {
} }
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> { async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
let mut path = self.block_path(hash); let block_path = match self.find_block(hash).await {
let compressed = match self.is_block_compressed(hash).await { Some(p) => p,
Ok(c) => c, None => {
Err(e) => {
// Not found but maybe we should have had it ?? // Not found but maybe we should have had it ??
self.resync self.resync
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?; .put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
return Err(Into::into(e)); return Err(Error::Message(format!(
"block {:?} not found on node",
hash
)));
} }
}; };
if compressed {
path.set_extension("zst");
}
let mut f = fs::File::open(&path).await?;
let (path, compressed) = match &block_path {
DataBlockPath::Plain(p) => (p, false),
DataBlockPath::Compressed(p) => (p, true),
};
let mut f = fs::File::open(&path).await?;
let mut data = vec![]; let mut data = vec![];
f.read_to_end(&mut data).await?; f.read_to_end(&mut data).await?;
drop(f); drop(f);
@ -571,11 +575,16 @@ impl BlockManager {
if data.verify(*hash).is_err() { if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1); self.metrics.corruption_counter.add(1);
warn!(
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
hash
);
self.lock_mutate(hash) self.lock_mutate(hash)
.await .await
.move_block_to_corrupted(hash, self) .move_block_to_corrupted(&block_path)
.await?; .await?;
self.resync.put_to_resync(hash, Duration::from_millis(0))?; self.resync.put_to_resync(hash, Duration::from_millis(0))?;
return Err(Error::CorruptData(*hash)); return Err(Error::CorruptData(*hash));
} }
@ -604,56 +613,51 @@ impl BlockManager {
.await .await
} }
/// Utility: gives the path of the directory in which a block should be found /// Utility: check if block is stored compressed.
fn block_dir(&self, hash: &Hash) -> PathBuf { async fn find_block(&self, hash: &Hash) -> Option<DataBlockPath> {
self.data_layout.primary_data_dir(hash) let dirs = Some(self.data_layout.primary_block_dir(hash))
} .into_iter()
.chain(self.data_layout.secondary_block_dirs(hash));
let filename = hex::encode(hash.as_ref());
/// Utility: give the full path where a block should be found, minus extension if block is for dir in dirs {
/// compressed let mut path = dir;
fn block_path(&self, hash: &Hash) -> PathBuf { path.push(&filename);
let mut path = self.block_dir(hash);
path.push(hex::encode(hash.as_ref()));
path
}
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
if self.compression_level.is_none() {
// If compression is disabled on node - check for the raw block // If compression is disabled on node - check for the raw block
// first and then a compressed one (as compression may have been // first and then a compressed one (as compression may have been
// previously enabled). // previously enabled).
match self.compression_level {
None => {
if fs::metadata(&path).await.is_ok() { if fs::metadata(&path).await.is_ok() {
return Ok(false); return Some(DataBlockPath::Plain(path));
} }
path.set_extension("zst"); path.set_extension("zst");
fs::metadata(&path).await.map(|_| true).map_err(Into::into)
}
_ => {
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() { if fs::metadata(&path).await.is_ok() {
return Ok(true); return Some(DataBlockPath::Compressed(path));
}
} else {
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Compressed(path));
} }
path.set_extension(""); path.set_extension("");
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Plain(path));
}
}
}
fs::metadata(&path).await.map(|_| false).map_err(Into::into) None
}
}
} }
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> { async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage"); let tracer = opentelemetry::global::tracer("garage");
self.mutation_lock[hash.as_slice()[0] as usize] let ilock = u16::from_be_bytes([hash.as_slice()[0], hash.as_slice()[1]]) as usize
% self.mutation_lock.len();
self.mutation_lock[ilock]
.lock() .lock()
.with_context(Context::current_with_span( .with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"), tracer.start(format!("Acquire mutation_lock #{}", ilock)),
)) ))
.await .await
} }
@ -688,7 +692,7 @@ impl BlockManagerLocked {
hash: &Hash, hash: &Hash,
mgr: &BlockManager, mgr: &BlockManager,
) -> Result<BlockStatus, Error> { ) -> Result<BlockStatus, Error> {
let exists = mgr.is_block_compressed(hash).await.is_ok(); let exists = mgr.find_block(hash).await.is_some();
let needed = mgr.rc.get_block_rc(hash)?; let needed = mgr.rc.get_block_rc(hash)?;
Ok(BlockStatus { exists, needed }) Ok(BlockStatus { exists, needed })
@ -703,21 +707,17 @@ impl BlockManagerLocked {
let compressed = data.is_compressed(); let compressed = data.is_compressed();
let data = data.inner_buffer(); let data = data.inner_buffer();
let mut path = mgr.block_dir(hash); let mut path = mgr.data_layout.primary_block_dir(hash);
let directory = path.clone(); let directory = path.clone();
path.push(hex::encode(hash)); path.push(hex::encode(hash));
fs::create_dir_all(&directory).await?; fs::create_dir_all(&directory).await?;
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) { let to_delete = match (mgr.find_block(hash).await, compressed) {
(Ok(true), _) => return Ok(()), (Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
(Ok(false), false) => return Ok(()), (Some(DataBlockPath::Plain(_)), false) => return Ok(()),
(Ok(false), true) => { (Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
let path_to_delete = path.clone(); (None, compressed) => {
path.set_extension("zst");
Some(path_to_delete)
}
(Err(_), compressed) => {
if compressed { if compressed {
path.set_extension("zst"); path.set_extension("zst");
} }
@ -766,19 +766,20 @@ impl BlockManagerLocked {
Ok(()) Ok(())
} }
async fn move_block_to_corrupted(&self, hash: &Hash, mgr: &BlockManager) -> Result<(), Error> { async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
warn!( let (path, path2) = match block_path {
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.", DataBlockPath::Plain(p) => {
hash let mut p2 = p.clone();
); p2.set_extension("corrupted");
let mut path = mgr.block_path(hash); (p, p2)
let mut path2 = path.clone();
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
path2.set_extension("zst.corrupted");
} else {
path2.set_extension("corrupted");
} }
DataBlockPath::Compressed(p) => {
let mut p2 = p.clone();
p2.set_extension("zst.corrupted");
(p, p2)
}
};
fs::rename(path, path2).await?; fs::rename(path, path2).await?;
Ok(()) Ok(())
} }
@ -787,13 +788,15 @@ impl BlockManagerLocked {
let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
if exists && needed.is_deletable() { if exists && needed.is_deletable() {
let mut path = mgr.block_path(hash); let path_opt = match mgr.find_block(hash).await {
if mgr.is_block_compressed(hash).await? { Some(DataBlockPath::Plain(p)) | Some(DataBlockPath::Compressed(p)) => Some(p),
path.set_extension("zst"); None => None,
} };
if let Some(path) = path_opt {
fs::remove_file(path).await?; fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1); mgr.metrics.delete_counter.add(1);
} }
}
Ok(()) Ok(())
} }
} }