Merge "IXIA traffic generator"
[yardstick.git] / tests / unit / network_services / traffic_profile / test_http_ixload.py
1 # Copyright (c) 2017 Intel Corporation
2 #
3 # Licensed under the Apache License, Version 2.0 (the "License");
4 # you may not use this file except in compliance with the License.
5 # You may obtain a copy of the License at
6 #
7 #      http://www.apache.org/licenses/LICENSE-2.0
8 #
9 # Unless required by applicable law or agreed to in writing, software
10 # distributed under the License is distributed on an "AS IS" BASIS,
11 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12 # See the License for the specific language governing permissions and
13 # limitations under the License.
14
15
16 from __future__ import absolute_import
17 import unittest
18 import mock
19 import runpy
20
21 from oslo_serialization import jsonutils
22
23 from yardstick.network_services.traffic_profile import http_ixload
24
25
26 class TestIxLoadTrafficGen(unittest.TestCase):
27
28     def test_parse_run_test(self):
29         ports = [1, 2, 3]
30         test_input = {
31             "remote_server": "REMOTE_SERVER",
32             "ixload_cfg": "IXLOAD_CFG",
33             "result_dir": "RESULT_DIR",
34             "ixia_chassis": "IXIA_CHASSIS",
35             "IXIA": {
36                 "card": "CARD",
37                 "ports": ports,
38             },
39         }
40         j = jsonutils.dump_as_bytes(test_input)
41         ixload = http_ixload.IXLOADHttpTest(j)
42         self.assertDictEqual(ixload.test_input, test_input)
43         self.assertIsNone(ixload.parse_run_test())
44         self.assertEqual(ixload.ports_to_reassign, [
45             ["IXIA_CHASSIS", "CARD", 1],
46             ["IXIA_CHASSIS", "CARD", 2],
47             ["IXIA_CHASSIS", "CARD", 3],
48         ])
49
50     def test_format_ports_for_reassignment(self):
51         ports = [
52             ["IXIA_CHASSIS", "CARD", 1],
53             ["IXIA_CHASSIS", "CARD", 2],
54             ["IXIA_CHASSIS", "CARD", 3],
55         ]
56         formatted = http_ixload.IXLOADHttpTest.format_ports_for_reassignment(ports)
57         self.assertEqual(formatted, [
58             "IXIA_CHASSIS;CARD;1",
59             "IXIA_CHASSIS;CARD;2",
60             "IXIA_CHASSIS;CARD;3",
61         ])
62
63     def test_reassign_ports(self):
64         ports = [1, 2, 3]
65         test_input = {
66             "remote_server": "REMOTE_SERVER",
67             "ixload_cfg": "IXLOAD_CFG",
68             "result_dir": "RESULT_DIR",
69             "ixia_chassis": "IXIA_CHASSIS",
70             "IXIA": {
71                 "card": "CARD",
72                 "ports": ports,
73             },
74         }
75         j = jsonutils.dump_as_bytes(test_input)
76         ixload = http_ixload.IXLOADHttpTest(j)
77         repository = mock.Mock()
78         test = mock.MagicMock()
79         test.setPorts = mock.Mock()
80         ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
81         ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
82         self.assertIsNone(ixload.reassign_ports(test, repository, ports_to_reassign))
83
84     def test_reassign_ports_error(self):
85         ports = [1, 2, 3]
86         test_input = {
87             "remote_server": "REMOTE_SERVER",
88             "ixload_cfg": "IXLOAD_CFG",
89             "result_dir": "RESULT_DIR",
90             "ixia_chassis": "IXIA_CHASSIS",
91             "IXIA": {
92                 "card": "CARD",
93                 "ports": ports,
94             },
95         }
96         j = jsonutils.dump_as_bytes(test_input)
97         ixload = http_ixload.IXLOADHttpTest(j)
98         repository = mock.Mock()
99         test = "test"
100         ports_to_reassign = [(1, 2, 3), (1, 2, 4)]
101         ixload.format_ports_for_reassignment = mock.Mock(return_value=["1;2;3"])
102         ixload.ix_load = mock.MagicMock()
103         ixload.ix_load.delete = mock.Mock()
104         ixload.ix_load.disconnect = mock.Mock()
105         with self.assertRaises(Exception):
106             ixload.reassign_ports(test, repository, ports_to_reassign)
107
108     def test_stat_collector(self):
109         args = [0, 1]
110         self.assertIsNone(
111             http_ixload.IXLOADHttpTest.stat_collector(*args))
112
113     def test_IxL_StatCollectorCommand(self):
114         args = [[0, 1, 2, 3], [0, 1, 2, 3]]
115         self.assertIsNone(
116             http_ixload.IXLOADHttpTest.IxL_StatCollectorCommand(*args))
117
118     def test_set_results_dir(self):
119         test_stat_collector = mock.MagicMock()
120         test_stat_collector.setResultDir = mock.Mock()
121         results_on_windows = "c:/Results"
122         self.assertIsNone(
123             http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector,
124                                                        results_on_windows))
125
126     def test_set_results_dir_error(self):
127         test_stat_collector = ""
128         results_on_windows = "c:/Results"
129         with self.assertRaises(Exception):
130             http_ixload.IXLOADHttpTest.set_results_dir(test_stat_collector, results_on_windows)
131
132     def test_load_config_file(self):
133         ports = [1, 2, 3]
134         test_input = {
135             "remote_server": "REMOTE_SERVER",
136             "ixload_cfg": "IXLOAD_CFG",
137             "result_dir": "RESULT_DIR",
138             "ixia_chassis": "IXIA_CHASSIS",
139             "IXIA": {
140                 "card": "CARD",
141                 "ports": ports,
142             },
143         }
144         j = jsonutils.dump_as_bytes(test_input)
145         ixload = http_ixload.IXLOADHttpTest(j)
146         ixload.ix_load = mock.MagicMock()
147         ixload.ix_load.new = mock.Mock(return_value="")
148         self.assertIsNotNone(ixload.load_config_file("ixload.cfg"))
149
150     def test_load_config_file_error(self):
151         ports = [1, 2, 3]
152         test_input = {
153             "remote_server": "REMOTE_SERVER",
154             "ixload_cfg": "IXLOAD_CFG",
155             "result_dir": "RESULT_DIR",
156             "ixia_chassis": "IXIA_CHASSIS",
157             "IXIA": {
158                 "card": "CARD",
159                 "ports": ports,
160             },
161         }
162         j = jsonutils.dump_as_bytes(test_input)
163         ixload = http_ixload.IXLOADHttpTest(j)
164         ixload.ix_load = "test"
165         with self.assertRaises(Exception):
166             ixload.load_config_file("ixload.cfg")
167
168     @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
169     @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
170     def test_start_http_test_connect_error(self, mock_collector_type, mock_ixload_type):
171         ports = [1, 2, 3]
172         test_input = {
173             "remote_server": "REMOTE_SERVER",
174             "ixload_cfg": "IXLOAD_CFG",
175             "result_dir": "RESULT_DIR",
176             "ixia_chassis": "IXIA_CHASSIS",
177             "IXIA": {
178                 "card": "CARD",
179                 "ports": ports,
180             },
181         }
182
183         j = jsonutils.dump_as_bytes(test_input)
184
185         mock_ixload = mock_ixload_type()
186         mock_ixload.connect.side_effect = RuntimeError
187
188         ixload = http_ixload.IXLOADHttpTest(j)
189         ixload.results_on_windows = 'windows_result_dir'
190         ixload.result_dir = 'my_result_dir'
191
192         with self.assertRaises(RuntimeError):
193             ixload.start_http_test()
194
195     @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
196     @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
197     def test_start_http_test(self, mock_collector_type, mock_ixload_type):
198         ports = [1, 2, 3]
199         test_input = {
200             "remote_server": "REMOTE_SERVER",
201             "ixload_cfg": "IXLOAD_CFG",
202             "result_dir": "RESULT_DIR",
203             "ixia_chassis": "IXIA_CHASSIS",
204             "IXIA": {
205                 "card": "CARD",
206                 "ports": ports,
207             },
208         }
209
210         j = jsonutils.dump_as_bytes(test_input)
211
212         ixload = http_ixload.IXLOADHttpTest(j)
213         ixload.results_on_windows = 'windows_result_dir'
214         ixload.result_dir = 'my_result_dir'
215         ixload.load_config_file = mock.MagicMock()
216
217         self.assertIsNone(ixload.start_http_test())
218
219     @mock.patch('yardstick.network_services.traffic_profile.http_ixload.IxLoad')
220     @mock.patch('yardstick.network_services.traffic_profile.http_ixload.StatCollectorUtils')
221     def test_start_http_test_reassign_error(self, mock_collector_type, mock_ixload_type):
222         ports = [1, 2, 3]
223         test_input = {
224             "remote_server": "REMOTE_SERVER",
225             "ixload_cfg": "IXLOAD_CFG",
226             "result_dir": "RESULT_DIR",
227             "ixia_chassis": "IXIA_CHASSIS",
228             "IXIA": {
229                 "card": "CARD",
230                 "ports": ports,
231             },
232         }
233
234         j = jsonutils.dump_as_bytes(test_input)
235
236         ixload = http_ixload.IXLOADHttpTest(j)
237         ixload.load_config_file = mock.MagicMock()
238
239         reassign_ports = mock.Mock(side_effect=RuntimeError)
240         ixload.reassign_ports = reassign_ports
241         ixload.results_on_windows = 'windows_result_dir'
242         ixload.result_dir = 'my_result_dir'
243
244         ixload.start_http_test()
245         self.assertEqual(reassign_ports.call_count, 1)
246
247     @mock.patch("yardstick.network_services.traffic_profile.http_ixload.IXLOADHttpTest")
248     def test_main(self, IXLOADHttpTest):
249         args = ["1", "2", "3"]
250         http_ixload.main(args)