Merge "Add grafana config for TC071_Network Latency, Throughput, Packet Loss and...
[yardstick.git] / tests / unit / benchmark / scenarios / storage / test_storagecapacity.py
1 #!/usr/bin/env python
2
3 ##############################################################################
4 # Copyright (c) 2016 Huawei Technologies Co.,Ltd and others.
5 #
6 # All rights reserved. This program and the accompanying materials
7 # are made available under the terms of the Apache License, Version 2.0
8 # which accompanies this distribution, and is available at
9 # http://www.apache.org/licenses/LICENSE-2.0
10 ##############################################################################
11
12 # Unittest for yardstick.benchmark.scenarios.storage.storagecapacity.StorageCapacity
13
14 import mock
15 import unittest
16 import os
17 import json
18
19 from yardstick.benchmark.scenarios.storage import storagecapacity
20
# Sample command outputs used by the StorageCapacity tests.  The JSON keys
# (including their spelling) are runtime data and are reproduced verbatim.
DISK_SIZE_SAMPLE_OUTPUT = (
    '{"Numberf of devides": "2", '
    '"Total disk size in bytes": "1024000000"}'
)
BLOCK_SIZE_SAMPLE_OUTPUT = '{"/dev/sda": 1024, "/dev/sdb": 4096}'
# Raw utilization text: one "<device> <value>" pair per line.
DISK_UTIL_RAW_OUTPUT = "vda 10.00\nvda 0.00"
# Result the disk-utilization test expects for DISK_UTIL_RAW_OUTPUT.
DISK_UTIL_SAMPLE_OUTPUT = (
    '{"vda": {"avg_util": 5.0, "max_util": 10.0, "min_util": 0.0}}'
)
25
@mock.patch('yardstick.benchmark.scenarios.storage.storagecapacity.ssh')
class StorageCapacityTestCase(unittest.TestCase):
    """Unit tests for ``storagecapacity.StorageCapacity``.

    The ``ssh`` name inside the ``storagecapacity`` module is replaced by a
    mock for every test method; each test receives that mock as ``mock_ssh``
    and programs the value returned by ``SSH().execute`` before exercising
    the scenario.
    """

    def setUp(self):
        # Default scenario arguments: the 'disk_size' test type.
        self.scn = {
            "options": {
                'test_type': 'disk_size'
            }
        }
        # Host description handed to the scenario as its context.
        self.ctx = {
            "host": {
                'ip': '172.16.0.137',
                'user': 'cirros',
                'password': "root"
            }
        }
        # Dict the scenario's run() is given to fill in.
        self.result = {}

    def test_capacity_successful_setup(self, mock_ssh):
        # setup() should leave a client on the scenario and flag completion.
        scenario = storagecapacity.StorageCapacity(self.scn, self.ctx)

        mock_ssh.SSH().execute.return_value = (0, '', '')
        scenario.setup()
        self.assertIsNotNone(scenario.client)
        self.assertTrue(scenario.setup_done)

    def test_capacity_disk_size_successful(self, mock_ssh):
        # The JSON text returned by execute() should end up, decoded, in the
        # result dict.
        scenario = storagecapacity.StorageCapacity(self.scn, self.ctx)

        mock_ssh.SSH().execute.return_value = (0, DISK_SIZE_SAMPLE_OUTPUT, '')
        scenario.run(self.result)
        expected_result = json.loads(DISK_SIZE_SAMPLE_OUTPUT)
        self.assertEqual(self.result, expected_result)

    def test_capacity_block_size_successful(self, mock_ssh):
        # Same flow as the disk-size test, but for the 'block_size' type.
        args = {
            "options": {
                'test_type': 'block_size'
            }
        }
        scenario = storagecapacity.StorageCapacity(args, self.ctx)

        mock_ssh.SSH().execute.return_value = (0, BLOCK_SIZE_SAMPLE_OUTPUT, '')
        scenario.run(self.result)
        expected_result = json.loads(BLOCK_SIZE_SAMPLE_OUTPUT)
        self.assertEqual(self.result, expected_result)

    def test_capacity_disk_utilization_successful(self, mock_ssh):
        # The raw per-sample text is expected to be reduced to the
        # avg/max/min figures in DISK_UTIL_SAMPLE_OUTPUT.
        args = {
            "options": {
                'test_type': 'disk_utilization',
                'interval': 1,
                'count': 2
            }
        }
        scenario = storagecapacity.StorageCapacity(args, self.ctx)

        mock_ssh.SSH().execute.return_value = (0, DISK_UTIL_RAW_OUTPUT, '')
        scenario.run(self.result)
        expected_result = json.loads(DISK_UTIL_SAMPLE_OUTPUT)
        self.assertEqual(self.result, expected_result)

    def test_capacity_unsuccessful_script_error(self, mock_ssh):
        # A non-zero first element in the execute() result (presumably the
        # exit status -- confirm against the ssh module) must make run()
        # raise RuntimeError.
        scenario = storagecapacity.StorageCapacity(self.scn, self.ctx)

        mock_ssh.SSH().execute.return_value = (1, '', 'FOOBAR')
        with self.assertRaises(RuntimeError):
            scenario.run(self.result)
93
def main():
    """Run the tests by delegating to ``unittest.main()``."""
    unittest.main()
96
# Run the tests only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()