Add compression using zstd (#173)
All checks were successful
continuous-integration/drone/push Build is passing

fix #27

Co-authored-by: Trinity Pointard <trinity.pointard@gmail.com>
Reviewed-on: #173
Co-authored-by: trinity-1686a <trinity.pointard@gmail.com>
Co-committed-by: trinity-1686a <trinity.pointard@gmail.com>
This commit is contained in:
trinity-1686a 2021-12-15 11:26:43 +01:00 committed by Alex
parent 60d4459926
commit 1eb972b1ac
8 changed files with 377 additions and 53 deletions

42
Cargo.lock generated
View file

@ -106,6 +106,9 @@ name = "cc"
version = "1.0.71" version = "1.0.71"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd" checksum = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"
dependencies = [
"jobserver",
]
[[package]] [[package]]
name = "cfg-if" name = "cfg-if"
@ -458,6 +461,7 @@ dependencies = [
"serde_bytes", "serde_bytes",
"sled", "sled",
"tokio", "tokio",
"zstd",
] ]
[[package]] [[package]]
@ -752,6 +756,15 @@ version = "0.4.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4" checksum = "b71991ff56294aa922b450139ee08b3bfc70982c6b2c7562771375cf73542dd4"
[[package]]
name = "jobserver"
version = "0.1.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "kuska-handshake" name = "kuska-handshake"
version = "0.2.0" version = "0.2.0"
@ -1652,3 +1665,32 @@ name = "xxhash-rust"
version = "0.8.2" version = "0.8.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded" checksum = "e575e15bedf6e57b5c2d763ffc6c3c760143466cbd09d762d539680ab5992ded"
[[package]]
name = "zstd"
version = "0.9.0+zstd.1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"
dependencies = [
"zstd-safe",
]
[[package]]
name = "zstd-safe"
version = "4.1.1+zstd.1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"
dependencies = [
"libc",
"zstd-sys",
]
[[package]]
name = "zstd-sys"
version = "1.6.1+zstd.1.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"
dependencies = [
"cc",
"libc",
]

View file

