#!/usr/bin/python
# -*- coding: utf-8 -*-
import importlib
import time
import types
import ssl
import sys
import urllib3
from urllib3.util import SSLContext
from urllib3.exceptions import HTTPError, ProtocolError, MaxRetryError, ReadTimeoutError
import xml.etree.ElementTree as etree
from collections import OrderedDict
from xml.sax.saxutils import escape
try:
# Python 3
import http
import http.client as httplib
except ImportError:
# Python 2
import httplib
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
def get_http_status_message(status_code):
    """Return the standard reason phrase for an HTTP status code.

    :param status_code: numeric HTTP status code, e.g. ``404``
    :return: the phrase registered in ``httplib.responses`` for that code, or
        ``"Unknown Status Code"`` when the code is not registered
    """
    return httplib.responses.get(status_code, "Unknown Status Code")
# Python 2/3 compatibility aliases used throughout this module.
# After this block ``unicode``, ``str``, ``bytes`` and ``basestring`` exist as
# module-level names on both major versions, and TYPE_TYPE / TYPE_CLASS hold the
# types that ``isinstance`` must accept to recognise "a class" (new-style and
# old-style classes on Python 2, plain ``type`` on Python 3).
if sys.version_info.major == 2:
    unicode = unicode  # noqa: F821 - re-exported as a module-level name
    str = str
    bytes = str
    basestring = basestring  # noqa: F821 - re-exported as a module-level name
    TYPE_TYPE = types.TypeType
    TYPE_CLASS = types.ClassType
elif sys.version_info.major == 3:
    str = str
    unicode = str
    bytes = bytes
    basestring = (str, bytes)
    TYPE_TYPE = type
    TYPE_CLASS = type
else:
    # A bare ``raise`` here has no active exception to re-raise and would fail
    # with an unrelated message; state the real problem instead.
    raise RuntimeError(
        "Unsupported Python major version: {0}".format(sys.version_info.major))
class UnauthorizedError(Exception):
    """Raised when the server answers a request with HTTP status 401."""
class XMLWrapper:
    """Static helpers wrapping the ``xml.etree.ElementTree`` calls used by this module."""

    @staticmethod
    def parseXML(xml_str):
        """Parse an XML document from a string and return its root element."""
        return etree.fromstring(xml_str)

    @staticmethod
    def getRootNode(node):
        """Return ``node.getroot()``; ``node`` must provide that method."""
        return node.getroot()

    @staticmethod
    def getChildNode(parent_node, child_name, find_prefix=''):
        """Return the first child matching ``find_prefix + child_name`` or None."""
        return parent_node.find(find_prefix + child_name)

    @staticmethod
    def getAllChildNode(parent_node, child_name, find_prefix=''):
        """Return the list of children matching ``find_prefix + child_name``."""
        return parent_node.findall(find_prefix + child_name)

    @staticmethod
    def _attr_query(child_name, attr_name, attr_value):
        """Build the ``name[@attr='value']`` path expression used by the *ByAttr lookups."""
        # NOTE(review): attr_value is inserted verbatim; a value containing a
        # single quote produces an invalid expression.
        return child_name + '[@' + attr_name + '=\'' + attr_value + '\']'

    @staticmethod
    def getChildNodeByAttr(parent_node, child_name, attr_name, attr_value):
        """Return the first ``child_name`` child whose attribute equals ``attr_value``, or None."""
        return parent_node.find(XMLWrapper._attr_query(child_name, attr_name, attr_value))

    @staticmethod
    def getAllChildNodeByAttr(parent_node, child_name, attr_name, attr_value):
        """Return every ``child_name`` child whose attribute equals ``attr_value``."""
        return parent_node.findall(XMLWrapper._attr_query(child_name, attr_name, attr_value))

    @staticmethod
    def getNodeName(node):
        """Return the element tag."""
        return node.tag

    @staticmethod
    def getNodeText(node):
        """Return the element text (None when the element has no text)."""
        return node.text

    @staticmethod
    def getNodeAttr(node, attribute_name, find_prefix=''):
        """Return the value of attribute ``find_prefix + attribute_name`` or None."""
        return node.get(find_prefix + attribute_name)

    @staticmethod
    def getNodePrefix(node, prefix_name):
        """Return the ``{namespace}`` prefix of the first namespaced attribute of ``node``.

        :param node: element whose attributes are inspected
        :param prefix_name: accepted for interface compatibility; not used
        :return: ``'{uri}'`` taken from the first attribute whose name starts
            with ``{``, or ``''`` when there is no such attribute
        """
        for attrib_name in node.attrib:
            if attrib_name.startswith("{"):
                namespace = attrib_name[1:].partition("}")[0]
                return "{" + namespace + "}"
        return ''

    @staticmethod
    def getStringFromXML(node, pretty_print=False):
        """Serialize ``node`` to XML bytes.

        :param node: element to serialize
        :param pretty_print: when true, return an indented, multi-line
            document (UTF-8 encoded, starting with an XML declaration)
        :return: bytes produced by ``etree.tostring`` or, for pretty output,
            by ``xml.dom.minidom``
        """
        raw_xml = etree.tostring(node)
        if not pretty_print:
            return raw_xml
        # xml.etree.ElementTree.tostring() has no ``pretty_print`` keyword (that
        # is an lxml extension), so indentation is produced through minidom.
        from xml.dom.minidom import parseString
        return parseString(raw_xml).toprettyxml(indent="\t", encoding="utf-8")
# map request class
class CommonAPIRequest:
    """Generic request object whose keyword arguments become attributes.

    ``attributes`` records the attribute names in sorted order; ``toContainer``
    uses that list to turn the object into an ``OrderedDict`` whose first key,
    ``'__name__'``, holds the class name.
    """

    def __init__(self, **kwarg):
        self.attributes = []
        # Sorted so the attribute order does not depend on how kwargs were passed.
        for key, value in sorted(kwarg.items()):
            self.attributes.append(key)
            setattr(self, key, value)

    @staticmethod
    def _checkContainerValue(value):
        """Convert one attribute value into its container form.

        Lists are converted element by element, ``CommonAPIRequest`` instances
        through ``toContainer``; every other value is returned unchanged.
        """
        if isinstance(value, list):
            # Recurse through this method rather than toContainer(): toContainer
            # expects request objects and fails on plain items such as strings.
            return [CommonAPIRequest._checkContainerValue(item) for item in value]
        if isinstance(value, CommonAPIRequest):
            return CommonAPIRequest.toContainer(value)
        return value

    @staticmethod
    def toContainer(data):
        """Convert request data into plain containers.

        :param data: a dict (returned as is), a list (converted element by
            element) or an object exposing ``attributes``
        :return: the dict itself, a new list, or an ``OrderedDict`` with
            ``'__name__'`` followed by one entry per name in ``data.attributes``
        """
        if isinstance(data, dict):  # OrderedDict is a dict subclass
            return data
        if isinstance(data, list):
            return [CommonAPIRequest._checkContainerValue(item) for item in data]
        container = OrderedDict()
        container['__name__'] = data.__class__.__name__
        for key in data.attributes:
            container[key] = CommonAPIRequest._checkContainerValue(getattr(data, key))
        return container
# end map request class
class CommonResponseInfo:
    """Base class for response objects that populate themselves from an XML element.

    ``_parseAttributesData`` scans the instance ``__dict__`` for placeholder
    values and replaces them with data read from the XML element:

    * a type or class: a scalar (``int``/``long``/``float``/``bool``/``str``)
      read from an XML attribute or child element text, or a nested object
      built by calling the class with ``(child_node, find_prefix)``;
    * a dict with a ``'list'`` key: a list whose items are built from the
      children of the element named like the attribute.

    Instance attributes holding anything else are left untouched.
    """

    try:
        _LONG_TYPE = long  # noqa: F821 - only defined on Python 2
    except NameError:
        _LONG_TYPE = int

    # Scalar type name -> (converter, value used when the text cannot be converted).
    _CAST_TYPES = {
        'int': (int, 0),
        'long': (_LONG_TYPE, 0),
        'float': (float, 0.0),
        'bool': (bool, False),
        'str': (str, ''),
    }

    def __init__(self, xml_object, find_prefix):
        self._parseAttributesData(self.__class__, xml_object, find_prefix)

    def _attributeCastToType(self, data_str, cast_type_name):
        """Convert ``data_str`` to the scalar type named ``cast_type_name``.

        :param data_str: text read from the XML, or None
        :param cast_type_name: one of the names accepted by
            ``_isAttributeTypeDefault``
        :return: None when ``data_str`` is None; otherwise the converted value,
            or the type's default (0, 0.0, False, '') when conversion fails
        :raises ValueError: when ``cast_type_name`` is not a supported name
        """
        # Explicit table instead of eval(): only known scalar names are
        # accepted and 'long' also resolves on Python 3.
        if cast_type_name not in self._CAST_TYPES:
            raise ValueError('Unsupported attribute type: {0}'.format(cast_type_name))
        cast_type, default_value = self._CAST_TYPES[cast_type_name]
        if data_str is None:
            return None
        if cast_type_name == 'bool':
            return data_str.lower() in ['true', '1', 'yes', 'on']
        try:
            return cast_type(data_str)
        except UnicodeEncodeError:
            # Text that cannot be converted directly is kept as UTF-8 bytes.
            try:
                return data_str.encode('utf-8')
            except Exception:
                return default_value
        except ValueError:
            return default_value

    def _isAttributeTypeDefault(self, attr_type_name):
        """Return True when ``attr_type_name`` names one of the supported scalar types."""
        return attr_type_name in ('int', 'long', 'float', 'bool', 'str')

    def _is_empty_object(self, atrrib_data):
        """Return True when every value is None or an empty list."""
        for key, value in atrrib_data.items():
            if isinstance(value, list) and len(value) > 0:
                return False
            if not isinstance(value, list) and value is not None:
                return False
        return True

    # This is actually the original version of _is_empty_object.
    # Since it does not consider an empty list as empty (returns False) it caused bugs with setting the
    # is_empty_object flag, but fixing that caused an issue on populating the data.
    # So I was forced to preserve the original version for compatibility with the data population logic.
    def _is_empty_object_except_empty_list(self, atrrib_data):
        """Return True only when every value is None (an empty list counts as data)."""
        for key, value in atrrib_data.items():
            if isinstance(value, list) and len(value) > 0:
                return False
            if value is not None:
                return False
        return True

    def _append_object_list(self, attr_type_name, list_node, attr_type_instance, class_type, find_prefix):
        """Build one list item from ``list_node``.

        :return: the converted scalar or constructed object, or None when the
            object ends up flagged as empty
        """
        if self._isAttributeTypeDefault(attr_type_name):
            data_str = XMLWrapper.getNodeText(list_node)
            data = self._attributeCastToType(data_str, attr_type_name)
        else:
            # ``object`` as the declared item type means "same class as the owner".
            if attr_type_instance == object:
                data = class_type(list_node, find_prefix)
            else:
                data = attr_type_instance(list_node, find_prefix)
            # A node without XML attributes is treated as empty unless parsing
            # already decided otherwise.
            if not (hasattr(list_node, "attrib") and list_node.attrib):
                self._set_is_empty_to_true_if_not_set(data)
        if self._is_empty_true(data):
            return None
        return data

    def _set_is_empty_to_true_if_not_set(self, data):
        """Set ``data.is_empty_object = True`` unless the flag already exists."""
        if not hasattr(data, "is_empty_object"):
            setattr(data, "is_empty_object", True)

    def _set_is_empty_to_false(self, data):
        """Set ``data.is_empty_object = False``."""
        setattr(data, "is_empty_object", False)

    def _is_empty_true(self, data):
        """Return True when ``data`` carries a truthy ``is_empty_object`` flag."""
        return hasattr(data, "is_empty_object") and data.is_empty_object

    def _parseAttributesData(self, class_type, xml_object, find_prefix):
        """Replace the placeholder attributes of this instance with parsed XML data.

        :param class_type: class used for list items declared with ``object``
        :param xml_object: XML element describing this object
        :param find_prefix: namespace prefix (``'{uri}'`` or ``''``) used to
            look up the ``type`` attribute and passed on to nested objects
        """
        attrib_data_dict = dict()
        empty_object_size = len(self.__dict__)
        for name, attr_type in self.__dict__.items():
            if not isinstance(attr_type, (TYPE_TYPE, TYPE_CLASS)) and not isinstance(attr_type, dict):
                continue
            if not isinstance(attr_type, dict):
                data = None
                attr_type_name = attr_type.__name__
                if self._isAttributeTypeDefault(attr_type_name):
                    # Scalars come from an XML attribute first, then from the
                    # text of a child element with the same name.
                    data_str = XMLWrapper.getNodeAttr(xml_object, name)
                    if data_str is None:
                        child_attribute = XMLWrapper.getChildNode(xml_object, name)
                        if child_attribute is not None:
                            data_str = XMLWrapper.getNodeText(child_attribute)
                    data = self._attributeCastToType(data_str, attr_type_name)
                else:
                    child_node = XMLWrapper.getChildNode(xml_object, name)
                    if child_node is not None:
                        # The ``type`` attribute is a class *name*; it has to be
                        # resolved to a class before it can be instantiated.
                        child_type = XMLWrapper.getNodeAttr(child_node, 'type', find_prefix)
                        child_class = None
                        if child_type is not None:
                            child_class = CommonApiResult.importAPIClass(child_type)
                        if child_class is None:
                            child_class = attr_type
                        data = child_class(child_node, find_prefix)
                attrib_data_dict[name] = data
            else:
                child_node = XMLWrapper.getChildNode(xml_object, name)
                data_list = list()
                attr_type_instance = attr_type['list']
                attr_type_name = attr_type_instance.__name__
                if child_node is not None:
                    child_count = 0
                    for list_node in child_node:
                        data_object = self._append_object_list(attr_type_name, list_node, attr_type_instance,
                                                               class_type, find_prefix)
                        if data_object is not None:
                            data_list.append(data_object)
                        # NOTE(review): every child is counted, including ones
                        # dropped as empty - confirm against the upstream file.
                        child_count += 1
                    # I think that it is a logical bug, but ...
                    # DannyK: In my opinion, this is done for backwards compatibility with element names that
                    # originally represented a single item and later on changed to a list.
                    # If a bug related to ghost objects is discovered in the future,
                    # this is the first place I would start debugging.
                    if child_count == 0:
                        for list_node in xml_object:
                            if XMLWrapper.getNodeName(list_node) == name:
                                data_object = self._append_object_list(attr_type_name, list_node,
                                                                       attr_type_instance,
                                                                       class_type, find_prefix)
                                if data_object is not None:
                                    data_list.append(data_object)
                attrib_data_dict[name] = data_list
        if not self._is_empty_object(attrib_data_dict):
            self._set_is_empty_to_false(self)
        elif len(self.__dict__) == empty_object_size:
            self._set_is_empty_to_true_if_not_set(self)
        # Placeholders are only overwritten when at least one value was parsed.
        if not self._is_empty_object_except_empty_list(attrib_data_dict):
            for key, value in attrib_data_dict.items():
                setattr(self, key, value)
class CommonApiResult:
    """Parsed form of an API response element.

    Attributes set by the constructor:
    ``error`` / ``error_code`` hold the text of the ``Error`` / ``ErrorCode``
    children (None when absent); ``response_info`` holds an instance of the
    class named by the ``type`` attribute of the ``ResponseInfo`` child (None
    when the child, the attribute or the class is missing); ``success`` is
    True when the ``Success`` attribute is ``true``, ``yes`` or ``on``
    (case-insensitive).
    """

    def __init__(self, xml_object):
        error_node = XMLWrapper.getChildNode(xml_object, 'Error')
        if error_node is None:
            self.error = None
        else:
            self.error = XMLWrapper.getNodeText(error_node)

        code_node = XMLWrapper.getChildNode(xml_object, 'ErrorCode')
        if code_node is None:
            self.error_code = None
        else:
            self.error_code = XMLWrapper.getNodeText(code_node)

        self.response_info = None
        info_node = XMLWrapper.getChildNode(xml_object, 'ResponseInfo')
        if info_node is not None:
            ns_prefix = XMLWrapper.getNodePrefix(info_node, 'xsi')
            type_name = XMLWrapper.getNodeAttr(info_node, ns_prefix + 'type')
            response_class = None
            if type_name is not None:
                response_class = CommonApiResult.importAPIClass(type_name)
            if response_class is not None:
                self.response_info = response_class(info_node, ns_prefix)

        success_text = XMLWrapper.getNodeAttr(xml_object, 'Success').lower()
        self.success = success_text in ['true', 'yes', 'on']

    @staticmethod
    def importAPIClass(name):
        """Look up ``name`` in the ``cloudshell.api.cloudshell_api`` module.

        :return: the module attribute with that name, or None when there is none
        """
        api_module = importlib.import_module('cloudshell.api.cloudshell_api')
        return getattr(api_module, name, None)
