Merge "Added NSB descriptors for vCMTS testcase"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / prox_helpers.py
index 6d28f47..3507315 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2017 Intel Corporation
+# Copyright (c) 2018-2019 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,6 +21,7 @@ import re
 import select
 import socket
 import time
+
 from collections import OrderedDict, namedtuple
 from contextlib import contextmanager
 from itertools import repeat, chain
@@ -325,7 +326,28 @@ class ProxSocketHelper(object):
 
         return ret_str, False
 
-    def get_data(self, pkt_dump_only=False, timeout=0.01):
+    def get_string(self, pkt_dump_only=False, timeout=0.01):
+
+        def is_ready_string():
+            # recv() is blocking, so avoid calling it when no data is waiting.
+            ready = select.select([self._sock], [], [], timeout)
+            return bool(ready[0])
+
+        status = False
+        ret_str = ""
+        while status is False:
+            for status in iter(is_ready_string, False):
+                decoded_data = self._sock.recv(256).decode('utf-8')
+                ret_str, done = self._parse_socket_data(decoded_data,
+                                                        pkt_dump_only)
+                if (done):
+                    status = True
+                    break
+
+        LOG.debug("Received data from socket: [%s]", ret_str)
+        return status, ret_str
+
+    def get_data(self, pkt_dump_only=False, timeout=10.0):
         """ read data from the socket """
 
         # This method behaves slightly differently depending on whether it is
@@ -394,7 +416,6 @@ class ProxSocketHelper(object):
         """ stop all cores on the remote instance """
         LOG.debug("Stop all")
         self.put_command("stop all\n")
-        time.sleep(3)
 
     def stop(self, cores, task=''):
         """ stop specific cores on the remote instance """
@@ -406,7 +427,6 @@ class ProxSocketHelper(object):
 
         LOG.debug("Stopping cores %s", tmpcores)
         self.put_command("stop {} {}\n".format(join_non_strings(',', tmpcores), task))
-        time.sleep(3)
 
     def start_all(self):
         """ start all cores on the remote instance """
@@ -423,13 +443,11 @@ class ProxSocketHelper(object):
 
         LOG.debug("Starting cores %s", tmpcores)
         self.put_command("start {}\n".format(join_non_strings(',', tmpcores)))
-        time.sleep(3)
 
     def reset_stats(self):
         """ reset the statistics on the remote instance """
         LOG.debug("Reset stats")
         self.put_command("reset stats\n")
-        time.sleep(1)
 
     def _run_template_over_cores(self, template, cores, *args):
         for core in cores:
@@ -440,7 +458,6 @@ class ProxSocketHelper(object):
         LOG.debug("Set packet size for core(s) %s to %d", cores, pkt_size)
         pkt_size -= 4
         self._run_template_over_cores("pkt_size {} 0 {}\n", cores, pkt_size)
-        time.sleep(1)
 
     def set_value(self, cores, offset, value, length):
         """ set value on the remote instance """
@@ -544,50 +561,173 @@ class ProxSocketHelper(object):
             tsc = int(ret[3])
         return rx, tx, drop, tsc
 
-    def multi_port_stats(self, ports):
-        """get counter values from all ports port"""
+    def irq_core_stats(self, cores_tasks):
+        """ get IRQ stats per core"""
+
+        stat = {}
+        core = 0
+        task = 0
+        for core, task in cores_tasks:
+            self.put_command("stats task.core({}).task({}).max_irq,task.core({}).task({}).irq(0),"
+                             "task.core({}).task({}).irq(1),task.core({}).task({}).irq(2),"
+                             "task.core({}).task({}).irq(3),task.core({}).task({}).irq(4),"
+                             "task.core({}).task({}).irq(5),task.core({}).task({}).irq(6),"
+                             "task.core({}).task({}).irq(7),task.core({}).task({}).irq(8),"
+                             "task.core({}).task({}).irq(9),task.core({}).task({}).irq(10),"
+                             "task.core({}).task({}).irq(11),task.core({}).task({}).irq(12)"
+                             "\n".format(core, task, core, task, core, task, core, task,
+                                         core, task, core, task, core, task, core, task,
+                                         core, task, core, task, core, task, core, task,
+                                         core, task, core, task))
+            in_data_str = self.get_data().split(",")
+            ret = [try_int(s, 0) for s in in_data_str]
+            key = "core_" + str(core)
+            try:
+                stat[key] = {"cpu": core, "max_irq": ret[0], "bucket_0" : ret[1],
+                             "bucket_1" : ret[2], "bucket_2" : ret[3],
+                             "bucket_3" : ret[4], "bucket_4" : ret[5],
+                             "bucket_5" : ret[6], "bucket_6" : ret[7],
+                             "bucket_7" : ret[8], "bucket_8" : ret[9],
+                             "bucket_9" : ret[10], "bucket_10" : ret[11],
+                             "bucket_11" : ret[12], "bucket_12" : ret[13],
+                             "overflow": ret[10] + ret[11] + ret[12] + ret[13]}
+            except (KeyError, IndexError):
+                LOG.error("Corrupted PACKET %s", in_data_str)
+
+        return stat
 
