Merge "VES userguide: Updated documentation."
authorMaryam Tahhan <maryam.tahhan@intel.com>
Fri, 16 Jun 2017 14:30:57 +0000 (14:30 +0000)
committerGerrit Code Review <gerrit@opnfv.org>
Fri, 16 Jun 2017 14:30:57 +0000 (14:30 +0000)
3rd_party/collectd-ves-plugin/ves_plugin/ves_plugin.py
3rd_party/ovs_pmd_stats/ovs_pmd_stats.py [new file with mode: 0755]
baro_tests/barometer.py [new file with mode: 0644]
ci/utility/collectd_build_rpm.sh

index 3f0a069..ee27bcd 100644 (file)
@@ -40,7 +40,7 @@ class Event(object):
 
     def __init__(self):
         """Construct the common header"""
-        self.version = 1.1
+        self.version = 2.0
         self.event_type = "Info" # use "Info" unless a notification is generated
         self.domain = ""
         self.event_id = ""
@@ -53,6 +53,10 @@ class Event(object):
         self.start_epoch_microsec = 0
         self.last_epoch_micro_sec = 0
         self.sequence = 0
+        self.event_name = ""
+        self.internal_header_fields = {}
+        self.nfc_naming_code = ""
+        self.nf_naming_code = ""
 
     def get_json(self):
         """Get the object of the datatype"""
@@ -63,13 +67,16 @@ class Event(object):
         obj['eventId'] = self.event_id
         obj['sourceId'] = self.source_id
         obj['sourceName'] = self.source_name
-        obj['functionalRole'] = self.functional_role
         obj['reportingEntityId'] = self.reporting_entity_id
         obj['reportingEntityName'] = self.reporting_entity_name
         obj['priority'] = self.priority
         obj['startEpochMicrosec'] = self.start_epoch_microsec
         obj['lastEpochMicrosec'] = self.last_epoch_micro_sec
         obj['sequence'] = self.sequence
+        obj['eventName'] = self.event_name
+        obj['internalHeaderFields'] = self.internal_header_fields
+        obj['nfcNamingCode'] = self.nfc_naming_code
+        obj['nfNamingCode'] = self.nf_naming_code
         return json.dumps({
             'event' : {
                 'commonEventHeader' : obj,
@@ -83,25 +90,315 @@ class Event(object):
     def get_obj():
         assert False, 'abstract method get_obj() is not implemented'
 
-class MeasurementGroup(object):
-    """MeasurementGroup datatype"""
+class Field(object):
+    """field datatype"""
+
+    def __init__(self, name, value):
+        self.name = name
+        self.value = value
+
+    def get_obj(self):
+        return {
+            'name' : self.name,
+            'value' : self.value
+        }
+
+class NamedArrayOfFields(object):
+    """namedArrayOfFields datatype"""
 
     def __init__(self, name):
         self.name = name
-        self.measurement = []
-        pass
+        self.array_of_fields = []
 
-    def add_measurement(self, name, value):
-        self.measurement.append({
-            'name' : name,
-            'value' : value
-        })
+    def add(self, field):
+        self.array_of_fields.append(field.get_obj())
 
     def get_obj(self):
         return {
             'name' : self.name,
-            'measurements' : self.measurement
+            'arrayOfFields' : self.array_of_fields
+        }
+
+class VESDataType(object):
+    """ Base VES datatype """
+
+    def set_optional(self, obj, key, val):
+        if val is not None:
+            obj[key] = val
+
+class DiskUsage(VESDataType):
+    """diskUsage datatype"""
+
+    def __init__(self, identifier):
+      self.disk_identifier = identifier
+      self.disk_io_time_avg = None
+      self.disk_io_time_last = None
+      self.disk_io_time_max = None
+      self.disk_io_time_min = None
+      self.disk_merged_read_avg = None
+      self.disk_merged_read_last = None
+      self.disk_merged_read_max = None
+      self.disk_merged_read_min = None
+      self.disk_merged_write_avg = None
+      self.disk_merged_write_last = None
+      self.disk_merged_write_max = None
+      self.disk_merged_write_min = None
+      self.disk_octets_read_avg = None
+      self.disk_octets_read_last = None
+      self.disk_octets_read_max = None
+      self.disk_octets_read_min = None
+      self.disk_octets_write_avg = None
+      self.disk_octets_write_last = None
+      self.disk_octets_write_max = None
+      self.disk_octets_write_min = None
+      self.disk_ops_read_avg = None
+      self.disk_ops_read_last = None
+      self.disk_ops_read_max = None
+      self.disk_ops_read_min = None
+      self.disk_ops_write_avg = None
+      self.disk_ops_write_last = None
+      self.disk_ops_write_max = None
+      self.disk_ops_write_min = None
+      self.disk_pending_operations_avg = None
+      self.disk_pending_operations_last = None
+      self.disk_pending_operations_max = None
+      self.disk_pending_operations_min = None
+      self.disk_time_read_avg = None
+      self.disk_time_read_last = None
+      self.disk_time_read_max = None
+      self.disk_time_read_min = None
+      self.disk_time_write_avg = None
+      self.disk_time_write_last = None
+      self.disk_time_write_max = None
+      self.disk_time_write_min = None
+
+    def get_obj(self):
+        obj = {
+            # required
+            'diskIdentifier' : self.disk_identifier
+        }
+        self.set_optional(obj, 'diskIoTimeAvg', self.disk_io_time_avg)
+        self.set_optional(obj, 'diskIoTimeLast', self.disk_io_time_last)
+        self.set_optional(obj, 'diskIoTimeMax', self.disk_io_time_max)
+        self.set_optional(obj, 'diskIoTimeMin', self.disk_io_time_min)
+        self.set_optional(obj, 'diskMergedReadAvg', self.disk_merged_read_avg)
+        self.set_optional(obj, 'diskMergedReadLast', self.disk_merged_read_last)
+        self.set_optional(obj, 'diskMergedReadMax', self.disk_merged_read_max)
+        self.set_optional(obj, 'diskMergedReadMin', self.disk_merged_read_min)
+        self.set_optional(obj, 'diskMergedWriteAvg', self.disk_merged_write_avg)
+        self.set_optional(obj, 'diskMergedWriteLast', self.disk_merged_write_last)
+        self.set_optional(obj, 'diskMergedWriteMax', self.disk_merged_write_max)
+        self.set_optional(obj, 'diskMergedWriteMin', self.disk_merged_write_min)
+        self.set_optional(obj, 'diskOctetsReadAvg', self.disk_octets_read_avg)
+        self.set_optional(obj, 'diskOctetsReadLast', self.disk_octets_read_last)
+        self.set_optional(obj, 'diskOctetsReadMax', self.disk_octets_read_max)
+        self.set_optional(obj, 'diskOctetsReadMin', self.disk_octets_read_min)
+        self.set_optional(obj, 'diskOctetsWriteAvg', self.disk_octets_write_avg)
+        self.set_optional(obj, 'diskOctetsWriteLast', self.disk_octets_write_last)
+        self.set_optional(obj, 'diskOctetsWriteMax', self.disk_octets_write_max)
+        self.set_optional(obj, 'diskOctetsWriteMin', self.disk_octets_write_min)
+        self.set_optional(obj, 'diskOpsReadAvg', self.disk_ops_read_avg)
+        self.set_optional(obj, 'diskOpsReadLast', self.disk_ops_read_last)
+        self.set_optional(obj, 'diskOpsReadMax', self.disk_ops_read_max)
+        self.set_optional(obj, 'diskOpsReadMin', self.disk_ops_read_min)
+        self.set_optional(obj, 'diskOpsWriteAvg', self.disk_ops_write_avg)
+        self.set_optional(obj, 'diskOpsWriteLast', self.disk_ops_write_last)
+        self.set_optional(obj, 'diskOpsWriteMax', self.disk_ops_write_max)
+        self.set_optional(obj, 'diskOpsWriteMin', self.disk_ops_write_min)
+        self.set_optional(obj, 'diskPendingOperationsAvg', self.disk_pending_operations_avg)
+        self.set_optional(obj, 'diskPendingOperationsLast', self.disk_pending_operations_last)
+        self.set_optional(obj, 'diskPendingOperationsMax', self.disk_pending_operations_max)
+        self.set_optional(obj, 'diskPendingOperationsMin', self.disk_pending_operations_min)
+        self.set_optional(obj, 'diskTimeReadAvg', self.disk_time_read_avg)
+        self.set_optional(obj, 'diskTimeReadLast', self.disk_time_read_last)
+        self.set_optional(obj, 'diskTimeReadMax', self.disk_time_read_max)
+        self.set_optional(obj, 'diskTimeReadMin', self.disk_time_read_min)
+        self.set_optional(obj, 'diskTimeWriteAvg', self.disk_time_write_avg)
+        self.set_optional(obj, 'diskTimeWriteLast', self.disk_time_write_last)
+        self.set_optional(obj, 'diskTimeWriteMax', self.disk_time_write_max)
+        self.set_optional(obj, 'diskTimeWriteMin', self.disk_time_write_min)
+        return obj
+
+class VNicPerformance(VESDataType):
+    """vNicPerformance datatype"""
+
+    def __init__(self, identifier):
+      self.received_broadcast_packets_accumulated = None
+      self.received_broadcast_packets_delta = None
+      self.received_discarded_packets_accumulated = None
+      self.received_discarded_packets_delta = None
+      self.received_error_packets_accumulated = None
+      self.received_error_packets_delta = None
+      self.received_multicast_packets_accumulated = None
+      self.received_multicast_packets_delta = None
+      self.received_octets_accumulated = None
+      self.received_octets_delta = None
+      self.received_total_packets_accumulated = None
+      self.received_total_packets_delta = None
+      self.received_unicast_packets_accumulated = None
+      self.received_unicast_packets_delta = None
+      self.transmitted_broadcast_packets_accumulated = None
+      self.transmitted_broadcast_packets_delta = None
+      self.transmitted_discarded_packets_accumulated = None
+      self.transmitted_discarded_packets_delta = None
+      self.transmitted_error_packets_accumulated = None
+      self.transmitted_error_packets_delta = None
+      self.transmitted_multicast_packets_accumulated = None
+      self.transmitted_multicast_packets_delta = None
+      self.transmitted_octets_accumulated = None
+      self.transmitted_octets_delta = None
+      self.transmitted_total_packets_accumulated = None
+      self.transmitted_total_packets_delta = None
+      self.transmitted_unicast_packets_accumulated = None
+      self.transmitted_unicast_packets_delta = None
+      self.values_are_suspect = 'true'
+      self.v_nic_identifier = identifier
+
+    def get_obj(self):
+        obj = {
+            # required
+            'valuesAreSuspect' : self.values_are_suspect,
+            'vNicIdentifier' : self.v_nic_identifier
+        }
+        # optional
+        self.set_optional(obj, 'receivedBroadcastPacketsAccumulated', self.received_broadcast_packets_accumulated)
+        self.set_optional(obj, 'receivedBroadcastPacketsDelta', self.received_broadcast_packets_delta)
+        self.set_optional(obj, 'receivedDiscardedPacketsAccumulated', self.received_discarded_packets_accumulated)
+        self.set_optional(obj, 'receivedDiscardedPacketsDelta', self.received_discarded_packets_delta)
+        self.set_optional(obj, 'receivedErrorPacketsAccumulated', self.received_error_packets_accumulated)
+        self.set_optional(obj, 'receivedErrorPacketsDelta', self.received_error_packets_delta)
+        self.set_optional(obj, 'receivedMulticastPacketsAccumulated', self.received_multicast_packets_accumulated)
+        self.set_optional(obj, 'receivedMulticastPacketsDelta', self.received_multicast_packets_delta)
+        self.set_optional(obj, 'receivedOctetsAccumulated', self.received_octets_accumulated)
+        self.set_optional(obj, 'receivedOctetsDelta', self.received_octets_delta)
+        self.set_optional(obj, 'receivedTotalPacketsAccumulated', self.received_total_packets_accumulated)
+        self.set_optional(obj, 'receivedTotalPacketsDelta', self.received_total_packets_delta)
+        self.set_optional(obj, 'receivedUnicastPacketsAccumulated', self.received_unicast_packets_accumulated)
+        self.set_optional(obj, 'receivedUnicastPacketsDelta', self.received_unicast_packets_delta)
+        self.set_optional(obj, 'transmittedBroadcastPacketsAccumulated', self.transmitted_broadcast_packets_accumulated)
+        self.set_optional(obj, 'transmittedBroadcastPacketsDelta', self.transmitted_broadcast_packets_delta)
+        self.set_optional(obj, 'transmittedDiscardedPacketsAccumulated', self.transmitted_discarded_packets_accumulated)
+        self.set_optional(obj, 'transmittedDiscardedPacketsDelta', self.transmitted_discarded_packets_delta)
+        self.set_optional(obj, 'transmittedErrorPacketsAccumulated', self.transmitted_error_packets_accumulated)
+        self.set_optional(obj, 'transmittedErrorPacketsDelta', self.transmitted_error_packets_delta)
+        self.set_optional(obj, 'transmittedMulticastPacketsAccumulated', self.transmitted_multicast_packets_accumulated)
+        self.set_optional(obj, 'transmittedMulticastPacketsDelta', self.transmitted_multicast_packets_delta)
+        self.set_optional(obj, 'transmittedOctetsAccumulated', self.transmitted_octets_accumulated)
+        self.set_optional(obj, 'transmittedOctetsDelta', self.transmitted_octets_delta)
+        self.set_optional(obj, 'transmittedTotalPacketsAccumulated', self.transmitted_total_packets_accumulated)
+        self.set_optional(obj, 'transmittedTotalPacketsDelta', self.transmitted_total_packets_delta)
+        self.set_optional(obj, 'transmittedUnicastPacketsAccumulated', self.transmitted_unicast_packets_accumulated)
+        self.set_optional(obj, 'transmittedUnicastPacketsDelta', self.transmitted_unicast_packets_delta)
+        return obj
+
+class CpuUsage(VESDataType):
+    """cpuUsage datatype"""
+
+    def __init__(self, identifier):
+        self.cpu_identifier = identifier
+        self.cpu_idle = None
+        self.cpu_usage_interrupt = None
+        self.cpu_usage_nice = None
+        self.cpu_usage_soft_irq = None
+        self.cpu_usage_steal = None
+        self.cpu_usage_system = None
+        self.cpu_usage_user = None
+        self.cpu_wait = None
+        self.percent_usage = 0
+
+    def get_obj(self):
+        obj = {
+            # required
+            'cpuIdentifier' : self.cpu_identifier,
+            'percentUsage' : self.percent_usage
+        }
+        # optional
+        self.set_optional(obj, 'cpuIdle', self.cpu_idle)
+        self.set_optional(obj, 'cpuUsageInterrupt', self.cpu_usage_interrupt)
+        self.set_optional(obj, 'cpuUsageNice', self.cpu_usage_nice)
+        self.set_optional(obj, 'cpuUsageSoftIrq', self.cpu_usage_soft_irq)
+        self.set_optional(obj, 'cpuUsageSteal', self.cpu_usage_steal)
+        self.set_optional(obj, 'cpuUsageSystem', self.cpu_usage_system)
+        self.set_optional(obj, 'cpuUsageUser', self.cpu_usage_user)
+        self.set_optional(obj, 'cpuWait', self.cpu_wait)
+        return obj
+
+class MemoryUsage(VESDataType):
+    """memoryUsage datatype"""
+
+    def __init__(self, identifier):
+        self.memory_buffered = None
+        self.memory_cached = None
+        self.memory_configured = None
+        self.memory_free = None
+        self.memory_slab_recl = None
+        self.memory_slab_unrecl = None
+        self.memory_used = None
+        self.vm_identifier = identifier
+
+    def __str__(self):
+        """ for debug purposes """
+        return 'vm_identifier : {vm_identifier}\nbuffered : {buffered}\n'\
+            'cached : {cached}\nconfigured : {configured}\nfree : {free}\n'\
+            'slab_recl : {slab_recl}\nslab_unrecl : {slab_unrecl}\n'\
+            'used : {used}\n'.format(buffered = self.memory_buffered,
+            cached = self.memory_cached, configured = self.memory_configured,
+            free = self.memory_free, slab_recl = self.memory_slab_recl,
+            slab_unrecl = self.memory_slab_unrecl, used = self.memory_used,
+            vm_identifier = self.vm_identifier)
+
+    def get_memory_free(self):
+        if self.memory_free is None:
+            # calculate the free memory
+            if None not in (self.memory_configured, self.memory_used):
+                return self.memory_configured - self.memory_used
+            else:
+                # required field, so return zero
+                return 0
+        else:
+            return self.memory_free
+
+    def get_memory_used(self):
+        if self.memory_used is None:
+            # calculate the memory used
+            if None not in (self.memory_configured, self.memory_free, self.memory_buffered,
+                self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
+                return self.memory_configured - (self.memory_free +
+                    self.memory_buffered + self.memory_cached +
+                    self.memory_slab_recl + self.memory_slab_unrecl)
+            else:
+                # required field, so return zero
+                return 0
+        else:
+            return self.memory_used
+
+    def get_memory_total(self):
+        if self.memory_configured is None:
+            # calculate the total memory
+            if None not in (self.memory_used, self.memory_free, self.memory_buffered,
+                self.memory_cached, self.memory_slab_recl, self.memory_slab_unrecl):
+                return (self.memory_used + self.memory_free +
+                    self.memory_buffered + self.memory_cached +
+                    self.memory_slab_recl + self.memory_slab_unrecl)
+            else:
+                return None
+        else:
+            return self.memory_configured
+
+    def get_obj(self):
+        obj = {
+            # required fields
+            'memoryFree' : self.get_memory_free(),
+            'memoryUsed' : self.get_memory_used(),
+            'vmIdentifier' : self.vm_identifier
         }
+        # optional fields
+        self.set_optional(obj, 'memoryBuffered', self.memory_buffered)
+        self.set_optional(obj, 'memoryCached', self.memory_cached)
+        self.set_optional(obj, 'memoryConfigured', self.memory_configured)
+        self.set_optional(obj, 'memorySlabRecl', self.memory_slab_recl)
+        self.set_optional(obj, 'memorySlabUnrecl', self.memory_slab_unrecl)
+        return obj
 
 class MeasurementsForVfScaling(Event):
     """MeasurementsForVfScaling datatype"""
@@ -111,79 +408,69 @@ class MeasurementsForVfScaling(Event):
         super(MeasurementsForVfScaling, self).__init__()
         # common attributes
         self.domain = "measurementsForVfScaling"
+        self.event_type = 'hostOS'
         self.event_id = event_id
         # measurement attributes
         self.additional_measurements = []
-        self.aggregate_cpu_usage = 0
         self.codec_usage_array = []
         self.concurrent_sessions = 0
         self.configured_entities = 0
         self.cpu_usage_array = []
-        self.errors = {
-            "receiveDiscards" : 0,
-            "receiveErrors" : 0,
-            "transmitDiscards" : 0,
-            "transmitErrors" : 0
-        }
         self.feature_usage_array = []
         self.filesystem_usage_array = []
         self.latency_distribution = []
         self.mean_request_latency = 0
-        self.measurement_fields_version = 1.1
         self.measurement_interval = 0
-        self.memory_configured = 0
-        self.memory_used = 0
         self.number_of_media_ports_in_use = 0
         self.request_rate = 0
         self.vnfc_scaling_metric = 0
-        self.v_nic_usage_array = []
-
-    def add_measurement_group(self, group):
-        self.additional_measurements.append(group.get_obj())
-
-    def add_cpu_usage(self, cpu_identifier, usage):
-        self.cpu_usage_array.append({
-            'cpuIdentifier' : cpu_identifier,
-            'percentUsage' : usage
-        })
-
-    def add_v_nic_usage(self, if_name, if_pkts, if_bytes):
-        self.v_nic_usage_array.append({
-            'broadcastPacketsIn' : 0.0,
-            'broadcastPacketsOut' : 0.0,
-            'multicastPacketsIn' : 0.0,
-            'multicastPacketsOut' : 0.0,
-            'unicastPacketsIn' : 0.0,
-            'unicastPacketsOut' : 0.0,
-            'vNicIdentifier' : if_name,
-            'packetsIn' : if_pkts[0],
-            'packetsOut' : if_pkts[1],
-            'bytesIn' : if_bytes[0],
-            'bytesOut' : if_bytes[1]
-        })
+        self.additional_fields = []
+        self.additional_objects = []
+        self.disk_usage_array = []
+        self.measurements_for_vf_scaling_version = 2.0
+        self.memory_usage_array = []
+        self.v_nic_performance_array = []
+
+    def add_additional_measurement(self, named_array):
+        self.additional_measurements.append(named_array.get_obj())
+
+    def add_additional_fields(self, field):
+        self.additional_fields.append(field.get_obj())
+
+    def add_memory_usage(self, mem_usage):
+        self.memory_usage_array.append(mem_usage.get_obj())
+
+    def add_cpu_usage(self, cpu_usage):
+        self.cpu_usage_array.append(cpu_usage.get_obj())
+
+    def add_v_nic_performance(self, nic_performance):
+        self.v_nic_performance_array.append(nic_performance.get_obj())
+
+    def add_disk_usage(self, disk_usage):
+        self.disk_usage_array.append(disk_usage.get_obj())
 
     def get_obj(self):
         """Get the object of the datatype"""
         obj = {}
         obj['additionalMeasurements'] = self.additional_measurements
-        obj['aggregateCpuUsage'] = self.aggregate_cpu_usage
         obj['codecUsageArray'] = self.codec_usage_array
         obj['concurrentSessions'] = self.concurrent_sessions
         obj['configuredEntities'] = self.configured_entities
         obj['cpuUsageArray'] = self.cpu_usage_array
-        obj['errors'] = self.errors
         obj['featureUsageArray'] = self.feature_usage_array
         obj['filesystemUsageArray'] = self.filesystem_usage_array
         obj['latencyDistribution'] = self.latency_distribution
         obj['meanRequestLatency'] = self.mean_request_latency
-        obj['measurementFieldsVersion'] = self.measurement_fields_version
         obj['measurementInterval'] = self.measurement_interval
-        obj['memoryConfigured'] = self.memory_configured
-        obj['memoryUsed'] = self.memory_used
         obj['numberOfMediaPortsInUse'] = self.number_of_media_ports_in_use
         obj['requestRate'] = self.request_rate
         obj['vnfcScalingMetric'] = self.vnfc_scaling_metric
-        obj['vNicUsageArray'] = self.v_nic_usage_array
+        obj['additionalFields'] = self.additional_fields
+        obj['additionalObjects'] = self.additional_objects
+        obj['diskUsageArray'] = self.disk_usage_array
+        obj['measurementsForVfScalingVersion'] = self.measurements_for_vf_scaling_version
+        obj['memoryUsageArray'] = self.memory_usage_array
+        obj['vNicPerformanceArray'] = self.v_nic_performance_array
         return obj
 
     def get_name(self):
@@ -209,6 +496,7 @@ class Fault(Event):
         self.vf_status = 'Active'
         self.alarm_interface_a = ''
         self.alarm_additional_information = []
+        self.event_category = ""
 
     def get_name(self):
         """Name of datatype"""
@@ -225,6 +513,7 @@ class Fault(Event):
         obj['vfStatus'] = self.vf_status
         obj['alarmInterfaceA'] = self.alarm_interface_a
         obj['alarmAdditionalInformation'] = self.alarm_additional_information
+        obj['eventCategory'] = self.event_category
         return obj
 
 class VESPlugin(object):
@@ -234,7 +523,6 @@ class VESPlugin(object):
         """Plugin initialization"""
         self.__plugin_data_cache = {
             'cpu' : {'interval' : 0.0, 'vls' : []},
-            'cpu-aggregation' : {'interval' : 0.0, 'vls' : []},
             'virt' : {'interval' : 0.0, 'vls' : []},
             'disk' : {'interval' : 0.0, 'vls' : []},
             'interface' : {'interval' : 0.0, 'vls' : []},
@@ -250,8 +538,7 @@ class VESPlugin(object):
             'UseHttps' : False,
             'SendEventInterval' : 20.0,
             'FunctionalRole' : 'Collectd VES Agent',
-            'GuestRunning' : False,
-            'ApiVersion' : 5.1              
+            'ApiVersion' : 5.1
         }
         self.__host_name = None
         self.__ves_timer = None
@@ -311,172 +598,176 @@ class VESPlugin(object):
         except:
             collectd.error('Vendor Event Listener unknown error')
 
-    def bytes_to_gb(self, bytes):
-        """Convert bytes to GB"""
-        return round((bytes / 1073741824.0), 3)
+    def bytes_to_kb(self, bytes):
+        """Convert bytes to kibibytes"""
+        return round((bytes / 1024.0), 3)
 
     def get_hostname(self):
         if len(self.__host_name):
             return self.__host_name
         return socket.gethostname()
 
+    def send_host_measurements(self):
+        # get list of all VMs
+        virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
+                                               mark_as_read=False)
+        vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
+        for vm_name in vm_names:
+            # make sure that 'virt' plugin cache is up-to-date
+            vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
+                                             mark_as_read=False)
+            us_up_to_date = True
+            for vm_value in vm_values:
+                if vm_value['updated'] == False:
+                    us_up_to_date = False
+                    break
+            if not us_up_to_date:
+                    # one of the cache value is not up-to-date, break
+                    collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name))
+                    continue
+            # values are up-to-date, create an event message
+            measurement = MeasurementsForVfScaling(self.get_event_id())
+            measurement.functional_role = self.__plugin_config['FunctionalRole']
+            # fill out reporting_entity
+            measurement.reporting_entity_id = self.get_hostname()
+            measurement.reporting_entity_name = measurement.reporting_entity_id
+            # set source as a host value
+            measurement.source_id = vm_name
+            measurement.source_name = measurement.source_id
+            # fill out EpochMicrosec (convert to us)
+            measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
+            # plugin interval
+            measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
+            # memoryUsage
+            mem_usage = MemoryUsage(vm_name)
+            memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                type_name='memory', type_instance='total')
+            memory_unused = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                              type_name='memory', type_instance='unused')
+            memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                              type_name='memory', type_instance='rss')
+            if len(memory_total) > 0:
+                mem_usage.memory_configured = self.bytes_to_kb(memory_total[0]['values'][0])
+            if len(memory_unused) > 0:
+                mem_usage.memory_free = self.bytes_to_kb(memory_unused[0]['values'][0])
+            elif len(memory_rss) > 0:
+                mem_usage.memory_free = self.bytes_to_kb(memory_rss[0]['values'][0])
+            # since, "used" metric is not provided by virt plugn, set the rest of the memory stats
+            # to zero to calculate used based on provided stats only
+            mem_usage.memory_buffered = mem_usage.memory_cached = mem_usage.memory_slab_recl = \
+            mem_usage.memory_slab_unrecl = 0
+            measurement.add_memory_usage(mem_usage)
+            # cpuUsage
+            virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
+                                              plugin_name='virt', type_name='virt_vcpu')
+            for virt_vcpu in virt_vcpus:
+                cpu_usage = CpuUsage(virt_vcpu['type_instance'])
+                cpu_usage.percent_usage = self.cpu_ns_to_percentage(virt_vcpu)
+                measurement.add_cpu_usage(cpu_usage)
+            # vNicPerformance
+            if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                              type_name='if_packets', mark_as_read=False)
+            if_names = [x['type_instance'] for x in if_packets]
+            for if_name in if_names:
+                if_packets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                  type_name='if_packets', type_instance=if_name)
+                if_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                  type_name='if_octets', type_instance=if_name)
+                if_errors = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                  type_name='if_errors', type_instance=if_name)
+                if_dropped = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                  type_name='if_dropped', type_instance=if_name)
+                v_nic_performance = VNicPerformance(if_name)
+                v_nic_performance.received_total_packets_accumulated = if_packets[0]['values'][0]
+                v_nic_performance.transmitted_total_packets_accumulated = if_packets[0]['values'][1]
+                v_nic_performance.received_octets_accumulated = if_octets[0]['values'][0]
+                v_nic_performance.transmitted_octets_accumulated = if_octets[0]['values'][1]
+                v_nic_performance.received_error_packets_accumulated = if_errors[0]['values'][0]
+                v_nic_performance.transmitted_error_packets_accumulated = if_errors[0]['values'][1]
+                v_nic_performance.received_discarded_packets_accumulated = if_dropped[0]['values'][0]
+                v_nic_performance.transmitted_discarded_packets_accumulated = if_dropped[0]['values'][1]
+                measurement.add_v_nic_performance(v_nic_performance)
+            # diskUsage
+            disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                               type_name='disk_octets', mark_as_read=False)
+            disk_names = [x['type_instance'] for x in disk_octets]
+            for disk_name in disk_names:
+                disk_octets = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                   type_name='disk_octets', type_instance=disk_name)
+                disk_ops = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                                   type_name='disk_ops', type_instance=disk_name)
+                disk_usage = DiskUsage(disk_name)
+                disk_usage.disk_octets_read_last = disk_octets[0]['values'][0]
+                disk_usage.disk_octets_write_last = disk_octets[0]['values'][1]
+                disk_usage.disk_ops_read_last = disk_ops[0]['values'][0]
+                disk_usage.disk_ops_write_last = disk_ops[0]['values'][1]
+                measurement.add_disk_usage(disk_usage)
+            # add additional measurements (perf)
+            perf_values = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
+                                               type_name='perf')
+            named_array = NamedArrayOfFields('perf')
+            for perf in perf_values:
+                named_array.add(Field(perf['type_instance'], str(perf['values'][0])))
+            measurement.add_additional_measurement(named_array)
+            # add host values as additional measurements
+            self.set_additional_fields(measurement, exclude_plugins=['virt'])
+            # send event to the VES
+            self.event_send(measurement)
+        if len(vm_names) > 0:
+          # mark the additional measurements metrics as read
+          self.mark_cache_values_as_read(exclude_plugins=['virt'])
+
     def event_timer(self):
         """Event timer thread"""
         self.lock()
         try:
-            if (self.__plugin_config['GuestRunning']):
-                # if we running on a guest only, send 'additionalMeasurements' only
-                measurement = MeasurementsForVfScaling(self.get_event_id())
-                measurement.functional_role = self.__plugin_config['FunctionalRole']
-                # add host/guest values as additional measurements
-                self.fill_additional_measurements(measurement, exclude_plugins=[
-                    'cpu', 'cpu-aggregation', 'memory', 'disk', 'interface', 'virt'])
-                # fill out reporting & source entities
-                reporting_entity = self.get_hostname()
-                measurement.reporting_entity_id = reporting_entity
-                measurement.reporting_entity_name = reporting_entity
-                measurement.source_id = reporting_entity
-                measurement.source_name = measurement.source_id
-                measurement.start_epoch_microsec = (time.time() * 1000000)
-                measurement.measurement_interval = self.__plugin_config['SendEventInterval']
-                # total CPU
-                total_cpu_system = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='system')
-                total_cpu_user = self.cache_get_value(plugin_name='cpu-aggregation', type_instance='user')
-                measurement.aggregate_cpu_usage = round(total_cpu_system[0]['values'][0] +
-                                                    total_cpu_user[0]['values'][0], 2)
-                # CPU per each instance
-                cpux_system = self.cache_get_value(plugin_name='cpu', type_instance='system',
-                                                  mark_as_read = False)
-                for cpu_inst in [x['plugin_instance'] for x in cpux_system]:
-                    cpu_system = self.cache_get_value(plugin_name='cpu',
-                                                      plugin_instance=cpu_inst, type_instance='system')
-                    cpu_user = self.cache_get_value(plugin_name='cpu',
-                                                      plugin_instance=cpu_inst, type_instance='user')
-                    cpu_usage = round(cpu_system[0]['values'][0] + cpu_user[0]['values'][0], 2)
-                    measurement.add_cpu_usage(cpu_inst, cpu_usage)
-                # fill memory used
-                memory_used = self.cache_get_value(plugin_name='memory', type_name='memory', type_instance='used')
-                if len(memory_used) > 0:
-                    measurement.memory_used = self.bytes_to_gb(memory_used[0]['values'][0])
-                # if_packets
-                ifinfo = {}
-                if_stats = self.cache_get_value(plugin_name='interface', type_name='if_packets')
-                if len(if_stats) > 0:
-                    for if_stat in if_stats:
-                        ifinfo[if_stat['plugin_instance']] = {
-                            'pkts' : (if_stat['values'][0], if_stat['values'][1])
-                        }
-                # go through all interfaces and get if_octets
-                for if_name in ifinfo.keys():
-                    if_stats = self.cache_get_value(plugin_instance=if_name, plugin_name='interface',
-                                                    type_name='if_octets')
-                    if len(if_stats) > 0:
-                        ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
-                # fill vNicUsageArray filed in the event
-                for if_name in ifinfo.keys():
-                    measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
-                # send event to the VES
-                self.event_send(measurement)
-                return
-            # get list of all VMs
-            virt_vcpu_total = self.cache_get_value(plugin_name='virt', type_name='virt_cpu_total',
-                                                   mark_as_read=False)
-            vm_names = [x['plugin_instance'] for x in virt_vcpu_total]
-            for vm_name in vm_names:
-                # make sure that 'virt' plugin cache is up-to-date
-                vm_values = self.cache_get_value(plugin_name='virt', plugin_instance=vm_name,
-                                                 mark_as_read=False)
-                us_up_to_date = True
-                for vm_value in vm_values:
-                    if vm_value['updated'] == False:
-                        us_up_to_date = False
-                        break
-                if not us_up_to_date:
-                        # one of the cache value is not up-to-date, break
-                        collectd.warning("virt collectD cache values are not up-to-date for {}".format(vm_name))
-                        continue
-                # if values are up-to-date, create an event message
-                measurement = MeasurementsForVfScaling(self.get_event_id())
-                measurement.functional_role = self.__plugin_config['FunctionalRole']
-                # fill out reporting_entity
-                reporting_entity = '{}-{}-{}'.format(self.get_hostname(), 'virt', vm_name)
-                measurement.reporting_entity_id = reporting_entity
-                measurement.reporting_entity_name = reporting_entity
-                # virt_vcpu
-                virt_vcpus = self.cache_get_value(plugin_instance=vm_name,
-                                                  plugin_name='virt', type_name='virt_vcpu')
-                if len(virt_vcpus) > 0:
-                    for virt_vcpu in virt_vcpus:
-                        cpu_usage = self.cpu_ns_to_percentage(virt_vcpu)
-                        measurement.add_cpu_usage(virt_vcpu['type_instance'], cpu_usage)
-                # virt_cpu_total
-                virt_vcpu_total = self.cache_get_value(plugin_instance=vm_name,
-                                                       plugin_name='virt', type_name='virt_cpu_total')
-                if len(virt_vcpu_total) > 0:
-                    average_cpu_usage = self.cpu_ns_to_percentage(virt_vcpu_total[0]) / len(virt_vcpus)
-                    measurement.aggregate_cpu_usage = average_cpu_usage
-                    # set source as a host for virt_vcpu_total value
-                    measurement.source_id = virt_vcpu_total[0]['host']
-                    measurement.source_name = measurement.source_id
-                    # fill out EpochMicrosec (convert to us)
-                    measurement.start_epoch_microsec = (virt_vcpu_total[0]['time'] * 1000000)
-                # plugin interval
-                measurement.measurement_interval = self.__plugin_data_cache['virt']['interval']
-                # memory-total
-                memory_total = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                    type_name='memory', type_instance='total')
-                if len(memory_total) > 0:
-                    measurement.memory_configured = self.bytes_to_gb(memory_total[0]['values'][0])
-                # memory-rss
-                memory_rss = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                  type_name='memory', type_instance='rss')
-                if len(memory_rss) > 0:
-                    measurement.memory_used = self.bytes_to_gb(memory_rss[0]['values'][0])
-                # if_packets
-                ifinfo = {}
-                if_stats = self.cache_get_value(plugin_instance=vm_name,
-                                                plugin_name='virt', type_name='if_packets')
-                if len(if_stats) > 0:
-                    for if_stat in if_stats:
-                        ifinfo[if_stat['type_instance']] = {
-                            'pkts' : (if_stat['values'][0], if_stat['values'][1])
-                        }
-                # go through all interfaces and get if_octets
-                for if_name in ifinfo.keys():
-                    if_stats = self.cache_get_value(plugin_instance=vm_name, plugin_name='virt',
-                                                    type_name='if_octets', type_instance=if_name)
-                    if len(if_stats) > 0:
-                        ifinfo[if_name]['bytes'] = (if_stats[0]['values'][0], if_stats[0]['values'][1])
-                # fill vNicUsageArray filed in the event
-                for if_name in ifinfo.keys():
-                    measurement.add_v_nic_usage(if_name, ifinfo[if_name]['pkts'], ifinfo[if_name]['bytes'])
-                # add host/guest values as additional measurements
-                self.fill_additional_measurements(measurement, ['virt'])
-                # send event to the VES
-                self.event_send(measurement)
+            self.send_host_measurements()
         finally:
             self.unlock()
 
-    def fill_additional_measurements(self, measurement, exclude_plugins=None):
-        """Fill out addition measurement filed with host/guets values"""
+    def mark_cache_values_as_read(self, exclude_plugins=None):
+        """mark the cache values as read"""
+        for plugin_name in self.__plugin_data_cache.keys():
+            if (exclude_plugins != None and plugin_name in exclude_plugins):
+                # skip excluded plugins
+                continue;
+            for val in self.__plugin_data_cache[plugin_name]['vls']:
+                val['updated'] = False
+
+    def set_additional_measurements(self, measurement, exclude_plugins=None):
+        """Set addition measurement filed with host/guets values"""
         # add host/guest values as additional measurements
         for plugin_name in self.__plugin_data_cache.keys():
             if (exclude_plugins != None and plugin_name in exclude_plugins):
-                # skip host-only values
+                # skip excluded plugins
                 continue;
             for val in self.__plugin_data_cache[plugin_name]['vls']:
                 if val['updated']:
-                    mgroup_name = '{}{}{}'.format(plugin_name,
-                        '-{}'.format(val['plugin_instance']) if len(val['plugin_instance']) else '',
-                        '-{}'.format(val['type_instance']) if len(val['type_instance']) else '')
-                    mgroup = MeasurementGroup(mgroup_name)
+                    array_name = self.make_dash_string(plugin_name, val['plugin_instance'],
+                                                       val['type_instance'])
+                    named_array = NamedArrayOfFields(array_name)
                     ds = collectd.get_dataset(val['type'])
                     for index in range(len(ds)):
                         mname = '{}-{}'.format(val['type'], ds[index][0])
