# Copyright (c) 2017 Okinawa Open Laboratory and others.
# All rights reserved. This program and the accompanying materials
# are made available under the terms of the Apache License, Version 2.0
# which accompanies this distribution, and is available at
# http://www.apache.org/licenses/LICENSE-2.0
# pylint: disable=missing-docstring
"""vrouter testcase implementation."""
21 from functest.core import cloudify
22 from functest.opnfv_tests.vnf.router import vrouter_base
23 from functest.opnfv_tests.vnf.router.utilvnf import Utilvnf
24 from functest.utils import config
25 from functest.utils import functest_utils
# Module author metadata (stray numeric prefix removed so the assignment
# parses as Python).
__author__ = "Shuya Nakama <shuya.nakama@okinawaopenlabs.org>"
class CloudifyVrouter(cloudify.Cloudify):
    # pylint: disable=too-many-instance-attributes
    """vrouter testcase deployed with Cloudify Orchestrator.

    The case runs in three steps: the parent class brings up the Cloudify
    manager (``execute``), the vRouter blueprint is uploaded and installed
    (``deploy_vnf``), and the on-boarding test suite is run against it
    (``test_vnf``).  ``clean`` undoes the deployment.

    NOTE(review): this class was restored from a listing that carried stray
    line-number prefixes, no indentation and several omitted lines.  Every
    statement that had to be re-created is flagged with a NOTE(review)
    comment and should be confirmed against the upstream file.
    """

    __logger = logging.getLogger(__name__)

    # Local path of the VyOS image.
    # NOTE(review): presumably consumed by the parent's publish_image_alt();
    # further class attributes may have followed here in the omitted lines.
    filename_alt = '/home/opnfv/functest/images/vyos-1.1.7.img'

    def __init__(self, **kwargs):
        """Read the VNF configuration and initialise the result details.

        Args:
            **kwargs: forwarded to the parent constructor; ``case_name``
                defaults to ``"vyos_vrouter"`` when not supplied.

        Raises:
            Exception: when the ``vnf_<case_name>_config`` option cannot be
                read from ``config.CONF``.
        """
        kwargs.setdefault("case_name", "vyos_vrouter")
        super(CloudifyVrouter, self).__init__(**kwargs)

        # Retrieve the configuration
        # NOTE(review): the guard around this lookup was missing from the
        # listing (the raise was unconditional); try/except restored.
        try:
            self.config = getattr(
                config.CONF, 'vnf_{}_config'.format(self.case_name))
        except Exception:  # pylint: disable=broad-except
            raise Exception("VNF config file not found")

        self.case_dir = pkg_resources.resource_filename(
            'functest', 'opnfv_tests/vnf/router')
        config_file = os.path.join(self.case_dir, self.config)

        def read_param(key):
            """Return the value stored under *key* in the VNF config file."""
            return functest_utils.get_parameter_from_yaml(key, config_file)

        self.orchestrator = dict(
            requirements=read_param("orchestrator.requirements"),
        )
        # NOTE(review): three source lines after ``version`` were missing
        # from the listing; extra keys may have been set here - confirm.
        self.details['orchestrator'] = dict(
            name=read_param("orchestrator.name"),
            version=read_param("orchestrator.version"),
        )
        self.__logger.debug("Orchestrator configuration %s", self.orchestrator)
        self.__logger.debug("name = %s", __name__)

        # NOTE(review): the ``self.vnf = dict(`` opener was missing; it is
        # implied by the later self.vnf['descriptor'] / ['inputs'] accesses.
        self.vnf = dict(
            descriptor=read_param("vnf.descriptor"),
            inputs=read_param("vnf.inputs"),
            requirements=read_param("vnf.requirements"),
        )
        self.details['vnf'] = dict(
            descriptor_version=self.vnf['descriptor']['version'],
            name=read_param("vnf.name"),
            version=read_param("vnf.version"),
        )
        self.__logger.debug("VNF configuration: %s", self.vnf)

        # NOTE(review): the assignment of self.util was missing; Utilvnf is
        # imported by this module and assumed to take no arguments.
        self.util = Utilvnf()
        self.util.set_credentials(self.cloud)
        credentials = {"cloud": self.cloud}
        self.util_info = {"credentials": credentials,
                          "vnf_data_dir": self.util.vnf_data_dir}

        self.details['test_vnf'] = dict(
            name=read_param("vnf_test_suite.name"),
            version=read_param("vnf_test_suite.version"),
        )
        self.images = read_param("tenant_images")
        self.__logger.info("Images needed for vrouter: %s", self.images)

        # Populated by execute(); checked by clean() before deletion.
        self.image_alt = None
        self.flavor_alt = None

    def execute(self):
        # pylint: disable=too-many-locals,too-many-statements
        """
        Deploy Cloudify Manager.
        network, security group, fip, VM creation

        After the parent class has run, the private key is copied to the
        manager, the alternate image and flavor are created, the blueprint
        inputs are completed and the VNF is deployed and tested.

        Returns:
            0 when both deploy_vnf() and test_vnf() succeed, 1 otherwise.
        """
        # NOTE(review): the ``def`` line and both ``return`` statements of
        # this method were missing from the listing; the name follows from
        # the super() call and the return codes are assumed - confirm.
        super(CloudifyVrouter, self).execute()
        start_time = time.time()
        self.__logger.info("Put private keypair in manager")
        scpc = scp.SCPClient(self.ssh.get_transport())
        scpc.put(self.key_filename, remote_path='~/cloudify_ims.pem')
        # NOTE(review): the listing's last command fragment ended with a
        # dangling "&&", so a final command may have been omitted.  Only the
        # commands that were visible are chained here.
        command = " && ".join([
            "sudo cp ~/cloudify_ims.pem /etc/cloudify/",
            "sudo chmod 444 /etc/cloudify/cloudify_ims.pem",
            "sudo yum install -y gcc python-devel python-cmd2",
        ])
        (_, stdout, stderr) = self.ssh.exec_command(command)
        self.__logger.info("output:\n%s", stdout.read())
        self.__logger.info("error:\n%s", stderr.read())

        self.image_alt = self.publish_image_alt()
        self.flavor_alt = self.create_flavor_alt()

        duration = time.time() - start_time
        self.details['orchestrator'].update(status='PASS', duration=duration)

        # Complete the blueprint inputs with the resources created above and
        # the Keystone settings of the test project.
        self.vnf['inputs'].update(dict(
            external_network_name=self.ext_net.name,
            target_vnf_image_id=self.image_alt.id,
            reference_vnf_image_id=self.image_alt.id,
            target_vnf_flavor_id=self.flavor_alt.id,
            reference_vnf_flavor_id=self.flavor_alt.id,
            keystone_username=self.project.user.name,
            keystone_password=self.project.password,
            keystone_tenant_name=self.project.project.name,
            keystone_user_domain_name=os.environ.get(
                'OS_USER_DOMAIN_NAME', 'Default'),
            keystone_project_domain_name=os.environ.get(
                'OS_PROJECT_DOMAIN_NAME', 'Default'),
            region=os.environ.get('OS_REGION_NAME', 'RegionOne'),
            keystone_url=self.get_public_auth_url(self.orig_cloud),
        ))

        if self.deploy_vnf() and self.test_vnf():
            self.result = 100
            return 0
        # Only the orchestrator step is known to have passed at this point.
        # The float literal keeps the value at one third even where ``/`` is
        # integer division (Python 2).
        self.result = 1.0 / 3 * 100
        return 1

    def deploy_vnf(self):
        """Upload the blueprint, create the deployment and install it.

        Returns:
            True when the install execution ends in status ``terminated``,
            False otherwise.  The outcome and duration are also recorded in
            ``self.details['vnf']``.
        """
        start_time = time.time()
        self.__logger.info("Upload VNFD")
        descriptor = self.vnf['descriptor']
        name = descriptor.get('name')
        self.util_info["cfy"] = self.cfy_client
        self.util_info["cfy_manager_ip"] = self.fip.floating_ip_address
        self.util_info["deployment_name"] = name

        self.cfy_client.blueprints.upload(descriptor.get('file_name'), name)

        self.__logger.info("Create VNF Instance")
        self.cfy_client.deployments.create(
            name, name, self.vnf.get('inputs'))

        # Wait for the execution reported for the new deployment before
        # starting the install workflow.
        cloudify.wait_for_execution(
            self.cfy_client,
            cloudify.get_execution_id(self.cfy_client, name),
            self.__logger, timeout=7200)

        self.__logger.info("Start the VNF Instance deployment")
        execution = self.cfy_client.executions.start(name, 'install')
        execution = cloudify.wait_for_execution(
            self.cfy_client, execution, self.__logger)

        duration = time.time() - start_time

        self.__logger.info(execution)
        # NOTE(review): the else branch and the return statements were
        # missing from the listing; a boolean result is implied by the
        # ``if self.deploy_vnf() and ...`` check in execute().
        if execution.status == 'terminated':
            self.details['vnf'].update(status='PASS', duration=duration)
            return True
        self.details['vnf'].update(status='FAIL', duration=duration)
        return False

    def test_vnf(self):
        """Run the vRouter on-boarding tests and record their outcome.

        Returns:
            True when the test suite reports success, False otherwise.  The
            full result data and duration are stored in
            ``self.details['test_vnf']``.
        """
        # NOTE(review): the ``def`` line, the if/else, the ``duration``
        # keyword arguments and the return were missing from the listing.
        # Returning the pass/fail outcome is an assumption - the original may
        # have returned a constant; confirm against upstream.
        start_time = time.time()
        testing = vrouter_base.VrouterOnBoardingBase(
            self.case_name, self.util, self.util_info)
        result, test_result_data = testing.test_vnf()
        duration = time.time() - start_time
        if result:
            self.details['test_vnf'].update(
                status='PASS', result='OK', full_result=test_result_data,
                duration=duration)
            return True
        self.details['test_vnf'].update(
            status='FAIL', result='NG', full_result=test_result_data,
            duration=duration)
        return False

    def clean(self):
        """Undeploy the VNF and delete the resources created by execute().

        Failures while undeploying are logged and do not prevent the image,
        flavor and parent-class cleanup from running.
        """
        # NOTE(review): the ``def`` line, both ``try:`` lines and the two
        # ``if`` guards were missing from the listing.  The outer try is
        # placed around the whole undeployment; its exact start is assumed.
        try:
            dep_name = self.vnf['descriptor'].get('name')
            # kill existing execution
            self.__logger.info('Deleting the current deployment')
            exec_list = self.cfy_client.executions.list()
            for execution in exec_list:
                if execution['status'] == "started":
                    try:
                        self.cfy_client.executions.cancel(
                            execution['id'], force=True)
                    except Exception:  # pylint: disable=broad-except
                        # Logger.warn is a deprecated alias of warning.
                        self.__logger.warning(
                            "Can't cancel the current exec")

            execution = self.cfy_client.executions.start(
                dep_name, 'uninstall', parameters=dict(ignore_failure=True))

            cloudify.wait_for_execution(
                self.cfy_client, execution, self.__logger)
            self.cfy_client.deployments.delete(dep_name)
            self.cfy_client.blueprints.delete(dep_name)
        except Exception:  # pylint: disable=broad-except
            self.__logger.exception("Some issue during the undeployment ..")
        # Both attributes stay None when execute() never got that far.
        if self.image_alt:
            self.cloud.delete_image(self.image_alt)
        if self.flavor_alt:
            self.orig_cloud.delete_flavor(self.flavor_alt.id)
        super(CloudifyVrouter, self).clean()