-        ports_str = ""
-        for port in ports:
-            ports_str = ports_str + str(port) + ","
-        ports_str = ports_str[:-1]
+    def multi_port_stats(self, ports):
+        """get counter values from all  ports at once"""
 
+        ports_str = ",".join(map(str, ports))
         ports_all_data = []
         tot_result = [0] * len(ports)
 
-        retry_counter = 0
         port_index = 0
-        while (len(ports) is not len(ports_all_data)) and (retry_counter < 10):
+        while (len(ports) is not len(ports_all_data)):
             self.put_command("multi port stats {}\n".format(ports_str))
-            ports_all_data = self.get_data().split(";")
+            status, ports_all_data_str = self.get_string()
+
+            if not status:
+                return False, []
+
+            ports_all_data = ports_all_data_str.split(";")
 
             if len(ports) is len(ports_all_data):
                 for port_data_str in ports_all_data:
 
+                    tmpdata = []
                     try:
-                        tot_result[port_index] = [try_int(s, 0) for s in port_data_str.split(",")]
+                        tmpdata = [try_int(s, 0) for s in port_data_str.split(",")]
                     except (IndexError, TypeError):
-                        LOG.error("Port Index error %d  %s - retrying ", port_index, port_data_str)
-
-                    if (len(tot_result[port_index]) is not 6) or \
-                                    tot_result[port_index][0] is not ports[port_index]:
-                        ports_all_data = []
-                        tot_result = [0] * len(ports)
-                        port_index = 0
-                        time.sleep(0.1)
+                        LOG.error("Unpacking data error %s", port_data_str)
+                        return False, []
+
+                    if (len(tmpdata) < 6) or tmpdata[0] not in ports:
                         LOG.error("Corrupted PACKET %s - retrying", port_data_str)
-                        break
+                        return False, []
                     else:
+                        tot_result[port_index] = tmpdata
                         port_index = port_index + 1
             else:
                 LOG.error("Empty / too much data - retry -%s-", ports_all_data)
-                ports_all_data = []
-                tot_result = [0] * len(ports)
-                port_index = 0
-                time.sleep(0.1)
+                return False, []
 
-            retry_counter = retry_counter + 1
-        return tot_result
+        LOG.debug("Multi port packet ..OK.. %s", tot_result)
+        return True, tot_result
+
+    @staticmethod
+    def multi_port_stats_tuple(stats, ports):
+        """
+        Create a statistics tuple from port stats.
+
+        Returns a dict with contains the port stats indexed by port name
+
+        :param stats: (List) - List of List of port stats in pps
+        :param ports (Iterator) - to List of Ports
+
+        :return: (Dict) of port stats indexed by port_name
+        """
+
+        samples = {}
+        port_names = {}
+        try:
+            port_names = {port_num: port_name for port_name, port_num in ports}
+        except (TypeError, IndexError, KeyError):
+            LOG.critical("Ports are not initialized or number of port is ZERO ... CRITICAL ERROR")
+            return {}
+
+        try:
+            for stat in stats:
+                port_num = stat[0]
+                samples[port_names[port_num]] = {
+                    "in_packets": stat[1],
+                    "out_packets": stat[2]}
+        except (TypeError, IndexError, KeyError):
+            LOG.error("Ports data and samples data is incompatable ....")
+            return {}
+
+        return samples
+
+    @staticmethod
+    def multi_port_stats_diff(prev_stats, new_stats, hz):
+        """
+        Create a statistics tuple from difference between prev port stats
+        and current port stats. And store results in pps.
+
+        :param prev_stats: (List) - Previous List of port statistics
+        :param new_stats: (List) - Current List of port statistics
+        :param hz (float) - speed of system in Hz
+
+        :return: sample (List) - Difference of prev_port_stats and
+                new_port_stats  in pps
+        """
+
+        RX_TOTAL_INDEX = 1
+        TX_TOTAL_INDEX = 2
+        TSC_INDEX = 5
+
+        stats = []
+
+        if len(prev_stats) is not len(new_stats):
+            for port_index, stat in enumerate(new_stats):
+                stats.append([port_index, float(0), float(0), 0, 0, 0])
+            return stats
+
+        try:
+            for port_index, stat in enumerate(new_stats):
+                if stat[RX_TOTAL_INDEX] > prev_stats[port_index][RX_TOTAL_INDEX]:
+                    rx_total = stat[RX_TOTAL_INDEX] - \
+                               prev_stats[port_index][RX_TOTAL_INDEX]
+                else:
+                    rx_total = stat[RX_TOTAL_INDEX]
+
+                if stat[TX_TOTAL_INDEX] > prev_stats[port_index][TX_TOTAL_INDEX]:
+                    tx_total = stat[TX_TOTAL_INDEX] - prev_stats[port_index][TX_TOTAL_INDEX]
+                else:
+                    tx_total = stat[TX_TOTAL_INDEX]
+
+                if stat[TSC_INDEX] > prev_stats[port_index][TSC_INDEX]:
+                    tsc = stat[TSC_INDEX] - prev_stats[port_index][TSC_INDEX]
+                else:
+                    tsc = stat[TSC_INDEX]
+
+                if tsc is 0:
+                    rx_total = tx_total = float(0)
+                else:
+                    if hz is 0:
+                        LOG.error("HZ is ZERO ..")
+                        rx_total = tx_total = float(0)
+                    else:
+                        rx_total = float(rx_total * hz / tsc)
+                        tx_total = float(tx_total * hz / tsc)
+
+                stats.append([port_index, rx_total, tx_total, 0, 0, tsc])
+        except (TypeError, IndexError, KeyError):
+            stats = []
+            LOG.info("Current Port Stats incompatable to previous Port stats .. Discarded")
+
+        return stats
 
     def port_stats(self, ports):
         """get counter values from a specific port"""
@@ -649,7 +789,6 @@ class ProxSocketHelper(object):
         self.put_command("quit_force\n")
         time.sleep(3)
 
-
 _LOCAL_OBJECT = object()
 
 
@@ -731,6 +870,30 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
         file_str[1] = self.additional_files[base_name]
         return '"'.join(file_str)
 
+    def _make_core_list(self, inputStr):
+
+        my_input = inputStr.split("core ", 1)[1]
+        ok_list = set()
+
+        substrs = [x.strip() for x in my_input.split(',')]
+        for i in substrs:
+            try:
+                ok_list.add(int(i))
+
+            except ValueError:
+                try:
+                    substr = [int(k.strip()) for k in i.split('-')]
+                    if len(substr) > 1:
+                        startstr = substr[0]
+                        endstr = substr[len(substr) - 1]
+                        for z in range(startstr, endstr + 1):
+                            ok_list.add(z)
+                except ValueError:
+                    LOG.error("Error in cores list ... resuming ")
+                    return ok_list
+
+        return ok_list
+
     def generate_prox_config_file(self, config_path):
         sections = []
         prox_config = ConfigParser(config_path, sections)
@@ -750,6 +913,18 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
                     if section_data[0] == "mac":
                         section_data[1] = "hardware"
 
+        # adjust for range of cores
+        new_sections = []
+        for section_name, section in sections:
+            if section_name.startswith('core') and section_name.find('$') == -1:
+                    core_list = self._make_core_list(section_name)
+                    for core in core_list:
+                        new_sections.append(["core " + str(core), section])
+            else:
+                new_sections.append([section_name, section])
+
+        sections = new_sections
+
         # search for dst mac
         for _, section in sections:
             for section_data in section:
@@ -956,6 +1131,8 @@ class ProxResourceHelper(ClientResourceHelper):
         self.step_delta = 1
         self.step_time = 0.5
         self._test_type = None
+        self.prev_multi_port = []
+        self.prev_hz = 0
 
     @property
     def sut(self):
@@ -984,7 +1161,7 @@ class ProxResourceHelper(ClientResourceHelper):
 
     def _run_traffic_once(self, traffic_profile):
         traffic_profile.execute_traffic(self)
-        if traffic_profile.done:
+        if traffic_profile.done.is_set():
             self._queue.put({'done': True})
             LOG.debug("tg_prox done")
             self._terminated.value = 1
@@ -994,11 +1171,40 @@ class ProxResourceHelper(ClientResourceHelper):
     def collect_collectd_kpi(self):
         return self._collect_resource_kpi()
 
