# Copyright(c) 2017 Intel Corporation. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
# Authors:
#   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
import datetime
import logging
import re
import time
from threading import RLock
from threading import Thread
from threading import Timer

import yaml
# prefer the C-accelerated YAML loader when available
try:
    from yaml import CLoader as Loader
except ImportError:
    from yaml import Loader

# import synchronized queue
import queue
class Config(object):
    """Configuration class used to pass config option into YAML file"""

    def __init__(self, interval):
        """Store the collectd reading interval (seconds) for use by
        the YAML templates (exposed as `config.interval`)."""
        self.interval = interval
class System(object):
    """System class which provides information like host, time etc., into YAML
    file"""

    def __init__(self):
        # NOTE(review): hostname is hard-coded here; confirm whether it
        # should be taken from the platform (e.g. socket.gethostname()).
        self.hostname = 'localhost'
        self._id = 0  # monotonically increasing identifier counter

    @property
    def id(self):
        """Return a new unique, monotonically increasing identifier."""
        self._id = self._id + 1
        return self._id

    @property
    def date(self):
        """Return today's date in ISO-8601 format (YYYY-MM-DD)."""
        return datetime.date.today().isoformat()
class ItemIterator(object):
    """Item iterator returned by Collector class"""

    def __init__(self, collector, items):
        """Item iterator init"""
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        self._index = 0
        self._collector = collector
        self._items = items

    def __iter__(self):
        """Return the iterator object itself"""
        return self

    def __next__(self):
        """Returns next item from the list"""
        if self._index == len(self._items):
            raise StopIteration
        curr_index = self._index
        self._index = curr_index + 1
        # bug fix: original returned `self.items[curr_index]` -- there is
        # no `items` attribute, the backing list is `self._items`
        return self._items[curr_index]

    def __getitem__(self, key):
        """get item by index"""
        return self._items[key]

    def __len__(self):
        """Return length of elements"""
        return len(self._items)

    def __del__(self):
        """Destroy iterator and unlock the collector"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
class ItemObject(object):
    """Item object returned by Collector class"""

    def __init__(self, collector, hash_):
        """Item object init"""
        logging.debug('{}:__init__()'.format(self.__class__.__name__))
        # go through the base-class __setattr__ so our own override
        # (below) is bypassed while storing internal state
        super(ItemObject, self).__setattr__('_collector', collector)
        super(ItemObject, self).__setattr__('_hash', hash_)

    def __setattr__(self, name, value):
        """Proxy attribute writes to the underlying collector item and
        refresh its aging timestamp."""
        t, item = self._collector._metrics[self._hash]
        logging.debug('{}:__setattr__(name={}, value={})'.format(
            self.__class__.__name__, name, value))
        setattr(item, name, value)
        self._collector._metrics[self._hash] = (time.time(), item)

    def __del__(self):
        """Destroy item object and unlock the collector"""
        logging.debug('{}:__del__()'.format(self.__class__.__name__))
        self._collector.unlock()
class Collector(object):
    """Thread-safe collector with aging feature"""

    def __init__(self, age_timeout):
        """Create the collector and start the aging timer.

        age_timeout -- seconds after which an un-refreshed item is dropped
        """
        self._rlock = RLock()
        self._metrics = {}  # hash -> (insert timestamp, item)
        self._age_timeout = age_timeout
        self._start_age_timer()

    def _start_age_timer(self):
        """Start age timer"""
        self._age_timer = Timer(self._age_timeout, self._on_timer)
        self._age_timer.start()

    def _stop_age_timer(self):
        """Stop age timer"""
        self._age_timer.cancel()

    def _on_timer(self):
        """Age timer callback: re-arm the timer and drop expired items"""
        self._start_age_timer()
        self._check_aging()

    def _check_aging(self):
        """Check aging time for all items"""
        self.lock()
        for data_hash, data in list(self._metrics.items()):
            age, item = data
            if ((time.time() - age) >= self._age_timeout):
                # aging time has expired, remove the item from the collector
                logging.debug('{}:_check_aging():value={}'.format(
                    self.__class__.__name__, item))
                self._metrics.pop(data_hash)
        self.unlock()

    def lock(self):
        """Lock the collector"""
        logging.debug('{}:lock()'.format(self.__class__.__name__))
        self._rlock.acquire()

    def unlock(self):
        """Unlock the collector"""
        logging.debug('{}:unlock()'.format(self.__class__.__name__))
        self._rlock.release()

    def get(self, hash_):
        """Return a locked ItemObject for `hash_`, or None if not present.

        On a hit the collector stays locked until the returned ItemObject
        is destroyed (its __del__ unlocks).
        """
        self.lock()
        if hash_ in self._metrics:
            return ItemObject(self, hash_)
        self.unlock()
        return None

    def add(self, item):
        """Add an item into the collector"""
        self.lock()
        logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
        self._metrics[hash(item)] = (time.time(), item)
        self.unlock()

    def items(self, select_list=None):
        """Returns locked (safe) item iterator"""
        # fix: default was the mutable `[]`; None avoids the shared-default
        # pitfall while remaining backward-compatible
        self.lock()
        metrics = []
        for k, data in list(self._metrics.items()):
            _, value = data
            for select in (select_list or []):
                if value.match(**select):
                    metrics.append(value)
        return ItemIterator(self, metrics)

    def destroy(self):
        """Destroy the collector"""
        self._stop_age_timer()
class CollectdData(object):
    """Base class for Collectd data"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, time_=None):
        """Class initialization"""
        self.host = host
        self.plugin = plugin
        self.plugin_instance = plugin_instance
        self.type_instance = type_instance
        self.type = type_
        self.time = time_

    @classmethod
    def is_regular_expression(cls, expr):
        """True when `expr` is wrapped in '/' markers, e.g. '/cpu.*/'"""
        return len(expr) > 1 and expr[0] == '/' and expr[-1] == '/'

    def match(self, **kargs):
        """Return True when every keyword matches this object's attribute.

        A value wrapped in '/.../' is treated as a regular expression;
        any other value is compared for equality.
        """
        # compare the metric attribute-by-attribute
        for key, value in list(kargs.items()):
            if self.is_regular_expression(value):
                if re.match(value[1:-1], getattr(self, key)) is None:
                    return False
            elif value != getattr(self, key):
                return False
        # return match event if kargs is empty
        return True
