tweaks & fmt
This commit is contained in:
parent
f65518dfb5
commit
08df74f005
1 changed files with 83 additions and 66 deletions
149
src/main.rs
149
src/main.rs
|
@ -1,10 +1,10 @@
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::time::{SystemTime,Duration};
|
|
||||||
use std::convert::TryFrom;
|
|
||||||
use serde::{Deserialize, Serialize};
|
|
||||||
use aws_config::Region;
|
use aws_config::Region;
|
||||||
use aws_sdk_s3 as s3;
|
use aws_sdk_s3 as s3;
|
||||||
use eyre::{WrapErr,OptionExt};
|
use eyre::{OptionExt, WrapErr};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::convert::TryFrom;
|
||||||
|
use std::time::{Duration, SystemTime};
|
||||||
|
|
||||||
const RESTIC_ALARM_BUCKET: &str = "restic-alarm-state";
|
const RESTIC_ALARM_BUCKET: &str = "restic-alarm-state";
|
||||||
const RESTIC_ALARM_STATE_FILE: &str = "state.toml";
|
const RESTIC_ALARM_STATE_FILE: &str = "state.toml";
|
||||||
|
@ -15,7 +15,7 @@ const S3_ENDPOINT: &str = "http://garage.isomorphis.me:3900";
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize, Clone)]
|
#[derive(Serialize, Deserialize, Clone)]
|
||||||
struct State {
|
struct State {
|
||||||
last_alert: HashMap<String, u64>
|
last_alert: HashMap<String, u64>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl State {
|
impl State {
|
||||||
|
@ -26,13 +26,19 @@ impl State {
|
||||||
}
|
}
|
||||||
|
|
||||||
fn update_last_alert(&mut self, repo: &str) {
|
fn update_last_alert(&mut self, repo: &str) {
|
||||||
let time = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap();
|
let time = SystemTime::now()
|
||||||
|
.duration_since(SystemTime::UNIX_EPOCH)
|
||||||
|
.unwrap();
|
||||||
self.last_alert.insert(repo.to_owned(), time.as_secs());
|
self.last_alert.insert(repo.to_owned(), time.as_secs());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fn default_alert_interval() -> u64 { 1 }
|
fn default_alert_interval() -> u64 {
|
||||||
fn default_alert_duration() -> u64 { 7 }
|
1
|
||||||
|
}
|
||||||
|
fn default_alert_duration() -> u64 {
|
||||||
|
7
|
||||||
|
}
|
||||||
|
|
||||||
#[derive(Serialize, Deserialize)]
|
#[derive(Serialize, Deserialize)]
|
||||||
struct RepoConfig {
|
struct RepoConfig {
|
||||||
|
@ -45,74 +51,87 @@ struct RepoConfig {
|
||||||
alert_duration: u64,
|
alert_duration: u64,
|
||||||
}
|
}
|
||||||
|
|
||||||
fn is_alert_needed(cfg: &RepoConfig, inactive_for: Duration, since_last_alert: Option<Duration>) -> bool {
|
fn is_alert_needed(
|
||||||
|
cfg: &RepoConfig,
|
||||||
|
inactive_for: Duration,
|
||||||
|
since_last_alert: Option<Duration>,
|
||||||
|
) -> bool {
|
||||||
let one_day: u64 = 3600 * 24;
|
let one_day: u64 = 3600 * 24;
|
||||||
inactive_for.as_secs() >= cfg.inactivity * one_day
|
inactive_for.as_secs() >= cfg.inactivity * one_day
|
||||||
&& inactive_for.as_secs() <= cfg.alert_duration * one_day
|
&& inactive_for.as_secs() <= cfg.alert_duration * one_day
|
||||||
&& match since_last_alert {
|
&& match since_last_alert {
|
||||||
Some(d) => d.as_secs() >= cfg.alert_interval * one_day,
|
Some(d) => d.as_secs() >= cfg.alert_interval * one_day,
|
||||||
None => true,
|
None => true,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_email(email: &str, repo: &str, inactive: Duration) -> eyre::Result<()> {
|
async fn send_email(email: &str, repo: &str, inactive: Duration) -> eyre::Result<()> {
|
||||||
use lettre::{Message, AsyncSendmailTransport, AsyncTransport};
|
use lettre::{AsyncSendmailTransport, AsyncTransport, Message};
|
||||||
let email = Message::builder()
|
let email = Message::builder()
|
||||||
.from("infracoll <infracoll>".parse().unwrap())
|
.from("infracoll <infracoll>".parse().unwrap())
|
||||||
.to(email.parse()?)
|
.to(email.parse()?)
|
||||||
.subject(format!("restic-alarm: inactive repository {}", repo))
|
.subject(format!("restic-alarm: inactive repository {}", repo))
|
||||||
.body(format!("Alert: Repository {} has been inactive for {} days.\n",
|
.body(format!(
|
||||||
repo, inactive.as_secs() / (3600*24)))
|
"Alert: Repository {} has been inactive for {} days.\n",
|
||||||
|
repo,
|
||||||
|
inactive.as_secs() / (3600 * 24)
|
||||||
|
))
|
||||||
.unwrap();
|
.unwrap();
|
||||||
let mailer = AsyncSendmailTransport::new();
|
let mailer = AsyncSendmailTransport::new();
|
||||||
mailer.send(email).await?;
|
mailer.send(email).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn write_state(client: &s3::Client, state: &State) -> eyre::Result<()> {
|
||||||
|
let data = toml::to_vec(state)?;
|
||||||
|
client
|
||||||
|
.put_object()
|
||||||
|
.bucket(RESTIC_ALARM_BUCKET)
|
||||||
|
.key(RESTIC_ALARM_STATE_FILE)
|
||||||
|
.body(s3::primitives::ByteStream::from(data))
|
||||||
|
.send()
|
||||||
|
.await?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
use s3::error::SdkError;
|
use s3::error::SdkError;
|
||||||
use s3::operation::get_object::GetObjectError;
|
use s3::operation::get_object::GetObjectError;
|
||||||
|
|
||||||
fn is_no_such_key_error<R>(err: &SdkError<GetObjectError,R>) -> bool {
|
fn is_no_such_key_error<R>(err: &SdkError<GetObjectError, R>) -> bool {
|
||||||
match err {
|
match err {
|
||||||
SdkError::ServiceError(e) => {
|
SdkError::ServiceError(e) => match e.err() {
|
||||||
match e.err() {
|
GetObjectError::NoSuchKey(_) => true,
|
||||||
GetObjectError::NoSuchKey(_) => true,
|
_ => false,
|
||||||
_ => false,
|
|
||||||
}
|
|
||||||
},
|
},
|
||||||
_ => false,
|
_ => false,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn write_state(client: &s3::Client, state: &State) -> eyre::Result<()> {
|
|
||||||
let data = toml::to_vec(state)?;
|
|
||||||
client.put_object()
|
|
||||||
.bucket(RESTIC_ALARM_BUCKET)
|
|
||||||
.key(RESTIC_ALARM_STATE_FILE)
|
|
||||||
.body(s3::primitives::ByteStream::from(data))
|
|
||||||
.send().await?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn read_state(client: &s3::Client) -> eyre::Result<State> {
|
async fn read_state(client: &s3::Client) -> eyre::Result<State> {
|
||||||
let output = client
|
let output = client
|
||||||
.get_object()
|
.get_object()
|
||||||
.bucket(RESTIC_ALARM_BUCKET)
|
.bucket(RESTIC_ALARM_BUCKET)
|
||||||
.key(RESTIC_ALARM_STATE_FILE)
|
.key(RESTIC_ALARM_STATE_FILE)
|
||||||
.send().await;
|
.send()
|
||||||
|
.await;
|
||||||
match output {
|
match output {
|
||||||
Ok(output) => {
|
Ok(output) => {
|
||||||
let state_s = output.body.collect().await
|
let state_s = output
|
||||||
.wrap_err("error reading state file")?
|
.body
|
||||||
.into_bytes();
|
.collect()
|
||||||
|
.await
|
||||||
|
.wrap_err("error reading state file")?
|
||||||
|
.into_bytes();
|
||||||
let state: State = toml::from_slice(&state_s).expect("invalid state file");
|
let state: State = toml::from_slice(&state_s).expect("invalid state file");
|
||||||
Ok(state)
|
Ok(state)
|
||||||
},
|
}
|
||||||
Err(e) if is_no_such_key_error(&e) => {
|
Err(e) if is_no_such_key_error(&e) => {
|
||||||
let state = State { last_alert: HashMap::new() };
|
let state = State {
|
||||||
|
last_alert: HashMap::new(),
|
||||||
|
};
|
||||||
write_state(client, &state).await?;
|
write_state(client, &state).await?;
|
||||||
Ok(state)
|
Ok(state)
|
||||||
},
|
}
|
||||||
Err(err) => Err(err)?,
|
Err(err) => Err(err)?,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -123,7 +142,8 @@ async fn to_watch(client: &s3::Client) -> eyre::Result<Vec<String>> {
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(RESTIC_ALARM_BUCKET)
|
.bucket(RESTIC_ALARM_BUCKET)
|
||||||
.prefix(RESTIC_ALARM_WATCH_DIR)
|
.prefix(RESTIC_ALARM_WATCH_DIR)
|
||||||
.into_paginator().send();
|
.into_paginator()
|
||||||
|
.send();
|
||||||
while let Some(res) = objs_resp.next().await {
|
while let Some(res) = objs_resp.next().await {
|
||||||
let output = res?;
|
let output = res?;
|
||||||
for obj in output.contents() {
|
for obj in output.contents() {
|
||||||
|
@ -140,8 +160,12 @@ async fn read_repo_config(client: &s3::Client, repo: &str) -> eyre::Result<RepoC
|
||||||
.get_object()
|
.get_object()
|
||||||
.bucket(RESTIC_ALARM_BUCKET)
|
.bucket(RESTIC_ALARM_BUCKET)
|
||||||
.key(&repo_config_path)
|
.key(&repo_config_path)
|
||||||
.send().await?;
|
.send()
|
||||||
let config_s = output.body.collect().await
|
.await?;
|
||||||
|
let config_s = output
|
||||||
|
.body
|
||||||
|
.collect()
|
||||||
|
.await
|
||||||
.wrap_err_with(|| format!("reading repo config file {}", &repo_config_path))?
|
.wrap_err_with(|| format!("reading repo config file {}", &repo_config_path))?
|
||||||
.into_bytes();
|
.into_bytes();
|
||||||
let config = toml::from_slice(&config_s)
|
let config = toml::from_slice(&config_s)
|
||||||
|
@ -154,10 +178,11 @@ async fn repo_last_snapshot(client: &s3::Client, repo: &str) -> eyre::Result<Opt
|
||||||
.list_objects_v2()
|
.list_objects_v2()
|
||||||
.bucket(repo)
|
.bucket(repo)
|
||||||
.prefix("snapshots/")
|
.prefix("snapshots/")
|
||||||
.into_paginator().send();
|
.into_paginator()
|
||||||
|
.send();
|
||||||
let mut modtimes = Vec::new();
|
let mut modtimes = Vec::new();
|
||||||
while let Some(res) = objs.next().await {
|
while let Some(res) = objs.next().await {
|
||||||
let output = res?;
|
let output = res.wrap_err("listing repo snapshots")?;
|
||||||
for obj in output.contents() {
|
for obj in output.contents() {
|
||||||
if let Some(last_mod) = obj.last_modified() {
|
if let Some(last_mod) = obj.last_modified() {
|
||||||
modtimes.push(SystemTime::try_from(last_mod.clone()).unwrap());
|
modtimes.push(SystemTime::try_from(last_mod.clone()).unwrap());
|
||||||
|
@ -167,8 +192,7 @@ async fn repo_last_snapshot(client: &s3::Client, repo: &str) -> eyre::Result<Opt
|
||||||
modtimes.sort();
|
modtimes.sort();
|
||||||
match modtimes.last() {
|
match modtimes.last() {
|
||||||
None => Ok(None),
|
None => Ok(None),
|
||||||
Some(time) =>
|
Some(time) => Ok(SystemTime::now().duration_since(*time).ok()),
|
||||||
Ok(SystemTime::now().duration_since(*time).ok())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -176,21 +200,17 @@ async fn repo_last_snapshot(client: &s3::Client, repo: &str) -> eyre::Result<Opt
|
||||||
// (e.g. if it fails to parse).
|
// (e.g. if it fails to parse).
|
||||||
// So the error must not be propagated to the toplevel, which would abort the
|
// So the error must not be propagated to the toplevel, which would abort the
|
||||||
// alert for remaining repositories; it should instead just be reported/logged.
|
// alert for remaining repositories; it should instead just be reported/logged.
|
||||||
async fn check_repo(client: &s3::Client, state: &State, repo: &str) -> eyre::Result<State> {
|
async fn check_repo(client: &s3::Client, state: &mut State, repo: &str) -> eyre::Result<()> {
|
||||||
let config = read_repo_config(client, repo).await?;
|
let config = read_repo_config(client, repo).await?;
|
||||||
let mut state = state.to_owned();
|
if let Some(last_snapshot) = repo_last_snapshot(client, repo).await? {
|
||||||
match repo_last_snapshot(client, repo).await? {
|
if is_alert_needed(&config, last_snapshot, state.last_alert(repo)) {
|
||||||
Some(last_snapshot) => {
|
println!("Sending alert to {} about bucket {}", config.email, repo);
|
||||||
let is_alert = is_alert_needed(&config, last_snapshot, state.last_alert(repo));
|
send_email(&config.email, repo, last_snapshot).await?;
|
||||||
if is_alert {
|
}
|
||||||
println!("Sending alert to {} about bucket {}", config.email, repo);
|
state.update_last_alert(repo);
|
||||||
send_email(&config.email, repo, last_snapshot).await?;
|
write_state(client, state).await?;
|
||||||
}
|
|
||||||
state.update_last_alert(repo)
|
|
||||||
},
|
|
||||||
None => (),
|
|
||||||
}
|
}
|
||||||
Ok(state)
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
#[::tokio::main]
|
#[::tokio::main]
|
||||||
|
@ -206,11 +226,8 @@ async fn main() -> eyre::Result<()> {
|
||||||
let repos = to_watch(&client).await?;
|
let repos = to_watch(&client).await?;
|
||||||
let mut state = read_state(&client).await?;
|
let mut state = read_state(&client).await?;
|
||||||
for repo in repos {
|
for repo in repos {
|
||||||
match check_repo(&client, &state, &repo).await {
|
match check_repo(&client, &mut state, &repo).await {
|
||||||
Ok(new_state) => {
|
Ok(()) => (),
|
||||||
state = new_state;
|
|
||||||
write_state(&client, &state).await?;
|
|
||||||
},
|
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
// is this the best way to log the error?
|
// is this the best way to log the error?
|
||||||
eprintln!("Error: {:?}", err)
|
eprintln!("Error: {:?}", err)
|
||||||
|
|
Loading…
Reference in a new issue