restic-alarm/src/main.rs
2024-04-10 18:22:41 +02:00

279 lines
8.6 KiB
Rust

use aws_config::Region;
use aws_sdk_s3 as s3;
use eyre::{OptionExt, WrapErr};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::convert::TryFrom;
use std::time::{Duration, SystemTime};
use std::fmt;
/// Bucket holding restic-alarm's own data: the state file and the watch list.
const RESTIC_ALARM_BUCKET: &str = "restic-alarm-state";
/// Key (inside `RESTIC_ALARM_BUCKET`) of the TOML file recording when the last
/// alert was sent for each repository.
const RESTIC_ALARM_STATE_FILE: &str = "state.toml";
/// Key prefix (inside `RESTIC_ALARM_BUCKET`): each object under it is the TOML
/// config of one watched repository, and the rest of the key is the repository name.
const RESTIC_ALARM_WATCH_DIR: &str = "watch/";
/// Region name handed to the S3 client.
const S3_REGION: &str = "infracoll";
/// Endpoint of the S3-compatible server (note: plain HTTP, not HTTPS).
const S3_ENDPOINT: &str = "http://garage.isomorphis.me:3900";
/// Persistent alarm state, (de)serialized as TOML to/from `RESTIC_ALARM_STATE_FILE`.
#[derive(Serialize, Deserialize, Clone, Debug)]
struct State {
    /// Repository name -> time the last alert was sent for it, in whole
    /// seconds since the Unix epoch.
    last_alert: HashMap<String, u64>,
}

impl State {
    /// Time elapsed since the last alert sent for `repo`.
    ///
    /// Returns `None` when no alert was ever recorded for `repo`, or when the
    /// recorded time lies in the future relative to the local clock.
    ///
    /// # Panics
    /// Panics if the stored timestamp is too large to be represented as a
    /// `SystemTime`.
    fn last_alert(&self, repo: &str) -> Option<Duration> {
        self.last_alert
            .get(repo)
            .map(|&secs| SystemTime::UNIX_EPOCH + Duration::from_secs(secs))
            .and_then(|alerted_at| SystemTime::now().duration_since(alerted_at).ok())
    }

    /// Records "now" as the time of the last alert for `repo`.
    ///
    /// # Panics
    /// Panics if the system clock reads earlier than the Unix epoch.
    fn update_last_alert(&mut self, repo: &str) {
        let now_secs = SystemTime::now()
            .duration_since(SystemTime::UNIX_EPOCH)
            .unwrap()
            .as_secs();
        self.last_alert.insert(String::from(repo), now_secs);
    }
}
/// Serde default for `RepoConfig::alert_interval`: 1 day.
fn default_alert_interval() -> u64 {
    1
}
/// Serde default for `RepoConfig::alert_duration`: 7 days.
fn default_alert_duration() -> u64 {
    7
}
/// Per-repository config, parsed from the TOML object stored at
/// `RESTIC_ALARM_WATCH_DIR` + repository name.
#[derive(Serialize, Deserialize)]
struct RepoConfig {
    /// Recipient address for alert e-mails.
    email: String,
    // all durations below are measured in days
    /// Days without a new snapshot after which the repository is considered inactive.
    inactivity: u64,
    /// Minimum number of days between two alerts for this repository.
    #[serde(default = "default_alert_interval")]
    alert_interval: u64,
    /// For how long alerts keep being sent, in days (see `is_alert_needed`
    /// for how this window is measured).
    #[serde(default = "default_alert_duration")]
    alert_duration: u64,
}
/// Result of checking one repository, as reported by `check_repo`.
struct AlertStatus {
    /// Whether an alert was deemed necessary during this check.
    alert: bool,
    /// Time elapsed since the most recent snapshot.
    inactivity: Duration,
    /// Time elapsed since the previous alert, as known before this check.
    last_alert: Option<Duration>,
}

impl fmt::Display for AlertStatus {
    /// Renders e.g. `ALERT: inactivity: 3 days, 5 hours, last_alert: none`.
    /// Durations are truncated to whole days and remaining whole hours.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Split a duration into (whole days, leftover whole hours).
        fn days_and_hours(d: Duration) -> (u64, u64) {
            let total_hours = d.as_secs() / 3600;
            (total_hours / 24, total_hours % 24)
        }

