Merge pull request 'Implement imap-flow' (#34) from refactor/imap-flow into main

Reviewed-on: #34
This commit is contained in:
Quentin 2024-01-02 22:44:29 +00:00
commit b9a0c1e6ec
31 changed files with 2415 additions and 1647 deletions

1373
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,48 +7,67 @@ license = "AGPL-3.0"
description = "Encrypted mail storage over Garage"
[dependencies]
aws-config = { version = "1.1.1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.9.0"
anyhow = "1.0.28"
argon2 = "0.5"
async-trait = "0.1"
backtrace = "0.3"
base64 = "0.21"
clap = { version = "3.1.18", features = ["derive", "env"] }
duplexify = "1.1.0"
eml-codec = { git = "https://git.deuxfleurs.fr/Deuxfleurs/eml-codec.git", branch = "main" }
hex = "0.4"
futures = "0.3"
im = "15"
itertools = "0.10"
lazy_static = "1.4"
ldap3 = { version = "0.10", default-features = false, features = ["tls-rustls"] }
log = "0.4"
hyper-rustls = { version = "0.24", features = ["http2"] }
nix = { version = "0.27", features = ["signal"] }
serde = "1.0.137"
rand = "0.8.5"
rmp-serde = "0.15"
rpassword = "7.0"
sodiumoxide = "0.2"
# async runtime
tokio = { version = "1.18", default-features = false, features = ["rt", "rt-multi-thread", "io-util", "net", "time", "macros", "sync", "signal", "fs"] }
tokio-util = { version = "0.7", features = [ "compat" ] }
toml = "0.5"
zstd = { version = "0.9", default-features = false }
futures = "0.3"
# debug
log = "0.4"
backtrace = "0.3"
tracing-subscriber = "0.3"
tracing = "0.1"
tower = "0.4"
imap-codec = { git = "https://github.com/superboum/imap-codec.git", branch = "v0.5.x" }
# language extensions
lazy_static = "1.4"
duplexify = "1.1.0"
im = "15"
anyhow = "1.0.28"
async-trait = "0.1"
itertools = "0.10"
chrono = { version = "0.4", default-features = false, features = ["alloc"] }
# process related
nix = { version = "0.27", features = ["signal"] }
clap = { version = "3.1.18", features = ["derive", "env"] }
# serialization
serde = "1.0.137"
rmp-serde = "0.15"
toml = "0.5"
base64 = "0.21"
hex = "0.4"
zstd = { version = "0.9", default-features = false }
# cryptography & security
sodiumoxide = "0.2"
argon2 = "0.5"
rand = "0.8.5"
hyper-rustls = { version = "0.24", features = ["http2"] }
rpassword = "7.0"
# login
ldap3 = { version = "0.10", default-features = false, features = ["tls-rustls"] }
# storage
k2v-client = { git = "https://git.deuxfleurs.fr/Deuxfleurs/garage.git", tag = "v0.9.0" }
boitalettres = { git = "https://git.deuxfleurs.fr/quentin/boitalettres.git", branch = "expose-mydatetime" }
aws-config = { version = "1.1.1", features = ["behavior-version-latest"] }
aws-sdk-s3 = "1.9.0"
# email protocols
eml-codec = { git = "https://git.deuxfleurs.fr/Deuxfleurs/eml-codec.git", branch = "main" }
smtp-message = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
smtp-server = { git = "http://github.com/Alexis211/kannader", branch = "feature/lmtp" }
#k2v-client = { path = "../garage/src/k2v-client" }
imap-codec = { version = "1.0.0", features = ["quirk_crlf_relaxed", "bounded-static"] }
imap-flow = { git = "https://github.com/duesee/imap-flow.git", rev = "e45ce7bb6ab6bda3c71a0c7b05e9b558a5902e90" }
[dev-dependencies]
[patch.crates-io]
imap-types = { git = "https://github.com/duesee/imap-codec", branch = "v2" }
imap-codec = { git = "https://github.com/duesee/imap-codec", branch = "v2" }
[[test]]
name = "imap_features"
path = "tests/imap_features.rs"
harness = false

View file

@ -450,10 +450,7 @@ impl K2vWatch {
) {
let mut row = match Weak::upgrade(&self_weak) {
Some(this) => this.target.clone(),
None => {
error!("can't start loop");
return;
}
None => return,
};
while let Some(this) = Weak::upgrade(&self_weak) {

View file

@ -26,6 +26,7 @@ pub struct ProviderConfig {
#[derive(Serialize, Deserialize, Debug, Clone)]
#[serde(tag = "user_driver")]
pub enum UserManagement {
Demo,
Static(LoginStaticConfig),
Ldap(LoginLdapConfig),
}

View file

@ -1,174 +0,0 @@
Command::FirstLogin {
creds,
user_secrets,
} => {
let creds = make_storage_creds(creds);
let user_secrets = make_user_secrets(user_secrets);
println!("Please enter your password for key decryption.");
println!("If you are using LDAP login, this must be your LDAP password.");
println!("If you are using the static login provider, enter any password, and this will also become your password for local IMAP access.");
let password = rpassword::prompt_password("Enter password: ")?;
let password_confirm = rpassword::prompt_password("Confirm password: ")?;
if password != password_confirm {
bail!("Passwords don't match.");
}
CryptoKeys::init(&creds, &user_secrets, &password).await?;
println!("");
println!("Cryptographic key setup is complete.");
println!("");
println!("If you are using the static login provider, add the following section to your .toml configuration file:");
println!("");
dump_config(&password, &creds);
}
Command::InitializeLocalKeys { creds } => {
let creds = make_storage_creds(creds);
println!("Please enter a password for local IMAP access.");
println!("This password is not used for key decryption, your keys will be printed below (do not lose them!)");
println!(
"If you plan on using LDAP login, stop right here and use `first-login` instead"
);
let password = rpassword::prompt_password("Enter password: ")?;
let password_confirm = rpassword::prompt_password("Confirm password: ")?;
if password != password_confirm {
bail!("Passwords don't match.");
}
let master = gen_key();
let (_, secret) = gen_keypair();
let keys = CryptoKeys::init_without_password(&creds, &master, &secret).await?;
println!("");
println!("Cryptographic key setup is complete.");
println!("");
println!("Add the following section to your .toml configuration file:");
println!("");
dump_config(&password, &creds);
dump_keys(&keys);
}
Command::AddPassword {
creds,
user_secrets,
gen,
} => {
let creds = make_storage_creds(creds);
let user_secrets = make_user_secrets(user_secrets);
let existing_password =
rpassword::prompt_password("Enter existing password to decrypt keys: ")?;
let new_password = if gen {
let password = base64::encode_config(
&u128::to_be_bytes(thread_rng().gen())[..10],
base64::URL_SAFE_NO_PAD,
);
println!("Your new password: {}", password);
println!("Keep it safe!");
password
} else {
let password = rpassword::prompt_password("Enter new password: ")?;
let password_confirm = rpassword::prompt_password("Confirm new password: ")?;
if password != password_confirm {
bail!("Passwords don't match.");
}
password
};
let keys = CryptoKeys::open(&creds, &user_secrets, &existing_password).await?;
keys.add_password(&creds, &user_secrets, &new_password)
.await?;
println!("");
println!("New password added successfully.");
}
Command::DeletePassword {
creds,
user_secrets,
allow_delete_all,
} => {
let creds = make_storage_creds(creds);
let user_secrets = make_user_secrets(user_secrets);
let existing_password = rpassword::prompt_password("Enter password to delete: ")?;
let keys = match allow_delete_all {
true => Some(CryptoKeys::open(&creds, &user_secrets, &existing_password).await?),
false => None,
};
CryptoKeys::delete_password(&creds, &existing_password, allow_delete_all).await?;
println!("");
println!("Password was deleted successfully.");
if let Some(keys) = keys {
println!("As a reminder, here are your cryptographic keys:");
dump_keys(&keys);
}
}
Command::ShowKeys {
creds,
user_secrets,
} => {
let creds = make_storage_creds(creds);
let user_secrets = make_user_secrets(user_secrets);
let existing_password = rpassword::prompt_password("Enter key decryption password: ")?;
let keys = CryptoKeys::open(&creds, &user_secrets, &existing_password).await?;
dump_keys(&keys);
}
}
Ok(())
}
fn make_storage_creds(c: StorageCredsArgs) -> StorageCredentials {
let s3_region = Region {
name: c.region.clone(),
endpoint: c.s3_endpoint,
};
let k2v_region = Region {
name: c.region,
endpoint: c.k2v_endpoint,
};
StorageCredentials {
k2v_region,
s3_region,
aws_access_key_id: c.aws_access_key_id,
aws_secret_access_key: c.aws_secret_access_key,
bucket: c.bucket,
}
}
fn make_user_secrets(c: UserSecretsArgs) -> UserSecrets {
UserSecrets {
user_secret: c.user_secret,
alternate_user_secrets: c
.alternate_user_secrets
.split(',')
.map(|x| x.trim())
.filter(|x| !x.is_empty())
.map(|x| x.to_string())
.collect(),
}
}
fn dump_config(password: &str, creds: &StorageCredentials) {
println!("[login_static.users.<username>]");
println!(
"password = \"{}\"",
hash_password(password).expect("unable to hash password")
);
println!("aws_access_key_id = \"{}\"", creds.aws_access_key_id);
println!(
"aws_secret_access_key = \"{}\"",
creds.aws_secret_access_key
);
}
fn dump_keys(keys: &CryptoKeys) {
println!("master_key = \"{}\"", base64::encode(&keys.master));
println!("secret_key = \"{}\"", base64::encode(&keys.secret));
}

View file

@ -1,92 +1,77 @@
use anyhow::{Error, Result};
use boitalettres::proto::{res::body::Data as Body, Request, Response};
use imap_codec::types::command::CommandBody;
use imap_codec::types::core::AString;
use imap_codec::types::response::{Capability, Data, Status};
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody};
use imap_codec::imap_types::core::AString;
use imap_codec::imap_types::secret::Secret;
use crate::imap::command::anystate;
use crate::imap::flow;
use crate::imap::response::Response;
use crate::login::ArcLoginProvider;
use crate::mail::user::User;
//--- dispatching
pub struct AnonymousContext<'a> {
pub req: &'a Request,
pub login_provider: Option<&'a ArcLoginProvider>,
pub req: &'a Command<'static>,
pub login_provider: &'a ArcLoginProvider,
}
pub async fn dispatch(ctx: AnonymousContext<'_>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body {
CommandBody::Noop => Ok((Response::ok("Noop completed.")?, flow::Transition::None)),
CommandBody::Capability => ctx.capability().await,
CommandBody::Logout => ctx.logout().await,
pub async fn dispatch(ctx: AnonymousContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()),
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => anystate::logout(),
// Specific to anonymous context (3 commands)
CommandBody::Login { username, password } => ctx.login(username, password).await,
_ => Ok((Response::no("Command unavailable")?, flow::Transition::None)),
CommandBody::Authenticate { .. } => {
anystate::not_implemented(ctx.req.tag.clone(), "authenticate")
}
//StartTLS is not implemented for now, we will probably go full TLS.
// Collect other commands
_ => anystate::wrong_state(ctx.req.tag.clone()),
}
}
//--- Command controllers, private
impl<'a> AnonymousContext<'a> {
async fn capability(self) -> Result<(Response, flow::Transition)> {
let capabilities = vec![Capability::Imap4Rev1, Capability::Idle];
let res = Response::ok("Server capabilities")?.with_body(Data::Capability(capabilities));
Ok((res, flow::Transition::None))
}
async fn login(
self,
username: &AString,
password: &AString,
) -> Result<(Response, flow::Transition)> {
username: &AString<'a>,
password: &Secret<AString<'a>>,
) -> Result<(Response<'static>, flow::Transition)> {
let (u, p) = (
String::try_from(username.clone())?,
String::try_from(password.clone())?,
std::str::from_utf8(username.as_ref())?,
std::str::from_utf8(password.declassify().as_ref())?,
);
tracing::info!(user = %u, "command.login");
let login_provider = match &self.login_provider {
Some(lp) => lp,
None => {
return Ok((
Response::no("Login command not available (already logged in)")?,
flow::Transition::None,
))
}
};
let creds = match login_provider.login(&u, &p).await {
let creds = match self.login_provider.login(&u, &p).await {
Err(e) => {
tracing::debug!(error=%e, "authentication failed");
return Ok((
Response::no("Authentication failed")?,
Response::build()
.to_req(self.req)
.message("Authentication failed")
.no()?,
flow::Transition::None,
));
}
Ok(c) => c,
};
let user = User::new(u.clone(), creds).await?;
let user = User::new(u.to_string(), creds).await?;
tracing::info!(username=%u, "connected");
Ok((
Response::ok("Completed")?,
Response::build()
.to_req(self.req)
.message("Completed")
.ok()?,
flow::Transition::Authenticate(user),
))
}
// C: 10 logout
// S: * BYE Logging out
// S: 10 OK Logout completed.
async fn logout(self) -> Result<(Response, flow::Transition)> {
// @FIXME we should implement From<Vec<Status>> and From<Vec<ImapStatus>> in
// boitalettres/src/proto/res/body.rs
Ok((
Response::ok("Logout completed")?.with_body(vec![Body::Status(
Status::bye(None, "Logging out")
.map_err(|e| Error::msg(e).context("Unable to generate IMAP status"))?,
)]),
flow::Transition::Logout,
))
}
}

View file

@ -0,0 +1,52 @@
use anyhow::Result;
use imap_codec::imap_types::core::{NonEmptyVec, Tag};
use imap_codec::imap_types::response::{Capability, Data};
use crate::imap::flow;
use crate::imap::response::Response;
pub(crate) fn capability(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
let capabilities: NonEmptyVec<Capability> =
(vec![Capability::Imap4Rev1, Capability::Idle]).try_into()?;
let res = Response::build()
.tag(tag)
.message("Server capabilities")
.data(Data::Capability(capabilities))
.ok()?;
Ok((res, flow::Transition::None))
}
pub(crate) fn noop_nothing(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build().tag(tag).message("Noop completed.").ok()?,
flow::Transition::None,
))
}
pub(crate) fn logout() -> Result<(Response<'static>, flow::Transition)> {
Ok((Response::bye()?, flow::Transition::Logout))
}
pub(crate) fn not_implemented<'a>(
tag: Tag<'a>,
what: &str,
) -> Result<(Response<'a>, flow::Transition)> {
Ok((
Response::build()
.tag(tag)
.message(format!("Command not implemented {}", what))
.bad()?,
flow::Transition::None,
))
}
pub(crate) fn wrong_state(tag: Tag<'static>) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.tag(tag)
.message("Command not authorized in this state")
.bad()?,
flow::Transition::None,
))
}

