Merge "docker: Added docker container for running collectd"
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / normalizer.py
1 #
2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Authors:
17 #   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
18 #
19
20 import yaml
21 import logging
22 import datetime
23 import time
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
27 import re
28
29 # import YAML loader
30 try:
31     from yaml import CLoader as Loader
32 except ImportError:
33     from yaml import Loader
34
35 # import synchronized queue
36 try:
37     # python 2.x
38     import Queue as queue
39 except ImportError:
40     # python 3.x
41     import queue
42
43
class Config(object):
    """Container for configuration values made available to the YAML
    templates (rendered as ``{config.<attr>}``)."""

    def __init__(self, interval):
        # stored exactly as supplied by the caller
        self.interval = interval
49
50
class System(object):
    """Runtime values (host name, sequence id, timestamp, date) that the
    YAML templates can reference through the ``system`` name."""

    def __init__(self):
        # counter backing the ``id`` property
        self._id = 0
        self.hostname = 'localhost'

    @property
    def id(self):
        """Next sequence number: every read increments the counter, so the
        first read returns 1."""
        self._id += 1
        return self._id

    @property
    def time(self):
        """Current time as returned by ``time.time()``."""
        return time.time()

    @property
    def date(self):
        """Today's local date as an ISO 8601 string (YYYY-MM-DD)."""
        today = datetime.date.today()
        return today.isoformat()
71
72
class ItemIterator(object):
    """Item iterator returned by Collector class

    Wraps the list of items selected by ``Collector.items()``. The collector
    lock taken there is released in ``__del__``, so the collector stays
    locked for as long as this object is alive.
    """

    def __init__(self, collector, items):
        """Item iterator init

        :param collector: collector whose ``unlock()`` is called when this
            iterator is destroyed
        :param items: list of selected items
        """
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        self._items = items
        self._collector = collector
        self._index = 0

    def __iter__(self):
        """Return a fresh iterator over all items.

        Keeps the object re-iterable, just like the ``__getitem__``-based
        iteration it supported before.
        """
        return iter(self._items)

    def next(self):
        """Returns next item from the list

        :raises StopIteration: when every item has been returned
        """
        if self._index >= len(self._items):
            raise StopIteration
        curr_index = self._index
        self._index = curr_index + 1
        # was ``self.items`` (no such attribute) -> AttributeError
        return self._items[curr_index]

    # Python 3 name of the iterator-step method, so ``next(it)`` works
    __next__ = next

    def __getitem__(self, key):
        """get item by index"""
        return self._items[key]

    def __len__(self):
        """Return length of elements"""
        return len(self._items)

    def __del__(self):
        """Destroy iterator and unlock the collector"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
103
104
class ItemObject(object):
    """Proxy for a single item stored in a Collector.

    Assigning an attribute on the proxy sets it on the stored item and
    refreshes the item's timestamp in the collector. Destroying the proxy
    calls ``unlock()`` on the collector.
    """

    def __init__(self, collector, hash_):
        """Item object init

        :param collector: owning collector (its ``_metrics`` dict is updated)
        :param hash_: key of the item in ``collector._metrics``
        """
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        # bypass our own __setattr__, which forwards to the stored item
        set_own = super(ItemObject, self).__setattr__
        set_own('_collector', collector)
        set_own('_hash', hash_)

    def __setattr__(self, name, value):
        store = self._collector._metrics
        _, target = store[self._hash]
        logging.debug('{}:__setattr__(name={}, value={})'.format(
                      self.__class__.__name__, name, value))
        setattr(target, name, value)
        # re-stamp the entry so aging restarts from now
        store[self._hash] = (time.time(), target)

    def __del__(self):
        """Release the collector lock when the proxy goes away"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
125
126
class Collector(object):
    """Thread-safe collector with aging feature

    Items are stored as ``hash(item) -> (timestamp, item)``. A timer fires
    every ``age_timeout`` seconds and removes items whose timestamp is at
    least ``age_timeout`` seconds old.
    """

    def __init__(self, age_timeout):
        """Initialization

        :param age_timeout: aging time in seconds; also the aging timer period
        """
        self._metrics = {}
        self._lock = RLock()
        self._age_timeout = age_timeout
        self._start_age_timer()

    def _start_age_timer(self):
        """Start age timer"""
        self._age_timer = Timer(self._age_timeout, self._on_timer)
        self._age_timer.start()

    def _stop_age_timer(self):
        """Stop age timer"""
        self._age_timer.cancel()

    def _on_timer(self):
        """Age timer callback: re-arm the timer, then purge expired items"""
        self._start_age_timer()
        self._check_aging()

    def _check_aging(self):
        """Check aging time for all items and drop the expired ones"""
        self.lock()
        try:
            now = time.time()
            # iterate over a snapshot: popping from a dict while iterating its
            # live items() view raises RuntimeError on Python 3
            for data_hash, (age, item) in list(self._metrics.items()):
                if (now - age) >= self._age_timeout:
                    # aging time has expired, remove the item from the
                    # collector
                    logging.debug('{}:_check_aging():value={}'.format(
                                  self.__class__.__name__, item))
                    self._metrics.pop(data_hash)
        finally:
            self.unlock()

    def lock(self):
        """Lock the collector"""
        logging.debug('{}:lock()'.format(self.__class__.__name__))
        self._lock.acquire()

    def unlock(self):
        """Unlock the collector"""
        logging.debug('{}:unlock()'.format(self.__class__.__name__))
        self._lock.release()

    def get(self, hash_):
        """Return an ItemObject for ``hash_``, or None if it is not stored.

        When an ItemObject is returned the collector stays locked; the lock
        is released when that ItemObject is destroyed.
        """
        self.lock()
        if hash_ in self._metrics:
            return ItemObject(self, hash_)
        self.unlock()
        return None

    def add(self, item):
        """Add an item into the collector (replacing one with equal hash)"""
        self.lock()
        try:
            logging.debug('{}:add(item={})'.format(
                self.__class__.__name__, item))
            self._metrics[hash(item)] = (time.time(), item)
        finally:
            self.unlock()

    def items(self, select_list=None):
        """Returns locked (safe) item iterator

        :param select_list: list of keyword dicts passed to each item's
            ``match()``; an item is selected (once) when it matches any of
            them. An empty list or None selects nothing.
        :return: ItemIterator over the selected items; the collector stays
            locked until that iterator is destroyed
        """
        if select_list is None:
            select_list = []
        self.lock()
        try:
            metrics = [value for _, value in self._metrics.values()
                       if any(value.match(**select)
                              for select in select_list)]
        except Exception:
            # no iterator will exist to release the lock, so release it here
            self.unlock()
            raise
        return ItemIterator(self, metrics)

    def destroy(self):
        """Destroy the collector (stops the aging timer)"""
        self._stop_age_timer()
202
203
class CollectdData(object):
    """Base class for Collectd data"""

    # text types usable as regex patterns/subjects; ``type(u'')`` is
    # ``unicode`` on Python 2 and ``str`` on Python 3
    _STRING_TYPES = (str, type(u''))

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, time_=None):
        """Class initialization"""
        self.host = host
        self.plugin = plugin
        self.plugin_instance = plugin_instance
        self.type_instance = type_instance
        self.type = type_
        self.time = time_

    @classmethod
    def is_regular_expression(cls, expr):
        """Return True if ``expr`` is a string of the form ``/pattern/``.

        Non-string values (e.g. integers parsed from YAML) are never
        regular expressions.
        """
        return (isinstance(expr, cls._STRING_TYPES) and len(expr) > 1 and
                expr[0] == '/' and expr[-1] == '/')

    def match(self, **kargs):
        """Check whether every given attribute condition holds.

        Each keyword names an attribute of this object. Its value is either
        a literal compared with ``==``, or a ``/pattern/`` string applied
        with ``re.match`` (anchored at the start only). An attribute that is
        None never matches a pattern; other non-string attributes are
        converted with ``str()`` before matching.

        :return: True if all conditions hold (also when none are given)
        """
        for key, value in kargs.items():
            actual = getattr(self, key)
            if self.is_regular_expression(value):
                if actual is None:
                    return False
                if not isinstance(actual, self._STRING_TYPES):
                    actual = str(actual)
                if re.match(value[1:-1], actual) is None:
                    return False
            elif value != actual:
                return False
        # return match even if kargs is empty
        return True
231
232
class CollectdNotification(CollectdData):
    """Collectd notification

    The common Collectd identification fields plus ``severity`` and
    ``message``.
    """

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, severity=None, message=None):
        super(CollectdNotification, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.severity = severity
        self.message = message

    def __repr__(self):
        # report the instance timestamp (self.time); the bare name ``time``
        # refers to the time module
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, severity={}, message={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.severity, self.message, self.time)
249
250
class CollectdValue(CollectdData):
    """Collectd value

    Identified by (host, plugin, plugin_instance, type, type_instance,
    ds_name). The hash of that identity is the key used by the Collector.
    """

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, ds_name='value', value=None,
                 interval=None):
        super(CollectdValue, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.value = value
        self.ds_name = ds_name
        self.interval = interval

    @classmethod
    def hash_gen(cls, host, plugin, plugin_instance, type_,
                 type_instance, ds_name):
        """Return the hash of the identity fields (the Collector key)"""
        return hash((host, plugin, plugin_instance, type_,
                    type_instance, ds_name))

    def _identity(self):
        """Tuple of the fields identifying this value, in hash_gen order"""
        return (self.host, self.plugin, self.plugin_instance, self.type,
                self.type_instance, self.ds_name)

    def __eq__(self, other):
        """Equal when the identity fields and the value are equal.

        The fields themselves are compared (not their hashes), so values
        whose hashes collide are not reported as equal.
        """
        if not isinstance(other, CollectdValue):
            return NotImplemented
        return (self._identity() == other._identity() and
                self.value == other.value)

    def __ne__(self, other):
        # Python 2 does not derive ``!=`` from ``__eq__``
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        return self.hash_gen(*self._identity())

    def __repr__(self):
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
               'type_instance={}, ds_name={}, value={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.ds_name, self.value, self.time)
282
283
class Item(yaml.YAMLObject):
    """Base class to process tags like ArrayItem/ValueItem

    Provides format_node(). It returns a copy of a YAML node tree with the
    string templates filled in via ``str.format(**metric)``.
    """

    @classmethod
    def format_node(cls, mapping, metric):
        """Return ``mapping`` with its templates formatted from ``metric``.

        - plain strings and !Bytes2Kibibytes / !Number / !StripExtraDash
          scalars are formatted;
        - plain mappings are copied, each value formatted recursively;
        - !ArrayItem / !ValueItem sequences get only their SELECT values
          formatted, and !MapValue mappings only their VALUE;
        - any other node is returned unchanged.

        :param mapping: YAML node to format
        :param metric: dict of names available to the templates
        """
        tag = mapping.tag

        def copy_key(key):
            # fresh scalar node carrying the key's tag and text
            return yaml.ScalarNode(key.tag, key.value)

        def format_only(pairs, name):
            # copy (key, value) pairs, formatting only the value under `name`
            return [(copy_key(key),
                     cls.format_node(value, metric) if key.value == name
                     else value)
                    for key, value in pairs]

        if tag in ('tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
                   Number.yaml_tag, StripExtraDash.yaml_tag):
            return yaml.ScalarNode(tag, mapping.value.format(**metric))
        if tag == 'tag:yaml.org,2002:map':
            return yaml.MappingNode(tag, [
                (copy_key(key), cls.format_node(value, metric))
                for key, value in mapping.value])
        if tag in (ArrayItem.yaml_tag, ValueItem.yaml_tag):
            return yaml.SequenceNode(tag, [
                yaml.MappingNode(entry.tag,
                                 format_only(entry.value, 'SELECT'))
                for entry in mapping.value])
        if tag == MapValue.yaml_tag:
            return yaml.MappingNode(tag, format_only(mapping.value, 'VALUE'))
        return mapping
322
323
class ValueItem(Item):
    """Class to process ValueItem tag

    Resolves to a single value taken from the (at most one) collectd metric
    selected by the SELECT conditions, or to DEFAULT when nothing is
    selected.
    """
    yaml_tag = u'!ValueItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the value for a ``!ValueItem`` node.

        :raises AssertionError: on duplicate VALUE/DEFAULT keys, when more
            than one metric is selected, or when nothing is selected and no
            DEFAULT is given
        """
        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
        default, select, value_desc = None, list(), None
        # find value description
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'VALUE':
                    assert value_desc is None, "VALUE key already set"
                    value_desc = value
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
                if key.value == 'DEFAULT':
                    assert default is None, "DEFAULT key already set"
                    default = loader.construct_object(value)
        # if VALUE key isn't given, use default VALUE key
        # format: `VALUE: !Number '{vl.value}'`
        if value_desc is None:
            value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
        # select collectd metric based on SELECT condition
        metrics = loader.collector.items(select)
        assert len(metrics) < 2, \
            'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
        if len(metrics) > 0:
            # same template names as ArrayItem, including ``config``; a
            # loader without a config attribute gets None for it
            item = cls.format_node(value_desc, {
                'vl': metrics[0], 'system': loader.system,
                'config': getattr(loader, 'config', None)})
            return loader.construct_object(item)
        # nothing has been found by SELECT condition, set to DEFAULT value.
        assert default is not None, \
            "No metrics selected by SELECT condition and DEFAULT key isn't set"
        return default
359
360
class ArrayItem(Item):
    """Class to process ArrayItem tag

    Builds a list with one rendering of ITEM-DESC per selected collectd
    metric. SELECT conditions choose the metrics. When INDEX-KEY is given,
    the metrics are first reduced to unique CollectdValue objects built
    from the listed attributes only.
    """
    yaml_tag = u'!ArrayItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the list for an ``!ArrayItem`` node.

        :raises AssertionError: when the node is not a sequence, a key is
            duplicated, or mandatory keys are missing
        """
        logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                      loader, node))
        # e.g.:
        # SequenceNode(tag=u'!ArrayItem', value=[
        #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
        #       MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #         (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
        #           , ...)
        #       ]), ...
        #     ), (key, value), ... ])
        #   , ... ])
        assert isinstance(node, yaml.SequenceNode), \
            "{} tag isn't YAML array".format(cls.__name__)
        select, index_keys, items, item_desc = list(), list(), list(), None
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'ITEM-DESC':
                    assert item_desc is None, "ITEM-DESC key already set"
                    item_desc = value
                if key.value == 'INDEX-KEY':
                    assert len(index_keys) == 0, "INDEX-KEY key already set"
                    index_keys = loader.construct_sequence(value)
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
        # validate item description
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        assert len(select) > 0 or len(index_keys) > 0, \
            "Mandatory key (INDEX-KEY or SELECT) isn't set"
        metrics = loader.collector.items(select)
        # select metrics based on INDEX-KEY provided
        if len(index_keys) > 0:
            # INDEX-KEY entries are CollectdValue attribute names, but the
            # ``type`` attribute is set through the ``type_`` init argument
            # (passing ``type=`` raised TypeError)
            init_args = {'type': 'type_'}
            metric_set = set()
            for metric in metrics:
                value_params = {}
                for key in index_keys:
                    value_params[init_args.get(key, key)] = \
                        getattr(metric, key)
                metric_set.add(CollectdValue(**value_params))
            metrics = list(metric_set)
        # build items based on SELECT and/or INDEX-KEY criteria
        for metric in metrics:
            item = cls.format_node(item_desc,
                                   {'vl': metric, 'system': loader.system,
                                       'config': loader.config})
            items.append(loader.construct_mapping(item))
        return items
413
414
class Measurements(ArrayItem):
    """Class to process Measurements tag

    Construction is inherited unchanged from ArrayItem; only the YAML tag
    differs. Normalizer.read_configuration() uses this tag to pick out the
    top-level measurement definitions.
    """
    yaml_tag = u'!Measurements'
418
419
class Events(Item):
    """Class to process Events tag

    Yields one mapping rendered from ITEM-DESC when the current notification
    satisfies CONDITION (a missing CONDITION always matches); otherwise None.
    """
    yaml_tag = u'!Events'

    @classmethod
    def from_yaml(cls, loader, node):
        item_desc = None
        condition = dict()
        for entry in node.value:
            for key_node, value_node in entry.value:
                name = key_node.value
                if name == 'ITEM-DESC':
                    item_desc = value_node
                elif name == 'CONDITION':
                    condition = loader.construct_mapping(value_node)
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        if not loader.notification.match(**condition):
            return None
        context = {'n': loader.notification, 'system': loader.system}
        return loader.construct_mapping(cls.format_node(item_desc, context))
439
440
class Bytes2Kibibytes(yaml.YAMLObject):
    """Class to process Bytes2Kibibytes tag

    Turns a scalar byte count into kibibytes (bytes / 1024), rounded to
    three decimal places.
    """
    yaml_tag = u'!Bytes2Kibibytes'

    @classmethod
    def from_yaml(cls, loader, node):
        kibibytes = float(node.value) / 1024.0
        return round(kibibytes, 3)
