from functest.core import tenantnetwork
from functest.utils import config
+from functest.utils import env
+from functest.utils import functest_utils
class VmReady1(tenantnetwork.TenantNetwork1):
# pylint: disable=too-many-instance-attributes
__logger = logging.getLogger(__name__)
- filename = '/home/opnfv/functest/images/cirros-0.4.0-x86_64-disk.img'
+ filename = '/home/opnfv/functest/images/cirros-0.5.1-x86_64-disk.img'
image_format = 'qcow2'
extra_properties = {}
filename_alt = filename
"""
assert self.cloud
extra_properties = self.extra_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
extra_properties.update(
getattr(config.CONF, '{}_extra_properties'.format(
self.case_name), {}))
"""
assert self.cloud
extra_alt_properties = self.extra_alt_properties.copy()
+ if env.get('IMAGE_PROPERTIES'):
+ extra_alt_properties.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('IMAGE_PROPERTIES')))
extra_alt_properties.update(
getattr(config.CONF, '{}_extra_alt_properties'.format(
self.case_name), {}))
self.flavor_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_extra_specs = self.flavor_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
flavor_extra_specs.update(
getattr(config.CONF,
'{}_flavor_extra_specs'.format(self.case_name), {}))
self.flavor_alt_disk))
self.__logger.debug("flavor: %s", flavor)
flavor_alt_extra_specs = self.flavor_alt_extra_specs.copy()
+ if env.get('FLAVOR_EXTRA_SPECS'):
+ flavor_alt_extra_specs.update(
+ functest_utils.convert_ini_to_dict(
+ env.get('FLAVOR_EXTRA_SPECS')))
flavor_alt_extra_specs.update(
getattr(config.CONF,
'{}_flavor_alt_extra_specs'.format(self.case_name), {}))
vm1 = self.cloud.create_server(
name if name else '{}-vm_{}'.format(self.case_name, self.guid),
image=self.image.id, flavor=self.flavor.id,
- auto_ip=False, network=self.network.id,
+ auto_ip=False,
+ network=self.network.id if self.network else env.get(
+ "EXTERNAL_NETWORK"),
timeout=self.create_server_timeout, wait=True, **kwargs)
self.__logger.debug("vm: %s", vm1)
return vm1
- def check_regex_in_console(self, name, regex=' login: ', loop=1):
+ def check_regex_in_console(self, name, regex=' login: ', loop=6):
"""Wait for specific message in console
Returns: True or False on errors
console = self.cloud.get_server_console(name)
self.__logger.debug("console: \n%s", console)
if re.search(regex, console):
- self.__logger.debug("regex found: ''%s' in console", regex)
- return True
- else:
self.__logger.debug(
- "try %s: cannot find regex '%s' in console",
- iloop + 1, regex)
- time.sleep(10)
+ "regex found: '%s' in console\n%s", regex, console)
+ return True
+ self.__logger.debug(
+ "try %s: cannot find regex '%s' in console\n%s",
+ iloop + 1, regex, console)
+ time.sleep(10)
self.__logger.error("cannot find regex '%s' in console", regex)
return False
+ def clean_orphan_security_groups(self):
+ """Clean all security groups which are not owned by an existing tenant
+
+ It lists all orphan security groups in use as debug to avoid
+ misunderstanding the testcase results (it could happen if cloud admin
+ removes accounts without cleaning the virtual machines)
+ """
+ sec_groups = self.orig_cloud.list_security_groups()
+ for sec_group in sec_groups:
+ if not sec_group.tenant_id:
+ continue
+ if not self.orig_cloud.get_project(sec_group.tenant_id):
+ self.__logger.debug("Cleaning security group %s", sec_group.id)
+ try:
+ self.orig_cloud.delete_security_group(sec_group.id)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.debug(
+ "Orphan security group %s in use", sec_group.id)
+
+ def count_hypervisors(self):
+ """Count hypervisors."""
+ if env.get('SKIP_DOWN_HYPERVISORS').lower() == 'false':
+ return len(self.orig_cloud.list_hypervisors())
+ return self.count_active_hypervisors()
+
+ def count_active_hypervisors(self):
+ """Count all hypervisors which are up."""
+ compute_cnt = 0
+ for hypervisor in self.orig_cloud.list_hypervisors():
+ if hypervisor['state'] == 'up':
+ compute_cnt += 1
+ else:
+ self.__logger.warning(
+ "%s is down", hypervisor['hypervisor_hostname'])
+ return compute_cnt
+
def run(self, **kwargs):
"""Boot the new VM
self.cloud.delete_image(self.image.id)
if self.flavor:
self.orig_cloud.delete_flavor(self.flavor.id)
+ if env.get('CLEAN_ORPHAN_SECURITY_GROUPS').lower() == 'true':
+ self.clean_orphan_security_groups()
except Exception: # pylint: disable=broad-except
self.__logger.exception("Cannot clean all resources")
ssh_connect_timeout = 1
ssh_connect_loops = 6
create_floating_ip_timeout = 120
+ check_console_loop = 6
+ check_console_regex = ' login: '
def __init__(self, **kwargs):
if "case_name" not in kwargs:
self.keypair = self.cloud.create_keypair(
'{}-kp_{}'.format(self.case_name, self.guid))
self.__logger.debug("keypair: %s", self.keypair)
- self.__logger.debug("private_key: %s", self.keypair.private_key)
+ self.__logger.debug("private_key:\n%s", self.keypair.private_key)
with open(self.key_filename, 'w') as private_key_file:
private_key_file.write(self.keypair.private_key)
self.sec = self.cloud.create_security_group(
- None on error
"""
assert vm1
- fip = self.cloud.create_floating_ip(
- network=self.ext_net.id, server=vm1, wait=True,
- timeout=self.create_floating_ip_timeout)
- self.__logger.debug("floating_ip: %s", fip)
+ fip = None
+ if env.get('NO_TENANT_NETWORK').lower() != 'true':
+ fip = self.cloud.create_floating_ip(
+ network=self.ext_net.id, server=vm1, wait=True,
+ timeout=self.create_floating_ip_timeout)
+ self.__logger.debug("floating_ip: %s", fip)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.client.AutoAddPolicy())
for loop in range(self.ssh_connect_loops):
p_console = self.cloud.get_server_console(vm1)
self.__logger.debug("vm console: \n%s", p_console)
ssh.connect(
- fip.floating_ip_address,
+ fip.floating_ip_address if fip else vm1.public_v4,
username=getattr(
config.CONF,
'{}_image_user'.format(self.case_name), self.username),
except Exception as exc: # pylint: disable=broad-except
self.__logger.debug(
"try %s: cannot connect to %s: %s", loop + 1,
- fip.floating_ip_address, exc)
+ fip.floating_ip_address if fip else vm1.public_v4, exc)
time.sleep(9)
else:
self.__logger.error(
Returns: echo exit codes
"""
(_, stdout, stderr) = self.ssh.exec_command('echo Hello World')
- self.__logger.debug("output:\n%s", stdout.read())
- self.__logger.debug("error:\n%s", stderr.read())
+ self.__logger.debug("output:\n%s", stdout.read().decode("utf-8"))
+ self.__logger.debug("error:\n%s", stderr.read().decode("utf-8"))
return stdout.channel.recv_exit_status()
def run(self, **kwargs):
self.prepare()
self.sshvm = self.boot_vm(
key_name=self.keypair.id, security_groups=[self.sec.id])
- (self.fip, self.ssh) = self.connect(self.sshvm)
- if not self.execute():
- self.result = 100
- status = testcase.TestCase.EX_OK
+ if self.check_regex_in_console(
+ self.sshvm.name, regex=self.check_console_regex,
+ loop=self.check_console_loop):
+ (self.fip, self.ssh) = self.connect(self.sshvm)
+ if not self.execute():
+ self.result = 100
+ status = testcase.TestCase.EX_OK
except Exception: # pylint: disable=broad-except
self.__logger.exception('Cannot run %s', self.case_name)
finally: