9bc04ebf45b2eedf9fe4daea45b37dc44a9591b0
[yardstick.git] / yardstick / tests / unit / benchmark / scenarios / availability / test_basemonitor.py
1 ##############################################################################
2 # Copyright (c) 2015 Huawei Technologies Co.,Ltd and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import mock
11 import unittest
12
13 from yardstick.benchmark.scenarios.availability.monitor import basemonitor
14
15
class MonitorMgrTestCase(unittest.TestCase):
    """Tests for basemonitor.MonitorMgr driven by two sample monitor configs."""

    def setUp(self):
        """Build a MonitorMgr from the sample configs and preset monitor results."""
        openstack_cmd_cfg = {
            "monitor_type": "openstack-cmd",
            "command_name": "openstack router list",
            "monitor_time": 10,
            "monitor_number": 3,
            "sla": {"max_outage_time": 5},
        }
        process_cfg = {
            "monitor_type": "process",
            "process_name": "neutron-server",
            "host": "node1",
            "monitor_time": 20,
            "monitor_number": 3,
            "sla": {"max_recover_time": 20},
        }
        self.monitor_configs = [openstack_cmd_cfg, process_cfg]

        self.MonitorMgr = manager = basemonitor.MonitorMgr([])
        manager.init_monitors(self.monitor_configs, None)
        self.monitor_list = manager._monitor_list

        # Overwrite each monitor's private result with its own canned dict so
        # test_store_result has known values to collect.
        for monitor in self.monitor_list:
            monitor._result = {"outage_time": 10}

    @mock.patch.object(basemonitor, 'BaseMonitor')
    def test__MonitorMgr_setup_successful(self, *args):
        # With basemonitor.BaseMonitor patched, walk the manager through
        # init -> start -> wait -> verify and require only that nothing raises.
        manager = basemonitor.MonitorMgr({"nova-api": 10})
        manager.init_monitors(self.monitor_configs, None)
        manager.start_monitors()
        manager.wait_monitors()

        # TODO(elfoley): Check the return value
        manager.verify_SLA()

    @mock.patch.object(basemonitor, 'BaseMonitor')
    def test_MonitorMgr_getitem(self, *args):
        # NOTE(review): despite the name, no item lookup is performed here;
        # only construction and init_monitors are exercised.
        manager = basemonitor.MonitorMgr({"nova-api": 10})
        manager.init_monitors(self.monitor_configs, None)

    @mock.patch.object(basemonitor, 'BaseMonitor')
    def test_store_result(self, *args):
        # store_result fills the dict passed in; compare it against the keys
        # expected for the two monitors created in setUp.
        expected = {
            'process_neutron-server_outage_time': 10,
            'openstack-router-list_outage_time': 10,
        }
        collected = {}
        self.MonitorMgr.store_result(collected)
        self.assertDictEqual(collected, expected)
68
69
class BaseMonitorTestCase(unittest.TestCase):
    """Tests for basemonitor.BaseMonitor, mostly via a minimal subclass."""

    class MonitorSimple(basemonitor.BaseMonitor):
        """Monitor stub whose monitor_func returns the flag set by setup."""
        __monitor_type__ = "MonitorForTest"

        def setup(self):
            self.monitor_result = False

        def monitor_func(self):
            return self.monitor_result

    def setUp(self):
        """Prepare the monitor configuration shared by every test."""
        self.monitor_cfg = dict(
            monitor_type='MonitorForTest',
            command_name='nova image-list',
            monitor_time=0.01,
            sla={'max_outage_time': 5},
        )

    def test__basemonitor_start_wait_successful(self):
        # Start and wait on a plain BaseMonitor; passes if neither call raises.
        monitor = basemonitor.BaseMonitor(
            self.monitor_cfg, None, {"nova-api": 10})
        monitor.start_monitor()
        monitor.wait_monitor()

    def test__basemonitor_all_successful(self):
        # Run the stub monitor in the calling process (run() invoked directly).
        monitor = self.MonitorSimple(
            self.monitor_cfg, None, {"nova-api": 10})
        monitor.setup()
        monitor.run()
        monitor.verify_SLA()

    @mock.patch.object(basemonitor, 'multiprocessing')
    def test__basemonitor_func_false(self, mock_multiprocess):
        # Same flow as above, but with basemonitor's multiprocessing patched
        # and the mocked Event reporting is_set() as False.
        monitor = self.MonitorSimple(
            self.monitor_cfg, None, {"nova-api": 10})
        monitor.setup()
        mocked_event = mock_multiprocess.Event()
        mocked_event.is_set.return_value = False
        monitor.run()
        monitor.verify_SLA()

    # TODO(elfoley): fix this test to not throw an error
    def test__basemonitor_getmonitorcls_successfule(self):
        # Any exception from the lookup is treated the same as a None result.
        try:
            found = basemonitor.BaseMonitor.get_monitor_cls(self.monitor_cfg)
        except Exception:  # pylint: disable=broad-except
            found = None
        self.assertIsNone(found)
116
117
# Run the test cases in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()