diff --git a/script/simulate_ring.py b/script/simulate_ring.py old mode 100644 new mode 100755 index 611f2c985..47d748faf --- a/script/simulate_ring.py +++ b/script/simulate_ring.py @@ -1,6 +1,11 @@ +#!/usr/bin/env python3 + import hashlib import bisect import xxhash +import numpy as np + +REPLICATION_FACTOR = 3 def hash_str(s): xxh = xxhash.xxh64() @@ -10,11 +15,11 @@ def hash_str(s): def sha256_str(s): return hashlib.sha256(s.encode('ascii')).hexdigest() -def walk_ring_from_pos(tokens, dcs, start, rep): +def walk_ring_from_pos(tokens, dcs, start): ret = [] ret_dcs = set() delta = 0 - while len(ret) < rep: + while len(ret) < REPLICATION_FACTOR: i = (start + delta) % len(tokens) delta = delta + 1 @@ -26,6 +31,7 @@ def walk_ring_from_pos(tokens, dcs, start, rep): ret.append(token_node) return ret +""" def count_tokens_per_node(tokens): tokens_of_node = {} for _, _, token_node in tokens: @@ -34,7 +40,19 @@ def count_tokens_per_node(tokens): tokens_of_node[token_node] += 1 print("#tokens per node:") for node, ntok in sorted(list(tokens_of_node.items())): - print(node, ": ", ntok) + print("-", node, ": ", ntok) +""" + +def count_partitions_per_node(ring_node_list): + tokens_of_node = {} + for nodelist in ring_node_list: + for node_id in nodelist: + if node_id not in tokens_of_node: + tokens_of_node[node_id] = 0 + tokens_of_node[node_id] += 1 + print("#partitions per node:") + for node, ntok in sorted(list(tokens_of_node.items())): + print("-", node, ": ", ntok) def method1(nodes): @@ -47,20 +65,19 @@ def method1(nodes): tokens.append((token, dc, node_id)) tokens.sort(key=lambda tok: tok[0]) - #print(tokens) - count_tokens_per_node(tokens) - space_of_node = {} - def walk_ring(v, rep): + def walk_ring(v): i = bisect.bisect_left([tok for tok, _, _ in tokens], hash_str(v)) - return walk_ring_from_pos(tokens, dcs, i, rep) + return walk_ring_from_pos(tokens, dcs, i) - return walk_ring + ring_node_list = [walk_ring_from_pos(tokens, dcs, i) for i in range(len(tokens))] + + return walk_ring, ring_node_list def method2(nodes): - partition_bits = 8 + partition_bits = 10 partitions = list(range(2**partition_bits)) def partition_node(i): h, hn, hndc = None, None, None @@ -74,25 +91,26 @@ def method2(nodes): return (i, hndc, hn) partition_nodes = [partition_node(i) for i in partitions] - count_tokens_per_node(partition_nodes) dcs = list(set(node_dc for _, node_dc, _ in nodes)) - def walk_ring(v, rep): + def walk_ring(v): # xxh = xxhash.xxh32() # xxh.update(v.encode('ascii')) # vh = xxh.intdigest() # i = vh % (2**partition_bits) vh = hashlib.sha256(v.encode('ascii')).digest() i = (vh[0]<<8 | vh[1]) % (2**partition_bits) - return walk_ring_from_pos(partition_nodes, dcs, i, rep) + return walk_ring_from_pos(partition_nodes, dcs, i) - return walk_ring + ring_node_list = [walk_ring_from_pos(partition_nodes, dcs, i) for i in range(len(partition_nodes))] + + return walk_ring, ring_node_list def method3(nodes): - partition_bits = 8 + partition_bits = 10 queues = [] for (node_id, node_dc, n_tokens) in nodes: @@ -119,27 +137,27 @@ def method3(nodes): queues[iq] = (node_id, node_dc, n_tokens, node_queue[qi+1:]) break - count_tokens_per_node(partitions) dcs = list(set(node_dc for _, node_dc, _ in nodes)) - def walk_ring(v, rep): + def walk_ring(v): vh = hashlib.sha256(v.encode('ascii')).digest() i = (vh[0]<<8 | vh[1]) % (2**partition_bits) - return walk_ring_from_pos(partitions, dcs, i, rep) + return walk_ring_from_pos(partitions, dcs, i) - return walk_ring + ring_node_list = [walk_ring_from_pos(partitions, dcs, i) for i in range(len(partitions))] + + return walk_ring, ring_node_list def method4(nodes): - partition_bits = 8 - max_replicas = 3 + partition_bits = 10 partitions = [[] for _ in range(2**partition_bits)] dcs = list(set(node_dc for _, node_dc, _ in nodes)) # Maglev, improved for several replicas on several DCs - for ri in range(max_replicas): + for ri in range(REPLICATION_FACTOR): queues = [] for (node_id, node_dc, n_tokens) in nodes: que = [(i, hash_str(f"{node_id} {i}")) for i in range(2**partition_bits)] @@ -167,36 +185,58 @@ def method4(nodes): queues[iq] = (node_id, node_dc, n_tokens, node_queue[qi+1:]) break - # count - tokens_of_node = {} - for nodelist in partitions: - for node_dc, node_id in nodelist: - if node_id not in tokens_of_node: - tokens_of_node[node_id] = 0 - tokens_of_node[node_id] += 1 - print("#tokens per node:") - for node, ntok in sorted(list(tokens_of_node.items())): - print(node, ": ", ntok) - - def walk_ring(v, rep): + def walk_ring(v): vh = hashlib.sha256(v.encode('ascii')).digest() i = (vh[0]<<8 | vh[1]) % (2**partition_bits) - assert len(set([node_dc for node_dc, _ in partitions[i]])) == min(max_replicas, len(dcs)) + assert len(set([node_dc for node_dc, _ in partitions[i]])) == min(REPLICATION_FACTOR, len(dcs)) return [node_id for _, node_id in partitions[i]] - return walk_ring + ring_node_list = [[node_id for _, node_id in p] for p in partitions] -def evaluate_method(walk_ring): + return walk_ring, ring_node_list + +def evaluate_method(method, nodes): + walk_ring, ring_node_list = method(nodes) + print("Ring length:", len(ring_node_list)) + count_partitions_per_node(ring_node_list) + + print("Number of data items per node (100000 simulation):") node_data_counts = {} for i in range(100000): - nodes = walk_ring(f"{i}", 3) - for n in nodes: + inodes = walk_ring(f"{i}") + for n in inodes: if n not in node_data_counts: node_data_counts[n] = 0 node_data_counts[n] += 1 - print("Number of data items per node:") for n, v in sorted(list(node_data_counts.items())): - print(n, ": ", v) + print("-", n, ": ", v) + + dclist_per_ntok = {} + for node_id, _, ntok in nodes: + if ntok not in dclist_per_ntok: + dclist_per_ntok[ntok] = [] + dclist_per_ntok[ntok].append(node_data_counts[node_id]) + list_normalized = [] + for ntok, dclist in dclist_per_ntok.items(): + avg = sum(dclist)/len(dclist) + for v in dclist: + list_normalized.append(v / avg) + print("variance wrt. same-ntok mean:", "%.2f%%"%(np.var(list_normalized)*100)) + + num_changes_sum = [0, 0, 0, 0] + for n in nodes: + nodes2 = [nn for nn in nodes if nn != n] + _, ring_node_list_2 = method(nodes2) + if len(ring_node_list_2) != len(ring_node_list): + continue + num_changes = [0, 0, 0, 0] + for (ns1, ns2) in zip(ring_node_list, ring_node_list_2): + changes = sum(1 for x in ns1 if x not in ns2) + num_changes[changes] += 1 + for i, v in enumerate(num_changes): + num_changes_sum[i] += v / len(ring_node_list) + print("removing", n[1], n[0], ":", " ".join(["%.2f%%"%(x*100/len(ring_node_list)) for x in num_changes])) + print("1-node removal: partitions moved on 0/1/2/3 nodes: ", " ".join(["%.2f%%"%(x*100/len(nodes)) for x in num_changes_sum])) if __name__ == "__main__": @@ -206,46 +246,45 @@ if __name__ == "__main__": ('drosera', 'atuin', 64), ('datura', 'atuin', 64), ('io', 'jupiter', 128)] - method1_walk_ring = method1(nodes) - evaluate_method(method1_walk_ring) + nodes2 = [('digitale', 'atuin', 64), + ('drosera', 'atuin', 64), + ('datura', 'atuin', 64), + ('io', 'jupiter', 128), + ('isou', 'jupiter', 64), + ('mini', 'grog', 32), + ('mixi', 'grog', 32), + ('moxi', 'grog', 32), + ('modi', 'grog', 32), + ('geant', 'grisou', 128), + ('gipsie', 'grisou', 128), + ] + evaluate_method(method1, nodes2) print("------") print("method 2 (custom ring)") - nodes = [('digitale', 'atuin', 10), - ('drosera', 'atuin', 10), - ('datura', 'atuin', 10), - ('io', 'jupiter', 20)] - method2_walk_ring = method2(nodes) - evaluate_method(method2_walk_ring) - - print("------") - print("method 3 (maglev)") nodes = [('digitale', 'atuin', 4), ('drosera', 'atuin', 4), ('datura', 'atuin', 4), - ('io', 'jupiter', 8), - #('mini', 'grog', 2), - #('mixi', 'grog', 2), - #('moxi', 'grog', 2), - #('modi', 'grog', 2), - ] - method3_walk_ring = method3(nodes) - evaluate_method(method3_walk_ring) - - - print("------") - print("method 4 (maglev, multi-dc twist)") - nodes = [('digitale', 'atuin', 8), + ('io', 'jupiter', 8)] + nodes2 = [('digitale', 'atuin', 8), ('drosera', 'atuin', 8), ('datura', 'atuin', 8), ('io', 'jupiter', 16), + ('isou', 'jupiter', 8), ('mini', 'grog', 4), ('mixi', 'grog', 4), ('moxi', 'grog', 4), ('modi', 'grog', 4), ('geant', 'grisou', 16), ('gipsie', 'grisou', 16), - #('isou', 'jupiter', 8), ] - method4_walk_ring = method4(nodes) - evaluate_method(method4_walk_ring) + evaluate_method(method2, nodes2) + + print("------") + print("method 3 (maglev)") + evaluate_method(method3, nodes2) + + + print("------") + print("method 4 (maglev, multi-dc twist)") + evaluate_method(method4, nodes2) diff --git a/src/model/block.rs b/src/model/block.rs index 0e1863cb3..056d9098c 100644 --- a/src/model/block.rs +++ b/src/model/block.rs @@ -426,6 +426,9 @@ impl BlockManager { } fn repair_aux_read_dir_rec<'a>(&'a self, path: &'a PathBuf, must_exit: &'a watch::Receiver) -> BoxFuture<'a, Result<(), Error>> { + // Lists all blocks on disk and adds them to the resync queue. + // This allows us to find blocks we are storing but don't actually need, + // so that we can offload them if necessary and then delete them locally. async move { let mut ls_data_dir = fs::read_dir(path).await?; while let Some(data_dir_ent) = ls_data_dir.next().await {