Python 2 to 3 migration of collectd-ves-app
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / ves_app.py
1 #!/usr/bin/env python
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #     http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15 import json
16 import sys
17 import base64
18 import configparser
19 import logging
20 import argparse
21
22 from distutils.util import strtobool
23 from kafka import KafkaConsumer
24
25 from .normalizer import Normalizer
26 from .normalizer import CollectdValue
27
28 import urllib.request as url
29
class VESApp(Normalizer):
    """VES application.

    Consumes collectd metrics published on the 'collectd' Kafka topic,
    stores them in the collector returned by ``get_collector()`` and can
    post events to a VES (Vendor Event Listener) endpoint via
    ``send_data()``.

    ``initialize()`` and ``get_collector()`` are not defined in this class;
    they are expected to be provided by ``Normalizer``.
    """

    def __init__(self):
        """Set up the default application options.

        Every option can be overridden from the ``config`` section of the
        configuration file (see ``config()``). The type of each default
        decides how the configured string is converted.
        """
        # NOTE(review): Normalizer.__init__() is not called here -- confirm
        # that the base class needs no initialization of its own.
        self._app_config = {
            'Domain': '127.0.0.1',
            'Port': 30000,
            'Path': '',
            'Username': '',
            'Password': '',
            'Topic': '',
            'UseHttps': False,
            'SendEventInterval': 20.0,
            'ApiVersion': 5.1,
            'KafkaPort': 9092,
            'KafkaBroker': 'localhost'
        }

    def send_data(self, event):
        """Send an event to the VES event listener.

        The target URL is built from the 'UseHttps', 'Domain', 'Port',
        'Path', 'ApiVersion' and 'Topic' options; 'Username' and 'Password'
        are sent as HTTP basic authentication.

        Args:
            event: JSON-serializable object, posted as the request body.

        Failures (serialization, HTTP or connection errors) are logged and
        never raised to the caller.
        """
        cfg = self._app_config
        # optional URL parts are emitted only when configured
        path = '/{}'.format(cfg['Path']) if cfg['Path'] else ''
        topic = '/{}'.format(cfg['Topic']) if cfg['Topic'] else ''
        server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
            's' if cfg['UseHttps'] else '',
            cfg['Domain'], int(cfg['Port']), path,
            # only the major API version is part of the URL (5.1 -> v5)
            int(cfg['ApiVersion']), topic)
        logging.info('Vendor Event Listener is at: {}'.format(server_url))
        credentials = base64.b64encode('{}:{}'.format(
            cfg['Username'], cfg['Password']).encode()).decode()
        # Never log the encoded credentials: base64 is trivially reversible,
        # so doing so would expose the password in the log file.
        logging.info('Authenticating as user: {}'.format(cfg['Username']))
        try:
            request = url.Request(server_url)
            request.add_header('Authorization', 'Basic {}'.format(credentials))
            request.add_header('Content-Type', 'application/json')
            event_str = json.dumps(event)
            logging.debug("Sending {} to {}".format(event_str, server_url))
            # the response body is not used; the context manager only makes
            # sure the connection is closed
            with url.urlopen(request, event_str.encode(), timeout=1):
                pass
            logging.debug("Sent data to {} successfully".format(server_url))
        except url.HTTPError as e:
            # HTTPError is a subclass of URLError, so it is handled first
            logging.error('Vendor Event Listener exception: {}'.format(e))
        except url.URLError as e:
            logging.error(
                'Vendor Event Listener is not reachable: {}'.format(e))
        except Exception as e:
            logging.error('Vendor Event Listener error: {}'.format(e))

    def config(self, config):
        """Apply the options found in the ``config`` section of *config*.

        Args:
            config: object providing ``items('config')`` as (key, value)
                pairs, e.g. a ``configparser.ConfigParser``.

        Each value is converted to the type of the matching default in
        ``self._app_config`` (int, float, bool or str) and stored there.
        A missing section, an unknown key or a value that cannot be
        converted is logged as an error and terminates the process with
        exit status 1 (``SystemExit``).
        """
        try:
            options = config.items('config')
        except configparser.NoSectionError:
            logging.error("Missing 'config' section in configuration")
            sys.exit(1)

        for key, value in options:
            if key not in self._app_config:
                logging.error("Incorrect key configuration %s" % key)
                sys.exit(1)

            expected_type = type(self._app_config[key])
            try:
                # bool is tested by identity and first: it subclasses int
                if expected_type is bool:
                    # NOTE(review): distutils is deprecated by PEP 632 and
                    # removed in Python 3.12; strtobool will need replacing.
                    value = bool(strtobool(value))
                elif expected_type in (int, float):
                    value = expected_type(value)
            except ValueError:
                logging.error("Incorrect value type for %s" % key)
                sys.exit(1)

            if not isinstance(value, expected_type):
                logging.error("Type mismatch with %s" % key)
                sys.exit(1)
            self._app_config[key] = value

    def init(self, configfile, schema_file):
        """Load the optional configuration and initialize the normalizer.

        Args:
            configfile: path of the VES configuration file, or None to keep
                the default options.
            schema_file: passed to ``initialize()`` together with the
                'SendEventInterval' option.

        An unreadable configuration file is logged as an error and
        terminates the process with exit status 1 (``SystemExit``).
        """
        if configfile is not None:
            # read VES configuration file if provided
            config = configparser.ConfigParser()
            # keep option names case-sensitive: the known keys are CamelCase
            config.optionxform = str
            # read() silently skips files it cannot open and returns the
            # list of files that were actually parsed
            if not config.read(configfile):
                logging.error("Unable to read config file %s" % configfile)
                sys.exit(1)
            self.config(config)
        # initialize normalizer
        self.initialize(schema_file, self._app_config['SendEventInterval'])

    def run(self):
        """Consume JSON data from the Kafka broker.

        Subscribes to the 'collectd' topic on 'KafkaBroker':'KafkaPort' and
        iterates over the consumer. Each message is expected to be a JSON
        list of collectd value lists; every data source of every entry is
        added to, or updated in, the collector.
        """
        kafka_server = '{}:{}'.format(
            self._app_config.get('KafkaBroker'),
            self._app_config.get('KafkaPort'))
        consumer = KafkaConsumer(
            'collectd', bootstrap_servers=kafka_server,
            auto_offset_reset='latest', enable_auto_commit=False,
            # JSON text is UTF-8; decoding as ASCII would fail on any
            # non-ASCII character in the payload
            value_deserializer=lambda m: json.loads(m.decode('utf-8')))

        for message in consumer:
            for kafka_data in message.value:
                # Example of a single entry:
                # {
                #   'dstypes': ['derive'],
                #   'plugin': 'cpu',
                #   'dsnames': ['value'],
                #   'interval': 10.0,
                #   'host': 'localhost',
                #   'values': [99.9978996416267],
                #   'time': 1502114956.244,
                #   'plugin_instance': '44',
                #   'type_instance': 'idle',
                #   'type': 'cpu'
                # }
                logging.debug('{}:run():data={}'.format(
                    self.__class__.__name__, kafka_data))
                # 'dsnames' and 'values' are parallel lists; enumerate keeps
                # them aligned even if a data source name is repeated
                for index, ds_name in enumerate(kafka_data['dsnames']):
                    val_hash = CollectdValue.hash_gen(
                        kafka_data['host'], kafka_data['plugin'],
                        kafka_data['plugin_instance'], kafka_data['type'],
                        kafka_data['type_instance'], ds_name)
                    collector = self.get_collector()
                    val = collector.get(val_hash)
                    if val:
                        # update the value
                        val.value = kafka_data['values'][index]
                        val.time = kafka_data['time']
                        # NOTE(review): the explicit del looks deliberate --
                        # presumably the object handed out by the collector
                        # releases it when finalized; keep unless confirmed
                        # otherwise.
                        del val
                    else:
                        # add new value into the collector
                        val = CollectdValue()
                        val.host = kafka_data['host']
                        val.plugin = kafka_data['plugin']
                        val.plugin_instance = kafka_data['plugin_instance']
                        val.type = kafka_data['type']
                        val.type_instance = kafka_data['type_instance']
                        val.value = kafka_data['values'][index]
                        val.interval = kafka_data['interval']
                        val.time = kafka_data['time']
                        val.ds_name = ds_name
                        collector.add(val)
