Compare commits

...

2 commits

Author SHA1 Message Date
e9c42bca34
[refactor-block] add DataBlockStream type
All checks were successful
ci/woodpecker/pr/debug Pipeline was successful
ci/woodpecker/push/debug Pipeline was successful
2024-02-23 12:22:29 +01:00
cd1069c1d4
[refactor-block] refactor DataBlock and DataBlockPath 2024-02-23 12:15:52 +01:00
3 changed files with 106 additions and 92 deletions

View file

@ -7,83 +7,110 @@ use zstd::stream::Encoder;
use garage_util::data::*;
use garage_util::error::*;
use garage_net::stream::ByteStream;
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
pub enum DataBlockHeader {
Plain,
Compressed,
}
/// A possibly compressed block of data
pub enum DataBlock {
/// Uncompressed data
Plain(Bytes),
/// Data compressed with zstd
Compressed(Bytes),
#[derive(Debug)]
pub struct DataBlockElem<T> {
header: DataBlockHeader,
elem: T,
}
#[derive(Debug)]
pub enum DataBlockPath {
/// Uncompressed data fail
Plain(PathBuf),
/// Compressed data fail
Compressed(PathBuf),
/// A possibly compressed block of data
pub type DataBlock = DataBlockElem<Bytes>;
/// A path to a possibly compressed block of data
pub type DataBlockPath = DataBlockElem<PathBuf>;
/// A stream of possibly compressed block data
pub type DataBlockStream = DataBlockElem<ByteStream>;
impl DataBlockHeader {
pub fn is_compressed(&self) -> bool {
matches!(self, DataBlockHeader::Compressed)
}
}
impl<T> DataBlockElem<T> {
pub fn from_parts(header: DataBlockHeader, elem: T) -> Self {
Self { header, elem }
}
pub fn plain(elem: T) -> Self {
Self {
header: DataBlockHeader::Plain,
elem,
}
}
pub fn compressed(elem: T) -> Self {
Self {
header: DataBlockHeader::Compressed,
elem,
}
}
pub fn into_parts(self) -> (DataBlockHeader, T) {
(self.header, self.elem)
}
pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) {
(self.header, &self.elem)
}
/// Query whether this block is compressed
pub fn is_compressed(&self) -> bool {
self.header.is_compressed()
}
}
impl DataBlock {
/// Query whether this block is compressed
pub fn is_compressed(&self) -> bool {
matches!(self, DataBlock::Compressed(_))
}
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
/// instead
pub fn inner_buffer(&self) -> &[u8] {
use DataBlock::*;
let (Plain(ref res) | Compressed(ref res)) = self;
res
&self.elem
}
/// Verify data integrity. Does not return the buffer content.
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(data) == hash {
match self.header {
DataBlockHeader::Plain => {
if blake2sum(&self.elem) == hash {
Ok(())
} else {
Err(Error::CorruptData(hash))
}
}
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
.map_err(|_| Error::CorruptData(hash)),
DataBlockHeader::Compressed => {
zstd::stream::copy_decode(&self.elem[..], std::io::sink())
.map_err(|_| Error::CorruptData(hash))
}
}
}
pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
tokio::task::spawn_blocking(move || {
if let Some(level) = level {
if let Ok(data) = zstd_encode(&data[..], level) {
return DataBlock::Compressed(data.into());
if let Ok(data_compressed) = zstd_encode(&data[..], level) {
return DataBlock {
header: DataBlockHeader::Compressed,
elem: data_compressed.into(),
};
}
}
DataBlock::Plain(data)
DataBlock {
header: DataBlockHeader::Plain,
elem: data.into(),
}
})
.await
.unwrap()
}
pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
match self {
DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
}
}
pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
match h {
DataBlockHeader::Plain => DataBlock::Plain(bytes),
DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
}
}
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {

View file

@ -229,11 +229,9 @@ impl BlockManager {
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<(DataBlockHeader, ByteStream), Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
Ok((header, stream))
})
.await
) -> Result<DataBlockStream, Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
.await
}
/// Ask nodes that might have a (possibly compressed) block for it
@ -243,7 +241,8 @@ impl BlockManager {
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<DataBlock, Error> {
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
let (header, stream) = block_stream.into_parts();
read_stream_to_end(stream)
.await
.err_context("error in block data stream")
@ -259,7 +258,7 @@ impl BlockManager {
f: F,
) -> Result<T, Error>
where
F: Fn(DataBlockHeader, ByteStream) -> Fut,
F: Fn(DataBlockStream) -> Fut,
Fut: futures::Future<Output = Result<T, Error>>,
{
let who = self.replication.read_nodes(hash);
@ -281,8 +280,8 @@ impl BlockManager {
continue;
}
};
let (header, stream) = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
let block_stream = match res.into_parts() {
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream),
(Ok(_), _) => {
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
continue;
@ -292,7 +291,7 @@ impl BlockManager {
continue;
}
};
match f(header, stream).await {
match f(block_stream).await {
Ok(ret) => return Ok(ret),
Err(e) => {
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
@ -316,14 +315,14 @@ impl BlockManager {
// ---- Public interface ----
/// Ask nodes that might have a block for it,
/// return it as a stream
/// Ask nodes that might have a block for it, return it as a stream
pub async fn rpc_get_block_streaming(
&self,
hash: &Hash,
order_tag: Option<OrderTag>,
) -> Result<ByteStream, Error> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
let (header, stream) = block_stream.into_parts();
match header {
DataBlockHeader::Plain => Ok(stream),
DataBlockHeader::Compressed => {
@ -336,7 +335,7 @@ impl BlockManager {
}
}
/// Ask nodes that might have a block for it
/// Ask nodes that might have a block for it, return it as one big Bytes
pub async fn rpc_get_block(
&self,
hash: &Hash,
@ -547,10 +546,7 @@ impl BlockManager {
hash: &Hash,
block_path: &DataBlockPath,
) -> Result<DataBlock, Error> {
let (path, compressed) = match block_path {
DataBlockPath::Plain(p) => (p, false),
DataBlockPath::Compressed(p) => (p, true),
};
let (header, path) = block_path.as_parts_ref();
let mut f = fs::File::open(&path).await?;
let mut data = vec![];
@ -558,11 +554,7 @@ impl BlockManager {
self.metrics.bytes_read.add(data.len() as u64);
drop(f);
let data = if compressed {
DataBlock::Compressed(data.into())
} else {
DataBlock::Plain(data.into())
};
let data = DataBlock::from_parts(header, data.into());
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
@ -615,20 +607,20 @@ impl BlockManager {
// first and then a compressed one (as compression may have been
// previously enabled).
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Plain(path));
return Some(DataBlockPath::plain(path));
}
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Compressed(path));
return Some(DataBlockPath::compressed(path));
}
} else {
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Compressed(path));
return Some(DataBlockPath::compressed(path));
}
path.set_extension("");
if fs::metadata(&path).await.is_ok() {
return Some(DataBlockPath::Plain(path));
return Some(DataBlockPath::plain(path));
}
}
}
@ -709,24 +701,25 @@ impl BlockManagerLocked {
tgt_path.set_extension("zst");
}
let to_delete = match (existing_path, compressed) {
let existing_info = existing_path.map(|x| x.into_parts());
let to_delete = match (existing_info, compressed) {
// If the block is stored in the wrong directory,
// write it again at the correct path and delete the old path
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
(Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p),
(Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p),
// If the block is already stored not compressed but we have a compressed
// copy, write the compressed copy and delete the uncompressed one
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
(Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path),
// If the block is already stored compressed,
// keep the stored copy, we have nothing to do
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
(Some((DataBlockHeader::Compressed, _)), _) => return Ok(()),
// If the block is already stored not compressed,
// and we don't have a compressed copy either,
// keep the stored copy, we have nothing to do
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
(Some((DataBlockHeader::Plain, _)), false) => return Ok(()),
// If the block isn't stored already, just store what is given to us
(None, _) => None,
@ -778,18 +771,14 @@ impl BlockManagerLocked {
}
async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
let (path, path2) = match block_path {
DataBlockPath::Plain(p) => {
let mut p2 = p.clone();
p2.set_extension("corrupted");
(p, p2)
}
DataBlockPath::Compressed(p) => {
let mut p2 = p.clone();
p2.set_extension("zst.corrupted");
(p, p2)
}
};
let (header, path) = block_path.as_parts_ref();
let mut path2 = path.clone();
if header.is_compressed() {
path2.set_extension("zst.corrupted");
} else {
path2.set_extension("corrupted");
}
fs::rename(path, path2).await?;
Ok(())
@ -799,9 +788,7 @@ impl BlockManagerLocked {
let rc = mgr.rc.get_block_rc(hash)?;
if rc.is_deletable() {
while let Some(path) = mgr.find_block(hash).await {
let path = match path {
DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
};
let (_header, path) = path.as_parts_ref();
fs::remove_file(path).await?;
mgr.metrics.delete_counter.add(1);
}

View file

@ -584,8 +584,8 @@ impl Worker for RebalanceWorker {
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
if path.ancestors().all(|x| x != prim_loc) {
let block_path = match path.extension() {
None => DataBlockPath::Plain(path.clone()),
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
None => DataBlockPath::plain(path.clone()),
Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()),
_ => {
warn!("not rebalancing file: {}", path.to_string_lossy());
return Ok(WorkerState::Busy);