NSB PROX test hang fixes 37/44237/1
authorRoss Brattain <ross.b.brattain@intel.com>
Thu, 28 Sep 2017 07:10:43 +0000 (00:10 -0700)
committerRoss Brattain <ross.b.brattain@intel.com>
Wed, 4 Oct 2017 15:31:17 +0000 (08:31 -0700)
The PROX tests were hanging in the duration
runner.

These are fixes for various errors:

raise error in collect_kpi if VNF is down

move prox dpdk_rebind after collectd stop
fix dpdk nicbind rebind to group by drivers

prox: raise error in collect_kpi if the VNF is down
prox: add VNF_TYPE for consistency

sample_vnf: debug and fix kill_vnf
pkill is not matching some executable names, so add some
debug process dumps and try switching back to killall
until we can find the root cause

sample_vnf: add default timeout, so we can override
default 3600 SSH timeout

collect_kpi is the point at which we check
the VNFs and TGs for failures or exits

queues are the problem: make sure we aren't silently blocking on
non-empty queues by canceling the join thread in the subprocess

fix up the duration runner to close queues,
and other attempts to stop the duration runner
from hanging

VnfdHelper: memoize port_num

resource: fail if ssh can't connect
at the end of 3600 second test our ssh connection
is dead, so we can't actually stop collectd
unless we reconnect

fix stop() logic to ignore ssh errors

Change-Id: I6c8e682a80cb9d00362e2fef4a46df080f304e55
Signed-off-by: Ross Brattain <ross.b.brattain@intel.com>
30 files changed:
tests/unit/benchmark/scenarios/networking/test_vnf_generic.py
tests/unit/common/test_process.py [new file with mode: 0644]
tests/unit/network_services/collector/test_subscriber.py
tests/unit/network_services/helpers/test_dpdkbindnic_helper.py
tests/unit/network_services/nfvi/test_resource.py
tests/unit/network_services/vnf_generic/vnf/test_cgnapt_vnf.py
tests/unit/network_services/vnf_generic/vnf/test_prox_helpers.py
tests/unit/network_services/vnf_generic/vnf/test_prox_vnf.py
tests/unit/network_services/vnf_generic/vnf/test_sample_vnf.py
tests/unit/network_services/vnf_generic/vnf/test_tg_prox.py
tests/unit/network_services/vnf_generic/vnf/test_udp_replay.py
tests/unit/network_services/vnf_generic/vnf/test_vpe_vnf.py
yardstick/benchmark/runners/arithmetic.py
yardstick/benchmark/runners/base.py
yardstick/benchmark/runners/duration.py
yardstick/benchmark/runners/dynamictp.py
yardstick/benchmark/runners/iteration.py
yardstick/benchmark/runners/search.py
yardstick/benchmark/runners/sequence.py
yardstick/benchmark/scenarios/networking/vnf_generic.py
yardstick/common/process.py [new file with mode: 0644]
yardstick/network_services/collector/subscriber.py
yardstick/network_services/helpers/dpdkbindnic_helper.py [moved from yardstick/network_services/helpers/dpdknicbind_helper.py with 85% similarity]
yardstick/network_services/nfvi/resource.py
yardstick/network_services/vnf_generic/vnf/base.py
yardstick/network_services/vnf_generic/vnf/prox_helpers.py
yardstick/network_services/vnf_generic/vnf/prox_vnf.py
yardstick/network_services/vnf_generic/vnf/sample_vnf.py
yardstick/network_services/vnf_generic/vnf/udp_replay.py
yardstick/network_services/vnf_generic/vnf/vpe_vnf.py

index 5b15dac..016608a 100644 (file)
@@ -445,6 +445,13 @@ class TestNetworkServiceTestCase(unittest.TestCase):
         self.assertIsNotNone(
             self.s.load_vnf_models(self.scenario_cfg, self.context_cfg))
 
+    def test_load_vnf_models_no_model(self):
+        vnf = mock.Mock(autospec=GenericVNF)
+        self.s.get_vnf_impl = mock.Mock(return_value=vnf)
+
+        self.assertIsNotNone(
+            self.s.load_vnf_models(self.scenario_cfg, self.context_cfg))
+
     def test_map_topology_to_infrastructure(self):
         with mock.patch("yardstick.ssh.SSH") as ssh:
             ssh_mock = mock.Mock(autospec=ssh.SSH)
@@ -568,6 +575,35 @@ class TestNetworkServiceTestCase(unittest.TestCase):
                 mock.Mock(return_value=TRAFFIC_PROFILE)
             self.assertEqual(None, self.s.setup())
 
+    def test_setup_exception(self):
+        with mock.patch("yardstick.ssh.SSH") as ssh:
+            ssh_mock = mock.Mock(autospec=ssh.SSH)
+            ssh_mock.execute = \
+                mock.Mock(return_value=(0, SYS_CLASS_NET + IP_ADDR_SHOW, ""))
+            ssh.from_node.return_value = ssh_mock
+
+            tgen = mock.Mock(autospec=GenericTrafficGen)
+            tgen.traffic_finished = True
+            verified_dict = {"verified": True}
+            tgen.verify_traffic = lambda x: verified_dict
+            tgen.terminate = mock.Mock(return_value=True)
+            tgen.name = "tgen__1"
+            vnf = mock.Mock(autospec=GenericVNF)
+            vnf.runs_traffic = False
+            vnf.instantiate.side_effect = RuntimeError("error during instantiate")
+            vnf.terminate = mock.Mock(return_value=True)
+            self.s.vnfs = [tgen, vnf]
+            self.s.traffic_profile = mock.Mock()
+            self.s.collector = mock.Mock(autospec=Collector)
+            self.s.collector.get_kpi = \
+                mock.Mock(return_value={tgen.name: verified_dict})
+            self.s.map_topology_to_infrastructure = mock.Mock(return_value=0)
+            self.s.load_vnf_models = mock.Mock(return_value=self.s.vnfs)
+            self.s._fill_traffic_profile = \
+                mock.Mock(return_value=TRAFFIC_PROFILE)
+            with self.assertRaises(RuntimeError):
+                self.s.setup()
+
     def test__get_traffic_profile(self):
         self.scenario_cfg["traffic_profile"] = \
             self._get_file_abspath("ipv4_throughput_vpe.yaml")
@@ -594,8 +630,8 @@ class TestNetworkServiceTestCase(unittest.TestCase):
 
     def test_teardown(self):
         vnf = mock.Mock(autospec=GenericVNF)
-        vnf.terminate = \
-            mock.Mock(return_value=True)
+        vnf.terminate = mock.Mock(return_value=True)
+        vnf.name = str(vnf)
         self.s.vnfs = [vnf]
         self.s.traffic_profile = mock.Mock()
         self.s.collector = mock.Mock(autospec=Collector)
@@ -603,6 +639,18 @@ class TestNetworkServiceTestCase(unittest.TestCase):
             mock.Mock(return_value=True)
         self.assertIsNone(self.s.teardown())
 
