Make use of BytesBuf from new Netapp
This commit is contained in:
parent
1ef87ac4cb
commit
13b5f28c7e
1 changed files with 10 additions and 33 deletions
|
@ -1,4 +1,4 @@
|
||||||
use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
|
use std::collections::{BTreeMap, BTreeSet, HashMap};
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use futures::prelude::*;
|
use futures::prelude::*;
|
||||||
|
@ -13,6 +13,7 @@ use opentelemetry::{
|
||||||
Context,
|
Context,
|
||||||
};
|
};
|
||||||
|
|
||||||
|
use garage_rpc::netapp::bytes_buf::BytesBuf;
|
||||||
use garage_table::*;
|
use garage_table::*;
|
||||||
use garage_util::async_hash::*;
|
use garage_util::async_hash::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -108,7 +109,7 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
size,
|
size,
|
||||||
etag: data_md5sum_hex.clone(),
|
etag: data_md5sum_hex.clone(),
|
||||||
},
|
},
|
||||||
first_block,
|
first_block.to_vec(),
|
||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -136,7 +137,6 @@ pub(crate) async fn save_stream<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
garage.version_table.insert(&version).await?;
|
garage.version_table.insert(&version).await?;
|
||||||
|
|
||||||
// Transfer data and verify checksum
|
// Transfer data and verify checksum
|
||||||
let first_block = Bytes::from(first_block);
|
|
||||||
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
||||||
|
|
||||||
let tx_result = (|| async {
|
let tx_result = (|| async {
|
||||||
|
@ -318,7 +318,6 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
|
||||||
chunker.next(),
|
chunker.next(),
|
||||||
)?;
|
)?;
|
||||||
if let Some(block) = next_block {
|
if let Some(block) = next_block {
|
||||||
let block = Bytes::from(block);
|
|
||||||
let (_, _, block_hash) = futures::future::join3(
|
let (_, _, block_hash) = futures::future::join3(
|
||||||
md5hasher.update(block.clone()),
|
md5hasher.update(block.clone()),
|
||||||
sha256hasher.update(block.clone()),
|
sha256hasher.update(block.clone()),
|
||||||
|
@ -387,8 +386,7 @@ struct StreamChunker<S: Stream<Item = Result<Bytes, Error>>> {
|
||||||
stream: S,
|
stream: S,
|
||||||
read_all: bool,
|
read_all: bool,
|
||||||
block_size: usize,
|
block_size: usize,
|
||||||
buf: VecDeque<Bytes>,
|
buf: BytesBuf,
|
||||||
buf_len: usize,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
|
impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
|
||||||
|
@ -397,45 +395,25 @@ impl<S: Stream<Item = Result<Bytes, Error>> + Unpin> StreamChunker<S> {
|
||||||
stream,
|
stream,
|
||||||
read_all: false,
|
read_all: false,
|
||||||
block_size,
|
block_size,
|
||||||
buf: VecDeque::with_capacity(8),
|
buf: BytesBuf::new(),
|
||||||
buf_len: 0,
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn next(&mut self) -> Result<Option<Vec<u8>>, Error> {
|
async fn next(&mut self) -> Result<Option<Bytes>, Error> {
|
||||||
while !self.read_all && self.buf_len < self.block_size {
|
while !self.read_all && self.buf.len() < self.block_size {
|
||||||
if let Some(block) = self.stream.next().await {
|
if let Some(block) = self.stream.next().await {
|
||||||
let bytes = block?;
|
let bytes = block?;
|
||||||
trace!("Body next: {} bytes", bytes.len());
|
trace!("Body next: {} bytes", bytes.len());
|
||||||
self.buf_len += bytes.len();
|
self.buf.extend(bytes);
|
||||||
self.buf.push_back(bytes);
|
|
||||||
} else {
|
} else {
|
||||||
self.read_all = true;
|
self.read_all = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if self.buf_len == 0 {
|
if self.buf.is_empty() {
|
||||||
Ok(None)
|
Ok(None)
|
||||||
} else {
|
} else {
|
||||||
let mut slices = Vec::with_capacity(self.buf.len());
|
Ok(Some(self.buf.take_max(self.block_size)))
|
||||||
let mut taken = 0;
|
|
||||||
while self.buf_len > 0 && taken < self.block_size {
|
|
||||||
let front = self.buf.pop_front().unwrap();
|
|
||||||
if taken + front.len() <= self.block_size {
|
|
||||||
taken += front.len();
|
|
||||||
self.buf_len -= front.len();
|
|
||||||
slices.push(front);
|
|
||||||
} else {
|
|
||||||
let front_take = self.block_size - taken;
|
|
||||||
slices.push(front.slice(..front_take));
|
|
||||||
self.buf.push_front(front.slice(front_take..));
|
|
||||||
self.buf_len -= front_take;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Ok(Some(
|
|
||||||
slices.iter().map(|x| &x[..]).collect::<Vec<_>>().concat(),
|
|
||||||
))
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -545,7 +523,6 @@ pub async fn handle_put_part(
|
||||||
// Copy block to store
|
// Copy block to store
|
||||||
let version = Version::new(version_uuid, bucket_id, key, false);
|
let version = Version::new(version_uuid, bucket_id, key, false);
|
||||||
|
|
||||||
let first_block = Bytes::from(first_block);
|
|
||||||
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
let first_block_hash = async_blake2sum(first_block.clone()).await;
|
||||||
|
|
||||||
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
let (_, data_md5sum, data_sha256sum) = read_and_put_blocks(
|
||||||
|
|
Loading…
Reference in a new issue