mknet/scenarios/fragments/garage.py

215 lines
6.1 KiB
Python

import glob, json, time
from os.path import exists
from os import environ as env
from pathlib import Path
from . import shared
try:
import garage_admin_sdk
from garage_admin_sdk.api import nodes_api, layout_api, key_api
from garage_admin_sdk.model.node_cluster_info import NodeClusterInfo
from garage_admin_sdk.model.layout_version import LayoutVersion
from garage_admin_sdk.model.add_key_request import AddKeyRequest
from garage_admin_sdk.model.update_key_request import UpdateKeyRequest
from garage_admin_sdk.model.update_key_request_allow import UpdateKeyRequestAllow
except:
pass
# Placeholder value; the import-time setup below replaces it with a per-node
# directory when the HOST environment variable is set.
storage_path = "./i/am/not/defined"
# Written as `rpc_secret` into the garage.toml generated by daemon().
rpc_secret = "3e9abff5f9e480afbadb46a77b7a26fe0e404258f0dc3fd5386b0ba8e0ad2fba"
# Written as `metrics_token` in the [admin] section of garage.toml.
metrics = "cacce0b2de4bc2d9f5b5fdff551e01ac1496055aed248202d415398987e35f81"
# Written as `admin_token` in garage.toml and used below as the API client's
# access token.
admin = "ae8cb40ea7368bbdbb6430af11cca7da833d3458a5f52086f4e805a570fb5c2a"
# Set by create_key() / sync_on_key_up(); reset to None by delete_key().
key = None
# Known Garage builds. An entry has either a local "path", or a "version" plus
# "target" pair that path() and download() map to a file under
# shared.binary_path.
version_flavor = {
"garage-local": { "path": "./garage/target/release/garage" },
"garage-v0.7": { "version": "v0.7.3", "target": "x86_64-unknown-linux-musl" },
"garage-v0.8": { "version": "89b8087ba81c508ba382aa6c9cb6bb3afa6a43c8", "target": "x86_64-unknown-linux-musl" },
}
# Flavor used by path() when the caller does not pass one explicitly.
version = version_flavor["garage-v0.7"]
# Admin API client for the local node; port 3903 is the one the [admin] section
# written by daemon() binds to.
# NOTE(review): the SDK import above is wrapped in a try/except that swallows
# failures, so if the SDK is missing these lines raise NameError at import time.
configuration = garage_admin_sdk.Configuration(
host = "http://localhost:3903/v0",
access_token = admin
)
api = garage_admin_sdk.ApiClient(configuration)
nodes = nodes_api.NodesApi(api)
layout = layout_api.LayoutApi(api)
keys = key_api.KeyApi(api)
# Setup, launch on import.
# When HOST is set, the node stores its files under
# <shared.storage_path>/garage/[<ZONE>/]<HOST>; the ZONE component is only
# added when ZONE is set to a non-empty value.
if 'HOST' in env:
    storage_path = Path(shared.storage_path) / "garage"
    if env.get('ZONE'):
        storage_path = storage_path / env['ZONE']
    storage_path = storage_path / env['HOST']
    config = storage_path / "garage.toml"
    # Export the config file location through the environment.
    env['GARAGE_CONFIG_FILE'] = str(config)
def deploy_coord(version=None, target=None):
    """Deploy this node as coordinator.

    Wipes any previous state, then runs the bring-up steps in order (daemon,
    node discovery, layout, key creation), logging a message before each step.

    `version` and `target` are accepted but not used by this function.
    """
    destroy()
    steps = (
        ("start daemon", daemon),
        ("discover nodes", connect),
        ("build layout", create_layout),
        ("create key", create_key),
    )
    for message, step in steps:
        shared.log(message)
        step()
    shared.log("ready")
def deploy_follow(version=None, target=None):
    """Deploy this node as follower.

    Wipes any previous state, starts the daemon, then blocks in
    sync_on_key_up() until the "mknet" key can be found.

    `version` and `target` are accepted but not used by this function.
    """
    destroy()
    steps = (
        ("start daemon", daemon),
        ("wait for coord", sync_on_key_up),
    )
    for message, step in steps:
        shared.log(message)
        step()
    shared.log("ready")
