VES plugin: Remove collectd modules, now a stand alone process 03/39603/1
authoracarey <alan.carey@intel.com>
Wed, 16 Aug 2017 14:02:55 +0000 (15:02 +0100)
committeracarey <alan.carey@intel.com>
Fri, 18 Aug 2017 15:29:50 +0000 (16:29 +0100)
-All collectd modules removed
-Config file used to set configuration
-Logging added
-Kafka consumer added
-Plugin references in code changed to app

Change-Id: I4fed4660d18af22927bc16ff61096d28789c41d1
Signed-off-by: Alan Carey <alan.carey@intel.com>
3rd_party/collectd-ves-plugin/ves_plugin/__init__.py
3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf [new file with mode: 0644]

index 1656d43..14364a3 100644 (file)
@@ -1,21 +1,12 @@
-# MIT License
 #
-# Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
 #
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the "Software"),
-# to deal in the Software without restriction, including without limitation
-# the rights to use, copy, modify, merge, publish, distribute, sublicense,
-# and/or sell copies of the Software, and to permit persons to whom the
-# Software is furnished to do so, subject to the following conditions:
+#    http://www.apache.org/licenses/LICENSE-2.0
 #
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-# DEALINGS IN THE SOFTWARE.
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
index ee27bcd..4c313ce 100644 (file)
@@ -1,29 +1,27 @@
-# MIT License
+#!/usr/bin/env python
 #
-# Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
 #
-# Permission is hereby granted, free of charge, to any person obtaining a
-# copy of this software and associated documentation files (the "Software"),
-# to deal in the Software without restriction, including without limitation
-# the rights to use, copy, modify, merge, publish, distribute, sublicense,
-# and/or sell copies of the Software, and to permit persons to whom the
-# Software is furnished to do so, subject to the following conditions:
+#     http://www.apache.org/licenses/LICENSE-2.0
 #
-# The above copyright notice and this permission notice shall be included in
-# all copies or substantial portions of the Software.
-#
-# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
-# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
-# DEALINGS IN THE SOFTWARE.
-
-import collectd
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 import json
 import sys
 import base64
+import ConfigParser
+import logging
+import argparse
+
+from distutils.util import strtobool
+from kafka import KafkaConsumer
+
 try:
     # For Python 3.0 and later
     import urllib.request as url
@@ -35,13 +33,14 @@ import time
 from threading import Timer
 from threading import Lock
 
+
 class Event(object):
     """Event header"""
 
     def __init__(self):
         """Construct the common header"""
         self.version = 2.0
-        self.event_type = "Info" # use "Info" unless a notification is generated
+        self.event_type = "Info"  # use "Info" unless a notification is generated
         self.domain = ""
         self.event_id = ""
         self.source_id = ""
@@ -49,7 +48,7 @@ class Event(object):
         self.functional_role = ""
         self.reporting_entity_id = ""
         self.reporting_entity_name = ""
-        self.priority = "Normal" # will be derived from event if there is one
+        self.priority = "Normal"  # will be derived from event if there is one
         self.start_epoch_microsec = 0
         self.last_epoch_micro_sec = 0
         self.sequence = 0
