metadata db snapshotting #775
8 changed files with 164 additions and 3 deletions
1
Cargo.lock
generated
1
Cargo.lock
generated
|
@ -1438,6 +1438,7 @@ dependencies = [
|
||||||
"garage_util",
|
"garage_util",
|
||||||
"hex",
|
"hex",
|
||||||
"opentelemetry",
|
"opentelemetry",
|
||||||
|
"parse_duration",
|
||||||
"rand",
|
"rand",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_bytes",
|
"serde_bytes",
|
||||||
|
|
|
@ -34,7 +34,7 @@ args@{
|
||||||
ignoreLockHash,
|
ignoreLockHash,
|
||||||
}:
|
}:
|
||||||
let
|
let
|
||||||
nixifiedLockHash = "8112e20b0e356bed77a9769600c2b2952662ec8af9548eecf8a2d46fe8433189";
|
nixifiedLockHash = "f99156ba9724d370b33258f076f078fefc945f0af79292b1a246bd48bef2a9b2";
|
||||||
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
workspaceSrc = if args.workspaceSrc == null then ./. else args.workspaceSrc;
|
||||||
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
currentLockHash = builtins.hashFile "sha256" (workspaceSrc + /Cargo.lock);
|
||||||
lockHashIgnored = if ignoreLockHash
|
lockHashIgnored = if ignoreLockHash
|
||||||
|
@ -2093,6 +2093,7 @@ in
|
||||||
garage_util = (rustPackages."unknown".garage_util."0.9.3" { inherit profileName; }).out;
|
garage_util = (rustPackages."unknown".garage_util."0.9.3" { inherit profileName; }).out;
|
||||||
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
|
hex = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".hex."0.4.3" { inherit profileName; }).out;
|
||||||
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
opentelemetry = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".opentelemetry."0.17.0" { inherit profileName; }).out;
|
||||||
|
parse_duration = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".parse_duration."2.1.1" { inherit profileName; }).out;
|
||||||
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
rand = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".rand."0.8.5" { inherit profileName; }).out;
|
||||||
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
|
serde = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde."1.0.196" { inherit profileName; }).out;
|
||||||
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.14" { inherit profileName; }).out;
|
serde_bytes = (rustPackages."registry+https://github.com/rust-lang/crates.io-index".serde_bytes."0.11.14" { inherit profileName; }).out;
|
||||||
|
|
|
@ -51,7 +51,7 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Er
|
||||||
let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
|
let (background, await_background_done) = BackgroundRunner::new(watch_cancel.clone());
|
||||||
|
|
||||||
info!("Spawning Garage workers...");
|
info!("Spawning Garage workers...");
|
||||||
garage.spawn_workers(&background);
|
garage.spawn_workers(&background)?;
|
||||||
|
|
||||||
if config.admin.trace_sink.is_some() {
|
if config.admin.trace_sink.is_some() {
|
||||||
info!("Initialize tracing...");
|
info!("Initialize tracing...");
|
||||||
|
|
|
@ -28,6 +28,7 @@ chrono.workspace = true
|
||||||
err-derive.workspace = true
|
err-derive.workspace = true
|
||||||
hex.workspace = true
|
hex.workspace = true
|
||||||
base64.workspace = true
|
base64.workspace = true
|
||||||
|
parse_duration.workspace = true
|
||||||
tracing.workspace = true
|
tracing.workspace = true
|
||||||
rand.workspace = true
|
rand.workspace = true
|
||||||
zstd.workspace = true
|
zstd.workspace = true
|
||||||
|
|
|
@ -278,7 +278,7 @@ impl Garage {
|
||||||
}))
|
}))
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) {
|
pub fn spawn_workers(self: &Arc<Self>, bg: &BackgroundRunner) -> Result<(), Error> {
|
||||||
self.block_manager.spawn_workers(bg);
|
self.block_manager.spawn_workers(bg);
|
||||||
|
|
||||||
self.bucket_table.spawn_workers(bg);
|
self.bucket_table.spawn_workers(bg);
|
||||||
|
@ -299,6 +299,23 @@ impl Garage {
|
||||||
|
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
self.k2v.spawn_workers(bg);
|
self.k2v.spawn_workers(bg);
|
||||||
|
|
||||||
|
if let Some(itv) = self.config.metadata_auto_snapshot_interval.as_deref() {
|
||||||
|
let interval = parse_duration::parse(itv)
|
||||||
|
.ok_or_message("Invalid `metadata_auto_snapshot_interval`")?;
|
||||||
|
if interval < std::time::Duration::from_secs(600) {
|
||||||
|
return Err(Error::Message(
|
||||||
|
"metadata_auto_snapshot_interval too small or negative".into(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
bg.spawn_worker(crate::snapshot::AutoSnapshotWorker::new(
|
||||||
|
self.clone(),
|
||||||
|
interval,
|
||||||
|
));
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
|
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
|
||||||
|
|
|
@ -19,3 +19,4 @@ pub mod s3;
|
||||||
pub mod garage;
|
pub mod garage;
|
||||||
pub mod helper;
|
pub mod helper;
|
||||||
pub mod migrate;
|
pub mod migrate;
|
||||||
|
pub mod snapshot;
|
||||||
|
|
136
src/model/snapshot.rs
Normal file
136
src/model/snapshot.rs
Normal file
|
@ -0,0 +1,136 @@
|
||||||
|
use std::fs;
|
||||||
|
use std::path::PathBuf;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::Mutex;
|
||||||
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use rand::prelude::*;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
|
use garage_util::background::*;
|
||||||
|
use garage_util::error::*;
|
||||||
|
|
||||||
|
use crate::garage::Garage;
|
||||||
|
|
||||||
|
// Number of most recent snapshots left in place by `cleanup_snapshots`;
// older entries of the snapshots directory are deleted after each new snapshot.
|
||||||
|
const KEEP_SNAPSHOTS: usize = 2;
|
||||||
|
|
||||||
|
// Process-wide guard acquired with `try_lock` by `snapshot_metadata`, so that a
// second snapshot attempt fails immediately instead of running concurrently.
static SNAPSHOT_MUTEX: Mutex<()> = Mutex::new(());
|
||||||
|
|
||||||
|
// ================ snapshotting logic =====================
|
||||||
|
|
||||||
|
/// Run snashot_metadata in a blocking thread and async await on it
|
||||||
|
pub async fn async_snapshot_metadata(garage: &Arc<Garage>) -> Result<(), Error> {
|
||||||
|
let garage = garage.clone();
|
||||||
|
let worker = tokio::task::spawn_blocking(move || snapshot_metadata(&garage));
|
||||||
|
worker.await.unwrap()?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Take a snapshot of the metadata database, and erase older
|
||||||
|
/// snapshots if necessary.
|
||||||
|
/// This is not an async function, it should be spawned on a thread pool
|
||||||
|
pub fn snapshot_metadata(garage: &Garage) -> Result<(), Error> {
|
||||||
|
let lock = match SNAPSHOT_MUTEX.try_lock() {
|
||||||
|
Ok(lock) => lock,
|
||||||
|
Err(_) => {
|
||||||
|
return Err(Error::Message(
|
||||||
|
"Cannot acquire lock, another snapshot might be in progress".into(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let mut snapshots_dir = garage.config.metadata_dir.clone();
|
||||||
|
snapshots_dir.push("snapshots");
|
||||||
|
fs::create_dir_all(&snapshots_dir)?;
|
||||||
|
|
||||||
|
let mut new_path = snapshots_dir.clone();
|
||||||
|
new_path.push(chrono::Utc::now().to_rfc3339_opts(chrono::SecondsFormat::Secs, true));
|
||||||
|
|
||||||
|
info!("Snapshotting metadata db to {}", new_path.display());
|
||||||
|
garage.db.snapshot(&new_path)?;
|
||||||
|
info!("Metadata db snapshot finished");
|
||||||
|
|
||||||
|
if let Err(e) = cleanup_snapshots(&snapshots_dir) {
|
||||||
|
error!("Failed to do cleanup in snapshots directory: {}", e);
|
||||||
|
}
|
||||||
|
|
||||||
|
drop(lock);
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn cleanup_snapshots(snapshots_dir: &PathBuf) -> Result<(), Error> {
|
||||||
|
let mut snapshots =
|
||||||
|
fs::read_dir(&snapshots_dir)?.collect::<Result<Vec<fs::DirEntry>, std::io::Error>>()?;
|
||||||
|
|
||||||
|
snapshots.retain(|x| x.file_name().len() > 8);
|
||||||
|
snapshots.sort_by_key(|x| x.file_name());
|
||||||
|
|
||||||
|
for to_delete in snapshots.iter().rev().skip(KEEP_SNAPSHOTS) {
|
||||||
|
let path = snapshots_dir.join(to_delete.path());
|
||||||
|
if to_delete.metadata()?.file_type().is_dir() {
|
||||||
|
for file in fs::read_dir(&path)? {
|
||||||
|
let file = file?;
|
||||||
|
if file.metadata()?.is_file() {
|
||||||
|
fs::remove_file(path.join(file.path()))?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
std::fs::remove_dir(&path)?;
|
||||||
|
} else {
|
||||||
|
std::fs::remove_file(&path)?;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
// ================ auto snapshot worker =====================
|
||||||
|
|
||||||
|
pub struct AutoSnapshotWorker {
|
||||||
|
garage: Arc<Garage>,
|
||||||
|
next_snapshot: Instant,
|
||||||
|
snapshot_interval: Duration,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AutoSnapshotWorker {
|
||||||
|
pub(crate) fn new(garage: Arc<Garage>, snapshot_interval: Duration) -> Self {
|
||||||
|
Self {
|
||||||
|
garage,
|
||||||
|
snapshot_interval,
|
||||||
|
next_snapshot: Instant::now() + (snapshot_interval / 2),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Worker for AutoSnapshotWorker {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"Metadata snapshot worker".into()
|
||||||
|
}
|
||||||
|
fn status(&self) -> WorkerStatus {
|
||||||
|
WorkerStatus {
|
||||||
|
freeform: vec![format!(
|
||||||
|
"Next snapshot: {}",
|
||||||
|
(chrono::Utc::now() + (self.next_snapshot - Instant::now())).to_rfc3339()
|
||||||
|
)],
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn work(&mut self, _must_exit: &mut watch::Receiver<bool>) -> Result<WorkerState, Error> {
|
||||||
|
if Instant::now() < self.next_snapshot {
|
||||||
|
return Ok(WorkerState::Idle);
|
||||||
|
}
|
||||||
|
|
||||||
|
async_snapshot_metadata(&self.garage).await?;
|
||||||
|
|
||||||
|
let rand_factor = 1f32 + thread_rng().gen::<f32>() / 5f32;
|
||||||
|
self.next_snapshot = Instant::now() + self.snapshot_interval.mul_f32(rand_factor);
|
||||||
|
|
||||||
|
Ok(WorkerState::Idle)
|
||||||
|
}
|
||||||
|
async fn wait_for_work(&mut self) -> WorkerState {
|
||||||
|
tokio::time::sleep_until(self.next_snapshot.into()).await;
|
||||||
|
WorkerState::Busy
|
||||||
|
}
|
||||||
|
}
|
|
@ -27,6 +27,10 @@ pub struct Config {
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub disable_scrub: bool,
|
pub disable_scrub: bool,
|
||||||
|
|
||||||
|
/// Automatic snapshot interval for metadata
|
||||||
|
#[serde(default)]
|
||||||
|
pub metadata_auto_snapshot_interval: Option<String>,
|
||||||
|
|
||||||
/// Size of data blocks to save to disk
|
/// Size of data blocks to save to disk
|
||||||
#[serde(
|
#[serde(
|
||||||
deserialize_with = "deserialize_capacity",
|
deserialize_with = "deserialize_capacity",
|
||||||
|
|
Loading…
Reference in a new issue