Implement a queue crate to replace the todo trees

This commit is contained in:
Withings 2024-11-07 13:54:30 +01:00
parent a18b3f0d1f
commit ab3a409d04
Signed by: withings
GPG key ID: 7778B323E465AABB
21 changed files with 535 additions and 138 deletions

197
Cargo.lock generated
View file

@ -1188,6 +1188,18 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "filetime"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.4.1",
"windows-sys 0.52.0",
]
[[package]] [[package]]
name = "fixedbitset" name = "fixedbitset"
version = "0.4.2" version = "0.4.2"
@ -1213,6 +1225,15 @@ dependencies = [
name = "format_table" name = "format_table"
version = "0.1.1" version = "0.1.1"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "futures" name = "futures"
version = "0.3.30" version = "0.3.30"
@ -1325,6 +1346,7 @@ dependencies = [
"garage_net", "garage_net",
"garage_rpc", "garage_rpc",
"garage_table", "garage_table",
"garage_todo",
"garage_util", "garage_util",
"garage_web", "garage_web",
"git-version", "git-version",
@ -1471,6 +1493,7 @@ dependencies = [
"garage_net", "garage_net",
"garage_rpc", "garage_rpc",
"garage_table", "garage_table",
"garage_todo",
"garage_util", "garage_util",
"hex", "hex",
"http 1.0.0", "http 1.0.0",
@ -1557,6 +1580,7 @@ dependencies = [
"futures-util", "futures-util",
"garage_db", "garage_db",
"garage_rpc", "garage_rpc",
"garage_todo",
"garage_util", "garage_util",
"hex", "hex",
"hexdump", "hexdump",
@ -1568,6 +1592,15 @@ dependencies = [
"tracing", "tracing",
] ]
[[package]]
name = "garage_todo"
version = "1.0.1"
dependencies = [
"err-derive",
"tracing",
"yaque",
]
[[package]] [[package]]
name = "garage_util" name = "garage_util"
version = "1.0.1" version = "1.0.1"
@ -1583,6 +1616,7 @@ dependencies = [
"futures", "futures",
"garage_db", "garage_db",
"garage_net", "garage_net",
"garage_todo",
"hex", "hex",
"hexdump", "hexdump",
"http 1.0.0", "http 1.0.0",
@ -2112,6 +2146,26 @@ dependencies = [
"hashbrown 0.14.3", "hashbrown 0.14.3",
] ]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "inout" name = "inout"
version = "0.1.3" version = "0.1.3"
@ -2268,6 +2322,26 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "kqueue"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]] [[package]]
name = "kube" name = "kube"
version = "0.88.1" version = "0.88.1"
@ -2519,6 +2593,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [ dependencies = [
"libc", "libc",
"log",
"wasi", "wasi",
"windows-sys 0.48.0", "windows-sys 0.48.0",
] ]
@ -2583,6 +2658,33 @@ dependencies = [
"minimal-lexical", "minimal-lexical",
] ]
[[package]]
name = "notify"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
dependencies = [
"bitflags 1.3.2",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"windows-sys 0.45.0",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]] [[package]]
name = "nu-ansi-term" name = "nu-ansi-term"
version = "0.46.0" version = "0.46.0"
@ -3987,6 +4089,20 @@ dependencies = [
"unicode-xid", "unicode-xid",
] ]
[[package]]
name = "sysinfo"
version = "0.28.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c2f3ca6693feb29a89724516f016488e9aafc7f37264f898593ee4b942f31b"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"winapi",
]
[[package]] [[package]]
name = "syslog-tracing" name = "syslog-tracing"
version = "0.3.0" version = "0.3.0"
@ -4733,6 +4849,15 @@ dependencies = [
"windows-targets 0.52.0", "windows-targets 0.52.0",
] ]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]] [[package]]
name = "windows-sys" name = "windows-sys"
version = "0.48.0" version = "0.48.0"
@ -4751,6 +4876,21 @@ dependencies = [
"windows-targets 0.52.0", "windows-targets 0.52.0",
] ]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]] [[package]]
name = "windows-targets" name = "windows-targets"
version = "0.48.5" version = "0.48.5"
@ -4781,6 +4921,12 @@ dependencies = [
"windows_x86_64_msvc 0.52.0", "windows_x86_64_msvc 0.52.0",
] ]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]] [[package]]
name = "windows_aarch64_gnullvm" name = "windows_aarch64_gnullvm"
version = "0.48.5" version = "0.48.5"
@ -4793,6 +4939,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea" checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]] [[package]]
name = "windows_aarch64_msvc" name = "windows_aarch64_msvc"
version = "0.48.5" version = "0.48.5"
@ -4805,6 +4957,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef" checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]] [[package]]
name = "windows_i686_gnu" name = "windows_i686_gnu"
version = "0.48.5" version = "0.48.5"
@ -4817,6 +4975,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313" checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]] [[package]]
name = "windows_i686_msvc" name = "windows_i686_msvc"
version = "0.48.5" version = "0.48.5"
@ -4829,6 +4993,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a" checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]] [[package]]
name = "windows_x86_64_gnu" name = "windows_x86_64_gnu"
version = "0.48.5" version = "0.48.5"
@ -4841,6 +5011,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd" checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]] [[package]]
name = "windows_x86_64_gnullvm" name = "windows_x86_64_gnullvm"
version = "0.48.5" version = "0.48.5"
@ -4853,6 +5029,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e" checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]] [[package]]
name = "windows_x86_64_msvc" name = "windows_x86_64_msvc"
version = "0.48.5" version = "0.48.5"
@ -4896,6 +5078,21 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61" checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61"
[[package]]
name = "yaque"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487f1a92dacd945cc5bc78a8193cc00b9a2cce3c07746ca51533f513843f40d2"
dependencies = [
"futures",
"lazy_static",
"log",
"notify",
"rand",
"semver",
"sysinfo",
]
[[package]] [[package]]
name = "zerocopy" name = "zerocopy"
version = "0.7.32" version = "0.7.32"