def path(vers=None):
    """Return the location of the Garage binary for a flavor.

    Args:
        vers: a flavor entry (as found in `version_flavor`); defaults to the
            module-level `version` when None.

    Returns:
        The entry's "path" value unchanged when it has one, otherwise
        `shared.binary_path / "garage-<target>-<version>"` as a Path.
    """
    flavor = version if vers is None else vers
    if "path" not in flavor:
        # Downloaded builds are stored under a name derived from target and version.
        binary = f"garage-{flavor['target']}-{flavor['version']}"
        return Path(shared.binary_path) / binary
    return flavor["path"]
def download():
    """Fetch every non-local Garage flavor that is not already on disk.

    Flavors with a "path" entry are skipped, as are flavors whose binary
    already exists. Each remaining one is downloaded with wget into
    shared.binary_path, made executable, and run once with --version.
    """
    for spec in version_flavor.values():
        if "path" in spec:
            continue
        binary = path(vers=spec)
        if exists(binary):
            continue
        url = f"https://garagehq.deuxfleurs.fr/_releases/{spec['version']}/{spec['target']}/garage"
        shared.exec(f"mkdir -p {shared.binary_path}")
        shared.exec(f"wget {url} -O {binary}")
        shared.exec(f"chmod +x {binary}")
        shared.exec(f"{binary} --version")
def daemon():
    """Write this node's garage.toml, start the server, and publish node_info.

    Steps:
      1. Create `storage_path` and write the configuration file at `config`.
      2. Launch `<binary> server` in the background, appending its output to
         logs.stderr / logs.stdout and recording its PID in daemon.pid.
      3. Ask the local admin API for the node id (through shared.fn_retry) and
         write a JSON `node_info` file that cluster_info() later globs for.

    Reads IP, HOST and (optionally) ZONE from the environment.
    """
    shared.exec(f"mkdir -p {storage_path}")
    with open(config, 'w+') as toml_file:
        toml_file.write(f"""
metadata_dir = "{storage_path}/meta"
data_dir = "{storage_path}/data"
replication_mode = "3"
rpc_bind_addr = "[::]:3901"
rpc_public_addr = "[{env['IP']}]:3901"
rpc_secret = "{rpc_secret}"
bootstrap_peers=[]
[s3_api]
s3_region = "garage"
api_bind_addr = "[::]:3900"
root_domain = ".s3.garage"
[s3_web]
bind_addr = "[::]:3902"
root_domain = ".web.garage"
index = "index.html"
[admin]
api_bind_addr = "0.0.0.0:3903"
metrics_token = "{metrics}"
admin_token = "{admin}"
""")

    launch_cmd = f"{path()} server 2>> {storage_path}/logs.stderr 1>> {storage_path}/logs.stdout & echo $! > {storage_path}/daemon.pid"
    shared.exec(launch_cmd)
    # Give the freshly spawned process a moment before querying its admin API.
    time.sleep(1)

    node_info = storage_path / "node_info"
    node_id = shared.fn_retry(lambda: nodes.get_nodes().node)
    # Fall back to the host name as zone when ZONE is unset or empty.
    zone = env.get('ZONE') or env['HOST']
    record = {
        "node_addr": f"{node_id}@{env['IP']}:3901",
        "node_id": node_id,
        "zone": zone,
        "host": env['HOST'],
    }
    with open(node_info, 'w+') as info_file:
        info_file.write(json.dumps(record))
def destroy():
    """Stop a previously started daemon (if any) and delete this node's storage.

    If `<storage_path>/daemon.pid` exists, the recorded process is killed
    (best effort) and the PID file removed. The whole `storage_path` directory
    is then deleted with `rm -fr`.

    Raises:
        Exception: when `storage_path` is shorter than 8 characters, as a crude
            guard against wiping something like the filesystem root.
    """
    dpid = Path(storage_path) / "daemon.pid"
    if exists(dpid):
        try:
            shared.exec(f"kill -9 $(cat {dpid})")
        except Exception:
            # Best effort: a failed kill (presumably a stale PID) must not
            # abort the cleanup. Unlike a bare `except`, this still lets
            # KeyboardInterrupt / SystemExit propagate.
            pass
        shared.exec(f"rm -f {dpid}")
    if len(str(storage_path)) < 8: # arbitrary, stupid safe guard
        print(storage_path)
        raise Exception("You tried to clean a storage path that might be the root of your FS, panicking...")
    shared.exec(f"rm -fr {storage_path}")
