diff --git a/src/model/block.rs b/src/model/block.rs index 056d9098..56c85c6a 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -334,17 +334,28 @@ impl BlockManager { } if need_nodes.len() > 0 { - trace!("Block {:?} needed by {} nodes, sending", hash, need_nodes.len()); + trace!( + "Block {:?} needed by {} nodes, sending", + hash, + need_nodes.len() + ); let put_block_message = Arc::new(self.read_block(hash).await?); let put_resps = join_all(need_nodes.iter().map(|to| { - self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) - })).await; + self.rpc_client + .call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT) + })) + .await; for resp in put_resps { resp?; } } - trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len()); + trace!( + "Deleting block {:?}, offload finished ({} / {})", + hash, + need_nodes.len(), + who.len() + ); fs::remove_file(path).await?; self.resync_queue.remove(&hash)?; @@ -391,7 +402,7 @@ impl BlockManager { .try_call_many( &who[..], Message::PutBlock(PutBlockMessage { hash, data }), - RequestStrategy::with_quorum(self.replication.write_quorum()) + RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) .with_timeout(BLOCK_RW_TIMEOUT), ) .await?; @@ -420,12 +431,17 @@ impl BlockManager { } // 2. Repair blocks actually on disk - self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?; + self.repair_aux_read_dir_rec(&self.data_dir, must_exit) + .await?; Ok(()) } - fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver) -> BoxFuture<'a, Result<(), Error>> { + fn repair_aux_read_dir_rec<'a>( + &'a self, + path: &'a PathBuf, + must_exit: &'a watch::Receiver, + ) -> BoxFuture<'a, Result<(), Error>> { // Lists all blocks on disk and adds them to the resync queue. // This allows us to find blocks we are storing but don't actually need, // so that we can offload them if necessary and then delete them locally. @@ -441,7 +457,8 @@ impl BlockManager { let ent_type = data_dir_ent.file_type().await?; if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() { - self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?; + self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit) + .await?; } else if name.len() == 64 { let hash_bytes = match hex::decode(&name) { Ok(h) => h, @@ -457,7 +474,8 @@ impl BlockManager { } } Ok(()) - }.boxed() + } + .boxed() } } diff --git a/src/model/garage.rs b/src/model/garage.rs index 46e0d02f..467d0aec 100644 --- a/src/model/garage.rs +++ b/src/model/garage.rs @@ -65,10 +65,7 @@ impl Garage { read_quorum: (config.meta_replication_factor + 1) / 2, }; - let control_rep_param = TableFullReplication::new( - config.meta_epidemic_fanout, - (config.meta_epidemic_fanout + 1) / 2, - ); + let control_rep_param = TableFullReplication::new(config.control_write_max_faults); info!("Initialize block manager..."); let block_manager = BlockManager::new( diff --git a/src/table/table.rs b/src/table/table.rs index 8b16173e..1f6b7d25 100644 --- a/src/table/table.rs +++ b/src/table/table.rs @@ -61,9 +61,8 @@ pub trait TableReplication: Send + Sync { // Which nodes to send writes to fn write_nodes(&self, hash: &Hash, system: &System) -> Vec; - fn write_quorum(&self) -> usize; + fn write_quorum(&self, system: &System) -> usize; fn max_write_errors(&self) -> usize; - fn epidemic_writes(&self) -> bool; // Which are the nodes that do actually replicate the data fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec; @@ -119,7 +118,7 @@ where .try_call_many( &who[..], rpc, - RequestStrategy::with_quorum(self.replication.write_quorum()) + RequestStrategy::with_quorum(self.replication.write_quorum(&self.system)) .with_timeout(TABLE_RPC_TIMEOUT), ) .await?; @@ -382,7 +381,6 @@ where pub async fn handle_update(self: &Arc, entries: &[Arc]) -> Result<(), Error> { let syncer = self.syncer.load_full().unwrap(); - let mut epidemic_propagate = vec![]; for update_bytes in entries.iter() { let update = self.decode_entry(update_bytes.as_slice())?; @@ -410,22 +408,11 @@ where })?; if old_entry.as_ref() != Some(&new_entry) { - if self.replication.epidemic_writes() { - epidemic_propagate.push(new_entry.clone()); - } - self.instance.updated(old_entry, Some(new_entry)); syncer.invalidate(&tree_key[..]); } } - if epidemic_propagate.len() > 0 { - let self2 = self.clone(); - self.system - .background - .spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await }); - } - Ok(()) } diff --git a/src/table/table_fullcopy.rs b/src/table/table_fullcopy.rs index 5dd0ebb1..c55879d8 100644 --- a/src/table/table_fullcopy.rs +++ b/src/table/table_fullcopy.rs @@ -1,4 +1,3 @@ -use arc_swap::ArcSwapOption; use std::sync::Arc; use garage_rpc::membership::System; @@ -9,10 +8,7 @@ use crate::*; #[derive(Clone)] pub struct TableFullReplication { - pub write_factor: usize, - pub write_quorum: usize, - - neighbors: ArcSwapOption, + pub max_faults: usize, } #[derive(Clone)] @@ -22,45 +18,8 @@ struct Neighbors { } impl TableFullReplication { - pub fn new(write_factor: usize, write_quorum: usize) -> Self { - TableFullReplication { - write_factor, - write_quorum, - neighbors: ArcSwapOption::from(None), - } - } - - fn get_neighbors(&self, system: &System) -> Vec { - let neighbors = self.neighbors.load_full(); - if let Some(n) = neighbors { - if Arc::ptr_eq(&n.ring, &system.ring.borrow()) { - return n.neighbors.clone(); - } - } - - // Recalculate neighbors - let ring = system.ring.borrow().clone(); - let my_id = system.id; - - let mut nodes = vec![]; - for (node, _) in ring.config.members.iter() { - let node_ranking = fasthash(&[node.as_slice(), my_id.as_slice()].concat()); - nodes.push((*node, node_ranking)); - } - nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2)); - let mut neighbors = nodes - .drain(..) - .map(|(node, _)| node) - .filter(|node| *node != my_id) - .take(self.write_factor) - .collect::>(); - - neighbors.push(my_id); - self.neighbors.swap(Some(Arc::new(Neighbors { - ring, - neighbors: neighbors.clone(), - }))); - neighbors + pub fn new(max_faults: usize) -> Self { + TableFullReplication { max_faults } } } @@ -78,17 +37,14 @@ impl TableReplication for TableFullReplication { 1 } - fn write_nodes(&self, _hash: &Hash, system: &System) -> Vec { - self.get_neighbors(system) + fn write_nodes(&self, hash: &Hash, system: &System) -> Vec { + self.replication_nodes(hash, system.ring.borrow().as_ref()) } - fn write_quorum(&self) -> usize { - self.write_quorum + fn write_quorum(&self, system: &System) -> usize { + system.ring.borrow().config.members.len() - self.max_faults } fn max_write_errors(&self) -> usize { - self.write_factor - self.write_quorum - } - fn epidemic_writes(&self) -> bool { - true + self.max_faults } fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec { diff --git a/src/table/table_sharded.rs b/src/table/table_sharded.rs index cbb1bc01..47bdfeaf 100644 --- a/src/table/table_sharded.rs +++ b/src/table/table_sharded.rs @@ -31,15 +31,12 @@ impl TableReplication for TableShardedReplication { let ring = system.ring.borrow().clone(); ring.walk_ring(&hash, self.replication_factor) } - fn write_quorum(&self) -> usize { + fn write_quorum(&self, _system: &System) -> usize { self.write_quorum } fn max_write_errors(&self) -> usize { self.replication_factor - self.write_quorum } - fn epidemic_writes(&self) -> bool { - false - } fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec { ring.walk_ring(&hash, self.replication_factor) diff --git a/src/table/table_sync.rs b/src/table/table_sync.rs index 2c984226..5fa6793b 100644 --- a/src/table/table_sync.rs +++ b/src/table/table_sync.rs @@ -319,7 +319,13 @@ where } counter += 1; - debug!("Offloading {} items from {:?}..{:?} ({})", items.len(), begin, end, counter); + debug!( + "Offloading {} items from {:?}..{:?} ({})", + items.len(), + begin, + end, + counter + ); self.offload_items(&items, &nodes[..]).await?; } else { break; @@ -408,7 +414,11 @@ where .iter() .all(|x| *x == 0u8) { - trace!("range_checksum {:?} returning {} items", range, children.len()); + trace!( + "range_checksum {:?} returning {} items", + range, + children.len() + ); return Ok(RangeChecksum { bounds: range.clone(), children, @@ -423,7 +433,11 @@ where }; children.push((item_range, blake2sum(&value[..]))); } - trace!("range_checksum {:?} returning {} items", range, children.len()); + trace!( + "range_checksum {:?} returning {} items", + range, + children.len() + ); Ok(RangeChecksum { bounds: range.clone(), children, @@ -449,7 +463,11 @@ where } if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() { - trace!("range_checksum {:?} returning {} items", range, children.len()); + trace!( + "range_checksum {:?} returning {} items", + range, + children.len() + ); return Ok(RangeChecksum { bounds: range.clone(), children, @@ -464,7 +482,11 @@ where .iter() .all(|x| *x == 0u8) { - trace!("range_checksum {:?} returning {} items", range, children.len()); + trace!( + "range_checksum {:?} returning {} items", + range, + children.len() + ); return Ok(RangeChecksum { bounds: range.clone(), children, diff --git a/src/util/config.rs b/src/util/config.rs index f4c841b7..cd65e009 100644 --- a/src/util/config.rs +++ b/src/util/config.rs @@ -23,12 +23,12 @@ pub struct Config { #[serde(default = "default_block_size")] pub block_size: usize, + #[serde(default = "default_control_write_max_faults")] + pub control_write_max_faults: usize, + #[serde(default = "default_replication_factor")] pub meta_replication_factor: usize, - #[serde(default = "default_epidemic_fanout")] - pub meta_epidemic_fanout: usize, - #[serde(default = "default_replication_factor")] pub data_replication_factor: usize, @@ -68,8 +68,8 @@ fn default_block_size() -> usize { fn default_replication_factor() -> usize { 3 } -fn default_epidemic_fanout() -> usize { - 3 +fn default_control_write_max_faults() -> usize { + 1 } pub fn read_config(config_file: PathBuf) -> Result {