Add Collectd as a Monitor Type
[doctor.git] / tests / lib / monitors / collectd / collectd_plugin.py
1 ##############################################################################
2 # Copyright (c) 2017 NEC Corporation and others.
3 #
4 # All rights reserved. This program and the accompanying materials
5 # are made available under the terms of the Apache License, Version 2.0
6 # which accompanies this distribution, and is available at
7 # http://www.apache.org/licenses/LICENSE-2.0
8 ##############################################################################
9
10 import collectd
11 import sys
12 from netifaces import interfaces, ifaddresses, AF_INET
13 from datetime import datetime
14 import json
15 import requests
16 import time
17 from requests.exceptions import ConnectionError
18
19 from keystoneauth1 import loading
20 from keystoneauth1 import session
21 from congressclient.v1 import client
22
23
def write_debug(str_write, write_type, compute_user):
    """Write a debug message to the monitor log of the given user.

    The log file is ``/home/<compute_user>/monitor.log``.

    :param str_write: message to write; formatted with ``"%s"``.
    :param write_type: mode passed to ``open()`` (callers in this file use
        ``"w"`` to truncate and ``"a"`` to append).
    :param compute_user: user name whose home directory holds the log file.
    :raises IOError: if the log file cannot be opened or written.
    """
    file_name = '/home/%s/monitor.log' % compute_user
    # The context manager closes the handle even when write() raises.
    with open(file_name, write_type) as log_file:
        log_file.write("%s" % str_write)
29
30
class DoctorMonitorCollectd(object):
    """collectd plugin that reports a host-down event to a Doctor inspector.

    The three callbacks (``config_func``, ``init_collectd`` and
    ``handle_notif``) are meant to be registered with collectd.  At most one
    notification is sent to the inspector during the lifetime of an instance.
    """

    # Maps each recognised (lower-cased) config key to the attribute it sets.
    _CONFIG_ATTRS = {
        'compute_host': 'host_name',
        'control_ip': 'control_ip',
        'compute_ip': 'compute_ip',
        'compute_user': 'compute_user',
        'inspector_type': 'inspector_type',
        'os_auth_url': 'os_auth_url',
        'os_username': 'os_username',
        'os_password': 'os_password',
        'os_project_name': 'os_project_name',
        'os_user_domain_name': 'os_user_domain_name',
        'os_user_domain_id': 'os_user_domain_id',
        'os_project_domain_name': 'os_project_domain_name',
        'os_project_domain_id': 'os_project_domain_id',
    }

    def __init__(self):
        self.control_ip = ''
        self.compute_user = ''
        self.compute_ip = ''
        self.host_name = ''
        self.inspector_type = ''
        self.inspector_url = ''
        self.os_auth_url = ''
        self.os_username = ''
        self.os_password = ''
        self.os_project_name = ''
        self.os_user_domain_name = ''
        self.os_user_domain_id = ''
        self.os_project_domain_name = ''
        self.os_project_domain_id = ''
        self.sess = ''
        self.auth = ''
        # 1 once notify_inspector() has been called.
        self.inspector_notified = 0
        # 1 once init_collectd() has completed successfully.
        self.start_notifications = 0
        self.monitor_type = 'sample'

    def config_func(self, config):
        """Store the plugin configuration handed over by collectd.

        Keys are matched case-insensitively; the first value of each known
        key is stored on the instance.  Unknown keys are logged and ignored.

        :param config: configuration node whose ``children`` each expose a
            ``key`` and a ``values`` sequence.
        """
        for node in config.children:
            key = node.key.lower()
            attr = self._CONFIG_ATTRS.get(key)
            if attr is None:
                collectd.info('Unknown config key "%s"' % key)
                continue
            setattr(self, attr, node.values[0])

    def init_collectd(self):
        """Resolve the inspector URL and enable notification handling.

        Starts a fresh monitor log, then derives ``inspector_url`` from
        ``inspector_type`` ('sample' or 'congress').  Any other inspector
        type is logged and terminates via ``sys.exit()``.

        :raises RuntimeError: if the congress inspector is selected but no
            datasource using the 'doctor' driver is found.
        """
        write_debug("Compute node collectd monitor start at %s\n\n"
                    % datetime.now().isoformat(), "w", self.compute_user)

        if self.inspector_type == 'sample':
            self.inspector_url = 'http://%s:12345/events' % self.control_ip
        elif self.inspector_type == 'congress':
            self.inspector_url = self._congress_events_url()
        else:
            collectd.error('Unsupported inspector type "%s"'
                           % self.inspector_type)
            sys.exit()
        self.start_notifications = 1

    def _congress_events_url(self):
        """Create the keystone session and return the congress events URL."""
        loader = loading.get_plugin_loader('password')
        self.auth = loader.load_from_options(
            auth_url=self.os_auth_url,
            username=self.os_username,
            password=self.os_password,
            project_name=self.os_project_name,
            user_domain_name=self.os_user_domain_name,
            user_domain_id=self.os_user_domain_id,
            project_domain_name=self.os_project_domain_name,
            project_domain_id=self.os_project_domain_id)
        self.sess = session.Session(auth=self.auth)
        congress = client.Client(session=self.sess, service_type='policy')

        datasources = congress.list_datasources()['results']
        doctor_ds = next(
            (item for item in datasources if item['driver'] == 'doctor'),
            None)
        if doctor_ds is None:
            # Fail with a clear message instead of a TypeError on None below.
            message = 'No congress datasource with driver "doctor" found'
            collectd.error(message)
            raise RuntimeError(message)

        congress_endpoint = congress.httpclient.get_endpoint(auth=self.auth)
        return ('%s/v1/data-sources/%s/tables/events/rows'
                % (congress_endpoint, doctor_ds['id']))

    def notify_inspector(self):
        """Send a single 'compute.host.down' event to the inspector.

        ``inspector_notified`` is set before the request is issued, so the
        event is attempted only once even if delivery fails.  Connection
        errors are logged through collectd rather than raised.
        """
        event_type = "compute.host.down"
        payload = [
            {
                'id': ("monitor_%s_id1" % self.monitor_type),
                'time': datetime.now().isoformat(),
                'type': event_type,
                'details': {
                    'hostname': self.host_name,
                    'status': 'down',
                    'monitor': ("monitor_%s" % self.monitor_type),
                    'monitor_event_id': ("monitor_%s_event1"
                                         % self.monitor_type)
                },
            },
        ]
        data = json.dumps(payload)
        self.inspector_notified = 1

        if self.inspector_type == 'sample':
            headers = {'content-type': 'application/json'}
            try:
                requests.post(self.inspector_url, data=data, headers=headers)
            except ConnectionError as err:
                collectd.error('Failed to notify inspector: %s' % err)
        elif self.inspector_type == 'congress':
            # TODO(umar) enhance for token expiry case
            headers = {
                'Content-Type': 'application/json',
                'Accept': 'application/json',
                'X-Auth-Token': self.sess.get_token()
            }
            try:
                requests.put(self.inspector_url, data=data, headers=headers)
            except ConnectionError as err:
                collectd.error('Failed to notify inspector: %s' % err)

    def handle_notif(self, notification, data=None):
        """React to a collectd notification.

        A FAILURE or WARNING severity triggers ``notify_inspector()`` when
        initialisation has finished and no notification was sent before.
        OKAY and unknown severities are only logged.

        :param notification: object exposing a ``severity`` attribute.
        :param data: unused.
        """
        severity = notification.severity
        if severity in (collectd.NOTIF_FAILURE, collectd.NOTIF_WARNING):
            if self.start_notifications == 1 and self.inspector_notified == 0:
                write_debug("Received down notification: doctor monitor "
                            "detected at %s\n" % time.time(),
                            "a", self.compute_user)
                self.notify_inspector()
        elif severity == collectd.NOTIF_OKAY:
            collectd.info("Interface status: UP again %s\n" % time.time())
        else:
            collectd.info("Unknown notification severity %s\n" % severity)
161
162
# Single plugin instance shared by every collectd callback below.
monitor = DoctorMonitorCollectd()

# Register the config, init and notification callbacks, in that order.
for _register, _callback in (
        (collectd.register_config, monitor.config_func),
        (collectd.register_init, monitor.init_collectd),
        (collectd.register_notification, monitor.handle_notif)):
    _register(_callback)