use std::collections::HashMap;
use std::sync::atomic::{AtomicUsize, Ordering};

use serde::{Deserialize, Serialize};

use garage_util::data::*;

use super::*;
use crate::replication_mode::*;

#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
pub struct RpcLayoutDigest {
    /// Cluster layout version
    pub current_version: u64,
    /// Number of active layout versions
    pub active_versions: usize,
    /// Hash of cluster layout update trackers
    pub trackers_hash: Hash,
    /// Hash of cluster layout staging data
    pub staging_hash: Hash,
}

#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct SyncLayoutDigest {
    current: u64,
    ack_map_min: u64,
    min_stored: u64,
}

pub struct LayoutHelper {
    replication_factor: ReplicationFactor,
    consistency_mode: ConsistencyMode,
    layout: Option<LayoutHistory>,

    // cached values
    ack_map_min: u64,
    sync_map_min: u64,

    all_nodes: Vec<Uuid>,
    all_nongateway_nodes: Vec<Uuid>,

    trackers_hash: Hash,
    staging_hash: Hash,
    is_check_ok: bool,

    // ack lock: counts in-progress write operations for each
    // layout version; we don't increase the ack update tracker
    // while this lock is nonzero
    pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
}
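
// Sketch of how ack_lock is meant to be used (the caller-side code shown
// here is illustrative, not an API defined in this file; real callers live
// elsewhere in this crate):
//
//     // pin an in-progress write operation to layout version v
//     helper.ack_lock[&v].fetch_add(1, Ordering::Relaxed);
//     // ... perform the write under layout version v ...
//     helper.ack_lock[&v].fetch_sub(1, Ordering::Relaxed);
//
// While ack_lock[&v] is nonzero, update_ack_to_max_free() below will not
// advance this node's ack tracker past version v.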

impl LayoutHelper {
    pub fn new(
        replication_factor: ReplicationFactor,
        consistency_mode: ConsistencyMode,
        mut layout: LayoutHistory,
        mut ack_lock: HashMap<u64, AtomicUsize>,
    ) -> Self {
        // In the new() function of the helper, we do a bunch of cleanup
        // and calculations on the layout history to make sure things are
        // correct and we have rapid access to important values such as
        // the layout versions to use when reading to ensure consistency.

        if consistency_mode != ConsistencyMode::Consistent {
            // Fast path for when no consistency is required.
            // In this case we only need to keep the last version of the layout,
            // we don't care about coordinating stuff in the cluster.
            layout.keep_current_version_only();
        }

        layout.cleanup_old_versions();

        let all_nodes = layout.get_all_nodes();
        let all_nongateway_nodes = layout.get_all_nongateway_nodes();

        layout.clamp_update_trackers(&all_nodes);

        let min_version = layout.min_stored();

        // ack_map_min is the minimum value of ack_map among all nodes
        // in the cluster (gateway, non-gateway, current and previous layouts).
        // It is the highest layout version which all of these nodes have
        // acknowledged, indicating that they are aware of it and are no
        // longer processing write operations that did not take it into account.
        let ack_map_min = layout
            .update_trackers
            .ack_map
            .min_among(&all_nodes, min_version);
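
        // Worked example (hypothetical values): if all_nodes = [n1, n2, n3]
        // with ack_map entries {n1: 3, n2: 2, n3: 3} and min_version = 1,
        // then ack_map_min = 2: every node has acknowledged version 2, but
        // n2 has not yet acknowledged version 3.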

        // sync_map_min is the minimum value of sync_map among storage nodes
        // in the cluster (non-gateway nodes only, current and previous layouts).
        // It is the highest layout version for which we know that all relevant
        // storage nodes have fulfilled a sync, and therefore it is safe to
        // use a read quorum within that layout to ensure consistency.
        // Gateway nodes are excluded here because they hold no relevant data
        // (they store the bucket and access key tables, but we don't have
        // consistency on those).
        // This value is calculated using quorums to allow progress even
        // if not all nodes have successfully completed a sync.
        let sync_map_min =
            layout.calculate_sync_map_min_with_quorum(replication_factor, &all_nongateway_nodes);
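
        // Worked example (hypothetical values): with replication_factor = 3,
        // a partition whose storage set is [n1, n2, n3] only needs a quorum
        // (2 out of 3) of those nodes to have synced up to version v for v
        // to count as synced for that partition; sync_map_min is then the
        // highest version for which this holds across all partitions.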

        let trackers_hash = layout.calculate_trackers_hash();
        let staging_hash = layout.calculate_staging_hash();
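
        // Clean up the ack lock: drop entries whose in-progress write counter
        // has fallen back to zero, and make sure an entry exists for the
        // current layout version.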
        ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
        ack_lock
            .entry(layout.current().version)
            .or_insert(AtomicUsize::new(0));

        let is_check_ok = layout.check().is_ok();

        LayoutHelper {
            replication_factor,
            consistency_mode,
            layout: Some(layout),
            ack_map_min,
            sync_map_min,
            all_nodes,
            all_nongateway_nodes,
            trackers_hash,
            staging_hash,
            ack_lock,
            is_check_ok,
        }
    }

    // ------------------ single updating function --------------

    pub(crate) fn update<F>(&mut self, f: F) -> bool
    where
        F: FnOnce(&mut LayoutHistory) -> bool,
    {
        let changed = f(self.layout.as_mut().unwrap());
        if changed {
            *self = Self::new(
                self.replication_factor,
                self.consistency_mode,
                self.layout.take().unwrap(),
                std::mem::take(&mut self.ack_lock),
            );
        }
        changed
    }
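
    // Illustrative call (see update_update_trackers() below for real
    // callers; `node_id` and `v` are hypothetical):
    //
    //     let changed = helper.update(|layout| {
    //         layout.update_trackers.ack_map.set_max(node_id, v)
    //     });
    //
    // The closure returns true if it modified the layout; in that case the
    // helper is rebuilt via Self::new() so that all cached values
    // (ack_map_min, sync_map_min, hashes, ...) stay consistent.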

    // ------------------ read helpers ---------------

    pub fn inner(&self) -> &LayoutHistory {
        self.layout.as_ref().unwrap()
    }

    pub fn current(&self) -> &LayoutVersion {
        self.inner().current()
    }

    pub fn versions(&self) -> &[LayoutVersion] {
        &self.inner().versions
    }

    pub fn is_check_ok(&self) -> bool {
        self.is_check_ok
    }

    /// Return all nodes that have a role (gateway or storage)
    /// in one of the currently active layout versions
    pub fn all_nodes(&self) -> &[Uuid] {
        &self.all_nodes
    }

    /// Return all nodes that are configured to store data
    /// in one of the currently active layout versions
    pub fn all_nongateway_nodes(&self) -> &[Uuid] {
        &self.all_nongateway_nodes
    }

    pub fn ack_map_min(&self) -> u64 {
        self.ack_map_min
    }

    pub fn sync_map_min(&self) -> u64 {
        self.sync_map_min
    }

    pub fn sync_digest(&self) -> SyncLayoutDigest {
        SyncLayoutDigest {
            current: self.current().version,
            ack_map_min: self.ack_map_min(),
            min_stored: self.inner().min_stored(),
        }
    }
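
    // Reads are performed in the layout version given by sync_map_min: as
    // explained in new() above, that is the most recent version for which a
    // read quorum is known to be safe. If no stored version matches
    // sync_map_min, we fall back to the newest version.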
    pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
        let sync_min = self.sync_map_min;
        let version = self
            .versions()
            .iter()
            .find(|x| x.version == sync_min)
            .or(self.versions().last())
            .unwrap();
        version
            .nodes_of(position, version.replication_factor)
            .collect()
    }

    /// Return the storage set for `position` in each active layout version
    /// (writes must be replicated in all of them).
    pub fn storage_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
        self.versions()
            .iter()
            .map(|x| x.nodes_of(position, x.replication_factor).collect())
            .collect()
    }

    /// Return the union, deduplicated, of the storage sets of all active
    /// layout versions.
    pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
        let mut ret = vec![];
        for version in self.versions().iter() {
            ret.extend(version.nodes_of(position, version.replication_factor));
        }
        ret.sort();
        ret.dedup();
        ret
    }

    pub fn current_storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
        let ver = self.current();
        ver.nodes_of(position, ver.replication_factor).collect()
    }

    pub fn trackers_hash(&self) -> Hash {
        self.trackers_hash
    }

    pub fn staging_hash(&self) -> Hash {
        self.staging_hash
    }

    pub fn digest(&self) -> RpcLayoutDigest {
        RpcLayoutDigest {
            current_version: self.current().version,
            active_versions: self.versions().len(),
            trackers_hash: self.trackers_hash,
            staging_hash: self.staging_hash,
        }
    }

    // ------------------ helpers for update tracking ---------------

    pub(crate) fn update_update_trackers(&mut self, local_node_id: Uuid) {
        // Ensure trackers for this node's values are up-to-date

        // 1. Acknowledge the last layout version which is not currently
        //    locked by an in-progress write operation
        self.update_ack_to_max_free(local_node_id);

        // 2. Assume the data on this node is sync'ed up at least to
        //    the first layout version in the history
        let first_version = self.inner().min_stored();
        self.update(|layout| {
            layout
                .update_trackers
                .sync_map
                .set_max(local_node_id, first_version)
        });

        // 3. Acknowledge everyone has synced up to min(self.sync_map)
        let sync_map_min = self.sync_map_min;
        self.update(|layout| {
            layout
                .update_trackers
                .sync_ack_map
                .set_max(local_node_id, sync_map_min)
        });

        debug!("ack_map: {:?}", self.inner().update_trackers.ack_map);
        debug!("sync_map: {:?}", self.inner().update_trackers.sync_map);
        debug!(
            "sync_ack_map: {:?}",
            self.inner().update_trackers.sync_ack_map
        );
    }
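
    // Advance this node's ack tracker as far as possible: max_free is the
    // first (oldest) layout version that still has in-progress write
    // operations registered in ack_lock, or the current version if none are
    // locked. Writes pinned to version v are compatible with acknowledging
    // v itself, but not any newer version, so the tracker may move up to
    // max_free and no further.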
    pub(crate) fn update_ack_to_max_free(&mut self, local_node_id: Uuid) -> bool {
        let max_free = self
            .versions()
            .iter()
            .map(|x| x.version)
            .skip_while(|v| {
                self.ack_lock
                    .get(v)
                    .map(|x| x.load(Ordering::Relaxed) == 0)
                    .unwrap_or(true)
            })
            .next()
            .unwrap_or(self.current().version);
        let changed = self.update(|layout| {
            layout
                .update_trackers
                .ack_map
                .set_max(local_node_id, max_free)
        });
        if changed {
            info!("ack_until updated to {}", max_free);
        }
        changed
    }
}