-                        mgroup.add_measurement(mname, str(val['values'][index]))
-                    measurement.add_measurement_group(mgroup);
+                        named_array.add(Field(mname, str(val['values'][index])))
+                    measurement.add_additional_measurement(named_array);
                     val['updated'] = False
 
+    def set_additional_fields(self, measurement, exclude_plugins=None):
+        # set host values as additional fields
+        for plugin_name in self.__plugin_data_cache.keys():
+            if (exclude_plugins != None and plugin_name in exclude_plugins):
+                # skip excluded plugins
+                continue;
+            for val in self.__plugin_data_cache[plugin_name]['vls']:
+                if val['updated']:
+                    name_prefix = self.make_dash_string(plugin_name, val['plugin_instance'],
+                                                        val['type_instance'])
+                    ds = collectd.get_dataset(val['type'])
+                    for index in range(len(ds)):
+                        field_name = self.make_dash_string(name_prefix, val['type'], ds[index][0])
+                        measurement.add_additional_fields(Field(field_name, str(val['values'][index])))
+
     def cpu_ns_to_percentage(self, vl):
         """Convert CPU usage ns to CPU %"""
         total = vl['values'][0]
@@ -607,15 +898,21 @@ class VESPlugin(object):
         fault.functional_role = self.__plugin_config['FunctionalRole']
         fault.reporting_entity_id = self.get_hostname()
         fault.reporting_entity_name = self.get_hostname()
-        fault.source_id = self.get_hostname()
-        fault.source_name = self.get_hostname()
+        if n.plugin == 'virt':
+            # if the notification is generated by virt plugin,
+            # use the plugin_instance (e.g. VM name) as a source.
+            fault.source_id = str(n.plugin_instance)
+            fault.source_name = fault.source_id
+        else:
+            fault.source_id = self.get_hostname()
+            fault.source_name = self.get_hostname()
         fault.start_epoch_microsec = (n.time * 1000000)
         fault.last_epoch_micro_sec = fault.start_epoch_microsec
         # fill out fault header
         fault.event_severity = collectd_event_severity_map[n.severity]
         fault.specific_problem = self.make_dash_string(n.plugin_instance, n.type_instance)
         fault.alarm_interface_a = self.make_dash_string(n.plugin, n.plugin_instance)
-        fault.event_source_type = 'virtualMachine(8)' if self.__plugin_config['GuestRunning'] else 'host(3)'
+        fault.event_source_type = 'host(3)'
         fault.alarm_condition = n.message
         self.event_send(fault)
 
diff --git a/3rd_party/ovs_pmd_stats/ovs_pmd_stats.py b/3rd_party/ovs_pmd_stats/ovs_pmd_stats.py
new file mode 100755 (executable)
index 0000000..fc6045b
--- /dev/null
@@ -0,0 +1,109 @@
+#!/usr/bin/env python
+#
+# Copyright(c) 2017 Intel Corporation. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Authors:
+#   Roman Korynkevych <romanx.korynkevych@intel.com>
+
+import socket
+import argparse
+import json
+import logging
+
+HOSTNAME = socket.gethostname()
+PROG_NAME = 'ovs_pmd_stats'
+TYPE = 'counter'
+
+MAIN_THREAD = 'main thread'
+PMD_THREAD = 'pmd thread'
+
+REQUEST_MESSAGE = '{"id":0,"method":"dpif-netdev/pmd-stats-show","params":[]}'
+RESPONSE_MESSAGE_TIMEOUT = 1.0
+
+# Setup arguments
+parser = argparse.ArgumentParser(prog=PROG_NAME)
+parser.add_argument('--socket-pid-file', required=True, help='ovs-vswitchd.pid file location')
+args = parser.parse_args()
+
+try:
+    fp = open(args.socket_pid_file, 'r')
+    pid = fp.readline()
+    fp.close()
+except IOError as e:
+    logging.error('I/O error({}): {}'.format(e.errno, e.strerror))
+    raise SystemExit()
+except:
+    logging.error('Unexpected error:', sys.exc_info()[0])
+    raise SystemExit()
+
+server_address = args.socket_pid_file.replace('.pid', '.{}.ctl'.format(pid.strip()))
+
+# open unix socket to ovs-vswitch
+sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
+try:
+    sock.connect(server_address)
+except socket.error as msg:
+    logging.error('Socket address: {} Error: {}'.format(server_address, msg))
+    raise SystemExit()
+
+# set timeout
+sock.settimeout(RESPONSE_MESSAGE_TIMEOUT)
+
+# send request
+sock.sendall(REQUEST_MESSAGE)
+
+# listen for respnse message
+rdata = ''
+while True:
+    try:
+        rdata += sock.recv(4096)
+
+        if rdata.count('{') == rdata.count('}'):
+          break
+    except socket.timeout:
+        logging.error('Response message has not been received in {} sec.'.format(RESPONSE_MESSAGE_TIMEOUT))
+        raise SystemExit()
+    except socket.error as e:
+        logging.error('Error received while reading: {}'.format(e.strerror))
+        raise SystemExit()
+
+# parse the message
+try:
+    s = json.loads(rdata, strict=False)
+except ValueError as e:
+    logging.error('Failed to parse JSON response: {}'.format(e.strerror))
+    raise SystemExit()
+
+# check for key string presence in the string
+if 'result' not in s or 'id' not in s or 'error' not in s:
+    logging.error("One of the keys: ['id'], ['result'], ['error'] is missed in the response")
+    logging.error('Msg: {}'.format(s))
+    raise SystemExit()
+
+array = s['result'].replace('\t', '').splitlines()
+
+# submit metrics in collectd format
+plugin_instance = ''
+for el in array:
+    if MAIN_THREAD in el or PMD_THREAD in el:
+        plugin_instance = el[:-1].replace(' ', '_')
+    else:
+        type_instance = el.split(':')[0].replace(' ', "_")
+        value = el.split(':')[1].split(' ')[0]
+        print('PUTVAL %s/%s-%s/%s-%s N:%s' % (HOSTNAME, PROG_NAME, plugin_instance, TYPE, type_instance, value))
+
+# close socket
+sock.close()
+
diff --git a/baro_tests/barometer.py b/baro_tests/barometer.py
new file mode 100644 (file)
index 0000000..e210f33
--- /dev/null
@@ -0,0 +1,24 @@
+#!/usr/bin/env python
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+
+import logging
+
+from baro_tests import collectd
+
+import functest.core.feature as base
+
+
+class BarometerCollectd(base.Feature):
+    '''
+    Class for executing barometercollectd testcase.
+    '''
+
+    __logger = logging.getLogger(__name__)
+
+    def execute(self):
+        return collectd.main(self.__logger)
index 2190d07..086aa97 100755 (executable)
@@ -1,5 +1,5 @@
 #!/bin/bash
-# Copyright 2017 Intel Corporation
+# Copyright 2017 Intel Corporation and OPNFV
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ source $DIR/package-list.sh
 
 VERSION="VERSION_NOT_SET"
 
+rm -rf $RPM_WORKDIR
 cd $COLLECTD_DIR
 VERSION=$( $COLLECTD_DIR/version-gen.sh | sed "s/\W$//g" )
 $COLLECTD_DIR/build.sh
@@ -36,5 +37,4 @@ sed   --regexp-extended \
        --expression="s/without_intel_rdt:[0-9]/without_intel_rdt:1/g" \
        $COLLECTD_DIR/contrib/redhat/collectd.spec
 
-rpmbuild --define "_topdir $RPM_WORKDIR" -bb $COLLECTD_DIR/contrib/redhat/collectd.spec
-gsutil -m cp -r $RPM_WORKDIR/RPMS/* gs://artifacts.opnfv.org/barometer/rpms
+rpmbuild --define "_topdir $RPM_WORKDIR" -bb $COLLECTD_DIR/contrib/redhat/collectd.spec
\ No newline at end of file