garage-admin-sdk-python/garage_admin_sdk/rest.py

352 lines
14 KiB
Python

"""
Garage Administration API v0+garage-v0.9.0
Administrate your Garage cluster programatically, including status, layout, keys, buckets, and maintainance tasks. *Disclaimer: The API is not stable yet, hence its v0 tag. The API can change at any time, and changes can include breaking backward compatibility. Read the changelog and upgrade your scripts before upgrading. Additionnaly, this specification is very early stage and can contain bugs, especially on error return codes/types that are not tested yet. Do not expect a well finished and polished product!* # noqa: E501
The version of the OpenAPI document: v0.9.0
Generated by: https://openapi-generator.tech
"""
import io
import json
import logging
import re
import ssl
from urllib.parse import urlencode
from urllib.parse import urlparse
from urllib.request import proxy_bypass_environment
import urllib3
import ipaddress
from garage_admin_sdk.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError
logger = logging.getLogger(__name__)
class RESTResponse(io.IOBase):
def __init__(self, resp):
self.urllib3_response = resp
self.status = resp.status
self.reason = resp.reason
self.data = resp.data
def getheaders(self):
"""Returns a dictionary of the response headers."""
return self.urllib3_response.getheaders()
def getheader(self, name, default=None):
"""Returns a given response header."""
return self.urllib3_response.getheader(name, default)
class RESTClientObject(object):
def __init__(self, configuration, pools_size=4, maxsize=None):
# urllib3.PoolManager will pass all kw parameters to connectionpool
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501
# https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501
# maxsize is the number of requests to host that are allowed in parallel # noqa: E501
# Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501
# cert_reqs
if configuration.verify_ssl:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
addition_pool_args = {}
if configuration.assert_hostname is not None:
addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501
if configuration.retries is not None:
addition_pool_args['retries'] = configuration.retries
if configuration.socket_options is not None:
addition_pool_args['socket_options'] = configuration.socket_options
if maxsize is None:
if configuration.connection_pool_maxsize is not None:
maxsize = configuration.connection_pool_maxsize
else:
maxsize = 4
# https pool manager
if configuration.proxy and not should_bypass_proxies(
configuration.host, no_proxy=configuration.no_proxy or ''):
self.pool_manager = urllib3.ProxyManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=configuration.ssl_ca_cert,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
proxy_url=configuration.proxy,
proxy_headers=configuration.proxy_headers,
**addition_pool_args
)
else:
self.pool_manager = urllib3.PoolManager(
num_pools=pools_size,
maxsize=maxsize,
cert_reqs=cert_reqs,
ca_certs=configuration.ssl_ca_cert,
cert_file=configuration.cert_file,
key_file=configuration.key_file,
**addition_pool_args
)
def request(self, method, url, query_params=None, headers=None,
body=None, post_params=None, _preload_content=True,
_request_timeout=None):
"""Perform requests.
:param method: http request method
:param url: http request url
:param query_params: query parameters in the url
:param headers: http request headers
:param body: request json body, for `application/json`
:param post_params: request post parameters,
`application/x-www-form-urlencoded`
and `multipart/form-data`
:param _preload_content: if False, the urllib3.HTTPResponse object will
be returned without reading/decoding response
data. Default is True.
:param _request_timeout: timeout setting for this request. If one
number provided, it will be total request
timeout. It can also be a pair (tuple) of
(connection, read) timeouts.
"""
method = method.upper()
assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT',
'PATCH', 'OPTIONS']
if post_params and body:
raise ApiValueError(
"body parameter cannot be used with post_params parameter."
)
post_params = post_params or {}
headers = headers or {}
timeout = None
if _request_timeout:
if isinstance(_request_timeout, (int, float)): # noqa: E501,F821
timeout = urllib3.Timeout(total=_request_timeout)
elif (isinstance(_request_timeout, tuple) and
len(_request_timeout) == 2):
timeout = urllib3.Timeout(
connect=_request_timeout[0], read=_request_timeout[1])
try:
# For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE`
if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']:
# Only set a default Content-Type for POST, PUT, PATCH and OPTIONS requests
if (method != 'DELETE') and ('Content-Type' not in headers):
headers['Content-Type'] = 'application/json'
if query_params:
url += '?' + urlencode(query_params)
if ('Content-Type' not in headers) or (re.search('json',
headers['Content-Type'], re.IGNORECASE)):
request_body = None
if body is not None:
request_body = json.dumps(body)
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=False,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
elif headers['Content-Type'] == 'multipart/form-data':
# must del headers['Content-Type'], or the correct
# Content-Type which generated by urllib3 will be
# overwritten.
del headers['Content-Type']
r = self.pool_manager.request(
method, url,
fields=post_params,
encode_multipart=True,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
# Pass a `string` parameter directly in the body to support
# other content types than Json when `body` argument is
# provided in serialized form
elif isinstance(body, str) or isinstance(body, bytes):
request_body = body
r = self.pool_manager.request(
method, url,
body=request_body,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
else:
# Cannot generate the request from given parameters
msg = """Cannot prepare a request message for provided
arguments. Please check that your arguments match
declared content type."""
raise ApiException(status=0, reason=msg)
# For `GET`, `HEAD`
else:
r = self.pool_manager.request(method, url,
fields=query_params,
preload_content=_preload_content,
timeout=timeout,
headers=headers)
except urllib3.exceptions.SSLError as e:
msg = "{0}\n{1}".format(type(e).__name__, str(e))
raise ApiException(status=0, reason=msg)
if _preload_content:
r = RESTResponse(r)
# log response body
logger.debug("response body: %s", r.data)
if not 200 <= r.status <= 299:
if r.status == 401:
raise UnauthorizedException(http_resp=r)
if r.status == 403:
raise ForbiddenException(http_resp=r)
if r.status == 404:
raise NotFoundException(http_resp=r)
if 500 <= r.status <= 599:
raise ServiceException(http_resp=r)
raise ApiException(http_resp=r)
return r
def GET(self, url, headers=None, query_params=None, _preload_content=True,
_request_timeout=None):
return self.request("GET", url,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
query_params=query_params)
def HEAD(self, url, headers=None, query_params=None, _preload_content=True,
_request_timeout=None):
return self.request("HEAD", url,
headers=headers,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
query_params=query_params)
def OPTIONS(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("OPTIONS", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def DELETE(self, url, headers=None, query_params=None, body=None,
_preload_content=True, _request_timeout=None):
return self.request("DELETE", url,
headers=headers,
query_params=query_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def POST(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("POST", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def PUT(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("PUT", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
def PATCH(self, url, headers=None, query_params=None, post_params=None,
body=None, _preload_content=True, _request_timeout=None):
return self.request("PATCH", url,
headers=headers,
query_params=query_params,
post_params=post_params,
_preload_content=_preload_content,
_request_timeout=_request_timeout,
body=body)
# end of class RESTClientObject
def is_ipv4(target):
""" Test if IPv4 address or not
"""
try:
chk = ipaddress.IPv4Address(target)
return True
except ipaddress.AddressValueError:
return False
def in_ipv4net(target, net):
""" Test if target belongs to given IPv4 network
"""
try:
nw = ipaddress.IPv4Network(net)
ip = ipaddress.IPv4Address(target)
if ip in nw:
return True
return False
except ipaddress.AddressValueError:
return False
except ipaddress.NetmaskValueError:
return False
def should_bypass_proxies(url, no_proxy=None):
""" Yet another requests.should_bypass_proxies
Test if proxies should not be used for a particular url.
"""
parsed = urlparse(url)
# special cases
if parsed.hostname in [None, '']:
return True
# special cases
if no_proxy in [None, '']:
return False
if no_proxy == '*':
return True
no_proxy = no_proxy.lower().replace(' ', '');
entries = (
host for host in no_proxy.split(',') if host
)
if is_ipv4(parsed.hostname):
for item in entries:
if in_ipv4net(parsed.hostname, item):
return True
return proxy_bypass_environment(parsed.hostname, {'no': no_proxy})