- def test_collect_kpi(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- vpe_approx_vnf.resource = mock.Mock(autospec=ResourceProfile)
- vpe_approx_vnf.resource.check_if_sa_running = \
- mock.Mock(return_value=[0, 1])
- vpe_approx_vnf.resource.amqp_collect_nfvi_kpi = \
- mock.Mock(return_value={})
- result = {'pkt_in_down_stream': 0,
- 'pkt_in_up_stream': 0,
- 'collect_stats': {'core': {}},
- 'pkt_drop_down_stream': 0, 'pkt_drop_up_stream': 0}
- # mock execute_command because it sleeps for 3 seconds.
- with mock.patch.object(vpe_approx_vnf, "execute_command", return_value=""):
- self.assertEqual(result, vpe_approx_vnf.collect_kpi())
-
- def test_execute_command(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- cmd = "quit"
- self.assertEqual("", vpe_approx_vnf.execute_command(cmd))
-
- def test_get_stats_vpe(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- vpe_approx_vnf.execute_command = \
- mock.Mock(return_value="Pkts in: 101\r\n\tPkts dropped by AH: 100\r\n\tPkts dropped by other: 100")
- result = {'pkt_in_down_stream': 202, 'pkt_in_up_stream': 303,
- 'pkt_drop_down_stream': 400, 'pkt_drop_up_stream': 600}
- self.assertEqual(result, vpe_approx_vnf.get_stats_vpe())
-
- def test_run_vpe(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh_mock.run = \
- mock.Mock(return_value=(0, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- curr_path = os.path.dirname(os.path.abspath(__file__))
- vpe_vnf = os.path.join(curr_path, "vpe_config")
- queue_wrapper = \
- QueueFileWrapper(vpe_approx_vnf.q_in,
- vpe_approx_vnf.q_out, "pipeline>")
- self.assertEqual(None,
- vpe_approx_vnf._run_vpe(queue_wrapper, vpe_vnf))
-
- def test_instantiate(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- self.scenario_cfg['vnf_options'] = {'vpe': {'cfg': ""}}
- vpe_approx_vnf._run_vpe = mock.Mock(return_value=0)
- vpe_approx_vnf._resource_collect_start = mock.Mock(return_value=0)
- vpe_approx_vnf.q_out.put("pipeline>")
- vpe_vnf.WAIT_TIME = 0.1
- # if process it still running exitcode will be None
- self.assertIn(vpe_approx_vnf.instantiate(self.scenario_cfg, self.context_cfg),
- {0, None})
-
- def test_instantiate_panic(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(1, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- self.scenario_cfg['vnf_options'] = {'vpe': {'cfg': ""}}
- vpe_approx_vnf._run_vpe = mock.Mock(return_value=0)
- vpe_vnf.WAIT_TIME = 0.1
- self.assertRaises(RuntimeError, vpe_approx_vnf.instantiate,
- self.scenario_cfg, self.context_cfg)
-
- def test_scale(self, mock_time):
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- flavor = ""
- self.assertRaises(NotImplementedError, vpe_approx_vnf.scale, flavor)
-
- def test_setup_vnf_environment(self, mock_time):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.from_node.return_value = ssh_mock
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- self.assertEqual(None,
- vpe_approx_vnf.setup_vnf_environment(ssh_mock))
-
- def test_terminate(self, mock_time):
- vnfd = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]
- vpe_approx_vnf = VpeApproxVnf(vnfd)
- self.assertEqual(None, vpe_approx_vnf.terminate())
-
-if __name__ == '__main__':
- unittest.main()
+ @mock.patch(SSH_HELPER)
+ def test_collect_kpi_sa_not_running(self, ssh):
+ mock_ssh(ssh)
+
+ resource = mock.Mock(autospec=ResourceProfile)
+ resource.check_if_system_agent_running.return_value = 1, ''
+ resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234}
+ resource.check_if_system_agent_running.return_value = (1, None)
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf.q_in = mock.MagicMock()
+ vpe_approx_vnf.q_out = mock.MagicMock()
+ vpe_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
+ vpe_approx_vnf.resource_helper.resource = resource
+
+ expected = {
+ 'pkt_in_down_stream': 0,
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'collect_stats': {'core': {}},
+ }
+ self.assertEqual(vpe_approx_vnf.collect_kpi(), expected)
+
+ @mock.patch(SSH_HELPER)
+ def test_collect_kpi_sa_running(self, ssh):
+ mock_ssh(ssh)
+
+ resource = mock.Mock(autospec=ResourceProfile)
+ resource.check_if_system_agent_running.return_value = 0, '1234'
+ resource.amqp_collect_nfvi_kpi.return_value = {'foo': 234}
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf.q_in = mock.MagicMock()
+ vpe_approx_vnf.q_out = mock.MagicMock()
+ vpe_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
+ vpe_approx_vnf.resource_helper.resource = resource
+
+ expected = {
+ 'pkt_in_down_stream': 0,
+ 'pkt_in_up_stream': 0,
+ 'pkt_drop_down_stream': 0,
+ 'pkt_drop_up_stream': 0,
+ 'collect_stats': {'core': {'foo': 234}},
+ }
+ self.assertEqual(vpe_approx_vnf.collect_kpi(), expected)
+
+ @mock.patch(SSH_HELPER)
+ def test_vnf_execute(self, ssh):
+ mock_ssh(ssh)
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf.q_in = mock.MagicMock()
+ vpe_approx_vnf.q_out = mock.MagicMock()
+ vpe_approx_vnf.q_out.qsize = mock.Mock(return_value=0)
+ self.assertEqual(vpe_approx_vnf.vnf_execute("quit", 0), '')
+
+ @mock.patch(SSH_HELPER)
+ def test_run_vpe(self, ssh):
+ mock_ssh(ssh)
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf.tc_file_name = get_file_abspath(TEST_FILE_YAML)
+ vpe_approx_vnf.vnf_cfg = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+ vpe_approx_vnf.scenario_helper.scenario_cfg = {
+ 'options': {
+ NAME: {
+ 'traffic_type': '4',
+ 'topology': 'nsb_test_case.yaml',
+ 'vnf_config': 'vpe_config',
+ }
+ }
+ }
+ vpe_approx_vnf.topology = "nsb_test_case.yaml"
+ vpe_approx_vnf.nfvi_type = "baremetal"
+ vpe_approx_vnf._provide_config_file = mock.Mock()
+ vpe_approx_vnf._build_config = mock.MagicMock()
+
+ self.assertIsInstance(vpe_approx_vnf.ssh_helper, mock.Mock)
+ self.assertIsInstance(vpe_approx_vnf.ssh_helper, mock.Mock)
+ self.assertIsNone(vpe_approx_vnf._run())
+
+ @mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.MultiPortConfig")
+ @mock.patch("yardstick.network_services.vnf_generic.vnf.sample_vnf.Context")
+ @mock.patch("yardstick.network_services.vnf_generic.vnf.vpe_vnf.ConfigCreate")
+ @mock.patch("yardstick.network_services.vnf_generic.vnf.vpe_vnf.open")
+ @mock.patch(SSH_HELPER)
+ def test_build_config(self, ssh, *args):
+ mock_ssh(ssh)
+ vpe_approx_vnf = VpeApproxSetupEnvHelper(mock.MagicMock(),
+ mock.MagicMock, mock.MagicMock)
+ vpe_approx_vnf.tc_file_name = get_file_abspath(TEST_FILE_YAML)
+ vpe_approx_vnf.generate_port_pairs = mock.Mock()
+ vpe_approx_vnf.vnf_cfg = {
+ 'lb_config': 'SW',
+ 'lb_count': 1,
+ 'worker_config': '1C/1T',
+ 'worker_threads': 1,
+ }
+ vpe_approx_vnf.scenario_helper.scenario_cfg = {
+ 'options': {
+ NAME: {
+ 'traffic_type': '4',
+ 'topology': 'nsb_test_case.yaml',
+ 'vnf_config': 'vpe_config',
+ }
+ }
+ }
+ vpe_approx_vnf.topology = "nsb_test_case.yaml"
+ vpe_approx_vnf.nfvi_type = "baremetal"
+ vpe_approx_vnf._provide_config_file = mock.Mock()
+
+ vpe_approx_vnf.ssh_helper = mock.MagicMock()
+ vpe_approx_vnf.scenario_helper = mock.MagicMock()
+ vpe_approx_vnf.ssh_helper.bin_path = mock.Mock()
+ vpe_approx_vnf.ssh_helper.upload_config_file = mock.MagicMock()
+ self.assertIsNone(vpe_approx_vnf._build_vnf_ports())
+ self.assertIsNotNone(vpe_approx_vnf.build_config())
+
+ @mock.patch(SSH_HELPER)
+ def test_wait_for_instantiate(self, ssh):
+ mock_ssh(ssh)
+
+ mock_process = mock.Mock(autospec=Process)
+ mock_process.is_alive.return_value = True
+ mock_process.exitcode = 432
+
+ mock_q_out = mock.Mock(autospec=Queue)
+ mock_q_out.get.side_effect = iter(["pipeline>"])
+ mock_q_out.qsize.side_effect = range(1, -1, -1)
+
+ mock_resource = mock.MagicMock()
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf._vnf_process = mock_process
+ vpe_approx_vnf.q_out = mock_q_out
+ vpe_approx_vnf.queue_wrapper = mock.Mock(autospec=QueueFileWrapper)
+ vpe_approx_vnf.resource_helper.resource = mock_resource
+
+ vpe_approx_vnf.q_out.put("pipeline>")
+ self.assertEqual(vpe_approx_vnf.wait_for_instantiate(), 432)
+
+ @mock.patch(SSH_HELPER)
+ def test_wait_for_instantiate_fragmented(self, ssh):
+ mock_ssh(ssh)
+
+ mock_process = mock.Mock(autospec=Process)
+ mock_process.is_alive.return_value = True
+ mock_process.exitcode = 432
+
+ # test that fragmented pipeline prompt is recognized
+ mock_q_out = mock.Mock(autospec=Queue)
+ mock_q_out.get.side_effect = iter(["wow pipel", "ine>"])
+ mock_q_out.qsize.side_effect = range(2, -1, -1)
+
+ mock_resource = mock.MagicMock()
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf._vnf_process = mock_process
+ vpe_approx_vnf.q_out = mock_q_out
+ vpe_approx_vnf.queue_wrapper = mock.Mock(autospec=QueueFileWrapper)
+ vpe_approx_vnf.resource_helper.resource = mock_resource
+
+ self.assertEqual(vpe_approx_vnf.wait_for_instantiate(), 432)
+
+ @mock.patch(SSH_HELPER)
+ def test_wait_for_instantiate_crash(self, ssh):
+ mock_ssh(ssh, exec_result=(1, "", ""))
+
+ mock_process = mock.Mock(autospec=Process)
+ mock_process.is_alive.return_value = False
+ mock_process.exitcode = 432
+
+ mock_resource = mock.MagicMock()
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf._vnf_process = mock_process
+ vpe_approx_vnf.resource_helper.resource = mock_resource
+
+ with self.assertRaises(RuntimeError) as raised:
+ vpe_approx_vnf.wait_for_instantiate()
+
+ self.assertIn('VNF process died', str(raised.exception))
+
+ @mock.patch(SSH_HELPER)
+ def test_wait_for_instantiate_panic(self, ssh):
+ mock_ssh(ssh, exec_result=(1, "", ""))
+
+ mock_process = mock.Mock(autospec=Process)
+ mock_process.is_alive.return_value = True
+ mock_process.exitcode = 432
+
+ mock_resource = mock.MagicMock()
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf._vnf_process = mock_process
+ vpe_approx_vnf.resource_helper.resource = mock_resource
+
+ vpe_approx_vnf.q_out.put("PANIC")
+ with self.assertRaises(RuntimeError) as raised:
+ vpe_approx_vnf.wait_for_instantiate()
+
+ self.assertIn('Error starting', str(raised.exception))
+
+ @mock.patch(SSH_HELPER)
+ def test_wait_for_instantiate_panic_fragmented(self, ssh):
+ mock_ssh(ssh, exec_result=(1, "", ""))
+
+ mock_process = mock.Mock(autospec=Process)
+ mock_process.is_alive.return_value = True
+ mock_process.exitcode = 432
+
+ # test that fragmented PANIC is recognized
+ mock_q_out = mock.Mock(autospec=Queue)
+ mock_q_out.get.side_effect = iter(["omg PA", "NIC this is bad"])
+ mock_q_out.qsize.side_effect = range(2, -1, -1)
+
+ mock_resource = mock.MagicMock()
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf._vnf_process = mock_process
+ vpe_approx_vnf.q_out = mock_q_out
+ vpe_approx_vnf.resource_helper.resource = mock_resource
+
+ with self.assertRaises(RuntimeError) as raised:
+ vpe_approx_vnf.wait_for_instantiate()
+
+ self.assertIn('Error starting', str(raised.exception))
+
+ @mock.patch(SSH_HELPER)
+ def test_terminate(self, ssh):
+ mock_ssh(ssh)
+
+ vpe_approx_vnf = VpeApproxVnf(NAME, self.VNFD_0)
+ vpe_approx_vnf._vnf_process = mock.MagicMock()
+ vpe_approx_vnf._resource_collect_stop = mock.Mock()
+ vpe_approx_vnf.resource_helper = mock.MagicMock()
+
+ self.assertIsNone(vpe_approx_vnf.terminate())