self.param.task_id = [FAKE_TASK_ID]
self.rep = report.Report()
- @mock.patch('yardstick.benchmark.core.report.Report._get_tasks')
- @mock.patch('yardstick.benchmark.core.report.Report._get_fieldkeys')
- @mock.patch('yardstick.benchmark.core.report.Report._validate')
+ @mock.patch.object(report.Report, '_get_tasks')
+ @mock.patch.object(report.Report, '_get_fieldkeys')
+ @mock.patch.object(report.Report, '_validate')
def test_generate_success(self, mock_valid, mock_keys, mock_tasks):
mock_tasks.return_value = FAKE_DB_TASK
mock_keys.return_value = FAKE_DB_FIELDKEYS
self.rep.generate(self.param)
mock_valid.assert_called_once_with(FAKE_YAML_NAME, FAKE_TASK_ID)
- self.assertEqual(1, mock_tasks.call_count)
- self.assertEqual(1, mock_keys.call_count)
+ mock_tasks.assert_called_once_with()
+ mock_keys.assert_called_once_with()
# pylint: disable=deprecated-method
def test_invalid_yaml_name(self):
with helper.get_benchmark_instance():
helper()
- self.assertEqual(method.call_count, 1)
+ method.assert_called_once()
def test___call___error(self):
cls = mock.MagicMock()
runner = SearchRunner({})
runner._run_benchmark(cls, 'my_method', scenario_cfg, {})
- self.assertEqual(mock_multi_process.Process.call_count, 1)
+ mock_multi_process.Process.assert_called_once()
dpdk_helper.force_dpdk_rebind = mock_helper_func = mock.Mock()
dpdk_node._force_rebind()
- self.assertEqual(mock_helper_func.call_count, 1)
+ mock_helper_func.assert_called_once()
class TestDpdkBindHelper(unittest.TestCase):
y._rules = None
self.assertIsNone(y.get_rules())
- self.assertEqual(read_mock.call_count, 1)
- self.assertEqual(get_mock.call_count, 1)
+ read_mock.assert_called_once()
+ get_mock.assert_called_once()
# True value should prevent calling read and get
y._rules = 999
self.assertEqual(y.get_rules(), 999)
- self.assertEqual(read_mock.call_count, 1)
- self.assertEqual(get_mock.call_count, 1)
+ read_mock.assert_called_once()
+ get_mock.assert_called_once()
for _ in profile.bounds_iterator(mock_logger):
pass
- self.assertEqual(mock_logger.debug.call_count, 1)
+ mock_logger.debug.assert_called_once()
self.assertEqual(mock_logger.info.call_count, 10)
my_mock_logger = mock.MagicMock()
prox_test_data = ProxTestDataTuple(1, 2, 3, 4, 5, [6.1, 6.9, 6.4], 7, 8, 9)
prox_test_data.log_data()
- self.assertEqual(my_mock_logger.debug.call_count, 0)
- self.assertEqual(mock_logger.debug.call_count, 0)
+
+ my_mock_logger.debug.assert_not_called()
+ mock_logger.debug.assert_not_called()
mock_logger.debug.reset_mock()
prox_test_data.log_data(my_mock_logger)
- self.assertEqual(my_mock_logger.debug.call_count, 0)
- self.assertEqual(mock_logger.debug.call_count, 0)
+ my_mock_logger.assert_not_called()
+ mock_logger.debug.assert_not_called()
class TestPacketDump(unittest.TestCase):
mock_sock = mock.MagicMock()
prox = ProxSocketHelper(mock_sock)
prox.connect('10.20.30.40', 23456)
- self.assertEqual(mock_sock.connect.call_count, 1)
+ mock_sock.connect.assert_called_once()
def test_get_sock(self):
mock_sock = mock.MagicMock()
mock_socket = mock.MagicMock()
prox = ProxSocketHelper(mock_socket)
prox.dump_rx(3, 5, 8)
- self.assertEqual(mock_socket.sendall.call_count, 1)
+ mock_socket.sendall.assert_called_once()
def test_quit(self):
mock_socket = mock.MagicMock()
prox_approx_vnf.resource_helper = resource_helper = mock.Mock()
prox_approx_vnf._vnf_up_post()
- self.assertEqual(resource_helper.up_post.call_count, 1)
+ resource_helper.up_post.assert_called_once()
@mock.patch(SSH_HELPER)
def test_vnf_execute_oserror(self, ssh, *args):
self.assertFalse(ssh_helper.is_connected)
cfg_file = ssh_helper.upload_config_file('my/prefix', 'my content')
self.assertTrue(ssh_helper.is_connected)
- self.assertEqual(mock_paramiko.SSHClient.call_count, 1)
+ mock_paramiko.SSHClient.assert_called_once()
self.assertTrue(cfg_file.startswith('/tmp'))
cfg_file = ssh_helper.upload_config_file('/my/prefix', 'my content')
self.assertTrue(ssh_helper.is_connected)
- self.assertEqual(mock_paramiko.SSHClient.call_count, 1)
+ mock_paramiko.SSHClient.assert_called_once()
self.assertEqual(cfg_file, '/my/prefix')
def test_join_bin_path(self):
self.assertFalse(ssh_helper.is_connected)
ssh_helper.provision_tool()
self.assertTrue(ssh_helper.is_connected)
- self.assertEqual(mock_paramiko.SSHClient.call_count, 1)
- self.assertEqual(mock_provision_tool.call_count, 1)
+ mock_paramiko.SSHClient.assert_called_once()
+ mock_provision_tool.assert_called_once()
ssh_helper.provision_tool(tool_file='my_tool.sh')
self.assertTrue(ssh_helper.is_connected)
- self.assertEqual(mock_paramiko.SSHClient.call_count, 1)
+ mock_paramiko.SSHClient.assert_called_once()
self.assertEqual(mock_provision_tool.call_count, 2)
ssh_helper.provision_tool('tool_path', 'my_tool.sh')
self.assertTrue(ssh_helper.is_connected)
- self.assertEqual(mock_paramiko.SSHClient.call_count, 1)
+ mock_paramiko.SSHClient.assert_called_once()
self.assertEqual(mock_provision_tool.call_count, 3)
result = dpdk_setup_helper.build_config()
self.assertEqual(result, expected)
self.assertGreaterEqual(ssh_helper.upload_config_file.call_count, 2)
- self.assertGreaterEqual(mock_find.call_count, 1)
- self.assertGreaterEqual(mock_multi_port_config.generate_config.call_count, 1)
- self.assertGreaterEqual(mock_multi_port_config.generate_script.call_count, 1)
+ mock_find.assert_called()
+ mock_multi_port_config.generate_config.assert_called()
+ mock_multi_port_config.generate_script.assert_called()
scenario_helper.vnf_cfg = {'file': 'fake_file'}
dpdk_setup_helper = DpdkVnfSetupEnvHelper(vnfd_helper, ssh_helper, scenario_helper)
mock_open_rf.assert_called_once()
self.assertEqual(result, expected)
self.assertGreaterEqual(ssh_helper.upload_config_file.call_count, 2)
- self.assertGreaterEqual(mock_find.call_count, 1)
- self.assertGreaterEqual(mock_multi_port_config.generate_config.call_count, 1)
- self.assertGreaterEqual(mock_multi_port_config.generate_script.call_count, 1)
+ mock_find.assert_called()
+ mock_multi_port_config.generate_config.assert_called()
+ mock_multi_port_config.generate_script.assert_called()
def test__build_pipeline_kwargs(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
client_resource_helper.client.get_stats.side_effect = mock_state_error
self.assertEqual(client_resource_helper.get_stats(), {})
- self.assertEqual(client_resource_helper.client.get_stats.call_count, 1)
+ client_resource_helper.client.get_stats.assert_called_once()
def test_generate_samples(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
client_resource_helper.client = mock.Mock()
self.assertIsNone(client_resource_helper.clear_stats())
- self.assertEqual(client_resource_helper.client.clear_stats.call_count, 1)
+ client_resource_helper.client.clear_stats.assert_called_once()
def test_clear_stats_of_ports(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
client_resource_helper.client = mock.Mock()
self.assertIsNone(client_resource_helper.clear_stats([3, 4]))
- self.assertEqual(client_resource_helper.client.clear_stats.call_count, 1)
+ client_resource_helper.client.clear_stats.assert_called_once()
def test_start(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
client_resource_helper.client = mock.Mock()
self.assertIsNone(client_resource_helper.start())
- self.assertEqual(client_resource_helper.client.start.call_count, 1)
+ client_resource_helper.client.start.assert_called_once()
def test_start_ports(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
client_resource_helper.client = mock.Mock()
self.assertIsNone(client_resource_helper.start([3, 4]))
- self.assertEqual(client_resource_helper.client.start.call_count, 1)
+ client_resource_helper.client.start.assert_called_once()
def test_collect_kpi_with_queue(self):
vnfd_helper = VnfdHelper(self.VNFD_0)
self.assertIsNone(sample_vnf_deploy_helper.deploy_vnfs('name1'))
sample_vnf_deploy_helper.DISABLE_DEPLOY = True
self.assertEqual(ssh_helper.execute.call_count, 5)
- self.assertEqual(ssh_helper.put.call_count, 1)
+ ssh_helper.put.assert_called_once()
@mock.patch('yardstick.network_services.vnf_generic.vnf.sample_vnf.time')
@mock.patch('subprocess.check_output')
self.assertIsNone(sample_vnf_deploy_helper.deploy_vnfs('name1'))
self.assertEqual(ssh_helper.execute.call_count, 5)
- self.assertEqual(ssh_helper.put.call_count, 1)
+ ssh_helper.put.assert_called_once()
@mock.patch('subprocess.check_output')
def test_deploy_vnfs_early_success(self, *args):
sample_vnf_deploy_helper.DISABLE_DEPLOY = False
self.assertIsNone(sample_vnf_deploy_helper.deploy_vnfs('name1'))
- self.assertEqual(ssh_helper.execute.call_count, 1)
- self.assertEqual(ssh_helper.put.call_count, 0)
+ ssh_helper.execute.assert_called_once()
+ ssh_helper.put.assert_not_called()
class TestScenarioHelper(unittest.TestCase):
ixia_resource_helper.client = mock_client
ixia_resource_helper.stop_collect()
- self.assertEqual(mock_client.ix_stop_traffic.call_count, 1)
+ mock_client.ix_stop_traffic.assert_called_once()
def test_run_traffic(self):
mock_tprofile = mock.Mock()
with self.assertRaises(exceptions.SSHError) as raised:
test_ssh._get_client()
- self.assertEqual(mock_paramiko.SSHClient.call_count, 1)
- self.assertEqual(mock_paramiko.AutoAddPolicy.call_count, 1)
- self.assertEqual(fake_client.set_missing_host_key_policy.call_count, 1)
- self.assertEqual(fake_client.connect.call_count, 1)
+ mock_paramiko.SSHClient.assert_called_once()
+ mock_paramiko.AutoAddPolicy.assert_called_once()
+ fake_client.set_missing_host_key_policy.assert_called_once()
+ fake_client.connect.assert_called_once()
exc_str = str(raised.exception)
self.assertIn('raised during connect', exc_str)
self.assertIn('MyError', exc_str)
auto_connect_ssh._get_client = mock__get_client = mock.Mock()
auto_connect_ssh._connect()
- self.assertEqual(mock__get_client.call_count, 1)
+ mock__get_client.assert_called_once()
def test___init___negative(self):
with self.assertRaises(TypeError):
auto_connect_ssh.get_file_obj('remote/path', mock.Mock())
- self.assertEqual(mock_sftp.getfo.call_count, 1)
+ mock_sftp.getfo.assert_called_once()
def test__make_dict(self):
auto_connect_ssh = AutoConnectSSH('user1', 'host1')
auto_connect_ssh.put('a', 'z')
with mock_scp_client_type() as mock_scp_client:
- self.assertEqual(mock_scp_client.put.call_count, 1)
+ mock_scp_client.put.assert_called_once()
@mock.patch('yardstick.ssh.SCPClient')
def test_get(self, mock_scp_client_type):
auto_connect_ssh.get('a', 'z')
with mock_scp_client_type() as mock_scp_client:
- self.assertEqual(mock_scp_client.get.call_count, 1)
+ mock_scp_client.get.assert_called_once()
def test_put_file(self):
auto_connect_ssh = AutoConnectSSH('user1', 'host1')
auto_connect_ssh._put_file_sftp = mock_put_sftp = mock.Mock()
auto_connect_ssh.put_file('a', 'b')
- self.assertEqual(mock_put_sftp.call_count, 1)
+ mock_put_sftp.assert_called_once()