From ab3a409d04be93b3980136ea6de71b7cd9f1016e Mon Sep 17 00:00:00 2001 From: Julien Kritter Date: Thu, 7 Nov 2024 13:54:30 +0100 Subject: [PATCH] Implement a queue crate to replace the todo trees --- Cargo.lock | 197 +++++++++++++++++++++++++++++++++++++ Cargo.toml | 4 + src/garage/Cargo.toml | 4 +- src/garage/admin/mod.rs | 6 +- src/model/Cargo.toml | 1 + src/model/garage.rs | 36 +++++-- src/model/index_counter.rs | 3 + src/table/Cargo.toml | 1 + src/table/data.rs | 63 ++++++------ src/table/gc.rs | 56 ++++------- src/table/merkle.rs | 22 +---- src/table/metrics.rs | 32 ------ src/table/table.rs | 5 +- src/todo/Cargo.toml | 22 +++++ src/todo/lib.rs | 56 +++++++++++ src/todo/open.rs | 66 +++++++++++++ src/todo/test.rs | 1 + src/todo/yaque_adapter.rs | 85 ++++++++++++++++ src/util/Cargo.toml | 1 + src/util/config.rs | 9 ++ src/util/error.rs | 3 + 21 files changed, 535 insertions(+), 138 deletions(-) create mode 100644 src/todo/Cargo.toml create mode 100644 src/todo/lib.rs create mode 100644 src/todo/open.rs create mode 100644 src/todo/test.rs create mode 100644 src/todo/yaque_adapter.rs diff --git a/Cargo.lock b/Cargo.lock index fa313874..322bb388 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1188,6 +1188,18 @@ dependencies = [ "subtle", ] +[[package]] +name = "filetime" +version = "0.2.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall 0.4.1", + "windows-sys 0.52.0", +] + [[package]] name = "fixedbitset" version = "0.4.2" @@ -1213,6 +1225,15 @@ dependencies = [ name = "format_table" version = "0.1.1" +[[package]] +name = "fsevent-sys" +version = "4.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2" +dependencies = [ + "libc", +] + [[package]] name = "futures" version = "0.3.30" @@ -1325,6 +1346,7 @@ dependencies = [ "garage_net", "garage_rpc", "garage_table", + "garage_todo", "garage_util", "garage_web", "git-version", @@ -1471,6 +1493,7 @@ dependencies = [ "garage_net", "garage_rpc", "garage_table", + "garage_todo", "garage_util", "hex", "http 1.0.0", @@ -1557,6 +1580,7 @@ dependencies = [ "futures-util", "garage_db", "garage_rpc", + "garage_todo", "garage_util", "hex", "hexdump", @@ -1568,6 +1592,15 @@ dependencies = [ "tracing", ] +[[package]] +name = "garage_todo" +version = "1.0.1" +dependencies = [ + "err-derive", + "tracing", + "yaque", +] + [[package]] name = "garage_util" version = "1.0.1" @@ -1583,6 +1616,7 @@ dependencies = [ "futures", "garage_db", "garage_net", + "garage_todo", "hex", "hexdump", "http 1.0.0", @@ -2112,6 +2146,26 @@ dependencies = [ "hashbrown 0.14.3", ] +[[package]] +name = "inotify" +version = "0.9.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff" +dependencies = [ + "bitflags 1.3.2", + "inotify-sys", + "libc", +] + +[[package]] +name = "inotify-sys" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb" +dependencies = [ + "libc", +] + [[package]] name = "inout" version = "0.1.3" @@ -2268,6 +2322,26 @@ dependencies = [ "serde_json", ] +[[package]] +name = "kqueue" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c" +dependencies = [ + "kqueue-sys", + "libc", +] + +[[package]] +name = "kqueue-sys" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b" +dependencies = [ + "bitflags 1.3.2", + "libc", +] + [[package]] name = "kube" version = "0.88.1" @@ -2519,6 +2593,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" dependencies = [ "libc", + "log", "wasi", "windows-sys 0.48.0", ] @@ -2583,6 +2658,33 @@ dependencies = [ "minimal-lexical", ] +[[package]] +name = "notify" +version = "5.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486" +dependencies = [ + "bitflags 1.3.2", + "crossbeam-channel", + "filetime", + "fsevent-sys", + "inotify", + "kqueue", + "libc", + "mio", + "walkdir", + "windows-sys 0.45.0", +] + +[[package]] +name = "ntapi" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4" +dependencies = [ + "winapi", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3987,6 +4089,20 @@ dependencies = [ "unicode-xid", ] +[[package]] +name = "sysinfo" +version = "0.28.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4c2f3ca6693feb29a89724516f016488e9aafc7f37264f898593ee4b942f31b" +dependencies = [ + "cfg-if", + "core-foundation-sys", + "libc", + "ntapi", + "once_cell", + "winapi", +] + [[package]] name = "syslog-tracing" version = "0.3.0" @@ -4733,6 +4849,15 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "windows-sys" +version = "0.45.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0" +dependencies = [ + "windows-targets 0.42.2", +] + [[package]] name = "windows-sys" version = "0.48.0" @@ -4751,6 +4876,21 @@ dependencies = [ "windows-targets 0.52.0", ] +[[package]] +name = "windows-targets" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071" +dependencies = [ + "windows_aarch64_gnullvm 0.42.2", + "windows_aarch64_msvc 0.42.2", + "windows_i686_gnu 0.42.2", + "windows_i686_msvc 0.42.2", + "windows_x86_64_gnu 0.42.2", + "windows_x86_64_gnullvm 0.42.2", + "windows_x86_64_msvc 0.42.2", +] + [[package]] name = "windows-targets" version = "0.48.5" @@ -4781,6 +4921,12 @@ dependencies = [ "windows_x86_64_msvc 0.52.0", ] +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8" + [[package]] name = "windows_aarch64_gnullvm" version = "0.48.5" @@ -4793,6 +4939,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" +[[package]] +name = "windows_aarch64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43" + [[package]] name = "windows_aarch64_msvc" version = "0.48.5" @@ -4805,6 +4957,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" +[[package]] +name = "windows_i686_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f" + [[package]] name = "windows_i686_gnu" version = "0.48.5" @@ -4817,6 +4975,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" +[[package]] +name = "windows_i686_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060" + [[package]] name = "windows_i686_msvc" version = "0.48.5" @@ -4829,6 +4993,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" +[[package]] +name = "windows_x86_64_gnu" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36" + [[package]] name = "windows_x86_64_gnu" version = "0.48.5" @@ -4841,6 +5011,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3" + [[package]] name = "windows_x86_64_gnullvm" version = "0.48.5" @@ -4853,6 +5029,12 @@ version = "0.52.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" +[[package]] +name = "windows_x86_64_msvc" +version = "0.42.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0" + [[package]] name = "windows_x86_64_msvc" version = "0.48.5" @@ -4896,6 +5078,21 @@ version = "0.8.8" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61" +[[package]] +name = "yaque" +version = "0.6.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "487f1a92dacd945cc5bc78a8193cc00b9a2cce3c07746ca51533f513843f40d2" +dependencies = [ + "futures", + "lazy_static", + "log", + "notify", + "rand", + "semver", + "sysinfo", +] + [[package]] name = "zerocopy" version = "0.7.32" diff --git a/Cargo.toml b/Cargo.toml index f327763e..43ff306f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,6 +2,7 @@ resolver = "2" members = [ "src/db", + "src/todo", "src/util", "src/net", "src/rpc", @@ -24,6 +25,7 @@ format_table = { version = "0.1.1", path = "src/format-table" } garage_api = { version = "1.0.1", path = "src/api" } garage_block = { version = "1.0.1", path = "src/block" } garage_db = { version = "1.0.1", path = "src/db", default-features = false } +garage_todo = { version = "1.0.1", path = "src/todo", default-features = false } garage_model = { version = "1.0.1", path = "src/model", default-features = false } garage_net = { version = "1.0.1", path = "src/net" } garage_rpc = { version = "1.0.1", path = "src/rpc" } @@ -86,6 +88,8 @@ rusqlite = "0.31.0" r2d2 = "0.8" r2d2_sqlite = "0.24" +yaque = "0.6" + async-compression = { version = "0.4", features = ["tokio", "zstd"] } zstd = { version = "0.13", default-features = false } diff --git a/src/garage/Cargo.toml b/src/garage/Cargo.toml index 483e33c0..dbd85291 100644 --- a/src/garage/Cargo.toml +++ b/src/garage/Cargo.toml @@ -23,6 +23,7 @@ path = "tests/lib.rs" [dependencies] format_table.workspace = true garage_db.workspace = true +garage_todo.workspace = true garage_api.workspace = true garage_block.workspace = true garage_model.workspace = true @@ -82,13 +83,14 @@ k2v-client.workspace = true [features] -default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ] +default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v", "yaque" ] k2v = [ "garage_util/k2v", "garage_api/k2v" ] # Database engines lmdb = [ "garage_model/lmdb" ] sqlite = [ "garage_model/sqlite" ] +yaque = [ "garage_todo/yaque" ] # Automatic registration and discovery via Consul API consul-discovery = [ "garage_rpc/consul-discovery" ] diff --git a/src/garage/admin/mod.rs b/src/garage/admin/mod.rs index e2468143..967d8c7b 100644 --- a/src/garage/admin/mod.rs +++ b/src/garage/admin/mod.rs @@ -197,7 +197,7 @@ impl AdminRpcHandler { writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); // Gather table statistics - let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; + let mut table = vec![" Table\tItems\tMklItems".into()]; table.push(self.gather_table_stats(&self.garage.bucket_table)?); table.push(self.gather_table_stats(&self.garage.key_table)?); table.push(self.gather_table_stats(&self.garage.object_table)?); @@ -343,12 +343,10 @@ impl AdminRpcHandler { let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); Ok(format!( - " {}\t{}\t{}\t{}\t{}", + " {}\t{}\t{}", F::TABLE_NAME, data_len, mkl_len, - t.merkle_updater.todo_len()?, - t.data.gc_todo_len()? )) } diff --git a/src/model/Cargo.toml b/src/model/Cargo.toml index 12931a4c..86f699cb 100644 --- a/src/model/Cargo.toml +++ b/src/model/Cargo.toml @@ -15,6 +15,7 @@ path = "lib.rs" [dependencies] garage_db.workspace = true +garage_todo.workspace = true garage_rpc.workspace = true garage_table.workspace = true garage_block.workspace = true diff --git a/src/model/garage.rs b/src/model/garage.rs index 363b02dd..5953fd82 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -4,6 +4,7 @@ use std::sync::Arc; use garage_net::NetworkKey; use garage_db as db; +use garage_todo as todo; use garage_util::background::*; use garage_util::config::*; @@ -45,6 +46,8 @@ pub struct Garage { /// The local database pub db: db::Db, + /// The todo queues + pub todo: todo::Todo, /// The membership manager pub system: Arc, /// The block manager @@ -135,6 +138,18 @@ impl Garage { let db = db::open_db(&db_path, db_engine, &db_opt) .ok_or_message("Unable to open metadata db")?; + info!("Initializing queues..."); + let todo_engine = todo::Engine::from_str(&config.todo_engine) + .ok_or_message("Invalid `todo_engine` value in configuration file")?; + let mut todo_path = config.metadata_dir.clone(); + match todo_engine { + todo::Engine::Yaque => { + todo_path.push("todo.yaque") + } + } + let todo = todo::open_todo(&todo_path, todo_engine) + .ok_or_message("Unable to start todo engine")?; + info!("Initializing RPC..."); let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message( "rpc_secret value is missing, not present in config file or in environment", @@ -175,7 +190,7 @@ impl Garage { // ---- admin tables ---- info!("Initialize bucket_table..."); - let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); + let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db, &todo); info!("Initialize bucket_alias_table..."); let bucket_alias_table = Table::new( @@ -183,9 +198,10 @@ impl Garage { control_rep_param.clone(), system.clone(), &db, + &todo, ); info!("Initialize key_table_table..."); - let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); + let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db, &todo); // ---- S3 tables ---- info!("Initialize block_ref_table..."); @@ -196,6 +212,7 @@ impl Garage { meta_rep_param.clone(), system.clone(), &db, + &todo, ); info!("Initialize version_table..."); @@ -206,10 +223,11 @@ impl Garage { meta_rep_param.clone(), system.clone(), &db, + &todo, ); info!("Initialize multipart upload counter table..."); - let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo); info!("Initialize multipart upload table..."); let mpu_table = Table::new( @@ -220,10 +238,11 @@ impl Garage { meta_rep_param.clone(), system.clone(), &db, + &todo, ); info!("Initialize object counter table..."); - let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); + let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo); info!("Initialize object_table..."); #[allow(clippy::redundant_clone)] @@ -236,6 +255,7 @@ impl Garage { meta_rep_param.clone(), system.clone(), &db, + &todo, ); info!("Load lifecycle worker state..."); @@ -245,7 +265,7 @@ impl Garage { // ---- K2V ---- #[cfg(feature = "k2v")] - let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); + let k2v = GarageK2V::new(system.clone(), &db, &todo, meta_rep_param); // ---- setup block refcount recalculation ---- // this function can be used to fix inconsistencies in the RC table @@ -261,6 +281,7 @@ impl Garage { bg_vars, replication_factor, db, + todo, system, block_manager, bucket_table, @@ -335,9 +356,9 @@ impl Garage { #[cfg(feature = "k2v")] impl GarageK2V { - fn new(system: Arc, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { + fn new(system: Arc, db: &db::Db, todo: &todo::Todo, meta_rep_param: TableShardedReplication) -> Self { info!("Initialize K2V counter table..."); - let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); + let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, todo); info!("Initialize K2V subscription manager..."); let subscriptions = Arc::new(SubscriptionManager::new()); @@ -351,6 +372,7 @@ impl GarageK2V { meta_rep_param, system.clone(), db, + todo, ); info!("Initialize K2V RPC handler..."); diff --git a/src/model/index_counter.rs b/src/model/index_counter.rs index aa13ee7b..0457f6bf 100644 --- a/src/model/index_counter.rs +++ b/src/model/index_counter.rs @@ -6,6 +6,7 @@ use std::sync::Arc; use serde::{Deserialize, Serialize}; use garage_db as db; +use garage_todo as todo; use garage_rpc::layout::LayoutHelper; use garage_rpc::system::System; @@ -173,6 +174,7 @@ impl IndexCounter { system: Arc, replication: TableShardedReplication, db: &db::Db, + todo: &todo::Todo, ) -> Arc { Arc::new(Self { this_node: system.id, @@ -186,6 +188,7 @@ impl IndexCounter { replication, system, db, + todo, ), }) } diff --git a/src/table/Cargo.toml b/src/table/Cargo.toml index e704cd3c..6efbd713 100644 --- a/src/table/Cargo.toml +++ b/src/table/Cargo.toml @@ -15,6 +15,7 @@ path = "lib.rs" [dependencies] garage_db.workspace = true +garage_todo.workspace = true garage_rpc.workspace = true garage_util.workspace = true diff --git a/src/table/data.rs b/src/table/data.rs index 09f4e008..3c009e45 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -6,6 +6,7 @@ use serde_bytes::ByteBuf; use tokio::sync::Notify; use garage_db as db; +use garage_todo as todo; use garage_util::data::*; use garage_util::error::*; @@ -29,19 +30,19 @@ pub struct TableData { pub store: db::Tree, pub(crate) merkle_tree: db::Tree, - pub(crate) merkle_todo: db::Tree, + pub(crate) merkle_todo: todo::Queue, pub(crate) merkle_todo_notify: Notify, pub(crate) insert_queue: db::Tree, pub(crate) insert_queue_notify: Arc, - pub(crate) gc_todo: db::Tree, + pub(crate) gc_todo: todo::Queue, pub(crate) metrics: TableMetrics, } impl TableData { - pub fn new(system: Arc, instance: F, replication: R, db: &db::Db) -> Arc { + pub fn new(system: Arc, instance: F, replication: R, db: &db::Db, todo: &todo::Todo) -> Arc { let store = db .open_tree(format!("{}:table", F::TABLE_NAME)) .expect("Unable to open DB tree"); @@ -49,24 +50,22 @@ impl TableData { let merkle_tree = db .open_tree(format!("{}:merkle_tree", F::TABLE_NAME)) .expect("Unable to open DB Merkle tree tree"); - let merkle_todo = db - .open_tree(format!("{}:merkle_todo", F::TABLE_NAME)) - .expect("Unable to open DB Merkle TODO tree"); + let merkle_todo = todo + .open_queue(format!("{}:merkle_todo", F::TABLE_NAME)) + .expect("Unable to open Merkle TODO queue"); let insert_queue = db .open_tree(format!("{}:insert_queue", F::TABLE_NAME)) .expect("Unable to open insert queue DB tree"); - let gc_todo = db - .open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME)) - .expect("Unable to open GC DB tree"); + let gc_todo = todo + .open_queue(format!("{}:gc_todo_v2", F::TABLE_NAME)) + .expect("Unable to open GC TODO queue"); let metrics = TableMetrics::new( F::TABLE_NAME, store.clone(), merkle_tree.clone(), - merkle_todo.clone(), - gc_todo.clone(), ); Arc::new(Self { @@ -227,22 +226,22 @@ impl TableData { if changed { let new_bytes_hash = blake2sum(&new_bytes); - tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?; tx.insert(&self.store, &tree_key, new_bytes)?; self.instance .updated(tx, old_entry.as_ref(), Some(&new_entry))?; - Ok(Some((new_entry, new_bytes_hash))) + Ok(Some((tree_key.clone(), new_entry, new_bytes_hash))) } else { Ok(None) } })?; - if let Some((new_entry, new_bytes_hash)) = changed { + if let Some((tree_key, new_entry, new_bytes_hash)) = changed { self.metrics.internal_update_counter.add(1); let is_tombstone = new_entry.is_tombstone(); + self.merkle_todo.submit(&tree_key, new_bytes_hash.as_slice())?; self.merkle_todo_notify.notify_one(); if is_tombstone { // We are only responsible for GC'ing this item if we are the @@ -272,21 +271,21 @@ impl TableData { .transaction(|tx| match tx.get(&self.store, k)? { Some(cur_v) if cur_v == v => { let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; - tx.remove(&self.store, k)?; - tx.insert(&self.merkle_todo, k, vec![])?; - self.instance.updated(tx, Some(&old_entry), None)?; - Ok(true) + Ok(Some(k)) } - _ => Ok(false), + _ => Ok(None), })?; - if removed { + if let Some(k) = removed { self.metrics.internal_delete_counter.add(1); + self.merkle_todo.submit(k, &[])?; self.merkle_todo_notify.notify_one(); - } - Ok(removed) + Ok(true) + } else { + Ok(false) + } } pub(crate) fn delete_if_equal_hash( @@ -300,21 +299,21 @@ impl TableData { .transaction(|tx| match tx.get(&self.store, k)? { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; - tx.remove(&self.store, k)?; - tx.insert(&self.merkle_todo, k, vec![])?; - self.instance.updated(tx, Some(&old_entry), None)?; - Ok(true) + Ok(Some(k)) } - _ => Ok(false), + _ => Ok(None), })?; - if removed { + if let Some(k) = removed { self.metrics.internal_delete_counter.add(1); + self.merkle_todo.submit(k, &[])?; self.merkle_todo_notify.notify_one(); - } - Ok(removed) + Ok(true) + } else { + Ok(false) + } } // ---- Insert queue functions ---- @@ -366,8 +365,4 @@ impl TableData { } } } - - pub fn gc_todo_len(&self) -> Result { - Ok(self.gc_todo.len()?) - } } diff --git a/src/table/gc.rs b/src/table/gc.rs index d30a1849..ae8ddc3e 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -10,7 +10,7 @@ use serde_bytes::ByteBuf; use futures::future::join_all; use tokio::sync::watch; -use garage_db as db; +use garage_todo as todo; use garage_util::background::*; use garage_util::data::*; @@ -76,8 +76,9 @@ impl TableGc { // These entries are put there when a tombstone is inserted in the table // (see update_entry in data.rs) let mut candidates = vec![]; - for entry_kv in self.data.gc_todo.iter()? { - let (k, vhash) = entry_kv?; + + while let Some(entry_kv) = self.data.gc_todo.reserve()? { + let (k, vhash) = entry_kv; let todo_entry = GcTodoEntry::parse(&k, &vhash); if todo_entry.deletion_time() > now { @@ -123,13 +124,6 @@ impl TableGc { } } - // Remove from gc_todo entries for tombstones where we have - // detected that the current value has changed and - // is no longer a tombstone. - for entry in excluded { - entry.remove_if_equal(&self.data.gc_todo)?; - } - // Remaining in `entries` is the list of entries we want to GC, // and for which they are still currently tombstones in the table. @@ -168,16 +162,22 @@ impl TableGc { let resps = join_all( partitions .into_iter() - .map(|(nodes, items)| self.try_send_and_delete(nodes, items)), + .map(|(nodes, items)| async { + let r = self.try_send_and_delete(nodes, &items).await; + (items, r) + }), ) .await; // Collect errors and return a single error value even if several - // errors occurred. + // errors occurred. Push failed items back into the GC queue. let mut errs = vec![]; - for resp in resps { + for (items, resp) in resps { if let Err(e) = resp { errs.push(e); + for item in items { + item.save(&self.data.gc_todo)?; + } } } @@ -197,7 +197,7 @@ impl TableGc { async fn try_send_and_delete( &self, nodes: Vec, - mut items: Vec, + items: &Vec, ) -> Result<(), Error> { let n_items = items.len(); @@ -216,8 +216,8 @@ impl TableGc { // and in deletes the list of keys and hashes of value for step 2. let mut updates = vec![]; let mut deletes = vec![]; - for item in items.iter_mut() { - updates.push(ByteBuf::from(item.value.take().unwrap())); + for item in items { + updates.push(ByteBuf::from(item.value.clone().unwrap())); deletes.push((ByteBuf::from(item.key.clone()), item.value_hash)); } @@ -264,8 +264,6 @@ impl TableGc { self.data .delete_if_equal_hash(&item.key[..], item.value_hash) .err_context("GC: local delete tombstones")?; - item.remove_if_equal(&self.data.gc_todo) - .err_context("GC: remove from todo list after successfull GC")?; } Ok(()) @@ -313,7 +311,7 @@ impl Worker for GcWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64), + queue_length: Some(0 as u64), ..Default::default() } } @@ -376,24 +374,8 @@ impl GcTodoEntry { } /// Saves the GcTodoEntry in the gc_todo tree - pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { - gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; - Ok(()) - } - - /// Removes the GcTodoEntry from the gc_todo tree if the - /// hash of the serialized value is the same here as in the tree. - /// This is usefull to remove a todo entry only under the condition - /// that it has not changed since the time it was read, i.e. - /// what we have to do is still the same - pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { - gc_todo_tree.db().transaction(|txn| { - let key = self.todo_table_key(); - if txn.get(gc_todo_tree, &key)?.as_deref() == Some(self.value_hash.as_slice()) { - txn.remove(gc_todo_tree, &key)?; - } - Ok(()) - })?; + pub(crate) fn save(&self, gc_todo_tree: &todo::Queue) -> Result<(), Error> { + gc_todo_tree.submit(&self.todo_table_key(), self.value_hash.as_slice())?; Ok(()) } diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 596d5805..9a40c3d2 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -81,7 +81,7 @@ impl MerkleUpdater { } fn updater_loop_iter(&self) -> Result { - if let Some((key, valhash)) = self.data.merkle_todo.first()? { + if let Some((key, valhash)) = self.data.merkle_todo.reserve()? { self.update_item(&key, &valhash)?; Ok(WorkerState::Busy) } else { @@ -110,21 +110,6 @@ impl MerkleUpdater { .db() .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; - let deleted = self.data.merkle_todo.db().transaction(|tx| { - let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by); - if remove { - tx.remove(&self.data.merkle_todo, k)?; - } - Ok(remove) - })?; - - if !deleted { - debug!( - "({}) Item not deleted from Merkle todo because it changed: {:?}", - F::TABLE_NAME, - k - ); - } Ok(()) } @@ -290,10 +275,6 @@ impl MerkleUpdater { pub fn merkle_tree_len(&self) -> Result { Ok(self.data.merkle_tree.len()?) } - - pub fn todo_len(&self) -> Result { - Ok(self.data.merkle_todo.len()?) - } } struct MerkleWorker(Arc>); @@ -306,7 +287,6 @@ impl Worker for MerkleWorker { fn status(&self) -> WorkerStatus { WorkerStatus { - queue_length: Some(self.0.todo_len().unwrap_or(0) as u64), ..Default::default() } } diff --git a/src/table/metrics.rs b/src/table/metrics.rs index 7bb0959a..94131bb5 100644 --- a/src/table/metrics.rs +++ b/src/table/metrics.rs @@ -6,8 +6,6 @@ use garage_db as db; pub struct TableMetrics { pub(crate) _table_size: ValueObserver, pub(crate) _merkle_tree_size: ValueObserver, - pub(crate) _merkle_todo_len: ValueObserver, - pub(crate) _gc_todo_len: ValueObserver, pub(crate) get_request_counter: BoundCounter, pub(crate) get_request_duration: BoundValueRecorder, @@ -25,8 +23,6 @@ impl TableMetrics { table_name: &'static str, store: db::Tree, merkle_tree: db::Tree, - merkle_todo: db::Tree, - gc_todo: db::Tree, ) -> Self { let meter = global::meter(table_name); TableMetrics { @@ -58,34 +54,6 @@ impl TableMetrics { ) .with_description("Number of nodes in table's Merkle tree") .init(), - _merkle_todo_len: meter - .u64_value_observer( - "table.merkle_updater_todo_queue_length", - move |observer| { - if let Ok(v) = merkle_todo.len() { - observer.observe( - v as u64, - &[KeyValue::new("table_name", table_name)], - ); - } - }, - ) - .with_description("Merkle tree updater TODO queue length") - .init(), - _gc_todo_len: meter - .u64_value_observer( - "table.gc_todo_queue_length", - move |observer| { - if let Ok(value) = gc_todo.len() { - observer.observe( - value as u64, - &[KeyValue::new("table_name", table_name)], - ); - } - }, - ) - .with_description("Table garbage collector TODO queue length") - .init(), get_request_counter: meter .u64_counter("table.get_request_counter") diff --git a/src/table/table.rs b/src/table/table.rs index a5be2910..e9b35704 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -13,6 +13,7 @@ use opentelemetry::{ }; use garage_db as db; +use garage_todo as todo; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -69,12 +70,12 @@ impl Rpc for TableRpc { impl Table { // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== - pub fn new(instance: F, replication: R, system: Arc, db: &db::Db) -> Arc { + pub fn new(instance: F, replication: R, system: Arc, db: &db::Db, todo: &todo::Todo) -> Arc { let endpoint = system .netapp .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); - let data = TableData::new(system.clone(), instance, replication, db); + let data = TableData::new(system.clone(), instance, replication, db, todo); let merkle_updater = MerkleUpdater::new(data.clone()); diff --git a/src/todo/Cargo.toml b/src/todo/Cargo.toml new file mode 100644 index 00000000..b178b9d5 --- /dev/null +++ b/src/todo/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "garage_todo" +version = "1.0.1" +authors = ["Julien Kritter ", "Alex Auvolat "] +edition = "2018" +license = "AGPL-3.0" +description = "Abstraction over multiple queue implementations" +repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage" +readme = "../../README.md" + +[lib] +path = "lib.rs" + +[dependencies] +err-derive.workspace = true +tracing.workspace = true + +yaque = { workspace = true, optional = true } + +[features] +default = [ "yaque" ] +yaque = [ "dep:yaque" ] diff --git a/src/todo/lib.rs b/src/todo/lib.rs new file mode 100644 index 00000000..604bb992 --- /dev/null +++ b/src/todo/lib.rs @@ -0,0 +1,56 @@ +#[macro_use] +extern crate tracing; + +#[cfg(feature = "yaque")] +pub mod yaque_adapter; + +#[cfg(test)] +pub mod test; + +pub mod open; +pub use open::*; + +use std::sync::Arc; +use std::borrow::Cow; + +use err_derive::Error; + +#[derive(Clone)] +pub struct Todo(pub(crate) Arc); +#[derive(Clone)] +pub struct Queue(pub(crate) Arc, usize); + +#[derive(Debug, Error)] +#[error(display = "{}", _0)] +pub struct Error(pub Cow<'static, str>); + +pub type Result = std::result::Result; + +impl From for Error { + fn from(e: std::io::Error) -> Error { + Error(format!("IO: {}", e).into()) + } +} + +pub(crate) trait ITodo: Send + Sync { + fn open_queue(&self, name: &str) -> Result; + fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()>; + fn reserve(&self, queue_id: usize) -> Result, Vec)>>; +} + +impl Todo { + pub fn open_queue>(&self, name: S) -> Result { + let queue_id = self.0.open_queue(name.as_ref())?; + Ok(Queue(self.0.clone(), queue_id)) + } +} + +impl Queue { + pub fn submit(&self, k: &[u8], v: &[u8]) -> Result<()> { + self.0.submit(self.1, k, v) + } + + pub fn reserve(&self) -> Result, Vec)>> { + self.0.reserve(self.1) + } +} diff --git a/src/todo/open.rs b/src/todo/open.rs new file mode 100644 index 00000000..80f2f170 --- /dev/null +++ b/src/todo/open.rs @@ -0,0 +1,66 @@ +use std::path::PathBuf; + +use crate::{Todo, Result, Error}; + +/// List of supported todo engine types +/// +/// The `enum` holds list of *all* todo engines that are are be supported by crate, no matter +/// if relevant feature is enabled or not. It allows us to distinguish between invalid engine +/// and valid engine, whose support is not enabled via feature flag. +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum Engine { + Yaque, +} + +impl Engine { + /// Return variant name as static `&str` + pub fn as_str(&self) -> &'static str { + match self { + Self::Yaque => "yaque", + } + } +} + +impl std::fmt::Display for Engine { + fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result { + self.as_str().fmt(fmt) + } +} + +impl std::str::FromStr for Engine { + type Err = Error; + + fn from_str(text: &str) -> Result { + match text { + #[cfg(feature = "yaque")] + "yaque" => Ok(Self::Yaque), + kind => Err(Error( + format!( + "Invalid todo engine: {} (options are: yaque)", + kind + ) + .into(), + )), + } + } +} + +pub fn open_todo(path: &PathBuf, engine: Engine) -> Result { + match engine { + // ---- Yaque ---- + #[cfg(feature = "yaque")] + Engine::Yaque => { + info!("Opening Yaque todo at: {}", path.display()); + let engine = crate::yaque_adapter::Yaque::init(path); + Ok(engine) + } + + // Pattern is unreachable when all supported todo engines are compiled into binary. The allow + // attribute is added so that we won't have to change this match in case stop building + // support for one or more engines by default. + #[allow(unreachable_patterns)] + engine => Err(Error( + format!("Todo engine support not available in this build: {}", engine).into(), + )), + } +} diff --git a/src/todo/test.rs b/src/todo/test.rs new file mode 100644 index 00000000..c7b7e813 --- /dev/null +++ b/src/todo/test.rs @@ -0,0 +1 @@ +use crate::*; diff --git a/src/todo/yaque_adapter.rs b/src/todo/yaque_adapter.rs new file mode 100644 index 00000000..807fbf4d --- /dev/null +++ b/src/todo/yaque_adapter.rs @@ -0,0 +1,85 @@ +use crate::*; + +use std::path::PathBuf; +use std::convert::TryInto; +use std::sync::RwLock; + +use yaque::{channel, Receiver, Sender, TrySendError, TryRecvError}; + +pub struct Yaque { + base_path: PathBuf, + // the outer lock prevents opening 2 queues at once + // the inner locks allow reserving on multiple queues at once + queues: RwLock>>, +} + +impl From>> for Error { + fn from(e: TrySendError>) -> Error { + Error(format!("Yaque: {}", e).into()) + } +} + +impl From for Error { + fn from(e: TryRecvError) -> Error { + let message = match e { + TryRecvError::Io(ee) => ee.to_string(), + TryRecvError::QueueEmpty => "empty queue".into(), + }; + Error(format!("Yaque: {}", message).into()) + } +} + +impl Yaque { + pub fn new(base_path: &PathBuf) -> Self { + Self { + base_path: base_path.clone(), + queues: RwLock::new(vec!()), + } + } + + pub fn init(base_path: &PathBuf) -> Todo { + Todo(Arc::new(Self::new(base_path))) + } +} + +impl ITodo for Yaque { + fn open_queue(&self, name: &str) -> Result { + let mut path = self.base_path.clone(); + path.push(name); + let mut queues = self.queues.write().unwrap(); + queues.push(RwLock::new(channel(path)?)); + Ok(queues.len() - 1) + } + + fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()> { + let queues = self.queues.read().unwrap(); + let mut queue = queues[queue_id].write().unwrap(); + let job = pack_kv(k, v); + queue.0.try_send(job)?; + Ok(()) + } + + fn reserve(&self, queue_id: usize) -> Result, Vec)>> { + let queues = self.queues.read().unwrap(); + let mut queue = queues[queue_id].write().unwrap(); + let job = queue.1.try_recv(); + match job { + Ok(j) => Ok(Some(unpack_kv(&j))), + Err(TryRecvError::Io(e)) => Err(e.into()), + Err(TryRecvError::QueueEmpty) => Ok(None), + } + } +} + +fn pack_kv(k: &[u8], v: &[u8]) -> Vec { + [&k.len().to_ne_bytes(), k, v].concat().to_vec() +} + +fn unpack_kv(job: &[u8]) -> (Vec, Vec) { + const LEN_SIZE: usize = std::mem::size_of::(); + let key_length_bytes = (&job[..LEN_SIZE]).try_into().unwrap(); + let key_length = usize::from_ne_bytes(key_length_bytes); + let k = job[LEN_SIZE..LEN_SIZE+key_length].to_vec(); + let v = job[LEN_SIZE+key_length..].to_vec(); + (k, v) +} diff --git a/src/util/Cargo.toml b/src/util/Cargo.toml index da3e39b8..89824a26 100644 --- a/src/util/Cargo.toml +++ b/src/util/Cargo.toml @@ -15,6 +15,7 @@ path = "lib.rs" [dependencies] garage_db.workspace = true +garage_todo.workspace = true garage_net.workspace = true arc-swap.workspace = true diff --git a/src/util/config.rs b/src/util/config.rs index 59329c0b..3dc901f3 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -111,6 +111,11 @@ pub struct Config { #[serde(default = "default_db_engine")] pub db_engine: String, + // -- TODO + /// queue engine to use for metadata (options: yaque) + #[serde(default = "default_todo_engine")] + pub todo_engine: String, + /// LMDB map size #[serde(deserialize_with = "deserialize_capacity", default)] pub lmdb_map_size: usize, @@ -256,6 +261,10 @@ fn default_db_engine() -> String { "lmdb".into() } +fn default_todo_engine() -> String { + "yaque".into() +} + fn default_block_size() -> usize { 1048576 } diff --git a/src/util/error.rs b/src/util/error.rs index 75fd3f9c..c05a9354 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -30,6 +30,9 @@ pub enum Error { #[error(display = "DB error: {}", _0)] Db(#[error(source)] garage_db::Error), + #[error(display = "Todo error: {}", _0)] + Todo(#[error(source)] garage_todo::Error), + #[error(display = "Messagepack encode error: {}", _0)] RmpEncode(#[error(source)] rmp_serde::encode::Error), #[error(display = "Messagepack decode error: {}", _0)]