Don't try to checkpoint as often
This commit is contained in:
parent
f3cddcb485
commit
aeec77ae44
2 changed files with 37 additions and 1 deletions
20
src/bayou.rs
20
src/bayou.rs
|
@ -53,6 +53,7 @@ pub struct Bayou<S: BayouState> {
|
||||||
checkpoint: (Timestamp, S),
|
checkpoint: (Timestamp, S),
|
||||||
history: Vec<(Timestamp, S::Op, Option<S>)>,
|
history: Vec<(Timestamp, S::Op, Option<S>)>,
|
||||||
last_sync: Option<Instant>,
|
last_sync: Option<Instant>,
|
||||||
|
last_try_checkpoint: Option<Instant>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: BayouState> Bayou<S> {
|
impl<S: BayouState> Bayou<S> {
|
||||||
|
@ -82,6 +83,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
checkpoint: (Timestamp::zero(), S::default()),
|
checkpoint: (Timestamp::zero(), S::default()),
|
||||||
history: vec![],
|
history: vec![],
|
||||||
last_sync: None,
|
last_sync: None,
|
||||||
|
last_try_checkpoint: None,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -107,6 +109,8 @@ impl<S: BayouState> Bayou<S> {
|
||||||
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
|
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
|
||||||
obj_body.into_async_read().read_to_end(&mut buf).await?;
|
obj_body.into_async_read().read_to_end(&mut buf).await?;
|
||||||
|
|
||||||
|
eprintln!("(sync) checkpoint body length: {}", buf.len());
|
||||||
|
|
||||||
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
||||||
(*ts, Some(ck))
|
(*ts, Some(ck))
|
||||||
}
|
}
|
||||||
|
@ -250,6 +254,8 @@ impl<S: BayouState> Bayou<S> {
|
||||||
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
||||||
self.check_recent_sync().await?;
|
self.check_recent_sync().await?;
|
||||||
|
|
||||||
|
eprintln!("(push) add operation: {:?}", op);
|
||||||
|
|
||||||
let ts = Timestamp::after(
|
let ts = Timestamp::after(
|
||||||
self.history
|
self.history
|
||||||
.last()
|
.last()
|
||||||
|
@ -281,6 +287,19 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
/// Save a new checkpoint if previous checkpoint is too old
|
/// Save a new checkpoint if previous checkpoint is too old
|
||||||
pub async fn checkpoint(&mut self) -> Result<()> {
|
pub async fn checkpoint(&mut self) -> Result<()> {
|
||||||
|
match self.last_try_checkpoint {
|
||||||
|
Some(ts) if Instant::now() - ts < CHECKPOINT_INTERVAL / 10 => Ok(()),
|
||||||
|
_ => {
|
||||||
|
let res = self.checkpoint_internal().await;
|
||||||
|
if res.is_ok() {
|
||||||
|
self.last_try_checkpoint = Some(Instant::now());
|
||||||
|
}
|
||||||
|
res
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn checkpoint_internal(&mut self) -> Result<()> {
|
||||||
self.check_recent_sync().await?;
|
self.check_recent_sync().await?;
|
||||||
|
|
||||||
// Check what would be the possible time for a checkpoint in the history we have
|
// Check what would be the possible time for a checkpoint in the history we have
|
||||||
|
@ -347,6 +366,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Serialize and save checkpoint
|
// Serialize and save checkpoint
|
||||||
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
|
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
|
||||||
|
eprintln!("(cp) checkpoint body length: {}", cryptoblob.len());
|
||||||
|
|
||||||
let mut por = PutObjectRequest::default();
|
let mut por = PutObjectRequest::default();
|
||||||
por.bucket = self.bucket.clone();
|
por.bucket = self.bucket.clone();
|
||||||
|
|
18
src/main.rs
18
src/main.rs
|
@ -47,7 +47,7 @@ async fn do_stuff() -> Result<()> {
|
||||||
dump(&uid_index);
|
dump(&uid_index);
|
||||||
|
|
||||||
let mut rand_id = [0u8; 24];
|
let mut rand_id = [0u8; 24];
|
||||||
rand_id[..8].copy_from_slice(&u64::to_be_bytes(thread_rng().gen()));
|
rand_id[..16].copy_from_slice(&u128::to_be_bytes(thread_rng().gen()));
|
||||||
let add_mail_op = uid_index
|
let add_mail_op = uid_index
|
||||||
.state()
|
.state()
|
||||||
.op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]);
|
.op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]);
|
||||||
|
@ -55,6 +55,22 @@ async fn do_stuff() -> Result<()> {
|
||||||
|
|
||||||
dump(&uid_index);
|
dump(&uid_index);
|
||||||
|
|
||||||
|
if uid_index.state().mails_by_uid.len() > 6 {
|
||||||
|
for i in 0..2 {
|
||||||
|
let (_, uuid) = uid_index
|
||||||
|
.state()
|
||||||
|
.mails_by_uid
|
||||||
|
.iter()
|
||||||
|
.skip(3 + i)
|
||||||
|
.next()
|
||||||
|
.unwrap();
|
||||||
|
let del_mail_op = uid_index.state().op_mail_del(*uuid);
|
||||||
|
uid_index.push(del_mail_op).await?;
|
||||||
|
|
||||||
|
dump(&uid_index);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue