reformat code

This commit is contained in:
Quentin 2024-01-18 18:03:21 +01:00
parent 43b668531f
commit 2c5adc8f16
Signed by: quentin
GPG key ID: E9602264D639FF68
14 changed files with 295 additions and 180 deletions

View file

@ -442,7 +442,12 @@ impl K2vWatch {
let propagate_local_update = Notify::new(); let propagate_local_update = Notify::new();
let learnt_remote_update = Arc::new(Notify::new()); let learnt_remote_update = Arc::new(Notify::new());
let watch = Arc::new(K2vWatch { target, rx, propagate_local_update, learnt_remote_update }); let watch = Arc::new(K2vWatch {
target,
rx,
propagate_local_update,
learnt_remote_update,
});
tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx)); tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
@ -462,7 +467,8 @@ impl K2vWatch {
while let Some(this) = Weak::upgrade(&self_weak) { while let Some(this) = Weak::upgrade(&self_weak) {
tracing::debug!( tracing::debug!(
"bayou k2v watch bg loop iter ({}, {})", "bayou k2v watch bg loop iter ({}, {})",
this.target.uid.shard, this.target.uid.sort this.target.uid.shard,
this.target.uid.sort
); );
tokio::select!( tokio::select!(
// Needed to exit: will force a loop iteration every minutes, // Needed to exit: will force a loop iteration every minutes,

View file

@ -1,5 +1,5 @@
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
use imap_codec::imap_types::command::FetchModifier; use imap_codec::imap_types::command::FetchModifier;
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
/// Internal decisions based on fetched attributes /// Internal decisions based on fetched attributes
/// passed by the client /// passed by the client
@ -8,7 +8,11 @@ pub struct AttributesProxy {
pub attrs: Vec<MessageDataItemName<'static>>, pub attrs: Vec<MessageDataItemName<'static>>,
} }
impl AttributesProxy { impl AttributesProxy {
pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self { pub fn new(
attrs: &MacroOrMessageDataItemNames<'static>,
modifiers: &[FetchModifier],
is_uid_fetch: bool,
) -> Self {
// Expand macros // Expand macros
let mut fetch_attrs = match attrs { let mut fetch_attrs = match attrs {
MacroOrMessageDataItemNames::Macro(m) => { MacroOrMessageDataItemNames::Macro(m) => {
@ -44,14 +48,13 @@ impl AttributesProxy {
} }
pub fn is_enabling_condstore(&self) -> bool { pub fn is_enabling_condstore(&self) -> bool {
self.attrs.iter().any(|x| { self.attrs
matches!(x, MessageDataItemName::ModSeq) .iter()
}) .any(|x| matches!(x, MessageDataItemName::ModSeq))
} }
pub fn need_body(&self) -> bool { pub fn need_body(&self) -> bool {
self.attrs.iter().any(|x| { self.attrs.iter().any(|x| match x {
match x {
MessageDataItemName::Body MessageDataItemName::Body
| MessageDataItemName::Rfc822 | MessageDataItemName::Rfc822
| MessageDataItemName::Rfc822Text | MessageDataItemName::Rfc822Text
@ -69,7 +72,6 @@ impl AttributesProxy {
}, },
MessageDataItemName::BodyExt { .. } => true, MessageDataItemName::BodyExt { .. } => true,
_ => false, _ => false,
}
}) })
} }
} }

View file

@ -1,4 +1,4 @@
use imap_codec::imap_types::command::{FetchModifier, StoreModifier, SelectExamineModifier}; use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier};
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind}; use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
use imap_codec::imap_types::response::Capability; use imap_codec::imap_types::response::Capability;
@ -72,7 +72,6 @@ impl ClientStatus {
} }
} }
pub struct ClientCapability { pub struct ClientCapability {
pub condstore: ClientStatus, pub condstore: ClientStatus,
pub utf8kind: Option<Utf8Kind>, pub utf8kind: Option<Utf8Kind>,
@ -100,13 +99,19 @@ impl ClientCapability {
} }
pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) { pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) {
if mods.iter().any(|x| matches!(x, FetchModifier::ChangedSince(..))) { if mods
.iter()
.any(|x| matches!(x, FetchModifier::ChangedSince(..)))
{
self.enable_condstore() self.enable_condstore()
} }
} }
pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) { pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) {
if mods.iter().any(|x| matches!(x, StoreModifier::UnchangedSince(..))) { if mods
.iter()
.any(|x| matches!(x, StoreModifier::UnchangedSince(..)))
{
self.enable_condstore() self.enable_condstore()
} }
} }

