145 lines
3.7 KiB
Rust
145 lines
3.7 KiB
Rust
use std::pin::Pin;
|
|
use std::task::{self, Poll};
|
|
|
|
use bytes::BytesMut;
|
|
use futures::io::{AsyncRead, AsyncWrite};
|
|
use futures::sink::Sink;
|
|
use futures::stream::Stream;
|
|
|
|
use crate::proto::{Request, Response};
|
|
|
|
/// Error type used throughout this module; an alias for `tower::BoxError`.
type Error = tower::BoxError;
|
|
/// `Result` alias whose error type defaults to this module's `Error`.
type Result<T, E = Error> = std::result::Result<T, E>;
|
|
|
|
#[pin_project::pin_project]
|
|
pub struct Connection<C> {
|
|
#[pin]
|
|
conn: C,
|
|
read_buf: BytesMut,
|
|
write_buf: BytesMut,
|
|
}
|
|
|
|
impl<C> Connection<C> {
|
|
pub fn new(conn: C) -> Self {
|
|
Self {
|
|
conn,
|
|
read_buf: BytesMut::new(),
|
|
write_buf: BytesMut::new(),
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<C> Stream for Connection<C>
|
|
where
|
|
C: AsyncRead + Unpin,
|
|
{
|
|
type Item = Result<Request>;
|
|
|
|
fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
|
|
use imap_codec::parse::command::command as parse_command;
|
|
|
|
let mut this = self.project();
|
|
|
|
loop {
|
|
let (input, command) = match parse_command(this.read_buf) {
|
|
Ok(res) => res,
|
|
Err(e) if e.is_incomplete() => {
|
|
let mut buf = [0u8; 256];
|
|
|
|
let read = futures::ready!(this.conn.as_mut().poll_read(cx, &mut buf))?;
|
|
tracing::trace!(read = read, "transport.poll_next");
|
|
|
|
if read == 0 {
|
|
return Poll::Ready(None);
|
|
}
|
|
|
|
let data = &buf[..read];
|
|
this.read_buf.extend(data);
|
|
|
|
continue;
|
|
}
|
|
Err(e) => {
|
|
return Poll::Ready(Some(Err(format!("Error: {:?}", e).into())));
|
|
}
|
|
};
|
|
tracing::debug!(command = ?command, "transport.recv");
|
|
|
|
*this.read_buf = input.into();
|
|
|
|
let req = command;
|
|
return Poll::Ready(Some(Ok(req)));
|
|
}
|
|
}
|
|
}
|
|
|
|
impl<C> Connection<C>
|
|
where
|
|
C: AsyncWrite,
|
|
{
|
|
fn poll_flush_buffer(self: Pin<&mut Self>, cx: &mut task::Context) -> Poll<Result<()>> {
|
|
use bytes::Buf;
|
|
|
|
let mut this = self.project();
|
|
|
|
tracing::debug!(size = this.write_buf.len(), "transport.flush_buffer");
|
|
while !this.write_buf.is_empty() {
|
|
let written = futures::ready!(this.conn.as_mut().poll_write(cx, this.write_buf))?;
|
|
this.write_buf.advance(written);
|
|
}
|
|
|
|
Poll::Ready(Ok(()))
|
|
}
|
|
|
|
pub(crate) fn send(&mut self, item: Response) -> Result<()> {
|
|
use bytes::BufMut;
|
|
use imap_codec::codec::Encode;
|
|
|
|
let mut writer = BufMut::writer(&mut self.write_buf);
|
|
|
|
tracing::debug!(item = ?item, "transport.send");
|
|
item.encode(&mut writer)?;
|
|
|
|
Ok(())
|
|
}
|
|
}
|
|
|
|
impl<C> Sink<Response> for Connection<C>
|
|
where
|
|
C: AsyncWrite + Unpin,
|
|
{
|
|
type Error = Error;
|
|
|
|
fn poll_ready(
|
|
self: Pin<&mut Self>,
|
|
cx: &mut task::Context<'_>,
|
|
) -> Poll<Result<(), Self::Error>> {
|
|
futures::ready!(self.poll_flush_buffer(cx))?;
|
|
Poll::Ready(Ok(()))
|
|
}
|
|
|
|
fn start_send(self: Pin<&mut Self>, item: Response) -> Result<(), Self::Error> {
|
|
debug_assert!(self.write_buf.is_empty());
|
|
self.get_mut().send(item)?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn poll_flush(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut task::Context<'_>,
|
|
) -> Poll<Result<(), Self::Error>> {
|
|
futures::ready!(self.as_mut().poll_flush_buffer(cx))?;
|
|
futures::ready!(self.project().conn.poll_flush(cx))?;
|
|
Poll::Ready(Ok(()))
|
|
}
|
|
|
|
fn poll_close(
|
|
mut self: Pin<&mut Self>,
|
|
cx: &mut task::Context<'_>,
|
|
) -> Poll<Result<(), Self::Error>> {
|
|
futures::ready!(self.as_mut().poll_flush_buffer(cx))?;
|
|
futures::ready!(self.project().conn.poll_close(cx))?;
|
|
Poll::Ready(Ok(()))
|
|
}
|
|
}
|