Correctly defuse cancellation on simple requests

This commit is contained in:
Alex 2022-09-01 16:10:38 +02:00
parent b931d0d1cf
commit b82ad70dd5
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
2 changed files with 24 additions and 12 deletions

View file

@ -454,6 +454,8 @@ impl RespEnc {
let msg_len = reader.read_u32().await?; let msg_len = reader.read_u32().await?;
let msg = reader.read_exact(msg_len as usize).await?; let msg = reader.read_exact(msg_len as usize).await?;
reader.fill_buffer().await;
Ok(Self { Ok(Self {
msg, msg,
stream: Some(reader.into_stream()), stream: Some(reader.into_stream()),

View file

@ -95,6 +95,26 @@ impl ByteStreamReader {
fn try_get(&mut self, read_len: usize) -> Option<Bytes> { fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
self.buf.take_exact(read_len) self.buf.take_exact(read_len)
} }
fn add_stream_next(&mut self, packet: Option<Packet>) {
match packet {
Some(Ok(slice)) => {
self.buf.extend(slice);
}
Some(Err(e)) => {
self.err = Some(e);
self.eos = true;
}
None => {
self.eos = true;
}
}
}
pub async fn fill_buffer(&mut self) {
let packet = self.stream.next().await;
self.add_stream_next(packet);
}
} }
pub enum ReadExactError { pub enum ReadExactError {
@ -132,18 +152,8 @@ impl<'a> Future for ByteStreamReadExact<'a> {
} }
} }
match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { let next_packet = futures::ready!(this.reader.stream.as_mut().poll_next(cx));
Some(Ok(slice)) => { this.reader.add_stream_next(next_packet);
this.reader.buf.extend(slice);
}
Some(Err(e)) => {
this.reader.err = Some(e);
this.reader.eos = true;
}
None => {
this.reader.eos = true;
}
}
} }
} }
} }