Checkpoint: add merkle tree in data table

This commit is contained in:
Alex 2021-03-11 13:47:21 +01:00
parent 3214dd52dd
commit 8d63738cb0
11 changed files with 450 additions and 82 deletions

View file

@ -12,7 +12,7 @@ steps:
image: alpine/git image: alpine/git
commands: commands:
- mkdir -p cargo - mkdir -p cargo
- git clone https://git.deuxfleurs.fr/Deuxfleurs/garage.git - git clone $DRONE_GIT_HTTP_URL
- cd garage - cd garage
- git checkout $DRONE_COMMIT - git checkout $DRONE_COMMIT
@ -34,8 +34,8 @@ steps:
- 'garage/target' - 'garage/target'
- 'cargo/registry/index' - 'cargo/registry/index'
- 'cargo/registry/cache' - 'cargo/registry/cache'
- 'cargo/git/db'
- 'cargo/bin' - 'cargo/bin'
- 'cargo/git/db'
path_style: true path_style: true
endpoint: https://garage.deuxfleurs.fr endpoint: https://garage.deuxfleurs.fr

View file

@ -350,8 +350,7 @@ impl AdminRpcHandler {
.background .background
.spawn_worker("Repair worker".into(), move |must_exit| async move { .spawn_worker("Repair worker".into(), move |must_exit| async move {
repair.repair_worker(opt, must_exit).await repair.repair_worker(opt, must_exit).await
}) });
.await;
Ok(AdminRPC::Ok(format!( Ok(AdminRPC::Ok(format!(
"Repair launched on {:?}", "Repair launched on {:?}",
self.garage.system.id self.garage.system.id

View file

@ -49,7 +49,7 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
let (send_cancel, watch_cancel) = watch::channel(false); let (send_cancel, watch_cancel) = watch::channel(false);
let background = BackgroundRunner::new(16, watch_cancel.clone()); let background = BackgroundRunner::new(16, watch_cancel.clone());
let garage = Garage::new(config, db, background.clone(), &mut rpc_server).await; let garage = Garage::new(config, db, background.clone(), &mut rpc_server);
info!("Crate admin RPC handler..."); info!("Crate admin RPC handler...");
AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server);

View file

@ -127,18 +127,16 @@ impl BlockManager {
} }
} }
pub async fn spawn_background_worker(self: Arc<Self>) { pub fn spawn_background_worker(self: Arc<Self>) {
// Launch 2 simultaneous workers for background resync loop preprocessing // Launch 2 simultaneous workers for background resync loop preprocessing
for i in 0..2usize { for i in 0..2u64 {
let bm2 = self.clone(); let bm2 = self.clone();
let background = self.system.background.clone(); let background = self.system.background.clone();
tokio::spawn(async move { tokio::spawn(async move {
tokio::time::delay_for(Duration::from_secs(10)).await; tokio::time::delay_for(Duration::from_secs(10 * (i + 1))).await;
background background.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
bm2.resync_loop(must_exit) bm2.resync_loop(must_exit)
}) });
.await;
}); });
} }
} }

View file

@ -35,7 +35,7 @@ pub struct Garage {
} }
impl Garage { impl Garage {
pub async fn new( pub fn new(
config: Config, config: Config,
db: sled::Db, db: sled::Db,
background: Arc<BackgroundRunner>, background: Arc<BackgroundRunner>,
@ -86,8 +86,7 @@ impl Garage {
&db, &db,
"block_ref".to_string(), "block_ref".to_string(),
rpc_server, rpc_server,
) );
.await;
info!("Initialize version_table..."); info!("Initialize version_table...");
let version_table = Table::new( let version_table = Table::new(
@ -100,8 +99,7 @@ impl Garage {
&db, &db,
"version".to_string(), "version".to_string(),
rpc_server, rpc_server,
) );
.await;
info!("Initialize object_table..."); info!("Initialize object_table...");
let object_table = Table::new( let object_table = Table::new(
@ -114,8 +112,7 @@ impl Garage {
&db, &db,
"object".to_string(), "object".to_string(),
rpc_server, rpc_server,
) );
.await;
info!("Initialize bucket_table..."); info!("Initialize bucket_table...");
let bucket_table = Table::new( let bucket_table = Table::new(
@ -125,8 +122,7 @@ impl Garage {
&db, &db,
"bucket".to_string(), "bucket".to_string(),
rpc_server, rpc_server,
) );
.await;
info!("Initialize key_table_table..."); info!("Initialize key_table_table...");
let key_table = Table::new( let key_table = Table::new(
@ -136,8 +132,7 @@ impl Garage {
&db, &db,
"key".to_string(), "key".to_string(),
rpc_server, rpc_server,
) );
.await;
info!("Initialize Garage..."); info!("Initialize Garage...");
let garage = Arc::new(Self { let garage = Arc::new(Self {
@ -155,7 +150,7 @@ impl Garage {
info!("Start block manager background thread..."); info!("Start block manager background thread...");
garage.block_manager.garage.swap(Some(garage.clone())); garage.block_manager.garage.swap(Some(garage.clone()));
garage.block_manager.clone().spawn_background_worker().await; garage.block_manager.clone().spawn_background_worker();
garage garage
} }

View file

@ -319,8 +319,7 @@ impl System {
.background .background
.spawn_worker(format!("ping loop"), |stop_signal| { .spawn_worker(format!("ping loop"), |stop_signal| {
self2.ping_loop(stop_signal).map(Ok) self2.ping_loop(stop_signal).map(Ok)
}) });
.await;
if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) {
let self2 = self.clone(); let self2 = self.clone();
@ -330,8 +329,7 @@ impl System {
self2 self2
.consul_loop(stop_signal, consul_host, consul_service_name) .consul_loop(stop_signal, consul_host, consul_service_name)
.map(Ok) .map(Ok)
}) });
.await;
} }
} }

View file

@ -7,6 +7,7 @@ pub mod crdt;
pub mod schema; pub mod schema;
pub mod util; pub mod util;
pub mod merkle;
pub mod table; pub mod table;
pub mod table_fullcopy; pub mod table_fullcopy;
pub mod table_sharded; pub mod table_sharded;

352
src/table/merkle.rs Normal file
View file

@ -0,0 +1,352 @@
use std::convert::TryInto;
use std::sync::Arc;
use std::time::Duration;
use futures::select;
use futures_util::future::*;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use sled::transaction::{
ConflictableTransactionError, ConflictableTransactionResult, TransactionalTree,
};
use tokio::sync::{watch, Notify};
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
use garage_util::error::Error;
// This modules partitions the data in 2**16 partitions, based on the top
// 16 bits (two bytes) of item's partition keys' hashes.
// It builds one Merkle tree for each of these 2**16 partitions.
pub(crate) struct MerkleUpdater {
table_name: String,
background: Arc<BackgroundRunner>,
// Content of the todo tree: items where
// - key = the key of an item in the main table, ie hash(partition_key)+sort_key
// - value = the hash of the full serialized item, if present,
// or an empty vec if item is absent (deleted)
pub(crate) todo: sled::Tree,
pub(crate) todo_notify: Notify,
// Content of the merkle tree: items where
// - key = .bytes() for MerkleNodeKey
// - value = serialization of a MerkleNode, assumed to be MerkleNode::empty if not found
pub(crate) merkle_tree: sled::Tree,
empty_node_hash: Hash,
}
#[derive(Clone)]
pub struct MerkleNodeKey {
// partition: first 16 bits (two bytes) of the partition_key's hash
pub partition: [u8; 2],
// prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key)
pub prefix: Vec<u8>,
}
#[derive(PartialEq, Eq, Debug, Serialize, Deserialize)]
pub enum MerkleNode {
// The empty Merkle node
Empty,
// An intermediate Merkle tree node for a prefix
// Contains the hashes of the 256 possible next prefixes
Intermediate(Vec<(u8, Hash)>),
// A final node for an item
// Contains the full key of the item and the hash of the value
Leaf(Vec<u8>, Hash),
}
impl MerkleUpdater {
pub(crate) fn new(
table_name: String,
background: Arc<BackgroundRunner>,
todo: sled::Tree,
merkle_tree: sled::Tree,
) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
Arc::new(Self {
table_name,
background,
todo,
todo_notify: Notify::new(),
merkle_tree,
empty_node_hash,
})
}
pub(crate) fn launch(self: &Arc<Self>) {
let self2 = self.clone();
self.background.spawn_worker(
format!("Merkle tree updater for {}", self.table_name),
|must_exit: watch::Receiver<bool>| self2.updater_loop(must_exit),
);
}
async fn updater_loop(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
) -> Result<(), Error> {
while !*must_exit.borrow() {
if let Some(x) = self.todo.iter().next() {
match x {
Ok((key, valhash)) => {
if let Err(e) = self.update_item(&key[..], &valhash[..]) {
warn!("Error while updating Merkle tree item: {}", e);
}
}
Err(e) => {
warn!("Error while iterating on Merkle todo tree: {}", e);
tokio::time::delay_for(Duration::from_secs(10)).await;
}
}
} else {
select! {
_ = self.todo_notify.notified().fuse() => (),
_ = must_exit.recv().fuse() => (),
}
}
}
Ok(())
}
fn update_item(&self, k: &[u8], vhash_by: &[u8]) -> Result<(), Error> {
let khash = blake2sum(k);
let new_vhash = if vhash_by.len() == 0 {
None
} else {
let vhash_by: [u8; 32] = vhash_by
.try_into()
.map_err(|_| Error::Message(format!("Invalid value in Merkle todo table")))?;
Some(Hash::from(vhash_by))
};
let key = MerkleNodeKey {
partition: k[0..2].try_into().unwrap(),
prefix: vec![],
};
self.merkle_tree
.transaction(|tx| self.update_item_rec(tx, k, khash, &key, new_vhash))?;
let deleted = self
.todo
.compare_and_swap::<_, _, Vec<u8>>(k, Some(vhash_by), None)?
.is_ok();
if !deleted {
info!(
"Item not deleted from Merkle todo because it changed: {:?}",
k
);
}
Ok(())
}
fn update_item_rec(
&self,
tx: &TransactionalTree,
k: &[u8],
khash: Hash,
key: &MerkleNodeKey,
new_vhash: Option<Hash>,
) -> ConflictableTransactionResult<Option<Hash>, Error> {
let i = key.prefix.len();
let mutate = match self.read_node_txn(tx, &key)? {
MerkleNode::Empty => {
if let Some(vhv) = new_vhash {
Some(MerkleNode::Leaf(k.to_vec(), vhv))
} else {
None
}
}
MerkleNode::Intermediate(mut children) => {
let key2 = key.next_key(khash);
if let Some(subhash) = self.update_item_rec(tx, k, khash, &key2, new_vhash)? {
if subhash == self.empty_node_hash {
intermediate_rm_child(&mut children, key2.prefix[i]);
} else {
intermediate_set_child(&mut children, key2.prefix[i], subhash);
}
if children.len() == 0 {
// should not happen
warn!("Replacing intermediate node with empty node, should not happen.");
Some(MerkleNode::Empty)
} else if children.len() == 1 {
// move node down to this level
let key_sub = key.add_byte(children[0].0);
let subnode = self.read_node_txn(tx, &key_sub)?;
tx.remove(key_sub.encode())?;
Some(subnode)
} else {
Some(MerkleNode::Intermediate(children))
}
} else {
None
}
}
MerkleNode::Leaf(exlf_key, exlf_hash) => {
if exlf_key == k {
match new_vhash {
Some(vhv) if vhv == exlf_hash => None,
Some(vhv) => Some(MerkleNode::Leaf(k.to_vec(), vhv)),
None => Some(MerkleNode::Empty),
}
} else {
if let Some(vhv) = new_vhash {
// Create two sub-nodes and replace by intermediary node
let (pos1, h1) = {
let key2 = key.next_key(blake2sum(&exlf_key[..]));
let subhash =
self.put_node_txn(tx, &key2, &MerkleNode::Leaf(exlf_key, exlf_hash))?;
(key2.prefix[i], subhash)
};
let (pos2, h2) = {
let key2 = key.next_key(khash);
let subhash =
self.put_node_txn(tx, &key2, &MerkleNode::Leaf(k.to_vec(), vhv))?;
(key2.prefix[i], subhash)
};
let mut int = vec![];
intermediate_set_child(&mut int, pos1, h1);
intermediate_set_child(&mut int, pos2, h2);
Some(MerkleNode::Intermediate(int))
} else {
None
}
}
}
};
if let Some(new_node) = mutate {
let hash = self.put_node_txn(tx, &key, &new_node)?;
Ok(Some(hash))
} else {
Ok(None)
}
}
// Merkle tree node manipulation
fn read_node_txn(
&self,
tx: &TransactionalTree,
k: &MerkleNodeKey,
) -> ConflictableTransactionResult<MerkleNode, Error> {
let ent = tx.get(k.encode())?;
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])
.map_err(|e| ConflictableTransactionError::Abort(e.into()))?),
}
}
fn put_node_txn(
&self,
tx: &TransactionalTree,
k: &MerkleNodeKey,
v: &MerkleNode,
) -> ConflictableTransactionResult<Hash, Error> {
if *v == MerkleNode::Empty {
tx.remove(k.encode())?;
Ok(self.empty_node_hash)
} else {
let vby = rmp_to_vec_all_named(v)
.map_err(|e| ConflictableTransactionError::Abort(e.into()))?;
let rethash = blake2sum(&vby[..]);
tx.insert(k.encode(), vby)?;
Ok(rethash)
}
}
pub(crate) fn read_node(
&self,
k: &MerkleNodeKey,
) -> Result<MerkleNode, Error> {
let ent = self.merkle_tree.get(k.encode())?;
match ent {
None => Ok(MerkleNode::Empty),
Some(v) => Ok(rmp_serde::decode::from_read_ref::<_, MerkleNode>(&v[..])?)
}
}
}
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());
ret.extend(&self.partition[..]);
ret.extend(&self.prefix[..]);
ret
}
pub fn next_key(&self, h: Hash) -> Self {
assert!(&h.as_slice()[0..self.prefix.len()] == &self.prefix[..]);
let mut s2 = self.clone();
s2.prefix.push(h.as_slice()[self.prefix.len()]);
s2
}
pub fn add_byte(&self, b: u8) -> Self {
let mut s2 = self.clone();
s2.prefix.push(b);
s2
}
}
fn intermediate_set_child(ch: &mut Vec<(u8, Hash)>, pos: u8, v: Hash) {
for i in 0..ch.len() {
if ch[i].0 == pos {
ch[i].1 = v;
return;
} else if ch[i].0 > pos {
ch.insert(i, (pos, v));
return;
}
}
ch.insert(ch.len(), (pos, v));
}
fn intermediate_rm_child(ch: &mut Vec<(u8, Hash)>, pos: u8) {
for i in 0..ch.len() {
if ch[i].0 == pos {
ch.remove(i);
return;
}
}
}
#[test]
fn test_intermediate_aux() {
let mut v = vec![];
intermediate_set_child(&mut v, 12u8, [12u8; 32].into());
assert!(v == vec![(12u8, [12u8; 32].into())]);
intermediate_set_child(&mut v, 42u8, [42u8; 32].into());
assert!(v == vec![(12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
intermediate_set_child(&mut v, 4u8, [4u8; 32].into());
assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [12u8; 32].into()), (42u8, [42u8; 32].into())]);
intermediate_set_child(&mut v, 12u8, [8u8; 32].into());
assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
intermediate_set_child(&mut v, 6u8, [6u8; 32].into());
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into()), (42u8, [42u8; 32].into())]);
intermediate_rm_child(&mut v, 42u8);
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
intermediate_rm_child(&mut v, 11u8);
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [6u8; 32].into()), (12u8, [8u8; 32].into())]);
intermediate_rm_child(&mut v, 6u8);
assert!(v == vec![(4u8, [4u8; 32].into()), (12u8, [8u8; 32].into())]);
intermediate_set_child(&mut v, 6u8, [7u8; 32].into());
assert!(v == vec![(4u8, [4u8; 32].into()), (6u8, [7u8; 32].into()), (12u8, [8u8; 32].into())]);
}

View file

@ -8,6 +8,7 @@ use arc_swap::ArcSwapOption;
use futures::stream::*; use futures::stream::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use sled::Transactional;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
@ -18,6 +19,7 @@ use garage_rpc::rpc_client::*;
use garage_rpc::rpc_server::*; use garage_rpc::rpc_server::*;
use crate::crdt::CRDT; use crate::crdt::CRDT;
use crate::merkle::*;
use crate::schema::*; use crate::schema::*;
use crate::table_sync::*; use crate::table_sync::*;
@ -33,6 +35,7 @@ pub struct Table<F: TableSchema, R: TableReplication> {
pub system: Arc<System>, pub system: Arc<System>,
pub store: sled::Tree, pub store: sled::Tree,
pub syncer: ArcSwapOption<TableSyncer<F, R>>, pub syncer: ArcSwapOption<TableSyncer<F, R>>,
merkle_updater: Arc<MerkleUpdater>,
} }
#[derive(Serialize, Deserialize)] #[derive(Serialize, Deserialize)]
@ -77,7 +80,7 @@ where
{ {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub async fn new( pub fn new(
instance: F, instance: F,
replication: R, replication: R,
system: Arc<System>, system: Arc<System>,
@ -85,11 +88,27 @@ where
name: String, name: String,
rpc_server: &mut RpcServer, rpc_server: &mut RpcServer,
) -> Arc<Self> { ) -> Arc<Self> {
let store = db.open_tree(&name).expect("Unable to open DB tree"); let store = db
.open_tree(&format!("{}:table", name))
.expect("Unable to open DB tree");
let merkle_todo_store = db
.open_tree(&format!("{}:merkle_todo", name))
.expect("Unable to open DB Merkle TODO tree");
let merkle_tree_store = db
.open_tree(&format!("{}:merkle_tree", name))
.expect("Unable to open DB Merkle tree tree");
let rpc_path = format!("table_{}", name); let rpc_path = format!("table_{}", name);
let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path); let rpc_client = system.rpc_client::<TableRPC<F>>(&rpc_path);
let merkle_updater = MerkleUpdater::new(
name.clone(),
system.background.clone(),
merkle_todo_store,
merkle_tree_store,
);
let table = Arc::new(Self { let table = Arc::new(Self {
instance, instance,
replication, replication,
@ -98,12 +117,15 @@ where
system, system,
store, store,
syncer: ArcSwapOption::from(None), syncer: ArcSwapOption::from(None),
merkle_updater,
}); });
table.clone().register_handler(rpc_server, rpc_path); table.clone().register_handler(rpc_server, rpc_path);
let syncer = TableSyncer::launch(table.clone()).await; let syncer = TableSyncer::launch(table.clone());
table.syncer.swap(Some(syncer)); table.syncer.swap(Some(syncer));
table.merkle_updater.launch();
table table
} }
@ -322,7 +344,7 @@ where
Ok(TableRPC::Update(values)) Ok(TableRPC::Update(values))
} }
TableRPC::Update(pairs) => { TableRPC::Update(pairs) => {
self.handle_update(pairs).await?; self.handle_update(pairs)?;
Ok(TableRPC::Ok) Ok(TableRPC::Ok)
} }
TableRPC::SyncRPC(rpc) => { TableRPC::SyncRPC(rpc) => {
@ -380,15 +402,20 @@ where
Ok(ret) Ok(ret)
} }
pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> { // ========== CODE THAT ACTUALLY MODIFIES THE TREE ================
let syncer = self.syncer.load_full().unwrap();
pub fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
for update_bytes in entries.iter() { for update_bytes in entries.iter() {
let update = self.decode_entry(update_bytes.as_slice())?; self.update_entry(update_bytes.as_slice())?;
}
Ok(())
}
pub(crate) fn update_entry(self: &Arc<Self>, update_bytes: &[u8]) -> Result<(), Error> {
let update = self.decode_entry(update_bytes)?;
let tree_key = self.tree_key(update.partition_key(), update.sort_key()); let tree_key = self.tree_key(update.partition_key(), update.sort_key());
let (old_entry, new_entry) = self.store.transaction(|db| { let changed = (&self.store, &self.merkle_updater.todo).transaction(|(db, mkl_todo)| {
let (old_entry, new_entry) = match db.get(&tree_key)? { let (old_entry, new_entry) = match db.get(&tree_key)? {
Some(prev_bytes) => { Some(prev_bytes) => {
let old_entry = self let old_entry = self
@ -401,32 +428,38 @@ where
None => (None, update.clone()), None => (None, update.clone()),
}; };
if Some(&new_entry) != old_entry.as_ref() {
let new_bytes = rmp_to_vec_all_named(&new_entry) let new_bytes = rmp_to_vec_all_named(&new_entry)
.map_err(Error::RMPEncode) .map_err(Error::RMPEncode)
.map_err(sled::transaction::ConflictableTransactionError::Abort)?; .map_err(sled::transaction::ConflictableTransactionError::Abort)?;
mkl_todo.insert(tree_key.clone(), blake2sum(&new_bytes[..]).to_vec())?;
db.insert(tree_key.clone(), new_bytes)?; db.insert(tree_key.clone(), new_bytes)?;
Ok((old_entry, new_entry)) Ok(Some((old_entry, new_entry)))
} else {
Ok(None)
}
})?; })?;
if old_entry.as_ref() != Some(&new_entry) { if let Some((old_entry, new_entry)) = changed {
self.instance.updated(old_entry, Some(new_entry)); self.instance.updated(old_entry, Some(new_entry));
syncer.invalidate(&tree_key[..]); self.syncer.load_full().unwrap().invalidate(&tree_key[..]);
}
} }
Ok(()) Ok(())
} }
pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> { pub(crate) fn delete_if_equal(self: &Arc<Self>, k: &[u8], v: &[u8]) -> Result<bool, Error> {
let removed = self.store.transaction(|txn| { let removed = (&self.store, &self.merkle_updater.todo).transaction(|(txn, mkl_todo)| {
if let Some(cur_v) = txn.get(k)? { if let Some(cur_v) = txn.get(k)? {
if cur_v == v { if cur_v == v {
txn.remove(k)?; txn.remove(k)?;
mkl_todo.insert(k, vec![])?;
return Ok(true); return Ok(true);
} }
} }
Ok(false) Ok(false)
})?; })?;
if removed { if removed {
let old_entry = self.decode_entry(v)?; let old_entry = self.decode_entry(v)?;
self.instance.updated(Some(old_entry), None); self.instance.updated(Some(old_entry), None);

View file

@ -106,7 +106,7 @@ where
F: TableSchema + 'static, F: TableSchema + 'static,
R: TableReplication + 'static, R: TableReplication + 'static,
{ {
pub(crate) async fn launch(table: Arc<Table<F, R>>) -> Arc<Self> { pub(crate) fn launch(table: Arc<Table<F, R>>) -> Arc<Self> {
let todo = SyncTodo { todo: Vec::new() }; let todo = SyncTodo { todo: Vec::new() };
let syncer = Arc::new(TableSyncer { let syncer = Arc::new(TableSyncer {
table: table.clone(), table: table.clone(),
@ -119,24 +119,16 @@ where
let (busy_tx, busy_rx) = mpsc::unbounded_channel(); let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone(); let s1 = syncer.clone();
table table.system.background.spawn_worker(
.system
.background
.spawn_worker(
format!("table sync watcher for {}", table.name), format!("table sync watcher for {}", table.name),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx), move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
) );
.await;
let s2 = syncer.clone(); let s2 = syncer.clone();
table table.system.background.spawn_worker(
.system
.background
.spawn_worker(
format!("table syncer for {}", table.name), format!("table syncer for {}", table.name),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx), move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
) );
.await;
let s3 = syncer.clone(); let s3 = syncer.clone();
tokio::spawn(async move { tokio::spawn(async move {
@ -630,7 +622,7 @@ where
} }
} }
if diff_items.len() > 0 { if diff_items.len() > 0 {
self.table.handle_update(&diff_items[..]).await?; self.table.handle_update(&diff_items[..])?;
} }
if items_to_send.len() > 0 { if items_to_send.len() > 0 {
self.send_items(who, items_to_send).await?; self.send_items(who, items_to_send).await?;

View file

@ -1,11 +1,11 @@
use core::future::Future; use core::future::Future;
use std::pin::Pin; use std::pin::Pin;
use std::sync::Mutex;
use futures::future::join_all; use futures::future::join_all;
use futures::select; use futures::select;
use futures_util::future::*; use futures_util::future::*;
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::sync::{mpsc, watch, Notify}; use tokio::sync::{mpsc, watch, Notify};
use crate::error::Error; use crate::error::Error;
@ -38,7 +38,7 @@ impl BackgroundRunner {
} }
pub async fn run(self: Arc<Self>) { pub async fn run(self: Arc<Self>) {
let mut workers = self.workers.lock().await; let mut workers = self.workers.lock().unwrap();
for i in 0..self.n_runners { for i in 0..self.n_runners {
workers.push(tokio::spawn(self.clone().runner(i))); workers.push(tokio::spawn(self.clone().runner(i)));
} }
@ -47,7 +47,7 @@ impl BackgroundRunner {
let mut stop_signal = self.stop_signal.clone(); let mut stop_signal = self.stop_signal.clone();
while let Some(exit_now) = stop_signal.recv().await { while let Some(exit_now) = stop_signal.recv().await {
if exit_now { if exit_now {
let mut workers = self.workers.lock().await; let mut workers = self.workers.lock().unwrap();
let workers_vec = workers.drain(..).collect::<Vec<_>>(); let workers_vec = workers.drain(..).collect::<Vec<_>>();
join_all(workers_vec).await; join_all(workers_vec).await;
return; return;
@ -73,12 +73,12 @@ impl BackgroundRunner {
self.job_notify.notify(); self.job_notify.notify();
} }
pub async fn spawn_worker<F, T>(&self, name: String, worker: F) pub fn spawn_worker<F, T>(&self, name: String, worker: F)
where where
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static, F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
T: Future<Output = JobOutput> + Send + 'static, T: Future<Output = JobOutput> + Send + 'static,
{ {
let mut workers = self.workers.lock().await; let mut workers = self.workers.lock().unwrap();
let stop_signal = self.stop_signal.clone(); let stop_signal = self.stop_signal.clone();
workers.push(tokio::spawn(async move { workers.push(tokio::spawn(async move {
if let Err(e) = worker(stop_signal).await { if let Err(e) = worker(stop_signal).await {
@ -93,7 +93,7 @@ impl BackgroundRunner {
let mut stop_signal = self.stop_signal.clone(); let mut stop_signal = self.stop_signal.clone();
loop { loop {
let must_exit: bool = *stop_signal.borrow(); let must_exit: bool = *stop_signal.borrow();
if let Some(job) = self.dequeue_job(must_exit).await { if let Some(job) = self.dequeue_job(must_exit) {
if let Err(e) = job.await { if let Err(e) = job.await {
error!("Job failed: {}", e) error!("Job failed: {}", e)
} }
@ -110,8 +110,8 @@ impl BackgroundRunner {
} }
} }
async fn dequeue_job(&self, must_exit: bool) -> Option<Job> { fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
let mut queue = self.queue_out.lock().await; let mut queue = self.queue_out.lock().unwrap();
while let Ok((job, cancellable)) = queue.try_recv() { while let Ok((job, cancellable)) = queue.try_recv() {
if cancellable && must_exit { if cancellable && must_exit {
continue; continue;