+    def collect_live_stats(self):
+        ports = []
+        for _, port_num in self.vnfd_helper.ports_iter():
+            ports.append(port_num)
+
+        ok, curr_port_stats = self.sut.multi_port_stats(ports)
+        if not ok:
+            return False, {}
+
+        hz = self.sut.hz()
+        if hz is 0:
+            hz = self.prev_hz
+        else:
+            self.prev_hz = hz
+
+        new_all_port_stats = \
+            self.sut.multi_port_stats_diff(self.prev_multi_port, curr_port_stats, hz)
+
+        self.prev_multi_port = curr_port_stats
+
+        live_stats = self.sut.multi_port_stats_tuple(new_all_port_stats,
+                                                     self.vnfd_helper.ports_iter())
+        return True, live_stats
+
     def collect_kpi(self):
         result = super(ProxResourceHelper, self).collect_kpi()
         # add in collectd kpis manually
         if result:
             result['collect_stats'] = self._collect_resource_kpi()
+
+        ok, live_stats = self.collect_live_stats()
+        if ok:
+            result.update({'live_stats': live_stats})
+
         return result
 
     def terminate(self):
@@ -1070,41 +1276,70 @@ class ProxDataHelper(object):
     def totals_and_pps(self):
         if self._totals_and_pps is None:
             rx_total = tx_total = 0
-            all_ports = self.sut.multi_port_stats(range(self.port_count))
-            for port in all_ports:
-                rx_total = rx_total + port[1]
-                tx_total = tx_total + port[2]
-            requested_pps = self.value / 100.0 * self.line_rate_to_pps()
-            self._totals_and_pps = rx_total, tx_total, requested_pps
+            ok = False
+            timeout = time.time() + constants.RETRY_TIMEOUT
+            while not ok:
+                ok, all_ports = self.sut.multi_port_stats([
+                    self.vnfd_helper.port_num(port_name)
+                    for port_name in self.vnfd_helper.port_pairs.all_ports])
+                if time.time() > timeout:
+                    break
+            if ok:
+                for port in all_ports:
+                    rx_total = rx_total + port[1]
+                    tx_total = tx_total + port[2]
+                requested_pps = self.value / 100.0 * self.line_rate_to_pps()
+                self._totals_and_pps = rx_total, tx_total, requested_pps
         return self._totals_and_pps
 
     @property
     def rx_total(self):
-        return self.totals_and_pps[0]
+        try:
+            ret_val = self.totals_and_pps[0]
+        except (AttributeError, ValueError, TypeError, LookupError):
+            ret_val = 0
+        return ret_val
 
     @property
     def tx_total(self):
-        return self.totals_and_pps[1]
+        try:
+            ret_val = self.totals_and_pps[1]
+        except (AttributeError, ValueError, TypeError, LookupError):
+            ret_val = 0
+        return ret_val
 
     @property
     def requested_pps(self):
-        return self.totals_and_pps[2]
+        try:
+            ret_val = self.totals_and_pps[2]
+        except (AttributeError, ValueError, TypeError, LookupError):
+            ret_val = 0
+        return ret_val
 
     @property
     def samples(self):
         samples = {}
         ports = []
-        port_names = []
+        port_names = {}
         for port_name, port_num in self.vnfd_helper.ports_iter():
             ports.append(port_num)
-            port_names.append(port_name)
-
-        results = self.sut.multi_port_stats(ports)
-        for result in results:
-            port_num = result[0]
-            samples[port_names[port_num]] = {
-                    "in_packets": result[1],
-                    "out_packets": result[2]}
+            port_names[port_num] = port_name
+
+        ok = False
+        timeout = time.time() + constants.RETRY_TIMEOUT
+        while not ok:
+            ok, results = self.sut.multi_port_stats(ports)
+            if time.time() > timeout:
+                break
+        if ok:
+            for result in results:
+                port_num = result[0]
+                try:
+                    samples[port_names[port_num]] = {
+                        "in_packets": result[1],
+                        "out_packets": result[2]}
+                except (IndexError, KeyError):
+                    pass
         return samples
 
     def __enter__(self):
@@ -1902,3 +2137,15 @@ class ProxlwAFTRProfileHelper(ProxProfileHelper):
                 data_helper.latency = self.get_latency()
 
         return data_helper.result_tuple, data_helper.samples
+
+
+class ProxIrqProfileHelper(ProxProfileHelper):
+
+    __prox_profile_type__ = "IRQ Query"
+
+    def __init__(self, resource_helper):
+        super(ProxIrqProfileHelper, self).__init__(resource_helper)
+        self._cores_tuple = None
+        self._ports_tuple = None
+        self.step_delta = 5
+        self.step_time = 0.5