Compare commits
No commits in common. "bfb1845fdc981a370539d641a5d80f438f184f07" and "8a2b1dd422fb57abe611d8c1cf3cb0b55f487189" have entirely different histories.
bfb1845fdc
...
8a2b1dd422
8 changed files with 117 additions and 133 deletions
|
@ -210,7 +210,7 @@ pub async fn handle_update_cluster_layout(
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
|
let updates = parse_json_body::<UpdateClusterLayoutRequest>(req).await?;
|
||||||
|
|
||||||
let mut layout = garage.system.cluster_layout().clone();
|
let mut layout = garage.system.cluster_layout().as_ref().clone();
|
||||||
|
|
||||||
let mut roles = layout.current().roles.clone();
|
let mut roles = layout.current().roles.clone();
|
||||||
roles.merge(&layout.staging.get().roles);
|
roles.merge(&layout.staging.get().roles);
|
||||||
|
@ -256,7 +256,7 @@ pub async fn handle_apply_cluster_layout(
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let param = parse_json_body::<ApplyLayoutRequest>(req).await?;
|
let param = parse_json_body::<ApplyLayoutRequest>(req).await?;
|
||||||
|
|
||||||
let layout = garage.system.cluster_layout().clone();
|
let layout = garage.system.cluster_layout().as_ref().clone();
|
||||||
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
let (layout, msg) = layout.apply_staged_changes(Some(param.version))?;
|
||||||
|
|
||||||
garage
|
garage
|
||||||
|
@ -273,7 +273,7 @@ pub async fn handle_apply_cluster_layout(
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
|
pub async fn handle_revert_cluster_layout(garage: &Arc<Garage>) -> Result<Response<Body>, Error> {
|
||||||
let layout = garage.system.cluster_layout().clone();
|
let layout = garage.system.cluster_layout().as_ref().clone();
|
||||||
let layout = layout.revert_staged_changes()?;
|
let layout = layout.revert_staged_changes()?;
|
||||||
garage
|
garage
|
||||||
.system
|
.system
|
||||||
|
|
|
@ -5,6 +5,7 @@ use serde::Serialize;
|
||||||
|
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
|
||||||
|
use garage_rpc::layout::LayoutHistory;
|
||||||
use garage_table::util::*;
|
use garage_table::util::*;
|
||||||
|
|
||||||
use garage_model::garage::Garage;
|
use garage_model::garage::Garage;
|
||||||
|
@ -25,8 +26,7 @@ pub async fn handle_read_index(
|
||||||
) -> Result<Response<Body>, Error> {
|
) -> Result<Response<Body>, Error> {
|
||||||
let reverse = reverse.unwrap_or(false);
|
let reverse = reverse.unwrap_or(false);
|
||||||
|
|
||||||
// TODO: not only current
|
let layout: Arc<LayoutHistory> = garage.system.cluster_layout().clone();
|
||||||
let node_id_vec = garage.system.cluster_layout().current().node_ids().to_vec();
|
|
||||||
|
|
||||||
let (partition_keys, more, next_start) = read_range(
|
let (partition_keys, more, next_start) = read_range(
|
||||||
&garage.k2v.counter_table.table,
|
&garage.k2v.counter_table.table,
|
||||||
|
@ -35,7 +35,10 @@ pub async fn handle_read_index(
|
||||||
&start,
|
&start,
|
||||||
&end,
|
&end,
|
||||||
limit,
|
limit,
|
||||||
Some((DeletedFilter::NotDeleted, node_id_vec)),
|
Some((
|
||||||
|
DeletedFilter::NotDeleted,
|
||||||
|
layout.current().node_id_vec.clone(),
|
||||||
|
)),
|
||||||
EnumerationOrder::from_reverse(reverse),
|
EnumerationOrder::from_reverse(reverse),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -54,7 +57,7 @@ pub async fn handle_read_index(
|
||||||
partition_keys: partition_keys
|
partition_keys: partition_keys
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.map(|part| {
|
.map(|part| {
|
||||||
let vals = part.filtered_values(&garage.system.cluster_layout());
|
let vals = part.filtered_values(&layout);
|
||||||
ReadIndexResponseEntry {
|
ReadIndexResponseEntry {
|
||||||
pk: part.sk,
|
pk: part.sk,
|
||||||
entries: *vals.get(&s_entries).unwrap_or(&0),
|
entries: *vals.get(&s_entries).unwrap_or(&0),
|
||||||
|
|
|
@ -450,8 +450,10 @@ impl<'a> BucketHelper<'a> {
|
||||||
|
|
||||||
#[cfg(feature = "k2v")]
|
#[cfg(feature = "k2v")]
|
||||||
{
|
{
|
||||||
// TODO: not only current
|
use garage_rpc::layout::LayoutHistory;
|
||||||
let node_id_vec = self.0.system.cluster_layout().current().node_ids().to_vec();
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
let layout: Arc<LayoutHistory> = self.0.system.cluster_layout().clone();
|
||||||
let k2vindexes = self
|
let k2vindexes = self
|
||||||
.0
|
.0
|
||||||
.k2v
|
.k2v
|
||||||
|
@ -460,7 +462,10 @@ impl<'a> BucketHelper<'a> {
|
||||||
.get_range(
|
.get_range(
|
||||||
&bucket_id,
|
&bucket_id,
|
||||||
None,
|
None,
|
||||||
Some((DeletedFilter::NotDeleted, node_id_vec)),
|
Some((
|
||||||
|
DeletedFilter::NotDeleted,
|
||||||
|
layout.current().node_id_vec.clone(),
|
||||||
|
)),
|
||||||
10,
|
10,
|
||||||
EnumerationOrder::Forward,
|
EnumerationOrder::Forward,
|
||||||
)
|
)
|
||||||
|
|
|
@ -1,9 +1,8 @@
|
||||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use tokio::sync::watch;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
use tokio::sync::Notify;
|
|
||||||
|
|
||||||
use netapp::endpoint::Endpoint;
|
use netapp::endpoint::Endpoint;
|
||||||
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
|
use netapp::peering::fullmesh::FullMeshPeeringStrategy;
|
||||||
|
@ -22,23 +21,13 @@ pub struct LayoutManager {
|
||||||
replication_factor: usize,
|
replication_factor: usize,
|
||||||
persist_cluster_layout: Persister<LayoutHistory>,
|
persist_cluster_layout: Persister<LayoutHistory>,
|
||||||
|
|
||||||
layout: Arc<RwLock<LayoutHistory>>,
|
pub layout_watch: watch::Receiver<Arc<LayoutHistory>>,
|
||||||
pub(crate) change_notify: Arc<Notify>,
|
update_layout: Mutex<watch::Sender<Arc<LayoutHistory>>>,
|
||||||
|
|
||||||
pub(crate) rpc_helper: RpcHelper,
|
pub(crate) rpc_helper: RpcHelper,
|
||||||
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
|
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, Default)]
|
|
||||||
pub struct LayoutStatus {
|
|
||||||
/// Cluster layout version
|
|
||||||
pub cluster_layout_version: u64,
|
|
||||||
/// Hash of cluster layout update trackers
|
|
||||||
// (TODO) pub cluster_layout_trackers_hash: Hash,
|
|
||||||
/// Hash of cluster layout staging data
|
|
||||||
pub cluster_layout_staging_hash: Hash,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LayoutManager {
|
impl LayoutManager {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
config: &Config,
|
config: &Config,
|
||||||
|
@ -46,7 +35,7 @@ impl LayoutManager {
|
||||||
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
|
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
|
||||||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
fullmesh: Arc<FullMeshPeeringStrategy>,
|
||||||
replication_factor: usize,
|
replication_factor: usize,
|
||||||
) -> Result<Arc<Self>, Error> {
|
) -> Result<Self, Error> {
|
||||||
let persist_cluster_layout: Persister<LayoutHistory> =
|
let persist_cluster_layout: Persister<LayoutHistory> =
|
||||||
Persister::new(&config.metadata_dir, "cluster_layout");
|
Persister::new(&config.metadata_dir, "cluster_layout");
|
||||||
|
|
||||||
|
@ -70,49 +59,37 @@ impl LayoutManager {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let layout = Arc::new(RwLock::new(cluster_layout));
|
let (update_layout, layout_watch) = watch::channel(Arc::new(cluster_layout));
|
||||||
let change_notify = Arc::new(Notify::new());
|
|
||||||
|
|
||||||
let rpc_helper = RpcHelper::new(
|
let rpc_helper = RpcHelper::new(
|
||||||
node_id.into(),
|
node_id.into(),
|
||||||
fullmesh,
|
fullmesh,
|
||||||
layout.clone(),
|
layout_watch.clone(),
|
||||||
config.rpc_timeout_msec.map(Duration::from_millis),
|
config.rpc_timeout_msec.map(Duration::from_millis),
|
||||||
);
|
);
|
||||||
|
|
||||||
Ok(Arc::new(Self {
|
Ok(Self {
|
||||||
replication_factor,
|
replication_factor,
|
||||||
persist_cluster_layout,
|
persist_cluster_layout,
|
||||||
layout,
|
layout_watch,
|
||||||
change_notify,
|
update_layout: Mutex::new(update_layout),
|
||||||
system_endpoint,
|
system_endpoint,
|
||||||
rpc_helper,
|
rpc_helper,
|
||||||
}))
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- PUBLIC INTERFACE ----
|
// ---- PUBLIC INTERFACE ----
|
||||||
|
|
||||||
pub fn status(&self) -> LayoutStatus {
|
pub async fn update_cluster_layout(&self, layout: &LayoutHistory) -> Result<(), Error> {
|
||||||
let layout = self.layout();
|
|
||||||
LayoutStatus {
|
|
||||||
cluster_layout_version: layout.current().version,
|
|
||||||
cluster_layout_staging_hash: layout.staging_hash,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn update_cluster_layout(
|
|
||||||
self: &Arc<Self>,
|
|
||||||
layout: &LayoutHistory,
|
|
||||||
) -> Result<(), Error> {
|
|
||||||
self.handle_advertise_cluster_layout(layout).await?;
|
self.handle_advertise_cluster_layout(layout).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
|
pub fn history(&self) -> watch::Ref<Arc<LayoutHistory>> {
|
||||||
self.layout.read().unwrap()
|
self.layout_watch.borrow()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn pull_cluster_layout(self: &Arc<Self>, peer: Uuid) {
|
pub(crate) async fn pull_cluster_layout(&self, peer: Uuid) {
|
||||||
let resp = self
|
let resp = self
|
||||||
.rpc_helper
|
.rpc_helper
|
||||||
.call(
|
.call(
|
||||||
|
@ -131,7 +108,7 @@ impl LayoutManager {
|
||||||
|
|
||||||
/// Save network configuration to disk
|
/// Save network configuration to disk
|
||||||
async fn save_cluster_layout(&self) -> Result<(), Error> {
|
async fn save_cluster_layout(&self) -> Result<(), Error> {
|
||||||
let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
|
let layout: Arc<LayoutHistory> = self.layout_watch.borrow().clone();
|
||||||
self.persist_cluster_layout
|
self.persist_cluster_layout
|
||||||
.save_async(&layout)
|
.save_async(&layout)
|
||||||
.await
|
.await
|
||||||
|
@ -139,43 +116,15 @@ impl LayoutManager {
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
|
|
||||||
let mut layout = self.layout.write().unwrap();
|
|
||||||
let prev_layout_check = layout.check().is_ok();
|
|
||||||
|
|
||||||
if !prev_layout_check || adv.check().is_ok() {
|
|
||||||
if layout.merge(adv) {
|
|
||||||
if prev_layout_check && layout.check().is_err() {
|
|
||||||
panic!("Merged two correct layouts and got an incorrect layout.");
|
|
||||||
}
|
|
||||||
|
|
||||||
return Some(layout.clone());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
None
|
|
||||||
}
|
|
||||||
|
|
||||||
// ---- RPC HANDLERS ----
|
// ---- RPC HANDLERS ----
|
||||||
|
|
||||||
pub(crate) fn handle_advertise_status(self: &Arc<Self>, from: Uuid, status: &LayoutStatus) {
|
|
||||||
let local_status = self.status();
|
|
||||||
if status.cluster_layout_version > local_status.cluster_layout_version
|
|
||||||
|| status.cluster_layout_staging_hash != local_status.cluster_layout_staging_hash
|
|
||||||
{
|
|
||||||
tokio::spawn({
|
|
||||||
let this = self.clone();
|
|
||||||
async move { this.pull_cluster_layout(from).await }
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
|
pub(crate) fn handle_pull_cluster_layout(&self) -> SystemRpc {
|
||||||
let layout = self.layout.read().unwrap().clone(); // TODO: avoid cloning
|
let layout = self.layout_watch.borrow().as_ref().clone();
|
||||||
SystemRpc::AdvertiseClusterLayout(layout)
|
SystemRpc::AdvertiseClusterLayout(layout)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) async fn handle_advertise_cluster_layout(
|
pub(crate) async fn handle_advertise_cluster_layout(
|
||||||
self: &Arc<Self>,
|
&self,
|
||||||
adv: &LayoutHistory,
|
adv: &LayoutHistory,
|
||||||
) -> Result<SystemRpc, Error> {
|
) -> Result<SystemRpc, Error> {
|
||||||
if adv.current().replication_factor != self.replication_factor {
|
if adv.current().replication_factor != self.replication_factor {
|
||||||
|
@ -188,25 +137,37 @@ impl LayoutManager {
|
||||||
return Err(Error::Message(msg));
|
return Err(Error::Message(msg));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(new_layout) = self.merge_layout(adv) {
|
let update_layout = self.update_layout.lock().await;
|
||||||
self.change_notify.notify_waiters();
|
// TODO: don't clone each time an AdvertiseClusterLayout is received
|
||||||
|
let mut layout: LayoutHistory = self.layout_watch.borrow().as_ref().clone();
|
||||||
|
|
||||||
tokio::spawn({
|
let prev_layout_check = layout.check().is_ok();
|
||||||
let this = self.clone();
|
if layout.merge(adv) {
|
||||||
async move {
|
if prev_layout_check && layout.check().is_err() {
|
||||||
if let Err(e) = this
|
error!("New cluster layout is invalid, discarding.");
|
||||||
.rpc_helper
|
return Err(Error::Message(
|
||||||
.broadcast(
|
"New cluster layout is invalid, discarding.".into(),
|
||||||
&this.system_endpoint,
|
));
|
||||||
SystemRpc::AdvertiseClusterLayout(new_layout),
|
}
|
||||||
RequestStrategy::with_priority(PRIO_HIGH),
|
|
||||||
)
|
update_layout.send(Arc::new(layout.clone()))?;
|
||||||
.await
|
drop(update_layout);
|
||||||
{
|
|
||||||
warn!("Error while broadcasting new cluster layout: {}", e);
|
/* TODO
|
||||||
}
|
tokio::spawn(async move {
|
||||||
|
if let Err(e) = system
|
||||||
|
.rpc_helper()
|
||||||
|
.broadcast(
|
||||||
|
&system.system_endpoint,
|
||||||
|
SystemRpc::AdvertiseClusterLayout(layout),
|
||||||
|
RequestStrategy::with_priority(PRIO_HIGH),
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
warn!("Error while broadcasting new cluster layout: {}", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
*/
|
||||||
|
|
||||||
self.save_cluster_layout().await?;
|
self.save_cluster_layout().await?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -226,7 +226,7 @@ mod v010 {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The history of cluster layouts
|
/// The history of cluster layouts
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, PartialEq)]
|
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||||
pub struct LayoutHistory {
|
pub struct LayoutHistory {
|
||||||
/// The versions currently in use in the cluster
|
/// The versions currently in use in the cluster
|
||||||
pub versions: Vec<LayoutVersion>,
|
pub versions: Vec<LayoutVersion>,
|
||||||
|
@ -241,7 +241,7 @@ mod v010 {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// The tracker of acknowledgments and data syncs around the cluster
|
/// The tracker of acknowledgments and data syncs around the cluster
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
|
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||||
pub struct UpdateTrackers {
|
pub struct UpdateTrackers {
|
||||||
/// The highest layout version number each node has ack'ed
|
/// The highest layout version number each node has ack'ed
|
||||||
pub ack_map: UpdateTracker,
|
pub ack_map: UpdateTracker,
|
||||||
|
@ -253,7 +253,7 @@ mod v010 {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// A generic update tracker: maps each node's identifier to a version number
|
/// A generic update tracker: maps each node's identifier to a version number
|
||||||
#[derive(Clone, Debug, Serialize, Deserialize, Default, PartialEq)]
|
#[derive(Clone, Debug, Serialize, Deserialize, Default)]
|
||||||
pub struct UpdateTracker(pub HashMap<Uuid, u64>);
|
pub struct UpdateTracker(pub HashMap<Uuid, u64>);
|
||||||
|
|
||||||
impl garage_util::migrate::Migrate for LayoutHistory {
|
impl garage_util::migrate::Migrate for LayoutHistory {
|
||||||
|
|
|
@ -1,11 +1,12 @@
|
||||||
//! Contain structs related to making RPCs
|
//! Contain structs related to making RPCs
|
||||||
use std::sync::{Arc, RwLock};
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use futures::stream::futures_unordered::FuturesUnordered;
|
use futures::stream::futures_unordered::FuturesUnordered;
|
||||||
use futures::stream::StreamExt;
|
use futures::stream::StreamExt;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use opentelemetry::KeyValue;
|
use opentelemetry::KeyValue;
|
||||||
use opentelemetry::{
|
use opentelemetry::{
|
||||||
|
@ -90,7 +91,7 @@ pub struct RpcHelper(Arc<RpcHelperInner>);
|
||||||
struct RpcHelperInner {
|
struct RpcHelperInner {
|
||||||
our_node_id: Uuid,
|
our_node_id: Uuid,
|
||||||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
fullmesh: Arc<FullMeshPeeringStrategy>,
|
||||||
layout: Arc<RwLock<LayoutHistory>>,
|
layout_watch: watch::Receiver<Arc<LayoutHistory>>,
|
||||||
metrics: RpcMetrics,
|
metrics: RpcMetrics,
|
||||||
rpc_timeout: Duration,
|
rpc_timeout: Duration,
|
||||||
}
|
}
|
||||||
|
@ -99,7 +100,7 @@ impl RpcHelper {
|
||||||
pub(crate) fn new(
|
pub(crate) fn new(
|
||||||
our_node_id: Uuid,
|
our_node_id: Uuid,
|
||||||
fullmesh: Arc<FullMeshPeeringStrategy>,
|
fullmesh: Arc<FullMeshPeeringStrategy>,
|
||||||
layout: Arc<RwLock<LayoutHistory>>,
|
layout_watch: watch::Receiver<Arc<LayoutHistory>>,
|
||||||
rpc_timeout: Option<Duration>,
|
rpc_timeout: Option<Duration>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
let metrics = RpcMetrics::new();
|
let metrics = RpcMetrics::new();
|
||||||
|
@ -107,7 +108,7 @@ impl RpcHelper {
|
||||||
Self(Arc::new(RpcHelperInner {
|
Self(Arc::new(RpcHelperInner {
|
||||||
our_node_id,
|
our_node_id,
|
||||||
fullmesh,
|
fullmesh,
|
||||||
layout,
|
layout_watch,
|
||||||
metrics,
|
metrics,
|
||||||
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
|
rpc_timeout: rpc_timeout.unwrap_or(DEFAULT_TIMEOUT),
|
||||||
}))
|
}))
|
||||||
|
@ -391,7 +392,7 @@ impl RpcHelper {
|
||||||
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
|
pub fn request_order(&self, nodes: &[Uuid]) -> Vec<Uuid> {
|
||||||
// Retrieve some status variables that we will use to sort requests
|
// Retrieve some status variables that we will use to sort requests
|
||||||
let peer_list = self.0.fullmesh.get_peer_list();
|
let peer_list = self.0.fullmesh.get_peer_list();
|
||||||
let layout = self.0.layout.read().unwrap();
|
let layout: Arc<LayoutHistory> = self.0.layout_watch.borrow().clone();
|
||||||
let our_zone = match layout.current().node_role(&self.0.our_node_id) {
|
let our_zone = match layout.current().node_role(&self.0.our_node_id) {
|
||||||
Some(pc) => &pc.zone,
|
Some(pc) => &pc.zone,
|
||||||
None => "",
|
None => "",
|
||||||
|
|
|
@ -4,7 +4,7 @@ use std::io::{Read, Write};
|
||||||
use std::net::{IpAddr, SocketAddr};
|
use std::net::{IpAddr, SocketAddr};
|
||||||
use std::path::{Path, PathBuf};
|
use std::path::{Path, PathBuf};
|
||||||
use std::sync::atomic::Ordering;
|
use std::sync::atomic::Ordering;
|
||||||
use std::sync::{Arc, RwLock, RwLockReadGuard};
|
use std::sync::{Arc, RwLock};
|
||||||
use std::time::{Duration, Instant};
|
use std::time::{Duration, Instant};
|
||||||
|
|
||||||
use arc_swap::ArcSwap;
|
use arc_swap::ArcSwap;
|
||||||
|
@ -13,7 +13,7 @@ use futures::join;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use sodiumoxide::crypto::sign::ed25519;
|
use sodiumoxide::crypto::sign::ed25519;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{watch, Notify};
|
use tokio::sync::watch;
|
||||||
|
|
||||||
use netapp::endpoint::{Endpoint, EndpointHandler};
|
use netapp::endpoint::{Endpoint, EndpointHandler};
|
||||||
use netapp::message::*;
|
use netapp::message::*;
|
||||||
|
@ -33,7 +33,7 @@ use garage_util::time::*;
|
||||||
use crate::consul::ConsulDiscovery;
|
use crate::consul::ConsulDiscovery;
|
||||||
#[cfg(feature = "kubernetes-discovery")]
|
#[cfg(feature = "kubernetes-discovery")]
|
||||||
use crate::kubernetes::*;
|
use crate::kubernetes::*;
|
||||||
use crate::layout::manager::{LayoutManager, LayoutStatus};
|
use crate::layout::manager::LayoutManager;
|
||||||
use crate::layout::*;
|
use crate::layout::*;
|
||||||
use crate::replication_mode::*;
|
use crate::replication_mode::*;
|
||||||
use crate::rpc_helper::*;
|
use crate::rpc_helper::*;
|
||||||
|
@ -104,7 +104,7 @@ pub struct System {
|
||||||
#[cfg(feature = "kubernetes-discovery")]
|
#[cfg(feature = "kubernetes-discovery")]
|
||||||
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
|
kubernetes_discovery: Option<KubernetesDiscoveryConfig>,
|
||||||
|
|
||||||
pub layout_manager: Arc<LayoutManager>,
|
pub layout_manager: LayoutManager,
|
||||||
|
|
||||||
metrics: SystemMetrics,
|
metrics: SystemMetrics,
|
||||||
|
|
||||||
|
@ -125,8 +125,12 @@ pub struct NodeStatus {
|
||||||
/// Replication factor configured on the node
|
/// Replication factor configured on the node
|
||||||
pub replication_factor: usize,
|
pub replication_factor: usize,
|
||||||
|
|
||||||
/// Layout status
|
/// Cluster layout version
|
||||||
pub layout_status: LayoutStatus,
|
pub cluster_layout_version: u64,
|
||||||
|
/// Hash of cluster layout update trackers
|
||||||
|
// (TODO) pub cluster_layout_trackers_hash: Hash,
|
||||||
|
/// Hash of cluster layout staging data
|
||||||
|
pub cluster_layout_staging_hash: Hash,
|
||||||
|
|
||||||
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
|
/// Disk usage on partition containing metadata directory (tuple: `(avail, total)`)
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
|
@ -280,7 +284,7 @@ impl System {
|
||||||
// ---- set up metrics and status exchange ----
|
// ---- set up metrics and status exchange ----
|
||||||
let metrics = SystemMetrics::new(replication_factor);
|
let metrics = SystemMetrics::new(replication_factor);
|
||||||
|
|
||||||
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager);
|
let mut local_status = NodeStatus::initial(replication_factor, &layout_manager.history());
|
||||||
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
|
local_status.update_disk_usage(&config.metadata_dir, &config.data_dir, &metrics);
|
||||||
|
|
||||||
// ---- if enabled, set up additional peer discovery methods ----
|
// ---- if enabled, set up additional peer discovery methods ----
|
||||||
|
@ -345,12 +349,12 @@ impl System {
|
||||||
|
|
||||||
// ---- Public utilities / accessors ----
|
// ---- Public utilities / accessors ----
|
||||||
|
|
||||||
pub fn cluster_layout(&self) -> RwLockReadGuard<'_, LayoutHistory> {
|
pub fn cluster_layout(&self) -> watch::Ref<Arc<LayoutHistory>> {
|
||||||
self.layout_manager.layout()
|
self.layout_manager.history()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn layout_notify(&self) -> Arc<Notify> {
|
pub fn layout_watch(&self) -> watch::Receiver<Arc<LayoutHistory>> {
|
||||||
self.layout_manager.change_notify.clone()
|
self.layout_manager.layout_watch.clone()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn rpc_helper(&self) -> &RpcHelper {
|
pub fn rpc_helper(&self) -> &RpcHelper {
|
||||||
|
@ -412,6 +416,7 @@ impl System {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn health(&self) -> ClusterHealth {
|
pub fn health(&self) -> ClusterHealth {
|
||||||
|
let layout: Arc<_> = self.cluster_layout().clone();
|
||||||
let quorum = self.replication_mode.write_quorum();
|
let quorum = self.replication_mode.write_quorum();
|
||||||
let replication_factor = self.replication_factor;
|
let replication_factor = self.replication_factor;
|
||||||
|
|
||||||
|
@ -422,8 +427,6 @@ impl System {
|
||||||
.collect::<HashMap<Uuid, _>>();
|
.collect::<HashMap<Uuid, _>>();
|
||||||
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
|
let connected_nodes = nodes.iter().filter(|(_, n)| n.is_up).count();
|
||||||
|
|
||||||
let layout = self.cluster_layout(); // acquires a rwlock
|
|
||||||
|
|
||||||
// TODO: not only layout.current()
|
// TODO: not only layout.current()
|
||||||
let storage_nodes = layout
|
let storage_nodes = layout
|
||||||
.current()
|
.current()
|
||||||
|
@ -533,7 +536,9 @@ impl System {
|
||||||
fn update_local_status(&self) {
|
fn update_local_status(&self) {
|
||||||
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
|
let mut new_si: NodeStatus = self.local_status.load().as_ref().clone();
|
||||||
|
|
||||||
new_si.layout_status = self.layout_manager.status();
|
let layout = self.cluster_layout();
|
||||||
|
new_si.cluster_layout_version = layout.current().version;
|
||||||
|
new_si.cluster_layout_staging_hash = layout.staging_hash;
|
||||||
|
|
||||||
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
|
new_si.update_disk_usage(&self.metadata_dir, &self.data_dir, &self.metrics);
|
||||||
|
|
||||||
|
@ -566,8 +571,14 @@ impl System {
|
||||||
std::process::exit(1);
|
std::process::exit(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
self.layout_manager
|
if info.cluster_layout_version > local_info.cluster_layout_version
|
||||||
.handle_advertise_status(from, &info.layout_status);
|
|| info.cluster_layout_staging_hash != local_info.cluster_layout_staging_hash
|
||||||
|
{
|
||||||
|
tokio::spawn({
|
||||||
|
let system = self.clone();
|
||||||
|
async move { system.layout_manager.pull_cluster_layout(from).await }
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
self.node_status
|
self.node_status
|
||||||
.write()
|
.write()
|
||||||
|
@ -735,13 +746,14 @@ impl EndpointHandler<SystemRpc> for System {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NodeStatus {
|
impl NodeStatus {
|
||||||
fn initial(replication_factor: usize, layout_manager: &LayoutManager) -> Self {
|
fn initial(replication_factor: usize, layout: &LayoutHistory) -> Self {
|
||||||
NodeStatus {
|
NodeStatus {
|
||||||
hostname: gethostname::gethostname()
|
hostname: gethostname::gethostname()
|
||||||
.into_string()
|
.into_string()
|
||||||
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
|
.unwrap_or_else(|_| "<invalid utf-8>".to_string()),
|
||||||
replication_factor,
|
replication_factor,
|
||||||
layout_status: layout_manager.status(),
|
cluster_layout_version: layout.current().version,
|
||||||
|
cluster_layout_staging_hash: layout.staging_hash,
|
||||||
meta_disk_avail: None,
|
meta_disk_avail: None,
|
||||||
data_disk_avail: None,
|
data_disk_avail: None,
|
||||||
}
|
}
|
||||||
|
@ -751,7 +763,8 @@ impl NodeStatus {
|
||||||
NodeStatus {
|
NodeStatus {
|
||||||
hostname: "?".to_string(),
|
hostname: "?".to_string(),
|
||||||
replication_factor: 0,
|
replication_factor: 0,
|
||||||
layout_status: Default::default(),
|
cluster_layout_version: 0,
|
||||||
|
cluster_layout_staging_hash: Hash::from([0u8; 32]),
|
||||||
meta_disk_avail: None,
|
meta_disk_avail: None,
|
||||||
data_disk_avail: None,
|
data_disk_avail: None,
|
||||||
}
|
}
|
||||||
|
|
|
@ -10,7 +10,7 @@ use rand::Rng;
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use serde_bytes::ByteBuf;
|
use serde_bytes::ByteBuf;
|
||||||
use tokio::select;
|
use tokio::select;
|
||||||
use tokio::sync::{mpsc, watch, Notify};
|
use tokio::sync::{mpsc, watch};
|
||||||
|
|
||||||
use garage_util::background::*;
|
use garage_util::background::*;
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -91,8 +91,8 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
||||||
|
|
||||||
bg.spawn_worker(SyncWorker {
|
bg.spawn_worker(SyncWorker {
|
||||||
syncer: self.clone(),
|
syncer: self.clone(),
|
||||||
layout_notify: self.system.layout_notify(),
|
layout_watch: self.system.layout_watch(),
|
||||||
layout_version: self.system.cluster_layout().current().version,
|
layout: self.system.cluster_layout().clone(),
|
||||||
add_full_sync_rx,
|
add_full_sync_rx,
|
||||||
todo: vec![],
|
todo: vec![],
|
||||||
next_full_sync: Instant::now() + Duration::from_secs(20),
|
next_full_sync: Instant::now() + Duration::from_secs(20),
|
||||||
|
@ -492,8 +492,8 @@ impl<F: TableSchema, R: TableReplication> EndpointHandler<SyncRpc> for TableSync
|
||||||
|
|
||||||
struct SyncWorker<F: TableSchema, R: TableReplication> {
|
struct SyncWorker<F: TableSchema, R: TableReplication> {
|
||||||
syncer: Arc<TableSyncer<F, R>>,
|
syncer: Arc<TableSyncer<F, R>>,
|
||||||
layout_notify: Arc<Notify>,
|
layout_watch: watch::Receiver<Arc<LayoutHistory>>,
|
||||||
layout_version: u64,
|
layout: Arc<LayoutHistory>,
|
||||||
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
|
add_full_sync_rx: mpsc::UnboundedReceiver<()>,
|
||||||
todo: Vec<TodoPartition>,
|
todo: Vec<TodoPartition>,
|
||||||
next_full_sync: Instant,
|
next_full_sync: Instant,
|
||||||
|
@ -593,11 +593,12 @@ impl<F: TableSchema, R: TableReplication> Worker for SyncWorker<F, R> {
|
||||||
self.add_full_sync();
|
self.add_full_sync();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
_ = self.layout_notify.notified() => {
|
_ = self.layout_watch.changed() => {
|
||||||
let new_version = self.syncer.system.cluster_layout().current().version;
|
let new_layout = self.layout_watch.borrow();
|
||||||
if new_version > self.layout_version {
|
if !Arc::ptr_eq(&new_layout, &self.layout) {
|
||||||
self.layout_version = new_version;
|
self.layout = new_layout.clone();
|
||||||
debug!("({}) Layout changed, adding full sync to syncer todo list", F::TABLE_NAME);
|
drop(new_layout);
|
||||||
|
debug!("({}) Ring changed, adding full sync to syncer todo list", F::TABLE_NAME);
|
||||||
self.add_full_sync();
|
self.add_full_sync();
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
|
|
Loading…
Add table
Reference in a new issue