Source code for cloudshell.api.common_cloudshell_api

#!/usr/bin/python
# -*- coding: utf-8 -*-

import importlib
import types
import ssl
import sys
import urllib3
import http
import urllib3.util as urllib_util
from urllib3.exceptions import HTTPError
import xml.etree.ElementTree as etree

from collections import OrderedDict
from xml.sax.saxutils import escape
import time
from urllib3.exceptions import ProtocolError, MaxRetryError, ReadTimeoutError

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

if sys.version_info.major == 2:
    unicode = unicode
    str = str
    bytes = str
    basestring = basestring
    TYPE_TYPE = types.TypeType
    TYPE_CLASS = types.ClassType
elif sys.version_info.major == 3:
    str = str
    unicode = str
    bytes = bytes
    basestring = (str, bytes)
    TYPE_TYPE = type
    TYPE_CLASS = type
else:
    raise


[docs]class XMLWrapper:
[docs] @staticmethod def parseXML(xml_str): return etree.fromstring(xml_str)
[docs] @staticmethod def getRootNode(node): return node.getroot()
[docs] @staticmethod def getChildNode(parent_node, child_name, find_prefix=''): return parent_node.find(find_prefix + child_name)
[docs] @staticmethod def getAllChildNode(parent_node, child_name, find_prefix=''): return parent_node.findall(find_prefix + child_name)
[docs] @staticmethod def getChildNodeByAttr(parent_node, child_name, attr_name, attr_value): return parent_node.find(child_name + '[@' + attr_name + '=\'' + attr_value + '\']')
[docs] @staticmethod def getAllChildNodeByAttr(parent_node, child_name, attr_name, attr_value): return parent_node.findall(child_name + '[@' + attr_name + '=\'' + attr_value + '\']')
[docs] @staticmethod def getNodeName(node): return node.tag
[docs] @staticmethod def getNodeText(node): return node.text
[docs] @staticmethod def getNodeAttr(node, attribute_name, find_prefix=''): return node.get(find_prefix + attribute_name)
[docs] @staticmethod def getNodePrefix(node, prefix_name): prefix = '' if len(node.attrib) == 0: return prefix for attrib_name, value in node.attrib.items(): if attrib_name[0] == "{": prefix, ignore, tag = attrib_name[1:].partition("}") return "{" + prefix + "}" return prefix
[docs] @staticmethod def getStringFromXML(node, pretty_print=False): return etree.tostring(node, pretty_print=pretty_print)
# map request class
[docs]class CommonAPIRequest: def __init__(self, **kwarg): self.attributes = [] for key, value in sorted(kwarg.items()): self.attributes.append(key) setattr(self, key, value) @staticmethod def _checkContainerValue(value): result_value = None if isinstance(value, list): result_value = list() for list_value in value: result_value.append(CommonAPIRequest.toContainer(list_value)) elif isinstance(value, CommonAPIRequest): result_value = CommonAPIRequest.toContainer(value) else: result_value = value return result_value
[docs] @staticmethod def toContainer(data): if isinstance(data, dict) or isinstance(data, OrderedDict): return data if isinstance(data, list): data_list = list() for value in data: data_list.append(CommonAPIRequest._checkContainerValue(value)) return data_list data_dict = OrderedDict() data_dict['__name__'] = data.__class__.__name__ for key in data.attributes: data_dict[key] = CommonAPIRequest._checkContainerValue(getattr(data, key)) # for key, value in data.__dict__.items(): # data_dict[key] = CommonAPIRequest._checkContainerValue(value) return data_dict
# end map request class
[docs]class CommonResponseInfo: def __init__(self, xml_object, find_prefix): self._parseAttributesData(self.__class__, xml_object, find_prefix) def _attributeCastToType(self, data_str, cast_type_name): default_value = 0 if cast_type_name == 'bool': default_value = False elif cast_type_name == 'float': default_value = 0.0 elif cast_type_name == 'str': default_value = '' cast_type = eval(cast_type_name) data = None if data_str is not None: data = default_value try: if cast_type_name == 'bool': data = (data_str.lower() in ['true', '1', 'yes', 'on']) else: data = cast_type(data_str) except UnicodeEncodeError as err: try: data = data_str.encode('utf-8') except: pass except ValueError as err: pass return data def _isAttributeTypeDefault(self, attr_type_name): return (attr_type_name == 'int' or attr_type_name == 'long' or attr_type_name == 'float' or attr_type_name == 'bool' or attr_type_name == 'str') def _is_empty_object(self, atrrib_data): for key, value in atrrib_data.items(): if isinstance(value, list) and len(value) > 0: return False if not isinstance(value, list) and value is not None: return False return True def _append_object_list(self, attr_type_name, list_node, attr_type_instance, class_type, find_prefix): if self._isAttributeTypeDefault(attr_type_name): data_str = XMLWrapper.getNodeText(list_node) data = self._attributeCastToType(data_str, attr_type_name) else: if attr_type_instance == object: data = class_type(list_node, find_prefix) else: data = attr_type_instance(list_node, find_prefix) if not (hasattr(list_node, "attrib") and list_node.attrib): self._set_is_empty_to_true_if_not_set(data) if self._is_empty_true(data): return None else: return data def _set_is_empty_to_true_if_not_set(self, data): if not hasattr(data, "is_empty_object"): setattr(data, "is_empty_object", True) def _set_is_empty_to_false(self, data): setattr(data, "is_empty_object", False) def _is_empty_true(self, data): return hasattr(data, "is_empty_object") and data.is_empty_object def _parseAttributesData(self, class_type, xml_object, find_prefix): attrib_data_dict = dict() empty_object_size = len(self.__dict__) for name, attr_type in self.__dict__.items(): if not isinstance(attr_type, (TYPE_TYPE, TYPE_CLASS)) and not isinstance(attr_type, dict): continue if not isinstance(attr_type, dict): data = None attr_type_name = attr_type.__name__ if self._isAttributeTypeDefault(attr_type_name): data_str = XMLWrapper.getNodeAttr(xml_object, name) if data_str is None: child_attribute = XMLWrapper.getChildNode(xml_object, name) if child_attribute is not None: data_str = XMLWrapper.getNodeText(child_attribute) data = self._attributeCastToType(data_str, attr_type_name) else: child_node = XMLWrapper.getChildNode(xml_object, name) if child_node is not None: child_type = XMLWrapper.getNodeAttr(child_node, 'type', find_prefix) if child_type is None: data = attr_type(child_node, find_prefix) else: data = child_type(child_node, find_prefix) else: # continue data = None attrib_data_dict[name] = data else: child_node = XMLWrapper.getChildNode(xml_object, name) data_list = list() attr_type_instance = attr_type['list'] attr_type_name = attr_type_instance.__name__ if child_node is not None: child_count = 0 for list_node in child_node: data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance, class_type, find_prefix) if data_object is not None: data_list.append(data_object) child_count += 1 # I think that it is a logical bug, but ... # DannyK: In my opinion, this is done for backwards compatibility with element names that # originally represented a single item and later on changed to a list. # If a bug related to ghost objects is discovered in the future, # this is the first place I would start debugging. if child_count == 0: for list_node in xml_object: if XMLWrapper.getNodeName(list_node) == name: data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance, class_type, find_prefix) if data_object is not None: data_list.append(data_object) attrib_data_dict[name] = data_list if not self._is_empty_object(attrib_data_dict): self._set_is_empty_to_false(self) for key, value in attrib_data_dict.items(): setattr(self, key, value) elif len(self.__dict__) == empty_object_size: self._set_is_empty_to_true_if_not_set(self)
[docs]class CommonApiResult: def __init__(self, xml_object): error_node = XMLWrapper.getChildNode(xml_object, 'Error') self.error = None if error_node is None else XMLWrapper.getNodeText(error_node) error_code_node = XMLWrapper.getChildNode(xml_object, 'ErrorCode') self.error_code = None if error_code_node is None else XMLWrapper.getNodeText(error_code_node) self.response_info = None response_info_node = XMLWrapper.getChildNode(xml_object, 'ResponseInfo') if response_info_node is not None: find_prefix = XMLWrapper.getNodePrefix(response_info_node, 'xsi') type_attr = XMLWrapper.getNodeAttr(response_info_node, find_prefix + 'type') if type_attr is not None: response_class = CommonApiResult.importAPIClass(type_attr) if response_class is not None: self.response_info = response_class(response_info_node, find_prefix) success = XMLWrapper.getNodeAttr(xml_object, 'Success') success = success.lower() self.success = success in ['true', 'yes', 'on']
[docs] @staticmethod def importAPIClass(name): module = importlib.import_module('cloudshell.api.cloudshell_api') if hasattr(module, name): return getattr(module, name) return None
[docs]class CloudShellAPIError(Exception): def __init__(self, code, message, rawxml): self.code = code self.message = message self.rawxml = rawxml def __str__(self): return 'CloudShell API error ' + str(self.code) + ': ' + self.message def __repr__(self): return 'CloudShell API error ' + str(self.code) + ': ' + self.message
[docs]class CommonAPISession: def __init__(self, host, username, password, domain, connection_pool_size=25): self.host = host self.username = username self.password = password self.domain = domain if sys.version_info[0] == 2 and sys.version_info[2] < 13: ssl_protocol = ssl.PROTOCOL_SSLv23 else: ssl_protocol = ssl.PROTOCOL_TLS ctx = urllib_util.SSLContext(ssl_protocol) ctx.check_hostname = False ctx.verify_mode = ssl.CERT_NONE self._http = urllib3.PoolManager(num_pools=connection_pool_size, ssl_context=ctx) def _parseXML(self, xml_str): return etree.fromstring(xml_str) def _replaceSendValue(self, data): """Normalize xml string, escape special xml characters """ if data is None: return u'' try: data_str = unicode(data) except: data_str = unicode(data.decode("utf-8")) data_str = u"".join([escape(char) for char in data_str]) if data_str == 'True' or data_str == 'False': return data_str.lower() else: return data_str def _to_unicode_string(self, data): if data is None: return u'' try: return unicode(data) except: return unicode(data.decode("utf-8")) def _sendRequest(self, operation, message, request_headers, attempts=1): """ Sending http POST request through URLLIB package :param operation: operation name :param message: request body :param request_headers: header of the request :return: http response """ operation_url = str(self.url + operation) response = retry(self._http.request, ProtocolError, attempts, 1, method="POST", url=operation_url, body=message.encode('utf-8'), headers=request_headers) self._check_http_response(response) return response def _check_http_response(self, response): if not (200 <= response.status < 300): try: status_description = http.HTTPStatus(response.status).phrase except ValueError: status_description = 'Unknown' raise Exception(f"({status_description}) [{response.status}] reason:{response.reason}") if not response.data: raise Exception(response.reason) def _new_serializeRequestData(self, root_node, object_data, prev_type=None): """Generate xml from received request data using etree.xml """ if isinstance(object_data, dict): if '__name__' in object_data: working_node = etree.SubElement(root_node, object_data.pop('__name__')) else: working_node = root_node for key, value in object_data.items(): if value is None: continue if isinstance(value, basestring): new_node = etree.SubElement(working_node, key) new_node.text = value elif isinstance(value, bool): new_node = etree.SubElement(working_node, key) new_node.text = str(value).lower() else: child_node = working_node if isinstance(value, list): child_node = etree.SubElement(working_node, key) serialized_node = self._new_serializeRequestData(child_node, value) return root_node elif isinstance(object_data, list): for value in object_data: serialized_node = self._new_serializeRequestData(root_node, value, list()) elif isinstance(object_data, basestring) or isinstance(object_data, int) or isinstance(object_data, float): if prev_type is not None and isinstance(prev_type, list): child_node = etree.SubElement(root_node, 'string') child_node.text = object_data elif isinstance(object_data, bool): root_node.text = str(object_data).lower() else: root_node.text = self._to_unicode_string(object_data) return root_node
[docs] def generateAPIRequest(self, kwargs): """ Generic method for generation and sending XML requests :param return_type: type of returning data :param kwargs: map of the parameters that need to be send to the server :return: string data or API object """ if 'method_name' not in kwargs: raise CloudShellAPIError(404, 'Key "method_name" not in input data!', '') method_name = kwargs.pop('method_name', None) message = self._serialize_request(kwargs, method_name) http_response = self._sendRequest(method_name, message) result = self._handle_api_response(http_response) return result
def _handle_api_response(self, response): ''' :param http response: :return: either string or deserialized instance representing the api response ''' response_str = response.data response_str = self._remove_xml_namespace(response_str) try: api_response = self._deserialize_response(response_str) except Exception: try: status_description = http.HTTPStatus(response.status).phrase except ValueError: status_description = 'Unknown' raise Exception(f"{response.reason} ({status_description}) [{response.status}] reason:{response.reason} data:{str(response_str)}") if not api_response.success: raise CloudShellAPIError(api_response.error_code, api_response.error, response_str) result = response_str if api_response.response_info: result = api_response.response_info return result def _serialize_request(self, kwargs, method_name): request_node = etree.Element(method_name) # request_str = '<' + method_name + '>\n' for name in kwargs: child_node = etree.SubElement(request_node, name) self._new_serializeRequestData(child_node, kwargs[name]) return etree.tostring(request_node).decode("utf-8") def _deserialize_response(self, response_str): response_xml = XMLWrapper.parseXML(response_str) api_result = CommonApiResult(response_xml) return api_result def _remove_xml_namespace(self, response_str): return response_str.replace(b'xmlns="http://schemas.qualisystems.com/ResourceManagement/ApiCommandResult.xsd"', b'') \ .replace(b'&#x0;', b'<NUL>') def __prettify_xml(self, elem): """Return a pretty-printed XML string for the Element. """ from xml.dom.minidom import parseString rough_string = etree.tostring(elem, 'utf-8') reparsed = parseString(rough_string) return reparsed.toprettyxml(indent="\t")
[docs]def retry(func, exception, attempts=1, delay=1, *args, **kwargs): for i in range(attempts): try: return func(*args, **kwargs) except (MaxRetryError, ProtocolError, ReadTimeoutError, HTTPError) as e: if i == attempts -1: raise e time.sleep(delay) delay *= 2 except Exception as e: raise e