some refactoring on data read/write path #729
2 changed files with 4 additions and 24 deletions
|
@ -2,7 +2,7 @@ use std::path::PathBuf;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
use zstd::stream::Encoder;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
@ -43,26 +43,7 @@ impl DataBlock {
|
||||||
res
|
res
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Get the buffer, possibly decompressing it, and verify it's integrity.
|
/// Verify data integrity. Does not return the buffer content.
|
||||||
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
|
|
||||||
/// is used instead.
|
|
||||||
pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
|
|
||||||
match self {
|
|
||||||
DataBlock::Plain(data) => {
|
|
||||||
if blake2sum(&data) == hash {
|
|
||||||
Ok(data)
|
|
||||||
} else {
|
|
||||||
Err(Error::CorruptData(hash))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DataBlock::Compressed(data) => zstd_decode(&data[..])
|
|
||||||
.map_err(|_| Error::CorruptData(hash))
|
|
||||||
.map(Bytes::from),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
|
|
||||||
/// does not return the buffer content.
|
|
||||||
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
||||||
match self {
|
match self {
|
||||||
DataBlock::Plain(data) => {
|
DataBlock::Plain(data) => {
|
||||||
|
|
|
@ -342,9 +342,8 @@ impl BlockManager {
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<Bytes, Error> {
|
) -> Result<Bytes, Error> {
|
||||||
self.rpc_get_raw_block(hash, order_tag)
|
let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
|
||||||
.await?
|
Ok(read_stream_to_end(stream).await?.into_bytes())
|
||||||
.verify_get(*hash)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send block to nodes that should have it
|
/// Send block to nodes that should have it
|
||||||
|
|
Loading…
Reference in a new issue