Refresh user data periodically

This commit is contained in:
Armaël Guéneau 2024-12-18 12:18:27 +01:00
parent f801c26d34
commit df217ab86c
5 changed files with 124 additions and 27 deletions

View file

@ -18,9 +18,6 @@
- take concrete actions for spam accounts: lock the account, send a warning - take concrete actions for spam accounts: lock the account, send a warning
email, then delete+purge account after some time. email, then delete+purge account after some time.
- allow changing the classification of already-classified users - allow changing the classification of already-classified users
- periodically refresh the database of users from forgejo, and merge them with
the local db, handling updates in users data (triggering re-classification if
needed)
- add backend to store data on garage instead of local files - add backend to store data on garage instead of local files
- replace the `api_token` file with a better mechanism: oauth maybe? - replace the `api_token` file with a better mechanism: oauth maybe?
- improve error handling - improve error handling

View file

@ -2,6 +2,7 @@ use std::collections::HashMap;
use std::path::Path; use std::path::Path;
use std::fs::File; use std::fs::File;
use std::io::{BufReader, BufWriter}; use std::io::{BufReader, BufWriter};
use std::time::{Duration, SystemTime};
use crate::data::*; use crate::data::*;
use crate::classifier::Classifier; use crate::classifier::Classifier;
@ -11,6 +12,7 @@ pub struct Db {
// persisted data // persisted data
pub users: HashMap<UserId, UserData>, pub users: HashMap<UserId, UserData>,
pub is_spam: HashMap<UserId, bool>, pub is_spam: HashMap<UserId, bool>,
last_scrape: u64,
// caches: computed from persisted data on load // caches: computed from persisted data on load
pub score: HashMap<UserId, f32>, pub score: HashMap<UserId, f32>,
pub tokens: HashMap<UserId, Vec<String>>, pub tokens: HashMap<UserId, Vec<String>>,
@ -29,12 +31,25 @@ impl Db {
} }
} }
/// Timestamp of the most recent user-data scrape, reconstructed from the
/// persisted seconds-since-Unix-epoch counter.
pub fn last_scrape(&self) -> SystemTime {
    let since_epoch = Duration::from_secs(self.last_scrape);
    std::time::UNIX_EPOCH + since_epoch
}
pub fn set_last_scrape_to_now(&mut self) {
self.last_scrape =
SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d: Duration| d.as_secs())
.unwrap_or(0);
}
pub fn from_path(path: &Path, classifier: &Classifier) -> anyhow::Result<Self> { pub fn from_path(path: &Path, classifier: &Classifier) -> anyhow::Result<Self> {
let file = File::open(path)?; let file = File::open(path)?;
let (users, is_spam) = serde_json::from_reader(BufReader::new(file))?; let (users, is_spam, last_scrape) = serde_json::from_reader(BufReader::new(file))?;
let mut db = Db { let mut db = Db {
users, users,
is_spam, is_spam,
last_scrape,
tokens: HashMap::new(), tokens: HashMap::new(),
score: HashMap::new(), score: HashMap::new(),
}; };
@ -51,17 +66,18 @@ impl Db {
let mut db = Db { let mut db = Db {
users, users,
is_spam, is_spam,
last_scrape: 0,
tokens: HashMap::new(), tokens: HashMap::new(),
score: HashMap::new(), score: HashMap::new(),
}; };
db.recompute_tokens(); db.recompute_tokens();
db.recompute_scores(classifier); db.recompute_scores(classifier);
db.set_last_scrape_to_now();
db db
} }
pub fn store_to_path(&self, path: &Path) -> anyhow::Result<()> { pub fn store_to_path(&self, path: &Path) -> anyhow::Result<()> {
let file = File::create(path)?; let file = File::create(path)?;
let dat: (&HashMap<UserId, UserData>, &HashMap<UserId, bool>) =
(&self.users, &self.is_spam); (&self.users, &self.is_spam);
serde_json::to_writer(BufWriter::new(file), &dat)?; serde_json::to_writer(BufWriter::new(file), &dat)?;
Ok(()) Ok(())

View file

@ -6,7 +6,8 @@ use serde::Deserialize;
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
use std::path::Path; use std::path::Path;
use std::sync::Mutex; use std::sync::{Arc, Mutex};
use std::time::Duration;
use tera::Tera; use tera::Tera;
mod classifier; mod classifier;
@ -18,14 +19,18 @@ use classifier::Classifier;
use data::*; use data::*;
use db::Db; use db::Db;
async fn load_db() -> anyhow::Result<(Db, Classifier)> { // Fetch user data from forgejo from time to time
let model_path = Path::new("model.json"); const FORGEJO_POLL_DELAY: Duration = Duration::from_secs(11 * 3600); // 11 hours
let classifier = if model_path.is_file() {
Classifier::new_from_pre_trained(&mut File::open(model_path)?)?
} else {
Classifier::new()
};
// Heuristic score thresholds used for:
// - the display color when displaying unclassified users (green/orange/red)
// - choosing when to remove an existing classification after a user's data changes
//
// Never used to *automatically* classify a user as spam/legit!
const GUESS_SPAM_THRESHOLD: f32 = 0.8;
const GUESS_LEGIT_THRESHOLD: f32 = 0.3;
fn forge() -> anyhow::Result<Forgejo> {
let api_token = std::fs::read_to_string(Path::new("api_token"))? let api_token = std::fs::read_to_string(Path::new("api_token"))?
.trim() .trim()
.to_string(); .to_string();
@ -33,12 +38,22 @@ async fn load_db() -> anyhow::Result<(Db, Classifier)> {
Auth::Token(&api_token), Auth::Token(&api_token),
url::Url::parse("https://git.deuxfleurs.fr")?, url::Url::parse("https://git.deuxfleurs.fr")?,
)?; )?;
Ok(forge)
}
async fn load_db(forge: &Forgejo) -> anyhow::Result<(Db, Classifier)> {
let model_path = Path::new("model.json");
let classifier = if model_path.is_file() {
Classifier::new_from_pre_trained(&mut File::open(model_path)?)?
} else {
Classifier::new()
};
let db_path = Path::new("db.json"); let db_path = Path::new("db.json");
let db: Db = if db_path.is_file() { let db: Db = if db_path.is_file() {
Db::from_path(db_path, &classifier)? Db::from_path(db_path, &classifier)?
} else { } else {
let db = Db::from_users(scrape::get_users_data(&forge).await?, HashMap::new(), &classifier); let db = Db::from_users(scrape::get_user_data(&forge).await?, HashMap::new(), &classifier);
db.store_to_path(db_path)?; db.store_to_path(db_path)?;
db db
}; };
@ -63,7 +78,10 @@ fn set_spam(db: &mut Db, classifier: &mut Classifier, ids: &[(UserId, bool)]) {
// classification conflict between concurrent queries. // classification conflict between concurrent queries.
// In this case we play it safe and erase the classification for this user; // In this case we play it safe and erase the classification for this user;
// it will need to be manually classified again. // it will need to be manually classified again.
eprintln!("Classification conflict (uid %d), forget current user classification"); eprintln!(
"Classification conflict for user {}; discarding our current classification",
db.users.get(user_id).unwrap().login
);
db.is_spam.remove(user_id); db.is_spam.remove(user_id);
}, },
_ => { _ => {
@ -81,7 +99,7 @@ lazy_static! {
match Tera::new("templates/**/*.html") { match Tera::new("templates/**/*.html") {
Ok(t) => t, Ok(t) => t,
Err(e) => { Err(e) => {
println!("Parsing error(s): {}", e); eprintln!("Parsing error(s): {}", e);
::std::process::exit(1); ::std::process::exit(1);
} }
} }
@ -89,8 +107,8 @@ lazy_static! {
} }
struct AppState { struct AppState {
db: Mutex<Db>, db: Arc<Mutex<Db>>,
classifier: Mutex<Classifier>, classifier: Arc<Mutex<Classifier>>,
} }
#[derive(Debug, Deserialize)] #[derive(Debug, Deserialize)]
@ -105,7 +123,7 @@ async fn index(data: web::Data<AppState>, q: web::Query<SortSetting>) -> impl Re
let db = &data.db.lock().unwrap(); let db = &data.db.lock().unwrap();
eprintln!("scoring users..."); eprintln!("scoring users...");
let mut users: Vec<_> = db let mut users: Vec<(&UserId, &UserData, f32)> = db
.unclassified_users() .unclassified_users()
.into_iter() .into_iter()
.map(|(id, u)| (id, u, *db.score.get(id).unwrap())) .map(|(id, u)| (id, u, *db.score.get(id).unwrap()))
@ -135,6 +153,8 @@ async fn index(data: web::Data<AppState>, q: web::Query<SortSetting>) -> impl Re
let classified_count = db.is_spam.len(); let classified_count = db.is_spam.len();
let mut context = tera::Context::new(); let mut context = tera::Context::new();
context.insert("spam_threshold", &GUESS_SPAM_THRESHOLD);
context.insert("legit_threshold", &GUESS_LEGIT_THRESHOLD);
context.insert("users", &users); context.insert("users", &users);
context.insert( context.insert(
"unclassified_users_count", "unclassified_users_count",
@ -171,20 +191,84 @@ async fn apply(data: web::Data<AppState>, req: web::Form<HashMap<i64, String>>)
.finish() .finish()
} }
/// Re-fetch user data from the forge when the last scrape is older than
/// [`FORGEJO_POLL_DELAY`], rebuild the database from the fresh data, and
/// persist it to `db.json`.
///
/// Users deleted on the forge since the last fetch are dropped. Existing
/// spam/legit classifications are carried over, except when a user's
/// recomputed score crossed the relevant threshold, in which case the
/// classification is discarded so the user gets reviewed again.
///
/// Errors: propagates clock errors (`elapsed`), fetch errors, and storage
/// errors to the caller (the refresh loop logs them).
async fn refresh_user_data(forge: &Forgejo, db: Arc<Mutex<Db>>, classifier: Arc<Mutex<Classifier>>) -> anyhow::Result<()> {
    // Check the age of the last scrape in a short critical section: the
    // std Mutex guard must not be held across the long-running fetch below.
    {
        let db = &db.lock().unwrap();
        let d = db.last_scrape().elapsed()?;
        if d < FORGEJO_POLL_DELAY {
            return Ok(());
        }
    }

    eprintln!("Fetching user data");
    let users = scrape::get_user_data(forge).await?;

    let db: &mut Db = &mut *db.lock().unwrap();
    let classifier = &classifier.lock().unwrap();

    // NB: Some user accounts may have been deleted since last fetch (hopefully
    // they were spammers).
    // Such users will appear in the current [db] but not in the new [users].
    // We don't want to keep them in the database, so we rebuild a fresh [db]
    // containing only data for users who still exist.
    let mut newdb = Db::from_users(users, HashMap::new(), classifier);

    // Import spam classification from the previous Db
    for (&user_id, user_data) in &newdb.users {
        let &score = newdb.score.get(&user_id).unwrap();
        if let Some(&user_was_spam) = db.is_spam.get(&user_id) {
            if (user_was_spam && score < GUESS_SPAM_THRESHOLD) ||
                (! user_was_spam && score > GUESS_LEGIT_THRESHOLD)
            {
                eprintln!(
                    "Score for user {} changed past threshold; discarding our current classification",
                    user_data.login
                );
            } else {
                newdb.is_spam.insert(user_id, user_was_spam);
            }
        }
    }

    // Switch to [newdb] and persist it. Propagate storage errors instead of
    // panicking (was `.unwrap() // FIXME`): a transient disk error should
    // not take the whole server down.
    *db = newdb;
    db.store_to_path(Path::new("db.json"))?;

    Ok(())
}
/// Background task: wake up periodically and try to refresh the user
/// database. Errors are logged and the loop keeps running.
///
/// The wake-up interval is a tenth of the poll delay; `refresh_user_data`
/// itself decides whether enough time has elapsed to actually re-fetch.
async fn refresh_user_data_loop(forge: Arc<Forgejo>, db: Arc<Mutex<Db>>, classifier: Arc<Mutex<Classifier>>) {
    let wakeup_interval = FORGEJO_POLL_DELAY.mul_f32(0.1);
    loop {
        tokio::time::sleep(wakeup_interval).await;
        match refresh_user_data(&forge, db.clone(), classifier.clone()).await {
            Ok(()) => {}
            Err(e) => eprintln!("Error refreshing user data: {:?}", e),
        }
    }
}
#[actix_web::main] #[actix_web::main]
async fn main() -> std::io::Result<()> { async fn main() -> std::io::Result<()> {
eprintln!("Eval templates"); eprintln!("Eval templates");
let _ = *TEMPLATES; let _ = *TEMPLATES;
eprintln!("Load users and repos"); eprintln!("Load users and repos");
let (db, classifier) = load_db().await.unwrap(); // FIXME let forge = Arc::new(forge().unwrap()); // FIXME
let (db, classifier) = load_db(&forge).await.unwrap(); // FIXME
let db = Arc::new(Mutex::new(db));
let classifier = Arc::new(Mutex::new(classifier));
let st = web::Data::new(AppState { let st = web::Data::new(AppState {
db: Mutex::new(db), db: db.clone(),
classifier: Mutex::new(classifier), classifier: classifier.clone(),
}); });
println!("Launch web server at http://127.0.0.1:8080"); let _ = tokio::spawn(async move {
refresh_user_data_loop(forge.clone(), db.clone(), classifier.clone()).await
});
println!("Listening on http://127.0.0.1:8080");
HttpServer::new(move || { HttpServer::new(move || {
App::new() App::new()

View file

@ -68,7 +68,7 @@ async fn scrape_users(forge: &Forgejo) -> anyhow::Result<Vec<forgejo_api::struct
Ok(users) Ok(users)
} }
pub async fn get_users_data(forge: &Forgejo) -> anyhow::Result<HashMap<UserId, UserData>> { pub async fn get_user_data(forge: &Forgejo) -> anyhow::Result<HashMap<UserId, UserData>> {
let mut data = HashMap::new(); let mut data = HashMap::new();
let discard_empty = |o: Option<String>| { let discard_empty = |o: Option<String>| {

View file

@ -124,9 +124,9 @@
<label for="{{user_id}}-legit">Legit</label> <label for="{{user_id}}-legit">Legit</label>
</div> </div>
<div class="score <div class="score
{% if score >= 0.8 %} score-high {% endif %} {% if score >= spam_threshold %} score-high {% endif %}
{% if score < 0.8 and score > 0.3 %} score-mid {% endif %} {% if score < spam_threshold and score > legit_threshold %} score-mid {% endif %}
{% if score <= 0.3 %} score-low {% endif %} {% if score <= legit_threshold %} score-low {% endif %}
"> ">
{{ score | round(precision=2) }} {{ score | round(precision=2) }}
</div> </div>