3 # Copyright(c) 2017-2019 Intel Corporation and OPNFV. All rights reserved.
5 # Licensed under the Apache License, Version 2.0 (the "License");
6 # you may not use this file except in compliance with the License.
7 # You may obtain a copy of the License at
9 # http://www.apache.org/licenses/LICENSE-2.0
11 # Unless required by applicable law or agreed to in writing, software
12 # distributed under the License is distributed on an "AS IS" BASIS,
13 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 # See the License for the specific language governing permissions and
15 # limitations under the License.
27 import ConfigParser as configparser
29 from distutils.util import strtobool
30 from kafka import KafkaConsumer
32 from normalizer import Normalizer
33 from normalizer import CollectdValue
36 # For Python 3.0 and later
37 import urllib.request as url
39 # Fall back to Python 2's urllib2
# VES application class; it extends Normalizer (imported from the normalizer
# module at the top of the file).
# NOTE(review): only a fragment of the initializer is visible in this excerpt:
# its docstring and three key/value pairs of a dict literal. The method header
# and the remaining entries are not shown. Presumably these pairs are defaults
# stored in self._app_config, since the other methods read 'Domain',
# 'SendEventInterval' and 'KafkaBroker' from it -- confirm against the full file.
42 class VESApp(Normalizer):
46 """Application initialization"""
48 'Domain': '127.0.0.1',
55 'SendEventInterval': 20.0,
58 'KafkaBroker': 'localhost'
def send_data(self, event):
    """Send one event to the VES (Vendor Event Listener) endpoint.

    The listener URL is assembled from the ``UseHttps``, ``Domain``,
    ``Port``, ``Path``, ``ApiVersion`` and ``Topic`` entries of
    ``self._app_config``. The request carries HTTP Basic credentials built
    from the ``Username`` and ``Password`` entries and a JSON body.

    Delivery is best effort: serialization and transport failures are
    logged and swallowed, so nothing is raised to the caller for them.

    Args:
        event: JSON-serializable object sent as the request body.

    Returns:
        None.
    """
    app_config = self._app_config
    scheme = 'https' if app_config['UseHttps'] else 'http'
    # 'Path' and 'Topic' are optional URL segments: an empty value adds
    # nothing to the URL.
    path = '/{}'.format(app_config['Path']) if app_config['Path'] else ''
    topic = '/{}'.format(app_config['Topic']) if app_config['Topic'] else ''
    server_url = '{}://{}:{}{}/eventListener/v{}{}'.format(
        scheme, app_config['Domain'], int(app_config['Port']), path,
        int(app_config['ApiVersion']), topic)
    logging.info('Vendor Event Listener is at: %s', server_url)
    credentials = base64.b64encode('{}:{}'.format(
        app_config['Username'], app_config['Password']).encode()).decode()
    # The Basic token is trivially decodable, so it is kept out of the log;
    # only the user name is reported.
    logging.info('Authenticating as user: %s', app_config['Username'])
    try:
        request = url.Request(server_url)
        request.add_header('Authorization', 'Basic {}'.format(credentials))
        request.add_header('Content-Type', 'application/json')
        event_str = json.dumps(event).encode()
        logging.debug('Sending %s to %s', event_str, server_url)
        # Supplying a body makes this a POST; give up after one second.
        response = url.urlopen(request, event_str, timeout=1)
        # Release the connection right away instead of leaving it to the
        # garbage collector.
        response.close()
        logging.debug('Sent data to %s successfully', server_url)
    except url.HTTPError as e:
        # Must come before URLError: HTTPError is a subclass of it.
        logging.error('Vendor Event Listener exception: %s', e)
    except url.URLError as e:
        logging.error('Vendor Event Listener is not reachable: %s', e)
    except Exception as e:
        # Best effort by design: any other failure (e.g. an event that is
        # not JSON-serializable) is reported, not propagated.
        logging.error('Vendor Event Listener error: %s', e)
