Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / mgr / zabbix / module.py
1 """
2 Zabbix module for ceph-mgr
3
4 Collect statistics from Ceph cluster and every X seconds send data to a Zabbix
5 server using the zabbix_sender executable.
6 """
7 import json
8 import errno
9 from subprocess import Popen, PIPE
10 from threading import Event
11 from mgr_module import MgrModule
12
13
14 def avg(data):
15     if len(data):
16         return sum(data) / float(len(data))
17     else:
18         return 0
19
20
21 class ZabbixSender(object):
22     def __init__(self, sender, host, port, log):
23         self.sender = sender
24         self.host = host
25         self.port = port
26         self.log = log
27
28     def send(self, hostname, data):
29         if len(data) == 0:
30             return
31
32         cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s',
33                hostname, '-vv', '-i', '-']
34
35         proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)
36
37         for key, value in data.items():
38             proc.stdin.write('{0} ceph.{1} {2}\n'.format(hostname, key, value))
39
40         stdout, stderr = proc.communicate()
41         if proc.returncode != 0:
42             raise RuntimeError('%s exited non-zero: %s' % (self.sender,
43                                                            stderr))
44
45         self.log.debug('Zabbix Sender: %s', stdout.rstrip())
46
47
48 class Module(MgrModule):
49     run = False
50     config = dict()
51     ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2}
52
53     config_keys = {
54         'zabbix_sender': '/usr/bin/zabbix_sender',
55         'zabbix_host': None,
56         'zabbix_port': 10051,
57         'identifier': None, 'interval': 60
58     }
59
60     COMMANDS = [
61         {
62             "cmd": "zabbix config-set name=key,type=CephString "
63                    "name=value,type=CephString",
64             "desc": "Set a configuration value",
65             "perm": "rw"
66         },
67         {
68             "cmd": "zabbix config-show",
69             "desc": "Show current configuration",
70             "perm": "r"
71         },
72         {
73             "cmd": "zabbix send",
74             "desc": "Force sending data to Zabbux",
75             "perm": "rw"
76         },
77         {
78             "cmd": "zabbix self-test",
79             "desc": "Run a self-test on the Zabbix module",
80             "perm": "r"
81         }
82     ]
83
84     def __init__(self, *args, **kwargs):
85         super(Module, self).__init__(*args, **kwargs)
86         self.event = Event()
87
88     def init_module_config(self):
89         for key, default in self.config_keys.items():
90             value = self.get_localized_config(key, default)
91             if value is None:
92                 raise RuntimeError('Configuration key {0} not set; "ceph '
93                                    'config-key set mgr/zabbix/{0} '
94                                    '<value>"'.format(key))
95
96             self.set_config_option(key, value)
97
98     def set_config_option(self, option, value):
99         if option not in self.config_keys.keys():
100             raise RuntimeError('{0} is a unknown configuration '
101                                'option'.format(option))
102
103         if option in ['zabbix_port', 'interval']:
104             try:
105                 value = int(value)
106             except (ValueError, TypeError):
107                 raise RuntimeError('invalid {0} configured. Please specify '
108                                    'a valid integer'.format(option))
109
110         if option == 'interval' and value < 10:
111             raise RuntimeError('interval should be set to at least 10 seconds')
112
113         self.config[option] = value
114
115     def get_data(self):
116         data = dict()
117
118         health = json.loads(self.get('health')['json'])
119         # 'status' is luminous+, 'overall_status' is legacy mode.
120         data['overall_status'] = health.get('status',
121                                             health.get('overall_status'))
122         data['overall_status_int'] = \
123             self.ceph_health_mapping.get(data['overall_status'])
124
125         mon_status = json.loads(self.get('mon_status')['json'])
126         data['num_mon'] = len(mon_status['monmap']['mons'])
127
128         df = self.get('df')
129         data['num_pools'] = len(df['pools'])
130         data['total_objects'] = df['stats']['total_objects']
131         data['total_used_bytes'] = df['stats']['total_used_bytes']
132         data['total_bytes'] = df['stats']['total_bytes']
133         data['total_avail_bytes'] = df['stats']['total_avail_bytes']
134
135         wr_ops = 0
136         rd_ops = 0
137         wr_bytes = 0
138         rd_bytes = 0
139
140         for pool in df['pools']:
141             wr_ops += pool['stats']['wr']
142             rd_ops += pool['stats']['rd']
143             wr_bytes += pool['stats']['wr_bytes']
144             rd_bytes += pool['stats']['rd_bytes']
145
146         data['wr_ops'] = wr_ops
147         data['rd_ops'] = rd_ops
148         data['wr_bytes'] = wr_bytes
149         data['rd_bytes'] = rd_bytes
150
151         osd_map = self.get('osd_map')
152         data['num_osd'] = len(osd_map['osds'])
153         data['osd_nearfull_ratio'] = osd_map['nearfull_ratio']
154         data['osd_full_ratio'] = osd_map['full_ratio']
155         data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio']
156
157         data['num_pg_temp'] = len(osd_map['pg_temp'])
158
159         num_up = 0
160         num_in = 0
161         for osd in osd_map['osds']:
162             if osd['up'] == 1:
163                 num_up += 1
164
165             if osd['in'] == 1:
166                 num_in += 1
167
168         data['num_osd_up'] = num_up
169         data['num_osd_in'] = num_in
170
171         osd_fill = list()
172         osd_apply_latency = list()
173         osd_commit_latency = list()
174
175         osd_stats = self.get('osd_stats')
176         for osd in osd_stats['osd_stats']:
177             if osd['kb'] == 0:
178                 continue
179             osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100)
180             osd_apply_latency.append(osd['perf_stat']['apply_latency_ms'])
181             osd_commit_latency.append(osd['perf_stat']['commit_latency_ms'])
182
183         try:
184             data['osd_max_fill'] = max(osd_fill)
185             data['osd_min_fill'] = min(osd_fill)
186             data['osd_avg_fill'] = avg(osd_fill)
187         except ValueError:
188             pass
189
190         try:
191             data['osd_latency_apply_max'] = max(osd_apply_latency)
192             data['osd_latency_apply_min'] = min(osd_apply_latency)
193             data['osd_latency_apply_avg'] = avg(osd_apply_latency)
194
195             data['osd_latency_commit_max'] = max(osd_commit_latency)
196             data['osd_latency_commit_min'] = min(osd_commit_latency)
197             data['osd_latency_commit_avg'] = avg(osd_commit_latency)
198         except ValueError:
199             pass
200
201         pg_summary = self.get('pg_summary')
202         num_pg = 0
203         for state, num in pg_summary['all'].items():
204             num_pg += num
205
206         data['num_pg'] = num_pg
207
208         return data
209
210     def send(self):
211         data = self.get_data()
212
213         self.log.debug('Sending data to Zabbix server %s',
214                        self.config['zabbix_host'])
215         self.log.debug(data)
216
217         try:
218             zabbix = ZabbixSender(self.config['zabbix_sender'],
219                                   self.config['zabbix_host'],
220                                   self.config['zabbix_port'], self.log)
221             zabbix.send(self.config['identifier'], data)
222         except Exception as exc:
223             self.log.error('Exception when sending: %s', exc)
224
225     def handle_command(self, command):
226         if command['prefix'] == 'zabbix config-show':
227             return 0, json.dumps(self.config), ''
228         elif command['prefix'] == 'zabbix config-set':
229             key = command['key']
230             value = command['value']
231             if not value:
232                 return -errno.EINVAL, '', 'Value should not be empty or None'
233
234             self.log.debug('Setting configuration option %s to %s', key, value)
235             self.set_config_option(key, value)
236             self.set_localized_config(key, value)
237             return 0, 'Configuration option {0} updated'.format(key), ''
238         elif command['prefix'] == 'zabbix send':
239             self.send()
240             return 0, 'Sending data to Zabbix', ''
241         elif command['prefix'] == 'zabbix self-test':
242             self.self_test()
243             return 0, 'Self-test succeeded', ''
244         else:
245             return (-errno.EINVAL, '',
246                     "Command not found '{0}'".format(command['prefix']))
247
248     def shutdown(self):
249         self.log.info('Stopping zabbix')
250         self.run = False
251         self.event.set()
252
253     def serve(self):
254         self.log.debug('Zabbix module starting up')
255         self.run = True
256
257         self.init_module_config()
258
259         for key, value in self.config.items():
260             self.log.debug('%s: %s', key, value)
261
262         while self.run:
263             self.log.debug('Waking up for new iteration')
264
265             try:
266                 self.send()
267             except Exception as exc:
268                 # Shouldn't happen, but let's log it and retry next interval,
269                 # rather than dying completely.
270                 self.log.exception("Unexpected error during send():")
271
272             interval = self.config['interval']
273             self.log.debug('Sleeping for %d seconds', interval)
274             self.event.wait(interval)
275
276     def self_test(self):
277         data = self.get_data()
278
279         if data['overall_status'] not in self.ceph_health_mapping:
280             raise RuntimeError('No valid overall_status found in data')
281
282         int(data['overall_status_int'])
283
284         if data['num_mon'] < 1:
285             raise RuntimeError('num_mon is smaller than 1')