351 lines
10 KiB
Rust
351 lines
10 KiB
Rust
use aws_sdk_s3 as s3;
|
|
use eyre::{eyre, OptionExt, WrapErr};
|
|
use serde::{Deserialize, Serialize};
|
|
use std::collections::HashMap;
|
|
use std::convert::TryFrom;
|
|
use std::time::{Duration, SystemTime};
|
|
use std::{env, fmt};
|
|
|
|
/// S3 bucket owned by restic-alarm itself: it holds the persisted alert state
/// and the list of repositories to watch.
const RESTIC_ALARM_BUCKET: &str = "restic-alarm-state";

/// Key, inside `RESTIC_ALARM_BUCKET`, of the TOML file recording when the last
/// alert was sent for each repository.
const RESTIC_ALARM_STATE_FILE: &str = "state.toml";

/// Key prefix, inside `RESTIC_ALARM_BUCKET`, of the watch list: every object
/// `watch/<repo>` names a repository bucket to monitor and holds its TOML config.
const RESTIC_ALARM_WATCH_DIR: &str = "watch/";
|
|
|
|
/// Connection settings for the SMTP relay used to send alert e-mails.
struct SmtpConfig {
    /// Relay host handed to `SmtpTransport::relay`.
    address: String,
    /// Login for the relay; also used as the sender (`From`) of alert e-mails.
    username: String,
    /// Password paired with `username`.
    password: String,
}
|
|
|
|
impl SmtpConfig {
|
|
fn mailer(&self) -> Result<lettre::SmtpTransport, lettre::transport::smtp::Error> {
|
|
use lettre::transport::smtp::authentication::Credentials;
|
|
use lettre::SmtpTransport;
|
|
Ok(SmtpTransport::relay(&self.address)?
|
|
.credentials(Credentials::new(
|
|
self.username.to_owned(),
|
|
self.password.to_owned(),
|
|
))
|
|
.build())
|
|
}
|
|
|
|
async fn from_env() -> eyre::Result<SmtpConfig> {
|
|
let address = env::var("SMTP_ADDRESS")?;
|
|
let username = env::var("SMTP_USERNAME")?;
|
|
let password = env::var("SMTP_PASSWORD")?;
|
|
let smtp = SmtpConfig {
|
|
address,
|
|
username,
|
|
password,
|
|
};
|
|
if !smtp.mailer()?.test_connection()? {
|
|
return Err(eyre!("Unable to contact the SMTP relay"));
|
|
}
|
|
Ok(smtp)
|
|
}
|
|
}
|
|
|
|
#[derive(Serialize, Deserialize, Clone, Debug)]
|
|
struct State {
|
|
last_alert: HashMap<String, u64>,
|
|
}
|
|
|
|
impl State {
|
|
fn last_alert(&self, repo: &str) -> Option<Duration> {
|
|
let time = *self.last_alert.get(repo)?;
|
|
let time = SystemTime::UNIX_EPOCH + Duration::from_secs(time);
|
|
SystemTime::now().duration_since(time).ok()
|
|
}
|
|
|
|
fn update_last_alert(&mut self, repo: &str) {
|
|
let time = SystemTime::now()
|
|
.duration_since(SystemTime::UNIX_EPOCH)
|
|
.unwrap();
|
|
self.last_alert.insert(repo.to_owned(), time.as_secs());
|
|
}
|
|
}
|
|
|
|
/// Serde default for `RepoConfig::alert_interval`, in days.
const fn default_alert_interval() -> u64 {
    1
}

/// Serde default for `RepoConfig::alert_duration`, in days.
const fn default_alert_duration() -> u64 {
    7
}
|
|
|
|
#[derive(Serialize, Deserialize)]
|
|
struct RepoConfig {
|
|
name: Option<String>,
|
|
email: String,
|
|
// all durations below are measured in days
|
|
inactivity: u64,
|
|
#[serde(default = "default_alert_interval")]
|
|
alert_interval: u64,
|
|
#[serde(default = "default_alert_duration")]
|
|
alert_duration: u64,
|
|
}
|
|
|
|
/// Result of evaluating one repository during a run.
struct AlertStatus {
    /// Whether `is_alert_needed` decided an alert was due during this check.
    alert: bool,
    /// Time elapsed since the repository's most recent snapshot.
    inactivity: Duration,
    /// Time elapsed since the previously recorded alert, if any.
    last_alert: Option<Duration>,
}

impl fmt::Display for AlertStatus {
    /// Renders e.g. `ALERT: inactivity: 3d4h12m, last_alert: none`
    /// (seconds are truncated).
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        fn dhm(d: Duration) -> String {
            let secs = d.as_secs();
            format!("{}d{}h{}m", secs / 86_400, (secs / 3600) % 24, (secs / 60) % 60)
        }