# Apply the options found in the 'config' section of the given parser object
# to self._app_config.
#   config -- object whose items('config') yields (key, value) pairs;
#             presumably the ConfigParser created in init() -- confirm.
# A key is handled by the conversion branch only when it already exists in
# self._app_config; the type of the existing entry (int, float or bool)
# selects how the incoming value is converted.
# NOTE(review): several lines of this method are not visible in this excerpt:
# the bodies of the int and float branches, the clauses that pair with the
# three logging.error calls, and whatever follows each of those calls.
# Do not rely on this view for the error-handling behaviour.
92 def config(self, config):
93 """VES option configuration"""
94 for key, value in config.items('config'):
95 if key in self._app_config:
97 if type(self._app_config[key]) == int:
99 elif type(self._app_config[key]) == float:
101 elif type(self._app_config[key]) == bool:
# strtobool (distutils.util) parses the textual flag into 1/0; bool() turns
# that into a real boolean
102 value = bool(strtobool(value))
# store the value only when its type matches the type of the existing entry
104 if isinstance(value, type(self._app_config[key])):
105 self._app_config[key] = value
107 logging.error("Type mismatch with %s" % key)
110 logging.error("Incorrect value type for %s" % key)
113 logging.error("Incorrect key configuration %s" % key)
# Prepare the application: optionally read the VES configuration file, then
# initialize the normalizer.
#   configfile  -- path handed to ConfigParser.read(); None skips the
#                  configuration-file step entirely
#   schema_file -- passed through unchanged to self.initialize()
# NOTE(review): the original line between config.read(...) and the
# "initialize normalizer" comment is not visible in this excerpt, so what
# is done with the parsed configuration cannot be confirmed from here.
116 def init(self, configfile, schema_file):
117 if configfile is not None:
118 # read VES configuration file if provided
119 config = configparser.ConfigParser()
# ConfigParser lower-cases option names by default; the identity transform
# keeps them exactly as written in the file
120 config.optionxform = lambda option: option
121 config.read(configfile)
123 # initialize normalizer
# the second argument is the 'SendEventInterval' entry of self._app_config
124 self.initialize(schema_file, self._app_config['SendEventInterval'])
# Kafka consumer loop of the application.
# NOTE(review): the def line of this method is not visible in this excerpt;
# the debug message below and the call application_instance.run() suggest it
# is named run -- confirm against the full file.
127 """Consumer JSON data from kafka broker"""
# broker address in "host:port" form, taken from self._app_config
128 kafka_server = '{}:{}'.format(
129 self._app_config.get('KafkaBroker'),
130 self._app_config.get('KafkaPort'))
# consume the 'collectd' topic; every message payload is decoded as ASCII
# and parsed as JSON before it is handed to the loop below
131 consumer = KafkaConsumer(
132 'collectd', bootstrap_servers=kafka_server,
133 auto_offset_reset='latest', enable_auto_commit=False,
134 value_deserializer=lambda m: json.loads(m.decode('ascii')))
# message.value is iterated, so each payload is treated as a sequence of
# records; the commented lines below show fields of a sample record
136 for message in consumer:
137 for kafka_data in message.value:
139 # u'dstypes': [u'derive'],
141 # u'dsnames': [u'value'],
143 # u'host': u'localhost',
144 # u'values': [99.9978996416267],
145 # u'time': 1502114956.244,
146 # u'plugin_instance': u'44',
147 # u'type_instance': u'idle',
150 logging.debug('{}:run():data={}'.format(
151 self.__class__.__name__, kafka_data))
# one collector entry per data-source name; index is the position of the
# name in 'dsnames' and selects the matching entry of 'values'
# (list.index returns the first match if a name occurs more than once)
152 for ds_name in kafka_data['dsnames']:
153 index = kafka_data['dsnames'].index(ds_name)
# key identifying this host/plugin/type/data-source combination
154 val_hash = CollectdValue.hash_gen(
155 kafka_data['host'], kafka_data['plugin'],
156 kafka_data['plugin_instance'], kafka_data['type'],
157 kafka_data['type_instance'], ds_name)
158 collector = self.get_collector()
159 val = collector.get(val_hash)
# NOTE(review): the condition that chooses between the two blocks below is
# not visible in this excerpt; presumably the first block refreshes an entry
# returned by collector.get() -- confirm.
162 val.value = kafka_data['values'][index]
163 val.time = kafka_data['time']
166 # add new value into the collector
167 val = CollectdValue()
168 val.host = kafka_data['host']
169 val.plugin = kafka_data['plugin']
170 val.plugin_instance = kafka_data['plugin_instance']
171 val.type = kafka_data['type']
172 val.type_instance = kafka_data['type_instance']
173 val.value = kafka_data['values'][index]
174 val.interval = kafka_data['interval']
175 val.time = kafka_data['time']
176 val.ds_name = ds_name
# NOTE(review): the statement that hands the newly built value to the
# collector is not visible in this excerpt.
# Command-line entry sequence: parse the options, configure logging, create
# the VESApp instance, initialize it and run it.
# NOTE(review): the enclosing def line (if any), the closing arguments of the
# --loglevel / --logfile add_argument calls and of logging.basicConfig, the
# opener of the exception handlers, and the body of the __main__ guard are
# not visible in this excerpt.
181 # Parsing cmdline options
182 parser = argparse.ArgumentParser()
# --events-schema is the only mandatory option
183 parser.add_argument("--events-schema", dest="schema", required=True,
184 help="YAML events schema definition", metavar="FILE")
185 parser.add_argument("--config", dest="configfile", default=None,
186 help="Specify config file", metavar="FILE")
187 parser.add_argument("--loglevel", dest="level", default='INFO',
188 choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
189 help="Specify log level (default: %(default)s)",
191 parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
192 help="Specify log file (default: %(default)s)",
194 args = parser.parse_args()
# log records go to the file given by --logfile, prefixed with a timestamp
197 logging.basicConfig(filename=args.logfile,
198 format='%(asctime)s %(message)s',
# a missing --config is not an error; it is only reported as a warning
200 if args.configfile is None:
201 logging.warning("No configfile specified, using default options")
203 # Create Application Instance
204 application_instance = VESApp()
205 application_instance.init(args.configfile, args.schema)
209 application_instance.run()
# Ctrl-C is logged at INFO level; any other exception is logged as an error
# together with its type
210 except KeyboardInterrupt:
211 logging.info(" - Ctrl-C handled, exiting gracefully")
212 except Exception as e:
213 logging.error('{}, {}'.format(type(e), e))
215 application_instance.destroy()
# script entry guard: what follows runs only when the file is executed directly
219 if __name__ == '__main__':