From ce339cf89b5742dbdb8eadac9ec7a8fc5703e15e Mon Sep 17 00:00:00 2001 From: KokaKiwi Date: Mon, 25 Jul 2022 13:12:39 +0200 Subject: [PATCH] Support closing connection after a Response Closes #11 --- src/proto/res/body.rs | 2 ++ src/proto/res/mod.rs | 7 +++++++ src/proto/res/stream.rs | 5 +++++ src/server/pipeline.rs | 12 ++++++++++++ 4 files changed, 26 insertions(+) diff --git a/src/proto/res/body.rs b/src/proto/res/body.rs index 048a90c..a2ab9b8 100644 --- a/src/proto/res/body.rs +++ b/src/proto/res/body.rs @@ -70,6 +70,7 @@ impl fmt::Debug for Body { pub enum Data { Data(ImapData), Status(ImapStatus), + Close, } impl Encode for Data { @@ -77,6 +78,7 @@ impl Encode for Data { match self { Data::Data(ref data) => data.encode(writer), Data::Status(ref status) => status.encode(writer), + Data::Close => Ok(()), } } } diff --git a/src/proto/res/mod.rs b/src/proto/res/mod.rs index e970d51..d1d9896 100644 --- a/src/proto/res/mod.rs +++ b/src/proto/res/mod.rs @@ -13,6 +13,7 @@ pub(crate) mod stream; pub struct Response { pub(crate) status: Status, pub(crate) body: Option, + pub(crate) close: bool, } impl Response { @@ -20,6 +21,7 @@ impl Response { Ok(Response { status: Status::new(code, msg)?, body: None, + close: false, }) } @@ -50,6 +52,11 @@ impl Response { self.body = Some(body.into()); self } + + pub fn close(mut self, close: bool) -> Self { + self.close = close; + self + } } impl Response { diff --git a/src/proto/res/stream.rs b/src/proto/res/stream.rs index 99a4f7a..77fe4a7 100644 --- a/src/proto/res/stream.rs +++ b/src/proto/res/stream.rs @@ -5,6 +5,7 @@ use super::body::Data; use super::Response; pub fn response_stream(res: Response, tag: Option) -> impl Stream { + let close = res.close; let (body, status) = res.split(); let body = body.map(|body| body.into_stream()); @@ -19,5 +20,9 @@ pub fn response_stream(res: Response, tag: Option) -> impl Stream { #[pin] outbox: ConcatAll>, write_buf: BytesMut, + + close: bool, } impl Connection { @@ -34,6 +36,8 @@ impl Connection { outbox: ConcatAll::new(), write_buf: BytesMut::new(), + + close: false, } } } @@ -105,6 +109,10 @@ where while let Poll::Ready(Some(data)) = this.outbox.as_mut().poll_next(cx) { tracing::trace!(?data, "transport.write_buf"); + if let Data::Close = data { + *this.close = true; + } + if let Err(err) = data.encode(&mut writer) { tracing::error!(?err, "transport.encode_error"); return Poll::Ready(Err(Box::new(err))); @@ -118,6 +126,10 @@ where } this.write_buf.clear(); + if *this.close { + futures::ready!(this.conn.as_mut().poll_close(cx))?; + } + Poll::Ready(Ok(())) } }