#!/usr/bin/env python3 import json, os, sys, time, pathlib, socket, shutil, urllib3 import minio from os import environ as env from pathlib import Path from . import shared, flavor storage_path = "./i/am/not/defined" version = flavor.minio["minio-20220917"] unix_sock = str(Path(shared.storage_path) / "minio.sock") access_key = "minioadmin" secret_key = "minioadmin" client = minio.Minio( f"[{env['IP']}]:9000", access_key="minioadmin", secret_key="minioadmin", secure=False, http_client=urllib3.PoolManager( timeout=2, retries=False, ) ) if 'HOST' in env: storage_path = str(Path(shared.storage_path) / "minio" / env['HOST']) if 'ZONE' in env and env['ZONE'] != "": storage_path = str(Path(shared.storage_path) / "minio" / env['ZONE'] / env['HOST']) stdout = Path(storage_path) / "minio.stdout" stderr = Path(storage_path) / "minio.stderr" pid = Path(storage_path) / "daemon.pid" def destroy(): if os.path.exists(pid): try: shared.exec(f"kill -9 `cat {pid}`") except: pass if len(str(storage_path)) > 8: shutil.rmtree(storage_path, ignore_errors=True) def deploy_coord(): destroy() if os.path.exists(unix_sock): os.unlink(unix_sock) os.makedirs(storage_path) sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.bind(unix_sock) sock.listen() # Create sockets fl = [ co for co, addr in [ sock.accept() for i in range(1, shared.count()) ]] # Receive configurations, centralize them me = [ { "ip": os.environ['IP'], "path": storage_path } ] others = [ json.loads(co.makefile().readline()) for co in fl ] identities = others + me shared.log(f"ident: {identities}") # Dispatch them msg = f"{json.dumps(identities)}\n".encode() [ co.send(msg) for co in fl ] run_minio(identities) while True: try: if client.bucket_exists("sync"): break client.make_bucket("sync") break except Exception as e: shared.log("waiting for bootstrap...", e) time.sleep(1) shared.log("ready") def deploy_follow(): destroy() os.makedirs(storage_path) co = None while True: try: sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) sock.connect(unix_sock) co = sock.makefile() break except Exception as err: shared.log('conn failed, wait 1 sec, err is', err) time.sleep(1) # send my identity my_identity = json.dumps({ "ip": os.environ['IP'], "path": storage_path }) sock.send(f"{my_identity}\n".encode()) # get all identities = json.loads(co.readline()) run_minio(identities) sync_on_bucket_up() shared.log("ready") def sync_on_bucket_up(): while True: try: if client.bucket_exists("sync"): break except: pass shared.log("waiting for bucket 'sync'...") time.sleep(1) def sync_on_bucket_down(): while True: try: if not client.bucket_exists("sync"): break time.sleep(1) except Exception as e: shared.log("the cluster is probably already half shutdown, so errors are expected ->", e) break def delete_sync_bucket(): client.remove_bucket("sync") def run_minio(identities): # Required to prevent Minio error: "/tmp/mknet-store/minio/node1` is part of root drive, will not be used" # https://github.com/minio/minio/issues/15720 env['CI'] = "true" env['MINIO_CI_CD'] = "true" cmd = f"{version['path']} server --console-address ':9001' --address ':9000'" for ident in identities: cmd += f" http://[{ident['ip']}]:9000{ident['path']}" cmd += f" > {stdout} 2> {stderr}" cmd += f" & echo $! > {pid}" shared.log("launch: ", cmd) os.system(cmd)