collectd VES agent: collectd VES agent code 93/27693/5
authormaryam.tahhan <maryam.tahhan@intel.com>
Mon, 9 Jan 2017 17:13:27 +0000 (17:13 +0000)
committerMaryam Tahhan <maryam.tahhan@intel.com>
Fri, 3 Feb 2017 13:11:37 +0000 (13:11 +0000)
Add the code for the VES collectd agent to barometer.

Change-Id: I39d9b0af4003e381fcaee1a8cbd38d9e54d9ac33
Signed-off-by: Volodymyr Mytnyk <volodymyrx.mytnyk@intel.com>
Signed-off-by: Maryam Tahhan <maryam.tahhan@intel.com>
.gitignore
3rd_party/collectd-ves-plugin/LICENSE [new file with mode: 0644]
3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT [new file with mode: 0644]
3rd_party/collectd-ves-plugin/ves_plugin/__init__.py [new file with mode: 0644]
3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py [new file with mode: 0644]
docs/userguide/collectd.ves.userguide.rst [new file with mode: 0644]
docs/userguide/index.rst

index 2660108..07fdf03 100644 (file)
@@ -1,5 +1,6 @@
 *~
 *.sw?
+*.pyc
 /docs_build/
 /docs_output/
 /releng/
diff --git a/3rd_party/collectd-ves-plugin/LICENSE b/3rd_party/collectd-ves-plugin/LICENSE
new file mode 100644 (file)
index 0000000..8b515df
--- /dev/null
@@ -0,0 +1,19 @@
+Copyright Intel Corporation 2016-2017
+
+Permission is hereby granted, free of charge, to any person obtaining a copy of
+this software and associated documentation files (the "Software"), to deal in
+the Software without restriction, including without limitation the rights to
+use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies
+of the Software, and to permit persons to whom the Software is furnished to do
+so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
diff --git a/3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT b/3rd_party/collectd-ves-plugin/PSF_LICENSE_AGREEMENT
new file mode 100644 (file)
index 0000000..666648b
--- /dev/null
@@ -0,0 +1,36 @@
+PSF LICENSE AGREEMENT FOR PYTHON 2.7.1
+1. This LICENSE AGREEMENT is between the Python Software Foundation (“PSF”),
+   and the Individual or Organization (“Licensee”) accessing and otherwise
+   using Python 2.7.1 software in source or binary form and its associated
+   documentation.
+2. Subject to the terms and conditions of this License Agreement, PSF hereby
+   grants Licensee a nonexclusive, royalty-free, world-wide license to
+   reproduce, analyze, test, perform and/or display publicly, prepare
+   derivative works, distribute, and otherwise use Python 2.7.1 alone or in any
+   derivative version, provided, however, that PSF’s License Agreement and
+   PSF’s notice of copyright, i.e., “Copyright © 2001-2010 Python Software
+   Foundation; All Rights Reserved” are retained in Python 2.7.1 alone or in
+   any derivative version prepared by Licensee.
+3. In the event Licensee prepares a derivative work that is based on or
+   incorporates Python 2.7.1 or any part thereof, and wants to make the
+   derivative work available to others as provided herein, then Licensee hereby
+   agrees to include in any such work a brief summary of the changes made to
+   Python 2.7.1.
+4. PSF is making Python 2.7.1 available to Licensee on an “AS IS” basis. PSF
+   MAKES NO REPRESENTATIONS OR WARRANTIES, EXPRESS OR IMPLIED. BY WAY OF
+   EXAMPLE, BUT NOT LIMITATION, PSF MAKES NO AND DISCLAIMS ANY REPRESENTATION OR
+   WARRANTY OF MERCHANTABILITY OR FITNESS FOR ANY PARTICULAR PURPOSE OR THAT
+   THE USE OF PYTHON 2.7.1 WILL NOT INFRINGE ANY THIRD PARTY RIGHTS.
+5. PSF SHALL NOT BE LIABLE TO LICENSEE OR ANY OTHER USERS OF PYTHON 2.7.1 FOR
+   ANY INCIDENTAL, SPECIAL, OR CONSEQUENTIAL DAMAGES OR LOSS AS A RESULT OF
+   MODIFYING, DISTRIBUTING, OR OTHERWISE USING PYTHON 2.7.1, OR ANY DERIVATIVE
+   THEREOF, EVEN IF ADVISED OF THE POSSIBILITY THEREOF.
+6. This License Agreement will automatically terminate upon a material breach
+   of its terms and conditions.
+7. Nothing in this License Agreement shall be deemed to create any relationship
+   of agency, partnership, or joint venture between PSF and Licensee. This
+   License Agreement does not grant permission to use PSF trademarks or trade name
+   in a trademark sense to endorse or promote products or services of Licensee,
+   or any third party. 8. By copying, installing or otherwise using Python
+   2.7.1, Licensee agrees to be bound by the terms and conditions of this License
+   Agreement.
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py b/3rd_party/collectd-ves-plugin/ves_plugin/__init__.py
new file mode 100644 (file)
index 0000000..1656d43
--- /dev/null
@@ -0,0 +1,21 @@
+# MIT License
+#
+# Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
diff --git a/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py b/3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
new file mode 100644 (file)
index 0000000..a9715ad
--- /dev/null
@@ -0,0 +1,618 @@
+# MIT License
+#
+# Copyright(c) 2016-2017 Intel Corporation. All rights reserved.
+#
+# Permission is hereby granted, free of charge, to any person obtaining a
+# copy of this software and associated documentation files (the "Software"),
+# to deal in the Software without restriction, including without limitation
+# the rights to use, copy, modify, merge, publish, distribute, sublicense,
+# and/or sell copies of the Software, and to permit persons to whom the
+# Software is furnished to do so, subject to the following conditions:
+#
+# The above copyright notice and this permission notice shall be included in
+# all copies or substantial portions of the Software.
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+# DEALINGS IN THE SOFTWARE.
+
+import collectd
+import json
+import sys
+import base64
+import urllib2
+import socket
+import time
+from threading import Timer
+from threading import Lock
+
+class Event(object):
+    """Event header"""
+
+    def __init__(self):
+        """Construct the common header"""
+        self.version = 1.1
+        self.event_type = "Info" # use "Info" unless a notification is generated
+        self.domain = ""
+        self.event_id = ""
+        self.source_id = ""
+        self.source_name = ""
+        self.functional_role = ""
+        self.reporting_entity_id = ""
+        self.reporting_entity_name = ""
+        self.priority = "Normal" # will be derived from event if there is one
+        self.start_epoch_microsec = 0
+        self.last_epoch_micro_sec = 0
+        self.sequence = 0
+
+    def get_json(self):
+        """Get the object of the datatype"""
+        obj = {}
+        obj['version'] = self.version
+        obj['eventType'] = self.event_type
+        obj['domain'] = self.domain
+        obj['eventId'] = self.event_id
+        obj['sourceId'] = self.source_id
+        obj['sourceName'] = self.source_name
+        obj['functionalRole'] = self.functional_role
+        obj['reportingEntityId'] = self.reporting_entity_id
+        obj['reportingEntityName'] = self.reporting_entity_name
+        obj['priority'] = self.priority
+        obj['startEpochMicrosec'] = self.start_epoch_microsec
+        obj['lastEpochMicrosec'] = self.last_epoch_micro_sec
+        obj['sequence'] = self.sequence
+        return json.dumps({
+            'event' : {
+                'commonEventHeader' : obj,
+                self.get_name() : self.get_obj()
+            }
+        })
+
+    def get_name():
+        assert False, 'abstract method get_name() is not implemented'
+
+    def get_obj():
+        assert False, 'abstract method get_obj() is not implemented'
+
+class MeasurementGroup(object):
+    """MeasurementGroup datatype"""
+
+    def __init__(self, name):
+        self.name = name
+        self.measurement = []
+        pass
+
+    def add_measurement(self, name, value):
+        self.measurement.append({
+            'name' : name,
+            'value' : value
+        })
+
+    def get_obj(self):
+        return {
+            'name' : self.name,
+            'measurements' : self.measurement
+        }
+
+class MeasurementsForVfScaling(Event):
+    """MeasurementsForVfScaling datatype"""
+
+    def __init__(self, event_id):
+        """Construct the header"""
+        super(MeasurementsForVfScaling, self).__init__()
+        # common attributes
+        self.domain = "measurementsForVfScaling"
+        self.event_id = event_id
+        # measurement attributes
+        self.additional_measurements = []
+        self.aggregate_cpu_usage = 0
+        self.codec_usage_array = []
+        self.concurrent_sessions = 0
+        self.configured_entities = 0
+        self.cpu_usage_array = []
+        self.errors = []
+        self.feature_usage_array = []
+        self.filesystem_usage_array = []
+        self.latency_distribution = []
+        self.mean_request_latency = 0
+        self.measurement_fields_version = 1.1
+        self.measurement_interval = 0
+        self.memory_configured = 0
+        self.memory_used = 0
+        self.number_of_media_ports_in_use = 0
+        self.request_rate = 0
+        self.vnfc_scaling_metric = 0
+        self.v_nic_usage_array = []
+
+    def add_measurement_group(self, group):
+        self.additional_measurements.append(group.get_obj())
+
+    def add_cpu_usage(self, cpu_identifier, usage):
+        self.cpu_usage_array.append({
+            'cpuIdentifier' : cpu_identifier,
+            'percentUsage' : usage
+        })
+
+    def add_v_nic_usage(self, if_name, if_pkts, if_bytes):
+        self.v_nic_usage_array.append({
+            'broadcastPacketsIn' : 0.0,
+            'broadcastPacketsOut' : 0.0,
+            'multicastPacketsIn' : 0.0,
+            'multicastPacketsOut' : 0.0,
+            'unicastPacketsIn' : 0.0,
+            'unicastPacketsOut' : 0.0,
+            'vNicIdentifier' : if_name,
+            'packetsIn' : if_pkts[0],
+            'packetsOut' : if_pkts[1],
+            'bytesIn' : if_bytes[0],
+            'bytesOut' : if_bytes[1]
+        })
+
+    def get_obj(self):
+        """Get the object of the datatype"""
+        obj = {}
+        obj['additionalMeasurements'] = self.additional_measurements
+        obj['aggregateCpuUsage'] = self.aggregate_cpu_usage
+        obj['codecUsageArray'] = self.codec_usage_array
+        obj['concurrentSessions'] = self.concurrent_sessions
+        obj['configuredEntities'] = self.configured_entities
+        obj['cpuUsageArray'] = self.cpu_usage_array
+        obj['errors'] = self.errors
+        obj['featureUsageArray'] = self.feature_usage_array
+        obj['filesystemUsageArray'] = self.filesystem_usage_array
+        obj['latencyDistribution'] = self.latency_distribution
+        obj['meanRequestLatency'] = self.mean_request_latency
+        obj['measurementFieldsVersion'] = self.measurement_fields_version
+        obj['measurementInterval'] = self.measurement_interval
+        obj['memoryConfigured'] = self.memory_configured
+        obj['memoryUsed'] = self.memory_used
+        obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use
+        obj['requestRate'] = self.request_rate
+        obj['vnfcScalingMetric'] = self.vnfc_scaling_metric
+        obj['vNicUsageArray'] = self.v_nic_usage_array
+        return obj
+
+    def get_name(self):
+        """Name of datatype"""
+        return "measurementsForVfScalingFields"
+
+class Fault(Event):
+    """Fault datatype"""
+
+    def __init__(self, event_id):
+        """Construct the header"""
+        super(Fault, self).__init__()
+        # common attributes
+        self.domain = "fault"
+        self.event_id = event_id
+        self.event_type = "Fault"
+        # fault attributes
+        self.fault_fields_version = 1.1
+        self.event_severity = 'NORMAL'
+        self.event_source_type = 'other(0)'
+        self.alarm_condition = ''
+        self.specific_problem = ''
+        self.vf_status = 'Active'
+        self.alarm_interface_a = ''
+        self.alarm_additional_information = []
+
+    def get_name(self):
+        """Name of datatype"""
+        return 'faultFields'
+
+    def get_obj(self):
+        """Get the object of the datatype"""
+        obj = {}
+        obj['faultFieldsVersion'] = self.fault_fields_version
+        obj['eventSeverity'] = self.event_severity
+        obj['eventSourceType'] = self.event_source_type
+        obj['alarmCondition'] = self.alarm_condition
+        obj['specificProblem'] = self.specific_problem
+        obj['vfStatus'] = self.vf_status
+        obj['alarmInterfaceA'] = self.alarm_interface_a
+        obj['alarmAdditionalInformation'] = self.alarm_additional_information
+        return obj
+
+class VESPlugin(object):
+    """VES plugin with collectd callbacks"""
+
+    def __init__(self):
+        """Plugin initialization"""
+        self.__plugin_data_cache = {
+            'cpu' : {'interval' : 0.0, 'vls' : []},
+            'cpu-aggregation' : {'interval' : 0.0, 'vls' : []},
+            'virt' : {'interval' : 0.0, 'vls' : []},
+            'disk' : {'interval' : 0.0, 'vls' : []},
+            'interface' : {'interval' : 0.0, 'vls' : []},
+            'memory' : {'interval' : 0.0, 'vls' : []}
+        }
+        self.__plugin_config = {
+            'Domain' : '127.0.0.1',
+            'Port' : 30000.0,
+            'Path' : '',
+            'Username' : '',
+            'Password' : '',
+            'Topic' : '',
+            'UseHttps' : False,
+            'SendEventInterval' : 20.0,
+            'FunctionalRole' : 'Collectd VES Agent',
+            'GuestRunning' : False
+        }
+        self.__host_name = None
+        self.__ves_timer = None
+        self.__event_timer_interval = 20.0
+        self.__lock = Lock()
+        self.__event_id = 0
+
+    def get_event_id(self):
+        """get event id"""
+        self.__event_id += 1
+        return str(self.__event_id)
+
+    def lock(self):
+        """Lock the plugin"""
+        self.__lock.acquire()
+
+    def unlock(self):
+        """Unlock the plugin"""
+        self.__lock.release()
+
+    def start_timer(self):
+        """Start event timer"""
+        self.__ves_timer = Timer(self.__event_timer_interval, self.__on_time)
+        self.__ves_timer.start()
+
+    def stop_timer(self):
+        """Stop event timer"""
+        self.__ves_timer.cancel()
+
+    def __on_time(self):
+        """Timer thread"""
+        self.start_timer()
+        self.event_timer()
+
+    def event_send(self, event):
+        """Send event to VES"""
+        server_url = "http{}://{}:{}/{}eventListener/v1{}".format(
+            's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'],
+            int(self.__plugin_config['Port']), '{}/'.format(
+            '/{}'.format(self.__plugin_config['Path'])) if (len(self.__plugin_config['Path']) > 0) else '',
+            self.__plugin_config['Topic'])
+        collectd.info('Vendor Event Listener is at: {}'.format(server_url))
+        credentials = base64.b64encode('{}:{}'.format(
+            self.__plugin_config['Username'], self.__plugin_config['Password']))
+        collectd.info('Authentication credentials are: {}'.format(credentials))
+        try:
+            request = urllib2.Request(server_url)
+            request.add_header('Authorization', 'Basic {}'.format(credentials))
+            request.add_header('Content-Type', 'application/json')
+            collectd.debug("Sending {} to {}".format(event.get_json(), server_url))
+            vel = urllib2.urlopen(request, event.get_json(), timeout=1)
+        except urllib2.HTTPError as e:
+            collectd.error('Vendor Event Listener exception: {}'.format(e))
+        except urllib2.URLError as e:
+            collectd.error('Vendor Event Listener is is not reachable: {}'.format(e))
+
+    def bytes_to_gb(self, bytes):
+        """Convert bytes to GB"""
+        return round((bytes / 1073741824.0), 3)
+
+    def get_hostname(self):
+        if len(self.__host_name):
+            return self.__host_name
+        return socket.gethostname()
+
+    def event_timer(self):
+        """Event timer thread"""
+        self.lock()
+        try:
+            if (self.__plugin_config['GuestRunning']):
+                # if we running on a guest only, send 'additionalMeasurements' only
+                measurement = MeasurementsForVfScaling(self.get_event_id())
+                measurement.functional_role = self.__plugin_config['FunctionalRole']
+                # add host/guest values as additional measurements
+                self.fill_additional_measurements(measurement, exclude_plugins=[
+                    'cpu', 'cpu-aggregation', 'memory', 'disk', 'interface', 'virt'])
+                # fill out reporting & source entities
+                reporting_entity = self.get_hostname()
+                measurement.reporting_entity_id = reporting_entity
+                measurement.reporting_entity_name = reporting_entity
+                measurement.source_id = reporting_entity
+                measurement.source_name = measurement.source_id
+                measurement.start_epoch_microsec = (time.time() * 1000000)
+                measurement.measurement_interval = self.__plugin_config['SendEventInterval']
+                # total CPU
+                total_cpu_system = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='system')
+                total_cpu_user = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='user')
+                measurement.aggregate_cpu_usage = round(total_cpu_system[0]['values'][0] +
+                                                    total_cpu_user[0]['values'][0], 2)
+                # CPU per each instance
+                cpux_system = self.cache_get_value(plugin_name='cpu', type_instance='system',
+                                                  mark_as_read = False)
+                for cpu_inst in [x['plugin_instance'] for x in cpux_system]:
+                    cpu_system = self.cache_get_value(plugin_name='cpu',
+                                                      plugin_instance=cpu_inst, type_instance='system')
+                    cpu_user = self.cache_get_value(plugin_name='cpu',
+                                                      plugin_instance=cpu_inst, type_instance='user')
+                    cpu_usage = round(cpu_system[0]['values'][0] + cpu_user[0]['values'][0], 2)
+                    measurement.add_cpu_usage(cpu_inst, cpu_usage)
+                # fill memory used
+                memory_used = self.cache_get_value(plugin_name='memory', type_name='memory', type_instance='used')
+                if len(memory_used) > 0:
+                    measurement.memory_used = self.bytes_to_gb(memory_used[0]['values'][0])
+                # if_packets
+                ifinfo = {}
+                if_stats = self.cache_get_value(plugin_name='interface', type_name='if_packets')
+                if len(if_stats) > 0:
+                    for if_stat in if_stats:
+                        ifinfo[if_stat['plugin_instance']] = {
+                            'pkts' : (if_stat['values'][0], if_stat['values'][1])
+                        }
+                # go through all interfaces and get if_octets
+                for if_name in ifinfo.keys():
+                    if_stats = self.cache_get_value(plugin_instance=if_name, plugin_name='interface',
+                                                    type_name='if_octets')
+                    if len(if_stats) > 0:
+                        ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
+                # fill vNicUsageArray filed in the event
+                for if_name in ifinfo.keys():
+                    measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
+                # send event to the VES
+                self.event_send(measurement)
+                return
+            # get list of all VMs
+            virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
+                                                   mark_as_read=False)
+            vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
+            for vm_name in vm_names:
+                # make sure that 'virt' plugin cache is up-to-date
+                vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
+                                                 mark_as_read=False)
+                us_up_to_date = True
+                for vm_value in vm_values:
+                    if vm_value['updated'] == False:
+                        us_up_to_date = False
+                        break
+                if not us_up_to_date:
+                        # one of the cache value is not up-to-date, break
+                        collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name))
+                        continue
+                # if values are up-to-date, create an event message
+                measurement = MeasurementsForVfScaling(self.get_event_id())
+                measurement.functional_role = self.__plugin_config['FunctionalRole']
+                # fill out reporting_entity
+                reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name)
+                measurement.reporting_entity_id = reporting_entity
+                measurement.reporting_entity_name = reporting_entity
+                # virt_cpu_total
+                virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name,
+                                                       plugin_name='virt', type_name='virt_cpu_total')
+                if len(virt_vcpu_total) > 0:
+                    measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0])
+                    # set source as a host for virt_vcpu_total value
+                    measurement.source_id = virt_vcpu_total[0]['host']
+                    measurement.source_name = measurement.source_id
+                    # fill out EpochMicrosec (convert to us)
+                    measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
+                # virt_vcp
+                virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
+                                                  plugin_name='virt', type_name='virt_vcpu')
+                if len(virt_vcpus) > 0:
+                    for virt_vcpu in virt_vcpus:
+                        cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
+                        measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
+                # plugin interval
+                measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
+                # memory-total
+                memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                    type_name='memory', type_instance='total')
+                if len(memory_total) > 0:
+                    measurement.memory_configured = self.bytes_to_gb(memory_total[0]['values'][0])
+                # memory-rss
+                memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                  type_name='memory', type_instance='rss')
+                if len(memory_rss) > 0:
+                    measurement.memory_used = self.bytes_to_gb(memory_rss[0]['values'][0])
+                # if_packets
+                ifinfo = {}
+                if_stats = self.cache_get_value(plugin_instance=vm_name,
+                                                plugin_name='virt', type_name='if_packets')
+                if len(if_stats) > 0:
+                    for if_stat in if_stats:
+                        ifinfo[if_stat['type_instance']] = {
+                            'pkts' : (if_stat['values'][0], if_stat['values'][1])
+                        }
+                # go through all interfaces and get if_octets
+                for if_name in ifinfo.keys():
+                    if_stats = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                    type_name='if_octets', type_instance=if_name)
+                    if len(if_stats) > 0:
+                        ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
+                # fill vNicUsageArray filed in the event
+                for if_name in ifinfo.keys():
+                    measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
+                # add host/guest values as additional measurements
+                self.fill_additional_measurements(measurement, ['virt'])
+                # send event to the VES
+                self.event_send(measurement)
+        finally:
+            self.unlock()
+
+    def fill_additional_measurements(self, measurement, exclude_plugins=None):
+        """Fill out addition measurement filed with host/guets values"""
+        # add host/guest values as additional measurements
+        for plugin_name in self.__plugin_data_cache.keys():
+            if (exclude_plugins != None and plugin_name in exclude_plugins):
+                # skip host-only values
+                continue;
+            for val in self.__plugin_data_cache[plugin_name]['vls']:
+                if val['updated']:
+                    mgroup_name = '{}{}{}'.format(plugin_name,
+                        '-{}'.format(val['plugin_instance']) if len(val['plugin_instance']) else '',
+                        '-{}'.format(val['type_instance']) if len(val['type_instance']) else '')
+                    mgroup = MeasurementGroup(mgroup_name)
+                    ds = collectd.get_dataset(val['type'])
+                    for index in xrange(len(ds)):
+                        mname = '{}-{}'.format(val['type'], ds[index][0])
+                        mgroup.add_measurement(mname, str(val['values'][index]))
+                    measurement.add_measurement_group(mgroup);
+                    val['updated'] = False
+
+    def cpu_ns_to_percentage(self, vl):
+        """Convert CPU usage ns to CPU %"""
+        total = vl['values'][0]
+        total_time = vl['time']
+        pre_total = vl['pre_values'][0]
+        pre_total_time = vl['pre_time']
+        if (total_time - pre_total_time) == 0:
+            # return zero usage if time diff is zero
+            return 0.0
+        percent = (100.0 * (total - pre_total))/((total_time - pre_total_time) * 1000000000.0)
+        collectd.debug("pre_time={}, pre_value={}, time={}, value={}, cpu={}%".format(
+            pre_total_time, pre_total, total_time, total, round(percent, 2)))
+        return round(percent, 2)
+
+    def config(self, config):
+        """Collectd config callback"""
+        for child in config.children:
+            # check the config entry name
+            if child.key not in self.__plugin_config:
+                collectd.error("Key '{}' name is invalid".format(child.key))
+                raise RuntimeError('Configuration key name error')
+            # check the config entry value type
+            if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]):
+                collectd.error("Key '{}' value type should be {}".format(
+                               child.key, str(type(self.__plugin_config[child.key]))))
+                raise RuntimeError('Configuration key value error')
+            # store the value in configuration
+            self.__plugin_config[child.key] = child.values[0]
+
+    def init(self):
+        """Collectd init callback"""
+        # start the VES timer
+        self.start_timer()
+
+    ##
+    # Please note, the cache should be locked before using this function
+    #
+    def update_cache_value(self, vl):
+        """Update value internal collectD cache values or create new one"""
+        found = False
+        if vl.plugin not in self.__plugin_data_cache:
+             self.__plugin_data_cache[vl.plugin] = {'vls': []}
+        plugin_vl = self.__plugin_data_cache[vl.plugin]['vls']
+        for index in xrange(len(plugin_vl)):
+            # record found, so just update time the values
+            if (plugin_vl[index]['plugin_instance'] ==
+                vl.plugin_instance) and (plugin_vl[index]['type_instance'] ==
+                    vl.type_instance) and (plugin_vl[index]['type'] == vl.type):
+                plugin_vl[index]['pre_time'] = plugin_vl[index]['time']
+                plugin_vl[index]['time'] = vl.time
+                plugin_vl[index]['pre_values'] = plugin_vl[index]['values']
+                plugin_vl[index]['values'] = vl.values
+                plugin_vl[index]['updated'] = True
+                found = True
+                break
+        if not found:
+            value = {}
+            # create new cache record
+            value['plugin_instance'] = vl.plugin_instance
+            value['type_instance'] = vl.type_instance
+            value['values'] = vl.values
+            value['pre_values'] = vl.values
+            value['type'] = vl.type
+            value['time'] = vl.time
+            value['pre_time'] = vl.time
+            value['host'] = vl.host
+            value['updated'] = True
+            self.__plugin_data_cache[vl.plugin]['vls'].append(value)
+            # update plugin interval based on one received in the value
+            self.__plugin_data_cache[vl.plugin]['interval'] = vl.interval
+
+    def cache_get_value(self, plugin_name=None, plugin_instance=None,
+                        type_name=None, type_instance=None, type_names=None, mark_as_read=True):
+        """Get cache value by given criteria"""
+        ret_list = []
+        if plugin_name in self.__plugin_data_cache:
+            for val in self.__plugin_data_cache[plugin_name]['vls']:
+                #collectd.info("plugin={}, type={}, type_instance={}".format(
+                #    plugin_name, val['type'], val['type_instance']))
+                if (type_name == None or type_name == val['type']) and (plugin_instance == None
+                    or plugin_instance == val['plugin_instance']) and (type_instance == None
+                    or type_instance == val['type_instance']) and (type_names == None
+                    or val['type'] in type_names):
+                    if mark_as_read:
+                        val['updated'] = False
+                    ret_list.append(val)
+        return ret_list
+
+    def write(self, vl, data=None):
+        """Collectd write callback"""
+        self.lock()
+        try:
+            # Example of collectD Value format
+            # collectd.Values(type='cpu',type_instance='interrupt',
+            # plugin='cpu',plugin_instance='25',host='localhost',
+            # time=1476694097.022873,interval=10.0,values=[0])
+            if vl.plugin == 'ves_plugin':
+                # store the host name and unregister callback
+                self.__host_name = vl.host
+                collectd.unregister_read(self.read)
+                return
+            # update the cache values
+            self.update_cache_value(vl)
+        finally:
+            self.unlock()
+
+    def read(self, data=None):
+        """Collectd read callback. Use this callback to get host name"""
+        vl = collectd.Values(type='gauge')
+        vl.plugin='ves_plugin'
+        vl.dispatch(values=[0])
+
+    def notify(self, n):
+        """Collectd notification callback"""
+        collectd_event_severity_map = {
+            collectd.NOTIF_FAILURE : 'CRITICAL',
+            collectd.NOTIF_WARNING : 'WARNING',
+            collectd.NOTIF_OKAY : 'NORMAL'
+        }
+        fault = Fault(self.get_event_id())
+        # fill out common header
+        fault.event_type = "Notification"
+        fault.functional_role = self.__plugin_config['FunctionalRole']
+        fault.reporting_entity_id = self.get_hostname()
+        fault.reporting_entity_name = self.get_hostname()
+        fault.source_id = self.get_hostname()
+        fault.source_name = self.get_hostname()
+        fault.start_epoch_microsec = (n.time * 1000000)
+        fault.last_epoch_micro_sec = fault.start_epoch_microsec
+        # fill out fault header
+        fault.event_severity = collectd_event_severity_map[n.severity]
+        fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance
+            if len(n.plugin_instance) else ''), n.type_instance)
+        fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format(
+            n.plugin_instance if len(n.plugin_instance) else ''))
+        fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)'
+        fault.alarm_condition = n.message
+        self.event_send(fault)
+
+    def shutdown(self):
+        """Collectd shutdown callback"""
+        # stop the timer
+        self.stop_timer()
+
+# The collectd plugin instance
+plugin_instance = VESPlugin()
+
+# Register plugin callbacks
+collectd.register_config(plugin_instance.config)
+collectd.register_init(plugin_instance.init)
+collectd.register_read(plugin_instance.read)
+collectd.register_write(plugin_instance.write)
+collectd.register_notification(plugin_instance.notify)
+collectd.register_shutdown(plugin_instance.shutdown)
diff --git a/docs/userguide/collectd.ves.userguide.rst b/docs/userguide/collectd.ves.userguide.rst
new file mode 100644 (file)
index 0000000..a83b646
--- /dev/null
@@ -0,0 +1,196 @@
+.. This work is licensed under a Creative Commons Attribution 4.0 International License.
+.. http://creativecommons.org/licenses/by/4.0
+.. (c) OPNFV, Intel Corporation and others.
+
+collectd VES plugin
+===================
+The Barometer repository contains a python based write plugin for VES.
+
+The plugin currently supports pushing platform relevant metrics through the
+additional measurements field for VES.
+
+**Please note**: Hardcoded configuration values will be modified so that they
+are configurable through the configuration file.
+
+Installation Instructions:
+--------------------------
+1. Clone this repo
+2. Install collectd
+
+.. code:: bash
+
+   $ sudo apt-get install collectd
+
+3. Modify the collectd configuration script: `/etc/collectd/collectd.conf`
+
+.. code:: bash
+
+    <LoadPlugin python>
+      Globals true
+    </LoadPlugin>
+
+    <Plugin python>
+      ModulePath "/path/to/your/python/modules"
+      LogTraces true
+      Interactive false
+      Import "ves_plugin"
+    <Module ves_plugin>
+    # VES plugin configuration (see next section below)
+    </Module>
+    </Plugin>
+
+where "/path/to/your/python/modules" is the path to where you cloned this repo
+
+VES python plugin configuration description:
+--------------------------------------------
+
+**Note** Details of the Vendor Event Listener REST service
+
+REST resources are defined with respect to a ServerRoot:
+
+.. code:: bash
+
+    ServerRoot = https://{Domain}:{Port}/{optionalRoutingPath}
+
+REST resources are of the form:
+
+.. code:: bash
+
+    {ServerRoot}/eventListener/v{apiVersion}`
+    {ServerRoot}/eventListener/v{apiVersion}/{topicName}`
+    {ServerRoot}/eventListener/v{apiVersion}/eventBatch`
+
+
+**Domain** *"host"*
+* VES domain name. It can be IP address or hostname of VES collector
+(default: `127.0.0.1`)
+
+**Port** *port*
+* VES port (default: `30000`)
+
+**Path** *"path"*
+* Used as the "optionalRoutingPath" element in the REST path (default: `empty`)
+
+**Topic** *"path"*
+* Used as the "topicName" element in the REST  path (default: `empty`)
+
+**UseHttps** *true|false*
+* Allow plugin to use HTTPS instead of HTTP (default: `false`)
+
+**Username** *"username"*
+* VES collector user name (default: `empty`)
+
+**Password** *"passwd"*
+* VES collector password (default: `empty`)
+
+**FunctionalRole** *"role"*
+* Used as the 'functionalRole' field of 'commonEventHeader' event (default:
+`Collectd VES Agent`)
+
+**GuestRunning** *true|false*
+* This option is used if the collectd is running on a guest machine, e.g this
+option should be set to `true` in this case. Defaults to `false`.
+
+Other collectd.conf configurations
+----------------------------------
+Please ensure that FQDNLookup is set to false
+
+.. code:: bash
+
+    FQDNLookup   false
+
+
+Please ensure that the virt plugin is enabled and configured as follows. This configuration
+is is required only on a host side ('GuestRunning' = false).
+
+.. code:: bash
+
+    LoadPlugin virt
+
+    <Plugin virt>
+            Connection "qemu:///system"
+            RefreshInterval 60
+            HostnameFormat uuid
+    </Plugin>
+
+Please ensure that the cpu plugin is enabled and configured as follows
+
+.. code:: bash
+
+    LoadPlugin cpu
+
+    <Plugin cpu>
+        ReportByCpu false
+        ValuesPercentage true
+    </Plugin>
+
+Please ensure that the aggregation plugin is enabled and configured as follows
+
+.. code:: bash
+
+    LoadPlugin aggregation
+
+    <Plugin aggregation>
+        <Aggregation>
+                Plugin "cpu"
+                Type "percent"
+                GroupBy "Host"
+                GroupBy "TypeInstance"
+                SetPlugin "cpu-aggregation"
+                CalculateAverage true
+        </Aggregation>
+    </Plugin>
+
+If plugin is running on a guest side, it is important to enable uuid plugin
+too. In this case the hostname in event message will be represented as UUID
+instead of system host name.
+
+LoadPlugin uuid
+
+If custom UUID needs to be provided, the following configuration is required in collectd.conf
+file:
+
+.. code:: bash
+
+    <Plugin uuid>
+        UUIDFile "/etc/uuid"
+    </Plugin>
+
+Where "/etc/uuid" is a file containing custom UUID.
+
+Please also ensure that the following plugins are enabled:
+
+.. code:: bash
+
+    LoadPlugin disk
+    LoadPlugin interface
+    LoadPlugin memory
+
+VES plugin notification example
+-------------------------------
+
+A good example of collectD notification is monitoring of CPU load on a host or guest using
+'threshold' plugin. The following configuration will setup VES plugin to send 'Fault'
+event every time a CPU idle value is out of range (e.g.: WARNING: CPU-IDLE < 50%, CRITICAL:
+CPU-IDLE < 30%) and send 'Fault' NORMAL event if CPU idle value is back to normal.
+
+.. code:: bash
+
+    LoadPlugin threshold
+
+    <Plugin "threshold">
+         <Plugin "cpu-aggregation">
+            <Type "percent">
+              WarningMin    50.0
+              WarningMax   100.0
+              FailureMin    30.0
+              FailureMax   100.0
+              Instance "idle"
+              Hits 1
+            </Type>
+        </Plugin>
+    </Plugin>
+
+More detailed information on how to configure collectD thresholds(memory, cpu
+etc.) can be found here at
+https://collectd.org/documentation/manpages/collectd-threshold.5.shtml
index bf62a43..aaeadf9 100644 (file)
@@ -16,3 +16,4 @@ Barometer user guide
      :maxdepth: 3
 
      collectd.userguide.rst
+     collectd.ves.userguide.rst