aerogramme/src/imap/mod.rs

211 lines
6.9 KiB
Rust
Raw Normal View History

2024-01-05 09:05:30 +00:00
mod attributes;
2024-01-03 14:00:05 +00:00
mod capability;
2022-06-17 16:39:36 +00:00
mod command;
2024-01-05 09:05:30 +00:00
mod flags;
2022-06-22 15:26:52 +00:00
mod flow;
2024-01-05 09:05:30 +00:00
mod imf_view;
2024-01-06 10:07:53 +00:00
mod index;
2024-01-04 19:54:21 +00:00
mod mail_view;
mod mailbox_view;
2024-01-04 19:54:21 +00:00
mod mime_view;
mod response;
mod search;
2022-06-22 15:26:52 +00:00
mod session;
2022-06-17 16:39:36 +00:00
2024-01-02 19:23:33 +00:00
use std::net::SocketAddr;
2022-06-03 15:26:25 +00:00
2022-06-07 10:38:59 +00:00
use anyhow::Result;
2024-01-02 19:23:33 +00:00
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
2022-06-20 16:09:20 +00:00
use tokio::sync::watch;
2022-06-03 15:26:25 +00:00
2024-01-08 21:46:39 +00:00
use imap_codec::imap_types::{core::Text, response::Greeting};
2024-01-02 19:23:33 +00:00
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
2022-06-20 16:09:20 +00:00
use crate::config::ImapConfig;
2024-01-03 11:29:19 +00:00
use crate::imap::capability::ServerCapability;
2022-06-22 15:26:52 +00:00
use crate::login::ArcLoginProvider;
2022-06-09 08:43:38 +00:00
2022-06-17 16:39:36 +00:00
/// Server is a thin wrapper to register our Services in BàL
2024-01-02 19:23:33 +00:00
pub struct Server {
bind_addr: SocketAddr,
login_provider: ArcLoginProvider,
2024-01-03 11:29:19 +00:00
capabilities: ServerCapability,
2022-06-17 16:39:36 +00:00
}
2024-01-02 19:23:33 +00:00
struct ClientContext {
stream: AnyStream,
addr: SocketAddr,
2022-06-20 16:09:20 +00:00
login_provider: ArcLoginProvider,
2024-01-02 19:23:33 +00:00
must_exit: watch::Receiver<bool>,
2024-01-03 11:29:19 +00:00
server_capabilities: ServerCapability,
2022-06-07 10:38:59 +00:00
}
2022-06-29 10:50:44 +00:00
2024-01-02 19:23:33 +00:00
pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server {
Server {
bind_addr: config.bind_addr,
login_provider: login,
2024-01-03 11:29:19 +00:00
capabilities: ServerCapability::default(),
2022-06-07 10:38:59 +00:00
}
}
2022-06-29 10:50:44 +00:00
2024-01-02 19:23:33 +00:00
impl Server {
pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
let tcp = TcpListener::bind(self.bind_addr).await?;
tracing::info!("IMAP server listening on {:#}", self.bind_addr);
let mut connections = FuturesUnordered::new();
while !*must_exit.borrow() {
let wait_conn_finished = async {
if connections.is_empty() {
futures::future::pending().await
} else {
connections.next().await
}
};
let (socket, remote_addr) = tokio::select! {
a = tcp.accept() => a?,
_ = wait_conn_finished => continue,
_ = must_exit.changed() => continue,
};
tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext {
stream: AnyStream::new(socket),
addr: remote_addr.clone(),
login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(),
2024-01-03 11:29:19 +00:00
server_capabilities: self.capabilities.clone(),
2024-01-02 19:23:33 +00:00
};
let conn = tokio::spawn(client_wrapper(client));
connections.push(conn);
}
drop(tcp);
2022-06-07 10:38:59 +00:00
2024-01-02 19:23:33 +00:00
tracing::info!("IMAP server shutting down, draining remaining connections...");
while connections.next().await.is_some() {}
2022-06-07 10:38:59 +00:00
2024-01-02 19:23:33 +00:00
Ok(())
2022-06-07 10:38:59 +00:00
}
}
2024-01-02 19:23:33 +00:00
async fn client_wrapper(ctx: ClientContext) {
let addr = ctx.addr.clone();
match client(ctx).await {
Ok(()) => {
2024-01-03 15:52:31 +00:00
tracing::debug!("closing successful session for {:?}", addr);
2024-01-02 19:23:33 +00:00
}
Err(e) => {
tracing::error!("closing errored session for {:?}: {}", addr, e);
2022-06-14 08:19:24 +00:00
}
2022-06-03 15:37:39 +00:00
}
2022-06-03 15:26:25 +00:00
}
2022-06-29 10:50:44 +00:00
2024-01-02 19:23:33 +00:00
async fn client(mut ctx: ClientContext) -> Result<()> {
// Send greeting
let (mut server, _) = ServerFlow::send_greeting(
ctx.stream,
2024-01-08 21:46:39 +00:00
ServerFlowOptions {
crlf_relaxed: false,
literal_accept_text: Text::unvalidated("OK"),
literal_reject_text: Text::unvalidated("Literal rejected"),
..ServerFlowOptions::default()
},
2024-01-03 14:00:05 +00:00
Greeting::ok(
Some(Code::Capability(ctx.server_capabilities.to_vec())),
"Aerogramme",
)
.unwrap(),
2024-01-02 19:23:33 +00:00
)
.await?;
use crate::imap::response::{Body, Response as MyResponse};
use crate::imap::session::Instance;
use imap_codec::imap_types::command::Command;
2024-01-03 14:00:05 +00:00
use imap_codec::imap_types::response::{Code, Response, Status};
2024-01-02 19:23:33 +00:00
use tokio::sync::mpsc;
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
let bckgrnd = tokio::spawn(async move {
2024-01-03 11:29:19 +00:00
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
2024-01-02 19:23:33 +00:00
loop {
let cmd = match cmd_rx.recv().await {
None => break,
Some(cmd_recv) => cmd_recv,
};
2024-01-03 15:52:31 +00:00
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
2024-01-02 19:23:33 +00:00
let maybe_response = session.command(cmd).await;
2024-01-03 15:52:31 +00:00
tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response");
2024-01-02 19:23:33 +00:00
match resp_tx.send(maybe_response) {
Err(_) => break,
Ok(_) => (),
};
}
tracing::info!("runner is quitting");
});
// Main loop
loop {
tokio::select! {
// Managing imap_flow stuff
srv_evt = server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
Response::Status(Status::Bye(_)) => break,
_ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
match cmd_tx.try_send(command) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
}
_ => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
}
}
},
},
// Managing response generated by Aerogramme
maybe_msg = resp_rx.recv() => {
let response = match maybe_msg {
None => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
continue
},
Some(r) => r,
};
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => server.enqueue_data(d),
Body::Status(s) => server.enqueue_status(s),
};
}
server.enqueue_status(response.completion);
},
// When receiving a CTRL+C
_ = ctx.must_exit.changed() => {
server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
2022-06-03 15:26:25 +00:00
}
2024-01-02 19:23:33 +00:00
drop(cmd_tx);
bckgrnd.await?;
Ok(())
2022-06-09 08:43:38 +00:00
}