Garage v1.0 #683
1 changed files with 12 additions and 1 deletions
|
@ -196,6 +196,8 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
let hash = entry.partition_key().hash();
|
let hash = entry.partition_key().hash();
|
||||||
let mut write_sets = self.data.replication.write_sets(&hash);
|
let mut write_sets = self.data.replication.write_sets(&hash);
|
||||||
for set in write_sets.as_mut().iter_mut() {
|
for set in write_sets.as_mut().iter_mut() {
|
||||||
|
// Sort nodes in each write sets to merge write sets with same
|
||||||
|
// nodes but in possibly different orders
|
||||||
set.sort();
|
set.sort();
|
||||||
}
|
}
|
||||||
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
let e_enc = Arc::new(ByteBuf::from(entry.encode()?));
|
||||||
|
@ -220,7 +222,16 @@ impl<F: TableSchema, R: TableReplication> Table<F, R> {
|
||||||
for (write_sets, entry_enc) in entries_vec.iter() {
|
for (write_sets, entry_enc) in entries_vec.iter() {
|
||||||
for write_set in write_sets.as_ref().iter() {
|
for write_set in write_sets.as_ref().iter() {
|
||||||
for node in write_set.iter() {
|
for node in write_set.iter() {
|
||||||
call_list.entry(*node).or_default().push(entry_enc.clone())
|
let node_entries = call_list.entry(*node).or_default();
|
||||||
|
match node_entries.last() {
|
||||||
|
Some(x) if Arc::ptr_eq(x, entry_enc) => {
|
||||||
|
// skip if entry already in list to send to this node
|
||||||
|
// (could happen if node is in several write sets for this entry)
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
node_entries.push(entry_enc.clone());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in a new issue