Simplify+improve async hasher by using bounded channel
continuous-integration/drone/push Build is passing Details
continuous-integration/drone/pr Build is passing Details

This commit is contained in:
Alex 2022-09-12 16:23:43 +02:00
parent d9d199a6c9
commit f91fab8582
Signed by: lx
GPG Key ID: 0E496D15096376BE
1 changed files with 5 additions and 8 deletions

View File

@ -1,7 +1,7 @@
use bytes::Bytes; use bytes::Bytes;
use digest::Digest; use digest::Digest;
use tokio::sync::{mpsc, oneshot}; use tokio::sync::mpsc;
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::data::*; use crate::data::*;
@ -27,18 +27,17 @@ pub async fn async_blake2sum(data: Bytes) -> Hash {
// ---- // ----
pub struct AsyncHasher<D: Digest> { pub struct AsyncHasher<D: Digest> {
sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>, sendblk: mpsc::Sender<Bytes>,
task: JoinHandle<digest::Output<D>>, task: JoinHandle<digest::Output<D>>,
} }
impl<D: Digest> AsyncHasher<D> { impl<D: Digest> AsyncHasher<D> {
pub fn new() -> Self { pub fn new() -> Self {
let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>(); let (sendblk, mut recvblk) = mpsc::channel::<Bytes>(1);
let task = tokio::task::spawn_blocking(move || { let task = tokio::task::spawn_blocking(move || {
let mut digest = D::new(); let mut digest = D::new();
while let Some((blk, ch)) = recvblk.blocking_recv() { while let Some(blk) = recvblk.blocking_recv() {
digest.update(&blk[..]); digest.update(&blk[..]);
let _ = ch.send(());
} }
digest.finalize() digest.finalize()
}); });
@ -46,9 +45,7 @@ impl<D: Digest> AsyncHasher<D> {
} }
pub async fn update(&self, b: Bytes) { pub async fn update(&self, b: Bytes) {
let (tx, rx) = oneshot::channel(); self.sendblk.send(b).await.unwrap();
self.sendblk.send((b, tx)).unwrap();
let _ = rx.await;
} }
pub async fn finalize(self) -> digest::Output<D> { pub async fn finalize(self) -> digest::Output<D> {