Merge "update docker version to 16.04"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / tg_trex.py
index 058b715..4250cb7 100644 (file)
 """ Trex acts as traffic generation and vnf definitions based on IETS Spec """
 
 from __future__ import absolute_import
-from __future__ import print_function
-import multiprocessing
-import time
 import logging
 import os
+
 import yaml
 
-from yardstick import ssh
-from yardstick.network_services.vnf_generic.vnf.base import GenericTrafficGen
+from yardstick.common.utils import mac_address_to_hex_list, try_int
 from yardstick.network_services.utils import get_nsb_option
-from yardstick.network_services.utils import provision_tool
-from stl.trex_stl_lib.trex_stl_client import STLClient
-from stl.trex_stl_lib.trex_stl_client import LoggerApi
-from stl.trex_stl_lib.trex_stl_exceptions import STLError
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFTrafficGen
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
 
 LOG = logging.getLogger(__name__)
-DURATION = 30
-WAIT_QUEUE = 1
-TREX_SYNC_PORT = 4500
-TREX_ASYNC_PORT = 4501
 
 
-class TrexTrafficGen(GenericTrafficGen):
+class TrexDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
+    APP_NAME = "t-rex-64"
+    CFG_CONFIG = ""
+    CFG_SCRIPT = ""
+    PIPELINE_COMMAND = ""
+    VNF_TYPE = "TG"
+
+
+class TrexResourceHelper(ClientResourceHelper):
+
+    CONF_FILE = '/tmp/trex_cfg.yaml'
+    QUEUE_WAIT_TIME = 1
+    RESOURCE_WORD = 'trex'
+    RUN_DURATION = 0
+
+    ASYNC_PORT = 4500
+    SYNC_PORT = 4501
+
+    def __init__(self, setup_helper):
+        super(TrexResourceHelper, self).__init__(setup_helper)
+        self.port_map = {}
+        self.dpdk_to_trex_port_map = {}
+
+    def generate_cfg(self):
+        port_names = self.vnfd_helper.port_pairs.all_ports
+        vpci_list = []
+        port_list = []
+        self.port_map = {}
+        self.dpdk_to_trex_port_map = {}
+
+        sorted_ports = sorted((self.vnfd_helper.port_num(port_name), port_name) for port_name in
+                              port_names)
+        for index, (port_num, port_name) in enumerate(sorted_ports):
+            interface = self.vnfd_helper.find_interface(name=port_name)
+            virtual_interface = interface['virtual-interface']
+            dst_mac = virtual_interface["dst_mac"]
+
+            # this is to check for unused ports, all ports in the topology
+            # will always have dst_mac
+            if not dst_mac:
+                continue
+            # TRex ports are in logical order roughly based on DPDK port number sorting
+            vpci_list.append(virtual_interface["vpci"])
+            local_mac = virtual_interface["local_mac"]
+            port_list.append({
+                "src_mac": mac_address_to_hex_list(local_mac),
+                "dest_mac": mac_address_to_hex_list(dst_mac),
+            })
+            self.port_map[port_name] = index
+            self.dpdk_to_trex_port_map[port_num] = index
+        trex_cfg = {
+            'interfaces': vpci_list,
+            'port_info': port_list,
+            "port_limit": len(port_names),
+            "version": '2',
+        }
+        cfg_file = [trex_cfg]
+
+        cfg_str = yaml.safe_dump(cfg_file, default_flow_style=False, explicit_start=True)
+        self.ssh_helper.upload_config_file(os.path.basename(self.CONF_FILE), cfg_str)
+
+    def _build_ports(self):
+        super(TrexResourceHelper, self)._build_ports()
+        # override with TRex logic port number
+        self.uplink_ports = [self.dpdk_to_trex_port_map[p] for p in self.uplink_ports]
+        self.downlink_ports = [self.dpdk_to_trex_port_map[p] for p in self.downlink_ports]
+        self.all_ports = [self.dpdk_to_trex_port_map[p] for p in self.all_ports]
+
+    def port_num(self, intf):
+        # return logical TRex port
+        return self.port_map[intf]
+
+    def check_status(self):
+        status, _, _ = self.ssh_helper.execute("sudo lsof -i:%s" % self.SYNC_PORT)
+        return status
+
+    # temp disable
+    DISABLE_DEPLOY = True
+
+    def setup(self):
+        super(TrexResourceHelper, self).setup()
+        if self.DISABLE_DEPLOY:
+            return
+
+        trex_path = self.ssh_helper.join_bin_path('trex')
+
+        err = self.ssh_helper.execute("which {}".format(trex_path))[0]
+        if err == 0:
+            return
+
+        LOG.info("Copying %s to destination...", self.RESOURCE_WORD)
+        self.ssh_helper.run("sudo mkdir -p '{}'".format(os.path.dirname(trex_path)))
+        self.ssh_helper.put("~/.bash_profile", "~/.bash_profile")
+        self.ssh_helper.put(trex_path, trex_path, True)
+        ko_src = os.path.join(trex_path, "scripts/ko/src/")
+        self.ssh_helper.execute(self.MAKE_INSTALL.format(ko_src))
+
+    def start(self, ports=None, *args, **kwargs):
+        cmd = "sudo fuser -n tcp {0.SYNC_PORT} {0.ASYNC_PORT} -k > /dev/null 2>&1"
+        self.ssh_helper.execute(cmd.format(self))
+
+        self.ssh_helper.execute("sudo pkill -9 rex > /dev/null 2>&1")
+
+        # We MUST default to 1 because TRex won't work on single-queue devices with
+        # more than one core per port
+        # We really should be trying to find the number of queues in the driver,
+        # but there doesn't seem to be a way to do this
+        # TRex Error: the number of cores should be 1 when the driver
+        # support only one tx queue and one rx queue. Please use -c 1
+        threads_per_port = try_int(self.scenario_helper.options.get("queues_per_port"), 1)
+
+        trex_path = self.ssh_helper.join_bin_path("trex", "scripts")
+        path = get_nsb_option("trex_path", trex_path)
+
+        cmd = "./t-rex-64 --no-scapy-server -i -c {} --cfg '{}'".format(threads_per_port,
+                                                                        self.CONF_FILE)
+
+        if self.scenario_helper.options.get("trex_server_debug"):
+            # if there are errors we want to see them
+            redir = ""
+        else:
+            redir = ">/dev/null"
+        # we have to sudo cd because the path might be owned by root
+        trex_cmd = """sudo bash -c "cd '{}' ; {}" {}""".format(path, cmd, redir)
+        LOG.debug(trex_cmd)
+        self.ssh_helper.execute(trex_cmd)
+
+    def terminate(self):
+        super(TrexResourceHelper, self).terminate()
+        cmd = "sudo fuser -n tcp %s %s -k > /dev/null 2>&1"
+        self.ssh_helper.execute(cmd % (self.SYNC_PORT, self.ASYNC_PORT))
+
+
+class TrexTrafficGen(SampleVNFTrafficGen):
     """
     This class handles mapping traffic profile and generating
     traffic for given testcase
     """
 
