2 from datetime import datetime
3 from threading import Event
7 from mgr_module import MgrModule
10 from influxdb import InfluxDBClient
11 from influxdb.exceptions import InfluxDBClientError
15 class Module(MgrModule):
18 "cmd": "influx self-test",
19 "desc": "debug the module",
25 def __init__(self, *args, **kwargs):
26 super(Module, self).__init__(*args, **kwargs)
31 def get_latest(self, daemon_type, daemon_name, stat):
32 data = self.get_counter(daemon_type, daemon_name, stat)[stat]
39 def get_df_stats(self):
53 for df_type in df_types:
54 for pool in df['pools']:
56 "measurement": "ceph_pool_stats",
58 "pool_name" : pool['name'],
59 "pool_id" : pool['id'],
60 "type_instance" : df_type,
61 "mgr_id" : self.get_mgr_id(),
63 "time" : datetime.utcnow().isoformat() + 'Z',
65 "value" : pool['stats'][df_type],
71 def get_daemon_stats(self):
74 for daemon, counters in self.get_all_perf_counters().iteritems():
75 svc_type, svc_id = daemon.split(".")
76 metadata = self.get_metadata(svc_type, svc_id)
78 for path, counter_info in counters.items():
79 if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
82 value = counter_info['value']
85 "measurement": "ceph_daemon_stats",
87 "ceph_daemon": daemon,
88 "type_instance": path,
89 "host": metadata['hostname']
91 "time": datetime.utcnow().isoformat() + 'Z',
99 def send_to_influx(self):
100 host = self.get_config("hostname")
102 self.log.error("No InfluxDB server configured, please set"
103 "`hostname` configuration key.")
106 port = int(self.get_config("port", default="8086"))
107 database = self.get_config("database", default="ceph")
109 # If influx server has authentication turned off then
110 # missing username/password is valid.
111 username = self.get_config("username", default="")
112 password = self.get_config("password", default="")
114 client = InfluxDBClient(host, port, username, password, database)
116 # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
118 client.write_points(self.get_df_stats(), 'ms')
119 client.write_points(self.get_daemon_stats(), 'ms')
120 except InfluxDBClientError as e:
122 self.log.info("Database '{0}' not found, trying to create (requires admin privs). You can also create manually and grant write privs to user '{1}'".format(database,username))
123 client.create_database(database)
128 self.log.info('Stopping influx module')
132 def handle_command(self, cmd):
133 if cmd['prefix'] == 'influx self-test':
134 daemon_stats = self.get_daemon_stats()
135 assert len(daemon_stats)
136 df_stats = self.get_df_stats()
138 'daemon_stats': daemon_stats,
141 return 0, json.dumps(result, indent=2), 'Self-test OK'
143 return (-errno.EINVAL, '',
144 "Command not found '{0}'".format(cmd['prefix']))
147 if InfluxDBClient is None:
148 self.log.error("Cannot transmit statistics: influxdb python "
149 "module not found. Did you install it?")
152 self.log.info('Starting influx module')
155 self.send_to_influx()
156 self.log.debug("Running interval loop")
157 interval = self.get_config("interval")
160 self.log.debug("sleeping for %d seconds",interval)
161 self.event.wait(interval)