VES plugin: update to latest VES event schema 29/28429/4
authorMytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
Wed, 8 Feb 2017 16:01:05 +0000 (16:01 +0000)
committerMytnyk, Volodymyr <volodymyrx.mytnyk@intel.com>
Thu, 18 May 2017 16:11:00 +0000 (17:11 +0100)
- Updated the plugin to lates VES schema
- Migrated VES plugin to python3 (with backward compatibility)
- Fixed issue with CPU total calculation
- Fixed VES documentation

Change-Id: Ic8b0419146a9c75a48907f39adda1351f3b3bc73
Signed-off-by: Mytnyk, VolodymyrX <volodymyrx.mytnyk@intel.com>
3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
docs/release/userguide/collectd.ves.userguide.rst

index 3e30089..1ab8f67 100644 (file)
@@ -24,7 +24,12 @@ import collectd
 import json
 import sys
 import base64
-import urllib2
+try:
+    # For Python 3.0 and later
+    import urllib.request as url
+except ImportError:
+    # Fall back to Python 2's urllib2
+    import urllib2 as url
 import socket
 import time
 from threading import Timer
@@ -70,7 +75,7 @@ class Event(object):
                 'commonEventHeader' : obj,
                 self.get_name() : self.get_obj()
             }
-        })
+        }).encode()
 
     def get_name():
         assert False, 'abstract method get_name() is not implemented'
@@ -114,7 +119,12 @@ class MeasurementsForVfScaling(Event):
         self.concurrent_sessions = 0
         self.configured_entities = 0
         self.cpu_usage_array = []
-        self.errors = []
+        self.errors = {
+            "receiveDiscards" : 0,
+            "receiveErrors" : 0,
+            "transmitDiscards" : 0,
+            "transmitErrors" : 0
+        }
         self.feature_usage_array = []
         self.filesystem_usage_array = []
         self.latency_distribution = []
@@ -240,11 +250,11 @@ class VESPlugin(object):
             'UseHttps' : False,
             'SendEventInterval' : 20.0,
             'FunctionalRole' : 'Collectd VES Agent',
-            'GuestRunning' : False
+            'GuestRunning' : False,
+            'ApiVersion' : 1.0
         }
         self.__host_name = None
         self.__ves_timer = None
-        self.__event_timer_interval = 20.0
         self.__lock = Lock()
         self.__event_id = 0
 
@@ -263,7 +273,7 @@ class VESPlugin(object):
 
     def start_timer(self):
         """Start event timer"""
-        self.__ves_timer = Timer(self.__event_timer_interval, self.__on_time)
+        self.__ves_timer = Timer(self.__plugin_config['SendEventInterval'], self.__on_time)
         self.__ves_timer.start()
 
     def stop_timer(self):
@@ -272,30 +282,34 @@ class VESPlugin(object):
 
     def __on_time(self):
         """Timer thread"""
-        self.start_timer()
         self.event_timer()
+        self.start_timer()
 
     def event_send(self, event):
         """Send event to VES"""
-        server_url = "http{}://{}:{}/{}eventListener/v3{}".format(
+        server_url = "http{}://{}:{}{}/eventListener/v{}{}".format(
             's' if self.__plugin_config['UseHttps'] else '', self.__plugin_config['Domain'],
-            int(self.__plugin_config['Port']), '{}/'.format(
-            '/{}'.format(self.__plugin_config['Path'])) if (len(self.__plugin_config['Path']) > 0) else '',
-            self.__plugin_config['Topic'])
+            int(self.__plugin_config['Port']), '{}'.format(
+            '/{}'.format(self.__plugin_config['Path']) if (len(self.__plugin_config['Path']) > 0) else ''),
+            int(self.__plugin_config['ApiVersion']), '{}'.format(
+            '/{}'.format(self.__plugin_config['Topic']) if (len(self.__plugin_config['Topic']) > 0) else ''))
         collectd.info('Vendor Event Listener is at: {}'.format(server_url))
         credentials = base64.b64encode('{}:{}'.format(
-            self.__plugin_config['Username'], self.__plugin_config['Password']))
+            self.__plugin_config['Username'], self.__plugin_config['Password']).encode()).decode()
         collectd.info('Authentication credentials are: {}'.format(credentials))
         try:
