forked from Deuxfleurs/garage
block manager: refactor and fix monitoring/statistics
This commit is contained in:
parent
f38a31b330
commit
99ed18350f
1 changed files with 20 additions and 27 deletions
|
@ -491,8 +491,6 @@ impl BlockManager {
|
||||||
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
|
pub(crate) async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<(), Error> {
|
||||||
let tracer = opentelemetry::global::tracer("garage");
|
let tracer = opentelemetry::global::tracer("garage");
|
||||||
|
|
||||||
let write_size = data.inner_buffer().len() as u64;
|
|
||||||
|
|
||||||
self.lock_mutate(hash)
|
self.lock_mutate(hash)
|
||||||
.await
|
.await
|
||||||
.write_block(hash, data, self)
|
.write_block(hash, data, self)
|
||||||
|
@ -502,8 +500,6 @@ impl BlockManager {
|
||||||
))
|
))
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
self.metrics.bytes_written.add(write_size);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -530,31 +526,26 @@ impl BlockManager {
|
||||||
|
|
||||||
/// Read block from disk, verifying it's integrity
|
/// Read block from disk, verifying it's integrity
|
||||||
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
|
pub(crate) async fn read_block(&self, hash: &Hash) -> Result<DataBlock, Error> {
|
||||||
let data = self
|
let tracer = opentelemetry::global::tracer("garage");
|
||||||
.read_block_internal(hash)
|
async {
|
||||||
.bound_record_duration(&self.metrics.block_read_duration)
|
match self.find_block(hash).await {
|
||||||
.await?;
|
Some(p) => self.read_block_from(hash, &p).await,
|
||||||
|
None => {
|
||||||
self.metrics
|
// Not found but maybe we should have had it ??
|
||||||
.bytes_read
|
self.resync
|
||||||
.add(data.inner_buffer().len() as u64);
|
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
|
||||||
|
return Err(Error::Message(format!(
|
||||||
Ok(data)
|
"block {:?} not found on node",
|
||||||
}
|
hash
|
||||||
|
)));
|
||||||
async fn read_block_internal(&self, hash: &Hash) -> Result<DataBlock, Error> {
|
}
|
||||||
match self.find_block(hash).await {
|
|
||||||
Some(p) => self.read_block_from(hash, &p).await,
|
|
||||||
None => {
|
|
||||||
// Not found but maybe we should have had it ??
|
|
||||||
self.resync
|
|
||||||
.put_to_resync(hash, 2 * self.system.rpc.rpc_timeout())?;
|
|
||||||
return Err(Error::Message(format!(
|
|
||||||
"block {:?} not found on node",
|
|
||||||
hash
|
|
||||||
)));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
.bound_record_duration(&self.metrics.block_read_duration)
|
||||||
|
.with_context(Context::current_with_span(
|
||||||
|
tracer.start("BlockManager::read_block"),
|
||||||
|
))
|
||||||
|
.await
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn read_block_from(
|
pub(crate) async fn read_block_from(
|
||||||
|
@ -570,6 +561,7 @@ impl BlockManager {
|
||||||
let mut f = fs::File::open(&path).await?;
|
let mut f = fs::File::open(&path).await?;
|
||||||
let mut data = vec![];
|
let mut data = vec![];
|
||||||
f.read_to_end(&mut data).await?;
|
f.read_to_end(&mut data).await?;
|
||||||
|
self.metrics.bytes_read.add(data.len() as u64);
|
||||||
drop(f);
|
drop(f);
|
||||||
|
|
||||||
let data = if compressed {
|
let data = if compressed {
|
||||||
|
@ -731,6 +723,7 @@ impl BlockManagerLocked {
|
||||||
|
|
||||||
let mut f = fs::File::create(&path_tmp).await?;
|
let mut f = fs::File::create(&path_tmp).await?;
|
||||||
f.write_all(data).await?;
|
f.write_all(data).await?;
|
||||||
|
mgr.metrics.bytes_written.add(data.len() as u64);
|
||||||
|
|
||||||
if mgr.data_fsync {
|
if mgr.data_fsync {
|
||||||
f.sync_all().await?;
|
f.sync_all().await?;
|
||||||
|
|
Loading…
Reference in a new issue