Implement IDLE #72

Merged
quentin merged 9 commits from feat/idle into main 2024-01-19 14:04:04 +00:00
19 changed files with 732 additions and 440 deletions

2
Cargo.lock generated
View file

@ -1822,7 +1822,7 @@ dependencies = [
[[package]] [[package]]
name = "imap-flow" name = "imap-flow"
version = "0.1.0" version = "0.1.0"
source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#0f548a2070aace09f9f9a0b6ef221efefb8b110b" source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#60ff9e082ccfcd10a042b616d8038a578fa0c8ff"
dependencies = [ dependencies = [
"bounded-static", "bounded-static",
"bytes", "bytes",

View file

@ -2,7 +2,7 @@ use std::sync::{Arc, Weak};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use log::{debug, error, info}; use log::error;
use rand::prelude::*; use rand::prelude::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{watch, Notify}; use tokio::sync::{watch, Notify};
@ -84,21 +84,21 @@ impl<S: BayouState> Bayou<S> {
// 1. List checkpoints // 1. List checkpoints
let checkpoints = self.list_checkpoints().await?; let checkpoints = self.list_checkpoints().await?;
debug!("(sync) listed checkpoints: {:?}", checkpoints); tracing::debug!("(sync) listed checkpoints: {:?}", checkpoints);
// 2. Load last checkpoint if different from currently used one // 2. Load last checkpoint if different from currently used one
let checkpoint = if let Some((ts, key)) = checkpoints.last() { let checkpoint = if let Some((ts, key)) = checkpoints.last() {
if *ts == self.checkpoint.0 { if *ts == self.checkpoint.0 {
(*ts, None) (*ts, None)
} else { } else {
debug!("(sync) loading checkpoint: {}", key); tracing::debug!("(sync) loading checkpoint: {}", key);
let buf = self let buf = self
.storage .storage
.blob_fetch(&storage::BlobRef(key.to_string())) .blob_fetch(&storage::BlobRef(key.to_string()))
.await? .await?
.value; .value;
debug!("(sync) checkpoint body length: {}", buf.len()); tracing::debug!("(sync) checkpoint body length: {}", buf.len());
let ck = open_deserialize::<S>(&buf, &self.key)?; let ck = open_deserialize::<S>(&buf, &self.key)?;
(*ts, Some(ck)) (*ts, Some(ck))
@ -112,7 +112,7 @@ impl<S: BayouState> Bayou<S> {
} }
if let Some(ck) = checkpoint.1 { if let Some(ck) = checkpoint.1 {
debug!( tracing::debug!(
"(sync) updating checkpoint to loaded state at {:?}", "(sync) updating checkpoint to loaded state at {:?}",
checkpoint.0 checkpoint.0
); );
@ -127,7 +127,7 @@ impl<S: BayouState> Bayou<S> {
// 3. List all operations starting from checkpoint // 3. List all operations starting from checkpoint
let ts_ser = self.checkpoint.0.to_string(); let ts_ser = self.checkpoint.0.to_string();
debug!("(sync) looking up operations starting at {}", ts_ser); tracing::debug!("(sync) looking up operations starting at {}", ts_ser);
let ops_map = self let ops_map = self
.storage .storage
.row_fetch(&storage::Selector::Range { .row_fetch(&storage::Selector::Range {
@ -161,7 +161,7 @@ impl<S: BayouState> Bayou<S> {
} }
} }
ops.sort_by_key(|(ts, _)| *ts); ops.sort_by_key(|(ts, _)| *ts);
debug!("(sync) {} operations", ops.len()); tracing::debug!("(sync) {} operations", ops.len());
if ops.len() < self.history.len() { if ops.len() < self.history.len() {
bail!("Some operations have disappeared from storage!"); bail!("Some operations have disappeared from storage!");
@ -238,12 +238,16 @@ impl<S: BayouState> Bayou<S> {
Ok(()) Ok(())
} }
pub fn notifier(&self) -> std::sync::Weak<Notify> {
Arc::downgrade(&self.watch.learnt_remote_update)
}
/// Applies a new operation on the state. Once this function returns, /// Applies a new operation on the state. Once this function returns,
/// the operation has been safely persisted to storage backend. /// the operation has been safely persisted to storage backend.
/// Make sure to call `.opportunistic_sync()` before doing this, /// Make sure to call `.opportunistic_sync()` before doing this,
/// and even before calculating the `op` argument given here. /// and even before calculating the `op` argument given here.
pub async fn push(&mut self, op: S::Op) -> Result<()> { pub async fn push(&mut self, op: S::Op) -> Result<()> {
debug!("(push) add operation: {:?}", op); tracing::debug!("(push) add operation: {:?}", op);
let ts = Timestamp::after( let ts = Timestamp::after(
self.history self.history
@ -257,7 +261,7 @@ impl<S: BayouState> Bayou<S> {
seal_serialize(&op, &self.key)?, seal_serialize(&op, &self.key)?,
); );
self.storage.row_insert(vec![row_val]).await?; self.storage.row_insert(vec![row_val]).await?;
self.watch.notify.notify_one(); self.watch.propagate_local_update.notify_one();
let new_state = self.state().apply(&op); let new_state = self.state().apply(&op);
self.history.push((ts, op, Some(new_state))); self.history.push((ts, op, Some(new_state)));
@ -305,18 +309,18 @@ impl<S: BayouState> Bayou<S> {
{ {
Some(i) => i, Some(i) => i,
None => { None => {
debug!("(cp) Oldest operation is too recent to trigger checkpoint"); tracing::debug!("(cp) Oldest operation is too recent to trigger checkpoint");
return Ok(()); return Ok(());
} }
}; };
if i_cp < CHECKPOINT_MIN_OPS { if i_cp < CHECKPOINT_MIN_OPS {
debug!("(cp) Not enough old operations to trigger checkpoint"); tracing::debug!("(cp) Not enough old operations to trigger checkpoint");
return Ok(()); return Ok(());
} }
let ts_cp = self.history[i_cp].0; let ts_cp = self.history[i_cp].0;
debug!( tracing::debug!(
"(cp) we could checkpoint at time {} (index {} in history)", "(cp) we could checkpoint at time {} (index {} in history)",
ts_cp.to_string(), ts_cp.to_string(),
i_cp i_cp
@ -324,13 +328,13 @@ impl<S: BayouState> Bayou<S> {
// Check existing checkpoints: if last one is too recent, don't checkpoint again. // Check existing checkpoints: if last one is too recent, don't checkpoint again.
let existing_checkpoints = self.list_checkpoints().await?; let existing_checkpoints = self.list_checkpoints().await?;
debug!("(cp) listed checkpoints: {:?}", existing_checkpoints); tracing::debug!("(cp) listed checkpoints: {:?}", existing_checkpoints);
if let Some(last_cp) = existing_checkpoints.last() { if let Some(last_cp) = existing_checkpoints.last() {
if (ts_cp.msec as i128 - last_cp.0.msec as i128) if (ts_cp.msec as i128 - last_cp.0.msec as i128)
< CHECKPOINT_INTERVAL.as_millis() as i128 < CHECKPOINT_INTERVAL.as_millis() as i128
{ {
debug!( tracing::debug!(
"(cp) last checkpoint is too recent: {}, not checkpointing", "(cp) last checkpoint is too recent: {}, not checkpointing",
last_cp.0.to_string() last_cp.0.to_string()
); );
@ -338,7 +342,7 @@ impl<S: BayouState> Bayou<S> {
} }
} }
debug!("(cp) saving checkpoint at {}", ts_cp.to_string()); tracing::debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
// Calculate state at time of checkpoint // Calculate state at time of checkpoint
let mut last_known_state = (0, &self.checkpoint.1); let mut last_known_state = (0, &self.checkpoint.1);
@ -354,7 +358,7 @@ impl<S: BayouState> Bayou<S> {
// Serialize and save checkpoint // Serialize and save checkpoint
let cryptoblob = seal_serialize(&state_cp, &self.key)?; let cryptoblob = seal_serialize(&state_cp, &self.key)?;
debug!("(cp) checkpoint body length: {}", cryptoblob.len()); tracing::debug!("(cp) checkpoint body length: {}", cryptoblob.len());
let blob_val = storage::BlobVal::new( let blob_val = storage::BlobVal::new(
storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())), storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
@ -369,7 +373,7 @@ impl<S: BayouState> Bayou<S> {
// Delete blobs // Delete blobs
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() { for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
debug!("(cp) drop old checkpoint {}", key); tracing::debug!("(cp) drop old checkpoint {}", key);
self.storage self.storage
.blob_rm(&storage::BlobRef(key.to_string())) .blob_rm(&storage::BlobRef(key.to_string()))
.await?; .await?;
@ -423,7 +427,8 @@ impl<S: BayouState> Bayou<S> {
struct K2vWatch { struct K2vWatch {
target: storage::RowRef, target: storage::RowRef,
rx: watch::Receiver<storage::RowRef>, rx: watch::Receiver<storage::RowRef>,
notify: Notify, propagate_local_update: Notify,
learnt_remote_update: Arc<Notify>,
} }
impl K2vWatch { impl K2vWatch {
@ -434,9 +439,15 @@ impl K2vWatch {
let storage = creds.storage.build().await?; let storage = creds.storage.build().await?;
let (tx, rx) = watch::channel::<storage::RowRef>(target.clone()); let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
let notify = Notify::new(); let propagate_local_update = Notify::new();
let learnt_remote_update = Arc::new(Notify::new());
let watch = Arc::new(K2vWatch { target, rx, notify }); let watch = Arc::new(K2vWatch {
target,
rx,
propagate_local_update,
learnt_remote_update,
});
tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
@ -448,18 +459,24 @@ impl K2vWatch {
storage: storage::Store, storage: storage::Store,
tx: watch::Sender<storage::RowRef>, tx: watch::Sender<storage::RowRef>,
) { ) {
let mut row = match Weak::upgrade(&self_weak) { let (mut row, remote_update) = match Weak::upgrade(&self_weak) {
Some(this) => this.target.clone(), Some(this) => (this.target.clone(), this.learnt_remote_update.clone()),
None => return, None => return,
}; };
while let Some(this) = Weak::upgrade(&self_weak) { while let Some(this) = Weak::upgrade(&self_weak) {
debug!( tracing::debug!(
"bayou k2v watch bg loop iter ({}, {})", "bayou k2v watch bg loop iter ({}, {})",
this.target.uid.shard, this.target.uid.sort this.target.uid.shard,
this.target.uid.sort
); );
tokio::select!( tokio::select!(
// Needed to exit: will force a loop iteration every minutes,
// that will stop the loop if other Arc references have been dropped
// and free resources. Otherwise we would be blocked waiting forever...
_ = tokio::time::sleep(Duration::from_secs(60)) => continue, _ = tokio::time::sleep(Duration::from_secs(60)) => continue,
// Watch if another instance has modified the log
update = storage.row_poll(&row) => { update = storage.row_poll(&row) => {
match update { match update {
Err(e) => { Err(e) => {
@ -468,23 +485,30 @@ impl K2vWatch {
} }
Ok(new_value) => { Ok(new_value) => {
row = new_value.row_ref; row = new_value.row_ref;
if tx.send(row.clone()).is_err() { if let Err(e) = tx.send(row.clone()) {
tracing::warn!(err=?e, "(watch) can't record the new log ref");
break; break;
} }
tracing::debug!(row=?row, "(watch) learnt remote update");
this.learnt_remote_update.notify_waiters();
} }
} }
} }
_ = this.notify.notified() => {
// It appears we have modified the log, informing other people
_ = this.propagate_local_update.notified() => {
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec(); let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
let row_val = storage::RowVal::new(row.clone(), rand); let row_val = storage::RowVal::new(row.clone(), rand);
if let Err(e) = storage.row_insert(vec![row_val]).await if let Err(e) = storage.row_insert(vec![row_val]).await
{ {
error!("Error in bayou k2v watch updater loop: {}", e); tracing::error!("Error in bayou k2v watch updater loop: {}", e);
tokio::time::sleep(Duration::from_secs(30)).await; tokio::time::sleep(Duration::from_secs(30)).await;
} }
} }
); );
} }
info!("bayou k2v watch bg loop exiting"); // unblock listeners
remote_update.notify_waiters();
tracing::info!("bayou k2v watch bg loop exiting");
} }
} }

View file

@ -1,5 +1,5 @@
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
use imap_codec::imap_types::command::FetchModifier; use imap_codec::imap_types::command::FetchModifier;
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
/// Internal decisions based on fetched attributes /// Internal decisions based on fetched attributes
/// passed by the client /// passed by the client
@ -8,7 +8,11 @@ pub struct AttributesProxy {
pub attrs: Vec<MessageDataItemName<'static>>, pub attrs: Vec<MessageDataItemName<'static>>,
} }
impl AttributesProxy { impl AttributesProxy {
pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self { pub fn new(
attrs: &MacroOrMessageDataItemNames<'static>,
modifiers: &[FetchModifier],
is_uid_fetch: bool,
) -> Self {
// Expand macros // Expand macros
let mut fetch_attrs = match attrs { let mut fetch_attrs = match attrs {
MacroOrMessageDataItemNames::Macro(m) => { MacroOrMessageDataItemNames::Macro(m) => {
@ -44,32 +48,30 @@ impl AttributesProxy {
} }
pub fn is_enabling_condstore(&self) -> bool { pub fn is_enabling_condstore(&self) -> bool {
self.attrs.iter().any(|x| { self.attrs
matches!(x, MessageDataItemName::ModSeq) .iter()
}) .any(|x| matches!(x, MessageDataItemName::ModSeq))
} }
pub fn need_body(&self) -> bool { pub fn need_body(&self) -> bool {
self.attrs.iter().any(|x| { self.attrs.iter().any(|x| match x {
match x { MessageDataItemName::Body
MessageDataItemName::Body | MessageDataItemName::Rfc822
| MessageDataItemName::Rfc822 | MessageDataItemName::Rfc822Text
| MessageDataItemName::Rfc822Text | MessageDataItemName::BodyStructure => true,
| MessageDataItemName::BodyStructure => true,
MessageDataItemName::BodyExt { MessageDataItemName::BodyExt {
section: Some(section), section: Some(section),
partial: _, partial: _,
peek: _, peek: _,
} => match section { } => match section {
Section::Header(None) Section::Header(None)
| Section::HeaderFields(None, _) | Section::HeaderFields(None, _)
| Section::HeaderFieldsNot(None, _) => false, | Section::HeaderFieldsNot(None, _) => false,
_ => true, _ => true,
}, },
MessageDataItemName::BodyExt { .. } => true, MessageDataItemName::BodyExt { .. } => true,
_ => false, _ => false,
}
}) })
} }
} }

View file

@ -1,4 +1,4 @@
use imap_codec::imap_types::command::{FetchModifier, StoreModifier, SelectExamineModifier}; use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier};
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
use imap_codec::imap_types::response::Capability; use imap_codec::imap_types::response::Capability;
@ -30,6 +30,7 @@ impl Default for ServerCapability {
Capability::Enable, Capability::Enable,
Capability::Move, Capability::Move,
Capability::LiteralPlus, Capability::LiteralPlus,
Capability::Idle,
capability_unselect(), capability_unselect(),
capability_condstore(), capability_condstore(),
//capability_qresync(), //capability_qresync(),
@ -72,7 +73,6 @@ impl ClientStatus {
} }
} }
pub struct ClientCapability { pub struct ClientCapability {
pub condstore: ClientStatus, pub condstore: ClientStatus,
pub utf8kind: Option<Utf8Kind>, pub utf8kind: Option<Utf8Kind>,
@ -100,13 +100,19 @@ impl ClientCapability {
} }
pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) { pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) {
if mods.iter().any(|x| matches!(x, FetchModifier::ChangedSince(..))) { if mods
.iter()
.any(|x| matches!(x, FetchModifier::ChangedSince(..)))
{
self.enable_condstore() self.enable_condstore()
} }
} }
pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) { pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) {
if mods.iter().any(|x| matches!(x, StoreModifier::UnchangedSince(..))) { if mods
.iter()
.any(|x| matches!(x, StoreModifier::UnchangedSince(..)))
{
self.enable_condstore() self.enable_condstore()
} }
} }

View file

@ -405,7 +405,7 @@ impl<'a> AuthenticatedContext<'a> {
it is therefore correct to not return it even if there are unseen messages it is therefore correct to not return it even if there are unseen messages
RFC9051 (imap4rev2) says that OK [UNSEEN] responses are deprecated after SELECT and EXAMINE RFC9051 (imap4rev2) says that OK [UNSEEN] responses are deprecated after SELECT and EXAMINE
For Aerogramme, we just don't send the OK [UNSEEN], it's correct to do in both specifications. For Aerogramme, we just don't send the OK [UNSEEN], it's correct to do in both specifications.
20 select "INBOX.achats" 20 select "INBOX.achats"
* FLAGS (\Answered \Flagged \Deleted \Seen \Draft $Forwarded JUNK $label1) * FLAGS (\Answered \Flagged \Deleted \Seen \Draft $Forwarded JUNK $label1)
@ -453,7 +453,7 @@ impl<'a> AuthenticatedContext<'a> {
.code(Code::ReadWrite) .code(Code::ReadWrite)
.set_body(data) .set_body(data)
.ok()?, .ok()?,
flow::Transition::Select(mb), flow::Transition::Select(mb, flow::MailboxPerm::ReadWrite),
)) ))
} }
@ -491,7 +491,7 @@ impl<'a> AuthenticatedContext<'a> {
.code(Code::ReadOnly) .code(Code::ReadOnly)
.set_body(data) .set_body(data)
.ok()?, .ok()?,
flow::Transition::Examine(mb), flow::Transition::Select(mb, flow::MailboxPerm::ReadOnly),
)) ))
} }

View file

@ -1,164 +0,0 @@
use std::sync::Arc;
use std::num::NonZeroU64;
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier};
use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated};
use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response;
use crate::mail::user::User;
pub struct ExaminedContext<'a> {
pub req: &'a Command<'static>,
pub user: &'a Arc<User>,
pub mailbox: &'a mut MailboxView,
pub server_capabilities: &'a ServerCapability,
pub client_capabilities: &'a mut ClientCapability,
}
pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
match &ctx.req.body {
// Any State
// noop is specific to this state
CommandBody::Capability => {
anystate::capability(ctx.req.tag.clone(), ctx.server_capabilities)
}
CommandBody::Logout => anystate::logout(),
// Specific to the EXAMINE state (specialization of the SELECTED state)
// ~3 commands -> close, fetch, search + NOOP
CommandBody::Close => ctx.close("CLOSE").await,
CommandBody::Fetch {
sequence_set,
macro_or_item_names,
modifiers,
uid,
} => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
CommandBody::Search {
charset,
criteria,
uid,
} => ctx.search(charset, criteria, uid).await,
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
CommandBody::Expunge { .. } | CommandBody::Store { .. } => Ok((
Response::build()
.to_req(ctx.req)
.message("Forbidden command: can't write in read-only mode (EXAMINE)")
.no()?,
flow::Transition::None,
)),
// UNSELECT extension (rfc3691)
CommandBody::Unselect => ctx.close("UNSELECT").await,
// In examined mode, we fallback to authenticated when needed
_ => {
authenticated::dispatch(authenticated::AuthenticatedContext {
req: ctx.req,
server_capabilities: ctx.server_capabilities,
client_capabilities: ctx.client_capabilities,
user: ctx.user,
})
.await
}
}
}
// --- PRIVATE ---
impl<'a> ExaminedContext<'a> {
/// CLOSE in examined state is not the same as in selected state
/// (in selected state it also does an EXPUNGE, here it doesn't)
async fn close(self, kind: &str) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
.message(format!("{} completed", kind))
.ok()?,
flow::Transition::Unselect,
))
}
pub async fn fetch(
self,
sequence_set: &SequenceSet,
attributes: &'a MacroOrMessageDataItemNames<'static>,
modifiers: &[FetchModifier],
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
let ap = AttributesProxy::new(attributes, modifiers, *uid);
let mut changed_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m {
FetchModifier::ChangedSince(val) => {
changed_since = Some(*val);
},
});
match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
Ok(resp) => {
// Capabilities enabling logic only on successful command
// (according to my understanding of the spec)
self.client_capabilities.attributes_enable(&ap);
self.client_capabilities.fetch_modifiers_enable(modifiers);
Ok((
Response::build()
.to_req(self.req)
.message("FETCH completed")
.set_body(resp)
.ok()?,
flow::Transition::None,
))
},
Err(e) => Ok((
Response::build()
.to_req(self.req)
.message(e.to_string())
.no()?,
flow::Transition::None,
)),
}
}
pub async fn search(
self,
charset: &Option<Charset<'a>>,
criteria: &SearchKey<'a>,
uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> {
let (found, enable_condstore) = self.mailbox.search(charset, criteria, *uid).await?;
if enable_condstore {
self.client_capabilities.enable_condstore();
}
Ok((
Response::build()
.to_req(self.req)
.set_body(found)
.message("SEARCH completed")
.ok()?,
flow::Transition::None,
))
}
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
self.mailbox.internal.mailbox.force_sync().await?;
let updates = self.mailbox.update(UpdateParameters::default()).await?;
Ok((
Response::build()
.to_req(self.req)
.message("NOOP completed.")
.set_body(updates)
.ok()?,
flow::Transition::None,
))
}
}

View file

@ -1,7 +1,6 @@
pub mod anonymous; pub mod anonymous;
pub mod anystate; pub mod anystate;
pub mod authenticated; pub mod authenticated;
pub mod examined;
pub mod selected; pub mod selected;
use crate::mail::user::INBOX; use crate::mail::user::INBOX;

View file

@ -1,5 +1,5 @@
use std::sync::Arc;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
@ -11,12 +11,12 @@ use imap_codec::imap_types::response::{Code, CodeOther};
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated, MailboxName}; use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response; use crate::imap::response::Response;
use crate::imap::attributes::AttributesProxy;
use crate::mail::user::User; use crate::mail::user::User;
pub struct SelectedContext<'a> { pub struct SelectedContext<'a> {
@ -25,6 +25,7 @@ pub struct SelectedContext<'a> {
pub mailbox: &'a mut MailboxView, pub mailbox: &'a mut MailboxView,
pub server_capabilities: &'a ServerCapability, pub server_capabilities: &'a ServerCapability,
pub client_capabilities: &'a mut ClientCapability, pub client_capabilities: &'a mut ClientCapability,
pub perm: &'a flow::MailboxPerm,
} }
pub async fn dispatch<'a>( pub async fn dispatch<'a>(
@ -39,14 +40,20 @@ pub async fn dispatch<'a>(
CommandBody::Logout => anystate::logout(), CommandBody::Logout => anystate::logout(),
// Specific to this state (7 commands + NOOP) // Specific to this state (7 commands + NOOP)
CommandBody::Close => ctx.close().await, CommandBody::Close => match ctx.perm {
flow::MailboxPerm::ReadWrite => ctx.close().await,
flow::MailboxPerm::ReadOnly => ctx.examine_close().await,
},
CommandBody::Noop | CommandBody::Check => ctx.noop().await, CommandBody::Noop | CommandBody::Check => ctx.noop().await,
CommandBody::Fetch { CommandBody::Fetch {
sequence_set, sequence_set,
macro_or_item_names, macro_or_item_names,
modifiers, modifiers,
uid, uid,
} => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await, } => {
ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid)
.await
}
CommandBody::Search { CommandBody::Search {
charset, charset,
criteria, criteria,
@ -60,7 +67,10 @@ pub async fn dispatch<'a>(
flags, flags,
modifiers, modifiers,
uid, uid,
} => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await, } => {
ctx.store(sequence_set, kind, response, flags, modifiers, uid)
.await
}
CommandBody::Copy { CommandBody::Copy {
sequence_set, sequence_set,
mailbox, mailbox,
@ -75,6 +85,15 @@ pub async fn dispatch<'a>(
// UNSELECT extension (rfc3691) // UNSELECT extension (rfc3691)
CommandBody::Unselect => ctx.unselect().await, CommandBody::Unselect => ctx.unselect().await,
// IDLE extension (rfc2177)
CommandBody::Idle => Ok((
Response::build()
.to_req(ctx.req)
.message("DUMMY command due to anti-pattern in the code")
.ok()?,
flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()),
)),
// In selected mode, we fallback to authenticated when needed // In selected mode, we fallback to authenticated when needed
_ => { _ => {
authenticated::dispatch(authenticated::AuthenticatedContext { authenticated::dispatch(authenticated::AuthenticatedContext {
@ -102,6 +121,18 @@ impl<'a> SelectedContext<'a> {
)) ))
} }
/// CLOSE in examined state is not the same as in selected state
/// (in selected state it also does an EXPUNGE, here it doesn't)
async fn examine_close(self) -> Result<(Response<'static>, flow::Transition)> {
Ok((
Response::build()
.to_req(self.req)
.message("CLOSE completed")
.ok()?,
flow::Transition::Unselect,
))
}
async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> { async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> {
Ok(( Ok((
Response::build() Response::build()
@ -124,10 +155,14 @@ impl<'a> SelectedContext<'a> {
modifiers.iter().for_each(|m| match m { modifiers.iter().for_each(|m| match m {
FetchModifier::ChangedSince(val) => { FetchModifier::ChangedSince(val) => {
changed_since = Some(*val); changed_since = Some(*val);
}, }
}); });
match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await { match self
.mailbox
.fetch(sequence_set, &ap, changed_since, uid)
.await
{
Ok(resp) => { Ok(resp) => {
// Capabilities enabling logic only on successful command // Capabilities enabling logic only on successful command
// (according to my understanding of the spec) // (according to my understanding of the spec)
@ -143,7 +178,7 @@ impl<'a> SelectedContext<'a> {
.ok()?, .ok()?,
flow::Transition::None, flow::Transition::None,
)) ))
}, }
Err(e) => Ok(( Err(e) => Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -189,6 +224,10 @@ impl<'a> SelectedContext<'a> {
} }
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> { async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None));
}
let tag = self.req.tag.clone(); let tag = self.req.tag.clone();
let data = self.mailbox.expunge().await?; let data = self.mailbox.expunge().await?;
@ -211,11 +250,15 @@ impl<'a> SelectedContext<'a> {
modifiers: &[StoreModifier], modifiers: &[StoreModifier],
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None));
}
let mut unchanged_since: Option<NonZeroU64> = None; let mut unchanged_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m { modifiers.iter().for_each(|m| match m {
StoreModifier::UnchangedSince(val) => { StoreModifier::UnchangedSince(val) => {
unchanged_since = Some(*val); unchanged_since = Some(*val);
}, }
}); });
let (data, modified) = self let (data, modified) = self
@ -224,25 +267,30 @@ impl<'a> SelectedContext<'a> {
.await?; .await?;
let mut ok_resp = Response::build() let mut ok_resp = Response::build()
.to_req(self.req) .to_req(self.req)
.message("STORE completed") .message("STORE completed")
.set_body(data); .set_body(data);
match modified[..] { match modified[..] {
[] => (), [] => (),
[_head, ..] => { [_head, ..] => {
let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::<Vec<_>>().join(",")); let modified_str = format!(
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes()))); "MODIFIED {}",
}, modified
.into_iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(",")
);
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(
modified_str.into_bytes(),
)));
}
}; };
self.client_capabilities.store_modifiers_enable(modifiers); self.client_capabilities.store_modifiers_enable(modifiers);
Ok((ok_resp.ok()?, Ok((ok_resp.ok()?, flow::Transition::None))
flow::Transition::None,
))
} }
async fn copy( async fn copy(
@ -251,6 +299,11 @@ impl<'a> SelectedContext<'a> {
mailbox: &MailboxCodec<'a>, mailbox: &MailboxCodec<'a>,
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
//@FIXME Could copy be valid in EXAMINE mode?
if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None));
}
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?; let mb_opt = self.user.open_mailbox(&name).await?;
@ -303,6 +356,10 @@ impl<'a> SelectedContext<'a> {
mailbox: &MailboxCodec<'a>, mailbox: &MailboxCodec<'a>,
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None));
}
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
let mb_opt = self.user.open_mailbox(&name).await?; let mb_opt = self.user.open_mailbox(&name).await?;
@ -350,4 +407,17 @@ impl<'a> SelectedContext<'a> {
flow::Transition::None, flow::Transition::None,
)) ))
} }
fn fail_read_only(&self) -> Option<Response<'static>> {
match self.perm {
flow::MailboxPerm::ReadWrite => None,
flow::MailboxPerm::ReadOnly => Some(
Response::build()
.to_req(self.req)
.message("Write command are forbidden while exmining mailbox")
.no()
.unwrap(),
),
}
}
} }

