add streaming body to requests and responses #3
4 changed files with 85 additions and 41 deletions
|
@ -190,7 +190,7 @@ impl ClientConn {
|
||||||
#[cfg(feature = "telemetry")]
|
#[cfg(feature = "telemetry")]
|
||||||
span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64));
|
span.set_attribute(KeyValue::new("len_query_msg", req_msg_len as i64));
|
||||||
|
|
||||||
query_send.send((id, prio, req_stream))?;
|
query_send.send((id, prio, req_order, req_stream))?;
|
||||||
|
|
||||||
cfg_if::cfg_if! {
|
cfg_if::cfg_if! {
|
||||||
if #[cfg(feature = "telemetry")] {
|
if #[cfg(feature = "telemetry")] {
|
||||||
|
|
|
@ -44,7 +44,7 @@ pub const PRIO_SECONDARY: RequestPriority = 0x01;
|
||||||
#[derive(Clone, Copy)]
|
#[derive(Clone, Copy)]
|
||||||
pub struct OrderTagStream(u64);
|
pub struct OrderTagStream(u64);
|
||||||
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
|
#[derive(Clone, Copy, Serialize, Deserialize, Debug)]
|
||||||
pub struct OrderTag(u64, u64);
|
pub struct OrderTag(pub(crate) u64, pub(crate) u64);
|
||||||
|
|
||||||
impl OrderTag {
|
impl OrderTag {
|
||||||
pub fn stream() -> OrderTagStream {
|
pub fn stream() -> OrderTagStream {
|
||||||
|
|
120
src/send.rs
120
src/send.rs
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::VecDeque;
|
use std::collections::{HashMap, VecDeque};
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::task::{Context, Poll};
|
use std::task::{Context, Poll};
|
||||||
|
@ -7,7 +7,7 @@ use async_trait::async_trait;
|
||||||
use bytes::{BufMut, Bytes, BytesMut};
|
use bytes::{BufMut, Bytes, BytesMut};
|
||||||
use log::*;
|
use log::*;
|
||||||
|
|
||||||
use futures::AsyncWriteExt;
|
use futures::{AsyncWriteExt, Future};
|
||||||
use kuska_handshake::async_std::BoxStreamWrite;
|
use kuska_handshake::async_std::BoxStreamWrite;
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
|
@ -36,15 +36,21 @@ pub(crate) const ERROR_MARKER: ChunkLength = 0x4000;
|
||||||
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
|
pub(crate) const CHUNK_HAS_CONTINUATION: ChunkLength = 0x8000;
|
||||||
pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF;
|
pub(crate) const CHUNK_LENGTH_MASK: ChunkLength = 0x3FFF;
|
||||||
|
|
||||||
pub(crate) type SendStream = (RequestID, RequestPriority, ByteStream);
|
pub(crate) type SendStream = (RequestID, RequestPriority, Option<OrderTag>, ByteStream);
|
||||||
|
|
||||||
struct SendQueue {
|
struct SendQueue {
|
||||||
items: Vec<(u8, VecDeque<SendQueueItem>)>,
|
items: Vec<(u8, SendQueuePriority)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
struct SendQueuePriority {
|
||||||
|
items: VecDeque<SendQueueItem>,
|
||||||
|
order: HashMap<u64, VecDeque<u64>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
struct SendQueueItem {
|
struct SendQueueItem {
|
||||||
id: RequestID,
|
id: RequestID,
|
||||||
prio: RequestPriority,
|
prio: RequestPriority,
|
||||||
|
order_tag: Option<OrderTag>,
|
||||||
data: ByteStreamReader,
|
data: ByteStreamReader,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -59,11 +65,11 @@ impl SendQueue {
|
||||||
let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) {
|
let pos_prio = match self.items.binary_search_by(|(p, _)| p.cmp(&prio)) {
|
||||||
Ok(i) => i,
|
Ok(i) => i,
|
||||||
Err(i) => {
|
Err(i) => {
|
||||||
self.items.insert(i, (prio, VecDeque::new()));
|
self.items.insert(i, (prio, SendQueuePriority::new()));
|
||||||
i
|
i
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
self.items[pos_prio].1.push_back(item);
|
self.items[pos_prio].1.push(item);
|
||||||
}
|
}
|
||||||
fn is_empty(&self) -> bool {
|
fn is_empty(&self) -> bool {
|
||||||
self.items.iter().all(|(_k, v)| v.is_empty())
|
self.items.iter().all(|(_k, v)| v.is_empty())
|
||||||
|
@ -75,6 +81,69 @@ impl SendQueue {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
impl SendQueuePriority {
|
||||||
|
fn new() -> Self {
|
||||||
|
Self {
|
||||||
|
items: VecDeque::new(),
|
||||||
|
order: HashMap::new(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
fn push(&mut self, item: SendQueueItem) {
|
||||||
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
|
let order_vec = self.order.entry(stream).or_default();
|
||||||
|
let i = order_vec.iter().take_while(|o2| **o2 < order).count();
|
||||||
|
order_vec.insert(i, order);
|
||||||
|
}
|
||||||
|
self.items.push_back(item);
|
||||||
|
}
|
||||||
|
fn is_empty(&self) -> bool {
|
||||||
|
self.items.is_empty()
|
||||||
|
}
|
||||||
|
fn poll_next_ready(&mut self, ctx: &mut Context<'_>) -> Poll<(RequestID, DataFrame)> {
|
||||||
|
for (j, item) in self.items.iter_mut().enumerate() {
|
||||||
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
|
if order > *self.order.get(&stream).unwrap().front().unwrap() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
||||||
|
if let Poll::Ready(bytes_or_err) = Pin::new(&mut item_reader).poll(ctx) {
|
||||||
|
let id = item.id;
|
||||||
|
let eos = item.data.eos();
|
||||||
|
|
||||||
|
let packet = bytes_or_err.map_err(|e| match e {
|
||||||
|
ReadExactError::Stream(err) => err,
|
||||||
|
_ => unreachable!(),
|
||||||
|
});
|
||||||
|
|
||||||
|
if eos || packet.is_err() {
|
||||||
|
if let Some(OrderTag(stream, order)) = item.order_tag {
|
||||||
|
assert_eq!(
|
||||||
|
self.order.get_mut(&stream).unwrap().pop_front(),
|
||||||
|
Some(order)
|
||||||
|
)
|
||||||
|
}
|
||||||
|
self.items.remove(j);
|
||||||
|
}
|
||||||
|
|
||||||
|
let data_frame = DataFrame::from_packet(packet, !eos);
|
||||||
|
|
||||||
|
return Poll::Ready((id, data_frame));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
fn dump(&self, prio: u8) -> String {
|
||||||
|
self.items
|
||||||
|
.iter()
|
||||||
|
.map(|i| format!("[{} {} {:?}]", prio, i.id, i.order_tag))
|
||||||
|
.collect::<Vec<_>>()
|
||||||
|
.join(" ")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
struct SendQueuePollNextReady<'a> {
|
struct SendQueuePollNextReady<'a> {
|
||||||
queue: &'a mut SendQueue,
|
queue: &'a mut SendQueue,
|
||||||
}
|
}
|
||||||
|
@ -84,37 +153,11 @@ impl<'a> futures::Future for SendQueuePollNextReady<'a> {
|
||||||
|
|
||||||
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
|
fn poll(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
|
||||||
for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() {
|
for (i, (_prio, items_at_prio)) in self.queue.items.iter_mut().enumerate() {
|
||||||
let mut ready_item = None;
|
if let Poll::Ready(res) = items_at_prio.poll_next_ready(ctx) {
|
||||||
for (j, item) in items_at_prio.iter_mut().enumerate() {
|
if items_at_prio.is_empty() {
|
||||||
let mut item_reader = item.data.read_exact_or_eos(MAX_CHUNK_LENGTH as usize);
|
|
||||||
match Pin::new(&mut item_reader).poll(ctx) {
|
|
||||||
Poll::Pending => (),
|
|
||||||
Poll::Ready(ready_v) => {
|
|
||||||
ready_item = Some((j, ready_v));
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some((j, bytes_or_err)) = ready_item {
|
|
||||||
let item = items_at_prio.remove(j).unwrap();
|
|
||||||
let id = item.id;
|
|
||||||
let eos = item.data.eos();
|
|
||||||
|
|
||||||
let packet = bytes_or_err.map_err(|e| match e {
|
|
||||||
ReadExactError::Stream(err) => err,
|
|
||||||
_ => unreachable!(),
|
|
||||||
});
|
|
||||||
|
|
||||||
let data_frame = DataFrame::from_packet(packet, !eos);
|
|
||||||
|
|
||||||
if !eos && !matches!(data_frame, DataFrame::Error(_)) {
|
|
||||||
items_at_prio.push_back(item);
|
|
||||||
} else if items_at_prio.is_empty() {
|
|
||||||
self.queue.items.remove(i);
|
self.queue.items.remove(i);
|
||||||
}
|
}
|
||||||
|
return Poll::Ready(res);
|
||||||
return Poll::Ready((id, data_frame));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// If the queue is empty, this futures is eternally pending.
|
// If the queue is empty, this futures is eternally pending.
|
||||||
|
@ -200,8 +243,7 @@ pub(crate) trait SendLoop: Sync {
|
||||||
sending
|
sending
|
||||||
.items
|
.items
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, i)| i.iter().map(|x| x.id))
|
.map(|(prio, i)| i.dump(*prio))
|
||||||
.flatten()
|
|
||||||
.collect::<Vec<_>>()
|
.collect::<Vec<_>>()
|
||||||
);
|
);
|
||||||
|
|
||||||
|
@ -217,12 +259,14 @@ pub(crate) trait SendLoop: Sync {
|
||||||
// recv_fut is cancellation-safe according to tokio doc,
|
// recv_fut is cancellation-safe according to tokio doc,
|
||||||
// send_fut is cancellation-safe as implemented above?
|
// send_fut is cancellation-safe as implemented above?
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
|
biased; // always read incomming channel first if it has data
|
||||||
sth = recv_fut => {
|
sth = recv_fut => {
|
||||||
if let Some((id, prio, data)) = sth {
|
if let Some((id, prio, order_tag, data)) = sth {
|
||||||
trace!("send_loop: add stream {} to send", id);
|
trace!("send_loop: add stream {} to send", id);
|
||||||
sending.push(SendQueueItem {
|
sending.push(SendQueueItem {
|
||||||
id,
|
id,
|
||||||
prio,
|
prio,
|
||||||
|
order_tag,
|
||||||
data: ByteStreamReader::new(data),
|
data: ByteStreamReader::new(data),
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -186,7 +186,7 @@ impl RecvLoop for ServerConn {
|
||||||
|
|
||||||
let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result);
|
let (resp_stream, resp_order) = RespEnc::encode(resp_enc_result);
|
||||||
resp_send
|
resp_send
|
||||||
.send((id, prio, resp_stream))
|
.send((id, prio, resp_order, resp_stream))
|
||||||
.log_err("ServerConn recv_handler send resp bytes");
|
.log_err("ServerConn recv_handler send resp bytes");
|
||||||
Ok::<_, Error>(())
|
Ok::<_, Error>(())
|
||||||
});
|
});
|
||||||
|
|
Loading…
Reference in a new issue