Add compression using zstd #173
2 changed files with 71 additions and 3 deletions
|
@ -107,12 +107,12 @@ impl DataBlock {
|
|||
}
|
||||
}
|
||||
|
||||
/// Verify data integrity. Allocate less than [`verify_get`] and don't consume self, but
|
||||
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
|
||||
/// does not return the buffer content.
|
||||
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
||||
match self {
|
||||
DataBlock::Plain(data) => {
|
||||
if blake2sum(&data) == hash {
|
||||
if blake2sum(data) == hash {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(Error::CorruptData(hash))
|
||||
|
@ -236,7 +236,14 @@ impl BlockManager {
|
|||
/// Send block to nodes that should have it
|
||||
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
||||
let who = self.replication.write_nodes(&hash);
|
||||
let data = DataBlock::from_buffer(data, None); // TODO get compression level from somewhere
|
||||
let compression_level = self
|
||||
.garage
|
||||
.load()
|
||||
.as_ref()
|
||||
.unwrap()
|
||||
.config
|
||||
.compression_level;
|
||||
let data = DataBlock::from_buffer(data, compression_level);
|
||||
self.system
|
||||
.rpc
|
||||
.try_call_many(
|
||||
|
|
|
@ -30,6 +30,13 @@ pub struct Config {
|
|||
// (we can add more aliases for this later)
|
||||
pub replication_mode: String,
|
||||
|
||||
/// Zstd compression level used on data blocks
|
||||
#[serde(
|
||||
deserialize_with = "deserialize_compression",
|
||||
default = "default_compression"
|
||||
)]
|
||||
pub compression_level: Option<i32>,
|
||||
|
||||
/// RPC secret key: 32 bytes hex encoded
|
||||
pub rpc_secret: String,
|
||||
|
||||
|
@ -123,3 +130,57 @@ where
|
|||
|
||||
Ok(ret)
|
||||
}
|
||||
|
||||
fn default_compression() -> Option<i32> {
|
||||
Some(1)
|
||||
}
|
||||
|
||||
fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error>
|
||||
where
|
||||
D: de::Deserializer<'de>,
|
||||
{
|
||||
use std::convert::TryFrom;
|
||||
|
||||
struct OptionVisitor;
|
||||
|
||||
impl<'de> serde::de::Visitor<'de> for OptionVisitor {
|
||||
type Value = Option<i32>;
|
||||
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
|
||||
formatter.write_str("int or 'none'")
|
||||
}
|
||||
|
||||
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
if value.eq_ignore_ascii_case("none") {
|
||||
Ok(None)
|
||||
} else {
|
||||
Err(E::custom(format!(
|
||||
"Invalid compression level: '{}', should be a number, or 'none'",
|
||||
value
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
i32::try_from(v)
|
||||
.map(Some)
|
||||
.map_err(|_| E::custom("Compression level out of bound".to_owned()))
|
||||
}
|
||||
|
||||
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
|
||||
where
|
||||
E: de::Error,
|
||||
{
|
||||
i32::try_from(v)
|
||||
.map(Some)
|
||||
.map_err(|_| E::custom("Compression level out of bound".to_owned()))
|
||||
}
|
||||
}
|
||||
|
||||
deserializer.deserialize_any(OptionVisitor)
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue