Abstract database behind generic interface and implement alternative drivers #322

Merged
lx merged 64 commits from db-abstraction into main 2022-06-08 08:01:56 +00:00
8 changed files with 119 additions and 34 deletions
Showing only changes of commit bd9ff432d7 - Show all commits

1
Cargo.lock generated
View file

@ -1017,6 +1017,7 @@ dependencies = [
"clap 3.1.18", "clap 3.1.18",
"err-derive 0.3.1", "err-derive 0.3.1",
"hexdump", "hexdump",
"log",
"mktemp", "mktemp",
"rusqlite", "rusqlite",
"sled", "sled",

View file

@ -19,6 +19,7 @@ required-features = ["cli"]
[dependencies] [dependencies]
err-derive = "0.3" err-derive = "0.3"
hexdump = "0.1" hexdump = "0.1"
log = "0.4"
sled = "0.34" sled = "0.34"
rusqlite = "0.27" rusqlite = "0.27"

View file

@ -2,7 +2,7 @@ use std::path::PathBuf;
use garage_db::*; use garage_db::*;
use clap::{Parser}; use clap::Parser;
/// K2V command line interface /// K2V command line interface
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
@ -41,12 +41,10 @@ fn do_conversion(args: Args) -> Result<()> {
fn open_db(path: PathBuf, engine: String) -> Result<Db> { fn open_db(path: PathBuf, engine: String) -> Result<Db> {
match engine.as_str() { match engine.as_str() {
"sled" => { "sled" => {
let db = sled_adapter::sled::Config::default() let db = sled_adapter::sled::Config::default().path(&path).open()?;
.path(&path)
.open()?;
Ok(sled_adapter::SledDb::init(db)) Ok(sled_adapter::SledDb::init(db))
} }
"sqlite" | "rusqlite" => { "sqlite" | "sqlite3" | "rusqlite" => {
let db = sqlite_adapter::rusqlite::Connection::open(&path)?; let db = sqlite_adapter::rusqlite::Connection::open(&path)?;
Ok(sqlite_adapter::SqliteDb::init(db)) Ok(sqlite_adapter::SqliteDb::init(db))
} }

View file

@ -180,7 +180,13 @@ impl Db {
pub fn import(&self, other: &Db) -> Result<()> { pub fn import(&self, other: &Db) -> Result<()> {
let existing_trees = self.list_trees()?; let existing_trees = self.list_trees()?;
if !existing_trees.is_empty() { if !existing_trees.is_empty() {
return Err(Error(format!("destination database already contains data: {:?}", existing_trees).into())); return Err(Error(
format!(
"destination database already contains data: {:?}",
existing_trees
)
.into(),
));
} }
let tree_names = other.list_trees()?; let tree_names = other.list_trees()?;

View file

@ -5,6 +5,8 @@ use std::pin::Pin;
use std::ptr::NonNull; use std::ptr::NonNull;
use std::sync::{Arc, Mutex, MutexGuard, RwLock}; use std::sync::{Arc, Mutex, MutexGuard, RwLock};
use log::trace;
use rusqlite::{params, Connection, Rows, Statement, Transaction}; use rusqlite::{params, Connection, Rows, Statement, Transaction};
use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter};
@ -53,13 +55,17 @@ impl SqliteDb {
impl IDb for SqliteDb { impl IDb for SqliteDb {
fn open_tree(&self, name: &str) -> Result<usize> { fn open_tree(&self, name: &str) -> Result<usize> {
let name = format!("tree_{}", name.replace(":", "_COLON_")); let name = format!("tree_{}", name.replace(':', "_COLON_"));
let mut trees = self.trees.write().unwrap(); let mut trees = self.trees.write().unwrap();
if let Some(i) = trees.iter().position(|x| x == &name) { if let Some(i) = trees.iter().position(|x| x == &name) {
Ok(i) Ok(i)
} else { } else {
self.db.lock().unwrap().execute( trace!("open tree {}: lock db", name);
let db = self.db.lock().unwrap();
trace!("create table {}", name);
db.execute(
&format!( &format!(
"CREATE TABLE IF NOT EXISTS {} ( "CREATE TABLE IF NOT EXISTS {} (
k BLOB PRIMARY KEY, k BLOB PRIMARY KEY,
@ -69,6 +75,8 @@ impl IDb for SqliteDb {
), ),
[], [],
)?; )?;
trace!("table created: {}", name);
let i = trees.len(); let i = trees.len();
trees.push(name.to_string()); trees.push(name.to_string());
Ok(i) Ok(i)
@ -77,7 +85,11 @@ impl IDb for SqliteDb {
fn list_trees(&self) -> Result<Vec<String>> { fn list_trees(&self) -> Result<Vec<String>> {
let mut trees = vec![]; let mut trees = vec![];
trace!("list_trees: lock db");
let db = self.db.lock().unwrap(); let db = self.db.lock().unwrap();
trace!("list_trees: lock acquired");
let mut stmt = db.prepare( let mut stmt = db.prepare(
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'", "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
)?; )?;
@ -94,7 +106,11 @@ impl IDb for SqliteDb {
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> { fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
trace!("get: lock db");
let db = self.db.lock().unwrap(); let db = self.db.lock().unwrap();
trace!("get: lock acquired");
let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
let mut res_iter = stmt.query([key])?; let mut res_iter = stmt.query([key])?;
match res_iter.next()? { match res_iter.next()? {
@ -105,14 +121,22 @@ impl IDb for SqliteDb {
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> { fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
trace!("remove: lock db");
let db = self.db.lock().unwrap(); let db = self.db.lock().unwrap();
trace!("remove: lock acquired");
let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
Ok(res > 0) Ok(res > 0)
} }
fn len(&self, tree: usize) -> Result<usize> { fn len(&self, tree: usize) -> Result<usize> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
trace!("len: lock db");
let db = self.db.lock().unwrap(); let db = self.db.lock().unwrap();
trace!("len: lock acquired");
let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
let mut res_iter = stmt.query([])?; let mut res_iter = stmt.query([])?;
match res_iter.next()? { match res_iter.next()? {
@ -123,7 +147,11 @@ impl IDb for SqliteDb {
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
trace!("insert: lock db");
let db = self.db.lock().unwrap(); let db = self.db.lock().unwrap();
trace!("insert: lock acquired");
db.execute( db.execute(
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
params![key, value], params![key, value],
@ -134,13 +162,23 @@ impl IDb for SqliteDb {
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> { fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
DbValueIterator::make(self.db.lock().unwrap(), &sql, [])
trace!("iter {}: lock db", tree);
let db = self.db.lock().unwrap();
trace!("iter {}: lock acquired", tree);
DbValueIterator::make(db, &sql, [])
} }
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> { fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
let tree = self.get_tree(tree)?; let tree = self.get_tree(tree)?;
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
DbValueIterator::make(self.db.lock().unwrap(), &sql, [])
trace!("iter_rev {}: lock db", tree);
let db = self.db.lock().unwrap();
trace!("iter_rev {}: lock acquired", tree);
DbValueIterator::make(db, &sql, [])
} }
fn range<'r>( fn range<'r>(
@ -158,11 +196,12 @@ impl IDb for SqliteDb {
.iter() .iter()
.map(|x| x as &dyn rusqlite::ToSql) .map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(
self.db.lock().unwrap(), trace!("range {}: lock db", tree);
&sql, let db = self.db.lock().unwrap();
params.as_ref(), trace!("range {}: lock acquired", tree);
)
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
} }
fn range_rev<'r>( fn range_rev<'r>(
&self, &self,
@ -179,23 +218,28 @@ impl IDb for SqliteDb {
.iter() .iter()
.map(|x| x as &dyn rusqlite::ToSql) .map(|x| x as &dyn rusqlite::ToSql)
.collect::<Vec<_>>(); .collect::<Vec<_>>();
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(
self.db.lock().unwrap(), trace!("range_rev {}: lock db", tree);
&sql, let db = self.db.lock().unwrap();
params.as_ref(), trace!("range_rev {}: lock acquired", tree);
)
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
} }
// ---- // ----
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
let trees = self.trees.read().unwrap(); let trees = self.trees.read().unwrap();
trace!("transaction: lock db");
let mut db = self.db.lock().unwrap(); let mut db = self.db.lock().unwrap();
trace!("transaction: lock acquired");
let tx = SqliteTx { let tx = SqliteTx {
tx: db.transaction()?, tx: db.transaction()?,
trees: trees.as_ref(), trees: trees.as_ref(),
}; };
match f.try_on(&tx) { let res = match f.try_on(&tx) {
TxFnResult::Ok => { TxFnResult::Ok => {
tx.tx.commit()?; tx.tx.commit()?;
Ok(()) Ok(())
@ -210,7 +254,10 @@ impl IDb for SqliteDb {
"(this message will be discarded)".into(), "(this message will be discarded)".into(),
))) )))
} }
} };
trace!("transaction done");
res
} }
} }
@ -337,6 +384,7 @@ impl<'a> DbValueIterator<'a> {
impl<'a> Drop for DbValueIterator<'a> { impl<'a> Drop for DbValueIterator<'a> {
fn drop(&mut self) { fn drop(&mut self) {
trace!("drop iter");
drop(self.iter.take()); drop(self.iter.take());
drop(self.stmt.take()); drop(self.stmt.take());
} }

View file

@ -32,15 +32,32 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let config = read_config(config_file).expect("Unable to read config file"); let config = read_config(config_file).expect("Unable to read config file");
info!("Opening database..."); info!("Opening database...");
let mut db_path = config.metadata_dir.clone(); let db = match config.db_engine.as_str() {
db_path.push("db"); "sled" => {
let db = db::sled_adapter::sled::Config::default() let mut db_path = config.metadata_dir.clone();
.path(&db_path) db_path.push("db");
.cache_capacity(config.sled_cache_capacity) let db = db::sled_adapter::sled::Config::default()
.flush_every_ms(Some(config.sled_flush_every_ms)) .path(&db_path)
.open() .cache_capacity(config.sled_cache_capacity)
.expect("Unable to open sled DB"); .flush_every_ms(Some(config.sled_flush_every_ms))
let db = db::sled_adapter::SledDb::init(db); .open()
.expect("Unable to open sled DB");
db::sled_adapter::SledDb::init(db)
}
"sqlite" => {
let mut db_path = config.metadata_dir.clone();
db_path.push("db.sqlite");
let db = db::sqlite_adapter::rusqlite::Connection::open(db_path)
.expect("Unable to open sqlite DB");
db::sqlite_adapter::SqliteDb::init(db)
}
e => {
return Err(Error::Message(format!(
"Unsupported DB engine: {} (options: sled, sqlite)",
e
)));
}
};
info!("Initializing background runner..."); info!("Initializing background runner...");
let watch_cancel = netapp::util::watch_ctrl_c(); let watch_cancel = netapp::util::watch_ctrl_c();

View file

@ -110,9 +110,14 @@ where
} }
fn updater_loop_iter(&self) -> Result<bool, Error> { fn updater_loop_iter(&self) -> Result<bool, Error> {
if let Some(x) = self.data.merkle_todo.iter()?.next() { // TODO undo this iter hack
let mut iter = self.data.merkle_todo.iter()?;
if let Some(x) = iter.next() {
let (key, valhash) = x?; let (key, valhash) = x?;
self.update_item(&key[..], &valhash[..])?; let key = key.to_vec();
let valhash = valhash.to_vec();
drop(iter);
self.update_item(&key, &valhash)?;
Ok(true) Ok(true)
} else { } else {
Ok(false) Ok(false)

View file

@ -64,14 +64,19 @@ pub struct Config {
#[serde(default)] #[serde(default)]
pub kubernetes_skip_crd: bool, pub kubernetes_skip_crd: bool,
// -- DB
/// Database engine to use for metadata (options: sled, sqlite)
#[serde(default = "default_db_engine")]
pub db_engine: String,
/// Sled cache size, in bytes /// Sled cache size, in bytes
#[serde(default = "default_sled_cache_capacity")] #[serde(default = "default_sled_cache_capacity")]
pub sled_cache_capacity: u64, pub sled_cache_capacity: u64,
/// Sled flush interval in milliseconds /// Sled flush interval in milliseconds
#[serde(default = "default_sled_flush_every_ms")] #[serde(default = "default_sled_flush_every_ms")]
pub sled_flush_every_ms: u64, pub sled_flush_every_ms: u64,
// -- APIs
/// Configuration for S3 api /// Configuration for S3 api
pub s3_api: S3ApiConfig, pub s3_api: S3ApiConfig,
@ -129,6 +134,10 @@ pub struct AdminConfig {
pub trace_sink: Option<String>, pub trace_sink: Option<String>,
} }
fn default_db_engine() -> String {
"sled".into()
}
fn default_sled_cache_capacity() -> u64 { fn default_sled_cache_capacity() -> u64 {
128 * 1024 * 1024 128 * 1024 * 1024
} }