Simulate stuff moving around

parent 5fe95ebae7
commit 49c25a1509
2 changed files with 110 additions and 68 deletions

script/simulate_ring.py (175 changed lines, Normal file → Executable file)
@@ -1,6 +1,11 @@
+#!/usr/bin/env python3
+
 import hashlib
 import bisect
 import xxhash
+import numpy as np
 
+REPLICATION_FACTOR = 3
+
 def hash_str(s):
     xxh = xxhash.xxh64()
@@ -10,11 +15,11 @@ def hash_str(s):
 def sha256_str(s):
     return hashlib.sha256(s.encode('ascii')).hexdigest()
 
-def walk_ring_from_pos(tokens, dcs, start, rep):
+def walk_ring_from_pos(tokens, dcs, start):
     ret = []
     ret_dcs = set()
     delta = 0
-    while len(ret) < rep:
+    while len(ret) < REPLICATION_FACTOR:
         i = (start + delta) % len(tokens)
         delta = delta + 1
 
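The hunks above and below show walk_ring_from_pos only in fragments. As a rough standalone sketch of the kind of datacenter-aware ring walk it performs (the acceptance rule and the toy ring below are assumptions for illustration, not the script's exact code):

REPLICATION_FACTOR = 3

def walk_ring_sketch(tokens, dcs, start):
    # tokens: sorted list of (hash, dc, node); dcs: list of datacenters.
    # Assumes the ring holds at least REPLICATION_FACTOR distinct nodes.
    ret, ret_dcs = [], set()
    delta = 0
    while len(ret) < REPLICATION_FACTOR and delta < 2 * len(tokens):
        _, token_dc, token_node = tokens[(start + delta) % len(tokens)]
        delta += 1
        if token_dc not in ret_dcs:
            # First pass: one replica per datacenter.
            ret_dcs.add(token_dc)
            ret.append(token_node)
        elif len(ret_dcs) == len(dcs) and token_node not in ret:
            # All datacenters covered: accept any node not already chosen.
            ret.append(token_node)
    return ret

toy_tokens = sorted((f"{node}{i}", dc, node)
                    for node, dc in [("a1", "dc1"), ("a2", "dc1"), ("b1", "dc2")]
                    for i in range(3))
print(walk_ring_sketch(toy_tokens, ["dc1", "dc2"], start=0))  # -> ['a1', 'b1', 'a2']

The point of the first-pass rule is that the REPLICATION_FACTOR replicas land in as many distinct datacenters as possible, which is also what the assert in method4 checks later in this diff.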
@@ -26,6 +31,7 @@ def walk_ring_from_pos(tokens, dcs, start, rep):
            ret.append(token_node)
     return ret
 
+"""
 def count_tokens_per_node(tokens):
     tokens_of_node = {}
     for _, _, token_node in tokens:
@@ -34,7 +40,19 @@ def count_tokens_per_node(tokens):
         tokens_of_node[token_node] += 1
     print("#tokens per node:")
     for node, ntok in sorted(list(tokens_of_node.items())):
-        print(node, ": ", ntok)
+        print("-", node, ": ", ntok)
+"""
+
+def count_partitions_per_node(ring_node_list):
+    tokens_of_node = {}
+    for nodelist in ring_node_list:
+        for node_id in nodelist:
+            if node_id not in tokens_of_node:
+                tokens_of_node[node_id] = 0
+            tokens_of_node[node_id] += 1
+    print("#partitions per node:")
+    for node, ntok in sorted(list(tokens_of_node.items())):
+        print("-", node, ": ", ntok)
 
 
 def method1(nodes):
@@ -47,20 +65,19 @@ def method1(nodes):
            tokens.append((token, dc, node_id))
     tokens.sort(key=lambda tok: tok[0])
 
-    #print(tokens)
-    count_tokens_per_node(tokens)
-
     space_of_node = {}
 
-    def walk_ring(v, rep):
+    def walk_ring(v):
         i = bisect.bisect_left([tok for tok, _, _ in tokens], hash_str(v))
-        return walk_ring_from_pos(tokens, dcs, i, rep)
+        return walk_ring_from_pos(tokens, dcs, i)
 
-    return walk_ring
+    ring_node_list = [walk_ring_from_pos(tokens, dcs, i) for i in range(len(tokens))]
+
+    return walk_ring, ring_node_list
 
 
 def method2(nodes):
-    partition_bits = 8
+    partition_bits = 10
     partitions = list(range(2**partition_bits))
     def partition_node(i):
         h, hn, hndc = None, None, None
@@ -74,25 +91,26 @@ def method2(nodes):
         return (i, hndc, hn)
 
     partition_nodes = [partition_node(i) for i in partitions]
-    count_tokens_per_node(partition_nodes)
 
     dcs = list(set(node_dc for _, node_dc, _ in nodes))
 
 
-    def walk_ring(v, rep):
+    def walk_ring(v):
         # xxh = xxhash.xxh32()
         # xxh.update(v.encode('ascii'))
         # vh = xxh.intdigest()
         # i = vh % (2**partition_bits)
         vh = hashlib.sha256(v.encode('ascii')).digest()
         i = (vh[0]<<8 | vh[1]) % (2**partition_bits)
-        return walk_ring_from_pos(partition_nodes, dcs, i, rep)
+        return walk_ring_from_pos(partition_nodes, dcs, i)
 
-    return walk_ring
+    ring_node_list = [walk_ring_from_pos(partition_nodes, dcs, i) for i in range(len(partition_nodes))]
+
+    return walk_ring, ring_node_list
 
 
 def method3(nodes):
-    partition_bits = 8
+    partition_bits = 10
 
     queues = []
     for (node_id, node_dc, n_tokens) in nodes:
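For reference, the walk_ring closures of methods 2, 3 and 4 all map a key to one of the 2**partition_bits partitions from the first two bytes of its SHA-256 digest. A small standalone example of that computation (the key "hello" is arbitrary):

import hashlib

partition_bits = 10
vh = hashlib.sha256("hello".encode('ascii')).digest()
# Big-endian 16-bit value from the two leading digest bytes, reduced to one of
# the 2**partition_bits partitions (1024 now that partition_bits = 10).
i = (vh[0] << 8 | vh[1]) % (2**partition_bits)
print(vh[0], vh[1], i)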
@@ -119,27 +137,27 @@ def method3(nodes):
                queues[iq] = (node_id, node_dc, n_tokens, node_queue[qi+1:])
                break
 
-    count_tokens_per_node(partitions)
     dcs = list(set(node_dc for _, node_dc, _ in nodes))
 
-    def walk_ring(v, rep):
+    def walk_ring(v):
         vh = hashlib.sha256(v.encode('ascii')).digest()
         i = (vh[0]<<8 | vh[1]) % (2**partition_bits)
-        return walk_ring_from_pos(partitions, dcs, i, rep)
+        return walk_ring_from_pos(partitions, dcs, i)
 
-    return walk_ring
+    ring_node_list = [walk_ring_from_pos(partitions, dcs, i) for i in range(len(partitions))]
+
+    return walk_ring, ring_node_list
 
 
 def method4(nodes):
-    partition_bits = 8
-    max_replicas = 3
+    partition_bits = 10
 
     partitions = [[] for _ in range(2**partition_bits)]
     dcs = list(set(node_dc for _, node_dc, _ in nodes))
 
     # Maglev, improved for several replicas on several DCs
-    for ri in range(max_replicas):
+    for ri in range(REPLICATION_FACTOR):
         queues = []
         for (node_id, node_dc, n_tokens) in nodes:
             que = [(i, hash_str(f"{node_id} {i}")) for i in range(2**partition_bits)]
@@ -167,36 +185,58 @@ def method4(nodes):
                    queues[iq] = (node_id, node_dc, n_tokens, node_queue[qi+1:])
                    break
 
-    # count
-    tokens_of_node = {}
-    for nodelist in partitions:
-        for node_dc, node_id in nodelist:
-            if node_id not in tokens_of_node:
-                tokens_of_node[node_id] = 0
-            tokens_of_node[node_id] += 1
-    print("#tokens per node:")
-    for node, ntok in sorted(list(tokens_of_node.items())):
-        print(node, ": ", ntok)
-
-    def walk_ring(v, rep):
+    def walk_ring(v):
         vh = hashlib.sha256(v.encode('ascii')).digest()
         i = (vh[0]<<8 | vh[1]) % (2**partition_bits)
-        assert len(set([node_dc for node_dc, _ in partitions[i]])) == min(max_replicas, len(dcs))
+        assert len(set([node_dc for node_dc, _ in partitions[i]])) == min(REPLICATION_FACTOR, len(dcs))
         return [node_id for _, node_id in partitions[i]]
 
-    return walk_ring
+    ring_node_list = [[node_id for _, node_id in p] for p in partitions]
 
-def evaluate_method(walk_ring):
+    return walk_ring, ring_node_list
+
+
+def evaluate_method(method, nodes):
+    walk_ring, ring_node_list = method(nodes)
+    print("Ring length:", len(ring_node_list))
+    count_partitions_per_node(ring_node_list)
+
+    print("Number of data items per node (100000 simulation):")
     node_data_counts = {}
     for i in range(100000):
-        nodes = walk_ring(f"{i}", 3)
-        for n in nodes:
+        inodes = walk_ring(f"{i}")
+        for n in inodes:
             if n not in node_data_counts:
                 node_data_counts[n] = 0
             node_data_counts[n] += 1
-    print("Number of data items per node:")
     for n, v in sorted(list(node_data_counts.items())):
-        print(n, ": ", v)
+        print("-", n, ": ", v)
+
+    dclist_per_ntok = {}
+    for node_id, _, ntok in nodes:
+        if ntok not in dclist_per_ntok:
+            dclist_per_ntok[ntok] = []
+        dclist_per_ntok[ntok].append(node_data_counts[node_id])
+    list_normalized = []
+    for ntok, dclist in dclist_per_ntok.items():
+        avg = sum(dclist)/len(dclist)
+        for v in dclist:
+            list_normalized.append(v / avg)
+    print("variance wrt. same-ntok mean:", "%.2f%%"%(np.var(list_normalized)*100))
+
+    num_changes_sum = [0, 0, 0, 0]
+    for n in nodes:
+        nodes2 = [nn for nn in nodes if nn != n]
+        _, ring_node_list_2 = method(nodes2)
+        if len(ring_node_list_2) != len(ring_node_list):
+            continue
+        num_changes = [0, 0, 0, 0]
+        for (ns1, ns2) in zip(ring_node_list, ring_node_list_2):
+            changes = sum(1 for x in ns1 if x not in ns2)
+            num_changes[changes] += 1
+        for i, v in enumerate(num_changes):
+            num_changes_sum[i] += v / len(ring_node_list)
+        print("removing", n[1], n[0], ":", " ".join(["%.2f%%"%(x*100/len(ring_node_list)) for x in num_changes]))
+    print("1-node removal: partitions moved on 0/1/2/3 nodes: ", " ".join(["%.2f%%"%(x*100/len(nodes)) for x in num_changes_sum]))
 
 
 if __name__ == "__main__":
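The new evaluate_method measures churn by removing one node at a time, rebuilding the ring, and counting for every partition how many of its replicas changed. A standalone toy illustration of that bucketing (node names and assignments are invented):

def churn_buckets(ring_before, ring_after):
    # Bucket partitions by how many of their replicas moved (0..3).
    buckets = [0, 0, 0, 0]
    for ns1, ns2 in zip(ring_before, ring_after):
        changed = sum(1 for x in ns1 if x not in ns2)
        buckets[changed] += 1
    return buckets

before = [["a", "b", "c"], ["a", "c", "d"], ["b", "c", "d"], ["a", "b", "d"]]
after  = [["a", "b", "c"], ["a", "c", "e"], ["b", "c", "e"], ["a", "b", "e"]]  # node "d" removed
print(churn_buckets(before, after))  # -> [1, 3, 0, 0]: one partition untouched, three moved one replica

The percentages printed by evaluate_method are these buckets divided by the number of partitions, then averaged over all single-node removals.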
@@ -206,46 +246,45 @@ if __name__ == "__main__":
             ('drosera', 'atuin', 64),
             ('datura', 'atuin', 64),
             ('io', 'jupiter', 128)]
-    method1_walk_ring = method1(nodes)
-    evaluate_method(method1_walk_ring)
+    nodes2 = [('digitale', 'atuin', 64),
+              ('drosera', 'atuin', 64),
+              ('datura', 'atuin', 64),
+              ('io', 'jupiter', 128),
+              ('isou', 'jupiter', 64),
+              ('mini', 'grog', 32),
+              ('mixi', 'grog', 32),
+              ('moxi', 'grog', 32),
+              ('modi', 'grog', 32),
+              ('geant', 'grisou', 128),
+              ('gipsie', 'grisou', 128),
+              ]
+    evaluate_method(method1, nodes2)
 
     print("------")
     print("method 2 (custom ring)")
-    nodes = [('digitale', 'atuin', 10),
-             ('drosera', 'atuin', 10),
-             ('datura', 'atuin', 10),
-             ('io', 'jupiter', 20)]
-    method2_walk_ring = method2(nodes)
-    evaluate_method(method2_walk_ring)
-
-    print("------")
-    print("method 3 (maglev)")
     nodes = [('digitale', 'atuin', 4),
             ('drosera', 'atuin', 4),
             ('datura', 'atuin', 4),
-             ('io', 'jupiter', 8),
-             #('mini', 'grog', 2),
-             #('mixi', 'grog', 2),
-             #('moxi', 'grog', 2),
-             #('modi', 'grog', 2),
-             ]
-    method3_walk_ring = method3(nodes)
-    evaluate_method(method3_walk_ring)
-
-
-    print("------")
-    print("method 4 (maglev, multi-dc twist)")
-    nodes = [('digitale', 'atuin', 8),
+             ('io', 'jupiter', 8)]
+    nodes2 = [('digitale', 'atuin', 8),
             ('drosera', 'atuin', 8),
             ('datura', 'atuin', 8),
             ('io', 'jupiter', 16),
+             ('isou', 'jupiter', 8),
             ('mini', 'grog', 4),
             ('mixi', 'grog', 4),
             ('moxi', 'grog', 4),
             ('modi', 'grog', 4),
             ('geant', 'grisou', 16),
             ('gipsie', 'grisou', 16),
-             #('isou', 'jupiter', 8),
             ]
-    method4_walk_ring = method4(nodes)
-    evaluate_method(method4_walk_ring)
+    evaluate_method(method2, nodes2)
+
+    print("------")
+    print("method 3 (maglev)")
+    evaluate_method(method3, nodes2)
+
+
+    print("------")
+    print("method 4 (maglev, multi-dc twist)")
+    evaluate_method(method4, nodes2)
@@ -426,6 +426,9 @@ impl BlockManager {
     }
 
     fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver<bool>) -> BoxFuture<'a, Result<(), Error>> {
+        // Lists all blocks on disk and adds them to the resync queue.
+        // This allows us to find blocks we are storing but don't actually need,
+        // so that we can offload them if necessary and then delete them locally.
         async move {
             let mut ls_data_dir = fs::read_dir(path).await?;
             while let Some(data_dir_ent) = ls_data_dir.next().await {