48a379c71be6e5425a9ee6fbe073cef7d1157337
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / normalizer.py
1 #
2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Authors:
17 #   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
18 #
19
20 import yaml
21 import logging
22 import datetime
23 import time
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
27 import re
28
29 # import YAML loader
30 try:
31     from yaml import CLoader as Loader
32 except ImportError:
33     from yaml import Loader
34
35 # import synchronized queue
36 try:
37     # python 2.x
38     import Queue as queue
39 except ImportError:
40     # python 3.x
41     import queue
42
43
class Config(object):
    """Configuration values exposed to the YAML templates as ``config``."""

    def __init__(self, interval):
        """Store the configuration.

        :param interval: the interval value given to the normalizer.
        """
        self.interval = interval
49
50
class System(object):
    """Host information exposed to the YAML templates as ``system``.

    Provides the host name, a running id and the current time/date.
    """

    def __init__(self):
        self._id = 0
        self.hostname = 'localhost'

    @property
    def id(self):
        """Increment the internal counter and return it (1, 2, 3, ...)."""
        self._id += 1
        return self._id

    @property
    def time(self):
        """Current time in seconds since the epoch, from ``time.time()``."""
        return time.time()

    @property
    def date(self):
        """Today's local date as an ISO 8601 string (YYYY-MM-DD)."""
        today = datetime.date.today()
        return today.isoformat()
71
72
class ItemIterator(object):
    """Item iterator returned by Collector class.

    Wraps the list of selected items. The owning collector is unlocked when
    this object is destroyed (see ``__del__``).
    """

    def __init__(self, collector, items):
        """Item iterator init.

        :param collector: collector to unlock when this object is destroyed.
        :param items: list of selected items.
        """
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        self._items = items
        self._collector = collector
        self._index = 0

    def next(self):
        """Return the next item; raise StopIteration when exhausted."""
        if self._index >= len(self._items):
            raise StopIteration
        curr_index = self._index
        self._index = curr_index + 1
        return self._items[curr_index]

    # Python 3 name of the iterator-protocol method (Python 2 uses next()).
    __next__ = next

    def __getitem__(self, key):
        """Get item by index."""
        return self._items[key]

    def __len__(self):
        """Return the number of items."""
        return len(self._items)

    def __del__(self):
        """Destroy the iterator and unlock the collector."""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
103
104
class ItemObject(object):
    """Write-through handle for one collector item (returned by Collector).

    Setting an attribute on the handle sets it on the stored item and
    re-stamps the item with the current time. The collector is unlocked
    when the handle is destroyed.
    """

    def __init__(self, collector, hash_):
        """Bind the handle to ``collector`` and the item key ``hash_``."""
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        # use object's __setattr__ directly: our own override forwards to
        # the stored item instead of setting attributes on the handle
        set_own = super(ItemObject, self).__setattr__
        set_own('_collector', collector)
        set_own('_hash', hash_)

    def __setattr__(self, name, value):
        metrics = self._collector._metrics
        _, stored = metrics[self._hash]
        logging.debug('{}:__setattr__(name={}, value={})'.format(
                      self.__class__.__name__, name, value))
        setattr(stored, name, value)
        # refresh the timestamp together with the update
        metrics[self._hash] = (time.time(), stored)

    def __del__(self):
        """Release the collector lock held on behalf of this handle."""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
125
126
class Collector(object):
    """Thread-safe metric store whose items expire ("age out").

    Items are keyed by ``hash(item)`` and time-stamped when stored. A
    ``threading.Timer`` re-arms itself every ``age_timeout`` seconds and
    drops every item whose timestamp is at least ``age_timeout`` old.
    """

    def __init__(self, age_timeout):
        """Create an empty collector and start the aging timer.

        :param age_timeout: item lifetime in seconds; also the timer period.
        """
        self._metrics = {}
        self._lock = RLock()
        self._age_timeout = age_timeout
        self._start_age_timer()

    def _start_age_timer(self):
        """Arm a one-shot timer that calls ``_on_timer``."""
        self._age_timer = Timer(self._age_timeout, self._on_timer)
        self._age_timer.start()

    def _stop_age_timer(self):
        """Cancel the currently armed aging timer."""
        self._age_timer.cancel()

    def _on_timer(self):
        """Timer callback: re-arm the timer, then expire old items."""
        self._start_age_timer()
        self._check_aging()

    def _check_aging(self):
        """Remove every item whose timestamp is too old."""
        self.lock()
        try:
            now = time.time()
            # Iterate over a snapshot: removing keys while iterating the
            # live dict view raises RuntimeError on Python 3.
            for data_hash, (age, item) in list(self._metrics.items()):
                if (now - age) >= self._age_timeout:
                    # aging time has expired, remove the item
                    logging.debug('{}:_check_aging():value={}'.format(
                                  self.__class__.__name__, item))
                    del self._metrics[data_hash]
        finally:
            self.unlock()

    def lock(self):
        """Lock the collector (re-entrant)."""
        logging.debug('{}:lock()'.format(self.__class__.__name__))
        self._lock.acquire()

    def unlock(self):
        """Unlock the collector."""
        logging.debug('{}:unlock()'.format(self.__class__.__name__))
        self._lock.release()

    def get(self, hash_):
        """Return an ItemObject for ``hash_``, or None if it is not stored.

        When an ItemObject is returned the collector stays locked; the
        ItemObject unlocks it when destroyed.
        """
        self.lock()
        if hash_ in self._metrics:
            return ItemObject(self, hash_)
        self.unlock()
        return None

    def add(self, item):
        """Store ``item`` under ``hash(item)`` with the current time."""
        self.lock()
        logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
        self._metrics[hash(item)] = (time.time(), item)
        self.unlock()

    def items(self, select_list=None):
        """Return a locked (safe) ItemIterator over the selected items.

        :param select_list: list of keyword dicts passed to ``item.match``.
            An item is selected once if it matches at least one of them.
            With no conditions nothing is selected.
        :returns: ItemIterator; the collector stays locked until it is
            destroyed.
        """
        if select_list is None:
            select_list = []
        self.lock()
        try:
            metrics = [value for _, value in self._metrics.values()
                       if any(value.match(**select)
                              for select in select_list)]
        except Exception:
            # no iterator will be created to release the lock, do it here
            self.unlock()
            raise
        return ItemIterator(self, metrics)

    def destroy(self):
        """Destroy the collector (cancel the aging timer)."""
        self._stop_age_timer()
