From 1b2e1296eb99630e969e585ede0424072adc2d0c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Mon, 18 Jul 2022 17:18:47 +0200 Subject: [PATCH] Compute hashes on dedicated threads --- Cargo.lock | 95 +++++++++++++++++++++++++++++++++------- src/api/Cargo.toml | 8 ++-- src/api/s3/copy.rs | 5 ++- src/api/s3/put.rs | 32 +++++++++----- src/api/signature/mod.rs | 14 +++--- src/block/block.rs | 17 ++++--- src/block/manager.rs | 6 ++- src/garage/Cargo.toml | 2 +- src/util/Cargo.toml | 4 +- src/util/async_hash.rs | 55 +++++++++++++++++++++++ src/util/lib.rs | 1 + 11 files changed, 188 insertions(+), 51 deletions(-) create mode 100644 src/util/async_hash.rs diff --git a/Cargo.lock b/Cargo.lock index e1ccfc2d..260ae907 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -343,7 +343,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174" dependencies = [ "crypto-mac 0.8.0", - "digest", + "digest 0.9.0", "opaque-debug", ] @@ -356,6 +356,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "block-buffer" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324" +dependencies = [ + "generic-array", +] + [[package]] name = "bumpalo" version = "3.9.1" @@ -589,6 +598,16 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "crypto-common" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" +dependencies = [ + "generic-array", + "typenum", +] + [[package]] name = "crypto-mac" version = "0.8.0" @@ -693,6 +712,17 @@ dependencies = [ "generic-array", ] +[[package]] +name = "digest" +version = "0.10.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506" +dependencies = [ + "block-buffer 0.10.2", + "crypto-common", + "subtle", +] + [[package]] name = "dirs-next" version = "2.0.0" @@ -991,7 +1021,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "sha2", + "sha2 0.10.2", "static_init", "structopt", "timeago", @@ -1008,7 +1038,7 @@ dependencies = [ "base64", "bytes 1.1.0", "chrono", - "crypto-mac 0.10.1", + "crypto-common", "err-derive 0.3.1", "form_urlencoded", "futures", @@ -1019,13 +1049,13 @@ dependencies = [ "garage_table 0.7.0", "garage_util 0.7.0", "hex", - "hmac 0.10.1", + "hmac 0.12.1", "http", "http-range", "httpdate 0.3.2", "hyper", "idna", - "md-5", + "md-5 0.10.1", "multer", "nom", "opentelemetry", @@ -1039,7 +1069,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "sha2", + "sha2 0.10.2", "tokio", "tracing", "url", @@ -1259,7 +1289,7 @@ dependencies = [ "rmp-serde 0.15.5", "serde", "serde_json", - "sha2", + "sha2 0.9.9", "sled", "tokio", "toml", @@ -1272,7 +1302,9 @@ version = "0.7.0" dependencies = [ "async-trait", "blake2", + "bytes 1.1.0", "chrono", + "digest 0.10.3", "err-derive 0.3.1", "futures", "garage_db", @@ -1285,7 +1317,7 @@ dependencies = [ "rmp-serde 0.15.5", "serde", "serde_json", - "sha2", + "sha2 0.10.2", "tokio", "toml", "tracing", @@ -1485,7 +1517,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" dependencies = [ "crypto-mac 0.10.1", - "digest", + "digest 0.9.0", ] [[package]] @@ -1495,7 +1527,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b" dependencies = [ "crypto-mac 0.11.1", - "digest", + "digest 0.9.0", +] + +[[package]] +name = "hmac" +version = "0.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" +dependencies = [ + "digest 0.10.3", ] [[package]] @@ -1975,11 +2016,20 @@ version = "0.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" dependencies = [ - "block-buffer", - "digest", + "block-buffer 0.9.0", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "md-5" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "658646b21e0b72f7866c7038ab086d3d5e1cd6271f060fd37defb241949d0582" +dependencies = [ + "digest 0.10.3", +] + [[package]] name = "md5" version = "0.7.0" @@ -3000,20 +3050,20 @@ dependencies = [ "base64", "bytes 1.1.0", "chrono", - "digest", + "digest 0.9.0", "futures", "hex", "hmac 0.11.0", "http", "hyper", "log", - "md-5", + "md-5 0.9.1", "percent-encoding", "pin-project-lite", "rusoto_credential", "rustc_version", "serde", - "sha2", + "sha2 0.9.9", "tokio", ] @@ -3246,13 +3296,24 @@ version = "0.9.9" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" dependencies = [ - "block-buffer", + "block-buffer 0.9.0", "cfg-if 1.0.0", "cpufeatures", - "digest", + "digest 0.9.0", "opaque-debug", ] +[[package]] +name = "sha2" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676" +dependencies = [ + "cfg-if 1.0.0", + "cpufeatures", + "digest 0.10.3", +] + [[package]] name = "shlex" version = "1.1.0" diff --git a/src/api/Cargo.toml b/src/api/Cargo.toml index db77cf38..901cb959 100644 --- a/src/api/Cargo.toml +++ b/src/api/Cargo.toml @@ -24,15 +24,15 @@ async-trait = "0.1.7" base64 = "0.13" bytes = "1.0" chrono = "0.4" -crypto-mac = "0.10" +crypto-common = "0.1" err-derive = "0.3" hex = "0.4" -hmac = "0.10" +hmac = "0.12" idna = "0.2" tracing = "0.1.30" -md-5 = "0.9" +md-5 = "0.10" nom = "7.1" -sha2 = "0.9" +sha2 = "0.10" futures = "0.3" futures-util = "0.3" diff --git a/src/api/s3/copy.rs b/src/api/s3/copy.rs index 0fc16993..4415a037 100644 --- a/src/api/s3/copy.rs +++ b/src/api/s3/copy.rs @@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy( // we need to insert that data as a new block. async move { if must_upload { - garage2.block_manager.rpc_put_block(final_hash, data).await + garage2 + .block_manager + .rpc_put_block(final_hash, data.into()) + .await } else { Ok(()) } diff --git a/src/api/s3/put.rs b/src/api/s3/put.rs index 9ef37421..fbfa6f0d 100644 --- a/src/api/s3/put.rs +++ b/src/api/s3/put.rs @@ -9,6 +9,7 @@ use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use sha2::Sha256; use garage_table::*; +use garage_util::async_hash::*; use garage_util::data::*; use garage_util::error::Error as GarageError; use garage_util::time::*; @@ -130,7 +131,8 @@ pub(crate) async fn save_stream> + Unpin>( garage.version_table.insert(&version).await?; // Transfer data and verify checksum - let first_block_hash = blake2sum(&first_block[..]); + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; let tx_result = (|| async { let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks( @@ -273,14 +275,16 @@ async fn read_and_put_blocks> + Unpin>( garage: &Garage, version: &Version, part_number: u64, - first_block: Vec, + first_block: Bytes, first_block_hash: Hash, chunker: &mut StreamChunker, ) -> Result<(u64, GenericArray, Hash), Error> { - let mut md5hasher = Md5::new(); - let mut sha256hasher = Sha256::new(); - md5hasher.update(&first_block[..]); - sha256hasher.update(&first_block[..]); + let first_block = Bytes::from(first_block); + + let md5hasher = AsyncHasher::::new(); + let sha256hasher = AsyncHasher::::new(); + md5hasher.update(first_block.clone()); + sha256hasher.update(first_block.clone()); let mut next_offset = first_block.len(); let mut put_curr_version_block = put_block_meta( @@ -302,9 +306,10 @@ async fn read_and_put_blocks> + Unpin>( chunker.next(), )?; if let Some(block) = next_block { - md5hasher.update(&block[..]); - sha256hasher.update(&block[..]); - let block_hash = blake2sum(&block[..]); + let block = Bytes::from(block); + md5hasher.update(block.clone()); + sha256hasher.update(block.clone()); + let block_hash = async_blake2sum(block.clone()).await; let block_len = block.len(); put_curr_version_block = put_block_meta( garage, @@ -322,9 +327,9 @@ async fn read_and_put_blocks> + Unpin>( } let total_size = next_offset as u64; - let data_md5sum = md5hasher.finalize(); + let data_md5sum = md5hasher.finalize().await; - let data_sha256sum = sha256hasher.finalize(); + let data_sha256sum = sha256hasher.finalize().await; let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap(); Ok((total_size, data_md5sum, data_sha256sum)) @@ -504,7 +509,10 @@ pub async fn handle_put_part( // Copy block to store let version = Version::new(version_uuid, bucket_id, key, false); - let first_block_hash = blake2sum(&first_block[..]); + + let first_block = Bytes::from(first_block); + let first_block_hash = async_blake2sum(first_block.clone()).await; + let (_, data_md5sum, data_sha256sum) = read_and_put_blocks( &garage, &version, diff --git a/src/api/signature/mod.rs b/src/api/signature/mod.rs index dd5b590c..4b8b990f 100644 --- a/src/api/signature/mod.rs +++ b/src/api/signature/mod.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use hmac::{Hmac, Mac, NewMac}; +use hmac::{Hmac, Mac}; use sha2::Sha256; use garage_util::data::{sha256sum, Hash}; @@ -29,17 +29,17 @@ pub fn signing_hmac( secret_key: &str, region: &str, service: &str, -) -> Result { +) -> Result { let secret = String::from("AWS4") + secret_key; - let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?; + let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?; date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes()); - let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?; + let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?; region_hmac.update(region.as_bytes()); - let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?; + let mut service_hmac = HmacSha256::new_from_slice(®ion_hmac.finalize().into_bytes())?; service_hmac.update(service.as_bytes()); - let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?; + let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?; signing_hmac.update(b"aws4_request"); - let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?; + let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?; Ok(hmac) } diff --git a/src/block/block.rs b/src/block/block.rs index 4d3fbcb8..f17bd2c0 100644 --- a/src/block/block.rs +++ b/src/block/block.rs @@ -1,3 +1,4 @@ +use bytes::Bytes; use serde::{Deserialize, Serialize}; use zstd::stream::{decode_all as zstd_decode, Encoder}; @@ -61,13 +62,17 @@ impl DataBlock { } } - pub fn from_buffer(data: Vec, level: Option) -> DataBlock { - if let Some(level) = level { - if let Ok(data) = zstd_encode(&data[..], level) { - return DataBlock::Compressed(data); + pub async fn from_buffer(data: Bytes, level: Option) -> DataBlock { + tokio::task::spawn_blocking(move || { + if let Some(level) = level { + if let Ok(data) = zstd_encode(&data[..], level) { + return DataBlock::Compressed(data); + } } - } - DataBlock::Plain(data) + DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here + }) + .await + .unwrap() } } diff --git a/src/block/manager.rs b/src/block/manager.rs index 017ba9da..890c247d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -5,6 +5,7 @@ use std::time::Duration; use arc_swap::ArcSwapOption; use async_trait::async_trait; +use bytes::Bytes; use serde::{Deserialize, Serialize}; use futures::future::*; @@ -211,14 +212,15 @@ impl BlockManager { } /// Send block to nodes that should have it - pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { + pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { let who = self.replication.write_nodes(&hash); - let data = DataBlock::from_buffer(data, self.compression_level); + let data = DataBlock::from_buffer(data, self.compression_level).await; self.system .rpc .try_call_many( &self.endpoint, &who[..], + // TODO: remove to_vec() here BlockRpc::PutBlock { hash, data }, RequestStrategy::with_priority(PRIO_NORMAL) .with_quorum(self.replication.write_quorum()) diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 8948e750..80802a16 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -65,7 +65,7 @@ chrono = "0.4" http = "0.2" hmac = "0.10" hyper = { version = "0.14", features = ["client", "http1", "runtime"] } -sha2 = "0.9" +sha2 = "0.10" static_init = "1.0" assert-json-diff = "2.0" diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index 57c70ffb..7d79f21a 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -18,12 +18,14 @@ garage_db = { version = "0.8.0", path = "../db" } async-trait = "0.1" blake2 = "0.9" +bytes = "1.0" +digest = "0.10" err-derive = "0.3" xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] } hex = "0.4" tracing = "0.1.30" rand = "0.8" -sha2 = "0.9" +sha2 = "0.10" chrono = "0.4" rmp-serde = "0.15" diff --git a/src/util/async_hash.rs b/src/util/async_hash.rs new file mode 100644 index 00000000..67776eb9 --- /dev/null +++ b/src/util/async_hash.rs @@ -0,0 +1,55 @@ +use bytes::Bytes; +use digest::Digest; + +use tokio::sync::mpsc; +use tokio::task::JoinHandle; + +use crate::data::*; + +/// Compute the sha256 of a slice, +/// spawning on a tokio thread for CPU-intensive processing +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_sha256sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || sha256sum(&data)) + .await + .unwrap() +} + +/// Compute the blake2sum of a slice, +/// spawning on a tokio thread for CPU-intensive processing. +/// The argument has to be an owned Bytes, as it is moved out to a new thread. +pub async fn async_blake2sum(data: Bytes) -> Hash { + tokio::task::spawn_blocking(move || blake2sum(&data)) + .await + .unwrap() +} + +// ---- + +pub struct AsyncHasher { + sendblk: mpsc::UnboundedSender, + task: JoinHandle>, +} + +impl AsyncHasher { + pub fn new() -> Self { + let (sendblk, mut recvblk) = mpsc::unbounded_channel::(); + let task = tokio::task::spawn_blocking(move || { + let mut digest = D::new(); + while let Some(blk) = recvblk.blocking_recv() { + digest.update(&blk[..]); + } + digest.finalize() + }); + Self { sendblk, task } + } + + pub fn update(&self, b: Bytes) { + self.sendblk.send(b).unwrap() + } + + pub async fn finalize(self) -> digest::Output { + drop(self.sendblk); + self.task.await.unwrap() + } +} diff --git a/src/util/lib.rs b/src/util/lib.rs index fce151af..7152f92a 100644 --- a/src/util/lib.rs +++ b/src/util/lib.rs @@ -3,6 +3,7 @@ #[macro_use] extern crate tracing; +pub mod async_hash; pub mod background; pub mod config; pub mod crdt;