View file

@ -1,9 +1,11 @@
use std::error::Error as StdError; use std::error::Error as StdError;
use std::fmt; use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Notify;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User; use crate::mail::user::User;
use imap_codec::imap_types::core::Tag;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -19,44 +21,84 @@ impl StdError for Error {}
pub enum State { pub enum State {
NotAuthenticated, NotAuthenticated,
Authenticated(Arc<User>), Authenticated(Arc<User>),
Selected(Arc<User>, MailboxView), Selected(Arc<User>, MailboxView, MailboxPerm),
// Examined is like Selected, but indicates that the mailbox is read-only Idle(
Examined(Arc<User>, MailboxView), Arc<User>,
MailboxView,
MailboxPerm,
Tag<'static>,
Arc<Notify>,
),
Logout, Logout,
} }
impl fmt::Display for State {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use State::*;
match self {
NotAuthenticated => write!(f, "NotAuthenticated"),
Authenticated(..) => write!(f, "Authenticated"),
Selected(..) => write!(f, "Selected"),
Idle(..) => write!(f, "Idle"),
Logout => write!(f, "Logout"),
}
}
}
#[derive(Clone)]
pub enum MailboxPerm {
ReadOnly,
ReadWrite,
}
pub enum Transition { pub enum Transition {
None, None,
Authenticate(Arc<User>), Authenticate(Arc<User>),
Examine(MailboxView), Select(MailboxView, MailboxPerm),
Select(MailboxView), Idle(Tag<'static>, Notify),
UnIdle,
Unselect, Unselect,
Logout, Logout,
} }
impl fmt::Display for Transition {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
use Transition::*;
match self {
None => write!(f, "None"),
Authenticate(..) => write!(f, "Authenticated"),
Select(..) => write!(f, "Selected"),
Idle(..) => write!(f, "Idle"),
UnIdle => write!(f, "UnIdle"),
Unselect => write!(f, "Unselect"),
Logout => write!(f, "Logout"),
}
}
}
// See RFC3501 section 3. // See RFC3501 section 3.
// https://datatracker.ietf.org/doc/html/rfc3501#page-13 // https://datatracker.ietf.org/doc/html/rfc3501#page-13
impl State { impl State {
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> { pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
let new_state = match (&self, tr) { tracing::debug!(state=%self, transition=%tr, "try change state");
(_s, Transition::None) => return Ok(()),
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Select(m),
) => State::Selected(u.clone(), m),
(
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
Transition::Examine(m),
) => State::Examined(u.clone(), m),
(State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
State::Authenticated(u.clone())
}
(_, Transition::Logout) => State::Logout,
_ => return Err(Error::ForbiddenTransition),
};
let new_state = match (std::mem::replace(self, State::Logout), tr) {
(s, Transition::None) => s,
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
(State::Authenticated(u) | State::Selected(u, _, _), Transition::Select(m, p)) => {
State::Selected(u, m, p)
}
(State::Selected(u, _, _), Transition::Unselect) => State::Authenticated(u.clone()),
(State::Selected(u, m, p), Transition::Idle(t, s)) => {
State::Idle(u, m, p, t, Arc::new(s))
}
(State::Idle(u, m, p, _, _), Transition::UnIdle) => State::Selected(u, m, p),
(_, Transition::Logout) => State::Logout,
(s, t) => {
tracing::error!(state=%s, transition=%t, "forbidden transition");
return Err(Error::ForbiddenTransition);
}
};
*self = new_state; *self = new_state;
tracing::debug!(state=%self, "transition succeeded");
Ok(()) Ok(())
} }

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroU64}; use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashSet;
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
@ -13,11 +13,11 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::mail::unique_ident::UniqueIdent;
use crate::mail::mailbox::Mailbox; use crate::mail::mailbox::Mailbox;
use crate::mail::query::QueryScope; use crate::mail::query::QueryScope;
use crate::mail::snapshot::FrozenMailbox; use crate::mail::snapshot::FrozenMailbox;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq}; use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
use crate::mail::unique_ident::UniqueIdent;
use crate::imap::attributes::AttributesProxy; use crate::imap::attributes::AttributesProxy;
use crate::imap::flags; use crate::imap::flags;
@ -64,7 +64,7 @@ pub struct MailboxView {
impl MailboxView { impl MailboxView {
/// Creates a new IMAP view into a mailbox. /// Creates a new IMAP view into a mailbox.
pub async fn new(mailbox: Arc<Mailbox>, is_cond: bool) -> Self { pub async fn new(mailbox: Arc<Mailbox>, is_cond: bool) -> Self {
Self { Self {
internal: mailbox.frozen().await, internal: mailbox.frozen().await,
is_condstore: is_cond, is_condstore: is_cond,
} }
@ -130,11 +130,9 @@ impl MailboxView {
let new_mail = new_snapshot.table.get(uuid); let new_mail = new_snapshot.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail { if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, modseq, flags)) = new_mail { if let Some((uid, modseq, flags)) = new_mail {
let mut items = vec![ let mut items = vec![MessageDataItem::Flags(
MessageDataItem::Flags( flags.iter().filter_map(|f| flags::from_str(f)).collect(),
flags.iter().filter_map(|f| flags::from_str(f)).collect(), )];
),
];
if params.with_uid { if params.with_uid {
items.push(MessageDataItem::Uid(*uid)); items.push(MessageDataItem::Uid(*uid));
@ -169,7 +167,7 @@ impl MailboxView {
data.push(self.highestmodseq_status()?); data.push(self.highestmodseq_status()?);
} }
/*self.unseen_first_status()? /*self.unseen_first_status()?
.map(|unseen_status| data.push(unseen_status));*/ .map(|unseen_status| data.push(unseen_status));*/
Ok(data) Ok(data)
} }
@ -188,8 +186,8 @@ impl MailboxView {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let idx = self.index()?; let idx = self.index()?;
let (editable, in_conflict) = idx let (editable, in_conflict) =
.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?; idx.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
for mi in editable.iter() { for mi in editable.iter() {
match kind { match kind {
@ -215,15 +213,30 @@ impl MailboxView {
_ => in_conflict.into_iter().map(|midx| midx.i).collect(), _ => in_conflict.into_iter().map(|midx| midx.i).collect(),
}; };
let summary = self.update(UpdateParameters { let summary = self
with_uid: *is_uid_store, .update(UpdateParameters {
with_modseq: unchanged_since.is_some(), with_uid: *is_uid_store,
silence, with_modseq: unchanged_since.is_some(),
}).await?; silence,
})
.await?;
Ok((summary, conflict_id_or_uid)) Ok((summary, conflict_id_or_uid))
} }
pub async fn idle_sync(&mut self) -> Result<Vec<Body<'static>>> {
self.internal
.mailbox
.notify()
.await
.upgrade()
.ok_or(anyhow!("test"))?
.notified()
.await;
self.internal.mailbox.opportunistic_sync().await?;
self.update(UpdateParameters::default()).await
}
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> { pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
self.internal.sync().await?; self.internal.sync().await?;
let state = self.internal.peek().await; let state = self.internal.peek().await;
@ -294,10 +307,12 @@ impl MailboxView {
ret.push((mi.uid, dest_uid)); ret.push((mi.uid, dest_uid));
} }
let update = self.update(UpdateParameters { let update = self
with_uid: *is_uid_copy, .update(UpdateParameters {
..UpdateParameters::default() with_uid: *is_uid_copy,
}).await?; ..UpdateParameters::default()
})
.await?;
Ok((to_state.uidvalidity, ret, update)) Ok((to_state.uidvalidity, ret, update))
} }
@ -321,11 +336,7 @@ impl MailboxView {
}; };
tracing::debug!("Query scope {:?}", query_scope); tracing::debug!("Query scope {:?}", query_scope);
let idx = self.index()?; let idx = self.index()?;
let mail_idx_list = idx.fetch_changed_since( let mail_idx_list = idx.fetch_changed_since(sequence_set, changed_since, *is_uid_fetch)?;
sequence_set,
changed_since,
*is_uid_fetch
)?;
// [2/6] Fetch the emails // [2/6] Fetch the emails
let uuids = mail_idx_list let uuids = mail_idx_list
@ -414,12 +425,19 @@ impl MailboxView {
let maybe_modseq = match is_modseq { let maybe_modseq = match is_modseq {
true => { true => {
let final_selection = kept_idx.iter().chain(kept_query.iter()); let final_selection = kept_idx.iter().chain(kept_query.iter());
final_selection.map(|in_idx| in_idx.modseq).max().map(|r| NonZeroU64::try_from(r)).transpose()? final_selection
}, .map(|in_idx| in_idx.modseq)
.max()
.map(|r| NonZeroU64::try_from(r))
.transpose()?
}
_ => None, _ => None,
}; };
Ok((vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], is_modseq)) Ok((
vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
is_modseq,
))
} }
// ---- // ----
@ -463,8 +481,10 @@ impl MailboxView {
pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> { pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> {
Ok(Body::Status(Status::ok( Ok(Body::Status(Status::ok(
None, None,
Some(Code::Other(CodeOther::unvalidated(format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes()))), Some(Code::Other(CodeOther::unvalidated(
format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes(),
))),
"Highest", "Highest",
)?)) )?))
} }

View file

@ -8,24 +8,30 @@ mod index;
mod mail_view; mod mail_view;
mod mailbox_view; mod mailbox_view;
mod mime_view; mod mime_view;
mod request;
mod response; mod response;
mod search; mod search;
mod session; mod session;
use std::net::SocketAddr; use std::net::SocketAddr;
use anyhow::Result; use anyhow::{bail, Result};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::sync::watch; use tokio::sync::watch;
use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream; use imap_flow::stream::AnyStream;
use crate::config::ImapConfig; use crate::config::ImapConfig;
use crate::imap::capability::ServerCapability; use crate::imap::capability::ServerCapability;
use crate::imap::request::Request;
use crate::imap::response::{Body, ResponseOrIdle};
use crate::imap::session::Instance;
use crate::login::ArcLoginProvider; use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL /// Server is a thin wrapper to register our Services in BàL
@ -35,8 +41,8 @@ pub struct Server {
capabilities: ServerCapability, capabilities: ServerCapability,
} }
#[derive(Clone)]
struct ClientContext { struct ClientContext {
stream: AnyStream,
addr: SocketAddr, addr: SocketAddr,
login_provider: ArcLoginProvider, login_provider: ArcLoginProvider,
must_exit: watch::Receiver<bool>, must_exit: watch::Receiver<bool>,
@ -74,13 +80,12 @@ impl Server {
tracing::info!("IMAP: accepted connection from {}", remote_addr); tracing::info!("IMAP: accepted connection from {}", remote_addr);
let client = ClientContext { let client = ClientContext {
stream: AnyStream::new(socket),
addr: remote_addr.clone(), addr: remote_addr.clone(),
login_provider: self.login_provider.clone(), login_provider: self.login_provider.clone(),
must_exit: must_exit.clone(), must_exit: must_exit.clone(),
server_capabilities: self.capabilities.clone(), server_capabilities: self.capabilities.clone(),
}; };
let conn = tokio::spawn(client_wrapper(client)); let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket)));
connections.push(conn); connections.push(conn);
} }
drop(tcp); drop(tcp);
@ -92,46 +97,87 @@ impl Server {
} }
} }
async fn client_wrapper(ctx: ClientContext) { use std::sync::Arc;
let addr = ctx.addr.clone(); use tokio::sync::mpsc::*;
match client(ctx).await { use tokio::sync::Notify;
Ok(()) => { use tokio_util::bytes::BytesMut;
tracing::debug!("closing successful session for {:?}", addr); enum LoopMode {
} Quit,
Err(e) => { Interactive,
tracing::error!("closing errored session for {:?}: {}", addr, e); Idle(BytesMut, Arc<Notify>),
}
}
} }
async fn client(mut ctx: ClientContext) -> Result<()> { // @FIXME a full refactor of this part of the code will be needed sooner or later
// Send greeting struct NetLoop {
let (mut server, _) = ServerFlow::send_greeting( ctx: ClientContext,
ctx.stream, server: ServerFlow,
ServerFlowOptions { cmd_tx: Sender<Request>,
crlf_relaxed: false, resp_rx: UnboundedReceiver<ResponseOrIdle>,
literal_accept_text: Text::unvalidated("OK"), }
literal_reject_text: Text::unvalidated("Literal rejected"),
..ServerFlowOptions::default() impl NetLoop {
}, async fn handler(ctx: ClientContext, sock: AnyStream) {
Greeting::ok( let addr = ctx.addr.clone();
Some(Code::Capability(ctx.server_capabilities.to_vec())),
"Aerogramme", let nl = match Self::new(ctx, sock).await {
Ok(nl) => {
tracing::debug!(addr=?addr, "netloop successfully initialized");
nl
}
Err(e) => {
tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
return;
}
};
match nl.core().await {
Ok(()) => {
tracing::debug!("closing successful netloop core for {:?}", addr);
}
Err(e) => {
tracing::error!("closing errored netloop core for {:?}: {}", addr, e);
}
}
}
async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> {
// Send greeting
let (mut server, _) = ServerFlow::send_greeting(
sock,
ServerFlowOptions {
crlf_relaxed: false,
literal_accept_text: Text::unvalidated("OK"),
literal_reject_text: Text::unvalidated("Literal rejected"),
..ServerFlowOptions::default()
},
Greeting::ok(
Some(Code::Capability(ctx.server_capabilities.to_vec())),
"Aerogramme",
)
.unwrap(),
) )
.unwrap(), .await?;
)
.await?;
use crate::imap::response::{Body, Response as MyResponse}; // Start a mailbox session in background
use crate::imap::session::Instance; let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3);
use imap_codec::imap_types::command::Command; let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>();
use imap_codec::imap_types::response::{Code, Response, Status}; tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
use tokio::sync::mpsc; // Return the object
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10); Ok(NetLoop {
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>(); ctx,
server,
cmd_tx,
resp_rx,
})
}
let bckgrnd = tokio::spawn(async move { /// Coms with the background session
async fn session(
ctx: ClientContext,
mut cmd_rx: Receiver<Request>,
resp_tx: UnboundedSender<ResponseOrIdle>,
) -> () {
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
loop { loop {
let cmd = match cmd_rx.recv().await { let cmd = match cmd_rx.recv().await {
@ -140,8 +186,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
}; };
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command"); tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
let maybe_response = session.command(cmd).await; let maybe_response = session.request(cmd).await;
tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response"); tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response");
match resp_tx.send(maybe_response) { match resp_tx.send(maybe_response) {
Err(_) => break, Err(_) => break,
@ -149,67 +195,150 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
}; };
} }
tracing::info!("runner is quitting"); tracing::info!("runner is quitting");
}); }
// Main loop async fn core(mut self) -> Result<()> {
loop { let mut mode = LoopMode::Interactive;
loop {
mode = match mode {
LoopMode::Interactive => self.interactive_mode().await?,
LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
LoopMode::Quit => break,
}
}
Ok(())
}
async fn interactive_mode(&mut self) -> Result<LoopMode> {
tokio::select! { tokio::select! {
// Managing imap_flow stuff // Managing imap_flow stuff
srv_evt = server.progress() => match srv_evt? { srv_evt = self.server.progress() => match srv_evt? {
ServerFlowEvent::ResponseSent { handle: _handle, response } => { ServerFlowEvent::ResponseSent { handle: _handle, response } => {
match response { match response {
Response::Status(Status::Bye(_)) => break, Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
_ => tracing::trace!("sent to {} content {:?}", ctx.addr, response), _ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
} }
}, },
ServerFlowEvent::CommandReceived { command } => { ServerFlowEvent::CommandReceived { command } => {
match cmd_tx.try_send(command) { match self.cmd_tx.try_send(Request::ImapCommand(command)) {
Ok(_) => (), Ok(_) => (),
Err(mpsc::error::TrySendError::Full(_)) => { Err(mpsc::error::TrySendError::Full(_)) => {
server.enqueue_status(Status::bye(None, "Too fast").unwrap()); self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr); tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
} }
_ => { _ => {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", ctx.addr); tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
} }
} }
}, },
flow => { flow => {
server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap()); self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow); tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
} }
}, },
// Managing response generated by Aerogramme // Managing response generated by Aerogramme
maybe_msg = resp_rx.recv() => { maybe_msg = self.resp_rx.recv() => match maybe_msg {
let response = match maybe_msg { Some(ResponseOrIdle::Response(response)) => {
None => { for body_elem in response.body.into_iter() {
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap()); let _handle = match body_elem {
tracing::error!("session task exited for {:?}, quitting", ctx.addr); Body::Data(d) => self.server.enqueue_data(d),
continue Body::Status(s) => self.server.enqueue_status(s),
}, };
Some(r) => r, }
}; self.server.enqueue_status(response.completion);
},
Some(ResponseOrIdle::StartIdle(stop)) => {
let cr = CommandContinuationRequest::basic(None, "Idling")?;
self.server.enqueue_continuation(cr);
self.cmd_tx.try_send(Request::Idle)?;
return Ok(LoopMode::Idle(BytesMut::new(), stop))
},
None => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
},
Some(_) => unreachable!(),
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => server.enqueue_data(d),
Body::Status(s) => server.enqueue_status(s),
};
}
server.enqueue_status(response.completion);
}, },
// When receiving a CTRL+C // When receiving a CTRL+C
_ = ctx.must_exit.changed() => { _ = self.ctx.must_exit.changed() => {
server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap()); self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
},
};
Ok(LoopMode::Interactive)
}
async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
// Flush send
loop {
match self.server.progress_send().await? {
Some(..) => continue,
None => break,
}
}
tokio::select! {
// Receiving IDLE event from background
maybe_msg = self.resp_rx.recv() => match maybe_msg {
// Session decided idle is terminated
Some(ResponseOrIdle::Response(response)) => {
for body_elem in response.body.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
self.server.enqueue_status(response.completion);
return Ok(LoopMode::Interactive)
},
// Session has some information for user
Some(ResponseOrIdle::IdleEvent(elems)) => {
for body_elem in elems.into_iter() {
let _handle = match body_elem {
Body::Data(d) => self.server.enqueue_data(d),
Body::Status(s) => self.server.enqueue_status(s),
};
}
self.cmd_tx.try_send(Request::Idle)?;
return Ok(LoopMode::Idle(buff, stop))
},
// Session crashed
None => {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
return Ok(LoopMode::Interactive)
},
// Session can't start idling while already idling, it's a logic error!
Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"),
},
// User is trying to interact with us
_read_client_bytes = self.server.stream.read(&mut buff) => {
use imap_codec::decode::Decoder;
let codec = imap_codec::IdleDoneCodec::new();
match codec.decode(&buff) {
Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => {
// Session will be informed that it must stop idle
// It will generate the "done" message and change the loop mode
stop.notify_one()
},
Err(_) => (),
_ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."),
};
return Ok(LoopMode::Idle(buff, stop))
},
// When receiving a CTRL+C
_ = self.ctx.must_exit.changed() => {
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
return Ok(LoopMode::Interactive)
}, },
}; };
} }
drop(cmd_tx);
bckgrnd.await?;
Ok(())
} }

