3 # Copyright (c) 2016 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
16 from keystoneauth1.exceptions import auth_plugins
17 from robot.errors import RobotError
18 from robot.result import testcase
20 from functest.core import testcase_base
21 from functest.opnfv_tests.sdn.odl import odl
24 class ODLTesting(unittest.TestCase):
26 logging.disable(logging.CRITICAL)
28 _keystone_ip = "127.0.0.1"
29 _neutron_ip = "127.0.0.2"
30 _sdn_controller_ip = "127.0.0.3"
31 _os_tenantname = "admin"
32 _os_username = "admin"
33 _os_password = "admin"
35 _odl_restconfport = "8181"
36 _odl_username = "admin"
37 _odl_password = "admin"
40 for var in ("INSTALLER_TYPE", "SDN_CONTROLLER", "SDN_CONTROLLER_IP"):
43 os.environ["OS_USERNAME"] = self._os_username
44 os.environ["OS_PASSWORD"] = self._os_password
45 os.environ["OS_TENANT_NAME"] = self._os_tenantname
46 self.test = odl.ODLTests()
def test_empty_visitor(self):
    # A visitor that has not visited any test must report no data.
    fresh_visitor = odl.ODLResultVisitor()
    collected = fresh_visitor.get_data()
    self.assertFalse(collected)