-            request = urllib2.Request(server_url)
+            request = url.Request(server_url)
             request.add_header('Authorization', 'Basic {}'.format(credentials))
             request.add_header('Content-Type', 'application/json')
             collectd.debug("Sending {} to {}".format(event.get_json(), server_url))
-            vel = urllib2.urlopen(request, event.get_json(), timeout=1)
-        except urllib2.HTTPError as e:
+            vel = url.urlopen(request, event.get_json(), timeout=1)
+            collectd.debug("Sent data to {} successfully".format(server_url))
+        except url.HTTPError as e:
             collectd.error('Vendor Event Listener exception: {}'.format(e))
-        except urllib2.URLError as e:
+        except url.URLError as e:
             collectd.error('Vendor Event Listener is is not reachable: {}'.format(e))
+        except:
+            collectd.error('Vendor Event Listener unknown error')
 
     def bytes_to_gb(self, bytes):
         """Convert bytes to GB"""
@@ -388,23 +402,24 @@ class VESPlugin(object):
                 reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name)
                 measurement.reporting_entity_id = reporting_entity
                 measurement.reporting_entity_name = reporting_entity
+                # virt_vcpu
+                virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
+                                                  plugin_name='virt', type_name='virt_vcpu')
+                if len(virt_vcpus) > 0:
+                    for virt_vcpu in virt_vcpus:
+                        cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
+                        measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
                 # virt_cpu_total
                 virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name,
                                                        plugin_name='virt', type_name='virt_cpu_total')
                 if len(virt_vcpu_total) > 0:
-                    measurement.aggregate_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0])
+                    average_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) / len(virt_vcpus)
+                    measurement.aggregate_cpu_usage = average_cpu_usage
                     # set source as a host for virt_vcpu_total value
                     measurement.source_id = virt_vcpu_total[0]['host']
                     measurement.source_name = measurement.source_id
                     # fill out EpochMicrosec (convert to us)
                     measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
-                # virt_vcp
-                virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
-                                                  plugin_name='virt', type_name='virt_vcpu')
-                if len(virt_vcpus) > 0:
-                    for virt_vcpu in virt_vcpus:
-                        cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
-                        measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
                 # plugin interval
                 measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
                 # memory-total
@@ -456,7 +471,7 @@ class VESPlugin(object):
                         '-{}'.format(val['type_instance']) if len(val['type_instance']) else '')
                     mgroup = MeasurementGroup(mgroup_name)
                     ds = collectd.get_dataset(val['type'])
-                    for index in xrange(len(ds)):
+                    for index in range(len(ds)):
                         mname = '{}-{}'.format(val['type'], ds[index][0])
                         mgroup.add_measurement(mname, str(val['values'][index]))
                     measurement.add_measurement_group(mgroup);
@@ -476,6 +491,10 @@ class VESPlugin(object):
             pre_total_time, pre_total, total_time, total, round(percent, 2)))
         return round(percent, 2)
 
+    def make_dash_string(self, *args):
+        """Join non empty strings with dash symbol"""
+        return '-'.join(filter(lambda x: len(x) > 0, args))
+
     def config(self, config):
         """Collectd config callback"""
         for child in config.children:
@@ -485,8 +504,9 @@ class VESPlugin(object):
                 raise RuntimeError('Configuration key name error')
             # check the config entry value type
             if len(child.values) == 0 or type(child.values[0]) != type(self.__plugin_config[child.key]):
-                collectd.error("Key '{}' value type should be {}".format(
-                               child.key, str(type(self.__plugin_config[child.key]))))
+                collectd.error("Key '{}' value type '{}' should be {}".format(
+                               child.key, str(type(child.values[0])),
+                               str(type(self.__plugin_config[child.key]))))
                 raise RuntimeError('Configuration key value error')
             # store the value in configuration
             self.__plugin_config[child.key] = child.values[0]
@@ -505,7 +525,7 @@ class VESPlugin(object):
         if vl.plugin not in self.__plugin_data_cache:
              self.__plugin_data_cache[vl.plugin] = {'vls': []}
         plugin_vl = self.__plugin_data_cache[vl.plugin]['vls']
