From 53289b69e5037700689665b4edf20f2382ff15f6 Mon Sep 17 00:00:00 2001
From: Alex Auvolat
Date: Sat, 11 Apr 2020 18:51:11 +0200
Subject: [PATCH] Background task runner that replaces tokio::spawn

---
 src/api_server.rs   |  33 ++++++++++++-
 src/background.rs   | 113 ++++++++++++++++++++++++++++++++++++++++++++
 src/error.rs        |   6 +++
 src/main.rs         |   1 +
 src/membership.rs   |  56 +++++++++++++++-------
 src/object_table.rs |  40 ++++++++++++++--
 src/rpc_server.rs   |   5 +-
 src/server.rs       |  99 ++++++++++++++++++++++++----------
 8 files changed, 291 insertions(+), 62 deletions(-)
 create mode 100644 src/background.rs

diff --git a/src/api_server.rs b/src/api_server.rs
index 153991b2..53c3e176 100644
--- a/src/api_server.rs
+++ b/src/api_server.rs
@@ -23,7 +23,7 @@ type BodyType = Box<dyn HttpBody<Data = Bytes, Error = Error> + Send + Unpin>;
 pub async fn run_api_server(
 	garage: Arc<Garage>,
 	shutdown_signal: impl Future<Output = ()>,
-) -> Result<(), hyper::Error> {
+) -> Result<(), Error> {
 	let addr = ([0, 0, 0, 0], garage.system.config.api_port).into();
 
 	let service = make_service_fn(|conn: &AddrStream| {
@@ -42,7 +42,8 @@ pub async fn run_api_server(
 	let graceful = server.with_graceful_shutdown(shutdown_signal);
 	println!("API server listening on http://{}", addr);
 
-	graceful.await
+	graceful.await?;
+	Ok(())
 }
 
 async fn handler(
@@ -92,6 +93,13 @@ async fn handler_inner(
 				version_uuid
 			)))))
 		}
+		&Method::DELETE => {
+			let version_uuid = handle_delete(garage, &bucket, &key).await?;
+			Ok(Response::new(Box::new(BytesBody::from(format!(
+				"{:?}\n",
+				version_uuid
+			)))))
+		}
 		_ => Err(Error::BadRequest(format!("Invalid method"))),
 	}
 }
@@ -257,6 +265,27 @@ impl BodyChunker {
 	}
 }
 
+async fn handle_delete(garage: Arc<Garage>, bucket: &str, key: &str) -> Result<UUID, Error> {
+	let version_uuid = gen_uuid();
+
+	let mut object = Object {
+		bucket: bucket.into(),
+		key: key.into(),
+		versions: Vec::new(),
+	};
+	object.versions.push(Box::new(ObjectVersion {
+		uuid: version_uuid.clone(),
+		timestamp: now_msec(),
+		mime_type: "application/x-delete-marker".into(),
+		size: 0,
+		is_complete: true,
+		data: ObjectVersionData::DeleteMarker,
+	}));
+
+	garage.object_table.insert(&object).await?;
+	return Ok(version_uuid);
+}
+
 async fn handle_get(
 	garage: Arc<Garage>,
 	bucket: &str,
diff --git a/src/background.rs b/src/background.rs
new file mode 100644
index 00000000..de0f07af
--- /dev/null
+++ b/src/background.rs
@@ -0,0 +1,113 @@
+use core::future::Future;
+use std::pin::Pin;
+
+use futures::future::join_all;
+use std::sync::Arc;
+use std::time::Duration;
+use tokio::sync::Mutex;
+use tokio::sync::{mpsc, watch};
+
+use crate::error::Error;
+
+type JobOutput = Result<(), Error>;
+type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
+
+pub struct BackgroundRunner {
+	n_runners: usize,
+	stop_signal: watch::Receiver<bool>,
+
+	queue_in: mpsc::UnboundedSender<(Job, bool)>,
+	queue_out: Mutex<mpsc::UnboundedReceiver<(Job, bool)>>,
+
+	workers: Mutex<Vec<tokio::task::JoinHandle<()>>>,
+}
+
+impl BackgroundRunner {
+	pub fn new(n_runners: usize, stop_signal: watch::Receiver<bool>) -> Arc<Self> {
+		let (queue_in, queue_out) = mpsc::unbounded_channel();
+		Arc::new(Self {
+			n_runners,
+			stop_signal,
+			queue_in,
+			queue_out: Mutex::new(queue_out),
+			workers: Mutex::new(Vec::new()),
+		})
+	}
+
+	pub async fn run(self: Arc<Self>) {
+		let mut workers = self.workers.lock().await;
+		for _i in 0..self.n_runners {
+			workers.push(tokio::spawn(self.clone().runner()));
+		}
+		drop(workers);
+
+		let mut stop_signal = self.stop_signal.clone();
+		while let Some(exit_now) = stop_signal.recv().await {
+			if exit_now {
+				let mut workers = self.workers.lock().await;
+				let workers_vec = workers.drain(..).collect::<Vec<_>>();
+				join_all(workers_vec).await;
+				return;
+			}
+		}
+	}
+
+	pub fn spawn<T>(&self, job: T)
+	where
+		T: Future<Output = JobOutput> + Send + 'static,
+	{
+		let boxed: Job = Box::pin(job);
+		let _: Result<_, _> = self.queue_in.clone().send((boxed, false));
+	}
+
+	pub fn spawn_cancellable<T>(&self, job: T)
+	where
+		T: Future<Output = JobOutput> + Send + 'static,
+	{
+		let boxed: Job = Box::pin(job);
+		let _: Result<_, _> = self.queue_in.clone().send((boxed, true));
+	}
+
+	pub async fn spawn_worker<F, T>(self: Arc<Self>, worker: F)
+	where
+		F: FnOnce(watch::Receiver<bool>) -> T + Send + 'static,
+		T: Future<Output = JobOutput> + Send + 'static,
+	{
+		let mut workers = self.workers.lock().await;
+		let stop_signal = self.stop_signal.clone();
+		workers.push(tokio::spawn(async move {
+			if let Err(e) = worker(stop_signal).await {
+				eprintln!("Worker stopped with error: {}", e);
+			}
+		}));
+	}
+
+	async fn runner(self: Arc<Self>) {
+		let stop_signal = self.stop_signal.clone();
+		loop {
+			let must_exit: bool = *stop_signal.borrow();
+			if let Some(job) = self.dequeue_job(must_exit).await {
+				if let Err(e) = job.await {
+					eprintln!("Job failed: {}", e)
+				}
+			} else {
+				if must_exit {
+					return;
+				}
+				tokio::time::delay_for(Duration::from_secs(1)).await;
+			}
+		}
+	}
+
+	async fn dequeue_job(&self, must_exit: bool) -> Option<Job> {
+		let mut queue = self.queue_out.lock().await;
+		while let Ok((job, cancellable)) = queue.try_recv() {
+			if cancellable && must_exit {
+				continue;
+			} else {
+				return Some(job);
+			}
+		}
+		None
+	}
+}
diff --git a/src/error.rs b/src/error.rs
index 5eb5a960..7a116954 100644
--- a/src/error.rs
+++ b/src/error.rs
@@ -63,3 +63,9 @@ impl From<sled::TransactionError<Error>> for Error {
 		}
 	}
 }
+
+impl From<tokio::sync::watch::error::SendError<bool>> for Error {
+	fn from(_e: tokio::sync::watch::error::SendError<bool>) -> Error {
+		Error::Message(format!("Watch send error"))
+	}
+}
diff --git a/src/main.rs b/src/main.rs
index 15848f2e..533afcc7 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -2,6 +2,7 @@ mod data;
 mod error;
 mod proto;
 
+mod background;
 mod membership;
 mod table;
 
diff --git a/src/membership.rs b/src/membership.rs
index b713c7a4..cb8dba99 100644
--- a/src/membership.rs
+++ b/src/membership.rs
@@ -8,10 +8,14 @@ use std::sync::Arc;
 use std::time::Duration;
 
 use futures::future::join_all;
+use futures::select;
+use futures_util::future::*;
 use sha2::{Digest, Sha256};
 use tokio::prelude::*;
+use tokio::sync::watch;
 use tokio::sync::RwLock;
 
+use crate::background::BackgroundRunner;
 use crate::data::*;
 use crate::error::Error;
 use crate::proto::*;
