forked from Deuxfleurs/mknet
140 lines
3.8 KiB
Python
140 lines
3.8 KiB
Python
#!/usr/bin/env python3
|
|
import json, os, sys, time, pathlib, socket, shutil, urllib3
|
|
import minio
|
|
from os import environ as env
|
|
from pathlib import Path
|
|
from . import shared, flavor
|
|
|
|
# Placeholder storage location; only kept when HOST is absent from the
# environment (see the override further down).
storage_path = "./i/am/not/defined"

# Flavor entry of the MinIO build to run; run_minio() reads its 'path' key.
version = flavor.minio["minio-20220917"]

# Unix socket over which the nodes exchange their identities before MinIO
# is started (see deploy_coord() / deploy_follow()).
unix_sock = str(Path(shared.storage_path) / "minio.sock")

# Credentials handed to the S3 client below.
access_key = "minioadmin"
secret_key = "minioadmin"

# S3 client talking plain HTTP to port 9000 on this node's own IP.
# The 2 second timeout without retries presumably exists so that the polling
# loops in this module fail fast and handle retrying themselves.
client = minio.Minio(
    f"[{env['IP']}]:9000",
    access_key=access_key,
    secret_key=secret_key,
    secure=False,
    http_client=urllib3.PoolManager(
        timeout=2,
        retries=False,
    ),
)

# Per-node data directory: <shared storage>/minio/[<ZONE>/]<HOST>.
# The zone component is only inserted when ZONE is set and non-empty.
if 'HOST' in env:
    storage_path = str(Path(shared.storage_path) / "minio" / env['HOST'])
    if 'ZONE' in env and env['ZONE'] != "":
        storage_path = str(Path(shared.storage_path) / "minio" / env['ZONE'] / env['HOST'])

# Files written by run_minio(): the daemon's output streams and its pid.
stdout = Path(storage_path) / "minio.stdout"
stderr = Path(storage_path) / "minio.stderr"
pid = Path(storage_path) / "daemon.pid"
|
|
|
|
def destroy():
    """Kill the daemon recorded in the pid file and remove this node's storage.

    Best effort: any ordinary exception raised by the kill command is
    ignored, and errors while deleting the storage directory are ignored too.
    """
    if os.path.exists(pid):
        try:
            shared.exec(f"kill -9 `cat {pid}`")
        except Exception:
            # A stale pid file (process already gone) must not block cleanup.
            # Catching Exception rather than using a bare `except:` still
            # lets KeyboardInterrupt and SystemExit propagate.
            pass

    # Only paths longer than 8 characters are removed -- apparently a guard
    # against running rmtree on a very short path such as "/" or "/tmp".
    if len(str(storage_path)) > 8:
        shutil.rmtree(storage_path, ignore_errors=True)
|
|
|
|
def deploy_coord():
    """Bootstrap the MinIO cluster from the coordinator node.

    Steps:
      1. Wipe previous state and create a fresh storage directory.
      2. Listen on the shared Unix socket and accept one connection per
         other node (``shared.count() - 1`` connections).
      3. Read one JSON identity line ({"ip": ..., "path": ...}) from each
         connection, append this node's own identity and send the full list
         back to every connection.
      4. Start MinIO and retry until the "sync" bucket exists, creating it
         if needed.
    """
    destroy()
    if os.path.exists(unix_sock):
        os.unlink(unix_sock)

    os.makedirs(storage_path)

    followers = []
    try:
        # The listening socket is only needed while accepting; the context
        # manager closes it as soon as every expected node has connected.
        with socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) as listener:
            listener.bind(unix_sock)
            listener.listen()

            # Create sockets: range(1, count) yields count - 1 iterations,
            # i.e. one connection for every node except this one.
            for _ in range(1, shared.count()):
                conn, _addr = listener.accept()
                followers.append(conn)

        # Receive configurations, centralize them
        others = []
        for conn in followers:
            with conn.makefile() as reader:
                others.append(json.loads(reader.readline()))
        me = [{"ip": os.environ['IP'], "path": storage_path}]
        identities = others + me
        shared.log(f"ident: {identities}")

        # Dispatch them. sendall() is used because send() may transmit only
        # part of the buffer and leaves retrying to the caller.
        msg = f"{json.dumps(identities)}\n".encode()
        for conn in followers:
            conn.sendall(msg)
    finally:
        # Data already queued is still delivered to the peers after close.
        for conn in followers:
            conn.close()

    run_minio(identities)

    # The server needs some time to come up: keep retrying until the "sync"
    # bucket is there, creating it on the first successful request.
    while True:
        try:
            if not client.bucket_exists("sync"):
                client.make_bucket("sync")
            break
        except Exception as e:
            shared.log("waiting for bootstrap...", e)
            time.sleep(1)
    shared.log("ready")
|
|
|
|
def deploy_follow():
    """Join the MinIO cluster from a non-coordinator node.

    Wipes previous state, connects to the coordinator's Unix socket
    (retrying once per second until it succeeds), sends this node's identity
    as one JSON line, reads back the JSON list of all identities, starts
    MinIO with it and finally waits for the "sync" bucket to appear.
    """
    destroy()
    os.makedirs(storage_path)

    # The coordinator may not be listening yet, so retry until connected.
    while True:
        sock = None
        try:
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            sock.connect(unix_sock)
            break
        except Exception as err:
            # Release the failed socket instead of leaking one per attempt.
            if sock is not None:
                sock.close()
            shared.log('conn failed, wait 1 sec, err is', err)
            time.sleep(1)

    # Both the socket and its file wrapper are closed once the exchange ends.
    with sock, sock.makefile() as co:
        # send my identity; sendall() guarantees the whole line is written,
        # whereas send() may transmit only part of it.
        my_identity = json.dumps({"ip": os.environ['IP'], "path": storage_path})
        sock.sendall(f"{my_identity}\n".encode())

        # get all
        identities = json.loads(co.readline())

    run_minio(identities)
    sync_on_bucket_up()
    shared.log("ready")
|
|
|
|
def sync_on_bucket_up():
    """Block until the "sync" bucket is reported as existing by the client.

    Polls once per second. Errors from the client are ignored and the poll
    is simply repeated.
    """
    while True:
        try:
            if client.bucket_exists("sync"):
                break
        except Exception:
            # Request errors just mean "not ready yet"; keep polling.
            # Catching Exception (not a bare `except:`) keeps the loop
            # interruptible with Ctrl-C.
            pass
        shared.log("waiting for bucket 'sync'...")
        time.sleep(1)
|
|
|
|
def sync_on_bucket_down():
    """Block until the "sync" bucket no longer exists.

    Polls once per second. Any error from the client is logged and treated
    as the end of the wait.
    """
    while True:
        try:
            still_present = client.bucket_exists("sync")
        except Exception as err:
            shared.log("the cluster is probably already half shutdown, so errors are expected ->", err)
            return

        if not still_present:
            return
        time.sleep(1)
|
|
|
|
def delete_sync_bucket():
    """Remove the "sync" bucket through the module-level client.

    This is the bucket polled by sync_on_bucket_up() and
    sync_on_bucket_down(). Errors raised by the client propagate to the
    caller.
    """
    bucket_name = "sync"
    client.remove_bucket(bucket_name)
|
|
|
|
def run_minio(identities):
    """Launch the MinIO server in the background via the shell.

    identities: iterable of dicts with "ip" and "path" keys, one per cluster
        member; each becomes an ``http://[ip]:9000<path>`` endpoint argument.

    The server's output is redirected to the module-level ``stdout`` and
    ``stderr`` files and the background job's pid is written to ``pid``.
    The exit status of the shell command is not checked.
    """
    import shlex  # local import so this block does not depend on the file header

    # Required to prevent Minio error: "/tmp/mknet-store/minio/node1` is part of root drive, will not be used"
    # https://github.com/minio/minio/issues/15720
    # Set in os.environ so that the shell spawned below inherits them.
    env['CI'] = "true"
    env['MINIO_CI_CD'] = "true"

    # Endpoints and file paths are shell-quoted: the bracketed IP would
    # otherwise be a glob pattern for the shell, and a path containing
    # whitespace would be split into several words.
    endpoints = [
        shlex.quote(f"http://[{ident['ip']}]:9000{ident['path']}")
        for ident in identities
    ]
    cmd = " ".join([
        # NOTE(review): version['path'] is left unquoted in case the flavor
        # entry carries more than a bare executable path -- confirm.
        f"{version['path']} server --console-address ':9001' --address ':9000'",
        *endpoints,
        f"> {shlex.quote(str(stdout))} 2> {shlex.quote(str(stderr))}",
        f"& echo $! > {shlex.quote(str(pid))}",
    ])

    shared.log("launch: ", cmd)
    os.system(cmd)
|