diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 0a278bc0..a7a99642 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -15,6 +15,9 @@ path = "lib.rs" err-derive.workspace = true hexdump.workspace = true tracing.workspace = true +opentelemetry.workspace = true +opentelemetry.workspace = true +xxhash-rust.workspace = true heed = { workspace = true, optional = true } rusqlite = { workspace = true, optional = true, features = ["backup"] } diff --git a/src/db/lmdb_adapter.rs b/src/db/lmdb_adapter.rs index de4c3910..e12bd012 100644 --- a/src/db/lmdb_adapter.rs +++ b/src/db/lmdb_adapter.rs @@ -10,6 +10,8 @@ use std::sync::{Arc, RwLock}; use heed::types::ByteSlice; use heed::{BytesDecode, Env, RoTxn, RwTxn, UntypedDatabase as Database}; +use xxhash_rust::xxh3::xxh3_128; + use crate::{ Db, Error, IDb, ITx, ITxFn, OnCommit, Result, TxError, TxFnResult, TxOpError, TxOpResult, TxResult, TxValueIter, Value, ValueIter, @@ -58,6 +60,40 @@ impl LmdbDb { } } +fn key_hash(key: &[u8]) -> [u8; 16] { + xxh3_128(key).to_ne_bytes() +} + +fn kv_to_value(key: &[u8], value: &[u8]) -> Vec { + [&key.len().to_ne_bytes(), key, value].concat() +} + +fn value_to_kv(value: &[u8]) -> (Vec, Vec) { + const USIZE_LEN: usize = std::mem::size_of::(); + let klen = usize::from_ne_bytes(value[0..USIZE_LEN].try_into().unwrap()); + ( + value[USIZE_LEN..klen+USIZE_LEN].to_vec(), + value[USIZE_LEN+klen..].to_vec() + ) +} + +fn key_hash(key: &[u8]) -> [u8; 16] { + xxh3_128(key).to_ne_bytes() +} + +fn kv_to_value(key: &[u8], value: &[u8]) -> Vec { + [&key.len().to_ne_bytes(), key, value].concat() +} + +fn value_to_kv(value: &[u8]) -> (Vec, Vec) { + const USIZE_LEN: usize = std::mem::size_of::(); + let klen = usize::from_ne_bytes(value[0..USIZE_LEN].try_into().unwrap()); + ( + value[USIZE_LEN..klen+USIZE_LEN].to_vec(), + value[USIZE_LEN+klen..].to_vec() + ) +} + impl IDb for LmdbDb { fn engine(&self) -> String { "LMDB (using Heed crate)".into() @@ -119,10 +155,11 @@ impl IDb for LmdbDb { let tree = self.get_tree(tree)?; let tx = self.db.read_txn()?; - let val = tree.get(&tx, key)?; + let kh = key_hash(key); + let val = tree.get(&tx, &kh)?; match val { None => Ok(None), - Some(v) => Ok(Some(v.to_vec())), + Some(v) => Ok(Some(value_to_kv(v).1)) } } @@ -135,7 +172,9 @@ impl IDb for LmdbDb { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; let mut tx = self.db.write_txn()?; - tree.put(&mut tx, key, value)?; + let kh = key_hash(key); + let value = kv_to_value(key, value); + tree.put(&mut tx, &kh, &value)?; tx.commit()?; Ok(()) } @@ -143,7 +182,8 @@ impl IDb for LmdbDb { fn remove(&self, tree: usize, key: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; let mut tx = self.db.write_txn()?; - tree.delete(&mut tx, key)?; + let kh = key_hash(key); + tree.delete(&mut tx, &kh)?; tx.commit()?; Ok(()) } @@ -242,8 +282,9 @@ impl<'a> LmdbTx<'a> { impl<'a> ITx for LmdbTx<'a> { fn get(&self, tree: usize, key: &[u8]) -> TxOpResult> { let tree = self.get_tree(tree)?; - match tree.get(&self.tx, key)? { - Some(v) => Ok(Some(v.to_vec())), + let kh = key_hash(key); + match tree.get(&self.tx, &kh)? { + Some(v) => Ok(Some(value_to_kv(v).1)), None => Ok(None), } } @@ -254,14 +295,18 @@ impl<'a> ITx for LmdbTx<'a> { fn insert(&mut self, tree: usize, key: &[u8], value: &[u8]) -> TxOpResult<()> { let tree = *self.get_tree(tree)?; - tree.put(&mut self.tx, key, value)?; + let kh = key_hash(key); + let value = kv_to_value(key, value); + tree.put(&mut self.tx, &kh, &value)?; Ok(()) } fn remove(&mut self, tree: usize, key: &[u8]) -> TxOpResult<()> { let tree = *self.get_tree(tree)?; - tree.delete(&mut self.tx, key)?; + let kh = key_hash(key); + tree.delete(&mut self.tx, &kh)?; Ok(()) } + fn clear(&mut self, tree: usize) -> TxOpResult<()> { let tree = *self.get_tree(tree)?; tree.clear(&mut self.tx)?; @@ -370,7 +415,7 @@ where match next { None => None, Some(Err(e)) => Some(Err(e.into())), - Some(Ok((k, v))) => Some(Ok((k.to_vec(), v.to_vec()))), + Some(Ok((_k, v))) => Some(Ok(value_to_kv(v))), } } } @@ -380,7 +425,7 @@ where fn tx_iter_item<'a>( item: std::result::Result<(&'a [u8], &'a [u8]), heed::Error>, ) -> TxOpResult<(Vec, Vec)> { - item.map(|(k, v)| (k.to_vec(), v.to_vec())) + item.map(|(_k, v)| value_to_kv(v)) .map_err(|e| TxOpError(Error::from(e))) }