Updates/Fix on Legal/License Headers
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / ves_app.py
1 #!/usr/bin/env python
2 #
3 # Copyright(c) 2017-2019 Intel Corporation and OPNFV. All rights reserved.
4 #
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
8 #
9 # http://www.apache.org/licenses/LICENSE-2.0
10 #
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
16 #
17
18 import json
19 import sys
20 import base64
21 import ConfigParser
22 import logging
23 import argparse
24
25 from distutils.util import strtobool
26 from kafka import KafkaConsumer
27
28 from normalizer import Normalizer
29 from normalizer import CollectdValue
30
31 try:
32     # For Python 3.0 and later
33     import urllib.request as url
34 except ImportError:
35     # Fall back to Python 2's urllib2
36     import urllib2 as url
37
38
class VESApp(Normalizer):
    """VES Application

    Consumes collectd metrics from a Kafka topic, keeps them in the
    collector returned by get_collector(), and posts events to a Vendor
    Event Listener over HTTP(S) via send_data().
    """

    def __init__(self):
        """Application initialization"""
        # Default options. The type of each default also defines the type
        # enforced by config() for the matching config-file option.
        self._app_config = {
            'Domain': '127.0.0.1',
            'Port': 30000,
            'Path': '',
            'Username': '',
            'Password': '',
            'Topic': '',
            'UseHttps': False,
            'SendEventInterval': 20.0,
            'ApiVersion': 5.1,
            'KafkaPort': 9092,
            'KafkaBroker': 'localhost'
        }

    def send_data(self, event):
        """Send event to VES

        Builds the listener URL from the application configuration and
        sends the JSON-encoded event with an HTTP basic authentication
        header. Request failures are logged and not re-raised.

        event -- JSON-serializable object to send
        """
        cfg = self._app_config
        # 'Path' and 'Topic' are optional URL segments; each one is only
        # emitted (with its leading slash) when it is non-empty.
        path = '/{}'.format(cfg['Path']) if len(cfg['Path']) > 0 else ''
        topic = '/{}'.format(cfg['Topic']) if len(cfg['Topic']) > 0 else ''
        server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
            's' if cfg['UseHttps'] else '',
            cfg['Domain'], int(cfg['Port']), path,
            int(cfg['ApiVersion']), topic)
        logging.info('Vendor Event Listener is at: {}'.format(server_url))
        # The credentials are deliberately not logged: base64 is trivially
        # reversible, so logging them would expose the password.
        credentials = base64.b64encode('{}:{}'.format(
            cfg['Username'], cfg['Password']).encode()).decode()
        try:
            request = url.Request(server_url)
            request.add_header('Authorization', 'Basic {}'.format(credentials))
            request.add_header('Content-Type', 'application/json')
            event_str = json.dumps(event).encode()
            logging.debug("Sending {} to {}".format(event_str, server_url))
            # The response body is not used; close it right away so the
            # connection is released instead of waiting for garbage
            # collection.
            url.urlopen(request, event_str, timeout=1).close()
            logging.debug("Sent data to {} successfully".format(server_url))
        except url.HTTPError as e:
            logging.error('Vendor Event Listener exception: {}'.format(e))
        except url.URLError as e:
            logging.error(
                'Vendor Event Listener is not reachable: {}'.format(e))
        except Exception as e:
            logging.error('Vendor Event Listener error: {}'.format(e))

    def config(self, config):
        """VES option configuration

        Copies every option of the 'config' section into the application
        configuration, converting it to the type of the built-in default.
        Unknown keys or values that cannot be converted are logged and
        terminate the process with a non-zero exit status.

        config -- parser object exposing items('config')
        """
        # Converters are looked up by the exact type of the default value,
        # so a bool default is never treated as an int. String defaults
        # have no converter and keep the raw value.
        converters = {
            int: int,
            float: float,
            bool: lambda raw: bool(strtobool(raw)),
        }
        for key, value in config.items('config'):
            if key not in self._app_config:
                logging.error("Incorrect key configuration %s" % key)
                sys.exit(1)
            expected_type = type(self._app_config[key])
            try:
                convert = converters.get(expected_type)
                if convert is not None:
                    value = convert(value)
            except ValueError:
                logging.error("Incorrect value type for %s" % key)
                sys.exit(1)
            if not isinstance(value, expected_type):
                logging.error("Type mismatch with %s" % key)
                sys.exit(1)
            self._app_config[key] = value

    def init(self, configfile, schema_file):
        """Load the optional configuration file and start the normalizer

        configfile  -- path of the VES configuration file, or None to keep
                       the default options
        schema_file -- events schema file handed over to initialize()
        """
        if configfile is not None:
            # read VES configuration file if provided
            config = ConfigParser.ConfigParser()
            # keep option names case-sensitive so that they match the
            # keys of the default configuration
            config.optionxform = lambda option: option
            config.read(configfile)
            self.config(config)
        # initialize normalizer
        self.initialize(schema_file, self._app_config['SendEventInterval'])

    def run(self):
        """Consume JSON data from kafka broker

        Subscribes to the 'collectd' topic and, for every data source of
        every received record, updates the matching value in the collector
        or adds a new one when it is not known yet. Iterates over the
        consumer until an exception interrupts it.
        """
        kafka_server = '{}:{}'.format(
            self._app_config.get('KafkaBroker'),
            self._app_config.get('KafkaPort'))
        consumer = KafkaConsumer(
            'collectd', bootstrap_servers=kafka_server,
            auto_offset_reset='latest', enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('ascii')))

        for message in consumer:
            for kafka_data in message.value:
                # {
                #   u'dstypes': [u'derive'],
                #   u'plugin': u'cpu',
                #   u'dsnames': [u'value'],
                #   u'interval': 10.0,
                #   u'host': u'localhost',
                #   u'values': [99.9978996416267],
                #   u'time': 1502114956.244,
                #   u'plugin_instance': u'44',
                #   u'type_instance': u'idle',
                #   u'type': u'cpu'
                # }
                logging.debug('{}:run():data={}'.format(
                    self.__class__.__name__, kafka_data))
                # 'values' is indexed by the position of the data source
                # name; enumerate() yields that position directly and
                # stays correct even if a name occurs more than once.
                for index, ds_name in enumerate(kafka_data['dsnames']):
                    val_hash = CollectdValue.hash_gen(
                        kafka_data['host'], kafka_data['plugin'],
                        kafka_data['plugin_instance'], kafka_data['type'],
                        kafka_data['type_instance'], ds_name)
                    collector = self.get_collector()
                    val = collector.get(val_hash)
                    if val:
                        # update the value
                        val.value = kafka_data['values'][index]
                        val.time = kafka_data['time']
                    else:
                        # add new value into the collector
                        val = CollectdValue()
                        val.host = kafka_data['host']
                        val.plugin = kafka_data['plugin']
                        val.plugin_instance = kafka_data['plugin_instance']
                        val.type = kafka_data['type']
                        val.type_instance = kafka_data['type_instance']
                        val.value = kafka_data['values'][index]
                        val.interval = kafka_data['interval']
                        val.time = kafka_data['time']
                        val.ds_name = ds_name
                        collector.add(val)