448
449
class Number(yaml.YAMLObject):
    """Class to process Number tag

    Converts the scalar text to an int when possible, otherwise to a float
    (a ValueError propagates if it is neither).
    """
    yaml_tag = u'!Number'

    @classmethod
    def from_yaml(cls, loader, node):
        text = node.value
        try:
            number = int(text)
        except ValueError:
            number = float(text)
        return number
460
461
class StripExtraDash(yaml.YAMLObject):
    """Class to process StripExtraDash tag

    Collapses runs of '-' into one and drops leading/trailing dashes,
    e.g. ``'-a--b-'`` -> ``'a-b'``.
    """
    yaml_tag = u'!StripExtraDash'

    @classmethod
    def from_yaml(cls, loader, node):
        parts = node.value.split('-')
        return '-'.join(part for part in parts if part)
469
470
class MapValue(yaml.YAMLObject):
    """Class to process MapValue tag

    Looks up the constructed VALUE in the TO mapping and returns the mapped
    result. Both keys are mandatory and VALUE must be a key of TO.
    """
    yaml_tag = u'!MapValue'

    @classmethod
    def from_yaml(cls, loader, node):
        table = None
        source = None
        for key_node, value_node in node.value:
            if key_node.value == 'TO':
                table = loader.construct_mapping(value_node)
            if key_node.value == 'VALUE':
                source = loader.construct_object(value_node)
        assert table is not None, "Mandatory TO key isn't set"
        assert source is not None, "Mandatory VALUE key isn't set"
        assert source in table, \
            'Value "{}" cannot be mapped to any of {} values'.format(
                source, table.keys())
        return table[source]