-    def __init__(self, vnfd):
-        super(TrexTrafficGen, self).__init__(vnfd)
-        self._result = {}
-        self._queue = multiprocessing.Queue()
-        self._terminated = multiprocessing.Value('i', 0)
-        self._traffic_process = None
-        self._vpci_ascending = None
-        self.client = None
-        self.my_ports = None
-        self.client_started = multiprocessing.Value('i', 0)
-
-        mgmt_interface = vnfd["mgmt-interface"]
-
-        self.connection = ssh.SSH.from_node(mgmt_interface)
-        self.connection.wait()
-
-    @classmethod
-    def _split_mac_address_into_list(cls, mac):
-        octets = mac.split(':')
-        for i, elem in enumerate(octets):
-            octets[i] = "0x" + str(elem)
-        return octets
-
-    def _generate_trex_cfg(self, vnfd):
-        """
-
-        :param vnfd: vnfd.yaml
-        :return: trex_cfg.yaml file
-        """
-        trex_cfg = dict(
-            port_limit=0,
-            version='2',
-            interfaces=[],
-            port_info=list(dict(
-            ))
-        )
-        trex_cfg["port_limit"] = len(vnfd["vdu"][0]["external-interface"])
-        trex_cfg["version"] = '2'
-
-        cfg_file = []
-        vpci = []
-        port = {}
-
-        for interface in range(len(vnfd["vdu"][0]["external-interface"])):
-            ext_intrf = vnfd["vdu"][0]["external-interface"]
-            virtual_interface = ext_intrf[interface]["virtual-interface"]
-            vpci.append(virtual_interface["vpci"])
-
-            port["src_mac"] = self._split_mac_address_into_list(
-                virtual_interface["local_mac"])
-            port["dest_mac"] = self._split_mac_address_into_list(
-                virtual_interface["dst_mac"])
-
-            trex_cfg["port_info"].append(port.copy())
-
-        trex_cfg["interfaces"] = vpci
-        cfg_file.append(trex_cfg)
-
-        with open('/tmp/trex_cfg.yaml', 'w') as outfile:
-            outfile.write(yaml.safe_dump(cfg_file, default_flow_style=False))
-        self.connection.put('/tmp/trex_cfg.yaml', '/etc')
-
-        self._vpci_ascending = sorted(vpci)
-
-    @classmethod
-    def __setup_hugepages(cls, connection):
-        hugepages = \
-            connection.execute(
-                "awk '/Hugepagesize/ { print $2$3 }' < /proc/meminfo")[1]
-        hugepages = hugepages.rstrip()
-
-        memory_path = \
-            '/sys/kernel/mm/hugepages/hugepages-%s/nr_hugepages' % hugepages
-        connection.execute("awk -F: '{ print $1 }' < %s" % memory_path)
-
-        pages = 16384 if hugepages.rstrip() == "2048kB" else 16
-        connection.execute("echo %s > %s" % (pages, memory_path))
-
-    def setup_vnf_environment(self, connection):
-        ''' setup dpdk environment needed for vnf to run '''
-
-        self.__setup_hugepages(connection)
-        connection.execute("modprobe uio && modprobe igb_uio")
-
-        exit_status = connection.execute("lsmod | grep -i igb_uio")[0]
-        if exit_status == 0:
-            return
+    APP_NAME = 'TRex'
 
