add yardstick iruya 9.0.0 release notes
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
index a09f2a7..a369a3a 100644 (file)
@@ -1,4 +1,4 @@
-# Copyright (c) 2016-2018 Intel Corporation
+# Copyright (c) 2016-2019 Intel Corporation
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
 # limitations under the License.
 
 import logging
-from multiprocessing import Queue, Value, Process
+import decimal
+from multiprocessing import Queue, Value, Process, JoinableQueue
 import os
 import posixpath
 import re
-import uuid
 import subprocess
 import time
 
-import six
 
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
@@ -113,19 +112,6 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         self.used_drivers = None
         self.dpdk_bind_helper = DpdkBindHelper(ssh_helper)
 
-    def _setup_hugepages(self):
-        meminfo = utils.read_meminfo(self.ssh_helper)
-        hp_size_kb = int(meminfo['Hugepagesize'])
-        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
-        nr_hugepages = int(abs(hugepages_gb * 1024 * 1024 / hp_size_kb))
-        self.ssh_helper.execute('echo %s | sudo tee %s' %
-                                (nr_hugepages, self.NR_HUGEPAGES_PATH))
-        hp = six.BytesIO()
-        self.ssh_helper.get_file_obj(self.NR_HUGEPAGES_PATH, hp)
-        nr_hugepages_set = int(hp.getvalue().decode('utf-8').splitlines()[0])
-        LOG.info('Hugepages size (kB): %s, number claimed: %s, number set: %s',
-                 hp_size_kb, nr_hugepages, nr_hugepages_set)
-
     def build_config(self):
         vnf_cfg = self.scenario_helper.vnf_cfg
         task_path = self.scenario_helper.task_path
@@ -193,7 +179,7 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         """No actions/rules (flows) by default"""
         return None
 
-    def _build_pipeline_kwargs(self):
+    def _build_pipeline_kwargs(self, cfg_file=None, script=None):
         tool_path = self.ssh_helper.provision_tool(tool_file=self.APP_NAME)
         # count the number of actual ports in the list of pairs
         # remove duplicate ports
@@ -213,8 +199,8 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
             hwlb = ' --hwlb %s' % worker_threads
 
         self.pipeline_kwargs = {
-            'cfg_file': self.CFG_CONFIG,
-            'script': self.CFG_SCRIPT,
+            'cfg_file': cfg_file if cfg_file else self.CFG_CONFIG,
+            'script': script if script else self.CFG_SCRIPT,
             'port_mask_hex': ports_mask_hex,
             'tool_path': tool_path,
             'hwlb': hwlb,
@@ -238,12 +224,16 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
 
     def _setup_dpdk(self):
         """Setup DPDK environment needed for VNF to run"""
-        self._setup_hugepages()
+        hugepages_gb = self.scenario_helper.all_options.get('hugepages_gb', 16)
+        utils.setup_hugepages(self.ssh_helper, hugepages_gb * 1024 * 1024)
         self.dpdk_bind_helper.load_dpdk_driver()
 
         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
         if exit_status == 0:
             return
+        else:
+            LOG.critical("DPDK Driver not installed")
+            return
 
     def _setup_resources(self):
         # what is this magic?  how do we know which socket is for which port?
@@ -409,26 +399,26 @@ class ClientResourceHelper(ResourceHelper):
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
-    def run_traffic(self, traffic_profile, mq_producer):
+    def run_traffic(self, traffic_profile):
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
-        mq_producer.tg_method_started()
         try:
             self._build_ports()
             self.client = self._connect()
+            if self.client is None:
+                LOG.critical("Failure to Connect ... unable to continue")
+                return
+
             self.client.reset(ports=self.all_ports)
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
-            iteration_index = 0
             while self._terminated.value == 0:
-                iteration_index += 1
                 if self._run_traffic_once(traffic_profile):
                     self._terminated.value = 1
-                mq_producer.tg_method_iteration(iteration_index)
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
@@ -439,8 +429,6 @@ class ClientResourceHelper(ResourceHelper):
                 return  # return if trex/tg server is stopped.
             raise
 
-        mq_producer.tg_method_finished()
-
     def terminate(self):
         self._terminated.value = 1  # stop client
 
@@ -473,22 +461,35 @@ class ClientResourceHelper(ResourceHelper):
                                server=self.vnfd_helper.mgmt_interface["ip"],
                                verbose_level=LoggerApi.VERBOSE_QUIET)
 
-        # try to connect with 5s intervals, 30s max
+        # try to connect with 5s intervals
         for idx in range(6):
             try:
                 client.connect()
-                break
+                for idx2 in range(6):
+                    if client.is_connected():
+                        return client
+                    LOG.info("Waiting to confirm connection %s .. Attempt %s",
+                             idx, idx2)
+                    time.sleep(1)
+                client.disconnect(stop_traffic=True, release_ports=True)
             except STLError:
                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                 time.sleep(5)
-        return client
 
+        if client.is_connected():
+            return client
+        else:
+            LOG.critical("Connection failure ..TRex username: %s server: %s",
+                         self.vnfd_helper.mgmt_interface["user"],
+                         self.vnfd_helper.mgmt_interface["ip"])
+            return None
 
 class Rfc2544ResourceHelper(object):
 
     DEFAULT_CORRELATED_TRAFFIC = False
     DEFAULT_LATENCY = False
     DEFAULT_TOLERANCE = '0.0001 - 0.0001'
+    DEFAULT_RESOLUTION = '0.1'
 
     def __init__(self, scenario_helper):
         super(Rfc2544ResourceHelper, self).__init__()
@@ -499,6 +500,8 @@ class Rfc2544ResourceHelper(object):
         self._rfc2544 = None
         self._tolerance_low = None
         self._tolerance_high = None
+        self._tolerance_precision = None
+        self._resolution = None
 
     @property
     def rfc2544(self):
@@ -518,6 +521,12 @@ class Rfc2544ResourceHelper(object):
             self.get_rfc_tolerance()
         return self._tolerance_high
 
+    @property
+    def tolerance_precision(self):
+        if self._tolerance_precision is None:
+            self.get_rfc_tolerance()
+        return self._tolerance_precision
+
     @property
     def correlated_traffic(self):
         if self._correlated_traffic is None:
@@ -532,14 +541,25 @@ class Rfc2544ResourceHelper(object):
             self._latency = self.get_rfc2544('latency', self.DEFAULT_LATENCY)
         return self._latency
 
+    @property
+    def resolution(self):
+        if self._resolution is None:
+            self._resolution = float(self.get_rfc2544('resolution',
+                                                self.DEFAULT_RESOLUTION))
+        return self._resolution
+
     def get_rfc2544(self, name, default=None):
         return self.rfc2544.get(name, default)
 
     def get_rfc_tolerance(self):
         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
-        tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
-        self._tolerance_low = next(tolerance_iter)
-        self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+        tolerance_iter = iter(sorted(
+            decimal.Decimal(t.strip()) for t in tolerance_str.split('-')))
+        tolerance_low = next(tolerance_iter)
+        tolerance_high = next(tolerance_iter, tolerance_low)
+        self._tolerance_precision = abs(tolerance_high.as_tuple().exponent)
+        self._tolerance_high = float(tolerance_high)
+        self._tolerance_low = float(tolerance_low)
 
 
 class SampleVNFDeployHelper(object):
@@ -620,7 +640,6 @@ class ScenarioHelper(object):
         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
         return test_duration if test_duration > test_timeout else test_timeout
 
-
 class SampleVNF(GenericVNF):
     """ Class providing file-like API for generic VNF implementation """
 
@@ -630,9 +649,8 @@ class SampleVNF(GenericVNF):
     APP_NAME = "SampleVNF"
     # we run the VNF interactively, so the ssh command will timeout after this long
 
-    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
-                 resource_helper_type=None):
-        super(SampleVNF, self).__init__(name, vnfd, task_id)
+    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+        super(SampleVNF, self).__init__(name, vnfd)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -701,8 +719,8 @@ class SampleVNF(GenericVNF):
         scenarios:
         - type: NSPerf
           nodes:
-            tg__0: trafficgen_1.yardstick
-            vnf__0: vnf.yardstick
+            tg__0: trafficgen_0.yardstick
+            vnf__0: vnf_0.yardstick
           options:
             collectd:
               <options>  # COLLECTD priority 3
@@ -765,6 +783,53 @@ class SampleVNF(GenericVNF):
             # by other VNF output
             self.q_in.put('\r\n')
 