489
490
class Normalizer(object):
    """Normalization class which handles events and measurements

    After initialize(), measurements are rendered every ``interval`` seconds
    from the ``!Measurements`` definitions, using the metrics held by the
    collector. Notifications posted with post_event() are rendered by a
    worker thread from the ``!Events`` definitions. Rendered data is passed
    to send_data(), which subclasses must override.
    """

    def __init__(self):
        """Init"""
        self.interval = None
        self.collector = None
        self.system = None
        self.config = None
        self.queue = None
        self.timer = None
        # set by initialize(); declared here so the attributes always exist
        self.event_thread = None
        self.measurements_stream = None
        self.events_stream = None

    @classmethod
    def read_configuration(cls, config_file):
        """read YAML configuration file

        :param config_file: path of the YAML events/measurements definition
        :return: tuple ``(events_stream, measurements_stream)`` of serialized
            YAML mappings holding the top-level ``!Events`` and
            ``!Measurements`` entries; other top-level entries are dropped
        """
        # load YAML events/measurements definition; ``with`` closes the file
        # even if composing fails
        with open(config_file, 'r') as f:
            doc_yaml = yaml.compose(f)
        # split events & measurements definitions
        measurements, events = list(), list()
        for key, value in doc_yaml.value:
            if value.tag == Measurements.yaml_tag:
                measurements.append((key, value))
            if value.tag == Events.yaml_tag:
                events.append((key, value))
        measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
                                             measurements)
        measurements_stream = yaml.serialize(measurements_yaml)
        events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
        events_stream = yaml.serialize(events_yaml)
        # return event & measurements definition
        return events_stream, measurements_stream

    def initialize(self, config_file, interval):
        """Initialize the class

        :param config_file: path of the YAML definition file
        :param interval: measurements period in seconds
        """
        e, m = self.read_configuration(config_file)
        self.measurements_stream = m
        self.events_stream = e
        self.system = System()
        self.config = Config(interval)
        self.interval = interval
        # start collector with aging time = double interval
        self.collector = Collector(interval * 2)
        # initialize event thread
        self.queue = queue.Queue()
        self.event_thread = Thread(target=self.event_worker)
        self.event_thread.daemon = True
        self.event_thread.start()
        # initialize measurements timer
        self.start_timer()

    def destroy(self):
        """Destroy the class: stop the collector, the event worker and the
        measurements timer"""
        self.collector.destroy()
        self.post_event(None)  # send stop event
        self.event_thread.join()
        self.stop_timer()

    def start_timer(self):
        """Start measurements timer"""
        self.timer = Timer(self.interval, self.on_timer)
        self.timer.start()

    def stop_timer(self):
        """Stop measurements timer"""
        self.timer.cancel()

    def on_timer(self):
        """Measurements timer: re-arm first, then process measurements"""
        self.start_timer()
        self.process_measurements()

    def event_worker(self):
        """Event worker

        Processes queued notifications until something that is not a
        CollectdNotification (e.g. the None posted by destroy()) arrives.
        """
        while True:
            event = self.queue.get()
            if not isinstance(event, CollectdNotification):
                # exit for the worker
                break
            try:
                self.process_notify(event)
            except Exception:
                # keep the worker alive: one bad notification must not stop
                # processing of all the following ones
                logging.exception('{}:event_worker(): failed to process {}'
                                  .format(self.__class__.__name__, event))

    def get_collector(self):
        """Get metric collector reference"""
        return self.collector

    def process_measurements(self):
        """Process measurements and pass each one to send_data()"""
        loader = Loader(self.measurements_stream)
        loader.collector = self.collector
        loader.system = self.system
        loader.config = self.config
        try:
            measurements = loader.get_data()
        finally:
            # release parser resources, as yaml.load() does
            loader.dispose()
        for measurement_name in measurements:
            logging.debug('Process "{}" measurements: {}'.format(
                measurement_name, measurements[measurement_name]))
            for measurement in measurements[measurement_name]:
                self.send_data(measurement)

    def process_notify(self, notification):
        """Process events and pass each non-empty result to send_data()"""
        loader = Loader(self.events_stream)
        loader.notification = notification
        loader.system = self.system
        try:
            notifications = loader.get_data()
        finally:
            # release parser resources, as yaml.load() does
            loader.dispose()
        for notify_name in notifications:
            logging.debug('Process "{}" notification'.format(notify_name))
            if notifications[notify_name] is not None:
                self.send_data(notifications[notify_name])

    def send_data(self, data):
        """Send data (abstract)

        :raises NotImplementedError: always; subclasses must override this
        """
        raise NotImplementedError(
            'send_data() is abstract function and MUST be overridden')

    def post_event(self, notification):
        """Post notification into the queue to process"""
        self.queue.put(notification)