performance improvements #342

Merged
lx merged 21 commits from lx-perf-improvements into main 2022-09-12 14:38:44 +00:00
4 changed files with 59 additions and 28 deletions
Showing only changes of commit 2f111e6b3d - Show all commits

View file

@ -8,6 +8,11 @@ use hyper::{Request, Response};
use md5::{digest::generic_array::*, Digest as Md5Digest, Md5}; use md5::{digest::generic_array::*, Digest as Md5Digest, Md5};
use sha2::Sha256; use sha2::Sha256;
use opentelemetry::{
trace::{FutureExt as OtelFutureExt, TraceContextExt, Tracer},
Context,
};
use garage_table::*; use garage_table::*;
use garage_util::async_hash::*; use garage_util::async_hash::*;
use garage_util::data::*; use garage_util::data::*;
@ -279,12 +284,21 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
first_block_hash: Hash, first_block_hash: Hash,
chunker: &mut StreamChunker<S>, chunker: &mut StreamChunker<S>,
) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> { ) -> Result<(u64, GenericArray<u8, typenum::U16>, Hash), Error> {
let tracer = opentelemetry::global::tracer("garage");
let first_block = Bytes::from(first_block); let first_block = Bytes::from(first_block);
let md5hasher = AsyncHasher::<Md5>::new(); let md5hasher = AsyncHasher::<Md5>::new();
let sha256hasher = AsyncHasher::<Sha256>::new(); let sha256hasher = AsyncHasher::<Sha256>::new();
md5hasher.update(first_block.clone());
sha256hasher.update(first_block.clone()); futures::future::join(
md5hasher.update(first_block.clone()),
sha256hasher.update(first_block.clone()),
)
.with_context(Context::current_with_span(
tracer.start("Hash first block (md5, sha256)"),
))
.await;
let mut next_offset = first_block.len(); let mut next_offset = first_block.len();
let mut put_curr_version_block = put_block_meta( let mut put_curr_version_block = put_block_meta(
@ -307,9 +321,15 @@ async fn read_and_put_blocks<S: Stream<Item = Result<Bytes, Error>> + Unpin>(
)?; )?;
if let Some(block) = next_block { if let Some(block) = next_block {
let block = Bytes::from(block); let block = Bytes::from(block);
md5hasher.update(block.clone()); let (_, _, block_hash) = futures::future::join3(
sha256hasher.update(block.clone()); md5hasher.update(block.clone()),
let block_hash = async_blake2sum(block.clone()).await; sha256hasher.update(block.clone()),
async_blake2sum(block.clone()),
)
.with_context(Context::current_with_span(
tracer.start("Hash block (md5, sha256, blake2)"),
))
.await;
let block_len = block.len(); let block_len = block.len();
put_curr_version_block = put_block_meta( put_curr_version_block = put_block_meta(
garage, garage,

View file

@ -93,7 +93,7 @@ pub struct BlockManager {
compression_level: Option<i32>, compression_level: Option<i32>,
background_tranquility: u32, background_tranquility: u32,
mutation_lock: Mutex<BlockManagerLocked>, mutation_lock: [Mutex<BlockManagerLocked>; 256],
pub(crate) rc: BlockRc, pub(crate) rc: BlockRc,
@ -150,8 +150,6 @@ impl BlockManager {
.netapp .netapp
.endpoint("garage_block/manager.rs/Rpc".to_string()); .endpoint("garage_block/manager.rs/Rpc".to_string());
let manager_locked = BlockManagerLocked();
let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone()); let metrics = BlockManagerMetrics::new(resync_queue.clone(), resync_errors.clone());
let block_manager = Arc::new(Self { let block_manager = Arc::new(Self {
@ -159,7 +157,7 @@ impl BlockManager {
data_dir, data_dir,
compression_level, compression_level,
background_tranquility, background_tranquility,
mutation_lock: Mutex::new(manager_locked), mutation_lock: [(); 256].map(|_| Mutex::new(BlockManagerLocked())),
rc, rc,
resync_queue, resync_queue,
resync_notify: Notify::new(), resync_notify: Notify::new(),
@ -313,14 +311,21 @@ impl BlockManager {
/// Write a block to disk /// Write a block to disk
async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> { async fn write_block(&self, hash: &Hash, data: &DataBlock) -> Result<BlockRpc, Error> {
let tracer = opentelemetry::global::tracer("garage");
let write_size = data.inner_buffer().len() as u64; let write_size = data.inner_buffer().len() as u64;
let res = self let res = self.mutation_lock[hash.as_slice()[0] as usize]
.mutation_lock
.lock() .lock()
.with_context(Context::current_with_span(
tracer.start("Acquire mutation_lock"),
))
.await .await
.write_block(hash, data, self) .write_block(hash, data, self)
.bound_record_duration(&self.metrics.block_write_duration) .bound_record_duration(&self.metrics.block_write_duration)
.with_context(Context::current_with_span(
tracer.start("BlockManagerLocked::write_block"),
))
.await?; .await?;
self.metrics.bytes_written.add(write_size); self.metrics.bytes_written.add(write_size);
@ -370,7 +375,7 @@ impl BlockManager {
if data.verify(*hash).is_err() { if data.verify(*hash).is_err() {
self.metrics.corruption_counter.add(1); self.metrics.corruption_counter.add(1);
self.mutation_lock self.mutation_lock[hash.as_slice()[0] as usize]
.lock() .lock()
.await .await
.move_block_to_corrupted(hash, self) .move_block_to_corrupted(hash, self)
@ -384,8 +389,7 @@ impl BlockManager {
/// Check if this node should have a block, but don't actually have it /// Check if this node should have a block, but don't actually have it
async fn need_block(&self, hash: &Hash) -> Result<bool, Error> { async fn need_block(&self, hash: &Hash) -> Result<bool, Error> {
let BlockStatus { exists, needed } = self let BlockStatus { exists, needed } = self.mutation_lock[hash.as_slice()[0] as usize]
.mutation_lock
.lock() .lock()
.await .await
.check_block_status(hash, self) .check_block_status(hash, self)
@ -608,8 +612,7 @@ impl BlockManager {
} }
async fn resync_block(&self, hash: &Hash) -> Result<(), Error> { async fn resync_block(&self, hash: &Hash) -> Result<(), Error> {
let BlockStatus { exists, needed } = self let BlockStatus { exists, needed } = self.mutation_lock[hash.as_slice()[0] as usize]
.mutation_lock
.lock() .lock()
.await .await
.check_block_status(hash, self) .check_block_status(hash, self)
@ -694,7 +697,7 @@ impl BlockManager {
who.len() who.len()
); );
self.mutation_lock self.mutation_lock[hash.as_slice()[0] as usize]
.lock() .lock()
.await .await
.delete_if_unneeded(hash, self) .delete_if_unneeded(hash, self)

View file

@ -104,11 +104,16 @@ impl Garage {
std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory"); std::fs::create_dir_all(&db_path).expect("Unable to create LMDB data directory");
let map_size = garage_db::lmdb_adapter::recommended_map_size(); let map_size = garage_db::lmdb_adapter::recommended_map_size();
let db = db::lmdb_adapter::heed::EnvOpenOptions::new() use db::lmdb_adapter::heed;
.max_dbs(100) let mut env_builder = heed::EnvOpenOptions::new();
.map_size(map_size) env_builder.max_dbs(100);
.open(&db_path) env_builder.max_readers(500);
.expect("Unable to open LMDB DB"); env_builder.map_size(map_size);
unsafe {
env_builder.flag(heed::flags::Flags::MdbNoSync);
env_builder.flag(heed::flags::Flags::MdbNoMetaSync);
}
let db = env_builder.open(&db_path).expect("Unable to open LMDB DB");
db::lmdb_adapter::LmdbDb::init(db) db::lmdb_adapter::LmdbDb::init(db)
} }
e => { e => {

View file

@ -1,7 +1,7 @@
use bytes::Bytes; use bytes::Bytes;
use digest::Digest; use digest::Digest;
use tokio::sync::mpsc; use tokio::sync::{mpsc, oneshot};
use tokio::task::JoinHandle; use tokio::task::JoinHandle;
use crate::data::*; use crate::data::*;
@ -27,25 +27,28 @@ pub async fn async_blake2sum(data: Bytes) -> Hash {
// ---- // ----
pub struct AsyncHasher<D: Digest> { pub struct AsyncHasher<D: Digest> {
sendblk: mpsc::UnboundedSender<Bytes>, sendblk: mpsc::UnboundedSender<(Bytes, oneshot::Sender<()>)>,
task: JoinHandle<digest::Output<D>>, task: JoinHandle<digest::Output<D>>,
} }
impl<D: Digest> AsyncHasher<D> { impl<D: Digest> AsyncHasher<D> {
pub fn new() -> Self { pub fn new() -> Self {
let (sendblk, mut recvblk) = mpsc::unbounded_channel::<Bytes>(); let (sendblk, mut recvblk) = mpsc::unbounded_channel::<(Bytes, oneshot::Sender<()>)>();
let task = tokio::task::spawn_blocking(move || { let task = tokio::task::spawn_blocking(move || {
let mut digest = D::new(); let mut digest = D::new();
while let Some(blk) = recvblk.blocking_recv() { while let Some((blk, ch)) = recvblk.blocking_recv() {
digest.update(&blk[..]); digest.update(&blk[..]);
let _ = ch.send(());
} }
digest.finalize() digest.finalize()
}); });
Self { sendblk, task } Self { sendblk, task }
} }
pub fn update(&self, b: Bytes) { pub async fn update(&self, b: Bytes) {
self.sendblk.send(b).unwrap() let (tx, rx) = oneshot::channel();
self.sendblk.send((b, tx)).unwrap();
let _ = rx.await;
} }
pub async fn finalize(self) -> digest::Output<D> { pub async fn finalize(self) -> digest::Output<D> {