X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Fzabbix%2Fmodule.py;fp=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Fzabbix%2Fmodule.py;h=5bb871afe58d41db6906288b5e5229ff592384fa;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/pybind/mgr/zabbix/module.py b/src/ceph/src/pybind/mgr/zabbix/module.py new file mode 100644 index 0000000..5bb871a --- /dev/null +++ b/src/ceph/src/pybind/mgr/zabbix/module.py @@ -0,0 +1,285 @@ +""" +Zabbix module for ceph-mgr + +Collect statistics from Ceph cluster and every X seconds send data to a Zabbix +server using the zabbix_sender executable. +""" +import json +import errno +from subprocess import Popen, PIPE +from threading import Event +from mgr_module import MgrModule + + +def avg(data): + if len(data): + return sum(data) / float(len(data)) + else: + return 0 + + +class ZabbixSender(object): + def __init__(self, sender, host, port, log): + self.sender = sender + self.host = host + self.port = port + self.log = log + + def send(self, hostname, data): + if len(data) == 0: + return + + cmd = [self.sender, '-z', self.host, '-p', str(self.port), '-s', + hostname, '-vv', '-i', '-'] + + proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE) + + for key, value in data.items(): + proc.stdin.write('{0} ceph.{1} {2}\n'.format(hostname, key, value)) + + stdout, stderr = proc.communicate() + if proc.returncode != 0: + raise RuntimeError('%s exited non-zero: %s' % (self.sender, + stderr)) + + self.log.debug('Zabbix Sender: %s', stdout.rstrip()) + + +class Module(MgrModule): + run = False + config = dict() + ceph_health_mapping = {'HEALTH_OK': 0, 'HEALTH_WARN': 1, 'HEALTH_ERR': 2} + + config_keys = { + 'zabbix_sender': '/usr/bin/zabbix_sender', + 'zabbix_host': None, + 'zabbix_port': 10051, + 'identifier': None, 'interval': 60 + } + + COMMANDS = [ + { + "cmd": "zabbix config-set name=key,type=CephString " + "name=value,type=CephString", + "desc": "Set a configuration value", + "perm": "rw" + }, + { + "cmd": "zabbix config-show", + "desc": "Show current configuration", + "perm": "r" + }, + { + "cmd": "zabbix send", + "desc": "Force sending data to Zabbux", + "perm": "rw" + }, + { + "cmd": "zabbix self-test", + "desc": "Run a self-test on the Zabbix module", + "perm": "r" + } + ] + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + + def init_module_config(self): + for key, default in self.config_keys.items(): + value = self.get_localized_config(key, default) + if value is None: + raise RuntimeError('Configuration key {0} not set; "ceph ' + 'config-key set mgr/zabbix/{0} ' + '"'.format(key)) + + self.set_config_option(key, value) + + def set_config_option(self, option, value): + if option not in self.config_keys.keys(): + raise RuntimeError('{0} is a unknown configuration ' + 'option'.format(option)) + + if option in ['zabbix_port', 'interval']: + try: + value = int(value) + except (ValueError, TypeError): + raise RuntimeError('invalid {0} configured. Please specify ' + 'a valid integer'.format(option)) + + if option == 'interval' and value < 10: + raise RuntimeError('interval should be set to at least 10 seconds') + + self.config[option] = value + + def get_data(self): + data = dict() + + health = json.loads(self.get('health')['json']) + # 'status' is luminous+, 'overall_status' is legacy mode. + data['overall_status'] = health.get('status', + health.get('overall_status')) + data['overall_status_int'] = \ + self.ceph_health_mapping.get(data['overall_status']) + + mon_status = json.loads(self.get('mon_status')['json']) + data['num_mon'] = len(mon_status['monmap']['mons']) + + df = self.get('df') + data['num_pools'] = len(df['pools']) + data['total_objects'] = df['stats']['total_objects'] + data['total_used_bytes'] = df['stats']['total_used_bytes'] + data['total_bytes'] = df['stats']['total_bytes'] + data['total_avail_bytes'] = df['stats']['total_avail_bytes'] + + wr_ops = 0 + rd_ops = 0 + wr_bytes = 0 + rd_bytes = 0 + + for pool in df['pools']: + wr_ops += pool['stats']['wr'] + rd_ops += pool['stats']['rd'] + wr_bytes += pool['stats']['wr_bytes'] + rd_bytes += pool['stats']['rd_bytes'] + + data['wr_ops'] = wr_ops + data['rd_ops'] = rd_ops + data['wr_bytes'] = wr_bytes + data['rd_bytes'] = rd_bytes + + osd_map = self.get('osd_map') + data['num_osd'] = len(osd_map['osds']) + data['osd_nearfull_ratio'] = osd_map['nearfull_ratio'] + data['osd_full_ratio'] = osd_map['full_ratio'] + data['osd_backfillfull_ratio'] = osd_map['backfillfull_ratio'] + + data['num_pg_temp'] = len(osd_map['pg_temp']) + + num_up = 0 + num_in = 0 + for osd in osd_map['osds']: + if osd['up'] == 1: + num_up += 1 + + if osd['in'] == 1: + num_in += 1 + + data['num_osd_up'] = num_up + data['num_osd_in'] = num_in + + osd_fill = list() + osd_apply_latency = list() + osd_commit_latency = list() + + osd_stats = self.get('osd_stats') + for osd in osd_stats['osd_stats']: + if osd['kb'] == 0: + continue + osd_fill.append((float(osd['kb_used']) / float(osd['kb'])) * 100) + osd_apply_latency.append(osd['perf_stat']['apply_latency_ms']) + osd_commit_latency.append(osd['perf_stat']['commit_latency_ms']) + + try: + data['osd_max_fill'] = max(osd_fill) + data['osd_min_fill'] = min(osd_fill) + data['osd_avg_fill'] = avg(osd_fill) + except ValueError: + pass + + try: + data['osd_latency_apply_max'] = max(osd_apply_latency) + data['osd_latency_apply_min'] = min(osd_apply_latency) + data['osd_latency_apply_avg'] = avg(osd_apply_latency) + + data['osd_latency_commit_max'] = max(osd_commit_latency) + data['osd_latency_commit_min'] = min(osd_commit_latency) + data['osd_latency_commit_avg'] = avg(osd_commit_latency) + except ValueError: + pass + + pg_summary = self.get('pg_summary') + num_pg = 0 + for state, num in pg_summary['all'].items(): + num_pg += num + + data['num_pg'] = num_pg + + return data + + def send(self): + data = self.get_data() + + self.log.debug('Sending data to Zabbix server %s', + self.config['zabbix_host']) + self.log.debug(data) + + try: + zabbix = ZabbixSender(self.config['zabbix_sender'], + self.config['zabbix_host'], + self.config['zabbix_port'], self.log) + zabbix.send(self.config['identifier'], data) + except Exception as exc: + self.log.error('Exception when sending: %s', exc) + + def handle_command(self, command): + if command['prefix'] == 'zabbix config-show': + return 0, json.dumps(self.config), '' + elif command['prefix'] == 'zabbix config-set': + key = command['key'] + value = command['value'] + if not value: + return -errno.EINVAL, '', 'Value should not be empty or None' + + self.log.debug('Setting configuration option %s to %s', key, value) + self.set_config_option(key, value) + self.set_localized_config(key, value) + return 0, 'Configuration option {0} updated'.format(key), '' + elif command['prefix'] == 'zabbix send': + self.send() + return 0, 'Sending data to Zabbix', '' + elif command['prefix'] == 'zabbix self-test': + self.self_test() + return 0, 'Self-test succeeded', '' + else: + return (-errno.EINVAL, '', + "Command not found '{0}'".format(command['prefix'])) + + def shutdown(self): + self.log.info('Stopping zabbix') + self.run = False + self.event.set() + + def serve(self): + self.log.debug('Zabbix module starting up') + self.run = True + + self.init_module_config() + + for key, value in self.config.items(): + self.log.debug('%s: %s', key, value) + + while self.run: + self.log.debug('Waking up for new iteration') + + try: + self.send() + except Exception as exc: + # Shouldn't happen, but let's log it and retry next interval, + # rather than dying completely. + self.log.exception("Unexpected error during send():") + + interval = self.config['interval'] + self.log.debug('Sleeping for %d seconds', interval) + self.event.wait(interval) + + def self_test(self): + data = self.get_data() + + if data['overall_status'] not in self.ceph_health_mapping: + raise RuntimeError('No valid overall_status found in data') + + int(data['overall_status_int']) + + if data['num_mon'] < 1: + raise RuntimeError('num_mon is smaller than 1')