Fix multiple shutdown issues #633

Merged
lx merged 2 commits from fix-shutdown into main 2023-09-12 12:54:51 +00:00
2 changed files with 24 additions and 17 deletions

View file

@@ -130,20 +130,27 @@ pub async fn run_server(config_file: PathBuf, secrets: Secrets) -> Result<(), Error> {
         warn!("This Garage version is built without the metrics feature");
     }
 
-    // Stuff runs
-
-    // When a cancel signal is sent, stuff stops
-
-    // Collect stuff
-    for (desc, join_handle) in servers {
-        if let Err(e) = join_handle.await? {
-            error!("{} server exited with error: {}", desc, e);
-        } else {
-            info!("{} server exited without error.", desc);
+    if servers.is_empty() {
+        // Nothing runs except netapp (not in servers)
+        // Await shutdown signal before proceeding to shutting down netapp
+        wait_from(watch_cancel).await;
+    } else {
+        // Stuff runs
+
+        // When a cancel signal is sent, stuff stops
+
+        // Collect stuff
+        for (desc, join_handle) in servers {
+            if let Err(e) = join_handle.await? {
+                error!("{} server exited with error: {}", desc, e);
+            } else {
+                info!("{} server exited without error.", desc);
+            }
         }
     }
 
     // Remove RPC handlers for system to break reference cycles
+    info!("Deregistering RPC handlers for shutdown...");
     garage.system.netapp.drop_all_handlers();
     opentelemetry::global::shutdown_tracer_provider();
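
The new `servers.is_empty()` branch covers the case where none of the optional servers was launched: previously the `for` loop over `servers` finished immediately and the process fell straight through to tearing down netapp, without ever waiting for a shutdown signal. The `wait_from` helper itself is not part of this diff; a minimal sketch of such a helper, assuming `watch_cancel` is a `tokio::sync::watch::Receiver<bool>` that flips to `true` on shutdown, could look like:

use tokio::sync::watch;

// Hypothetical sketch of a wait_from-style helper (the real Garage
// implementation is not shown in this diff): park the task until the
// cancel channel reads `true`.
async fn wait_from(mut cancel: watch::Receiver<bool>) {
    while !*cancel.borrow() {
        // changed() returns Err once every sender is dropped; treat that
        // as a shutdown signal too.
        if cancel.changed().await.is_err() {
            return;
        }
    }
}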

View file

@@ -9,10 +9,10 @@ use std::time::{Duration, Instant};
 use arc_swap::ArcSwap;
 use async_trait::async_trait;
-use futures::{join, select};
-use futures_util::future::*;
+use futures::join;
 use serde::{Deserialize, Serialize};
 use sodiumoxide::crypto::sign::ed25519;
+use tokio::select;
 use tokio::sync::watch;
 use tokio::sync::Mutex;
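
The import swap mirrors the `select!` changes in the hunks below: `futures::select!` only works with futures that implement `FusedFuture`, which is why every branch used to be wrapped in `.fuse()`, while `tokio::select!` polls plain futures directly. With the `futures` crate now used only for `join`, the blanket `use futures_util::future::*;` import can be dropped as well.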
@@ -702,7 +702,7 @@ impl System {
     async fn status_exchange_loop(&self, mut stop_signal: watch::Receiver<bool>) {
         while !*stop_signal.borrow() {
-            let restart_at = tokio::time::sleep(STATUS_EXCHANGE_INTERVAL);
+            let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;
 
             self.update_local_status();
             let local_status: NodeStatus = self.local_status.load().as_ref().clone();
@@ -711,13 +711,14 @@ impl System {
                 .broadcast(
                     &self.system_endpoint,
                     SystemRpc::AdvertiseStatus(local_status),
-                    RequestStrategy::with_priority(PRIO_HIGH),
+                    RequestStrategy::with_priority(PRIO_HIGH)
+                        .with_custom_timeout(STATUS_EXCHANGE_INTERVAL),
                 )
                 .await;
 
             select! {
-                _ = restart_at.fuse() => {},
-                _ = stop_signal.changed().fuse() => {},
+                _ = tokio::time::sleep_until(restart_at.into()) => {},
+                _ = stop_signal.changed() => {},
             }
         }
     }
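
Two changes meet in this loop. The broadcast now carries `.with_custom_timeout(STATUS_EXCHANGE_INTERVAL)`, which bounds how long a slow or unresponsive peer can block each iteration (and, during shutdown, how long it can delay the stop signal being observed). And instead of pre-building a `Sleep` future, the loop stores the deadline as a plain `std::time::Instant` and constructs the timer inside `select!` with `tokio::time::sleep_until`, the idiomatic `tokio::select!` pattern; the deadline is still pinned at the top of the iteration, so a long broadcast does not push the schedule back. A condensed sketch of the resulting loop shape (RPC plumbing elided; `do_broadcast` is a stand-in and the interval value is an assumption):

use std::time::{Duration, Instant};
use tokio::{select, sync::watch, time::sleep_until};

// Assumed value for illustration; the real constant lives elsewhere in system.rs.
const STATUS_EXCHANGE_INTERVAL: Duration = Duration::from_secs(10);

async fn status_exchange_loop(mut stop_signal: watch::Receiver<bool>) {
    while !*stop_signal.borrow() {
        // Pin the next wake-up before doing any work, so a slow broadcast
        // does not push the schedule back.
        let restart_at = Instant::now() + STATUS_EXCHANGE_INTERVAL;

        do_broadcast().await; // stand-in for the timed status broadcast

        select! {
            // `.into()` converts std::time::Instant to tokio::time::Instant.
            _ = sleep_until(restart_at.into()) => {}
            _ = stop_signal.changed() => {}
        }
    }
}

async fn do_broadcast() { /* stand-in; bounded by an RPC timeout above */ }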
@@ -799,10 +800,9 @@ impl System {
             #[cfg(feature = "kubernetes-discovery")]
             tokio::spawn(self.clone().advertise_to_kubernetes());
 
-            let restart_at = tokio::time::sleep(DISCOVERY_INTERVAL);
             select! {
-                _ = restart_at.fuse() => {},
-                _ = stop_signal.changed().fuse() => {},
+                _ = tokio::time::sleep(DISCOVERY_INTERVAL) => {},
+                _ = stop_signal.changed() => {},
             }
         }
     }
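
Unlike in `status_exchange_loop`, nothing is awaited here between computing the old `restart_at` and entering `select!`, so pre-building the sleep future bought nothing; constructing `tokio::time::sleep(DISCOVERY_INTERVAL)` inline in the macro is behaviorally equivalent and one line shorter. Dropping the `.fuse()` calls again follows from the switch to `tokio::select!`.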