Re-enable proto

This commit is contained in:
Quentin 2024-03-08 09:55:33 +01:00
parent 1edf0b15ec
commit 11462f80c4
Signed by: quentin
GPG key ID: E9602264D639FF68
32 changed files with 992 additions and 132 deletions

929
Cargo.lock generated

File diff suppressed because it is too large Load diff

View file

@ -7,7 +7,7 @@ members = [
"aero-dav",
"aero-dav/fuzz",
"aero-collections",
# "aero-proto",
"aero-proto",
# "aerogramme",
]
@ -20,7 +20,7 @@ aero-bayou = { version = "0.3.0", path = "aero-bayou" }
aero-sasl = { version = "0.3.0", path = "aero-sasl" }
aero-dav = { version = "0.3.0", path = "aero-dav" }
aero-collections = { version = "0.3.0", path = "aero-collections" }
#aero-proto = { version = "0.3.0", path = "aero-proto" }
aero-proto = { version = "0.3.0", path = "aero-proto" }
#aerogramme = { version = "0.3.0", path = "aerogramme" }
# async runtime

View file

@ -1,4 +1,4 @@
use super::types as dav;
//use super::types as dav;
use super::caltypes::*;
use super::xml;
use super::error;
@ -7,25 +7,25 @@ use super::error;
// ---- EXTENSIONS ---
impl xml::QRead<Violation> for Violation {
async fn qread(xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
async fn qread(_xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
unreachable!();
}
}
impl xml::QRead<Property> for Property {
async fn qread(xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
async fn qread(_xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
unreachable!();
}
}
impl xml::QRead<PropertyRequest> for PropertyRequest {
async fn qread(xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
async fn qread(_xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
unreachable!();
}
}
impl xml::QRead<ResourceType> for ResourceType {
async fn qread(xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
async fn qread(_xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
unreachable!();
}
}

View file

@ -1,7 +1,5 @@
use quick_xml::Error as QError;
use quick_xml::events::{Event, BytesEnd, BytesStart, BytesText};
use quick_xml::name::PrefixDeclaration;
use tokio::io::AsyncWrite;
use quick_xml::events::{Event, BytesText};
use super::caltypes::*;
use super::xml::{Node, QWrite, IWrite, Writer};
@ -627,7 +625,7 @@ impl QWrite for ParamFilterMatch {
impl QWrite for TimeZone {
async fn qwrite(&self, xml: &mut Writer<impl IWrite>) -> Result<(), QError> {
let mut start = xml.create_cal_element("timezone");
let start = xml.create_cal_element("timezone");
let end = start.to_end();
xml.q.write_event_async(Event::Start(start.clone())).await?;
@ -638,7 +636,7 @@ impl QWrite for TimeZone {
impl QWrite for Filter {
async fn qwrite(&self, xml: &mut Writer<impl IWrite>) -> Result<(), QError> {
let mut start = xml.create_cal_element("filter");
let start = xml.create_cal_element("filter");
let end = start.to_end();
xml.q.write_event_async(Event::Start(start.clone())).await?;

View file

@ -1,14 +1,9 @@
use std::future::Future;
use quick_xml::events::Event;
use quick_xml::events::attributes::AttrError;
use quick_xml::name::{Namespace, QName, PrefixDeclaration, ResolveResult, ResolveResult::*};
use quick_xml::reader::NsReader;
use tokio::io::AsyncBufRead;
use chrono::DateTime;
use super::types::*;
use super::error::ParsingError;
use super::xml::{Node, QRead, Reader, IRead, DAV_URN, CAL_URN};
use super::xml::{Node, QRead, Reader, IRead, DAV_URN};
//@TODO (1) Rewrite all objects as Href,
// where we return Ok(None) instead of trying to find the object at any cost.
@ -119,7 +114,7 @@ impl QRead<LockInfo> for LockInfo {
impl<E: Extension> QRead<PropValue<E>> for PropValue<E> {
async fn qread(xml: &mut Reader<impl IRead>) -> Result<Self, ParsingError> {
xml.open(DAV_URN, "prop").await?;
let mut acc = xml.collect::<Property<E>>().await?;
let acc = xml.collect::<Property<E>>().await?;
xml.close().await?;
Ok(PropValue(acc))
}
@ -352,8 +347,6 @@ impl<E: Extension> QRead<PropertyRequest<E>> for PropertyRequest<E> {
impl<E: Extension> QRead<Property<E>> for Property<E> {
async fn qread(xml: &mut Reader<impl IRead>) -> Result<Self, ParsingError> {
use chrono::{DateTime, FixedOffset, TimeZone};
// Core WebDAV properties
if xml.maybe_open(DAV_URN, "creationdate").await?.is_some() {
let datestr = xml.tag_string().await?;
@ -592,7 +585,7 @@ impl QRead<LockType> for LockType {
impl QRead<Href> for Href {
async fn qread(xml: &mut Reader<impl IRead>) -> Result<Self, ParsingError> {
xml.open(DAV_URN, "href").await?;
let mut url = xml.tag_string().await?;
let url = xml.tag_string().await?;
xml.close().await?;
Ok(Href(url))
}

View file

@ -1,5 +1,4 @@
#![feature(type_alias_impl_trait)]
#![feature(async_fn_in_trait)]
#![feature(async_closure)]
#![feature(trait_alias)]

View file

@ -6,12 +6,12 @@ use super::error;
#[derive(Debug, PartialEq)]
pub struct Disabled(());
impl xml::QRead<Disabled> for Disabled {
async fn qread(xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
async fn qread(_xml: &mut xml::Reader<impl xml::IRead>) -> Result<Self, error::ParsingError> {
Err(error::ParsingError::Recoverable)
}
}
impl xml::QWrite for Disabled {
async fn qwrite(&self, xml: &mut xml::Writer<impl xml::IWrite>) -> Result<(), quick_xml::Error> {
async fn qwrite(&self, _xml: &mut xml::Writer<impl xml::IWrite>) -> Result<(), quick_xml::Error> {
unreachable!();
}
}

35
aero-proto/Cargo.toml Normal file
View file

@ -0,0 +1,35 @@
[package]
name = "aero-proto"
version = "0.3.0"
authors = ["Alex Auvolat <alex@adnab.me>", "Quentin Dufour <quentin@dufour.io>"]
edition = "2021"
license = "EUPL-1.2"
description = "Binding between Aerogramme's internal components and well-known protocols"
[dependencies]
aero-sasl.workspace = true
aero-dav.workspace = true
aero-user.workspace = true
aero-collections.workspace = true
async-trait.workspace = true
anyhow.workspace = true
hyper.workspace = true
base64.workspace = true
hyper-util.workspace = true
http-body-util.workspace = true
futures.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tokio-rustls.workspace = true
rustls.workspace = true
rustls-pemfile.workspace = true
imap-codec.workspace = true
imap-flow.workspace = true
chrono.workspace = true
eml-codec.workspace = true
thiserror.workspace = true
duplexify.workspace = true
smtp-message.workspace = true
smtp-server.workspace = true
tracing.workspace = true

View file

@ -11,9 +11,9 @@ use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::watch;
use crate::config::DavUnsecureConfig;
use crate::login::ArcLoginProvider;
use crate::user::User;
use aero_user::config::DavUnsecureConfig;
use aero_user::login::ArcLoginProvider;
use aero_collections::user::User;
pub struct Server {
bind_addr: SocketAddr,
@ -110,7 +110,7 @@ async fn auth(
// Call login provider
let creds = match login.login(username, password).await {
Ok(c) => c,
Err(e) => return Ok(Response::builder()
Err(_) => return Ok(Response::builder()
.status(401)
.body(Full::new(Bytes::from("Wrong credentials")))?),
};
@ -140,6 +140,7 @@ async fn router(user: std::sync::Arc<User>, req: Request<impl hyper::body::Body>
Ok(Response::new(Full::new(Bytes::from("Hello World!"))))
}
async fn collections(user: std::sync::Arc<User>, req: Request<impl hyper::body::Body>) -> Result<Response<Full<Bytes>>> {
#[allow(dead_code)]
async fn collections(_user: std::sync::Arc<User>, _req: Request<impl hyper::body::Body>) -> Result<Response<Full<Bytes>>> {
unimplemented!();
}

View file

@ -4,12 +4,13 @@ use imap_codec::imap_types::core::AString;
use imap_codec::imap_types::response::Code;
use imap_codec::imap_types::secret::Secret;
use aero_user::login::ArcLoginProvider;
use aero_collections::user::User;
use crate::imap::capability::ServerCapability;
use crate::imap::command::anystate;
use crate::imap::flow;
use crate::imap::response::Response;
use crate::login::ArcLoginProvider;
use crate::user::User;
//--- dispatching

View file

@ -14,17 +14,16 @@ use imap_codec::imap_types::mailbox::{ListMailbox, Mailbox as MailboxCodec};
use imap_codec::imap_types::response::{Code, CodeOther, Data};
use imap_codec::imap_types::status::{StatusDataItem, StatusDataItemName};
use aero_collections::mail::uidindex::*;
use aero_collections::user::User;
use aero_collections::mail::IMF;
use aero_collections::mail::namespace::MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, MailboxName};
use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::mailbox_view::MailboxView;
use crate::imap::response::Response;
use crate::imap::Body;
use crate::mail::uidindex::*;
use crate::user::User;
use crate::mail::IMF;
use crate::mail::namespace::MAILBOX_HIERARCHY_DELIMITER as MBX_HIER_DELIM_RAW;
pub struct AuthenticatedContext<'a> {
pub req: &'a Command<'static>,
@ -611,7 +610,7 @@ impl<'a> AuthenticatedContext<'a> {
Some(mb) => mb,
None => bail!("Mailbox does not exist"),
};
let mut view = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await;
let view = MailboxView::new(mb, self.client_capabilities.condstore.is_enabled()).await;
if date.is_some() {
tracing::warn!("Cannot set date when appending message");

View file

@ -3,7 +3,7 @@ pub mod anystate;
pub mod authenticated;
pub mod selected;
use crate::mail::namespace::INBOX;
use aero_collections::mail::namespace::INBOX;
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
/// Convert an IMAP mailbox name/identifier representation

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use anyhow::Result;
use imap_codec::imap_types::command::{Command, CommandBody, FetchModifier, StoreModifier};
use imap_codec::imap_types::core::{Charset, Vec1};
use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MacroOrMessageDataItemNames;
use imap_codec::imap_types::flag::{Flag, StoreResponse, StoreType};
use imap_codec::imap_types::mailbox::Mailbox as MailboxCodec;
@ -11,13 +11,14 @@ use imap_codec::imap_types::response::{Code, CodeOther};
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
use aero_collections::user::User;
use crate::imap::attributes::AttributesProxy;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anystate, authenticated, MailboxName};
use crate::imap::flow;
use crate::imap::mailbox_view::{MailboxView, UpdateParameters};
use crate::imap::response::Response;
use crate::user::User;
pub struct SelectedContext<'a> {
pub req: &'a Command<'static>,

View file

@ -5,8 +5,9 @@ use std::sync::Arc;
use imap_codec::imap_types::core::Tag;
use tokio::sync::Notify;
use aero_collections::user::User;
use crate::imap::mailbox_view::MailboxView;
use crate::user::User;
#[derive(Debug)]
pub enum Error {

View file

@ -3,8 +3,8 @@ use std::num::{NonZeroU32, NonZeroU64};
use anyhow::{anyhow, Result};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use crate::mail::uidindex::{ImapUid, ModSeq, UidIndex};
use crate::mail::unique_ident::UniqueIdent;
use aero_collections::mail::uidindex::{ImapUid, ModSeq, UidIndex};
use aero_collections::mail::unique_ident::UniqueIdent;
pub struct Index<'a> {
pub imap_index: Vec<MailIndex<'a>>,

View file

@ -16,7 +16,7 @@ use eml_codec::{
part::{composite::Message, AnyPart},
};
use crate::mail::query::QueryResult;
use aero_collections::mail::query::QueryResult;
use crate::imap::attributes::AttributesProxy;
use crate::imap::flags;

View file

@ -6,18 +6,18 @@ use anyhow::{anyhow, Error, Result};
use futures::stream::{StreamExt, TryStreamExt};
use imap_codec::imap_types::core::{Charset, Vec1};
use imap_codec::imap_types::core::Charset;
use imap_codec::imap_types::fetch::MessageDataItem;
use imap_codec::imap_types::flag::{Flag, FlagFetch, FlagPerm, StoreResponse, StoreType};
use imap_codec::imap_types::response::{Code, CodeOther, Data, Status};
use imap_codec::imap_types::search::SearchKey;
use imap_codec::imap_types::sequence::SequenceSet;
use crate::mail::mailbox::Mailbox;
use crate::mail::query::QueryScope;
use crate::mail::snapshot::FrozenMailbox;
use crate::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
use crate::mail::unique_ident::UniqueIdent;
use aero_collections::mail::mailbox::Mailbox;
use aero_collections::mail::query::QueryScope;
use aero_collections::mail::snapshot::FrozenMailbox;
use aero_collections::mail::uidindex::{ImapUid, ImapUidvalidity, ModSeq};
use aero_collections::mail::unique_ident::UniqueIdent;
use crate::imap::attributes::AttributesProxy;
use crate::imap::flags;

View file

@ -384,6 +384,8 @@ impl<'a> NodeMsg<'a> {
})
}
}
#[allow(dead_code)]
struct NodeMult<'a>(&'a NodeMime<'a>, &'a composite::Multipart<'a>);
impl<'a> NodeMult<'a> {
fn structure(&self, is_ext: bool) -> Result<BodyStructure<'static>> {

View file

@ -15,13 +15,11 @@ mod session;
use std::net::SocketAddr;
use anyhow::{anyhow, bail, Context, Result};
use anyhow::{anyhow, bail, Result};
use futures::stream::{FuturesUnordered, StreamExt};
use tokio::net::TcpListener;
use tokio::sync::mpsc;
use tokio::sync::watch;
use imap_codec::imap_types::response::{Code, CommandContinuationRequest, Response, Status};
use imap_codec::imap_types::{core::Text, response::Greeting};
use imap_flow::server::{ServerFlow, ServerFlowEvent, ServerFlowOptions};
@ -29,12 +27,13 @@ use imap_flow::stream::AnyStream;
use rustls_pemfile::{certs, private_key};
use tokio_rustls::TlsAcceptor;
use crate::config::{ImapConfig, ImapUnsecureConfig};
use aero_user::config::{ImapConfig, ImapUnsecureConfig};
use aero_user::login::ArcLoginProvider;
use crate::imap::capability::ServerCapability;
use crate::imap::request::Request;
use crate::imap::response::{Body, ResponseOrIdle};
use crate::imap::session::Instance;
use crate::login::ArcLoginProvider;
/// Server is a thin wrapper to register our Services in BàL
pub struct Server {
@ -140,7 +139,6 @@ impl Server {
use std::sync::Arc;
use tokio::sync::mpsc::*;
use tokio::sync::Notify;
use tokio_util::bytes::BytesMut;
const PIPELINABLE_COMMANDS: usize = 64;
@ -325,8 +323,6 @@ impl NetLoop {
self.server.enqueue_status(Status::bye(None, "Internal session exited").unwrap());
tracing::error!("session task exited for {:?}, quitting", self.ctx.addr);
},
Some(_) => unreachable!(),
},
// When receiving a CTRL+C

View file

@ -4,9 +4,10 @@ use imap_codec::imap_types::core::Vec1;
use imap_codec::imap_types::search::{MetadataItemSearch, SearchKey};
use imap_codec::imap_types::sequence::{SeqOrUid, Sequence, SequenceSet};
use aero_collections::mail::query::QueryScope;
use crate::imap::index::MailIndex;
use crate::imap::mail_view::MailView;
use crate::mail::query::QueryScope;
pub enum SeqType {
Undefined,

View file

@ -1,11 +1,13 @@
use anyhow::{anyhow, bail, Context, Result};
use imap_codec::imap_types::{command::Command, core::Tag};
use aero_user::login::ArcLoginProvider;
use crate::imap::capability::{ClientCapability, ServerCapability};
use crate::imap::command::{anonymous, authenticated, selected};
use crate::imap::flow;
use crate::imap::request::Request;
use crate::imap::response::{Response, ResponseOrIdle};
use crate::login::ArcLoginProvider;
use anyhow::{anyhow, bail, Context, Result};
use imap_codec::imap_types::{command::Command, core::Tag};
//-----
pub struct Instance {

6
aero-proto/src/lib.rs Normal file
View file

@ -0,0 +1,6 @@
#![feature(async_closure)]
pub mod dav;
pub mod imap;
pub mod lmtp;
pub mod sasl;

View file

@ -10,18 +10,16 @@ use futures::{
stream::{FuturesOrdered, FuturesUnordered},
StreamExt,
};
use log::*;
use tokio::net::TcpListener;
use tokio::select;
use tokio::sync::watch;
use tokio_util::compat::*;
use smtp_message::{DataUnescaper, Email, EscapedDataReader, Reply, ReplyCode};
use smtp_server::{reply, Config, ConnectionMetadata, Decision, MailMetadata};
use crate::config::*;
use crate::login::*;
use crate::mail::incoming::EncryptedMessage;
use aero_user::config::*;
use aero_user::login::*;
use aero_collections::mail::incoming::EncryptedMessage;
pub struct LmtpServer {
bind_addr: SocketAddr,
@ -43,7 +41,7 @@ impl LmtpServer {
pub async fn run(self: &Arc<Self>, mut must_exit: watch::Receiver<bool>) -> Result<()> {
let tcp = TcpListener::bind(self.bind_addr).await?;
info!("LMTP server listening on {:#}", self.bind_addr);
tracing::info!("LMTP server listening on {:#}", self.bind_addr);
let mut connections = FuturesUnordered::new();
@ -60,7 +58,7 @@ impl LmtpServer {
_ = wait_conn_finished => continue,
_ = must_exit.changed() => continue,
};
info!("LMTP: accepted connection from {}", remote_addr);
tracing::info!("LMTP: accepted connection from {}", remote_addr);
let conn = tokio::spawn(smtp_server::interact(
socket.compat(),
@ -73,7 +71,7 @@ impl LmtpServer {
}
drop(tcp);
info!("LMTP server shutting down, draining remaining connections...");
tracing::info!("LMTP server shutting down, draining remaining connections...");
while connections.next().await.is_some() {}
Ok(())

View file

@ -6,10 +6,11 @@ use tokio::io::BufStream;
use tokio::io::{AsyncBufReadExt, AsyncWriteExt};
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::watch;
use tokio_util::bytes::BytesMut;
use aero_user::config::AuthConfig;
use aero_user::login::ArcLoginProvider;
use aero_sasl::{flow::State, decode::client_command, encode::Encode};
pub struct AuthServer {
login_provider: ArcLoginProvider,
@ -108,7 +109,8 @@ impl NetLoop {
tracing::trace!(cmd=?cmd, "Received command");
// Make some progress in our local state
self.state.progress(cmd, &self.login).await;
let login = async |user: String, pass: String| self.login.login(user.as_str(), pass.as_str()).await.is_ok();
self.state.progress(cmd, login).await;
if matches!(self.state, State::Error) {
bail!("Internal state is in error, previous logs explain what went wrong");
}

View file

@ -28,9 +28,9 @@ impl State {
Self::Init
}
async fn try_auth_plain<'a, X, F>(&self, data: &'a [u8], login: X) -> AuthRes
async fn try_auth_plain<X, F>(&self, data: &[u8], login: X) -> AuthRes
where
X: FnOnce(&'a str, &'a str) -> F,
X: FnOnce(String, String) -> F,
F: Future<Output=bool>,
{
// Check that we can extract user's login+pass
@ -56,7 +56,7 @@ impl State {
};
// Try to connect user
match login(user, password).await {
match login(user.to_string(), password.to_string()).await {
true => AuthRes::Success(user.to_string()),
false => {
tracing::warn!("login failed");
@ -67,7 +67,7 @@ impl State {
pub async fn progress<F,X>(&mut self, cmd: ClientCommand, login: X)
where
X: FnOnce(&str, &str) -> F,
X: FnOnce(String, String) -> F,
F: Future<Output=bool>,
{
let new_state = 'state: {