3 # Copyright (c) 2016 Orange and others.
5 # All rights reserved. This program and the accompanying materials
6 # are made available under the terms of the Apache License, Version 2.0
7 # which accompanies this distribution, and is available at
8 # http://www.apache.org/licenses/LICENSE-2.0
17 from keystoneauth1.exceptions import auth_plugins
18 from robot.errors import DataError, RobotError
19 from robot.result import testcase as result_testcase
20 from robot.utils.robottime import timestamp_to_secs
22 from functest.core import testcase
23 from functest.opnfv_tests.sdn.odl import odl
26 class ODLTesting(unittest.TestCase):
28 logging.disable(logging.CRITICAL)
30 _keystone_ip = "127.0.0.1"
31 _neutron_ip = "127.0.0.2"
32 _sdn_controller_ip = "127.0.0.3"
33 _os_auth_url = "http://{}:5000/v2.0".format(_keystone_ip)
34 _os_tenantname = "admin"
35 _os_username = "admin"
36 _os_password = "admin"
38 _odl_restconfport = "8181"
39 _odl_username = "admin"
40 _odl_password = "admin"
43 for var in ("INSTALLER_TYPE", "SDN_CONTROLLER", "SDN_CONTROLLER_IP"):
46 os.environ["OS_AUTH_URL"] = self._os_auth_url
47 os.environ["OS_USERNAME"] = self._os_username
48 os.environ["OS_PASSWORD"] = self._os_password
49 os.environ["OS_TENANT_NAME"] = self._os_tenantname
50 self.test = odl.ODLTests()
51 self.defaultargs = {'odlusername': self._odl_username,
52 'odlpassword': self._odl_password,
53 'neutronip': self._keystone_ip,
54 'osauthurl': self._os_auth_url,
55 'osusername': self._os_username,
56 'ostenantname': self._os_tenantname,
57 'ospassword': self._os_password,
58 'odlip': self._keystone_ip,
59 'odlwebport': self._odl_webport,
60 'odlrestconfport': self._odl_restconfport,
63 def test_empty_visitor(self):
64 visitor = odl.ODLResultVisitor()
65 self.assertFalse(visitor.get_data())
67 def test_visitor(self):
68 visitor = odl.ODLResultVisitor()
69 data = {'name': 'foo',
72 'starttime': "20161216 16:00:00.000",
73 'endtime': "20161216 16:00:01.000",
75 'text': 'Hello, World!',
77 test = result_testcase.TestCase(name=data['name'],
78 status=data['status'],
80 starttime=data['starttime'],
81 endtime=data['endtime'])
82 test.parent = mock.Mock()
83 config = {'name': data['parent'],
84 'criticality.test_is_critical.return_value': data[
86 test.parent.configure_mock(**config)
87 visitor.visit_test(test)
88 self.assertEqual(visitor.get_data(), [data])
90 @mock.patch('robot.api.ExecutionResult', side_effect=DataError)
91 def test_parse_results_raises_exceptions(self, *args):
92 with self.assertRaises(DataError):
93 self.test.parse_results()
95 def test_parse_results(self, *args):
96 config = {'name': 'dummy', 'starttime': '20161216 16:00:00.000',
97 'endtime': '20161216 16:00:01.000', 'status': 'PASS'}
99 suite.configure_mock(**config)
100 with mock.patch('robot.api.ExecutionResult',
101 return_value=mock.Mock(suite=suite)):
102 self.test.parse_results()
103 self.assertEqual(self.test.criteria, config['status'])
104 self.assertEqual(self.test.start_time,
105 timestamp_to_secs(config['starttime']))
106 self.assertEqual(self.test.stop_time,
107 timestamp_to_secs(config['endtime']))
108 self.assertEqual(self.test.details,
109 {'description': config['name'], 'tests': []})
111 @mock.patch('fileinput.input', side_effect=Exception())
112 def test_set_robotframework_vars_failed(self, *args):
113 self.assertFalse(self.test.set_robotframework_vars())
115 @mock.patch('fileinput.input', return_value=[])
116 def test_set_robotframework_vars_empty(self, args):
117 self.assertTrue(self.test.set_robotframework_vars())
119 @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
120 def _test_set_robotframework_vars(self, msg1, msg2, *args):
121 line = mock.MagicMock()
122 line.__iter__.return_value = [msg1]
123 with mock.patch('fileinput.input', return_value=line) as mock_method:
124 self.assertTrue(self.test.set_robotframework_vars())
125 mock_method.assert_called_once_with(
126 os.path.join(odl.ODLTests.odl_test_repo,
127 'csit/variables/Variables.py'), inplace=True)
128 self.assertEqual(args[0].getvalue(), "{}\n".format(msg2))
130 def test_set_robotframework_vars_auth_default(self):
131 self._test_set_robotframework_vars("AUTH = []",
132 "AUTH = [u'admin', u'admin']")
134 def test_set_robotframework_vars_auth1(self):
135 self._test_set_robotframework_vars("AUTH1 = []", "AUTH1 = []")
137 @mock.patch('sys.stdout', new_callable=StringIO.StringIO)
138 def test_set_robotframework_vars_auth_foo(self, *args):
139 line = mock.MagicMock()
140 line.__iter__.return_value = ["AUTH = []"]
141 with mock.patch('fileinput.input', return_value=line) as mock_method:
142 self.assertTrue(self.test.set_robotframework_vars('foo', 'bar'))
143 mock_method.assert_called_once_with(
144 os.path.join(odl.ODLTests.odl_test_repo,
145 'csit/variables/Variables.py'), inplace=True)
146 self.assertEqual(args[0].getvalue(),
147 "AUTH = [u'{}', u'{}']\n".format('foo', 'bar'))
150 def _fake_url_for(cls, service_type='identity', **kwargs):
151 if service_type == 'identity':
152 return "http://{}:5000/v2.0".format(
153 ODLTesting._keystone_ip)
154 elif service_type == 'network':
155 return "http://{}:9696".format(ODLTesting._neutron_ip)
159 def _get_main_kwargs(self, key=None):
160 kwargs = {'odlusername': self._odl_username,
161 'odlpassword': self._odl_password,
162 'neutronip': self._neutron_ip,
163 'osauthurl': self._os_auth_url,
164 'osusername': self._os_username,
165 'ostenantname': self._os_tenantname,
166 'ospassword': self._os_password,
167 'odlip': self._sdn_controller_ip,
168 'odlwebport': self._odl_webport,
169 'odlrestconfport': self._odl_restconfport}
174 def _test_main(self, status, *args):
175 kwargs = self._get_main_kwargs()
176 self.assertEqual(self.test.main(**kwargs), status)
178 args[0].assert_called_once_with(
179 odl.ODLTests.res_dir)
181 variable = ['KEYSTONE:{}'.format(self._keystone_ip),
182 'NEUTRON:{}'.format(self._neutron_ip),
183 'OS_AUTH_URL:"{}"'.format(self._os_auth_url),
184 'OSUSERNAME:"{}"'.format(self._os_username),
185 'OSTENANTNAME:"{}"'.format(self._os_tenantname),
186 'OSPASSWORD:"{}"'.format(self._os_password),
187 'ODL_SYSTEM_IP:{}'.format(self._sdn_controller_ip),
188 'PORT:{}'.format(self._odl_webport),
189 'RESTCONFPORT:{}'.format(self._odl_restconfport)]
190 args[1].assert_called_once_with(
191 odl.ODLTests.basic_suite_dir,
192 odl.ODLTests.neutron_suite_dir,
194 output=os.path.join(odl.ODLTests.res_dir, 'output.xml'),
199 args[2].assert_called_with(
200 os.path.join(odl.ODLTests.res_dir, 'stdout.txt'))
202 def _test_main_missing_keyword(self, key):
203 kwargs = self._get_main_kwargs(key)
204 self.assertEqual(self.test.main(**kwargs),
205 testcase.TestCase.EX_RUN_ERROR)
207 def test_main_missing_odlusername(self):
208 self._test_main_missing_keyword('odlusername')
210 def test_main_missing_odlpassword(self):
211 self._test_main_missing_keyword('odlpassword')
213 def test_main_missing_neutronip(self):
214 self._test_main_missing_keyword('neutronip')
216 def test_main_missing_osauthurl(self):
217 self._test_main_missing_keyword('osauthurl')
219 def test_main_missing_osusername(self):
220 self._test_main_missing_keyword('osusername')
222 def test_main_missing_ostenantname(self):
223 self._test_main_missing_keyword('ostenantname')
225 def test_main_missing_ospassword(self):
226 self._test_main_missing_keyword('ospassword')
228 def test_main_missing_odlip(self):
229 self._test_main_missing_keyword('odlip')
231 def test_main_missing_odlwebport(self):
232 self._test_main_missing_keyword('odlwebport')
234 def test_main_missing_odlrestconfport(self):
235 self._test_main_missing_keyword('odlrestconfport')
237 def test_main_set_robotframework_vars_failed(self):
238 with mock.patch.object(self.test, 'set_robotframework_vars',
240 self._test_main(testcase.TestCase.EX_RUN_ERROR)
241 self.test.set_robotframework_vars.assert_called_once_with(
242 self._odl_username, self._odl_password)
244 @mock.patch('os.makedirs', side_effect=Exception)
245 def test_main_makedirs_exception(self, mock_method):
246 with mock.patch.object(self.test, 'set_robotframework_vars',
247 return_value=True), \
248 self.assertRaises(Exception):
249 self._test_main(testcase.TestCase.EX_RUN_ERROR,
252 @mock.patch('os.makedirs', side_effect=OSError)
253 def test_main_makedirs_oserror(self, mock_method):
254 with mock.patch.object(self.test, 'set_robotframework_vars',
256 self._test_main(testcase.TestCase.EX_RUN_ERROR,
259 @mock.patch('robot.run', side_effect=RobotError)
260 @mock.patch('os.makedirs')
261 def test_main_robot_run_failed(self, *args):
262 with mock.patch.object(self.test, 'set_robotframework_vars',
263 return_value=True), \
264 mock.patch.object(odl, 'open', mock.mock_open(),
266 self.assertRaises(RobotError):
267 self._test_main(testcase.TestCase.EX_RUN_ERROR, *args)
269 @mock.patch('robot.run')
270 @mock.patch('os.makedirs')
271 def test_main_parse_results_failed(self, *args):
272 with mock.patch.object(self.test, 'set_robotframework_vars',
273 return_value=True), \
274 mock.patch.object(odl, 'open', mock.mock_open(),
276 mock.patch.object(self.test, 'parse_results',
277 side_effect=RobotError):
278 self._test_main(testcase.TestCase.EX_RUN_ERROR, *args)
280 @mock.patch('os.remove', side_effect=Exception)
281 @mock.patch('robot.run')
282 @mock.patch('os.makedirs')
283 def test_main_remove_exception(self, *args):
284 with mock.patch.object(self.test, 'set_robotframework_vars',
285 return_value=True), \
286 mock.patch.object(self.test, 'parse_results'), \
287 self.assertRaises(Exception):
288 self._test_main(testcase.TestCase.EX_OK, *args)
290 @mock.patch('os.remove')
291 @mock.patch('robot.run')
292 @mock.patch('os.makedirs')
293 def test_main(self, *args):
294 with mock.patch.object(self.test, 'set_robotframework_vars',
295 return_value=True), \
296 mock.patch.object(odl, 'open', mock.mock_open(),
298 mock.patch.object(self.test, 'parse_results'):
299 self._test_main(testcase.TestCase.EX_OK, *args)
301 @mock.patch('os.remove')
302 @mock.patch('robot.run')
303 @mock.patch('os.makedirs', side_effect=OSError(errno.EEXIST, ''))
304 def test_main_makedirs_oserror17(self, *args):
305 with mock.patch.object(self.test, 'set_robotframework_vars',
306 return_value=True), \
307 mock.patch.object(odl, 'open', mock.mock_open(),
309 mock.patch.object(self.test, 'parse_results'):
310 self._test_main(testcase.TestCase.EX_OK, *args)
312 @mock.patch('os.remove')
313 @mock.patch('robot.run', return_value=1)
314 @mock.patch('os.makedirs')
315 def test_main_testcases_in_failure(self, *args):
316 with mock.patch.object(self.test, 'set_robotframework_vars',
317 return_value=True), \
318 mock.patch.object(odl, 'open', mock.mock_open(),
320 mock.patch.object(self.test, 'parse_results'):
321 self._test_main(testcase.TestCase.EX_OK, *args)
323 @mock.patch('os.remove', side_effect=OSError)
324 @mock.patch('robot.run')
325 @mock.patch('os.makedirs')
326 def test_main_remove_oserror(self, *args):
327 with mock.patch.object(self.test, 'set_robotframework_vars',
328 return_value=True), \
329 mock.patch.object(odl, 'open', mock.mock_open(),
331 mock.patch.object(self.test, 'parse_results'):
332 self._test_main(testcase.TestCase.EX_OK, *args)
334 def _test_run_missing_env_var(self, var):
335 with mock.patch('functest.utils.openstack_utils.get_endpoint',
336 side_effect=self._fake_url_for):
338 self.assertEqual(self.test.run(),
339 testcase.TestCase.EX_RUN_ERROR)
341 def _test_run(self, status=testcase.TestCase.EX_OK,
342 exception=None, odlip="127.0.0.3", odlwebport="8080",
343 odlrestconfport="8181"):
344 with mock.patch('functest.utils.openstack_utils.get_endpoint',
345 side_effect=self._fake_url_for):
347 self.test.main = mock.Mock(side_effect=exception)
349 self.test.main = mock.Mock(return_value=status)
350 self.assertEqual(self.test.run(), status)
351 self.test.main.assert_called_once_with(
352 odl.ODLTests.default_suites,
353 neutronip=self._neutron_ip,
354 odlip=odlip, odlpassword=self._odl_password,
355 odlrestconfport=odlrestconfport,
356 odlusername=self._odl_username, odlwebport=odlwebport,
357 osauthurl=self._os_auth_url,
358 ospassword=self._os_password, ostenantname=self._os_tenantname,
359 osusername=self._os_username)
361 def _test_run_defining_multiple_suites(
363 status=testcase.TestCase.EX_OK,
364 exception=None, odlip="127.0.0.3", odlwebport="8080",
365 odlrestconfport="8181"):
366 with mock.patch('functest.utils.openstack_utils.get_endpoint',
367 side_effect=self._fake_url_for):
369 self.test.main = mock.Mock(side_effect=exception)
371 self.test.main = mock.Mock(return_value=status)
372 self.assertEqual(self.test.run(suites=suites), status)
373 self.test.main.assert_called_once_with(
375 neutronip=self._neutron_ip,
376 odlip=odlip, odlpassword=self._odl_password,
377 odlrestconfport=odlrestconfport,
378 odlusername=self._odl_username, odlwebport=odlwebport,
379 osauthurl=self._os_auth_url,
380 ospassword=self._os_password, ostenantname=self._os_tenantname,
381 osusername=self._os_username)
383 def test_run_exception(self):
384 with mock.patch('functest.utils.openstack_utils.get_endpoint',
385 side_effect=auth_plugins.MissingAuthPlugin()):
386 self.assertEqual(self.test.run(),
387 testcase.TestCase.EX_RUN_ERROR)
389 def test_run_missing_os_auth_url(self):
390 self._test_run_missing_env_var("OS_AUTH_URL")
392 def test_run_missing_os_username(self):
393 self._test_run_missing_env_var("OS_USERNAME")
395 def test_run_missing_os_password(self):
396 self._test_run_missing_env_var("OS_PASSWORD")
398 def test_run_missing_os_tenant_name(self):
399 self._test_run_missing_env_var("OS_TENANT_NAME")
401 def test_run_main_false(self):
402 os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
403 self._test_run(testcase.TestCase.EX_RUN_ERROR,
404 odlip=self._sdn_controller_ip,
405 odlwebport=self._odl_webport)
407 def test_run_main_exception(self):
408 with self.assertRaises(Exception):
409 os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
410 self._test_run(status=testcase.TestCase.EX_RUN_ERROR,
411 exception=Exception(),
412 odlip=self._sdn_controller_ip,
413 odlwebport=self._odl_webport)
415 def test_run_missing_sdn_controller_ip(self):
416 with mock.patch('functest.utils.openstack_utils.get_endpoint',
417 side_effect=self._fake_url_for):
418 self.assertEqual(self.test.run(),
419 testcase.TestCase.EX_RUN_ERROR)
421 def test_run_without_installer_type(self):
422 os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
423 self._test_run(testcase.TestCase.EX_OK,
424 odlip=self._sdn_controller_ip,
425 odlwebport=self._odl_webport)
427 def test_run_redefining_suites(self):
428 os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
429 self._test_run_defining_multiple_suites(
430 [odl.ODLTests.basic_suite_dir],
431 testcase.TestCase.EX_OK,
432 odlip=self._sdn_controller_ip,
433 odlwebport=self._odl_webport)
435 def test_run_fuel(self):
436 os.environ["INSTALLER_TYPE"] = "fuel"
437 self._test_run(testcase.TestCase.EX_OK,
438 odlip=self._neutron_ip, odlwebport='8282')
440 def test_run_apex_missing_sdn_controller_ip(self):
441 with mock.patch('functest.utils.openstack_utils.get_endpoint',
442 side_effect=self._fake_url_for):
443 os.environ["INSTALLER_TYPE"] = "apex"
444 self.assertEqual(self.test.run(),
445 testcase.TestCase.EX_RUN_ERROR)
447 def test_run_apex(self):
448 os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
449 os.environ["INSTALLER_TYPE"] = "apex"
450 self._test_run(testcase.TestCase.EX_OK,
451 odlip=self._sdn_controller_ip, odlwebport='8081',
452 odlrestconfport='8081')
454 def test_run_netvirt_missing_sdn_controller_ip(self):
455 with mock.patch('functest.utils.openstack_utils.get_endpoint',
456 side_effect=self._fake_url_for):
457 os.environ["INSTALLER_TYPE"] = "netvirt"
458 self.assertEqual(self.test.run(),
459 testcase.TestCase.EX_RUN_ERROR)
461 def test_run_netvirt(self):
462 os.environ["SDN_CONTROLLER_IP"] = self._sdn_controller_ip
463 os.environ["INSTALLER_TYPE"] = "netvirt"
464 self._test_run(testcase.TestCase.EX_OK,
465 odlip=self._sdn_controller_ip, odlwebport='8081',
466 odlrestconfport='8081')
468 def test_run_joid_missing_sdn_controller(self):
469 with mock.patch('functest.utils.openstack_utils.get_endpoint',
470 side_effect=self._fake_url_for):
471 os.environ["INSTALLER_TYPE"] = "joid"
472 self.assertEqual(self.test.run(),
473 testcase.TestCase.EX_RUN_ERROR)
475 def test_run_joid(self):
476 os.environ["SDN_CONTROLLER"] = self._sdn_controller_ip
477 os.environ["INSTALLER_TYPE"] = "joid"
478 self._test_run(testcase.TestCase.EX_OK,
479 odlip=self._sdn_controller_ip, odlwebport='8080')
481 def test_run_compass(self, *args):
482 os.environ["INSTALLER_TYPE"] = "compass"
483 self._test_run(testcase.TestCase.EX_OK,
484 odlip=self._neutron_ip, odlwebport='8181')
486 def test_argparser_default(self):
487 parser = odl.ODLParser()
488 self.assertEqual(parser.parse_args(), self.defaultargs)
490 def test_argparser_basic(self):
491 self.defaultargs['neutronip'] = self._neutron_ip
492 self.defaultargs['odlip'] = self._sdn_controller_ip
493 parser = odl.ODLParser()
494 self.assertEqual(parser.parse_args(
495 ["--neutronip={}".format(self._neutron_ip),
496 "--odlip={}".format(self._sdn_controller_ip)
497 ]), self.defaultargs)
499 @mock.patch('sys.stderr', new_callable=StringIO.StringIO)
500 def test_argparser_fail(self, *args):
501 self.defaultargs['foo'] = 'bar'
502 parser = odl.ODLParser()
503 with self.assertRaises(SystemExit):
504 parser.parse_args(["--foo=bar"])
506 def _test_argparser(self, arg, value):
507 self.defaultargs[arg] = value
508 parser = odl.ODLParser()
509 self.assertEqual(parser.parse_args(["--{}={}".format(arg, value)]),
512 def test_argparser_odlusername(self):
513 self._test_argparser('odlusername', 'foo')
515 def test_argparser_odlpassword(self):
516 self._test_argparser('odlpassword', 'foo')
518 def test_argparser_osauthurl(self):
519 self._test_argparser('osauthurl', 'http://127.0.0.4:5000/v2')
521 def test_argparser_neutronip(self):
522 self._test_argparser('neutronip', '127.0.0.4')
524 def test_argparser_osusername(self):
525 self._test_argparser('osusername', 'foo')
527 def test_argparser_ostenantname(self):
528 self._test_argparser('ostenantname', 'foo')
530 def test_argparser_ospassword(self):
531 self._test_argparser('ospassword', 'foo')
533 def test_argparser_odlip(self):
534 self._test_argparser('odlip', '127.0.0.4')
536 def test_argparser_odlwebport(self):
537 self._test_argparser('odlwebport', '80')
539 def test_argparser_odlrestconfport(self):
540 self._test_argparser('odlrestconfport', '80')
542 def test_argparser_pushtodb(self):
543 self.defaultargs['pushtodb'] = True
544 parser = odl.ODLParser()
545 self.assertEqual(parser.parse_args(["--{}".format('pushtodb')]),
548 def test_argparser_multiple_args(self):
549 self.defaultargs['neutronip'] = self._neutron_ip
550 self.defaultargs['odlip'] = self._sdn_controller_ip
551 parser = odl.ODLParser()
552 self.assertEqual(parser.parse_args(
553 ["--neutronip={}".format(self._neutron_ip),
554 "--odlip={}".format(self._sdn_controller_ip)
555 ]), self.defaultargs)
558 if __name__ == "__main__":
559 unittest.main(verbosity=2)