add streaming body to requests and responses #3

Merged
lx merged 64 commits from stream-body into main 2022-09-13 10:56:54 +00:00
4 changed files with 56 additions and 46 deletions
Showing only changes of commit 745c786184 - Show all commits

View file

@ -109,3 +109,39 @@ where
}
}
}
// ---- Helpers for serializing I/O Errors
pub(crate) fn u8_to_io_errorkind(v: u8) -> std::io::ErrorKind {
use std::io::ErrorKind;
match v {
101 => ErrorKind::ConnectionAborted,
102 => ErrorKind::BrokenPipe,
103 => ErrorKind::WouldBlock,
104 => ErrorKind::InvalidInput,
105 => ErrorKind::InvalidData,
106 => ErrorKind::TimedOut,
107 => ErrorKind::Interrupted,
108 => ErrorKind::UnexpectedEof,
109 => ErrorKind::OutOfMemory,
110 => ErrorKind::ConnectionReset,
_ => ErrorKind::Other,
}
}
pub(crate) fn io_errorkind_to_u8(kind: std::io::ErrorKind) -> u8 {
use std::io::ErrorKind;
match kind {
ErrorKind::ConnectionAborted => 101,
ErrorKind::BrokenPipe => 102,
ErrorKind::WouldBlock => 103,
ErrorKind::InvalidInput => 104,
ErrorKind::InvalidData => 105,
ErrorKind::TimedOut => 106,
ErrorKind::Interrupted => 107,
ErrorKind::UnexpectedEof => 108,
ErrorKind::OutOfMemory => 109,
ErrorKind::ConnectionReset => 110,
_ => 100,
}
}

View file

@ -81,9 +81,10 @@ pub(crate) trait RecvLoop: Sync + 'static {
read.read_exact(&mut next_slice[..]).await?;
let packet = if is_error {
let msg = String::from_utf8(next_slice).unwrap_or("<invalid utf8 error message>".into());
debug!("recv_loop: got id {}, error: {}", id, msg);
Some(Err(std::io::Error::new(std::io::ErrorKind::Other, msg)))
let kind = u8_to_io_errorkind(next_slice[0]);
let msg = std::str::from_utf8(&next_slice[1..]).unwrap_or("<invalid utf8 error message>");
debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg);
Some(Err(std::io::Error::new(kind, msg.to_string())))
} else {
trace!(
"recv_loop: got id {}, size {}, has_cont {}",

View file

@ -4,7 +4,7 @@ use std::sync::Arc;
use std::task::{Context, Poll};
use async_trait::async_trait;
use bytes::Bytes;
use bytes::{Bytes, BytesMut, BufMut};
use log::*;
use futures::AsyncWriteExt;
@ -22,7 +22,11 @@ use crate::stream::*;
// CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream
// ERROR_MARKER if this chunk denotes an error
// (these two flags are exclusive, an error denotes the end of the stream)
// - [u8; chunk_length] chunk data / error message
// - [u8; chunk_length], either
// - if not error: chunk data
// - if error:
// - u8: error kind, encoded using error::io_errorkind_to_u8
// - rest: error message
pub(crate) type RequestID = u32;
pub(crate) type ChunkLength = u16;
@ -136,12 +140,17 @@ impl DataFrame {
Self::Data(bytes, has_cont)
}
Err(e) => {
let msg = format!("{}", e);
let mut msg = Bytes::from(msg.into_bytes());
if msg.len() > MAX_CHUNK_LENGTH as usize {
msg = msg.slice(..MAX_CHUNK_LENGTH as usize);
let mut buf = BytesMut::new();
buf.put_u8(io_errorkind_to_u8(e.kind()));
let msg = format!("{}", e).into_bytes();
if msg.len() > (MAX_CHUNK_LENGTH - 1) as usize {
buf.put(&msg[..(MAX_CHUNK_LENGTH - 1) as usize]);
} else {
buf.put(&msg[..]);
}
Self::Error(msg)
Self::Error(buf.freeze())
}
}
}

View file

@ -150,42 +150,6 @@ impl<'a> Future for ByteStreamReadExact<'a> {
// ----
/*
fn u8_to_io_error(v: u8) -> std::io::Error {
use std::io::{Error, ErrorKind};
let kind = match v {
101 => ErrorKind::ConnectionAborted,
102 => ErrorKind::BrokenPipe,
103 => ErrorKind::WouldBlock,
104 => ErrorKind::InvalidInput,
105 => ErrorKind::InvalidData,
106 => ErrorKind::TimedOut,
107 => ErrorKind::Interrupted,
108 => ErrorKind::UnexpectedEof,
109 => ErrorKind::OutOfMemory,
110 => ErrorKind::ConnectionReset,
_ => ErrorKind::Other,
};
Error::new(kind, "(in netapp stream)")
}
fn io_error_to_u8(e: std::io::Error) -> u8 {
use std::io::ErrorKind;
match e.kind() {
ErrorKind::ConnectionAborted => 101,
ErrorKind::BrokenPipe => 102,
ErrorKind::WouldBlock => 103,
ErrorKind::InvalidInput => 104,
ErrorKind::InvalidData => 105,
ErrorKind::TimedOut => 106,
ErrorKind::Interrupted => 107,
ErrorKind::UnexpectedEof => 108,
ErrorKind::OutOfMemory => 109,
ErrorKind::ConnectionReset => 110,
_ => 100,
}
}
*/
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
Box::pin(tokio_util::io::ReaderStream::new(reader))