some refactoring on data read/write path #729
6 changed files with 132 additions and 150 deletions
|
@ -13,7 +13,7 @@ use http::header::{
|
||||||
use hyper::{body::Body, Request, Response, StatusCode};
|
use hyper::{body::Body, Request, Response, StatusCode};
|
||||||
use tokio::sync::mpsc;
|
use tokio::sync::mpsc;
|
||||||
|
|
||||||
use garage_block::manager::BlockStream;
|
use garage_net::stream::ByteStream;
|
||||||
use garage_rpc::rpc_helper::OrderTag;
|
use garage_rpc::rpc_helper::OrderTag;
|
||||||
use garage_table::EmptyKey;
|
use garage_table::EmptyKey;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -286,7 +286,7 @@ pub async fn handle_get(
|
||||||
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
|
Ok(resp_builder.body(bytes_body(bytes.to_vec().into()))?)
|
||||||
}
|
}
|
||||||
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
ObjectVersionData::FirstBlock(_, first_block_hash) => {
|
||||||
let (tx, rx) = mpsc::channel::<BlockStream>(2);
|
let (tx, rx) = mpsc::channel::<ByteStream>(2);
|
||||||
|
|
||||||
let order_stream = OrderTag::stream();
|
let order_stream = OrderTag::stream();
|
||||||
let first_block_hash = *first_block_hash;
|
let first_block_hash = *first_block_hash;
|
||||||
|
@ -494,7 +494,7 @@ fn body_from_blocks_range(
|
||||||
}
|
}
|
||||||
|
|
||||||
let order_stream = OrderTag::stream();
|
let order_stream = OrderTag::stream();
|
||||||
let (tx, rx) = mpsc::channel::<BlockStream>(2);
|
let (tx, rx) = mpsc::channel::<ByteStream>(2);
|
||||||
|
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
match async {
|
match async {
|
||||||
|
@ -542,7 +542,7 @@ fn body_from_blocks_range(
|
||||||
})
|
})
|
||||||
.filter_map(futures::future::ready);
|
.filter_map(futures::future::ready);
|
||||||
|
|
||||||
let block_stream: BlockStream = Box::pin(block_stream);
|
let block_stream: ByteStream = Box::pin(block_stream);
|
||||||
tx.send(Box::pin(block_stream))
|
tx.send(Box::pin(block_stream))
|
||||||
.await
|
.await
|
||||||
.ok_or_message("channel closed")?;
|
.ok_or_message("channel closed")?;
|
||||||
|
@ -562,7 +562,7 @@ fn body_from_blocks_range(
|
||||||
response_body_from_block_stream(rx)
|
response_body_from_block_stream(rx)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
|
fn response_body_from_block_stream(rx: mpsc::Receiver<ByteStream>) -> ResBody {
|
||||||
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
|
let body_stream = tokio_stream::wrappers::ReceiverStream::new(rx)
|
||||||
.flatten()
|
.flatten()
|
||||||
.map(|x| {
|
.map(|x| {
|
||||||
|
@ -572,7 +572,7 @@ fn response_body_from_block_stream(rx: mpsc::Receiver<BlockStream>) -> ResBody {
|
||||||
ResBody::new(http_body_util::StreamBody::new(body_stream))
|
ResBody::new(http_body_util::StreamBody::new(body_stream))
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error_stream_item<E: std::fmt::Display>(e: E) -> BlockStream {
|
fn error_stream_item<E: std::fmt::Display>(e: E) -> ByteStream {
|
||||||
let err = std::io::Error::new(
|
let err = std::io::Error::new(
|
||||||
std::io::ErrorKind::Other,
|
std::io::ErrorKind::Other,
|
||||||
format!("Error while getting object data: {}", e),
|
format!("Error while getting object data: {}", e),
|
||||||
|
|
|
@ -2,107 +2,98 @@ use std::path::PathBuf;
|
||||||
|
|
||||||
use bytes::Bytes;
|
use bytes::Bytes;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use zstd::stream::{decode_all as zstd_decode, Encoder};
|
use zstd::stream::Encoder;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
use garage_util::error::*;
|
use garage_util::error::*;
|
||||||
|
|
||||||
|
use garage_net::stream::ByteStream;
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
#[derive(Debug, Serialize, Deserialize, Copy, Clone)]
|
||||||
pub enum DataBlockHeader {
|
pub enum DataBlockHeader {
|
||||||
Plain,
|
Plain,
|
||||||
Compressed,
|
Compressed,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A possibly compressed block of data
|
#[derive(Debug)]
|
||||||
pub enum DataBlock {
|
pub struct DataBlockElem<T> {
|
||||||
/// Uncompressed data
|
header: DataBlockHeader,
|
||||||
Plain(Bytes),
|
elem: T,
|
||||||
/// Data compressed with zstd
|
|
||||||
Compressed(Bytes),
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug)]
|
/// A possibly compressed block of data
|
||||||
pub enum DataBlockPath {
|
pub type DataBlock = DataBlockElem<Bytes>;
|
||||||
/// Uncompressed data fail
|
|
||||||
Plain(PathBuf),
|
/// A path to a possibly compressed block of data
|
||||||
/// Compressed data fail
|
pub type DataBlockPath = DataBlockElem<PathBuf>;
|
||||||
Compressed(PathBuf),
|
|
||||||
|
/// A stream of possibly compressed block data
|
||||||
|
pub type DataBlockStream = DataBlockElem<ByteStream>;
|
||||||
|
|
||||||
|
impl DataBlockHeader {
|
||||||
|
pub fn is_compressed(&self) -> bool {
|
||||||
|
matches!(self, DataBlockHeader::Compressed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> DataBlockElem<T> {
|
||||||
|
pub fn from_parts(header: DataBlockHeader, elem: T) -> Self {
|
||||||
|
Self { header, elem }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn plain(elem: T) -> Self {
|
||||||
|
Self {
|
||||||
|
header: DataBlockHeader::Plain,
|
||||||
|
elem,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn compressed(elem: T) -> Self {
|
||||||
|
Self {
|
||||||
|
header: DataBlockHeader::Compressed,
|
||||||
|
elem,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn into_parts(self) -> (DataBlockHeader, T) {
|
||||||
|
(self.header, self.elem)
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn as_parts_ref(&self) -> (DataBlockHeader, &T) {
|
||||||
|
(self.header, &self.elem)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl DataBlock {
|
impl DataBlock {
|
||||||
/// Query whether this block is compressed
|
/// Verify data integrity. Does not return the buffer content.
|
||||||
pub fn is_compressed(&self) -> bool {
|
|
||||||
matches!(self, DataBlock::Compressed(_))
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the inner, possibly compressed buffer. You should probably use [`DataBlock::verify_get`]
|
|
||||||
/// instead
|
|
||||||
pub fn inner_buffer(&self) -> &[u8] {
|
|
||||||
use DataBlock::*;
|
|
||||||
let (Plain(ref res) | Compressed(ref res)) = self;
|
|
||||||
res
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Get the buffer, possibly decompressing it, and verify it's integrity.
|
|
||||||
/// For Plain block, data is compared to hash, for Compressed block, zstd checksumming system
|
|
||||||
/// is used instead.
|
|
||||||
pub fn verify_get(self, hash: Hash) -> Result<Bytes, Error> {
|
|
||||||
match self {
|
|
||||||
DataBlock::Plain(data) => {
|
|
||||||
if blake2sum(&data) == hash {
|
|
||||||
Ok(data)
|
|
||||||
} else {
|
|
||||||
Err(Error::CorruptData(hash))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
DataBlock::Compressed(data) => zstd_decode(&data[..])
|
|
||||||
.map_err(|_| Error::CorruptData(hash))
|
|
||||||
.map(Bytes::from),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Verify data integrity. Allocate less than [`DataBlock::verify_get`] and don't consume self, but
|
|
||||||
/// does not return the buffer content.
|
|
||||||
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
pub fn verify(&self, hash: Hash) -> Result<(), Error> {
|
||||||
match self {
|
match self.header {
|
||||||
DataBlock::Plain(data) => {
|
DataBlockHeader::Plain => {
|
||||||
if blake2sum(data) == hash {
|
if blake2sum(&self.elem) == hash {
|
||||||
Ok(())
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
Err(Error::CorruptData(hash))
|
Err(Error::CorruptData(hash))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DataBlock::Compressed(data) => zstd::stream::copy_decode(&data[..], std::io::sink())
|
DataBlockHeader::Compressed => {
|
||||||
.map_err(|_| Error::CorruptData(hash)),
|
zstd::stream::copy_decode(&self.elem[..], std::io::sink())
|
||||||
|
.map_err(|_| Error::CorruptData(hash))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
|
pub async fn from_buffer(data: Bytes, level: Option<i32>) -> DataBlock {
|
||||||
tokio::task::spawn_blocking(move || {
|
tokio::task::spawn_blocking(move || {
|
||||||
if let Some(level) = level {
|
if let Some(level) = level {
|
||||||
if let Ok(data) = zstd_encode(&data[..], level) {
|
if let Ok(data_compressed) = zstd_encode(&data[..], level) {
|
||||||
return DataBlock::Compressed(data.into());
|
return DataBlock::compressed(data_compressed.into());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DataBlock::Plain(data)
|
DataBlock::plain(data.into())
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
.unwrap()
|
.unwrap()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn into_parts(self) -> (DataBlockHeader, Bytes) {
|
|
||||||
match self {
|
|
||||||
DataBlock::Plain(data) => (DataBlockHeader::Plain, data),
|
|
||||||
DataBlock::Compressed(data) => (DataBlockHeader::Compressed, data),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_parts(h: DataBlockHeader, bytes: Bytes) -> Self {
|
|
||||||
match h {
|
|
||||||
DataBlockHeader::Plain => DataBlock::Plain(bytes),
|
|
||||||
DataBlockHeader::Compressed => DataBlock::Compressed(bytes),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
|
fn zstd_encode<R: std::io::Read>(mut source: R, level: i32) -> std::io::Result<Vec<u8>> {
|
||||||
|
|
|
@ -1,5 +1,4 @@
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
use std::pin::Pin;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
@ -9,8 +8,6 @@ use bytes::Bytes;
|
||||||
use rand::prelude::*;
|
use rand::prelude::*;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use futures::Stream;
|
|
||||||
use futures_util::stream::StreamExt;
|
|
||||||
use tokio::fs;
|
use tokio::fs;
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
|
||||||
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
use tokio::sync::{mpsc, Mutex, MutexGuard};
|
||||||
|
@ -20,7 +17,7 @@ use opentelemetry::{
|
||||||
Context,
|
Context,
|
||||||
};
|
};
|
||||||
|
|
||||||
use garage_net::stream::{stream_asyncread, ByteStream};
|
use garage_net::stream::{read_stream_to_end, stream_asyncread, ByteStream};
|
||||||
|
|
||||||
use garage_db as db;
|
use garage_db as db;
|
||||||
|
|
||||||
|
@ -53,9 +50,6 @@ pub const INLINE_THRESHOLD: usize = 3072;
|
||||||
// to delete the block locally.
|
// to delete the block locally.
|
||||||
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
pub(crate) const BLOCK_GC_DELAY: Duration = Duration::from_secs(600);
|
||||||
|
|
||||||
pub type BlockStream =
|
|
||||||
Pin<Box<dyn Stream<Item = Result<Bytes, std::io::Error>> + Send + Sync + 'static>>;
|
|
||||||
|
|
||||||
/// RPC messages used to share blocks of data between nodes
|
/// RPC messages used to share blocks of data between nodes
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
pub enum BlockRpc {
|
pub enum BlockRpc {
|
||||||
|
@ -235,10 +229,8 @@ impl BlockManager {
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<(DataBlockHeader, ByteStream), Error> {
|
) -> Result<DataBlockStream, Error> {
|
||||||
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
|
self.rpc_get_raw_block_internal(hash, order_tag, |stream| async move { Ok(stream) })
|
||||||
Ok((header, stream))
|
|
||||||
})
|
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -249,10 +241,12 @@ impl BlockManager {
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<DataBlock, Error> {
|
) -> Result<DataBlock, Error> {
|
||||||
self.rpc_get_raw_block_internal(hash, order_tag, |header, stream| async move {
|
self.rpc_get_raw_block_internal(hash, order_tag, |block_stream| async move {
|
||||||
|
let (header, stream) = block_stream.into_parts();
|
||||||
read_stream_to_end(stream)
|
read_stream_to_end(stream)
|
||||||
.await
|
.await
|
||||||
.map(|data| DataBlock::from_parts(header, data))
|
.err_context("error in block data stream")
|
||||||
|
.map(|data| DataBlock::from_parts(header, data.into_bytes()))
|
||||||
})
|
})
|
||||||
.await
|
.await
|
||||||
}
|
}
|
||||||
|
@ -264,7 +258,7 @@ impl BlockManager {
|
||||||
f: F,
|
f: F,
|
||||||
) -> Result<T, Error>
|
) -> Result<T, Error>
|
||||||
where
|
where
|
||||||
F: Fn(DataBlockHeader, ByteStream) -> Fut,
|
F: Fn(DataBlockStream) -> Fut,
|
||||||
Fut: futures::Future<Output = Result<T, Error>>,
|
Fut: futures::Future<Output = Result<T, Error>>,
|
||||||
{
|
{
|
||||||
let who = self.replication.read_nodes(hash);
|
let who = self.replication.read_nodes(hash);
|
||||||
|
@ -286,8 +280,8 @@ impl BlockManager {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
let (header, stream) = match res.into_parts() {
|
let block_stream = match res.into_parts() {
|
||||||
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => (header, stream),
|
(Ok(BlockRpc::PutBlock { hash: _, header }), Some(stream)) => DataBlockStream::from_parts(header, stream),
|
||||||
(Ok(_), _) => {
|
(Ok(_), _) => {
|
||||||
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
|
debug!("Get block {:?}: node {:?} returned a malformed response", hash, node);
|
||||||
continue;
|
continue;
|
||||||
|
@ -297,7 +291,7 @@ impl BlockManager {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
match f(header, stream).await {
|
match f(block_stream).await {
|
||||||
Ok(ret) => return Ok(ret),
|
Ok(ret) => return Ok(ret),
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
|
debug!("Get block {:?}: error reading stream from node {:?}: {}", hash, node, e);
|
||||||
|
@ -321,14 +315,14 @@ impl BlockManager {
|
||||||
|
|
||||||
// ---- Public interface ----
|
// ---- Public interface ----
|
||||||
|
|
||||||
/// Ask nodes that might have a block for it,
|
/// Ask nodes that might have a block for it, return it as a stream
|
||||||
/// return it as a stream
|
|
||||||
pub async fn rpc_get_block_streaming(
|
pub async fn rpc_get_block_streaming(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<BlockStream, Error> {
|
) -> Result<ByteStream, Error> {
|
||||||
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
let block_stream = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
|
||||||
|
let (header, stream) = block_stream.into_parts();
|
||||||
match header {
|
match header {
|
||||||
DataBlockHeader::Plain => Ok(stream),
|
DataBlockHeader::Plain => Ok(stream),
|
||||||
DataBlockHeader::Compressed => {
|
DataBlockHeader::Compressed => {
|
||||||
|
@ -341,15 +335,14 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Ask nodes that might have a block for it
|
/// Ask nodes that might have a block for it, return it as one big Bytes
|
||||||
pub async fn rpc_get_block(
|
pub async fn rpc_get_block(
|
||||||
&self,
|
&self,
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
order_tag: Option<OrderTag>,
|
order_tag: Option<OrderTag>,
|
||||||
) -> Result<Bytes, Error> {
|
) -> Result<Bytes, Error> {
|
||||||
self.rpc_get_raw_block(hash, order_tag)
|
let stream = self.rpc_get_block_streaming(hash, order_tag).await?;
|
||||||
.await?
|
Ok(read_stream_to_end(stream).await?.into_bytes())
|
||||||
.verify_get(*hash)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Send block to nodes that should have it
|
/// Send block to nodes that should have it
|
||||||
|
@ -482,7 +475,7 @@ impl BlockManager {
|
||||||
stream: Option<ByteStream>,
|
stream: Option<ByteStream>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let stream = stream.ok_or_message("missing stream")?;
|
let stream = stream.ok_or_message("missing stream")?;
|
||||||
let bytes = read_stream_to_end(stream).await?;
|
let bytes = read_stream_to_end(stream).await?.into_bytes();
|
||||||
let data = DataBlock::from_parts(header, bytes);
|
let data = DataBlock::from_parts(header, bytes);
|
||||||
self.write_block(&hash, &data).await
|
self.write_block(&hash, &data).await
|
||||||
}
|
}
|
||||||
|
@ -553,10 +546,7 @@ impl BlockManager {
|
||||||
hash: &Hash,
|
hash: &Hash,
|
||||||
block_path: &DataBlockPath,
|
block_path: &DataBlockPath,
|
||||||
) -> Result<DataBlock, Error> {
|
) -> Result<DataBlock, Error> {
|
||||||
let (path, compressed) = match block_path {
|
let (header, path) = block_path.as_parts_ref();
|
||||||
DataBlockPath::Plain(p) => (p, false),
|
|
||||||
DataBlockPath::Compressed(p) => (p, true),
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut f = fs::File::open(&path).await?;
|
let mut f = fs::File::open(&path).await?;
|
||||||
let mut data = vec![];
|
let mut data = vec![];
|
||||||
|
@ -564,11 +554,7 @@ impl BlockManager {
|
||||||
self.metrics.bytes_read.add(data.len() as u64);
|
self.metrics.bytes_read.add(data.len() as u64);
|
||||||
drop(f);
|
drop(f);
|
||||||
|
|
||||||
let data = if compressed {
|
let data = DataBlock::from_parts(header, data.into());
|
||||||
DataBlock::Compressed(data.into())
|
|
||||||
} else {
|
|
||||||
DataBlock::Plain(data.into())
|
|
||||||
};
|
|
||||||
|
|
||||||
if data.verify(*hash).is_err() {
|
if data.verify(*hash).is_err() {
|
||||||
self.metrics.corruption_counter.add(1);
|
self.metrics.corruption_counter.add(1);
|
||||||
|
@ -621,20 +607,20 @@ impl BlockManager {
|
||||||
// first and then a compressed one (as compression may have been
|
// first and then a compressed one (as compression may have been
|
||||||
// previously enabled).
|
// previously enabled).
|
||||||
if fs::metadata(&path).await.is_ok() {
|
if fs::metadata(&path).await.is_ok() {
|
||||||
return Some(DataBlockPath::Plain(path));
|
return Some(DataBlockPath::plain(path));
|
||||||
}
|
}
|
||||||
path.set_extension("zst");
|
path.set_extension("zst");
|
||||||
if fs::metadata(&path).await.is_ok() {
|
if fs::metadata(&path).await.is_ok() {
|
||||||
return Some(DataBlockPath::Compressed(path));
|
return Some(DataBlockPath::compressed(path));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
path.set_extension("zst");
|
path.set_extension("zst");
|
||||||
if fs::metadata(&path).await.is_ok() {
|
if fs::metadata(&path).await.is_ok() {
|
||||||
return Some(DataBlockPath::Compressed(path));
|
return Some(DataBlockPath::compressed(path));
|
||||||
}
|
}
|
||||||
path.set_extension("");
|
path.set_extension("");
|
||||||
if fs::metadata(&path).await.is_ok() {
|
if fs::metadata(&path).await.is_ok() {
|
||||||
return Some(DataBlockPath::Plain(path));
|
return Some(DataBlockPath::plain(path));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -704,8 +690,8 @@ impl BlockManagerLocked {
|
||||||
mgr: &BlockManager,
|
mgr: &BlockManager,
|
||||||
existing_path: Option<DataBlockPath>,
|
existing_path: Option<DataBlockPath>,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
let compressed = data.is_compressed();
|
let (header, data) = data.as_parts_ref();
|
||||||
let data = data.inner_buffer();
|
let compressed = header.is_compressed();
|
||||||
|
|
||||||
let directory = mgr.data_layout.load().primary_block_dir(hash);
|
let directory = mgr.data_layout.load().primary_block_dir(hash);
|
||||||
|
|
||||||
|
@ -715,24 +701,25 @@ impl BlockManagerLocked {
|
||||||
tgt_path.set_extension("zst");
|
tgt_path.set_extension("zst");
|
||||||
}
|
}
|
||||||
|
|
||||||
let to_delete = match (existing_path, compressed) {
|
let existing_info = existing_path.map(|x| x.into_parts());
|
||||||
|
let to_delete = match (existing_info, compressed) {
|
||||||
// If the block is stored in the wrong directory,
|
// If the block is stored in the wrong directory,
|
||||||
// write it again at the correct path and delete the old path
|
// write it again at the correct path and delete the old path
|
||||||
(Some(DataBlockPath::Plain(p)), false) if p != tgt_path => Some(p),
|
(Some((DataBlockHeader::Plain, p)), false) if p != tgt_path => Some(p),
|
||||||
(Some(DataBlockPath::Compressed(p)), true) if p != tgt_path => Some(p),
|
(Some((DataBlockHeader::Compressed, p)), true) if p != tgt_path => Some(p),
|
||||||
|
|
||||||
// If the block is already stored not compressed but we have a compressed
|
// If the block is already stored not compressed but we have a compressed
|
||||||
// copy, write the compressed copy and delete the uncompressed one
|
// copy, write the compressed copy and delete the uncompressed one
|
||||||
(Some(DataBlockPath::Plain(plain_path)), true) => Some(plain_path),
|
(Some((DataBlockHeader::Plain, plain_path)), true) => Some(plain_path),
|
||||||
|
|
||||||
// If the block is already stored compressed,
|
// If the block is already stored compressed,
|
||||||
// keep the stored copy, we have nothing to do
|
// keep the stored copy, we have nothing to do
|
||||||
(Some(DataBlockPath::Compressed(_)), _) => return Ok(()),
|
(Some((DataBlockHeader::Compressed, _)), _) => return Ok(()),
|
||||||
|
|
||||||
// If the block is already stored not compressed,
|
// If the block is already stored not compressed,
|
||||||
// and we don't have a compressed copy either,
|
// and we don't have a compressed copy either,
|
||||||
// keep the stored copy, we have nothing to do
|
// keep the stored copy, we have nothing to do
|
||||||
(Some(DataBlockPath::Plain(_)), false) => return Ok(()),
|
(Some((DataBlockHeader::Plain, _)), false) => return Ok(()),
|
||||||
|
|
||||||
// If the block isn't stored already, just store what is given to us
|
// If the block isn't stored already, just store what is given to us
|
||||||
(None, _) => None,
|
(None, _) => None,
|
||||||
|
@ -784,18 +771,14 @@ impl BlockManagerLocked {
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
|
async fn move_block_to_corrupted(&self, block_path: &DataBlockPath) -> Result<(), Error> {
|
||||||
let (path, path2) = match block_path {
|
let (header, path) = block_path.as_parts_ref();
|
||||||
DataBlockPath::Plain(p) => {
|
|
||||||
let mut p2 = p.clone();
|
let mut path2 = path.clone();
|
||||||
p2.set_extension("corrupted");
|
if header.is_compressed() {
|
||||||
(p, p2)
|
path2.set_extension("zst.corrupted");
|
||||||
|
} else {
|
||||||
|
path2.set_extension("corrupted");
|
||||||
}
|
}
|
||||||
DataBlockPath::Compressed(p) => {
|
|
||||||
let mut p2 = p.clone();
|
|
||||||
p2.set_extension("zst.corrupted");
|
|
||||||
(p, p2)
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
fs::rename(path, path2).await?;
|
fs::rename(path, path2).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
@ -805,9 +788,7 @@ impl BlockManagerLocked {
|
||||||
let rc = mgr.rc.get_block_rc(hash)?;
|
let rc = mgr.rc.get_block_rc(hash)?;
|
||||||
if rc.is_deletable() {
|
if rc.is_deletable() {
|
||||||
while let Some(path) = mgr.find_block(hash).await {
|
while let Some(path) = mgr.find_block(hash).await {
|
||||||
let path = match path {
|
let (_header, path) = path.as_parts_ref();
|
||||||
DataBlockPath::Plain(p) | DataBlockPath::Compressed(p) => p,
|
|
||||||
};
|
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
mgr.metrics.delete_counter.add(1);
|
mgr.metrics.delete_counter.add(1);
|
||||||
}
|
}
|
||||||
|
@ -824,24 +805,10 @@ impl BlockManagerLocked {
|
||||||
let data = mgr.read_block_from(hash, &wrong_path).await?;
|
let data = mgr.read_block_from(hash, &wrong_path).await?;
|
||||||
self.write_block_inner(hash, &data, mgr, Some(wrong_path))
|
self.write_block_inner(hash, &data, mgr, Some(wrong_path))
|
||||||
.await?;
|
.await?;
|
||||||
Ok(data.inner_buffer().len())
|
Ok(data.as_parts_ref().1.len())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn read_stream_to_end(mut stream: ByteStream) -> Result<Bytes, Error> {
|
|
||||||
let mut parts: Vec<Bytes> = vec![];
|
|
||||||
while let Some(part) = stream.next().await {
|
|
||||||
parts.push(part.ok_or_message("error in stream")?);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(parts
|
|
||||||
.iter()
|
|
||||||
.map(|x| &x[..])
|
|
||||||
.collect::<Vec<_>>()
|
|
||||||
.concat()
|
|
||||||
.into())
|
|
||||||
}
|
|
||||||
|
|
||||||
struct DeleteOnDrop(Option<PathBuf>);
|
struct DeleteOnDrop(Option<PathBuf>);
|
||||||
|
|
||||||
impl DeleteOnDrop {
|
impl DeleteOnDrop {
|
||||||
|
|
|
@ -584,8 +584,8 @@ impl Worker for RebalanceWorker {
|
||||||
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
|
let prim_loc = self.manager.data_layout.load().primary_block_dir(&hash);
|
||||||
if path.ancestors().all(|x| x != prim_loc) {
|
if path.ancestors().all(|x| x != prim_loc) {
|
||||||
let block_path = match path.extension() {
|
let block_path = match path.extension() {
|
||||||
None => DataBlockPath::Plain(path.clone()),
|
None => DataBlockPath::plain(path.clone()),
|
||||||
Some(x) if x.to_str() == Some("zst") => DataBlockPath::Compressed(path.clone()),
|
Some(x) if x.to_str() == Some("zst") => DataBlockPath::compressed(path.clone()),
|
||||||
_ => {
|
_ => {
|
||||||
warn!("not rebalancing file: {}", path.to_string_lossy());
|
warn!("not rebalancing file: {}", path.to_string_lossy());
|
||||||
return Ok(WorkerState::Busy);
|
return Ok(WorkerState::Busy);
|
||||||
|
|
|
@ -3,6 +3,8 @@ use std::collections::VecDeque;
|
||||||
|
|
||||||
use bytes::BytesMut;
|
use bytes::BytesMut;
|
||||||
|
|
||||||
|
use crate::stream::ByteStream;
|
||||||
|
|
||||||
pub use bytes::Bytes;
|
pub use bytes::Bytes;
|
||||||
|
|
||||||
/// A circular buffer of bytes, internally represented as a list of Bytes
|
/// A circular buffer of bytes, internally represented as a list of Bytes
|
||||||
|
@ -119,6 +121,17 @@ impl BytesBuf {
|
||||||
pub fn into_slices(self) -> VecDeque<Bytes> {
|
pub fn into_slices(self) -> VecDeque<Bytes> {
|
||||||
self.buf
|
self.buf
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Return the entire buffer concatenated into a single big Bytes
|
||||||
|
pub fn into_bytes(mut self) -> Bytes {
|
||||||
|
self.take_all()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Return the content as a stream of individual chunks
|
||||||
|
pub fn into_stream(self) -> ByteStream {
|
||||||
|
use futures::stream::StreamExt;
|
||||||
|
Box::pin(futures::stream::iter(self.buf).map(|x| Ok(x)))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Default for BytesBuf {
|
impl Default for BytesBuf {
|
||||||
|
|
|
@ -200,3 +200,14 @@ pub fn asyncread_stream<R: AsyncRead + Send + Sync + 'static>(reader: R) -> Byte
|
||||||
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
|
pub fn stream_asyncread(stream: ByteStream) -> impl AsyncRead + Send + Sync + 'static {
|
||||||
tokio_util::io::StreamReader::new(stream)
|
tokio_util::io::StreamReader::new(stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Reads all of the content of a `ByteStream` into a BytesBuf
|
||||||
|
/// that contains everything
|
||||||
|
pub async fn read_stream_to_end(mut stream: ByteStream) -> Result<BytesBuf, std::io::Error> {
|
||||||
|
let mut buf = BytesBuf::new();
|
||||||
|
while let Some(part) = stream.next().await {
|
||||||
|
buf.extend(part?);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(buf)
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue