# See the License for the specific language governing permissions and
# limitations under the License.
-from __future__ import absolute_import
-import unittest
-
import errno
+
import mock
+import unittest
from yardstick.network_services.nfvi.resource import ResourceProfile
from yardstick.network_services.nfvi import resource, collectd
+from yardstick.common.exceptions import ResourceCommandError
+from yardstick.common.exceptions import SSHError
class TestResourceProfile(unittest.TestCase):
self.resource_profile.connection = self.ssh_mock
def test___init__(self):
- self.assertEqual(True, self.resource_profile.enable)
+ self.assertTrue(self.resource_profile.enable)
- def test_check_if_sa_running(self):
- self.assertEqual(self.resource_profile.check_if_sa_running("collectd"),
+ def test_check_if_system_agent_running(self):
+ self.assertEqual(self.resource_profile.check_if_system_agent_running("collectd"),
(0, ""))
- def test_check_if_sa_running_excetion(self):
+ def test_check_if_system_agent_running_excetion(self):
with mock.patch.object(self.resource_profile.connection, "execute") as mock_execute:
mock_execute.side_effect = OSError(errno.ECONNRESET, "error")
- self.assertEqual(self.resource_profile.check_if_sa_running("collectd"), (1, None))
+ self.assertEqual(
+ self.resource_profile.check_if_system_agent_running("collectd"),
+ (1, None))
def test_get_cpu_data(self):
reskey = ["", "cpufreq", "cpufreq-0"]
self.assertEqual(val, ('error', 'Invalid', '', ''))
def test__start_collectd(self):
- self.assertIsNone(
- self.resource_profile._start_collectd(self.ssh_mock, "/opt/nsb_bin"))
+ ssh_mock = mock.Mock()
+ ssh_mock.execute = mock.Mock(return_value=(0, "", ""))
+ self.assertIsNone(self.resource_profile._start_collectd(ssh_mock,
+ "/opt/nsb_bin"))
+
+ ssh_mock.execute = mock.Mock(side_effect=SSHError)
+ with self.assertRaises(SSHError):
+ self.resource_profile._start_collectd(ssh_mock, "/opt/nsb_bin")
+
+ ssh_mock.execute = mock.Mock(return_value=(1, "", ""))
+ self.assertIsNone(self.resource_profile._start_collectd(ssh_mock,
+ "/opt/nsb_bin"))
+
+ def test__start_rabbitmq(self):
+ ssh_mock = mock.Mock()
+ ssh_mock.execute = mock.Mock(return_value=(0, "RabbitMQ", ""))
+ self.assertIsNone(self.resource_profile._start_rabbitmq(ssh_mock))
+
+ ssh_mock.execute = mock.Mock(return_value=(0, "", ""))
+ with self.assertRaises(ResourceCommandError):
+ self.resource_profile._start_rabbitmq(ssh_mock)
+
+ ssh_mock.execute = mock.Mock(return_value=(1, "", ""))
+ with self.assertRaises(ResourceCommandError):
+ self.resource_profile._start_rabbitmq(ssh_mock)
def test__prepare_collectd_conf(self):
self.assertIsNone(
self.resource_profile._prepare_collectd_conf("/opt/nsb_bin"))
def test__setup_ovs_stats(self):
+ # TODO(elfoley): This method doesn't actually return anything, the side
+ # effects should be checked
self.assertIsNone(
self.resource_profile._setup_ovs_stats(self.ssh_mock))
- @mock.patch("yardstick.network_services.nfvi.resource.open")
- @mock.patch("yardstick.network_services.nfvi.resource.os")
- def test__provide_config_file(self, mock_open, mock_os):
+ def test__provide_config_file(self,):
loadplugin = range(5)
port_names = range(5)
kwargs = {
self.resource_profile._provide_config_file("/opt/nsb_bin", "collectd.conf", kwargs)
self.ssh_mock.execute.assert_called_once()
- @mock.patch("yardstick.network_services.nfvi.resource.open")
- def test_initiate_systemagent(self, mock_open):
+ def test_initiate_systemagent(self):
self.resource_profile._start_collectd = mock.Mock()
+ self.resource_profile._start_rabbitmq = mock.Mock()
self.assertIsNone(
self.resource_profile.initiate_systemagent("/opt/nsb_bin"))
- @mock.patch("yardstick.network_services.nfvi.resource.open")
- def test_initiate_systemagent_raise(self, mock_open):
- self.resource_profile._start_collectd = mock.Mock(side_effect=RuntimeError)
+ def test_initiate_systemagent_raise(self):
+ self.resource_profile._start_rabbitmq = mock.Mock(side_effect=RuntimeError)
with self.assertRaises(RuntimeError):
self.resource_profile.initiate_systemagent("/opt/nsb_bin")
self.resource_profile.run_collectd_amqp = \
mock.Mock(return_value=0)
res = self.resource_profile.amqp_process_for_nfvi_kpi()
- self.assertEqual(None, res)
+ self.assertIsNone(res)
def test_amqp_collect_nfvi_kpi(self):
self.resource_profile.amqp_client = \
def test_stop(self):
self.assertIsNone(self.resource_profile.stop())
- def test_stop(self):
+ def test_stop_amqp_not_running(self):
self.resource_profile.amqp_client = mock.MagicMock()
+ # TODO(efoley): Fix this incorrect test.
+ # Should check that we don't try to stop amqp when it's not running
self.assertIsNone(self.resource_profile.stop())
-
-if __name__ == '__main__':
- unittest.main()