@@ -78,9 +77,9 @@ class Event(object):
         obj['nfcNamingCode'] = self.nfc_naming_code
         obj['nfNamingCode'] = self.nf_naming_code
         return json.dumps({
-            'event' : {
-                'commonEventHeader' : obj,
-                self.get_name() : self.get_obj()
+            'event': {
+                'commonEventHeader': obj,
+                self.get_name(): self.get_obj()
             }
         }).encode()
 
@@ -90,6 +89,7 @@ class Event(object):
     def get_obj():
         assert False, 'abstract method get_obj() is not implemented'
 
+
 class Field(object):
     """field datatype"""
 
@@ -99,10 +99,11 @@ class Field(object):
 
     def get_obj(self):
         return {
-            'name' : self.name,
-            'value' : self.value
+            'name': self.name,
+            'value': self.value
         }
 
+
 class NamedArrayOfFields(object):
     """namedArrayOfFields datatype"""
 
@@ -115,10 +116,11 @@ class NamedArrayOfFields(object):
 
     def get_obj(self):
         return {
-            'name' : self.name,
-            'arrayOfFields' : self.array_of_fields
+            'name': self.name,
+            'arrayOfFields': self.array_of_fields
         }
 
+
 class VESDataType(object):
     """ Base VES datatype """
 
@@ -126,56 +128,57 @@ class VESDataType(object):
         if val is not None:
             obj[key] = val
 
+
 class DiskUsage(VESDataType):
     """diskUsage datatype"""
 
     def __init__(self, identifier):
-      self.disk_identifier = identifier
-      self.disk_io_time_avg = None
-      self.disk_io_time_last = None
-      self.disk_io_time_max = None
-      self.disk_io_time_min = None
-      self.disk_merged_read_avg = None
-      self.disk_merged_read_last = None
-      self.disk_merged_read_max = None
-      self.disk_merged_read_min = None
-      self.disk_merged_write_avg = None
-      self.disk_merged_write_last = None
-      self.disk_merged_write_max = None
-      self.disk_merged_write_min = None
-      self.disk_octets_read_avg = None
-      self.disk_octets_read_last = None
-      self.disk_octets_read_max = None
-      self.disk_octets_read_min = None
-      self.disk_octets_write_avg = None
-      self.disk_octets_write_last = None
-      self.disk_octets_write_max = None
-      self.disk_octets_write_min = None
-      self.disk_ops_read_avg = None
-      self.disk_ops_read_last = None
-      self.disk_ops_read_max = None
-      self.disk_ops_read_min = None
-      self.disk_ops_write_avg = None
-      self.disk_ops_write_last = None
-      self.disk_ops_write_max = None
-      self.disk_ops_write_min = None
-      self.disk_pending_operations_avg = None
-      self.disk_pending_operations_last = None
-      self.disk_pending_operations_max = None
-      self.disk_pending_operations_min = None
-      self.disk_time_read_avg = None
-      self.disk_time_read_last = None
-      self.disk_time_read_max = None
-      self.disk_time_read_min = None
-      self.disk_time_write_avg = None
-      self.disk_time_write_last = None
-      self.disk_time_write_max = None
-      self.disk_time_write_min = None
+        self.disk_identifier = identifier
+        self.disk_io_time_avg = None
+        self.disk_io_time_last = None
+        self.disk_io_time_max = None
+        self.disk_io_time_min = None
+        self.disk_merged_read_avg = None
+        self.disk_merged_read_last = None
+        self.disk_merged_read_max = None
+        self.disk_merged_read_min = None
+        self.disk_merged_write_avg = None
+        self.disk_merged_write_last = None
+        self.disk_merged_write_max = None
+        self.disk_merged_write_min = None
+        self.disk_octets_read_avg = None
+        self.disk_octets_read_last = None
+        self.disk_octets_read_max = None
+        self.disk_octets_read_min = None
+        self.disk_octets_write_avg = None
+        self.disk_octets_write_last = None
+        self.disk_octets_write_max = None
+        self.disk_octets_write_min = None
+        self.disk_ops_read_avg = None
+        self.disk_ops_read_last = None
+        self.disk_ops_read_max = None
+        self.disk_ops_read_min = None
+        self.disk_ops_write_avg = None
+        self.disk_ops_write_last = None
+        self.disk_ops_write_max = None
+        self.disk_ops_write_min = None
+        self.disk_pending_operations_avg = None
+        self.disk_pending_operations_last = None
+        self.disk_pending_operations_max = None
+        self.disk_pending_operations_min = None
+        self.disk_time_read_avg = None
+        self.disk_time_read_last = None
+        self.disk_time_read_max = None
+        self.disk_time_read_min = None
+        self.disk_time_write_avg = None
+        self.disk_time_write_last = None
+        self.disk_time_write_max = None
+        self.disk_time_write_min = None
 
     def get_obj(self):
         obj = {
             # required
-            'diskIdentifier' : self.disk_identifier
+            'diskIdentifier': self.disk_identifier
         }
         self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg)
         self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last)
@@ -219,46 +222,47 @@ class DiskUsage(VESDataType):
         self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min)
         return obj
 
+
 class VNicPerformance(VESDataType):
     """vNicPerformance datatype"""
 
     def __init__(self, identifier):
-      self.received_broadcast_packets_accumulated = None
-      self.received_broadcast_packets_delta = None
-      self.received_discarded_packets_accumulated = None
-      self.received_discarded_packets_delta = None
-      self.received_error_packets_accumulated = None
-      self.received_error_packets_delta = None
-      self.received_multicast_packets_accumulated = None
-      self.received_multicast_packets_delta = None
-      self.received_octets_accumulated = None
-      self.received_octets_delta = None
-      self.received_total_packets_accumulated = None
-      self.received_total_packets_delta = None
-      self.received_unicast_packets_accumulated = None
-      self.received_unicast_packets_delta = None
-      self.transmitted_broadcast_packets_accumulated = None
-      self.transmitted_broadcast_packets_delta = None
-      self.transmitted_discarded_packets_accumulated = None
-      self.transmitted_discarded_packets_delta = None
-      self.transmitted_error_packets_accumulated = None
-      self.transmitted_error_packets_delta = None
-      self.transmitted_multicast_packets_accumulated = None
-      self.transmitted_multicast_packets_delta = None
-      self.transmitted_octets_accumulated = None
-      self.transmitted_octets_delta = None
-      self.transmitted_total_packets_accumulated = None
-      self.transmitted_total_packets_delta = None
-      self.transmitted_unicast_packets_accumulated = None
-      self.transmitted_unicast_packets_delta = None
-      self.values_are_suspect = 'true'
-      self.v_nic_identifier = identifier
+        self.received_broadcast_packets_accumulated = None
+        self.received_broadcast_packets_delta = None
+        self.received_discarded_packets_accumulated = None
+        self.received_discarded_packets_delta = None
+        self.received_error_packets_accumulated = None
+        self.received_error_packets_delta = None
+        self.received_multicast_packets_accumulated = None
+        self.received_multicast_packets_delta = None
+        self.received_octets_accumulated = None
+        self.received_octets_delta = None
+        self.received_total_packets_accumulated = None
+        self.received_total_packets_delta = None
+        self.received_unicast_packets_accumulated = None
+        self.received_unicast_packets_delta = None
+        self.transmitted_broadcast_packets_accumulated = None
+        self.transmitted_broadcast_packets_delta = None
+        self.transmitted_discarded_packets_accumulated = None
+        self.transmitted_discarded_packets_delta = None
+        self.transmitted_error_packets_accumulated = None
+        self.transmitted_error_packets_delta = None
+        self.transmitted_multicast_packets_accumulated = None
+        self.transmitted_multicast_packets_delta = None
+        self.transmitted_octets_accumulated = None
+        self.transmitted_octets_delta = None
+        self.transmitted_total_packets_accumulated = None
+        self.transmitted_total_packets_delta = None
+        self.transmitted_unicast_packets_accumulated = None
+        self.transmitted_unicast_packets_delta = None
+        self.values_are_suspect = 'true'
+        self.v_nic_identifier = identifier
 
     def get_obj(self):
         obj = {
             # required
-            'valuesAreSuspect' : self.values_are_suspect,
-            'vNicIdentifier' : self.v_nic_identifier
+            'valuesAreSuspect': self.values_are_suspect,
+            'vNicIdentifier': self.v_nic_identifier
         }
         # optional
         self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated)
@@ -291,6 +295,7 @@ class VNicPerformance(VESDataType):
         self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta)
         return obj
 
+
 class CpuUsage(VESDataType):
     """cpuUsage datatype"""
 
@@ -309,8 +314,8 @@ class CpuUsage(VESDataType):
     def get_obj(self):
         obj = {
             # required
-            'cpuIdentifier' : self.cpu_identifier,
-            'percentUsage' : self.percent_usage
+            'cpuIdentifier': self.cpu_identifier,
+            'percentUsage': self.percent_usage
         }
         # optional
         self.set_optional(obj, 'cpuIdle', self.cpu_idle)
@@ -323,6 +328,7 @@ class CpuUsage(VESDataType):
         self.set_optional(obj, 'cpuWait', self.cpu_wait)
         return obj
 
+
 class MemoryUsage(VESDataType):
     """memoryUsage datatype"""
 
@@ -338,14 +344,14 @@ class MemoryUsage(VESDataType):
 
     def __str__(self):
         """ for debug purposes """
-        return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n'\
-            'cached : {cached}\nconfigured : {configured}\nfree : {free}\n'\
-            'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n'\
-            'used : {used}\n'.format(buffered = self.memory_buffered,
-            cached = self.memory_cached, configured = self.memory_configured,
-            free = self.memory_free, slab_recl = self.memory_slab_recl,
-            slab_unrecl = self.memory_slab_unrecl, used = self.memory_used,
-            vm_identifier = self.vm_identifier)
+        return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n' \
+               'cached : {cached}\nconfigured : {configured}\nfree : {free}\n' \
+               'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n' \
+               'used : {used}\n'.format(buffered=self.memory_buffered,
+                                        cached=self.memory_cached, configured=self.memory_configured,
+                                        free=self.memory_free, slab_recl=self.memory_slab_recl,
+                                        slab_unrecl=self.memory_slab_unrecl, used=self.memory_used,
+                                        vm_identifier=self.vm_identifier)
 
     def get_memory_free(self):
         if self.memory_free is None:
@@ -362,10 +368,10 @@ class MemoryUsage(VESDataType):
         if self.memory_used is None:
             # calculate the memory used
             if None not in (self.memory_configured, self.memory_free, self.memory_buffered,
-                self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
+                            self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
                 return self.memory_configured - (self.memory_free +
-                    self.memory_buffered + self.memory_cached +
-                    self.memory_slab_recl + self.memory_slab_unrecl)
+                                                 self.memory_buffered + self.memory_cached +
+                                                 self.memory_slab_recl + self.memory_slab_unrecl)
             else:
                 # required field, so return zero
                 return 0
@@ -376,10 +382,10 @@ class MemoryUsage(VESDataType):
         if self.memory_configured is None:
             # calculate the total memory
             if None not in (self.memory_used, self.memory_free, self.memory_buffered,
-                self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
+                            self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
                 return (self.memory_used + self.memory_free +
-                    self.memory_buffered + self.memory_cached +
-                    self.memory_slab_recl + self.memory_slab_unrecl)
+                        self.memory_buffered + self.memory_cached +
+                        self.memory_slab_recl + self.memory_slab_unrecl)
             else:
                 return None
         else:
@@ -388,9 +394,9 @@ class MemoryUsage(VESDataType):
     def get_obj(self):
         obj = {
             # required fields
-            'memoryFree' : self.get_memory_free(),
-            'memoryUsed' : self.get_memory_used(),
-            'vmIdentifier' : self.vm_identifier
+            'memoryFree': self.get_memory_free(),
+            'memoryUsed': self.get_memory_used(),
+            'vmIdentifier': self.vm_identifier
         }
         # optional fields
         self.set_optional(obj, 'memoryBuffered', self.memory_buffered)
@@ -400,6 +406,7 @@ class MemoryUsage(VESDataType):
         self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl)
         return obj
 
+
 class MeasurementsForVfScaling(Event):
     """MeasurementsForVfScaling datatype"""
 
@@ -477,6 +484,7 @@ class MeasurementsForVfScaling(Event):
         """Name of datatype"""
         return "measurementsForVfScalingFields"
 
+
 class Fault(Event):
     """Fault datatype"""
 
@@ -516,29 +524,32 @@ class Fault(Event):
         obj['eventCategory'] = self.event_category
         return obj
 
-class VESPlugin(object):
-    """VES plugin with collectd callbacks"""
+
+class VESApp(object):
+    """VES Application"""
 
     def __init__(self):
-        """Plugin initialization"""
+        """Application initialization"""
         self.__plugin_data_cache = {
-            'cpu' : {'interval' : 0.0, 'vls' : []},
-            'virt' : {'interval' : 0.0, 'vls' : []},
-            'disk' : {'interval' : 0.0, 'vls' : []},
-            'interface' : {'interval' : 0.0, 'vls' : []},
-            'memory' : {'interval' : 0.0, 'vls' : []}
+            'cpu': {'interval': 0.0, 'vls': []},
+            'virt': {'interval': 0.0, 'vls': []},
+            'disk': {'interval': 0.0, 'vls': []},
+            'interface': {'interval': 0.0, 'vls': []},
+            'memory': {'interval': 0.0, 'vls': []}
         }
-        self.__plugin_config = {
-            'Domain' : '127.0.0.1',
-            'Port' : 30000.0,
-            'Path' : '',
-            'Username' : '',
-            'Password' : '',
-            'Topic' : '',
-            'UseHttps' : False,
-            'SendEventInterval' : 20.0,
-            'FunctionalRole' : 'Collectd VES Agent',
-            'ApiVersion' : 5.1
+        self.__app_config = {
+            'Domain': '127.0.0.1',
+            'Port': 30000,
+            'Path': '',
+            'Username': '',
+            'Password': '',
+            'Topic': '',
+            'UseHttps': False,
+            'SendEventInterval': 20.0,
+            'FunctionalRole': 'Collectd VES Agent',
+            'ApiVersion': 5.1,
+            'KafkaPort': 9092,
+            'KafkaBroker': 'localhost'
         }
         self.__host_name = None
         self.__ves_timer = None
@@ -551,16 +562,16 @@ class VESPlugin(object):
         return str(self.__event_id)
 
     def lock(self):
-        """Lock the plugin"""
+        """Lock the application"""
         self.__lock.acquire()
 
     def unlock(self):
-        """Unlock the plugin"""
+        """Unlock the application"""
         self.__lock.release()
 
     def start_timer(self):
         """Start event timer"""
-        self.__ves_timer = Timer(self.__plugin_config['SendEventInterval'], self.__on_time)
+        self.__ves_timer = Timer(self.__app_config['SendEventInterval'], self.__on_time)
         self.__ves_timer.start()
 
     def stop_timer(self):
@@ -575,36 +586,34 @@ class VESPlugin(object):
     def event_send(self, event):
         """Send event to VES"""
         server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
-            's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'],
-            int(self.__plugin_config['Port']), '{}'.format(
-            '/{}'.format(self.__plugin_config['Path']) if (len(self.__plugin_config['Path']) > 0) else ''),
-            int(self.__plugin_config['ApiVersion']), '{}'.format(
-            '/{}'.format(self.__plugin_config['Topic']) if (len(self.__plugin_config['Topic']) > 0) else ''))
-        collectd.info('Vendor Event Listener is at: {}'.format(server_url))
+            's' if self.__app_config['UseHttps'] else '', self.__app_config['Domain'],
+            int(self.__app_config['Port']), '{}'.format(
+                '/{}'.format(self.__app_config['Path']) if (len(self.__app_config['Path']) > 0) else ''),
+            int(self.__app_config['ApiVersion']), '{}'.format(
+                '/{}'.format(self.__app_config['Topic']) if (len(self.__app_config['Topic']) > 0) else ''))
+        logging.info('Vendor Event Listener is at: {}'.format(server_url))
         credentials = base64.b64encode('{}:{}'.format(
-            self.__plugin_config['Username'], self.__plugin_config['Password']).encode()).decode()
-        collectd.info('Authentication credentials are: {}'.format(credentials))
+            self.__app_config['Username'], self.__app_config['Password']).encode()).decode()
+        logging.info('Authentication credentials are: {}'.format(credentials))
         try:
             request = url.Request(server_url)
             request.add_header('Authorization', 'Basic {}'.format(credentials))
             request.add_header('Content-Type', 'application/json')
