forked from Deuxfleurs/mknet
229 lines
6.5 KiB
Python
229 lines
6.5 KiB
Python
import glob, json, time
|
|
from os.path import exists
|
|
from os import environ as env
|
|
from pathlib import Path
|
|
from . import shared, flavor
|
|
|
|
import garage_admin_sdk
|
|
from garage_admin_sdk.api import nodes_api, layout_api, key_api
|
|
from garage_admin_sdk.model.node_cluster_info import NodeClusterInfo
|
|
from garage_admin_sdk.model.layout_version import LayoutVersion
|
|
from garage_admin_sdk.model.add_key_request import AddKeyRequest
|
|
from garage_admin_sdk.model.update_key_request import UpdateKeyRequest
|
|
from garage_admin_sdk.model.update_key_request_allow import UpdateKeyRequestAllow
|
|
|
|
storage_path = "./i/am/not/defined"
|
|
rpc_secret = "3e9abff5f9e480afbadb46a77b7a26fe0e404258f0dc3fd5386b0ba8e0ad2fba"
|
|
metrics = "cacce0b2de4bc2d9f5b5fdff551e01ac1496055aed248202d415398987e35f81"
|
|
admin = "ae8cb40ea7368bbdbb6430af11cca7da833d3458a5f52086f4e805a570fb5c2a"
|
|
key = None
|
|
|
|
version = flavor.garage["garage-v0.7"]
|
|
|
|
configuration = garage_admin_sdk.Configuration(
|
|
host = "http://localhost:3903/v0",
|
|
access_token = admin
|
|
)
|
|
api = garage_admin_sdk.ApiClient(configuration)
|
|
nodes = nodes_api.NodesApi(api)
|
|
layout = layout_api.LayoutApi(api)
|
|
keys = key_api.KeyApi(api)
|
|
|
|
|
|
# Setup, launch on import
|
|
if 'HOST' in env:
|
|
storage_path = Path(shared.storage_path) / "garage" / env['HOST']
|
|
if 'ZONE' in env and env['ZONE'] != "":
|
|
storage_path = Path(shared.storage_path) / "garage" / env['ZONE'] / env['HOST']
|
|
config = storage_path / "garage.toml"
|
|
env['GARAGE_CONFIG_FILE'] = str(config)
|
|
|
|
def deploy_coord(gw=None, uroot={}, us3_api={}, us3_web={}, uadmin={}):
|
|
destroy()
|
|
shared.log("start daemon")
|
|
shared.exec(f"{version['path']} --version")
|
|
daemon(uroot,us3_api,us3_web,uadmin)
|
|
shared.log("discover nodes")
|
|
connect()
|
|
shared.log("build layout")
|
|
create_layout(gw=gw)
|
|
shared.log("create key")
|
|
create_key()
|
|
shared.log("ready")
|
|
|
|
def deploy_follow(uroot={}, us3_api={}, us3_web={}, uadmin={}):
|
|
destroy()
|
|
shared.log("start daemon")
|
|
daemon(uroot,us3_api,us3_web,uadmin)
|
|
shared.log("wait for coord")
|
|
sync_on_key_up()
|
|
shared.log("ready")
|
|
|
|
def to_toml(d): return "\n".join([ f"{k} = {v}" if type(v) is int else f"{k} = \"{v}\"" for k, v in d.items() ])
|
|
def daemon(uroot={}, us3_api={}, us3_web={}, uadmin={}):
|
|
root = {
|
|
"metadata_dir": f"{storage_path}/meta",
|
|
"data_dir": f"{storage_path}/data",
|
|
"replication_mode": "3",
|
|
"rpc_bind_addr": "[::]:3901",
|
|
"rpc_public_addr": f"[{env['IP']}]:3901",
|
|
"rpc_secret": f"{rpc_secret}",
|
|
}
|
|
root.update(uroot)
|
|
|
|
s3_api = {
|
|
"s3_region": "garage",
|
|
"api_bind_addr": "[::]:3900",
|
|
"root_domain": ".s3.garage",
|
|
}
|
|
s3_api.update(us3_api)
|
|
|
|
s3_web = {
|
|
"bind_addr": "[::]:3902",
|
|
"root_domain": ".web.garage",
|
|
"index": "index.html",
|
|
}
|
|
s3_web.update(us3_web)
|
|
|
|
sect_admin = {
|
|
"api_bind_addr": "0.0.0.0:3903",
|
|
"metrics_token": f"{metrics}",
|
|
"admin_token": f"{admin}",
|
|
}
|
|
sect_admin.update(uadmin)
|
|
|
|
|
|
shared.exec(f"mkdir -p {storage_path}")
|
|
with open(config, 'w+') as f:
|
|
f.write(f"""
|
|
{to_toml(root)}
|
|
bootstrap_peers = []
|
|
|
|
[s3_api]
|
|
{to_toml(s3_api)}
|
|
|
|
[s3_web]
|
|
{to_toml(s3_web)}
|
|
|
|
[admin]
|
|
{to_toml(sect_admin)}
|
|
""")
|
|
|
|
if shared.id() == 1:
|
|
shared.exec(f"{version['path']} --version")
|
|
shared.exec(f"{version['path']} server 2>> {storage_path}/logs.stderr 1>> {storage_path}/logs.stdout & echo $! > {storage_path}/daemon.pid")
|
|
time.sleep(1)
|
|
|
|
node_info = storage_path / "node_info"
|
|
node_id = shared.fn_retry(lambda: nodes.get_nodes().node)
|
|
with open(node_info, 'w+') as f:
|
|
f.write(json.dumps({
|
|
"node_addr": f"{node_id}@{env['IP']}:3901",
|
|
"node_id": node_id,
|
|
"zone": env['ZONE'] if 'ZONE' in env and env['ZONE'] != "" else env['HOST'],
|
|
"host": env['HOST'],
|
|
}))
|
|
|
|
def destroy():
|
|
dpid = Path(storage_path) / "daemon.pid"
|
|
if exists(dpid):
|
|
try:
|
|
shared.exec(f"kill -9 $(cat {dpid})")
|
|
except:
|
|
pass
|
|
shared.exec(f"rm -f {dpid}")
|
|
if len(str(storage_path)) < 8: # arbitrary, stupid safe guard
|
|
print(storage_path)
|
|
raise Exception("You tried to clean a storage path that might be the root of your FS, panicking...")
|
|
shared.exec(f"rm -fr {storage_path}")
|
|
|
|
# this function is ugly, sorry :s
|
|
_cluster_info = None
|
|
def cluster_info():
|
|
global _cluster_info
|
|
if _cluster_info is not None: return _cluster_info
|
|
shared.log("fetch cluster info")
|
|
|
|
while True:
|
|
node_files = glob.glob(f"{shared.storage_path}/**/node_info", recursive=True)
|
|
if len(node_files) >= shared.count(): break
|
|
shared.log(f"found {len(node_files)} over {shared.count()}, wait 1 sec.")
|
|
time.sleep(1)
|
|
|
|
_cluster_info = [ json.loads(Path(f).read_text()) for f in node_files ]
|
|
return _cluster_info
|
|
|
|
|
|
def connect():
|
|
cinf = cluster_info()
|
|
shared.log("start connections...")
|
|
while True:
|
|
try:
|
|
ret = nodes.add_node([n['node_addr'] for n in cinf], _request_timeout=3)
|
|
except:
|
|
shared.log("not ready, retry in 1sec")
|
|
time.sleep(1)
|
|
continue
|
|
|
|
for st in ret:
|
|
if not st.success:
|
|
continue
|
|
#raise Exception("Node connect failed", ret)
|
|
break
|
|
|
|
shared.log("all nodes connected")
|
|
|
|
def create_layout(gw=None):
|
|
if gw is None:
|
|
gw = []
|
|
|
|
v = layout.get_layout().version
|
|
|
|
cinf = cluster_info()
|
|
nlay = dict()
|
|
for n in cinf:
|
|
capa = 1
|
|
if n['host'] in gw:
|
|
capa = None
|
|
|
|
nlay[n['node_id']] = NodeClusterInfo(
|
|
zone = n['zone'],
|
|
capacity = capa,
|
|
tags = [ n['host'] ],
|
|
)
|
|
layout.add_layout(nlay)
|
|
layout.apply_layout(LayoutVersion(version=v+1))
|
|
|
|
shared.log(layout.get_layout())
|
|
|
|
def create_key():
|
|
global key
|
|
kinfo = shared.fn_retry(lambda: keys.add_key(AddKeyRequest(name="mknet")))
|
|
allow_create = UpdateKeyRequestAllow(create_bucket=True)
|
|
keys.update_key(kinfo.access_key_id, UpdateKeyRequest(allow=allow_create))
|
|
key = kinfo
|
|
|
|
|
|
def delete_key():
|
|
global key
|
|
keys.delete_key(key.access_key_id)
|
|
key = None
|
|
|
|
def sync_on_key_up():
|
|
global key
|
|
while True:
|
|
try:
|
|
key = keys.search_key("mknet")
|
|
return key
|
|
except:
|
|
pass
|
|
time.sleep(1)
|
|
|
|
def sync_on_key_down():
|
|
while True:
|
|
try:
|
|
keys.search_key("mknet")
|
|
except:
|
|
return
|
|
time.sleep(1)
|
|
|