forked from Deuxfleurs/garage
Log which workers are doing what
This commit is contained in:
parent
ec7f9f07e2
commit
53cf4d1baa
5 changed files with 25 additions and 16 deletions
|
@ -182,7 +182,9 @@ impl AdminRpcHandler {
|
||||||
self.garage
|
self.garage
|
||||||
.system
|
.system
|
||||||
.background
|
.background
|
||||||
.spawn_worker(move |must_exit| async move { self2.repair_worker(must_exit).await })
|
.spawn_worker("Repair worker".into(), move |must_exit| async move {
|
||||||
|
self2.repair_worker(must_exit).await
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
Ok(AdminRPC::Ok(format!(
|
Ok(AdminRPC::Ok(format!(
|
||||||
"Repair launched on {:?}",
|
"Repair launched on {:?}",
|
||||||
|
|
|
@ -36,8 +36,8 @@ impl BackgroundRunner {
|
||||||
|
|
||||||
pub async fn run(self: Arc<Self>) {
|
pub async fn run(self: Arc<Self>) {
|
||||||
let mut workers = self.workers.lock().await;
|
let mut workers = self.workers.lock().await;
|
||||||
for _i in 0..self.n_runners {
|
for i in 0..self.n_runners {
|
||||||
workers.push(tokio::spawn(self.clone().runner()));
|
workers.push(tokio::spawn(self.clone().runner(i)));
|
||||||
}
|
}
|
||||||
drop(workers);
|
drop(workers);
|
||||||
|
|
||||||
|
@ -68,7 +68,7 @@ impl BackgroundRunner {
|
||||||
let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
|
let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn spawn_worker<F, T>(&self, worker: F)
|
pub async fn spawn_worker<F, T>(&self, name: String, worker: F)
|
||||||
where
|
where
|
||||||
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
|
F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
|
||||||
T: Future<Output = JobOutput> + Send + 'static,
|
T: Future<Output = JobOutput> + Send + 'static,
|
||||||
|
@ -77,14 +77,14 @@ impl BackgroundRunner {
|
||||||
let stop_signal = self.stop_signal.clone();
|
let stop_signal = self.stop_signal.clone();
|
||||||
workers.push(tokio::spawn(async move {
|
workers.push(tokio::spawn(async move {
|
||||||
if let Err(e) = worker(stop_signal).await {
|
if let Err(e) = worker(stop_signal).await {
|
||||||
eprintln!("Worker stopped with error: {}", e);
|
eprintln!("Worker stopped with error: {}, error: {}", name, e);
|
||||||
} else {
|
} else {
|
||||||
println!("A worker exited successfully (which one?)");
|
println!("Worker exited successfully: {}", name);
|
||||||
}
|
}
|
||||||
}));
|
}));
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn runner(self: Arc<Self>) {
|
async fn runner(self: Arc<Self>, i: usize) {
|
||||||
let stop_signal = self.stop_signal.clone();
|
let stop_signal = self.stop_signal.clone();
|
||||||
loop {
|
loop {
|
||||||
let must_exit: bool = *stop_signal.borrow();
|
let must_exit: bool = *stop_signal.borrow();
|
||||||
|
@ -94,6 +94,7 @@ impl BackgroundRunner {
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
if must_exit {
|
if must_exit {
|
||||||
|
eprintln!("Background runner {} exiting", i);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
tokio::time::delay_for(Duration::from_secs(1)).await;
|
tokio::time::delay_for(Duration::from_secs(1)).await;
|
||||||
|
|
|
@ -107,11 +107,13 @@ impl BlockManager {
|
||||||
|
|
||||||
pub async fn spawn_background_worker(self: Arc<Self>) {
|
pub async fn spawn_background_worker(self: Arc<Self>) {
|
||||||
// Launch 2 simultaneous workers for background resync loop preprocessing
|
// Launch 2 simultaneous workers for background resync loop preprocessing
|
||||||
for _i in 0..2usize {
|
for i in 0..2usize {
|
||||||
let bm2 = self.clone();
|
let bm2 = self.clone();
|
||||||
self.system
|
self.system
|
||||||
.background
|
.background
|
||||||
.spawn_worker(move |must_exit| bm2.resync_loop(must_exit))
|
.spawn_worker(format!("block resync worker {}", i), move |must_exit| {
|
||||||
|
bm2.resync_loop(must_exit)
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -391,7 +391,9 @@ impl System {
|
||||||
|
|
||||||
self.clone()
|
self.clone()
|
||||||
.background
|
.background
|
||||||
.spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok))
|
.spawn_worker(format!("ping loop"), |stop_signal| {
|
||||||
|
self.ping_loop(stop_signal).map(Ok)
|
||||||
|
})
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -96,18 +96,20 @@ where
|
||||||
table
|
table
|
||||||
.system
|
.system
|
||||||
.background
|
.background
|
||||||
.spawn_worker(move |must_exit: watch::Receiver<bool>| {
|
.spawn_worker(
|
||||||
s1.watcher_task(must_exit, busy_rx)
|
format!("table sync watcher for {}", table.name),
|
||||||
})
|
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
let s2 = syncer.clone();
|
let s2 = syncer.clone();
|
||||||
table
|
table
|
||||||
.system
|
.system
|
||||||
.background
|
.background
|
||||||
.spawn_worker(move |must_exit: watch::Receiver<bool>| {
|
.spawn_worker(
|
||||||
s2.syncer_task(must_exit, busy_tx)
|
format!("table syncer for {}", table.name),
|
||||||
})
|
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
|
|
||||||
syncer
|
syncer
|
||||||
|
|
Loading…
Reference in a new issue