-            collectd.debug("Sending {} to {}".format(event.get_json(), server_url))
+            logging.debug("Sending {} to {}".format(event.get_json(), server_url))
             vel = url.urlopen(request, event.get_json(), timeout=1)
-            collectd.debug("Sent data to {} successfully".format(server_url))
+            logging.debug("Sent data to {} successfully".format(server_url))
         except url.HTTPError as e:
-            collectd.error('Vendor Event Listener exception: {}'.format(e))
+            logging.error('Vendor Event Listener exception: {}'.format(e))
         except url.URLError as e:
-            collectd.error('Vendor Event Listener is is not reachable: {}'.format(e))
-        except:
-            collectd.error('Vendor Event Listener unknown error')
+            logging.error('Vendor Event Listener is is not reachable: {}'.format(e))
+        except Exception as e:
+            logging.error('Vendor Event Listener error: {}'.format(e))
 
     def bytes_to_kb(self, bytes):
         """Convert bytes to kibibytes"""
         return round((bytes / 1024.0), 3)
 
     def get_hostname(self):
-        if len(self.__host_name):
-            return self.__host_name
         return socket.gethostname()
 
     def send_host_measurements(self):
@@ -619,15 +628,16 @@ class VESPlugin(object):
             us_up_to_date = True
             for vm_value in vm_values:
                 if vm_value['updated'] == False:
+                    logging.warning("%s" % vm_value)
                     us_up_to_date = False
                     break
             if not us_up_to_date:
-                    # one of the cache value is not up-to-date, break
-                    collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name))
-                    continue
+                # one of the cache value is not up-to-date, break
+                logging.warning("virt collectd cache values are not up-to-date for {}".format(vm_name))
+                continue
             # values are up-to-date, create an event message
             measurement = MeasurementsForVfScaling(self.get_event_id())
-            measurement.functional_role = self.__plugin_config['FunctionalRole']
+            measurement.functional_role = self.__app_config['FunctionalRole']
             # fill out reporting_entity
             measurement.reporting_entity_id = self.get_hostname()
             measurement.reporting_entity_name = measurement.reporting_entity_id
@@ -643,7 +653,7 @@ class VESPlugin(object):
             memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
                                                 type_name='memory', type_instance='total')
             memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                              type_name='memory', type_instance='unused')
+                                                 type_name='memory', type_instance='unused')
             memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
                                               type_name='memory', type_instance='rss')
             if len(memory_total) > 0:
@@ -655,7 +665,7 @@ class VESPlugin(object):
             # since, "used" metric is not provided by virt plugn, set the rest of the memory stats
             # to zero to calculate used based on provided stats only
             mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \
-            mem_usage.memory_slab_unrecl = 0
+                mem_usage.memory_slab_unrecl = 0
             measurement.add_memory_usage(mem_usage)
             # cpuUsage
             virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
@@ -672,9 +682,9 @@ class VESPlugin(object):
                 if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
                                                   type_name='if_packets', type_instance=if_name)
                 if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                  type_name='if_octets', type_instance=if_name)
+                                                 type_name='if_octets', type_instance=if_name)
                 if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                  type_name='if_errors', type_instance=if_name)
+                                                 type_name='if_errors', type_instance=if_name)
                 if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
                                                   type_name='if_dropped', type_instance=if_name)
                 v_nic_performance = VNicPerformance(if_name)
