# Delete security group 'default' if exists
neutron = neutron_utils.neutron_client(self.__os_creds)
default_sec_grp = neutron_utils.get_security_group(
- neutron, 'default',
- tenant_id=self.__project.id)
+ neutron, sec_grp_name='default',
+ project_id=self.__project.id)
if default_sec_grp:
try:
neutron_utils.delete_security_group(
'Creating security group %s...' % self.sec_grp_settings.name)
self.__security_group = neutron_utils.get_security_group(
- self.__neutron, self.sec_grp_settings.name)
+ self.__neutron, sec_grp_settings=self.sec_grp_settings)
if not self.__security_group and not cleanup:
# Create the security group
self.__security_group = neutron_utils.create_security_group(
# Refresh security group object to reflect the new rules added
self.__security_group = neutron_utils.get_security_group(
- self.__neutron, self.sec_grp_settings.name)
+ self.__neutron, sec_grp_settings=self.sec_grp_settings)
else:
# Populate rules
existing_rules = neutron_utils.get_rules_by_security_group(
out['protocol'] = self.protocol.name
if self.sec_grp_name:
sec_grp = neutron_utils.get_security_group(
- neutron, self.sec_grp_name)
+ neutron, sec_grp_name=self.sec_grp_name)
if sec_grp:
out['security_group_id'] = sec_grp.id
else:
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
self.admin_os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp)
validation_utils.objects_equivalent(
neutron_utils.delete_security_group(self.neutron, created_sec_grp)
self.assertIsNone(neutron_utils.get_security_group(
- self.neutron, self.sec_grp_creator.sec_grp_settings.name))
+ self.neutron,
+ sec_grp_settings=self.sec_grp_creator.sec_grp_settings))
self.sec_grp_creator.clean()
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
rules = neutron_utils.get_rules_by_security_group(
self.os_creds, sec_grp_settings)
self.sec_grp_creator.create()
- sec_grp = neutron_utils.get_security_group(self.neutron,
- self.sec_grp_name)
+ sec_grp = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
validation_utils.objects_equivalent(
self.sec_grp_creator.get_security_group(), sec_grp)
setting_proto = rule_setting.protocol.name
sec_grp = neutron_utils.get_security_group(
- neutron, rule_setting.sec_grp_name)
+ neutron, sec_grp_name=rule_setting.sec_grp_name)
setting_eth_type = create_security_group.Ethertype.IPv4
if rule_setting.ethertype:
neutron.delete_security_group(sec_grp.id)
-def get_security_group(neutron, name, tenant_id=None):
+def get_security_group(neutron, sec_grp_settings=None, sec_grp_name=None,
+ project_id=None):
"""
- Returns the first security group object of the given name else None
+ Returns the first security group for a given query. The query gets built
+ from the sec_grp_settings parameter if not None, else only the name of
+ the security group will be used, else if the query parameters are None then
+ None will be returned
:param neutron: the client
- :param name: the name of security group object to retrieve
+ :param sec_grp_settings: an instance of SecurityGroupSettings config object
+ :param sec_grp_name: the name of security group object to retrieve
+ :param project_id: the ID of the project/tentant object that owns the
+ secuity group to retrieve
:return: a SNAPS-OO SecurityGroup domain object or None if not found
"""
- logger.info('Retrieving security group with name - ' + name)
- filter = {'name': name}
- if tenant_id:
- filter['tenant_id'] = tenant_id
- groups = neutron.list_security_groups(**filter)
+ sec_grp_filter = dict()
+ if project_id:
+ sec_grp_filter['tenant_id'] = project_id
+
+ if sec_grp_settings:
+ sec_grp_filter['name'] = sec_grp_settings.name
+
+ if sec_grp_settings.description:
+ sec_grp_filter['description'] = sec_grp_settings.description
+ elif sec_grp_name:
+ sec_grp_filter['name'] = sec_grp_name
+ else:
+ return None
+
+ groups = neutron.list_security_groups(**sec_grp_filter)
for group in groups['security_groups']:
- if group['name'] == name:
- return SecurityGroup(**group)
- return None
+ return SecurityGroup(**group)
def get_security_group_by_id(neutron, sec_grp_id):
if self.project:
neutron = neutron_utils.neutron_client(self.os_creds)
default_sec_grp = neutron_utils.get_security_group(
- neutron, 'default',
- tenant_id=self.project.id)
+ neutron, sec_grp_name='default',
+ project_id=self.project.id)
if default_sec_grp:
try:
neutron_utils.delete_security_group(
self.assertTrue(sec_grp_settings.name, security_group.name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertTrue(validation_utils.objects_equivalent(
security_group, sec_grp_get))
neutron_utils.delete_security_group(self.neutron, security_group)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNone(sec_grp_get)
def test_create_sec_grp_no_name(self):
self.assertTrue(sec_grp_settings.name, self.security_groups[0].name)
self.assertEqual(sec_grp_settings.name, self.security_groups[0].name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(self.security_groups[0], sec_grp_get)
# Refresh object so it is populated with the newly added rule
security_group = neutron_utils.get_security_group(
- self.neutron, sec_grp_settings.name)
+ self.neutron, sec_grp_settings=sec_grp_settings)
rules = neutron_utils.get_rules_by_security_group(self.neutron,
security_group)
self.assertTrue(sec_grp_settings.name, security_group.name)
- sec_grp_get = neutron_utils.get_security_group(self.neutron,
- sec_grp_settings.name)
+ sec_grp_get = neutron_utils.get_security_group(
+ self.neutron, sec_grp_settings=sec_grp_settings)
self.assertIsNotNone(sec_grp_get)
self.assertEqual(security_group, sec_grp_get)