sync() and push() seems to be working, todo checkpoint()
This commit is contained in:
parent
cfc02ba368
commit
c8be884ad5
6 changed files with 294 additions and 28 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
@ -1,2 +1,3 @@
|
|||
/target
|
||||
.vimrc
|
||||
test.sh
|
||||
|
|
17
Cargo.lock
generated
17
Cargo.lock
generated
|
@ -178,6 +178,12 @@ dependencies = [
|
|||
"signature",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "either"
|
||||
version = "1.6.1"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "e78d4f1cc4ae33bbfc157ed5d5a5ef3bc29227303d595861deb238fcec4e9457"
|
||||
|
||||
[[package]]
|
||||
name = "fastrand"
|
||||
version = "1.7.0"
|
||||
|
@ -472,6 +478,15 @@ dependencies = [
|
|||
"cfg-if",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itertools"
|
||||
version = "0.10.3"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "a9a9d19fa1e79b6215ff29b9d6880b706147f16e9b1dbb1e4e5947b5b02bc5e3"
|
||||
dependencies = [
|
||||
"either",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "itoa"
|
||||
version = "1.0.2"
|
||||
|
@ -541,8 +556,10 @@ name = "mailrage"
|
|||
version = "0.0.1"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64",
|
||||
"hex",
|
||||
"im",
|
||||
"itertools",
|
||||
"k2v-client",
|
||||
"rand",
|
||||
"rmp-serde",
|
||||
|
|
|
@ -8,8 +8,10 @@ description = "Encrypted mail storage over Garage"
|
|||
|
||||
[dependencies]
|
||||
anyhow = "1.0.28"
|
||||
base64 = "0.13"
|
||||
hex = "0.4"
|
||||
im = "15"
|
||||
itertools = "0.10"
|
||||
rusoto_core = "0.48.0"
|
||||
rusoto_credential = "0.48.0"
|
||||
rusoto_s3 = "0.48"
|
||||
|
|
236
src/bayou.rs
236
src/bayou.rs
|
@ -1,22 +1,31 @@
|
|||
use std::time::Duration;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{anyhow, bail, Result};
|
||||
use rand::prelude::*;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::io::AsyncReadExt;
|
||||
|
||||
use k2v_client::K2vClient;
|
||||
use k2v_client::{BatchReadOp, Filter, K2vClient, K2vValue};
|
||||
use rusoto_core::HttpClient;
|
||||
use rusoto_credential::{AwsCredentials, StaticProvider};
|
||||
use rusoto_s3::S3Client;
|
||||
use rusoto_s3::{GetObjectRequest, ListObjectsV2Request, S3Client, S3};
|
||||
use rusoto_signature::Region;
|
||||
|
||||
use crate::cryptoblob::Key;
|
||||
use crate::cryptoblob::*;
|
||||
use crate::time::now_msec;
|
||||
|
||||
const SAVE_STATE_EVERY: usize = 64;
|
||||
|
||||
// Checkpointing interval constants: a checkpoint is not made earlier
|
||||
// than CHECKPOINT_INTERVAL time after the last one, and is not made
|
||||
// if there are less than CHECKPOINT_MIN_OPS new operations since last one.
|
||||
const CHECKPOINT_INTERVAL: Duration = Duration::from_secs(3600);
|
||||
const CHECKPOINT_MIN_OPS: usize = 16;
|
||||
|
||||
pub trait BayouState:
|
||||
Default + Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static
|
||||
{
|
||||
type Op: Clone + Serialize + for<'de> Deserialize<'de> + Send + Sync + 'static;
|
||||
type Op: Clone + Serialize + for<'de> Deserialize<'de> + std::fmt::Debug + Send + Sync + 'static;
|
||||
|
||||
fn apply(&self, op: &Self::Op) -> Self;
|
||||
}
|
||||
|
@ -31,6 +40,7 @@ pub struct Bayou<S: BayouState> {
|
|||
|
||||
checkpoint: (Timestamp, S),
|
||||
history: Vec<(Timestamp, S::Op, Option<S>)>,
|
||||
last_sync: Option<Instant>,
|
||||
}
|
||||
|
||||
impl<S: BayouState> Bayou<S> {
|
||||
|
@ -59,23 +69,231 @@ impl<S: BayouState> Bayou<S> {
|
|||
s3: s3_client,
|
||||
checkpoint: (Timestamp::zero(), S::default()),
|
||||
history: vec![],
|
||||
last_sync: None,
|
||||
})
|
||||
}
|
||||
|
||||
/// Re-reads the state from persistent storage backend
|
||||
pub async fn sync(&mut self) -> Result<()> {
|
||||
// 1. List checkpoints
|
||||
let prefix = format!("{}/checkpoint/", self.path);
|
||||
|
||||
let mut lor = ListObjectsV2Request::default();
|
||||
lor.bucket = self.bucket.clone();
|
||||
lor.max_keys = Some(1000);
|
||||
lor.prefix = Some(prefix.clone());
|
||||
|
||||
let checkpoints_res = self.s3.list_objects_v2(lor).await?;
|
||||
|
||||
let mut checkpoints = vec![];
|
||||
for object in checkpoints_res.contents.unwrap_or_default() {
|
||||
if let Some(key) = object.key {
|
||||
if let Some(ckid) = key.strip_prefix(&prefix) {
|
||||
if let Some(ts) = Timestamp::parse(ckid) {
|
||||
checkpoints.push((ts, key));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
checkpoints.sort_by_key(|(ts, _)| *ts);
|
||||
eprintln!("(sync) listed checkpoints: {:?}", checkpoints);
|
||||
|
||||
// 2. Load last checkpoint if different from currently used one
|
||||
let checkpoint = if let Some((ts, key)) = checkpoints.last() {
|
||||
if *ts == self.checkpoint.0 {
|
||||
(*ts, None)
|
||||
} else {
|
||||
eprintln!("(sync) loading checkpoint: {}", key);
|
||||
|
||||
let mut gor = GetObjectRequest::default();
|
||||
gor.bucket = self.bucket.clone();
|
||||
gor.key = key.to_string();
|
||||
let obj_res = self.s3.get_object(gor).await?;
|
||||
|
||||
let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
|
||||
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
|
||||
obj_body.into_async_read().read_to_end(&mut buf).await?;
|
||||
|
||||
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
||||
(*ts, Some(ck))
|
||||
}
|
||||
} else {
|
||||
(Timestamp::zero(), None)
|
||||
};
|
||||
|
||||
if self.checkpoint.0 > checkpoint.0 {
|
||||
bail!("Existing checkpoint is more recent than stored one");
|
||||
}
|
||||
|
||||
if let Some(ck) = checkpoint.1 {
|
||||
eprintln!(
|
||||
"(sync) updating checkpoint to loaded state at {:?}",
|
||||
checkpoint.0
|
||||
);
|
||||
self.checkpoint = (checkpoint.0, ck);
|
||||
};
|
||||
|
||||
// remove from history events before checkpoint
|
||||
self.history = std::mem::take(&mut self.history)
|
||||
.into_iter()
|
||||
.skip_while(|(ts, _, _)| *ts < self.checkpoint.0)
|
||||
.collect();
|
||||
|
||||
// 3. List all operations starting from checkpoint
|
||||
let ts_ser = self.checkpoint.0.serialize();
|
||||
eprintln!("(sync) looking up operations starting at {}", ts_ser);
|
||||
let ops_map = self
|
||||
.k2v
|
||||
.read_batch(&[BatchReadOp {
|
||||
partition_key: &self.path,
|
||||
filter: Filter {
|
||||
start: Some(&ts_ser),
|
||||
end: None,
|
||||
prefix: None,
|
||||
limit: None,
|
||||
reverse: false,
|
||||
},
|
||||
single_item: false,
|
||||
conflicts_only: false,
|
||||
include_tombstones: false,
|
||||
}])
|
||||
.await?
|
||||
.into_iter()
|
||||
.next()
|
||||
.ok_or(anyhow!("Missing K2V result"))?
|
||||
.items;
|
||||
|
||||
let mut ops = vec![];
|
||||
for (tsstr, val) in ops_map {
|
||||
let ts = Timestamp::parse(&tsstr)
|
||||
.ok_or(anyhow!("Invalid operation timestamp: {}", tsstr))?;
|
||||
if val.value.len() != 1 {
|
||||
bail!("Invalid operation, has {} values", val.value.len());
|
||||
}
|
||||
match &val.value[0] {
|
||||
K2vValue::Value(v) => {
|
||||
let op = open_deserialize::<S::Op>(&v, &self.key)?;
|
||||
eprintln!("(sync) operation {}: {} {:?}", tsstr, base64::encode(v), op);
|
||||
ops.push((ts, op));
|
||||
}
|
||||
K2vValue::Tombstone => {
|
||||
unreachable!();
|
||||
}
|
||||
}
|
||||
}
|
||||
ops.sort_by_key(|(ts, _)| *ts);
|
||||
eprintln!("(sync) {} operations", ops.len());
|
||||
|
||||
// if no operations, clean up and return now
|
||||
if ops.is_empty() {
|
||||
self.history.clear();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// 4. Check that first operation has same timestamp as checkpoint (if not zero)
|
||||
if self.checkpoint.0 != Timestamp::zero() && ops[0].0 != self.checkpoint.0 {
|
||||
bail!(
|
||||
"First operation in listing doesn't have timestamp that corresponds to checkpoint"
|
||||
);
|
||||
}
|
||||
|
||||
// 5. Apply all operations in order
|
||||
unimplemented!()
|
||||
// Hypothesis: before the loaded checkpoint, operations haven't changed
|
||||
// between what's on storage and what we used to calculate the state in RAM here.
|
||||
let i0 = self
|
||||
.history
|
||||
.iter()
|
||||
.enumerate()
|
||||
.zip(ops.iter())
|
||||
.skip_while(|((i, (ts1, _, _)), (ts2, _))| ts1 == ts2)
|
||||
.map(|((i, _), _)| i)
|
||||
.next()
|
||||
.unwrap_or(self.history.len());
|
||||
|
||||
if ops.len() > i0 {
|
||||
// Remove operations from first position where histories differ
|
||||
self.history.truncate(i0);
|
||||
|
||||
// Look up last calculated state which we have saved and start from there.
|
||||
let mut last_state = (0, &self.checkpoint.1);
|
||||
for (i, (_, _, state_opt)) in self.history.iter().enumerate().rev() {
|
||||
if let Some(state) = state_opt {
|
||||
last_state = (i + 1, state);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate state at the end of this common part of the history
|
||||
let mut state = last_state.1.clone();
|
||||
for (_, op, _) in self.history[last_state.0..].iter() {
|
||||
state = state.apply(op);
|
||||
}
|
||||
|
||||
// Now, apply all operations retrieved from storage after the common part
|
||||
for (ts, op) in ops.drain(i0..) {
|
||||
state = state.apply(&op);
|
||||
if (self.history.len() + 1) % SAVE_STATE_EVERY == 0 {
|
||||
self.history.push((ts, op, Some(state.clone())));
|
||||
} else {
|
||||
self.history.push((ts, op, None));
|
||||
}
|
||||
}
|
||||
|
||||
// Always save final state as result of last operation
|
||||
self.history.last_mut().unwrap().2 = Some(state);
|
||||
}
|
||||
|
||||
self.last_sync = Some(Instant::now());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn check_recent_sync(&mut self) -> Result<()> {
|
||||
match self.last_sync {
|
||||
Some(t) if (Instant::now() - t) < CHECKPOINT_INTERVAL / 10 => Ok(()),
|
||||
_ => self.sync().await,
|
||||
}
|
||||
}
|
||||
|
||||
/// Applies a new operation on the state. Once this function returns,
|
||||
/// the option has been safely persisted to storage backend
|
||||
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
||||
unimplemented!()
|
||||
self.check_recent_sync().await?;
|
||||
|
||||
let ts = Timestamp::after(
|
||||
self.history
|
||||
.last()
|
||||
.map(|(ts, _, _)| ts)
|
||||
.unwrap_or(&self.checkpoint.0),
|
||||
);
|
||||
self.k2v
|
||||
.insert_item(
|
||||
&self.path,
|
||||
&ts.serialize(),
|
||||
seal_serialize(&op, &self.key)?,
|
||||
None,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let new_state = self.state().apply(&op);
|
||||
self.history.push((ts, op, Some(new_state)));
|
||||
|
||||
// Clear previously saved state in history if not required
|
||||
let hlen = self.history.len();
|
||||
if hlen >= 2 && (hlen - 1) % SAVE_STATE_EVERY != 0 {
|
||||
self.history[hlen - 2].2 = None;
|
||||
}
|
||||
|
||||
self.checkpoint().await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Save a new checkpoint if previous checkpoint is too old
|
||||
pub async fn checkpoint(&mut self) -> Result<()> {
|
||||
self.check_recent_sync().await?;
|
||||
|
||||
eprintln!("Mock checkpointing, not implemented");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn state(&self) -> &S {
|
||||
|
@ -87,7 +305,7 @@ impl<S: BayouState> Bayou<S> {
|
|||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
|
||||
#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Debug)]
|
||||
pub struct Timestamp {
|
||||
pub msec: u64,
|
||||
pub rand: u64,
|
||||
|
|
50
src/main.rs
50
src/main.rs
|
@ -1,13 +1,14 @@
|
|||
use anyhow::Result;
|
||||
|
||||
use rusoto_credential::{EnvironmentProvider, ProvideAwsCredentials};
|
||||
use rusoto_signature::Region;
|
||||
|
||||
mod bayou;
|
||||
mod cryptoblob;
|
||||
mod time;
|
||||
mod uidindex;
|
||||
|
||||
use anyhow::Result;
|
||||
|
||||
use rand::prelude::*;
|
||||
use rusoto_credential::{EnvironmentProvider, ProvideAwsCredentials};
|
||||
use rusoto_signature::Region;
|
||||
|
||||
use bayou::*;
|
||||
use cryptoblob::Key;
|
||||
use uidindex::*;
|
||||
|
@ -32,21 +33,48 @@ async fn do_stuff() -> Result<()> {
|
|||
|
||||
let key = Key::from_slice(&[0u8; 32]).unwrap();
|
||||
|
||||
let mut mail_index = Bayou::<UidIndex>::new(
|
||||
let mut uid_index = Bayou::<UidIndex>::new(
|
||||
creds,
|
||||
k2v_region,
|
||||
s3_region,
|
||||
"alex".into(),
|
||||
"mail".into(),
|
||||
"TestMailbox".into(),
|
||||
key,
|
||||
)?;
|
||||
|
||||
mail_index.sync().await?;
|
||||
uid_index.sync().await?;
|
||||
|
||||
let add_mail_op = mail_index
|
||||
dump(&uid_index);
|
||||
|
||||
let mut rand_id = [0u8; 24];
|
||||
rand_id[..8].copy_from_slice(&u64::to_be_bytes(thread_rng().gen()));
|
||||
let add_mail_op = uid_index
|
||||
.state()
|
||||
.op_mail_add(MailUuid([0xFFu8; 24]), vec!["\\Unseen".into()]);
|
||||
mail_index.push(add_mail_op).await?;
|
||||
.op_mail_add(MailUuid(rand_id), vec!["\\Unseen".into()]);
|
||||
uid_index.push(add_mail_op).await?;
|
||||
|
||||
dump(&uid_index);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn dump(uid_index: &Bayou<UidIndex>) {
|
||||
let s = uid_index.state();
|
||||
println!("---- MAILBOX STATE ----");
|
||||
println!("UIDVALIDITY {}", s.uidvalidity);
|
||||
println!("UIDNEXT {}", s.uidnext);
|
||||
println!("INTERNALSEQ {}", s.internalseq);
|
||||
for (uid, uuid) in s.mails_by_uid.iter() {
|
||||
println!(
|
||||
"{} {} {}",
|
||||
uid,
|
||||
hex::encode(uuid.0),
|
||||
s.mail_flags
|
||||
.get(uuid)
|
||||
.cloned()
|
||||
.unwrap_or_default()
|
||||
.join(", ")
|
||||
);
|
||||
}
|
||||
println!("");
|
||||
}
|
||||
|
|
|
@ -9,22 +9,22 @@ type ImapUidvalidity = u32;
|
|||
/// A Mail UUID is composed of two components:
|
||||
/// - a process identifier, 128 bits
|
||||
/// - a sequence number, 64 bits
|
||||
#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq)]
|
||||
#[derive(Clone, Copy, PartialOrd, Ord, PartialEq, Eq, Debug)]
|
||||
pub struct MailUuid(pub [u8; 24]);
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct UidIndex {
|
||||
mail_uid: OrdMap<MailUuid, ImapUid>,
|
||||
mail_flags: OrdMap<MailUuid, Vec<String>>,
|
||||
pub mail_uid: OrdMap<MailUuid, ImapUid>,
|
||||
pub mail_flags: OrdMap<MailUuid, Vec<String>>,
|
||||
|
||||
mails_by_uid: OrdMap<ImapUid, MailUuid>,
|
||||
pub mails_by_uid: OrdMap<ImapUid, MailUuid>,
|
||||
|
||||
uidvalidity: ImapUidvalidity,
|
||||
uidnext: ImapUid,
|
||||
internalseq: ImapUid,
|
||||
pub uidvalidity: ImapUidvalidity,
|
||||
pub uidnext: ImapUid,
|
||||
pub internalseq: ImapUid,
|
||||
}
|
||||
|
||||
#[derive(Clone, Serialize, Deserialize)]
|
||||
#[derive(Clone, Serialize, Deserialize, Debug)]
|
||||
pub enum UidIndexOp {
|
||||
MailAdd(MailUuid, ImapUid, Vec<String>),
|
||||
MailDel(MailUuid),
|
||||
|
|
Loading…
Add table
Reference in a new issue