105c66e253f6857813864d1a7f356b79ec2581b6
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / ves_app.py
1 #!/usr/bin/env python
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import json
16 import sys
17 import base64
18 import ConfigParser
19 import logging
20 import argparse
21
22 from distutils.util import strtobool
23 from kafka import KafkaConsumer
24
25 from normalizer import Normalizer
26 from normalizer import CollectdValue
27
28 try:
29     # For Python 3.0 and later
30     import urllib.request as url
31 except ImportError:
32     # Fall back to Python 2's urllib2
33     import urllib2 as url
34
35
36 class VESApp(Normalizer):
37     """VES Application"""
38
39     def __init__(self):
40         """Application initialization"""
41         self._app_config = {
42             'Domain': '127.0.0.1',
43             'Port': 30000,
44             'Path': '',
45             'Username': '',
46             'Password': '',
47             'Topic': '',
48             'UseHttps': False,
49             'SendEventInterval': 20.0,
50             'ApiVersion': 5.1,
51             'KafkaPort': 9092,
52             'KafkaBroker': 'localhost'
53         }
54
55     def send_data(self, event):
56         """Send event to VES"""
57         server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
58             's' if self._app_config['UseHttps'] else '',
59             self._app_config['Domain'], int(self._app_config['Port']),
60             '{}'.format('/{}'.format(self._app_config['Path']) if len(
61                 self._app_config['Path']) > 0 else ''),
62             int(self._app_config['ApiVersion']), '{}'.format(
63                 '/{}'.format(self._app_config['Topic']) if len(
64                     self._app_config['Topic']) > 0 else ''))
65         logging.info('Vendor Event Listener is at: {}'.format(server_url))
66         credentials = base64.b64encode('{}:{}'.format(
67             self._app_config['Username'],
68             self._app_config['Password']).encode()).decode()
69         logging.info('Authentication credentials are: {}'.format(credentials))
70         try:
71             request = url.Request(server_url)
72             request.add_header('Authorization', 'Basic {}'.format(credentials))
73             request.add_header('Content-Type', 'application/json')
74             event_str = json.dumps(event).encode()
75             logging.debug("Sending {} to {}".format(event_str, server_url))
76             url.urlopen(request, event_str, timeout=1)
77             logging.debug("Sent data to {} successfully".format(server_url))
78         except url.HTTPError as e:
79             logging.error('Vendor Event Listener exception: {}'.format(e))
80         except url.URLError as e:
81             logging.error(
82                 'Vendor Event Listener is is not reachable: {}'.format(e))
83         except Exception as e:
84             logging.error('Vendor Event Listener error: {}'.format(e))
85
86     def config(self, config):
87         """VES option configuration"""
88         for key, value in config.items('config'):
89             if key in self._app_config:
90                 try:
91                     if type(self._app_config[key]) == int:
92                         value = int(value)
93                     elif type(self._app_config[key]) == float:
94                         value = float(value)
95                     elif type(self._app_config[key]) == bool:
96                         value = bool(strtobool(value))
97
98                     if isinstance(value, type(self._app_config[key])):
99                         self._app_config[key] = value
100                     else:
101                         logging.error("Type mismatch with %s" % key)
102                         sys.exit()
103                 except ValueError:
104                     logging.error("Incorrect value type for %s" % key)
105                     sys.exit()
106             else:
107                 logging.error("Incorrect key configuration %s" % key)
108                 sys.exit()
109
110     def init(self, configfile, schema_file):
111         if configfile is not None:
112             # read VES configuration file if provided
113             config = ConfigParser.ConfigParser()
114             config.optionxform = lambda option: option
115             config.read(configfile)
116             self.config(config)
117         # initialize normalizer
118         self.initialize(schema_file, self._app_config['SendEventInterval'])
119
120     def run(self):
121         """Consumer JSON data from kafka broker"""
122         kafka_server = '{}:{}'.format(
123             self._app_config.get('KafkaBroker'),
124             self._app_config.get('KafkaPort'))
125         consumer = KafkaConsumer(
126             'collectd', bootstrap_servers=kafka_server,
127             auto_offset_reset='latest', enable_auto_commit=False,
128             value_deserializer=lambda m: json.loads(m.decode('ascii')))
129
130         for message in consumer:
131             for kafka_data in message.value:
132                 # {
133                 #   u'dstypes': [u'derive'],
134                 #   u'plugin': u'cpu',
135                 #   u'dsnames': [u'value'],
136                 #   u'interval': 10.0,
137                 #   u'host': u'localhost',
138                 #   u'values': [99.9978996416267],
139                 #   u'time': 1502114956.244,
140                 #   u'plugin_instance': u'44',
141                 #   u'type_instance': u'idle',
142                 #   u'type': u'cpu'
143                 # }
144                 logging.debug('{}:run():data={}'.format(
145                     self.__class__.__name__, kafka_data))
146                 for ds_name in kafka_data['dsnames']:
147                     index = kafka_data['dsnames'].index(ds_name)
148                     val_hash = CollectdValue.hash_gen(
149                         kafka_data['host'], kafka_data['plugin'],
150                         kafka_data['plugin_instance'], kafka_data['type'],
151                         kafka_data['type_instance'], ds_name)
152                     collector = self.get_collector()
153                     val = collector.get(val_hash)
154                     if val:
155                         # update the value
156                         val.value = kafka_data['values'][index]
157                         val.time = kafka_data['time']
158                         del(val)
159                     else:
160                         # add new value into the collector
161                         val = CollectdValue()
162                         val.host = kafka_data['host']
163                         val.plugin = kafka_data['plugin']
164                         val.plugin_instance = kafka_data['plugin_instance']
165                         val.type = kafka_data['type']
166                         val.type_instance = kafka_data['type_instance']
167                         val.value = kafka_data['values'][index]
168                         val.interval = kafka_data['interval']
169                         val.time = kafka_data['time']
170                         val.ds_name = ds_name
171                         collector.add(val)
172
173
174 def main():
175     # Parsing cmdline options
176     parser = argparse.ArgumentParser()
177     parser.add_argument("--events-schema", dest="schema", required=True,
178                         help="YAML events schema definition", metavar="FILE")
179     parser.add_argument("--config", dest="configfile", default=None,
180                         help="Specify config file", metavar="FILE")
181     parser.add_argument("--loglevel", dest="level", default='INFO',
182                         choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
183                         help="Specify log level (default: %(default)s)",
184                         metavar="LEVEL")
185     parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
186                         help="Specify log file (default: %(default)s)",
187                         metavar="FILE")
188     args = parser.parse_args()
189
190     # Create log file
191     logging.basicConfig(filename=args.logfile,
192                         format='%(asctime)s %(message)s',
193                         level=args.level)
194     if args.configfile is None:
195         logging.warning("No configfile specified, using default options")
196
197     # Create Application Instance
198     application_instance = VESApp()
199     application_instance.init(args.configfile, args.schema)
200
201     try:
202         # Run the plugin
203         application_instance.run()
204     except KeyboardInterrupt:
205         logging.info(" - Ctrl-C handled, exiting gracefully")
206     except Exception as e:
207         logging.error('{}, {}'.format(type(e), e))
208     finally:
209         application_instance.destroy()
210         sys.exit()
211
212
213 if __name__ == '__main__':
214     main()