@ -196,6 +196,13 @@ in
version = "1.0.71"; version = "1.0.71";
registry = "registry+https://github.com/rust-lang/crates.io-index"; registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"; }; src = fetchCratesIo { inherit name version; sha256 = "79c2681d6594606957bbb8631c4b90a7fcaaa72cdb714743a437b156d6a7eedd"; };
features = builtins.concatLists [
[ "jobserver" ]
[ "parallel" ]
];
dependencies = {
jobserver = rustPackages."registry+https://github.com/rust-lang/crates.io-index".jobserver."0.1.24" { inherit profileName; };
};
}); });
"registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".cfg-if."1.0.0" = overridableMkRustCrate (profileName: rec {
@ -695,6 +702,7 @@ in
serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; }; serde_bytes = rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.5" { inherit profileName; };
sled = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sled."0.34.7" { inherit profileName; }; sled = rustPackages."registry+https://github.com/rust-lang/crates.io-index".sled."0.34.7" { inherit profileName; };
tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; }; tokio = rustPackages."registry+https://github.com/rust-lang/crates.io-index".tokio."1.12.0" { inherit profileName; };
zstd = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.0+zstd.1.5.0" { inherit profileName; };
}; };
}); });
@ -1055,6 +1063,16 @@ in
]; ];
}); });
"registry+https://github.com/rust-lang/crates.io-index".jobserver."0.1.24" = overridableMkRustCrate (profileName: rec {
name = "jobserver";
version = "0.1.24";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "af25a77299a7f711a01975c35a6a424eb6862092cc2d6c72c4ed6cbc56dfc1fa"; };
dependencies = {
${ if hostPlatform.isUnix then "libc" else null } = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" = overridableMkRustCrate (profileName: rec { "registry+https://github.com/rust-lang/crates.io-index".kuska-handshake."0.2.0" = overridableMkRustCrate (profileName: rec {
name = "kuska-handshake"; name = "kuska-handshake";
version = "0.2.0"; version = "0.2.0";
@ -2351,4 +2369,44 @@ in
]; ];
}); });
"registry+https://github.com/rust-lang/crates.io-index".zstd."0.9.0+zstd.1.5.0" = overridableMkRustCrate (profileName: rec {
name = "zstd";
version = "0.9.0+zstd.1.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "07749a5dc2cb6b36661290245e350f15ec3bbb304e493db54a1d354480522ccd"; };
dependencies = {
zstd_safe = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.1+zstd.1.5.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".zstd-safe."4.1.1+zstd.1.5.0" = overridableMkRustCrate (profileName: rec {
name = "zstd-safe";
version = "4.1.1+zstd.1.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "c91c90f2c593b003603e5e0493c837088df4469da25aafff8bce42ba48caf079"; };
features = builtins.concatLists [
[ "std" ]
];
dependencies = {
libc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; };
zstd_sys = rustPackages."registry+https://github.com/rust-lang/crates.io-index".zstd-sys."1.6.1+zstd.1.5.0" { inherit profileName; };
};
});
"registry+https://github.com/rust-lang/crates.io-index".zstd-sys."1.6.1+zstd.1.5.0" = overridableMkRustCrate (profileName: rec {
name = "zstd-sys";
version = "1.6.1+zstd.1.5.0";
registry = "registry+https://github.com/rust-lang/crates.io-index";
src = fetchCratesIo { inherit name version; sha256 = "615120c7a2431d16cf1cf979e7fc31ba7a5b5e5707b29c8a99e5dbf8a8392a33"; };
features = builtins.concatLists [
[ "std" ]
];
dependencies = {
libc = rustPackages."registry+https://github.com/rust-lang/crates.io-index".libc."0.2.103" { inherit profileName; };
};
buildDependencies = {
cc = buildRustPackages."registry+https://github.com/rust-lang/crates.io-index".cc."1.0.71" { profileName = "__noProfile"; };
};
});
} }

View file

@ -74,6 +74,8 @@ data_dir = "/var/lib/garage/data"
replication_mode = "3" replication_mode = "3"
compression_level = 2
rpc_bind_addr = "[::]:3901" rpc_bind_addr = "[::]:3901"
rpc_public_addr = "<this node's public IP>:3901" rpc_public_addr = "<this node's public IP>:3901"
rpc_secret = "<RPC secret>" rpc_secret = "<RPC secret>"

View file

@ -10,6 +10,8 @@ block_size = 1048576
replication_mode = "3" replication_mode = "3"
compression_level = 1
rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6" rpc_secret = "4425f5c26c5e11581d3223904324dcb5b5d5dfb14e5e7f35e38c595424f5f1e6"
rpc_bind_addr = "[::]:3901" rpc_bind_addr = "[::]:3901"
rpc_public_addr = "[fc00:1::1]:3901" rpc_public_addr = "[fc00:1::1]:3901"
@ -98,6 +100,30 @@ Never run a Garage cluster where that is not the case.**
Changing the `replication_mode` of a cluster might work (make sure to shut down all nodes Changing the `replication_mode` of a cluster might work (make sure to shut down all nodes
and changing it everywhere at the time), but is not officially supported. and changing it everywhere at the time), but is not officially supported.
### `compression_level`
Zstd compression level to use for storing blocks.
Values between `1` (faster compression) and `19` (smaller file) are standard compression
levels for zstd. From `20` to `22`, compression levels are referred as "ultra" and must be
used with extra care as it will use lot of memory. A value of `0` will let zstd choose a
default value (currently `3`). Finally, zstd has also compression designed to be faster
than default compression levels, they range from `-1` (smaller file) to `-99` (faster
compression).
If you do not specify a `compression_level` entry, garage will set it to `1` for you. With
this parameters, zstd consumes low amount of cpu and should work faster than line speed in
most situations, while saving some space and intra-cluster
bandwidth.
If you want to totally deactivate zstd in garage, you can pass the special value `'none'`. No
zstd related code will be called, your chunks will be stored on disk without any processing.
Compression is done synchronously, setting a value too high will add latency to write queries.
This value can be different between nodes, compression is done by the node which receive the
API call.
#### `rpc_secret` #### `rpc_secret`
Garage uses a secret key that is shared between all nodes of the cluster Garage uses a secret key that is shared between all nodes of the cluster

View file

@ -30,6 +30,11 @@ dd if=/dev/urandom of=/tmp/garage.1.rnd bs=1k count=2 # No multipart, inline sto
dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5 # No multipart but file will be chunked dd if=/dev/urandom of=/tmp/garage.2.rnd bs=1M count=5 # No multipart but file will be chunked
dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10 # by default, AWS starts using multipart at 8MB dd if=/dev/urandom of=/tmp/garage.3.rnd bs=1M count=10 # by default, AWS starts using multipart at 8MB
# data of lower entropy, to test compression
dd if=/dev/urandom bs=1k count=2 | base64 -w0 > /tmp/garage.1.b64
dd if=/dev/urandom bs=1M count=5 | base64 -w0 > /tmp/garage.2.b64
dd if=/dev/urandom bs=1M count=10 | base64 -w0 > /tmp/garage.3.b64
echo "🧪 S3 API testing..." echo "🧪 S3 API testing..."
# AWS # AWS
@ -37,11 +42,11 @@ if [ -z "$SKIP_AWS" ]; then
echo "🛠️ Testing with awscli" echo "🛠️ Testing with awscli"
source ${SCRIPT_FOLDER}/dev-env-aws.sh source ${SCRIPT_FOLDER}/dev-env-aws.sh
aws s3 ls aws s3 ls
for idx in $(seq 1 3); do for idx in {1..3}.{rnd,b64}; do
aws s3 cp "/tmp/garage.$idx.rnd" "s3://eprouvette/&+-é\"/garage.$idx.aws" aws s3 cp "/tmp/garage.$idx" "s3://eprouvette/&+-é\"/garage.$idx.aws"
aws s3 ls s3://eprouvette aws s3 ls s3://eprouvette
aws s3 cp "s3://eprouvette/&+-é\"/garage.$idx.aws" "/tmp/garage.$idx.dl" aws s3 cp "s3://eprouvette/&+-é\"/garage.$idx.aws" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl
aws s3 rm "s3://eprouvette/&+-é\"/garage.$idx.aws" aws s3 rm "s3://eprouvette/&+-é\"/garage.$idx.aws"
done done
@ -52,11 +57,11 @@ if [ -z "$SKIP_S3CMD" ]; then
echo "🛠️ Testing with s3cmd" echo "🛠️ Testing with s3cmd"
source ${SCRIPT_FOLDER}/dev-env-s3cmd.sh source ${SCRIPT_FOLDER}/dev-env-s3cmd.sh
s3cmd ls s3cmd ls
for idx in $(seq 1 3); do for idx in {1..3}.{rnd,b64}; do
s3cmd put "/tmp/garage.$idx.rnd" "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" s3cmd put "/tmp/garage.$idx" "s3://eprouvette/&+-é\"/garage.$idx.s3cmd"
s3cmd ls s3://eprouvette s3cmd ls s3://eprouvette
s3cmd get "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" "/tmp/garage.$idx.dl" s3cmd get "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl
s3cmd rm "s3://eprouvette/&+-é\"/garage.$idx.s3cmd" s3cmd rm "s3://eprouvette/&+-é\"/garage.$idx.s3cmd"
done done
@ -67,11 +72,11 @@ if [ -z "$SKIP_MC" ]; then
echo "🛠️ Testing with mc (minio client)" echo "🛠️ Testing with mc (minio client)"
source ${SCRIPT_FOLDER}/dev-env-mc.sh source ${SCRIPT_FOLDER}/dev-env-mc.sh
mc ls garage/ mc ls garage/
for idx in $(seq 1 3); do for idx in {1..3}.{rnd,b64}; do
mc cp "/tmp/garage.$idx.rnd" "garage/eprouvette/&+-é\"/garage.$idx.mc" mc cp "/tmp/garage.$idx" "garage/eprouvette/&+-é\"/garage.$idx.mc"
mc ls garage/eprouvette mc ls garage/eprouvette
mc cp "garage/eprouvette/&+-é\"/garage.$idx.mc" "/tmp/garage.$idx.dl" mc cp "garage/eprouvette/&+-é\"/garage.$idx.mc" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl
mc rm "garage/eprouvette/&+-é\"/garage.$idx.mc" mc rm "garage/eprouvette/&+-é\"/garage.$idx.mc"
done done
@ -82,13 +87,13 @@ if [ -z "$SKIP_RCLONE" ]; then
echo "🛠️ Testing with rclone" echo "🛠️ Testing with rclone"
source ${SCRIPT_FOLDER}/dev-env-rclone.sh source ${SCRIPT_FOLDER}/dev-env-rclone.sh
rclone lsd garage: rclone lsd garage:
for idx in $(seq 1 3); do for idx in {1..3}.{rnd,b64}; do
cp /tmp/garage.$idx.rnd /tmp/garage.$idx.dl cp /tmp/garage.$idx /tmp/garage.$idx.dl
rclone copy "/tmp/garage.$idx.dl" "garage:eprouvette/&+-é\"/" rclone copy "/tmp/garage.$idx.dl" "garage:eprouvette/&+-é\"/"
rm /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl
rclone ls garage:eprouvette rclone ls garage:eprouvette
rclone copy "garage:eprouvette/&+-é\"/garage.$idx.dl" "/tmp/" rclone copy "garage:eprouvette/&+-é\"/garage.$idx.dl" "/tmp/"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl
rclone delete "garage:eprouvette/&+-é\"/garage.$idx.dl" rclone delete "garage:eprouvette/&+-é\"/garage.$idx.dl"
done done
@ -100,17 +105,17 @@ if [ -z "$SKIP_DUCK" ]; then
source ${SCRIPT_FOLDER}/dev-env-duck.sh source ${SCRIPT_FOLDER}/dev-env-duck.sh
duck --list garage:/ duck --list garage:/
duck --mkdir "garage:/eprouvette/duck" duck --mkdir "garage:/eprouvette/duck"
for idx in $(seq 1 3); do for idx in {1..3}.{rnd,b64}; do
duck --verbose --upload "garage:/eprouvette/duck/" "/tmp/garage.$idx.rnd" duck --verbose --upload "garage:/eprouvette/duck/" "/tmp/garage.$idx"
duck --list garage:/eprouvette/duck/ duck --list garage:/eprouvette/duck/
duck --download "garage:/eprouvette/duck/garage.$idx.rnd" "/tmp/garage.$idx.dl" duck --download "garage:/eprouvette/duck/garage.$idx" "/tmp/garage.$idx.dl"
diff /tmp/garage.$idx.rnd /tmp/garage.$idx.dl diff /tmp/garage.$idx /tmp/garage.$idx.dl
rm /tmp/garage.$idx.dl rm /tmp/garage.$idx.dl
duck --delete "garage:/eprouvette/duck/garage.$idx.dk" duck --delete "garage:/eprouvette/duck/garage.$idx.dk"
done done
fi fi
rm /tmp/garage.{1,2,3}.rnd rm /tmp/garage.{1..3}.{rnd,b64}
if [ -z "$SKIP_AWS" ]; then if [ -z "$SKIP_AWS" ]; then
echo "🧪 Website Testing" echo "🧪 Website Testing"