202
203
class CollectdData(object):
    """Base class for Collectd data"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, time_=None):
        """Class initialization"""
        self.host = host
        self.plugin = plugin
        self.plugin_instance = plugin_instance
        self.type_instance = type_instance
        self.type = type_
        self.time = time_

    @classmethod
    def is_regular_expression(cls, expr):
        """Return True if ``expr`` has the ``/pattern/`` form.

        Values without a length or indexing (e.g. ints or None coming from
        YAML) are never regular expressions.
        """
        try:
            return len(expr) > 1 and expr[0] == '/' and expr[-1] == '/'
        except TypeError:
            return False

    def match(self, **kargs):
        """Return True if every ``attribute=value`` condition holds.

        A ``/pattern/`` value is tested with ``re.match`` (anchored at the
        start of the attribute); an attribute that is None never matches a
        pattern. Any other value must compare equal to the attribute.
        Matches even if no conditions are given.
        """
        for key, value in kargs.items():
            actual = getattr(self, key)
            if self.is_regular_expression(value):
                if actual is None or re.match(value[1:-1], actual) is None:
                    return False
            elif value != actual:
                return False
        return True
233
234
class CollectdNotification(CollectdData):
    """Collectd notification (adds severity and message to CollectdData)."""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, severity=None, message=None,
                 time_=None):
        super(CollectdNotification, self).__init__(
            host, plugin, plugin_instance, type_, type_instance, time_)
        self.severity = severity
        self.message = message

    def __repr__(self):
        # report the notification's own timestamp (self.time)
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, severity={}, message={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.severity, self.message, self.time)
251
252
class CollectdValue(CollectdData):
    """Collectd value.

    Hashing uses only the identifying fields (host, plugin, plugin_instance,
    type, type_instance, ds_name); equality additionally compares ``value``.
    """

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, ds_name='value', value=None,
                 interval=None, time_=None):
        super(CollectdValue, self).__init__(
            host, plugin, plugin_instance, type_, type_instance, time_)
        self.value = value
        self.ds_name = ds_name
        self.interval = interval

    @classmethod
    def hash_gen(cls, host, plugin, plugin_instance, type_,
                 type_instance, ds_name):
        """Hash of the identifying fields (same value as ``hash(obj)``)."""
        return hash((host, plugin, plugin_instance, type_,
                    type_instance, ds_name))

    def _identity(self):
        """Tuple of the identifying fields, in ``hash_gen`` argument order."""
        return (self.host, self.plugin, self.plugin_instance, self.type,
                self.type_instance, self.ds_name)

    def __eq__(self, other):
        # compare the fields themselves: equal hashes may still collide
        if not isinstance(other, CollectdValue):
            return NotImplemented
        return (self._identity() == other._identity() and
                self.value == other.value)

    def __ne__(self, other):
        # Python 2 does not derive __ne__ from __eq__
        equal = self.__eq__(other)
        if equal is NotImplemented:
            return equal
        return not equal

    def __hash__(self):
        return self.hash_gen(*self._identity())

    def __repr__(self):
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, ds_name={}, value={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.ds_name, self.value, self.time)
284
285
class Item(yaml.YAMLObject):
    """Base class for the collection tags (!ArrayItem, !ValueItem, ...).

    Provides ``format_node``, which fills the ``str.format`` templates of a
    composed (not yet constructed) YAML node tree.
    """

    @classmethod
    def format_node(cls, mapping, metric):
        """Return ``mapping`` with its templates formatted with ``metric``.

        ``metric`` is the keyword dict for ``str.format`` (e.g. ``{'vl':
        ..., 'system': ...}``). Handling by node tag:

        * plain strings, !Bytes2Kibibytes, !Number: value is formatted;
        * plain mappings: values are processed recursively, keys copied;
        * !ArrayItem / !ValueItem: only the SELECT values are processed;
        * !MapValue: only the VALUE entry is processed;
        * anything else is returned unchanged.
        """
        tag = mapping.tag
        # the tag classes are defined later in the module, so their tags
        # are looked up at call time
        formatted_scalar_tags = ('tag:yaml.org,2002:str',
                                 Bytes2Kibibytes.yaml_tag, Number.yaml_tag)
        if tag in formatted_scalar_tags:
            return yaml.ScalarNode(tag, mapping.value.format(**metric))

        def copy_key(key_node):
            return yaml.ScalarNode(key_node.tag, key_node.value)

        def pairs_formatting_only(pairs, wanted_key):
            return [(copy_key(k),
                     cls.format_node(v, metric) if k.value == wanted_key
                     else v)
                    for k, v in pairs]

        if tag == 'tag:yaml.org,2002:map':
            pairs = [(copy_key(k), cls.format_node(v, metric))
                     for k, v in mapping.value]
            return yaml.MappingNode(tag, pairs)
        if tag in (ArrayItem.yaml_tag, ValueItem.yaml_tag):
            entries = [yaml.MappingNode(entry.tag,
                                        pairs_formatting_only(entry.value,
                                                              'SELECT'))
                       for entry in mapping.value]
            return yaml.SequenceNode(tag, entries)
        if tag == MapValue.yaml_tag:
            return yaml.MappingNode(
                tag, pairs_formatting_only(mapping.value, 'VALUE'))
        return mapping
324
325
class ValueItem(Item):
    """Class to process the ValueItem tag.

    Selects at most one collector metric with the SELECT conditions and
    constructs its VALUE template; uses DEFAULT if nothing is selected.
    """
    yaml_tag = u'!ValueItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the value described by a ``!ValueItem`` node.

        Keys: SELECT (repeatable match conditions), VALUE (template,
        defaults to ``!Number '{vl.value}'``) and DEFAULT (returned when
        no metric is selected). More than one selected metric is an error.
        """
        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
        default, select, value_desc = None, list(), None
        # find value description
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'VALUE':
                    assert value_desc is None, "VALUE key already set"
                    value_desc = value
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
                if key.value == 'DEFAULT':
                    assert default is None, "DEFAULT key already set"
                    default = loader.construct_object(value)
        # if VALUE key isn't given, use default VALUE key
        # format: `VALUE: !Number '{vl.value}'`
        if value_desc is None:
            value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
        # select collectd metric based on SELECT condition
        metrics = loader.collector.items(select)
        assert len(metrics) < 2, \
            'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
        if len(metrics) > 0:
            # same template context as ArrayItem; ``config`` is only
            # attached to loaders created for measurements
            item = cls.format_node(value_desc, {
                'vl': metrics[0], 'system': loader.system,
                'config': getattr(loader, 'config', None)})
            return loader.construct_object(item)
        # nothing has been found by SELECT condition, set to DEFAULT value.
        assert default is not None, \
            "No metrics selected by SELECT condition and DEFAULT key isn't set"
        return default
361
362
class ArrayItem(Item):
    """Class to process ArrayItem tag.

    Produces a list with one constructed ITEM-DESC mapping per metric
    selected from the collector.
    """
    yaml_tag = u'!ArrayItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the list described by an ``!ArrayItem`` sequence node.

        Keys found in the sequence entries:

        * ITEM-DESC (mandatory): template formatted once per metric with
          ``vl`` (the metric), ``system`` and ``config``;
        * SELECT (repeatable): conditions picking metrics from
          ``loader.collector``;
        * INDEX-KEY: list of metric attribute names; the selected metrics
          are reduced to one entry per distinct combination of them.

        :returns: list of constructed mappings.
        """
        logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                      loader, node))
        # e.g.:
        # SequenceNode(tag=u'!ArrayItem', value=[
        #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
        #       MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #         (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
        #           , ...)
        #       ]), ...
        #     ), (key, value), ... ])
        #   , ... ])
        assert isinstance(node, yaml.SequenceNode), \
            "{} tag isn't YAML array".format(cls.__name__)
        select, index_keys, items, item_desc = list(), list(), list(), None
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'ITEM-DESC':
                    assert item_desc is None, "ITEM-DESC key already set"
                    item_desc = value
                if key.value == 'INDEX-KEY':
                    assert len(index_keys) == 0, "INDEX-KEY key already set"
                    index_keys = loader.construct_sequence(value)
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
        # validate item description
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        assert len(select) > 0 or len(index_keys) > 0, \
            "Mandatory key (INDEX-KEY or SELECT) isn't set"
        metrics = loader.collector.items(select)
        # select metrics based on INDEX-KEY provided
        if len(index_keys) > 0:
            metric_set = set()
            for metric in metrics:
                # Copy the index attributes with setattr instead of passing
                # them as constructor keywords: attribute names such as
                # ``type``/``time`` differ from the ``type_``/``time_``
                # constructor parameters.
                index_value = CollectdValue()
                for key in index_keys:
                    setattr(index_value, key, getattr(metric, key))
                metric_set.add(index_value)
            metrics = list(metric_set)
        # build items based on SELECT and/or INDEX-KEY criteria
        for metric in metrics:
            item = cls.format_node(item_desc,
                                   {'vl': metric, 'system': loader.system,
                                    'config': loader.config})
            items.append(loader.construct_mapping(item))
        return items
415
416
class Measurements(ArrayItem):
    """``!Measurements`` tag: processed exactly like ``!ArrayItem``.

    The separate tag lets the normalizer pick the top-level measurement
    definitions out of the configuration file.
    """
    yaml_tag = u'!Measurements'
420
421
class Events(Item):
    """``!Events`` tag: renders ITEM-DESC for a matching notification."""
    yaml_tag = u'!Events'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the event for ``loader.notification``.

        Keys: ITEM-DESC (mandatory template, formatted with ``n`` and
        ``system``) and CONDITION (keyword conditions for ``match``).
        Returns None if the notification does not satisfy CONDITION.
        """
        item_desc = None
        condition = {}
        for entry in node.value:
            for key_node, value_node in entry.value:
                name = key_node.value
                if name == 'ITEM-DESC':
                    item_desc = value_node
                elif name == 'CONDITION':
                    condition = loader.construct_mapping(value_node)
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        notification = loader.notification
        if not notification.match(**condition):
            return None
        context = {'n': notification, 'system': loader.system}
        return loader.construct_mapping(cls.format_node(item_desc, context))
441
442
class Bytes2Kibibytes(yaml.YAMLObject):
    """``!Bytes2Kibibytes`` tag: bytes divided by 1024, 3 decimals."""
    yaml_tag = u'!Bytes2Kibibytes'

    @classmethod
    def from_yaml(cls, loader, node):
        kibibytes = float(node.value) / 1024.0
        return round(kibibytes, 3)
450
451
class Number(yaml.YAMLObject):
    """``!Number`` tag: the scalar as an int, or as a float if not integral."""
    yaml_tag = u'!Number'

    @classmethod
    def from_yaml(cls, loader, node):
        text = node.value
        try:
            number = int(text)
        except ValueError:
            number = float(text)
        return number
462
463
class MapValue(yaml.YAMLObject):
    """``!MapValue`` tag: translate VALUE through the TO lookup table."""
    yaml_tag = u'!MapValue'

    @classmethod
    def from_yaml(cls, loader, node):
        """Return ``TO[VALUE]``; both keys are mandatory and VALUE must be
        one of the table's keys."""
        table = None
        key_value = None
        for key_node, value_node in node.value:
            if key_node.value == 'TO':
                table = loader.construct_mapping(value_node)
            if key_node.value == 'VALUE':
                key_value = loader.construct_object(value_node)
        assert table is not None, "Mandatory TO key isn't set"
        assert key_value is not None, "Mandatory VALUE key isn't set"
        assert key_value in table, \
            'Value "{}" cannot be mapped to any of {} values'.format(
                key_value, table.keys())
        return table[key_value]
482
483
class Normalizer(object):
    """Normalization class which handles events and measurements.

    Every ``interval`` seconds a re-arming timer renders the ``!Measurements``
    definitions; notifications passed to ``post_event`` are rendered from the
    ``!Events`` definitions on a worker thread. Each rendered item is passed
    to ``send_data``, which subclasses must override.
    """

    def __init__(self):
        """Create an inactive normalizer; ``initialize`` starts it."""
        self.interval = None
        self.collector = None
        self.system = None
        self.config = None
        self.queue = None
        self.timer = None
        self.event_thread = None
        self.measurements_stream = None
        self.events_stream = None

    @classmethod
    def read_configuration(cls, config_file):
        """Read the YAML configuration file and split it by tag.

        Top-level entries tagged ``!Measurements`` or ``!Events`` are kept
        and re-serialized into two separate YAML streams.

        :param config_file: path of the YAML configuration file.
        :returns: tuple ``(events_stream, measurements_stream)``.
        """
        # compose() only builds the node graph, no objects are constructed
        with open(config_file, 'r') as f:
            doc_yaml = yaml.compose(f)
        # an empty file composes to None: it defines nothing
        top_level = doc_yaml.value if doc_yaml is not None else []
        # split events & measurements definitions
        measurements, events = list(), list()
        for key, value in top_level:
            if value.tag == Measurements.yaml_tag:
                measurements.append((key, value))
            if value.tag == Events.yaml_tag:
                events.append((key, value))
        measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
                                             measurements)
        measurements_stream = yaml.serialize(measurements_yaml)
        events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
        events_stream = yaml.serialize(events_yaml)
        return events_stream, measurements_stream

    def initialize(self, config_file, interval):
        """Load the configuration and start the event thread and timer."""
        e, m = self.read_configuration(config_file)
        self.measurements_stream = m
        self.events_stream = e
        self.system = System()
        self.config = Config(interval)
        self.interval = interval
        # start collector with aging time = double interval
        self.collector = Collector(interval * 2)
        # initialize event thread
        self.queue = queue.Queue()
        self.event_thread = Thread(target=self.event_worker)
        self.event_thread.daemon = True
        self.event_thread.start()
        # initialize measurements timer
        self.start_timer()

    def destroy(self):
        """Stop the measurement timer, the collector and the event thread."""
        self.stop_timer()
        self.collector.destroy()
        self.post_event(None)  # send stop event
        self.event_thread.join()

    def start_timer(self):
        """Arm a one-shot timer that calls ``on_timer`` after the interval."""
        self.timer = Timer(self.interval, self.on_timer)
        self.timer.start()

    def stop_timer(self):
        """Cancel the measurements timer, if one was started."""
        if self.timer is not None:
            self.timer.cancel()

    def on_timer(self):
        """Timer callback: re-arm the timer, then process measurements."""
        self.start_timer()
        self.process_measurements()

    def event_worker(self):
        """Process queued notifications until a non-notification arrives."""
        while True:
            event = self.queue.get()
            if not isinstance(event, CollectdNotification):
                # None (posted by destroy) or any other object stops us
                break
            try:
                self.process_notify(event)
            except Exception:
                # one bad notification must not stop the worker thread
                logging.exception(
                    'Failed to process notification {}'.format(event))

    def get_collector(self):
        """Get metric collector reference"""
        return self.collector

    def process_measurements(self):
        """Render the measurement definitions and send every item."""
        # NOTE: Loader is PyYAML's full (C)Loader; the configuration file
        # must come from a trusted source.
        loader = Loader(self.measurements_stream)
        try:
            setattr(loader, 'collector', self.collector)
            setattr(loader, 'system', self.system)
            setattr(loader, 'config', self.config)
            measurements = loader.get_data()
        finally:
            loader.dispose()
        for measurement_name in measurements:
            logging.debug('Process "{}" measurements: {}'.format(
                measurement_name, measurements[measurement_name]))
            for measurement in measurements[measurement_name]:
                self.send_data(measurement)

    def process_notify(self, notification):
        """Render the event definitions for ``notification`` and send them."""
        loader = Loader(self.events_stream)
        try:
            setattr(loader, 'notification', notification)
            setattr(loader, 'system', self.system)
            notifications = loader.get_data()
        finally:
            loader.dispose()
        for notify_name in notifications:
            logging.debug('Process "{}" notification'.format(notify_name))
            if notifications[notify_name] is not None:
                self.send_data(notifications[notify_name])

    def send_data(self, data):
        """Send data; must be overridden by subclasses.

        :raises NotImplementedError: always, in this base class.
        """
        # raise instead of assert: asserts vanish under ``python -O`` and
        # rendered data would then be silently dropped
        raise NotImplementedError(
            'send_data() is abstract function and MUST be overridden')

    def post_event(self, notification):
        """Post notification into the queue to process"""
        self.queue.put(notification)
599         """Post notification into the queue to process"""
600         self.queue.put(notification)