@@ -695,7 +705,7 @@ class VESPlugin(object):
                 disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
                                                    type_name='disk_octets', type_instance=disk_name)
                 disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                   type_name='disk_ops', type_instance=disk_name)
+                                                type_name='disk_ops', type_instance=disk_name)
                 disk_usage = DiskUsage(disk_name)
                 disk_usage.disk_octets_read_last = disk_octets[0]['values'][0]
                 disk_usage.disk_octets_write_last = disk_octets[0]['values'][1]
@@ -714,8 +724,8 @@ class VESPlugin(object):
             # send event to the VES
             self.event_send(measurement)
         if len(vm_names) > 0:
-          # mark the additional measurements metrics as read
-          self.mark_cache_values_as_read(exclude_plugins=['virt'])
+            # mark the additional measurements metrics as read
+            self.mark_cache_values_as_read(exclude_plugins=['virt'])
 
     def event_timer(self):
         """Event timer thread"""
@@ -746,9 +756,9 @@ class VESPlugin(object):
                     array_name = self.make_dash_string(plugin_name, val['plugin_instance'],
                                                        val['type_instance'])
                     named_array = NamedArrayOfFields(array_name)
-                    ds = collectd.get_dataset(val['type'])
-                    for index in range(len(ds)):
-                        mname = '{}-{}'.format(val['type'], ds[index][0])
+
+                    for index in range(len(val['dsnames'])):
+                        mname = '{}-{}'.format(val['type'], val['dsnames'][index])
                         named_array.add(Field(mname, str(val['values'][index])))
                     measurement.add_additional_measurement(named_array);
                     val['updated'] = False
@@ -763,9 +773,9 @@ class VESPlugin(object):
                 if val['updated']:
                     name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'],
                                                         val['type_instance'])
-                    ds = collectd.get_dataset(val['type'])
-                    for index in range(len(ds)):
-                        field_name = self.make_dash_string(name_prefix, val['type'], ds[index][0])
+
+                    for index in range(len(val['dsnames'])):
+                        field_name = self.make_dash_string(name_prefix, val['type'], val['dsnames'][index])
                         measurement.add_additional_fields(Field(field_name, str(val['values'][index])))
 
     def cpu_ns_to_percentage(self, vl):
@@ -777,8 +787,8 @@ class VESPlugin(object):
         if (total_time - pre_total_time) == 0:
             # return zero usage if time diff is zero
             return 0.0
-        percent = (100.0 * (total - pre_total))/((total_time - pre_total_time) * 1000000000.0)
-        collectd.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
+        percent = (100.0 * (total - pre_total)) / ((total_time - pre_total_time) * 1000000000.0)
+        logging.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
             pre_total_time, pre_total, total_time, total, round(percent, 2)))
         return round(percent, 2)
 
@@ -787,62 +797,71 @@ class VESPlugin(object):
         return '-'.join(filter(lambda x: len(x) > 0, args))
 
     def config(self, config):
-        """Collectd config callback"""
-        for child in config.children:
-            # check the config entry name
-            if child.key not in self.__plugin_config:
-                collectd.error("Key '{}' name is invalid".format(child.key))
-                raise RuntimeError('Configuration key name error')
-            # check the config entry value type
-            if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]):
-                collectd.error("Key '{}' value type '{}' should be {}".format(
-                               child.key, str(type(child.values[0])),
-                               str(type(self.__plugin_config[child.key]))))
-                raise RuntimeError('Configuration key value error')
-            # store the value in configuration
-            self.__plugin_config[child.key] = child.values[0]
+        """VES option configuration"""
+        for key, value in config.items('config'):
+            if key in self.__app_config:
+                try:
+                    if type(self.__app_config[key]) == int:
+                        value = int(value)
+                    elif type(self.__app_config[key]) == float:
+                        value = float(value)
+                    elif type(self.__app_config[key]) == bool:
+                        value = bool(distutils.util.strtobool(value))
+
+                    if isinstance(value, type(self.__app_config[key])):
+                        self.__app_config[key] = value
+                    else:
+                        logging.error("Type mismatch with %s" % key)
+                        sys.exit()
+                except ValueError:
+                    logging.error("Incorrect value type for %s" % key)
+                    sys.exit()
+            else:
+                logging.error("Incorrect key configuration %s" % key)
+                sys.exit()
 
     def init(self):
-        """Collectd init callback"""
+        """Initialisation"""
         # start the VES timer
         self.start_timer()
 
-    ##
-    # Please note, the cache should be locked before using this function
-    #
-    def update_cache_value(self, vl):
-        """Update value internal collectD cache values or create new one"""
+    def update_cache_value(self, kafka_data):
+        """Update value internal collectd cache values or create new one
+         Please note, the cache should be locked before using this function"""
         found = False
