self.ovs_dpdk.helper = mock_helper
self.ovs_dpdk.vnf_node = mock_server
self.assertIsNone(self.ovs_dpdk.file_path)
- self.assertEqual(self.ovs_dpdk.first_run, True)
+ self.assertTrue(self.ovs_dpdk.first_run)
def test_init(self):
self.ovs_dpdk.helper.parse_pod_file = mock.Mock(return_value=[{}, {}, {}])
self.sriov.helper = mock_helper
self.sriov.vnf_node = mock_server
self.assertIsNone(self.sriov.file_path)
- self.assertEqual(self.sriov.first_run, True)
+ self.assertTrue(self.sriov.first_run)
def test_init(self):
self.sriov.helper.parse_pod_file = mock.Mock(return_value=[{}, {}, {}])
instance.setup()
mock_subprocess_check_output.return_value = (0, 'unittest')
ret = instance.monitor_func()
- self.assertEqual(ret, True)
+ self.assertTrue(ret)
instance._result = {"outage_time": 0}
instance.verify_SLA()
instance.setup()
mock_subprocess_check_output.side_effect = RuntimeError
ret = instance.monitor_func()
- self.assertEqual(ret, False)
+ self.assertFalse(ret)
mock_log.error.assert_called_once()
instance._result = {"outage_time": 10}
instance.verify_SLA()
p = serviceha.ServiceHA(self.args, self.ctx)
p.setup()
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
mock_monitor.MonitorMgr().verify_SLA.return_value = True
ret = {}
p.run(ret)
p.teardown()
p.setup()
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# def test__serviceha_run_sla_error(self, mock_attacker, mock_monitor):
# p = serviceha.ServiceHA(self.args, self.ctx)
c.setup()
self.assertIsNotNone(c.guest)
self.assertIsNotNone(c.host)
- self.assertEqual(c.setup_done, True)
+ self.assertTrue(c.setup_done)
def test_cyclictest_successful_no_sla(self, mock_ssh):
result = {}
q.setup()
self.assertIsNotNone(q.host)
- self.assertEqual(q.setup_done, True)
+ self.assertTrue(q.setup_done)
def test_qemu_migrate_successful_no_sla(self, mock_ssh):
result = {}
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
self.assertIsNotNone(u.client)
- self.assertEqual(u.setup_done, True)
+ self.assertTrue(u.setup_done)
def test_unixbench_successful_no_sla(self, mock_ssh):
self.assertIsNone(self.test_context.scenario_cfg)
self.assertIsNone(self.test_context.context_cfg)
- self.assertEqual(self.test_context.setup_done, False)
+ self.assertFalse(self.test_context.setup_done)
def test_run(self):
result = {}
self.test_context.run(result)
self.assertEqual(result["hello"], "yardstick")
- self.assertEqual(self.test_context.setup_done, True)
+ self.assertTrue(self.test_context.setup_done)
scenario_cfg = {'info1': info1, 'info2': info2}
obj = CheckNumaInfo(scenario_cfg, {})
status = obj._check_vm2_status(info1, info2)
- self.assertEqual(status, True)
+ self.assertTrue(status)
def test_check_vm2_status_length_gt_1(self):
info1 = {
scenario_cfg = {'info1': info1, 'info2': info2}
obj = CheckNumaInfo(scenario_cfg, {})
status = obj._check_vm2_status(info1, info2)
- self.assertEqual(status, False)
+ self.assertFalse(status)
def test_check_vm2_status_length_not_in_set(self):
info1 = {
scenario_cfg = {'info1': info1, 'info2': info2}
obj = CheckNumaInfo(scenario_cfg, {})
status = obj._check_vm2_status(info1, info2)
- self.assertEqual(status, False)
+ self.assertFalse(status)
def main():
p.setup()
self.assertIsNotNone(p.server)
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_netperf_successful_no_sla(self, mock_ssh):
p.setup()
self.assertIsNotNone(p.server)
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_netperf_node_successful_no_sla(self, mock_ssh):
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
self.assertIsNotNone(n.client)
- self.assertEqual(n.setup_done, True)
+ self.assertTrue(n.setup_done)
def test_nstat_successful_no_sla(self, mock_ssh):
mock_ssh.SSH.from_node().execute.return_value = (0, '0', '')
p.setup()
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
@mock.patch('yardstick.benchmark.scenarios.networking.ping6.ssh')
def test_ping_successful_no_sla(self, mock_ssh):
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
self.assertIsNotNone(p.server)
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_pktgen_successful_iptables_setup(self, mock_ssh):
self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
self.assertIsNotNone(p.server)
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_pktgen_dpdk_successful_get_port_ip(self):
mock_ssh.SSH().execute.return_value = (0, '', '')
self.assertIsNotNone(p.server)
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_pktgen_dpdk_throughput_successful_no_sla(self, mock_ssh):
args = {
p.dpdk_setup()
- self.assertEqual(p.dpdk_setup_done, True)
+ self.assertTrue(p.dpdk_setup_done)
def test_pktgen_dpdk_throughput_dpdk_get_result(self, mock_ssh):
args = {
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_vsperf_teardown(self, mock_ssh, mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
p.teardown()
- self.assertEqual(p.setup_done, False)
+ self.assertFalse(p.setup_done)
def test_vsperf_run_ok(self, mock_ssh, mock_subprocess):
p = vsperf.Vsperf(self.args, self.ctx)
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_vsperf_dpdk_teardown(self, mock_subprocess):
p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
p.teardown()
- self.assertEqual(p.setup_done, False)
+ self.assertFalse(p.setup_done)
def test_vsperf_dpdk_is_dpdk_setup_no(self, mock_subprocess):
p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# is_dpdk_setup() specific mocks
self.mock_ssh.SSH.from_node().execute.return_value = (0, 'dummy', '')
result = p._is_dpdk_setup()
- self.assertEqual(result, False)
+ self.assertFalse(result)
def test_vsperf_dpdk_is_dpdk_setup_yes(self, mock_subprocess):
p = vsperf_dpdk.VsperfDPDK(self.args, self.ctx)
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# is_dpdk_setup() specific mocks
self.mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
result = p._is_dpdk_setup()
- self.assertEqual(result, True)
+ self.assertTrue(result)
@mock.patch('time.sleep')
def test_vsperf_dpdk_dpdk_setup_first(self, _, mock_subprocess):
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# is_dpdk_setup() specific mocks
self.mock_ssh.SSH.from_node().execute.return_value = (0, 'dummy', '')
p.dpdk_setup()
- self.assertEqual(p._is_dpdk_setup(), False)
- self.assertEqual(p.dpdk_setup_done, True)
+ self.assertFalse(p._is_dpdk_setup())
+ self.assertTrue(p.dpdk_setup_done)
@mock.patch('time.sleep')
def test_vsperf_dpdk_dpdk_setup_next(self, _, mock_subprocess):
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
p.dpdk_setup()
- self.assertEqual(p._is_dpdk_setup(), True)
- self.assertEqual(p.dpdk_setup_done, True)
+ self.assertTrue(p._is_dpdk_setup())
+ self.assertTrue(p.dpdk_setup_done)
@mock.patch('time.sleep')
def test_vsperf_dpdk_dpdk_setup_fail(self, _, mock_subprocess):
p.setup()
self.assertIsNotNone(p.client)
self.mock_ssh.SSH.from_node().execute.return_value = (1, '', '')
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
self.assertRaises(RuntimeError, p.dpdk_setup)
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# run() specific mocks
mock_subprocess.call().execute.return_value = None
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# run() specific mocks
mock_subprocess.call().execute.return_value = None
p.setup()
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
# run() specific mocks
mock_subprocess.call().execute.return_value = None
p = parser.Parser({}, {})
mock_subprocess.call().return_value = 0
p.setup()
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_parser_successful(self, mock_subprocess):
args = {
p = parser.Parser({}, {})
mock_subprocess.call().return_value = 0
p.teardown()
- self.assertEqual(p.teardown_done, True)
+ self.assertTrue(p.teardown_done)
def main():
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_fio_job_file_successful_setup(self, mock_ssh):
mock_ssh.SSH.from_node().execute.return_value = (0, '', '')
self.assertIsNotNone(p.client)
- self.assertEqual(p.setup_done, True)
+ self.assertTrue(p.setup_done)
def test_fio_successful_no_sla(self, mock_ssh):
self.resource_profile.connection = self.ssh_mock
def test___init__(self):
- self.assertEqual(True, self.resource_profile.enable)
+ self.assertTrue(self.resource_profile.enable)
def test_check_if_system_agent_running(self):
self.assertEqual(self.resource_profile.check_if_system_agent_running("collectd"),
ping_parser = PingParser(q_out)
ping_parser.write(sample_out)
ping_parser.clear()
- self.assertEqual(True, q_out.empty())
+ self.assertTrue(q_out.empty())
def test_close(self):
q_out = Queue()