@@ -29,6 +33,8 @@ pub struct System {
 	pub rpc_client: RpcClient,
 
 	pub members: RwLock<Members>,
+
+	pub background: Arc<BackgroundRunner>,
 }
 
 pub struct Members {
@@ -181,7 +187,7 @@ fn read_network_config(metadata_dir: &PathBuf) -> Result<NetworkConfig, Error> {
 }
 
 impl System {
-	pub fn new(config: Config, id: UUID) -> Self {
+	pub fn new(config: Config, id: UUID, background: Arc<BackgroundRunner>) -> Self {
 		let net_config = match read_network_config(&config.metadata_dir) {
 			Ok(x) => x,
 			Err(e) => {
@@ -209,24 +215,24 @@ impl System {
 			id,
 			rpc_client: RpcClient::new(),
 			members: RwLock::new(members),
+			background,
 		}
 	}
 
-	async fn save_network_config(self: Arc<Self>) {
+	async fn save_network_config(self: Arc<Self>) -> Result<(), Error> {
 		let mut path = self.config.metadata_dir.clone();
 		path.push("network_config");
 
 		let members = self.members.read().await;
 		let data =
-			rmp_to_vec_all_named(&members.config).expect("Error while encoding network config");
+			rmp_to_vec_all_named(&members.config)?;
 		drop(members);
 
 		let mut f = tokio::fs::File::create(path.as_path())
-			.await
-			.expect("Could not create network_config");
+			.await?;
 		f.write_all(&data[..])
-			.await
-			.expect("Could not write network_config");
+			.await?;
+		Ok(())
 	}
 
 	pub async fn make_ping(&self) -> Message {
@@ -260,7 +266,10 @@ impl System {
 			.collect::<Vec<_>>();
 		self.clone().ping_nodes(bootstrap_peers).await;
 
-		tokio::spawn(self.ping_loop());
+		self.background
+			.clone()
+			.spawn_worker(|stop_signal| self.ping_loop(stop_signal).map(Ok))
+			.await;
 	}
 
 	pub async fn ping_nodes(self: Arc<Self>, peers: Vec<(SocketAddr, Option<UUID>)>) {
@@ -294,10 +303,12 @@ impl System {
 				});
 			}
 			if is_new || members.status_hash != info.status_hash {
-				tokio::spawn(self.clone().pull_status(info.id.clone()));
+				self.background
+					.spawn_cancellable(self.clone().pull_status(info.id.clone()).map(Ok));
 			}
 			if is_new || members.config.version < info.config_version {
-				tokio::spawn(self.clone().pull_config(info.id.clone()));
+				self.background
+					.spawn_cancellable(self.clone().pull_config(info.id.clone()).map(Ok));
 			}
 		} else if let Some(id) = id_option {
 			let remaining_attempts = members
@@ -345,10 +356,10 @@ impl System {
 		drop(members);
 
 		if is_new || status_hash != ping.status_hash {
-			tokio::spawn(self.clone().pull_status(ping.id.clone()));
+			self.background.spawn_cancellable(self.clone().pull_status(ping.id.clone()).map(Ok));
 		}
 		if is_new || config_version < ping.config_version {
-			tokio::spawn(self.clone().pull_config(ping.id.clone()));
+			self.background.spawn_cancellable(self.clone().pull_config(ping.id.clone()).map(Ok));
 		}
 
 		Ok(self.make_ping().await)
@@ -405,7 +416,7 @@ impl System {
 		drop(members);
 
 		if to_ping.len() > 0 {
-			tokio::spawn(self.clone().ping_nodes(to_ping));
+			self.background.spawn_cancellable(self.clone().ping_nodes(to_ping).map(Ok));
 		}
 
 		Ok(Message::Ok)
@@ -420,17 +431,18 @@ impl System {
 			members.config = adv.clone();
 			members.rebuild_ring();
 
-			tokio::spawn(
+			self.background.spawn_cancellable(
 				self.clone()
-					.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT),
+					.broadcast(Message::AdvertiseConfig(adv.clone()), PING_TIMEOUT)
+					.map(Ok),
 			);
-			tokio::spawn(self.clone().save_network_config());
+			self.background.spawn(self.clone().save_network_config());
 		}
 
 		Ok(Message::Ok)
 	}
 
-	pub async fn ping_loop(self: Arc<Self>) {
+	pub async fn ping_loop(self: Arc<Self>, mut stop_signal: watch::Receiver<bool>) {
 		loop {
 			let restart_at = tokio::time::delay_for(PING_INTERVAL);
 
@@ -445,7 +457,15 @@ impl System {
 
 			self.clone().ping_nodes(ping_addrs).await;
 
-			restart_at.await
+			select! {
+				_ = restart_at.fuse() => (),
+				must_exit = stop_signal.recv().fuse() => {
+					match must_exit {
+						None | Some(true) => return,
+						_ => (),
+					}
+				}
+			}
 		}
 	}
 
diff --git a/src/object_table.rs b/src/object_table.rs
index 392e0dc7..7add9968 100644
--- a/src/object_table.rs
+++ b/src/object_table.rs
@@ -38,6 +38,12 @@ pub enum ObjectVersionData {
 	FirstBlock(Hash),
 }
 
+impl ObjectVersion {
+	fn cmp_key(&self) -> (u64, &UUID) {
+		(self.timestamp, &self.uuid)
+	}
+}
+
 impl Entry<String, String> for Object {
 	fn partition_key(&self) -> &String {
 		&self.bucket
@@ -48,9 +54,10 @@ impl Entry<String, String> for Object {
 
 	fn merge(&mut self, other: &Self) {
 		for other_v in other.versions.iter() {
-			match self.versions.binary_search_by(|v| {
-				(v.timestamp, &v.uuid).cmp(&(other_v.timestamp, &other_v.uuid))
-			}) {
+			match self
+				.versions
+				.binary_search_by(|v| v.cmp_key().cmp(&other_v.cmp_key()))
+			{
 				Ok(i) => {
 					let mut v = &mut self.versions[i];
 					if other_v.size > v.size {
@@ -91,7 +98,30 @@ impl TableFormat for ObjectTable {
 	type E = Object;
 
 	async fn updated(&self, old: Option<&Self::E>, new: &Self::E) {
-		//unimplemented!()
-		// TODO
+		let old = old.cloned();
+		let new = new.clone();
+		let garage = self.garage.read().await.as_ref().cloned().unwrap();
+		garage.clone().background.spawn(async move {
+			// Propagate deletion of old versions
+			if let Some(old_v) = old {
+				for v in old_v.versions.iter() {
+					if new
+						.versions
+						.binary_search_by(|nv| nv.cmp_key().cmp(&v.cmp_key()))
+						.is_err()
+					{
+						let deleted_version = Version {
+							uuid: v.uuid.clone(),
+							deleted: true,
+							blocks: vec![],
+							bucket: old_v.bucket.clone(),
+							key: old_v.key.clone(),
+						};
+						garage.version_table.insert(&deleted_version).await?;
+					}
+				}
+			}
+			Ok(())
+		});
 	}
 }
diff --git a/src/rpc_server.rs b/src/rpc_server.rs
index cbcfac79..98798614 100644
--- a/src/rpc_server.rs
+++ b/src/rpc_server.rs
@@ -90,7 +90,7 @@ async fn handler(
 pub async fn run_rpc_server(
 	garage: Arc<Garage>,
 	shutdown_signal: impl Future<Output = ()>,
-) -> Result<(), hyper::Error> {
+) -> Result<(), Error> {
 	let bind_addr = ([0, 0, 0, 0], garage.system.config.rpc_port).into();
 
 	let service = make_service_fn(|conn: &AddrStream| {
@@ -112,5 +112,6 @@ pub async fn run_rpc_server(
 	let graceful = server.with_graceful_shutdown(shutdown_signal);
 	println!("RPC server listening on http://{}", bind_addr);
 
-	graceful.await
+	graceful.await?;
+	Ok(())
 }
diff --git a/src/server.rs b/src/server.rs
index b62c18cc..1f926bf0 100644
--- a/src/server.rs
+++ b/src/server.rs
@@ -1,13 +1,15 @@
-use futures::channel::oneshot;
+use futures_util::future::FutureExt;
 use serde::Deserialize;
 use std::collections::HashMap;
 use std::io::{Read, Write};
 use std::net::SocketAddr;
 use std::path::PathBuf;
 use std::sync::Arc;
+use tokio::sync::watch;
 use tokio::sync::{Mutex, RwLock};
 
 use crate::api_server;
+use crate::background::*;
 use crate::data::*;
 use crate::error::Error;
 use crate::membership::System;
@@ -15,10 +17,31 @@ use crate::proto::*;
 use crate::rpc_server;
 use crate::table::*;
 
+#[derive(Deserialize, Debug)]
+pub struct Config {
+	pub metadata_dir: PathBuf,
+	pub data_dir: PathBuf,
+
+	pub api_port: u16,
+	pub rpc_port: u16,
+
+	pub bootstrap_peers: Vec<SocketAddr>,
+
+	#[serde(default = "default_block_size")]
+	pub block_size: usize,
+
+	#[serde(default = "default_meta_replication_factor")]
+	pub meta_replication_factor: usize,
+
+	#[serde(default = "default_data_replication_factor")]
+	pub data_replication_factor: usize,
+}
+
 pub struct Garage {
 	pub db: sled::Db,
 	pub system: Arc<System>,
 	pub fs_lock: Mutex<()>,
+	pub background: Arc<BackgroundRunner>,
 
 	pub table_rpc_handlers: HashMap<String, Arc<dyn TableRpcHandler + Sync + Send>>,
 
@@ -28,8 +51,13 @@ pub struct Garage {
 }
 
 impl Garage {
-	pub async fn new(config: Config, id: UUID, db: sled::Db) -> Arc<Self> {
-		let system = Arc::new(System::new(config, id));
+	pub async fn new(
+		config: Config,
+		id: UUID,
+		db: sled::Db,
+		background: Arc<BackgroundRunner>,
+	) -> Arc<Self> {
+		let system = Arc::new(System::new(config, id, background.clone()));
 
 		let meta_rep_param = TableReplicationParams {
 			replication_factor: system.config.meta_replication_factor,
@@ -56,6 +84,14 @@ impl Garage {
 			"version".to_string(),
 			meta_rep_param.clone(),
 		));
+
+		let data_rep_param = TableReplicationParams {
+			replication_factor: system.config.data_replication_factor,
+			write_quorum: (system.config.data_replication_factor + 1) / 2,
+			read_quorum: 1,
+			timeout: DEFAULT_TIMEOUT,
+		};
+
 		let block_ref_table = Arc::new(Table::new(
 			BlockRefTable {
 				garage: RwLock::new(None),
@@ -63,13 +99,14 @@ impl Garage {
 			system.clone(),
 			&db,
 			"block_ref".to_string(),
-			meta_rep_param.clone(),
+			data_rep_param.clone(),
 		));
 
 		let mut garage = Self {
 			db,
 			system: system.clone(),
 			fs_lock: Mutex::new(()),
+			background,
 			table_rpc_handlers: HashMap::new(),
 			object_table,
 			version_table,
@@ -105,22 +142,8 @@ fn default_block_size() -> usize {
 fn default_meta_replication_factor() -> usize {
 	3
 }
-
-#[derive(Deserialize, Debug)]
-pub struct Config {
-	pub metadata_dir: PathBuf,
-	pub data_dir: PathBuf,
-
-	pub api_port: u16,
-	pub rpc_port: u16,
-
-	pub bootstrap_peers: Vec<SocketAddr>,
-
-	#[serde(default = "default_block_size")]
-	pub block_size: usize,
-
-	#[serde(default = "default_meta_replication_factor")]
-	pub meta_replication_factor: usize,
+fn default_data_replication_factor() -> usize {
+	3
 }
 
 fn read_config(config_file: PathBuf) -> Result<Config, Error> {
@@ -157,19 +180,22 @@ fn gen_node_id(metadata_dir: &PathBuf) -> Result<UUID, Error> {
 	}
 }
 
-async fn shutdown_signal(chans: Vec<oneshot::Sender<()>>) {
+async fn shutdown_signal(send_cancel: watch::Sender<bool>) -> Result<(), Error> {
 	// Wait for the CTRL+C signal
 	tokio::signal::ctrl_c()
 		.await
 		.expect("failed to install CTRL+C signal handler");
 	println!("Received CTRL+C, shutting down.");
-	for ch in chans {
-		ch.send(()).unwrap();
-	}
+	send_cancel.broadcast(true)?;
+	Ok(())
 }
 
-async fn wait_from(chan: oneshot::Receiver<()>) -> () {
-	chan.await.unwrap()
+async fn wait_from(mut chan: watch::Receiver<bool>) -> () {
+	while let Some(exit_now) = chan.recv().await {
+		if exit_now {
+			return;
+		}
+	}
 }
 
 pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
@@ -182,17 +208,20 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
 	let id = gen_node_id(&config.metadata_dir).expect("Unable to read or generate node ID");
 	println!("Node ID: {}", hex::encode(&id));
 
-	let garage = Garage::new(config, id, db).await;
+	let (send_cancel, watch_cancel) = watch::channel(false);
 
-	let (tx1, rx1) = oneshot::channel();
-	let (tx2, rx2) = oneshot::channel();
+	let background = BackgroundRunner::new(8, watch_cancel.clone());
+	let garage = Garage::new(config, id, db, background.clone()).await;
 
-	let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(rx1));
-	let api_server = api_server::run_api_server(garage.clone(), wait_from(rx2));
+	let rpc_server = rpc_server::run_rpc_server(garage.clone(), wait_from(watch_cancel.clone()));
+	let api_server = api_server::run_api_server(garage.clone(), wait_from(watch_cancel.clone()));
 
-	tokio::spawn(shutdown_signal(vec![tx1, tx2]));
-	tokio::spawn(garage.system.clone().bootstrap());
-
-	futures::try_join!(rpc_server, api_server)?;
+	futures::try_join!(
+		garage.system.clone().bootstrap().map(Ok),
+		rpc_server,
+		api_server,
+		background.run().map(Ok),
+		shutdown_signal(send_cancel),
+	)?;
 	Ok(())
 }