-        if vl.plugin not in self.__plugin_data_cache:
-             self.__plugin_data_cache[vl.plugin] = {'vls': []}
-        plugin_vl = self.__plugin_data_cache[vl.plugin]['vls']
+        if kafka_data['plugin'] not in self.__plugin_data_cache:
+            self.__plugin_data_cache[kafka_data['plugin']] = {'vls': []}
+        plugin_vl = self.__plugin_data_cache[kafka_data['plugin']]['vls']
+
         for index in range(len(plugin_vl)):
             # record found, so just update time the values
-            if (plugin_vl[index]['plugin_instance'] ==
-                vl.plugin_instance) and (plugin_vl[index]['type_instance'] ==
-                    vl.type_instance) and (plugin_vl[index]['type'] == vl.type):
+            if (plugin_vl[index]['plugin_instance'] == kafka_data['plugin_instance']) and \
+                    (plugin_vl[index]['type_instance'] == kafka_data['type_instance']) and \
+                    (plugin_vl[index]['type'] == kafka_data['type']):
                 plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
-                plugin_vl[index]['time'] = vl.time
+                plugin_vl[index]['time'] = kafka_data['time']
                 plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
-                plugin_vl[index]['values'] = vl.values
+                plugin_vl[index]['values'] = kafka_data['values']
+                plugin_vl[index]['dsnames'] = kafka_data['dsnames']
                 plugin_vl[index]['updated'] = True
                 found = True
                 break
         if not found:
             value = {}
             # create new cache record
-            value['plugin_instance'] = vl.plugin_instance
-            value['type_instance'] = vl.type_instance
-            value['values'] = vl.values
-            value['pre_values'] = vl.values
-            value['type'] = vl.type
-            value['time'] = vl.time
-            value['pre_time'] = vl.time
-            value['host'] = vl.host
+            value['plugin_instance'] = kafka_data['plugin_instance']
+            value['type_instance'] = kafka_data['type_instance']
+            value['values'] = kafka_data['values']
+            value['pre_values'] = kafka_data['values']
+            value['type'] = kafka_data['type']
+            value['time'] = kafka_data['time']
+            value['pre_time'] = kafka_data['time']
+            value['host'] = kafka_data['host']
+            value['dsnames'] = kafka_data['dsnames']
             value['updated'] = True
-            self.__plugin_data_cache[vl.plugin]['vls'].append(value)
+            self.__plugin_data_cache[kafka_data['plugin']]['vls'].append(value)
             # update plugin interval based on one received in the value
-            self.__plugin_data_cache[vl.plugin]['interval'] = vl.interval
+            self.__plugin_data_cache[kafka_data['plugin']]['interval'] = kafka_data['interval']
 
     def cache_get_value(self, plugin_name=None, plugin_instance=None,
                         type_name=None, type_instance=None, type_names=None, mark_as_read=True):
@@ -850,84 +869,77 @@ class VESPlugin(object):
         ret_list = []
         if plugin_name in self.__plugin_data_cache:
             for val in self.__plugin_data_cache[plugin_name]['vls']:
-                #collectd.info("plugin={}, type={}, type_instance={}".format(
-                #    plugin_name, val['type'], val['type_instance']))
-                if (type_name == None or type_name == val['type']) and (plugin_instance == None
-                    or plugin_instance == val['plugin_instance']) and (type_instance == None
-                    or type_instance == val['type_instance']) and (type_names == None
-                    or val['type'] in type_names):
+                if (type_name == None or type_name == val['type']) and \
+                        (plugin_instance == None or plugin_instance == val['plugin_instance']) and \
+                        (type_instance == None or type_instance == val['type_instance']) and \
+                        (type_names == None or val['type'] in type_names):
                     if mark_as_read:
                         val['updated'] = False
                     ret_list.append(val)
         return ret_list
 
