import os
import six
from six.moves import configparser
+import time
import unittest
import yardstick
-from yardstick.common import utils
+from yardstick import ssh
+import yardstick.error
from yardstick.common import constants
+from yardstick.common import utils
+from yardstick.common import exceptions
class IterSubclassesTestCase(unittest.TestCase):
("=".join(item) for item in sorted(flattened_data.items())))
self.assertEqual(result, line)
+ def test_get_key_with_default_negative(self):
+ with self.assertRaises(KeyError):
+ utils.get_key_with_default({}, 'key1')
+
+ @mock.patch('yardstick.common.utils.open', create=True)
+ def test_(self, mock_open):
+ mock_open.side_effect = IOError
+
+ with self.assertRaises(IOError):
+ utils.find_relative_file('my/path', 'task/path')
+
+ self.assertEqual(mock_open.call_count, 2)
+
+ @mock.patch('yardstick.common.utils.open', create=True)
+ def test_open_relative_path(self, mock_open):
+ mock_open_result = mock_open()
+ mock_open_call_count = 1 # initial call to get result
+
+ self.assertEqual(utils.open_relative_file('foo', 'bar'), mock_open_result)
+
+ mock_open_call_count += 1 # one more call expected
+ self.assertEqual(mock_open.call_count, mock_open_call_count)
+ self.assertIn('foo', mock_open.call_args_list[-1][0][0])
+ self.assertNotIn('bar', mock_open.call_args_list[-1][0][0])
+
+ def open_effect(*args, **kwargs):
+ if kwargs.get('name', args[0]) == os.path.join('bar', 'foo'):
+ return mock_open_result
+ raise IOError(errno.ENOENT, 'not found')
+
+ mock_open.side_effect = open_effect
+ self.assertEqual(utils.open_relative_file('foo', 'bar'), mock_open_result)
+
+ mock_open_call_count += 2 # two more calls expected
+ self.assertEqual(mock_open.call_count, mock_open_call_count)
+ self.assertIn('foo', mock_open.call_args_list[-1][0][0])
+ self.assertIn('bar', mock_open.call_args_list[-1][0][0])
+
+ # test an IOError of type ENOENT
+ mock_open.side_effect = IOError(errno.ENOENT, 'not found')
+ with self.assertRaises(IOError):
+ # the second call still raises
+ utils.open_relative_file('foo', 'bar')
+
+ mock_open_call_count += 2 # two more calls expected
+ self.assertEqual(mock_open.call_count, mock_open_call_count)
+ self.assertIn('foo', mock_open.call_args_list[-1][0][0])
+ self.assertIn('bar', mock_open.call_args_list[-1][0][0])
+
+ # test an IOError other than ENOENT
+ mock_open.side_effect = IOError(errno.EBUSY, 'busy')
+ with self.assertRaises(IOError):
+ utils.open_relative_file('foo', 'bar')
+
+ mock_open_call_count += 1 # one more call expected
+ self.assertEqual(mock_open.call_count, mock_open_call_count)
+
class TestMacAddressToHex(unittest.TestCase):
"""
socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
- assert sorted(socket_map.keys()) == [0]
- assert sorted(socket_map[0].keys()) == [2, 3, 4]
+ self.assertEqual(sorted(socket_map.keys()), [0])
+ self.assertEqual(sorted(socket_map[0].keys()), [2, 3, 4])
def test_single_socket_hyperthread(self):
cpuinfo = """\
"""
socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
- assert sorted(socket_map.keys()) == [0]
- assert sorted(socket_map[0].keys()) == [1, 2, 3]
- assert sorted(socket_map[0][1]) == [5]
- assert sorted(socket_map[0][2]) == [6]
- assert sorted(socket_map[0][3]) == [7]
+ self.assertEqual(sorted(socket_map.keys()), [0])
+ self.assertEqual(sorted(socket_map[0].keys()), [1, 2, 3])
+ self.assertEqual(sorted(socket_map[0][1]), [5])
+ self.assertEqual(sorted(socket_map[0][2]), [6])
+ self.assertEqual(sorted(socket_map[0][3]), [7])
def test_dual_socket_hyperthread(self):
cpuinfo = """\
"""
socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
- assert sorted(socket_map.keys()) == [0, 1]
- assert sorted(socket_map[0].keys()) == [0, 1, 2]
- assert sorted(socket_map[1].keys()) == [26, 27, 28]
- assert sorted(socket_map[0][0]) == [44]
- assert sorted(socket_map[0][1]) == [1]
- assert sorted(socket_map[0][2]) == [2]
- assert sorted(socket_map[1][26]) == [85]
- assert sorted(socket_map[1][27]) == [86]
- assert sorted(socket_map[1][28]) == [43, 87]
+ self.assertEqual(sorted(socket_map.keys()), [0, 1])
+ self.assertEqual(sorted(socket_map[0].keys()), [0, 1, 2])
+ self.assertEqual(sorted(socket_map[1].keys()), [26, 27, 28])
+ self.assertEqual(sorted(socket_map[0][0]), [44])
+ self.assertEqual(sorted(socket_map[0][1]), [1])
+ self.assertEqual(sorted(socket_map[0][2]), [2])
+ self.assertEqual(sorted(socket_map[1][26]), [85])
+ self.assertEqual(sorted(socket_map[1][27]), [86])
+ self.assertEqual(sorted(socket_map[1][28]), [43, 87])
def test_dual_socket_no_hyperthread(self):
cpuinfo = """\
"""
socket_map = utils.SocketTopology.parse_cpuinfo(cpuinfo)
processors = socket_map.processors()
- assert processors == [1, 2, 43, 44, 85, 86, 87]
+ self.assertEqual(processors, [1, 2, 43, 44, 85, 86, 87])
cores = socket_map.cores()
- assert cores == [0, 1, 2, 26, 27, 28]
+ self.assertEqual(cores, [0, 1, 2, 26, 27, 28])
sockets = socket_map.sockets()
- assert sockets == [0, 1]
+ self.assertEqual(sockets, [0, 1])
class ChangeObjToDictTestCase(unittest.TestCase):
os.environ.clear()
os.environ.update(base_env)
- @mock.patch('yardstick.common.utils.configparser.ConfigParser')
+ @mock.patch.object(configparser, 'ConfigParser')
def test_parse_ini_file(self, mock_config_parser_type):
defaults = {
'default1': 'value1',
result = utils.parse_ini_file('my_path')
self.assertDictEqual(result, expected)
- @mock.patch('yardstick.common.utils.configparser.ConfigParser')
- def test_parse_ini_file_missing_section_header(self, mock_config_parser_type):
+ @mock.patch.object(utils, 'logger')
+ @mock.patch.object(configparser, 'ConfigParser')
+ def test_parse_ini_file_missing_section_header(
+ self, mock_config_parser_type, *args):
mock_config_parser = mock_config_parser_type()
- mock_config_parser.read.side_effect = \
- configparser.MissingSectionHeaderError(mock.Mock(), 321, mock.Mock())
+ mock_config_parser.read.side_effect = (
+ configparser.MissingSectionHeaderError(mock.Mock(), 321,
+ mock.Mock()))
with self.assertRaises(configparser.MissingSectionHeaderError):
utils.parse_ini_file('my_path')
- @mock.patch('yardstick.common.utils.configparser.ConfigParser')
+ @mock.patch.object(configparser, 'ConfigParser')
def test_parse_ini_file_no_file(self, mock_config_parser_type):
mock_config_parser = mock_config_parser_type()
mock_config_parser.read.return_value = False
with self.assertRaises(RuntimeError):
utils.parse_ini_file('my_path')
- @mock.patch('yardstick.common.utils.configparser.ConfigParser')
+ @mock.patch.object(configparser, 'ConfigParser')
def test_parse_ini_file_no_default_section_header(self, mock_config_parser_type):
s1 = {
'key1': 'value11',
def test_error_class(self):
with self.assertRaises(RuntimeError):
- utils.ErrorClass()
+ yardstick.error.ErrorClass()
- error_instance = utils.ErrorClass(test='')
+ error_instance = yardstick.error.ErrorClass(test='')
with self.assertRaises(AttributeError):
error_instance.get_name()
self.assertEqual('this is a byte array', out)
-def main():
- unittest.main()
-
-if __name__ == '__main__':
- main()
+class ReadMeminfoTestCase(unittest.TestCase):
+
+ MEMINFO = (b'MemTotal: 65860500 kB\n'
+ b'MemFree: 28690900 kB\n'
+ b'MemAvailable: 52873764 kB\n'
+ b'Active(anon): 3015676 kB\n'
+ b'HugePages_Total: 8\n'
+ b'Hugepagesize: 1048576 kB')
+ MEMINFO_DICT = {'MemTotal': '65860500',
+ 'MemFree': '28690900',
+ 'MemAvailable': '52873764',
+ 'Active(anon)': '3015676',
+ 'HugePages_Total': '8',
+ 'Hugepagesize': '1048576'}
+
+ def test_read_meminfo(self):
+ ssh_client = ssh.SSH('user', 'host')
+ with mock.patch.object(ssh_client, 'get_file_obj') as \
+ mock_get_client, \
+ mock.patch.object(six, 'BytesIO',
+ return_value=six.BytesIO(self.MEMINFO)):
+ output = utils.read_meminfo(ssh_client)
+ mock_get_client.assert_called_once_with('/proc/meminfo', mock.ANY)
+ self.assertEqual(self.MEMINFO_DICT, output)
+
+
+class TimerTestCase(unittest.TestCase):
+
+ def test__getattr(self):
+ with utils.Timer() as timer:
+ time.sleep(1)
+ self.assertEqual(1, round(timer.total_seconds(), 0))
+ self.assertEqual(1, timer.delta.seconds)
+
+ def test__enter_with_timeout(self):
+ with utils.Timer(timeout=10) as timer:
+ time.sleep(1)
+ self.assertEqual(1, round(timer.total_seconds(), 0))
+
+ def test__enter_with_timeout_exception(self):
+ with self.assertRaises(exceptions.TimerTimeout):
+ with utils.Timer(timeout=1):
+ time.sleep(2)
+
+
+class WaitUntilTrueTestCase(unittest.TestCase):
+
+ def test_no_timeout(self):
+ self.assertIsNone(utils.wait_until_true(lambda: True,
+ timeout=1, sleep=1))
+
+ def test_timeout_generic_exception(self):
+ with self.assertRaises(exceptions.WaitTimeout):
+ self.assertIsNone(utils.wait_until_true(lambda: False,
+ timeout=1, sleep=1))
+
+ def test_timeout_given_exception(self):
+ class MyTimeoutException(exceptions.YardstickException):
+ message = 'My timeout exception'
+
+ with self.assertRaises(MyTimeoutException):
+ self.assertIsNone(
+ utils.wait_until_true(lambda: False, timeout=1, sleep=1,
+ exception=MyTimeoutException))