diff --git a/Cargo.lock b/Cargo.lock index 164ba03d..bae0dbf0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1042,6 +1042,20 @@ dependencies = [ "num_cpus", ] +[[package]] +name = "dashmap" +version = "6.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" +dependencies = [ + "cfg-if", + "crossbeam-utils", + "hashbrown 0.14.3", + "lock_api", + "once_cell", + "parking_lot_core 0.9.9", +] + [[package]] name = "der" version = "0.6.1" @@ -1142,6 +1156,18 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "enum_dispatch" +version = "0.3.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" +dependencies = [ + "once_cell", + "proc-macro2", + "quote", + "syn 2.0.48", +] + [[package]] name = "env_logger" version = "0.10.2" @@ -1221,18 +1247,18 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" [[package]] name = "fjall" -version = "1.5.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2fd11be247941253b9861c7ca94526d092d8311ac0a1740415242e3d9efd7a5" +checksum = "5e33c3128fbd83d9d70ebcf093f3f91d8c20016af0aac545f80afbadd9dcd098" dependencies = [ "byteorder", - "crc32fast", - "fs_extra", + "dashmap 6.1.0", "log", "lsm-tree", "path-absolutize", "std-semaphore", "tempfile", + "xxhash-rust", ] [[package]] @@ -1254,12 +1280,6 @@ dependencies = [ name = "format_table" version = "0.1.1" -[[package]] -name = "fs_extra" -version = "1.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c" - [[package]] name = "futures" version = "0.3.30" @@ -2522,21 +2542,25 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" [[package]] name = "lsm-tree" -version = "1.5.0" +version = "2.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e6b7522d314955643781c4050b48477985e61f892434e14e9e6148d27c60435d" +checksum = "e7952bc71e90c0b58ce441dcf6cf8624cac042125dec1183ec9c48144f74378d" dependencies = [ "byteorder", - "crc32fast", "crossbeam-skiplist", "double-ended-peekable", + "enum_dispatch", "guardian", "log", "lz4_flex", "path-absolutize", "quick_cache", + "rustc-hash", "self_cell", "tempfile", + "value-log", + "varint-rs", + "xxhash-rust", ] [[package]] @@ -2544,9 +2568,6 @@ name = "lz4_flex" version = "0.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5" -dependencies = [ - "twox-hash", -] [[package]] name = "matchers" @@ -2579,6 +2600,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "min-max-heap" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18" + [[package]] name = "minimal-lexical" version = "0.2.1" @@ -2803,7 +2830,7 @@ checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8" dependencies = [ "async-trait", "crossbeam-channel", - "dashmap", + "dashmap 4.0.2", "fnv", "futures-channel", "futures-executor", @@ -3301,7 +3328,6 @@ version = "0.6.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2ec0b6fed0a0ff01fa82d0c8982389375dd59c72dae84d4f8a15b1a894c273f7" dependencies = [ - "ahash", "equivalent", "hashbrown 0.14.3", ] @@ -3548,6 +3574,12 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc-hash" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152" + [[package]] name = "rustc_version" version = "0.4.0" @@ -3991,12 +4023,6 @@ dependencies = [ "der", ] -[[package]] -name = "static_assertions" -version = "1.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" - [[package]] name = "static_init" version = "1.0.3" @@ -4579,16 +4605,6 @@ version = "0.2.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b" -[[package]] -name = "twox-hash" -version = "1.6.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675" -dependencies = [ - "cfg-if", - "static_assertions", -] - [[package]] name = "typenum" version = "1.17.0" @@ -4701,6 +4717,28 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-log" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e7c4b687fea1a6fe681fabdcc3e21cd01ce6df68d92c037ef2f3dacdd1daf4d" +dependencies = [ + "byteorder", + "log", + "min-max-heap", + "path-absolutize", + "quick_cache", + "rustc-hash", + "tempfile", + "xxhash-rust", +] + +[[package]] +name = "varint-rs" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" + [[package]] name = "vcpkg" version = "0.2.15" @@ -5047,9 +5085,9 @@ checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4" [[package]] name = "xxhash-rust" -version = "0.8.8" +version = "0.8.12" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61" +checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984" [[package]] name = "zerocopy" diff --git a/Cargo.toml b/Cargo.toml index fe3b32ea..93c208a0 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -85,7 +85,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] } rusqlite = "0.31.0" r2d2 = "0.8" r2d2_sqlite = "0.24" -fjall = "1.5" +fjall = "2.4" async-compression = { version = "0.4", features = ["tokio", "zstd"] } zstd = { version = "0.13", default-features = false } diff --git a/src/db/fjall_adapter.rs b/src/db/fjall_adapter.rs index 91703ddb..3094d64f 100644 --- a/src/db/fjall_adapter.rs +++ b/src/db/fjall_adapter.rs @@ -1,13 +1,11 @@ use core::ops::Bound; -use core::pin::Pin; -use core::ptr::NonNull; use std::collections::HashMap; use std::path::PathBuf; use std::sync::{Arc, RwLock}; use fjall::{ - PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace, + PartitionCreateOptions, PersistMode, TransactionalKeyspace, TransactionalPartitionHandle, WriteTransaction, }; @@ -185,13 +183,13 @@ impl IDb for FjallDb { fn iter(&self, tree_idx: usize) -> Result> { let tree = self.get_tree(tree_idx)?; let tx = self.keyspace.read_tx(); - TxAndIterator::make(tx, |tx| Ok(tx.iter(&tree))) + Ok(Box::new(tx.iter(&tree).map(iterator_remap))) } fn iter_rev(&self, tree_idx: usize) -> Result> { let tree = self.get_tree(tree_idx)?; let tx = self.keyspace.read_tx(); - TxAndIterator::make(tx, |tx| Ok(tx.iter(&tree).rev())) + Ok(Box::new(tx.iter(&tree).rev().map(iterator_remap))) } fn range<'r>( @@ -202,9 +200,7 @@ impl IDb for FjallDb { ) -> Result> { let tree = self.get_tree(tree_idx)?; let tx = self.keyspace.read_tx(); - TxAndIterator::make(tx, |tx| { - Ok(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))) - }) + Ok(Box::new(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)).map(iterator_remap))) } fn range_rev<'r>( &self, @@ -214,11 +210,7 @@ impl IDb for FjallDb { ) -> Result> { let tree = self.get_tree(tree_idx)?; let tx = self.keyspace.read_tx(); - TxAndIterator::make(tx, |tx| { - Ok(tx - .range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)) - .rev()) - }) + Ok(Box::new(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)).rev().map(iterator_remap))) } // ---- @@ -296,11 +288,11 @@ impl<'a> ITx for FjallTx<'a> { fn iter(&self, tree_idx: usize) -> TxOpResult> { let tree = self.get_tree(tree_idx)?.clone(); - Ok(Box::new(self.tx.iter(&tree).map(tx_iter_item))) + Ok(Box::new(self.tx.iter(&tree).map(iterator_remap_tx))) } fn iter_rev(&self, tree_idx: usize) -> TxOpResult> { let tree = self.get_tree(tree_idx)?.clone(); - Ok(Box::new(self.tx.iter(&tree).rev().map(tx_iter_item))) + Ok(Box::new(self.tx.iter(&tree).rev().map(iterator_remap_tx))) } fn range<'r>( @@ -309,12 +301,10 @@ impl<'a> ITx for FjallTx<'a> { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> TxOpResult> { - let tree = self.get_tree(tree_idx)?.clone(); - Ok(Box::new( - self.tx - .range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)) - .map(tx_iter_item), - )) + let tree = self.get_tree(tree_idx)?; + let low = clone_bound(low); + let high = clone_bound(high); + Ok(Box::new(self.tx.range::, ByteVecRangeBounds>(&tree, (low, high)).map(iterator_remap_tx))) } fn range_rev<'r>( &self, @@ -322,92 +312,39 @@ impl<'a> ITx for FjallTx<'a> { low: Bound<&'r [u8]>, high: Bound<&'r [u8]>, ) -> TxOpResult> { - let tree = self.get_tree(tree_idx)?.clone(); - Ok(Box::new( - self.tx - .range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)) - .rev() - .map(tx_iter_item), - )) + let tree = self.get_tree(tree_idx)?; + let low = clone_bound(low); + let high = clone_bound(high); + Ok(Box::new(self.tx.range::, ByteVecRangeBounds>(&tree, (low, high)).rev().map(iterator_remap_tx))) } } -// ---- iterators outside transactions ---- -// complicated, they must hold the transaction object -// therefore a bit of unsafe code (it is a self-referential struct) +// -- maps fjall's (k, v) to ours -type IteratorItem = fjall::Result<(fjall::UserKey, fjall::UserValue)>; - -struct TxAndIterator -where - I: Iterator, -{ - tx: ReadTransaction, - iter: Option, +fn iterator_remap(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> Result<(Value, Value)> { + r.map(|(k, v)| (k.to_vec(), v.to_vec())) + .map_err(|e| e.into()) } -impl<'a, I> TxAndIterator -where - I: Iterator + 'a, -{ - fn make(tx: ReadTransaction, iterfun: F) -> Result> - where - F: FnOnce(&ReadTransaction) -> Result, - { - let res = TxAndIterator { tx, iter: None }; - let mut boxed = Box::pin(res); - - // This unsafe allows us to bypass lifetime checks - let tx = unsafe { NonNull::from(&boxed.tx).as_ref() }; - let iter = iterfun(tx)?; - - let mut_ref = Pin::as_mut(&mut boxed); - // This unsafe allows us to write in a field of the pinned struct - unsafe { - Pin::get_unchecked_mut(mut_ref).iter = Some(iter); - } - - Ok(Box::new(TxAndIteratorPin(boxed))) - } +fn iterator_remap_tx(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> TxOpResult<(Value, Value)> { + r.map(|(k, v)| (k.to_vec(), v.to_vec())) + .map_err(|e| e.into()) } -impl<'a, I> Drop for TxAndIterator -where - I: Iterator, -{ - fn drop(&mut self) { - // ensure the iterator is dropped before the RoTxn it references - drop(self.iter.take()); - } -} - -struct TxAndIteratorPin(Pin>>) -where - I: Iterator; - -impl Iterator for TxAndIteratorPin -where - I: Iterator, -{ - type Item = Result<(Value, Value)>; - - fn next(&mut self) -> Option { - let mut_ref = Pin::as_mut(&mut self.0); - // This unsafe allows us to mutably access the iterator field - let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() }; - match next { - None => None, - Some(Err(e)) => Some(Err(e.into())), - Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))), - } - } -} - -// ---- iterators within transactions ---- - -fn tx_iter_item<'a>( - item: std::result::Result<(Arc<[u8]>, Arc<[u8]>), fjall::Error>, -) -> TxOpResult<(Vec, Vec)> { - item.map(|(k, v)| (k.to_vec(), v.to_vec())) - .map_err(|e| TxOpError(Error::from(e))) +// -- utils to deal with Garage's tightness on Bound lifetimes + +type ByteVecBound = Bound>; +type ByteVecRangeBounds = (ByteVecBound, ByteVecBound); + +fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound { + let value = match bound { + Bound::Excluded(v) | Bound::Included(v) => v.to_vec(), + Bound::Unbounded => vec!(), + }; + + match bound { + Bound::Included(_) => Bound::Included(value), + Bound::Excluded(_) => Bound::Excluded(value), + Bound::Unbounded => Bound::Unbounded, + } } diff --git a/src/db/open.rs b/src/db/open.rs index 19cb4e87..a6ec7b00 100644 --- a/src/db/open.rs +++ b/src/db/open.rs @@ -1,4 +1,6 @@ use std::path::PathBuf; +use std::sync::Arc; +use std::convert::TryInto; use crate::{Db, Error, Result}; @@ -54,6 +56,7 @@ impl std::str::FromStr for Engine { pub struct OpenOpt { pub fsync: bool, pub lmdb_map_size: Option, + pub fjall_block_cache_size: Option, } impl Default for OpenOpt { @@ -61,6 +64,7 @@ impl Default for OpenOpt { Self { fsync: false, lmdb_map_size: None, + fjall_block_cache_size: None, } } } @@ -122,7 +126,12 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result { Engine::Fjall => { info!("Opening Fjall database at: {}", path.display()); let fsync_ms = opt.fsync.then(|| 1000 as u16); - let keyspace = fjall::Config::new(path).fsync_ms(fsync_ms).open_transactional()?; + let mut config = fjall::Config::new(path).fsync_ms(fsync_ms); + if let Some(block_cache_size) = opt.fjall_block_cache_size { + let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes(block_cache_size.try_into().unwrap())); + config = config.block_cache(block_cache); + } + let keyspace = config.open_transactional()?; Ok(crate::fjall_adapter::FjallDb::init(path, keyspace)) } diff --git a/src/model/garage.rs b/src/model/garage.rs index e22b533f..96f187be 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -134,6 +134,10 @@ impl Garage { v if v == usize::default() => None, v => Some(v), }, + fjall_block_cache_size: match config.fjall_block_cache_size { + v if v == usize::default() => None, + v => Some(v), + }, }; let db = db::open_db(&db_path, db_engine, &db_opt) .ok_or_message("Unable to open metadata db")?; diff --git a/src/util/config.rs b/src/util/config.rs index 59329c0b..afdd8374 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -115,6 +115,10 @@ pub struct Config { #[serde(deserialize_with = "deserialize_capacity", default)] pub lmdb_map_size: usize, + /// Fjall block cache size + #[serde(deserialize_with = "deserialize_capacity", default)] + pub fjall_block_cache_size: usize, + // -- APIs /// Configuration for S3 api pub s3_api: S3ApiConfig,