from __future__ import absolute_import
import unittest
+
+import errno
import mock
from yardstick.network_services.nfvi.resource import ResourceProfile
with mock.patch("yardstick.ssh.AutoConnectSSH") as ssh:
self.ssh_mock = mock.Mock(autospec=ssh.SSH)
self.ssh_mock.execute = \
- mock.Mock(return_value=(0, {}, ""))
+ mock.Mock(return_value=(0, "", ""))
ssh.from_node.return_value = self.ssh_mock
mgmt = self.VNFD['vnfd:vnfd-catalog']['vnfd'][0]['mgmt-interface']
def test_check_if_sa_running(self):
self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
- [True, {}])
+ (0, ""))
+
+ def test_check_if_sa_running_excetion(self):
+ with mock.patch.object(self.resource_profile.connection, "execute") as mock_execute:
+ mock_execute.side_effect = OSError(errno.ECONNRESET, "error")
+ self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), (1, None))
def test_get_cpu_data(self):
reskey = ["", "cpufreq", "cpufreq-0"]
self.assertIsNone(
self.resource_profile._prepare_collectd_conf("/opt/nsb_bin"))
+ def test__setup_intel_pmu(self):
+ self.assertIsNone(
+ self.resource_profile._setup_intel_pmu(self.ssh_mock, "/opt/nsb_bin"))
+
+ def test__setup_ovs_stats(self):
+ self.assertIsNone(
+ self.resource_profile._setup_ovs_stats(self.ssh_mock))
@mock.patch("yardstick.network_services.nfvi.resource.open")
@mock.patch("yardstick.network_services.nfvi.resource.os")
self.resource_profile._provide_config_file("/opt/nsb_bin", "collectd.conf", kwargs)
self.ssh_mock.execute.assert_called_once()
-
@mock.patch("yardstick.network_services.nfvi.resource.open")
def test_initiate_systemagent(self, mock_open):
self.resource_profile._start_collectd = mock.Mock()
self.assertIsNone(
self.resource_profile.initiate_systemagent("/opt/nsb_bin"))
+ @mock.patch("yardstick.network_services.nfvi.resource.open")
+ def test_initiate_systemagent_raise(self, mock_open):
+ self.resource_profile._start_collectd = mock.Mock(side_effect=RuntimeError)
+ with self.assertRaises(RuntimeError):
+ self.resource_profile.initiate_systemagent("/opt/nsb_bin")
+
def test__parse_hugepages(self):
reskey = ["cpu", "cpuFreq"]
value = "timestamp:12345"
self.assertEqual({'ovs/stats': '45'}, res)
def test_parse_collectd_result(self):
- res = self.resource_profile.parse_collectd_result({}, [0, 1, 2])
+ res = self.resource_profile.parse_collectd_result({})
expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {},
'memory': {}, 'ovs_stats': {}, 'timestamp': '',
- 'intel_pmu': {},
'virt': {}}
self.assertDictEqual(res, expected_result)
"ipc",
"1234",
""])
- res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+ res = self.resource_profile.parse_collectd_result(metric)
expected_result = {'cpu': {1: {'ipc': '1234'}}, 'dpdkstat': {}, 'hugepages': {},
'memory': {}, 'ovs_stats': {}, 'timestamp': '',
- 'intel_pmu': {},
'virt': {}}
self.assertDictEqual(res, expected_result)
def test_parse_collectd_result_memory(self):
metric = {"nsb_stats/memory/bw": "101"}
- res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+ res = self.resource_profile.parse_collectd_result(metric)
expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {},
'memory': {'bw': '101'}, 'ovs_stats': {}, 'timestamp': '',
- 'intel_pmu': {},
'virt': {}}
self.assertDictEqual(res, expected_result)
def test_parse_collectd_result_hugepage(self):
# amqp returns bytes
metric = {b"nsb_stats/hugepages/free": b"101"}
- self.resource_profile.parse_hugepages = \
- mock.Mock(return_value={"free": "101"})
- res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+ self.resource_profile.parse_hugepages = mock.Mock(return_value={"free": "101"})
+ res = self.resource_profile.parse_collectd_result(metric)
expected_result = {'cpu': {}, 'dpdkstat': {}, 'hugepages': {'free': '101'},
'memory': {}, 'ovs_stats': {}, 'timestamp': '',
- 'intel_pmu': {},
'virt': {}}
self.assertDictEqual(res, expected_result)
mock.Mock(return_value={"memory": "101"})
self.resource_profile.parse_ovs_stats = \
mock.Mock(return_value={"tx": "101"})
- res = self.resource_profile.parse_collectd_result(metric, [0, 1, 2])
+ res = self.resource_profile.parse_collectd_result(metric)
expected_result = {'cpu': {}, 'dpdkstat': {'tx': '101'}, 'hugepages': {},
'memory': {}, 'ovs_stats': {'tx': '101'}, 'timestamp': '',
- 'intel_pmu': {},
'virt': {'memory': '101'}}
self.assertDictEqual(res, expected_result)
def test_stop(self):
self.assertIsNone(self.resource_profile.stop())
+ def test_stop(self):
+ self.resource_profile.amqp_client = mock.MagicMock()
+ self.assertIsNone(self.resource_profile.stop())
+
if __name__ == '__main__':
unittest.main()