class CloudShellAPIError(Exception):
    """Error reported by the CloudShell API.

    :param code: error code taken from the response
    :param message: error text taken from the response (may be None)
    :param rawxml: raw response the error was extracted from
    """

    def __init__(self, code, message, rawxml):
        self.code = code
        self.message = message
        self.rawxml = rawxml

    def __str__(self):
        # format() instead of '+' so a None message cannot raise TypeError
        # while the error itself is being rendered.
        return 'CloudShell API error {}: {}'.format(self.code, self.message)

    def __repr__(self):
        return 'CloudShell API error {}: {}'.format(self.code, self.message)
class CommonAPISession:
    """HTTP session that serializes requests to XML and parses XML responses.

    ``_sendRequest`` reads ``self.url``, which is not assigned in this class
    (presumably set by a subclass - verify against the callers).
    """

    def __init__(self, host, username, password, domain, connection_pool_size=25, **kwargs):
        """Store the credentials and create the urllib3 connection pool.

        :param host: server host
        :param username: user name
        :param password: user password
        :param domain: domain name
        :param connection_pool_size: passed to ``urllib3.PoolManager`` as ``num_pools``
        :param kwargs: extra keyword arguments forwarded to ``urllib3.PoolManager``
        """
        self.host = host
        self.username = username
        self.password = password
        self.domain = domain
        # Python 2 builds with a micro version below 13 fall back to
        # PROTOCOL_SSLv23 (presumably because ssl.PROTOCOL_TLS is missing there).
        if sys.version_info[0] == 2 and sys.version_info[2] < 13:
            ssl_protocol = ssl.PROTOCOL_SSLv23
        else:
            ssl_protocol = ssl.PROTOCOL_TLS
        ctx = SSLContext(ssl_protocol)
        # NOTE(review): hostname and certificate verification are switched off,
        # so the server identity is never checked.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        self._http = urllib3.PoolManager(num_pools=connection_pool_size, ssl_context=ctx, **kwargs)

    def _parseXML(self, xml_str):
        """Parse ``xml_str`` and return the root element."""
        return etree.fromstring(xml_str)

    def _replaceSendValue(self, data):
        """Normalize xml string, escape special xml characters

        :return: the escaped text of ``data``; ``'True'``/``'False'`` are
            lower-cased and None becomes an empty string
        """
        data_str = escape(self._to_unicode_string(data))
        if data_str == 'True' or data_str == 'False':
            return data_str.lower()
        return data_str

    def _to_unicode_string(self, data):
        """Return ``data`` as text; None becomes an empty string."""
        if data is None:
            return u''
        if isinstance(data, bytes):
            # Decode instead of stringifying: on Python 3 str(b'x') gives "b'x'".
            return data.decode("utf-8")
        try:
            return unicode(data)
        except Exception:
            return unicode(data.decode("utf-8"))

    def _sendRequest(self, operation, message, request_headers=None, attempts=1):
        """ Sending http POST request through URLLIB package
        :param operation: operation name
        :param message: request body
        :param request_headers: header of the request (optional)
        :param attempts: number of attempts passed to ``retry``
        :return: http response
        """
        operation_url = str(self.url + operation)
        response = retry(self._http.request, ProtocolError, attempts, 1,
                         method="POST", url=operation_url, body=message.encode('utf-8'),
                         headers=request_headers)
        self._check_http_response(response)
        return response

    def _check_http_response(self, response):
        """Validate the HTTP status and body of ``response``.

        :raises UnauthorizedError: when the status is 401
        :raises Exception: for any other status outside 200-299, or when the
            response body is empty
        """
        if not (200 <= response.status < 300):
            status_description = get_http_status_message(response.status)
            error_text = "({}) [{}] reason:{}".format(status_description, response.status,
                                                      response.reason)
            if response.status == 401:
                raise UnauthorizedError(error_text)
            raise Exception(error_text)
        if not response.data:
            raise Exception(response.reason)

    def _new_serializeRequestData(self, root_node, object_data, prev_type=None):
        """Generate xml from received request data using etree.xml

        :param root_node: element the data is serialized into
        :param object_data: dict, list or scalar to serialize; a dict key
            ``'__name__'`` names a wrapper element created for that dict
        :param prev_type: a list instance when ``object_data`` is a list item;
            scalar list items are wrapped in a ``string`` element
        :return: ``root_node``
        """
        if isinstance(object_data, dict):
            # Read '__name__' without popping it so the caller's dict is unchanged.
            if '__name__' in object_data:
                working_node = etree.SubElement(root_node, object_data['__name__'])
            else:
                working_node = root_node
            for key, value in object_data.items():
                if key == '__name__' or value is None:
                    continue
                if isinstance(value, bool):  # before int: bool is an int subclass
                    etree.SubElement(working_node, key).text = str(value).lower()
                elif isinstance(value, basestring) or isinstance(value, (int, float)):
                    # Numbers get their own element too; otherwise they would
                    # overwrite the text of the parent element.
                    etree.SubElement(working_node, key).text = self._to_unicode_string(value)
                else:
                    child_node = working_node
                    if isinstance(value, list):
                        child_node = etree.SubElement(working_node, key)
                    self._new_serializeRequestData(child_node, value)
            return root_node
        elif isinstance(object_data, list):
            for value in object_data:
                self._new_serializeRequestData(root_node, value, list())
        elif isinstance(object_data, basestring) or isinstance(object_data, int) or isinstance(object_data, float):
            # Element text must be a string for etree to serialize it.
            if isinstance(object_data, bool):
                text = str(object_data).lower()
            else:
                text = self._to_unicode_string(object_data)
            if prev_type is not None and isinstance(prev_type, list):
                etree.SubElement(root_node, 'string').text = text
            else:
                root_node.text = text
        return root_node

    def generateAPIRequest(self, kwargs):
        """
        Generic method for generation and sending XML requests
        :param kwargs: map of the parameters that need to be send to the server;
            must contain ``'method_name'``, which is removed from the map
        :return: string data or API object
        :raises CloudShellAPIError: when ``'method_name'`` is missing
        """
        if 'method_name' not in kwargs:
            raise CloudShellAPIError(404, 'Key "method_name" not in input data!', '')
        method_name = kwargs.pop('method_name', None)
        message = self._serialize_request(kwargs, method_name)
        http_response = self._sendRequest(method_name, message)
        return self._handle_api_response(http_response)

    def _handle_api_response(self, response):
        '''
        :param http response:
        :return: either string or deserialized instance representing the api response
        :raises CloudShellAPIError: when the parsed response is not successful
        :raises Exception: when the response cannot be deserialized
        '''
        response_str = self._remove_xml_namespace(response.data)
        try:
            api_response = self._deserialize_response(response_str)
        except Exception:
            status_description = get_http_status_message(response.status)
            raise Exception("{} ({}) [{}] reason:{} data:{}".format(response.reason, status_description, response.status, response.reason, str(response_str)))
        if not api_response.success:
            raise CloudShellAPIError(api_response.error_code, api_response.error, response_str)
        if api_response.response_info:
            return api_response.response_info
        return response_str

    def _serialize_request(self, kwargs, method_name):
        """Build the request XML: a ``method_name`` element with one child per entry of ``kwargs``."""
        request_node = etree.Element(method_name)
        for name in kwargs:
            child_node = etree.SubElement(request_node, name)
            self._new_serializeRequestData(child_node, kwargs[name])
        return etree.tostring(request_node).decode("utf-8")

    def _deserialize_response(self, response_str):
        """Parse ``response_str`` and wrap the root element in ``CommonApiResult``."""
        response_xml = XMLWrapper.parseXML(response_str)
        return CommonApiResult(response_xml)

    def _remove_xml_namespace(self, response_str):
        """Strip the default namespace declaration and NUL character references.

        :param response_str: response body as bytes
        :return: bytes that can be handed to the XML parser
        """
        # NOTE(review): the NUL-reference literal was unreadable in the reviewed
        # copy and has been reconstructed as '&#x0;' - confirm against upstream.
        # The replacement is entity-escaped so the document stays well-formed.
        namespace_decl = b'xmlns="http://schemas.qualisystems.com/ResourceManagement/ApiCommandResult.xsd"'
        return response_str.replace(namespace_decl, b'').replace(b'&#x0;', b'&lt;NUL&gt;')

    def __prettify_xml(self, elem):
        """Return a pretty-printed XML string for the Element.
        """
        from xml.dom.minidom import parseString
        rough_string = etree.tostring(elem, 'utf-8')
        reparsed = parseString(rough_string)
        return reparsed.toprettyxml(indent="\t")
def retry(func, exception, attempts=1, delay=1, *args, **kwargs):
    """Call ``func(*args, **kwargs)``, retrying on transient errors.

    :param func: callable to invoke
    :param exception: exception class (or tuple of classes) that triggers a
        retry, in addition to the urllib3 errors ``MaxRetryError``,
        ``ProtocolError``, ``ReadTimeoutError`` and ``HTTPError``
    :param attempts: maximum number of calls; values below 1 are treated as 1
    :param delay: seconds to sleep before the second call; doubled after every
        failed attempt
    :return: whatever ``func`` returns
    :raises Exception: the error raised by ``func`` when it is not retryable
        or when the last attempt fails
    """
    # Anything that is not a class or tuple of classes is ignored, so values
    # that were previously accepted (and unused) keep working.
    extra_retryable = exception if isinstance(exception, (type, tuple)) else ()
    attempts = max(attempts, 1)
    for attempt in range(attempts):
        try:
            return func(*args, **kwargs)
        except Exception as err:
            retryable = isinstance(err, extra_retryable) or isinstance(
                err, (MaxRetryError, ProtocolError, ReadTimeoutError, HTTPError))
            if not retryable or attempt == attempts - 1:
                raise
            time.sleep(delay)
            delay *= 2