2 # Copyright(c) 2017 Intel Corporation. All rights reserved.
4 # Licensed under the Apache License, Version 2.0 (the "License");
5 # you may not use this file except in compliance with the License.
6 # You may obtain a copy of the License at
8 # http://www.apache.org/licenses/LICENSE-2.0
10 # Unless required by applicable law or agreed to in writing, software
11 # distributed under the License is distributed on an "AS IS" BASIS,
12 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 # See the License for the specific language governing permissions and
14 # limitations under the License.
17 # Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
24 from threading import RLock
25 from threading import Timer
26 from threading import Thread
31 from yaml import CLoader as Loader
33 from yaml import Loader
35 # import synchronized queue
45 """Configuration class used to pass config option into YAML file"""
47 def __init__(self, interval):
48 self.interval = interval
52 """System class which provides information like host, time etc., into YAML
56 self.hostname = 'localhost'
61 self._id = self._id + 1
70 return datetime.date.today().isoformat()
73 class ItemIterator(object):
74 """Item iterator returned by Collector class"""
76 def __init__(self, collector, items):
77 """Item iterator init"""
78 logging.debug('{}:__init__()'.format(self.__class__.__name__))
80 self._collector = collector
84 """Returns next item from the list"""
85 if self._index == len(self._items):
87 curr_index = self._index
88 self._index = curr_index + 1
89 return self.items[curr_index]
91 def __getitem__(self, key):
92 """get item by index"""
93 return self._items[key]
96 """Return length of elements"""
97 return len(self._items)
100 """Destroy iterator and unlock the collector"""
101 logging.debug('{}:__del__()'.format(self.__class__.__name__))
102 self._collector.unlock()
105 class ItemObject(object):
106 """Item object returned by Collector class"""
108 def __init__(self, collector, hash_):
109 """Item object init"""
110 logging.debug('{}:__init__()'.format(self.__class__.__name__))
111 super(ItemObject, self).__setattr__('_collector', collector)
112 super(ItemObject, self).__setattr__('_hash', hash_)
114 def __setattr__(self, name, value):
115 t, item = self._collector._metrics[self._hash]
116 logging.debug('{}:__setattr__(name={}, value={})'.format(
117 self.__class__.__name__, name, value))
118 setattr(item, name, value)
119 self._collector._metrics[self._hash] = (time.time(), item)
122 """Destroy item object and unlock the collector"""
123 logging.debug('{}:__del__()'.format(self.__class__.__name__))
124 self._collector.unlock()
127 class Collector(object):
128 """Thread-safe collector with aging feature"""
130 def __init__(self, age_timeout):
134 self._age_timeout = age_timeout
135 self._start_age_timer()
137 def _start_age_timer(self):
138 """Start age timer"""
139 self._age_timer = Timer(self._age_timeout, self._on_timer)
140 self._age_timer.start()
142 def _stop_age_timer(self):
144 self._age_timer.cancel()
148 self._start_age_timer()
151 def _check_aging(self):
152 """Check aging time for all items"""
154 for data_hash, data in self._metrics.items():
156 if ((time.time() - age) >= self._age_timeout):
157 # aging time has expired, remove the item from the collector
158 logging.debug('{}:_check_aging():value={}'.format(
159 self.__class__.__name__, item))
160 self._metrics.pop(data_hash)
165 """Lock the collector"""
166 logging.debug('{}:lock()'.format(self.__class__.__name__))
170 """Unlock the collector"""
171 logging.debug('{}:unlock()'.format(self.__class__.__name__))
174 def get(self, hash_):
176 if hash_ in self._metrics:
177 return ItemObject(self, hash_)
182 """Add an item into the collector"""
184 logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
185 self._metrics[hash(item)] = (time.time(), item)
188 def items(self, select_list=[]):
189 """Returns locked (safe) item iterator"""
192 for k, item in self._metrics.items():
194 for select in select_list:
195 if value.match(**select):
196 metrics.append(value)
197 return ItemIterator(self, metrics)
200 """Destroy the collector"""
201 self._stop_age_timer()
204 class CollectdData(object):
205 """Base class for Collectd data"""
207 def __init__(self, host=None, plugin=None, plugin_instance=None,
208 type_=None, type_instance=None, time_=None):
209 """Class initialization"""
212 self.plugin_instance = plugin_instance
213 self.type_instance = type_instance
218 def is_regular_expression(cls, expr):
219 if len(expr) > 1 and expr[0] == '/' and expr[-1] == '/':
223 def match(self, **kargs):
225 for key, value in kargs.items():
226 if self.is_regular_expression(value):
227 if re.match(value[1:-1], getattr(self, key)) is None:
229 elif value != getattr(self, key):
231 # return match event if kargs is empty
235 class CollectdNotification(CollectdData):
236 """Collectd notification"""
238 def __init__(self, host=None, plugin=None, plugin_instance=None,
239 type_=None, type_instance=None, severity=None, message=None):
240 super(CollectdNotification, self).__init__(
241 host, plugin, plugin_instance, type_, type_instance)
242 self.severity = severity
243 self.message = message
246 return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
247 'type_instance={}, severity={}, message={}, time={})'.format(
248 self.__class__.__name__, self.host, self.plugin,
249 self.plugin_instance, self.type, self.type_instance,
250 self.severity, self.message, time)
253 class CollectdValue(CollectdData):
256 def __init__(self, host=None, plugin=None, plugin_instance=None,
257 type_=None, type_instance=None, ds_name='value', value=None,
259 super(CollectdValue, self).__init__(
260 host, plugin, plugin_instance, type_, type_instance)
262 self.ds_name = ds_name
263 self.interval = interval
266 def hash_gen(cls, host, plugin, plugin_instance, type_,
267 type_instance, ds_name):
268 return hash((host, plugin, plugin_instance, type_,
269 type_instance, ds_name))
271 def __eq__(self, other):
272 return hash(self) == hash(other) and self.value == other.value
275 return self.hash_gen(self.host, self.plugin, self.plugin_instance,
276 self.type, self.type_instance, self.ds_name)
279 return '{}(host={}, plugin={}, plugin_instance={}, type={},' \
280 'type_instance={}, ds_name={}, value={}, time={})'.format(
281 self.__class__.__name__, self.host, self.plugin,
282 self.plugin_instance, self.type, self.type_instance,
283 self.ds_name, self.value, self.time)
286 class Item(yaml.YAMLObject):
287 """Base class to process tags like ArrayItem/ValueItem"""
290 def format_node(cls, mapping, metric):
292 'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
294 return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
295 elif mapping.tag == 'tag:yaml.org,2002:map':
297 for key, value in mapping.value:
298 values.append((yaml.ScalarNode(key.tag, key.value),
299 cls.format_node(value, metric)))
300 return yaml.MappingNode(mapping.tag, values)
301 elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
303 for seq in mapping.value:
305 for key, value in seq.value:
306 if key.value == 'SELECT':
307 map_values.append((yaml.ScalarNode(key.tag, key.value),
308 cls.format_node(value, metric)))
310 map_values.append((yaml.ScalarNode(key.tag, key.value),
312 values.append(yaml.MappingNode(seq.tag, map_values))
313 return yaml.SequenceNode(mapping.tag, values)
314 elif mapping.tag in [MapValue.yaml_tag]:
316 for key, value in mapping.value:
317 if key.value == 'VALUE':
318 values.append((yaml.ScalarNode(key.tag, key.value),
319 cls.format_node(value, metric)))
321 values.append((yaml.ScalarNode(key.tag, key.value), value))
322 return yaml.MappingNode(mapping.tag, values)
326 class ValueItem(Item):
327 """Class to process VlaueItem tag"""
328 yaml_tag = u'!ValueItem'
331 def from_yaml(cls, loader, node):
332 logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
333 default, select, value_desc = None, list(), None
334 # find value description
335 for elem in node.value:
336 for key, value in elem.value:
337 if key.value == 'VALUE':
338 assert value_desc is None, "VALUE key already set"
340 if key.value == 'SELECT':
341 select.append(loader.construct_mapping(value))
342 if key.value == 'DEFAULT':
343 assert default is None, "DEFAULT key already set"
344 default = loader.construct_object(value)
345 # if VALUE key isn't given, use default VALUE key
346 # format: `VALUE: !Number '{vl.value}'`
347 if value_desc is None:
348 value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
349 # select collectd metric based on SELECT condition
350 metrics = loader.collector.items(select)
351 assert len(metrics) < 2, \
352 'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
354 item = cls.format_node(value_desc, {'vl': metrics[0],
355 'system': loader.system})
356 return loader.construct_object(item)
357 # nothing has been found by SELECT condition, set to DEFAULT value.
358 assert default is not None, \
359 "No metrics selected by SELECT condition and DEFAULT key isn't set"
363 class ArrayItem(Item):
364 """Class to process ArrayItem tag"""
365 yaml_tag = u'!ArrayItem'
368 def from_yaml(cls, loader, node):
369 logging.debug('{}:process(loader={}, node={})'.format(cls.__name__,
372 # SequenceNode(tag=u'!ArrayItem', value=[
373 # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
374 # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
375 # MappingNode(tag=u'tag:yaml.org,2002:map', value=[
376 # (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
379 # ), (key, value), ... ])
381 assert isinstance(node, yaml.SequenceNode), \
382 "{} tag isn't YAML array".format(cls.__name__)
383 select, index_keys, items, item_desc = list(), list(), list(), None
384 for elem in node.value:
385 for key, value in elem.value:
386 if key.value == 'ITEM-DESC':
387 assert item_desc is None, "ITEM-DESC key already set"
389 if key.value == 'INDEX-KEY':
390 assert len(index_keys) == 0, "INDEX-KEY key already set"
391 index_keys = loader.construct_sequence(value)
392 if key.value == 'SELECT':
393 select.append(loader.construct_mapping(value))
394 # validate item description
395 assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
396 assert len(select) > 0 or len(index_keys) > 0, \
397 "Mandatory key (INDEX-KEY or SELECT) isn't set"
398 metrics = loader.collector.items(select)
399 # select metrics based on INDEX-KEY provided
400 if len(index_keys) > 0:
402 for metric in metrics:
404 for key in index_keys:
405 value_params[key] = getattr(metric, key)
406 metric_set.add(CollectdValue(**value_params))
407 metrics = list(metric_set)
408 # build items based on SELECT and/or INDEX-KEY criteria
409 for metric in metrics:
410 item = cls.format_node(item_desc,
411 {'vl': metric, 'system': loader.system,
412 'config': loader.config})
413 items.append(loader.construct_mapping(item))
417 class Measurements(ArrayItem):
418 """Class to process Measurements tag"""
419 yaml_tag = u'!Measurements'
423 """Class to process Events tag"""
424 yaml_tag = u'!Events'
427 def from_yaml(cls, loader, node):
428 condition, item_desc = dict(), None
429 for elem in node.value:
430 for key, value in elem.value:
431 if key.value == 'ITEM-DESC':
433 if key.value == 'CONDITION':
434 condition = loader.construct_mapping(value)
435 assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
436 if loader.notification.match(**condition):
437 item = cls.format_node(item_desc, {
438 'n': loader.notification, 'system': loader.system})
439 return loader.construct_mapping(item)
443 class Bytes2Kibibytes(yaml.YAMLObject):
444 """Class to process Bytes2Kibibytes tag"""
445 yaml_tag = u'!Bytes2Kibibytes'
448 def from_yaml(cls, loader, node):
449 return round(float(node.value) / 1024.0, 3)
452 class Number(yaml.YAMLObject):
453 """Class to process Number tag"""
454 yaml_tag = u'!Number'
457 def from_yaml(cls, loader, node):
459 return int(node.value)
461 return float(node.value)
464 class MapValue(yaml.YAMLObject):
465 """Class to process MapValue tag"""
466 yaml_tag = u'!MapValue'
469 def from_yaml(cls, loader, node):
470 mapping, val = None, None
471 for key, value in node.value:
472 if key.value == 'TO':
473 mapping = loader.construct_mapping(value)
474 if key.value == 'VALUE':
475 val = loader.construct_object(value)
476 assert mapping is not None, "Mandatory TO key isn't set"
477 assert val is not None, "Mandatory VALUE key isn't set"
478 assert val in mapping, \
479 'Value "{}" cannot be mapped to any of {} values'.format(
484 class Normalizer(object):
485 """Normalization class which handles events and measurements"""
490 self.collector = None
496 def read_configuration(cls, config_file):
497 """read YAML configuration file"""
498 # load YAML events/measurements definition
499 f = open(config_file, 'r')
500 doc_yaml = yaml.compose(f)
502 # split events & measurements definitions
503 measurements, events = list(), list()
504 for key, value in doc_yaml.value:
505 if value.tag == Measurements.yaml_tag:
506 measurements.append((key, value))
507 if value.tag == Events.yaml_tag:
508 events.append((key, value))
509 measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
511 measurements_stream = yaml.serialize(measurements_yaml)
512 events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
513 events_stream = yaml.serialize(events_yaml)
514 # return event & measurements definition
515 return events_stream, measurements_stream
517 def initialize(self, config_file, interval):
518 """Initialize the class"""
519 e, m = self.read_configuration(config_file)
520 self.measurements_stream = m
521 self.events_stream = e
522 self.system = System()
523 self.config = Config(interval)
524 self.interval = interval
525 # start collector with aging time = double interval
526 self.collector = Collector(interval * 2)
527 # initialize event thread
528 self.queue = queue.Queue()
529 self.event_thread = Thread(target=self.event_worker)
530 self.event_thread.daemon = True
531 self.event_thread.start()
532 # initialize measurements timer
536 """Destroy the class"""
537 self.collector.destroy()
538 self.post_event(None) # send stop event
539 self.event_thread.join()
542 def start_timer(self):
543 """Start measurements timer"""
544 self.timer = Timer(self.interval, self.on_timer)
547 def stop_timer(self):
548 """Stop measurements timer"""
552 """Measurements timer"""
554 self.process_measurements()
556 def event_worker(self):
559 event = self.queue.get()
560 if isinstance(event, CollectdNotification):
561 self.process_notify(event)
563 # exit for the worker
566 def get_collector(self):
567 """Get metric collector reference"""
568 return self.collector
570 def process_measurements(self):
571 """Process measurements"""
572 loader = Loader(self.measurements_stream)
573 setattr(loader, 'collector', self.collector)
574 setattr(loader, 'system', self.system)
575 setattr(loader, 'config', self.config)
576 measurements = loader.get_data()
577 for measurement_name in measurements:
578 logging.debug('Process "{}" measurements: {}'.format(
579 measurement_name, measurements[measurement_name]))
580 for measurement in measurements[measurement_name]:
581 self.send_data(measurement)
583 def process_notify(self, notification):
585 loader = Loader(self.events_stream)
586 setattr(loader, 'notification', notification)
587 setattr(loader, 'system', self.system)
588 notifications = loader.get_data()
589 for notify_name in notifications:
590 logging.debug('Process "{}" notification'.format(notify_name))
591 if notifications[notify_name] is not None:
592 self.send_data(notifications[notify_name])
594 def send_data(self, data):
596 assert False, 'send_data() is abstract function and MUST be overridden'
598 def post_event(self, notification):
599 """Post notification into the queue to process"""
600 self.queue.put(notification)