parent
c8e0aefc42
commit
ce339cf89b
4 changed files with 26 additions and 0 deletions
|
@ -70,6 +70,7 @@ impl fmt::Debug for Body {
|
|||
pub enum Data {
|
||||
Data(ImapData),
|
||||
Status(ImapStatus),
|
||||
Close,
|
||||
}
|
||||
|
||||
impl Encode for Data {
|
||||
|
@ -77,6 +78,7 @@ impl Encode for Data {
|
|||
match self {
|
||||
Data::Data(ref data) => data.encode(writer),
|
||||
Data::Status(ref status) => status.encode(writer),
|
||||
Data::Close => Ok(()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -13,6 +13,7 @@ pub(crate) mod stream;
|
|||
pub struct Response {
|
||||
pub(crate) status: Status,
|
||||
pub(crate) body: Option<Body>,
|
||||
pub(crate) close: bool,
|
||||
}
|
||||
|
||||
impl Response {
|
||||
|
@ -20,6 +21,7 @@ impl Response {
|
|||
Ok(Response {
|
||||
status: Status::new(code, msg)?,
|
||||
body: None,
|
||||
close: false,
|
||||
})
|
||||
}
|
||||
|
||||
|
@ -50,6 +52,11 @@ impl Response {
|
|||
self.body = Some(body.into());
|
||||
self
|
||||
}
|
||||
|
||||
pub fn close(mut self, close: bool) -> Self {
|
||||
self.close = close;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Response {
|
||||
|
|
|
@ -5,6 +5,7 @@ use super::body::Data;
|
|||
use super::Response;
|
||||
|
||||
pub fn response_stream(res: Response, tag: Option<Tag>) -> impl Stream<Item = Data> {
|
||||
let close = res.close;
|
||||
let (body, status) = res.split();
|
||||
let body = body.map(|body| body.into_stream());
|
||||
|
||||
|
@ -19,5 +20,9 @@ pub fn response_stream(res: Response, tag: Option<Tag>) -> impl Stream<Item = Da
|
|||
let item = status.into_imap(tag);
|
||||
tracing::trace!(?item, "response_stream.yield");
|
||||
yield item.into();
|
||||
|
||||
if close {
|
||||
yield Data::Close;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -23,6 +23,8 @@ pub struct Connection<C> {
|
|||
#[pin]
|
||||
outbox: ConcatAll<BoxStream<'static, Data>>,
|
||||
write_buf: BytesMut,
|
||||
|
||||
close: bool,
|
||||
}
|
||||
|
||||
impl<C> Connection<C> {
|
||||
|
@ -34,6 +36,8 @@ impl<C> Connection<C> {
|
|||
|
||||
outbox: ConcatAll::new(),
|
||||
write_buf: BytesMut::new(),
|
||||
|
||||
close: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -105,6 +109,10 @@ where
|
|||
while let Poll::Ready(Some(data)) = this.outbox.as_mut().poll_next(cx) {
|
||||
tracing::trace!(?data, "transport.write_buf");
|
||||
|
||||
if let Data::Close = data {
|
||||
*this.close = true;
|
||||
}
|
||||
|
||||
if let Err(err) = data.encode(&mut writer) {
|
||||
tracing::error!(?err, "transport.encode_error");
|
||||
return Poll::Ready(Err(Box::new(err)));
|
||||
|
@ -118,6 +126,10 @@ where
|
|||
}
|
||||
this.write_buf.clear();
|
||||
|
||||
if *this.close {
|
||||
futures::ready!(this.conn.as_mut().poll_close(cx))?;
|
||||
}
|
||||
|
||||
Poll::Ready(Ok(()))
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue