diff --git a/src/bayou.rs b/src/bayou.rs index 4d33a8e..2d83ce3 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -471,61 +471,50 @@ impl K2vWatch { let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); - tokio::spawn(Self::watcher_task( + tokio::spawn(Self::background_task( Arc::downgrade(&watch), creds.k2v_client()?, tx, )); - tokio::spawn(Self::updater_task( - Arc::downgrade(&watch), - creds.k2v_client()?, - )); Ok(watch) } - async fn watcher_task( + async fn background_task( self_weak: Weak, k2v: K2vClient, tx: watch::Sender>, ) { let mut ct = None; while let Some(this) = Weak::upgrade(&self_weak) { - info!("bayou k2v watch loop iter: ct = {:?}", ct); - let update = tokio::select!( - _ = tokio::time::sleep(Duration::from_secs(60)) => continue, - r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r, + debug!( + "bayou k2v watch bg loop iter ({}, {}): ct = {:?}", + this.pk, this.sk, ct ); - match update { - Err(e) => { - error!("Error in bayou k2v wait value changed: {}", e); - tokio::time::sleep(Duration::from_secs(30)).await; - } - Ok(cv) => { - if tx.send(Some(cv.causality.clone())).is_err() { - break; - } - ct = Some(cv.causality); - } - } - } - info!("bayou k2v watch loop exiting"); - } - - async fn updater_task(self_weak: Weak, k2v: K2vClient) { - while let Some(this) = Weak::upgrade(&self_weak) { - let ct: Option = this.rx.borrow().clone(); - let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); - tokio::select!( - _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = tokio::time::sleep(Duration::from_secs(60)) => continue, + update = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => { + match update { + Err(e) => { + error!("Error in bayou k2v wait value changed: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(cv) => { + if tx.send(Some(cv.causality.clone())).is_err() { + break; + } + ct = Some(cv.causality); + } + } + } _ = this.notify.notified() => { + let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); if let Err(e) = k2v .insert_item( &this.pk, &this.sk, rand, - ct, + ct.clone(), ) .await { @@ -533,9 +522,9 @@ impl K2vWatch { tokio::time::sleep(Duration::from_secs(30)).await; } } - ) + ); } - info!("bayou k2v watch updater loop exiting"); + info!("bayou k2v watch bg loop exiting"); } }