forked from Deuxfleurs/garage
Compute hashes on dedicated threads
This commit is contained in:
parent
a184f0d0b5
commit
1b2e1296eb
11 changed files with 188 additions and 51 deletions
95
Cargo.lock
generated
95
Cargo.lock
generated
|
@ -343,7 +343,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174"
|
checksum = "0a4e37d16930f5459780f5621038b6382b9bb37c19016f39fb6b5808d831f174"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crypto-mac 0.8.0",
|
"crypto-mac 0.8.0",
|
||||||
"digest",
|
"digest 0.9.0",
|
||||||
"opaque-debug",
|
"opaque-debug",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -356,6 +356,15 @@ dependencies = [
|
||||||
"generic-array",
|
"generic-array",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "block-buffer"
|
||||||
|
version = "0.10.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "0bf7fe51849ea569fd452f37822f606a5cabb684dc918707a0193fd4664ff324"
|
||||||
|
dependencies = [
|
||||||
|
"generic-array",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "bumpalo"
|
name = "bumpalo"
|
||||||
version = "3.9.1"
|
version = "3.9.1"
|
||||||
|
@ -589,6 +598,16 @@ dependencies = [
|
||||||
"lazy_static",
|
"lazy_static",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "crypto-common"
|
||||||
|
version = "0.1.6"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
|
||||||
|
dependencies = [
|
||||||
|
"generic-array",
|
||||||
|
"typenum",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "crypto-mac"
|
name = "crypto-mac"
|
||||||
version = "0.8.0"
|
version = "0.8.0"
|
||||||
|
@ -693,6 +712,17 @@ dependencies = [
|
||||||
"generic-array",
|
"generic-array",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "digest"
|
||||||
|
version = "0.10.3"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "f2fb860ca6fafa5552fb6d0e816a69c8e49f0908bf524e30a90d97c85892d506"
|
||||||
|
dependencies = [
|
||||||
|
"block-buffer 0.10.2",
|
||||||
|
"crypto-common",
|
||||||
|
"subtle",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "dirs-next"
|
name = "dirs-next"
|
||||||
version = "2.0.0"
|
version = "2.0.0"
|
||||||
|
@ -991,7 +1021,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2 0.10.2",
|
||||||
"static_init",
|
"static_init",
|
||||||
"structopt",
|
"structopt",
|
||||||
"timeago",
|
"timeago",
|
||||||
|
@ -1008,7 +1038,7 @@ dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"bytes 1.1.0",
|
"bytes 1.1.0",
|
||||||
"chrono",
|
"chrono",
|
||||||
"crypto-mac 0.10.1",
|
"crypto-common",
|
||||||
"err-derive 0.3.1",
|
"err-derive 0.3.1",
|
||||||
"form_urlencoded",
|
"form_urlencoded",
|
||||||
"futures",
|
"futures",
|
||||||
|
@ -1019,13 +1049,13 @@ dependencies = [
|
||||||
"garage_table 0.7.0",
|
"garage_table 0.7.0",
|
||||||
"garage_util 0.7.0",
|
"garage_util 0.7.0",
|
||||||
"hex",
|
"hex",
|
||||||
"hmac 0.10.1",
|
"hmac 0.12.1",
|
||||||
"http",
|
"http",
|
||||||
"http-range",
|
"http-range",
|
||||||
"httpdate 0.3.2",
|
"httpdate 0.3.2",
|
||||||
"hyper",
|
"hyper",
|
||||||
"idna",
|
"idna",
|
||||||
"md-5",
|
"md-5 0.10.1",
|
||||||
"multer",
|
"multer",
|
||||||
"nom",
|
"nom",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
|
@ -1039,7 +1069,7 @@ dependencies = [
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2 0.10.2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"tracing",
|
"tracing",
|
||||||
"url",
|
"url",
|
||||||
|
@ -1259,7 +1289,7 @@ dependencies = [
|
||||||
"rmp-serde 0.15.5",
|
"rmp-serde 0.15.5",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2 0.9.9",
|
||||||
"sled",
|
"sled",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
|
@ -1272,7 +1302,9 @@ version = "0.7.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"blake2",
|
"blake2",
|
||||||
|
"bytes 1.1.0",
|
||||||
"chrono",
|
"chrono",
|
||||||
|
"digest 0.10.3",
|
||||||
"err-derive 0.3.1",
|
"err-derive 0.3.1",
|
||||||
"futures",
|
"futures",
|
||||||
"garage_db",
|
"garage_db",
|
||||||
|
@ -1285,7 +1317,7 @@ dependencies = [
|
||||||
"rmp-serde 0.15.5",
|
"rmp-serde 0.15.5",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"sha2",
|
"sha2 0.10.2",
|
||||||
"tokio",
|
"tokio",
|
||||||
"toml",
|
"toml",
|
||||||
"tracing",
|
"tracing",
|
||||||
|
@ -1485,7 +1517,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
|
checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crypto-mac 0.10.1",
|
"crypto-mac 0.10.1",
|
||||||
"digest",
|
"digest 0.9.0",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1495,7 +1527,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
|
checksum = "2a2a2320eb7ec0ebe8da8f744d7812d9fc4cb4d09344ac01898dbcb6a20ae69b"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"crypto-mac 0.11.1",
|
"crypto-mac 0.11.1",
|
||||||
"digest",
|
"digest 0.9.0",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "hmac"
|
||||||
|
version = "0.12.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e"
|
||||||
|
dependencies = [
|
||||||
|
"digest 0.10.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
@ -1975,11 +2016,20 @@ version = "0.9.1"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15"
|
checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"block-buffer",
|
"block-buffer 0.9.0",
|
||||||
"digest",
|
"digest 0.9.0",
|
||||||
"opaque-debug",
|
"opaque-debug",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "md-5"
|
||||||
|
version = "0.10.1"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "658646b21e0b72f7866c7038ab086d3d5e1cd6271f060fd37defb241949d0582"
|
||||||
|
dependencies = [
|
||||||
|
"digest 0.10.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "md5"
|
name = "md5"
|
||||||
version = "0.7.0"
|
version = "0.7.0"
|
||||||
|
@ -3000,20 +3050,20 @@ dependencies = [
|
||||||
"base64",
|
"base64",
|
||||||
"bytes 1.1.0",
|
"bytes 1.1.0",
|
||||||
"chrono",
|
"chrono",
|
||||||
"digest",
|
"digest 0.9.0",
|
||||||
"futures",
|
"futures",
|
||||||
"hex",
|
"hex",
|
||||||
"hmac 0.11.0",
|
"hmac 0.11.0",
|
||||||
"http",
|
"http",
|
||||||
"hyper",
|
"hyper",
|
||||||
"log",
|
"log",
|
||||||
"md-5",
|
"md-5 0.9.1",
|
||||||
"percent-encoding",
|
"percent-encoding",
|
||||||
"pin-project-lite",
|
"pin-project-lite",
|
||||||
"rusoto_credential",
|
"rusoto_credential",
|
||||||
"rustc_version",
|
"rustc_version",
|
||||||
"serde",
|
"serde",
|
||||||
"sha2",
|
"sha2 0.9.9",
|
||||||
"tokio",
|
"tokio",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
@ -3246,13 +3296,24 @@ version = "0.9.9"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
|
checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"block-buffer",
|
"block-buffer 0.9.0",
|
||||||
"cfg-if 1.0.0",
|
"cfg-if 1.0.0",
|
||||||
"cpufeatures",
|
"cpufeatures",
|
||||||
"digest",
|
"digest 0.9.0",
|
||||||
"opaque-debug",
|
"opaque-debug",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "sha2"
|
||||||
|
version = "0.10.2"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "55deaec60f81eefe3cce0dc50bda92d6d8e88f2a27df7c5033b42afeb1ed2676"
|
||||||
|
dependencies = [
|
||||||
|
"cfg-if 1.0.0",
|
||||||
|
"cpufeatures",
|
||||||
|
"digest 0.10.3",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "shlex"
|
name = "shlex"
|
||||||
version = "1.1.0"
|
version = "1.1.0"
|
||||||
|
|
|
@ -24,15 +24,15 @@ async-trait = "0.1.7"
|
||||||
base64 = "0.13"
|
base64 = "0.13"
|
||||||
bytes = "1.0"
|
bytes = "1.0"
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
crypto-mac = "0.10"
|
crypto-common = "0.1"
|
||||||
err-derive = "0.3"
|
err-derive = "0.3"
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
hmac = "0.10"
|
hmac = "0.12"
|
||||||
idna = "0.2"
|
idna = "0.2"
|
||||||
tracing = "0.1.30"
|
tracing = "0.1.30"
|
||||||
md-5 = "0.9"
|
md-5 = "0.10"
|
||||||
nom = "7.1"
|
nom = "7.1"
|
||||||
sha2 = "0.9"
|
sha2 = "0.10"
|
||||||
|
|
||||||
futures = "0.3"
|
futures = "0.3"
|
||||||
futures-util = "0.3"
|
futures-util = "0.3"
|
||||||
|
|
|
@ -365,7 +365,10 @@ pub async fn handle_upload_part_copy(
|
||||||
// we need to insert that data as a new block.
|
// we need to insert that data as a new block.
|
||||||
async move {
|
async move {
|
||||||
if must_upload {
|
if must_upload {
|
||||||
garage2.block_manager.rpc_put_block(final_hash, data).await
|
garage2
|
||||||
|
.block_manager
|
||||||
|
.rpc_put_block(final_hash, data.into())
|
||||||
|
.await
|
||||||
} else {
|
} else {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
|
use garage_util::async_hash::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::Error as GarageError;
|
use garage_util::error::Error as GarageError;
|
||||||
use garage_util::time::*;
|
use garage_util::time::*;
|
||||||
|
@ -130,7 +131,8 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage.version_table.insert(&version).await?;
|
garage.version_table.insert(&version).await?;
|
||||||
|
|
||||||
// Transfer data and verify checksum
|
// Transfer data and verify checksum
|
||||||
let first_block_hash = blake2sum(&first_block[..]);
|
let first_block = Bytes::from(first_block);
|
||||||
|
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
||||||
|
|
||||||
let tx_result = (|| async {
|
let tx_result = (|| async {
|
||||||
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
let (total_size, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
||||||
|
@ -273,14 +275,16 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage: &Garage,
|
garage: &Garage,
|
||||||
version: &Version,
|
version: &Version,
|
||||||
part_number: u64,
|
part_number: u64,
|
||||||
first_block: Vec<u8>,
|
first_block: Bytes,
|
||||||
first_block_hash: Hash,
|
first_block_hash: Hash,
|
||||||
chunker: &mut StreamChunker<S>,
|
chunker: &mut StreamChunker<S>,
|
||||||
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
|
||||||
let mut md5hasher = Md5::new();
|
let first_block = Bytes::from(first_block);
|
||||||
let mut sha256hasher = Sha256::new();
|
|
||||||
md5hasher.update(&first_block[..]);
|
let md5hasher = AsyncHasher::<Md5>::new();
|
||||||
sha256hasher.update(&first_block[..]);
|
let sha256hasher = AsyncHasher::<Sha256>::new();
|
||||||
|
md5hasher.update(first_block.clone());
|
||||||
|
sha256hasher.update(first_block.clone());
|
||||||
|
|
||||||
let mut next_offset = first_block.len();
|
let mut next_offset = first_block.len();
|
||||||
let mut put_curr_version_block = put_block_meta(
|
let mut put_curr_version_block = put_block_meta(
|
||||||
|
@ -302,9 +306,10 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
chunker.next(),
|
chunker.next(),
|
||||||
)?;
|
)?;
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
md5hasher.update(&block[..]);
|
let block = Bytes::from(block);
|
||||||
sha256hasher.update(&block[..]);
|
md5hasher.update(block.clone());
|
||||||
let block_hash = blake2sum(&block[..]);
|
sha256hasher.update(block.clone());
|
||||||
|
let block_hash = async_blake2sum(block.clone()).await;
|
||||||
let block_len = block.len();
|
let block_len = block.len();
|
||||||
put_curr_version_block = put_block_meta(
|
put_curr_version_block = put_block_meta(
|
||||||
garage,
|
garage,
|
||||||
|
@ -322,9 +327,9 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
}
|
}
|
||||||
|
|
||||||
let total_size = next_offset as u64;
|
let total_size = next_offset as u64;
|
||||||
let data_md5sum = md5hasher.finalize();
|
let data_md5sum = md5hasher.finalize().await;
|
||||||
|
|
||||||
let data_sha256sum = sha256hasher.finalize();
|
let data_sha256sum = sha256hasher.finalize().await;
|
||||||
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
|
let data_sha256sum = Hash::try_from(&data_sha256sum[..]).unwrap();
|
||||||
|
|
||||||
Ok((total_size, data_md5sum, data_sha256sum))
|
Ok((total_size, data_md5sum, data_sha256sum))
|
||||||
|
@ -504,7 +509,10 @@ pub async fn handle_put_part(
|
||||||
|
|
||||||
// Copy block to store
|
// Copy block to store
|
||||||
let version = Version::new(version_uuid, bucket_id, key, false);
|
let version = Version::new(version_uuid, bucket_id, key, false);
|
||||||
let first_block_hash = blake2sum(&first_block[..]);
|
|
||||||
|
let first_block = Bytes::from(first_block);
|
||||||
|
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
||||||
|
|
||||||
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
||||||
&garage,
|
&garage,
|
||||||
&version,
|
&version,
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use chrono::{DateTime, Utc};
|
use chrono::{DateTime, Utc};
|
||||||
use hmac::{Hmac, Mac, NewMac};
|
use hmac::{Hmac, Mac};
|
||||||
use sha2::Sha256;
|
use sha2::Sha256;
|
||||||
|
|
||||||
use garage_util::data::{sha256sum, Hash};
|
use garage_util::data::{sha256sum, Hash};
|
||||||
|
@ -29,17 +29,17 @@ pub fn signing_hmac(
|
||||||
secret_key: &str,
|
secret_key: &str,
|
||||||
region: &str,
|
region: &str,
|
||||||
service: &str,
|
service: &str,
|
||||||
) -> Result<HmacSha256, crypto_mac::InvalidKeyLength> {
|
) -> Result<HmacSha256, crypto_common::InvalidLength> {
|
||||||
let secret = String::from("AWS4") + secret_key;
|
let secret = String::from("AWS4") + secret_key;
|
||||||
let mut date_hmac = HmacSha256::new_varkey(secret.as_bytes())?;
|
let mut date_hmac = HmacSha256::new_from_slice(secret.as_bytes())?;
|
||||||
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
|
date_hmac.update(datetime.format(SHORT_DATE).to_string().as_bytes());
|
||||||
let mut region_hmac = HmacSha256::new_varkey(&date_hmac.finalize().into_bytes())?;
|
let mut region_hmac = HmacSha256::new_from_slice(&date_hmac.finalize().into_bytes())?;
|
||||||
region_hmac.update(region.as_bytes());
|
region_hmac.update(region.as_bytes());
|
||||||
let mut service_hmac = HmacSha256::new_varkey(®ion_hmac.finalize().into_bytes())?;
|
let mut service_hmac = HmacSha256::new_from_slice(®ion_hmac.finalize().into_bytes())?;
|
||||||
service_hmac.update(service.as_bytes());
|
service_hmac.update(service.as_bytes());
|
||||||
let mut signing_hmac = HmacSha256::new_varkey(&service_hmac.finalize().into_bytes())?;
|
let mut signing_hmac = HmacSha256::new_from_slice(&service_hmac.finalize().into_bytes())?;
|
||||||
signing_hmac.update(b"aws4_request");
|
signing_hmac.update(b"aws4_request");
|
||||||
let hmac = HmacSha256::new_varkey(&signing_hmac.finalize().into_bytes())?;
|
let hmac = HmacSha256::new_from_slice(&signing_hmac.finalize().into_bytes())?;
|
||||||
Ok(hmac)
|
Ok(hmac)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,3 +1,4 @@
|
||||||
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
||||||
|
|
||||||
|
@ -61,13 +62,17 @@ impl DataBlock {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn from_buffer(data: Vec<u8>, level: Option<i32>) -> DataBlock {
|
pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
|
||||||
|
tokio::task::spawn_blocking(move || {
|
||||||
if let Some(level) = level {
|
if let Some(level) = level {
|
||||||
if let Ok(data) = zstd_encode(&data[..], level) {
|
if let Ok(data) = zstd_encode(&data[..], level) {
|
||||||
return DataBlock::Compressed(data);
|
return DataBlock::Compressed(data);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DataBlock::Plain(data)
|
DataBlock::Plain(data.to_vec()) // TODO: remove to_vec here
|
||||||
|
})
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,6 +5,7 @@ use std::time::Duration;
|
||||||
|
|
||||||
use arc_swap::ArcSwapOption;
|
use arc_swap::ArcSwapOption;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use futures::future::*;
|
use futures::future::*;
|
||||||
|
@ -211,14 +212,15 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send block to nodes that should have it
|
/// Send block to nodes that should have it
|
||||||
pub async fn rpc_put_block(&self, hash: Hash, data: Vec<u8>) -> Result<(), Error> {
|
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
|
||||||
let who = self.replication.write_nodes(&hash);
|
let who = self.replication.write_nodes(&hash);
|
||||||
let data = DataBlock::from_buffer(data, self.compression_level);
|
let data = DataBlock::from_buffer(data, self.compression_level).await;
|
||||||
self.system
|
self.system
|
||||||
.rpc
|
.rpc
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&self.endpoint,
|
&self.endpoint,
|
||||||
&who[..],
|
&who[..],
|
||||||
|
// TODO: remove to_vec() here
|
||||||
BlockRpc::PutBlock { hash, data },
|
BlockRpc::PutBlock { hash, data },
|
||||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||||
.with_quorum(self.replication.write_quorum())
|
.with_quorum(self.replication.write_quorum())
|
||||||
|
|
|
@ -65,7 +65,7 @@ chrono = "0.4"
|
||||||
http = "0.2"
|
http = "0.2"
|
||||||
hmac = "0.10"
|
hmac = "0.10"
|
||||||
hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
|
hyper = { version = "0.14", features = ["client", "http1", "runtime"] }
|
||||||
sha2 = "0.9"
|
sha2 = "0.10"
|
||||||
|
|
||||||
static_init = "1.0"
|
static_init = "1.0"
|
||||||
assert-json-diff = "2.0"
|
assert-json-diff = "2.0"
|
||||||
|
|
|
@ -18,12 +18,14 @@ garage_db = { version = "0.8.0", path = "../db" }
|
||||||
|
|
||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
blake2 = "0.9"
|
blake2 = "0.9"
|
||||||
|
bytes = "1.0"
|
||||||
|
digest = "0.10"
|
||||||
err-derive = "0.3"
|
err-derive = "0.3"
|
||||||
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
|
xxhash-rust = { version = "0.8", default-features = false, features = ["xxh3"] }
|
||||||
hex = "0.4"
|
hex = "0.4"
|
||||||
tracing = "0.1.30"
|
tracing = "0.1.30"
|
||||||
rand = "0.8"
|
rand = "0.8"
|
||||||
sha2 = "0.9"
|
sha2 = "0.10"
|
||||||
|
|
||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
rmp-serde = "0.15"
|
rmp-serde = "0.15"
|
||||||
|
|
55
src/util/async_hash.rs
Normal file
55
src/util/async_hash.rs
Normal file
|
@ -0,0 +1,55 @@
|
||||||
|
use bytes::Bytes;
|
||||||
|
use digest::Digest;
|
||||||
|
|
||||||
|
use tokio::sync::mpsc;
|
||||||
|
use tokio::task::JoinHandle;
|
||||||
|
|
||||||
|
use crate::data::*;
|
||||||
|
|
||||||
|
/// Compute the sha256 of a slice,
|
||||||
|
/// spawning on a tokio thread for CPU-intensive processing
|
||||||
|
/// The argument has to be an owned Bytes, as it is moved out to a new thread.
|
||||||
|
pub async fn async_sha256sum(data: Bytes) -> Hash {
|
||||||
|
tokio::task::spawn_blocking(move || sha256sum(&data))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Compute the blake2sum of a slice,
|
||||||
|
/// spawning on a tokio thread for CPU-intensive processing.
|
||||||
|
/// The argument has to be an owned Bytes, as it is moved out to a new thread.
|
||||||
|
pub async fn async_blake2sum(data: Bytes) -> Hash {
|
||||||
|
tokio::task::spawn_blocking(move || blake2sum(&data))
|
||||||
|
.await
|
||||||
|
.unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
// ----
|
||||||
|
|
||||||
|
pub struct AsyncHasher<D: Digest> {
|
||||||
|
sendblk: mpsc::UnboundedSender<Bytes>,
|
||||||
|
task: JoinHandle<digest::Output<D>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<D: Digest> AsyncHasher<D> {
|
||||||
|
pub fn new() -> Self {
|
||||||
|
let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>();
|
||||||
|
let task = tokio::task::spawn_blocking(move || {
|
||||||
|
let mut digest = D::new();
|
||||||
|
while let Some(blk) = recvblk.blocking_recv() {
|
||||||
|
digest.update(&blk[..]);
|
||||||
|
}
|
||||||
|
digest.finalize()
|
||||||
|
});
|
||||||
|
Self { sendblk, task }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn update(&self, b: Bytes) {
|
||||||
|
self.sendblk.send(b).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn finalize(self) -> digest::Output<D> {
|
||||||
|
drop(self.sendblk);
|
||||||
|
self.task.await.unwrap()
|
||||||
|
}
|
||||||
|
}
|
|
@ -3,6 +3,7 @@
|
||||||
#[macro_use]
|
#[macro_use]
|
||||||
extern crate tracing;
|
extern crate tracing;
|
||||||
|
|
||||||
|
pub mod async_hash;
|
||||||
pub mod background;
|
pub mod background;
|
||||||
pub mod config;
|
pub mod config;
|
||||||
pub mod crdt;
|
pub mod crdt;
|
||||||
|
|
Loading…
Reference in a new issue