boitalettres/src/server/pipeline.rs

137 lines
3.5 KiB
Rust

use std::pin::Pin;
use std::task::{self, Poll};
use bytes::BytesMut;
use futures::io::{AsyncRead, AsyncWrite};
use futures::sink::Sink;
use futures::stream::Stream;
use crate::proto::{Request, Response};
type Error = Box<dyn std::error::Error + Send + Sync + 'static>;
/// An IMAP connection wrapping a transport `C` together with the buffers
/// used to read requests from it and write responses to it.
///
/// The read side is exposed through the `Stream` implementation (yielding
/// parsed requests) and the write side through the `Sink<Response>`
/// implementation.
#[pin_project::pin_project]
pub struct Connection<C> {
    /// Underlying transport; structurally pinned so its `poll_*` methods
    /// can be called through the projection.
    #[pin]
    conn: C,
    /// Bytes received from `conn` that have not yet been parsed into a
    /// complete command.
    read_buf: BytesMut,
    /// Encoded responses that have not yet been written to `conn`.
    write_buf: BytesMut,
}
impl<C> Connection<C> {
    /// Wraps `conn` in a new [`Connection`] whose read and write buffers
    /// both start out empty.
    pub fn new(conn: C) -> Self {
        let read_buf = BytesMut::new();
        let write_buf = BytesMut::new();

        Self {
            conn,
            read_buf,
            write_buf,
        }
    }
}
impl<C> Stream for Connection<C>
where
    C: AsyncRead + Unpin,
{
    type Item = Result<Request, Error>;

    /// Yields the next request parsed from the connection.
    ///
    /// The buffered bytes are parsed first; when they do not yet form a
    /// complete command, more data is read from the transport and parsing is
    /// retried.
    ///
    /// Returns:
    /// - `Poll::Ready(Some(Ok(request)))` once a full command was parsed;
    /// - `Poll::Ready(Some(Err(_)))` on an I/O error or a parse error other
    ///   than "incomplete";
    /// - `Poll::Ready(None)` when the transport reports end-of-file;
    /// - `Poll::Pending` while waiting for more data.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        use imap_codec::parse::command::command as parse_command;

        let mut this = self.project();

        loop {
            let (input, command) = match parse_command(this.read_buf) {
                Ok(res) => res,
                Err(e) if e.is_incomplete() => {
                    let mut buf = [0u8; 256];
                    let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?;

                    // A zero-length read means the peer closed the connection.
                    // Without this check the loop would re-parse the same
                    // incomplete buffer and read again forever. Any partially
                    // buffered command is dropped and the stream ends.
                    if read == 0 {
                        return Poll::Ready(None);
                    }

                    this.read_buf.extend_from_slice(&buf[..read]);
                    continue;
                }
                Err(e) => {
                    return Poll::Ready(Some(Err(format!("Error: {:?}", e).into())));
                }
            };

            tracing::debug!("Received: {:#?}", command);

            // Keep only the bytes the parser did not consume, so the next
            // poll starts at the following command.
            *this.read_buf = input.into();

            return Poll::Ready(Some(Ok(command)));
        }
    }
}
impl<C> Connection<C>
where
    C: AsyncWrite,
{
    /// Writes the pending contents of the write buffer to the transport.
    ///
    /// Returns `Poll::Ready(Ok(()))` once the buffer is empty,
    /// `Poll::Pending` if the transport cannot accept more data right now
    /// (bytes already written are removed from the buffer), and an error if
    /// a write fails or the transport accepts zero bytes.
    ///
    /// This only drains the internal buffer; it does not flush the
    /// transport itself.
    fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<(), Error>> {
        use bytes::Buf;

        let mut this = self.project();

        while !this.write_buf.is_empty() {
            let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?;

            // A zero-byte write with data still pending makes no progress;
            // report it instead of spinning in this loop forever.
            if written == 0 {
                return Poll::Ready(Err(std::io::Error::new(
                    std::io::ErrorKind::WriteZero,
                    "failed to write buffered response to the connection",
                )
                .into()));
            }

            this.write_buf.advance(written);
        }

        Poll::Ready(Ok(()))
    }

    /// Encodes `item` and appends it to the write buffer.
    ///
    /// Nothing is written to the transport here; the bytes are sent when
    /// the buffer is next flushed.
    ///
    /// # Errors
    ///
    /// Returns an error if encoding the response fails.
    pub(crate) fn send(&mut self, item: Response) -> Result<(), Error> {
        use bytes::BufMut;
        use imap_codec::codec::Encode;

        let mut writer = BufMut::writer(&mut self.write_buf);

        tracing::debug!("Sending: {:#?}", item);
        item.encode(&mut writer)?;

        Ok(())
    }
}
impl<C> Sink<Response> for Connection<C>
where
    C: AsyncWrite + Unpin,
{
    type Error = Error;

    /// The sink is ready once every previously buffered response has been
    /// written to the transport.
    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.poll_flush_buffer(cx)
    }

    /// Encodes `item` into the write buffer.
    ///
    /// In debug builds this asserts that the write buffer is empty when the
    /// item is submitted.
    fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
        let connection = self.get_mut();
        debug_assert!(connection.write_buf.is_empty());
        connection.send(item)
    }

    /// Drains the write buffer into the transport, then flushes the
    /// transport itself.
    fn poll_flush(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        if let Err(err) = futures::ready!(self.as_mut().poll_flush_buffer(cx)) {
            return Poll::Ready(Err(err));
        }

        match futures::ready!(self.project().conn.poll_flush(cx)) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(err) => Poll::Ready(Err(err.into())),
        }
    }

    /// Drains the write buffer into the transport, then closes the
    /// transport.
    fn poll_close(
        mut self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        if let Err(err) = futures::ready!(self.as_mut().poll_flush_buffer(cx)) {
            return Poll::Ready(Err(err));
        }

        match futures::ready!(self.project().conn.poll_close(cx)) {
            Ok(()) => Poll::Ready(Ok(())),
            Err(err) => Poll::Ready(Err(err.into())),
        }
    }
}