From 8971f34c816c42a0eb2bbcead5ac7f05854ddfb6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Wed, 22 Apr 2020 17:04:33 +0000 Subject: [PATCH] Well they still have to exit when we're exiting though --- src/background.rs | 9 +++++++-- src/rpc_client.rs | 1 - src/rpc_server.rs | 14 +++++--------- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/background.rs b/src/background.rs index f0dbdcb50..937062dda 100644 --- a/src/background.rs +++ b/src/background.rs @@ -2,6 +2,8 @@ use core::future::Future; use std::pin::Pin; use futures::future::join_all; +use futures::select; +use futures_util::future::*; use std::sync::Arc; use tokio::sync::Mutex; use tokio::sync::{mpsc, watch, Notify}; @@ -88,7 +90,7 @@ impl BackgroundRunner { } async fn runner(self: Arc, i: usize) { - let stop_signal = self.stop_signal.clone(); + let mut stop_signal = self.stop_signal.clone(); loop { let must_exit: bool = *stop_signal.borrow(); if let Some(job) = self.dequeue_job(must_exit).await { @@ -100,7 +102,10 @@ impl BackgroundRunner { info!("Background runner {} exiting", i); return; } - self.job_notify.notified().await; + select! { + _ = self.job_notify.notified().fuse() => (), + _ = stop_signal.recv().fuse() => (), + } } } } diff --git a/src/rpc_client.rs b/src/rpc_client.rs index a5c44a2ec..aeb669568 100644 --- a/src/rpc_client.rs +++ b/src/rpc_client.rs @@ -263,4 +263,3 @@ impl RpcHttpClient { } } } - diff --git a/src/rpc_server.rs b/src/rpc_server.rs index 51661a669..bcf7496f1 100644 --- a/src/rpc_server.rs +++ b/src/rpc_server.rs @@ -47,16 +47,13 @@ where let begin_time = Instant::now(); let whole_body = hyper::body::to_bytes(req.into_body()).await?; let msg = rmp_serde::decode::from_read::<_, M>(whole_body.into_buf())?; - let req_str = debug_serialize(&msg); match handler(msg, sockaddr).await { Ok(resp) => { let resp_bytes = rmp_to_vec_all_named::>(&Ok(resp))?; - trace!( - "]RPC:{},ok ({} ms), request: {}", - name, - (Instant::now() - begin_time).as_millis(), - req_str, - ); + let rpc_duration = (Instant::now() - begin_time).as_millis(); + if rpc_duration > 100 { + debug!("RPC {} ok, took long: {} ms", name, rpc_duration,); + } Ok(Response::new(Body::from(resp_bytes))) } Err(e) => { @@ -65,11 +62,10 @@ where let mut err_response = Response::new(Body::from(rep_bytes)); *err_response.status_mut() = e.http_status_code(); warn!( - "RPC error ({}): {} ({} ms), request: {}", + "RPC error ({}): {} ({} ms)", name, e, (Instant::now() - begin_time).as_millis(), - req_str, ); Ok(err_response) }