The pattern being replaced has unwittingly added the requirement that
all creator credentials must be of type 'admin' as when looking up
the associated project ID required a call to keystone.projects.list().
As the SNAPS integration tests were always creating users with an 'admin'
role, this issue was not caught. As part of this patch, integration test
users will no longer be admin.
JIRA: SNAPS-274
Change-Id: I02957f69e31a9d4dfa63362d371f061687e59fbf
Signed-off-by: spisarski <s.pisarski@cablelabs.com>
29 files changed:
- project_id = None
- if self.project_name:
- keystone = keystone_utils.keystone_client(os_creds)
- project = keystone_utils.get_project(
- keystone=keystone, project_name=self.project_name)
- if project:
- project_id = project.id
-
+ keystone = keystone_utils.keystone_client(os_creds)
network = neutron_utils.get_network(
network = neutron_utils.get_network(
- neutron, network_name=self.network_name, project_id=project_id)
+ neutron, keystone, network_name=self.network_name,
+ project_name=self.project_name)
if not network:
raise PortConfigError(
'Cannot locate network with name - ' + self.network_name)
if not network:
raise PortConfigError(
'Cannot locate network with name - ' + self.network_name)
if self.name:
out['name'] = self.name
if self.project_name:
if self.name:
out['name'] = self.name
if self.project_name:
+ project = keystone_utils.get_project(
+ keystone=keystone, project_name=self.project_name)
+ project_id = None
+ if project:
+ project_id = project.id
if project_id:
out['tenant_id'] = project_id
else:
if project_id:
out['tenant_id'] = project_id
else:
sec_grp_ids = list()
for sec_grp_name in self.security_groups:
sec_grp = neutron_utils.get_security_group(
sec_grp_ids = list()
for sec_grp_name in self.security_groups:
sec_grp = neutron_utils.get_security_group(
- neutron, sec_grp_name=sec_grp_name, project_id=project_id)
+ neutron, sec_grp_name=sec_grp_name,
+ project_name=self.project_name)
if sec_grp:
sec_grp_ids.append(sec_grp.id)
out['security_groups'] = sec_grp_ids
if sec_grp:
sec_grp_ids.append(sec_grp.id)
out['security_groups'] = sec_grp_ids
if not self.name:
raise RouterConfigError('Name is required')
if not self.name:
raise RouterConfigError('Name is required')
- def dict_for_neutron(self, neutron, os_creds, project_id):
+ def dict_for_neutron(self, neutron, os_creds):
"""
Returns a dictionary object representing this object.
This is meant to be converted into JSON designed for use by the Neutron
"""
Returns a dictionary object representing this object.
This is meant to be converted into JSON designed for use by the Neutron
:param os_creds: The OpenStack credentials for retrieving the keystone
client for looking up the project ID when the
self.project_name is not None
:param os_creds: The OpenStack credentials for retrieving the keystone
client for looking up the project ID when the
self.project_name is not None
- :param project_id: the associated project ID to use when
- self.project_name is None
:return: the dictionary object
"""
out = dict()
ext_gw = dict()
:return: the dictionary object
"""
out = dict()
ext_gw = dict()
+ keystone = keystone_utils.keystone_client(os_creds)
+
if self.name:
out['name'] = self.name
if self.project_name:
if self.name:
out['name'] = self.name
if self.project_name:
- keystone = keystone_utils.keystone_client(os_creds)
project = keystone_utils.get_project(
keystone=keystone, project_name=self.project_name)
if project:
project = keystone_utils.get_project(
keystone=keystone, project_name=self.project_name)
if project:
- project_id = project.id
- if project_id:
- out['tenant_id'] = project_id
+ out['tenant_id'] = project.id
else:
raise RouterConfigError(
'Could not find project ID for project named - ' +
else:
raise RouterConfigError(
'Could not find project ID for project named - ' +
out['admin_state_up'] = self.admin_state_up
if self.external_gateway:
ext_net = neutron_utils.get_network(
out['admin_state_up'] = self.admin_state_up
if self.external_gateway:
ext_net = neutron_utils.get_network(
- neutron, network_name=self.external_gateway)
+ neutron, keystone, network_name=self.external_gateway)
if ext_net:
ext_gw['network_id'] = ext_net.id
out['external_gateway_info'] = ext_gw
if ext_net:
ext_gw['network_id'] = ext_net.id
out['external_gateway_info'] = ext_gw
'Rule settings must correspond with the name of this '
'security group')
'Rule settings must correspond with the name of this '
'security group')
- def dict_for_neutron(self, keystone, project_id):
+ def dict_for_neutron(self, keystone):
"""
Returns a dictionary object representing this object.
This is meant to be converted into JSON designed for use by the Neutron
"""
Returns a dictionary object representing this object.
This is meant to be converted into JSON designed for use by the Neutron
TODO - expand automated testing to exercise all parameters
:param keystone: the Keystone client
TODO - expand automated testing to exercise all parameters
:param keystone: the Keystone client
- :param project_id: the default project ID
:return: the dictionary object
"""
out = dict()
:return: the dictionary object
"""
out = dict()
raise SecurityGroupConfigError(
'Could not find project ID for project named - ' +
self.project_name)
raise SecurityGroupConfigError(
'Could not find project ID for project named - ' +
self.project_name)
- else:
- out['tenant_id'] = project_id
return {'security_group': out}
return {'security_group': out}
raise SecurityGroupRuleConfigError(
'direction and sec_grp_name are required')
raise SecurityGroupRuleConfigError(
'direction and sec_grp_name are required')
- def dict_for_neutron(self, neutron, project_id):
+ def dict_for_neutron(self, neutron, keystone, project_name):
"""
Returns a dictionary object representing this object.
This is meant to be converted into JSON designed for use by the Neutron
API
:param neutron: the neutron client for performing lookups
"""
Returns a dictionary object representing this object.
This is meant to be converted into JSON designed for use by the Neutron
API
:param neutron: the neutron client for performing lookups
- :param project_id: the ID of the project associated with the group
+ :param project_name: the name of the project associated with the group
:return: the dictionary object
"""
out = dict()
:return: the dictionary object
"""
out = dict()
out['protocol'] = self.protocol.value
if self.sec_grp_name:
sec_grp = neutron_utils.get_security_group(
out['protocol'] = self.protocol.value
if self.sec_grp_name:
sec_grp = neutron_utils.get_security_group(
- neutron, sec_grp_name=self.sec_grp_name, project_id=project_id)
+ neutron, keystone, sec_grp_name=self.sec_grp_name,
+ project_name=project_name)
if sec_grp:
out['security_group_id'] = sec_grp.id
else:
if sec_grp:
out['security_group_id'] = sec_grp.id
else:
from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
from snaps.openstack.openstack_creator import OpenStackComputeObject
from snaps.config.vm_inst import VmInstanceConfig, FloatingIpConfig
from snaps.openstack.openstack_creator import OpenStackComputeObject
-from snaps.openstack.utils import glance_utils, cinder_utils, settings_utils
+from snaps.openstack.utils import (
+ glance_utils, cinder_utils, settings_utils, keystone_utils)
from snaps.openstack.utils import neutron_utils
from snaps.openstack.utils import nova_utils
from snaps.openstack.utils.nova_utils import RebootType
from snaps.openstack.utils import neutron_utils
from snaps.openstack.utils import nova_utils
from snaps.openstack.utils.nova_utils import RebootType
logger = logging.getLogger('create_instance')
POLL_INTERVAL = 3
logger = logging.getLogger('create_instance')
POLL_INTERVAL = 3
-VOL_DETACH_TIMEOUT = 120
STATUS_ACTIVE = 'ACTIVE'
STATUS_DELETED = 'DELETED'
STATUS_ACTIVE = 'ACTIVE'
STATUS_DELETED = 'DELETED'
super(self.__class__, self).initialize()
self.__neutron = neutron_utils.neutron_client(self._os_creds)
super(self.__class__, self).initialize()
self.__neutron = neutron_utils.neutron_client(self._os_creds)
+ self.__keystone = keystone_utils.keystone_client(self._os_creds)
self.__ports = self.__query_ports(self.instance_settings.port_settings)
self.__lookup_existing_vm_by_name()
self.__ports = self.__query_ports(self.instance_settings.port_settings)
self.__lookup_existing_vm_by_name()
within the project
"""
server = nova_utils.get_server(
within the project
"""
server = nova_utils.get_server(
- self._nova, self.__neutron,
+ self._nova, self.__neutron, self.__keystone,
vm_inst_settings=self.instance_settings)
if server:
if server.name == self.instance_settings.name:
vm_inst_settings=self.instance_settings)
if server:
if server.name == self.instance_settings.name:
"""
glance = glance_utils.glance_client(self._os_creds)
self.__vm = nova_utils.create_server(
"""
glance = glance_utils.glance_client(self._os_creds)
self.__vm = nova_utils.create_server(
- self._nova, self.__neutron, glance, self.instance_settings,
- self.image_settings, self.project_id, self.keypair_settings)
+ self._nova, self.__keystone, self.__neutron, glance,
+ self.instance_settings, self.image_settings,
+ self._os_creds.project_name, self.keypair_settings)
logger.info('Created instance with name - %s',
self.instance_settings.name)
logger.info('Created instance with name - %s',
self.instance_settings.name)
if volume and self.vm_active(block=True):
vm = nova_utils.attach_volume(
if volume and self.vm_active(block=True):
vm = nova_utils.attach_volume(
- self._nova, self.__neutron, self.__vm, volume,
- self.project_id, timeout=VOL_DETACH_TIMEOUT)
+ self._nova, self.__neutron, self.__keystone, self.__vm,
+ volume, self._os_creds.project_name)
if vm:
self.__vm = vm
else:
if vm:
self.__vm = vm
else:
- logger.warn('Volume [%s] not attached within timeout '
- 'of [%s]', volume.name, VOL_DETACH_TIMEOUT)
+ logger.warn(
+ 'Volume [%s] attachment timeout ', volume.name)
else:
logger.warn('Unable to attach volume named [%s]',
volume_name)
else:
logger.warn('Unable to attach volume named [%s]',
volume_name)
floating_ip_setting.router_name)
if ext_gateway and self.vm_active(block=True):
floating_ip = neutron_utils.create_floating_ip(
floating_ip_setting.router_name)
if ext_gateway and self.vm_active(block=True):
floating_ip = neutron_utils.create_floating_ip(
- self.__neutron, ext_gateway, port.id)
+ self.__neutron, self.__keystone, ext_gateway, port.id)
self.__floating_ip_dict[floating_ip_setting.name] = floating_ip
logger.info(
self.__floating_ip_dict[floating_ip_setting.name] = floating_ip
logger.info(
cinder, volume_rec['id'])
if volume:
vm = nova_utils.detach_volume(
cinder, volume_rec['id'])
if volume:
vm = nova_utils.detach_volume(
- self._nova, self.__neutron, self.__vm, volume,
- self.project_id, timeout=VOL_DETACH_TIMEOUT)
+ self._nova, self.__neutron, self.__keystone, self.__vm,
+ volume, self._os_creds.project_name)
if vm:
self.__vm = vm
else:
if vm:
self.__vm = vm
else:
for port_setting in port_settings:
port = neutron_utils.get_port(
for port_setting in port_settings:
port = neutron_utils.get_port(
- self.__neutron, port_settings=port_setting,
- project_id=self.project_id)
+ self.__neutron, self.__keystone, port_settings=port_setting,
+ project_name=self._os_creds.project_name)
if port:
ports.append((port_setting.name, port))
if port:
ports.append((port_setting.name, port))
for port_setting in port_settings:
port = neutron_utils.get_port(
for port_setting in port_settings:
port = neutron_utils.get_port(
- self.__neutron, port_settings=port_setting)
+ self.__neutron, self.__keystone, port_settings=port_setting)
if not port:
port = neutron_utils.create_port(
self.__neutron, self._os_creds, port_setting)
if not port:
port = neutron_utils.create_port(
self.__neutron, self._os_creds, port_setting)
:return: Server object
"""
return nova_utils.get_server_object_by_id(
:return: Server object
"""
return nova_utils.get_server_object_by_id(
- self._nova, self.__neutron, self.__vm.id, self.project_id)
+ self._nova, self.__neutron, self.__keystone, self.__vm.id,
+ self._os_creds.project_name)
def get_console_output(self):
"""
def get_console_output(self):
"""
STATUS_ACTIVE, block, self.instance_settings.vm_boot_timeout,
poll_interval):
self.__vm = nova_utils.get_server_object_by_id(
STATUS_ACTIVE, block, self.instance_settings.vm_boot_timeout,
poll_interval):
self.__vm = nova_utils.get_server_object_by_id(
- self._nova, self.__neutron, self.__vm.id, self.project_id)
+ self._nova, self.__neutron, self.__keystone, self.__vm.id,
+ self._os_creds.project_name)
self._nova, self.__vm, reboot_type=reboot_type)
self._nova, self.__vm, reboot_type=reboot_type)
-def generate_creator(os_creds, vm_inst, image_config, project_id,
+def generate_creator(os_creds, vm_inst, image_config, project_name,
keypair_config=None):
"""
Initializes an OpenStackVmInstance object
:param os_creds: the OpenStack credentials
:param vm_inst: the SNAPS-OO VmInst domain object
:param image_config: the associated ImageConfig object
keypair_config=None):
"""
Initializes an OpenStackVmInstance object
:param os_creds: the OpenStack credentials
:param vm_inst: the SNAPS-OO VmInst domain object
:param image_config: the associated ImageConfig object
- :param project_id: the associated project ID
+ :param project_name: the associated project ID
:param keypair_config: the associated KeypairConfig object (optional)
:return: an initialized OpenStackVmInstance object
"""
nova = nova_utils.nova_client(os_creds)
:param keypair_config: the associated KeypairConfig object (optional)
:return: an initialized OpenStackVmInstance object
"""
nova = nova_utils.nova_client(os_creds)
+ keystone = keystone_utils.keystone_client(os_creds)
neutron = neutron_utils.neutron_client(os_creds)
derived_inst_config = settings_utils.create_vm_inst_config(
neutron = neutron_utils.neutron_client(os_creds)
derived_inst_config = settings_utils.create_vm_inst_config(
- nova, neutron, vm_inst, project_id)
+ nova, keystone, neutron, vm_inst, project_name)
derived_inst_creator = OpenStackVmInstance(
os_creds, derived_inst_config, image_config, keypair_config)
derived_inst_creator = OpenStackVmInstance(
os_creds, derived_inst_config, image_config, keypair_config)
from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
from snaps.openstack.openstack_creator import OpenStackNetworkObject
from snaps.config.network import NetworkConfig, SubnetConfig, PortConfig
from snaps.openstack.openstack_creator import OpenStackNetworkObject
-from snaps.openstack.utils import neutron_utils
+from snaps.openstack.utils import neutron_utils, keystone_utils
super(self.__class__, self).initialize()
try:
super(self.__class__, self).initialize()
try:
+ keystone = keystone_utils.keystone_client(self._os_creds)
self.__network = neutron_utils.get_network(
self.__network = neutron_utils.get_network(
- self._neutron, network_settings=self.network_settings,
- project_id=self.project_id, os_creds=self._os_creds)
+ self._neutron, keystone,
+ network_settings=self.network_settings,
+ project_name=self._os_creds.project_name)
except Unauthorized as e:
logger.warn('Unable to lookup network with name %s - %s',
self.network_settings.name, e)
except Unauthorized as e:
logger.warn('Unable to lookup network with name %s - %s',
self.network_settings.name, e)
# Delete security group 'default' if exists
neutron = neutron_utils.neutron_client(self._os_creds)
default_sec_grp = neutron_utils.get_security_group(
# Delete security group 'default' if exists
neutron = neutron_utils.neutron_client(self._os_creds)
default_sec_grp = neutron_utils.get_security_group(
- neutron, sec_grp_name='default',
- project_id=self.__project.id)
+ neutron, self._keystone, sec_grp_name='default',
+ project_name=self.__project.name)
if default_sec_grp:
try:
neutron_utils.delete_security_group(
if default_sec_grp:
try:
neutron_utils.delete_security_group(
from snaps.config.router import RouterConfig
from snaps.openstack.openstack_creator import OpenStackNetworkObject
from snaps.config.router import RouterConfig
from snaps.openstack.openstack_creator import OpenStackNetworkObject
-from snaps.openstack.utils import neutron_utils
+from snaps.openstack.utils import neutron_utils, keystone_utils
raise RouterCreationError(
'Subnet not found with name ' + internal_subnet_name)
raise RouterCreationError(
'Subnet not found with name ' + internal_subnet_name)
+ keystone = keystone_utils.keystone_client(self._os_creds)
for port_setting in self.router_settings.port_settings:
port = neutron_utils.get_port(
for port_setting in self.router_settings.port_settings:
port = neutron_utils.get_port(
- self._neutron, port_settings=port_setting,
- project_id=self.project_id)
+ self._neutron, keystone, port_settings=port_setting,
+ project_name=self._os_creds.project_name)
if port:
self.__ports.append(port)
if port:
self.__ports.append(port)
if not self.__router:
self.__router = neutron_utils.create_router(
if not self.__router:
self.__router = neutron_utils.create_router(
- self._neutron, self._os_creds, self.router_settings,
- self.project_id)
+ self._neutron, self._os_creds, self.router_settings)
for internal_subnet_name in self.router_settings.internal_subnets:
internal_subnet = neutron_utils.get_subnet(
for internal_subnet_name in self.router_settings.internal_subnets:
internal_subnet = neutron_utils.get_subnet(
raise RouterCreationError(
'Subnet not found with name ' + internal_subnet_name)
raise RouterCreationError(
'Subnet not found with name ' + internal_subnet_name)
+ keystone = keystone_utils.keystone_client(self._os_creds)
for port_setting in self.router_settings.port_settings:
port = neutron_utils.get_port(
for port_setting in self.router_settings.port_settings:
port = neutron_utils.get_port(
- self._neutron, port_settings=port_setting,
- project_id=self.project_id)
+ self._neutron, keystone, port_settings=port_setting,
+ project_name=self._os_creds.project_name)
logger.info(
'Retrieved port %s for router - %s', port_setting.name,
self.router_settings.name)
logger.info(
'Retrieved port %s for router - %s', port_setting.name,
self.router_settings.name)
port_setting.name,
self.router_settings.name)
self.__ports.append(port)
port_setting.name,
self.router_settings.name)
self.__ports.append(port)
- neutron_utils.add_interface_router(self._neutron,
- self.__router,
- port=port)
+ neutron_utils.add_interface_router(
+ self._neutron, self.__router, port=port)
else:
raise RouterCreationError(
'Error creating port with name - '
else:
raise RouterCreationError(
'Error creating port with name - '
"""
super(self.__class__, self).initialize()
"""
super(self.__class__, self).initialize()
+ keystone = keystone_utils.keystone_client(self._os_creds)
self.__security_group = neutron_utils.get_security_group(
self.__security_group = neutron_utils.get_security_group(
- self._neutron, sec_grp_settings=self.sec_grp_settings,
- project_id=self.project_id)
+ self._neutron, keystone, sec_grp_settings=self.sec_grp_settings,
+ project_name=self._os_creds.project_name)
if self.__security_group:
# Populate rules
existing_rules = neutron_utils.get_rules_by_security_group(
if self.__security_group:
# Populate rules
existing_rules = neutron_utils.get_rules_by_security_group(
keystone = keystone_utils.keystone_client(self._os_creds)
self.__security_group = neutron_utils.create_security_group(
keystone = keystone_utils.keystone_client(self._os_creds)
self.__security_group = neutron_utils.create_security_group(
- self._neutron, keystone, self.sec_grp_settings,
- project_id=self.project_id)
+ self._neutron, keystone, self.sec_grp_settings)
# Get the rules added for free
auto_rules = neutron_utils.get_rules_by_security_group(
# Get the rules added for free
auto_rules = neutron_utils.get_rules_by_security_group(
for sec_grp_rule_setting in self.sec_grp_settings.rule_settings:
try:
custom_rule = neutron_utils.create_security_group_rule(
for sec_grp_rule_setting in self.sec_grp_settings.rule_settings:
try:
custom_rule = neutron_utils.create_security_group_rule(
- self._neutron, sec_grp_rule_setting, self.project_id)
+ self._neutron, keystone, sec_grp_rule_setting,
+ self._os_creds.project_name)
self.__rules[sec_grp_rule_setting] = custom_rule
except Conflict as e:
logger.warn('Unable to create rule due to conflict - %s',
self.__rules[sec_grp_rule_setting] = custom_rule
except Conflict as e:
logger.warn('Unable to create rule due to conflict - %s',
:param rule_setting: the rule configuration
"""
rule_setting.sec_grp_name = self.sec_grp_settings.name
:param rule_setting: the rule configuration
"""
rule_setting.sec_grp_name = self.sec_grp_settings.name
+ keystone = keystone_utils.keystone_client(self._os_creds)
new_rule = neutron_utils.create_security_group_rule(
new_rule = neutron_utils.create_security_group_rule(
- self._neutron, rule_setting, self.project_id)
+ self._neutron, keystone, rule_setting, self._os_creds.project_name)
self.__rules[rule_setting] = new_rule
self.sec_grp_settings.rule_settings.append(rule_setting)
self.__rules[rule_setting] = new_rule
self.sec_grp_settings.rule_settings.append(rule_setting)
from snaps.openstack.create_volume_type import OpenStackVolumeType
from snaps.openstack.openstack_creator import OpenStackCloudObject
from snaps.openstack.utils import (
from snaps.openstack.create_volume_type import OpenStackVolumeType
from snaps.openstack.openstack_creator import OpenStackCloudObject
from snaps.openstack.utils import (
- nova_utils, settings_utils, glance_utils, cinder_utils)
+ nova_utils, settings_utils, glance_utils, cinder_utils, keystone_utils)
from snaps.openstack.create_network import OpenStackNetwork
from snaps.openstack.utils import heat_utils, neutron_utils
from snaps.openstack.create_network import OpenStackNetwork
from snaps.openstack.utils import heat_utils, neutron_utils
nova = nova_utils.nova_client(self._os_creds)
nova = nova_utils.nova_client(self._os_creds)
+ keystone = keystone_utils.keystone_client(self._os_creds)
neutron = neutron_utils.neutron_client(self._os_creds)
neutron = neutron_utils.neutron_client(self._os_creds)
stack_servers = heat_utils.get_stack_servers(
stack_servers = heat_utils.get_stack_servers(
- self.__heat_cli, nova, neutron, self.__stack, self.project_id)
+ self.__heat_cli, nova, neutron, keystone, self.__stack,
+ self._os_creds.project_name)
glance = glance_utils.glance_client(self._os_creds)
for stack_server in stack_servers:
vm_inst_settings = settings_utils.create_vm_inst_config(
glance = glance_utils.glance_client(self._os_creds)
for stack_server in stack_servers:
vm_inst_settings = settings_utils.create_vm_inst_config(
- nova, neutron, stack_server, self.project_id)
+ nova, keystone, neutron, stack_server,
+ self._os_creds.project_name)
image_settings = settings_utils.determine_image_config(
glance, stack_server, self.image_settings)
keypair_settings = settings_utils.determine_keypair_config(
image_settings = settings_utils.determine_image_config(
glance, stack_server, self.image_settings)
keypair_settings = settings_utils.determine_keypair_config(
:param os_creds: the OpenStack credentials object
"""
self._os_creds = os_creds
:param os_creds: the OpenStack credentials object
"""
self._os_creds = os_creds
- keystone = keystone_utils.keystone_client(os_creds)
- self.project_id = keystone_utils.get_project(
- keystone=keystone, project_name=os_creds.project_name).id
def initialize(self):
raise NotImplementedError('Do not override abstract method')
def initialize(self):
raise NotImplementedError('Do not override abstract method')
vm_inst = self.inst_creator.create()
self.assertIsNotNone(nova_utils.get_server(
vm_inst = self.inst_creator.create()
self.assertIsNotNone(nova_utils.get_server(
- self.nova, self.neutron, vm_inst_settings=instance_settings))
+ self.nova, self.neutron, self.keystone,
+ vm_inst_settings=instance_settings))
# Delete instance
nova_utils.delete_vm_instance(self.nova, vm_inst)
self.assertTrue(self.inst_creator.vm_deleted(block=True))
self.assertIsNone(nova_utils.get_server(
# Delete instance
nova_utils.delete_vm_instance(self.nova, vm_inst)
self.assertTrue(self.inst_creator.vm_deleted(block=True))
self.assertIsNone(nova_utils.get_server(
- self.nova, self.neutron, vm_inst_settings=instance_settings))
+ self.nova, self.neutron, self.keystone,
+ vm_inst_settings=instance_settings))
# Exception should not be thrown
self.inst_creator.clean()
# Exception should not be thrown
self.inst_creator.clean()
derived_inst_creator = create_instance.generate_creator(
self.os_creds, vm_inst, self.image_creator.image_settings,
derived_inst_creator = create_instance.generate_creator(
self.os_creds, vm_inst, self.image_creator.image_settings,
- self.project_id, self.keypair_creator.keypair_settings)
+ self.os_creds.project_name, self.keypair_creator.keypair_settings)
derived_inst_creator.add_floating_ip(FloatingIpConfig(
name=self.floating_ip_name, port_name=self.port_1_name,
derived_inst_creator.add_floating_ip(FloatingIpConfig(
name=self.floating_ip_name, port_name=self.port_1_name,
- self.image_creator = OpenStackImage(self.os_creds,
- os_image_settings)
+ self.image_creator = OpenStackImage(
+ self.os_creds, os_image_settings)
self.image_creator.create()
# First network is public
self.network_creators.append(OpenStackNetwork(
self.os_creds, self.net_config_1))
self.image_creator.create()
# First network is public
self.network_creators.append(OpenStackNetwork(
self.os_creds, self.net_config_1))
# Second network is private
self.network_creators.append(OpenStackNetwork(
self.os_creds, self.net_config_2))
# Second network is private
self.network_creators.append(OpenStackNetwork(
self.os_creds, self.net_config_2))
network_creator.create()
port_settings = [
network_creator.create()
port_settings = [
- create_network.PortConfig(
name=self.guid + '-router-port1',
ip_addrs=[{
'subnet_name':
self.net_config_1.subnet_settings[0].name,
'ip': static_gateway_ip1
}],
name=self.guid + '-router-port1',
ip_addrs=[{
'subnet_name':
self.net_config_1.subnet_settings[0].name,
'ip': static_gateway_ip1
}],
- network_name=self.net_config_1.name,
- project_name=self.os_creds.project_name),
- create_network.PortConfig(
+ network_name=self.net_config_1.name),
+ PortConfig(
name=self.guid + '-router-port2',
ip_addrs=[{
'subnet_name':
self.net_config_2.subnet_settings[0].name,
'ip': static_gateway_ip2
}],
name=self.guid + '-router-port2',
ip_addrs=[{
'subnet_name':
self.net_config_2.subnet_settings[0].name,
'ip': static_gateway_ip2
}],
- network_name=self.net_config_2.name,
- project_name=self.os_creds.project_name)]
+ network_name=self.net_config_2.name)]
router_settings = RouterConfig(
name=self.guid + '-pub-router', port_settings=port_settings)
router_settings = RouterConfig(
name=self.guid + '-pub-router', port_settings=port_settings)
- self.router_creator = create_router.OpenStackRouter(
+ self.router_creator = OpenStackRouter(
self.os_creds, router_settings)
self.router_creator.create()
self.os_creds, router_settings)
self.router_creator.create()
self.flavor_creator = OpenStackFlavor(
self.admin_os_creds,
FlavorConfig(name=self.guid + '-flavor-name', ram=512,
self.flavor_creator = OpenStackFlavor(
self.admin_os_creds,
FlavorConfig(name=self.guid + '-flavor-name', ram=512,
vm_inst = self.inst_creator.create(block=True)
self.assertIsNotNone(nova_utils.get_server(
vm_inst = self.inst_creator.create(block=True)
self.assertIsNotNone(nova_utils.get_server(
- self.nova, self.neutron, vm_inst_settings=instance_settings))
+ self.nova, self.neutron, self.keystone,
+ vm_inst_settings=instance_settings))
self.assertIsNotNone(vm_inst)
self.assertEqual(1, len(vm_inst.volume_ids))
self.assertIsNotNone(vm_inst)
self.assertEqual(1, len(vm_inst.volume_ids))
vm_inst = self.inst_creator.create(block=True)
self.assertIsNotNone(nova_utils.get_server(
vm_inst = self.inst_creator.create(block=True)
self.assertIsNotNone(nova_utils.get_server(
- self.nova, self.neutron, vm_inst_settings=instance_settings))
+ self.nova, self.neutron, self.keystone,
+ vm_inst_settings=instance_settings))
self.assertIsNotNone(vm_inst)
self.assertEqual(2, len(vm_inst.volume_ids))
self.assertIsNotNone(vm_inst)
self.assertEqual(2, len(vm_inst.volume_ids))
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import (
OSIntegrationTestCase, OSComponentTestCase)
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import (
OSIntegrationTestCase, OSComponentTestCase)
-from snaps.openstack.utils import neutron_utils
+from snaps.openstack.utils import neutron_utils, keystone_utils
from snaps.openstack.utils.tests import neutron_utils_tests
from snaps.openstack.create_network import IPv6Mode as IPv6Mode_old
from snaps.openstack.utils.tests import neutron_utils_tests
from snaps.openstack.create_network import IPv6Mode as IPv6Mode_old
Tests the creation of an OpenStack network without a router.
"""
# Create Network
Tests the creation of an OpenStack network without a router.
"""
# Create Network
- self.net_creator = OpenStackNetwork(self.os_creds,
- self.net_config.network_settings)
+ self.net_creator = OpenStackNetwork(
+ self.os_creds, self.net_config.network_settings)
self.net_creator.create()
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
self.net_creator.create()
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, self.net_creator.network_settings.name, True,
- project_id=self.project_id))
+ self.neutron, self.keystone,
+ self.net_creator.network_settings.name, True,
+ self.os_creds.project_name))
# Validate subnets
self.assertTrue(neutron_utils_tests.validate_subnet(
# Validate subnets
self.assertTrue(neutron_utils_tests.validate_subnet(
Tests the creation of an OpenStack network, it's deletion, then cleanup
"""
# Create Network
Tests the creation of an OpenStack network, it's deletion, then cleanup
"""
# Create Network
- self.net_creator = OpenStackNetwork(self.os_creds,
- self.net_config.network_settings)
+ self.net_creator = OpenStackNetwork(
+ self.os_creds, self.net_config.network_settings)
self.net_creator.create()
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
self.net_creator.create()
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, self.net_creator.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_creator.network_settings.name, True,
+ self.os_creds.project_name))
- neutron_utils.delete_network(self.neutron,
- self.net_creator.get_network())
+ neutron_utils.delete_network(
+ self.neutron, self.net_creator.get_network())
self.assertIsNone(neutron_utils.get_network(
self.assertIsNone(neutron_utils.get_network(
- self.neutron, network_settings=self.net_creator.network_settings,
- os_creds=self.os_creds, project_id=self.project_id))
+ self.neutron, self.keystone,
+ network_settings=self.net_creator.network_settings,
+ project_name=self.os_creds.project_name))
# This shall not throw an exception here
self.net_creator.clean()
# This shall not throw an exception here
self.net_creator.clean()
Tests the creation of an OpenStack network with a router.
"""
# Create Network
Tests the creation of an OpenStack network with a router.
"""
# Create Network
- self.net_creator = OpenStackNetwork(self.os_creds,
- self.net_config.network_settings)
+ self.net_creator = OpenStackNetwork(
+ self.os_creds, self.net_config.network_settings)
self.net_creator.create()
# Create Router
self.net_creator.create()
# Create Router
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, self.net_creator.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_creator.network_settings.name, True,
+ self.os_creds.project_name))
# Validate subnets
self.assertTrue(neutron_utils_tests.validate_subnet(
# Validate subnets
self.assertTrue(neutron_utils_tests.validate_subnet(
OpenStackNetwork object will not create a second.
"""
# Create Network
OpenStackNetwork object will not create a second.
"""
# Create Network
- self.net_creator = OpenStackNetwork(self.os_creds,
- self.net_config.network_settings)
+ self.net_creator = OpenStackNetwork(
+ self.os_creds, self.net_config.network_settings)
self.net_creator.create()
self.net_creator.create()
- self.net_creator2 = OpenStackNetwork(self.os_creds,
- self.net_config.network_settings)
+ self.net_creator2 = OpenStackNetwork(
+ self.os_creds, self.net_config.network_settings)
self.net_creator2.create()
self.assertEqual(self.net_creator.get_network().id,
self.net_creator2.create()
self.assertEqual(self.net_creator.get_network().id,
def test_create_network_router_admin_user_to_new_project(self):
"""
def test_create_network_router_admin_user_to_new_project(self):
"""
- Tests the creation of an OpenStack network and router with the current
- user to the admin project.
+ Tests the creation of an OpenStack network to the the current using
+ the credentials to the admin project.
"""
# Create Network/Subnet where the project names have been changed
"""
# Create Network/Subnet where the project names have been changed
- admin_project_name = self.admin_os_creds.project_name
- self.net_config.network_settings.project_name = admin_project_name
- self.net_config.network_settings.subnet_settings[0].project_name = \
- admin_project_name
- self.net_creator = OpenStackNetwork(self.os_creds,
- self.net_config.network_settings)
+ project_name = self.os_creds.project_name
+ config = self.net_config.network_settings
+ config.project_name = project_name
+ config.subnet_settings[0].project_name = project_name
+
+ self.net_creator = OpenStackNetwork(self.admin_os_creds, config)
self.net_creator.create()
retrieved_net = neutron_utils.get_network(
self.net_creator.create()
retrieved_net = neutron_utils.get_network(
- self.neutron, network_settings=self.net_config.network_settings,
- os_creds=self.os_creds, project_id=self.project_id)
+ self.neutron, self.keystone,
+ network_name=self.net_config.network_settings.name,
+ project_name=self.os_creds.project_name)
self.assertEqual(self.net_creator.get_network().id, retrieved_net.id)
self.assertEqual(self.net_creator.get_network().id, retrieved_net.id)
+ # Initialize with actual credentials
+ config.project_name = None
+ config.subnet_settings[0].project_name = None
+ proj_net_creator = OpenStackNetwork(self.os_creds, config)
+ proj_net = proj_net_creator.create()
+ self.assertEqual(retrieved_net, proj_net)
+
- self.net_config.router_settings.project_name = admin_project_name
+ self.net_config.router_settings.project_name = project_name
self.router_creator = create_router.OpenStackRouter(
self.router_creator = create_router.OpenStackRouter(
- self.os_creds, self.net_config.router_settings)
+ self.admin_os_creds, self.net_config.router_settings)
self.router_creator.create()
retrieved_router = neutron_utils.get_router(
self.router_creator.create()
retrieved_router = neutron_utils.get_router(
self.net_config.network_settings.project_name = new_project_name
self.net_config.network_settings.subnet_settings[0].project_name = \
new_project_name
self.net_config.network_settings.project_name = new_project_name
self.net_config.network_settings.subnet_settings[0].project_name = \
new_project_name
- self.net_creator = OpenStackNetwork(self.admin_os_creds,
- self.net_config.network_settings)
+ self.net_creator = OpenStackNetwork(
+ self.admin_os_creds, self.net_config.network_settings)
self.net_creator.create()
retrieved_net = neutron_utils.get_network(
self.net_creator.create()
retrieved_net = neutron_utils.get_network(
- self.neutron, network_settings=self.net_config.network_settings,
- os_creds=self.os_creds, project_id=self.project_id)
+ self.neutron, self.keystone,
+ network_settings=self.net_config.network_settings,
+ project_name=self.os_creds.project_name)
self.assertEqual(self.net_creator.get_network().id, retrieved_net.id)
self.assertEqual(self.net_creator.get_network().id, retrieved_net.id)
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, self.net_creator.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_creator.network_settings.name, True,
+ self.os_creds.project_name))
network = self.net_creator.get_network()
self.assertEqual(1, len(network.subnets))
network = self.net_creator.get_network()
self.assertEqual(1, len(network.subnets))
net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet')
self.neutron = neutron_utils.neutron_client(self.os_creds)
net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet')
self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
# Initialize for cleanup
self.net_creator = None
# Initialize for cleanup
self.net_creator = None
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, net_settings.name, True,
- self.net_creator.project_id))
+ self.neutron, self.keystone, net_settings.name, True,
+ self.os_creds.project_name))
self.assertEquals(network_type, network.type)
self.assertEquals(network_type, network.type)
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, net_settings.name, True,
- self.net_creator.project_id))
+ self.neutron, self.keystone, net_settings.name, True,
+ self.os_creds.project_name))
self.assertEquals(network_type, network.type)
self.assertEquals(network_type, network.type)
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, net_settings.name, True, project_id=self.project_id))
+ self.neutron, self.keystone, net_settings.name, True,
+ self.os_creds.project_name))
self.assertEqual(network_type, network.type)
self.assertEqual(network_type, network.type)
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
# Validate network was created
self.assertTrue(neutron_utils_tests.validate_network(
- self.neutron, net_settings.name, True))
+ self.neutron, self.keystone, net_settings.name, True,
+ self.os_creds.project_name))
self.assertEquals(network_type, network.type)
self.assertEquals(network_type, network.type)
super(self.__class__, self).__start__()
guid = uuid.uuid4()
super(self.__class__, self).__start__()
guid = uuid.uuid4()
- self.qos_settings = QoSConfig(
+ qos_settings = QoSConfig(
name=self.__class__.__name__ + '-' + str(guid),
consumer=Consumer.both)
name=self.__class__.__name__ + '-' + str(guid),
consumer=Consumer.both)
- self.cinder = cinder_utils.cinder_client(self.os_creds)
- self.qos_creator = None
+ self.cinder = cinder_utils.cinder_client(self.admin_os_creds)
+ self.qos_creator = create_qos.OpenStackQoS(
+ self.admin_os_creds, qos_settings)
Tests the creation of an OpenStack qos.
"""
# Create QoS
Tests the creation of an OpenStack qos.
"""
# Create QoS
- self.qos_creator = create_qos.OpenStackQoS(
- self.os_creds, self.qos_settings)
created_qos = self.qos_creator.create()
self.assertIsNotNone(created_qos)
retrieved_qos = cinder_utils.get_qos(
created_qos = self.qos_creator.create()
self.assertIsNotNone(created_qos)
retrieved_qos = cinder_utils.get_qos(
- self.cinder, qos_settings=self.qos_settings)
+ self.cinder, qos_settings=self.qos_creator.qos_settings)
self.assertIsNotNone(retrieved_qos)
self.assertEqual(created_qos, retrieved_qos)
self.assertIsNotNone(retrieved_qos)
self.assertEqual(created_qos, retrieved_qos)
clean() does not raise an Exception.
"""
# Create QoS
clean() does not raise an Exception.
"""
# Create QoS
- self.qos_creator = create_qos.OpenStackQoS(
- self.os_creds, self.qos_settings)
created_qos = self.qos_creator.create()
self.assertIsNotNone(created_qos)
retrieved_qos = cinder_utils.get_qos(
created_qos = self.qos_creator.create()
self.assertIsNotNone(created_qos)
retrieved_qos = cinder_utils.get_qos(
- self.cinder, qos_settings=self.qos_settings)
+ self.cinder, qos_settings=self.qos_creator.qos_settings)
self.assertIsNotNone(retrieved_qos)
self.assertEqual(created_qos, retrieved_qos)
self.assertIsNotNone(retrieved_qos)
self.assertEqual(created_qos, retrieved_qos)
cinder_utils.delete_qos(self.cinder, created_qos)
self.assertIsNone(cinder_utils.get_qos(
cinder_utils.delete_qos(self.cinder, created_qos)
self.assertIsNone(cinder_utils.get_qos(
- self.cinder, qos_settings=self.qos_settings))
+ self.cinder, qos_settings=self.qos_creator.qos_settings))
# Must not raise an exception when attempting to cleanup non-existent
# qos
# Must not raise an exception when attempting to cleanup non-existent
# qos
Tests the creation of an OpenStack qos when one already exists.
"""
# Create QoS
Tests the creation of an OpenStack qos when one already exists.
"""
# Create QoS
- self.qos_creator = create_qos.OpenStackQoS(
- self.os_creds, self.qos_settings)
qos1 = self.qos_creator.create()
retrieved_qos = cinder_utils.get_qos(
qos1 = self.qos_creator.create()
retrieved_qos = cinder_utils.get_qos(
- self.cinder, qos_settings=self.qos_settings)
+ self.cinder, qos_settings=self.qos_creator.qos_settings)
self.assertEqual(qos1, retrieved_qos)
# Should be retrieving the instance data
os_qos_2 = create_qos.OpenStackQoS(
self.assertEqual(qos1, retrieved_qos)
# Should be retrieving the instance data
os_qos_2 = create_qos.OpenStackQoS(
- self.os_creds, self.qos_settings)
+ self.admin_os_creds, self.qos_creator.qos_settings)
qos2 = os_qos_2.create()
self.assertEqual(qos1, qos2)
qos2 = os_qos_2.create()
self.assertEqual(qos1, qos2)
self.admin_os_creds, router_settings)
self.router_creator.create()
self.admin_os_creds, router_settings)
self.router_creator.create()
- router = neutron_utils.get_router(self.neutron,
- router_settings=router_settings)
+ router = neutron_utils.get_router(
+ self.neutron, router_settings=router_settings)
self.assertIsNotNone(router)
self.assertIsNotNone(router)
- self.assertEqual(self.router_creator.get_router(), router)
+ self.assertEqual(self.router_creator.get_router().id, router.id)
self.check_router_recreation(router, router_settings)
self.check_router_recreation(router, router_settings)
- def test_create_router_new_user_to_admin_project(self):
+ def test_create_router_new_user_as_admin_project(self):
"""
Test creation of a most basic router with the new user pointing
to the admin project.
"""
router_settings = RouterConfig(
name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
"""
Test creation of a most basic router with the new user pointing
to the admin project.
"""
router_settings = RouterConfig(
name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
- project_name=self.admin_os_creds.project_name)
+ project_name=self.os_creds.project_name)
self.router_creator = create_router.OpenStackRouter(
self.router_creator = create_router.OpenStackRouter(
- self.os_creds, router_settings)
+ self.admin_os_creds, router_settings)
self.router_creator.create()
self.router_creator.create()
- router = neutron_utils.get_router(self.neutron,
- router_settings=router_settings)
+ router = neutron_utils.get_router(
+ self.neutron, router_settings=router_settings)
self.assertIsNotNone(router)
self.assertIsNotNone(router)
- self.assertEqual(self.router_creator.get_router(), router)
+ self.assertEqual(self.router_creator.get_router().id, router.id)
self.check_router_recreation(router, router_settings)
self.check_router_recreation(router, router_settings)
network_settings1.subnet_settings[0].name,
'ip': static_gateway_ip1
}],
network_settings1.subnet_settings[0].name,
'ip': static_gateway_ip1
}],
- network_name=network_settings1.name,
- project_name=self.os_creds.project_name),
+ network_name=network_settings1.name),
create_network.PortConfig(
name=self.guid + '-port2',
ip_addrs=[{
'subnet_name': network_settings2.subnet_settings[0].name,
'ip': static_gateway_ip2
}],
create_network.PortConfig(
name=self.guid + '-port2',
ip_addrs=[{
'subnet_name': network_settings2.subnet_settings[0].name,
'ip': static_gateway_ip2
}],
- network_name=network_settings2.name,
- project_name=self.os_creds.project_name)]
+ network_name=network_settings2.name)]
router_settings = RouterConfig(
name=self.guid + '-pub-router', port_settings=port_settings)
router_settings = RouterConfig(
name=self.guid + '-pub-router', port_settings=port_settings)
- self.router_creator = create_router.OpenStackRouter(self.os_creds,
- router_settings)
+ self.router_creator = create_router.OpenStackRouter(
+ self.os_creds, router_settings)
self.router_creator.create()
router = neutron_utils.get_router(
self.router_creator.create()
router = neutron_utils.get_router(
ip_addrs=[{
'subnet_name': network_settings.subnet_settings[0].name,
'ip': static_gateway_ip1}],
ip_addrs=[{
'subnet_name': network_settings.subnet_settings[0].name,
'ip': static_gateway_ip1}],
- network_name=network_settings.name,
- project_name=self.os_creds.project_name)]
+ network_name=network_settings.name)]
router_settings = RouterConfig(
name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
router_settings = RouterConfig(
name=self.guid + '-pub-router', external_gateway=self.ext_net_name,
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
+ # Create Security Group
sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group()))
def test_create_group_admin_user_to_new_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
self.sec_grp_creator.get_security_group()))
def test_create_group_admin_user_to_new_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
+ # Create Security Group
sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
- project_name=self.admin_os_creds.project_name)
- self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
- self.os_creds, sec_grp_settings)
+ project_name=self.os_creds.project_name)
+ self.sec_grp_creator = OpenStackSecurityGroup(
+ self.admin_os_creds, sec_grp_settings)
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
self.sec_grp_creator.get_security_group(), rules))
+ self.assertEqual(self.sec_grp_creator.get_security_group().id,
+ sec_grp.id)
+
+ proj_creator = OpenStackSecurityGroup(
+ self.os_creds, SecurityGroupConfig(name=self.sec_grp_name))
+ proj_creator.create()
+
+ self.assertEqual(self.sec_grp_creator.get_security_group().id,
+ proj_creator.get_security_group().id)
+
def test_create_group_new_user_to_admin_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
def test_create_group_new_user_to_admin_project(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
+ # Create Security Group
sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
project_name=self.os_creds.project_name)
sec_grp_settings = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group',
project_name=self.os_creds.project_name)
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
self.sec_grp_creator.get_security_group(), rules))
def test_create_delete_group(self):
"""
Tests the creation of an OpenStack Security Group without custom rules.
"""
+ # Create Security Group
sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name,
description='hello group')
self.sec_grp_creator = create_security_group.OpenStackSecurityGroup(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group()))
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
self.sec_grp_creator.get_security_group()))
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
+ self.neutron, self.keystone,
sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
self.sec_grp_creator.clean()
sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
self.sec_grp_creator.clean()
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
def test_create_group_with_one_complex_rule(self):
self.sec_grp_creator.get_security_group(), rules))
def test_create_group_with_one_complex_rule(self):
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
def test_create_group_with_several_rules(self):
self.sec_grp_creator.get_security_group(), rules))
def test_create_group_with_several_rules(self):
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
Tests the creation of an OpenStack Security Group with one simple
custom rule.
"""
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
def test_add_rule(self):
self.sec_grp_creator.get_security_group(), rules))
def test_add_rule(self):
Tests the creation of an OpenStack Security Group with one simple
custom rule then adds one after creation.
"""
Tests the creation of an OpenStack Security Group with one simple
custom rule then adds one after creation.
"""
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
rules = neutron_utils.get_rules_by_security_group(
self.sec_grp_creator.get_security_group(), rules))
rules = neutron_utils.get_rules_by_security_group(
Tests the creation of an OpenStack Security Group with two simple
custom rules then removes one by the rule ID.
"""
Tests the creation of an OpenStack Security Group with two simple
custom rules then removes one by the rule ID.
"""
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
self.sec_grp_creator.remove_rule(
self.sec_grp_creator.get_security_group(), rules))
self.sec_grp_creator.remove_rule(
Tests the creation of an OpenStack Security Group with two simple
custom rules then removes one by the rule setting object
"""
Tests the creation of an OpenStack Security Group with two simple
custom rules then removes one by the rule setting object
"""
+ # Create Security Group
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
sec_grp_rule_settings = list()
sec_grp_rule_settings.append(
SecurityGroupRuleConfig(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
self.sec_grp_creator.create()
sec_grp = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
self.assertTrue(
validate_sec_grp(
self.assertTrue(
validate_sec_grp(
- self.neutron, self.sec_grp_creator.sec_grp_settings,
+ self.neutron, self.keystone,
+ self.sec_grp_creator.sec_grp_settings,
self.sec_grp_creator.get_security_group(), rules))
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
self.sec_grp_creator.get_security_group(), rules))
self.sec_grp_creator.remove_rule(rule_setting=sec_grp_rule_settings[0])
self.assertEqual(len(rules) - 1, len(rules_after_del))
self.assertEqual(len(rules) - 1, len(rules_after_del))
-def validate_sec_grp(neutron, sec_grp_settings, sec_grp, rules=list()):
+def validate_sec_grp(neutron, keystone, sec_grp_settings, sec_grp,
+ rules=list()):
"""
Returns True is the settings on a security group are properly contained
on the SNAPS SecurityGroup domain object
:param neutron: the neutron client
"""
Returns True is the settings on a security group are properly contained
on the SNAPS SecurityGroup domain object
:param neutron: the neutron client
+ :param keystone: the keystone client
:param sec_grp_settings: the security group configuration
:param sec_grp: the SNAPS-OO security group object
:param rules: collection of SNAPS-OO security group rule objects
:param sec_grp_settings: the security group configuration
:param sec_grp: the SNAPS-OO security group object
:param rules: collection of SNAPS-OO security group rule objects
return (sec_grp.description == sec_grp_settings.description and
sec_grp.name == sec_grp_settings.name and
validate_sec_grp_rules(
return (sec_grp.description == sec_grp_settings.description and
sec_grp.name == sec_grp_settings.name and
validate_sec_grp_rules(
- neutron, sec_grp_settings.rule_settings, rules))
+ neutron, keystone, sec_grp_settings.rule_settings, rules))
-def validate_sec_grp_rules(neutron, rule_settings, rules):
+def validate_sec_grp_rules(neutron, keystone, rule_settings, rules):
"""
Returns True is the settings on a security group rule are properly
contained on the SNAPS SecurityGroupRule domain object.
"""
Returns True is the settings on a security group rule are properly
contained on the SNAPS SecurityGroupRule domain object.
this is the only means to tell if the rule is custom or defaulted by
OpenStack
:param neutron: the neutron client
this is the only means to tell if the rule is custom or defaulted by
OpenStack
:param neutron: the neutron client
+ :param keystone: the keystone client
:param rule_settings: collection of SecurityGroupRuleConfig objects
:param rules: a collection of SecurityGroupRule domain objects
:return: T/F
:param rule_settings: collection of SecurityGroupRuleConfig objects
:param rules: a collection of SecurityGroupRule domain objects
:return: T/F
match = False
for rule in rules:
sec_grp = neutron_utils.get_security_group(
match = False
for rule in rules:
sec_grp = neutron_utils.get_security_group(
- neutron, sec_grp_name=rule_setting.sec_grp_name)
+ neutron, keystone, sec_grp_name=rule_setting.sec_grp_name)
setting_eth_type = create_security_group.Ethertype.IPv4
if rule_setting.ethertype:
setting_eth_type = create_security_group.Ethertype.IPv4
if rule_setting.ethertype:
Tests the creation of an OpenStack Security Group with the same name
within a different project/tenant.
"""
Tests the creation of an OpenStack Security Group with the same name
within a different project/tenant.
"""
+ # Create Security Group
sec_grp_config = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group')
self.sec_grp_creator_proj = OpenStackSecurityGroup(
sec_grp_config = SecurityGroupConfig(
name=self.sec_grp_name, description='hello group')
self.sec_grp_creator_proj = OpenStackSecurityGroup(
StackSettings, StackCreationError, StackError, OpenStackHeatStack)
from snaps.openstack.tests import openstack_tests, create_instance_tests
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
StackSettings, StackCreationError, StackError, OpenStackHeatStack)
from snaps.openstack.tests import openstack_tests, create_instance_tests
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
-from snaps.openstack.utils import (heat_utils, neutron_utils, nova_utils,
- keystone_utils)
+from snaps.openstack.utils import (
+ heat_utils, neutron_utils, nova_utils, keystone_utils)
self.assertEqual(1, len(net_creators))
self.assertEqual(self.network_name, net_creators[0].get_network().name)
self.assertEqual(1, len(net_creators))
self.assertEqual(self.network_name, net_creators[0].get_network().name)
- neutron = neutron_utils.neutron_client(self.os_creds)
- admin_proj_id = keystone_utils.get_project(
- self.keystone, self.admin_os_creds.project_name)
+ # Need to use 'admin' creds as heat creates objects under it's own
+ # project/tenant
+ neutron = neutron_utils.neutron_client(self.admin_os_creds)
+ keystone = keystone_utils.keystone_client(self.admin_os_creds)
net_by_name = neutron_utils.get_network(
net_by_name = neutron_utils.get_network(
- neutron, network_name=net_creators[0].get_network().name,
- project_id=admin_proj_id)
+ neutron, keystone, network_name=net_creators[0].get_network().name)
self.assertEqual(net_creators[0].get_network(), net_by_name)
self.assertIsNotNone(neutron_utils.get_network_by_id(
neutron, net_creators[0].get_network().id))
self.assertEqual(net_creators[0].get_network(), net_by_name)
self.assertIsNotNone(neutron_utils.get_network_by_id(
neutron, net_creators[0].get_network().id))
nova = nova_utils.nova_client(self.admin_os_creds)
neutron = neutron_utils.neutron_client(self.admin_os_creds)
nova = nova_utils.nova_client(self.admin_os_creds)
neutron = neutron_utils.neutron_client(self.admin_os_creds)
+ keystone = keystone_utils.keystone_client(self.admin_os_creds)
vm_inst_by_name = nova_utils.get_server(
vm_inst_by_name = nova_utils.get_server(
- nova, neutron, server_name=vm_inst_creators[0].get_vm_inst().name)
+ nova, neutron, keystone,
+ server_name=vm_inst_creators[0].get_vm_inst().name)
+
self.assertEqual(vm_inst_creators[0].get_vm_inst(), vm_inst_by_name)
self.assertIsNotNone(nova_utils.get_server_object_by_id(
self.assertEqual(vm_inst_creators[0].get_vm_inst(), vm_inst_by_name)
self.assertIsNotNone(nova_utils.get_server_object_by_id(
- nova, neutron, vm_inst_creators[0].get_vm_inst().id,
- vm_inst_creators[0].project_id))
+ nova, neutron, keystone, vm_inst_creators[0].get_vm_inst().id))
class CreateStackFloatingIpTests(OSIntegrationTestCase):
class CreateStackFloatingIpTests(OSIntegrationTestCase):
router = creator.get_router()
ext_net = neutron_utils.get_network(
router = creator.get_router()
ext_net = neutron_utils.get_network(
- self.neutron, network_name=self.ext_net_name,
- project_id=self.project_id)
+ self.neutron, self.keystone, network_name=self.ext_net_name)
self.assertEqual(ext_net.id, router.external_network_id)
self.assertEqual(ext_net.id, router.external_network_id)
self.volume_type_name = guid + '-vol-type'
self.volume_type_creator = OpenStackVolumeType(
self.volume_type_name = guid + '-vol-type'
self.volume_type_creator = OpenStackVolumeType(
- self.os_creds, VolumeTypeConfig(name=self.volume_type_name))
+ self.admin_os_creds, VolumeTypeConfig(name=self.volume_type_name))
self.volume_type_creator.create()
self.volume_creator = None
self.volume_type_creator.create()
self.volume_creator = None
Expect a NotFound to be raised when the volume type does not exist
"""
self.volume_creator = OpenStackVolume(
Expect a NotFound to be raised when the volume type does not exist
"""
self.volume_creator = OpenStackVolume(
- self.os_creds,
- VolumeConfig(
+ self.admin_os_creds, VolumeConfig(
name=self.volume_name, type_name=self.volume_type_name))
created_volume = self.volume_creator.create(block=True)
name=self.volume_name, type_name=self.volume_type_name))
created_volume = self.volume_creator.create(block=True)
from snaps.openstack import create_volume_type
from snaps.openstack.create_volume_type import (
from snaps.openstack import create_volume_type
from snaps.openstack.create_volume_type import (
- VolumeTypeSettings, VolumeTypeEncryptionSettings)
+ VolumeTypeSettings, VolumeTypeEncryptionSettings, OpenStackVolumeType)
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import cinder_utils
from snaps.openstack.tests.os_source_file_test import OSIntegrationTestCase
from snaps.openstack.utils import cinder_utils
self.volume_type_settings = VolumeTypeConfig(
name=self.__class__.__name__ + '-' + str(guid))
self.volume_type_settings = VolumeTypeConfig(
name=self.__class__.__name__ + '-' + str(guid))
- self.cinder = cinder_utils.cinder_client(self.os_creds)
- self.volume_type_creator = None
+ self.cinder = cinder_utils.cinder_client(self.admin_os_creds)
+ self.volume_type_creator = OpenStackVolumeType(
+ self.admin_os_creds, self.volume_type_settings)
Tests the creation of an OpenStack volume.
"""
# Create VolumeType
Tests the creation of an OpenStack volume.
"""
# Create VolumeType
- self.volume_type_creator = create_volume_type.OpenStackVolumeType(
- self.os_creds, self.volume_type_settings)
created_volume_type = self.volume_type_creator.create()
self.assertIsNotNone(created_volume_type)
self.assertEqual(self.volume_type_settings.name,
created_volume_type = self.volume_type_creator.create()
self.assertIsNotNone(created_volume_type)
self.assertEqual(self.volume_type_settings.name,
clean() does not raise an Exception.
"""
# Create VolumeType
clean() does not raise an Exception.
"""
# Create VolumeType
- self.volume_type_creator = create_volume_type.OpenStackVolumeType(
- self.os_creds, self.volume_type_settings)
created_volume_type = self.volume_type_creator.create()
self.assertIsNotNone(created_volume_type)
created_volume_type = self.volume_type_creator.create()
self.assertIsNotNone(created_volume_type)
Tests the creation of an OpenStack volume_type when one already exists.
"""
# Create VolumeType
Tests the creation of an OpenStack volume_type when one already exists.
"""
# Create VolumeType
- self.volume_type_creator = create_volume_type.OpenStackVolumeType(
- self.os_creds, self.volume_type_settings)
volume_type1 = self.volume_type_creator.create()
retrieved_volume_type = cinder_utils.get_volume_type(
volume_type1 = self.volume_type_creator.create()
retrieved_volume_type = cinder_utils.get_volume_type(
# Should be retrieving the instance data
os_volume_type_2 = create_volume_type.OpenStackVolumeType(
# Should be retrieving the instance data
os_volume_type_2 = create_volume_type.OpenStackVolumeType(
- self.os_creds, self.volume_type_settings)
+ self.admin_os_creds, self.volume_type_settings)
volume_type2 = os_volume_type_2.create()
self.assertEqual(volume_type2, volume_type2)
volume_type2 = os_volume_type_2.create()
self.assertEqual(volume_type2, volume_type2)
def setUp(self):
super(self.__class__, self).__start__()
def setUp(self):
super(self.__class__, self).__start__()
- self.cinder = cinder_utils.cinder_client(self.os_creds)
+ self.cinder = cinder_utils.cinder_client(self.admin_os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
qos_settings = QoSConfig(
name=guid + '-qos-spec', consumer=Consumer.both)
qos_settings = QoSConfig(
name=guid + '-qos-spec', consumer=Consumer.both)
- self.qos_creator = OpenStackQoS(self.os_creds, qos_settings)
+ self.qos_creator = OpenStackQoS(self.admin_os_creds, qos_settings)
self.qos_creator.create()
def tearDown(self):
self.qos_creator.create()
def tearDown(self):
"""
Creates a Volume Type object with an associated QoS Spec
"""
"""
Creates a Volume Type object with an associated QoS Spec
"""
- self.volume_type_creator = create_volume_type.OpenStackVolumeType(
- self.os_creds,
+ self.volume_type_creator = OpenStackVolumeType(
+ self.admin_os_creds,
VolumeTypeConfig(
name=self.volume_type_name,
qos_spec_name=self.qos_creator.qos_settings.name))
VolumeTypeConfig(
name=self.volume_type_name,
qos_spec_name=self.qos_creator.qos_settings.name))
encryption_settings = VolumeTypeEncryptionConfig(
name='foo', provider_class='bar',
control_location=ControlLocation.back_end)
encryption_settings = VolumeTypeEncryptionConfig(
name='foo', provider_class='bar',
control_location=ControlLocation.back_end)
- self.volume_type_creator = create_volume_type.OpenStackVolumeType(
- self.os_creds,
+ self.volume_type_creator = OpenStackVolumeType(
+ self.admin_os_creds,
VolumeTypeConfig(
name=self.volume_type_name,
encryption=encryption_settings))
VolumeTypeConfig(
name=self.volume_type_name,
encryption=encryption_settings))
encryption_settings = VolumeTypeEncryptionConfig(
name='foo', provider_class='bar',
control_location=ControlLocation.back_end)
encryption_settings = VolumeTypeEncryptionConfig(
name='foo', provider_class='bar',
control_location=ControlLocation.back_end)
- self.volume_type_creator = create_volume_type.OpenStackVolumeType(
- self.os_creds,
+ self.volume_type_creator = OpenStackVolumeType(
+ self.admin_os_creds,
VolumeTypeConfig(
name=self.volume_type_name,
encryption=encryption_settings,
VolumeTypeConfig(
name=self.volume_type_name,
encryption=encryption_settings,
self.image_metadata = image_metadata
self.image_metadata = image_metadata
- keystone = keystone_utils.keystone_client(self.os_creds)
- self.project_id = keystone_utils.get_project(
- keystone=keystone, project_name=self.os_creds.project_name).id
-
@staticmethod
def parameterize(testcase_klass, os_creds, ext_net_name,
image_metadata=None, log_level=logging.DEBUG):
@staticmethod
def parameterize(testcase_klass, os_creds, ext_net_name,
image_metadata=None, log_level=logging.DEBUG):
self.user_creator = deploy_utils.create_user(
self.admin_os_creds, UserConfig(
name=guid + '-user', password=guid,
self.user_creator = deploy_utils.create_user(
self.admin_os_creds, UserConfig(
name=guid + '-user', password=guid,
- project_name=project_name, roles={
- 'admin': self.project_creator.project_settings.name},
+ project_name=project_name,
domain_name=self.admin_os_creds.user_domain_name))
self.os_creds = self.user_creator.get_os_creds(
domain_name=self.admin_os_creds.user_domain_name))
self.os_creds = self.user_creator.get_os_creds(
# add user to project
self.project_creator.assoc_user(self.user_creator.get_user())
# add user to project
self.project_creator.assoc_user(self.user_creator.get_user())
- self.project_id = self.project_creator.get_project().id
-def get_stack_servers(heat_cli, nova, neutron, stack, project_id):
+def get_stack_servers(heat_cli, nova, neutron, keystone, stack, project_name):
"""
Returns a list of VMInst domain objects associated with a Stack
:param heat_cli: the OpenStack heat client object
:param nova: the OpenStack nova client object
:param neutron: the OpenStack neutron client object
"""
Returns a list of VMInst domain objects associated with a Stack
:param heat_cli: the OpenStack heat client object
:param nova: the OpenStack nova client object
:param neutron: the OpenStack neutron client object
+ :param keystone: the OpenStack keystone client object
:param stack: the SNAPS-OO Stack domain object
:param stack: the SNAPS-OO Stack domain object
- :param project_id: the associated project ID
+ :param project_name: the associated project ID
:return: a list of VMInst domain objects
"""
:return: a list of VMInst domain objects
"""
for resource in srvr_res:
try:
server = nova_utils.get_server_object_by_id(
for resource in srvr_res:
try:
server = nova_utils.get_server_object_by_id(
- nova, neutron, resource.id, project_id)
+ nova, neutron, keystone, resource.id, project_name)
if server:
out.append(server)
except NotFound:
if server:
out.append(server)
except NotFound:
heat_cli, res_res.id, 'OS::Nova::Server')
for res_srvr in res_res_srvrs:
server = nova_utils.get_server_object_by_id(
heat_cli, res_res.id, 'OS::Nova::Server')
for res_srvr in res_res_srvrs:
server = nova_utils.get_server_object_by_id(
- nova, neutron, res_srvr.id, project_id)
+ nova, neutron, keystone, res_srvr.id, project_name)
if server:
out.append(server)
if server:
out.append(server)
from keystoneauth1.identity import v3, v2
from keystoneauth1 import session
import requests
from keystoneauth1.identity import v3, v2
from keystoneauth1 import session
import requests
+from keystoneclient.exceptions import NotFound
from snaps.domain.project import Project, Domain
from snaps.domain.role import Role
from snaps.domain.project import Project, Domain
from snaps.domain.role import Role
-def get_project(keystone=None, os_creds=None, project_settings=None,
- project_name=None):
+def get_project(keystone=None, project_settings=None, project_name=None):
"""
Returns the first project where the project_settings is used for the query
if not None, else the project_name parameter is used for the query. If both
parameters are None, None is returned
:param keystone: the Keystone client
"""
Returns the first project where the project_settings is used for the query
if not None, else the project_name parameter is used for the query. If both
parameters are None, None is returned
:param keystone: the Keystone client
- :param os_creds: the OpenStack credentials used to obtain the Keystone
- client if the keystone parameter is None
:param project_settings: a ProjectConfig object
:param project_name: the name to query
:return: the SNAPS-OO Project domain object or None
"""
:param project_settings: a ProjectConfig object
:param project_name: the name to query
:return: the SNAPS-OO Project domain object or None
"""
- if not keystone:
- if os_creds:
- keystone = keystone_client(os_creds)
- else:
- raise KeystoneException(
- 'Cannot lookup project without the proper credentials')
-
proj_filter = dict()
if project_name:
proj_filter = dict()
if project_name:
+def get_project_by_id(keystone, proj_id):
+ """
+ Returns the first project where the project_settings is used for the query
+ if not None, else the project_name parameter is used for the query. If both
+ parameters are None, None is returned
+ :param keystone: the Keystone client
+ :param proj_id: the project ID
+ """
+ if proj_id and len(proj_id) > 0:
+ try:
+ os_proj = keystone.projects.get(proj_id)
+ if os_proj:
+ return Project(name=os_proj.name, project_id=os_proj.id,
+ domain_id=os_proj)
+ except NotFound:
+ pass
+ except KeyError:
+ pass
+
+
def create_project(keystone, project_settings):
"""
Creates a project
def create_project(keystone, project_settings):
"""
Creates a project
"""
project = None
if user_settings.project_name:
"""
project = None
if user_settings.project_name:
- project = get_project(keystone=keystone,
- project_name=user_settings.project_name)
+ project = get_project(
+ keystone=keystone, project_name=user_settings.project_name)
if keystone.version == V2_VERSION_STR:
project_id = None
if keystone.version == V2_VERSION_STR:
project_id = None
neutron.delete_network(network.id)
neutron.delete_network(network.id)
-def get_network(neutron, network_settings=None, network_name=None,
- project_id=None, os_creds=None):
+def get_network(neutron, keystone, network_settings=None, network_name=None,
+ project_name=None):
"""
Returns Network SNAPS-OO domain object the first network found with
either the given attributes from the network_settings object if not None,
else the query will use just the name from the network_name parameter.
"""
Returns Network SNAPS-OO domain object the first network found with
either the given attributes from the network_settings object if not None,
else the query will use just the name from the network_name parameter.
- When the project_id is included, that will be added to the query filter.
- :param neutron: the client
+ When the project_name is included, that will be added to the query filter.
+ :param neutron: the Neutron client
+ :param keystone: the Keystone client
:param network_settings: the NetworkConfig object used to create filter
:param network_name: the name of the network to retrieve
:param network_settings: the NetworkConfig object used to create filter
:param network_name: the name of the network to retrieve
- :param project_id: the id of the network's project
- :param os_creds: the OpenStack credentials for retrieving the project
+ :param project_name: the name of the network's project
:return: a SNAPS-OO Network domain object
"""
net_filter = dict()
:return: a SNAPS-OO Network domain object
"""
net_filter = dict()
elif network_name:
net_filter['name'] = network_name
elif network_name:
net_filter['name'] = network_name
- if network_settings and network_settings.project_name and os_creds:
- net_filter['project_id'] = keystone_utils.get_project(
- os_creds=os_creds, project_name=network_settings.project_name).id
- elif project_id:
- net_filter['project_id'] = project_id
-
networks = neutron.list_networks(**net_filter)
for network, netInsts in networks.items():
for inst in netInsts:
networks = neutron.list_networks(**net_filter)
for network, netInsts in networks.items():
for inst in netInsts:
- return __map_network(neutron, inst)
+ if project_name:
+ project = keystone_utils.get_project_by_id(
+ keystone, inst['project_id'])
+ if project and project.name == project_name:
+ return __map_network(neutron, inst)
+ else:
+ return __map_network(neutron, inst)
def __get_os_network_by_id(neutron, network_id):
def __get_os_network_by_id(neutron, network_id):
-def create_router(neutron, os_creds, router_settings, project_id):
+def create_router(neutron, os_creds, router_settings):
"""
Creates a router for OpenStack
:param neutron: the client
"""
Creates a router for OpenStack
:param neutron: the client
:param router_settings: A dictionary containing the router configuration
and is responsible for creating the subnet request
JSON body
:param router_settings: A dictionary containing the router configuration
and is responsible for creating the subnet request
JSON body
- :param project_id: the associated project ID
:return: a SNAPS-OO Router domain object
"""
if neutron:
:return: a SNAPS-OO Router domain object
"""
if neutron:
- if router_settings and router_settings.project_name:
- keystone = keystone_utils.keystone_client(os_creds)
- project_id = keystone_utils.get_project(
- keystone=keystone, project_name=router_settings.project_name)
- json_body = router_settings.dict_for_neutron(
- neutron, os_creds, project_id)
+ json_body = router_settings.dict_for_neutron(neutron, os_creds)
logger.info('Creating router with name - ' + router_settings.name)
os_router = neutron.create_router(json_body)
return __map_router(neutron, os_router['router'])
logger.info('Creating router with name - ' + router_settings.name)
os_router = neutron.create_router(json_body)
return __map_router(neutron, os_router['router'])
neutron.delete_port(port.id)
neutron.delete_port(port.id)
-def get_port(neutron, port_settings=None, port_name=None, project_id=None):
+def get_port(neutron, keystone, port_settings=None, port_name=None,
+ project_name=None):
"""
Returns the first port object (dictionary) found for the given query
"""
Returns the first port object (dictionary) found for the given query
- :param neutron: the client
+ :param neutron: the Neutron client
+ :param keystone: the Keystone client
:param port_settings: the PortConfig object used for generating the query
:param port_name: if port_settings is None, this name is the value to place
into the query
:param port_settings: the PortConfig object used for generating the query
:param port_name: if port_settings is None, this name is the value to place
into the query
- :param project_id: the associated project ID
+ :param project_name: the associated project name
:return: a SNAPS-OO Port domain object
"""
port_filter = dict()
:return: a SNAPS-OO Port domain object
"""
port_filter = dict()
port_filter['device_id'] = port_settings.device_id
if port_settings.mac_address:
port_filter['mac_address'] = port_settings.mac_address
port_filter['device_id'] = port_settings.device_id
if port_settings.mac_address:
port_filter['mac_address'] = port_settings.mac_address
+ if port_settings.project_name:
+ project_name = port_settings.project_name
if port_settings.network_name:
network = get_network(
if port_settings.network_name:
network = get_network(
- neutron, network_name=port_settings.network_name,
- project_id=project_id)
+ neutron, keystone, network_name=port_settings.network_name,
+ project_name=project_name)
if network:
port_filter['network_id'] = network.id
elif port_name:
port_filter['name'] = port_name
if network:
port_filter['network_id'] = network.id
elif port_name:
port_filter['name'] = port_name
- if project_id:
- port_filter['project_id'] = project_id
-
ports = neutron.list_ports(**port_filter)
for port in ports['ports']:
ports = neutron.list_ports(**port_filter)
for port in ports['ports']:
+ if project_name:
+ project = keystone_utils.get_project_by_id(
+ keystone, port['project_id'])
+ if project and project.name == project_name:
+ return Port(**port)
+ else:
+ return Port(**port)
-def create_security_group(neutron, keystone, sec_grp_settings, project_id):
+def create_security_group(neutron, keystone, sec_grp_settings):
"""
Creates a security group object in OpenStack
:param neutron: the Neutron client
:param keystone: the Keystone client
:param sec_grp_settings: the security group settings
"""
Creates a security group object in OpenStack
:param neutron: the Neutron client
:param keystone: the Keystone client
:param sec_grp_settings: the security group settings
- :param project_id: the default project to associated the security group
:return: a SNAPS-OO SecurityGroup domain object
"""
logger.info('Creating security group with name - %s',
sec_grp_settings.name)
os_group = neutron.create_security_group(
:return: a SNAPS-OO SecurityGroup domain object
"""
logger.info('Creating security group with name - %s',
sec_grp_settings.name)
os_group = neutron.create_security_group(
- sec_grp_settings.dict_for_neutron(keystone, project_id))
+ sec_grp_settings.dict_for_neutron(keystone))
return __map_os_security_group(neutron, os_group['security_group'])
return __map_os_security_group(neutron, os_group['security_group'])
neutron.delete_security_group(sec_grp.id)
neutron.delete_security_group(sec_grp.id)
-def get_security_group(neutron, sec_grp_settings=None, sec_grp_name=None,
- project_id=None):
+def get_security_group(neutron, keystone, sec_grp_settings=None,
+ sec_grp_name=None, project_name=None):
"""
Returns the first security group for a given query. The query gets built
from the sec_grp_settings parameter if not None, else only the name of
the security group will be used, else if the query parameters are None then
None will be returned
"""
Returns the first security group for a given query. The query gets built
from the sec_grp_settings parameter if not None, else only the name of
the security group will be used, else if the query parameters are None then
None will be returned
- :param neutron: the client
+ :param neutron: the neutron client
+ :param keystone: the keystone client
:param sec_grp_settings: an instance of SecurityGroupConfig object
:param sec_grp_name: the name of security group object to retrieve
:param sec_grp_settings: an instance of SecurityGroupConfig object
:param sec_grp_name: the name of security group object to retrieve
- :param project_id: the ID of the project/tentant object that owns the
+ :param project_name: the name of the project/tentant object that owns the
secuity group to retrieve
:return: a SNAPS-OO SecurityGroup domain object or None if not found
"""
sec_grp_filter = dict()
secuity group to retrieve
:return: a SNAPS-OO SecurityGroup domain object or None if not found
"""
sec_grp_filter = dict()
- if project_id:
- sec_grp_filter['tenant_id'] = project_id
if sec_grp_settings:
sec_grp_filter['name'] = sec_grp_settings.name
if sec_grp_settings.description:
sec_grp_filter['description'] = sec_grp_settings.description
if sec_grp_settings:
sec_grp_filter['name'] = sec_grp_settings.name
if sec_grp_settings.description:
sec_grp_filter['description'] = sec_grp_settings.description
+ if sec_grp_settings.project_name:
+ project_name = sec_grp_settings.project_name
elif sec_grp_name:
sec_grp_filter['name'] = sec_grp_name
else:
return None
groups = neutron.list_security_groups(**sec_grp_filter)
elif sec_grp_name:
sec_grp_filter['name'] = sec_grp_name
else:
return None
groups = neutron.list_security_groups(**sec_grp_filter)
for group in groups['security_groups']:
for group in groups['security_groups']:
+ if project_name:
+ project = keystone_utils.get_project_by_id(
+ keystone, group['tenant_id'])
+ if project and project_name == project.name:
+ break
+ else:
+ break
+ if group:
return __map_os_security_group(neutron, group)
return __map_os_security_group(neutron, group)
-def create_security_group_rule(neutron, sec_grp_rule_settings, proj_id):
+def create_security_group_rule(neutron, keystone, sec_grp_rule_settings,
+ proj_name):
"""
Creates a security group rule in OpenStack
"""
Creates a security group rule in OpenStack
- :param neutron: the client
+ :param neutron: the neutron client
+ :param keystone: the keystone client
:param sec_grp_rule_settings: the security group rule settings
:param sec_grp_rule_settings: the security group rule settings
- :param proj_id: the default project to apply to the rule settings
+ :param proj_name: the default project name
:return: a SNAPS-OO SecurityGroupRule domain object
"""
logger.info('Creating security group to security group - %s',
sec_grp_rule_settings.sec_grp_name)
os_rule = neutron.create_security_group_rule(
:return: a SNAPS-OO SecurityGroupRule domain object
"""
logger.info('Creating security group to security group - %s',
sec_grp_rule_settings.sec_grp_name)
os_rule = neutron.create_security_group_rule(
- sec_grp_rule_settings.dict_for_neutron(neutron, proj_id))
+ sec_grp_rule_settings.dict_for_neutron(neutron, keystone, proj_name))
return SecurityGroupRule(**os_rule['security_group_rule'])
return SecurityGroupRule(**os_rule['security_group_rule'])
-def create_floating_ip(neutron, ext_net_name, port_id=None):
+def create_floating_ip(neutron, keystone, ext_net_name, port_id=None):
"""
Returns the floating IP object that was created with this call
:param neutron: the Neutron client
"""
Returns the floating IP object that was created with this call
:param neutron: the Neutron client
+ :param keystone: the Keystone client
:param ext_net_name: the name of the external network on which to apply the
floating IP address
:param port_id: the ID of the port to which the floating IP will be
:param ext_net_name: the name of the external network on which to apply the
floating IP address
:param port_id: the ID of the port to which the floating IP will be
:return: the SNAPS FloatingIp object
"""
logger.info('Creating floating ip to external network - ' + ext_net_name)
:return: the SNAPS FloatingIp object
"""
logger.info('Creating floating ip to external network - ' + ext_net_name)
- ext_net = get_network(neutron, network_name=ext_net_name)
+ ext_net = get_network(neutron, keystone, network_name=ext_net_name)
if ext_net:
body = {'floatingip': {'floating_network_id': ext_net.id}}
if port_id:
if ext_net:
body = {'floatingip': {'floating_network_id': ext_net.id}}
if port_id:
region_name=os_creds.region_name)
region_name=os_creds.region_name)
-def create_server(nova, neutron, glance, instance_config, image_config,
- project_id, keypair_config=None):
+def create_server(nova, keystone, neutron, glance, instance_config,
+ image_config, project_name, keypair_config=None):
"""
Creates a VM instance
:param nova: the nova client (required)
"""
Creates a VM instance
:param nova: the nova client (required)
+ :param keystone: the keystone client for retrieving projects (required)
:param neutron: the neutron client for retrieving ports (required)
:param glance: the glance client (required)
:param instance_config: the VMInstConfig object (required)
:param image_config: the VM's ImageConfig object (required)
:param neutron: the neutron client for retrieving ports (required)
:param glance: the glance client (required)
:param instance_config: the VMInstConfig object (required)
:param image_config: the VM's ImageConfig object (required)
- :param project_id: the associated project ID (required)
+ :param project_name: the associated project name (required)
:param keypair_config: the VM's KeypairConfig object (optional)
:return: a snaps.domain.VmInst object
"""
:param keypair_config: the VM's KeypairConfig object (optional)
:return: a snaps.domain.VmInst object
"""
for port_setting in instance_config.port_settings:
port = neutron_utils.get_port(
for port_setting in instance_config.port_settings:
port = neutron_utils.get_port(
- neutron, port_settings=port_setting, project_id=project_id)
+ neutron, keystone, port_settings=port_setting,
+ project_name=project_name)
if port:
ports.append(port)
else:
if port:
ports.append(port)
else:
server = nova.servers.create(**args)
server = nova.servers.create(**args)
- return __map_os_server_obj_to_vm_inst(neutron, server, project_id)
+ return __map_os_server_obj_to_vm_inst(
+ neutron, keystone, server, project_name)
else:
raise NovaException(
'Cannot create instance, image cannot be located with name %s',
image_config.name)
else:
raise NovaException(
'Cannot create instance, image cannot be located with name %s',
image_config.name)
-def get_server(nova, neutron, vm_inst_settings=None, server_name=None,
- project_id=None):
+def get_server(nova, neutron, keystone, vm_inst_settings=None,
+ server_name=None, project_id=None):
"""
Returns a VmInst object for the first server instance found.
:param nova: the Nova client
:param neutron: the Neutron client
"""
Returns a VmInst object for the first server instance found.
:param nova: the Nova client
:param neutron: the Neutron client
+ :param keystone: the Keystone client
:param vm_inst_settings: the VmInstanceConfig object from which to build
the query if not None
:param server_name: the server with this name to return if vm_inst_settings
:param vm_inst_settings: the VmInstanceConfig object from which to build
the query if not None
:param server_name: the server with this name to return if vm_inst_settings
servers = nova.servers.list(search_opts=search_opts)
for server in servers:
servers = nova.servers.list(search_opts=search_opts)
for server in servers:
- return __map_os_server_obj_to_vm_inst(neutron, server, project_id)
+ return __map_os_server_obj_to_vm_inst(
+ neutron, keystone, server, project_id)
def get_server_connection(nova, vm_inst_settings=None, server_name=None):
def get_server_connection(nova, vm_inst_settings=None, server_name=None):
-def __map_os_server_obj_to_vm_inst(neutron, os_server, project_id):
+def __map_os_server_obj_to_vm_inst(neutron, keystone, os_server,
+ project_name=None):
"""
Returns a VmInst object for an OpenStack Server object
"""
Returns a VmInst object for an OpenStack Server object
- :param neutron: the Neutron client (when None, ports will be empty)
+ :param neutron: the Neutron client
+ :param keystone: the Keystone client
:param os_server: the OpenStack server object
:param os_server: the OpenStack server object
+ :param project_name: the associated project name
:return: an equivalent SNAPS-OO VmInst domain object
"""
sec_grp_names = list()
:return: an equivalent SNAPS-OO VmInst domain object
"""
sec_grp_names = list()
if len(os_server.networks) > 0:
for net_name, ips in os_server.networks.items():
network = neutron_utils.get_network(
if len(os_server.networks) > 0:
for net_name, ips in os_server.networks.items():
network = neutron_utils.get_network(
- neutron, network_name=net_name, project_id=project_id)
+ neutron, keystone, network_name=net_name,
+ project_name=project_name)
ports = neutron_utils.get_ports(neutron, network, ips)
for port in ports:
out_ports.append(port)
ports = neutron_utils.get_ports(neutron, network, ips)
for port in ports:
out_ports.append(port)
-def get_latest_server_object(nova, neutron, server, project_id):
+def get_latest_server_object(nova, neutron, keystone, server, project_name):
"""
Returns a server with a given id
:param nova: the Nova client
:param neutron: the Neutron client
"""
Returns a server with a given id
:param nova: the Nova client
:param neutron: the Neutron client
+ :param keystone: the Keystone client
:param server: the old server object
:param server: the old server object
- :param project_id: the associated project ID
+ :param project_name: the associated project name
:return: the list of servers or None if not found
"""
server = __get_latest_server_os_object(nova, server)
:return: the list of servers or None if not found
"""
server = __get_latest_server_os_object(nova, server)
- return __map_os_server_obj_to_vm_inst(neutron, server, project_id)
+ return __map_os_server_obj_to_vm_inst(
+ neutron, keystone, server, project_name)
-def get_server_object_by_id(nova, neutron, server_id, project_id):
+def get_server_object_by_id(nova, neutron, keystone, server_id,
+ project_name=None):
"""
Returns a server with a given id
:param nova: the Nova client
:param neutron: the Neutron client
"""
Returns a server with a given id
:param nova: the Nova client
:param neutron: the Neutron client
+ :param keystone: the Keystone client
:param server_id: the server's id
:param server_id: the server's id
- :param project_id: the associated project ID
+ :param project_name: the associated project name
:return: an SNAPS-OO VmInst object or None if not found
"""
server = __get_latest_server_os_object_by_id(nova, server_id)
:return: an SNAPS-OO VmInst object or None if not found
"""
server = __get_latest_server_os_object_by_id(nova, server_id)
- return __map_os_server_obj_to_vm_inst(neutron, server, project_id)
+ return __map_os_server_obj_to_vm_inst(
+ neutron, keystone, server, project_name)
def get_server_security_group_names(nova, server):
def get_server_security_group_names(nova, server):
return nova.quotas.update(project_id, **update_values)
return nova.quotas.update(project_id, **update_values)
-def attach_volume(nova, neutron, server, volume, project_id, timeout=120):
+def attach_volume(nova, neutron, keystone, server, volume, project_name,
+ timeout=120):
"""
Attaches a volume to a server. When the timeout parameter is used, a VmInst
object with the proper volume updates is returned unless it has not been
updated in the allotted amount of time then an Exception will be raised.
:param nova: the nova client
:param neutron: the neutron client
"""
Attaches a volume to a server. When the timeout parameter is used, a VmInst
object with the proper volume updates is returned unless it has not been
updated in the allotted amount of time then an Exception will be raised.
:param nova: the nova client
:param neutron: the neutron client
+ :param keystone: the neutron client
:param server: the VMInst domain object
:param volume: the Volume domain object
:param server: the VMInst domain object
:param volume: the Volume domain object
- :param project_id: the associated project ID
+ :param project_name: the associated project name
:param timeout: denotes the amount of time to block to determine if the
has been properly attached.
:return: updated VmInst object
:param timeout: denotes the amount of time to block to determine if the
has been properly attached.
:return: updated VmInst object
start_time = time.time()
while time.time() < start_time + timeout:
start_time = time.time()
while time.time() < start_time + timeout:
- vm = get_server_object_by_id(nova, neutron, server.id, project_id)
+ vm = get_server_object_by_id(
+ nova, neutron, keystone, server.id, project_name)
for vol_dict in vm.volume_ids:
if volume.id == vol_dict['id']:
return vm
for vol_dict in vm.volume_ids:
if volume.id == vol_dict['id']:
return vm
-def detach_volume(nova, neutron, server, volume, project_id, timeout=120):
+def detach_volume(nova, neutron, keystone, server, volume, project_name,
+ timeout=120):
"""
Detaches a volume to a server. When the timeout parameter is used, a VmInst
object with the proper volume updates is returned unless it has not been
updated in the allotted amount of time then an Exception will be raised.
:param nova: the nova client
:param neutron: the neutron client
"""
Detaches a volume to a server. When the timeout parameter is used, a VmInst
object with the proper volume updates is returned unless it has not been
updated in the allotted amount of time then an Exception will be raised.
:param nova: the nova client
:param neutron: the neutron client
+ :param keystone: the keystone client
:param server: the VMInst domain object
:param volume: the Volume domain object
:param server: the VMInst domain object
:param volume: the Volume domain object
- :param project_id: the associated project ID
+ :param project_name: the associated project name
:param timeout: denotes the amount of time to block to determine if the
has been properly detached.
:return: updated VmInst object
:param timeout: denotes the amount of time to block to determine if the
has been properly detached.
:return: updated VmInst object
start_time = time.time()
while time.time() < start_time + timeout:
start_time = time.time()
while time.time() < start_time + timeout:
- vm = get_server_object_by_id(nova, neutron, server.id, project_id)
+ vm = get_server_object_by_id(
+ nova, neutron, keystone, server.id, project_name)
if len(vm.volume_ids) == 0:
return vm
else:
if len(vm.volume_ids) == 0:
return vm
else:
return KeypairConfig(name=keypair.name)
return KeypairConfig(name=keypair.name)
-def create_vm_inst_config(nova, neutron, server, project_id):
+def create_vm_inst_config(nova, keystone, neutron, server, project_name):
"""
Returns a VmInstanceConfig object
note: if the server instance is not active, the PortSettings objects will
not be generated resulting in an invalid configuration
:param nova: the nova client
"""
Returns a VmInstanceConfig object
note: if the server instance is not active, the PortSettings objects will
not be generated resulting in an invalid configuration
:param nova: the nova client
+ :param keystone: the keystone client
:param neutron: the neutron client
:param server: a SNAPS-OO VmInst domain object
:param neutron: the neutron client
:param server: a SNAPS-OO VmInst domain object
- :param project_id: the associated project ID
+ :param project_name: the associated project name
kwargs['port_settings'] = __create_port_configs(neutron, server.ports)
kwargs['security_group_names'] = server.sec_grp_names
kwargs['floating_ip_settings'] = __create_floatingip_config(
kwargs['port_settings'] = __create_port_configs(neutron, server.ports)
kwargs['security_group_names'] = server.sec_grp_names
kwargs['floating_ip_settings'] = __create_floatingip_config(
- neutron, kwargs['port_settings'], project_id)
+ neutron, keystone, kwargs['port_settings'], project_name)
return VmInstanceConfig(**kwargs)
return VmInstanceConfig(**kwargs)
-def __create_floatingip_config(neutron, port_settings, project_id):
+def __create_floatingip_config(neutron, keystone, port_settings, project_name):
"""
Returns a list of FloatingIpConfig objects as they pertain to an
existing deployed server instance
:param neutron: the neutron client
"""
Returns a list of FloatingIpConfig objects as they pertain to an
existing deployed server instance
:param neutron: the neutron client
+ :param keystone: the keystone client
:param port_settings: list of SNAPS-OO PortConfig objects
:return: a list of FloatingIpConfig objects or an empty list if no
floating IPs have been created
:param port_settings: list of SNAPS-OO PortConfig objects
:return: a list of FloatingIpConfig objects or an empty list if no
floating IPs have been created
fip_ports = list()
for port_setting in port_settings:
setting_port = neutron_utils.get_port(
fip_ports = list()
for port_setting in port_settings:
setting_port = neutron_utils.get_port(
- neutron, port_setting, project_id=project_id)
+ neutron, keystone, port_setting, project_name=project_name)
if setting_port:
network = neutron_utils.get_network(
if setting_port:
network = neutron_utils.get_network(
- neutron, network_name=port_setting.network_name)
+ neutron, keystone, network_name=port_setting.network_name)
network_ports = neutron_utils.get_ports(neutron, network)
if network_ports:
for setting_port in network_ports:
network_ports = neutron_utils.get_ports(neutron, network)
if network_ports:
for setting_port in network_ports:
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
heat_utils, neutron_utils, nova_utils, settings_utils, glance_utils,
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
heat_utils, neutron_utils, nova_utils, settings_utils, glance_utils,
+ cinder_utils, keystone_utils)
self.assertEqual(self.subnet_name, subnets[0].name)
nova = nova_utils.nova_client(self.os_creds)
self.assertEqual(self.subnet_name, subnets[0].name)
nova = nova_utils.nova_client(self.os_creds)
+ keystone = keystone_utils.keystone_client(self.os_creds)
servers = heat_utils.get_stack_servers(
servers = heat_utils.get_stack_servers(
- self.heat_client, nova, neutron, self.stack1, self.project_id)
+ self.heat_client, nova, neutron, keystone, self.stack1,
+ self.os_creds.project_name)
self.assertIsNotNone(servers)
self.assertEqual(1, len(servers))
self.assertEqual(self.vm_inst_name, servers[0].name)
self.assertIsNotNone(servers)
self.assertEqual(1, len(servers))
self.assertEqual(self.vm_inst_name, servers[0].name)
if not is_deleted:
nova = nova_utils.nova_client(self.os_creds)
if not is_deleted:
nova = nova_utils.nova_client(self.os_creds)
+ keystone = keystone_utils.keystone_client(self.os_creds)
neutron = neutron_utils.neutron_client(self.os_creds)
glance = glance_utils.glance_client(self.os_creds)
servers = heat_utils.get_stack_servers(
neutron = neutron_utils.neutron_client(self.os_creds)
glance = glance_utils.glance_client(self.os_creds)
servers = heat_utils.get_stack_servers(
- self.heat_client, nova, neutron, self.stack,
- self.project_id)
+ self.heat_client, nova, neutron, keystone, self.stack,
+ self.os_creds.project_name)
for server in servers:
vm_settings = settings_utils.create_vm_inst_config(
for server in servers:
vm_settings = settings_utils.create_vm_inst_config(
- nova, neutron, server, self.project_id)
+ nova, keystone, neutron, server,
+ self.os_creds.project_name)
img_settings = settings_utils.determine_image_config(
glance, server,
[self.image_creator1.image_settings,
img_settings = settings_utils.determine_image_config(
glance, server,
[self.image_creator1.image_settings,
nova = nova_utils.nova_client(self.os_creds)
glance = glance_utils.glance_client(self.os_creds)
nova = nova_utils.nova_client(self.os_creds)
glance = glance_utils.glance_client(self.os_creds)
+ keystone = keystone_utils.keystone_client(self.os_creds)
servers = heat_utils.get_stack_servers(
servers = heat_utils.get_stack_servers(
- self.heat_client, nova, neutron, self.stack, self.project_id)
+ self.heat_client, nova, neutron, keystone, self.stack,
+ self.os_creds.project_name)
self.assertIsNotNone(servers)
self.assertEqual(2, len(servers))
self.assertIsNotNone(servers)
self.assertEqual(2, len(servers))
router = routers[0]
self.assertEqual(self.router_name, router.name)
router = routers[0]
self.assertEqual(self.router_name, router.name)
+ keystone = keystone_utils.keystone_client(self.os_creds)
ext_net = neutron_utils.get_network(
ext_net = neutron_utils.get_network(
- self.neutron, network_name=self.ext_net_name)
+ self.neutron, keystone, network_name=self.ext_net_name)
self.assertEqual(ext_net.id, router.external_network_id)
self.assertEqual(ext_net.id, router.external_network_id)
if self.project:
neutron = neutron_utils.neutron_client(self.os_creds)
default_sec_grp = neutron_utils.get_security_group(
if self.project:
neutron = neutron_utils.neutron_client(self.os_creds)
default_sec_grp = neutron_utils.get_security_group(
- neutron, sec_grp_name='default',
- project_id=self.project.id)
+ neutron, self.keystone, sec_grp_name='default',
+ project_name=self.os_creds.project_name)
if default_sec_grp:
try:
neutron_utils.delete_security_group(
if default_sec_grp:
try:
neutron_utils.delete_security_group(
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
self.network = None
self.net_config = openstack_tests.get_pub_net_config(
net_name=guid + '-pub-net')
self.network = None
self.net_config = openstack_tests.get_pub_net_config(
net_name=guid + '-pub-net')
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
self.assertEqual(len(self.net_config.network_settings.subnet_settings),
len(self.network.subnets))
self.assertEqual(len(self.net_config.network_settings.subnet_settings),
len(self.network.subnets))
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
self.network = None
self.net_config = openstack_tests.get_pub_net_config(
net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
self.network = None
self.net_config = openstack_tests.get_pub_net_config(
net_name=guid + '-pub-net', subnet_name=guid + '-pub-subnet',
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
with self.assertRaises(Exception):
SubnetConfig(cidr=self.net_config.subnet_cidr)
with self.assertRaises(Exception):
SubnetConfig(cidr=self.net_config.subnet_cidr)
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.port_name = str(guid) + '-port'
self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
self.network = None
self.port = None
self.router = None
self.network = None
self.port = None
self.router = None
Tests the neutron_utils.create_router()
"""
self.router = neutron_utils.create_router(
Tests the neutron_utils.create_router()
"""
self.router = neutron_utils.create_router(
- self.neutron, self.os_creds, self.net_config.router_settings,
- self.project_id)
+ self.neutron, self.os_creds, self.net_config.router_settings)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
"""
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.net_config = openstack_tests.OSNetworkConfig(
"""
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.net_config = openstack_tests.OSNetworkConfig(
- self.net_config.network_settings.name,
- subnet_setting.name,
- subnet_setting.cidr,
- self.net_config.router_settings.name,
+ self.net_config.network_settings.name, subnet_setting.name,
+ subnet_setting.cidr, self.net_config.router_settings.name,
self.ext_net_name)
self.router = neutron_utils.create_router(
self.ext_net_name)
self.router = neutron_utils.create_router(
- self.neutron, self.os_creds, self.net_config.router_settings,
- self.project_id)
- validate_router(self.neutron, self.net_config.router_settings.name,
- True)
+ self.neutron, self.os_creds, self.net_config.router_settings)
+ validate_router(
+ self.neutron, self.net_config.router_settings.name, True)
ext_net = neutron_utils.get_network(
ext_net = neutron_utils.get_network(
- self.neutron, network_name=self.ext_net_name)
+ self.neutron, self.keystone, network_name=self.ext_net_name)
self.assertEqual(self.router.external_network_id, ext_net.id)
def test_add_interface_router(self):
self.assertEqual(self.router.external_network_id, ext_net.id)
def test_add_interface_router(self):
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.router = neutron_utils.create_router(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.neutron, subnet_setting.name, subnet_setting.cidr, True))
self.router = neutron_utils.create_router(
- self.neutron, self.os_creds, self.net_config.router_settings,
- self.project_id)
+ self.neutron, self.os_creds, self.net_config.router_settings)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
self.router = neutron_utils.create_router(
self.router = neutron_utils.create_router(
- self.neutron, self.os_creds, self.net_config.router_settings,
- self.project_id)
+ self.neutron, self.os_creds, self.net_config.router_settings)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
self.router = neutron_utils.create_router(
self.router = neutron_utils.create_router(
- self.neutron, self.os_creds, self.net_config.router_settings,
- self.project_id)
+ self.neutron, self.os_creds, self.net_config.router_settings)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
validate_router(self.neutron, self.net_config.router_settings.name,
True)
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(self.neutron, subnet_setting.name,
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
self.assertEqual(self.net_config.network_settings.name,
self.network.name)
self.assertTrue(validate_network(
- self.neutron, self.net_config.network_settings.name, True,
- self.project_id))
+ self.neutron, self.keystone,
+ self.net_config.network_settings.name, True,
+ self.os_creds.project_name))
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
subnet_setting = self.net_config.network_settings.subnet_settings[0]
self.assertTrue(validate_subnet(
"""
sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
security_group = neutron_utils.create_security_group(
"""
sec_grp_settings = SecurityGroupConfig(name=self.sec_grp_name)
security_group = neutron_utils.create_security_group(
- self.neutron, self.keystone, sec_grp_settings, self.project_id)
+ self.neutron, self.keystone, sec_grp_settings)
self.assertTrue(sec_grp_settings.name, security_group.name)
sec_grp_get = neutron_utils.get_security_group(
self.assertTrue(sec_grp_settings.name, security_group.name)
sec_grp_get = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group, sec_grp_get))
neutron_utils.delete_security_group(self.neutron, security_group)
sec_grp_get = neutron_utils.get_security_group(
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group, sec_grp_get))
neutron_utils.delete_security_group(self.neutron, security_group)
sec_grp_get = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
sec_grp_settings = SecurityGroupConfig()
self.security_groups.append(
neutron_utils.create_security_group(
sec_grp_settings = SecurityGroupConfig()
self.security_groups.append(
neutron_utils.create_security_group(
- self.neutron, self.keystone, sec_grp_settings,
- self.project_id))
+ self.neutron, self.keystone, sec_grp_settings))
def test_create_sec_grp_no_rules(self):
"""
def test_create_sec_grp_no_rules(self):
"""
name=self.sec_grp_name, description='hello group')
self.security_groups.append(
neutron_utils.create_security_group(
name=self.sec_grp_name, description='hello group')
self.security_groups.append(
neutron_utils.create_security_group(
- self.neutron, self.keystone, sec_grp_settings,
- self.project_id))
+ self.neutron, self.keystone, sec_grp_settings))
self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
sec_grp_get = neutron_utils.get_security_group(
self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
sec_grp_get = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(self.security_groups[0], sec_grp_get)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(self.security_groups[0], sec_grp_get)
self.security_groups.append(
neutron_utils.create_security_group(
self.security_groups.append(
neutron_utils.create_security_group(
- self.neutron, self.keystone, sec_grp_settings,
- self.project_id))
+ self.neutron, self.keystone, sec_grp_settings))
free_rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.security_groups[0])
for free_rule in free_rules:
self.security_group_rules.append(free_rule)
keystone = keystone_utils.keystone_client(self.os_creds)
free_rules = neutron_utils.get_rules_by_security_group(
self.neutron, self.security_groups[0])
for free_rule in free_rules:
self.security_group_rules.append(free_rule)
keystone = keystone_utils.keystone_client(self.os_creds)
- project_id = keystone_utils.get_project(
- keystone, self.os_creds.project_name)
self.security_group_rules.append(
neutron_utils.create_security_group_rule(
self.security_group_rules.append(
neutron_utils.create_security_group_rule(
- self.neutron, sec_grp_settings.rule_settings[0], project_id))
+ self.neutron, keystone, sec_grp_settings.rule_settings[0],
+ self.os_creds.project_name))
# Refresh object so it is populated with the newly added rule
security_group = neutron_utils.get_security_group(
# Refresh object so it is populated with the newly added rule
security_group = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
- rules = neutron_utils.get_rules_by_security_group(self.neutron,
- security_group)
+ rules = neutron_utils.get_rules_by_security_group(
+ self.neutron, security_group)
self.assertTrue(
validation_utils.objects_equivalent(
self.assertTrue(
validation_utils.objects_equivalent(
self.assertTrue(sec_grp_settings.name, security_group.name)
sec_grp_get = neutron_utils.get_security_group(
self.assertTrue(sec_grp_settings.name, security_group.name)
sec_grp_get = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings=sec_grp_settings)
+ self.neutron, self.keystone, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(security_group, sec_grp_get)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(security_group, sec_grp_get)
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
SecurityGroupConfig(
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
SecurityGroupConfig(
- name=self.sec_grp_name + '-1', description='hello group'),
- self.project_id))
+ name=self.sec_grp_name + '-1', description='hello group')))
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
SecurityGroupConfig(
self.security_groups.append(neutron_utils.create_security_group(
self.neutron, self.keystone,
SecurityGroupConfig(
- name=self.sec_grp_name + '-2', description='hello group'),
- self.project_id))
+ name=self.sec_grp_name + '-2', description='hello group')))
sec_grp_1b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[0].id)
sec_grp_1b = neutron_utils.get_security_group_by_id(
self.neutron, self.security_groups[0].id)
and creating an OS image file within OpenStack
"""
self.neutron = neutron_utils.neutron_client(self.os_creds)
and creating an OS image file within OpenStack
"""
self.neutron = neutron_utils.neutron_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
self.floating_ip = None
def tearDown(self):
self.floating_ip = None
def tearDown(self):
"""
initial_fips = neutron_utils.get_floating_ips(self.neutron)
"""
initial_fips = neutron_utils.get_floating_ips(self.neutron)
- self.floating_ip = neutron_utils.create_floating_ip(self.neutron,
- self.ext_net_name)
+ self.floating_ip = neutron_utils.create_floating_ip(
+ self.neutron, self.keystone, self.ext_net_name)
all_fips = neutron_utils.get_floating_ips(self.neutron)
self.assertEqual(len(initial_fips) + 1, len(all_fips))
returned = neutron_utils.get_floating_ip(self.neutron,
all_fips = neutron_utils.get_floating_ips(self.neutron)
self.assertEqual(len(initial_fips) + 1, len(all_fips))
returned = neutron_utils.get_floating_ip(self.neutron,
-def validate_network(neutron, name, exists, project_id):
+def validate_network(neutron, keystone, name, exists, project_name):
"""
Returns true if a network for a given name DOES NOT exist if the exists
parameter is false conversely true. Returns false if a network for a given
name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
"""
Returns true if a network for a given name DOES NOT exist if the exists
parameter is false conversely true. Returns false if a network for a given
name DOES exist if the exists parameter is true conversely false.
:param neutron: The neutron client
+ :param keystone: The keystone client
:param name: The expected network name
:param exists: Whether or not the network name should exist or not
:param name: The expected network name
:param exists: Whether or not the network name should exist or not
- :param project_id: the associated project ID
+ :param project_name: the associated project name
- network = neutron_utils.get_network(neutron, network_name=name,
- project_id=project_id)
+ network = neutron_utils.get_network(
+ neutron, keystone, network_name=name, project_name=project_name)
if exists and network:
return True
if not exists and not network:
if exists and network:
return True
if not exists and not network:
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
- nova_utils, neutron_utils, glance_utils, cinder_utils)
+ nova_utils, neutron_utils, glance_utils, cinder_utils, keystone_utils)
from snaps.openstack.utils.nova_utils import NovaException
__author__ = 'spisarski'
from snaps.openstack.utils.nova_utils import NovaException
__author__ = 'spisarski'
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.nova = nova_utils.nova_client(self.os_creds)
guid = self.__class__.__name__ + '-' + str(uuid.uuid4())
self.nova = nova_utils.nova_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
"""
self.vm_inst = nova_utils.create_server(
"""
self.vm_inst = nova_utils.create_server(
- self.nova, self.neutron, self.glance, self.instance_settings,
- self.image_creator.image_settings, self.project_id)
+ self.nova, self.keystone, self.neutron, self.glance,
+ self.instance_settings, self.image_creator.image_settings,
+ self.os_creds.project_name)
self.assertIsNotNone(self.vm_inst)
self.assertIsNotNone(self.vm_inst)
self.assertTrue(active)
vm_inst = nova_utils.get_latest_server_object(
self.assertTrue(active)
vm_inst = nova_utils.get_latest_server_object(
- self.nova, self.neutron, self.vm_inst, self.project_id)
+ self.nova, self.neutron, self.keystone, self.vm_inst,
+ self.os_creds.project_name)
self.assertEqual(self.vm_inst.name, vm_inst.name)
self.assertEqual(self.vm_inst.id, vm_inst.id)
self.assertEqual(self.vm_inst.name, vm_inst.name)
self.assertEqual(self.vm_inst.id, vm_inst.id)
# Attach volume to VM
neutron = neutron_utils.neutron_client(self.os_creds)
# Attach volume to VM
neutron = neutron_utils.neutron_client(self.os_creds)
+ keystone = keystone_utils.keystone_client(self.os_creds)
self.assertIsNotNone(nova_utils.attach_volume(
self.assertIsNotNone(nova_utils.attach_volume(
- self.nova, neutron, self.instance_creator.get_vm_inst(),
- self.volume_creator.get_volume(), self.project_id))
+ self.nova, neutron, keystone, self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume(), self.os_creds.project_name))
vol_attach = None
vol_detach = None
vol_attach = None
vol_detach = None
self.assertTrue(attached)
self.assertIsNotNone(vol_attach)
self.assertTrue(attached)
self.assertIsNotNone(vol_attach)
+ keystone = keystone_utils.keystone_client(self.os_creds)
vm_attach = nova_utils.get_server_object_by_id(
vm_attach = nova_utils.get_server_object_by_id(
- self.nova, neutron, self.instance_creator.get_vm_inst().id,
- self.project_id)
+ self.nova, neutron, keystone,
+ self.instance_creator.get_vm_inst().id, self.os_creds.project_name)
# Validate Attachment
self.assertIsNotNone(vol_attach)
# Validate Attachment
self.assertIsNotNone(vol_attach)
# Detach volume to VM
self.assertIsNotNone(nova_utils.detach_volume(
# Detach volume to VM
self.assertIsNotNone(nova_utils.detach_volume(
- self.nova, neutron, self.instance_creator.get_vm_inst(),
- self.volume_creator.get_volume(), self.project_id))
+ self.nova, neutron, keystone, self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume(), self.os_creds.project_name))
start_time = time.time()
while time.time() < start_time + 120:
start_time = time.time()
while time.time() < start_time + 120:
self.assertIsNotNone(vol_detach)
vm_detach = nova_utils.get_server_object_by_id(
self.assertIsNotNone(vol_detach)
vm_detach = nova_utils.get_server_object_by_id(
- self.nova, neutron, self.instance_creator.get_vm_inst().id,
- self.project_id)
+ self.nova, neutron, keystone,
+ self.instance_creator.get_vm_inst().id, self.os_creds.project_name)
# Validate Detachment
self.assertIsNotNone(vol_detach)
# Validate Detachment
self.assertIsNotNone(vol_detach)
# Attach volume to VM
neutron = neutron_utils.neutron_client(self.os_creds)
# Attach volume to VM
neutron = neutron_utils.neutron_client(self.os_creds)
+ keystone = keystone_utils.keystone_client(self.os_creds)
with self.assertRaises(NovaException):
nova_utils.attach_volume(
with self.assertRaises(NovaException):
nova_utils.attach_volume(
- self.nova, neutron, self.instance_creator.get_vm_inst(),
- self.volume_creator.get_volume(), self.project_id, 0)
+ self.nova, neutron, keystone,
+ self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume(), self.os_creds.project_name,
+ 0)
def test_detach_volume_nowait(self):
"""
def test_detach_volume_nowait(self):
"""
# Attach volume to VM
neutron = neutron_utils.neutron_client(self.os_creds)
# Attach volume to VM
neutron = neutron_utils.neutron_client(self.os_creds)
+ keystone = keystone_utils.keystone_client(self.os_creds)
nova_utils.attach_volume(
nova_utils.attach_volume(
- self.nova, neutron, self.instance_creator.get_vm_inst(),
- self.volume_creator.get_volume(), self.project_id)
+ self.nova, neutron, keystone, self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume(), self.os_creds.project_name)
# Check VmInst for attachment
# Check VmInst for attachment
+ keystone = keystone_utils.keystone_client(self.os_creds)
latest_vm = nova_utils.get_server_object_by_id(
latest_vm = nova_utils.get_server_object_by_id(
- self.nova, neutron, self.instance_creator.get_vm_inst().id,
- self.project_id)
+ self.nova, neutron, keystone,
+ self.instance_creator.get_vm_inst().id, self.os_creds.project_name)
self.assertEqual(1, len(latest_vm.volume_ids))
# Check Volume for attachment
self.assertEqual(1, len(latest_vm.volume_ids))
# Check Volume for attachment
# Detach volume
with self.assertRaises(NovaException):
nova_utils.detach_volume(
# Detach volume
with self.assertRaises(NovaException):
nova_utils.detach_volume(
- self.nova, neutron, self.instance_creator.get_vm_inst(),
- self.volume_creator.get_volume(), self.project_id, 0)
+ self.nova, neutron, keystone,
+ self.instance_creator.get_vm_inst(),
+ self.volume_creator.get_volume(), self.os_creds.project_name,
+ 0)
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
from snaps.openstack.tests import openstack_tests
from snaps.openstack.tests.os_source_file_test import OSComponentTestCase
from snaps.openstack.utils import (
- neutron_utils, settings_utils, nova_utils, glance_utils)
+ neutron_utils, settings_utils, nova_utils, glance_utils, keystone_utils)
and creating an OS image file within OpenStack
"""
self.nova = nova_utils.nova_client(self.os_creds)
and creating an OS image file within OpenStack
"""
self.nova = nova_utils.nova_client(self.os_creds)
+ self.keystone = keystone_utils.keystone_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.glance = glance_utils.glance_client(self.os_creds)
self.neutron = neutron_utils.neutron_client(self.os_creds)
self.inst_creator.create(block=True)
server = nova_utils.get_server(
self.inst_creator.create(block=True)
server = nova_utils.get_server(
- self.nova, self.neutron,
+ self.nova, self.neutron, self.keystone,
vm_inst_settings=self.inst_creator.instance_settings)
derived_vm_settings = settings_utils.create_vm_inst_config(
vm_inst_settings=self.inst_creator.instance_settings)
derived_vm_settings = settings_utils.create_vm_inst_config(
- self.nova, self.neutron, server, self.project_id)
+ self.nova, self.keystone, self.neutron, server,
+ self.os_creds.project_name)
self.assertIsNotNone(derived_vm_settings)
self.assertIsNotNone(derived_vm_settings.port_settings)
self.assertIsNotNone(derived_vm_settings.floating_ip_settings)
self.assertIsNotNone(derived_vm_settings)
self.assertIsNotNone(derived_vm_settings.port_settings)
self.assertIsNotNone(derived_vm_settings.floating_ip_settings)
self.inst_creator.create(block=True)
server = nova_utils.get_server(
self.inst_creator.create(block=True)
server = nova_utils.get_server(
- self.nova, self.neutron,
+ self.nova, self.neutron, self.keystone,
vm_inst_settings=self.inst_creator.instance_settings)
derived_image_settings = settings_utils.determine_image_config(
self.glance, server, [self.image_creator.image_settings])
vm_inst_settings=self.inst_creator.instance_settings)
derived_image_settings = settings_utils.determine_image_config(
self.glance, server, [self.image_creator.image_settings])