Compile streams
This commit is contained in:
parent
de5717a020
commit
4d501b6947
4 changed files with 98 additions and 118 deletions
|
@ -4,7 +4,7 @@ use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::{anyhow, Error, Result};
|
use anyhow::{anyhow, Error, Result};
|
||||||
|
|
||||||
use futures::stream::{FuturesOrdered, StreamExt};
|
use futures::stream::{StreamExt, TryStreamExt};
|
||||||
|
|
||||||
use imap_codec::imap_types::core::Charset;
|
use imap_codec::imap_types::core::Charset;
|
||||||
use imap_codec::imap_types::fetch::MessageDataItem;
|
use imap_codec::imap_types::fetch::MessageDataItem;
|
||||||
|
@ -362,46 +362,33 @@ impl MailboxView {
|
||||||
.iter()
|
.iter()
|
||||||
.map(|midx| midx.uuid)
|
.map(|midx| midx.uuid)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
|
|
||||||
|
|
||||||
|
let query = self.internal.query(&uuids, query_scope);
|
||||||
|
//let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
|
||||||
|
|
||||||
|
let query_stream = query
|
||||||
|
.fetch()
|
||||||
|
.zip(futures::stream::iter(mail_idx_list))
|
||||||
// [3/6] Derive an IMAP-specific view from the results, apply the filters
|
// [3/6] Derive an IMAP-specific view from the results, apply the filters
|
||||||
let views = query_result
|
.map(|(maybe_qr, midx)| match maybe_qr {
|
||||||
.iter()
|
Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)),
|
||||||
.zip(mail_idx_list.into_iter())
|
Err(e) => Err(e),
|
||||||
.map(|(qr, midx)| MailView::new(qr, midx))
|
})
|
||||||
.collect::<Result<Vec<_>, _>>()?;
|
// [4/6] Apply the IMAP transformation
|
||||||
|
.then(|maybe_ret| async move {
|
||||||
// [4/6] Apply the IMAP transformation, bubble up any error
|
let ((body, seen), midx) = maybe_ret?;
|
||||||
// We get 2 results:
|
|
||||||
// - The one we send to the client
|
|
||||||
// - The \Seen flags we must set internally
|
|
||||||
let (flag_mgmt, imap_ret): (Vec<_>, Vec<_>) = views
|
|
||||||
.iter()
|
|
||||||
.map(|mv| mv.filter(&ap).map(|(body, seen)| ((mv, seen), body)))
|
|
||||||
.collect::<Result<Vec<_>, _>>()?
|
|
||||||
.into_iter()
|
|
||||||
.unzip();
|
|
||||||
|
|
||||||
// [5/6] Register the \Seen flags
|
// [5/6] Register the \Seen flags
|
||||||
flag_mgmt
|
if matches!(seen, SeenFlag::MustAdd) {
|
||||||
.iter()
|
|
||||||
.filter(|(_mv, seen)| matches!(seen, SeenFlag::MustAdd))
|
|
||||||
.map(|(mv, _seen)| async move {
|
|
||||||
let seen_flag = Flag::Seen.to_string();
|
let seen_flag = Flag::Seen.to_string();
|
||||||
self.internal
|
self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?;
|
||||||
.mailbox
|
}
|
||||||
.add_flags(*mv.query_result.uuid(), &[seen_flag])
|
|
||||||
.await?;
|
Ok::<_, anyhow::Error>(body)
|
||||||
Ok::<_, anyhow::Error>(())
|
});
|
||||||
})
|
|
||||||
.collect::<FuturesOrdered<_>>()
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Result<_, _>>()?;
|
|
||||||
|
|
||||||
// [6/6] Build the final result that will be sent to the client.
|
// [6/6] Build the final result that will be sent to the client.
|
||||||
Ok(imap_ret)
|
query_stream.try_collect().await
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A naive search implementation...
|
/// A naive search implementation...
|
||||||
|
@ -423,39 +410,55 @@ impl MailboxView {
|
||||||
// 3. Filter the selection based on the ID / UID / Flags
|
// 3. Filter the selection based on the ID / UID / Flags
|
||||||
let (kept_idx, to_fetch) = crit.filter_on_idx(&selection);
|
let (kept_idx, to_fetch) = crit.filter_on_idx(&selection);
|
||||||
|
|
||||||
// 4. Fetch additional info about the emails
|
// 4.a Fetch additional info about the emails
|
||||||
let query_scope = crit.query_scope();
|
let query_scope = crit.query_scope();
|
||||||
let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>();
|
let uuids = to_fetch.iter().map(|midx| midx.uuid).collect::<Vec<_>>();
|
||||||
let query_result = self.internal.query(&uuids, query_scope).fetch().await?;
|
let query = self.internal.query(&uuids, query_scope);
|
||||||
|
|
||||||
// 5. If needed, filter the selection based on the body
|
// 4.b We don't want to keep all data in memory, so we do the computing in a stream
|
||||||
let kept_query = crit.filter_on_query(&to_fetch, &query_result)?;
|
let query_stream = query
|
||||||
|
.fetch()
|
||||||
// 6. Format the result according to the client's taste:
|
.zip(futures::stream::iter(&to_fetch))
|
||||||
// either return UID or ID.
|
// 5.a Build a mailview with the body, might fail with an error
|
||||||
let final_selection = kept_idx.iter().chain(kept_query.iter());
|
// 5.b If needed, filter the selection based on the body, but keep the errors
|
||||||
let selection_fmt = match uid {
|
// 6. Drop the query+mailbox, keep only the mail index
|
||||||
true => final_selection.map(|in_idx| in_idx.uid).collect(),
|
// Here we release a lot of memory, this is the most important part ^^
|
||||||
_ => final_selection.map(|in_idx| in_idx.i).collect(),
|
.filter_map(|(maybe_qr, midx)| {
|
||||||
};
|
let r = match maybe_qr {
|
||||||
|
Ok(qr) => match MailView::new(&qr, midx).map(|mv| crit.is_keep_on_query(&mv)) {
|
||||||
// 7. Add the modseq entry if needed
|
Ok(true) => Some(Ok(*midx)),
|
||||||
let is_modseq = crit.is_modseq();
|
Ok(_) => None,
|
||||||
let maybe_modseq = match is_modseq {
|
Err(e) => Some(Err(e)),
|
||||||
true => {
|
|
||||||
let final_selection = kept_idx.iter().chain(kept_query.iter());
|
|
||||||
final_selection
|
|
||||||
.map(|in_idx| in_idx.modseq)
|
|
||||||
.max()
|
|
||||||
.map(|r| NonZeroU64::try_from(r))
|
|
||||||
.transpose()?
|
|
||||||
}
|
}
|
||||||
|
Err(e) => Some(Err(e)),
|
||||||
|
};
|
||||||
|
futures::future::ready(r)
|
||||||
|
});
|
||||||
|
|
||||||
|
|
||||||
|
// 7. Chain both streams (part resolved from index, part resolved from metadata+body)
|
||||||
|
let main_stream = futures::stream::iter(kept_idx)
|
||||||
|
.map(Ok)
|
||||||
|
.chain(query_stream)
|
||||||
|
.map_ok(|idx| match uid {
|
||||||
|
true => (idx.uid, idx.modseq),
|
||||||
|
_ => (idx.i, idx.modseq),
|
||||||
|
});
|
||||||
|
|
||||||
|
// 8. Do the actual computation
|
||||||
|
let internal_result: Vec<_> = main_stream.try_collect().await?;
|
||||||
|
let (selection, modseqs): (Vec<_>, Vec<_>) = internal_result.into_iter().unzip();
|
||||||
|
|
||||||
|
// 9. Aggregate the maximum modseq value
|
||||||
|
let maybe_modseq = match crit.is_modseq() {
|
||||||
|
true => modseqs.into_iter().max(),
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// 10. Return the final result
|
||||||
Ok((
|
Ok((
|
||||||
vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
|
vec![Body::Data(Data::Search(selection, maybe_modseq))],
|
||||||
is_modseq,
|
maybe_modseq.is_some(),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -678,7 +681,7 @@ mod tests {
|
||||||
content: rfc822.to_vec(),
|
content: rfc822.to_vec(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mv = MailView::new(&qr, &mail_in_idx)?;
|
let mv = MailView::new(std::borrow::Cow::Borrowed(&qr), &mail_in_idx)?;
|
||||||
let (res_body, _seen) = mv.filter(&ap)?;
|
let (res_body, _seen) = mv.filter(&ap)?;
|
||||||
|
|
||||||
let fattr = match res_body {
|
let fattr = match res_body {
|
||||||
|
|
|
@ -1,13 +1,12 @@
|
||||||
use std::num::{NonZeroU32, NonZeroU64};
|
use std::num::{NonZeroU32, NonZeroU64};
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use imap_codec::imap_types::core::NonEmptyVec;
|
use imap_codec::imap_types::core::NonEmptyVec;
|
||||||
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
|
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
|
||||||
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
|
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
|
||||||
|
|
||||||
use crate::imap::index::MailIndex;
|
use crate::imap::index::MailIndex;
|
||||||
use crate::imap::mail_view::MailView;
|
use crate::imap::mail_view::MailView;
|
||||||
use crate::mail::query::{QueryResult, QueryScope};
|
use crate::mail::query::QueryScope;
|
||||||
|
|
||||||
pub enum SeqType {
|
pub enum SeqType {
|
||||||
Undefined,
|
Undefined,
|
||||||
|
@ -145,22 +144,6 @@ impl<'a> Criteria<'a> {
|
||||||
(to_keep, to_fetch)
|
(to_keep, to_fetch)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn filter_on_query<'b>(
|
|
||||||
&self,
|
|
||||||
midx_list: &[&'b MailIndex<'b>],
|
|
||||||
query_result: &'b Vec<QueryResult>,
|
|
||||||
) -> Result<Vec<&'b MailIndex<'b>>> {
|
|
||||||
Ok(midx_list
|
|
||||||
.iter()
|
|
||||||
.zip(query_result.iter())
|
|
||||||
.map(|(midx, qr)| MailView::new(qr, midx))
|
|
||||||
.collect::<Result<Vec<_>, _>>()?
|
|
||||||
.into_iter()
|
|
||||||
.filter(|mail_view| self.is_keep_on_query(mail_view))
|
|
||||||
.map(|mail_view| mail_view.in_idx)
|
|
||||||
.collect())
|
|
||||||
}
|
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
/// Here we are doing a partial filtering: we do not have access
|
/// Here we are doing a partial filtering: we do not have access
|
||||||
|
@ -213,7 +196,7 @@ impl<'a> Criteria<'a> {
|
||||||
/// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true,
|
/// the email, as body(x) might be false. So we need to check it. But as seqid(x) is true,
|
||||||
/// we could simplify the request to just body(x) and truncate the first OR. Today, we are
|
/// we could simplify the request to just body(x) and truncate the first OR. Today, we are
|
||||||
/// not doing that, and thus we reevaluate everything.
|
/// not doing that, and thus we reevaluate everything.
|
||||||
fn is_keep_on_query(&self, mail_view: &MailView) -> bool {
|
pub fn is_keep_on_query(&self, mail_view: &MailView) -> bool {
|
||||||
use SearchKey::*;
|
use SearchKey::*;
|
||||||
match self.0 {
|
match self.0 {
|
||||||
// Combinator logic
|
// Combinator logic
|
||||||
|
|
|
@ -498,7 +498,7 @@ fn dump(uid_index: &Bayou<UidIndex>) {
|
||||||
|
|
||||||
/// The metadata of a message that is stored in K2V
|
/// The metadata of a message that is stored in K2V
|
||||||
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
|
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||||
pub struct MailMeta {
|
pub struct MailMeta {
|
||||||
/// INTERNALDATE field (milliseconds since epoch)
|
/// INTERNALDATE field (milliseconds since epoch)
|
||||||
pub internaldate: u64,
|
pub internaldate: u64,
|
||||||
|
|
|
@ -2,7 +2,8 @@ use super::mailbox::MailMeta;
|
||||||
use super::snapshot::FrozenMailbox;
|
use super::snapshot::FrozenMailbox;
|
||||||
use super::unique_ident::UniqueIdent;
|
use super::unique_ident::UniqueIdent;
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use futures::stream::{FuturesOrdered, StreamExt};
|
use futures::stream::{Stream, StreamExt, BoxStream};
|
||||||
|
use futures::future::FutureExt;
|
||||||
|
|
||||||
/// Query is in charge of fetching efficiently
|
/// Query is in charge of fetching efficiently
|
||||||
/// requested data for a list of emails
|
/// requested data for a list of emails
|
||||||
|
@ -28,41 +29,39 @@ impl QueryScope {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//type QueryResultStream = Box<dyn Stream<Item = Result<QueryResult>>>;
|
||||||
|
|
||||||
impl<'a, 'b> Query<'a, 'b> {
|
impl<'a, 'b> Query<'a, 'b> {
|
||||||
pub async fn fetch(&self) -> Result<Vec<QueryResult>> {
|
pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
|
||||||
match self.scope {
|
match self.scope {
|
||||||
QueryScope::Index => Ok(self
|
QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))),
|
||||||
.emails
|
QueryScope::Partial => Box::pin(self.partial()),
|
||||||
.iter()
|
QueryScope::Full => Box::pin(self.full()),
|
||||||
.map(|&uuid| QueryResult::IndexResult { uuid })
|
|
||||||
.collect()),
|
|
||||||
QueryScope::Partial => self.partial().await,
|
|
||||||
QueryScope::Full => self.full().await,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// --- functions below are private *for reasons*
|
// --- functions below are private *for reasons*
|
||||||
|
fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
|
||||||
async fn partial(&self) -> Result<Vec<QueryResult>> {
|
async move {
|
||||||
let meta = self.frozen.mailbox.fetch_meta(self.emails).await?;
|
let maybe_meta_list: Result<Vec<MailMeta>> = self.frozen.mailbox.fetch_meta(self.emails).await;
|
||||||
let result = meta
|
let list_res = maybe_meta_list
|
||||||
|
.map(|meta_list| meta_list
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.zip(self.emails.iter())
|
.zip(self.emails)
|
||||||
.map(|(metadata, &uuid)| QueryResult::PartialResult { uuid, metadata })
|
.map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
|
||||||
.collect::<Vec<_>>();
|
.collect()
|
||||||
|
)
|
||||||
|
.unwrap_or_else(|e| vec![Err(e)]);
|
||||||
|
|
||||||
Ok(result)
|
futures::stream::iter(list_res)
|
||||||
|
}.flatten_stream()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// @FIXME WARNING: THIS CAN ALLOCATE A LOT OF MEMORY
|
fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
|
||||||
/// AND GENERATE SO MUCH NETWORK TRAFFIC.
|
self.partial()
|
||||||
/// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH
|
.then(move |maybe_meta| async move {
|
||||||
/// SOMETHING LIKE AN ITERATOR
|
let meta = maybe_meta?;
|
||||||
async fn full(&self) -> Result<Vec<QueryResult>> {
|
|
||||||
let meta_list = self.partial().await?;
|
|
||||||
meta_list
|
|
||||||
.into_iter()
|
|
||||||
.map(|meta| async move {
|
|
||||||
let content = self
|
let content = self
|
||||||
.frozen
|
.frozen
|
||||||
.mailbox
|
.mailbox
|
||||||
|
@ -77,15 +76,10 @@ impl<'a, 'b> Query<'a, 'b> {
|
||||||
|
|
||||||
Ok(meta.into_full(content).expect("meta to be PartialResult"))
|
Ok(meta.into_full(content).expect("meta to be PartialResult"))
|
||||||
})
|
})
|
||||||
.collect::<FuturesOrdered<_>>()
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.await
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Result<Vec<_>, _>>()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug, Clone)]
|
||||||
pub enum QueryResult {
|
pub enum QueryResult {
|
||||||
IndexResult {
|
IndexResult {
|
||||||
uuid: UniqueIdent,
|
uuid: UniqueIdent,
|
||||||
|
|
Loading…
Reference in a new issue