View file

@ -2,6 +2,7 @@
resolver = "2" resolver = "2"
members = [ members = [
"src/db", "src/db",
"src/todo",
"src/util", "src/util",
"src/net", "src/net",
"src/rpc", "src/rpc",
@ -24,6 +25,7 @@ format_table = { version = "0.1.1", path = "src/format-table" }
garage_api = { version = "1.0.1", path = "src/api" } garage_api = { version = "1.0.1", path = "src/api" }
garage_block = { version = "1.0.1", path = "src/block" } garage_block = { version = "1.0.1", path = "src/block" }
garage_db = { version = "1.0.1", path = "src/db", default-features = false } garage_db = { version = "1.0.1", path = "src/db", default-features = false }
garage_todo = { version = "1.0.1", path = "src/todo", default-features = false }
garage_model = { version = "1.0.1", path = "src/model", default-features = false } garage_model = { version = "1.0.1", path = "src/model", default-features = false }
garage_net = { version = "1.0.1", path = "src/net" } garage_net = { version = "1.0.1", path = "src/net" }
garage_rpc = { version = "1.0.1", path = "src/rpc" } garage_rpc = { version = "1.0.1", path = "src/rpc" }
@ -86,6 +88,8 @@ rusqlite = "0.31.0"
r2d2 = "0.8" r2d2 = "0.8"
r2d2_sqlite = "0.24" r2d2_sqlite = "0.24"
yaque = "0.6"
async-compression = { version = "0.4", features = ["tokio", "zstd"] } async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false } zstd = { version = "0.13", default-features = false }

View file

@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies] [dependencies]
format_table.workspace = true format_table.workspace = true
garage_db.workspace = true garage_db.workspace = true
garage_todo.workspace = true
garage_api.workspace = true garage_api.workspace = true
garage_block.workspace = true garage_block.workspace = true
garage_model.workspace = true garage_model.workspace = true
@ -82,13 +83,14 @@ k2v-client.workspace = true
[features] [features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ] default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v", "yaque" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ] k2v = [ "garage_util/k2v", "garage_api/k2v" ]
# Database engines # Database engines
lmdb = [ "garage_model/lmdb" ] lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ] sqlite = [ "garage_model/sqlite" ]
yaque = [ "garage_todo/yaque" ]
# Automatic registration and discovery via Consul API # Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ] consul-discovery = [ "garage_rpc/consul-discovery" ]

View file

