CONDSTORE #71

Merged
quentin merged 21 commits from feat/condstore-try-2 into main 2024-01-15 07:07:07 +00:00
5 changed files with 147 additions and 30 deletions
Showing only changes of commit 3c7186ab5a - Show all commits

View file

@ -1,4 +1,5 @@
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section}; use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
use imap_codec::imap_types::command::FetchModifier;
/// Internal decisions based on fetched attributes /// Internal decisions based on fetched attributes
/// passed by the client /// passed by the client
@ -7,7 +8,7 @@ pub struct AttributesProxy {
pub attrs: Vec<MessageDataItemName<'static>>, pub attrs: Vec<MessageDataItemName<'static>>,
} }
impl AttributesProxy { impl AttributesProxy {
pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self { pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self {
// Expand macros // Expand macros
let mut fetch_attrs = match attrs { let mut fetch_attrs = match attrs {
MacroOrMessageDataItemNames::Macro(m) => { MacroOrMessageDataItemNames::Macro(m) => {
@ -31,6 +32,14 @@ impl AttributesProxy {
fetch_attrs.push(MessageDataItemName::Uid); fetch_attrs.push(MessageDataItemName::Uid);
} }
// Handle inferred MODSEQ tag
let is_changed_since = modifiers
.iter()
.any(|m| matches!(m, FetchModifier::ChangedSince(..)));
if is_changed_since && !fetch_attrs.contains(&MessageDataItemName::ModSeq) {
fetch_attrs.push(MessageDataItemName::ModSeq);
}
Self { attrs: fetch_attrs } Self { attrs: fetch_attrs }
} }

View file

@ -1,4 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use std::num::NonZeroU64;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier};
@ -11,7 +12,7 @@ use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated}; use crate::imap::command::{anystate, authenticated};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response; use crate::imap::response::Response;
use crate::mail::user::User; use crate::mail::user::User;
@ -93,9 +94,15 @@ impl<'a> ExaminedContext<'a> {
modifiers: &[FetchModifier], modifiers: &[FetchModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let ap = AttributesProxy::new(attributes, *uid); let ap = AttributesProxy::new(attributes, modifiers, *uid);
let mut changed_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
FetchModifier::ChangedSince(val) => {
changed_since = Some(*val);
},
});
match self.mailbox.fetch(sequence_set, &ap, uid).await { match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
Ok(resp) => { Ok(resp) => {
// Capabilities enabling logic only on successful command // Capabilities enabling logic only on successful command
// (according to my understanding of the spec) // (according to my understanding of the spec)
@ -144,7 +151,7 @@ impl<'a> ExaminedContext<'a> {
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.internal.mailbox.force_sync().await?; self.mailbox.internal.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?; let updates = self.mailbox.update(UpdateParameters::default()).await?;
Ok(( Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)

View file

@ -1,4 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use std::num::NonZeroU64;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
@ -13,7 +14,7 @@ use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated, MailboxName}; use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response; use crate::imap::response::Response;
use crate::imap::attributes::AttributesProxy; use crate::imap::attributes::AttributesProxy;
use crate::mail::user::User; use crate::mail::user::User;
@ -118,9 +119,15 @@ impl<'a> SelectedContext<'a> {
modifiers: &[FetchModifier], modifiers: &[FetchModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let ap = AttributesProxy::new(attributes, *uid); let ap = AttributesProxy::new(attributes, modifiers, *uid);
let mut changed_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
FetchModifier::ChangedSince(val) => {
changed_since = Some(*val);
},
});
match self.mailbox.fetch(sequence_set, &ap, uid).await { match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
Ok(resp) => { Ok(resp) => {
// Capabilities enabling logic only on successful command // Capabilities enabling logic only on successful command
// (according to my understanding of the spec) // (according to my understanding of the spec)
@ -170,7 +177,7 @@ impl<'a> SelectedContext<'a> {
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.internal.mailbox.force_sync().await?; self.mailbox.internal.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?; let updates = self.mailbox.update(UpdateParameters::default()).await?;
Ok(( Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -204,10 +211,18 @@ impl<'a> SelectedContext<'a> {
modifiers: &[StoreModifier], modifiers: &[StoreModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let data = self let mut unchanged_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
StoreModifier::UnchangedSince(val) => {
unchanged_since = Some(*val);
},
});
let (data, modified) = self
.mailbox .mailbox
.store(sequence_set, kind, response, flags, uid) .store(sequence_set, kind, response, flags, unchanged_since, uid)
.await?; .await?;
let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::<Vec<_>>().join(","));
self.client_capabilities.store_modifiers_enable(modifiers); self.client_capabilities.store_modifiers_enable(modifiers);
@ -215,6 +230,7 @@ impl<'a> SelectedContext<'a> {
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
.message("STORE completed") .message("STORE completed")
.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes())))
.set_body(data) .set_body(data)
.ok()?, .ok()?,
flow::Transition::None, flow::Transition::None,

View file

@ -1,4 +1,4 @@
use std::num::NonZeroU32; use std::num::{NonZeroU32, NonZeroU64};
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
@ -126,6 +126,36 @@ impl<'a> Index<'a> {
_ => self.fetch_on_id(sequence_set), _ => self.fetch_on_id(sequence_set),
} }
} }
pub fn fetch_changed_since(
self: &'a Index<'a>,
sequence_set: &SequenceSet,
maybe_modseq: Option<NonZeroU64>,
by_uid: bool,
) -> Result<Vec<&'a MailIndex<'a>>> {
let raw = self.fetch(sequence_set, by_uid)?;
let res = match maybe_modseq {
Some(pit) => raw.into_iter().filter(|midx| midx.modseq > pit).collect(),
None => raw,
};
Ok(res)
}
pub fn fetch_unchanged_since(
self: &'a Index<'a>,
sequence_set: &SequenceSet,
maybe_modseq: Option<NonZeroU64>,
by_uid: bool,
) -> Result<(Vec<&'a MailIndex<'a>>, Vec<&'a MailIndex<'a>>)> {
let raw = self.fetch(sequence_set, by_uid)?;
let res = match maybe_modseq {
Some(pit) => raw.into_iter().partition(|midx| midx.modseq <= pit),
None => (raw, vec![]),
};
Ok(res)
}
} }
#[derive(Clone, Debug)] #[derive(Clone, Debug)]

View file

@ -1,5 +1,6 @@
use std::num::{NonZeroU32, NonZeroU64}; use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashSet;
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
@ -12,6 +13,7 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::mail::unique_ident::UniqueIdent;
use crate::mail::mailbox::Mailbox; use crate::mail::mailbox::Mailbox;
use crate::mail::query::QueryScope; use crate::mail::query::QueryScope;
use crate::mail::snapshot::FrozenMailbox; use crate::mail::snapshot::FrozenMailbox;
@ -32,6 +34,21 @@ const DEFAULT_FLAGS: [Flag; 5] = [
Flag::Draft, Flag::Draft,
]; ];
pub struct UpdateParameters {
pub silence: HashSet<UniqueIdent>,
pub with_modseq: bool,
pub with_uid: bool,
}
impl Default for UpdateParameters {
fn default() -> Self {
Self {
silence: HashSet::new(),
with_modseq: false,
with_uid: false,
}
}
}
/// A MailboxView is responsible for giving the client the information /// A MailboxView is responsible for giving the client the information
/// it needs about a mailbox, such as an initial summary of the mailbox's /// it needs about a mailbox, such as an initial summary of the mailbox's
/// content and continuous updates indicating when the content /// content and continuous updates indicating when the content
@ -59,7 +76,7 @@ impl MailboxView {
/// what the client knows and what is actually in the mailbox. /// what the client knows and what is actually in the mailbox.
/// This does NOT trigger a sync, it bases itself on what is currently /// This does NOT trigger a sync, it bases itself on what is currently
/// loaded in RAM by Bayou. /// loaded in RAM by Bayou.
pub async fn update(&mut self) -> Result<Vec<Body<'static>>> { pub async fn update(&mut self, params: UpdateParameters) -> Result<Vec<Body<'static>>> {
let old_snapshot = self.internal.update().await; let old_snapshot = self.internal.update().await;
let new_snapshot = &self.internal.snapshot; let new_snapshot = &self.internal.snapshot;
@ -105,19 +122,31 @@ impl MailboxView {
} else { } else {
// - if flags changed for existing mails, tell client // - if flags changed for existing mails, tell client
for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() { for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() {
if params.silence.contains(uuid) {
continue;
}
let old_mail = old_snapshot.table.get(uuid); let old_mail = old_snapshot.table.get(uuid);
let new_mail = new_snapshot.table.get(uuid); let new_mail = new_snapshot.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail { if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, _modseq, flags)) = new_mail { if let Some((uid, modseq, flags)) = new_mail {
data.push(Body::Data(Data::Fetch { let mut items = vec![
seq: NonZeroU32::try_from((i + 1) as u32).unwrap(),
items: vec![
MessageDataItem::Uid(*uid),
MessageDataItem::Flags( MessageDataItem::Flags(
flags.iter().filter_map(|f| flags::from_str(f)).collect(), flags.iter().filter_map(|f| flags::from_str(f)).collect(),
), ),
] ];
.try_into()?,
if params.with_uid {
items.push(MessageDataItem::Uid(*uid));
}
if params.with_modseq {
items.push(MessageDataItem::ModSeq(*modseq));
}
data.push(Body::Data(Data::Fetch {
seq: NonZeroU32::try_from((i + 1) as u32).unwrap(),
items: items.try_into()?,
})); }));
} }
} }
@ -149,17 +178,20 @@ impl MailboxView {
&mut self, &mut self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
kind: &StoreType, kind: &StoreType,
_response: &StoreResponse, response: &StoreResponse,
flags: &[Flag<'a>], flags: &[Flag<'a>],
unchanged_since: Option<NonZeroU64>,
is_uid_store: &bool, is_uid_store: &bool,
) -> Result<Vec<Body<'static>>> { ) -> Result<(Vec<Body<'static>>, Vec<NonZeroU32>)> {
self.internal.sync().await?; self.internal.sync().await?;
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let idx = self.index()?; let idx = self.index()?;
let mails = idx.fetch(sequence_set, *is_uid_store)?; let (editable, in_conflict) = idx
for mi in mails.iter() { .fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
for mi in editable.iter() {
match kind { match kind {
StoreType::Add => { StoreType::Add => {
self.internal.mailbox.add_flags(mi.uuid, &flags[..]).await?; self.internal.mailbox.add_flags(mi.uuid, &flags[..]).await?;
@ -173,8 +205,23 @@ impl MailboxView {
} }
} }
// @TODO: handle _response let silence = match response {
self.update().await StoreResponse::Answer => HashSet::new(),
StoreResponse::Silent => editable.iter().map(|midx| midx.uuid).collect(),
};
let conflict_id_or_uid = match is_uid_store {
true => in_conflict.into_iter().map(|midx| midx.uid).collect(),
_ => in_conflict.into_iter().map(|midx| midx.i).collect(),
};
let summary = self.update(UpdateParameters {
with_uid: *is_uid_store,
with_modseq: unchanged_since.is_some(),
silence,
}).await?;
Ok((summary, conflict_id_or_uid))
} }
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> { pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
@ -192,7 +239,7 @@ impl MailboxView {
self.internal.mailbox.delete(msg).await?; self.internal.mailbox.delete(msg).await?;
} }
self.update().await self.update(UpdateParameters::default()).await
} }
pub async fn copy( pub async fn copy(
@ -247,7 +294,10 @@ impl MailboxView {
ret.push((mi.uid, dest_uid)); ret.push((mi.uid, dest_uid));
} }
let update = self.update().await?; let update = self.update(UpdateParameters {
with_uid: *is_uid_copy,
..UpdateParameters::default()
}).await?;
Ok((to_state.uidvalidity, ret, update)) Ok((to_state.uidvalidity, ret, update))
} }
@ -258,6 +308,7 @@ impl MailboxView {
&self, &self,
sequence_set: &SequenceSet, sequence_set: &SequenceSet,
ap: &AttributesProxy, ap: &AttributesProxy,
changed_since: Option<NonZeroU64>,
is_uid_fetch: &bool, is_uid_fetch: &bool,
) -> Result<Vec<Body<'static>>> { ) -> Result<Vec<Body<'static>>> {
// [1/6] Pre-compute data // [1/6] Pre-compute data
@ -270,7 +321,11 @@ impl MailboxView {
}; };
tracing::debug!("Query scope {:?}", query_scope); tracing::debug!("Query scope {:?}", query_scope);
let idx = self.index()?; let idx = self.index()?;
let mail_idx_list = idx.fetch(sequence_set, *is_uid_fetch)?; let mail_idx_list = idx.fetch_changed_since(
sequence_set,
changed_since,
*is_uid_fetch
)?;
// [2/6] Fetch the emails // [2/6] Fetch the emails
let uuids = mail_idx_list let uuids = mail_idx_list