New worker semantics applied to garage_table
continuous-integration/drone/push: Build is failing

Alex 2022-06-21 13:50:55 +02:00
parent e12bc3b595
commit 3119ea59b0
Signed by: lx
GPG Key ID: 0E496D15096376BE
6 changed files with 248 additions and 202 deletions

View File

@@ -8,12 +8,11 @@ use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use futures::future::join_all;
use futures::select;
use futures_util::future::*;
use tokio::sync::watch;
use garage_db::counted_tree_hack::CountedTree;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::*;
use garage_util::time::*;
@@ -69,35 +68,11 @@ where
gc.endpoint.set_handler(gc.clone());
let gc1 = gc.clone();
system.background.spawn_worker(
format!("GC loop for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| gc1.gc_loop(must_exit),
);
system.background.spawn_worker(GcWorker::new(gc.clone()));
gc
}
async fn gc_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.gc_loop_iter().await {
Ok(None) => {
// Stuff was done, loop immediately
}
Ok(Some(wait_delay)) => {
// Nothing was done, wait the specified delay.
select! {
_ = tokio::time::sleep(wait_delay).fuse() => {},
_ = must_exit.changed().fuse() => {},
}
}
Err(e) => {
warn!("({}) Error doing GC: {}", F::TABLE_NAME, e);
}
}
}
}
async fn gc_loop_iter(&self) -> Result<Option<Duration>, Error> {
let now = now_msec();
@@ -328,6 +303,57 @@ where
}
}
struct GcWorker<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
gc: Arc<TableGc<F, R>>,
wait_delay: Duration,
}
impl<F, R> GcWorker<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn new(gc: Arc<TableGc<F, R>>) -> Self {
Self {
gc,
wait_delay: Duration::from_secs(0),
}
}
}
#[async_trait]
impl<F, R> Worker for GcWorker<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn name(&self) -> String {
format!("Table GC: {}", F::TABLE_NAME)
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> {
match self.gc.gc_loop_iter().await? {
None => Ok(WorkerStatus::Busy),
Some(delay) => {
self.wait_delay = delay;
Ok(WorkerStatus::Idle)
}
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
tokio::time::sleep(self.wait_delay).await;
WorkerStatus::Busy
}
}
/// An entry stored in the gc_todo Sled tree associated with the table.
/// Contains helper functions for parsing, saving, and removing
/// such entries in Sled.

View File

@@ -1,14 +1,13 @@
use std::sync::Arc;
use std::time::Duration;
use futures::select;
use futures_util::future::*;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use garage_db as db;
use garage_util::background::BackgroundRunner;
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -78,43 +77,17 @@ where
empty_node_hash,
});
let ret2 = ret.clone();
background.spawn_worker(
format!("Merkle tree updater for {}", F::TABLE_NAME),
|must_exit: watch::Receiver<bool>| ret2.updater_loop(must_exit),
);
background.spawn_worker(MerkleWorker(ret.clone()));
ret
}
async fn updater_loop(self: Arc<Self>, mut must_exit: watch::Receiver<bool>) {
while !*must_exit.borrow() {
match self.updater_loop_iter() {
Ok(true) => (),
Ok(false) => {
select! {
_ = self.data.merkle_todo_notify.notified().fuse() => {},
_ = must_exit.changed().fuse() => {},
}
}
Err(e) => {
warn!(
"({}) Error while updating Merkle tree item: {}",
F::TABLE_NAME,
e
);
tokio::time::sleep(Duration::from_secs(10)).await;
}
}
}
}
fn updater_loop_iter(&self) -> Result<bool, Error> {
fn updater_loop_iter(&self) -> Result<WorkerStatus, Error> {
if let Some((key, valhash)) = self.data.merkle_todo.first()? {
self.update_item(&key, &valhash)?;
Ok(true)
Ok(WorkerStatus::Busy)
} else {
Ok(false)
Ok(WorkerStatus::Idle)
}
}
@@ -325,6 +298,34 @@ where
}
}
struct MerkleWorker<F, R>(Arc<MerkleUpdater<F, R>>)
where
F: TableSchema + 'static,
R: TableReplication + 'static;
#[async_trait]
impl<F, R> Worker for MerkleWorker<F, R>
where
F: TableSchema + 'static,
R: TableReplication + 'static,
{
fn name(&self) -> String {
format!("Merkle tree updater: {}", F::TABLE_NAME)
}
async fn work(
&mut self,
_must_exit: &mut watch::Receiver<bool>,
) -> Result<WorkerStatus, Error> {
self.0.updater_loop_iter()
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
tokio::time::sleep(Duration::from_secs(10)).await;
WorkerStatus::Busy
}
}
impl MerkleNodeKey {
fn encode(&self) -> Vec<u8> {
let mut ret = Vec::with_capacity(2 + self.prefix.len());

View File

@@ -1,17 +1,17 @@
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use futures::select;
use futures_util::future::*;
use futures_util::stream::*;
use opentelemetry::KeyValue;
use rand::Rng;
use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf;
use tokio::select;
use tokio::sync::{mpsc, watch};
use garage_util::background::*;
use garage_util::data::*;
use garage_util::error::Error;
@@ -34,7 +34,7 @@ pub struct TableSyncer<F: TableSchema + 'static, R: TableReplication + 'static>
data: Arc<TableData<F, R>>,
merkle: Arc<MerkleUpdater<F, R>>,
todo: Mutex<SyncTodo>,
add_full_sync_tx: mpsc::UnboundedSender<()>,
endpoint: Arc<Endpoint<SyncRpc, Self>>,
}
@@ -52,10 +52,6 @@ impl Rpc for SyncRpc {
type Response = Result<SyncRpc, Error>;
}
struct SyncTodo {
todo: Vec<TodoPartition>,
}
#[derive(Debug, Clone)]
struct TodoPartition {
partition: Partition,
@@ -80,118 +76,40 @@ where
.netapp
.endpoint(format!("garage_table/sync.rs/Rpc:{}", F::TABLE_NAME));
let todo = SyncTodo { todo: vec![] };
let (add_full_sync_tx, add_full_sync_rx) = mpsc::unbounded_channel();
let syncer = Arc::new(Self {
system: system.clone(),
data,
merkle,
todo: Mutex::new(todo),
add_full_sync_tx,
endpoint,
});
syncer.endpoint.set_handler(syncer.clone());
let (busy_tx, busy_rx) = mpsc::unbounded_channel();
let s1 = syncer.clone();
system.background.spawn_worker(
format!("table sync watcher for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| s1.watcher_task(must_exit, busy_rx),
);
let s2 = syncer.clone();
system.background.spawn_worker(
format!("table syncer for {}", F::TABLE_NAME),
move |must_exit: watch::Receiver<bool>| s2.syncer_task(must_exit, busy_tx),
);
let s3 = syncer.clone();
tokio::spawn(async move {
tokio::time::sleep(Duration::from_secs(20)).await;
s3.add_full_sync();
system.background.spawn_worker(SyncWorker {
syncer: syncer.clone(),
ring_recv: system.ring.clone(),
ring: system.ring.borrow().clone(),
add_full_sync_rx,
todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20),
});
syncer
}
async fn watcher_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
mut busy_rx: mpsc::UnboundedReceiver<bool>,
) {
let mut prev_ring: Arc<Ring> = self.system.ring.borrow().clone();
let mut ring_recv: watch::Receiver<Arc<Ring>> = self.system.ring.clone();
let mut nothing_to_do_since = Some(Instant::now());
while !*must_exit.borrow() {
select! {
_ = ring_recv.changed().fuse() => {
let new_ring = ring_recv.borrow();
if !Arc::ptr_eq(&new_ring, &prev_ring) {
debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
prev_ring = new_ring.clone();
}
}
busy_opt = busy_rx.recv().fuse() => {
if let Some(busy) = busy_opt {
if busy {
nothing_to_do_since = None;
} else if nothing_to_do_since.is_none() {
nothing_to_do_since = Some(Instant::now());
}
}
}
_ = must_exit.changed().fuse() => {},
_ = tokio::time::sleep(Duration::from_secs(1)).fuse() => {
if nothing_to_do_since.map(|t| Instant::now() - t >= ANTI_ENTROPY_INTERVAL).unwrap_or(false) {
nothing_to_do_since = None;
debug!("({}) Interval passed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
}
}
}
}
}
pub fn add_full_sync(&self) {
self.todo
.lock()
.unwrap()
.add_full_sync(&self.data, &self.system);
}
async fn syncer_task(
self: Arc<Self>,
mut must_exit: watch::Receiver<bool>,
busy_tx: mpsc::UnboundedSender<bool>,
) {
while !*must_exit.borrow() {
let task = self.todo.lock().unwrap().pop_task();
if let Some(partition) = task {
busy_tx.send(true).unwrap();
let res = self
.clone()
.sync_partition(&partition, &mut must_exit)
.await;
if let Err(e) = res {
warn!(
"({}) Error while syncing {:?}: {}",
F::TABLE_NAME,
partition,
e
);
}
} else {
busy_tx.send(false).unwrap();
tokio::time::sleep(Duration::from_secs(1)).await;
}
if self.add_full_sync_tx.send(()).is_err() {
error!("({}) Could not add full sync", F::TABLE_NAME);
}
}
// ----
async fn sync_partition(
self: Arc<Self>,
self: &Arc<Self>,
partition: &TodoPartition,
must_exit: &mut watch::Receiver<bool>,
) -> Result<(), Error> {
@@ -577,12 +495,22 @@ where
}
}
impl SyncTodo {
fn add_full_sync<F: TableSchema, R: TableReplication>(
&mut self,
data: &TableData<F, R>,
system: &System,
) {
// -------- Sync Worker ---------
struct SyncWorker<F: TableSchema + 'static, R: TableReplication + 'static> {
syncer: Arc<TableSyncer<F, R>>,
ring_recv: watch::Receiver<Arc<Ring>>,
ring: Arc<Ring>,
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
todo: Vec<TodoPartition>,
next_full_sync: Instant,
}
impl<F: TableSchema + 'static, R: TableReplication + 'static> SyncWorker<F, R> {
fn add_full_sync(&mut self) {
let system = &self.syncer.system;
let data = &self.syncer.data;
let my_id = system.id;
self.todo.clear();
@@ -623,6 +551,8 @@ impl SyncTodo {
retain,
});
}
self.next_full_sync = Instant::now() + ANTI_ENTROPY_INTERVAL;
}
fn pop_task(&mut self) -> Option<TodoPartition> {
@@ -641,6 +571,51 @@ impl SyncTodo {
}
}
#[async_trait]
impl<F: TableSchema + 'static, R: TableReplication + 'static> Worker for SyncWorker<F, R> {
fn name(&self) -> String {
format!("Table sync worker for {}", F::TABLE_NAME)
}
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error> {
if let Some(partition) = self.pop_task() {
self.syncer.sync_partition(&partition, must_exit).await?;
Ok(WorkerStatus::Busy)
} else {
Ok(WorkerStatus::Idle)
}
}
async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
select! {
s = self.add_full_sync_rx.recv() => match s {
Some(()) => {
self.add_full_sync();
}
None => (),
},
_ = self.ring_recv.changed() => {
let new_ring = self.ring_recv.borrow();
if !Arc::ptr_eq(&new_ring, &self.ring) {
self.ring = new_ring.clone();
drop(new_ring);
debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync();
}
},
_ = tokio::time::sleep(self.next_full_sync - Instant::now()) => {
self.add_full_sync();
}
}
match self.todo.is_empty() {
false => WorkerStatus::Busy,
true => WorkerStatus::Idle,
}
}
}
// ---- UTIL ----
fn hash_of<T: Serialize>(x: &T) -> Result<Hash, Error> {
Ok(blake2sum(&rmp_to_vec_all_named(x)?[..]))
}

View File

@@ -34,16 +34,15 @@ impl Worker for JobWorker {
}
}
async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus {
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus {
loop {
match self.job_chan.lock().await.recv().await {
Some((job, cancellable)) => {
if cancellable && *must_exit.borrow() {
// skip job
continue;
}
self.next_job = Some(job);
return WorkerStatus::Busy
return WorkerStatus::Busy;
}
None => return WorkerStatus::Done,
}

View File

@@ -10,7 +10,8 @@ use std::sync::Arc;
use tokio::sync::{mpsc, watch, Mutex};
use crate::error::Error;
use worker::{Worker, WorkerProcessor};
use worker::WorkerProcessor;
pub use worker::{Worker, WorkerStatus};
pub(crate) type JobOutput = Result<(), Error>;
pub(crate) type Job = Pin<Box<dyn Future<Output = JobOutput> + Send>>;
@@ -30,9 +31,7 @@ impl BackgroundRunner {
let (send_worker, worker_out) = mpsc::unbounded_channel::<Box<dyn Worker>>();
let await_all_done =
tokio::spawn(
async move { WorkerProcessor::new(worker_out, stop_signal).run().await },
);
tokio::spawn(async move { WorkerProcessor::new(worker_out, stop_signal).run().await });
let (send_job, queue_out) = mpsc::unbounded_channel();
let queue_out = Arc::new(Mutex::new(queue_out));
@@ -40,11 +39,14 @@ impl BackgroundRunner {
for i in 0..n_runners {
let queue_out = queue_out.clone();
send_worker.send(Box::new(job_worker::JobWorker {
index: i,
job_chan: queue_out.clone(),
next_job: None,
})).ok().unwrap();
send_worker
.send(Box::new(job_worker::JobWorker {
index: i,
job_chan: queue_out.clone(),
next_job: None,
}))
.ok()
.unwrap();
}
let bgrunner = Arc::new(Self {

View File

@@ -1,16 +1,16 @@
use std::time::{Duration, Instant};
use tracing::*;
use async_trait::async_trait;
use futures::future::*;
use tokio::select;
use futures::stream::FuturesUnordered;
use futures::StreamExt;
use tokio::select;
use tokio::sync::{mpsc, watch};
use tracing::*;
use crate::error::Error;
#[derive(PartialEq, Copy, Clone)]
#[derive(PartialEq, Copy, Clone, Debug)]
pub enum WorkerStatus {
Busy,
Idle,
@@ -20,8 +20,20 @@ pub enum WorkerStatus {
#[async_trait]
pub trait Worker: Send {
fn name(&self) -> String;
/// Work: do a basic unit of work, if one is available (otherwise, should return
/// WorkerStatus::Idle immediately). We will do our best not to interrupt this future in
/// the middle of processing; it will only be interrupted at the last minute when Garage
/// is trying to exit and this hasn't returned yet. This function may return an error to
/// indicate that its unit of work could not be processed: the error will be logged and
/// .work() will be called again immediately.
async fn work(&mut self, must_exit: &mut watch::Receiver<bool>) -> Result<WorkerStatus, Error>;
async fn wait_for_work(&mut self, must_exit: &mut watch::Receiver<bool>) -> WorkerStatus;
/// Wait for work: wait for some task to become available. This future can be interrupted
/// in the middle for any reason. It doesn't have to await must_exit.changed(); we do that
/// for you. Therefore it only receives a read reference to must_exit, which allows it to
/// check whether we are exiting.
async fn wait_for_work(&mut self, must_exit: &watch::Receiver<bool>) -> WorkerStatus;
}
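
// Illustration only, not part of this commit: a minimal sketch of how a type could
// implement the Worker trait defined above. The TickWorker name and its fields are
// hypothetical; it assumes the imports already present in this file (Duration,
// async_trait, watch, Error). Such a worker would be handed to the background runner
// the same way as the workers in this commit, e.g. via spawn_worker.
struct TickWorker {
	period: Duration,
	ticks: u64,
}

#[async_trait]
impl Worker for TickWorker {
	fn name(&self) -> String {
		format!("Example tick worker ({} ticks so far)", self.ticks)
	}

	async fn work(
		&mut self,
		_must_exit: &mut watch::Receiver<bool>,
	) -> Result<WorkerStatus, Error> {
		// One unit of work: count the tick, then report that nothing more is pending.
		self.ticks += 1;
		Ok(WorkerStatus::Idle)
	}

	async fn wait_for_work(&mut self, _must_exit: &watch::Receiver<bool>) -> WorkerStatus {
		// Sleep until the next tick is due; the processor awaits must_exit.changed() for us.
		tokio::time::sleep(self.period).await;
		WorkerStatus::Busy
	}
}
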
pub(crate) struct WorkerProcessor {
@@ -58,10 +70,12 @@ impl WorkerProcessor {
let task_id = next_task_id;
next_task_id += 1;
let stop_signal = self.stop_signal.clone();
let stop_signal_worker = self.stop_signal.clone();
workers.push(async move {
let mut worker = WorkerHandler {
task_id,
stop_signal,
stop_signal_worker,
worker: new_worker,
status: WorkerStatus::Busy,
};
@@ -91,15 +105,22 @@ impl WorkerProcessor {
let drain_half_time = Instant::now() + Duration::from_secs(5);
let drain_everything = async move {
while let Some(mut worker) = workers.next().await {
if worker.status == WorkerStatus::Busy
|| (worker.status == WorkerStatus::Idle && Instant::now() < drain_half_time)
{
workers.push(async move {
worker.step().await;
worker
}.boxed());
if worker.status == WorkerStatus::Done {
info!(
"Worker {} (TID {}) exited",
worker.worker.name(),
worker.task_id
);
} else if Instant::now() > drain_half_time {
warn!("Worker {} (TID {}) interrupted between two iterations in state {:?} (this should be fine)", worker.worker.name(), worker.task_id, worker.status);
} else {
info!("Worker {} (TID {}) exited", worker.worker.name(), worker.task_id);
workers.push(
async move {
worker.step().await;
worker
}
.boxed(),
);
}
}
};
@@ -109,7 +130,7 @@ impl WorkerProcessor {
info!("All workers exited in time \\o/");
}
_ = tokio::time::sleep(Duration::from_secs(9)) => {
warn!("Some workers could not exit in time, we are cancelling some things in the middle.");
error!("Some workers could not exit in time, we are cancelling some things in the middle");
}
}
}
@@ -119,27 +140,49 @@ impl WorkerProcessor {
struct WorkerHandler {
task_id: usize,
stop_signal: watch::Receiver<bool>,
stop_signal_worker: watch::Receiver<bool>,
worker: Box<dyn Worker>,
status: WorkerStatus,
}
impl WorkerHandler {
async fn step(&mut self) {
async fn step(&mut self) {
match self.status {
WorkerStatus::Busy => {
match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
self.status = s;
WorkerStatus::Busy => match self.worker.work(&mut self.stop_signal).await {
Ok(s) => {
self.status = s;
}
Err(e) => {
error!(
"Error in worker {} (TID {}): {}",
self.worker.name(),
self.task_id,
e
);
}
},
WorkerStatus::Idle => {
if *self.stop_signal.borrow() {
select! {
new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => {
self.status = new_st;
}
_ = tokio::time::sleep(Duration::from_secs(1)) => {
// stay in Idle state
}
}
Err(e) => {
error!("Error in worker {}: {}", self.worker.name(), e);
} else {
select! {
new_st = self.worker.wait_for_work(&mut self.stop_signal_worker) => {
self.status = new_st;
}
_ = self.stop_signal.changed() => {
// stay in Idle state
}
}
}
}
WorkerStatus::Idle => {
self.status = self.worker.wait_for_work(&mut self.stop_signal).await;
}
WorkerStatus::Done => unreachable!()
WorkerStatus::Done => unreachable!(),
}
}
}