From faca15f164c8e2860d27144f75f2dee05742ec6d Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 13 Jul 2022 11:19:08 +0200 Subject: [PATCH] LMTP refactoring, implement EXPUNGE --- src/imap/mailbox_view.rs | 16 +++++++-- src/lmtp.rs | 73 ++++++++++++++++++++++------------------ src/mail/uidindex.rs | 11 ++++-- src/server.rs | 2 +- 4 files changed, 63 insertions(+), 39 deletions(-) diff --git a/src/imap/mailbox_view.rs b/src/imap/mailbox_view.rs index f293bfa..371bc50 100644 --- a/src/imap/mailbox_view.rs +++ b/src/imap/mailbox_view.rs @@ -160,7 +160,7 @@ impl MailboxView { let flags = flags.iter().map(|x| x.to_string()).collect::>(); let mails = self.get_mail_ids(sequence_set)?; - for (i, uid, uuid) in mails.iter() { + for (_i, _uid, uuid) in mails.iter() { match kind { StoreType::Add => { self.mailbox.add_flags(*uuid, &flags[..]).await?; @@ -178,7 +178,19 @@ impl MailboxView { } pub async fn expunge(&mut self) -> Result> { - unimplemented!() + let deleted_flag = Flag::Deleted.to_string(); + let msgs = self + .known_state + .table + .iter() + .filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag)) + .map(|(uuid, _)| *uuid); + + for msg in msgs { + self.mailbox.delete(msg).await?; + } + + self.update().await } /// Looks up state changes in the mailbox and produces a set of IMAP diff --git a/src/lmtp.rs b/src/lmtp.rs index 28194e3..f88ed8f 100644 --- a/src/lmtp.rs +++ b/src/lmtp.rs @@ -5,7 +5,11 @@ use anyhow::Result; use async_trait::async_trait; use duplexify::Duplex; use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite}; -use futures::{stream, stream::FuturesUnordered, StreamExt}; +use futures::{ + stream, + stream::{FuturesOrdered, FuturesUnordered}, + StreamExt, +}; use log::*; use tokio::net::TcpListener; use tokio::select; @@ -56,11 +60,12 @@ impl LmtpServer { _ = wait_conn_finished => continue, _ = must_exit.changed() => continue, }; + info!("LMTP: accepted connection from {}", remote_addr); let conn = tokio::spawn(smtp_server::interact( socket.compat(), smtp_server::IsAlreadyTls::No, - Conn { remote_addr }, + (), self.clone(), )); @@ -77,10 +82,6 @@ impl LmtpServer { // ---- -pub struct Conn { - remote_addr: SocketAddr, -} - pub struct Message { to: Vec, } @@ -89,21 +90,21 @@ pub struct Message { impl Config for LmtpServer { type Protocol = smtp_server::protocol::Lmtp; - type ConnectionUserMeta = Conn; + type ConnectionUserMeta = (); type MailUserMeta = Message; - fn hostname(&self, _conn_meta: &ConnectionMetadata) -> &str { + fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str { &self.hostname } - async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata) -> Message { + async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) -> Message { Message { to: vec![] } } async fn tls_accept( &self, _io: IO, - _conn_meta: &mut ConnectionMetadata, + _conn_meta: &mut ConnectionMetadata<()>, ) -> io::Result>, Pin>>> where IO: Send + AsyncRead + AsyncWrite, @@ -118,7 +119,7 @@ impl Config for LmtpServer { &self, from: Option, _meta: &mut MailMetadata, - _conn_meta: &mut ConnectionMetadata, + _conn_meta: &mut ConnectionMetadata<()>, ) -> Decision> { Decision::Accept { reply: reply::okay_from().convert(), @@ -130,7 +131,7 @@ impl Config for LmtpServer { &self, to: Email, meta: &mut MailMetadata, - _conn_meta: &mut ConnectionMetadata, + _conn_meta: &mut ConnectionMetadata<()>, ) -> Decision { let to_str = match to.hostname.as_ref() { Some(h) => format!("{}@{}", to.localpart, h), @@ -158,7 +159,7 @@ impl Config for LmtpServer { &'resp self, reader: &mut EscapedDataReader<'_, R>, meta: MailMetadata, - _conn_meta: &'resp mut ConnectionMetadata, + _conn_meta: &'resp mut ConnectionMetadata<()>, ) -> Pin> + Send + 'resp>> where R: Send + Unpin + AsyncRead, @@ -176,8 +177,8 @@ impl Config for LmtpServer { }; let mut text = Vec::new(); - if reader.read_to_end(&mut text).await.is_err() { - return err_response_stream(meta, "io error".into()); + if let Err(e) = reader.read_to_end(&mut text).await { + return err_response_stream(meta, format!("io error: {}", e)); } reader.complete(); @@ -186,23 +187,29 @@ impl Config for LmtpServer { Err(e) => return err_response_stream(meta, e.to_string()), }; - Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| { - let encrypted_message = encrypted_message.clone(); - async move { - match encrypted_message.deliver_to(creds).await { - Ok(()) => Decision::Accept { - reply: reply::okay_mail().convert(), - res: (), - }, - Err(e) => Decision::Reject { - reply: Reply { - code: ReplyCode::POLICY_REASON, - ecode: None, - text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], - }, - }, - } - } - })) + Box::pin( + meta.user + .to + .into_iter() + .map(move |creds| { + let encrypted_message = encrypted_message.clone(); + async move { + match encrypted_message.deliver_to(creds).await { + Ok(()) => Decision::Accept { + reply: reply::okay_mail().convert(), + res: (), + }, + Err(e) => Decision::Reject { + reply: Reply { + code: ReplyCode::POLICY_REASON, + ecode: None, + text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], + }, + }, + } + } + }) + .collect::>(), + ) } } diff --git a/src/mail/uidindex.rs b/src/mail/uidindex.rs index 3a3e252..43d6507 100644 --- a/src/mail/uidindex.rs +++ b/src/mail/uidindex.rs @@ -214,9 +214,14 @@ impl FlagIndex { }); } fn remove(&mut self, uid: ImapUid, flags: &Vec) -> () { - flags.iter().for_each(|flag| { - self.0.get_mut(flag).and_then(|set| set.remove(&uid)); - }); + for flag in flags.iter() { + if let Some(set) = self.0.get_mut(flag) { + set.remove(&uid); + if set.is_empty() { + self.0.remove(flag); + } + } + } } pub fn get(&self, f: &Flag) -> Option<&OrdSet> { diff --git a/src/server.rs b/src/server.rs index 55fa5ba..f0eb35f 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use anyhow::{bail, Result}; -use futures::{try_join, StreamExt}; +use futures::try_join; use log::*; use tokio::sync::watch;