Merge pull request 'CONDSTORE' (#71) from feat/condstore-try-2 into main

Reviewed-on: #71
This commit is contained in:
Quentin 2024-01-15 07:07:06 +00:00
commit 55e26d24a0
18 changed files with 801 additions and 258 deletions

10
Cargo.lock generated
View file

@ -1806,8 +1806,8 @@ dependencies = [
[[package]] [[package]]
name = "imap-codec" name = "imap-codec"
version = "1.0.0" version = "2.0.0"
source = "git+https://github.com/duesee/imap-codec?branch=v2#1f490146bb6197eee6032205e3aa7f297efd9b39" source = "git+https://github.com/superboum/imap-codec?branch=custom/aerogramme#b0a80c4826f3d8bf2d2e69f68443c261e62bb40f"
dependencies = [ dependencies = [
"abnf-core", "abnf-core",
"base64 0.21.5", "base64 0.21.5",
@ -1822,7 +1822,7 @@ dependencies = [
[[package]] [[package]]
name = "imap-flow" name = "imap-flow"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/duesee/imap-flow.git?rev=e45ce7bb6ab6bda3c71a0c7b05e9b558a5902e90#e45ce7bb6ab6bda3c71a0c7b05e9b558a5902e90" source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#0f548a2070aace09f9f9a0b6ef221efefb8b110b"
dependencies = [ dependencies = [
"bounded-static", "bounded-static",
"bytes", "bytes",
@ -1833,8 +1833,8 @@ dependencies = [
[[package]] [[package]]
name = "imap-types" name = "imap-types"
version = "1.0.0" version = "2.0.0"
source = "git+https://github.com/duesee/imap-codec?branch=v2#1f490146bb6197eee6032205e3aa7f297efd9b39" source = "git+https://github.com/superboum/imap-codec?branch=custom/aerogramme#b0a80c4826f3d8bf2d2e69f68443c261e62bb40f"
dependencies = [ dependencies = [
"base64 0.21.5", "base64 0.21.5",
"bounded-static", "bounded-static",

View file

@ -58,14 +58,14 @@ aws-sdk-s3 = "1.9.0"
eml-codec = { git = "https://git.deuxfleurs.fr/Deuxfleurs/eml-codec.git", branch = "main" } eml-codec = { git = "https://git.deuxfleurs.fr/Deuxfleurs/eml-codec.git", branch = "main" }
smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
imap-codec = { version = "1.0.0", features = ["bounded-static", "ext_condstore_qresync"] } imap-codec = { version = "2.0.0", features = ["bounded-static", "ext_condstore_qresync"] }
imap-flow = { git = "https://github.com/duesee/imap-flow.git", rev = "e45ce7bb6ab6bda3c71a0c7b05e9b558a5902e90" } imap-flow = { git = "https://github.com/superboum/imap-flow.git", branch = "custom/aerogramme" }
[dev-dependencies] [dev-dependencies]
[patch.crates-io] [patch.crates-io]
imap-types = { git = "https://github.com/duesee/imap-codec", branch = "v2" } imap-types = { git = "https://github.com/superboum/imap-codec", branch = "custom/aerogramme" }
imap-codec = { git = "https://github.com/duesee/imap-codec", branch = "v2" } imap-codec = { git = "https://github.com/superboum/imap-codec", branch = "custom/aerogramme" }
[[test]] [[test]]
name = "behavior" name = "behavior"

View file

@ -1,4 +1,5 @@
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section}; use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
use imap_codec::imap_types::command::FetchModifier;
/// Internal decisions based on fetched attributes /// Internal decisions based on fetched attributes
/// passed by the client /// passed by the client
@ -7,7 +8,7 @@ pub struct AttributesProxy {
pub attrs: Vec<MessageDataItemName<'static>>, pub attrs: Vec<MessageDataItemName<'static>>,
} }
impl AttributesProxy { impl AttributesProxy {
pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self { pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self {
// Expand macros // Expand macros
let mut fetch_attrs = match attrs { let mut fetch_attrs = match attrs {
MacroOrMessageDataItemNames::Macro(m) => { MacroOrMessageDataItemNames::Macro(m) => {
@ -31,9 +32,23 @@ impl AttributesProxy {
fetch_attrs.push(MessageDataItemName::Uid); fetch_attrs.push(MessageDataItemName::Uid);
} }
// Handle inferred MODSEQ tag
let is_changed_since = modifiers
.iter()
.any(|m| matches!(m, FetchModifier::ChangedSince(..)));
if is_changed_since && !fetch_attrs.contains(&MessageDataItemName::ModSeq) {
fetch_attrs.push(MessageDataItemName::ModSeq);
}
Self { attrs: fetch_attrs } Self { attrs: fetch_attrs }
} }
pub fn is_enabling_condstore(&self) -> bool {
self.attrs.iter().any(|x| {
matches!(x, MessageDataItemName::ModSeq)
})
}
pub fn need_body(&self) -> bool { pub fn need_body(&self) -> bool {
self.attrs.iter().any(|x| { self.attrs.iter().any(|x| {
match x { match x {

View file

@ -1,8 +1,11 @@
use imap_codec::imap_types::command::{FetchModifier, StoreModifier, SelectExamineModifier};
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
use imap_codec::imap_types::response::Capability; use imap_codec::imap_types::response::Capability;
use std::collections::HashSet; use std::collections::HashSet;
use crate::imap::attributes::AttributesProxy;
fn capability_unselect() -> Capability<'static> { fn capability_unselect() -> Capability<'static> {
Capability::try_from("UNSELECT").unwrap() Capability::try_from("UNSELECT").unwrap()
} }
@ -11,9 +14,11 @@ fn capability_condstore() -> Capability<'static> {
Capability::try_from("CONDSTORE").unwrap() Capability::try_from("CONDSTORE").unwrap()
} }
/*
fn capability_qresync() -> Capability<'static> { fn capability_qresync() -> Capability<'static> {
Capability::try_from("QRESYNC").unwrap() Capability::try_from("QRESYNC").unwrap()
} }
*/
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct ServerCapability(HashSet<Capability<'static>>); pub struct ServerCapability(HashSet<Capability<'static>>);
@ -26,7 +31,7 @@ impl Default for ServerCapability {
Capability::Move, Capability::Move,
Capability::LiteralPlus, Capability::LiteralPlus,
capability_unselect(), capability_unselect(),
//capability_condstore(), capability_condstore(),
//capability_qresync(), //capability_qresync(),
])) ]))
} }
@ -48,15 +53,29 @@ impl ServerCapability {
} }
} }
enum ClientStatus { #[derive(Clone)]
pub enum ClientStatus {
NotSupportedByServer, NotSupportedByServer,
Disabled, Disabled,
Enabled, Enabled,
} }
impl ClientStatus {
pub fn is_enabled(&self) -> bool {
matches!(self, Self::Enabled)
}
pub fn enable(&self) -> Self {
match self {
Self::Disabled => Self::Enabled,
other => other.clone(),
}
}
}
pub struct ClientCapability { pub struct ClientCapability {
condstore: ClientStatus, pub condstore: ClientStatus,
utf8kind: Option<Utf8Kind>, pub utf8kind: Option<Utf8Kind>,
} }
impl ClientCapability { impl ClientCapability {
@ -70,6 +89,36 @@ impl ClientCapability {
} }
} }
pub fn enable_condstore(&mut self) {
self.condstore = self.condstore.enable();
}
pub fn attributes_enable(&mut self, ap: &AttributesProxy) {
if ap.is_enabling_condstore() {
self.enable_condstore()
}
}
pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) {
if mods.iter().any(|x| matches!(x, FetchModifier::ChangedSince(..))) {
self.enable_condstore()
}
}
pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) {
if mods.iter().any(|x| matches!(x, StoreModifier::UnchangedSince(..))) {
self.enable_condstore()
}
}
pub fn select_enable(&mut self, mods: &[SelectExamineModifier]) {
for m in mods.iter() {
match m {
SelectExamineModifier::Condstore => self.enable_condstore(),
}
}
}
pub fn try_enable( pub fn try_enable(
&mut self, &mut self,
caps: &[CapabilityEnable<'static>], caps: &[CapabilityEnable<'static>],

View file

@ -2,7 +2,7 @@ use std::collections::BTreeMap;
use std::sync::Arc; use std::sync::Arc;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::{Command, CommandBody}; use imap_codec::imap_types::command::{Command, CommandBody, SelectExamineModifier};
use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar}; use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar};
use imap_codec::imap_types::datetime::DateTime; use imap_codec::imap_types::datetime::DateTime;
use imap_codec::imap_types::extensions::enable::CapabilityEnable; use imap_codec::imap_types::extensions::enable::CapabilityEnable;
@ -58,8 +58,8 @@ pub async fn dispatch<'a>(
} => ctx.status(mailbox, item_names).await, } => ctx.status(mailbox, item_names).await,
CommandBody::Subscribe { mailbox } => ctx.subscribe(mailbox).await, CommandBody::Subscribe { mailbox } => ctx.subscribe(mailbox).await,
CommandBody::Unsubscribe { mailbox } => ctx.unsubscribe(mailbox).await, CommandBody::Unsubscribe { mailbox } => ctx.unsubscribe(mailbox).await,
CommandBody::Select { mailbox } => ctx.select(mailbox).await, CommandBody::Select { mailbox, modifiers } => ctx.select(mailbox, modifiers).await,
CommandBody::Examine { mailbox } => ctx.examine(mailbox).await, CommandBody::Examine { mailbox, modifiers } => ctx.examine(mailbox, modifiers).await,
CommandBody::Append { CommandBody::Append {
mailbox, mailbox,
flags, flags,
@ -292,7 +292,7 @@ impl<'a> AuthenticatedContext<'a> {
} }
}; };
let view = MailboxView::new(mb).await; let view = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await;
let mut ret_attrs = vec![]; let mut ret_attrs = vec![];
for attr in attributes.iter() { for attr in attributes.iter() {
@ -311,8 +311,9 @@ impl<'a> AuthenticatedContext<'a> {
bail!("quota not implemented, can't return freed storage after EXPUNGE will be run"); bail!("quota not implemented, can't return freed storage after EXPUNGE will be run");
}, },
StatusDataItemName::HighestModSeq => { StatusDataItemName::HighestModSeq => {
bail!("highestmodseq not yet implemented"); self.client_capabilities.enable_condstore();
} StatusDataItem::HighestModSeq(view.highestmodseq().get())
},
}); });
} }
@ -404,6 +405,7 @@ impl<'a> AuthenticatedContext<'a> {
it is therefore correct to not return it even if there are unseen messages it is therefore correct to not return it even if there are unseen messages
RFC9051 (imap4rev2) says that OK [UNSEEN] responses are deprecated after SELECT and EXAMINE RFC9051 (imap4rev2) says that OK [UNSEEN] responses are deprecated after SELECT and EXAMINE
For Aerogramme, we just don't send the OK [UNSEEN], it's correct to do in both specifications. For Aerogramme, we just don't send the OK [UNSEEN], it's correct to do in both specifications.
20 select "INBOX.achats" 20 select "INBOX.achats"
* FLAGS (\Answered \Flagged \Deleted \Seen \Draft $Forwarded JUNK $label1) * FLAGS (\Answered \Flagged \Deleted \Seen \Draft $Forwarded JUNK $label1)
@ -420,7 +422,10 @@ impl<'a> AuthenticatedContext<'a> {
async fn select( async fn select(
self, self,
mailbox: &MailboxCodec<'a>, mailbox: &MailboxCodec<'a>,
modifiers: &[SelectExamineModifier],
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
self.client_capabilities.select_enable(modifiers);
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?; let mb_opt = self.user.open_mailbox(&name).await?;
@ -438,7 +443,7 @@ impl<'a> AuthenticatedContext<'a> {
}; };
tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.selected"); tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.selected");
let mb = MailboxView::new(mb).await; let mb = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await;
let data = mb.summary()?; let data = mb.summary()?;
Ok(( Ok((
@ -455,7 +460,10 @@ impl<'a> AuthenticatedContext<'a> {
async fn examine( async fn examine(
self, self,
mailbox: &MailboxCodec<'a>, mailbox: &MailboxCodec<'a>,
modifiers: &[SelectExamineModifier],
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
self.client_capabilities.select_enable(modifiers);
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?; let mb_opt = self.user.open_mailbox(&name).await?;
@ -473,7 +481,7 @@ impl<'a> AuthenticatedContext<'a> {
}; };
tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.examined"); tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.examined");
let mb = MailboxView::new(mb).await; let mb = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await;
let data = mb.summary()?; let data = mb.summary()?;
Ok(( Ok((
@ -496,7 +504,7 @@ impl<'a> AuthenticatedContext<'a> {
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let append_tag = self.req.tag.clone(); let append_tag = self.req.tag.clone();
match self.append_internal(mailbox, flags, date, message).await { match self.append_internal(mailbox, flags, date, message).await {
Ok((_mb, uidvalidity, uid)) => Ok(( Ok((_mb, uidvalidity, uid, _modseq)) => Ok((
Response::build() Response::build()
.tag(append_tag) .tag(append_tag)
.message("APPEND completed") .message("APPEND completed")
@ -537,7 +545,7 @@ impl<'a> AuthenticatedContext<'a> {
flags: &[Flag<'a>], flags: &[Flag<'a>],
date: &Option<DateTime>, date: &Option<DateTime>,
message: &Literal<'a>, message: &Literal<'a>,
) -> Result<(Arc<Mailbox>, ImapUidvalidity, ImapUidvalidity)> { ) -> Result<(Arc<Mailbox>, ImapUidvalidity, ImapUid, ModSeq)> {
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?; let mb_opt = self.user.open_mailbox(&name).await?;
@ -555,9 +563,9 @@ impl<'a> AuthenticatedContext<'a> {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
// TODO: filter allowed flags? ping @Quentin // TODO: filter allowed flags? ping @Quentin
let (uidvalidity, uid) = mb.append(msg, None, &flags[..]).await?; let (uidvalidity, uid, modseq) = mb.append(msg, None, &flags[..]).await?;
Ok((mb, uidvalidity, uid)) Ok((mb, uidvalidity, uid, modseq))
} }
} }

View file

@ -1,16 +1,18 @@
use std::sync::Arc; use std::sync::Arc;
use std::num::NonZeroU64;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier};
use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated}; use crate::imap::command::{anystate, authenticated};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response; use crate::imap::response::Response;
use crate::mail::user::User; use crate::mail::user::User;
@ -37,8 +39,9 @@ pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, fl
CommandBody::Fetch { CommandBody::Fetch {
sequence_set, sequence_set,
macro_or_item_names, macro_or_item_names,
modifiers,
uid, uid,
} => ctx.fetch(sequence_set, macro_or_item_names, uid).await, } => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
CommandBody::Search { CommandBody::Search {
charset, charset,
criteria, criteria,
@ -88,17 +91,33 @@ impl<'a> ExaminedContext<'a> {
self, self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
attributes: &'a MacroOrMessageDataItemNames<'static>, attributes: &'a MacroOrMessageDataItemNames<'static>,
modifiers: &[FetchModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await { let ap = AttributesProxy::new(attributes, modifiers, *uid);
Ok(resp) => Ok(( let mut changed_since: Option<NonZeroU64> = None;
Response::build() modifiers.iter().for_each(|m| match m {
.to_req(self.req) FetchModifier::ChangedSince(val) => {
.message("FETCH completed") changed_since = Some(*val);
.set_body(resp) },
.ok()?, });
flow::Transition::None,
)), match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
Ok(resp) => {
// Capabilities enabling logic only on successful command
// (according to my understanding of the spec)
self.client_capabilities.attributes_enable(&ap);
self.client_capabilities.fetch_modifiers_enable(modifiers);
Ok((
Response::build()
.to_req(self.req)
.message("FETCH completed")
.set_body(resp)
.ok()?,
flow::Transition::None,
))
},
Err(e) => Ok(( Err(e) => Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -115,7 +134,10 @@ impl<'a> ExaminedContext<'a> {
criteria: &SearchKey<'a>, criteria: &SearchKey<'a>,
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let found = self.mailbox.search(charset, criteria, *uid).await?; let (found, enable_condstore) = self.mailbox.search(charset, criteria, *uid).await?;
if enable_condstore {
self.client_capabilities.enable_condstore();
}
Ok(( Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -127,9 +149,9 @@ impl<'a> ExaminedContext<'a> {
} }
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.0.mailbox.force_sync().await?; self.mailbox.internal.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?; let updates = self.mailbox.update(UpdateParameters::default()).await?;
Ok(( Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)

View file

@ -1,7 +1,8 @@
use std::sync::Arc; use std::sync::Arc;
use std::num::NonZeroU64;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
@ -13,9 +14,9 @@ use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated, MailboxName}; use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response; use crate::imap::response::Response;
use crate::imap::attributes::AttributesProxy;
use crate::mail::user::User; use crate::mail::user::User;
pub struct SelectedContext<'a> { pub struct SelectedContext<'a> {
@ -43,8 +44,9 @@ pub async fn dispatch<'a>(
CommandBody::Fetch { CommandBody::Fetch {
sequence_set, sequence_set,
macro_or_item_names, macro_or_item_names,
modifiers,
uid, uid,
} => ctx.fetch(sequence_set, macro_or_item_names, uid).await, } => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
CommandBody::Search { CommandBody::Search {
charset, charset,
criteria, criteria,
@ -56,8 +58,9 @@ pub async fn dispatch<'a>(
kind, kind,
response, response,
flags, flags,
modifiers,
uid, uid,
} => ctx.store(sequence_set, kind, response, flags, uid).await, } => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await,
CommandBody::Copy { CommandBody::Copy {
sequence_set, sequence_set,
mailbox, mailbox,
@ -113,17 +116,34 @@ impl<'a> SelectedContext<'a> {
self, self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
attributes: &'a MacroOrMessageDataItemNames<'static>, attributes: &'a MacroOrMessageDataItemNames<'static>,
modifiers: &[FetchModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await { let ap = AttributesProxy::new(attributes, modifiers, *uid);
Ok(resp) => Ok(( let mut changed_since: Option<NonZeroU64> = None;
Response::build() modifiers.iter().for_each(|m| match m {
.to_req(self.req) FetchModifier::ChangedSince(val) => {
.message("FETCH completed") changed_since = Some(*val);
.set_body(resp) },
.ok()?, });
flow::Transition::None,
)), match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
Ok(resp) => {
// Capabilities enabling logic only on successful command
// (according to my understanding of the spec)
self.client_capabilities.attributes_enable(&ap);
self.client_capabilities.fetch_modifiers_enable(modifiers);
// Response to the client
Ok((
Response::build()
.to_req(self.req)
.message("FETCH completed")
.set_body(resp)
.ok()?,
flow::Transition::None,
))
},
Err(e) => Ok(( Err(e) => Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -140,7 +160,10 @@ impl<'a> SelectedContext<'a> {
criteria: &SearchKey<'a>, criteria: &SearchKey<'a>,
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let found = self.mailbox.search(charset, criteria, *uid).await?; let (found, enable_condstore) = self.mailbox.search(charset, criteria, *uid).await?;
if enable_condstore {
self.client_capabilities.enable_condstore();
}
Ok(( Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -152,9 +175,9 @@ impl<'a> SelectedContext<'a> {
} }
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.0.mailbox.force_sync().await?; self.mailbox.internal.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?; let updates = self.mailbox.update(UpdateParameters::default()).await?;
Ok(( Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -185,19 +208,39 @@ impl<'a> SelectedContext<'a> {
kind: &StoreType, kind: &StoreType,
response: &StoreResponse, response: &StoreResponse,
flags: &[Flag<'a>], flags: &[Flag<'a>],
modifiers: &[StoreModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let data = self let mut unchanged_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
StoreModifier::UnchangedSince(val) => {
unchanged_since = Some(*val);
},
});
let (data, modified) = self
.mailbox .mailbox
.store(sequence_set, kind, response, flags, uid) .store(sequence_set, kind, response, flags, unchanged_since, uid)
.await?; .await?;
Ok(( let mut ok_resp = Response::build()
Response::build()
.to_req(self.req) .to_req(self.req)
.message("STORE completed") .message("STORE completed")
.set_body(data) .set_body(data);
.ok()?,
match modified[..] {
[] => (),
[_head, ..] => {
let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::<Vec<_>>().join(","));
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes())));
},
};
self.client_capabilities.store_modifiers_enable(modifiers);
Ok((ok_resp.ok()?,
flow::Transition::None, flow::Transition::None,
)) ))
} }

View file

@ -1,9 +1,9 @@
use std::num::NonZeroU32; use std::num::{NonZeroU32, NonZeroU64};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use imap_codec::imap_types::sequence::{self, SeqOrUid, Sequence, SequenceSet}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::mail::uidindex::{ImapUid, UidIndex}; use crate::mail::uidindex::{ImapUid, ModSeq, UidIndex};
use crate::mail::unique_ident::UniqueIdent; use crate::mail::unique_ident::UniqueIdent;
pub struct Index<'a> { pub struct Index<'a> {
@ -17,12 +17,10 @@ impl<'a> Index<'a> {
.iter() .iter()
.enumerate() .enumerate()
.map(|(i_enum, (&uid, &uuid))| { .map(|(i_enum, (&uid, &uuid))| {
let flags = internal let (_, modseq, flags) = internal
.table .table
.get(&uuid) .get(&uuid)
.ok_or(anyhow!("mail is missing from index"))? .ok_or(anyhow!("mail is missing from index"))?;
.1
.as_ref();
let i_int: u32 = (i_enum + 1).try_into()?; let i_int: u32 = (i_enum + 1).try_into()?;
let i: NonZeroU32 = i_int.try_into()?; let i: NonZeroU32 = i_int.try_into()?;
@ -30,6 +28,7 @@ impl<'a> Index<'a> {
i, i,
uid, uid,
uuid, uuid,
modseq: *modseq,
flags, flags,
}) })
}) })
@ -61,10 +60,8 @@ impl<'a> Index<'a> {
if self.imap_index.is_empty() { if self.imap_index.is_empty() {
return vec![]; return vec![];
} }
let iter_strat = sequence::Strategy::Naive { let largest = self.last().expect("The mailbox is not empty").uid;
largest: self.last().expect("The mailbox is not empty").uid, let mut unroll_seq = sequence_set.iter(largest).collect::<Vec<_>>();
};
let mut unroll_seq = sequence_set.iter(iter_strat).collect::<Vec<_>>();
unroll_seq.sort(); unroll_seq.sort();
let start_seq = match unroll_seq.iter().next() { let start_seq = match unroll_seq.iter().next() {
@ -103,11 +100,9 @@ impl<'a> Index<'a> {
if self.imap_index.is_empty() { if self.imap_index.is_empty() {
return Ok(vec![]); return Ok(vec![]);
} }
let iter_strat = sequence::Strategy::Naive { let largest = NonZeroU32::try_from(self.imap_index.len() as u32)?;
largest: NonZeroU32::try_from(self.imap_index.len() as u32)?,
};
let mut acc = sequence_set let mut acc = sequence_set
.iter(iter_strat) .iter(largest)
.map(|wanted_id| { .map(|wanted_id| {
self.imap_index self.imap_index
.get((wanted_id.get() as usize) - 1) .get((wanted_id.get() as usize) - 1)
@ -131,6 +126,36 @@ impl<'a> Index<'a> {
_ => self.fetch_on_id(sequence_set), _ => self.fetch_on_id(sequence_set),
} }
} }
pub fn fetch_changed_since(
self: &'a Index<'a>,
sequence_set: &SequenceSet,
maybe_modseq: Option<NonZeroU64>,
by_uid: bool,
) -> Result<Vec<&'a MailIndex<'a>>> {
let raw = self.fetch(sequence_set, by_uid)?;
let res = match maybe_modseq {
Some(pit) => raw.into_iter().filter(|midx| midx.modseq > pit).collect(),
None => raw,
};
Ok(res)
}
pub fn fetch_unchanged_since(
self: &'a Index<'a>,
sequence_set: &SequenceSet,
maybe_modseq: Option<NonZeroU64>,
by_uid: bool,
) -> Result<(Vec<&'a MailIndex<'a>>, Vec<&'a MailIndex<'a>>)> {
let raw = self.fetch(sequence_set, by_uid)?;
let res = match maybe_modseq {
Some(pit) => raw.into_iter().partition(|midx| midx.modseq <= pit),
None => (raw, vec![]),
};
Ok(res)
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@ -138,6 +163,7 @@ pub struct MailIndex<'a> {
pub i: NonZeroU32, pub i: NonZeroU32,
pub uid: ImapUid, pub uid: ImapUid,
pub uuid: UniqueIdent, pub uuid: UniqueIdent,
pub modseq: ModSeq,
pub flags: &'a Vec<String>, pub flags: &'a Vec<String>,
} }

View file

@ -90,6 +90,7 @@ impl<'a> MailView<'a> {
Ok(body) Ok(body)
} }
MessageDataItemName::InternalDate => self.internal_date(), MessageDataItemName::InternalDate => self.internal_date(),
MessageDataItemName::ModSeq => Ok(self.modseq()),
}) })
.collect::<Result<Vec<_>, _>>()?; .collect::<Result<Vec<_>, _>>()?;
@ -252,6 +253,10 @@ impl<'a> MailView<'a> {
.ok_or(anyhow!("Unable to parse internal date"))?; .ok_or(anyhow!("Unable to parse internal date"))?;
Ok(MessageDataItem::InternalDate(DateTime::unvalidated(dt))) Ok(MessageDataItem::InternalDate(DateTime::unvalidated(dt)))
} }
fn modseq(&self) -> MessageDataItem<'static> {
MessageDataItem::ModSeq(self.in_idx.modseq)
}
} }
pub enum SeenFlag { pub enum SeenFlag {

View file

@ -1,21 +1,23 @@
use std::num::NonZeroU32; use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashSet;
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
use futures::stream::{FuturesOrdered, StreamExt}; use futures::stream::{FuturesOrdered, StreamExt};
use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItem}; use imap_codec::imap_types::fetch::MessageDataItem;
use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType}; use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType};
use imap_codec::imap_types::response::{Code, Data, Status}; use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::mail::unique_ident::UniqueIdent;
use crate::mail::mailbox::Mailbox; use crate::mail::mailbox::Mailbox;
use crate::mail::query::QueryScope; use crate::mail::query::QueryScope;
use crate::mail::snapshot::FrozenMailbox; use crate::mail::snapshot::FrozenMailbox;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity}; use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
use crate::imap::attributes::AttributesProxy; use crate::imap::attributes::AttributesProxy;
use crate::imap::flags; use crate::imap::flags;
@ -32,6 +34,21 @@ const DEFAULT_FLAGS: [Flag; 5] = [
Flag::Draft, Flag::Draft,
]; ];
pub struct UpdateParameters {
pub silence: HashSet<UniqueIdent>,
pub with_modseq: bool,
pub with_uid: bool,
}
impl Default for UpdateParameters {
fn default() -> Self {
Self {
silence: HashSet::new(),
with_modseq: false,
with_uid: false,
}
}
}
/// A MailboxView is responsible for giving the client the information /// A MailboxView is responsible for giving the client the information
/// it needs about a mailbox, such as an initial summary of the mailbox's /// it needs about a mailbox, such as an initial summary of the mailbox's
/// content and continuous updates indicating when the content /// content and continuous updates indicating when the content
@ -39,12 +56,18 @@ const DEFAULT_FLAGS: [Flag; 5] = [
/// To do this, it keeps a variable `known_state` that corresponds to /// To do this, it keeps a variable `known_state` that corresponds to
/// what the client knows, and produces IMAP messages to be sent to the /// what the client knows, and produces IMAP messages to be sent to the
/// client that go along updates to `known_state`. /// client that go along updates to `known_state`.
pub struct MailboxView(pub FrozenMailbox); pub struct MailboxView {
pub internal: FrozenMailbox,
pub is_condstore: bool,
}
impl MailboxView { impl MailboxView {
/// Creates a new IMAP view into a mailbox. /// Creates a new IMAP view into a mailbox.
pub async fn new(mailbox: Arc<Mailbox>) -> Self { pub async fn new(mailbox: Arc<Mailbox>, is_cond: bool) -> Self {
Self(mailbox.frozen().await) Self {
internal: mailbox.frozen().await,
is_condstore: is_cond,
}
} }
/// Create an updated view, useful to make a diff /// Create an updated view, useful to make a diff
@ -53,9 +76,9 @@ impl MailboxView {
/// what the client knows and what is actually in the mailbox. /// what the client knows and what is actually in the mailbox.
/// This does NOT trigger a sync, it bases itself on what is currently /// This does NOT trigger a sync, it bases itself on what is currently
/// loaded in RAM by Bayou. /// loaded in RAM by Bayou.
pub async fn update(&mut self) -> Result<Vec<Body<'static>>> { pub async fn update(&mut self, params: UpdateParameters) -> Result<Vec<Body<'static>>> {
let old_snapshot = self.0.update().await; let old_snapshot = self.internal.update().await;
let new_snapshot = &self.0.snapshot; let new_snapshot = &self.internal.snapshot;
let mut data = Vec::<Body>::new(); let mut data = Vec::<Body>::new();
@ -99,19 +122,31 @@ impl MailboxView {
} else { } else {
// - if flags changed for existing mails, tell client // - if flags changed for existing mails, tell client
for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() { for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() {
if params.silence.contains(uuid) {
continue;
}
let old_mail = old_snapshot.table.get(uuid); let old_mail = old_snapshot.table.get(uuid);
let new_mail = new_snapshot.table.get(uuid); let new_mail = new_snapshot.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail { if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, flags)) = new_mail { if let Some((uid, modseq, flags)) = new_mail {
let mut items = vec![
MessageDataItem::Flags(
flags.iter().filter_map(|f| flags::from_str(f)).collect(),
),
];
if params.with_uid {
items.push(MessageDataItem::Uid(*uid));
}
if params.with_modseq {
items.push(MessageDataItem::ModSeq(*modseq));
}
data.push(Body::Data(Data::Fetch { data.push(Body::Data(Data::Fetch {
seq: NonZeroU32::try_from((i + 1) as u32).unwrap(), seq: NonZeroU32::try_from((i + 1) as u32).unwrap(),
items: vec![ items: items.try_into()?,
MessageDataItem::Uid(*uid),
MessageDataItem::Flags(
flags.iter().filter_map(|f| flags::from_str(f)).collect(),
),
]
.try_into()?,
})); }));
} }
} }
@ -130,8 +165,11 @@ impl MailboxView {
data.extend(self.flags_status()?.into_iter()); data.extend(self.flags_status()?.into_iter());
data.push(self.uidvalidity_status()?); data.push(self.uidvalidity_status()?);
data.push(self.uidnext_status()?); data.push(self.uidnext_status()?);
self.unseen_first_status()? if self.is_condstore {
.map(|unseen_status| data.push(unseen_status)); data.push(self.highestmodseq_status()?);
}
/*self.unseen_first_status()?
.map(|unseen_status| data.push(unseen_status));*/
Ok(data) Ok(data)
} }
@ -140,50 +178,68 @@ impl MailboxView {
&mut self, &mut self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
kind: &StoreType, kind: &StoreType,
_response: &StoreResponse, response: &StoreResponse,
flags: &[Flag<'a>], flags: &[Flag<'a>],
unchanged_since: Option<NonZeroU64>,
is_uid_store: &bool, is_uid_store: &bool,
) -> Result<Vec<Body<'static>>> { ) -> Result<(Vec<Body<'static>>, Vec<NonZeroU32>)> {
self.0.sync().await?; self.internal.sync().await?;
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let idx = self.index()?; let idx = self.index()?;
let mails = idx.fetch(sequence_set, *is_uid_store)?; let (editable, in_conflict) = idx
for mi in mails.iter() { .fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
for mi in editable.iter() {
match kind { match kind {
StoreType::Add => { StoreType::Add => {
self.0.mailbox.add_flags(mi.uuid, &flags[..]).await?; self.internal.mailbox.add_flags(mi.uuid, &flags[..]).await?;
} }
StoreType::Remove => { StoreType::Remove => {
self.0.mailbox.del_flags(mi.uuid, &flags[..]).await?; self.internal.mailbox.del_flags(mi.uuid, &flags[..]).await?;
} }
StoreType::Replace => { StoreType::Replace => {
self.0.mailbox.set_flags(mi.uuid, &flags[..]).await?; self.internal.mailbox.set_flags(mi.uuid, &flags[..]).await?;
} }
} }
} }
// @TODO: handle _response let silence = match response {
self.update().await StoreResponse::Answer => HashSet::new(),
StoreResponse::Silent => editable.iter().map(|midx| midx.uuid).collect(),
};
let conflict_id_or_uid = match is_uid_store {
true => in_conflict.into_iter().map(|midx| midx.uid).collect(),
_ => in_conflict.into_iter().map(|midx| midx.i).collect(),
};
let summary = self.update(UpdateParameters {
with_uid: *is_uid_store,
with_modseq: unchanged_since.is_some(),
silence,
}).await?;
Ok((summary, conflict_id_or_uid))
} }
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> { pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
self.0.sync().await?; self.internal.sync().await?;
let state = self.0.peek().await; let state = self.internal.peek().await;
let deleted_flag = Flag::Deleted.to_string(); let deleted_flag = Flag::Deleted.to_string();
let msgs = state let msgs = state
.table .table
.iter() .iter()
.filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag)) .filter(|(_uuid, (_uid, _modseq, flags))| flags.iter().any(|x| *x == deleted_flag))
.map(|(uuid, _)| *uuid); .map(|(uuid, _)| *uuid);
for msg in msgs { for msg in msgs {
self.0.mailbox.delete(msg).await?; self.internal.mailbox.delete(msg).await?;
} }
self.update().await self.update(UpdateParameters::default()).await
} }
pub async fn copy( pub async fn copy(
@ -197,7 +253,7 @@ impl MailboxView {
let mut new_uuids = vec![]; let mut new_uuids = vec![];
for mi in mails.iter() { for mi in mails.iter() {
new_uuids.push(to.copy_from(&self.0.mailbox, mi.uuid).await?); new_uuids.push(to.copy_from(&self.internal.mailbox, mi.uuid).await?);
} }
let mut ret = vec![]; let mut ret = vec![];
@ -224,7 +280,7 @@ impl MailboxView {
let mails = idx.fetch(sequence_set, *is_uid_copy)?; let mails = idx.fetch(sequence_set, *is_uid_copy)?;
for mi in mails.iter() { for mi in mails.iter() {
to.move_from(&self.0.mailbox, mi.uuid).await?; to.move_from(&self.internal.mailbox, mi.uuid).await?;
} }
let mut ret = vec![]; let mut ret = vec![];
@ -238,7 +294,10 @@ impl MailboxView {
ret.push((mi.uid, dest_uid)); ret.push((mi.uid, dest_uid));
} }
let update = self.update().await?; let update = self.update(UpdateParameters {
with_uid: *is_uid_copy,
..UpdateParameters::default()
}).await?;
Ok((to_state.uidvalidity, ret, update)) Ok((to_state.uidvalidity, ret, update))
} }
@ -248,27 +307,32 @@ impl MailboxView {
pub async fn fetch<'b>( pub async fn fetch<'b>(
&self, &self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
attributes: &'b MacroOrMessageDataItemNames<'static>, ap: &AttributesProxy,
changed_since: Option<NonZeroU64>,
is_uid_fetch: &bool, is_uid_fetch: &bool,
) -> Result<Vec<Body<'static>>> { ) -> Result<Vec<Body<'static>>> {
// [1/6] Pre-compute data // [1/6] Pre-compute data
// a. what are the uuids of the emails we want? // a. what are the uuids of the emails we want?
// b. do we need to fetch the full body? // b. do we need to fetch the full body?
let ap = AttributesProxy::new(attributes, *is_uid_fetch); //let ap = AttributesProxy::new(attributes, *is_uid_fetch);
let query_scope = match ap.need_body() { let query_scope = match ap.need_body() {
true => QueryScope::Full, true => QueryScope::Full,
_ => QueryScope::Partial, _ => QueryScope::Partial,
}; };
tracing::debug!("Query scope {:?}", query_scope); tracing::debug!("Query scope {:?}", query_scope);
let idx = self.index()?; let idx = self.index()?;
let mail_idx_list = idx.fetch(sequence_set, *is_uid_fetch)?; let mail_idx_list = idx.fetch_changed_since(
sequence_set,
changed_since,
*is_uid_fetch
)?;
// [2/6] Fetch the emails // [2/6] Fetch the emails
let uuids = mail_idx_list let uuids = mail_idx_list
.iter() .iter()
.map(|midx| midx.uuid) .map(|midx| midx.uuid)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
let query_result = self.0.query(&uuids, query_scope).fetch().await?; let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
// [3/6] Derive an IMAP-specific view from the results, apply the filters // [3/6] Derive an IMAP-specific view from the results, apply the filters
let views = query_result let views = query_result
@ -294,7 +358,7 @@ impl MailboxView {
.filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd))
.map(|(mv, _seen)| async move { .map(|(mv, _seen)| async move {
let seen_flag = Flag::Seen.to_string(); let seen_flag = Flag::Seen.to_string();
self.0 self.internal
.mailbox .mailbox
.add_flags(*mv.query_result.uuid(), &[seen_flag]) .add_flags(*mv.query_result.uuid(), &[seen_flag])
.await?; .await?;
@ -316,7 +380,7 @@ impl MailboxView {
_charset: &Option<Charset<'a>>, _charset: &Option<Charset<'a>>,
search_key: &SearchKey<'a>, search_key: &SearchKey<'a>,
uid: bool, uid: bool,
) -> Result<Vec<Body<'static>>> { ) -> Result<(Vec<Body<'static>>, bool)> {
// 1. Compute the subset of sequence identifiers we need to fetch // 1. Compute the subset of sequence identifiers we need to fetch
// based on the search query // based on the search query
let crit = search::Criteria(search_key); let crit = search::Criteria(search_key);
@ -332,20 +396,30 @@ impl MailboxView {
// 4. Fetch additional info about the emails // 4. Fetch additional info about the emails
let query_scope = crit.query_scope(); let query_scope = crit.query_scope();
let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>(); let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>();
let query_result = self.0.query(&uuids, query_scope).fetch().await?; let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
// 5. If needed, filter the selection based on the body // 5. If needed, filter the selection based on the body
let kept_query = crit.filter_on_query(&to_fetch, &query_result)?; let kept_query = crit.filter_on_query(&to_fetch, &query_result)?;
// 6. Format the result according to the client's taste: // 6. Format the result according to the client's taste:
// either return UID or ID. // either return UID or ID.
let final_selection = kept_idx.into_iter().chain(kept_query.into_iter()); let final_selection = kept_idx.iter().chain(kept_query.iter());
let selection_fmt = match uid { let selection_fmt = match uid {
true => final_selection.map(|in_idx| in_idx.uid).collect(), true => final_selection.map(|in_idx| in_idx.uid).collect(),
_ => final_selection.map(|in_idx| in_idx.i).collect(), _ => final_selection.map(|in_idx| in_idx.i).collect(),
}; };
Ok(vec![Body::Data(Data::Search(selection_fmt))]) // 7. Add the modseq entry if needed
let is_modseq = crit.is_modseq();
let maybe_modseq = match is_modseq {
true => {
let final_selection = kept_idx.iter().chain(kept_query.iter());
final_selection.map(|in_idx| in_idx.modseq).max().map(|r| NonZeroU64::try_from(r)).transpose()?
},
_ => None,
};
Ok((vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], is_modseq))
} }
// ---- // ----
@ -354,7 +428,7 @@ impl MailboxView {
/// It's not trivial to refactor the code to do that, so we are doing /// It's not trivial to refactor the code to do that, so we are doing
/// some useless computation for now... /// some useless computation for now...
fn index<'a>(&'a self) -> Result<Index<'a>> { fn index<'a>(&'a self) -> Result<Index<'a>> {
Index::new(&self.0.snapshot) Index::new(&self.internal.snapshot)
} }
/// Produce an OK [UIDVALIDITY _] message corresponding to `known_state` /// Produce an OK [UIDVALIDITY _] message corresponding to `known_state`
@ -369,7 +443,7 @@ impl MailboxView {
} }
pub(crate) fn uidvalidity(&self) -> ImapUidvalidity { pub(crate) fn uidvalidity(&self) -> ImapUidvalidity {
self.0.snapshot.uidvalidity self.internal.snapshot.uidvalidity
} }
/// Produce an OK [UIDNEXT _] message corresponding to `known_state` /// Produce an OK [UIDNEXT _] message corresponding to `known_state`
@ -384,7 +458,19 @@ impl MailboxView {
} }
pub(crate) fn uidnext(&self) -> ImapUid { pub(crate) fn uidnext(&self) -> ImapUid {
self.0.snapshot.uidnext self.internal.snapshot.uidnext
}
pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> {
Ok(Body::Status(Status::ok(
None,
Some(Code::Other(CodeOther::unvalidated(format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes()))),
"Highest",
)?))
}
pub(crate) fn highestmodseq(&self) -> ModSeq {
self.internal.snapshot.highestmodseq
} }
/// Produce an EXISTS message corresponding to the number of mails /// Produce an EXISTS message corresponding to the number of mails
@ -394,7 +480,7 @@ impl MailboxView {
} }
pub(crate) fn exists(&self) -> Result<u32> { pub(crate) fn exists(&self) -> Result<u32> {
Ok(u32::try_from(self.0.snapshot.idx_by_uid.len())?) Ok(u32::try_from(self.internal.snapshot.idx_by_uid.len())?)
} }
/// Produce a RECENT message corresponding to the number of /// Produce a RECENT message corresponding to the number of
@ -403,6 +489,7 @@ impl MailboxView {
Ok(Body::Data(Data::Recent(self.recent()?))) Ok(Body::Data(Data::Recent(self.recent()?)))
} }
#[allow(dead_code)]
fn unseen_first_status(&self) -> Result<Option<Body<'static>>> { fn unseen_first_status(&self) -> Result<Option<Body<'static>>> {
Ok(self Ok(self
.unseen_first()? .unseen_first()?
@ -412,21 +499,22 @@ impl MailboxView {
.transpose()?) .transpose()?)
} }
#[allow(dead_code)]
fn unseen_first(&self) -> Result<Option<NonZeroU32>> { fn unseen_first(&self) -> Result<Option<NonZeroU32>> {
Ok(self Ok(self
.0 .internal
.snapshot .snapshot
.table .table
.values() .values()
.enumerate() .enumerate()
.find(|(_i, (_imap_uid, flags))| !flags.contains(&"\\Seen".to_string())) .find(|(_i, (_imap_uid, _modseq, flags))| !flags.contains(&"\\Seen".to_string()))
.map(|(i, _)| NonZeroU32::try_from(i as u32 + 1)) .map(|(i, _)| NonZeroU32::try_from(i as u32 + 1))
.transpose()?) .transpose()?)
} }
pub(crate) fn recent(&self) -> Result<u32> { pub(crate) fn recent(&self) -> Result<u32> {
let recent = self let recent = self
.0 .internal
.snapshot .snapshot
.idx_by_flag .idx_by_flag
.get(&"\\Recent".to_string()) .get(&"\\Recent".to_string())
@ -443,7 +531,7 @@ impl MailboxView {
// 1. Collecting all the possible flags in the mailbox // 1. Collecting all the possible flags in the mailbox
// 1.a Fetch them from our index // 1.a Fetch them from our index
let mut known_flags: Vec<Flag> = self let mut known_flags: Vec<Flag> = self
.0 .internal
.snapshot .snapshot
.idx_by_flag .idx_by_flag
.flags() .flags()
@ -483,9 +571,9 @@ impl MailboxView {
} }
pub(crate) fn unseen_count(&self) -> usize { pub(crate) fn unseen_count(&self) -> usize {
let total = self.0.snapshot.table.len(); let total = self.internal.snapshot.table.len();
let seen = self let seen = self
.0 .internal
.snapshot .snapshot
.idx_by_flag .idx_by_flag
.get(&Flag::Seen.to_string()) .get(&Flag::Seen.to_string())
@ -524,6 +612,7 @@ mod tests {
peek: false, peek: false,
}, },
]), ]),
&[],
false, false,
); );
@ -535,12 +624,13 @@ mod tests {
rfc822_size: 8usize, rfc822_size: 8usize,
}; };
let index_entry = (NonZeroU32::MIN, vec![]); let index_entry = (NonZeroU32::MIN, NonZeroU64::MIN, vec![]);
let mail_in_idx = MailIndex { let mail_in_idx = MailIndex {
i: NonZeroU32::MIN, i: NonZeroU32::MIN,
uid: index_entry.0, uid: index_entry.0,
modseq: index_entry.1,
uuid: unique_ident::gen_ident(), uuid: unique_ident::gen_ident(),
flags: &index_entry.1, flags: &index_entry.2,
}; };
let rfc822 = b"Subject: hello\r\nFrom: a@a.a\r\nTo: b@b.b\r\nDate: Thu, 12 Oct 2023 08:45:28 +0000\r\n\r\nhello world"; let rfc822 = b"Subject: hello\r\nFrom: a@a.a\r\nTo: b@b.b\r\nDate: Thu, 12 Oct 2023 08:45:28 +0000\r\n\r\nhello world";
let qr = QueryResult::FullResult { let qr = QueryResult::FullResult {

View file

@ -175,6 +175,11 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
} }
} }
}, },
flow => {
server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow);
}
}, },
// Managing response generated by Aerogramme // Managing response generated by Aerogramme

View file

@ -1,8 +1,8 @@
use std::num::NonZeroU32; use std::num::{NonZeroU32, NonZeroU64};
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::{SearchKey, MetadataItemSearch};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::imap::index::MailIndex; use crate::imap::index::MailIndex;
@ -112,6 +112,17 @@ impl<'a> Criteria<'a> {
} }
} }
pub fn is_modseq(&self) -> bool {
use SearchKey::*;
match self.0 {
And(and_list) => and_list.as_ref().iter().any(|child| Criteria(child).is_modseq()),
Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(),
Not(child) => Criteria(child).is_modseq(),
ModSeq { .. } => true,
_ => false,
}
}
/// Returns emails that we now for sure we want to keep /// Returns emails that we now for sure we want to keep
/// but also a second list of emails we need to investigate further by /// but also a second list of emails we need to investigate further by
/// fetching some remote data /// fetching some remote data
@ -176,6 +187,7 @@ impl<'a> Criteria<'a> {
// Sequence logic // Sequence logic
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(), maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(),
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, midx).into(),
// All the stuff we can't evaluate yet // All the stuff we can't evaluate yet
Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_) Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_)
@ -210,9 +222,10 @@ impl<'a> Criteria<'a> {
Not(expr) => !Criteria(expr).is_keep_on_query(mail_view), Not(expr) => !Criteria(expr).is_keep_on_query(mail_view),
All => true, All => true,
// Reevaluating our previous logic... //@FIXME Reevaluating our previous logic...
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx), maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx),
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
// Filter on mail meta // Filter on mail meta
Before(search_naive) => match mail_view.stored_naive_date() { Before(search_naive) => match mail_view.stored_naive_date() {
@ -318,7 +331,8 @@ fn approx_sequence_set_size(seq_set: &SequenceSet) -> u64 {
} }
// This is wrong as sequence UID can have holes, // This is wrong as sequence UID can have holes,
// as we don't know the number of messages in the mailbox also // as we don't know the number of messages in the mailbox also
// we gave to guess
fn approx_sequence_size(seq: &Sequence) -> u64 { fn approx_sequence_size(seq: &Sequence) -> u64 {
match seq { match seq {
Sequence::Single(_) => 1, Sequence::Single(_) => 1,
@ -458,3 +472,10 @@ fn is_keep_seq(sk: &SearchKey, midx: &MailIndex) -> bool {
_ => unreachable!(), _ => unreachable!(),
} }
} }
fn is_keep_modseq(filter: &Option<MetadataItemSearch>, modseq: &NonZeroU64, midx: &MailIndex) -> bool {
if filter.is_some() {
tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet");
}
modseq <= &midx.modseq
}

View file

@ -113,7 +113,7 @@ impl Mailbox {
msg: IMF<'a>, msg: IMF<'a>,
ident: Option<UniqueIdent>, ident: Option<UniqueIdent>,
flags: &[Flag], flags: &[Flag],
) -> Result<(ImapUidvalidity, ImapUid)> { ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
self.mbox.write().await.append(msg, ident, flags).await self.mbox.write().await.append(msg, ident, flags).await
} }
@ -271,7 +271,7 @@ impl MailboxInternal {
mail: IMF<'_>, mail: IMF<'_>,
ident: Option<UniqueIdent>, ident: Option<UniqueIdent>,
flags: &[Flag], flags: &[Flag],
) -> Result<(ImapUidvalidity, ImapUid)> { ) -> Result<(ImapUidvalidity, ImapUid, ModSeq)> {
let ident = ident.unwrap_or_else(gen_ident); let ident = ident.unwrap_or_else(gen_ident);
let message_key = gen_key(); let message_key = gen_key();
@ -312,14 +312,14 @@ impl MailboxInternal {
let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec()); let add_mail_op = uid_state.op_mail_add(ident, flags.to_vec());
let uidvalidity = uid_state.uidvalidity; let uidvalidity = uid_state.uidvalidity;
let uid = match add_mail_op { let (uid, modseq) = match add_mail_op {
UidIndexOp::MailAdd(_, uid, _) => uid, UidIndexOp::MailAdd(_, uid, modseq, _) => (uid, modseq),
_ => unreachable!(), _ => unreachable!(),
}; };
self.uid_index.push(add_mail_op).await?; self.uid_index.push(add_mail_op).await?;
Ok((uidvalidity, uid)) Ok((uidvalidity, uid, modseq))
} }
async fn append_from_s3<'a>( async fn append_from_s3<'a>(
@ -432,7 +432,7 @@ impl MailboxInternal {
.table .table
.get(&source_id) .get(&source_id)
.ok_or(anyhow!("Source mail not found"))? .ok_or(anyhow!("Source mail not found"))?
.1 .2
.clone(); .clone();
futures::try_join!( futures::try_join!(
@ -465,6 +465,9 @@ impl MailboxInternal {
} }
} }
// Can be useful to debug so we want this code
// to be available to developers
#[allow(dead_code)]
fn dump(uid_index: &Bayou<UidIndex>) { fn dump(uid_index: &Bayou<UidIndex>) {
let s = uid_index.state(); let s = uid_index.state();
println!("---- MAILBOX STATE ----"); println!("---- MAILBOX STATE ----");
@ -476,7 +479,7 @@ fn dump(uid_index: &Bayou<UidIndex>) {
"{} {} {}", "{} {} {}",
uid, uid,
hex::encode(ident.0), hex::encode(ident.0),
s.table.get(ident).cloned().unwrap().1.join(", ") s.table.get(ident).cloned().unwrap().2.join(", ")
); );
} }
println!(); println!();

View file

@ -125,13 +125,6 @@ impl QueryResult {
} }
} }
fn into_partial(self, metadata: MailMeta) -> Option<Self> {
match self {
Self::IndexResult { uuid } => Some(Self::PartialResult { uuid, metadata }),
_ => None,
}
}
fn into_full(self, content: Vec<u8>) -> Option<Self> { fn into_full(self, content: Vec<u8>) -> Option<Self> {
match self { match self {
Self::PartialResult { uuid, metadata } => Some(Self::FullResult { Self::PartialResult { uuid, metadata } => Some(Self::FullResult {

View file

@ -1,4 +1,4 @@
use std::num::NonZeroU32; use std::num::{NonZeroU32, NonZeroU64};
use im::{HashMap, OrdMap, OrdSet}; use im::{HashMap, OrdMap, OrdSet};
use serde::{Deserialize, Deserializer, Serialize, Serializer}; use serde::{Deserialize, Deserializer, Serialize, Serializer};
@ -6,10 +6,11 @@ use serde::{Deserialize, Deserializer, Serialize, Serializer};
use crate::bayou::*; use crate::bayou::*;
use crate::mail::unique_ident::UniqueIdent; use crate::mail::unique_ident::UniqueIdent;
pub type ModSeq = NonZeroU64;
pub type ImapUid = NonZeroU32; pub type ImapUid = NonZeroU32;
pub type ImapUidvalidity = NonZeroU32; pub type ImapUidvalidity = NonZeroU32;
pub type Flag = String; pub type Flag = String;
pub type IndexEntry = (ImapUid, Vec<Flag>); pub type IndexEntry = (ImapUid, ModSeq, Vec<Flag>);
/// A UidIndex handles the mutable part of a mailbox /// A UidIndex handles the mutable part of a mailbox
/// It is built by running the event log on it /// It is built by running the event log on it
@ -23,28 +24,33 @@ pub struct UidIndex {
// Indexes optimized for queries // Indexes optimized for queries
pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>, pub idx_by_uid: OrdMap<ImapUid, UniqueIdent>,
pub idx_by_modseq: OrdMap<ModSeq, UniqueIdent>,
pub idx_by_flag: FlagIndex, pub idx_by_flag: FlagIndex,
// Counters // "Public" Counters
pub uidvalidity: ImapUidvalidity, pub uidvalidity: ImapUidvalidity,
pub uidnext: ImapUid, pub uidnext: ImapUid,
pub highestmodseq: ModSeq,
// "Internal" Counters
pub internalseq: ImapUid, pub internalseq: ImapUid,
pub internalmodseq: ModSeq,
} }
#[derive(Clone, Serialize, Deserialize, Debug)] #[derive(Clone, Serialize, Deserialize, Debug)]
pub enum UidIndexOp { pub enum UidIndexOp {
MailAdd(UniqueIdent, ImapUid, Vec<Flag>), MailAdd(UniqueIdent, ImapUid, ModSeq, Vec<Flag>),
MailDel(UniqueIdent), MailDel(UniqueIdent),
FlagAdd(UniqueIdent, Vec<Flag>), FlagAdd(UniqueIdent, ModSeq, Vec<Flag>),
FlagDel(UniqueIdent, Vec<Flag>), FlagDel(UniqueIdent, ModSeq, Vec<Flag>),
FlagSet(UniqueIdent, Vec<Flag>), FlagSet(UniqueIdent, ModSeq, Vec<Flag>),
BumpUidvalidity(u32), BumpUidvalidity(u32),
} }
impl UidIndex { impl UidIndex {
#[must_use] #[must_use]
pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_mail_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::MailAdd(ident, self.internalseq, flags) UidIndexOp::MailAdd(ident, self.internalseq, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
@ -54,17 +60,17 @@ impl UidIndex {
#[must_use] #[must_use]
pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_flag_add(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::FlagAdd(ident, flags) UidIndexOp::FlagAdd(ident, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_flag_del(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::FlagDel(ident, flags) UidIndexOp::FlagDel(ident, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp { pub fn op_flag_set(&self, ident: UniqueIdent, flags: Vec<Flag>) -> UidIndexOp {
UidIndexOp::FlagSet(ident, flags) UidIndexOp::FlagSet(ident, self.internalmodseq, flags)
} }
#[must_use] #[must_use]
@ -74,18 +80,19 @@ impl UidIndex {
// INTERNAL functions to keep state consistent // INTERNAL functions to keep state consistent
fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, flags: &[Flag]) { fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, modseq: ModSeq, flags: &[Flag]) {
// Insert the email in our table // Insert the email in our table
self.table.insert(ident, (uid, flags.to_owned())); self.table.insert(ident, (uid, modseq, flags.to_owned()));
// Update the indexes/caches // Update the indexes/caches
self.idx_by_uid.insert(uid, ident); self.idx_by_uid.insert(uid, ident);
self.idx_by_flag.insert(uid, flags); self.idx_by_flag.insert(uid, flags);
self.idx_by_modseq.insert(modseq, ident);
} }
fn unreg_email(&mut self, ident: &UniqueIdent) { fn unreg_email(&mut self, ident: &UniqueIdent) {
// We do nothing if the mail does not exist // We do nothing if the mail does not exist
let (uid, flags) = match self.table.get(ident) { let (uid, modseq, flags) = match self.table.get(ident) {
Some(v) => v, Some(v) => v,
None => return, None => return,
}; };
@ -93,6 +100,7 @@ impl UidIndex {
// Delete all cache entries // Delete all cache entries
self.idx_by_uid.remove(uid); self.idx_by_uid.remove(uid);
self.idx_by_flag.remove(*uid, flags); self.idx_by_flag.remove(*uid, flags);
self.idx_by_modseq.remove(modseq);
// Remove from source of trust // Remove from source of trust
self.table.remove(ident); self.table.remove(ident);
@ -103,11 +111,17 @@ impl Default for UidIndex {
fn default() -> Self { fn default() -> Self {
Self { Self {
table: OrdMap::new(), table: OrdMap::new(),
idx_by_uid: OrdMap::new(), idx_by_uid: OrdMap::new(),
idx_by_modseq: OrdMap::new(),
idx_by_flag: FlagIndex::new(), idx_by_flag: FlagIndex::new(),
uidvalidity: NonZeroU32::new(1).unwrap(), uidvalidity: NonZeroU32::new(1).unwrap(),
uidnext: NonZeroU32::new(1).unwrap(), uidnext: NonZeroU32::new(1).unwrap(),
highestmodseq: NonZeroU64::new(1).unwrap(),
internalseq: NonZeroU32::new(1).unwrap(), internalseq: NonZeroU32::new(1).unwrap(),
internalmodseq: NonZeroU64::new(1).unwrap(),
} }
} }
} }
@ -118,17 +132,24 @@ impl BayouState for UidIndex {
fn apply(&self, op: &UidIndexOp) -> Self { fn apply(&self, op: &UidIndexOp) -> Self {
let mut new = self.clone(); let mut new = self.clone();
match op { match op {
UidIndexOp::MailAdd(ident, uid, flags) => { UidIndexOp::MailAdd(ident, uid, modseq, flags) => {
// Change UIDValidity if there is a conflict // Change UIDValidity if there is a UID conflict or a MODSEQ conflict
if *uid < new.internalseq { // @FIXME Need to prove that summing work
// The intuition: we increase the UIDValidity by the number of possible conflicts
if *uid < new.internalseq || *modseq < new.internalmodseq {
let bump_uid = new.internalseq.get() - uid.get();
let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + new.internalseq.get() - uid.get()) NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq)
.unwrap(); .unwrap();
} }
// Assign the real uid of the email // Assign the real uid of the email
let new_uid = new.internalseq; let new_uid = new.internalseq;
// Assign the real modseq of the email and its new flags
let new_modseq = new.internalmodseq;
// Delete the previous entry if any. // Delete the previous entry if any.
// Our proof has no assumption on `ident` uniqueness, // Our proof has no assumption on `ident` uniqueness,
// so we must handle this case even it is very unlikely // so we must handle this case even it is very unlikely
@ -137,10 +158,14 @@ impl BayouState for UidIndex {
new.unreg_email(ident); new.unreg_email(ident);
// We record our email and update ou caches // We record our email and update ou caches
new.reg_email(*ident, new_uid, flags); new.reg_email(*ident, new_uid, new_modseq, flags);
// Update counters // Update counters
new.highestmodseq = new.internalmodseq;
new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
new.uidnext = new.internalseq; new.uidnext = new.internalseq;
} }
UidIndexOp::MailDel(ident) => { UidIndexOp::MailDel(ident) => {
@ -150,8 +175,16 @@ impl BayouState for UidIndex {
// We update the counter // We update the counter
new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap(); new.internalseq = NonZeroU32::new(new.internalseq.get() + 1).unwrap();
} }
UidIndexOp::FlagAdd(ident, new_flags) => { UidIndexOp::FlagAdd(ident, candidate_modseq, new_flags) => {
if let Some((uid, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
.unwrap();
}
// Add flags to the source of trust and the cache // Add flags to the source of trust and the cache
let mut to_add: Vec<Flag> = new_flags let mut to_add: Vec<Flag> = new_flags
.iter() .iter()
@ -159,18 +192,48 @@ impl BayouState for UidIndex {
.cloned() .cloned()
.collect(); .collect();
new.idx_by_flag.insert(*uid, &to_add); new.idx_by_flag.insert(*uid, &to_add);
*email_modseq = new.internalmodseq;
new.idx_by_modseq.insert(new.internalmodseq, *ident);
existing_flags.append(&mut to_add); existing_flags.append(&mut to_add);
// Update counters
new.highestmodseq = new.internalmodseq;
new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
} }
} }
UidIndexOp::FlagDel(ident, rm_flags) => { UidIndexOp::FlagDel(ident, candidate_modseq, rm_flags) => {
if let Some((uid, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
.unwrap();
}
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
existing_flags.retain(|x| !rm_flags.contains(x)); existing_flags.retain(|x| !rm_flags.contains(x));
new.idx_by_flag.remove(*uid, rm_flags); new.idx_by_flag.remove(*uid, rm_flags);
// Register that email has been modified
new.idx_by_modseq.insert(new.internalmodseq, *ident);
*email_modseq = new.internalmodseq;
// Update counters
new.highestmodseq = new.internalmodseq;
new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
} }
} }
UidIndexOp::FlagSet(ident, new_flags) => { UidIndexOp::FlagSet(ident, candidate_modseq, new_flags) => {
if let Some((uid, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
.unwrap();
}
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags let (keep_flags, rm_flags): (Vec<String>, Vec<String>) = existing_flags
.iter() .iter()
@ -185,6 +248,14 @@ impl BayouState for UidIndex {
existing_flags.append(&mut to_add); existing_flags.append(&mut to_add);
new.idx_by_flag.remove(*uid, &rm_flags); new.idx_by_flag.remove(*uid, &rm_flags);
new.idx_by_flag.insert(*uid, &to_add); new.idx_by_flag.insert(*uid, &to_add);
// Register that email has been modified
new.idx_by_modseq.insert(new.internalmodseq, *ident);
*email_modseq = new.internalmodseq;
// Update counters
new.highestmodseq = new.internalmodseq;
new.internalmodseq = NonZeroU64::new(new.internalmodseq.get() + 1).unwrap();
} }
} }
UidIndexOp::BumpUidvalidity(count) => { UidIndexOp::BumpUidvalidity(count) => {
@ -238,10 +309,14 @@ impl FlagIndex {
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
struct UidIndexSerializedRepr { struct UidIndexSerializedRepr {
mails: Vec<(ImapUid, UniqueIdent, Vec<Flag>)>, mails: Vec<(ImapUid, ModSeq, UniqueIdent, Vec<Flag>)>,
uidvalidity: ImapUidvalidity, uidvalidity: ImapUidvalidity,
uidnext: ImapUid, uidnext: ImapUid,
highestmodseq: ModSeq,
internalseq: ImapUid, internalseq: ImapUid,
internalmodseq: ModSeq,
} }
impl<'de> Deserialize<'de> for UidIndex { impl<'de> Deserialize<'de> for UidIndex {
@ -253,16 +328,22 @@ impl<'de> Deserialize<'de> for UidIndex {
let mut uidindex = UidIndex { let mut uidindex = UidIndex {
table: OrdMap::new(), table: OrdMap::new(),
idx_by_uid: OrdMap::new(), idx_by_uid: OrdMap::new(),
idx_by_modseq: OrdMap::new(),
idx_by_flag: FlagIndex::new(), idx_by_flag: FlagIndex::new(),
uidvalidity: val.uidvalidity, uidvalidity: val.uidvalidity,
uidnext: val.uidnext, uidnext: val.uidnext,
highestmodseq: val.highestmodseq,
internalseq: val.internalseq, internalseq: val.internalseq,
internalmodseq: val.internalmodseq,
}; };
val.mails val.mails
.iter() .iter()
.for_each(|(u, i, f)| uidindex.reg_email(*i, *u, f)); .for_each(|(uid, modseq, uuid, flags)| uidindex.reg_email(*uuid, *uid, *modseq, flags));
Ok(uidindex) Ok(uidindex)
} }
@ -274,15 +355,17 @@ impl Serialize for UidIndex {
S: Serializer, S: Serializer,
{ {
let mut mails = vec![]; let mut mails = vec![];
for (ident, (uid, flags)) in self.table.iter() { for (ident, (uid, modseq, flags)) in self.table.iter() {
mails.push((*uid, *ident, flags.clone())); mails.push((*uid, *modseq, *ident, flags.clone()));
} }
let val = UidIndexSerializedRepr { let val = UidIndexSerializedRepr {
mails, mails,
uidvalidity: self.uidvalidity, uidvalidity: self.uidvalidity,
uidnext: self.uidnext, uidnext: self.uidnext,
highestmodseq: self.highestmodseq,
internalseq: self.internalseq, internalseq: self.internalseq,
internalmodseq: self.internalmodseq,
}; };
val.serialize(serializer) val.serialize(serializer)
@ -308,8 +391,9 @@ mod tests {
// Early checks // Early checks
assert_eq!(state.table.len(), 1); assert_eq!(state.table.len(), 1);
let (uid, flags) = state.table.get(&m).unwrap(); let (uid, modseq, flags) = state.table.get(&m).unwrap();
assert_eq!(*uid, NonZeroU32::new(1).unwrap()); assert_eq!(*uid, NonZeroU32::new(1).unwrap());
assert_eq!(*modseq, NonZeroU64::new(1).unwrap());
assert_eq!(flags.len(), 2); assert_eq!(flags.len(), 2);
let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap(); let ident = state.idx_by_uid.get(&NonZeroU32::new(1).unwrap()).unwrap();
assert_eq!(&m, ident); assert_eq!(&m, ident);
@ -364,7 +448,7 @@ mod tests {
{ {
let m = UniqueIdent([0x03; 24]); let m = UniqueIdent([0x03; 24]);
let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), f); let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU64::new(1).unwrap(), f);
state = state.apply(&ev); state = state.apply(&ev);
} }

View file

@ -2,6 +2,7 @@ use anyhow::Context;
mod common; mod common;
use crate::common::fragments::*; use crate::common::fragments::*;
use crate::common::constants::*;
fn main() { fn main() {
rfc3501_imap4rev1_base(); rfc3501_imap4rev1_base();
@ -9,10 +10,12 @@ fn main() {
rfc5161_imapext_enable(); rfc5161_imapext_enable();
rfc6851_imapext_move(); rfc6851_imapext_move();
rfc7888_imapext_literal(); rfc7888_imapext_literal();
rfc4551_imapext_condstore();
println!("✅ SUCCESS 🌟🚀🥳🙏🥹");
} }
fn rfc3501_imap4rev1_base() { fn rfc3501_imap4rev1_base() {
println!("rfc3501_imap4rev1_base"); println!("🧪 rfc3501_imap4rev1_base");
common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| { common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| {
connect(imap_socket).context("server says hello")?; connect(imap_socket).context("server says hello")?;
capability(imap_socket, Extension::None).context("check server capabilities")?; capability(imap_socket, Extension::None).context("check server capabilities")?;
@ -20,20 +23,26 @@ fn rfc3501_imap4rev1_base() {
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
// UNSUBSCRIBE IS NOT IMPLEMENTED YET // UNSUBSCRIBE IS NOT IMPLEMENTED YET
//unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?; //unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?;
select(imap_socket, Mailbox::Inbox, None).context("select inbox")?; let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS"));
check(imap_socket).context("check must run")?; check(imap_socket).context("check must run")?;
status_mailbox(imap_socket, Mailbox::Archive).context("status of archive from inbox")?; status(imap_socket, Mailbox::Archive, StatusKind::UidNext).context("status of archive from inbox")?;
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
noop_exists(imap_socket).context("noop loop must detect a new email")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
fetch_rfc822(imap_socket, Selection::FirstId, Email::Multipart)
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None)
.context("fetch rfc822 message, should be our first message")?; .context("fetch rfc822 message, should be our first message")?;
let orig_email = std::str::from_utf8(EMAIL1)?;
assert!(srv_msg.contains(orig_email));
copy(imap_socket, Selection::FirstId, Mailbox::Archive) copy(imap_socket, Selection::FirstId, Mailbox::Archive)
.context("copy message to the archive mailbox")?; .context("copy message to the archive mailbox")?;
append_email(imap_socket, Email::Basic).context("insert email in INBOX")?; append_email(imap_socket, Email::Basic).context("insert email in INBOX")?;
// SEARCH IS NOT IMPLEMENTED YET noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
//search(imap_socket).expect("search should return something"); search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something");
add_flags_email(imap_socket, Selection::FirstId, Flag::Deleted) store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None)
.context("should add delete flag to the email")?; .context("should add delete flag to the email")?;
expunge(imap_socket).context("expunge emails")?; expunge(imap_socket).context("expunge emails")?;
rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts) rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts)
@ -45,7 +54,7 @@ fn rfc3501_imap4rev1_base() {
} }
fn rfc3691_imapext_unselect() { fn rfc3691_imapext_unselect() {
println!("rfc3691_imapext_unselect"); println!("🧪 rfc3691_imapext_unselect");
common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| { common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| {
connect(imap_socket).context("server says hello")?; connect(imap_socket).context("server says hello")?;
@ -54,18 +63,26 @@ fn rfc3691_imapext_unselect() {
capability(imap_socket, Extension::Unselect).context("check server capabilities")?; capability(imap_socket, Extension::Unselect).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
select(imap_socket, Mailbox::Inbox, None).context("select inbox")?; let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
noop_exists(imap_socket).context("noop loop must detect a new email")?; assert!(select_res.contains("* 0 EXISTS"));
add_flags_email(imap_socket, Selection::FirstId, Flag::Deleted)
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None)
.context("add delete flags to the email")?; .context("add delete flags to the email")?;
unselect(imap_socket) unselect(imap_socket)
.context("unselect inbox while preserving email with the \\Delete flag")?; .context("unselect inbox while preserving email with the \\Delete flag")?;
select(imap_socket, Mailbox::Inbox, Some(1)).context("select inbox again")?; let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?;
fetch_rfc822(imap_socket, Selection::FirstId, Email::Basic) assert!(select_res.contains("* 1 EXISTS"));
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None)
.context("message is still present")?; .context("message is still present")?;
let orig_email = std::str::from_utf8(EMAIL2)?;
assert!(srv_msg.contains(orig_email));
close(imap_socket).context("close inbox and expunge message")?; close(imap_socket).context("close inbox and expunge message")?;
select(imap_socket, Mailbox::Inbox, Some(0)) let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None)
.context("select inbox again and check it's empty")?; .context("select inbox again and check it's empty")?;
assert!(select_res.contains("* 0 EXISTS"));
Ok(()) Ok(())
}) })
@ -73,7 +90,7 @@ fn rfc3691_imapext_unselect() {
} }
fn rfc5161_imapext_enable() { fn rfc5161_imapext_enable() {
println!("rfc5161_imapext_enable"); println!("🧪 rfc5161_imapext_enable");
common::aerogramme_provider_daemon_dev(|imap_socket, _lmtp_socket| { common::aerogramme_provider_daemon_dev(|imap_socket, _lmtp_socket| {
connect(imap_socket).context("server says hello")?; connect(imap_socket).context("server says hello")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
@ -87,26 +104,37 @@ fn rfc5161_imapext_enable() {
} }
fn rfc6851_imapext_move() { fn rfc6851_imapext_move() {
println!("rfc6851_imapext_move"); println!("🧪 rfc6851_imapext_move");
common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| { common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| {
connect(imap_socket).context("server says hello")?; connect(imap_socket).context("server says hello")?;
capability(imap_socket, Extension::Move).context("check server capabilities")?; capability(imap_socket, Extension::Move).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
select(imap_socket, Mailbox::Inbox, None).context("select inbox")?; let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS"));
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
noop_exists(imap_socket).context("noop loop must detect a new email")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
r#move(imap_socket, Selection::FirstId, Mailbox::Archive) r#move(imap_socket, Selection::FirstId, Mailbox::Archive)
.context("message from inbox moved to archive")?; .context("message from inbox moved to archive")?;
unselect(imap_socket) unselect(imap_socket)
.context("unselect inbox while preserving email with the \\Delete flag")?; .context("unselect inbox while preserving email with the \\Delete flag")?;
select(imap_socket, Mailbox::Archive, Some(1)).context("select archive")?; let select_res = select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?;
fetch_rfc822(imap_socket, Selection::FirstId, Email::Basic).context("check mail exists")?; assert!(select_res.contains("* 1 EXISTS"));
let srv_msg = fetch(
imap_socket,
Selection::FirstId,
FetchKind::Rfc822,
FetchMod::None,
).context("check mail exists")?;
let orig_email = std::str::from_utf8(EMAIL2)?;
assert!(srv_msg.contains(orig_email));
logout(imap_socket).context("must quit")?; logout(imap_socket).context("must quit")?;
Ok(()) Ok(())
@ -115,7 +143,7 @@ fn rfc6851_imapext_move() {
} }
fn rfc7888_imapext_literal() { fn rfc7888_imapext_literal() {
println!("rfc7888_imapext_literal"); println!("🧪 rfc7888_imapext_literal");
common::aerogramme_provider_daemon_dev(|imap_socket, _lmtp_socket| { common::aerogramme_provider_daemon_dev(|imap_socket, _lmtp_socket| {
connect(imap_socket).context("server says hello")?; connect(imap_socket).context("server says hello")?;
@ -126,3 +154,49 @@ fn rfc7888_imapext_literal() {
}) })
.expect("test fully run"); .expect("test fully run");
} }
fn rfc4551_imapext_condstore() {
println!("🧪 rfc4551_imapext_condstore");
common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| {
// Setup the test
connect(imap_socket).context("server says hello")?;
// RFC 3.1.1 Advertising Support for CONDSTORE
capability(imap_socket, Extension::Condstore).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?;
// RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?;
// RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE
assert!(select_res.contains("[HIGHESTMODSEQ 1]"));
// RFC 3.1.3. STORE and UID STORE Commands
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
let store_res = store(imap_socket, Selection::All, Flag::Important, StoreAction::AddFlags, StoreMod::UnchangedSince(1))?;
assert!(store_res.contains("[MODIFIED 2]"));
assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))"));
assert!(!store_res.contains("* 2 FETCH"));
assert_eq!(store_res.lines().count(), 2);
// RFC 3.1.4. FETCH and UID FETCH Commands
let fetch_res = fetch(imap_socket, Selection::All, FetchKind::Rfc822Size, FetchMod::ChangedSince(2))?;
assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))"));
assert!(!fetch_res.contains("* 2 FETCH"));
assert_eq!(store_res.lines().count(), 2);
// RFC 3.1.5. MODSEQ Search Criterion in SEARCH
let search_res = search(imap_socket, SearchKind::ModSeq(3))?;
// RFC 3.1.6. Modified SEARCH Untagged Response
assert!(search_res.contains("* SEARCH 1 (MODSEQ 3)"));
// RFC 3.1.7 HIGHESTMODSEQ Status Data Items
let status_res = status(imap_socket, Mailbox::Inbox, StatusKind::HighestModSeq)?;
assert!(status_res.contains("HIGHESTMODSEQ 3"));
Ok(())
})
.expect("test fully run");
}

View file

@ -34,7 +34,7 @@ pub enum Extension {
None, None,
Unselect, Unselect,
Move, Move,
CondStore, Condstore,
LiteralPlus, LiteralPlus,
} }
@ -63,6 +63,46 @@ pub enum Email {
pub enum Selection { pub enum Selection {
FirstId, FirstId,
SecondId, SecondId,
All,
}
pub enum SelectMod {
None,
Condstore,
}
pub enum StoreAction {
AddFlags,
DelFlags,
SetFlags,
AddFlagsSilent,
DelFlagsSilent,
SetFlagsSilent,
}
pub enum StoreMod {
None,
UnchangedSince(u64),
}
pub enum FetchKind {
Rfc822,
Rfc822Size,
}
pub enum FetchMod {
None,
ChangedSince(u64),
}
pub enum SearchKind<'a> {
Text(&'a str),
ModSeq(u64),
}
pub enum StatusKind {
UidNext,
HighestModSeq,
} }
pub fn capability(imap: &mut TcpStream, ext: Extension) -> Result<()> { pub fn capability(imap: &mut TcpStream, ext: Extension) -> Result<()> {
@ -72,7 +112,7 @@ pub fn capability(imap: &mut TcpStream, ext: Extension) -> Result<()> {
Extension::None => None, Extension::None => None,
Extension::Unselect => Some("UNSELECT"), Extension::Unselect => Some("UNSELECT"),
Extension::Move => Some("MOVE"), Extension::Move => Some("MOVE"),
Extension::CondStore => Some("CONDSTORE"), Extension::Condstore => Some("CONDSTORE"),
Extension::LiteralPlus => Some("LITERAL+"), Extension::LiteralPlus => Some("LITERAL+"),
}; };
@ -125,7 +165,7 @@ pub fn create_mailbox(imap: &mut TcpStream, mbx: Mailbox) -> Result<()> {
Ok(()) Ok(())
} }
pub fn select(imap: &mut TcpStream, mbx: Mailbox, maybe_exists: Option<u64>) -> Result<()> { pub fn select(imap: &mut TcpStream, mbx: Mailbox, modifier: SelectMod) -> Result<String> {
let mut buffer: [u8; 6000] = [0; 6000]; let mut buffer: [u8; 6000] = [0; 6000];
let mbx_str = match mbx { let mbx_str = match mbx {
@ -133,16 +173,18 @@ pub fn select(imap: &mut TcpStream, mbx: Mailbox, maybe_exists: Option<u64>) ->
Mailbox::Archive => "Archive", Mailbox::Archive => "Archive",
Mailbox::Drafts => "Drafts", Mailbox::Drafts => "Drafts",
}; };
imap.write(format!("20 select {}\r\n", mbx_str).as_bytes())?;
let mod_str = match modifier {
SelectMod::Condstore => " (CONDSTORE)",
SelectMod::None => "",
};
imap.write(format!("20 select {}{}\r\n", mbx_str, mod_str).as_bytes())?;
let read = read_lines(imap, &mut buffer, Some(&b"20 OK"[..]))?; let read = read_lines(imap, &mut buffer, Some(&b"20 OK"[..]))?;
let srv_msg = std::str::from_utf8(read)?; let srv_msg = std::str::from_utf8(read)?;
if let Some(exists) = maybe_exists {
let expected = format!("* {} EXISTS", exists);
assert!(srv_msg.contains(&expected));
}
Ok(()) Ok(srv_msg.to_string())
} }
pub fn unselect(imap: &mut TcpStream) -> Result<()> { pub fn unselect(imap: &mut TcpStream) -> Result<()> {
@ -162,13 +204,22 @@ pub fn check(imap: &mut TcpStream) -> Result<()> {
Ok(()) Ok(())
} }
pub fn status_mailbox(imap: &mut TcpStream, mbx: Mailbox) -> Result<()> { pub fn status(imap: &mut TcpStream, mbx: Mailbox, sk: StatusKind) -> Result<String> {
assert!(matches!(mbx, Mailbox::Archive)); let mbx_str = match mbx {
imap.write(&b"25 STATUS Archive (UIDNEXT MESSAGES)\r\n"[..])?; Mailbox::Inbox => "INBOX",
Mailbox::Archive => "Archive",
Mailbox::Drafts => "Drafts",
};
let sk_str = match sk {
StatusKind::UidNext => "(UIDNEXT)",
StatusKind::HighestModSeq => "(HIGHESTMODSEQ)",
};
imap.write(format!("25 STATUS {} {}\r\n", mbx_str, sk_str).as_bytes())?;
let mut buffer: [u8; 6000] = [0; 6000]; let mut buffer: [u8; 6000] = [0; 6000];
let _read = read_lines(imap, &mut buffer, Some(&b"25 OK"[..]))?; let read = read_lines(imap, &mut buffer, Some(&b"25 OK"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
Ok(()) Ok(srv_msg.to_string())
} }
pub fn lmtp_handshake(lmtp: &mut TcpStream) -> Result<()> { pub fn lmtp_handshake(lmtp: &mut TcpStream) -> Result<()> {
@ -206,7 +257,7 @@ pub fn lmtp_deliver_email(lmtp: &mut TcpStream, email_type: Email) -> Result<()>
Ok(()) Ok(())
} }
pub fn noop_exists(imap: &mut TcpStream) -> Result<()> { pub fn noop_exists(imap: &mut TcpStream, must_exists: u32) -> Result<()> {
let mut buffer: [u8; 6000] = [0; 6000]; let mut buffer: [u8; 6000] = [0; 6000];
let mut max_retry = 20; let mut max_retry = 20;
@ -216,35 +267,50 @@ pub fn noop_exists(imap: &mut TcpStream) -> Result<()> {
let read = read_lines(imap, &mut buffer, Some(&b"30 OK"[..]))?; let read = read_lines(imap, &mut buffer, Some(&b"30 OK"[..]))?;
let srv_msg = std::str::from_utf8(read)?; let srv_msg = std::str::from_utf8(read)?;
match (max_retry, srv_msg.lines().count()) { for line in srv_msg.lines() {
(_, cnt) if cnt > 1 => break, if line.contains("EXISTS") {
(0, _) => bail!("no more retry"), let got = read_first_u32(line)?;
_ => (), if got == must_exists {
// Done
return Ok(());
}
}
}
if max_retry <= 0 {
// Failed
bail!("no more retry");
} }
thread::sleep(SMALL_DELAY); thread::sleep(SMALL_DELAY);
} }
Ok(())
} }
pub fn fetch_rfc822(imap: &mut TcpStream, selection: Selection, r#ref: Email) -> Result<()> { pub fn fetch(imap: &mut TcpStream, selection: Selection, kind: FetchKind, modifier: FetchMod) -> Result<String> {
let mut buffer: [u8; 65535] = [0; 65535]; let mut buffer: [u8; 65535] = [0; 65535];
assert!(matches!(selection, Selection::FirstId)); let sel_str = match selection {
imap.write(&b"40 fetch 1 rfc822\r\n"[..])?; Selection::FirstId => "1",
Selection::SecondId => "2",
Selection::All => "1:*",
};
let kind_str = match kind {
FetchKind::Rfc822 => "RFC822",
FetchKind::Rfc822Size => "RFC822.SIZE",
};
let mod_str = match modifier {
FetchMod::None => "".into(),
FetchMod::ChangedSince(val) => format!(" (CHANGEDSINCE {})", val),
};
imap.write(format!("40 fetch {} {}{}\r\n", sel_str, kind_str, mod_str).as_bytes())?;
let read = read_lines(imap, &mut buffer, Some(&b"40 OK FETCH"[..]))?; let read = read_lines(imap, &mut buffer, Some(&b"40 OK FETCH"[..]))?;
let srv_msg = std::str::from_utf8(read)?; let srv_msg = std::str::from_utf8(read)?;
let ref_mail = match r#ref { Ok(srv_msg.to_string())
Email::Basic => EMAIL2,
Email::Multipart => EMAIL1,
};
let orig_email = std::str::from_utf8(ref_mail)?;
assert!(srv_msg.contains(orig_email));
Ok(())
} }
pub fn copy(imap: &mut TcpStream, selection: Selection, to: Mailbox) -> Result<()> { pub fn copy(imap: &mut TcpStream, selection: Selection, to: Mailbox) -> Result<()> {
@ -281,29 +347,59 @@ pub fn append_email(imap: &mut TcpStream, content: Email) -> Result<()> {
let read = read_lines(imap, &mut buffer, None)?; let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"47 OK"[..]); assert_eq!(&read[..5], &b"47 OK"[..]);
// we check that noop detects the change
noop_exists(imap)?;
Ok(()) Ok(())
} }
pub fn add_flags_email(imap: &mut TcpStream, selection: Selection, flag: Flag) -> Result<()> { pub fn search(imap: &mut TcpStream, sk: SearchKind) -> Result<String> {
let sk_str = match sk {
SearchKind::Text(x) => format!("TEXT \"{}\"", x),
SearchKind::ModSeq(x) => format!("MODSEQ {}", x),
};
imap.write(format!("55 SEARCH {}\r\n", sk_str).as_bytes())?;
let mut buffer: [u8; 1500] = [0; 1500]; let mut buffer: [u8; 1500] = [0; 1500];
assert!(matches!(selection, Selection::FirstId)); let read = read_lines(imap, &mut buffer, Some(&b"55 OK"[..]))?;
assert!(matches!(flag, Flag::Deleted)); let srv_msg = std::str::from_utf8(read)?;
imap.write(&b"50 store 1 +FLAGS (\\Deleted)\r\n"[..])?; Ok(srv_msg.to_string())
let _read = read_lines(imap, &mut buffer, Some(&b"50 OK STORE"[..]))?;
Ok(())
} }
#[allow(dead_code)] pub fn store(
/// Not yet implemented imap: &mut TcpStream,
pub fn search(imap: &mut TcpStream) -> Result<()> { sel: Selection,
imap.write(&b"55 search text \"OoOoO\"\r\n"[..])?; flag: Flag,
let mut buffer: [u8; 1500] = [0; 1500]; action: StoreAction,
let _read = read_lines(imap, &mut buffer, Some(&b"55 OK SEARCH"[..]))?; modifier: StoreMod
Ok(()) ) -> Result<String> {
let mut buffer: [u8; 6000] = [0; 6000];
let seq = match sel {
Selection::FirstId => "1",
Selection::SecondId => "2",
Selection::All => "1:*",
};
let modif = match modifier {
StoreMod::None => "".into(),
StoreMod::UnchangedSince(val) => format!(" (UNCHANGEDSINCE {})", val),
};
let flags_str = match flag {
Flag::Deleted => "(\\Deleted)",
Flag::Important => "(\\Important)",
};
let action_str = match action {
StoreAction::AddFlags => "+FLAGS",
StoreAction::DelFlags => "-FLAGS",
StoreAction::SetFlags => "FLAGS",
StoreAction::AddFlagsSilent => "+FLAGS.SILENT",
StoreAction::DelFlagsSilent => "-FLAGS.SILENT",
StoreAction::SetFlagsSilent => "FLAGS.SILENT",
};
imap.write(format!("57 STORE {}{} {} {}\r\n", seq, modif, action_str, flags_str).as_bytes())?;
let read = read_lines(imap, &mut buffer, Some(&b"57 OK"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
Ok(srv_msg.to_string())
} }
pub fn expunge(imap: &mut TcpStream) -> Result<()> { pub fn expunge(imap: &mut TcpStream) -> Result<()> {

View file

@ -88,3 +88,12 @@ pub fn read_lines<'a, F: Read>(
println!("read: {}", std::str::from_utf8(&buffer[..nbytes])?); println!("read: {}", std::str::from_utf8(&buffer[..nbytes])?);
Ok(&buffer[..nbytes]) Ok(&buffer[..nbytes])
} }
pub fn read_first_u32(inp: &str) -> Result<u32> {
Ok(inp
.chars()
.skip_while(|c| !c.is_digit(10))
.take_while(|c| c.is_digit(10))
.collect::<String>()
.parse::<u32>()?)
}