2024-01-05 09:05:30 +00:00
|
|
|
mod attributes;
|
2024-01-03 14:00:05 +00:00
|
|
|
mod capability;
|
2022-06-17 16:39:36 +00:00
|
|
|
mod command;
|
2024-01-05 09:05:30 +00:00
|
|
|
mod flags;
|
2022-06-22 15:26:52 +00:00
|
|
|
mod flow;
|
2024-01-05 09:05:30 +00:00
|
|
|
mod imf_view;
|
2024-01-06 10:07:53 +00:00
|
|
|
mod index;
|
2024-01-04 19:54:21 +00:00
|
|
|
mod mail_view;
|
2022-06-29 13:39:54 +00:00
|
|
|
mod mailbox_view;
|
2024-01-04 19:54:21 +00:00
|
|
|
mod mime_view;
|
2024-01-17 07:22:15 +00:00
|
|
|
mod request;
|
2024-01-01 16:54:48 +00:00
|
|
|
mod response;
|
2024-01-05 11:40:49 +00:00
|
|
|
mod search;
|
2022-06-22 15:26:52 +00:00
|
|
|
mod session;
|
2022-06-17 16:39:36 +00:00
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
use std::net::SocketAddr;
|
2022-06-03 15:26:25 +00:00
|
|
|
|
2024-01-17 07:22:15 +00:00
|
|
|
use anyhow::{Result, bail};
|
2024-01-02 19:23:33 +00:00
|
|
|
use futures::stream::{FuturesUnordered, StreamExt};
|
|
|
|
|
|
|
|
use tokio::net::TcpListener;
|
2022-06-20 16:09:20 +00:00
|
|
|
use tokio::sync::watch;
|
2024-01-17 07:22:15 +00:00
|
|
|
use tokio::sync::mpsc;
|
2022-06-03 15:26:25 +00:00
|
|
|
|
2024-01-08 21:46:39 +00:00
|
|
|
use imap_codec::imap_types::{core::Text, response::Greeting};
|
2024-01-02 19:23:33 +00:00
|
|
|
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
|
|
|
|
use imap_flow::stream::AnyStream;
|
2024-01-17 07:22:15 +00:00
|
|
|
use imap_codec::imap_types::response::{Code, Response, CommandContinuationRequest, Status};
|
2024-01-02 19:23:33 +00:00
|
|
|
|
2024-01-17 07:22:15 +00:00
|
|
|
use crate::imap::response::{Body, ResponseOrIdle};
|
|
|
|
use crate::imap::session::Instance;
|
|
|
|
use crate::imap::request::Request;
|
2022-06-20 16:09:20 +00:00
|
|
|
use crate::config::ImapConfig;
|
2024-01-03 11:29:19 +00:00
|
|
|
use crate::imap::capability::ServerCapability;
|
2022-06-22 15:26:52 +00:00
|
|
|
use crate::login::ArcLoginProvider;
|
2022-06-09 08:43:38 +00:00
|
|
|
|
2022-06-17 16:39:36 +00:00
|
|
|
/// Server is a thin wrapper to register our Services in BàL
|
2024-01-02 19:23:33 +00:00
|
|
|
pub struct Server {
|
|
|
|
bind_addr: SocketAddr,
|
|
|
|
login_provider: ArcLoginProvider,
|
2024-01-03 11:29:19 +00:00
|
|
|
capabilities: ServerCapability,
|
2022-06-17 16:39:36 +00:00
|
|
|
}
|
|
|
|
|
2024-01-17 07:22:15 +00:00
|
|
|
#[derive(Clone)]
|
2024-01-02 19:23:33 +00:00
|
|
|
struct ClientContext {
|
|
|
|
addr: SocketAddr,
|
2022-06-20 16:09:20 +00:00
|
|
|
login_provider: ArcLoginProvider,
|
2024-01-02 19:23:33 +00:00
|
|
|
must_exit: watch::Receiver<bool>,
|
2024-01-03 11:29:19 +00:00
|
|
|
server_capabilities: ServerCapability,
|
2022-06-07 10:38:59 +00:00
|
|
|
}
|
2022-06-29 10:50:44 +00:00
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server {
|
|
|
|
Server {
|
|
|
|
bind_addr: config.bind_addr,
|
|
|
|
login_provider: login,
|
2024-01-03 11:29:19 +00:00
|
|
|
capabilities: ServerCapability::default(),
|
2022-06-07 10:38:59 +00:00
|
|
|
}
|
|
|
|
}
|
2022-06-29 10:50:44 +00:00
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
impl Server {
|
|
|
|
pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
|
|
|
|
let tcp = TcpListener::bind(self.bind_addr).await?;
|
|
|
|
tracing::info!("IMAP server listening on {:#}", self.bind_addr);
|
|
|
|
|
|
|
|
let mut connections = FuturesUnordered::new();
|
|
|
|
|
|
|
|
while !*must_exit.borrow() {
|
|
|
|
let wait_conn_finished = async {
|
|
|
|
if connections.is_empty() {
|
|
|
|
futures::future::pending().await
|
|
|
|
} else {
|
|
|
|
connections.next().await
|
|
|
|
}
|
|
|
|
};
|
|
|
|
let (socket, remote_addr) = tokio::select! {
|
|
|
|
a = tcp.accept() => a?,
|
|
|
|
_ = wait_conn_finished => continue,
|
|
|
|
_ = must_exit.changed() => continue,
|
|
|
|
};
|
|
|
|
tracing::info!("IMAP: accepted connection from {}", remote_addr);
|
|
|
|
|
|
|
|
let client = ClientContext {
|
|
|
|
addr: remote_addr.clone(),
|
|
|
|
login_provider: self.login_provider.clone(),
|
|
|
|
must_exit: must_exit.clone(),
|
2024-01-03 11:29:19 +00:00
|
|
|
server_capabilities: self.capabilities.clone(),
|
2024-01-02 19:23:33 +00:00
|
|
|
};
|
2024-01-17 07:22:15 +00:00
|
|
|
let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket)));
|
2024-01-02 19:23:33 +00:00
|
|
|
connections.push(conn);
|
|
|
|
}
|
|
|
|
drop(tcp);
|
2022-06-07 10:38:59 +00:00
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
tracing::info!("IMAP server shutting down, draining remaining connections...");
|
|
|
|
while connections.next().await.is_some() {}
|
2022-06-07 10:38:59 +00:00
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
Ok(())
|
2022-06-07 10:38:59 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-01-17 07:22:15 +00:00
|
|
|
use tokio::sync::mpsc::*;
|
|
|
|
enum LoopMode {
|
|
|
|
Quit,
|
|
|
|
Interactive,
|
2024-01-17 09:14:48 +00:00
|
|
|
Idle,
|
2024-01-17 07:22:15 +00:00
|
|
|
}
|
|
|
|
|
2024-01-17 09:14:48 +00:00
|
|
|
// @FIXME a full refactor of this part of the code will be needed sooner or later
|
2024-01-17 07:22:15 +00:00
|
|
|
struct NetLoop {
|
|
|
|
ctx: ClientContext,
|
|
|
|
server: ServerFlow,
|
|
|
|
cmd_tx: Sender<Request>,
|
|
|
|
resp_rx: UnboundedReceiver<ResponseOrIdle>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl NetLoop {
|
|
|
|
async fn handler(ctx: ClientContext, sock: AnyStream) {
|
|
|
|
let addr = ctx.addr.clone();
|
|
|
|
|
|
|
|
let nl = match Self::new(ctx, sock).await {
|
|
|
|
Ok(nl) => {
|
|
|
|
tracing::debug!(addr=?addr, "netloop successfully initialized");
|
|
|
|
nl
|
|
|
|
},
|
|
|
|
Err(e) => {
|
|
|
|
tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
|
|
|
|
return
|
|
|
|
}
|
|
|
|
};
|
|
|
|
|
|
|
|
match nl.core().await {
|
|
|
|
Ok(()) => {
|
|
|
|
tracing::debug!("closing successful netloop core for {:?}", addr);
|
|
|
|
}
|
|
|
|
Err(e) => {
|
|
|
|
tracing::error!("closing errored netloop core for {:?}: {}", addr, e);
|
|
|
|
}
|
2022-06-14 08:19:24 +00:00
|
|
|
}
|
2022-06-03 15:37:39 +00:00
|
|
|
}
|
2022-06-29 10:50:44 +00:00
|
|
|
|
2024-01-17 07:22:15 +00:00
|
|
|
async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> {
|
|
|
|
// Send greeting
|
|
|
|
let (mut server, _) = ServerFlow::send_greeting(
|
|
|
|
sock,
|
|
|
|
ServerFlowOptions {
|
|
|
|
crlf_relaxed: false,
|
|
|
|
literal_accept_text: Text::unvalidated("OK"),
|
|
|
|
literal_reject_text: Text::unvalidated("Literal rejected"),
|
|
|
|
..ServerFlowOptions::default()
|
|
|
|
},
|
|
|
|
Greeting::ok(
|
|
|
|
Some(Code::Capability(ctx.server_capabilities.to_vec())),
|
|
|
|
"Aerogramme",
|
|
|
|
)
|
|
|
|
.unwrap(),
|
|
|
|
)
|
|
|
|
.await?;
|
|
|
|
|
|
|
|
// Start a mailbox session in background
|
|
|
|
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3);
|
|
|
|
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>();
|
|
|
|
tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
|
|
|
|
|
|
|
|
// Return the object
|
|
|
|
Ok(NetLoop { ctx, server, cmd_tx, resp_rx })
|
|
|
|
}
|
|
|
|
|
|
|
|
/// Coms with the background session
|
|
|
|
async fn session(ctx: ClientContext, mut cmd_rx: Receiver<Request>, resp_tx: UnboundedSender<ResponseOrIdle>) -> () {
|
2024-01-03 11:29:19 +00:00
|
|
|
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
|
2024-01-02 19:23:33 +00:00
|
|
|
loop {
|
|
|
|
let cmd = match cmd_rx.recv().await {
|
|
|
|
None => break,
|
|
|
|
Some(cmd_recv) => cmd_recv,
|
|
|
|
};
|
|
|
|
|
2024-01-03 15:52:31 +00:00
|
|
|
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
|
2024-01-17 07:22:15 +00:00
|
|
|
let maybe_response = session.request(cmd).await;
|
|
|
|
tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response");
|
2024-01-02 19:23:33 +00:00
|
|
|
|
|
|
|
match resp_tx.send(maybe_response) {
|
|
|
|
Err(_) => break,
|
|
|
|
Ok(_) => (),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
tracing::info!("runner is quitting");
|
2024-01-17 07:22:15 +00:00
|
|
|
}
|
|
|
|
|
|
|
|
async fn core(mut self) -> Result<()> {
|
|
|
|
let mut mode = LoopMode::Interactive;
|
|
|
|
loop {
|
|
|
|
mode = match mode {
|
|
|
|
LoopMode::Interactive => self.interactive_mode().await?,
|
2024-01-17 09:14:48 +00:00
|
|
|
LoopMode::Idle => self.idle_mode().await?,
|
2024-01-17 07:22:15 +00:00
|
|
|
LoopMode::Quit => break,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
Ok(())
|
|
|
|
}
|
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
|
2024-01-17 07:22:15 +00:00
|
|
|
async fn interactive_mode(&mut self) -> Result<LoopMode> {
|
2024-01-02 19:23:33 +00:00
|
|
|
tokio::select! {
|
|
|
|
// Managing imap_flow stuff
|
2024-01-17 07:22:15 +00:00
|
|
|
srv_evt = self.server.progress() => match srv_evt? {
|
2024-01-02 19:23:33 +00:00
|
|
|
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
|
|
|
|
match response {
|
2024-01-17 07:22:15 +00:00
|
|
|
Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
|
|
|
|
_ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
|
2024-01-02 19:23:33 +00:00
|
|
|
}
|
|
|
|
},
|
|
|
|
ServerFlowEvent::CommandReceived { command } => {
|
2024-01-17 07:22:15 +00:00
|
|
|
match self.cmd_tx.try_send(Request::ImapCommand(command)) {
|
2024-01-02 19:23:33 +00:00
|
|
|
Ok(_) => (),
|
|
|
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
2024-01-17 07:22:15 +00:00
|
|
|
self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
|
|
|
|
tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
|
2024-01-02 19:23:33 +00:00
|
|
|
}
|
|
|
|
_ => {
|
2024-01-17 07:22:15 +00:00
|
|
|
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
|
|
|
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
|
2024-01-02 19:23:33 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
},
|
2024-01-10 13:45:36 +00:00
|
|
|
flow => {
|
2024-01-17 07:22:15 +00:00
|
|
|
self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
|
|
|
|
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
|
2024-01-10 13:45:36 +00:00
|
|
|
}
|
2024-01-02 19:23:33 +00:00
|
|
|
},
|
|
|
|
|
|
|
|
// Managing response generated by Aerogramme
|
2024-01-17 07:22:15 +00:00
|
|
|
maybe_msg = self.resp_rx.recv() => match maybe_msg {
|
|
|
|
Some(ResponseOrIdle::Response(response)) => {
|
|
|
|
for body_elem in response.body.into_iter() {
|
|
|
|
let _handle = match body_elem {
|
|
|
|
Body::Data(d) => self.server.enqueue_data(d),
|
|
|
|
Body::Status(s) => self.server.enqueue_status(s),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
self.server.enqueue_status(response.completion);
|
|
|
|
},
|
2024-01-17 09:14:48 +00:00
|
|
|
Some(ResponseOrIdle::StartIdle) => {
|
|
|
|
let cr = CommandContinuationRequest::basic(None, "Idling")?;
|
2024-01-17 07:22:15 +00:00
|
|
|
self.server.enqueue_continuation(cr);
|
2024-01-17 09:14:48 +00:00
|
|
|
self.cmd_tx.try_send(Request::Idle)?;
|
|
|
|
return Ok(LoopMode::Idle)
|
2024-01-17 07:22:15 +00:00
|
|
|
},
|
|
|
|
None => {
|
|
|
|
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
|
|
|
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
|
|
|
|
},
|
2024-01-17 09:14:48 +00:00
|
|
|
Some(_) => unreachable!(),
|
|
|
|
|
2024-01-02 19:23:33 +00:00
|
|
|
},
|
|
|
|
|
|
|
|
// When receiving a CTRL+C
|
2024-01-17 07:22:15 +00:00
|
|
|
_ = self.ctx.must_exit.changed() => {
|
|
|
|
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
|
2024-01-02 19:23:33 +00:00
|
|
|
},
|
|
|
|
};
|
2024-01-17 07:22:15 +00:00
|
|
|
Ok(LoopMode::Interactive)
|
2022-06-03 15:26:25 +00:00
|
|
|
}
|
|
|
|
|
2024-01-17 09:14:48 +00:00
|
|
|
async fn idle_mode(&mut self) -> Result<LoopMode> {
|
|
|
|
tokio::select! {
|
|
|
|
maybe_msg = self.resp_rx.recv() => match maybe_msg {
|
|
|
|
Some(ResponseOrIdle::Response(response)) => {
|
|
|
|
for body_elem in response.body.into_iter() {
|
|
|
|
let _handle = match body_elem {
|
|
|
|
Body::Data(d) => self.server.enqueue_data(d),
|
|
|
|
Body::Status(s) => self.server.enqueue_status(s),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
self.server.enqueue_status(response.completion);
|
|
|
|
return Ok(LoopMode::Interactive)
|
|
|
|
},
|
|
|
|
Some(ResponseOrIdle::IdleEvent(elems)) => {
|
|
|
|
for body_elem in elems.into_iter() {
|
|
|
|
let _handle = match body_elem {
|
|
|
|
Body::Data(d) => self.server.enqueue_data(d),
|
|
|
|
Body::Status(s) => self.server.enqueue_status(s),
|
|
|
|
};
|
|
|
|
}
|
|
|
|
return Ok(LoopMode::Idle)
|
|
|
|
},
|
|
|
|
None => {
|
|
|
|
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
|
|
|
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
|
|
|
|
return Ok(LoopMode::Interactive)
|
|
|
|
},
|
|
|
|
Some(ResponseOrIdle::StartIdle) => unreachable!(),
|
|
|
|
}
|
|
|
|
};
|
|
|
|
/*self.cmd_tx.try_send(Request::Idle).unwrap();
|
|
|
|
Ok(LoopMode::Idle)*/
|
2024-01-17 07:22:15 +00:00
|
|
|
}
|
2022-06-09 08:43:38 +00:00
|
|
|
}
|