-    def write(self, vl, data=None):
-        """Collectd write callback"""
-        self.lock()
-        try:
-            # Example of collectD Value format
-            # collectd.Values(type='cpu',type_instance='interrupt',
-            # plugin='cpu',plugin_instance='25',host='localhost',
-            # time=1476694097.022873,interval=10.0,values=[0])
-            if vl.plugin == 'ves_plugin':
-                # store the host name and unregister callback
-                self.__host_name = vl.host
-                collectd.unregister_read(self.read)
-                return
-            # update the cache values
-            self.update_cache_value(vl)
-        finally:
-            self.unlock()
-
-    def read(self, data=None):
-        """Collectd read callback. Use this callback to get host name"""
-        vl = collectd.Values(type='gauge')
-        vl.plugin='ves_plugin'
-        vl.dispatch(values=[0])
-
-    def notify(self, n):
-        """Collectd notification callback"""
-        collectd_event_severity_map = {
-            collectd.NOTIF_FAILURE : 'CRITICAL',
-            collectd.NOTIF_WARNING : 'WARNING',
-            collectd.NOTIF_OKAY : 'NORMAL'
-        }
-        fault = Fault(self.get_event_id())
-        # fill out common header
-        fault.event_type = "Notification"
-        fault.functional_role = self.__plugin_config['FunctionalRole']
-        fault.reporting_entity_id = self.get_hostname()
-        fault.reporting_entity_name = self.get_hostname()
-        if n.plugin == 'virt':
-            # if the notification is generated by virt plugin,
-            # use the plugin_instance (e.g. VM name) as a source.
-            fault.source_id = str(n.plugin_instance)
-            fault.source_name = fault.source_id
-        else:
-            fault.source_id = self.get_hostname()
-            fault.source_name = self.get_hostname()
-        fault.start_epoch_microsec = (n.time * 1000000)
-        fault.last_epoch_micro_sec = fault.start_epoch_microsec
-        # fill out fault header
-        fault.event_severity = collectd_event_severity_map[n.severity]
-        fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance)
-        fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance)
-        fault.event_source_type = 'host(3)'
-        fault.alarm_condition = n.message
-        self.event_send(fault)
-
     def shutdown(self):
-        """Collectd shutdown callback"""
-        # stop the timer
+        """Shutdown method for clean up"""
         self.stop_timer()
 
-# The collectd plugin instance
-plugin_instance = VESPlugin()
-
-# Register plugin callbacks
-collectd.register_config(plugin_instance.config)
-collectd.register_init(plugin_instance.init)
-collectd.register_read(plugin_instance.read)
-collectd.register_write(plugin_instance.write)
-collectd.register_notification(plugin_instance.notify)
-collectd.register_shutdown(plugin_instance.shutdown)
+    def run(self):
+        """Consumer JSON data from kafka broker"""
+        kafka_server = '%s:%s' % (self.__app_config.get('KafkaBroker'), self.__app_config.get('KafkaPort'))
+        consumer = KafkaConsumer('collectd', bootstrap_servers=kafka_server, auto_offset_reset='latest',
+                                 enable_auto_commit=False,
+                                 value_deserializer=lambda m: json.loads(m.decode('ascii')))
+
+        for message in consumer:
+            for kafka_data in message.value:
+                self.lock()
+                try:
+                    # Receive Kafka data and update cache values
+                    self.update_cache_value(kafka_data)
+
+                finally:
+                    self.unlock()
+
+
+def main():
+    # Parsing cmdline options
+    parser = argparse.ArgumentParser()
+    parser.add_argument("--config", dest="configfile", default=None, help="Specify config file", metavar="FILE")
+    parser.add_argument("--loglevel", dest="level", choices=['DEBUG', 'INFO', 'WARNING', 'ERROR'], default='WARNING',
+                        help="Specify log level (default: %(default)s)", metavar="LEVEL")
+    parser.add_argument("--logfile", dest="logfile", default='ves_plugin.log',
+                        help="Specify log file (default: %(default)s)", metavar="FILE")
+    args = parser.parse_args()
+
+    # Create log file
+    logging.basicConfig(filename=args.logfile, format='%(asctime)s %(message)s', level=args.level)
+
+    # Create Application Instance
+    application_instance = VESApp()
+
+    # Read from config file
+    if args.configfile:
+        config = ConfigParser.ConfigParser()
+        config.optionxform = lambda option: option
+        config.read(args.configfile)
+        # Write Config Values
+        application_instance.config(config)
+    else:
+        logging.warning("No configfile specified, using default options")
+
+    # Start timer for interval
+    application_instance.init()
+
+    # Read Data from Kakfa
+    try:
+        # Kafka consumer & update cache
+        application_instance.run()
+    except KeyboardInterrupt:
+        logging.info(" - Ctrl-C handled, exiting gracefully")
+        application_instance.shutdown()
+        sys.exit()
+    except:
+        logging.error("Unknown Error")
+
+
+if __name__ == '__main__':
+    main()
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin_config.conf
new file mode 100644 (file)
index 0000000..1dccd49
--- /dev/null
@@ -0,0 +1,13 @@
+[config]
+Domain = 127.0.0.1
+Port = 30000
+Path = vendor_event_listener
+Topic = example_vnf
+UseHttps = false
+Username =
+Password =
+FunctionalRole = Collectd VES Agent
+SendEventInterval = 20
+ApiVersion = 3
+KafkaPort = 9092
+KafkaBroker = localhost
\ No newline at end of file