Spawn all background workers in a separate step

This commit is contained in:
Alex 2022-12-14 12:28:07 +01:00
parent 83c8467e23
commit 2183518edc
Signed by: lx
GPG key ID: 0E496D15096376BE
12 changed files with 115 additions and 61 deletions

1
Cargo.lock generated
View file

@ -1243,6 +1243,7 @@ dependencies = [
name = "garage_table"
version = "0.8.0"
dependencies = [
"arc-swap",
"async-trait",
"bytes",
"futures",

View file

@ -3,6 +3,7 @@ use std::pin::Pin;
use std::sync::Arc;
use std::time::Duration;
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
@ -87,7 +88,7 @@ pub struct BlockManager {
pub(crate) metrics: BlockManagerMetrics,
tx_scrub_command: mpsc::Sender<ScrubWorkerCommand>,
tx_scrub_command: ArcSwapOption<mpsc::Sender<ScrubWorkerCommand>>,
}
#[derive(Serialize, Deserialize, Clone, Debug)]
@ -126,8 +127,6 @@ impl BlockManager {
let metrics =
BlockManagerMetrics::new(rc.rc.clone(), resync.queue.clone(), resync.errors.clone());
let (scrub_tx, scrub_rx) = mpsc::channel(1);
let block_manager = Arc::new(Self {
replication,
data_dir,
@ -138,21 +137,26 @@ impl BlockManager {
system,
endpoint,
metrics,
tx_scrub_command: scrub_tx,
tx_scrub_command: ArcSwapOption::new(None),
});
block_manager.endpoint.set_handler(block_manager.clone());
block_manager
}
pub fn spawn_workers(self: &Arc<Self>) {
// Spawn a bunch of resync workers
for index in 0..MAX_RESYNC_WORKERS {
let worker = ResyncWorker::new(index, block_manager.clone());
block_manager.system.background.spawn_worker(worker);
let worker = ResyncWorker::new(index, self.clone());
self.system.background.spawn_worker(worker);
}
// Spawn scrub worker
let scrub_worker = ScrubWorker::new(block_manager.clone(), scrub_rx);
block_manager.system.background.spawn_worker(scrub_worker);
block_manager
let (scrub_tx, scrub_rx) = mpsc::channel(1);
self.tx_scrub_command.store(Some(Arc::new(scrub_tx)));
self.system
.background
.spawn_worker(ScrubWorker::new(self.clone(), scrub_rx));
}
/// Ask nodes that might have a (possibly compressed) block for it
@ -325,8 +329,11 @@ impl BlockManager {
}
/// Send command to start/stop/manager scrub worker
pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) {
let _ = self.tx_scrub_command.send(cmd).await;
pub async fn send_scrub_command(&self, cmd: ScrubWorkerCommand) -> Result<(), Error> {
let tx = self.tx_scrub_command.load();
let tx = tx.as_ref().ok_or_message("scrub worker is not running")?;
tx.send(cmd).await.ok_or_message("send error")?;
Ok(())
}
/// Get the reference count of a block

View file

@ -759,7 +759,7 @@ impl AdminRpcHandler {
)))
}
} else {
launch_online_repair(self.garage.clone(), opt).await;
launch_online_repair(self.garage.clone(), opt).await?;
Ok(AdminRpc::Ok(format!(
"Repair launched on {:?}",
self.garage.system.id
@ -944,7 +944,7 @@ impl AdminRpcHandler {
self.garage
.block_manager
.send_scrub_command(scrub_command)
.await;
.await?;
Ok(AdminRpc::Ok("Scrub tranquility updated".into()))
}
WorkerSetCmd::ResyncWorkerCount { worker_count } => {

View file

@ -15,15 +15,15 @@ use garage_util::error::Error;
use crate::*;
pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) -> Result<(), Error> {
match opt.what {
RepairWhat::Tables => {
info!("Launching a full sync of tables");
garage.bucket_table.syncer.add_full_sync();
garage.object_table.syncer.add_full_sync();
garage.version_table.syncer.add_full_sync();
garage.block_ref_table.syncer.add_full_sync();
garage.key_table.syncer.add_full_sync();
garage.bucket_table.syncer.add_full_sync()?;
garage.object_table.syncer.add_full_sync()?;
garage.version_table.syncer.add_full_sync()?;
garage.block_ref_table.syncer.add_full_sync()?;
garage.key_table.syncer.add_full_sync()?;
}
RepairWhat::Versions => {
info!("Repairing the versions table");
@ -56,9 +56,10 @@ pub async fn launch_online_repair(garage: Arc<Garage>, opt: RepairOpt) {
}
};
info!("Sending command to scrub worker: {:?}", cmd);
garage.block_manager.send_scrub_command(cmd).await;
garage.block_manager.send_scrub_command(cmd).await?;
}
}
Ok(())
}
// ----

View file

@ -42,6 +42,9 @@ pub async fn run_server(config_file: PathBuf) -> Result<(), Error> {
info!("Initializing Garage main data store...");
let garage = Garage::new(config.clone(), background)?;
info!("Spawning Garage workers...");
garage.spawn_workers();
if config.admin.trace_sink.is_some() {
info!("Initialize tracing...");

View file

@ -273,6 +273,22 @@ impl Garage {
}))
}
pub fn spawn_workers(&self) {
self.block_manager.spawn_workers();
self.bucket_table.spawn_workers();
self.bucket_alias_table.spawn_workers();
self.key_table.spawn_workers();
self.object_table.spawn_workers();
self.object_counter_table.spawn_workers();
self.version_table.spawn_workers();
self.block_ref_table.spawn_workers();
#[cfg(feature = "k2v")]
self.k2v.spawn_workers();
}
pub fn bucket_helper(&self) -> helper::bucket::BucketHelper {
helper::bucket::BucketHelper(self)
}
@ -307,4 +323,9 @@ impl GarageK2V {
rpc,
}
}
pub fn spawn_workers(&self) {
self.item_table.spawn_workers();
self.counter_table.spawn_workers();
}
}