@ -197,7 +197,7 @@ impl AdminRpcHandler {
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap(); writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather table statistics // Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()]; let mut table = vec![" Table\tItems\tMklItems".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table)?); table.push(self.gather_table_stats(&self.garage.bucket_table)?);
table.push(self.gather_table_stats(&self.garage.key_table)?); table.push(self.gather_table_stats(&self.garage.key_table)?);
table.push(self.gather_table_stats(&self.garage.object_table)?); table.push(self.gather_table_stats(&self.garage.object_table)?);
@ -343,12 +343,10 @@ impl AdminRpcHandler {
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string(); let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
Ok(format!( Ok(format!(
" {}\t{}\t{}\t{}\t{}", " {}\t{}\t{}",
F::TABLE_NAME, F::TABLE_NAME,
data_len, data_len,
mkl_len, mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
)) ))
} }

View file

@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies] [dependencies]
garage_db.workspace = true garage_db.workspace = true
garage_todo.workspace = true
garage_rpc.workspace = true garage_rpc.workspace = true
garage_table.workspace = true garage_table.workspace = true
garage_block.workspace = true garage_block.workspace = true

View file

@ -4,6 +4,7 @@ use std::sync::Arc;
use garage_net::NetworkKey; use garage_net::NetworkKey;
use garage_db as db; use garage_db as db;
use garage_todo as todo;
use garage_util::background::*; use garage_util::background::*;
use garage_util::config::*; use garage_util::config::*;
@ -45,6 +46,8 @@ pub struct Garage {
/// The local database /// The local database
pub db: db::Db, pub db: db::Db,
/// The todo queues
pub todo: todo::Todo,
/// The membership manager /// The membership manager
pub system: Arc<System>, pub system: Arc<System>,
/// The block manager /// The block manager
@ -135,6 +138,18 @@ impl Garage {
let db = db::open_db(&db_path, db_engine, &db_opt) let db = db::open_db(&db_path, db_engine, &db_opt)
.ok_or_message("Unable to open metadata db")?; .ok_or_message("Unable to open metadata db")?;
info!("Initializing queues...");
let todo_engine = todo::Engine::from_str(&config.todo_engine)
.ok_or_message("Invalid `todo_engine` value in configuration file")?;
let mut todo_path = config.metadata_dir.clone();
match todo_engine {
todo::Engine::Yaque => {
todo_path.push("todo.yaque")
}
}
let todo = todo::open_todo(&todo_path, todo_engine)
.ok_or_message("Unable to start todo engine")?;
info!("Initializing RPC..."); info!("Initializing RPC...");
let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message( let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message(
"rpc_secret value is missing, not present in config file or in environment", "rpc_secret value is missing, not present in config file or in environment",
@ -175,7 +190,7 @@ impl Garage {
// ---- admin tables ---- // ---- admin tables ----
info!("Initialize bucket_table..."); info!("Initialize bucket_table...");
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db); let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db, &todo);
info!("Initialize bucket_alias_table..."); info!("Initialize bucket_alias_table...");
let bucket_alias_table = Table::new( let bucket_alias_table = Table::new(
@ -183,9 +198,10 @@ impl Garage {
control_rep_param.clone(), control_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&todo,
); );
info!("Initialize key_table_table..."); info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db); let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db, &todo);
// ---- S3 tables ---- // ---- S3 tables ----
info!("Initialize block_ref_table..."); info!("Initialize block_ref_table...");
@ -196,6 +212,7 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&todo,
); );
info!("Initialize version_table..."); info!("Initialize version_table...");
@ -206,10 +223,11 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&todo,
); );
info!("Initialize multipart upload counter table..."); info!("Initialize multipart upload counter table...");
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo);
info!("Initialize multipart upload table..."); info!("Initialize multipart upload table...");
let mpu_table = Table::new( let mpu_table = Table::new(
@ -220,10 +238,11 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&todo,
); );
info!("Initialize object counter table..."); info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db); let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo);
info!("Initialize object_table..."); info!("Initialize object_table...");
#[allow(clippy::redundant_clone)] #[allow(clippy::redundant_clone)]
@ -236,6 +255,7 @@ impl Garage {
meta_rep_param.clone(), meta_rep_param.clone(),
system.clone(), system.clone(),
&db, &db,
&todo,
); );
info!("Load lifecycle worker state..."); info!("Load lifecycle worker state...");
@ -245,7 +265,7 @@ impl Garage {
// ---- K2V ---- // ---- K2V ----
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param); let k2v = GarageK2V::new(system.clone(), &db, &todo, meta_rep_param);
// ---- setup block refcount recalculation ---- // ---- setup block refcount recalculation ----
// this function can be used to fix inconsistencies in the RC table // this function can be used to fix inconsistencies in the RC table
@ -261,6 +281,7 @@ impl Garage {
bg_vars, bg_vars,
replication_factor, replication_factor,
db, db,
todo,
system, system,
block_manager, block_manager,
bucket_table, bucket_table,
@ -335,9 +356,9 @@ impl Garage {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
impl GarageK2V { impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self { fn new(system: Arc<System>, db: &db::Db, todo: &todo::Todo, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table..."); info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db); let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, todo);
info!("Initialize K2V subscription manager..."); info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new()); let subscriptions = Arc::new(SubscriptionManager::new());
@ -351,6 +372,7 @@ impl GarageK2V {
meta_rep_param, meta_rep_param,
system.clone(), system.clone(),
db, db,
todo,
); );
info!("Initialize K2V RPC handler..."); info!("Initialize K2V RPC handler...");

View file

@ -6,6 +6,7 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use garage_db as db; use garage_db as db;
use garage_todo as todo;
use garage_rpc::layout::LayoutHelper; use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System; use garage_rpc::system::System;
@ -173,6 +174,7 @@ impl<T: CountedItem> IndexCounter<T> {
system: Arc<System>, system: Arc<System>,
replication: TableShardedReplication, replication: TableShardedReplication,
db: &db::Db, db: &db::Db,
todo: &todo::Todo,
) -> Arc<Self> { ) -> Arc<Self> {
Arc::new(Self { Arc::new(Self {
this_node: system.id, this_node: system.id,
@ -186,6 +188,7 @@ impl<T: CountedItem> IndexCounter<T> {
replication, replication,
system, system,
db, db,
todo,
), ),
}) })
} }

View file

@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies] [dependencies]
garage_db.workspace = true garage_db.workspace = true
garage_todo.workspace = true
garage_rpc.workspace = true garage_rpc.workspace = true
garage_util.workspace = true garage_util.workspace = true

