add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
6 changed files with 17 additions and 20 deletions
Showing only changes of commit 26989bba14 - Show all commits

16
Cargo.lock generated
View file

@ -76,15 +76,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "0.6.0" version = "1.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e"
[[package]]
name = "bytes"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8"
[[package]] [[package]]
name = "cc" name = "cc"
@ -459,7 +453,7 @@ version = "0.4.4"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",
"bytes 0.6.0", "bytes",
"cfg-if", "cfg-if",
"chrono", "chrono",
"env_logger 0.8.4", "env_logger 0.8.4",
@ -924,7 +918,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc"
dependencies = [ dependencies = [
"autocfg", "autocfg",
"bytes 1.1.0", "bytes",
"libc", "libc",
"memchr", "memchr",
"mio", "mio",
@ -964,7 +958,7 @@ version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd"
dependencies = [ dependencies = [
"bytes 1.1.0", "bytes",
"futures-core", "futures-core",
"futures-io", "futures-io",
"futures-sink", "futures-sink",

View file

@ -36,7 +36,7 @@ log = "0.4.8"
arc-swap = "1.1" arc-swap = "1.1"
async-trait = "0.1.7" async-trait = "0.1.7"
err-derive = "0.2.3" err-derive = "0.2.3"
bytes = "0.6.0" bytes = "1.2"
lru = { version = "0.6", optional = true } lru = { version = "0.6", optional = true }
cfg-if = "1.0" cfg-if = "1.0"

View file

@ -192,8 +192,8 @@ impl Framing {
// required because otherwise the borrow-checker complains // required because otherwise the borrow-checker complains
let Framing { direct, stream } = self; let Framing { direct, stream } = self;
let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec()) }) let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec().into()) })
.chain(stream::once(async move { Ok(direct) })); .chain(stream::once(async move { Ok(direct.into()) }));
if let Some(stream) = stream { if let Some(stream) = stream {
Box::pin(res.chain(stream)) Box::pin(res.chain(stream))
@ -217,7 +217,7 @@ impl Framing {
let mut len = [0; 4]; let mut len = [0; 4];
len.copy_from_slice(&packet[..4]); len.copy_from_slice(&packet[..4]);
let len = u32::from_be_bytes(len); let len = u32::from_be_bytes(len);
packet.drain(..4); packet = packet.slice(4..);
let mut buffer = Vec::new(); let mut buffer = Vec::new();
let len = len as usize; let len = len as usize;
@ -226,7 +226,7 @@ impl Framing {
buffer.extend_from_slice(&packet[..max_cp]); buffer.extend_from_slice(&packet[..max_cp]);
if buffer.len() == len { if buffer.len() == len {
packet.drain(..max_cp); packet = packet.slice(max_cp..);
break; break;
} }
packet = stream packet = stream

View file

@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use bytes::Bytes;
use log::trace; use log::trace;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
@ -85,7 +86,7 @@ pub(crate) trait RecvLoop: Sync + 'static {
let mut next_slice = vec![0; size as usize]; let mut next_slice = vec![0; size as usize];
read.read_exact(&mut next_slice[..]).await?; read.read_exact(&mut next_slice[..]).await?;
trace!("recv_loop: read {} bytes", next_slice.len()); trace!("recv_loop: read {} bytes", next_slice.len());
Ok(next_slice) Ok(Bytes::from(next_slice))
}; };
let mut sender = if let Some(send) = streams.remove(&(id)) { let mut sender = if let Some(send) = streams.remove(&(id)) {

View file

@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc; use std::sync::Arc;
use std::task::{Context, Poll}; use std::task::{Context, Poll};
use bytes::Bytes;
use async_trait::async_trait; use async_trait::async_trait;
use log::trace; use log::trace;
@ -49,7 +50,7 @@ impl From<ByteStream> for DataReader {
fn from(data: ByteStream) -> DataReader { fn from(data: ByteStream) -> DataReader {
DataReader { DataReader {
reader: data, reader: data,
packet: Ok(Vec::new()), packet: Ok(Bytes::new()),
pos: 0, pos: 0,
buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize),
eos: false, eos: false,
@ -130,7 +131,7 @@ impl Stream for DataReader {
Ok(v) => v, Ok(v) => v,
Err(e) => { Err(e) => {
let e = *e; let e = *e;
*this.packet = Ok(Vec::new()); *this.packet = Ok(Bytes::new());
*this.eos = true; *this.eos = true;
return Poll::Ready(Some(DataFrame::Error(e))); return Poll::Ready(Some(DataFrame::Error(e)));
} }

View file

@ -4,6 +4,7 @@ use std::pin::Pin;
use log::info; use log::info;
use serde::Serialize; use serde::Serialize;
use bytes::Bytes;
use futures::Stream; use futures::Stream;
use tokio::sync::watch; use tokio::sync::watch;
@ -27,7 +28,7 @@ pub type NetworkKey = sodiumoxide::crypto::auth::Key;
/// meaning, it's up to your application to define their semantic. /// meaning, it's up to your application to define their semantic.
pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send>>; pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send>>;
pub type Packet = Result<Vec<u8>, u8>; pub type Packet = Result<Bytes, u8>;
/// Utility function: encodes any serializable value in MessagePack binary format /// Utility function: encodes any serializable value in MessagePack binary format
/// using the RMP library. /// using the RMP library.