7
src/imap/request.rs Normal file
View file

@ -0,0 +1,7 @@
use imap_codec::imap_types::command::Command;
#[derive(Debug)]
pub enum Request {
ImapCommand(Command<'static>),
Idle,
}

View file

@ -2,7 +2,10 @@ use anyhow::Result;
use imap_codec::imap_types::command::Command; use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::core::Tag; use imap_codec::imap_types::core::Tag;
use imap_codec::imap_types::response::{Code, Data, Status}; use imap_codec::imap_types::response::{Code, Data, Status};
use std::sync::Arc;
use tokio::sync::Notify;
#[derive(Debug)]
pub enum Body<'a> { pub enum Body<'a> {
Data(Data<'a>), Data(Data<'a>),
Status(Status<'a>), Status(Status<'a>),
@ -88,6 +91,7 @@ impl<'a> ResponseBuilder<'a> {
} }
} }
#[derive(Debug)]
pub struct Response<'a> { pub struct Response<'a> {
pub body: Vec<Body<'a>>, pub body: Vec<Body<'a>>,
pub completion: Status<'a>, pub completion: Status<'a>,
@ -110,3 +114,10 @@ impl<'a> Response<'a> {
}) })
} }
} }
#[derive(Debug)]
pub enum ResponseOrIdle {
Response(Response<'static>),
StartIdle(Arc<Notify>),
IdleEvent(Vec<Body<'static>>),
}

View file

@ -2,7 +2,7 @@ use std::num::{NonZeroU32, NonZeroU64};
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::search::{SearchKey, MetadataItemSearch}; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::imap::index::MailIndex; use crate::imap::index::MailIndex;
@ -115,12 +115,15 @@ impl<'a> Criteria<'a> {
pub fn is_modseq(&self) -> bool { pub fn is_modseq(&self) -> bool {
use SearchKey::*; use SearchKey::*;
match self.0 { match self.0 {
And(and_list) => and_list.as_ref().iter().any(|child| Criteria(child).is_modseq()), And(and_list) => and_list
.as_ref()
.iter()
.any(|child| Criteria(child).is_modseq()),
Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(), Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(),
Not(child) => Criteria(child).is_modseq(), Not(child) => Criteria(child).is_modseq(),
ModSeq { .. } => true, ModSeq { .. } => true,
_ => false, _ => false,
} }
} }
/// Returns emails that we now for sure we want to keep /// Returns emails that we now for sure we want to keep
@ -187,7 +190,10 @@ impl<'a> Criteria<'a> {
// Sequence logic // Sequence logic
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(), maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(),
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, midx).into(), ModSeq {
metadata_item,
modseq,
} => is_keep_modseq(metadata_item, modseq, midx).into(),
// All the stuff we can't evaluate yet // All the stuff we can't evaluate yet
Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_) Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_)
@ -225,7 +231,10 @@ impl<'a> Criteria<'a> {
//@FIXME Reevaluating our previous logic... //@FIXME Reevaluating our previous logic...
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx), maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx),
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(), ModSeq {
metadata_item,
modseq,
} => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
// Filter on mail meta // Filter on mail meta
Before(search_naive) => match mail_view.stored_naive_date() { Before(search_naive) => match mail_view.stored_naive_date() {
@ -331,7 +340,7 @@ fn approx_sequence_set_size(seq_set: &SequenceSet) -> u64 {
} }
// This is wrong as sequence UID can have holes, // This is wrong as sequence UID can have holes,
// as we don't know the number of messages in the mailbox also // as we don't know the number of messages in the mailbox also
// we gave to guess // we gave to guess
fn approx_sequence_size(seq: &Sequence) -> u64 { fn approx_sequence_size(seq: &Sequence) -> u64 {
match seq { match seq {
@ -473,9 +482,13 @@ fn is_keep_seq(sk: &SearchKey, midx: &MailIndex) -> bool {
} }
} }
fn is_keep_modseq(filter: &Option<MetadataItemSearch>, modseq: &NonZeroU64, midx: &MailIndex) -> bool { fn is_keep_modseq(
filter: &Option<MetadataItemSearch>,
modseq: &NonZeroU64,
midx: &MailIndex,
) -> bool {
if filter.is_some() { if filter.is_some() {
tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet"); tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet");
} }
modseq <= &midx.modseq modseq <= &midx.modseq
} }

View file

@ -1,8 +1,10 @@
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anonymous, authenticated, examined, selected}; use crate::imap::command::{anonymous, authenticated, selected};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::response::Response; use crate::imap::request::Request;
use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider; use crate::login::ArcLoginProvider;
use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::Command; use imap_codec::imap_types::command::Command;
//----- //-----
@ -23,7 +25,45 @@ impl Instance {
} }
} }
pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> { pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
match req {
Request::Idle => self.idle().await,
Request::ImapCommand(cmd) => self.command(cmd).await,
}
}
pub async fn idle(&mut self) -> ResponseOrIdle {
match self.idle_happy().await {
Ok(r) => r,
Err(e) => {
tracing::error!(err=?e, "something bad happened in idle");
ResponseOrIdle::Response(Response::bye().unwrap())
}
}
}
pub async fn idle_happy(&mut self) -> Result<ResponseOrIdle> {
let (mbx, tag, stop) = match &mut self.state {
flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()),
_ => bail!("Invalid session state, can't idle"),
};
tokio::select! {
_ = stop.notified() => {
self.state.apply(flow::Transition::UnIdle)?;
return Ok(ResponseOrIdle::Response(Response::build()
.tag(tag.clone())
.message("IDLE completed")
.ok()?))
},
change = mbx.idle_sync() => {
tracing::debug!("idle event");
return Ok(ResponseOrIdle::IdleEvent(change?));
}
}
}
pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle {
// Command behavior is modulated by the state. // Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths. // To prevent state error, we handle the same command in separate code paths.
let (resp, tr) = match &mut self.state { let (resp, tr) = match &mut self.state {
@ -44,26 +84,18 @@ impl Instance {
}; };
authenticated::dispatch(ctx).await authenticated::dispatch(ctx).await
} }
flow::State::Selected(ref user, ref mut mailbox) => { flow::State::Selected(ref user, ref mut mailbox, ref perm) => {
let ctx = selected::SelectedContext { let ctx = selected::SelectedContext {
req: &cmd, req: &cmd,
server_capabilities: &self.server_capabilities, server_capabilities: &self.server_capabilities,
client_capabilities: &mut self.client_capabilities, client_capabilities: &mut self.client_capabilities,
user, user,
mailbox, mailbox,
perm,
}; };
selected::dispatch(ctx).await selected::dispatch(ctx).await
} }
flow::State::Examined(ref user, ref mut mailbox) => { flow::State::Idle(..) => Err(anyhow!("can not receive command while idling")),
let ctx = examined::ExaminedContext {
req: &cmd,
server_capabilities: &self.server_capabilities,
client_capabilities: &mut self.client_capabilities,
user,
mailbox,
};
examined::dispatch(ctx).await
}
flow::State::Logout => Response::build() flow::State::Logout => Response::build()
.tag(cmd.tag.clone()) .tag(cmd.tag.clone())
.message("No commands are allowed in the LOGOUT state.") .message("No commands are allowed in the LOGOUT state.")
@ -88,15 +120,18 @@ impl Instance {
e, e,
cmd cmd
); );
return Response::build() return ResponseOrIdle::Response(Response::build()
.to_req(&cmd) .to_req(&cmd)
.message( .message(
"Internal error, processing command triggered an illegal IMAP state transition", "Internal error, processing command triggered an illegal IMAP state transition",
) )
.bad() .bad()
.unwrap(); .unwrap());
} }
resp match &self.state {
flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()),
_ => ResponseOrIdle::Response(resp),
}
} }
} }