View file

@ -164,6 +164,10 @@ impl<T: CountedItem> IndexCounter<T> {
})
}
pub fn spawn_workers(&self) {
self.table.spawn_workers();
}
pub fn count(
&self,
tx: &mut db::Transaction,

View file

@ -21,6 +21,7 @@ garage_util = { version = "0.8.0", path = "../util" }
opentelemetry = "0.17"
async-trait = "0.1.7"
arc-swap = "1.0"
bytes = "1.0"
hex = "0.4"
hexdump = "0.1"

View file

@ -54,24 +54,27 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
pub(crate) fn new(system: Arc<System>, data: Arc<TableData<F, R>>) -> Arc<Self> {
let endpoint = system
.netapp
.endpoint(format!("garage_table/gc.rs/Rpc:{}", F::TABLE_NAME));
let gc = Arc::new(Self {
system: system.clone(),
system,
data,
endpoint,
});
gc.endpoint.set_handler(gc.clone());
system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
pub(crate) fn spawn_workers(self: &Arc<Self>) {
self.system
.background
.spawn_worker(GcWorker::new(self.clone()));
}
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();

View file

@ -70,17 +70,17 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch(background: &BackgroundRunner, data: Arc<TableData<F, R>>) -> Arc<Self> {
pub(crate) fn new(data: Arc<TableData<F, R>>) -> Arc<Self> {
let empty_node_hash = blake2sum(&rmp_to_vec_all_named(&MerkleNode::Empty).unwrap()[..]);
let ret = Arc::new(Self {
Arc::new(Self {
data,
empty_node_hash,
});
})
}
background.spawn_worker(MerkleWorker(ret.clone()));
ret
pub(crate) fn spawn_workers(self: &Arc<Self>, background: &BackgroundRunner) {
background.spawn_worker(MerkleWorker(self.clone()));
}
fn updater_loop_iter(&self) -> Result<WorkerState, Error> {

View file

@ -2,6 +2,7 @@ use std::collections::VecDeque;
use std::sync::Arc;
use std::time::{Duration, Instant};
use arc_swap::ArcSwapOption;
use async_trait::async_trait;
use futures_util::stream::*;
use opentelemetry::KeyValue;
@ -13,7 +14,7 @@ use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
use garage_util::error::{Error, OkOrMessage};
use garage_rpc::ring::*;
use garage_rpc::system::System;
@ -32,7 +33,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
add_full_sync_tx: mpsc::UnboundedSender<()>,
add_full_sync_tx: ArcSwapOption<mpsc::UnboundedSender<()>>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@ -65,7 +66,7 @@ where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
pub(crate) fn launch(
pub(crate) fn new(
system: Arc<System>,
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
@ -74,34 +75,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
system,
data,
merkle,
add_full_sync_tx,
add_full_sync_tx: ArcSwapOption::new(None),
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
system.background.spawn_worker(SyncWorker {
syncer: syncer.clone(),
ring_recv: system.ring.clone(),
ring: system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
pub fn add_full_sync(&self) {
if self.add_full_sync_tx.send(()).is_err() {
error!("({}) Could not add full sync", F::TABLE_NAME);
}
pub(crate) fn spawn_workers(self: &Arc<Self>) {
let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
self.add_full_sync_tx
.store(Some(Arc::new(add_full_sync_tx)));
self.system.background.spawn_worker(SyncWorker {
syncer: self.clone(),
ring_recv: self.system.ring.clone(),
ring: self.system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
}
pub fn add_full_sync(&self) -> Result<(), Error> {
let tx = self.add_full_sync_tx.load();
let tx = tx
.as_ref()
.ok_or_message("table sync worker is not running")?;
tx.send(()).ok_or_message("send error")?;
Ok(())
}
// ----

View file

@ -36,6 +36,7 @@ pub struct Table<F: TableSchema + 'static, R: TableReplication + 'static> {
pub data: Arc<TableData<F, R>>,
pub merkle_updater: Arc<MerkleUpdater<F, R>>,
pub syncer: Arc<TableSyncer<F, R>>,
gc: Arc<TableGc<F, R>>,
endpoint: Arc<Endpoint<TableRpc<F>, Self>>,
}
@ -76,29 +77,34 @@ where
let data = TableData::new(system.clone(), instance, replication, db);
let merkle_updater = MerkleUpdater::launch(&system.background, data.clone());
let merkle_updater = MerkleUpdater::new(data.clone());
let syncer = TableSyncer::launch(system.clone(), data.clone(), merkle_updater.clone());
TableGc::launch(system.clone(), data.clone());
let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
let gc = TableGc::new(system.clone(), data.clone());
let table = Arc::new(Self {
system,
data,
merkle_updater,
gc,
syncer,
endpoint,
});
table
.system
.background
.spawn_worker(InsertQueueWorker(table.clone()));
table.endpoint.set_handler(table.clone());
table
}
pub fn spawn_workers(self: &Arc<Self>) {
self.merkle_updater.spawn_workers(&self.system.background);
self.syncer.spawn_workers();
self.gc.spawn_workers();
self.system
.background
.spawn_worker(InsertQueueWorker(self.clone()));
}
pub async fn insert(&self, e: &F::E) -> Result<(), Error> {
let tracer = opentelemetry::global::tracer("garage_table");
let span = tracer.start(format!("{} insert", F::TABLE_NAME));