From 9fa2e958b3b37538b80b7f26107b7df2238f335b Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Jul 2022 12:30:35 +0200 Subject: [PATCH] Begin add watch mechanism to Bayou --- src/bayou.rs | 119 +++++++++++++++++++++++++++++++++++++++---- src/k2v_util.rs | 29 +++++++++++ src/mail/incoming.rs | 29 +---------- src/main.rs | 1 + 4 files changed, 142 insertions(+), 36 deletions(-) create mode 100644 src/k2v_util.rs diff --git a/src/bayou.rs b/src/bayou.rs index 7a76222..f6e0fb7 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -1,18 +1,21 @@ use std::str::FromStr; +use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; -use log::debug; +use log::{debug, error, info}; use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::io::AsyncReadExt; +use tokio::sync::{watch, Notify}; -use k2v_client::{BatchDeleteOp, BatchReadOp, Filter, K2vClient, K2vValue}; +use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue}; use rusoto_s3::{ DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, }; use crate::cryptoblob::*; +use crate::k2v_util::k2v_wait_value_changed; use crate::login::Credentials; use crate::time::now_msec; @@ -34,6 +37,8 @@ const CHECKPOINT_MIN_OPS: usize = 16; // between processes doing .checkpoint() and those doing .sync() const CHECKPOINTS_TO_KEEP: usize = 3; +const WATCH_SK: &str = "watch"; + pub trait BayouState: Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static { @@ -52,8 +57,12 @@ pub struct Bayou { checkpoint: (Timestamp, S), history: Vec<(Timestamp, S::Op, Option)>, + last_sync: Option, last_try_checkpoint: Option, + + watch: Arc, + last_sync_watch_ct: Option, } impl Bayou { @@ -61,6 +70,8 @@ impl Bayou { let k2v_client = creds.k2v_client()?; let s3_client = creds.s3_client()?; + let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?; + Ok(Self { bucket: creds.bucket().to_string(), path, @@ -71,6 +82,8 @@ impl Bayou { history: vec![], last_sync: None, last_try_checkpoint: None, + watch, + last_sync_watch_ct: None, }) } @@ -106,7 +119,7 @@ impl Bayou { }; if self.checkpoint.0 > checkpoint.0 { - bail!("Existing checkpoint is more recent than stored one"); + bail!("Loaded checkpoint is more recent than stored one"); } if let Some(ck) = checkpoint.1 { @@ -132,7 +145,7 @@ impl Bayou { partition_key: &self.path, filter: Filter { start: Some(&ts_ser), - end: None, + end: Some(WATCH_SK), prefix: None, limit: None, reverse: false, @@ -186,12 +199,9 @@ impl Bayou { let i0 = self .history .iter() - .enumerate() .zip(ops.iter()) - .skip_while(|((_, (ts1, _, _)), (ts2, _))| ts1 == ts2) - .map(|((i, _), _)| i) - .next() - .unwrap_or(self.history.len()); + .take_while(|((ts1, _, _), (ts2, _))| ts1 == ts2) + .count(); if ops.len() > i0 { // Remove operations from first position where histories differ @@ -259,6 +269,8 @@ impl Bayou { ) .await?; + self.watch.notify.notify_one(); + let new_state = self.state().apply(&op); self.history.push((ts, op, Some(new_state))); @@ -427,6 +439,95 @@ impl Bayou { } } +// ---- Bayou watch in K2V ---- + +struct K2vWatch { + pk: String, + sk: String, + rx: watch::Receiver>, + notify: Notify, +} + +impl K2vWatch { + /// Creates a new watch and launches subordinate threads. + /// These threads hold Weak pointers to the struct; + /// the exit when the Arc is dropped. + fn new(creds: &Credentials, pk: String, sk: String) -> Result> { + let (tx, rx) = watch::channel::>(None); + let notify = Notify::new(); + + let watch = Arc::new(K2vWatch { pk, sk, rx, notify }); + + tokio::spawn(Self::watcher_task( + Arc::downgrade(&watch), + creds.k2v_client()?, + tx, + )); + tokio::spawn(Self::updater_task( + Arc::downgrade(&watch), + creds.k2v_client()?, + )); + + Ok(watch) + } + + async fn watcher_task( + self_weak: Weak, + k2v: K2vClient, + tx: watch::Sender>, + ) { + let mut ct = None; + while let Some(this) = Weak::upgrade(&self_weak) { + info!("bayou k2v watch loop iter: ct = {:?}", ct); + let update = tokio::select!( + _ = tokio::time::sleep(Duration::from_secs(60)) => continue, + r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r, + ); + match update { + Err(e) => { + error!("Error in bayou k2v wait value changed: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + Ok(cv) => { + if tx.send(Some(cv.causality.clone())).is_err() { + break; + } + ct = Some(cv.causality); + } + } + } + info!("bayou k2v watch loop exiting"); + } + + async fn updater_task(self_weak: Weak, k2v: K2vClient) { + while let Some(this) = Weak::upgrade(&self_weak) { + let ct: Option = this.rx.borrow().clone(); + let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); + + tokio::select!( + _ = tokio::time::sleep(Duration::from_secs(60)) => (), + _ = this.notify.notified() => { + if let Err(e) = k2v + .insert_item( + &this.pk, + &this.sk, + rand, + ct, + ) + .await + { + error!("Error in bayou k2v watch updater loop: {}", e); + tokio::time::sleep(Duration::from_secs(30)).await; + } + } + ) + } + info!("bayou k2v watch updater loop exiting"); + } +} + +// ---- TIMESTAMP CLASS ---- + #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] pub struct Timestamp { pub msec: u64, diff --git a/src/k2v_util.rs b/src/k2v_util.rs new file mode 100644 index 0000000..9dadab4 --- /dev/null +++ b/src/k2v_util.rs @@ -0,0 +1,29 @@ +use anyhow::Result; + +use k2v_client::{CausalValue, CausalityToken, K2vClient}; + +// ---- UTIL: function to wait for a value to have changed in K2V ---- + +pub async fn k2v_wait_value_changed( + k2v: &K2vClient, + pk: &str, + sk: &str, + prev_ct: &Option, +) -> Result { + loop { + if let Some(ct) = prev_ct { + match k2v.poll_item(pk, sk, ct.clone(), None).await? { + None => continue, + Some(cv) => return Ok(cv), + } + } else { + match k2v.read_item(pk, sk).await { + Err(k2v_client::Error::NotFound) => { + k2v.insert_item(pk, sk, vec![0u8], None).await?; + } + Err(e) => return Err(e.into()), + Ok(cv) => return Ok(cv), + } + } + } +} diff --git a/src/mail/incoming.rs b/src/mail/incoming.rs index 9643985..66513bf 100644 --- a/src/mail/incoming.rs +++ b/src/mail/incoming.rs @@ -6,7 +6,7 @@ use std::time::Duration; use anyhow::{anyhow, bail, Result}; use futures::{future::BoxFuture, FutureExt}; -use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; +use k2v_client::{CausalityToken, K2vClient, K2vValue}; use rusoto_s3::{ DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, }; @@ -15,6 +15,7 @@ use tokio::sync::watch; use tracing::{error, info, warn}; use crate::cryptoblob; +use crate::k2v_util::k2v_wait_value_changed; use crate::login::{Credentials, PublicCredentials}; use crate::mail::mailbox::Mailbox; use crate::mail::uidindex::ImapUidvalidity; @@ -408,32 +409,6 @@ async fn k2v_lock_loop_internal( } } -// ---- UTIL: function to wait for a value to have changed in K2V ---- - -async fn k2v_wait_value_changed<'a>( - k2v: &'a K2vClient, - pk: &'static str, - sk: &'static str, - prev_ct: &'a Option, -) -> Result { - loop { - if let Some(ct) = prev_ct { - match k2v.poll_item(pk, sk, ct.clone(), None).await? { - None => continue, - Some(cv) => return Ok(cv), - } - } else { - match k2v.read_item(pk, sk).await { - Err(k2v_client::Error::NotFound) => { - k2v.insert_item(pk, sk, vec![0u8], None).await?; - } - Err(e) => return Err(e.into()), - Ok(cv) => return Ok(cv), - } - } - } -} - // ---- LMTP SIDE: storing messages encrypted with user's pubkey ---- pub struct EncryptedMessage { diff --git a/src/main.rs b/src/main.rs index b27c891..a4e22ff 100644 --- a/src/main.rs +++ b/src/main.rs @@ -2,6 +2,7 @@ mod bayou; mod config; mod cryptoblob; mod imap; +mod k2v_util; mod lmtp; mod login; mod mail;