diff --git a/src/error.rs b/src/error.rs index f374341..2fa4594 100644 --- a/src/error.rs +++ b/src/error.rs @@ -109,3 +109,39 @@ where } } } + +// ---- Helpers for serializing I/O Errors + +pub(crate) fn u8_to_io_errorkind(v: u8) -> std::io::ErrorKind { + use std::io::ErrorKind; + match v { + 101 => ErrorKind::ConnectionAborted, + 102 => ErrorKind::BrokenPipe, + 103 => ErrorKind::WouldBlock, + 104 => ErrorKind::InvalidInput, + 105 => ErrorKind::InvalidData, + 106 => ErrorKind::TimedOut, + 107 => ErrorKind::Interrupted, + 108 => ErrorKind::UnexpectedEof, + 109 => ErrorKind::OutOfMemory, + 110 => ErrorKind::ConnectionReset, + _ => ErrorKind::Other, + } +} + +pub(crate) fn io_errorkind_to_u8(kind: std::io::ErrorKind) -> u8 { + use std::io::ErrorKind; + match kind { + ErrorKind::ConnectionAborted => 101, + ErrorKind::BrokenPipe => 102, + ErrorKind::WouldBlock => 103, + ErrorKind::InvalidInput => 104, + ErrorKind::InvalidData => 105, + ErrorKind::TimedOut => 106, + ErrorKind::Interrupted => 107, + ErrorKind::UnexpectedEof => 108, + ErrorKind::OutOfMemory => 109, + ErrorKind::ConnectionReset => 110, + _ => 100, + } +} diff --git a/src/recv.rs b/src/recv.rs index f8d68da..f8606f3 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -81,9 +81,10 @@ pub(crate) trait RecvLoop: Sync + 'static { read.read_exact(&mut next_slice[..]).await?; let packet = if is_error { - let msg = String::from_utf8(next_slice).unwrap_or("".into()); - debug!("recv_loop: got id {}, error: {}", id, msg); - Some(Err(std::io::Error::new(std::io::ErrorKind::Other, msg))) + let kind = u8_to_io_errorkind(next_slice[0]); + let msg = std::str::from_utf8(&next_slice[1..]).unwrap_or(""); + debug!("recv_loop: got id {}, error {:?}: {}", id, kind, msg); + Some(Err(std::io::Error::new(kind, msg.to_string()))) } else { trace!( "recv_loop: got id {}, size {}, has_cont {}", diff --git a/src/send.rs b/src/send.rs index f362962..287fe40 100644 --- a/src/send.rs +++ b/src/send.rs @@ -4,7 +4,7 @@ use std::sync::Arc; use std::task::{Context, Poll}; use async_trait::async_trait; -use bytes::Bytes; +use bytes::{Bytes, BytesMut, BufMut}; use log::*; use futures::AsyncWriteExt; @@ -22,7 +22,11 @@ use crate::stream::*; // CHUNK_HAS_CONTINUATION when this is not the last chunk of the stream // ERROR_MARKER if this chunk denotes an error // (these two flags are exclusive, an error denotes the end of the stream) -// - [u8; chunk_length] chunk data / error message +// - [u8; chunk_length], either +// - if not error: chunk data +// - if error: +// - u8: error kind, encoded using error::io_errorkind_to_u8 +// - rest: error message pub(crate) type RequestID = u32; pub(crate) type ChunkLength = u16; @@ -136,12 +140,17 @@ impl DataFrame { Self::Data(bytes, has_cont) } Err(e) => { - let msg = format!("{}", e); - let mut msg = Bytes::from(msg.into_bytes()); - if msg.len() > MAX_CHUNK_LENGTH as usize { - msg = msg.slice(..MAX_CHUNK_LENGTH as usize); + let mut buf = BytesMut::new(); + buf.put_u8(io_errorkind_to_u8(e.kind())); + + let msg = format!("{}", e).into_bytes(); + if msg.len() > (MAX_CHUNK_LENGTH - 1) as usize { + buf.put(&msg[..(MAX_CHUNK_LENGTH - 1) as usize]); + } else { + buf.put(&msg[..]); } - Self::Error(msg) + + Self::Error(buf.freeze()) } } } diff --git a/src/stream.rs b/src/stream.rs index 3518246..6e00e5f 100644 --- a/src/stream.rs +++ b/src/stream.rs @@ -150,42 +150,6 @@ impl<'a> Future for ByteStreamReadExact<'a> { // ---- -/* -fn u8_to_io_error(v: u8) -> std::io::Error { - use std::io::{Error, ErrorKind}; - let kind = match v { - 101 => ErrorKind::ConnectionAborted, - 102 => ErrorKind::BrokenPipe, - 103 => ErrorKind::WouldBlock, - 104 => ErrorKind::InvalidInput, - 105 => ErrorKind::InvalidData, - 106 => ErrorKind::TimedOut, - 107 => ErrorKind::Interrupted, - 108 => ErrorKind::UnexpectedEof, - 109 => ErrorKind::OutOfMemory, - 110 => ErrorKind::ConnectionReset, - _ => ErrorKind::Other, - }; - Error::new(kind, "(in netapp stream)") -} - -fn io_error_to_u8(e: std::io::Error) -> u8 { - use std::io::ErrorKind; - match e.kind() { - ErrorKind::ConnectionAborted => 101, - ErrorKind::BrokenPipe => 102, - ErrorKind::WouldBlock => 103, - ErrorKind::InvalidInput => 104, - ErrorKind::InvalidData => 105, - ErrorKind::TimedOut => 106, - ErrorKind::Interrupted => 107, - ErrorKind::UnexpectedEof => 108, - ErrorKind::OutOfMemory => 109, - ErrorKind::ConnectionReset => 110, - _ => 100, - } -} -*/ pub fn asyncread_stream(reader: R) -> ByteStream { Box::pin(tokio_util::io::ReaderStream::new(reader))