layout: add helpers to LayoutHistory and prepare integration with Table

This commit is contained in:
Alex 2023-11-09 16:32:31 +01:00
parent 9d95f6f704
commit df36cf3099
Signed by: lx
GPG key ID: 0E496D15096376BE
5 changed files with 74 additions and 13 deletions

View file

@ -32,14 +32,6 @@ impl LayoutHistory {
self.versions.last().as_ref().unwrap()
}
pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
self.versions
.iter()
.map(|x| x.nongateway_nodes())
.flatten()
.collect::<HashSet<_>>()
}
pub fn update_hashes(&mut self) {
self.trackers_hash = self.calculate_trackers_hash();
self.staging_hash = self.calculate_staging_hash();
@ -53,6 +45,39 @@ impl LayoutHistory {
blake2sum(&nonversioned_encode(&self.staging).unwrap()[..])
}
// ------------------ who stores what now? ---------------
pub fn max_ack(&self) -> u64 {
self.calculate_global_min(&self.update_trackers.ack_map)
}
pub fn all_storage_nodes(&self) -> HashSet<Uuid> {
// TODO: cache this
self.versions
.iter()
.map(|x| x.nongateway_nodes())
.flatten()
.collect::<HashSet<_>>()
}
pub fn read_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
let sync_min = self.calculate_global_min(&self.update_trackers.sync_map);
let version = self
.versions
.iter()
.find(|x| x.version == sync_min)
.or(self.versions.last())
.unwrap();
version.nodes_of(position, version.replication_factor)
}
pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
self.versions
.iter()
.map(|x| x.nodes_of(position, x.replication_factor))
.collect::<Vec<_>>()
}
// ------------------ update tracking ---------------
pub(crate) fn update_trackers(&mut self, node_id: Uuid) {

View file

@ -1,4 +1,5 @@
use std::sync::{Arc, RwLock, RwLockReadGuard};
use std::collections::HashMap;
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
use std::time::Duration;
use serde::{Deserialize, Serialize};
@ -26,6 +27,8 @@ pub struct LayoutManager {
layout: Arc<RwLock<LayoutHistory>>,
pub(crate) change_notify: Arc<Notify>,
table_sync_version: Mutex<HashMap<String, u64>>,
pub(crate) rpc_helper: RpcHelper,
system_endpoint: Arc<Endpoint<SystemRpc, System>>,
}
@ -117,6 +120,34 @@ impl LayoutManager {
Ok(())
}
pub fn add_table(&self, table_name: &'static str) {
let first_version = self.layout().versions.first().unwrap().version;
self.table_sync_version
.lock()
.unwrap()
.insert(table_name.to_string(), first_version);
}
pub fn sync_table_until(self: &Arc<Self>, table_name: &'static str, version: u64) {
let mut table_sync_version = self.table_sync_version.lock().unwrap();
*table_sync_version.get_mut(table_name).unwrap() = version;
let sync_until = table_sync_version.iter().map(|(_, v)| *v).max().unwrap();
drop(table_sync_version);
let mut layout = self.layout.write().unwrap();
if layout
.update_trackers
.sync_map
.set_max(self.node_id, sync_until)
{
layout.update_hashes();
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
layout.update_trackers.clone(),
));
}
}
// ---- INTERNALS ---
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {

View file

@ -375,14 +375,17 @@ impl UpdateTracker {
changed
}
pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) {
pub(crate) fn set_max(&mut self, peer: Uuid, value: u64) -> bool {
match self.0.get_mut(&peer) {
Some(e) => {
*e = std::cmp::max(*e, value);
Some(e) if *e < value => {
*e = value;
true
}
None => {
self.0.insert(peer, value);
true
}
_ => false,
}
}
}

View file

@ -109,7 +109,7 @@ impl LayoutVersion {
.collect::<Vec<_>>()
}
/// Walk the ring to find the n servers in which data should be replicated
/// Return the n servers in which data for this hash should be replicated
pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> {
assert_eq!(n, self.replication_factor);

View file

@ -80,6 +80,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
let syncer = TableSyncer::new(system.clone(), data.clone(), merkle_updater.clone());
let gc = TableGc::new(system.clone(), data.clone());
system.layout_manager.add_table(F::TABLE_NAME);
let table = Arc::new(Self {
system,
data,