        let label = if self.alert { "ALERT" } else { "no alert" };
        let last = self
            .last_alert
            .map_or_else(|| "none".to_owned(), dhm);
        write!(
            f,
            "{}: inactivity: {}, last_alert: {}",
            label,
            dhm(self.inactivity),
            last
        )
    }
}
|
|
|
|
/// Formats a repository for display: `"<short name> (<repo>)"` when a short
/// name is configured, otherwise just `<repo>`.
fn repo_name(repo: &str, name: &Option<String>) -> String {
    if let Some(short_name) = name {
        format!("{} ({})", short_name, repo)
    } else {
        repo.to_owned()
    }
}
|
|
|
|
fn is_alert_needed(
|
|
cfg: &RepoConfig,
|
|
inactive_for: Duration,
|
|
since_last_alert: Option<Duration>,
|
|
) -> bool {
|
|
let one_day: u64 = 3600 * 24;
|
|
inactive_for.as_secs() >= cfg.inactivity * one_day
|
|
&& inactive_for.as_secs() <= cfg.alert_duration * one_day
|
|
&& match since_last_alert {
|
|
Some(d) => d.as_secs() >= cfg.alert_interval * one_day,
|
|
None => true,
|
|
}
|
|
}
|
|
|
|
async fn send_email(
|
|
smtp: &SmtpConfig,
|
|
name: &Option<String>,
|
|
email: &str,
|
|
repo: &str,
|
|
inactive: Duration,
|
|
) -> eyre::Result<()> {
|
|
use lettre::{Message, Transport};
|
|
let email = Message::builder()
|
|
.from(smtp.username.parse().unwrap())
|
|
.to(email.parse()?)
|
|
.subject(format!(
|
|
"restic-alarm: inactive repository {}",
|
|
match name {
|
|
None => repo,
|
|
Some(name) => name,
|
|
}
|
|
))
|
|
.body(format!(
|
|
"Alert: Repository {} has been inactive for {} days.\n",
|
|
repo_name(repo, name),
|
|
inactive.as_secs() / (3600 * 24)
|
|
))
|
|
.unwrap();
|
|
smtp.mailer()?.send(&email)?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn write_state(client: &s3::Client, state: &State) -> eyre::Result<()> {
|
|
let data = toml::to_vec(state)?;
|
|
client
|
|
.put_object()
|
|
.bucket(RESTIC_ALARM_BUCKET)
|
|
.key(RESTIC_ALARM_STATE_FILE)
|
|
.body(s3::primitives::ByteStream::from(data))
|
|
.send()
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
use s3::error::SdkError;
|
|
use s3::operation::get_object::GetObjectError;
|
|
|
|
fn is_no_such_key_error<R>(err: &SdkError<GetObjectError, R>) -> bool {
|
|
match err {
|
|
SdkError::ServiceError(e) => match e.err() {
|
|
GetObjectError::NoSuchKey(_) => true,
|
|
_ => false,
|
|
},
|
|
_ => false,
|
|
}
|
|
}
|
|
|
|
async fn read_state(client: &s3::Client) -> eyre::Result<State> {
|
|
let output = client
|
|
.get_object()
|
|
.bucket(RESTIC_ALARM_BUCKET)
|
|
.key(RESTIC_ALARM_STATE_FILE)
|
|
.send()
|
|
.await;
|
|
match output {
|
|
Ok(output) => {
|
|
let state_s = output
|
|
.body
|
|
.collect()
|
|
.await
|
|
.wrap_err("error reading state file")?
|
|
.into_bytes();
|
|
let state: State = toml::from_slice(&state_s).expect("invalid state file");
|
|
Ok(state)
|
|
}
|
|
Err(e) if is_no_such_key_error(&e) => {
|
|
let state = State {
|
|
last_alert: HashMap::new(),
|
|
};
|
|
write_state(client, &state).await?;
|
|
Ok(state)
|
|
}
|
|
Err(err) => Err(err)?,
|
|
}
|
|
}
|
|
|
|
async fn to_watch(client: &s3::Client) -> eyre::Result<Vec<String>> {
|
|
let mut to_watch = Vec::new();
|
|
let mut objs_resp = client
|
|
.list_objects_v2()
|
|
.bucket(RESTIC_ALARM_BUCKET)
|
|
.prefix(RESTIC_ALARM_WATCH_DIR)
|
|
.into_paginator()
|
|
.send();
|
|
while let Some(res) = objs_resp.next().await {
|
|
let output = res?;
|
|
for obj in output.contents() {
|
|
let key = obj.key().ok_or_eyre("object with no key")?;
|
|
to_watch.push(key.strip_prefix(RESTIC_ALARM_WATCH_DIR).unwrap().to_owned())
|
|
}
|
|
}
|
|
Ok(to_watch)
|
|
}
|
|
|
|
async fn read_repo_config(client: &s3::Client, repo: &str) -> eyre::Result<RepoConfig> {
|
|
let repo_config_path = RESTIC_ALARM_WATCH_DIR.to_owned() + repo;
|
|
let output = client
|
|
.get_object()
|
|
.bucket(RESTIC_ALARM_BUCKET)
|
|
.key(&repo_config_path)
|
|
.send()
|
|
.await?;
|
|
let config_s = output
|
|
.body
|
|
.collect()
|
|
.await
|
|
.wrap_err_with(|| format!("reading repo config file {}", &repo_config_path))?
|
|
.into_bytes();
|
|
let config = toml::from_slice(&config_s)
|
|
.wrap_err_with(|| format!("parsing repo config file {}", &repo_config_path))?;
|
|
Ok(config)
|
|
}
|
|
|
|
async fn repo_last_snapshot(client: &s3::Client, repo: &str) -> eyre::Result<Option<Duration>> {
|
|
let mut objs = client
|
|
.list_objects_v2()
|
|
.bucket(repo)
|
|
.prefix("snapshots/")
|
|
.into_paginator()
|
|
.send();
|
|
let mut modtimes = Vec::new();
|
|
while let Some(res) = objs.next().await {
|
|
let output = res.wrap_err("listing repo snapshots")?;
|
|
for obj in output.contents() {
|
|
if let Some(last_mod) = obj.last_modified() {
|
|
modtimes.push(SystemTime::try_from(last_mod.clone()).unwrap());
|
|
}
|
|
}
|
|
}
|
|
modtimes.sort();
|
|
match modtimes.last() {
|
|
None => Ok(None),
|
|
Some(time) => Ok(SystemTime::now().duration_since(*time).ok()),
|
|
}
|
|
}
|
|
|
|
struct RepoInfo {
|
|
name: Option<String>,
|
|
alert_status: Option<AlertStatus>,
|
|
}
|
|
|
|
// this function can fail for reasons that depend on the user-provided 'repo' config
|
|
// (e.g. if it fails to parse).
|
|
// So the error must not be propagated to the toplevel, which would abort the
|
|
// alert for remaining repositories; it should instead just be reported/logged.
|
|
async fn check_repo(
|
|
smtp: &SmtpConfig,
|
|
client: &s3::Client,
|
|
state: &mut State,
|
|
repo: &str,
|
|
) -> eyre::Result<RepoInfo> {
|
|
let config = read_repo_config(client, repo).await?;
|
|
let alert_status = if let Some(inactivity) = repo_last_snapshot(client, repo).await? {
|
|
let last_alert = state.last_alert(repo);
|
|
let alert = is_alert_needed(&config, inactivity, last_alert);
|
|
if alert {
|
|
println!(
|
|
"Sending alert to {} about repo {}",
|
|
config.email,
|
|
repo_name(repo, &config.name)
|
|
);
|
|
send_email(smtp, &config.name, &config.email, repo, inactivity).await?;
|
|
state.update_last_alert(repo);
|
|
write_state(client, state).await?;
|
|
}
|
|
Some(AlertStatus {
|
|
alert,
|
|
inactivity,
|
|
last_alert,
|
|
})
|
|
} else {
|
|
None
|
|
};
|
|
Ok(RepoInfo {
|
|
name: config.name.clone(),
|
|
alert_status,
|
|
})
|
|
}
|
|
|
|
#[::tokio::main]
|
|
async fn main() -> eyre::Result<()> {
|
|
let sdk_config = aws_config::load_from_env().await;
|
|
let config = aws_sdk_s3::config::Builder::from(&sdk_config)
|
|
.force_path_style(true)
|
|
.build();
|
|
let client = aws_sdk_s3::Client::from_conf(config);
|
|
let smtp = SmtpConfig::from_env().await?;
|
|
|
|
let repos = to_watch(&client).await?;
|
|
println!("Watching {} repos", repos.len());
|
|
let mut state = read_state(&client).await?;
|
|
|
|
for repo in repos {
|
|
match check_repo(&smtp, &client, &mut state, &repo).await {
|
|
Ok(RepoInfo {
|
|
name,
|
|
alert_status: None,
|
|
}) => println!("{}: no snapshot, skipping", repo_name(&repo, &name)),
|
|
Ok(RepoInfo {
|
|
name,
|
|
alert_status: Some(status),
|
|
}) => println!("{}: {}", repo_name(&repo, &name), status),
|
|
Err(err) =>
|
|
// is this the best way to log the error?
|
|
{
|
|
println!("{}: ERROR: {:?}", &repo, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
Ok(())
|
|
}
|