From f9db9a4b696569bbc56c40b9170320307ebcdd81 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 13:23:42 +0200 Subject: [PATCH] Simplify send.rs --- src/send.rs | 205 ++++++++++++-------------------------------------- src/stream.rs | 29 ++++--- 2 files changed, 68 insertions(+), 166 deletions(-) diff --git a/src/send.rs b/src/send.rs index 59805cf..a8cf966 100644 --- a/src/send.rs +++ b/src/send.rs @@ -8,7 +8,6 @@ use bytes::Bytes; use log::trace; use futures::AsyncWriteExt; -use futures::Stream; use kuska_handshake::async_std::BoxStreamWrite; use tokio::sync::mpsc; @@ -30,152 +29,14 @@ pub(crate) const MAX_CHUNK_LENGTH: ChunkLength = 0x3FF0; pub(crate) const ERROR_MARKER: ChunkLength = 0x4000; pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000; +struct SendQueue { + items: VecDeque<(u8, VecDeque)>, +} + struct SendQueueItem { id: RequestID, prio: RequestPriority, - data: DataReader, -} - -#[pin_project::pin_project] -struct DataReader { - #[pin] - reader: ByteStream, - packet: Packet, - pos: usize, - buf: Vec, - eos: bool, -} - -impl From for DataReader { - fn from(data: ByteStream) -> DataReader { - DataReader { - reader: data, - packet: Ok(Bytes::new()), - pos: 0, - buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), - eos: false, - } - } -} - -enum DataFrame { - Data { - /// a fixed size buffer containing some data, possibly padded with 0s - data: [u8; MAX_CHUNK_LENGTH as usize], - /// actual lenght of data - len: usize, - /// whethere there may be more data comming from this stream. Can be used for some - /// optimization. It's an error to set it to false if there is more data, but it is correct - /// (albeit sub-optimal) to set it to true if there is nothing coming after - may_have_more: bool, - }, - /// An error code automatically signals the end of the stream - Error(u8), -} - -impl DataFrame { - fn empty_last() -> Self { - DataFrame::Data { - data: [0; MAX_CHUNK_LENGTH as usize], - len: 0, - may_have_more: false, - } - } - - fn header(&self) -> [u8; 2] { - let header_u16 = match self { - DataFrame::Data { - len, - may_have_more: false, - .. - } => *len as u16, - DataFrame::Data { - len, - may_have_more: true, - .. - } => *len as u16 | CHUNK_HAS_CONTINUATION, - DataFrame::Error(e) => *e as u16 | ERROR_MARKER, - }; - ChunkLength::to_be_bytes(header_u16) - } - - fn data(&self) -> &[u8] { - match self { - DataFrame::Data { ref data, len, .. } => &data[..*len], - DataFrame::Error(_) => &[], - } - } - - fn may_have_more(&self) -> bool { - match self { - DataFrame::Data { may_have_more, .. } => *may_have_more, - DataFrame::Error(_) => false, - } - } -} - -impl Stream for DataReader { - type Item = DataFrame; - - fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let mut this = self.project(); - - if *this.eos { - // eos was reached at previous call to poll_next, where a partial packet - // was returned. Now return None - return Poll::Ready(None); - } - - loop { - let packet = match this.packet { - Ok(v) => v, - Err(e) => { - let e = *e; - *this.packet = Ok(Bytes::new()); - *this.eos = true; - return Poll::Ready(Some(DataFrame::Error(e))); - } - }; - let packet_left = packet.len() - *this.pos; - let buf_left = MAX_CHUNK_LENGTH as usize - this.buf.len(); - let to_read = std::cmp::min(buf_left, packet_left); - this.buf - .extend_from_slice(&packet[*this.pos..*this.pos + to_read]); - *this.pos += to_read; - if this.buf.len() == MAX_CHUNK_LENGTH as usize { - // we have a full buf, ready to send - break; - } - - // we don't have a full buf, packet is empty; try receive more - if let Some(p) = futures::ready!(this.reader.as_mut().poll_next(cx)) { - *this.packet = p; - *this.pos = 0; - // if buf is empty, we will loop and return the error directly. If buf - // isn't empty, send it before by breaking. - if this.packet.is_err() && !this.buf.is_empty() { - break; - } - } else { - *this.eos = true; - break; - } - } - - let mut body = [0; MAX_CHUNK_LENGTH as usize]; - let len = this.buf.len(); - body[..len].copy_from_slice(this.buf); - this.buf.clear(); - Poll::Ready(Some(DataFrame::Data { - data: body, - len, - may_have_more: !*this.eos, - })) - } -} - -struct SendQueue { - items: VecDeque<(u8, VecDeque)>, + data: ByteStreamReader, } impl SendQueue { @@ -232,35 +93,69 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> { let mut ready_item = None; for (j, item) in items_at_prio.iter_mut().enumerate() { - match Pin::new(&mut item.data).poll_next(ctx) { + let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); + match Pin::new(&mut item_reader).poll(ctx) { Poll::Pending => (), Poll::Ready(ready_v) => { - ready_item = Some((j, ready_v)); + ready_item = Some((j, ready_v, item.data.eos())); break; } } } - if let Some((j, ready_v)) = ready_item { + if let Some((j, bytes_or_err, eos)) = ready_item { + let data_frame = match bytes_or_err { + Ok(bytes) => DataFrame::Data(bytes, !eos), + Err(e) => DataFrame::Error(match e { + ReadExactError::Stream(code) => code, + _ => unreachable!(), + }), + }; let item = items_at_prio.remove(j).unwrap(); let id = item.id; - if ready_v - .as_ref() - .map(|data| data.may_have_more()) - .unwrap_or(false) - { + if !eos { items_at_prio.push_back(item); } else if items_at_prio.is_empty() { self.queue.items.remove(i); } - return Poll::Ready((id, ready_v.unwrap_or_else(DataFrame::empty_last))); + return Poll::Ready((id, data_frame)); } } - // TODO what do we do if self.queue is empty? We won't get scheduled again. + // If the queue is empty, this futures is eternally pending. + // This is ok because we use it in a select with another future + // that can interrupt it. Poll::Pending } } +enum DataFrame { + /// a fixed size buffer containing some data + a boolean indicating whether + /// there may be more data comming from this stream. Can be used for some + /// optimization. It's an error to set it to false if there is more data, but it is correct + /// (albeit sub-optimal) to set it to true if there is nothing coming after + Data(Bytes, bool), + /// An error code automatically signals the end of the stream + Error(u8), +} + +impl DataFrame { + fn header(&self) -> [u8; 2] { + let header_u16 = match self { + DataFrame::Data(data, false) => data.len() as u16, + DataFrame::Data(data, true) => data.len() as u16 | CHUNK_HAS_CONTINUATION, + DataFrame::Error(e) => *e as u16 | ERROR_MARKER, + }; + ChunkLength::to_be_bytes(header_u16) + } + + fn data(&self) -> &[u8] { + match self { + DataFrame::Data(ref data, _) => &data[..], + DataFrame::Error(_) => &[], + } + } +} + /// The SendLoop trait, which is implemented both by the client and the server /// connection objects (ServerConna and ClientConn) adds a method `.send_loop()` /// that takes a channel of messages to send and an asynchronous writer, @@ -295,7 +190,7 @@ pub(crate) trait SendLoop: Sync { sending.push(SendQueueItem { id, prio, - data: data.into(), + data: ByteStreamReader::new(data), }); } else { should_exit = true; diff --git a/src/stream.rs b/src/stream.rs index 6c23f4a..ae57d62 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -82,6 +82,23 @@ impl ByteStreamReader { } } + pub fn take_buffer(&mut self) -> Bytes { + let bytes = Bytes::from( + self .buf + .iter() + .map(|x| &x[..]) + .collect::>() + .concat(), + ); + self.buf.clear(); + self.buf_len = 0; + bytes + } + + pub fn eos(&self) -> bool { + self.buf.is_empty() && self.eos + } + fn try_get(&mut self, read_len: usize) -> Option { if self.buf_len >= read_len { let mut slices = Vec::with_capacity(self.buf.len()); @@ -144,17 +161,7 @@ impl<'a> Future for ByteStreamReadExact<'a> { if *this.fail_on_eos { return Poll::Ready(Err(ReadExactError::UnexpectedEos)); } else { - let bytes = Bytes::from( - this.reader - .buf - .iter() - .map(|x| &x[..]) - .collect::>() - .concat(), - ); - this.reader.buf.clear(); - this.reader.buf_len = 0; - return Poll::Ready(Ok(bytes)); + return Poll::Ready(Ok(this.reader.take_buffer())); } }