CONDSTORE #71

Merged
quentin merged 21 commits from feat/condstore-try-2 into main 2024-01-15 07:07:07 +00:00
3 changed files with 115 additions and 40 deletions
Showing only changes of commit a2d6efc962 - Show all commits

View file

@ -1,8 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody}; use imap_codec::imap_types::command::{Command, CommandBody, StoreModifier};
use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::core::{Charset, Atom};
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
@ -56,8 +56,9 @@ pub async fn dispatch<'a>(
kind, kind,
response, response,
flags, flags,
modifiers,
uid, uid,
} => ctx.store(sequence_set, kind, response, flags, uid).await, } => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await,
CommandBody::Copy { CommandBody::Copy {
sequence_set, sequence_set,
mailbox, mailbox,
@ -185,8 +186,10 @@ impl<'a> SelectedContext<'a> {
kind: &StoreType, kind: &StoreType,
response: &StoreResponse, response: &StoreResponse,
flags: &[Flag<'a>], flags: &[Flag<'a>],
modifiers: &[(Atom<'a>, StoreModifier<'a>)],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
tracing::info!(modifiers=?modifiers);
let data = self let data = self
.mailbox .mailbox
.store(sequence_set, kind, response, flags, uid) .store(sequence_set, kind, response, flags, uid)

View file

@ -113,7 +113,7 @@ impl Mailbox {
msg: IMF<'a>, msg: IMF<'a>,
ident: Option<UniqueIdent>, ident: Option<UniqueIdent>,
flags: &[Flag], flags: &[Flag],
) -> Result<(ImapUidvalidity, ImapUid)> { ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
self.mbox.write().await.append(msg, ident, flags).await self.mbox.write().await.append(msg, ident, flags).await
} }
@ -271,7 +271,7 @@ impl MailboxInternal {
mail: IMF<'_>, mail: IMF<'_>,
ident: Option<UniqueIdent>, ident: Option<UniqueIdent>,
flags: &[Flag], flags: &[Flag],
) -> Result<(ImapUidvalidity, ImapUid)> { ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
let ident = ident.unwrap_or_else(gen_ident); let ident = ident.unwrap_or_else(gen_ident);
let message_key = gen_key(); let message_key = gen_key();
@ -312,14 +312,14 @@ impl MailboxInternal {
let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec()); let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
let uidvalidity = uid_state.uidvalidity; let uidvalidity = uid_state.uidvalidity;
let uid = match add_mail_op { let (uid, modseq) = match add_mail_op {
UidIndexOp::MailAdd(_, uid, _) => uid, UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq),
_ => unreachable!(), _ => unreachable!(),
}; };
self.uid_index.push(add_mail_op).await?; self.uid_index.push(add_mail_op).await?;
Ok((uidvalidity, uid)) Ok((uidvalidity, uid, modseq))
} }
async fn append_from_s3<'a>( async fn append_from_s3<'a>(
@ -432,7 +432,7 @@ impl MailboxInternal {
.table .table
.get(&source_id) .get(&source_id)
.ok_or(anyhow!("Source mail not found"))? .ok_or(anyhow!("Source mail not found"))?
.1 .2
.clone(); .clone();
futures::try_join!( futures::try_join!(
@ -476,7 +476,7 @@ fn dump(uid_index: &Bayou<UidIndex>) {
"{} {} {}", "{} {} {}",
uid, uid,
hex::encode(ident.0), hex::encode(ident.0),
s.table.get(ident).cloned().unwrap().1.join(", ") s.table.get(ident).cloned().unwrap().2.join(", ")
); );
} }
println!(); println!();

View file

@ -6,10 +6,11 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::bayou::*; use crate::bayou::*;
use crate::mail::unique_ident::UniqueIdent; use crate::mail::unique_ident::UniqueIdent;
pub type ModSeq = NonZeroU32;
pub type ImapUid = NonZeroU32; pub type ImapUid = NonZeroU32;
pub type ImapUidvalidity = NonZeroU32; pub type ImapUidvalidity = NonZeroU32;
pub type Flag = String; pub type Flag = String;
pub type IndexEntry = (ImapUid, Vec<Flag>); pub type IndexEntry = (ImapUid, ModSeq, Vec<Flag>);
/// A UidIndex handles the mutable part of a mailbox /// A UidIndex handles the mutable part of a mailbox
/// It is built by running the event log on it /// It is built by running the event log on it
@ -23,28 +24,33 @@ pub struct UidIndex {
// Indexes optimized for queries // Indexes optimized for queries
pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>, pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>,
pub idx_by_modseq: OrdMap<ModSeq, UniqueIdent>,
pub idx_by_flag: FlagIndex, pub idx_by_flag: FlagIndex,
// Counters // "Public" Counters
pub uidvalidity: ImapUidvalidity, pub uidvalidity: ImapUidvalidity,
pub uidnext: ImapUid, pub uidnext: ImapUid,
pub highestmodseq: ModSeq,
// "Internal" Counters
pub internalseq: ImapUid, pub internalseq: ImapUid,
pub internalmodseq: ModSeq,
} }
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
pub enum UidIndexOp { pub enum UidIndexOp {
MailAdd(UniqueIdent, ImapUid, Vec<Flag>), MailAdd(UniqueIdent, ImapUid, ModSeq, Vec<Flag>),
MailDel(UniqueIdent), MailDel(UniqueIdent),
FlagAdd(UniqueIdent, Vec<Flag>), FlagAdd(UniqueIdent, ModSeq, Vec<Flag>),
FlagDel(UniqueIdent, Vec<Flag>), FlagDel(UniqueIdent, ModSeq, Vec<Flag>),
FlagSet(UniqueIdent, Vec<Flag>), FlagSet(UniqueIdent, ModSeq, Vec<Flag>),
BumpUidvalidity(u32), BumpUidvalidity(u32),
} }
impl UidIndex { impl UidIndex {
#[must_use] #[must_use]
pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::MailAdd(ident, self.internalseq, flags) UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
@ -54,17 +60,17 @@ impl UidIndex {
#[must_use] #[must_use]
pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::FlagAdd(ident, flags) UidIndexOp::FlagAdd(ident, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::FlagDel(ident, flags) UidIndexOp::FlagDel(ident, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::FlagSet(ident, flags) UidIndexOp::FlagSet(ident, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
@ -74,18 +80,19 @@ impl UidIndex {
// INTERNAL functions to keep state consistent // INTERNAL functions to keep state consistent
fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, flags: &[Flag]) { fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) {
// Insert the email in our table // Insert the email in our table
self.table.insert(ident, (uid, flags.to_owned())); self.table.insert(ident, (uid, modseq, flags.to_owned()));
// Update the indexes/caches // Update the indexes/caches
self.idx_by_uid.insert(uid, ident); self.idx_by_uid.insert(uid, ident);
self.idx_by_flag.insert(uid, flags); self.idx_by_flag.insert(uid, flags);
self.idx_by_modseq.insert(modseq, ident);
} }
fn unreg_email(&mut self, ident: &UniqueIdent) { fn unreg_email(&mut self, ident: &UniqueIdent) {
// We do nothing if the mail does not exist // We do nothing if the mail does not exist
let (uid, flags) = match self.table.get(ident) { let (uid, modseq, flags) = match self.table.get(ident) {
Some(v) => v, Some(v) => v,
None => return, None => return,
}; };
@ -93,6 +100,7 @@ impl UidIndex {
// Delete all cache entries // Delete all cache entries
self.idx_by_uid.remove(uid); self.idx_by_uid.remove(uid);
self.idx_by_flag.remove(*uid, flags); self.idx_by_flag.remove(*uid, flags);
self.idx_by_modseq.remove(modseq);
// Remove from source of trust // Remove from source of trust
self.table.remove(ident); self.table.remove(ident);
@ -103,11 +111,17 @@ impl Default for UidIndex {
fn default() -> Self { fn default() -> Self {
Self { Self {
table: OrdMap::new(), table: OrdMap::new(),
idx_by_uid: OrdMap::new(), idx_by_uid: OrdMap::new(),
idx_by_modseq: OrdMap::new(),
idx_by_flag: FlagIndex::new(), idx_by_flag: FlagIndex::new(),
uidvalidity: NonZeroU32::new(1).unwrap(), uidvalidity: NonZeroU32::new(1).unwrap(),
uidnext: NonZeroU32::new(1).unwrap(), uidnext: NonZeroU32::new(1).unwrap(),
highestmodseq: NonZeroU32::new(1).unwrap(),
internalseq: NonZeroU32::new(1).unwrap(), internalseq: NonZeroU32::new(1).unwrap(),
internalmodseq: NonZeroU32::new(1).unwrap(),
} }
} }
} }
@ -118,17 +132,24 @@ impl BayouState for UidIndex {
fn apply(&self, op: &UidIndexOp) -> Self { fn apply(&self, op: &UidIndexOp) -> Self {
let mut new = self.clone(); let mut new = self.clone();
match op { match op {
UidIndexOp::MailAdd(ident, uid, flags) => { UidIndexOp::MailAdd(ident, uid, modseq, flags) => {
// Change UIDValidity if there is a conflict // Change UIDValidity if there is a UID conflict or a MODSEQ conflict
if *uid < new.internalseq { // @FIXME Need to prove that summing works
// The intuition: we increase the UIDValidity by the number of possible conflicts
if *uid < new.internalseq || *modseq < new.highestmodseq {
let bump_uid = new.internalseq.get() - uid.get();
let bump_modseq = new.internalmodseq.get() - modseq.get();
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + new.internalseq.get() - uid.get()) NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq)
.unwrap(); .unwrap();
} }
// Assign the real uid of the email // Assign the real uid of the email
let new_uid = new.internalseq; let new_uid = new.internalseq;
// Assign the real modseq of the email and its new flags
let new_modseq = new.highestmodseq;
// Delete the previous entry if any. // Delete the previous entry if any.
// Our proof has no assumption on `ident` uniqueness, // Our proof has no assumption on `ident` uniqueness,
// so we must handle this case even if it is very unlikely // so we must handle this case even if it is very unlikely
@ -137,10 +158,14 @@ impl BayouState for UidIndex {
new.unreg_email(ident); new.unreg_email(ident);
// We record our email and update our caches // We record our email and update our caches
new.reg_email(*ident, new_uid, flags); new.reg_email(*ident, new_uid, new_modseq, flags);
// Update counters // Update counters
new.highestmodseq = new.internalmodseq;
new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap();
new.uidnext = new.internalseq; new.uidnext = new.internalseq;
} }
UidIndexOp::MailDel(ident) => { UidIndexOp::MailDel(ident) => {
@ -150,8 +175,16 @@ impl BayouState for UidIndex {
// We update the counter // We update the counter
new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
} }
UidIndexOp::FlagAdd(ident, new_flags) => { UidIndexOp::FlagAdd(ident, modseq, new_flags) => {
if let Some((uid, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required
if *modseq < new.highestmodseq {
let bump_modseq = new.internalmodseq.get() - modseq.get();
new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
.unwrap();
}
// Add flags to the source of trust and the cache // Add flags to the source of trust and the cache
let mut to_add: Vec<Flag> = new_flags let mut to_add: Vec<Flag> = new_flags
.iter() .iter()
@ -159,18 +192,38 @@ impl BayouState for UidIndex {
.cloned() .cloned()
.collect(); .collect();
new.idx_by_flag.insert(*uid, &to_add); new.idx_by_flag.insert(*uid, &to_add);
new.idx_by_modseq.insert(*modseq, *ident);
existing_flags.append(&mut to_add); existing_flags.append(&mut to_add);
// Update counters
new.highestmodseq = new.internalmodseq;
new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap();
} }
} }
UidIndexOp::FlagDel(ident, rm_flags) => { UidIndexOp::FlagDel(ident, modseq, rm_flags) => {
if let Some((uid, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required
if *modseq < new.highestmodseq {
let bump_modseq = new.internalmodseq.get() - modseq.get();
new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
.unwrap();
}
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
existing_flags.retain(|x| !rm_flags.contains(x)); existing_flags.retain(|x| !rm_flags.contains(x));
new.idx_by_flag.remove(*uid, rm_flags); new.idx_by_flag.remove(*uid, rm_flags);
// Register that email has been modified
new.idx_by_modseq.insert(*modseq, *ident);
// Update counters
new.highestmodseq = new.internalmodseq;
new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap();
} }
} }
UidIndexOp::FlagSet(ident, new_flags) => { UidIndexOp::FlagSet(ident, modseq, new_flags) => {
if let Some((uid, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, modseq, existing_flags)) = new.table.get_mut(ident) {
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags
.iter() .iter()
@ -185,6 +238,13 @@ impl BayouState for UidIndex {
existing_flags.append(&mut to_add); existing_flags.append(&mut to_add);
new.idx_by_flag.remove(*uid, &rm_flags); new.idx_by_flag.remove(*uid, &rm_flags);
new.idx_by_flag.insert(*uid, &to_add); new.idx_by_flag.insert(*uid, &to_add);
// Register that email has been modified
new.idx_by_modseq.insert(*modseq, *ident);
// Update counters
new.highestmodseq = new.internalmodseq;
new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap();
} }
} }
UidIndexOp::BumpUidvalidity(count) => { UidIndexOp::BumpUidvalidity(count) => {
@ -238,10 +298,14 @@ impl FlagIndex {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct UidIndexSerializedRepr { struct UidIndexSerializedRepr {
mails: Vec<(ImapUid, UniqueIdent, Vec<Flag>)>, mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec<Flag>)>,
uidvalidity: ImapUidvalidity, uidvalidity: ImapUidvalidity,
uidnext: ImapUid, uidnext: ImapUid,
highestmodseq: ModSeq,
internalseq: ImapUid, internalseq: ImapUid,
internalmodseq: ModSeq,
} }
impl<'de> Deserialize<'de> for UidIndex { impl<'de> Deserialize<'de> for UidIndex {
@ -253,16 +317,22 @@ impl<'de> Deserialize<'de> for UidIndex {
let mut uidindex = UidIndex { let mut uidindex = UidIndex {
table: OrdMap::new(), table: OrdMap::new(),
idx_by_uid: OrdMap::new(), idx_by_uid: OrdMap::new(),
idx_by_modseq: OrdMap::new(),
idx_by_flag: FlagIndex::new(), idx_by_flag: FlagIndex::new(),
uidvalidity: val.uidvalidity, uidvalidity: val.uidvalidity,
uidnext: val.uidnext, uidnext: val.uidnext,
highestmodseq: val.highestmodseq,
internalseq: val.internalseq, internalseq: val.internalseq,
internalmodseq: val.internalmodseq,
}; };
val.mails val.mails
.iter() .iter()
.for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f)); .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags));
Ok(uidindex) Ok(uidindex)
} }
@ -274,15 +344,17 @@ impl Serialize for UidIndex {
S: Serializer, S: Serializer,
{ {
let mut mails = vec![]; let mut mails = vec![];
for (ident, (uid, flags)) in self.table.iter() { for (ident, (uid, modseq, flags)) in self.table.iter() {
mails.push((*uid, *ident, flags.clone())); mails.push((*uid, *modseq, *ident, flags.clone()));
} }
let val = UidIndexSerializedRepr { let val = UidIndexSerializedRepr {
mails, mails,
uidvalidity: self.uidvalidity, uidvalidity: self.uidvalidity,
uidnext: self.uidnext, uidnext: self.uidnext,
highestmodseq: self.highestmodseq,
internalseq: self.internalseq, internalseq: self.internalseq,
internalmodseq: self.internalmodseq,
}; };
val.serialize(serializer) val.serialize(serializer)
@ -308,7 +380,7 @@ mod tests {
// Early checks // Early checks
assert_eq!(state.table.len(), 1); assert_eq!(state.table.len(), 1);
let (uid, flags) = state.table.get(&m).unwrap(); let (uid, modseq, flags) = state.table.get(&m).unwrap();
assert_eq!(*uid, NonZeroU32::new(1).unwrap()); assert_eq!(*uid, NonZeroU32::new(1).unwrap());
assert_eq!(flags.len(), 2); assert_eq!(flags.len(), 2);
let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap();
@ -364,7 +436,7 @@ mod tests {
{ {
let m = UniqueIdent([0x03; 24]); let m = UniqueIdent([0x03; 24]);
let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), f); let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap(), f);
state = state.apply(&ev); state = state.apply(&ev);
} }