diff --git a/src/command.rs b/src/command.rs index 4c2ec41..a9b39e4 100644 --- a/src/command.rs +++ b/src/command.rs @@ -1,5 +1,3 @@ -use std::sync::{Arc, Mutex}; - use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; use imap_codec::types::core::{Tag, AString}; @@ -10,15 +8,15 @@ use imap_codec::types::fetch_attributes::MacroOrFetchAttributes; use crate::mailstore::Mailstore; use crate::mailbox::Mailbox; -use crate::service::Session; +use crate::session; pub struct Command<'a> { tag: Tag, - session: &'a mut Session, + session: &'a mut session::Instance, } impl<'a> Command<'a> { - pub fn new(tag: Tag, session: &'a mut Session) -> Self { + pub fn new(tag: Tag, session: &'a mut session::Instance) -> Self { Self { tag, session } } diff --git a/src/main.rs b/src/main.rs index 8f7d1e5..1391e7a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -7,6 +7,7 @@ mod mailbox; mod mailstore; mod server; mod service; +mod session; mod time; mod uidindex; diff --git a/src/service.rs b/src/service.rs index 051e065..35c753b 100644 --- a/src/service.rs +++ b/src/service.rs @@ -1,4 +1,4 @@ -use std::sync::{Arc, Mutex}; +use std::sync::Arc; use std::task::{Context, Poll}; use anyhow::Result; @@ -7,17 +7,10 @@ use boitalettres::errors::Error as BalError; use boitalettres::proto::{Request, Response}; use futures::future::BoxFuture; use futures::future::FutureExt; -use imap_codec::types::command::CommandBody; -use tokio::sync::mpsc; -use tokio::sync::mpsc::error::TrySendError; use tower::Service; -use crate::command; -use crate::login::Credentials; use crate::mailstore::Mailstore; -use crate::mailbox::Mailbox; - -const MAX_PIPELINED_COMMANDS: usize = 10; +use crate::session; pub struct Instance { pub mailstore: Arc, @@ -39,21 +32,16 @@ impl<'a> Service<&'a AddrStream> for Instance { fn call(&mut self, addr: &'a AddrStream) -> Self::Future { tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept"); let ms = self.mailstore.clone(); - Box::pin(async { Ok(Connection::new(ms)) }) + async { Ok(Connection::new(ms)) }.boxed() } } pub struct Connection { - pub tx: mpsc::Sender, + session: session::Manager, } impl Connection { pub fn new(mailstore: Arc) -> Self { - let (tx, mut rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); - tokio::spawn(async move { - let mut session = Session::new(mailstore, rx); - session.run().await; - }); - Self { tx } + Self { session: session::Manager::new(mailstore) } } } impl Service for Connection { @@ -67,41 +55,9 @@ impl Service for Connection { fn call(&mut self, req: Request) -> Self::Future { tracing::debug!("Got request: {:#?}", req); - match self.tx.try_send(req) { - Ok(()) => return async { Response::ok("Ok") }.boxed(), - Err(TrySendError::Full(_)) => return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed(), - Err(TrySendError::Closed(_)) => return async { Response::bad("The session task has exited") }.boxed(), - } - - // send a future that await here later a oneshot command + self.session.process(req) } } -pub struct Session { - pub mailstore: Arc, - pub creds: Option, - pub selected: Option, - rx: mpsc::Receiver, -} -impl Session { - pub fn new(mailstore: Arc, rx: mpsc::Receiver) -> Self { - Self { mailstore, rx, creds: None, selected: None, } - } - - pub async fn run(&mut self) { - while let Some(req) = self.rx.recv().await { - let mut cmd = command::Command::new(req.tag, self); - let _ = match req.body { - CommandBody::Capability => cmd.capability().await, - CommandBody::Login { username, password } => cmd.login(username, password).await, - CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await, - CommandBody::List { reference, mailbox_wildcard } => cmd.list(reference, mailbox_wildcard).await, - CommandBody::Select { mailbox } => cmd.select(mailbox).await, - CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await, - _ => Response::bad("Error in IMAP command received by server."), - }; - } - } -} diff --git a/src/session.rs b/src/session.rs new file mode 100644 index 0000000..7553caf --- /dev/null +++ b/src/session.rs @@ -0,0 +1,102 @@ +use std::sync::Arc; + +use boitalettres::proto::{Request, Response}; +use boitalettres::errors::Error as BalError; +use imap_codec::types::command::CommandBody; +use tokio::sync::{oneshot,mpsc}; +use tokio::sync::mpsc::error::TrySendError; +use futures::future::BoxFuture; +use futures::future::FutureExt; + +use crate::command; +use crate::login::Credentials; +use crate::mailstore::Mailstore; +use crate::mailbox::Mailbox; + +/* This constant configures backpressure in the system, + * or more specifically, how many pipelined messages are allowed + * before refusing them + */ +const MAX_PIPELINED_COMMANDS: usize = 10; + +struct Message { + req: Request, + tx: oneshot::Sender, +} + +pub struct Manager { + tx: mpsc::Sender, +} + +//@FIXME we should garbage collect the Instance when the Manager is destroyed. +impl Manager { + pub fn new(mailstore: Arc) -> Self { + let (tx, mut rx) = mpsc::channel(MAX_PIPELINED_COMMANDS); + tokio::spawn(async move { + let mut instance = Instance::new(mailstore, rx); + instance.start().await; + }); + Self { tx } + } + + pub fn process(&self, req: Request) -> BoxFuture<'static, Result> { + let (tx, rx) = oneshot::channel(); + let msg = Message { req, tx }; + + // We use try_send on a bounded channel to protect the daemons from DoS. + // Pipelining requests in IMAP are a special case: they should not occure often + // and in a limited number (like 3 requests). Someone filling the channel + // will probably be malicious so we "rate limit" them. + match self.tx.try_send(msg) { + Ok(()) => (), + Err(TrySendError::Full(_)) => return async { Response::bad("Too fast! Send less pipelined requests!") }.boxed(), + Err(TrySendError::Closed(_)) => return async { Response::bad("The session task has exited") }.boxed(), + }; + + // @FIXME add a timeout, handle a session that fails. + async { + rx.await.or_else(|e| { + tracing::warn!("Got error {:#?}", e); + Response::bad("No response from the session handler") + }) + }.boxed() + } +} + +pub struct Instance { + rx: mpsc::Receiver, + + pub mailstore: Arc, + pub creds: Option, + pub selected: Option, +} +impl Instance { + fn new(mailstore: Arc, rx: mpsc::Receiver) -> Self { + Self { mailstore, rx, creds: None, selected: None, } + } + + //@FIXME add a function that compute the runner's name from its local info + // to ease debug + // fn name(&self) -> String { } + + async fn start(&mut self) { + //@FIXME add more info about the runner + tracing::debug!("starting runner"); + + while let Some(msg) = self.rx.recv().await { + let mut cmd = command::Command::new(msg.req.tag, self); + let _ = match msg.req.body { + CommandBody::Capability => cmd.capability().await, + CommandBody::Login { username, password } => cmd.login(username, password).await, + CommandBody::Lsub { reference, mailbox_wildcard } => cmd.lsub(reference, mailbox_wildcard).await, + CommandBody::List { reference, mailbox_wildcard } => cmd.list(reference, mailbox_wildcard).await, + CommandBody::Select { mailbox } => cmd.select(mailbox).await, + CommandBody::Fetch { sequence_set, attributes, uid } => cmd.fetch(sequence_set, attributes, uid).await, + _ => Response::bad("Error in IMAP command received by server."), + }; + } + + //@FIXME add more info about the runner + tracing::debug!("exiting runner"); + } +}