forked from Deuxfleurs/garage
layout: prepare for write sets
This commit is contained in:
parent
866196750f
commit
3b361d2959
13 changed files with 64 additions and 48 deletions
|
@ -354,7 +354,8 @@ impl BlockManager {
|
||||||
|
|
||||||
/// Send block to nodes that should have it
|
/// Send block to nodes that should have it
|
||||||
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
|
pub async fn rpc_put_block(&self, hash: Hash, data: Bytes) -> Result<(), Error> {
|
||||||
let who = self.replication.write_nodes(&hash);
|
// TODO: use quorums among latest write set
|
||||||
|
let who = self.replication.storage_nodes(&hash);
|
||||||
|
|
||||||
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
|
let (header, bytes) = DataBlock::from_buffer(data, self.compression_level)
|
||||||
.await
|
.await
|
||||||
|
|
|
@ -377,7 +377,7 @@ impl BlockResyncManager {
|
||||||
info!("Resync block {:?}: offloading and deleting", hash);
|
info!("Resync block {:?}: offloading and deleting", hash);
|
||||||
let existing_path = existing_path.unwrap();
|
let existing_path = existing_path.unwrap();
|
||||||
|
|
||||||
let mut who = manager.replication.write_nodes(hash);
|
let mut who = manager.replication.storage_nodes(hash);
|
||||||
if who.len() < manager.replication.write_quorum() {
|
if who.len() < manager.replication.write_quorum() {
|
||||||
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
return Err(Error::Message("Not trying to offload block because we don't have a quorum of nodes to write to".to_string()));
|
||||||
}
|
}
|
||||||
|
|
|
@ -127,7 +127,7 @@ impl K2VRpcHandler {
|
||||||
.item_table
|
.item_table
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
.write_nodes(&partition.hash());
|
.storage_nodes(&partition.hash());
|
||||||
who.sort();
|
who.sort();
|
||||||
|
|
||||||
self.system
|
self.system
|
||||||
|
@ -168,7 +168,7 @@ impl K2VRpcHandler {
|
||||||
.item_table
|
.item_table
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
.write_nodes(&partition.hash());
|
.storage_nodes(&partition.hash());
|
||||||
who.sort();
|
who.sort();
|
||||||
|
|
||||||
call_list.entry(who).or_default().push(InsertedItem {
|
call_list.entry(who).or_default().push(InsertedItem {
|
||||||
|
@ -223,11 +223,12 @@ impl K2VRpcHandler {
|
||||||
},
|
},
|
||||||
sort_key,
|
sort_key,
|
||||||
};
|
};
|
||||||
|
// TODO figure this out with write sets, does it still work????
|
||||||
let nodes = self
|
let nodes = self
|
||||||
.item_table
|
.item_table
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
.write_nodes(&poll_key.partition.hash());
|
.read_nodes(&poll_key.partition.hash());
|
||||||
|
|
||||||
let rpc = self.system.rpc_helper().try_call_many(
|
let rpc = self.system.rpc_helper().try_call_many(
|
||||||
&self.endpoint,
|
&self.endpoint,
|
||||||
|
@ -284,11 +285,12 @@ impl K2VRpcHandler {
|
||||||
seen.restrict(&range);
|
seen.restrict(&range);
|
||||||
|
|
||||||
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
|
// Prepare PollRange RPC to send to the storage nodes responsible for the parititon
|
||||||
|
// TODO figure this out with write sets, does it still work????
|
||||||
let nodes = self
|
let nodes = self
|
||||||
.item_table
|
.item_table
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
.write_nodes(&range.partition.hash());
|
.read_nodes(&range.partition.hash());
|
||||||
let quorum = self.item_table.data.replication.read_quorum();
|
let quorum = self.item_table.data.replication.read_quorum();
|
||||||
let msg = K2VRpc::PollRange {
|
let msg = K2VRpc::PollRange {
|
||||||
range,
|
range,
|
||||||
|
|
|
@ -98,13 +98,26 @@ impl LayoutHistory {
|
||||||
.find(|x| x.version == sync_min)
|
.find(|x| x.version == sync_min)
|
||||||
.or(self.versions.last())
|
.or(self.versions.last())
|
||||||
.unwrap();
|
.unwrap();
|
||||||
version.nodes_of(position, version.replication_factor)
|
version
|
||||||
|
.nodes_of(position, version.replication_factor)
|
||||||
|
.collect()
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn write_sets_of<'a>(&'a self, position: &'a Hash) -> impl Iterator<Item = Vec<Uuid>> + 'a {
|
pub fn write_sets_of(&self, position: &Hash) -> Vec<Vec<Uuid>> {
|
||||||
self.versions
|
self.versions
|
||||||
.iter()
|
.iter()
|
||||||
.map(move |x| x.nodes_of(position, x.replication_factor))
|
.map(|x| x.nodes_of(position, x.replication_factor).collect())
|
||||||
|
.collect()
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn storage_nodes_of(&self, position: &Hash) -> Vec<Uuid> {
|
||||||
|
let mut ret = vec![];
|
||||||
|
for version in self.versions.iter() {
|
||||||
|
ret.extend(version.nodes_of(position, version.replication_factor));
|
||||||
|
}
|
||||||
|
ret.sort();
|
||||||
|
ret.dedup();
|
||||||
|
ret
|
||||||
}
|
}
|
||||||
|
|
||||||
// ------------------ update tracking ---------------
|
// ------------------ update tracking ---------------
|
||||||
|
|
|
@ -107,25 +107,24 @@ impl LayoutVersion {
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Return the n servers in which data for this hash should be replicated
|
/// Return the n servers in which data for this hash should be replicated
|
||||||
pub fn nodes_of(&self, position: &Hash, n: usize) -> Vec<Uuid> {
|
pub fn nodes_of(&self, position: &Hash, n: usize) -> impl Iterator<Item = Uuid> + '_ {
|
||||||
assert_eq!(n, self.replication_factor);
|
assert_eq!(n, self.replication_factor);
|
||||||
|
|
||||||
let data = &self.ring_assignment_data;
|
let data = &self.ring_assignment_data;
|
||||||
|
|
||||||
if data.len() != self.replication_factor * (1 << PARTITION_BITS) {
|
let partition_nodes = if data.len() == self.replication_factor * (1 << PARTITION_BITS) {
|
||||||
|
let partition_idx = self.partition_of(position) as usize;
|
||||||
|
let partition_start = partition_idx * self.replication_factor;
|
||||||
|
let partition_end = (partition_idx + 1) * self.replication_factor;
|
||||||
|
&data[partition_start..partition_end]
|
||||||
|
} else {
|
||||||
warn!("Ring not yet ready, read/writes will be lost!");
|
warn!("Ring not yet ready, read/writes will be lost!");
|
||||||
return vec![];
|
&[]
|
||||||
}
|
};
|
||||||
|
|
||||||
let partition_idx = self.partition_of(position) as usize;
|
|
||||||
let partition_start = partition_idx * self.replication_factor;
|
|
||||||
let partition_end = (partition_idx + 1) * self.replication_factor;
|
|
||||||
let partition_nodes = &data[partition_start..partition_end];
|
|
||||||
|
|
||||||
partition_nodes
|
partition_nodes
|
||||||
.iter()
|
.iter()
|
||||||
.map(|i| self.node_id_vec[*i as usize])
|
.map(move |i| self.node_id_vec[*i as usize])
|
||||||
.collect::<Vec<_>>()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ===================== internal information extractors ======================
|
// ===================== internal information extractors ======================
|
||||||
|
|
|
@ -449,8 +449,7 @@ impl System {
|
||||||
.iter()
|
.iter()
|
||||||
.map(|(_, h)| {
|
.map(|(_, h)| {
|
||||||
let pn = layout.current().nodes_of(h, replication_factor);
|
let pn = layout.current().nodes_of(h, replication_factor);
|
||||||
pn.iter()
|
pn.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
|
||||||
.filter(|x| nodes.get(x).map(|n| n.is_up).unwrap_or(false))
|
|
||||||
.count()
|
.count()
|
||||||
})
|
})
|
||||||
.collect::<Vec<usize>>();
|
.collect::<Vec<usize>>();
|
||||||
|
|
|
@ -254,7 +254,8 @@ impl<F: TableSchema, R: TableReplication> TableData<F, R> {
|
||||||
// of the GC algorithm, as in all cases GC is suspended if
|
// of the GC algorithm, as in all cases GC is suspended if
|
||||||
// any node of the partition is unavailable.
|
// any node of the partition is unavailable.
|
||||||
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
|
let pk_hash = Hash::try_from(&tree_key[..32]).unwrap();
|
||||||
let nodes = self.replication.write_nodes(&pk_hash);
|
// TODO: this probably breaks when the layout changes
|
||||||
|
let nodes = self.replication.storage_nodes(&pk_hash);
|
||||||
if nodes.first() == Some(&self.system.id) {
|
if nodes.first() == Some(&self.system.id) {
|
||||||
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
|
GcTodoEntry::new(tree_key, new_bytes_hash).save(&self.gc_todo)?;
|
||||||
}
|
}
|
||||||
|
|
|
@ -152,7 +152,7 @@ impl<F: TableSchema, R: TableReplication> TableGc<F, R> {
|
||||||
let mut partitions = HashMap::new();
|
let mut partitions = HashMap::new();
|
||||||
for entry in entries {
|
for entry in entries {
|
||||||
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
|
let pkh = Hash::try_from(&entry.key[..32]).unwrap();
|
||||||
let mut nodes = self.data.replication.write_nodes(&pkh);
|
let mut nodes = self.data.replication.storage_nodes(&pkh);
|
||||||
nodes.retain(|x| *x != self.system.id);
|
nodes.retain(|x| *x != self.system.id);
|
||||||
nodes.sort();
|
nodes.sort();
|
||||||
|
|
||||||
|
|
|
@ -27,6 +27,11 @@ pub struct TableFullReplication {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableReplication for TableFullReplication {
|
impl TableReplication for TableFullReplication {
|
||||||
|
fn storage_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
||||||
|
let layout = self.system.cluster_layout();
|
||||||
|
layout.current().all_nodes().to_vec()
|
||||||
|
}
|
||||||
|
|
||||||
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
fn read_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
||||||
vec![self.system.id]
|
vec![self.system.id]
|
||||||
}
|
}
|
||||||
|
@ -34,8 +39,8 @@ impl TableReplication for TableFullReplication {
|
||||||
1
|
1
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_nodes(&self, _hash: &Hash) -> Vec<Uuid> {
|
fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
|
||||||
self.system.cluster_layout().current().all_nodes().to_vec()
|
vec![self.storage_nodes(hash)]
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
let nmembers = self.system.cluster_layout().current().all_nodes().len();
|
let nmembers = self.system.cluster_layout().current().all_nodes().len();
|
||||||
|
|
|
@ -6,21 +6,23 @@ pub trait TableReplication: Send + Sync + 'static {
|
||||||
// See examples in table_sharded.rs and table_fullcopy.rs
|
// See examples in table_sharded.rs and table_fullcopy.rs
|
||||||
// To understand various replication methods
|
// To understand various replication methods
|
||||||
|
|
||||||
|
/// The entire list of all nodes that store a partition
|
||||||
|
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid>;
|
||||||
|
|
||||||
/// Which nodes to send read requests to
|
/// Which nodes to send read requests to
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
|
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid>;
|
||||||
/// Responses needed to consider a read succesfull
|
/// Responses needed to consider a read succesfull
|
||||||
fn read_quorum(&self) -> usize;
|
fn read_quorum(&self) -> usize;
|
||||||
|
|
||||||
/// Which nodes to send writes to
|
/// Which nodes to send writes to
|
||||||
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid>;
|
fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>>;
|
||||||
/// Responses needed to consider a write succesfull
|
/// Responses needed to consider a write succesfull in each set
|
||||||
fn write_quorum(&self) -> usize;
|
fn write_quorum(&self) -> usize;
|
||||||
fn max_write_errors(&self) -> usize;
|
fn max_write_errors(&self) -> usize;
|
||||||
|
|
||||||
// Accessing partitions, for Merkle tree & sync
|
// Accessing partitions, for Merkle tree & sync
|
||||||
/// Get partition for data with given hash
|
/// Get partition for data with given hash
|
||||||
fn partition_of(&self, hash: &Hash) -> Partition;
|
fn partition_of(&self, hash: &Hash) -> Partition;
|
||||||
|
|
||||||
/// List of partitions and nodes to sync with in current layout
|
/// List of partitions and nodes to sync with in current layout
|
||||||
fn sync_partitions(&self) -> SyncPartitions;
|
fn sync_partitions(&self) -> SyncPartitions;
|
||||||
}
|
}
|
||||||
|
|
|
@ -25,21 +25,19 @@ pub struct TableShardedReplication {
|
||||||
}
|
}
|
||||||
|
|
||||||
impl TableReplication for TableShardedReplication {
|
impl TableReplication for TableShardedReplication {
|
||||||
|
fn storage_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
|
self.system.cluster_layout().storage_nodes_of(hash)
|
||||||
|
}
|
||||||
|
|
||||||
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn read_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
||||||
self.system
|
self.system.cluster_layout().read_nodes_of(hash)
|
||||||
.cluster_layout()
|
|
||||||
.current()
|
|
||||||
.nodes_of(hash, self.replication_factor)
|
|
||||||
}
|
}
|
||||||
fn read_quorum(&self) -> usize {
|
fn read_quorum(&self) -> usize {
|
||||||
self.read_quorum
|
self.read_quorum
|
||||||
}
|
}
|
||||||
|
|
||||||
fn write_nodes(&self, hash: &Hash) -> Vec<Uuid> {
|
fn write_sets(&self, hash: &Hash) -> Vec<Vec<Uuid>> {
|
||||||
self.system
|
self.system.cluster_layout().write_sets_of(hash)
|
||||||
.cluster_layout()
|
|
||||||
.current()
|
|
||||||
.nodes_of(hash, self.replication_factor)
|
|
||||||
}
|
}
|
||||||
fn write_quorum(&self) -> usize {
|
fn write_quorum(&self) -> usize {
|
||||||
self.write_quorum
|
self.write_quorum
|
||||||
|
@ -60,13 +58,7 @@ impl TableReplication for TableShardedReplication {
|
||||||
.current()
|
.current()
|
||||||
.partitions()
|
.partitions()
|
||||||
.map(|(partition, first_hash)| {
|
.map(|(partition, first_hash)| {
|
||||||
let mut storage_nodes = layout
|
let storage_nodes = layout.storage_nodes_of(&first_hash);
|
||||||
.write_sets_of(&first_hash)
|
|
||||||
.map(|x| x.into_iter())
|
|
||||||
.flatten()
|
|
||||||
.collect::<Vec<_>>();
|
|
||||||
storage_nodes.sort();
|
|
||||||
storage_nodes.dedup();
|
|
||||||
SyncPartition {
|
SyncPartition {
|
||||||
partition,
|
partition,
|
||||||
first_hash,
|
first_hash,
|
||||||
|
|
|
@ -176,7 +176,7 @@ impl<F: TableSchema, R: TableReplication> TableSyncer<F, R> {
|
||||||
let nodes = self
|
let nodes = self
|
||||||
.data
|
.data
|
||||||
.replication
|
.replication
|
||||||
.write_nodes(begin)
|
.storage_nodes(begin)
|
||||||
.into_iter()
|
.into_iter()
|
||||||
.collect::<Vec<_>>();
|
.collect::<Vec<_>>();
|
||||||
if nodes.contains(&self.system.id) {
|
if nodes.contains(&self.system.id) {
|
||||||
|
|
|
@ -119,7 +119,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
|
|
||||||
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
|
async fn insert_internal(&self, e: &F::E) -> Result<(), Error> {
|
||||||
let hash = e.partition_key().hash();
|
let hash = e.partition_key().hash();
|
||||||
let who = self.data.replication.write_nodes(&hash);
|
// TODO: use write sets
|
||||||
|
let who = self.data.replication.storage_nodes(&hash);
|
||||||
|
|
||||||
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
|
let e_enc = Arc::new(ByteBuf::from(e.encode()?));
|
||||||
let rpc = TableRpc::<F>::Update(vec![e_enc]);
|
let rpc = TableRpc::<F>::Update(vec![e_enc]);
|
||||||
|
@ -171,7 +172,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
for entry in entries.into_iter() {
|
for entry in entries.into_iter() {
|
||||||
let entry = entry.borrow();
|
let entry = entry.borrow();
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
let who = self.data.replication.write_nodes(&hash);
|
// TODO: use write sets
|
||||||
|
let who = self.data.replication.storage_nodes(&hash);
|
||||||
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
||||||
for node in who {
|
for node in who {
|
||||||
call_list.entry(node).or_default().push(e_enc.clone());
|
call_list.entry(node).or_default().push(e_enc.clone());
|
||||||
|
|
Loading…
Reference in a new issue