boitalettres/src/server/pipeline.rs

201 lines
5.6 KiB
Rust
Raw Normal View History

2022-05-09 17:15:52 +00:00
use std::pin::Pin;
use std::task::{self, Poll};
use bytes::BytesMut;
use futures::prelude::*;
use futures::stream::BoxStream;
2022-05-23 01:20:06 +00:00
use imap_codec::types::core::Tag;
2022-05-09 17:15:52 +00:00
use crate::proto::res::body::Data;
2022-05-09 17:15:52 +00:00
use crate::proto::{Request, Response};
use crate::util::stream::ConcatAll;
2022-05-09 17:15:52 +00:00
type Error = tower::BoxError;
type Result<T, E = Error> = std::result::Result<T, E>;
2022-05-09 17:15:52 +00:00
#[pin_project::pin_project]
pub struct Connection<C> {
#[pin]
pub conn: C,
2022-05-09 17:15:52 +00:00
read_buf: BytesMut,
#[pin]
outbox: ConcatAll<BoxStream<'static, Data>>,
2022-05-09 17:15:52 +00:00
write_buf: BytesMut,
}
impl<C> Connection<C> {
    /// Wraps `conn` in a new [`Connection`].
    ///
    /// The read buffer starts with room for 1024 bytes; the outbox and the
    /// write buffer start out empty.
    pub fn new(conn: C) -> Self {
        let read_buf = BytesMut::with_capacity(1024);
        let write_buf = BytesMut::new();
        let outbox = ConcatAll::new();

        Self {
            conn,
            read_buf,
            outbox,
            write_buf,
        }
    }
}
impl<C> Stream for Connection<C>
where
C: AsyncRead + Unpin,
{
type Item = Result<Request>;
2022-05-09 17:15:52 +00:00
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
use imap_codec::parse::command::command as parse_command;
let mut this = self.project();
loop {
let (input, command) = match parse_command(this.read_buf) {
Ok(res) => res,
Err(e) if e.is_incomplete() => {
let mut buf = [0u8; 256];
2022-05-11 17:15:12 +00:00
tracing::trace!("transport.poll_read");
// let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?;
let read = match this.conn.as_mut().poll_read(cx, &mut buf) {
Poll::Ready(res) => res?,
Poll::Pending => {
tracing::trace!("transport.pending");
return Poll::Pending;
}
};
2022-05-11 17:15:12 +00:00
tracing::trace!(read = read, "transport.poll_next");
if read == 0 {
return Poll::Ready(None);
}
2022-05-09 17:15:52 +00:00
let data = &buf[..read];
this.read_buf.extend(data);
continue;
}
Err(e) => {
return Poll::Ready(Some(Err(format!("Error: {:?}", e).into())));
}
};
2022-05-11 17:15:12 +00:00
tracing::debug!(command = ?command, "transport.recv");
2022-05-09 17:15:52 +00:00
*this.read_buf = input.into();
let req = Request { command };
2022-05-09 17:15:52 +00:00
return Poll::Ready(Some(Ok(req)));
}
}
}
impl<C> Connection<C>
where
C: AsyncWrite,
{
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<()>> {
use bytes::{Buf, BufMut};
use imap_codec::codec::Encode;
2022-05-09 17:15:52 +00:00
let mut this = self.project();
tracing::debug!(size = this.outbox.len(), "transport.flush_outbox");
let mut writer = this.write_buf.writer();
while let Poll::Ready(Some(data)) = this.outbox.as_mut().poll_next(cx) {
tracing::trace!(?data, "transport.write_buf");
if let Err(err) = data.encode(&mut writer) {
tracing::error!(?err, "transport.encode_error");
return Poll::Ready(Err(Box::new(err)));
}
}
2022-05-11 17:15:12 +00:00
tracing::debug!(size = this.write_buf.len(), "transport.flush_buffer");
2022-05-09 17:15:52 +00:00
while !this.write_buf.is_empty() {
let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?;
this.write_buf.advance(written);
}
this.write_buf.clear();
2022-05-09 17:15:52 +00:00
Poll::Ready(Ok(()))
}
}
2022-05-23 01:20:06 +00:00
impl<C> Sink<(Option<Tag>, Response)> for Connection<C>
2022-05-09 17:15:52 +00:00
where
C: AsyncWrite + Unpin,
{
type Error = Error;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.poll_flush_buffer(cx))?;
Poll::Ready(Ok(()))
}
2022-05-23 01:20:06 +00:00
fn start_send(
mut self: Pin<&mut Self>,
2022-05-23 01:20:06 +00:00
(tag, res): (Option<Tag>, Response),
) -> Result<(), Self::Error> {
use crate::proto::res::stream::response_stream;
2022-05-23 01:20:06 +00:00
tracing::debug!(?tag, ?res, "transport.start_send");
self.outbox.push(Box::pin(response_stream(res, tag)));
2022-05-09 17:15:52 +00:00
Ok(())
}
fn poll_flush(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.as_mut().poll_flush_buffer(cx))?;
futures::ready!(self.project().conn.poll_flush(cx))?;
Poll::Ready(Ok(()))
}
fn poll_close(
mut self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
futures::ready!(self.as_mut().poll_flush_buffer(cx))?;
futures::ready!(self.project().conn.poll_close(cx))?;
Poll::Ready(Ok(()))
}
}
impl<C> Sink<Response> for Connection<C>
where
Self: Sink<(Option<Tag>, Response)>,
{
type Error = <Connection<C> as Sink<(Option<Tag>, Response)>>::Error;
fn poll_ready(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
<Self as Sink<(Option<Tag>, Response)>>::poll_ready(self, cx)
}
fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
<Self as Sink<(Option<Tag>, Response)>>::start_send(self, (None, item))
}
fn poll_flush(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
<Self as Sink<(Option<Tag>, Response)>>::poll_flush(self, cx)
}
fn poll_close(
self: Pin<&mut Self>,
cx: &mut task::Context<'_>,
) -> Poll<Result<(), Self::Error>> {
<Self as Sink<(Option<Tag>, Response)>>::poll_close(self, cx)
}
}