Simplify RE validation
[barometer.git] / 3rd_party / collectd-ves-app / ves_app / normalizer.py
1 #
2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
3 #
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
7 #
8 #     http://www.apache.org/licenses/LICENSE-2.0
9 #
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
15 #
16 # Authors:
17 #   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
18 #
19
20 import yaml
21 import logging
22 import datetime
23 import time
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
27 import re
28
29 # import YAML loader
30 try:
31     from yaml import CLoader as Loader
32 except ImportError:
33     from yaml import Loader
34
35 # import synchronized queue
36 try:
37     # python 2.x
38     import Queue as queue
39 except ImportError:
40     # python 3.x
41     import queue
42
43
class Config(object):
    """Holder of the configuration options passed into the YAML file"""

    def __init__(self, interval):
        """Store the configured interval.

        :param interval: value exposed afterwards as the ``interval``
            attribute
        """
        self.interval = interval
49
50
class System(object):
    """Source of system information (host, time etc.) for the YAML file"""

    def __init__(self):
        """Start with the default host name and a zeroed id counter"""
        self._id = 0
        self.hostname = 'localhost'

    @property
    def id(self):
        """Sequence number; each read increments the counter by one"""
        self._id += 1
        return self._id

    @property
    def time(self):
        """Current time as reported by ``time.time()``"""
        return time.time()

    @property
    def date(self):
        """Today's date as an ISO formatted string"""
        today = datetime.date.today()
        return today.isoformat()
71
72
class ItemIterator(object):
    """Item iterator returned by Collector class

    Wraps a list of selected items. The collector is unlocked when the
    iterator object is destroyed (see ``__del__``).
    """

    def __init__(self, collector, items):
        """Item iterator init

        :param collector: object providing ``unlock()``; it is called once
            when this iterator is destroyed
        :param items: list of items to iterate over
        """
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        self._items = items
        self._collector = collector
        self._index = 0

    def next(self):
        """Returns next item from the list

        :raises StopIteration: when all items have been returned
        """
        if self._index == len(self._items):
            raise StopIteration
        curr_index = self._index
        self._index = curr_index + 1
        # NOTE: the items are stored in `_items` (there is no `items`
        # attribute on this class).
        return self._items[curr_index]

    # Python 3 spelling of the iterator protocol method, so that the
    # built-in next() works on both Python 2 and Python 3.
    __next__ = next

    def __getitem__(self, key):
        """get item by index"""
        return self._items[key]

    def __len__(self):
        """Return length of elements"""
        return len(self._items)

    def __del__(self):
        """Destroy iterator and unlock the collector"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
103
104
class ItemObject(object):
    """Proxy to a single collector item, returned by Collector class"""

    def __init__(self, collector, hash_):
        """Remember the owning collector and the hash of the item"""
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        # Use the base class setter: the overridden __setattr__ below
        # forwards every assignment to the wrapped item instead.
        object.__setattr__(self, '_collector', collector)
        object.__setattr__(self, '_hash', hash_)

    def __setattr__(self, name, value):
        """Set the attribute on the wrapped item and refresh its timestamp"""
        metrics = self._collector._metrics
        item = metrics[self._hash][1]
        logging.debug('{}:__setattr__(name={}, value={})'.format(
                      self.__class__.__name__, name, value))
        setattr(item, name, value)
        metrics[self._hash] = (time.time(), item)

    def __del__(self):
        """Unlock the collector when the item object is destroyed"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
