2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
31 from yaml import CLoader as Loader
33 from yaml import Loader
35 # import synchronized queue
45 """Configuration class used to pass config option into YAML file"""
47 def __init__(self, interval):
48 self.interval = interval
52 """System class which provides information like host, time etc., into YAML
56 self.hostname = 'localhost'
61 self._id = self._id + 1
70 return datetime.date.today().isoformat()
73 class ItemIterator(object):
74 """Item iterator returned by Collector class"""
76 def __init__(self, collector, items):
77 """Item iterator init"""
78 logging.debug('{}:__init__()'.format(self.__class__.__name__))
80 self._collector = collector
84 """Returns next item from the list"""
85 if self._index == len(self._items):
87 curr_index = self._index
88 self._index = curr_index + 1
89 return self.items[curr_index]
91 def __getitem__(self, key):
92 """get item by index"""
93 return self._items[key]
96 """Return length of elements"""
97 return len(self._items)
100 """Destroy iterator and unlock the collector"""
101 logging.debug('{}:__del__()'.format(self.__class__.__name__))
102 self._collector.unlock()
105 class ItemObject(object):
106 """Item object returned by Collector class"""
108 def __init__(self, collector, hash_):
109 """Item object init"""
110 logging.debug('{}:__init__()'.format(self.__class__.__name__))
111 super(ItemObject, self).__setattr__('_collector', collector)
112 super(ItemObject, self).__setattr__('_hash', hash_)
114 def __setattr__(self, name, value):
115 t, item = self._collector._metrics[self._hash]
116 logging.debug('{}:__setattr__(name={}, value={})'.format(
117 self.__class__.__name__, name, value))
118 setattr(item, name, value)
119 self._collector._metrics[self._hash] = (time.time(), item)
122 """Destroy item object and unlock the collector"""
123 logging.debug('{}:__del__()'.format(self.__class__.__name__))
124 self._collector.unlock()
127 class Collector(object):
128 """Thread-safe collector with aging feature"""
130 def __init__(self, age_timeout):
134 self._age_timeout = age_timeout
135 self._start_age_timer()
137 def _start_age_timer(self):
138 """Start age timer"""
139 self._age_timer = Timer(self._age_timeout, self._on_timer)
140 self._age_timer.start()
142 def _stop_age_timer(self):
144 self._age_timer.cancel()
148 self._start_age_timer()
151 def _check_aging(self):
152 """Check aging time for all items"""
154 for data_hash, data in self._metrics.items():
156 if ((time.time() - age) >= self._age_timeout):
157 # aging time has expired, remove the item from the collector
158 logging.debug('{}:_check_aging():value={}'.format(
159 self.__class__.__name__, item))
160 self._metrics.pop(data_hash)
165 """Lock the collector"""
166 logging.debug('{}:lock()'.format(self.__class__.__name__))
170 """Unlock the collector"""
171 logging.debug('{}:unlock()'.format(self.__class__.__name__))
174 def get(self, hash_):
176 if hash_ in self._metrics:
177 return ItemObject(self, hash_)
182 """Add an item into the collector"""
184 logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
185 self._metrics[hash(item)] = (time.time(), item)
188 def items(self, select_list=[]):
189 """Returns locked (safe) item iterator"""
192 for k, item in self._metrics.items():
194 for select in select_list:
195 if value.match(**select):
196 metrics.append(value)
197 return ItemIterator(self, metrics)
200 """Destroy the collector"""
201 self._stop_age_timer()
204 class CollectdData(object):
205 """Base class for Collectd data"""
207 def __init__(self, host=None, plugin=None, plugin_instance=None,
208 type_=None, type_instance=None, time_=None):
209 """Class initialization"""
212 self.plugin_instance = plugin_instance
213 self.type_instance = type_instance
218 def is_regular_expression(cls, expr):
219 return len(expr) > 1 and expr[0] == '/' and expr[-1] == '/'
221 def match(self, **kargs):
223 for key, value in kargs.items():
224 if self.is_regular_expression(value):
225 if re.match(value[1:-1], getattr(self, key)) is None:
227 elif value != getattr(self, key):
229 # return match event if kargs is empty
233 class CollectdNotification(CollectdData):
234 """Collectd notification"""
236 def __init__(self, host=None, plugin=None, plugin_instance=None,
237 type_=None, type_instance=None, severity=None, message=None):
238 super(CollectdNotification, self).__init__(
239 host, plugin, plugin_instance, type_, type_instance)
240 self.severity = severity
241 self.message = message
244 return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
245 'type_instance={}, severity={}, message={}, time={})'.format(
246 self.__class__.__name__, self.host, self.plugin,
247 self.plugin_instance, self.type, self.type_instance,
248 self.severity, self.message, time)
251 class CollectdValue(CollectdData):
254 def __init__(self, host=None, plugin=None, plugin_instance=None,
255 type_=None, type_instance=None, ds_name='value', value=None,
257 super(CollectdValue, self).__init__(
258 host, plugin, plugin_instance, type_, type_instance)
260 self.ds_name = ds_name
261 self.interval = interval
264 def hash_gen(cls, host, plugin, plugin_instance, type_,
265 type_instance, ds_name):
266 return hash((host, plugin, plugin_instance, type_,
267 type_instance, ds_name))
269 def __eq__(self, other):
270 return hash(self) == hash(other) and self.value == other.value
273 return self.hash_gen(self.host, self.plugin, self.plugin_instance,
274 self.type, self.type_instance, self.ds_name)
277 return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
278 'type_instance={}, ds_name={}, value={}, time={})'.format(
279 self.__class__.__name__, self.host, self.plugin,
280 self.plugin_instance, self.type, self.type_instance,
281 self.ds_name, self.value, self.time)
284 class Item(yaml.YAMLObject):
285 """Base class to process tags like ArrayItem/ValueItem"""
288 def format_node(cls, mapping, metric):
290 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
291 Number.yaml_tag, StripExtraDash.yaml_tag]:
292 return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
293 elif mapping.tag == 'tag:yaml.org,2002:map':
295 for key, value in mapping.value:
296 values.append((yaml.ScalarNode(key.tag, key.value),
297 cls.format_node(value, metric)))
298 return yaml.MappingNode(mapping.tag, values)
299 elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
301 for seq in mapping.value:
303 for key, value in seq.value:
304 if key.value == 'SELECT':
305 map_values.append((yaml.ScalarNode(key.tag, key.value),
306 cls.format_node(value, metric)))
308 map_values.append((yaml.ScalarNode(key.tag, key.value),
310 values.append(yaml.MappingNode(seq.tag, map_values))
311 return yaml.SequenceNode(mapping.tag, values)
312 elif mapping.tag in [MapValue.yaml_tag]:
314 for key, value in mapping.value:
315 if key.value == 'VALUE':
316 values.append((yaml.ScalarNode(key.tag, key.value),
317 cls.format_node(value, metric)))
319 values.append((yaml.ScalarNode(key.tag, key.value), value))
320 return yaml.MappingNode(mapping.tag, values)
324 class ValueItem(Item):
325 """Class to process VlaueItem tag"""
326 yaml_tag = u'!ValueItem'
329 def from_yaml(cls, loader, node):
330 logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
331 default, select, value_desc = None, list(), None
332 # find value description
333 for elem in node.value:
334 for key, value in elem.value:
335 if key.value == 'VALUE':
336 assert value_desc is None, "VALUE key already set"
338 if key.value == 'SELECT':
339 select.append(loader.construct_mapping(value))
340 if key.value == 'DEFAULT':
341 assert default is None, "DEFAULT key already set"
342 default = loader.construct_object(value)
343 # if VALUE key isn't given, use default VALUE key
344 # format: `VALUE: !Number '{vl.value}'`
345 if value_desc is None:
346 value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
347 # select collectd metric based on SELECT condition
348 metrics = loader.collector.items(select)
349 assert len(metrics) < 2, \
350 'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
352 item = cls.format_node(value_desc, {'vl': metrics[0],
353 'system': loader.system})
354 return loader.construct_object(item)
355 # nothing has been found by SELECT condition, set to DEFAULT value.
356 assert default is not None, \
357 "No metrics selected by SELECT condition and DEFAULT key isn't set"
361 class ArrayItem(Item):
362 """Class to process ArrayItem tag"""
363 yaml_tag = u'!ArrayItem'
366 def from_yaml(cls, loader, node):
367 logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
370 # SequenceNode(tag=u'!ArrayItem', value=[
371 # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
372 # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
373 # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
374 # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
377 # ), (key, value), ... ])
379 assert isinstance(node, yaml.SequenceNode), \
380 "{} tag isn't YAML array".format(cls.__name__)
381 select, index_keys, items, item_desc = list(), list(), list(), None
382 for elem in node.value:
383 for key, value in elem.value:
384 if key.value == 'ITEM-DESC':
385 assert item_desc is None, "ITEM-DESC key already set"
387 if key.value == 'INDEX-KEY':
388 assert len(index_keys) == 0, "INDEX-KEY key already set"
389 index_keys = loader.construct_sequence(value)
390 if key.value == 'SELECT':
391 select.append(loader.construct_mapping(value))
392 # validate item description
393 assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
394 assert len(select) > 0 or len(index_keys) > 0, \
395 "Mandatory key (INDEX-KEY or SELECT) isn't set"
396 metrics = loader.collector.items(select)
397 # select metrics based on INDEX-KEY provided
398 if len(index_keys) > 0:
400 for metric in metrics:
402 for key in index_keys:
403 value_params[key] = getattr(metric, key)
404 metric_set.add(CollectdValue(**value_params))
405 metrics = list(metric_set)
406 # build items based on SELECT and/or INDEX-KEY criteria
407 for metric in metrics:
408 item = cls.format_node(item_desc,
409 {'vl': metric, 'system': loader.system,
410 'config': loader.config})
411 items.append(loader.construct_mapping(item))
415 class Measurements(ArrayItem):
416 """Class to process Measurements tag"""
417 yaml_tag = u'!Measurements'
421 """Class to process Events tag"""
422 yaml_tag = u'!Events'
425 def from_yaml(cls, loader, node):
426 condition, item_desc = dict(), None
427 for elem in node.value:
428 for key, value in elem.value:
429 if key.value == 'ITEM-DESC':
431 if key.value == 'CONDITION':
432 condition = loader.construct_mapping(value)
433 assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
434 if loader.notification.match(**condition):
435 item = cls.format_node(item_desc, {
436 'n': loader.notification, 'system': loader.system})
437 return loader.construct_mapping(item)
441 class Bytes2Kibibytes(yaml.YAMLObject):
442 """Class to process Bytes2Kibibytes tag"""
443 yaml_tag = u'!Bytes2Kibibytes'
446 def from_yaml(cls, loader, node):
447 return round(float(node.value) / 1024.0, 3)
450 class Number(yaml.YAMLObject):
451 """Class to process Number tag"""
452 yaml_tag = u'!Number'
455 def from_yaml(cls, loader, node):
457 return int(node.value)
459 return float(node.value)
462 class StripExtraDash(yaml.YAMLObject):
463 """Class to process StripExtraDash tag"""
464 yaml_tag = u'!StripExtraDash'
467 def from_yaml(cls, loader, node):
468 return '-'.join([ x for x in node.value.split('-') if len(x) > 0])
471 class MapValue(yaml.YAMLObject):
472 """Class to process MapValue tag"""
473 yaml_tag = u'!MapValue'
476 def from_yaml(cls, loader, node):
477 mapping, val = None, None
478 for key, value in node.value:
479 if key.value == 'TO':
480 mapping = loader.construct_mapping(value)
481 if key.value == 'VALUE':
482 val = loader.construct_object(value)
483 assert mapping is not None, "Mandatory TO key isn't set"
484 assert val is not None, "Mandatory VALUE key isn't set"
485 assert val in mapping, \
486 'Value "{}" cannot be mapped to any of {} values'.format(
491 class Normalizer(object):
492 """Normalization class which handles events and measurements"""
497 self.collector = None
503 def read_configuration(cls, config_file):
504 """read YAML configuration file"""
505 # load YAML events/measurements definition
506 f = open(config_file, 'r')
507 doc_yaml = yaml.compose(f)
509 # split events & measurements definitions
510 measurements, events = list(), list()
511 for key, value in doc_yaml.value:
512 if value.tag == Measurements.yaml_tag:
513 measurements.append((key, value))
514 if value.tag == Events.yaml_tag:
515 events.append((key, value))
516 measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
518 measurements_stream = yaml.serialize(measurements_yaml)
519 events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
520 events_stream = yaml.serialize(events_yaml)
521 # return event & measurements definition
522 return events_stream, measurements_stream
524 def initialize(self, config_file, interval):
525 """Initialize the class"""
526 e, m = self.read_configuration(config_file)
527 self.measurements_stream = m
528 self.events_stream = e
529 self.system = System()
530 self.config = Config(interval)
531 self.interval = interval
532 # start collector with aging time = double interval
533 self.collector = Collector(interval * 2)
534 # initialize event thread
535 self.queue = queue.Queue()
536 self.event_thread = Thread(target=self.event_worker)
537 self.event_thread.daemon = True
538 self.event_thread.start()
539 # initialize measurements timer
543 """Destroy the class"""
544 self.collector.destroy()
545 self.post_event(None) # send stop event
546 self.event_thread.join()
549 def start_timer(self):
550 """Start measurements timer"""
551 self.timer = Timer(self.interval, self.on_timer)
554 def stop_timer(self):
555 """Stop measurements timer"""
559 """Measurements timer"""
561 self.process_measurements()
563 def event_worker(self):
566 event = self.queue.get()
567 if isinstance(event, CollectdNotification):
568 self.process_notify(event)
570 # exit for the worker
573 def get_collector(self):
574 """Get metric collector reference"""
575 return self.collector
577 def process_measurements(self):
578 """Process measurements"""
579 loader = Loader(self.measurements_stream)
580 setattr(loader, 'collector', self.collector)
581 setattr(loader, 'system', self.system)
582 setattr(loader, 'config', self.config)
583 measurements = loader.get_data()
584 for measurement_name in measurements:
585 logging.debug('Process "{}" measurements: {}'.format(
586 measurement_name, measurements[measurement_name]))
587 for measurement in measurements[measurement_name]:
588 self.send_data(measurement)
590 def process_notify(self, notification):
592 loader = Loader(self.events_stream)
593 setattr(loader, 'notification', notification)
594 setattr(loader, 'system', self.system)
595 notifications = loader.get_data()
596 for notify_name in notifications:
597 logging.debug('Process "{}" notification'.format(notify_name))
598 if notifications[notify_name] is not None:
599 self.send_data(notifications[notify_name])
601 def send_data(self, data):
603 assert False, 'send_data() is abstract function and MUST be overridden'
605 def post_event(self, notification):
606 """Post notification into the queue to process"""
607 self.queue.put(notification)