WIP rewrite with a query manager

This commit is contained in:
Quentin 2024-01-05 18:59:19 +01:00
parent adf4d33f22
commit 4806f7ff84
Signed by: quentin
GPG key ID: E9602264D639FF68
7 changed files with 194 additions and 144 deletions

View file

@ -125,7 +125,7 @@ impl<'a> ExaminedContext<'a> {
}
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
self.mailbox.0.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
Ok((

View file

@ -152,7 +152,7 @@ impl<'a> SelectedContext<'a> {
}
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
self.mailbox.0.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
Ok((

View file

@ -1,6 +1,6 @@
use std::num::NonZeroU32;
use anyhow::{anyhow, bail, Result};
use anyhow::{anyhow, bail, Result, Context};
use chrono::{Offset, TimeZone, Utc};
use imap_codec::imap_types::core::{IString, NString};
@ -22,16 +22,31 @@ use crate::imap::imf_view::message_envelope;
use crate::imap::mailbox_view::MailIdentifiers;
use crate::imap::mime_view;
use crate::imap::response::Body;
use crate::mail::mailbox::MailMeta;
use crate::mail::query::QueryResult;
pub struct MailView<'a> {
pub ids: &'a MailIdentifiers,
pub meta: &'a MailMeta,
pub flags: &'a Vec<String>,
pub query_result: &'a QueryResult<'a>,
pub content: FetchedMail<'a>,
}
impl<'a> MailView<'a> {
pub fn new(query_result: &'a QueryResult<'a>) -> Result<Self> {
Ok(Self {
query_result,
content: match query_result {
QueryResult::FullResult { content, .. } => {
let (_, parsed) = eml_codec::parse_message(content).context("Invalid mail body")?;
FetchedMail::new_from_message(parsed)
},
QueryResult::PartialResult { metadata, .. } => {
let (_, parsed) = eml_codec::parse_imf(&metadata.headers).context("Invalid mail headers")?;
FetchedMail::Partial(parsed)
}
QueryResult::IndexResult { .. } => FetchedMail::None,
}
})
}
fn uid(&self) -> MessageDataItem<'static> {
MessageDataItem::Uid(self.ids.uid.clone())
}
@ -193,6 +208,7 @@ pub enum SeenFlag {
// -------------------
pub enum FetchedMail<'a> {
None,
Partial(imf::Imf<'a>),
Full(AnyPart<'a>),
}

View file

@ -12,15 +12,18 @@ use imap_codec::imap_types::response::{Code, Data, Status};
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::{self, SequenceSet};
use crate::mail::mailbox::Mailbox;
use crate::mail::snapshot::FrozenMailbox;
use crate::mail::query::QueryScope;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity};
use crate::mail::unique_ident::UniqueIdent;
use crate::imap::attributes::AttributesProxy;
use crate::imap::flags;
use crate::imap::mail_view::SeenFlag;
use crate::imap::mail_view::{MailView, SeenFlag};
use crate::imap::response::Body;
use crate::imap::search;
use crate::imap::selectors::MailSelectionBuilder;
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, UidIndex};
use crate::mail::unique_ident::UniqueIdent;
const DEFAULT_FLAGS: [Flag; 5] = [
Flag::Seen,
@ -37,20 +40,12 @@ const DEFAULT_FLAGS: [Flag; 5] = [
/// To do this, it keeps a variable `known_state` that corresponds to
/// what the client knows, and produces IMAP messages to be sent to the
/// client that go along updates to `known_state`.
pub struct MailboxView {
pub(crate) mailbox: Arc<Mailbox>,
known_state: UidIndex,
}
pub struct MailboxView (pub FrozenMailbox);
impl MailboxView {
/// Creates a new IMAP view into a mailbox.
pub async fn new(mailbox: Arc<Mailbox>) -> Self {
let state = mailbox.current_uid_index().await;
Self {
mailbox,
known_state: state,
}
Self(mailbox.frozen().await)
}
/// Create an updated view, useful to make a diff
@ -60,11 +55,8 @@ impl MailboxView {
/// This does NOT trigger a sync, it bases itself on what is currently
/// loaded in RAM by Bayou.
pub async fn update(&mut self) -> Result<Vec<Body<'static>>> {
let old_view: &mut Self = self;
let new_view = Self {
mailbox: old_view.mailbox.clone(),
known_state: old_view.mailbox.current_uid_index().await,
};
let old_snapshot = self.0.update().await;
let new_snapshot = &self.0.snapshot;
let mut data = Vec::<Body>::new();
@ -85,8 +77,8 @@ impl MailboxView {
// - notify client of expunged mails
let mut n_expunge = 0;
for (i, (_uid, uuid)) in old_view.known_state.idx_by_uid.iter().enumerate() {
if !new_view.known_state.table.contains_key(uuid) {
for (i, (_uid, uuid)) in old_snapshot.idx_by_uid.iter().enumerate() {
if !new_snapshot.table.contains_key(uuid) {
data.push(Body::Data(Data::Expunge(
NonZeroU32::try_from((i + 1 - n_expunge) as u32).unwrap(),
)));
@ -95,21 +87,21 @@ impl MailboxView {
}
// - if new mails arrived, notify client of number of existing mails
if new_view.known_state.table.len() != old_view.known_state.table.len() - n_expunge
|| new_view.known_state.uidvalidity != old_view.known_state.uidvalidity
if new_snapshot.table.len() != old_snapshot.table.len() - n_expunge
|| new_snapshot.uidvalidity != old_snapshot.uidvalidity
{
data.push(new_view.exists_status()?);
data.push(self.exists_status()?);
}
if new_view.known_state.uidvalidity != old_view.known_state.uidvalidity {
if new_snapshot.uidvalidity != old_snapshot.uidvalidity {
// TODO: do we want to push less/more info than this?
data.push(new_view.uidvalidity_status()?);
data.push(new_view.uidnext_status()?);
data.push(self.uidvalidity_status()?);
data.push(self.uidnext_status()?);
} else {
// - if flags changed for existing mails, tell client
for (i, (_uid, uuid)) in new_view.known_state.idx_by_uid.iter().enumerate() {
let old_mail = old_view.known_state.table.get(uuid);
let new_mail = new_view.known_state.table.get(uuid);
for (i, (_uid, uuid)) in new_snapshot.idx_by_uid.iter().enumerate() {
let old_mail = old_snapshot.table.get(uuid);
let new_mail = new_snapshot.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, flags)) = new_mail {
data.push(Body::Data(Data::Fetch {
@ -126,7 +118,6 @@ impl MailboxView {
}
}
}
*old_view = new_view;
Ok(data)
}
@ -152,7 +143,7 @@ impl MailboxView {
flags: &[Flag<'a>],
is_uid_store: &bool,
) -> Result<Vec<Body<'static>>> {
self.mailbox.opportunistic_sync().await?;
self.0.sync().await?;
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
@ -160,13 +151,13 @@ impl MailboxView {
for mi in mails.iter() {
match kind {
StoreType::Add => {
self.mailbox.add_flags(mi.uuid, &flags[..]).await?;
self.0.mailbox.add_flags(mi.uuid, &flags[..]).await?;
}
StoreType::Remove => {
self.mailbox.del_flags(mi.uuid, &flags[..]).await?;
self.0.mailbox.del_flags(mi.uuid, &flags[..]).await?;
}
StoreType::Replace => {
self.mailbox.set_flags(mi.uuid, &flags[..]).await?;
self.0.mailbox.set_flags(mi.uuid, &flags[..]).await?;
}
}
}
@ -176,10 +167,10 @@ impl MailboxView {
}
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
self.mailbox.opportunistic_sync().await?;
self.0.sync().await?;
let state = self.0.peek().await;
let deleted_flag = Flag::Deleted.to_string();
let state = self.mailbox.current_uid_index().await;
let msgs = state
.table
.iter()
@ -187,7 +178,7 @@ impl MailboxView {
.map(|(uuid, _)| *uuid);
for msg in msgs {
self.mailbox.delete(msg).await?;
self.0.mailbox.delete(msg).await?;
}
self.update().await
@ -203,7 +194,7 @@ impl MailboxView {
let mut new_uuids = vec![];
for mi in mails.iter() {
new_uuids.push(to.copy_from(&self.mailbox, mi.uuid).await?);
new_uuids.push(to.copy_from(&self.0.mailbox, mi.uuid).await?);
}
let mut ret = vec![];
@ -229,7 +220,7 @@ impl MailboxView {
let mails = self.get_mail_ids(sequence_set, *is_uid_copy)?;
for mi in mails.iter() {
to.move_from(&self.mailbox, mi.uuid).await?;
to.move_from(&self.0.mailbox, mi.uuid).await?;
}
let mut ret = vec![];
@ -256,82 +247,49 @@ impl MailboxView {
attributes: &'b MacroOrMessageDataItemNames<'static>,
is_uid_fetch: &bool,
) -> Result<Vec<Body<'static>>> {
// [1/6] Pre-compute data
// a. what are the uuids of the emails we want?
// b. do we need to fetch the full body?
let ap = AttributesProxy::new(attributes, *is_uid_fetch);
// Prepare data
let query_scope = match ap.need_body() {
true => QueryScope::Full,
_ => QueryScope::Partial,
};
let mids = MailIdentifiersList(self.get_mail_ids(sequence_set, *is_uid_fetch)?);
let mail_count = mids.0.len();
let uuids = mids.uuids();
let meta = self.mailbox.fetch_meta(&uuids).await?;
let flags = uuids
.iter()
.map(|uuid| {
self.known_state
.table
.get(uuid)
.map(|(_uuid, f)| f)
.ok_or(anyhow!("missing email from the flag table"))
})
// [2/6] Fetch the emails
let query = self.0.query(&uuids, query_scope);
let query_result = query.fetch().await?;
// [3/6] Derive an IMAP-specific view from the results, apply the filters
let views = query_result.iter()
.map(MailView::new)
.collect::<Result<Vec<_>, _>>()?;
// Start filling data to build the view
let mut selection = MailSelectionBuilder::new(ap.need_body(), mail_count);
selection
.with_mail_identifiers(&mids.0)
.with_metadata(&meta)
.with_flags(&flags);
// Asynchronously fetch full bodies (if needed)
let btc = selection.bodies_to_collect();
let future_bodies = btc
// [4/6] Apply the IMAP transformation to keep only relevant fields
let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views
.iter()
.map(|bi| async move {
let body = self.mailbox.fetch_full(*bi.msg_uuid, bi.msg_key).await?;
Ok::<_, anyhow::Error>(body)
})
.collect::<FuturesOrdered<_>>();
let bodies = future_bodies
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<Vec<_>, _>>()?;
.filter_map(|mv| mv.filter(&ap).ok().map(|(body, seen)| ((mv, seen), body)))
.unzip();
// Add bodies
selection.with_bodies(bodies.as_slice());
// Build mail selection views
let views = selection.build()?;
// Filter views to build the result
// Also identify what must be put as seen
let filtered_view = views
// [5/6] Register seen flags
flag_mgmt
.iter()
.filter_map(|mv| mv.filter(&ap).ok().map(|(body, seen)| (mv, body, seen)))
.collect::<Vec<_>>();
// Register seen flags
let future_flags = filtered_view
.iter()
.filter(|(_mv, _body, seen)| matches!(seen, SeenFlag::MustAdd))
.map(|(mv, _body, _seen)| async move {
.filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd))
.map(|(mv, _seen)| async move {
let seen_flag = Flag::Seen.to_string();
self.mailbox.add_flags(mv.ids.uuid, &[seen_flag]).await?;
self.0.mailbox.add_flags(*mv.query_result.uuid(), &[seen_flag]).await?;
Ok::<_, anyhow::Error>(())
})
.collect::<FuturesOrdered<_>>();
future_flags
.collect::<FuturesOrdered<_>>()
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<_, _>>()?;
let command_body = filtered_view
.into_iter()
.map(|(_mv, body, _seen)| body)
.collect::<Vec<_>>();
Ok(command_body)
// [6/6] Build the final result that will be sent to the client.
Ok(imap_ret)
}
/// A very naive search implementation...
@ -367,7 +325,8 @@ impl MailboxView {
by_uid: bool,
) -> Result<Vec<MailIdentifiers>> {
let mail_vec = self
.known_state
.0
.snapshot
.idx_by_uid
.iter()
.map(|(uid, uuid)| (*uid, *uuid))
@ -439,7 +398,7 @@ impl MailboxView {
}
pub(crate) fn uidvalidity(&self) -> ImapUidvalidity {
self.known_state.uidvalidity
self.0.snapshot.uidvalidity
}
/// Produce an OK [UIDNEXT _] message corresponding to `known_state`
@ -454,7 +413,7 @@ impl MailboxView {
}
pub(crate) fn uidnext(&self) -> ImapUid {
self.known_state.uidnext
self.0.snapshot.uidnext
}
/// Produce an EXISTS message corresponding to the number of mails
@ -464,7 +423,7 @@ impl MailboxView {
}
pub(crate) fn exists(&self) -> Result<u32> {
Ok(u32::try_from(self.known_state.idx_by_uid.len())?)
Ok(u32::try_from(self.0.snapshot.idx_by_uid.len())?)
}
/// Produce a RECENT message corresponding to the number of
@ -475,7 +434,8 @@ impl MailboxView {
pub(crate) fn recent(&self) -> Result<u32> {
let recent = self
.known_state
.0
.snapshot
.idx_by_flag
.get(&"\\Recent".to_string())
.map(|os| os.len())
@ -490,8 +450,8 @@ impl MailboxView {
// 1. Collecting all the possible flags in the mailbox
// 1.a Fetch them from our index
let mut known_flags: Vec<Flag> = self
.known_state
let mut known_flags: Vec<Flag> = self.0
.snapshot
.idx_by_flag
.flags()
.filter_map(|f| match flags::from_str(f) {
@ -530,9 +490,9 @@ impl MailboxView {
}
pub(crate) fn unseen_count(&self) -> usize {
let total = self.known_state.table.len();
let seen = self
.known_state
let total = self.0.snapshot.table.len();
let seen = self.0
.snapshot
.idx_by_flag
.get(&Flag::Seen.to_string())
.map(|x| x.len())

View file

@ -82,7 +82,7 @@ impl Mailbox {
self.mbox.read().await.fetch_full(id, message_key).await
}
async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox {
pub async fn frozen(self: &std::sync::Arc<Self>) -> super::snapshot::FrozenMailbox {
super::snapshot::FrozenMailbox::new(self.clone()).await
}

View file

@ -10,10 +10,27 @@ use futures::stream::{FuturesUnordered, StreamExt};
pub struct Query<'a,'b> {
pub frozen: &'a FrozenMailbox,
pub emails: &'b [UniqueIdent],
pub scope: QueryScope,
}
pub enum QueryScope {
Index,
Partial,
Full,
}
impl<'a,'b> Query<'a,'b> {
pub fn index(&self) -> Result<Vec<IndexResult>> {
pub async fn fetch(&self) -> Result<Vec<QueryResult>> {
match self.scope {
QueryScope::Index => self.index(),
QueryScope::Partial => self.partial().await,
QueryScope::Full => self.full().await,
}
}
// --- functions below are private *for reasons*
fn index(&self) -> Result<Vec<QueryResult>> {
self
.emails
.iter()
@ -23,18 +40,18 @@ impl<'a,'b> Query<'a,'b> {
.snapshot
.table
.get(uuid)
.map(|index| IndexResult { uuid: *uuid, index })
.map(|index| QueryResult::IndexResult { uuid: *uuid, index })
.ok_or(anyhow!("missing email in index"))
})
.collect::<Result<Vec<_>, _>>()
}
pub async fn partial(&self) -> Result<Vec<PartialResult>> {
async fn partial(&self) -> Result<Vec<QueryResult>> {
let meta = self.frozen.mailbox.fetch_meta(self.emails).await?;
let result = meta
.into_iter()
.zip(self.index()?)
.map(|(metadata, index)| PartialResult { uuid: index.uuid, index: index.index, metadata })
.map(|(metadata, index)| index.into_partial(metadata).expect("index to be IndexResult"))
.collect::<Vec<_>>();
Ok(result)
}
@ -43,18 +60,17 @@ impl<'a,'b> Query<'a,'b> {
/// AND GENERATE SO MUCH NETWORK TRAFFIC.
/// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH
/// SOMETHING LIKE AN ITERATOR
pub async fn full(&self) -> Result<Vec<FullResult>> {
async fn full(&self) -> Result<Vec<QueryResult>> {
let meta_list = self.partial().await?;
meta_list
.into_iter()
.map(|meta| async move {
let content = self.frozen.mailbox.fetch_full(meta.uuid, &meta.metadata.message_key).await?;
Ok(FullResult {
uuid: meta.uuid,
index: meta.index,
metadata: meta.metadata,
content,
})
let content = self.frozen.mailbox.fetch_full(
*meta.uuid(),
&meta.metadata().expect("meta to be PartialResult").message_key
).await?;
Ok(meta.into_full(content).expect("meta to be PartialResult"))
})
.collect::<FuturesUnordered<_>>()
.collect::<Vec<_>>()
@ -64,18 +80,66 @@ impl<'a,'b> Query<'a,'b> {
}
}
pub struct IndexResult<'a> {
pub uuid: UniqueIdent,
pub index: &'a IndexEntry,
pub enum QueryResult<'a> {
IndexResult {
uuid: UniqueIdent,
index: &'a IndexEntry,
},
PartialResult {
uuid: UniqueIdent,
index: &'a IndexEntry,
metadata: MailMeta,
},
FullResult {
uuid: UniqueIdent,
index: &'a IndexEntry,
metadata: MailMeta,
content: Vec<u8>,
}
}
pub struct PartialResult<'a> {
pub uuid: UniqueIdent,
pub index: &'a IndexEntry,
pub metadata: MailMeta,
}
pub struct FullResult<'a> {
pub uuid: UniqueIdent,
pub index: &'a IndexEntry,
pub metadata: MailMeta,
pub content: Vec<u8>,
impl<'a> QueryResult<'a> {
pub fn uuid(&self) -> &UniqueIdent {
match self {
Self::IndexResult { uuid, .. } => uuid,
Self::PartialResult { uuid, .. } => uuid,
Self::FullResult { uuid, .. } => uuid,
}
}
pub fn index(&self) -> &IndexEntry {
match self {
Self::IndexResult { index, .. } => index,
Self::PartialResult { index, .. } => index,
Self::FullResult { index, .. } => index,
}
}
pub fn metadata(&self) -> Option<&MailMeta> {
match self {
Self::IndexResult { .. } => None,
Self::PartialResult { metadata, .. } => Some(metadata),
Self::FullResult { metadata, .. } => Some(metadata),
}
}
pub fn content(&self) -> Option<&[u8]> {
match self {
Self::FullResult { content, .. } => Some(content),
_ => None,
}
}
fn into_partial(self, metadata: MailMeta) -> Option<Self> {
match self {
Self::IndexResult { uuid, index } => Some(Self::PartialResult { uuid, index, metadata }),
_ => None,
}
}
fn into_full(self, content: Vec<u8>) -> Option<Self> {
match self {
Self::PartialResult { uuid, index, metadata } => Some(Self::FullResult { uuid, index, metadata, content }),
_ => None,
}
}
}

View file

@ -4,6 +4,8 @@ use anyhow::Result;
use super::mailbox::Mailbox;
use super::uidindex::UidIndex;
use super::unique_ident::UniqueIdent;
use super::query::{Query, QueryScope};
/// A Frozen Mailbox has a snapshot of the current mailbox
/// state that is desynchronized with the real mailbox state.
@ -49,4 +51,12 @@ impl FrozenMailbox {
old_snapshot
}
pub fn query<'a, 'b>(&'a self, uuids: &'b [UniqueIdent], scope: QueryScope) -> Query<'a, 'b> {
Query {
frozen: self,
emails: uuids,
scope,
}
}
}