View file

@ -6,6 +6,7 @@ use serde_bytes::ByteBuf;
use tokio::sync::Notify; use tokio::sync::Notify;
use garage_db as db; use garage_db as db;
use garage_todo as todo;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::*; use garage_util::error::*;
@ -29,19 +30,19 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub store: db::Tree, pub store: db::Tree,
pub(crate) merkle_tree: db::Tree, pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree, pub(crate) merkle_todo: todo::Queue,
pub(crate) merkle_todo_notify: Notify, pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree, pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Arc<Notify>, pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: db::Tree, pub(crate) gc_todo: todo::Queue,
pub(crate) metrics: TableMetrics, pub(crate) metrics: TableMetrics,
} }
impl<F: TableSchema, R: TableReplication> TableData<F, R> { impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> { pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db, todo: &todo::Todo) -> Arc<Self> {
let store = db let store = db
.open_tree(format!("{}:table", F::TABLE_NAME)) .open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree"); .expect("Unable to open DB tree");
@ -49,24 +50,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let merkle_tree = db let merkle_tree = db
.open_tree(format!("{}:merkle_tree", F::TABLE_NAME)) .open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree"); .expect("Unable to open DB Merkle tree tree");
let merkle_todo = db let merkle_todo = todo
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME)) .open_queue(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree"); .expect("Unable to open Merkle TODO queue");
let insert_queue = db let insert_queue = db
.open_tree(format!("{}:insert_queue", F::TABLE_NAME)) .open_tree(format!("{}:insert_queue", F::TABLE_NAME))
.expect("Unable to open insert queue DB tree"); .expect("Unable to open insert queue DB tree");
let gc_todo = db let gc_todo = todo
.open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME)) .open_queue(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree"); .expect("Unable to open GC TODO queue");
let metrics = TableMetrics::new( let metrics = TableMetrics::new(
F::TABLE_NAME, F::TABLE_NAME,
store.clone(), store.clone(),
merkle_tree.clone(), merkle_tree.clone(),
merkle_todo.clone(),
gc_todo.clone(),
); );
Arc::new(Self { Arc::new(Self {
@ -227,22 +226,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
if changed { if changed {
let new_bytes_hash = blake2sum(&new_bytes); let new_bytes_hash = blake2sum(&new_bytes);
tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
tx.insert(&self.store, &tree_key, new_bytes)?; tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance self.instance
.updated(tx, old_entry.as_ref(), Some(&new_entry))?; .updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash))) Ok(Some((tree_key.clone(), new_entry, new_bytes_hash)))
} else { } else {
Ok(None) Ok(None)
} }
})?; })?;
if let Some((new_entry, new_bytes_hash)) = changed { if let Some((tree_key, new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1); self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone(); let is_tombstone = new_entry.is_tombstone();
self.merkle_todo.submit(&tree_key, new_bytes_hash.as_slice())?;
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
if is_tombstone { if is_tombstone {
// We are only responsible for GC'ing this item if we are the // We are only responsible for GC'ing this item if we are the
@ -272,21 +271,21 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.transaction(|tx| match tx.get(&self.store, k)? { .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => { Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?; let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?; tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(tx, Some(&old_entry), None)?; self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true) Ok(Some(k))
} }
_ => Ok(false), _ => Ok(None),
})?; })?;
if removed { if let Some(k) = removed {
self.metrics.internal_delete_counter.add(1); self.metrics.internal_delete_counter.add(1);
self.merkle_todo.submit(k, &[])?;
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
} Ok(true)
Ok(removed) } else {
Ok(false)
}
} }
pub(crate) fn delete_if_equal_hash( pub(crate) fn delete_if_equal_hash(
@ -300,21 +299,21 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.transaction(|tx| match tx.get(&self.store, k)? { .transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => { Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?; let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?; tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(tx, Some(&old_entry), None)?; self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true) Ok(Some(k))
} }
_ => Ok(false), _ => Ok(None),
})?; })?;
if removed { if let Some(k) = removed {
self.metrics.internal_delete_counter.add(1); self.metrics.internal_delete_counter.add(1);
self.merkle_todo.submit(k, &[])?;
self.merkle_todo_notify.notify_one(); self.merkle_todo_notify.notify_one();
} Ok(true)
Ok(removed) } else {
Ok(false)
}
} }
// ---- Insert queue functions ---- // ---- Insert queue functions ----
@ -366,8 +365,4 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
} }
} }
} }
pub fn gc_todo_len(&self) -> Result<usize, Error> {
Ok(self.gc_todo.len()?)
}
} }

