-
-from datetime import datetime
-from threading import Event
-import json
-import errno
-
-from mgr_module import MgrModule
-
-try:
- from influxdb import InfluxDBClient
- from influxdb.exceptions import InfluxDBClientError
-except ImportError:
- InfluxDBClient = None
-
-class Module(MgrModule):
- COMMANDS = [
- {
- "cmd": "influx self-test",
- "desc": "debug the module",
- "perm": "rw"
- },
- ]
-
-
- def __init__(self, *args, **kwargs):
- super(Module, self).__init__(*args, **kwargs)
- self.event = Event()
- self.run = True
-
-
- def get_latest(self, daemon_type, daemon_name, stat):
- data = self.get_counter(daemon_type, daemon_name, stat)[stat]
- if data:
- return data[-1][1]
- else:
- return 0
-
-
- def get_df_stats(self):
- df = self.get("df")
- data = []
-
- df_types = [
- 'bytes_used',
- 'dirty',
- 'rd_bytes',
- 'raw_bytes_used',
- 'wr_bytes',
- 'objects',
- 'max_avail'
- ]
-
- for df_type in df_types:
- for pool in df['pools']:
- point = {
- "measurement": "ceph_pool_stats",
- "tags": {
- "pool_name" : pool['name'],
- "pool_id" : pool['id'],
- "type_instance" : df_type,
- "mgr_id" : self.get_mgr_id(),
- },
- "time" : datetime.utcnow().isoformat() + 'Z',
- "fields": {
- "value" : pool['stats'][df_type],
- }
- }
- data.append(point)
- return data
-
- def get_daemon_stats(self):
- data = []
-
- for daemon, counters in self.get_all_perf_counters().iteritems():
- svc_type, svc_id = daemon.split(".")
- metadata = self.get_metadata(svc_type, svc_id)
-
- for path, counter_info in counters.items():
- if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
- continue
-
- value = counter_info['value']
-
- data.append({
- "measurement": "ceph_daemon_stats",
- "tags": {
- "ceph_daemon": daemon,
- "type_instance": path,
- "host": metadata['hostname']
- },
- "time": datetime.utcnow().isoformat() + 'Z',
- "fields": {
- "value": value
- }
- })
-
- return data
-
- def send_to_influx(self):
- host = self.get_config("hostname")
- if not host:
- self.log.error("No InfluxDB server configured, please set"
- "`hostname` configuration key.")
- return
-
- port = int(self.get_config("port", default="8086"))
- database = self.get_config("database", default="ceph")
-
- # If influx server has authentication turned off then
- # missing username/password is valid.
- username = self.get_config("username", default="")
- password = self.get_config("password", default="")
-
- client = InfluxDBClient(host, port, username, password, database)
-
- # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
- try:
- client.write_points(self.get_df_stats(), 'ms')
- client.write_points(self.get_daemon_stats(), 'ms')
- except InfluxDBClientError as e:
- if e.code == 404:
- self.log.info("Database '{0}' not found, trying to create (requires admin privs). You can also create manually and grant write privs to user '{1}'".format(database,username))
- client.create_database(database)
- else:
- raise
-
- def shutdown(self):
- self.log.info('Stopping influx module')
- self.run = False
- self.event.set()
-
- def handle_command(self, cmd):
- if cmd['prefix'] == 'influx self-test':
- daemon_stats = self.get_daemon_stats()
- assert len(daemon_stats)
- df_stats = self.get_df_stats()
- result = {
- 'daemon_stats': daemon_stats,
- 'df_stats': df_stats
- }
- return 0, json.dumps(result, indent=2), 'Self-test OK'
- else:
- return (-errno.EINVAL, '',
- "Command not found '{0}'".format(cmd['prefix']))
-
- def serve(self):
- if InfluxDBClient is None:
- self.log.error("Cannot transmit statistics: influxdb python "
- "module not found. Did you install it?")
- return
-
- self.log.info('Starting influx module')
- self.run = True
- while self.run:
- self.send_to_influx()
- self.log.debug("Running interval loop")
- interval = self.get_config("interval")
- if interval is None:
- interval = 5
- self.log.debug("sleeping for %d seconds",interval)
- self.event.wait(interval)
-