some refactoring on data read/write path #729
3 changed files with 88 additions and 78 deletions
|
@ -13,77 +13,99 @@ pub enum DataBlockHeader {
|
|||
Compressed,
|
||||
}
|
||||
|
||||
/// A possibly compressed block of data
|
||||
pub enum DataBlock {
|
||||
/// Uncompressed data
|
||||
Plain(Bytes),
|
||||
/// Data compressed with zstd
|
||||
Compressed(Bytes),
|
||||
/// A payload of type `T` paired with the [`DataBlockHeader`] that records
/// whether that payload is plain or compressed.
#[derive(Debug)]
|
||||
pub struct DataBlockElem<T> {
|
||||
/// Tag saying whether `elem` is plain or compressed
header: DataBlockHeader,
|
||||
/// The tagged payload itself
elem: T,
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub enum DataBlockPath {
|
||||
/// Uncompressed data file
|
||||
Plain(PathBuf),
|
||||
/// Compressed data file
|
||||
Compressed(PathBuf),
|
||||
/// A possibly compressed block of data
|
||||
pub type DataBlock = DataBlockElem<Bytes>;
|
||||
|
||||
/// A path to a possibly compressed block of data
|
||||
pub type DataBlockPath = DataBlockElem<PathBuf>;
|
||||
|
||||
impl DataBlockHeader {
|
||||
/// Returns `true` when this header is the `Compressed` variant
pub fn is_compressed(&self) -> bool {
|
||||
matches!(self, DataBlockHeader::Compressed)
|
||||
}
|
||||
}
|
||||
|
||||
impl<T> DataBlockElem<T> {
|
||||
/// Build an element from an explicit header and payload
pub fn from_parts(header: DataBlockHeader, elem: T) -> Self {
|
||||
Self { header, elem }
|
||||
}
|
||||
|
||||
/// Wrap `elem` with a `DataBlockHeader::Plain` header
pub fn plain(elem: T) -> Self {
|
||||
Self {
|
||||
header: DataBlockHeader::Plain,
|
||||
elem,
|
||||
}
|
||||
}
|
||||
|
||||
/// Wrap `elem` with a `DataBlockHeader::Compressed` header.
/// The payload is stored as given; nothing is compressed here.
pub fn compressed(elem: T) -> Self {
|
||||
Self {
|
||||
header: DataBlockHeader::Compressed,
|
||||
elem,
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume the element and return its header and payload
pub fn into_parts(self) -> (DataBlockHeader, T) {
|
||||
(self.header, self.elem)
|
||||
}
|
||||
|
||||
/// Return the header by value together with a reference to the payload,
/// without consuming the element
pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) {
|
||||
(self.header, &self.elem)
|
||||
}
|
||||
|
||||
/// Query whether this block is compressed
|
||||
pub fn is_compressed(&self) -> bool {
|
||||
self.header.is_compressed()
|
||||
}
|
||||
}
|
||||
|
||||
impl DataBlock {
|
||||
/// Query whether this block is compressed
|
||||
pub fn is_compressed(&self) -> bool {
|
||||
matches!(self, DataBlock::Compressed(_))
|
||||
}
|
||||
|
||||
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
|
||||
/// instead
|
||||
pub fn inner_buffer(&self) -> &[u8] {
|
||||
use DataBlock::*;
|
||||
let (Plain(ref res) | Compressed(ref res)) = self;
|
||||
res
|
||||
&self.elem
|
||||
}
|
||||
|
||||
/// Verify data integrity. Does not return the buffer content.
|
||||
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
||||
match self {
|
||||
DataBlock::Plain(data) => {
|
||||
if blake2sum(data) == hash {
|
||||
match self.header {
|
||||
DataBlockHeader::Plain => {
|
||||
if blake2sum(&self.elem) == hash {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
|
||||
.map_err(|_| Error::CorruptData(hash)),
|
||||
DataBlockHeader::Compressed => {
|
||||
zstd::stream::copy_decode(&self.elem[..], std::io::sink())
|
||||
.map_err(|_| Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
|
||||
tokio::task::spawn_blocking(move || {
|
||||
if let Some(level) = level {
|
||||
if let Ok(data) = zstd_encode(&data[..], level) {
|
||||
return DataBlock::Compressed(data.into());
|
||||
if let Ok(data_compressed) = zstd_encode(&data[..], level) {
|
||||
return DataBlock {
|
||||
header: DataBlockHeader::Compressed,
|
||||
elem: data_compressed.into(),
|
||||
};
|
||||
}
|
||||
}
|
||||
DataBlock::Plain(data)
|
||||
DataBlock {
|
||||
header: DataBlockHeader::Plain,
|
||||
elem: data.into(),
|
||||
}
|
||||
})
|
||||
.await
|
||||
.unwrap()
|
||||
}
|
||||
|
||||
pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
|
||||
match self {
|
||||
DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
|
||||
DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
|
||||
match h {
|
||||
DataBlockHeader::Plain => DataBlock::Plain(bytes),
|
||||
DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
|
||||
|
|
|
@ -547,10 +547,7 @@ impl BlockManager {
|
|||
hash: &Hash,
|
||||
block_path: &DataBlockPath,
|
||||
) -> Result<DataBlock, Error> {
|
||||
let (path, compressed) = match block_path {
|
||||
DataBlockPath::Plain(p) => (p, false),
|
||||
DataBlockPath::Compressed(p) => (p, true),
|
||||
};
|
||||
let (header, path) = block_path.as_parts_ref();
|
||||
|
||||
let mut f = fs::File::open(&path).await?;
|
||||
let mut data = vec![];
|
||||
|
@ -558,11 +555,7 @@ impl BlockManager {
|
|||
self.metrics.bytes_read.add(data.len() as u64);
|
||||
drop(f);
|
||||
|
||||
let data = if compressed {
|
||||
DataBlock::Compressed(data.into())
|
||||
} else {
|
||||
DataBlock::Plain(data.into())
|
||||
};
|
||||
let data = DataBlock::from_parts(header, data.into());
|
||||
|
||||
if data.verify(*hash).is_err() {
|
||||
self.metrics.corruption_counter.add(1);
|
||||
|
@ -615,20 +608,20 @@ impl BlockManager {
|
|||
// first and then a compressed one (as compression may have been
|
||||
// previously enabled).
|
||||
if fs::metadata(&path).await.is_ok() {
|
||||
return Some(DataBlockPath::Plain(path));
|
||||
return Some(DataBlockPath::plain(path));
|
||||
}
|
||||
path.set_extension("zst");
|
||||
if fs::metadata(&path).await.is_ok() {
|
||||
return Some(DataBlockPath::Compressed(path));
|
||||
return Some(DataBlockPath::compressed(path));
|
||||
}
|
||||
} else {
|
||||
path.set_extension("zst");
|
||||
if fs::metadata(&path).await.is_ok() {
|
||||
return Some(DataBlockPath::Compressed(path));
|
||||
return Some(DataBlockPath::compressed(path));
|
||||
}
|
||||
path.set_extension("");
|
||||
if fs::metadata(&path).await.is_ok() {
|
||||
return Some(DataBlockPath::Plain(path));
|
||||
return Some(DataBlockPath::plain(path));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -709,24 +702,25 @@ impl BlockManagerLocked {
|
|||
tgt_path.set_extension("zst");
|
||||
}
|
||||
|
||||
let to_delete = match (existing_path, compressed) {
|
||||
let existing_info = existing_path.map(|x| x.into_parts());
|
||||
let to_delete = match (existing_info, compressed) {
|
||||
// If the block is stored in the wrong directory,
|
||||
// write it again at the correct path and delete the old path
|
||||
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
|
||||
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
|
||||
(Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p),
|
||||
(Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p),
|
||||
|
||||
// If the block is already stored not compressed but we have a compressed
|
||||
// copy, write the compressed copy and delete the uncompressed one
|
||||
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
|
||||
(Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path),
|
||||
|
||||
// If the block is already stored compressed,
|
||||
// keep the stored copy, we have nothing to do
|
||||
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
|
||||
(Some((DataBlockHeader::Compressed, _)), _) => return Ok(()),
|
||||
|
||||
// If the block is already stored not compressed,
|
||||
// and we don't have a compressed copy either,
|
||||
// keep the stored copy, we have nothing to do
|
||||
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
|
||||
(Some((DataBlockHeader::Plain, _)), false) => return Ok(()),
|
||||
|
||||
// If the block isn't stored already, just store what is given to us
|
||||
(None, _) => None,
|
||||
|
@ -778,18 +772,14 @@ impl BlockManagerLocked {
|
|||
}
|
||||
|
||||
async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
|
||||
let (path, path2) = match block_path {
|
||||
DataBlockPath::Plain(p) => {
|
||||
let mut p2 = p.clone();
|
||||
p2.set_extension("corrupted");
|
||||
(p, p2)
|
||||
}
|
||||
DataBlockPath::Compressed(p) => {
|
||||
let mut p2 = p.clone();
|
||||
p2.set_extension("zst.corrupted");
|
||||
(p, p2)
|
||||
}
|
||||
};
|
||||
let (header, path) = block_path.as_parts_ref();
|
||||
|
||||
let mut path2 = path.clone();
|
||||
if header.is_compressed() {
|
||||
path2.set_extension("zst.corrupted");
|
||||
} else {
|
||||
path2.set_extension("corrupted");
|
||||
}
|
||||
|
||||
fs::rename(path, path2).await?;
|
||||
Ok(())
|
||||
|
@ -799,9 +789,7 @@ impl BlockManagerLocked {
|
|||
let rc = mgr.rc.get_block_rc(hash)?;
|
||||
if rc.is_deletable() {
|
||||
while let Some(path) = mgr.find_block(hash).await {
|
||||
let path = match path {
|
||||
DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
|
||||
};
|
||||
let (_header, path) = path.as_parts_ref();
|
||||
fs::remove_file(path).await?;
|
||||
mgr.metrics.delete_counter.add(1);
|
||||
}
|
||||
|
|
|
@ -584,8 +584,8 @@ impl Worker for RebalanceWorker {
|
|||
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
|
||||
if path.ancestors().all(|x| x != prim_loc) {
|
||||
let block_path = match path.extension() {
|
||||
None => DataBlockPath::Plain(path.clone()),
|
||||
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
|
||||
None => DataBlockPath::plain(path.clone()),
|
||||
Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()),
|
||||
_ => {
|
||||
warn!("not rebalancing file: {}", path.to_string_lossy());
|
||||
return Ok(WorkerState::Busy);
|
||||
|
|
Loading…
Reference in a new issue