From 1d9961e4118af0e26068e1d6c5c6c009a1292a88 Mon Sep 17 00:00:00 2001 From: Alex Auvolat Date: Tue, 16 Mar 2021 11:14:27 +0100 Subject: [PATCH] Simplify replication logic --- src/model/block.rs | 14 ++++++------ src/model/garage.rs | 7 +++++- src/rpc/ring.rs | 8 ++++++- src/table/gc.rs | 2 +- src/table/replication/fullcopy.rs | 33 +++++++++++------------------ src/table/replication/parameters.rs | 13 ++++++------ src/table/replication/sharded.rs | 20 ++++++++++------- src/table/sync.rs | 11 ++++------ src/table/table.rs | 10 ++++----- 9 files changed, 60 insertions(+), 58 deletions(-) diff --git a/src/model/block.rs b/src/model/block.rs index a3958866..41729685 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -319,10 +319,8 @@ impl BlockManager { if exists && !needed { trace!("Offloading block {:?}", hash); - let ring = self.system.ring.borrow().clone(); - - let mut who = self.replication.replication_nodes(&hash, &ring); - if who.len() < self.replication.write_quorum(&self.system) { + let mut who = self.replication.write_nodes(&hash); + if who.len() < self.replication.write_quorum() { return Err(Error::Message(format!("Not trying to offload block because we don't have a quorum of nodes to write to"))); } who.retain(|id| *id != self.system.id); @@ -367,7 +365,7 @@ impl BlockManager { ) .await?; } - trace!( + info!( "Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), @@ -391,7 +389,7 @@ impl BlockManager { } pub async fn rpc_get_block(&self, hash: &Hash) -> Result, Error> { - let who = self.replication.read_nodes(&hash, &self.system); + let who = self.replication.read_nodes(&hash); let resps = self .rpc_client .try_call_many( @@ -415,12 +413,12 @@ impl BlockManager { } pub async fn rpc_put_block(&self, hash: Hash, data: Vec) -> Result<(), Error> { - let who = self.replication.write_nodes(&hash, &self.system); + let who = self.replication.write_nodes(&hash); self.rpc_client .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) + RequestStrategy::with_quorum(self.replication.write_quorum()) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; diff --git a/src/model/garage.rs b/src/model/garage.rs index ced3c29e..5f7a67c9 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -54,18 +54,23 @@ impl Garage { ); let data_rep_param = TableShardedReplication { + system: system.clone(), replication_factor: config.data_replication_factor, write_quorum: (config.data_replication_factor + 1) / 2, read_quorum: 1, }; let meta_rep_param = TableShardedReplication { + system: system.clone(), replication_factor: config.meta_replication_factor, write_quorum: (config.meta_replication_factor + 1) / 2, read_quorum: (config.meta_replication_factor + 1) / 2, }; - let control_rep_param = TableFullReplication::new(config.control_write_max_faults); + let control_rep_param = TableFullReplication { + system: system.clone(), + max_faults: config.control_write_max_faults, + }; info!("Initialize block manager..."); let block_manager = BlockManager::new( diff --git a/src/rpc/ring.rs b/src/rpc/ring.rs index a89b730c..a0fdcf84 100644 --- a/src/rpc/ring.rs +++ b/src/rpc/ring.rs @@ -170,6 +170,11 @@ impl Ring { Self { config, ring } } + pub fn partition_of(&self, from: &Hash) -> u16 { + let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); + top >> (16 - PARTITION_BITS) + } + pub fn walk_ring(&self, from: &Hash, n: usize) -> Vec { if self.ring.len() != 1 << PARTITION_BITS { warn!("Ring not yet ready, read/writes will be lost!"); @@ -177,8 +182,9 @@ impl Ring { } let top = u16::from_be_bytes(from.as_slice()[0..2].try_into().unwrap()); - let partition_idx = (top >> (16 - PARTITION_BITS)) as usize; + assert_eq!(partition_idx, self.partition_of(from) as usize); + let partition = &self.ring[partition_idx]; let partition_top = diff --git a/src/table/gc.rs b/src/table/gc.rs index d37fdf35..061c5045 100644 --- a/src/table/gc.rs +++ b/src/table/gc.rs @@ -130,7 +130,7 @@ where let mut partitions = HashMap::new(); for (k, vhash, v) in entries { let pkh = Hash::try_from(&k[..32]).unwrap(); - let mut nodes = self.aux.replication.write_nodes(&pkh, &self.aux.system); + let mut nodes = self.aux.replication.write_nodes(&pkh); nodes.retain(|x| *x != self.aux.system.id); nodes.sort(); diff --git a/src/table/replication/fullcopy.rs b/src/table/replication/fullcopy.rs index a5faece9..aea8c1f3 100644 --- a/src/table/replication/fullcopy.rs +++ b/src/table/replication/fullcopy.rs @@ -8,21 +8,10 @@ use crate::replication::*; #[derive(Clone)] pub struct TableFullReplication { + pub system: Arc, pub max_faults: usize, } -#[derive(Clone)] -struct Neighbors { - ring: Arc, - neighbors: Vec, -} - -impl TableFullReplication { - pub fn new(max_faults: usize) -> Self { - TableFullReplication { max_faults } - } -} - impl TableReplication for TableFullReplication { // Full replication schema: all nodes store everything // Writes are disseminated in an epidemic manner in the network @@ -30,18 +19,23 @@ impl TableReplication for TableFullReplication { // Advantage: do all reads locally, extremely fast // Inconvenient: only suitable to reasonably small tables - fn read_nodes(&self, _hash: &Hash, system: &System) -> Vec { - vec![system.id] + fn partition_of(&self, _hash: &Hash) -> u16 { + 0u16 + } + + fn read_nodes(&self, _hash: &Hash) -> Vec { + vec![self.system.id] } fn read_quorum(&self) -> usize { 1 } - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { - self.replication_nodes(hash, system.ring.borrow().as_ref()) + fn write_nodes(&self, _hash: &Hash) -> Vec { + let ring = self.system.ring.borrow(); + ring.config.members.keys().cloned().collect::>() } - fn write_quorum(&self, system: &System) -> usize { - let nmembers = system.ring.borrow().config.members.len(); + fn write_quorum(&self) -> usize { + let nmembers = self.system.ring.borrow().config.members.len(); if nmembers > self.max_faults { nmembers - self.max_faults } else { @@ -52,9 +46,6 @@ impl TableReplication for TableFullReplication { self.max_faults } - fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec { - ring.config.members.keys().cloned().collect::>() - } fn split_points(&self, _ring: &Ring) -> Vec { let mut ret = vec![]; ret.push([0u8; 32].into()); diff --git a/src/table/replication/parameters.rs b/src/table/replication/parameters.rs index 4607b050..ace82bd9 100644 --- a/src/table/replication/parameters.rs +++ b/src/table/replication/parameters.rs @@ -1,4 +1,3 @@ -use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; @@ -7,16 +6,18 @@ pub trait TableReplication: Send + Sync { // See examples in table_sharded.rs and table_fullcopy.rs // To understand various replication methods + // Partition number of data item (for Merkle tree) + fn partition_of(&self, hash: &Hash) -> u16; + // Which nodes to send reads from - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec; + fn read_nodes(&self, hash: &Hash) -> Vec; fn read_quorum(&self) -> usize; // Which nodes to send writes to - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self, system: &System) -> usize; + fn write_nodes(&self, hash: &Hash) -> Vec; + fn write_quorum(&self) -> usize; fn max_write_errors(&self) -> usize; - // Which are the nodes that do actually replicate the data - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; + // Get partition boundaries fn split_points(&self, ring: &Ring) -> Vec; } diff --git a/src/table/replication/sharded.rs b/src/table/replication/sharded.rs index 886c7c08..966be31a 100644 --- a/src/table/replication/sharded.rs +++ b/src/table/replication/sharded.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use garage_rpc::membership::System; use garage_rpc::ring::Ring; use garage_util::data::*; @@ -6,6 +8,7 @@ use crate::replication::*; #[derive(Clone)] pub struct TableShardedReplication { + pub system: Arc, pub replication_factor: usize, pub read_quorum: usize, pub write_quorum: usize, @@ -19,28 +22,29 @@ impl TableReplication for TableShardedReplication { // - reads are done on all of the nodes that replicate the data // - writes as well - fn read_nodes(&self, hash: &Hash, system: &System) -> Vec { - let ring = system.ring.borrow().clone(); + fn partition_of(&self, hash: &Hash) -> u16 { + self.system.ring.borrow().partition_of(hash) + } + + fn read_nodes(&self, hash: &Hash) -> Vec { + let ring = self.system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) } fn read_quorum(&self) -> usize { self.read_quorum } - fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { - let ring = system.ring.borrow().clone(); + fn write_nodes(&self, hash: &Hash) -> Vec { + let ring = self.system.ring.borrow(); ring.walk_ring(&hash, self.replication_factor) } - fn write_quorum(&self, _system: &System) -> usize { + fn write_quorum(&self) -> usize { self.write_quorum } fn max_write_errors(&self) -> usize { self.replication_factor - self.write_quorum } - fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec { - ring.walk_ring(&hash, self.replication_factor) - } fn split_points(&self, ring: &Ring) -> Vec { let mut ret = vec![]; diff --git a/src/table/sync.rs b/src/table/sync.rs index f8fef53c..ac0305e2 100644 --- a/src/table/sync.rs +++ b/src/table/sync.rs @@ -218,10 +218,7 @@ where let nodes = self .aux .replication - .write_nodes( - &hash_of_merkle_partition(partition.range.begin), - &self.aux.system, - ) + .write_nodes(&hash_of_merkle_partition(partition.range.begin)) .into_iter() .filter(|node| *node != my_id) .collect::>(); @@ -293,7 +290,7 @@ where let nodes = self .aux .replication - .write_nodes(&begin, &self.aux.system) + .write_nodes(&begin) .into_iter() .collect::>(); if nodes.contains(&self.aux.system.id) { @@ -303,7 +300,7 @@ where ); break; } - if nodes.len() < self.aux.replication.write_quorum(&self.aux.system) { + if nodes.len() < self.aux.replication.write_quorum() { return Err(Error::Message(format!( "Not offloading as we don't have a quorum of nodes to write to." ))); @@ -616,7 +613,7 @@ impl SyncTodo { let begin_hash = hash_of_merkle_partition(begin); let end_hash = hash_of_merkle_partition_opt(end); - let nodes = aux.replication.replication_nodes(&begin_hash, &ring); + let nodes = aux.replication.write_nodes(&begin_hash); let retain = nodes.contains(&my_id); if !retain { diff --git a/src/table/table.rs b/src/table/table.rs index 2d3c5fe9..2ce5868f 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -91,7 +91,7 @@ where pub async fn insert(&self, e: &F::E) -> Result<(), Error> { let hash = e.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash, &self.aux.system); + let who = self.aux.replication.write_nodes(&hash); //eprintln!("insert who: {:?}", who); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(e)?)); @@ -101,7 +101,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.aux.replication.write_quorum(&self.aux.system)) + RequestStrategy::with_quorum(self.aux.replication.write_quorum()) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -113,7 +113,7 @@ where for entry in entries.iter() { let hash = entry.partition_key().hash(); - let who = self.aux.replication.write_nodes(&hash, &self.aux.system); + let who = self.aux.replication.write_nodes(&hash); let e_enc = Arc::new(ByteBuf::from(rmp_to_vec_all_named(entry)?)); for node in who { if !call_list.contains_key(&node) { @@ -150,7 +150,7 @@ where sort_key: &F::S, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash, &self.aux.system); + let who = self.aux.replication.read_nodes(&hash); //eprintln!("get who: {:?}", who); let rpc = TableRPC::::ReadEntry(partition_key.clone(), sort_key.clone()); @@ -207,7 +207,7 @@ where limit: usize, ) -> Result, Error> { let hash = partition_key.hash(); - let who = self.aux.replication.read_nodes(&hash, &self.aux.system); + let who = self.aux.replication.read_nodes(&hash); let rpc = TableRPC::::ReadRange(partition_key.clone(), begin_sort_key, filter, limit);