Merge "Change default vports type to int"
[yardstick.git] / yardstick / network_services / vnf_generic / vnf / sample_vnf.py
index 6ae393b..a369a3a 100644 (file)
@@ -14,7 +14,7 @@
 
 import logging
 import decimal
-from multiprocessing import Queue, Value, Process
+from multiprocessing import Queue, Value, Process, JoinableQueue
 import os
 import posixpath
 import re
@@ -231,6 +231,9 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         exit_status = self.dpdk_bind_helper.check_dpdk_driver()
         if exit_status == 0:
             return
+        else:
+            LOG.critical("DPDK Driver not installed")
+            return
 
     def _setup_resources(self):
         # what is this magic?  how do we know which socket is for which port?
@@ -405,6 +408,10 @@ class ClientResourceHelper(ResourceHelper):
         try:
             self._build_ports()
             self.client = self._connect()
+            if self.client is None:
+                LOG.critical("Failure to Connect ... unable to continue")
+                return
+
             self.client.reset(ports=self.all_ports)
             self.client.remove_all_streams(self.all_ports)  # remove all streams
             traffic_profile.register_generator(self)
@@ -454,16 +461,28 @@ class ClientResourceHelper(ResourceHelper):
                                server=self.vnfd_helper.mgmt_interface["ip"],
                                verbose_level=LoggerApi.VERBOSE_QUIET)
 
-        # try to connect with 5s intervals, 30s max
+        # try to connect with 5s intervals
         for idx in range(6):
             try:
                 client.connect()
-                break
+                for idx2 in range(6):
+                    if client.is_connected():
+                        return client
+                    LOG.info("Waiting to confirm connection %s .. Attempt %s",
+                             idx, idx2)
+                    time.sleep(1)
+                client.disconnect(stop_traffic=True, release_ports=True)
             except STLError:
                 LOG.info("Unable to connect to Trex Server.. Attempt %s", idx)
                 time.sleep(5)
-        return client
 
+        if client.is_connected():
+            return client
+        else:
+            LOG.critical("Connection failure ..TRex username: %s server: %s",
+                         self.vnfd_helper.mgmt_interface["user"],
+                         self.vnfd_helper.mgmt_interface["ip"])
+            return None
 
 class Rfc2544ResourceHelper(object):
 
@@ -700,8 +719,8 @@ class SampleVNF(GenericVNF):
         scenarios:
         - type: NSPerf
           nodes:
-            tg__0: trafficgen_1.yardstick
-            vnf__0: vnf.yardstick
+            tg__0: trafficgen_0.yardstick
+            vnf__0: vnf_0.yardstick
           options:
             collectd:
               <options>  # COLLECTD priority 3
@@ -932,6 +951,39 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         self.traffic_finished = False
         self._tg_process = None
         self._traffic_process = None
+        self._tasks_queue = JoinableQueue()
+        self._result_queue = Queue()
+
+    def _test_runner(self, traffic_profile, tasks, results):
+        self.resource_helper.run_test(traffic_profile, tasks, results)
+
+    def _init_traffic_process(self, traffic_profile):
+        name = '{}-{}-{}-{}'.format(self.name, self.APP_NAME,
+                                    traffic_profile.__class__.__name__,
+                                    os.getpid())
+        self._traffic_process = Process(name=name, target=self._test_runner,
+                                        args=(
+                                        traffic_profile, self._tasks_queue,
+                                        self._result_queue))
+
+        self._traffic_process.start()
+        while self.resource_helper.client_started.value == 0:
+            time.sleep(1)
+            if not self._traffic_process.is_alive():
+                break
+
+    def run_traffic_once(self, traffic_profile):
+        if self.resource_helper.client_started.value == 0:
+            self._init_traffic_process(traffic_profile)
+
+        # continue test - run next iteration
+        LOG.info("Run next iteration ...")
+        self._tasks_queue.put('RUN_TRAFFIC')
+
+    def wait_on_traffic(self):
+        self._tasks_queue.join()
+        result = self._result_queue.get()
+        return result
 
     def _start_server(self):
         # we can't share ssh paramiko objects to force new connection