Merge "Remove unused methods in SampleVNF"
[yardstick.git] / yardstick / tests / unit / common / test_utils.py
index 452b93a..666b29b 100644 (file)
@@ -7,23 +7,24 @@
 # http://www.apache.org/licenses/LICENSE-2.0
 ##############################################################################
 
-# Unittest for yardstick.common.utils
-
-from __future__ import absolute_import
-
 from copy import deepcopy
 import errno
+import importlib
 import ipaddress
 from itertools import product, chain
 import mock
 import os
 import six
 from six.moves import configparser
+import time
 import unittest
 
 import yardstick
-from yardstick.common import utils
+from yardstick import ssh
+import yardstick.error
 from yardstick.common import constants
+from yardstick.common import utils
+from yardstick.common import exceptions
 
 
 class IterSubclassesTestCase(unittest.TestCase):
@@ -60,8 +61,8 @@ class ImportModulesFromPackageTestCase(unittest.TestCase):
         utils.import_modules_from_package('foo.bar')
 
     @mock.patch('yardstick.common.utils.os.walk')
-    @mock.patch('yardstick.common.utils.importutils')
-    def test_import_modules_from_package(self, mock_importutils, mock_walk):
+    @mock.patch.object(importlib, 'import_module')
+    def test_import_modules_from_package(self, mock_import_module, mock_walk):
 
         yardstick_root = os.path.dirname(os.path.dirname(yardstick.__file__))
         mock_walk.return_value = ([
@@ -69,7 +70,7 @@ class ImportModulesFromPackageTestCase(unittest.TestCase):
         ])
 
         utils.import_modules_from_package('foo.bar')
-        mock_importutils.import_module.assert_called_with('bar.baz')
+        mock_import_module.assert_called_once_with('bar.baz')
 
 
 class GetParaFromYaml(unittest.TestCase):
@@ -128,6 +129,63 @@ class CommonUtilTestCase(unittest.TestCase):
             ("=".join(item) for item in sorted(flattened_data.items())))
         self.assertEqual(result, line)
 
+    def test_get_key_with_default_negative(self):
+        with self.assertRaises(KeyError):
+            utils.get_key_with_default({}, 'key1')
+
+    @mock.patch('yardstick.common.utils.open', create=True)
+    def test_(self, mock_open):
+        mock_open.side_effect = IOError
+
+        with self.assertRaises(IOError):
+            utils.find_relative_file('my/path', 'task/path')
+
+        self.assertEqual(mock_open.call_count, 2)
+
+    @mock.patch('yardstick.common.utils.open', create=True)
+    def test_open_relative_path(self, mock_open):
+        mock_open_result = mock_open()
+        mock_open_call_count = 1  # initial call to get result
+
+        self.assertEqual(utils.open_relative_file('foo', 'bar'), mock_open_result)
+
+        mock_open_call_count += 1  # one more call expected
+        self.assertEqual(mock_open.call_count, mock_open_call_count)
+        self.assertIn('foo', mock_open.call_args_list[-1][0][0])
+        self.assertNotIn('bar', mock_open.call_args_list[-1][0][0])
+
+        def open_effect(*args, **kwargs):
+            if kwargs.get('name', args[0]) == os.path.join('bar', 'foo'):
+                return mock_open_result
+            raise IOError(errno.ENOENT, 'not found')
+
+        mock_open.side_effect = open_effect
+        self.assertEqual(utils.open_relative_file('foo', 'bar'), mock_open_result)
+
+        mock_open_call_count += 2  # two more calls expected
+        self.assertEqual(mock_open.call_count, mock_open_call_count)
+        self.assertIn('foo', mock_open.call_args_list[-1][0][0])
+        self.assertIn('bar', mock_open.call_args_list[-1][0][0])
+
+        # test an IOError of type ENOENT
+        mock_open.side_effect = IOError(errno.ENOENT, 'not found')
+        with self.assertRaises(IOError):
+            # the second call still raises
+            utils.open_relative_file('foo', 'bar')
+
+        mock_open_call_count += 2  # two more calls expected
+        self.assertEqual(mock_open.call_count, mock_open_call_count)
+        self.assertIn('foo', mock_open.call_args_list[-1][0][0])
+        self.assertIn('bar', mock_open.call_args_list[-1][0][0])
+
+        # test an IOError other than ENOENT
+        mock_open.side_effect = IOError(errno.EBUSY, 'busy')
+        with self.assertRaises(IOError):
+            utils.open_relative_file('foo', 'bar')
+
+        mock_open_call_count += 1  # one more call expected
+        self.assertEqual(mock_open.call_count, mock_open_call_count)
+
 
 class TestMacAddressToHex(unittest.TestCase):
 
@@ -249,8 +307,8 @@ power management:
 
 """
         socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
-        assert sorted(socket_map.keys()) == [0]
-        assert sorted(socket_map[0].keys()) == [2, 3, 4]
+        self.assertEqual(sorted(socket_map.keys()), [0])
+        self.assertEqual(sorted(socket_map[0].keys()), [2, 3, 4])
 
     def test_single_socket_hyperthread(self):
         cpuinfo = """\
@@ -337,11 +395,11 @@ power management:
 
 """
         socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
-        assert sorted(socket_map.keys()) == [0]
-        assert sorted(socket_map[0].keys()) == [1, 2, 3]
-        assert sorted(socket_map[0][1]) == [5]
-        assert sorted(socket_map[0][2]) == [6]
-        assert sorted(socket_map[0][3]) == [7]
+        self.assertEqual(sorted(socket_map.keys()), [0])
+        self.assertEqual(sorted(socket_map[0].keys()), [1, 2, 3])
+        self.assertEqual(sorted(socket_map[0][1]), [5])
+        self.assertEqual(sorted(socket_map[0][2]), [6])
+        self.assertEqual(sorted(socket_map[0][3]), [7])
 
     def test_dual_socket_hyperthread(self):
         cpuinfo = """\
@@ -536,15 +594,15 @@ power management:
 
 """
         socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
-        assert sorted(socket_map.keys()) == [0, 1]
-        assert sorted(socket_map[0].keys()) == [0, 1, 2]
-        assert sorted(socket_map[1].keys()) == [26, 27, 28]
-        assert sorted(socket_map[0][0]) == [44]
-        assert sorted(socket_map[0][1]) == [1]
-        assert sorted(socket_map[0][2]) == [2]
-        assert sorted(socket_map[1][26]) == [85]
-        assert sorted(socket_map[1][27]) == [86]
-        assert sorted(socket_map[1][28]) == [43, 87]
+        self.assertEqual(sorted(socket_map.keys()), [0, 1])
+        self.assertEqual(sorted(socket_map[0].keys()), [0, 1, 2])
+        self.assertEqual(sorted(socket_map[1].keys()), [26, 27, 28])
+        self.assertEqual(sorted(socket_map[0][0]), [44])
+        self.assertEqual(sorted(socket_map[0][1]), [1])
+        self.assertEqual(sorted(socket_map[0][2]), [2])
+        self.assertEqual(sorted(socket_map[1][26]), [85])
+        self.assertEqual(sorted(socket_map[1][27]), [86])
+        self.assertEqual(sorted(socket_map[1][28]), [43, 87])
 
     def test_dual_socket_no_hyperthread(self):
         cpuinfo = """\
@@ -740,11 +798,11 @@ power management:
 """
         socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
         processors = socket_map.processors()
-        assert processors == [1, 2, 43, 44, 85, 86, 87]
+        self.assertEqual(processors, [1, 2, 43, 44, 85, 86, 87])
         cores = socket_map.cores()
-        assert cores == [0, 1, 2, 26, 27, 28]
+        self.assertEqual(cores, [0, 1, 2, 26, 27, 28])
         sockets = socket_map.sockets()
-        assert sockets == [0, 1]
+        self.assertEqual(sockets, [0, 1])
 
 
 class ChangeObjToDictTestCase(unittest.TestCase):
@@ -933,9 +991,9 @@ class TestUtils(unittest.TestCase):
 
     def test_error_class(self):
         with self.assertRaises(RuntimeError):
-            utils.ErrorClass()
+            yardstick.error.ErrorClass()
 
-        error_instance = utils.ErrorClass(test='')
+        error_instance = yardstick.error.ErrorClass(test='')
         with self.assertRaises(AttributeError):
             error_instance.get_name()
 
@@ -1078,8 +1136,67 @@ class SafeDecodeUtf8TestCase(unittest.TestCase):
         self.assertEqual('this is a byte array', out)
 
 
-def main():
-    unittest.main()
-
-if __name__ == '__main__':
-    main()
+class ReadMeminfoTestCase(unittest.TestCase):
+
+    MEMINFO = (b'MemTotal:       65860500 kB\n'
+               b'MemFree:        28690900 kB\n'
+               b'MemAvailable:   52873764 kB\n'
+               b'Active(anon):    3015676 kB\n'
+               b'HugePages_Total:       8\n'
+               b'Hugepagesize:    1048576 kB')
+    MEMINFO_DICT = {'MemTotal': '65860500',
+                    'MemFree': '28690900',
+                    'MemAvailable': '52873764',
+                    'Active(anon)': '3015676',
+                    'HugePages_Total': '8',
+                    'Hugepagesize': '1048576'}
+
+    def test_read_meminfo(self):
+        ssh_client = ssh.SSH('user', 'host')
+        with mock.patch.object(ssh_client, 'get_file_obj') as \
+                mock_get_client, \
+                mock.patch.object(six, 'BytesIO',
+                                  return_value=six.BytesIO(self.MEMINFO)):
+            output = utils.read_meminfo(ssh_client)
+            mock_get_client.assert_called_once_with('/proc/meminfo', mock.ANY)
+        self.assertEqual(self.MEMINFO_DICT, output)
+
+
+class TimerTestCase(unittest.TestCase):
+
+    def test__getattr(self):
+        with utils.Timer() as timer:
+            time.sleep(1)
+        self.assertEqual(1, round(timer.total_seconds(), 0))
+        self.assertEqual(1, timer.delta.seconds)
+
+    def test__enter_with_timeout(self):
+        with utils.Timer(timeout=10) as timer:
+            time.sleep(1)
+        self.assertEqual(1, round(timer.total_seconds(), 0))
+
+    def test__enter_with_timeout_exception(self):
+        with self.assertRaises(exceptions.TimerTimeout):
+            with utils.Timer(timeout=1):
+                time.sleep(2)
+
+
+class WaitUntilTrueTestCase(unittest.TestCase):
+
+    def test_no_timeout(self):
+        self.assertIsNone(utils.wait_until_true(lambda: True,
+                                                timeout=1, sleep=1))
+
+    def test_timeout_generic_exception(self):
+        with self.assertRaises(exceptions.WaitTimeout):
+            self.assertIsNone(utils.wait_until_true(lambda: False,
+                                                    timeout=1, sleep=1))
+
+    def test_timeout_given_exception(self):
+        class MyTimeoutException(exceptions.YardstickException):
+            message = 'My timeout exception'
+
+        with self.assertRaises(MyTimeoutException):
+            self.assertIsNone(
+                utils.wait_until_true(lambda: False, timeout=1, sleep=1,
+                                      exception=MyTimeoutException))