diff --git a/Cargo.lock b/Cargo.lock index 73879369..c43aa81f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1017,6 +1017,7 @@ dependencies = [ "clap 3.1.18", "err-derive 0.3.1", "hexdump", + "log", "mktemp", "rusqlite", "sled", diff --git a/src/db/Cargo.toml b/src/db/Cargo.toml index 22abc0b9..36b96229 100644 --- a/src/db/Cargo.toml +++ b/src/db/Cargo.toml @@ -19,6 +19,7 @@ required-features = ["cli"] [dependencies] err-derive = "0.3" hexdump = "0.1" +log = "0.4" sled = "0.34" rusqlite = "0.27" diff --git a/src/db/bin/convert.rs b/src/db/bin/convert.rs index 8c4f0ddc..7525bcc9 100644 --- a/src/db/bin/convert.rs +++ b/src/db/bin/convert.rs @@ -2,7 +2,7 @@ use std::path::PathBuf; use garage_db::*; -use clap::{Parser}; +use clap::Parser; /// K2V command line interface #[derive(Parser, Debug)] @@ -41,12 +41,10 @@ fn do_conversion(args: Args) -> Result<()> { fn open_db(path: PathBuf, engine: String) -> Result { match engine.as_str() { "sled" => { - let db = sled_adapter::sled::Config::default() - .path(&path) - .open()?; + let db = sled_adapter::sled::Config::default().path(&path).open()?; Ok(sled_adapter::SledDb::init(db)) } - "sqlite" | "rusqlite" => { + "sqlite" | "sqlite3" | "rusqlite" => { let db = sqlite_adapter::rusqlite::Connection::open(&path)?; Ok(sqlite_adapter::SqliteDb::init(db)) } diff --git a/src/db/lib.rs b/src/db/lib.rs index 49ec0765..1b31df43 100644 --- a/src/db/lib.rs +++ b/src/db/lib.rs @@ -180,7 +180,13 @@ impl Db { pub fn import(&self, other: &Db) -> Result<()> { let existing_trees = self.list_trees()?; if !existing_trees.is_empty() { - return Err(Error(format!("destination database already contains data: {:?}", existing_trees).into())); + return Err(Error( + format!( + "destination database already contains data: {:?}", + existing_trees + ) + .into(), + )); } let tree_names = other.list_trees()?; diff --git a/src/db/sqlite_adapter.rs b/src/db/sqlite_adapter.rs index 386eb951..49c07562 100644 --- a/src/db/sqlite_adapter.rs +++ b/src/db/sqlite_adapter.rs @@ -5,6 +5,8 @@ use std::pin::Pin; use std::ptr::NonNull; use std::sync::{Arc, Mutex, MutexGuard, RwLock}; +use log::trace; + use rusqlite::{params, Connection, Rows, Statement, Transaction}; use crate::{Db, Error, IDb, ITx, ITxFn, Result, TxError, TxFnResult, TxResult, Value, ValueIter}; @@ -53,13 +55,17 @@ impl SqliteDb { impl IDb for SqliteDb { fn open_tree(&self, name: &str) -> Result { - let name = format!("tree_{}", name.replace(":", "_COLON_")); + let name = format!("tree_{}", name.replace(':', "_COLON_")); let mut trees = self.trees.write().unwrap(); if let Some(i) = trees.iter().position(|x| x == &name) { Ok(i) } else { - self.db.lock().unwrap().execute( + trace!("open tree {}: lock db", name); + let db = self.db.lock().unwrap(); + trace!("create table {}", name); + + db.execute( &format!( "CREATE TABLE IF NOT EXISTS {} ( k BLOB PRIMARY KEY, @@ -69,6 +75,8 @@ impl IDb for SqliteDb { ), [], )?; + trace!("table created: {}", name); + let i = trees.len(); trees.push(name.to_string()); Ok(i) @@ -77,7 +85,11 @@ impl IDb for SqliteDb { fn list_trees(&self) -> Result> { let mut trees = vec![]; + + trace!("list_trees: lock db"); let db = self.db.lock().unwrap(); + trace!("list_trees: lock acquired"); + let mut stmt = db.prepare( "SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'", )?; @@ -94,7 +106,11 @@ impl IDb for SqliteDb { fn get(&self, tree: usize, key: &[u8]) -> Result>> { let tree = self.get_tree(tree)?; + + trace!("get: lock db"); let db = self.db.lock().unwrap(); + trace!("get: lock acquired"); + let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?; let mut res_iter = stmt.query([key])?; match res_iter.next()? { @@ -105,14 +121,22 @@ impl IDb for SqliteDb { fn remove(&self, tree: usize, key: &[u8]) -> Result { let tree = self.get_tree(tree)?; + + trace!("remove: lock db"); let db = self.db.lock().unwrap(); + trace!("remove: lock acquired"); + let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?; Ok(res > 0) } fn len(&self, tree: usize) -> Result { let tree = self.get_tree(tree)?; + + trace!("len: lock db"); let db = self.db.lock().unwrap(); + trace!("len: lock acquired"); + let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?; let mut res_iter = stmt.query([])?; match res_iter.next()? { @@ -123,7 +147,11 @@ impl IDb for SqliteDb { fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> { let tree = self.get_tree(tree)?; + + trace!("insert: lock db"); let db = self.db.lock().unwrap(); + trace!("insert: lock acquired"); + db.execute( &format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree), params![key, value], @@ -134,13 +162,23 @@ impl IDb for SqliteDb { fn iter(&self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree); - DbValueIterator::make(self.db.lock().unwrap(), &sql, []) + + trace!("iter {}: lock db", tree); + let db = self.db.lock().unwrap(); + trace!("iter {}: lock acquired", tree); + + DbValueIterator::make(db, &sql, []) } fn iter_rev(&self, tree: usize) -> Result> { let tree = self.get_tree(tree)?; let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree); - DbValueIterator::make(self.db.lock().unwrap(), &sql, []) + + trace!("iter_rev {}: lock db", tree); + let db = self.db.lock().unwrap(); + trace!("iter_rev {}: lock acquired", tree); + + DbValueIterator::make(db, &sql, []) } fn range<'r>( @@ -158,11 +196,12 @@ impl IDb for SqliteDb { .iter() .map(|x| x as &dyn rusqlite::ToSql) .collect::>(); - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>( - self.db.lock().unwrap(), - &sql, - params.as_ref(), - ) + + trace!("range {}: lock db", tree); + let db = self.db.lock().unwrap(); + trace!("range {}: lock acquired", tree); + + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref()) } fn range_rev<'r>( &self, @@ -179,23 +218,28 @@ impl IDb for SqliteDb { .iter() .map(|x| x as &dyn rusqlite::ToSql) .collect::>(); - DbValueIterator::make::<&[&dyn rusqlite::ToSql]>( - self.db.lock().unwrap(), - &sql, - params.as_ref(), - ) + + trace!("range_rev {}: lock db", tree); + let db = self.db.lock().unwrap(); + trace!("range_rev {}: lock acquired", tree); + + DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref()) } // ---- fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> { let trees = self.trees.read().unwrap(); + + trace!("transaction: lock db"); let mut db = self.db.lock().unwrap(); + trace!("transaction: lock acquired"); + let tx = SqliteTx { tx: db.transaction()?, trees: trees.as_ref(), }; - match f.try_on(&tx) { + let res = match f.try_on(&tx) { TxFnResult::Ok => { tx.tx.commit()?; Ok(()) @@ -210,7 +254,10 @@ impl IDb for SqliteDb { "(this message will be discarded)".into(), ))) } - } + }; + + trace!("transaction done"); + res } } @@ -337,6 +384,7 @@ impl<'a> DbValueIterator<'a> { impl<'a> Drop for DbValueIterator<'a> { fn drop(&mut self) { + trace!("drop iter"); drop(self.iter.take()); drop(self.stmt.take()); } diff --git a/src/garage/server.rs b/src/garage/server.rs index b102067e..bd34456d 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -32,15 +32,32 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { let config = read_config(config_file).expect("Unable to read config file"); info!("Opening database..."); - let mut db_path = config.metadata_dir.clone(); - db_path.push("db"); - let db = db::sled_adapter::sled::Config::default() - .path(&db_path) - .cache_capacity(config.sled_cache_capacity) - .flush_every_ms(Some(config.sled_flush_every_ms)) - .open() - .expect("Unable to open sled DB"); - let db = db::sled_adapter::SledDb::init(db); + let db = match config.db_engine.as_str() { + "sled" => { + let mut db_path = config.metadata_dir.clone(); + db_path.push("db"); + let db = db::sled_adapter::sled::Config::default() + .path(&db_path) + .cache_capacity(config.sled_cache_capacity) + .flush_every_ms(Some(config.sled_flush_every_ms)) + .open() + .expect("Unable to open sled DB"); + db::sled_adapter::SledDb::init(db) + } + "sqlite" => { + let mut db_path = config.metadata_dir.clone(); + db_path.push("db.sqlite"); + let db = db::sqlite_adapter::rusqlite::Connection::open(db_path) + .expect("Unable to open sqlite DB"); + db::sqlite_adapter::SqliteDb::init(db) + } + e => { + return Err(Error::Message(format!( + "Unsupported DB engine: {} (options: sled, sqlite)", + e + ))); + } + }; info!("Initializing background runner..."); let watch_cancel = netapp::util::watch_ctrl_c(); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index f7dca97b..48d2c5dd 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -110,9 +110,14 @@ where } fn updater_loop_iter(&self) -> Result { - if let Some(x) = self.data.merkle_todo.iter()?.next() { + // TODO undo this iter hack + let mut iter = self.data.merkle_todo.iter()?; + if let Some(x) = iter.next() { let (key, valhash) = x?; - self.update_item(&key[..], &valhash[..])?; + let key = key.to_vec(); + let valhash = valhash.to_vec(); + drop(iter); + self.update_item(&key, &valhash)?; Ok(true) } else { Ok(false) diff --git a/src/util/config.rs b/src/util/config.rs index 99ebce31..3b37adbb 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -64,14 +64,19 @@ pub struct Config { #[serde(default)] pub kubernetes_skip_crd: bool, + // -- DB + /// Database engine to use for metadata (options: sled, sqlite) + #[serde(default = "default_db_engine")] + pub db_engine: String, + /// Sled cache size, in bytes #[serde(default = "default_sled_cache_capacity")] pub sled_cache_capacity: u64, - /// Sled flush interval in milliseconds #[serde(default = "default_sled_flush_every_ms")] pub sled_flush_every_ms: u64, + // -- APIs /// Configuration for S3 api pub s3_api: S3ApiConfig, @@ -129,6 +134,10 @@ pub struct AdminConfig { pub trace_sink: Option, } +fn default_db_engine() -> String { + "sled".into() +} + fn default_sled_cache_capacity() -> u64 { 128 * 1024 * 1024 }