self.__logger.info("Cloudify Manager is up and running")
return 0
+ def put_private_key(self):
+ """Put private keypair in manager"""
+ self.__logger.info("Put private keypair in manager")
+ scpc = scp.SCPClient(self.ssh.get_transport())
+ scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem')
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo docker cp ~/cloudify_ims.pem "
+ "cfy_manager_local:/etc/cloudify/ && "
+ "sudo docker exec cfy_manager_local "
+ "chmod 444 /etc/cloudify/cloudify_ims.pem")
+ self.__logger.debug("output:\n%s", stdout.read())
+ self.__logger.debug("error:\n%s", stderr.read())
+
+ def upload_cfy_plugins(self, yaml, wgn):
+ """Upload Cloudify plugins"""
+ (_, stdout, stderr) = self.ssh.exec_command(
+ "sudo docker exec cfy_manager_local "
+ "cfy plugins upload -y {} {} && "
+ "sudo docker exec cfy_manager_local cfy status".format(yaml, wgn))
+ self.__logger.debug("output:\n%s", stdout.read())
+ self.__logger.debug("error:\n%s", stderr.read())
+
+ def kill_existing_execution(self, dep_name):
+ """kill existing execution"""
+ try:
+ self.__logger.info('Deleting the current deployment')
+ exec_list = self.cfy_client.executions.list()
+ for execution in exec_list:
+ if execution['status'] == "started":
+ try:
+ self.cfy_client.executions.cancel(
+ execution['id'], force=True)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.warn("Can't cancel the current exec")
+ execution = self.cfy_client.executions.start(
+ dep_name, 'uninstall', parameters=dict(ignore_failure=True))
+ wait_for_execution(self.cfy_client, execution, self.__logger)
+ self.cfy_client.deployments.delete(dep_name)
+ time.sleep(10)
+ self.cfy_client.blueprints.delete(dep_name)
+ except Exception: # pylint: disable=broad-except
+ self.__logger.exception("Some issue during the undeployment ..")
+
def wait_for_execution(client, execution, logger, timeout=3600, ):
"""Wait for a workflow execution on Cloudify Manager."""
cloud_config=osconfig.get_one_cloud())
self.__logger.debug("new cloud %s", self.cloud.auth)
+ def get_environ(self):
+ "Get new environ"
+ environ = dict(
+ os.environ,
+ OS_USERNAME=self.user.name,
+ OS_PROJECT_NAME=self.project.name,
+ OS_PROJECT_ID=self.project.id,
+ OS_PASSWORD=self.password)
+ try:
+ del environ['OS_TENANT_NAME']
+ del environ['OS_TENANT_ID']
+ except Exception: # pylint: disable=broad-except
+ pass
+ return environ
+
def clean(self):
"""Remove projects/users"""
try:
try:
assert super(RallyBase, self).run(
**kwargs) == testcase.TestCase.EX_OK
- environ = dict(
- os.environ,
- OS_USERNAME=self.project.user.name,
- OS_PROJECT_NAME=self.project.project.name,
- OS_PROJECT_ID=self.project.project.id,
- OS_PASSWORD=self.project.password)
- try:
- del environ['OS_TENANT_NAME']
- del environ['OS_TENANT_ID']
- except Exception: # pylint: disable=broad-except
- pass
- self.create_rally_deployment(environ=environ)
+ self.create_rally_deployment(environ=self.project.get_environ())
self.prepare_run(**kwargs)
self.run_tests(**kwargs)
self._generate_report()
"""
if not os.path.exists(self.res_dir):
os.makedirs(self.res_dir)
- environ = dict(
- os.environ,
- OS_USERNAME=self.project.user.name,
- OS_PROJECT_NAME=self.project.project.name,
- OS_PROJECT_ID=self.project.project.id,
- OS_PASSWORD=self.project.password)
- try:
- del environ['OS_TENANT_NAME']
- del environ['OS_TENANT_ID']
- except Exception: # pylint: disable=broad-except
- pass
self.deployment_id = rally.RallyBase.create_rally_deployment(
- environ=environ)
+ environ=self.project.get_environ())
if not self.deployment_id:
raise Exception("Deployment create failed")
self.verifier_id = self.create_verifier()
vims_test_result["skipped"] = int(grp.group(3))
vims_test_result['passed'] = (
int(grp.group(2)) - int(grp.group(3)) - int(grp.group(1)))
+ if vims_test_result['total'] - vims_test_result['skipped'] > 0:
+ vnf_test_rate = vims_test_result['passed'] / (
+ vims_test_result['total'] - vims_test_result['skipped'])
+ else:
+ vnf_test_rate = 0
except Exception: # pylint: disable=broad-except
self.logger.exception("Cannot parse live tests results")
- return None
- return vims_test_result
+ return None, 0
+ return vims_test_result, vnf_test_rate
import time
import pkg_resources
-import scp
import six
from functest.core import cloudify
duration = time.time() - start_time
- self.__logger.info("Put private keypair in manager")
- scpc = scp.SCPClient(self.ssh.get_transport())
- scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem')
- (_, stdout, stderr) = self.ssh.exec_command(
- "sudo docker exec cfy_manager_local "
- "cfy plugins upload -y {} {} && "
- "sudo docker cp ~/cloudify_ims.pem "
- "cfy_manager_local:/etc/cloudify/ && "
- "sudo docker exec cfy_manager_local "
- "chmod 444 /etc/cloudify/cloudify_ims.pem && "
- "sudo docker exec cfy_manager_local cfy status".format(
- self.cop_yaml, self.cop_wgn))
- self.__logger.info("output:\n%s", stdout.read())
- self.__logger.info("error:\n%s", stderr.read())
+ self.put_private_key()
+ self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
self.details['orchestrator'].update(status='PASS', duration=duration)
def test_vnf(self):
"""Run test on clearwater ims instance."""
start_time = time.time()
-
dns_ip = self.cfy_client.deployments.outputs.get(
self.vnf['descriptor'].get('name'))['outputs']['dns_ip']
-
if not dns_ip:
return False
-
- short_result = self.clearwater.run_clearwater_live_test(
- dns_ip=dns_ip,
- public_domain=self.vnf['inputs']["public_domain"])
+ short_result, vnf_test_rate = self.clearwater.run_clearwater_live_test(
+ dns_ip=dns_ip, public_domain=self.vnf['inputs']["public_domain"])
duration = time.time() - start_time
self.__logger.info(short_result)
- self.details['test_vnf'].update(result=short_result,
- duration=duration)
- try:
- vnf_test_rate = short_result['passed'] / (
- short_result['total'] - short_result['skipped'])
- # orchestrator + vnf + test_vnf
- self.result += vnf_test_rate / 3 * 100
- except ZeroDivisionError:
- self.__logger.error("No test has been executed")
+ self.details['test_vnf'].update(result=short_result, duration=duration)
+ self.result += vnf_test_rate / 3 * 100
+ if vnf_test_rate == 0:
self.details['test_vnf'].update(status='FAIL')
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot calculate results")
- self.details['test_vnf'].update(status='FAIL')
- return False
return True if vnf_test_rate > 0 else False
def clean(self):
"""Clean created objects/functions."""
- try:
- dep_name = self.vnf['descriptor'].get('name')
- # kill existing execution
- self.__logger.info('Deleting the current deployment')
- exec_list = self.cfy_client.executions.list()
- for execution in exec_list:
- if execution['status'] == "started":
- try:
- self.cfy_client.executions.cancel(
- execution['id'], force=True)
- except Exception: # pylint: disable=broad-except
- self.__logger.warn("Can't cancel the current exec")
-
- execution = self.cfy_client.executions.start(
- dep_name,
- 'uninstall',
- parameters=dict(ignore_failure=True),
- force=True)
-
- cloudify.wait_for_execution(
- self.cfy_client, execution, self.__logger)
- self.cfy_client.deployments.delete(
- self.vnf['descriptor'].get('name'))
- time.sleep(10)
- self.cfy_client.blueprints.delete(
- self.vnf['descriptor'].get('name'))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue during the undeployment ..")
+ self.kill_existing_execution(self.vnf['descriptor'].get('name'))
if self.image_alt:
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
def test_vnf(self):
"""Run test on clearwater ims instance."""
start_time = time.time()
-
outputs = self.cloud.get_stack(self.stack.id).outputs
self.__logger.debug("stack outputs: %s", outputs)
dns_ip = re.findall(r'[0-9]+(?:\.[0-9]+){3}', str(outputs))[0]
-
if not dns_ip:
return False
-
- short_result = self.clearwater.run_clearwater_live_test(
- dns_ip=dns_ip,
- public_domain=self.vnf['parameters']["zone"])
+ short_result, vnf_test_rate = self.clearwater.run_clearwater_live_test(
+ dns_ip=dns_ip, public_domain=self.vnf['parameters']["zone"])
duration = time.time() - start_time
self.__logger.info(short_result)
- self.details['test_vnf'] = dict(result=short_result,
- duration=duration)
- try:
- vnf_test_rate = short_result['passed'] / (
- short_result['total'] - short_result['skipped'])
- # orchestrator + vnf + test_vnf
- self.result += vnf_test_rate / 3 * 100
- except ZeroDivisionError:
- self.__logger.error("No test has been executed")
+ self.details['test_vnf'] = dict(result=short_result, duration=duration)
+ self.result += vnf_test_rate / 3 * 100
+ if vnf_test_rate == 0:
self.details['test_vnf'].update(status='FAIL')
- return False
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Cannot calculate results")
- self.details['test_vnf'].update(status='FAIL')
- return False
self._monit()
return True if vnf_test_rate > 0 else False
import time
import pkg_resources
-import scp
from functest.core import cloudify
from functest.opnfv_tests.vnf.router import vrouter_base
# network creation
super(CloudifyVrouter, self).execute()
start_time = time.time()
- self.__logger.info("Put private keypair in manager")
- scpc = scp.SCPClient(self.ssh.get_transport())
- scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem')
- (_, stdout, stderr) = self.ssh.exec_command(
- "sudo docker exec cfy_manager_local "
- "cfy plugins upload -y {} {} && "
- "sudo docker cp ~/cloudify_ims.pem "
- "cfy_manager_local:/etc/cloudify/ && "
- "sudo docker exec cfy_manager_local "
- "chmod 444 /etc/cloudify/cloudify_ims.pem && "
- "sudo docker exec cfy_manager_local cfy status".format(
- self.cop_yaml, self.cop_wgn))
- self.__logger.info("output:\n%s", stdout.read())
- self.__logger.info("error:\n%s", stderr.read())
+ self.put_private_key()
+ self.upload_cfy_plugins(self.cop_yaml, self.cop_wgn)
self.image_alt = self.publish_image_alt()
self.flavor_alt = self.create_flavor_alt()
return True
def clean(self):
- try:
- dep_name = self.vnf['descriptor'].get('name')
- # kill existing execution
- self.__logger.info('Deleting the current deployment')
- exec_list = self.cfy_client.executions.list()
- for execution in exec_list:
- if execution['status'] == "started":
- try:
- self.cfy_client.executions.cancel(
- execution['id'], force=True)
- except Exception: # pylint: disable=broad-except
- self.__logger.warn("Can't cancel the current exec")
-
- execution = self.cfy_client.executions.start(
- dep_name, 'uninstall', parameters=dict(ignore_failure=True))
-
- cloudify.wait_for_execution(
- self.cfy_client, execution, self.__logger)
- self.cfy_client.deployments.delete(
- self.vnf['descriptor'].get('name'))
- time.sleep(10)
- self.cfy_client.blueprints.delete(
- self.vnf['descriptor'].get('name'))
- except Exception: # pylint: disable=broad-except
- self.__logger.exception("Some issue during the undeployment ..")
+ self.kill_existing_execution(self.vnf['descriptor'].get('name'))
if self.image_alt:
self.cloud.delete_image(self.image_alt)
if self.flavor_alt:
[testenv:pylint]
basepython = python2.7
-whitelist_externals = bash
-modules =
- functest.opnfv_tests.openstack.cinder
- functest.opnfv_tests.openstack.rally
- functest.opnfv_tests.openstack.refstack
- functest.opnfv_tests.openstack.tempest
- functest.opnfv_tests.openstack.vmtp
- functest.opnfv_tests.openstack.vping
- functest.opnfv_tests.sdn.odl
- functest.opnfv_tests.vnf.router
- functest.tests.unit.odl
- functest.tests.unit.openstack.rally
- functest.tests.unit.openstack.tempest
- functest.tests.unit.openstack.vmtp
- functest.tests.unit.openstack.vping
- functest.tests.unit.vnf.router
- functest.tests.unit.utils
- functest.utils.config
- functest.utils.constants
- functest.utils.env
- functest.utils.functest_utils
commands =
- bash -c "\
- pylint -f parseable \
- --ignore-imports=y \
- --disable=locally-disabled functest | tee pylint.out"
- pylint --reports=n --errors-only functest
- pylint --disable=locally-disabled \
- --disable=duplicate-code \
- --ignore-imports=y --reports=n {[testenv:pylint]modules}
+ pylint \
+ --ignore-imports=y --min-similarity-lines=10 \
+ --disable=locally-disabled functest
[testenv:yamllint]
basepython = python2.7