52 def test_visitor(self):
53 visitor = odl.ODLResultVisitor()
54 data = {'name': 'foo',
57 'starttime': "20161216 16:00:00.000",
58 'endtime': "20161216 16:00:01.000",
60 'text': 'Hello, World!',
62 test = testcase.TestCase(name=data['name'],
63 status=data['status'],
65 starttime=data['starttime'],
66 endtime=data['endtime'])
67 test.parent = mock.Mock()
68 config = {'name': data['parent'],
69 'criticality.test_is_critical.return_value': data[
71 test.parent.configure_mock(**config)
72 visitor.visit_test(test)
73 self.assertEqual(visitor.get_data(), [data])
@mock.patch('fileinput.input', side_effect=Exception())
def test_set_robotframework_vars_failed(self, *args):
    # fileinput.input is patched to raise, and the call under test is
    # expected to report a falsy result.
    outcome = self.test.set_robotframework_vars()
    self.assertFalse(outcome)
@mock.patch('fileinput.input', return_value=[])
def test_set_robotframework_vars(self, args):
    # fileinput.input is patched to yield no lines, and the call under
    # test is expected to report a truthy result.
    outcome = self.test.set_robotframework_vars()
    self.assertTrue(outcome)
84 def _fake_url_for(cls, service_type='identity', **kwargs):
85 if service_type == 'identity':
86 return "http://{}:5000/v2.0".format(
87 ODLTesting._keystone_ip)
88 elif service_type == 'network':
89 return "http://{}:9696".format(ODLTesting._neutron_ip)
93 def _get_main_kwargs(self, key=None):
94 kwargs = {'odlusername': self._odl_username,
95 'odlpassword': self._odl_password,
96 'keystoneip': self._keystone_ip,
97 'neutronip': self._neutron_ip,
98 'osusername': self._os_username,
99 'ostenantname': self._os_tenantname,
100 'ospassword': self._os_password,
101 'odlip': self._sdn_controller_ip,
102 'odlwebport': self._odl_webport,
103 'odlrestconfport': self._odl_restconfport}
108 def _test_main(self, status, *args):
109 kwargs = self._get_main_kwargs()
110 self.assertEqual(self.test.main(**kwargs), status)
112 args[0].assert_called_once_with(
113 odl.ODLTests.res_dir)
115 variable = ['KEYSTONE:{}'.format(self._keystone_ip),
116 'NEUTRON:{}'.format(self._neutron_ip),
117 'OSUSERNAME:"{}"'.format(self._os_username),
118 'OSTENANTNAME:"{}"'.format(self._os_tenantname),
119 'OSPASSWORD:"{}"'.format(self._os_password),
120 'ODL_SYSTEM_IP:{}'.format(self._sdn_controller_ip),
121 'PORT:{}'.format(self._odl_webport),
122 'RESTCONFPORT:{}'.format(self._odl_restconfport)]
123 args[1].assert_called_once_with(
124 odl.ODLTests.basic_suite_dir,
125 odl.ODLTests.neutron_suite_dir,
127 output=os.path.join(odl.ODLTests.res_dir, 'output.xml'),
132 args[2].assert_called_with(
133 os.path.join(odl.ODLTests.res_dir, 'stdout.txt'))
def _test_main_missing_keyword(self, key):
    # Shared helper: call main() with the kwargs that
    # _get_main_kwargs() builds for ``key`` and expect EX_RUN_ERROR.
    main_kwargs = self._get_main_kwargs(key)
    returned = self.test.main(**main_kwargs)
    self.assertEqual(returned, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_main_missing_odlusername(self):
    # Delegates to the shared helper for the 'odlusername' keyword.
    self._test_main_missing_keyword(key='odlusername')
def test_main_missing_odlpassword(self):
    # Delegates to the shared helper for the 'odlpassword' keyword.
    self._test_main_missing_keyword(key='odlpassword')
def test_main_missing_keystoneip(self):
    # Delegates to the shared helper for the 'keystoneip' keyword.
    self._test_main_missing_keyword(key='keystoneip')
def test_main_missing_neutronip(self):
    # Delegates to the shared helper for the 'neutronip' keyword.
    self._test_main_missing_keyword(key='neutronip')
def test_main_missing_osusername(self):
    # Delegates to the shared helper for the 'osusername' keyword.
    self._test_main_missing_keyword(key='osusername')
def test_main_missing_ostenantname(self):
    # Delegates to the shared helper for the 'ostenantname' keyword.
    self._test_main_missing_keyword(key='ostenantname')
def test_main_missing_ospassword(self):
    # Delegates to the shared helper for the 'ospassword' keyword.
    self._test_main_missing_keyword(key='ospassword')
def test_main_missing_odlip(self):
    # Delegates to the shared helper for the 'odlip' keyword.
    self._test_main_missing_keyword(key='odlip')
def test_main_missing_odlwebport(self):
    # Delegates to the shared helper for the 'odlwebport' keyword.
    self._test_main_missing_keyword(key='odlwebport')
def test_main_missing_odlrestconfport(self):
    # Delegates to the shared helper for the 'odlrestconfport' keyword.
    self._test_main_missing_keyword(key='odlrestconfport')
170 def test_main_set_robotframework_vars_failed(self):
171 with mock.patch.object(self.test, 'set_robotframework_vars',
173 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR)
174 self.test.set_robotframework_vars.assert_called_once_with(
175 self._odl_username, self._odl_password)
177 @mock.patch('os.makedirs', side_effect=Exception)
178 def test_main_makedirs_exception(self, mock_method):
179 with mock.patch.object(self.test, 'set_robotframework_vars',
180 return_value=True), \
181 self.assertRaises(Exception):
182 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR,
185 @mock.patch('os.makedirs', side_effect=OSError)
186 def test_main_makedirs_oserror(self, mock_method):
187 with mock.patch.object(self.test, 'set_robotframework_vars',
189 self._test_main(testcase_base.TestcaseBase.EX_RUN_ERROR,
@mock.patch('robot.run', side_effect=RobotError)
@mock.patch('os.makedirs')
def test_main_robot_run_failed(self, *args):
    # robot.run is patched to raise RobotError; that error is expected
    # to escape the _test_main() helper.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with self.assertRaises(RobotError):
            self._test_main(
                testcase_base.TestcaseBase.EX_RUN_ERROR, *args)
@mock.patch('robot.run')
@mock.patch('os.makedirs')
def test_main_parse_results_failed(self, *args):
    # parse_results is patched to raise RobotError; main() is expected
    # to return EX_RUN_ERROR instead of raising.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with mock.patch.object(self.test, 'parse_results',
                               side_effect=RobotError):
            self._test_main(
                testcase_base.TestcaseBase.EX_RUN_ERROR, *args)
@mock.patch('os.remove', side_effect=Exception)
@mock.patch('robot.run')
@mock.patch('os.makedirs')
def test_main_remove_exception(self, *args):
    # os.remove is patched to raise a generic Exception, which is
    # expected to escape the _test_main() helper.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with mock.patch.object(self.test, 'parse_results'):
            with self.assertRaises(Exception):
                self._test_main(
                    testcase_base.TestcaseBase.EX_OK, *args)
@mock.patch('os.remove')
@mock.patch('robot.run')
@mock.patch('os.makedirs')
def test_main(self, *args):
    # Every collaborator is patched with a plain mock; main() is
    # expected to return EX_OK.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with mock.patch.object(self.test, 'parse_results'):
            self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
@mock.patch('os.remove')
@mock.patch('robot.run')
@mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
def test_main_makedirs_oserror17(self, *args):
    # os.makedirs is patched to raise OSError with errno EEXIST; main()
    # is still expected to return EX_OK.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with mock.patch.object(self.test, 'parse_results'):
            self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
@mock.patch('os.remove')
@mock.patch('robot.run', return_value=1)
@mock.patch('os.makedirs')
def test_main_testcases_in_failure(self, *args):
    # robot.run is patched to return 1; main() is still expected to
    # return EX_OK.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with mock.patch.object(self.test, 'parse_results'):
            self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
@mock.patch('os.remove', side_effect=OSError)
@mock.patch('robot.run')
@mock.patch('os.makedirs')
def test_main_remove_oserror(self, *args):
    # os.remove is patched to raise OSError; main() is still expected
    # to return EX_OK.
    with mock.patch.object(self.test, 'set_robotframework_vars',
                           return_value=True):
        with mock.patch.object(self.test, 'parse_results'):
            self._test_main(testcase_base.TestcaseBase.EX_OK, *args)
255 def _test_run_missing_env_var(self, var):
256 with mock.patch('functest.utils.openstack_utils.get_endpoint',
257 side_effect=self._fake_url_for):
259 self.assertEqual(self.test.run(),
260 testcase_base.TestcaseBase.EX_RUN_ERROR)
262 def _test_run(self, status=testcase_base.TestcaseBase.EX_OK,
263 exception=None, odlip="127.0.0.3", odlwebport="8080"):
264 with mock.patch('functest.utils.openstack_utils.get_endpoint',
265 side_effect=self._fake_url_for):
267 self.test.main = mock.Mock(side_effect=exception)
269 self.test.main = mock.Mock(return_value=status)
270 self.assertEqual(self.test.run(), status)
271 self.test.main.assert_called_once_with(
272 keystoneip=self._keystone_ip, neutronip=self._neutron_ip,
273 odlip=odlip, odlpassword=self._odl_password,
274 odlrestconfport=self._odl_restconfport,
275 odlusername=self._odl_username, odlwebport=odlwebport,
276 ospassword=self._os_password, ostenantname=self._os_tenantname,
277 osusername=self._os_username)
def test_run_exception(self):
    # get_endpoint is patched to raise MissingAuthPlugin; run() is
    # expected to return EX_RUN_ERROR.
    with mock.patch('functest.utils.openstack_utils.get_endpoint',
                    side_effect=auth_plugins.MissingAuthPlugin()):
        returned = self.test.run()
        self.assertEqual(
            returned, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_missing_os_username(self):
    # Delegates to the shared helper for the OS_USERNAME variable.
    self._test_run_missing_env_var(var="OS_USERNAME")
def test_run_missing_os_password(self):
    # Delegates to the shared helper for the OS_PASSWORD variable.
    self._test_run_missing_env_var(var="OS_PASSWORD")
def test_run_missing_os_tenant_name(self):
    # Delegates to the shared helper for the OS_TENANT_NAME variable.
    self._test_run_missing_env_var(var="OS_TENANT_NAME")
def test_run_main_false(self):
    # With SDN_CONTROLLER_IP exported, the mocked main() returns
    # EX_RUN_ERROR and run() is expected to hand that status back.
    os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
    expected_status = testcase_base.TestcaseBase.EX_RUN_ERROR
    self._test_run(expected_status,
                   odlip=self._sdn_controller_ip,
                   odlwebport=self._odl_webport)
def test_run_main_exception(self):
    # Passes an Exception instance to _test_run(), which installs it as
    # the side effect of the mocked main(); the exception is expected to
    # escape the helper.
    #
    # The environment is prepared outside the assertRaises block so that
    # only the call under test can satisfy the expectation; a failure
    # during setup now surfaces as an error instead of a pass.
    os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
    with self.assertRaises(Exception):
        self._test_run(status=testcase_base.TestcaseBase.EX_RUN_ERROR,
                       exception=Exception(),
                       odlip=self._sdn_controller_ip,
                       odlwebport=self._odl_webport)
def test_run_missing_sdn_controller_ip(self):
    # This test exports no controller variable itself; run() is
    # expected to return EX_RUN_ERROR.
    with mock.patch('functest.utils.openstack_utils.get_endpoint',
                    side_effect=self._fake_url_for):
        returned = self.test.run()
        self.assertEqual(
            returned, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_without_installer_type(self):
    # Only SDN_CONTROLLER_IP is exported here; main() is expected to be
    # called with that address and the default web port.
    os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._sdn_controller_ip,
                   odlwebport=self._odl_webport)
def test_run_fuel(self):
    # INSTALLER_TYPE=fuel: main() is expected to be called with the
    # neutron address as odlip and web port '8282'.
    os.environ["INSTALLER_TYPE"] = "fuel"
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._neutron_ip,
                   odlwebport='8282')
def test_run_apex_missing_sdn_controller_ip(self):
    # INSTALLER_TYPE=apex with no SDN_CONTROLLER_IP exported by this
    # test; run() is expected to return EX_RUN_ERROR.
    with mock.patch('functest.utils.openstack_utils.get_endpoint',
                    side_effect=self._fake_url_for):
        os.environ["INSTALLER_TYPE"] = "apex"
        returned = self.test.run()
        self.assertEqual(
            returned, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_apex(self):
    # INSTALLER_TYPE=apex with SDN_CONTROLLER_IP exported: main() is
    # expected to be called with that address and web port '8181'.
    os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
    os.environ["INSTALLER_TYPE"] = "apex"
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._sdn_controller_ip,
                   odlwebport='8181')
def test_run_joid_missing_sdn_controller(self):
    # INSTALLER_TYPE=joid with no SDN_CONTROLLER exported by this test;
    # run() is expected to return EX_RUN_ERROR.
    with mock.patch('functest.utils.openstack_utils.get_endpoint',
                    side_effect=self._fake_url_for):
        os.environ["INSTALLER_TYPE"] = "joid"
        returned = self.test.run()
        self.assertEqual(
            returned, testcase_base.TestcaseBase.EX_RUN_ERROR)
def test_run_joid(self):
    # INSTALLER_TYPE=joid with SDN_CONTROLLER exported: main() is
    # expected to be called with that address and web port '8080'.
    os.environ["SDN_CONTROLLER"] = self._sdn_controller_ip
    os.environ["INSTALLER_TYPE"] = "joid"
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._sdn_controller_ip,
                   odlwebport='8080')
def test_run_compass(self, *args):
    # INSTALLER_TYPE=compass: main() is expected to be called with the
    # neutron address as odlip and web port '8181'.
    os.environ["INSTALLER_TYPE"] = "compass"
    expected_status = testcase_base.TestcaseBase.EX_OK
    self._test_run(expected_status,
                   odlip=self._neutron_ip,
                   odlwebport='8181')
if __name__ == "__main__":
    # Script entry point: run this module's tests with verbose output.
    unittest.main(verbosity=2)