from yardstick.tests import STL_MOCKS
from yardstick.tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
+from yardstick.benchmark.contexts import base as ctx_base
SSH_HELPER = 'yardstick.network_services.vnf_generic.vnf.sample_vnf.VnfSshHelper'
self.assertIsNone(udp_replay_approx_vnf._vnf_process)
@mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.time")
+ @mock.patch.object(ctx_base.Context, 'get_physical_node_from_server', return_value='mock_node')
@mock.patch(SSH_HELPER)
def test_collect_kpi(self, ssh, *args):
mock_ssh(ssh)
"0\t\t7374156\t\t7374136\t\t\t0\t\t\t0\r\n" \
"1\t\t7374316\t\t7374315\t\t\t0\t\t\t0\r\n\r\nReplay>\r\r\nReplay>"
udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, vnfd)
+ udp_replay_approx_vnf.scenario_helper.scenario_cfg = {
+ 'nodes': {udp_replay_approx_vnf.name: "mock"}
+ }
udp_replay_approx_vnf.q_in = mock.MagicMock()
udp_replay_approx_vnf.q_out = mock.MagicMock()
udp_replay_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
udp_replay_approx_vnf.all_ports = ["xe0", "xe1"]
udp_replay_approx_vnf.get_stats = mock.Mock(return_value=get_stats_ret_val)
-
- result = {'collect_stats': {}, 'packets_dropped': 0,
- 'packets_fwd': 14748451, 'packets_in': 14748472}
+ result = {
+ 'physical_node': 'mock_node',
+ 'collect_stats': {},
+ 'packets_dropped': 0,
+ 'packets_fwd': 14748451,
+ 'packets_in': 14748472
+ }
self.assertEqual(result, udp_replay_approx_vnf.collect_kpi())
@mock.patch(SSH_HELPER)
file_path = os.path.join(curr_path, filename)
return file_path
- @mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.Context")
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server')
@mock.patch(SSH_HELPER)
- def test__build_config(self, ssh, mock_context, *args):
+ def test__build_config(self, ssh, mock_get_ctx, *args):
mock_ssh(ssh)
+ nfvi_context = mock.Mock()
+ nfvi_context.attrs = {'nfvi_type': 'baremetal'}
+ mock_get_ctx.return_value = nfvi_context
+
udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
udp_replay_approx_vnf.queue_wrapper = mock.MagicMock()
- udp_replay_approx_vnf.nfvi_context = mock_context
+ udp_replay_approx_vnf.nfvi_context = mock_get_ctx
udp_replay_approx_vnf.nfvi_context.attrs = {'nfvi_type': 'baremetal'}
udp_replay_approx_vnf.setup_helper.bound_pci = []
udp_replay_approx_vnf.ssh_helper.provision_tool = mock.MagicMock(return_value="tool_path")
self.assertEqual(cmd_line, expected)
@mock.patch('yardstick.network_services.vnf_generic.vnf.udp_replay.open')
- @mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.Context")
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server')
@mock.patch(SSH_HELPER)
- def test__build_pipeline_kwargs(self, ssh, mock_context, *args):
+ def test__build_pipeline_kwargs(self, ssh, mock_get_ctx, *args):
mock_ssh(ssh)
+
+ nfvi_context = mock.Mock()
+ nfvi_context.attrs = {'nfvi_type': "baremetal"}
+ mock_get_ctx.return_value = nfvi_context
+
udp_replay_approx_vnf = UdpReplayApproxVnf(NAME, self.VNFD_0)
- udp_replay_approx_vnf.nfvi_context = mock_context
- udp_replay_approx_vnf.nfvi_context.attrs = {'nfvi_type': 'baremetal'}
udp_replay_approx_vnf.setup_helper.bound_pci = ['0000:00:0.1', '0000:00:0.3']
udp_replay_approx_vnf.all_ports = ["xe0", "xe1"]
udp_replay_approx_vnf.ssh_helper.provision_tool = mock.MagicMock(return_value="tool_path")
udp_replay_approx_vnf.ssh_helper.run.assert_called_once()
- @mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.Context")
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server')
@mock.patch(SSH_HELPER)
def test_instantiate(self, ssh, *args):
mock_ssh(ssh)
self.assertEqual(udp_replay_approx_vnf.wait_for_instantiate(), 0)
- @mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.Context")
+ @mock.patch.object(ctx_base.Context, 'get_context_from_server')
@mock.patch('yardstick.ssh.SSH')
@mock.patch(SSH_HELPER)
def test_instantiate_panic(self, *args):