View file

@ -23,6 +23,7 @@ arc-swap = "1.0"
hex = "0.4" hex = "0.4"
log = "0.4" log = "0.4"
rand = "0.8" rand = "0.8"
zstd = { version = "0.9", default-features = false }
sled = "0.34" sled = "0.34"

View file

@ -11,6 +11,7 @@ use serde::{Deserialize, Serialize};
use tokio::fs; use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt}; use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio::sync::{watch, Mutex, Notify}; use tokio::sync::{watch, Mutex, Notify};
use zstd::stream::{decode_all as zstd_decode, Encoder};
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
@ -55,22 +56,81 @@ pub enum BlockRpc {
GetBlock(Hash), GetBlock(Hash),
/// Message to send a block of data, either because requested, of for first delivery of new /// Message to send a block of data, either because requested, of for first delivery of new
/// block /// block
PutBlock(PutBlockMessage), PutBlock {
hash: Hash,
data: DataBlock,
},
/// Ask other node if they should have this block, but don't actually have it /// Ask other node if they should have this block, but don't actually have it
NeedBlockQuery(Hash), NeedBlockQuery(Hash),
/// Response : whether the node do require that block /// Response : whether the node do require that block
NeedBlockReply(bool), NeedBlockReply(bool),
} }
/// Structure used to send a block /// A possibly compressed block of data
#[derive(Debug, Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct PutBlockMessage { pub enum DataBlock {
/// Hash of the block /// Uncompressed data
pub hash: Hash, Plain(#[serde(with = "serde_bytes")] Vec<u8>),
/// Data compressed with zstd
Compressed(#[serde(with = "serde_bytes")] Vec<u8>),
}
/// Content of the block impl DataBlock {
#[serde(with = "serde_bytes")] /// Query whether this block is compressed
pub data: Vec<u8>, pub fn is_compressed(&self) -> bool {
matches!(self, DataBlock::Compressed(_))
}
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
/// instead
pub fn inner_buffer(&self) -> &[u8] {
use DataBlock::*;
let (Plain(ref res) | Compressed(ref res)) = self;
res
}
/// Get the buffer, possibly decompressing it, and verify it's integrity.
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
/// is used instead.
pub fn verify_get(self, hash: Hash) -> Result<Vec<u8>, Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(&data) == hash {
Ok(data)
} else {
Err(Error::CorruptData(hash))
}
}
DataBlock::Compressed(data) => {
zstd_decode(&data[..]).map_err(|_| Error::CorruptData(hash))
}
}
}
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
/// does not return the buffer content.
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
match self {
DataBlock::Plain(data) => {
if blake2sum(data) == hash {
Ok(())
} else {
Err(Error::CorruptData(hash))
}
}
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
.map_err(|_| Error::CorruptData(hash)),
}
}
pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
if let Some(level) = level {
if let Ok(data) = zstd_encode(&data[..], level) {
return DataBlock::Compressed(data);
}
}
DataBlock::Plain(data)
}
} }
impl Rpc for BlockRpc { impl Rpc for BlockRpc {
@ -138,10 +198,8 @@ impl BlockManager {
block_manager block_manager
} }
// ---- Public interface ---- /// Ask nodes that might have a (possibly compressed) block for it
async fn rpc_get_raw_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
let who = self.replication.read_nodes(hash); let who = self.replication.read_nodes(hash);
let resps = self let resps = self
.system .system
@ -158,8 +216,8 @@ impl BlockManager {
.await?; .await?;
for resp in resps { for resp in resps {
if let BlockRpc::PutBlock(msg) = resp { if let BlockRpc::PutBlock { data, .. } = resp {
return Ok(msg.data); return Ok(data);
} }
} }
Err(Error::Message(format!( Err(Error::Message(format!(
@ -168,15 +226,30 @@ impl BlockManager {
))) )))
} }
// ---- Public interface ----
/// Ask nodes that might have a block for it
pub async fn rpc_get_block(&self, hash: &Hash) -> Result<Vec<u8>, Error> {
self.rpc_get_raw_block(hash).await?.verify_get(*hash)
}
/// Send block to nodes that should have it /// Send block to nodes that should have it
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> { pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
let who = self.replication.write_nodes(&hash); let who = self.replication.write_nodes(&hash);
let compression_level = self
.garage
.load()
.as_ref()
.unwrap()
.config
.compression_level;
let data = DataBlock::from_buffer(data, compression_level);
self.system self.system
.rpc .rpc
.try_call_many( .try_call_many(
&self.endpoint, &self.endpoint,
&who[..], &who[..],
BlockRpc::PutBlock(PutBlockMessage { hash, data }), BlockRpc::PutBlock { hash, data },
RequestStrategy::with_priority(PRIO_NORMAL) RequestStrategy::with_priority(PRIO_NORMAL)
.with_quorum(self.replication.write_quorum()) .with_quorum(self.replication.write_quorum())
.with_timeout(BLOCK_RW_TIMEOUT), .with_timeout(BLOCK_RW_TIMEOUT),
@ -306,7 +379,7 @@ impl BlockManager {
// ---- Reading and writing blocks locally ---- // ---- Reading and writing blocks locally ----
/// Write a block to disk /// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &[u8]) -> Result<BlockRpc, Error> { async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
self.mutation_lock self.mutation_lock
.lock() .lock()
.await .await
@ -316,21 +389,31 @@ impl BlockManager {
/// Read block from disk, verifying it's integrity /// Read block from disk, verifying it's integrity
async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> { async fn read_block(&self, hash: &Hash) -> Result<BlockRpc, Error> {
let path = self.block_path(hash); let mut path = self.block_path(hash);
let compressed = match self.is_block_compressed(hash).await {
let mut f = match fs::File::open(&path).await { Ok(c) => c,
Ok(f) => f,
Err(e) => { Err(e) => {
// Not found but maybe we should have had it ?? // Not found but maybe we should have had it ??
self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?; self.put_to_resync(hash, 2 * BLOCK_RW_TIMEOUT)?;
return Err(Into::into(e)); return Err(Into::into(e));
} }
}; };
if compressed {
path.set_extension("zst");
}
let mut f = fs::File::open(&path).await?;
let mut data = vec![]; let mut data = vec![];
f.read_to_end(&mut data).await?; f.read_to_end(&mut data).await?;
drop(f); drop(f);
if blake2sum(&data[..]) != *hash { let data = if compressed {
DataBlock::Compressed(data)
} else {
DataBlock::Plain(data)
};
if data.verify(*hash).is_err() {
self.mutation_lock self.mutation_lock
.lock() .lock()
.await .await
@ -340,7 +423,7 @@ impl BlockManager {
return Err(Error::CorruptData(*hash)); return Err(Error::CorruptData(*hash));
} }
Ok(BlockRpc::PutBlock(PutBlockMessage { hash: *hash, data })) Ok(BlockRpc::PutBlock { hash: *hash, data })
} }
/// Check if this node should have a block, but don't actually have it /// Check if this node should have a block, but don't actually have it
@ -362,13 +445,25 @@ impl BlockManager {
path path
} }
/// Utility: give the full path where a block should be found /// Utility: give the full path where a block should be found, minus extension if block is
/// compressed
fn block_path(&self, hash: &Hash) -> PathBuf { fn block_path(&self, hash: &Hash) -> PathBuf {
let mut path = self.block_dir(hash); let mut path = self.block_dir(hash);
path.push(hex::encode(hash.as_ref())); path.push(hex::encode(hash.as_ref()));
path path
} }
/// Utility: check if block is stored compressed. Error if block is not stored
async fn is_block_compressed(&self, hash: &Hash) -> Result<bool, Error> {
let mut path = self.block_path(hash);
path.set_extension("zst");
if fs::metadata(&path).await.is_ok() {
return Ok(true);
}
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
// ---- Resync loop ---- // ---- Resync loop ----
pub fn spawn_background_worker(self: Arc<Self>) { pub fn spawn_background_worker(self: Arc<Self>) {
@ -550,8 +645,8 @@ impl BlockManager {
hash hash
); );
let block_data = self.rpc_get_block(hash).await?; let block_data = self.rpc_get_raw_block(hash).await?;
self.write_block(hash, &block_data[..]).await?; self.write_block(hash, &block_data).await?;
} }
Ok(()) Ok(())
@ -602,6 +697,7 @@ impl BlockManager {
}; };
let ent_type = data_dir_ent.file_type().await?; let ent_type = data_dir_ent.file_type().await?;
let name = name.strip_suffix(".zst").unwrap_or(&name);
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
state = self state = self
.for_each_file_rec(&data_dir_ent.path(), state, f, must_exit) .for_each_file_rec(&data_dir_ent.path(), state, f, must_exit)
@ -631,7 +727,7 @@ impl EndpointHandler<BlockRpc> for BlockManager {
_from: NodeID, _from: NodeID,
) -> Result<BlockRpc, Error> { ) -> Result<BlockRpc, Error> {
match message { match message {
BlockRpc::PutBlock(m) => self.write_block(&m.hash, &m.data).await, BlockRpc::PutBlock { hash, data } => self.write_block(hash, data).await,
BlockRpc::GetBlock(h) => self.read_block(h).await, BlockRpc::GetBlock(h) => self.read_block(h).await,
BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply), BlockRpc::NeedBlockQuery(h) => self.need_block(h).await.map(BlockRpc::NeedBlockReply),
_ => Err(Error::BadRpc("Unexpected RPC message".to_string())), _ => Err(Error::BadRpc("Unexpected RPC message".to_string())),
@ -650,9 +746,7 @@ impl BlockManagerLocked {
hash: &Hash, hash: &Hash,
mgr: &BlockManager, mgr: &BlockManager,
) -> Result<BlockStatus, Error> { ) -> Result<BlockStatus, Error> {
let path = mgr.block_path(hash); let exists = mgr.is_block_compressed(hash).await.is_ok();
let exists = fs::metadata(&path).await.is_ok();
let needed = mgr.get_block_rc(hash)?; let needed = mgr.get_block_rc(hash)?;
Ok(BlockStatus { exists, needed }) Ok(BlockStatus { exists, needed })
@ -661,16 +755,31 @@ impl BlockManagerLocked {
async fn write_block( async fn write_block(
&self, &self,
hash: &Hash, hash: &Hash,
data: &[u8], data: &DataBlock,
mgr: &BlockManager, mgr: &BlockManager,
) -> Result<BlockRpc, Error> { ) -> Result<BlockRpc, Error> {
let compressed = data.is_compressed();
let data = data.inner_buffer();
let mut path = mgr.block_dir(hash); let mut path = mgr.block_dir(hash);
fs::create_dir_all(&path).await?; fs::create_dir_all(&path).await?;
path.push(hex::encode(hash)); path.push(hex::encode(hash));
if fs::metadata(&path).await.is_ok() { let to_delete = match (mgr.is_block_compressed(hash).await, compressed) {
return Ok(BlockRpc::Ok); (Ok(true), _) => return Ok(BlockRpc::Ok),
(Ok(false), false) => return Ok(BlockRpc::Ok),
(Ok(false), true) => {
let path_to_delete = path.clone();
path.set_extension("zst");
Some(path_to_delete)
} }
(Err(_), compressed) => {
if compressed {
path.set_extension("zst");
}
None
}
};
let mut path2 = path.clone(); let mut path2 = path.clone();
path2.set_extension("tmp"); path2.set_extension("tmp");
@ -679,6 +788,9 @@ impl BlockManagerLocked {
drop(f); drop(f);
fs::rename(path2, path).await?; fs::rename(path2, path).await?;
if let Some(to_delete) = to_delete {
fs::remove_file(to_delete).await?;
}
Ok(BlockRpc::Ok) Ok(BlockRpc::Ok)
} }
@ -688,9 +800,14 @@ impl BlockManagerLocked {
"Block {:?} is corrupted. Renaming to .corrupted and resyncing.", "Block {:?} is corrupted. Renaming to .corrupted and resyncing.",
hash hash
); );
let path = mgr.block_path(hash); let mut path = mgr.block_path(hash);
let mut path2 = path.clone(); let mut path2 = path.clone();
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
path2.set_extension("zst.corrupted");
} else {
path2.set_extension("corrupted"); path2.set_extension("corrupted");
}
fs::rename(path, path2).await?; fs::rename(path, path2).await?;
Ok(()) Ok(())
} }
@ -699,7 +816,10 @@ impl BlockManagerLocked {
let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?; let BlockStatus { exists, needed } = self.check_block_status(hash, mgr).await?;
if exists && needed.is_deletable() { if exists && needed.is_deletable() {
let path = mgr.block_path(hash); let mut path = mgr.block_path(hash);
if mgr.is_block_compressed(hash).await? {
path.set_extension("zst");
}
fs::remove_file(path).await?; fs::remove_file(path).await?;
} }
Ok(()) Ok(())
@ -806,3 +926,12 @@ impl RcEntry {
!self.is_deletable() !self.is_deletable()
} }
} }
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
let mut result = Vec::<u8>::new();
let mut encoder = Encoder::new(&mut result, level)?;
encoder.include_checksum(true)?;
std::io::copy(&mut source, &mut encoder)?;
encoder.finish()?;
Ok(result)
}

View file

@ -30,6 +30,13 @@ pub struct Config {
// (we can add more aliases for this later) // (we can add more aliases for this later)
pub replication_mode: String, pub replication_mode: String,
/// Zstd compression level used on data blocks
#[serde(
deserialize_with = "deserialize_compression",
default = "default_compression"
)]
pub compression_level: Option<i32>,
/// RPC secret key: 32 bytes hex encoded /// RPC secret key: 32 bytes hex encoded
pub rpc_secret: String, pub rpc_secret: String,
@ -123,3 +130,57 @@ where
Ok(ret) Ok(ret)
} }
fn default_compression() -> Option<i32> {
Some(1)
}
fn deserialize_compression<'de, D>(deserializer: D) -> Result<Option<i32>, D::Error>
where
D: de::Deserializer<'de>,
{
use std::convert::TryFrom;
struct OptionVisitor;
impl<'de> serde::de::Visitor<'de> for OptionVisitor {
type Value = Option<i32>;
fn expecting(&self, formatter: &mut std::fmt::Formatter) -> std::fmt::Result {
formatter.write_str("int or 'none'")
}
fn visit_str<E>(self, value: &str) -> Result<Self::Value, E>
where
E: de::Error,
{
if value.eq_ignore_ascii_case("none") {
Ok(None)
} else {
Err(E::custom(format!(
"Invalid compression level: '{}', should be a number, or 'none'",
value
)))
}
}
fn visit_i64<E>(self, v: i64) -> Result<Self::Value, E>
where
E: de::Error,
{
i32::try_from(v)
.map(Some)
.map_err(|_| E::custom("Compression level out of bound".to_owned()))
}
fn visit_u64<E>(self, v: u64) -> Result<Self::Value, E>
where
E: de::Error,
{
i32::try_from(v)
.map(Some)
.map_err(|_| E::custom("Compression level out of bound".to_owned()))
}
}
deserializer.deserialize_any(OptionVisitor)
}