View file

@ -67,6 +67,11 @@ impl Mailbox {
self.mbox.write().await.opportunistic_sync().await self.mbox.write().await.opportunistic_sync().await
} }
/// Block until a sync has been done (due to changes in the event log)
pub async fn notify(&self) -> std::sync::Weak<tokio::sync::Notify> {
self.mbox.read().await.notifier()
}
// ---- Functions for reading the mailbox ---- // ---- Functions for reading the mailbox ----
/// Get a clone of the current UID Index of this mailbox /// Get a clone of the current UID Index of this mailbox
@ -199,6 +204,10 @@ impl MailboxInternal {
Ok(()) Ok(())
} }
fn notifier(&self) -> std::sync::Weak<tokio::sync::Notify> {
self.uid_index.notifier()
}
// ---- Functions for reading the mailbox ---- // ---- Functions for reading the mailbox ----
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> { async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {

View file

@ -140,8 +140,7 @@ impl BayouState for UidIndex {
let bump_uid = new.internalseq.get() - uid.get(); let bump_uid = new.internalseq.get() - uid.get();
let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap();
.unwrap();
} }
// Assign the real uid of the email // Assign the real uid of the email
@ -179,10 +178,10 @@ impl BayouState for UidIndex {
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required // Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq { if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; let bump_modseq =
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
.unwrap();
} }
// Add flags to the source of trust and the cache // Add flags to the source of trust and the cache
@ -205,10 +204,10 @@ impl BayouState for UidIndex {
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required // Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq { if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; let bump_modseq =
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
.unwrap();
} }
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
@ -228,10 +227,10 @@ impl BayouState for UidIndex {
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required // Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq { if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; let bump_modseq =
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
.unwrap();
} }
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
@ -248,7 +247,7 @@ impl BayouState for UidIndex {
existing_flags.append(&mut to_add); existing_flags.append(&mut to_add);
new.idx_by_flag.remove(*uid, &rm_flags); new.idx_by_flag.remove(*uid, &rm_flags);
new.idx_by_flag.insert(*uid, &to_add); new.idx_by_flag.insert(*uid, &to_add);
// Register that email has been modified // Register that email has been modified
new.idx_by_modseq.insert(new.internalmodseq, *ident); new.idx_by_modseq.insert(new.internalmodseq, *ident);
*email_modseq = new.internalmodseq; *email_modseq = new.internalmodseq;
@ -448,7 +447,12 @@ mod tests {
{ {
let m = UniqueIdent([0x03; 24]); let m = UniqueIdent([0x03; 24]);
let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU64::new(1).unwrap(), f); let ev = UidIndexOp::MailAdd(
m,
NonZeroU32::new(1).unwrap(),
NonZeroU64::new(1).unwrap(),
f,
);
state = state.apply(&ev); state = state.apply(&ev);
} }

View file

@ -1,8 +1,8 @@
use anyhow::Context; use anyhow::Context;
mod common; mod common;
use crate::common::fragments::*;
use crate::common::constants::*; use crate::common::constants::*;
use crate::common::fragments::*;
fn main() { fn main() {
rfc3501_imap4rev1_base(); rfc3501_imap4rev1_base();
@ -11,6 +11,7 @@ fn main() {
rfc6851_imapext_move(); rfc6851_imapext_move();
rfc7888_imapext_literal(); rfc7888_imapext_literal();
rfc4551_imapext_condstore(); rfc4551_imapext_condstore();
rfc2177_imapext_idle();
println!("✅ SUCCESS 🌟🚀🥳🙏🥹"); println!("✅ SUCCESS 🌟🚀🥳🙏🥹");
} }
@ -21,29 +22,40 @@ fn rfc3501_imap4rev1_base() {
capability(imap_socket, Extension::None).context("check server capabilities")?; capability(imap_socket, Extension::None).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
// UNSUBSCRIBE IS NOT IMPLEMENTED YET let select_res =
//unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?; select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS")); assert!(select_res.contains("* 0 EXISTS"));
check(imap_socket).context("check must run")?; check(imap_socket).context("check must run")?;
status(imap_socket, Mailbox::Archive, StatusKind::UidNext).context("status of archive from inbox")?; status(imap_socket, Mailbox::Archive, StatusKind::UidNext)
.context("status of archive from inbox")?;
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None) let srv_msg = fetch(
.context("fetch rfc822 message, should be our first message")?; imap_socket,
Selection::FirstId,
FetchKind::Rfc822,
FetchMod::None,
)
.context("fetch rfc822 message, should be our first message")?;
let orig_email = std::str::from_utf8(EMAIL1)?; let orig_email = std::str::from_utf8(EMAIL1)?;
assert!(srv_msg.contains(orig_email)); assert!(srv_msg.contains(orig_email));
copy(imap_socket, Selection::FirstId, Mailbox::Archive) copy(imap_socket, Selection::FirstId, Mailbox::Archive)
.context("copy message to the archive mailbox")?; .context("copy message to the archive mailbox")?;
append_email(imap_socket, Email::Basic).context("insert email in INBOX")?; append_email(imap_socket, Email::Basic).context("insert email in INBOX")?;
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?; noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something"); search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something");
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None) store(
.context("should add delete flag to the email")?; imap_socket,
Selection::FirstId,
Flag::Deleted,
StoreAction::AddFlags,
StoreMod::None,
)
.context("should add delete flag to the email")?;
expunge(imap_socket).context("expunge emails")?; expunge(imap_socket).context("expunge emails")?;
rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts) rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts)
.context("Archive mailbox is renamed Drafts")?; .context("Archive mailbox is renamed Drafts")?;
@ -63,19 +75,32 @@ fn rfc3691_imapext_unselect() {
capability(imap_socket, Extension::Unselect).context("check server capabilities")?; capability(imap_socket, Extension::Unselect).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS")); assert!(select_res.contains("* 0 EXISTS"));
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None) store(
.context("add delete flags to the email")?; imap_socket,
Selection::FirstId,
Flag::Deleted,
StoreAction::AddFlags,
StoreMod::None,
)
.context("add delete flags to the email")?;
unselect(imap_socket) unselect(imap_socket)
.context("unselect inbox while preserving email with the \\Delete flag")?; .context("unselect inbox while preserving email with the \\Delete flag")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?;
assert!(select_res.contains("* 1 EXISTS")); assert!(select_res.contains("* 1 EXISTS"));
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None) let srv_msg = fetch(
.context("message is still present")?; imap_socket,
Selection::FirstId,
FetchKind::Rfc822,
FetchMod::None,
)
.context("message is still present")?;
let orig_email = std::str::from_utf8(EMAIL2)?; let orig_email = std::str::from_utf8(EMAIL2)?;
assert!(srv_msg.contains(orig_email)); assert!(srv_msg.contains(orig_email));
@ -111,7 +136,8 @@ fn rfc6851_imapext_move() {
capability(imap_socket, Extension::Move).context("check server capabilities")?; capability(imap_socket, Extension::Move).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS")); assert!(select_res.contains("* 0 EXISTS"));
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
@ -123,15 +149,17 @@ fn rfc6851_imapext_move() {
unselect(imap_socket) unselect(imap_socket)
.context("unselect inbox while preserving email with the \\Delete flag")?; .context("unselect inbox while preserving email with the \\Delete flag")?;
let select_res = select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?; let select_res =
select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?;
assert!(select_res.contains("* 1 EXISTS")); assert!(select_res.contains("* 1 EXISTS"));
let srv_msg = fetch( let srv_msg = fetch(
imap_socket, imap_socket,
Selection::FirstId, Selection::FirstId,
FetchKind::Rfc822, FetchKind::Rfc822,
FetchMod::None, FetchMod::None,
).context("check mail exists")?; )
.context("check mail exists")?;
let orig_email = std::str::from_utf8(EMAIL2)?; let orig_email = std::str::from_utf8(EMAIL2)?;
assert!(srv_msg.contains(orig_email)); assert!(srv_msg.contains(orig_email));
@ -166,7 +194,8 @@ fn rfc4551_imapext_condstore() {
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
// RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE // RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?;
// RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE // RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE
assert!(select_res.contains("[HIGHESTMODSEQ 1]")); assert!(select_res.contains("[HIGHESTMODSEQ 1]"));
@ -175,14 +204,25 @@ fn rfc4551_imapext_condstore() {
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?; noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
let store_res = store(imap_socket, Selection::All, Flag::Important, StoreAction::AddFlags, StoreMod::UnchangedSince(1))?; let store_res = store(
imap_socket,
Selection::All,
Flag::Important,
StoreAction::AddFlags,
StoreMod::UnchangedSince(1),
)?;
assert!(store_res.contains("[MODIFIED 2]")); assert!(store_res.contains("[MODIFIED 2]"));
assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))")); assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))"));
assert!(!store_res.contains("* 2 FETCH")); assert!(!store_res.contains("* 2 FETCH"));
assert_eq!(store_res.lines().count(), 2); assert_eq!(store_res.lines().count(), 2);
// RFC 3.1.4. FETCH and UID FETCH Commands // RFC 3.1.4. FETCH and UID FETCH Commands
let fetch_res = fetch(imap_socket, Selection::All, FetchKind::Rfc822Size, FetchMod::ChangedSince(2))?; let fetch_res = fetch(
imap_socket,
Selection::All,
FetchKind::Rfc822Size,
FetchMod::ChangedSince(2),
)?;
assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))")); assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))"));
assert!(!fetch_res.contains("* 2 FETCH")); assert!(!fetch_res.contains("* 2 FETCH"));
assert_eq!(store_res.lines().count(), 2); assert_eq!(store_res.lines().count(), 2);
@ -200,3 +240,25 @@ fn rfc4551_imapext_condstore() {
}) })
.expect("test fully run"); .expect("test fully run");
} }
fn rfc2177_imapext_idle() {
println!("🧪 rfc2177_imapext_idle");
common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| {
// Test setup
connect(imap_socket).context("server says hello")?;
capability(imap_socket, Extension::Idle).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?;
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
// Check that new messages from LMTP are correctly detected during idling
start_idle(imap_socket).context("can't start idling")?;
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
let srv_msg = stop_idle(imap_socket).context("stop idling")?;
assert!(srv_msg.contains("* 1 EXISTS"));
Ok(())
})
.expect("test fully run");
}

