Add compression using zstd #173

Merged
lx merged 8 commits from trinity-1686a/garage:compression into main 2021-12-15 10:26:43 +00:00
8 changed files with 377 additions and 53 deletions

42
Cargo.lock generated
View file

@ -106,6 +106,9 @@ name = "cc"
version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
dependencies = [
"jobserver",
]
[[package]]
name = "cfg-if"
@ -458,6 +461,7 @@ dependencies = [
"serde_bytes",
"sled",
"tokio",
"zstd",
]
[[package]]
@ -752,6 +756,15 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]]
name = "kuska-handshake"
version = "0.2.0"
@ -1652,3 +1665,32 @@ name = "xxhash-rust"
version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded"
[[package]]
name = "zstd"
version = "0.9.0+zstd.1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "4.1.1+zstd.1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "1.6.1+zstd.1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"
dependencies = [
"cc",
"libc",
]

View file

@ -196,6 +196,13 @@ in
version = "1.0.71";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"; };
features = builtins.concatLists [
[ "jobserver" ]
[ "parallel" ]
];
dependencies = {
jobserver = rustPackages."registry+https://github.com/rust-lang/crates.io-index".jobserver."0.1.24" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" = overridableMkRustCrate (profileName: rec {
@ -695,6 +702,7 @@ in
serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; };
sled = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sled."0.34.7" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; };
zstd = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.0+zstd.1.5.0" { inherit profileName; };
};
});
@ -1055,6 +1063,16 @@ in
];
});
"registry+https://github.com/rust-lang/crates.io-index".jobserver."0.1.24" = overridableMkRustCrate (profileName: rec {
name = "jobserver";
version = "0.1.24";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"; };
dependencies = {
${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" = overridableMkRustCrate (profileName: rec {
name = "kuska-handshake";
version = "0.2.0";
@ -2351,4 +2369,44 @@ in
];
});
"registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.0+zstd.1.5.0" = overridableMkRustCrate (profileName: rec {
name = "zstd";
version = "0.9.0+zstd.1.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"; };
dependencies = {
zstd_safe = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.1+zstd.1.5.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.1+zstd.1.5.0" = overridableMkRustCrate (profileName: rec {
name = "zstd-safe";
version = "4.1.1+zstd.1.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"; };
features = builtins.concatLists [
[ "std" ]
];
dependencies = {
libc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; };
zstd_sys = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-sys."1.6.1+zstd.1.5.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".zstd-sys."1.6.1+zstd.1.5.0" = overridableMkRustCrate (profileName: rec {
name = "zstd-sys";
version = "1.6.1+zstd.1.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"; };
features = builtins.concatLists [
[ "std" ]
];
dependencies = {
libc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; };
};
buildDependencies = {
cc = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".cc."1.0.71" { profileName = "__noProfile"; };
};
});
}

View file

@ -74,6 +74,8 @@ data_dir = "/var/lib/garage/data"
replication_mode = "3"
compression_level = 2
rpc_bind_addr = "[::]:3901"
rpc_public_addr = "<this node's public IP>:3901"
rpc_secret = "<RPC secret>"

View file

@ -10,6 +10,8 @@ block_size = 1048576
replication_mode = "3"
compression_level = 1
rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6"
rpc_bind_addr = "[::]:3901"
rpc_public_addr = "[fc00:1::1]:3901"
@ -98,6 +100,30 @@ Never run a Garage cluster where that is not the case.**
Changing the `replication_mode` of a cluster might work (make sure to shut down all nodes
and changing it everywhere at the time), but is not officially supported.
### `compression_level`
Review

To help people, I propose adding a bit more information here and maybe a link or two?

I found these resources:

The first link says:

The library supports regular compression levels from 1 up to ZSTD_maxCLevel(), which is currently 22. Levels >= 20, labeled --ultra, should be used with caution, as they require more memory. The library also offers negative compression levels, which extend the range of speed vs. ratio preferences. The lower the level, the faster the speed (at the cost of compression).

The CLI explains the 3 different types of compression levels: standard, ultra and fast.


We could add something like that:

Values between 1 (faster compression) and 19 (smaller file) are standard compression levels for zstd. From 20 to 22, compression levels are referred as "ultra" and must be used with extra care as it will use lot of memory. A value of 0 will let zstd choose a default value (currently 3). Finally, zstd has also compression designed to be faster than default compression levels, they range from -1 (smaller file) to -99 (faster compression).

If you do not specify a compression_level entry, garage will set it to 1 for you. With this parameters, zstd consumes low amount of cpu and should work faster than line speed in most situations, while saving some space and intra-cluster
bandwidth.

If you want to totally deactivate zstd in garage, you can pass the special value none. No zstd related code will be called, your chunks will be stored on disk without any processing.

To help people, I propose adding a bit more information here and maybe a link or two? I found these resources: - https://facebook.github.io/zstd/zstd_manual.html - https://github.com/facebook/zstd/blob/d7e17363751974dc1ad10785deb4170b23bee0ec/programs/zstd.1.md - https://github.com/facebook/zstd/blob/9b97fdf74fa9f8adaa557a710b726e2e6966adee/lib/dictBuilder/zdict.c#L768 - https://github.com/facebook/zstd/blob/550410d05d7c7815b1ff417c4cac51153a78785e/lib/zstd.h#L97 - https://github.com/facebook/zstd/blob/38dfc4699e1108d839b3222b6093caaad5befd1c/programs/zstdcli.c#L774-L775 The first link says: > The library supports regular compression levels from 1 up to ZSTD_maxCLevel(), which is currently 22. Levels >= 20, labeled `--ultra`, should be used with caution, as they require more memory. The library also offers negative compression levels, which extend the range of speed vs. ratio preferences. The lower the level, the faster the speed (at the cost of compression). The CLI explains the 3 different types of compression levels: standard, ultra and fast. --- We could add something like that: > Values between `1` (faster compression) and `19` (smaller file) are standard compression levels for zstd. From `20` to `22`, compression levels are referred as "ultra" and must be used with extra care as it will use lot of memory. A value of `0` will let zstd choose a default value (currently `3`). Finally, zstd has also compression designed to be faster than default compression levels, they range from `-1` (smaller file) to `-99` (faster compression). > > If you do not specify a `compression_level` entry, garage will set it to `1` for you. With this parameters, zstd consumes low amount of cpu and should work faster than line speed in most situations, while saving some space and intra-cluster bandwidth. > > If you want to totally deactivate zstd in garage, you can pass the special value `none`. No zstd related code will be called, your chunks will be stored on disk without any processing.
Zstd compression level to use for storing blocks.
Values between `1` (faster compression) and `19` (smaller file) are standard compression
levels for zstd. From `20` to `22`, compression levels are referred as "ultra" and must be
used with extra care as it will use lot of memory. A value of `0` will let zstd choose a
default value (currently `3`). Finally, zstd has also compression designed to be faster
than default compression levels, they range from `-1` (smaller file) to `-99` (faster
compression).
If you do not specify a `compression_level` entry, garage will set it to `1` for you. With
this parameters, zstd consumes low amount of cpu and should work faster than line speed in
most situations, while saving some space and intra-cluster
bandwidth.
If you want to totally deactivate zstd in garage, you can pass the special value `'none'`. No
zstd related code will be called, your chunks will be stored on disk without any processing.
Compression is done synchronously, setting a value too high will add latency to write queries.
This value can be different between nodes, compression is done by the node which receive the
API call.
#### `rpc_secret`
Garage uses a secret key that is shared between all nodes of the cluster

View file

