Futures must be ordered

This commit is contained in:
Quentin 2024-01-08 21:18:45 +01:00
parent fe28120676
commit a90f425d32
Signed by: quentin
GPG key ID: E9602264D639FF68
6 changed files with 34 additions and 58 deletions

View file

@ -104,16 +104,21 @@ impl<'a> Index<'a> {
return Ok(vec![]); return Ok(vec![]);
} }
let iter_strat = sequence::Strategy::Naive { let iter_strat = sequence::Strategy::Naive {
largest: self.last().expect("The mailbox is not empty").uid, largest: NonZeroU32::try_from(self.imap_index.len() as u32)?,
}; };
sequence_set let mut acc = sequence_set
.iter(iter_strat) .iter(iter_strat)
.map(|wanted_id| { .map(|wanted_id| {
self.imap_index self.imap_index
.get((wanted_id.get() as usize) - 1) .get((wanted_id.get() as usize) - 1)
.ok_or(anyhow!("Mail not found")) .ok_or(anyhow!("Mail not found"))
}) })
.collect::<Result<Vec<_>>>() .collect::<Result<Vec<_>>>()?;
// Sort the result to be consistent with UID
acc.sort_by(|a, b| a.i.cmp(&b.i));
Ok(acc)
} }
pub fn fetch( pub fn fetch(

View file

@ -27,13 +27,13 @@ use crate::imap::response::Body;
pub struct MailView<'a> { pub struct MailView<'a> {
pub in_idx: &'a MailIndex<'a>, pub in_idx: &'a MailIndex<'a>,
pub query_result: &'a QueryResult<'a>, pub query_result: &'a QueryResult,
pub content: FetchedMail<'a>, pub content: FetchedMail<'a>,
} }
impl<'a> MailView<'a> { impl<'a> MailView<'a> {
pub fn new( pub fn new(
query_result: &'a QueryResult<'a>, query_result: &'a QueryResult,
in_idx: &'a MailIndex<'a>, in_idx: &'a MailIndex<'a>,
) -> Result<MailView<'a>> { ) -> Result<MailView<'a>> {
Ok(Self { Ok(Self {

View file

@ -1,7 +1,7 @@
use std::num::NonZeroU32; use std::num::NonZeroU32;
use std::sync::Arc; use std::sync::Arc;
use anyhow::{anyhow, Context, Error, Result}; use anyhow::{anyhow, Error, Result};
use futures::stream::{FuturesOrdered, StreamExt}; use futures::stream::{FuturesOrdered, StreamExt};
@ -259,6 +259,7 @@ impl MailboxView {
true => QueryScope::Full, true => QueryScope::Full,
_ => QueryScope::Partial, _ => QueryScope::Partial,
}; };
tracing::debug!("Query scope {:?}", query_scope);
let idx = self.index()?; let idx = self.index()?;
let mail_idx_list = idx.fetch(sequence_set, *is_uid_fetch)?; let mail_idx_list = idx.fetch(sequence_set, *is_uid_fetch)?;
@ -544,7 +545,6 @@ mod tests {
let rfc822 = b"Subject: hello\r\nFrom: a@a.a\r\nTo: b@b.b\r\nDate: Thu, 12 Oct 2023 08:45:28 +0000\r\n\r\nhello world"; let rfc822 = b"Subject: hello\r\nFrom: a@a.a\r\nTo: b@b.b\r\nDate: Thu, 12 Oct 2023 08:45:28 +0000\r\n\r\nhello world";
let qr = QueryResult::FullResult { let qr = QueryResult::FullResult {
uuid: mail_in_idx.uuid.clone(), uuid: mail_in_idx.uuid.clone(),
index: &index_entry,
metadata: meta, metadata: meta,
content: rfc822.to_vec(), content: rfc822.to_vec(),
}; };
@ -619,6 +619,7 @@ mod tests {
seq: NonZeroU32::new(1).unwrap(), seq: NonZeroU32::new(1).unwrap(),
items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure( items: NonEmptyVec::from(MessageDataItem::Body(mime_view::bodystructure(
&message.child, &message.child,
false,
)?)), )?)),
}); });
let test_bytes = ResponseCodec::new().encode(&test_repr).dump(); let test_bytes = ResponseCodec::new().encode(&test_repr).dump();

View file

@ -134,7 +134,7 @@ impl<'a> Criteria<'a> {
pub fn filter_on_query<'b>( pub fn filter_on_query<'b>(
&self, &self,
midx_list: &[&'b MailIndex<'b>], midx_list: &[&'b MailIndex<'b>],
query_result: &'b Vec<QueryResult<'b>>, query_result: &'b Vec<QueryResult>,
) -> Result<Vec<&'b MailIndex<'b>>> { ) -> Result<Vec<&'b MailIndex<'b>>> {
Ok(midx_list Ok(midx_list
.iter() .iter()

View file

@ -486,7 +486,7 @@ fn dump(uid_index: &Bayou<UidIndex>) {
/// The metadata of a message that is stored in K2V /// The metadata of a message that is stored in K2V
/// at pk = mail/<mailbox uuid>, sk = <message uuid> /// at pk = mail/<mailbox uuid>, sk = <message uuid>
#[derive(Serialize, Deserialize)] #[derive(Debug, Serialize, Deserialize)]
pub struct MailMeta { pub struct MailMeta {
/// INTERNALDATE field (milliseconds since epoch) /// INTERNALDATE field (milliseconds since epoch)
pub internaldate: u64, pub internaldate: u64,

View file

@ -1,9 +1,8 @@
use super::mailbox::MailMeta; use super::mailbox::MailMeta;
use super::snapshot::FrozenMailbox; use super::snapshot::FrozenMailbox;
use super::uidindex::IndexEntry;
use super::unique_ident::UniqueIdent; use super::unique_ident::UniqueIdent;
use anyhow::{anyhow, Result}; use anyhow::Result;
use futures::stream::{FuturesUnordered, StreamExt}; use futures::stream::{FuturesOrdered, StreamExt};
/// Query is in charge of fetching efficiently /// Query is in charge of fetching efficiently
/// requested data for a list of emails /// requested data for a list of emails
@ -13,7 +12,7 @@ pub struct Query<'a, 'b> {
pub scope: QueryScope, pub scope: QueryScope,
} }
#[allow(dead_code)] #[derive(Debug)]
pub enum QueryScope { pub enum QueryScope {
Index, Index,
Partial, Partial,
@ -30,41 +29,26 @@ impl QueryScope {
} }
impl<'a, 'b> Query<'a, 'b> { impl<'a, 'b> Query<'a, 'b> {
pub async fn fetch(&self) -> Result<Vec<QueryResult<'a>>> { pub async fn fetch(&self) -> Result<Vec<QueryResult>> {
match self.scope { match self.scope {
QueryScope::Index => self.index(), QueryScope::Index => Ok(self.emails.iter().map(|&uuid| QueryResult::IndexResult { uuid }).collect()),
QueryScope::Partial => self.partial().await, QueryScope::Partial =>self.partial().await,
QueryScope::Full => self.full().await, QueryScope::Full => self.full().await,
} }
} }
// --- functions below are private *for reasons* // --- functions below are private *for reasons*
fn index(&self) -> Result<Vec<QueryResult<'a>>> { async fn partial(&self) -> Result<Vec<QueryResult>> {
self.emails
.iter()
.map(|uuid| {
self.frozen
.snapshot
.table
.get(uuid)
.map(|index| QueryResult::IndexResult { uuid: *uuid, index })
.ok_or(anyhow!("missing email in index"))
})
.collect::<Result<Vec<_>, _>>()
}
async fn partial(&self) -> Result<Vec<QueryResult<'a>>> {
let meta = self.frozen.mailbox.fetch_meta(self.emails).await?; let meta = self.frozen.mailbox.fetch_meta(self.emails).await?;
let result = meta let result = meta
.into_iter() .into_iter()
.zip(self.index()?) .zip(self.emails.iter())
.map(|(metadata, index)| { .map(|(metadata, &uuid)| {
index QueryResult::PartialResult { uuid, metadata }
.into_partial(metadata)
.expect("index to be IndexResult")
}) })
.collect::<Vec<_>>(); .collect::<Vec<_>>();
Ok(result) Ok(result)
} }
@ -72,7 +56,7 @@ impl<'a, 'b> Query<'a, 'b> {
/// AND GENERATE SO MUCH NETWORK TRAFFIC. /// AND GENERATE SO MUCH NETWORK TRAFFIC.
/// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH /// THIS FUNCTION SHOULD BE REWRITTEN, FOR EXAMPLE WITH
/// SOMETHING LIKE AN ITERATOR /// SOMETHING LIKE AN ITERATOR
async fn full(&self) -> Result<Vec<QueryResult<'a>>> { async fn full(&self) -> Result<Vec<QueryResult>> {
let meta_list = self.partial().await?; let meta_list = self.partial().await?;
meta_list meta_list
.into_iter() .into_iter()
@ -91,7 +75,7 @@ impl<'a, 'b> Query<'a, 'b> {
Ok(meta.into_full(content).expect("meta to be PartialResult")) Ok(meta.into_full(content).expect("meta to be PartialResult"))
}) })
.collect::<FuturesUnordered<_>>() .collect::<FuturesOrdered<_>>()
.collect::<Vec<_>>() .collect::<Vec<_>>()
.await .await
.into_iter() .into_iter()
@ -99,24 +83,22 @@ impl<'a, 'b> Query<'a, 'b> {
} }
} }
pub enum QueryResult<'a> { #[derive(Debug)]
pub enum QueryResult {
IndexResult { IndexResult {
uuid: UniqueIdent, uuid: UniqueIdent,
index: &'a IndexEntry,
}, },
PartialResult { PartialResult {
uuid: UniqueIdent, uuid: UniqueIdent,
index: &'a IndexEntry,
metadata: MailMeta, metadata: MailMeta,
}, },
FullResult { FullResult {
uuid: UniqueIdent, uuid: UniqueIdent,
index: &'a IndexEntry,
metadata: MailMeta, metadata: MailMeta,
content: Vec<u8>, content: Vec<u8>,
}, },
} }
impl<'a> QueryResult<'a> { impl QueryResult {
pub fn uuid(&self) -> &UniqueIdent { pub fn uuid(&self) -> &UniqueIdent {
match self { match self {
Self::IndexResult { uuid, .. } => uuid, Self::IndexResult { uuid, .. } => uuid,
@ -125,16 +107,7 @@ impl<'a> QueryResult<'a> {
} }
} }
#[allow(dead_code)] pub fn metadata(&self) -> Option<&MailMeta> {
pub fn index(&self) -> &IndexEntry {
match self {
Self::IndexResult { index, .. } => index,
Self::PartialResult { index, .. } => index,
Self::FullResult { index, .. } => index,
}
}
pub fn metadata(&'a self) -> Option<&'a MailMeta> {
match self { match self {
Self::IndexResult { .. } => None, Self::IndexResult { .. } => None,
Self::PartialResult { metadata, .. } => Some(metadata), Self::PartialResult { metadata, .. } => Some(metadata),
@ -143,7 +116,7 @@ impl<'a> QueryResult<'a> {
} }
#[allow(dead_code)] #[allow(dead_code)]
pub fn content(&'a self) -> Option<&'a [u8]> { pub fn content(&self) -> Option<&[u8]> {
match self { match self {
Self::FullResult { content, .. } => Some(content), Self::FullResult { content, .. } => Some(content),
_ => None, _ => None,
@ -152,9 +125,8 @@ impl<'a> QueryResult<'a> {
fn into_partial(self, metadata: MailMeta) -> Option<Self> { fn into_partial(self, metadata: MailMeta) -> Option<Self> {
match self { match self {
Self::IndexResult { uuid, index } => Some(Self::PartialResult { Self::IndexResult { uuid } => Some(Self::PartialResult {
uuid, uuid,
index,
metadata, metadata,
}), }),
_ => None, _ => None,
@ -165,11 +137,9 @@ impl<'a> QueryResult<'a> {
match self { match self {
Self::PartialResult { Self::PartialResult {
uuid, uuid,
index,
metadata, metadata,
} => Some(Self::FullResult { } => Some(Self::FullResult {
uuid, uuid,
index,
metadata, metadata,
content, content,
}), }),