From 3b361d2959e3d577bdae6f8a5ccb0c9d5526b7ea Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 14 Nov 2023 14:28:16 +0100 Subject: [PATCH] layout: prepare for write sets --- src/block/manager.rs | 3 ++- src/block/resync.rs | 2 +- src/model/k2v/rpc.rs | 10 ++++++---- src/rpc/layout/history.rs | 19 ++++++++++++++++--- src/rpc/layout/version.rs | 21 ++++++++++----------- src/rpc/system.rs | 3 +-- src/table/data.rs | 3 ++- src/table/gc.rs | 2 +- src/table/replication/fullcopy.rs | 9 +++++++-- src/table/replication/parameters.rs | 8 +++++--- src/table/replication/sharded.rs | 24 ++++++++---------------- src/table/sync.rs | 2 +- src/table/table.rs | 6 ++++-- 13 files changed, 64 insertions(+), 48 deletions(-) diff --git a/src/block/manager.rs b/src/block/manager.rs index 72b4ea66..2bb9c23d 100644 --- a/src/block/manager.rs +++ b/src/block/manager.rs @@ -354,7 +354,8 @@ impl BlockManager { /// Send block to nodes that should have it pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash); + // TODO: use quorums among latest write set + let who = self.replication.storage_nodes(&hash); let (header, bytes) = DataBlock::from_buffer(data, self.compression_level) .await diff --git a/src/block/resync.rs b/src/block/resync.rs index fedcd6f5..122d0142 100644 --- a/src/block/resync.rs +++ b/src/block/resync.rs @@ -377,7 +377,7 @@ impl BlockResyncManager { info!("Resync block {:?}: offloading and deleting", hash); let existing_path = existing_path.unwrap(); - let mut who = manager.replication.write_nodes(hash); + let mut who = manager.replication.storage_nodes(hash); if who.len() < manager.replication.write_quorum() { return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string())); } diff --git a/src/model/k2v/rpc.rs b/src/model/k2v/rpc.rs index 2f548ad7..aa3323d5 100644 --- a/src/model/k2v/rpc.rs +++ b/src/model/k2v/rpc.rs @@ -127,7 +127,7 @@ impl K2VRpcHandler { .item_table .data .replication - .write_nodes(&partition.hash()); + .storage_nodes(&partition.hash()); who.sort(); self.system @@ -168,7 +168,7 @@ impl K2VRpcHandler { .item_table .data .replication - .write_nodes(&partition.hash()); + .storage_nodes(&partition.hash()); who.sort(); call_list.entry(who).or_default().push(InsertedItem { @@ -223,11 +223,12 @@ impl K2VRpcHandler { }, sort_key, }; + // TODO figure this out with write sets, does it still work???? let nodes = self .item_table .data .replication - .write_nodes(&poll_key.partition.hash()); + .read_nodes(&poll_key.partition.hash()); let rpc = self.system.rpc_helper().try_call_many( &self.endpoint, @@ -284,11 +285,12 @@ impl K2VRpcHandler { seen.restrict(&range); // Prepare PollRange RPC to send to the storage nodes responsible for the parititon + // TODO figure this out with write sets, does it still work???? let nodes = self .item_table .data .replication - .write_nodes(&range.partition.hash()); + .read_nodes(&range.partition.hash()); let quorum = self.item_table.data.replication.read_quorum(); let msg = K2VRpc::PollRange { range, diff --git a/src/rpc/layout/history.rs b/src/rpc/layout/history.rs index 69348873..dce492c9 100644 --- a/src/rpc/layout/history.rs +++ b/src/rpc/layout/history.rs @@ -98,13 +98,26 @@ impl LayoutHistory { .find(|x| x.version == sync_min) .or(self.versions.last()) .unwrap(); - version.nodes_of(position, version.replication_factor) + version + .nodes_of(position, version.replication_factor) + .collect() } - pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator> + 'a { + pub fn write_sets_of(&self, position: &Hash) -> Vec> { self.versions .iter() - .map(move |x| x.nodes_of(position, x.replication_factor)) + .map(|x| x.nodes_of(position, x.replication_factor).collect()) + .collect() + } + + pub fn storage_nodes_of(&self, position: &Hash) -> Vec { + let mut ret = vec![]; + for version in self.versions.iter() { + ret.extend(version.nodes_of(position, version.replication_factor)); + } + ret.sort(); + ret.dedup(); + ret } // ------------------ update tracking --------------- diff --git a/src/rpc/layout/version.rs b/src/rpc/layout/version.rs index 2cbdcee2..912ee538 100644 --- a/src/rpc/layout/version.rs +++ b/src/rpc/layout/version.rs @@ -107,25 +107,24 @@ impl LayoutVersion { } /// Return the n servers in which data for this hash should be replicated - pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec { + pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator + '_ { assert_eq!(n, self.replication_factor); let data = &self.ring_assignment_data; - if data.len() != self.replication_factor * (1 << PARTITION_BITS) { + let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) { + let partition_idx = self.partition_of(position) as usize; + let partition_start = partition_idx * self.replication_factor; + let partition_end = (partition_idx + 1) * self.replication_factor; + &data[partition_start..partition_end] + } else { warn!("Ring not yet ready, read/writes will be lost!"); - return vec![]; - } - - let partition_idx = self.partition_of(position) as usize; - let partition_start = partition_idx * self.replication_factor; - let partition_end = (partition_idx + 1) * self.replication_factor; - let partition_nodes = &data[partition_start..partition_end]; + &[] + }; partition_nodes .iter() - .map(|i| self.node_id_vec[*i as usize]) - .collect::>() + .map(move |i| self.node_id_vec[*i as usize]) } // ===================== internal information extractors ====================== diff --git a/src/rpc/system.rs b/src/rpc/system.rs index 86c02e86..31d78bf6 100644 --- a/src/rpc/system.rs +++ b/src/rpc/system.rs @@ -449,8 +449,7 @@ impl System { .iter() .map(|(_, h)| { let pn = layout.current().nodes_of(h, replication_factor); - pn.iter() - .filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) + pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false)) .count() }) .collect::>(); diff --git a/src/table/data.rs b/src/table/data.rs index bbfdf58b..7f6b7847 100644 --- a/src/table/data.rs +++ b/src/table/data.rs @@ -254,7 +254,8 @@ impl TableData { // of the GC algorithm, as in all cases GC is suspended if // any node of the partition is unavailable. let pk_hash = Hash::try_from(&tree_key[..32]).unwrap(); - let nodes = self.replication.write_nodes(&pk_hash); + // TODO: this probably breaks when the layout changes + let nodes = self.replication.storage_nodes(&pk_hash); if nodes.first() == Some(&self.system.id) { GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?; } diff --git a/src/table/gc.rs b/src/table/gc.rs index 2135a358..002cfbf4 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -152,7 +152,7 @@ impl TableGc { let mut partitions = HashMap::new(); for entry in entries { let pkh = Hash::try_from(&entry.key[..32]).unwrap(); - let mut nodes = self.data.replication.write_nodes(&pkh); + let mut nodes = self.data.replication.storage_nodes(&pkh); nodes.retain(|x| *x != self.system.id); nodes.sort(); diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index beaacc2b..cb5471af 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -27,6 +27,11 @@ pub struct TableFullReplication { } impl TableReplication for TableFullReplication { + fn storage_nodes(&self, _hash: &Hash) -> Vec { + let layout = self.system.cluster_layout(); + layout.current().all_nodes().to_vec() + } + fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } @@ -34,8 +39,8 @@ impl TableReplication for TableFullReplication { 1 } - fn write_nodes(&self, _hash: &Hash) -> Vec { - self.system.cluster_layout().current().all_nodes().to_vec() + fn write_sets(&self, hash: &Hash) -> Vec> { + vec![self.storage_nodes(hash)] } fn write_quorum(&self) -> usize { let nmembers = self.system.cluster_layout().current().all_nodes().len(); diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 2a7d3585..2f842409 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -6,21 +6,23 @@ pub trait TableReplication: Send + Sync + 'static { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods + /// The entire list of all nodes that store a partition + fn storage_nodes(&self, hash: &Hash) -> Vec; + /// Which nodes to send read requests to fn read_nodes(&self, hash: &Hash) -> Vec; /// Responses needed to consider a read succesfull fn read_quorum(&self) -> usize; /// Which nodes to send writes to - fn write_nodes(&self, hash: &Hash) -> Vec; - /// Responses needed to consider a write succesfull + fn write_sets(&self, hash: &Hash) -> Vec>; + /// Responses needed to consider a write succesfull in each set fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; // Accessing partitions, for Merkle tree & sync /// Get partition for data with given hash fn partition_of(&self, hash: &Hash) -> Partition; - /// List of partitions and nodes to sync with in current layout fn sync_partitions(&self) -> SyncPartitions; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index f02b1d66..1320a189 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -25,21 +25,19 @@ pub struct TableShardedReplication { } impl TableReplication for TableShardedReplication { + fn storage_nodes(&self, hash: &Hash) -> Vec { + self.system.cluster_layout().storage_nodes_of(hash) + } + fn read_nodes(&self, hash: &Hash) -> Vec { - self.system - .cluster_layout() - .current() - .nodes_of(hash, self.replication_factor) + self.system.cluster_layout().read_nodes_of(hash) } fn read_quorum(&self) -> usize { self.read_quorum } - fn write_nodes(&self, hash: &Hash) -> Vec { - self.system - .cluster_layout() - .current() - .nodes_of(hash, self.replication_factor) + fn write_sets(&self, hash: &Hash) -> Vec> { + self.system.cluster_layout().write_sets_of(hash) } fn write_quorum(&self) -> usize { self.write_quorum @@ -60,13 +58,7 @@ impl TableReplication for TableShardedReplication { .current() .partitions() .map(|(partition, first_hash)| { - let mut storage_nodes = layout - .write_sets_of(&first_hash) - .map(|x| x.into_iter()) - .flatten() - .collect::>(); - storage_nodes.sort(); - storage_nodes.dedup(); + let storage_nodes = layout.storage_nodes_of(&first_hash); SyncPartition { partition, first_hash, diff --git a/src/table/sync.rs b/src/table/sync.rs index 8c21db8b..b67cdd79 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -176,7 +176,7 @@ impl TableSyncer { let nodes = self .data .replication - .write_nodes(begin) + .storage_nodes(begin) .into_iter() .collect::>(); if nodes.contains(&self.system.id) { diff --git a/src/table/table.rs b/src/table/table.rs index 997fd7dc..bf08d5a0 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -119,7 +119,8 @@ impl Table { async fn insert_internal(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.data.replication.write_nodes(&hash); + // TODO: use write sets + let who = self.data.replication.storage_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(e.encode()?)); let rpc = TableRpc::::Update(vec![e_enc]); @@ -171,7 +172,8 @@ impl Table { for entry in entries.into_iter() { let entry = entry.borrow(); let hash = entry.partition_key().hash(); - let who = self.data.replication.write_nodes(&hash); + // TODO: use write sets + let who = self.data.replication.storage_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(entry.encode()?)); for node in who { call_list.entry(node).or_default().push(e_enc.clone());