from __future__ import absolute_import
import unittest
-import multiprocessing
+
+import errno
import mock
from yardstick.network_services.nfvi.resource import ResourceProfile
'local_ip': '172.16.100.19',
'type': 'PCI-PASSTHROUGH',
'netmask': '255.255.255.0',
- 'dpdk_port_num': '0',
+ 'dpdk_port_num': 0,
'bandwidth': '10 Gbps',
'dst_ip': '172.16.100.20',
'local_mac': '3c:fd:fe:a1:2b:80'},
'local_ip': '172.16.40.19',
'type': 'PCI-PASSTHROUGH',
'netmask': '255.255.255.0',
- 'dpdk_port_num': '1',
+ 'dpdk_port_num': 1,
'bandwidth': '10 Gbps',
'dst_ip': '172.16.40.20',
'local_mac': '3c:fd:fe:a1:2b:81'},
'id': 'VpeApproxVnf', 'name': 'VPEVnfSsh'}]}}
def setUp(self):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, {}, ""))
- ssh.return_value = ssh_mock
+ with mock.patch("yardstick.ssh.AutoConnectSSH") as ssh:
+ self.ssh_mock = mock.Mock(autospec=ssh.SSH)
+ self.ssh_mock.execute = \
+ mock.Mock(return_value=(0, "", ""))
+ ssh.from_node.return_value = self.ssh_mock
+ mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface']
+ # interfaces = \
+ # self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface']
+ port_names = \
+ self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['vdu'][0]['external-interface']
self.resource_profile = \
- ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
- [1, 2, 3])
+ ResourceProfile(mgmt, port_names, [1, 2, 3])
+ self.resource_profile.connection = self.ssh_mock
def test___init__(self):
self.assertEqual(True, self.resource_profile.enable)
def test_check_if_sa_running(self):
self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
- [True, {}])
+ (0, ""))
+
+ def test_check_if_sa_running_excetion(self):
+ with mock.patch.object(self.resource_profile.connection, "execute") as mock_execute:
+ mock_execute.side_effect = OSError(errno.ECONNRESET, "error")
+ self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), (1, None))
def test_get_cpu_data(self):
reskey = ["", "cpufreq", "cpufreq-0"]
value = "metric:10"
- val = self.resource_profile.get_cpu_data(reskey, value)
- self.assertEqual(val, ['0', 'cpufreq', '10', 'metric'])
+ val = self.resource_profile.get_cpu_data(reskey[1], reskey[2], value)
+ self.assertIsNotNone(val)
def test_get_cpu_data_error(self):
reskey = ["", "", ""]
value = "metric:10"
- val = self.resource_profile.get_cpu_data(reskey, value)
- self.assertEqual(val, ['error', 'Invalid', ''])
+ val = self.resource_profile.get_cpu_data(reskey[0], reskey[1], value)
+ self.assertEqual(val, ('error', 'Invalid', '', ''))
def test__start_collectd(self):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.return_value = ssh_mock
- resource_profile = \
- ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
- [1, 2, 3])
self.assertIsNone(
- resource_profile._start_collectd(ssh_mock, "/opt/nsb_bin"))
+ self.resource_profile._start_collectd(self.ssh_mock, "/opt/nsb_bin"))
- def test_initiate_systemagent(self):
- with mock.patch("yardstick.ssh.SSH") as ssh:
- ssh_mock = mock.Mock(autospec=ssh.SSH)
- ssh_mock.execute = \
- mock.Mock(return_value=(0, "", ""))
- ssh.return_value = ssh_mock
- resource_profile = \
- ResourceProfile(self.VNFD['vnfd:vnfd-catalog']['vnfd'][0],
- [1, 2, 3])
+ def test__prepare_collectd_conf(self):
self.assertIsNone(
- resource_profile.initiate_systemagent("/opt/nsb_bin"))
+ self.resource_profile._prepare_collectd_conf("/opt/nsb_bin"))
+
+ def test__setup_intel_pmu(self):
+ self.assertIsNone(
+ self.resource_profile._setup_intel_pmu(self.ssh_mock, "/opt/nsb_bin"))
+
+ def test__setup_ovs_stats(self):
+ self.assertIsNone(
+ self.resource_profile._setup_ovs_stats(self.ssh_mock))
+
+ @mock.patch("yardstick.network_services.nfvi.resource.open")
+ @mock.patch("yardstick.network_services.nfvi.resource.os")
+ def test__provide_config_file(self, mock_open, mock_os):
+ loadplugin = range(5)
+ port_names = range(5)
+ kwargs = {
+ "interval": '25',
+ "loadplugin": loadplugin,
+ "port_names": port_names,
+ }
+ self.resource_profile._provide_config_file("/opt/nsb_bin", "collectd.conf", kwargs)
+ self.ssh_mock.execute.assert_called_once()
+
+ @mock.patch("yardstick.network_services.nfvi.resource.open")
+ def test_initiate_systemagent(self, mock_open):
+ self.resource_profile._start_collectd = mock.Mock()
+ self.assertIsNone(
+ self.resource_profile.initiate_systemagent("/opt/nsb_bin"))
+
+ @mock.patch("yardstick.network_services.nfvi.resource.open")
+ def test_initiate_systemagent_raise(self, mock_open):
+ self.resource_profile._start_collectd = mock.Mock(side_effect=RuntimeError)
+ with self.assertRaises(RuntimeError):
+ self.resource_profile.initiate_systemagent("/opt/nsb_bin")
+
+ def test__parse_hugepages(self):
+ reskey = ["cpu", "cpuFreq"]
+ value = "timestamp:12345"
+ res = self.resource_profile.parse_hugepages(reskey, value)
+ self.assertEqual({'cpu/cpuFreq': '12345'}, res)
+
+ def test__parse_dpdkstat(self):
+ reskey = ["dpdk0", "0"]
+ value = "tx:12345"
+ res = self.resource_profile.parse_dpdkstat(reskey, value)
+ self.assertEqual({'dpdk0/0': '12345'}, res)
+
+ def test__parse_virt(self):
+ reskey = ["vm0", "cpu"]
+ value = "load:45"
+ res = self.resource_profile.parse_virt(reskey, value)
+ self.assertEqual({'vm0/cpu': '45'}, res)
+
+ def test__parse_ovs_stats(self):
+ reskey = ["ovs", "stats"]
+ value = "tx:45"
+ res = self.resource_profile.parse_ovs_stats(reskey, value)
+ self.assertEqual({'ovs/stats': '45'}, res)
def test_parse_collectd_result(self):
- res = self.resource_profile.parse_collectd_result({}, [0, 1, 2])
- self.assertDictEqual(res, {'timestamp': '', 'cpu': {}, 'memory': {}})
+ res = self.resource_profile.parse_collectd_result({})
+ expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {},
+ 'memory': {}, 'ovs_stats': {}, 'timestamp': '',
+ 'virt': {}}
+ self.assertDictEqual(res, expected_result)
+
+ def test_parse_collectd_result_cpu(self):
+ metric = {"nsb_stats/cpu/0/ipc": "101"}
+ self.resource_profile.get_cpu_data = mock.Mock(return_value=[1,
+ "ipc",
+ "1234",
+ ""])
+ res = self.resource_profile.parse_collectd_result(metric)
+ expected_result = {'cpu': {1: {'ipc': '1234'}}, 'dpdkstat': {}, 'hugepages': {},
+ 'memory': {}, 'ovs_stats': {}, 'timestamp': '',
+ 'virt': {}}
+ self.assertDictEqual(res, expected_result)
+
+ def test_parse_collectd_result_memory(self):
+ metric = {"nsb_stats/memory/bw": "101"}
+ res = self.resource_profile.parse_collectd_result(metric)
+ expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {},
+ 'memory': {'bw': '101'}, 'ovs_stats': {}, 'timestamp': '',
+ 'virt': {}}
+ self.assertDictEqual(res, expected_result)
+
+ def test_parse_collectd_result_hugepage(self):
+ # amqp returns bytes
+ metric = {b"nsb_stats/hugepages/free": b"101"}
+ self.resource_profile.parse_hugepages = mock.Mock(return_value={"free": "101"})
+ res = self.resource_profile.parse_collectd_result(metric)
+ expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {'free': '101'},
+ 'memory': {}, 'ovs_stats': {}, 'timestamp': '',
+ 'virt': {}}
+ self.assertDictEqual(res, expected_result)
+
+ def test_parse_collectd_result_dpdk_virt_ovs(self):
+ metric = {b"nsb_stats/dpdkstat/tx": b"101",
+ b"nsb_stats/ovs_stats/tx": b"101",
+ b"nsb_stats/virt/virt/memory": b"101"}
+ self.resource_profile.parse_dpdkstat = \
+ mock.Mock(return_value={"tx": "101"})
+ self.resource_profile.parse_virt = \
+ mock.Mock(return_value={"memory": "101"})
+ self.resource_profile.parse_ovs_stats = \
+ mock.Mock(return_value={"tx": "101"})
+ res = self.resource_profile.parse_collectd_result(metric)
+ expected_result = {'cpu': {}, 'dpdkstat': {'tx': '101'}, 'hugepages': {},
+ 'memory': {}, 'ovs_stats': {'tx': '101'}, 'timestamp': '',
+ 'virt': {'memory': '101'}}
+ self.assertDictEqual(res, expected_result)
+
+ def test_amqp_process_for_nfvi_kpi(self):
+ self.resource_profile.amqp_client = \
+ mock.MagicMock(side_effect=[None, mock.MagicMock()])
+ self.resource_profile.run_collectd_amqp = \
+ mock.Mock(return_value=0)
+ res = self.resource_profile.amqp_process_for_nfvi_kpi()
+ self.assertEqual(None, res)
+
+ def test_amqp_collect_nfvi_kpi(self):
+ self.resource_profile.amqp_client = \
+ mock.MagicMock(side_effect=[None, mock.MagicMock()])
+ self.resource_profile.run_collectd_amqp = \
+ mock.Mock(return_value=0)
+ self.resource_profile.parse_collectd_result = mock.Mock()
+ res = self.resource_profile.amqp_collect_nfvi_kpi()
+ self.assertIsNotNone(res)
def test_run_collectd_amqp(self):
- _queue = multiprocessing.Queue()
resource.AmqpConsumer = mock.Mock(autospec=collectd)
- self.assertIsNone(self.resource_profile.run_collectd_amqp(_queue))
+ self.assertIsNone(self.resource_profile.run_collectd_amqp())
def test_start(self):
self.assertIsNone(self.resource_profile.start())
def test_stop(self):
self.assertIsNone(self.resource_profile.stop())
+ def test_stop(self):
+ self.resource_profile.amqp_client = mock.MagicMock()
+ self.assertIsNone(self.resource_profile.stop())
+
if __name__ == '__main__':
unittest.main()