From ab80ade4f0034cbdcf15a99c674730f85eb06870 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Fri, 22 Jul 2022 16:42:26 +0200 Subject: [PATCH] Conversion between ByteStream and AsyncRead --- Cargo.lock | 2 +- Cargo.toml | 2 +- src/stream.rs | 51 ++++++++++++++++++++++++++++++++++++++++++++++++++- 3 files changed, 52 insertions(+), 3 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2312190..692b2e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -449,7 +449,7 @@ dependencies = [ [[package]] name = "netapp" -version = "0.4.4" +version = "0.5.0" dependencies = [ "arc-swap", "async-trait", diff --git a/Cargo.toml b/Cargo.toml index dd05d8c..377e09d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,7 +23,7 @@ telemetry = ["opentelemetry", "opentelemetry-contrib", "rand"] futures = "0.3.17" pin-project = "1.0.10" tokio = { version = "1.0", default-features = false, features = ["net", "rt", "rt-multi-thread", "sync", "time", "macros", "io-util", "signal"] } -tokio-util = { version = "0.6.8", default-features = false, features = ["compat"] } +tokio-util = { version = "0.6.8", default-features = false, features = ["compat", "io"] } tokio-stream = "0.1.7" serde = { version = "1.0", default-features = false, features = ["derive", "rc"] } diff --git a/src/stream.rs b/src/stream.rs index aa7641f..f5607b3 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -5,7 +5,8 @@ use std::task::{Context, Poll}; use bytes::Bytes; use futures::Future; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, TryStreamExt}; +use tokio::io::AsyncRead; /// A stream of associated data. /// @@ -18,6 +19,8 @@ pub type ByteStream = Pin + Send + Sync>>; pub type Packet = Result; +// ---- + pub struct ByteStreamReader { stream: ByteStream, buf: VecDeque, @@ -175,3 +178,49 @@ impl<'a> Future for ByteStreamReadExact<'a> { } } } + +// ---- + +fn u8_to_io_error(v: u8) -> std::io::Error { + use std::io::{Error, ErrorKind}; + let kind = match v { + 101 => ErrorKind::ConnectionAborted, + 102 => ErrorKind::BrokenPipe, + 103 => ErrorKind::WouldBlock, + 104 => ErrorKind::InvalidInput, + 105 => ErrorKind::InvalidData, + 106 => ErrorKind::TimedOut, + 107 => ErrorKind::Interrupted, + 108 => ErrorKind::UnexpectedEof, + 109 => ErrorKind::OutOfMemory, + 110 => ErrorKind::ConnectionReset, + _ => ErrorKind::Other, + }; + Error::new(kind, "(in netapp stream)") +} + +fn io_error_to_u8(e: std::io::Error) -> u8 { + use std::io::{ErrorKind}; + match e.kind() { + ErrorKind::ConnectionAborted => 101, + ErrorKind::BrokenPipe => 102, + ErrorKind::WouldBlock => 103, + ErrorKind::InvalidInput => 104, + ErrorKind::InvalidData => 105, + ErrorKind::TimedOut => 106, + ErrorKind::Interrupted => 107, + ErrorKind::UnexpectedEof => 108, + ErrorKind::OutOfMemory => 109, + ErrorKind::ConnectionReset => 110, + _ => 100, + } +} + +pub fn asyncread_stream(reader: R) -> ByteStream { + Box::pin(tokio_util::io::ReaderStream::new(reader) + .map_err(io_error_to_u8)) +} + +pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static { + tokio_util::io::StreamReader::new(stream.map_err(u8_to_io_error)) +}