forked from Deuxfleurs/garage
Remove epidemic propagation for fully replicated stuff: write directly to all nodes
This commit is contained in:
parent
d7e148d302
commit
3882d5ba36
7 changed files with 71 additions and 94 deletions
|
@ -334,17 +334,28 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
if need_nodes.len() > 0 {
|
if need_nodes.len() > 0 {
|
||||||
trace!("Block {:?} needed by {} nodes, sending", hash, need_nodes.len());
|
trace!(
|
||||||
|
"Block {:?} needed by {} nodes, sending",
|
||||||
|
hash,
|
||||||
|
need_nodes.len()
|
||||||
|
);
|
||||||
|
|
||||||
let put_block_message = Arc::new(self.read_block(hash).await?);
|
let put_block_message = Arc::new(self.read_block(hash).await?);
|
||||||
let put_resps = join_all(need_nodes.iter().map(|to| {
|
let put_resps = join_all(need_nodes.iter().map(|to| {
|
||||||
self.rpc_client.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
|
self.rpc_client
|
||||||
})).await;
|
.call_arc(*to, put_block_message.clone(), BLOCK_RW_TIMEOUT)
|
||||||
|
}))
|
||||||
|
.await;
|
||||||
for resp in put_resps {
|
for resp in put_resps {
|
||||||
resp?;
|
resp?;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
trace!("Deleting block {:?}, offload finished ({} / {})", hash, need_nodes.len(), who.len());
|
trace!(
|
||||||
|
"Deleting block {:?}, offload finished ({} / {})",
|
||||||
|
hash,
|
||||||
|
need_nodes.len(),
|
||||||
|
who.len()
|
||||||
|
);
|
||||||
|
|
||||||
fs::remove_file(path).await?;
|
fs::remove_file(path).await?;
|
||||||
self.resync_queue.remove(&hash)?;
|
self.resync_queue.remove(&hash)?;
|
||||||
|
@ -391,7 +402,7 @@ impl BlockManager {
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&who[..],
|
&who[..],
|
||||||
Message::PutBlock(PutBlockMessage { hash, data }),
|
Message::PutBlock(PutBlockMessage { hash, data }),
|
||||||
RequestStrategy::with_quorum(self.replication.write_quorum())
|
RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
|
||||||
.with_timeout(BLOCK_RW_TIMEOUT),
|
.with_timeout(BLOCK_RW_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -420,12 +431,17 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
// 2. Repair blocks actually on disk
|
// 2. Repair blocks actually on disk
|
||||||
self.repair_aux_read_dir_rec(&self.data_dir, must_exit).await?;
|
self.repair_aux_read_dir_rec(&self.data_dir, must_exit)
|
||||||
|
.await?;
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> {
|
fn repair_aux_read_dir_rec<'a>(
|
||||||
|
&'a self,
|
||||||
|
path: &'a PathBuf,
|
||||||
|
must_exit: &'a watch::Receiver<bool>,
|
||||||
|
) -> BoxFuture<'a, Result<(), Error>> {
|
||||||
// Lists all blocks on disk and adds them to the resync queue.
|
// Lists all blocks on disk and adds them to the resync queue.
|
||||||
// This allows us to find blocks we are storing but don't actually need,
|
// This allows us to find blocks we are storing but don't actually need,
|
||||||
// so that we can offload them if necessary and then delete them locally.
|
// so that we can offload them if necessary and then delete them locally.
|
||||||
|
@ -441,7 +457,8 @@ impl BlockManager {
|
||||||
let ent_type = data_dir_ent.file_type().await?;
|
let ent_type = data_dir_ent.file_type().await?;
|
||||||
|
|
||||||
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
if name.len() == 2 && hex::decode(&name).is_ok() && ent_type.is_dir() {
|
||||||
self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit).await?;
|
self.repair_aux_read_dir_rec(&data_dir_ent.path(), must_exit)
|
||||||
|
.await?;
|
||||||
} else if name.len() == 64 {
|
} else if name.len() == 64 {
|
||||||
let hash_bytes = match hex::decode(&name) {
|
let hash_bytes = match hex::decode(&name) {
|
||||||
Ok(h) => h,
|
Ok(h) => h,
|
||||||
|
@ -457,7 +474,8 @@ impl BlockManager {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}.boxed()
|
}
|
||||||
|
.boxed()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -65,10 +65,7 @@ impl Garage {
|
||||||
read_quorum: (config.meta_replication_factor + 1) / 2,
|
read_quorum: (config.meta_replication_factor + 1) / 2,
|
||||||
};
|
};
|
||||||
|
|
||||||
let control_rep_param = TableFullReplication::new(
|
let control_rep_param = TableFullReplication::new(config.control_write_max_faults);
|
||||||
config.meta_epidemic_fanout,
|
|
||||||
(config.meta_epidemic_fanout + 1) / 2,
|
|
||||||
);
|
|
||||||
|
|
||||||
info!("Initialize block manager...");
|
info!("Initialize block manager...");
|
||||||
let block_manager = BlockManager::new(
|
let block_manager = BlockManager::new(
|
||||||
|
|
|
@ -61,9 +61,8 @@ pub trait TableReplication: Send + Sync {
|
||||||
|
|
||||||
// Which nodes to send writes to
|
// Which nodes to send writes to
|
||||||
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
|
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID>;
|
||||||
fn write_quorum(&self) -> usize;
|
fn write_quorum(&self, system: &System) -> usize;
|
||||||
fn max_write_errors(&self) -> usize;
|
fn max_write_errors(&self) -> usize;
|
||||||
fn epidemic_writes(&self) -> bool;
|
|
||||||
|
|
||||||
// Which are the nodes that do actually replicate the data
|
// Which are the nodes that do actually replicate the data
|
||||||
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
|
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID>;
|
||||||
|
@ -119,7 +118,7 @@ where
|
||||||
.try_call_many(
|
.try_call_many(
|
||||||
&who[..],
|
&who[..],
|
||||||
rpc,
|
rpc,
|
||||||
RequestStrategy::with_quorum(self.replication.write_quorum())
|
RequestStrategy::with_quorum(self.replication.write_quorum(&self.system))
|
||||||
.with_timeout(TABLE_RPC_TIMEOUT),
|
.with_timeout(TABLE_RPC_TIMEOUT),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
@ -382,7 +381,6 @@ where
|
||||||
|
|
||||||
pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
|
pub async fn handle_update(self: &Arc<Self>, entries: &[Arc<ByteBuf>]) -> Result<(), Error> {
|
||||||
let syncer = self.syncer.load_full().unwrap();
|
let syncer = self.syncer.load_full().unwrap();
|
||||||
let mut epidemic_propagate = vec![];
|
|
||||||
|
|
||||||
for update_bytes in entries.iter() {
|
for update_bytes in entries.iter() {
|
||||||
let update = self.decode_entry(update_bytes.as_slice())?;
|
let update = self.decode_entry(update_bytes.as_slice())?;
|
||||||
|
@ -410,22 +408,11 @@ where
|
||||||
})?;
|
})?;
|
||||||
|
|
||||||
if old_entry.as_ref() != Some(&new_entry) {
|
if old_entry.as_ref() != Some(&new_entry) {
|
||||||
if self.replication.epidemic_writes() {
|
|
||||||
epidemic_propagate.push(new_entry.clone());
|
|
||||||
}
|
|
||||||
|
|
||||||
self.instance.updated(old_entry, Some(new_entry));
|
self.instance.updated(old_entry, Some(new_entry));
|
||||||
syncer.invalidate(&tree_key[..]);
|
syncer.invalidate(&tree_key[..]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if epidemic_propagate.len() > 0 {
|
|
||||||
let self2 = self.clone();
|
|
||||||
self.system
|
|
||||||
.background
|
|
||||||
.spawn_cancellable(async move { self2.insert_many(&epidemic_propagate[..]).await });
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,3 @@
|
||||||
use arc_swap::ArcSwapOption;
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
|
|
||||||
use garage_rpc::membership::System;
|
use garage_rpc::membership::System;
|
||||||
|
@ -9,10 +8,7 @@ use crate::*;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct TableFullReplication {
|
pub struct TableFullReplication {
|
||||||
pub write_factor: usize,
|
pub max_faults: usize,
|
||||||
pub write_quorum: usize,
|
|
||||||
|
|
||||||
neighbors: ArcSwapOption<Neighbors>,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
|
@ -22,45 +18,8 @@ struct Neighbors {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableFullReplication {
|
impl TableFullReplication {
|
||||||
pub fn new(write_factor: usize, write_quorum: usize) -> Self {
|
pub fn new(max_faults: usize) -> Self {
|
||||||
TableFullReplication {
|
TableFullReplication { max_faults }
|
||||||
write_factor,
|
|
||||||
write_quorum,
|
|
||||||
neighbors: ArcSwapOption::from(None),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_neighbors(&self, system: &System) -> Vec<UUID> {
|
|
||||||
let neighbors = self.neighbors.load_full();
|
|
||||||
if let Some(n) = neighbors {
|
|
||||||
if Arc::ptr_eq(&n.ring, &system.ring.borrow()) {
|
|
||||||
return n.neighbors.clone();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Recalculate neighbors
|
|
||||||
let ring = system.ring.borrow().clone();
|
|
||||||
let my_id = system.id;
|
|
||||||
|
|
||||||
let mut nodes = vec![];
|
|
||||||
for (node, _) in ring.config.members.iter() {
|
|
||||||
let node_ranking = fasthash(&[node.as_slice(), my_id.as_slice()].concat());
|
|
||||||
nodes.push((*node, node_ranking));
|
|
||||||
}
|
|
||||||
nodes.sort_by(|(_, rank1), (_, rank2)| rank1.cmp(rank2));
|
|
||||||
let mut neighbors = nodes
|
|
||||||
.drain(..)
|
|
||||||
.map(|(node, _)| node)
|
|
||||||
.filter(|node| *node != my_id)
|
|
||||||
.take(self.write_factor)
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
|
|
||||||
neighbors.push(my_id);
|
|
||||||
self.neighbors.swap(Some(Arc::new(Neighbors {
|
|
||||||
ring,
|
|
||||||
neighbors: neighbors.clone(),
|
|
||||||
})));
|
|
||||||
neighbors
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -78,17 +37,14 @@ impl TableReplication for TableFullReplication {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_nodes(&self, _hash: &Hash, system: &System) -> Vec<UUID> {
|
fn write_nodes(&self, hash: &Hash, system: &System) -> Vec<UUID> {
|
||||||
self.get_neighbors(system)
|
self.replication_nodes(hash, system.ring.borrow().as_ref())
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self, system: &System) -> usize {
|
||||||
self.write_quorum
|
system.ring.borrow().config.members.len() - self.max_faults
|
||||||
}
|
}
|
||||||
fn max_write_errors(&self) -> usize {
|
fn max_write_errors(&self) -> usize {
|
||||||
self.write_factor - self.write_quorum
|
self.max_faults
|
||||||
}
|
|
||||||
fn epidemic_writes(&self) -> bool {
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
|
fn replication_nodes(&self, _hash: &Hash, ring: &Ring) -> Vec<UUID> {
|
||||||
|
|
|
@ -31,15 +31,12 @@ impl TableReplication for TableShardedReplication {
|
||||||
let ring = system.ring.borrow().clone();
|
let ring = system.ring.borrow().clone();
|
||||||
ring.walk_ring(&hash, self.replication_factor)
|
ring.walk_ring(&hash, self.replication_factor)
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self, _system: &System) -> usize {
|
||||||
self.write_quorum
|
self.write_quorum
|
||||||
}
|
}
|
||||||
fn max_write_errors(&self) -> usize {
|
fn max_write_errors(&self) -> usize {
|
||||||
self.replication_factor - self.write_quorum
|
self.replication_factor - self.write_quorum
|
||||||
}
|
}
|
||||||
fn epidemic_writes(&self) -> bool {
|
|
||||||
false
|
|
||||||
}
|
|
||||||
|
|
||||||
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
|
fn replication_nodes(&self, hash: &Hash, ring: &Ring) -> Vec<UUID> {
|
||||||
ring.walk_ring(&hash, self.replication_factor)
|
ring.walk_ring(&hash, self.replication_factor)
|
||||||
|
|
|
@ -319,7 +319,13 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
counter += 1;
|
counter += 1;
|
||||||
debug!("Offloading {} items from {:?}..{:?} ({})", items.len(), begin, end, counter);
|
debug!(
|
||||||
|
"Offloading {} items from {:?}..{:?} ({})",
|
||||||
|
items.len(),
|
||||||
|
begin,
|
||||||
|
end,
|
||||||
|
counter
|
||||||
|
);
|
||||||
self.offload_items(&items, &nodes[..]).await?;
|
self.offload_items(&items, &nodes[..]).await?;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
|
@ -408,7 +414,11 @@ where
|
||||||
.iter()
|
.iter()
|
||||||
.all(|x| *x == 0u8)
|
.all(|x| *x == 0u8)
|
||||||
{
|
{
|
||||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
trace!(
|
||||||
|
"range_checksum {:?} returning {} items",
|
||||||
|
range,
|
||||||
|
children.len()
|
||||||
|
);
|
||||||
return Ok(RangeChecksum {
|
return Ok(RangeChecksum {
|
||||||
bounds: range.clone(),
|
bounds: range.clone(),
|
||||||
children,
|
children,
|
||||||
|
@ -423,7 +433,11 @@ where
|
||||||
};
|
};
|
||||||
children.push((item_range, blake2sum(&value[..])));
|
children.push((item_range, blake2sum(&value[..])));
|
||||||
}
|
}
|
||||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
trace!(
|
||||||
|
"range_checksum {:?} returning {} items",
|
||||||
|
range,
|
||||||
|
children.len()
|
||||||
|
);
|
||||||
Ok(RangeChecksum {
|
Ok(RangeChecksum {
|
||||||
bounds: range.clone(),
|
bounds: range.clone(),
|
||||||
children,
|
children,
|
||||||
|
@ -449,7 +463,11 @@ where
|
||||||
}
|
}
|
||||||
|
|
||||||
if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
|
if sub_ck.found_limit.is_none() || sub_ck.hash.is_none() {
|
||||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
trace!(
|
||||||
|
"range_checksum {:?} returning {} items",
|
||||||
|
range,
|
||||||
|
children.len()
|
||||||
|
);
|
||||||
return Ok(RangeChecksum {
|
return Ok(RangeChecksum {
|
||||||
bounds: range.clone(),
|
bounds: range.clone(),
|
||||||
children,
|
children,
|
||||||
|
@ -464,7 +482,11 @@ where
|
||||||
.iter()
|
.iter()
|
||||||
.all(|x| *x == 0u8)
|
.all(|x| *x == 0u8)
|
||||||
{
|
{
|
||||||
trace!("range_checksum {:?} returning {} items", range, children.len());
|
trace!(
|
||||||
|
"range_checksum {:?} returning {} items",
|
||||||
|
range,
|
||||||
|
children.len()
|
||||||
|
);
|
||||||
return Ok(RangeChecksum {
|
return Ok(RangeChecksum {
|
||||||
bounds: range.clone(),
|
bounds: range.clone(),
|
||||||
children,
|
children,
|
||||||
|
|
|
@ -23,12 +23,12 @@ pub struct Config {
|
||||||
#[serde(default = "default_block_size")]
|
#[serde(default = "default_block_size")]
|
||||||
pub block_size: usize,
|
pub block_size: usize,
|
||||||
|
|
||||||
|
#[serde(default = "default_control_write_max_faults")]
|
||||||
|
pub control_write_max_faults: usize,
|
||||||
|
|
||||||
#[serde(default = "default_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub meta_replication_factor: usize,
|
pub meta_replication_factor: usize,
|
||||||
|
|
||||||
#[serde(default = "default_epidemic_fanout")]
|
|
||||||
pub meta_epidemic_fanout: usize,
|
|
||||||
|
|
||||||
#[serde(default = "default_replication_factor")]
|
#[serde(default = "default_replication_factor")]
|
||||||
pub data_replication_factor: usize,
|
pub data_replication_factor: usize,
|
||||||
|
|
||||||
|
@ -68,8 +68,8 @@ fn default_block_size() -> usize {
|
||||||
fn default_replication_factor() -> usize {
|
fn default_replication_factor() -> usize {
|
||||||
3
|
3
|
||||||
}
|
}
|
||||||
fn default_epidemic_fanout() -> usize {
|
fn default_control_write_max_faults() -> usize {
|
||||||
3
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
pub fn read_config(config_file: PathBuf) -> Result<Config, Error> {
|
||||||
|
|
Loading…
Reference in a new issue