+    def wait_for_initialize(self):
+        buf = []
+        vnf_prompt_found = False
+        prompt_command = '\r\n'
+        script_name = 'non_existent_script_name'
+        done_string = 'Cannot open file "{}"'.format(script_name)
+        time.sleep(self.WAIT_TIME)  # Give some time for config to load
+        while True:
+            if not self._vnf_process.is_alive():
+                raise RuntimeError("%s VNF process died." % self.APP_NAME)
+            while self.q_out.qsize() > 0:
+                buf.append(self.q_out.get())
+                message = ''.join(buf)
+
+                if self.VNF_PROMPT in message and not vnf_prompt_found:
+                    # Once we got VNF promt, it doesn't mean that the VNF is
+                    # up and running/initialized completely. But we can run
+                    # addition (any) VNF command and wait for it to complete
+                    # as it will be finished ONLY at the end of the VNF
+                    # initialization. So, this approach can be used to
+                    # indentify that VNF is completely initialized.
+                    LOG.info("Got %s VNF prompt.", self.APP_NAME)
+                    prompt_command = "run {}\r\n".format(script_name)
+                    self.q_in.put(prompt_command)
+                    # Cut the buffer since we are not interesting to find
+                    # the VNF prompt anymore
+                    prompt_pos = message.find(self.VNF_PROMPT)
+                    buf = [message[prompt_pos + len(self.VNF_PROMPT):]]
+                    vnf_prompt_found = True
+                    continue
+
+                if done_string in message:
+                    LOG.info("%s VNF is up and running.", self.APP_NAME)
+                    self._vnf_up_post()
+                    self.queue_wrapper.clear()
+                    return self._vnf_process.exitcode
+
+                if "PANIC" in message:
+                    raise RuntimeError("Error starting %s VNF." %
+                                       self.APP_NAME)
+
+            LOG.info("Waiting for %s VNF to start.. ", self.APP_NAME)
+            time.sleep(self.WAIT_TIME_FOR_SCRIPT)
+            # Send command again to display the expected prompt in case the
+            # expected text was corrupted by other VNF output
+            self.q_in.put(prompt_command)
+
     def start_collect(self):
         self.resource_helper.start_collect()
 
@@ -863,9 +928,8 @@ class SampleVNFTrafficGen(GenericTrafficGen):
     APP_NAME = 'Sample'
     RUN_WAIT = 1
 
-    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
-                 resource_helper_type=None):
-        super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
+    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
+        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -887,6 +951,39 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         self.traffic_finished = False
         self._tg_process = None
         self._traffic_process = None
+        self._tasks_queue = JoinableQueue()
+        self._result_queue = Queue()
+
+    def _test_runner(self, traffic_profile, tasks, results):
+        self.resource_helper.run_test(traffic_profile, tasks, results)
+
+    def _init_traffic_process(self, traffic_profile):
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
+                                    os.getpid())
+        self._traffic_process = Process(name=name, target=self._test_runner,
+                                        args=(
+                                        traffic_profile, self._tasks_queue,
+                                        self._result_queue))
+
+        self._traffic_process.start()
+        while self.resource_helper.client_started.value == 0:
+            time.sleep(1)
+            if not self._traffic_process.is_alive():
+                break
+
+    def run_traffic_once(self, traffic_profile):
+        if self.resource_helper.client_started.value == 0:
+            self._init_traffic_process(traffic_profile)
+
+        # continue test - run next iteration
+        LOG.info("Run next iteration ...")
+        self._tasks_queue.put('RUN_TRAFFIC')
+
+    def wait_on_traffic(self):
+        self._tasks_queue.join()
+        result = self._result_queue.get()
+        return result
 
     def _start_server(self):
         # we can't share ssh paramiko objects to force new connection
@@ -899,6 +996,8 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             self.scenario_helper.nodes[self.name]
         )
 
+        self.resource_helper.context_cfg = context_cfg
+
         self.resource_helper.setup()
         # must generate_cfg after DPDK bind because we need port number
         self.resource_helper.generate_cfg()
@@ -922,13 +1021,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
-    def _traffic_runner(self, traffic_profile, mq_id):
+    def _traffic_runner(self, traffic_profile):
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
-        self._mq_producer = self._setup_mq_producer(mq_id)
-        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
+        self.resource_helper.run_traffic(traffic_profile)
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
@@ -938,12 +1036,10 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
-                                    traffic_profile.__class__.__name__,
+        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
                                     os.getpid())
-        self._traffic_process = Process(
-            name=name, target=self._traffic_runner,
-            args=(traffic_profile, uuid.uuid1().int))
+        self._traffic_process = Process(name=name, target=self._traffic_runner,
+                                        args=(traffic_profile,))
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
@@ -952,6 +1048,8 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             if not self._traffic_process.is_alive():
                 break
 
+        return self._traffic_process.is_alive()
+
     def collect_kpi(self):
         # check if the tg processes have exited
         physical_node = Context.get_physical_node_from_server(