Fix some bugs when testing opensds ansible
[stor4nfv.git] / src / ceph / src / pybind / mgr / influx / module.py
1
2 from datetime import datetime
3 from threading import Event
4 import json
5 import errno
6
7 from mgr_module import MgrModule
8
9 try:
10     from influxdb import InfluxDBClient
11     from influxdb.exceptions import InfluxDBClientError
12 except ImportError:
13     InfluxDBClient = None
14
15 class Module(MgrModule):
16     COMMANDS = [
17         {
18             "cmd": "influx self-test",
19             "desc": "debug the module",
20             "perm": "rw"  
21         },
22     ]
23
24
25     def __init__(self, *args, **kwargs):
26         super(Module, self).__init__(*args, **kwargs)
27         self.event = Event()
28         self.run = True 
29
30
31     def get_latest(self, daemon_type, daemon_name, stat):
32         data = self.get_counter(daemon_type, daemon_name, stat)[stat]
33         if data:
34             return data[-1][1]
35         else:
36             return 0
37
38
39     def get_df_stats(self):
40         df = self.get("df")
41         data = []
42
43         df_types = [
44             'bytes_used',
45             'dirty',
46             'rd_bytes',
47             'raw_bytes_used',
48             'wr_bytes',
49             'objects',
50             'max_avail'
51         ]
52
53         for df_type in df_types:
54             for pool in df['pools']:
55                 point = {
56                     "measurement": "ceph_pool_stats",
57                     "tags": {
58                         "pool_name" : pool['name'],
59                         "pool_id" : pool['id'],
60                         "type_instance" : df_type,
61                         "mgr_id" : self.get_mgr_id(),
62                     },
63                         "time" : datetime.utcnow().isoformat() + 'Z',
64                         "fields": {
65                             "value" : pool['stats'][df_type],
66                         }
67                 }
68                 data.append(point)
69         return data
70
71     def get_daemon_stats(self):
72         data = []
73
74         for daemon, counters in self.get_all_perf_counters().iteritems():
75             svc_type, svc_id = daemon.split(".")
76             metadata = self.get_metadata(svc_type, svc_id)
77
78             for path, counter_info in counters.items():
79                 if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
80                     continue
81
82                 value = counter_info['value']
83
84                 data.append({
85                     "measurement": "ceph_daemon_stats",
86                     "tags": {
87                         "ceph_daemon": daemon,
88                         "type_instance": path,
89                         "host": metadata['hostname']
90                     },
91                     "time": datetime.utcnow().isoformat() + 'Z',
92                     "fields": {
93                         "value": value
94                     }
95                 })
96
97         return data
98
99     def send_to_influx(self):
100         host = self.get_config("hostname")
101         if not host:
102             self.log.error("No InfluxDB server configured, please set"
103                            "`hostname` configuration key.")
104             return
105
106         port = int(self.get_config("port", default="8086"))
107         database = self.get_config("database", default="ceph")
108
109         # If influx server has authentication turned off then
110         # missing username/password is valid.
111         username = self.get_config("username", default="")
112         password = self.get_config("password", default="")
113
114         client = InfluxDBClient(host, port, username, password, database)
115
116         # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
117         try:
118             client.write_points(self.get_df_stats(), 'ms')
119             client.write_points(self.get_daemon_stats(), 'ms')
120         except InfluxDBClientError as e:
121             if e.code == 404:
122                 self.log.info("Database '{0}' not found, trying to create (requires admin privs).  You can also create manually and grant write privs to user '{1}'".format(database,username))
123                 client.create_database(database)
124             else:
125                 raise
126
127     def shutdown(self):
128         self.log.info('Stopping influx module')
129         self.run = False
130         self.event.set()
131
132     def handle_command(self, cmd):
133         if cmd['prefix'] == 'influx self-test':
134             daemon_stats = self.get_daemon_stats()
135             assert len(daemon_stats)
136             df_stats = self.get_df_stats()
137             result = {
138                 'daemon_stats': daemon_stats,
139                 'df_stats': df_stats
140             }
141             return 0, json.dumps(result, indent=2), 'Self-test OK'
142         else:
143             return (-errno.EINVAL, '',
144                     "Command not found '{0}'".format(cmd['prefix']))
145
146     def serve(self):
147         if InfluxDBClient is None:
148             self.log.error("Cannot transmit statistics: influxdb python "
149                            "module not found.  Did you install it?")
150             return
151
152         self.log.info('Starting influx module')
153         self.run = True
154         while self.run:
155             self.send_to_influx()
156             self.log.debug("Running interval loop")
157             interval = self.get_config("interval")
158             if interval is None:
159                 interval = 5
160             self.log.debug("sleeping for %d seconds",interval)
161             self.event.wait(interval)
162