mod attributes; mod capability; mod command; mod flags; mod flow; mod imf_view; mod index; mod mail_view; mod mailbox_view; mod mime_view; mod request; mod response; mod search; mod session; use std::net::SocketAddr; use anyhow::{bail, Result}; use futures::stream::{FuturesUnordered, StreamExt}; use tokio::net::TcpListener; use tokio::sync::mpsc; use tokio::sync::watch; use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status}; use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::stream::AnyStream; use crate::config::ImapConfig; use crate::imap::capability::ServerCapability; use crate::imap::request::Request; use crate::imap::response::{Body, ResponseOrIdle}; use crate::imap::session::Instance; use crate::login::ArcLoginProvider; /// Server is a thin wrapper to register our Services in BàL pub struct Server { bind_addr: SocketAddr, login_provider: ArcLoginProvider, capabilities: ServerCapability, } #[derive(Clone)] struct ClientContext { addr: SocketAddr, login_provider: ArcLoginProvider, must_exit: watch::Receiver, server_capabilities: ServerCapability, } pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server { Server { bind_addr: config.bind_addr, login_provider: login, capabilities: ServerCapability::default(), } } impl Server { pub async fn run(self: Self, mut must_exit: watch::Receiver) -> Result<()> { let tcp = TcpListener::bind(self.bind_addr).await?; tracing::info!("IMAP server listening on {:#}", self.bind_addr); let mut connections = FuturesUnordered::new(); while !*must_exit.borrow() { let wait_conn_finished = async { if connections.is_empty() { futures::future::pending().await } else { connections.next().await } }; let (socket, remote_addr) = tokio::select! { a = tcp.accept() => a?, _ = wait_conn_finished => continue, _ = must_exit.changed() => continue, }; tracing::info!("IMAP: accepted connection from {}", remote_addr); let client = ClientContext { addr: remote_addr.clone(), login_provider: self.login_provider.clone(), must_exit: must_exit.clone(), server_capabilities: self.capabilities.clone(), }; let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket))); connections.push(conn); } drop(tcp); tracing::info!("IMAP server shutting down, draining remaining connections..."); while connections.next().await.is_some() {} Ok(()) } } use std::sync::Arc; use tokio::sync::mpsc::*; use tokio::sync::Notify; use tokio_util::bytes::BytesMut; enum LoopMode { Quit, Interactive, Idle(BytesMut, Arc), } // @FIXME a full refactor of this part of the code will be needed sooner or later struct NetLoop { ctx: ClientContext, server: ServerFlow, cmd_tx: Sender, resp_rx: UnboundedReceiver, } impl NetLoop { async fn handler(ctx: ClientContext, sock: AnyStream) { let addr = ctx.addr.clone(); let nl = match Self::new(ctx, sock).await { Ok(nl) => { tracing::debug!(addr=?addr, "netloop successfully initialized"); nl } Err(e) => { tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session"); return; } }; match nl.core().await { Ok(()) => { tracing::debug!("closing successful netloop core for {:?}", addr); } Err(e) => { tracing::error!("closing errored netloop core for {:?}: {}", addr, e); } } } async fn new(ctx: ClientContext, sock: AnyStream) -> Result { // Send greeting let (server, _) = ServerFlow::send_greeting( sock, ServerFlowOptions { crlf_relaxed: false, literal_accept_text: Text::unvalidated("OK"), literal_reject_text: Text::unvalidated("Literal rejected"), ..ServerFlowOptions::default() }, Greeting::ok( Some(Code::Capability(ctx.server_capabilities.to_vec())), "Aerogramme", ) .unwrap(), ) .await?; // Start a mailbox session in background let (cmd_tx, cmd_rx) = mpsc::channel::(3); let (resp_tx, resp_rx) = mpsc::unbounded_channel::(); tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx)); // Return the object Ok(NetLoop { ctx, server, cmd_tx, resp_rx, }) } /// Coms with the background session async fn session( ctx: ClientContext, mut cmd_rx: Receiver, resp_tx: UnboundedSender, ) -> () { let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); loop { let cmd = match cmd_rx.recv().await { None => break, Some(cmd_recv) => cmd_recv, }; tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command"); let maybe_response = session.request(cmd).await; tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response"); match resp_tx.send(maybe_response) { Err(_) => break, Ok(_) => (), }; } tracing::info!("runner is quitting"); } async fn core(mut self) -> Result<()> { let mut mode = LoopMode::Interactive; loop { mode = match mode { LoopMode::Interactive => self.interactive_mode().await?, LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?, LoopMode::Quit => break, } } Ok(()) } async fn interactive_mode(&mut self) -> Result { tokio::select! { // Managing imap_flow stuff srv_evt = self.server.progress() => match srv_evt? { ServerFlowEvent::ResponseSent { handle: _handle, response } => { match response { Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit), _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response), } }, ServerFlowEvent::CommandReceived { command } => { match self.cmd_tx.try_send(Request::ImapCommand(command)) { Ok(_) => (), Err(mpsc::error::TrySendError::Full(_)) => { self.server.enqueue_status(Status::bye(None, "Too fast").unwrap()); tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr); } _ => { self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); } } }, flow => { self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow); } }, // Managing response generated by Aerogramme maybe_msg = self.resp_rx.recv() => match maybe_msg { Some(ResponseOrIdle::Response(response)) => { for body_elem in response.body.into_iter() { let _handle = match body_elem { Body::Data(d) => self.server.enqueue_data(d), Body::Status(s) => self.server.enqueue_status(s), }; } self.server.enqueue_status(response.completion); }, Some(ResponseOrIdle::StartIdle(stop)) => { let cr = CommandContinuationRequest::basic(None, "Idling")?; self.server.enqueue_continuation(cr); self.cmd_tx.try_send(Request::Idle)?; return Ok(LoopMode::Idle(BytesMut::new(), stop)) }, None => { self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); }, Some(_) => unreachable!(), }, // When receiving a CTRL+C _ = self.ctx.must_exit.changed() => { self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); }, }; Ok(LoopMode::Interactive) } async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc) -> Result { // Flush send loop { match self.server.progress_send().await? { Some(..) => continue, None => break, } } tokio::select! { // Receiving IDLE event from background maybe_msg = self.resp_rx.recv() => match maybe_msg { // Session decided idle is terminated Some(ResponseOrIdle::Response(response)) => { for body_elem in response.body.into_iter() { let _handle = match body_elem { Body::Data(d) => self.server.enqueue_data(d), Body::Status(s) => self.server.enqueue_status(s), }; } self.server.enqueue_status(response.completion); return Ok(LoopMode::Interactive) }, // Session has some information for user Some(ResponseOrIdle::IdleEvent(elems)) => { for body_elem in elems.into_iter() { let _handle = match body_elem { Body::Data(d) => self.server.enqueue_data(d), Body::Status(s) => self.server.enqueue_status(s), }; } self.cmd_tx.try_send(Request::Idle)?; return Ok(LoopMode::Idle(buff, stop)) }, // Session crashed None => { self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr); return Ok(LoopMode::Interactive) }, // Session can't start idling while already idling, it's a logic error! Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"), }, // User is trying to interact with us _read_client_bytes = self.server.stream.read(&mut buff) => { use imap_codec::decode::Decoder; let codec = imap_codec::IdleDoneCodec::new(); match codec.decode(&buff) { Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => { // Session will be informed that it must stop idle // It will generate the "done" message and change the loop mode stop.notify_one() }, Err(_) => (), _ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."), }; return Ok(LoopMode::Idle(buff, stop)) }, // When receiving a CTRL+C _ = self.ctx.must_exit.changed() => { self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); return Ok(LoopMode::Interactive) }, }; } }