125
126
class Collector(object):
    """Thread-safe collector with aging feature

    Items are stored as ``hash -> (timestamp, item)``. A periodic timer
    removes the items whose timestamp is older than the aging timeout.
    """

    def __init__(self, age_timeout):
        """Initialization

        :param age_timeout: aging time in seconds; also used as the period
            of the aging timer
        """
        self._metrics = {}
        self._lock = RLock()
        self._age_timeout = age_timeout
        self._start_age_timer()

    def _start_age_timer(self):
        """Start age timer"""
        self._age_timer = Timer(self._age_timeout, self._on_timer)
        self._age_timer.start()

    def _stop_age_timer(self):
        """Stop age timer"""
        self._age_timer.cancel()

    def _on_timer(self):
        """Age timer"""
        # re-arm the timer first so the aging check keeps running
        # periodically
        self._start_age_timer()
        self._check_aging()

    def _check_aging(self):
        """Check aging time for all items"""
        self.lock()
        try:
            # Iterate over a snapshot: removing entries from the dictionary
            # while iterating over it raises RuntimeError on Python 3.
            for data_hash, data in list(self._metrics.items()):
                age, item = data
                if (time.time() - age) >= self._age_timeout:
                    # aging time has expired, remove the item from the
                    # collector
                    logging.debug('{}:_check_aging():value={}'.format(
                                  self.__class__.__name__, item))
                    self._metrics.pop(data_hash)
        finally:
            # never leave the collector locked, even on error
            self.unlock()

    def lock(self):
        """Lock the collector"""
        logging.debug('{}:lock()'.format(self.__class__.__name__))
        self._lock.acquire()

    def unlock(self):
        """Unlock the collector"""
        logging.debug('{}:unlock()'.format(self.__class__.__name__))
        self._lock.release()

    def get(self, hash_):
        """Get a locked item object by its hash

        :returns: ``ItemObject`` for the item (the collector stays locked
            until that object is destroyed) or None if there is no item
            with such hash (the collector is unlocked in this case)
        """
        self.lock()
        if hash_ in self._metrics:
            return ItemObject(self, hash_)
        self.unlock()
        return None

    def add(self, item):
        """Add an item into the collector"""
        self.lock()
        try:
            logging.debug('{}:add(item={})'.format(
                self.__class__.__name__, item))
            self._metrics[hash(item)] = (time.time(), item)
        finally:
            self.unlock()

    def items(self, select_list=None):
        """Returns locked (safe) item iterator

        :param select_list: list of dictionaries; each dictionary is passed
            as keyword arguments to ``match()`` of every stored item. An
            item is added to the result once per matching dictionary.
        :returns: ``ItemIterator`` over the selected items. The collector
            stays locked until the iterator is destroyed.
        """
        # avoid a shared mutable default argument
        if select_list is None:
            select_list = []
        metrics = []
        self.lock()
        try:
            for _, value in self._metrics.values():
                for select in select_list:
                    if value.match(**select):
                        metrics.append(value)
        except Exception:
            # no iterator is going to be returned, so nobody else would
            # release the lock
            self.unlock()
            raise
        return ItemIterator(self, metrics)

    def destroy(self):
        """Destroy the collector"""
        self._stop_age_timer()
202
203
class CollectdData(object):
    """Base class for Collectd data"""

    # text types: `str` on Python 3, `str` and `unicode` on Python 2
    _STRING_TYPES = (str, type(u''))

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, time_=None):
        """Class initialization"""
        self.host = host
        self.plugin = plugin
        self.plugin_instance = plugin_instance
        self.type_instance = type_instance
        self.type = type_
        self.time = time_

    @classmethod
    def is_regular_expression(cls, expr):
        """Check whether `expr` is a regular expression in `/.../` notation

        Non-string values (e.g. numbers) are never regular expressions.
        """
        return isinstance(expr, cls._STRING_TYPES) and \
            len(expr) > 1 and expr[0] == '/' and expr[-1] == '/'

    def match(self, **kargs):
        """Compare attributes of the object with the given conditions

        :param kargs: attribute name -> expected value. A string value
            enclosed in slashes (``/re/``) is matched as a regular
            expression (``re.match``) against the attribute, any other
            value is compared for equality.
        :returns: True if all the conditions are met (also when no
            condition is given), False otherwise
        """
        # compare the metric
        for key, value in kargs.items():
            attr = getattr(self, key)
            if self.is_regular_expression(value):
                # a regular expression can only match a text attribute;
                # unset (None) or non-text attributes do not match
                if not isinstance(attr, self._STRING_TYPES):
                    return False
                if re.match(value[1:-1], attr) is None:
                    return False
            elif value != attr:
                return False
        # return match event if kargs is empty
        return True
