NLnet task 3 #667
10 changed files with 157 additions and 47 deletions
|
@ -366,7 +366,7 @@ impl BlockManager {
|
||||||
.rpc_helper()
|
.rpc_helper()
|
||||||
.try_write_many_sets(
|
.try_write_many_sets(
|
||||||
&self.endpoint,
|
&self.endpoint,
|
||||||
&who,
|
who.as_ref(),
|
||||||
put_block_rpc,
|
put_block_rpc,
|
||||||
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
RequestStrategy::with_priority(PRIO_NORMAL | PRIO_SECONDARY)
|
||||||
.with_quorum(self.replication.write_quorum()),
|
.with_quorum(self.replication.write_quorum()),
|
||||||
|
|
|
@ -329,7 +329,7 @@ pub async fn fetch_layout(
|
||||||
pub async fn send_layout(
|
pub async fn send_layout(
|
||||||
rpc_cli: &Endpoint<SystemRpc, ()>,
|
rpc_cli: &Endpoint<SystemRpc, ()>,
|
||||||
rpc_host: NodeID,
|
rpc_host: NodeID,
|
||||||
mut layout: LayoutHistory,
|
layout: LayoutHistory,
|
||||||
) -> Result<(), Error> {
|
) -> Result<(), Error> {
|
||||||
rpc_cli
|
rpc_cli
|
||||||
.call(
|
.call(
|
||||||
|
|
|
@ -1,5 +1,7 @@
|
||||||
|
use std::collections::HashMap;
|
||||||
use std::collections::HashSet;
|
use std::collections::HashSet;
|
||||||
use std::ops::Deref;
|
use std::ops::Deref;
|
||||||
|
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||||
|
|
||||||
use garage_util::crdt::{Crdt, Lww, LwwMap};
|
use garage_util::crdt::{Crdt, Lww, LwwMap};
|
||||||
use garage_util::data::*;
|
use garage_util::data::*;
|
||||||
|
@ -21,6 +23,11 @@ pub struct LayoutHelper {
|
||||||
|
|
||||||
trackers_hash: Hash,
|
trackers_hash: Hash,
|
||||||
staging_hash: Hash,
|
staging_hash: Hash,
|
||||||
|
|
||||||
|
// ack lock: counts in-progress write operations for each
|
||||||
|
// layout version ; we don't increase the ack update tracker
|
||||||
|
// while this lock is nonzero
|
||||||
|
pub(crate) ack_lock: HashMap<u64, AtomicUsize>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Deref for LayoutHelper {
|
impl Deref for LayoutHelper {
|
||||||
|
@ -31,7 +38,7 @@ impl Deref for LayoutHelper {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl LayoutHelper {
|
impl LayoutHelper {
|
||||||
pub fn new(mut layout: LayoutHistory) -> Self {
|
pub fn new(mut layout: LayoutHistory, mut ack_lock: HashMap<u64, AtomicUsize>) -> Self {
|
||||||
layout.cleanup_old_versions();
|
layout.cleanup_old_versions();
|
||||||
|
|
||||||
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
|
let all_nongateway_nodes = layout.get_all_nongateway_nodes();
|
||||||
|
@ -51,6 +58,11 @@ impl LayoutHelper {
|
||||||
let trackers_hash = layout.calculate_trackers_hash();
|
let trackers_hash = layout.calculate_trackers_hash();
|
||||||
let staging_hash = layout.calculate_staging_hash();
|
let staging_hash = layout.calculate_staging_hash();
|
||||||
|
|
||||||
|
ack_lock.retain(|_, cnt| *cnt.get_mut() > 0);
|
||||||
|
ack_lock
|
||||||
|
.entry(layout.current().version)
|
||||||
|
.or_insert(AtomicUsize::new(0));
|
||||||
|
|
||||||
LayoutHelper {
|
LayoutHelper {
|
||||||
layout: Some(layout),
|
layout: Some(layout),
|
||||||
ack_map_min,
|
ack_map_min,
|
||||||
|
@ -59,6 +71,7 @@ impl LayoutHelper {
|
||||||
all_nongateway_nodes,
|
all_nongateway_nodes,
|
||||||
trackers_hash,
|
trackers_hash,
|
||||||
staging_hash,
|
staging_hash,
|
||||||
|
ack_lock,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -74,7 +87,10 @@ impl LayoutHelper {
|
||||||
{
|
{
|
||||||
let changed = f(&mut self.layout.as_mut().unwrap());
|
let changed = f(&mut self.layout.as_mut().unwrap());
|
||||||
if changed {
|
if changed {
|
||||||
*self = Self::new(self.layout.take().unwrap());
|
*self = Self::new(
|
||||||
|
self.layout.take().unwrap(),
|
||||||
|
std::mem::take(&mut self.ack_lock),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
changed
|
changed
|
||||||
}
|
}
|
||||||
|
@ -115,7 +131,7 @@ impl LayoutHelper {
|
||||||
.collect()
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
|
pub(crate) fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
|
||||||
self.layout()
|
self.layout()
|
||||||
.versions
|
.versions
|
||||||
.iter()
|
.iter()
|
||||||
|
@ -143,42 +159,72 @@ impl LayoutHelper {
|
||||||
|
|
||||||
// ------------------ helpers for update tracking ---------------
|
// ------------------ helpers for update tracking ---------------
|
||||||
|
|
||||||
pub(crate) fn sync_first(&mut self, node: Uuid) {
|
pub(crate) fn update_trackers(&mut self, local_node_id: Uuid) {
|
||||||
let first_version = self.versions.first().as_ref().unwrap().version;
|
// Ensure trackers for this node's values are up-to-date
|
||||||
self.update(|layout| layout.update_trackers.sync_map.set_max(node, first_version));
|
|
||||||
|
// 1. Acknowledge the last layout version which is not currently
|
||||||
|
// locked by an in-progress write operation
|
||||||
|
self.ack_max_free(local_node_id);
|
||||||
|
|
||||||
|
// 2. Assume the data on this node is sync'ed up at least to
|
||||||
|
// the first layout version in the history
|
||||||
|
self.sync_first(local_node_id);
|
||||||
|
|
||||||
|
// 3. Acknowledge everyone has synced up to min(self.sync_map)
|
||||||
|
self.sync_ack(local_node_id);
|
||||||
|
|
||||||
|
info!("ack_map: {:?}", self.update_trackers.ack_map);
|
||||||
|
info!("sync_map: {:?}", self.update_trackers.sync_map);
|
||||||
|
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn sync_ack(&mut self, node: Uuid) {
|
fn sync_first(&mut self, local_node_id: Uuid) {
|
||||||
|
let first_version = self.versions.first().as_ref().unwrap().version;
|
||||||
|
self.update(|layout| {
|
||||||
|
layout
|
||||||
|
.update_trackers
|
||||||
|
.sync_map
|
||||||
|
.set_max(local_node_id, first_version)
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sync_ack(&mut self, local_node_id: Uuid) {
|
||||||
let sync_map_min = self.sync_map_min;
|
let sync_map_min = self.sync_map_min;
|
||||||
self.update(|layout| {
|
self.update(|layout| {
|
||||||
layout
|
layout
|
||||||
.update_trackers
|
.update_trackers
|
||||||
.sync_ack_map
|
.sync_ack_map
|
||||||
.set_max(node, sync_map_min)
|
.set_max(local_node_id, sync_map_min)
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn ack_last(&mut self, node: Uuid) {
|
pub(crate) fn ack_max_free(&mut self, local_node_id: Uuid) -> bool {
|
||||||
let last_version = self.current().version;
|
let max_ack = self.max_free_ack();
|
||||||
self.update(|layout| layout.update_trackers.ack_map.set_max(node, last_version));
|
let changed = self.update(|layout| {
|
||||||
|
layout
|
||||||
|
.update_trackers
|
||||||
|
.ack_map
|
||||||
|
.set_max(local_node_id, max_ack)
|
||||||
|
});
|
||||||
|
if changed {
|
||||||
|
info!("ack_until updated to {}", max_ack);
|
||||||
|
}
|
||||||
|
changed
|
||||||
}
|
}
|
||||||
|
|
||||||
pub(crate) fn update_trackers_of(&mut self, node_id: Uuid) {
|
pub(crate) fn max_free_ack(&self) -> u64 {
|
||||||
// Ensure trackers for this node's values are up-to-date
|
self.layout()
|
||||||
|
.versions
|
||||||
// 1. Acknowledge the last layout version in the history
|
.iter()
|
||||||
self.ack_last(node_id);
|
.map(|x| x.version)
|
||||||
|
.take_while(|v| {
|
||||||
// 2. Assume the data on this node is sync'ed up at least to
|
self.ack_lock
|
||||||
// the first layout version in the history
|
.get(v)
|
||||||
self.sync_first(node_id);
|
.map(|x| x.load(Ordering::Relaxed) == 0)
|
||||||
|
.unwrap_or(true)
|
||||||
// 3. Acknowledge everyone has synced up to min(self.sync_map)
|
})
|
||||||
self.sync_ack(node_id);
|
.max()
|
||||||
|
.unwrap_or(self.min_stored())
|
||||||
info!("ack_map: {:?}", self.update_trackers.ack_map);
|
|
||||||
info!("sync_map: {:?}", self.update_trackers.sync_map);
|
|
||||||
info!("sync_ack_map: {:?}", self.update_trackers.sync_ack_map);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
use std::sync::{Arc, Mutex, RwLock, RwLockReadGuard};
|
use std::sync::{atomic::Ordering, Arc, Mutex, RwLock, RwLockReadGuard};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
@ -74,8 +74,8 @@ impl LayoutManager {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut cluster_layout = LayoutHelper::new(cluster_layout);
|
let mut cluster_layout = LayoutHelper::new(cluster_layout, Default::default());
|
||||||
cluster_layout.update_trackers_of(node_id.into());
|
cluster_layout.update_trackers(node_id.into());
|
||||||
|
|
||||||
let layout = Arc::new(RwLock::new(cluster_layout));
|
let layout = Arc::new(RwLock::new(cluster_layout));
|
||||||
let change_notify = Arc::new(Notify::new());
|
let change_notify = Arc::new(Notify::new());
|
||||||
|
@ -139,13 +139,36 @@ impl LayoutManager {
|
||||||
|
|
||||||
let mut layout = self.layout.write().unwrap();
|
let mut layout = self.layout.write().unwrap();
|
||||||
if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
|
if layout.update(|l| l.update_trackers.sync_map.set_max(self.node_id, sync_until)) {
|
||||||
debug!("sync_until updated to {}", sync_until);
|
info!("sync_until updated to {}", sync_until);
|
||||||
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
|
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
|
||||||
layout.update_trackers.clone(),
|
layout.update_trackers.clone(),
|
||||||
));
|
));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
fn ack_new_version(self: &Arc<Self>) {
|
||||||
|
let mut layout = self.layout.write().unwrap();
|
||||||
|
if layout.ack_max_free(self.node_id) {
|
||||||
|
self.broadcast_update(SystemRpc::AdvertiseClusterLayoutTrackers(
|
||||||
|
layout.update_trackers.clone(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ---- ACK LOCKING ----
|
||||||
|
|
||||||
|
pub fn write_sets_of(self: &Arc<Self>, position: &Hash) -> WriteLock<Vec<Vec<Uuid>>> {
|
||||||
|
let layout = self.layout();
|
||||||
|
let version = layout.current().version;
|
||||||
|
let nodes = layout.write_sets_of(position);
|
||||||
|
layout
|
||||||
|
.ack_lock
|
||||||
|
.get(&version)
|
||||||
|
.unwrap()
|
||||||
|
.fetch_add(1, Ordering::Relaxed);
|
||||||
|
WriteLock::new(version, self, nodes)
|
||||||
|
}
|
||||||
|
|
||||||
// ---- INTERNALS ---
|
// ---- INTERNALS ---
|
||||||
|
|
||||||
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
|
fn merge_layout(&self, adv: &LayoutHistory) -> Option<LayoutHistory> {
|
||||||
|
@ -154,7 +177,7 @@ impl LayoutManager {
|
||||||
|
|
||||||
if !prev_layout_check || adv.check().is_ok() {
|
if !prev_layout_check || adv.check().is_ok() {
|
||||||
if layout.update(|l| l.merge(adv)) {
|
if layout.update(|l| l.merge(adv)) {
|
||||||
layout.update_trackers_of(self.node_id);
|
layout.update_trackers(self.node_id);
|
||||||
if prev_layout_check && layout.check().is_err() {
|
if prev_layout_check && layout.check().is_err() {
|
||||||
panic!("Merged two correct layouts and got an incorrect layout.");
|
panic!("Merged two correct layouts and got an incorrect layout.");
|
||||||
}
|
}
|
||||||
|
@ -168,7 +191,7 @@ impl LayoutManager {
|
||||||
let mut layout = self.layout.write().unwrap();
|
let mut layout = self.layout.write().unwrap();
|
||||||
if layout.update_trackers != *adv {
|
if layout.update_trackers != *adv {
|
||||||
if layout.update(|l| l.update_trackers.merge(adv)) {
|
if layout.update(|l| l.update_trackers.merge(adv)) {
|
||||||
layout.update_trackers_of(self.node_id);
|
layout.update_trackers(self.node_id);
|
||||||
return Some(layout.update_trackers.clone());
|
return Some(layout.update_trackers.clone());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -317,3 +340,42 @@ impl LayoutManager {
|
||||||
Ok(SystemRpc::Ok)
|
Ok(SystemRpc::Ok)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ---- ack lock ----
|
||||||
|
|
||||||
|
pub struct WriteLock<T> {
|
||||||
|
layout_version: u64,
|
||||||
|
layout_manager: Arc<LayoutManager>,
|
||||||
|
value: T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> WriteLock<T> {
|
||||||
|
fn new(version: u64, layout_manager: &Arc<LayoutManager>, value: T) -> Self {
|
||||||
|
Self {
|
||||||
|
layout_version: version,
|
||||||
|
layout_manager: layout_manager.clone(),
|
||||||
|
value,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> AsRef<T> for WriteLock<T> {
|
||||||
|
fn as_ref(&self) -> &T {
|
||||||
|
&self.value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T> Drop for WriteLock<T> {
|
||||||
|
fn drop(&mut self) {
|
||||||
|
let layout = self.layout_manager.layout(); // acquire read lock
|
||||||
|
if let Some(counter) = layout.ack_lock.get(&self.layout_version) {
|
||||||
|
let prev_lock = counter.fetch_sub(1, Ordering::Relaxed);
|
||||||
|
if prev_lock == 1 && layout.current().version > self.layout_version {
|
||||||
|
drop(layout); // release read lock, write lock will be acquired
|
||||||
|
self.layout_manager.ack_new_version();
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
error!("Could not find ack lock counter for layout version {}. This probably indicates a bug in Garage.", self.layout_version);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -11,6 +11,7 @@ pub mod manager;
|
||||||
// ---- re-exports ----
|
// ---- re-exports ----
|
||||||
|
|
||||||
pub use history::*;
|
pub use history::*;
|
||||||
|
pub use manager::WriteLock;
|
||||||
pub use schema::*;
|
pub use schema::*;
|
||||||
pub use version::*;
|
pub use version::*;
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,8 @@ pub struct TableFullReplication {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableReplication for TableFullReplication {
|
impl TableReplication for TableFullReplication {
|
||||||
|
type WriteSets = Vec<Vec<Uuid>>;
|
||||||
|
|
||||||
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
||||||
let layout = self.system.cluster_layout();
|
let layout = self.system.cluster_layout();
|
||||||
layout.current().all_nodes().to_vec()
|
layout.current().all_nodes().to_vec()
|
||||||
|
@ -39,7 +41,7 @@ impl TableReplication for TableFullReplication {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
|
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
|
||||||
vec![self.storage_nodes(hash)]
|
vec![self.storage_nodes(hash)]
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
|
|
|
@ -3,6 +3,8 @@ use garage_util::data::*;
|
||||||
|
|
||||||
/// Trait to describe how a table shall be replicated
|
/// Trait to describe how a table shall be replicated
|
||||||
pub trait TableReplication: Send + Sync + 'static {
|
pub trait TableReplication: Send + Sync + 'static {
|
||||||
|
type WriteSets: AsRef<Vec<Vec<Uuid>>> + Send + Sync + 'static;
|
||||||
|
|
||||||
// See examples in table_sharded.rs and table_fullcopy.rs
|
// See examples in table_sharded.rs and table_fullcopy.rs
|
||||||
// To understand various replication methods
|
// To understand various replication methods
|
||||||
|
|
||||||
|
@ -15,7 +17,7 @@ pub trait TableReplication: Send + Sync + 'static {
|
||||||
fn read_quorum(&self) -> usize;
|
fn read_quorum(&self) -> usize;
|
||||||
|
|
||||||
/// Which nodes to send writes to
|
/// Which nodes to send writes to
|
||||||
fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>>;
|
fn write_sets(&self, hash: &Hash) -> Self::WriteSets;
|
||||||
/// Responses needed to consider a write succesfull in each set
|
/// Responses needed to consider a write succesfull in each set
|
||||||
fn write_quorum(&self) -> usize;
|
fn write_quorum(&self) -> usize;
|
||||||
fn max_write_errors(&self) -> usize;
|
fn max_write_errors(&self) -> usize;
|
||||||
|
|
|
@ -25,6 +25,8 @@ pub struct TableShardedReplication {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableReplication for TableShardedReplication {
|
impl TableReplication for TableShardedReplication {
|
||||||
|
type WriteSets = WriteLock<Vec<Vec<Uuid>>>;
|
||||||
|
|
||||||
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
self.system.cluster_layout().storage_nodes_of(hash)
|
self.system.cluster_layout().storage_nodes_of(hash)
|
||||||
}
|
}
|
||||||
|
@ -36,8 +38,8 @@ impl TableReplication for TableShardedReplication {
|
||||||
self.read_quorum
|
self.read_quorum
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
|
fn write_sets(&self, hash: &Hash) -> Self::WriteSets {
|
||||||
self.system.cluster_layout().write_sets_of(hash)
|
self.system.layout_manager.write_sets_of(hash)
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
self.write_quorum
|
self.write_quorum
|
||||||
|
|
|
@ -173,12 +173,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
||||||
}
|
}
|
||||||
|
|
||||||
if !items.is_empty() {
|
if !items.is_empty() {
|
||||||
let nodes = self
|
let nodes = self.data.replication.storage_nodes(begin);
|
||||||
.data
|
|
||||||
.replication
|
|
||||||
.storage_nodes(begin)
|
|
||||||
.into_iter()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
if nodes.contains(&self.system.id) {
|
if nodes.contains(&self.system.id) {
|
||||||
warn!(
|
warn!(
|
||||||
"({}) Interrupting offload as partitions seem to have changed",
|
"({}) Interrupting offload as partitions seem to have changed",
|
||||||
|
@ -202,7 +197,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
||||||
end,
|
end,
|
||||||
counter
|
counter
|
||||||
);
|
);
|
||||||
self.offload_items(&items, &nodes[..]).await?;
|
self.offload_items(&items, &nodes).await?;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
|
@ -128,7 +128,7 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
.rpc_helper()
|
.rpc_helper()
|
||||||
.try_write_many_sets(
|
.try_write_many_sets(
|
||||||
&self.endpoint,
|
&self.endpoint,
|
||||||
&who,
|
who.as_ref(),
|
||||||
rpc,
|
rpc,
|
||||||
RequestStrategy::with_priority(PRIO_NORMAL)
|
RequestStrategy::with_priority(PRIO_NORMAL)
|
||||||
.with_quorum(self.data.replication.write_quorum()),
|
.with_quorum(self.data.replication.write_quorum()),
|
||||||
|
|
Loading…
Reference in a new issue