@ -30,6 +30,11 @@ dd if=/dev/urandom of=/tmp/garage.1.rnd bs=1k count=2 # No multipart, inline sto
dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5 # No multipart but file will be chunked
dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10 # by default, AWS starts using multipart at 8MB
# data of lower entropy, to test compression
dd if=/dev/urandom bs=1k count=2 | base64 -w0 > /tmp/garage.1.b64
dd if=/dev/urandom bs=1M count=5 | base64 -w0 > /tmp/garage.2.b64
dd if=/dev/urandom bs=1M count=10 | base64 -w0 > /tmp/garage.3.b64
echo "🧪 S3 API testing..."
# AWS
@ -37,11 +42,11 @@ if [ -z "$SKIP_AWS" ]; then
echo "🛠️ Testing with awscli"
source ${SCRIPT_FOLDER}/dev-env-aws.sh
aws s3 ls
for idx in $(seq 1 3); do
aws s3 cp "/tmp/garage.$idx.rnd" "s3://eprouvette/&+-é\"/garage.$idx.aws"
for idx in {1..3}.{rnd,b64}; do
aws s3 cp "/tmp/garage.$idx" "s3://eprouvette/&+-é\"/garage.$idx.aws"
aws s3 ls s3://eprouvette
aws s3 cp "s3://eprouvette/&+-é\"/garage.$idx.aws" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
aws s3 rm "s3://eprouvette/&+-é\"/garage.$idx.aws"
done
@ -52,11 +57,11 @@ if [ -z "$SKIP_S3CMD" ]; then
echo "🛠️ Testing with s3cmd"
source ${SCRIPT_FOLDER}/dev-env-s3cmd.sh
s3cmd ls
for idx in $(seq 1 3); do
s3cmd put "/tmp/garage.$idx.rnd" "s3://eprouvette/&+-é\"/garage.$idx.s3cmd"
for idx in {1..3}.{rnd,b64}; do
s3cmd put "/tmp/garage.$idx" "s3://eprouvette/&+-é\"/garage.$idx.s3cmd"
s3cmd ls s3://eprouvette
s3cmd get "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
s3cmd rm "s3://eprouvette/&+-é\"/garage.$idx.s3cmd"
done
@ -67,11 +72,11 @@ if [ -z "$SKIP_MC" ]; then
echo "🛠️ Testing with mc (minio client)"
source ${SCRIPT_FOLDER}/dev-env-mc.sh
mc ls garage/
for idx in $(seq 1 3); do
mc cp "/tmp/garage.$idx.rnd" "garage/eprouvette/&+-é\"/garage.$idx.mc"
for idx in {1..3}.{rnd,b64}; do
mc cp "/tmp/garage.$idx" "garage/eprouvette/&+-é\"/garage.$idx.mc"
mc ls garage/eprouvette
mc cp "garage/eprouvette/&+-é\"/garage.$idx.mc" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
mc rm "garage/eprouvette/&+-é\"/garage.$idx.mc"
done
@ -82,13 +87,13 @@ if [ -z "$SKIP_RCLONE" ]; then
echo "🛠️ Testing with rclone"
source ${SCRIPT_FOLDER}/dev-env-rclone.sh
rclone lsd garage:
for idx in $(seq 1 3); do
cp /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
for idx in {1..3}.{rnd,b64}; do
cp /tmp/garage.$idx /tmp/garage.$idx.dl
rclone copy "/tmp/garage.$idx.dl" "garage:eprouvette/&+-é\"/"
rm /tmp/garage.$idx.dl
rclone ls garage:eprouvette
rclone copy "garage:eprouvette/&+-é\"/garage.$idx.dl" "/tmp/"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
rclone delete "garage:eprouvette/&+-é\"/garage.$idx.dl"
done
@ -100,17 +105,17 @@ if [ -z "$SKIP_DUCK" ]; then
source ${SCRIPT_FOLDER}/dev-env-duck.sh
duck --list garage:/
duck --mkdir "garage:/eprouvette/duck"
for idx in $(seq 1 3); do
duck --verbose --upload "garage:/eprouvette/duck/" "/tmp/garage.$idx.rnd"
for idx in {1..3}.{rnd,b64}; do
duck --verbose --upload "garage:/eprouvette/duck/" "/tmp/garage.$idx"
duck --list garage:/eprouvette/duck/
duck --download "garage:/eprouvette/duck/garage.$idx.rnd" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl
duck --download "garage:/eprouvette/duck/garage.$idx" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl
duck --delete "garage:/eprouvette/duck/garage.$idx.dk"
done
fi
rm /tmp/garage.{1,2,3}.rnd
rm /tmp/garage.{1..3}.{rnd,b64}
if [ -z "$SKIP_AWS" ]; then
echo "🧪 Website Testing"

View file

@ -23,6 +23,7 @@ arc-swap = "1.0"
hex = "0.4"
log = "0.4"
rand = "0.8"
zstd = { version = "0.9", default-features = false }
sled = "0.34"

View file

