Merge pull request 'Implement IDLE' (#72) from feat/idle into main
Reviewed-on: #72
This commit is contained in:
commit
0f227e44e4
19 changed files with 732 additions and 440 deletions
2
Cargo.lock
generated
2
Cargo.lock
generated
|
@ -1822,7 +1822,7 @@ dependencies = [
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "imap-flow"
|
name = "imap-flow"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#0f548a2070aace09f9f9a0b6ef221efefb8b110b"
|
source = "git+https://github.com/superboum/imap-flow.git?branch=custom/aerogramme#60ff9e082ccfcd10a042b616d8038a578fa0c8ff"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"bounded-static",
|
"bounded-static",
|
||||||
"bytes",
|
"bytes",
|
||||||
|
|
80
src/bayou.rs
80
src/bayou.rs
|
@ -2,7 +2,7 @@ use std::sync::{Arc, Weak};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use anyhow::{anyhow, bail, Result};
|
use anyhow::{anyhow, bail, Result};
|
||||||
use log::{debug, error, info};
|
use log::error;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::sync::{watch, Notify};
|
use tokio::sync::{watch, Notify};
|
||||||
|
@ -84,21 +84,21 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// 1. List checkpoints
|
// 1. List checkpoints
|
||||||
let checkpoints = self.list_checkpoints().await?;
|
let checkpoints = self.list_checkpoints().await?;
|
||||||
debug!("(sync) listed checkpoints: {:?}", checkpoints);
|
tracing::debug!("(sync) listed checkpoints: {:?}", checkpoints);
|
||||||
|
|
||||||
// 2. Load last checkpoint if different from currently used one
|
// 2. Load last checkpoint if different from currently used one
|
||||||
let checkpoint = if let Some((ts, key)) = checkpoints.last() {
|
let checkpoint = if let Some((ts, key)) = checkpoints.last() {
|
||||||
if *ts == self.checkpoint.0 {
|
if *ts == self.checkpoint.0 {
|
||||||
(*ts, None)
|
(*ts, None)
|
||||||
} else {
|
} else {
|
||||||
debug!("(sync) loading checkpoint: {}", key);
|
tracing::debug!("(sync) loading checkpoint: {}", key);
|
||||||
|
|
||||||
let buf = self
|
let buf = self
|
||||||
.storage
|
.storage
|
||||||
.blob_fetch(&storage::BlobRef(key.to_string()))
|
.blob_fetch(&storage::BlobRef(key.to_string()))
|
||||||
.await?
|
.await?
|
||||||
.value;
|
.value;
|
||||||
debug!("(sync) checkpoint body length: {}", buf.len());
|
tracing::debug!("(sync) checkpoint body length: {}", buf.len());
|
||||||
|
|
||||||
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
let ck = open_deserialize::<S>(&buf, &self.key)?;
|
||||||
(*ts, Some(ck))
|
(*ts, Some(ck))
|
||||||
|
@ -112,7 +112,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(ck) = checkpoint.1 {
|
if let Some(ck) = checkpoint.1 {
|
||||||
debug!(
|
tracing::debug!(
|
||||||
"(sync) updating checkpoint to loaded state at {:?}",
|
"(sync) updating checkpoint to loaded state at {:?}",
|
||||||
checkpoint.0
|
checkpoint.0
|
||||||
);
|
);
|
||||||
|
@ -127,7 +127,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// 3. List all operations starting from checkpoint
|
// 3. List all operations starting from checkpoint
|
||||||
let ts_ser = self.checkpoint.0.to_string();
|
let ts_ser = self.checkpoint.0.to_string();
|
||||||
debug!("(sync) looking up operations starting at {}", ts_ser);
|
tracing::debug!("(sync) looking up operations starting at {}", ts_ser);
|
||||||
let ops_map = self
|
let ops_map = self
|
||||||
.storage
|
.storage
|
||||||
.row_fetch(&storage::Selector::Range {
|
.row_fetch(&storage::Selector::Range {
|
||||||
|
@ -161,7 +161,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
ops.sort_by_key(|(ts, _)| *ts);
|
ops.sort_by_key(|(ts, _)| *ts);
|
||||||
debug!("(sync) {} operations", ops.len());
|
tracing::debug!("(sync) {} operations", ops.len());
|
||||||
|
|
||||||
if ops.len() < self.history.len() {
|
if ops.len() < self.history.len() {
|
||||||
bail!("Some operations have disappeared from storage!");
|
bail!("Some operations have disappeared from storage!");
|
||||||
|
@ -238,12 +238,16 @@ impl<S: BayouState> Bayou<S> {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn notifier(&self) -> std::sync::Weak<Notify> {
|
||||||
|
Arc::downgrade(&self.watch.learnt_remote_update)
|
||||||
|
}
|
||||||
|
|
||||||
/// Applies a new operation on the state. Once this function returns,
|
/// Applies a new operation on the state. Once this function returns,
|
||||||
/// the operation has been safely persisted to storage backend.
|
/// the operation has been safely persisted to storage backend.
|
||||||
/// Make sure to call `.opportunistic_sync()` before doing this,
|
/// Make sure to call `.opportunistic_sync()` before doing this,
|
||||||
/// and even before calculating the `op` argument given here.
|
/// and even before calculating the `op` argument given here.
|
||||||
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
pub async fn push(&mut self, op: S::Op) -> Result<()> {
|
||||||
debug!("(push) add operation: {:?}", op);
|
tracing::debug!("(push) add operation: {:?}", op);
|
||||||
|
|
||||||
let ts = Timestamp::after(
|
let ts = Timestamp::after(
|
||||||
self.history
|
self.history
|
||||||
|
@ -257,7 +261,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
seal_serialize(&op, &self.key)?,
|
seal_serialize(&op, &self.key)?,
|
||||||
);
|
);
|
||||||
self.storage.row_insert(vec![row_val]).await?;
|
self.storage.row_insert(vec![row_val]).await?;
|
||||||
self.watch.notify.notify_one();
|
self.watch.propagate_local_update.notify_one();
|
||||||
|
|
||||||
let new_state = self.state().apply(&op);
|
let new_state = self.state().apply(&op);
|
||||||
self.history.push((ts, op, Some(new_state)));
|
self.history.push((ts, op, Some(new_state)));
|
||||||
|
@ -305,18 +309,18 @@ impl<S: BayouState> Bayou<S> {
|
||||||
{
|
{
|
||||||
Some(i) => i,
|
Some(i) => i,
|
||||||
None => {
|
None => {
|
||||||
debug!("(cp) Oldest operation is too recent to trigger checkpoint");
|
tracing::debug!("(cp) Oldest operation is too recent to trigger checkpoint");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
if i_cp < CHECKPOINT_MIN_OPS {
|
if i_cp < CHECKPOINT_MIN_OPS {
|
||||||
debug!("(cp) Not enough old operations to trigger checkpoint");
|
tracing::debug!("(cp) Not enough old operations to trigger checkpoint");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
|
||||||
let ts_cp = self.history[i_cp].0;
|
let ts_cp = self.history[i_cp].0;
|
||||||
debug!(
|
tracing::debug!(
|
||||||
"(cp) we could checkpoint at time {} (index {} in history)",
|
"(cp) we could checkpoint at time {} (index {} in history)",
|
||||||
ts_cp.to_string(),
|
ts_cp.to_string(),
|
||||||
i_cp
|
i_cp
|
||||||
|
@ -324,13 +328,13 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Check existing checkpoints: if last one is too recent, don't checkpoint again.
|
// Check existing checkpoints: if last one is too recent, don't checkpoint again.
|
||||||
let existing_checkpoints = self.list_checkpoints().await?;
|
let existing_checkpoints = self.list_checkpoints().await?;
|
||||||
debug!("(cp) listed checkpoints: {:?}", existing_checkpoints);
|
tracing::debug!("(cp) listed checkpoints: {:?}", existing_checkpoints);
|
||||||
|
|
||||||
if let Some(last_cp) = existing_checkpoints.last() {
|
if let Some(last_cp) = existing_checkpoints.last() {
|
||||||
if (ts_cp.msec as i128 - last_cp.0.msec as i128)
|
if (ts_cp.msec as i128 - last_cp.0.msec as i128)
|
||||||
< CHECKPOINT_INTERVAL.as_millis() as i128
|
< CHECKPOINT_INTERVAL.as_millis() as i128
|
||||||
{
|
{
|
||||||
debug!(
|
tracing::debug!(
|
||||||
"(cp) last checkpoint is too recent: {}, not checkpointing",
|
"(cp) last checkpoint is too recent: {}, not checkpointing",
|
||||||
last_cp.0.to_string()
|
last_cp.0.to_string()
|
||||||
);
|
);
|
||||||
|
@ -338,7 +342,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
|
tracing::debug!("(cp) saving checkpoint at {}", ts_cp.to_string());
|
||||||
|
|
||||||
// Calculate state at time of checkpoint
|
// Calculate state at time of checkpoint
|
||||||
let mut last_known_state = (0, &self.checkpoint.1);
|
let mut last_known_state = (0, &self.checkpoint.1);
|
||||||
|
@ -354,7 +358,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Serialize and save checkpoint
|
// Serialize and save checkpoint
|
||||||
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
|
let cryptoblob = seal_serialize(&state_cp, &self.key)?;
|
||||||
debug!("(cp) checkpoint body length: {}", cryptoblob.len());
|
tracing::debug!("(cp) checkpoint body length: {}", cryptoblob.len());
|
||||||
|
|
||||||
let blob_val = storage::BlobVal::new(
|
let blob_val = storage::BlobVal::new(
|
||||||
storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
|
storage::BlobRef(format!("{}/checkpoint/{}", self.path, ts_cp.to_string())),
|
||||||
|
@ -369,7 +373,7 @@ impl<S: BayouState> Bayou<S> {
|
||||||
|
|
||||||
// Delete blobs
|
// Delete blobs
|
||||||
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
|
for (_ts, key) in existing_checkpoints[..last_to_keep].iter() {
|
||||||
debug!("(cp) drop old checkpoint {}", key);
|
tracing::debug!("(cp) drop old checkpoint {}", key);
|
||||||
self.storage
|
self.storage
|
||||||
.blob_rm(&storage::BlobRef(key.to_string()))
|
.blob_rm(&storage::BlobRef(key.to_string()))
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -423,7 +427,8 @@ impl<S: BayouState> Bayou<S> {
|
||||||
struct K2vWatch {
|
struct K2vWatch {
|
||||||
target: storage::RowRef,
|
target: storage::RowRef,
|
||||||
rx: watch::Receiver<storage::RowRef>,
|
rx: watch::Receiver<storage::RowRef>,
|
||||||
notify: Notify,
|
propagate_local_update: Notify,
|
||||||
|
learnt_remote_update: Arc<Notify>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl K2vWatch {
|
impl K2vWatch {
|
||||||
|
@ -434,9 +439,15 @@ impl K2vWatch {
|
||||||
let storage = creds.storage.build().await?;
|
let storage = creds.storage.build().await?;
|
||||||
|
|
||||||
let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
|
let (tx, rx) = watch::channel::<storage::RowRef>(target.clone());
|
||||||
let notify = Notify::new();
|
let propagate_local_update = Notify::new();
|
||||||
|
let learnt_remote_update = Arc::new(Notify::new());
|
||||||
|
|
||||||
let watch = Arc::new(K2vWatch { target, rx, notify });
|
let watch = Arc::new(K2vWatch {
|
||||||
|
target,
|
||||||
|
rx,
|
||||||
|
propagate_local_update,
|
||||||
|
learnt_remote_update,
|
||||||
|
});
|
||||||
|
|
||||||
tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
|
tokio::spawn(Self::background_task(Arc::downgrade(&watch), storage, tx));
|
||||||
|
|
||||||
|
@ -448,18 +459,24 @@ impl K2vWatch {
|
||||||
storage: storage::Store,
|
storage: storage::Store,
|
||||||
tx: watch::Sender<storage::RowRef>,
|
tx: watch::Sender<storage::RowRef>,
|
||||||
) {
|
) {
|
||||||
let mut row = match Weak::upgrade(&self_weak) {
|
let (mut row, remote_update) = match Weak::upgrade(&self_weak) {
|
||||||
Some(this) => this.target.clone(),
|
Some(this) => (this.target.clone(), this.learnt_remote_update.clone()),
|
||||||
None => return,
|
None => return,
|
||||||
};
|
};
|
||||||
|
|
||||||
while let Some(this) = Weak::upgrade(&self_weak) {
|
while let Some(this) = Weak::upgrade(&self_weak) {
|
||||||
debug!(
|
tracing::debug!(
|
||||||
"bayou k2v watch bg loop iter ({}, {})",
|
"bayou k2v watch bg loop iter ({}, {})",
|
||||||
this.target.uid.shard, this.target.uid.sort
|
this.target.uid.shard,
|
||||||
|
this.target.uid.sort
|
||||||
);
|
);
|
||||||
tokio::select!(
|
tokio::select!(
|
||||||
|
// Needed to exit: will force a loop iteration every minutes,
|
||||||
|
// that will stop the loop if other Arc references have been dropped
|
||||||
|
// and free resources. Otherwise we would be blocked waiting forever...
|
||||||
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
|
_ = tokio::time::sleep(Duration::from_secs(60)) => continue,
|
||||||
|
|
||||||
|
// Watch if another instance has modified the log
|
||||||
update = storage.row_poll(&row) => {
|
update = storage.row_poll(&row) => {
|
||||||
match update {
|
match update {
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
|
@ -468,23 +485,30 @@ impl K2vWatch {
|
||||||
}
|
}
|
||||||
Ok(new_value) => {
|
Ok(new_value) => {
|
||||||
row = new_value.row_ref;
|
row = new_value.row_ref;
|
||||||
if tx.send(row.clone()).is_err() {
|
if let Err(e) = tx.send(row.clone()) {
|
||||||
|
tracing::warn!(err=?e, "(watch) can't record the new log ref");
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
tracing::debug!(row=?row, "(watch) learnt remote update");
|
||||||
|
this.learnt_remote_update.notify_waiters();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
_ = this.notify.notified() => {
|
|
||||||
|
// It appears we have modified the log, informing other people
|
||||||
|
_ = this.propagate_local_update.notified() => {
|
||||||
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
|
let rand = u128::to_be_bytes(thread_rng().gen()).to_vec();
|
||||||
let row_val = storage::RowVal::new(row.clone(), rand);
|
let row_val = storage::RowVal::new(row.clone(), rand);
|
||||||
if let Err(e) = storage.row_insert(vec![row_val]).await
|
if let Err(e) = storage.row_insert(vec![row_val]).await
|
||||||
{
|
{
|
||||||
error!("Error in bayou k2v watch updater loop: {}", e);
|
tracing::error!("Error in bayou k2v watch updater loop: {}", e);
|
||||||
tokio::time::sleep(Duration::from_secs(30)).await;
|
tokio::time::sleep(Duration::from_secs(30)).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
}
|
}
|
||||||
info!("bayou k2v watch bg loop exiting");
|
// unblock listeners
|
||||||
|
remote_update.notify_waiters();
|
||||||
|
tracing::info!("bayou k2v watch bg loop exiting");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
|
|
||||||
use imap_codec::imap_types::command::FetchModifier;
|
use imap_codec::imap_types::command::FetchModifier;
|
||||||
|
use imap_codec::imap_types::fetch::{MacroOrMessageDataItemNames, MessageDataItemName, Section};
|
||||||
|
|
||||||
/// Internal decisions based on fetched attributes
|
/// Internal decisions based on fetched attributes
|
||||||
/// passed by the client
|
/// passed by the client
|
||||||
|
@ -8,7 +8,11 @@ pub struct AttributesProxy {
|
||||||
pub attrs: Vec<MessageDataItemName<'static>>,
|
pub attrs: Vec<MessageDataItemName<'static>>,
|
||||||
}
|
}
|
||||||
impl AttributesProxy {
|
impl AttributesProxy {
|
||||||
pub fn new(attrs: &MacroOrMessageDataItemNames<'static>, modifiers: &[FetchModifier], is_uid_fetch: bool) -> Self {
|
pub fn new(
|
||||||
|
attrs: &MacroOrMessageDataItemNames<'static>,
|
||||||
|
modifiers: &[FetchModifier],
|
||||||
|
is_uid_fetch: bool,
|
||||||
|
) -> Self {
|
||||||
// Expand macros
|
// Expand macros
|
||||||
let mut fetch_attrs = match attrs {
|
let mut fetch_attrs = match attrs {
|
||||||
MacroOrMessageDataItemNames::Macro(m) => {
|
MacroOrMessageDataItemNames::Macro(m) => {
|
||||||
|
@ -44,14 +48,13 @@ impl AttributesProxy {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn is_enabling_condstore(&self) -> bool {
|
pub fn is_enabling_condstore(&self) -> bool {
|
||||||
self.attrs.iter().any(|x| {
|
self.attrs
|
||||||
matches!(x, MessageDataItemName::ModSeq)
|
.iter()
|
||||||
})
|
.any(|x| matches!(x, MessageDataItemName::ModSeq))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn need_body(&self) -> bool {
|
pub fn need_body(&self) -> bool {
|
||||||
self.attrs.iter().any(|x| {
|
self.attrs.iter().any(|x| match x {
|
||||||
match x {
|
|
||||||
MessageDataItemName::Body
|
MessageDataItemName::Body
|
||||||
| MessageDataItemName::Rfc822
|
| MessageDataItemName::Rfc822
|
||||||
| MessageDataItemName::Rfc822Text
|
| MessageDataItemName::Rfc822Text
|
||||||
|
@ -69,7 +72,6 @@ impl AttributesProxy {
|
||||||
},
|
},
|
||||||
MessageDataItemName::BodyExt { .. } => true,
|
MessageDataItemName::BodyExt { .. } => true,
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
use imap_codec::imap_types::command::{FetchModifier, StoreModifier, SelectExamineModifier};
|
use imap_codec::imap_types::command::{FetchModifier, SelectExamineModifier, StoreModifier};
|
||||||
use imap_codec::imap_types::core::NonEmptyVec;
|
use imap_codec::imap_types::core::NonEmptyVec;
|
||||||
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
|
use imap_codec::imap_types::extensions::enable::{CapabilityEnable, Utf8Kind};
|
||||||
use imap_codec::imap_types::response::Capability;
|
use imap_codec::imap_types::response::Capability;
|
||||||
|
@ -30,6 +30,7 @@ impl Default for ServerCapability {
|
||||||
Capability::Enable,
|
Capability::Enable,
|
||||||
Capability::Move,
|
Capability::Move,
|
||||||
Capability::LiteralPlus,
|
Capability::LiteralPlus,
|
||||||
|
Capability::Idle,
|
||||||
capability_unselect(),
|
capability_unselect(),
|
||||||
capability_condstore(),
|
capability_condstore(),
|
||||||
//capability_qresync(),
|
//capability_qresync(),
|
||||||
|
@ -72,7 +73,6 @@ impl ClientStatus {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
pub struct ClientCapability {
|
pub struct ClientCapability {
|
||||||
pub condstore: ClientStatus,
|
pub condstore: ClientStatus,
|
||||||
pub utf8kind: Option<Utf8Kind>,
|
pub utf8kind: Option<Utf8Kind>,
|
||||||
|
@ -100,13 +100,19 @@ impl ClientCapability {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) {
|
pub fn fetch_modifiers_enable(&mut self, mods: &[FetchModifier]) {
|
||||||
if mods.iter().any(|x| matches!(x, FetchModifier::ChangedSince(..))) {
|
if mods
|
||||||
|
.iter()
|
||||||
|
.any(|x| matches!(x, FetchModifier::ChangedSince(..)))
|
||||||
|
{
|
||||||
self.enable_condstore()
|
self.enable_condstore()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) {
|
pub fn store_modifiers_enable(&mut self, mods: &[StoreModifier]) {
|
||||||
if mods.iter().any(|x| matches!(x, StoreModifier::UnchangedSince(..))) {
|
if mods
|
||||||
|
.iter()
|
||||||
|
.any(|x| matches!(x, StoreModifier::UnchangedSince(..)))
|
||||||
|
{
|
||||||
self.enable_condstore()
|
self.enable_condstore()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -453,7 +453,7 @@ impl<'a> AuthenticatedContext<'a> {
|
||||||
.code(Code::ReadWrite)
|
.code(Code::ReadWrite)
|
||||||
.set_body(data)
|
.set_body(data)
|
||||||
.ok()?,
|
.ok()?,
|
||||||
flow::Transition::Select(mb),
|
flow::Transition::Select(mb, flow::MailboxPerm::ReadWrite),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,7 +491,7 @@ impl<'a> AuthenticatedContext<'a> {
|
||||||
.code(Code::ReadOnly)
|
.code(Code::ReadOnly)
|
||||||
.set_body(data)
|
.set_body(data)
|
||||||
.ok()?,
|
.ok()?,
|
||||||
flow::Transition::Examine(mb),
|
flow::Transition::Select(mb, flow::MailboxPerm::ReadOnly),
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,164 +0,0 @@
|
||||||
use std::sync::Arc;
|
|
||||||
use std::num::NonZeroU64;
|
|
||||||
|
|
||||||
use anyhow::Result;
|
|
||||||
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier};
|
|
||||||
use imap_codec::imap_types::core::Charset;
|
|
||||||
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
|
|
||||||
use imap_codec::imap_types::search::SearchKey;
|
|
||||||
use imap_codec::imap_types::sequence::SequenceSet;
|
|
||||||
|
|
||||||
use crate::imap::attributes::AttributesProxy;
|
|
||||||
use crate::imap::capability::{ClientCapability, ServerCapability};
|
|
||||||
use crate::imap::command::{anystate, authenticated};
|
|
||||||
use crate::imap::flow;
|
|
||||||
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
|
|
||||||
use crate::imap::response::Response;
|
|
||||||
use crate::mail::user::User;
|
|
||||||
|
|
||||||
pub struct ExaminedContext<'a> {
|
|
||||||
pub req: &'a Command<'static>,
|
|
||||||
pub user: &'a Arc<User>,
|
|
||||||
pub mailbox: &'a mut MailboxView,
|
|
||||||
pub server_capabilities: &'a ServerCapability,
|
|
||||||
pub client_capabilities: &'a mut ClientCapability,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn dispatch(ctx: ExaminedContext<'_>) -> Result<(Response<'static>, flow::Transition)> {
|
|
||||||
match &ctx.req.body {
|
|
||||||
// Any State
|
|
||||||
// noop is specific to this state
|
|
||||||
CommandBody::Capability => {
|
|
||||||
anystate::capability(ctx.req.tag.clone(), ctx.server_capabilities)
|
|
||||||
}
|
|
||||||
CommandBody::Logout => anystate::logout(),
|
|
||||||
|
|
||||||
// Specific to the EXAMINE state (specialization of the SELECTED state)
|
|
||||||
// ~3 commands -> close, fetch, search + NOOP
|
|
||||||
CommandBody::Close => ctx.close("CLOSE").await,
|
|
||||||
CommandBody::Fetch {
|
|
||||||
sequence_set,
|
|
||||||
macro_or_item_names,
|
|
||||||
modifiers,
|
|
||||||
uid,
|
|
||||||
} => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
|
|
||||||
CommandBody::Search {
|
|
||||||
charset,
|
|
||||||
criteria,
|
|
||||||
uid,
|
|
||||||
} => ctx.search(charset, criteria, uid).await,
|
|
||||||
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
|
|
||||||
CommandBody::Expunge { .. } | CommandBody::Store { .. } => Ok((
|
|
||||||
Response::build()
|
|
||||||
.to_req(ctx.req)
|
|
||||||
.message("Forbidden command: can't write in read-only mode (EXAMINE)")
|
|
||||||
.no()?,
|
|
||||||
flow::Transition::None,
|
|
||||||
)),
|
|
||||||
|
|
||||||
// UNSELECT extension (rfc3691)
|
|
||||||
CommandBody::Unselect => ctx.close("UNSELECT").await,
|
|
||||||
|
|
||||||
// In examined mode, we fallback to authenticated when needed
|
|
||||||
_ => {
|
|
||||||
authenticated::dispatch(authenticated::AuthenticatedContext {
|
|
||||||
req: ctx.req,
|
|
||||||
server_capabilities: ctx.server_capabilities,
|
|
||||||
client_capabilities: ctx.client_capabilities,
|
|
||||||
user: ctx.user,
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// --- PRIVATE ---
|
|
||||||
|
|
||||||
impl<'a> ExaminedContext<'a> {
|
|
||||||
/// CLOSE in examined state is not the same as in selected state
|
|
||||||
/// (in selected state it also does an EXPUNGE, here it doesn't)
|
|
||||||
async fn close(self, kind: &str) -> Result<(Response<'static>, flow::Transition)> {
|
|
||||||
Ok((
|
|
||||||
Response::build()
|
|
||||||
.to_req(self.req)
|
|
||||||
.message(format!("{} completed", kind))
|
|
||||||
.ok()?,
|
|
||||||
flow::Transition::Unselect,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn fetch(
|
|
||||||
self,
|
|
||||||
sequence_set: &SequenceSet,
|
|
||||||
attributes: &'a MacroOrMessageDataItemNames<'static>,
|
|
||||||
modifiers: &[FetchModifier],
|
|
||||||
uid: &bool,
|
|
||||||
) -> Result<(Response<'static>, flow::Transition)> {
|
|
||||||
let ap = AttributesProxy::new(attributes, modifiers, *uid);
|
|
||||||
let mut changed_since: Option<NonZeroU64> = None;
|
|
||||||
modifiers.iter().for_each(|m| match m {
|
|
||||||
FetchModifier::ChangedSince(val) => {
|
|
||||||
changed_since = Some(*val);
|
|
||||||
},
|
|
||||||
});
|
|
||||||
|
|
||||||
match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
|
|
||||||
Ok(resp) => {
|
|
||||||
// Capabilities enabling logic only on successful command
|
|
||||||
// (according to my understanding of the spec)
|
|
||||||
self.client_capabilities.attributes_enable(&ap);
|
|
||||||
self.client_capabilities.fetch_modifiers_enable(modifiers);
|
|
||||||
|
|
||||||
Ok((
|
|
||||||
Response::build()
|
|
||||||
.to_req(self.req)
|
|
||||||
.message("FETCH completed")
|
|
||||||
.set_body(resp)
|
|
||||||
.ok()?,
|
|
||||||
flow::Transition::None,
|
|
||||||
))
|
|
||||||
},
|
|
||||||
Err(e) => Ok((
|
|
||||||
Response::build()
|
|
||||||
.to_req(self.req)
|
|
||||||
.message(e.to_string())
|
|
||||||
.no()?,
|
|
||||||
flow::Transition::None,
|
|
||||||
)),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn search(
|
|
||||||
self,
|
|
||||||
charset: &Option<Charset<'a>>,
|
|
||||||
criteria: &SearchKey<'a>,
|
|
||||||
uid: &bool,
|
|
||||||
) -> Result<(Response<'static>, flow::Transition)> {
|
|
||||||
let (found, enable_condstore) = self.mailbox.search(charset, criteria, *uid).await?;
|
|
||||||
if enable_condstore {
|
|
||||||
self.client_capabilities.enable_condstore();
|
|
||||||
}
|
|
||||||
Ok((
|
|
||||||
Response::build()
|
|
||||||
.to_req(self.req)
|
|
||||||
.set_body(found)
|
|
||||||
.message("SEARCH completed")
|
|
||||||
.ok()?,
|
|
||||||
flow::Transition::None,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn noop(self) -> Result<(Response<'static>, flow::Transition)> {
|
|
||||||
self.mailbox.internal.mailbox.force_sync().await?;
|
|
||||||
|
|
||||||
let updates = self.mailbox.update(UpdateParameters::default()).await?;
|
|
||||||
Ok((
|
|
||||||
Response::build()
|
|
||||||
.to_req(self.req)
|
|
||||||
.message("NOOP completed.")
|
|
||||||
.set_body(updates)
|
|
||||||
.ok()?,
|
|
||||||
flow::Transition::None,
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -1,7 +1,6 @@
|
||||||
pub mod anonymous;
|
pub mod anonymous;
|
||||||
pub mod anystate;
|
pub mod anystate;
|
||||||
pub mod authenticated;
|
pub mod authenticated;
|
||||||
pub mod examined;
|
|
||||||
pub mod selected;
|
pub mod selected;
|
||||||
|
|
||||||
use crate::mail::user::INBOX;
|
use crate::mail::user::INBOX;
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::sync::Arc;
|
|
||||||
use std::num::NonZeroU64;
|
use std::num::NonZeroU64;
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
|
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
|
||||||
|
@ -11,12 +11,12 @@ use imap_codec::imap_types::response::{Code, CodeOther};
|
||||||
use imap_codec::imap_types::search::SearchKey;
|
use imap_codec::imap_types::search::SearchKey;
|
||||||
use imap_codec::imap_types::sequence::SequenceSet;
|
use imap_codec::imap_types::sequence::SequenceSet;
|
||||||
|
|
||||||
|
use crate::imap::attributes::AttributesProxy;
|
||||||
use crate::imap::capability::{ClientCapability, ServerCapability};
|
use crate::imap::capability::{ClientCapability, ServerCapability};
|
||||||
use crate::imap::command::{anystate, authenticated, MailboxName};
|
use crate::imap::command::{anystate, authenticated, MailboxName};
|
||||||
use crate::imap::flow;
|
use crate::imap::flow;
|
||||||
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
|
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
|
||||||
use crate::imap::response::Response;
|
use crate::imap::response::Response;
|
||||||
use crate::imap::attributes::AttributesProxy;
|
|
||||||
use crate::mail::user::User;
|
use crate::mail::user::User;
|
||||||
|
|
||||||
pub struct SelectedContext<'a> {
|
pub struct SelectedContext<'a> {
|
||||||
|
@ -25,6 +25,7 @@ pub struct SelectedContext<'a> {
|
||||||
pub mailbox: &'a mut MailboxView,
|
pub mailbox: &'a mut MailboxView,
|
||||||
pub server_capabilities: &'a ServerCapability,
|
pub server_capabilities: &'a ServerCapability,
|
||||||
pub client_capabilities: &'a mut ClientCapability,
|
pub client_capabilities: &'a mut ClientCapability,
|
||||||
|
pub perm: &'a flow::MailboxPerm,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn dispatch<'a>(
|
pub async fn dispatch<'a>(
|
||||||
|
@ -39,14 +40,20 @@ pub async fn dispatch<'a>(
|
||||||
CommandBody::Logout => anystate::logout(),
|
CommandBody::Logout => anystate::logout(),
|
||||||
|
|
||||||
// Specific to this state (7 commands + NOOP)
|
// Specific to this state (7 commands + NOOP)
|
||||||
CommandBody::Close => ctx.close().await,
|
CommandBody::Close => match ctx.perm {
|
||||||
|
flow::MailboxPerm::ReadWrite => ctx.close().await,
|
||||||
|
flow::MailboxPerm::ReadOnly => ctx.examine_close().await,
|
||||||
|
},
|
||||||
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
|
CommandBody::Noop | CommandBody::Check => ctx.noop().await,
|
||||||
CommandBody::Fetch {
|
CommandBody::Fetch {
|
||||||
sequence_set,
|
sequence_set,
|
||||||
macro_or_item_names,
|
macro_or_item_names,
|
||||||
modifiers,
|
modifiers,
|
||||||
uid,
|
uid,
|
||||||
} => ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid).await,
|
} => {
|
||||||
|
ctx.fetch(sequence_set, macro_or_item_names, modifiers, uid)
|
||||||
|
.await
|
||||||
|
}
|
||||||
CommandBody::Search {
|
CommandBody::Search {
|
||||||
charset,
|
charset,
|
||||||
criteria,
|
criteria,
|
||||||
|
@ -60,7 +67,10 @@ pub async fn dispatch<'a>(
|
||||||
flags,
|
flags,
|
||||||
modifiers,
|
modifiers,
|
||||||
uid,
|
uid,
|
||||||
} => ctx.store(sequence_set, kind, response, flags, modifiers, uid).await,
|
} => {
|
||||||
|
ctx.store(sequence_set, kind, response, flags, modifiers, uid)
|
||||||
|
.await
|
||||||
|
}
|
||||||
CommandBody::Copy {
|
CommandBody::Copy {
|
||||||
sequence_set,
|
sequence_set,
|
||||||
mailbox,
|
mailbox,
|
||||||
|
@ -75,6 +85,15 @@ pub async fn dispatch<'a>(
|
||||||
// UNSELECT extension (rfc3691)
|
// UNSELECT extension (rfc3691)
|
||||||
CommandBody::Unselect => ctx.unselect().await,
|
CommandBody::Unselect => ctx.unselect().await,
|
||||||
|
|
||||||
|
// IDLE extension (rfc2177)
|
||||||
|
CommandBody::Idle => Ok((
|
||||||
|
Response::build()
|
||||||
|
.to_req(ctx.req)
|
||||||
|
.message("DUMMY command due to anti-pattern in the code")
|
||||||
|
.ok()?,
|
||||||
|
flow::Transition::Idle(ctx.req.tag.clone(), tokio::sync::Notify::new()),
|
||||||
|
)),
|
||||||
|
|
||||||
// In selected mode, we fallback to authenticated when needed
|
// In selected mode, we fallback to authenticated when needed
|
||||||
_ => {
|
_ => {
|
||||||
authenticated::dispatch(authenticated::AuthenticatedContext {
|
authenticated::dispatch(authenticated::AuthenticatedContext {
|
||||||
|
@ -102,6 +121,18 @@ impl<'a> SelectedContext<'a> {
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// CLOSE in examined state is not the same as in selected state
|
||||||
|
/// (in selected state it also does an EXPUNGE, here it doesn't)
|
||||||
|
async fn examine_close(self) -> Result<(Response<'static>, flow::Transition)> {
|
||||||
|
Ok((
|
||||||
|
Response::build()
|
||||||
|
.to_req(self.req)
|
||||||
|
.message("CLOSE completed")
|
||||||
|
.ok()?,
|
||||||
|
flow::Transition::Unselect,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> {
|
async fn unselect(self) -> Result<(Response<'static>, flow::Transition)> {
|
||||||
Ok((
|
Ok((
|
||||||
Response::build()
|
Response::build()
|
||||||
|
@ -124,10 +155,14 @@ impl<'a> SelectedContext<'a> {
|
||||||
modifiers.iter().for_each(|m| match m {
|
modifiers.iter().for_each(|m| match m {
|
||||||
FetchModifier::ChangedSince(val) => {
|
FetchModifier::ChangedSince(val) => {
|
||||||
changed_since = Some(*val);
|
changed_since = Some(*val);
|
||||||
},
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
match self.mailbox.fetch(sequence_set, &ap, changed_since, uid).await {
|
match self
|
||||||
|
.mailbox
|
||||||
|
.fetch(sequence_set, &ap, changed_since, uid)
|
||||||
|
.await
|
||||||
|
{
|
||||||
Ok(resp) => {
|
Ok(resp) => {
|
||||||
// Capabilities enabling logic only on successful command
|
// Capabilities enabling logic only on successful command
|
||||||
// (according to my understanding of the spec)
|
// (according to my understanding of the spec)
|
||||||
|
@ -143,7 +178,7 @@ impl<'a> SelectedContext<'a> {
|
||||||
.ok()?,
|
.ok()?,
|
||||||
flow::Transition::None,
|
flow::Transition::None,
|
||||||
))
|
))
|
||||||
},
|
}
|
||||||
Err(e) => Ok((
|
Err(e) => Ok((
|
||||||
Response::build()
|
Response::build()
|
||||||
.to_req(self.req)
|
.to_req(self.req)
|
||||||
|
@ -189,6 +224,10 @@ impl<'a> SelectedContext<'a> {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
|
async fn expunge(self) -> Result<(Response<'static>, flow::Transition)> {
|
||||||
|
if let Some(failed) = self.fail_read_only() {
|
||||||
|
return Ok((failed, flow::Transition::None));
|
||||||
|
}
|
||||||
|
|
||||||
let tag = self.req.tag.clone();
|
let tag = self.req.tag.clone();
|
||||||
let data = self.mailbox.expunge().await?;
|
let data = self.mailbox.expunge().await?;
|
||||||
|
|
||||||
|
@ -211,11 +250,15 @@ impl<'a> SelectedContext<'a> {
|
||||||
modifiers: &[StoreModifier],
|
modifiers: &[StoreModifier],
|
||||||
uid: &bool,
|
uid: &bool,
|
||||||
) -> Result<(Response<'static>, flow::Transition)> {
|
) -> Result<(Response<'static>, flow::Transition)> {
|
||||||
|
if let Some(failed) = self.fail_read_only() {
|
||||||
|
return Ok((failed, flow::Transition::None));
|
||||||
|
}
|
||||||
|
|
||||||
let mut unchanged_since: Option<NonZeroU64> = None;
|
let mut unchanged_since: Option<NonZeroU64> = None;
|
||||||
modifiers.iter().for_each(|m| match m {
|
modifiers.iter().for_each(|m| match m {
|
||||||
StoreModifier::UnchangedSince(val) => {
|
StoreModifier::UnchangedSince(val) => {
|
||||||
unchanged_since = Some(*val);
|
unchanged_since = Some(*val);
|
||||||
},
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
let (data, modified) = self
|
let (data, modified) = self
|
||||||
|
@ -228,21 +271,26 @@ impl<'a> SelectedContext<'a> {
|
||||||
.message("STORE completed")
|
.message("STORE completed")
|
||||||
.set_body(data);
|
.set_body(data);
|
||||||
|
|
||||||
|
|
||||||
match modified[..] {
|
match modified[..] {
|
||||||
[] => (),
|
[] => (),
|
||||||
[_head, ..] => {
|
[_head, ..] => {
|
||||||
let modified_str = format!("MODIFIED {}", modified.into_iter().map(|x| x.to_string()).collect::<Vec<_>>().join(","));
|
let modified_str = format!(
|
||||||
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(modified_str.into_bytes())));
|
"MODIFIED {}",
|
||||||
},
|
modified
|
||||||
|
.into_iter()
|
||||||
|
.map(|x| x.to_string())
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(",")
|
||||||
|
);
|
||||||
|
ok_resp = ok_resp.code(Code::Other(CodeOther::unvalidated(
|
||||||
|
modified_str.into_bytes(),
|
||||||
|
)));
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
self.client_capabilities.store_modifiers_enable(modifiers);
|
self.client_capabilities.store_modifiers_enable(modifiers);
|
||||||
|
|
||||||
Ok((ok_resp.ok()?,
|
Ok((ok_resp.ok()?, flow::Transition::None))
|
||||||
flow::Transition::None,
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn copy(
|
async fn copy(
|
||||||
|
@ -251,6 +299,11 @@ impl<'a> SelectedContext<'a> {
|
||||||
mailbox: &MailboxCodec<'a>,
|
mailbox: &MailboxCodec<'a>,
|
||||||
uid: &bool,
|
uid: &bool,
|
||||||
) -> Result<(Response<'static>, flow::Transition)> {
|
) -> Result<(Response<'static>, flow::Transition)> {
|
||||||
|
//@FIXME Could copy be valid in EXAMINE mode?
|
||||||
|
if let Some(failed) = self.fail_read_only() {
|
||||||
|
return Ok((failed, flow::Transition::None));
|
||||||
|
}
|
||||||
|
|
||||||
let name: &str = MailboxName(mailbox).try_into()?;
|
let name: &str = MailboxName(mailbox).try_into()?;
|
||||||
|
|
||||||
let mb_opt = self.user.open_mailbox(&name).await?;
|
let mb_opt = self.user.open_mailbox(&name).await?;
|
||||||
|
@ -303,6 +356,10 @@ impl<'a> SelectedContext<'a> {
|
||||||
mailbox: &MailboxCodec<'a>,
|
mailbox: &MailboxCodec<'a>,
|
||||||
uid: &bool,
|
uid: &bool,
|
||||||
) -> Result<(Response<'static>, flow::Transition)> {
|
) -> Result<(Response<'static>, flow::Transition)> {
|
||||||
|
if let Some(failed) = self.fail_read_only() {
|
||||||
|
return Ok((failed, flow::Transition::None));
|
||||||
|
}
|
||||||
|
|
||||||
let name: &str = MailboxName(mailbox).try_into()?;
|
let name: &str = MailboxName(mailbox).try_into()?;
|
||||||
|
|
||||||
let mb_opt = self.user.open_mailbox(&name).await?;
|
let mb_opt = self.user.open_mailbox(&name).await?;
|
||||||
|
@ -350,4 +407,17 @@ impl<'a> SelectedContext<'a> {
|
||||||
flow::Transition::None,
|
flow::Transition::None,
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn fail_read_only(&self) -> Option<Response<'static>> {
|
||||||
|
match self.perm {
|
||||||
|
flow::MailboxPerm::ReadWrite => None,
|
||||||
|
flow::MailboxPerm::ReadOnly => Some(
|
||||||
|
Response::build()
|
||||||
|
.to_req(self.req)
|
||||||
|
.message("Write command are forbidden while exmining mailbox")
|
||||||
|
.no()
|
||||||
|
.unwrap(),
|
||||||
|
),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,9 +1,11 @@
|
||||||
use std::error::Error as StdError;
|
use std::error::Error as StdError;
|
||||||
use std::fmt;
|
use std::fmt;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
use crate::imap::mailbox_view::MailboxView;
|
use crate::imap::mailbox_view::MailboxView;
|
||||||
use crate::mail::user::User;
|
use crate::mail::user::User;
|
||||||
|
use imap_codec::imap_types::core::Tag;
|
||||||
|
|
||||||
#[derive(Debug)]
|
#[derive(Debug)]
|
||||||
pub enum Error {
|
pub enum Error {
|
||||||
|
@ -19,44 +21,84 @@ impl StdError for Error {}
|
||||||
pub enum State {
|
pub enum State {
|
||||||
NotAuthenticated,
|
NotAuthenticated,
|
||||||
Authenticated(Arc<User>),
|
Authenticated(Arc<User>),
|
||||||
Selected(Arc<User>, MailboxView),
|
Selected(Arc<User>, MailboxView, MailboxPerm),
|
||||||
// Examined is like Selected, but indicates that the mailbox is read-only
|
Idle(
|
||||||
Examined(Arc<User>, MailboxView),
|
Arc<User>,
|
||||||
|
MailboxView,
|
||||||
|
MailboxPerm,
|
||||||
|
Tag<'static>,
|
||||||
|
Arc<Notify>,
|
||||||
|
),
|
||||||
Logout,
|
Logout,
|
||||||
}
|
}
|
||||||
|
impl fmt::Display for State {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
use State::*;
|
||||||
|
match self {
|
||||||
|
NotAuthenticated => write!(f, "NotAuthenticated"),
|
||||||
|
Authenticated(..) => write!(f, "Authenticated"),
|
||||||
|
Selected(..) => write!(f, "Selected"),
|
||||||
|
Idle(..) => write!(f, "Idle"),
|
||||||
|
Logout => write!(f, "Logout"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
|
pub enum MailboxPerm {
|
||||||
|
ReadOnly,
|
||||||
|
ReadWrite,
|
||||||
|
}
|
||||||
|
|
||||||
pub enum Transition {
|
pub enum Transition {
|
||||||
None,
|
None,
|
||||||
Authenticate(Arc<User>),
|
Authenticate(Arc<User>),
|
||||||
Examine(MailboxView),
|
Select(MailboxView, MailboxPerm),
|
||||||
Select(MailboxView),
|
Idle(Tag<'static>, Notify),
|
||||||
|
UnIdle,
|
||||||
Unselect,
|
Unselect,
|
||||||
Logout,
|
Logout,
|
||||||
}
|
}
|
||||||
|
impl fmt::Display for Transition {
|
||||||
|
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||||
|
use Transition::*;
|
||||||
|
match self {
|
||||||
|
None => write!(f, "None"),
|
||||||
|
Authenticate(..) => write!(f, "Authenticated"),
|
||||||
|
Select(..) => write!(f, "Selected"),
|
||||||
|
Idle(..) => write!(f, "Idle"),
|
||||||
|
UnIdle => write!(f, "UnIdle"),
|
||||||
|
Unselect => write!(f, "Unselect"),
|
||||||
|
Logout => write!(f, "Logout"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// See RFC3501 section 3.
|
// See RFC3501 section 3.
|
||||||
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
|
// https://datatracker.ietf.org/doc/html/rfc3501#page-13
|
||||||
impl State {
|
impl State {
|
||||||
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
|
pub fn apply(&mut self, tr: Transition) -> Result<(), Error> {
|
||||||
let new_state = match (&self, tr) {
|
tracing::debug!(state=%self, transition=%tr, "try change state");
|
||||||
(_s, Transition::None) => return Ok(()),
|
|
||||||
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
|
|
||||||
(
|
|
||||||
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
|
|
||||||
Transition::Select(m),
|
|
||||||
) => State::Selected(u.clone(), m),
|
|
||||||
(
|
|
||||||
State::Authenticated(u) | State::Selected(u, _) | State::Examined(u, _),
|
|
||||||
Transition::Examine(m),
|
|
||||||
) => State::Examined(u.clone(), m),
|
|
||||||
(State::Selected(u, _) | State::Examined(u, _), Transition::Unselect) => {
|
|
||||||
State::Authenticated(u.clone())
|
|
||||||
}
|
|
||||||
(_, Transition::Logout) => State::Logout,
|
|
||||||
_ => return Err(Error::ForbiddenTransition),
|
|
||||||
};
|
|
||||||
|
|
||||||
|
let new_state = match (std::mem::replace(self, State::Logout), tr) {
|
||||||
|
(s, Transition::None) => s,
|
||||||
|
(State::NotAuthenticated, Transition::Authenticate(u)) => State::Authenticated(u),
|
||||||
|
(State::Authenticated(u) | State::Selected(u, _, _), Transition::Select(m, p)) => {
|
||||||
|
State::Selected(u, m, p)
|
||||||
|
}
|
||||||
|
(State::Selected(u, _, _), Transition::Unselect) => State::Authenticated(u.clone()),
|
||||||
|
(State::Selected(u, m, p), Transition::Idle(t, s)) => {
|
||||||
|
State::Idle(u, m, p, t, Arc::new(s))
|
||||||
|
}
|
||||||
|
(State::Idle(u, m, p, _, _), Transition::UnIdle) => State::Selected(u, m, p),
|
||||||
|
(_, Transition::Logout) => State::Logout,
|
||||||
|
(s, t) => {
|
||||||
|
tracing::error!(state=%s, transition=%t, "forbidden transition");
|
||||||
|
return Err(Error::ForbiddenTransition);
|
||||||
|
}
|
||||||
|
};
|
||||||
*self = new_state;
|
*self = new_state;
|
||||||
|
tracing::debug!(state=%self, "transition succeeded");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,6 +1,6 @@
|
||||||
|
use std::collections::HashSet;
|
||||||
use std::num::{NonZeroU32, NonZeroU64};
|
use std::num::{NonZeroU32, NonZeroU64};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::collections::HashSet;
|
|
||||||
|
|
||||||
use anyhow::{anyhow, Error, Result};
|
use anyhow::{anyhow, Error, Result};
|
||||||
|
|
||||||
|
@ -13,11 +13,11 @@ use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
|
||||||
use imap_codec::imap_types::search::SearchKey;
|
use imap_codec::imap_types::search::SearchKey;
|
||||||
use imap_codec::imap_types::sequence::SequenceSet;
|
use imap_codec::imap_types::sequence::SequenceSet;
|
||||||
|
|
||||||
use crate::mail::unique_ident::UniqueIdent;
|
|
||||||
use crate::mail::mailbox::Mailbox;
|
use crate::mail::mailbox::Mailbox;
|
||||||
use crate::mail::query::QueryScope;
|
use crate::mail::query::QueryScope;
|
||||||
use crate::mail::snapshot::FrozenMailbox;
|
use crate::mail::snapshot::FrozenMailbox;
|
||||||
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
|
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
|
||||||
|
use crate::mail::unique_ident::UniqueIdent;
|
||||||
|
|
||||||
use crate::imap::attributes::AttributesProxy;
|
use crate::imap::attributes::AttributesProxy;
|
||||||
use crate::imap::flags;
|
use crate::imap::flags;
|
||||||
|
@ -130,11 +130,9 @@ impl MailboxView {
|
||||||
let new_mail = new_snapshot.table.get(uuid);
|
let new_mail = new_snapshot.table.get(uuid);
|
||||||
if old_mail.is_some() && old_mail != new_mail {
|
if old_mail.is_some() && old_mail != new_mail {
|
||||||
if let Some((uid, modseq, flags)) = new_mail {
|
if let Some((uid, modseq, flags)) = new_mail {
|
||||||
let mut items = vec![
|
let mut items = vec![MessageDataItem::Flags(
|
||||||
MessageDataItem::Flags(
|
|
||||||
flags.iter().filter_map(|f| flags::from_str(f)).collect(),
|
flags.iter().filter_map(|f| flags::from_str(f)).collect(),
|
||||||
),
|
)];
|
||||||
];
|
|
||||||
|
|
||||||
if params.with_uid {
|
if params.with_uid {
|
||||||
items.push(MessageDataItem::Uid(*uid));
|
items.push(MessageDataItem::Uid(*uid));
|
||||||
|
@ -188,8 +186,8 @@ impl MailboxView {
|
||||||
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
|
let flags = flags.iter().map(|x| x.to_string()).collect::<Vec<_>>();
|
||||||
|
|
||||||
let idx = self.index()?;
|
let idx = self.index()?;
|
||||||
let (editable, in_conflict) = idx
|
let (editable, in_conflict) =
|
||||||
.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
|
idx.fetch_unchanged_since(sequence_set, unchanged_since, *is_uid_store)?;
|
||||||
|
|
||||||
for mi in editable.iter() {
|
for mi in editable.iter() {
|
||||||
match kind {
|
match kind {
|
||||||
|
@ -215,15 +213,30 @@ impl MailboxView {
|
||||||
_ => in_conflict.into_iter().map(|midx| midx.i).collect(),
|
_ => in_conflict.into_iter().map(|midx| midx.i).collect(),
|
||||||
};
|
};
|
||||||
|
|
||||||
let summary = self.update(UpdateParameters {
|
let summary = self
|
||||||
|
.update(UpdateParameters {
|
||||||
with_uid: *is_uid_store,
|
with_uid: *is_uid_store,
|
||||||
with_modseq: unchanged_since.is_some(),
|
with_modseq: unchanged_since.is_some(),
|
||||||
silence,
|
silence,
|
||||||
}).await?;
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok((summary, conflict_id_or_uid))
|
Ok((summary, conflict_id_or_uid))
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn idle_sync(&mut self) -> Result<Vec<Body<'static>>> {
|
||||||
|
self.internal
|
||||||
|
.mailbox
|
||||||
|
.notify()
|
||||||
|
.await
|
||||||
|
.upgrade()
|
||||||
|
.ok_or(anyhow!("test"))?
|
||||||
|
.notified()
|
||||||
|
.await;
|
||||||
|
self.internal.mailbox.opportunistic_sync().await?;
|
||||||
|
self.update(UpdateParameters::default()).await
|
||||||
|
}
|
||||||
|
|
||||||
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
|
pub async fn expunge(&mut self) -> Result<Vec<Body<'static>>> {
|
||||||
self.internal.sync().await?;
|
self.internal.sync().await?;
|
||||||
let state = self.internal.peek().await;
|
let state = self.internal.peek().await;
|
||||||
|
@ -294,10 +307,12 @@ impl MailboxView {
|
||||||
ret.push((mi.uid, dest_uid));
|
ret.push((mi.uid, dest_uid));
|
||||||
}
|
}
|
||||||
|
|
||||||
let update = self.update(UpdateParameters {
|
let update = self
|
||||||
|
.update(UpdateParameters {
|
||||||
with_uid: *is_uid_copy,
|
with_uid: *is_uid_copy,
|
||||||
..UpdateParameters::default()
|
..UpdateParameters::default()
|
||||||
}).await?;
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok((to_state.uidvalidity, ret, update))
|
Ok((to_state.uidvalidity, ret, update))
|
||||||
}
|
}
|
||||||
|
@ -321,11 +336,7 @@ impl MailboxView {
|
||||||
};
|
};
|
||||||
tracing::debug!("Query scope {:?}", query_scope);
|
tracing::debug!("Query scope {:?}", query_scope);
|
||||||
let idx = self.index()?;
|
let idx = self.index()?;
|
||||||
let mail_idx_list = idx.fetch_changed_since(
|
let mail_idx_list = idx.fetch_changed_since(sequence_set, changed_since, *is_uid_fetch)?;
|
||||||
sequence_set,
|
|
||||||
changed_since,
|
|
||||||
*is_uid_fetch
|
|
||||||
)?;
|
|
||||||
|
|
||||||
// [2/6] Fetch the emails
|
// [2/6] Fetch the emails
|
||||||
let uuids = mail_idx_list
|
let uuids = mail_idx_list
|
||||||
|
@ -414,12 +425,19 @@ impl MailboxView {
|
||||||
let maybe_modseq = match is_modseq {
|
let maybe_modseq = match is_modseq {
|
||||||
true => {
|
true => {
|
||||||
let final_selection = kept_idx.iter().chain(kept_query.iter());
|
let final_selection = kept_idx.iter().chain(kept_query.iter());
|
||||||
final_selection.map(|in_idx| in_idx.modseq).max().map(|r| NonZeroU64::try_from(r)).transpose()?
|
final_selection
|
||||||
},
|
.map(|in_idx| in_idx.modseq)
|
||||||
|
.max()
|
||||||
|
.map(|r| NonZeroU64::try_from(r))
|
||||||
|
.transpose()?
|
||||||
|
}
|
||||||
_ => None,
|
_ => None,
|
||||||
};
|
};
|
||||||
|
|
||||||
Ok((vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))], is_modseq))
|
Ok((
|
||||||
|
vec![Body::Data(Data::Search(selection_fmt, maybe_modseq))],
|
||||||
|
is_modseq,
|
||||||
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
@ -464,7 +482,9 @@ impl MailboxView {
|
||||||
pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> {
|
pub(crate) fn highestmodseq_status(&self) -> Result<Body<'static>> {
|
||||||
Ok(Body::Status(Status::ok(
|
Ok(Body::Status(Status::ok(
|
||||||
None,
|
None,
|
||||||
Some(Code::Other(CodeOther::unvalidated(format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes()))),
|
Some(Code::Other(CodeOther::unvalidated(
|
||||||
|
format!("HIGHESTMODSEQ {}", self.highestmodseq()).into_bytes(),
|
||||||
|
))),
|
||||||
"Highest",
|
"Highest",
|
||||||
)?))
|
)?))
|
||||||
}
|
}
|
||||||
|
|
235
src/imap/mod.rs
235
src/imap/mod.rs
|
@ -8,24 +8,30 @@ mod index;
|
||||||
mod mail_view;
|
mod mail_view;
|
||||||
mod mailbox_view;
|
mod mailbox_view;
|
||||||
mod mime_view;
|
mod mime_view;
|
||||||
|
mod request;
|
||||||
mod response;
|
mod response;
|
||||||
mod search;
|
mod search;
|
||||||
mod session;
|
mod session;
|
||||||
|
|
||||||
use std::net::SocketAddr;
|
use std::net::SocketAddr;
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::{bail, Result};
|
||||||
use futures::stream::{FuturesUnordered, StreamExt};
|
use futures::stream::{FuturesUnordered, StreamExt};
|
||||||
|
|
||||||
use tokio::net::TcpListener;
|
use tokio::net::TcpListener;
|
||||||
|
use tokio::sync::mpsc;
|
||||||
use tokio::sync::watch;
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
|
||||||
use imap_codec::imap_types::{core::Text, response::Greeting};
|
use imap_codec::imap_types::{core::Text, response::Greeting};
|
||||||
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
|
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
|
||||||
use imap_flow::stream::AnyStream;
|
use imap_flow::stream::AnyStream;
|
||||||
|
|
||||||
use crate::config::ImapConfig;
|
use crate::config::ImapConfig;
|
||||||
use crate::imap::capability::ServerCapability;
|
use crate::imap::capability::ServerCapability;
|
||||||
|
use crate::imap::request::Request;
|
||||||
|
use crate::imap::response::{Body, ResponseOrIdle};
|
||||||
|
use crate::imap::session::Instance;
|
||||||
use crate::login::ArcLoginProvider;
|
use crate::login::ArcLoginProvider;
|
||||||
|
|
||||||
/// Server is a thin wrapper to register our Services in BàL
|
/// Server is a thin wrapper to register our Services in BàL
|
||||||
|
@ -35,8 +41,8 @@ pub struct Server {
|
||||||
capabilities: ServerCapability,
|
capabilities: ServerCapability,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Clone)]
|
||||||
struct ClientContext {
|
struct ClientContext {
|
||||||
stream: AnyStream,
|
|
||||||
addr: SocketAddr,
|
addr: SocketAddr,
|
||||||
login_provider: ArcLoginProvider,
|
login_provider: ArcLoginProvider,
|
||||||
must_exit: watch::Receiver<bool>,
|
must_exit: watch::Receiver<bool>,
|
||||||
|
@ -74,13 +80,12 @@ impl Server {
|
||||||
tracing::info!("IMAP: accepted connection from {}", remote_addr);
|
tracing::info!("IMAP: accepted connection from {}", remote_addr);
|
||||||
|
|
||||||
let client = ClientContext {
|
let client = ClientContext {
|
||||||
stream: AnyStream::new(socket),
|
|
||||||
addr: remote_addr.clone(),
|
addr: remote_addr.clone(),
|
||||||
login_provider: self.login_provider.clone(),
|
login_provider: self.login_provider.clone(),
|
||||||
must_exit: must_exit.clone(),
|
must_exit: must_exit.clone(),
|
||||||
server_capabilities: self.capabilities.clone(),
|
server_capabilities: self.capabilities.clone(),
|
||||||
};
|
};
|
||||||
let conn = tokio::spawn(client_wrapper(client));
|
let conn = tokio::spawn(NetLoop::handler(client, AnyStream::new(socket)));
|
||||||
connections.push(conn);
|
connections.push(conn);
|
||||||
}
|
}
|
||||||
drop(tcp);
|
drop(tcp);
|
||||||
|
@ -92,22 +97,53 @@ impl Server {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn client_wrapper(ctx: ClientContext) {
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::mpsc::*;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
use tokio_util::bytes::BytesMut;
|
||||||
|
enum LoopMode {
|
||||||
|
Quit,
|
||||||
|
Interactive,
|
||||||
|
Idle(BytesMut, Arc<Notify>),
|
||||||
|
}
|
||||||
|
|
||||||
|
// @FIXME a full refactor of this part of the code will be needed sooner or later
|
||||||
|
struct NetLoop {
|
||||||
|
ctx: ClientContext,
|
||||||
|
server: ServerFlow,
|
||||||
|
cmd_tx: Sender<Request>,
|
||||||
|
resp_rx: UnboundedReceiver<ResponseOrIdle>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NetLoop {
|
||||||
|
async fn handler(ctx: ClientContext, sock: AnyStream) {
|
||||||
let addr = ctx.addr.clone();
|
let addr = ctx.addr.clone();
|
||||||
match client(ctx).await {
|
|
||||||
Ok(()) => {
|
let nl = match Self::new(ctx, sock).await {
|
||||||
tracing::debug!("closing successful session for {:?}", addr);
|
Ok(nl) => {
|
||||||
|
tracing::debug!(addr=?addr, "netloop successfully initialized");
|
||||||
|
nl
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::error!("closing errored session for {:?}: {}", addr, e);
|
tracing::error!(addr=?addr, err=?e, "netloop can not be initialized, closing session");
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
match nl.core().await {
|
||||||
|
Ok(()) => {
|
||||||
|
tracing::debug!("closing successful netloop core for {:?}", addr);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!("closing errored netloop core for {:?}: {}", addr, e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn client(mut ctx: ClientContext) -> Result<()> {
|
async fn new(mut ctx: ClientContext, sock: AnyStream) -> Result<Self> {
|
||||||
// Send greeting
|
// Send greeting
|
||||||
let (mut server, _) = ServerFlow::send_greeting(
|
let (mut server, _) = ServerFlow::send_greeting(
|
||||||
ctx.stream,
|
sock,
|
||||||
ServerFlowOptions {
|
ServerFlowOptions {
|
||||||
crlf_relaxed: false,
|
crlf_relaxed: false,
|
||||||
literal_accept_text: Text::unvalidated("OK"),
|
literal_accept_text: Text::unvalidated("OK"),
|
||||||
|
@ -122,16 +158,26 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
use crate::imap::response::{Body, Response as MyResponse};
|
// Start a mailbox session in background
|
||||||
use crate::imap::session::Instance;
|
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Request>(3);
|
||||||
use imap_codec::imap_types::command::Command;
|
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<ResponseOrIdle>();
|
||||||
use imap_codec::imap_types::response::{Code, Response, Status};
|
tokio::spawn(Self::session(ctx.clone(), cmd_rx, resp_tx));
|
||||||
|
|
||||||
use tokio::sync::mpsc;
|
// Return the object
|
||||||
let (cmd_tx, mut cmd_rx) = mpsc::channel::<Command<'static>>(10);
|
Ok(NetLoop {
|
||||||
let (resp_tx, mut resp_rx) = mpsc::unbounded_channel::<MyResponse<'static>>();
|
ctx,
|
||||||
|
server,
|
||||||
|
cmd_tx,
|
||||||
|
resp_rx,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
let bckgrnd = tokio::spawn(async move {
|
/// Coms with the background session
|
||||||
|
async fn session(
|
||||||
|
ctx: ClientContext,
|
||||||
|
mut cmd_rx: Receiver<Request>,
|
||||||
|
resp_tx: UnboundedSender<ResponseOrIdle>,
|
||||||
|
) -> () {
|
||||||
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
|
let mut session = Instance::new(ctx.login_provider, ctx.server_capabilities);
|
||||||
loop {
|
loop {
|
||||||
let cmd = match cmd_rx.recv().await {
|
let cmd = match cmd_rx.recv().await {
|
||||||
|
@ -140,8 +186,8 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
|
||||||
};
|
};
|
||||||
|
|
||||||
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
|
tracing::debug!(cmd=?cmd, sock=%ctx.addr, "command");
|
||||||
let maybe_response = session.command(cmd).await;
|
let maybe_response = session.request(cmd).await;
|
||||||
tracing::debug!(cmd=?maybe_response.completion, sock=%ctx.addr, "response");
|
tracing::debug!(cmd=?maybe_response, sock=%ctx.addr, "response");
|
||||||
|
|
||||||
match resp_tx.send(maybe_response) {
|
match resp_tx.send(maybe_response) {
|
||||||
Err(_) => break,
|
Err(_) => break,
|
||||||
|
@ -149,67 +195,150 @@ async fn client(mut ctx: ClientContext) -> Result<()> {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
tracing::info!("runner is quitting");
|
tracing::info!("runner is quitting");
|
||||||
});
|
}
|
||||||
|
|
||||||
// Main loop
|
async fn core(mut self) -> Result<()> {
|
||||||
|
let mut mode = LoopMode::Interactive;
|
||||||
loop {
|
loop {
|
||||||
|
mode = match mode {
|
||||||
|
LoopMode::Interactive => self.interactive_mode().await?,
|
||||||
|
LoopMode::Idle(buff, stop) => self.idle_mode(buff, stop).await?,
|
||||||
|
LoopMode::Quit => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn interactive_mode(&mut self) -> Result<LoopMode> {
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
// Managing imap_flow stuff
|
// Managing imap_flow stuff
|
||||||
srv_evt = server.progress() => match srv_evt? {
|
srv_evt = self.server.progress() => match srv_evt? {
|
||||||
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
|
ServerFlowEvent::ResponseSent { handle: _handle, response } => {
|
||||||
match response {
|
match response {
|
||||||
Response::Status(Status::Bye(_)) => break,
|
Response::Status(Status::Bye(_)) => return Ok(LoopMode::Quit),
|
||||||
_ => tracing::trace!("sent to {} content {:?}", ctx.addr, response),
|
_ => tracing::trace!("sent to {} content {:?}", self.ctx.addr, response),
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
ServerFlowEvent::CommandReceived { command } => {
|
ServerFlowEvent::CommandReceived { command } => {
|
||||||
match cmd_tx.try_send(command) {
|
match self.cmd_tx.try_send(Request::ImapCommand(command)) {
|
||||||
Ok(_) => (),
|
Ok(_) => (),
|
||||||
Err(mpsc::error::TrySendError::Full(_)) => {
|
Err(mpsc::error::TrySendError::Full(_)) => {
|
||||||
server.enqueue_status(Status::bye(None, "Too fast").unwrap());
|
self.server.enqueue_status(Status::bye(None, "Too fast").unwrap());
|
||||||
tracing::error!("client {:?} is sending commands too fast, closing.", ctx.addr);
|
tracing::error!("client {:?} is sending commands too fast, closing.", self.ctx.addr);
|
||||||
}
|
}
|
||||||
_ => {
|
_ => {
|
||||||
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
||||||
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
|
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
flow => {
|
flow => {
|
||||||
server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
|
self.server.enqueue_status(Status::bye(None, "Unsupported server flow event").unwrap());
|
||||||
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", ctx.addr, flow);
|
tracing::error!("session task exited for {:?} due to unsupported flow {:?}", self.ctx.addr, flow);
|
||||||
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
||||||
// Managing response generated by Aerogramme
|
// Managing response generated by Aerogramme
|
||||||
maybe_msg = resp_rx.recv() => {
|
maybe_msg = self.resp_rx.recv() => match maybe_msg {
|
||||||
let response = match maybe_msg {
|
Some(ResponseOrIdle::Response(response)) => {
|
||||||
None => {
|
|
||||||
server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
|
||||||
tracing::error!("session task exited for {:?}, quitting", ctx.addr);
|
|
||||||
continue
|
|
||||||
},
|
|
||||||
Some(r) => r,
|
|
||||||
};
|
|
||||||
|
|
||||||
for body_elem in response.body.into_iter() {
|
for body_elem in response.body.into_iter() {
|
||||||
let _handle = match body_elem {
|
let _handle = match body_elem {
|
||||||
Body::Data(d) => server.enqueue_data(d),
|
Body::Data(d) => self.server.enqueue_data(d),
|
||||||
Body::Status(s) => server.enqueue_status(s),
|
Body::Status(s) => self.server.enqueue_status(s),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
server.enqueue_status(response.completion);
|
self.server.enqueue_status(response.completion);
|
||||||
|
},
|
||||||
|
Some(ResponseOrIdle::StartIdle(stop)) => {
|
||||||
|
let cr = CommandContinuationRequest::basic(None, "Idling")?;
|
||||||
|
self.server.enqueue_continuation(cr);
|
||||||
|
self.cmd_tx.try_send(Request::Idle)?;
|
||||||
|
return Ok(LoopMode::Idle(BytesMut::new(), stop))
|
||||||
|
},
|
||||||
|
None => {
|
||||||
|
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
||||||
|
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
|
||||||
|
},
|
||||||
|
Some(_) => unreachable!(),
|
||||||
|
|
||||||
},
|
},
|
||||||
|
|
||||||
// When receiving a CTRL+C
|
// When receiving a CTRL+C
|
||||||
_ = ctx.must_exit.changed() => {
|
_ = self.ctx.must_exit.changed() => {
|
||||||
server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
|
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
|
||||||
|
},
|
||||||
|
};
|
||||||
|
Ok(LoopMode::Interactive)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn idle_mode(&mut self, mut buff: BytesMut, stop: Arc<Notify>) -> Result<LoopMode> {
|
||||||
|
// Flush send
|
||||||
|
loop {
|
||||||
|
match self.server.progress_send().await? {
|
||||||
|
Some(..) => continue,
|
||||||
|
None => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
// Receiving IDLE event from background
|
||||||
|
maybe_msg = self.resp_rx.recv() => match maybe_msg {
|
||||||
|
// Session decided idle is terminated
|
||||||
|
Some(ResponseOrIdle::Response(response)) => {
|
||||||
|
for body_elem in response.body.into_iter() {
|
||||||
|
let _handle = match body_elem {
|
||||||
|
Body::Data(d) => self.server.enqueue_data(d),
|
||||||
|
Body::Status(s) => self.server.enqueue_status(s),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
self.server.enqueue_status(response.completion);
|
||||||
|
return Ok(LoopMode::Interactive)
|
||||||
|
},
|
||||||
|
// Session has some information for user
|
||||||
|
Some(ResponseOrIdle::IdleEvent(elems)) => {
|
||||||
|
for body_elem in elems.into_iter() {
|
||||||
|
let _handle = match body_elem {
|
||||||
|
Body::Data(d) => self.server.enqueue_data(d),
|
||||||
|
Body::Status(s) => self.server.enqueue_status(s),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
self.cmd_tx.try_send(Request::Idle)?;
|
||||||
|
return Ok(LoopMode::Idle(buff, stop))
|
||||||
|
},
|
||||||
|
|
||||||
|
// Session crashed
|
||||||
|
None => {
|
||||||
|
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
|
||||||
|
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
|
||||||
|
return Ok(LoopMode::Interactive)
|
||||||
|
},
|
||||||
|
|
||||||
|
// Session can't start idling while already idling, it's a logic error!
|
||||||
|
Some(ResponseOrIdle::StartIdle(..)) => bail!("can't start idling while already idling!"),
|
||||||
|
},
|
||||||
|
|
||||||
|
// User is trying to interact with us
|
||||||
|
_read_client_bytes = self.server.stream.read(&mut buff) => {
|
||||||
|
use imap_codec::decode::Decoder;
|
||||||
|
let codec = imap_codec::IdleDoneCodec::new();
|
||||||
|
match codec.decode(&buff) {
|
||||||
|
Ok(([], imap_codec::imap_types::extensions::idle::IdleDone)) => {
|
||||||
|
// Session will be informed that it must stop idle
|
||||||
|
// It will generate the "done" message and change the loop mode
|
||||||
|
stop.notify_one()
|
||||||
|
},
|
||||||
|
Err(_) => (),
|
||||||
|
_ => bail!("Client sent data after terminating the continuation without waiting for the server. This is an unsupported behavior and bug in Aerogramme, quitting."),
|
||||||
|
};
|
||||||
|
|
||||||
|
return Ok(LoopMode::Idle(buff, stop))
|
||||||
|
},
|
||||||
|
|
||||||
|
// When receiving a CTRL+C
|
||||||
|
_ = self.ctx.must_exit.changed() => {
|
||||||
|
self.server.enqueue_status(Status::bye(None, "Server is being shutdown").unwrap());
|
||||||
|
return Ok(LoopMode::Interactive)
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
drop(cmd_tx);
|
|
||||||
bckgrnd.await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
7
src/imap/request.rs
Normal file
7
src/imap/request.rs
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
use imap_codec::imap_types::command::Command;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum Request {
|
||||||
|
ImapCommand(Command<'static>),
|
||||||
|
Idle,
|
||||||
|
}
|
|
@ -2,7 +2,10 @@ use anyhow::Result;
|
||||||
use imap_codec::imap_types::command::Command;
|
use imap_codec::imap_types::command::Command;
|
||||||
use imap_codec::imap_types::core::Tag;
|
use imap_codec::imap_types::core::Tag;
|
||||||
use imap_codec::imap_types::response::{Code, Data, Status};
|
use imap_codec::imap_types::response::{Code, Data, Status};
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub enum Body<'a> {
|
pub enum Body<'a> {
|
||||||
Data(Data<'a>),
|
Data(Data<'a>),
|
||||||
Status(Status<'a>),
|
Status(Status<'a>),
|
||||||
|
@ -88,6 +91,7 @@ impl<'a> ResponseBuilder<'a> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
pub struct Response<'a> {
|
pub struct Response<'a> {
|
||||||
pub body: Vec<Body<'a>>,
|
pub body: Vec<Body<'a>>,
|
||||||
pub completion: Status<'a>,
|
pub completion: Status<'a>,
|
||||||
|
@ -110,3 +114,10 @@ impl<'a> Response<'a> {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub enum ResponseOrIdle {
|
||||||
|
Response(Response<'static>),
|
||||||
|
StartIdle(Arc<Notify>),
|
||||||
|
IdleEvent(Vec<Body<'static>>),
|
||||||
|
}
|
||||||
|
|
|
@ -2,7 +2,7 @@ use std::num::{NonZeroU32, NonZeroU64};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use imap_codec::imap_types::core::NonEmptyVec;
|
use imap_codec::imap_types::core::NonEmptyVec;
|
||||||
use imap_codec::imap_types::search::{SearchKey, MetadataItemSearch};
|
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
|
||||||
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
|
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
|
||||||
|
|
||||||
use crate::imap::index::MailIndex;
|
use crate::imap::index::MailIndex;
|
||||||
|
@ -115,7 +115,10 @@ impl<'a> Criteria<'a> {
|
||||||
pub fn is_modseq(&self) -> bool {
|
pub fn is_modseq(&self) -> bool {
|
||||||
use SearchKey::*;
|
use SearchKey::*;
|
||||||
match self.0 {
|
match self.0 {
|
||||||
And(and_list) => and_list.as_ref().iter().any(|child| Criteria(child).is_modseq()),
|
And(and_list) => and_list
|
||||||
|
.as_ref()
|
||||||
|
.iter()
|
||||||
|
.any(|child| Criteria(child).is_modseq()),
|
||||||
Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(),
|
Or(left, right) => Criteria(left).is_modseq() || Criteria(right).is_modseq(),
|
||||||
Not(child) => Criteria(child).is_modseq(),
|
Not(child) => Criteria(child).is_modseq(),
|
||||||
ModSeq { .. } => true,
|
ModSeq { .. } => true,
|
||||||
|
@ -187,7 +190,10 @@ impl<'a> Criteria<'a> {
|
||||||
// Sequence logic
|
// Sequence logic
|
||||||
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(),
|
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, midx).into(),
|
||||||
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(),
|
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, midx).into(),
|
||||||
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, midx).into(),
|
ModSeq {
|
||||||
|
metadata_item,
|
||||||
|
modseq,
|
||||||
|
} => is_keep_modseq(metadata_item, modseq, midx).into(),
|
||||||
|
|
||||||
// All the stuff we can't evaluate yet
|
// All the stuff we can't evaluate yet
|
||||||
Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_)
|
Bcc(_) | Cc(_) | From(_) | Header(..) | SentBefore(_) | SentOn(_) | SentSince(_)
|
||||||
|
@ -225,7 +231,10 @@ impl<'a> Criteria<'a> {
|
||||||
//@FIXME Reevaluating our previous logic...
|
//@FIXME Reevaluating our previous logic...
|
||||||
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx),
|
maybe_seq if is_sk_seq(maybe_seq) => is_keep_seq(maybe_seq, &mail_view.in_idx),
|
||||||
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx),
|
maybe_flag if is_sk_flag(maybe_flag) => is_keep_flag(maybe_flag, &mail_view.in_idx),
|
||||||
ModSeq { metadata_item , modseq } => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
|
ModSeq {
|
||||||
|
metadata_item,
|
||||||
|
modseq,
|
||||||
|
} => is_keep_modseq(metadata_item, modseq, &mail_view.in_idx).into(),
|
||||||
|
|
||||||
// Filter on mail meta
|
// Filter on mail meta
|
||||||
Before(search_naive) => match mail_view.stored_naive_date() {
|
Before(search_naive) => match mail_view.stored_naive_date() {
|
||||||
|
@ -473,7 +482,11 @@ fn is_keep_seq(sk: &SearchKey, midx: &MailIndex) -> bool {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_keep_modseq(filter: &Option<MetadataItemSearch>, modseq: &NonZeroU64, midx: &MailIndex) -> bool {
|
fn is_keep_modseq(
|
||||||
|
filter: &Option<MetadataItemSearch>,
|
||||||
|
modseq: &NonZeroU64,
|
||||||
|
midx: &MailIndex,
|
||||||
|
) -> bool {
|
||||||
if filter.is_some() {
|
if filter.is_some() {
|
||||||
tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet");
|
tracing::warn!(filter=?filter, "Ignoring search metadata filter as it's not supported yet");
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,8 +1,10 @@
|
||||||
use crate::imap::capability::{ClientCapability, ServerCapability};
|
use crate::imap::capability::{ClientCapability, ServerCapability};
|
||||||
use crate::imap::command::{anonymous, authenticated, examined, selected};
|
use crate::imap::command::{anonymous, authenticated, selected};
|
||||||
use crate::imap::flow;
|
use crate::imap::flow;
|
||||||
use crate::imap::response::Response;
|
use crate::imap::request::Request;
|
||||||
|
use crate::imap::response::{Response, ResponseOrIdle};
|
||||||
use crate::login::ArcLoginProvider;
|
use crate::login::ArcLoginProvider;
|
||||||
|
use anyhow::{anyhow, bail, Result};
|
||||||
use imap_codec::imap_types::command::Command;
|
use imap_codec::imap_types::command::Command;
|
||||||
|
|
||||||
//-----
|
//-----
|
||||||
|
@ -23,7 +25,45 @@ impl Instance {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn command(&mut self, cmd: Command<'static>) -> Response<'static> {
|
pub async fn request(&mut self, req: Request) -> ResponseOrIdle {
|
||||||
|
match req {
|
||||||
|
Request::Idle => self.idle().await,
|
||||||
|
Request::ImapCommand(cmd) => self.command(cmd).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn idle(&mut self) -> ResponseOrIdle {
|
||||||
|
match self.idle_happy().await {
|
||||||
|
Ok(r) => r,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::error!(err=?e, "something bad happened in idle");
|
||||||
|
ResponseOrIdle::Response(Response::bye().unwrap())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn idle_happy(&mut self) -> Result<ResponseOrIdle> {
|
||||||
|
let (mbx, tag, stop) = match &mut self.state {
|
||||||
|
flow::State::Idle(_, ref mut mbx, _, tag, stop) => (mbx, tag.clone(), stop.clone()),
|
||||||
|
_ => bail!("Invalid session state, can't idle"),
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::select! {
|
||||||
|
_ = stop.notified() => {
|
||||||
|
self.state.apply(flow::Transition::UnIdle)?;
|
||||||
|
return Ok(ResponseOrIdle::Response(Response::build()
|
||||||
|
.tag(tag.clone())
|
||||||
|
.message("IDLE completed")
|
||||||
|
.ok()?))
|
||||||
|
},
|
||||||
|
change = mbx.idle_sync() => {
|
||||||
|
tracing::debug!("idle event");
|
||||||
|
return Ok(ResponseOrIdle::IdleEvent(change?));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn command(&mut self, cmd: Command<'static>) -> ResponseOrIdle {
|
||||||
// Command behavior is modulated by the state.
|
// Command behavior is modulated by the state.
|
||||||
// To prevent state error, we handle the same command in separate code paths.
|
// To prevent state error, we handle the same command in separate code paths.
|
||||||
let (resp, tr) = match &mut self.state {
|
let (resp, tr) = match &mut self.state {
|
||||||
|
@ -44,26 +84,18 @@ impl Instance {
|
||||||
};
|
};
|
||||||
authenticated::dispatch(ctx).await
|
authenticated::dispatch(ctx).await
|
||||||
}
|
}
|
||||||
flow::State::Selected(ref user, ref mut mailbox) => {
|
flow::State::Selected(ref user, ref mut mailbox, ref perm) => {
|
||||||
let ctx = selected::SelectedContext {
|
let ctx = selected::SelectedContext {
|
||||||
req: &cmd,
|
req: &cmd,
|
||||||
server_capabilities: &self.server_capabilities,
|
server_capabilities: &self.server_capabilities,
|
||||||
client_capabilities: &mut self.client_capabilities,
|
client_capabilities: &mut self.client_capabilities,
|
||||||
user,
|
user,
|
||||||
mailbox,
|
mailbox,
|
||||||
|
perm,
|
||||||
};
|
};
|
||||||
selected::dispatch(ctx).await
|
selected::dispatch(ctx).await
|
||||||
}
|
}
|
||||||
flow::State::Examined(ref user, ref mut mailbox) => {
|
flow::State::Idle(..) => Err(anyhow!("can not receive command while idling")),
|
||||||
let ctx = examined::ExaminedContext {
|
|
||||||
req: &cmd,
|
|
||||||
server_capabilities: &self.server_capabilities,
|
|
||||||
client_capabilities: &mut self.client_capabilities,
|
|
||||||
user,
|
|
||||||
mailbox,
|
|
||||||
};
|
|
||||||
examined::dispatch(ctx).await
|
|
||||||
}
|
|
||||||
flow::State::Logout => Response::build()
|
flow::State::Logout => Response::build()
|
||||||
.tag(cmd.tag.clone())
|
.tag(cmd.tag.clone())
|
||||||
.message("No commands are allowed in the LOGOUT state.")
|
.message("No commands are allowed in the LOGOUT state.")
|
||||||
|
@ -88,15 +120,18 @@ impl Instance {
|
||||||
e,
|
e,
|
||||||
cmd
|
cmd
|
||||||
);
|
);
|
||||||
return Response::build()
|
return ResponseOrIdle::Response(Response::build()
|
||||||
.to_req(&cmd)
|
.to_req(&cmd)
|
||||||
.message(
|
.message(
|
||||||
"Internal error, processing command triggered an illegal IMAP state transition",
|
"Internal error, processing command triggered an illegal IMAP state transition",
|
||||||
)
|
)
|
||||||
.bad()
|
.bad()
|
||||||
.unwrap();
|
.unwrap());
|
||||||
}
|
}
|
||||||
|
|
||||||
resp
|
match &self.state {
|
||||||
|
flow::State::Idle(_, _, _, _, n) => ResponseOrIdle::StartIdle(n.clone()),
|
||||||
|
_ => ResponseOrIdle::Response(resp),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -67,6 +67,11 @@ impl Mailbox {
|
||||||
self.mbox.write().await.opportunistic_sync().await
|
self.mbox.write().await.opportunistic_sync().await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Block until a sync has been done (due to changes in the event log)
|
||||||
|
pub async fn notify(&self) -> std::sync::Weak<tokio::sync::Notify> {
|
||||||
|
self.mbox.read().await.notifier()
|
||||||
|
}
|
||||||
|
|
||||||
// ---- Functions for reading the mailbox ----
|
// ---- Functions for reading the mailbox ----
|
||||||
|
|
||||||
/// Get a clone of the current UID Index of this mailbox
|
/// Get a clone of the current UID Index of this mailbox
|
||||||
|
@ -199,6 +204,10 @@ impl MailboxInternal {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn notifier(&self) -> std::sync::Weak<tokio::sync::Notify> {
|
||||||
|
self.uid_index.notifier()
|
||||||
|
}
|
||||||
|
|
||||||
// ---- Functions for reading the mailbox ----
|
// ---- Functions for reading the mailbox ----
|
||||||
|
|
||||||
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
|
||||||
|
|
|
@ -140,8 +140,7 @@ impl BayouState for UidIndex {
|
||||||
let bump_uid = new.internalseq.get() - uid.get();
|
let bump_uid = new.internalseq.get() - uid.get();
|
||||||
let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
|
let bump_modseq = (new.internalmodseq.get() - modseq.get()) as u32;
|
||||||
new.uidvalidity =
|
new.uidvalidity =
|
||||||
NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq)
|
NonZeroU32::new(new.uidvalidity.get() + bump_uid + bump_modseq).unwrap();
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Assign the real uid of the email
|
// Assign the real uid of the email
|
||||||
|
@ -179,10 +178,10 @@ impl BayouState for UidIndex {
|
||||||
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
|
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
|
||||||
// Bump UIDValidity if required
|
// Bump UIDValidity if required
|
||||||
if *candidate_modseq < new.internalmodseq {
|
if *candidate_modseq < new.internalmodseq {
|
||||||
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32;
|
let bump_modseq =
|
||||||
|
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
|
||||||
new.uidvalidity =
|
new.uidvalidity =
|
||||||
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
|
NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Add flags to the source of trust and the cache
|
// Add flags to the source of trust and the cache
|
||||||
|
@ -205,10 +204,10 @@ impl BayouState for UidIndex {
|
||||||
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
|
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
|
||||||
// Bump UIDValidity if required
|
// Bump UIDValidity if required
|
||||||
if *candidate_modseq < new.internalmodseq {
|
if *candidate_modseq < new.internalmodseq {
|
||||||
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32;
|
let bump_modseq =
|
||||||
|
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
|
||||||
new.uidvalidity =
|
new.uidvalidity =
|
||||||
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
|
NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove flags from the source of trust and the cache
|
// Remove flags from the source of trust and the cache
|
||||||
|
@ -228,10 +227,10 @@ impl BayouState for UidIndex {
|
||||||
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
|
if let Some((uid, email_modseq, existing_flags)) = new.table.get_mut(ident) {
|
||||||
// Bump UIDValidity if required
|
// Bump UIDValidity if required
|
||||||
if *candidate_modseq < new.internalmodseq {
|
if *candidate_modseq < new.internalmodseq {
|
||||||
let bump_modseq = (new.internalmodseq.get() - candidate_modseq.get()) as u32;
|
let bump_modseq =
|
||||||
|
(new.internalmodseq.get() - candidate_modseq.get()) as u32;
|
||||||
new.uidvalidity =
|
new.uidvalidity =
|
||||||
NonZeroU32::new(new.uidvalidity.get() + bump_modseq)
|
NonZeroU32::new(new.uidvalidity.get() + bump_modseq).unwrap();
|
||||||
.unwrap();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove flags from the source of trust and the cache
|
// Remove flags from the source of trust and the cache
|
||||||
|
@ -448,7 +447,12 @@ mod tests {
|
||||||
{
|
{
|
||||||
let m = UniqueIdent([0x03; 24]);
|
let m = UniqueIdent([0x03; 24]);
|
||||||
let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
|
let f = vec!["\\Archive".to_string(), "\\Recent".to_string()];
|
||||||
let ev = UidIndexOp::MailAdd(m, NonZeroU32::new(1).unwrap(), NonZeroU64::new(1).unwrap(), f);
|
let ev = UidIndexOp::MailAdd(
|
||||||
|
m,
|
||||||
|
NonZeroU32::new(1).unwrap(),
|
||||||
|
NonZeroU64::new(1).unwrap(),
|
||||||
|
f,
|
||||||
|
);
|
||||||
state = state.apply(&ev);
|
state = state.apply(&ev);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,8 +1,8 @@
|
||||||
use anyhow::Context;
|
use anyhow::Context;
|
||||||
|
|
||||||
mod common;
|
mod common;
|
||||||
use crate::common::fragments::*;
|
|
||||||
use crate::common::constants::*;
|
use crate::common::constants::*;
|
||||||
|
use crate::common::fragments::*;
|
||||||
|
|
||||||
fn main() {
|
fn main() {
|
||||||
rfc3501_imap4rev1_base();
|
rfc3501_imap4rev1_base();
|
||||||
|
@ -11,6 +11,7 @@ fn main() {
|
||||||
rfc6851_imapext_move();
|
rfc6851_imapext_move();
|
||||||
rfc7888_imapext_literal();
|
rfc7888_imapext_literal();
|
||||||
rfc4551_imapext_condstore();
|
rfc4551_imapext_condstore();
|
||||||
|
rfc2177_imapext_idle();
|
||||||
println!("✅ SUCCESS 🌟🚀🥳🙏🥹");
|
println!("✅ SUCCESS 🌟🚀🥳🙏🥹");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -21,18 +22,23 @@ fn rfc3501_imap4rev1_base() {
|
||||||
capability(imap_socket, Extension::None).context("check server capabilities")?;
|
capability(imap_socket, Extension::None).context("check server capabilities")?;
|
||||||
login(imap_socket, Account::Alice).context("login test")?;
|
login(imap_socket, Account::Alice).context("login test")?;
|
||||||
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
|
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
|
||||||
// UNSUBSCRIBE IS NOT IMPLEMENTED YET
|
let select_res =
|
||||||
//unsubscribe_mailbox(imap_socket).context("unsubscribe from archive")?;
|
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
||||||
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
|
||||||
assert!(select_res.contains("* 0 EXISTS"));
|
assert!(select_res.contains("* 0 EXISTS"));
|
||||||
|
|
||||||
check(imap_socket).context("check must run")?;
|
check(imap_socket).context("check must run")?;
|
||||||
status(imap_socket, Mailbox::Archive, StatusKind::UidNext).context("status of archive from inbox")?;
|
status(imap_socket, Mailbox::Archive, StatusKind::UidNext)
|
||||||
|
.context("status of archive from inbox")?;
|
||||||
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
|
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
|
||||||
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
|
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
|
||||||
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
|
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
|
||||||
|
|
||||||
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None)
|
let srv_msg = fetch(
|
||||||
|
imap_socket,
|
||||||
|
Selection::FirstId,
|
||||||
|
FetchKind::Rfc822,
|
||||||
|
FetchMod::None,
|
||||||
|
)
|
||||||
.context("fetch rfc822 message, should be our first message")?;
|
.context("fetch rfc822 message, should be our first message")?;
|
||||||
let orig_email = std::str::from_utf8(EMAIL1)?;
|
let orig_email = std::str::from_utf8(EMAIL1)?;
|
||||||
assert!(srv_msg.contains(orig_email));
|
assert!(srv_msg.contains(orig_email));
|
||||||
|
@ -42,7 +48,13 @@ fn rfc3501_imap4rev1_base() {
|
||||||
append_email(imap_socket, Email::Basic).context("insert email in INBOX")?;
|
append_email(imap_socket, Email::Basic).context("insert email in INBOX")?;
|
||||||
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
|
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
|
||||||
search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something");
|
search(imap_socket, SearchKind::Text("OoOoO")).expect("search should return something");
|
||||||
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None)
|
store(
|
||||||
|
imap_socket,
|
||||||
|
Selection::FirstId,
|
||||||
|
Flag::Deleted,
|
||||||
|
StoreAction::AddFlags,
|
||||||
|
StoreMod::None,
|
||||||
|
)
|
||||||
.context("should add delete flag to the email")?;
|
.context("should add delete flag to the email")?;
|
||||||
expunge(imap_socket).context("expunge emails")?;
|
expunge(imap_socket).context("expunge emails")?;
|
||||||
rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts)
|
rename_mailbox(imap_socket, Mailbox::Archive, Mailbox::Drafts)
|
||||||
|
@ -63,18 +75,31 @@ fn rfc3691_imapext_unselect() {
|
||||||
|
|
||||||
capability(imap_socket, Extension::Unselect).context("check server capabilities")?;
|
capability(imap_socket, Extension::Unselect).context("check server capabilities")?;
|
||||||
login(imap_socket, Account::Alice).context("login test")?;
|
login(imap_socket, Account::Alice).context("login test")?;
|
||||||
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
let select_res =
|
||||||
|
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
||||||
assert!(select_res.contains("* 0 EXISTS"));
|
assert!(select_res.contains("* 0 EXISTS"));
|
||||||
|
|
||||||
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
|
noop_exists(imap_socket, 1).context("noop loop must detect a new email")?;
|
||||||
store(imap_socket, Selection::FirstId, Flag::Deleted, StoreAction::AddFlags, StoreMod::None)
|
store(
|
||||||
|
imap_socket,
|
||||||
|
Selection::FirstId,
|
||||||
|
Flag::Deleted,
|
||||||
|
StoreAction::AddFlags,
|
||||||
|
StoreMod::None,
|
||||||
|
)
|
||||||
.context("add delete flags to the email")?;
|
.context("add delete flags to the email")?;
|
||||||
unselect(imap_socket)
|
unselect(imap_socket)
|
||||||
.context("unselect inbox while preserving email with the \\Delete flag")?;
|
.context("unselect inbox while preserving email with the \\Delete flag")?;
|
||||||
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?;
|
let select_res =
|
||||||
|
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox again")?;
|
||||||
assert!(select_res.contains("* 1 EXISTS"));
|
assert!(select_res.contains("* 1 EXISTS"));
|
||||||
|
|
||||||
let srv_msg = fetch(imap_socket, Selection::FirstId, FetchKind::Rfc822, FetchMod::None)
|
let srv_msg = fetch(
|
||||||
|
imap_socket,
|
||||||
|
Selection::FirstId,
|
||||||
|
FetchKind::Rfc822,
|
||||||
|
FetchMod::None,
|
||||||
|
)
|
||||||
.context("message is still present")?;
|
.context("message is still present")?;
|
||||||
let orig_email = std::str::from_utf8(EMAIL2)?;
|
let orig_email = std::str::from_utf8(EMAIL2)?;
|
||||||
assert!(srv_msg.contains(orig_email));
|
assert!(srv_msg.contains(orig_email));
|
||||||
|
@ -111,7 +136,8 @@ fn rfc6851_imapext_move() {
|
||||||
capability(imap_socket, Extension::Move).context("check server capabilities")?;
|
capability(imap_socket, Extension::Move).context("check server capabilities")?;
|
||||||
login(imap_socket, Account::Alice).context("login test")?;
|
login(imap_socket, Account::Alice).context("login test")?;
|
||||||
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
|
create_mailbox(imap_socket, Mailbox::Archive).context("created mailbox archive")?;
|
||||||
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
let select_res =
|
||||||
|
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
||||||
assert!(select_res.contains("* 0 EXISTS"));
|
assert!(select_res.contains("* 0 EXISTS"));
|
||||||
|
|
||||||
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
|
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
|
||||||
|
@ -123,7 +149,8 @@ fn rfc6851_imapext_move() {
|
||||||
|
|
||||||
unselect(imap_socket)
|
unselect(imap_socket)
|
||||||
.context("unselect inbox while preserving email with the \\Delete flag")?;
|
.context("unselect inbox while preserving email with the \\Delete flag")?;
|
||||||
let select_res = select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?;
|
let select_res =
|
||||||
|
select(imap_socket, Mailbox::Archive, SelectMod::None).context("select archive")?;
|
||||||
assert!(select_res.contains("* 1 EXISTS"));
|
assert!(select_res.contains("* 1 EXISTS"));
|
||||||
|
|
||||||
let srv_msg = fetch(
|
let srv_msg = fetch(
|
||||||
|
@ -131,7 +158,8 @@ fn rfc6851_imapext_move() {
|
||||||
Selection::FirstId,
|
Selection::FirstId,
|
||||||
FetchKind::Rfc822,
|
FetchKind::Rfc822,
|
||||||
FetchMod::None,
|
FetchMod::None,
|
||||||
).context("check mail exists")?;
|
)
|
||||||
|
.context("check mail exists")?;
|
||||||
let orig_email = std::str::from_utf8(EMAIL2)?;
|
let orig_email = std::str::from_utf8(EMAIL2)?;
|
||||||
assert!(srv_msg.contains(orig_email));
|
assert!(srv_msg.contains(orig_email));
|
||||||
|
|
||||||
|
@ -166,7 +194,8 @@ fn rfc4551_imapext_condstore() {
|
||||||
login(imap_socket, Account::Alice).context("login test")?;
|
login(imap_socket, Account::Alice).context("login test")?;
|
||||||
|
|
||||||
// RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE
|
// RFC 3.1.8. CONDSTORE Parameter to SELECT and EXAMINE
|
||||||
let select_res = select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?;
|
let select_res =
|
||||||
|
select(imap_socket, Mailbox::Inbox, SelectMod::Condstore).context("select inbox")?;
|
||||||
// RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE
|
// RFC 3.1.2 New OK Untagged Responses for SELECT and EXAMINE
|
||||||
assert!(select_res.contains("[HIGHESTMODSEQ 1]"));
|
assert!(select_res.contains("[HIGHESTMODSEQ 1]"));
|
||||||
|
|
||||||
|
@ -175,14 +204,25 @@ fn rfc4551_imapext_condstore() {
|
||||||
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
|
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
|
||||||
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
|
lmtp_deliver_email(lmtp_socket, Email::Multipart).context("mail delivered successfully")?;
|
||||||
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
|
noop_exists(imap_socket, 2).context("noop loop must detect a new email")?;
|
||||||
let store_res = store(imap_socket, Selection::All, Flag::Important, StoreAction::AddFlags, StoreMod::UnchangedSince(1))?;
|
let store_res = store(
|
||||||
|
imap_socket,
|
||||||
|
Selection::All,
|
||||||
|
Flag::Important,
|
||||||
|
StoreAction::AddFlags,
|
||||||
|
StoreMod::UnchangedSince(1),
|
||||||
|
)?;
|
||||||
assert!(store_res.contains("[MODIFIED 2]"));
|
assert!(store_res.contains("[MODIFIED 2]"));
|
||||||
assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))"));
|
assert!(store_res.contains("* 1 FETCH (FLAGS (\\Important) MODSEQ (3))"));
|
||||||
assert!(!store_res.contains("* 2 FETCH"));
|
assert!(!store_res.contains("* 2 FETCH"));
|
||||||
assert_eq!(store_res.lines().count(), 2);
|
assert_eq!(store_res.lines().count(), 2);
|
||||||
|
|
||||||
// RFC 3.1.4. FETCH and UID FETCH Commands
|
// RFC 3.1.4. FETCH and UID FETCH Commands
|
||||||
let fetch_res = fetch(imap_socket, Selection::All, FetchKind::Rfc822Size, FetchMod::ChangedSince(2))?;
|
let fetch_res = fetch(
|
||||||
|
imap_socket,
|
||||||
|
Selection::All,
|
||||||
|
FetchKind::Rfc822Size,
|
||||||
|
FetchMod::ChangedSince(2),
|
||||||
|
)?;
|
||||||
assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))"));
|
assert!(fetch_res.contains("* 1 FETCH (RFC822.SIZE 84 MODSEQ (3))"));
|
||||||
assert!(!fetch_res.contains("* 2 FETCH"));
|
assert!(!fetch_res.contains("* 2 FETCH"));
|
||||||
assert_eq!(store_res.lines().count(), 2);
|
assert_eq!(store_res.lines().count(), 2);
|
||||||
|
@ -200,3 +240,25 @@ fn rfc4551_imapext_condstore() {
|
||||||
})
|
})
|
||||||
.expect("test fully run");
|
.expect("test fully run");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
fn rfc2177_imapext_idle() {
|
||||||
|
println!("🧪 rfc2177_imapext_idle");
|
||||||
|
common::aerogramme_provider_daemon_dev(|imap_socket, lmtp_socket| {
|
||||||
|
// Test setup
|
||||||
|
connect(imap_socket).context("server says hello")?;
|
||||||
|
capability(imap_socket, Extension::Idle).context("check server capabilities")?;
|
||||||
|
login(imap_socket, Account::Alice).context("login test")?;
|
||||||
|
select(imap_socket, Mailbox::Inbox, SelectMod::None).context("select inbox")?;
|
||||||
|
|
||||||
|
// Check that new messages from LMTP are correctly detected during idling
|
||||||
|
start_idle(imap_socket).context("can't start idling")?;
|
||||||
|
lmtp_handshake(lmtp_socket).context("handshake lmtp done")?;
|
||||||
|
lmtp_deliver_email(lmtp_socket, Email::Basic).context("mail delivered successfully")?;
|
||||||
|
let srv_msg = stop_idle(imap_socket).context("stop idling")?;
|
||||||
|
assert!(srv_msg.contains("* 1 EXISTS"));
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
})
|
||||||
|
.expect("test fully run");
|
||||||
|
}
|
||||||
|
|
|
@ -36,6 +36,7 @@ pub enum Extension {
|
||||||
Move,
|
Move,
|
||||||
Condstore,
|
Condstore,
|
||||||
LiteralPlus,
|
LiteralPlus,
|
||||||
|
Idle,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub enum Enable {
|
pub enum Enable {
|
||||||
|
@ -114,6 +115,7 @@ pub fn capability(imap: &mut TcpStream, ext: Extension) -> Result<()> {
|
||||||
Extension::Move => Some("MOVE"),
|
Extension::Move => Some("MOVE"),
|
||||||
Extension::Condstore => Some("CONDSTORE"),
|
Extension::Condstore => Some("CONDSTORE"),
|
||||||
Extension::LiteralPlus => Some("LITERAL+"),
|
Extension::LiteralPlus => Some("LITERAL+"),
|
||||||
|
Extension::Idle => Some("IDLE"),
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut buffer: [u8; 6000] = [0; 6000];
|
let mut buffer: [u8; 6000] = [0; 6000];
|
||||||
|
@ -286,7 +288,12 @@ pub fn noop_exists(imap: &mut TcpStream, must_exists: u32) -> Result<()> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn fetch(imap: &mut TcpStream, selection: Selection, kind: FetchKind, modifier: FetchMod) -> Result<String> {
|
pub fn fetch(
|
||||||
|
imap: &mut TcpStream,
|
||||||
|
selection: Selection,
|
||||||
|
kind: FetchKind,
|
||||||
|
modifier: FetchMod,
|
||||||
|
) -> Result<String> {
|
||||||
let mut buffer: [u8; 65535] = [0; 65535];
|
let mut buffer: [u8; 65535] = [0; 65535];
|
||||||
|
|
||||||
let sel_str = match selection {
|
let sel_str = match selection {
|
||||||
|
@ -367,7 +374,7 @@ pub fn store(
|
||||||
sel: Selection,
|
sel: Selection,
|
||||||
flag: Flag,
|
flag: Flag,
|
||||||
action: StoreAction,
|
action: StoreAction,
|
||||||
modifier: StoreMod
|
modifier: StoreMod,
|
||||||
) -> Result<String> {
|
) -> Result<String> {
|
||||||
let mut buffer: [u8; 6000] = [0; 6000];
|
let mut buffer: [u8; 6000] = [0; 6000];
|
||||||
|
|
||||||
|
@ -491,6 +498,22 @@ pub fn enable(imap: &mut TcpStream, ask: Enable, done: Option<Enable>) -> Result
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn start_idle(imap: &mut TcpStream) -> Result<()> {
|
||||||
|
let mut buffer: [u8; 1500] = [0; 1500];
|
||||||
|
imap.write(&b"98 IDLE\r\n"[..])?;
|
||||||
|
let read = read_lines(imap, &mut buffer, None)?;
|
||||||
|
assert_eq!(read[0], b'+');
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn stop_idle(imap: &mut TcpStream) -> Result<String> {
|
||||||
|
let mut buffer: [u8; 16536] = [0; 16536];
|
||||||
|
imap.write(&b"DONE\r\n"[..])?;
|
||||||
|
let read = read_lines(imap, &mut buffer, Some(&b"98 OK"[..]))?;
|
||||||
|
let srv_msg = std::str::from_utf8(read)?;
|
||||||
|
Ok(srv_msg.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
pub fn logout(imap: &mut TcpStream) -> Result<()> {
|
pub fn logout(imap: &mut TcpStream) -> Result<()> {
|
||||||
imap.write(&b"99 logout\r\n"[..])?;
|
imap.write(&b"99 logout\r\n"[..])?;
|
||||||
let mut buffer: [u8; 1500] = [0; 1500];
|
let mut buffer: [u8; 1500] = [0; 1500];
|
||||||
|
|
Loading…
Reference in a new issue