0e92d1a533652a6374a2821df037e781ca7a3cbb
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / normalizer.py
1 #
2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Authors:
17 #   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
18 #
19
20 from . import yaml
21 import logging
22 import datetime
23 import time
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
27 import re
28
29 # import YAML loader
30 try:
31     from .yaml import CLoader as Loader
32 except ImportError:
33     from .yaml import Loader
34
35 # import synchronized queue
36 import queue
37
38
class Config(object):
    """Container for the configuration options exposed to the YAML file"""

    def __init__(self, interval):
        """Store the interval value supplied by the caller."""
        self.interval = interval
44
45
class System(object):
    """Source of system information (host, id counter, time, date) for the
    YAML file"""

    def __init__(self):
        """Start with the default host name and a zeroed id counter."""
        self._id = 0
        self.hostname = 'localhost'

    @property
    def id(self):
        """Advance the internal counter and return its new value.

        Note that every read of this property increments the counter, so
        the first read yields 1, the second 2, and so on.
        """
        self._id += 1
        return self._id

    @property
    def time(self):
        """Current time as reported by ``time.time()``."""
        now = time.time()
        return now

    @property
    def date(self):
        """Today's local date formatted as an ISO string (YYYY-MM-DD)."""
        today = datetime.date.today()
        return today.isoformat()
66
67
class ItemIterator(object):
    """Item iterator returned by Collector class

    The iterator keeps a reference to the collector it was created by and
    calls the collector's unlock() when the iterator itself is destroyed.
    Besides __next__(), it supports indexing and len() over the wrapped
    items.
    """

    def __init__(self, collector, items):
        """Item iterator init

        collector -- object providing unlock(); called from __del__()
        items     -- indexable sequence of the selected items
        """
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        self._items = items
        self._collector = collector
        self._index = 0

    def __next__(self):
        """Returns next item from the list

        Raises StopIteration once every item has been returned.
        """
        if self._index >= len(self._items):
            raise StopIteration
        # read from the private list; there is no public 'items' attribute
        item = self._items[self._index]
        self._index += 1
        return item

    def __getitem__(self, key):
        """get item by index"""
        return self._items[key]

    def __len__(self):
        """Return length of elements"""
        return len(self._items)

    def __del__(self):
        """Destroy iterator and unlock the collector"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
98
99
class ItemObject(object):
    """Write-through handle for a single collector item, as returned by the
    Collector class"""

    def __init__(self, collector, hash_):
        """Remember the owning collector and the key of the wrapped item."""
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        # Go through object.__setattr__ directly: our own __setattr__
        # forwards every assignment to the stored item instead.
        object.__setattr__(self, '_collector', collector)
        object.__setattr__(self, '_hash', hash_)

    def __setattr__(self, name, value):
        """Set the attribute on the stored item and refresh its timestamp."""
        key = self._hash
        _, stored = self._collector._metrics[key]
        logging.debug('{}:__setattr__(name={}, value={})'.format(
                      self.__class__.__name__, name, value))
        setattr(stored, name, value)
        # re-store the item together with the current time
        self._collector._metrics[key] = (time.time(), stored)

    def __del__(self):
        """Release the collector lock when the handle is destroyed."""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
120
121
class Collector(object):
    """Thread-safe collector with aging feature

    Items are stored as ``hash(item) -> (timestamp, item)``. A timer that
    re-arms itself every ``age_timeout`` seconds drops the items whose
    timestamp is at least ``age_timeout`` seconds old.
    """

    def __init__(self, age_timeout):
        """Initialization

        age_timeout -- aging period in seconds; also the timer interval.
        Note that the aging timer is started right away.
        """
        self._metrics = {}
        self._lock = RLock()
        self._age_timeout = age_timeout
        self._start_age_timer()

    def _start_age_timer(self):
        """Start age timer"""
        self._age_timer = Timer(self._age_timeout, self._on_timer)
        self._age_timer.start()

    def _stop_age_timer(self):
        """Stop age timer"""
        self._age_timer.cancel()

    def _on_timer(self):
        """Age timer"""
        # re-arm first, then run the aging pass
        self._start_age_timer()
        self._check_aging()

    def _check_aging(self):
        """Check aging time for all items"""
        self.lock()
        try:
            now = time.time()
            # iterate over a snapshot because entries are removed on the fly
            for data_hash, (age, item) in list(self._metrics.items()):
                if (now - age) >= self._age_timeout:
                    # aging time has expired, remove the item from the
                    # collector
                    logging.debug('{}:_check_aging():value={}'.format(
                                  self.__class__.__name__, item))
                    self._metrics.pop(data_hash)
        finally:
            # never leave the collector locked if the pass fails
            self.unlock()

    def lock(self):
        """Lock the collector"""
        logging.debug('{}:lock()'.format(self.__class__.__name__))
        self._lock.acquire()

    def unlock(self):
        """Unlock the collector"""
        logging.debug('{}:unlock()'.format(self.__class__.__name__))
        self._lock.release()

    def get(self, hash_):
        """Return a locked ItemObject for the given hash, or None

        When the hash is known, the collector stays locked and the returned
        ItemObject releases the lock on destruction. Otherwise the lock is
        released here and None is returned.
        """
        self.lock()
        if hash_ in self._metrics:
            return ItemObject(self, hash_)
        self.unlock()
        return None

    def add(self, item):
        """Add an item into the collector"""
        self.lock()
        try:
            logging.debug('{}:add(item={})'.format(
                self.__class__.__name__, item))
            self._metrics[hash(item)] = (time.time(), item)
        finally:
            self.unlock()

    def items(self, select_list=None):
        """Returns locked (safe) item iterator

        select_list -- list of keyword dictionaries passed to the item's
                       match(); an item is appended once for every entry
                       it matches. None (the default) or an empty list
                       selects nothing.

        The collector stays locked until the returned ItemIterator is
        destroyed. If matching fails, the lock is released and the
        exception is re-raised.
        """
        if select_list is None:
            select_list = []
        metrics = []
        self.lock()
        try:
            for _, value in list(self._metrics.values()):
                for select in select_list:
                    if value.match(**select):
                        metrics.append(value)
        except Exception:
            # no iterator will exist to release the lock, so do it here
            self.unlock()
            raise
        return ItemIterator(self, metrics)

    def destroy(self):
        """Destroy the collector"""
        self._stop_age_timer()
197
198
class CollectdData(object):
    """Base class for Collectd data"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, time_=None):
        """Class initialization"""
        self.host = host
        self.plugin = plugin
        self.plugin_instance = plugin_instance
        self.type_instance = type_instance
        self.type = type_
        self.time = time_

    @classmethod
    def is_regular_expression(cls, expr):
        """Tell whether expr is a string of the form '/.../'

        Non-string values (e.g. numbers) are never regular expressions.
        """
        return (isinstance(expr, str) and len(expr) > 1 and
                expr[0] == '/' and expr[-1] == '/')

    def match(self, **kargs):
        """Compare the attributes of this object against the conditions

        Every keyword names an attribute. A value written as '/regex/' is
        matched with re.match() against the attribute; any other value is
        compared for equality. Returns True when all conditions hold (also
        when no condition is given), False otherwise.

        Raises AttributeError if a keyword names an unknown attribute.
        """
        # compare the metric
        for key, value in kargs.items():
            actual = getattr(self, key)
            if self.is_regular_expression(value):
                # an unset (None) or non-string attribute cannot match a
                # pattern; re.match() would raise TypeError on it
                if not isinstance(actual, str):
                    return False
                if re.match(value[1:-1], actual) is None:
                    return False
            elif value != actual:
                return False
        # return match event if kargs is empty
        return True
226
227
class CollectdNotification(CollectdData):
    """Collectd notification"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, severity=None, message=None):
        """Initialize the common fields plus severity and message"""
        super(CollectdNotification, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.severity = severity
        self.message = message

    def __repr__(self):
        """Return a readable description of the notification"""
        # use the notification's own timestamp (self.time), not the
        # 'time' module
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, severity={}, message={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.severity, self.message, self.time)
244
245
class CollectdValue(CollectdData):
    """Collectd value"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, ds_name='value', value=None,
                 interval=None):
        """Initialize the common fields plus ds_name, value and interval"""
        super(CollectdValue, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.value = value
        self.ds_name = ds_name
        self.interval = interval

    @classmethod
    def hash_gen(cls, host, plugin, plugin_instance, type_,
                 type_instance, ds_name):
        """Return the hash of the fields which identify a value"""
        return hash((host, plugin, plugin_instance, type_,
                    type_instance, ds_name))

    def _identity(self):
        """Return the tuple of fields used by __hash__() and __eq__()"""
        return (self.host, self.plugin, self.plugin_instance,
                self.type, self.type_instance, self.ds_name)

    def __eq__(self, other):
        """Values are equal when identity fields and value are equal

        Comparison with an object of another type returns NotImplemented
        instead of failing on a missing attribute or an unhashable operand.
        """
        if not isinstance(other, CollectdValue):
            return NotImplemented
        # compare the fields themselves so that a hash collision can never
        # make two different values look equal
        return (self._identity() == other._identity() and
                self.value == other.value)

    def __hash__(self):
        """Hash of the identity fields (the value itself is excluded)"""
        return self.hash_gen(*self._identity())

    def __repr__(self):
        """Return a readable description of the value"""
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, ds_name={}, value={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.ds_name, self.value, self.time)
277
278
class Item(yaml.YAMLObject):
    """Common base of the tag classes (ArrayItem/ValueItem etc.)"""

    @classmethod
    def format_node(cls, mapping, metric):
        """Return a copy of the node tree with string templates expanded

        mapping -- YAML node to process
        metric  -- dictionary of objects passed to str.format() as keyword
                   arguments

        Scalar nodes (plain strings and the scalar conversion tags) get
        their value formatted; plain maps are processed recursively; for
        ArrayItem/ValueItem only the SELECT entries and for MapValue only
        the VALUE entry are processed, other entries are kept as they are.
        Nodes with any other tag are returned unchanged.
        """
        def clone_key(key):
            # keys are re-created as scalar nodes with the same tag/value
            return yaml.ScalarNode(key.tag, key.value)

        def rebuild(pairs, expand_key):
            # expand only the entries named expand_key (all when it is None)
            return [
                (clone_key(key), cls.format_node(value, metric))
                if expand_key is None or key.value == expand_key
                else (clone_key(key), value)
                for key, value in pairs]

        if mapping.tag in [
                'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
                Number.yaml_tag, StripExtraDash.yaml_tag]:
            expanded = mapping.value.format(**metric)
            return yaml.ScalarNode(mapping.tag, expanded)

        if mapping.tag == 'tag:yaml.org,2002:map':
            return yaml.MappingNode(mapping.tag, rebuild(mapping.value, None))

        if mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
            sequence = [
                yaml.MappingNode(seq.tag, rebuild(seq.value, 'SELECT'))
                for seq in mapping.value]
            return yaml.SequenceNode(mapping.tag, sequence)

        if mapping.tag in [MapValue.yaml_tag]:
            return yaml.MappingNode(
                mapping.tag, rebuild(mapping.value, 'VALUE'))

        return mapping
315             return yaml.MappingNode(mapping.tag, values)
316         return mapping
317
318
class ValueItem(Item):
    """Class to process ValueItem tag"""
    yaml_tag = '!ValueItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build the value of a !ValueItem node

        Recognized keys are VALUE (value description, at most once),
        SELECT (metric selection, may repeat) and DEFAULT (fallback value,
        at most once). The single metric selected from loader.collector is
        formatted through the value description; when nothing is selected
        the DEFAULT value is returned.
        """
        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
        value_desc = None
        default = None
        select = []
        # walk all key/value pairs and pick up the known keys
        for elem in node.value:
            for key, value in elem.value:
                name = key.value
                if name == 'VALUE':
                    assert value_desc is None, "VALUE key already set"
                    value_desc = value
                elif name == 'SELECT':
                    select.append(loader.construct_mapping(value))
                elif name == 'DEFAULT':
                    assert default is None, "DEFAULT key already set"
                    default = loader.construct_object(value)
        # fall back to the default description when VALUE isn't given
        # format: `VALUE: !Number '{vl.value}'`
        if value_desc is None:
            value_desc = yaml.ScalarNode(tag='!Number', value='{vl.value}')
        # pick the collectd metric which satisfies the SELECT condition
        metrics = loader.collector.items(select)
        count = len(metrics)
        assert count < 2, \
            'Wrong SELECT condition {}, selected {} metrics'.format(
            select, count)
        if count > 0:
            context = {'vl': metrics[0], 'system': loader.system}
            return loader.construct_object(
                cls.format_node(value_desc, context))
        # the SELECT condition matched nothing, so use the DEFAULT value
        assert default is not None, "No metrics selected by SELECT condition" \
            " {} and DEFAULT key isn't set".format(select)
        return default
353             " {} and DEFAULT key isn't set".format(select)
354         return default
355
356
class ArrayItem(Item):
    """Class to process ArrayItem tag"""
    yaml_tag = '!ArrayItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build the list of items described by an !ArrayItem node

        Recognized keys are ITEM-DESC (mandatory item template, at most
        once), INDEX-KEY (list of metric attributes, at most once) and
        SELECT (metric selection, may repeat). One item is produced per
        selected metric; when INDEX-KEY is given, the metrics are first
        reduced to distinct combinations of the listed attributes.

        The node has the following shape:
          SequenceNode(tag=u'!ArrayItem', value=[
            MappingNode(tag=u'tag:yaml.org,2002:map', value=[
              (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
                MappingNode(tag=u'tag:yaml.org,2002:map', value=[...])),
              (key, value), ... ]),
            ... ])
        """
        logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                      loader, node))
        assert isinstance(node, yaml.SequenceNode), \
            "{} tag isn't YAML array".format(cls.__name__)
        item_desc = None
        index_keys = []
        select = []
        for elem in node.value:
            for key, value in elem.value:
                name = key.value
                if name == 'ITEM-DESC':
                    assert item_desc is None, "ITEM-DESC key already set"
                    item_desc = value
                elif name == 'INDEX-KEY':
                    assert len(index_keys) == 0, "INDEX-KEY key already set"
                    index_keys = loader.construct_sequence(value)
                elif name == 'SELECT':
                    select.append(loader.construct_mapping(value))
        # the item description and at least one selector are required
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        assert len(select) > 0 or len(index_keys) > 0, \
            "Mandatory key (INDEX-KEY or SELECT) isn't set"
        metrics = loader.collector.items(select)
        if len(index_keys) > 0:
            # keep one value per distinct combination of the index keys
            unique = set()
            for metric in metrics:
                reduced = CollectdValue()
                for attr in index_keys:
                    setattr(reduced, attr, getattr(metric, attr))
                unique.add(reduced)
            metrics = list(unique)
        # expand the item description once for every remaining metric
        return [
            loader.construct_mapping(cls.format_node(
                item_desc, {'vl': metric, 'system': loader.system,
                            'config': loader.config}))
            for metric in metrics]
