Source code for cloudshell.api.common_cloudshell_api

#!/usr/bin/python
# -*- coding: utf-8 -*-

import importlib
import time
import types
import ssl
import sys
import urllib3
from urllib3.util import SSLContext

from urllib3.exceptions import HTTPError, ProtocolError, MaxRetryError, ReadTimeoutError
import xml.etree.ElementTree as etree
from collections import OrderedDict
from xml.sax.saxutils import escape

try:
    # Python 3
    import http
    import http.client as httplib
except ImportError:
    # Python 2
    import httplib

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

[docs]def get_http_status_message(status_code): try: return httplib.responses[status_code] except KeyError: return "Unknown Status Code"
if sys.version_info.major == 2: unicode = unicode str = str bytes = str basestring = basestring TYPE_TYPE = types.TypeType TYPE_CLASS = types.ClassType elif sys.version_info.major == 3: str = str unicode = str bytes = bytes basestring = (str, bytes) TYPE_TYPE = type TYPE_CLASS = type else: raise
[docs]class UnauthorizedError(Exception): pass
[docs]class XMLWrapper:
[docs] @staticmethod def parseXML(xml_str): return etree.fromstring(xml_str)
[docs] @staticmethod def getRootNode(node): return node.getroot()
[docs] @staticmethod def getChildNode(parent_node, child_name, find_prefix=''): return parent_node.find(find_prefix + child_name)
[docs] @staticmethod def getAllChildNode(parent_node, child_name, find_prefix=''): return parent_node.findall(find_prefix + child_name)
[docs] @staticmethod def getChildNodeByAttr(parent_node, child_name, attr_name, attr_value): return parent_node.find(child_name + '[@' + attr_name + '=\'' + attr_value + '\']')
[docs] @staticmethod def getAllChildNodeByAttr(parent_node, child_name, attr_name, attr_value): return parent_node.findall(child_name + '[@' + attr_name + '=\'' + attr_value + '\']')
[docs] @staticmethod def getNodeName(node): return node.tag
[docs] @staticmethod def getNodeText(node): return node.text
[docs] @staticmethod def getNodeAttr(node, attribute_name, find_prefix=''): return node.get(find_prefix + attribute_name)
[docs] @staticmethod def getNodePrefix(node, prefix_name): prefix = '' if len(node.attrib) == 0: return prefix for attrib_name, value in node.attrib.items(): if attrib_name[0] == "{": prefix, ignore, tag = attrib_name[1:].partition("}") return "{" + prefix + "}" return prefix
[docs] @staticmethod def getStringFromXML(node, pretty_print=False): return etree.tostring(node, pretty_print=pretty_print)
# map request class
[docs]class CommonAPIRequest: def __init__(self, **kwarg): self.attributes = [] for key, value in sorted(kwarg.items()): self.attributes.append(key) setattr(self, key, value) @staticmethod def _checkContainerValue(value): result_value = None if isinstance(value, list): result_value = list() for list_value in value: result_value.append(CommonAPIRequest.toContainer(list_value)) elif isinstance(value, CommonAPIRequest): result_value = CommonAPIRequest.toContainer(value) else: result_value = value return result_value
[docs] @staticmethod def toContainer(data): if isinstance(data, dict) or isinstance(data, OrderedDict): return data if isinstance(data, list): data_list = list() for value in data: data_list.append(CommonAPIRequest._checkContainerValue(value)) return data_list data_dict = OrderedDict() data_dict['__name__'] = data.__class__.__name__ for key in data.attributes: data_dict[key] = CommonAPIRequest._checkContainerValue(getattr(data, key)) # for key, value in data.__dict__.items(): # data_dict[key] = CommonAPIRequest._checkContainerValue(value) return data_dict
# end map request class
[docs]class CommonResponseInfo: def __init__(self, xml_object, find_prefix): self._parseAttributesData(self.__class__, xml_object, find_prefix) def _attributeCastToType(self, data_str, cast_type_name): default_value = 0 if cast_type_name == 'bool': default_value = False elif cast_type_name == 'float': default_value = 0.0 elif cast_type_name == 'str': default_value = '' cast_type = eval(cast_type_name) data = None if data_str is not None: data = default_value try: if cast_type_name == 'bool': data = (data_str.lower() in ['true', '1', 'yes', 'on']) else: data = cast_type(data_str) except UnicodeEncodeError as err: try: data = data_str.encode('utf-8') except: pass except ValueError as err: pass return data def _isAttributeTypeDefault(self, attr_type_name): return (attr_type_name == 'int' or attr_type_name == 'long' or attr_type_name == 'float' or attr_type_name == 'bool' or attr_type_name == 'str') def _is_empty_object(self, atrrib_data): for key, value in atrrib_data.items(): if isinstance(value, list) and len(value) > 0: return False if not isinstance(value, list) and value is not None: return False return True # This is actually the original version of _is_empty_object. # Since it does not consider an empty list as empty (returns False) it caused bugs with setting the # is_empty_object flag, but fixing that caused an issue on populating the data. # So I was forced to preserve the original version for compatibility with the data population logic. def _is_empty_object_except_empty_list(self, atrrib_data): for key, value in atrrib_data.items(): if isinstance(value, list) and len(value) > 0: return False if value is not None: return False return True def _append_object_list(self, attr_type_name, list_node, attr_type_instance, class_type, find_prefix): if self._isAttributeTypeDefault(attr_type_name): data_str = XMLWrapper.getNodeText(list_node) data = self._attributeCastToType(data_str, attr_type_name) else: if attr_type_instance == object: data = class_type(list_node, find_prefix) else: data = attr_type_instance(list_node, find_prefix) if not (hasattr(list_node, "attrib") and list_node.attrib): self._set_is_empty_to_true_if_not_set(data) if self._is_empty_true(data): return None else: return data def _set_is_empty_to_true_if_not_set(self, data): if not hasattr(data, "is_empty_object"): setattr(data, "is_empty_object", True) def _set_is_empty_to_false(self, data): setattr(data, "is_empty_object", False) def _is_empty_true(self, data): return hasattr(data, "is_empty_object") and data.is_empty_object def _parseAttributesData(self, class_type, xml_object, find_prefix): attrib_data_dict = dict() empty_object_size = len(self.__dict__) for name, attr_type in self.__dict__.items(): if not isinstance(attr_type, (TYPE_TYPE, TYPE_CLASS)) and not isinstance(attr_type, dict): continue if not isinstance(attr_type, dict): data = None attr_type_name = attr_type.__name__ if self._isAttributeTypeDefault(attr_type_name): data_str = XMLWrapper.getNodeAttr(xml_object, name) if data_str is None: child_attribute = XMLWrapper.getChildNode(xml_object, name) if child_attribute is not None: data_str = XMLWrapper.getNodeText(child_attribute) data = self._attributeCastToType(data_str, attr_type_name) else: child_node = XMLWrapper.getChildNode(xml_object, name) if child_node is not None: child_type = XMLWrapper.getNodeAttr(child_node, 'type', find_prefix) if child_type is None: data = attr_type(child_node, find_prefix) else: data = child_type(child_node, find_prefix) else: # continue data = None attrib_data_dict[name] = data else: child_node = XMLWrapper.getChildNode(xml_object, name) data_list = list() attr_type_instance = attr_type['list'] attr_type_name = attr_type_instance.__name__ if child_node is not None: child_count = 0 for list_node in child_node: data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance, class_type, find_prefix) if data_object is not None: data_list.append(data_object) child_count += 1 # I think that it is a logical bug, but ... # DannyK: In my opinion, this is done for backwards compatibility with element names that # originally represented a single item and later on changed to a list. # If a bug related to ghost objects is discovered in the future, # this is the first place I would start debugging. if child_count == 0: for list_node in xml_object: if XMLWrapper.getNodeName(list_node) == name: data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance, class_type, find_prefix) if data_object is not None: data_list.append(data_object) attrib_data_dict[name] = data_list if not self._is_empty_object(attrib_data_dict): self._set_is_empty_to_false(self) elif len(self.__dict__) == empty_object_size: self._set_is_empty_to_true_if_not_set(self) if not self._is_empty_object_except_empty_list(attrib_data_dict): for key, value in attrib_data_dict.items(): setattr(self, key, value)
[docs]class CommonApiResult: def __init__(self, xml_object): error_node = XMLWrapper.getChildNode(xml_object, 'Error') self.error = None if error_node is None else XMLWrapper.getNodeText(error_node) error_code_node = XMLWrapper.getChildNode(xml_object, 'ErrorCode') self.error_code = None if error_code_node is None else XMLWrapper.getNodeText(error_code_node) self.response_info = None response_info_node = XMLWrapper.getChildNode(xml_object, 'ResponseInfo') if response_info_node is not None: find_prefix = XMLWrapper.getNodePrefix(response_info_node, 'xsi') type_attr = XMLWrapper.getNodeAttr(response_info_node, find_prefix + 'type') if type_attr is not None: response_class = CommonApiResult.importAPIClass(type_attr) if response_class is not None: self.response_info = response_class(response_info_node, find_prefix) success = XMLWrapper.getNodeAttr(xml_object, 'Success') success = success.lower() self.success = success in ['true', 'yes', 'on']
[docs] @staticmethod def importAPIClass(name): module = importlib.import_module('cloudshell.api.cloudshell_api') if hasattr(module, name): return getattr(module, name) return None
[docs]class CloudShellAPIError(Exception): def __init__(self, code, message, rawxml): self.code = code self.message = message self.rawxml = rawxml def __str__(self): return 'CloudShell API error ' + str(self.code) + ': ' + self.message def __repr__(self): return 'CloudShell API error ' + str(self.code) + ': ' + self.message
[docs]class CommonAPISession: def __init__(self, host, username, password, domain, connection_pool_size=25, **kwargs): self.host = host self.username = username self.password = password self.domain = domain if sys.version_info[0] == 2 and sys.version_info[2] < 13: ssl_protocol = ssl.PROTOCOL_SSLv23 else: ssl_protocol = ssl.PROTOCOL_TLS ctx = SSLContext(ssl_protocol) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self._http = urllib3.PoolManager(num_pools=connection_pool_size, ssl_context=ctx, **kwargs) def _parseXML(self, xml_str): return etree.fromstring(xml_str) def _replaceSendValue(self, data): """Normalize xml string, escape special xml characters """ if data is None: return u'' try: data_str = unicode(data) except: data_str = unicode(data.decode("utf-8")) data_str = u"".join([escape(char) for char in data_str]) if data_str == 'True' or data_str == 'False': return data_str.lower() else: return data_str def _to_unicode_string(self, data): if data is None: return u'' try: return unicode(data) except: return unicode(data.decode("utf-8")) def _sendRequest(self, operation, message, request_headers, attempts=1): """ Sending http POST request through URLLIB package :param operation: operation name :param message: request body :param request_headers: header of the request :return: http response """ operation_url = str(self.url + operation) response = retry(self._http.request, ProtocolError, attempts, 1, method="POST", url=operation_url, body=message.encode('utf-8'), headers=request_headers) self._check_http_response(response) return response def _check_http_response(self, response): if not (200 <= response.status < 300): try: status_description = get_http_status_message(response.status) except ValueError: status_description = 'Unknown' if response.status == 401: raise UnauthorizedError( "({}) [{}] reason:{}".format(status_description, response.status, response.reason)) raise Exception( "({}) [{}] reason:{}".format(status_description, response.status, response.reason)) if not response.data: raise Exception(response.reason) def _new_serializeRequestData(self, root_node, object_data, prev_type=None): """Generate xml from received request data using etree.xml """ if isinstance(object_data, dict): if '__name__' in object_data: working_node = etree.SubElement(root_node, object_data.pop('__name__')) else: working_node = root_node for key, value in object_data.items(): if value is None: continue if isinstance(value, basestring): new_node = etree.SubElement(working_node, key) new_node.text = value elif isinstance(value, bool): new_node = etree.SubElement(working_node, key) new_node.text = str(value).lower() else: child_node = working_node if isinstance(value, list): child_node = etree.SubElement(working_node, key) serialized_node = self._new_serializeRequestData(child_node, value) return root_node elif isinstance(object_data, list): for value in object_data: serialized_node = self._new_serializeRequestData(root_node, value, list()) elif isinstance(object_data, basestring) or isinstance(object_data, int) or isinstance(object_data, float): if prev_type is not None and isinstance(prev_type, list): child_node = etree.SubElement(root_node, 'string') child_node.text = object_data elif isinstance(object_data, bool): root_node.text = str(object_data).lower() else: root_node.text = self._to_unicode_string(object_data) return root_node
[docs] def generateAPIRequest(self, kwargs): """ Generic method for generation and sending XML requests :param return_type: type of returning data :param kwargs: map of the parameters that need to be send to the server :return: string data or API object """ if 'method_name' not in kwargs: raise CloudShellAPIError(404, 'Key "method_name" not in input data!', '') method_name = kwargs.pop('method_name', None) message = self._serialize_request(kwargs, method_name) http_response = self._sendRequest(method_name, message) result = self._handle_api_response(http_response) return result
def _handle_api_response(self, response): ''' :param http response: :return: either string or deserialized instance representing the api response ''' response_str = response.data response_str = self._remove_xml_namespace(response_str) try: api_response = self._deserialize_response(response_str) except Exception: try: status_description = get_http_status_message(response.status) except ValueError: status_description = 'Unknown' raise Exception("{} ({}) [{}] reason:{} data:{}".format(response.reason, status_description, response.status, response.reason, str(response_str))) if not api_response.success: raise CloudShellAPIError(api_response.error_code, api_response.error, response_str) result = response_str if api_response.response_info: result = api_response.response_info return result def _serialize_request(self, kwargs, method_name): request_node = etree.Element(method_name) # request_str = '<' + method_name + '>\n' for name in kwargs: child_node = etree.SubElement(request_node, name) self._new_serializeRequestData(child_node, kwargs[name]) return etree.tostring(request_node).decode("utf-8") def _deserialize_response(self, response_str): response_xml = XMLWrapper.parseXML(response_str) api_result = CommonApiResult(response_xml) return api_result def _remove_xml_namespace(self, response_str): return response_str.replace(b'xmlns="http://schemas.qualisystems.com/ResourceManagement/ApiCommandResult.xsd"', b'') \ .replace(b'&#x0;', b'<NUL>') def __prettify_xml(self, elem): """Return a pretty-printed XML string for the Element. """ from xml.dom.minidom import parseString rough_string = etree.tostring(elem, 'utf-8') reparsed = parseString(rough_string) return reparsed.toprettyxml(indent="\t")
[docs]def retry(func, exception, attempts=1, delay=1, *args, **kwargs): for i in range(attempts): try: return func(*args, **kwargs) except (MaxRetryError, ProtocolError, ReadTimeoutError, HTTPError) as e: if i == attempts -1: raise e time.sleep(delay) delay *= 2 except Exception as e: raise e