diff --git a/README.md b/README.md index 01f1fd0..c213741 100644 --- a/README.md +++ b/README.md @@ -18,9 +18,6 @@ - take concrete actions for spam accounts: lock the account, send a warning email, then delete+purge account after some time. - allow changing the classification of already-classified users -- periodically refresh the database of users from forgejo, and merge them with - the local db, handling updates in users data (triggering re-classification if - needed) - add backend to store data on garage instead of local files - replate the `api_token` file with a better mechanism: oauth maybe? - improve error handling diff --git a/src/db.rs b/src/db.rs index 2b1c35e..8dddc18 100644 --- a/src/db.rs +++ b/src/db.rs @@ -2,6 +2,7 @@ use std::collections::HashMap; use std::path::Path; use std::fs::File; use std::io::{BufReader, BufWriter}; +use std::time::{Duration, SystemTime}; use crate::data::*; use crate::classifier::Classifier; @@ -11,6 +12,7 @@ pub struct Db { // persisted data pub users: HashMap, pub is_spam: HashMap, + last_scrape: u64, // caches: computed from persisted data on load pub score: HashMap, pub tokens: HashMap>, @@ -29,12 +31,25 @@ impl Db { } } + pub fn last_scrape(&self) -> SystemTime { + std::time::UNIX_EPOCH + Duration::from_secs(self.last_scrape) + } + + pub fn set_last_scrape_to_now(&mut self) { + self.last_scrape = + SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .map(|d: Duration| d.as_secs()) + .unwrap_or(0); + } + pub fn from_path(path: &Path, classifier: &Classifier) -> anyhow::Result { let file = File::open(path)?; - let (users, is_spam) = serde_json::from_reader(BufReader::new(file))?; + let (users, is_spam, last_scrape) = serde_json::from_reader(BufReader::new(file))?; let mut db = Db { users, is_spam, + last_scrape, tokens: HashMap::new(), score: HashMap::new(), }; @@ -51,17 +66,18 @@ impl Db { let mut db = Db { users, is_spam, + last_scrape: 0, tokens: HashMap::new(), score: HashMap::new(), }; db.recompute_tokens(); 
db.recompute_scores(classifier); + db.set_last_scrape_to_now(); db } pub fn store_to_path(&self, path: &Path) -> anyhow::Result<()> { let file = File::create(path)?; - let dat: (&HashMap, &HashMap) = (&self.users, &self.is_spam); serde_json::to_writer(BufWriter::new(file), &dat)?; Ok(()) diff --git a/src/main.rs b/src/main.rs index 106f541..a1f0884 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,7 +6,8 @@ use serde::Deserialize; use std::collections::HashMap; use std::fs::File; use std::path::Path; -use std::sync::Mutex; +use std::sync::{Arc, Mutex}; +use std::time::Duration; use tera::Tera; mod classifier; @@ -18,14 +19,18 @@ use classifier::Classifier; use data::*; use db::Db; -async fn load_db() -> anyhow::Result<(Db, Classifier)> { - let model_path = Path::new("model.json"); - let classifier = if model_path.is_file() { - Classifier::new_from_pre_trained(&mut File::open(model_path)?)? - } else { - Classifier::new() - }; +// Fetch user data from forgejo from time to time +const FORGEJO_POLL_DELAY: Duration = Duration::from_secs(11 * 3600); // 11 hours +// Heuristic score thresholds used for: +// - the display color when displaying unclassified users (green/orange/red) +// - choosing when to remove an existing classification after a user's data changes +// +// Never used to *automatically* classify a user as spam/legit! +const GUESS_SPAM_THRESHOLD: f32 = 0.8; +const GUESS_LEGIT_THRESHOLD: f32 = 0.3; + +fn forge() -> anyhow::Result { let api_token = std::fs::read_to_string(Path::new("api_token"))? .trim() .to_string(); @@ -33,12 +38,22 @@ async fn load_db() -> anyhow::Result<(Db, Classifier)> { Auth::Token(&api_token), url::Url::parse("https://git.deuxfleurs.fr")?, )?; + Ok(forge) +} + +async fn load_db(forge: &Forgejo) -> anyhow::Result<(Db, Classifier)> { + let model_path = Path::new("model.json"); + let classifier = if model_path.is_file() { + Classifier::new_from_pre_trained(&mut File::open(model_path)?)?
+ } else { + Classifier::new() + }; let db_path = Path::new("db.json"); let db: Db = if db_path.is_file() { Db::from_path(db_path, &classifier)? } else { - let db = Db::from_users(scrape::get_users_data(&forge).await?, HashMap::new(), &classifier); + let db = Db::from_users(scrape::get_user_data(&forge).await?, HashMap::new(), &classifier); db.store_to_path(db_path)?; db }; @@ -63,7 +78,10 @@ fn set_spam(db: &mut Db, classifier: &mut Classifier, ids: &[(UserId, bool)]) { // classification conflict between concurrent queries. // In this case we play it safe and erase the classification for this user; // it will need to be manually classified again. - eprintln!("Classification conflict (uid %d), forget current user classification"); + eprintln!( + "Classification conflict for user {}; discarding our current classification", + db.users.get(user_id).unwrap().login + ); db.is_spam.remove(user_id); }, _ => { @@ -81,7 +99,7 @@ lazy_static! { match Tera::new("templates/**/*.html") { Ok(t) => t, Err(e) => { - println!("Parsing error(s): {}", e); + eprintln!("Parsing error(s): {}", e); ::std::process::exit(1); } } @@ -89,8 +107,8 @@ lazy_static! 
{ } struct AppState { - db: Mutex, - classifier: Mutex, + db: Arc>, + classifier: Arc>, } #[derive(Debug, Deserialize)] @@ -105,7 +123,7 @@ async fn index(data: web::Data, q: web::Query) -> impl Re let db = &data.db.lock().unwrap(); eprintln!("scoring users..."); - let mut users: Vec<_> = db + let mut users: Vec<(&UserId, &UserData, f32)> = db .unclassified_users() .into_iter() .map(|(id, u)| (id, u, *db.score.get(id).unwrap())) @@ -135,6 +153,8 @@ async fn index(data: web::Data, q: web::Query) -> impl Re let classified_count = db.is_spam.len(); let mut context = tera::Context::new(); + context.insert("spam_threshold", &GUESS_SPAM_THRESHOLD); + context.insert("legit_threshold", &GUESS_LEGIT_THRESHOLD); context.insert("users", &users); context.insert( "unclassified_users_count", @@ -171,20 +191,84 @@ async fn apply(data: web::Data, req: web::Form>) .finish() } +async fn refresh_user_data(forge: &Forgejo, db: Arc>, classifier: Arc>) -> anyhow::Result<()> { + { + let db = &db.lock().unwrap(); + let d = db.last_scrape().elapsed()?; + if d < FORGEJO_POLL_DELAY { + return Ok(()); + } + } + + eprintln!("Fetching user data"); + let users = scrape::get_user_data(forge).await?; + + let db: &mut Db = &mut *db.lock().unwrap(); + let classifier = &classifier.lock().unwrap(); + + // NB: Some user accounts may have been deleted since last fetch (hopefully + // they were spammers). + // Such users will appear in the current [db] but not in the new [users]. + // We don't want to keep them in the database, so we rebuild a fresh [db] + // containing only data for users who still exist. + + let mut newdb = Db::from_users(users, HashMap::new(), classifier); + + // Import spam classification from the previous Db + for (&user_id, user_data) in &newdb.users { + let &score = newdb.score.get(&user_id).unwrap(); + if let Some(&user_was_spam) = db.is_spam.get(&user_id) { + if (user_was_spam && score < GUESS_SPAM_THRESHOLD) || + (! 
user_was_spam && score > GUESS_LEGIT_THRESHOLD) + { + eprintln!( + "Score for user {} changed past threshold; discarding our current classification", + user_data.login + ); + } else { + newdb.is_spam.insert(user_id, user_was_spam); + } + } + } + + // switch to [newdb] + let _ = std::mem::replace(db, newdb); + + db.store_to_path(Path::new("db.json"))?; // propagate to the caller, which logs the error + + Ok(()) +} + +async fn refresh_user_data_loop(forge: Arc, db: Arc>, classifier: Arc>) { + loop { + tokio::time::sleep(FORGEJO_POLL_DELAY.mul_f32(0.1)).await; + if let Err(e) = refresh_user_data(&forge, db.clone(), classifier.clone()).await { + eprintln!("Error refreshing user data: {:?}", e); + } + } +} + #[actix_web::main] async fn main() -> std::io::Result<()> { eprintln!("Eval templates"); let _ = *TEMPLATES; eprintln!("Load users and repos"); - let (db, classifier) = load_db().await.unwrap(); // FIXME + let forge = Arc::new(forge().unwrap()); // FIXME + let (db, classifier) = load_db(&forge).await.unwrap(); // FIXME + let db = Arc::new(Mutex::new(db)); + let classifier = Arc::new(Mutex::new(classifier)); let st = web::Data::new(AppState { - db: Mutex::new(db), - classifier: Mutex::new(classifier), + db: db.clone(), + classifier: classifier.clone(), }); - println!("Launch web server at http://127.0.0.1:8080"); + let _ = tokio::spawn(async move { + refresh_user_data_loop(forge.clone(), db.clone(), classifier.clone()).await // must be awaited, else the loop is never polled + }); + + println!("Listening on http://127.0.0.1:8080"); HttpServer::new(move || { App::new() diff --git a/src/scrape.rs b/src/scrape.rs index c02f166..aea3a3f 100644 --- a/src/scrape.rs +++ b/src/scrape.rs @@ -68,7 +68,7 @@ async fn scrape_users(forge: &Forgejo) -> anyhow::Result anyhow::Result> { +pub async fn get_user_data(forge: &Forgejo) -> anyhow::Result> { let mut data = HashMap::new(); let discard_empty = |o: Option| { diff --git a/templates/index.html b/templates/index.html index 64e78e9..822db7b 100644 --- a/templates/index.html +++ b/templates/index.html @@ -124,9 +124,9
@@
{{ score | round(precision=2) }}