Some improvements to Garage internals #451

Merged
lx merged 14 commits from internals-rework into main 2023-01-03 11:37:32 +00:00
3 changed files with 23 additions and 37 deletions
Showing only changes of commit 510b620108 - Show all commits

View file

@ -21,7 +21,6 @@ use netapp::peering::fullmesh::FullMeshPeeringStrategy;
use netapp::util::parse_and_resolve_peer_addr_async; use netapp::util::parse_and_resolve_peer_addr_async;
use netapp::{NetApp, NetworkKey, NodeID, NodeKey}; use netapp::{NetApp, NetworkKey, NodeID, NodeKey};
use garage_util::background::{self};
use garage_util::config::Config; use garage_util::config::Config;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use garage_util::config::KubernetesDiscoveryConfig; use garage_util::config::KubernetesDiscoveryConfig;
@ -622,11 +621,7 @@ impl System {
if info.cluster_layout_version > local_info.cluster_layout_version if info.cluster_layout_version > local_info.cluster_layout_version
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash || info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
{ {
let self2 = self.clone(); tokio::spawn(self.clone().pull_cluster_layout(from));
background::spawn(async move {
self2.pull_cluster_layout(from).await;
Ok(())
});
} }
self.node_status self.node_status
@ -668,16 +663,18 @@ impl System {
drop(update_ring); drop(update_ring);
let self2 = self.clone(); let self2 = self.clone();
background::spawn(async move { tokio::spawn(async move {
self2 if let Err(e) = self2
.rpc .rpc
.broadcast( .broadcast(
&self2.system_endpoint, &self2.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout), SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH), RequestStrategy::with_priority(PRIO_HIGH),
) )
.await?; .await
Ok(()) {
warn!("Error while broadcasting new cluster layout: {}", e);
}
}); });
self.save_cluster_layout().await?; self.save_cluster_layout().await?;
@ -766,12 +763,12 @@ impl System {
} }
for (node_id, node_addr) in ping_list { for (node_id, node_addr) in ping_list {
background::spawn( let self2 = self.clone();
self.netapp tokio::spawn(async move {
.clone() if let Err(e) = self2.netapp.clone().try_connect(node_addr, node_id).await {
.try_connect(node_addr, node_id) error!("{}\n{}", connect_error_message(node_addr, node_id), e);
.map(move |r| r.err_context(connect_error_message(node_addr, node_id))), }
); });
} }
} }

View file

@ -14,7 +14,7 @@ use opentelemetry::{
use garage_db as db; use garage_db as db;
use garage_util::background::{self, BackgroundRunner}; use garage_util::background::BackgroundRunner;
use garage_util::data::*; use garage_util::data::*;
use garage_util::error::Error; use garage_util::error::Error;
use garage_util::metrics::RecordDuration; use garage_util::metrics::RecordDuration;
@ -275,7 +275,11 @@ where
if not_all_same { if not_all_same {
let self2 = self.clone(); let self2 = self.clone();
let ent2 = ret_entry.clone(); let ent2 = ret_entry.clone();
background::spawn(async move { self2.repair_on_read(&who[..], ent2).await }); tokio::spawn(async move {
if let Err(e) = self2.repair_on_read(&who[..], ent2).await {
warn!("Error doing repair on read: {}", e);
}
});
} }
} }
@ -372,11 +376,12 @@ where
.into_iter() .into_iter()
.map(|k| ret.get(&k).unwrap().clone()) .map(|k| ret.get(&k).unwrap().clone())
.collect::<Vec<_>>(); .collect::<Vec<_>>();
background::spawn(async move { tokio::spawn(async move {
for v in to_repair { for v in to_repair {
self2.repair_on_read(&who[..], v).await?; if let Err(e) = self2.repair_on_read(&who[..], v).await {
warn!("Error doing repair on read: {}", e);
}
} }
Ok(())
}); });
} }

View file

@ -2,20 +2,15 @@
pub mod worker; pub mod worker;
use core::future::Future;
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::Arc; use std::sync::Arc;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch};
use crate::error::Error;
use worker::WorkerProcessor; use worker::WorkerProcessor;
pub use worker::{Worker, WorkerState}; pub use worker::{Worker, WorkerState};
pub(crate) type JobOutput = Result<(), Error>;
/// Job runner for futures and async functions /// Job runner for futures and async functions
pub struct BackgroundRunner { pub struct BackgroundRunner {
send_worker: mpsc::UnboundedSender<Box<dyn Worker>>, send_worker: mpsc::UnboundedSender<Box<dyn Worker>>,
@ -77,14 +72,3 @@ impl BackgroundRunner {
.expect("Could not put worker in queue"); .expect("Could not put worker in queue");
} }
} }
pub fn spawn<T>(job: T)
where
T: Future<Output = JobOutput> + Send + 'static,
{
tokio::spawn(async move {
if let Err(e) = job.await {
error!("{}", e);
}
});
}