3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
7 # http://www.apache.org/licenses/LICENSE-2.0
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
import argparse
import base64
import configparser
import json
import logging
import sys
import urllib.request as url
from distutils.util import strtobool

from kafka import KafkaConsumer

from .normalizer import CollectdValue
from .normalizer import Normalizer
class VESApp(Normalizer):
    """VES (Vendor Event Listener) application.

    Consumes collectd metric samples from the 'collectd' Kafka topic,
    feeds them into the Normalizer value collector and posts the
    resulting VES events to a Vendor Event Listener over HTTP(S).
    """

    def __init__(self):
        """Application initialization"""
        # Default configuration; entries may be overridden from the
        # [config] section of the INI file passed to init().  The type
        # of each default drives the coercion performed in config().
        self._app_config = {
            'Domain': '127.0.0.1',      # VES collector address
            'Port': 30000,              # VES collector port
            'Path': '',                 # optional URL path prefix
            'Username': '',             # basic-auth user name
            'Password': '',             # basic-auth password
            'Topic': '',                # optional event topic suffix
            'UseHttps': False,          # POST over HTTPS when True
            'SendEventInterval': 20.0,  # seconds between event batches
            'ApiVersion': 3,            # VES API version used in the URL
            'KafkaPort': 9092,          # Kafka broker port
            'KafkaBroker': 'localhost'  # Kafka broker address
        }

    def send_data(self, event):
        """POST a single VES event (as JSON) to the configured listener.

        Delivery is best-effort: every failure is logged and swallowed so
        that one unreachable listener cannot stop the consumer loop.
        """
        server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
            's' if self._app_config['UseHttps'] else '',
            self._app_config['Domain'], int(self._app_config['Port']),
            '{}'.format('/{}'.format(self._app_config['Path']) if len(
                self._app_config['Path']) > 0 else ''),
            int(self._app_config['ApiVersion']), '{}'.format(
                '/{}'.format(self._app_config['Topic']) if len(
                    self._app_config['Topic']) > 0 else ''))
        logging.info('Vendor Event Listener is at: {}'.format(server_url))
        credentials = base64.b64encode('{}:{}'.format(
            self._app_config['Username'],
            self._app_config['Password']).encode()).decode()
        # NOTE(review): this logs the (base64-encoded) credentials at INFO
        # level, which leaks the password into the log file — confirm
        # whether this is intentional for debugging.
        logging.info('Authentication credentials are: {}'.format(credentials))
        try:
            request = url.Request(server_url)
            request.add_header('Authorization',
                               'Basic {}'.format(credentials))
            request.add_header('Content-Type', 'application/json')
            event_str = json.dumps(event).encode()
            logging.debug("Sending {} to {}".format(event_str, server_url))
            url.urlopen(request, event_str, timeout=1)
            logging.debug("Sent data to {} successfully".format(server_url))
        except url.HTTPError as e:
            logging.error('Vendor Event Listener exception: {}'.format(e))
        except url.URLError as e:
            logging.error(
                'Vendor Event Listener is not reachable: {}'.format(e))
        except Exception as e:
            logging.error('Vendor Event Listener error: {}'.format(e))

    def config(self, config):
        """VES option configuration.

        Applies the [config] section of a ConfigParser to _app_config,
        coercing each value to the type of the corresponding default.
        Unknown keys, unparsable values and type mismatches are fatal.
        """
        for key, value in config.items('config'):
            if key in self._app_config:
                try:
                    # coerce the raw string to the type of the default
                    if type(self._app_config[key]) == int:
                        value = int(value)
                    elif type(self._app_config[key]) == float:
                        value = float(value)
                    elif type(self._app_config[key]) == bool:
                        value = bool(strtobool(value))

                    if isinstance(value, type(self._app_config[key])):
                        self._app_config[key] = value
                    else:
                        logging.error("Type mismatch with %s" % key)
                        sys.exit()
                except ValueError:
                    logging.error("Incorrect value type for %s" % key)
                    sys.exit()
            else:
                logging.error("Incorrect key configuration %s" % key)
                sys.exit()

    def init(self, configfile, schema_file):
        """Load the optional INI config file and initialize the normalizer."""
        if configfile is not None:
            # read VES configuration file if provided
            config = configparser.ConfigParser()
            # keep option names case-sensitive (defaults are CamelCase)
            config.optionxform = lambda option: option
            config.read(configfile)
            self.config(config)
        # initialize normalizer
        self.initialize(schema_file, self._app_config['SendEventInterval'])

    def run(self):
        """Consume JSON data from the Kafka broker (blocks forever)."""
        kafka_server = '{}:{}'.format(
            self._app_config.get('KafkaBroker'),
            self._app_config.get('KafkaPort'))
        consumer = KafkaConsumer(
            'collectd', bootstrap_servers=kafka_server,
            auto_offset_reset='latest', enable_auto_commit=False,
            value_deserializer=lambda m: json.loads(m.decode('ascii')))

        for message in consumer:
            for kafka_data in message.value:
                # Sample of an expected collectd record:
                # {u'dstypes': [u'derive'],
                #  u'plugin': u'cpu',
                #  u'dsnames': [u'value'],
                #  u'host': u'localhost',
                #  u'values': [99.9978996416267],
                #  u'time': 1502114956.244,
                #  u'plugin_instance': u'44',
                #  u'type_instance': u'idle',
                #  ...}
                logging.debug('{}:run():data={}'.format(
                    self.__class__.__name__, kafka_data))
                # enumerate (rather than dsnames.index(ds_name)) keeps the
                # pairing correct even if a ds name appears twice
                for index, ds_name in enumerate(kafka_data['dsnames']):
                    val_hash = CollectdValue.hash_gen(
                        kafka_data['host'], kafka_data['plugin'],
                        kafka_data['plugin_instance'], kafka_data['type'],
                        kafka_data['type_instance'], ds_name)
                    collector = self.get_collector()
                    val = collector.get(val_hash)
                    if val:
                        # update the existing value in the collector
                        val.value = kafka_data['values'][index]
                        val.time = kafka_data['time']
                        del val
                    else:
                        # add new value into the collector
                        val = CollectdValue()
                        val.host = kafka_data['host']
                        val.plugin = kafka_data['plugin']
                        val.plugin_instance = kafka_data['plugin_instance']
                        val.type = kafka_data['type']
                        val.type_instance = kafka_data['type_instance']
                        val.value = kafka_data['values'][index]
                        val.interval = kafka_data['interval']
                        val.time = kafka_data['time']
                        val.ds_name = ds_name
                        collector.add(val)
def main():
    """Entry point: parse CLI options, set up logging, run the VES app."""
    # Parsing cmdline options
    parser = argparse.ArgumentParser()
    parser.add_argument("--events-schema", dest="schema", required=True,
                        help="YAML events schema definition", metavar="FILE")
    parser.add_argument("--config", dest="configfile", default=None,
                        help="Specify config file", metavar="FILE")
    parser.add_argument("--loglevel", dest="level", default='INFO',
                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
                        help="Specify log level (default: %(default)s)",
                        metavar="LEVEL")
    parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
                        help="Specify log file (default: %(default)s)",
                        metavar="FILE")
    args = parser.parse_args()

    # Create log file
    logging.basicConfig(filename=args.logfile,
                        format='%(asctime)s %(message)s',
                        level=args.level)
    if args.configfile is None:
        logging.warning("No configfile specified, using default options")

    # Create Application Instance
    application_instance = VESApp()
    application_instance.init(args.configfile, args.schema)

    try:
        # Run the application (blocks until interrupted or a fatal error)
        application_instance.run()
    except KeyboardInterrupt:
        logging.info(" - Ctrl-C handled, exiting gracefully")
    except Exception as e:
        logging.error('{}, {}'.format(type(e), e))
    finally:
        # always release normalizer resources before exiting
        application_instance.destroy()
        sys.exit()
# Script entry point: the call to main() was lost in the mangled source;
# without it the guard body is empty and the program does nothing.
if __name__ == '__main__':
    main()