garage/src/util/async_hash.rs

62 lines
1.4 KiB
Rust
Raw Normal View History

2022-07-18 17:18:47 +02:00
use bytes::Bytes;
use digest::Digest;
use tokio::sync::mpsc;
2022-07-18 17:18:47 +02:00
use tokio::task::JoinHandle;
use crate::data::*;
/// Compute the sha256 of a slice,
/// spawning on a tokio thread for CPU-intensive processing
/// The argument has to be an owned Bytes, as it is moved out to a new thread.
pub async fn async_sha256sum(data: Bytes) -> Hash {
tokio::task::spawn_blocking(move || sha256sum(&data))
.await
.unwrap()
}
/// Compute the blake2sum of a slice,
/// spawning on a tokio thread for CPU-intensive processing.
/// The argument has to be an owned Bytes, as it is moved out to a new thread.
pub async fn async_blake2sum(data: Bytes) -> Hash {
tokio::task::spawn_blocking(move || blake2sum(&data))
.await
.unwrap()
}
// ----
pub struct AsyncHasher<D: Digest> {
sendblk: mpsc::Sender<Bytes>,
2022-07-18 17:18:47 +02:00
task: JoinHandle<digest::Output<D>>,
}
impl<D: Digest> AsyncHasher<D> {
pub fn new() -> Self {
let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1);
2022-07-18 17:18:47 +02:00
let task = tokio::task::spawn_blocking(move || {
let mut digest = D::new();
while let Some(blk) = recvblk.blocking_recv() {
2022-07-18 17:18:47 +02:00
digest.update(&blk[..]);
}
digest.finalize()
});
Self { sendblk, task }
}
pub async fn update(&self, b: Bytes) {
self.sendblk.send(b).await.unwrap();
2022-07-18 17:18:47 +02:00
}
pub async fn finalize(self) -> digest::Output<D> {
drop(self.sendblk);
self.task.await.unwrap()
}
}
2022-07-22 18:37:20 +02:00
impl<D: Digest> Default for AsyncHasher<D> {
fn default() -> Self {
Self::new()
}
}