2024-01-05 09:05:30 +00:00
mod attributes ;
2024-01-03 14:00:05 +00:00
mod capability ;
2022-06-17 16:39:36 +00:00
mod command ;
2024-01-05 09:05:30 +00:00
mod flags ;
2022-06-22 15:26:52 +00:00
mod flow ;
2024-01-05 09:05:30 +00:00
mod imf_view ;
2024-01-06 10:07:53 +00:00
mod index ;
2024-01-04 19:54:21 +00:00
mod mail_view ;
2022-06-29 13:39:54 +00:00
mod mailbox_view ;
2024-01-04 19:54:21 +00:00
mod mime_view ;
2024-01-17 07:22:15 +00:00
mod request ;
2024-01-01 16:54:48 +00:00
mod response ;
2024-01-05 11:40:49 +00:00
mod search ;
2022-06-22 15:26:52 +00:00
mod session ;
2022-06-17 16:39:36 +00:00
2024-01-02 19:23:33 +00:00
use std ::net ::SocketAddr ;
2022-06-03 15:26:25 +00:00
2024-01-18 17:03:21 +00:00
use anyhow ::{ bail , Result } ;
2024-01-02 19:23:33 +00:00
use futures ::stream ::{ FuturesUnordered , StreamExt } ;
use tokio ::net ::TcpListener ;
2024-01-17 07:22:15 +00:00
use tokio ::sync ::mpsc ;
2024-01-18 17:03:21 +00:00
use tokio ::sync ::watch ;
2022-06-03 15:26:25 +00:00
2024-01-18 17:03:21 +00:00
use imap_codec ::imap_types ::response ::{ Code , CommandContinuationRequest , Response , Status } ;
2024-01-08 21:46:39 +00:00
use imap_codec ::imap_types ::{ core ::Text , response ::Greeting } ;
2024-01-02 19:23:33 +00:00
use imap_flow ::server ::{ ServerFlow , ServerFlowEvent , ServerFlowOptions } ;
use imap_flow ::stream ::AnyStream ;
2024-01-23 15:14:58 +00:00
use tokio_rustls ::TlsAcceptor ;
use rustls_pemfile ::{ certs , private_key } ;
2024-01-02 19:23:33 +00:00
2024-01-23 15:14:58 +00:00
use crate ::config ::{ ImapConfig , ImapUnsecureConfig } ;
2024-01-03 11:29:19 +00:00
use crate ::imap ::capability ::ServerCapability ;
2024-01-18 17:03:21 +00:00
use crate ::imap ::request ::Request ;
use crate ::imap ::response ::{ Body , ResponseOrIdle } ;
use crate ::imap ::session ::Instance ;
2022-06-22 15:26:52 +00:00
use crate ::login ::ArcLoginProvider ;
2022-06-09 08:43:38 +00:00
2022-06-17 16:39:36 +00:00
/// Server is a thin wrapper to register our Services in BàL
2024-01-02 19:23:33 +00:00
pub struct Server {
bind_addr : SocketAddr ,
login_provider : ArcLoginProvider ,
2024-01-03 11:29:19 +00:00
capabilities : ServerCapability ,
2024-01-23 15:14:58 +00:00
tls : Option < TlsAcceptor > ,
2022-06-17 16:39:36 +00:00
}
2024-01-17 07:22:15 +00:00
#[ derive(Clone) ]
2024-01-02 19:23:33 +00:00
struct ClientContext {
addr : SocketAddr ,
2022-06-20 16:09:20 +00:00
login_provider : ArcLoginProvider ,
2024-01-02 19:23:33 +00:00
must_exit : watch ::Receiver < bool > ,
2024-01-03 11:29:19 +00:00
server_capabilities : ServerCapability ,
2022-06-07 10:38:59 +00:00
}
2022-06-29 10:50:44 +00:00
2024-01-23 15:14:58 +00:00
pub fn new ( config : ImapConfig , login : ArcLoginProvider ) -> Result < Server > {
let loaded_certs = certs ( & mut std ::io ::BufReader ::new ( std ::fs ::File ::open ( config . certs ) ? ) ) . collect ::< Result < Vec < _ > , _ > > ( ) ? ;
let loaded_key = private_key ( & mut std ::io ::BufReader ::new ( std ::fs ::File ::open ( config . key ) ? ) ) ? . unwrap ( ) ;
let tls_config = rustls ::ServerConfig ::builder ( )
. with_no_client_auth ( )
. with_single_cert ( loaded_certs , loaded_key ) ? ;
let acceptor = TlsAcceptor ::from ( Arc ::new ( tls_config ) ) ;
Ok ( Server {
bind_addr : config . bind_addr ,
login_provider : login ,
capabilities : ServerCapability ::default ( ) ,
tls : Some ( acceptor ) ,
} )
}
pub fn new_unsecure ( config : ImapUnsecureConfig , login : ArcLoginProvider ) -> Server {
2024-01-02 19:23:33 +00:00
Server {
bind_addr : config . bind_addr ,
login_provider : login ,
2024-01-03 11:29:19 +00:00
capabilities : ServerCapability ::default ( ) ,
2024-01-23 15:14:58 +00:00
tls : None ,
2022-06-07 10:38:59 +00:00
}
}
2022-06-29 10:50:44 +00:00
2024-01-02 19:23:33 +00:00
impl Server {
pub async fn run ( self : Self , mut must_exit : watch ::Receiver < bool > ) -> Result < ( ) > {
let tcp = TcpListener ::bind ( self . bind_addr ) . await ? ;
tracing ::info! ( " IMAP server listening on {:#} " , self . bind_addr ) ;
let mut connections = FuturesUnordered ::new ( ) ;
while ! * must_exit . borrow ( ) {
let wait_conn_finished = async {
if connections . is_empty ( ) {
futures ::future ::pending ( ) . await
} else {
connections . next ( ) . await
}
} ;
let ( socket , remote_addr ) = tokio ::select! {
a = tcp . accept ( ) = > a ? ,
_ = wait_conn_finished = > continue ,
_ = must_exit . changed ( ) = > continue ,
} ;
tracing ::info! ( " IMAP: accepted connection from {} " , remote_addr ) ;
2024-01-23 15:14:58 +00:00
let stream = match self . tls . clone ( ) {
Some ( acceptor ) = > {
let stream = match acceptor . accept ( socket ) . await {
Ok ( v ) = > v ,
Err ( e ) = > {
tracing ::error! ( err = ? e , " TLS negociation failed " ) ;
continue ;
}
} ;
AnyStream ::new ( stream )
} ,
None = > AnyStream ::new ( socket ) ,
} ;
2024-01-02 19:23:33 +00:00
let client = ClientContext {
addr : remote_addr . clone ( ) ,
login_provider : self . login_provider . clone ( ) ,
must_exit : must_exit . clone ( ) ,
2024-01-03 11:29:19 +00:00
server_capabilities : self . capabilities . clone ( ) ,
2024-01-02 19:23:33 +00:00
} ;
2024-01-23 15:14:58 +00:00
let conn = tokio ::spawn ( NetLoop ::handler ( client , stream ) ) ;
2024-01-02 19:23:33 +00:00
connections . push ( conn ) ;
}
drop ( tcp ) ;
2022-06-07 10:38:59 +00:00
2024-01-02 19:23:33 +00:00
tracing ::info! ( " IMAP server shutting down, draining remaining connections... " ) ;
while connections . next ( ) . await . is_some ( ) { }
2022-06-07 10:38:59 +00:00
2024-01-02 19:23:33 +00:00
Ok ( ( ) )
2022-06-07 10:38:59 +00:00
}
}
2024-01-18 17:03:21 +00:00
use std ::sync ::Arc ;
2024-01-17 15:56:05 +00:00
use tokio ::sync ::mpsc ::* ;
use tokio ::sync ::Notify ;
2024-01-18 17:03:21 +00:00
use tokio_util ::bytes ::BytesMut ;
2024-01-17 07:22:15 +00:00
enum LoopMode {
Quit ,
Interactive ,
2024-01-17 15:56:05 +00:00
Idle ( BytesMut , Arc < Notify > ) ,
2024-01-17 07:22:15 +00:00
}
2024-01-17 09:14:48 +00:00
// @FIXME a full refactor of this part of the code will be needed sooner or later
2024-01-17 07:22:15 +00:00
/// Network side of one client connection.
///
/// Bridges the IMAP flow with the background session task through a pair of
/// channels.
struct NetLoop {
    /// Context of the connected client (address, shutdown signal, ...).
    ctx: ClientContext,
    /// IMAP server flow on which responses are enqueued and events are read.
    server: ServerFlow,
    /// Requests sent to the background session.
    cmd_tx: Sender<Request>,
    /// Responses and idle notifications received from the background session.
    resp_rx: UnboundedReceiver<ResponseOrIdle>,
}
impl NetLoop {
async fn handler ( ctx : ClientContext , sock : AnyStream ) {
let addr = ctx . addr . clone ( ) ;
let nl = match Self ::new ( ctx , sock ) . await {
Ok ( nl ) = > {
tracing ::debug! ( addr = ? addr , " netloop successfully initialized " ) ;
nl
2024-01-18 17:03:21 +00:00
}
2024-01-17 07:22:15 +00:00
Err ( e ) = > {
tracing ::error! ( addr = ? addr , err = ? e , " netloop can not be initialized, closing session " ) ;
2024-01-18 17:03:21 +00:00
return ;
2024-01-17 07:22:15 +00:00
}
} ;
match nl . core ( ) . await {
Ok ( ( ) ) = > {
tracing ::debug! ( " closing successful netloop core for {:?} " , addr ) ;
}
Err ( e ) = > {
tracing ::error! ( " closing errored netloop core for {:?}: {} " , addr , e ) ;
}
2022-06-14 08:19:24 +00:00
}
2022-06-03 15:37:39 +00:00
}
2022-06-29 10:50:44 +00:00
2024-01-19 16:42:57 +00:00
async fn new ( ctx : ClientContext , sock : AnyStream ) -> Result < Self > {
2024-01-17 07:22:15 +00:00
// Send greeting
2024-01-19 16:42:57 +00:00
let ( server , _ ) = ServerFlow ::send_greeting (
2024-01-17 07:22:15 +00:00
sock ,
ServerFlowOptions {
crlf_relaxed : false ,
literal_accept_text : Text ::unvalidated ( " OK " ) ,
literal_reject_text : Text ::unvalidated ( " Literal rejected " ) ,
.. ServerFlowOptions ::default ( )
} ,
Greeting ::ok (
Some ( Code ::Capability ( ctx . server_capabilities . to_vec ( ) ) ) ,
" Aerogramme " ,
)
2024-01-18 17:03:21 +00:00
. unwrap ( ) ,
)
. await ? ;
2024-01-17 07:22:15 +00:00
// Start a mailbox session in background
2024-01-19 16:42:57 +00:00
let ( cmd_tx , cmd_rx ) = mpsc ::channel ::< Request > ( 3 ) ;
let ( resp_tx , resp_rx ) = mpsc ::unbounded_channel ::< ResponseOrIdle > ( ) ;
2024-01-17 07:22:15 +00:00
tokio ::spawn ( Self ::session ( ctx . clone ( ) , cmd_rx , resp_tx ) ) ;
// Return the object
2024-01-18 17:03:21 +00:00
Ok ( NetLoop {
ctx ,
server ,
cmd_tx ,
resp_rx ,
} )
2024-01-17 07:22:15 +00:00
}
/// Coms with the background session
2024-01-18 17:03:21 +00:00
async fn session (
ctx : ClientContext ,
mut cmd_rx : Receiver < Request > ,
resp_tx : UnboundedSender < ResponseOrIdle > ,
) -> ( ) {
2024-01-03 11:29:19 +00:00
let mut session = Instance ::new ( ctx . login_provider , ctx . server_capabilities ) ;
2024-01-02 19:23:33 +00:00
loop {
let cmd = match cmd_rx . recv ( ) . await {
None = > break ,
Some ( cmd_recv ) = > cmd_recv ,
} ;
2024-01-03 15:52:31 +00:00
tracing ::debug! ( cmd = ? cmd , sock = % ctx . addr , " command " ) ;
2024-01-17 07:22:15 +00:00
let maybe_response = session . request ( cmd ) . await ;
tracing ::debug! ( cmd = ? maybe_response , sock = % ctx . addr , " response " ) ;
2024-01-02 19:23:33 +00:00
match resp_tx . send ( maybe_response ) {
Err ( _ ) = > break ,
Ok ( _ ) = > ( ) ,
} ;
}
tracing ::info! ( " runner is quitting " ) ;
2024-01-17 07:22:15 +00:00
}
async fn core ( mut self ) -> Result < ( ) > {
let mut mode = LoopMode ::Interactive ;
loop {
mode = match mode {
LoopMode ::Interactive = > self . interactive_mode ( ) . await ? ,
2024-01-17 15:56:05 +00:00
LoopMode ::Idle ( buff , stop ) = > self . idle_mode ( buff , stop ) . await ? ,
2024-01-17 07:22:15 +00:00
LoopMode ::Quit = > break ,
}
}
Ok ( ( ) )
}
async fn interactive_mode ( & mut self ) -> Result < LoopMode > {
2024-01-02 19:23:33 +00:00
tokio ::select! {
// Managing imap_flow stuff
2024-01-17 07:22:15 +00:00
srv_evt = self . server . progress ( ) = > match srv_evt ? {
2024-01-02 19:23:33 +00:00
ServerFlowEvent ::ResponseSent { handle : _handle , response } = > {
match response {
2024-01-17 07:22:15 +00:00
Response ::Status ( Status ::Bye ( _ ) ) = > return Ok ( LoopMode ::Quit ) ,
_ = > tracing ::trace! ( " sent to {} content {:?} " , self . ctx . addr , response ) ,
2024-01-02 19:23:33 +00:00
}
} ,
ServerFlowEvent ::CommandReceived { command } = > {
2024-01-17 07:22:15 +00:00
match self . cmd_tx . try_send ( Request ::ImapCommand ( command ) ) {
2024-01-02 19:23:33 +00:00
Ok ( _ ) = > ( ) ,
Err ( mpsc ::error ::TrySendError ::Full ( _ ) ) = > {
2024-01-17 07:22:15 +00:00
self . server . enqueue_status ( Status ::bye ( None , " Too fast " ) . unwrap ( ) ) ;
tracing ::error! ( " client {:?} is sending commands too fast, closing. " , self . ctx . addr ) ;
2024-01-02 19:23:33 +00:00
}
_ = > {
2024-01-17 07:22:15 +00:00
self . server . enqueue_status ( Status ::bye ( None , " Internal session exited " ) . unwrap ( ) ) ;
tracing ::error! ( " session task exited for {:?}, quitting " , self . ctx . addr ) ;
2024-01-02 19:23:33 +00:00
}
}
} ,
2024-01-10 13:45:36 +00:00
flow = > {
2024-01-17 07:22:15 +00:00
self . server . enqueue_status ( Status ::bye ( None , " Unsupported server flow event " ) . unwrap ( ) ) ;
tracing ::error! ( " session task exited for {:?} due to unsupported flow {:?} " , self . ctx . addr , flow ) ;
2024-01-10 13:45:36 +00:00
}
2024-01-02 19:23:33 +00:00
} ,
// Managing response generated by Aerogramme
2024-01-17 07:22:15 +00:00
maybe_msg = self . resp_rx . recv ( ) = > match maybe_msg {
Some ( ResponseOrIdle ::Response ( response ) ) = > {
for body_elem in response . body . into_iter ( ) {
let _handle = match body_elem {
Body ::Data ( d ) = > self . server . enqueue_data ( d ) ,
Body ::Status ( s ) = > self . server . enqueue_status ( s ) ,
} ;
}
self . server . enqueue_status ( response . completion ) ;
} ,
2024-01-17 15:56:05 +00:00
Some ( ResponseOrIdle ::StartIdle ( stop ) ) = > {
2024-01-17 09:14:48 +00:00
let cr = CommandContinuationRequest ::basic ( None , " Idling " ) ? ;
2024-01-17 07:22:15 +00:00
self . server . enqueue_continuation ( cr ) ;
2024-01-17 09:14:48 +00:00
self . cmd_tx . try_send ( Request ::Idle ) ? ;
2024-01-17 15:56:05 +00:00
return Ok ( LoopMode ::Idle ( BytesMut ::new ( ) , stop ) )
2024-01-17 07:22:15 +00:00
} ,
None = > {
self . server . enqueue_status ( Status ::bye ( None , " Internal session exited " ) . unwrap ( ) ) ;
tracing ::error! ( " session task exited for {:?}, quitting " , self . ctx . addr ) ;
} ,
2024-01-17 09:14:48 +00:00
Some ( _ ) = > unreachable! ( ) ,
2024-01-18 17:03:21 +00:00
2024-01-02 19:23:33 +00:00
} ,
// When receiving a CTRL+C
2024-01-17 07:22:15 +00:00
_ = self . ctx . must_exit . changed ( ) = > {
self . server . enqueue_status ( Status ::bye ( None , " Server is being shutdown " ) . unwrap ( ) ) ;
2024-01-02 19:23:33 +00:00
} ,
} ;
2024-01-17 07:22:15 +00:00
Ok ( LoopMode ::Interactive )
2022-06-03 15:26:25 +00:00
}
2024-01-17 15:56:05 +00:00
async fn idle_mode ( & mut self , mut buff : BytesMut , stop : Arc < Notify > ) -> Result < LoopMode > {
// Flush send
loop {
match self . server . progress_send ( ) . await ? {
Some ( .. ) = > continue ,
None = > break ,
}
}
2024-01-17 09:14:48 +00:00
tokio ::select! {
2024-01-17 15:56:05 +00:00
// Receiving IDLE event from background
2024-01-17 09:14:48 +00:00
maybe_msg = self . resp_rx . recv ( ) = > match maybe_msg {
2024-01-17 15:56:05 +00:00
// Session decided idle is terminated
2024-01-17 09:14:48 +00:00
Some ( ResponseOrIdle ::Response ( response ) ) = > {
for body_elem in response . body . into_iter ( ) {
let _handle = match body_elem {
Body ::Data ( d ) = > self . server . enqueue_data ( d ) ,
Body ::Status ( s ) = > self . server . enqueue_status ( s ) ,
} ;
}
self . server . enqueue_status ( response . completion ) ;
return Ok ( LoopMode ::Interactive )
} ,
2024-01-17 15:56:05 +00:00
// Session has some information for user
2024-01-17 09:14:48 +00:00
Some ( ResponseOrIdle ::IdleEvent ( elems ) ) = > {
for body_elem in elems . into_iter ( ) {
let _handle = match body_elem {
Body ::Data ( d ) = > self . server . enqueue_data ( d ) ,
Body ::Status ( s ) = > self . server . enqueue_status ( s ) ,
} ;
}
2024-01-17 15:56:05 +00:00
self . cmd_tx . try_send ( Request ::Idle ) ? ;
return Ok ( LoopMode ::Idle ( buff , stop ) )
2024-01-17 09:14:48 +00:00
} ,
2024-01-17 15:56:05 +00:00
// Session crashed
2024-01-17 09:14:48 +00:00
None = > {
self . server . enqueue_status ( Status ::bye ( None , " Internal session exited " ) . unwrap ( ) ) ;
tracing ::error! ( " session task exited for {:?}, quitting " , self . ctx . addr ) ;
return Ok ( LoopMode ::Interactive )
} ,
2024-01-17 15:56:05 +00:00
// Session can't start idling while already idling, it's a logic error!
Some ( ResponseOrIdle ::StartIdle ( .. ) ) = > bail! ( " can't start idling while already idling! " ) ,
} ,
// User is trying to interact with us
_read_client_bytes = self . server . stream . read ( & mut buff ) = > {
use imap_codec ::decode ::Decoder ;
let codec = imap_codec ::IdleDoneCodec ::new ( ) ;
match codec . decode ( & buff ) {
Ok ( ( [ ] , imap_codec ::imap_types ::extensions ::idle ::IdleDone ) ) = > {
// Session will be informed that it must stop idle
// It will generate the "done" message and change the loop mode
stop . notify_one ( )
} ,
Err ( _ ) = > ( ) ,
_ = > bail! ( " Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting. " ) ,
} ;
return Ok ( LoopMode ::Idle ( buff , stop ) )
} ,
// When receiving a CTRL+C
_ = self . ctx . must_exit . changed ( ) = > {
self . server . enqueue_status ( Status ::bye ( None , " Server is being shutdown " ) . unwrap ( ) ) ;
return Ok ( LoopMode ::Interactive )
} ,
2024-01-17 09:14:48 +00:00
} ;
2024-01-17 07:22:15 +00:00
}
2022-06-09 08:43:38 +00:00
}