[rm-sled] Implement iterators in sqlite & lmdb transactions
All checks were successful
ci/woodpecker/push/debug Pipeline was successful
ci/woodpecker/pr/debug Pipeline was successful

with way too much unsafe code
This commit is contained in:
Alex 2024-03-08 16:38:01 +01:00
parent 66c23890c1
commit b942949940
Signed by: lx
GPG key ID: 0E496D15096376BE
4 changed files with 195 additions and 28 deletions

View file

@ -51,6 +51,7 @@ pub type Result<T> = std::result::Result<T, Error>;
pub struct TxOpError(pub(crate) Error); pub struct TxOpError(pub(crate) Error);
pub type TxOpResult<T> = std::result::Result<T, TxOpError>; pub type TxOpResult<T> = std::result::Result<T, TxOpError>;
#[derive(Debug)]
pub enum TxError<E> { pub enum TxError<E> {
Abort(E), Abort(E),
Db(Error), Db(Error),

View file

@ -261,32 +261,42 @@ impl<'a> ITx for LmdbTx<'a> {
Ok(()) Ok(())
} }
fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); let tree = *self.get_tree(tree)?;
Ok(Box::new(tree.iter(&self.tx)?.map(tx_iter_item)))
} }
fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); let tree = *self.get_tree(tree)?;
Ok(Box::new(tree.rev_iter(&self.tx)?.map(tx_iter_item)))
} }
fn range<'r>( fn range<'r>(
&self, &self,
_tree: usize, tree: usize,
_low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> { ) -> TxOpResult<TxValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); let tree = *self.get_tree(tree)?;
Ok(Box::new(
tree.range(&self.tx, &(low, high))?.map(tx_iter_item),
))
} }
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
_tree: usize, tree: usize,
_low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> { ) -> TxOpResult<TxValueIter<'_>> {
unimplemented!("Iterators in transactions not supported with LMDB backend"); let tree = *self.get_tree(tree)?;
Ok(Box::new(
tree.rev_range(&self.tx, &(low, high))?.map(tx_iter_item),
))
} }
} }
// ---- // ---- iterators outside transactions ----
// complicated, they must hold the transaction object
// therefore a bit of unsafe code (it is a self-referential struct)
type IteratorItem<'a> = heed::Result<( type IteratorItem<'a> = heed::Result<(
<ByteSlice as BytesDecode<'a>>::DItem, <ByteSlice as BytesDecode<'a>>::DItem,
@ -323,6 +333,7 @@ where
I: Iterator<Item = IteratorItem<'a>> + 'a, I: Iterator<Item = IteratorItem<'a>> + 'a,
{ {
fn drop(&mut self) { fn drop(&mut self) {
// ensure the iterator is dropped before the RoTxn it references
drop(self.iter.take()); drop(self.iter.take());
} }
} }
@ -342,7 +353,16 @@ where
} }
} }
// ---- // ---- iterators within transactions ----
fn tx_iter_item<'a>(
item: std::result::Result<(&'a [u8], &'a [u8]), heed::Error>,
) -> TxOpResult<(Vec<u8>, Vec<u8>)> {
item.map(|(k, v)| (k.to_vec(), v.to_vec()))
.map_err(|e| TxOpError(Error::from(e)))
}
// ---- utility ----
#[cfg(target_pointer_width = "64")] #[cfg(target_pointer_width = "64")]
pub fn recommended_map_size() -> usize { pub fn recommended_map_size() -> usize {

View file

@ -369,32 +369,58 @@ impl<'a> ITx for SqliteTx<'a> {
Ok(()) Ok(())
} }
fn iter(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { fn iter(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
unimplemented!(); let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
TxValueIterator::make(self, &sql, [])
} }
fn iter_rev(&self, _tree: usize) -> TxOpResult<TxValueIter<'_>> { fn iter_rev(&self, tree: usize) -> TxOpResult<TxValueIter<'_>> {
unimplemented!(); let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
TxValueIterator::make(self, &sql, [])
} }
fn range<'r>( fn range<'r>(
&self, &self,
_tree: usize, tree: usize,
_low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> { ) -> TxOpResult<TxValueIter<'_>> {
unimplemented!(); let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
let params = params
.iter()
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
TxValueIterator::make::<&[&dyn rusqlite::ToSql]>(self, &sql, params.as_ref())
} }
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
_tree: usize, tree: usize,
_low: Bound<&'r [u8]>, low: Bound<&'r [u8]>,
_high: Bound<&'r [u8]>, high: Bound<&'r [u8]>,
) -> TxOpResult<TxValueIter<'_>> { ) -> TxOpResult<TxValueIter<'_>> {
unimplemented!(); let tree = self.get_tree(tree)?;
let (bounds_sql, params) = bounds_sql(low, high);
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
let params = params
.iter()
.map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>();
TxValueIterator::make::<&[&dyn rusqlite::ToSql]>(self, &sql, params.as_ref())
} }
} }
// ---- // ---- iterators outside transactions ----
// complicated, they must hold the Statement and Row objects
// therefore quite some unsafe code (it is a self-referential struct)
struct DbValueIterator<'a> { struct DbValueIterator<'a> {
db: MutexGuard<'a, SqliteDbInner>, db: MutexGuard<'a, SqliteDbInner>,
@ -471,7 +497,78 @@ impl<'a> Iterator for DbValueIteratorPin<'a> {
} }
} }
// ---- // ---- iterators within transactions ----
// it's the same except we don't hold a mutex guard,
// only a Statement and a Rows object
struct TxValueIterator<'a> {
stmt: Statement<'a>,
iter: Option<Rows<'a>>,
_pin: PhantomPinned,
}
impl<'a> TxValueIterator<'a> {
fn make<P: rusqlite::Params>(
tx: &'a SqliteTx<'a>,
sql: &str,
args: P,
) -> TxOpResult<TxValueIter<'a>> {
let stmt = tx.tx.prepare(sql)?;
let res = TxValueIterator {
stmt,
iter: None,
_pin: PhantomPinned,
};
let mut boxed = Box::pin(res);
trace!("make iterator with sql: {}", sql);
unsafe {
let mut stmt = NonNull::from(&boxed.stmt);
let iter = stmt.as_mut().query(args)?;
let mut_ref: Pin<&mut TxValueIterator<'a>> = Pin::as_mut(&mut boxed);
Pin::get_unchecked_mut(mut_ref).iter = Some(iter);
}
Ok(Box::new(TxValueIteratorPin(boxed)))
}
}
impl<'a> Drop for TxValueIterator<'a> {
fn drop(&mut self) {
trace!("drop iter");
drop(self.iter.take());
}
}
struct TxValueIteratorPin<'a>(Pin<Box<TxValueIterator<'a>>>);
impl<'a> Iterator for TxValueIteratorPin<'a> {
type Item = TxOpResult<(Value, Value)>;
fn next(&mut self) -> Option<Self::Item> {
let next = unsafe {
let mut_ref: Pin<&mut TxValueIterator<'a>> = Pin::as_mut(&mut self.0);
Pin::get_unchecked_mut(mut_ref).iter.as_mut()?.next()
};
let row = match next {
Err(e) => return Some(Err(e.into())),
Ok(None) => return None,
Ok(Some(r)) => r,
};
let k = match row.get::<_, Vec<u8>>(0) {
Err(e) => return Some(Err(e.into())),
Ok(x) => x,
};
let v = match row.get::<_, Vec<u8>>(1) {
Err(e) => return Some(Err(e.into())),
Ok(y) => y,
};
Some(Ok((k, v)))
}
}
// ---- utility ----
fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<Vec<u8>>) { fn bounds_sql<'r>(low: Bound<&'r [u8]>, high: Bound<&'r [u8]>) -> (String, Vec<Vec<u8>>) {
let mut sql = String::new(); let mut sql = String::new();

View file

@ -10,8 +10,13 @@ fn test_suite(db: Db) {
let vb: &[u8] = &b"plip"[..]; let vb: &[u8] = &b"plip"[..];
let vc: &[u8] = &b"plup"[..]; let vc: &[u8] = &b"plup"[..];
// ---- test simple insert/delete ----
assert!(tree.insert(ka, va).unwrap().is_none()); assert!(tree.insert(ka, va).unwrap().is_none());
assert_eq!(tree.get(ka).unwrap().unwrap(), va); assert_eq!(tree.get(ka).unwrap().unwrap(), va);
assert_eq!(tree.len().unwrap(), 1);
// ---- test transaction logic ----
let res = db.transaction::<_, (), _>(|tx| { let res = db.transaction::<_, (), _>(|tx| {
assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va); assert_eq!(tx.get(&tree, ka).unwrap().unwrap(), va);
@ -37,6 +42,8 @@ fn test_suite(db: Db) {
assert!(matches!(res, Err(TxError::Abort(42)))); assert!(matches!(res, Err(TxError::Abort(42))));
assert_eq!(tree.get(ka).unwrap().unwrap(), vb); assert_eq!(tree.get(ka).unwrap().unwrap(), vb);
// ---- test iteration outside of transactions ----
let mut iter = tree.iter().unwrap(); let mut iter = tree.iter().unwrap();
let next = iter.next().unwrap().unwrap(); let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
@ -73,6 +80,48 @@ fn test_suite(db: Db) {
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb)); assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
assert!(iter.next().is_none()); assert!(iter.next().is_none());
drop(iter); drop(iter);
// ---- test iteration within transactions ----
db.transaction::<_, (), _>(|tx| {
let mut iter = tx.iter(&tree).unwrap();
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
assert!(iter.next().is_none());
Ok(())
})
.unwrap();
db.transaction::<_, (), _>(|tx| {
let mut iter = tx.range(&tree, kint..).unwrap();
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
assert!(iter.next().is_none());
Ok(())
})
.unwrap();
db.transaction::<_, (), _>(|tx| {
let mut iter = tx.range_rev(&tree, ..kint).unwrap();
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
assert!(iter.next().is_none());
Ok(())
})
.unwrap();
db.transaction::<_, (), _>(|tx| {
let mut iter = tx.iter_rev(&tree).unwrap();
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (kb, vc));
let next = iter.next().unwrap().unwrap();
assert_eq!((next.0.as_ref(), next.1.as_ref()), (ka, vb));
assert!(iter.next().is_none());
Ok(())
})
.unwrap();
} }
#[test] #[test]