diff --git a/src/garage/admin_rpc.rs b/src/garage/admin_rpc.rs index 40674e757..b2145ca5f 100644 --- a/src/garage/admin_rpc.rs +++ b/src/garage/admin_rpc.rs @@ -193,7 +193,12 @@ impl AdminRpcHandler { let key_ids = self .garage .key_table - .get_range(&EmptyKey, None, Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), 10000) + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Deleted(DeletedFilter::NotDeleted)), + 10000, + ) .await? .iter() .map(|k| (k.key_id.to_string(), k.name.get().clone())) @@ -257,15 +262,24 @@ impl AdminRpcHandler { } async fn get_existing_key(&self, pattern: &str) -> Result { - let candidates = self.garage + let candidates = self + .garage .key_table - .get_range(&EmptyKey, None, Some(KeyFilter::Matches(pattern.to_string())), 10) + .get_range( + &EmptyKey, + None, + Some(KeyFilter::Matches(pattern.to_string())), + 10, + ) .await? .into_iter() .filter(|k| !k.deleted.get()) .collect::>(); if candidates.len() != 1 { - Err(Error::Message(format!("{} matching keys", candidates.len()))) + Err(Error::Message(format!( + "{} matching keys", + candidates.len() + ))) } else { Ok(candidates.into_iter().next().unwrap()) } @@ -469,12 +483,7 @@ impl AdminRpcHandler { t.data.merkle_updater.merkle_tree_len() ) .unwrap(); - writeln!( - to, - " GC todo queue length: {}", - t.data.gc_todo_len() - ) - .unwrap(); + writeln!(to, " GC todo queue length: {}", t.data.gc_todo_len()).unwrap(); Ok(()) } } diff --git a/src/garage/repair.rs b/src/garage/repair.rs index 4ee664527..8200f1f01 100644 --- a/src/garage/repair.rs +++ b/src/garage/repair.rs @@ -16,11 +16,7 @@ pub struct Repair { } impl Repair { - pub async fn repair_worker( - &self, - opt: RepairOpt, - must_exit: watch::Receiver, - ) { + pub async fn repair_worker(&self, opt: RepairOpt, must_exit: watch::Receiver) { if let Err(e) = self.repair_worker_aux(opt, must_exit).await { warn!("Repair worker failed with error: {}", e); } diff --git a/src/garage/server.rs b/src/garage/server.rs index ce90ecab6..c45a69b80 100644 --- a/src/garage/server.rs +++ b/src/garage/server.rs @@ -47,10 +47,15 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing background runner..."); let (send_cancel, watch_cancel) = watch::channel(false); - let background = BackgroundRunner::new(16, watch_cancel.clone()); + let (background, await_background_done) = BackgroundRunner::new(16, watch_cancel.clone()); info!("Initializing Garage main data store..."); - let garage = Garage::new(config, db, background.clone(), &mut rpc_server); + let garage = Garage::new(config.clone(), db, background, &mut rpc_server); + let bootstrap = garage.system.clone().bootstrap( + &config.bootstrap_peers[..], + config.consul_host, + config.consul_service_name, + ); info!("Crate admin RPC handler..."); AdminRpcHandler::new(garage.clone()).register_handler(&mut rpc_server); @@ -58,21 +63,13 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Initializing RPC and API servers..."); let run_rpc_server = Arc::new(rpc_server).run(wait_from(watch_cancel.clone())); let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone())); - let web_server = web_server::run_web_server(garage.clone(), wait_from(watch_cancel.clone())); + let web_server = web_server::run_web_server(garage, wait_from(watch_cancel.clone())); futures::try_join!( - garage - .system - .clone() - .bootstrap( - &garage.config.bootstrap_peers[..], - garage.config.consul_host.clone(), - garage.config.consul_service_name.clone() - ) - .map(|rv| { - info!("Bootstrap done"); - Ok(rv) - }), + bootstrap.map(|rv| { + info!("Bootstrap done"); + Ok(rv) + }), run_rpc_server.map(|rv| { info!("RPC server exited"); rv @@ -85,9 +82,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> { info!("Web server exited"); rv }), - background.run().map(|rv| { - info!("Background runner exited"); - Ok(rv) + await_background_done.map(|rv| { + info!("Background runner exited: {:?}", rv); + Ok(()) }), shutdown_signal(send_cancel), )?; diff --git a/src/model/block.rs b/src/model/block.rs index 7185372c3..a39588664 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -254,19 +254,18 @@ impl BlockManager { Ok(()) } - async fn resync_loop( - self: Arc, - mut must_exit: watch::Receiver, - ) { + async fn resync_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { if let Err(e) = self.resync_iter(&mut must_exit).await { warn!("Error in block resync loop: {}", e); - tokio::time::sleep(Duration::from_secs(10)).await; + select! { + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => (), + _ = must_exit.changed().fuse() => (), + } } } } - async fn resync_iter(&self, must_exit: &mut watch::Receiver) -> Result<(), Error> { if let Some(first_item) = self.resync_queue.iter().next() { let (time_bytes, hash_bytes) = first_item?; @@ -280,7 +279,7 @@ impl BlockManager { self.put_to_resync(&hash, RESYNC_RETRY_TIMEOUT)?; } self.resync_queue.remove(&time_bytes)?; - res?; // propagate error to delay main loop + res?; // propagate error to delay main loop } else { let delay = tokio::time::sleep(Duration::from_millis(time_msec - now)); select! { diff --git a/src/model/key_table.rs b/src/model/key_table.rs index 88d7b4ffd..02dcf68cb 100644 --- a/src/model/key_table.rs +++ b/src/model/key_table.rs @@ -109,7 +109,8 @@ impl TableSchema for KeyTable { KeyFilter::Deleted(df) => df.apply(entry.deleted.get()), KeyFilter::Matches(pat) => { let pat = pat.to_lowercase(); - entry.key_id.to_lowercase().starts_with(&pat) || entry.name.get().to_lowercase() == pat + entry.key_id.to_lowercase().starts_with(&pat) + || entry.name.get().to_lowercase() == pat } } } diff --git a/src/model/object_table.rs b/src/model/object_table.rs index d08bba707..99fad3ce5 100644 --- a/src/model/object_table.rs +++ b/src/model/object_table.rs @@ -195,8 +195,7 @@ impl TableSchema for ObjectTable { fn updated(&self, old: Option, new: Option) { let version_table = self.version_table.clone(); - // TODO not cancellable - self.background.spawn_cancellable(async move { + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of old versions for v in old_v.versions.iter() { diff --git a/src/model/version_table.rs b/src/model/version_table.rs index 193438909..841fbfea3 100644 --- a/src/model/version_table.rs +++ b/src/model/version_table.rs @@ -110,8 +110,7 @@ impl TableSchema for VersionTable { fn updated(&self, old: Option, new: Option) { let block_ref_table = self.block_ref_table.clone(); - // TODO not cancellable - self.background.spawn_cancellable(async move { + self.background.spawn(async move { if let (Some(old_v), Some(new_v)) = (old, new) { // Propagate deletion of version blocks if new_v.deleted.get() && !old_v.deleted.get() { diff --git a/src/rpc/membership.rs b/src/rpc/membership.rs index 6cc3ed2e3..4e9822fac 100644 --- a/src/rpc/membership.rs +++ b/src/rpc/membership.rs @@ -11,9 +11,9 @@ use futures::future::join_all; use futures::select; use futures_util::future::*; use serde::{Deserialize, Serialize}; +use tokio::io::AsyncWriteExt; use tokio::sync::watch; use tokio::sync::Mutex; -use tokio::io::AsyncWriteExt; use garage_util::background::BackgroundRunner; use garage_util::data::*; @@ -316,17 +316,16 @@ impl System { self.clone().ping_nodes(bootstrap_peers).await; let self2 = self.clone(); - self.clone() - .background - .spawn_worker(format!("ping loop"), |stop_signal| self2.ping_loop(stop_signal)); + self.background + .spawn_worker(format!("ping loop"), |stop_signal| { + self2.ping_loop(stop_signal) + }); if let (Some(consul_host), Some(consul_service_name)) = (consul_host, consul_service_name) { let self2 = self.clone(); - self.clone() - .background + self.background .spawn_worker(format!("Consul loop"), |stop_signal| { - self2 - .consul_loop(stop_signal, consul_host, consul_service_name) + self2.consul_loop(stop_signal, consul_host, consul_service_name) }); } } @@ -531,7 +530,7 @@ impl System { .broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT) .map(Ok), ); - self.background.spawn(self.clone().save_network_config()).await; + self.background.spawn(self.clone().save_network_config()); } Ok(Message::Ok) @@ -568,7 +567,7 @@ impl System { consul_host: String, consul_service_name: String, ) { - loop { + while !*stop_signal.borrow() { let restart_at = tokio::time::sleep(CONSUL_INTERVAL); match get_consul_nodes(&consul_host, &consul_service_name).await { @@ -583,11 +582,7 @@ impl System { select! { _ = restart_at.fuse() => (), - _ = stop_signal.changed().fuse() => { - if *stop_signal.borrow() { - return; - } - } + _ = stop_signal.changed().fuse() => (), } } } diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index 215ab0318..a89b730cc 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -161,11 +161,11 @@ impl Ring { }) .collect::>(); - eprintln!("RING: --"); - for e in ring.iter() { - eprintln!("{:?}", e); - } - eprintln!("END --"); + // eprintln!("RING: --"); + // for e in ring.iter() { + // eprintln!("{:?}", e); + // } + // eprintln!("END --"); Self { config, ring } } diff --git a/src/rpc/rpc_client.rs b/src/rpc/rpc_client.rs index 602862562..cffcf106c 100644 --- a/src/rpc/rpc_client.rs +++ b/src/rpc/rpc_client.rs @@ -198,7 +198,7 @@ impl RpcClient { let wait_finished_fut = tokio::spawn(async move { resp_stream.collect::>().await; }); - self.background.spawn(wait_finished_fut.map(|_| Ok(()))).await; + self.background.spawn(wait_finished_fut.map(|_| Ok(()))); } Ok(results) diff --git a/src/rpc/rpc_server.rs b/src/rpc/rpc_server.rs index 3c5014c40..0c5bf6f9b 100644 --- a/src/rpc/rpc_server.rs +++ b/src/rpc/rpc_server.rs @@ -13,9 +13,9 @@ use hyper::service::{make_service_fn, service_fn}; use hyper::{Body, Method, Request, Response, Server, StatusCode}; use serde::{Deserialize, Serialize}; use tokio::net::{TcpListener, TcpStream}; -use tokio_stream::wrappers::TcpListenerStream; use tokio_rustls::server::TlsStream; use tokio_rustls::TlsAcceptor; +use tokio_stream::wrappers::TcpListenerStream; use garage_util::config::TlsConfig; use garage_util::data::*; @@ -52,7 +52,11 @@ where trace!( "Request message: {}", - serde_json::to_string(&msg).unwrap_or("".into()).chars().take(100).collect::() + serde_json::to_string(&msg) + .unwrap_or("".into()) + .chars() + .take(100) + .collect::() ); match handler(msg, sockaddr).await { diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 86289bf1b..60b7833f5 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -101,10 +101,7 @@ impl MerkleUpdater { ret } - async fn updater_loop( - self: Arc, - mut must_exit: watch::Receiver, - ) { + async fn updater_loop(self: Arc, mut must_exit: watch::Receiver) { while !*must_exit.borrow() { if let Some(x) = self.todo.iter().next() { match x { diff --git a/src/table/sync.rs b/src/table/sync.rs index 65231cd58..f8fef53cb 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -3,7 +3,7 @@ use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; -use futures::{select}; +use futures::select; use futures_util::future::*; use futures_util::stream::*; use rand::Rng; diff --git a/src/util/background.rs b/src/util/background.rs index 0ec9779ad..35d41d9f7 100644 --- a/src/util/background.rs +++ b/src/util/background.rs @@ -1,10 +1,11 @@ use core::future::Future; use std::pin::Pin; -use std::sync::Mutex; - -use arc_swap::ArcSwapOption; use std::sync::Arc; -use tokio::sync::{mpsc, watch}; +use std::time::Duration; + +use futures::future::*; +use futures::select; +use tokio::sync::{mpsc, watch, Mutex}; use crate::error::Error; @@ -14,99 +15,115 @@ type Job = Pin + Send>>; pub struct BackgroundRunner { pub stop_signal: watch::Receiver, - queue_in: ArcSwapOption>, - - workers: Mutex>>, + queue_in: mpsc::UnboundedSender<(Job, bool)>, + worker_in: mpsc::UnboundedSender>, } impl BackgroundRunner { - pub fn new(n_runners: usize, stop_signal: watch::Receiver) -> Arc { - let (queue_in, queue_out) = mpsc::unbounded_channel(); + pub fn new( + n_runners: usize, + stop_signal: watch::Receiver, + ) -> (Arc, tokio::task::JoinHandle<()>) { + let (worker_in, mut worker_out) = mpsc::unbounded_channel(); - let mut workers = vec![]; - let queue_out = Arc::new(tokio::sync::Mutex::new(queue_out)); + let stop_signal_2 = stop_signal.clone(); + let await_all_done = tokio::spawn(async move { + loop { + let wkr = { + select! { + item = worker_out.recv().fuse() => { + match item { + Some(x) => x, + None => break, + } + } + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => { + if *stop_signal_2.borrow() { + break; + } else { + continue; + } + } + } + }; + if let Err(e) = wkr.await { + error!("Error while awaiting for worker: {}", e); + } + } + }); + + let (queue_in, queue_out) = mpsc::unbounded_channel(); + let queue_out = Arc::new(Mutex::new(queue_out)); for i in 0..n_runners { let queue_out = queue_out.clone(); let stop_signal = stop_signal.clone(); - workers.push(tokio::spawn(async move { - while let Some((job, cancellable)) = queue_out.lock().await.recv().await { - if cancellable && *stop_signal.borrow() { - continue; + worker_in + .send(tokio::spawn(async move { + loop { + let (job, cancellable) = { + select! { + item = wait_job(&queue_out).fuse() => match item { + // We received a task, process it + Some(x) => x, + // We received a signal that no more tasks will ever be sent + // because the sending side was dropped. Exit now. + None => break, + }, + _ = tokio::time::sleep(Duration::from_secs(10)).fuse() => { + if *stop_signal.borrow() { + // Nothing has been going on for 10 secs, and we are shutting + // down. Exit now. + break; + } else { + // Nothing is going on but we don't want to exit. + continue; + } + } + } + }; + if cancellable && *stop_signal.borrow() { + continue; + } + if let Err(e) = job.await { + error!("Job failed: {}", e) + } } - if let Err(e) = job.await { - error!("Job failed: {}", e) - } - } - info!("Worker {} exiting", i); - })); + info!("Background worker {} exiting", i); + })) + .unwrap(); } - Arc::new(Self { + let bgrunner = Arc::new(Self { stop_signal, - queue_in: ArcSwapOption::new(Some(Arc::new(queue_in))), - workers: Mutex::new(workers), - }) - } - - pub async fn run(self: Arc) { - let mut stop_signal = self.stop_signal.clone(); - - loop { - let exit_now = match stop_signal.changed().await { - Ok(()) => *stop_signal.borrow(), - Err(e) => { - error!("Watch .changed() error: {}", e); - true - } - }; - if exit_now { - break; - } - } - - info!("Closing background job queue_in..."); - drop(self.queue_in.swap(None)); - - info!("Waiting for all workers to terminate..."); - while let Some(task) = self.workers.lock().unwrap().pop() { - if let Err(e) = task.await { - warn!("Error awaiting task: {}", e); - } - } + queue_in, + worker_in, + }); + (bgrunner, await_all_done) } // Spawn a task to be run in background - pub async fn spawn(&self, job: T) + pub fn spawn(&self, job: T) where T: Future + Send + 'static, { - match self.queue_in.load().as_ref() { - Some(chan) => { - let boxed: Job = Box::pin(job); - chan.send((boxed, false)).map_err(|_| "send error").unwrap(); - } - None => { - warn!("Doing background job now because we are exiting..."); - if let Err(e) = job.await { - warn!("Task failed: {}", e); - } - } - } + let boxed: Job = Box::pin(job); + self.queue_in + .send((boxed, false)) + .map_err(|_| "could not put job in queue") + .unwrap(); } pub fn spawn_cancellable(&self, job: T) where T: Future + Send + 'static, { - match self.queue_in.load().as_ref() { - Some(chan) => { - let boxed: Job = Box::pin(job); - chan.send((boxed, false)).map_err(|_| "send error").unwrap(); - } - None => (), // drop job if we are exiting - } + let boxed: Job = Box::pin(job); + self.queue_in + .send((boxed, true)) + .map_err(|_| "could not put job in queue") + .unwrap(); } pub fn spawn_worker(&self, name: String, worker: F) @@ -114,11 +131,19 @@ impl BackgroundRunner { F: FnOnce(watch::Receiver) -> T + Send + 'static, T: Future + Send + 'static, { - let mut workers = self.workers.lock().unwrap(); let stop_signal = self.stop_signal.clone(); - workers.push(tokio::spawn(async move { + let task = tokio::spawn(async move { + info!("Worker started: {}", name); worker(stop_signal).await; info!("Worker exited: {}", name); - })); + }); + self.worker_in + .send(task) + .map_err(|_| "could not put job in queue") + .unwrap(); } } + +async fn wait_job(q: &Mutex>) -> Option<(Job, bool)> { + q.lock().await.recv().await +}