import six
import unittest
-from yardstick.network_services.vnf_generic.vnf import tg_rfc2544_ixia
+from yardstick.benchmark import contexts
+from yardstick.benchmark.contexts import base as ctx_base
+from yardstick.network_services.libs.ixia_libs.ixnet import ixnet_api
from yardstick.network_services.traffic_profile import base as tp_base
+from yardstick.network_services.vnf_generic.vnf import tg_rfc2544_ixia
TEST_FILE_YAML = 'nsb_test_case.yaml'
class TestIxiaResourceHelper(unittest.TestCase):
def setUp(self):
- self._mock_IxNextgen = mock.patch.object(tg_rfc2544_ixia,
- 'IxNextgen')
+ self._mock_IxNextgen = mock.patch.object(ixnet_api, 'IxNextgen')
self.mock_IxNextgen = self._mock_IxNextgen.start()
self.addCleanup(self._stop_mocks)
def test_stop_collect_with_client(self):
mock_client = mock.Mock()
-
ixia_resource_helper = tg_rfc2544_ixia.IxiaResourceHelper(mock.Mock())
-
ixia_resource_helper.client = mock_client
ixia_resource_helper.stop_collect()
- mock_client.ix_stop_traffic.assert_called_once()
+ self.assertEqual(1, ixia_resource_helper._terminated.value)
def test_run_traffic(self):
mock_tprofile = mock.Mock()
self.assertEqual('fake_samples', ixia_rhelper._queue.get())
-@mock.patch(
- "yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.IxNextgen")
+@mock.patch.object(tg_rfc2544_ixia, 'ixnet_api')
class TestIXIATrafficGen(unittest.TestCase):
VNFD = {'vnfd:vnfd-catalog':
{'vnfd':
'nodes': {'tg__1': 'trafficgen_1.yardstick',
'vnf__1': 'vnf.yardstick'},
'topology': 'vpe_vnf_topology.yaml'}],
- 'context': {'nfvi_type': 'baremetal', 'type': 'Node',
+ 'context': {'nfvi_type': 'baremetal',
+ 'type': contexts.CONTEXT_NODE,
'name': 'yardstick',
'file': '/etc/yardstick/nodes/pod.yaml'},
'schema': 'yardstick:task:0.1'}
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
# NOTE(ralonsoh): check the object returned.
- tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd, 'task_id')
def test_listen_traffic(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
self.assertIsNone(ixnet_traffic_gen.listen_traffic({}))
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server', return_value='fake_context')
def test_instantiate(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
scenario_cfg = {'tc': "nsb_test_case", "topology": "",
'ixia_profile': "ixload.cfg"}
scenario_cfg.update(
'lb_count': 1,
'worker_config': '1C/1T',
'worker_threads': 1}}}})
+ scenario_cfg.update({
+ 'nodes': {ixnet_traffic_gen.name: "mock"}
+ })
ixnet_traffic_gen.topology = ""
ixnet_traffic_gen.get_ixobj = mock.MagicMock()
ixnet_traffic_gen._ixia_traffic_gen = mock.MagicMock()
IOError,
ixnet_traffic_gen.instantiate(scenario_cfg, {}))
+ @mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
def test_collect_kpi(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock = mock.Mock(autospec=ssh.SSH)
ssh_mock.execute = \
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
+
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
+ ixnet_traffic_gen.scenario_helper.scenario_cfg = {
+ 'nodes': {ixnet_traffic_gen.name: "mock"}
+ }
ixnet_traffic_gen.data = {}
restult = ixnet_traffic_gen.collect_kpi()
- self.assertEqual({}, restult)
+
+ expected = {'collect_stats': {},
+ 'physical_node': 'mock_node'}
+
+ self.assertEqual(expected, restult)
def test_terminate(self, *args):
with mock.patch("yardstick.ssh.SSH") as ssh:
ssh_mock.execute = \
mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = ssh_mock
- ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd)
+ ixnet_traffic_gen = tg_rfc2544_ixia.IxiaTrafficGen(NAME, vnfd,
+ 'task_id')
ixnet_traffic_gen._terminated = mock.MagicMock()
ixnet_traffic_gen._terminated.value = 0
ixnet_traffic_gen._ixia_traffic_gen = mock.MagicMock()
def test__check_status(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd)
+ sut = tg_rfc2544_ixia.IxiaTrafficGen('vnf1', vnfd, 'task_id')
sut._check_status()
@mock.patch("yardstick.ssh.SSH")
mock_traffic_profile.get_drop_percentage.return_value = [
'Completed', samples]
- sut = tg_rfc2544_ixia.IxiaTrafficGen(name, vnfd)
+ sut = tg_rfc2544_ixia.IxiaTrafficGen(name, vnfd, 'task_id')
sut.vnf_port_pairs = [[[0], [1]]]
sut.tc_file_name = self._get_file_abspath(TEST_FILE_YAML)
sut.topology = ""
mock.mock_open(), create=True)
@mock.patch('yardstick.network_services.vnf_generic.vnf.tg_rfc2544_ixia.LOG.exception')
def _traffic_runner(*args):
- result = sut._traffic_runner(mock_traffic_profile)
+ sut._setup_mq_producer = mock.Mock(return_value='mq_producer')
+ result = sut._traffic_runner(mock_traffic_profile, mock.ANY)
self.assertIsNone(result)
_traffic_runner()