# this function is ugly, sorry :s
# Cache for cluster_info(): None until the first call has completed.
_cluster_info = None
def cluster_info():
    """Return the parsed node_info records of all nodes, waiting for them if needed.

    On the first call, polls once per second (sleeping before each check) for
    `node_info` files anywhere under shared.storage_path until their number
    equals shared.count(), then parses each file as JSON. The resulting list is
    cached in `_cluster_info` and returned directly on later calls.
    """
    global _cluster_info
    if _cluster_info is None:
        while True:
            time.sleep(1)
            node_files = glob.glob(f"{shared.storage_path}/**/node_info", recursive=True)
            if len(node_files) == shared.count():
                _cluster_info = [json.loads(Path(f).read_text()) for f in node_files]
                break
    return _cluster_info
def connect():
    """Ask the local node to connect to every node listed by cluster_info().

    Raises:
        Exception: if any entry of the add_node response has a falsy
            `success`; the full response is attached as the second argument.
    """
    peers = [node['node_addr'] for node in cluster_info()]
    statuses = nodes.add_node(peers)
    if not all(status.success for status in statuses):
        raise Exception("Node connect failed", statuses)
    shared.log("all nodes connected")
def create_layout():
    """Stage a layout containing every known node and apply it.

    Each node from cluster_info() is added with its zone, capacity 1 and its
    host name as the only tag. The layout is then applied as the currently
    reported layout version plus one.
    """
    current_version = layout.get_layout().version
    roles = {
        node['node_id']: NodeClusterInfo(
            zone=node['zone'],
            capacity=1,
            tags=[node['host']],
        )
        for node in cluster_info()
    }
    layout.add_layout(roles)
    layout.apply_layout(LayoutVersion(version=current_version + 1))
def create_key():
    """Create the "mknet" key, allow it to create buckets, and store it in `key`.

    The add_key call goes through shared.fn_retry; the permission update is
    attempted once.
    """
    global key
    created = shared.fn_retry(lambda: keys.add_key(AddKeyRequest(name="mknet")))
    keys.update_key(
        created.access_key_id,
        UpdateKeyRequest(allow=UpdateKeyRequestAllow(create_bucket=True)),
    )
    key = created
def delete_key():
    """Delete the key currently stored in `key` and reset `key` to None."""
    global key
    access_key_id = key.access_key_id
    keys.delete_key(access_key_id)
    key = None
def sync_on_key_up():
    """Block until the "mknet" key can be looked up, then store and return it.

    Calls `keys.search_key("mknet")` in a loop, sleeping one second after each
    failed attempt. There is no timeout.

    Returns:
        The value returned by `keys.search_key("mknet")`, which is also
        assigned to the module-level `key`.
    """
    global key
    while True:
        try:
            key = keys.search_key("mknet")
            return key
        except Exception:
            # Any failed lookup is treated as "not there yet". Catching
            # Exception instead of using a bare `except` keeps Ctrl-C and
            # SystemExit able to break out of this otherwise endless loop.
            pass
        time.sleep(1)
def sync_on_key_down():
    """Block until looking up the "mknet" key fails.

    Calls `keys.search_key("mknet")` once per second and returns as soon as
    the call raises. There is no timeout.
    """
    while True:
        try:
            keys.search_key("mknet")
        except Exception:
            # NOTE(review): any lookup error counts as "key is gone", not only
            # a not-found answer. Exception (rather than a bare `except`) is
            # caught so that Ctrl-C / SystemExit propagate instead of being
            # mistaken for the key having been deleted.
            return
        time.sleep(1)