refactor: move the refresh job to a separate file
This commit is contained in:
parent
df217ab86c
commit
6a23483073
2 changed files with 70 additions and 59 deletions
62
src/main.rs
62
src/main.rs
|
@ -12,8 +12,9 @@ use tera::Tera;
|
||||||
|
|
||||||
mod classifier;
|
mod classifier;
|
||||||
mod data;
|
mod data;
|
||||||
mod scrape;
|
|
||||||
mod db;
|
mod db;
|
||||||
|
mod scrape;
|
||||||
|
mod workers;
|
||||||
|
|
||||||
use classifier::Classifier;
|
use classifier::Classifier;
|
||||||
use data::*;
|
use data::*;
|
||||||
|
@ -191,63 +192,6 @@ async fn apply(data: web::Data<AppState>, req: web::Form<HashMap<i64, String>>)
|
||||||
.finish()
|
.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn refresh_user_data(forge: &Forgejo, db: Arc<Mutex<Db>>, classifier: Arc<Mutex<Classifier>>) -> anyhow::Result<()> {
|
|
||||||
{
|
|
||||||
let db = &db.lock().unwrap();
|
|
||||||
let d = db.last_scrape().elapsed()?;
|
|
||||||
if d < FORGEJO_POLL_DELAY {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
eprintln!("Fetching user data");
|
|
||||||
let users = scrape::get_user_data(forge).await?;
|
|
||||||
|
|
||||||
let db: &mut Db = &mut *db.lock().unwrap();
|
|
||||||
let classifier = &classifier.lock().unwrap();
|
|
||||||
|
|
||||||
// NB: Some user accounts may have been deleted since last fetch (hopefully
|
|
||||||
// they were spammers).
|
|
||||||
// Such users will appear in the current [db] but not in the new [users].
|
|
||||||
// We don't want to keep them in the database, so we rebuild a fresh [db]
|
|
||||||
// containing only data for users who still exist.
|
|
||||||
|
|
||||||
let mut newdb = Db::from_users(users, HashMap::new(), classifier);
|
|
||||||
|
|
||||||
// Import spam classification from the previous Db
|
|
||||||
for (&user_id, user_data) in &newdb.users {
|
|
||||||
let &score = newdb.score.get(&user_id).unwrap();
|
|
||||||
if let Some(&user_was_spam) = db.is_spam.get(&user_id) {
|
|
||||||
if (user_was_spam && score < GUESS_SPAM_THRESHOLD) ||
|
|
||||||
(! user_was_spam && score > GUESS_LEGIT_THRESHOLD)
|
|
||||||
{
|
|
||||||
eprintln!(
|
|
||||||
"Score for user {} changed past threshold; discarding our current classification",
|
|
||||||
user_data.login
|
|
||||||
);
|
|
||||||
} else {
|
|
||||||
newdb.is_spam.insert(user_id, user_was_spam);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// switch to [newdb]
|
|
||||||
let _ = std::mem::replace(db, newdb);
|
|
||||||
|
|
||||||
db.store_to_path(Path::new("db.json")).unwrap(); // FIXME
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn refresh_user_data_loop(forge: Arc<Forgejo>, db: Arc<Mutex<Db>>, classifier: Arc<Mutex<Classifier>>) {
|
|
||||||
loop {
|
|
||||||
tokio::time::sleep(FORGEJO_POLL_DELAY.mul_f32(0.1)).await;
|
|
||||||
if let Err(e) = refresh_user_data(&forge, db.clone(), classifier.clone()).await {
|
|
||||||
eprintln!("Error refreshing user data: {:?}", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[actix_web::main]
|
#[actix_web::main]
|
||||||
async fn main() -> std::io::Result<()> {
|
async fn main() -> std::io::Result<()> {
|
||||||
eprintln!("Eval templates");
|
eprintln!("Eval templates");
|
||||||
|
@ -265,7 +209,7 @@ async fn main() -> std::io::Result<()> {
|
||||||
});
|
});
|
||||||
|
|
||||||
let _ = tokio::spawn(async move {
|
let _ = tokio::spawn(async move {
|
||||||
refresh_user_data_loop(forge.clone(), db.clone(), classifier.clone())
|
workers::refresh_user_data(forge.clone(), db.clone(), classifier.clone())
|
||||||
});
|
});
|
||||||
|
|
||||||
println!("Listening on http://127.0.0.1:8080");
|
println!("Listening on http://127.0.0.1:8080");
|
||||||
|
|
67
src/workers.rs
Normal file
67
src/workers.rs
Normal file
|
@ -0,0 +1,67 @@
|
||||||
|
use std::sync::{Arc, Mutex};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
use std::path::Path;
|
||||||
|
use forgejo_api::Forgejo;
|
||||||
|
use crate::db::Db;
|
||||||
|
use crate::classifier::Classifier;
|
||||||
|
use crate::scrape;
|
||||||
|
|
||||||
|
use crate::FORGEJO_POLL_DELAY;
|
||||||
|
use crate::{GUESS_LEGIT_THRESHOLD, GUESS_SPAM_THRESHOLD};
|
||||||
|
|
||||||
|
async fn try_refresh_user_data(forge: &Forgejo, db: Arc<Mutex<Db>>, classifier: Arc<Mutex<Classifier>>) -> anyhow::Result<()> {
|
||||||
|
{
|
||||||
|
let db = &db.lock().unwrap();
|
||||||
|
let d = db.last_scrape().elapsed()?;
|
||||||
|
if d < FORGEJO_POLL_DELAY {
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
eprintln!("Fetching user data");
|
||||||
|
let users = scrape::get_user_data(forge).await?;
|
||||||
|
|
||||||
|
let db: &mut Db = &mut *db.lock().unwrap();
|
||||||
|
let classifier = &classifier.lock().unwrap();
|
||||||
|
|
||||||
|
// NB: Some user accounts may have been deleted since last fetch (hopefully
|
||||||
|
// they were spammers).
|
||||||
|
// Such users will appear in the current [db] but not in the new [users].
|
||||||
|
// We don't want to keep them in the database, so we rebuild a fresh [db]
|
||||||
|
// containing only data for users who still exist.
|
||||||
|
|
||||||
|
let mut newdb = Db::from_users(users, HashMap::new(), classifier);
|
||||||
|
|
||||||
|
// Import spam classification from the previous Db
|
||||||
|
for (&user_id, user_data) in &newdb.users {
|
||||||
|
let &score = newdb.score.get(&user_id).unwrap();
|
||||||
|
if let Some(&user_was_spam) = db.is_spam.get(&user_id) {
|
||||||
|
if (user_was_spam && score < GUESS_SPAM_THRESHOLD) ||
|
||||||
|
(! user_was_spam && score > GUESS_LEGIT_THRESHOLD)
|
||||||
|
{
|
||||||
|
eprintln!(
|
||||||
|
"Score for user {} changed past threshold; discarding our current classification",
|
||||||
|
user_data.login
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
newdb.is_spam.insert(user_id, user_was_spam);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// switch to [newdb]
|
||||||
|
let _ = std::mem::replace(db, newdb);
|
||||||
|
|
||||||
|
db.store_to_path(Path::new("db.json")).unwrap(); // FIXME
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn refresh_user_data(forge: Arc<Forgejo>, db: Arc<Mutex<Db>>, classifier: Arc<Mutex<Classifier>>) {
|
||||||
|
loop {
|
||||||
|
tokio::time::sleep(FORGEJO_POLL_DELAY.mul_f32(0.1)).await;
|
||||||
|
if let Err(e) = try_refresh_user_data(&forge, db.clone(), classifier.clone()).await {
|
||||||
|
eprintln!("Error refreshing user data: {:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in a new issue