View file

@ -1,5 +1,5 @@
use std::sync::Arc;
use std::num::NonZeroU64; use std::num::NonZeroU64;
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier}; use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
@ -11,12 +11,12 @@ use imap_codec::imap_types::response::{Code, CodeOther};
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated, MailboxName}; use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters}; use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response; use crate::imap::response::Response;
use crate::imap::attributes::AttributesProxy;
use crate::mail::user::User; use crate::mail::user::User;
pub struct SelectedContext<'a> { pub struct SelectedContext<'a> {
@ -50,7 +50,10 @@ pub async fn dispatch<'a>(
macro_or_item_names, macro_or_item_names,
modifiers, modifiers,
uid, uid,
} => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await, } => {
ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid)
.await
}
CommandBody::Search { CommandBody::Search {
charset, charset,
criteria, criteria,
@ -64,7 +67,10 @@ pub async fn dispatch<'a>(
flags, flags,
modifiers, modifiers,
uid, uid,
} => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await, } => {
ctx.store(sequence_set, kind, response, flags, modifiers, uid)
.await
}
CommandBody::Copy { CommandBody::Copy {
sequence_set, sequence_set,
mailbox, mailbox,
@ -80,12 +86,13 @@ pub async fn dispatch<'a>(
CommandBody::Unselect => ctx.unselect().await, CommandBody::Unselect => ctx.unselect().await,
// IDLE extension (rfc2177) // IDLE extension (rfc2177)
CommandBody::Idle => { CommandBody::Idle => Ok((
Ok(( Response::build()
Response::build().to_req(ctx.req).message("DUMMY command due to anti-pattern in the code").ok()?, .to_req(ctx.req)
.message("DUMMY command due to anti-pattern in the code")
.ok()?,
flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()), flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()),
)) )),
}
// In selected mode, we fallback to authenticated when needed // In selected mode, we fallback to authenticated when needed
_ => { _ => {
@ -148,10 +155,14 @@ impl<'a> SelectedContext<'a> {
modifiers.iter().for_each(|m| match m { modifiers.iter().for_each(|m| match m {
FetchModifier::ChangedSince(val) => { FetchModifier::ChangedSince(val) => {
changed_since = Some(*val); changed_since = Some(*val);
}, }
}); });
match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await { match self
.mailbox
.fetch(sequence_set, &ap, changed_since, uid)
.await
{
Ok(resp) => { Ok(resp) => {
// Capabilities enabling logic only on successful command // Capabilities enabling logic only on successful command
// (according to my understanding of the spec) // (according to my understanding of the spec)
@ -167,7 +178,7 @@ impl<'a> SelectedContext<'a> {
.ok()?, .ok()?,
flow::Transition::None, flow::Transition::None,
)) ))
}, }
Err(e) => Ok(( Err(e) => Ok((
Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
@ -214,7 +225,7 @@ impl<'a> SelectedContext<'a> {
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> { async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
if let Some(failed) = self.fail_read_only() { if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None)) return Ok((failed, flow::Transition::None));
} }
let tag = self.req.tag.clone(); let tag = self.req.tag.clone();
@ -240,14 +251,14 @@ impl<'a> SelectedContext<'a> {
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
if let Some(failed) = self.fail_read_only() { if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None)) return Ok((failed, flow::Transition::None));
} }
let mut unchanged_since: Option<NonZeroU64> = None; let mut unchanged_since: Option<NonZeroU64> = None;
modifiers.iter().for_each(|m| match m { modifiers.iter().for_each(|m| match m {
StoreModifier::UnchangedSince(val) => { StoreModifier::UnchangedSince(val) => {
unchanged_since = Some(*val); unchanged_since = Some(*val);
}, }
}); });
let (data, modified) = self let (data, modified) = self
@ -260,21 +271,26 @@ impl<'a> SelectedContext<'a> {
.message("STORE completed") .message("STORE completed")
.set_body(data); .set_body(data);
match modified[..] { match modified[..] {
[] => (), [] => (),
[_head, ..] => { [_head, ..] => {
let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::<Vec<_>>().join(",")); let modified_str = format!(
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes()))); "MODIFIED {}",
}, modified
.into_iter()
.map(|x| x.to_string())
.collect::<Vec<_>>()
.join(",")
);
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(
modified_str.into_bytes(),
)));
}
}; };
self.client_capabilities.store_modifiers_enable(modifiers); self.client_capabilities.store_modifiers_enable(modifiers);
Ok((ok_resp.ok()?, Ok((ok_resp.ok()?, flow::Transition::None))
flow::Transition::None,
))
} }
async fn copy( async fn copy(
@ -285,7 +301,7 @@ impl<'a> SelectedContext<'a> {
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
//@FIXME Could copy be valid in EXAMINE mode? //@FIXME Could copy be valid in EXAMINE mode?
if let Some(failed) = self.fail_read_only() { if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None)) return Ok((failed, flow::Transition::None));
} }
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
@ -341,7 +357,7 @@ impl<'a> SelectedContext<'a> {
uid: &bool, uid: &bool,
) -> Result<(Response<'static>, flow::Transition)> { ) -> Result<(Response<'static>, flow::Transition)> {
if let Some(failed) = self.fail_read_only() { if let Some(failed) = self.fail_read_only() {
return Ok((failed, flow::Transition::None)) return Ok((failed, flow::Transition::None));
} }
let name: &str = MailboxName(mailbox).try_into()?; let name: &str = MailboxName(mailbox).try_into()?;
@ -395,12 +411,13 @@ impl<'a> SelectedContext<'a> {
fn fail_read_only(&self) -> Option<Response<'static>> { fn fail_read_only(&self) -> Option<Response<'static>> {
match self.perm { match self.perm {
flow::MailboxPerm::ReadWrite => None, flow::MailboxPerm::ReadWrite => None,
flow::MailboxPerm::ReadOnly => { flow::MailboxPerm::ReadOnly => Some(
Some(Response::build() Response::build()
.to_req(self.req) .to_req(self.req)
.message("Write command are forbidden while exmining mailbox") .message("Write command are forbidden while exmining mailbox")
.no().unwrap()) .no()
}, .unwrap(),
),
} }
} }
} }

