Small improvements to compression code
Some checks failed
continuous-integration/drone/push: build is failing
parent 307858bf0a
commit 163ee977a0

3 changed files with 24 additions and 10 deletions

@@ -35,6 +35,9 @@ data_replication_factor = 3
 meta_replication_factor = 3
 meta_epidemic_fanout = 3
 
+enable_compression = true
+compression_level = 10
+
 [s3_api]
 api_bind_addr = "0.0.0.0:$((3910+$count))" # the S3 API port, HTTP without TLS. Add a reverse proxy for the TLS part.
 s3_region = "garage" # set this to anything. S3 API calls will fail if they are not made against the region set here.
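The dev-cluster config now turns compression on and asks for zstd level 10, a fairly aggressive setting. As a rough illustration of what the level controls (this sketch is not part of the commit, and it calls the `zstd` crate directly rather than Garage's `zstd_encode` helper), the following compares output sizes at a few levels:

```rust
// Illustrative only: compare zstd output sizes at a few compression levels.
// Assumes the `zstd` crate; the sample data and levels are arbitrary.
fn main() -> std::io::Result<()> {
    // Repetitive data compresses very well; real block data will vary.
    let data: Vec<u8> = b"garage block payload ".repeat(50_000);
    for level in [1, 3, 10] {
        let out = zstd::encode_all(&data[..], level)?;
        println!("level {:>2}: {} -> {} bytes", level, data.len(), out.len());
    }
    Ok(())
}
```

Higher levels trade CPU time for a better ratio; level 10 is noticeably slower than zstd's default level 3 but compresses tighter, which is a reasonable trade for a test cluster.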
@@ -196,12 +196,12 @@ impl BlockManager {
 
 		let mut f = fs::File::create(path.clone()).await?;
 		f.write_all(&buffer).await?;
+		drop(f);
 
 		if clean_plain {
 			path.set_extension("");
 			fs::remove_file(path).await?;
 		}
-		drop(f);
 
 		Ok(Message::Ok)
 	}
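The only change in this hunk is ordering: the handle to the freshly written compressed file is now dropped before the plain copy is removed, instead of afterwards. A minimal sketch of the resulting sequence, under assumed names (`path`, `buffer`, `clean_plain` come from the diff; the function wrapper is illustrative, not Garage's API):

```rust
use std::path::PathBuf;
use tokio::io::AsyncWriteExt;

// Sketch of the write-then-clean-up sequence after this change; only the
// statement order mirrors the diff.
async fn write_compressed_then_clean(
    mut path: PathBuf,
    buffer: Vec<u8>,
    clean_plain: bool,
) -> std::io::Result<()> {
    let mut f = tokio::fs::File::create(path.clone()).await?;
    f.write_all(&buffer).await?;
    // Release the handle to the new file before touching the old one.
    drop(f);

    if clean_plain {
        // The plain copy lives at the same path without the extension.
        path.set_extension("");
        tokio::fs::remove_file(path).await?;
    }
    Ok(())
}
```

Note that dropping a `tokio::fs::File` does not by itself guarantee the bytes have reached the disk; if that mattered here, an explicit `f.sync_all().await?` before the removal would be the stronger ordering. The commit only moves the existing `drop`.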
@@ -516,17 +516,25 @@ impl BlockManager {
 	/// Send block to nodes that should have it
 	pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
 		let garage = self.garage.load_full().unwrap();
-		let compressed = zstd_encode(&data[..], garage.config.compression_level);
-		let message = if compressed.is_ok() && compressed.as_ref().unwrap().len() < data.len() {
-			Message::PutBlock {
-				hash,
-				data: BlockData::Compressed(compressed.unwrap()),
-			}
+		let compressed = if garage.config.enable_compression {
+			zstd_encode(&data[..], garage.config.compression_level).ok()
 		} else {
-			Message::PutBlock {
-				hash,
-				data: BlockData::Plain(data),
-			}
+			None
+		};
+
+		// If compressed data is not less than 7/8 of the size of the original data, i.e. if we
+		// don't gain a significant margin by compressing, then we store the plain data instead
+		// so that we don't lose time decompressing it on reads.
+		let block_data = if compressed.is_some() && compressed.as_ref().unwrap().len() < (data.len() * 7) / 8 {
+			BlockData::Compressed(compressed.unwrap())
+		} else {
+			BlockData::Plain(data)
+		};
+
+		let message = Message::PutBlock {
+			hash,
+			data: block_data,
 		};
 		let who = self.replication.write_nodes(&hash);
 		self.rpc_client
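The new logic compresses only when `enable_compression` is set, and then keeps the compressed form only if it comes out smaller than 7/8 of the original, so marginal gains do not cost a decompression on every read. A standalone sketch of that decision (the `BlockData` variants and the threshold follow the diff; the direct `zstd::encode_all` call stands in for Garage's `zstd_encode` helper, so treat it as an assumption):

```rust
// Standalone approximation of the compress-or-store-plain decision.
enum BlockData {
    Plain(Vec<u8>),
    Compressed(Vec<u8>),
}

fn encode_block(data: Vec<u8>, enable_compression: bool, level: i32) -> BlockData {
    // Only attempt compression when it is enabled; on encoder errors fall
    // back to the plain bytes instead of failing the put.
    let compressed = if enable_compression {
        zstd::encode_all(&data[..], level).ok()
    } else {
        None
    };

    // Keep the compressed form only when it saves at least 1/8 of the size;
    // otherwise store plain data and avoid decompressing on reads.
    match compressed {
        Some(c) if c.len() < (data.len() * 7) / 8 => BlockData::Compressed(c),
        _ => BlockData::Plain(data),
    }
}
```

A `match` guard avoids the `is_some()` plus `unwrap()` pair used in the diff, but the behaviour is the same.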
@@ -45,6 +45,9 @@ pub struct Config {
 	#[serde(default = "default_replication_factor")]
 	pub data_replication_factor: usize,
 
+	/// Enable Zstd compression of block data
+	pub enable_compression: bool,
+
 	/// Zstd compression level used on data blocks
 	#[serde(default)]
 	pub compression_level: i32,
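For reference, this is how the two new fields behave when the config is deserialized: `enable_compression` has no serde default, so it must appear in the config file, while `compression_level` falls back to 0, which the zstd library treats as "use the default level". A hedged sketch with a trimmed-down `Config` (the real struct has many more fields; the sample TOML strings are illustrative):

```rust
use serde::Deserialize;

// Trimmed-down Config for illustration; field names and attributes match the
// diff, everything else is omitted.
#[derive(Debug, Deserialize)]
struct Config {
    /// Enable Zstd compression of block data
    enable_compression: bool,

    /// Zstd compression level used on data blocks
    #[serde(default)]
    compression_level: i32,
}

fn main() -> Result<(), toml::de::Error> {
    // compression_level is optional and defaults to 0 (zstd's default level).
    let a: Config = toml::from_str("enable_compression = true")?;
    assert_eq!(a.compression_level, 0);

    // The dev-cluster settings from the first hunk parse as expected.
    let b: Config = toml::from_str("enable_compression = true\ncompression_level = 10")?;
    assert_eq!(b.compression_level, 10);

    println!("{:?}\n{:?}", a, b);
    Ok(())
}
```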