--- /dev/null
+
+from datetime import datetime
+from threading import Event
+import json
+import errno
+
+from mgr_module import MgrModule
+
+try:
+ from influxdb import InfluxDBClient
+ from influxdb.exceptions import InfluxDBClientError
+except ImportError:
+ InfluxDBClient = None
+
+class Module(MgrModule):
+ COMMANDS = [
+ {
+ "cmd": "influx self-test",
+ "desc": "debug the module",
+ "perm": "rw"
+ },
+ ]
+
+
+ def __init__(self, *args, **kwargs):
+ super(Module, self).__init__(*args, **kwargs)
+ self.event = Event()
+ self.run = True
+
+
+ def get_latest(self, daemon_type, daemon_name, stat):
+ data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+ if data:
+ return data[-1][1]
+ else:
+ return 0
+
+
+ def get_df_stats(self):
+ df = self.get("df")
+ data = []
+
+ df_types = [
+ 'bytes_used',
+ 'dirty',
+ 'rd_bytes',
+ 'raw_bytes_used',
+ 'wr_bytes',
+ 'objects',
+ 'max_avail'
+ ]
+
+ for df_type in df_types:
+ for pool in df['pools']:
+ point = {
+ "measurement": "ceph_pool_stats",
+ "tags": {
+ "pool_name" : pool['name'],
+ "pool_id" : pool['id'],
+ "type_instance" : df_type,
+ "mgr_id" : self.get_mgr_id(),
+ },
+ "time" : datetime.utcnow().isoformat() + 'Z',
+ "fields": {
+ "value" : pool['stats'][df_type],
+ }
+ }
+ data.append(point)
+ return data
+
+ def get_daemon_stats(self):
+ data = []
+
+ for daemon, counters in self.get_all_perf_counters().iteritems():
+ svc_type, svc_id = daemon.split(".")
+ metadata = self.get_metadata(svc_type, svc_id)
+
+ for path, counter_info in counters.items():
+ if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
+ continue
+
+ value = counter_info['value']
+
+ data.append({
+ "measurement": "ceph_daemon_stats",
+ "tags": {
+ "ceph_daemon": daemon,
+ "type_instance": path,
+ "host": metadata['hostname']
+ },
+ "time": datetime.utcnow().isoformat() + 'Z',
+ "fields": {
+ "value": value
+ }
+ })
+
+ return data
+
+ def send_to_influx(self):
+ host = self.get_config("hostname")
+ if not host:
+ self.log.error("No InfluxDB server configured, please set"
+ "`hostname` configuration key.")
+ return
+
+ port = int(self.get_config("port", default="8086"))
+ database = self.get_config("database", default="ceph")
+
+ # If influx server has authentication turned off then
+ # missing username/password is valid.
+ username = self.get_config("username", default="")
+ password = self.get_config("password", default="")
+
+ client = InfluxDBClient(host, port, username, password, database)
+
+ # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
+ try:
+ client.write_points(self.get_df_stats(), 'ms')
+ client.write_points(self.get_daemon_stats(), 'ms')
+ except InfluxDBClientError as e:
+ if e.code == 404:
+ self.log.info("Database '{0}' not found, trying to create (requires admin privs). You can also create manually and grant write privs to user '{1}'".format(database,username))
+ client.create_database(database)
+ else:
+ raise
+
+ def shutdown(self):
+ self.log.info('Stopping influx module')
+ self.run = False
+ self.event.set()
+
+ def handle_command(self, cmd):
+ if cmd['prefix'] == 'influx self-test':
+ daemon_stats = self.get_daemon_stats()
+ assert len(daemon_stats)
+ df_stats = self.get_df_stats()
+ result = {
+ 'daemon_stats': daemon_stats,
+ 'df_stats': df_stats
+ }
+ return 0, json.dumps(result, indent=2), 'Self-test OK'
+ else:
+ return (-errno.EINVAL, '',
+ "Command not found '{0}'".format(cmd['prefix']))
+
+ def serve(self):
+ if InfluxDBClient is None:
+ self.log.error("Cannot transmit statistics: influxdb python "
+ "module not found. Did you install it?")
+ return
+
+ self.log.info('Starting influx module')
+ self.run = True
+ while self.run:
+ self.send_to_influx()
+ self.log.debug("Running interval loop")
+ interval = self.get_config("interval")
+ if interval is None:
+ interval = 5
+ self.log.debug("sleeping for %d seconds",interval)
+ self.event.wait(interval)
+