Begin add watch mechanism to Bayou

This commit is contained in:
Alex 2022-07-13 12:30:35 +02:00
parent 33fa51021c
commit 9fa2e958b3
Signed by: lx
GPG key ID: 0E496D15096376BE
4 changed files with 142 additions and 36 deletions

View file

@ -1,18 +1,21 @@
use std::str::FromStr; use std::str::FromStr;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use log::debug; use log::{debug, error, info};
use rand::prelude::*; use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::{watch, Notify};
use k2v_client::{BatchDeleteOp, BatchReadOp, Filter, K2vClient, K2vValue}; use k2v_client::{BatchDeleteOp, BatchReadOp, CausalityToken, Filter, K2vClient, K2vValue};
use rusoto_s3::{ use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
}; };
use crate::cryptoblob::*; use crate::cryptoblob::*;
use crate::k2v_util::k2v_wait_value_changed;
use crate::login::Credentials; use crate::login::Credentials;
use crate::time::now_msec; use crate::time::now_msec;
@ -34,6 +37,8 @@ const CHECKPOINT_MIN_OPS: usize = 16;
// between processes doing .checkpoint() and those doing .sync() // between processes doing .checkpoint() and those doing .sync()
const CHECKPOINTS_TO_KEEP: usize = 3; const CHECKPOINTS_TO_KEEP: usize = 3;
const WATCH_SK: &str = "watch";
pub trait BayouState: pub trait BayouState:
Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
{ {
@ -52,8 +57,12 @@ pub struct Bayou<S: BayouState> {
checkpoint: (Timestamp, S), checkpoint: (Timestamp, S),
history: Vec<(Timestamp, S::Op, Option<S>)>, history: Vec<(Timestamp, S::Op, Option<S>)>,
last_sync: Option<Instant>, last_sync: Option<Instant>,
last_try_checkpoint: Option<Instant>, last_try_checkpoint: Option<Instant>,
watch: Arc<K2vWatch>,
last_sync_watch_ct: Option<CausalityToken>,
} }
impl<S: BayouState> Bayou<S> { impl<S: BayouState> Bayou<S> {
@ -61,6 +70,8 @@ impl<S: BayouState> Bayou<S> {
let k2v_client = creds.k2v_client()?; let k2v_client = creds.k2v_client()?;
let s3_client = creds.s3_client()?; let s3_client = creds.s3_client()?;
let watch = K2vWatch::new(creds, path.clone(), WATCH_SK.to_string())?;
Ok(Self { Ok(Self {
bucket: creds.bucket().to_string(), bucket: creds.bucket().to_string(),
path, path,
@ -71,6 +82,8 @@ impl<S: BayouState> Bayou<S> {
history: vec![], history: vec![],
last_sync: None, last_sync: None,
last_try_checkpoint: None, last_try_checkpoint: None,
watch,
last_sync_watch_ct: None,
}) })
} }
@ -106,7 +119,7 @@ impl<S: BayouState> Bayou<S> {
}; };
if self.checkpoint.0 > checkpoint.0 { if self.checkpoint.0 > checkpoint.0 {
bail!("Existing checkpoint is more recent than stored one"); bail!("Loaded checkpoint is more recent than stored one");
} }
if let Some(ck) = checkpoint.1 { if let Some(ck) = checkpoint.1 {
@ -132,7 +145,7 @@ impl<S: BayouState> Bayou<S> {
partition_key: &self.path, partition_key: &self.path,
filter: Filter { filter: Filter {
start: Some(&ts_ser), start: Some(&ts_ser),
end: None, end: Some(WATCH_SK),
prefix: None, prefix: None,
limit: None, limit: None,
reverse: false, reverse: false,
@ -186,12 +199,9 @@ impl<S: BayouState> Bayou<S> {
let i0 = self let i0 = self
.history .history
.iter() .iter()
.enumerate()
.zip(ops.iter()) .zip(ops.iter())
.skip_while(|((_, (ts1, _, _)), (ts2, _))| ts1 == ts2) .take_while(|((ts1, _, _), (ts2, _))| ts1 == ts2)
.map(|((i, _), _)| i) .count();
.next()
.unwrap_or(self.history.len());
if ops.len() > i0 { if ops.len() > i0 {
// Remove operations from first position where histories differ // Remove operations from first position where histories differ
@ -259,6 +269,8 @@ impl<S: BayouState> Bayou<S> {
) )
.await?; .await?;
self.watch.notify.notify_one();
let new_state = self.state().apply(&op); let new_state = self.state().apply(&op);
self.history.push((ts, op, Some(new_state))); self.history.push((ts, op, Some(new_state)));
@ -427,6 +439,95 @@ impl<S: BayouState> Bayou<S> {
} }
} }
// ---- Bayou watch in K2V ----
struct K2vWatch {
pk: String,
sk: String,
rx: watch::Receiver<Option<CausalityToken>>,
notify: Notify,
}
impl K2vWatch {
/// Creates a new watch and launches subordinate threads.
/// These threads hold Weak pointers to the struct;
/// the exit when the Arc is dropped.
fn new(creds: &Credentials, pk: String, sk: String) -> Result<Arc<Self>> {
let (tx, rx) = watch::channel::<Option<CausalityToken>>(None);
let notify = Notify::new();
let watch = Arc::new(K2vWatch { pk, sk, rx, notify });
tokio::spawn(Self::watcher_task(
Arc::downgrade(&watch),
creds.k2v_client()?,
tx,
));
tokio::spawn(Self::updater_task(
Arc::downgrade(&watch),
creds.k2v_client()?,
));
Ok(watch)
}
async fn watcher_task(
self_weak: Weak<Self>,
k2v: K2vClient,
tx: watch::Sender<Option<CausalityToken>>,
) {
let mut ct = None;
while let Some(this) = Weak::upgrade(&self_weak) {
info!("bayou k2v watch loop iter: ct = {:?}", ct);
let update = tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
r = k2v_wait_value_changed(&k2v, &this.pk, &this.sk, &ct) => r,
);
match update {
Err(e) => {
error!("Error in bayou k2v wait value changed: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
Ok(cv) => {
if tx.send(Some(cv.causality.clone())).is_err() {
break;
}
ct = Some(cv.causality);
}
}
}
info!("bayou k2v watch loop exiting");
}
async fn updater_task(self_weak: Weak<Self>, k2v: K2vClient) {
while let Some(this) = Weak::upgrade(&self_weak) {
let ct: Option<CausalityToken> = this.rx.borrow().clone();
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
tokio::select!(
_ = tokio::time::sleep(Duration::from_secs(60)) => (),
_ = this.notify.notified() => {
if let Err(e) = k2v
.insert_item(
&this.pk,
&this.sk,
rand,
ct,
)
.await
{
error!("Error in bayou k2v watch updater loop: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await;
}
}
)
}
info!("bayou k2v watch updater loop exiting");
}
}
// ---- TIMESTAMP CLASS ----
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)] #[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
pub struct Timestamp { pub struct Timestamp {
pub msec: u64, pub msec: u64,

29
src/k2v_util.rs Normal file
View file

@ -0,0 +1,29 @@
use anyhow::Result;
use k2v_client::{CausalValue, CausalityToken, K2vClient};
// ---- UTIL: function to wait for a value to have changed in K2V ----
pub async fn k2v_wait_value_changed(
k2v: &K2vClient,
pk: &str,
sk: &str,
prev_ct: &Option<CausalityToken>,
) -> Result<CausalValue> {
loop {
if let Some(ct) = prev_ct {
match k2v.poll_item(pk, sk, ct.clone(), None).await? {
None => continue,
Some(cv) => return Ok(cv),
}
} else {
match k2v.read_item(pk, sk).await {
Err(k2v_client::Error::NotFound) => {
k2v.insert_item(pk, sk, vec![0u8], None).await?;
}
Err(e) => return Err(e.into()),
Ok(cv) => return Ok(cv),
}
}
}
}

View file

@ -6,7 +6,7 @@ use std::time::Duration;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use futures::{future::BoxFuture, FutureExt}; use futures::{future::BoxFuture, FutureExt};
use k2v_client::{CausalValue, CausalityToken, K2vClient, K2vValue}; use k2v_client::{CausalityToken, K2vClient, K2vValue};
use rusoto_s3::{ use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3, DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
}; };
@ -15,6 +15,7 @@ use tokio::sync::watch;
use tracing::{error, info, warn}; use tracing::{error, info, warn};
use crate::cryptoblob; use crate::cryptoblob;
use crate::k2v_util::k2v_wait_value_changed;
use crate::login::{Credentials, PublicCredentials}; use crate::login::{Credentials, PublicCredentials};
use crate::mail::mailbox::Mailbox; use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::ImapUidvalidity; use crate::mail::uidindex::ImapUidvalidity;
@ -408,32 +409,6 @@ async fn k2v_lock_loop_internal(
} }
} }
// ---- UTIL: function to wait for a value to have changed in K2V ----
async fn k2v_wait_value_changed<'a>(
k2v: &'a K2vClient,
pk: &'static str,
sk: &'static str,
prev_ct: &'a Option<CausalityToken>,
) -> Result<CausalValue> {
loop {
if let Some(ct) = prev_ct {
match k2v.poll_item(pk, sk, ct.clone(), None).await? {
None => continue,
Some(cv) => return Ok(cv),
}
} else {
match k2v.read_item(pk, sk).await {
Err(k2v_client::Error::NotFound) => {
k2v.insert_item(pk, sk, vec![0u8], None).await?;
}
Err(e) => return Err(e.into()),
Ok(cv) => return Ok(cv),
}
}
}
}
// ---- LMTP SIDE: storing messages encrypted with user's pubkey ---- // ---- LMTP SIDE: storing messages encrypted with user's pubkey ----
pub struct EncryptedMessage { pub struct EncryptedMessage {

View file

@ -2,6 +2,7 @@ mod bayou;
mod config; mod config;
mod cryptoblob; mod cryptoblob;
mod imap; mod imap;
mod k2v_util;
mod lmtp; mod lmtp;
mod login; mod login;
mod mail; mod mail;