Merge "bugfix: remove pod_name in host and unify host parameter"
[yardstick.git] / yardstick / benchmark / scenarios / networking / vnf_generic.py
index 450f83f..b94bfc9 100644 (file)
@@ -19,6 +19,8 @@ import logging
 import errno
 
 import ipaddress
+
+import copy
 import os
 import sys
 import re
@@ -30,6 +32,7 @@ from collections import defaultdict
 
 from yardstick.benchmark.scenarios import base
 from yardstick.common.constants import LOG_DIR
+from yardstick.common.process import terminate_children
 from yardstick.common.utils import import_modules_from_package, itersubclasses
 from yardstick.common.yaml_loader import yaml_load
 from yardstick.network_services.collector.subscriber import Collector
@@ -140,8 +143,15 @@ class NetworkServiceTestCase(base.Scenario):
 
     def _get_ip_flow_range(self, ip_start_range):
 
+        # IP range is specified as 'x.x.x.x-y.y.y.y'
+        if isinstance(ip_start_range, six.string_types):
+            return ip_start_range
+
         node_name, range_or_interface = next(iter(ip_start_range.items()), (None, '0.0.0.0'))
-        if node_name is not None:
+        if node_name is None:
+            # we are manually specifying the range
+            ip_addr_range = range_or_interface
+        else:
             node = self.context_cfg["nodes"].get(node_name, {})
             try:
                 # the ip_range is the interface name
@@ -163,9 +173,6 @@ class NetworkServiceTestCase(base.Scenario):
                 LOG.warning("Only single IP in range %s", ipaddr)
                 # fall back to single IP range
                 ip_addr_range = ip
-        else:
-            # we are manually specifying the range
-            ip_addr_range = range_or_interface
         return ip_addr_range
 
     def _get_traffic_flow(self):
@@ -183,6 +190,12 @@ class NetworkServiceTestCase(base.Scenario):
             for index, publicip in enumerate(fflow.get("public_ip", [])):
                 flow["public_ip_{}".format(index)] = publicip
 
+            for index, src_port in enumerate(fflow.get("src_port", [])):
+                flow["src_port_{}".format(index)] = src_port
+
+            for index, dst_port in enumerate(fflow.get("dst_port", [])):
+                flow["dst_port_{}".format(index)] = dst_port
+
             flow["count"] = fflow["count"]
         except KeyError:
             flow = {}
@@ -371,13 +384,7 @@ class NetworkServiceTestCase(base.Scenario):
         context_yaml = os.path.join(LOG_DIR, "pod-{}.yaml".format(self.scenario_cfg['task_id']))
         # convert OrderedDict to a list
         # pod.yaml nodes is a list
-        nodes = []
-        for node in self.context_cfg["nodes"].values():
-            # name field is required
-            # remove context suffix
-            node['name'] = node['name'].split('.')[0]
-            nodes.append(node)
-        nodes = self._convert_pkeys_to_string(nodes)
+        nodes = [self._serialize_node(node) for node in self.context_cfg["nodes"].values()]
         pod_dict = {
             "nodes": nodes,
             "networks": self.context_cfg["networks"]
@@ -387,15 +394,16 @@ class NetworkServiceTestCase(base.Scenario):
                            explicit_start=True)
 
     @staticmethod
-    def _convert_pkeys_to_string(nodes):
-        # make copy because we are mutating
-        nodes = nodes[:]
-        for i, node in enumerate(nodes):
-            try:
-                nodes[i] = dict(node, pkey=ssh.convert_key_to_str(node["pkey"]))
-            except KeyError:
-                pass
-        return nodes
+    def _serialize_node(node):
+        new_node = copy.deepcopy(node)
+        # name field is required
+        # remove context suffix
+        new_node["name"] = node['name'].split('.')[0]
+        try:
+            new_node["pkey"] = ssh.convert_key_to_str(node["pkey"])
+        except KeyError:
+            pass
+        return new_node
 
     TOPOLOGY_REQUIRED_KEYS = frozenset({
         "vpci", "local_ip", "netmask", "local_mac", "driver"})
@@ -542,7 +550,11 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
         # we assume OrderedDict for consistenct in instantiation
         for node_name, node in context_cfg["nodes"].items():
             LOG.debug(node)
-            file_name = node["VNF model"]
+            try:
+                file_name = node["VNF model"]
+            except KeyError:
+                LOG.debug("no model for %s, skipping", node_name)
+                continue
             file_path = scenario_cfg['task_path']
             with open_relative_file(file_name, file_path) as stream:
                 vnf_model = stream.read()
@@ -588,7 +600,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
                 LOG.info("Waiting for %s to instantiate", vnf.name)
                 vnf.wait_for_instantiate()
-        except RuntimeError:
+        except:
+            LOG.exception("")
             for vnf in self.vnfs:
                 vnf.terminate()
             raise
@@ -599,7 +612,7 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
             traffic_gen.listen_traffic(self.traffic_profile)
 
         # register collector with yardstick for KPI collection.
-        self.collector = Collector(self.vnfs, self.traffic_profile)
+        self.collector = Collector(self.vnfs, self.context_cfg["nodes"], self.traffic_profile)
         self.collector.start()
 
         # Start the actual traffic
@@ -615,11 +628,11 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
         :return: None
         """
 
-        for vnf in self.vnfs:
-            # Result example:
-            # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
-            LOG.debug("collect KPI for %s", vnf.name)
-            result.update(self.collector.get_kpi(vnf))
+        # this is the only method that is check from the runner
+        # so if we have any fatal error it must be raised via these methods
+        # otherwise we will not terminate
+
+        result.update(self.collector.get_kpi())
 
     def teardown(self):
         """ Stop the collector and terminate VNF & TG instance
@@ -627,7 +640,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
         :return
         """
 
-        self.collector.stop()
-        for vnf in self.vnfs:
-            LOG.info("Stopping %s", vnf.name)
-            vnf.terminate()
+        try:
+            try:
+                self.collector.stop()
+                for vnf in self.vnfs:
+                    LOG.info("Stopping %s", vnf.name)
+                    vnf.terminate()
+                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
+            finally:
+                terminate_children()
+        except Exception:
+            # catch any exception in teardown and convert to simple exception
+            # never pass exceptions back to multiprocessing, because some exceptions can
+            # be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise RuntimeError("Error in teardown")