Merge changes If3d86e80,I48763243,I656a6786
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / ves_app.py
1 #!/usr/bin/env python
2 #
3 # Copyright(c) 2017-2019 Intel Corporation and OPNFV. All rights reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import json
19 import sys
20 import base64
21 import logging
22 import argparse
23
24 try:
25     import configparser
26 except ImportError:
27     import ConfigParser as configparser
28
29 from distutils.util import strtobool
30 from kafka import KafkaConsumer
31
32 from normalizer import Normalizer
33 from normalizer import CollectdValue
34
35 try:
36     # For Python 3.0 and later
37     import urllib.request as url
38 except ImportError:
39     # Fall back to Python 2's urllib2
40     import urllib2 as url
41
42 class VESApp(Normalizer):
43     """VES Application"""
44
45     def __init__(self):
46         """Application initialization"""
47         self._app_config = {
48             'Domain': '127.0.0.1',
49             'Port': 30000,
50             'Path': '',
51             'Username': '',
52             'Password': '',
53             'Topic': '',
54             'UseHttps': False,
55             'SendEventInterval': 20.0,
56             'ApiVersion': 5.1,
57             'KafkaPort': 9092,
58             'KafkaBroker': 'localhost'
59         }
60
61     def send_data(self, event):
62         """Send event to VES"""
63         server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
64             's' if self._app_config['UseHttps'] else '',
65             self._app_config['Domain'], int(self._app_config['Port']),
66             '{}'.format('/{}'.format(self._app_config['Path']) if len(
67                 self._app_config['Path']) > 0 else ''),
68             int(self._app_config['ApiVersion']), '{}'.format(
69                 '/{}'.format(self._app_config['Topic']) if len(
70                     self._app_config['Topic']) > 0 else ''))
71         logging.info('Vendor Event Listener is at: {}'.format(server_url))
72         credentials = base64.b64encode('{}:{}'.format(
73             self._app_config['Username'],
74             self._app_config['Password']).encode()).decode()
75         logging.info('Authentication credentials are: {}'.format(credentials))
76         try:
77             request = url.Request(server_url)
78             request.add_header('Authorization', 'Basic {}'.format(credentials))
79             request.add_header('Content-Type', 'application/json')
80             event_str = json.dumps(event).encode()
81             logging.debug("Sending {} to {}".format(event_str, server_url))
82             url.urlopen(request, event_str, timeout=1)
83             logging.debug("Sent data to {} successfully".format(server_url))
84         except url.HTTPError as e:
85             logging.error('Vendor Event Listener exception: {}'.format(e))
86         except url.URLError as e:
87             logging.error(
88                 'Vendor Event Listener is is not reachable: {}'.format(e))
89         except Exception as e:
90             logging.error('Vendor Event Listener error: {}'.format(e))
91
92     def config(self, config):
93         """VES option configuration"""
94         for key, value in config.items('config'):
95             if key in self._app_config:
96                 try:
97                     if type(self._app_config[key]) == int:
98                         value = int(value)
99                     elif type(self._app_config[key]) == float:
100                         value = float(value)
101                     elif type(self._app_config[key]) == bool:
102                         value = bool(strtobool(value))
103
104                     if isinstance(value, type(self._app_config[key])):
105                         self._app_config[key] = value
106                     else:
107                         logging.error("Type mismatch with %s" % key)
108                         sys.exit()
109                 except ValueError:
110                     logging.error("Incorrect value type for %s" % key)
111                     sys.exit()
112             else:
113                 logging.error("Incorrect key configuration %s" % key)
114                 sys.exit()
115
116     def init(self, configfile, schema_file):
117         if configfile is not None:
118             # read VES configuration file if provided
119             config = configparser.ConfigParser()
120             config.optionxform = lambda option: option
121             config.read(configfile)
122             self.config(config)
123         # initialize normalizer
124         self.initialize(schema_file, self._app_config['SendEventInterval'])
125
126     def run(self):
127         """Consumer JSON data from kafka broker"""
128         kafka_server = '{}:{}'.format(
129             self._app_config.get('KafkaBroker'),
130             self._app_config.get('KafkaPort'))
131         consumer = KafkaConsumer(
132             'collectd', bootstrap_servers=kafka_server,
133             auto_offset_reset='latest', enable_auto_commit=False,
134             value_deserializer=lambda m: json.loads(m.decode('ascii')))
135
136         for message in consumer:
137             for kafka_data in message.value:
138                 # {
139                 #   u'dstypes': [u'derive'],
140                 #   u'plugin': u'cpu',
141                 #   u'dsnames': [u'value'],
142                 #   u'interval': 10.0,
143                 #   u'host': u'localhost',
144                 #   u'values': [99.9978996416267],
145                 #   u'time': 1502114956.244,
146                 #   u'plugin_instance': u'44',
147                 #   u'type_instance': u'idle',
148                 #   u'type': u'cpu'
149                 # }
150                 logging.debug('{}:run():data={}'.format(
151                     self.__class__.__name__, kafka_data))
152                 for ds_name in kafka_data['dsnames']:
153                     index = kafka_data['dsnames'].index(ds_name)
154                     val_hash = CollectdValue.hash_gen(
155                         kafka_data['host'], kafka_data['plugin'],
156                         kafka_data['plugin_instance'], kafka_data['type'],
157                         kafka_data['type_instance'], ds_name)
158                     collector = self.get_collector()
159                     val = collector.get(val_hash)
160                     if val:
161                         # update the value
162                         val.value = kafka_data['values'][index]
163                         val.time = kafka_data['time']
164                         del(val)
165                     else:
166                         # add new value into the collector
167                         val = CollectdValue()
168                         val.host = kafka_data['host']
169                         val.plugin = kafka_data['plugin']
170                         val.plugin_instance = kafka_data['plugin_instance']
171                         val.type = kafka_data['type']
172                         val.type_instance = kafka_data['type_instance']
173                         val.value = kafka_data['values'][index]
174                         val.interval = kafka_data['interval']
175                         val.time = kafka_data['time']
176                         val.ds_name = ds_name
177                         collector.add(val)
178
179
180 def main():
181     # Parsing cmdline options
182     parser = argparse.ArgumentParser()
183     parser.add_argument("--events-schema", dest="schema", required=True,
184                         help="YAML events schema definition", metavar="FILE")
185     parser.add_argument("--config", dest="configfile", default=None,
186                         help="Specify config file", metavar="FILE")
187     parser.add_argument("--loglevel", dest="level", default='INFO',
188                         choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
189                         help="Specify log level (default: %(default)s)",
190                         metavar="LEVEL")
191     parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
192                         help="Specify log file (default: %(default)s)",
193                         metavar="FILE")
194     args = parser.parse_args()
195
196     # Create log file
197     logging.basicConfig(filename=args.logfile,
198                         format='%(asctime)s %(message)s',
199                         level=args.level)
200     if args.configfile is None:
201         logging.warning("No configfile specified, using default options")
202
203     # Create Application Instance
204     application_instance = VESApp()
205     application_instance.init(args.configfile, args.schema)
206
207     try:
208         # Run the plugin
209         application_instance.run()
210     except KeyboardInterrupt:
211         logging.info(" - Ctrl-C handled, exiting gracefully")
212     except Exception as e:
213         logging.error('{}, {}'.format(type(e), e))
214     finally:
215         application_instance.destroy()
216         sys.exit()
217
218
219 if __name__ == '__main__':
220     main()