use netapp streaming body #343

Merged
lx merged 31 commits from netapp-stream-body into main 2022-09-13 13:26:09 +00:00
Showing only changes of commit b823151a0b - Show all commits

View file

@ -11,7 +11,7 @@ use futures::Stream;
use futures_util::stream::StreamExt;
use tokio::fs;
use tokio::io::{AsyncReadExt, AsyncWriteExt, BufReader};
use tokio::sync::{mpsc, Mutex};
use tokio::sync::{mpsc, Mutex, MutexGuard};
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
@ -261,7 +261,7 @@ impl BlockManager {
> {
let (header, stream) = self.rpc_get_raw_block_streaming(hash, order_tag).await?;
match header {
DataBlockHeader::Plain => Ok(Box::pin(stream)),
DataBlockHeader::Plain => Ok(stream),
DataBlockHeader::Compressed => {
// Too many things, I hate it.
let reader = stream_asyncread(stream);
@ -389,11 +389,7 @@ impl BlockManager {
let write_size = data.inner_buffer().len() as u64;
self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"),
))
self.lock_mutate(hash)
.await
.write_block(hash, data, self)
.bound_record_duration(&self.metrics.block_write_duration)
@ -470,8 +466,7 @@ impl BlockManager {
if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1);
self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
self.lock_mutate(hash)
.await
.move_block_to_corrupted(hash, self)
.await?;
@ -484,8 +479,7 @@ impl BlockManager {
/// Check if this node has a block and whether it needs it
pub(crate) async fn check_block_status(&self, hash: &Hash) -> Result<BlockStatus, Error> {
self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
self.lock_mutate(hash)
.await
.check_block_status(hash, self)
.await
@ -499,8 +493,7 @@ impl BlockManager {
/// Delete block if it is not needed anymore
pub(crate) async fn delete_if_unneeded(&self, hash: &Hash) -> Result<(), Error> {
self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
self.lock_mutate(hash)
.await
.delete_if_unneeded(hash, self)
.await
@ -532,6 +525,16 @@ impl BlockManager {
path.set_extension("");
fs::metadata(&path).await.map(|_| false).map_err(Into::into)
}
async fn lock_mutate(&self, hash: &Hash) -> MutexGuard<'_, BlockManagerLocked> {
let tracer = opentelemetry::global::tracer("garage");
self.mutation_lock[hash.as_slice()[0] as usize]
.lock()
.with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"),
))
.await
}
}
#[async_trait]