Logic to trigger Security Scan project
[functest.git] / functest / core / vnf_base.py
1 #!/usr/bin/env python
2
3 # Copyright (c) 2016 Orange and others.
4 #
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
9
10 import os
11 import time
12
13 import inspect
14
15
16 import functest.utils.functest_logger as ft_logger
17 import functest.utils.openstack_utils as os_utils
18 import functest.utils.functest_utils as ft_utils
19 import testcase_base as base
20 from functest.utils.constants import CONST
21
22
23 class VnfOnBoardingBase(base.TestcaseBase):
24
25     logger = ft_logger.Logger(__name__).getLogger()
26
27     def __init__(self, project='functest', case='', repo='', cmd=''):
28         super(VnfOnBoardingBase, self).__init__()
29         self.repo = repo
30         self.project_name = project
31         self.case_name = case
32         self.cmd = cmd
33         self.details = {}
34         self.data_dir = CONST.dir_functest_data
35         self.details['orchestrator'] = {}
36         self.details['vnf'] = {}
37         self.details['test_vnf'] = {}
38         self.images = {}
39         try:
40             self.tenant_name = CONST.__getattribute__(
41                 'vnf_{}_tenant_name'.format(self.case_name))
42             self.tenant_description = CONST.__getattribute__(
43                 'vnf_{}_tenant_description'.format(self.case_name))
44         except:
45             raise Exception("Unknown VNF case=" + self.case_name)
46
47         try:
48             self.images = CONST.__getattribute__(
49                 'vnf_{}_tenant_images'.format(self.case_name))
50         except:
51             self.logger.warn("No tenant image defined for this VNF")
52
53     def execute(self):
54         self.start_time = time.time()
55         # Prepare the test (Create Tenant, User, ...)
56         self.logger.info("Create VNF Onboarding environment")
57         self.prepare()
58
59         # Deploy orchestrator
60         try:
61             self.logger.info("Deploy orchestrator (if necessary)")
62             orchestrator_ready_time = time.time()
63             res_orchestrator = self.deploy_orchestrator()
64             # orchestrator is not mandatory
65             if res_orchestrator is not None:
66                 self.details['orchestrator']['status'] = (
67                     res_orchestrator['status'])
68                 self.details['orchestrator']['result'] = (
69                     res_orchestrator['result'])
70                 self.details['orchestrator']['duration'] = round(
71                     orchestrator_ready_time - self.start_time, 1)
72         except:
73             self.logger.warn("Problem with the Orchestrator")
74
75         # Deploy VNF
76         try:
77             self.logger.info("Deploy VNF " + self.case_name)
78             res_deploy_vnf = self.deploy_vnf()
79             vnf_ready_time = time.time()
80             self.details['vnf']['status'] = res_deploy_vnf['status']
81             self.details['vnf']['result'] = res_deploy_vnf['result']
82             self.details['vnf']['duration'] = round(
83                 vnf_ready_time - orchestrator_ready_time, 1)
84         except:
85             raise Exception("Error during VNF deployment")
86
87         # Test VNF
88         try:
89             self.logger.info("Test VNF")
90             res_test_vnf = self.test_vnf()
91             test_vnf_done_time = time.time()
92             self.details['test_vnf']['status'] = res_test_vnf['status']
93             self.details['test_vnf']['result'] = res_test_vnf['result']
94             self.details['test_vnf']['duration'] = round(
95                 test_vnf_done_time - vnf_ready_time, 1)
96         except:
97             raise Exception("Error when running VNF tests")
98
99         # Clean the system
100         self.clean()
101         self.stop_time = time.time()
102
103         exit_code = self.parse_results()
104         self.log_results()
105         return exit_code
106
107     # prepare state could consist in the creation of the resources
108     # a dedicated user
109     # a dedictaed tenant
110     # dedicated images
111     def prepare(self):
112         self.creds = os_utils.get_credentials()
113         self.keystone_client = os_utils.get_keystone_client()
114
115         self.logger.info("Prepare OpenStack plateform(create tenant and user)")
116         user_id = os_utils.get_user_id(self.keystone_client,
117                                        self.creds['username'])
118         if user_id == '':
119             self.step_failure("Failed to get id of " +
120                               self.creds['username'])
121
122         tenant_id = os_utils.create_tenant(
123             self.keystone_client, self.tenant_name, self.tenant_description)
124         if not tenant_id:
125             self.step_failure("Failed to create " +
126                               self.tenant_name + " tenant")
127
128         roles_name = ["admin", "Admin"]
129         role_id = ''
130         for role_name in roles_name:
131             if role_id == '':
132                 role_id = os_utils.get_role_id(self.keystone_client, role_name)
133
134         if role_id == '':
135             self.logger.error("Failed to get id for %s role" % role_name)
136             self.step_failure("Failed to get role id of " + role_name)
137
138         if not os_utils.add_role_user(self.keystone_client, user_id,
139                                       role_id, tenant_id):
140             self.logger.error("Failed to add %s on tenant" %
141                               self.creds['username'])
142             self.step_failure("Failed to add %s on tenant" %
143                               self.creds['username'])
144
145         user_id = os_utils.create_user(self.keystone_client,
146                                        self.tenant_name,
147                                        self.tenant_name,
148                                        None,
149                                        tenant_id)
150         if not user_id:
151             self.logger.error("Failed to create %s user" % self.tenant_name)
152             self.step_failure("Failed to create user ")
153
154         self.logger.info("Update OpenStack creds informations")
155         self.creds.update({
156             "tenant": self.tenant_name,
157         })
158         self.neutron_client = os_utils.get_neutron_client(self.creds)
159         self.nova_client = os_utils.get_nova_client(self.creds)
160         self.creds.update({
161             "username": self.tenant_name,
162             "password": self.tenant_name,
163         })
164         self.glance_client = os_utils.get_glance_client(self.creds)
165         self.logger.info("Upload some OS images if it doesn't exist")
166
167         temp_dir = os.path.join(self.data_dir, "tmp/")
168         for image_name, image_url in self.images.iteritems():
169             image_id = os_utils.get_image_id(self.glance_client, image_name)
170
171             if image_id == '':
172                 self.logger.info("""%s image doesn't exist on glance repository. Try
173                 downloading this image and upload on glance !""" % image_name)
174                 image_id = os_utils.download_and_add_image_on_glance(
175                     self.glance_client, image_name, image_url, temp_dir)
176
177             if image_id == '':
178                 self.step_failure(
179                     "Failed to find or upload required OS "
180                     "image for this deployment")
181
182         self.logger.info("Update security group quota for this tenant")
183
184         if not os_utils.update_sg_quota(self.neutron_client,
185                                         tenant_id, 50, 100):
186             self.step_failure("Failed to update security group quota" +
187                               " for tenant " + self.tenant_name)
188
189     # orchestrator is not mandatory to dpeloy and test VNF
190     def deploy_orchestrator(self, **kwargs):
191         pass
192
193     # TODO see how to use built-in exception from releng module
194     def deploy_vnf(self):
195         self.logger.error("VNF must be deployed")
196         raise Exception("VNF not deployed")
197
198     def test_vnf(self):
199         self.logger.error("VNF must be tested")
200         raise Exception("VNF not tested")
201
202     def clean(self):
203         self.logger.info("test cleaning")
204
205         self.logger.info("Removing %s tenant .." % self.tenant_name)
206         tenant_id = os_utils.get_tenant_id(self.keystone_client,
207                                            self.tenant_name)
208         if tenant_id == '':
209             self.logger.error("Error : Failed to get id of %s tenant" %
210                               self.tenant_name)
211         else:
212             if not os_utils.delete_tenant(self.keystone_client, tenant_id):
213                 self.logger.error("Error : Failed to remove %s tenant" %
214                                   self.tenant_name)
215
216         self.logger.info("Removing %s user .." % self.tenant_name)
217         user_id = os_utils.get_user_id(
218             self.keystone_client, self.tenant_name)
219         if user_id == '':
220             self.logger.error("Error : Failed to get id of %s user" %
221                               self.tenant_name)
222         else:
223             if not os_utils.delete_user(self.keystone_client, user_id):
224                 self.logger.error("Error : Failed to remove %s user" %
225                                   self.tenant_name)
226
227     def parse_results(self):
228         exit_code = self.EX_OK
229         self.criteria = "PASS"
230         self.logger.info(self.details)
231         # The 2 VNF steps must be OK to get a PASS result
232         if (self.details['vnf']['status'] is not "PASS" or
233                 self.details['test_vnf']['status'] is not "PASS"):
234             exit_code = self.EX_RUN_ERROR
235             self.criteria = "FAIL"
236         return exit_code
237
238     def log_results(self):
239         ft_utils.logger_test_results(self.project_name,
240                                      self.case_name,
241                                      self.criteria,
242                                      self.details)
243
244     def step_failure(self, error_msg):
245         part = inspect.stack()[1][3]
246         self.details[part]['status'] = 'FAIL'
247         self.details[part]['result'] = error_msg
248         raise Exception(error_msg)