Compare commits

...

2 commits

Author SHA1 Message Date
bfb1845fdc
layout: refactor to use a RwLock on LayoutHistory
Some checks failed
continuous-integration/drone/pr Build is failing
continuous-integration/drone/push Build is failing
2023-11-09 14:12:05 +01:00
19ef1ec8e7
layout: more refactoring 2023-11-09 13:34:14 +01:00
8 changed files with 133 additions and 117 deletions

View file

@ -210,7 +210,7 @@ pub async fn handle_update_cluster_layout(
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?; let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
let mut layout = garage.system.cluster_layout().as_ref().clone(); let mut layout = garage.system.cluster_layout().clone();
let mut roles = layout.current().roles.clone(); let mut roles = layout.current().roles.clone();
roles.merge(&layout.staging.get().roles); roles.merge(&layout.staging.get().roles);
@ -256,7 +256,7 @@ pub async fn handle_apply_cluster_layout(
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let param = parse_json_body::<ApplyLayoutRequest>(req).await?; let param = parse_json_body::<ApplyLayoutRequest>(req).await?;
let layout = garage.system.cluster_layout().as_ref().clone(); let layout = garage.system.cluster_layout().clone();
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?; let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
garage garage
@ -273,7 +273,7 @@ pub async fn handle_apply_cluster_layout(
} }
pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> { pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
let layout = garage.system.cluster_layout().as_ref().clone(); let layout = garage.system.cluster_layout().clone();
let layout = layout.revert_staged_changes()?; let layout = layout.revert_staged_changes()?;
garage garage
.system .system

View file

@ -5,7 +5,6 @@ use serde::Serialize;
use garage_util::data::*; use garage_util::data::*;
use garage_rpc::layout::LayoutHistory;
use garage_table::util::*; use garage_table::util::*;
use garage_model::garage::Garage; use garage_model::garage::Garage;
@ -26,7 +25,8 @@ pub async fn handle_read_index(
) -> Result<Response<Body>, Error> { ) -> Result<Response<Body>, Error> {
let reverse = reverse.unwrap_or(false); let reverse = reverse.unwrap_or(false);
let layout: Arc<LayoutHistory> = garage.system.cluster_layout().clone(); // TODO: not only current
let node_id_vec = garage.system.cluster_layout().current().node_ids().to_vec();
let (partition_keys, more, next_start) = read_range( let (partition_keys, more, next_start) = read_range(
&garage.k2v.counter_table.table, &garage.k2v.counter_table.table,
@ -35,10 +35,7 @@ pub async fn handle_read_index(
&start, &start,
&end, &end,
limit, limit,
Some(( Some((DeletedFilter::NotDeleted, node_id_vec)),
DeletedFilter::NotDeleted,
layout.current().node_id_vec.clone(),
)),
EnumerationOrder::from_reverse(reverse), EnumerationOrder::from_reverse(reverse),
) )
.await?; .await?;
@ -57,7 +54,7 @@ pub async fn handle_read_index(
partition_keys: partition_keys partition_keys: partition_keys
.into_iter() .into_iter()
.map(|part| { .map(|part| {
let vals = part.filtered_values(&layout); let vals = part.filtered_values(&garage.system.cluster_layout());
ReadIndexResponseEntry { ReadIndexResponseEntry {
pk: part.sk, pk: part.sk,
entries: *vals.get(&s_entries).unwrap_or(&0), entries: *vals.get(&s_entries).unwrap_or(&0),

View file

@ -450,10 +450,8 @@ impl<'a> BucketHelper<'a> {
#[cfg(feature = "k2v")] #[cfg(feature = "k2v")]
{ {
use garage_rpc::layout::LayoutHistory; // TODO: not only current
use std::sync::Arc; let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec();
let layout: Arc<LayoutHistory> = self.0.system.cluster_layout().clone();
let k2vindexes = self let k2vindexes = self
.0 .0
.k2v .k2v
@ -462,10 +460,7 @@ impl<'a> BucketHelper<'a> {
.get_range( .get_range(
&bucket_id, &bucket_id,
None, None,
Some(( Some((DeletedFilter::NotDeleted, node_id_vec)),
DeletedFilter::NotDeleted,
layout.current().node_id_vec.clone(),
)),
10, 10,
EnumerationOrder::Forward, EnumerationOrder::Forward,
) )

View file

@ -1,8 +1,9 @@
use std::sync::Arc; use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::Duration; use std::time::Duration;
use tokio::sync::watch; use serde::{Deserialize, Serialize};
use tokio::sync::Mutex;
use tokio::sync::Notify;
use netapp::endpoint::Endpoint; use netapp::endpoint::Endpoint;
use netapp::peering::fullmesh::FullMeshPeeringStrategy; use netapp::peering::fullmesh::FullMeshPeeringStrategy;
@ -21,13 +22,23 @@ pub struct LayoutManager {
replication_factor: usize, replication_factor: usize,
persist_cluster_layout: Persister<LayoutHistory>, persist_cluster_layout: Persister<LayoutHistory>,
pub layout_watch: watch::Receiver<Arc<LayoutHistory>>, layout: Arc<RwLock<LayoutHistory>>,
update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>, pub(crate) change_notify: Arc<Notify>,
pub(crate) rpc_helper: RpcHelper, pub(crate) rpc_helper: RpcHelper,
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
} }
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
pub struct LayoutStatus {
/// Cluster layout version
pub cluster_layout_version: u64,
/// Hash of cluster layout update trackers
// (TODO) pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
}
impl LayoutManager { impl LayoutManager {
pub fn new( pub fn new(
config: &Config, config: &Config,
@ -35,7 +46,7 @@ impl LayoutManager {
system_endpoint: Arc<Endpoint<SystemRpc, System>>, system_endpoint: Arc<Endpoint<SystemRpc, System>>,
fullmesh: Arc<FullMeshPeeringStrategy>, fullmesh: Arc<FullMeshPeeringStrategy>,
replication_factor: usize, replication_factor: usize,
) -> Result<Self, Error> { ) -> Result<Arc<Self>, Error> {
let persist_cluster_layout: Persister<LayoutHistory> = let persist_cluster_layout: Persister<LayoutHistory> =
Persister::new(&config.metadata_dir, "cluster_layout"); Persister::new(&config.metadata_dir, "cluster_layout");
@ -59,37 +70,49 @@ impl LayoutManager {
} }
}; };
let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout)); let layout = Arc::new(RwLock::new(cluster_layout));
let change_notify = Arc::new(Notify::new());
let rpc_helper = RpcHelper::new( let rpc_helper = RpcHelper::new(
node_id.into(), node_id.into(),
fullmesh, fullmesh,
layout_watch.clone(), layout.clone(),
config.rpc_timeout_msec.map(Duration::from_millis), config.rpc_timeout_msec.map(Duration::from_millis),
); );
Ok(Self { Ok(Arc::new(Self {
replication_factor, replication_factor,
persist_cluster_layout, persist_cluster_layout,
layout_watch, layout,
update_layout: Mutex::new(update_layout), change_notify,
system_endpoint, system_endpoint,
rpc_helper, rpc_helper,
}) }))
} }
// ---- PUBLIC INTERFACE ---- // ---- PUBLIC INTERFACE ----
pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> { pub fn status(&self) -> LayoutStatus {
let layout = self.layout();
LayoutStatus {
cluster_layout_version: layout.current().version,
cluster_layout_staging_hash: layout.staging_hash,
}
}
pub async fn update_cluster_layout(
self: &Arc<Self>,
layout: &LayoutHistory,
) -> Result<(), Error> {
self.handle_advertise_cluster_layout(layout).await?; self.handle_advertise_cluster_layout(layout).await?;
Ok(()) Ok(())
} }
pub fn history(&self) -> watch::Ref<Arc<LayoutHistory>> { pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
self.layout_watch.borrow() self.layout.read().unwrap()
} }
pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) { pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
let resp = self let resp = self
.rpc_helper .rpc_helper
.call( .call(
@ -108,7 +131,7 @@ impl LayoutManager {
/// Save network configuration to disc /// Save network configuration to disc
async fn save_cluster_layout(&self) -> Result<(), Error> { async fn save_cluster_layout(&self) -> Result<(), Error> {
let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone(); let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
self.persist_cluster_layout self.persist_cluster_layout
.save_async(&layout) .save_async(&layout)
.await .await
@ -116,15 +139,43 @@ impl LayoutManager {
Ok(()) Ok(())
} }
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
let mut layout = self.layout.write().unwrap();
let prev_layout_check = layout.check().is_ok();
if !prev_layout_check || adv.check().is_ok() {
if layout.merge(adv) {
if prev_layout_check && layout.check().is_err() {
panic!("Merged two correct layouts and got an incorrect layout.");
}
return Some(layout.clone());
}
}
None
}
// ---- RPC HANDLERS ---- // ---- RPC HANDLERS ----
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
let local_status = self.status();
if status.cluster_layout_version > local_status.cluster_layout_version
|| status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash
{
tokio::spawn({
let this = self.clone();
async move { this.pull_cluster_layout(from).await }
});
}
}
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc { pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
let layout = self.layout_watch.borrow().as_ref().clone(); let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
SystemRpc::AdvertiseClusterLayout(layout) SystemRpc::AdvertiseClusterLayout(layout)
} }
pub(crate) async fn handle_advertise_cluster_layout( pub(crate) async fn handle_advertise_cluster_layout(
&self, self: &Arc<Self>,
adv: &LayoutHistory, adv: &LayoutHistory,
) -> Result<SystemRpc, Error> { ) -> Result<SystemRpc, Error> {
if adv.current().replication_factor != self.replication_factor { if adv.current().replication_factor != self.replication_factor {
@ -137,37 +188,25 @@ impl LayoutManager {
return Err(Error::Message(msg)); return Err(Error::Message(msg));
} }
let update_layout = self.update_layout.lock().await; if let Some(new_layout) = self.merge_layout(adv) {
// TODO: don't clone each time an AdvertiseClusterLayout is received self.change_notify.notify_waiters();
let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
let prev_layout_check = layout.check().is_ok(); tokio::spawn({
if layout.merge(adv) { let this = self.clone();
if prev_layout_check && layout.check().is_err() { async move {
error!("New cluster layout is invalid, discarding."); if let Err(e) = this
return Err(Error::Message( .rpc_helper
"New cluster layout is invalid, discarding.".into(), .broadcast(
)); &this.system_endpoint,
} SystemRpc::AdvertiseClusterLayout(new_layout),
RequestStrategy::with_priority(PRIO_HIGH),
update_layout.send(Arc::new(layout.clone()))?; )
drop(update_layout); .await
{
/* TODO warn!("Error while broadcasting new cluster layout: {}", e);
tokio::spawn(async move { }
if let Err(e) = system
.rpc_helper()
.broadcast(
&system.system_endpoint,
SystemRpc::AdvertiseClusterLayout(layout),
RequestStrategy::with_priority(PRIO_HIGH),
)
.await
{
warn!("Error while broadcasting new cluster layout: {}", e);
} }
}); });
*/
self.save_cluster_layout().await?; self.save_cluster_layout().await?;
} }

View file

@ -226,7 +226,7 @@ mod v010 {
} }
/// The history of cluster layouts /// The history of cluster layouts
#[derive(Clone, Debug, Serialize, Deserialize)] #[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
pub struct LayoutHistory { pub struct LayoutHistory {
/// The versions currently in use in the cluster /// The versions currently in use in the cluster
pub versions: Vec<LayoutVersion>, pub versions: Vec<LayoutVersion>,
@ -241,7 +241,7 @@ mod v010 {
} }
/// The tracker of acknowledgments and data syncs around the cluster /// The tracker of acknowledgments and data syncs around the cluster
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct UpdateTrackers { pub struct UpdateTrackers {
/// The highest layout version number each node has ack'ed /// The highest layout version number each node has ack'ed
pub ack_map: UpdateTracker, pub ack_map: UpdateTracker,
@ -253,7 +253,7 @@ mod v010 {
} }
/// Per-node update tracker (map from node UUID to a layout version number) /// Per-node update tracker (map from node UUID to a layout version number)
#[derive(Clone, Debug, Serialize, Deserialize, Default)] #[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
pub struct UpdateTracker(pub HashMap<Uuid, u64>); pub struct UpdateTracker(pub HashMap<Uuid, u64>);
impl garage_util::migrate::Migrate for LayoutHistory { impl garage_util::migrate::Migrate for LayoutHistory {

View file

@ -1,12 +1,11 @@
//! Contain structs related to making RPCs //! Contain structs related to making RPCs
use std::sync::Arc; use std::sync::{Arc, RwLock};
use std::time::Duration; use std::time::Duration;
use futures::future::join_all; use futures::future::join_all;
use futures::stream::futures_unordered::FuturesUnordered; use futures::stream::futures_unordered::FuturesUnordered;
use futures::stream::StreamExt; use futures::stream::StreamExt;
use tokio::select; use tokio::select;
use tokio::sync::watch;
use opentelemetry::KeyValue; use opentelemetry::KeyValue;
use opentelemetry::{ use opentelemetry::{
@ -91,7 +90,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
struct RpcHelperInner { struct RpcHelperInner {
our_node_id: Uuid, our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>, fullmesh: Arc<FullMeshPeeringStrategy>,
layout_watch: watch::Receiver<Arc<LayoutHistory>>, layout: Arc<RwLock<LayoutHistory>>,
metrics: RpcMetrics, metrics: RpcMetrics,
rpc_timeout: Duration, rpc_timeout: Duration,
} }
@ -100,7 +99,7 @@ impl RpcHelper {
pub(crate) fn new( pub(crate) fn new(
our_node_id: Uuid, our_node_id: Uuid,
fullmesh: Arc<FullMeshPeeringStrategy>, fullmesh: Arc<FullMeshPeeringStrategy>,
layout_watch: watch::Receiver<Arc<LayoutHistory>>, layout: Arc<RwLock<LayoutHistory>>,
rpc_timeout: Option<Duration>, rpc_timeout: Option<Duration>,
) -> Self { ) -> Self {
let metrics = RpcMetrics::new(); let metrics = RpcMetrics::new();
@ -108,7 +107,7 @@ impl RpcHelper {
Self(Arc::new(RpcHelperInner { Self(Arc::new(RpcHelperInner {
our_node_id, our_node_id,
fullmesh, fullmesh,
layout_watch, layout,
metrics, metrics,
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT), rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
})) }))
@ -392,7 +391,7 @@ impl RpcHelper {
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> { pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
// Retrieve some status variables that we will use to sort requests // Retrieve some status variables that we will use to sort requests
let peer_list = self.0.fullmesh.get_peer_list(); let peer_list = self.0.fullmesh.get_peer_list();
let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone(); let layout = self.0.layout.read().unwrap();
let our_zone = match layout.current().node_role(&self.0.our_node_id) { let our_zone = match layout.current().node_role(&self.0.our_node_id) {
Some(pc) => &pc.zone, Some(pc) => &pc.zone,
None => "", None => "",

View file

@ -4,7 +4,7 @@ use std::io::{Read, Write};
use std::net::{IpAddr, SocketAddr}; use std::net::{IpAddr, SocketAddr};
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::sync::atomic::Ordering; use std::sync::atomic::Ordering;
use std::sync::{Arc, RwLock}; use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use arc_swap::ArcSwap; use arc_swap::ArcSwap;
@ -13,7 +13,7 @@ use futures::join;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use sodiumoxide::crypto::sign::ed25519; use sodiumoxide::crypto::sign::ed25519;
use tokio::select; use tokio::select;
use tokio::sync::watch; use tokio::sync::{watch, Notify};
use netapp::endpoint::{Endpoint, EndpointHandler}; use netapp::endpoint::{Endpoint, EndpointHandler};
use netapp::message::*; use netapp::message::*;
@ -33,7 +33,7 @@ use garage_util::time::*;
use crate::consul::ConsulDiscovery; use crate::consul::ConsulDiscovery;
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
use crate::kubernetes::*; use crate::kubernetes::*;
use crate::layout::manager::LayoutManager; use crate::layout::manager::{LayoutManager, LayoutStatus};
use crate::layout::*; use crate::layout::*;
use crate::replication_mode::*; use crate::replication_mode::*;
use crate::rpc_helper::*; use crate::rpc_helper::*;
@ -104,7 +104,7 @@ pub struct System {
#[cfg(feature = "kubernetes-discovery")] #[cfg(feature = "kubernetes-discovery")]
kubernetes_discovery: Option<KubernetesDiscoveryConfig>, kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
pub layout_manager: LayoutManager, pub layout_manager: Arc<LayoutManager>,
metrics: SystemMetrics, metrics: SystemMetrics,
@ -125,12 +125,8 @@ pub struct NodeStatus {
/// Replication factor configured on the node /// Replication factor configured on the node
pub replication_factor: usize, pub replication_factor: usize,
/// Cluster layout version /// Layout status
pub cluster_layout_version: u64, pub layout_status: LayoutStatus,
/// Hash of cluster layout update trackers
// (TODO) pub cluster_layout_trackers_hash: Hash,
/// Hash of cluster layout staging data
pub cluster_layout_staging_hash: Hash,
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`) /// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
#[serde(default)] #[serde(default)]
@ -284,7 +280,7 @@ impl System {
// ---- set up metrics and status exchange ---- // ---- set up metrics and status exchange ----
let metrics = SystemMetrics::new(replication_factor); let metrics = SystemMetrics::new(replication_factor);
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history()); let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics); local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
// ---- if enabled, set up additional peer discovery methods ---- // ---- if enabled, set up additional peer discovery methods ----
@ -349,12 +345,12 @@ impl System {
// ---- Public utilities / accessors ---- // ---- Public utilities / accessors ----
pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> { pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
self.layout_manager.history() self.layout_manager.layout()
} }
pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> { pub fn layout_notify(&self) -> Arc<Notify> {
self.layout_manager.layout_watch.clone() self.layout_manager.change_notify.clone()
} }
pub fn rpc_helper(&self) -> &RpcHelper { pub fn rpc_helper(&self) -> &RpcHelper {
@ -416,7 +412,6 @@ impl System {
} }
pub fn health(&self) -> ClusterHealth { pub fn health(&self) -> ClusterHealth {
let layout: Arc<_> = self.cluster_layout().clone();
let quorum = self.replication_mode.write_quorum(); let quorum = self.replication_mode.write_quorum();
let replication_factor = self.replication_factor; let replication_factor = self.replication_factor;
@ -427,6 +422,8 @@ impl System {
.collect::<HashMap<Uuid, _>>(); .collect::<HashMap<Uuid, _>>();
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count(); let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
let layout = self.cluster_layout(); // acquires a rwlock
// TODO: not only layout.current() // TODO: not only layout.current()
let storage_nodes = layout let storage_nodes = layout
.current() .current()
@ -536,9 +533,7 @@ impl System {
fn update_local_status(&self) { fn update_local_status(&self) {
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone(); let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
let layout = self.cluster_layout(); new_si.layout_status = self.layout_manager.status();
new_si.cluster_layout_version = layout.current().version;
new_si.cluster_layout_staging_hash = layout.staging_hash;
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics); new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
@ -571,14 +566,8 @@ impl System {
std::process::exit(1); std::process::exit(1);
} }
if info.cluster_layout_version > local_info.cluster_layout_version self.layout_manager
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash .handle_advertise_status(from, &info.layout_status);
{
tokio::spawn({
let system = self.clone();
async move { system.layout_manager.pull_cluster_layout(from).await }
});
}
self.node_status self.node_status
.write() .write()
@ -746,14 +735,13 @@ impl EndpointHandler<SystemRpc> for System {
} }
impl NodeStatus { impl NodeStatus {
fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self { fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
NodeStatus { NodeStatus {
hostname: gethostname::gethostname() hostname: gethostname::gethostname()
.into_string() .into_string()
.unwrap_or_else(|_| "<invalid utf-8>".to_string()), .unwrap_or_else(|_| "<invalid utf-8>".to_string()),
replication_factor, replication_factor,
cluster_layout_version: layout.current().version, layout_status: layout_manager.status(),
cluster_layout_staging_hash: layout.staging_hash,
meta_disk_avail: None, meta_disk_avail: None,
data_disk_avail: None, data_disk_avail: None,
} }
@ -763,8 +751,7 @@ impl NodeStatus {
NodeStatus { NodeStatus {
hostname: "?".to_string(), hostname: "?".to_string(),
replication_factor: 0, replication_factor: 0,
cluster_layout_version: 0, layout_status: Default::default(),
cluster_layout_staging_hash: Hash::from([0u8; 32]),
meta_disk_avail: None, meta_disk_avail: None,
data_disk_avail: None, data_disk_avail: None,
} }

View file

@ -10,7 +10,7 @@ use rand::Rng;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use serde_bytes::ByteBuf; use serde_bytes::ByteBuf;
use tokio::select; use tokio::select;
use tokio::sync::{mpsc, watch}; use tokio::sync::{mpsc, watch, Notify};
use garage_util::background::*; use garage_util::background::*;
use garage_util::data::*; use garage_util::data::*;
@ -91,8 +91,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
bg.spawn_worker(SyncWorker { bg.spawn_worker(SyncWorker {
syncer: self.clone(), syncer: self.clone(),
layout_watch: self.system.layout_watch(), layout_notify: self.system.layout_notify(),
layout: self.system.cluster_layout().clone(), layout_version: self.system.cluster_layout().current().version,
add_full_sync_rx, add_full_sync_rx,
todo: vec![], todo: vec![],
next_full_sync: Instant::now() + Duration::from_secs(20), next_full_sync: Instant::now() + Duration::from_secs(20),
@ -492,8 +492,8 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
struct SyncWorker<F: TableSchema, R: TableReplication> { struct SyncWorker<F: TableSchema, R: TableReplication> {
syncer: Arc<TableSyncer<F, R>>, syncer: Arc<TableSyncer<F, R>>,
layout_watch: watch::Receiver<Arc<LayoutHistory>>, layout_notify: Arc<Notify>,
layout: Arc<LayoutHistory>, layout_version: u64,
add_full_sync_rx: mpsc::UnboundedReceiver<()>, add_full_sync_rx: mpsc::UnboundedReceiver<()>,
todo: Vec<TodoPartition>, todo: Vec<TodoPartition>,
next_full_sync: Instant, next_full_sync: Instant,
@ -593,12 +593,11 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
self.add_full_sync(); self.add_full_sync();
} }
}, },
_ = self.layout_watch.changed() => { _ = self.layout_notify.notified() => {
let new_layout = self.layout_watch.borrow(); let new_version = self.syncer.system.cluster_layout().current().version;
if !Arc::ptr_eq(&new_layout, &self.layout) { if new_version > self.layout_version {
self.layout = new_layout.clone(); self.layout_version = new_version;
drop(new_layout); debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME);
debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
self.add_full_sync(); self.add_full_sync();
} }
}, },