WIP: Fjall DB engine #906

Draft
withings wants to merge 5 commits from withings/garage:feat/fjall-db-engine into main
3 changed files with 56 additions and 38 deletions
Showing only changes of commit 04d3847200 - Show all commits

View file

@ -5,8 +5,8 @@ use std::path::PathBuf;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock};
use fjall::{ use fjall::{
PartitionCreateOptions, PersistMode, TransactionalKeyspace, PartitionCreateOptions, PersistMode, TransactionalKeyspace, TransactionalPartitionHandle,
TransactionalPartitionHandle, WriteTransaction, WriteTransaction,
}; };
use crate::{ use crate::{
@ -183,13 +183,13 @@ impl IDb for FjallDb {
fn iter(&self, tree_idx: usize) -> Result<ValueIter<'_>> { fn iter(&self, tree_idx: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?; let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx(); let tx = self.keyspace.read_tx();
Ok(Box::new(tx.iter(&tree).map(iterator_remap))) Ok(Box::new(tx.iter(&tree).map(iterator_remap)))
} }
fn iter_rev(&self, tree_idx: usize) -> Result<ValueIter<'_>> { fn iter_rev(&self, tree_idx: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?; let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx(); let tx = self.keyspace.read_tx();
Ok(Box::new(tx.iter(&tree).rev().map(iterator_remap))) Ok(Box::new(tx.iter(&tree).rev().map(iterator_remap)))
} }
fn range<'r>( fn range<'r>(
@ -200,7 +200,10 @@ impl IDb for FjallDb {
) -> Result<ValueIter<'_>> { ) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?; let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx(); let tx = self.keyspace.read_tx();
Ok(Box::new(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)).map(iterator_remap))) Ok(Box::new(
tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.map(iterator_remap),
))
} }
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
@ -210,7 +213,11 @@ impl IDb for FjallDb {
) -> Result<ValueIter<'_>> { ) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree_idx)?; let tree = self.get_tree(tree_idx)?;
let tx = self.keyspace.read_tx(); let tx = self.keyspace.read_tx();
Ok(Box::new(tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high)).rev().map(iterator_remap))) Ok(Box::new(
tx.range::<&'r [u8], ByteRefRangeBound>(&tree, (low, high))
.rev()
.map(iterator_remap),
))
} }
// ---- // ----
@ -288,11 +295,11 @@ impl<'a> ITx for FjallTx<'a> {
fn iter(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> { fn iter(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?.clone(); let tree = self.get_tree(tree_idx)?.clone();
Ok(Box::new(self.tx.iter(&tree).map(iterator_remap_tx))) Ok(Box::new(self.tx.iter(&tree).map(iterator_remap_tx)))
} }
fn iter_rev(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> { fn iter_rev(&self, tree_idx: usize) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?.clone(); let tree = self.get_tree(tree_idx)?.clone();
Ok(Box::new(self.tx.iter(&tree).rev().map(iterator_remap_tx))) Ok(Box::new(self.tx.iter(&tree).rev().map(iterator_remap_tx)))
} }
fn range<'r>( fn range<'r>(
@ -302,9 +309,13 @@ impl<'a> ITx for FjallTx<'a> {
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> { ) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?; let tree = self.get_tree(tree_idx)?;
let low = clone_bound(low); let low = clone_bound(low);
let high = clone_bound(high); let high = clone_bound(high);
Ok(Box::new(self.tx.range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high)).map(iterator_remap_tx))) Ok(Box::new(
self.tx
.range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high))
.map(iterator_remap_tx),
))
} }
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
@ -313,22 +324,27 @@ impl<'a> ITx for FjallTx<'a> {
high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> { ) -> TxOpResult<TxValueIter<'_>> {
let tree = self.get_tree(tree_idx)?; let tree = self.get_tree(tree_idx)?;
let low = clone_bound(low); let low = clone_bound(low);
let high = clone_bound(high); let high = clone_bound(high);
Ok(Box::new(self.tx.range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high)).rev().map(iterator_remap_tx))) Ok(Box::new(
self.tx
.range::<Vec<u8>, ByteVecRangeBounds>(&tree, (low, high))
.rev()
.map(iterator_remap_tx),
))
} }
} }
// -- maps fjall's (k, v) to ours // -- maps fjall's (k, v) to ours
fn iterator_remap(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> Result<(Value, Value)> { fn iterator_remap(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> Result<(Value, Value)> {
r.map(|(k, v)| (k.to_vec(), v.to_vec())) r.map(|(k, v)| (k.to_vec(), v.to_vec()))
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
fn iterator_remap_tx(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> TxOpResult<(Value, Value)> { fn iterator_remap_tx(r: fjall::Result<(fjall::Slice, fjall::Slice)>) -> TxOpResult<(Value, Value)> {
r.map(|(k, v)| (k.to_vec(), v.to_vec())) r.map(|(k, v)| (k.to_vec(), v.to_vec()))
.map_err(|e| e.into()) .map_err(|e| e.into())
} }
// -- utils to deal with Garage's tightness on Bound lifetimes // -- utils to deal with Garage's tightness on Bound lifetimes
@ -337,14 +353,14 @@ type ByteVecBound = Bound<Vec<u8>>;
type ByteVecRangeBounds = (ByteVecBound, ByteVecBound); type ByteVecRangeBounds = (ByteVecBound, ByteVecBound);
fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound { fn clone_bound(bound: Bound<&[u8]>) -> ByteVecBound {
let value = match bound { let value = match bound {
Bound::Excluded(v) | Bound::Included(v) => v.to_vec(), Bound::Excluded(v) | Bound::Included(v) => v.to_vec(),
Bound::Unbounded => vec!(), Bound::Unbounded => vec![],
}; };
match bound { match bound {
Bound::Included(_) => Bound::Included(value), Bound::Included(_) => Bound::Included(value),
Bound::Excluded(_) => Bound::Excluded(value), Bound::Excluded(_) => Bound::Excluded(value),
Bound::Unbounded => Bound::Unbounded, Bound::Unbounded => Bound::Unbounded,
} }
} }

View file

@ -1,6 +1,6 @@
use std::convert::TryInto;
use std::path::PathBuf; use std::path::PathBuf;
use std::sync::Arc; use std::sync::Arc;
use std::convert::TryInto;
use crate::{Db, Error, Result}; use crate::{Db, Error, Result};
@ -56,7 +56,7 @@ impl std::str::FromStr for Engine {
pub struct OpenOpt { pub struct OpenOpt {
pub fsync: bool, pub fsync: bool,
pub lmdb_map_size: Option<usize>, pub lmdb_map_size: Option<usize>,
pub fjall_block_cache_size: Option<usize>, pub fjall_block_cache_size: Option<usize>,
} }
impl Default for OpenOpt { impl Default for OpenOpt {
@ -64,7 +64,7 @@ impl Default for OpenOpt {
Self { Self {
fsync: false, fsync: false,
lmdb_map_size: None, lmdb_map_size: None,
fjall_block_cache_size: None, fjall_block_cache_size: None,
} }
} }
} }
@ -125,12 +125,14 @@ pub fn open_db(path: &PathBuf, engine: Engine, opt: &OpenOpt) -> Result<Db> {
#[cfg(feature = "fjall")] #[cfg(feature = "fjall")]
Engine::Fjall => { Engine::Fjall => {
info!("Opening Fjall database at: {}", path.display()); info!("Opening Fjall database at: {}", path.display());
let fsync_ms = opt.fsync.then(|| 1000 as u16); let fsync_ms = opt.fsync.then(|| 1000 as u16);
Outdated
Review

I think the correct implementation of opt.fsync == false would be to disable all fsync operations in fjall, in particular setting manual_journal_persist to true so that transactions would not do an fsync call. This is the meaning of that option for other db engines. Even with opt.fsync == false we can set fsync_ms to something reasonable like 1000, because if I understand correctly, the fsyncs will now be done by background threads at a regular interval and will not interfere with interactive operations. @marvinj97 please correct me if I'm wrong.

I think the correct implementation of `opt.fsync == false` would be to disable all fsync operations in fjall, in particular setting `manual_journal_persist` to `true` so that transactions would not do an fsync call. This is the meaning of that option for other db engines. Even with `opt.fsync == false` we can set `fsync_ms` to something reasonable like 1000, because if I understand correctly, the fsyncs will now be done by background threads at a regular interval and will not interfere with interactive operations. @marvinj97 please correct me if I'm wrong.

By default, every write operation (such as a WriteTx.commit) flushes to OS buffers, but not to disk. This is the same behaviour as RocksDB, and gives you crash safety, but not power loss/kernel panic safety.
manual_journal_persist skips flushing to OS buffers, so all the data is kept in the user-space BufWriter (unless it is full or you call Keyspace::persist or set a WriteTransaction::durability level), so you can lose data if the application cannot unwind properly (e.g. it is killed).

By default, every write operation (such as a WriteTx.commit) flushes to _OS buffers_, but **not** to disk. This is the same behaviour as RocksDB, and gives you crash safety, but not power loss/kernel panic safety. `manual_journal_persist` skips flushing to OS buffers, so all the data is kept in the user-space BufWriter (unless it is full or you call `Keyspace::persist` or set a `WriteTransaction::durability` level), so you can lose data if the application cannot unwind properly (e.g. it is killed).
let mut config = fjall::Config::new(path).fsync_ms(fsync_ms); let mut config = fjall::Config::new(path).fsync_ms(fsync_ms);
if let Some(block_cache_size) = opt.fjall_block_cache_size { if let Some(block_cache_size) = opt.fjall_block_cache_size {
let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes(block_cache_size.try_into().unwrap())); let block_cache = Arc::new(fjall::BlockCache::with_capacity_bytes(
config = config.block_cache(block_cache); block_cache_size.try_into().unwrap(),
} ));
config = config.block_cache(block_cache);
}
let keyspace = config.open_transactional()?; let keyspace = config.open_transactional()?;
Ok(crate::fjall_adapter::FjallDb::init(path, keyspace)) Ok(crate::fjall_adapter::FjallDb::init(path, keyspace))
} }

View file

@ -115,8 +115,8 @@ pub struct Config {
#[serde(deserialize_with = "deserialize_capacity", default)] #[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize, pub lmdb_map_size: usize,
/// Fjall block cache size /// Fjall block cache size
#[serde(deserialize_with = "deserialize_capacity", default)] #[serde(deserialize_with = "deserialize_capacity", default)]
pub fjall_block_cache_size: usize, pub fjall_block_cache_size: usize,
// -- APIs // -- APIs