View file

@ -10,7 +10,7 @@ use serde_bytes::ByteBuf;
use futures::future::join_all; use futures::future::join_all;
use tokio::sync::watch; use tokio::sync::watch;
use garage_db as db; use garage_todo as todo;
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
@ -76,8 +76,9 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// These entries are put there when a tombstone is inserted in the table // These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs) // (see update_entry in data.rs)
let mut candidates = vec![]; let mut candidates = vec![];
for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?; while let Some(entry_kv) = self.data.gc_todo.reserve()? {
let (k, vhash) = entry_kv;
let todo_entry = GcTodoEntry::parse(&k, &vhash); let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now { if todo_entry.deletion_time() > now {
@ -123,13 +124,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
} }
} }
// Remove from gc_todo entries for tombstones where we have
// detected that the current value has changed and
// is no longer a tombstone.
for entry in excluded {
entry.remove_if_equal(&self.data.gc_todo)?;
}
// Remaining in `entries` is the list of entries we want to GC, // Remaining in `entries` is the list of entries we want to GC,
// and for which they are still currently tombstones in the table. // and for which they are still currently tombstones in the table.
@ -168,16 +162,22 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let resps = join_all( let resps = join_all(
partitions partitions
.into_iter() .into_iter()
.map(|(nodes, items)| self.try_send_and_delete(nodes, items)), .map(|(nodes, items)| async {
let r = self.try_send_and_delete(nodes, &items).await;
(items, r)
}),
) )
.await; .await;
// Collect errors and return a single error value even if several // Collect errors and return a single error value even if several
// errors occurred. // errors occurred. Push failed items back into the GC queue.
let mut errs = vec![]; let mut errs = vec![];
for resp in resps { for (items, resp) in resps {
if let Err(e) = resp { if let Err(e) = resp {
errs.push(e); errs.push(e);
for item in items {
item.save(&self.data.gc_todo)?;
}
} }
} }
@ -197,7 +197,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
async fn try_send_and_delete( async fn try_send_and_delete(
&self, &self,
nodes: Vec<Uuid>, nodes: Vec<Uuid>,
mut items: Vec<GcTodoEntry>, items: &Vec<GcTodoEntry>,
) -> Result<(), Error> { ) -> Result<(), Error> {
let n_items = items.len(); let n_items = items.len();
@ -216,8 +216,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// and in deletes the list of keys and hashes of value for step 2. // and in deletes the list of keys and hashes of value for step 2.
let mut updates = vec![]; let mut updates = vec![];
let mut deletes = vec![]; let mut deletes = vec![];
for item in items.iter_mut() { for item in items {
updates.push(ByteBuf::from(item.value.take().unwrap())); updates.push(ByteBuf::from(item.value.clone().unwrap()));
deletes.push((ByteBuf::from(item.key.clone()), item.value_hash)); deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
} }
@ -264,8 +264,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
self.data self.data
.delete_if_equal_hash(&item.key[..], item.value_hash) .delete_if_equal_hash(&item.key[..], item.value_hash)
.err_context("GC: local delete tombstones")?; .err_context("GC: local delete tombstones")?;
item.remove_if_equal(&self.data.gc_todo)
.err_context("GC: remove from todo list after successfull GC")?;
} }
Ok(()) Ok(())
@ -313,7 +311,7 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
WorkerStatus { WorkerStatus {
queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64), queue_length: Some(0 as u64),
..Default::default() ..Default::default()
} }
} }
@ -376,24 +374,8 @@ impl GcTodoEntry {
} }
/// Saves the GcTodoEntry in the gc_todo tree /// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> { pub(crate) fn save(&self, gc_todo_tree: &todo::Queue) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?; gc_todo_tree.submit(&self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
/// Removes the GcTodoEntry from the gc_todo tree if the
/// hash of the serialized value is the same here as in the tree.
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.db().transaction(|txn| {
let key = self.todo_table_key();
if txn.get(gc_todo_tree, &key)?.as_deref() == Some(self.value_hash.as_slice()) {
txn.remove(gc_todo_tree, &key)?;
}
Ok(())
})?;
Ok(()) Ok(())
} }