@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*;
use garage_util::error::*;
@ -55,22 +56,81 @@ pub enum BlockRpc {
GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new
/// block
PutBlock(PutBlockMessage),
PutBlock {
hash: Hash,
data: DataBlock,
},
/// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash),
/// Response : whether the node do require that block
NeedBlockReply(bool),
}
/// Structure used to send a block
/// A possibly compressed block of data
#[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage {
/// Hash of the block
pub hash: Hash,
pub enum DataBlock {
/// Uncompressed data
Plain(#[serde(with = "serde_bytes")] Vec<u8>),
/// Data compressed with zstd
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
}
/// Content of the block
#[serde(with = "serde_bytes")]
pub data: Vec<u8>,
impl DataBlock {
/// Query whether this block is compressed
pub fn is_compressed(&self) -> bool {
matches!(self, DataBlock::Compressed(_))
}
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
/// instead
pub fn inner_buffer(&self) -> &[u8] {
use DataBlock::*;
let (Plain(ref res) | Compressed(ref res)) = self;
res
}
/// Get the buffer, possibly decompressing it, and verify it's integrity.
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
/// is used instead.
pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(&data) == hash {
Ok(data)
} else {
Err(Error::CorruptData(hash))
}
}
DataBlock::Compressed(data) => {
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
}
}
}
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
/// does not return the buffer content.
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(data) == hash {
Ok(())
} else {
Err(Error::CorruptData(hash))
}
}
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
.map_err(|_| Error::CorruptData(hash)),
}
}
pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
if let Some(level) = level {
if let Ok(data) = zstd_encode(&data[..], level) {
return DataBlock::Compressed(data);
}
}
DataBlock::Plain(data)
}
}
impl Rpc for BlockRpc {
@ -138,10 +198,8 @@ impl BlockManager {
block_manager
}
// ---- Public interface ----
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
/// Ask nodes that might have a (possibly compressed) block for it
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
let who = self.replication.read_nodes(hash);
let resps = self
.system
@ -158,8 +216,8 @@ impl BlockManager {
.await?;
for resp in resps {
if let BlockRpc::PutBlock(msg) = resp {
return Ok(msg.data);
if let BlockRpc::PutBlock { data, .. } = resp {
return Ok(data);
}
}
Err(Error::Message(format!(
@ -168,15 +226,30 @@ impl BlockManager {
)))
}
// ---- Public interface ----
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
self.rpc_get_raw_block(hash).await?.verify_get(*hash)
}
/// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash);
let compression_level = self
.garage
.load()
.as_ref()
.unwrap()
.config
.compression_level;
let data = DataBlock::from_buffer(data, compression_level);
self.system
.rpc
.try_call_many(
&self.endpoint,
&who[..],
BlockRpc::PutBlock(PutBlockMessage { hash, data }),
BlockRpc::PutBlock { hash, data },
RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT),
@ -306,7 +379,7 @@ impl BlockManager {
// ---- Reading and writing blocks locally ----
/// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> {
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
self.mutation_lock
.lock()
.await
@ -316,21 +389,31 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let path = self.block_path(hash);
let mut f = match fs::File::open(&path).await {
Ok(f) => f,
let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
Ok(c) => c,
Err(e) => {
// Not found but maybe we should have had it ??
self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
return Err(Into::into(e));
}
};
if compressed {
path.set_extension("zst");
}
let mut f = fs::File::open(&path).await?;
let mut data = vec![];
f.read_to_end(&mut data).await?;
drop(f);
if blake2sum(&data[..]) != *hash {
let data = if compressed {
DataBlock::Compressed(data)
} else {
DataBlock::Plain(data)
};
if data.verify(*hash).is_err() {
self.mutation_lock
.lock()
.await
@ -340,7 +423,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash));
}
Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data }))
Ok(BlockRpc::PutBlock { hash: *hash, data })
}
/// Check if this node should have a block, but don't actually have it
@ -362,13 +445,25 @@ impl BlockManager {
path
}
/// Utility: give the full path where a block should be found
/// Utility: give the full path where a block should be found, minus extension if block is
/// compressed
fn block_path(&self, hash: &Hash) -> PathBuf {
let mut path = self.block_dir(hash);
path.push(hex::encode(hash.as_ref()));
path
}
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Ok(true);
}
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
// ---- Resync loop ----
pub fn spawn_background_worker(self: Arc<Self>) {
@ -550,8 +645,8 @@ impl BlockManager {
hash
);
let block_data = self.rpc_get_block(hash).await?;
self.write_block(hash, &block_data[..]).await?;
let block_data = self.rpc_get_raw_block(hash).await?;
self.write_block(hash, &block_data).await?;
}
Ok(())
@ -602,6 +697,7 @@ impl BlockManager {
};
let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
state = self
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
@ -631,7 +727,7 @@ impl EndpointHandler<BlockRpc> for BlockManager {
_from: NodeID,
) -> Result<BlockRpc, Error> {
match message {
BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await,
BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
BlockRpc::GetBlock(h) => self.read_block(h).await,
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
@ -650,9 +746,7 @@ impl BlockManagerLocked {
hash: &Hash,
mgr: &BlockManager,
) -> Result<BlockStatus, Error> {
let path = mgr.block_path(hash);
let exists = fs::metadata(&path).await.is_ok();
let exists = mgr.is_block_compressed(hash).await.is_ok();
let needed = mgr.get_block_rc(hash)?;
Ok(BlockStatus { exists, needed })
@ -661,16 +755,31 @@ impl BlockManagerLocked {
async fn write_block(
&self,
hash: &Hash,
data: &[u8],
data: &DataBlock,
mgr: &BlockManager,
) -> Result<BlockRpc, Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
let mut path = mgr.block_dir(hash);
fs::create_dir_all(&path).await?;
path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() {
return Ok(BlockRpc::Ok);
}
let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
(Ok(true), _) => return Ok(BlockRpc::Ok),
(Ok(false), false) => return Ok(BlockRpc::Ok),
(Ok(false), true) => {
let path_to_delete = path.clone();
path.set_extension("zst");
Some(path_to_delete)
}
(Err(_), compressed) => {
if compressed {
path.set_extension("zst");
}
None
}
};
let mut path2 = path.clone();
path2.set_extension("tmp");
@ -679,6 +788,9 @@ impl BlockManagerLocked {
drop(f);
fs::rename(path2, path).await?;
if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?;
}
Ok(BlockRpc::Ok)
}
@ -688,9 +800,14 @@ impl BlockManagerLocked {
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
hash
);
let path = mgr.block_path(hash);
let mut path = mgr.block_path(hash);
let mut path2 = path.clone();
path2.set_extension("corrupted");
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
path2.set_extension("zst.corrupted");
} else {
path2.set_extension("corrupted");
}
fs::rename(path, path2).await?;
Ok(())
}
@ -699,7 +816,10 @@ impl BlockManagerLocked {
let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
if exists && needed.is_deletable() {
let path = mgr.block_path(hash);
let mut path = mgr.block_path(hash);
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
}
fs::remove_file(path).await?;
}
Ok(())
@ -806,3 +926,12 @@ impl RcEntry {
!self.is_deletable()
}
}
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?;
std::io::copy(&mut source, &mut encoder)?;
encoder.finish()?;
Ok(result)
}

View file

@ -30,6 +30,13 @@ pub struct Config {
// (we can add more aliases for this later)
pub replication_mode: String,
/// Zstd compression level used on data blocks
#[serde(
deserialize_with = "deserialize_compression",
default = "default_compression"
)]
pub compression_level: Option<i32>,
/// RPC secret key: 32 bytes hex encoded
pub rpc_secret: String,
@ -123,3 +130,57 @@ where
Ok(ret)
}
fn default_compression() -> Option<i32> {
Some(1)
}
fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error>
where
D: de::Deserializer<'de>,
{
use std::convert::TryFrom;
struct OptionVisitor;
impl<'de> serde::de::Visitor<'de> for OptionVisitor {
type Value = Option<i32>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("int or 'none'")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
if value.eq_ignore_ascii_case("none") {
Ok(None)
} else {
Err(E::custom(format!(
"Invalid compression level: '{}', should be a number, or 'none'",
value
)))
}
}
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: de::Error,
{
i32::try_from(v)
.map(Some)
.map_err(|_| E::custom("Compression level out of bound".to_owned()))
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
i32::try_from(v)
.map(Some)
.map_err(|_| E::custom("Compression level out of bound".to_owned()))
}
}
deserializer.deserialize_any(OptionVisitor)
}