From 2a41b8238496dfeac5ee0f273445299cbd112ff6 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 12:18:03 +0100 Subject: [PATCH] Simpler Merkle & sync --- src/rpc/ring.rs | 21 ++++- src/table/merkle.rs | 29 +++--- src/table/replication/fullcopy.rs | 15 ++-- src/table/replication/parameters.rs | 10 +-- src/table/replication/sharded.rs | 22 ++--- src/table/sync.rs | 134 ++++++++-------------------- 6 files changed, 85 insertions(+), 146 deletions(-) diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index a0fdcf8..490fb1d 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -5,6 +5,11 @@ use serde::{Deserialize, Serialize}; use garage_util::data::*; +// A partition number is encoded on 16 bits, +// i.e. we have up to 2**16 partitions. +// (in practice we have exactly 2**PARTITION_BITS partitions) +pub type Partition = u16; + // TODO: make this constant parametrizable in the config file // For deployments with many nodes it might make sense to bump // it up to 10. @@ -17,6 +22,7 @@ const PARTITION_MASK_U16: u16 = ((1 << PARTITION_BITS) - 1) << (16 - PARTITION_B // (most deployments use a replication factor of 3, so...) pub const MAX_REPLICATION: usize = 3; + #[derive(Clone, Debug, Serialize, Deserialize)] pub struct NetworkConfig { pub members: HashMap, @@ -170,11 +176,24 @@ impl Ring { Self { config, ring } } - pub fn partition_of(&self, from: &Hash) -> u16 { + pub fn partition_of(&self, from: &Hash) -> Partition { let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); top >> (16 - PARTITION_BITS) } + pub fn partitions(&self) -> Vec<(Partition, Hash)> { + let mut ret = vec![]; + + for (i, entry) in self.ring.iter().enumerate() { + ret.push((i as u16, entry.location)); + } + if ret.len() > 0 { + assert_eq!(ret[0].1, [0u8; 32].into()); + } + + ret + } + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); diff --git a/src/table/merkle.rs b/src/table/merkle.rs index 86fef4c..db05cca 100644 --- a/src/table/merkle.rs +++ b/src/table/merkle.rs @@ -1,4 +1,3 @@ -use std::convert::TryInto; use std::sync::Arc; use std::time::Duration; @@ -15,22 +14,12 @@ use garage_util::background::BackgroundRunner; use garage_util::data::*; use garage_util::error::Error; +use garage_rpc::ring::*; + use crate::data::*; use crate::replication::*; use crate::schema::*; -pub type MerklePartition = [u8; 2]; - -pub fn hash_of_merkle_partition(p: MerklePartition) -> Hash { - let mut partition_pos = [0u8; 32]; - partition_pos[0..2].copy_from_slice(&p[..]); - partition_pos.into() -} - -pub fn hash_of_merkle_partition_opt(p: Option) -> Hash { - p.map(hash_of_merkle_partition) - .unwrap_or([0xFFu8; 32].into()) -} // This modules partitions the data in 2**16 partitions, based on the top // 16 bits (two bytes) of item's partition keys' hashes. @@ -57,8 +46,8 @@ pub struct MerkleUpdater { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct MerkleNodeKey { - // partition: first 16 bits (two bytes) of the partition_key's hash - pub partition: [u8; 2], + // partition number + pub partition: Partition, // prefix: a prefix for the hash of full keys, i.e. hash(hash(partition_key)+sort_key) #[serde(with = "serde_bytes")] @@ -143,7 +132,7 @@ where }; let key = MerkleNodeKey { - partition: k[0..2].try_into().unwrap(), + partition: self.data.replication.partition_of(&Hash::try_from(&k[0..32]).unwrap()), prefix: vec![], }; self.data @@ -325,7 +314,7 @@ where impl MerkleNodeKey { fn encode(&self) -> Vec { let mut ret = Vec::with_capacity(2 + self.prefix.len()); - ret.extend(&self.partition[..]); + ret.extend(&u16::to_be_bytes(self.partition)[..]); ret.extend(&self.prefix[..]); ret } @@ -443,3 +432,9 @@ fn test_intermediate_aux() { ] ); } + +impl MerkleNode { + pub fn is_empty(&self) -> bool { + *self == MerkleNode::Empty + } +} diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index aea8c1f..bd658f6 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_util::data::*; use crate::replication::*; @@ -19,10 +19,6 @@ impl TableReplication for TableFullReplication { // Advantage: do all reads locally, extremely fast // Inconvenient: only suitable to reasonably small tables - fn partition_of(&self, _hash: &Hash) -> u16 { - 0u16 - } - fn read_nodes(&self, _hash: &Hash) -> Vec { vec![self.system.id] } @@ -46,9 +42,10 @@ impl TableReplication for TableFullReplication { self.max_faults } - fn split_points(&self, _ring: &Ring) -> Vec { - let mut ret = vec![]; - ret.push([0u8; 32].into()); - ret + fn partition_of(&self, _hash: &Hash) -> Partition { + 0u16 + } + fn partitions(&self) -> Vec<(Partition, Hash)> { + vec![(0u16, [0u8; 32].into())] } } diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index ace82bd..e46bd17 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,4 +1,4 @@ -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_util::data::*; @@ -6,9 +6,6 @@ pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods - // Partition number of data item (for Merkle tree) - fn partition_of(&self, hash: &Hash) -> u16; - // Which nodes to send reads from fn read_nodes(&self, hash: &Hash) -> Vec; fn read_quorum(&self) -> usize; @@ -18,6 +15,7 @@ pub trait TableReplication: Send + Sync { fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; - // Get partition boundaries - fn split_points(&self, ring: &Ring) -> Vec; + // Accessing partitions, for Merkle tree & sync + fn partition_of(&self, hash: &Hash) -> Partition; + fn partitions(&self) -> Vec<(Partition, Hash)>; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 966be31..dce74b0 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,7 +1,7 @@ use std::sync::Arc; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_util::data::*; use crate::replication::*; @@ -22,10 +22,6 @@ impl TableReplication for TableShardedReplication { // - reads are done on all of the nodes that replicate the data // - writes as well - fn partition_of(&self, hash: &Hash) -> u16 { - self.system.ring.borrow().partition_of(hash) - } - fn read_nodes(&self, hash: &Hash) -> Vec { let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) @@ -45,16 +41,10 @@ impl TableReplication for TableShardedReplication { self.replication_factor - self.write_quorum } - fn split_points(&self, ring: &Ring) -> Vec { - let mut ret = vec![]; - - for entry in ring.ring.iter() { - ret.push(entry.location); - } - if ret.len() > 0 { - assert_eq!(ret[0], [0u8; 32].into()); - } - - ret + fn partition_of(&self, hash: &Hash) -> Partition { + self.system.ring.borrow().partition_of(hash) + } + fn partitions(&self) -> Vec<(Partition, Hash)> { + self.system.ring.borrow().partitions() } } diff --git a/src/table/sync.rs b/src/table/sync.rs index 9c14839..f5c2ef3 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -1,5 +1,4 @@ use std::collections::VecDeque; -use std::convert::TryInto; use std::sync::{Arc, Mutex}; use std::time::{Duration, Instant}; @@ -15,7 +14,7 @@ use garage_util::data::*; use garage_util::error::Error; use garage_rpc::membership::System; -use garage_rpc::ring::Ring; +use garage_rpc::ring::*; use garage_rpc::rpc_client::*; use garage_rpc::rpc_server::*; @@ -38,20 +37,10 @@ pub struct TableSyncer { rpc_client: Arc>, } -type RootCk = Vec<(MerklePartition, Hash)>; - -#[derive(Debug, Clone, Copy, Serialize, Deserialize)] -pub struct PartitionRange { - begin: MerklePartition, - // if end is None, go all the way to partition 0xFFFF included - end: Option, -} - #[derive(Serialize, Deserialize)] pub(crate) enum SyncRPC { - RootCkHash(PartitionRange, Hash), - RootCkList(PartitionRange, RootCk), - CkNoDifference, + RootCkHash(Partition, Hash), + RootCkDifferent(bool), GetNode(MerkleNodeKey), Node(MerkleNodeKey, MerkleNode), Items(Vec>), @@ -66,7 +55,9 @@ struct SyncTodo { #[derive(Debug, Clone)] struct TodoPartition { - range: PartitionRange, + partition: Partition, + begin: Hash, + end: Hash, // Are we a node that stores this partition or not? retain: bool, @@ -222,7 +213,7 @@ where let nodes = self .data .replication - .write_nodes(&hash_of_merkle_partition(partition.range.begin)) + .write_nodes(&partition.begin) .into_iter() .filter(|node| *node != my_id) .collect::>(); @@ -254,8 +245,8 @@ where } } else { self.offload_partition( - &hash_of_merkle_partition(partition.range.begin), - &hash_of_merkle_partition_opt(partition.range.end), + &partition.begin, + &partition.end, must_exit, ) .await?; @@ -364,30 +355,13 @@ where // side to the other will happen when the other side syncs with us, // which they also do regularly. - fn get_root_ck(&self, range: PartitionRange) -> Result { - let begin = u16::from_be_bytes(range.begin); - let range_iter = match range.end { - Some(end) => { - let end = u16::from_be_bytes(end); - begin..=(end - 1) - } - None => begin..=0xFFFF, + fn get_root_ck(&self, partition: Partition) -> Result<(MerkleNodeKey, MerkleNode), Error> { + let key = MerkleNodeKey { + partition, + prefix: vec![], }; - - let mut ret = vec![]; - for i in range_iter { - let key = MerkleNodeKey { - partition: u16::to_be_bytes(i), - prefix: vec![], - }; - match self.merkle.read_node(&key)? { - MerkleNode::Empty => (), - x => { - ret.push((key.partition, hash_of(&x)?)); - } - } - } - Ok(ret) + let node = self.merkle.read_node(&key)?; + Ok((key, node)) } async fn do_sync_with( @@ -396,7 +370,7 @@ where who: UUID, must_exit: watch::Receiver, ) -> Result<(), Error> { - let root_ck = self.get_root_ck(partition.range)?; + let (root_ck_key, root_ck) = self.get_root_ck(partition.partition)?; if root_ck.is_empty() { debug!( "({}) Sync {:?} with {:?}: partition is empty.", @@ -404,51 +378,29 @@ where ); return Ok(()); } + let root_ck_hash = hash_of::(&root_ck)?; - let root_ck_hash = hash_of(&root_ck)?; - - // If their root checksum has level > than us, use that as a reference + // Check if they have the same root checksum + // If so, do nothing. let root_resp = self .rpc_client .call( who, - SyncRPC::RootCkHash(partition.range, root_ck_hash), + SyncRPC::RootCkHash(partition.partition, root_ck_hash), TABLE_SYNC_RPC_TIMEOUT, ) .await?; let mut todo = match root_resp { - SyncRPC::CkNoDifference => { + SyncRPC::RootCkDifferent(false) => { debug!( "({}) Sync {:?} with {:?}: no difference", self.data.name, partition, who ); return Ok(()); } - SyncRPC::RootCkList(_, their_root_ck) => { - let join = join_ordered(&root_ck[..], &their_root_ck[..]); - let mut todo = VecDeque::new(); - for (p, v1, v2) in join.iter() { - let diff = match (v1, v2) { - (Some(_), None) | (None, Some(_)) => true, - (Some(a), Some(b)) => a != b, - _ => false, - }; - if diff { - todo.push_back(MerkleNodeKey { - partition: **p, - prefix: vec![], - }); - } - } - debug!( - "({}) Sync {:?} with {:?}: todo.len() = {}", - self.data.name, - partition, - who, - todo.len() - ); - todo + SyncRPC::RootCkDifferent(true) => { + VecDeque::from(vec![root_ck_key]) } x => { return Err(Error::Message(format!( @@ -565,13 +517,9 @@ where async fn handle_rpc(self: &Arc, message: &SyncRPC) -> Result { match message { SyncRPC::RootCkHash(range, h) => { - let root_ck = self.get_root_ck(*range)?; - let hash = hash_of(&root_ck)?; - if hash == *h { - Ok(SyncRPC::CkNoDifference) - } else { - Ok(SyncRPC::RootCkList(*range, root_ck)) - } + let (_root_ck_key, root_ck) = self.get_root_ck(*range)?; + let hash = hash_of::(&root_ck)?; + Ok(SyncRPC::RootCkDifferent(hash != *h)) } SyncRPC::GetNode(k) => { let node = self.merkle.read_node(&k)?; @@ -596,39 +544,31 @@ impl SyncTodo { self.todo.clear(); - let ring = system.ring.borrow().clone(); - let split_points = data.replication.split_points(&ring); + let partitions = data.replication.partitions(); - for i in 0..split_points.len() { - let begin: MerklePartition = { - let b = split_points[i]; - assert_eq!(b.as_slice()[2..], [0u8; 30][..]); - b.as_slice()[..2].try_into().unwrap() - }; + for i in 0..partitions.len() { + let begin = partitions[i].1; - let end: Option = if i + 1 < split_points.len() { - let e = split_points[i + 1]; - assert_eq!(e.as_slice()[2..], [0u8; 30][..]); - Some(e.as_slice()[..2].try_into().unwrap()) + let end = if i + 1 < partitions.len() { + partitions[i+1].1 } else { - None + [0xFFu8; 32].into() }; - let begin_hash = hash_of_merkle_partition(begin); - let end_hash = hash_of_merkle_partition_opt(end); - - let nodes = data.replication.write_nodes(&begin_hash); + let nodes = data.replication.write_nodes(&begin); let retain = nodes.contains(&my_id); if !retain { // Check if we have some data to send, otherwise skip - if data.store.range(begin_hash..end_hash).next().is_none() { + if data.store.range(begin..end).next().is_none() { continue; } } self.todo.push(TodoPartition { - range: PartitionRange { begin, end }, + partition: partitions[i].0, + begin, + end, retain, }); }