View file

@ -81,7 +81,7 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
} }
fn updater_loop_iter(&self) -> Result<WorkerState, Error> { fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? { if let Some((key, valhash)) = self.data.merkle_todo.reserve()? {
self.update_item(&key, &valhash)?; self.update_item(&key, &valhash)?;
Ok(WorkerState::Busy) Ok(WorkerState::Busy)
} else { } else {
@ -110,21 +110,6 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
.db() .db()
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?; .transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove {
tx.remove(&self.data.merkle_todo, k)?;
}
Ok(remove)
})?;
if !deleted {
debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}",
F::TABLE_NAME,
k
);
}
Ok(()) Ok(())
} }
@ -290,10 +275,6 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub fn merkle_tree_len(&self) -> Result<usize, Error> { pub fn merkle_tree_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_tree.len()?) Ok(self.data.merkle_tree.len()?)
} }
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
}
} }
struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>); struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
@ -306,7 +287,6 @@ impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn status(&self) -> WorkerStatus { fn status(&self) -> WorkerStatus {
WorkerStatus { WorkerStatus {
queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
..Default::default() ..Default::default()
} }
} }

View file

@ -6,8 +6,6 @@ use garage_db as db;
pub struct TableMetrics { pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>, pub(crate) _table_size: ValueObserver<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>, pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>,
pub(crate) get_request_counter: BoundCounter<u64>, pub(crate) get_request_counter: BoundCounter<u64>,
pub(crate) get_request_duration: BoundValueRecorder<f64>, pub(crate) get_request_duration: BoundValueRecorder<f64>,
@ -25,8 +23,6 @@ impl TableMetrics {
table_name: &'static str, table_name: &'static str,
store: db::Tree, store: db::Tree,
merkle_tree: db::Tree, merkle_tree: db::Tree,
merkle_todo: db::Tree,
gc_todo: db::Tree,
) -> Self { ) -> Self {
let meter = global::meter(table_name); let meter = global::meter(table_name);
TableMetrics { TableMetrics {
@ -58,34 +54,6 @@ impl TableMetrics {
) )
.with_description("Number of nodes in table's Merkle tree") .with_description("Number of nodes in table's Merkle tree")
.init(), .init(),
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
if let Ok(v) = merkle_todo.len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Merkle tree updater TODO queue length")
.init(),
_gc_todo_len: meter
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
if let Ok(value) = gc_todo.len() {
observer.observe(
value as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Table garbage collector TODO queue length")
.init(),
get_request_counter: meter get_request_counter: meter
.u64_counter("table.get_request_counter") .u64_counter("table.get_request_counter")

View file

@ -13,6 +13,7 @@ use opentelemetry::{
}; };
use garage_db as db; use garage_db as db;
use garage_todo as todo;
use garage_util::background::BackgroundRunner; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
@ -69,12 +70,12 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
impl<F: TableSchema, R: TableReplication> Table<F, R> { impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) =============== // =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> { pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db, todo: &todo::Todo) -> Arc<Self> {
let endpoint = system let endpoint = system
.netapp .netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME)); .endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
let data = TableData::new(system.clone(), instance, replication, db); let data = TableData::new(system.clone(), instance, replication, db, todo);
let merkle_updater = MerkleUpdater::new(data.clone()); let merkle_updater = MerkleUpdater::new(data.clone());

22
src/todo/Cargo.toml Normal file
View file

@ -0,0 +1,22 @@
[package]
name = "garage_todo"
version = "1.0.1"
authors = ["Julien Kritter <julien.kritter@withings.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Abstraction over multiple queue implementations"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
[lib]
path = "lib.rs"
[dependencies]
err-derive.workspace = true
tracing.workspace = true
yaque = { workspace = true, optional = true }
[features]
default = [ "yaque" ]
yaque = [ "dep:yaque" ]

56
src/todo/lib.rs Normal file
View file

@ -0,0 +1,56 @@
#[macro_use]
extern crate tracing;
#[cfg(feature = "yaque")]
pub mod yaque_adapter;
#[cfg(test)]
pub mod test;
pub mod open;
pub use open::*;
use std::sync::Arc;
use std::borrow::Cow;
use err_derive::Error;
#[derive(Clone)]
pub struct Todo(pub(crate) Arc<dyn ITodo>);
#[derive(Clone)]
pub struct Queue(pub(crate) Arc<dyn ITodo>, usize);
#[derive(Debug, Error)]
#[error(display = "{}", _0)]
pub struct Error(pub Cow<'static, str>);
pub type Result<T> = std::result::Result<T, Error>;
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
Error(format!("IO: {}", e).into())
}
}
pub(crate) trait ITodo: Send + Sync {
fn open_queue(&self, name: &str) -> Result<usize>;
fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()>;
fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>>;
}
impl Todo {
pub fn open_queue<S: AsRef<str>>(&self, name: S) -> Result<Queue> {
let queue_id = self.0.open_queue(name.as_ref())?;
Ok(Queue(self.0.clone(), queue_id))
}
}
impl Queue {
pub fn submit(&self, k: &[u8], v: &[u8]) -> Result<()> {
self.0.submit(self.1, k, v)
}
pub fn reserve(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
self.0.reserve(self.1)
}
}

66
src/todo/open.rs Normal file
View file

@ -0,0 +1,66 @@
use std::path::PathBuf;
use crate::{Todo, Result, Error};
/// List of supported todo engine types
///
/// The `enum` holds list of *all* todo engines that are are be supported by crate, no matter
/// if relevant feature is enabled or not. It allows us to distinguish between invalid engine
/// and valid engine, whose support is not enabled via feature flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Engine {
Yaque,
}
impl Engine {
/// Return variant name as static `&str`
pub fn as_str(&self) -> &'static str {
match self {
Self::Yaque => "yaque",
}
}
}
impl std::fmt::Display for Engine {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
self.as_str().fmt(fmt)
}
}
impl std::str::FromStr for Engine {
type Err = Error;
fn from_str(text: &str) -> Result<Engine> {
match text {
#[cfg(feature = "yaque")]
"yaque" => Ok(Self::Yaque),
kind => Err(Error(
format!(
"Invalid todo engine: {} (options are: yaque)",
kind
)
.into(),
)),
}
}
}
pub fn open_todo(path: &PathBuf, engine: Engine) -> Result<Todo> {
match engine {
// ---- Yaque ----
#[cfg(feature = "yaque")]
Engine::Yaque => {
info!("Opening Yaque todo at: {}", path.display());
let engine = crate::yaque_adapter::Yaque::init(path);
Ok(engine)
}
// Pattern is unreachable when all supported todo engines are compiled into binary. The allow
// attribute is added so that we won't have to change this match in case stop building
// support for one or more engines by default.
#[allow(unreachable_patterns)]
engine => Err(Error(
format!("Todo engine support not available in this build: {}", engine).into(),
)),
}
}

