Perf measurement & bottleneck fix #102

Merged
quentin merged 16 commits from perf/cpu-ram-bottleneck into main 2024-02-23 17:32:39 +00:00
13 changed files with 181 additions and 126 deletions
Showing only changes of commit 2adf73dd8e - Show all commits

6
Cargo.lock generated
View file

@ -1965,7 +1965,7 @@ dependencies = [
[[package]] [[package]]
name = "imap-codec" name = "imap-codec"
version = "2.0.0" version = "2.0.0"
source = "git+https://github.com/superboum/imap-codec?branch=custom/aerogramme#0adcc244282c64cc7874ffa9cd22e4a451ee19f8" source = "git+https://github.com/superboum/imap-codec?branch=custom/aerogramme#d8a5afc03fb771232e94c73af6a05e79dc80bbed"
dependencies = [ dependencies = [
"abnf-core", "abnf-core",
"base64 0.21.7", "base64 0.21.7",
@ -1980,7 +1980,7 @@ dependencies = [
[[package]] [[package]]
name = "imap-flow" name = "imap-flow"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#2bea066da1e09ad04bb5fb71b0dd8d6e5d9e3d19" source = "git+https://github.com/duesee/imap-flow.git?branch=main#68c1da5d1c56dbe543d9736de9683259d1d28191"
dependencies = [ dependencies = [
"bounded-static", "bounded-static",
"bytes", "bytes",
@ -1994,7 +1994,7 @@ dependencies = [
[[package]] [[package]]
name = "imap-types" name = "imap-types"
version = "2.0.0" version = "2.0.0"
source = "git+https://github.com/superboum/imap-codec?branch=custom/aerogramme#0adcc244282c64cc7874ffa9cd22e4a451ee19f8" source = "git+https://github.com/superboum/imap-codec?branch=custom/aerogramme#d8a5afc03fb771232e94c73af6a05e79dc80bbed"
dependencies = [ dependencies = [
"base64 0.21.7", "base64 0.21.7",
"bounded-static", "bounded-static",

View file

@ -64,7 +64,7 @@ eml-codec = "0.1.2"
smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" } smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
imap-codec = { version = "2.0.0", features = ["bounded-static", "ext_condstore_qresync"] } imap-codec = { version = "2.0.0", features = ["bounded-static", "ext_condstore_qresync"] }
imap-flow = { git = "https://github.com/superboum/imap-flow.git", branch = "custom/aerogramme" } imap-flow = { git = "https://github.com/duesee/imap-flow.git", branch = "main" }
thiserror = "1.0.56" thiserror = "1.0.56"
[dev-dependencies] [dev-dependencies]

View file

@ -1,5 +1,5 @@
use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier}; use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier};
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
use imap_codec::imap_types::response::Capability; use imap_codec::imap_types::response::Capability;
use std::collections::HashSet; use std::collections::HashSet;
@ -49,7 +49,7 @@ impl Default for ServerCapability {
} }
impl ServerCapability { impl ServerCapability {
pub fn to_vec(&self) -> NonEmptyVec<Capability<'static>> { pub fn to_vec(&self) -> Vec1<Capability<'static>> {
self.0 self.0
.iter() .iter()
.map(|v| v.clone()) .map(|v| v.clone())

View file

@ -6,7 +6,7 @@ use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::{ use imap_codec::imap_types::command::{
Command, CommandBody, ListReturnItem, SelectExamineModifier, Command, CommandBody, ListReturnItem, SelectExamineModifier,
}; };
use imap_codec::imap_types::core::{Atom, Literal, NonEmptyVec, QuotedChar}; use imap_codec::imap_types::core::{Atom, Literal, Vec1, QuotedChar};
use imap_codec::imap_types::datetime::DateTime; use imap_codec::imap_types::datetime::DateTime;
use imap_codec::imap_types::extensions::enable::CapabilityEnable; use imap_codec::imap_types::extensions::enable::CapabilityEnable;
use imap_codec::imap_types::flag::{Flag, FlagNameAttribute}; use imap_codec::imap_types::flag::{Flag, FlagNameAttribute};
@ -584,7 +584,7 @@ impl<'a> AuthenticatedContext<'a> {
fn enable( fn enable(
self, self,
cap_enable: &NonEmptyVec<CapabilityEnable<'static>>, cap_enable: &Vec1<CapabilityEnable<'static>>,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
let mut response_builder = Response::build().to_req(self.req); let mut response_builder = Response::build().to_req(self.req);
let capabilities = self.client_capabilities.try_enable(cap_enable.as_ref()); let capabilities = self.client_capabilities.try_enable(cap_enable.as_ref());

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::core::{Vec1, Charset};
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType}; use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec; use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
@ -54,11 +54,12 @@ pub async fn dispatch<'a>(
ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid) ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid)
.await .await
} }
//@FIXME SearchKey::And is a legacy hack, should be refactored
CommandBody::Search { CommandBody::Search {
charset, charset,
criteria, criteria,
uid, uid,
} => ctx.search(charset, criteria, uid).await, } => ctx.search(charset, &SearchKey::And(criteria.clone()), uid).await,
CommandBody::Expunge { CommandBody::Expunge {
// UIDPLUS (rfc4315) // UIDPLUS (rfc4315)
uid_sequence_set, uid_sequence_set,
@ -88,15 +89,6 @@ pub async fn dispatch<'a>(
// UNSELECT extension (rfc3691) // UNSELECT extension (rfc3691)
CommandBody::Unselect => ctx.unselect().await, CommandBody::Unselect => ctx.unselect().await,
// IDLE extension (rfc2177)
CommandBody::Idle => Ok((
Response::build()
.to_req(ctx.req)
.message("DUMMY command due to anti-pattern in the code")
.ok()?,
flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()),
)),
// In selected mode, we fallback to authenticated when needed // In selected mode, we fallback to authenticated when needed
_ => { _ => {
authenticated::dispatch(authenticated::AuthenticatedContext { authenticated::dispatch(authenticated::AuthenticatedContext {

View file

@ -1,11 +1,12 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Notify; use tokio::sync::Notify;
use imap_codec::imap_types::core::Tag;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User; use crate::mail::user::User;
use imap_codec::imap_types::core::Tag;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -31,6 +32,14 @@ pub enum State {
), ),
Logout, Logout,
} }
impl State {
pub fn notify(&self) -> Option<Arc<Notify>> {
match self {
Self::Idle(_, _, _, _, anotif) => Some(anotif.clone()),
_ => None,
}
}
}
impl fmt::Display for State { impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use State::*; use State::*;

View file

@ -6,7 +6,7 @@ use anyhow::{anyhow, Error, Result};
use futures::stream::{StreamExt, TryStreamExt}; use futures::stream::{StreamExt, TryStreamExt};
use imap_codec::imap_types::core::Charset; use imap_codec::imap_types::core::{Charset, Vec1};
use imap_codec::imap_types::fetch::MessageDataItem; use imap_codec::imap_types::fetch::MessageDataItem;
use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType}; use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType};
use imap_codec::imap_types::response::{Code, CodeOther, Data, Status}; use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
@ -629,7 +629,7 @@ impl MailboxView {
mod tests { mod tests {
use super::*; use super::*;
use imap_codec::encode::Encoder; use imap_codec::encode::Encoder;
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::fetch::Section; use imap_codec::imap_types::fetch::Section;
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName}; use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName};
use imap_codec::imap_types::response::Response; use imap_codec::imap_types::response::Response;
@ -749,7 +749,7 @@ mod tests {
let test_repr = Response::Data(Data::Fetch { let test_repr = Response::Data(Data::Fetch {
seq: NonZeroU32::new(1).unwrap(), seq: NonZeroU32::new(1).unwrap(),
items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure( items: Vec1::from(MessageDataItem::Body(mime_view::bodystructure(
&message.child, &message.child,
false, false,
)?)), )?)),

View file

@ -8,7 +8,7 @@ use imap_codec::imap_types::body::{
BasicFields, Body as FetchBody, BodyStructure, MultiPartExtensionData, SinglePartExtensionData, BasicFields, Body as FetchBody, BodyStructure, MultiPartExtensionData, SinglePartExtensionData,
SpecificFields, SpecificFields,
}; };
use imap_codec::imap_types::core::{AString, IString, NString, NonEmptyVec}; use imap_codec::imap_types::core::{AString, IString, NString, Vec1};
use imap_codec::imap_types::fetch::{Part as FetchPart, Section as FetchSection}; use imap_codec::imap_types::fetch::{Part as FetchPart, Section as FetchSection};
use eml_codec::{ use eml_codec::{
@ -141,8 +141,8 @@ impl<'a> NodeMime<'a> {
enum SubsettedSection<'a> { enum SubsettedSection<'a> {
Part, Part,
Header, Header,
HeaderFields(&'a NonEmptyVec<AString<'a>>), HeaderFields(&'a Vec1<AString<'a>>),
HeaderFieldsNot(&'a NonEmptyVec<AString<'a>>), HeaderFieldsNot(&'a Vec1<AString<'a>>),
Text, Text,
Mime, Mime,
} }
@ -238,7 +238,7 @@ impl<'a> SelectedMime<'a> {
/// case-insensitive but otherwise exact. /// case-insensitive but otherwise exact.
fn header_fields( fn header_fields(
&self, &self,
fields: &'a NonEmptyVec<AString<'a>>, fields: &'a Vec1<AString<'a>>,
invert: bool, invert: bool,
) -> Result<ExtractedFull<'a>> { ) -> Result<ExtractedFull<'a>> {
// Build a lowercase ascii hashset with the fields to fetch // Build a lowercase ascii hashset with the fields to fetch
@ -398,8 +398,8 @@ impl<'a> NodeMult<'a> {
.filter_map(|inner| NodeMime(&inner).structure(is_ext).ok()) .filter_map(|inner| NodeMime(&inner).structure(is_ext).ok())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
NonEmptyVec::validate(&inner_bodies)?; Vec1::validate(&inner_bodies)?;
let bodies = NonEmptyVec::unvalidated(inner_bodies); let bodies = Vec1::unvalidated(inner_bodies);
Ok(BodyStructure::Multi { Ok(BodyStructure::Multi {
bodies, bodies,

View file

@ -15,7 +15,7 @@ mod session;
use std::net::SocketAddr; use std::net::SocketAddr;
use anyhow::{bail, Result}; use anyhow::{anyhow, bail, Result, Context};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener; use tokio::net::TcpListener;
@ -144,13 +144,6 @@ use tokio_util::bytes::BytesMut;
const PIPELINABLE_COMMANDS: usize = 64; const PIPELINABLE_COMMANDS: usize = 64;
#[derive(Debug)]
enum LoopMode {
Quit,
Interactive,
Idle(BytesMut, Arc<Notify>),
}
// @FIXME a full refactor of this part of the code will be needed sooner or later // @FIXME a full refactor of this part of the code will be needed sooner or later
struct NetLoop { struct NetLoop {
ctx: ClientContext, ctx: ClientContext,
@ -163,7 +156,7 @@ impl NetLoop {
async fn handler(ctx: ClientContext, sock: AnyStream) { async fn handler(ctx: ClientContext, sock: AnyStream) {
let addr = ctx.addr.clone(); let addr = ctx.addr.clone();
let nl = match Self::new(ctx, sock).await { let mut nl = match Self::new(ctx, sock).await {
Ok(nl) => { Ok(nl) => {
tracing::debug!(addr=?addr, "netloop successfully initialized"); tracing::debug!(addr=?addr, "netloop successfully initialized");
nl nl
@ -241,85 +234,111 @@ impl NetLoop {
tracing::info!("runner is quitting"); tracing::info!("runner is quitting");
} }
async fn core(mut self) -> Result<()> { async fn core(&mut self) -> Result<()> {
tracing::trace!("Starting the core loop"); let mut maybe_idle: Option<Arc<Notify>> = None;
let mut mode = LoopMode::Interactive;
loop { loop {
tracing::trace!(mode=?mode, "Core loop iter"); tokio::select! {
mode = match mode { // Managing imap_flow stuff
LoopMode::Interactive => self.interactive_mode().await?, srv_evt = self.server.progress() => match srv_evt? {
LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, ServerFlowEvent::ResponseSent { handle: _handle, response } => {
LoopMode::Quit => break, match response {
} Response::Status(Status::Bye(_)) => return Ok(()),
_ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
match self.cmd_tx.try_send(Request::ImapCommand(command)) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
}
_ => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}
}
},
ServerFlowEvent::IdleCommandReceived { tag } => {
match self.cmd_tx.try_send(Request::IdleStart(tag)) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
}
_ => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}
}
}
ServerFlowEvent::IdleDoneReceived => {
tracing::trace!("client sent DONE and want to stop IDLE");
maybe_idle.ok_or(anyhow!("Received IDLE done but not idling currently"))?.notify_one();
maybe_idle = None;
}
flow => {
self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
},
// Managing response generated by Aerogramme
maybe_msg = self.resp_rx.recv() => match maybe_msg {
Some(ResponseOrIdle::Response(response)) => {
tracing::trace!("Interactive, server has a response for the client");
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
self.server.enqueue_status(response.completion);
},
Some(ResponseOrIdle::IdleAccept(stop)) => {
tracing::trace!("Interactive, server agreed to switch in idle mode");
let cr = CommandContinuationRequest::basic(None, "Idling")?;
self.server.idle_accept(cr).or(Err(anyhow!("refused continuation for idle accept")))?;
self.cmd_tx.try_send(Request::IdlePoll)?;
if maybe_idle.is_some() {
bail!("Can't start IDLE if already idling");
}
maybe_idle = Some(stop);
},
Some(ResponseOrIdle::IdleEvent(elems)) => {
tracing::trace!("server imap session has some change to communicate to the client");
for body_elem in elems.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
self.cmd_tx.try_send(Request::IdlePoll)?;
},
Some(ResponseOrIdle::IdleReject(response)) => {
tracing::trace!("inform client that session rejected idle");
self.server
.idle_reject(response.completion)
.or(Err(anyhow!("wrong reject command")))?;
},
None => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
},
Some(_) => unreachable!(),
},
// When receiving a CTRL+C
_ = self.ctx.must_exit.changed() => {
tracing::trace!("Interactive, CTRL+C, exiting");
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
} }
Ok(())
}
async fn interactive_mode(&mut self) -> Result<LoopMode> {
tokio::select! {
// Managing imap_flow stuff
srv_evt = self.server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
_ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
match self.cmd_tx.try_send(Request::ImapCommand(command)) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
}
_ => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}
}
},
flow => {
self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
}
},
// Managing response generated by Aerogramme
maybe_msg = self.resp_rx.recv() => match maybe_msg {
Some(ResponseOrIdle::Response(response)) => {
tracing::trace!("Interactive, server has a response for the client");
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
self.server.enqueue_status(response.completion);
},
Some(ResponseOrIdle::StartIdle(stop)) => {
tracing::trace!("Interactive, server agreed to switch in idle mode");
let cr = CommandContinuationRequest::basic(None, "Idling")?;
self.server.enqueue_continuation(cr);
self.cmd_tx.try_send(Request::Idle)?;
return Ok(LoopMode::Idle(BytesMut::new(), stop))
},
None => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
},
Some(_) => unreachable!(),
},
// When receiving a CTRL+C
_ = self.ctx.must_exit.changed() => {
tracing::trace!("Interactive, CTRL+C, exiting");
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
Ok(LoopMode::Interactive)
} }
/*
async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> { async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
// Flush send // Flush send
loop { loop {
@ -398,5 +417,5 @@ impl NetLoop {
return Ok(LoopMode::Interactive) return Ok(LoopMode::Interactive)
}, },
}; };
} }*/
} }

View file

@ -1,7 +1,9 @@
use imap_codec::imap_types::command::Command; use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::core::Tag;
#[derive(Debug)] #[derive(Debug)]
pub enum Request { pub enum Request {
ImapCommand(Command<'static>), ImapCommand(Command<'static>),
Idle, IdleStart(Tag<'static>),
IdlePoll,
} }

View file

@ -118,6 +118,7 @@ impl<'a> Response<'a> {
#[derive(Debug)] #[derive(Debug)]
pub enum ResponseOrIdle { pub enum ResponseOrIdle {
Response(Response<'static>), Response(Response<'static>),
StartIdle(Arc<Notify>), IdleAccept(Arc<Notify>),
IdleReject(Response<'static>),
IdleEvent(Vec<Body<'static>>), IdleEvent(Vec<Body<'static>>),
} }

View file

@ -1,6 +1,6 @@
use std::num::{NonZeroU32, NonZeroU64}; use std::num::{NonZeroU32, NonZeroU64};
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
@ -48,7 +48,7 @@ impl<'a> Criteria<'a> {
let mut new_vec = base.0.into_inner(); let mut new_vec = base.0.into_inner();
new_vec.extend_from_slice(ext.0.as_ref()); new_vec.extend_from_slice(ext.0.as_ref());
let seq = SequenceSet( let seq = SequenceSet(
NonEmptyVec::try_from(new_vec) Vec1::try_from(new_vec)
.expect("merging non empty vec lead to non empty vec"), .expect("merging non empty vec lead to non empty vec"),
); );
(seq, x) (seq, x)

View file

@ -4,8 +4,8 @@ use crate::imap::flow;
use crate::imap::request::Request; use crate::imap::request::Request;
use crate::imap::response::{Response, ResponseOrIdle}; use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider; use crate::login::ArcLoginProvider;
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result, Context};
use imap_codec::imap_types::command::Command; use imap_codec::imap_types::{core::Tag, command::Command};
//----- //-----
pub struct Instance { pub struct Instance {
@ -27,13 +27,44 @@ impl Instance {
pub async fn request(&mut self, req: Request) -> ResponseOrIdle { pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
match req { match req {
Request::Idle => self.idle().await, Request::IdleStart(tag) => self.idle_init(tag),
Request::IdlePoll => self.idle_poll().await,
Request::ImapCommand(cmd) => self.command(cmd).await, Request::ImapCommand(cmd) => self.command(cmd).await,
} }
} }
pub async fn idle(&mut self) -> ResponseOrIdle { pub fn idle_init(&mut self, tag: Tag<'static>) -> ResponseOrIdle {
match self.idle_happy().await { // Build transition
//@FIXME the notifier should be hidden inside the state and thus not part of the transition!
let transition = flow::Transition::Idle(tag.clone(), tokio::sync::Notify::new());
// Try to apply the transition and get the stop notifier
let maybe_stop = self
.state
.apply(transition)
.context("IDLE transition failed")
.and_then(|_| self.state.notify().ok_or(anyhow!("IDLE state has no Notify object")));
// Build an appropriate response
match maybe_stop {
Ok(stop) => ResponseOrIdle::IdleAccept(stop),
Err(e) => {
tracing::error!(err=?e, "unable to init idle due to a transition error");
//ResponseOrIdle::IdleReject(tag)
let no = Response::build()
.tag(tag)
.message(
"Internal error, processing command triggered an illegal IMAP state transition",
)
.no()
.unwrap();
ResponseOrIdle::IdleReject(no)
}
}
}
pub async fn idle_poll(&mut self) -> ResponseOrIdle {
match self.idle_poll_happy().await {
Ok(r) => r, Ok(r) => r,
Err(e) => { Err(e) => {
tracing::error!(err=?e, "something bad happened in idle"); tracing::error!(err=?e, "something bad happened in idle");
@ -42,7 +73,7 @@ impl Instance {
} }
} }
pub async fn idle_happy(&mut self) -> Result<ResponseOrIdle> { pub async fn idle_poll_happy(&mut self) -> Result<ResponseOrIdle> {
let (mbx, tag, stop) = match &mut self.state { let (mbx, tag, stop) = match &mut self.state {
flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()), flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()),
_ => bail!("Invalid session state, can't idle"), _ => bail!("Invalid session state, can't idle"),
@ -128,10 +159,11 @@ impl Instance {
.bad() .bad()
.unwrap()); .unwrap());
} }
ResponseOrIdle::Response(resp)
match &self.state { /*match &self.state {
flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()),
_ => ResponseOrIdle::Response(resp), _ => ResponseOrIdle::Response(resp),
} }*/
} }
} }