class CollectdNotification(CollectdData):
    """Collectd notification"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, severity=None, message=None):
        super(CollectdNotification, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.severity = severity
        self.message = message

    def __repr__(self):
        # bug fix: original formatted the module-level `time` object instead
        # of the instance timestamp `self.time` (compare CollectdValue.__repr__)
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
            'type_instance={}, severity={}, message={}, time={})'.format(
                self.__class__.__name__, self.host, self.plugin,
                self.plugin_instance, self.type, self.type_instance,
                self.severity, self.message, self.time)
class CollectdValue(CollectdData):
    """Collectd value"""

    def __init__(self, host=None, plugin=None, plugin_instance=None,
                 type_=None, type_instance=None, ds_name='value', value=None,
                 interval=None):
        super(CollectdValue, self).__init__(
            host, plugin, plugin_instance, type_, type_instance)
        self.value = value
        self.ds_name = ds_name
        self.interval = interval

    @classmethod
    def hash_gen(cls, host, plugin, plugin_instance, type_,
                 type_instance, ds_name):
        """Generate a hash from the metric identification fields"""
        return hash((host, plugin, plugin_instance, type_,
                     type_instance, ds_name))

    def __eq__(self, other):
        # two values are equal when identity fields AND the value match
        return hash(self) == hash(other) and self.value == other.value

    def __hash__(self):
        return self.hash_gen(self.host, self.plugin, self.plugin_instance,
                             self.type, self.type_instance, self.ds_name)

    def __repr__(self):
        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
            'type_instance={}, ds_name={}, value={}, time={})'.format(
                self.__class__.__name__, self.host, self.plugin,
                self.plugin_instance, self.type, self.type_instance,
                self.ds_name, self.value, self.time)
class Item(yaml.YAMLObject):
    """Base class to process tags like ArrayItem/ValueItem"""

    @classmethod
    def format_node(cls, mapping, metric):
        """Recursively format YAML nodes, substituting `metric` fields
        into scalar values via str.format()."""
        if mapping.tag in [
                'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
                Number.yaml_tag, StripExtraDash.yaml_tag]:
            return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
        elif mapping.tag == 'tag:yaml.org,2002:map':
            values = list()
            for key, value in mapping.value:
                values.append((yaml.ScalarNode(key.tag, key.value),
                               cls.format_node(value, metric)))
            return yaml.MappingNode(mapping.tag, values)
        elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
            values = list()
            for seq in mapping.value:
                map_values = list()
                for key, value in seq.value:
                    if key.value == 'SELECT':
                        # only the SELECT sub-tree is formatted; other keys
                        # are passed through untouched
                        map_values.append((yaml.ScalarNode(key.tag, key.value),
                                           cls.format_node(value, metric)))
                    else:
                        map_values.append((yaml.ScalarNode(key.tag, key.value),
                                           value))
                values.append(yaml.MappingNode(seq.tag, map_values))
            return yaml.SequenceNode(mapping.tag, values)
        elif mapping.tag in [MapValue.yaml_tag]:
            values = list()
            for key, value in mapping.value:
                if key.value == 'VALUE':
                    values.append((yaml.ScalarNode(key.tag, key.value),
                                   cls.format_node(value, metric)))
                else:
                    values.append((yaml.ScalarNode(key.tag, key.value), value))
            return yaml.MappingNode(mapping.tag, values)
        # unknown tags are returned unchanged
        return mapping
class ValueItem(Item):
    """Class to process ValueItem tag"""
    yaml_tag = '!ValueItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build a single value from the collector based on SELECT/VALUE,
        falling back to DEFAULT when nothing is selected."""
        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
        default, select, value_desc = None, list(), None
        # find value description
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'VALUE':
                    assert value_desc is None, "VALUE key already set"
                    value_desc = value
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
                if key.value == 'DEFAULT':
                    assert default is None, "DEFAULT key already set"
                    default = loader.construct_object(value)
        # if VALUE key isn't given, use default VALUE key
        # format: `VALUE: !Number '{vl.value}'`
        if value_desc is None:
            value_desc = yaml.ScalarNode(tag='!Number', value='{vl.value}')
        # select collectd metric based on SELECT condition
        metrics = loader.collector.items(select)
        assert len(metrics) < 2, \
            'Wrong SELECT condition {}, selected {} metrics'.format(
                select, len(metrics))
        if len(metrics) == 1:
            item = cls.format_node(value_desc, {'vl': metrics[0],
                                                'system': loader.system})
            return loader.construct_object(item)
        # nothing has been found by SELECT condition, set to DEFAULT value.
        assert default is not None, "No metrics selected by SELECT condition" \
            " {} and DEFAULT key isn't set".format(select)
        return default
