WIP: Fjall DB engine #906

Draft
withings wants to merge 5 commits from withings/garage:feat/fjall-db-engine into main
6 changed files with 131 additions and 139 deletions
Showing only changes of commit c50cab80fa - Show all commits

110
Cargo.lock generated
View file

@ -1042,6 +1042,20 @@ dependencies = [
"num_cpus",
]
[[package]]
name = "dashmap"
version = "6.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
dependencies = [
"cfg-if",
"crossbeam-utils",
"hashbrown 0.14.3",
"lock_api",
"once_cell",
"parking_lot_core 0.9.9",
]
[[package]]
name = "der"
version = "0.6.1"
@ -1142,6 +1156,18 @@ dependencies = [
"cfg-if",
]
[[package]]
name = "enum_dispatch"
version = "0.3.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn 2.0.48",
]
[[package]]
name = "env_logger"
version = "0.10.2"
@ -1221,18 +1247,18 @@ checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80"
[[package]]
name = "fjall"
version = "1.5.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e2fd11be247941253b9861c7ca94526d092d8311ac0a1740415242e3d9efd7a5"
checksum = "5e33c3128fbd83d9d70ebcf093f3f91d8c20016af0aac545f80afbadd9dcd098"
dependencies = [
"byteorder",
"crc32fast",
"fs_extra",
"dashmap 6.1.0",
"log",
"lsm-tree",
"path-absolutize",
"std-semaphore",
"tempfile",
"xxhash-rust",
]
[[package]]
@ -1254,12 +1280,6 @@ dependencies = [
name = "format_table"
version = "0.1.1"
[[package]]
name = "fs_extra"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "42703706b716c37f96a77aea830392ad231f44c9e9a67872fa5548707e11b11c"
[[package]]
name = "futures"
version = "0.3.30"
@ -2522,21 +2542,25 @@ checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24"
[[package]]
name = "lsm-tree"
version = "1.5.0"
version = "2.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6b7522d314955643781c4050b48477985e61f892434e14e9e6148d27c60435d"
checksum = "e7952bc71e90c0b58ce441dcf6cf8624cac042125dec1183ec9c48144f74378d"
dependencies = [
"byteorder",
"crc32fast",
"crossbeam-skiplist",
"double-ended-peekable",
"enum_dispatch",
"guardian",
"log",
"lz4_flex",
"path-absolutize",
"quick_cache",
"rustc-hash",
"self_cell",
"tempfile",
"value-log",
"varint-rs",
"xxhash-rust",
]
[[package]]
@ -2544,9 +2568,6 @@ name = "lz4_flex"
version = "0.11.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75761162ae2b0e580d7e7c390558127e5f01b4194debd6221fd8c207fc80e3f5"
dependencies = [
"twox-hash",
]
[[package]]
name = "matchers"
@ -2579,6 +2600,12 @@ version = "0.3.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a"
[[package]]
name = "min-max-heap"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2687e6cf9c00f48e9284cf9fd15f2ef341d03cc7743abf9df4c5f07fdee50b18"
[[package]]
name = "minimal-lexical"
version = "0.2.1"
@ -2803,7 +2830,7 @@ checksum = "6105e89802af13fdf48c49d7646d3b533a70e536d818aae7e78ba0433d01acb8"
dependencies = [
"async-trait",
"crossbeam-channel",
"dashmap",
"dashmap 4.0.2",
"fnv",
"futures-channel",
"futures-executor",
@ -3301,7 +3328,6 @@ version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2ec0b6fed0a0ff01fa82d0c8982389375dd59c72dae84d4f8a15b1a894c273f7"
dependencies = [
"ahash",
"equivalent",
"hashbrown 0.14.3",
]
@ -3548,6 +3574,12 @@ version = "0.1.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76"
[[package]]
name = "rustc-hash"
version = "2.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "583034fd73374156e66797ed8e5b0d5690409c9226b22d87cb7f19821c05d152"
[[package]]
name = "rustc_version"
version = "0.4.0"
@ -3991,12 +4023,6 @@ dependencies = [
"der",
]
[[package]]
name = "static_assertions"
version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f"
[[package]]
name = "static_init"
version = "1.0.3"
@ -4579,16 +4605,6 @@ version = "0.2.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e421abadd41a4225275504ea4d6566923418b7f05506fbc9c0fe86ba7396114b"
[[package]]
name = "twox-hash"
version = "1.6.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "97fee6b57c6a41524a810daee9286c02d7752c4253064d0b05472833a438f675"
dependencies = [
"cfg-if",
"static_assertions",
]
[[package]]
name = "typenum"
version = "1.17.0"
@ -4701,6 +4717,28 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d"
[[package]]
name = "value-log"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6e7c4b687fea1a6fe681fabdcc3e21cd01ce6df68d92c037ef2f3dacdd1daf4d"
dependencies = [
"byteorder",
"log",
"min-max-heap",
"path-absolutize",
"quick_cache",
"rustc-hash",
"tempfile",
"xxhash-rust",
]
[[package]]
name = "varint-rs"
version = "2.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
[[package]]
name = "vcpkg"
version = "0.2.15"
@ -5047,9 +5085,9 @@ checksum = "66fee0b777b0f5ac1c69bb06d361268faafa61cd4682ae064a171c16c433e9e4"
[[package]]
name = "xxhash-rust"
version = "0.8.8"
version = "0.8.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61"
checksum = "6a5cbf750400958819fb6178eaa83bee5cd9c29a26a40cc241df8c70fdd46984"
[[package]]
name = "zerocopy"

View file

@ -85,7 +85,7 @@ heed = { version = "0.11", default-features = false, features = ["lmdb"] }
rusqlite = "0.31.0"
r2d2 = "0.8"
r2d2_sqlite = "0.24"
fjall = "1.5"
fjall = "2.4"
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false }

View file

@ -1,13 +1,11 @@
use core::ops::Bound;
use core::pin::Pin;
use core::ptr::NonNull;
use std::collections::HashMap;
use std::path::PathBuf;
use std::sync::{Arc, RwLock};
use fjall::{
PartitionCreateOptions, PersistMode, ReadTransaction, TransactionalKeyspace,
PartitionCreateOptions, PersistMode, TransactionalKeyspace,
TransactionalPartitionHandle, WriteTransaction,
};
@ -185,13 +183,13 @@ impl IDb for FjallDb {
fn iter(&self, tree_idx: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
TxAndIterator::make(tx, |tx| Ok(tx.iter(&tree)))
Ok(Box::new(tx.iter(&tree).map(iterator_remap)))
}
fn iter_rev(&self, tree_idx: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
TxAndIterator::make(tx, |tx| Ok(tx.iter(&tree).rev()))
Ok(Box::new(tx.iter(&tree).rev().map(iterator_remap)))
}
fn range<'r>(
@ -202,9 +200,7 @@ impl IDb for FjallDb {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
TxAndIterator::make(tx, |tx| {
Ok(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)))
})
Ok(Box::new(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)).map(iterator_remap)))
}
fn range_rev<'r>(
&self,
@ -214,11 +210,7 @@ impl IDb for FjallDb {
) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx();
TxAndIterator::make(tx, |tx| {
Ok(tx
.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.rev())
})
Ok(Box::new(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)).rev().map(iterator_remap)))
}
// ----
@ -296,11 +288,11 @@ impl<'a> ITx for FjallTx<'a> {
fn iter(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?.clone();
Ok(Box::new(self.tx.iter(&tree).map(tx_iter_item)))
Ok(Box::new(self.tx.iter(&tree).map(iterator_remap_tx)))
}
fn iter_rev(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?.clone();
Ok(Box::new(self.tx.iter(&tree).rev().map(tx_iter_item)))
Ok(Box::new(self.tx.iter(&tree).rev().map(iterator_remap_tx)))
}
fn range<'r>(
@ -309,12 +301,10 @@ impl<'a> ITx for FjallTx<'a> {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?.clone();
Ok(Box::new(
self.tx
.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.map(tx_iter_item),
))
let tree = self.get_tree(tree_idx)?;
let low = clone_bound(low);
let high = clone_bound(high);
Ok(Box::new(self.tx.range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high)).map(iterator_remap_tx)))
}
fn range_rev<'r>(
&self,
@ -322,92 +312,39 @@ impl<'a> ITx for FjallTx<'a> {
low: Bound<&'r [u8]>,
high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?.clone();
Ok(Box::new(
self.tx
.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.rev()
.map(tx_iter_item),
))
let tree = self.get_tree(tree_idx)?;
let low = clone_bound(low);
let high = clone_bound(high);
Ok(Box::new(self.tx.range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high)).rev().map(iterator_remap_tx)))
}
}
// ---- iterators outside transactions ----
// complicated, they must hold the transaction object
// therefore a bit of unsafe code (it is a self-referential struct)
// -- maps fjall's (k, v) to ours
type IteratorItem = fjall::Result<(fjall::UserKey, fjall::UserValue)>;
struct TxAndIterator<I>
where
I: Iterator<Item = IteratorItem>,
{
tx: ReadTransaction,
iter: Option<I>,
fn iterator_remap(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> Result<(Value, Value)> {
r.map(|(k, v)| (k.to_vec(), v.to_vec()))
.map_err(|e| e.into())
}
impl<'a, I> TxAndIterator<I>
where
I: Iterator<Item = IteratorItem> + 'a,
{
fn make<F>(tx: ReadTransaction, iterfun: F) -> Result<ValueIter<'a>>
where
F: FnOnce(&ReadTransaction) -> Result<I>,
{
let res = TxAndIterator { tx, iter: None };
let mut boxed = Box::pin(res);
// This unsafe allows us to bypass lifetime checks
let tx = unsafe { NonNull::from(&boxed.tx).as_ref() };
let iter = iterfun(tx)?;
let mut_ref = Pin::as_mut(&mut boxed);
// This unsafe allows us to write in a field of the pinned struct
unsafe {
Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
}
Ok(Box::new(TxAndIteratorPin(boxed)))
}
fn iterator_remap_tx(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> TxOpResult<(Value, Value)> {
r.map(|(k, v)| (k.to_vec(), v.to_vec()))
.map_err(|e| e.into())
}
impl<'a, I> Drop for TxAndIterator<I>
where
I: Iterator<Item = IteratorItem>,
{
fn drop(&mut self) {
// ensure the iterator is dropped before the RoTxn it references
drop(self.iter.take());
}
}
struct TxAndIteratorPin<I>(Pin<Box<TxAndIterator<I>>>)
where
I: Iterator<Item = IteratorItem>;
impl<I> Iterator for TxAndIteratorPin<I>
where
I: Iterator<Item = IteratorItem>,
{
type Item = Result<(Value, Value)>;
fn next(&mut self) -> Option<Self::Item> {
let mut_ref = Pin::as_mut(&mut self.0);
// This unsafe allows us to mutably access the iterator field
let next = unsafe { Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next() };
match next {
None => None,
Some(Err(e)) => Some(Err(e.into())),
Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))),
}
}
}
// ---- iterators within transactions ----
fn tx_iter_item<'a>(
item: std::result::Result<(Arc<[u8]>, Arc<[u8]>), fjall::Error>,
) -> TxOpResult<(Vec<u8>, Vec<u8>)> {
item.map(|(k, v)| (k.to_vec(), v.to_vec()))
.map_err(|e| TxOpError(Error::from(e)))
// -- utils to deal with Garage's tightness on Bound lifetimes
type ByteVecBound = Bound<Vec<u8>>;
type ByteVecRangeBounds = (ByteVecBound, ByteVecBound);
fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound {
let value = match bound {
Bound::Excluded(v) | Bound::Included(v) => v.to_vec(),
Bound::Unbounded => vec!(),
};
match bound {
Bound::Included(_) => Bound::Included(value),
Bound::Excluded(_) => Bound::Excluded(value),
Bound::Unbounded => Bound::Unbounded,
}
}

View file

@ -1,4 +1,6 @@
use std::path::PathBuf;
use std::sync::Arc;
use std::convert::TryInto;
use crate::{Db, Error, Result};
@ -54,6 +56,7 @@ impl std::str::FromStr for Engine {
pub struct OpenOpt {
pub fsync: bool,
pub lmdb_map_size: Option<usize>,
pub fjall_block_cache_size: Option<usize>,
}
impl Default for OpenOpt {
@ -61,6 +64,7 @@ impl Default for OpenOpt {
Self {
fsync: false,
lmdb_map_size: None,
fjall_block_cache_size: None,
}
}
}
@ -122,7 +126,12 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
Engine::Fjall => {
info!("Opening Fjall database at: {}", path.display());
let fsync_ms = opt.fsync.then(|| 1000 as u16);
let keyspace = fjall::Config::new(path).fsync_ms(fsync_ms).open_transactional()?;
let mut config = fjall::Config::new(path).fsync_ms(fsync_ms);
if let Some(block_cache_size) = opt.fjall_block_cache_size {
let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes(block_cache_size.try_into().unwrap()));
config = config.block_cache(block_cache);
}
let keyspace = config.open_transactional()?;
Ok(crate::fjall_adapter::FjallDb::init(path, keyspace))
}

View file

@ -134,6 +134,10 @@ impl Garage {
v if v == usize::default() => None,
v => Some(v),
},
fjall_block_cache_size: match config.fjall_block_cache_size {
v if v == usize::default() => None,
v => Some(v),
},
};
let db = db::open_db(&db_path, db_engine, &db_opt)
.ok_or_message("Unable to open metadata db")?;

View file

@ -115,6 +115,10 @@ pub struct Config {
#[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize,
/// Fjall block cache size
#[serde(deserialize_with = "deserialize_capacity", default)]
pub fjall_block_cache_size: usize,
// -- APIs
/// Configuration for S3 api
pub s3_api: S3ApiConfig,