        let label = if self.alert { "ALERT" } else { "no alert" };
        let (inactive_days, inactive_hours) = days_and_hours(self.inactivity);
        let previous = self.last_alert.map_or_else(
            || "none".to_owned(),
            |d| {
                let (dd, hh) = days_and_hours(d);
                format!("{} days, {} hours", dd, hh)
            },
        );
        write!(
            f,
            "{}: inactivity: {} days, {} hours, last_alert: {}",
            label, inactive_days, inactive_hours, previous
        )
    }
}
/// Decides whether an alert e-mail should be sent for a repository now.
///
/// All thresholds in `cfg` are expressed in days. An alert is needed when all
/// of the following hold:
/// - the repository has been inactive for at least `cfg.inactivity` days;
/// - it has been inactive for at most `cfg.inactivity + cfg.alert_duration`
///   days, i.e. alerts stop `alert_duration` days after the repository crossed
///   the inactivity threshold;
/// - no alert was recorded yet, or the last one is at least
///   `cfg.alert_interval` days old.
///
/// `inactive_for` is the time since the most recent snapshot, and
/// `since_last_alert` the time since the previous alert, if any.
fn is_alert_needed(
    cfg: &RepoConfig,
    inactive_for: Duration,
    since_last_alert: Option<Duration>,
) -> bool {
    const ONE_DAY: u64 = 3600 * 24;
    // Config values are user-provided: saturate instead of overflowing (which
    // panics in debug builds and wraps around in release builds).
    let days = |n: u64| n.saturating_mul(ONE_DAY);

    let inactive = inactive_for.as_secs();
    let window_start = days(cfg.inactivity);
    // The alert window is measured from the inactivity threshold, not from the
    // last snapshot: otherwise any `inactivity >= alert_duration` would make
    // the window empty and the repository could never trigger an alert.
    let window_end = days(cfg.inactivity.saturating_add(cfg.alert_duration));
    let interval_elapsed =
        since_last_alert.map_or(true, |d| d.as_secs() >= days(cfg.alert_interval));

    inactive >= window_start && inactive <= window_end && interval_elapsed
}
async fn send_email(email: &str, repo: &str, inactive: Duration) -> eyre::Result<()> {
use lettre::{AsyncSendmailTransport, AsyncTransport, Message};
let email = Message::builder()
.from("infracoll <infracoll>".parse().unwrap())
.to(email.parse()?)
.subject(format!("restic-alarm: inactive repository {}", repo))
.body(format!(
"Alert: Repository {} has been inactive for {} days.\n",
repo,
inactive.as_secs() / (3600 * 24)
))
.unwrap();
let mailer = AsyncSendmailTransport::new();
mailer.send(email).await?;
Ok(())
}
/// Serializes `state` as TOML and uploads it as `RESTIC_ALARM_STATE_FILE` in
/// `RESTIC_ALARM_BUCKET` (a single PUT of the whole file).
///
/// # Errors
/// Fails if serialization or the upload fails.
async fn write_state(client: &s3::Client, state: &State) -> eyre::Result<()> {
    let payload = s3::primitives::ByteStream::from(toml::to_vec(state)?);
    let request = client
        .put_object()
        .bucket(RESTIC_ALARM_BUCKET)
        .key(RESTIC_ALARM_STATE_FILE)
        .body(payload);
    request.send().await?;
    Ok(())
}
use s3::error::SdkError;
use s3::operation::get_object::GetObjectError;
/// Returns `true` only when a `GetObject` call failed because the service
/// reported that the requested key does not exist (`NoSuchKey`); every other
/// failure kind yields `false`.
fn is_no_such_key_error<R>(err: &SdkError<GetObjectError, R>) -> bool {
    if let SdkError::ServiceError(service_err) = err {
        matches!(service_err.err(), GetObjectError::NoSuchKey(_))
    } else {
        false
    }
}
async fn read_state(client: &s3::Client) -> eyre::Result<State> {
let output = client
.get_object()
.bucket(RESTIC_ALARM_BUCKET)
.key(RESTIC_ALARM_STATE_FILE)
.send()
.await;
match output {
Ok(output) => {
let state_s = output
.body
.collect()
.await
.wrap_err("error reading state file")?
.into_bytes();
let state: State = toml::from_slice(&state_s).expect("invalid state file");
Ok(state)
}
Err(e) if is_no_such_key_error(&e) => {
let state = State {
last_alert: HashMap::new(),
};
write_state(client, &state).await?;
Ok(state)
}
Err(err) => Err(err)?,
}
}
/// Lists the names of the repositories to watch.
///
/// Every object stored under `RESTIC_ALARM_WATCH_DIR` in `RESTIC_ALARM_BUCKET`
/// is a repository config, and the object key with that prefix removed is the
/// repository name. All result pages are fetched.
///
/// # Errors
/// Fails if listing fails, or if the server returns an object without a key
/// or outside of the requested prefix.
async fn to_watch(client: &s3::Client) -> eyre::Result<Vec<String>> {
    let mut pages = client
        .list_objects_v2()
        .bucket(RESTIC_ALARM_BUCKET)
        .prefix(RESTIC_ALARM_WATCH_DIR)
        .into_paginator()
        .send();
    let mut repos = Vec::new();
    while let Some(page) = pages.next().await {
        let page = page?;
        for obj in page.contents() {
            let key = obj.key().ok_or_eyre("object with no key")?;
            // The listing was filtered by prefix, so a mismatch means a
            // misbehaving server: report it instead of panicking.
            let repo = key.strip_prefix(RESTIC_ALARM_WATCH_DIR).ok_or_else(|| {
                eyre::eyre!("object {} listed outside of {}", key, RESTIC_ALARM_WATCH_DIR)
            })?;
            // Some S3 clients create an empty "directory marker" object named
            // exactly like the prefix; it does not name a repository.
            if repo.is_empty() {
                continue;
            }
            repos.push(repo.to_owned());
        }
    }
    Ok(repos)
}
/// Fetches and parses the TOML config of `repo`, stored at key
/// `RESTIC_ALARM_WATCH_DIR` + `repo` in `RESTIC_ALARM_BUCKET`.
///
/// # Errors
/// Fails if the object cannot be fetched or read, or is not a valid `RepoConfig`.
async fn read_repo_config(client: &s3::Client, repo: &str) -> eyre::Result<RepoConfig> {
    let key = format!("{}{}", RESTIC_ALARM_WATCH_DIR, repo);
    let object = client
        .get_object()
        .bucket(RESTIC_ALARM_BUCKET)
        .key(&key)
        .send()
        .await?;
    let raw = object
        .body
        .collect()
        .await
        .wrap_err_with(|| format!("reading repo config file {}", &key))?
        .into_bytes();
    toml::from_slice::<RepoConfig>(&raw)
        .wrap_err_with(|| format!("parsing repo config file {}", &key))
}
async fn repo_last_snapshot(client: &s3::Client, repo: &str) -> eyre::Result<Option<Duration>> {
let mut objs = client
.list_objects_v2()
.bucket(repo)
.prefix("snapshots/")
.into_paginator()
.send();
let mut modtimes = Vec::new();
while let Some(res) = objs.next().await {
let output = res.wrap_err("listing repo snapshots")?;
for obj in output.contents() {
if let Some(last_mod) = obj.last_modified() {
modtimes.push(SystemTime::try_from(last_mod.clone()).unwrap());
}
}
}
modtimes.sort();
match modtimes.last() {
None => Ok(None),
Some(time) => Ok(SystemTime::now().duration_since(*time).ok()),
}
}
/// Checks one watched repository and sends an alert e-mail if one is needed.
///
/// Returns `Ok(None)` when the repository bucket has no snapshot, and
/// `Ok(Some(status))` otherwise. When an alert is sent, its time is recorded
/// in `state` and the state file is written back right away.
///
/// This function can fail for reasons that depend on the user-provided `repo`
/// config (e.g. if it fails to parse). Its error must therefore not be
/// propagated to the top level, which would abort the alerts for the remaining
/// repositories; it should instead just be reported/logged.
async fn check_repo(
    client: &s3::Client,
    state: &mut State,
    repo: &str,
) -> eyre::Result<Option<AlertStatus>> {
    let config = read_repo_config(client, repo).await?;
    let inactivity = match repo_last_snapshot(client, repo).await? {
        Some(elapsed) => elapsed,
        None => return Ok(None),
    };

    let last_alert = state.last_alert(repo);
    let alert = is_alert_needed(&config, inactivity, last_alert);
    if alert {
        println!("Sending alert to {} about bucket {}", config.email, repo);
        send_email(&config.email, repo, inactivity).await?;
        // Persist immediately so a later failure does not cause a duplicate alert.
        state.update_last_alert(repo);
        write_state(client, state).await?;
    }

    Ok(Some(AlertStatus {
        alert,
        inactivity,
        last_alert,
    }))
}
/// Entry point: checks every watched repository once and sends alerts where needed.
///
/// Failures while listing repositories or loading the state abort the run.
/// Failures for an individual repository are reported on stderr and do not
/// stop the remaining repositories from being checked.
#[::tokio::main]
async fn main() -> eyre::Result<()> {
    // Base configuration (e.g. credentials) comes from the environment;
    // region and endpoint are fixed. Path-style addressing puts the bucket in
    // the URL path rather than in a virtual-hosted subdomain.
    let sdk_config = aws_config::load_from_env().await;
    let config = aws_sdk_s3::config::Builder::from(&sdk_config)
        .region(Region::new(S3_REGION))
        .endpoint_url(S3_ENDPOINT)
        .force_path_style(true)
        .build();
    let client = aws_sdk_s3::Client::from_conf(config);

    let repos = to_watch(&client).await?;
    println!("Watching {} repos", repos.len());
    for r in &repos {
        println!("- {}", r);
    }

    let mut state = read_state(&client).await?;
    println!("Current state: {:?}", &state);

    for repo in &repos {
        match check_repo(&client, &mut state, repo).await {
            Ok(None) => println!("{}: no snapshot, skipping", repo),
            Ok(Some(status)) => println!("{}: {}", repo, status),
            // Diagnostics go to stderr; `{:?}` on an eyre report prints the
            // whole error chain, including the context added along the way.
            Err(err) => eprintln!("Error ({}): {:?}", repo, err),
        }
    }
    Ok(())
}