231
232
class CollectdNotification(CollectdData):
    """Collectd notification"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, severity=None, message=None):
        """Initialize the common collectd fields, severity and message"""
        super(CollectdNotification, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.severity = severity
        self.message = message

    def __repr__(self):
        """Text representation of the notification"""
        # report the notification's own time (self.time), not the `time`
        # module
        return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
               'type_instance={}, severity={}, message={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.severity, self.message, self.time)
249
250
class CollectdValue(CollectdData):
    """Collectd value"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, ds_name='value', value=None,
                 interval=None):
        """Initialize the common collectd fields and the value fields"""
        super(CollectdValue, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.value = value
        self.ds_name = ds_name
        self.interval = interval

    @classmethod
    def hash_gen(cls, host, plugin, plugin_instance, type_,
                 type_instance, ds_name):
        """Generate the hash of a value from its identifying fields"""
        return hash((host, plugin, plugin_instance, type_,
                    type_instance, ds_name))

    def __eq__(self, other):
        """Values are equal if both identity hash and value are equal"""
        if not isinstance(other, CollectdValue):
            # let Python try the reflected comparison / fall back to
            # "not equal" instead of failing on a missing attribute
            return NotImplemented
        return hash(self) == hash(other) and self.value == other.value

    def __ne__(self, other):
        """Inverse of ``__eq__`` (not derived automatically on Python 2)"""
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __hash__(self):
        """Hash over host, plugin, plugin instance, type, type instance
        and data source name (the value itself is not included)"""
        return self.hash_gen(self.host, self.plugin, self.plugin_instance,
                             self.type, self.type_instance, self.ds_name)

    def __repr__(self):
        """Text representation of the value"""
        return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
               'type_instance={}, ds_name={}, value={}, time={})'.format(
                   self.__class__.__name__, self.host, self.plugin,
                   self.plugin_instance, self.type, self.type_instance,
                   self.ds_name, self.value, self.time)
282
283
class Item(yaml.YAMLObject):
    """Common base of the classes processing tags like ArrayItem/ValueItem"""

    @classmethod
    def _rebuild_pairs(cls, pairs, metric, formatted_key=None):
        """Copy (key, value) node pairs of a mapping.

        Keys are re-created as scalar nodes. The value is formatted
        (recursively, via ``format_node``) when ``formatted_key`` is None
        or equals the key text; otherwise the value node is reused as is.
        """
        rebuilt = []
        for key, value in pairs:
            if formatted_key is None or key.value == formatted_key:
                value = cls.format_node(value, metric)
            rebuilt.append((yaml.ScalarNode(key.tag, key.value), value))
        return rebuilt

    @classmethod
    def format_node(cls, mapping, metric):
        """Return a copy of the node with ``str.format`` placeholders of
        its scalars substituted from ``metric``.

        :param mapping: YAML node to format
        :param metric: dictionary passed as keyword arguments to
            ``str.format`` of the scalar values
        :returns: new node for the handled tags; the original node for
            any other tag
        """
        tag = mapping.tag
        # plain strings and the scalar custom tags: format the text itself
        if tag in ('tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
                   Number.yaml_tag):
            text = mapping.value.format(**metric)
            return yaml.ScalarNode(tag, text)
        # regular mapping: format every value
        if tag == 'tag:yaml.org,2002:map':
            return yaml.MappingNode(
                tag, cls._rebuild_pairs(mapping.value, metric))
        # array/value items: only the SELECT entries are formatted here
        if tag in (ArrayItem.yaml_tag, ValueItem.yaml_tag):
            entries = [
                yaml.MappingNode(
                    seq.tag, cls._rebuild_pairs(seq.value, metric, 'SELECT'))
                for seq in mapping.value]
            return yaml.SequenceNode(tag, entries)
        # map value: only the VALUE entry is formatted here
        if tag == MapValue.yaml_tag:
            return yaml.MappingNode(
                tag, cls._rebuild_pairs(mapping.value, metric, 'VALUE'))
        return mapping
322
323
class ValueItem(Item):
    """Class to process ValueItem tag"""
    yaml_tag = u'!ValueItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the value of a ``!ValueItem`` node.

        Reads the VALUE, SELECT and DEFAULT keys of the node, selects the
        collectd metric by the SELECT conditions and returns the formatted
        VALUE for it, or the DEFAULT value if no metric was selected.
        """
        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
        value_desc = None
        default = None
        select = []
        # walk through all the keys of the item
        for elem in node.value:
            for key, value in elem.value:
                name = key.value
                if name == 'VALUE':
                    assert value_desc is None, "VALUE key already set"
                    value_desc = value
                elif name == 'SELECT':
                    select.append(loader.construct_mapping(value))
                elif name == 'DEFAULT':
                    assert default is None, "DEFAULT key already set"
                    default = loader.construct_object(value)
        # fall back to the default VALUE description when it isn't given,
        # format: `VALUE: !Number '{vl.value}'`
        if value_desc is None:
            value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
        # at most one collectd metric may satisfy the SELECT condition
        metrics = loader.collector.items(select)
        count = len(metrics)
        assert count < 2, \
            'Wrong SELECT condition, selected {} metrics'.format(count)
        if count == 0:
            # nothing has been found by SELECT condition, use DEFAULT value
            assert default is not None, \
                "No metrics selected by SELECT condition and DEFAULT key " \
                "isn't set"
            return default
        context = {'vl': metrics[0], 'system': loader.system}
        item = cls.format_node(value_desc, context)
        return loader.construct_object(item)
359
360
class ArrayItem(Item):
    """Class to process ArrayItem tag"""
    yaml_tag = u'!ArrayItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the list of items of an ``!ArrayItem`` node.

        Reads the ITEM-DESC, INDEX-KEY and SELECT keys of the node,
        selects the collectd metrics by the SELECT conditions, optionally
        reduces them to the unique combinations of the INDEX-KEY
        attributes, and builds one item (from ITEM-DESC) per metric.

        :returns: list of constructed items
        """
        logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                      loader, node))
        # e.g.:
        # SequenceNode(tag=u'!ArrayItem', value=[
        #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
        #       MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #         (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
        #           , ...)
        #       ]), ...
        #     ), (key, value), ... ])
        #   , ... ])
        assert isinstance(node, yaml.SequenceNode), \
            "{} tag isn't YAML array".format(cls.__name__)
        select, index_keys, items, item_desc = list(), list(), list(), None
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'ITEM-DESC':
                    assert item_desc is None, "ITEM-DESC key already set"
                    item_desc = value
                if key.value == 'INDEX-KEY':
                    assert len(index_keys) == 0, "INDEX-KEY key already set"
                    index_keys = loader.construct_sequence(value)
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
        # validate item description
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        assert len(select) > 0 or len(index_keys) > 0, \
            "Mandatory key (INDEX-KEY or SELECT) isn't set"
        metrics = loader.collector.items(select)
        # select metrics based on INDEX-KEY provided
        if len(index_keys) > 0:
            metric_set = set()
            for metric in metrics:
                # Copy the index attributes by attribute name. Passing
                # them as constructor keywords fails for the attributes
                # whose constructor parameter is named differently
                # (`type` vs `type_`) or is missing (`time`).
                index_value = CollectdValue()
                for key in index_keys:
                    setattr(index_value, key, getattr(metric, key))
                metric_set.add(index_value)
            metrics = list(metric_set)
        # build items based on SELECT and/or INDEX-KEY criteria
        for metric in metrics:
            item = cls.format_node(item_desc,
                                   {'vl': metric, 'system': loader.system,
                                       'config': loader.config})
            items.append(loader.construct_mapping(item))
        return items