View file

@ -2,37 +2,42 @@ use std::collections::BTreeMap;
use std::sync::Arc;
use anyhow::{anyhow, bail, Result};
use boitalettres::proto::res::body::Data as Body;
use boitalettres::proto::{Request, Response};
use imap_codec::types::command::{CommandBody, StatusAttribute};
use imap_codec::types::core::NonZeroBytes;
use imap_codec::types::datetime::MyDateTime;
use imap_codec::types::flag::{Flag, FlagNameAttribute};
use imap_codec::types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
use imap_codec::types::response::{Code, Data, StatusAttributeValue};
use imap_codec::imap_types::command::{Command, CommandBody};
use imap_codec::imap_types::core::{Atom, Literal, QuotedChar};
use imap_codec::imap_types::datetime::DateTime;
use imap_codec::imap_types::flag::{Flag, FlagNameAttribute};
use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
use imap_codec::imap_types::response::{Code, CodeOther, Data};
use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName};
use crate::imap::command::anonymous;
use crate::imap::command::{anystate, MailboxName};
use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView;
use crate::imap::response::Response;
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::*;
use crate::mail::user::{User, INBOX, MAILBOX_HIERARCHY_DELIMITER};
use crate::mail::user::{User, MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW};
use crate::mail::IMF;
pub struct AuthenticatedContext<'a> {
pub req: &'a Request,
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
}
pub async fn dispatch(ctx: AuthenticatedContext<'_>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body {
pub async fn dispatch<'a>(
ctx: AuthenticatedContext<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any state
CommandBody::Noop => anystate::noop_nothing(ctx.req.tag.clone()),
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => anystate::logout(),
// Specific to this state (11 commands)
CommandBody::Create { mailbox } => ctx.create(mailbox).await,
CommandBody::Delete { mailbox } => ctx.delete(mailbox).await,
CommandBody::Rename {
mailbox,
new_mailbox,
} => ctx.rename(mailbox, new_mailbox).await,
CommandBody::Rename { from, to } => ctx.rename(from, to).await,
CommandBody::Lsub {
reference,
mailbox_wildcard,
@ -43,8 +48,8 @@ pub async fn dispatch(ctx: AuthenticatedContext<'_>) -> Result<(Response, flow::
} => ctx.list(reference, mailbox_wildcard, false).await,
CommandBody::Status {
mailbox,
attributes,
} => ctx.status(mailbox, attributes).await,
item_names,
} => ctx.status(mailbox, item_names).await,
CommandBody::Subscribe { mailbox } => ctx.subscribe(mailbox).await,
CommandBody::Unsubscribe { mailbox } => ctx.unsubscribe(mailbox).await,
CommandBody::Select { mailbox } => ctx.select(mailbox).await,
@ -55,90 +60,148 @@ pub async fn dispatch(ctx: AuthenticatedContext<'_>) -> Result<(Response, flow::
date,
message,
} => ctx.append(mailbox, flags, date, message).await,
_ => {
let ctx = anonymous::AnonymousContext {
req: ctx.req,
login_provider: None,
};
anonymous::dispatch(ctx).await
}
// Collect other commands
_ => anystate::wrong_state(ctx.req.tag.clone()),
}
}
// --- PRIVATE ---
impl<'a> AuthenticatedContext<'a> {
async fn create(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
if name == INBOX {
return Ok((
Response::bad("Cannot create INBOX")?,
flow::Transition::None,
));
}
async fn create(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name = match mailbox {
MailboxCodec::Inbox => {
return Ok((
Response::build()
.to_req(self.req)
.message("Cannot create INBOX")
.bad()?,
flow::Transition::None,
));
}
MailboxCodec::Other(aname) => std::str::from_utf8(aname.as_ref())?,
};
match self.user.create_mailbox(&name).await {
Ok(()) => Ok((Response::ok("CREATE complete")?, flow::Transition::None)),
Err(e) => Ok((Response::no(&e.to_string())?, flow::Transition::None)),
Ok(()) => Ok((
Response::build()
.to_req(self.req)
.message("CREATE complete")
.ok()?,
flow::Transition::None,
)),
Err(e) => Ok((
Response::build()
.to_req(self.req)
.message(&e.to_string())
.no()?,
flow::Transition::None,
)),
}
}
async fn delete(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
async fn delete(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
match self.user.delete_mailbox(&name).await {
Ok(()) => Ok((Response::ok("DELETE complete")?, flow::Transition::None)),
Err(e) => Ok((Response::no(&e.to_string())?, flow::Transition::None)),
Ok(()) => Ok((
Response::build()
.to_req(self.req)
.message("DELETE complete")
.ok()?,
flow::Transition::None,
)),
Err(e) => Ok((
Response::build()
.to_req(self.req)
.message(e.to_string())
.no()?,
flow::Transition::None,
)),
}
}
async fn rename(
self,
mailbox: &MailboxCodec,
new_mailbox: &MailboxCodec,
) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
let new_name = String::try_from(new_mailbox.clone())?;
from: &MailboxCodec<'a>,
to: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(from).try_into()?;
let new_name: &str = MailboxName(to).try_into()?;
match self.user.rename_mailbox(&name, &new_name).await {
Ok(()) => Ok((Response::ok("RENAME complete")?, flow::Transition::None)),
Err(e) => Ok((Response::no(&e.to_string())?, flow::Transition::None)),
Ok(()) => Ok((
Response::build()
.to_req(self.req)
.message("RENAME complete")
.ok()?,
flow::Transition::None,
)),
Err(e) => Ok((
Response::build()
.to_req(self.req)
.message(e.to_string())
.no()?,
flow::Transition::None,
)),
}
}
async fn list(
self,
reference: &MailboxCodec,
mailbox_wildcard: &ListMailbox,
reference: &MailboxCodec<'a>,
mailbox_wildcard: &ListMailbox<'a>,
is_lsub: bool,
) -> Result<(Response, flow::Transition)> {
let reference = String::try_from(reference.clone())?;
) -> Result<(Response<'static>, flow::Transition)> {
let mbx_hier_delim: QuotedChar = QuotedChar::unvalidated(MBX_HIER_DELIM_RAW);
let reference: &str = MailboxName(reference).try_into()?;
if !reference.is_empty() {
return Ok((
Response::bad("References not supported")?,
Response::build()
.to_req(self.req)
.message("References not supported")
.bad()?,
flow::Transition::None,
));
}
let wildcard = String::try_from(mailbox_wildcard.clone())?;
// @FIXME would probably need a rewrite to better use the imap_codec library
let wildcard = match mailbox_wildcard {
ListMailbox::Token(v) => std::str::from_utf8(v.as_ref())?,
ListMailbox::String(v) => std::str::from_utf8(v.as_ref())?,
};
if wildcard.is_empty() {
if is_lsub {
return Ok((
Response::ok("LSUB complete")?.with_body(vec![Data::Lsub {
items: vec![],
delimiter: Some(MAILBOX_HIERARCHY_DELIMITER),
mailbox: "".try_into().unwrap(),
}]),
Response::build()
.to_req(self.req)
.message("LSUB complete")
.data(Data::Lsub {
items: vec![],
delimiter: Some(mbx_hier_delim),
mailbox: "".try_into().unwrap(),
})
.ok()?,
flow::Transition::None,
));
} else {
return Ok((
Response::ok("LIST complete")?.with_body(vec![Data::List {
items: vec![],
delimiter: Some(MAILBOX_HIERARCHY_DELIMITER),
mailbox: "".try_into().unwrap(),
}]),
Response::build()
.to_req(self.req)
.message("LIST complete")
.data(Data::List {
items: vec![],
delimiter: Some(mbx_hier_delim),
mailbox: "".try_into().unwrap(),
})
.ok()?,
flow::Transition::None,
));
}
@ -147,7 +210,7 @@ impl<'a> AuthenticatedContext<'a> {
let mailboxes = self.user.list_mailboxes().await?;
let mut vmailboxes = BTreeMap::new();
for mb in mailboxes.iter() {
for (i, _) in mb.match_indices(MAILBOX_HIERARCHY_DELIMITER) {
for (i, _) in mb.match_indices(MBX_HIER_DELIM_RAW) {
if i > 0 {
let smb = &mb[..i];
vmailboxes.entry(smb).or_insert(false);
@ -163,22 +226,22 @@ impl<'a> AuthenticatedContext<'a> {
.to_string()
.try_into()
.map_err(|_| anyhow!("invalid mailbox name"))?;
let mut items = vec![FlagNameAttribute::Extension(
"Subscribed".try_into().unwrap(),
)];
let mut items = vec![FlagNameAttribute::try_from(Atom::unvalidated(
"Subscribed",
))?];
if !*is_real {
items.push(FlagNameAttribute::Noselect);
}
if is_lsub {
ret.push(Data::Lsub {
items,
delimiter: Some(MAILBOX_HIERARCHY_DELIMITER),
delimiter: Some(mbx_hier_delim),
mailbox,
});
} else {
ret.push(Data::List {
items,
delimiter: Some(MAILBOX_HIERARCHY_DELIMITER),
delimiter: Some(mbx_hier_delim),
mailbox,
});
}
@ -190,79 +253,120 @@ impl<'a> AuthenticatedContext<'a> {
} else {
"LIST completed"
};
Ok((Response::ok(msg)?.with_body(ret), flow::Transition::None))
Ok((
Response::build()
.to_req(self.req)
.message(msg)
.many_data(ret)
.ok()?,
flow::Transition::None,
))
}
async fn status(
self,
mailbox: &MailboxCodec,
attributes: &[StatusAttribute],
) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
let mb_opt = self.user.open_mailbox(&name).await?;
mailbox: &MailboxCodec<'static>,
attributes: &[StatusDataItemName],
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(name).await?;
let mb = match mb_opt {
Some(mb) => mb,
None => {
return Ok((
Response::no("Mailbox does not exist")?,
Response::build()
.to_req(self.req)
.message("Mailbox does not exist")
.no()?,
flow::Transition::None,
))
}
};
let (view, _data) = MailboxView::new(mb).await?;
let view = MailboxView::new(mb).await;
let mut ret_attrs = vec![];
for attr in attributes.iter() {
ret_attrs.push(match attr {
StatusAttribute::Messages => StatusAttributeValue::Messages(view.exists()?),
StatusAttribute::Unseen => StatusAttributeValue::Unseen(view.unseen_count() as u32),
StatusAttribute::Recent => StatusAttributeValue::Recent(view.recent()?),
StatusAttribute::UidNext => StatusAttributeValue::UidNext(view.uidnext()),
StatusAttribute::UidValidity => {
StatusAttributeValue::UidValidity(view.uidvalidity())
StatusDataItemName::Messages => StatusDataItem::Messages(view.exists()?),
StatusDataItemName::Unseen => StatusDataItem::Unseen(view.unseen_count() as u32),
StatusDataItemName::Recent => StatusDataItem::Recent(view.recent()?),
StatusDataItemName::UidNext => StatusDataItem::UidNext(view.uidnext()),
StatusDataItemName::UidValidity => {
StatusDataItem::UidValidity(view.uidvalidity())
}
StatusDataItemName::Deleted => {
bail!("quota not implemented, can't return deleted elements waiting for EXPUNGE");
},
StatusDataItemName::DeletedStorage => {
bail!("quota not implemented, can't return freed storage after EXPUNGE will be run");
},
});
}
let data = vec![Body::Data(Data::Status {
let data = Data::Status {
mailbox: mailbox.clone(),
attributes: ret_attrs,
})];
items: ret_attrs.into(),
};
Ok((
Response::ok("STATUS completed")?.with_body(data),
Response::build()
.to_req(self.req)
.message("STATUS completed")
.data(data)
.ok()?,
flow::Transition::None,
))
}
async fn subscribe(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
async fn subscribe(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
if self.user.has_mailbox(&name).await? {
Ok((Response::ok("SUBSCRIBE complete")?, flow::Transition::None))
Ok((
Response::build()
.to_req(self.req)
.message("SUBSCRIBE complete")
.ok()?,
flow::Transition::None,
))
} else {
Ok((
Response::bad(&format!("Mailbox {} does not exist", name))?,
Response::build()
.to_req(self.req)
.message(format!("Mailbox {} does not exist", name))
.bad()?,
flow::Transition::None,
))
}
}
async fn unsubscribe(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
async fn unsubscribe(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
if self.user.has_mailbox(&name).await? {
Ok((
Response::bad(&format!(
"Cannot unsubscribe from mailbox {}: not supported by Aerogramme",
name
))?,
Response::build()
.to_req(self.req)
.message(format!(
"Cannot unsubscribe from mailbox {}: not supported by Aerogramme",
name
))
.bad()?,
flow::Transition::None,
))
} else {
Ok((
Response::bad(&format!("Mailbox {} does not exist", name))?,
Response::build()
.to_req(self.req)
.message(format!("Mailbox {} does not exist", name))
.no()?,
flow::Transition::None,
))
}
@ -301,83 +405,113 @@ impl<'a> AuthenticatedContext<'a> {
* TRACE END ---
*/
async fn select(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
async fn select(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
let mb = match mb_opt {
Some(mb) => mb,
None => {
return Ok((
Response::no("Mailbox does not exist")?,
Response::build()
.to_req(self.req)
.message("Mailbox does not exist")
.no()?,
flow::Transition::None,
))
}
};
tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.selected");
let (mb, data) = MailboxView::new(mb).await?;
let mb = MailboxView::new(mb).await;
let data = mb.summary()?;
Ok((
Response::ok("Select completed")?
.with_extra_code(Code::ReadWrite)
.with_body(data),
Response::build()
.message("Select completed")
.to_req(self.req)
.code(Code::ReadWrite)
.set_body(data)
.ok()?,
flow::Transition::Select(mb),
))
}
async fn examine(self, mailbox: &MailboxCodec) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
async fn examine(
self,
mailbox: &MailboxCodec<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
let mb = match mb_opt {
Some(mb) => mb,
None => {
return Ok((
Response::no("Mailbox does not exist")?,
Response::build()
.to_req(self.req)
.message("Mailbox does not exist")
.no()?,
flow::Transition::None,
))
}
};
tracing::info!(username=%self.user.username, mailbox=%name, "mailbox.examined");
let (mb, data) = MailboxView::new(mb).await?;
let mb = MailboxView::new(mb).await;
let data = mb.summary()?;
Ok((
Response::ok("Examine completed")?
.with_extra_code(Code::ReadOnly)
.with_body(data),
Response::build()
.to_req(self.req)
.message("Examine completed")
.code(Code::ReadOnly)
.set_body(data)
.ok()?,
flow::Transition::Examine(mb),
))
}
async fn append(
self,
mailbox: &MailboxCodec,
flags: &[Flag],
date: &Option<MyDateTime>,
message: &NonZeroBytes,
) -> Result<(Response, flow::Transition)> {
mailbox: &MailboxCodec<'a>,
flags: &[Flag<'a>],
date: &Option<DateTime>,
message: &Literal<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
let append_tag = self.req.tag.clone();
match self.append_internal(mailbox, flags, date, message).await {
Ok((_mb, uidvalidity, uid)) => Ok((
Response::ok("APPEND completed")?.with_extra_code(Code::Other(
"APPENDUID".try_into().unwrap(),
Some(format!("{} {}", uidvalidity, uid)),
)),
Response::build()
.tag(append_tag)
.message("APPEND completed")
.code(Code::Other(CodeOther::unvalidated(
format!("APPENDUID {} {}", uidvalidity, uid).into_bytes(),
)))
.ok()?,
flow::Transition::None,
)),
Err(e) => Ok((
Response::build()
.tag(append_tag)
.message(e.to_string())
.no()?,
flow::Transition::None,
)),
Err(e) => Ok((Response::no(&e.to_string())?, flow::Transition::None)),
}
}
pub(crate) async fn append_internal(
self,
mailbox: &MailboxCodec,
flags: &[Flag],
date: &Option<MyDateTime>,
message: &NonZeroBytes,
mailbox: &MailboxCodec<'a>,
flags: &[Flag<'a>],
date: &Option<DateTime>,
message: &Literal<'a>,
) -> Result<(Arc<Mailbox>, ImapUidvalidity, ImapUidvalidity)> {
let name = String::try_from(mailbox.clone())?;
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
let mb = match mb_opt {
@ -389,8 +523,8 @@ impl<'a> AuthenticatedContext<'a> {
bail!("Cannot set date when appending message");
}
let msg = IMF::try_from(message.as_slice())
.map_err(|_| anyhow!("Could not parse e-mail message"))?;
let msg =
IMF::try_from(message.data()).map_err(|_| anyhow!("Could not parse e-mail message"))?;
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
// TODO: filter allowed flags? ping @Quentin
@ -422,7 +556,7 @@ fn matches_wildcard(wildcard: &str, name: &str) -> bool {
&& j > 0
&& matches[i - 1][j]
&& (wildcard[j - 1] == '*'
|| (wildcard[j - 1] == '%' && name[i - 1] != MAILBOX_HIERARCHY_DELIMITER)));
|| (wildcard[j - 1] == '%' && name[i - 1] != MBX_HIER_DELIM_RAW)));
}
}

View file

@ -1,56 +1,60 @@
use std::sync::Arc;
use anyhow::Result;
use boitalettres::proto::Request;
use boitalettres::proto::Response;
use imap_codec::types::command::{CommandBody, SearchKey};
use imap_codec::types::core::{Charset, NonZeroBytes};
use imap_codec::types::datetime::MyDateTime;
use imap_codec::types::fetch_attributes::MacroOrFetchAttributes;
use imap_codec::types::flag::Flag;
use imap_codec::types::mailbox::Mailbox as MailboxCodec;
use imap_codec::types::response::Code;
use imap_codec::types::sequence::SequenceSet;
use imap_codec::imap_types::command::{Command, CommandBody};
use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::command::authenticated;
use crate::imap::command::{anystate, authenticated};
use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView;
use crate::imap::response::Response;
use crate::mail::user::User;
pub struct ExaminedContext<'a> {
pub req: &'a Request,
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
}
pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body {
// CLOSE in examined state is not the same as in selected state
// (in selected state it also does an EXPUNGE, here it doesn't)
pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => anystate::logout(),
// Specific to the EXAMINE state (specialization of the SELECTED state)
// ~3 commands -> close, fetch, search + NOOP
CommandBody::Close => ctx.close().await,
CommandBody::Fetch {
sequence_set,
attributes,
macro_or_item_names,
uid,
} => ctx.fetch(sequence_set, attributes, uid).await,
} => ctx.fetch(sequence_set, macro_or_item_names, uid).await,
CommandBody::Search {
charset,
criteria,
uid,
} => ctx.search(charset, criteria, uid).await,
CommandBody::Noop => ctx.noop().await,
CommandBody::Append {
mailbox,
flags,
date,
message,
} => ctx.append(mailbox, flags, date, message).await,
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
CommandBody::Expunge { .. } | CommandBody::Store { .. } => Ok((
Response::build()
.to_req(ctx.req)
.message("Forbidden command: can't write in read-only mode (EXAMINE)")
.bad()?,
flow::Transition::None,
)),
// In examined mode, we fallback to authenticated when needed
_ => {
let ctx = authenticated::AuthenticatedContext {
authenticated::dispatch(authenticated::AuthenticatedContext {
req: ctx.req,
user: ctx.user,
};
authenticated::dispatch(ctx).await
})
.await
}
}
}
@ -58,71 +62,69 @@ pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response, flow::Trans
// --- PRIVATE ---
impl<'a> ExaminedContext<'a> {
async fn close(self) -> Result<(Response, flow::Transition)> {
Ok((Response::ok("CLOSE completed")?, flow::Transition::Unselect))
/// CLOSE in examined state is not the same as in selected state
/// (in selected state it also does an EXPUNGE, here it doesn't)
async fn close(self) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
.message("CLOSE completed")
.ok()?,
flow::Transition::Unselect,
))
}
pub async fn fetch(
self,
sequence_set: &SequenceSet,
attributes: &MacroOrFetchAttributes,
attributes: &'a MacroOrMessageDataItemNames<'static>,
uid: &bool,
) -> Result<(Response, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await {
Ok(resp) => Ok((
Response::ok("FETCH completed")?.with_body(resp),
Response::build()
.to_req(self.req)
.message("FETCH completed")
.set_body(resp)
.ok()?,
flow::Transition::None,
)),
Err(e) => Ok((
Response::build()
.to_req(self.req)
.message(e.to_string())
.no()?,
flow::Transition::None,
)),
Err(e) => Ok((Response::no(&e.to_string())?, flow::Transition::None)),
}
}
pub async fn search(
self,
_charset: &Option<Charset>,
_criteria: &SearchKey,
_charset: &Option<Charset<'a>>,
_criteria: &SearchKey<'a>,
_uid: &bool,
) -> Result<(Response, flow::Transition)> {
Ok((Response::bad("Not implemented")?, flow::Transition::None))
}
pub async fn noop(self) -> Result<(Response, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::ok("NOOP completed.")?.with_body(updates),
Response::build()
.to_req(self.req)
.message("Not implemented")
.bad()?,
flow::Transition::None,
))
}
async fn append(
self,
mailbox: &MailboxCodec,
flags: &[Flag],
date: &Option<MyDateTime>,
message: &NonZeroBytes,
) -> Result<(Response, flow::Transition)> {
let ctx2 = authenticated::AuthenticatedContext {
req: self.req,
user: self.user,
};
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
match ctx2.append_internal(mailbox, flags, date, message).await {
Ok((mb, uidvalidity, uid)) => {
let resp = Response::ok("APPEND completed")?.with_extra_code(Code::Other(
"APPENDUID".try_into().unwrap(),
Some(format!("{} {}", uidvalidity, uid)),
));
if Arc::ptr_eq(&mb, &self.mailbox.mailbox) {
let data = self.mailbox.update().await?;
Ok((resp.with_body(data), flow::Transition::None))
} else {
Ok((resp, flow::Transition::None))
}
}
Err(e) => Ok((Response::no(&e.to_string())?, flow::Transition::None)),
}
let updates = self.mailbox.update().await?;
Ok((
Response::build()
.to_req(self.req)
.message("NOOP completed.")
.set_body(updates)
.ok()?,
flow::Transition::None,
))
}
}

View file

@ -1,4 +1,21 @@
pub mod anonymous;
pub mod anystate;
pub mod authenticated;
pub mod examined;
pub mod selected;
use crate::mail::user::INBOX;
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
/// Convert an IMAP mailbox name/identifier representation
/// to an utf-8 string that is used internally in Aerogramme
struct MailboxName<'a>(&'a MailboxCodec<'a>);
impl<'a> TryInto<&'a str> for MailboxName<'a> {
type Error = std::str::Utf8Error;
fn try_into(self) -> Result<&'a str, Self::Error> {
match self.0 {
MailboxCodec::Inbox => Ok(INBOX),
MailboxCodec::Other(aname) => Ok(std::str::from_utf8(aname.as_ref())?),
}
}
}

View file

@ -1,31 +1,50 @@
use std::sync::Arc;
use anyhow::Result;
use boitalettres::proto::Request;
use boitalettres::proto::Response;
use imap_codec::types::command::CommandBody;
use imap_codec::types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::types::mailbox::Mailbox as MailboxCodec;
use imap_codec::types::response::Code;
use imap_codec::types::sequence::SequenceSet;
use imap_codec::imap_types::command::{Command, CommandBody};
use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
use imap_codec::imap_types::response::{Code, CodeOther};
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::command::examined;
use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow;
use crate::imap::mailbox_view::MailboxView;
use crate::imap::response::Response;
use crate::mail::user::User;
pub struct SelectedContext<'a> {
pub req: &'a Request,
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
}
pub async fn dispatch(ctx: SelectedContext<'_>) -> Result<(Response, flow::Transition)> {
match &ctx.req.command.body {
// Only write commands here, read commands are handled in
// `examined.rs`
pub async fn dispatch<'a>(
ctx: SelectedContext<'a>,
) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => anystate::capability(ctx.req.tag.clone()),
CommandBody::Logout => anystate::logout(),
// Specific to this state (7 commands + NOOP)
CommandBody::Close => ctx.close().await,
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
CommandBody::Fetch {
sequence_set,
macro_or_item_names,
uid,
} => ctx.fetch(sequence_set, macro_or_item_names, uid).await,
CommandBody::Search {
charset,
criteria,
uid,
} => ctx.search(charset, criteria, uid).await,
CommandBody::Expunge => ctx.expunge().await,
CommandBody::Store {
sequence_set,
@ -39,13 +58,14 @@ pub async fn dispatch(ctx: SelectedContext<'_>) -> Result<(Response, flow::Trans
mailbox,
uid,
} => ctx.copy(sequence_set, mailbox, uid).await,
// In selected mode, we fallback to authenticated when needed
_ => {
let ctx = examined::ExaminedContext {
authenticated::dispatch(authenticated::AuthenticatedContext {
req: ctx.req,
user: ctx.user,
mailbox: ctx.mailbox,
};
examined::dispatch(ctx).await
})
.await
}
}
}
@ -53,18 +73,81 @@ pub async fn dispatch(ctx: SelectedContext<'_>) -> Result<(Response, flow::Trans
// --- PRIVATE ---
impl<'a> SelectedContext<'a> {
async fn close(self) -> Result<(Response, flow::Transition)> {
async fn close(self) -> Result<(Response<'static>, flow::Transition)> {
// We expunge messages,
// but we don't send the untagged EXPUNGE responses
let tag = self.req.tag.clone();
self.expunge().await?;
Ok((Response::ok("CLOSE completed")?, flow::Transition::Unselect))
Ok((
Response::build().tag(tag).message("CLOSE completed").ok()?,
flow::Transition::Unselect,
))
}
async fn expunge(self) -> Result<(Response, flow::Transition)> {
pub async fn fetch(
self,
sequence_set: &SequenceSet,
attributes: &'a MacroOrMessageDataItemNames<'static>,
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
match self.mailbox.fetch(sequence_set, attributes, uid).await {
Ok(resp) => Ok((
Response::build()
.to_req(self.req)
.message("FETCH completed")
.set_body(resp)
.ok()?,
flow::Transition::None,
)),
Err(e) => Ok((
Response::build()
.to_req(self.req)
.message(e.to_string())
.no()?,
flow::Transition::None,
)),
}
}
pub async fn search(
self,
_charset: &Option<Charset<'a>>,
_criteria: &SearchKey<'a>,
_uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
.message("Not implemented")
.bad()?,
flow::Transition::None,
))
}
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.mailbox.force_sync().await?;
let updates = self.mailbox.update().await?;
Ok((
Response::build()
.to_req(self.req)
.message("NOOP completed.")
.set_body(updates)
.ok()?,
flow::Transition::None,
))
}
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
let tag = self.req.tag.clone();
let data = self.mailbox.expunge().await?;
Ok((
Response::ok("EXPUNGE completed")?.with_body(data),
Response::build()
.tag(tag)
.message("EXPUNGE completed")
.set_body(data)
.ok()?,
flow::Transition::None,
))
}
@ -74,16 +157,20 @@ impl<'a> SelectedContext<'a> {
sequence_set: &SequenceSet,
kind: &StoreType,
response: &StoreResponse,
flags: &[Flag],
flags: &[Flag<'a>],
uid: &bool,
) -> Result<(Response, flow::Transition)> {
) -> Result<(Response<'static>, flow::Transition)> {
let data = self
.mailbox
.store(sequence_set, kind, response, flags, uid)
.await?;
Ok((
Response::ok("STORE completed")?.with_body(data),
Response::build()
.to_req(self.req)
.message("STORE completed")
.set_body(data)
.ok()?,
flow::Transition::None,
))
}
@ -91,18 +178,21 @@ impl<'a> SelectedContext<'a> {
async fn copy(
self,
sequence_set: &SequenceSet,
mailbox: &MailboxCodec,
mailbox: &MailboxCodec<'a>,
uid: &bool,
) -> Result<(Response, flow::Transition)> {
let name = String::try_from(mailbox.clone())?;
) -> Result<(Response<'static>, flow::Transition)> {
let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?;
let mb = match mb_opt {
Some(mb) => mb,
None => {
return Ok((
Response::no("Destination mailbox does not exist")?
.with_extra_code(Code::TryCreate),
Response::build()
.to_req(self.req)
.message("Destination mailbox does not exist")
.code(Code::TryCreate)
.no()?,
flow::Transition::None,
))
}
@ -126,10 +216,13 @@ impl<'a> SelectedContext<'a> {
);
Ok((
Response::ok("COPY completed")?.with_extra_code(Code::Other(
"COPYUID".try_into().unwrap(),
Some(copyuid_str),
)),
Response::build()
.to_req(self.req)
.message("COPY completed")
.code(Code::Other(CodeOther::unvalidated(
format!("COPYUID {}", copyuid_str).into_bytes(),
)))
.ok()?,
flow::Transition::None,
))
}

View file

@ -37,23 +37,27 @@ pub enum Transition {
// See RFC3501 section 3.
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
impl State {
pub fn apply(self, tr: Transition) -> Result<Self, Error> {
match (self, tr) {
(s, Transition::None) => Ok(s),
(State::NotAuthenticated, Transition::Authenticate(u)) => Ok(State::Authenticated(u)),
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
let new_state = match (&self, tr) {
(_s, Transition::None) => return Ok(()),
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Select(m),
) => Ok(State::Selected(u, m)),
) => State::Selected(u.clone(), m),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Examine(m),
) => Ok(State::Examined(u, m)),
) => State::Examined(u.clone(), m),
(State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
Ok(State::Authenticated(u))
State::Authenticated(u.clone())
}
(_, Transition::Logout) => Ok(State::Logout),
_ => Err(Error::ForbiddenTransition),
}
(_, Transition::Logout) => State::Logout,
_ => return Err(Error::ForbiddenTransition),
};
*self = new_state;
Ok(())
}
}

View file

@ -4,22 +4,20 @@ use std::num::NonZeroU32;
use std::sync::Arc;
use anyhow::{anyhow, bail, Error, Result};
use boitalettres::proto::res::body::Data as Body;
use chrono::{Offset, TimeZone, Utc};
use futures::stream::{FuturesOrdered, StreamExt};
use imap_codec::types::address::Address;
use imap_codec::types::body::{BasicFields, Body as FetchBody, BodyStructure, SpecificFields};
use imap_codec::types::core::{AString, Atom, IString, NString};
use imap_codec::types::datetime::MyDateTime;
use imap_codec::types::envelope::Envelope;
use imap_codec::types::fetch_attributes::{
FetchAttribute, MacroOrFetchAttributes, Section as FetchSection,
use imap_codec::imap_types::body::{BasicFields, Body as FetchBody, BodyStructure, SpecificFields};
use imap_codec::imap_types::core::{AString, Atom, IString, NString, NonEmptyVec};
use imap_codec::imap_types::datetime::DateTime;
use imap_codec::imap_types::envelope::{Address, Envelope};
use imap_codec::imap_types::fetch::{
MacroOrMessageDataItemNames, MessageDataItem, MessageDataItemName, Section as FetchSection,
};
use imap_codec::types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::types::response::{Code, Data, MessageAttribute, Status};
use imap_codec::types::sequence::{self, SequenceSet};
use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType};
use imap_codec::imap_types::response::{Code, Data, Status};
use imap_codec::imap_types::sequence::{self, SequenceSet};
use eml_codec::{
header, imf, mime,
@ -28,6 +26,7 @@ use eml_codec::{
};
use crate::cryptoblob::Key;
use crate::imap::response::Body;
use crate::mail::mailbox::{MailMeta, Mailbox};
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, UidIndex};
use crate::mail::unique_ident::UniqueIdent;
@ -77,19 +76,31 @@ impl<'a> FetchedMail<'a> {
}
pub struct AttributesProxy {
attrs: Vec<FetchAttribute>,
attrs: Vec<MessageDataItemName<'static>>,
}
impl AttributesProxy {
fn new(attrs: &MacroOrFetchAttributes, is_uid_fetch: bool) -> Self {
fn new(attrs: &MacroOrMessageDataItemNames<'static>, is_uid_fetch: bool) -> Self {
// Expand macros
let mut fetch_attrs = match attrs {
MacroOrFetchAttributes::Macro(m) => m.expand(),
MacroOrFetchAttributes::FetchAttributes(a) => a.clone(),
MacroOrMessageDataItemNames::Macro(m) => {
use imap_codec::imap_types::fetch::Macro;
use MessageDataItemName::*;
match m {
Macro::All => vec![Flags, InternalDate, Rfc822Size, Envelope],
Macro::Fast => vec![Flags, InternalDate, Rfc822Size],
Macro::Full => vec![Flags, InternalDate, Rfc822Size, Envelope, Body],
_ => {
tracing::error!("unimplemented macro");
vec![]
}
}
}
MacroOrMessageDataItemNames::MessageDataItemNames(a) => a.clone(),
};
// Handle uids
if is_uid_fetch && !fetch_attrs.contains(&FetchAttribute::Uid) {
fetch_attrs.push(FetchAttribute::Uid);
if is_uid_fetch && !fetch_attrs.contains(&MessageDataItemName::Uid) {
fetch_attrs.push(MessageDataItemName::Uid);
}
Self { attrs: fetch_attrs }
@ -99,11 +110,11 @@ impl AttributesProxy {
self.attrs.iter().any(|x| {
matches!(
x,
FetchAttribute::Body
| FetchAttribute::BodyExt { .. }
| FetchAttribute::Rfc822
| FetchAttribute::Rfc822Text
| FetchAttribute::BodyStructure
MessageDataItemName::Body
| MessageDataItemName::BodyExt { .. }
| MessageDataItemName::Rfc822
| MessageDataItemName::Rfc822Text
| MessageDataItemName::BodyStructure
)
})
}
@ -127,16 +138,20 @@ pub struct MailView<'a> {
meta: &'a MailMeta,
flags: &'a Vec<String>,
content: FetchedMail<'a>,
add_seen: bool,
}
enum SeenFlag {
DoNothing,
MustAdd,
}
impl<'a> MailView<'a> {
fn uid(&self) -> MessageAttribute {
MessageAttribute::Uid(self.ids.uid)
fn uid(&self) -> MessageDataItem<'static> {
MessageDataItem::Uid(self.ids.uid.clone())
}
fn flags(&self) -> MessageAttribute {
MessageAttribute::Flags(
fn flags(&self) -> MessageDataItem<'static> {
MessageDataItem::Flags(
self.flags
.iter()
.filter_map(|f| string_to_flag(f))
@ -144,12 +159,12 @@ impl<'a> MailView<'a> {
)
}
fn rfc_822_size(&self) -> MessageAttribute {
MessageAttribute::Rfc822Size(self.meta.rfc822_size as u32)
fn rfc_822_size(&self) -> MessageDataItem<'static> {
MessageDataItem::Rfc822Size(self.meta.rfc822_size as u32)
}
fn rfc_822_header(&self) -> MessageAttribute {
MessageAttribute::Rfc822Header(NString(
fn rfc_822_header(&self) -> MessageDataItem<'static> {
MessageDataItem::Rfc822Header(NString(
self.meta
.headers
.to_vec()
@ -159,41 +174,42 @@ impl<'a> MailView<'a> {
))
}
fn rfc_822_text(&self) -> Result<MessageAttribute> {
Ok(MessageAttribute::Rfc822Text(NString(
fn rfc_822_text(&self) -> Result<MessageDataItem<'static>> {
Ok(MessageDataItem::Rfc822Text(NString(
self.content
.as_full()?
.raw_body
.to_vec()
.try_into()
.ok()
.map(IString::Literal),
)))
}
fn rfc822(&self) -> Result<MessageAttribute> {
Ok(MessageAttribute::Rfc822(NString(
fn rfc822(&self) -> Result<MessageDataItem<'static>> {
Ok(MessageDataItem::Rfc822(NString(
self.content
.as_full()?
.raw_body
.clone()
.raw_part
.to_vec()
.try_into()
.ok()
.map(IString::Literal),
)))
}
fn envelope(&self) -> MessageAttribute {
MessageAttribute::Envelope(message_envelope(self.content.imf()))
fn envelope(&self) -> MessageDataItem<'static> {
MessageDataItem::Envelope(message_envelope(self.content.imf().clone()))
}
fn body(&self) -> Result<MessageAttribute> {
Ok(MessageAttribute::Body(build_imap_email_struct(
fn body(&self) -> Result<MessageDataItem<'static>> {
Ok(MessageDataItem::Body(build_imap_email_struct(
self.content.as_full()?.child.as_ref(),
)?))
}
fn body_structure(&self) -> Result<MessageAttribute> {
Ok(MessageAttribute::Body(build_imap_email_struct(
fn body_structure(&self) -> Result<MessageDataItem<'static>> {
Ok(MessageDataItem::Body(build_imap_email_struct(
self.content.as_full()?.child.as_ref(),
)?))
}
@ -202,12 +218,14 @@ impl<'a> MailView<'a> {
/// peek does not implicitly set the \Seen flag
/// eg. BODY[HEADER.FIELDS (DATE FROM)]
/// eg. BODY[]<0.2048>
fn body_ext(
&mut self,
section: &Option<FetchSection>,
fn body_ext<'b>(
&self,
section: &Option<FetchSection<'b>>,
partial: &Option<(u32, NonZeroU32)>,
peek: &bool,
) -> Result<MessageAttribute> {
) -> Result<(MessageDataItem<'b>, SeenFlag)> {
let mut seen = SeenFlag::DoNothing;
// Extract message section
let text = get_message_section(self.content.as_anypart()?, section)?;
@ -215,7 +233,7 @@ impl<'a> MailView<'a> {
if !peek && !self.flags.iter().any(|x| *x == seen_flag) {
// Add \Seen flag
//self.mailbox.add_flags(uuid, &[seen_flag]).await?;
self.add_seen = true;
seen = SeenFlag::MustAdd;
}
// Handle <<partial>> which cut the message bytes
@ -223,49 +241,60 @@ impl<'a> MailView<'a> {
let data = NString(text.to_vec().try_into().ok().map(IString::Literal));
return Ok(MessageAttribute::BodyExt {
section: section.clone(),
origin,
data,
});
return Ok((
MessageDataItem::BodyExt {
section: section.as_ref().map(|fs| fs.clone()),
origin,
data,
},
seen,
));
}
fn internal_date(&self) -> Result<MessageAttribute> {
fn internal_date(&self) -> Result<MessageDataItem<'static>> {
let dt = Utc
.fix()
.timestamp_opt(i64::try_from(self.meta.internaldate / 1000)?, 0)
.earliest()
.ok_or(anyhow!("Unable to parse internal date"))?;
Ok(MessageAttribute::InternalDate(MyDateTime(dt)))
Ok(MessageDataItem::InternalDate(DateTime::unvalidated(dt)))
}
fn filter(&mut self, ap: &AttributesProxy) -> Result<Body> {
fn filter<'b>(&self, ap: &AttributesProxy) -> Result<(Body<'static>, SeenFlag)> {
let mut seen = SeenFlag::DoNothing;
let res_attrs = ap
.attrs
.iter()
.map(|attr| match attr {
FetchAttribute::Uid => Ok(self.uid()),
FetchAttribute::Flags => Ok(self.flags()),
FetchAttribute::Rfc822Size => Ok(self.rfc_822_size()),
FetchAttribute::Rfc822Header => Ok(self.rfc_822_header()),
FetchAttribute::Rfc822Text => self.rfc_822_text(),
FetchAttribute::Rfc822 => self.rfc822(),
FetchAttribute::Envelope => Ok(self.envelope()),
FetchAttribute::Body => self.body(),
FetchAttribute::BodyStructure => self.body_structure(),
FetchAttribute::BodyExt {
MessageDataItemName::Uid => Ok(self.uid()),
MessageDataItemName::Flags => Ok(self.flags()),
MessageDataItemName::Rfc822Size => Ok(self.rfc_822_size()),
MessageDataItemName::Rfc822Header => Ok(self.rfc_822_header()),
MessageDataItemName::Rfc822Text => self.rfc_822_text(),
MessageDataItemName::Rfc822 => self.rfc822(),
MessageDataItemName::Envelope => Ok(self.envelope()),
MessageDataItemName::Body => self.body(),
MessageDataItemName::BodyStructure => self.body_structure(),
MessageDataItemName::BodyExt {
section,
partial,
peek,
} => self.body_ext(section, partial, peek),
FetchAttribute::InternalDate => self.internal_date(),
} => {
let (body, has_seen) = self.body_ext(section, partial, peek)?;
seen = has_seen;
Ok(body)
}
MessageDataItemName::InternalDate => self.internal_date(),
})
.collect::<Result<Vec<_>, _>>()?;
Ok(Body::Data(Data::Fetch {
seq_or_uid: self.ids.i,
attributes: res_attrs,
}))
Ok((
Body::Data(Data::Fetch {
seq: self.ids.i,
items: res_attrs.try_into()?,
}),
seen,
))
}
}
@ -376,7 +405,6 @@ impl<'a> MailSelectionBuilder<'a> {
meta,
flags,
content,
add_seen: false,
})
.collect())
}
@ -396,35 +424,26 @@ pub struct MailboxView {
impl MailboxView {
/// Creates a new IMAP view into a mailbox.
/// Generates the necessary IMAP messages so that the client
/// has a satisfactory summary of the current mailbox's state.
/// These are the messages that are sent in response to a SELECT command.
pub async fn new(mailbox: Arc<Mailbox>) -> Result<(Self, Vec<Body>)> {
pub async fn new(mailbox: Arc<Mailbox>) -> Self {
let state = mailbox.current_uid_index().await;
let new_view = Self {
Self {
mailbox,
known_state: state,
};
let mut data = Vec::<Body>::new();
data.push(new_view.exists_status()?);
data.push(new_view.recent_status()?);
data.extend(new_view.flags_status()?.into_iter());
data.push(new_view.uidvalidity_status()?);
data.push(new_view.uidnext_status()?);
Ok((new_view, data))
}
}
/// Create an updated view, useful to make a diff
/// between what the client knows and new stuff
/// Produces a set of IMAP responses describing the change between
/// what the client knows and what is actually in the mailbox.
/// This does NOT trigger a sync, it bases itself on what is currently
/// loaded in RAM by Bayou.
pub async fn update(&mut self) -> Result<Vec<Body>> {
let new_view = MailboxView {
mailbox: self.mailbox.clone(),
known_state: self.mailbox.current_uid_index().await,
pub async fn update(&mut self) -> Result<Vec<Body<'static>>> {
let old_view: &mut Self = self;
let new_view = Self {
mailbox: old_view.mailbox.clone(),
known_state: old_view.mailbox.current_uid_index().await,
};
let mut data = Vec::<Body>::new();
@ -446,7 +465,7 @@ impl MailboxView {
// - notify client of expunged mails
let mut n_expunge = 0;
for (i, (_uid, uuid)) in self.known_state.idx_by_uid.iter().enumerate() {
for (i, (_uid, uuid)) in old_view.known_state.idx_by_uid.iter().enumerate() {
if !new_view.known_state.table.contains_key(uuid) {
data.push(Body::Data(Data::Expunge(
NonZeroU32::try_from((i + 1 - n_expunge) as u32).unwrap(),
@ -456,49 +475,63 @@ impl MailboxView {
}
// - if new mails arrived, notify client of number of existing mails
if new_view.known_state.table.len() != self.known_state.table.len() - n_expunge
|| new_view.known_state.uidvalidity != self.known_state.uidvalidity
if new_view.known_state.table.len() != old_view.known_state.table.len() - n_expunge
|| new_view.known_state.uidvalidity != old_view.known_state.uidvalidity
{
data.push(new_view.exists_status()?);
}
if new_view.known_state.uidvalidity != self.known_state.uidvalidity {
if new_view.known_state.uidvalidity != old_view.known_state.uidvalidity {
// TODO: do we want to push less/more info than this?
data.push(new_view.uidvalidity_status()?);
data.push(new_view.uidnext_status()?);
} else {
// - if flags changed for existing mails, tell client
for (i, (_uid, uuid)) in new_view.known_state.idx_by_uid.iter().enumerate() {
let old_mail = self.known_state.table.get(uuid);
let old_mail = old_view.known_state.table.get(uuid);
let new_mail = new_view.known_state.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, flags)) = new_mail {
data.push(Body::Data(Data::Fetch {
seq_or_uid: NonZeroU32::try_from((i + 1) as u32).unwrap(),
attributes: vec![
MessageAttribute::Uid(*uid),
MessageAttribute::Flags(
seq: NonZeroU32::try_from((i + 1) as u32).unwrap(),
items: vec![
MessageDataItem::Uid(*uid),
MessageDataItem::Flags(
flags.iter().filter_map(|f| string_to_flag(f)).collect(),
),
],
]
.try_into()?,
}));
}
}
}
}
*self = new_view;
*old_view = new_view;
Ok(data)
}
pub async fn store(
/// Generates the necessary IMAP messages so that the client
/// has a satisfactory summary of the current mailbox's state.
/// These are the messages that are sent in response to a SELECT command.
pub fn summary(&self) -> Result<Vec<Body<'static>>> {
let mut data = Vec::<Body>::new();
data.push(self.exists_status()?);
data.push(self.recent_status()?);
data.extend(self.flags_status()?.into_iter());
data.push(self.uidvalidity_status()?);
data.push(self.uidnext_status()?);
Ok(data)
}
pub async fn store<'a>(
&mut self,
sequence_set: &SequenceSet,
kind: &StoreType,
_response: &StoreResponse,
flags: &[Flag],
flags: &[Flag<'a>],
is_uid_store: &bool,
) -> Result<Vec<Body>> {
) -> Result<Vec<Body<'static>>> {
self.mailbox.opportunistic_sync().await?;
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
@ -522,7 +555,7 @@ impl MailboxView {
self.update().await
}
pub async fn expunge(&mut self) -> Result<Vec<Body>> {
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
self.mailbox.opportunistic_sync().await?;
let deleted_flag = Flag::Deleted.to_string();
@ -569,12 +602,12 @@ impl MailboxView {
/// Looks up state changes in the mailbox and produces a set of IMAP
/// responses describing the new state.
pub async fn fetch(
pub async fn fetch<'b>(
&self,
sequence_set: &SequenceSet,
attributes: &MacroOrFetchAttributes,
attributes: &'b MacroOrMessageDataItemNames<'static>,
is_uid_fetch: &bool,
) -> Result<Vec<Body>> {
) -> Result<Vec<Body<'static>>> {
let ap = AttributesProxy::new(attributes, *is_uid_fetch);
// Prepare data
@ -619,31 +652,37 @@ impl MailboxView {
selection.with_bodies(bodies.as_slice());
// Build mail selection views
let mut views = selection.build()?;
let views = selection.build()?;
// Filter views to build the result
let ret = views
.iter_mut()
.filter_map(|mv| mv.filter(&ap).ok())
.collect::<Vec<_>>();
// Register seen flags
let future_flags = views
// Also identify what must be put as seen
let filtered_view = views
.iter()
.filter(|mv| mv.add_seen)
.map(|mv| async move {
.filter_map(|mv| mv.filter(&ap).ok().map(|(body, seen)| (mv, body, seen)))
.collect::<Vec<_>>();
// Register seen flags
let future_flags = filtered_view
.iter()
.filter(|(_mv, _body, seen)| matches!(seen, SeenFlag::MustAdd))
.map(|(mv, _body, _seen)| async move {
let seen_flag = Flag::Seen.to_string();
self.mailbox.add_flags(mv.ids.uuid, &[seen_flag]).await?;
Ok::<_, anyhow::Error>(())
})
.collect::<FuturesOrdered<_>>();
future_flags
.collect::<Vec<_>>()
.await
.into_iter()
.collect::<Result<_, _>>()?;
Ok(ret)
let command_body = filtered_view
.into_iter()
.map(|(_mv, body, _seen)| body)
.collect::<Vec<_>>();
Ok(command_body)
}
// ----
@ -717,7 +756,7 @@ impl MailboxView {
// ----
/// Produce an OK [UIDVALIDITY _] message corresponding to `known_state`
fn uidvalidity_status(&self) -> Result<Body> {
fn uidvalidity_status(&self) -> Result<Body<'static>> {
let uid_validity = Status::ok(
None,
Some(Code::UidValidity(self.uidvalidity())),
@ -732,7 +771,7 @@ impl MailboxView {
}
/// Produce an OK [UIDNEXT _] message corresponding to `known_state`
fn uidnext_status(&self) -> Result<Body> {
fn uidnext_status(&self) -> Result<Body<'static>> {
let next_uid = Status::ok(
None,
Some(Code::UidNext(self.uidnext())),
@ -748,7 +787,7 @@ impl MailboxView {
/// Produce an EXISTS message corresponding to the number of mails
/// in `known_state`
fn exists_status(&self) -> Result<Body> {
fn exists_status(&self) -> Result<Body<'static>> {
Ok(Body::Data(Data::Exists(self.exists()?)))
}
@ -758,7 +797,7 @@ impl MailboxView {
/// Produce a RECENT message corresponding to the number of
/// recent mails in `known_state`
fn recent_status(&self) -> Result<Body> {
fn recent_status(&self) -> Result<Body<'static>> {
Ok(Body::Data(Data::Recent(self.recent()?)))
}
@ -774,27 +813,48 @@ impl MailboxView {
/// Produce a FLAGS and a PERMANENTFLAGS message that indicates
/// the flags that are in `known_state` + default flags
fn flags_status(&self) -> Result<Vec<Body>> {
let mut flags: Vec<Flag> = self
fn flags_status(&self) -> Result<Vec<Body<'static>>> {
let mut body = vec![];
// 1. Collecting all the possible flags in the mailbox
// 1.a Fetch them from our index
let mut known_flags: Vec<Flag> = self
.known_state
.idx_by_flag
.flags()
.filter_map(|f| string_to_flag(f))
.filter_map(|f| match string_to_flag(f) {
Some(FlagFetch::Flag(fl)) => Some(fl),
_ => None,
})
.collect();
// 1.b Merge it with our default flags list
for f in DEFAULT_FLAGS.iter() {
if !flags.contains(f) {
flags.push(f.clone());
if !known_flags.contains(f) {
known_flags.push(f.clone());
}
}
let mut ret = vec![Body::Data(Data::Flags(flags.clone()))];
// 1.c Create the IMAP message
body.push(Body::Data(Data::Flags(known_flags.clone())));
flags.push(Flag::Permanent);
let permanent_flags =
Status::ok(None, Some(Code::PermanentFlags(flags)), "Flags permitted")
.map_err(Error::msg)?;
ret.push(Body::Status(permanent_flags));
// 2. Returning flags that are persisted
// 2.a Always advertise our default flags
let mut permanent = DEFAULT_FLAGS
.iter()
.map(|f| FlagPerm::Flag(f.clone()))
.collect::<Vec<_>>();
// 2.b Say that we support any keyword flag
permanent.push(FlagPerm::Asterisk);
// 2.c Create the IMAP message
let permanent_flags = Status::ok(
None,
Some(Code::PermanentFlags(permanent)),
"Flags permitted",
)
.map_err(Error::msg)?;
body.push(Body::Status(permanent_flags));
Ok(ret)
// Done!
Ok(body)
}
pub(crate) fn unseen_count(&self) -> usize {
@ -809,21 +869,21 @@ impl MailboxView {
}
}
fn string_to_flag(f: &str) -> Option<Flag> {
fn string_to_flag(f: &str) -> Option<FlagFetch<'static>> {
match f.chars().next() {
Some('\\') => match f {
"\\Seen" => Some(Flag::Seen),
"\\Answered" => Some(Flag::Answered),
"\\Flagged" => Some(Flag::Flagged),
"\\Deleted" => Some(Flag::Deleted),
"\\Draft" => Some(Flag::Draft),
"\\Recent" => Some(Flag::Recent),
"\\Seen" => Some(FlagFetch::Flag(Flag::Seen)),
"\\Answered" => Some(FlagFetch::Flag(Flag::Answered)),
"\\Flagged" => Some(FlagFetch::Flag(Flag::Flagged)),
"\\Deleted" => Some(FlagFetch::Flag(Flag::Deleted)),
"\\Draft" => Some(FlagFetch::Flag(Flag::Draft)),
"\\Recent" => Some(FlagFetch::Recent),
_ => match Atom::try_from(f.strip_prefix('\\').unwrap().to_string()) {
Err(_) => {
tracing::error!(flag=%f, "Unable to encode flag as IMAP atom");
None
}
Ok(a) => Some(Flag::Extension(a)),
Ok(a) => Some(FlagFetch::Flag(Flag::system(a))),
},
},
Some(_) => match Atom::try_from(f.to_string()) {
@ -831,7 +891,7 @@ fn string_to_flag(f: &str) -> Option<Flag> {
tracing::error!(flag=%f, "Unable to encode flag as IMAP atom");
None
}
Ok(a) => Some(Flag::Keyword(a)),
Ok(a) => Some(FlagFetch::Flag(Flag::keyword(a))),
},
None => None,
}
@ -858,7 +918,7 @@ fn string_to_flag(f: &str) -> Option<Flag> {
//@FIXME return an error if the envelope is invalid instead of panicking
//@FIXME some fields must be defaulted if there are not set.
fn message_envelope(msg: &imf::Imf) -> Envelope {
fn message_envelope(msg: &imf::Imf) -> Envelope<'static> {
let from = msg.from.iter().map(convert_mbx).collect::<Vec<_>>();
Envelope {
@ -900,7 +960,7 @@ fn message_envelope(msg: &imf::Imf) -> Envelope {
}
}
fn convert_addresses(addrlist: &Vec<imf::address::AddressRef>) -> Vec<Address> {
fn convert_addresses(addrlist: &Vec<imf::address::AddressRef>) -> Vec<Address<'static>> {
let mut acc = vec![];
for item in addrlist {
match item {
@ -911,23 +971,23 @@ fn convert_addresses(addrlist: &Vec<imf::address::AddressRef>) -> Vec<Address> {
return acc;
}
fn convert_mbx(addr: &imf::mailbox::MailboxRef) -> Address {
Address::new(
NString(
fn convert_mbx(addr: &imf::mailbox::MailboxRef) -> Address<'static> {
Address {
name: NString(
addr.name
.as_ref()
.map(|x| IString::try_from(x.to_string()).unwrap()),
),
// SMTP at-domain-list (source route) seems obsolete since at least 1991
// https://www.mhonarc.org/archive/html/ietf-822/1991-06/msg00060.html
NString(None),
NString(Some(
adl: NString(None),
mailbox: NString(Some(
IString::try_from(addr.addrspec.local_part.to_string()).unwrap(),
)),
NString(Some(
host: NString(Some(
IString::try_from(addr.addrspec.domain.to_string()).unwrap(),
)),
)
}
}
/*
@ -945,19 +1005,23 @@ b fetch 29878:29879 (BODY)
b OK Fetch completed (0.001 + 0.000 secs).
*/
fn build_imap_email_struct<'a>(part: &AnyPart<'a>) -> Result<BodyStructure> {
fn build_imap_email_struct<'a>(part: &AnyPart<'a>) -> Result<BodyStructure<'static>> {
match part {
AnyPart::Mult(x) => {
let itype = &x.mime.interpreted_type;
let subtype = IString::try_from(itype.subtype.to_string())
.unwrap_or(unchecked_istring("alternative"));
let inner_bodies = x
.children
.iter()
.filter_map(|inner| build_imap_email_struct(&inner).ok())
.collect::<Vec<_>>();
NonEmptyVec::validate(&inner_bodies)?;
let bodies = NonEmptyVec::unvalidated(inner_bodies);
Ok(BodyStructure::Multi {
bodies: x
.children
.iter()
.filter_map(|inner| build_imap_email_struct(&inner).ok())
.collect(),
bodies,
subtype,
extension_data: None,
/*Some(MultipartExtensionData {
@ -996,7 +1060,7 @@ fn build_imap_email_struct<'a>(part: &AnyPart<'a>) -> Result<BodyStructure> {
number_of_lines: nol(x.body),
},
},
extension: None,
extension_data: None,
})
}
AnyPart::Bin(x) => {
@ -1009,9 +1073,10 @@ fn build_imap_email_struct<'a>(part: &AnyPart<'a>) -> Result<BodyStructure> {
};
let ct = x.mime.fields.ctype.as_ref().unwrap_or(&default);
let type_ = IString::try_from(String::from_utf8_lossy(ct.main).to_string()).or(Err(
anyhow!("Unable to build IString from given Content-Type type given"),
))?;
let r#type =
IString::try_from(String::from_utf8_lossy(ct.main).to_string()).or(Err(
anyhow!("Unable to build IString from given Content-Type type given"),
))?;
let subtype =
IString::try_from(String::from_utf8_lossy(ct.sub).to_string()).or(Err(anyhow!(
@ -1021,9 +1086,9 @@ fn build_imap_email_struct<'a>(part: &AnyPart<'a>) -> Result<BodyStructure> {
Ok(BodyStructure::Single {
body: FetchBody {
basic,
specific: SpecificFields::Basic { type_, subtype },
specific: SpecificFields::Basic { r#type, subtype },
},
extension: None,
extension_data: None,
})
}
AnyPart::Msg(x) => {
@ -1033,12 +1098,12 @@ fn build_imap_email_struct<'a>(part: &AnyPart<'a>) -> Result<BodyStructure> {
body: FetchBody {
basic,
specific: SpecificFields::Message {
envelope: message_envelope(&x.imf),
envelope: Box::new(message_envelope(&x.imf)),
body_structure: Box::new(build_imap_email_struct(x.child.as_ref())?),
number_of_lines: nol(x.raw_part),
},
},
extension: None,
extension_data: None,
})
}
}
@ -1059,7 +1124,7 @@ fn unchecked_istring(s: &'static str) -> IString {
IString::try_from(s).expect("this value is expected to be a valid imap-codec::IString")
}
fn basic_fields(m: &mime::NaiveMIME, sz: usize) -> Result<BasicFields> {
fn basic_fields(m: &mime::NaiveMIME, sz: usize) -> Result<BasicFields<'static>> {
let parameter_list = m
.ctype
.as_ref()
@ -1136,20 +1201,18 @@ fn get_message_section<'a>(
.ok_or(anyhow!("Part must be a message"))?;
match section {
Some(FetchSection::Text(None)) => Ok(msg.raw_body.into()),
Some(FetchSection::Text(Some(part))) => {
map_subpart(parsed, part.0.as_slice(), |part_msg| {
Ok(part_msg
.as_message()
.ok_or(Error::msg(
"Not a message/rfc822 part while expected by request (TEXT)",
))?
.raw_body
.into())
})
}
Some(FetchSection::Text(Some(part))) => map_subpart(parsed, part.0.as_ref(), |part_msg| {
Ok(part_msg
.as_message()
.ok_or(Error::msg(
"Not a message/rfc822 part while expected by request (TEXT)",
))?
.raw_body
.into())
}),
Some(FetchSection::Header(part)) => map_subpart(
parsed,
part.as_ref().map(|p| p.0.as_slice()).unwrap_or(&[]),
part.as_ref().map(|p| p.0.as_ref()).unwrap_or(&[]),
|part_msg| {
Ok(part_msg
.as_message()
@ -1165,17 +1228,18 @@ fn get_message_section<'a>(
) => {
let invert = matches!(section, Some(FetchSection::HeaderFieldsNot(_, _)));
let fields = fields
.as_ref()
.iter()
.map(|x| match x {
AString::Atom(a) => a.as_bytes(),
AString::String(IString::Literal(l)) => l.as_slice(),
AString::String(IString::Quoted(q)) => q.as_bytes(),
AString::Atom(a) => a.inner().as_bytes(),
AString::String(IString::Literal(l)) => l.as_ref(),
AString::String(IString::Quoted(q)) => q.inner().as_bytes(),
})
.collect::<Vec<_>>();
map_subpart(
parsed,
part.as_ref().map(|p| p.0.as_slice()).unwrap_or(&[]),
part.as_ref().map(|p| p.0.as_ref()).unwrap_or(&[]),
|part_msg| {
let mut ret = vec![];
for f in &part_msg.mime().kv {
@ -1195,7 +1259,7 @@ fn get_message_section<'a>(
},
)
}
Some(FetchSection::Part(part)) => map_subpart(parsed, part.0.as_slice(), |part| {
Some(FetchSection::Part(part)) => map_subpart(parsed, part.0.as_ref(), |part| {
let bytes = match &part {
AnyPart::Txt(p) => p.body,
AnyPart::Bin(p) => p.body,
@ -1204,7 +1268,7 @@ fn get_message_section<'a>(
};
Ok(bytes.to_vec().into())
}),
Some(FetchSection::Mime(part)) => map_subpart(parsed, part.0.as_slice(), |part| {
Some(FetchSection::Mime(part)) => map_subpart(parsed, part.0.as_ref(), |part| {
let bytes = match &part {
AnyPart::Txt(p) => p.mime.fields.raw,
AnyPart::Bin(p) => p.mime.fields.raw,
@ -1245,18 +1309,22 @@ mod tests {
use super::*;
use crate::cryptoblob;
use crate::mail::unique_ident;
use imap_codec::codec::Encode;
use imap_codec::types::fetch_attributes::Section;
use imap_codec::encode::Encoder;
use imap_codec::imap_types::fetch::Section;
use imap_codec::imap_types::response::Response;
use imap_codec::ResponseCodec;
use std::fs;
#[test]
fn mailview_body_ext() -> Result<()> {
let ap = AttributesProxy::new(
&MacroOrFetchAttributes::FetchAttributes(vec![FetchAttribute::BodyExt {
section: Some(Section::Header(None)),
partial: None,
peek: false,
}]),
&MacroOrMessageDataItemNames::MessageDataItemNames(vec![
MessageDataItemName::BodyExt {
section: Some(Section::Header(None)),
partial: None,
peek: false,
},
]),
false,
);
@ -1276,27 +1344,26 @@ mod tests {
let rfc822 = b"Subject: hello\r\nFrom: a@a.a\r\nTo: b@b.b\r\nDate: Thu, 12 Oct 2023 08:45:28 +0000\r\n\r\nhello world";
let content = FetchedMail::new_from_message(eml_codec::parse_message(rfc822)?.1);
let mut mv = MailView {
let mv = MailView {
ids: &ids,
content,
meta: &meta,
flags: &flags,
add_seen: false,
};
let res_body = mv.filter(&ap)?;
let (res_body, _seen) = mv.filter(&ap)?;
let fattr = match res_body {
Body::Data(Data::Fetch {
seq_or_uid: _seq,
attributes: attr,
seq: _seq,
items: attr,
}) => Ok(attr),
_ => Err(anyhow!("Not a fetch body")),
}?;
assert_eq!(fattr.len(), 1);
assert_eq!(fattr.as_ref().len(), 1);
let (sec, _orig, _data) = match &fattr[0] {
MessageAttribute::BodyExt {
let (sec, _orig, _data) = match &fattr.as_ref()[0] {
MessageDataItem::BodyExt {
section,
origin,
data,
@ -1345,22 +1412,24 @@ mod tests {
for pref in prefixes.iter() {
println!("{}", pref);
let txt = fs::read(format!("{}.eml", pref))?;
let exp = fs::read(format!("{}.dovecot.body", pref))?;
let oracle = fs::read(format!("{}.dovecot.body", pref))?;
let message = eml_codec::parse_message(&txt).unwrap().1;
let mut resp = Vec::new();
MessageAttribute::Body(build_imap_email_struct(&message.child)?)
.encode(&mut resp)
.unwrap();
let test_repr = Response::Data(Data::Fetch {
seq: NonZeroU32::new(1).unwrap(),
items: NonEmptyVec::from(MessageDataItem::Body(build_imap_email_struct(
&message.child,
)?)),
});
let test_bytes = ResponseCodec::new().encode(&test_repr).dump();
let test_str = String::from_utf8_lossy(&test_bytes).to_lowercase();
let resp_str = String::from_utf8_lossy(&resp).to_lowercase();
let oracle_str =
format!("* 1 FETCH {}\r\n", String::from_utf8_lossy(&oracle)).to_lowercase();
let exp_no_parenthesis = &exp[1..exp.len() - 1];
let exp_str = String::from_utf8_lossy(exp_no_parenthesis).to_lowercase();
println!("aerogramme: {}\n\ndovecot: {}\n\n", resp_str, exp_str);
println!("aerogramme: {}\n\ndovecot: {}\n\n", test_str, oracle_str);
//println!("\n\n {} \n\n", String::from_utf8_lossy(&resp));
assert_eq!(resp_str, exp_str);
assert_eq!(test_str, oracle_str);
}
Ok(())

View file

@ -1,105 +1,186 @@
mod command;
mod flow;
mod mailbox_view;
mod response;
mod session;
use std::task::{Context, Poll};
use std::net::SocketAddr;
use anyhow::Result;
use boitalettres::errors::Error as BalError;
use boitalettres::proto::{Request, Response};
use boitalettres::server::accept::addr::AddrIncoming;
use boitalettres::server::accept::addr::AddrStream;
use boitalettres::server::Server as ImapServer;
use futures::future::BoxFuture;
use futures::future::FutureExt;
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::watch;
use tower::Service;
use imap_codec::imap_types::response::Greeting;
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream;
use crate::config::ImapConfig;
use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL
pub struct Server(ImapServer<AddrIncoming, Instance>);
pub struct Server {
bind_addr: SocketAddr,
login_provider: ArcLoginProvider,
}
pub async fn new(config: ImapConfig, login: ArcLoginProvider) -> Result<Server> {
//@FIXME add a configuration parameter
let incoming = AddrIncoming::new(config.bind_addr).await?;
tracing::info!("IMAP activated, will listen on {:#}", incoming.local_addr);
struct ClientContext {
stream: AnyStream,
addr: SocketAddr,
login_provider: ArcLoginProvider,
must_exit: watch::Receiver<bool>,
}
let imap = ImapServer::new(incoming).serve(Instance::new(login.clone()));
Ok(Server(imap))
pub fn new(config: ImapConfig, login: ArcLoginProvider) -> Server {
Server {
bind_addr: config.bind_addr,
login_provider: login,
}
}
impl Server {
pub async fn run(self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
tracing::info!("IMAP started!");
tokio::select! {
s = self.0 => s?,
_ = must_exit.changed() => tracing::info!("Stopped IMAP server"),
pub async fn run(self: Self, mut must_exit: watch::Receiver<bool>) -> Result<()> {
let tcp = TcpListener::bind(self.bind_addr).await?;
tracing::info!("IMAP server listening on {:#}", self.bind_addr);
let mut connections = FuturesUnordered::new();
while !*must_exit.borrow() {
let wait_conn_finished = async {
if connections.is_empty() {
futures::future::pending().await
} else {
connections.next().await
}
};
let (socket, remote_addr) = tokio::select! {
a = tcp.accept() => a?,
_ = wait_conn_finished => continue,
_ = must_exit.changed() => continue,
};
tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext {
stream: AnyStream::new(socket),
addr: remote_addr.clone(),
login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(),
};
let conn = tokio::spawn(client_wrapper(client));
connections.push(conn);
}
drop(tcp);
tracing::info!("IMAP server shutting down, draining remaining connections...");
while connections.next().await.is_some() {}
Ok(())
}
}
//---
/// Instance is the main Tokio Tower service that we register in BàL.
/// It receives new connection demands and spawn a dedicated service.
struct Instance {
login_provider: ArcLoginProvider,
}
impl Instance {
pub fn new(login_provider: ArcLoginProvider) -> Self {
Self { login_provider }
}
}
impl<'a> Service<&'a AddrStream> for Instance {
type Response = Connection;
type Error = anyhow::Error;
type Future = BoxFuture<'static, Result<Self::Response>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, addr: &'a AddrStream) -> Self::Future {
tracing::info!(remote_addr = %addr.remote_addr, local_addr = %addr.local_addr, "accept");
let lp = self.login_provider.clone();
async { Ok(Connection::new(lp)) }.boxed()
}
}
//---
/// Connection is the per-connection Tokio Tower service we register in BàL.
/// It handles a single TCP connection, and thus has a business logic.
struct Connection {
session: session::Manager,
}
impl Connection {
pub fn new(login_provider: ArcLoginProvider) -> Self {
Self {
session: session::Manager::new(login_provider),
async fn client_wrapper(ctx: ClientContext) {
let addr = ctx.addr.clone();
match client(ctx).await {
Ok(()) => {
tracing::info!("closing successful session for {:?}", addr);
}
Err(e) => {
tracing::error!("closing errored session for {:?}: {}", addr, e);
}
}
}
impl Service<Request> for Connection {
type Response = Response;
type Error = BalError;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
async fn client(mut ctx: ClientContext) -> Result<()> {
// Send greeting
let (mut server, _) = ServerFlow::send_greeting(
ctx.stream,
ServerFlowOptions::default(),
Greeting::ok(None, "Aerogramme").unwrap(),
)
.await?;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
use crate::imap::response::{Body, Response as MyResponse};
use crate::imap::session::Instance;
use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::response::{Response, Status};
use tokio::sync::mpsc;
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
let bckgrnd = tokio::spawn(async move {
let mut session = Instance::new(ctx.login_provider);
loop {
let cmd = match cmd_rx.recv().await {
None => break,
Some(cmd_recv) => cmd_recv,
};
let maybe_response = session.command(cmd).await;
match resp_tx.send(maybe_response) {
Err(_) => break,
Ok(_) => (),
};
}
tracing::info!("runner is quitting");
});
// Main loop
loop {
tokio::select! {
// Managing imap_flow stuff
srv_evt = server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response {
Response::Status(Status::Bye(_)) => break,
_ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
}
},
ServerFlowEvent::CommandReceived { command } => {
match cmd_tx.try_send(command) {
Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => {
server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
}
_ => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
}
}
},
},
// Managing response generated by Aerogramme
maybe_msg = resp_rx.recv() => {
let response = match maybe_msg {
None => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
continue
},
Some(r) => r,
};
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => server.enqueue_data(d),
Body::Status(s) => server.enqueue_status(s),
};
}
server.enqueue_status(response.completion);
},
// When receiving a CTRL+C
_ = ctx.must_exit.changed() => {
server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
}
fn call(&mut self, req: Request) -> Self::Future {
tracing::debug!("Got request: {:#?}", req.command);
self.session.process(req)
}
drop(cmd_tx);
bckgrnd.await?;
Ok(())
}

112
src/imap/response.rs Normal file
View file

@ -0,0 +1,112 @@
use anyhow::Result;
use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::core::Tag;
use imap_codec::imap_types::response::{Code, Data, Status};
pub enum Body<'a> {
Data(Data<'a>),
Status(Status<'a>),
}
pub struct ResponseBuilder<'a> {
tag: Option<Tag<'a>>,
code: Option<Code<'a>>,
text: String,
body: Vec<Body<'a>>,
}
impl<'a> ResponseBuilder<'a> {
pub fn to_req(mut self, cmd: &Command<'a>) -> Self {
self.tag = Some(cmd.tag.clone());
self
}
pub fn tag(mut self, tag: Tag<'a>) -> Self {
self.tag = Some(tag);
self
}
pub fn message(mut self, txt: impl Into<String>) -> Self {
self.text = txt.into();
self
}
pub fn code(mut self, code: Code<'a>) -> Self {
self.code = Some(code);
self
}
pub fn data(mut self, data: Data<'a>) -> Self {
self.body.push(Body::Data(data));
self
}
pub fn many_data(mut self, data: Vec<Data<'a>>) -> Self {
for d in data.into_iter() {
self = self.data(d);
}
self
}
#[allow(dead_code)]
pub fn info(mut self, status: Status<'a>) -> Self {
self.body.push(Body::Status(status));
self
}
#[allow(dead_code)]
pub fn many_info(mut self, status: Vec<Status<'a>>) -> Self {
for d in status.into_iter() {
self = self.info(d);
}
self
}
pub fn set_body(mut self, body: Vec<Body<'a>>) -> Self {
self.body = body;
self
}
pub fn ok(self) -> Result<Response<'a>> {
Ok(Response {
completion: Status::ok(self.tag, self.code, self.text)?,
body: self.body,
})
}
pub fn no(self) -> Result<Response<'a>> {
Ok(Response {
completion: Status::no(self.tag, self.code, self.text)?,
body: self.body,
})
}
pub fn bad(self) -> Result<Response<'a>> {
Ok(Response {
completion: Status::bad(self.tag, self.code, self.text)?,
body: self.body,
})
}
}
pub struct Response<'a> {
pub body: Vec<Body<'a>>,
pub completion: Status<'a>,
}
impl<'a> Response<'a> {
pub fn build() -> ResponseBuilder<'a> {
ResponseBuilder {
tag: None,
code: None,
text: "".to_string(),
body: vec![],
}
}
pub fn bye() -> Result<Response<'a>> {
Ok(Response {
completion: Status::bye(None, "bye")?,
body: vec![],
})
}
}

View file

@ -1,180 +1,86 @@
use anyhow::Error;
use boitalettres::errors::Error as BalError;
use boitalettres::proto::{Request, Response};
use futures::future::BoxFuture;
use futures::future::FutureExt;
use tokio::sync::mpsc::error::TrySendError;
use tokio::sync::{mpsc, oneshot};
use crate::imap::command::{anonymous, authenticated, examined, selected};
use crate::imap::flow;
use crate::imap::response::Response;
use crate::login::ArcLoginProvider;
/* This constant configures backpressure in the system,
* or more specifically, how many pipelined messages are allowed
* before refusing them
*/
const MAX_PIPELINED_COMMANDS: usize = 10;
struct Message {
req: Request,
tx: oneshot::Sender<Result<Response, BalError>>,
}
use imap_codec::imap_types::command::Command;
//-----
pub struct Manager {
tx: mpsc::Sender<Message>,
}
impl Manager {
pub fn new(login_provider: ArcLoginProvider) -> Self {
let (tx, rx) = mpsc::channel(MAX_PIPELINED_COMMANDS);
tokio::spawn(async move {
let instance = Instance::new(login_provider, rx);
instance.start().await;
});
Self { tx }
}
pub fn process(&self, req: Request) -> BoxFuture<'static, Result<Response, BalError>> {
let (tx, rx) = oneshot::channel();
let msg = Message { req, tx };
// We use try_send on a bounded channel to protect the daemons from DoS.
// Pipelining requests in IMAP are a special case: they should not occure often
// and in a limited number (like 3 requests). Someone filling the channel
// will probably be malicious so we "rate limit" them.
match self.tx.try_send(msg) {
Ok(()) => (),
Err(TrySendError::Full(_)) => {
return async { Response::bad("Too fast! Send less pipelined requests.") }.boxed()
}
Err(TrySendError::Closed(_)) => {
return async { Err(BalError::Text("Terminated session".to_string())) }.boxed()
}
};
// @FIXME add a timeout, handle a session that fails.
async {
match rx.await {
Ok(r) => r,
Err(e) => {
tracing::warn!("Got error {:#?}", e);
Response::bad("No response from the session handler")
}
}
}
.boxed()
}
}
//-----
pub struct Instance {
rx: mpsc::Receiver<Message>,
pub login_provider: ArcLoginProvider,
pub state: flow::State,
}
impl Instance {
fn new(login_provider: ArcLoginProvider, rx: mpsc::Receiver<Message>) -> Self {
pub fn new(login_provider: ArcLoginProvider) -> Self {
Self {
login_provider,
rx,
state: flow::State::NotAuthenticated,
}
}
//@FIXME add a function that compute the runner's name from its local info
// to ease debug
// fn name(&self) -> String { }
async fn start(mut self) {
//@FIXME add more info about the runner
tracing::debug!("starting runner");
while let Some(msg) = self.rx.recv().await {
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths.
let ctrl = match &mut self.state {
flow::State::NotAuthenticated => {
let ctx = anonymous::AnonymousContext {
req: &msg.req,
login_provider: Some(&self.login_provider),
};
anonymous::dispatch(ctx).await
}
flow::State::Authenticated(ref user) => {
let ctx = authenticated::AuthenticatedContext {
req: &msg.req,
user,
};
authenticated::dispatch(ctx).await
}
flow::State::Selected(ref user, ref mut mailbox) => {
let ctx = selected::SelectedContext {
req: &msg.req,
user,
mailbox,
};
selected::dispatch(ctx).await
}
flow::State::Examined(ref user, ref mut mailbox) => {
let ctx = examined::ExaminedContext {
req: &msg.req,
user,
mailbox,
};
examined::dispatch(ctx).await
}
flow::State::Logout => {
Response::bad("No commands are allowed in the LOGOUT state.")
.map(|r| (r, flow::Transition::None))
.map_err(Error::msg)
}
};
// Process result
let res = match ctrl {
Ok((res, tr)) => {
//@FIXME remove unwrap
self.state = match self.state.apply(tr) {
Ok(new_state) => new_state,
Err(e) => {
tracing::error!("Invalid transition: {}, exiting", e);
break;
}
};
//@FIXME enrich here the command with some global status
Ok(res)
}
// Cast from anyhow::Error to Bal::Error
// @FIXME proper error handling would be great
Err(e) => match e.downcast::<BalError>() {
Ok(be) => Err(be),
Err(e) => {
tracing::warn!(error=%e, "internal.error");
Response::bad("Internal error")
}
},
};
//@FIXME I think we should quit this thread on error and having our manager watch it,
// and then abort the session as it is corrupted.
msg.tx.send(res).unwrap_or_else(|e| {
tracing::warn!("failed to send imap response to manager: {:#?}", e)
});
if let flow::State::Logout = &self.state {
break;
pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
// Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths.
let (resp, tr) = match &mut self.state {
flow::State::NotAuthenticated => {
let ctx = anonymous::AnonymousContext {
req: &cmd,
login_provider: &self.login_provider,
};
anonymous::dispatch(ctx).await
}
flow::State::Authenticated(ref user) => {
let ctx = authenticated::AuthenticatedContext { req: &cmd, user };
authenticated::dispatch(ctx).await
}
flow::State::Selected(ref user, ref mut mailbox) => {
let ctx = selected::SelectedContext {
req: &cmd,
user,
mailbox,
};
selected::dispatch(ctx).await
}
flow::State::Examined(ref user, ref mut mailbox) => {
let ctx = examined::ExaminedContext {
req: &cmd,
user,
mailbox,
};
examined::dispatch(ctx).await
}
flow::State::Logout => Response::build()
.tag(cmd.tag.clone())
.message("No commands are allowed in the LOGOUT state.")
.bad()
.map(|r| (r, flow::Transition::None)),
}
.unwrap_or_else(|err| {
tracing::error!("Command error {:?} occured while processing {:?}", err, cmd);
(
Response::build()
.to_req(&cmd)
.message("Internal error while processing command")
.bad()
.unwrap(),
flow::Transition::None,
)
});
if let Err(e) = self.state.apply(tr) {
tracing::error!(
"Transition error {:?} occured while processing on command {:?}",
e,
cmd
);
return Response::build()
.to_req(&cmd)
.message(
"Internal error, processing command triggered an illegal IMAP state transition",
)
.bad()
.unwrap();
}
//@FIXME add more info about the runner
tracing::debug!("exiting runner");
resp
}
}

View file

@ -0,0 +1,51 @@
use crate::login::*;
use crate::storage::*;
pub struct DemoLoginProvider {
keys: CryptoKeys,
in_memory_store: in_memory::MemDb,
}
impl DemoLoginProvider {
pub fn new() -> Self {
Self {
keys: CryptoKeys::init(),
in_memory_store: in_memory::MemDb::new(),
}
}
}
#[async_trait]
impl LoginProvider for DemoLoginProvider {
async fn login(&self, username: &str, password: &str) -> Result<Credentials> {
tracing::debug!(user=%username, "login");
if username != "alice" {
bail!("user does not exist");
}
if password != "hunter2" {
bail!("wrong password");
}
let storage = self.in_memory_store.builder("alice").await;
let keys = self.keys.clone();
Ok(Credentials { storage, keys })
}
async fn public_login(&self, email: &str) -> Result<PublicCredentials> {
tracing::debug!(user=%email, "public_login");
if email != "alice@example.tld" {
bail!("invalid email address");
}
let storage = self.in_memory_store.builder("alice").await;
let public_key = self.keys.public.clone();
Ok(PublicCredentials {
storage,
public_key,
})
}
}

View file

@ -1,3 +1,4 @@
pub mod demo_provider;
pub mod ldap_provider;
pub mod static_provider;

View file

@ -29,7 +29,12 @@ struct Args {
#[clap(subcommand)]
command: Command,
/// A special mode dedicated to developers, NOT INTENDED FOR PRODUCTION
#[clap(long)]
dev: bool,
#[clap(short, long, env = "CONFIG_FILE", default_value = "aerogramme.toml")]
/// Path to the main Aerogramme configuration file
config_file: PathBuf,
}
@ -158,7 +163,22 @@ async fn main() -> Result<()> {
tracing_subscriber::fmt::init();
let args = Args::parse();
let any_config = read_config(args.config_file)?;
let any_config = if args.dev {
use std::net::*;
AnyConfig::Provider(ProviderConfig {
pid: None,
imap: ImapConfig {
bind_addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 1143),
},
lmtp: LmtpConfig {
bind_addr: SocketAddr::new(IpAddr::V6(Ipv6Addr::new(0, 0, 0, 0, 0, 0, 0, 1)), 1025),
hostname: "example.tld".to_string(),
},
users: UserManagement::Demo,
})
} else {
read_config(args.config_file)?
};
match (&args.command, any_config) {
(Command::Companion(subcommand), AnyConfig::Companion(config)) => match subcommand {
@ -184,8 +204,8 @@ async fn main() -> Result<()> {
ProviderCommand::Account(cmd) => {
let user_file = match config.users {
UserManagement::Static(conf) => conf.user_list,
UserManagement::Ldap(_) => {
panic!("LDAP account management is not supported from Aerogramme.")
_ => {
panic!("Only static account management is supported from Aerogramme.")
}
};
account_management(&args.command, cmd, user_file)?;

View file

@ -11,7 +11,7 @@ use crate::config::*;
use crate::imap;
use crate::lmtp::*;
use crate::login::ArcLoginProvider;
use crate::login::{ldap_provider::*, static_provider::*};
use crate::login::{demo_provider::*, ldap_provider::*, static_provider::*};
pub struct Server {
lmtp_server: Option<Arc<LmtpServer>>,
@ -25,7 +25,7 @@ impl Server {
let login = Arc::new(StaticLoginProvider::new(config.users).await?);
let lmtp_server = None;
let imap_server = Some(imap::new(config.imap, login.clone()).await?);
let imap_server = Some(imap::new(config.imap, login.clone()));
Ok(Self {
lmtp_server,
imap_server,
@ -36,12 +36,13 @@ impl Server {
pub async fn from_provider_config(config: ProviderConfig) -> Result<Self> {
tracing::info!("Init as provider");
let login: ArcLoginProvider = match config.users {
UserManagement::Demo => Arc::new(DemoLoginProvider::new()),
UserManagement::Static(x) => Arc::new(StaticLoginProvider::new(x).await?),
UserManagement::Ldap(x) => Arc::new(LdapLoginProvider::new(x)?),
};
let lmtp_server = Some(LmtpServer::new(config.lmtp, login.clone()));
let imap_server = Some(imap::new(config.imap, login.clone()).await?);
let imap_server = Some(imap::new(config.imap, login.clone()));
Ok(Self {
lmtp_server,

394
tests/imap_features.rs Normal file
View file

@ -0,0 +1,394 @@
use anyhow::{bail, Context, Result};
use std::io::{Read, Write};
use std::net::{Shutdown, TcpStream};
use std::process::Command;
use std::{thread, time};
static SMALL_DELAY: time::Duration = time::Duration::from_millis(200);
static EMAIL1: &[u8] = b"Date: Sat, 8 Jul 2023 07:14:29 +0200\r
From: Bob Robert <bob@example.tld>\r
To: Alice Malice <alice@example.tld>\r
CC: =?ISO-8859-1?Q?Andr=E9?= Pirard <PIRARD@vm1.ulg.ac.be>\r
Subject: =?ISO-8859-1?B?SWYgeW91IGNhbiByZWFkIHRoaXMgeW8=?=\r
=?ISO-8859-2?B?dSB1bmRlcnN0YW5kIHRoZSBleGFtcGxlLg==?=\r
X-Unknown: something something\r
Bad entry\r
on multiple lines\r
Message-ID: <NTAxNzA2AC47634Y366BAMTY4ODc5MzQyODY0ODY5@www.grrrndzero.org>\r
MIME-Version: 1.0\r
Content-Type: multipart/alternative;\r
boundary=\"b1_e376dc71bafc953c0b0fdeb9983a9956\"\r
Content-Transfer-Encoding: 7bit\r
\r
This is a multi-part message in MIME format.\r
\r
--b1_e376dc71bafc953c0b0fdeb9983a9956\r
Content-Type: text/plain; charset=utf-8\r
Content-Transfer-Encoding: quoted-printable\r
\r
GZ\r
OoOoO\r
oOoOoOoOo\r
oOoOoOoOoOoOoOoOo\r
oOoOoOoOoOoOoOoOoOoOoOo\r
oOoOoOoOoOoOoOoOoOoOoOoOoOoOo\r
OoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoO\r
\r
--b1_e376dc71bafc953c0b0fdeb9983a9956\r
Content-Type: text/html; charset=us-ascii\r
\r
<div style=\"text-align: center;\"><strong>GZ</strong><br />\r
OoOoO<br />\r
oOoOoOoOo<br />\r
oOoOoOoOoOoOoOoOo<br />\r
oOoOoOoOoOoOoOoOoOoOoOo<br />\r
oOoOoOoOoOoOoOoOoOoOoOoOoOoOo<br />\r
OoOoOoOoOoOoOoOoOoOoOoOoOoOoOoOoO<br />\r
</div>\r
\r
--b1_e376dc71bafc953c0b0fdeb9983a9956--\r
";
static EMAIL2: &[u8] = b"From: alice@example.com\r
To: alice@example.tld\r
Subject: Test\r
\r
Hello world!\r
";
fn main() {
let mut daemon = Command::new(env!("CARGO_BIN_EXE_aerogramme"))
.arg("--dev")
.arg("provider")
.arg("daemon")
.spawn()
.expect("daemon should be started");
let mut max_retry = 20;
let mut imap_socket = loop {
max_retry -= 1;
match (TcpStream::connect("[::1]:1143"), max_retry) {
(Err(e), 0) => panic!("no more retry, last error is: {}", e),
(Err(e), _) => {
println!("unable to connect: {} ; will retry in 1 sec", e);
}
(Ok(v), _) => break v,
}
thread::sleep(SMALL_DELAY);
};
let mut lmtp_socket = TcpStream::connect("[::1]:1025").expect("lmtp socket must be connected");
println!("-- ready to test imap features --");
let result = generic_test(&mut imap_socket, &mut lmtp_socket);
println!("-- test teardown --");
imap_socket
.shutdown(Shutdown::Both)
.expect("closing imap socket at the end of the test");
lmtp_socket
.shutdown(Shutdown::Both)
.expect("closing lmtp socket at the end of the test");
daemon.kill().expect("daemon should be killed");
result.expect("all tests passed");
}
fn generic_test(imap_socket: &mut TcpStream, lmtp_socket: &mut TcpStream) -> Result<()> {
connect(imap_socket).context("server says hello")?;
capability(imap_socket).context("check server capabilities")?;
login(imap_socket).context("login test")?;
create_mailbox(imap_socket).context("created mailbox archive")?;
// UNSUBSCRIBE IS NOT IMPLEMENTED YET
//unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?;
select_inbox(imap_socket).context("select inbox")?;
check(imap_socket).context("check must run")?;
status_mailbox(imap_socket).context("status of archive from inbox")?;
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, EMAIL1).context("mail delivered successfully")?;
noop_exists(imap_socket).context("noop loop must detect a new email")?;
fetch_rfc822(imap_socket, EMAIL1).context("fetch rfc822 message")?;
copy_email(imap_socket).context("copy message to the archive mailbox")?;
append_email(imap_socket, EMAIL2).context("insert email in INBOX")?;
// SEARCH IS NOT IMPLEMENTED YET
//search(imap_socket).expect("search should return something");
add_flags_email(imap_socket).context("should add delete and important flags to the email")?;
expunge(imap_socket).context("expunge emails")?;
rename_mailbox(imap_socket).context("archive mailbox is renamed my-archives")?;
delete_mailbox(imap_socket).context("my-archives mailbox is deleted")?;
Ok(())
}
fn connect(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..4], &b"* OK"[..]);
Ok(())
}
fn capability(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"5 capability\r\n"[..])?;
let mut buffer: [u8; 1500] = [0; 1500];
let read = read_lines(imap, &mut buffer, Some(&b"5 OK"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
assert!(srv_msg.contains("IMAP4REV1"));
assert!(srv_msg.contains("IDLE"));
Ok(())
}
fn login(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
imap.write(&b"10 login alice hunter2\r\n"[..])?;
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"10 OK"[..]);
Ok(())
}
fn create_mailbox(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
imap.write(&b"15 create archive\r\n"[..])?;
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..12], &b"15 OK CREATE"[..]);
Ok(())
}
#[allow(dead_code)]
fn unsubscribe_mailbox(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 6000] = [0; 6000];
imap.write(&b"16 lsub \"\" *\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"16 OK LSUB"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
assert!(srv_msg.contains(" INBOX\r\n"));
assert!(srv_msg.contains(" archive\r\n"));
imap.write(&b"17 unsubscribe archive\r\n"[..])?;
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"17 OK"[..]);
imap.write(&b"18 lsub \"\" *\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"18 OK LSUB"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
assert!(srv_msg.contains(" INBOX\r\n"));
assert!(!srv_msg.contains(" archive\r\n"));
Ok(())
}
fn select_inbox(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 6000] = [0; 6000];
imap.write(&b"20 select inbox\r\n"[..])?;
let _read = read_lines(imap, &mut buffer, Some(&b"20 OK"[..]))?;
Ok(())
}
fn check(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
imap.write(&b"21 check\r\n"[..])?;
let _read = read_lines(imap, &mut buffer, Some(&b"21 OK"[..]))?;
Ok(())
}
fn status_mailbox(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"25 STATUS archive (UIDNEXT MESSAGES)\r\n"[..])?;
let mut buffer: [u8; 6000] = [0; 6000];
let _read = read_lines(imap, &mut buffer, Some(&b"25 OK"[..]))?;
Ok(())
}
fn lmtp_handshake(lmtp: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
let _read = read_lines(lmtp, &mut buffer, None)?;
assert_eq!(&buffer[..4], &b"220 "[..]);
lmtp.write(&b"LHLO example.tld\r\n"[..])?;
let _read = read_lines(lmtp, &mut buffer, Some(&b"250 "[..]))?;
Ok(())
}
fn lmtp_deliver_email(lmtp: &mut TcpStream, email: &[u8]) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
lmtp.write(&b"MAIL FROM:<bob@example.tld>\r\n"[..])?;
let _read = read_lines(lmtp, &mut buffer, Some(&b"250 2.0.0"[..]))?;
lmtp.write(&b"RCPT TO:<alice@example.tld>\r\n"[..])?;
let _read = read_lines(lmtp, &mut buffer, Some(&b"250 2.1.5"[..]))?;
lmtp.write(&b"DATA\r\n"[..])?;
let _read = read_lines(lmtp, &mut buffer, Some(&b"354 "[..]))?;
lmtp.write(email)?;
lmtp.write(&b"\r\n.\r\n"[..])?;
let _read = read_lines(lmtp, &mut buffer, Some(&b"250 2.0.0"[..]))?;
Ok(())
}
fn noop_exists(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 6000] = [0; 6000];
let mut max_retry = 20;
loop {
max_retry -= 1;
imap.write(&b"30 NOOP\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"30 OK NOOP"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
match (max_retry, srv_msg.contains("* 1 EXISTS")) {
(_, true) => break,
(0, _) => bail!("no more retry"),
_ => (),
}
thread::sleep(SMALL_DELAY);
}
Ok(())
}
fn fetch_rfc822(imap: &mut TcpStream, ref_mail: &[u8]) -> Result<()> {
let mut buffer: [u8; 65535] = [0; 65535];
imap.write(&b"40 fetch 1 rfc822\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"40 OK FETCH"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
let orig_email = std::str::from_utf8(ref_mail)?;
assert!(srv_msg.contains(orig_email));
Ok(())
}
fn copy_email(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 65535] = [0; 65535];
imap.write(&b"45 copy 1 archive\r\n"[..])?;
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"45 OK"[..]);
Ok(())
}
fn append_email(imap: &mut TcpStream, ref_mail: &[u8]) -> Result<()> {
let mut buffer: [u8; 6000] = [0; 6000];
assert_ne!(ref_mail.len(), 0);
let append_cmd = format!("47 append inbox (\\Seen) {{{}}}\r\n", ref_mail.len());
println!("append cmd: {}", append_cmd);
imap.write(append_cmd.as_bytes())?;
// wait for continuation
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(read[0], b'+');
// write our stuff
imap.write(ref_mail)?;
imap.write(&b"\r\n"[..])?;
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"47 OK"[..]);
// noop to force a sync
imap.write(&b"48 NOOP\r\n"[..])?;
let _read = read_lines(imap, &mut buffer, Some(&b"48 OK NOOP"[..]))?;
// check it is stored successfully
imap.write(&b"49 fetch 2 rfc822.size\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"49 OK"[..]))?;
let expected = format!("* 2 FETCH (RFC822.SIZE {})", ref_mail.len());
let expbytes = expected.as_bytes();
assert_eq!(&read[..expbytes.len()], expbytes);
Ok(())
}
fn add_flags_email(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
imap.write(&b"50 store 1 +FLAGS (\\Deleted \\Important)\r\n"[..])?;
let _read = read_lines(imap, &mut buffer, Some(&b"50 OK STORE"[..]))?;
Ok(())
}
#[allow(dead_code)]
/// Not yet implemented
fn search(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"55 search text \"OoOoO\"\r\n"[..])?;
let mut buffer: [u8; 1500] = [0; 1500];
let _read = read_lines(imap, &mut buffer, Some(&b"55 OK SEARCH"[..]))?;
Ok(())
}
fn expunge(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"60 expunge\r\n"[..])?;
let mut buffer: [u8; 1500] = [0; 1500];
let _read = read_lines(imap, &mut buffer, Some(&b"60 OK EXPUNGE"[..]))?;
Ok(())
}
fn rename_mailbox(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"70 rename archive my-archives\r\n"[..])?;
let mut buffer: [u8; 1500] = [0; 1500];
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"70 OK"[..]);
imap.write(&b"71 list \"\" *\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"71 OK LIST"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
assert!(!srv_msg.contains(" archive\r\n"));
assert!(srv_msg.contains(" INBOX\r\n"));
assert!(srv_msg.contains(" my-archives\r\n"));
Ok(())
}
fn delete_mailbox(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"80 delete my-archives\r\n"[..])?;
let mut buffer: [u8; 1500] = [0; 1500];
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(&read[..5], &b"80 OK"[..]);
imap.write(&b"81 list \"\" *\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"81 OK LIST"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
assert!(!srv_msg.contains(" archive\r\n"));
assert!(!srv_msg.contains(" my-archives\r\n"));
assert!(srv_msg.contains(" INBOX\r\n"));
Ok(())
}
fn read_lines<'a, F: Read>(
reader: &mut F,
buffer: &'a mut [u8],
stop_marker: Option<&[u8]>,
) -> Result<&'a [u8]> {
let mut nbytes = 0;
loop {
nbytes += reader.read(&mut buffer[nbytes..])?;
//println!("partial read: {}", std::str::from_utf8(&buffer[..nbytes])?);
let pre_condition = match stop_marker {
None => true,
Some(mark) => buffer[..nbytes].windows(mark.len()).any(|w| w == mark),
};
if pre_condition && &buffer[nbytes - 2..nbytes] == &b"\r\n"[..] {
break;
}
}
println!("read: {}", std::str::from_utf8(&buffer[..nbytes])?);
Ok(&buffer[..nbytes])
}