Python 2 to 3 migration fixes
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / normalizer.py
1 #
2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Authors:
17 #   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
18 #
19
20 import yaml
21 import logging
22 import datetime
23 import time
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
27 import re
28
29 # import YAML loader
30 try:
31     from yaml import CLoader as Loader
32 except ImportError:
33     from yaml import Loader
34
35 # import synchronized queue
36 try:
37     import queue
38 except ImportError:
39     import Queue as queue
40
41
class Config(object):
    """Configuration values made available to YAML templates as ``config``."""

    def __init__(self, interval):
        """Remember the configuration.

        :param interval: measurement interval, stored unchanged as
            ``self.interval``
        """
        self.interval = interval
47
48
class System(object):
    """Host and time information made available to YAML templates as
    ``system``.

    Note that ``id`` is not a plain attribute: every read advances a
    per-instance counter (1, 2, 3, ...).
    """

    def __init__(self):
        self.hostname = 'localhost'
        self._id = 0

    @property
    def id(self):
        """Advance the counter and return its new value."""
        self._id += 1
        return self._id

    @property
    def time(self):
        """Current wall-clock time, as returned by ``time.time()``."""
        return time.time()

    @property
    def date(self):
        """Today's local date as an ISO string (YYYY-MM-DD)."""
        return datetime.date.today().isoformat()
69
70
class ItemIterator(object):
    """Item iterator returned by Collector class.

    Exposes a list of selected items while the collector lock is held; the
    lock is released through ``collector.unlock()`` when this object is
    destroyed. Supports ``len()``, indexing, ``next()``, and ``for`` loops
    (through the sequence protocol, i.e. ``__getitem__``).
    """

    def __init__(self, collector, items):
        """Item iterator init.

        :param collector: object whose ``unlock()`` is called in ``__del__``
        :param items: list of items to expose
        """
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        self._items = items
        self._collector = collector
        self._index = 0

    def __next__(self):
        """Return the next item from the list; raise StopIteration at end."""
        if self._index >= len(self._items):
            raise StopIteration
        curr_index = self._index
        self._index = curr_index + 1
        # the list lives in ``_items``; there is no ``items`` attribute
        return self._items[curr_index]

    # Python 2 name of the iterator-protocol method (the file supports both)
    next = __next__

    def __getitem__(self, key):
        """Get item by index."""
        return self._items[key]

    def __len__(self):
        """Return the number of items."""
        return len(self._items)

    def __del__(self):
        """Destroy iterator and unlock the collector."""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
101
102
class ItemObject(object):
    """Write-through proxy for one collector entry, returned by
    ``Collector.get()``.

    Assigning an attribute on the proxy sets it on the stored item and
    refreshes the entry's timestamp. The collector lock is released through
    ``collector.unlock()`` when the proxy is destroyed.
    """

    def __init__(self, collector, hash_):
        """Bind the proxy to ``collector._metrics[hash_]``."""
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        # store via the base implementation: our own __setattr__ would try
        # to forward these to the wrapped item
        base = super(ItemObject, self)
        base.__setattr__('_collector', collector)
        base.__setattr__('_hash', hash_)

    def __setattr__(self, name, value):
        """Set ``name`` on the wrapped item and re-stamp its entry."""
        metrics = self._collector._metrics
        _, target = metrics[self._hash]
        logging.debug('{}:__setattr__(name={}, value={})'.format(
                      self.__class__.__name__, name, value))
        setattr(target, name, value)
        metrics[self._hash] = (time.time(), target)

    def __del__(self):
        """Release the collector lock taken by ``Collector.get()``."""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
