From c9cbe5fc5250d9df9ade95e01aab7ae420c33197 Mon Sep 17 00:00:00 2001 From: Quentin Dufour Date: Fri, 23 Sep 2022 17:55:45 +0200 Subject: [PATCH] Add some support for minio --- scenarios/fragments/flavor.py | 26 ++++++- scenarios/fragments/minio.py | 136 ++++++++++++++++++++++++++-------- scenarios/minio-manual | 12 +++ scenarios/requirements.txt | 1 + topo/1mbps.yml | 3 +- 5 files changed, 144 insertions(+), 34 deletions(-) create mode 100755 scenarios/minio-manual diff --git a/scenarios/fragments/flavor.py b/scenarios/fragments/flavor.py index eeb8345..87c4e9d 100644 --- a/scenarios/fragments/flavor.py +++ b/scenarios/fragments/flavor.py @@ -2,14 +2,20 @@ from pathlib import Path from . import shared from os.path import exists -def add_path(d): +def grg_path(d): for flav, desc in d.items(): if "path" in desc: continue binary = f"garage-{desc['target']}-{desc['version']}" desc['path'] = Path(shared.binary_path) / binary return d -garage = add_path({ +def minio_path(d): + for flav, desc in d.items(): + if "path" in desc: continue + desc['path'] = Path(shared.binary_path) / flav + return d + +garage = grg_path({ "garage-local": { "path": "./garage/target/release/garage" }, "garage-v0.7": { "version": "v0.7.3", @@ -23,6 +29,12 @@ warp = { "warp-default": "mixed" } +minio = minio_path({ + "minio-20220917": { + "version": "2022-09-17T00-09-45Z" + } +}) + def download(): for flav, desc in garage.items(): @@ -33,3 +45,13 @@ def download(): shared.exec(f"wget https://garagehq.deuxfleurs.fr/_releases/{desc['version']}/{desc['target']}/garage -O {desc['path']}") shared.exec(f"chmod +x {desc['path']}") shared.exec(f"{desc['path']} --version") + + for flav, desc in minio.items(): + if "version" not in desc: continue + if exists(desc['path']): continue + + shared.exec(f"mkdir -p {shared.binary_path}") + shared.exec(f"wget https://dl.min.io/server/minio/release/linux-amd64/archive/minio.RELEASE.{desc['version']} -O {desc['path']}") + shared.exec(f"chmod +x {desc['path']}") + shared.exec(f"{desc['path']} --version") + diff --git a/scenarios/fragments/minio.py b/scenarios/fragments/minio.py index 431b983..ab6f476 100644 --- a/scenarios/fragments/minio.py +++ b/scenarios/fragments/minio.py @@ -1,62 +1,136 @@ #!/usr/bin/env python3 -import json, os, sys, time, pathlib, socket, shutil +import json, os, sys, time, pathlib, socket, shutil, urllib3 +import minio +from os import environ as env +from pathlib import Path +from . import shared, flavor -STORAGE_PATH = os.path.join(os.getcwd(), '.minio-testnet') -HOSTS_PATH = os.path.join(STORAGE_PATH, 'hosts.txt') -UNIX_SOCK = os.path.join(STORAGE_PATH, 'deploy.sock') -DATA_PATH = lambda nid: os.path.join(STORAGE_PATH, 'data'+str(nid)) +storage_path = "./i/am/not/defined" +version = flavor.minio["minio-20220917"] +unix_sock = str(Path(shared.storage_path) / "minio.sock") +access_key = "minioadmin" +secret_key = "minioadmin" +client = minio.Minio( + f"[{env['IP']}]:9000", + access_key="minioadmin", + secret_key="minioadmin", + secure=False, + http_client=urllib3.PoolManager( + timeout=5, + retries=1, + ) +) -def main(): - if int(os.environ['ID']) == 1: leader() - else: follower() +if 'HOST' in env: + storage_path = str(Path(shared.storage_path) / "minio" / env['HOST']) + if 'ZONE' in env and env['ZONE'] != "": + storage_path = str(Path(shared.storage_path) / "minio" / env['ZONE'] / env['HOST']) -def leader(): - shutil.rmtree(STORAGE_PATH, ignore_errors=True) - os.makedirs(STORAGE_PATH) - print(STORAGE_PATH) +stdout = Path(storage_path) / "minio.stdout" +stderr = Path(storage_path) / "minio.stderr" +pid = Path(storage_path) / "daemon.pid" + +def destroy(): + if os.path.exists(pid): + try: + shared.exec(f"kill -9 `cat {pid}`") + except: + pass + + if len(str(storage_path)) > 8: + shutil.rmtree(storage_path, ignore_errors=True) + +def deploy_coord(): + destroy() + if os.path.exists(unix_sock): + os.unlink(unix_sock) + + os.makedirs(storage_path) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.bind(UNIX_SOCK) + sock.bind(unix_sock) sock.listen() - n_serv = int(os.environ['SERVER_COUNT']) - fl = [ co for co, addr in [ sock.accept() for i in range(n_serv - 1) ]] + # Create sockets + fl = [ co for co, addr in [ sock.accept() for i in range(1, shared.count()) ]] - identities = [ json.loads(co.makefile().readline()) for co in fl ] + [ { "ip": os.environ['IP'], "path": make_data() } ] - print(f"ident: {identities}") + # Receive configurations, centralize them + me = [ { "ip": os.environ['IP'], "path": storage_path } ] + others = [ json.loads(co.makefile().readline()) for co in fl ] + identities = others + me + shared.log(f"ident: {identities}") + + # Dispatch them msg = f"{json.dumps(identities)}\n".encode() [ co.send(msg) for co in fl ] run_minio(identities) -def follower(): + while True: + try: + if client.bucket_exists("sync"): break + client.make_bucket("sync") + break + except Exception as e: + shared.log("waiting for bootstrap...", e) + time.sleep(1) + shared.log("ready") + +def deploy_follow(): + destroy() + os.makedirs(storage_path) + co = None while True: - time.sleep(1) try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - sock.connect(UNIX_SOCK) + sock.connect(unix_sock) co = sock.makefile() break except Exception as err: - print('conn failed, wait,', err) - my_identity = json.dumps({ "ip": os.environ['IP'], "path": make_data() }) + shared.log('conn failed, wait 1 sec, err is', err) + time.sleep(1) + + # send my identity + my_identity = json.dumps({ "ip": os.environ['IP'], "path": storage_path }) sock.send(f"{my_identity}\n".encode()) + + # get all identities = json.loads(co.readline()) run_minio(identities) + sync_on_bucket_up() + shared.log("ready") -def make_data(): - data_path = DATA_PATH(os.environ['ID']) - os.makedirs(data_path) - return data_path +def sync_on_bucket_up(): + while True: + try: + if client.bucket_exists("sync"): break + except: + pass + shared.log("waiting for bucket 'sync'...") + time.sleep(1) + +def sync_on_bucket_down(): + while True: + if not client.bucket_exists("sync"): break + time.sleep(1) + +def delete_sync_bucket(): + client.remove_bucket("sync") def run_minio(identities): - cmd = f"minio server --console-address ':9001' --address ':9000'" + + # Required to prevent Minio error: "/tmp/mknet-store/minio/node1` is part of root drive, will not be used" + # https://github.com/minio/minio/issues/15720 + env['CI'] = "true" + env['MINIO_CI_CD'] = "true" + + cmd = f"{version['path']} server --console-address ':9001' --address ':9000'" for ident in identities: cmd += f" http://[{ident['ip']}]:9000{ident['path']}" - cmd += f" > {os.path.join(STORAGE_PATH, 'minio'+os.environ['ID']+'.log')} 2>&1" - print("launch: ", cmd) - os.system(cmd) + cmd += f" > {stdout} 2> {stderr}" + cmd += f" & echo $! > {pid}" -__name__ == '__main__' and main() + shared.log("launch: ", cmd) + os.system(cmd) diff --git a/scenarios/minio-manual b/scenarios/minio-manual new file mode 100755 index 0000000..d151584 --- /dev/null +++ b/scenarios/minio-manual @@ -0,0 +1,12 @@ +#!/usr/bin/env python3 +from fragments import shared, flavor, minio +import sys + +for fl in sys.argv[1:]: + if fl in flavor.minio: + minio.version = flavor.minio[fl] + +if shared.id() == 1: + minio.deploy_coord() +else: + minio.deploy_follow() diff --git a/scenarios/requirements.txt b/scenarios/requirements.txt index 41a5912..b85592f 100644 --- a/scenarios/requirements.txt +++ b/scenarios/requirements.txt @@ -1 +1,2 @@ git+https://git.deuxfleurs.fr/quentin/garage-admin-sdk@7b1c1faf7a#egg=garage-admin-sdk&subdirectory=python +minio diff --git a/topo/1mbps.yml b/topo/1mbps.yml index 0508a18..e4e04dd 100644 --- a/topo/1mbps.yml +++ b/topo/1mbps.yml @@ -2,7 +2,8 @@ links: - &slow bandwidth: 1M latency: 500us - txqueuelen: 10 + #txqueuelen: 10 + limit: 10 - &1000 bandwidth: 1000M latency: 100us