-        dpdk = os.path.join(self.bin_path, "dpdk-16.07")
-        dpdk_setup = \
-            provision_tool(self.connection,
-                           os.path.join(self.bin_path, "nsb_setup.sh"))
-        status = connection.execute("ls {} >/dev/null 2>&1".format(dpdk))[0]
-        if status:
-            connection.execute("bash %s dpdk >/dev/null 2>&1" % dpdk_setup)
+    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+        if resource_helper_type is None:
+            resource_helper_type = TrexResourceHelper
 
-    def scale(self, flavor=""):
-        ''' scale vnfbased on flavor input '''
-        super(TrexTrafficGen, self).scale(flavor)
-
-    def instantiate(self, scenario_cfg, context_cfg):
-        self._generate_trex_cfg(self.vnfd)
-        self.setup_vnf_environment(self.connection)
-
-        trex = os.path.join(self.bin_path, "trex")
-        err = \
-            self.connection.execute("ls {} >/dev/null 2>&1".format(trex))[0]
-        if err != 0:
-            LOG.info("Copying trex to destination...")
-            self.connection.put("/root/.bash_profile", "/root/.bash_profile")
-            self.connection.put(trex, trex, True)
-            ko_src = os.path.join(trex, "scripts/ko/src/")
-            self.connection.execute("cd %s && make && make install" % ko_src)
-
-        LOG.info("Starting TRex server...")
-        _tg_process = \
-            multiprocessing.Process(target=self._start_server)
-        _tg_process.start()
-        while True:
-            if not _tg_process.is_alive():
-                raise RuntimeError("Traffic Generator process died.")
-            LOG.info("Waiting for TG Server to start.. ")
-            time.sleep(1)
-            status = \
-                self.connection.execute("lsof -i:%s" % TREX_SYNC_PORT)[0]
-            if status == 0:
-                LOG.info("TG server is up and running.")
-                return _tg_process.exitcode
-
-    def listen_traffic(self, traffic_profile):
-        pass
+        if setup_env_helper_type is None:
+            setup_env_helper_type = TrexDpdkVnfSetupEnvHelper
 
