Python 2 to 3 migration fixes
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / ves_app.py
1 #!/usr/bin/env python
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import json
16 import sys
17 import base64
18 import logging
19 import argparse
20
21 try:
22     import configparser
23 except ImportError:
24     import ConfigParser as configparser
25
26 from distutils.util import strtobool
27 from kafka import KafkaConsumer
28
29 from .normalizer import Normalizer
30 from .normalizer import CollectdValue
31
32 try:
33     # For Python 3.0 and later
34     import urllib.request as url
35 except ImportError:
36     # Fall back to Python 2's urllib2
37     import urllib2 as url
38
39 class VESApp(Normalizer):
40     """VES Application"""
41
42     def __init__(self):
43         """Application initialization"""
44         self._app_config = {
45             'Domain': '127.0.0.1',
46             'Port': 30000,
47             'Path': '',
48             'Username': '',
49             'Password': '',
50             'Topic': '',
51             'UseHttps': False,
52             'SendEventInterval': 20.0,
53             'ApiVersion': 5.1,
54             'KafkaPort': 9092,
55             'KafkaBroker': 'localhost'
56         }
57
58     def send_data(self, event):
59         """Send event to VES"""
60         server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
61             's' if self._app_config['UseHttps'] else '',
62             self._app_config['Domain'], int(self._app_config['Port']),
63             '{}'.format('/{}'.format(self._app_config['Path']) if len(
64                 self._app_config['Path']) > 0 else ''),
65             int(self._app_config['ApiVersion']), '{}'.format(
66                 '/{}'.format(self._app_config['Topic']) if len(
67                     self._app_config['Topic']) > 0 else ''))
68         logging.info('Vendor Event Listener is at: {}'.format(server_url))
69         credentials = base64.b64encode('{}:{}'.format(
70             self._app_config['Username'],
71             self._app_config['Password']).encode()).decode()
72         logging.info('Authentication credentials are: {}'.format(credentials))
73         try:
74             request = url.Request(server_url)
75             request.add_header('Authorization', 'Basic {}'.format(credentials))
76             request.add_header('Content-Type', 'application/json')
77             event_str = json.dumps(event).encode()
78             logging.debug("Sending {} to {}".format(event_str, server_url))
79             url.urlopen(request, event_str, timeout=1)
80             logging.debug("Sent data to {} successfully".format(server_url))
81         except url.HTTPError as e:
82             logging.error('Vendor Event Listener exception: {}'.format(e))
83         except url.URLError as e:
84             logging.error(
85                 'Vendor Event Listener is is not reachable: {}'.format(e))
86         except Exception as e:
87             logging.error('Vendor Event Listener error: {}'.format(e))
88
89     def config(self, config):
90         """VES option configuration"""
91         for key, value in config.items('config'):
92             if key in self._app_config:
93                 try:
94                     if type(self._app_config[key]) == int:
95                         value = int(value)
96                     elif type(self._app_config[key]) == float:
97                         value = float(value)
98                     elif type(self._app_config[key]) == bool:
99                         value = bool(strtobool(value))
100
101                     if isinstance(value, type(self._app_config[key])):
102                         self._app_config[key] = value
103                     else:
104                         logging.error("Type mismatch with %s" % key)
105                         sys.exit()
106                 except ValueError:
107                     logging.error("Incorrect value type for %s" % key)
108                     sys.exit()
109             else:
110                 logging.error("Incorrect key configuration %s" % key)
111                 sys.exit()
112
113     def init(self, configfile, schema_file):
114         if configfile is not None:
115             # read VES configuration file if provided
116             config = configparser.ConfigParser()
117             config.optionxform = lambda option: option
118             config.read(configfile)
119             self.config(config)
120         # initialize normalizer
121         self.initialize(schema_file, self._app_config['SendEventInterval'])
122
123     def run(self):
124         """Consumer JSON data from kafka broker"""
125         kafka_server = '{}:{}'.format(
126             self._app_config.get('KafkaBroker'),
127             self._app_config.get('KafkaPort'))
128         consumer = KafkaConsumer(
129             'collectd', bootstrap_servers=kafka_server,
130             auto_offset_reset='latest', enable_auto_commit=False,
131             value_deserializer=lambda m: json.loads(m.decode('ascii')))
132
133         for message in consumer:
134             for kafka_data in message.value:
135                 # {
136                 #   u'dstypes': [u'derive'],
137                 #   u'plugin': u'cpu',
138                 #   u'dsnames': [u'value'],
139                 #   u'interval': 10.0,
140                 #   u'host': u'localhost',
141                 #   u'values': [99.9978996416267],
142                 #   u'time': 1502114956.244,
143                 #   u'plugin_instance': u'44',
144                 #   u'type_instance': u'idle',
145                 #   u'type': u'cpu'
146                 # }
147                 logging.debug('{}:run():data={}'.format(
148                     self.__class__.__name__, kafka_data))
149                 for ds_name in kafka_data['dsnames']:
150                     index = kafka_data['dsnames'].index(ds_name)
151                     val_hash = CollectdValue.hash_gen(
152                         kafka_data['host'], kafka_data['plugin'],
153                         kafka_data['plugin_instance'], kafka_data['type'],
154                         kafka_data['type_instance'], ds_name)
155                     collector = self.get_collector()
156                     val = collector.get(val_hash)
157                     if val:
158                         # update the value
159                         val.value = kafka_data['values'][index]
160                         val.time = kafka_data['time']
161                         del(val)
162                     else:
163                         # add new value into the collector
164                         val = CollectdValue()
165                         val.host = kafka_data['host']
166                         val.plugin = kafka_data['plugin']
167                         val.plugin_instance = kafka_data['plugin_instance']
168                         val.type = kafka_data['type']
169                         val.type_instance = kafka_data['type_instance']
170                         val.value = kafka_data['values'][index]
171                         val.interval = kafka_data['interval']
172                         val.time = kafka_data['time']
173                         val.ds_name = ds_name
174                         collector.add(val)
175
176
177 def main():
178     # Parsing cmdline options
179     parser = argparse.ArgumentParser()
180     parser.add_argument("--events-schema", dest="schema", required=True,
181                         help="YAML events schema definition", metavar="FILE")
182     parser.add_argument("--config", dest="configfile", default=None,
183                         help="Specify config file", metavar="FILE")
184     parser.add_argument("--loglevel", dest="level", default='INFO',
185                         choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
186                         help="Specify log level (default: %(default)s)",
187                         metavar="LEVEL")
188     parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
189                         help="Specify log file (default: %(default)s)",
190                         metavar="FILE")
191     args = parser.parse_args()
192
193     # Create log file
194     logging.basicConfig(filename=args.logfile,
195                         format='%(asctime)s %(message)s',
196                         level=args.level)
197     if args.configfile is None:
198         logging.warning("No configfile specified, using default options")
199
200     # Create Application Instance
201     application_instance = VESApp()
202     application_instance.init(args.configfile, args.schema)
203
204     try:
205         # Run the plugin
206         application_instance.run()
207     except KeyboardInterrupt:
208         logging.info(" - Ctrl-C handled, exiting gracefully")
209     except Exception as e:
210         logging.error('{}, {}'.format(type(e), e))
211     finally:
212         application_instance.destroy()
213         sys.exit()
214
215
216 if __name__ == '__main__':
217     main()