From 4d501b6947497d7705c64dcaf093e5e5a09a9235 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 22 Feb 2024 11:35:39 +0100 Subject: [PATCH] Compile streams --- src/imap/mailbox_view.rs | 125 ++++++++++++++++++++------------------- src/imap/search.rs | 21 +------ src/mail/mailbox.rs | 2 +- src/mail/query.rs | 68 ++++++++++----------- 4 files changed, 98 insertions(+), 118 deletions(-) diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index d57e9a3..5055c40 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use anyhow::{anyhow, Error, Result}; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{StreamExt, TryStreamExt}; use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::fetch::MessageDataItem; @@ -362,46 +362,33 @@ impl MailboxView { .iter() .map(|midx| midx.uuid) .collect::>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [3/6] Derive an IMAP-specific view from the results, apply the filters - let views = query_result - .iter() - .zip(mail_idx_list.into_iter()) - .map(|(qr, midx)| MailView::new(qr, midx)) - .collect::, _>>()?; + let query = self.internal.query(&uuids, query_scope); + //let query_result = self.internal.query(&uuids, query_scope).fetch().await?; - // [4/6] Apply the IMAP transformation, bubble up any error - // We get 2 results: - // - The one we send to the client - // - The \Seen flags we must set internally - let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views - .iter() - .map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body))) - .collect::, _>>()? - .into_iter() - .unzip(); - - // [5/6] Register the \Seen flags - flag_mgmt - .iter() - .filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd)) - .map(|(mv, _seen)| async move { - let seen_flag = Flag::Seen.to_string(); - self.internal - .mailbox - .add_flags(*mv.query_result.uuid(), &[seen_flag]) - .await?; - Ok::<_, anyhow::Error>(()) + let query_stream = query + .fetch() + .zip(futures::stream::iter(mail_idx_list)) + // [3/6] Derive an IMAP-specific view from the results, apply the filters + .map(|(maybe_qr, midx)| match maybe_qr { + Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)), + Err(e) => Err(e), }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::>()?; + // [4/6] Apply the IMAP transformation + .then(|maybe_ret| async move { + let ((body, seen), midx) = maybe_ret?; + + // [5/6] Register the \Seen flags + if matches!(seen, SeenFlag::MustAdd) { + let seen_flag = Flag::Seen.to_string(); + self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?; + } + + Ok::<_, anyhow::Error>(body) + }); // [6/6] Build the final result that will be sent to the client. - Ok(imap_ret) + query_stream.try_collect().await } /// A naive search implementation... @@ -423,39 +410,55 @@ impl MailboxView { // 3. Filter the selection based on the ID / UID / Flags let (kept_idx, to_fetch) = crit.filter_on_idx(&selection); - // 4. Fetch additional info about the emails + // 4.a Fetch additional info about the emails let query_scope = crit.query_scope(); let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::>(); - let query_result = self.internal.query(&uuids, query_scope).fetch().await?; + let query = self.internal.query(&uuids, query_scope); - // 5. If needed, filter the selection based on the body - let kept_query = crit.filter_on_query(&to_fetch, &query_result)?; + // 4.b We don't want to keep all data in memory, so we do the computing in a stream + let query_stream = query + .fetch() + .zip(futures::stream::iter(&to_fetch)) + // 5.a Build a mailview with the body, might fail with an error + // 5.b If needed, filter the selection based on the body, but keep the errors + // 6. Drop the query+mailbox, keep only the mail index + // Here we release a lot of memory, this is the most important part ^^ + .filter_map(|(maybe_qr, midx)| { + let r = match maybe_qr { + Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) { + Ok(true) => Some(Ok(*midx)), + Ok(_) => None, + Err(e) => Some(Err(e)), + } + Err(e) => Some(Err(e)), + }; + futures::future::ready(r) + }); - // 6. Format the result according to the client's taste: - // either return UID or ID. - let final_selection = kept_idx.iter().chain(kept_query.iter()); - let selection_fmt = match uid { - true => final_selection.map(|in_idx| in_idx.uid).collect(), - _ => final_selection.map(|in_idx| in_idx.i).collect(), - }; - // 7. Add the modseq entry if needed - let is_modseq = crit.is_modseq(); - let maybe_modseq = match is_modseq { - true => { - let final_selection = kept_idx.iter().chain(kept_query.iter()); - final_selection - .map(|in_idx| in_idx.modseq) - .max() - .map(|r| NonZeroU64::try_from(r)) - .transpose()? - } + // 7. Chain both streams (part resolved from index, part resolved from metadata+body) + let main_stream = futures::stream::iter(kept_idx) + .map(Ok) + .chain(query_stream) + .map_ok(|idx| match uid { + true => (idx.uid, idx.modseq), + _ => (idx.i, idx.modseq), + }); + + // 8. Do the actual computation + let internal_result: Vec<_> = main_stream.try_collect().await?; + let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip(); + + // 9. Aggregate the maximum modseq value + let maybe_modseq = match crit.is_modseq() { + true => modseqs.into_iter().max(), _ => None, }; + // 10. Return the final result Ok(( - vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], - is_modseq, + vec![Body::Data(Data::Search(selection, maybe_modseq))], + maybe_modseq.is_some(), )) } @@ -678,7 +681,7 @@ mod tests { content: rfc822.to_vec(), }; - let mv = MailView::new(&qr, &mail_in_idx)?; + let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?; let (res_body, _seen) = mv.filter(&ap)?; let fattr = match res_body { diff --git a/src/imap/search.rs b/src/imap/search.rs index d06c3bd..4ff70ee 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -1,13 +1,12 @@ use std::num::{NonZeroU32, NonZeroU64}; -use anyhow::Result; use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use crate::imap::index::MailIndex; use crate::imap::mail_view::MailView; -use crate::mail::query::{QueryResult, QueryScope}; +use crate::mail::query::QueryScope; pub enum SeqType { Undefined, @@ -145,22 +144,6 @@ impl<'a> Criteria<'a> { (to_keep, to_fetch) } - pub fn filter_on_query<'b>( - &self, - midx_list: &[&'b MailIndex<'b>], - query_result: &'b Vec, - ) -> Result>> { - Ok(midx_list - .iter() - .zip(query_result.iter()) - .map(|(midx, qr)| MailView::new(qr, midx)) - .collect::, _>>()? - .into_iter() - .filter(|mail_view| self.is_keep_on_query(mail_view)) - .map(|mail_view| mail_view.in_idx) - .collect()) - } - // ---- /// Here we are doing a partial filtering: we do not have access @@ -213,7 +196,7 @@ impl<'a> Criteria<'a> { /// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true, /// we could simplify the request to just body(x) and truncate the first OR. Today, we are /// not doing that, and thus we reevaluate everything. - fn is_keep_on_query(&self, mail_view: &MailView) -> bool { + pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool { use SearchKey::*; match self.0 { // Combinator logic diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index c20d815..9190883 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou) { /// The metadata of a message that is stored in K2V /// at pk = mail/, sk = -#[derive(Debug, Serialize, Deserialize)] +#[derive(Debug, Clone, Serialize, Deserialize)] pub struct MailMeta { /// INTERNALDATE field (milliseconds since epoch) pub internaldate: u64, diff --git a/src/mail/query.rs b/src/mail/query.rs index a183c5a..cbc5fe6 100644 --- a/src/mail/query.rs +++ b/src/mail/query.rs @@ -2,7 +2,8 @@ use super::mailbox::MailMeta; use super::snapshot::FrozenMailbox; use super::unique_ident::UniqueIdent; use anyhow::Result; -use futures::stream::{FuturesOrdered, StreamExt}; +use futures::stream::{Stream, StreamExt, BoxStream}; +use futures::future::FutureExt; /// Query is in charge of fetching efficiently /// requested data for a list of emails @@ -28,64 +29,57 @@ impl QueryScope { } } +//type QueryResultStream = Box>>; + impl<'a, 'b> Query<'a, 'b> { - pub async fn fetch(&self) -> Result> { + pub fn fetch(&self) -> BoxStream> { match self.scope { - QueryScope::Index => Ok(self - .emails - .iter() - .map(|&uuid| QueryResult::IndexResult { uuid }) - .collect()), - QueryScope::Partial => self.partial().await, - QueryScope::Full => self.full().await, + QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))), + QueryScope::Partial => Box::pin(self.partial()), + QueryScope::Full => Box::pin(self.full()), } } // --- functions below are private *for reasons* + fn partial<'d>(&'d self) -> impl Stream> + 'd + Send { + async move { + let maybe_meta_list: Result> = self.frozen.mailbox.fetch_meta(self.emails).await; + let list_res = maybe_meta_list + .map(|meta_list| meta_list + .into_iter() + .zip(self.emails) + .map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata })) + .collect() + ) + .unwrap_or_else(|e| vec![Err(e)]); - async fn partial(&self) -> Result> { - let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; - let result = meta - .into_iter() - .zip(self.emails.iter()) - .map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata }) - .collect::>(); - - Ok(result) + futures::stream::iter(list_res) + }.flatten_stream() } - /// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY - /// AND GENERATE SO MUCH NETWORK TRAFFIC. - /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH - /// SOMETHING LIKE AN ITERATOR - async fn full(&self) -> Result> { - let meta_list = self.partial().await?; - meta_list - .into_iter() - .map(|meta| async move { + fn full<'d>(&'d self) -> impl Stream> + 'd + Send { + self.partial() + .then(move |maybe_meta| async move { + let meta = maybe_meta?; + let content = self .frozen .mailbox .fetch_full( *meta.uuid(), &meta - .metadata() - .expect("meta to be PartialResult") - .message_key, - ) + .metadata() + .expect("meta to be PartialResult") + .message_key, + ) .await?; Ok(meta.into_full(content).expect("meta to be PartialResult")) }) - .collect::>() - .collect::>() - .await - .into_iter() - .collect::, _>>() } } -#[derive(Debug)] +#[derive(Debug, Clone)] pub enum QueryResult { IndexResult { uuid: UniqueIdent,