Add adapted version of maglev for multi-dc
All checks were successful
continuous-integration/drone/push Build is passing
All checks were successful
continuous-integration/drone/push Build is passing
This commit is contained in:
parent
24f924afdb
commit
1abbca37c4
1 changed files with 76 additions and 2 deletions
|
@ -60,7 +60,7 @@ def method1(nodes):
|
|||
|
||||
|
||||
def method2(nodes):
|
||||
partition_bits = 10
|
||||
partition_bits = 8
|
||||
partitions = list(range(2**partition_bits))
|
||||
def partition_node(i):
|
||||
h, hn, hndc = None, None, None
|
||||
|
@ -92,7 +92,7 @@ def method2(nodes):
|
|||
|
||||
|
||||
def method3(nodes):
|
||||
partition_bits = 10
|
||||
partition_bits = 8
|
||||
|
||||
queues = []
|
||||
for (node_id, node_dc, n_tokens) in nodes:
|
||||
|
@ -130,6 +130,62 @@ def method3(nodes):
|
|||
return walk_ring
|
||||
|
||||
|
||||
|
||||
def method4(nodes):
|
||||
partition_bits = 8
|
||||
max_replicas = 3
|
||||
|
||||
partitions = [[] for _ in range(2**partition_bits)]
|
||||
dcs = list(set(node_dc for _, node_dc, _ in nodes))
|
||||
|
||||
# Maglev, improved for several replicas on several DCs
|
||||
for ri in range(max_replicas):
|
||||
queues = []
|
||||
for (node_id, node_dc, n_tokens) in nodes:
|
||||
que = [(i, hash_str(f"{node_id} {i}")) for i in range(2**partition_bits)]
|
||||
que.sort(key=lambda x: x[1])
|
||||
que = [x[0] for x in que]
|
||||
queues.append((node_id, node_dc, n_tokens, que))
|
||||
|
||||
queues.sort(key=lambda x: hash_str("{} {}".format(ri, x[0])))
|
||||
|
||||
remaining = 2**partition_bits
|
||||
while remaining > 0:
|
||||
for toktok in range(100):
|
||||
for iq in range(len(queues)):
|
||||
node_id, node_dc, n_tokens, node_queue = queues[iq]
|
||||
if toktok >= n_tokens:
|
||||
continue
|
||||
for qi, qv in enumerate(node_queue):
|
||||
if len(partitions[qv]) != ri:
|
||||
continue
|
||||
p_dcs = set([x[0] for x in partitions[qv]])
|
||||
p_nodes = [x[1] for x in partitions[qv]]
|
||||
if node_dc not in p_dcs or (len(p_dcs) == len(dcs) and node_id not in p_nodes):
|
||||
partitions[qv].append((node_dc, node_id))
|
||||
remaining -= 1
|
||||
queues[iq] = (node_id, node_dc, n_tokens, node_queue[qi+1:])
|
||||
break
|
||||
|
||||
# count
|
||||
tokens_of_node = {}
|
||||
for nodelist in partitions:
|
||||
for node_dc, node_id in nodelist:
|
||||
if node_id not in tokens_of_node:
|
||||
tokens_of_node[node_id] = 0
|
||||
tokens_of_node[node_id] += 1
|
||||
print("#tokens per node:")
|
||||
for node, ntok in sorted(list(tokens_of_node.items())):
|
||||
print(node, ": ", ntok)
|
||||
|
||||
def walk_ring(v, rep):
|
||||
vh = hashlib.sha256(v.encode('ascii')).digest()
|
||||
i = (vh[0]<<8 | vh[1]) % (2**partition_bits)
|
||||
assert len(set([node_dc for node_dc, _ in partitions[i]])) == min(max_replicas, len(dcs))
|
||||
return [node_id for _, node_id in partitions[i]]
|
||||
|
||||
return walk_ring
|
||||
|
||||
def evaluate_method(walk_ring):
|
||||
node_data_counts = {}
|
||||
for i in range(100000):
|
||||
|
@ -175,3 +231,21 @@ if __name__ == "__main__":
|
|||
]
|
||||
method3_walk_ring = method3(nodes)
|
||||
evaluate_method(method3_walk_ring)
|
||||
|
||||
|
||||
print("------")
|
||||
print("method 4 (maglev, multi-dc twist)")
|
||||
nodes = [('digitale', 'atuin', 8),
|
||||
('drosera', 'atuin', 8),
|
||||
('datura', 'atuin', 8),
|
||||
('io', 'jupiter', 16),
|
||||
('mini', 'grog', 4),
|
||||
('mixi', 'grog', 4),
|
||||
('moxi', 'grog', 4),
|
||||
('modi', 'grog', 4),
|
||||
('geant', 'grisou', 16),
|
||||
('gipsie', 'grisou', 16),
|
||||
#('isou', 'jupiter', 8),
|
||||
]
|
||||
method4_walk_ring = method4(nodes)
|
||||
evaluate_method(method4_walk_ring)
|
||||
|
|
Loading…
Reference in a new issue