+    def test_teardown_exception(self):
+        vnf = mock.Mock(autospec=GenericVNF)
+        vnf.terminate = mock.Mock(side_effect=RuntimeError("error duing terminate"))
+        vnf.name = str(vnf)
+        self.s.vnfs = [vnf]
+        self.s.traffic_profile = mock.Mock()
+        self.s.collector = mock.Mock(autospec=Collector)
+        self.s.collector.stop = \
+            mock.Mock(return_value=True)
+        with self.assertRaises(RuntimeError):
+            self.s.teardown()
+
     SAMPLE_NETDEVS = {
         'enp11s0': {
             'address': '0a:de:ad:be:ef:f5',
diff --git a/tests/unit/common/test_process.py b/tests/unit/common/test_process.py
new file mode 100644 (file)
index 0000000..5eee55b
--- /dev/null
@@ -0,0 +1,46 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import unittest
+
+import mock
+
+from yardstick.common import process
+
+
+class ProcessTestcase(unittest.TestCase):
+    def test_check_if_procces_failed_None(self):
+        p = mock.MagicMock(**{"exitcode": None, "name": "debug"})
+        process.check_if_process_failed(p)
+
+    def test_check_if_procces_failed_0(self):
+        p = mock.MagicMock(**{"exitcode": 0, "name": "debug"})
+        process.check_if_process_failed(p)
+
+    def test_check_if_procces_failed_1(self):
+        p = mock.MagicMock(**{"exitcode": 1, "name": "debug"})
+        with self.assertRaises(RuntimeError):
+            process.check_if_process_failed(p)
+
+
+@mock.patch("yardstick.common.process.multiprocessing")
+class TerminateChildrenTestcase(unittest.TestCase):
+    def test_some_children(self, mock_multiprocessing):
+        p1 = mock.MagicMock()
+        p2 = mock.MagicMock()
+        mock_multiprocessing.active_children.return_value = [p1, p2]
+        process.terminate_children()
+
+    def test_no_children(self, mock_multiprocessing):
+        mock_multiprocessing.active_children.return_value = []
+        process.terminate_children()
index 260f0bb..f324f62 100644 (file)
@@ -45,6 +45,7 @@ class CollectorTestCase(unittest.TestCase):
     NODES = {
         'node1': {},
         'node2': {
+            'ip': '1.2.3.4',
             'collectd': {
                 'plugins': {'abc': 12, 'def': 34},
                 'interval': 987,
index dbd8396..0f1cf7d 100644 (file)
 
 import mock
 import unittest
-from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
-from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelperException
-from yardstick.network_services.helpers.dpdknicbind_helper import NETWORK_KERNEL
-from yardstick.network_services.helpers.dpdknicbind_helper import NETWORK_DPDK
-from yardstick.network_services.helpers.dpdknicbind_helper import CRYPTO_KERNEL
-from yardstick.network_services.helpers.dpdknicbind_helper import CRYPTO_DPDK
-from yardstick.network_services.helpers.dpdknicbind_helper import NETWORK_OTHER
-from yardstick.network_services.helpers.dpdknicbind_helper import CRYPTO_OTHER
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelperException
+from yardstick.network_services.helpers.dpdkbindnic_helper import NETWORK_KERNEL
+from yardstick.network_services.helpers.dpdkbindnic_helper import NETWORK_DPDK
+from yardstick.network_services.helpers.dpdkbindnic_helper import CRYPTO_KERNEL
+from yardstick.network_services.helpers.dpdkbindnic_helper import CRYPTO_DPDK
+from yardstick.network_services.helpers.dpdkbindnic_helper import NETWORK_OTHER
+from yardstick.network_services.helpers.dpdkbindnic_helper import CRYPTO_OTHER
 
 pass
 
 
-class MyTestDpdkBindHelper(unittest.TestCase):
+class TestDpdkBindHelper(unittest.TestCase):
     EXAMPLE_OUTPUT = """
 
 Network devices using DPDK-compatible driver
@@ -204,17 +204,31 @@ Other crypto devices
     def test_bind(self):
         conn = mock.Mock()
         conn.execute = mock.Mock(return_value=(0, '', ''))
-        conn.provision_tool = mock.Mock(return_value='/opt/nsb_bin/dpdk_nic_bind.py')
+        conn.provision_tool = mock.Mock(return_value='/opt/nsb_bin/dpdk-devbind.py')
 
         dpdk_bind_helper = DpdkBindHelper(conn)
         dpdk_bind_helper.read_status = mock.Mock()
 
         dpdk_bind_helper.bind(['0000:00:03.0', '0000:00:04.0'], 'my_driver')
 
-        conn.execute.assert_called_with('sudo /opt/nsb_bin/dpdk_nic_bind.py --force '
+        conn.execute.assert_called_with('sudo /opt/nsb_bin/dpdk-devbind.py --force '
                                         '-b my_driver 0000:00:03.0 0000:00:04.0')
         dpdk_bind_helper.read_status.assert_called_once()
 
+    def test_bind_single_pci(self):
+        conn = mock.Mock()
+        conn.execute = mock.Mock(return_value=(0, '', ''))
+        conn.provision_tool = mock.Mock(return_value='/opt/nsb_bin/dpdk-devbind.py')
+
+        dpdk_bind_helper = DpdkBindHelper(conn)
+        dpdk_bind_helper.read_status = mock.Mock()
+
+        dpdk_bind_helper.bind('0000:00:03.0', 'my_driver')
+
+        conn.execute.assert_called_with('sudo /opt/nsb_bin/dpdk-devbind.py --force '
+                                        '-b my_driver 0000:00:03.0')
+        dpdk_bind_helper.read_status.assert_called_once()
+
     def test_rebind_drivers(self):
         conn = mock.Mock()
 
@@ -222,14 +236,14 @@ Other crypto devices
 
         dpdk_bind_helper.bind = mock.Mock()
         dpdk_bind_helper.used_drivers = {
-            '0000:05:00.0': 'd1',
-            '0000:05:01.0': 'd3',
+            'd1': ['0000:05:00.0'],
+            'd3': ['0000:05:01.0', '0000:05:02.0'],
         }
 
         dpdk_bind_helper.rebind_drivers()
 
-        dpdk_bind_helper.bind.assert_any_call('0000:05:00.0', 'd1', True)
-        dpdk_bind_helper.bind.assert_any_call('0000:05:01.0', 'd3', True)
+        dpdk_bind_helper.bind.assert_any_call(['0000:05:00.0'], 'd1', True)
+        dpdk_bind_helper.bind.assert_any_call(['0000:05:01.0', '0000:05:02.0'], 'd3', True)
 
     def test_save_used_drivers(self):
         conn = mock.Mock()
@@ -239,9 +253,8 @@ Other crypto devices
         dpdk_bind_helper.save_used_drivers()
 
         expected = {
-            '0000:00:04.0': 'igb_uio',
-            '0000:00:05.0': 'igb_uio',
-            '0000:00:03.0': 'virtio-pci',
+            'igb_uio': ['0000:00:04.0', '0000:00:05.0'],
+            'virtio-pci': ['0000:00:03.0'],
         }
 
-        self.assertEqual(expected, dpdk_bind_helper.used_drivers)
+        self.assertDictEqual(expected, dpdk_bind_helper.used_drivers)
index 4fc6d77..799cc20 100644 (file)
@@ -14,6 +14,8 @@
 
 from __future__ import absolute_import
 import unittest
+
+import errno
 import mock
 
 from yardstick.network_services.nfvi.resource import ResourceProfile
@@ -88,7 +90,7 @@ class TestResourceProfile(unittest.TestCase):
         with mock.patch("yardstick.ssh.AutoConnectSSH") as ssh:
             self.ssh_mock = mock.Mock(autospec=ssh.SSH)
             self.ssh_mock.execute = \
-                mock.Mock(return_value=(0, {}, ""))
+                mock.Mock(return_value=(0, "", ""))
             ssh.from_node.return_value = self.ssh_mock
 
             mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface']
@@ -105,7 +107,12 @@ class TestResourceProfile(unittest.TestCase):
 
     def test_check_if_sa_running(self):
         self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
-                         (True, {}))
+                         (0, ""))
+
+    def test_check_if_sa_running_excetion(self):
+        with mock.patch.object(self.resource_profile.connection, "execute") as mock_execute:
+            mock_execute.side_effect = OSError(errno.ECONNRESET, "error")
+            self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), (1, None))
 
     def test_get_cpu_data(self):
         reskey = ["", "cpufreq", "cpufreq-0"]
@@ -127,7 +134,6 @@ class TestResourceProfile(unittest.TestCase):
             self.assertIsNone(
                 self.resource_profile._prepare_collectd_conf("/opt/nsb_bin"))
 
-
     @mock.patch("yardstick.network_services.nfvi.resource.open")
     @mock.patch("yardstick.network_services.nfvi.resource.os")
     def test__provide_config_file(self, mock_open, mock_os):
@@ -141,13 +147,18 @@ class TestResourceProfile(unittest.TestCase):
         self.resource_profile._provide_config_file("/opt/nsb_bin", "collectd.conf", kwargs)
         self.ssh_mock.execute.assert_called_once()
 
-
     @mock.patch("yardstick.network_services.nfvi.resource.open")
     def test_initiate_systemagent(self, mock_open):
         self.resource_profile._start_collectd = mock.Mock()
         self.assertIsNone(
             self.resource_profile.initiate_systemagent("/opt/nsb_bin"))
 
+    @mock.patch("yardstick.network_services.nfvi.resource.open")
+    def test_initiate_systemagent_raise(self, mock_open):
+        self.resource_profile._start_collectd = mock.Mock(side_effect=RuntimeError)
+        with self.assertRaises(RuntimeError):
+            self.resource_profile.initiate_systemagent("/opt/nsb_bin")
+
     def test__parse_hugepages(self):
         reskey = ["cpu", "cpuFreq"]
         value = "timestamp:12345"
@@ -173,7 +184,7 @@ class TestResourceProfile(unittest.TestCase):
         self.assertEqual({'ovs/stats': '45'}, res)
 
     def test_parse_collectd_result(self):
-        res = self.resource_profile.parse_collectd_result({}, [0, 1, 2])
+        res = self.resource_profile.parse_collectd_result({})
         expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {},
                            'memory': {}, 'ovs_stats': {}, 'timestamp': '',
                            'intel_pmu': {},
@@ -186,7 +197,7 @@ class TestResourceProfile(unittest.TestCase):
                                                                      "ipc",
                                                                      "1234",
                                                                      ""])
-        res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+        res = self.resource_profile.parse_collectd_result(metric)
         expected_result = {'cpu': {1: {'ipc': '1234'}}, 'dpdkstat': {}, 'hugepages': {},
                            'memory': {}, 'ovs_stats': {}, 'timestamp': '',
                            'intel_pmu': {},
@@ -195,7 +206,7 @@ class TestResourceProfile(unittest.TestCase):
 
     def test_parse_collectd_result_memory(self):
         metric = {"nsb_stats/memory/bw": "101"}
-        res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+        res = self.resource_profile.parse_collectd_result(metric)
         expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {},
                            'memory': {'bw': '101'}, 'ovs_stats': {}, 'timestamp': '',
                            'intel_pmu': {},
@@ -205,9 +216,8 @@ class TestResourceProfile(unittest.TestCase):
     def test_parse_collectd_result_hugepage(self):
         # amqp returns bytes
         metric = {b"nsb_stats/hugepages/free": b"101"}
-        self.resource_profile.parse_hugepages = \
-        mock.Mock(return_value={"free": "101"})
-        res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+        self.resource_profile.parse_hugepages = mock.Mock(return_value={"free": "101"})
+        res = self.resource_profile.parse_collectd_result(metric)
         expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {'free': '101'},
                            'memory': {}, 'ovs_stats': {}, 'timestamp': '',
                            'intel_pmu': {},
@@ -224,7 +234,7 @@ class TestResourceProfile(unittest.TestCase):
             mock.Mock(return_value={"memory": "101"})
         self.resource_profile.parse_ovs_stats = \
             mock.Mock(return_value={"tx": "101"})
-        res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+        res = self.resource_profile.parse_collectd_result(metric)
         expected_result = {'cpu': {}, 'dpdkstat': {'tx': '101'}, 'hugepages': {},
                            'memory': {}, 'ovs_stats': {'tx': '101'}, 'timestamp': '',
                            'intel_pmu': {},
@@ -258,5 +268,9 @@ class TestResourceProfile(unittest.TestCase):
     def test_stop(self):
         self.assertIsNone(self.resource_profile.stop())
 
+    def test_stop(self):
+        self.resource_profile.amqp_client = mock.MagicMock()
+        self.assertIsNone(self.resource_profile.stop())
+
 if __name__ == '__main__':
     unittest.main()
index 0a4c124..b0ef1da 100644 (file)
@@ -41,7 +41,7 @@ TEST_FILE_YAML = 'nsb_test_case.yaml'
 SSH_HELPER = 'yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper'
 
 
-name = 'vnf__1'
+name = 'vnf__0'
 
 
 class TestCgnaptApproxSetupEnvHelper(unittest.TestCase):
@@ -162,7 +162,7 @@ class TestCgnaptApproxVnf(unittest.TestCase):
             'rfc2544': {
                 'allowed_drop_rate': '0.8 - 1',
             },
-            'vnf__1': {
+            'vnf__0': {
                 'napt': 'dynamic',
                 'vnf_config': {
                     'lb_config': 'SW',
@@ -172,6 +172,10 @@ class TestCgnaptApproxVnf(unittest.TestCase):
                     'worker_threads': 1,
                 },
             },
+            'flow': {'count': 1,
+                     'dst_ip': [{'tg__1': 'xe0'}],
+                     'public_ip': [''],
+                     'src_ip': [{'tg__0': 'xe0'}]},
         },
         'task_id': 'a70bdf4a-8e67-47a3-9dc1-273c14506eb7',
         'task_path': '/tmp',
@@ -185,15 +189,11 @@ class TestCgnaptApproxVnf(unittest.TestCase):
             'type': 'Duration',
         },
         'traffic_profile': 'ipv4_throughput_acl.yaml',
-        'traffic_options': {
-            'flow': 'ipv4_Packets_acl.yaml',
-            'imix': 'imix_voice.yaml',
-        },
-        'type': 'ISB',
+        'type': 'NSPerf',
         'nodes': {
-            'tg__2': 'trafficgen_2.yardstick',
             'tg__1': 'trafficgen_1.yardstick',
-            'vnf__1': 'vnf.yardstick',
+            'tg__0': 'trafficgen_0.yardstick',
+            'vnf__0': 'vnf.yardstick',
         },
         'topology': 'vpe-tg-topology-baremetal.yaml',
     }
@@ -253,9 +253,9 @@ class TestCgnaptApproxVnf(unittest.TestCase):
                               'password': 'r00t',
                               'VNF model': 'tg_rfc2544_tpl.yaml',
                               'user': 'root'},
-                             'vnf__1':
+                             'vnf__0':
                              {'name': 'vnf.yardstick',
-                              'vnfd-id-ref': 'vnf__1',
+                              'vnfd-id-ref': 'vnf__0',
                               'ip': '1.2.1.1',
                               'interfaces':
                               {'xe0': {'local_iface_name': 'ens786f0',
@@ -318,6 +318,8 @@ class TestCgnaptApproxVnf(unittest.TestCase):
 
         vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
         cgnapt_approx_vnf = CgnaptApproxVnf(name, vnfd)
+        cgnapt_approx_vnf._vnf_process = mock.MagicMock(
+            **{"is_alive.return_value": True, "exitcode": None})
         cgnapt_approx_vnf.q_in = mock.MagicMock()
         cgnapt_approx_vnf.q_out = mock.MagicMock()
         cgnapt_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
@@ -388,7 +390,7 @@ class TestCgnaptApproxVnf(unittest.TestCase):
                                                     'rules': ""}}
         cgnapt_approx_vnf.q_out.put("pipeline>")
         cgnapt_vnf.WAIT_TIME = 3
-        self.scenario_cfg.update({"nodes": {"vnf__1": ""}})
+        self.scenario_cfg.update({"nodes": {"vnf__0": ""}})
         self.assertIsNone(cgnapt_approx_vnf.instantiate(self.scenario_cfg,
                                                         self.context_cfg))
 
index e4319d6..54540ad 100644 (file)
 
 from __future__ import absolute_import
 
-import copy
 import os
 import socket
 import unittest
 from itertools import repeat, chain
-from contextlib import contextmanager
 import mock
 
 from tests.unit import STL_MOCKS
@@ -1501,7 +1499,9 @@ class TestProxResourceHelper(unittest.TestCase):
         helper = ProxResourceHelper(mock.MagicMock())
         helper.resource = resource = mock.MagicMock()
 
+        resource.check_if_sa_running.return_value = 0, '1234'
         resource.amqp_collect_nfvi_kpi.return_value = 543
+        resource.check_if_sa_running.return_value = (0, None)
 
         expected = {'core': 543}
         result = helper.collect_collectd_kpi()
@@ -1513,7 +1513,9 @@ class TestProxResourceHelper(unittest.TestCase):
         helper._result = {'z': 123}
         helper.resource = resource = mock.MagicMock()
 
+        resource.check_if_sa_running.return_value = 0, '1234'
         resource.amqp_collect_nfvi_kpi.return_value = 543
+        resource.check_if_sa_running.return_value = (0, None)
 
         queue.empty.return_value = False
         queue.get.return_value = {'a': 789}
@@ -1800,7 +1802,7 @@ class TestProxProfileHelper(unittest.TestCase):
 
     def test_latency_cores(self):
         resource_helper = mock.MagicMock()
-        resource_helper.setup_helper.prox_config_data= []
+        resource_helper.setup_helper.prox_config_data = []
 
         helper = ProxProfileHelper(resource_helper)
         helper._cpu_topology = []
@@ -1977,7 +1979,6 @@ class TestProxProfileHelper(unittest.TestCase):
             ]),
         ]
 
-
         client = mock.MagicMock()
         client.hz.return_value = 2
         client.port_stats.return_value = tuple(range(12))
index c88b152..09060ff 100644 (file)
@@ -85,7 +85,7 @@ class TestProxApproxVnf(unittest.TestCase):
                             'vpci': '0000:05:00.0',
                             'local_ip': '152.16.100.19',
                             'type': 'PCI-PASSTHROUGH',
-                            'vld_id': '',
+                            'vld_id': 'downlink_0',
                             'ifname': 'xe1',
                             'netmask': '255.255.255.0',
                             'dpdk_port_num': 0,
@@ -104,8 +104,8 @@ class TestProxApproxVnf(unittest.TestCase):
                             'vpci': '0000:05:00.1',
                             'local_ip': '152.16.40.19',
                             'type': 'PCI-PASSTHROUGH',
-                            'vld_id': '',
-                            'ifname': 'xe3',
+                            'vld_id': 'uplink_0',
+                            'ifname': 'xe1',
                             'driver': "i40e",
                             'netmask': '255.255.255.0',
                             'dpdk_port_num': 1,
@@ -365,6 +365,7 @@ class TestProxApproxVnf(unittest.TestCase):
         prox_approx_vnf = ProxApproxVnf(NAME, deepcopy(self.VNFD0))
         prox_approx_vnf.resource_helper = resource_helper
         prox_approx_vnf.vnfd_helper['vdu'][0]['external-interface'] = []
+        prox_approx_vnf.vnfd_helper.port_pairs.interfaces = []
 
         with self.assertRaises(RuntimeError):
             prox_approx_vnf.collect_kpi()
index 77c92e0..4d53f09 100644 (file)
@@ -833,13 +833,13 @@ class TestDpdkVnfSetupEnvHelper(unittest.TestCase):
         dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper)
         dpdk_setup_helper.dpdk_bind_helper.bind = mock.Mock()
         dpdk_setup_helper.dpdk_bind_helper.used_drivers = {
-            '0000:05:00.0': 'd1',
-            '0000:05:01.0': 'd3',
+            'd1': ['0000:05:00.0'],
+            'd3': ['0000:05:01.0'],
         }
 
         self.assertIsNone(dpdk_setup_helper.tear_down())
-        dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call('0000:05:00.0', 'd1', True)
-        dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call('0000:05:01.0', 'd3', True)
+        dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call(['0000:05:00.0'], 'd1', True)
+        dpdk_setup_helper.dpdk_bind_helper.bind.assert_any_call(['0000:05:01.0'], 'd3', True)
 
 
 class TestResourceHelper(unittest.TestCase):
@@ -2130,6 +2130,7 @@ class TestSampleVNFTrafficGen(unittest.TestCase):
     def test_terminate(self):
         sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
         sample_vnf_tg._traffic_process = mock.Mock()
+        sample_vnf_tg._tg_process = mock.Mock()
 
         sample_vnf_tg.terminate()
 
index bdda149..23d448c 100644 (file)
@@ -426,6 +426,8 @@ class TestProxTrafficGen(unittest.TestCase):
         prox_traffic_gen._traffic_process.terminate = mock.Mock()
         prox_traffic_gen.ssh_helper = mock.MagicMock()
         prox_traffic_gen.setup_helper = mock.MagicMock()
-        prox_traffic_gen._vnf_wrapper.setup_helper = mock.MagicMock()
         prox_traffic_gen.resource_helper = mock.MagicMock()
+        prox_traffic_gen._vnf_wrapper.setup_helper = mock.MagicMock()
+        prox_traffic_gen._vnf_wrapper._vnf_process = mock.MagicMock()
+        prox_traffic_gen._vnf_wrapper.resource_helper = mock.MagicMock()
         self.assertEqual(None, prox_traffic_gen.terminate())
index b75ed67..39b37c6 100644 (file)
@@ -426,6 +426,7 @@ class TestUdpReplayApproxVnf(unittest.TestCase):
         udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
         udp_replay_approx_vnf._build_config = mock.MagicMock()
         udp_replay_approx_vnf.queue_wrapper = mock.MagicMock()
+        udp_replay_approx_vnf.scenario_helper = mock.MagicMock()
 
         udp_replay_approx_vnf._run()
 
index 3813aaa..4103d78 100644 (file)
@@ -565,8 +565,9 @@ class TestVpeApproxVnf(unittest.TestCase):
         mock_ssh(ssh)
 
         resource = mock.Mock(autospec=ResourceProfile)
-        resource.check_if_sa_running.return_value = False, 'error'
+        resource.check_if_sa_running.return_value = 1, ''
         resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234}
+        resource.check_if_sa_running.return_value = (1, None)
 
         vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
         vpe_approx_vnf.q_in = mock.MagicMock()
@@ -588,7 +589,7 @@ class TestVpeApproxVnf(unittest.TestCase):
         mock_ssh(ssh)
 
         resource = mock.Mock(autospec=ResourceProfile)
-        resource.check_if_sa_running.return_value = True, 'good'
+        resource.check_if_sa_running.return_value = 0, '1234'
         resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234}
 
         vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
index 3ff064a..6aaaed8 100755 (executable)
@@ -191,7 +191,9 @@ class ArithmeticRunner(base.Runner):
     __execution_type__ = 'Arithmetic'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted, self.output_queue))
index 3ecf677..13718d7 100755 (executable)
 # rally/rally/benchmark/runners/base.py
 
 from __future__ import absolute_import
-import importlib
+
 import logging
 import multiprocessing
 import subprocess
 import time
 import traceback
 
+import importlib
 
 from six.moves.queue import Empty
 
@@ -36,7 +37,6 @@ log = logging.getLogger(__name__)
 def _execute_shell_command(command):
     """execute shell script with error handling"""
     exitcode = 0
-    output = []
     try:
         output = subprocess.check_output(command, shell=True)
     except Exception:
index 2cb2600..fbf72a7 100644 (file)
@@ -138,7 +138,9 @@ If the scenario ends before the time has elapsed, it will be started again.
     __execution_type__ = 'Duration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted, self.output_queue))
index 01e76c6..63bfc82 100755 (executable)
 """A runner that searches for the max throughput with binary search
 """
 
-import os
-import multiprocessing
 import logging
-import traceback
+import multiprocessing
 import time
+import traceback
+
+import os
 
 from yardstick.benchmark.runners import base
 
@@ -65,8 +66,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
             max_throuput_found = False
             sequence = 0
 
-            last_min_data = {}
-            last_min_data['packets_per_second'] = 0
+            last_min_data = {'packets_per_second': 0}
 
             while True:
                 sequence += 1
@@ -125,7 +125,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                     queue.put(record)
                     max_throuput_found = True
 
-                if (errors) or aborted.is_set() or max_throuput_found:
+                if errors or aborted.is_set() or max_throuput_found:
                     LOG.info("worker END")
                     break
 
@@ -155,7 +155,7 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
 
 
 class IterationRunner(base.Runner):
-    '''Run a scenario to find the max throughput
+    """Run a scenario to find the max throughput
 
 If the scenario ends before the time has elapsed, it will be started again.
 
@@ -168,11 +168,13 @@ If the scenario ends before the time has elapsed, it will be started again.
         type:   int
         unit:   pps
         default: 1000 pps
-    '''
+    """
     __execution_type__ = 'Dynamictp'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted))
index 88158ee..cb04243 100644 (file)
 """
 
 from __future__ import absolute_import
-import os
-import multiprocessing
+
 import logging
-import traceback
+import multiprocessing
 import time
+import traceback
+
+import os
 
 from yardstick.benchmark.runners import base
 
@@ -88,9 +90,9 @@ def _worker_process(queue, cls, method_name, scenario_cfg,
                     scenario_cfg['options']['rate'] -= delta
                     sequence = 1
                     continue
-            except Exception as e:
+            except Exception:
                 errors = traceback.format_exc()
-                LOG.exception(e)
+                LOG.exception("")
             else:
                 if result:
                     # add timeout for put so we don't block test
@@ -151,7 +153,9 @@ If the scenario ends before the time has elapsed, it will be started again.
     __execution_type__ = 'Iteration'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted, self.output_queue))
index 5948763..8037329 100644 (file)
 """
 
 from __future__ import absolute_import
-import os
-import multiprocessing
+
 import logging
-import traceback
+import multiprocessing
 import time
-
-from collections import Mapping
+import traceback
 from contextlib import contextmanager
 from itertools import takewhile
+
+import os
+from collections import Mapping
 from six.moves import zip
 
 from yardstick.benchmark.runners import base
@@ -173,7 +174,9 @@ If the scenario ends before the time has elapsed, it will be started again.
                     break
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=self._worker_run,
             args=(cls, method, scenario_cfg, context_cfg))
         self.process.start()
index f08ca5d..d6e3f71 100644 (file)
@@ -21,11 +21,13 @@ The input value in the sequence is specified in a list in the input file.
 """
 
 from __future__ import absolute_import
-import os
-import multiprocessing
+
 import logging
-import traceback
+import multiprocessing
 import time
+import traceback
+
+import os
 
 from yardstick.benchmark.runners import base
 
@@ -140,7 +142,9 @@ class SequenceRunner(base.Runner):
     __execution_type__ = 'Sequence'
 
     def _run_benchmark(self, cls, method, scenario_cfg, context_cfg):
+        name = "{}-{}-{}".format(self.__execution_type__, scenario_cfg.get("type"), os.getpid())
         self.process = multiprocessing.Process(
+            name=name,
             target=_worker_process,
             args=(self.result_queue, cls, method, scenario_cfg,
                   context_cfg, self.aborted, self.output_queue))
index d9cc0ea..d851252 100644 (file)
@@ -30,6 +30,7 @@ from collections import defaultdict
 
 from yardstick.benchmark.scenarios import base
 from yardstick.common.constants import LOG_DIR
+from yardstick.common.process import terminate_children
 from yardstick.common.utils import import_modules_from_package, itersubclasses
 from yardstick.common.yaml_loader import yaml_load
 from yardstick.network_services.collector.subscriber import Collector
@@ -596,7 +597,8 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
                 vnf.instantiate(self.scenario_cfg, self.context_cfg)
                 LOG.info("Waiting for %s to instantiate", vnf.name)
                 vnf.wait_for_instantiate()
-        except RuntimeError:
+        except:
+            LOG.exception("")
             for vnf in self.vnfs:
                 vnf.terminate()
             raise
@@ -635,7 +637,19 @@ printf "%s/driver:" $1 ; basename $(readlink -s $1/device/driver); } \
         :return
         """
 
-        self.collector.stop()
-        for vnf in self.vnfs:
-            LOG.info("Stopping %s", vnf.name)
-            vnf.terminate()
+        try:
+            try:
+                self.collector.stop()
+                for vnf in self.vnfs:
+                    LOG.info("Stopping %s", vnf.name)
+                    vnf.terminate()
+                LOG.debug("all VNFs terminated: %s", ", ".join(vnf.name for vnf in self.vnfs))
+            finally:
+                terminate_children()
+        except Exception:
+            # catch any exception in teardown and convert to simple exception
+            # never pass exceptions back to multiprocessing, because some exceptions can
+            # be unpicklable
+            # https://bugs.python.org/issue9400
+            LOG.exception("")
+            raise RuntimeError("Error in teardown")
diff --git a/yardstick/common/process.py b/yardstick/common/process.py
new file mode 100644 (file)
index 0000000..812ddea
--- /dev/null
@@ -0,0 +1,47 @@
+# Copyright (c) 2017 Intel Corporation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+import multiprocessing
+
+import os
+
+LOG = logging.getLogger(__name__)
+
+
+def check_if_process_failed(proc, timeout=1):
+    if proc is not None:
+        proc.join(timeout)
+        # Only abort if the process aborted
+        if proc.exitcode is not None and proc.exitcode > 0:
+            raise RuntimeError("{} exited with status {}".format(proc.name, proc.exitcode))
+
+
+def terminate_children(timeout=3):
+    current_proccess = multiprocessing.current_process()
+    active_children = multiprocessing.active_children()
+    if not active_children:
+        LOG.debug("no children to terminate")
+        return
+    for child in active_children:
+        LOG.debug("%s %s %s, child: %s %s", current_proccess.name, current_proccess.pid,
+                  os.getpid(), child, child.pid)
+        LOG.debug("joining %s", child)
+        child.join(timeout)
+        child.terminate()
+    active_children = multiprocessing.active_children()
+    if not active_children:
+        LOG.debug("no children to terminate")
+    for child in active_children:
+        LOG.debug("%s %s %s, after terminate child: %s %s", current_proccess.name,
+                  current_proccess.pid, os.getpid(), child, child.pid)
index db52e0b..d560e1d 100644 (file)
@@ -74,11 +74,12 @@ class Collector(object):
             # Result example:
             # {"VNF1: { "tput" : [1000, 999] }, "VNF2": { "latency": 100 }}
             LOG.debug("collect KPI for %s", node_name)
-            if not resource.check_if_sa_running("collectd")[0]:
+            if resource.check_if_sa_running("collectd")[0] != 0:
                 continue
 
             try:
                 results[node_name] = {"core": resource.amqp_collect_nfvi_kpi()}
+                LOG.debug("%s collect KPIs %s", node_name, results[node_name]['core'])
             except Exception:
                 LOG.exception("")
         return results
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
+
 import re
 import itertools
 
+import six
+
 NETWORK_KERNEL = 'network_kernel'
 NETWORK_DPDK = 'network_dpdk'
 NETWORK_OTHER = 'network_other'
@@ -22,6 +26,9 @@ CRYPTO_DPDK = 'crypto_dpdk'
 CRYPTO_OTHER = 'crypto_other'
 
 
+LOG = logging.getLogger(__name__)
+
+
 class DpdkBindHelperException(Exception):
     pass
 
@@ -123,23 +130,31 @@ class DpdkBindHelper(object):
     @property
     def interface_driver_map(self):
         return {interface['vpci']: interface['driver']
-                for interface in itertools.chain(*self.dpdk_status.values())}
+                for interface in itertools.chain.from_iterable(self.dpdk_status.values())}
 
     def read_status(self):
         return self.parse_dpdk_status_output(self._dpdk_execute(self._status_cmd)[1])
 
-    def bind(self, pci, driver, force=True):
+    def bind(self, pci_addresses, driver, force=True):
+        # accept single PCI or list of PCI
+        if isinstance(pci_addresses, six.string_types):
+            pci_addresses = [pci_addresses]
         cmd = self.DPDK_BIND_CMD.format(dpdk_nic_bind=self._dpdk_nic_bind,
                                         driver=driver,
-                                        vpci=' '.join(list(pci)),
+                                        vpci=' '.join(list(pci_addresses)),
                                         force='--force' if force else '')
+        LOG.debug(cmd)
         self._dpdk_execute(cmd)
         # update the inner status dict
         self.read_status()
 
     def save_used_drivers(self):
-        self.used_drivers = self.interface_driver_map
+        # invert the map, so we can bind by driver type
+        self.used_drivers = {}
+        # sort for stability
+        for vpci, driver in sorted(self.interface_driver_map.items()):
+            self.used_drivers.setdefault(driver, []).append(vpci)
 
     def rebind_drivers(self, force=True):
-        for vpci, driver in self.used_drivers.items():
-            self.bind(vpci, driver, force)
+        for driver, vpcis in self.used_drivers.items():
+            self.bind(vpcis, driver, force)
index 7e8334c..e3d0e3b 100644 (file)
@@ -19,6 +19,7 @@ from __future__ import print_function
 import logging
 from itertools import chain
 
+import errno
 import jinja2
 import os
 import os.path
@@ -72,7 +73,6 @@ class ResourceProfile(object):
             self.timeout = timeout
 
         self.enable = True
-        self.cores = validate_non_string_sequence(cores, default=[])
         self._queue = multiprocessing.Queue()
         self.amqp_client = None
         self.port_names = validate_non_string_sequence(port_names, default=[])
@@ -83,8 +83,16 @@ class ResourceProfile(object):
 
     def check_if_sa_running(self, process):
         """ verify if system agent is running """
-        status, pid, _ = self.connection.execute("pgrep -f %s" % process)
-        return status == 0, pid
+        try:
+            err, pid, _ = self.connection.execute("pgrep -f %s" % process)
+            # strip whitespace
+            return err, pid.strip()
+        except OSError as e:
+            if e.errno in {errno.ECONNRESET}:
+                # if we can't connect to check, then we won't be able to connect to stop it
+                LOG.exception("can't connect to host to check collectd status")
+                return 1, None
+            raise
 
     def run_collectd_amqp(self):
         """ run amqp consumer to collect the NFVi data """
@@ -137,7 +145,7 @@ class ResourceProfile(object):
     def parse_intel_pmu_stats(cls, key, value):
         return {''.join(str(v) for v in key): value.split(":")[1]}
 
-    def parse_collectd_result(self, metrics, core_list):
+    def parse_collectd_result(self, metrics):
         """ convert collectd data into json"""
         result = {
             "cpu": {},
@@ -161,8 +169,7 @@ class ResourceProfile(object):
             if "cpu" in res_key0 or "intel_rdt" in res_key0:
                 cpu_key, name, metric, testcase = \
                     self.get_cpu_data(res_key0, res_key1, value)
-                if cpu_key in core_list:
-                    result["cpu"].setdefault(cpu_key, {}).update({name: metric})
+                result["cpu"].setdefault(cpu_key, {}).update({name: metric})
 
             elif "memory" in res_key0:
                 result["memory"].update({res_key1: value.split(":")[0]})
@@ -189,8 +196,9 @@ class ResourceProfile(object):
     def amqp_process_for_nfvi_kpi(self):
         """ amqp collect and return nfvi kpis """
         if self.amqp_client is None and self.enable:
-            self.amqp_client = \
-                multiprocessing.Process(target=self.run_collectd_amqp)
+            self.amqp_client = multiprocessing.Process(
+                name="AmqpClient-{}-{}".format(self.mgmt['ip'], os.getpid()),
+                target=self.run_collectd_amqp)
             self.amqp_client.start()
 
     def amqp_collect_nfvi_kpi(self):
@@ -201,7 +209,7 @@ class ResourceProfile(object):
         metric = {}
         while not self._queue.empty():
             metric.update(self._queue.get())
-        msg = self.parse_collectd_result(metric, self.cores)
+        msg = self.parse_collectd_result(metric)
         return msg
 
     def _provide_config_file(self, config_file_path, nfvi_cfg, template_kwargs):
@@ -265,8 +273,8 @@ class ResourceProfile(object):
         connection.execute("sudo rabbitmqctl authenticate_user admin admin")
         connection.execute("sudo rabbitmqctl set_permissions -p / admin '.*' '.*' '.*'")
 
-        LOG.debug("Start collectd service.....")
-        connection.execute("sudo %s" % collectd_path)
+        LOG.debug("Start collectd service..... %s second timeout", self.timeout)
+        connection.execute("sudo %s" % collectd_path, timeout=self.timeout)
         LOG.debug("Done")
 
     def initiate_systemagent(self, bin_path):
@@ -292,13 +300,18 @@ class ResourceProfile(object):
         LOG.debug("Stop resource monitor...")
 
         if self.amqp_client is not None:
+            # be proper and try to join first
+            self.amqp_client.join(3)
             self.amqp_client.terminate()
 
+        LOG.debug("Check if %s is running", agent)
         status, pid = self.check_if_sa_running(agent)
-        if status == 0:
+        LOG.debug("status %s  pid %s", status, pid)
+        if status != 0:
             return
 
-        self.connection.execute('sudo kill -9 %s' % pid)
-        self.connection.execute('sudo pkill -9 %s' % agent)
+        if pid:
+            self.connection.execute('sudo kill -9 "%s"' % pid)
+        self.connection.execute('sudo pkill -9 "%s"' % agent)
         self.connection.execute('sudo service rabbitmq-server stop')
         self.connection.execute("sudo rabbitmqctl stop_app")
index 67634a7..7391633 100644 (file)
@@ -64,6 +64,8 @@ class VnfdHelper(dict):
     def __init__(self, *args, **kwargs):
         super(VnfdHelper, self).__init__(*args, **kwargs)
         self.port_pairs = PortPairs(self['vdu'][0]['external-interface'])
+        # port num is not present until binding so we have to memoize
+        self._port_num_map = {}
 
     @property
     def mgmt_interface(self):
@@ -91,12 +93,14 @@ class VnfdHelper(dict):
             virtual_intf = interface["virtual-interface"]
             if virtual_intf[key] == value:
                 return interface
+        raise KeyError()
 
     def find_interface(self, **kwargs):
         key, value = next(iter(kwargs.items()))
         for interface in self.interfaces:
             if interface[key] == value:
                 return interface
+        raise KeyError()
 
     # hide dpdk_port_num key so we can abstract
     def find_interface_by_port(self, port):
@@ -105,6 +109,7 @@ class VnfdHelper(dict):
             # we have to convert to int to compare
             if int(virtual_intf['dpdk_port_num']) == port:
                 return interface
+        raise KeyError()
 
     def port_num(self, port):
         # we need interface name -> DPDK port num (PMD ID) -> LINK ID
@@ -118,7 +123,8 @@ class VnfdHelper(dict):
             intf = port
         else:
             intf = self.find_interface(name=port)
-        return int(intf["virtual-interface"]["dpdk_port_num"])
+        return self._port_num_map.setdefault(intf["name"],
+                                             int(intf["virtual-interface"]["dpdk_port_num"]))
 
     def port_nums(self, intfs):
         return [self.port_num(i) for i in intfs]
index ba4d44c..d9fe9a9 100644 (file)
@@ -364,6 +364,7 @@ class ProxSocketHelper(object):
         """ send data to the remote instance """
         LOG.debug("Sending data to socket: [%s]", to_send.rstrip('\n'))
         try:
+            # TODO: sendall will block, we need a timeout
             self._sock.sendall(to_send.encode('utf-8'))
         except:
             pass
@@ -593,6 +594,8 @@ _LOCAL_OBJECT = object()
 class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
     # the actual app is lowercase
     APP_NAME = 'prox'
+    # not used for Prox but added for consistency
+    VNF_TYPE = "PROX"
 
     LUA_PARAMETER_NAME = ""
     LUA_PARAMETER_PEER = {
@@ -609,6 +612,8 @@ class ProxDpdkVnfSetupEnvHelper(DpdkVnfSetupEnvHelper):
         self._prox_config_data = None
         self.additional_files = {}
         self.config_queue = Queue()
+        # allow_exit_without_flush
+        self.config_queue.cancel_join_thread()
         self._global_section = None
 
     @property
index 2ac6ea4..3bfca19 100644 (file)
@@ -16,9 +16,10 @@ import errno
 import logging
 
 
+from yardstick.common.process import check_if_process_failed
 from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxDpdkVnfSetupEnvHelper
 from yardstick.network_services.vnf_generic.vnf.prox_helpers import ProxResourceHelper
-from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF
+from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, PROCESS_JOIN_TIMEOUT
 
 LOG = logging.getLogger(__name__)
 
@@ -51,10 +52,18 @@ class ProxApproxVnf(SampleVNF):
         try:
             return self.resource_helper.execute(cmd, *args, **kwargs)
         except OSError as e:
-            if not ignore_errors or e.errno not in {errno.EPIPE, errno.ESHUTDOWN}:
+            if e.errno in {errno.EPIPE, errno.ESHUTDOWN, errno.ECONNRESET}:
+                if ignore_errors:
+                    LOG.debug("ignoring vnf_execute exception %s for command %s", e, cmd)
+                else:
+                    raise
+            else:
                 raise
 
     def collect_kpi(self):
+        # we can't get KPIs if the VNF is down
+        check_if_process_failed(self._vnf_process)
+
         if self.resource_helper is None:
             result = {
                 "packets_in": 0,
@@ -64,12 +73,13 @@ class ProxApproxVnf(SampleVNF):
             }
             return result
 
-        intf_count = len(self.vnfd_helper.interfaces)
-        if intf_count not in {1, 2, 4}:
+        # use all_ports so we only use ports matched in topology
+        port_count = len(self.vnfd_helper.port_pairs.all_ports)
+        if port_count not in {1, 2, 4}:
             raise RuntimeError("Failed ..Invalid no of ports .. "
                                "1, 2 or 4 ports only supported at this time")
 
-        port_stats = self.vnf_execute('port_stats', range(intf_count))
+        port_stats = self.vnf_execute('port_stats', range(port_count))
         try:
             rx_total = port_stats[6]
             tx_total = port_stats[7]
@@ -94,13 +104,20 @@ class ProxApproxVnf(SampleVNF):
         self.setup_helper.tear_down()
 
     def terminate(self):
+        # stop collectd first or we get pika errors?
+        self.resource_helper.stop_collect()
         # try to quit with socket commands
-        self.vnf_execute("stop_all")
-        self.vnf_execute("quit")
-        # hopefully quit succeeds and socket closes, so ignore force_quit socket errors
-        self.vnf_execute("force_quit", _ignore_errors=True)
-        if self._vnf_process:
-            self._vnf_process.terminate()
+        # pkill is not matching, debug with pgrep
+        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.setup_helper.APP_NAME)
+        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.setup_helper.APP_NAME)
+        if self._vnf_process.is_alive():
+            self.vnf_execute("stop_all")
+            self.vnf_execute("quit")
+            # hopefully quit succeeds and socket closes, so ignore force_quit socket errors
+            self.vnf_execute("force_quit", _ignore_errors=True)
         self.setup_helper.kill_vnf()
         self._tear_down()
-        self.resource_helper.stop_collect()
+        if self._vnf_process is not None:
+            LOG.debug("joining before terminate %s", self._vnf_process.name)
+            self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+            self._vnf_process.terminate()
index bbaae39..114153d 100644 (file)
@@ -29,10 +29,11 @@ from six.moves import cStringIO
 
 from yardstick.benchmark.contexts.base import Context
 from yardstick.benchmark.scenarios.networking.vnf_generic import find_relative_file
+from yardstick.common.process import check_if_process_failed
 from yardstick.network_services.helpers.cpu import CpuSysCores
 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
 from yardstick.network_services.helpers.samplevnf_helper import MultiPortConfig
-from yardstick.network_services.helpers.dpdknicbind_helper import DpdkBindHelper
+from yardstick.network_services.helpers.dpdkbindnic_helper import DpdkBindHelper
 from yardstick.network_services.nfvi.resource import ResourceProfile
 from yardstick.network_services.vnf_generic.vnf.base import GenericVNF
 from yardstick.network_services.vnf_generic.vnf.base import QueueFileWrapper
@@ -51,6 +52,8 @@ LOG = logging.getLogger(__name__)
 
 
 REMOTE_TMP = "/tmp"
+DEFAULT_VNF_TIMEOUT = 3600
+PROCESS_JOIN_TIMEOUT = 3
 
 
 class VnfSshHelper(AutoConnectSSH):
@@ -290,8 +293,12 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         return resource
 
     def kill_vnf(self):
+        # pkill is not matching, debug with pgrep
+        self.ssh_helper.execute("sudo pgrep -lax  %s" % self.APP_NAME)
+        self.ssh_helper.execute("sudo ps aux | grep -i %s" % self.APP_NAME)
         # have to use exact match
-        self.ssh_helper.execute("sudo pkill -x %s" % self.APP_NAME)
+        # try using killall to match
+        self.ssh_helper.execute("sudo killall %s" % self.APP_NAME)
 
     def _setup_dpdk(self):
         """ setup dpdk environment needed for vnf to run """
@@ -330,8 +337,10 @@ class DpdkVnfSetupEnvHelper(SetupEnvHelper):
         port_names = (intf["name"] for intf in ports)
         collectd_options = self.get_collectd_options()
         plugins = collectd_options.get("plugins", {})
+        # we must set timeout to be the same as the VNF otherwise KPIs will die before VNF
         return ResourceProfile(self.vnfd_helper.mgmt_interface, port_names=port_names, cores=cores,
-                               plugins=plugins, interval=collectd_options.get("interval"))
+                               plugins=plugins, interval=collectd_options.get("interval"),
+                               timeout=self.scenario_helper.timeout)
 
     def _detect_and_bind_drivers(self):
         interfaces = self.vnfd_helper.interfaces
@@ -386,7 +395,7 @@ class ResourceHelper(object):
     def _collect_resource_kpi(self):
         result = {}
         status = self.resource.check_if_sa_running("collectd")[0]
-        if status:
+        if status == 0:
             result = self.resource.amqp_collect_nfvi_kpi()
 
         result = {"core": result}
@@ -672,12 +681,18 @@ class ScenarioHelper(object):
     def topology(self):
         return self.scenario_cfg['topology']
 
+    @property
+    def timeout(self):
+        return self.options.get('timeout', DEFAULT_VNF_TIMEOUT)
+
 
 class SampleVNF(GenericVNF):
     """ Class providing file-like API for generic VNF implementation """
 
     VNF_PROMPT = "pipeline>"
     WAIT_TIME = 1
+    APP_NAME = "SampleVNF"
+    # we run the VNF interactively, so the ssh command will timeout after this long
 
     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
         super(SampleVNF, self).__init__(name, vnfd)
@@ -760,7 +775,8 @@ class SampleVNF(GenericVNF):
 
     def _start_vnf(self):
         self.queue_wrapper = QueueFileWrapper(self.q_in, self.q_out, self.VNF_PROMPT)
-        self._vnf_process = Process(target=self._run)
+        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
+        self._vnf_process = Process(name=name, target=self._run)
         self._vnf_process.start()
 
     def _vnf_up_post(self):
@@ -810,6 +826,7 @@ class SampleVNF(GenericVNF):
             'stdout': self.queue_wrapper,
             'keep_stdin_open': True,
             'pty': True,
+            'timeout': self.scenario_helper.timeout,
         }
 
     def _build_config(self):
@@ -842,11 +859,15 @@ class SampleVNF(GenericVNF):
 
     def terminate(self):
         self.vnf_execute("quit")
-        if self._vnf_process:
-            self._vnf_process.terminate()
         self.setup_helper.kill_vnf()
         self._tear_down()
         self.resource_helper.stop_collect()
+        if self._vnf_process is not None:
+            # be proper and join first before we kill
+            LOG.debug("joining before terminate %s", self._vnf_process.name)
+            self._vnf_process.join(PROCESS_JOIN_TIMEOUT)
+            self._vnf_process.terminate()
+        # no terminate children here because we share processes with tg
 
     def get_stats(self, *args, **kwargs):
         """
@@ -860,6 +881,8 @@ class SampleVNF(GenericVNF):
         return out
 
     def collect_kpi(self):
+        # we can't get KPIs if the VNF is down
+        check_if_process_failed(self._vnf_process)
         stats = self.get_stats()
         m = re.search(self.COLLECT_KPI, stats, re.MULTILINE)
         if m:
@@ -884,7 +907,6 @@ class SampleVNFTrafficGen(GenericTrafficGen):
     def __init__(self, name, vnfd, setup_env_helper_type=None, resource_helper_type=None):
         super(SampleVNFTrafficGen, self).__init__(name, vnfd)
         self.bin_path = get_nsb_option('bin_path', '')
-        self.name = "tgen__1"  # name in topology file
 
         self.scenario_helper = ScenarioHelper(self.name)
         self.ssh_helper = VnfSshHelper(self.vnfd_helper.mgmt_interface, self.bin_path, wait=True)
@@ -916,7 +938,8 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         self.resource_helper.setup()
 
         LOG.info("Starting %s server...", self.APP_NAME)
-        self._tg_process = Process(target=self._start_server)
+        name = "{}-{}-{}".format(self.name, self.APP_NAME, os.getpid())
+        self._tg_process = Process(name=name, target=self._start_server)
         self._tg_process.start()
 
     def wait_for_instantiate(self):
@@ -952,7 +975,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :param traffic_profile:
         :return: True/False
         """
-        self._traffic_process = Process(target=self._traffic_runner,
+        name = "{}-{}-{}-{}".format(self.name, self.APP_NAME, traffic_profile.__class__.__name__,
+                                    os.getpid())
+        self._traffic_process = Process(name=name, target=self._traffic_runner,
                                         args=(traffic_profile,))
         self._traffic_process.start()
         # Wait for traffic process to start
@@ -983,6 +1008,9 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         pass
 
     def collect_kpi(self):
+        # check if the tg processes have exited
+        for proc in (self._tg_process, self._traffic_process):
+            check_if_process_failed(proc)
         result = self.resource_helper.collect_kpi()
         LOG.debug("%s collect KPIs %s", self.APP_NAME, result)
         return result
@@ -993,5 +1021,15 @@ class SampleVNFTrafficGen(GenericTrafficGen):
         :return: True/False
         """
         self.traffic_finished = True
+        # we must kill client before we kill the server, or the client will raise exception
         if self._traffic_process is not None:
+            # be proper and try to join before terminating
+            LOG.debug("joining before terminate %s", self._traffic_process.name)
+            self._traffic_process.join(PROCESS_JOIN_TIMEOUT)
             self._traffic_process.terminate()
+        if self._tg_process is not None:
+            # be proper and try to join before terminating
+            LOG.debug("joining before terminate %s", self._tg_process.name)
+            self._tg_process.join(PROCESS_JOIN_TIMEOUT)
+            self._tg_process.terminate()
+        # no terminate children here because we share processes with vnf
index 6b77797..a57f53b 100644 (file)
@@ -15,6 +15,7 @@
 from __future__ import absolute_import
 import logging
 
+from yardstick.common.process import check_if_process_failed
 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF
 from yardstick.network_services.vnf_generic.vnf.sample_vnf import DpdkVnfSetupEnvHelper
 from yardstick.network_services.vnf_generic.vnf.sample_vnf import ClientResourceHelper
@@ -107,6 +108,8 @@ class UdpReplayApproxVnf(SampleVNF):
     def collect_kpi(self):
         def get_sum(offset):
             return sum(int(i) for i in split_stats[offset::5])
+        # we can't get KPIs if the VNF is down
+        check_if_process_failed(self._vnf_process)
 
         number_of_ports = len(self.vnfd_helper.port_pairs.all_ports)
 
index 5f1c4d4..c02c0eb 100644 (file)
@@ -24,6 +24,7 @@ import posixpath
 
 from six.moves import configparser, zip
 
+from yardstick.common.process import check_if_process_failed
 from yardstick.network_services.helpers.samplevnf_helper import PortPairs
 from yardstick.network_services.pipeline import PipelineRules
 from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNF, DpdkVnfSetupEnvHelper
@@ -278,6 +279,8 @@ class VpeApproxVnf(SampleVNF):
         raise NotImplementedError
 
     def collect_kpi(self):
+        # we can't get KPIs if the VNF is down
+        check_if_process_failed(self._vnf_process)
         result = {
             'pkt_in_up_stream': 0,
             'pkt_drop_up_stream': 0,