Python, GCC and additional libraries are required to compile and install the packages used by SNAPS. These
dependencies need to be installed whether or not a virtual Python environment is used.
+Note: SNAPS-OO works best under Python 2.7; however, all of the code except in snaps.openstack.provisioning.ansible
+should work properly within a Python 3.4.4 runtime. Ansible support will not work in any Python 3.x when the Ansible
+version is 2.3.0 or prior. No indications when this support will be added as of 5 May 2017.
+
CentOS 7
--------
#!/usr/bin/python
#
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
"""
Parses the "ports" configuration
:param config: The dictionary to parse
- :param os_creds: The OpenStack credentials object
:return: a list of PortConfig objects
"""
out = list()
flavor_creator.create(cleanup=cleanup)
flavors[flavor_config['name']] = flavor_creator
except Exception as e:
- for key, flavor_creator in flavors.iteritems():
+ for key, flavor_creator in flavors.items():
flavor_creator.clean()
raise e
logger.info('Created configured flavors')
images[image_config['name']] = deploy_utils.create_image(__get_os_credentials(os_conn_config),
ImageSettings(image_config), cleanup)
except Exception as e:
- for key, image_creator in images.iteritems():
+ for key, image_creator in images.items():
image_creator.clean()
raise e
logger.info('Created configured images')
network_dict[net_name] = deploy_utils.create_network(
os_creds, NetworkSettings(config=network_conf['network']), cleanup)
except Exception as e:
- for key, net_creator in network_dict.iteritems():
+ for key, net_creator in network_dict.items():
net_creator.clean()
raise e
router_dict[router_name] = deploy_utils.create_router(
os_creds, RouterSettings(config=router_conf['router']), cleanup)
except Exception as e:
- for key, router_creator in router_dict.iteritems():
+ for key, router_creator in router_dict.items():
router_creator.clean()
raise e
keypairs_dict[keypair_config['name']] = deploy_utils.create_keypair(
__get_os_credentials(os_conn_config), kp_settings, cleanup)
except Exception as e:
- for key, keypair_creator in keypairs_dict.iteritems():
+ for key, keypair_creator in keypairs_dict.items():
keypair_creator.clean()
raise e
else:
raise Exception('Instance configuration is None. Cannot instantiate')
except Exception as e:
- logger.error('Unexpected error creating instances. Attempting to cleanup environment - ' + e.message)
- for key, inst_creator in vm_dict.iteritems():
+ logger.error('Unexpected error creating instances. Attempting to cleanup environment - ' + str(e))
+ for key, inst_creator in vm_dict.items():
inst_creator.clean()
raise e
logger.info("Applying Ansible Playbooks")
if ansible_configs:
# Ensure all hosts are accepting SSH session requests
- for vm_inst in vm_dict.values():
+ for vm_inst in list(vm_dict.values()):
if not vm_inst.vm_ssh_active(block=True):
- logger.warn("Timeout waiting for instance to respond to SSH requests")
+ logger.warning("Timeout waiting for instance to respond to SSH requests")
return False
# Set CWD so the deployment file's playbook location can leverage relative paths
proxy_setting=proxy_settings)
if retval != 0:
# Not a fatal type of event
- logger.warn('Unable to apply playbook found at location - ' + ansible_config('playbook_location'))
+ logger.warning('Unable to apply playbook found at location - ' + ansible_config('playbook_location'))
def __get_connection_info(ansible_config, vm_dict):
"""
if var_config and vm_dict and len(vm_dict) > 0:
variables = dict()
- for key, value in var_config.iteritems():
+ for key, value in var_config.items():
value = __get_variable_value(value, os_creds, vm_dict, image_dict, flavor_dict)
if key and value:
variables[key] = value
logger.info("Set Jinga2 variable with key [" + key + "] the value [" + value + ']')
else:
- logger.warn('Key [' + str(key) + '] or Value [' + str(value) + '] must not be None')
+ logger.warning('Key [' + str(key) + '] or Value [' + str(value) + '] must not be None')
return variables
return None
if config:
os_config = config.get('openstack')
+ os_conn_config = None
+ flavor_dict = {}
image_dict = {}
network_dict = {}
router_dict = {}
arguments.clean is not ARG_NOT_SET)
logger.info('Completed creating/retrieving all configured instances')
except Exception as e:
- logger.error('Unexpected error deploying environment. Rolling back due to - ' + e.message)
+ logger.error('Unexpected error deploying environment. Rolling back due to - ' + str(e))
__cleanup(vm_dict, keypairs_dict, router_dict, network_dict, image_dict, flavor_dict, True)
raise e
arguments.clean_image is not ARG_NOT_SET)
elif arguments.deploy is not ARG_NOT_SET:
logger.info('Configuring NICs where required')
- for vm in vm_dict.itervalues():
+ for vm in vm_dict.values():
vm.config_nics()
logger.info('Completed NIC configuration')
def __cleanup(vm_dict, keypairs_dict, router_dict, network_dict, image_dict, flavor_dict, clean_image=False):
- for key, vm_inst in vm_dict.iteritems():
+ for key, vm_inst in vm_dict.items():
vm_inst.clean()
- for key, kp_inst in keypairs_dict.iteritems():
+ for key, kp_inst in keypairs_dict.items():
kp_inst.clean()
- for key, router_inst in router_dict.iteritems():
+ for key, router_inst in router_dict.items():
try:
router_inst.clean()
except Exception:
logger.warning("Router not found continuing to next component")
- for key, net_inst in network_dict.iteritems():
+ for key, net_inst in network_dict.items():
try:
net_inst.clean()
except Exception:
logger.warning("Network not found continuing to next component")
if clean_image:
- for key, image_inst in image_dict.iteritems():
+ for key, image_inst in image_dict.items():
image_inst.clean()
- for key, flavor_inst in flavor_dict.iteritems():
+ for key, flavor_inst in flavor_dict.items():
flavor_inst.clean()
args = parser.parse_args()
if args.deploy is ARG_NOT_SET and args.clean is ARG_NOT_SET:
- print 'Must enter either -d for deploy or -c for cleaning up and environment'
+ print('Must enter either -d for deploy or -c for cleaning up and environment')
exit(1)
if args.deploy is not ARG_NOT_SET and args.clean is not ARG_NOT_SET:
- print 'Cannot enter both options -d/--deploy and -c/--clean'
+ print('Cannot enter both options -d/--deploy and -c/--clean')
exit(1)
main(args)
def test_construction_positional(self):
props = {'foo': 'bar'}
image = Image('name', 'id', 100, props)
- self.assertEquals('name', image.name)
- self.assertEquals('id', image.id)
- self.assertEquals(100, image.size)
- self.assertEquals(props, image.properties)
+ self.assertEqual('name', image.name)
+ self.assertEqual('id', image.id)
+ self.assertEqual(100, image.size)
+ self.assertEqual(props, image.properties)
def test_construction_named(self):
props = {'foo': 'bar'}
image = Image(image_id='id', properties=props, name='name', size=101)
- self.assertEquals('name', image.name)
- self.assertEquals('id', image.id)
- self.assertEquals(101, image.size)
- self.assertEquals(props, image.properties)
+ self.assertEqual('name', image.name)
+ self.assertEqual('id', image.id)
+ self.assertEqual(101, image.size)
+ self.assertEqual(props, image.properties)
# See the License for the specific language governing permissions and
# limitations under the License.
import os
-import urllib2
import logging
+try:
+ import urllib.request as urllib
+except ImportError:
+ import urllib2 as urllib
import yaml
:raise Exception when file cannot be found
"""
if file_exists(file_path):
- return open(file_path, 'r')
+ return open(file_path, 'rb')
else:
raise Exception('File with path cannot be found - ' + file_path)
if not name:
name = url.rsplit('/')[-1]
dest = dest_path + '/' + name
- try:
- logger.debug('Downloading file from - ' + url)
- # Override proxy settings to use localhost to download file
- proxy_handler = urllib2.ProxyHandler({})
- opener = urllib2.build_opener(proxy_handler)
- urllib2.install_opener(opener)
- response = urllib2.urlopen(url)
- except (urllib2.HTTPError, urllib2.URLError):
- raise Exception
-
+ logger.debug('Downloading file from - ' + url)
+ # Override proxy settings to use localhost to download file
with open(dest, 'wb') as f:
logger.debug('Saving file to - ' + dest)
+ response = __get_url_response(url)
f.write(response.read())
return f
:param url: the URL to inspect
:return: the number of bytes
"""
- proxy_handler = urllib2.ProxyHandler({})
- opener = urllib2.build_opener(proxy_handler)
- urllib2.install_opener(opener)
- response = urllib2.urlopen(url)
+ response = __get_url_response(url)
return response.headers['Content-Length']
+def __get_url_response(url):
+ """
+ Returns a response object for a given URL
+ :param url: the URL
+ :return: the response
+ """
+ proxy_handler = urllib.ProxyHandler({})
+ opener = urllib.build_opener(proxy_handler)
+ urllib.install_opener(opener)
+ return urllib.urlopen(url)
+
+
def read_yaml(config_file_path):
"""
Reads the yaml file and returns a dictionary object representation
# TODO - Place this API call into glance_utils.
status = glance_utils.get_image_status(self.__glance, self.__image)
if not status:
- logger.warn('Cannot image status for image with ID - ' + self.__image.id)
+ logger.warning('Cannot image status for image with ID - ' + self.__image.id)
return False
if status == 'ERROR':
logger.info('Deleting Floating IP - ' + floating_ip.ip)
nova_utils.delete_floating_ip(self.__nova, floating_ip)
except Exception as e:
- logger.error('Error deleting Floating IP - ' + e.message)
+ logger.error('Error deleting Floating IP - ' + str(e))
self.__floating_ips = list()
self.__floating_ip_dict = dict()
try:
neutron_utils.delete_port(self.__neutron, port)
except PortNotFoundClient as e:
- logger.warn('Unexpected error deleting port - ' + e.message)
+ logger.warning('Unexpected error deleting port - ' + str(e))
pass
self.__ports = list()
logger.error('VM not deleted within the timeout period of ' +
str(self.instance_settings.vm_delete_timeout) + ' seconds')
except Exception as e:
- logger.error('Unexpected error while checking VM instance status - ' + e.message)
+ logger.error('Unexpected error while checking VM instance status - ' + str(e))
def __setup_ports(self, port_settings, cleanup):
"""
' on instance - ' + self.instance_settings.name)
return
except Exception as e:
- logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + e.message)
+ logger.debug('Retry adding floating IP to instance. Last attempt failed with - ' + str(e))
time.sleep(poll_interval)
count -= 1
pass
if subnet_name:
subnet = neutron_utils.get_subnet_by_name(self.__neutron, subnet_name)
if not subnet:
- logger.warn('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name)
+ logger.warning('Cannot retrieve port IP as subnet could not be located with name - ' + subnet_name)
return None
for fixed_ip in port_dict['fixed_ips']:
if fixed_ip['subnet_id'] == subnet['subnet']['id']:
for key, port in self.__ports:
if key == port_name:
return port
- logger.warn('Cannot find port with name - ' + port_name)
+ logger.warning('Cannot find port with name - ' + port_name)
return None
def config_nics(self):
[floating_ip], self.get_image_user(), self.keypair_settings.private_filepath,
variables, self.__os_creds.proxy_settings)
else:
- logger.warn('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' +
+ logger.warning('VM ' + self.instance_settings.name + ' cannot self configure NICs eth1++. ' +
'No playbook or keypairs found.')
def get_image_user(self):
return self.__vm_status_check(STATUS_DELETED, block, self.instance_settings.vm_delete_timeout,
poll_interval)
except NotFound as e:
- logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + e.message)
+ logger.debug("Instance not found when querying status for " + STATUS_DELETED + ' with message ' + str(e))
return True
def vm_active(self, block=False, poll_interval=POLL_INTERVAL):
"""
instance = self.__nova.servers.get(self.__vm.id)
if not instance:
- logger.warn('Cannot find instance with id - ' + self.__vm.id)
+ logger.warning('Cannot find instance with id - ' + self.__vm.id)
return False
if instance.status == 'ERROR':
self.keypair_settings.private_filepath,
proxy_settings=self.__os_creds.proxy_settings)
else:
- logger.warn('Cannot return an SSH client. No Floating IP configured')
+ logger.warning('Cannot return an SSH client. No Floating IP configured')
def add_security_group(self, security_group):
"""
self.vm_active(block=True)
if not security_group:
- logger.warn('Security group object is None, cannot add')
+ logger.warning('Security group object is None, cannot add')
return False
try:
nova_utils.add_security_group(self.__nova, self.get_vm_inst(), security_group['security_group']['name'])
return True
except NotFound as e:
- logger.warn('Security group not added - ' + e.message)
+ logger.warning('Security group not added - ' + str(e))
return False
def remove_security_group(self, security_group):
self.vm_active(block=True)
if not security_group:
- logger.warn('Security group object is None, cannot remove')
+ logger.warning('Security group object is None, cannot remove')
return False
try:
nova_utils.remove_security_group(self.__nova, self.get_vm_inst(), security_group)
return True
except NotFound as e:
- logger.warn('Security group not removed - ' + e.message)
+ logger.warning('Security group not removed - ' + str(e))
return False
self.security_group_names = set(config['security_group_names'])
elif isinstance(config['security_group_names'], set):
self.security_group_names = config['security_group_names']
- elif isinstance(config['security_group_names'], basestring):
+ elif isinstance(config['security_group_names'], str):
self.security_group_names = [config['security_group_names']]
else:
raise Exception('Invalid data type for security_group_names attribute')
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
import logging
import os
-from Crypto.PublicKey import RSA
from novaclient.exceptions import NotFound
from snaps.openstack.utils import nova_utils
"""
logger.info('Creating keypair %s...' % self.keypair_settings.name)
- try:
- self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name)
-
- if not self.__keypair and not cleanup:
- if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath):
- logger.info("Uploading existing keypair")
- self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name,
- self.keypair_settings.public_filepath)
- else:
- logger.info("Creating new keypair")
- # TODO - Make this value configurable
- keys = RSA.generate(1024)
- self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name,
- keys.publickey().exportKey('OpenSSH'))
- nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath,
- self.keypair_settings.private_filepath)
-
- return self.__keypair
- except Exception as e:
- logger.error('Unexpected error creating keypair named - ' + self.keypair_settings.name)
- self.clean()
- raise Exception(e.message)
+ self.__keypair = nova_utils.get_keypair_by_name(self.__nova, self.keypair_settings.name)
+
+ if not self.__keypair and not cleanup:
+ if self.keypair_settings.public_filepath and os.path.isfile(self.keypair_settings.public_filepath):
+ logger.info("Uploading existing keypair")
+ self.__keypair = nova_utils.upload_keypair_file(self.__nova, self.keypair_settings.name,
+ self.keypair_settings.public_filepath)
+ else:
+ logger.info("Creating new keypair")
+ # TODO - Make this value configurable
+ keys = nova_utils.create_keys(1024)
+ self.__keypair = nova_utils.upload_keypair(self.__nova, self.keypair_settings.name,
+ nova_utils.public_key_openssh(keys))
+ nova_utils.save_keys_to_files(keys, self.keypair_settings.public_filepath,
+ self.keypair_settings.private_filepath)
+
+ return self.__keypair
def clean(self):
"""
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
logger.info('Deleting subnet with name ' + subnet['subnet']['name'])
neutron_utils.delete_subnet(self.__neutron, subnet)
except NotFound as e:
- logger.warn('Error deleting subnet with message - ' + e.message)
+ logger.warning('Error deleting subnet with message - ' + str(e))
pass
self.__subnets = list()
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
:param cleanup: Denotes whether or not this is being called for cleanup or not
:return: The OpenStack Image object
"""
- try:
- self.__project = keystone_utils.get_project(keystone=self.__keystone,
- project_name=self.project_settings.name)
- if self.__project:
- logger.info('Found project with name - ' + self.project_settings.name)
- elif not cleanup:
- self.__project = keystone_utils.create_project(self.__keystone, self.project_settings)
- else:
- logger.info('Did not create image due to cleanup mode')
- except Exception as e:
- logger.error('Unexpected error. Rolling back')
- self.clean()
- raise Exception(e.message)
+ self.__project = keystone_utils.get_project(keystone=self.__keystone,
+ project_name=self.project_settings.name)
+ if self.__project:
+ logger.info('Found project with name - ' + self.project_settings.name)
+ elif not cleanup:
+ self.__project = keystone_utils.create_project(self.__keystone, self.project_settings)
+ else:
+ logger.info('Did not create image due to cleanup mode')
return self.__project
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
:return: the router object
"""
logger.debug('Creating Router with name - ' + self.router_settings.name)
- try:
- existing = False
- router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name)
- if router_inst:
- self.__router = router_inst
- existing = True
+ existing = False
+ router_inst = neutron_utils.get_router_by_name(self.__neutron, self.router_settings.name)
+ if router_inst:
+ self.__router = router_inst
+ existing = True
+ else:
+ if not cleanup:
+ self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings)
+
+ for internal_subnet_name in self.router_settings.internal_subnets:
+ internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name)
+ if internal_subnet:
+ self.__internal_subnets.append(internal_subnet)
+ if internal_subnet and not cleanup and not existing:
+ logger.debug('Adding router to subnet...')
+ self.__internal_router_interface = neutron_utils.add_interface_router(
+ self.__neutron, self.__router, subnet=internal_subnet)
else:
- if not cleanup:
- self.__router = neutron_utils.create_router(self.__neutron, self.__os_creds, self.router_settings)
-
- for internal_subnet_name in self.router_settings.internal_subnets:
- internal_subnet = neutron_utils.get_subnet_by_name(self.__neutron, internal_subnet_name)
- if internal_subnet:
- self.__internal_subnets.append(internal_subnet)
- if internal_subnet and not cleanup and not existing:
- logger.debug('Adding router to subnet...')
- self.__internal_router_interface = neutron_utils.add_interface_router(
- self.__neutron, self.__router, subnet=internal_subnet)
- else:
- raise Exception('Subnet not found with name ' + internal_subnet_name)
+ raise Exception('Subnet not found with name ' + internal_subnet_name)
+
+ for port_setting in self.router_settings.port_settings:
+ port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name)
+ logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name)
+ if port:
+ self.__ports.append(port)
- for port_setting in self.router_settings.port_settings:
- port = neutron_utils.get_port_by_name(self.__neutron, port_setting.name)
- logger.info('Retrieved port ' + port_setting.name + ' for router - ' + self.router_settings.name)
+ if not port and not cleanup and not existing:
+ port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting)
if port:
+ logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name)
self.__ports.append(port)
+ neutron_utils.add_interface_router(self.__neutron, self.__router, port=port)
+ else:
+ raise Exception('Error creating port with name - ' + port_setting.name)
- if not port and not cleanup and not existing:
- port = neutron_utils.create_port(self.__neutron, self.__os_creds, port_setting)
- if port:
- logger.info('Created port ' + port_setting.name + ' for router - ' + self.router_settings.name)
- self.__ports.append(port)
- neutron_utils.add_interface_router(self.__neutron, self.__router, port=port)
- else:
- raise Exception('Error creating port with name - ' + port_setting.name)
-
- return self.__router
- except Exception as e:
- self.clean()
- raise Exception(e.message)
+ return self.__router
def clean(self):
"""
policies.
:param external_gateway: Name of the external network to which to route
:param admin_state_up: The administrative status of the router. True = up / False = down (default True)
- :param enable_snat: Boolean value. Enable Source NAT (SNAT) attribute. Default is True. To persist this
- attribute value, set the enable_snat_by_default option in the neutron.conf file.
:param external_fixed_ips: Dictionary containing the IP address parameters.
:param internal_subnets: List of subnet names to which to connect this router for Floating IP purposes
:param port_settings: List of PortSettings objects
else:
raise Exception('Could not find the external network named - ' + self.external_gateway)
- #TODO: Enable SNAT option for Router
- #TODO: Add external_fixed_ips Tests
+ # TODO: Enable SNAT option for Router
+ # TODO: Add external_fixed_ips Tests
return {'router': out}
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
if not self.__security_group and not cleanup:
# Create the security group
self.__security_group = neutron_utils.create_security_group(self.__neutron, self.__keystone,
- self.sec_grp_settings)
+ self.sec_grp_settings)
# Get the rules added for free
auto_rules = neutron_utils.get_rules_by_security_group(self.__neutron, self.__security_group)
"""
Removes and deletes the rules then the security group.
"""
- for setting, rule in self.__rules.iteritems():
+ for setting, rule in self.__rules.items():
try:
neutron_utils.delete_security_group_rule(self.__neutron, rule)
except NotFound as e:
- logger.warn('Rule not found, cannot delete - ' + e.message)
+ logger.warning('Rule not found, cannot delete - ' + str(e))
pass
self.__rules = dict()
try:
neutron_utils.delete_security_group(self.__neutron, self.__security_group)
except NotFound as e:
- logger.warn('Security Group not found, cannot delete - ' + e.message)
+ logger.warning('Security Group not found, cannot delete - ' + str(e))
self.__security_group = None
if rule_setting:
self.__rules.pop(rule_setting)
else:
- logger.warn('Rule setting is None, cannot remove rule')
+ logger.warning('Rule setting is None, cannot remove rule')
def __get_setting_from_rule(self, rule):
"""
"""
if not direction:
return None
- if type(direction) is Direction:
+ if isinstance(direction, Direction):
return direction
- elif isinstance(direction, basestring):
- if direction == 'egress':
+ else:
+ dir_str = str(direction)
+ if dir_str == 'egress':
return Direction.egress
- elif direction == 'ingress':
+ elif dir_str == 'ingress':
return Direction.ingress
else:
- raise Exception('Invalid Direction - ' + direction)
- else:
- raise Exception('Invalid Direction object - ' + str(direction))
+ raise Exception('Invalid Direction - ' + dir_str)
def map_protocol(protocol):
"""
if not protocol:
return None
- elif type(protocol) is Protocol:
+ elif isinstance(protocol, Protocol):
return protocol
- elif isinstance(protocol, basestring):
- if protocol == 'icmp':
+ else:
+ proto_str = str(protocol)
+ if proto_str == 'icmp':
return Protocol.icmp
- elif protocol == 'tcp':
+ elif proto_str == 'tcp':
return Protocol.tcp
- elif protocol == 'udp':
+ elif proto_str == 'udp':
return Protocol.udp
- elif protocol == 'null':
+ elif proto_str == 'null':
return Protocol.null
else:
- raise Exception('Invalid Protocol - ' + protocol)
- else:
- raise Exception('Invalid Protocol object - ' + str(protocol))
+ raise Exception('Invalid Protocol - ' + proto_str)
def map_ethertype(ethertype):
"""
if not ethertype:
return None
- elif type(ethertype) is Ethertype:
+ elif isinstance(ethertype, Ethertype):
return ethertype
- elif isinstance(ethertype, basestring):
- if ethertype == 'IPv6':
+ else:
+ eth_str = str(ethertype)
+ if eth_str == 'IPv6':
return Ethertype.IPv6
- elif ethertype == 'IPv4':
+ elif eth_str == 'IPv4':
return Ethertype.IPv4
else:
- raise Exception('Invalid Ethertype - ' + ethertype)
- else:
- raise Exception('Invalid Ethertype object - ' + str(ethertype))
+ raise Exception('Invalid Ethertype - ' + eth_str)
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
+# and others. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# http://www.apache.org/licenses/LICENSE-2.0
def test_name_ram_disk_vcpus_only(self):
settings = FlavorSettings(name='foo', ram=1, disk=2, vcpus=3)
- self.assertEquals('foo', settings.name)
- self.assertEquals('auto', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(0, settings.ephemeral)
- self.assertEquals(0, settings.swap)
- self.assertEquals(1.0, settings.rxtx_factor)
- self.assertEquals(True, settings.is_public)
- self.assertEquals(None, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('auto', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(0, settings.ephemeral)
+ self.assertEqual(0, settings.swap)
+ self.assertEqual(1.0, settings.rxtx_factor)
+ self.assertEqual(True, settings.is_public)
+ self.assertEqual(None, settings.metadata)
def test_config_with_name_ram_disk_vcpus_only(self):
settings = FlavorSettings(config={'name': 'foo', 'ram': 1, 'disk': 2, 'vcpus': 3})
- self.assertEquals('foo', settings.name)
- self.assertEquals('auto', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(0, settings.ephemeral)
- self.assertEquals(0, settings.swap)
- self.assertEquals(1.0, settings.rxtx_factor)
- self.assertEquals(True, settings.is_public)
- self.assertEquals(None, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('auto', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(0, settings.ephemeral)
+ self.assertEqual(0, settings.swap)
+ self.assertEqual(1.0, settings.rxtx_factor)
+ self.assertEqual(True, settings.is_public)
+ self.assertEqual(None, settings.metadata)
def test_all(self):
metadata = {'foo': 'bar'}
settings = FlavorSettings(name='foo', flavor_id='bar', ram=1, disk=2, vcpus=3, ephemeral=4, swap=5,
rxtx_factor=6.0, is_public=False, metadata=metadata)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(4, settings.ephemeral)
- self.assertEquals(5, settings.swap)
- self.assertEquals(6.0, settings.rxtx_factor)
- self.assertEquals(False, settings.is_public)
- self.assertEquals(metadata, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(4, settings.ephemeral)
+ self.assertEqual(5, settings.swap)
+ self.assertEqual(6.0, settings.rxtx_factor)
+ self.assertEqual(False, settings.is_public)
+ self.assertEqual(metadata, settings.metadata)
def test_config_all(self):
metadata = {'foo': 'bar'}
settings = FlavorSettings(config={'name': 'foo', 'flavor_id': 'bar', 'ram': 1, 'disk': 2, 'vcpus': 3,
'ephemeral': 4, 'swap': 5, 'rxtx_factor': 6.0, 'is_public': False,
'metadata': metadata})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor_id)
- self.assertEquals(1, settings.ram)
- self.assertEquals(2, settings.disk)
- self.assertEquals(3, settings.vcpus)
- self.assertEquals(4, settings.ephemeral)
- self.assertEquals(5, settings.swap)
- self.assertEquals(6.0, settings.rxtx_factor)
- self.assertEquals(False, settings.is_public)
- self.assertEquals(metadata, settings.metadata)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor_id)
+ self.assertEqual(1, settings.ram)
+ self.assertEqual(2, settings.disk)
+ self.assertEqual(3, settings.vcpus)
+ self.assertEqual(4, settings.ephemeral)
+ self.assertEqual(5, settings.swap)
+ self.assertEqual(6.0, settings.rxtx_factor)
+ self.assertEqual(False, settings.is_public)
+ self.assertEqual(metadata, settings.metadata)
class CreateFlavorTests(OSComponentTestCase):
flavor_creator_2 = OpenStackFlavor(self.os_creds, flavor_settings)
flavor2 = flavor_creator_2.create()
- self.assertEquals(flavor.id, flavor2.id)
+ self.assertEqual(flavor.id, flavor2.id)
def test_create_clean_flavor(self):
"""
from snaps import file_utils
from snaps.openstack.create_image import ImageSettings
-import openstack_tests
+from snaps.openstack.tests import openstack_tests
from snaps.openstack.utils import glance_utils
from snaps.openstack import create_image
from snaps.openstack import os_credentials
def test_name_user_format_url_only(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals(properties, settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual(properties, settings.extra_properties)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_url_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'download_url': 'http://foo.com'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
self.assertIsNone(settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_name_user_format_file_only(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_config_with_name_user_format_file_only(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
self.assertIsNone(settings.nic_config_pb_loc)
def test_all_url(self):
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', url='http://foo.com',
extra_properties=properties, nic_config_pb_loc='/foo/bar',
kernel_image_settings=kernel_settings, ramdisk_image_settings=ramdisk_settings)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals(properties, settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual(properties, settings.extra_properties)
self.assertIsNone(settings.image_file)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
- self.assertEquals('kernel', settings.kernel_image_settings.name)
- self.assertEquals('http://kernel.com', settings.kernel_image_settings.url)
- self.assertEquals('bar', settings.kernel_image_settings.image_user)
- self.assertEquals('qcow2', settings.kernel_image_settings.format)
- self.assertEquals('ramdisk', settings.ramdisk_image_settings.name)
- self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url)
- self.assertEquals('bar', settings.ramdisk_image_settings.image_user)
- self.assertEquals('qcow2', settings.ramdisk_image_settings.format)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('kernel', settings.kernel_image_settings.name)
+ self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
+ self.assertEqual('bar', settings.kernel_image_settings.image_user)
+ self.assertEqual('qcow2', settings.kernel_image_settings.format)
+ self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
+ self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
+ self.assertEqual('bar', settings.ramdisk_image_settings.image_user)
+ self.assertEqual('qcow2', settings.ramdisk_image_settings.format)
def test_config_all_url(self):
settings = ImageSettings(
'image_user': 'bar', 'format': 'qcow2'},
'ramdisk_image_settings': {'name': 'ramdisk', 'download_url': 'http://ramdisk.com',
'image_user': 'bar', 'format': 'qcow2'}})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
- self.assertEquals('http://foo.com', settings.url)
- self.assertEquals('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
+ self.assertEqual('http://foo.com', settings.url)
+ self.assertEqual('{\'hw_video_model\': \'vga\'}', settings.extra_properties)
self.assertIsNone(settings.image_file)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
- self.assertEquals('kernel', settings.kernel_image_settings.name)
- self.assertEquals('http://kernel.com', settings.kernel_image_settings.url)
- self.assertEquals('ramdisk', settings.ramdisk_image_settings.name)
- self.assertEquals('http://ramdisk.com', settings.ramdisk_image_settings.url)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('kernel', settings.kernel_image_settings.name)
+ self.assertEqual('http://kernel.com', settings.kernel_image_settings.url)
+ self.assertEqual('ramdisk', settings.ramdisk_image_settings.name)
+ self.assertEqual('http://ramdisk.com', settings.ramdisk_image_settings.url)
def test_all_file(self):
properties = {'hw_video_model': 'vga'}
settings = ImageSettings(name='foo', image_user='bar', img_format='qcow2', image_file='/foo/bar.qcow',
extra_properties=properties, nic_config_pb_loc='/foo/bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
- self.assertEquals(properties, settings.extra_properties)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
+ self.assertEqual(properties, settings.extra_properties)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
def test_config_all_file(self):
settings = ImageSettings(config={'name': 'foo', 'image_user': 'bar', 'format': 'qcow2',
'image_file': '/foo/bar.qcow',
'extra_properties': '{\'hw_video_model\' : \'vga\'}',
'nic_config_pb_loc': '/foo/bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.image_user)
- self.assertEquals('qcow2', settings.format)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.image_user)
+ self.assertEqual('qcow2', settings.format)
self.assertIsNone(settings.url)
- self.assertEquals('/foo/bar.qcow', settings.image_file)
- self.assertEquals('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
- self.assertEquals('/foo/bar', settings.nic_config_pb_loc)
+ self.assertEqual('/foo/bar.qcow', settings.image_file)
+ self.assertEqual('{\'hw_video_model\' : \'vga\'}', settings.extra_properties)
+ self.assertEqual('/foo/bar', settings.nic_config_pb_loc)
class CreateImageSuccessTests(OSIntegrationTestCase):
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(created_image.size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
def test_create_image_clean_url_properties(self):
"""
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def test_create_image_clean_file(self):
"""
retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
def test_create_delete_image(self):
"""
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
# Delete Image manually
glance_utils.delete_image(self.glance, created_image)
retrieved_image = glance_utils.get_image(self.glance, self.image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creator.get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(self.image_settings), retrieved_image.size)
- self.assertEquals(image1.name, retrieved_image.name)
- self.assertEquals(image1.id, retrieved_image.id)
- self.assertEquals(image1.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creator.get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(self.image_settings), retrieved_image.size)
+ self.assertEqual(image1.name, retrieved_image.name)
+ self.assertEqual(image1.id, retrieved_image.id)
+ self.assertEqual(image1.properties, retrieved_image.properties)
# Should be retrieving the instance data
os_image_2 = create_image.OpenStackImage(self.os_creds, self.image_settings)
image2 = os_image_2.create()
- self.assertEquals(image1.id, image2.id)
+ self.assertEqual(image1.id, image2.id)
class CreateImageNegativeTests(OSIntegrationTestCase):
main_image = glance_utils.get_image(self.glance, image_settings.name)
self.assertIsNotNone(main_image)
self.assertIsNotNone(image_creator.get_image())
- self.assertEquals(image_creator.get_image().id, main_image.id)
+ self.assertEqual(image_creator.get_image().id, main_image.id)
kernel_image = glance_utils.get_image(self.glance, image_settings.kernel_image_settings.name)
self.assertIsNotNone(kernel_image)
self.assertIsNotNone(image_creator.get_kernel_image())
- self.assertEquals(kernel_image.id, image_creator.get_kernel_image().id)
+ self.assertEqual(kernel_image.id, image_creator.get_kernel_image().id)
ramdisk_image = glance_utils.get_image(self.glance, image_settings.ramdisk_image_settings.name)
self.assertIsNotNone(ramdisk_image)
self.assertIsNotNone(image_creator.get_ramdisk_image())
- self.assertEquals(ramdisk_image.id, image_creator.get_ramdisk_image().id)
+ self.assertEqual(ramdisk_image.id, image_creator.get_ramdisk_image().id)
def test_create_three_part_image_from_file_3_creators(self):
"""
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_file_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
- self.assertEquals(get_image_size(kernel_file_image_settings), kernel_image.size)
+ self.assertEqual(get_image_size(kernel_file_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-initramfs'
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_file_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
- self.assertEquals(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
+ self.assertEqual(get_image_size(ramdisk_file_image_settings), ramdisk_image.size)
# Create the main image
image_url = 'http://download.cirros-cloud.net/0.3.4/cirros-0.3.4-x86_64-disk.img'
retrieved_image = glance_utils.get_image(self.glance, file_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(file_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(file_image_settings), retrieved_image.size)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def test_create_three_part_image_from_url_3_creators(self):
"""
self.image_creators.append(create_image.OpenStackImage(self.os_creds, kernel_image_settings))
kernel_image = self.image_creators[-1].create()
self.assertIsNotNone(kernel_image)
- self.assertEquals(get_image_size(kernel_image_settings), kernel_image.size)
+ self.assertEqual(get_image_size(kernel_image_settings), kernel_image.size)
# Create the ramdisk image
ramdisk_image_settings = openstack_tests.cirros_image_settings(
self.image_creators.append(create_image.OpenStackImage(self.os_creds, ramdisk_image_settings))
ramdisk_image = self.image_creators[-1].create()
self.assertIsNotNone(ramdisk_image)
- self.assertEquals(get_image_size(ramdisk_image_settings), ramdisk_image.size)
+ self.assertEqual(get_image_size(ramdisk_image_settings), ramdisk_image.size)
# Create the main image
os_image_settings = openstack_tests.cirros_image_settings(
retrieved_image = glance_utils.get_image(self.glance, os_image_settings.name)
self.assertIsNotNone(retrieved_image)
- self.assertEquals(self.image_creators[-1].get_image().size, retrieved_image.size)
- self.assertEquals(get_image_size(os_image_settings), retrieved_image.size)
+ self.assertEqual(self.image_creators[-1].get_image().size, retrieved_image.size)
+ self.assertEqual(get_image_size(os_image_settings), retrieved_image.size)
- self.assertEquals(created_image.name, retrieved_image.name)
- self.assertEquals(created_image.id, retrieved_image.id)
- self.assertEquals(created_image.properties, retrieved_image.properties)
+ self.assertEqual(created_image.name, retrieved_image.name)
+ self.assertEqual(created_image.id, retrieved_image.id)
+ self.assertEqual(created_image.properties, retrieved_image.properties)
def get_image_size(image_settings):
def test_name_flavor_port_only(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
settings = VmInstanceSettings(name='foo', flavor='bar', port_settings=[port_settings])
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(0, len(settings.security_group_names))
- self.assertEquals(0, len(settings.floating_ip_settings))
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(0, len(settings.security_group_names))
+ self.assertEqual(0, len(settings.floating_ip_settings))
self.assertIsNone(settings.sudo_user)
- self.assertEquals(900, settings.vm_boot_timeout)
- self.assertEquals(300, settings.vm_delete_timeout)
- self.assertEquals(180, settings.ssh_connect_timeout)
+ self.assertEqual(900, settings.vm_boot_timeout)
+ self.assertEqual(300, settings.vm_delete_timeout)
+ self.assertEqual(180, settings.ssh_connect_timeout)
self.assertIsNone(settings.availability_zone)
def test_config_with_name_flavor_port_only(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
settings = VmInstanceSettings(config={'name': 'foo', 'flavor': 'bar', 'ports': [port_settings]})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(0, len(settings.security_group_names))
- self.assertEquals(0, len(settings.floating_ip_settings))
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(0, len(settings.security_group_names))
+ self.assertEqual(0, len(settings.floating_ip_settings))
self.assertIsNone(settings.sudo_user)
- self.assertEquals(900, settings.vm_boot_timeout)
- self.assertEquals(300, settings.vm_delete_timeout)
- self.assertEquals(180, settings.ssh_connect_timeout)
+ self.assertEqual(900, settings.vm_boot_timeout)
+ self.assertEqual(300, settings.vm_delete_timeout)
+ self.assertEqual(180, settings.ssh_connect_timeout)
self.assertIsNone(settings.availability_zone)
def test_all(self):
security_group_names=['sec_grp_1'], floating_ip_settings=[fip_settings],
sudo_user='joe', vm_boot_timeout=999, vm_delete_timeout=333,
ssh_connect_timeout=111, availability_zone='server name')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(1, len(settings.security_group_names))
- self.assertEquals('sec_grp_1', settings.security_group_names[0])
- self.assertEquals(1, len(settings.floating_ip_settings))
- self.assertEquals('foo-fip', settings.floating_ip_settings[0].name)
- self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name)
- self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name)
- self.assertEquals('joe', settings.sudo_user)
- self.assertEquals(999, settings.vm_boot_timeout)
- self.assertEquals(333, settings.vm_delete_timeout)
- self.assertEquals(111, settings.ssh_connect_timeout)
- self.assertEquals('server name', settings.availability_zone)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(1, len(settings.security_group_names))
+ self.assertEqual('sec_grp_1', settings.security_group_names[0])
+ self.assertEqual(1, len(settings.floating_ip_settings))
+ self.assertEqual('foo-fip', settings.floating_ip_settings[0].name)
+ self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name)
+ self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name)
+ self.assertEqual('joe', settings.sudo_user)
+ self.assertEqual(999, settings.vm_boot_timeout)
+ self.assertEqual(333, settings.vm_delete_timeout)
+ self.assertEqual(111, settings.ssh_connect_timeout)
+ self.assertEqual('server name', settings.availability_zone)
def test_config_all(self):
port_settings = PortSettings(name='foo-port', network_name='bar-net')
'floating_ips': [fip_settings], 'sudo_user': 'joe',
'vm_boot_timeout': 999, 'vm_delete_timeout': 333,
'ssh_connect_timeout': 111, 'availability_zone': 'server name'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.flavor)
- self.assertEquals(1, len(settings.port_settings))
- self.assertEquals('foo-port', settings.port_settings[0].name)
- self.assertEquals('bar-net', settings.port_settings[0].network_name)
- self.assertEquals(1, len(settings.security_group_names))
- self.assertEquals(1, len(settings.floating_ip_settings))
- self.assertEquals('foo-fip', settings.floating_ip_settings[0].name)
- self.assertEquals('bar-port', settings.floating_ip_settings[0].port_name)
- self.assertEquals('foo-bar-router', settings.floating_ip_settings[0].router_name)
- self.assertEquals('joe', settings.sudo_user)
- self.assertEquals(999, settings.vm_boot_timeout)
- self.assertEquals(333, settings.vm_delete_timeout)
- self.assertEquals(111, settings.ssh_connect_timeout)
- self.assertEquals('server name', settings.availability_zone)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.flavor)
+ self.assertEqual(1, len(settings.port_settings))
+ self.assertEqual('foo-port', settings.port_settings[0].name)
+ self.assertEqual('bar-net', settings.port_settings[0].network_name)
+ self.assertEqual(1, len(settings.security_group_names))
+ self.assertEqual(1, len(settings.floating_ip_settings))
+ self.assertEqual('foo-fip', settings.floating_ip_settings[0].name)
+ self.assertEqual('bar-port', settings.floating_ip_settings[0].port_name)
+ self.assertEqual('foo-bar-router', settings.floating_ip_settings[0].router_name)
+ self.assertEqual('joe', settings.sudo_user)
+ self.assertEqual(999, settings.vm_boot_timeout)
+ self.assertEqual(333, settings.vm_delete_timeout)
+ self.assertEqual(111, settings.ssh_connect_timeout)
+ self.assertEqual('server name', settings.availability_zone)
class FloatingIpSettingsUnitTests(unittest.TestCase):
def test_name_port_router_only(self):
settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router')
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
self.assertIsNone(settings.subnet_name)
self.assertTrue(settings.provisioning)
def test_config_with_name_port_router_only(self):
settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
self.assertIsNone(settings.subnet_name)
self.assertTrue(settings.provisioning)
def test_all(self):
settings = FloatingIpSettings(name='foo', port_name='foo-port', router_name='bar-router',
subnet_name='bar-subnet', provisioning=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
- self.assertEquals('bar-subnet', settings.subnet_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
+ self.assertEqual('bar-subnet', settings.subnet_name)
self.assertFalse(settings.provisioning)
def test_config_all(self):
settings = FloatingIpSettings(config={'name': 'foo', 'port_name': 'foo-port', 'router_name': 'bar-router',
'subnet_name': 'bar-subnet', 'provisioning': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('foo-port', settings.port_name)
- self.assertEquals('bar-router', settings.router_name)
- self.assertEquals('bar-subnet', settings.subnet_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('foo-port', settings.port_name)
+ self.assertEqual('bar-router', settings.router_name)
+ self.assertEqual('bar-subnet', settings.subnet_name)
self.assertFalse(settings.provisioning)
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
self.os_creds, instance_settings, self.image_creator.image_settings)
vm_inst = self.inst_creator.create()
- self.assertEquals(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
+ self.assertEqual(1, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
# Delete instance
nova_utils.delete_vm_instance(self.nova, vm_inst)
self.assertTrue(self.inst_creator.vm_deleted(block=True))
- self.assertEquals(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
+ self.assertEqual(0, len(nova_utils.get_servers_by_name(self.nova, instance_settings.name)))
# Exception should not be thrown
self.inst_creator.clean()
try:
inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.keypair_creator:
try:
self.keypair_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning keypair with message - ' + e.message)
+ logger.error('Unexpected exception cleaning keypair with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.router_creator:
try:
self.router_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning router with message - ' + e.message)
+ logger.error('Unexpected exception cleaning router with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
self.inst_creators.append(inst_creator)
vm_inst = inst_creator.create()
- self.assertEquals(ip_1, inst_creator.get_port_ip(self.port_1_name))
+ self.assertEqual(ip_1, inst_creator.get_port_ip(self.port_1_name))
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
def test_ssh_client_fip_before_active(self):
"""
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
self.assertIsNotNone(vm_inst)
self.assertTrue(inst_creator.vm_active(block=True))
- self.assertEquals(vm_inst, inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, inst_creator.get_vm_inst())
self.assertTrue(validate_ssh_client(inst_creator))
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(ip, self.inst_creator.get_port_ip(
+ self.assertEqual(ip, self.inst_creator.get_port_ip(
self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name))
def test_set_custom_invalid_ip_one_subnet(self):
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
+ self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
def test_set_custom_invalid_mac(self):
"""
self.image_creator.image_settings)
self.inst_creator.create(block=True)
- self.assertEquals(ip, self.inst_creator.get_port_ip(
+ self.assertEqual(ip, self.inst_creator.get_port_ip(
self.port_1_name, subnet_name=self.net_config.network_settings.subnet_settings[0].name))
- self.assertEquals(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
+ self.assertEqual(mac_addr, self.inst_creator.get_port_mac(self.port_1_name))
def test_set_allowed_address_pairs(self):
"""
port = self.inst_creator.get_port_by_name(port_settings.name)
self.assertIsNotNone(port)
self.assertIsNotNone(port['port'].get('allowed_address_pairs'))
- self.assertEquals(1, len(port['port']['allowed_address_pairs']))
+ self.assertEqual(1, len(port['port']['allowed_address_pairs']))
validation_utils.objects_equivalent(pair, port['port']['allowed_address_pairs'][0])
def test_set_allowed_address_pairs_bad_mac(self):
try:
inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
vm = creator.get_vm_inst()
deployed_zone = vm._info['OS-EXT-AZ:availability_zone']
deployed_host = vm._info['OS-EXT-SRV-ATTR:host']
- self.assertEquals(zone, deployed_zone + ':' + deployed_host)
+ self.assertEqual(zone, deployed_zone + ':' + deployed_host)
index += 1
self.keypair_creator.create()
except Exception as e:
self.tearDown()
- raise Exception(e.message)
+ raise Exception(str(e))
def tearDown(self):
"""
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.keypair_creator:
try:
self.keypair_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning keypair with message - ' + e.message)
+ logger.error('Unexpected exception cleaning keypair with message - ' + str(e))
if os.path.isfile(self.keypair_pub_filepath):
os.remove(self.keypair_pub_filepath)
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
for router_creator in self.router_creators:
try:
router_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning router with message - ' + e.message)
+ logger.error('Unexpected exception cleaning router with message - ' + str(e))
for network_creator in self.network_creators:
try:
network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
vm_inst = self.inst_creator.create(block=True)
- self.assertEquals(vm_inst, self.inst_creator.get_vm_inst())
+ self.assertEqual(vm_inst, self.inst_creator.get_vm_inst())
# Effectively blocks until VM has been properly activated
self.assertTrue(self.inst_creator.vm_active(block=True))
# Effectively blocks until VM's ssh port has been opened
self.assertTrue(self.inst_creator.vm_ssh_active(block=True))
+ # TODO - Refactor config_nics() to return a status that can be validated here.
self.inst_creator.config_nics()
# TODO - *** ADD VALIDATION HERE ***
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
for sec_grp_creator in self.sec_grp_creators:
try:
sec_grp_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning security group with message - ' + e.message)
+ logger.error('Unexpected exception cleaning security group with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
try:
self.inst_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning VM instance with message - ' + e.message)
+ logger.error('Unexpected exception cleaning VM instance with message - ' + str(e))
if self.flavor_creator:
try:
self.flavor_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning flavor with message - ' + e.message)
+ logger.error('Unexpected exception cleaning flavor with message - ' + str(e))
if self.network_creator:
try:
self.network_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning network with message - ' + e.message)
+ logger.error('Unexpected exception cleaning network with message - ' + str(e))
if self.image_creator:
try:
self.image_creator.clean()
except Exception as e:
- logger.error('Unexpected exception cleaning image with message - ' + e.message)
+ logger.error('Unexpected exception cleaning image with message - ' + str(e))
super(self.__class__, self).__clean__()
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
import uuid
import unittest
-from Crypto.PublicKey import RSA
-
from snaps.openstack.create_keypairs import KeypairSettings, OpenStackKeypair
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
def test_name_only(self):
settings = KeypairSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_config_with_name_only(self):
settings = KeypairSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_name_pub_only(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub')
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_config_with_name_pub_only(self):
settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
self.assertIsNone(settings.private_filepath)
def test_name_priv_only(self):
settings = KeypairSettings(name='foo', private_filepath='/foo/bar')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_config_with_name_priv_only(self):
settings = KeypairSettings(config={'name': 'foo', 'private_filepath': '/foo/bar'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertIsNone(settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_all(self):
settings = KeypairSettings(name='foo', public_filepath='/foo/bar.pub', private_filepath='/foo/bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
def test_config_all(self):
settings = KeypairSettings(config={'name': 'foo', 'public_filepath': '/foo/bar.pub',
'private_filepath': '/foo/bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('/foo/bar.pub', settings.public_filepath)
- self.assertEquals('/foo/bar', settings.private_filepath)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('/foo/bar.pub', settings.public_filepath)
+ self.assertEqual('/foo/bar', settings.private_filepath)
class CreateKeypairsTests(OSIntegrationTestCase):
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
def test_create_delete_keypair(self):
"""
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
def test_create_keypair_save_both(self):
"""
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
self.assertTrue(os.path.isfile(self.priv_file_path))
Tests the creation of an existing public keypair from a file
:return:
"""
- keys = RSA.generate(1024)
+ keys = nova_utils.create_keys()
nova_utils.save_keys_to_files(keys=keys, pub_file_path=self.pub_file_path)
self.keypair_creator = OpenStackKeypair(
self.os_creds, KeypairSettings(name=self.keypair_name, public_filepath=self.pub_file_path))
self.keypair_creator.create()
keypair = nova_utils.keypair_exists(self.nova, self.keypair_creator.get_keypair())
- self.assertEquals(self.keypair_creator.get_keypair(), keypair)
+ self.assertEqual(self.keypair_creator.get_keypair(), keypair)
file_key = open(os.path.expanduser(self.pub_file_path)).read()
- self.assertEquals(self.keypair_creator.get_keypair().public_key, file_key)
+ self.assertEqual(self.keypair_creator.get_keypair().public_key, file_key)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
def test_name_only(self):
settings = NetworkSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.shared)
self.assertIsNone(settings.project_name)
self.assertFalse(settings.external)
self.assertIsNone(settings.network_type)
- self.assertEquals(0, len(settings.subnet_settings))
+ self.assertEqual(0, len(settings.subnet_settings))
def test_config_with_name_only(self):
settings = NetworkSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.shared)
self.assertIsNone(settings.project_name)
self.assertFalse(settings.external)
self.assertIsNone(settings.network_type)
- self.assertEquals(0, len(settings.subnet_settings))
+ self.assertEqual(0, len(settings.subnet_settings))
def test_all(self):
sub_settings = SubnetSettings(name='foo-subnet', cidr='10.0.0.0/24')
settings = NetworkSettings(name='foo', admin_state_up=False, shared=True, project_name='bar', external=True,
network_type='flat', physical_network='phy', subnet_settings=[sub_settings])
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertFalse(settings.admin_state_up)
self.assertTrue(settings.shared)
- self.assertEquals('bar', settings.project_name)
+ self.assertEqual('bar', settings.project_name)
self.assertTrue(settings.external)
- self.assertEquals('flat', settings.network_type)
- self.assertEquals('phy', settings.physical_network)
- self.assertEquals(1, len(settings.subnet_settings))
- self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
+ self.assertEqual('flat', settings.network_type)
+ self.assertEqual('phy', settings.physical_network)
+ self.assertEqual(1, len(settings.subnet_settings))
+ self.assertEqual('foo-subnet', settings.subnet_settings[0].name)
def test_config_all(self):
settings = NetworkSettings(config={'name': 'foo', 'admin_state_up': False, 'shared': True,
'physical_network': 'phy',
'subnets':
[{'subnet': {'name': 'foo-subnet', 'cidr': '10.0.0.0/24'}}]})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
self.assertFalse(settings.admin_state_up)
self.assertTrue(settings.shared)
- self.assertEquals('bar', settings.project_name)
+ self.assertEqual('bar', settings.project_name)
self.assertTrue(settings.external)
- self.assertEquals('flat', settings.network_type)
- self.assertEquals('phy', settings.physical_network)
- self.assertEquals(1, len(settings.subnet_settings))
- self.assertEquals('foo-subnet', settings.subnet_settings[0].name)
+ self.assertEqual('flat', settings.network_type)
+ self.assertEqual('phy', settings.physical_network)
+ self.assertEqual(1, len(settings.subnet_settings))
+ self.assertEqual('foo-subnet', settings.subnet_settings[0].name)
class SubnetSettingsUnitTests(unittest.TestCase):
def test_name_cidr_only(self):
settings = SubnetSettings(name='foo', cidr='10.0.0.0/24')
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(4, settings.ip_version)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(4, settings.ip_version)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.start)
self.assertIsNone(settings.end)
self.assertIsNone(settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
self.assertIsNone(settings.host_routes)
self.assertIsNone(settings.destination)
self.assertIsNone(settings.nexthop)
def test_config_with_name_cidr_only(self):
settings = SubnetSettings(config={'name': 'foo', 'cidr': '10.0.0.0/24'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(4, settings.ip_version)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(4, settings.ip_version)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.start)
self.assertIsNone(settings.end)
self.assertIsNone(settings.gateway_ip)
self.assertIsNone(settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
self.assertIsNone(settings.host_routes)
self.assertIsNone(settings.destination)
self.assertIsNone(settings.nexthop)
start='10.0.0.2', end='10.0.0.101', gateway_ip='10.0.0.1', enable_dhcp=False,
dns_nameservers=['8.8.8.8'], host_routes=[host_routes], destination='dest',
nexthop='hop', ipv6_ra_mode='dhcpv6-stateful', ipv6_address_mode='slaac')
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(6, settings.ip_version)
- self.assertEquals('bar-project', settings.project_name)
- self.assertEquals('10.0.0.2', settings.start)
- self.assertEquals('10.0.0.101', settings.end)
- self.assertEquals('10.0.0.1', settings.gateway_ip)
- self.assertEquals(False, settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
- self.assertEquals(1, len(settings.host_routes))
- self.assertEquals(host_routes, settings.host_routes[0])
- self.assertEquals('dest', settings.destination)
- self.assertEquals('hop', settings.nexthop)
- self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
- self.assertEquals('slaac', settings.ipv6_address_mode)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(6, settings.ip_version)
+ self.assertEqual('bar-project', settings.project_name)
+ self.assertEqual('10.0.0.2', settings.start)
+ self.assertEqual('10.0.0.101', settings.end)
+ self.assertEqual('10.0.0.1', settings.gateway_ip)
+ self.assertEqual(False, settings.enable_dhcp)
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.host_routes))
+ self.assertEqual(host_routes, settings.host_routes[0])
+ self.assertEqual('dest', settings.destination)
+ self.assertEqual('hop', settings.nexthop)
+ self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode)
+ self.assertEqual('slaac', settings.ipv6_address_mode)
def test_config_all(self):
host_routes = {'destination': '0.0.0.0/0', 'nexthop': '123.456.78.9'}
'dns_nameservers': ['8.8.8.8'], 'host_routes': [host_routes],
'destination': 'dest', 'nexthop': 'hop', 'ipv6_ra_mode': 'dhcpv6-stateful',
'ipv6_address_mode': 'slaac'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('10.0.0.0/24', settings.cidr)
- self.assertEquals(6, settings.ip_version)
- self.assertEquals('bar-project', settings.project_name)
- self.assertEquals('10.0.0.2', settings.start)
- self.assertEquals('10.0.0.101', settings.end)
- self.assertEquals('10.0.0.1', settings.gateway_ip)
- self.assertEquals(False, settings.enable_dhcp)
- self.assertEquals(1, len(settings.dns_nameservers))
- self.assertEquals('8.8.8.8', settings.dns_nameservers[0])
- self.assertEquals(1, len(settings.host_routes))
- self.assertEquals(host_routes, settings.host_routes[0])
- self.assertEquals('dest', settings.destination)
- self.assertEquals('hop', settings.nexthop)
- self.assertEquals('dhcpv6-stateful', settings.ipv6_ra_mode)
- self.assertEquals('slaac', settings.ipv6_address_mode)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('10.0.0.0/24', settings.cidr)
+ self.assertEqual(6, settings.ip_version)
+ self.assertEqual('bar-project', settings.project_name)
+ self.assertEqual('10.0.0.2', settings.start)
+ self.assertEqual('10.0.0.101', settings.end)
+ self.assertEqual('10.0.0.1', settings.gateway_ip)
+ self.assertEqual(False, settings.enable_dhcp)
+ self.assertEqual(1, len(settings.dns_nameservers))
+ self.assertEqual('8.8.8.8', settings.dns_nameservers[0])
+ self.assertEqual(1, len(settings.host_routes))
+ self.assertEqual(host_routes, settings.host_routes[0])
+ self.assertEqual('dest', settings.destination)
+ self.assertEqual('hop', settings.nexthop)
+ self.assertEqual('dhcpv6-stateful', settings.ipv6_ra_mode)
+ self.assertEqual('slaac', settings.ipv6_address_mode)
class PortSettingsUnitTests(unittest.TestCase):
def test_name_netname_only(self):
settings = PortSettings(name='foo', network_name='bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.mac_address)
def test_config_with_name_netname_only(self):
settings = PortSettings(config={'name': 'foo', 'network_name': 'bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertTrue(settings.admin_state_up)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.mac_address)
security_groups=['foo_grp_id'], allowed_address_pairs=allowed_address_pairs,
opt_value='opt value', opt_name='opt name', device_owner='owner',
device_id='device number')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertFalse(settings.admin_state_up)
- self.assertEquals('foo-project', settings.project_name)
- self.assertEquals('1234', settings.mac_address)
- self.assertEquals(ip_addrs, settings.ip_addrs)
- self.assertEquals(fixed_ips, settings.fixed_ips)
- self.assertEquals(1, len(settings.security_groups))
- self.assertEquals('foo_grp_id', settings.security_groups[0])
- self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
- self.assertEquals('opt value', settings.opt_value)
- self.assertEquals('opt name', settings.opt_name)
- self.assertEquals('owner', settings.device_owner)
- self.assertEquals('device number', settings.device_id)
+ self.assertEqual('foo-project', settings.project_name)
+ self.assertEqual('1234', settings.mac_address)
+ self.assertEqual(ip_addrs, settings.ip_addrs)
+ self.assertEqual(fixed_ips, settings.fixed_ips)
+ self.assertEqual(1, len(settings.security_groups))
+ self.assertEqual('foo_grp_id', settings.security_groups[0])
+ self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs)
+ self.assertEqual('opt value', settings.opt_value)
+ self.assertEqual('opt name', settings.opt_name)
+ self.assertEqual('owner', settings.device_owner)
+ self.assertEqual('device number', settings.device_id)
def test_config_all(self):
ip_addrs = [{'subnet_name', 'foo-sub', 'ip', '10.0.0.10'}]
'fixed_ips': fixed_ips, 'security_groups': ['foo_grp_id'],
'allowed_address_pairs': allowed_address_pairs, 'opt_value': 'opt value',
'opt_name': 'opt name', 'device_owner': 'owner', 'device_id': 'device number'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.network_name)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.network_name)
self.assertFalse(settings.admin_state_up)
- self.assertEquals('foo-project', settings.project_name)
- self.assertEquals('1234', settings.mac_address)
- self.assertEquals(ip_addrs, settings.ip_addrs)
- self.assertEquals(fixed_ips, settings.fixed_ips)
- self.assertEquals(1, len(settings.security_groups))
- self.assertEquals('foo_grp_id', settings.security_groups[0])
- self.assertEquals(allowed_address_pairs, settings.allowed_address_pairs)
- self.assertEquals('opt value', settings.opt_value)
- self.assertEquals('opt name', settings.opt_name)
- self.assertEquals('owner', settings.device_owner)
- self.assertEquals('device number', settings.device_id)
+ self.assertEqual('foo-project', settings.project_name)
+ self.assertEqual('1234', settings.mac_address)
+ self.assertEqual(ip_addrs, settings.ip_addrs)
+ self.assertEqual(fixed_ips, settings.fixed_ips)
+ self.assertEqual(1, len(settings.security_groups))
+ self.assertEqual('foo_grp_id', settings.security_groups[0])
+ self.assertEqual(allowed_address_pairs, settings.allowed_address_pairs)
+ self.assertEqual('opt value', settings.opt_value)
+ self.assertEqual('opt name', settings.opt_name)
+ self.assertEqual('owner', settings.device_owner)
+ self.assertEqual('device number', settings.device_id)
class CreateNetworkSuccessTests(OSIntegrationTestCase):
self.net_creator2 = OpenStackNetwork(self.os_creds, self.net_config.network_settings)
self.net_creator2.create()
- self.assertEquals(self.net_creator.get_network()['network']['id'],
- self.net_creator2.get_network()['network']['id'])
+ self.assertEqual(self.net_creator.get_network()['network']['id'],
+ self.net_creator2.get_network()['network']['id'])
class CreateNetworkTypeTests(OSComponentTestCase):
# Validate network was created
neutron_utils_tests.validate_network(self.neutron, net_settings.name, True)
- self.assertEquals(network_type, network['network']['provider:network_type'])
+ self.assertEqual(network_type, network['network']['provider:network_type'])
# TODO - determine what value we need to place into physical_network
# - Do not know what vaule to place into the 'physical_network' setting.
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
def test_name_only(self):
settings = ProjectSettings(name='foo')
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('default', settings.domain)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_config_with_name_only(self):
settings = ProjectSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('default', settings.domain)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('default', settings.domain)
self.assertIsNone(settings.description)
self.assertTrue(settings.enabled)
def test_all(self):
settings = ProjectSettings(name='foo', domain='bar', description='foobar', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
def test_config_all(self):
settings = ProjectSettings(config={'name': 'foo', 'domain': 'bar', 'description': 'foobar', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.domain)
- self.assertEquals('foobar', settings.description)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.domain)
+ self.assertEqual('foobar', settings.description)
self.assertFalse(settings.enabled)
retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
def test_create_project_2x(self):
"""
retrieved_project = keystone_utils.get_project(keystone=self.keystone, project_name=self.project_settings.name)
self.assertIsNotNone(retrieved_project)
- self.assertEquals(created_project, retrieved_project)
+ self.assertEqual(created_project, retrieved_project)
project2 = OpenStackProject(self.os_creds, self.project_settings).create()
- self.assertEquals(retrieved_project, project2)
+ self.assertEqual(retrieved_project, project2)
def test_create_delete_project(self):
"""
self.sec_grp_creators.append(sec_grp_creator)
if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
else:
self.fail('Cannot locate the project or tenant ID')
self.sec_grp_creators.append(sec_grp_creator)
if 'tenant_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['tenant_id'])
elif 'project_id' in sec_grp['security_group']:
- self.assertEquals(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
+ self.assertEqual(self.project_creator.get_project().id, sec_grp['security_group']['project_id'])
else:
self.fail('Cannot locate the project or tenant ID')
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
def test_name_and_direction(self):
settings = SecurityGroupRuleSettings(sec_grp_name='foo', direction=Direction.ingress)
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
def test_config_name_and_direction(self):
settings = SecurityGroupRuleSettings(config={'sec_grp_name': 'foo', 'direction': 'ingress'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals(Direction.ingress, settings.direction)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.direction)
def test_all(self):
settings = SecurityGroupRuleSettings(
sec_grp_name='foo', description='fubar', direction=Direction.egress, remote_group_id='rgi',
protocol=Protocol.icmp, ethertype=Ethertype.IPv6, port_range_min=1, port_range_max=2,
remote_ip_prefix='prfx')
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.icmp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.icmp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
def test_config_all(self):
settings = SecurityGroupRuleSettings(
'port_range_min': 1,
'port_range_max': 2,
'remote_ip_prefix': 'prfx'})
- self.assertEquals('foo', settings.sec_grp_name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals(Direction.egress, settings.direction)
- self.assertEquals('rgi', settings.remote_group_id)
- self.assertEquals(Protocol.tcp, settings.protocol)
- self.assertEquals(Ethertype.IPv6, settings.ethertype)
- self.assertEquals(1, settings.port_range_min)
- self.assertEquals(2, settings.port_range_max)
- self.assertEquals('prfx', settings.remote_ip_prefix)
+ self.assertEqual('foo', settings.sec_grp_name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual(Direction.egress, settings.direction)
+ self.assertEqual('rgi', settings.remote_group_id)
+ self.assertEqual(Protocol.tcp, settings.protocol)
+ self.assertEqual(Ethertype.IPv6, settings.ethertype)
+ self.assertEqual(1, settings.port_range_min)
+ self.assertEqual(2, settings.port_range_max)
+ self.assertEqual('prfx', settings.remote_ip_prefix)
class SecurityGroupSettingsUnitTests(unittest.TestCase):
def test_name_only(self):
settings = SecurityGroupSettings(name='foo')
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_config_with_name_only(self):
settings = SecurityGroupSettings(config={'name': 'foo'})
- self.assertEquals('foo', settings.name)
+ self.assertEqual('foo', settings.name)
def test_invalid_rule(self):
rule_setting = SecurityGroupRuleSettings(sec_grp_name='bar', direction=Direction.ingress)
settings = SecurityGroupSettings(
name='bar', description='fubar', project_name='foo', rule_settings=rule_settings)
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(rule_settings[0], settings.rule_settings[0])
- self.assertEquals(rule_settings[1], settings.rule_settings[1])
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(rule_settings[0], settings.rule_settings[0])
+ self.assertEqual(rule_settings[1], settings.rule_settings[1])
def test_config_all(self):
settings = SecurityGroupSettings(
'project_name': 'foo',
'rules': [{'sec_grp_name': 'bar', 'direction': 'ingress'}]})
- self.assertEquals('bar', settings.name)
- self.assertEquals('fubar', settings.description)
- self.assertEquals('foo', settings.project_name)
- self.assertEquals(1, len(settings.rule_settings))
- self.assertEquals('bar', settings.rule_settings[0].sec_grp_name)
- self.assertEquals(Direction.ingress, settings.rule_settings[0].direction)
+ self.assertEqual('bar', settings.name)
+ self.assertEqual('fubar', settings.description)
+ self.assertEqual('foo', settings.project_name)
+ self.assertEqual(1, len(settings.rule_settings))
+ self.assertEqual('bar', settings.rule_settings[0].sec_grp_name)
+ self.assertEqual(Direction.ingress, settings.rule_settings[0].direction)
class CreateSecurityGroupTests(OSIntegrationTestCase):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_create_delete_group(self):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_create_group_with_several_rules(self):
sec_grp = neutron_utils.get_security_group(self.neutron, self.sec_grp_name)
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
def test_add_rule(self):
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.add_rule(SecurityGroupRuleSettings(sec_grp_name=self.sec_grp_creator.sec_grp_settings.name,
direction=Direction.egress, protocol=Protocol.icmp))
rules2 = neutron_utils.get_rules_by_security_group(self.neutron, self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) + 1, len(rules2))
+ self.assertEqual(len(rules) + 1, len(rules2))
def test_remove_rule_by_id(self):
"""
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.remove_rule(rule_id=rules[0]['security_group_rule']['id'])
rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
def test_remove_rule_by_setting(self):
"""
validation_utils.objects_equivalent(self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(self.sec_grp_creator.get_rules()), len(rules))
+ self.assertEqual(len(self.sec_grp_creator.get_rules()), len(rules))
validation_utils.objects_equivalent(self.sec_grp_creator.get_rules(), rules)
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
rules_after_del = neutron_utils.get_rules_by_security_group(self.neutron,
self.sec_grp_creator.get_security_group())
- self.assertEquals(len(rules) - 1, len(rules_after_del))
+ self.assertEqual(len(rules) - 1, len(rules_after_del))
# TODO - Add more tests with different rules. Rule creation parameters can be somewhat complex
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
def test_name_pass_only(self):
settings = UserSettings(name='foo', password='bar')
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.email)
self.assertTrue(settings.enabled)
def test_config_with_name_pass_only(self):
settings = UserSettings(config={'name': 'foo', 'password': 'bar'})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
self.assertIsNone(settings.project_name)
self.assertIsNone(settings.email)
self.assertTrue(settings.enabled)
def test_all(self):
settings = UserSettings(name='foo', password='bar', project_name='proj-foo', email='foo@bar.com', enabled=False)
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
- self.assertEquals('proj-foo', settings.project_name)
- self.assertEquals('foo@bar.com', settings.email)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
+ self.assertEqual('proj-foo', settings.project_name)
+ self.assertEqual('foo@bar.com', settings.email)
self.assertFalse(settings.enabled)
def test_config_all(self):
settings = UserSettings(config={'name': 'foo', 'password': 'bar', 'project_name': 'proj-foo',
'email': 'foo@bar.com', 'enabled': False})
- self.assertEquals('foo', settings.name)
- self.assertEquals('bar', settings.password)
- self.assertEquals('proj-foo', settings.project_name)
- self.assertEquals('foo@bar.com', settings.email)
+ self.assertEqual('foo', settings.name)
+ self.assertEqual('bar', settings.password)
+ self.assertEqual('proj-foo', settings.project_name)
+ self.assertEqual('foo@bar.com', settings.email)
self.assertFalse(settings.enabled)
retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name)
self.assertIsNotNone(retrieved_user)
- self.assertEquals(created_user, retrieved_user)
+ self.assertEqual(created_user, retrieved_user)
def test_create_user_2x(self):
"""
retrieved_user = keystone_utils.get_user(self.keystone, self.user_settings.name)
self.assertIsNotNone(retrieved_user)
- self.assertEquals(created_user, retrieved_user)
+ self.assertEqual(created_user, retrieved_user)
# Create user for the second time to ensure it is the same
user2 = OpenStackUser(self.os_creds, self.user_settings).create()
- self.assertEquals(retrieved_user, user2)
+ self.assertEqual(retrieved_user, user2)
def test_create_delete_user(self):
"""
# Delete user
self.user_creator.clean()
self.assertIsNone(self.user_creator.get_user())
-
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
:return: T/F
"""
if (type(dict1) is dict or type(dict1) is _DictWithMeta) and (type(dict2) is dict or type(dict2) is _DictWithMeta):
- for key, value1 in dict1.iteritems():
+ for key, value1 in dict1.items():
if not objects_equivalent(value1, dict2.get(key)):
return False
return True
kwargs['name'] = image_settings.name
kwargs['disk_format'] = image_settings.format
kwargs['container_format'] = 'bare'
-
if image_settings.extra_properties:
- for key, value in image_settings.extra_properties.iteritems():
- kwargs[key] = value
+ kwargs.update(image_settings.extra_properties)
created_image = glance.images.create(**kwargs)
image_file = file_utils.get_file(image_filename)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
from neutronclient.common.exceptions import NotFound
from neutronclient.neutron.client import Client
-import keystone_utils
+from snaps.openstack.utils import keystone_utils
__author__ = 'spisarski'
net_filter['project_id'] = project_id
networks = neutron.list_networks(**net_filter)
- for network, netInsts in networks.iteritems():
+ for network, netInsts in networks.items():
for inst in netInsts:
if inst.get('name') == network_name:
if project_id and inst.get('project_id') == project_id:
:return:
"""
networks = neutron.list_networks(**{'id': network_id})
- for network, netInsts in networks.iteritems():
+ for network, netInsts in networks.items():
for inst in netInsts:
if inst.get('id') == network_id:
return {'network': inst}
:return:
"""
subnets = neutron.list_subnets(**{'name': subnet_name})
- for subnet, subnetInst in subnets.iteritems():
+ for subnet, subnetInst in subnets.items():
for inst in subnetInst:
if inst.get('name') == subnet_name:
return {'subnet': inst}
:return:
"""
routers = neutron.list_routers(**{'name': router_name})
- for router, routerInst in routers.iteritems():
+ for router, routerInst in routers.items():
for inst in routerInst:
if inst.get('name') == router_name:
return {'router': inst}
logger.info('Removing router interface from router named ' + router['router']['name'])
neutron.remove_interface_router(router=router['router']['id'], body=__create_port_json_body(subnet, port))
except NotFound as e:
- logger.warn('Could not remove router interface. NotFound - ' + e.message)
+ logger.warning('Could not remove router interface. NotFound - ' + str(e))
pass
else:
- logger.warn('Could not remove router interface, No router object')
+ logger.warning('Could not remove router interface, No router object')
def __create_port_json_body(subnet=None, port=None):
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
+from cryptography.hazmat.primitives import serialization
+from cryptography.hazmat.primitives.asymmetric import rsa
+from cryptography.hazmat.backends import default_backend
+
import os
import logging
-import keystone_utils
+from snaps.openstack.utils import keystone_utils
from novaclient.client import Client
from novaclient.exceptions import NotFound
return nova.servers.get(server)
+def create_keys(key_size=2048):
+ """
+ Generates public and private keys
+ :param key_size: the number of bytes for the key size
+ :return: the cryptography keys
+ """
+ return rsa.generate_private_key(backend=default_backend(), public_exponent=65537,
+ key_size=key_size)
+
+
+def public_key_openssh(keys):
+ """
+ Returns the public key for OpenSSH
+ :param keys: the keys generated by create_keys() from cryptography
+ :return: the OpenSSH public key
+ """
+ return keys.public_key().public_bytes(serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+
+
def save_keys_to_files(keys=None, pub_file_path=None, priv_file_path=None):
"""
Saves the generated RSA generated keys to the filesystem
- :param keys: the keys to save
+ :param keys: the keys to save generated by cryptography
:param pub_file_path: the path to the public keys
:param priv_file_path: the path to the private keys
:return: None
if not os.path.isdir(pub_dir):
os.mkdir(pub_dir)
public_handle = open(pub_file_path, 'wb')
- public_handle.write(keys.publickey().exportKey('OpenSSH'))
+ public_bytes = keys.public_key().public_bytes(
+ serialization.Encoding.OpenSSH, serialization.PublicFormat.OpenSSH)
+ public_handle.write(public_bytes)
public_handle.close()
os.chmod(pub_file_path, 0o400)
logger.info("Saved public key to - " + pub_file_path)
if not os.path.isdir(priv_dir):
os.mkdir(priv_dir)
private_handle = open(priv_file_path, 'wb')
- private_handle.write(keys.exportKey())
+ private_handle.write(keys.private_bytes(encoding=serialization.Encoding.PEM,
+ format=serialization.PrivateFormat.TraditionalOpenSSL,
+ encryption_algorithm=serialization.NoEncryption()))
private_handle.close()
os.chmod(priv_file_path, 0o400)
logger.info("Saved private key to - " + priv_file_path)
:param file_path: the path to the public key file
:return: the keypair object
"""
- with open(os.path.expanduser(file_path)) as fpubkey:
+ with open(os.path.expanduser(file_path), 'rb') as fpubkey:
logger.info('Saving keypair to - ' + file_path)
return upload_keypair(nova, name, fpubkey.read())
:return: the keypair object
"""
logger.info('Creating keypair with name - ' + name)
- return nova.keypairs.create(name=name, public_key=key)
+ return nova.keypairs.create(name=name, public_key=key.decode('utf-8'))
def keypair_exists(nova, keypair_obj):
zones = nova.availability_zones.list()
for zone in zones:
if zone.zoneName == 'nova':
- for key, host in zone.hosts.iteritems():
+ for key, host in zone.hosts.items():
out.append(zone.zoneName + ':' + key)
return out
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
"""
project_settings = ProjectSettings(name=self.project_name)
self.project = keystone_utils.create_project(self.keystone, project_settings)
- self.assertEquals(self.project_name, self.project.name)
+ self.assertEqual(self.project_name, self.project.name)
project = keystone_utils.get_project(keystone=self.keystone, project_name=project_settings.name)
self.assertIsNotNone(project)
- self.assertEquals(self.project_name, self.project.name)
+ self.assertEqual(self.project_name, self.project.name)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
Tests the neutron_utils.create_neutron_net() function with an empty network name
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, NetworkSettings(name=''))
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds,
+ network_settings=NetworkSettings(name=''))
def test_create_network_null_name(self):
"""
Tests the neutron_utils.create_neutron_net() function when the network name is None
"""
with self.assertRaises(Exception):
- self.network = neutron_utils.create_network(self.neutron, NetworkSettings())
+ self.network = neutron_utils.create_network(self.neutron, self.os_creds,
+ network_settings=NetworkSettings())
class NeutronUtilsSubnetTests(OSComponentTestCase):
:return: True/False
"""
ports = neutron.list_ports()
- for port, port_insts in ports.iteritems():
+ for port, port_insts in ports.items():
for inst in port_insts:
if inst['id'] == port_obj['port']['id']:
return inst['name'] == this_port_name
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
import os
import uuid
-from Crypto.PublicKey import RSA
-
from snaps.openstack.utils import nova_utils
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.create_flavor import FlavorSettings
self.pub_key_file_path = self.priv_key_file_path + '.pub'
self.nova = nova_utils.nova_client(self.os_creds)
- self.keys = RSA.generate(1024)
- self.public_key = self.keys.publickey().exportKey('OpenSSH')
+ self.keys = nova_utils.create_keys()
+ self.public_key = nova_utils.public_key_openssh(self.keys)
self.keypair_name = guid
self.keypair = None
self.floating_ip = None
"""
self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
- self.assertEquals(self.keypair, result)
+ self.assertEqual(self.keypair, result)
keypair = nova_utils.get_keypair_by_name(self.nova, self.keypair_name)
- self.assertEquals(self.keypair, keypair)
+ self.assertEqual(self.keypair, keypair)
def test_create_delete_keypair(self):
"""
"""
self.keypair = nova_utils.upload_keypair(self.nova, self.keypair_name, self.public_key)
result = nova_utils.keypair_exists(self.nova, self.keypair)
- self.assertEquals(self.keypair, result)
+ self.assertEqual(self.keypair, result)
nova_utils.delete_keypair(self.nova, self.keypair)
result2 = nova_utils.keypair_exists(self.nova, self.keypair)
self.assertIsNone(result2)
nova_utils.save_keys_to_files(self.keys, self.pub_key_file_path, self.priv_key_file_path)
self.keypair = nova_utils.upload_keypair_file(self.nova, self.keypair_name, self.pub_key_file_path)
pub_key = open(os.path.expanduser(self.pub_key_file_path)).read()
- self.assertEquals(self.keypair.public_key, pub_key)
+ self.assertEqual(self.keypair.public_key, pub_key)
def test_floating_ips(self):
"""
self.floating_ip = nova_utils.create_floating_ip(self.nova, self.ext_net_name)
returned = nova_utils.get_floating_ip(self.nova, self.floating_ip)
- self.assertEquals(self.floating_ip, returned)
+ self.assertEqual(self.floating_ip, returned)
class NovaUtilsFlavorTests(OSComponentTestCase):
Validates the flavor_settings against the OpenStack flavor object
"""
self.assertIsNotNone(self.flavor)
- self.assertEquals(self.flavor_settings.name, self.flavor.name)
- self.assertEquals(self.flavor_settings.flavor_id, self.flavor.id)
- self.assertEquals(self.flavor_settings.ram, self.flavor.ram)
- self.assertEquals(self.flavor_settings.disk, self.flavor.disk)
- self.assertEquals(self.flavor_settings.vcpus, self.flavor.vcpus)
- self.assertEquals(self.flavor_settings.ephemeral, self.flavor.ephemeral)
+ self.assertEqual(self.flavor_settings.name, self.flavor.name)
+ self.assertEqual(self.flavor_settings.flavor_id, self.flavor.id)
+ self.assertEqual(self.flavor_settings.ram, self.flavor.ram)
+ self.assertEqual(self.flavor_settings.disk, self.flavor.disk)
+ self.assertEqual(self.flavor_settings.vcpus, self.flavor.vcpus)
+ self.assertEqual(self.flavor_settings.ephemeral, self.flavor.ephemeral)
if self.flavor_settings.swap == 0:
- self.assertEquals('', self.flavor.swap)
+ self.assertEqual('', self.flavor.swap)
else:
- self.assertEquals(self.flavor_settings.swap, self.flavor.swap)
+ self.assertEqual(self.flavor_settings.swap, self.flavor.swap)
- self.assertEquals(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
- self.assertEquals(self.flavor_settings.is_public, self.flavor.is_public)
+ self.assertEqual(self.flavor_settings.rxtx_factor, self.flavor.rxtx_factor)
+ self.assertEqual(self.flavor_settings.is_public, self.flavor.is_public)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
ssh.connect(ip, username=user, key_filename=private_key_filepath, sock=proxy_cmd)
return ssh
except Exception as e:
- logger.warn('Unable to connect via SSH with message - ' + e.message)
+ logger.warning('Unable to connect via SSH with message - ' + str(e))
self.inst_creator = create_instance.OpenStackVmInstance(
self.os_creds, instance_settings, self.image_creator.image_settings,
keypair_settings=self.keypair_creator.keypair_settings)
- except Exception as e:
+ except:
self.tearDown()
- raise Exception(e.message)
+ raise
def tearDown(self):
"""
retval = ansible_utils.apply_playbook('provisioning/tests/playbooks/simple_playbook.yml', [ip], user, priv_key,
proxy_setting=self.os_creds.proxy_settings)
- self.assertEquals(0, retval)
+ self.assertEqual(0, retval)
ssh = ansible_utils.ssh_client(ip, user, priv_key, self.os_creds.proxy_settings)
self.assertIsNotNone(ssh)
with open(self.test_file_local_path) as f:
file_contents = f.readline()
- self.assertEquals('Hello World!', file_contents)
+ self.assertEqual('Hello World!', file_contents)
def test_apply_template_playbook(self):
"""
with open(self.test_file_local_path) as f:
file_contents = f.readline()
- self.assertEquals('Hello Foo!', file_contents)
+ self.assertEqual('Hello Foo!', file_contents)
-# Copyright (c) 2016 Cable Television Laboratories, Inc. ("CableLabs")
+# Copyright (c) 2017 Cable Television Laboratories, Inc. ("CableLabs")
# and others. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
:return:
"""
os_env_dict = file_utils.read_os_env_file('openstack/tests/conf/overcloudrc_test')
- self.assertEquals('test_pw', os_env_dict['OS_PASSWORD'])
- self.assertEquals('http://foo:5000/v2.0/', os_env_dict['OS_AUTH_URL'])
- self.assertEquals('admin', os_env_dict['OS_USERNAME'])
- self.assertEquals('admin', os_env_dict['OS_TENANT_NAME'])
+ self.assertEqual('test_pw', os_env_dict['OS_PASSWORD'])
+ self.assertEqual('http://foo:5000/v2.0/', os_env_dict['OS_AUTH_URL'])
+ self.assertEqual('admin', os_env_dict['OS_USERNAME'])
+ self.assertEqual('admin', os_env_dict['OS_TENANT_NAME'])