-        for index in xrange(len(plugin_vl)):
+        for index in range(len(plugin_vl)):
             # record found, so just update time the values
             if (plugin_vl[index]['plugin_instance'] ==
                 vl.plugin_instance) and (plugin_vl[index]['type_instance'] ==
@@ -593,10 +613,8 @@ class VESPlugin(object):
         fault.last_epoch_micro_sec = fault.start_epoch_microsec
         # fill out fault header
         fault.event_severity = collectd_event_severity_map[n.severity]
-        fault.specific_problem = '{}{}'.format('{}-'.format(n.plugin_instance
-            if len(n.plugin_instance) else ''), n.type_instance)
-        fault.alarm_interface_a = '{}{}'.format(n.plugin, '-{}'.format(
-            n.plugin_instance if len(n.plugin_instance) else ''))
+        fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance)
+        fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance)
         fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)'
         fault.alarm_condition = n.message
         self.event_send(fault)
index 3cf2600..7ed6be9 100644 (file)
@@ -61,36 +61,42 @@ REST resources are of the form:
     {ServerRoot}/eventListener/v{apiVersion}/{topicName}`
     {ServerRoot}/eventListener/v{apiVersion}/eventBatch`
 
-
 **Domain** *"host"*
-* VES domain name. It can be IP address or hostname of VES collector
-(default: `127.0.0.1`)
+  VES domain name. It can be IP address or hostname of VES collector
+  (default: `127.0.0.1`)
 
 **Port** *port*
-* VES port (default: `30000`)
+  VES port (default: `30000`)
 
 **Path** *"path"*
-* Used as the "optionalRoutingPath" element in the REST path (default: `empty`)
+  Used as the "optionalRoutingPath" element in the REST path (default: `empty`)
 
 **Topic** *"path"*
-* Used as the "topicName" element in the REST  path (default: `empty`)
+  Used as the "topicName" element in the REST  path (default: `empty`)
 
 **UseHttps** *true|false*
-* Allow plugin to use HTTPS instead of HTTP (default: `false`)
+  Allow plugin to use HTTPS instead of HTTP (default: `false`)
 
 **Username** *"username"*
-* VES collector user name (default: `empty`)
+  VES collector user name (default: `empty`)
 
 **Password** *"passwd"*
-* VES collector password (default: `empty`)
+  VES collector password (default: `empty`)
 
 **FunctionalRole** *"role"*
-* Used as the 'functionalRole' field of 'commonEventHeader' event (default:
-`Collectd VES Agent`)
+  Used as the 'functionalRole' field of 'commonEventHeader' event (default:
+  `Collectd VES Agent`)
 
 **GuestRunning** *true|false*
-* This option is used if the collectd is running on a guest machine, e.g this
-option should be set to `true` in this case. Defaults to `false`.
+  This option is used if the collectd is running on a guest machine, e.g this
+  option should be set to `true` in this case. Defaults to `false`.
+
+**SendEventInterval** *interval*
+  This configuration option controls how often (sec) collectd data is sent to
+  Vendor Event Listener (default: `20`)
+
+**ApiVersion** *version*
+  Used as the "apiVersion" element in the REST path (default: `1`)
 
 Other collectd.conf configurations
 ----------------------------------
@@ -100,7 +106,6 @@ Please ensure that FQDNLookup is set to false
 
     FQDNLookup   false
 
-
 Please ensure that the virt plugin is enabled and configured as follows. This configuration
 is is required only on a host side ('GuestRunning' = false).
 
@@ -125,7 +130,11 @@ Please ensure that the cpu plugin is enabled and configured as follows
         ValuesPercentage true
     </Plugin>
 
+**Note**: The `ReportByCpu` option should be set to `true` (default) if VES pugin
+is running on guest machine ('GuestRunning' = true).
+
 Please ensure that the aggregation plugin is enabled and configured as follows
+(required if 'GuestRunning' = true)
 
 .. code:: bash
 
@@ -146,7 +155,9 @@ If plugin is running on a guest side, it is important to enable uuid plugin
 too. In this case the hostname in event message will be represented as UUID
 instead of system host name.
 
-LoadPlugin uuid
+.. code:: bash
+
+  LoadPlugin uuid
 
 If custom UUID needs to be provided, the following configuration is required in collectd.conf
 file: