Implement move and copy mail

This commit is contained in:
Alex 2022-06-30 12:07:01 +02:00
parent 497ad4b5ea
commit 0048b43f27
Signed by: lx
GPG key ID: 0E496D15096376BE
2 changed files with 90 additions and 7 deletions

View file

@ -1,13 +1,13 @@
use anyhow::{anyhow, bail, Result}; use anyhow::{anyhow, bail, Result};
use k2v_client::K2vClient; use k2v_client::K2vClient;
use k2v_client::{BatchReadOp, Filter, K2vValue}; use k2v_client::{BatchReadOp, Filter, K2vValue};
use rusoto_s3::{DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3}; use rusoto_s3::{CopyObjectRequest, DeleteObjectRequest, GetObjectRequest, PutObjectRequest, S3Client, S3};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
use tokio::sync::RwLock; use tokio::sync::RwLock;
use crate::bayou::Bayou; use crate::bayou::Bayou;
use crate::cryptoblob::{self, gen_key, open_deserialize, Key}; use crate::cryptoblob::{self, gen_key, open_deserialize, seal_serialize, Key};
use crate::login::Credentials; use crate::login::Credentials;
use crate::mail::uidindex::*; use crate::mail::uidindex::*;
use crate::mail::unique_ident::*; use crate::mail::unique_ident::*;
@ -88,14 +88,38 @@ impl Mailbox {
/// Copy an email from an other Mailbox to this mailbox /// Copy an email from an other Mailbox to this mailbox
/// (use this when possible, as it allows for a certain number of storage optimizations) /// (use this when possible, as it allows for a certain number of storage optimizations)
pub async fn copy_from(&self, _from: &Mailbox, _uuid: UniqueIdent) -> Result<()> { pub async fn copy_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
unimplemented!() if self.id == from.id {
bail!("Cannot copy into same mailbox");
}
let (mut selflock, mut fromlock);
if self.id < from.id {
selflock = self.mbox.write().await;
fromlock = from.mbox.write().await;
} else {
fromlock = from.mbox.write().await;
selflock = self.mbox.write().await;
};
selflock.copy_from(&fromlock, uuid).await
} }
/// Move an email from an other Mailbox to this mailbox /// Move an email from an other Mailbox to this mailbox
/// (use this when possible, as it allows for a certain number of storage optimizations) /// (use this when possible, as it allows for a certain number of storage optimizations)
pub async fn move_from(&self, _from: &Mailbox, _uuid: UniqueIdent) -> Result<()> { pub async fn move_from(&self, from: &Mailbox, uuid: UniqueIdent) -> Result<()> {
unimplemented!() if self.id == from.id {
bail!("Cannot copy move same mailbox");
}
let (mut selflock, mut fromlock);
if self.id < from.id {
selflock = self.mbox.write().await;
fromlock = from.mbox.write().await;
} else {
fromlock = from.mbox.write().await;
selflock = self.mbox.write().await;
};
selflock.move_from(&mut fromlock, uuid).await
} }
// ---- // ----
@ -220,7 +244,7 @@ impl MailboxInternal {
message_key: message_key.clone(), message_key: message_key.clone(),
rfc822_size: mail.raw.len(), rfc822_size: mail.raw.len(),
}; };
let meta_blob = cryptoblob::seal_serialize(&meta, &self.encryption_key)?; let meta_blob = seal_serialize(&meta, &self.encryption_key)?;
self.k2v self.k2v
.insert_item(&self.mail_path, &ident.to_string(), meta_blob, None) .insert_item(&self.mail_path, &ident.to_string(), meta_blob, None)
.await?; .await?;
@ -239,6 +263,10 @@ impl MailboxInternal {
} }
async fn delete(&mut self, ident: UniqueIdent) -> Result<()> { async fn delete(&mut self, ident: UniqueIdent) -> Result<()> {
if !self.uid_index.state().table.contains_key(&ident) {
bail!("Cannot delete mail that doesn't exit");
}
let del_mail_op = self.uid_index.state().op_mail_del(ident); let del_mail_op = self.uid_index.state().op_mail_del(ident);
self.uid_index.push(del_mail_op).await?; self.uid_index.push(del_mail_op).await?;
@ -264,6 +292,52 @@ impl MailboxInternal {
Ok(()) Ok(())
} }
async fn copy_from(&mut self, from: &MailboxInternal, source_id: UniqueIdent) -> Result<()> {
let new_id = gen_ident();
self.copy_internal(from, source_id, new_id).await
}
async fn move_from(&mut self, from: &mut MailboxInternal, id: UniqueIdent) -> Result<()> {
self.copy_internal(from, id, id).await?;
from.delete(id).await?;
Ok(())
}
async fn copy_internal(&mut self, from: &MailboxInternal, source_id: UniqueIdent, new_id: UniqueIdent) -> Result<()> {
let flags = from.uid_index.state()
.table.get(&source_id).ok_or(anyhow!("Source mail not found"))?.1.clone();
futures::try_join!(
async {
// Copy mail body from S3
let mut cor = CopyObjectRequest::default();
cor.bucket = self.bucket.clone();
cor.key = format!("{}/{}", self.mail_path, new_id);
cor.copy_source = format!("{}/{}/{}", from.bucket, from.mail_path, source_id);
self.s3.copy_object(cor).await?;
Ok::<_, anyhow::Error>(())
},
async {
// Copy mail meta in K2V
let meta = &from.fetch_meta(&[source_id]).await?[0];
let meta_blob = seal_serialize(meta, &self.encryption_key)?;
self.k2v
.insert_item(&self.mail_path, &new_id.to_string(), meta_blob, None)
.await?;
Ok::<_, anyhow::Error>(())
},
)?;
// Add mail to Bayou mail index
let add_mail_op = self
.uid_index
.state()
.op_mail_add(new_id, flags);
self.uid_index.push(add_mail_op).await?;
Ok(())
}
// ---- // ----
async fn test(&mut self) -> Result<()> { async fn test(&mut self) -> Result<()> {

View file

@ -36,6 +36,7 @@ pub enum UidIndexOp {
MailDel(UniqueIdent), MailDel(UniqueIdent),
FlagAdd(UniqueIdent, Vec<Flag>), FlagAdd(UniqueIdent, Vec<Flag>),
FlagDel(UniqueIdent, Vec<Flag>), FlagDel(UniqueIdent, Vec<Flag>),
BumpUidvalidity(u32),
} }
impl UidIndex { impl UidIndex {
@ -59,6 +60,11 @@ impl UidIndex {
UidIndexOp::FlagDel(ident, flags) UidIndexOp::FlagDel(ident, flags)
} }
#[must_use]
pub fn op_bump_uidvalidity(&self, count: u32) -> UidIndexOp {
UidIndexOp::BumpUidvalidity(count)
}
// INTERNAL functions to keep state consistent // INTERNAL functions to keep state consistent
fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, flags: &Vec<Flag>) { fn reg_email(&mut self, ident: UniqueIdent, uid: ImapUid, flags: &Vec<Flag>) {
@ -156,6 +162,9 @@ impl BayouState for UidIndex {
new.idx_by_flag.remove(*uid, rm_flags); new.idx_by_flag.remove(*uid, rm_flags);
} }
} }
UidIndexOp::BumpUidvalidity(count) => {
new.uidvalidity = ImapUidvalidity::new(new.uidvalidity.get() + *count).unwrap_or(ImapUidvalidity::new(u32::MAX).unwrap());
}
} }
new new
} }