Fix multiple shutdown issues #633
2 changed files with 24 additions and 17 deletions

@@ -130,20 +130,27 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> {
 		warn!("This Garage version is built without the metrics feature");
 	}
 
-	// Stuff runs
+	if servers.is_empty() {
+		// Nothing runs except netapp (not in servers)
+		// Await shutdown signal before proceeding to shutting down netapp
+		wait_from(watch_cancel).await;
+	} else {
+		// Stuff runs
 
-	// When a cancel signal is sent, stuff stops
+		// When a cancel signal is sent, stuff stops
 
-	// Collect stuff
-	for (desc, join_handle) in servers {
-		if let Err(e) = join_handle.await? {
-			error!("{} server exited with error: {}", desc, e);
-		} else {
-			info!("{} server exited without error.", desc);
+		// Collect stuff
+		for (desc, join_handle) in servers {
+			if let Err(e) = join_handle.await? {
+				error!("{} server exited with error: {}", desc, e);
+			} else {
+				info!("{} server exited without error.", desc);
+			}
 		}
 	}
 
+	// Remove RPC handlers for system to break reference cycles
+	info!("Deregistering RPC handlers for shutdown...");
+	garage.system.netapp.drop_all_handlers();
 	opentelemetry::global::shutdown_tracer_provider();
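For context: a minimal sketch of the wait-on-a-watch-channel shutdown pattern used by wait_from(watch_cancel) above. The helper name wait_for_cancel and the channel wiring here are illustrative assumptions, not Garage's actual code.

use tokio::sync::watch;

// Illustrative stand-in for a helper like wait_from(watch_cancel):
// resolve once the shutdown flag flips to true (or the sender is dropped).
async fn wait_for_cancel(mut cancel: watch::Receiver<bool>) {
    while !*cancel.borrow() {
        if cancel.changed().await.is_err() {
            // Sender was dropped: treat it as a shutdown request.
            break;
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = watch::channel(false);
    tokio::spawn(async move {
        tokio::time::sleep(std::time::Duration::from_millis(100)).await;
        let _ = tx.send(true); // simulate receiving a cancel request
    });
    wait_for_cancel(rx).await;
    println!("shutdown signal received, proceeding with cleanup");
}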
@@ -9,10 +9,10 @@ use std::time::{Duration, Instant};
 
 use arc_swap::ArcSwap;
 use async_trait::async_trait;
-use futures::{join, select};
-use futures_util::future::*;
+use futures::join;
 use serde::{Deserialize, Serialize};
 use sodiumoxide::crypto::sign::ed25519;
+use tokio::select;
 use tokio::sync::watch;
 use tokio::sync::Mutex;
@@ -702,7 +702,7 @@ impl System {
 	async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
 		while !*stop_signal.borrow() {
-			let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
+			let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
 
 			self.update_local_status();
 			let local_status: NodeStatus = self.local_status.load().as_ref().clone();
@@ -711,13 +711,14 @@ impl System {
 				.broadcast(
 					&self.system_endpoint,
 					SystemRpc::AdvertiseStatus(local_status),
-					RequestStrategy::with_priority(PRIO_HIGH),
+					RequestStrategy::with_priority(PRIO_HIGH)
+						.with_custom_timeout(STATUS_EXCHANGE_INTERVAL),
 				)
 				.await;
 
 			select! {
-				_ = restart_at.fuse() => {},
-				_ = stop_signal.changed().fuse() => {},
+				_ = tokio::time::sleep_until(restart_at.into()) => {},
+				_ = stop_signal.changed() => {},
 			}
 		}
 	}
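For context: a minimal sketch of the loop pattern introduced above, where the deadline is taken with Instant::now() + INTERVAL before the work, and tokio::select! races sleep_until against the stop signal, so the loop keeps a fixed cadence and still exits promptly on shutdown. The names INTERVAL, periodic_loop and do_work are placeholders, not Garage's.

use std::time::{Duration, Instant};
use tokio::sync::watch;

const INTERVAL: Duration = Duration::from_secs(10);

// Run at a fixed cadence until the stop signal flips to true.
async fn periodic_loop(mut stop_signal: watch::Receiver<bool>) {
    while !*stop_signal.borrow() {
        // Take the deadline *before* doing the work, so time spent in
        // do_work() (e.g. broadcasting a status RPC) does not delay the
        // next iteration.
        let restart_at = Instant::now() + INTERVAL;

        do_work().await;

        tokio::select! {
            // sleep_until takes a tokio Instant; std Instant converts via .into()
            _ = tokio::time::sleep_until(restart_at.into()) => {},
            // wake up early when a shutdown is requested
            _ = stop_signal.changed() => {},
        }
    }
}

async fn do_work() {
    // placeholder for the periodic work
}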
@@ -799,10 +800,9 @@ impl System {
 		#[cfg(feature = "kubernetes-discovery")]
 		tokio::spawn(self.clone().advertise_to_kubernetes());
 
-		let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
 		select! {
-			_ = restart_at.fuse() => {},
-			_ = stop_signal.changed().fuse() => {},
+			_ = tokio::time::sleep(DISCOVERY_INTERVAL) => {},
+			_ = stop_signal.changed() => {},
 		}
 	}
 }
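For context: the discovery loop above uses the same select-based shutdown, but with a plain tokio::time::sleep since its interval can simply restart after each pass. A minimal sketch assuming only tokio; note that tokio::select! polls ordinary futures, which is why the .fuse() adapters required by futures::select! can be dropped along with the futures imports.

use std::time::Duration;
use tokio::sync::watch;

// Sleep for a fixed interval, but return early if the stop signal fires.
// tokio::select! takes plain futures, so no .fuse() is needed.
async fn sleep_or_stop(stop_signal: &mut watch::Receiver<bool>) {
    tokio::select! {
        _ = tokio::time::sleep(Duration::from_secs(60)) => {},
        _ = stop_signal.changed() => {},
    }
}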