Perf measurement & bottleneck fix #102

Merged
quentin merged 16 commits from perf/cpu-ram-bottleneck into main 2024-02-23 17:32:39 +00:00
7 changed files with 54 additions and 41 deletions
Showing only changes of commit 9b26e251e3 - Show all commits

View file

@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::{
Command, CommandBody, ListReturnItem, SelectExamineModifier,
};
use imap_codec::imap_types::core::{Atom, Literal, Vec1, QuotedChar};
use imap_codec::imap_types::core::{Atom, Literal, QuotedChar, Vec1};
use imap_codec::imap_types::datetime::DateTime;
use imap_codec::imap_types::extensions::enable::CapabilityEnable;
use imap_codec::imap_types::flag::{Flag, FlagNameAttribute};
@ -14,12 +14,12 @@ use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
use imap_codec::imap_types::response::{Code, CodeOther, Data};
use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName};
use crate::imap::Body;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, MailboxName};
use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response;
use crate::imap::Body;
use crate::mail::uidindex::*;
use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW};
@ -560,8 +560,6 @@ impl<'a> AuthenticatedContext<'a> {
) -> Result<(Response<'static>, flow::Transition)> {
let append_tag = self.req.tag.clone();
match self.append_internal(mailbox, flags, date, message).await {
Ok((_mb_view, uidvalidity, uid, _modseq)) => Ok((
Response::build()
.tag(append_tag)
@ -623,8 +621,9 @@ impl<'a> AuthenticatedContext<'a> {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
// TODO: filter allowed flags? ping @Quentin
let (uidvalidity, uid, modseq) = view.internal.mailbox.append(msg, None, &flags[..]).await?;
//let unsollicited = view.update(UpdateParameters::default()).await?;
let (uidvalidity, uid, modseq) =
view.internal.mailbox.append(msg, None, &flags[..]).await?;
//let unsollicited = view.update(UpdateParameters::default()).await?;
Ok((view, uidvalidity, uid, modseq))
}

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
use imap_codec::imap_types::core::{Vec1, Charset};
use imap_codec::imap_types::core::{Charset, Vec1};
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
@ -59,7 +59,10 @@ pub async fn dispatch<'a>(
charset,
criteria,
uid,
} => ctx.search(charset, &SearchKey::And(criteria.clone()), uid).await,
} => {
ctx.search(charset, &SearchKey::And(criteria.clone()), uid)
.await
}
CommandBody::Expunge {
// UIDPLUS (rfc4315)
uid_sequence_set,

View file

@ -2,8 +2,8 @@ use std::error::Error as StdError;
use std::fmt;
use std::sync::Arc;
use tokio::sync::Notify;
use imap_codec::imap_types::core::Tag;
use tokio::sync::Notify;
use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User;

View file

@ -370,7 +370,7 @@ impl MailboxView {
.fetch()
.zip(futures::stream::iter(mail_idx_list))
// [3/6] Derive an IMAP-specific view from the results, apply the filters
.map(|(maybe_qr, midx)| match maybe_qr {
.map(|(maybe_qr, midx)| match maybe_qr {
Ok(qr) => Ok((MailView::new(&qr, midx)?.filter(&ap)?, midx)),
Err(e) => Err(e),
})
@ -381,7 +381,10 @@ impl MailboxView {
// [5/6] Register the \Seen flags
if matches!(seen, SeenFlag::MustAdd) {
let seen_flag = Flag::Seen.to_string();
self.internal.mailbox.add_flags(midx.uuid, &[seen_flag]).await?;
self.internal
.mailbox
.add_flags(midx.uuid, &[seen_flag])
.await?;
}
Ok::<_, anyhow::Error>(body)
@ -429,13 +432,12 @@ impl MailboxView {
Ok(true) => Some(Ok(*midx)),
Ok(_) => None,
Err(e) => Some(Err(e)),
}
},
Err(e) => Some(Err(e)),
};
futures::future::ready(r)
});
// 7. Chain both streams (part resolved from index, part resolved from metadata+body)
let main_stream = futures::stream::iter(kept_idx)
.map(Ok)

View file

@ -15,7 +15,7 @@ mod session;
use std::net::SocketAddr;
use anyhow::{anyhow, bail, Result, Context};
use anyhow::{anyhow, bail, Context, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;

View file

@ -4,8 +4,8 @@ use crate::imap::flow;
use crate::imap::request::Request;
use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider;
use anyhow::{anyhow, bail, Result, Context};
use imap_codec::imap_types::{core::Tag, command::Command};
use anyhow::{anyhow, bail, Context, Result};
use imap_codec::imap_types::{command::Command, core::Tag};
//-----
pub struct Instance {
@ -43,7 +43,11 @@ impl Instance {
.state
.apply(transition)
.context("IDLE transition failed")
.and_then(|_| self.state.notify().ok_or(anyhow!("IDLE state has no Notify object")));
.and_then(|_| {
self.state
.notify()
.ok_or(anyhow!("IDLE state has no Notify object"))
});
// Build an appropriate response
match maybe_stop {

View file

@ -2,8 +2,8 @@ use super::mailbox::MailMeta;
use super::snapshot::FrozenMailbox;
use super::unique_ident::UniqueIdent;
use anyhow::Result;
use futures::stream::{Stream, StreamExt, BoxStream};
use futures::future::FutureExt;
use futures::stream::{BoxStream, Stream, StreamExt};
/// Query is in charge of fetching efficiently
/// requested data for a list of emails
@ -34,7 +34,10 @@ impl QueryScope {
impl<'a, 'b> Query<'a, 'b> {
pub fn fetch(&self) -> BoxStream<Result<QueryResult>> {
match self.scope {
QueryScope::Index => Box::pin(futures::stream::iter(self.emails).map(|&uuid| Ok(QueryResult::IndexResult { uuid }))),
QueryScope::Index => Box::pin(
futures::stream::iter(self.emails)
.map(|&uuid| Ok(QueryResult::IndexResult { uuid })),
),
QueryScope::Partial => Box::pin(self.partial()),
QueryScope::Full => Box::pin(self.full()),
}
@ -42,40 +45,42 @@ impl<'a, 'b> Query<'a, 'b> {
// --- functions below are private *for reasons*
fn partial<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
async move {
let maybe_meta_list: Result<Vec<MailMeta>> = self.frozen.mailbox.fetch_meta(self.emails).await;
async move {
let maybe_meta_list: Result<Vec<MailMeta>> =
self.frozen.mailbox.fetch_meta(self.emails).await;
let list_res = maybe_meta_list
.map(|meta_list| meta_list
.into_iter()
.zip(self.emails)
.map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
.collect()
)
.map(|meta_list| {
meta_list
.into_iter()
.zip(self.emails)
.map(|(metadata, &uuid)| Ok(QueryResult::PartialResult { uuid, metadata }))
.collect()
})
.unwrap_or_else(|e| vec![Err(e)]);
futures::stream::iter(list_res)
}.flatten_stream()
}
.flatten_stream()
}
fn full<'d>(&'d self) -> impl Stream<Item = Result<QueryResult>> + 'd + Send {
self.partial()
.then(move |maybe_meta| async move {
let meta = maybe_meta?;
self.partial().then(move |maybe_meta| async move {
let meta = maybe_meta?;
let content = self
.frozen
.mailbox
.fetch_full(
*meta.uuid(),
&meta
let content = self
.frozen
.mailbox
.fetch_full(
*meta.uuid(),
&meta
.metadata()
.expect("meta to be PartialResult")
.message_key,
)
.await?;
)
.await?;
Ok(meta.into_full(content).expect("meta to be PartialResult"))
})
Ok(meta.into_full(content).expect("meta to be PartialResult"))
})
}
}