Implement IDLE #72

Merged
quentin merged 9 commits from feat/idle into main 2024-01-19 14:04:04 +00:00
6 changed files with 59 additions and 20 deletions
Showing only changes of commit 1a0247e935 - Show all commits

View file

@ -81,7 +81,10 @@ pub async fn dispatch<'a>(
// IDLE extension (rfc2177) // IDLE extension (rfc2177)
CommandBody::Idle => { CommandBody::Idle => {
unimplemented!() Ok((
Response::build().to_req(ctx.req).message("DUMMY response due to anti-pattern").ok()?,
flow::Transition::Idle(tokio::sync::Notify::new()),
))
} }
// In selected mode, we fallback to authenticated when needed // In selected mode, we fallback to authenticated when needed

View file

@ -1,6 +1,7 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Notify;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User; use crate::mail::user::User;
@ -20,7 +21,7 @@ pub enum State {
NotAuthenticated, NotAuthenticated,
Authenticated(Arc<User>), Authenticated(Arc<User>),
Selected(Arc<User>, MailboxView, MailboxPerm), Selected(Arc<User>, MailboxView, MailboxPerm),
Idle(Arc<User>, MailboxView, MailboxPerm), Idle(Arc<User>, MailboxView, MailboxPerm, Notify),
Logout, Logout,
} }
@ -34,7 +35,7 @@ pub enum Transition {
None, None,
Authenticate(Arc<User>), Authenticate(Arc<User>),
Select(MailboxView, MailboxPerm), Select(MailboxView, MailboxPerm),
Idle, Idle(Notify),
UnIdle, UnIdle,
Unselect, Unselect,
Logout, Logout,
@ -54,10 +55,10 @@ impl State {
(State::Selected(u, _, _) , Transition::Unselect) => { (State::Selected(u, _, _) , Transition::Unselect) => {
State::Authenticated(u.clone()) State::Authenticated(u.clone())
} }
(State::Selected(u, m, p), Transition::Idle) => { (State::Selected(u, m, p), Transition::Idle(s)) => {
State::Idle(u, m, p) State::Idle(u, m, p, s)
}, },
(State::Idle(u, m, p), Transition::UnIdle) => { (State::Idle(u, m, p, _), Transition::UnIdle) => {
State::Selected(u, m, p) State::Selected(u, m, p)
}, },
(_, Transition::Logout) => State::Logout, (_, Transition::Logout) => State::Logout,

View file

@ -101,9 +101,10 @@ use tokio::sync::mpsc::*;
enum LoopMode { enum LoopMode {
Quit, Quit,
Interactive, Interactive,
IdleUntil(tokio::sync::Notify), Idle,
} }
// @FIXME a full refactor of this part of the code will be needed sooner or later
struct NetLoop { struct NetLoop {
ctx: ClientContext, ctx: ClientContext,
server: ServerFlow, server: ServerFlow,
@ -189,7 +190,7 @@ impl NetLoop {
loop { loop {
mode = match mode { mode = match mode {
LoopMode::Interactive => self.interactive_mode().await?, LoopMode::Interactive => self.interactive_mode().await?,
LoopMode::IdleUntil(notif) => self.idle_mode(notif).await?, LoopMode::Idle => self.idle_mode().await?,
LoopMode::Quit => break, LoopMode::Quit => break,
} }
} }
@ -237,15 +238,18 @@ impl NetLoop {
} }
self.server.enqueue_status(response.completion); self.server.enqueue_status(response.completion);
}, },
Some(ResponseOrIdle::Idle) => { Some(ResponseOrIdle::StartIdle) => {
let cr = CommandContinuationRequest::basic(None, "idling")?; let cr = CommandContinuationRequest::basic(None, "Idling")?;
self.server.enqueue_continuation(cr); self.server.enqueue_continuation(cr);
return Ok(LoopMode::IdleUntil(tokio::sync::Notify::new())) self.cmd_tx.try_send(Request::Idle)?;
return Ok(LoopMode::Idle)
}, },
None => { None => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
}, },
Some(_) => unreachable!(),
}, },
// When receiving a CTRL+C // When receiving a CTRL+C
@ -256,7 +260,37 @@ impl NetLoop {
Ok(LoopMode::Interactive) Ok(LoopMode::Interactive)
} }
async fn idle_mode(&mut self, notif: tokio::sync::Notify) -> Result<LoopMode> { async fn idle_mode(&mut self) -> Result<LoopMode> {
Ok(LoopMode::IdleUntil(notif)) tokio::select! {
maybe_msg = self.resp_rx.recv() => match maybe_msg {
Some(ResponseOrIdle::Response(response)) => {
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
self.server.enqueue_status(response.completion);
return Ok(LoopMode::Interactive)
},
Some(ResponseOrIdle::IdleEvent(elems)) => {
for body_elem in elems.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
return Ok(LoopMode::Idle)
},
None => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
return Ok(LoopMode::Interactive)
},
Some(ResponseOrIdle::StartIdle) => unreachable!(),
}
};
/*self.cmd_tx.try_send(Request::Idle).unwrap();
Ok(LoopMode::Idle)*/
} }
} }

View file

@ -4,5 +4,5 @@ use tokio::sync::Notify;
#[derive(Debug)] #[derive(Debug)]
pub enum Request { pub enum Request {
ImapCommand(Command<'static>), ImapCommand(Command<'static>),
IdleUntil(Notify), Idle,
} }

View file

@ -116,5 +116,6 @@ impl<'a> Response<'a> {
#[derive(Debug)] #[derive(Debug)]
pub enum ResponseOrIdle { pub enum ResponseOrIdle {
Response(Response<'static>), Response(Response<'static>),
Idle, StartIdle,
IdleEvent(Vec<Body<'static>>),
} }

View file

@ -27,14 +27,14 @@ impl Instance {
pub async fn request(&mut self, req: Request) -> ResponseOrIdle { pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
match req { match req {
Request::IdleUntil(stop) => ResponseOrIdle::Response(self.idle(stop).await), Request::Idle => ResponseOrIdle::Response(self.idle().await),
Request::ImapCommand(cmd) => self.command(cmd).await, Request::ImapCommand(cmd) => self.command(cmd).await,
} }
} }
pub async fn idle(&mut self, stop: tokio::sync::Notify) -> Response<'static> { pub async fn idle(&mut self) -> Response<'static> {
let (user, mbx) = match &mut self.state { let (user, mbx, perm, stop) = match &mut self.state {
flow::State::Idle(ref user, ref mut mailbox, ref perm) => (user, mailbox), flow::State::Idle(ref user, ref mut mailbox, ref perm, ref stop) => (user, mailbox, perm, stop),
_ => unreachable!(), _ => unreachable!(),
}; };
@ -109,7 +109,7 @@ impl Instance {
} }
match self.state { match self.state {
flow::State::Idle(..) => ResponseOrIdle::Idle, flow::State::Idle(..) => ResponseOrIdle::StartIdle,
_ => ResponseOrIdle::Response(resp), _ => ResponseOrIdle::Response(resp),
} }
} }