175
176
def main():
    """Command-line entry point of the VES application

    Parses the command-line options, configures logging to the requested
    file, then creates, initializes and runs a VESApp instance. The
    process exits with status 0 after a normal stop or Ctrl-C and with
    status 1 when run() fails with an unexpected exception.
    """
    # Parsing cmdline options
    parser = argparse.ArgumentParser()
    parser.add_argument("--events-schema", dest="schema", required=True,
                        help="YAML events schema definition", metavar="FILE")
    parser.add_argument("--config", dest="configfile", default=None,
                        help="Specify config file", metavar="FILE")
    parser.add_argument("--loglevel", dest="level", default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        help="Specify log level (default: %(default)s)",
                        metavar="LEVEL")
    parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
                        help="Specify log file (default: %(default)s)",
                        metavar="FILE")
    args = parser.parse_args()

    # Create log file
    logging.basicConfig(filename=args.logfile,
                        format='%(asctime)s %(message)s',
                        level=args.level)
    if args.configfile is None:
        logging.warning("No configfile specified, using default options")

    # Create Application Instance
    application_instance = VESApp()
    application_instance.init(args.configfile, args.schema)

    exit_code = 0
    try:
        # Run the plugin
        application_instance.run()
    except KeyboardInterrupt:
        logging.info(" - Ctrl-C handled, exiting gracefully")
    except Exception as e:
        # Keep the traceback in the log and report the failure to the
        # parent process instead of exiting with a success status.
        logging.exception('{}, {}'.format(type(e), e))
        exit_code = 1
    finally:
        application_instance.destroy()
    # Exit outside of the finally clause so that a SystemExit raised by
    # run() keeps its own status instead of being overridden here.
    sys.exit(exit_code)
214
215
# Start the application only when the file is executed as a script.
if "__main__" == __name__:
    main()