From 26989bba1409bfc093e58ef98e75885b10ad7c1c Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Thu, 21 Jul 2022 18:15:07 +0200 Subject: [PATCH] Use Bytes instead of Vec --- Cargo.lock | 16 +++++----------- Cargo.toml | 2 +- src/message.rs | 8 ++++---- src/recv.rs | 3 ++- src/send.rs | 5 +++-- src/util.rs | 3 ++- 6 files changed, 17 insertions(+), 20 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 356c3ba..2312190 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -76,15 +76,9 @@ checksum = "14c189c53d098945499cdfa7ecc63567cf3886b3332b312a5b4585d8d3a6a610" [[package]] name = "bytes" -version = "0.6.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0dcbc35f504eb6fc275a6d20e4ebcda18cf50d40ba6fabff8c711fa16cb3b16" - -[[package]] -name = "bytes" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c4872d67bab6358e59559027aa3b9157c53d9358c51423c17554809a8858e0f8" +checksum = "f0b3de4a0c5e67e16066a0715723abd91edc2f9001d09c46e1dca929351e130e" [[package]] name = "cc" @@ -459,7 +453,7 @@ version = "0.4.4" dependencies = [ "arc-swap", "async-trait", - "bytes 0.6.0", + "bytes", "cfg-if", "chrono", "env_logger 0.8.4", @@ -924,7 +918,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2c2416fdedca8443ae44b4527de1ea633af61d8f7169ffa6e72c5b53d24efcc" dependencies = [ "autocfg", - "bytes 1.1.0", + "bytes", "libc", "memchr", "mio", @@ -964,7 +958,7 @@ version = "0.6.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d3725d3efa29485e87311c5b699de63cde14b00ed4d256b8318aa30ca452cd" dependencies = [ - "bytes 1.1.0", + "bytes", "futures-core", "futures-io", "futures-sink", diff --git a/Cargo.toml b/Cargo.toml index a2f0ab1..d8a4908 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,7 +36,7 @@ log = "0.4.8" arc-swap = "1.1" async-trait = "0.1.7" err-derive = "0.2.3" -bytes = "0.6.0" +bytes = "1.2" lru = { version = "0.6", optional = true } cfg-if = "1.0" diff --git a/src/message.rs b/src/message.rs index dbcc857..6d50254 100644 --- a/src/message.rs +++ b/src/message.rs @@ -192,8 +192,8 @@ impl Framing { // required because otherwise the borrow-checker complains let Framing { direct, stream } = self; - let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec()) }) - .chain(stream::once(async move { Ok(direct) })); + let res = stream::once(async move { Ok(u32::to_be_bytes(len).to_vec().into()) }) + .chain(stream::once(async move { Ok(direct.into()) })); if let Some(stream) = stream { Box::pin(res.chain(stream)) @@ -217,7 +217,7 @@ impl Framing { let mut len = [0; 4]; len.copy_from_slice(&packet[..4]); let len = u32::from_be_bytes(len); - packet.drain(..4); + packet = packet.slice(4..); let mut buffer = Vec::new(); let len = len as usize; @@ -226,7 +226,7 @@ impl Framing { buffer.extend_from_slice(&packet[..max_cp]); if buffer.len() == len { - packet.drain(..max_cp); + packet = packet.slice(max_cp..); break; } packet = stream diff --git a/src/recv.rs b/src/recv.rs index f5221e6..abe7b9a 100644 --- a/src/recv.rs +++ b/src/recv.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use bytes::Bytes; use log::trace; use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}; @@ -85,7 +86,7 @@ pub(crate) trait RecvLoop: Sync + 'static { let mut next_slice = vec![0; size as usize]; read.read_exact(&mut next_slice[..]).await?; trace!("recv_loop: read {} bytes", next_slice.len()); - Ok(next_slice) + Ok(Bytes::from(next_slice)) }; let mut sender = if let Some(send) = streams.remove(&(id)) { diff --git a/src/send.rs b/src/send.rs index 0179eb2..660e85c 100644 --- a/src/send.rs +++ b/src/send.rs @@ -3,6 +3,7 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; +use bytes::Bytes; use async_trait::async_trait; use log::trace; @@ -49,7 +50,7 @@ impl From for DataReader { fn from(data: ByteStream) -> DataReader { DataReader { reader: data, - packet: Ok(Vec::new()), + packet: Ok(Bytes::new()), pos: 0, buf: Vec::with_capacity(MAX_CHUNK_LENGTH as usize), eos: false, @@ -130,7 +131,7 @@ impl Stream for DataReader { Ok(v) => v, Err(e) => { let e = *e; - *this.packet = Ok(Vec::new()); + *this.packet = Ok(Bytes::new()); *this.eos = true; return Poll::Ready(Some(DataFrame::Error(e))); } diff --git a/src/util.rs b/src/util.rs index 6fbafe6..e81a89c 100644 --- a/src/util.rs +++ b/src/util.rs @@ -4,6 +4,7 @@ use std::pin::Pin; use log::info; use serde::Serialize; +use bytes::Bytes; use futures::Stream; use tokio::sync::watch; @@ -27,7 +28,7 @@ pub type NetworkKey = sodiumoxide::crypto::auth::Key; /// meaning, it's up to your application to define their semantic. pub type ByteStream = Pin + Send>>; -pub type Packet = Result, u8>; +pub type Packet = Result; /// Utility function: encodes any serializable value in MessagePack binary format /// using the RMP library.