View file

@ -36,6 +36,7 @@ pub enum Extension {
Move, Move,
Condstore, Condstore,
LiteralPlus, LiteralPlus,
Idle,
} }
pub enum Enable { pub enum Enable {
@ -114,6 +115,7 @@ pub fn capability(imap: &mut TcpStream, ext: Extension) -> Result<()> {
Extension::Move => Some("MOVE"), Extension::Move => Some("MOVE"),
Extension::Condstore => Some("CONDSTORE"), Extension::Condstore => Some("CONDSTORE"),
Extension::LiteralPlus => Some("LITERAL+"), Extension::LiteralPlus => Some("LITERAL+"),
Extension::Idle => Some("IDLE"),
}; };
let mut buffer: [u8; 6000] = [0; 6000]; let mut buffer: [u8; 6000] = [0; 6000];
@ -286,7 +288,12 @@ pub fn noop_exists(imap: &mut TcpStream, must_exists: u32) -> Result<()> {
} }
} }
pub fn fetch(imap: &mut TcpStream, selection: Selection, kind: FetchKind, modifier: FetchMod) -> Result<String> { pub fn fetch(
imap: &mut TcpStream,
selection: Selection,
kind: FetchKind,
modifier: FetchMod,
) -> Result<String> {
let mut buffer: [u8; 65535] = [0; 65535]; let mut buffer: [u8; 65535] = [0; 65535];
let sel_str = match selection { let sel_str = match selection {
@ -363,11 +370,11 @@ pub fn search(imap: &mut TcpStream, sk: SearchKind) -> Result<String> {
} }
pub fn store( pub fn store(
imap: &mut TcpStream, imap: &mut TcpStream,
sel: Selection, sel: Selection,
flag: Flag, flag: Flag,
action: StoreAction, action: StoreAction,
modifier: StoreMod modifier: StoreMod,
) -> Result<String> { ) -> Result<String> {
let mut buffer: [u8; 6000] = [0; 6000]; let mut buffer: [u8; 6000] = [0; 6000];
@ -491,6 +498,22 @@ pub fn enable(imap: &mut TcpStream, ask: Enable, done: Option<Enable>) -> Result
Ok(()) Ok(())
} }
pub fn start_idle(imap: &mut TcpStream) -> Result<()> {
let mut buffer: [u8; 1500] = [0; 1500];
imap.write(&b"98 IDLE\r\n"[..])?;
let read = read_lines(imap, &mut buffer, None)?;
assert_eq!(read[0], b'+');
Ok(())
}
pub fn stop_idle(imap: &mut TcpStream) -> Result<String> {
let mut buffer: [u8; 16536] = [0; 16536];
imap.write(&b"DONE\r\n"[..])?;
let read = read_lines(imap, &mut buffer, Some(&b"98 OK"[..]))?;
let srv_msg = std::str::from_utf8(read)?;
Ok(srv_msg.to_string())
}
pub fn logout(imap: &mut TcpStream) -> Result<()> { pub fn logout(imap: &mut TcpStream) -> Result<()> {
imap.write(&b"99 logout\r\n"[..])?; imap.write(&b"99 logout\r\n"[..])?;
let mut buffer: [u8; 1500] = [0; 1500]; let mut buffer: [u8; 1500] = [0; 1500];