413
414
class Measurements(ArrayItem):
    """Class to process Measurements tag (handled the same way as
    ArrayItem)"""
    yaml_tag = u'!Measurements'
418
419
class Events(Item):
    """Class to process Events tag"""
    yaml_tag = u'!Events'

    @classmethod
    def from_yaml(cls, loader, node):
        """Construct the event of an ``!Events`` node.

        :returns: mapping built from ITEM-DESC if the notification of the
            loader matches the CONDITION (an absent CONDITION matches any
            notification), None otherwise
        """
        item_desc = None
        condition = {}
        for elem in node.value:
            for key, value in elem.value:
                name = key.value
                if name == 'ITEM-DESC':
                    item_desc = value
                elif name == 'CONDITION':
                    condition = loader.construct_mapping(value)
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        notification = loader.notification
        if not notification.match(**condition):
            return None
        context = {'n': notification, 'system': loader.system}
        item = cls.format_node(item_desc, context)
        return loader.construct_mapping(item)
439
440
class Bytes2Kibibytes(yaml.YAMLObject):
    """Class to process Bytes2Kibibytes tag"""
    yaml_tag = u'!Bytes2Kibibytes'

    @classmethod
    def from_yaml(cls, loader, node):
        """Convert the node value from bytes to kibibytes, rounded to
        three decimal places"""
        byte_count = float(node.value)
        return round(byte_count / 1024.0, 3)
448
449
class Number(yaml.YAMLObject):
    """Class to process Number tag"""
    yaml_tag = u'!Number'

    @classmethod
    def from_yaml(cls, loader, node):
        """Convert the node value to int, or to float if it isn't a valid
        integer"""
        text = node.value
        try:
            number = int(text)
        except ValueError:
            number = float(text)
        return number
460
461
class MapValue(yaml.YAMLObject):
    """Class to process MapValue tag"""
    yaml_tag = u'!MapValue'

    @classmethod
    def from_yaml(cls, loader, node):
        """Translate the VALUE of the node using the TO mapping.

        :returns: the entry of the TO mapping found under the VALUE key
        """
        mapping = None
        val = None
        for key, value in node.value:
            name = key.value
            if name == 'TO':
                mapping = loader.construct_mapping(value)
            elif name == 'VALUE':
                val = loader.construct_object(value)
        assert mapping is not None, "Mandatory TO key isn't set"
        assert val is not None, "Mandatory VALUE key isn't set"
        assert val in mapping, \
            'Value "{}" cannot be mapped to any of {} values'.format(
                val, mapping.keys())
        return mapping[val]
480
481
class Normalizer(object):
    """Normalization class which handles events and measurements

    Measurements are processed periodically on a timer; notifications are
    posted into a queue and processed by a worker thread. The results are
    passed to ``send_data()`` which has to be overridden by a subclass.
    """

    def __init__(self):
        """Init"""
        self.interval = None
        self.collector = None
        self.system = None
        self.queue = None
        self.timer = None

    @classmethod
    def read_configuration(cls, config_file):
        """read YAML configuration file

        :param config_file: path to the YAML file with the events and
            measurements definitions
        :returns: tuple ``(events_stream, measurements_stream)`` of the
            serialized YAML definitions
        """
        # load YAML events/measurements definition; the context manager
        # closes the file even if the YAML cannot be parsed
        with open(config_file, 'r') as f:
            doc_yaml = yaml.compose(f)
        # yaml.compose() returns None for a stream without documents;
        # treat it as a configuration without any definition
        definitions = doc_yaml.value if doc_yaml is not None else []
        # split events & measurements definitions
        measurements, events = list(), list()
        for key, value in definitions:
            if value.tag == Measurements.yaml_tag:
                measurements.append((key, value))
            if value.tag == Events.yaml_tag:
                events.append((key, value))
        measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
                                             measurements)
        measurements_stream = yaml.serialize(measurements_yaml)
        events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
        events_stream = yaml.serialize(events_yaml)
        # return event & measurements definition
        return events_stream, measurements_stream

    def initialize(self, config_file, interval):
        """Initialize the class

        :param config_file: path to the YAML configuration file
        :param interval: measurements processing interval in seconds
        """
        e, m = self.read_configuration(config_file)
        self.measurements_stream = m
        self.events_stream = e
        self.system = System()
        self.config = Config(interval)
        self.interval = interval
        # start collector with aging time = double interval
        self.collector = Collector(interval * 2)
        # initialize event thread
        self.queue = queue.Queue()
        self.event_thread = Thread(target=self.event_worker)
        self.event_thread.daemon = True
        self.event_thread.start()
        # initialize measurements timer
        self.start_timer()

    def destroy(self):
        """Destroy the class"""
        self.collector.destroy()
        self.post_event(None)  # send stop event
        self.event_thread.join()
        self.stop_timer()

    def start_timer(self):
        """Start measurements timer"""
        self.timer = Timer(self.interval, self.on_timer)
        self.timer.start()

    def stop_timer(self):
        """Stop measurements timer"""
        self.timer.cancel()

    def on_timer(self):
        """Measurements timer"""
        # re-arm the timer first to keep the processing periodic
        self.start_timer()
        self.process_measurements()

    def event_worker(self):
        """Event worker

        Processes the notifications posted into the queue until anything
        that isn't a ``CollectdNotification`` (e.g. None) is received.
        """
        while True:
            event = self.queue.get()
            if isinstance(event, CollectdNotification):
                self.process_notify(event)
                continue
            # exit for the worker
            break

    def get_collector(self):
        """Get metric collector reference"""
        return self.collector

    def process_measurements(self):
        """Process measurements"""
        loader = Loader(self.measurements_stream)
        try:
            # objects used by the custom YAML tags during construction
            setattr(loader, 'collector', self.collector)
            setattr(loader, 'system', self.system)
            setattr(loader, 'config', self.config)
            measurements = loader.get_data()
        finally:
            # release the loader state, as yaml.load() does
            loader.dispose()
        for measurement_name in measurements:
            logging.debug('Process "{}" measurements: {}'.format(
                measurement_name, measurements[measurement_name]))
            for measurement in measurements[measurement_name]:
                self.send_data(measurement)

    def process_notify(self, notification):
        """Process events"""
        loader = Loader(self.events_stream)
        try:
            # objects used by the custom YAML tags during construction
            setattr(loader, 'notification', notification)
            setattr(loader, 'system', self.system)
            notifications = loader.get_data()
        finally:
            # release the loader state, as yaml.load() does
            loader.dispose()
        for notify_name in notifications:
            logging.debug('Process "{}" notification'.format(notify_name))
            if notifications[notify_name] is not None:
                self.send_data(notifications[notify_name])

    def send_data(self, data):
        """Send data"""
        assert False, 'send_data() is abstract function and MUST be overridden'

    def post_event(self, notification):
        """Post notification into the queue to process"""
        self.queue.put(notification)