Implement opportunistic sync based on watch value, and use it
This commit is contained in:
parent
9fa2e958b3
commit
cd59be3a00
4 changed files with 57 additions and 30 deletions
41
src/bayou.rs
41
src/bayou.rs
|
@ -19,12 +19,12 @@ use crate::k2v_util::k2v_wait_value_changed;
|
||||||
use crate::login::Credentials;
|
use crate::login::Credentials;
|
||||||
use crate::time::now_msec;
|
use crate::time::now_msec;
|
||||||
|
|
||||||
const SAVE_STATE_EVERY: usize = 64;
|
const KEEP_STATE_EVERY: usize = 64;
|
||||||
|
|
||||||
// Checkpointing interval constants: a checkpoint is not made earlier
|
// Checkpointing interval constants: a checkpoint is not made earlier
|
||||||
// than CHECKPOINT_INTERVAL time after the last one, and is not made
|
// than CHECKPOINT_INTERVAL time after the last one, and is not made
|
||||||
// if there are less than CHECKPOINT_MIN_OPS new operations since last one.
|
// if there are less than CHECKPOINT_MIN_OPS new operations since last one.
|
||||||
const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(3600);
|
const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(6 * 3600);
|
||||||
const CHECKPOINT_MIN_OPS: usize = 16;
|
const CHECKPOINT_MIN_OPS: usize = 16;
|
||||||
// HYPOTHESIS: processes are able to communicate in a synchronous
|
// HYPOTHESIS: processes are able to communicate in a synchronous
|
||||||
// fashion in times that are small compared to CHECKPOINT_INTERVAL.
|
// fashion in times that are small compared to CHECKPOINT_INTERVAL.
|
||||||
|
@ -89,6 +89,9 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
/// Re-reads the state from persistent storage backend
|
/// Re-reads the state from persistent storage backend
|
||||||
pub async fn sync(&mut self) -> Result<()> {
|
pub async fn sync(&mut self) -> Result<()> {
|
||||||
|
let new_last_sync = Some(Instant::now());
|
||||||
|
let new_last_sync_watch_ct = self.watch.rx.borrow().clone();
|
||||||
|
|
||||||
// 1. List checkpoints
|
// 1. List checkpoints
|
||||||
let checkpoints = self.list_checkpoints().await?;
|
let checkpoints = self.list_checkpoints().await?;
|
||||||
debug!("(sync) listed checkpoints: {:?}", checkpoints);
|
debug!("(sync) listed checkpoints: {:?}", checkpoints);
|
||||||
|
@ -225,7 +228,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
// Now, apply all operations retrieved from storage after the common part
|
// Now, apply all operations retrieved from storage after the common part
|
||||||
for (ts, op) in ops.drain(i0..) {
|
for (ts, op) in ops.drain(i0..) {
|
||||||
state = state.apply(&op);
|
state = state.apply(&op);
|
||||||
if (self.history.len() + 1) % SAVE_STATE_EVERY == 0 {
|
if (self.history.len() + 1) % KEEP_STATE_EVERY == 0 {
|
||||||
self.history.push((ts, op, Some(state.clone())));
|
self.history.push((ts, op, Some(state.clone())));
|
||||||
} else {
|
} else {
|
||||||
self.history.push((ts, op, None));
|
self.history.push((ts, op, None));
|
||||||
|
@ -236,22 +239,32 @@ impl<S: BayouState> Bayou<S> {
|
||||||
self.history.last_mut().unwrap().2 = Some(state);
|
self.history.last_mut().unwrap().2 = Some(state);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.last_sync = Some(Instant::now());
|
// Save info that sync has been done
|
||||||
|
self.last_sync = new_last_sync;
|
||||||
|
self.last_sync_watch_ct = new_last_sync_watch_ct;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn check_recent_sync(&mut self) -> Result<()> {
|
/// Does a sync() if either of the two conditions is met:
|
||||||
match self.last_sync {
|
/// - last sync was more than CHECKPOINT_INTERVAL/5 ago
|
||||||
Some(t) if (Instant::now() - t) < CHECKPOINT_INTERVAL / 10 => Ok(()),
|
/// - a change was detected
|
||||||
_ => self.sync().await,
|
pub async fn opportunistic_sync(&mut self) -> Result<()> {
|
||||||
|
let too_old = match self.last_sync {
|
||||||
|
Some(t) => Instant::now() > t + (CHECKPOINT_INTERVAL / 5),
|
||||||
|
_ => true,
|
||||||
|
};
|
||||||
|
let changed = self.last_sync_watch_ct != *self.watch.rx.borrow();
|
||||||
|
if too_old || changed {
|
||||||
|
self.sync().await?;
|
||||||
}
|
}
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Applies a new operation on the state. Once this function returns,
|
/// Applies a new operation on the state. Once this function returns,
|
||||||
/// the option has been safely persisted to storage backend
|
/// the operation has been safely persisted to storage backend.
|
||||||
|
/// Make sure to call `.opportunistic_sync()` before doing this,
|
||||||
|
/// and even before calculating the `op` argument given here.
|
||||||
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
||||||
self.check_recent_sync().await?;
|
|
||||||
|
|
||||||
debug!("(push) add operation: {:?}", op);
|
debug!("(push) add operation: {:?}", op);
|
||||||
|
|
||||||
let ts = Timestamp::after(
|
let ts = Timestamp::after(
|
||||||
|
@ -276,7 +289,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Clear previously saved state in history if not required
|
// Clear previously saved state in history if not required
|
||||||
let hlen = self.history.len();
|
let hlen = self.history.len();
|
||||||
if hlen >= 2 && (hlen - 1) % SAVE_STATE_EVERY != 0 {
|
if hlen >= 2 && (hlen - 1) % KEEP_STATE_EVERY != 0 {
|
||||||
self.history[hlen - 2].2 = None;
|
self.history[hlen - 2].2 = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -288,7 +301,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
/// Save a new checkpoint if previous checkpoint is too old
|
/// Save a new checkpoint if previous checkpoint is too old
|
||||||
pub async fn checkpoint(&mut self) -> Result<()> {
|
pub async fn checkpoint(&mut self) -> Result<()> {
|
||||||
match self.last_try_checkpoint {
|
match self.last_try_checkpoint {
|
||||||
Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()),
|
Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 5 => Ok(()),
|
||||||
_ => {
|
_ => {
|
||||||
let res = self.checkpoint_internal().await;
|
let res = self.checkpoint_internal().await;
|
||||||
if res.is_ok() {
|
if res.is_ok() {
|
||||||
|
@ -300,7 +313,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn checkpoint_internal(&mut self) -> Result<()> {
|
async fn checkpoint_internal(&mut self) -> Result<()> {
|
||||||
self.check_recent_sync().await?;
|
self.sync().await?;
|
||||||
|
|
||||||
// Check what would be the possible time for a checkpoint in the history we have
|
// Check what would be the possible time for a checkpoint in the history we have
|
||||||
let now = now_msec() as i128;
|
let now = now_msec() as i128;
|
||||||
|
|
|
@ -87,7 +87,9 @@ impl<'a> ExaminedContext<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn noop(self) -> Result<(Response, flow::Transition)> {
|
pub async fn noop(self) -> Result<(Response, flow::Transition)> {
|
||||||
let updates = self.mailbox.sync_update().await?;
|
self.mailbox.mailbox.force_sync().await?;
|
||||||
|
|
||||||
|
let updates = self.mailbox.update().await?;
|
||||||
Ok((
|
Ok((
|
||||||
Response::ok("NOOP completed.")?.with_body(updates),
|
Response::ok("NOOP completed.")?.with_body(updates),
|
||||||
flow::Transition::None,
|
flow::Transition::None,
|
||||||
|
|
|
@ -68,16 +68,10 @@ impl MailboxView {
|
||||||
Ok((new_view, data))
|
Ok((new_view, data))
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Looks up state changes in the mailbox and produces a set of IMAP
|
|
||||||
/// responses describing the changes.
|
|
||||||
pub async fn sync_update(&mut self) -> Result<Vec<Body>> {
|
|
||||||
self.mailbox.sync().await?;
|
|
||||||
|
|
||||||
self.update().await
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Produces a set of IMAP responses describing the change between
|
/// Produces a set of IMAP responses describing the change between
|
||||||
/// what the client knows and what is actually in the mailbox.
|
/// what the client knows and what is actually in the mailbox.
|
||||||
|
/// This does NOT trigger a sync, it bases itself on what is currently
|
||||||
|
/// loaded in RAM by Bayou.
|
||||||
pub async fn update(&mut self) -> Result<Vec<Body>> {
|
pub async fn update(&mut self) -> Result<Vec<Body>> {
|
||||||
let new_view = MailboxView {
|
let new_view = MailboxView {
|
||||||
mailbox: self.mailbox.clone(),
|
mailbox: self.mailbox.clone(),
|
||||||
|
@ -156,6 +150,8 @@ impl MailboxView {
|
||||||
flags: &[Flag],
|
flags: &[Flag],
|
||||||
uid: &bool,
|
uid: &bool,
|
||||||
) -> Result<Vec<Body>> {
|
) -> Result<Vec<Body>> {
|
||||||
|
self.mailbox.opportunistic_sync().await?;
|
||||||
|
|
||||||
if *uid {
|
if *uid {
|
||||||
bail!("UID STORE not implemented");
|
bail!("UID STORE not implemented");
|
||||||
}
|
}
|
||||||
|
@ -181,9 +177,11 @@ impl MailboxView {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn expunge(&mut self) -> Result<Vec<Body>> {
|
pub async fn expunge(&mut self) -> Result<Vec<Body>> {
|
||||||
|
self.mailbox.opportunistic_sync().await?;
|
||||||
|
|
||||||
let deleted_flag = Flag::Deleted.to_string();
|
let deleted_flag = Flag::Deleted.to_string();
|
||||||
let msgs = self
|
let state = self.mailbox.current_uid_index().await;
|
||||||
.known_state
|
let msgs = state
|
||||||
.table
|
.table
|
||||||
.iter()
|
.iter()
|
||||||
.filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag))
|
.filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag))
|
||||||
|
|
|
@ -60,8 +60,14 @@ impl Mailbox {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Sync data with backing store
|
/// Sync data with backing store
|
||||||
pub async fn sync(&self) -> Result<()> {
|
pub async fn force_sync(&self) -> Result<()> {
|
||||||
self.mbox.write().await.sync().await
|
self.mbox.write().await.force_sync().await
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sync data with backing store only if changes are detected
|
||||||
|
/// or last sync is too old
|
||||||
|
pub async fn opportunistic_sync(&self) -> Result<()> {
|
||||||
|
self.mbox.write().await.opportunistic_sync().await
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- Functions for reading the mailbox ----
|
// ---- Functions for reading the mailbox ----
|
||||||
|
@ -184,11 +190,16 @@ struct MailboxInternal {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MailboxInternal {
|
impl MailboxInternal {
|
||||||
async fn sync(&mut self) -> Result<()> {
|
async fn force_sync(&mut self) -> Result<()> {
|
||||||
self.uid_index.sync().await?;
|
self.uid_index.sync().await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn opportunistic_sync(&mut self) -> Result<()> {
|
||||||
|
self.uid_index.opportunistic_sync().await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// ---- Functions for reading the mailbox ----
|
// ---- Functions for reading the mailbox ----
|
||||||
|
|
||||||
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
||||||
|
@ -308,7 +319,8 @@ impl MailboxInternal {
|
||||||
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
|
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
|
||||||
.await?;
|
.await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
}
|
},
|
||||||
|
self.uid_index.opportunistic_sync()
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Add mail to Bayou mail index
|
// Add mail to Bayou mail index
|
||||||
|
@ -357,7 +369,8 @@ impl MailboxInternal {
|
||||||
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
|
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
|
||||||
.await?;
|
.await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
}
|
},
|
||||||
|
self.uid_index.opportunistic_sync()
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Add mail to Bayou mail index
|
// Add mail to Bayou mail index
|
||||||
|
@ -449,6 +462,7 @@ impl MailboxInternal {
|
||||||
.await?;
|
.await?;
|
||||||
Ok::<_, anyhow::Error>(())
|
Ok::<_, anyhow::Error>(())
|
||||||
},
|
},
|
||||||
|
self.uid_index.opportunistic_sync(),
|
||||||
)?;
|
)?;
|
||||||
|
|
||||||
// Add mail to Bayou mail index
|
// Add mail to Bayou mail index
|
||||||
|
|
Loading…
Reference in a new issue