1
src/todo/test.rs Normal file
View file

@ -0,0 +1 @@
use crate::*;

85
src/todo/yaque_adapter.rs Normal file
View file

@ -0,0 +1,85 @@
use crate::*;
use std::path::PathBuf;
use std::convert::TryInto;
use std::sync::RwLock;
use yaque::{channel, Receiver, Sender, TrySendError, TryRecvError};
pub struct Yaque {
base_path: PathBuf,
// the outer lock prevents opening 2 queues at once
// the inner locks allow reserving on multiple queues at once
queues: RwLock<Vec<RwLock<(Sender, Receiver)>>>,
}
impl From<TrySendError<Vec<u8>>> for Error {
fn from(e: TrySendError<Vec<u8>>) -> Error {
Error(format!("Yaque: {}", e).into())
}
}
impl From<TryRecvError> for Error {
fn from(e: TryRecvError) -> Error {
let message = match e {
TryRecvError::Io(ee) => ee.to_string(),
TryRecvError::QueueEmpty => "empty queue".into(),
};
Error(format!("Yaque: {}", message).into())
}
}
impl Yaque {
pub fn new(base_path: &PathBuf) -> Self {
Self {
base_path: base_path.clone(),
queues: RwLock::new(vec!()),
}
}
pub fn init(base_path: &PathBuf) -> Todo {
Todo(Arc::new(Self::new(base_path)))
}
}
impl ITodo for Yaque {
fn open_queue(&self, name: &str) -> Result<usize> {
let mut path = self.base_path.clone();
path.push(name);
let mut queues = self.queues.write().unwrap();
queues.push(RwLock::new(channel(path)?));
Ok(queues.len() - 1)
}
fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()> {
let queues = self.queues.read().unwrap();
let mut queue = queues[queue_id].write().unwrap();
let job = pack_kv(k, v);
queue.0.try_send(job)?;
Ok(())
}
fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let queues = self.queues.read().unwrap();
let mut queue = queues[queue_id].write().unwrap();
let job = queue.1.try_recv();
match job {
Ok(j) => Ok(Some(unpack_kv(&j))),
Err(TryRecvError::Io(e)) => Err(e.into()),
Err(TryRecvError::QueueEmpty) => Ok(None),
}
}
}
fn pack_kv(k: &[u8], v: &[u8]) -> Vec<u8> {
[&k.len().to_ne_bytes(), k, v].concat().to_vec()
}
fn unpack_kv(job: &[u8]) -> (Vec<u8>, Vec<u8>) {
const LEN_SIZE: usize = std::mem::size_of::<usize>();
let key_length_bytes = (&job[..LEN_SIZE]).try_into().unwrap();
let key_length = usize::from_ne_bytes(key_length_bytes);
let k = job[LEN_SIZE..LEN_SIZE+key_length].to_vec();
let v = job[LEN_SIZE+key_length..].to_vec();
(k, v)
}

