VES: Implement YAML configuration support 71/42771/2
author Mytnyk, Volodymyr <volodymyrx.mytnyk@intel.com>
Tue, 8 Aug 2017 17:07:15 +0000 (18:07 +0100)
committer Maryam Tahhan <maryam.tahhan@intel.com>
Tue, 3 Oct 2017 09:12:30 +0000 (09:12 +0000)
- Added YAML schema config file
- Moved host metrics into additionalMeasurements
- Moved VNF (virt) metrics into additionalFields
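- Example invocation (assumed flag names, see ves_app.py for the actual CLI):
  python ves_app.py --events-schema=host.yaml --config=ves_app_config.conf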

Signed-off-by: Mytnyk, Volodymyr <volodymyrx.mytnyk@intel.com>
Change-Id: I35e5e20750c375a61928ce3c925287f1b9cabdca

3rd_party/collectd-ves-app/ves_app/host.yaml [new file with mode: 0644]
3rd_party/collectd-ves-app/ves_app/normalizer.py [new file with mode: 0644]
3rd_party/collectd-ves-app/ves_app/ves_app.py
3rd_party/collectd-ves-app/ves_app/ves_app_config.conf

diff --git a/3rd_party/collectd-ves-app/ves_app/host.yaml b/3rd_party/collectd-ves-app/ves_app/host.yaml
new file mode 100644 (file)
index 0000000..a91574c
--- /dev/null
@@ -0,0 +1,214 @@
+---
+# Common event header definition (required fields and defaults)
+commonEventHeader: &commonEventHeader
+  domain: N/A
+  eventId: "{system.id}"
+  eventName: ""
+  eventType: Info
+  lastEpochMicrosec: 0
+  priority: Normal
+  reportingEntityId: &reportingEntityId "{system.hostname}"
+  reportingEntityName: *reportingEntityId
+  sequence: 0
+  sourceName: N/A
+  startEpochMicrosec: 0
+  version: 2.0
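+# NOTE: the definitions below merge this header via "<<: *commonEventHeader"
+# and override only the fields that differ (domain, eventType, source*, etc.)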
+
+# Value mapping (used to map collectd notification severity to VES)
+collectdSeverityMapping: &collectdSeverityMapping
+  NOTIF_FAILURE: CRITICAL
+  NOTIF_WARNING: WARNING
+  NOTIF_OKAY: NORMAL
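+# (referenced below by the !MapValue tag via "TO: *collectdSeverityMapping")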
+
+# Measurements definition
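+# The !Measurements tag (a specialized !ArrayItem) renders one ITEM-DESC event
+# per collectd metric matched by its SELECT (virt/virt_cpu_total below, i.e.
+# one event per VM); "{vl.*}" placeholders are filled from the matched value.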
+Host Measurements: !Measurements
+  - ITEM-DESC:
+      event:
+        commonEventHeader:
+          <<: *commonEventHeader
+          eventType: hostOS
+          domain: measurementsForVfScaling
+          sourceId: &sourceId "{vl.plugin_instance}"
+          sourceName: *sourceId
+          startEpochMicrosec: !Number "{vl.time}"
+        measurementsForVfScalingFields:
+          measurementsForVfScalingVersion: 2.0
+          additionalFields: !ArrayItem
+            - SELECT:
+                plugin: virt
+                plugin_instance: "{vl.plugin_instance}"
+                type: "/^(?!memory|virt_vcpu|disk_octets|disk_ops|if_packets|if_errors|if_octets|if_dropped).*$/"
+            - ITEM-DESC:
+                name: "{vl.type}-{vl.type_instance}-{vl.ds_name}"
+                value: "{vl.value}"
+          additionalMeasurements: !ArrayItem
+            - SELECT:
+                plugin: "/^(?!virt).*$/"
+            - INDEX-KEY:
+                - plugin
+                - plugin_instance
+            - ITEM-DESC:
+                name: "{vl.plugin}-{vl.plugin_instance}"
+                arrayOfFields: !ArrayItem
+                  - SELECT:
+                      plugin: "{vl.plugin}"
+                      plugin_instance: "{vl.plugin_instance}"
+                  - ITEM-DESC:
+                      name: "{vl.type}-{vl.type_instance}-{vl.ds_name}"
+                      value: "{vl.value}"
+          measurementInterval: !Number "{vl.interval}"
+          memoryUsageArray: !ArrayItem
+            - SELECT:
+                plugin: virt
+                plugin_instance: "{vl.plugin_instance}"
+                type: memory
+                type_instance: total
+            - ITEM-DESC:
+                memoryConfigured: !Bytes2Kibibytes "{vl.value}"
+                vmIdentifier: "{vl.plugin_instance}"
+                memoryUsed: 0.0
+                memoryFree: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: memory
+                      type_instance: rss
+                  - VALUE: !Bytes2Kibibytes "{vl.value}"
+                  - DEFAULT: 0
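+                # NOTE: !ValueItem resolves to the single metric matched by
+                # its SELECT; DEFAULT (here 0) is used when nothing matches.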
+          cpuUsageArray: !ArrayItem
+            - SELECT:
+                plugin: virt
+                plugin_instance: "{vl.plugin_instance}"
+                type: virt_vcpu
+            - ITEM-DESC:
+                cpuIdentifier: "{vl.type_instance}"
+                percentUsage: !Number "{vl.value}"
+          vNicPerformanceArray: !ArrayItem
+            - SELECT:
+                plugin: virt
+                plugin_instance: "{vl.plugin_instance}"
+                type: if_packets
+                ds_name: rx
+            - ITEM-DESC:
+                valuesAreSuspect: "true"
+                vNicIdentifier: "{vl.type_instance}"
+                receivedTotalPacketsAccumulated: !Number "{vl.value}"
+                transmittedTotalPacketsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_packets
+                      type_instance: "{vl.type_instance}"
+                      ds_name: tx
+                receivedOctetsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_octets
+                      type_instance: "{vl.type_instance}"
+                      ds_name: rx
+                transmittedOctetsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_octets
+                      type_instance: "{vl.type_instance}"
+                      ds_name: tx
+                receivedErrorPacketsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_errors
+                      type_instance: "{vl.type_instance}"
+                      ds_name: rx
+                transmittedErrorPacketsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_errors
+                      type_instance: "{vl.type_instance}"
+                      ds_name: tx
+                receivedDiscardedPacketsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_dropped
+                      type_instance: "{vl.type_instance}"
+                      ds_name: rx
+                transmittedDiscardedPacketsAccumulated: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: if_dropped
+                      type_instance: "{vl.type_instance}"
+                      ds_name: tx
+          diskUsageArray: !ArrayItem
+            - SELECT:
+                plugin: virt
+                plugin_instance: "{vl.plugin_instance}"
+                type: disk_octets
+                ds_name: read
+            - ITEM-DESC:
+                diskIdentifier: "{vl.type_instance}"
+                diskOctetsReadLast: !Number "{vl.value}"
+                diskOctetsWriteLast: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: disk_octets
+                      type_instance: "{vl.type_instance}"
+                      ds_name: write
+                diskOpsReadLast: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: disk_ops
+                      type_instance: "{vl.type_instance}"
+                      ds_name: read
+                diskOpsWriteLast: !ValueItem
+                  - SELECT:
+                      plugin: virt
+                      plugin_instance: "{vl.plugin_instance}"
+                      type: disk_ops
+                      type_instance: "{vl.type_instance}"
+                      ds_name: write
+  - SELECT:
+      plugin: virt
+      type_instance: virt_cpu_total
+
+Virt Events: !Events
+  - ITEM-DESC:
+      event:
+        commonEventHeader: &event_commonEventHeader
+          <<: *commonEventHeader
+          domain: fault
+          eventType: Notification
+          sourceId: &event_sourceId "{n.plugin_instance}"
+          sourceName: *event_sourceId
+          lastEpochMicrosec: !Number "{n.time}"
+          startEpochMicrosec: !Number "{n.time}"
+        faultFields: &faultFields
+          alarmInterfaceA: "{n.plugin}-{n.plugin_instance}"
+          alarmCondition: "{n.message}"
+          eventSeverity: !MapValue
+            VALUE: "{n.severity}"
+            TO: *collectdSeverityMapping
+          eventSourceType: hypervisor
+          faultFieldsVersion: 1.1
+          specificProblem: "{n.plugin_instance}-{n.type_instance}"
+          vfStatus: Active
+  - CONDITION:
+      plugin: virt
+
+Host Events: !Events
+  - ITEM-DESC:
+      event:
+        commonEventHeader:
+          <<: *event_commonEventHeader
+          sourceId: "{system.hostname}"
+          sourceName: "{system.hostname}"
+        faultFields:
+          <<: *faultFields
+          eventSourceType: host
+  - CONDITION:
+      plugin: "/^(?!virt).*$/"
diff --git a/3rd_party/collectd-ves-app/ves_app/normalizer.py b/3rd_party/collectd-ves-app/ves_app/normalizer.py
new file mode 100644 (file)
index 0000000..899c850
--- /dev/null
@@ -0,0 +1,598 @@
+#
+# Copyright(c) 2017 Intel Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Authors:
+#   Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
+#
+
+import yaml
+import logging
+import datetime
+import time
+from threading import RLock
+from threading import Timer
+from threading import Thread
+import re
+
+# import YAML loader
+try:
+    from yaml import CLoader as Loader
+except ImportError:
+    from yaml import Loader
+
+# import synchronized queue
+try:
+    # python 2.x
+    import Queue as queue
+except ImportError:
+    # python 3.x
+    import queue
+
+
+class Config(object):
+    """Configuration class used to pass config option into YAML file"""
+
+    def __init__(self, interval):
+        self.interval = interval
+
+
+class System(object):
+    """System class which provides information like host, time etc., into YAML
+    file"""
+
+    def __init__(self):
+        self.hostname = 'localhost'
+        self._id = 0
+
+    @property
+    def id(self):
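+        # NOTE: each access increments the counter; the YAML schema relies on
+        # this via eventId: "{system.id}" to generate unique event ids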
+        self._id = self._id + 1
+        return self._id
+
+    @property
+    def time(self):
+        return time.time()
+
+    @property
+    def date(self):
+        return datetime.date.today().isoformat()
+
+
+class ItemIterator(object):
+    """Item iterator returned by Collector class"""
+
+    def __init__(self, collector, items):
+        """Item iterator init"""
+        logging.debug('{}:__init__()'.format(self.__class__.__name__))
+        self._items = items
+        self._collector = collector
+        self._index = 0
+
+    def next(self):
+        """Returns next item from the list"""
+        if self._index == len(self._items):
+            raise StopIteration
+        curr_index = self._index
+        self._index = curr_index + 1
+        return self._items[curr_index]
+
+    def __getitem__(self, key):
+        """get item by index"""
+        return self._items[key]
+
+    def __len__(self):
+        """Return length of elements"""
+        return len(self._items)
+
+    def __del__(self):
+        """Destroy iterator and unlock the collector"""
+        logging.debug('{}:__del__()'.format(self.__class__.__name__))
+        self._collector.unlock()
+
+
+class ItemObject(object):
+    """Item object returned by Collector class"""
+
+    def __init__(self, collector, hash_):
+        """Item object init"""
+        logging.debug('{}:__init__()'.format(self.__class__.__name__))
+        super(ItemObject, self).__setattr__('_collector', collector)
+        super(ItemObject, self).__setattr__('_hash', hash_)
+
+    def __setattr__(self, name, value):
+        t, item = self._collector._metrics[self._hash]
+        logging.debug('{}:__setattr__(name={}, value={})'.format(
+                      self.__class__.__name__, name, value))
+        setattr(item, name, value)
+        self._collector._metrics[self._hash] = (time.time(), item)
+
+    def __del__(self):
+        """Destroy item object and unlock the collector"""
+        logging.debug('{}:__del__()'.format(self.__class__.__name__))
+        self._collector.unlock()
+
+
+class Collector(object):
+    """Thread-safe collector with aging feature"""
+
+    def __init__(self, age_timeout):
+        """Initialization"""
+        self._metrics = {}
+        self._lock = RLock()
+        self._age_timeout = age_timeout
+        self._start_age_timer()
+
+    def _start_age_timer(self):
+        """Start age timer"""
+        self._age_timer = Timer(self._age_timeout, self._on_timer)
+        self._age_timer.start()
+
+    def _stop_age_timer(self):
+        """Stop age timer"""
+        self._age_timer.cancel()
+
+    def _on_timer(self):
+        """Age timer"""
+        self._start_age_timer()
+        self._check_aging()
+
+    def _check_aging(self):
+        """Check aging time for all items"""
+        self.lock()
+        # iterate over a snapshot: items may be popped during iteration
+        for data_hash, data in list(self._metrics.items()):
+            age, item = data
+            if ((time.time() - age) >= self._age_timeout):
+                # aging time has expired, remove the item from the collector
+                logging.debug('{}:_check_aging():value={}'.format(
+                              self.__class__.__name__, item))
+                self._metrics.pop(data_hash)
+                del(item)
+        self.unlock()
+
+    def lock(self):
+        """Lock the collector"""
+        logging.debug('{}:lock()'.format(self.__class__.__name__))
+        self._lock.acquire()
+
+    def unlock(self):
+        """Unlock the collector"""
+        logging.debug('{}:unlock()'.format(self.__class__.__name__))
+        self._lock.release()
+
+    def get(self, hash_):
+        """Return an ItemObject for the given hash, or None if not found"""
+        self.lock()
+        if hash_ in self._metrics:
+            return ItemObject(self, hash_)
+        self.unlock()
+        return None
+
+    def add(self, item):
+        """Add an item into the collector"""
+        self.lock()
+        logging.debug('{}:add(item={})'.format(self.__class__.__name__, item))
+        self._metrics[hash(item)] = (time.time(), item)
+        self.unlock()
+
+    def items(self, select_list=None):
+        """Returns locked (safe) item iterator"""
+        select_list = select_list or []
+        metrics = []
+        self.lock()
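+        # NOTE: the collector remains locked until the returned ItemIterator
+        # is garbage-collected (see ItemIterator.__del__)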
+        for k, item in self._metrics.items():
+            _, value = item
+            for select in select_list:
+                if value.match(**select):
+                    metrics.append(value)
+        return ItemIterator(self, metrics)
+
+    def destroy(self):
+        """Destroy the collector"""
+        self._stop_age_timer()
+
+
+class CollectdData(object):
+    """Base class for Collectd data"""
+
+    def __init__(self, host=None, plugin=None, plugin_instance=None,
+                 type_=None, type_instance=None, time_=None):
+        """Class initialization"""
+        self.host = host
+        self.plugin = plugin
+        self.plugin_instance = plugin_instance
+        self.type_instance = type_instance
+        self.type = type_
+        self.time = time_
+
+    @classmethod
+    def is_regular_expression(cls, expr):
+        return expr[0] == '/' and expr[-1] == '/'
+
+    def match(self, **kargs):
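+        # e.g. value.match(plugin='virt', type='/^(?!memory).*$/') mirrors a
+        # SELECT block from the YAML schema: plain values must compare equal,
+        # '/.../'-delimited values are treated as regular expressions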
+        # compare the metric
+        for key, value in kargs.items():
+            if self.is_regular_expression(value):
+                if re.match(value[1:-1], getattr(self, key)) is None:
+                    return False
+            elif value != getattr(self, key):
+                return False
+        # return a match even if kargs is empty
+        return True
+
+
+class CollectdNotification(CollectdData):
+    """Collectd notification"""
+
+    def __init__(self, host=None, plugin=None, plugin_instance=None,
+                 type_=None, type_instance=None, severity=None, message=None):
+        super(CollectdNotification, self).__init__(
+            host, plugin, plugin_instance, type_, type_instance)
+        self.severity = severity
+        self.message = message
+
+    def __repr__(self):
+        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
+               'type_instance={}, severity={}, message={}, time={})'.format(
+                   self.__class__.__name__, self.host, self.plugin,
+                   self.plugin_instance, self.type, self.type_instance,
+                   self.severity, self.message, self.time)
+
+
+class CollectdValue(CollectdData):
+    """Collectd value"""
+
+    def __init__(self, host=None, plugin=None, plugin_instance=None,
+                 type_=None, type_instance=None, ds_name='value', value=None,
+                 interval=None):
+        super(CollectdValue, self).__init__(
+            host, plugin, plugin_instance, type_, type_instance)
+        self.value = value
+        self.ds_name = ds_name
+        self.interval = interval
+
+    @classmethod
+    def hash_gen(cls, host, plugin, plugin_instance, type_,
+                 type_instance, ds_name):
+        return hash((host, plugin, plugin_instance, type_,
+                    type_instance, ds_name))
+
+    def __eq__(self, other):
+        return hash(self) == hash(other) and self.value == other.value
+
+    def __hash__(self):
+        return self.hash_gen(self.host, self.plugin, self.plugin_instance,
+                             self.type, self.type_instance, self.ds_name)
+
+    def __repr__(self):
+        return '{}(host={}, plugin={}, plugin_instance={}, type={}, ' \
+               'type_instance={}, ds_name={}, value={}, time={})'.format(
+                   self.__class__.__name__, self.host, self.plugin,
+                   self.plugin_instance, self.type, self.type_instance,
+                   self.ds_name, self.value, self.time)
+
+
+class Item(yaml.YAMLObject):
+    """Base class to process tags like ArrayItem/ValueItem"""
+
+    @classmethod
+    def format_node(cls, mapping, metric):
+        if mapping.tag in [
+                'tag:yaml.org,2002:str', Bytes2Kibibytes.yaml_tag,
+                Number.yaml_tag]:
+            return yaml.ScalarNode(mapping.tag, mapping.value.format(**metric))
+        elif mapping.tag == 'tag:yaml.org,2002:map':
+            values = []
+            for key, value in mapping.value:
+                values.append((yaml.ScalarNode(key.tag, key.value),
+                              cls.format_node(value, metric)))
+            return yaml.MappingNode(mapping.tag, values)
+        elif mapping.tag in [ArrayItem.yaml_tag, ValueItem.yaml_tag]:
+            values = []
+            for seq in mapping.value:
+                map_values = list()
+                for key, value in seq.value:
+                    if key.value == 'SELECT':
+                        map_values.append((yaml.ScalarNode(key.tag, key.value),
+                                          cls.format_node(value, metric)))
+                    else:
+                        map_values.append((yaml.ScalarNode(key.tag, key.value),
+                                          value))
+                values.append(yaml.MappingNode(seq.tag, map_values))
+            return yaml.SequenceNode(mapping.tag, values)
+        elif mapping.tag in [MapValue.yaml_tag]:
+            values = []
+            for key, value in mapping.value:
+                if key.value == 'VALUE':
+                    values.append((yaml.ScalarNode(key.tag, key.value),
+                                  cls.format_node(value, metric)))
+                else:
+                    values.append((yaml.ScalarNode(key.tag, key.value), value))
+            return yaml.MappingNode(mapping.tag, values)
+        return mapping
+
+
+class ValueItem(Item):
+    """Class to process VlaueItem tag"""
+    yaml_tag = u'!ValueItem'
+
+    @classmethod
+    def from_yaml(cls, loader, node):
+        logging.debug('{}:from_yaml(loader={})'.format(cls.__name__, loader))
+        default, select, value_desc = None, list(), None
+        # find value description
+        for elem in node.value:
+            for key, value in elem.value:
+                if key.value == 'VALUE':
+                    assert value_desc is None, "VALUE key already set"
+                    value_desc = value
+                if key.value == 'SELECT':
+                    select.append(loader.construct_mapping(value))
+                if key.value == 'DEFAULT':
+                    assert default is None, "DEFAULT key already set"
+                    default = loader.construct_object(value)
+        # if VALUE key isn't given, use default VALUE key
+        # format: `VALUE: !Number '{vl.value}'`
+        if value_desc is None:
+            value_desc = yaml.ScalarNode(tag=u'!Number', value=u'{vl.value}')
+        # select collectd metric based on SELECT condition
+        metrics = loader.collector.items(select)
+        assert len(metrics) < 2, \
+            'Wrong SELECT condition, selected {} metrics'.format(len(metrics))
+        if len(metrics) > 0:
+            item = cls.format_node(value_desc, {'vl': metrics[0],
+                                   'system': loader.system})
+            return loader.construct_object(item)
+        # nothing has been found by SELECT condition, set to DEFAULT value.
+        assert default is not None, \
+            "No metrics selected by SELECT condition and DEFAULT key isn't set"
+        return default
+
+
+class ArrayItem(Item):
+    """Class to process ArrayItem tag"""
+    yaml_tag = u'!ArrayItem'
+
+    @classmethod
+    def from_yaml(cls, loader, node):
+        logging.debug('{}:from_yaml(loader={}, node={})'.format(cls.__name__,
+                      loader, node))
+        # e.g.:
+        # SequenceNode(tag=u'!ArrayItem', value=[
+        #   MappingNode(tag=u'tag:yaml.org,2002:map', value=[
+        #     (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'SELECT'),
+        #       MappingNode(tag=u'tag:yaml.org,2002:map', value=[
+        #         (ScalarNode(tag=u'tag:yaml.org,2002:str', value=u'plugin'),
+        #           , ...)
+        #       ]), ...
+        #     ), (key, value), ... ])
+        #   , ... ])
+        assert isinstance(node, yaml.SequenceNode), \
+            "{} tag isn't YAML array".format(cls.__name__)
+        select, index_keys, items, item_desc = list(), list(), list(), None
+        for elem in node.value:
+            for key, value in elem.value:
+                if key.value == 'ITEM-DESC':
+                    assert item_desc is None, "ITEM-DESC key already set"
+                    item_desc = value
+                if key.value == 'INDEX-KEY':
+                    assert len(index_keys) == 0, "INDEX-KEY key already set"
+                    index_keys = loader.construct_sequence(value)
+                if key.value == 'SELECT':
+                    select.append(loader.construct_mapping(value))
+        # validate item description
+        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
+        assert len(select) > 0 or len(index_keys) > 0, \
+            "Mandatory key (INDEX-KEY or SELECT) isn't set"
+        metrics = loader.collector.items(select)
+        # select metrics based on INDEX-KEY provided
+        if len(index_keys) > 0:
+            metric_set = set()
+            for metric in metrics:
+                value_params = {}
+                for key in index_keys:
+                    value_params[key] = getattr(metric, key)
+                metric_set.add(CollectdValue(**value_params))
+            metrics = list(metric_set)
+        # build items based on SELECT and/or INDEX-KEY criteria
+        for metric in metrics:
+            item = cls.format_node(item_desc,
+                                   {'vl': metric, 'system': loader.system,
+                                       'config': loader.config})
+            items.append(loader.construct_mapping(item))
+        return items
+
+
+class Measurements(ArrayItem):
+    """Class to process Measurements tag"""
+    yaml_tag = u'!Measurements'
+
+
+class Events(Item):
+    """Class to process Events tag"""
+    yaml_tag = u'!Events'
+
+    @classmethod
+    def from_yaml(cls, loader, node):
+        condition, item_desc = dict(), None
+        for elem in node.value:
+            for key, value in elem.value:
+                if key.value == 'ITEM-DESC':
+                    item_desc = value
+                if key.value == 'CONDITION':
+                    condition = loader.construct_mapping(value)
+        assert item_desc is not None, "Mandatory ITEM-DESC key isn't set"
+        if loader.notification.match(**condition):
+            item = cls.format_node(item_desc, {
+                'n': loader.notification, 'system': loader.system})
+            return loader.construct_mapping(item)
+        return None
+
+
+class Bytes2Kibibytes(yaml.YAMLObject):
+    """Class to process Bytes2Kibibytes tag"""
+    yaml_tag = u'!Bytes2Kibibytes'
+
+    @classmethod
+    def from_yaml(cls, loader, node):
+        return round(float(node.value) / 1024.0, 3)
+
+
+class Number(yaml.YAMLObject):
+    """Class to process Number tag"""
+    yaml_tag = u'!Number'
+
+    @classmethod
+    def from_yaml(cls, loader, node):
+        try:
+            return int(node.value)
+        except ValueError:
+            return float(node.value)
+
+
+class MapValue(yaml.YAMLObject):
+    """Class to process MapValue tag"""
+    yaml_tag = u'!MapValue'
+
+    @classmethod
+    def from_yaml(cls, loader, node):
+        mapping, val = None, None
+        for key, value in node.value:
+            if key.value == 'TO':
+                mapping = loader.construct_mapping(value)
+            if key.value == 'VALUE':
+                val = loader.construct_object(value)
+        assert mapping is not None, "Mandatory TO key isn't set"
+        assert val is not None, "Mandatory VALUE key isn't set"
+        assert val in mapping, \
+            'Value "{}" cannot be mapped to any of {} values'.format(
+                val, mapping.keys())
+        return mapping[val]
+
+
+class Normalizer(object):
+    """Normalization class which handles events and measurements"""
+
+    def __init__(self):
+        """Init"""
+        self.interval = None
+        self.collector = None
+        self.system = None
+        self.queue = None
+        self.timer = None
+
+    @classmethod
+    def read_configuration(cls, config_file):
+        """read YAML configuration file"""
+        # load YAML events/measurements definition
+        with open(config_file, 'r') as f:
+            doc_yaml = yaml.compose(f)
+        # split events & measurements definitions
+        measurements, events = list(), list()
+        for key, value in doc_yaml.value:
+            if value.tag == Measurements.yaml_tag:
+                measurements.append((key, value))
+            if value.tag == Events.yaml_tag:
+                events.append((key, value))
+        measurements_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map',
+                                             measurements)
+        measurements_stream = yaml.serialize(measurements_yaml)
+        events_yaml = yaml.MappingNode(u'tag:yaml.org,2002:map', events)
+        events_stream = yaml.serialize(events_yaml)
+        # return event & measurements definition
+        return events_stream, measurements_stream
+
+    def initialize(self, config_file, interval):
+        """Initialize the class"""
+        e, m = self.read_configuration(config_file)
+        self.measurements_stream = m
+        self.events_stream = e
+        self.system = System()
+        self.config = Config(interval)
+        self.interval = interval
+        # start collector with aging time = double interval
+        self.collector = Collector(interval * 2)
+        # initialize event thread
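+        # (runs as a daemon thread so it cannot block interpreter shutdown;
+        # destroy() stops it cleanly by posting a None sentinel event)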
+        self.queue = queue.Queue()
+        self.event_thread = Thread(target=self.event_worker)
+        self.event_thread.daemon = True
+        self.event_thread.start()
+        # initialize measurements timer
+        self.start_timer()
+
+    def destroy(self):
+        """Destroy the class"""
+        self.collector.destroy()
+        self.post_event(None)  # send stop event
+        self.event_thread.join()
+        self.stop_timer()
+
+    def start_timer(self):
+        """Start measurements timer"""
+        self.timer = Timer(self.interval, self.on_timer)
+        self.timer.start()
+
+    def stop_timer(self):
+        """Stop measurements timer"""
+        self.timer.cancel()
+
+    def on_timer(self):
+        """Measurements timer"""
+        self.start_timer()
+        self.process_measurements()
+
+    def event_worker(self):
+        """Event worker"""
+        while True:
+            event = self.queue.get()
+            if isinstance(event, CollectdNotification):
+                self.process_notify(event)
+                continue
+            # any other item (e.g. the None sentinel) stops the worker
+            break
+
+    def get_collector(self):
+        """Get metric collector reference"""
+        return self.collector
+
+    def process_measurements(self):
+        """Process measurements"""
+        loader = Loader(self.measurements_stream)
+        setattr(loader, 'collector', self.collector)
+        setattr(loader, 'system', self.system)
+        setattr(loader, 'config', self.config)
+        measurements = loader.get_data()
+        for measurement_name in measurements:
+            logging.debug('Process "{}" measurements: {}'.format(
+                measurement_name, measurements[measurement_name]))
+            for measurement in measurements[measurement_name]:
+                self.send_data(measurement)
+
+    def process_notify(self, notification):
+        """Process events"""
+        loader = Loader(self.events_stream)
+        setattr(loader, 'notification', notification)
+        setattr(loader, 'system', self.system)
+        notifications = loader.get_data()
+        for notify_name in notifications:
+            logging.debug('Process "{}" notification'.format(notify_name))
+            if notifications[notify_name] is not None:
+                self.send_data(notifications[notify_name])
+
+    def send_data(self, data):
+        """Send data"""
+        assert False, 'send_data() is an abstract method, MUST be overridden'
+
+    def post_event(self, notification):
+        """Post notification into the queue to process"""
+        self.queue.put(notification)
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app.py b/3rd_party/collectd-ves-app/ves_app/ves_app.py
index 49e7635..105c66e 100644 (file)
@@ -22,522 +22,23 @@ import argparse
 from distutils.util import strtobool
 from kafka import KafkaConsumer
 
+from normalizer import Normalizer
+from normalizer import CollectdValue
+
 try:
     # For Python 3.0 and later
     import urllib.request as url
 except ImportError:
     # Fall back to Python 2's urllib2
     import urllib2 as url
-import socket
-import time
-from threading import Timer
-from threading import Lock
-
-
-class Event(object):
-    """Event header"""
-
-    def __init__(self):
-        """Construct the common header"""
-        self.version = 2.0
-        self.event_type = "Info"  # use "Info" unless a notification is generated
-        self.domain = ""
-        self.event_id = ""
-        self.source_id = ""
-        self.source_name = ""
-        self.functional_role = ""
-        self.reporting_entity_id = ""
-        self.reporting_entity_name = ""
-        self.priority = "Normal"  # will be derived from event if there is one
-        self.start_epoch_microsec = 0
-        self.last_epoch_micro_sec = 0
-        self.sequence = 0
-        self.event_name = ""
-        self.internal_header_fields = {}
-        self.nfc_naming_code = ""
-        self.nf_naming_code = ""
-
-    def get_json(self):
-        """Get the object of the datatype"""
-        obj = {}
-        obj['version'] = self.version
-        obj['eventType'] = self.event_type
-        obj['domain'] = self.domain
-        obj['eventId'] = self.event_id
-        obj['sourceId'] = self.source_id
-        obj['sourceName'] = self.source_name
-        obj['reportingEntityId'] = self.reporting_entity_id
-        obj['reportingEntityName'] = self.reporting_entity_name
-        obj['priority'] = self.priority
-        obj['startEpochMicrosec'] = self.start_epoch_microsec
-        obj['lastEpochMicrosec'] = self.last_epoch_micro_sec
-        obj['sequence'] = self.sequence
-        obj['eventName'] = self.event_name
-        obj['internalHeaderFields'] = self.internal_header_fields
-        obj['nfcNamingCode'] = self.nfc_naming_code
-        obj['nfNamingCode'] = self.nf_naming_code
-        return json.dumps({
-            'event': {
-                'commonEventHeader': obj,
-                self.get_name(): self.get_obj()
-            }
-        }).encode()
-
-    def get_name():
-        assert False, 'abstract method get_name() is not implemented'
-
-    def get_obj():
-        assert False, 'abstract method get_obj() is not implemented'
-
-
-class Field(object):
-    """field datatype"""
-
-    def __init__(self, name, value):
-        self.name = name
-        self.value = value
-
-    def get_obj(self):
-        return {
-            'name': self.name,
-            'value': self.value
-        }
-
-
-class NamedArrayOfFields(object):
-    """namedArrayOfFields datatype"""
-
-    def __init__(self, name):
-        self.name = name
-        self.array_of_fields = []
-
-    def add(self, field):
-        self.array_of_fields.append(field.get_obj())
-
-    def get_obj(self):
-        return {
-            'name': self.name,
-            'arrayOfFields': self.array_of_fields
-        }
-
-
-class VESDataType(object):
-    """ Base VES datatype """
-
-    def set_optional(self, obj, key, val):
-        if val is not None:
-            obj[key] = val
-
-
-class DiskUsage(VESDataType):
-    """diskUsage datatype"""
 
-    def __init__(self, identifier):
-        self.disk_identifier = identifier
-        self.disk_io_time_avg = None
-        self.disk_io_time_last = None
-        self.disk_io_time_max = None
-        self.disk_io_time_min = None
-        self.disk_merged_read_avg = None
-        self.disk_merged_read_last = None
-        self.disk_merged_read_max = None
-        self.disk_merged_read_min = None
-        self.disk_merged_write_avg = None
-        self.disk_merged_write_last = None
-        self.disk_merged_write_max = None
-        self.disk_merged_write_min = None
-        self.disk_octets_read_avg = None
-        self.disk_octets_read_last = None
-        self.disk_octets_read_max = None
-        self.disk_octets_read_min = None
-        self.disk_octets_write_avg = None
-        self.disk_octets_write_last = None
-        self.disk_octets_write_max = None
-        self.disk_octets_write_min = None
-        self.disk_ops_read_avg = None
-        self.disk_ops_read_last = None
-        self.disk_ops_read_max = None
-        self.disk_ops_read_min = None
-        self.disk_ops_write_avg = None
-        self.disk_ops_write_last = None
-        self.disk_ops_write_max = None
-        self.disk_ops_write_min = None
-        self.disk_pending_operations_avg = None
-        self.disk_pending_operations_last = None
-        self.disk_pending_operations_max = None
-        self.disk_pending_operations_min = None
-        self.disk_time_read_avg = None
-        self.disk_time_read_last = None
-        self.disk_time_read_max = None
-        self.disk_time_read_min = None
-        self.disk_time_write_avg = None
-        self.disk_time_write_last = None
-        self.disk_time_write_max = None
-        self.disk_time_write_min = None
-
-    def get_obj(self):
-        obj = {
-            # required
-            'diskIdentifier': self.disk_identifier
-        }
-        self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg)
-        self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last)
-        self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max)
-        self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min)
-        self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg)
-        self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last)
-        self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max)
-        self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min)
-        self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg)
-        self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last)
-        self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max)
-        self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min)
-        self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg)
-        self.set_optional(obj, 'diskOctetsReadLast', self.disk_octets_read_last)
-        self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max)
-        self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min)
-        self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg)
-        self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last)
-        self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max)
-        self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min)
-        self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg)
-        self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last)
-        self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max)
-        self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min)
-        self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg)
-        self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last)
-        self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max)
-        self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min)
-        self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg)
-        self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last)
-        self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max)
-        self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min)
-        self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg)
-        self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last)
-        self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max)
-        self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min)
-        self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg)
-        self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last)
-        self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max)
-        self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min)
-        return obj
-
-
-class VNicPerformance(VESDataType):
-    """vNicPerformance datatype"""
-
-    def __init__(self, identifier):
-        self.received_broadcast_packets_accumulated = None
-        self.received_broadcast_packets_delta = None
-        self.received_discarded_packets_accumulated = None
-        self.received_discarded_packets_delta = None
-        self.received_error_packets_accumulated = None
-        self.received_error_packets_delta = None
-        self.received_multicast_packets_accumulated = None
-        self.received_multicast_packets_delta = None
-        self.received_octets_accumulated = None
-        self.received_octets_delta = None
-        self.received_total_packets_accumulated = None
-        self.received_total_packets_delta = None
-        self.received_unicast_packets_accumulated = None
-        self.received_unicast_packets_delta = None
-        self.transmitted_broadcast_packets_accumulated = None
-        self.transmitted_broadcast_packets_delta = None
-        self.transmitted_discarded_packets_accumulated = None
-        self.transmitted_discarded_packets_delta = None
-        self.transmitted_error_packets_accumulated = None
-        self.transmitted_error_packets_delta = None
-        self.transmitted_multicast_packets_accumulated = None
-        self.transmitted_multicast_packets_delta = None
-        self.transmitted_octets_accumulated = None
-        self.transmitted_octets_delta = None
-        self.transmitted_total_packets_accumulated = None
-        self.transmitted_total_packets_delta = None
-        self.transmitted_unicast_packets_accumulated = None
-        self.transmitted_unicast_packets_delta = None
-        self.values_are_suspect = 'true'
-        self.v_nic_identifier = identifier
-
-    def get_obj(self):
-        obj = {
-            # required
-            'valuesAreSuspect': self.values_are_suspect,
-            'vNicIdentifier': self.v_nic_identifier
-        }
-        # optional
-        self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated)
-        self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta)
-        self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated)
-        self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta)
-        self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated)
-        self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta)
-        self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated)
-        self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta)
-        self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated)
-        self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta)
-        self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated)
-        self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta)
-        self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated)
-        self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta)
-        self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated)
-        self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta)
-        self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated)
-        self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta)
-        self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated)
-        self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta)
-        self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated)
-        self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta)
-        self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated)
-        self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta)
-        self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated)
-        self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta)
-        self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated)
-        self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta)
-        return obj
-
-
-class CpuUsage(VESDataType):
-    """cpuUsage datatype"""
-
-    def __init__(self, identifier):
-        self.cpu_identifier = identifier
-        self.cpu_idle = None
-        self.cpu_usage_interrupt = None
-        self.cpu_usage_nice = None
-        self.cpu_usage_soft_irq = None
-        self.cpu_usage_steal = None
-        self.cpu_usage_system = None
-        self.cpu_usage_user = None
-        self.cpu_wait = None
-        self.percent_usage = 0
-
-    def get_obj(self):
-        obj = {
-            # required
-            'cpuIdentifier': self.cpu_identifier,
-            'percentUsage': self.percent_usage
-        }
-        # optional
-        self.set_optional(obj, 'cpuIdle', self.cpu_idle)
-        self.set_optional(obj, 'cpuUsageInterrupt', self.cpu_usage_interrupt)
-        self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice)
-        self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq)
-        self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal)
-        self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system)
-        self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user)
-        self.set_optional(obj, 'cpuWait', self.cpu_wait)
-        return obj
-
-
-class MemoryUsage(VESDataType):
-    """memoryUsage datatype"""
-
-    def __init__(self, identifier):
-        self.memory_buffered = None
-        self.memory_cached = None
-        self.memory_configured = None
-        self.memory_free = None
-        self.memory_slab_recl = None
-        self.memory_slab_unrecl = None
-        self.memory_used = None
-        self.vm_identifier = identifier
-
-    def __str__(self):
-        """ for debug purposes """
-        return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \
-               'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \
-               'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \
-               'used : {used}\n'.format(buffered=self.memory_buffered,
-                                        cached=self.memory_cached, configured=self.memory_configured,
-                                        free=self.memory_free, slab_recl=self.memory_slab_recl,
-                                        slab_unrecl=self.memory_slab_unrecl, used=self.memory_used,
-                                        vm_identifier=self.vm_identifier)
-
-    def get_memory_free(self):
-        if self.memory_free is None:
-            # calculate the free memory
-            if None not in (self.memory_configured, self.memory_used):
-                return self.memory_configured - self.memory_used
-            else:
-                # required field, so return zero
-                return 0
-        else:
-            return self.memory_free
-
-    def get_memory_used(self):
-        if self.memory_used is None:
-            # calculate the memory used
-            if None not in (self.memory_configured, self.memory_free, self.memory_buffered,
-                            self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
-                return self.memory_configured - (self.memory_free +
-                                                 self.memory_buffered + self.memory_cached +
-                                                 self.memory_slab_recl + self.memory_slab_unrecl)
-            else:
-                # required field, so return zero
-                return 0
-        else:
-            return self.memory_used
-
-    def get_memory_total(self):
-        if self.memory_configured is None:
-            # calculate the total memory
-            if None not in (self.memory_used, self.memory_free, self.memory_buffered,
-                            self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
-                return (self.memory_used + self.memory_free +
-                        self.memory_buffered + self.memory_cached +
-                        self.memory_slab_recl + self.memory_slab_unrecl)
-            else:
-                return None
-        else:
-            return self.memory_configured
-
-    def get_obj(self):
-        obj = {
-            # required fields
-            'memoryFree': self.get_memory_free(),
-            'memoryUsed': self.get_memory_used(),
-            'vmIdentifier': self.vm_identifier
-        }
-        # optional fields
-        self.set_optional(obj, 'memoryBuffered', self.memory_buffered)
-        self.set_optional(obj, 'memoryCached', self.memory_cached)
-        self.set_optional(obj, 'memoryConfigured', self.memory_configured)
-        self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl)
-        self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl)
-        return obj
 
-
-class MeasurementsForVfScaling(Event):
-    """MeasurementsForVfScaling datatype"""
-
-    def __init__(self, event_id):
-        """Construct the header"""
-        super(MeasurementsForVfScaling, self).__init__()
-        # common attributes
-        self.domain = "measurementsForVfScaling"
-        self.event_type = 'hostOS'
-        self.event_id = event_id
-        # measurement attributes
-        self.additional_measurements = []
-        self.codec_usage_array = []
-        self.concurrent_sessions = 0
-        self.configured_entities = 0
-        self.cpu_usage_array = []
-        self.feature_usage_array = []
-        self.filesystem_usage_array = []
-        self.latency_distribution = []
-        self.mean_request_latency = 0
-        self.measurement_interval = 0
-        self.number_of_media_ports_in_use = 0
-        self.request_rate = 0
-        self.vnfc_scaling_metric = 0
-        self.additional_fields = []
-        self.additional_objects = []
-        self.disk_usage_array = []
-        self.measurements_for_vf_scaling_version = 2.0
-        self.memory_usage_array = []
-        self.v_nic_performance_array = []
-
-    def add_additional_measurement(self, named_array):
-        self.additional_measurements.append(named_array.get_obj())
-
-    def add_additional_fields(self, field):
-        self.additional_fields.append(field.get_obj())
-
-    def add_memory_usage(self, mem_usage):
-        self.memory_usage_array.append(mem_usage.get_obj())
-
-    def add_cpu_usage(self, cpu_usage):
-        self.cpu_usage_array.append(cpu_usage.get_obj())
-
-    def add_v_nic_performance(self, nic_performance):
-        self.v_nic_performance_array.append(nic_performance.get_obj())
-
-    def add_disk_usage(self, disk_usage):
-        self.disk_usage_array.append(disk_usage.get_obj())
-
-    def get_obj(self):
-        """Get the object of the datatype"""
-        obj = {}
-        obj['additionalMeasurements'] = self.additional_measurements
-        obj['codecUsageArray'] = self.codec_usage_array
-        obj['concurrentSessions'] = self.concurrent_sessions
-        obj['configuredEntities'] = self.configured_entities
-        obj['cpuUsageArray'] = self.cpu_usage_array
-        obj['featureUsageArray'] = self.feature_usage_array
-        obj['filesystemUsageArray'] = self.filesystem_usage_array
-        obj['latencyDistribution'] = self.latency_distribution
-        obj['meanRequestLatency'] = self.mean_request_latency
-        obj['measurementInterval'] = self.measurement_interval
-        obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use
-        obj['requestRate'] = self.request_rate
-        obj['vnfcScalingMetric'] = self.vnfc_scaling_metric
-        obj['additionalFields'] = self.additional_fields
-        obj['additionalObjects'] = self.additional_objects
-        obj['diskUsageArray'] = self.disk_usage_array
-        obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version
-        obj['memoryUsageArray'] = self.memory_usage_array
-        obj['vNicPerformanceArray'] = self.v_nic_performance_array
-        return obj
-
-    def get_name(self):
-        """Name of datatype"""
-        return "measurementsForVfScalingFields"
-
-
-class Fault(Event):
-    """Fault datatype"""
-
-    def __init__(self, event_id):
-        """Construct the header"""
-        super(Fault, self).__init__()
-        # common attributes
-        self.domain = "fault"
-        self.event_id = event_id
-        self.event_type = "Fault"
-        # fault attributes
-        self.fault_fields_version = 1.1
-        self.event_severity = 'NORMAL'
-        self.event_source_type = 'other(0)'
-        self.alarm_condition = ''
-        self.specific_problem = ''
-        self.vf_status = 'Active'
-        self.alarm_interface_a = ''
-        self.alarm_additional_information = []
-        self.event_category = ""
-
-    def get_name(self):
-        """Name of datatype"""
-        return 'faultFields'
-
-    def get_obj(self):
-        """Get the object of the datatype"""
-        obj = {}
-        obj['faultFieldsVersion'] = self.fault_fields_version
-        obj['eventSeverity'] = self.event_severity
-        obj['eventSourceType'] = self.event_source_type
-        obj['alarmCondition'] = self.alarm_condition
-        obj['specificProblem'] = self.specific_problem
-        obj['vfStatus'] = self.vf_status
-        obj['alarmInterfaceA'] = self.alarm_interface_a
-        obj['alarmAdditionalInformation'] = self.alarm_additional_information
-        obj['eventCategory'] = self.event_category
-        return obj
-
-
-class VESApp(object):
+class VESApp(Normalizer):
     """VES Application"""
 
     def __init__(self):
         """Application initialization"""
-        self.__plugin_data_cache = {
-            'cpu': {'interval': 0.0, 'vls': []},
-            'virt': {'interval': 0.0, 'vls': []},
-            'disk': {'interval': 0.0, 'vls': []},
-            'interface': {'interval': 0.0, 'vls': []},
-            'memory': {'interval': 0.0, 'vls': []}
-        }
-        self.__app_config = {
+        self._app_config = {
             'Domain': '127.0.0.1',
             'Port': 30000,
             'Path': '',
@@ -546,270 +47,56 @@ class VESApp(object):
             'Topic': '',
             'UseHttps': False,
             'SendEventInterval': 20.0,
-            'FunctionalRole': 'Collectd VES Agent',
             'ApiVersion': 5.1,
             'KafkaPort': 9092,
             'KafkaBroker': 'localhost'
         }
-        self.__host_name = None
-        self.__ves_timer = None
-        self.__lock = Lock()
-        self.__event_id = 0
-
-    def get_event_id(self):
-        """get event id"""
-        self.__event_id += 1
-        return str(self.__event_id)
-
-    def lock(self):
-        """Lock the application"""
-        self.__lock.acquire()
-
-    def unlock(self):
-        """Unlock the application"""
-        self.__lock.release()
-
-    def start_timer(self):
-        """Start event timer"""
-        self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time)
-        self.__ves_timer.start()
-
-    def stop_timer(self):
-        """Stop event timer"""
-        self.__ves_timer.cancel()
-
-    def __on_time(self):
-        """Timer thread"""
-        self.event_timer()
-        self.start_timer()
 
-    def event_send(self, event):
+    def send_data(self, event):
         """Send event to VES"""
         server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
-            's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'],
-            int(self.__app_config['Port']), '{}'.format(
-                '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''),
-            int(self.__app_config['ApiVersion']), '{}'.format(
-                '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else ''))
+            's' if self._app_config['UseHttps'] else '',
+            self._app_config['Domain'], int(self._app_config['Port']),
+            '{}'.format('/{}'.format(self._app_config['Path']) if len(
+                self._app_config['Path']) > 0 else ''),
+            int(self._app_config['ApiVersion']), '{}'.format(
+                '/{}'.format(self._app_config['Topic']) if len(
+                    self._app_config['Topic']) > 0 else ''))
         logging.info('Vendor Event Listener is at: {}'.format(server_url))
         credentials = base64.b64encode('{}:{}'.format(
-            self.__app_config['Username'], self.__app_config['Password']).encode()).decode()
+            self._app_config['Username'],
+            self._app_config['Password']).encode()).decode()
         logging.info('Authentication credentials are: {}'.format(credentials))
         try:
             request = url.Request(server_url)
             request.add_header('Authorization', 'Basic {}'.format(credentials))
             request.add_header('Content-Type', 'application/json')
-            logging.debug("Sending {} to {}".format(event.get_json(), server_url))
-            vel = url.urlopen(request, event.get_json(), timeout=1)
+            event_str = json.dumps(event).encode()
+            logging.debug("Sending {} to {}".format(event_str, server_url))
+            url.urlopen(request, event_str, timeout=1)
             logging.debug("Sent data to {} successfully".format(server_url))
         except url.HTTPError as e:
             logging.error('Vendor Event Listener exception: {}'.format(e))
         except url.URLError as e:
-            logging.error('Vendor Event Listener is is not reachable: {}'.format(e))
+            logging.error(
+                'Vendor Event Listener is not reachable: {}'.format(e))
         except Exception as e:
             logging.error('Vendor Event Listener error: {}'.format(e))
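
    With the defaults above, send_data() posts to
    http://127.0.0.1:30000/eventListener/v5: Path and Topic are empty, and
    int() truncates ApiVersion 5.1 to v5. A standalone sketch of the same
    expression, slightly simplified:

        app_config = {'UseHttps': False, 'Domain': '127.0.0.1', 'Port': 30000,
                      'Path': '', 'ApiVersion': 5.1, 'Topic': ''}
        server_url = 'http{}://{}:{}{}/eventListener/v{}{}'.format(
            's' if app_config['UseHttps'] else '',
            app_config['Domain'], int(app_config['Port']),
            '/{}'.format(app_config['Path']) if app_config['Path'] else '',
            int(app_config['ApiVersion']),
            '/{}'.format(app_config['Topic']) if app_config['Topic'] else '')
        assert server_url == 'http://127.0.0.1:30000/eventListener/v5'
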
 
-    def bytes_to_kb(self, bytes):
-        """Convert bytes to kibibytes"""
-        return round((bytes / 1024.0), 3)
-
-    def get_hostname(self):
-        return socket.gethostname()
-
-    def send_host_measurements(self):
-        # get list of all VMs
-        virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
-                                               mark_as_read=False)
-        vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
-        for vm_name in vm_names:
-            # make sure that 'virt' plugin cache is up-to-date
-            vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
-                                             mark_as_read=False)
-            us_up_to_date = True
-            for vm_value in vm_values:
-                if vm_value['updated'] == False:
-                    logging.warning("%s" % vm_value)
-                    us_up_to_date = False
-                    break
-            if not us_up_to_date:
-                # one of the cache value is not up-to-date, break
-                logging.warning("virt collectd cache values are not up-to-date for {}".format(vm_name))
-                continue
-            # values are up-to-date, create an event message
-            measurement = MeasurementsForVfScaling(self.get_event_id())
-            measurement.functional_role = self.__app_config['FunctionalRole']
-            # fill out reporting_entity
-            measurement.reporting_entity_id = self.get_hostname()
-            measurement.reporting_entity_name = measurement.reporting_entity_id
-            # set source as a host value
-            measurement.source_id = vm_name
-            measurement.source_name = measurement.source_id
-            # fill out EpochMicrosec (convert to us)
-            measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
-            # plugin interval
-            measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
-            # memoryUsage
-            mem_usage = MemoryUsage(vm_name)
-            memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                type_name='memory', type_instance='total')
-            memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                 type_name='memory', type_instance='unused')
-            memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                              type_name='memory', type_instance='rss')
-            if len(memory_total) > 0:
-                mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0])
-            if len(memory_unused) > 0:
-                mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0])
-            elif len(memory_rss) > 0:
-                mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0])
-            # since, "used" metric is not provided by virt plugn, set the rest of the memory stats
-            # to zero to calculate used based on provided stats only
-            mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \
-                mem_usage.memory_slab_unrecl = 0
-            measurement.add_memory_usage(mem_usage)
-            # cpuUsage
-            virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
-                                              plugin_name='virt', type_name='virt_vcpu')
-            for virt_vcpu in virt_vcpus:
-                cpu_usage = CpuUsage(virt_vcpu['type_instance'])
-                cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu)
-                measurement.add_cpu_usage(cpu_usage)
-            # vNicPerformance
-            if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                              type_name='if_packets', mark_as_read=False)
-            if_names = [x['type_instance'] for x in if_packets]
-            for if_name in if_names:
-                if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                  type_name='if_packets', type_instance=if_name)
-                if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                 type_name='if_octets', type_instance=if_name)
-                if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                 type_name='if_errors', type_instance=if_name)
-                if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                  type_name='if_dropped', type_instance=if_name)
-                v_nic_performance = VNicPerformance(if_name)
-                v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0]
-                v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1]
-                v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0]
-                v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1]
-                v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0]
-                v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1]
-                v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0]
-                v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1]
-                measurement.add_v_nic_performance(v_nic_performance)
-            # diskUsage
-            disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                               type_name='disk_octets', mark_as_read=False)
-            disk_names = [x['type_instance'] for x in disk_octets]
-            for disk_name in disk_names:
-                disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                   type_name='disk_octets', type_instance=disk_name)
-                disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                type_name='disk_ops', type_instance=disk_name)
-                disk_usage = DiskUsage(disk_name)
-                disk_usage.disk_octets_read_last = disk_octets[0]['values'][0]
-                disk_usage.disk_octets_write_last = disk_octets[0]['values'][1]
-                disk_usage.disk_ops_read_last = disk_ops[0]['values'][0]
-                disk_usage.disk_ops_write_last = disk_ops[0]['values'][1]
-                measurement.add_disk_usage(disk_usage)
-            # add additional measurements (perf)
-            perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                               type_name='perf')
-            named_array = NamedArrayOfFields('perf')
-            for perf in perf_values:
-                named_array.add(Field(perf['type_instance'], str(perf['values'][0])))
-            measurement.add_additional_measurement(named_array)
-            # add host values as additional measurements
-            self.set_additional_fields(measurement, exclude_plugins=['virt'])
-            # send event to the VES
-            self.event_send(measurement)
-        if len(vm_names) > 0:
-            # mark the additional measurements metrics as read
-            self.mark_cache_values_as_read(exclude_plugins=['virt'])
-
-    def event_timer(self):
-        """Event timer thread"""
-        self.lock()
-        try:
-            self.send_host_measurements()
-        finally:
-            self.unlock()
-
-    def mark_cache_values_as_read(self, exclude_plugins=None):
-        """mark the cache values as read"""
-        for plugin_name in self.__plugin_data_cache.keys():
-            if (exclude_plugins != None and plugin_name in exclude_plugins):
-                # skip excluded plugins
-                continue;
-            for val in self.__plugin_data_cache[plugin_name]['vls']:
-                val['updated'] = False
-
-    def set_additional_measurements(self, measurement, exclude_plugins=None):
-        """Set addition measurement filed with host/guets values"""
-        # add host/guest values as additional measurements
-        for plugin_name in self.__plugin_data_cache.keys():
-            if (exclude_plugins != None and plugin_name in exclude_plugins):
-                # skip excluded plugins
-                continue;
-            for val in self.__plugin_data_cache[plugin_name]['vls']:
-                if val['updated']:
-                    array_name = self.make_dash_string(plugin_name, val['plugin_instance'],
-                                                       val['type_instance'])
-                    named_array = NamedArrayOfFields(array_name)
-
-                    for index in range(len(val['dsnames'])):
-                        mname = '{}-{}'.format(val['type'], val['dsnames'][index])
-                        named_array.add(Field(mname, str(val['values'][index])))
-                    measurement.add_additional_measurement(named_array);
-                    val['updated'] = False
-
-    def set_additional_fields(self, measurement, exclude_plugins=None):
-        # set host values as additional fields
-        for plugin_name in self.__plugin_data_cache.keys():
-            if (exclude_plugins != None and plugin_name in exclude_plugins):
-                # skip excluded plugins
-                continue;
-            for val in self.__plugin_data_cache[plugin_name]['vls']:
-                if val['updated']:
-                    name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'],
-                                                        val['type_instance'])
-
-                    for index in range(len(val['dsnames'])):
-                        field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index])
-                        measurement.add_additional_fields(Field(field_name, str(val['values'][index])))
-
-    def cpu_ns_to_percentage(self, vl):
-        """Convert CPU usage ns to CPU %"""
-        total = vl['values'][0]
-        total_time = vl['time']
-        pre_total = vl['pre_values'][0]
-        pre_total_time = vl['pre_time']
-        if (total_time - pre_total_time) == 0:
-            # return zero usage if time diff is zero
-            return 0.0
-        percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0)
-        logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
-            pre_total_time, pre_total, total_time, total, round(percent, 2)))
-        return round(percent, 2)
-
-    def make_dash_string(self, *args):
-        """Join non empty strings with dash symbol"""
-        return '-'.join(filter(lambda x: len(x) > 0, args))
-
     def config(self, config):
         """VES option configuration"""
         for key, value in config.items('config'):
-            if key in self.__app_config:
+            if key in self._app_config:
                 try:
-                    if type(self.__app_config[key]) == int:
+                    if type(self._app_config[key]) == int:
                         value = int(value)
-                    elif type(self.__app_config[key]) == float:
+                    elif type(self._app_config[key]) == float:
                         value = float(value)
-                    elif type(self.__app_config[key]) == bool:
-                        value = bool(distutils.util.strtobool(value))
+                    elif type(self._app_config[key]) == bool:
+                        value = bool(strtobool(value))
 
-                    if isinstance(value, type(self.__app_config[key])):
-                        self.__app_config[key] = value
+                    if isinstance(value, type(self._app_config[key])):
+                        self._app_config[key] = value
                     else:
                         logging.error("Type mismatch with %s" % key)
                         sys.exit()
@@ -820,125 +107,107 @@ class VESApp(object):
                 logging.error("Incorrect key configuration %s" % key)
                 sys.exit()
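
    config() accepts a value only after coercing the string read by
    ConfigParser to the type of the matching default: 'Port' becomes int,
    'SendEventInterval' float, and 'UseHttps' bool via strtobool. A minimal
    sketch of that coercion rule:

        from distutils.util import strtobool

        defaults = {'Port': 30000, 'SendEventInterval': 20.0, 'UseHttps': False}
        raw = {'Port': '30000', 'SendEventInterval': '20', 'UseHttps': 'false'}
        for key, value in raw.items():
            if type(defaults[key]) == int:
                value = int(value)
            elif type(defaults[key]) == float:
                value = float(value)
            elif type(defaults[key]) == bool:
                value = bool(strtobool(value))
            # accepted only if the coerced type matches the default's type
            assert isinstance(value, type(defaults[key]))
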
 
-    def init(self):
-        """Initialisation"""
-        # start the VES timer
-        self.start_timer()
-
-    def update_cache_value(self, kafka_data):
-        """Update value internal collectd cache values or create new one
-         Please note, the cache should be locked before using this function"""
-        found = False
-        if kafka_data['plugin'] not in self.__plugin_data_cache:
-            self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []}
-        plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls']
-
-        for index in range(len(plugin_vl)):
-            # record found, so just update time the values
-            if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \
-                    (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \
-                    (plugin_vl[index]['type'] == kafka_data['type']):
-                plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
-                plugin_vl[index]['time'] = kafka_data['time']
-                plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
-                plugin_vl[index]['values'] = kafka_data['values']
-                plugin_vl[index]['dsnames'] = kafka_data['dsnames']
-                plugin_vl[index]['updated'] = True
-                found = True
-                break
-        if not found:
-            value = {}
-            # create new cache record
-            value['plugin_instance'] = kafka_data['plugin_instance']
-            value['type_instance'] = kafka_data['type_instance']
-            value['values'] = kafka_data['values']
-            value['pre_values'] = kafka_data['values']
-            value['type'] = kafka_data['type']
-            value['time'] = kafka_data['time']
-            value['pre_time'] = kafka_data['time']
-            value['host'] = kafka_data['host']
-            value['dsnames'] = kafka_data['dsnames']
-            value['updated'] = True
-            self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value)
-            # update plugin interval based on one received in the value
-            self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval']
-
-    def cache_get_value(self, plugin_name=None, plugin_instance=None,
-                        type_name=None, type_instance=None, type_names=None, mark_as_read=True):
-        """Get cache value by given criteria"""
-        ret_list = []
-        if plugin_name in self.__plugin_data_cache:
-            for val in self.__plugin_data_cache[plugin_name]['vls']:
-                if (type_name == None or type_name == val['type']) and \
-                        (plugin_instance == None or plugin_instance == val['plugin_instance']) and \
-                        (type_instance == None or type_instance == val['type_instance']) and \
-                        (type_names == None or val['type'] in type_names):
-                    if mark_as_read:
-                        val['updated'] = False
-                    ret_list.append(val)
-        return ret_list
-
-    def shutdown(self):
-        """Shutdown method for clean up"""
-        self.stop_timer()
+    def init(self, configfile, schema_file):
+        if configfile is not None:
+            # read VES configuration file if provided
+            config = ConfigParser.ConfigParser()
+            config.optionxform = lambda option: option
+            config.read(configfile)
+            self.config(config)
+        # initialize normalizer
+        self.initialize(schema_file, self._app_config['SendEventInterval'])
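
    The optionxform override is what makes the lookup work: ConfigParser
    lowercases option names by default, which would never match the CamelCase
    keys of _app_config. A small sketch of the difference (Python 2, as used
    by this app):

        import ConfigParser
        from StringIO import StringIO

        ini = '[config]\nKafkaPort = 9092\n'

        cp = ConfigParser.ConfigParser()
        cp.readfp(StringIO(ini))
        print(cp.items('config'))    # [('kafkaport', '9092')] - key mangled

        cp = ConfigParser.ConfigParser()
        cp.optionxform = lambda option: option
        cp.readfp(StringIO(ini))
        print(cp.items('config'))    # [('KafkaPort', '9092')] - key preserved
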
 
     def run(self):
         """Consumer JSON data from kafka broker"""
-        kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort'))
-        consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest',
-                                 enable_auto_commit=False,
-                                 value_deserializer=lambda m: json.loads(m.decode('ascii')))
+        kafka_server = '{}:{}'.format(
+            self._app_config.get('KafkaBroker'),
+            self._app_config.get('KafkaPort'))
+        consumer = KafkaConsumer(
+            'collectd', bootstrap_servers=kafka_server,
+            auto_offset_reset='latest', enable_auto_commit=False,
+            value_deserializer=lambda m: json.loads(m.decode('ascii')))
 
         for message in consumer:
             for kafka_data in message.value:
-                self.lock()
-                try:
-                    # Receive Kafka data and update cache values
-                    self.update_cache_value(kafka_data)
-
-                finally:
-                    self.unlock()
+                # {
+                #   u'dstypes': [u'derive'],
+                #   u'plugin': u'cpu',
+                #   u'dsnames': [u'value'],
+                #   u'interval': 10.0,
+                #   u'host': u'localhost',
+                #   u'values': [99.9978996416267],
+                #   u'time': 1502114956.244,
+                #   u'plugin_instance': u'44',
+                #   u'type_instance': u'idle',
+                #   u'type': u'cpu'
+                # }
+                logging.debug('{}:run():data={}'.format(
+                    self.__class__.__name__, kafka_data))
+                for index, ds_name in enumerate(kafka_data['dsnames']):
+                    val_hash = CollectdValue.hash_gen(
+                        kafka_data['host'], kafka_data['plugin'],
+                        kafka_data['plugin_instance'], kafka_data['type'],
+                        kafka_data['type_instance'], ds_name)
+                    collector = self.get_collector()
+                    val = collector.get(val_hash)
+                    if val:
+                        # update the value
+                        val.value = kafka_data['values'][index]
+                        val.time = kafka_data['time']
+                        del val
+                    else:
+                        # add new value into the collector
+                        val = CollectdValue()
+                        val.host = kafka_data['host']
+                        val.plugin = kafka_data['plugin']
+                        val.plugin_instance = kafka_data['plugin_instance']
+                        val.type = kafka_data['type']
+                        val.type_instance = kafka_data['type_instance']
+                        val.value = kafka_data['values'][index]
+                        val.interval = kafka_data['interval']
+                        val.time = kafka_data['time']
+                        val.ds_name = ds_name
+                        collector.add(val)
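
    In effect, every collectd record fans out into one CollectdValue per entry
    in dsnames, each keyed by a hash over the full collectd identifier plus the
    data-source name (CollectdValue and hash_gen are defined in the new
    normalizer.py; their contract here is assumed from the call sites above).
    For an illustrative two-source type such as if_octets:

        # {'plugin': 'virt', 'plugin_instance': 'vm1', 'type': 'if_octets',
        #  'type_instance': 'vnet0', 'dsnames': ['rx', 'tx'],
        #  'values': [1024, 2048], ...}
        # fans out into two collector entries keyed by:
        #   hash_gen(host, 'virt', 'vm1', 'if_octets', 'vnet0', 'rx')
        #   hash_gen(host, 'virt', 'vm1', 'if_octets', 'vnet0', 'tx')
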
 
 
 def main():
     # Parsing cmdline options
     parser = argparse.ArgumentParser()
-    parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE")
-    parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING',
-                        help="Specify log level (default: %(default)s)", metavar="LEVEL")
+    parser.add_argument("--events-schema", dest="schema", required=True,
+                        help="YAML events schema definition", metavar="FILE")
+    parser.add_argument("--config", dest="configfile", default=None,
+                        help="Specify config file", metavar="FILE")
+    parser.add_argument("--loglevel", dest="level", default='INFO',
+                        choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'],
+                        help="Specify log level (default: %(default)s)",
+                        metavar="LEVEL")
     parser.add_argument("--logfile", dest="logfile", default='ves_app.log',
-                        help="Specify log file (default: %(default)s)", metavar="FILE")
+                        help="Specify log file (default: %(default)s)",
+                        metavar="FILE")
     args = parser.parse_args()
 
     # Create log file
-    logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level)
+    logging.basicConfig(filename=args.logfile,
+                        format='%(asctime)s %(message)s',
+                        level=args.level)
+    if args.configfile is None:
+        logging.warning("No configfile specified, using default options")
 
     # Create Application Instance
     application_instance = VESApp()
+    application_instance.init(args.configfile, args.schema)
 
-    # Read from config file
-    if args.configfile:
-        config = ConfigParser.ConfigParser()
-        config.optionxform = lambda option: option
-        config.read(args.configfile)
-        # Write Config Values
-        application_instance.config(config)
-    else:
-        logging.warning("No configfile specified, using default options")
-
-    # Start timer for interval
-    application_instance.init()
-
-    # Read Data from Kakfa
     try:
-        # Kafka consumer & update cache
+        # Run the plugin
         application_instance.run()
     except KeyboardInterrupt:
         logging.info(" - Ctrl-C handled, exiting gracefully")
-        application_instance.shutdown()
+    except Exception as e:
+        logging.error('{}, {}'.format(type(e), e))
+    finally:
+        application_instance.destroy()
         sys.exit()
-    except:
-        logging.error("Unknown Error")
 
 
 if __name__ == '__main__':
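
    With the new required --events-schema option, one way to launch the app
    against the YAML definition added by this change (paths assumed relative
    to ves_app/):

        $ python ves_app.py --events-schema host.yaml \
                            --config ves_app_config.conf \
                            --loglevel DEBUG --logfile ves_app.log
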
diff --git a/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
index 1dccd49..aee0816 100644 (file)
--- a/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
+++ b/3rd_party/collectd-ves-app/ves_app/ves_app_config.conf
@@ -6,8 +6,7 @@ Topic = example_vnf
 UseHttps = false
 Username =
 Password =
-FunctionalRole = Collectd VES Agent
 SendEventInterval = 20
 ApiVersion = 3
 KafkaPort = 9092
-KafkaBroker = localhost
\ No newline at end of file
+KafkaBroker = localhost