166
167
def main():
    """Command line entry point of the VES application.

    Parses the command line, configures logging to the requested file,
    creates and initializes a ``VESApp`` and runs it until it stops or is
    interrupted with Ctrl-C. The application is always destroyed before
    the process exits.

    Exits with status 0 after a normal stop or Ctrl-C and with status 1
    when ``run()`` fails with an unexpected error.
    """
    # Parsing cmdline options
    parser = argparse.ArgumentParser()
    parser.add_argument("--events-schema", dest="schema", required=True,
                        help="YAML events schema definition", metavar="FILE")
    parser.add_argument("--config", dest="configfile", default=None,
                        help="Specify config file", metavar="FILE")
    parser.add_argument("--loglevel", dest="level", default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        help="Specify log level (default: %(default)s)",
                        metavar="LEVEL")
    parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
                        help="Specify log file (default: %(default)s)",
                        metavar="FILE")
    args = parser.parse_args()

    # Create log file
    logging.basicConfig(filename=args.logfile,
                        format='%(asctime)s %(message)s',
                        level=args.level)
    if args.configfile is None:
        logging.warning("No configfile specified, using default options")

    # Create Application Instance
    application_instance = VESApp()
    application_instance.init(args.configfile, args.schema)

    exit_status = 0
    try:
        # Run the plugin
        application_instance.run()
    except KeyboardInterrupt:
        logging.info(" - Ctrl-C handled, exiting gracefully")
    except Exception as e:
        # keep the traceback in the log and report the failure to the caller
        logging.error('{}, {}'.format(type(e), e), exc_info=True)
        exit_status = 1
    finally:
        application_instance.destroy()
    # Exit outside of ``finally`` so that an exit status already in flight
    # (e.g. a SystemExit raised while running) is not overridden.
    sys.exit(exit_status)
205
206
# Start the application only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()