View file

@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies] [dependencies]
garage_db.workspace = true garage_db.workspace = true
garage_todo.workspace = true
garage_net.workspace = true garage_net.workspace = true
arc-swap.workspace = true arc-swap.workspace = true

View file

@ -111,6 +111,11 @@ pub struct Config {
#[serde(default = "default_db_engine")] #[serde(default = "default_db_engine")]
pub db_engine: String, pub db_engine: String,
// -- TODO
/// queue engine to use for metadata (options: yaque)
#[serde(default = "default_todo_engine")]
pub todo_engine: String,
/// LMDB map size /// LMDB map size
#[serde(deserialize_with = "deserialize_capacity", default)] #[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize, pub lmdb_map_size: usize,
@ -256,6 +261,10 @@ fn default_db_engine() -> String {
"lmdb".into() "lmdb".into()
} }
fn default_todo_engine() -> String {
"yaque".into()
}
fn default_block_size() -> usize { fn default_block_size() -> usize {
1048576 1048576
} }

View file

@ -30,6 +30,9 @@ pub enum Error {
#[error(display = "DB error: {}", _0)] #[error(display = "DB error: {}", _0)]
Db(#[error(source)] garage_db::Error), Db(#[error(source)] garage_db::Error),
#[error(display = "Todo error: {}", _0)]
Todo(#[error(source)] garage_todo::Error),
#[error(display = "Messagepack encode error: {}", _0)] #[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error), RmpEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)] #[error(display = "Messagepack decode error: {}", _0)]