From c3982a90b6b9cece7c765070c2c2be22c816ff70 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 15 Mar 2022 12:12:12 +0100 Subject: [PATCH] Move DataBlock out of manager.rs --- src/block/block.rs | 81 +++++++++++++++++++++++++++++++++++++++++++ src/block/lib.rs | 1 + src/block/manager.rs | 82 +++----------------------------------------- src/model/garage.rs | 3 -- 4 files changed, 86 insertions(+), 81 deletions(-) create mode 100644 src/block/block.rs diff --git a/src/block/block.rs b/src/block/block.rs new file mode 100644 index 00000000..4d3fbcb8 --- /dev/null +++ b/src/block/block.rs @@ -0,0 +1,81 @@ +use serde::{Deserialize, Serialize}; +use zstd::stream::{decode_all as zstd_decode, Encoder}; + +use garage_util::data::*; +use garage_util::error::*; + +/// A possibly compressed block of data +#[derive(Debug, Serialize, Deserialize)] +pub enum DataBlock { + /// Uncompressed data + Plain(#[serde(with = "serde_bytes")] Vec), + /// Data compressed with zstd + Compressed(#[serde(with = "serde_bytes")] Vec), +} + +impl DataBlock { + /// Query whether this block is compressed + pub fn is_compressed(&self) -> bool { + matches!(self, DataBlock::Compressed(_)) + } + + /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] + /// instead + pub fn inner_buffer(&self) -> &[u8] { + use DataBlock::*; + let (Plain(ref res) | Compressed(ref res)) = self; + res + } + + /// Get the buffer, possibly decompressing it, and verify it's integrity. + /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system + /// is used instead. + pub fn verify_get(self, hash: Hash) -> Result, Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(&data) == hash { + Ok(data) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => { + zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) + } + } + } + + /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but + /// does not return the buffer content. + pub fn verify(&self, hash: Hash) -> Result<(), Error> { + match self { + DataBlock::Plain(data) => { + if blake2sum(data) == hash { + Ok(()) + } else { + Err(Error::CorruptData(hash)) + } + } + DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) + .map_err(|_| Error::CorruptData(hash)), + } + } + + pub fn from_buffer(data: Vec, level: Option) -> DataBlock { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } + } + DataBlock::Plain(data) + } +} + +fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { + let mut result = Vec::::new(); + let mut encoder = Encoder::new(&mut result, level)?; + encoder.include_checksum(true)?; + std::io::copy(&mut source, &mut encoder)?; + encoder.finish()?; + Ok(result) +} diff --git a/src/block/lib.rs b/src/block/lib.rs index 47ff402d..0c67c956 100644 --- a/src/block/lib.rs +++ b/src/block/lib.rs @@ -3,4 +3,5 @@ extern crate tracing; pub mod manager; +mod block; mod metrics; diff --git a/src/block/manager.rs b/src/block/manager.rs index f047e1d3..9665a306 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,7 +5,6 @@ use std::time::Duration; use async_trait::async_trait; use serde::{Deserialize, Serialize}; -use zstd::stream::{decode_all as zstd_decode, Encoder}; use futures::future::*; use futures::select; @@ -31,6 +30,7 @@ use garage_rpc::*; use garage_table::replication::{TableReplication, TableShardedReplication}; use crate::metrics::*; +use crate::block::*; /// Size under which data will be stored inlined in database instead of as files pub const INLINE_THRESHOLD: usize = 3072; @@ -71,73 +71,6 @@ pub enum BlockRpc { NeedBlockReply(bool), } -/// A possibly compressed block of data -#[derive(Debug, Serialize, Deserialize)] -pub enum DataBlock { - /// Uncompressed data - Plain(#[serde(with = "serde_bytes")] Vec), - /// Data compressed with zstd - Compressed(#[serde(with = "serde_bytes")] Vec), -} - -impl DataBlock { - /// Query whether this block is compressed - pub fn is_compressed(&self) -> bool { - matches!(self, DataBlock::Compressed(_)) - } - - /// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`] - /// instead - pub fn inner_buffer(&self) -> &[u8] { - use DataBlock::*; - let (Plain(ref res) | Compressed(ref res)) = self; - res - } - - /// Get the buffer, possibly decompressing it, and verify it's integrity. - /// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system - /// is used instead. - pub fn verify_get(self, hash: Hash) -> Result, Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(&data) == hash { - Ok(data) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => { - zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash)) - } - } - } - - /// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but - /// does not return the buffer content. - pub fn verify(&self, hash: Hash) -> Result<(), Error> { - match self { - DataBlock::Plain(data) => { - if blake2sum(data) == hash { - Ok(()) - } else { - Err(Error::CorruptData(hash)) - } - } - DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink()) - .map_err(|_| Error::CorruptData(hash)), - } - } - - pub fn from_buffer(data: Vec, level: Option) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); - } - } - DataBlock::Plain(data) - } -} - impl Rpc for BlockRpc { type Response = Result; } @@ -215,6 +148,8 @@ impl BlockManager { }); block_manager.endpoint.set_handler(block_manager.clone()); + block_manager.clone().spawn_background_worker(); + block_manager } @@ -494,7 +429,7 @@ impl BlockManager { // ---- Resync loop ---- - pub fn spawn_background_worker(self: Arc) { + fn spawn_background_worker(self: Arc) { // Launch a background workers for background resync loop processing let background = self.system.background.clone(); tokio::spawn(async move { @@ -1080,12 +1015,3 @@ impl ErrorCounter { self.last_try + self.delay_msec() } } - -fn zstd_encode(mut source: R, level: i32) -> std::io::Result> { - let mut result = Vec::::new(); - let mut encoder = Encoder::new(&mut result, level)?; - encoder.include_checksum(true)?; - std::io::copy(&mut source, &mut encoder)?; - encoder.finish()?; - Ok(result) -} diff --git a/src/model/garage.rs b/src/model/garage.rs index 3f2605f1..93402ca2 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -153,9 +153,6 @@ impl Garage { block_ref_table, }); - info!("Start block manager background thread..."); - garage.block_manager.clone().spawn_background_worker(); - garage }