View file

@ -3,9 +3,9 @@ use std::fmt;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Notify; use tokio::sync::Notify;
use imap_codec::imap_types::core::Tag;
use crate::imap::mailbox_view::MailboxView; use crate::imap::mailbox_view::MailboxView;
use crate::mail::user::User; use crate::mail::user::User;
use imap_codec::imap_types::core::Tag;
#[derive(Debug)] #[derive(Debug)]
pub enum Error { pub enum Error {
@ -22,7 +22,13 @@ pub enum State {
NotAuthenticated, NotAuthenticated,
Authenticated(Arc<User>), Authenticated(Arc<User>),
Selected(Arc<User>, MailboxView, MailboxPerm), Selected(Arc<User>, MailboxView, MailboxPerm),
Idle(Arc<User>, MailboxView, MailboxPerm, Tag<'static>, Arc<Notify>), Idle(
Arc<User>,
MailboxView,
MailboxPerm,
Tag<'static>,
Arc<Notify>,
),
Logout, Logout,
} }
impl fmt::Display for State { impl fmt::Display for State {
@ -77,23 +83,18 @@ impl State {
let new_state = match (std::mem::replace(self, State::Logout), tr) { let new_state = match (std::mem::replace(self, State::Logout), tr) {
(s, Transition::None) => s, (s, Transition::None) => s,
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u), (State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
( (State::Authenticated(u) | State::Selected(u, _, _), Transition::Select(m, p)) => {
State::Authenticated(u) | State::Selected(u, _, _), State::Selected(u, m, p)
Transition::Select(m, p),
) => State::Selected(u, m, p),
(State::Selected(u, _, _) , Transition::Unselect) => {
State::Authenticated(u.clone())
} }
(State::Selected(u, _, _), Transition::Unselect) => State::Authenticated(u.clone()),
(State::Selected(u, m, p), Transition::Idle(t, s)) => { (State::Selected(u, m, p), Transition::Idle(t, s)) => {
State::Idle(u, m, p, t, Arc::new(s)) State::Idle(u, m, p, t, Arc::new(s))
}, }
(State::Idle(u, m, p, _, _), Transition::UnIdle) => { (State::Idle(u, m, p, _, _), Transition::UnIdle) => State::Selected(u, m, p),
State::Selected(u, m, p)
},
(_, Transition::Logout) => State::Logout, (_, Transition::Logout) => State::Logout,
(s, t) => { (s, t) => {
tracing::error!(state=%s, transition=%t, "forbidden transition"); tracing::error!(state=%s, transition=%t, "forbidden transition");
return Err(Error::ForbiddenTransition) return Err(Error::ForbiddenTransition);
} }
}; };
*self = new_state; *self = new_state;

View file

@ -1,6 +1,6 @@
use std::collections::HashSet;
use std::num::{NonZeroU32, NonZeroU64}; use std::num::{NonZeroU32, NonZeroU64};
use std::sync::Arc; use std::sync::Arc;
use std::collections::HashSet;
use anyhow::{anyhow, Error, Result}; use anyhow::{anyhow, Error, Result};
@ -13,11 +13,11 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
use imap_codec::imap_types::search::SearchKey; use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet; use imap_codec::imap_types::sequence::SequenceSet;
use crate::mail::unique_ident::UniqueIdent;
use crate::mail::mailbox::Mailbox; use crate::mail::mailbox::Mailbox;
use crate::mail::query::QueryScope; use crate::mail::query::QueryScope;
use crate::mail::snapshot::FrozenMailbox; use crate::mail::snapshot::FrozenMailbox;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq}; use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
use crate::mail::unique_ident::UniqueIdent;
use crate::imap::attributes::AttributesProxy; use crate::imap::attributes::AttributesProxy;
use crate::imap::flags; use crate::imap::flags;
@ -130,11 +130,9 @@ impl MailboxView {
let new_mail = new_snapshot.table.get(uuid); let new_mail = new_snapshot.table.get(uuid);
if old_mail.is_some() && old_mail != new_mail { if old_mail.is_some() && old_mail != new_mail {
if let Some((uid, modseq, flags)) = new_mail { if let Some((uid, modseq, flags)) = new_mail {
let mut items = vec![ let mut items = vec![MessageDataItem::Flags(
MessageDataItem::Flags(
flags.iter().filter_map(|f| flags::from_str(f)).collect(), flags.iter().filter_map(|f| flags::from_str(f)).collect(),
), )];
];
if params.with_uid { if params.with_uid {
items.push(MessageDataItem::Uid(*uid)); items.push(MessageDataItem::Uid(*uid));
@ -188,8 +186,8 @@ impl MailboxView {
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>(); let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let idx = self.index()?; let idx = self.index()?;
let (editable, in_conflict) = idx let (editable, in_conflict) =
.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?; idx.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
for mi in editable.iter() { for mi in editable.iter() {
match kind { match kind {
@ -215,17 +213,26 @@ impl MailboxView {
_ => in_conflict.into_iter().map(|midx| midx.i).collect(), _ => in_conflict.into_iter().map(|midx| midx.i).collect(),
}; };
let summary = self.update(UpdateParameters { let summary = self
.update(UpdateParameters {
with_uid: *is_uid_store, with_uid: *is_uid_store,
with_modseq: unchanged_since.is_some(), with_modseq: unchanged_since.is_some(),
silence, silence,
}).await?; })
.await?;
Ok((summary, conflict_id_or_uid)) Ok((summary, conflict_id_or_uid))
} }
pub async fn idle_sync(&mut self) -> Result<Vec<Body<'static>>> { pub async fn idle_sync(&mut self) -> Result<Vec<Body<'static>>> {
self.internal.mailbox.notify().await.upgrade().ok_or(anyhow!("test"))?.notified().await; self.internal
.mailbox
.notify()
.await
.upgrade()
.ok_or(anyhow!("test"))?
.notified()
.await;
self.internal.mailbox.opportunistic_sync().await?; self.internal.mailbox.opportunistic_sync().await?;
self.update(UpdateParameters::default()).await self.update(UpdateParameters::default()).await
} }
@ -300,10 +307,12 @@ impl MailboxView {
ret.push((mi.uid, dest_uid)); ret.push((mi.uid, dest_uid));
} }
let update = self.update(UpdateParameters { let update = self
.update(UpdateParameters {
with_uid: *is_uid_copy, with_uid: *is_uid_copy,
..UpdateParameters::default() ..UpdateParameters::default()
}).await?; })
.await?;
Ok((to_state.uidvalidity, ret, update)) Ok((to_state.uidvalidity, ret, update))
} }
@ -327,11 +336,7 @@ impl MailboxView {
}; };
tracing::debug!("Query scope {:?}", query_scope); tracing::debug!("Query scope {:?}", query_scope);
let idx = self.index()?; let idx = self.index()?;
let mail_idx_list = idx.fetch_changed_since( let mail_idx_list = idx.fetch_changed_since(sequence_set, changed_since, *is_uid_fetch)?;
sequence_set,
changed_since,
*is_uid_fetch
)?;
// [2/6] Fetch the emails // [2/6] Fetch the emails
let uuids = mail_idx_list let uuids = mail_idx_list
@ -420,12 +425,19 @@ impl MailboxView {
let maybe_modseq = match is_modseq { let maybe_modseq = match is_modseq {
true => { true => {
let final_selection = kept_idx.iter().chain(kept_query.iter()); let final_selection = kept_idx.iter().chain(kept_query.iter());
final_selection.map(|in_idx| in_idx.modseq).max().map(|r| NonZeroU64::try_from(r)).transpose()? final_selection
}, .map(|in_idx| in_idx.modseq)
.max()
.map(|r| NonZeroU64::try_from(r))
.transpose()?
}
_ => None, _ => None,
}; };
Ok((vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], is_modseq)) Ok((
vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
is_modseq,
))
} }
// ---- // ----
@ -470,7 +482,9 @@ impl MailboxView {
pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> { pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> {
Ok(Body::Status(Status::ok( Ok(Body::Status(Status::ok(
None, None,
Some(Code::Other(CodeOther::unvalidated(format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes()))), Some(Code::Other(CodeOther::unvalidated(
format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes(),
))),
"Highest", "Highest",
)?)) )?))
} }

View file

@ -15,23 +15,23 @@ mod session;
use std::net::SocketAddr; use std::net::SocketAddr;
use anyhow::{Result, bail}; use anyhow::{bail, Result};
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener; use tokio::net::TcpListener;
use tokio::sync::watch;
use tokio::sync::mpsc; use tokio::sync::mpsc;
use tokio::sync::watch;
use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
use imap_codec::imap_types::{core::Text, response::Greeting}; use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions}; use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
use imap_flow::stream::AnyStream; use imap_flow::stream::AnyStream;
use imap_codec::imap_types::response::{Code, Response, CommandContinuationRequest, Status};
use crate::imap::response::{Body, ResponseOrIdle};
use crate::imap::session::Instance;
use crate::imap::request::Request;
use crate::config::ImapConfig; use crate::config::ImapConfig;
use crate::imap::capability::ServerCapability; use crate::imap::capability::ServerCapability;
use crate::imap::request::Request;
use crate::imap::response::{Body, ResponseOrIdle};
use crate::imap::session::Instance;
use crate::login::ArcLoginProvider; use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL /// Server is a thin wrapper to register our Services in BàL
@ -97,10 +97,10 @@ impl Server {
} }
} }
use tokio::sync::mpsc::*;
use tokio_util::bytes::BytesMut;
use tokio::sync::Notify;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::mpsc::*;
use tokio::sync::Notify;
use tokio_util::bytes::BytesMut;
enum LoopMode { enum LoopMode {
Quit, Quit,
Interactive, Interactive,
@ -123,10 +123,10 @@ impl NetLoop {
Ok(nl) => { Ok(nl) => {
tracing::debug!(addr=?addr, "netloop successfully initialized"); tracing::debug!(addr=?addr, "netloop successfully initialized");
nl nl
}, }
Err(e) => { Err(e) => {
tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session"); tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
return return;
} }
}; };
@ -164,11 +164,20 @@ impl NetLoop {
tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx)); tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
// Return the object // Return the object
Ok(NetLoop { ctx, server, cmd_tx, resp_rx }) Ok(NetLoop {
ctx,
server,
cmd_tx,
resp_rx,
})
} }
/// Coms with the background session /// Coms with the background session
async fn session(ctx: ClientContext, mut cmd_rx: Receiver<Request>, resp_tx: UnboundedSender<ResponseOrIdle>) -> () { async fn session(
ctx: ClientContext,
mut cmd_rx: Receiver<Request>,
resp_tx: UnboundedSender<ResponseOrIdle>,
) -> () {
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities); let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
loop { loop {
let cmd = match cmd_rx.recv().await { let cmd = match cmd_rx.recv().await {
@ -200,7 +209,6 @@ impl NetLoop {
Ok(()) Ok(())
} }
async fn interactive_mode(&mut self) -> Result<LoopMode> { async fn interactive_mode(&mut self) -> Result<LoopMode> {
tokio::select! { tokio::select! {
// Managing imap_flow stuff // Managing imap_flow stuff

View file

@ -1,9 +1,9 @@
use std::sync::Arc;
use anyhow::Result; use anyhow::Result;
use tokio::sync::Notify;
use imap_codec::imap_types::command::Command; use imap_codec::imap_types::command::Command;
use imap_codec::imap_types::core::Tag; use imap_codec::imap_types::core::Tag;
use imap_codec::imap_types::response::{Code, Data, Status}; use imap_codec::imap_types::response::{Code, Data, Status};
use std::sync::Arc;
use tokio::sync::Notify;
#[derive(Debug)] #[derive(Debug)]
pub enum Body<'a> { pub enum Body<'a> {

View file

@ -2,7 +2,7 @@ use std::num::{NonZeroU32, NonZeroU64};
use anyhow::Result; use anyhow::Result;
use imap_codec::imap_types::core::NonEmptyVec; use imap_codec::imap_types::core::NonEmptyVec;
use imap_codec::imap_types::search::{SearchKey, MetadataItemSearch}; use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet}; use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::imap::index::MailIndex; use crate::imap::index::MailIndex;
@ -115,7 +115,10 @@ impl<'a> Criteria<'a> {
pub fn is_modseq(&self) -> bool { pub fn is_modseq(&self) -> bool {
use SearchKey::*; use SearchKey::*;
match self.0 { match self.0 {
And(and_list) => and_list.as_ref().iter().any(|child| Criteria(child).is_modseq()), And(and_list) => and_list
.as_ref()
.iter()
.any(|child| Criteria(child).is_modseq()),
Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(), Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(),
Not(child) => Criteria(child).is_modseq(), Not(child) => Criteria(child).is_modseq(),
ModSeq { .. } => true, ModSeq { .. } => true,
@ -187,7 +190,10 @@ impl<'a> Criteria<'a> {
// Sequence logic // Sequence logic
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(), maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(),
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, midx).into(), ModSeq {
metadata_item,
modseq,
} => is_keep_modseq(metadata_item, modseq, midx).into(),
// All the stuff we can't evaluate yet // All the stuff we can't evaluate yet
Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_) Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_)
@ -225,7 +231,10 @@ impl<'a> Criteria<'a> {
//@FIXME Reevaluating our previous logic... //@FIXME Reevaluating our previous logic...
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx), maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx),
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx), maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx),
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(), ModSeq {
metadata_item,
modseq,
} => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
// Filter on mail meta // Filter on mail meta
Before(search_naive) => match mail_view.stored_naive_date() { Before(search_naive) => match mail_view.stored_naive_date() {
@ -473,7 +482,11 @@ fn is_keep_seq(sk: &SearchKey, midx: &MailIndex) -> bool {
} }
} }
fn is_keep_modseq(filter: &Option<MetadataItemSearch>, modseq: &NonZeroU64, midx: &MailIndex) -> bool { fn is_keep_modseq(
filter: &Option<MetadataItemSearch>,
modseq: &NonZeroU64,
midx: &MailIndex,
) -> bool {
if filter.is_some() { if filter.is_some() {
tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet"); tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet");
} }

View file

@ -1,10 +1,10 @@
use anyhow::{Result, anyhow, bail};
use crate::imap::capability::{ClientCapability, ServerCapability}; use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anonymous, authenticated, selected}; use crate::imap::command::{anonymous, authenticated, selected};
use crate::imap::flow; use crate::imap::flow;
use crate::imap::request::Request; use crate::imap::request::Request;
use crate::imap::response::{Response, ResponseOrIdle}; use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider; use crate::login::ArcLoginProvider;
use anyhow::{anyhow, bail, Result};
use imap_codec::imap_types::command::Command; use imap_codec::imap_types::command::Command;
//----- //-----
@ -63,7 +63,6 @@ impl Instance {
} }
} }
pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle { pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle {
// Command behavior is modulated by the state. // Command behavior is modulated by the state.
// To prevent state error, we handle the same command in separate code paths. // To prevent state error, we handle the same command in separate code paths.

View file

@ -140,8 +140,7 @@ impl BayouState for UidIndex {
let bump_uid = new.internalseq.get() - uid.get(); let bump_uid = new.internalseq.get() - uid.get();
let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32; let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap();
.unwrap();
} }
// Assign the real uid of the email // Assign the real uid of the email
@ -179,10 +178,10 @@ impl BayouState for UidIndex {
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required // Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq { if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; let bump_modseq =
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
.unwrap();
} }
// Add flags to the source of trust and the cache // Add flags to the source of trust and the cache
@ -205,10 +204,10 @@ impl BayouState for UidIndex {
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required // Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq { if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; let bump_modseq =
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
.unwrap();
} }
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
@ -228,10 +227,10 @@ impl BayouState for UidIndex {
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) { if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
// Bump UIDValidity if required // Bump UIDValidity if required
if *candidate_modseq < new.internalmodseq { if *candidate_modseq < new.internalmodseq {
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32; let bump_modseq =
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
new.uidvalidity = new.uidvalidity =
NonZeroU32::new(new.uidvalidity.get() + bump_modseq) NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
.unwrap();
} }
// Remove flags from the source of trust and the cache // Remove flags from the source of trust and the cache
@ -448,7 +447,12 @@ mod tests {
{ {
let m = UniqueIdent([0x03; 24]); let m = UniqueIdent([0x03; 24]);
let f = vec!["\\Archive".to_string(), "\\Recent".to_string()]; let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU64::new(1).unwrap(), f); let ev = UidIndexOp::MailAdd(
m,
NonZeroU32::new(1).unwrap(),
NonZeroU64::new(1).unwrap(),
f,
);
state = state.apply(&ev); state = state.apply(&ev);
} }

View file

@ -1,8 +1,8 @@
use anyhow::Context; use anyhow::Context;
mod common; mod common;
use crate::common::fragments::*;
use crate::common::constants::*; use crate::common::constants::*;
use crate::common::fragments::*;
fn main() { fn main() {
rfc3501_imap4rev1_base(); rfc3501_imap4rev1_base();
@ -23,16 +23,23 @@ fn rfc3501_imap4rev1_base() {
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
// UNSUBSCRIBE IS NOT IMPLEMENTED YET // UNSUBSCRIBE IS NOT IMPLEMENTED YET
//unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?; //unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS")); assert!(select_res.contains("* 0 EXISTS"));
check(imap_socket).context("check must run")?; check(imap_socket).context("check must run")?;
status(imap_socket, Mailbox::Archive, StatusKind::UidNext).context("status of archive from inbox")?; status(imap_socket, Mailbox::Archive, StatusKind::UidNext)
.context("status of archive from inbox")?;
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None) let srv_msg = fetch(
imap_socket,
Selection::FirstId,
FetchKind::Rfc822,
FetchMod::None,
)
.context("fetch rfc822 message, should be our first message")?; .context("fetch rfc822 message, should be our first message")?;
let orig_email = std::str::from_utf8(EMAIL1)?; let orig_email = std::str::from_utf8(EMAIL1)?;
assert!(srv_msg.contains(orig_email)); assert!(srv_msg.contains(orig_email));
@ -42,7 +49,13 @@ fn rfc3501_imap4rev1_base() {
append_email(imap_socket, Email::Basic).context("insert email in INBOX")?; append_email(imap_socket, Email::Basic).context("insert email in INBOX")?;
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?; noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something"); search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something");
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None) store(
imap_socket,
Selection::FirstId,
Flag::Deleted,
StoreAction::AddFlags,
StoreMod::None,
)
.context("should add delete flag to the email")?; .context("should add delete flag to the email")?;
expunge(imap_socket).context("expunge emails")?; expunge(imap_socket).context("expunge emails")?;
rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts) rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts)
@ -63,18 +76,31 @@ fn rfc3691_imapext_unselect() {
capability(imap_socket, Extension::Unselect).context("check server capabilities")?; capability(imap_socket, Extension::Unselect).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS")); assert!(select_res.contains("* 0 EXISTS"));
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?; noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None) store(
imap_socket,
Selection::FirstId,
Flag::Deleted,
StoreAction::AddFlags,
StoreMod::None,
)
.context("add delete flags to the email")?; .context("add delete flags to the email")?;
unselect(imap_socket) unselect(imap_socket)
.context("unselect inbox while preserving email with the \\Delete flag")?; .context("unselect inbox while preserving email with the \\Delete flag")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?;
assert!(select_res.contains("* 1 EXISTS")); assert!(select_res.contains("* 1 EXISTS"));
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None) let srv_msg = fetch(
imap_socket,
Selection::FirstId,
FetchKind::Rfc822,
FetchMod::None,
)
.context("message is still present")?; .context("message is still present")?;
let orig_email = std::str::from_utf8(EMAIL2)?; let orig_email = std::str::from_utf8(EMAIL2)?;
assert!(srv_msg.contains(orig_email)); assert!(srv_msg.contains(orig_email));
@ -111,7 +137,8 @@ fn rfc6851_imapext_move() {
capability(imap_socket, Extension::Move).context("check server capabilities")?; capability(imap_socket, Extension::Move).context("check server capabilities")?;
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?; create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
assert!(select_res.contains("* 0 EXISTS")); assert!(select_res.contains("* 0 EXISTS"));
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?; lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
@ -123,7 +150,8 @@ fn rfc6851_imapext_move() {
unselect(imap_socket) unselect(imap_socket)
.context("unselect inbox while preserving email with the \\Delete flag")?; .context("unselect inbox while preserving email with the \\Delete flag")?;
let select_res = select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?; let select_res =
select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?;
assert!(select_res.contains("* 1 EXISTS")); assert!(select_res.contains("* 1 EXISTS"));
let srv_msg = fetch( let srv_msg = fetch(
@ -131,7 +159,8 @@ fn rfc6851_imapext_move() {
Selection::FirstId, Selection::FirstId,
FetchKind::Rfc822, FetchKind::Rfc822,
FetchMod::None, FetchMod::None,
).context("check mail exists")?; )
.context("check mail exists")?;
let orig_email = std::str::from_utf8(EMAIL2)?; let orig_email = std::str::from_utf8(EMAIL2)?;
assert!(srv_msg.contains(orig_email)); assert!(srv_msg.contains(orig_email));
@ -166,7 +195,8 @@ fn rfc4551_imapext_condstore() {
login(imap_socket, Account::Alice).context("login test")?; login(imap_socket, Account::Alice).context("login test")?;
// RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE // RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?; let select_res =
select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?;
// RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE // RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE
assert!(select_res.contains("[HIGHESTMODSEQ 1]")); assert!(select_res.contains("[HIGHESTMODSEQ 1]"));
@ -175,14 +205,25 @@ fn rfc4551_imapext_condstore() {
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?; lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?; noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
let store_res = store(imap_socket, Selection::All, Flag::Important, StoreAction::AddFlags, StoreMod::UnchangedSince(1))?; let store_res = store(
imap_socket,
Selection::All,
Flag::Important,
StoreAction::AddFlags,
StoreMod::UnchangedSince(1),
)?;
assert!(store_res.contains("[MODIFIED 2]")); assert!(store_res.contains("[MODIFIED 2]"));
assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))")); assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))"));
assert!(!store_res.contains("* 2 FETCH")); assert!(!store_res.contains("* 2 FETCH"));
assert_eq!(store_res.lines().count(), 2); assert_eq!(store_res.lines().count(), 2);
// RFC 3.1.4. FETCH and UID FETCH Commands // RFC 3.1.4. FETCH and UID FETCH Commands
let fetch_res = fetch(imap_socket, Selection::All, FetchKind::Rfc822Size, FetchMod::ChangedSince(2))?; let fetch_res = fetch(
imap_socket,
Selection::All,
FetchKind::Rfc822Size,
FetchMod::ChangedSince(2),
)?;
assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))")); assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))"));
assert!(!fetch_res.contains("* 2 FETCH")); assert!(!fetch_res.contains("* 2 FETCH"));
assert_eq!(store_res.lines().count(), 2); assert_eq!(store_res.lines().count(), 2);

View file

@ -286,7 +286,12 @@ pub fn noop_exists(imap: &mut TcpStream, must_exists: u32) -> Result<()> {
} }
} }
pub fn fetch(imap: &mut TcpStream, selection: Selection, kind: FetchKind, modifier: FetchMod) -> Result<String> { pub fn fetch(
imap: &mut TcpStream,
selection: Selection,
kind: FetchKind,
modifier: FetchMod,
) -> Result<String> {
let mut buffer: [u8; 65535] = [0; 65535]; let mut buffer: [u8; 65535] = [0; 65535];
let sel_str = match selection { let sel_str = match selection {
@ -367,7 +372,7 @@ pub fn store(
sel: Selection, sel: Selection,
flag: Flag, flag: Flag,
action: StoreAction, action: StoreAction,
modifier: StoreMod modifier: StoreMod,
) -> Result<String> { ) -> Result<String> {
let mut buffer: [u8; 6000] = [0; 6000]; let mut buffer: [u8; 6000] = [0; 6000];