Merge "Add API for PPPoE clients statistic retrieval"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
index 1ee71aa..21719cb 100644 (file)
 # limitations under the License.
 
 import logging
+import decimal
 from multiprocessing import Queue, Value, Process
-
 import os
 import posixpath
 import re
-import six
+import uuid
 import subprocess
 import time
 
+import six
+
 from trex_stl_lib.trex_stl_client import LoggerApi
 from trex_stl_lib.trex_stl_client import STLClient
 from trex_stl_lib.trex_stl_exceptions import STLError
@@ -408,12 +410,13 @@ class ClientResourceHelper(ResourceHelper):
         time.sleep(self.QUEUE_WAIT_TIME)
         self._queue.put(samples)
 
-    def run_traffic(self, traffic_profile):
+    def run_traffic(self, traffic_profile, mq_producer):
         # if we don't do this we can hang waiting for the queue to drain
         # have to do this in the subprocess
         self._queue.cancel_join_thread()
         # fixme: fix passing correct trex config file,
         # instead of searching the default path
+        mq_producer.tg_method_started()
         try:
             self._build_ports()
             self.client = self._connect()
@@ -421,8 +424,12 @@ class ClientResourceHelper(ResourceHelper):
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
 
+            iteration_index = 0
             while self._terminated.value == 0:
-                self._run_traffic_once(traffic_profile)
+                iteration_index += 1
+                if self._run_traffic_once(traffic_profile):
+                    self._terminated.value = 1
+                mq_producer.tg_method_iteration(iteration_index)
 
             self.client.stop(self.all_ports)
             self.client.disconnect()
@@ -433,6 +440,8 @@ class ClientResourceHelper(ResourceHelper):
                 return  # return if trex/tg server is stopped.
             raise
 
+        mq_producer.tg_method_finished()
+
     def terminate(self):
         self._terminated.value = 1  # stop client
 
@@ -491,6 +500,7 @@ class Rfc2544ResourceHelper(object):
         self._rfc2544 = None
         self._tolerance_low = None
         self._tolerance_high = None
+        self._tolerance_precision = None
 
     @property
     def rfc2544(self):
@@ -510,6 +520,12 @@ class Rfc2544ResourceHelper(object):
             self.get_rfc_tolerance()
         return self._tolerance_high
 
+    @property
+    def tolerance_precision(self):
+        if self._tolerance_precision is None:
+            self.get_rfc_tolerance()
+        return self._tolerance_precision
+
     @property
     def correlated_traffic(self):
         if self._correlated_traffic is None:
@@ -529,9 +545,13 @@ class Rfc2544ResourceHelper(object):
 
     def get_rfc_tolerance(self):
         tolerance_str = self.get_rfc2544('allowed_drop_rate', self.DEFAULT_TOLERANCE)
-        tolerance_iter = iter(sorted(float(t.strip()) for t in tolerance_str.split('-')))
-        self._tolerance_low = next(tolerance_iter)
-        self._tolerance_high = next(tolerance_iter, self.tolerance_low)
+        tolerance_iter = iter(sorted(
+            decimal.Decimal(t.strip()) for t in tolerance_str.split('-')))
+        tolerance_low = next(tolerance_iter)
+        tolerance_high = next(tolerance_iter, tolerance_low)
+        self._tolerance_precision = abs(tolerance_high.as_tuple().exponent)
+        self._tolerance_high = float(tolerance_high)
+        self._tolerance_low = float(tolerance_low)
 
 
 class SampleVNFDeployHelper(object):
@@ -612,6 +632,7 @@ class ScenarioHelper(object):
         test_timeout = self.options.get('timeout', constants.DEFAULT_VNF_TIMEOUT)
         return test_duration if test_duration > test_timeout else test_timeout
 
+
 class SampleVNF(GenericVNF):
     """ Class providing file-like API for generic VNF implementation """
 
@@ -621,8 +642,9 @@ class SampleVNF(GenericVNF):
     APP_NAME = "SampleVNF"
     # we run the VNF interactively, so the ssh command will timeout after this long
 
-    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
-        super(SampleVNF, self).__init__(name, vnfd)
+    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+                 resource_helper_type=None):
+        super(SampleVNF, self).__init__(name, vnfd, task_id)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -853,8 +875,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
     APP_NAME = 'Sample'
     RUN_WAIT = 1
 
-    def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
-        super(SampleVNFTrafficGen, self).__init__(name, vnfd)
+    def __init__(self, name, vnfd, task_id, setup_env_helper_type=None,
+                 resource_helper_type=None):
+        super(SampleVNFTrafficGen, self).__init__(name, vnfd, task_id)
         self.bin_path = get_nsb_option('bin_path', '')
 
         self.scenario_helper = ScenarioHelper(self.name)
@@ -911,12 +934,13 @@ class SampleVNFTrafficGen(GenericTrafficGen):
                 LOG.info("%s TG Server is up and running.", self.APP_NAME)
                 return self._tg_process.exitcode
 
-    def _traffic_runner(self, traffic_profile):
+    def _traffic_runner(self, traffic_profile, mq_id):
         # always drop connections first thing in new processes
         # so we don't get paramiko errors
         self.ssh_helper.drop_connection()
         LOG.info("Starting %s client...", self.APP_NAME)
-        self.resource_helper.run_traffic(traffic_profile)
+        self._mq_producer = self._setup_mq_producer(mq_id)
+        self.resource_helper.run_traffic(traffic_profile, self._mq_producer)
 
     def run_traffic(self, traffic_profile):
         """ Generate traffic on the wire according to the given params.
@@ -926,10 +950,12 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
                                     os.getpid())
-        self._traffic_process = Process(name=name, target=self._traffic_runner,
-                                        args=(traffic_profile,))
+        self._traffic_process = Process(
+            name=name, target=self._traffic_runner,
+            args=(traffic_profile, uuid.uuid1().int))
         self._traffic_process.start()
         # Wait for traffic process to start
         while self.resource_helper.client_started.value == 0:
@@ -938,8 +964,6 @@ class SampleVNFTrafficGen(GenericTrafficGen):
             if not self._traffic_process.is_alive():
                 break
 
-        return self._traffic_process.is_alive()
-
     def collect_kpi(self):
         # check if the tg processes have exited
         physical_node = Context.get_physical_node_from_server(