-    def _get_logical_if_name(self, vpci):
-        ext_intf = self.vnfd["vdu"][0]["external-interface"]
-        for interface in range(len(self.vnfd["vdu"][0]["external-interface"])):
-            virtual_intf = ext_intf[interface]["virtual-interface"]
-            if virtual_intf["vpci"] == vpci:
-                return ext_intf[interface]["name"]
-
-    def run_traffic(self, traffic_profile):
-        self._traffic_process = \
-            multiprocessing.Process(target=self._traffic_runner,
-                                    args=(traffic_profile, self._queue,
-                                          self.client_started,
-                                          self._terminated))
-        self._traffic_process.start()
-        # Wait for traffic process to start
-        while self.client_started.value == 0:
-            time.sleep(1)
-
-        return self._traffic_process.is_alive()
+        super(TrexTrafficGen, self).__init__(name, vnfd, setup_env_helper_type,
+                                             resource_helper_type)
 
-    def _start_server(self):
-        mgmt_interface = self.vnfd["mgmt-interface"]
+    def _check_status(self):
+        return self.resource_helper.check_status()
 
-        _server = ssh.SSH.from_node(mgmt_interface)
-        _server.wait()
-
-        _server.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
-                        (TREX_SYNC_PORT, TREX_ASYNC_PORT))
+    def _start_server(self):
+        super(TrexTrafficGen, self)._start_server()
+        self.resource_helper.start()
 
-        trex_path = os.path.join(self.bin_path, "trex/scripts")
-        path = get_nsb_option("trex_path", trex_path)
-        trex_cmd = "cd " + path + "; sudo ./t-rex-64 -i > /dev/null 2>&1"
-
-        _server.execute(trex_cmd)
-
-    def _connect_client(self, client=None):
-        if client is None:
-            client = STLClient(username=self.vnfd["mgmt-interface"]["user"],
-                               server=self.vnfd["mgmt-interface"]["ip"],
-                               verbose_level=LoggerApi.VERBOSE_QUIET)
-        # try to connect with 5s intervals, 30s max
-        for idx in range(6):
-            try:
-                client.connect()
-                break
-            except STLError:
-                LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
-                time.sleep(5)
-        return client
-
-    def _traffic_runner(self, traffic_profile, queue,
-                        client_started, terminated):
-        LOG.info("Starting TRex client...")
-
-        self.my_ports = [0, 1]
-        self.client = self._connect_client()
-        self.client.reset(ports=self.my_ports)
-
-        self.client.remove_all_streams(self.my_ports)  # remove all streams
-
-        while not terminated.value:
-            traffic_profile.execute(self)
-            client_started.value = 1
-            last_res = self.client.get_stats(self.my_ports)
-            if not isinstance(last_res, dict):  # added for mock unit test
-                terminated.value = 1
-                last_res = {}
-
-            samples = {}
-            for vpci_idx in range(len(self._vpci_ascending)):
-                name = \
-                    self._get_logical_if_name(self._vpci_ascending[vpci_idx])
-                # fixme: VNFDs KPIs values needs to be mapped to TRex structure
-                xe_value = last_res.get(vpci_idx, {})
-                samples[name] = \
-                    {"rx_throughput_fps": float(xe_value.get("rx_pps", 0.0)),
-                     "tx_throughput_fps": float(xe_value.get("tx_pps", 0.0)),
-                     "rx_throughput_mbps": float(xe_value.get("rx_bps", 0.0)),
-                     "tx_throughput_mbps": float(xe_value.get("tx_bps", 0.0)),
-                     "in_packets": xe_value.get("ipackets", 0),
-                     "out_packets": xe_value.get("opackets", 0)}
-            time.sleep(WAIT_QUEUE)
-            queue.put(samples)
-
-        self.client.disconnect()
-        terminated.value = 0
-
-    def collect_kpi(self):
-        if not self._queue.empty():
-            self._result.update(self._queue.get())
-        LOG.debug("trex collect Kpis %s", self._result)
-        return self._result
+    def scale(self, flavor=""):
+        pass
 
     def terminate(self):
-        self.connection.execute("fuser -n tcp %s %s -k > /dev/null 2>&1" %
-                                (TREX_SYNC_PORT, TREX_ASYNC_PORT))
-        self.traffic_finished = True
-        if self._traffic_process:
-            self._traffic_process.terminate()
+        self.resource_helper.terminate()
+
+    def wait_for_instantiate(self):
+        return self._wait_for_process()