Conversion between ByteStream and AsyncRead

This commit is contained in:
Alex 2022-07-22 16:42:26 +02:00
parent a5e5fd0408
commit ab80ade4f0
Signed by untrusted user: lx
GPG key ID: 0E496D15096376BE
3 changed files with 52 additions and 3 deletions

2
Cargo.lock generated
View file

@ -449,7 +449,7 @@ dependencies = [
[[package]] [[package]]
name = "netapp" name = "netapp"
version = "0.4.4" version = "0.5.0"
dependencies = [ dependencies = [
"arc-swap", "arc-swap",
"async-trait", "async-trait",

View file

@ -23,7 +23,7 @@ telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"]
futures = "0.3.17" futures = "0.3.17"
pin-project = "1.0.10" pin-project = "1.0.10"
tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros", "io-util", "signal"] } tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros", "io-util", "signal"] }
tokio-util = { version = "0.6.8", default-features = false, features = ["compat"] } tokio-util = { version = "0.6.8", default-features = false, features = ["compat", "io"] }
tokio-stream = "0.1.7" tokio-stream = "0.1.7"
serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } serde = { version = "1.0", default-features = false, features = ["derive", "rc"] }

View file

@ -5,7 +5,8 @@ use std::task::{Context, Poll};
use bytes::Bytes; use bytes::Bytes;
use futures::Future; use futures::Future;
use futures::{Stream, StreamExt}; use futures::{Stream, StreamExt, TryStreamExt};
use tokio::io::AsyncRead;
/// A stream of associated data. /// A stream of associated data.
/// ///
@ -18,6 +19,8 @@ pub type ByteStream = Pin<Box<dyn Stream<Item = Packet> + Send + Sync>>;
pub type Packet = Result<Bytes, u8>; pub type Packet = Result<Bytes, u8>;
// ----
pub struct ByteStreamReader { pub struct ByteStreamReader {
stream: ByteStream, stream: ByteStream,
buf: VecDeque<Bytes>, buf: VecDeque<Bytes>,
@ -175,3 +178,49 @@ impl<'a> Future for ByteStreamReadExact<'a> {
} }
} }
} }
// ----
fn u8_to_io_error(v: u8) -> std::io::Error {
use std::io::{Error, ErrorKind};
let kind = match v {
101 => ErrorKind::ConnectionAborted,
102 => ErrorKind::BrokenPipe,
103 => ErrorKind::WouldBlock,
104 => ErrorKind::InvalidInput,
105 => ErrorKind::InvalidData,
106 => ErrorKind::TimedOut,
107 => ErrorKind::Interrupted,
108 => ErrorKind::UnexpectedEof,
109 => ErrorKind::OutOfMemory,
110 => ErrorKind::ConnectionReset,
_ => ErrorKind::Other,
};
Error::new(kind, "(in netapp stream)")
}
fn io_error_to_u8(e: std::io::Error) -> u8 {
use std::io::{ErrorKind};
match e.kind() {
ErrorKind::ConnectionAborted => 101,
ErrorKind::BrokenPipe => 102,
ErrorKind::WouldBlock => 103,
ErrorKind::InvalidInput => 104,
ErrorKind::InvalidData => 105,
ErrorKind::TimedOut => 106,
ErrorKind::Interrupted => 107,
ErrorKind::UnexpectedEof => 108,
ErrorKind::OutOfMemory => 109,
ErrorKind::ConnectionReset => 110,
_ => 100,
}
}
pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> ByteStream {
Box::pin(tokio_util::io::ReaderStream::new(reader)
.map_err(io_error_to_u8))
}
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error))
}