From 3d23f0c936516ed89f2888fb44babb3994e8d579 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 17 Jan 2024 08:22:15 +0100 Subject: [PATCH 1/9] WIP refactor idle --- src/imap/command/authenticated.rs | 4 +- src/imap/command/selected.rs | 52 +++++++- src/imap/flow.rs | 36 +++--- src/imap/mod.rs | 201 ++++++++++++++++++------------ src/imap/request.rs | 8 ++ src/imap/response.rs | 8 ++ src/imap/session.rs | 46 ++++--- 7 files changed, 245 insertions(+), 110 deletions(-) create mode 100644 src/imap/request.rs diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index 9b6bb24..e17699a 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -453,7 +453,7 @@ impl<'a> AuthenticatedContext<'a> { .code(Code::ReadWrite) .set_body(data) .ok()?, - flow::Transition::Select(mb), + flow::Transition::Select(mb, flow::MailboxPerm::ReadWrite), )) } @@ -491,7 +491,7 @@ impl<'a> AuthenticatedContext<'a> { .code(Code::ReadOnly) .set_body(data) .ok()?, - flow::Transition::Examine(mb), + flow::Transition::Select(mb, flow::MailboxPerm::ReadOnly), )) } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index c13b71a..c9c5337 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -25,6 +25,7 @@ pub struct SelectedContext<'a> { pub mailbox: &'a mut MailboxView, pub server_capabilities: &'a ServerCapability, pub client_capabilities: &'a mut ClientCapability, + pub perm: &'a flow::MailboxPerm, } pub async fn dispatch<'a>( @@ -39,7 +40,10 @@ pub async fn dispatch<'a>( CommandBody::Logout => anystate::logout(), // Specific to this state (7 commands + NOOP) - CommandBody::Close => ctx.close().await, + CommandBody::Close => match ctx.perm { + flow::MailboxPerm::ReadWrite => ctx.close().await, + flow::MailboxPerm::ReadOnly => ctx.examine_close().await, + }, CommandBody::Noop | CommandBody::Check => ctx.noop().await, CommandBody::Fetch { sequence_set, @@ -75,6 +79,11 @@ pub async fn dispatch<'a>( // UNSELECT extension (rfc3691) CommandBody::Unselect => ctx.unselect().await, + // IDLE extension (rfc2177) + CommandBody::Idle => { + unimplemented!() + } + // In selected mode, we fallback to authenticated when needed _ => { authenticated::dispatch(authenticated::AuthenticatedContext { @@ -102,6 +111,18 @@ impl<'a> SelectedContext<'a> { )) } + /// CLOSE in examined state is not the same as in selected state + /// (in selected state it also does an EXPUNGE, here it doesn't) + async fn examine_close(self) -> Result<(Response<'static>, flow::Transition)> { + Ok(( + Response::build() + .to_req(self.req) + .message("CLOSE completed") + .ok()?, + flow::Transition::Unselect, + )) + } + async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> { Ok(( Response::build() @@ -189,6 +210,10 @@ impl<'a> SelectedContext<'a> { } async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> { + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let tag = self.req.tag.clone(); let data = self.mailbox.expunge().await?; @@ -211,6 +236,10 @@ impl<'a> SelectedContext<'a> { modifiers: &[StoreModifier], uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let mut unchanged_since: Option = None; modifiers.iter().for_each(|m| match m { StoreModifier::UnchangedSince(val) => { @@ -251,6 +280,11 @@ impl<'a> SelectedContext<'a> { mailbox: &MailboxCodec<'a>, uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + //@FIXME Could copy be valid in EXAMINE mode? + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -303,6 +337,10 @@ impl<'a> SelectedContext<'a> { mailbox: &MailboxCodec<'a>, uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { + if let Some(failed) = self.fail_read_only() { + return Ok((failed, flow::Transition::None)) + } + let name: &str = MailboxName(mailbox).try_into()?; let mb_opt = self.user.open_mailbox(&name).await?; @@ -350,4 +388,16 @@ impl<'a> SelectedContext<'a> { flow::Transition::None, )) } + + fn fail_read_only(&self) -> Option> { + match self.perm { + flow::MailboxPerm::ReadWrite => None, + flow::MailboxPerm::ReadOnly => { + Some(Response::build() + .to_req(self.req) + .message("Write command are forbidden while exmining mailbox") + .no().unwrap()) + }, + } + } } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 95810c1..ff348ca 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -19,17 +19,23 @@ impl StdError for Error {} pub enum State { NotAuthenticated, Authenticated(Arc), - Selected(Arc, MailboxView), - // Examined is like Selected, but indicates that the mailbox is read-only - Examined(Arc, MailboxView), + Selected(Arc, MailboxView, MailboxPerm), + Idle(Arc, MailboxView, MailboxPerm), Logout, } +#[derive(Clone)] +pub enum MailboxPerm { + ReadOnly, + ReadWrite, +} + pub enum Transition { None, Authenticate(Arc), - Examine(MailboxView), - Select(MailboxView), + Select(MailboxView, MailboxPerm), + Idle, + UnIdle, Unselect, Logout, } @@ -38,20 +44,22 @@ pub enum Transition { // https://datatracker.ietf.org/doc/html/rfc3501#page-13 impl State { pub fn apply(&mut self, tr: Transition) -> Result<(), Error> { - let new_state = match (&self, tr) { + let new_state = match (std::mem::replace(self, State::NotAuthenticated), tr) { (_s, Transition::None) => return Ok(()), (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u), ( - State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _), - Transition::Select(m), - ) => State::Selected(u.clone(), m), - ( - State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _), - Transition::Examine(m), - ) => State::Examined(u.clone(), m), - (State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => { + State::Authenticated(u) | State::Selected(u, _, _), + Transition::Select(m, p), + ) => State::Selected(u, m, p), + (State::Selected(u, _, _) , Transition::Unselect) => { State::Authenticated(u.clone()) } + (State::Selected(u, m, p), Transition::Idle) => { + State::Idle(u, m, p) + }, + (State::Idle(u, m, p), Transition::UnIdle) => { + State::Selected(u, m, p) + }, (_, Transition::Logout) => State::Logout, _ => return Err(Error::ForbiddenTransition), }; diff --git a/src/imap/mod.rs b/src/imap/mod.rs index 61a265a..baa15f7 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -8,22 +8,28 @@ mod index; mod mail_view; mod mailbox_view; mod mime_view; +mod request; mod response; mod search; mod session; use std::net::SocketAddr; -use anyhow::Result; +use anyhow::{Result, bail}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; use tokio::sync::watch; +use tokio::sync::mpsc; use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; +use imap_codec::imap_types::response::{Code, Response, CommandContinuationRequest, Status}; +use crate::imap::response::{Body, ResponseOrIdle}; +use crate::imap::session::Instance; +use crate::imap::request::Request; use crate::config::ImapConfig; use crate::imap::capability::ServerCapability; use crate::login::ArcLoginProvider; @@ -35,8 +41,8 @@ pub struct Server { capabilities: ServerCapability, } +#[derive(Clone)] struct ClientContext { - stream: AnyStream, addr: SocketAddr, login_provider: ArcLoginProvider, must_exit: watch::Receiver, @@ -74,13 +80,12 @@ impl Server { tracing::info!("IMAP: accepted connection from {}", remote_addr); let client = ClientContext { - stream: AnyStream::new(socket), addr: remote_addr.clone(), login_provider: self.login_provider.clone(), must_exit: must_exit.clone(), server_capabilities: self.capabilities.clone(), }; - let conn = tokio::spawn(client_wrapper(client)); + let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket))); connections.push(conn); } drop(tcp); @@ -92,46 +97,74 @@ impl Server { } } -async fn client_wrapper(ctx: ClientContext) { - let addr = ctx.addr.clone(); - match client(ctx).await { - Ok(()) => { - tracing::debug!("closing successful session for {:?}", addr); - } - Err(e) => { - tracing::error!("closing errored session for {:?}: {}", addr, e); - } - } +use tokio::sync::mpsc::*; +enum LoopMode { + Quit, + Interactive, + IdleUntil(tokio::sync::Notify), } -async fn client(mut ctx: ClientContext) -> Result<()> { - // Send greeting - let (mut server, _) = ServerFlow::send_greeting( - ctx.stream, - ServerFlowOptions { - crlf_relaxed: false, - literal_accept_text: Text::unvalidated("OK"), - literal_reject_text: Text::unvalidated("Literal rejected"), - ..ServerFlowOptions::default() - }, - Greeting::ok( - Some(Code::Capability(ctx.server_capabilities.to_vec())), - "Aerogramme", - ) - .unwrap(), - ) - .await?; +struct NetLoop { + ctx: ClientContext, + server: ServerFlow, + cmd_tx: Sender, + resp_rx: UnboundedReceiver, +} - use crate::imap::response::{Body, Response as MyResponse}; - use crate::imap::session::Instance; - use imap_codec::imap_types::command::Command; - use imap_codec::imap_types::response::{Code, Response, Status}; +impl NetLoop { + async fn handler(ctx: ClientContext, sock: AnyStream) { + let addr = ctx.addr.clone(); - use tokio::sync::mpsc; - let (cmd_tx, mut cmd_rx) = mpsc::channel::>(10); - let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::>(); + let nl = match Self::new(ctx, sock).await { + Ok(nl) => { + tracing::debug!(addr=?addr, "netloop successfully initialized"); + nl + }, + Err(e) => { + tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session"); + return + } + }; - let bckgrnd = tokio::spawn(async move { + match nl.core().await { + Ok(()) => { + tracing::debug!("closing successful netloop core for {:?}", addr); + } + Err(e) => { + tracing::error!("closing errored netloop core for {:?}: {}", addr, e); + } + } + } + + async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result { + // Send greeting + let (mut server, _) = ServerFlow::send_greeting( + sock, + ServerFlowOptions { + crlf_relaxed: false, + literal_accept_text: Text::unvalidated("OK"), + literal_reject_text: Text::unvalidated("Literal rejected"), + ..ServerFlowOptions::default() + }, + Greeting::ok( + Some(Code::Capability(ctx.server_capabilities.to_vec())), + "Aerogramme", + ) + .unwrap(), + ) + .await?; + + // Start a mailbox session in background + let (cmd_tx, mut cmd_rx) = mpsc::channel::(3); + let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::(); + tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx)); + + // Return the object + Ok(NetLoop { ctx, server, cmd_tx, resp_rx }) + } + + /// Coms with the background session + async fn session(ctx: ClientContext, mut cmd_rx: Receiver, resp_tx: UnboundedSender) -> () { let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); loop { let cmd = match cmd_rx.recv().await { @@ -140,8 +173,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> { }; tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command"); - let maybe_response = session.command(cmd).await; - tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response"); + let maybe_response = session.request(cmd).await; + tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response"); match resp_tx.send(maybe_response) { Err(_) => break, @@ -149,67 +182,81 @@ async fn client(mut ctx: ClientContext) -> Result<()> { }; } tracing::info!("runner is quitting"); - }); + } - // Main loop - loop { + async fn core(mut self) -> Result<()> { + let mut mode = LoopMode::Interactive; + loop { + mode = match mode { + LoopMode::Interactive => self.interactive_mode().await?, + LoopMode::IdleUntil(notif) => self.idle_mode(notif).await?, + LoopMode::Quit => break, + } + } + Ok(()) + } + + + async fn interactive_mode(&mut self) -> Result { tokio::select! { // Managing imap_flow stuff - srv_evt = server.progress() => match srv_evt? { + srv_evt = self.server.progress() => match srv_evt? { ServerFlowEvent::ResponseSent { handle: _handle, response } => { match response { - Response::Status(Status::Bye(_)) => break, - _ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), + Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), + _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } }, ServerFlowEvent::CommandReceived { command } => { - match cmd_tx.try_send(command) { + match self.cmd_tx.try_send(Request::ImapCommand(command)) { Ok(_) => (), Err(mpsc::error::TrySendError::Full(_)) => { - server.enqueue_status(Status::bye(None, "Too fast").unwrap()); - tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); + self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); + tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); } _ => { - server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", ctx.addr); + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); } } }, flow => { - server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); - tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow); - + self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); + tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } }, // Managing response generated by Aerogramme - maybe_msg = resp_rx.recv() => { - let response = match maybe_msg { - None => { - server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); - tracing::error!("session task exited for {:?}, quitting", ctx.addr); - continue - }, - Some(r) => r, - }; - - for body_elem in response.body.into_iter() { - let _handle = match body_elem { - Body::Data(d) => server.enqueue_data(d), - Body::Status(s) => server.enqueue_status(s), - }; - } - server.enqueue_status(response.completion); + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + }, + Some(ResponseOrIdle::Idle) => { + let cr = CommandContinuationRequest::basic(None, "idling")?; + self.server.enqueue_continuation(cr); + return Ok(LoopMode::IdleUntil(tokio::sync::Notify::new())) + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + }, }, // When receiving a CTRL+C - _ = ctx.must_exit.changed() => { - server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + _ = self.ctx.must_exit.changed() => { + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); }, }; + Ok(LoopMode::Interactive) } - drop(cmd_tx); - bckgrnd.await?; - Ok(()) + async fn idle_mode(&mut self, notif: tokio::sync::Notify) -> Result { + Ok(LoopMode::IdleUntil(notif)) + } } diff --git a/src/imap/request.rs b/src/imap/request.rs new file mode 100644 index 0000000..c458276 --- /dev/null +++ b/src/imap/request.rs @@ -0,0 +1,8 @@ +use imap_codec::imap_types::command::Command; +use tokio::sync::Notify; + +#[derive(Debug)] +pub enum Request { + ImapCommand(Command<'static>), + IdleUntil(Notify), +} diff --git a/src/imap/response.rs b/src/imap/response.rs index d20e58e..a9978e1 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -3,6 +3,7 @@ use imap_codec::imap_types::command::Command; use imap_codec::imap_types::core::Tag; use imap_codec::imap_types::response::{Code, Data, Status}; +#[derive(Debug)] pub enum Body<'a> { Data(Data<'a>), Status(Status<'a>), @@ -88,6 +89,7 @@ impl<'a> ResponseBuilder<'a> { } } +#[derive(Debug)] pub struct Response<'a> { pub body: Vec>, pub completion: Status<'a>, @@ -110,3 +112,9 @@ impl<'a> Response<'a> { }) } } + +#[derive(Debug)] +pub enum ResponseOrIdle { + Response(Response<'static>), + Idle, +} diff --git a/src/imap/session.rs b/src/imap/session.rs index 6b26478..d86e6ff 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,7 +1,9 @@ +use anyhow::anyhow; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anonymous, authenticated, examined, selected}; use crate::imap::flow; -use crate::imap::response::Response; +use crate::imap::request::Request; +use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; use imap_codec::imap_types::command::Command; @@ -23,7 +25,24 @@ impl Instance { } } - pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> { + pub async fn request(&mut self, req: Request) -> ResponseOrIdle { + match req { + Request::IdleUntil(stop) => ResponseOrIdle::Response(self.idle(stop).await), + Request::ImapCommand(cmd) => self.command(cmd).await, + } + } + + pub async fn idle(&mut self, stop: tokio::sync::Notify) -> Response<'static> { + let (user, mbx) = match &mut self.state { + flow::State::Idle(ref user, ref mut mailbox, ref perm) => (user, mailbox), + _ => unreachable!(), + }; + + unimplemented!(); + } + + + pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle { // Command behavior is modulated by the state. // To prevent state error, we handle the same command in separate code paths. let (resp, tr) = match &mut self.state { @@ -44,26 +63,18 @@ impl Instance { }; authenticated::dispatch(ctx).await } - flow::State::Selected(ref user, ref mut mailbox) => { + flow::State::Selected(ref user, ref mut mailbox, ref perm) => { let ctx = selected::SelectedContext { req: &cmd, server_capabilities: &self.server_capabilities, client_capabilities: &mut self.client_capabilities, user, mailbox, + perm, }; selected::dispatch(ctx).await } - flow::State::Examined(ref user, ref mut mailbox) => { - let ctx = examined::ExaminedContext { - req: &cmd, - server_capabilities: &self.server_capabilities, - client_capabilities: &mut self.client_capabilities, - user, - mailbox, - }; - examined::dispatch(ctx).await - } + flow::State::Idle(..) => Err(anyhow!("can not receive command while idling")), flow::State::Logout => Response::build() .tag(cmd.tag.clone()) .message("No commands are allowed in the LOGOUT state.") @@ -88,15 +99,18 @@ impl Instance { e, cmd ); - return Response::build() + return ResponseOrIdle::Response(Response::build() .to_req(&cmd) .message( "Internal error, processing command triggered an illegal IMAP state transition", ) .bad() - .unwrap(); + .unwrap()); } - resp + match self.state { + flow::State::Idle(..) => ResponseOrIdle::Idle, + _ => ResponseOrIdle::Response(resp), + } } } From 0eb8156cde27c54734cbe3d269ab05a876ef53ac Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 17 Jan 2024 08:33:08 +0100 Subject: [PATCH 2/9] Delete EXAMINE that has been merged in SELECTED --- src/imap/command/examined.rs | 164 ----------------------------------- src/imap/command/mod.rs | 1 - src/imap/session.rs | 2 +- 3 files changed, 1 insertion(+), 166 deletions(-) delete mode 100644 src/imap/command/examined.rs diff --git a/src/imap/command/examined.rs b/src/imap/command/examined.rs deleted file mode 100644 index 9fc0990..0000000 --- a/src/imap/command/examined.rs +++ /dev/null @@ -1,164 +0,0 @@ -use std::sync::Arc; -use std::num::NonZeroU64; - -use anyhow::Result; -use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier}; -use imap_codec::imap_types::core::Charset; -use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames; -use imap_codec::imap_types::search::SearchKey; -use imap_codec::imap_types::sequence::SequenceSet; - -use crate::imap::attributes::AttributesProxy; -use crate::imap::capability::{ClientCapability, ServerCapability}; -use crate::imap::command::{anystate, authenticated}; -use crate::imap::flow; -use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; -use crate::imap::response::Response; -use crate::mail::user::User; - -pub struct ExaminedContext<'a> { - pub req: &'a Command<'static>, - pub user: &'a Arc, - pub mailbox: &'a mut MailboxView, - pub server_capabilities: &'a ServerCapability, - pub client_capabilities: &'a mut ClientCapability, -} - -pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> { - match &ctx.req.body { - // Any State - // noop is specific to this state - CommandBody::Capability => { - anystate::capability(ctx.req.tag.clone(), ctx.server_capabilities) - } - CommandBody::Logout => anystate::logout(), - - // Specific to the EXAMINE state (specialization of the SELECTED state) - // ~3 commands -> close, fetch, search + NOOP - CommandBody::Close => ctx.close("CLOSE").await, - CommandBody::Fetch { - sequence_set, - macro_or_item_names, - modifiers, - uid, - } => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await, - CommandBody::Search { - charset, - criteria, - uid, - } => ctx.search(charset, criteria, uid).await, - CommandBody::Noop | CommandBody::Check => ctx.noop().await, - CommandBody::Expunge { .. } | CommandBody::Store { .. } => Ok(( - Response::build() - .to_req(ctx.req) - .message("Forbidden command: can't write in read-only mode (EXAMINE)") - .no()?, - flow::Transition::None, - )), - - // UNSELECT extension (rfc3691) - CommandBody::Unselect => ctx.close("UNSELECT").await, - - // In examined mode, we fallback to authenticated when needed - _ => { - authenticated::dispatch(authenticated::AuthenticatedContext { - req: ctx.req, - server_capabilities: ctx.server_capabilities, - client_capabilities: ctx.client_capabilities, - user: ctx.user, - }) - .await - } - } -} - -// --- PRIVATE --- - -impl<'a> ExaminedContext<'a> { - /// CLOSE in examined state is not the same as in selected state - /// (in selected state it also does an EXPUNGE, here it doesn't) - async fn close(self, kind: &str) -> Result<(Response<'static>, flow::Transition)> { - Ok(( - Response::build() - .to_req(self.req) - .message(format!("{} completed", kind)) - .ok()?, - flow::Transition::Unselect, - )) - } - - pub async fn fetch( - self, - sequence_set: &SequenceSet, - attributes: &'a MacroOrMessageDataItemNames<'static>, - modifiers: &[FetchModifier], - uid: &bool, - ) -> Result<(Response<'static>, flow::Transition)> { - let ap = AttributesProxy::new(attributes, modifiers, *uid); - let mut changed_since: Option = None; - modifiers.iter().for_each(|m| match m { - FetchModifier::ChangedSince(val) => { - changed_since = Some(*val); - }, - }); - - match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await { - Ok(resp) => { - // Capabilities enabling logic only on successful command - // (according to my understanding of the spec) - self.client_capabilities.attributes_enable(&ap); - self.client_capabilities.fetch_modifiers_enable(modifiers); - - Ok(( - Response::build() - .to_req(self.req) - .message("FETCH completed") - .set_body(resp) - .ok()?, - flow::Transition::None, - )) - }, - Err(e) => Ok(( - Response::build() - .to_req(self.req) - .message(e.to_string()) - .no()?, - flow::Transition::None, - )), - } - } - - pub async fn search( - self, - charset: &Option>, - criteria: &SearchKey<'a>, - uid: &bool, - ) -> Result<(Response<'static>, flow::Transition)> { - let (found, enable_condstore) = self.mailbox.search(charset, criteria, *uid).await?; - if enable_condstore { - self.client_capabilities.enable_condstore(); - } - Ok(( - Response::build() - .to_req(self.req) - .set_body(found) - .message("SEARCH completed") - .ok()?, - flow::Transition::None, - )) - } - - pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> { - self.mailbox.internal.mailbox.force_sync().await?; - - let updates = self.mailbox.update(UpdateParameters::default()).await?; - Ok(( - Response::build() - .to_req(self.req) - .message("NOOP completed.") - .set_body(updates) - .ok()?, - flow::Transition::None, - )) - } -} diff --git a/src/imap/command/mod.rs b/src/imap/command/mod.rs index dc95746..073040e 100644 --- a/src/imap/command/mod.rs +++ b/src/imap/command/mod.rs @@ -1,7 +1,6 @@ pub mod anonymous; pub mod anystate; pub mod authenticated; -pub mod examined; pub mod selected; use crate::mail::user::INBOX; diff --git a/src/imap/session.rs b/src/imap/session.rs index d86e6ff..11c2764 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,6 +1,6 @@ use anyhow::anyhow; use crate::imap::capability::{ClientCapability, ServerCapability}; -use crate::imap::command::{anonymous, authenticated, examined, selected}; +use crate::imap::command::{anonymous, authenticated, selected}; use crate::imap::flow; use crate::imap::request::Request; use crate::imap::response::{Response, ResponseOrIdle}; From 1a0247e9352619bed45dfb8101133261cfecb512 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 17 Jan 2024 10:14:48 +0100 Subject: [PATCH 3/9] WIP idle --- src/imap/command/selected.rs | 5 +++- src/imap/flow.rs | 11 +++++---- src/imap/mod.rs | 48 ++++++++++++++++++++++++++++++------ src/imap/request.rs | 2 +- src/imap/response.rs | 3 ++- src/imap/session.rs | 10 ++++---- 6 files changed, 59 insertions(+), 20 deletions(-) diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index c9c5337..b62e2cb 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -81,7 +81,10 @@ pub async fn dispatch<'a>( // IDLE extension (rfc2177) CommandBody::Idle => { - unimplemented!() + Ok(( + Response::build().to_req(ctx.req).message("DUMMY response due to anti-pattern").ok()?, + flow::Transition::Idle(tokio::sync::Notify::new()), + )) } // In selected mode, we fallback to authenticated when needed diff --git a/src/imap/flow.rs b/src/imap/flow.rs index ff348ca..d1e27d4 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -1,6 +1,7 @@ use std::error::Error as StdError; use std::fmt; use std::sync::Arc; +use tokio::sync::Notify; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; @@ -20,7 +21,7 @@ pub enum State { NotAuthenticated, Authenticated(Arc), Selected(Arc, MailboxView, MailboxPerm), - Idle(Arc, MailboxView, MailboxPerm), + Idle(Arc, MailboxView, MailboxPerm, Notify), Logout, } @@ -34,7 +35,7 @@ pub enum Transition { None, Authenticate(Arc), Select(MailboxView, MailboxPerm), - Idle, + Idle(Notify), UnIdle, Unselect, Logout, @@ -54,10 +55,10 @@ impl State { (State::Selected(u, _, _) , Transition::Unselect) => { State::Authenticated(u.clone()) } - (State::Selected(u, m, p), Transition::Idle) => { - State::Idle(u, m, p) + (State::Selected(u, m, p), Transition::Idle(s)) => { + State::Idle(u, m, p, s) }, - (State::Idle(u, m, p), Transition::UnIdle) => { + (State::Idle(u, m, p, _), Transition::UnIdle) => { State::Selected(u, m, p) }, (_, Transition::Logout) => State::Logout, diff --git a/src/imap/mod.rs b/src/imap/mod.rs index baa15f7..edfbbc4 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -101,9 +101,10 @@ use tokio::sync::mpsc::*; enum LoopMode { Quit, Interactive, - IdleUntil(tokio::sync::Notify), + Idle, } +// @FIXME a full refactor of this part of the code will be needed sooner or later struct NetLoop { ctx: ClientContext, server: ServerFlow, @@ -189,7 +190,7 @@ impl NetLoop { loop { mode = match mode { LoopMode::Interactive => self.interactive_mode().await?, - LoopMode::IdleUntil(notif) => self.idle_mode(notif).await?, + LoopMode::Idle => self.idle_mode().await?, LoopMode::Quit => break, } } @@ -237,15 +238,18 @@ impl NetLoop { } self.server.enqueue_status(response.completion); }, - Some(ResponseOrIdle::Idle) => { - let cr = CommandContinuationRequest::basic(None, "idling")?; + Some(ResponseOrIdle::StartIdle) => { + let cr = CommandContinuationRequest::basic(None, "Idling")?; self.server.enqueue_continuation(cr); - return Ok(LoopMode::IdleUntil(tokio::sync::Notify::new())) + self.cmd_tx.try_send(Request::Idle)?; + return Ok(LoopMode::Idle) }, None => { self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, + Some(_) => unreachable!(), + }, // When receiving a CTRL+C @@ -256,7 +260,37 @@ impl NetLoop { Ok(LoopMode::Interactive) } - async fn idle_mode(&mut self, notif: tokio::sync::Notify) -> Result { - Ok(LoopMode::IdleUntil(notif)) + async fn idle_mode(&mut self) -> Result { + tokio::select! { + maybe_msg = self.resp_rx.recv() => match maybe_msg { + Some(ResponseOrIdle::Response(response)) => { + for body_elem in response.body.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + self.server.enqueue_status(response.completion); + return Ok(LoopMode::Interactive) + }, + Some(ResponseOrIdle::IdleEvent(elems)) => { + for body_elem in elems.into_iter() { + let _handle = match body_elem { + Body::Data(d) => self.server.enqueue_data(d), + Body::Status(s) => self.server.enqueue_status(s), + }; + } + return Ok(LoopMode::Idle) + }, + None => { + self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); + tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); + return Ok(LoopMode::Interactive) + }, + Some(ResponseOrIdle::StartIdle) => unreachable!(), + } + }; + /*self.cmd_tx.try_send(Request::Idle).unwrap(); + Ok(LoopMode::Idle)*/ } } diff --git a/src/imap/request.rs b/src/imap/request.rs index c458276..2382b09 100644 --- a/src/imap/request.rs +++ b/src/imap/request.rs @@ -4,5 +4,5 @@ use tokio::sync::Notify; #[derive(Debug)] pub enum Request { ImapCommand(Command<'static>), - IdleUntil(Notify), + Idle, } diff --git a/src/imap/response.rs b/src/imap/response.rs index a9978e1..7b7f92d 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -116,5 +116,6 @@ impl<'a> Response<'a> { #[derive(Debug)] pub enum ResponseOrIdle { Response(Response<'static>), - Idle, + StartIdle, + IdleEvent(Vec>), } diff --git a/src/imap/session.rs b/src/imap/session.rs index 11c2764..d15016f 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -27,14 +27,14 @@ impl Instance { pub async fn request(&mut self, req: Request) -> ResponseOrIdle { match req { - Request::IdleUntil(stop) => ResponseOrIdle::Response(self.idle(stop).await), + Request::Idle => ResponseOrIdle::Response(self.idle().await), Request::ImapCommand(cmd) => self.command(cmd).await, } } - pub async fn idle(&mut self, stop: tokio::sync::Notify) -> Response<'static> { - let (user, mbx) = match &mut self.state { - flow::State::Idle(ref user, ref mut mailbox, ref perm) => (user, mailbox), + pub async fn idle(&mut self) -> Response<'static> { + let (user, mbx, perm, stop) = match &mut self.state { + flow::State::Idle(ref user, ref mut mailbox, ref perm, ref stop) => (user, mailbox, perm, stop), _ => unreachable!(), }; @@ -109,7 +109,7 @@ impl Instance { } match self.state { - flow::State::Idle(..) => ResponseOrIdle::Idle, + flow::State::Idle(..) => ResponseOrIdle::StartIdle, _ => ResponseOrIdle::Response(resp), } } From 4a15ceacf1f45b15ae9b926110f48447c258ba1c Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 17 Jan 2024 10:28:04 +0100 Subject: [PATCH 4/9] Update dependency --- Cargo.lock | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index e00317b..4b49312 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1822,7 +1822,7 @@ dependencies = [ [[package]] name = "imap-flow" version = "0.1.0" -source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#0f548a2070aace09f9f9a0b6ef221efefb8b110b" +source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#60ff9e082ccfcd10a042b616d8038a578fa0c8ff" dependencies = [ "bounded-static", "bytes", From e1161cab0e71ec604e376d2d87f7d1226f3f0244 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Wed, 17 Jan 2024 16:56:05 +0100 Subject: [PATCH 5/9] idle sync --- src/bayou.rs | 36 ++++++++++++++++++--- src/imap/command/selected.rs | 2 +- src/imap/flow.rs | 4 +-- src/imap/mod.rs | 62 +++++++++++++++++++++++++++++------- src/imap/request.rs | 1 - src/imap/response.rs | 4 ++- src/imap/session.rs | 14 ++++++-- src/mail/mailbox.rs | 10 ++++++ 8 files changed, 110 insertions(+), 23 deletions(-) diff --git a/src/bayou.rs b/src/bayou.rs index d77e9dc..14f9728 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -238,6 +238,22 @@ impl Bayou { Ok(()) } + pub async fn idle_sync(&mut self) -> Result<()> { + tracing::debug!("start idle_sync"); + loop { + tracing::trace!("idle_sync loop"); + let fut_notif = self.watch.learnt_remote_update.notified(); + + if self.last_sync_watch_ct != *self.watch.rx.borrow() { + break + } + fut_notif.await; + } + tracing::trace!("idle_sync done"); + self.sync().await?; + Ok(()) + } + /// Applies a new operation on the state. Once this function returns, /// the operation has been safely persisted to storage backend. /// Make sure to call `.opportunistic_sync()` before doing this, @@ -257,7 +273,7 @@ impl Bayou { seal_serialize(&op, &self.key)?, ); self.storage.row_insert(vec![row_val]).await?; - self.watch.notify.notify_one(); + self.watch.propagate_local_update.notify_one(); let new_state = self.state().apply(&op); self.history.push((ts, op, Some(new_state))); @@ -423,7 +439,8 @@ impl Bayou { struct K2vWatch { target: storage::RowRef, rx: watch::Receiver, - notify: Notify, + propagate_local_update: Notify, + learnt_remote_update: Notify, } impl K2vWatch { @@ -434,9 +451,10 @@ impl K2vWatch { let storage = creds.storage.build().await?; let (tx, rx) = watch::channel::(target.clone()); - let notify = Notify::new(); + let propagate_local_update = Notify::new(); + let learnt_remote_update = Notify::new(); - let watch = Arc::new(K2vWatch { target, rx, notify }); + let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); @@ -459,7 +477,12 @@ impl K2vWatch { this.target.uid.shard, this.target.uid.sort ); tokio::select!( + // Needed to exit: will force a loop iteration every minutes, + // that will stop the loop if other Arc references have been dropped + // and free resources. Otherwise we would be blocked waiting forever... _ = tokio::time::sleep(Duration::from_secs(60)) => continue, + + // Watch if another instance has modified the log update = storage.row_poll(&row) => { match update { Err(e) => { @@ -471,10 +494,13 @@ impl K2vWatch { if tx.send(row.clone()).is_err() { break; } + this.learnt_remote_update.notify_waiters(); } } } - _ = this.notify.notified() => { + + // It appears we have modified the log, informing other people + _ = this.propagate_local_update.notified() => { let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); let row_val = storage::RowVal::new(row.clone(), rand); if let Err(e) = storage.row_insert(vec![row_val]).await diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index b62e2cb..4eb4e61 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -82,7 +82,7 @@ pub async fn dispatch<'a>( // IDLE extension (rfc2177) CommandBody::Idle => { Ok(( - Response::build().to_req(ctx.req).message("DUMMY response due to anti-pattern").ok()?, + Response::build().to_req(ctx.req).message("DUMMY command due to anti-pattern in the code").ok()?, flow::Transition::Idle(tokio::sync::Notify::new()), )) } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index d1e27d4..37f225b 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -21,7 +21,7 @@ pub enum State { NotAuthenticated, Authenticated(Arc), Selected(Arc, MailboxView, MailboxPerm), - Idle(Arc, MailboxView, MailboxPerm, Notify), + Idle(Arc, MailboxView, MailboxPerm, Arc), Logout, } @@ -56,7 +56,7 @@ impl State { State::Authenticated(u.clone()) } (State::Selected(u, m, p), Transition::Idle(s)) => { - State::Idle(u, m, p, s) + State::Idle(u, m, p, Arc::new(s)) }, (State::Idle(u, m, p, _), Transition::UnIdle) => { State::Selected(u, m, p) diff --git a/src/imap/mod.rs b/src/imap/mod.rs index edfbbc4..c50c3fc 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -97,11 +97,14 @@ impl Server { } } -use tokio::sync::mpsc::*; +use tokio::sync::mpsc::*; +use tokio_util::bytes::BytesMut; +use tokio::sync::Notify; +use std::sync::Arc; enum LoopMode { Quit, Interactive, - Idle, + Idle(BytesMut, Arc), } // @FIXME a full refactor of this part of the code will be needed sooner or later @@ -190,7 +193,7 @@ impl NetLoop { loop { mode = match mode { LoopMode::Interactive => self.interactive_mode().await?, - LoopMode::Idle => self.idle_mode().await?, + LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, LoopMode::Quit => break, } } @@ -238,11 +241,11 @@ impl NetLoop { } self.server.enqueue_status(response.completion); }, - Some(ResponseOrIdle::StartIdle) => { + Some(ResponseOrIdle::StartIdle(stop)) => { let cr = CommandContinuationRequest::basic(None, "Idling")?; self.server.enqueue_continuation(cr); self.cmd_tx.try_send(Request::Idle)?; - return Ok(LoopMode::Idle) + return Ok(LoopMode::Idle(BytesMut::new(), stop)) }, None => { self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); @@ -260,9 +263,19 @@ impl NetLoop { Ok(LoopMode::Interactive) } - async fn idle_mode(&mut self) -> Result { + async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc) -> Result { + // Flush send + loop { + match self.server.progress_send().await? { + Some(..) => continue, + None => break, + } + } + tokio::select! { + // Receiving IDLE event from background maybe_msg = self.resp_rx.recv() => match maybe_msg { + // Session decided idle is terminated Some(ResponseOrIdle::Response(response)) => { for body_elem in response.body.into_iter() { let _handle = match body_elem { @@ -273,6 +286,7 @@ impl NetLoop { self.server.enqueue_status(response.completion); return Ok(LoopMode::Interactive) }, + // Session has some information for user Some(ResponseOrIdle::IdleEvent(elems)) => { for body_elem in elems.into_iter() { let _handle = match body_elem { @@ -280,17 +294,43 @@ impl NetLoop { Body::Status(s) => self.server.enqueue_status(s), }; } - return Ok(LoopMode::Idle) + self.cmd_tx.try_send(Request::Idle)?; + return Ok(LoopMode::Idle(buff, stop)) }, + + // Session crashed None => { self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); return Ok(LoopMode::Interactive) }, - Some(ResponseOrIdle::StartIdle) => unreachable!(), - } + + // Session can't start idling while already idling, it's a logic error! + Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"), + }, + + // User is trying to interact with us + _read_client_bytes = self.server.stream.read(&mut buff) => { + use imap_codec::decode::Decoder; + let codec = imap_codec::IdleDoneCodec::new(); + match codec.decode(&buff) { + Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => { + // Session will be informed that it must stop idle + // It will generate the "done" message and change the loop mode + stop.notify_one() + }, + Err(_) => (), + _ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."), + }; + + return Ok(LoopMode::Idle(buff, stop)) + }, + + // When receiving a CTRL+C + _ = self.ctx.must_exit.changed() => { + self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); + return Ok(LoopMode::Interactive) + }, }; - /*self.cmd_tx.try_send(Request::Idle).unwrap(); - Ok(LoopMode::Idle)*/ } } diff --git a/src/imap/request.rs b/src/imap/request.rs index 2382b09..49b4992 100644 --- a/src/imap/request.rs +++ b/src/imap/request.rs @@ -1,5 +1,4 @@ use imap_codec::imap_types::command::Command; -use tokio::sync::Notify; #[derive(Debug)] pub enum Request { diff --git a/src/imap/response.rs b/src/imap/response.rs index 7b7f92d..afcb29f 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -1,4 +1,6 @@ +use std::sync::Arc; use anyhow::Result; +use tokio::sync::Notify; use imap_codec::imap_types::command::Command; use imap_codec::imap_types::core::Tag; use imap_codec::imap_types::response::{Code, Data, Status}; @@ -116,6 +118,6 @@ impl<'a> Response<'a> { #[derive(Debug)] pub enum ResponseOrIdle { Response(Response<'static>), - StartIdle, + StartIdle(Arc), IdleEvent(Vec>), } diff --git a/src/imap/session.rs b/src/imap/session.rs index d15016f..1d473ed 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -38,6 +38,16 @@ impl Instance { _ => unreachable!(), }; + tokio::select! { + _ = stop.notified() => { + return Response::build() + .tag(imap_codec::imap_types::core::Tag::try_from("FIXME").unwrap()) + .message("IDLE completed") + .ok() + .unwrap() + } + } + unimplemented!(); } @@ -108,8 +118,8 @@ impl Instance { .unwrap()); } - match self.state { - flow::State::Idle(..) => ResponseOrIdle::StartIdle, + match &self.state { + flow::State::Idle(_, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), _ => ResponseOrIdle::Response(resp), } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 5e95f32..4310a73 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -67,6 +67,11 @@ impl Mailbox { self.mbox.write().await.opportunistic_sync().await } + /// Block until a sync has been done (due to changes in the event log) + pub async fn idle_sync(&self) -> Result<()> { + self.mbox.write().await.idle_sync().await + } + // ---- Functions for reading the mailbox ---- /// Get a clone of the current UID Index of this mailbox @@ -199,6 +204,11 @@ impl MailboxInternal { Ok(()) } + async fn idle_sync(&mut self) -> Result<()> { + self.uid_index.idle_sync().await?; + Ok(()) + } + // ---- Functions for reading the mailbox ---- async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result> { From 185033c462b92854117bc57258bf33b3579a7ca5 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 18 Jan 2024 17:33:57 +0100 Subject: [PATCH 6/9] idling works!!! --- src/bayou.rs | 68 ++++++++++++++++-------------------- src/imap/command/selected.rs | 2 +- src/imap/flow.rs | 11 +++--- src/imap/mailbox_view.rs | 6 ++++ src/imap/session.rs | 38 +++++++++++++------- src/mail/mailbox.rs | 9 +++-- 6 files changed, 72 insertions(+), 62 deletions(-) diff --git a/src/bayou.rs b/src/bayou.rs index 14f9728..1c157e6 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -2,7 +2,7 @@ use std::sync::{Arc, Weak}; use std::time::{Duration, Instant}; use anyhow::{anyhow, bail, Result}; -use log::{debug, error, info}; +use log::error; use rand::prelude::*; use serde::{Deserialize, Serialize}; use tokio::sync::{watch, Notify}; @@ -84,21 +84,21 @@ impl Bayou { // 1. List checkpoints let checkpoints = self.list_checkpoints().await?; - debug!("(sync) listed checkpoints: {:?}", checkpoints); + tracing::debug!("(sync) listed checkpoints: {:?}", checkpoints); // 2. Load last checkpoint if different from currently used one let checkpoint = if let Some((ts, key)) = checkpoints.last() { if *ts == self.checkpoint.0 { (*ts, None) } else { - debug!("(sync) loading checkpoint: {}", key); + tracing::debug!("(sync) loading checkpoint: {}", key); let buf = self .storage .blob_fetch(&storage::BlobRef(key.to_string())) .await? .value; - debug!("(sync) checkpoint body length: {}", buf.len()); + tracing::debug!("(sync) checkpoint body length: {}", buf.len()); let ck = open_deserialize::(&buf, &self.key)?; (*ts, Some(ck)) @@ -112,7 +112,7 @@ impl Bayou { } if let Some(ck) = checkpoint.1 { - debug!( + tracing::debug!( "(sync) updating checkpoint to loaded state at {:?}", checkpoint.0 ); @@ -127,7 +127,7 @@ impl Bayou { // 3. List all operations starting from checkpoint let ts_ser = self.checkpoint.0.to_string(); - debug!("(sync) looking up operations starting at {}", ts_ser); + tracing::debug!("(sync) looking up operations starting at {}", ts_ser); let ops_map = self .storage .row_fetch(&storage::Selector::Range { @@ -161,7 +161,7 @@ impl Bayou { } } ops.sort_by_key(|(ts, _)| *ts); - debug!("(sync) {} operations", ops.len()); + tracing::debug!("(sync) {} operations", ops.len()); if ops.len() < self.history.len() { bail!("Some operations have disappeared from storage!"); @@ -238,20 +238,8 @@ impl Bayou { Ok(()) } - pub async fn idle_sync(&mut self) -> Result<()> { - tracing::debug!("start idle_sync"); - loop { - tracing::trace!("idle_sync loop"); - let fut_notif = self.watch.learnt_remote_update.notified(); - - if self.last_sync_watch_ct != *self.watch.rx.borrow() { - break - } - fut_notif.await; - } - tracing::trace!("idle_sync done"); - self.sync().await?; - Ok(()) + pub fn notifier(&self) -> std::sync::Weak { + Arc::downgrade(&self.watch.learnt_remote_update) } /// Applies a new operation on the state. Once this function returns, @@ -259,7 +247,7 @@ impl Bayou { /// Make sure to call `.opportunistic_sync()` before doing this, /// and even before calculating the `op` argument given here. pub async fn push(&mut self, op: S::Op) -> Result<()> { - debug!("(push) add operation: {:?}", op); + tracing::debug!("(push) add operation: {:?}", op); let ts = Timestamp::after( self.history @@ -321,18 +309,18 @@ impl Bayou { { Some(i) => i, None => { - debug!("(cp) Oldest operation is too recent to trigger checkpoint"); + tracing::debug!("(cp) Oldest operation is too recent to trigger checkpoint"); return Ok(()); } }; if i_cp < CHECKPOINT_MIN_OPS { - debug!("(cp) Not enough old operations to trigger checkpoint"); + tracing::debug!("(cp) Not enough old operations to trigger checkpoint"); return Ok(()); } let ts_cp = self.history[i_cp].0; - debug!( + tracing::debug!( "(cp) we could checkpoint at time {} (index {} in history)", ts_cp.to_string(), i_cp @@ -340,13 +328,13 @@ impl Bayou { // Check existing checkpoints: if last one is too recent, don't checkpoint again. let existing_checkpoints = self.list_checkpoints().await?; - debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); + tracing::debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); if let Some(last_cp) = existing_checkpoints.last() { if (ts_cp.msec as i128 - last_cp.0.msec as i128) < CHECKPOINT_INTERVAL.as_millis() as i128 { - debug!( + tracing::debug!( "(cp) last checkpoint is too recent: {}, not checkpointing", last_cp.0.to_string() ); @@ -354,7 +342,7 @@ impl Bayou { } } - debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); + tracing::debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); // Calculate state at time of checkpoint let mut last_known_state = (0, &self.checkpoint.1); @@ -370,7 +358,7 @@ impl Bayou { // Serialize and save checkpoint let cryptoblob = seal_serialize(&state_cp, &self.key)?; - debug!("(cp) checkpoint body length: {}", cryptoblob.len()); + tracing::debug!("(cp) checkpoint body length: {}", cryptoblob.len()); let blob_val = storage::BlobVal::new( storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), @@ -385,7 +373,7 @@ impl Bayou { // Delete blobs for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { - debug!("(cp) drop old checkpoint {}", key); + tracing::debug!("(cp) drop old checkpoint {}", key); self.storage .blob_rm(&storage::BlobRef(key.to_string())) .await?; @@ -440,7 +428,7 @@ struct K2vWatch { target: storage::RowRef, rx: watch::Receiver, propagate_local_update: Notify, - learnt_remote_update: Notify, + learnt_remote_update: Arc, } impl K2vWatch { @@ -452,7 +440,7 @@ impl K2vWatch { let (tx, rx) = watch::channel::(target.clone()); let propagate_local_update = Notify::new(); - let learnt_remote_update = Notify::new(); + let learnt_remote_update = Arc::new(Notify::new()); let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); @@ -466,13 +454,13 @@ impl K2vWatch { storage: storage::Store, tx: watch::Sender, ) { - let mut row = match Weak::upgrade(&self_weak) { - Some(this) => this.target.clone(), + let (mut row, remote_update) = match Weak::upgrade(&self_weak) { + Some(this) => (this.target.clone(), this.learnt_remote_update.clone()), None => return, }; while let Some(this) = Weak::upgrade(&self_weak) { - debug!( + tracing::debug!( "bayou k2v watch bg loop iter ({}, {})", this.target.uid.shard, this.target.uid.sort ); @@ -491,9 +479,11 @@ impl K2vWatch { } Ok(new_value) => { row = new_value.row_ref; - if tx.send(row.clone()).is_err() { + if let Err(e) = tx.send(row.clone()) { + tracing::warn!(err=?e, "(watch) can't record the new log ref"); break; } + tracing::debug!(row=?row, "(watch) learnt remote update"); this.learnt_remote_update.notify_waiters(); } } @@ -505,12 +495,14 @@ impl K2vWatch { let row_val = storage::RowVal::new(row.clone(), rand); if let Err(e) = storage.row_insert(vec![row_val]).await { - error!("Error in bayou k2v watch updater loop: {}", e); + tracing::error!("Error in bayou k2v watch updater loop: {}", e); tokio::time::sleep(Duration::from_secs(30)).await; } } ); } - info!("bayou k2v watch bg loop exiting"); + // unblock listeners + remote_update.notify_waiters(); + tracing::info!("bayou k2v watch bg loop exiting"); } } diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index 4eb4e61..ca2e268 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -83,7 +83,7 @@ pub async fn dispatch<'a>( CommandBody::Idle => { Ok(( Response::build().to_req(ctx.req).message("DUMMY command due to anti-pattern in the code").ok()?, - flow::Transition::Idle(tokio::sync::Notify::new()), + flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), )) } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 37f225b..72d9e8e 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -3,6 +3,7 @@ use std::fmt; use std::sync::Arc; use tokio::sync::Notify; +use imap_codec::imap_types::core::Tag; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; @@ -21,7 +22,7 @@ pub enum State { NotAuthenticated, Authenticated(Arc), Selected(Arc, MailboxView, MailboxPerm), - Idle(Arc, MailboxView, MailboxPerm, Arc), + Idle(Arc, MailboxView, MailboxPerm, Tag<'static>, Arc), Logout, } @@ -35,7 +36,7 @@ pub enum Transition { None, Authenticate(Arc), Select(MailboxView, MailboxPerm), - Idle(Notify), + Idle(Tag<'static>, Notify), UnIdle, Unselect, Logout, @@ -55,10 +56,10 @@ impl State { (State::Selected(u, _, _) , Transition::Unselect) => { State::Authenticated(u.clone()) } - (State::Selected(u, m, p), Transition::Idle(s)) => { - State::Idle(u, m, p, Arc::new(s)) + (State::Selected(u, m, p), Transition::Idle(t, s)) => { + State::Idle(u, m, p, t, Arc::new(s)) }, - (State::Idle(u, m, p, _), Transition::UnIdle) => { + (State::Idle(u, m, p, _, _), Transition::UnIdle) => { State::Selected(u, m, p) }, (_, Transition::Logout) => State::Logout, diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 07fa3ad..85a4961 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -224,6 +224,12 @@ impl MailboxView { Ok((summary, conflict_id_or_uid)) } + pub async fn idle_sync(&mut self) -> Result>> { + self.internal.mailbox.notify().await.upgrade().ok_or(anyhow!("test"))?.notified().await; + self.internal.mailbox.opportunistic_sync().await?; + self.update(UpdateParameters::default()).await + } + pub async fn expunge(&mut self) -> Result>> { self.internal.sync().await?; let state = self.internal.peek().await; diff --git a/src/imap/session.rs b/src/imap/session.rs index 1d473ed..f4e3d0f 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,4 +1,4 @@ -use anyhow::anyhow; +use anyhow::{Result, anyhow, bail}; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anonymous, authenticated, selected}; use crate::imap::flow; @@ -27,28 +27,40 @@ impl Instance { pub async fn request(&mut self, req: Request) -> ResponseOrIdle { match req { - Request::Idle => ResponseOrIdle::Response(self.idle().await), + Request::Idle => self.idle().await, Request::ImapCommand(cmd) => self.command(cmd).await, } } - pub async fn idle(&mut self) -> Response<'static> { - let (user, mbx, perm, stop) = match &mut self.state { - flow::State::Idle(ref user, ref mut mailbox, ref perm, ref stop) => (user, mailbox, perm, stop), - _ => unreachable!(), + pub async fn idle(&mut self) -> ResponseOrIdle { + match self.idle_happy().await { + Ok(r) => r, + Err(e) => { + tracing::error!(err=?e, "something bad happened in idle"); + ResponseOrIdle::Response(Response::bye().unwrap()) + } + } + } + + pub async fn idle_happy(&mut self) -> Result { + let (mbx, tag, stop) = match &mut self.state { + flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()), + _ => bail!("Invalid session state, can't idle"), }; tokio::select! { _ = stop.notified() => { - return Response::build() - .tag(imap_codec::imap_types::core::Tag::try_from("FIXME").unwrap()) + self.state.apply(flow::Transition::UnIdle)?; + return Ok(ResponseOrIdle::Response(Response::build() + .tag(tag.clone()) .message("IDLE completed") - .ok() - .unwrap() + .ok()?)) + }, + change = mbx.idle_sync() => { + tracing::debug!("idle event"); + return Ok(ResponseOrIdle::IdleEvent(change?)); } } - - unimplemented!(); } @@ -119,7 +131,7 @@ impl Instance { } match &self.state { - flow::State::Idle(_, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), + flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()), _ => ResponseOrIdle::Response(resp), } } diff --git a/src/mail/mailbox.rs b/src/mail/mailbox.rs index 4310a73..c20d815 100644 --- a/src/mail/mailbox.rs +++ b/src/mail/mailbox.rs @@ -68,8 +68,8 @@ impl Mailbox { } /// Block until a sync has been done (due to changes in the event log) - pub async fn idle_sync(&self) -> Result<()> { - self.mbox.write().await.idle_sync().await + pub async fn notify(&self) -> std::sync::Weak { + self.mbox.read().await.notifier() } // ---- Functions for reading the mailbox ---- @@ -204,9 +204,8 @@ impl MailboxInternal { Ok(()) } - async fn idle_sync(&mut self) -> Result<()> { - self.uid_index.idle_sync().await?; - Ok(()) + fn notifier(&self) -> std::sync::Weak { + self.uid_index.notifier() } // ---- Functions for reading the mailbox ---- From 43b668531f060a2c0f950da96b363b2ea7cf4e06 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 18 Jan 2024 18:02:24 +0100 Subject: [PATCH 7/9] fix a transition bug --- src/imap/flow.rs | 39 +++++++++++++++++++++++++++++++++++---- 1 file changed, 35 insertions(+), 4 deletions(-) diff --git a/src/imap/flow.rs b/src/imap/flow.rs index 72d9e8e..e817e77 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -25,6 +25,18 @@ pub enum State { Idle(Arc, MailboxView, MailboxPerm, Tag<'static>, Arc), Logout, } +impl fmt::Display for State { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use State::*; + match self { + NotAuthenticated => write!(f, "NotAuthenticated"), + Authenticated(..) => write!(f, "Authenticated"), + Selected(..) => write!(f, "Selected"), + Idle(..) => write!(f, "Idle"), + Logout => write!(f, "Logout"), + } + } +} #[derive(Clone)] pub enum MailboxPerm { @@ -41,13 +53,29 @@ pub enum Transition { Unselect, Logout, } +impl fmt::Display for Transition { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + use Transition::*; + match self { + None => write!(f, "None"), + Authenticate(..) => write!(f, "Authenticated"), + Select(..) => write!(f, "Selected"), + Idle(..) => write!(f, "Idle"), + UnIdle => write!(f, "UnIdle"), + Unselect => write!(f, "Unselect"), + Logout => write!(f, "Logout"), + } + } +} // See RFC3501 section 3. // https://datatracker.ietf.org/doc/html/rfc3501#page-13 impl State { pub fn apply(&mut self, tr: Transition) -> Result<(), Error> { - let new_state = match (std::mem::replace(self, State::NotAuthenticated), tr) { - (_s, Transition::None) => return Ok(()), + tracing::debug!(state=%self, transition=%tr, "try change state"); + + let new_state = match (std::mem::replace(self, State::Logout), tr) { + (s, Transition::None) => s, (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u), ( State::Authenticated(u) | State::Selected(u, _, _), @@ -63,10 +91,13 @@ impl State { State::Selected(u, m, p) }, (_, Transition::Logout) => State::Logout, - _ => return Err(Error::ForbiddenTransition), + (s, t) => { + tracing::error!(state=%s, transition=%t, "forbidden transition"); + return Err(Error::ForbiddenTransition) + } }; - *self = new_state; + tracing::debug!(state=%self, "transition succeeded"); Ok(()) } From 2c5adc8f166c6117ece353376b9071f5e30857b1 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Thu, 18 Jan 2024 18:03:21 +0100 Subject: [PATCH 8/9] reformat code --- src/bayou.rs | 10 +++- src/imap/attributes.rs | 50 +++++++++--------- src/imap/capability.rs | 13 +++-- src/imap/command/authenticated.rs | 2 +- src/imap/command/selected.rs | 87 ++++++++++++++++++------------- src/imap/flow.rs | 27 +++++----- src/imap/mailbox_view.rs | 76 ++++++++++++++++----------- src/imap/mod.rs | 44 +++++++++------- src/imap/response.rs | 4 +- src/imap/search.rs | 29 ++++++++--- src/imap/session.rs | 3 +- src/mail/uidindex.rs | 30 ++++++----- tests/behavior.rs | 87 +++++++++++++++++++++++-------- tests/common/fragments.rs | 13 +++-- 14 files changed, 295 insertions(+), 180 deletions(-) diff --git a/src/bayou.rs b/src/bayou.rs index 1c157e6..9faff5a 100644 --- a/src/bayou.rs +++ b/src/bayou.rs @@ -442,7 +442,12 @@ impl K2vWatch { let propagate_local_update = Notify::new(); let learnt_remote_update = Arc::new(Notify::new()); - let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); + let watch = Arc::new(K2vWatch { + target, + rx, + propagate_local_update, + learnt_remote_update, + }); tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); @@ -462,7 +467,8 @@ impl K2vWatch { while let Some(this) = Weak::upgrade(&self_weak) { tracing::debug!( "bayou k2v watch bg loop iter ({}, {})", - this.target.uid.shard, this.target.uid.sort + this.target.uid.shard, + this.target.uid.sort ); tokio::select!( // Needed to exit: will force a loop iteration every minutes, diff --git a/src/imap/attributes.rs b/src/imap/attributes.rs index d094f1a..89446a8 100644 --- a/src/imap/attributes.rs +++ b/src/imap/attributes.rs @@ -1,5 +1,5 @@ -use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section}; use imap_codec::imap_types::command::FetchModifier; +use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section}; /// Internal decisions based on fetched attributes /// passed by the client @@ -8,7 +8,11 @@ pub struct AttributesProxy { pub attrs: Vec>, } impl AttributesProxy { - pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self { + pub fn new( + attrs: &MacroOrMessageDataItemNames<'static>, + modifiers: &[FetchModifier], + is_uid_fetch: bool, + ) -> Self { // Expand macros let mut fetch_attrs = match attrs { MacroOrMessageDataItemNames::Macro(m) => { @@ -44,32 +48,30 @@ impl AttributesProxy { } pub fn is_enabling_condstore(&self) -> bool { - self.attrs.iter().any(|x| { - matches!(x, MessageDataItemName::ModSeq) - }) + self.attrs + .iter() + .any(|x| matches!(x, MessageDataItemName::ModSeq)) } pub fn need_body(&self) -> bool { - self.attrs.iter().any(|x| { - match x { - MessageDataItemName::Body - | MessageDataItemName::Rfc822 - | MessageDataItemName::Rfc822Text - | MessageDataItemName::BodyStructure => true, + self.attrs.iter().any(|x| match x { + MessageDataItemName::Body + | MessageDataItemName::Rfc822 + | MessageDataItemName::Rfc822Text + | MessageDataItemName::BodyStructure => true, - MessageDataItemName::BodyExt { - section: Some(section), - partial: _, - peek: _, - } => match section { - Section::Header(None) - | Section::HeaderFields(None, _) - | Section::HeaderFieldsNot(None, _) => false, - _ => true, - }, - MessageDataItemName::BodyExt { .. } => true, - _ => false, - } + MessageDataItemName::BodyExt { + section: Some(section), + partial: _, + peek: _, + } => match section { + Section::Header(None) + | Section::HeaderFields(None, _) + | Section::HeaderFieldsNot(None, _) => false, + _ => true, + }, + MessageDataItemName::BodyExt { .. } => true, + _ => false, }) } } diff --git a/src/imap/capability.rs b/src/imap/capability.rs index 6533ccb..f60c01f 100644 --- a/src/imap/capability.rs +++ b/src/imap/capability.rs @@ -1,4 +1,4 @@ -use imap_codec::imap_types::command::{FetchModifier, StoreModifier, SelectExamineModifier}; +use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier}; use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::response::Capability; @@ -72,7 +72,6 @@ impl ClientStatus { } } - pub struct ClientCapability { pub condstore: ClientStatus, pub utf8kind: Option, @@ -100,13 +99,19 @@ impl ClientCapability { } pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) { - if mods.iter().any(|x| matches!(x, FetchModifier::ChangedSince(..))) { + if mods + .iter() + .any(|x| matches!(x, FetchModifier::ChangedSince(..))) + { self.enable_condstore() } } pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) { - if mods.iter().any(|x| matches!(x, StoreModifier::UnchangedSince(..))) { + if mods + .iter() + .any(|x| matches!(x, StoreModifier::UnchangedSince(..))) + { self.enable_condstore() } } diff --git a/src/imap/command/authenticated.rs b/src/imap/command/authenticated.rs index e17699a..3fd132f 100644 --- a/src/imap/command/authenticated.rs +++ b/src/imap/command/authenticated.rs @@ -405,7 +405,7 @@ impl<'a> AuthenticatedContext<'a> { it is therefore correct to not return it even if there are unseen messages RFC9051 (imap4rev2) says that OK [UNSEEN] responses are deprecated after SELECT and EXAMINE For Aerogramme, we just don't send the OK [UNSEEN], it's correct to do in both specifications. - + 20 select "INBOX.achats" * FLAGS (\Answered \Flagged \Deleted \Seen \Draft $Forwarded JUNK $label1) diff --git a/src/imap/command/selected.rs b/src/imap/command/selected.rs index ca2e268..98b3b00 100644 --- a/src/imap/command/selected.rs +++ b/src/imap/command/selected.rs @@ -1,5 +1,5 @@ -use std::sync::Arc; use std::num::NonZeroU64; +use std::sync::Arc; use anyhow::Result; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; @@ -11,12 +11,12 @@ use imap_codec::imap_types::response::{Code, CodeOther}; use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::sequence::SequenceSet; +use crate::imap::attributes::AttributesProxy; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anystate, authenticated, MailboxName}; use crate::imap::flow; use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::response::Response; -use crate::imap::attributes::AttributesProxy; use crate::mail::user::User; pub struct SelectedContext<'a> { @@ -50,7 +50,10 @@ pub async fn dispatch<'a>( macro_or_item_names, modifiers, uid, - } => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await, + } => { + ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid) + .await + } CommandBody::Search { charset, criteria, @@ -64,7 +67,10 @@ pub async fn dispatch<'a>( flags, modifiers, uid, - } => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await, + } => { + ctx.store(sequence_set, kind, response, flags, modifiers, uid) + .await + } CommandBody::Copy { sequence_set, mailbox, @@ -80,12 +86,13 @@ pub async fn dispatch<'a>( CommandBody::Unselect => ctx.unselect().await, // IDLE extension (rfc2177) - CommandBody::Idle => { - Ok(( - Response::build().to_req(ctx.req).message("DUMMY command due to anti-pattern in the code").ok()?, - flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), - )) - } + CommandBody::Idle => Ok(( + Response::build() + .to_req(ctx.req) + .message("DUMMY command due to anti-pattern in the code") + .ok()?, + flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), + )), // In selected mode, we fallback to authenticated when needed _ => { @@ -148,10 +155,14 @@ impl<'a> SelectedContext<'a> { modifiers.iter().for_each(|m| match m { FetchModifier::ChangedSince(val) => { changed_since = Some(*val); - }, + } }); - match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await { + match self + .mailbox + .fetch(sequence_set, &ap, changed_since, uid) + .await + { Ok(resp) => { // Capabilities enabling logic only on successful command // (according to my understanding of the spec) @@ -167,7 +178,7 @@ impl<'a> SelectedContext<'a> { .ok()?, flow::Transition::None, )) - }, + } Err(e) => Ok(( Response::build() .to_req(self.req) @@ -214,7 +225,7 @@ impl<'a> SelectedContext<'a> { async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> { if let Some(failed) = self.fail_read_only() { - return Ok((failed, flow::Transition::None)) + return Ok((failed, flow::Transition::None)); } let tag = self.req.tag.clone(); @@ -240,14 +251,14 @@ impl<'a> SelectedContext<'a> { uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { if let Some(failed) = self.fail_read_only() { - return Ok((failed, flow::Transition::None)) + return Ok((failed, flow::Transition::None)); } let mut unchanged_since: Option = None; modifiers.iter().for_each(|m| match m { StoreModifier::UnchangedSince(val) => { unchanged_since = Some(*val); - }, + } }); let (data, modified) = self @@ -256,25 +267,30 @@ impl<'a> SelectedContext<'a> { .await?; let mut ok_resp = Response::build() - .to_req(self.req) - .message("STORE completed") - .set_body(data); - + .to_req(self.req) + .message("STORE completed") + .set_body(data); match modified[..] { [] => (), [_head, ..] => { - let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::>().join(",")); - ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes()))); - }, + let modified_str = format!( + "MODIFIED {}", + modified + .into_iter() + .map(|x| x.to_string()) + .collect::>() + .join(",") + ); + ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated( + modified_str.into_bytes(), + ))); + } }; - self.client_capabilities.store_modifiers_enable(modifiers); - Ok((ok_resp.ok()?, - flow::Transition::None, - )) + Ok((ok_resp.ok()?, flow::Transition::None)) } async fn copy( @@ -285,7 +301,7 @@ impl<'a> SelectedContext<'a> { ) -> Result<(Response<'static>, flow::Transition)> { //@FIXME Could copy be valid in EXAMINE mode? if let Some(failed) = self.fail_read_only() { - return Ok((failed, flow::Transition::None)) + return Ok((failed, flow::Transition::None)); } let name: &str = MailboxName(mailbox).try_into()?; @@ -341,7 +357,7 @@ impl<'a> SelectedContext<'a> { uid: &bool, ) -> Result<(Response<'static>, flow::Transition)> { if let Some(failed) = self.fail_read_only() { - return Ok((failed, flow::Transition::None)) + return Ok((failed, flow::Transition::None)); } let name: &str = MailboxName(mailbox).try_into()?; @@ -395,12 +411,13 @@ impl<'a> SelectedContext<'a> { fn fail_read_only(&self) -> Option> { match self.perm { flow::MailboxPerm::ReadWrite => None, - flow::MailboxPerm::ReadOnly => { - Some(Response::build() - .to_req(self.req) - .message("Write command are forbidden while exmining mailbox") - .no().unwrap()) - }, + flow::MailboxPerm::ReadOnly => Some( + Response::build() + .to_req(self.req) + .message("Write command are forbidden while exmining mailbox") + .no() + .unwrap(), + ), } } } diff --git a/src/imap/flow.rs b/src/imap/flow.rs index e817e77..6ddd092 100644 --- a/src/imap/flow.rs +++ b/src/imap/flow.rs @@ -3,9 +3,9 @@ use std::fmt; use std::sync::Arc; use tokio::sync::Notify; -use imap_codec::imap_types::core::Tag; use crate::imap::mailbox_view::MailboxView; use crate::mail::user::User; +use imap_codec::imap_types::core::Tag; #[derive(Debug)] pub enum Error { @@ -22,7 +22,13 @@ pub enum State { NotAuthenticated, Authenticated(Arc), Selected(Arc, MailboxView, MailboxPerm), - Idle(Arc, MailboxView, MailboxPerm, Tag<'static>, Arc), + Idle( + Arc, + MailboxView, + MailboxPerm, + Tag<'static>, + Arc, + ), Logout, } impl fmt::Display for State { @@ -77,23 +83,18 @@ impl State { let new_state = match (std::mem::replace(self, State::Logout), tr) { (s, Transition::None) => s, (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u), - ( - State::Authenticated(u) | State::Selected(u, _, _), - Transition::Select(m, p), - ) => State::Selected(u, m, p), - (State::Selected(u, _, _) , Transition::Unselect) => { - State::Authenticated(u.clone()) + (State::Authenticated(u) | State::Selected(u, _, _), Transition::Select(m, p)) => { + State::Selected(u, m, p) } + (State::Selected(u, _, _), Transition::Unselect) => State::Authenticated(u.clone()), (State::Selected(u, m, p), Transition::Idle(t, s)) => { State::Idle(u, m, p, t, Arc::new(s)) - }, - (State::Idle(u, m, p, _, _), Transition::UnIdle) => { - State::Selected(u, m, p) - }, + } + (State::Idle(u, m, p, _, _), Transition::UnIdle) => State::Selected(u, m, p), (_, Transition::Logout) => State::Logout, (s, t) => { tracing::error!(state=%s, transition=%t, "forbidden transition"); - return Err(Error::ForbiddenTransition) + return Err(Error::ForbiddenTransition); } }; *self = new_state; diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index 85a4961..0efa987 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -1,6 +1,6 @@ +use std::collections::HashSet; use std::num::{NonZeroU32, NonZeroU64}; use std::sync::Arc; -use std::collections::HashSet; use anyhow::{anyhow, Error, Result}; @@ -13,11 +13,11 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status}; use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::sequence::SequenceSet; -use crate::mail::unique_ident::UniqueIdent; use crate::mail::mailbox::Mailbox; use crate::mail::query::QueryScope; use crate::mail::snapshot::FrozenMailbox; use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq}; +use crate::mail::unique_ident::UniqueIdent; use crate::imap::attributes::AttributesProxy; use crate::imap::flags; @@ -64,7 +64,7 @@ pub struct MailboxView { impl MailboxView { /// Creates a new IMAP view into a mailbox. pub async fn new(mailbox: Arc, is_cond: bool) -> Self { - Self { + Self { internal: mailbox.frozen().await, is_condstore: is_cond, } @@ -130,11 +130,9 @@ impl MailboxView { let new_mail = new_snapshot.table.get(uuid); if old_mail.is_some() && old_mail != new_mail { if let Some((uid, modseq, flags)) = new_mail { - let mut items = vec![ - MessageDataItem::Flags( - flags.iter().filter_map(|f| flags::from_str(f)).collect(), - ), - ]; + let mut items = vec![MessageDataItem::Flags( + flags.iter().filter_map(|f| flags::from_str(f)).collect(), + )]; if params.with_uid { items.push(MessageDataItem::Uid(*uid)); @@ -169,7 +167,7 @@ impl MailboxView { data.push(self.highestmodseq_status()?); } /*self.unseen_first_status()? - .map(|unseen_status| data.push(unseen_status));*/ + .map(|unseen_status| data.push(unseen_status));*/ Ok(data) } @@ -188,8 +186,8 @@ impl MailboxView { let flags = flags.iter().map(|x| x.to_string()).collect::>(); let idx = self.index()?; - let (editable, in_conflict) = idx - .fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?; + let (editable, in_conflict) = + idx.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?; for mi in editable.iter() { match kind { @@ -215,17 +213,26 @@ impl MailboxView { _ => in_conflict.into_iter().map(|midx| midx.i).collect(), }; - let summary = self.update(UpdateParameters { - with_uid: *is_uid_store, - with_modseq: unchanged_since.is_some(), - silence, - }).await?; + let summary = self + .update(UpdateParameters { + with_uid: *is_uid_store, + with_modseq: unchanged_since.is_some(), + silence, + }) + .await?; Ok((summary, conflict_id_or_uid)) } pub async fn idle_sync(&mut self) -> Result>> { - self.internal.mailbox.notify().await.upgrade().ok_or(anyhow!("test"))?.notified().await; + self.internal + .mailbox + .notify() + .await + .upgrade() + .ok_or(anyhow!("test"))? + .notified() + .await; self.internal.mailbox.opportunistic_sync().await?; self.update(UpdateParameters::default()).await } @@ -300,10 +307,12 @@ impl MailboxView { ret.push((mi.uid, dest_uid)); } - let update = self.update(UpdateParameters { - with_uid: *is_uid_copy, - ..UpdateParameters::default() - }).await?; + let update = self + .update(UpdateParameters { + with_uid: *is_uid_copy, + ..UpdateParameters::default() + }) + .await?; Ok((to_state.uidvalidity, ret, update)) } @@ -327,11 +336,7 @@ impl MailboxView { }; tracing::debug!("Query scope {:?}", query_scope); let idx = self.index()?; - let mail_idx_list = idx.fetch_changed_since( - sequence_set, - changed_since, - *is_uid_fetch - )?; + let mail_idx_list = idx.fetch_changed_since(sequence_set, changed_since, *is_uid_fetch)?; // [2/6] Fetch the emails let uuids = mail_idx_list @@ -420,12 +425,19 @@ impl MailboxView { let maybe_modseq = match is_modseq { true => { let final_selection = kept_idx.iter().chain(kept_query.iter()); - final_selection.map(|in_idx| in_idx.modseq).max().map(|r| NonZeroU64::try_from(r)).transpose()? - }, + final_selection + .map(|in_idx| in_idx.modseq) + .max() + .map(|r| NonZeroU64::try_from(r)) + .transpose()? + } _ => None, }; - Ok((vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], is_modseq)) + Ok(( + vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], + is_modseq, + )) } // ---- @@ -469,8 +481,10 @@ impl MailboxView { pub(crate) fn highestmodseq_status(&self) -> Result> { Ok(Body::Status(Status::ok( - None, - Some(Code::Other(CodeOther::unvalidated(format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes()))), + None, + Some(Code::Other(CodeOther::unvalidated( + format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes(), + ))), "Highest", )?)) } diff --git a/src/imap/mod.rs b/src/imap/mod.rs index c50c3fc..40c4d4f 100644 --- a/src/imap/mod.rs +++ b/src/imap/mod.rs @@ -15,23 +15,23 @@ mod session; use std::net::SocketAddr; -use anyhow::{Result, bail}; +use anyhow::{bail, Result}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; -use tokio::sync::watch; use tokio::sync::mpsc; +use tokio::sync::watch; +use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status}; use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; -use imap_codec::imap_types::response::{Code, Response, CommandContinuationRequest, Status}; -use crate::imap::response::{Body, ResponseOrIdle}; -use crate::imap::session::Instance; -use crate::imap::request::Request; use crate::config::ImapConfig; use crate::imap::capability::ServerCapability; +use crate::imap::request::Request; +use crate::imap::response::{Body, ResponseOrIdle}; +use crate::imap::session::Instance; use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in Bร L @@ -97,10 +97,10 @@ impl Server { } } -use tokio::sync::mpsc::*; -use tokio_util::bytes::BytesMut; -use tokio::sync::Notify; use std::sync::Arc; +use tokio::sync::mpsc::*; +use tokio::sync::Notify; +use tokio_util::bytes::BytesMut; enum LoopMode { Quit, Interactive, @@ -123,10 +123,10 @@ impl NetLoop { Ok(nl) => { tracing::debug!(addr=?addr, "netloop successfully initialized"); nl - }, + } Err(e) => { tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session"); - return + return; } }; @@ -153,10 +153,10 @@ impl NetLoop { Greeting::ok( Some(Code::Capability(ctx.server_capabilities.to_vec())), "Aerogramme", - ) - .unwrap(), ) - .await?; + .unwrap(), + ) + .await?; // Start a mailbox session in background let (cmd_tx, mut cmd_rx) = mpsc::channel::(3); @@ -164,11 +164,20 @@ impl NetLoop { tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx)); // Return the object - Ok(NetLoop { ctx, server, cmd_tx, resp_rx }) + Ok(NetLoop { + ctx, + server, + cmd_tx, + resp_rx, + }) } /// Coms with the background session - async fn session(ctx: ClientContext, mut cmd_rx: Receiver, resp_tx: UnboundedSender) -> () { + async fn session( + ctx: ClientContext, + mut cmd_rx: Receiver, + resp_tx: UnboundedSender, + ) -> () { let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); loop { let cmd = match cmd_rx.recv().await { @@ -200,7 +209,6 @@ impl NetLoop { Ok(()) } - async fn interactive_mode(&mut self) -> Result { tokio::select! { // Managing imap_flow stuff @@ -252,7 +260,7 @@ impl NetLoop { tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, Some(_) => unreachable!(), - + }, // When receiving a CTRL+C diff --git a/src/imap/response.rs b/src/imap/response.rs index afcb29f..40e6927 100644 --- a/src/imap/response.rs +++ b/src/imap/response.rs @@ -1,9 +1,9 @@ -use std::sync::Arc; use anyhow::Result; -use tokio::sync::Notify; use imap_codec::imap_types::command::Command; use imap_codec::imap_types::core::Tag; use imap_codec::imap_types::response::{Code, Data, Status}; +use std::sync::Arc; +use tokio::sync::Notify; #[derive(Debug)] pub enum Body<'a> { diff --git a/src/imap/search.rs b/src/imap/search.rs index 61cbad5..d06c3bd 100644 --- a/src/imap/search.rs +++ b/src/imap/search.rs @@ -2,7 +2,7 @@ use std::num::{NonZeroU32, NonZeroU64}; use anyhow::Result; use imap_codec::imap_types::core::NonEmptyVec; -use imap_codec::imap_types::search::{SearchKey, MetadataItemSearch}; +use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use crate::imap::index::MailIndex; @@ -115,12 +115,15 @@ impl<'a> Criteria<'a> { pub fn is_modseq(&self) -> bool { use SearchKey::*; match self.0 { - And(and_list) => and_list.as_ref().iter().any(|child| Criteria(child).is_modseq()), + And(and_list) => and_list + .as_ref() + .iter() + .any(|child| Criteria(child).is_modseq()), Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(), Not(child) => Criteria(child).is_modseq(), ModSeq { .. } => true, _ => false, - } + } } /// Returns emails that we now for sure we want to keep @@ -187,7 +190,10 @@ impl<'a> Criteria<'a> { // Sequence logic maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(), - ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, midx).into(), + ModSeq { + metadata_item, + modseq, + } => is_keep_modseq(metadata_item, modseq, midx).into(), // All the stuff we can't evaluate yet Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_) @@ -225,7 +231,10 @@ impl<'a> Criteria<'a> { //@FIXME Reevaluating our previous logic... maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx), - ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(), + ModSeq { + metadata_item, + modseq, + } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(), // Filter on mail meta Before(search_naive) => match mail_view.stored_naive_date() { @@ -331,7 +340,7 @@ fn approx_sequence_set_size(seq_set: &SequenceSet) -> u64 { } // This is wrong as sequence UID can have holes, -// as we don't know the number of messages in the mailbox also +// as we don't know the number of messages in the mailbox also // we gave to guess fn approx_sequence_size(seq: &Sequence) -> u64 { match seq { @@ -473,9 +482,13 @@ fn is_keep_seq(sk: &SearchKey, midx: &MailIndex) -> bool { } } -fn is_keep_modseq(filter: &Option, modseq: &NonZeroU64, midx: &MailIndex) -> bool { +fn is_keep_modseq( + filter: &Option, + modseq: &NonZeroU64, + midx: &MailIndex, +) -> bool { if filter.is_some() { tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet"); } - modseq <= &midx.modseq + modseq <= &midx.modseq } diff --git a/src/imap/session.rs b/src/imap/session.rs index f4e3d0f..12bbfee 100644 --- a/src/imap/session.rs +++ b/src/imap/session.rs @@ -1,10 +1,10 @@ -use anyhow::{Result, anyhow, bail}; use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::command::{anonymous, authenticated, selected}; use crate::imap::flow; use crate::imap::request::Request; use crate::imap::response::{Response, ResponseOrIdle}; use crate::login::ArcLoginProvider; +use anyhow::{anyhow, bail, Result}; use imap_codec::imap_types::command::Command; //----- @@ -63,7 +63,6 @@ impl Instance { } } - pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle { // Command behavior is modulated by the state. // To prevent state error, we handle the same command in separate code paths. diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 248aab1..5a06670 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -140,8 +140,7 @@ impl BayouState for UidIndex { let bump_uid = new.internalseq.get() - uid.get(); let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq) - .unwrap(); + NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap(); } // Assign the real uid of the email @@ -179,10 +178,10 @@ impl BayouState for UidIndex { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { // Bump UIDValidity if required if *candidate_modseq < new.internalmodseq { - let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_modseq) - .unwrap(); + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); } // Add flags to the source of trust and the cache @@ -205,10 +204,10 @@ impl BayouState for UidIndex { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { // Bump UIDValidity if required if *candidate_modseq < new.internalmodseq { - let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_modseq) - .unwrap(); + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); } // Remove flags from the source of trust and the cache @@ -228,10 +227,10 @@ impl BayouState for UidIndex { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { // Bump UIDValidity if required if *candidate_modseq < new.internalmodseq { - let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; + let bump_modseq = + (new.internalmodseq.get() - candidate_modseq.get()) as u32; new.uidvalidity = - NonZeroU32::new(new.uidvalidity.get() + bump_modseq) - .unwrap(); + NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap(); } // Remove flags from the source of trust and the cache @@ -248,7 +247,7 @@ impl BayouState for UidIndex { existing_flags.append(&mut to_add); new.idx_by_flag.remove(*uid, &rm_flags); new.idx_by_flag.insert(*uid, &to_add); - + // Register that email has been modified new.idx_by_modseq.insert(new.internalmodseq, *ident); *email_modseq = new.internalmodseq; @@ -448,7 +447,12 @@ mod tests { { let m = UniqueIdent([0x03; 24]); let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; - let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU64::new(1).unwrap(), f); + let ev = UidIndexOp::MailAdd( + m, + NonZeroU32::new(1).unwrap(), + NonZeroU64::new(1).unwrap(), + f, + ); state = state.apply(&ev); } diff --git a/tests/behavior.rs b/tests/behavior.rs index 205f5e1..699f59d 100644 --- a/tests/behavior.rs +++ b/tests/behavior.rs @@ -1,8 +1,8 @@ use anyhow::Context; mod common; -use crate::common::fragments::*; use crate::common::constants::*; +use crate::common::fragments::*; fn main() { rfc3501_imap4rev1_base(); @@ -23,27 +23,40 @@ fn rfc3501_imap4rev1_base() { create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; // UNSUBSCRIBE IS NOT IMPLEMENTED YET //unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?; - let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; + let select_res = + select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; assert!(select_res.contains("* 0 EXISTS")); check(imap_socket).context("check must run")?; - status(imap_socket, Mailbox::Archive, StatusKind::UidNext).context("status of archive from inbox")?; + status(imap_socket, Mailbox::Archive, StatusKind::UidNext) + .context("status of archive from inbox")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?; - let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None) - .context("fetch rfc822 message, should be our first message")?; + let srv_msg = fetch( + imap_socket, + Selection::FirstId, + FetchKind::Rfc822, + FetchMod::None, + ) + .context("fetch rfc822 message, should be our first message")?; let orig_email = std::str::from_utf8(EMAIL1)?; assert!(srv_msg.contains(orig_email)); - + copy(imap_socket, Selection::FirstId, Mailbox::Archive) .context("copy message to the archive mailbox")?; append_email(imap_socket, Email::Basic).context("insert email in INBOX")?; noop_exists(imap_socket, 2).context("noop loop must detect a new email")?; search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something"); - store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None) - .context("should add delete flag to the email")?; + store( + imap_socket, + Selection::FirstId, + Flag::Deleted, + StoreAction::AddFlags, + StoreMod::None, + ) + .context("should add delete flag to the email")?; expunge(imap_socket).context("expunge emails")?; rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts) .context("Archive mailbox is renamed Drafts")?; @@ -63,19 +76,32 @@ fn rfc3691_imapext_unselect() { capability(imap_socket, Extension::Unselect).context("check server capabilities")?; login(imap_socket, Account::Alice).context("login test")?; - let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; + let select_res = + select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; assert!(select_res.contains("* 0 EXISTS")); noop_exists(imap_socket, 1).context("noop loop must detect a new email")?; - store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None) - .context("add delete flags to the email")?; + store( + imap_socket, + Selection::FirstId, + Flag::Deleted, + StoreAction::AddFlags, + StoreMod::None, + ) + .context("add delete flags to the email")?; unselect(imap_socket) .context("unselect inbox while preserving email with the \\Delete flag")?; - let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?; + let select_res = + select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?; assert!(select_res.contains("* 1 EXISTS")); - let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None) - .context("message is still present")?; + let srv_msg = fetch( + imap_socket, + Selection::FirstId, + FetchKind::Rfc822, + FetchMod::None, + ) + .context("message is still present")?; let orig_email = std::str::from_utf8(EMAIL2)?; assert!(srv_msg.contains(orig_email)); @@ -111,7 +137,8 @@ fn rfc6851_imapext_move() { capability(imap_socket, Extension::Move).context("check server capabilities")?; login(imap_socket, Account::Alice).context("login test")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; - let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; + let select_res = + select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; assert!(select_res.contains("* 0 EXISTS")); lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; @@ -123,15 +150,17 @@ fn rfc6851_imapext_move() { unselect(imap_socket) .context("unselect inbox while preserving email with the \\Delete flag")?; - let select_res = select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?; + let select_res = + select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?; assert!(select_res.contains("* 1 EXISTS")); let srv_msg = fetch( - imap_socket, - Selection::FirstId, - FetchKind::Rfc822, + imap_socket, + Selection::FirstId, + FetchKind::Rfc822, FetchMod::None, - ).context("check mail exists")?; + ) + .context("check mail exists")?; let orig_email = std::str::from_utf8(EMAIL2)?; assert!(srv_msg.contains(orig_email)); @@ -166,7 +195,8 @@ fn rfc4551_imapext_condstore() { login(imap_socket, Account::Alice).context("login test")?; // RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE - let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?; + let select_res = + select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?; // RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE assert!(select_res.contains("[HIGHESTMODSEQ 1]")); @@ -175,14 +205,25 @@ fn rfc4551_imapext_condstore() { lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; noop_exists(imap_socket, 2).context("noop loop must detect a new email")?; - let store_res = store(imap_socket, Selection::All, Flag::Important, StoreAction::AddFlags, StoreMod::UnchangedSince(1))?; + let store_res = store( + imap_socket, + Selection::All, + Flag::Important, + StoreAction::AddFlags, + StoreMod::UnchangedSince(1), + )?; assert!(store_res.contains("[MODIFIED 2]")); assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))")); assert!(!store_res.contains("* 2 FETCH")); assert_eq!(store_res.lines().count(), 2); // RFC 3.1.4. FETCH and UID FETCH Commands - let fetch_res = fetch(imap_socket, Selection::All, FetchKind::Rfc822Size, FetchMod::ChangedSince(2))?; + let fetch_res = fetch( + imap_socket, + Selection::All, + FetchKind::Rfc822Size, + FetchMod::ChangedSince(2), + )?; assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))")); assert!(!fetch_res.contains("* 2 FETCH")); assert_eq!(store_res.lines().count(), 2); diff --git a/tests/common/fragments.rs b/tests/common/fragments.rs index f9ad87e..29d5d10 100644 --- a/tests/common/fragments.rs +++ b/tests/common/fragments.rs @@ -286,7 +286,12 @@ pub fn noop_exists(imap: &mut TcpStream, must_exists: u32) -> Result<()> { } } -pub fn fetch(imap: &mut TcpStream, selection: Selection, kind: FetchKind, modifier: FetchMod) -> Result { +pub fn fetch( + imap: &mut TcpStream, + selection: Selection, + kind: FetchKind, + modifier: FetchMod, +) -> Result { let mut buffer: [u8; 65535] = [0; 65535]; let sel_str = match selection { @@ -363,11 +368,11 @@ pub fn search(imap: &mut TcpStream, sk: SearchKind) -> Result { } pub fn store( - imap: &mut TcpStream, - sel: Selection, + imap: &mut TcpStream, + sel: Selection, flag: Flag, action: StoreAction, - modifier: StoreMod + modifier: StoreMod, ) -> Result { let mut buffer: [u8; 6000] = [0; 6000]; From 23aa313e11f344da07143d60ce446b5f23d5f362 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 19 Jan 2024 14:13:43 +0100 Subject: [PATCH 9/9] Testing idle --- src/imap/capability.rs | 1 + tests/behavior.rs | 25 +++++++++++++++++++++++-- tests/common/fragments.rs | 18 ++++++++++++++++++ 3 files changed, 42 insertions(+), 2 deletions(-) diff --git a/src/imap/capability.rs b/src/imap/capability.rs index f60c01f..7959832 100644 --- a/src/imap/capability.rs +++ b/src/imap/capability.rs @@ -30,6 +30,7 @@ impl Default for ServerCapability { Capability::Enable, Capability::Move, Capability::LiteralPlus, + Capability::Idle, capability_unselect(), capability_condstore(), //capability_qresync(), diff --git a/tests/behavior.rs b/tests/behavior.rs index 699f59d..2e3c610 100644 --- a/tests/behavior.rs +++ b/tests/behavior.rs @@ -11,6 +11,7 @@ fn main() { rfc6851_imapext_move(); rfc7888_imapext_literal(); rfc4551_imapext_condstore(); + rfc2177_imapext_idle(); println!("โœ… SUCCESS ๐ŸŒŸ๐Ÿš€๐Ÿฅณ๐Ÿ™๐Ÿฅน"); } @@ -21,8 +22,6 @@ fn rfc3501_imap4rev1_base() { capability(imap_socket, Extension::None).context("check server capabilities")?; login(imap_socket, Account::Alice).context("login test")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; - // UNSUBSCRIBE IS NOT IMPLEMENTED YET - //unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?; let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; assert!(select_res.contains("* 0 EXISTS")); @@ -241,3 +240,25 @@ fn rfc4551_imapext_condstore() { }) .expect("test fully run"); } + + +fn rfc2177_imapext_idle() { + println!("๐Ÿงช rfc2177_imapext_idle"); + common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| { + // Test setup + connect(imap_socket).context("server says hello")?; + capability(imap_socket, Extension::Idle).context("check server capabilities")?; + login(imap_socket, Account::Alice).context("login test")?; + select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; + + // Check that new messages from LMTP are correctly detected during idling + start_idle(imap_socket).context("can't start idling")?; + lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; + lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?; + let srv_msg = stop_idle(imap_socket).context("stop idling")?; + assert!(srv_msg.contains("* 1 EXISTS")); + + Ok(()) + }) + .expect("test fully run"); +} diff --git a/tests/common/fragments.rs b/tests/common/fragments.rs index 29d5d10..7f7967a 100644 --- a/tests/common/fragments.rs +++ b/tests/common/fragments.rs @@ -36,6 +36,7 @@ pub enum Extension { Move, Condstore, LiteralPlus, + Idle, } pub enum Enable { @@ -114,6 +115,7 @@ pub fn capability(imap: &mut TcpStream, ext: Extension) -> Result<()> { Extension::Move => Some("MOVE"), Extension::Condstore => Some("CONDSTORE"), Extension::LiteralPlus => Some("LITERAL+"), + Extension::Idle => Some("IDLE"), }; let mut buffer: [u8; 6000] = [0; 6000]; @@ -496,6 +498,22 @@ pub fn enable(imap: &mut TcpStream, ask: Enable, done: Option) -> Result Ok(()) } +pub fn start_idle(imap: &mut TcpStream) -> Result<()> { + let mut buffer: [u8; 1500] = [0; 1500]; + imap.write(&b"98 IDLE\r\n"[..])?; + let read = read_lines(imap, &mut buffer, None)?; + assert_eq!(read[0], b'+'); + Ok(()) +} + +pub fn stop_idle(imap: &mut TcpStream) -> Result { + let mut buffer: [u8; 16536] = [0; 16536]; + imap.write(&b"DONE\r\n"[..])?; + let read = read_lines(imap, &mut buffer, Some(&b"98 OK"[..]))?; + let srv_msg = std::str::from_utf8(read)?; + Ok(srv_msg.to_string()) +} + pub fn logout(imap: &mut TcpStream) -> Result<()> { imap.write(&b"99 logout\r\n"[..])?; let mut buffer: [u8; 1500] = [0; 1500];