Merge "Bugfix: ha test case criteria pass when sla not pass"
[yardstick.git] / yardstick / network_services / nfvi / resource.py
index ce09b65..055fdba 100644 (file)
@@ -27,7 +27,7 @@ from oslo_config import cfg
 
 from yardstick import ssh
 from yardstick.network_services.nfvi.collectd import AmqpConsumer
-from yardstick.network_services.utils import provision_tool
+from yardstick.network_services.utils import get_nsb_option
 
 LOG = logging.getLogger(__name__)
 
@@ -35,7 +35,7 @@ CONF = cfg.CONF
 ZMQ_OVS_PORT = 5567
 ZMQ_POLLING_TIME = 12000
 LIST_PLUGINS_ENABLED = ["amqp", "cpu", "cpufreq", "intel_rdt", "memory",
-                        "hugepages", "dpdkstat", "virt", "ovs_stats"]
+                        "hugepages", "dpdkstat", "virt", "ovs_stats", "intel_pmu"]
 
 
 class ResourceProfile(object):
@@ -73,18 +73,18 @@ class ResourceProfile(object):
 
     @classmethod
     def parse_simple_resource(cls, key, value):
-        return {'/'.join(key): value.split(":")[1]}
+        reskey = "/".join(rkey for rkey in key if "nsb_stats" not in rkey)
+        return {reskey: value.split(":")[1]}
 
     @classmethod
-    def get_cpu_data(cls, key_split, value):
+    def get_cpu_data(cls, res_key0, res_key1, value):
         """ Get cpu topology of the host """
         pattern = r"-(\d+)"
-        if "cpufreq" in key_split[0]:
-            metric = key_split[0]
-            source = key_split[1]
+
+        if 'cpufreq' in res_key0:
+            metric, source = res_key0, res_key1
         else:
-            metric = key_split[1]
-            source = key_split[0]
+            metric, source = res_key1, res_key0
 
         match = re.search(pattern, source, re.MULTILINE)
         if not match:
@@ -109,6 +109,10 @@ class ResourceProfile(object):
     def parse_ovs_stats(cls, key, value):
         return cls.parse_simple_resource(key, value)
 
+    @classmethod
+    def parse_intel_pmu_stats(cls, key, value):
+        return {''.join(key): value.split(":")[1]}
+
     def parse_collectd_result(self, metrics, core_list):
         """ convert collectd data into json"""
         result = {
@@ -118,6 +122,7 @@ class ResourceProfile(object):
             "dpdkstat": {},
             "virt": {},
             "ovs_stats": {},
+            "intel_pmu": {},
         }
         testcase = ""
 
@@ -128,7 +133,8 @@ class ResourceProfile(object):
             res_key1 = next(res_key_iter)
 
             if "cpu" in res_key0 or "intel_rdt" in res_key0:
-                cpu_key, name, metric, testcase = self.get_cpu_data(key_split, value)
+                cpu_key, name, metric, testcase = \
+                    self.get_cpu_data(res_key0, res_key1, value)
                 if cpu_key in core_list:
                     result["cpu"].setdefault(cpu_key, {}).update({name: metric})
 
@@ -136,16 +142,19 @@ class ResourceProfile(object):
                 result["memory"].update({res_key1: value.split(":")[0]})
 
             elif "hugepages" in res_key0:
-                result["hugepages"].update(self.parse_hugepages(key, value))
+                result["hugepages"].update(self.parse_hugepages(key_split, value))
 
             elif "dpdkstat" in res_key0:
-                result["dpdkstat"].update(self.parse_dpdkstat(key, value))
+                result["dpdkstat"].update(self.parse_dpdkstat(key_split, value))
 
             elif "virt" in res_key1:
-                result["virt"].update(self.parse_virt(key, value))
+                result["virt"].update(self.parse_virt(key_split, value))
 
             elif "ovs_stats" in res_key0:
-                result["ovs_stats"].update(self.parse_ovs_stats(key, value))
+                result["ovs_stats"].update(self.parse_ovs_stats(key_split, value))
+
+            elif "intel_pmu-all" in res_key0:
+                result["intel_pmu"].update(self.parse_intel_pmu_stats(res_key1, value))
 
         result["timestamp"] = testcase
 
@@ -153,13 +162,16 @@ class ResourceProfile(object):
 
     def amqp_process_for_nfvi_kpi(self):
         """ amqp collect and return nfvi kpis """
-        if self.amqp_client is None:
+        if self.amqp_client is None and self.enable:
             self.amqp_client = \
                 multiprocessing.Process(target=self.run_collectd_amqp)
             self.amqp_client.start()
 
     def amqp_collect_nfvi_kpi(self):
         """ amqp collect and return nfvi kpis """
+        if not self.enable:
+            return {}
+
         metric = {}
         while not self._queue.empty():
             metric.update(self._queue.get())
@@ -188,16 +200,25 @@ class ResourceProfile(object):
             "loadplugin": loadplugin,
             "dpdk_interface": interfaces,
         }
-
         self._provide_config_file(bin_path, 'collectd.conf', kwargs)
 
     def _start_collectd(self, connection, bin_path):
         LOG.debug("Starting collectd to collect NFVi stats")
-        # temp disable
-        return
         connection.execute('sudo pkill -9 collectd')
-        collectd = os.path.join(bin_path, "collectd.sh")
-        provision_tool(connection, collectd)
+        bin_path = get_nsb_option("bin_path")
+        collectd_path = os.path.join(bin_path, "collectd", "collectd")
+        exit_status = connection.execute("which %s > /dev/null 2>&1" % collectd_path)[0]
+        if exit_status != 0:
+            LOG.warning("%s is not present disabling", collectd_path)
+            # disable auto-provisioning because it requires Internet access
+            # collectd_installer = os.path.join(bin_path, "collectd.sh")
+            # provision_tool(connection, collectd)
+            # http_proxy = os.environ.get('http_proxy', '')
+            # https_proxy = os.environ.get('https_proxy', '')
+            # connection.execute("sudo %s '%s' '%s'" % (
+            #     collectd_installer, http_proxy, https_proxy))
+            return
+        LOG.debug("Starting collectd to collect NFVi stats")
         self._prepare_collectd_conf(bin_path)
 
         # Reset amqp queue
@@ -209,15 +230,14 @@ class ResourceProfile(object):
         connection.execute("sudo rabbitmqctl start_app")
         connection.execute("sudo service rabbitmq-server restart")
 
-        # Run collectd
+        LOG.debug("Creating amdin user for rabbitmq in order to collect data from collectd")
+        connection.execute("sudo rabbitmqctl delete_user guest")
+        connection.execute("sudo rabbitmqctl add_user admin admin")
+        connection.execute("sudo rabbitmqctl authenticate_user admin admin")
+        connection.execute("sudo rabbitmqctl set_permissions -p / admin \".*\" \".*\" \".*\"")
 
-        http_proxy = os.environ.get('http_proxy', '')
-        https_proxy = os.environ.get('https_proxy', '')
-        connection.execute("sudo %s '%s' '%s'" %
-                           (collectd, http_proxy, https_proxy))
         LOG.debug("Start collectd service.....")
-        connection.execute(
-            "sudo %s" % os.path.join(bin_path, "collectd", "collectd"))
+        connection.execute("sudo %s" % collectd_path)
         LOG.debug("Done")
 
     def initiate_systemagent(self, bin_path):