407             items.append(loader.construct_mapping(item))
408         return items
409
410
class Measurements(ArrayItem):
    """Handler of the Measurements tag; processing is inherited unchanged
    from ArrayItem"""
    yaml_tag = '!Measurements'
414
415
class Events(Item):
    """Handler of the Events tag"""
    yaml_tag = '!Events'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build the event for loader.notification, or return None

        ITEM-DESC (mandatory) is the event template and CONDITION the
        optional set of conditions checked with the notification's match().
        When the conditions don't hold, None is returned.
        """
        item_desc = None
        condition = {}
        for elem in node.value:
            for key, value in elem.value:
                name = key.value
                if name == 'ITEM-DESC':
                    item_desc = value
                elif name == 'CONDITION':
                    condition = loader.construct_mapping(value)
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        if not loader.notification.match(**condition):
            return None
        context = {'n': loader.notification, 'system': loader.system}
        return loader.construct_mapping(cls.format_node(item_desc, context))
433             return loader.construct_mapping(item)
434         return None
435
436
class Bytes2Kibibytes(yaml.YAMLObject):
    """Handler of the Bytes2Kibibytes tag"""
    yaml_tag = '!Bytes2Kibibytes'

    @classmethod
    def from_yaml(cls, loader, node):
        """Divide the node value by 1024 and round it to 3 decimals."""
        kibibytes = float(node.value) / 1024.0
        return round(kibibytes, 3)
442     def from_yaml(cls, loader, node):
443         return round(float(node.value) / 1024.0, 3)
444
445
class Number(yaml.YAMLObject):
    """Handler of the Number tag"""
    yaml_tag = '!Number'

    @classmethod
    def from_yaml(cls, loader, node):
        """Convert the node value to int, or to float when int() fails."""
        text = node.value
        try:
            number = int(text)
        except ValueError:
            number = float(text)
        return number
454         except ValueError:
455             return float(node.value)
456
457
class StripExtraDash(yaml.YAMLObject):
    """Handler of the StripExtraDash tag"""
    yaml_tag = '!StripExtraDash'

    @classmethod
    def from_yaml(cls, loader, node):
        """Drop the empty parts between dashes and re-join the rest,
        e.g. 'a--b-' becomes 'a-b'."""
        parts = node.value.split('-')
        return '-'.join(part for part in parts if part)
463     def from_yaml(cls, loader, node):
464         return '-'.join([x for x in node.value.split('-') if len(x) > 0])
465
466
class MapValue(yaml.YAMLObject):
    """Handler of the MapValue tag"""
    yaml_tag = '!MapValue'

    @classmethod
    def from_yaml(cls, loader, node):
        """Translate VALUE through the TO mapping

        Both keys are mandatory and VALUE has to be one of the keys of the
        TO mapping.
        """
        mapping = None
        val = None
        for key, value in node.value:
            name = key.value
            if name == 'TO':
                mapping = loader.construct_mapping(value)
            elif name == 'VALUE':
                val = loader.construct_object(value)
        assert mapping is not None, "Mandatory TO key isn't set"
        assert val is not None, "Mandatory VALUE key isn't set"
        assert val in mapping, \
            'Value "{}" cannot be mapped to any of {} values'.format(
                val, list(mapping.keys()))
        return mapping[val]
483                 val, list(mapping.keys()))
484         return mapping[val]
485
486
class Normalizer(object):
    """Normalization class which handles events and measurements

    Measurements are processed periodically from a timer; notifications
    are queued by post_event() and processed by a worker thread. The
    results are handed to send_data(), which subclasses must override.
    """

    def __init__(self):
        """Init"""
        self.interval = None
        self.collector = None
        self.system = None
        self.queue = None
        self.timer = None

    @classmethod
    def read_configuration(cls, config_file):
        """read YAML configuration file

        config_file -- path of the YAML definition file

        Returns the tuple (events_stream, measurements_stream): the
        top-level entries tagged !Events and !Measurements, each group
        serialized as a separate YAML mapping.
        """
        # load YAML events/measurements definition; the file is closed
        # even if parsing fails
        with open(config_file, 'r') as f:
            doc_yaml = yaml.compose(f)
        # split events & measurements definitions
        measurements, events = list(), list()
        for key, value in doc_yaml.value:
            if value.tag == Measurements.yaml_tag:
                measurements.append((key, value))
            if value.tag == Events.yaml_tag:
                events.append((key, value))
        measurements_yaml = yaml.MappingNode('tag:yaml.org,2002:map',
                                             measurements)
        measurements_stream = yaml.serialize(measurements_yaml)
        events_yaml = yaml.MappingNode('tag:yaml.org,2002:map', events)
        events_stream = yaml.serialize(events_yaml)
        # return event & measurements definition
        return events_stream, measurements_stream

    def initialize(self, config_file, interval):
        """Initialize the class

        config_file -- path of the YAML definition file
        interval    -- measurements period in seconds

        Starts the collector, the event worker thread and the measurements
        timer.
        """
        e, m = self.read_configuration(config_file)
        self.measurements_stream = m
        self.events_stream = e
        self.system = System()
        self.config = Config(interval)
        self.interval = interval
        # start collector with aging time = double interval
        self.collector = Collector(interval * 2)
        # initialize event thread
        self.queue = queue.Queue()
        self.event_thread = Thread(target=self.event_worker)
        self.event_thread.daemon = True
        self.event_thread.start()
        # initialize measurements timer
        self.start_timer()

    def destroy(self):
        """Destroy the class"""
        self.collector.destroy()
        self.post_event(None)  # send stop event
        self.event_thread.join()
        self.stop_timer()

    def start_timer(self):
        """Start measurements timer"""
        self.timer = Timer(self.interval, self.on_timer)
        self.timer.start()

    def stop_timer(self):
        """Stop measurements timer"""
        self.timer.cancel()

    def on_timer(self):
        """Measurements timer"""
        # re-arm the timer first, then process the measurements
        self.start_timer()
        self.process_measurements()

    def event_worker(self):
        """Event worker

        Processes queued notifications until anything which isn't a
        CollectdNotification (e.g. the None posted by destroy()) arrives.
        """
        while True:
            event = self.queue.get()
            if isinstance(event, CollectdNotification):
                self.process_notify(event)
                continue
            # exit for the worker
            break

    def get_collector(self):
        """Get metric collector reference"""
        return self.collector

    def process_measurements(self):
        """Process measurements"""
        loader = Loader(self.measurements_stream)
        # the tag classes read these attributes from the loader
        setattr(loader, 'collector', self.collector)
        setattr(loader, 'system', self.system)
        setattr(loader, 'config', self.config)
        measurements = loader.get_data()
        for measurement_name in measurements:
            logging.debug('Process "{}" measurements: {}'.format(
                measurement_name, measurements[measurement_name]))
            for measurement in measurements[measurement_name]:
                self.send_data(measurement)

    def process_notify(self, notification):
        """Process events"""
        loader = Loader(self.events_stream)
        # the tag classes read these attributes from the loader
        setattr(loader, 'notification', notification)
        setattr(loader, 'system', self.system)
        notifications = loader.get_data()
        for notify_name in notifications:
            logging.debug('Process "{}" notification'.format(notify_name))
            if notifications[notify_name] is not None:
                self.send_data(notifications[notify_name])

    def send_data(self, data):
        """Send data

        Abstract; always raises AssertionError unless overridden.
        """
        # raised explicitly: an 'assert' statement is removed when Python
        # runs with -O and the data would then be dropped silently
        raise AssertionError(
            'send_data() is abstract function and MUST be overridden')

    def post_event(self, notification):
        """Post notification into the queue to process"""
        self.queue.put(notification)
602         """Post notification into the queue to process"""
603         self.queue.put(notification)