Compare commits


9 commits

3661a597fa  Merge pull request 'feat: add use_local_tz configuration' (#908) from ragazenta/garage:feat/local-timezone into main
    Reviewed-on: Deuxfleurs/garage#908
    Reviewed-by: maximilien <me@mricher.fr>
    2024-12-01 13:23:24 +00:00

0fd3c0e794  doc: add use_local_tz configuration
    2024-11-25 10:35:00 +07:00

4c1bf42192  feat: add use_local_tz configuration
    Used in lifecycle_worker to determine midnight time
    2024-11-23 05:51:12 +07:00

906c8708fd  Merge pull request 'add extraVolumes and extraVolumeMounts to helm chart' (#896) from eugene-davis/garage:main into main
    Reviewed-on: Deuxfleurs/garage#896
    Reviewed-by: maximilien <me@mricher.fr>
    2024-11-19 22:23:13 +00:00

747889a096  Merge pull request 'Update Python SDK documentation' (#887) from cryptolukas/garage:fix-python-sdk-docs into main
    Reviewed-on: Deuxfleurs/garage#887
    2024-11-19 09:15:03 +00:00

feb09a4bc6  Merge pull request 'doc: update mastodon media header pruning section' (#888) from teutat3s/garage:doc-update-mastodon-media into main
    Reviewed-on: Deuxfleurs/garage#888
    2024-11-19 09:14:34 +00:00

116ad479a8  add extraVolumes and extraVolumeMounts to helm chart
    2024-10-26 21:14:08 +02:00

b6a58c5c16  doc: update mastodon media header pruning section (author: teutat3s)
    This is now possible since the upstream issue has been resolved.
    https://github.com/mastodon/mastodon/issues/9567
    2024-10-17 20:59:21 +02:00

2b0bfa9b18  the old value do not work out of the box
    2024-10-14 17:20:26 +02:00
27 changed files with 181 additions and 546 deletions

Cargo.lock (generated, 197 changed lines)
View file

@@ -1188,18 +1188,6 @@ dependencies = [
"subtle",
]
[[package]]
name = "filetime"
version = "0.2.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ee447700ac8aa0b2f2bd7bc4462ad686ba06baa6727ac149a2d6277f0d240fd"
dependencies = [
"cfg-if",
"libc",
"redox_syscall 0.4.1",
"windows-sys 0.52.0",
]
[[package]]
name = "fixedbitset"
version = "0.4.2"
@@ -1225,15 +1213,6 @@ dependencies = [
name = "format_table"
version = "0.1.1"
[[package]]
name = "fsevent-sys"
version = "4.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "76ee7a02da4d231650c7cea31349b889be2f45ddb3ef3032d2ec8185f6313fd2"
dependencies = [
"libc",
]
[[package]]
name = "futures"
version = "0.3.30"
@@ -1346,7 +1325,6 @@ dependencies = [
"garage_net",
"garage_rpc",
"garage_table",
"garage_todo",
"garage_util",
"garage_web",
"git-version",
@@ -1493,7 +1471,6 @@ dependencies = [
"garage_net",
"garage_rpc",
"garage_table",
"garage_todo",
"garage_util",
"hex",
"http 1.0.0",
@@ -1580,7 +1557,6 @@ dependencies = [
"futures-util",
"garage_db",
"garage_rpc",
"garage_todo",
"garage_util",
"hex",
"hexdump",
@@ -1592,15 +1568,6 @@ dependencies = [
"tracing",
]
[[package]]
name = "garage_todo"
version = "1.0.1"
dependencies = [
"err-derive",
"tracing",
"yaque",
]
[[package]]
name = "garage_util"
version = "1.0.1"
@@ -1616,7 +1583,6 @@ dependencies = [
"futures",
"garage_db",
"garage_net",
"garage_todo",
"hex",
"hexdump",
"http 1.0.0",
@@ -2146,26 +2112,6 @@ dependencies = [
"hashbrown 0.14.3",
]
[[package]]
name = "inotify"
version = "0.9.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8069d3ec154eb856955c1c0fbffefbf5f3c40a104ec912d4797314c1801abff"
dependencies = [
"bitflags 1.3.2",
"inotify-sys",
"libc",
]
[[package]]
name = "inotify-sys"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e05c02b5e89bff3b946cedeca278abc628fe811e604f027c45a8aa3cf793d0eb"
dependencies = [
"libc",
]
[[package]]
name = "inout"
version = "0.1.3"
@@ -2322,26 +2268,6 @@ dependencies = [
"serde_json",
]
[[package]]
name = "kqueue"
version = "1.0.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7447f1ca1b7b563588a205fe93dea8df60fd981423a768bc1c0ded35ed147d0c"
dependencies = [
"kqueue-sys",
"libc",
]
[[package]]
name = "kqueue-sys"
version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ed9625ffda8729b85e45cf04090035ac368927b8cebc34898e7c120f52e4838b"
dependencies = [
"bitflags 1.3.2",
"libc",
]
[[package]]
name = "kube"
version = "0.88.1"
@@ -2593,7 +2519,6 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09"
dependencies = [
"libc",
"log",
"wasi",
"windows-sys 0.48.0",
]
@@ -2658,33 +2583,6 @@ dependencies = [
"minimal-lexical",
]
[[package]]
name = "notify"
version = "5.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "729f63e1ca555a43fe3efa4f3efdf4801c479da85b432242a7b726f353c88486"
dependencies = [
"bitflags 1.3.2",
"crossbeam-channel",
"filetime",
"fsevent-sys",
"inotify",
"kqueue",
"libc",
"mio",
"walkdir",
"windows-sys 0.45.0",
]
[[package]]
name = "ntapi"
version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8a3895c6391c39d7fe7ebc444a87eb2991b2a0bc718fdabd071eec617fc68e4"
dependencies = [
"winapi",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
@@ -4089,20 +3987,6 @@ dependencies = [
"unicode-xid",
]
[[package]]
name = "sysinfo"
version = "0.28.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b4c2f3ca6693feb29a89724516f016488e9aafc7f37264f898593ee4b942f31b"
dependencies = [
"cfg-if",
"core-foundation-sys",
"libc",
"ntapi",
"once_cell",
"winapi",
]
[[package]]
name = "syslog-tracing"
version = "0.3.0"
@@ -4849,15 +4733,6 @@ dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-sys"
version = "0.45.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75283be5efb2831d37ea142365f009c02ec203cd29a3ebecbc093d52315b66d0"
dependencies = [
"windows-targets 0.42.2",
]
[[package]]
name = "windows-sys"
version = "0.48.0"
@@ -4876,21 +4751,6 @@ dependencies = [
"windows-targets 0.52.0",
]
[[package]]
name = "windows-targets"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8e5180c00cd44c9b1c88adb3693291f1cd93605ded80c250a75d472756b4d071"
dependencies = [
"windows_aarch64_gnullvm 0.42.2",
"windows_aarch64_msvc 0.42.2",
"windows_i686_gnu 0.42.2",
"windows_i686_msvc 0.42.2",
"windows_x86_64_gnu 0.42.2",
"windows_x86_64_gnullvm 0.42.2",
"windows_x86_64_msvc 0.42.2",
]
[[package]]
name = "windows-targets"
version = "0.48.5"
@@ -4921,12 +4781,6 @@ dependencies = [
"windows_x86_64_msvc 0.52.0",
]
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "597a5118570b68bc08d8d59125332c54f1ba9d9adeedeef5b99b02ba2b0698f8"
[[package]]
name = "windows_aarch64_gnullvm"
version = "0.48.5"
@@ -4939,12 +4793,6 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cb7764e35d4db8a7921e09562a0304bf2f93e0a51bfccee0bd0bb0b666b015ea"
[[package]]
name = "windows_aarch64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e08e8864a60f06ef0d0ff4ba04124db8b0fb3be5776a5cd47641e942e58c4d43"
[[package]]
name = "windows_aarch64_msvc"
version = "0.48.5"
@@ -4957,12 +4805,6 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bbaa0368d4f1d2aaefc55b6fcfee13f41544ddf36801e793edbbfd7d7df075ef"
[[package]]
name = "windows_i686_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c61d927d8da41da96a81f029489353e68739737d3beca43145c8afec9a31a84f"
[[package]]
name = "windows_i686_gnu"
version = "0.48.5"
@@ -4975,12 +4817,6 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a28637cb1fa3560a16915793afb20081aba2c92ee8af57b4d5f28e4b3e7df313"
[[package]]
name = "windows_i686_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "44d840b6ec649f480a41c8d80f9c65108b92d89345dd94027bfe06ac444d1060"
[[package]]
name = "windows_i686_msvc"
version = "0.48.5"
@@ -4993,12 +4829,6 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ffe5e8e31046ce6230cc7215707b816e339ff4d4d67c65dffa206fd0f7aa7b9a"
[[package]]
name = "windows_x86_64_gnu"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8de912b8b8feb55c064867cf047dda097f92d51efad5b491dfb98f6bbb70cb36"
[[package]]
name = "windows_x86_64_gnu"
version = "0.48.5"
@@ -5011,12 +4841,6 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3d6fa32db2bc4a2f5abeacf2b69f7992cd09dca97498da74a151a3132c26befd"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26d41b46a36d453748aedef1486d5c7a85db22e56aff34643984ea85514e94a3"
[[package]]
name = "windows_x86_64_gnullvm"
version = "0.48.5"
@@ -5029,12 +4853,6 @@ version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a657e1e9d3f514745a572a6846d3c7aa7dbe1658c056ed9c3344c4109a6949e"
[[package]]
name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
[[package]]
name = "windows_x86_64_msvc"
version = "0.48.5"
@@ -5078,21 +4896,6 @@ version = "0.8.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "53be06678ed9e83edb1745eb72efc0bbcd7b5c3c35711a860906aed827a13d61"
[[package]]
name = "yaque"
version = "0.6.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "487f1a92dacd945cc5bc78a8193cc00b9a2cce3c07746ca51533f513843f40d2"
dependencies = [
"futures",
"lazy_static",
"log",
"notify",
"rand",
"semver",
"sysinfo",
]
[[package]]
name = "zerocopy"
version = "0.7.32"

View file

@@ -2,7 +2,6 @@
resolver = "2"
members = [
"src/db",
"src/todo",
"src/util",
"src/net",
"src/rpc",
@@ -25,7 +24,6 @@ format_table = { version = "0.1.1", path = "src/format-table" }
garage_api = { version = "1.0.1", path = "src/api" }
garage_block = { version = "1.0.1", path = "src/block" }
garage_db = { version = "1.0.1", path = "src/db", default-features = false }
garage_todo = { version = "1.0.1", path = "src/todo", default-features = false }
garage_model = { version = "1.0.1", path = "src/model", default-features = false }
garage_net = { version = "1.0.1", path = "src/net" }
garage_rpc = { version = "1.0.1", path = "src/rpc" }
@@ -88,8 +86,6 @@ rusqlite = "0.31.0"
r2d2 = "0.8"
r2d2_sqlite = "0.24"
yaque = "0.6"
async-compression = { version = "0.4", features = ["tokio", "zstd"] }
zstd = { version = "0.13", default-features = false }

View file

@@ -23,7 +23,7 @@ client = minio.Minio(
"GKyourapikey",
"abcd[...]1234",
# Force the region, this is specific to garage
region="region",
region="garage",
)
```

View file

@@ -335,6 +335,7 @@ From the [official Mastodon documentation](https://docs.joinmastodon.org/admin/t
```bash
$ RAILS_ENV=production bin/tootctl media remove --days 3
$ RAILS_ENV=production bin/tootctl media remove --days 15 --prune-profiles
$ RAILS_ENV=production bin/tootctl media remove-orphans
$ RAILS_ENV=production bin/tootctl preview_cards remove --days 15
```
@@ -353,8 +354,6 @@ Imports: 1.7 KB
Settings: 0 Bytes
```
Unfortunately, [old avatars and headers cannot currently be cleaned up](https://github.com/mastodon/mastodon/issues/9567).
### Migrating your data
Data migration should be done with an efficient S3 client.

View file

@@ -16,6 +16,7 @@ data_dir = "/var/lib/garage/data"
metadata_fsync = true
data_fsync = false
disable_scrub = false
use_local_tz = false
metadata_auto_snapshot_interval = "6h"
db_engine = "lmdb"
@@ -99,6 +100,7 @@ Top-level configuration options:
[`data_fsync`](#data_fsync),
[`db_engine`](#db_engine),
[`disable_scrub`](#disable_scrub),
[`use_local_tz`](#use_local_tz),
[`lmdb_map_size`](#lmdb_map_size),
[`metadata_auto_snapshot_interval`](#metadata_auto_snapshot_interval),
[`metadata_dir`](#metadata_dir),
@@ -427,6 +429,13 @@ you should delete it from the data directory and then call `garage repair
blocks` on the node to ensure that it re-obtains a copy from another node on
the network.
#### `use_local_tz` {#use_local_tz}
By default, Garage runs the lifecycle worker every day at midnight in UTC. Set the
`use_local_tz` configuration value to `true` if you want Garage to run the
lifecycle worker at midnight in your local timezone. If you have multiple nodes,
you should also ensure that each node has the same timezone configuration.
#### `block_size` {#block_size}
Garage splits stored objects in consecutive chunks of size `block_size`
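
The difference this flag makes is easy to demonstrate with chrono, the crate the lifecycle worker uses. A minimal sketch (an illustration, not Garage code; it assumes chrono 0.4 and mirrors the `midnight_ts` change to the lifecycle worker further down in this diff):

```rust
use chrono::{Local, NaiveDate, TimeZone, Utc};

fn main() {
    let date = NaiveDate::from_ymd_opt(2024, 12, 1).expect("valid date");
    let midnight = date.and_hms_opt(0, 0, 0).expect("midnight exists");

    // use_local_tz = false (the default): midnight is interpreted in UTC.
    let utc_ms = Utc.from_utc_datetime(&midnight).timestamp_millis();

    // use_local_tz = true: midnight is interpreted in the node's local
    // timezone; `single()` rejects local times that are ambiguous or
    // nonexistent (e.g. around DST transitions).
    let local_ms = midnight
        .and_local_timezone(Local)
        .single()
        .expect("unambiguous local midnight")
        .timestamp_millis();

    println!("UTC midnight: {utc_ms} ms, local midnight: {local_ms} ms");
}
```

Unless the node already runs in UTC, the two timestamps differ by the local UTC offset, which is exactly the shift in the worker's daily start time.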

View file

@@ -76,6 +76,9 @@ spec:
- name: etc
mountPath: /etc/garage.toml
subPath: garage.toml
{{- with .Values.extraVolumeMounts }}
{{- toYaml . | nindent 12 }}
{{- end }}
# TODO
# livenessProbe:
# httpGet:
@@ -110,6 +113,9 @@ spec:
- name: data
emptyDir: {}
{{- end }}
{{- with .Values.extraVolumes }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}

View file

@@ -218,6 +218,10 @@ affinity: {}
environment: {}
extraVolumes: {}
extraVolumeMounts: {}
monitoring:
metrics:
# If true, a service for monitoring is created with a prometheus.io/scrape annotation

View file

@@ -23,7 +23,6 @@ path = "tests/lib.rs"
[dependencies]
format_table.workspace = true
garage_db.workspace = true
garage_todo.workspace = true
garage_api.workspace = true
garage_block.workspace = true
garage_model.workspace = true
@@ -83,14 +82,13 @@ k2v-client.workspace = true
[features]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v", "yaque" ]
default = [ "bundled-libs", "metrics", "lmdb", "sqlite", "k2v" ]
k2v = [ "garage_util/k2v", "garage_api/k2v" ]
# Database engines
lmdb = [ "garage_model/lmdb" ]
sqlite = [ "garage_model/sqlite" ]
yaque = [ "garage_todo/yaque" ]
# Automatic registration and discovery via Consul API
consul-discovery = [ "garage_rpc/consul-discovery" ]

View file

@@ -197,7 +197,7 @@ impl AdminRpcHandler {
writeln!(&mut ret, "\nDatabase engine: {}", self.garage.db.engine()).unwrap();
// Gather table statistics
let mut table = vec![" Table\tItems\tMklItems".into()];
let mut table = vec![" Table\tItems\tMklItems\tMklTodo\tGcTodo".into()];
table.push(self.gather_table_stats(&self.garage.bucket_table)?);
table.push(self.gather_table_stats(&self.garage.key_table)?);
table.push(self.gather_table_stats(&self.garage.object_table)?);
@@ -343,10 +343,12 @@ impl AdminRpcHandler {
let mkl_len = t.merkle_updater.merkle_tree_len()?.to_string();
Ok(format!(
" {}\t{}\t{}",
" {}\t{}\t{}\t{}\t{}",
F::TABLE_NAME,
data_len,
mkl_len,
t.merkle_updater.todo_len()?,
t.data.gc_todo_len()?
))
}
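
With the two extra columns wired in, the stats table rendered by this handler gains per-table MklTodo and GcTodo counts. Hypothetical output, with made-up numbers purely for illustration:

```
  Table    Items  MklItems  MklTodo  GcTodo
  bucket   3      3         0        0
  key      7      7         0        0
  object   1042   1201      17       2
```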

View file

@@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
garage_todo.workspace = true
garage_rpc.workspace = true
garage_table.workspace = true
garage_block.workspace = true

View file

@@ -4,7 +4,6 @@ use std::sync::Arc;
use garage_net::NetworkKey;
use garage_db as db;
use garage_todo as todo;
use garage_util::background::*;
use garage_util::config::*;
@@ -46,8 +45,6 @@ pub struct Garage {
/// The local database
pub db: db::Db,
/// The todo queues
pub todo: todo::Todo,
/// The membership manager
pub system: Arc<System>,
/// The block manager
@@ -138,18 +135,6 @@ impl Garage {
let db = db::open_db(&db_path, db_engine, &db_opt)
.ok_or_message("Unable to open metadata db")?;
info!("Initializing queues...");
let todo_engine = todo::Engine::from_str(&config.todo_engine)
.ok_or_message("Invalid `todo_engine` value in configuration file")?;
let mut todo_path = config.metadata_dir.clone();
match todo_engine {
todo::Engine::Yaque => {
todo_path.push("todo.yaque")
}
}
let todo = todo::open_todo(&todo_path, todo_engine)
.ok_or_message("Unable to start todo engine")?;
info!("Initializing RPC...");
let network_key = hex::decode(config.rpc_secret.as_ref().ok_or_message(
"rpc_secret value is missing, not present in config file or in environment",
@@ -190,7 +175,7 @@ impl Garage {
// ---- admin tables ----
info!("Initialize bucket_table...");
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db, &todo);
let bucket_table = Table::new(BucketTable, control_rep_param.clone(), system.clone(), &db);
info!("Initialize bucket_alias_table...");
let bucket_alias_table = Table::new(
@@ -198,10 +183,9 @@ impl Garage {
control_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize key_table_table...");
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db, &todo);
let key_table = Table::new(KeyTable, control_rep_param, system.clone(), &db);
// ---- S3 tables ----
info!("Initialize block_ref_table...");
@@ -212,7 +196,6 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize version_table...");
@@ -223,11 +206,10 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize multipart upload counter table...");
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo);
let mpu_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
info!("Initialize multipart upload table...");
let mpu_table = Table::new(
@@ -238,11 +220,10 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Initialize object counter table...");
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db, &todo);
let object_counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), &db);
info!("Initialize object_table...");
#[allow(clippy::redundant_clone)]
@@ -255,7 +236,6 @@ impl Garage {
meta_rep_param.clone(),
system.clone(),
&db,
&todo,
);
info!("Load lifecycle worker state...");
@@ -265,7 +245,7 @@ impl Garage {
// ---- K2V ----
#[cfg(feature = "k2v")]
let k2v = GarageK2V::new(system.clone(), &db, &todo, meta_rep_param);
let k2v = GarageK2V::new(system.clone(), &db, meta_rep_param);
// ---- setup block refcount recalculation ----
// this function can be used to fix inconsistencies in the RC table
@@ -281,7 +261,6 @@ impl Garage {
bg_vars,
replication_factor,
db,
todo,
system,
block_manager,
bucket_table,
@@ -356,9 +335,9 @@ impl Garage {
#[cfg(feature = "k2v")]
impl GarageK2V {
fn new(system: Arc<System>, db: &db::Db, todo: &todo::Todo, meta_rep_param: TableShardedReplication) -> Self {
fn new(system: Arc<System>, db: &db::Db, meta_rep_param: TableShardedReplication) -> Self {
info!("Initialize K2V counter table...");
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db, todo);
let counter_table = IndexCounter::new(system.clone(), meta_rep_param.clone(), db);
info!("Initialize K2V subscription manager...");
let subscriptions = Arc::new(SubscriptionManager::new());
@@ -372,7 +351,6 @@ impl GarageK2V {
meta_rep_param,
system.clone(),
db,
todo,
);
info!("Initialize K2V RPC handler...");

View file

@@ -6,7 +6,6 @@ use std::sync::Arc;
use serde::{Deserialize, Serialize};
use garage_db as db;
use garage_todo as todo;
use garage_rpc::layout::LayoutHelper;
use garage_rpc::system::System;
@@ -174,7 +173,6 @@ impl<T: CountedItem> IndexCounter<T> {
system: Arc<System>,
replication: TableShardedReplication,
db: &db::Db,
todo: &todo::Todo,
) -> Arc<Self> {
Arc::new(Self {
this_node: system.id,
@@ -188,7 +186,6 @@ impl<T: CountedItem> IndexCounter<T> {
replication,
system,
db,
todo,
),
})
}

View file

@@ -70,7 +70,7 @@ pub fn register_bg_vars(
impl LifecycleWorker {
pub fn new(garage: Arc<Garage>, persister: PersisterShared<LifecycleWorkerPersisted>) -> Self {
let today = today();
let today = today(garage.config.use_local_tz);
let last_completed = persister.get_with(|x| {
x.last_completed
.as_deref()
@@ -205,8 +205,9 @@ impl Worker for LifecycleWorker {
async fn wait_for_work(&mut self) -> WorkerState {
match &self.state {
State::Completed(d) => {
let use_local_tz = self.garage.config.use_local_tz;
let next_day = d.succ_opt().expect("no next day");
let next_start = midnight_ts(next_day);
let next_start = midnight_ts(next_day, use_local_tz);
loop {
let now = now_msec();
if now < next_start {
@@ -218,7 +219,7 @@ impl Worker for LifecycleWorker {
break;
}
}
self.state = State::start(std::cmp::max(next_day, today()));
self.state = State::start(std::cmp::max(next_day, today(use_local_tz)));
}
State::Running { .. } => (),
}
@@ -385,10 +386,16 @@ fn check_size_filter(version_data: &ObjectVersionData, filter: &LifecycleFilter)
true
}
fn midnight_ts(date: NaiveDate) -> u64 {
date.and_hms_opt(0, 0, 0)
.expect("midnight does not exist")
.timestamp_millis() as u64
fn midnight_ts(date: NaiveDate, use_local_tz: bool) -> u64 {
let midnight = date.and_hms_opt(0, 0, 0).expect("midnight does not exist");
if use_local_tz {
return midnight
.and_local_timezone(Local)
.single()
.expect("bad local midnight")
.timestamp_millis() as u64;
}
midnight.timestamp_millis() as u64
}
fn next_date(ts: u64) -> NaiveDate {
@@ -399,6 +406,9 @@ fn next_date(ts: u64) -> NaiveDate {
.expect("no next day")
}
fn today() -> NaiveDate {
fn today(use_local_tz: bool) -> NaiveDate {
if use_local_tz {
return Local::now().naive_local().date();
}
Utc::now().naive_utc().date()
}
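
The size of the scheduling shift can be pinned down deterministically by substituting a fixed offset for `Local` (UTC+07:00 below, matching the timezone of the feature commits at the top of this page). A self-contained check, assuming chrono 0.4:

```rust
use chrono::{FixedOffset, NaiveDate, TimeZone, Utc};

fn main() {
    let date = NaiveDate::from_ymd_opt(2024, 11, 23).expect("valid date");
    let midnight = date.and_hms_opt(0, 0, 0).expect("midnight exists");

    let utc_ms = Utc.from_utc_datetime(&midnight).timestamp_millis();

    // At UTC+07:00, local midnight occurs 7 hours earlier in absolute time.
    let tz = FixedOffset::east_opt(7 * 3600).expect("valid offset");
    let local_ms = midnight
        .and_local_timezone(tz)
        .single()
        .expect("fixed offsets are never ambiguous")
        .timestamp_millis();

    assert_eq!(utc_ms - local_ms, 7 * 3600 * 1000);
}
```

On such a node, `use_local_tz = true` moves the worker's daily run seven hours earlier than the default UTC schedule.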

View file

@@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
garage_todo.workspace = true
garage_rpc.workspace = true
garage_util.workspace = true

View file

@@ -6,7 +6,6 @@ use serde_bytes::ByteBuf;
use tokio::sync::Notify;
use garage_db as db;
use garage_todo as todo;
use garage_util::data::*;
use garage_util::error::*;
@@ -30,19 +29,19 @@ pub struct TableData<F: TableSchema, R: TableReplication> {
pub store: db::Tree,
pub(crate) merkle_tree: db::Tree,
pub(crate) merkle_todo: todo::Queue,
pub(crate) merkle_todo: db::Tree,
pub(crate) merkle_todo_notify: Notify,
pub(crate) insert_queue: db::Tree,
pub(crate) insert_queue_notify: Arc<Notify>,
pub(crate) gc_todo: todo::Queue,
pub(crate) gc_todo: db::Tree,
pub(crate) metrics: TableMetrics,
}
impl<F: TableSchema, R: TableReplication> TableData<F, R> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db, todo: &todo::Todo) -> Arc<Self> {
pub fn new(system: Arc<System>, instance: F, replication: R, db: &db::Db) -> Arc<Self> {
let store = db
.open_tree(format!("{}:table", F::TABLE_NAME))
.expect("Unable to open DB tree");
@@ -50,22 +49,24 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
let merkle_tree = db
.open_tree(format!("{}:merkle_tree", F::TABLE_NAME))
.expect("Unable to open DB Merkle tree tree");
let merkle_todo = todo
.open_queue(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open Merkle TODO queue");
let merkle_todo = db
.open_tree(format!("{}:merkle_todo", F::TABLE_NAME))
.expect("Unable to open DB Merkle TODO tree");
let insert_queue = db
.open_tree(format!("{}:insert_queue", F::TABLE_NAME))
.expect("Unable to open insert queue DB tree");
let gc_todo = todo
.open_queue(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC TODO queue");
let gc_todo = db
.open_tree(format!("{}:gc_todo_v2", F::TABLE_NAME))
.expect("Unable to open GC DB tree");
let metrics = TableMetrics::new(
F::TABLE_NAME,
store.clone(),
merkle_tree.clone(),
merkle_todo.clone(),
gc_todo.clone(),
);
Arc::new(Self {
@@ -226,22 +227,22 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
if changed {
let new_bytes_hash = blake2sum(&new_bytes);
tx.insert(&self.merkle_todo, &tree_key, new_bytes_hash.as_slice())?;
tx.insert(&self.store, &tree_key, new_bytes)?;
self.instance
.updated(tx, old_entry.as_ref(), Some(&new_entry))?;
Ok(Some((tree_key.clone(), new_entry, new_bytes_hash)))
Ok(Some((new_entry, new_bytes_hash)))
} else {
Ok(None)
}
})?;
if let Some((tree_key, new_entry, new_bytes_hash)) = changed {
if let Some((new_entry, new_bytes_hash)) = changed {
self.metrics.internal_update_counter.add(1);
let is_tombstone = new_entry.is_tombstone();
self.merkle_todo.submit(&tree_key, new_bytes_hash.as_slice())?;
self.merkle_todo_notify.notify_one();
if is_tombstone {
// We are only responsible for GC'ing this item if we are the
@@ -271,21 +272,21 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if cur_v == v => {
let old_entry = self.decode_entry(v).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(tx, Some(&old_entry), None)?;
Ok(Some(k))
Ok(true)
}
_ => Ok(None),
_ => Ok(false),
})?;
if let Some(k) = removed {
if removed {
self.metrics.internal_delete_counter.add(1);
self.merkle_todo.submit(k, &[])?;
self.merkle_todo_notify.notify_one();
Ok(true)
} else {
Ok(false)
}
}
Ok(removed)
}
pub(crate) fn delete_if_equal_hash(
@@ -299,21 +300,21 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
.transaction(|tx| match tx.get(&self.store, k)? {
Some(cur_v) if blake2sum(&cur_v[..]) == vhash => {
let old_entry = self.decode_entry(&cur_v[..]).map_err(db::TxError::Abort)?;
tx.remove(&self.store, k)?;
tx.insert(&self.merkle_todo, k, vec![])?;
self.instance.updated(tx, Some(&old_entry), None)?;
Ok(Some(k))
Ok(true)
}
_ => Ok(None),
_ => Ok(false),
})?;
if let Some(k) = removed {
if removed {
self.metrics.internal_delete_counter.add(1);
self.merkle_todo.submit(k, &[])?;
self.merkle_todo_notify.notify_one();
Ok(true)
} else {
Ok(false)
}
}
Ok(removed)
}
// ---- Insert queue functions ----
@@ -365,4 +366,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
}
}
}
pub fn gc_todo_len(&self) -> Result<usize, Error> {
Ok(self.gc_todo.len()?)
}
}
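
What moving `merkle_todo` and `gc_todo` from `todo::Queue` back to `db::Tree` buys in this file: the todo marker is now written by `tx.insert(...)` inside the same transaction as the row update, whereas the queue variant could only `submit()` after the transaction had committed, leaving a window in which a crash lost the marker. A standalone sketch of the resulting pattern (mutex-guarded maps stand in for the db trees; `Notify` is the same tokio primitive the real code uses):

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;
use tokio::sync::Notify;

struct MiniTable {
    store: Mutex<BTreeMap<Vec<u8>, Vec<u8>>>,       // stands in for the main tree
    merkle_todo: Mutex<BTreeMap<Vec<u8>, Vec<u8>>>, // stands in for the todo tree
    merkle_todo_notify: Notify,
}

impl MiniTable {
    fn insert(&self, key: &[u8], value: &[u8], value_hash: &[u8]) {
        // In Garage both writes happen inside one db transaction, so a crash
        // can never leave a modified row without its todo marker.
        self.store.lock().unwrap().insert(key.to_vec(), value.to_vec());
        self.merkle_todo
            .lock()
            .unwrap()
            .insert(key.to_vec(), value_hash.to_vec());
        // The wakeup itself may be lost, but that is harmless: the marker is
        // durable, and the worker re-scans the tree whenever it runs.
        self.merkle_todo_notify.notify_one();
    }
}

fn main() {
    let t = MiniTable {
        store: Mutex::new(BTreeMap::new()),
        merkle_todo: Mutex::new(BTreeMap::new()),
        merkle_todo_notify: Notify::new(),
    };
    t.insert(b"k1", b"value", b"value-hash");
    assert!(t.merkle_todo.lock().unwrap().contains_key(b"k1".as_slice()));
}
```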

View file

@@ -10,7 +10,7 @@ use serde_bytes::ByteBuf;
use futures::future::join_all;
use tokio::sync::watch;
use garage_todo as todo;
use garage_db as db;
use garage_util::background::*;
use garage_util::data::*;
@@ -76,9 +76,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// These entries are put there when a tombstone is inserted in the table
// (see update_entry in data.rs)
let mut candidates = vec![];
while let Some(entry_kv) = self.data.gc_todo.reserve()? {
let (k, vhash) = entry_kv;
for entry_kv in self.data.gc_todo.iter()? {
let (k, vhash) = entry_kv?;
let todo_entry = GcTodoEntry::parse(&k, &vhash);
if todo_entry.deletion_time() > now {
@@ -124,6 +123,13 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
}
}
// Remove from gc_todo entries for tombstones where we have
// detected that the current value has changed and
// is no longer a tombstone.
for entry in excluded {
entry.remove_if_equal(&self.data.gc_todo)?;
}
// Remaining in `entries` is the list of entries we want to GC,
// and for which they are still currently tombstones in the table.
@@ -162,22 +168,16 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
let resps = join_all(
partitions
.into_iter()
.map(|(nodes, items)| async {
let r = self.try_send_and_delete(nodes, &items).await;
(items, r)
}),
.map(|(nodes, items)| self.try_send_and_delete(nodes, items)),
)
.await;
// Collect errors and return a single error value even if several
// errors occurred. Push failed items back into the GC queue.
// errors occurred.
let mut errs = vec![];
for (items, resp) in resps {
for resp in resps {
if let Err(e) = resp {
errs.push(e);
for item in items {
item.save(&self.data.gc_todo)?;
}
}
}
@@ -197,7 +197,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
async fn try_send_and_delete(
&self,
nodes: Vec<Uuid>,
items: &Vec<GcTodoEntry>,
mut items: Vec<GcTodoEntry>,
) -> Result<(), Error> {
let n_items = items.len();
@@ -216,8 +216,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
// and in deletes the list of keys and hashes of value for step 2.
let mut updates = vec![];
let mut deletes = vec![];
for item in items {
updates.push(ByteBuf::from(item.value.clone().unwrap()));
for item in items.iter_mut() {
updates.push(ByteBuf::from(item.value.take().unwrap()));
deletes.push((ByteBuf::from(item.key.clone()), item.value_hash));
}
@@ -264,6 +264,8 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
self.data
.delete_if_equal_hash(&item.key[..], item.value_hash)
.err_context("GC: local delete tombstones")?;
item.remove_if_equal(&self.data.gc_todo)
.err_context("GC: remove from todo list after successfull GC")?;
}
Ok(())
@@ -311,7 +313,7 @@ impl<F: TableSchema, R: TableReplication> Worker for GcWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(0 as u64),
queue_length: Some(self.gc.data.gc_todo_len().unwrap_or(0) as u64),
..Default::default()
}
}
@@ -374,8 +376,24 @@ impl GcTodoEntry {
}
/// Saves the GcTodoEntry in the gc_todo tree
pub(crate) fn save(&self, gc_todo_tree: &todo::Queue) -> Result<(), Error> {
gc_todo_tree.submit(&self.todo_table_key(), self.value_hash.as_slice())?;
pub(crate) fn save(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.insert(self.todo_table_key(), self.value_hash.as_slice())?;
Ok(())
}
/// Removes the GcTodoEntry from the gc_todo tree if the
/// hash of the serialized value is the same here as in the tree.
/// This is useful to remove a todo entry only under the condition
/// that it has not changed since the time it was read, i.e. that
/// the work we have to do is still the same.
pub(crate) fn remove_if_equal(&self, gc_todo_tree: &db::Tree) -> Result<(), Error> {
gc_todo_tree.db().transaction(|txn| {
let key = self.todo_table_key();
if txn.get(gc_todo_tree, &key)?.as_deref() == Some(self.value_hash.as_slice()) {
txn.remove(gc_todo_tree, &key)?;
}
Ok(())
})?;
Ok(())
}
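
`remove_if_equal` (and the matching conditional delete in the Merkle updater below) encodes one rule: a todo entry may only be dequeued if it still holds the value that was actually processed. A dependency-free sketch of why the equality guard matters (single-threaded for brevity; the real code runs the check inside a db transaction):

```rust
use std::collections::HashMap;

/// Remove `key` only if it still holds the value we processed.
fn remove_if_equal(tree: &mut HashMap<Vec<u8>, Vec<u8>>, key: &[u8], seen: &[u8]) {
    if tree.get(key).map(|v| v.as_slice()) == Some(seen) {
        tree.remove(key);
    }
}

fn main() {
    let mut gc_todo = HashMap::new();
    gc_todo.insert(b"obj1".to_vec(), b"hash_v1".to_vec());

    // A concurrent writer re-queues the same key with a newer value hash
    // between our read and our delete...
    gc_todo.insert(b"obj1".to_vec(), b"hash_v2".to_vec());

    // ...so deleting against the stale hash is a no-op, and the newer
    // todo item survives to be processed on the next pass.
    remove_if_equal(&mut gc_todo, b"obj1", b"hash_v1");
    assert!(gc_todo.contains_key(b"obj1".as_slice()));
}
```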

View file

@@ -81,7 +81,7 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.reserve()? {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
Ok(WorkerState::Busy)
} else {
@@ -110,6 +110,21 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
.db()
.transaction(|tx| self.update_item_rec(tx, k, &khash, &key, new_vhash))?;
let deleted = self.data.merkle_todo.db().transaction(|tx| {
let remove = matches!(tx.get(&self.data.merkle_todo, k)?, Some(ov) if ov == vhash_by);
if remove {
tx.remove(&self.data.merkle_todo, k)?;
}
Ok(remove)
})?;
if !deleted {
debug!(
"({}) Item not deleted from Merkle todo because it changed: {:?}",
F::TABLE_NAME,
k
);
}
Ok(())
}
@@ -275,6 +290,10 @@ impl<F: TableSchema, R: TableReplication> MerkleUpdater<F, R> {
pub fn merkle_tree_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_tree.len()?)
}
pub fn todo_len(&self) -> Result<usize, Error> {
Ok(self.data.merkle_todo.len()?)
}
}
struct MerkleWorker<F: TableSchema, R: TableReplication>(Arc<MerkleUpdater<F, R>>);
@@ -287,6 +306,7 @@ impl<F: TableSchema, R: TableReplication> Worker for MerkleWorker<F, R> {
fn status(&self) -> WorkerStatus {
WorkerStatus {
queue_length: Some(self.0.todo_len().unwrap_or(0) as u64),
..Default::default()
}
}

View file

@@ -6,6 +6,8 @@ use garage_db as db;
pub struct TableMetrics {
pub(crate) _table_size: ValueObserver<u64>,
pub(crate) _merkle_tree_size: ValueObserver<u64>,
pub(crate) _merkle_todo_len: ValueObserver<u64>,
pub(crate) _gc_todo_len: ValueObserver<u64>,
pub(crate) get_request_counter: BoundCounter<u64>,
pub(crate) get_request_duration: BoundValueRecorder<f64>,
@@ -23,6 +25,8 @@ impl TableMetrics {
table_name: &'static str,
store: db::Tree,
merkle_tree: db::Tree,
merkle_todo: db::Tree,
gc_todo: db::Tree,
) -> Self {
let meter = global::meter(table_name);
TableMetrics {
@@ -54,6 +58,34 @@ impl TableMetrics {
)
.with_description("Number of nodes in table's Merkle tree")
.init(),
_merkle_todo_len: meter
.u64_value_observer(
"table.merkle_updater_todo_queue_length",
move |observer| {
if let Ok(v) = merkle_todo.len() {
observer.observe(
v as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Merkle tree updater TODO queue length")
.init(),
_gc_todo_len: meter
.u64_value_observer(
"table.gc_todo_queue_length",
move |observer| {
if let Ok(value) = gc_todo.len() {
observer.observe(
value as u64,
&[KeyValue::new("table_name", table_name)],
);
}
},
)
.with_description("Table garbage collector TODO queue length")
.init(),
get_request_counter: meter
.u64_counter("table.get_request_counter")

View file

@@ -13,7 +13,6 @@ use opentelemetry::{
};
use garage_db as db;
use garage_todo as todo;
use garage_util::background::BackgroundRunner;
use garage_util::data::*;
@@ -70,12 +69,12 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
impl<F: TableSchema, R: TableReplication> Table<F, R> {
// =============== PUBLIC INTERFACE FUNCTIONS (new, insert, get, etc) ===============
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db, todo: &todo::Todo) -> Arc<Self> {
pub fn new(instance: F, replication: R, system: Arc<System>, db: &db::Db) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/table.rs/Rpc:{}", F::TABLE_NAME));
let data = TableData::new(system.clone(), instance, replication, db, todo);
let data = TableData::new(system.clone(), instance, replication, db);
let merkle_updater = MerkleUpdater::new(data.clone());

View file

@@ -1,22 +0,0 @@
[package]
name = "garage_todo"
version = "1.0.1"
authors = ["Julien Kritter <julien.kritter@withings.com>", "Alex Auvolat <alex@adnab.me>"]
edition = "2018"
license = "AGPL-3.0"
description = "Abstraction over multiple queue implementations"
repository = "https://git.deuxfleurs.fr/Deuxfleurs/garage"
readme = "../../README.md"
[lib]
path = "lib.rs"
[dependencies]
err-derive.workspace = true
tracing.workspace = true
yaque = { workspace = true, optional = true }
[features]
default = [ "yaque" ]
yaque = [ "dep:yaque" ]

View file

@@ -1,56 +0,0 @@
#[macro_use]
extern crate tracing;
#[cfg(feature = "yaque")]
pub mod yaque_adapter;
#[cfg(test)]
pub mod test;
pub mod open;
pub use open::*;
use std::sync::Arc;
use std::borrow::Cow;
use err_derive::Error;
#[derive(Clone)]
pub struct Todo(pub(crate) Arc<dyn ITodo>);
#[derive(Clone)]
pub struct Queue(pub(crate) Arc<dyn ITodo>, usize);
#[derive(Debug, Error)]
#[error(display = "{}", _0)]
pub struct Error(pub Cow<'static, str>);
pub type Result<T> = std::result::Result<T, Error>;
impl From<std::io::Error> for Error {
fn from(e: std::io::Error) -> Error {
Error(format!("IO: {}", e).into())
}
}
pub(crate) trait ITodo: Send + Sync {
fn open_queue(&self, name: &str) -> Result<usize>;
fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()>;
fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>>;
}
impl Todo {
pub fn open_queue<S: AsRef<str>>(&self, name: S) -> Result<Queue> {
let queue_id = self.0.open_queue(name.as_ref())?;
Ok(Queue(self.0.clone(), queue_id))
}
}
impl Queue {
pub fn submit(&self, k: &[u8], v: &[u8]) -> Result<()> {
self.0.submit(self.1, k, v)
}
pub fn reserve(&self) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
self.0.reserve(self.1)
}
}

View file

@@ -1,66 +0,0 @@
use std::path::PathBuf;
use crate::{Todo, Result, Error};
/// List of supported todo engine types
///
/// The `enum` holds the list of *all* todo engines that can be supported by the crate, no matter
/// whether the relevant feature is enabled or not. It allows us to distinguish between an invalid
/// engine and a valid engine whose support is simply not enabled via a feature flag.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Engine {
Yaque,
}
impl Engine {
/// Return variant name as static `&str`
pub fn as_str(&self) -> &'static str {
match self {
Self::Yaque => "yaque",
}
}
}
impl std::fmt::Display for Engine {
fn fmt(&self, fmt: &mut std::fmt::Formatter) -> std::fmt::Result {
self.as_str().fmt(fmt)
}
}
impl std::str::FromStr for Engine {
type Err = Error;
fn from_str(text: &str) -> Result<Engine> {
match text {
#[cfg(feature = "yaque")]
"yaque" => Ok(Self::Yaque),
kind => Err(Error(
format!(
"Invalid todo engine: {} (options are: yaque)",
kind
)
.into(),
)),
}
}
}
pub fn open_todo(path: &PathBuf, engine: Engine) -> Result<Todo> {
match engine {
// ---- Yaque ----
#[cfg(feature = "yaque")]
Engine::Yaque => {
info!("Opening Yaque todo at: {}", path.display());
let engine = crate::yaque_adapter::Yaque::init(path);
Ok(engine)
}
// Pattern is unreachable when all supported todo engines are compiled into the binary. The allow
// attribute is added so that we won't have to change this match in case we stop building
// support for one or more engines by default.
#[allow(unreachable_patterns)]
engine => Err(Error(
format!("Todo engine support not available in this build: {}", engine).into(),
)),
}
}

View file

@@ -1 +0,0 @@
use crate::*;

View file

@@ -1,85 +0,0 @@
use crate::*;
use std::path::PathBuf;
use std::convert::TryInto;
use std::sync::RwLock;
use yaque::{channel, Receiver, Sender, TrySendError, TryRecvError};
pub struct Yaque {
base_path: PathBuf,
// the outer lock prevents opening 2 queues at once
// the inner locks allow reserving on multiple queues at once
queues: RwLock<Vec<RwLock<(Sender, Receiver)>>>,
}
impl From<TrySendError<Vec<u8>>> for Error {
fn from(e: TrySendError<Vec<u8>>) -> Error {
Error(format!("Yaque: {}", e).into())
}
}
impl From<TryRecvError> for Error {
fn from(e: TryRecvError) -> Error {
let message = match e {
TryRecvError::Io(ee) => ee.to_string(),
TryRecvError::QueueEmpty => "empty queue".into(),
};
Error(format!("Yaque: {}", message).into())
}
}
impl Yaque {
pub fn new(base_path: &PathBuf) -> Self {
Self {
base_path: base_path.clone(),
queues: RwLock::new(vec!()),
}
}
pub fn init(base_path: &PathBuf) -> Todo {
Todo(Arc::new(Self::new(base_path)))
}
}
impl ITodo for Yaque {
fn open_queue(&self, name: &str) -> Result<usize> {
let mut path = self.base_path.clone();
path.push(name);
let mut queues = self.queues.write().unwrap();
queues.push(RwLock::new(channel(path)?));
Ok(queues.len() - 1)
}
fn submit(&self, queue_id: usize, k: &[u8], v: &[u8]) -> Result<()> {
let queues = self.queues.read().unwrap();
let mut queue = queues[queue_id].write().unwrap();
let job = pack_kv(k, v);
queue.0.try_send(job)?;
Ok(())
}
fn reserve(&self, queue_id: usize) -> Result<Option<(Vec<u8>, Vec<u8>)>> {
let queues = self.queues.read().unwrap();
let mut queue = queues[queue_id].write().unwrap();
let job = queue.1.try_recv();
match job {
Ok(j) => Ok(Some(unpack_kv(&j))),
Err(TryRecvError::Io(e)) => Err(e.into()),
Err(TryRecvError::QueueEmpty) => Ok(None),
}
}
}
fn pack_kv(k: &[u8], v: &[u8]) -> Vec<u8> {
[&k.len().to_ne_bytes(), k, v].concat().to_vec()
}
fn unpack_kv(job: &[u8]) -> (Vec<u8>, Vec<u8>) {
const LEN_SIZE: usize = std::mem::size_of::<usize>();
let key_length_bytes = (&job[..LEN_SIZE]).try_into().unwrap();
let key_length = usize::from_ne_bytes(key_length_bytes);
let k = job[LEN_SIZE..LEN_SIZE+key_length].to_vec();
let v = job[LEN_SIZE+key_length..].to_vec();
(k, v)
}
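
For reference, the framing that disappears with this adapter: each queued job packs the key length as a native-endian `usize` ahead of the key and value bytes. Native endianness is tolerable here only because the on-disk queue is read back by the same host that wrote it. A dependency-free round-trip check of that format (assumes Rust 2021 for the `TryInto` prelude):

```rust
fn pack_kv(k: &[u8], v: &[u8]) -> Vec<u8> {
    // usize length prefix, then key bytes, then value bytes.
    [&k.len().to_ne_bytes()[..], k, v].concat()
}

fn unpack_kv(job: &[u8]) -> (Vec<u8>, Vec<u8>) {
    const LEN_SIZE: usize = std::mem::size_of::<usize>();
    let key_length = usize::from_ne_bytes(job[..LEN_SIZE].try_into().unwrap());
    (
        job[LEN_SIZE..LEN_SIZE + key_length].to_vec(),
        job[LEN_SIZE + key_length..].to_vec(),
    )
}

fn main() {
    let job = pack_kv(b"table:key", b"value-hash");
    assert_eq!(
        unpack_kv(&job),
        (b"table:key".to_vec(), b"value-hash".to_vec())
    );
}
```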

View file

@@ -15,7 +15,6 @@ path = "lib.rs"
[dependencies]
garage_db.workspace = true
garage_todo.workspace = true
garage_net.workspace = true
arc-swap.workspace = true

View file

@@ -27,6 +27,10 @@ pub struct Config {
#[serde(default)]
pub disable_scrub: bool,
/// Use local timezone
#[serde(default)]
pub use_local_tz: bool,
/// Automatic snapshot interval for metadata
#[serde(default)]
pub metadata_auto_snapshot_interval: Option<String>,
@@ -111,11 +115,6 @@ pub struct Config {
#[serde(default = "default_db_engine")]
pub db_engine: String,
// -- TODO
/// queue engine to use for metadata (options: yaque)
#[serde(default = "default_todo_engine")]
pub todo_engine: String,
/// LMDB map size
#[serde(deserialize_with = "deserialize_capacity", default)]
pub lmdb_map_size: usize,
@@ -261,10 +260,6 @@ fn default_db_engine() -> String {
"lmdb".into()
}
fn default_todo_engine() -> String {
"yaque".into()
}
fn default_block_size() -> usize {
1048576
}
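
`#[serde(default)]` is what keeps the new flag backward compatible: configuration files written before this change still deserialize, with `use_local_tz` coming out as `false`. A minimal sketch, assuming the `serde` (with the derive feature) and `toml` crates:

```rust
use serde::Deserialize;

#[derive(Deserialize, Debug)]
struct MiniConfig {
    #[serde(default)]
    use_local_tz: bool,
}

fn main() {
    // Pre-existing config files simply omit the key...
    let old: MiniConfig = toml::from_str("").expect("old config parses");
    assert!(!old.use_local_tz);

    // ...while new ones may set it explicitly.
    let new: MiniConfig =
        toml::from_str("use_local_tz = true").expect("new config parses");
    assert!(new.use_local_tz);
}
```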

View file

@@ -30,9 +30,6 @@ pub enum Error {
#[error(display = "DB error: {}", _0)]
Db(#[error(source)] garage_db::Error),
#[error(display = "Todo error: {}", _0)]
Todo(#[error(source)] garage_todo::Error),
#[error(display = "Messagepack encode error: {}", _0)]
RmpEncode(#[error(source)] rmp_serde::encode::Error),
#[error(display = "Messagepack decode error: {}", _0)]