X-Git-Url: https://gerrit.opnfv.org/gerrit/gitweb?a=blobdiff_plain;f=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Finflux%2Fmodule.py;fp=src%2Fceph%2Fsrc%2Fpybind%2Fmgr%2Finflux%2Fmodule.py;h=adeb452701d81394df5fc2a5e540de4a98f7d0d5;hb=812ff6ca9fcd3e629e49d4328905f33eee8ca3f5;hp=0000000000000000000000000000000000000000;hpb=15280273faafb77777eab341909a3f495cf248d9;p=stor4nfv.git diff --git a/src/ceph/src/pybind/mgr/influx/module.py b/src/ceph/src/pybind/mgr/influx/module.py new file mode 100644 index 0000000..adeb452 --- /dev/null +++ b/src/ceph/src/pybind/mgr/influx/module.py @@ -0,0 +1,162 @@ + +from datetime import datetime +from threading import Event +import json +import errno + +from mgr_module import MgrModule + +try: + from influxdb import InfluxDBClient + from influxdb.exceptions import InfluxDBClientError +except ImportError: + InfluxDBClient = None + +class Module(MgrModule): + COMMANDS = [ + { + "cmd": "influx self-test", + "desc": "debug the module", + "perm": "rw" + }, + ] + + + def __init__(self, *args, **kwargs): + super(Module, self).__init__(*args, **kwargs) + self.event = Event() + self.run = True + + + def get_latest(self, daemon_type, daemon_name, stat): + data = self.get_counter(daemon_type, daemon_name, stat)[stat] + if data: + return data[-1][1] + else: + return 0 + + + def get_df_stats(self): + df = self.get("df") + data = [] + + df_types = [ + 'bytes_used', + 'dirty', + 'rd_bytes', + 'raw_bytes_used', + 'wr_bytes', + 'objects', + 'max_avail' + ] + + for df_type in df_types: + for pool in df['pools']: + point = { + "measurement": "ceph_pool_stats", + "tags": { + "pool_name" : pool['name'], + "pool_id" : pool['id'], + "type_instance" : df_type, + "mgr_id" : self.get_mgr_id(), + }, + "time" : datetime.utcnow().isoformat() + 'Z', + "fields": { + "value" : pool['stats'][df_type], + } + } + data.append(point) + return data + + def get_daemon_stats(self): + data = [] + + for daemon, counters in self.get_all_perf_counters().iteritems(): + svc_type, svc_id = daemon.split(".") + metadata = self.get_metadata(svc_type, svc_id) + + for path, counter_info in counters.items(): + if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM: + continue + + value = counter_info['value'] + + data.append({ + "measurement": "ceph_daemon_stats", + "tags": { + "ceph_daemon": daemon, + "type_instance": path, + "host": metadata['hostname'] + }, + "time": datetime.utcnow().isoformat() + 'Z', + "fields": { + "value": value + } + }) + + return data + + def send_to_influx(self): + host = self.get_config("hostname") + if not host: + self.log.error("No InfluxDB server configured, please set" + "`hostname` configuration key.") + return + + port = int(self.get_config("port", default="8086")) + database = self.get_config("database", default="ceph") + + # If influx server has authentication turned off then + # missing username/password is valid. + username = self.get_config("username", default="") + password = self.get_config("password", default="") + + client = InfluxDBClient(host, port, username, password, database) + + # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created + try: + client.write_points(self.get_df_stats(), 'ms') + client.write_points(self.get_daemon_stats(), 'ms') + except InfluxDBClientError as e: + if e.code == 404: + self.log.info("Database '{0}' not found, trying to create (requires admin privs). You can also create manually and grant write privs to user '{1}'".format(database,username)) + client.create_database(database) + else: + raise + + def shutdown(self): + self.log.info('Stopping influx module') + self.run = False + self.event.set() + + def handle_command(self, cmd): + if cmd['prefix'] == 'influx self-test': + daemon_stats = self.get_daemon_stats() + assert len(daemon_stats) + df_stats = self.get_df_stats() + result = { + 'daemon_stats': daemon_stats, + 'df_stats': df_stats + } + return 0, json.dumps(result, indent=2), 'Self-test OK' + else: + return (-errno.EINVAL, '', + "Command not found '{0}'".format(cmd['prefix'])) + + def serve(self): + if InfluxDBClient is None: + self.log.error("Cannot transmit statistics: influxdb python " + "module not found. Did you install it?") + return + + self.log.info('Starting influx module') + self.run = True + while self.run: + self.send_to_influx() + self.log.debug("Running interval loop") + interval = self.get_config("interval") + if interval is None: + interval = 5 + self.log.debug("sleeping for %d seconds",interval) + self.event.wait(interval) +