Implement beginning of a FETCH command

This commit is contained in:
Alex 2022-06-29 19:24:21 +02:00
parent a8d0e4a994
commit 509e7e4bed
Signed by: lx
GPG key ID: 0E496D15096376BE
4 changed files with 308 additions and 33 deletions

View file

@ -42,17 +42,22 @@ pub async fn dispatch<'a>(ctx: SelectedContext<'a>) -> Result<(Response, flow::T
impl<'a> SelectedContext<'a> {
pub async fn fetch(
self,
_sequence_set: &SequenceSet,
_attributes: &MacroOrFetchAttributes,
_uid: &bool,
sequence_set: &SequenceSet,
attributes: &MacroOrFetchAttributes,
uid: &bool,
) -> Result<(Response, flow::Transition)> {
Ok((Response::bad("Not implemented")?, flow::Transition::None))
let resp = self.mailbox.fetch(sequence_set, attributes, uid).await?;
Ok((
Response::ok("FETCH completed")?.with_body(resp),
flow::Transition::None,
))
}
pub async fn noop(self) -> Result<(Response, flow::Transition)> {
let updates = self.mailbox.update().await?;
Ok((
Response::ok("Noop completed.")?.with_body(updates),
Response::ok("NOOP completed.")?.with_body(updates),
flow::Transition::None,
))
}

View file

@ -1,11 +1,14 @@
use std::num::NonZeroU32;
use std::sync::Arc;
use anyhow::{Error, Result};
use anyhow::{anyhow, bail, Error, Result};
use boitalettres::proto::res::body::Data as Body;
use imap_codec::types::core::Atom;
use futures::stream::{FuturesOrdered, StreamExt};
use imap_codec::types::core::{Atom, IString, NString};
use imap_codec::types::fetch_attributes::{FetchAttribute, MacroOrFetchAttributes};
use imap_codec::types::flag::Flag;
use imap_codec::types::response::{Code, Data, MessageAttribute, Status};
use imap_codec::types::sequence::{self, SequenceSet};
use crate::mail::mailbox::Mailbox;
use crate::mail::uidindex::UidIndex;
@ -132,6 +135,121 @@ impl MailboxView {
Ok(data)
}
/// Looks up state changes in the mailbox and produces a set of IMAP
/// responses describing the new state.
pub async fn fetch(
&self,
sequence_set: &SequenceSet,
attributes: &MacroOrFetchAttributes,
uid: &bool,
) -> Result<Vec<Body>> {
if *uid {
bail!("UID FETCH not implemented");
}
let mail_vec = self
.known_state
.idx_by_uid
.iter()
.map(|(uid, uuid)| (*uid, *uuid))
.collect::<Vec<_>>();
let mut mails = vec![];
let iter_strat = sequence::Strategy::Naive {
largest: NonZeroU32::try_from((self.known_state.idx_by_uid.len() + 1) as u32).unwrap(),
};
for i in sequence_set.iter(iter_strat) {
if let Some(mail) = mail_vec.get(i.get() as usize - 1) {
mails.push((i, *mail));
} else {
bail!("No such mail: {}", i);
}
}
let mails_uuid = mails
.iter()
.map(|(_i, (_uid, uuid))| *uuid)
.collect::<Vec<_>>();
let mails_meta = self.mailbox.fetch_meta(&mails_uuid).await?;
let fetch_attrs = match attributes {
MacroOrFetchAttributes::Macro(m) => m.expand(),
MacroOrFetchAttributes::FetchAttributes(a) => a.clone(),
};
let need_body = fetch_attrs.iter().any(|x| {
matches!(
x,
FetchAttribute::Body
| FetchAttribute::BodyExt { .. }
| FetchAttribute::Rfc822
| FetchAttribute::Rfc822Text
| FetchAttribute::BodyStructure
)
});
let mails = if need_body {
let mut iter = mails
.into_iter()
.zip(mails_meta.into_iter())
.map(|((i, (uid, uuid)), meta)| async move {
let body = self.mailbox.fetch_full(uuid, &meta.message_key).await?;
Ok::<_, anyhow::Error>((i, uid, uuid, meta, Some(body)))
})
.collect::<FuturesOrdered<_>>();
let mut mails = vec![];
while let Some(m) = iter.next().await {
mails.push(m?);
}
mails
} else {
mails
.into_iter()
.zip(mails_meta.into_iter())
.map(|((i, (uid, uuid)), meta)| (i, uid, uuid, meta, None))
.collect::<Vec<_>>()
};
let mut ret = vec![];
for (i, uid, uuid, meta, body) in mails {
let mut attributes = vec![MessageAttribute::Uid(uid)];
let (uid2, flags) = self
.known_state
.table
.get(&uuid)
.ok_or_else(|| anyhow!("Mail not in uidindex table: {}", uuid))?;
for attr in fetch_attrs.iter() {
match attr {
FetchAttribute::Uid => (),
FetchAttribute::Flags => {
attributes.push(MessageAttribute::Flags(
flags.iter().filter_map(|f| string_to_flag(f)).collect(),
));
}
FetchAttribute::Rfc822Size => {
attributes.push(MessageAttribute::Rfc822Size(meta.rfc822_size as u32))
}
FetchAttribute::Rfc822Header => {
attributes.push(MessageAttribute::Rfc822Header(NString(Some(
IString::Literal(meta.headers.clone().try_into().unwrap()),
))))
}
// TODO
_ => (),
}
}
ret.push(Body::Data(Data::Fetch {
seq_or_uid: i,
attributes,
}));
}
Ok(ret)
}
// ----
/// Produce an OK [UIDVALIDITY _] message corresponding to `known_state`

View file

@ -1,14 +1,20 @@
use anyhow::Result;
use anyhow::{anyhow, bail, Result};
use k2v_client::K2vClient;
use rusoto_s3::S3Client;
use k2v_client::{BatchReadOp, Filter, K2vValue};
use rusoto_s3::{
DeleteObjectRequest, GetObjectRequest, ListObjectsV2Request, PutObjectRequest, S3Client, S3,
};
use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt;
use tokio::sync::RwLock;
use crate::bayou::Bayou;
use crate::cryptoblob::Key;
use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use crate::login::Credentials;
use crate::mail::uidindex::*;
use crate::mail::unique_ident::*;
use crate::mail::IMF;
use crate::time::now_msec;
pub struct Mailbox {
id: UniqueIdent,
@ -48,8 +54,8 @@ impl Mailbox {
}
/// Insert an email in the mailbox
pub async fn append<'a>(&self, _msg: IMF<'a>) -> Result<()> {
unimplemented!()
pub async fn append<'a>(&self, msg: IMF<'a>) -> Result<()> {
self.mbox.write().await.append(msg, None).await
}
/// Copy an email from an other Mailbox to this mailbox
@ -58,21 +64,15 @@ impl Mailbox {
unimplemented!()
}
/// Delete all emails with the \Delete flag in the mailbox
/// Can be called by CLOSE and EXPUNGE
/// @FIXME do we want to implement this feature or a simpler "delete" command
/// The controller could then "fetch \Delete" and call delete on each email?
pub async fn expunge(&self) -> Result<()> {
unimplemented!()
/// Fetch the metadata (headers + some more info) of the specified
/// mail IDs
pub async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
self.mbox.read().await.fetch_meta(ids).await
}
/// Update flags of a range of emails
pub async fn store(&self) -> Result<()> {
unimplemented!()
}
pub async fn fetch(&self) -> Result<()> {
unimplemented!()
/// Fetch an entire e-mail
pub async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
self.mbox.read().await.fetch_full(id, message_key).await
}
/// Test procedure TODO WILL REMOVE THIS
@ -98,16 +98,141 @@ struct MailboxInternal {
}
impl MailboxInternal {
pub async fn test(&mut self) -> Result<()> {
async fn fetch_meta(&self, ids: &[UniqueIdent]) -> Result<Vec<MailMeta>> {
let ids = ids.iter().map(|x| x.to_string()).collect::<Vec<_>>();
let ops = ids
.iter()
.map(|id| BatchReadOp {
partition_key: &self.mail_path,
filter: Filter {
start: Some(id),
end: None,
prefix: None,
limit: None,
reverse: false,
},
single_item: true,
conflicts_only: false,
tombstones: false,
})
.collect::<Vec<_>>();
let res_vec = self.k2v.read_batch(&ops).await?;
let mut meta_vec = vec![];
for res in res_vec {
if res.items.len() != 1 {
bail!("Expected 1 item, got {}", res.items.len());
}
let (_, cv) = res.items.iter().next().unwrap();
if cv.value.len() != 1 {
bail!("Expected 1 value, got {}", cv.value.len());
}
match &cv.value[0] {
K2vValue::Tombstone => bail!("Expected value, got tombstone"),
K2vValue::Value(v) => {
let meta = open_deserialize::<MailMeta>(v, &self.encryption_key)?;
meta_vec.push(meta);
}
}
}
Ok(meta_vec)
}
async fn fetch_full(&self, id: UniqueIdent, message_key: &Key) -> Result<Vec<u8>> {
let mut gor = GetObjectRequest::default();
gor.bucket = self.bucket.clone();
gor.key = format!("{}/{}", self.mail_path, id);
let obj_res = self.s3.get_object(gor).await?;
let obj_body = obj_res.body.ok_or(anyhow!("Missing object body"))?;
let mut buf = Vec::with_capacity(obj_res.content_length.unwrap_or(128) as usize);
obj_body.into_async_read().read_to_end(&mut buf).await?;
Ok(cryptoblob::open(&buf, &message_key)?)
}
async fn append(&mut self, mail: IMF<'_>, ident: Option<UniqueIdent>) -> Result<()> {
let ident = ident.unwrap_or_else(|| gen_ident());
let message_key = gen_key();
futures::try_join!(
async {
// Encrypt and save mail body
let message_blob = cryptoblob::seal(mail.raw, &message_key)?;
let mut por = PutObjectRequest::default();
por.bucket = self.bucket.clone();
por.key = format!("{}/{}", self.mail_path, ident);
por.body = Some(message_blob.into());
self.s3.put_object(por).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Save mail meta
let meta = MailMeta {
internaldate: now_msec(),
headers: mail.raw[..mail.parsed.offset_body].to_vec(),
message_key: message_key.clone(),
rfc822_size: mail.raw.len(),
};
let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?;
self.k2v
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
.await?;
Ok::<_, anyhow::Error>(())
}
)?;
// Add mail to Bayou mail index
let add_mail_op = self
.uid_index
.state()
.op_mail_add(ident, vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
Ok(())
}
async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
let del_mail_op = self.uid_index.state().op_mail_del(ident);
self.uid_index.push(del_mail_op).await?;
futures::try_join!(
async {
// Delete mail body from S3
let mut dor = DeleteObjectRequest::default();
dor.bucket = self.bucket.clone();
dor.key = format!("{}/{}", self.mail_path, ident);
self.s3.delete_object(dor).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Delete mail meta from K2V
let sk = ident.to_string();
let v = self.k2v.read_item(&self.mail_path, &sk).await?;
self.k2v
.delete_item(&self.mail_path, &sk, v.causality)
.await?;
Ok::<_, anyhow::Error>(())
}
)?;
Ok(())
}
// ----
async fn test(&mut self) -> Result<()> {
self.uid_index.sync().await?;
dump(&self.uid_index);
let add_mail_op = self
.uid_index
.state()
.op_mail_add(gen_ident(), vec!["\\Unseen".into()]);
self.uid_index.push(add_mail_op).await?;
let mail = br#"From: Garage team <garagehq@deuxfleurs.fr>
Subject: Welcome to Aerogramme!!
This is just a test email, feel free to ignore."#;
let mail = IMF::try_from(&mail[..]).unwrap();
self.append(mail, None).await?;
dump(&self.uid_index);
@ -121,8 +246,8 @@ impl MailboxInternal {
.skip(3 + i)
.next()
.unwrap();
let del_mail_op = self.uid_index.state().op_mail_del(*ident);
self.uid_index.push(del_mail_op).await?;
self.delete(*ident).await?;
dump(&self.uid_index);
}
@ -148,3 +273,19 @@ fn dump(uid_index: &Bayou<UidIndex>) {
}
println!("");
}
// ----
/// The metadata of a message that is stored in K2V
/// at pk = mail/<mailbox uuid>, sk = <message uuid>
#[derive(Serialize, Deserialize)]
pub struct MailMeta {
/// INTERNALDATE field (milliseconds since epoch)
pub internaldate: u64,
/// Headers of the message
pub headers: Vec<u8>,
/// Secret key for decrypting entire message
pub message_key: Key,
/// RFC822 size
pub rfc822_size: usize,
}

View file

@ -1,3 +1,5 @@
use std::convert::TryFrom;
pub mod mailbox;
pub mod uidindex;
pub mod unique_ident;
@ -9,3 +11,12 @@ pub struct IMF<'a> {
raw: &'a [u8],
parsed: mail_parser::Message<'a>,
}
impl<'a> TryFrom<&'a [u8]> for IMF<'a> {
type Error = ();
fn try_from(body: &'a [u8]) -> Result<IMF<'a>, ()> {
let parsed = mail_parser::Message::parse(body).ok_or(())?;
Ok(Self { raw: body, parsed })
}
}