Move DataBlock out of manager.rs
This commit is contained in:
parent
c1d9854d2c
commit
c3982a90b6
4 changed files with 86 additions and 81 deletions
81
src/block/block.rs
Normal file
81
src/block/block.rs
Normal file
|
@ -0,0 +1,81 @@
|
|||
use serde::{Deserialize, Serialize};
|
||||
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
||||
|
||||
use garage_util::data::*;
|
||||
use garage_util::error::*;
|
||||
|
||||
/// A possibly compressed block of data
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum DataBlock {
|
||||
/// Uncompressed data
|
||||
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
/// Data compressed with zstd
|
||||
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
}
|
||||
|
||||
impl DataBlock {
|
||||
/// Query whether this block is compressed
|
||||
pub fn is_compressed(&self) -> bool {
|
||||
matches!(self, DataBlock::Compressed(_))
|
||||
}
|
||||
|
||||
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
|
||||
/// instead
|
||||
pub fn inner_buffer(&self) -> &[u8] {
|
||||
use DataBlock::*;
|
||||
let (Plain(ref res) | Compressed(ref res)) = self;
|
||||
res
|
||||
}
|
||||
|
||||
/// Get the buffer, possibly decompressing it, and verify it's integrity.
|
||||
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
|
||||
/// is used instead.
|
||||
pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
|
||||
match self {
|
||||
DataBlock::Plain(data) => {
|
||||
if blake2sum(&data) == hash {
|
||||
Ok(data)
|
||||
} else {
|
||||
Err(Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
DataBlock::Compressed(data) => {
|
||||
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
|
||||
/// does not return the buffer content.
|
||||
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
||||
match self {
|
||||
DataBlock::Plain(data) => {
|
||||
if blake2sum(data) == hash {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
|
||||
.map_err(|_| Error::CorruptData(hash)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
|
||||
if let Some(level) = level {
|
||||
if let Ok(data) = zstd_encode(&data[..], level) {
|
||||
return DataBlock::Compressed(data);
|
||||
}
|
||||
}
|
||||
DataBlock::Plain(data)
|
||||
}
|
||||
}
|
||||
|
||||
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
|
||||
let mut result = Vec::<u8>::new();
|
||||
let mut encoder = Encoder::new(&mut result, level)?;
|
||||
encoder.include_checksum(true)?;
|
||||
std::io::copy(&mut source, &mut encoder)?;
|
||||
encoder.finish()?;
|
||||
Ok(result)
|
||||
}
|
|
@ -3,4 +3,5 @@ extern crate tracing;
|
|||
|
||||
pub mod manager;
|
||||
|
||||
mod block;
|
||||
mod metrics;
|
||||
|
|
|
@ -5,7 +5,6 @@ use std::time::Duration;
|
|||
|
||||
use async_trait::async_trait;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
||||
|
||||
use futures::future::*;
|
||||
use futures::select;
|
||||
|
@ -31,6 +30,7 @@ use garage_rpc::*;
|
|||
use garage_table::replication::{TableReplication, TableShardedReplication};
|
||||
|
||||
use crate::metrics::*;
|
||||
use crate::block::*;
|
||||
|
||||
/// Size under which data will be stored inlined in database instead of as files
|
||||
pub const INLINE_THRESHOLD: usize = 3072;
|
||||
|
@ -71,73 +71,6 @@ pub enum BlockRpc {
|
|||
NeedBlockReply(bool),
|
||||
}
|
||||
|
||||
/// A possibly compressed block of data
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub enum DataBlock {
|
||||
/// Uncompressed data
|
||||
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
/// Data compressed with zstd
|
||||
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
|
||||
}
|
||||
|
||||
impl DataBlock {
|
||||
/// Query whether this block is compressed
|
||||
pub fn is_compressed(&self) -> bool {
|
||||
matches!(self, DataBlock::Compressed(_))
|
||||
}
|
||||
|
||||
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
|
||||
/// instead
|
||||
pub fn inner_buffer(&self) -> &[u8] {
|
||||
use DataBlock::*;
|
||||
let (Plain(ref res) | Compressed(ref res)) = self;
|
||||
res
|
||||
}
|
||||
|
||||
/// Get the buffer, possibly decompressing it, and verify it's integrity.
|
||||
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
|
||||
/// is used instead.
|
||||
pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
|
||||
match self {
|
||||
DataBlock::Plain(data) => {
|
||||
if blake2sum(&data) == hash {
|
||||
Ok(data)
|
||||
} else {
|
||||
Err(Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
DataBlock::Compressed(data) => {
|
||||
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
|
||||
/// does not return the buffer content.
|
||||
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
||||
match self {
|
||||
DataBlock::Plain(data) => {
|
||||
if blake2sum(data) == hash {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::CorruptData(hash))
|
||||
}
|
||||
}
|
||||
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
|
||||
.map_err(|_| Error::CorruptData(hash)),
|
||||
}
|
||||
}
|
||||
|
||||
pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
|
||||
if let Some(level) = level {
|
||||
if let Ok(data) = zstd_encode(&data[..], level) {
|
||||
return DataBlock::Compressed(data);
|
||||
}
|
||||
}
|
||||
DataBlock::Plain(data)
|
||||
}
|
||||
}
|
||||
|
||||
impl Rpc for BlockRpc {
|
||||
type Response = Result<BlockRpc, Error>;
|
||||
}
|
||||
|
@ -215,6 +148,8 @@ impl BlockManager {
|
|||
});
|
||||
block_manager.endpoint.set_handler(block_manager.clone());
|
||||
|
||||
block_manager.clone().spawn_background_worker();
|
||||
|
||||
block_manager
|
||||
}
|
||||
|
||||
|
@ -494,7 +429,7 @@ impl BlockManager {
|
|||
|
||||
// ---- Resync loop ----
|
||||
|
||||
pub fn spawn_background_worker(self: Arc<Self>) {
|
||||
fn spawn_background_worker(self: Arc<Self>) {
|
||||
// Launch a background workers for background resync loop processing
|
||||
let background = self.system.background.clone();
|
||||
tokio::spawn(async move {
|
||||
|
@ -1080,12 +1015,3 @@ impl ErrorCounter {
|
|||
self.last_try + self.delay_msec()
|
||||
}
|
||||
}
|
||||
|
||||
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
|
||||
let mut result = Vec::<u8>::new();
|
||||
let mut encoder = Encoder::new(&mut result, level)?;
|
||||
encoder.include_checksum(true)?;
|
||||
std::io::copy(&mut source, &mut encoder)?;
|
||||
encoder.finish()?;
|
||||
Ok(result)
|
||||
}
|
||||
|
|
|
@ -153,9 +153,6 @@ impl Garage {
|
|||
block_ref_table,
|
||||
});
|
||||
|
||||
info!("Start block manager background thread...");
|
||||
garage.block_manager.clone().spawn_background_worker();
|
||||
|
||||
garage
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in a new issue