add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
5 changed files with 63 additions and 26 deletions
Showing only changes of commit 74e57016f6 - Show all commits

View file

@ -179,10 +179,9 @@ impl ClientConn {
}))); })));
} }
trace!( debug!(
"request: query_send {} (serialized message: {} bytes)", "request: query_send {}, path {}, prio {} (serialized message: {} bytes)",
id, id, path, prio, req_msg_len
req_msg_len
); );
#[cfg(feature = "telemetry")] #[cfg(feature = "telemetry")]
@ -201,7 +200,7 @@ impl ClientConn {
} }
let resp_enc = RespEnc::decode(stream).await?; let resp_enc = RespEnc::decode(stream).await?;
trace!("request response {}", id); debug!("client: got response to request {} (path {})", id, path);
Resp::from_enc(resp_enc) Resp::from_enc(resp_enc)
} }
} }

View file

@ -59,7 +59,6 @@ pub(crate) trait RecvLoop: Sync + 'static {
{ {
let mut streams: HashMap<RequestID, Sender> = HashMap::new(); let mut streams: HashMap<RequestID, Sender> = HashMap::new();
loop { loop {
trace!("recv_loop: reading packet");
let mut header_id = [0u8; RequestID::BITS as usize / 8]; let mut header_id = [0u8; RequestID::BITS as usize / 8];
match read.read_exact(&mut header_id[..]).await { match read.read_exact(&mut header_id[..]).await {
Ok(_) => (), Ok(_) => (),
@ -67,22 +66,31 @@ pub(crate) trait RecvLoop: Sync + 'static {
Err(e) => return Err(e.into()), Err(e) => return Err(e.into()),
}; };
let id = RequestID::from_be_bytes(header_id); let id = RequestID::from_be_bytes(header_id);
trace!("recv_loop: got header id: {:04x}", id);
let mut header_size = [0u8; ChunkLength::BITS as usize / 8]; let mut header_size = [0u8; ChunkLength::BITS as usize / 8];
read.read_exact(&mut header_size[..]).await?; read.read_exact(&mut header_size[..]).await?;
let size = ChunkLength::from_be_bytes(header_size); let size = ChunkLength::from_be_bytes(header_size);
trace!("recv_loop: got header size: {:04x}", size);
let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0; let has_cont = (size & CHUNK_HAS_CONTINUATION) != 0;
let is_error = (size & ERROR_MARKER) != 0; let is_error = (size & ERROR_MARKER) != 0;
let packet = if is_error { let packet = if is_error {
trace!(
"recv_loop: got id {}, header_size {:04x}, error {}",
id,
size,
size & !ERROR_MARKER
);
Err((size & !ERROR_MARKER) as u8) Err((size & !ERROR_MARKER) as u8)
} else { } else {
let size = size & !CHUNK_HAS_CONTINUATION; let size = size & !CHUNK_HAS_CONTINUATION;
let mut next_slice = vec![0; size as usize]; let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?; read.read_exact(&mut next_slice[..]).await?;
trace!("recv_loop: read {} bytes", next_slice.len()); trace!(
"recv_loop: got id {}, header_size {:04x}, {} bytes",
id,
size,
next_slice.len()
);
Ok(Bytes::from(next_slice)) Ok(Bytes::from(next_slice))
}; };
@ -90,6 +98,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
send send
} else { } else {
let (send, recv) = mpsc::channel(4); let (send, recv) = mpsc::channel(4);
trace!("recv_loop: id {} is new channel", id);
self.recv_handler( self.recv_handler(
id, id,
Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)), Box::pin(tokio_stream::wrappers::ReceiverStream::new(recv)),
@ -102,8 +111,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
let _ = sender.send(packet).await; let _ = sender.send(packet).await;
if has_cont { if has_cont {
assert!(!is_error);
streams.insert(id, sender); streams.insert(id, sender);
} else { } else {
trace!("recv_loop: close channel id {}", id);
sender.end(); sender.end();
} }
} }

View file

@ -74,36 +74,54 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
type Output = (RequestID, DataFrame); type Output = (RequestID, DataFrame);
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> { fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
for i in 0..self.queue.items.len() { for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() {
let (_prio, items_at_prio) = &mut self.queue.items[i];
let mut ready_item = None; let mut ready_item = None;
for (j, item) in items_at_prio.iter_mut().enumerate() { for (j, item) in items_at_prio.iter_mut().enumerate() {
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize); let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
match Pin::new(&mut item_reader).poll(ctx) { match Pin::new(&mut item_reader).poll(ctx) {
Poll::Pending => (), Poll::Pending => (),
Poll::Ready(ready_v) => { Poll::Ready(ready_v) => {
ready_item = Some((j, ready_v, item.data.eos())); ready_item = Some((j, ready_v));
break; break;
} }
} }
} }
if let Some((j, bytes_or_err, eos)) = ready_item { if let Some((j, bytes_or_err)) = ready_item {
let item = items_at_prio.remove(j).unwrap();
let id = item.id;
let eos = item.data.eos();
let data_frame = match bytes_or_err { let data_frame = match bytes_or_err {
Ok(bytes) => DataFrame::Data(bytes, !eos), Ok(bytes) => {
trace!(
"send queue poll next ready: id {} eos {:?} bytes {}",
id,
eos,
bytes.len()
);
DataFrame::Data(bytes, !eos)
}
Err(e) => DataFrame::Error(match e { Err(e) => DataFrame::Error(match e {
ReadExactError::Stream(code) => code, ReadExactError::Stream(code) => {
trace!(
"send queue poll next ready: id {} eos {:?} ERROR {}",
id,
eos,
code
);
code
}
_ => unreachable!(), _ => unreachable!(),
}), }),
}; };
let item = items_at_prio.remove(j).unwrap();
let id = item.id; if !eos && !matches!(data_frame, DataFrame::Error(_)) {
if !eos {
items_at_prio.push_back(item); items_at_prio.push_back(item);
} else if items_at_prio.is_empty() { } else if items_at_prio.is_empty() {
self.queue.items.remove(i); self.queue.items.remove(i);
} }
return Poll::Ready((id, data_frame)); return Poll::Ready((id, data_frame));
} }
} }
@ -173,6 +191,7 @@ pub(crate) trait SendLoop: Sync {
match futures::future::select(recv_fut, send_fut).await { match futures::future::select(recv_fut, send_fut).await {
Either::Left((sth, _send_fut)) => { Either::Left((sth, _send_fut)) => {
if let Some((id, prio, data)) = sth { if let Some((id, prio, data)) = sth {
trace!("send_loop: add stream {} to send", id);
sending.push(SendQueueItem { sending.push(SendQueueItem {
id, id,
prio, prio,
@ -183,7 +202,12 @@ pub(crate) trait SendLoop: Sync {
}; };
} }
Either::Right(((id, data), _recv_fut)) => { Either::Right(((id, data), _recv_fut)) => {
trace!("send_loop: sending bytes for {}", id); trace!(
"send_loop: id {}, send {} bytes, header_size {}",
id,
data.data().len(),
hex::encode(data.header())
);
let header_id = RequestID::to_be_bytes(id); let header_id = RequestID::to_be_bytes(id);
write.write_all(&header_id[..]).await?; write.write_all(&header_id[..]).await?;

View file

@ -3,7 +3,7 @@ use std::sync::Arc;
use arc_swap::ArcSwapOption; use arc_swap::ArcSwapOption;
use async_trait::async_trait; use async_trait::async_trait;
use log::{debug, trace}; use log::*;
use futures::io::{AsyncReadExt, AsyncWriteExt}; use futures::io::{AsyncReadExt, AsyncWriteExt};
use kuska_handshake::async_std::{handshake_server, BoxStream}; use kuska_handshake::async_std::{handshake_server, BoxStream};
@ -175,7 +175,8 @@ impl RecvLoop for ServerConn {
let self2 = self.clone(); let self2 = self.clone();
tokio::spawn(async move { tokio::spawn(async move {
trace!("ServerConn recv_handler {}", id); debug!("server: recv_handler got {}", id);
let (prio, resp_enc) = match ReqEnc::decode(stream).await { let (prio, resp_enc) = match ReqEnc::decode(stream).await {
Ok(req_enc) => { Ok(req_enc) => {
let prio = req_enc.prio; let prio = req_enc.prio;
@ -192,7 +193,7 @@ impl RecvLoop for ServerConn {
Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)), Err(e) => (PRIO_NORMAL, RespEnc::from_err(e)),
}; };
trace!("ServerConn sending response to {}: ", id); debug!("server: sending response to {}", id);
resp_send resp_send
.send((id, prio, resp_enc.encode())) .send((id, prio, resp_enc.encode()))

View file

@ -93,7 +93,7 @@ impl ByteStreamReader {
} }
pub fn eos(&self) -> bool { pub fn eos(&self) -> bool {
self.buf.is_empty() && self.eos self.buf_len == 0 && self.eos
} }
fn try_get(&mut self, read_len: usize) -> Option<Bytes> { fn try_get(&mut self, read_len: usize) -> Option<Bytes> {
@ -164,8 +164,10 @@ impl<'a> Future for ByteStreamReadExact<'a> {
match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) { match futures::ready!(this.reader.stream.as_mut().poll_next(cx)) {
Some(Ok(slice)) => { Some(Ok(slice)) => {
this.reader.buf_len += slice.len(); if !slice.is_empty() {
this.reader.buf.push_back(slice); this.reader.buf_len += slice.len();
this.reader.buf.push_back(slice);
}
} }
Some(Err(e)) => { Some(Err(e)) => {
this.reader.err = Some(e); this.reader.err = Some(e);