2023-11-16 12:26:43 +00:00
|
|
|
use std::collections::HashMap;
|
|
|
|
use std::ops::Deref;
|
|
|
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
|
|
|
|
2023-11-16 12:51:40 +00:00
|
|
|
use serde::{Deserialize, Serialize};
|
|
|
|
|
2023-11-16 12:26:43 +00:00
|
|
|
use garage_util::data::*;
|
|
|
|
|
|
|
|
use super::schema::*;
|
2023-11-27 10:52:57 +00:00
|
|
|
use crate::rpc_helper::RpcHelper;
|
2023-11-16 12:26:43 +00:00
|
|
|
|
2023-11-16 12:51:40 +00:00
|
|
|
#[derive(Debug, Clone, Serialize, Deserialize, Default, PartialEq, Eq)]
|
|
|
|
pub struct LayoutDigest {
|
|
|
|
/// Cluster layout version
|
|
|
|
pub current_version: u64,
|
|
|
|
/// Number of active layout versions
|
|
|
|
pub active_versions: usize,
|
|
|
|
/// Hash of cluster layout update trackers
|
|
|
|
pub trackers_hash: Hash,
|
|
|
|
/// Hash of cluster layout staging data
|
|
|
|
pub staging_hash: Hash,
|
|
|
|
}
|
|
|
|
|
2023-11-16 12:26:43 +00:00
|
|
|
pub struct LayoutHelper {
|
|
|
|
layout: Option<LayoutHistory>,
|
|
|
|
|
|
|
|
// cached values
|
|
|
|
ack_map_min: u64,
|
|
|
|
sync_map_min: u64,
|
|
|
|
|
|
|
|
all_nodes: Vec<Uuid>,
|
|
|
|
all_nongateway_nodes: Vec<Uuid>,
|
|
|
|
|
2023-11-16 12:51:40 +00:00
|
|
|
trackers_hash: Hash,
|
|
|
|
staging_hash: Hash,
|
2023-11-16 12:26:43 +00:00
|
|
|
|
|
|
|
// ack lock: counts in-progress write operations for each
|
|
|
|
// layout version ; we don't increase the ack update tracker
|
|
|
|
// while this lock is nonzero
|
|
|
|
pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
|
|
|
|
}
|
|
|
|
|
|
|
|
impl Deref for LayoutHelper {
|
|
|
|
type Target = LayoutHistory;
|
|
|
|
fn deref(&self) -> &LayoutHistory {
|
|
|
|
self.layout()
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
impl LayoutHelper {
|
|
|
|
pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
|
|
|
|
layout.cleanup_old_versions();
|
|
|
|
|
|
|
|
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
|
|
|
|
layout.clamp_update_trackers(&all_nongateway_nodes);
|
|
|
|
|
|
|
|
let min_version = layout.min_stored();
|
|
|
|
let ack_map_min = layout
|
|
|
|
.update_trackers
|
|
|
|
.ack_map
|
|
|
|
.min(&all_nongateway_nodes, min_version);
|
|
|
|
let sync_map_min = layout
|
|
|
|
.update_trackers
|
|
|
|
.sync_map
|
|
|
|
.min(&all_nongateway_nodes, min_version);
|
|
|
|
|
|
|
|
let all_nodes = layout.get_all_nodes();
|
|
|
|
let trackers_hash = layout.calculate_trackers_hash();
|
|
|
|
let staging_hash = layout.calculate_staging_hash();
|
|
|
|
|
|
|
|
ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
|
|
|
|
ack_lock
|
|
|
|
.entry(layout.current().version)
|
|
|
|
.or_insert(AtomicUsize::new(0));
|
|
|
|
|
|
|
|
LayoutHelper {
|
|
|
|
layout: Some(layout),
|
|
|
|
ack_map_min,
|
|
|
|
sync_map_min,
|
|
|
|
all_nodes,
|
|
|
|
all_nongateway_nodes,
|
|
|
|
trackers_hash,
|
|
|
|
staging_hash,
|
|
|
|
ack_lock,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------ single updating function --------------
|
|
|
|
|
|
|
|
fn layout(&self) -> &LayoutHistory {
|
|
|
|
self.layout.as_ref().unwrap()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn update<F>(&mut self, f: F) -> bool
|
|
|
|
where
|
|
|
|
F: FnOnce(&mut LayoutHistory) -> bool,
|
|
|
|
{
|
|
|
|
let changed = f(&mut self.layout.as_mut().unwrap());
|
|
|
|
if changed {
|
|
|
|
*self = Self::new(
|
|
|
|
self.layout.take().unwrap(),
|
|
|
|
std::mem::take(&mut self.ack_lock),
|
|
|
|
);
|
|
|
|
}
|
|
|
|
changed
|
|
|
|
}
|
|
|
|
|
|
|
|
// ------------------ read helpers ---------------
|
|
|
|
|
|
|
|
pub fn all_nodes(&self) -> &[Uuid] {
|
|
|
|
&self.all_nodes
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn all_nongateway_nodes(&self) -> &[Uuid] {
|
|
|
|
&self.all_nongateway_nodes
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn all_ack(&self) -> u64 {
|
|
|
|
self.ack_map_min
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn sync_versions(&self) -> (u64, u64, u64) {
|
|
|
|
(
|
|
|
|
self.layout().current().version,
|
|
|
|
self.all_ack(),
|
|
|
|
self.layout().min_stored(),
|
|
|
|
)
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
|
|
|
|
let sync_min = self.sync_map_min;
|
|
|
|
let version = self
|
|
|
|
.layout()
|
|
|
|
.versions
|
|
|
|
.iter()
|
|
|
|
.find(|x| x.version == sync_min)
|
|
|
|
.or(self.layout().versions.last())
|
|
|
|
.unwrap();
|
|
|
|
version
|
|
|
|
.nodes_of(position, version.replication_factor)
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
2023-11-27 10:52:57 +00:00
|
|
|
pub fn block_read_nodes_of(&self, position: &Hash, rpc_helper: &RpcHelper) -> Vec<Uuid> {
|
|
|
|
let mut ret = Vec::with_capacity(12);
|
|
|
|
let ver_iter = self
|
|
|
|
.layout()
|
|
|
|
.versions
|
|
|
|
.iter()
|
|
|
|
.rev()
|
|
|
|
.chain(self.layout().old_versions.iter().rev());
|
|
|
|
for ver in ver_iter {
|
|
|
|
if ver.version > self.sync_map_min {
|
|
|
|
continue;
|
|
|
|
}
|
|
|
|
let nodes = ver.nodes_of(position, ver.replication_factor);
|
|
|
|
for node in rpc_helper.request_order(nodes) {
|
|
|
|
if !ret.contains(&node) {
|
|
|
|
ret.push(node);
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
|
|
ret
|
|
|
|
}
|
|
|
|
|
2023-11-16 12:26:43 +00:00
|
|
|
pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
|
|
|
|
self.layout()
|
|
|
|
.versions
|
|
|
|
.iter()
|
|
|
|
.map(|x| x.nodes_of(position, x.replication_factor).collect())
|
|
|
|
.collect()
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
|
|
|
|
let mut ret = vec![];
|
|
|
|
for version in self.layout().versions.iter() {
|
|
|
|
ret.extend(version.nodes_of(position, version.replication_factor));
|
|
|
|
}
|
|
|
|
ret.sort();
|
|
|
|
ret.dedup();
|
|
|
|
ret
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn trackers_hash(&self) -> Hash {
|
|
|
|
self.trackers_hash
|
|
|
|
}
|
|
|
|
|
|
|
|
pub fn staging_hash(&self) -> Hash {
|
|
|
|
self.staging_hash
|
|
|
|
}
|
|
|
|
|
2023-11-16 12:51:40 +00:00
|
|
|
pub fn digest(&self) -> LayoutDigest {
|
|
|
|
LayoutDigest {
|
|
|
|
current_version: self.current().version,
|
|
|
|
active_versions: self.versions.len(),
|
|
|
|
trackers_hash: self.trackers_hash,
|
|
|
|
staging_hash: self.staging_hash,
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2023-11-16 12:26:43 +00:00
|
|
|
// ------------------ helpers for update tracking ---------------
|
|
|
|
|
|
|
|
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
|
|
|
|
// Ensure trackers for this node's values are up-to-date
|
|
|
|
|
|
|
|
// 1. Acknowledge the last layout version which is not currently
|
|
|
|
// locked by an in-progress write operation
|
|
|
|
self.ack_max_free(local_node_id);
|
|
|
|
|
|
|
|
// 2. Assume the data on this node is sync'ed up at least to
|
|
|
|
// the first layout version in the history
|
|
|
|
self.sync_first(local_node_id);
|
|
|
|
|
|
|
|
// 3. Acknowledge everyone has synced up to min(self.sync_map)
|
|
|
|
self.sync_ack(local_node_id);
|
|
|
|
|
|
|
|
info!("ack_map: {:?}", self.update_trackers.ack_map);
|
|
|
|
info!("sync_map: {:?}", self.update_trackers.sync_map);
|
|
|
|
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
|
|
|
|
}
|
|
|
|
|
|
|
|
fn sync_first(&mut self, local_node_id: Uuid) {
|
|
|
|
let first_version = self.versions.first().as_ref().unwrap().version;
|
|
|
|
self.update(|layout| {
|
|
|
|
layout
|
|
|
|
.update_trackers
|
|
|
|
.sync_map
|
|
|
|
.set_max(local_node_id, first_version)
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
fn sync_ack(&mut self, local_node_id: Uuid) {
|
|
|
|
let sync_map_min = self.sync_map_min;
|
|
|
|
self.update(|layout| {
|
|
|
|
layout
|
|
|
|
.update_trackers
|
|
|
|
.sync_ack_map
|
|
|
|
.set_max(local_node_id, sync_map_min)
|
|
|
|
});
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
|
|
|
|
let max_ack = self.max_free_ack();
|
|
|
|
let changed = self.update(|layout| {
|
|
|
|
layout
|
|
|
|
.update_trackers
|
|
|
|
.ack_map
|
|
|
|
.set_max(local_node_id, max_ack)
|
|
|
|
});
|
|
|
|
if changed {
|
|
|
|
info!("ack_until updated to {}", max_ack);
|
|
|
|
}
|
|
|
|
changed
|
|
|
|
}
|
|
|
|
|
|
|
|
pub(crate) fn max_free_ack(&self) -> u64 {
|
|
|
|
self.layout()
|
|
|
|
.versions
|
|
|
|
.iter()
|
|
|
|
.map(|x| x.version)
|
|
|
|
.take_while(|v| {
|
|
|
|
self.ack_lock
|
|
|
|
.get(v)
|
|
|
|
.map(|x| x.load(Ordering::Relaxed) == 0)
|
|
|
|
.unwrap_or(true)
|
|
|
|
})
|
|
|
|
.max()
|
|
|
|
.unwrap_or(self.min_stored())
|
|
|
|
}
|
|
|
|
}
|