123
124
class Collector(object):
    """Thread-safe collector with aging feature.

    Items are stored by ``hash(item)`` together with the time they were last
    added or updated. A ``threading.Timer`` re-armed every ``age_timeout``
    seconds removes items that were not refreshed within that period.

    ``get()`` (on a hit) and ``items()`` return while still holding the
    collector lock; the returned ``ItemObject`` / ``ItemIterator`` releases
    it when destroyed.
    """

    def __init__(self, age_timeout):
        """Create an empty collector and start the aging timer.

        :param age_timeout: seconds an item may stay without being refreshed;
            also the period of the aging timer
        """
        self._metrics = {}
        self._lock = RLock()
        self._age_timeout = age_timeout
        self._start_age_timer()

    def _start_age_timer(self):
        """Arm a one-shot timer that calls _on_timer after age_timeout."""
        self._age_timer = Timer(self._age_timeout, self._on_timer)
        self._age_timer.start()

    def _stop_age_timer(self):
        """Cancel the currently armed aging timer."""
        self._age_timer.cancel()

    def _on_timer(self):
        """Timer callback: re-arm first so aging continues, then purge."""
        self._start_age_timer()
        self._check_aging()

    def _check_aging(self):
        """Remove every item whose last update is age_timeout or older."""
        self.lock()
        try:
            now = time.time()
            # iterate over a snapshot because entries are deleted in the loop
            for data_hash, (age, item) in list(self._metrics.items()):
                if now - age >= self._age_timeout:
                    # aging time has expired, remove the item
                    logging.debug('{}:_check_aging():value={}'.format(
                                  self.__class__.__name__, item))
                    del self._metrics[data_hash]
        finally:
            self.unlock()

    def lock(self):
        """Lock the collector (reentrant for the owning thread)."""
        logging.debug('{}:lock()'.format(self.__class__.__name__))
        self._lock.acquire()

    def unlock(self):
        """Unlock the collector."""
        logging.debug('{}:unlock()'.format(self.__class__.__name__))
        self._lock.release()

    def get(self, hash_):
        """Return an ``ItemObject`` proxy for ``hash_``, or None if absent.

        On a hit the lock stays held until the returned proxy is destroyed;
        on a miss it is released before returning.
        """
        self.lock()
        if hash_ in self._metrics:
            return ItemObject(self, hash_)
        self.unlock()
        return None

    def add(self, item):
        """Add (or refresh) an item, keyed by ``hash(item)``."""
        self.lock()
        try:
            logging.debug('{}:add(item={})'.format(
                self.__class__.__name__, item))
            self._metrics[hash(item)] = (time.time(), item)
        finally:
            self.unlock()

    def items(self, select_list=None):
        """Return a locked (safe) ``ItemIterator`` over the matching items.

        :param select_list: list of condition mappings; an item is selected
            when ``item.match(**condition)`` is true for at least one of them.
            Each item is selected at most once. ``None`` or an empty list
            selects nothing.
        :returns: ItemIterator holding the collector lock until destroyed
        """
        if select_list is None:
            select_list = []
        self.lock()
        try:
            metrics = [value for _, value in list(self._metrics.values())
                       if any(value.match(**select)
                              for select in select_list)]
        except BaseException:
            # normally the ItemIterator takes over the lock; if matching
            # fails nobody else would ever release it
            self.unlock()
            raise
        return ItemIterator(self, metrics)

    def destroy(self):
        """Destroy the collector (stops the aging timer)."""
        self._stop_age_timer()
200
201
class CollectdData(object):
    """Base class for Collectd data"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, time_=None):
        """Store the collectd identification fields and timestamp."""
        self.host = host
        self.plugin = plugin
        self.plugin_instance = plugin_instance
        self.type_instance = type_instance
        self.type = type_
        self.time = time_

    @classmethod
    def is_regular_expression(cls, expr):
        """Return True if ``expr`` is a string of the form ``/pattern/``.

        Non-string values (numbers, None, ...) are never treated as
        regular expressions.
        """
        # (str, unicode) on Python 2, (str, str) on Python 3
        if not isinstance(expr, (str, type(u''))):
            return False
        return len(expr) > 1 and expr[0] == '/' and expr[-1] == '/'

    def match(self, **kargs):
        """Return True when every ``attribute=expected`` pair matches.

        An ``expected`` value written as ``/pattern/`` is tested with
        ``re.match`` (anchored at the start only); any other value must
        compare equal. An attribute that is None never matches a pattern.
        With no conditions the result is True.
        """
        for key, expected in kargs.items():
            actual = getattr(self, key)
            if self.is_regular_expression(expected):
                # re.match() raises TypeError on None; an unset attribute
                # simply does not match
                if actual is None or re.match(expected[1:-1], actual) is None:
                    return False
            elif expected != actual:
                return False
        # return match even if kargs is empty
        return True
229
230
class CollectdNotification(CollectdData):
    """Collectd notification"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, severity=None, message=None):
        """Store the identification fields plus severity and message.

        ``time`` is initialised to None by the base class.
        """
        super(CollectdNotification, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.severity = severity
        self.message = message

    def __repr__(self):
        # report the notification's own timestamp (previously the ``time``
        # module object was formatted here by mistake)
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, severity={}, message={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.severity, self.message, self.time)
247
248
class CollectdValue(CollectdData):
    """Collectd value"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, ds_name='value', value=None,
                 interval=None):
        """Store the identification fields, data-source name and value."""
        super(CollectdValue, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.value = value
        self.ds_name = ds_name
        self.interval = interval

    @classmethod
    def hash_gen(cls, host, plugin, plugin_instance, type_,
                 type_instance, ds_name):
        """Hash of the identification fields (the value is not included)."""
        return hash((host, plugin, plugin_instance, type_,
                    type_instance, ds_name))

    def _key(self):
        """Identification tuple, in ``hash_gen`` argument order."""
        return (self.host, self.plugin, self.plugin_instance, self.type,
                self.type_instance, self.ds_name)

    def __eq__(self, other):
        # compare the identification fields themselves, not just their
        # hashes, so two different series that collide are not "equal"
        if not isinstance(other, CollectdValue):
            return NotImplemented
        return self._key() == other._key() and self.value == other.value

    def __ne__(self, other):
        # Python 2 does not derive != from __eq__
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return self.hash_gen(*self._key())

    def __repr__(self):
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, ds_name={}, value={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.ds_name, self.value, self.time)
280
281
class Item(yaml.YAMLObject):
    """Base class to process tags like ArrayItem/ValueItem"""

    @classmethod
    def _format_pairs(cls, pairs, metric, only_key):
        """Copy (key, value) node pairs, formatting only the value whose key
        is ``only_key``; other values are reused unchanged."""
        return [(yaml.ScalarNode(key.tag, key.value),
                 cls.format_node(value, metric) if key.value == only_key
                 else value)
                for key, value in pairs]

    @classmethod
    def format_node(cls, mapping, metric):
        """Return a copy of YAML node ``mapping`` with ``str.format``
        placeholders (e.g. ``'{vl.value}'``) filled from ``metric``.

        :param mapping: composed YAML node
        :param metric: dict passed as keyword arguments to ``str.format``
        :returns: a new node, or ``mapping`` itself for unhandled tags
        """
        tag = mapping.tag
        scalar_tags = ('tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
                       Number.yaml_tag, StripExtraDash.yaml_tag)
        if tag in scalar_tags:
            # leaf node: substitute placeholders in the scalar text
            return yaml.ScalarNode(tag, mapping.value.format(**metric))
        if tag == 'tag:yaml.org,2002:map':
            # plain mapping: every value is formatted recursively
            return yaml.MappingNode(tag, [
                (yaml.ScalarNode(key.tag, key.value),
                 cls.format_node(value, metric))
                for key, value in mapping.value])
        if tag in (ArrayItem.yaml_tag, ValueItem.yaml_tag):
            # nested item tag: only SELECT conditions are formatted here, the
            # rest is left for the nested tag's own processing
            return yaml.SequenceNode(tag, [
                yaml.MappingNode(entry.tag, cls._format_pairs(
                    entry.value, metric, 'SELECT'))
                for entry in mapping.value])
        if tag == MapValue.yaml_tag:
            # MapValue: only the VALUE entry is formatted
            return yaml.MappingNode(
                tag, cls._format_pairs(mapping.value, metric, 'VALUE'))
        return mapping
320
321
class ValueItem(Item):
    """Class to process ValueItem tag.

    Expands to one value built from the single collectd metric selected by
    the SELECT condition(s), or to DEFAULT when nothing is selected.
    """
    yaml_tag = '!ValueItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the value of a ``!ValueItem`` sequence node.

        Recognised entries: ``SELECT`` (condition mapping, may repeat; a
        metric is selected when it matches any of them), ``VALUE`` (node
        template, defaults to ``!Number '{vl.value}'``) and ``DEFAULT``
        (returned when no metric is selected). The VALUE template may use
        ``vl`` (the metric), ``system`` and ``config``.

        :raises AssertionError: duplicated VALUE/DEFAULT, more than one
            metric selected, or nothing selected and no DEFAULT given
        """
        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
        default, select, value_desc = None, list(), None
        # find value description
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'VALUE':
                    assert value_desc is None, "VALUE key already set"
                    value_desc = value
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
                if key.value == 'DEFAULT':
                    assert default is None, "DEFAULT key already set"
                    default = loader.construct_object(value)
        # if VALUE key isn't given, use default VALUE key
        # format: `VALUE: !Number '{vl.value}'`
        if value_desc is None:
            value_desc = yaml.ScalarNode(tag='!Number', value='{vl.value}')
        # select collectd metric based on SELECT condition; the returned
        # iterator holds the collector lock until it is released
        metrics = loader.collector.items(select)
        assert len(metrics) < 2, \
            'Wrong SELECT condition {}, selected {} metrics'.format(
            select, len(metrics))
        if len(metrics) > 0:
            # expose the same template variables as ArrayItem does
            item = cls.format_node(value_desc, {'vl': metrics[0],
                                   'system': loader.system,
                                   'config': loader.config})
            return loader.construct_object(item)
        # nothing has been found by SELECT condition, set to DEFAULT value.
        assert default is not None, "No metrics selected by SELECT condition" \
            " {} and DEFAULT key isn't set".format(select)
        return default
358
359
class ArrayItem(Item):
    """Class to process ArrayItem tag.

    Expands to a list with one ITEM-DESC mapping per selected collectd
    metric, or per distinct INDEX-KEY combination of the selected metrics.
    """
    yaml_tag = '!ArrayItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the list of an ``!ArrayItem`` sequence node.

        Recognised entries: ``ITEM-DESC`` (mandatory mapping template),
        ``SELECT`` (condition mapping, may repeat; alternatives) and
        ``INDEX-KEY`` (metric attribute names to group by). At least one of
        SELECT / INDEX-KEY is required; INDEX-KEY alone groups every
        collected metric. The template may use ``vl``, ``system`` and
        ``config``.

        :raises AssertionError: node is not a sequence, or keys are missing
            or duplicated
        """
        logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                      loader, node))
        # e.g.:
        # SequenceNode(tag=u'!ArrayItem', value=[
        #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
        #       MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #         (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
        #           , ...)
        #       ]), ...
        #     ), (key, value), ... ])
        #   , ... ])
        assert isinstance(node, yaml.SequenceNode), \
            "{} tag isn't YAML array".format(cls.__name__)
        select, index_keys, items, item_desc = list(), list(), list(), None
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'ITEM-DESC':
                    assert item_desc is None, "ITEM-DESC key already set"
                    item_desc = value
                if key.value == 'INDEX-KEY':
                    assert len(index_keys) == 0, "INDEX-KEY key already set"
                    index_keys = loader.construct_sequence(value)
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
        # validate item description
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        assert len(select) > 0 or len(index_keys) > 0, \
            "Mandatory key (INDEX-KEY or SELECT) isn't set"
        # an empty condition mapping matches every metric, so INDEX-KEY
        # without SELECT groups all collected metrics instead of none
        metrics = loader.collector.items(select or [{}])
        # select metrics based on INDEX-KEY provided
        if len(index_keys) > 0:
            # one CollectdValue per distinct INDEX-KEY combination, kept in
            # collector order (not set order) so the output is deterministic
            unique, seen = list(), set()
            for metric in metrics:
                value = CollectdValue()
                for key in index_keys:
                    setattr(value, key, getattr(metric, key))
                if value not in seen:
                    seen.add(value)
                    unique.append(value)
            metrics = unique
        # build items based on SELECT and/or INDEX-KEY criteria
        for metric in metrics:
            item = cls.format_node(item_desc,
                                   {'vl': metric, 'system': loader.system,
                                    'config': loader.config})
            items.append(loader.construct_mapping(item))
        return items
412
413
class Measurements(ArrayItem):
    """Class to process Measurements tag.

    Parsed exactly like ``!ArrayItem`` (``from_yaml`` is inherited); only the
    tag differs, which is what marks a top-level measurement definition.
    """
    yaml_tag = '!Measurements'
417
418
class Events(Item):
    """Class to process Events tag.

    Expands to the ITEM-DESC mapping formatted with the current
    notification when the notification satisfies CONDITION, else to None.
    """
    yaml_tag = '!Events'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct an ``!Events`` node for ``loader.notification``.

        Template variables: ``n`` (the notification) and ``system``. A
        missing CONDITION means an empty condition (always matches).

        :raises AssertionError: ITEM-DESC is missing
        """
        item_desc = None
        condition = dict()
        for entry in node.value:
            for key, value in entry.value:
                name = key.value
                if name == 'ITEM-DESC':
                    item_desc = value
                elif name == 'CONDITION':
                    condition = loader.construct_mapping(value)
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        notification = loader.notification
        if not notification.match(**condition):
            return None
        formatted = cls.format_node(
            item_desc, {'n': notification, 'system': loader.system})
        return loader.construct_mapping(formatted)
438
439
class Bytes2Kibibytes(yaml.YAMLObject):
    """Class to process Bytes2Kibibytes tag"""
    yaml_tag = '!Bytes2Kibibytes'

    @classmethod
    def from_yaml(cls, loader, node):
        """Convert the scalar (a byte count) to KiB, rounded to 3 decimals."""
        kibibytes = float(node.value) / 1024.0
        return round(kibibytes, 3)
447
448
class Number(yaml.YAMLObject):
    """Class to process Number tag"""
    yaml_tag = '!Number'

    @classmethod
    def from_yaml(cls, loader, node):
        """Return the scalar text as an int, or as a float if it is not an
        integer literal (ValueError if it is neither)."""
        text = node.value
        try:
            number = int(text)
        except ValueError:
            number = float(text)
        return number
459
460
class StripExtraDash(yaml.YAMLObject):
    """Class to process StripExtraDash tag"""
    yaml_tag = '!StripExtraDash'

    @classmethod
    def from_yaml(cls, loader, node):
        """Collapse runs of '-' and drop leading/trailing ones,
        e.g. ``'-a--b-'`` -> ``'a-b'``."""
        # filter(None, ...) drops the empty strings produced by extra dashes
        return '-'.join(filter(None, node.value.split('-')))
468
469
class MapValue(yaml.YAMLObject):
    """Class to process MapValue tag"""
    yaml_tag = '!MapValue'

    @classmethod
    def from_yaml(cls, loader, node):
        """Translate VALUE through the TO mapping and return the result.

        :raises AssertionError: TO or VALUE missing, or VALUE not a key of TO
        """
        lookup = None
        source = None
        for key, value in node.value:
            name = key.value
            if name == 'TO':
                lookup = loader.construct_mapping(value)
            elif name == 'VALUE':
                source = loader.construct_object(value)
        assert lookup is not None, "Mandatory TO key isn't set"
        assert source is not None, "Mandatory VALUE key isn't set"
        assert source in lookup, \
            'Value "{}" cannot be mapped to any of {} values'.format(
                source, list(lookup.keys()))
        return lookup[source]
488
489
class Normalizer(object):
    """Normalization class which handles events and measurements.

    Measurements are rendered from the ``!Measurements`` definitions every
    ``interval`` seconds on a timer thread. Notifications passed to
    ``post_event()`` are rendered from the ``!Events`` definitions on a
    worker thread. Rendered data is handed to ``send_data()``, which
    subclasses must override.
    """

    def __init__(self):
        """Create an uninitialized normalizer; call initialize() before use."""
        self.interval = None
        self.collector = None
        self.system = None
        self.config = None
        self.queue = None
        self.timer = None
        self.event_thread = None
        self.measurements_stream = None
        self.events_stream = None

    @classmethod
    def read_configuration(cls, config_file):
        """Read the YAML configuration file and split it by tag.

        :param config_file: path of the YAML definitions file
        :returns: ``(events_stream, measurements_stream)`` -- serialized YAML
            mappings of the top-level entries tagged ``!Events`` and
            ``!Measurements`` respectively
        """
        # compose (not load): keep the raw node graph; the custom tags can
        # only be constructed later with collector/notification context
        with open(config_file, 'r') as f:
            doc_yaml = yaml.compose(f)
        # split events & measurements definitions
        measurements, events = list(), list()
        for key, value in doc_yaml.value:
            if value.tag == Measurements.yaml_tag:
                measurements.append((key, value))
            if value.tag == Events.yaml_tag:
                events.append((key, value))
        measurements_yaml = yaml.MappingNode('tag:yaml.org,2002:map',
                                             measurements)
        measurements_stream = yaml.serialize(measurements_yaml)
        events_yaml = yaml.MappingNode('tag:yaml.org,2002:map', events)
        events_stream = yaml.serialize(events_yaml)
        # return event & measurements definition
        return events_stream, measurements_stream

    def initialize(self, config_file, interval):
        """Load the definitions and start the collector, worker and timer.

        :param config_file: path of the YAML definitions file
        :param interval: measurement period in seconds
        """
        e, m = self.read_configuration(config_file)
        self.measurements_stream = m
        self.events_stream = e
        self.system = System()
        self.config = Config(interval)
        self.interval = interval
        # start collector with aging time = double interval
        self.collector = Collector(interval * 2)
        # initialize event thread
        self.queue = queue.Queue()
        self.event_thread = Thread(target=self.event_worker)
        self.event_thread.daemon = True
        self.event_thread.start()
        # initialize measurements timer
        self.start_timer()

    def destroy(self):
        """Stop the collector, the event worker and the measurement timer."""
        self.collector.destroy()
        self.post_event(None)  # send stop event
        self.event_thread.join()
        self.stop_timer()

    def start_timer(self):
        """Arm the one-shot measurements timer."""
        self.timer = Timer(self.interval, self.on_timer)
        self.timer.start()

    def stop_timer(self):
        """Cancel the currently armed measurements timer."""
        self.timer.cancel()

    def on_timer(self):
        """Measurements timer: re-arm first, then process measurements."""
        self.start_timer()
        self.process_measurements()

    def event_worker(self):
        """Process queued notifications until a non-notification arrives."""
        while True:
            event = self.queue.get()
            if not isinstance(event, CollectdNotification):
                # exit for the worker (destroy() posts None)
                break
            try:
                self.process_notify(event)
            except Exception:
                # keep the worker alive: one bad notification or event
                # definition must not stop all later notifications
                logging.exception(
                    'Failed to process notification {}'.format(event))

    def get_collector(self):
        """Get metric collector reference"""
        return self.collector

    def process_measurements(self):
        """Render the measurement definitions and send each measurement."""
        loader = Loader(self.measurements_stream)
        loader.collector = self.collector
        loader.system = self.system
        loader.config = self.config
        try:
            measurements = loader.get_data()
        finally:
            # same cleanup yaml.load() performs on its loader
            loader.dispose()
        for measurement_name in measurements:
            logging.debug('Process "{}" measurements: {}'.format(
                measurement_name, measurements[measurement_name]))
            for measurement in measurements[measurement_name]:
                self.send_data(measurement)

    def process_notify(self, notification):
        """Render the event definitions for ``notification`` and send the
        ones whose CONDITION matched."""
        loader = Loader(self.events_stream)
        loader.notification = notification
        loader.system = self.system
        try:
            notifications = loader.get_data()
        finally:
            loader.dispose()
        for notify_name in notifications:
            logging.debug('Process "{}" notification'.format(notify_name))
            if notifications[notify_name] is not None:
                self.send_data(notifications[notify_name])

    def send_data(self, data):
        """Send data; abstract, must be overridden by subclasses.

        :raises NotImplementedError: always, in this base class
        """
        # an explicit raise survives ``python -O``, unlike ``assert False``
        raise NotImplementedError(
            'send_data() is abstract function and MUST be overridden')

    def post_event(self, notification):
        """Post notification into the queue to process"""
        self.queue.put(notification)