Refactor stuff to indeed release resources

This commit is contained in:
Alex 2022-07-13 16:14:10 +02:00
parent 3b256de3dc
commit 2a0aa0d42c
Signed by: lx
GPG Key ID: 0E496D15096376BE
1 changed files with 24 additions and 35 deletions

View File

@ -471,61 +471,50 @@ impl K2vWatch {
let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
tokio::spawn(Self::watcher_task(
tokio::spawn(Self::background_task(
Arc::downgrade(&watch),
creds.k2v_client()?,
tx,
));
tokio::spawn(Self::updater_task(
Arc::downgrade(&watch),
creds.k2v_client()?,
));
Ok(watch)
}
async fn watcher_task(
async fn background_task(
self_weak: Weak<Self>,
k2v: K2vClient,
tx: watch::Sender<Option<CausalityToken>>,
) {
let mut ct = None;
while let Some(this) = Weak::upgrade(&self_weak) {
info!("bayou k2v watch loop iter: ct = {:?}", ct);
let update = tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r,
debug!(
"bayou k2v watch bg loop iter ({}, {}): ct = {:?}",
this.pk, this.sk, ct
);
match update {
Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(cv) => {
if tx.send(Some(cv.causality.clone())).is_err() {
break;
}
ct = Some(cv.causality);
}
}
}
info!("bayou k2v watch loop exiting");
}
async fn updater_task(self_weak: Weak<Self>, k2v: K2vClient) {
while let Some(this) = Weak::upgrade(&self_weak) {
let ct: Option<CausalityToken> = this.rx.borrow().clone();
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => (),
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => {
match update {
Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(cv) => {
if tx.send(Some(cv.causality.clone())).is_err() {
break;
}
ct = Some(cv.causality);
}
}
}
_ = this.notify.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
if let Err(e) = k2v
.insert_item(
&this.pk,
&this.sk,
rand,
ct,
ct.clone(),
)
.await
{
@ -533,9 +522,9 @@ impl K2vWatch {
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
)
);
}
info!("bayou k2v watch updater loop exiting");
info!("bayou k2v watch bg loop exiting");
}
}