# limitations under the License.
#
-# Unittest for yardstick.network_services.vnf_generic.vnf.sample_vnf
-
-from __future__ import absolute_import
+from copy import deepcopy
import unittest
import mock
-from copy import deepcopy
+import six
from tests.unit.network_services.vnf_generic.vnf.test_base import mock_ssh
from tests.unit import STL_MOCKS
from yardstick.benchmark.contexts.base import Context
+from yardstick.common import exceptions as y_exceptions
+from yardstick.common import utils
from yardstick.network_services.nfvi.resource import ResourceProfile
-from yardstick.network_services.traffic_profile.base import TrafficProfile
from yardstick.network_services.vnf_generic.vnf.base import VnfdHelper
stl_patch.start()
if stl_patch:
+ from yardstick.network_services.vnf_generic.vnf import sample_vnf
from yardstick.network_services.vnf_generic.vnf.sample_vnf import VnfSshHelper
from yardstick.network_services.vnf_generic.vnf.sample_vnf import SampleVNFDeployHelper
from yardstick.network_services.vnf_generic.vnf.sample_vnf import ScenarioHelper
result = DpdkVnfSetupEnvHelper._update_traffic_type(ip_pipeline_cfg, traffic_options)
self.assertEqual(result, expected)
- def test__setup_hugepages(self):
- vnfd_helper = VnfdHelper(self.VNFD_0)
- ssh_helper = mock.Mock()
- ssh_helper.execute.return_value = 0, '', ''
- scenario_helper = mock.Mock()
- dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper)
-
- result = dpdk_setup_helper._setup_hugepages()
- expect_start_list = ['awk', 'awk', 'echo']
- expect_in_list = ['meminfo', 'nr_hugepages', '16']
- call_args_iter = (args[0][0] for args in ssh_helper.execute.call_args_list)
- self.assertIsNone(result)
- self.assertEqual(ssh_helper.execute.call_count, 3)
- for expect_start, expect_in, arg0 in zip(expect_start_list, expect_in_list,
- call_args_iter):
- self.assertTrue(arg0.startswith(expect_start))
- self.assertIn(expect_in, arg0)
-
- def test__setup_hugepages_2_mb(self):
- vnfd_helper = VnfdHelper(self.VNFD_0)
+ @mock.patch.object(six, 'BytesIO', return_value=six.BytesIO(b'100\n'))
+ @mock.patch.object(utils, 'read_meminfo',
+ return_value={'Hugepagesize': '2048'})
+ def test__setup_hugepages(self, mock_meminfo, *args):
ssh_helper = mock.Mock()
- ssh_helper.execute.return_value = 0, '2048kB ', ''
- scenario_helper = mock.Mock()
- dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper)
-
- result = dpdk_setup_helper._setup_hugepages()
- expect_start_list = ['awk', 'awk', 'echo']
- expect_in_list = ['meminfo', 'nr_hugepages', '8192']
- call_args_iter = (args[0][0] for args in ssh_helper.execute.call_args_list)
- self.assertIsNone(result)
- self.assertEqual(ssh_helper.execute.call_count, 3)
- for expect_start, expect_in, arg0 in zip(expect_start_list, expect_in_list,
- call_args_iter):
- self.assertTrue(arg0.startswith(expect_start))
- self.assertIn(expect_in, arg0)
+ dpdk_setup_helper = DpdkVnfSetupEnvHelper(
+ mock.ANY, ssh_helper, mock.ANY)
+ with mock.patch.object(sample_vnf.LOG, 'info') as mock_info:
+ dpdk_setup_helper._setup_hugepages()
+ mock_info.assert_called_once_with(
+ 'Hugepages size (kB): %s, number claimed: %s, number set: '
+ '%s', 2048, 8192, 100)
+ mock_meminfo.assert_called_once_with(ssh_helper)
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.open')
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.find_relative_file')
+ @mock.patch.object(utils, 'find_relative_file')
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.MultiPortConfig')
- def test_build_config(self, mock_multi_port_config_class, mock_find, _):
+ def test_build_config(self, mock_multi_port_config_class, mock_find, *args):
mock_multi_port_config = mock_multi_port_config_class()
vnfd_helper = VnfdHelper(self.VNFD_0)
ssh_helper = mock.Mock()
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
@mock.patch('yardstick.ssh.SSH')
- def test_setup_vnf_environment(self, _, mock_time):
- def execute(cmd, *args, **kwargs):
+ def test_setup_vnf_environment(self, *args):
+ def execute(cmd):
if cmd.startswith('which '):
return exec_failure
return exec_success
dpdk_vnf_setup_env_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, mock.Mock())
dpdk_vnf_setup_env_helper._validate_cpu_cfg = mock.Mock(return_value=[])
- self.assertIsInstance(dpdk_vnf_setup_env_helper.setup_vnf_environment(), ResourceProfile)
+ with mock.patch.object(dpdk_vnf_setup_env_helper, '_setup_dpdk'):
+ self.assertIsInstance(
+ dpdk_vnf_setup_env_helper.setup_vnf_environment(),
+ ResourceProfile)
- def test__setup_dpdk_early_success(self):
- vnfd_helper = VnfdHelper(self.VNFD_0)
+ def test__setup_dpdk(self):
ssh_helper = mock.Mock()
- ssh_helper.execute.return_value = 0, 'output', ''
- ssh_helper.join_bin_path.return_value = 'joined_path'
- ssh_helper.provision_tool.return_value = 'provision string'
- scenario_helper = mock.Mock()
- dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper)
- dpdk_setup_helper._setup_hugepages = mock.Mock()
-
- self.assertIsNone(dpdk_setup_helper._setup_dpdk())
- self.assertEqual(dpdk_setup_helper.ssh_helper.execute.call_count, 2)
-
- @mock.patch('yardstick.ssh.SSH')
- def test__setup_dpdk_short(self, _):
- def execute_side(cmd, *args, **kwargs):
- if 'joined_path' in cmd:
- return 0, 'output', ''
- return 1, 'bad output', 'error output'
+ ssh_helper.execute = mock.Mock()
+ ssh_helper.execute.return_value = (0, 0, 0)
+ dpdk_setup_helper = DpdkVnfSetupEnvHelper(mock.ANY, ssh_helper, mock.ANY)
+ with mock.patch.object(dpdk_setup_helper, '_setup_hugepages') as \
+ mock_setup_hp:
+ dpdk_setup_helper._setup_dpdk()
+ mock_setup_hp.assert_called_once()
+ ssh_helper.execute.assert_has_calls([
+ mock.call('sudo modprobe uio && sudo modprobe igb_uio'),
+ mock.call('lsmod | grep -i igb_uio')
+ ])
- vnfd_helper = VnfdHelper(self.VNFD_0)
+ def test__setup_dpdk_igb_uio_not_loaded(self):
ssh_helper = mock.Mock()
- ssh_helper.execute.side_effect = execute_side
- ssh_helper.join_bin_path.return_value = 'joined_path'
- ssh_helper.provision_tool.return_value = 'provision string'
- scenario_helper = mock.Mock()
- dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper)
- dpdk_setup_helper._setup_hugepages = mock.Mock()
-
- self.assertIsNone(dpdk_setup_helper._setup_dpdk())
- self.assertEqual(dpdk_setup_helper.ssh_helper.execute.call_count, 3)
+ ssh_helper.execute = mock.Mock()
+ ssh_helper.execute.side_effect = [(0, 0, 0), (1, 0, 0)]
+ dpdk_setup_helper = DpdkVnfSetupEnvHelper(mock.ANY, ssh_helper, mock.ANY)
+ with mock.patch.object(dpdk_setup_helper, '_setup_hugepages') as \
+ mock_setup_hp:
+ with self.assertRaises(y_exceptions.DPDKSetupDriverError):
+ dpdk_setup_helper._setup_dpdk()
+ mock_setup_hp.assert_called_once()
+ ssh_helper.execute.assert_has_calls([
+ mock.call('sudo modprobe uio && sudo modprobe igb_uio'),
+ mock.call('lsmod | grep -i igb_uio')
+ ])
@mock.patch('yardstick.ssh.SSH')
def test__setup_resources(self, _):
self.assertEqual(dpdk_setup_helper.socket, 1)
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
- def test__detect_and_bind_drivers(self, mock_time):
+ def test__detect_and_bind_drivers(self, *args):
vnfd_helper = VnfdHelper(deepcopy(self.VNFD_0))
ssh_helper = mock.Mock()
# ssh_helper.execute = mock.Mock(return_value = (0, 'text', ''))
intf_0 = vnfd_helper.vdu[0]['external-interface'][0]['virtual-interface']
intf_1 = vnfd_helper.vdu[0]['external-interface'][1]['virtual-interface']
- self.assertEquals(0, intf_0['dpdk_port_num'])
- self.assertEquals(1, intf_1['dpdk_port_num'])
+ self.assertEqual(0, intf_0['dpdk_port_num'])
+ self.assertEqual(1, intf_1['dpdk_port_num'])
def test_tear_down(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.LOG')
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.STLError',
new_callable=lambda: MockError)
- def test_get_stats_not_connected(self, mock_state_error, mock_logger):
+ def test_get_stats_not_connected(self, mock_state_error, *args):
vnfd_helper = VnfdHelper(self.VNFD_0)
ssh_helper = mock.Mock()
scenario_helper = mock.Mock()
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.LOG')
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.STLError',
new_callable=lambda: MockError)
- def test__connect_with_failures(self, mock_error, mock_logger, mock_time):
+ def test__connect_with_failures(self, mock_error, *args):
vnfd_helper = VnfdHelper(self.VNFD_0)
ssh_helper = mock.Mock()
scenario_helper = mock.Mock()
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
@mock.patch('subprocess.check_output')
- def test_deploy_vnfs_disabled(self, mock_check_output, mock_time):
+ def test_deploy_vnfs_disabled(self, *args):
vnfd_helper = mock.Mock()
ssh_helper = mock.Mock()
ssh_helper.join_bin_path.return_value = 'joined_path'
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
@mock.patch('subprocess.check_output')
- def test_deploy_vnfs(self, mock_check_output, mock_time):
+ def test_deploy_vnfs(self, *args):
vnfd_helper = mock.Mock()
ssh_helper = mock.Mock()
ssh_helper.join_bin_path.return_value = 'joined_path'
self.assertEqual(ssh_helper.put.call_count, 1)
@mock.patch('subprocess.check_output')
- def test_deploy_vnfs_early_success(self, mock_check_output):
+ def test_deploy_vnfs_early_success(self, *args):
vnfd_helper = mock.Mock()
ssh_helper = mock.Mock()
ssh_helper.join_bin_path.return_value = 'joined_path'
self.assertEqual(result, expected)
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.Process')
- def test__start_vnf(self, mock_process_type):
+ def test__start_vnf(self, *args):
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
sample_vnf = SampleVNF('vnf1', vnfd)
sample_vnf._run = mock.Mock()
@mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.time")
@mock.patch("yardstick.ssh.SSH")
- def test_wait_for_instantiate_empty_queue(self, ssh, mock_time):
+ def test_wait_for_instantiate_empty_queue(self, ssh, *args):
mock_ssh(ssh, exec_result=(1, "", ""))
queue_size_list = [
self.assertIsNotNone(sample_vnf.my_ports)
@mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.time")
- def test_vnf_execute_with_queue_data(self, mock_time):
+ def test_vnf_execute_with_queue_data(self, *args):
queue_size_list = [
1,
1,
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
sample_vnf = SampleVNF('vnf1', vnfd)
sample_vnf.APP_NAME = 'sample1'
- sample_vnf.COLLECT_KPI = '\s(\d+)\D*(\d+)\D*(\d+)'
+ sample_vnf.COLLECT_KPI = r'\s(\d+)\D*(\d+)\D*(\d+)'
sample_vnf.COLLECT_MAP = {
'k1': 3,
'k2': 1,
vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
sample_vnf = SampleVNF('vnf1', vnfd)
sample_vnf.APP_NAME = 'sample1'
- sample_vnf.COLLECT_KPI = '\s(\d+)\D*(\d+)\D*(\d+)'
+ sample_vnf.COLLECT_KPI = r'\s(\d+)\D*(\d+)\D*(\d+)'
sample_vnf.get_stats = mock.Mock(return_value='')
expected = {
result = sample_vnf.collect_kpi()
self.assertDictEqual(result, expected)
+ def test_scale(self):
+ vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ sample_vnf = SampleVNF('vnf1', vnfd)
+ self.assertRaises(y_exceptions.FunctionNotImplemented,
+ sample_vnf.scale)
+
+ def test__run(self):
+ test_cmd = 'test cmd'
+ run_kwargs = {'arg1': 'val1', 'arg2': 'val2'}
+ vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
+ sample_vnf = SampleVNF('vnf1', vnfd)
+ sample_vnf.ssh_helper = mock.Mock()
+ sample_vnf.setup_helper = mock.Mock()
+ with mock.patch.object(sample_vnf, '_build_config',
+ return_value=test_cmd), \
+ mock.patch.object(sample_vnf, '_build_run_kwargs'):
+ sample_vnf.run_kwargs = run_kwargs
+ sample_vnf._run()
+ sample_vnf.ssh_helper.drop_connection.assert_called_once()
+ sample_vnf.ssh_helper.run.assert_called_once_with(test_cmd,
+ **run_kwargs)
+ sample_vnf.setup_helper.kill_vnf.assert_called_once()
+
class TestSampleVNFTrafficGen(unittest.TestCase):
sample_vnf_tg.terminate()
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.LOG')
- def test_wait_for_instantiate(self, mock_logger, mock_time):
+ def test__wait_for_process(self):
sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
- sample_vnf_tg._check_status = mock.Mock(side_effect=iter([1, 0]))
- sample_vnf_tg._tg_process = mock.Mock()
- sample_vnf_tg._tg_process.is_alive.return_value = True
- sample_vnf_tg._tg_process.exitcode = 234
-
- self.assertEqual(sample_vnf_tg.wait_for_instantiate(), 234)
-
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.LOG')
- def test_wait_for_instantiate_not_alive(self, mock_logger, mock_time):
+ with mock.patch.object(sample_vnf_tg, '_check_status',
+ return_value=0) as mock_status, \
+ mock.patch.object(sample_vnf_tg, '_tg_process') as mock_proc:
+ mock_proc.is_alive.return_value = True
+ mock_proc.exitcode = 234
+ self.assertEqual(sample_vnf_tg._wait_for_process(), 234)
+ mock_proc.is_alive.assert_called_once()
+ mock_status.assert_called_once()
+
+ def test__wait_for_process_not_alive(self):
sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
- sample_vnf_tg._check_status = mock.Mock(return_value=1)
- sample_vnf_tg._tg_process = mock.Mock()
- sample_vnf_tg._tg_process.is_alive.side_effect = iter([True, False])
- sample_vnf_tg._tg_process.exitcode = 234
-
- with self.assertRaises(RuntimeError):
- sample_vnf_tg.wait_for_instantiate()
-
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.LOG')
- @mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.Process')
- def test_wait_for_instantiate_delayed(self, mock_process, mock_logger, mock_time):
- class MockClientStarted(mock.Mock):
-
- def __init__(self, *args, **kwargs):
- super(MockClientStarted, self).__init__(*args, **kwargs)
- self.iter = iter([0, 0, 1])
-
- @property
- def value(self):
- return next(self.iter)
-
- mock_traffic_profile = mock.Mock(autospec=TrafficProfile)
- mock_traffic_profile.get_traffic_definition.return_value = "64"
- mock_traffic_profile.execute_traffic.return_value = "64"
- mock_traffic_profile.params = self.TRAFFIC_PROFILE
+ with mock.patch.object(sample_vnf_tg, '_tg_process') as mock_proc:
+ mock_proc.is_alive.return_value = False
+ self.assertRaises(RuntimeError, sample_vnf_tg._wait_for_process)
+ mock_proc.is_alive.assert_called_once()
+ def test__wait_for_process_delayed(self):
sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
- sample_vnf_tg._check_status = mock.Mock(side_effect=iter([1, 0]))
- sample_vnf_tg._tg_process = mock.Mock()
- sample_vnf_tg._tg_process.is_alive.return_value = True
- sample_vnf_tg._tg_process.exitcode = 234
- sample_vnf_tg.resource_helper = mock.Mock()
- sample_vnf_tg.resource_helper.client_started = MockClientStarted()
-
- self.assertTrue(sample_vnf_tg.run_traffic(mock_traffic_profile))
- self.assertEqual(mock_time.sleep.call_count, 2)
+ with mock.patch.object(sample_vnf_tg, '_check_status',
+ side_effect=[1, 0]) as mock_status, \
+ mock.patch.object(sample_vnf_tg,
+ '_tg_process') as mock_proc:
+ mock_proc.is_alive.return_value = True
+ mock_proc.exitcode = 234
+ self.assertEqual(sample_vnf_tg._wait_for_process(), 234)
+ mock_proc.is_alive.assert_has_calls([mock.call(), mock.call()])
+ mock_status.assert_has_calls([mock.call(), mock.call()])
+
+ def test_scale(self):
+ sample_vnf_tg = SampleVNFTrafficGen('tg1', self.VNFD_0)
+ self.assertRaises(y_exceptions.FunctionNotImplemented,
+ sample_vnf_tg.scale)