Compare commits

..

1 commit

3b3086d513 Implement a queue crate to replace the todo trees (2024-11-18 15:36:41 +00:00)
Checks failed: ci/woodpecker/pr/debug (Pipeline failed)
27 changed files with 546 additions and 181 deletions

Cargo.lock (generated, 197 lines changed)

@ -1188,6 +1188,18 @@ dependencies = [
"subtle",
]
[[package]]
name = "filetime"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.4.1",
"windows-sys 0.52.0",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@ -1213,6 +1225,15 @@ dependencies = [
name = "format_table"
version = "0.1.1"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.30"
@ -1325,6 +1346,7 @@ dependencies = [
"garage_net",
"garage_rpc",
"garage_table",
"garage_todo",
"garage_util",
"garage_web",
"git-version",
@ -1471,6 +1493,7 @@ dependencies = [
"garage_net",
"garage_rpc",
"garage_table",
"garage_todo",
"garage_util",
"hex",
"http 1.0.0",
@ -1557,6 +1580,7 @@ dependencies = [
"futures-util",
"garage_db",
"garage_rpc",
"garage_todo",
"garage_util",
"hex",
"hexdump",
@ -1568,6 +1592,15 @@ dependencies = [
"tracing",
]
[[package]]
name = "garage_todo"
version = "1.0.1"
dependencies = [
"err-derive",
"tracing",
"yaque",
]
[[package]]
name = "garage_util"
version = "1.0.1"
@ -1583,6 +1616,7 @@ dependencies = [
"futures",
"garage_db",
"garage_net",
"garage_todo",
"hex",
"hexdump",
"http 1.0.0",
@ -2112,6 +2146,26 @@ dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "inout"
version = "0.1.3"
@ -2268,6 +2322,26 @@ dependencies = [
"serde_json",
]
[[package]]
name = "kqueue"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "kube"
version = "0.88.1"
@ -2519,6 +2593,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.48.0",
]
@ -2583,6 +2658,33 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
dependencies = [
"bitflags 1.3.2",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"windows-sys 0.45.0",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@ -3987,6 +4089,20 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "sysinfo"
version = "0.28.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c2f3ca6693feb29a89724516f016488e9aafc7f37264f898593ee4b942f31b"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"winapi",
]
[[package]]
name = "syslog-tracing"
version = "0.3.0"
@ -4733,6 +4849,15 @@ dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
@ -4751,6 +4876,21 @@ dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
@ -4781,6 +4921,12 @@ dependencies = [
"windows_x86_64_msvc 0.52.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
@ -4793,6 +4939,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
@ -4805,6 +4957,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
@ -4817,6 +4975,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
@ -4829,6 +4993,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
@ -4841,6 +5011,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
@ -4853,6 +5029,12 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
@ -4896,6 +5078,21 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61"
[[package]]
name = "yaque"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487f1a92dacd945cc5bc78a8193cc00b9a2cce3c07746ca51533f513843f40d2"
dependencies = [
"futures",
"lazy_static",
"log",
"notify",
"rand",
"semver",
"sysinfo",
]
[[package]]
name = "zerocopy"
version = "0.7.32"

View file

@ -2,6 +2,7 @@
resolver = "2"
members = [
"src/db",
"src/todo",
"src/util",
"src/net",
"src/rpc",
@ -24,6 +25,7 @@ format_table = { version = "0.1.1", path = "src/format-table" }
garage_api = { version = "1.0.1", path = "src/api" }
garage_block = { version = "1.0.1", path = "src/block" }
garage_db = { version = "1.0.1", path = "src/db", default-features = false }
garage_todo = { version = "1.0.1", path = "src/todo", default-features = false }
garage_model = { version = "1.0.1", path = "src/model", default-features = false }
garage_net = { version = "1.0.1", path = "src/net" }
garage_rpc = { version = "1.0.1", path = "src/rpc" }
@ -86,6 +88,8 @@ rusqlite = "0.31.0"
r2d2 = "0.8"
r2d2_sqlite = "0.24"
yaque = "0.6"
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false }

View file

@ -23,7 +23,7 @@ client = minio.Minio(
"GKyourapikey",
"abcd[...]1234",
# Force the region, this is specific to garage
region="garage",
region="region",
)
```

View file

@ -335,7 +335,6 @@ From the [official Mastodon documentation](https://docs.joinmastodon.org/admin/t
```bash
$ RAILS_ENV=production bin/tootctl media remove --days 3
$ RAILS_ENV=production bin/tootctl media remove --days 15 --prune-profiles
$ RAILS_ENV=production bin/tootctl media remove-orphans
$ RAILS_ENV=production bin/tootctl preview_cards remove --days 15
```
@ -354,6 +353,8 @@ Imports: 1.7 KB
Settings: 0 Bytes
```
Unfortunately, [old avatars and headers cannot currently be cleaned up](https://github.com/mastodon/mastodon/issues/9567).
### Migrating your data
Data migration should be done with an efficient S3 client.

View file

@ -16,7 +16,6 @@ data_dir = "/var/lib/garage/data"
metadata_fsync = true
data_fsync = false
disable_scrub = false
use_local_tz = false
metadata_auto_snapshot_interval = "6h"
db_engine = "lmdb"
@ -100,7 +99,6 @@ Top-level configuration options:
[`data_fsync`](#data_fsync),
[`db_engine`](#db_engine),
[`disable_scrub`](#disable_scrub),
[`use_local_tz`](#use_local_tz),
[`lmdb_map_size`](#lmdb_map_size),
[`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval),
[`metadata_dir`](#metadata_dir),
@ -429,13 +427,6 @@ you should delete it from the data directory and then call `garage repair
blocks` on the node to ensure that it re-obtains a copy from another node on
the network.
#### `use_local_tz` {#use_local_tz}
By default, Garage runs the lifecycle worker every day at midnight in UTC. Set the
`use_local_tz` configuration value to `true` if you want Garage to run the
lifecycle worker at midnight in your local timezone. If you have multiple nodes,
you should also ensure that each node has the same timezone configuration.
#### `block_size` {#block_size}
Garage splits stored objects in consecutive chunks of size `block_size`

View file

@ -76,9 +76,6 @@ spec:
- name: etc
mountPath: /etc/garage.toml
subPath: garage.toml
{{- with .Values.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
# TODO
# livenessProbe:
# httpGet:
@ -113,9 +110,6 @@ spec:
- name: data
emptyDir: {}
{{- end }}
{{- with .Values.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View file

@ -218,10 +218,6 @@ affinity: {}
environment: {}
extraVolumes: {}
extraVolumeMounts: {}
monitoring:
metrics:
# If true, a service for monitoring is created with a prometheus.io/scrape annotation

View file

@ -23,6 +23,7 @@ path = "tests/lib.rs"
[dependencies]
format_table.workspace = true
garage_db.workspace = true
garage_todo.workspace = true
garage_api.workspace = true
garage_block.workspace = true
garage_model.workspace = true
@ -82,13 +83,14 @@ k2v-client.workspace = true
[features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v", "yaque" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
yaque = [ "garage_todo/yaque" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]

View file

@ -197,7 +197,7 @@ impl AdminRpcHandler {
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
let mut table = vec![" Table\tItems\tMklItems".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table)?);
table.push(self.gather_table_stats(&self.garage.key_table)?);
table.push(self.gather_table_stats(&self.garage.object_table)?);
@ -343,12 +343,10 @@ impl AdminRpcHandler {
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
Ok(format!(
" {}\t{}\t{}\t{}\t{}",
" {}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
))
}

View file

@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
garage_todo.workspace = true
garage_rpc.workspace = true
garage_table.workspace = true
garage_block.workspace = true

View file

@ -4,6 +4,7 @@ use std::sync::Arc;
use garage_net::NetworkKey;
use garage_db as db;
use garage_todo as todo;
use garage_util::background::*;
use garage_util::config::*;
@ -45,6 +46,8 @@ pub struct Garage {
/// The local database
pub db: db::Db,
/// The todo queues
pub todo: todo::Todo,
/// The membership manager
pub system: Arc<System>,
/// The block manager
@ -135,6 +138,18 @@ impl Garage {
let db = db::open_db(&db_path, db_engine, &db_opt)
.ok_or_message("Unable to open metadata db")?;
info!("Initializing queues...");
let todo_engine = todo::Engine::from_str(&config.todo_engine)
.ok_or_message("Invalid `todo_engine` value in configuration file")?;
let mut todo_path = config.metadata_dir.clone();
match todo_engine {
todo::Engine::Yaque => {
todo_path.push("todo.yaque")
}
}
let todo = todo::open_todo(&todo_path, todo_engine)
.ok_or_message("Unable to start todo engine")?;
info!("Initializing RPC...");
let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message(
"rpc_secret value is missing, not present in config file or in environment",
@ -175,7 +190,7 @@ impl Garage {
// ---- admin tables ----
info!("Initialize bucket_table...");
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db, &todo);
info!("Initialize bucket_alias_table...");
let bucket_alias_table = Table::new(
@ -183,9 +198,10 @@ impl Garage {
control_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db, &todo);
// ---- S3 tables ----
info!("Initialize block_ref_table...");
@ -196,6 +212,7 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize version_table...");
@ -206,10 +223,11 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize multipart upload counter table...");
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo);
info!("Initialize multipart upload table...");
let mpu_table = Table::new(
@ -220,10 +238,11 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo);
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
@ -236,6 +255,7 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Load lifecycle worker state...");
@ -245,7 +265,7 @@ impl Garage {
// ---- K2V ----
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
let k2v = GarageK2V::new(system.clone(), &db, &todo, meta_rep_param);
// ---- setup block refcount recalculation ----
// this function can be used to fix inconsistencies in the RC table
@ -261,6 +281,7 @@ impl Garage {
bg_vars,
replication_factor,
db,
todo,
system,
block_manager,
bucket_table,
@ -335,9 +356,9 @@ impl Garage {
#[cfg(feature = "k2v")]
impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
fn new(system: Arc<System>, db: &db::Db, todo: &todo::Todo, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, todo);
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
@ -351,6 +372,7 @@ impl GarageK2V {
meta_rep_param,
system.clone(),
db,
todo,
);
info!("Initialize K2V RPC handler...");

View file

@ -6,6 +6,7 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_todo as todo;
use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
@ -173,6 +174,7 @@ impl<T: CountedItem> IndexCounter<T> {
system: Arc<System>,
replication: TableShardedReplication,
db: &db::Db,
todo: &todo::Todo,
) -> Arc<Self> {
Arc::new(Self {
this_node: system.id,
@ -186,6 +188,7 @@ impl<T: CountedItem> IndexCounter<T> {
replication,
system,
db,
todo,
),
})
}

View file

@ -70,7 +70,7 @@ pub fn register_bg_vars(
impl LifecycleWorker {
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
let today = today(garage.config.use_local_tz);
let today = today();
let last_completed = persister.get_with(|x| {
x.last_completed
.as_deref()
@ -205,9 +205,8 @@ impl Worker for LifecycleWorker {
async fn wait_for_work(&mut self) -> WorkerState {
match &self.state {
State::Completed(d) => {
let use_local_tz = self.garage.config.use_local_tz;
let next_day = d.succ_opt().expect("no next day");
let next_start = midnight_ts(next_day, use_local_tz);
let next_start = midnight_ts(next_day);
loop {
let now = now_msec();
if now < next_start {
@ -219,7 +218,7 @@ impl Worker for LifecycleWorker {
break;
}
}
self.state = State::start(std::cmp::max(next_day, today(use_local_tz)));
self.state = State::start(std::cmp::max(next_day, today()));
}
State::Running { .. } => (),
}
@ -386,16 +385,10 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter)
true
}
fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
let midnight = date.and_hms_opt(0, 0, 0).expect("midnight does not exist");
if use_local_tz {
return midnight
.and_local_timezone(Local)
.single()
.expect("bad local midnight")
.timestamp_millis() as u64;
}
midnight.timestamp_millis() as u64
fn midnight_ts(date: NaiveDate) -> u64 {
date.and_hms_opt(0, 0, 0)
.expect("midnight does not exist")
.timestamp_millis() as u64
}
fn next_date(ts: u64) -> NaiveDate {
@ -406,9 +399,6 @@ fn next_date(ts: u64) -> NaiveDate {
.expect("no next day")
}
fn today(use_local_tz: bool) -> NaiveDate {
if use_local_tz {
return Local::now().naive_local().date();
}
fn today() -> NaiveDate {
Utc::now().naive_utc().date()
}

View file

@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
garage_todo.workspace = true
garage_rpc.workspace = true
garage_util.workspace = true

View file

@ -6,6 +6,7 @@ use serde_bytes::ByteBuf;
use tokio::sync::Notify;
use garage_db as db;
use garage_todo as todo;
use garage_util::data::*;
use garage_util::error::*;
@ -29,19 +30,19 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub store: db::Tree,
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo: todo::Queue,
pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: db::Tree,
pub(crate) gc_todo: todo::Queue,
pub(crate) metrics: TableMetrics,
}
impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db, todo: &todo::Todo) -> Arc<Self> {
let store = db
.open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@ -49,24 +50,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let merkle_tree = db
.open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree");
let merkle_todo = db
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let merkle_todo = todo
.open_queue(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open Merkle TODO queue");
let insert_queue = db
.open_tree(format!("{}:insert_queue", F::TABLE_NAME))
.expect("Unable to open insert queue DB tree");
let gc_todo = db
.open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree");
let gc_todo = todo
.open_queue(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC TODO queue");
let metrics = TableMetrics::new(
F::TABLE_NAME,
store.clone(),
merkle_tree.clone(),
merkle_todo.clone(),
gc_todo.clone(),
);
Arc::new(Self {
@ -227,22 +226,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
if changed {
let new_bytes_hash = blake2sum(&new_bytes);
tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
.updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((new_entry, new_bytes_hash)))
Ok(Some((tree_key.clone(), new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
if let Some((new_entry, new_bytes_hash)) = changed {
if let Some((tree_key, new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
self.merkle_todo.submit(&tree_key, new_bytes_hash.as_slice())?;
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@ -272,21 +271,21 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
Ok(Some(k))
}
_ => Ok(false),
_ => Ok(None),
})?;
if removed {
if let Some(k) = removed {
self.metrics.internal_delete_counter.add(1);
self.merkle_todo.submit(k, &[])?;
self.merkle_todo_notify.notify_one();
}
Ok(removed)
Ok(true)
} else {
Ok(false)
}
}
pub(crate) fn delete_if_equal_hash(
@ -300,21 +299,21 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(tx, Some(&old_entry), None)?;
Ok(true)
Ok(Some(k))
}
_ => Ok(false),
_ => Ok(None),
})?;
if removed {
if let Some(k) = removed {
self.metrics.internal_delete_counter.add(1);
self.merkle_todo.submit(k, &[])?;
self.merkle_todo_notify.notify_one();
}
Ok(removed)
Ok(true)
} else {
Ok(false)
}
}
// ---- Insert queue functions ----
@ -366,8 +365,4 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
}
}
pub fn gc_todo_len(&self) -> Result<usize, Error> {
Ok(self.gc_todo.len()?)
}
}

View file

@ -10,7 +10,7 @@ use serde_bytes::ByteBuf;
use futures::future::join_all;
use tokio::sync::watch;
use garage_db as db;
use garage_todo as todo;
use garage_util::background::*;
use garage_util::data::*;
@ -76,8 +76,9 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
let mut candidates = vec![];
for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
while let Some(entry_kv) = self.data.gc_todo.reserve()? {
let (k, vhash) = entry_kv;
let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
@ -123,13 +124,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
}
}
// Remove from gc_todo entries for tombstones where we have
// detected that the current value has changed and
// is no longer a tombstone.
for entry in excluded {
entry.remove_if_equal(&self.data.gc_todo)?;
}
// Remaining in `entries` is the list of entries we want to GC,
// and for which they are still currently tombstones in the table.
@ -168,16 +162,22 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let resps = join_all(
partitions
.into_iter()
.map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
.map(|(nodes, items)| async {
let r = self.try_send_and_delete(nodes, &items).await;
(items, r)
}),
)
.await;
// Collect errors and return a single error value even if several
// errors occurred.
// errors occurred. Push failed items back into the GC queue.
let mut errs = vec![];
for resp in resps {
for (items, resp) in resps {
if let Err(e) = resp {
errs.push(e);
for item in items {
item.save(&self.data.gc_todo)?;
}
}
}
@ -197,7 +197,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
async fn try_send_and_delete(
&self,
nodes: Vec<Uuid>,
mut items: Vec<GcTodoEntry>,
items: &Vec<GcTodoEntry>,
) -> Result<(), Error> {
let n_items = items.len();
@ -216,8 +216,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// and in deletes the list of keys and hashes of value for step 2.
let mut updates = vec![];
let mut deletes = vec![];
for item in items.iter_mut() {
updates.push(ByteBuf::from(item.value.take().unwrap()));
for item in items {
updates.push(ByteBuf::from(item.value.clone().unwrap()));
deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
}
@ -264,8 +264,6 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
self.data
.delete_if_equal_hash(&item.key[..], item.value_hash)
.err_context("GC: local delete tombstones")?;
item.remove_if_equal(&self.data.gc_todo)
.err_context("GC: remove from todo list after successfull GC")?;
}
Ok(())
@ -313,7 +311,7 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
queue_length: Some(0 as u64),
..Default::default()
}
}
@ -376,24 +374,8 @@ impl GcTodoEntry {
}
/// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
/// Removes the GcTodoEntry from the gc_todo tree if the
/// hash of the serialized value is the same here as in the tree.
/// This is usefull to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e.
/// what we have to do is still the same
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.db().transaction(|txn| {
let key = self.todo_table_key();
if txn.get(gc_todo_tree, &key)?.as_deref() == Some(self.value_hash.as_slice()) {
txn.remove(gc_todo_tree, &key)?;
}
Ok(())
})?;
pub(crate) fn save(&self, gc_todo_tree: &todo::Queue) -> Result<(), Error> {
gc_todo_tree.submit(&self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}

View file

@ -81,7 +81,7 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
if let Some((key, valhash)) = self.data.merkle_todo.reserve()? {
self.update_item(&key, &valhash)?;
Ok(WorkerState::Busy)
} else {
@ -110,21 +110,6 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
.db()
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove {
tx.remove(&self.data.merkle_todo, k)?;
}
Ok(remove)
})?;
if !deleted {
debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}",
F::TABLE_NAME,
k
);
}
Ok(())
}
@ -290,10 +275,6 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub fn merkle_tree_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_tree.len()?)
}
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
}
}
struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
@ -306,7 +287,6 @@ impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
..Default::default()
}
}

View file

@ -6,8 +6,6 @@ use garage_db as db;
pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>,
pub(crate) get_request_counter: BoundCounter<u64>,
pub(crate) get_request_duration: BoundValueRecorder<f64>,
@ -25,8 +23,6 @@ impl TableMetrics {
table_name: &'static str,
store: db::Tree,
merkle_tree: db::Tree,
merkle_todo: db::Tree,
gc_todo: db::Tree,
) -> Self {
let meter = global::meter(table_name);
TableMetrics {
@ -58,34 +54,6 @@ impl TableMetrics {
)
.with_description("Number of nodes in table's Merkle tree")
.init(),
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
if let Ok(v) = merkle_todo.len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Merkle tree updater TODO queue length")
.init(),
_gc_todo_len: meter
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
if let Ok(value) = gc_todo.len() {
observer.observe(
value as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Table garbage collector TODO queue length")
.init(),
get_request_counter: meter
.u64_counter("table.get_request_counter")

View file

@ -13,6 +13,7 @@ use opentelemetry::{
};
use garage_db as db;
use garage_todo as todo;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@ -69,12 +70,12 @@ impl<F: TableSchema> Rpc for TableRpc<F> {
impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db, todo: &todo::Todo) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
let data = TableData::new(system.clone(), instance, replication, db);
let data = TableData::new(system.clone(), instance, replication, db, todo);
let merkle_updater = MerkleUpdater::new(data.clone());

src/todo/Cargo.toml (new file, 22 lines)

@ -0,0 +1,22 @@
[package]
name = "garage_todo"
version = "1.0.1"
authors = ["Julien Kritter <julien.kritter@withings.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Abstraction over multiple queue implementations"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
[lib]
path = "lib.rs"
[dependencies]
err-derive.workspace = true
tracing.workspace = true
yaque = { workspace = true, optional = true }
[features]
default = [ "yaque" ]
yaque = [ "dep:yaque" ]

src/todo/lib.rs (new file, 56 lines)

@ -0,0 +1,56 @@
#[macro_use]
extern crate tracing;
#[cfg(feature = "yaque")]
pub mod yaque_adapter;
#[cfg(test)]
pub mod test;
pub mod open;
pub use open::*;
use std::sync::Arc;
use std::borrow::Cow;
use err_derive::Error;
#[derive(Clone)]
pub struct Todo(pub(crate) Arc<dyn ITodo>);
#[derive(Clone)]
pub struct Queue(pub(crate) Arc<dyn ITodo>, usize);
#[derive(Debug, Error)]
#[error(display = "{}", _0)]
pub struct Error(pub Cow<'static, str>);
pub type Result<T> = std::result::Result<T, Error>;
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
Error(format!("IO: {}", e).into())
}
}
pub(crate) trait ITodo: Send + Sync {
fn open_queue(&self, name: &str) -> Result<usize>;
fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()>;
fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>>;
}
impl Todo {
pub fn open_queue<S: AsRef<str>>(&self, name: S) -> Result<Queue> {
let queue_id = self.0.open_queue(name.as_ref())?;
Ok(Queue(self.0.clone(), queue_id))
}
}
impl Queue {
pub fn submit(&self, k: &[u8], v: &[u8]) -> Result<()> {
self.0.submit(self.1, k, v)
}
pub fn reserve(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
self.0.reserve(self.1)
}
}
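
Since `ITodo` is `pub(crate)`, any alternative backend would have to live inside this crate alongside the yaque adapter. A hypothetical in-memory backend (not part of this commit, shown only to illustrate the trait contract) might look like:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex, RwLock};

use crate::{ITodo, Result, Todo};

/// Hypothetical backend: each queue is an in-memory FIFO of (key, value) pairs.
pub struct MemTodo {
    queues: RwLock<Vec<Mutex<VecDeque<(Vec<u8>, Vec<u8>)>>>>,
}

impl MemTodo {
    pub fn init() -> Todo {
        Todo(Arc::new(Self { queues: RwLock::new(vec![]) }))
    }
}

impl ITodo for MemTodo {
    fn open_queue(&self, _name: &str) -> Result<usize> {
        let mut queues = self.queues.write().unwrap();
        queues.push(Mutex::new(VecDeque::new()));
        Ok(queues.len() - 1)
    }
    fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()> {
        let queues = self.queues.read().unwrap();
        queues[queue_id].lock().unwrap().push_back((k.to_vec(), v.to_vec()));
        Ok(())
    }
    fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
        let queues = self.queues.read().unwrap();
        Ok(queues[queue_id].lock().unwrap().pop_front())
    }
}
```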

src/todo/open.rs (new file, 66 lines)

@ -0,0 +1,66 @@
use std::path::PathBuf;
use crate::{Todo, Result, Error};
/// List of supported todo engine types
///
/// The `enum` holds the list of *all* todo engines that can be supported by the crate, no matter
/// whether the relevant feature is enabled or not. It allows us to distinguish between an invalid
/// engine and a valid engine whose support is not enabled via a feature flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Engine {
Yaque,
}
impl Engine {
/// Return variant name as static `&str`
pub fn as_str(&self) -> &'static str {
match self {
Self::Yaque => "yaque",
}
}
}
impl std::fmt::Display for Engine {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
self.as_str().fmt(fmt)
}
}
impl std::str::FromStr for Engine {
type Err = Error;
fn from_str(text: &str) -> Result<Engine> {
match text {
#[cfg(feature = "yaque")]
"yaque" => Ok(Self::Yaque),
kind => Err(Error(
format!(
"Invalid todo engine: {} (options are: yaque)",
kind
)
.into(),
)),
}
}
}
pub fn open_todo(path: &PathBuf, engine: Engine) -> Result<Todo> {
match engine {
// ---- Yaque ----
#[cfg(feature = "yaque")]
Engine::Yaque => {
info!("Opening Yaque todo at: {}", path.display());
let engine = crate::yaque_adapter::Yaque::init(path);
Ok(engine)
}
// The pattern is unreachable when all supported todo engines are compiled into the binary.
// The allow attribute is added so that we won't have to change this match in case we stop
// building support for one or more engines by default.
#[allow(unreachable_patterns)]
engine => Err(Error(
format!("Todo engine support not available in this build: {}", engine).into(),
)),
}
}
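
A minimal sketch (not part of the diff) of how the new `garage_todo` API fits together, based only on the signatures shown above; the path, queue name, and payloads are illustrative:

```rust
use std::path::PathBuf;
use std::str::FromStr;

use garage_todo::{open_todo, Engine, Result};

fn demo() -> Result<()> {
    // Engine selection, mirroring what src/model/garage.rs does with `config.todo_engine`.
    let engine = Engine::from_str("yaque")?;
    // Illustrative path; Garage itself uses `<metadata_dir>/todo.yaque`.
    let todo = open_todo(&PathBuf::from("/tmp/demo/todo.yaque"), engine)?;

    // One named queue per use case, like `<table>:merkle_todo` and `<table>:gc_todo_v2`.
    let queue = todo.open_queue("example:merkle_todo")?;
    queue.submit(b"tree_key", b"value_hash")?;

    // `reserve` pops the next pending (key, value) pair, or None when the queue is empty.
    while let Some((k, v)) = queue.reserve()? {
        println!("reserved {} key bytes / {} value bytes", k.len(), v.len());
    }
    Ok(())
}
```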

src/todo/test.rs (new file, 1 line)

@ -0,0 +1 @@
use crate::*;

src/todo/yaque_adapter.rs (new file, 85 lines)

@ -0,0 +1,85 @@
use crate::*;
use std::path::PathBuf;
use std::convert::TryInto;
use std::sync::RwLock;
use yaque::{channel, Receiver, Sender, TrySendError, TryRecvError};
pub struct Yaque {
base_path: PathBuf,
// the outer lock prevents opening 2 queues at once
// the inner locks allow reserving on multiple queues at once
queues: RwLock<Vec<RwLock<(Sender, Receiver)>>>,
}
impl From<TrySendError<Vec<u8>>> for Error {
fn from(e: TrySendError<Vec<u8>>) -> Error {
Error(format!("Yaque: {}", e).into())
}
}
impl From<TryRecvError> for Error {
fn from(e: TryRecvError) -> Error {
let message = match e {
TryRecvError::Io(ee) => ee.to_string(),
TryRecvError::QueueEmpty => "empty queue".into(),
};
Error(format!("Yaque: {}", message).into())
}
}
impl Yaque {
pub fn new(base_path: &PathBuf) -> Self {
Self {
base_path: base_path.clone(),
queues: RwLock::new(vec!()),
}
}
pub fn init(base_path: &PathBuf) -> Todo {
Todo(Arc::new(Self::new(base_path)))
}
}
impl ITodo for Yaque {
fn open_queue(&self, name: &str) -> Result<usize> {
let mut path = self.base_path.clone();
path.push(name);
let mut queues = self.queues.write().unwrap();
queues.push(RwLock::new(channel(path)?));
Ok(queues.len() - 1)
}
fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()> {
let queues = self.queues.read().unwrap();
let mut queue = queues[queue_id].write().unwrap();
let job = pack_kv(k, v);
queue.0.try_send(job)?;
Ok(())
}
fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let queues = self.queues.read().unwrap();
let mut queue = queues[queue_id].write().unwrap();
let job = queue.1.try_recv();
match job {
Ok(j) => Ok(Some(unpack_kv(&j))),
Err(TryRecvError::Io(e)) => Err(e.into()),
Err(TryRecvError::QueueEmpty) => Ok(None),
}
}
}
fn pack_kv(k: &[u8], v: &[u8]) -> Vec<u8> {
[&k.len().to_ne_bytes(), k, v].concat().to_vec()
}
fn unpack_kv(job: &[u8]) -> (Vec<u8>, Vec<u8>) {
const LEN_SIZE: usize = std::mem::size_of::<usize>();
let key_length_bytes = (&job[..LEN_SIZE]).try_into().unwrap();
let key_length = usize::from_ne_bytes(key_length_bytes);
let k = job[LEN_SIZE..LEN_SIZE+key_length].to_vec();
let v = job[LEN_SIZE+key_length..].to_vec();
(k, v)
}
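
A small hypothetical unit test (not part of this commit) that would exercise the framing implemented by `pack_kv` / `unpack_kv` above, assuming it sits in the same module:

```rust
#[test]
fn pack_unpack_roundtrip() {
    // Framing: [key length as native-endian usize bytes][key bytes][value bytes]
    let packed = pack_kv(b"abc", b"12345");
    assert_eq!(packed.len(), std::mem::size_of::<usize>() + 3 + 5);
    assert_eq!(unpack_kv(&packed), (b"abc".to_vec(), b"12345".to_vec()));
}
```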

View file

@ -15,6 +15,7 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
garage_todo.workspace = true
garage_net.workspace = true
arc-swap.workspace = true

View file

@ -27,10 +27,6 @@ pub struct Config {
#[serde(default)]
pub disable_scrub: bool,
/// Use local timezone
#[serde(default)]
pub use_local_tz: bool,
/// Automatic snapshot interval for metadata
#[serde(default)]
pub metadata_auto_snapshot_interval: Option<String>,
@ -115,6 +111,11 @@ pub struct Config {
#[serde(default = "default_db_engine")]
pub db_engine: String,
// -- TODO
/// queue engine to use for metadata (options: yaque)
#[serde(default = "default_todo_engine")]
pub todo_engine: String,
/// LMDB map size
#[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize,
@ -260,6 +261,10 @@ fn default_db_engine() -> String {
"lmdb".into()
}
fn default_todo_engine() -> String {
"yaque".into()
}
fn default_block_size() -> usize {
1048576
}

View file

@ -30,6 +30,9 @@ pub enum Error {
#[error(display = "DB error: {}", _0)]
Db(#[error(source)] garage_db::Error),
#[error(display = "Todo error: {}", _0)]
Todo(#[error(source)] garage_todo::Error),
#[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]