From a2d6efc962dbf5de64a70cf7d9f293534bd5369a Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 10 Jan 2024 11:24:01 +0100 Subject: [PATCH] [broken compilation] update mail internal --- src/imap/command/selected.rs | 9 ++- src/mail/mailbox.rs | 14 ++-- src/mail/uidindex.rs | 132 +++++++++++++++++++++++++++-------- 3 files changed, 115 insertions(+), 40 deletions(-) diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index c38c5d3..ef2654e 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -1,8 +1,8 @@ use std::sync::Arc; use anyhow::Result; -use imap_codec::imap_types::command::{Command, CommandBody}; -use imap_codec::imap_types::core::Charset; +use imap_codec::imap_types::command::{Command, CommandBody, StoreModifier}; +use imap_codec::imap_types::core::{Charset, Atom}; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; @@ -56,8 +56,9 @@ pub async fn dispatch<'a>( kind, response, flags, + modifiers, uid, - } => ctx.store(sequence_set, kind, response, flags, uid).await, + } => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await, CommandBody::Copy { sequence_set, mailbox, @@ -185,8 +186,10 @@ impl<'a> SelectedContext<'a> { kind: &StoreType, response: &StoreResponse, flags: &[Flag<'a>], + modifiers: &[(Atom<'a>, StoreModifier<'a>)], uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + tracing::info!(modifiers=?modifiers); let data = self .mailbox .store(sequence_set, kind, response, flags, uid) diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index aab200b..84fa5af 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -113,7 +113,7 @@ impl Mailbox { msg: IMF<'a>, ident: Option, flags: &[Flag], - ) -> Result<(ImapUidvalidity, ImapUid)> { + ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { self.mbox.write().await.append(msg, ident, flags).await } @@ -271,7 +271,7 @@ impl MailboxInternal { mail: IMF<'_>, ident: Option, flags: &[Flag], - ) -> Result<(ImapUidvalidity, ImapUid)> { + ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> { let ident = ident.unwrap_or_else(gen_ident); let message_key = gen_key(); @@ -312,14 +312,14 @@ impl MailboxInternal { let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec()); let uidvalidity = uid_state.uidvalidity; - let uid = match add_mail_op { - UidIndexOp::MailAdd(_, uid, _) => uid, + let (uid, modseq) = match add_mail_op { + UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq), _ => unreachable!(), }; self.uid_index.push(add_mail_op).await?; - Ok((uidvalidity, uid)) + Ok((uidvalidity, uid, modseq)) } async fn append_from_s3<'a>( @@ -432,7 +432,7 @@ impl MailboxInternal { .table .get(&source_id) .ok_or(anyhow!("Source mail not found"))? - .1 + .2 .clone(); futures::try_join!( @@ -476,7 +476,7 @@ fn dump(uid_index: &Bayou) { "{} {} {}", uid, hex::encode(ident.0), - s.table.get(ident).cloned().unwrap().1.join(", ") + s.table.get(ident).cloned().unwrap().2.join(", ") ); } println!(); diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 01f8c9c..f1c522c 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -6,10 +6,11 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer}; use crate::bayou::*; use crate::mail::unique_ident::UniqueIdent; +pub type ModSeq = NonZeroU32; pub type ImapUid = NonZeroU32; pub type ImapUidvalidity = NonZeroU32; pub type Flag = String; -pub type IndexEntry = (ImapUid, Vec); +pub type IndexEntry = (ImapUid, ModSeq, Vec); /// A UidIndex handles the mutable part of a mailbox /// It is built by running the event log on it @@ -23,28 +24,33 @@ pub struct UidIndex { // Indexes optimized for queries pub idx_by_uid: OrdMap, + pub idx_by_modseq: OrdMap, pub idx_by_flag: FlagIndex, - // Counters + // "Public" Counters pub uidvalidity: ImapUidvalidity, pub uidnext: ImapUid, + pub highestmodseq: ModSeq, + + // "Internal" Counters pub internalseq: ImapUid, + pub internalmodseq: ModSeq, } #[derive(Clone, Serialize, Deserialize, Debug)] pub enum UidIndexOp { - MailAdd(UniqueIdent, ImapUid, Vec), + MailAdd(UniqueIdent, ImapUid, ModSeq, Vec), MailDel(UniqueIdent), - FlagAdd(UniqueIdent, Vec), - FlagDel(UniqueIdent, Vec), - FlagSet(UniqueIdent, Vec), + FlagAdd(UniqueIdent, ModSeq, Vec), + FlagDel(UniqueIdent, ModSeq, Vec), + FlagSet(UniqueIdent, ModSeq, Vec), BumpUidvalidity(u32), } impl UidIndex { #[must_use] pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::MailAdd(ident, self.internalseq, flags) + UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags) } #[must_use] @@ -54,17 +60,17 @@ impl UidIndex { #[must_use] pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagAdd(ident, flags) + UidIndexOp::FlagAdd(ident, self.internalmodseq, flags) } #[must_use] pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagDel(ident, flags) + UidIndexOp::FlagDel(ident, self.internalmodseq, flags) } #[must_use] pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec) -> UidIndexOp { - UidIndexOp::FlagSet(ident, flags) + UidIndexOp::FlagSet(ident, self.internalmodseq, flags) } #[must_use] @@ -74,18 +80,19 @@ impl UidIndex { // INTERNAL functions to keep state consistent - fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, flags: &[Flag]) { + fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) { // Insert the email in our table - self.table.insert(ident, (uid, flags.to_owned())); + self.table.insert(ident, (uid, modseq, flags.to_owned())); // Update the indexes/caches self.idx_by_uid.insert(uid, ident); self.idx_by_flag.insert(uid, flags); + self.idx_by_modseq.insert(modseq, ident); } fn unreg_email(&mut self, ident: &UniqueIdent) { // We do nothing if the mail does not exist - let (uid, flags) = match self.table.get(ident) { + let (uid, modseq, flags) = match self.table.get(ident) { Some(v) => v, None => return, }; @@ -93,6 +100,7 @@ impl UidIndex { // Delete all cache entries self.idx_by_uid.remove(uid); self.idx_by_flag.remove(*uid, flags); + self.idx_by_modseq.remove(modseq); // Remove from source of trust self.table.remove(ident); @@ -103,11 +111,17 @@ impl Default for UidIndex { fn default() -> Self { Self { table: OrdMap::new(), + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), idx_by_flag: FlagIndex::new(), + uidvalidity: NonZeroU32::new(1).unwrap(), uidnext: NonZeroU32::new(1).unwrap(), + highestmodseq: NonZeroU32::new(1).unwrap(), + internalseq: NonZeroU32::new(1).unwrap(), + internalmodseq: NonZeroU32::new(1).unwrap(), } } } @@ -118,17 +132,24 @@ impl BayouState for UidIndex { fn apply(&self, op: &UidIndexOp) -> Self { let mut new = self.clone(); match op { - UidIndexOp::MailAdd(ident, uid, flags) => { - // Change UIDValidity if there is a conflict - if *uid < new.internalseq { + UidIndexOp::MailAdd(ident, uid, modseq, flags) => { + // Change UIDValidity if there is a UID conflict or a MODSEQ conflict + // @FIXME Need to prove that summing work + // The intuition: we increase the UIDValidity by the number of possible conflicts + if *uid < new.internalseq || *modseq < new.highestmodseq { + let bump_uid = new.internalseq.get() - uid.get(); + let bump_modseq = new.internalmodseq.get() - modseq.get(); new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + new.internalseq.get() - uid.get()) + NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq) .unwrap(); } // Assign the real uid of the email let new_uid = new.internalseq; + // Assign the real modseq of the email and its new flags + let new_modseq = new.highestmodseq; + // Delete the previous entry if any. // Our proof has no assumption on `ident` uniqueness, // so we must handle this case even it is very unlikely @@ -137,10 +158,14 @@ impl BayouState for UidIndex { new.unreg_email(ident); // We record our email and update ou caches - new.reg_email(*ident, new_uid, flags); + new.reg_email(*ident, new_uid, new_modseq, flags); // Update counters + new.highestmodseq = new.internalmodseq; + new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); + new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap(); + new.uidnext = new.internalseq; } UidIndexOp::MailDel(ident) => { @@ -150,8 +175,16 @@ impl BayouState for UidIndex { // We update the counter new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); } - UidIndexOp::FlagAdd(ident, new_flags) => { - if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + UidIndexOp::FlagAdd(ident, modseq, new_flags) => { + if let Some((uid, modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *modseq < new.highestmodseq { + let bump_modseq = new.internalmodseq.get() - modseq.get(); + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq) + .unwrap(); + } + // Add flags to the source of trust and the cache let mut to_add: Vec = new_flags .iter() @@ -159,18 +192,38 @@ impl BayouState for UidIndex { .cloned() .collect(); new.idx_by_flag.insert(*uid, &to_add); + new.idx_by_modseq.insert(*modseq, *ident); existing_flags.append(&mut to_add); + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap(); } } - UidIndexOp::FlagDel(ident, rm_flags) => { - if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + UidIndexOp::FlagDel(ident, modseq, rm_flags) => { + if let Some((uid, modseq, existing_flags)) = new.table.get_mut(ident) { + // Bump UIDValidity if required + if *modseq < new.highestmodseq { + let bump_modseq = new.internalmodseq.get() - modseq.get(); + new.uidvalidity = + NonZeroU32::new(new.uidvalidity.get() + bump_modseq) + .unwrap(); + } + // Remove flags from the source of trust and the cache existing_flags.retain(|x| !rm_flags.contains(x)); new.idx_by_flag.remove(*uid, rm_flags); + + // Register that email has been modified + new.idx_by_modseq.insert(*modseq, *ident); + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap(); } } - UidIndexOp::FlagSet(ident, new_flags) => { - if let Some((uid, existing_flags)) = new.table.get_mut(ident) { + UidIndexOp::FlagSet(ident, modseq, new_flags) => { + if let Some((uid, modseq, existing_flags)) = new.table.get_mut(ident) { // Remove flags from the source of trust and the cache let (keep_flags, rm_flags): (Vec, Vec) = existing_flags .iter() @@ -185,6 +238,13 @@ impl BayouState for UidIndex { existing_flags.append(&mut to_add); new.idx_by_flag.remove(*uid, &rm_flags); new.idx_by_flag.insert(*uid, &to_add); + + // Register that email has been modified + new.idx_by_modseq.insert(*modseq, *ident); + + // Update counters + new.highestmodseq = new.internalmodseq; + new.internalmodseq = NonZeroU32::new(new.internalmodseq.get() + 1).unwrap(); } } UidIndexOp::BumpUidvalidity(count) => { @@ -238,10 +298,14 @@ impl FlagIndex { #[derive(Serialize, Deserialize)] struct UidIndexSerializedRepr { - mails: Vec<(ImapUid, UniqueIdent, Vec)>, + mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec)>, + uidvalidity: ImapUidvalidity, uidnext: ImapUid, + highestmodseq: ModSeq, + internalseq: ImapUid, + internalmodseq: ModSeq, } impl<'de> Deserialize<'de> for UidIndex { @@ -253,16 +317,22 @@ impl<'de> Deserialize<'de> for UidIndex { let mut uidindex = UidIndex { table: OrdMap::new(), + idx_by_uid: OrdMap::new(), + idx_by_modseq: OrdMap::new(), idx_by_flag: FlagIndex::new(), + uidvalidity: val.uidvalidity, uidnext: val.uidnext, + highestmodseq: val.highestmodseq, + internalseq: val.internalseq, + internalmodseq: val.internalmodseq, }; val.mails .iter() - .for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f)); + .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags)); Ok(uidindex) } @@ -274,15 +344,17 @@ impl Serialize for UidIndex { S: Serializer, { let mut mails = vec![]; - for (ident, (uid, flags)) in self.table.iter() { - mails.push((*uid, *ident, flags.clone())); + for (ident, (uid, modseq, flags)) in self.table.iter() { + mails.push((*uid, *modseq, *ident, flags.clone())); } let val = UidIndexSerializedRepr { mails, uidvalidity: self.uidvalidity, uidnext: self.uidnext, + highestmodseq: self.highestmodseq, internalseq: self.internalseq, + internalmodseq: self.internalmodseq, }; val.serialize(serializer) @@ -308,7 +380,7 @@ mod tests { // Early checks assert_eq!(state.table.len(), 1); - let (uid, flags) = state.table.get(&m).unwrap(); + let (uid, modseq, flags) = state.table.get(&m).unwrap(); assert_eq!(*uid, NonZeroU32::new(1).unwrap()); assert_eq!(flags.len(), 2); let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); @@ -364,7 +436,7 @@ mod tests { { let m = UniqueIdent([0x03; 24]); let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; - let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), f); + let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU32::new(1).unwrap(), f); state = state.apply(&ev); }