LMTP refactoring, implement EXPUNGE

This commit is contained in:
Alex 2022-07-13 11:19:08 +02:00
parent a1ca6d9def
commit faca15f164
Signed by: lx
GPG Key ID: 0E496D15096376BE
4 changed files with 63 additions and 39 deletions

View File

@ -160,7 +160,7 @@ impl MailboxView {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let mails = self.get_mail_ids(sequence_set)?; let mails = self.get_mail_ids(sequence_set)?;
for (i, uid, uuid) in mails.iter() { for (_i, _uid, uuid) in mails.iter() {
match kind { match kind {
StoreType::Add => { StoreType::Add => {
self.mailbox.add_flags(*uuid, &flags[..]).await?; self.mailbox.add_flags(*uuid, &flags[..]).await?;
@ -178,7 +178,19 @@ impl MailboxView {
} }
pub async fn expunge(&mut self) -> Result<Vec<Body>> { pub async fn expunge(&mut self) -> Result<Vec<Body>> {
unimplemented!() let deleted_flag = Flag::Deleted.to_string();
let msgs = self
.known_state
.table
.iter()
.filter(|(_uuid, (_uid, flags))| flags.iter().any(|x| *x == deleted_flag))
.map(|(uuid, _)| *uuid);
for msg in msgs {
self.mailbox.delete(msg).await?;
}
self.update().await
} }
/// Looks up state changes in the mailbox and produces a set of IMAP /// Looks up state changes in the mailbox and produces a set of IMAP

View File

@ -5,7 +5,11 @@ use anyhow::Result;
use async_trait::async_trait; use async_trait::async_trait;
use duplexify::Duplex; use duplexify::Duplex;
use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite}; use futures::{io, AsyncRead, AsyncReadExt, AsyncWrite};
use futures::{stream, stream::FuturesUnordered, StreamExt}; use futures::{
stream,
stream::{FuturesOrdered, FuturesUnordered},
StreamExt,
};
use log::*; use log::*;
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::select; use tokio::select;
@ -56,11 +60,12 @@ impl LmtpServer {
_ = wait_conn_finished => continue, _ = wait_conn_finished => continue,
_ = must_exit.changed() => continue, _ = must_exit.changed() => continue,
}; };
info!("LMTP: accepted connection from {}", remote_addr);
let conn = tokio::spawn(smtp_server::interact( let conn = tokio::spawn(smtp_server::interact(
socket.compat(), socket.compat(),
smtp_server::IsAlreadyTls::No, smtp_server::IsAlreadyTls::No,
Conn { remote_addr }, (),
self.clone(), self.clone(),
)); ));
@ -77,10 +82,6 @@ impl LmtpServer {
// ---- // ----
pub struct Conn {
remote_addr: SocketAddr,
}
pub struct Message { pub struct Message {
to: Vec<PublicCredentials>, to: Vec<PublicCredentials>,
} }
@ -89,21 +90,21 @@ pub struct Message {
impl Config for LmtpServer { impl Config for LmtpServer {
type Protocol = smtp_server::protocol::Lmtp; type Protocol = smtp_server::protocol::Lmtp;
type ConnectionUserMeta = Conn; type ConnectionUserMeta = ();
type MailUserMeta = Message; type MailUserMeta = Message;
fn hostname(&self, _conn_meta: &ConnectionMetadata<Conn>) -> &str { fn hostname(&self, _conn_meta: &ConnectionMetadata<()>) -> &str {
&self.hostname &self.hostname
} }
async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<Conn>) -> Message { async fn new_mail(&self, _conn_meta: &mut ConnectionMetadata<()>) -> Message {
Message { to: vec![] } Message { to: vec![] }
} }
async fn tls_accept<IO>( async fn tls_accept<IO>(
&self, &self,
_io: IO, _io: IO,
_conn_meta: &mut ConnectionMetadata<Conn>, _conn_meta: &mut ConnectionMetadata<()>,
) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>> ) -> io::Result<Duplex<Pin<Box<dyn Send + AsyncRead>>, Pin<Box<dyn Send + AsyncWrite>>>>
where where
IO: Send + AsyncRead + AsyncWrite, IO: Send + AsyncRead + AsyncWrite,
@ -118,7 +119,7 @@ impl Config for LmtpServer {
&self, &self,
from: Option<Email>, from: Option<Email>,
_meta: &mut MailMetadata<Message>, _meta: &mut MailMetadata<Message>,
_conn_meta: &mut ConnectionMetadata<Conn>, _conn_meta: &mut ConnectionMetadata<()>,
) -> Decision<Option<Email>> { ) -> Decision<Option<Email>> {
Decision::Accept { Decision::Accept {
reply: reply::okay_from().convert(), reply: reply::okay_from().convert(),
@ -130,7 +131,7 @@ impl Config for LmtpServer {
&self, &self,
to: Email, to: Email,
meta: &mut MailMetadata<Message>, meta: &mut MailMetadata<Message>,
_conn_meta: &mut ConnectionMetadata<Conn>, _conn_meta: &mut ConnectionMetadata<()>,
) -> Decision<Email> { ) -> Decision<Email> {
let to_str = match to.hostname.as_ref() { let to_str = match to.hostname.as_ref() {
Some(h) => format!("{}@{}", to.localpart, h), Some(h) => format!("{}@{}", to.localpart, h),
@ -158,7 +159,7 @@ impl Config for LmtpServer {
&'resp self, &'resp self,
reader: &mut EscapedDataReader<'_, R>, reader: &mut EscapedDataReader<'_, R>,
meta: MailMetadata<Message>, meta: MailMetadata<Message>,
_conn_meta: &'resp mut ConnectionMetadata<Conn>, _conn_meta: &'resp mut ConnectionMetadata<()>,
) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'resp>> ) -> Pin<Box<dyn futures::Stream<Item = Decision<()>> + Send + 'resp>>
where where
R: Send + Unpin + AsyncRead, R: Send + Unpin + AsyncRead,
@ -176,8 +177,8 @@ impl Config for LmtpServer {
}; };
let mut text = Vec::new(); let mut text = Vec::new();
if reader.read_to_end(&mut text).await.is_err() { if let Err(e) = reader.read_to_end(&mut text).await {
return err_response_stream(meta, "io error".into()); return err_response_stream(meta, format!("io error: {}", e));
} }
reader.complete(); reader.complete();
@ -186,23 +187,29 @@ impl Config for LmtpServer {
Err(e) => return err_response_stream(meta, e.to_string()), Err(e) => return err_response_stream(meta, e.to_string()),
}; };
Box::pin(stream::iter(meta.user.to.into_iter()).then(move |creds| { Box::pin(
let encrypted_message = encrypted_message.clone(); meta.user
async move { .to
match encrypted_message.deliver_to(creds).await { .into_iter()
Ok(()) => Decision::Accept { .map(move |creds| {
reply: reply::okay_mail().convert(), let encrypted_message = encrypted_message.clone();
res: (), async move {
}, match encrypted_message.deliver_to(creds).await {
Err(e) => Decision::Reject { Ok(()) => Decision::Accept {
reply: Reply { reply: reply::okay_mail().convert(),
code: ReplyCode::POLICY_REASON, res: (),
ecode: None, },
text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())], Err(e) => Decision::Reject {
}, reply: Reply {
}, code: ReplyCode::POLICY_REASON,
} ecode: None,
} text: vec![smtp_message::MaybeUtf8::Utf8(e.to_string())],
})) },
},
}
}
})
.collect::<FuturesOrdered<_>>(),
)
} }
} }

View File

@ -214,9 +214,14 @@ impl FlagIndex {
}); });
} }
fn remove(&mut self, uid: ImapUid, flags: &Vec<Flag>) -> () { fn remove(&mut self, uid: ImapUid, flags: &Vec<Flag>) -> () {
flags.iter().for_each(|flag| { for flag in flags.iter() {
self.0.get_mut(flag).and_then(|set| set.remove(&uid)); if let Some(set) = self.0.get_mut(flag) {
}); set.remove(&uid);
if set.is_empty() {
self.0.remove(flag);
}
}
}
} }
pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> { pub fn get(&self, f: &Flag) -> Option<&OrdSet<ImapUid>> {

View File

@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use anyhow::{bail, Result}; use anyhow::{bail, Result};
use futures::{try_join, StreamExt}; use futures::try_join;
use log::*; use log::*;
use tokio::sync::watch; use tokio::sync::watch;