use aws_sdk_s3 as s3; use eyre::{eyre, OptionExt, WrapErr}; use serde::{Deserialize, Serialize}; use std::collections::HashMap; use std::convert::TryFrom; use std::time::{Duration, SystemTime}; use std::{env, fmt}; const RESTIC_ALARM_BUCKET: &str = "restic-alarm-state"; const RESTIC_ALARM_STATE_FILE: &str = "state.toml"; const RESTIC_ALARM_WATCH_DIR: &str = "watch/"; struct SmtpConfig { address: String, username: String, password: String, } impl SmtpConfig { fn mailer(&self) -> Result { use lettre::transport::smtp::authentication::Credentials; use lettre::SmtpTransport; Ok(SmtpTransport::relay(&self.address)? .credentials(Credentials::new( self.username.to_owned(), self.password.to_owned(), )) .build()) } async fn from_env() -> eyre::Result { let address = env::var("SMTP_ADDRESS")?; let username = env::var("SMTP_USERNAME")?; let password = env::var("SMTP_PASSWORD")?; let smtp = SmtpConfig { address, username, password, }; if !smtp.mailer()?.test_connection()? { return Err(eyre!("Unable to contact the SMTP relay")); } Ok(smtp) } } #[derive(Serialize, Deserialize, Clone, Debug)] struct State { last_alert: HashMap, } impl State { fn last_alert(&self, repo: &str) -> Option { let time = *self.last_alert.get(repo)?; let time = SystemTime::UNIX_EPOCH + Duration::from_secs(time); SystemTime::now().duration_since(time).ok() } fn update_last_alert(&mut self, repo: &str) { let time = SystemTime::now() .duration_since(SystemTime::UNIX_EPOCH) .unwrap(); self.last_alert.insert(repo.to_owned(), time.as_secs()); } } fn default_alert_interval() -> u64 { 1 } fn default_alert_duration() -> u64 { 7 } #[derive(Serialize, Deserialize)] struct RepoConfig { name: Option, email: String, // all durations below are measured in days inactivity: u64, #[serde(default = "default_alert_interval")] alert_interval: u64, #[serde(default = "default_alert_duration")] alert_duration: u64, } struct AlertStatus { alert: bool, inactivity: Duration, last_alert: Option, } impl fmt::Display for AlertStatus { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let days = |d: Duration| d.as_secs() / (3600 * 24); let hours = |d: Duration| (d.as_secs() / 3600) % 24; let minutes = |d: Duration| (d.as_secs() / 60) % 60; write!( f, "{}: inactivity: {}d{}h{}m, last_alert: {}", (if self.alert { "ALERT" } else { "no alert" }), days(self.inactivity), hours(self.inactivity), minutes(self.inactivity), match self.last_alert { None => "none".to_owned(), Some(d) => format!("{}d{}h{}m", days(d), hours(d), minutes(d)), } ) } } fn repo_name(repo: &str, name: &Option) -> String { match name { None => repo.to_owned(), Some(short_name) => format!("{} ({})", short_name, repo), } } fn is_alert_needed( cfg: &RepoConfig, inactive_for: Duration, since_last_alert: Option, ) -> bool { let one_day: u64 = 3600 * 24; inactive_for.as_secs() >= cfg.inactivity * one_day && inactive_for.as_secs() <= cfg.alert_duration * one_day && match since_last_alert { Some(d) => d.as_secs() >= cfg.alert_interval * one_day, None => true, } } async fn send_email( smtp: &SmtpConfig, name: &Option, email: &str, repo: &str, inactive: Duration, ) -> eyre::Result<()> { use lettre::{Message, Transport}; let email = Message::builder() .from(smtp.username.parse().unwrap()) .to(email.parse()?) .subject(format!( "restic-alarm: inactive repository {}", match name { None => repo, Some(name) => name, } )) .body(format!( "Alert: Repository {} has been inactive for {} days.\n", repo_name(repo, name), inactive.as_secs() / (3600 * 24) )) .unwrap(); smtp.mailer()?.send(&email)?; Ok(()) } async fn write_state(client: &s3::Client, state: &State) -> eyre::Result<()> { let data = toml::to_vec(state)?; client .put_object() .bucket(RESTIC_ALARM_BUCKET) .key(RESTIC_ALARM_STATE_FILE) .body(s3::primitives::ByteStream::from(data)) .send() .await?; Ok(()) } use s3::error::SdkError; use s3::operation::get_object::GetObjectError; fn is_no_such_key_error(err: &SdkError) -> bool { match err { SdkError::ServiceError(e) => match e.err() { GetObjectError::NoSuchKey(_) => true, _ => false, }, _ => false, } } async fn read_state(client: &s3::Client) -> eyre::Result { let output = client .get_object() .bucket(RESTIC_ALARM_BUCKET) .key(RESTIC_ALARM_STATE_FILE) .send() .await; match output { Ok(output) => { let state_s = output .body .collect() .await .wrap_err("error reading state file")? .into_bytes(); let state: State = toml::from_slice(&state_s).expect("invalid state file"); Ok(state) } Err(e) if is_no_such_key_error(&e) => { let state = State { last_alert: HashMap::new(), }; write_state(client, &state).await?; Ok(state) } Err(err) => Err(err)?, } } async fn to_watch(client: &s3::Client) -> eyre::Result> { let mut to_watch = Vec::new(); let mut objs_resp = client .list_objects_v2() .bucket(RESTIC_ALARM_BUCKET) .prefix(RESTIC_ALARM_WATCH_DIR) .into_paginator() .send(); while let Some(res) = objs_resp.next().await { let output = res?; for obj in output.contents() { let key = obj.key().ok_or_eyre("object with no key")?; to_watch.push(key.strip_prefix(RESTIC_ALARM_WATCH_DIR).unwrap().to_owned()) } } Ok(to_watch) } async fn read_repo_config(client: &s3::Client, repo: &str) -> eyre::Result { let repo_config_path = RESTIC_ALARM_WATCH_DIR.to_owned() + repo; let output = client .get_object() .bucket(RESTIC_ALARM_BUCKET) .key(&repo_config_path) .send() .await?; let config_s = output .body .collect() .await .wrap_err_with(|| format!("reading repo config file {}", &repo_config_path))? .into_bytes(); let config = toml::from_slice(&config_s) .wrap_err_with(|| format!("parsing repo config file {}", &repo_config_path))?; Ok(config) } async fn repo_last_snapshot(client: &s3::Client, repo: &str) -> eyre::Result> { let mut objs = client .list_objects_v2() .bucket(repo) .prefix("snapshots/") .into_paginator() .send(); let mut modtimes = Vec::new(); while let Some(res) = objs.next().await { let output = res.wrap_err("listing repo snapshots")?; for obj in output.contents() { if let Some(last_mod) = obj.last_modified() { modtimes.push(SystemTime::try_from(last_mod.clone()).unwrap()); } } } modtimes.sort(); match modtimes.last() { None => Ok(None), Some(time) => Ok(SystemTime::now().duration_since(*time).ok()), } } struct RepoInfo { name: Option, alert_status: Option, } // this function can fail for reasons that depend on the user-provided 'repo' config // (e.g. if it fails to parse). // So the error must not be propagated to the toplevel, which would abort the // alert for remaining repositories; it should instead just be reported/logged. async fn check_repo( smtp: &SmtpConfig, client: &s3::Client, state: &mut State, repo: &str, ) -> eyre::Result { let config = read_repo_config(client, repo).await?; let alert_status = if let Some(inactivity) = repo_last_snapshot(client, repo).await? { let last_alert = state.last_alert(repo); let alert = is_alert_needed(&config, inactivity, last_alert); if alert { println!( "Sending alert to {} about repo {}", config.email, repo_name(repo, &config.name) ); send_email(smtp, &config.name, &config.email, repo, inactivity).await?; state.update_last_alert(repo); write_state(client, state).await?; } Some(AlertStatus { alert, inactivity, last_alert, }) } else { None }; Ok(RepoInfo { name: config.name.clone(), alert_status, }) } #[::tokio::main] async fn main() -> eyre::Result<()> { let sdk_config = aws_config::load_from_env().await; let config = aws_sdk_s3::config::Builder::from(&sdk_config) .force_path_style(true) .build(); let client = aws_sdk_s3::Client::from_conf(config); let smtp = SmtpConfig::from_env().await?; let repos = to_watch(&client).await?; println!("Watching {} repos", repos.len()); let mut state = read_state(&client).await?; for repo in repos { match check_repo(&smtp, &client, &mut state, &repo).await { Ok(RepoInfo { name, alert_status: None, }) => println!("{}: no snapshot, skipping", repo_name(&repo, &name)), Ok(RepoInfo { name, alert_status: Some(status), }) => println!("{}: {}", repo_name(&repo, &name), status), Err(err) => // is this the best way to log the error? { println!("{}: ERROR: {:?}", &repo, err) } } } Ok(()) }