""" Garage Administration API v0+garage-v0.7.3 Administrate your Garage cluster programatically, including status, layout, keys, buckets, and maintainance tasks. *Disclaimer: The API is not stable yet, hence its v0 tag. The API can change at any time, and changes can include breaking backward compatibility. Read the changelog and upgrade your scripts before upgrading. Additionnaly, this specification is very early stage and can contain bugs, especially on error return codes/types that are not tested yet. Do not expect a well finished and polished product!* # noqa: E501 The version of the OpenAPI document: v0.7.3 Generated by: https://openapi-generator.tech """ import io import json import logging import re import ssl from urllib.parse import urlencode from urllib.parse import urlparse from urllib.request import proxy_bypass_environment import urllib3 import ipaddress from garage_admin_sdk.exceptions import ApiException, UnauthorizedException, ForbiddenException, NotFoundException, ServiceException, ApiValueError logger = logging.getLogger(__name__) class RESTResponse(io.IOBase): def __init__(self, resp): self.urllib3_response = resp self.status = resp.status self.reason = resp.reason self.data = resp.data def getheaders(self): """Returns a dictionary of the response headers.""" return self.urllib3_response.getheaders() def getheader(self, name, default=None): """Returns a given response header.""" return self.urllib3_response.getheader(name, default) class RESTClientObject(object): def __init__(self, configuration, pools_size=4, maxsize=None): # urllib3.PoolManager will pass all kw parameters to connectionpool # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/poolmanager.py#L75 # noqa: E501 # https://github.com/shazow/urllib3/blob/f9409436f83aeb79fbaf090181cd81b784f1b8ce/urllib3/connectionpool.py#L680 # noqa: E501 # maxsize is the number of requests to host that are allowed in parallel # noqa: E501 # Custom SSL certificates and client certificates: http://urllib3.readthedocs.io/en/latest/advanced-usage.html # noqa: E501 # cert_reqs if configuration.verify_ssl: cert_reqs = ssl.CERT_REQUIRED else: cert_reqs = ssl.CERT_NONE addition_pool_args = {} if configuration.assert_hostname is not None: addition_pool_args['assert_hostname'] = configuration.assert_hostname # noqa: E501 if configuration.retries is not None: addition_pool_args['retries'] = configuration.retries if configuration.socket_options is not None: addition_pool_args['socket_options'] = configuration.socket_options if maxsize is None: if configuration.connection_pool_maxsize is not None: maxsize = configuration.connection_pool_maxsize else: maxsize = 4 # https pool manager if configuration.proxy and not should_bypass_proxies( configuration.host, no_proxy=configuration.no_proxy or ''): self.pool_manager = urllib3.ProxyManager( num_pools=pools_size, maxsize=maxsize, cert_reqs=cert_reqs, ca_certs=configuration.ssl_ca_cert, cert_file=configuration.cert_file, key_file=configuration.key_file, proxy_url=configuration.proxy, proxy_headers=configuration.proxy_headers, **addition_pool_args ) else: self.pool_manager = urllib3.PoolManager( num_pools=pools_size, maxsize=maxsize, cert_reqs=cert_reqs, ca_certs=configuration.ssl_ca_cert, cert_file=configuration.cert_file, key_file=configuration.key_file, **addition_pool_args ) def request(self, method, url, query_params=None, headers=None, body=None, post_params=None, _preload_content=True, _request_timeout=None): """Perform requests. :param method: http request method :param url: http request url :param query_params: query parameters in the url :param headers: http request headers :param body: request json body, for `application/json` :param post_params: request post parameters, `application/x-www-form-urlencoded` and `multipart/form-data` :param _preload_content: if False, the urllib3.HTTPResponse object will be returned without reading/decoding response data. Default is True. :param _request_timeout: timeout setting for this request. If one number provided, it will be total request timeout. It can also be a pair (tuple) of (connection, read) timeouts. """ method = method.upper() assert method in ['GET', 'HEAD', 'DELETE', 'POST', 'PUT', 'PATCH', 'OPTIONS'] if post_params and body: raise ApiValueError( "body parameter cannot be used with post_params parameter." ) post_params = post_params or {} headers = headers or {} timeout = None if _request_timeout: if isinstance(_request_timeout, (int, float)): # noqa: E501,F821 timeout = urllib3.Timeout(total=_request_timeout) elif (isinstance(_request_timeout, tuple) and len(_request_timeout) == 2): timeout = urllib3.Timeout( connect=_request_timeout[0], read=_request_timeout[1]) try: # For `POST`, `PUT`, `PATCH`, `OPTIONS`, `DELETE` if method in ['POST', 'PUT', 'PATCH', 'OPTIONS', 'DELETE']: # Only set a default Content-Type for POST, PUT, PATCH and OPTIONS requests if (method != 'DELETE') and ('Content-Type' not in headers): headers['Content-Type'] = 'application/json' if query_params: url += '?' + urlencode(query_params) if ('Content-Type' not in headers) or (re.search('json', headers['Content-Type'], re.IGNORECASE)): request_body = None if body is not None: request_body = json.dumps(body) r = self.pool_manager.request( method, url, body=request_body, preload_content=_preload_content, timeout=timeout, headers=headers) elif headers['Content-Type'] == 'application/x-www-form-urlencoded': # noqa: E501 r = self.pool_manager.request( method, url, fields=post_params, encode_multipart=False, preload_content=_preload_content, timeout=timeout, headers=headers) elif headers['Content-Type'] == 'multipart/form-data': # must del headers['Content-Type'], or the correct # Content-Type which generated by urllib3 will be # overwritten. del headers['Content-Type'] r = self.pool_manager.request( method, url, fields=post_params, encode_multipart=True, preload_content=_preload_content, timeout=timeout, headers=headers) # Pass a `string` parameter directly in the body to support # other content types than Json when `body` argument is # provided in serialized form elif isinstance(body, str) or isinstance(body, bytes): request_body = body r = self.pool_manager.request( method, url, body=request_body, preload_content=_preload_content, timeout=timeout, headers=headers) else: # Cannot generate the request from given parameters msg = """Cannot prepare a request message for provided arguments. Please check that your arguments match declared content type.""" raise ApiException(status=0, reason=msg) # For `GET`, `HEAD` else: r = self.pool_manager.request(method, url, fields=query_params, preload_content=_preload_content, timeout=timeout, headers=headers) except urllib3.exceptions.SSLError as e: msg = "{0}\n{1}".format(type(e).__name__, str(e)) raise ApiException(status=0, reason=msg) if _preload_content: r = RESTResponse(r) # log response body logger.debug("response body: %s", r.data) if not 200 <= r.status <= 299: if r.status == 401: raise UnauthorizedException(http_resp=r) if r.status == 403: raise ForbiddenException(http_resp=r) if r.status == 404: raise NotFoundException(http_resp=r) if 500 <= r.status <= 599: raise ServiceException(http_resp=r) raise ApiException(http_resp=r) return r def GET(self, url, headers=None, query_params=None, _preload_content=True, _request_timeout=None): return self.request("GET", url, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, query_params=query_params) def HEAD(self, url, headers=None, query_params=None, _preload_content=True, _request_timeout=None): return self.request("HEAD", url, headers=headers, _preload_content=_preload_content, _request_timeout=_request_timeout, query_params=query_params) def OPTIONS(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("OPTIONS", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) def DELETE(self, url, headers=None, query_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("DELETE", url, headers=headers, query_params=query_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) def POST(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("POST", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) def PUT(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("PUT", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) def PATCH(self, url, headers=None, query_params=None, post_params=None, body=None, _preload_content=True, _request_timeout=None): return self.request("PATCH", url, headers=headers, query_params=query_params, post_params=post_params, _preload_content=_preload_content, _request_timeout=_request_timeout, body=body) # end of class RESTClientObject def is_ipv4(target): """ Test if IPv4 address or not """ try: chk = ipaddress.IPv4Address(target) return True except ipaddress.AddressValueError: return False def in_ipv4net(target, net): """ Test if target belongs to given IPv4 network """ try: nw = ipaddress.IPv4Network(net) ip = ipaddress.IPv4Address(target) if ip in nw: return True return False except ipaddress.AddressValueError: return False except ipaddress.NetmaskValueError: return False def should_bypass_proxies(url, no_proxy=None): """ Yet another requests.should_bypass_proxies Test if proxies should not be used for a particular url. """ parsed = urlparse(url) # special cases if parsed.hostname in [None, '']: return True # special cases if no_proxy in [None, '']: return False if no_proxy == '*': return True no_proxy = no_proxy.lower().replace(' ', ''); entries = ( host for host in no_proxy.split(',') if host ) if is_ipv4(parsed.hostname): for item in entries: if in_ipv4net(parsed.hostname, item): return True return proxy_bypass_environment(parsed.hostname, {'no': no_proxy})