from functest.core import tenantnetwork
from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
class VmReady1(tenantnetwork.TenantNetwork1):
if "case_name" not in kwargs:
kwargs["case_name"] = 'vmready1'
super(VmReady1, self).__init__(**kwargs)
- self.orig_cloud = self.cloud
self.image = None
self.flavor = None
"""
assert self.cloud
extra_properties = self.extra_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
extra_properties.update(
getattr(config.CONF, '{}_extra_properties'.format(
self.case_name), {}))
"""
assert self.cloud
extra_alt_properties = self.extra_alt_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_alt_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
extra_alt_properties.update(
getattr(config.CONF, '{}_extra_alt_properties'.format(
self.case_name), {}))
self.flavor_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_extra_specs = self.flavor_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
flavor_extra_specs.update(
getattr(config.CONF,
'{}_flavor_extra_specs'.format(self.case_name), {}))
self.flavor_alt_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_alt_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
flavor_alt_extra_specs.update(
getattr(config.CONF,
'{}_flavor_alt_extra_specs'.format(self.case_name), {}))
console = self.cloud.get_server_console(name)
self.__logger.debug("console: \n%s", console)
if re.search(regex, console):
- self.__logger.debug("regex found: ''%s' in console", regex)
- return True
- else:
self.__logger.debug(
- "try %s: cannot find regex '%s' in console",
- iloop + 1, regex)
- time.sleep(10)
+ "regex found: '%s' in console\n%s", regex, console)
+ return True
+ self.__logger.debug(
+ "try %s: cannot find regex '%s' in console\n%s",
+ iloop + 1, regex, console)
+ time.sleep(10)
self.__logger.error("cannot find regex '%s' in console", regex)
return False
+ def clean_orphan_security_groups(self):
+ """Clean all security groups which are not owned by an existing tenant
+
+ It lists all orphan security groups in use as debug to avoid
+ misunderstanding the testcase results (it could happen if cloud admin
+ removes accounts without cleaning the virtual machines)
+ """
+ sec_groups = self.orig_cloud.list_security_groups()
+ for sec_group in sec_groups:
+ if not sec_group.tenant_id:
+ continue
+ if not self.orig_cloud.get_project(sec_group.tenant_id):
+ self.__logger.debug("Cleaning security group %s", sec_group.id)
+ try:
+ self.orig_cloud.delete_security_group(sec_group.id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.debug(
+ "Orphan security group %s in use", sec_group.id)
+
+ def count_hypervisors(self):
+ """Count hypervisors."""
+ if env.get('SKIP_DOWN_HYPERVISORS').lower() == 'false':
+ return len(self.orig_cloud.list_hypervisors())
+ return self.count_active_hypervisors()
+
+ def count_active_hypervisors(self):
+ """Count all hypervisors which are up."""
+ compute_cnt = 0
+ for hypervisor in self.orig_cloud.list_hypervisors():
+ if hypervisor['state'] == 'up':
+ compute_cnt += 1
+ else:
+ self.__logger.warning(
+ "%s is down", hypervisor['hypervisor_hostname'])
+ return compute_cnt
+
def run(self, **kwargs):
"""Boot the new VM
self.cloud.delete_image(self.image.id)
if self.flavor:
self.orig_cloud.delete_flavor(self.flavor.id)
+ if env.get('CLEAN_ORPHAN_SECURITY_GROUPS').lower() == 'true':
+ self.clean_orphan_security_groups()
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean all resources")
self.keypair = self.cloud.create_keypair(
'{}-kp_{}'.format(self.case_name, self.guid))
self.__logger.debug("keypair: %s", self.keypair)
- self.__logger.debug("private_key: %s", self.keypair.private_key)
+ self.__logger.debug("private_key:\n%s", self.keypair.private_key)
with open(self.key_filename, 'w') as private_key_file:
private_key_file.write(self.keypair.private_key)
self.sec = self.cloud.create_security_group(