initial code repo
[stor4nfv.git] / src / ceph / src / pybind / mgr / influx / module.py
diff --git a/src/ceph/src/pybind/mgr/influx/module.py b/src/ceph/src/pybind/mgr/influx/module.py
new file mode 100644 (file)
index 0000000..adeb452
--- /dev/null
@@ -0,0 +1,162 @@
+
+from datetime import datetime
+from threading import Event
+import json
+import errno
+
+from mgr_module import MgrModule
+
+try:
+    from influxdb import InfluxDBClient
+    from influxdb.exceptions import InfluxDBClientError
+except ImportError:
+    InfluxDBClient = None
+
+class Module(MgrModule):
+    COMMANDS = [
+        {
+            "cmd": "influx self-test",
+            "desc": "debug the module",
+            "perm": "rw"  
+        },
+    ]
+
+
+    def __init__(self, *args, **kwargs):
+        super(Module, self).__init__(*args, **kwargs)
+        self.event = Event()
+        self.run = True 
+
+
+    def get_latest(self, daemon_type, daemon_name, stat):
+        data = self.get_counter(daemon_type, daemon_name, stat)[stat]
+        if data:
+            return data[-1][1]
+        else:
+            return 0
+
+
+    def get_df_stats(self):
+        df = self.get("df")
+        data = []
+
+        df_types = [
+            'bytes_used',
+            'dirty',
+            'rd_bytes',
+            'raw_bytes_used',
+            'wr_bytes',
+            'objects',
+            'max_avail'
+        ]
+
+        for df_type in df_types:
+            for pool in df['pools']:
+                point = {
+                    "measurement": "ceph_pool_stats",
+                    "tags": {
+                        "pool_name" : pool['name'],
+                        "pool_id" : pool['id'],
+                        "type_instance" : df_type,
+                        "mgr_id" : self.get_mgr_id(),
+                    },
+                        "time" : datetime.utcnow().isoformat() + 'Z',
+                        "fields": {
+                            "value" : pool['stats'][df_type],
+                        }
+                }
+                data.append(point)
+        return data
+
+    def get_daemon_stats(self):
+        data = []
+
+        for daemon, counters in self.get_all_perf_counters().iteritems():
+            svc_type, svc_id = daemon.split(".")
+            metadata = self.get_metadata(svc_type, svc_id)
+
+            for path, counter_info in counters.items():
+                if counter_info['type'] & self.PERFCOUNTER_HISTOGRAM:
+                    continue
+
+                value = counter_info['value']
+
+                data.append({
+                    "measurement": "ceph_daemon_stats",
+                    "tags": {
+                        "ceph_daemon": daemon,
+                        "type_instance": path,
+                        "host": metadata['hostname']
+                    },
+                    "time": datetime.utcnow().isoformat() + 'Z',
+                    "fields": {
+                        "value": value
+                    }
+                })
+
+        return data
+
+    def send_to_influx(self):
+        host = self.get_config("hostname")
+        if not host:
+            self.log.error("No InfluxDB server configured, please set"
+                           "`hostname` configuration key.")
+            return
+
+        port = int(self.get_config("port", default="8086"))
+        database = self.get_config("database", default="ceph")
+
+        # If influx server has authentication turned off then
+        # missing username/password is valid.
+        username = self.get_config("username", default="")
+        password = self.get_config("password", default="")
+
+        client = InfluxDBClient(host, port, username, password, database)
+
+        # using influx client get_list_database requires admin privs, instead we'll catch the not found exception and inform the user if db can't be created
+        try:
+            client.write_points(self.get_df_stats(), 'ms')
+            client.write_points(self.get_daemon_stats(), 'ms')
+        except InfluxDBClientError as e:
+            if e.code == 404:
+                self.log.info("Database '{0}' not found, trying to create (requires admin privs).  You can also create manually and grant write privs to user '{1}'".format(database,username))
+                client.create_database(database)
+            else:
+                raise
+
+    def shutdown(self):
+        self.log.info('Stopping influx module')
+        self.run = False
+        self.event.set()
+
+    def handle_command(self, cmd):
+        if cmd['prefix'] == 'influx self-test':
+            daemon_stats = self.get_daemon_stats()
+            assert len(daemon_stats)
+            df_stats = self.get_df_stats()
+            result = {
+                'daemon_stats': daemon_stats,
+                'df_stats': df_stats
+            }
+            return 0, json.dumps(result, indent=2), 'Self-test OK'
+        else:
+            return (-errno.EINVAL, '',
+                    "Command not found '{0}'".format(cmd['prefix']))
+
+    def serve(self):
+        if InfluxDBClient is None:
+            self.log.error("Cannot transmit statistics: influxdb python "
+                           "module not found.  Did you install it?")
+            return
+
+        self.log.info('Starting influx module')
+        self.run = True
+        while self.run:
+            self.send_to_influx()
+            self.log.debug("Running interval loop")
+            interval = self.get_config("interval")
+            if interval is None:
+                interval = 5
+            self.log.debug("sleeping for %d seconds",interval)
+            self.event.wait(interval)
+