class ArrayItem(Item):
    """Class to process ArrayItem tag"""
    yaml_tag = '!ArrayItem'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build a list of items from collector metrics selected by
        SELECT and/or de-duplicated by INDEX-KEY."""
        logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
                                                              loader, node))
        # expected YAML node layout:
        # SequenceNode(tag=u'!ArrayItem', value=[
        #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
        #      MappingNode(tag=u'tag:yaml.org,2002:map', value=[
        #        (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
        #         ...)
        #   ), (key, value), ... ])
        assert isinstance(node, yaml.SequenceNode), \
            "{} tag isn't YAML array".format(cls.__name__)
        select, index_keys, items, item_desc = list(), list(), list(), None
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'ITEM-DESC':
                    assert item_desc is None, "ITEM-DESC key already set"
                    item_desc = value
                if key.value == 'INDEX-KEY':
                    assert len(index_keys) == 0, "INDEX-KEY key already set"
                    index_keys = loader.construct_sequence(value)
                if key.value == 'SELECT':
                    select.append(loader.construct_mapping(value))
        # validate item description
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        assert len(select) > 0 or len(index_keys) > 0, \
            "Mandatory key (INDEX-KEY or SELECT) isn't set"
        metrics = loader.collector.items(select)
        # select metrics based on INDEX-KEY provided
        if len(index_keys) > 0:
            # keep one synthetic metric per distinct INDEX-KEY combination
            metric_set = set()
            for metric in metrics:
                value = CollectdValue()
                for key in index_keys:
                    setattr(value, key, getattr(metric, key))
                metric_set.add(value)
            metrics = list(metric_set)
        # build items based on SELECT and/or INDEX-KEY criteria
        for metric in metrics:
            item = cls.format_node(item_desc,
                                   {'vl': metric, 'system': loader.system,
                                    'config': loader.config})
            items.append(loader.construct_mapping(item))
        return items
class Measurements(ArrayItem):
    """Class to process Measurements tag"""
    yaml_tag = '!Measurements'
class Events(Item):
    """Class to process Events tag"""
    yaml_tag = '!Events'

    @classmethod
    def from_yaml(cls, loader, node):
        """Build an event item when the notification matches CONDITION;
        return None otherwise."""
        condition, item_desc = dict(), None
        for elem in node.value:
            for key, value in elem.value:
                if key.value == 'ITEM-DESC':
                    item_desc = value
                if key.value == 'CONDITION':
                    condition = loader.construct_mapping(value)
        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
        if loader.notification.match(**condition):
            item = cls.format_node(item_desc, {
                'n': loader.notification, 'system': loader.system})
            return loader.construct_mapping(item)
        return None
class Bytes2Kibibytes(yaml.YAMLObject):
    """Class to process Bytes2Kibibytes tag"""
    yaml_tag = '!Bytes2Kibibytes'

    @classmethod
    def from_yaml(cls, loader, node):
        """Convert a byte count scalar to kibibytes, rounded to 3 digits"""
        return round(float(node.value) / 1024.0, 3)
class Number(yaml.YAMLObject):
    """Class to process Number tag"""
    yaml_tag = '!Number'

    @classmethod
    def from_yaml(cls, loader, node):
        """Convert a scalar to int, falling back to float on failure"""
        try:
            return int(node.value)
        except ValueError:
            return float(node.value)
class StripExtraDash(yaml.YAMLObject):
    """Class to process StripExtraDash tag"""
    yaml_tag = '!StripExtraDash'

    @classmethod
    def from_yaml(cls, loader, node):
        """Collapse repeated/leading/trailing dashes, e.g. 'a--b-' -> 'a-b'"""
        return '-'.join([x for x in node.value.split('-') if len(x) > 0])
class MapValue(yaml.YAMLObject):
    """Class to process MapValue tag"""
    yaml_tag = '!MapValue'

    @classmethod
    def from_yaml(cls, loader, node):
        """Translate the VALUE scalar through the TO mapping"""
        mapping, val = None, None
        for key, value in node.value:
            if key.value == 'TO':
                mapping = loader.construct_mapping(value)
            if key.value == 'VALUE':
                val = loader.construct_object(value)
        assert mapping is not None, "Mandatory TO key isn't set"
        assert val is not None, "Mandatory VALUE key isn't set"
        assert val in mapping, \
            'Value "{}" cannot be mapped to any of {} values'.format(
                val, list(mapping.keys()))
        return mapping[val]
class Normalizer(object):
    """Normalization class which handles events and measurements"""

    def __init__(self):
        """Create the normalizer in a non-started state; call
        initialize() to start the worker thread and timers."""
        self.interval = None
        self.collector = None
        self.system = None
        self.config = None
        self.timer = None
        self.queue = None
        self.event_thread = None
        self.measurements_stream = None
        self.events_stream = None

    @classmethod
    def read_configuration(cls, config_file):
        """read YAML configuration file"""
        # load YAML events/measurements definition
        # (context manager fixes the original's leaked file handle)
        with open(config_file, 'r') as f:
            doc_yaml = yaml.compose(f)
        # split events & measurements definitions
        measurements, events = list(), list()
        for key, value in doc_yaml.value:
            if value.tag == Measurements.yaml_tag:
                measurements.append((key, value))
            if value.tag == Events.yaml_tag:
                events.append((key, value))
        measurements_yaml = yaml.MappingNode('tag:yaml.org,2002:map',
                                             measurements)
        measurements_stream = yaml.serialize(measurements_yaml)
        events_yaml = yaml.MappingNode('tag:yaml.org,2002:map', events)
        events_stream = yaml.serialize(events_yaml)
        # return event & measurements definition
        return events_stream, measurements_stream

    def initialize(self, config_file, interval):
        """Initialize the class"""
        e, m = self.read_configuration(config_file)
        self.measurements_stream = m
        self.events_stream = e
        self.system = System()
        self.config = Config(interval)
        self.interval = interval
        # start collector with aging time = double interval
        self.collector = Collector(interval * 2)
        # initialize event thread
        self.queue = queue.Queue()
        self.event_thread = Thread(target=self.event_worker)
        self.event_thread.daemon = True
        self.event_thread.start()
        # initialize measurements timer
        self.start_timer()

    def destroy(self):
        """Destroy the class"""
        self.collector.destroy()
        self.post_event(None)  # send stop event
        self.event_thread.join()
        self.stop_timer()

    def start_timer(self):
        """Start measurements timer"""
        self.timer = Timer(self.interval, self.on_timer)
        self.timer.start()

    def stop_timer(self):
        """Stop measurements timer"""
        self.timer.cancel()

    def on_timer(self):
        """Measurements timer"""
        self.start_timer()
        self.process_measurements()

    def event_worker(self):
        """Event worker: process queued notifications until a
        non-notification (stop) event arrives."""
        while True:
            event = self.queue.get()
            if isinstance(event, CollectdNotification):
                self.process_notify(event)
                continue
            # exit for the worker
            break

    def get_collector(self):
        """Get metric collector reference"""
        return self.collector

    def process_measurements(self):
        """Process measurements"""
        loader = Loader(self.measurements_stream)
        setattr(loader, 'collector', self.collector)
        setattr(loader, 'system', self.system)
        setattr(loader, 'config', self.config)
        measurements = loader.get_data()
        for measurement_name in measurements:
            logging.debug('Process "{}" measurements: {}'.format(
                measurement_name, measurements[measurement_name]))
            for measurement in measurements[measurement_name]:
                self.send_data(measurement)

    def process_notify(self, notification):
        """Process the collectd notification through the events YAML"""
        loader = Loader(self.events_stream)
        setattr(loader, 'notification', notification)
        setattr(loader, 'system', self.system)
        notifications = loader.get_data()
        for notify_name in notifications:
            logging.debug('Process "{}" notification'.format(notify_name))
            if notifications[notify_name] is not None:
                self.send_data(notifications[notify_name])

    def send_data(self, data):
        """Send out the normalized data (abstract, MUST be overridden)"""
        # fix: was `assert False, ...` -- asserts are stripped under
        # `python -O`, so raise explicitly instead
        raise NotImplementedError(
            'send_data() is abstract function and MUST be overridden')

    def post_event(self, notification):
        """Post notification into the queue to process"""
        self.queue.put(notification)