Fix more sqlite deadlocks
This commit is contained in:
parent
4f5d17d464
commit
4539a6c229
2 changed files with 79 additions and 74 deletions
|
@ -1,9 +1,10 @@
|
||||||
use core::ops::Bound;
|
use core::ops::Bound;
|
||||||
|
|
||||||
|
use std::borrow::BorrowMut;
|
||||||
use std::marker::PhantomPinned;
|
use std::marker::PhantomPinned;
|
||||||
use std::pin::Pin;
|
use std::pin::Pin;
|
||||||
use std::ptr::NonNull;
|
use std::ptr::NonNull;
|
||||||
use std::sync::{Arc, Mutex, MutexGuard, RwLock};
|
use std::sync::{Arc, Mutex, MutexGuard};
|
||||||
|
|
||||||
use log::trace;
|
use log::trace;
|
||||||
|
|
||||||
|
@ -29,24 +30,26 @@ impl<T> From<rusqlite::Error> for TxError<T> {
|
||||||
|
|
||||||
// -- db
|
// -- db
|
||||||
|
|
||||||
pub struct SqliteDb {
|
pub struct SqliteDb(Mutex<SqliteDbInner>);
|
||||||
db: Mutex<Connection>,
|
|
||||||
trees: RwLock<Vec<String>>,
|
struct SqliteDbInner {
|
||||||
|
db: Connection,
|
||||||
|
trees: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl SqliteDb {
|
impl SqliteDb {
|
||||||
pub fn init(db: rusqlite::Connection) -> Db {
|
pub fn init(db: rusqlite::Connection) -> Db {
|
||||||
let s = Self {
|
let s = Self(Mutex::new(SqliteDbInner {
|
||||||
db: Mutex::new(db),
|
db,
|
||||||
trees: RwLock::new(Vec::new()),
|
trees: Vec::new(),
|
||||||
};
|
}));
|
||||||
Db(Arc::new(s))
|
Db(Arc::new(s))
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SqliteDbInner {
|
||||||
fn get_tree(&self, i: usize) -> Result<String> {
|
fn get_tree(&self, i: usize) -> Result<String> {
|
||||||
self.trees
|
self.trees
|
||||||
.read()
|
|
||||||
.unwrap()
|
|
||||||
.get(i)
|
.get(i)
|
||||||
.cloned()
|
.cloned()
|
||||||
.ok_or_else(|| Error("invalid tree id".into()))
|
.ok_or_else(|| Error("invalid tree id".into()))
|
||||||
|
@ -56,16 +59,13 @@ impl SqliteDb {
|
||||||
impl IDb for SqliteDb {
|
impl IDb for SqliteDb {
|
||||||
fn open_tree(&self, name: &str) -> Result<usize> {
|
fn open_tree(&self, name: &str) -> Result<usize> {
|
||||||
let name = format!("tree_{}", name.replace(':', "_COLON_"));
|
let name = format!("tree_{}", name.replace(':', "_COLON_"));
|
||||||
|
let mut this = self.0.lock().unwrap();
|
||||||
|
|
||||||
let mut trees = self.trees.write().unwrap();
|
if let Some(i) = this.trees.iter().position(|x| x == &name) {
|
||||||
if let Some(i) = trees.iter().position(|x| x == &name) {
|
|
||||||
Ok(i)
|
Ok(i)
|
||||||
} else {
|
} else {
|
||||||
trace!("open tree {}: lock db", name);
|
|
||||||
let db = self.db.lock().unwrap();
|
|
||||||
trace!("create table {}", name);
|
trace!("create table {}", name);
|
||||||
|
this.db.execute(
|
||||||
db.execute(
|
|
||||||
&format!(
|
&format!(
|
||||||
"CREATE TABLE IF NOT EXISTS {} (
|
"CREATE TABLE IF NOT EXISTS {} (
|
||||||
k BLOB PRIMARY KEY,
|
k BLOB PRIMARY KEY,
|
||||||
|
@ -75,10 +75,10 @@ impl IDb for SqliteDb {
|
||||||
),
|
),
|
||||||
[],
|
[],
|
||||||
)?;
|
)?;
|
||||||
trace!("table created: {}", name);
|
trace!("table created: {}, unlocking", name);
|
||||||
|
|
||||||
let i = trees.len();
|
let i = this.trees.len();
|
||||||
trees.push(name.to_string());
|
this.trees.push(name.to_string());
|
||||||
Ok(i)
|
Ok(i)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,10 +87,10 @@ impl IDb for SqliteDb {
|
||||||
let mut trees = vec![];
|
let mut trees = vec![];
|
||||||
|
|
||||||
trace!("list_trees: lock db");
|
trace!("list_trees: lock db");
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("list_trees: lock acquired");
|
trace!("list_trees: lock acquired");
|
||||||
|
|
||||||
let mut stmt = db.prepare(
|
let mut stmt = this.db.prepare(
|
||||||
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
|
"SELECT name FROM sqlite_schema WHERE type = 'table' AND name LIKE 'tree_%'",
|
||||||
)?;
|
)?;
|
||||||
let mut rows = stmt.query([])?;
|
let mut rows = stmt.query([])?;
|
||||||
|
@ -106,13 +106,15 @@ impl IDb for SqliteDb {
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
|
fn get(&self, tree: usize, key: &[u8]) -> Result<Option<Value<'_>>> {
|
||||||
let tree = self.get_tree(tree)?;
|
|
||||||
|
|
||||||
trace!("get {}: lock db", tree);
|
trace!("get {}: lock db", tree);
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("get {}: lock acquired", tree);
|
trace!("get {}: lock acquired", tree);
|
||||||
|
|
||||||
let mut stmt = db.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
|
let tree = this.get_tree(tree)?;
|
||||||
|
|
||||||
|
let mut stmt = this
|
||||||
|
.db
|
||||||
|
.prepare(&format!("SELECT v FROM {} WHERE k = ?1", tree))?;
|
||||||
let mut res_iter = stmt.query([key])?;
|
let mut res_iter = stmt.query([key])?;
|
||||||
match res_iter.next()? {
|
match res_iter.next()? {
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
|
@ -121,24 +123,24 @@ impl IDb for SqliteDb {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
|
fn remove(&self, tree: usize, key: &[u8]) -> Result<bool> {
|
||||||
let tree = self.get_tree(tree)?;
|
|
||||||
|
|
||||||
trace!("remove {}: lock db", tree);
|
trace!("remove {}: lock db", tree);
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("remove {}: lock acquired", tree);
|
trace!("remove {}: lock acquired", tree);
|
||||||
|
|
||||||
let res = db.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
|
let tree = this.get_tree(tree)?;
|
||||||
|
let res = this
|
||||||
|
.db
|
||||||
|
.execute(&format!("DELETE FROM {} WHERE k = ?1", tree), params![key])?;
|
||||||
Ok(res > 0)
|
Ok(res > 0)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn len(&self, tree: usize) -> Result<usize> {
|
fn len(&self, tree: usize) -> Result<usize> {
|
||||||
let tree = self.get_tree(tree)?;
|
|
||||||
|
|
||||||
trace!("len {}: lock db", tree);
|
trace!("len {}: lock db", tree);
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("len {}: lock acquired", tree);
|
trace!("len {}: lock acquired", tree);
|
||||||
|
|
||||||
let mut stmt = db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
|
let tree = this.get_tree(tree)?;
|
||||||
|
let mut stmt = this.db.prepare(&format!("SELECT COUNT(*) FROM {}", tree))?;
|
||||||
let mut res_iter = stmt.query([])?;
|
let mut res_iter = stmt.query([])?;
|
||||||
match res_iter.next()? {
|
match res_iter.next()? {
|
||||||
None => Ok(0),
|
None => Ok(0),
|
||||||
|
@ -147,13 +149,12 @@ impl IDb for SqliteDb {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
|
fn insert(&self, tree: usize, key: &[u8], value: &[u8]) -> Result<()> {
|
||||||
let tree = self.get_tree(tree)?;
|
|
||||||
|
|
||||||
trace!("insert {}: lock db", tree);
|
trace!("insert {}: lock db", tree);
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("insert {}: lock acquired", tree);
|
trace!("insert {}: lock acquired", tree);
|
||||||
|
|
||||||
db.execute(
|
let tree = this.get_tree(tree)?;
|
||||||
|
this.db.execute(
|
||||||
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
|
&format!("INSERT OR REPLACE INTO {} (k, v) VALUES (?1, ?2)", tree),
|
||||||
params![key, value],
|
params![key, value],
|
||||||
)?;
|
)?;
|
||||||
|
@ -161,25 +162,23 @@ impl IDb for SqliteDb {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
|
fn iter(&self, tree: usize) -> Result<ValueIter<'_>> {
|
||||||
let tree = self.get_tree(tree)?;
|
|
||||||
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
|
|
||||||
|
|
||||||
trace!("iter {}: lock db", tree);
|
trace!("iter {}: lock db", tree);
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("iter {}: lock acquired", tree);
|
trace!("iter {}: lock acquired", tree);
|
||||||
|
|
||||||
DbValueIterator::make(db, &sql, [])
|
let tree = this.get_tree(tree)?;
|
||||||
|
let sql = format!("SELECT k, v FROM {} ORDER BY k ASC", tree);
|
||||||
|
DbValueIterator::make(this, &sql, [])
|
||||||
}
|
}
|
||||||
|
|
||||||
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
|
fn iter_rev(&self, tree: usize) -> Result<ValueIter<'_>> {
|
||||||
let tree = self.get_tree(tree)?;
|
|
||||||
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
|
|
||||||
|
|
||||||
trace!("iter_rev {}: lock db", tree);
|
trace!("iter_rev {}: lock db", tree);
|
||||||
let db = self.db.lock().unwrap();
|
let this = self.0.lock().unwrap();
|
||||||
trace!("iter_rev {}: lock acquired", tree);
|
trace!("iter_rev {}: lock acquired", tree);
|
||||||
|
|
||||||
DbValueIterator::make(db, &sql, [])
|
let tree = this.get_tree(tree)?;
|
||||||
|
let sql = format!("SELECT k, v FROM {} ORDER BY k DESC", tree);
|
||||||
|
DbValueIterator::make(this, &sql, [])
|
||||||
}
|
}
|
||||||
|
|
||||||
fn range<'r>(
|
fn range<'r>(
|
||||||
|
@ -188,7 +187,11 @@ impl IDb for SqliteDb {
|
||||||
low: Bound<&'r [u8]>,
|
low: Bound<&'r [u8]>,
|
||||||
high: Bound<&'r [u8]>,
|
high: Bound<&'r [u8]>,
|
||||||
) -> Result<ValueIter<'_>> {
|
) -> Result<ValueIter<'_>> {
|
||||||
let tree = self.get_tree(tree)?;
|
trace!("range {}: lock db", tree);
|
||||||
|
let this = self.0.lock().unwrap();
|
||||||
|
trace!("range {}: lock acquired", tree);
|
||||||
|
|
||||||
|
let tree = this.get_tree(tree)?;
|
||||||
|
|
||||||
let (bounds_sql, params) = bounds_sql(low, high);
|
let (bounds_sql, params) = bounds_sql(low, high);
|
||||||
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
|
let sql = format!("SELECT k, v FROM {} {} ORDER BY k ASC", tree, bounds_sql);
|
||||||
|
@ -198,11 +201,7 @@ impl IDb for SqliteDb {
|
||||||
.map(|x| x as &dyn rusqlite::ToSql)
|
.map(|x| x as &dyn rusqlite::ToSql)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
trace!("range {}: lock db", tree);
|
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
|
||||||
let db = self.db.lock().unwrap();
|
|
||||||
trace!("range {}: lock acquired", tree);
|
|
||||||
|
|
||||||
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
|
|
||||||
}
|
}
|
||||||
fn range_rev<'r>(
|
fn range_rev<'r>(
|
||||||
&self,
|
&self,
|
||||||
|
@ -210,7 +209,11 @@ impl IDb for SqliteDb {
|
||||||
low: Bound<&'r [u8]>,
|
low: Bound<&'r [u8]>,
|
||||||
high: Bound<&'r [u8]>,
|
high: Bound<&'r [u8]>,
|
||||||
) -> Result<ValueIter<'_>> {
|
) -> Result<ValueIter<'_>> {
|
||||||
let tree = self.get_tree(tree)?;
|
trace!("range_rev {}: lock db", tree);
|
||||||
|
let this = self.0.lock().unwrap();
|
||||||
|
trace!("range_rev {}: lock acquired", tree);
|
||||||
|
|
||||||
|
let tree = this.get_tree(tree)?;
|
||||||
|
|
||||||
let (bounds_sql, params) = bounds_sql(low, high);
|
let (bounds_sql, params) = bounds_sql(low, high);
|
||||||
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
|
let sql = format!("SELECT k, v FROM {} {} ORDER BY k DESC", tree, bounds_sql);
|
||||||
|
@ -220,25 +223,21 @@ impl IDb for SqliteDb {
|
||||||
.map(|x| x as &dyn rusqlite::ToSql)
|
.map(|x| x as &dyn rusqlite::ToSql)
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
|
|
||||||
trace!("range_rev {}: lock db", tree);
|
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(this, &sql, params.as_ref())
|
||||||
let db = self.db.lock().unwrap();
|
|
||||||
trace!("range_rev {}: lock acquired", tree);
|
|
||||||
|
|
||||||
DbValueIterator::make::<&[&dyn rusqlite::ToSql]>(db, &sql, params.as_ref())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
fn transaction(&self, f: &dyn ITxFn) -> TxResult<(), ()> {
|
||||||
let trees = self.trees.read().unwrap();
|
|
||||||
|
|
||||||
trace!("transaction: lock db");
|
trace!("transaction: lock db");
|
||||||
let mut db = self.db.lock().unwrap();
|
let mut this = self.0.lock().unwrap();
|
||||||
trace!("transaction: lock acquired");
|
trace!("transaction: lock acquired");
|
||||||
|
|
||||||
|
let this_mut_ref: &mut SqliteDbInner = this.borrow_mut();
|
||||||
|
|
||||||
let mut tx = SqliteTx {
|
let mut tx = SqliteTx {
|
||||||
tx: db.transaction()?,
|
tx: this_mut_ref.db.transaction()?,
|
||||||
trees: trees.as_ref(),
|
trees: &this_mut_ref.trees,
|
||||||
};
|
};
|
||||||
let res = match f.try_on(&mut tx) {
|
let res = match f.try_on(&mut tx) {
|
||||||
TxFnResult::Ok => {
|
TxFnResult::Ok => {
|
||||||
|
@ -345,7 +344,7 @@ impl<'a> ITx for SqliteTx<'a> {
|
||||||
// ----
|
// ----
|
||||||
|
|
||||||
struct DbValueIterator<'a> {
|
struct DbValueIterator<'a> {
|
||||||
db: MutexGuard<'a, Connection>,
|
db: MutexGuard<'a, SqliteDbInner>,
|
||||||
stmt: Option<Statement<'a>>,
|
stmt: Option<Statement<'a>>,
|
||||||
iter: Option<Rows<'a>>,
|
iter: Option<Rows<'a>>,
|
||||||
_pin: PhantomPinned,
|
_pin: PhantomPinned,
|
||||||
|
@ -353,7 +352,7 @@ struct DbValueIterator<'a> {
|
||||||
|
|
||||||
impl<'a> DbValueIterator<'a> {
|
impl<'a> DbValueIterator<'a> {
|
||||||
fn make<P: rusqlite::Params>(
|
fn make<P: rusqlite::Params>(
|
||||||
db: MutexGuard<'a, Connection>,
|
db: MutexGuard<'a, SqliteDbInner>,
|
||||||
sql: &str,
|
sql: &str,
|
||||||
args: P,
|
args: P,
|
||||||
) -> Result<ValueIter<'a>> {
|
) -> Result<ValueIter<'a>> {
|
||||||
|
@ -368,7 +367,7 @@ impl<'a> DbValueIterator<'a> {
|
||||||
|
|
||||||
unsafe {
|
unsafe {
|
||||||
let db = NonNull::from(&boxed.db);
|
let db = NonNull::from(&boxed.db);
|
||||||
let stmt = db.as_ref().prepare(sql)?;
|
let stmt = db.as_ref().db.prepare(sql)?;
|
||||||
|
|
||||||
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
|
let mut_ref: Pin<&mut DbValueIterator<'a>> = Pin::as_mut(&mut boxed);
|
||||||
Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
|
Pin::get_unchecked_mut(mut_ref).stmt = Some(stmt);
|
||||||
|
|
|
@ -101,18 +101,16 @@ where
|
||||||
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
|
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
|
||||||
let now = now_msec();
|
let now = now_msec();
|
||||||
|
|
||||||
let mut entries = vec![];
|
|
||||||
let mut excluded = vec![];
|
|
||||||
|
|
||||||
// List entries in the GC todo list
|
// List entries in the GC todo list
|
||||||
// These entries are put there when a tombstone is inserted in the table
|
// These entries are put there when a tombstone is inserted in the table
|
||||||
// (see update_entry in data.rs)
|
// (see update_entry in data.rs)
|
||||||
|
let mut candidates = vec![];
|
||||||
for entry_kv in self.data.gc_todo.iter()? {
|
for entry_kv in self.data.gc_todo.iter()? {
|
||||||
let (k, vhash) = entry_kv?;
|
let (k, vhash) = entry_kv?;
|
||||||
let mut todo_entry = GcTodoEntry::parse(&k, &vhash);
|
let todo_entry = GcTodoEntry::parse(&k, &vhash);
|
||||||
|
|
||||||
if todo_entry.deletion_time() > now {
|
if todo_entry.deletion_time() > now {
|
||||||
if entries.is_empty() && excluded.is_empty() {
|
if candidates.is_empty() {
|
||||||
// If the earliest entry in the todo list shouldn't yet be processed,
|
// If the earliest entry in the todo list shouldn't yet be processed,
|
||||||
// return a duration to wait in the loop
|
// return a duration to wait in the loop
|
||||||
return Ok(Some(Duration::from_millis(
|
return Ok(Some(Duration::from_millis(
|
||||||
|
@ -124,15 +122,23 @@ where
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
let vhash = Hash::try_from(&vhash[..]).unwrap();
|
candidates.push(todo_entry);
|
||||||
|
if candidates.len() >= 2 * TABLE_GC_BATCH_SIZE {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut entries = vec![];
|
||||||
|
let mut excluded = vec![];
|
||||||
|
for mut todo_entry in candidates {
|
||||||
// Check if the tombstone is still the current value of the entry.
|
// Check if the tombstone is still the current value of the entry.
|
||||||
// If not, we don't actually want to GC it, and we will remove it
|
// If not, we don't actually want to GC it, and we will remove it
|
||||||
// from the gc_todo table later (below).
|
// from the gc_todo table later (below).
|
||||||
|
let vhash = todo_entry.value_hash;
|
||||||
todo_entry.value = self
|
todo_entry.value = self
|
||||||
.data
|
.data
|
||||||
.store
|
.store
|
||||||
.get(&k[..])?
|
.get(&todo_entry.key[..])?
|
||||||
.filter(|v| blake2sum(&v[..]) == vhash)
|
.filter(|v| blake2sum(&v[..]) == vhash)
|
||||||
.map(|v| v.to_vec());
|
.map(|v| v.to_vec());
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue